KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jboss > resource > adapter > jms > inflow > JmsActivation


1 /*
2 * JBoss, Home of Professional Open Source
3 * Copyright 2005, JBoss Inc., and individual contributors as indicated
4 * by the @authors tag. See the copyright.txt in the distribution for a
5 * full listing of individual contributors.
6 *
7 * This is free software; you can redistribute it and/or modify it
8 * under the terms of the GNU Lesser General Public License as
9 * published by the Free Software Foundation; either version 2.1 of
10 * the License, or (at your option) any later version.
11 *
12 * This software is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this software; if not, write to the Free
19 * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
20 * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
21 */

22 package org.jboss.resource.adapter.jms.inflow;
23
24 import java.lang.reflect.Method JavaDoc;
25
26 import javax.jms.Connection JavaDoc;
27 import javax.jms.Destination JavaDoc;
28 import javax.jms.ExceptionListener JavaDoc;
29 import javax.jms.JMSException JavaDoc;
30 import javax.jms.Message JavaDoc;
31 import javax.jms.MessageListener JavaDoc;
32 import javax.jms.Queue JavaDoc;
33 import javax.jms.QueueConnection JavaDoc;
34 import javax.jms.QueueConnectionFactory JavaDoc;
35 import javax.jms.Topic JavaDoc;
36 import javax.jms.TopicConnection JavaDoc;
37 import javax.jms.TopicConnectionFactory JavaDoc;
38 import javax.jms.XAQueueConnectionFactory JavaDoc;
39 import javax.jms.XATopicConnectionFactory JavaDoc;
40 import javax.naming.Context JavaDoc;
41 import javax.resource.ResourceException JavaDoc;
42 import javax.resource.spi.endpoint.MessageEndpointFactory JavaDoc;
43 import javax.resource.spi.work.Work JavaDoc;
44 import javax.resource.spi.work.WorkManager JavaDoc;
45 import javax.transaction.TransactionManager JavaDoc;
46
47 import org.jboss.jms.jndi.JMSProviderAdapter;
48 import org.jboss.logging.Logger;
49 import org.jboss.resource.adapter.jms.JmsResourceAdapter;
50 import org.jboss.tm.TransactionManagerLocator;
51 import org.jboss.util.Strings;
52 import org.jboss.util.naming.Util;
53
54 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
55
56 /**
57  * A generic jms Activation.
58  *
59  * @author <a HREF="adrian@jboss.com">Adrian Brock</a>
60  * @version $Revision: 58487 $
61  */

62 public class JmsActivation implements ExceptionListener JavaDoc
63 {
64    /** The log */
65    private static final Logger log = Logger.getLogger(JmsActivation.class);
66    
67    /** The onMessage method */
68    public static final Method JavaDoc ONMESSAGE;
69    
70    /** The resource adapter */
71    protected JmsResourceAdapter ra;
72    
73    /** The activation spec */
74    protected JmsActivationSpec spec;
75
76    /** The message endpoint factory */
77    protected MessageEndpointFactory JavaDoc endpointFactory;
78    
79    /** Whether delivery is active */
80    protected SynchronizedBoolean deliveryActive;
81    
82    /** The jms provider adapter */
83    protected JMSProviderAdapter adapter;
84    
85    /** The destination */
86    protected Destination JavaDoc destination;
87    
88    /** The connection */
89    protected Connection JavaDoc connection;
90    
91    /** The server session pool */
92    protected JmsServerSessionPool pool;
93    
94    /** Is the delivery transacted */
95    protected boolean isDeliveryTransacted;
96    
97    /** The DLQ handler */
98    protected DLQHandler dlqHandler;
99    
100    /** The TransactionManager */
101    protected TransactionManager JavaDoc tm;
102    
103    
104    static
105    {
106       try
107       {
108          ONMESSAGE = MessageListener JavaDoc.class.getMethod("onMessage", new Class JavaDoc[] { Message JavaDoc.class });
109       }
110       catch (Exception JavaDoc e)
111       {
112          throw new RuntimeException JavaDoc(e);
113       }
114    }
115
116    public JmsActivation(JmsResourceAdapter ra, MessageEndpointFactory JavaDoc endpointFactory, JmsActivationSpec spec) throws ResourceException JavaDoc
117    {
118       this.ra = ra;
119       this.endpointFactory = endpointFactory;
120       this.spec = spec;
121       try
122       {
123          this.isDeliveryTransacted = endpointFactory.isDeliveryTransacted(ONMESSAGE);
124       }
125       catch (Exception JavaDoc e)
126       {
127          throw new ResourceException JavaDoc(e);
128       }
129    }
130
131    /**
132     * @return the activation spec
133     */

134    public JmsActivationSpec getActivationSpec()
135    {
136       return spec;
137    }
138
139    /**
140     * @return the message endpoint factory
141     */

142    public MessageEndpointFactory JavaDoc getMessageEndpointFactory()
143    {
144       return endpointFactory;
145    }
146
147    /**
148     * @return whether delivery is transacted
149     */

150    public boolean isDeliveryTransacted()
151    {
152       return isDeliveryTransacted;
153    }
154
155    /**
156     * @return the work manager
157     */

158    public WorkManager JavaDoc getWorkManager()
159    {
160       return ra.getWorkManager();
161    }
162    
163    public TransactionManager JavaDoc getTransactionManager()
164    {
165       if (tm == null)
166       {
167          tm = TransactionManagerLocator.getInstance().locate();
168
169       }
170
171       return tm;
172    }
173
174    /**
175     * @return the connection
176     */

177    public Connection JavaDoc getConnection()
178    {
179       return connection;
180    }
181
182    /**
183     * @return the destination
184     */

185    public Destination JavaDoc getDestination()
186    {
187       return destination;
188    }
189    
190    /**
191     * @return the provider adapter
192     */

193    public JMSProviderAdapter getProviderAdapter()
194    {
195       return adapter;
196    }
197    
198    /**
199     * @return the dlq handler
200     */

201    public DLQHandler getDLQHandler()
202    {
203       return dlqHandler;
204    }
205    
206    /**
207     * Start the activation
208     *
209     * @throws ResourceException for any error
210     */

211    public void start() throws ResourceException JavaDoc
212    {
213       deliveryActive = new SynchronizedBoolean(true);
214       ra.getWorkManager().scheduleWork(new SetupActivation());
215    }
216
217    /**
218     * Stop the activation
219     */

220    public void stop()
221    {
222       deliveryActive.set(false);
223       teardown();
224    }
225
226    /**
227     * Handles any failure by trying to reconnect
228     */

229    public void handleFailure(Throwable JavaDoc failure)
230    {
231       log.warn("Failure in jms activation " + spec, failure);
232       int reconnectCount = 0;
233       
234       while (deliveryActive.get() && reconnectCount < spec.getReconnectAttempts())
235       {
236          teardown();
237          try
238          {
239             Thread.sleep(spec.getReconnectIntervalLong());
240          }
241          catch (InterruptedException JavaDoc e)
242          {
243             log.debug("Interrupted trying to reconnect " + spec, e);
244             break;
245          }
246
247          log.info("Attempting to reconnect " + spec);
248          try
249          {
250             setup();
251             log.info("Reconnected with messaging provider.");
252             break;
253          }
254          catch (Throwable JavaDoc t)
255          {
256             log.error("Unable to reconnect " + spec, t);
257          }
258          
259          ++reconnectCount;
260
261       }
262    }
263
264    public void onException(JMSException JavaDoc exception)
265    {
266       handleFailure(exception);
267    }
268
269    public String JavaDoc toString()
270    {
271       StringBuffer JavaDoc buffer = new StringBuffer JavaDoc();
272       buffer.append(Strings.defaultToString(this)).append('(');
273       buffer.append("spec=").append(Strings.defaultToString(spec));
274       buffer.append(" mepf=").append(Strings.defaultToString(endpointFactory));
275       buffer.append(" active=").append(deliveryActive.get());
276       if (destination != null)
277          buffer.append(" destination=").append(destination);
278       if (connection != null)
279          buffer.append(" connection=").append(connection);
280       if (pool != null)
281          buffer.append(" pool=").append(Strings.defaultToString(pool));
282       if (dlqHandler != null)
283          buffer.append(" dlq=").append(Strings.defaultToString(dlqHandler));
284       buffer.append(" transacted=").append(isDeliveryTransacted);
285       buffer.append(')');
286       return buffer.toString();
287    }
288
289    /**
290     * Setup the activation
291     *
292     * @throws Exception for any error
293     */

294    protected void setup() throws Exception JavaDoc
295    {
296       log.debug("Setting up " + spec);
297
298       setupJMSProviderAdapter();
299       Context JavaDoc ctx = adapter.getInitialContext();
300       log.debug("Using context " + ctx.getEnvironment() + " for " + spec);
301       try
302       {
303          setupDLQ(ctx);
304          setupDestination(ctx);
305          setupConnection(ctx);
306       }
307       finally
308       {
309          ctx.close();
310       }
311       setupSessionPool();
312       
313       log.debug("Setup complete " + this);
314    }
315    
316    /**
317     * Teardown the activation
318     */

319    protected void teardown()
320    {
321       log.debug("Tearing down " + spec);
322
323       teardownSessionPool();
324       teardownConnection();
325       teardownDestination();
326       teardownDLQ();
327
328       log.debug("Tearing down complete " + this);
329    }
330
331    /**
332     * Get the jms provider
333     */

334    protected void setupJMSProviderAdapter() throws Exception JavaDoc
335    {
336       String JavaDoc providerAdapterJNDI = spec.getProviderAdapterJNDI();
337       if (providerAdapterJNDI.startsWith("java:") == false)
338          providerAdapterJNDI = "java:" + providerAdapterJNDI;
339
340       log.debug("Retrieving the jms provider adapter " + providerAdapterJNDI + " for " + this);
341       adapter = (JMSProviderAdapter) Util.lookup(providerAdapterJNDI, JMSProviderAdapter.class);
342       log.debug("Using jms provider adapter " + adapter + " for " + this);
343    }
344    
345    /**
346     * Setup the DLQ
347     *
348     * @param ctx the naming context
349     * @throws Exception for any error
350     */

351    protected void setupDLQ(Context JavaDoc ctx) throws Exception JavaDoc
352    {
353       if (spec.isUseDLQ())
354       {
355          Class JavaDoc clazz = Thread.currentThread().getContextClassLoader().loadClass(spec.getDLQHandler());
356          dlqHandler = (DLQHandler) clazz.newInstance();
357          dlqHandler.setup(this, ctx);
358       }
359       
360       log.debug("Setup DLQ " + this);
361    }
362    
363    /**
364     * Teardown the DLQ
365     */

366    protected void teardownDLQ()
367    {
368       log.debug("Removing DLQ " + this);
369       try
370       {
371          if (dlqHandler != null)
372             dlqHandler.teardown();
373       }
374       catch (Throwable JavaDoc t)
375       {
376          log.debug("Error tearing down the DLQ " + dlqHandler, t);
377       }
378       dlqHandler = null;
379    }
380    
381    /**
382     * Setup the Destination
383     *
384     * @param ctx the naming context
385     * @throws Exception for any error
386     */

387    protected void setupDestination(Context JavaDoc ctx) throws Exception JavaDoc
388    {
389       Class JavaDoc destinationType;
390       if (spec.isTopic())
391          destinationType = Topic JavaDoc.class;
392       else
393          destinationType = Queue JavaDoc.class;
394
395       String JavaDoc destinationName = spec.getDestination();
396       log.debug("Retrieving destination " + destinationName + " of type " + destinationType.getName());
397       destination = (Destination JavaDoc) Util.lookup(ctx, destinationName, destinationType);
398       log.debug("Got destination " + destination + " from " + destinationName);
399    }
400    
401    /**
402     * Teardown the destination
403     */

404    protected void teardownDestination()
405    {
406    }
407    
408    /**
409     * Setup the Connection
410     *
411     * @param ctx the naming context
412     * @throws Exception for any error
413     */

414    protected void setupConnection(Context JavaDoc ctx) throws Exception JavaDoc
415    {
416       log.debug("setup connection " + this);
417
418       String JavaDoc user = spec.getUser();
419       String JavaDoc pass = spec.getPassword();
420       String JavaDoc clientID = spec.getClientId();
421       if (spec.isTopic())
422          connection = setupTopicConnection(ctx, user, pass, clientID);
423       else
424          connection = setupQueueConnection(ctx, user, pass, clientID);
425       
426       log.debug("established connection " + this);
427    }
428    
429    /**
430     * Setup a Queue Connection
431     *
432     * @param ctx the naming context
433     * @param user the user
434     * @param pass the password
435     * @param clientID the client id
436     * @throws Exception for any error
437     */

438    protected QueueConnection JavaDoc setupQueueConnection(Context JavaDoc ctx, String JavaDoc user, String JavaDoc pass, String JavaDoc clientID) throws Exception JavaDoc
439    {
440       String JavaDoc queueFactoryRef = adapter.getQueueFactoryRef();
441       log.debug("Attempting to lookup queue connection factory " + queueFactoryRef);
442       QueueConnectionFactory JavaDoc qcf = (QueueConnectionFactory JavaDoc) Util.lookup(ctx, queueFactoryRef, QueueConnectionFactory JavaDoc.class);
443       log.debug("Got queue connection factory " + qcf + " from " + queueFactoryRef);
444       log.debug("Attempting to create queue connection with user " + user);
445       QueueConnection JavaDoc result;
446       if (qcf instanceof XAQueueConnectionFactory JavaDoc && isDeliveryTransacted)
447       {
448          XAQueueConnectionFactory JavaDoc xaqcf = (XAQueueConnectionFactory JavaDoc) qcf;
449          if (user != null)
450             result = xaqcf.createXAQueueConnection(user, pass);
451          else
452             result = xaqcf.createXAQueueConnection();
453       }
454       else
455       {
456          if (user != null)
457             result = qcf.createQueueConnection(user, pass);
458          else
459             result = qcf.createQueueConnection();
460       }
461       if (clientID != null)
462          result.setClientID(clientID);
463       result.setExceptionListener(this);
464       log.debug("Using queue connection " + result);
465       return result;
466    }
467    
468    /**
469     * Setup a Topic Connection
470     *
471     * @param ctx the naming context
472     * @param user the user
473     * @param pass the password
474     * @param clientID the client id
475     * @throws Exception for any error
476     */

477    protected TopicConnection JavaDoc setupTopicConnection(Context JavaDoc ctx, String JavaDoc user, String JavaDoc pass, String JavaDoc clientID) throws Exception JavaDoc
478    {
479       String JavaDoc topicFactoryRef = adapter.getTopicFactoryRef();
480       log.debug("Attempting to lookup topic connection factory " + topicFactoryRef);
481       TopicConnectionFactory JavaDoc tcf = (TopicConnectionFactory JavaDoc) Util.lookup(ctx, topicFactoryRef, TopicConnectionFactory JavaDoc.class);
482       log.debug("Got topic connection factory " + tcf + " from " + topicFactoryRef);
483       log.debug("Attempting to create topic connection with user " + user);
484       TopicConnection JavaDoc result;
485       if (tcf instanceof XATopicConnectionFactory JavaDoc && isDeliveryTransacted)
486       {
487          XATopicConnectionFactory JavaDoc xatcf = (XATopicConnectionFactory JavaDoc) tcf;
488          if (user != null)
489             result = xatcf.createXATopicConnection(user, pass);
490          else
491             result = xatcf.createXATopicConnection();
492       }
493       else
494       {
495          if (user != null)
496             result = tcf.createTopicConnection(user, pass);
497          else
498             result = tcf.createTopicConnection();
499       }
500       if (clientID != null)
501          result.setClientID(clientID);
502       result.setExceptionListener(this);
503       log.debug("Using topic connection " + result);
504       return result;
505    }
506    
507    /**
508     * Teardown the connection
509     */

510    protected void teardownConnection()
511    {
512       try
513       {
514          if (connection != null)
515          {
516             log.debug("Closing the " + connection);
517             connection.close();
518          }
519       }
520       catch (Throwable JavaDoc t)
521       {
522          log.debug("Error closing the connection " + connection, t);
523       }
524       connection = null;
525    }
526    
527    /**
528     * Setup the server session pool
529     *
530     * @throws Exception for any error
531     */

532    protected void setupSessionPool() throws Exception JavaDoc
533    {
534       pool = new JmsServerSessionPool(this);
535       log.debug("Created session pool " + pool);
536       
537       log.debug("Starting session pool " + pool);
538       pool.start();
539       log.debug("Started session pool " + pool);
540       
541       log.debug("Starting delivery " + connection);
542       connection.start();
543       log.debug("Started delivery " + connection);
544    }
545    
546    /**
547     * Teardown the server session pool
548     */

549    protected void teardownSessionPool()
550    {
551       try
552       {
553          if (connection != null)
554          {
555             log.debug("Stopping delivery " + connection);
556             connection.stop();
557          }
558       }
559       catch (Throwable JavaDoc t)
560       {
561          log.debug("Error stopping delivery " + connection, t);
562       }
563
564       try
565       {
566          if (pool != null)
567          {
568             log.debug("Stopping the session pool " + pool);
569             pool.stop();
570          }
571       }
572       catch (Throwable JavaDoc t)
573       {
574          log.debug("Error clearing the pool " + pool, t);
575       }
576    }
577
578    /**
579     * Handles the setup
580     */

581    private class SetupActivation implements Work JavaDoc
582    {
583       public void run()
584       {
585          try
586          {
587             setup();
588          }
589          catch (Throwable JavaDoc t)
590          {
591             handleFailure(t);
592          }
593       }
594
595       public void release()
596       {
597       }
598    }
599 }
600
Popular Tags