KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jboss > axis > transport > jms > JMSConnector


1 /*
2  * The Apache Software License, Version 1.1
3  *
4  *
5  * Copyright (c) 2001, 2002 The Apache Software Foundation. All rights
6  * reserved.
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions
10  * are met:
11  *
12  * 1. Redistributions of source code must retain the above copyright
13  * notice, this list of conditions and the following disclaimer.
14  *
15  * 2. Redistributions in binary form must reproduce the above copyright
16  * notice, this list of conditions and the following disclaimer in
17  * the documentation and/or other materials provided with the
18  * distribution.
19  *
20  * 3. The end-user documentation included with the redistribution,
21  * if any, must include the following acknowledgment:
22  * "This product includes software developed by the
23  * Apache Software Foundation (http://www.apache.org/)."
24  * Alternately, this acknowledgment may appear in the software itself,
25  * if and wherever such third-party acknowledgments normally appear.
26  *
27  * 4. The names "Axis" and "Apache Software Foundation" must
28  * not be used to endorse or promote products derived from this
29  * software without prior written permission. For written
30  * permission, please contact apache@apache.org.
31  *
32  * 5. Products derived from this software may not be called "Apache",
33  * nor may "Apache" appear in their name, without prior written
34  * permission of the Apache Software Foundation.
35  *
36  * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
37  * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
38  * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
39  * DISCLAIMED. IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR
40  * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
41  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
42  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
43  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
44  * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
45  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
46  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
47  * SUCH DAMAGE.
48  * ====================================================================
49  *
50  * This software consists of voluntary contributions made by many
51  * individuals on behalf of the Apache Software Foundation. For more
52  * information on the Apache Software Foundation, please see
53  * <http://www.apache.org/>.
54  */

55
56 package org.jboss.axis.transport.jms;
57
58 import org.jboss.axis.components.jms.JMSVendorAdapter;
59
60 import javax.jms.BytesMessage JavaDoc;
61 import javax.jms.ConnectionFactory JavaDoc;
62 import javax.jms.Destination JavaDoc;
63 import javax.jms.ExceptionListener JavaDoc;
64 import javax.jms.JMSException JavaDoc;
65 import javax.jms.Message JavaDoc;
66 import javax.jms.MessageConsumer JavaDoc;
67 import javax.jms.MessageProducer JavaDoc;
68 import javax.jms.Session JavaDoc;
69 import java.io.ByteArrayOutputStream JavaDoc;
70 import java.util.HashMap JavaDoc;
71 import java.util.Iterator JavaDoc;
72 import java.util.LinkedList JavaDoc;
73 import java.util.Map JavaDoc;
74
75 // No vendor dependent exception classes
76
//import progress.message.client.EUserAlreadyConnected;
77
//import progress.message.jclient.ErrorCodes;
78

79 /**
80  * JMSConnector is an abstract class that encapsulates the work of connecting
81  * to JMS destinations. Its subclasses are TopicConnector and QueueConnector
82  * which further specialize connections to the pub-sub and the ptp domains.
83  * It also implements the capability to retry connections in the event of
84  * failures.
85  *
86  * @author Jaime Meritt (jmeritt@sonicsoftware.com)
87  * @author Richard Chung (rchung@sonicsoftware.com)
88  * @author Dave Chappell (chappell@sonicsoftware.com)
89  */

90 public abstract class JMSConnector
91 {
92    protected int m_numRetries;
93    protected long m_connectRetryInterval;
94    protected long m_interactRetryInterval;
95    protected long m_timeoutTime;
96    protected long m_poolTimeout;
97    protected AsyncConnection m_receiveConnection;
98    protected SyncConnection m_sendConnection;
99    protected int m_numSessions;
100    protected boolean m_allowReceive;
101    protected JMSVendorAdapter m_adapter;
102
103    public JMSConnector(ConnectionFactory JavaDoc connectionFactory,
104                        int numRetries,
105                        int numSessions,
106                        long connectRetryInterval,
107                        long interactRetryInterval,
108                        long timeoutTime,
109                        boolean allowReceive,
110                        String JavaDoc clientID,
111                        String JavaDoc username,
112                        String JavaDoc password,
113                        JMSVendorAdapter adapter)
114            throws JMSException JavaDoc
115    {
116       m_numRetries = numRetries;
117       m_connectRetryInterval = connectRetryInterval;
118       m_interactRetryInterval = interactRetryInterval;
119       m_timeoutTime = timeoutTime;
120       m_poolTimeout = timeoutTime / (long)numRetries;
121       m_numSessions = numSessions;
122       m_allowReceive = allowReceive;
123       m_adapter = adapter;
124
125       // try to connect initially so we can fail fast
126
// in the case of irrecoverable errors.
127
// If we fail in a recoverable fashion we will retry
128
javax.jms.Connection JavaDoc sendConnection = createConnectionWithRetry(connectionFactory,
129               username,
130               password);
131       m_sendConnection = createSyncConnection(connectionFactory, sendConnection,
132               m_numSessions, "SendThread",
133               clientID,
134               username,
135               password);
136
137       m_sendConnection.start();
138
139       if (m_allowReceive)
140       {
141          javax.jms.Connection JavaDoc receiveConnection = createConnectionWithRetry(connectionFactory,
142                  username,
143                  password);
144          m_receiveConnection = createAsyncConnection(connectionFactory,
145                  receiveConnection,
146                  "ReceiveThread",
147                  clientID,
148                  username,
149                  password);
150          m_receiveConnection.start();
151       }
152    }
153
154    protected javax.jms.Connection JavaDoc createConnectionWithRetry(ConnectionFactory JavaDoc connectionFactory,
155                                                             String JavaDoc username,
156                                                             String JavaDoc password)
157            throws JMSException JavaDoc
158    {
159       javax.jms.Connection JavaDoc connection = null;
160       for (int numTries = 1; connection == null; numTries++)
161       {
162          try
163          {
164             connection = internalConnect(connectionFactory, username, password);
165          }
166          catch (JMSException JavaDoc jmse)
167          {
168             if (!m_adapter.isRecoverable(jmse, JMSVendorAdapter.CONNECT_ACTION) || numTries == m_numRetries)
169                throw jmse;
170             else
171                try
172                {
173                   Thread.sleep(m_connectRetryInterval);
174                }
175                catch (InterruptedException JavaDoc ie)
176                {
177                }
178             ;
179          }
180       }
181       return connection;
182    }
183
184    public void stop()
185    {
186       m_sendConnection.stopConnection();
187       if (m_allowReceive)
188          m_receiveConnection.stopConnection();
189    }
190
191    public void start()
192    {
193       m_sendConnection.startConnection();
194       if (m_allowReceive)
195          m_receiveConnection.startConnection();
196    }
197
198    public void shutdown()
199    {
200       m_sendConnection.shutdown();
201       if (m_allowReceive)
202          m_receiveConnection.shutdown();
203    }
204
205    public abstract JMSEndpoint createEndpoint(String JavaDoc destinationName)
206            throws JMSException JavaDoc;
207
208    public abstract JMSEndpoint createEndpoint(Destination JavaDoc destination)
209            throws JMSException JavaDoc;
210
211
212    protected abstract javax.jms.Connection JavaDoc internalConnect(ConnectionFactory JavaDoc connectionFactory,
213                                                            String JavaDoc username,
214                                                            String JavaDoc password)
215            throws JMSException JavaDoc;
216
217    private abstract class Connection extends Thread JavaDoc implements ExceptionListener JavaDoc
218    {
219       private ConnectionFactory JavaDoc m_connectionFactory;
220       protected javax.jms.Connection JavaDoc m_connection;
221
222       protected boolean m_isActive;
223       private boolean m_needsToConnect;
224       private boolean m_startConnection;
225       private String JavaDoc m_clientID;
226       private String JavaDoc m_username;
227       private String JavaDoc m_password;
228
229       private Object JavaDoc m_jmsLock;
230       private Object JavaDoc m_lifecycleLock;
231
232
233       protected Connection(ConnectionFactory JavaDoc connectionFactory,
234                            javax.jms.Connection JavaDoc connection,
235                            String JavaDoc threadName,
236                            String JavaDoc clientID,
237                            String JavaDoc username,
238                            String JavaDoc password)
239               throws JMSException JavaDoc
240       {
241          super(threadName);
242          m_connectionFactory = connectionFactory;
243
244          m_clientID = clientID;
245          m_username = username;
246          m_password = password;
247
248          m_jmsLock = new Object JavaDoc();
249          m_lifecycleLock = new Object JavaDoc();
250
251          if (connection != null)
252          {
253             m_needsToConnect = false;
254             m_connection = connection;
255             m_connection.setExceptionListener(this);
256             if (m_clientID != null)
257                m_connection.setClientID(m_clientID);
258          }
259          else
260          {
261             m_needsToConnect = true;
262          }
263
264          m_isActive = true;
265       }
266
267       /**
268        * @todo handle non-recoverable errors
269        */

270
271       public void run()
272       {
273          // loop until a connection is made and when a connection is made (re)establish
274
// any subscriptions
275
while (m_isActive)
276          {
277             if (m_needsToConnect)
278             {
279                m_connection = null;
280                try
281                {
282                   m_connection = internalConnect(m_connectionFactory,
283                           m_username, m_password);
284                   m_connection.setExceptionListener(this);
285                   if (m_clientID != null)
286                      m_connection.setClientID(m_clientID);
287                }
288                catch (JMSException JavaDoc e)
289                {
290                   // simply backoff for a while and then retry
291
try
292                   {
293                      Thread.sleep(m_connectRetryInterval);
294                   }
295                   catch (InterruptedException JavaDoc ie)
296                   {
297                   }
298                   continue;
299                }
300             }
301             else
302                m_needsToConnect = true; // When we'll get to the "if (needsToConnect)" statement the next time it will be because
303
// we lost the connection
304

305             // we now have a valid connection so establish some context
306
try
307             {
308                internalOnConnect();
309             }
310             catch (Exception JavaDoc e)
311             {
312                // insert code to handle non recoverable errors
313
// simply retry
314
continue;
315             }
316
317             synchronized (m_jmsLock)
318             {
319                try
320                {
321                   m_jmsLock.wait();
322                }
323                catch (InterruptedException JavaDoc ie)
324                {
325                } // until notified due to some change in status
326
}
327          }
328
329          // no longer staying connected, so see what we can cleanup
330
internalOnShutdown();
331       }
332
333
334       void startConnection()
335       {
336          synchronized (m_lifecycleLock)
337          {
338             if (m_startConnection)
339                return;
340             m_startConnection = true;
341             try
342             {
343                m_connection.start();
344             }
345             catch (Throwable JavaDoc e)
346             {
347             } // ignore
348
}
349       }
350
351       void stopConnection()
352       {
353          synchronized (m_lifecycleLock)
354          {
355             if (!m_startConnection)
356                return;
357             m_startConnection = false;
358             try
359             {
360                m_connection.stop();
361             }
362             catch (Throwable JavaDoc e)
363             {
364             } // ignore
365
}
366       }
367
368       void shutdown()
369       {
370          m_isActive = false;
371          synchronized (m_jmsLock)
372          {
373             m_jmsLock.notifyAll();
374          }
375       }
376
377
378       public void onException(JMSException JavaDoc exception)
379       {
380          if (m_adapter.isRecoverable(exception,
381                  JMSVendorAdapter.ON_EXCEPTION_ACTION))
382             return;
383          onException();
384          synchronized (m_jmsLock)
385          {
386             m_jmsLock.notifyAll();
387          }
388       }
389
390       private final void internalOnConnect()
391               throws Exception JavaDoc
392       {
393          onConnect();
394          synchronized (m_lifecycleLock)
395          {
396             if (m_startConnection)
397             {
398                try
399                {
400                   m_connection.start();
401                }
402                catch (Throwable JavaDoc e)
403                {
404                } // ignore
405
}
406          }
407       }
408
409       private final void internalOnShutdown()
410       {
411          stopConnection();
412          onShutdown();
413          try
414          {
415             m_connection.close();
416          }
417          catch (Throwable JavaDoc e)
418          {
419          } // ignore
420
}
421
422       protected abstract void onConnect() throws Exception JavaDoc;
423
424       protected abstract void onShutdown();
425
426       protected abstract void onException();
427    }
428
429    protected abstract SyncConnection createSyncConnection(ConnectionFactory JavaDoc factory,
430                                                           javax.jms.Connection JavaDoc connection,
431                                                           int numSessions,
432                                                           String JavaDoc threadName,
433                                                           String JavaDoc clientID,
434                                                           String JavaDoc username,
435                                                           String JavaDoc password)
436
437            throws JMSException JavaDoc;
438
439    SyncConnection getSendConnection()
440    {
441       return m_sendConnection;
442    }
443
444    protected abstract class SyncConnection extends Connection
445    {
446       LinkedList JavaDoc m_senders;
447       int m_numSessions;
448       Object JavaDoc m_senderLock;
449
450       SyncConnection(ConnectionFactory JavaDoc connectionFactory,
451                      javax.jms.Connection JavaDoc connection,
452                      int numSessions,
453                      String JavaDoc threadName,
454                      String JavaDoc clientID,
455                      String JavaDoc username,
456                      String JavaDoc password)
457               throws JMSException JavaDoc
458       {
459          super(connectionFactory, connection, threadName,
460                  clientID, username, password);
461          m_senders = new LinkedList JavaDoc();
462          m_numSessions = numSessions;
463          m_senderLock = new Object JavaDoc();
464       }
465
466       protected abstract SendSession createSendSession(javax.jms.Connection JavaDoc connection)
467               throws JMSException JavaDoc;
468
469       protected void onConnect()
470               throws JMSException JavaDoc
471       {
472          synchronized (m_senderLock)
473          {
474             for (int i = 0; i < m_numSessions; i++)
475             {
476                m_senders.add(createSendSession(m_connection));
477             }
478             m_senderLock.notifyAll();
479          }
480       }
481
482       byte[] call(JMSEndpoint endpoint, byte[] message, long timeout, HashMap JavaDoc properties)
483               throws Exception JavaDoc
484       {
485          long timeoutTime = System.currentTimeMillis() + timeout;
486          while (true)
487          {
488             if (System.currentTimeMillis() > timeoutTime)
489             {
490                throw new InvokeTimeoutException("Unable to complete call in time allotted");
491             }
492
493             SendSession sendSession = null;
494             try
495             {
496                sendSession = getSessionFromPool(m_poolTimeout);
497                byte[] response = sendSession.call(endpoint,
498                        message,
499                        timeoutTime - System.currentTimeMillis(),
500                        properties);
501                returnSessionToPool(sendSession);
502                if (response == null)
503                {
504                   throw new InvokeTimeoutException("Unable to complete call in time allotted");
505                }
506                return response;
507             }
508             catch (JMSException JavaDoc jmse)
509             {
510                if (!m_adapter.isRecoverable(jmse, JMSVendorAdapter.SEND_ACTION))
511                {
512                   //this we cannot recover from
513
//but it does not invalidate the session
514
returnSessionToPool(sendSession);
515                   throw jmse;
516                }
517
518                //for now we will assume this is a reconnect related issue
519
//and let the sender be collected
520
//give the reconnect thread a chance to fill the pool
521
Thread.yield();
522                continue;
523             }
524             catch (NullPointerException JavaDoc npe)
525             {
526                Thread.yield();
527                continue;
528             }
529          }
530       }
531
532       /** @todo add in handling for security exceptions
533        * @todo add support for timeouts */

534       void send(JMSEndpoint endpoint, byte[] message, HashMap JavaDoc properties)
535               throws Exception JavaDoc
536       {
537          long timeoutTime = System.currentTimeMillis() + m_timeoutTime;
538          while (true)
539          {
540             if (System.currentTimeMillis() > timeoutTime)
541             {
542                throw new InvokeTimeoutException("Cannot complete send in time allotted");
543             }
544
545             SendSession sendSession = null;
546             try
547             {
548                sendSession = getSessionFromPool(m_poolTimeout);
549                sendSession.send(endpoint, message, properties);
550                returnSessionToPool(sendSession);
551             }
552             catch (JMSException JavaDoc jmse)
553             {
554                if (!m_adapter.isRecoverable(jmse, JMSVendorAdapter.SEND_ACTION))
555                {
556                   //this we cannot recover from
557
//but it does not invalidate the session
558
returnSessionToPool(sendSession);
559                   throw jmse;
560                }
561                //for now we will assume this is a reconnect related issue
562
//and let the sender be collected
563
//give the reconnect thread a chance to fill the pool
564
Thread.yield();
565                continue;
566             }
567             catch (NullPointerException JavaDoc npe)
568             {
569                //give the reconnect thread a chance to fill the pool
570
Thread.yield();
571                continue;
572             }
573             break;
574          }
575       }
576
577       protected void onException()
578       {
579          synchronized (m_senderLock)
580          {
581             m_senders.clear();
582          }
583       }
584
585       protected void onShutdown()
586       {
587          synchronized (m_senderLock)
588          {
589             Iterator JavaDoc senders = m_senders.iterator();
590             while (senders.hasNext())
591             {
592                SendSession session = (SendSession)senders.next();
593                session.cleanup();
594             }
595             m_senders.clear();
596          }
597       }
598
599       private SendSession getSessionFromPool(long timeout)
600       {
601          synchronized (m_senderLock)
602          {
603             while (m_senders.size() == 0)
604             {
605                try
606                {
607                   m_senderLock.wait(timeout);
608                   if (m_senders.size() == 0)
609                   {
610                      return null;
611                   }
612                }
613                catch (InterruptedException JavaDoc ignore)
614                {
615                   return null;
616                }
617             }
618             return (SendSession)m_senders.removeFirst();
619          }
620       }
621
622       private void returnSessionToPool(SendSession sendSession)
623       {
624          synchronized (m_senderLock)
625          {
626             m_senders.addLast(sendSession);
627             m_senderLock.notifyAll();
628          }
629       }
630
631       protected abstract class SendSession extends ConnectorSession
632       {
633          MessageProducer JavaDoc m_producer;
634
635          SendSession(Session JavaDoc session,
636                      MessageProducer JavaDoc producer)
637                  throws JMSException JavaDoc
638          {
639             super(session);
640             m_producer = producer;
641          }
642
643          protected abstract Destination JavaDoc createTemporaryDestination()
644                  throws JMSException JavaDoc;
645
646          protected abstract void deleteTemporaryDestination(Destination JavaDoc destination)
647                  throws JMSException JavaDoc;
648
649          protected abstract MessageConsumer JavaDoc createConsumer(Destination JavaDoc destination)
650                  throws JMSException JavaDoc;
651
652          protected abstract void send(Destination JavaDoc destination,
653                                       Message JavaDoc message,
654                                       int deliveryMode,
655                                       int priority,
656                                       long timeToLive)
657                  throws JMSException JavaDoc;
658
659          void send(JMSEndpoint endpoint, byte[] message, HashMap JavaDoc properties)
660                  throws Exception JavaDoc
661          {
662             BytesMessage JavaDoc jmsMessage = m_session.createBytesMessage();
663             jmsMessage.writeBytes(message);
664             int deliveryMode = extractDeliveryMode(properties);
665             int priority = extractPriority(properties);
666             long timeToLive = extractTimeToLive(properties);
667
668             if (properties != null && !properties.isEmpty())
669                setProperties(properties, jmsMessage);
670
671             send(endpoint.getDestination(m_session), jmsMessage, deliveryMode,
672                     priority, timeToLive);
673          }
674
675
676          void cleanup()
677          {
678             try
679             {
680                m_producer.close();
681             }
682             catch (Throwable JavaDoc t)
683             {
684             }
685             try
686             {
687                m_session.close();
688             }
689             catch (Throwable JavaDoc t)
690             {
691             }
692          }
693
694          byte[] call(JMSEndpoint endpoint, byte[] message, long timeout,
695                      HashMap JavaDoc properties)
696                  throws Exception JavaDoc
697          {
698             Destination JavaDoc reply = createTemporaryDestination();
699             MessageConsumer JavaDoc subscriber = createConsumer(reply);
700             BytesMessage JavaDoc jmsMessage = m_session.createBytesMessage();
701             jmsMessage.writeBytes(message);
702             jmsMessage.setJMSReplyTo(reply);
703
704             int deliveryMode = extractDeliveryMode(properties);
705             int priority = extractPriority(properties);
706             long timeToLive = extractTimeToLive(properties);
707
708             if (properties != null && !properties.isEmpty())
709                setProperties(properties, jmsMessage);
710
711             send(endpoint.getDestination(m_session), jmsMessage, deliveryMode,
712                     priority, timeToLive);
713             BytesMessage JavaDoc response = null;
714             try
715             {
716                response = (BytesMessage JavaDoc)subscriber.receive(timeout);
717             }
718             catch (ClassCastException JavaDoc cce)
719             {
720                throw new InvokeException
721                        ("Error: unexpected message type received - expected BytesMessage");
722             }
723             byte[] respBytes = null;
724             if (response != null)
725             {
726                byte[] buffer = new byte[8 * 1024];
727                ByteArrayOutputStream JavaDoc out = new ByteArrayOutputStream JavaDoc();
728                for (int bytesRead = response.readBytes(buffer);
729                     bytesRead != -1; bytesRead = response.readBytes(buffer))
730                {
731                   out.write(buffer, 0, bytesRead);
732                }
733                respBytes = out.toByteArray();
734             }
735             subscriber.close();
736             deleteTemporaryDestination(reply);
737             return respBytes;
738          }
739
740          private int extractPriority(HashMap JavaDoc properties)
741          {
742             return MapUtils.removeIntProperty(properties, JMSConstants.PRIORITY,
743                     JMSConstants.DEFAULT_PRIORITY);
744          }
745
746          private int extractDeliveryMode(HashMap JavaDoc properties)
747          {
748             return MapUtils.removeIntProperty(properties, JMSConstants.DELIVERY_MODE,
749                     JMSConstants.DEFAULT_DELIVERY_MODE);
750          }
751
752          private long extractTimeToLive(HashMap JavaDoc properties)
753          {
754             return MapUtils.removeLongProperty(properties, JMSConstants.TIME_TO_LIVE,
755                     JMSConstants.DEFAULT_TIME_TO_LIVE);
756          }
757
758          private void setProperties(HashMap JavaDoc properties, Message JavaDoc message)
759                  throws JMSException JavaDoc
760          {
761             Iterator JavaDoc propertyIter = properties.entrySet().iterator();
762             while (propertyIter.hasNext())
763             {
764                Map.Entry JavaDoc property = (Map.Entry JavaDoc)propertyIter.next();
765                setProperty((String JavaDoc)property.getKey(), property.getValue(),
766                        message);
767             }
768          }
769
770          private void setProperty(String JavaDoc property, Object JavaDoc value, Message JavaDoc message)
771                  throws JMSException JavaDoc
772          {
773             if (property == null)
774                return;
775             if (property.equals(JMSConstants.JMS_CORRELATION_ID))
776                message.setJMSCorrelationID((String JavaDoc)value);
777             else if (property.equals(JMSConstants.JMS_CORRELATION_ID_AS_BYTES))
778                message.setJMSCorrelationIDAsBytes((byte[])value);
779             else if (property.equals(JMSConstants.JMS_TYPE))
780                message.setJMSType((String JavaDoc)value);
781             else
782                message.setObjectProperty(property, value);
783          }
784       }
785    }
786
787    AsyncConnection getReceiveConnection()
788    {
789       return m_receiveConnection;
790    }
791
792    protected abstract AsyncConnection createAsyncConnection(ConnectionFactory JavaDoc factory,
793                                                             javax.jms.Connection JavaDoc connection,
794                                                             String JavaDoc threadName,
795                                                             String JavaDoc clientID,
796                                                             String JavaDoc username,
797                                                             String JavaDoc password)
798
799            throws JMSException JavaDoc;
800
801    protected abstract class AsyncConnection extends Connection
802    {
803       HashMap JavaDoc m_subscriptions;
804       Object JavaDoc m_subscriptionLock;
805
806       protected AsyncConnection(ConnectionFactory JavaDoc connectionFactory,
807                                 javax.jms.Connection JavaDoc connection,
808                                 String JavaDoc threadName,
809                                 String JavaDoc clientID,
810                                 String JavaDoc username,
811                                 String JavaDoc password)
812               throws JMSException JavaDoc
813       {
814          super(connectionFactory, connection, threadName,
815                  clientID, username, password);
816          m_subscriptions = new HashMap JavaDoc();
817          m_subscriptionLock = new Object JavaDoc();
818       }
819
820       protected abstract ListenerSession createListenerSession(javax.jms.Connection JavaDoc connection,
821                                                                Subscription subscription)
822               throws Exception JavaDoc;
823
824       protected void onShutdown()
825       {
826          synchronized (m_subscriptionLock)
827          {
828             Iterator JavaDoc subscriptions = m_subscriptions.keySet().iterator();
829             while (subscriptions.hasNext())
830             {
831                Subscription subscription = (Subscription)subscriptions.next();
832                ListenerSession session = (ListenerSession)
833                        m_subscriptions.get(subscription);
834                if (session != null)
835                {
836                   session.cleanup();
837                }
838
839             }
840             m_subscriptions.clear();
841          }
842       }
843
844       /**
845        * @todo add in security exception propagation
846        * @param subscription
847        */

848       void subscribe(Subscription subscription)
849               throws Exception JavaDoc
850       {
851          long timeoutTime = System.currentTimeMillis() + m_timeoutTime;
852          synchronized (m_subscriptionLock)
853          {
854             if (m_subscriptions.containsKey(subscription))
855                return;
856             while (true)
857             {
858                if (System.currentTimeMillis() > timeoutTime)
859                {
860                   throw new InvokeTimeoutException("Cannot subscribe listener");
861                }
862
863                try
864                {
865                   ListenerSession session = createListenerSession(m_connection,
866                           subscription);
867                   m_subscriptions.put(subscription, session);
868                   break;
869                }
870                catch (JMSException JavaDoc jmse)
871                {
872                   if (!m_adapter.isRecoverable(jmse, JMSVendorAdapter.SUBSCRIBE_ACTION))
873                   {
874                      throw jmse;
875                   }
876
877                   try
878                   {
879                      m_subscriptionLock.wait(m_interactRetryInterval);
880                   }
881                   catch (InterruptedException JavaDoc ignore)
882                   {
883                   }
884                   //give reconnect a chance
885
Thread.yield();
886                   continue;
887                }
888                catch (NullPointerException JavaDoc jmse)
889                {
890                   //we ARE reconnecting
891
try
892                   {
893                      m_subscriptionLock.wait(m_interactRetryInterval);
894                   }
895                   catch (InterruptedException JavaDoc ignore)
896                   {
897                   }
898                   //give reconnect a chance
899
Thread.yield();
900                   continue;
901                }
902             }
903          }
904       }
905
906       void unsubscribe(Subscription subscription)
907       {
908          long timeoutTime = System.currentTimeMillis() + m_timeoutTime;
909          synchronized (m_subscriptionLock)
910          {
911             if (!m_subscriptions.containsKey(subscription))
912                return;
913             while (true)
914             {
915                if (System.currentTimeMillis() > timeoutTime)
916                {
917                   throw new InvokeTimeoutException("Cannot unsubscribe listener");
918                }
919
920                //give reconnect a chance
921
Thread.yield();
922                try
923                {
924                   ListenerSession session = (ListenerSession)
925                           m_subscriptions.get(subscription);
926                   session.cleanup();
927                   m_subscriptions.remove(subscription);
928                   break;
929                }
930                catch (NullPointerException JavaDoc jmse)
931                {
932                   //we are reconnecting
933
try
934                   {
935                      m_subscriptionLock.wait(m_interactRetryInterval);
936                   }
937                   catch (InterruptedException JavaDoc ignore)
938                   {
939                   }
940                   continue;
941                }
942             }
943          }
944       }
945
946       protected void onConnect()
947               throws Exception JavaDoc
948       {
949          synchronized (m_subscriptionLock)
950          {
951             Iterator JavaDoc subscriptions = m_subscriptions.keySet().iterator();
952             while (subscriptions.hasNext())
953             {
954                Subscription subscription = (Subscription)subscriptions.next();
955
956                if (m_subscriptions.get(subscription) == null)
957                {
958                   m_subscriptions.put(subscription,
959                           createListenerSession(m_connection, subscription));
960                }
961             }
962             m_subscriptionLock.notifyAll();
963          }
964       }
965
966       protected void onException()
967       {
968          synchronized (m_subscriptionLock)
969          {
970             Iterator JavaDoc subscriptions = m_subscriptions.keySet().iterator();
971             while (subscriptions.hasNext())
972             {
973                Subscription subscription = (Subscription)subscriptions.next();
974                m_subscriptions.put(subscription, null);
975             }
976          }
977       }
978
979
980       protected class ListenerSession extends ConnectorSession
981       {
982          protected MessageConsumer JavaDoc m_consumer;
983          protected Subscription m_subscription;
984
985          ListenerSession(Session JavaDoc session,
986                          MessageConsumer JavaDoc consumer,
987                          Subscription subscription)
988                  throws Exception JavaDoc
989          {
990             super(session);
991             m_subscription = subscription;
992             m_consumer = consumer;
993             Destination JavaDoc destination = subscription.m_endpoint.getDestination(m_session);
994             m_consumer.setMessageListener(subscription.m_listener);
995          }
996
997          void cleanup()
998          {
999             try
1000            {
1001               m_consumer.close();
1002            }
1003            catch (Exception JavaDoc ignore)
1004            {
1005            }
1006            try
1007            {
1008               m_session.close();
1009            }
1010            catch (Exception JavaDoc ignore)
1011            {
1012            }
1013         }
1014
1015      }
1016   }
1017
1018
1019   private abstract class ConnectorSession
1020   {
1021      Session JavaDoc m_session;
1022
1023      ConnectorSession(Session JavaDoc session)
1024              throws JMSException JavaDoc
1025      {
1026         m_session = session;
1027      }
1028
1029   }
1030
1031}
Popular Tags