KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > axis > transport > jms > JMSConnector


1 /*
2  * Copyright 2001, 2002,2004 The Apache Software Foundation.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */

16
17 package org.apache.axis.transport.jms;
18
19 import org.apache.axis.components.jms.JMSVendorAdapter;
20
21 import javax.jms.BytesMessage JavaDoc;
22 import javax.jms.ConnectionFactory JavaDoc;
23 import javax.jms.Destination JavaDoc;
24 import javax.jms.ExceptionListener JavaDoc;
25 import javax.jms.JMSException JavaDoc;
26 import javax.jms.Message JavaDoc;
27 import javax.jms.MessageConsumer JavaDoc;
28 import javax.jms.MessageProducer JavaDoc;
29 import javax.jms.Session JavaDoc;
30 import java.io.ByteArrayOutputStream JavaDoc;
31 import java.util.HashMap JavaDoc;
32 import java.util.Iterator JavaDoc;
33 import java.util.LinkedList JavaDoc;
34 import java.util.Map JavaDoc;
35
36 // No vendor dependent exception classes
37
//import progress.message.client.EUserAlreadyConnected;
38
//import progress.message.jclient.ErrorCodes;
39

40 /**
41  * JMSConnector is an abstract class that encapsulates the work of connecting
42  * to JMS destinations. Its subclasses are TopicConnector and QueueConnector
43  * which further specialize connections to the pub-sub and the ptp domains.
44  * It also implements the capability to retry connections in the event of
45  * failures.
46  *
47  * @author Jaime Meritt (jmeritt@sonicsoftware.com)
48  * @author Richard Chung (rchung@sonicsoftware.com)
49  * @author Dave Chappell (chappell@sonicsoftware.com)
50  * @author Ray Chun (rchun@sonicsoftware.com)
51  */

52 public abstract class JMSConnector
53 {
54     protected int m_numRetries;
55     protected long m_connectRetryInterval;
56     protected long m_interactRetryInterval;
57     protected long m_timeoutTime;
58     protected long m_poolTimeout;
59     protected AsyncConnection m_receiveConnection;
60     protected SyncConnection m_sendConnection;
61     protected int m_numSessions;
62     protected boolean m_allowReceive;
63     protected JMSVendorAdapter m_adapter;
64     protected JMSURLHelper m_jmsurl;
65
66     public JMSConnector(ConnectionFactory JavaDoc connectionFactory,
67                         int numRetries,
68                         int numSessions,
69                         long connectRetryInterval,
70                         long interactRetryInterval,
71                         long timeoutTime,
72                         boolean allowReceive,
73                         String JavaDoc clientID,
74                         String JavaDoc username,
75                         String JavaDoc password,
76                         JMSVendorAdapter adapter,
77                         JMSURLHelper jmsurl)
78         throws JMSException JavaDoc
79     {
80         m_numRetries = numRetries;
81         m_connectRetryInterval = connectRetryInterval;
82         m_interactRetryInterval = interactRetryInterval;
83         m_timeoutTime = timeoutTime;
84         m_poolTimeout = timeoutTime/(long)numRetries;
85         m_numSessions = numSessions;
86         m_allowReceive = allowReceive;
87         m_adapter = adapter;
88         m_jmsurl = jmsurl;
89
90         // try to connect initially so we can fail fast
91
// in the case of irrecoverable errors.
92
// If we fail in a recoverable fashion we will retry
93
javax.jms.Connection JavaDoc sendConnection = createConnectionWithRetry(
94                                                                 connectionFactory,
95                                                                 username,
96                                                                 password);
97         m_sendConnection = createSyncConnection(connectionFactory, sendConnection,
98                                                 m_numSessions, "SendThread",
99                                                 clientID,
100                                                 username,
101                                                 password);
102
103         m_sendConnection.start();
104
105         if(m_allowReceive)
106         {
107             javax.jms.Connection JavaDoc receiveConnection = createConnectionWithRetry(
108                                                             connectionFactory,
109                                                             username,
110                                                             password);
111             m_receiveConnection = createAsyncConnection(connectionFactory,
112                                                         receiveConnection,
113                                                         "ReceiveThread",
114                                                         clientID,
115                                                         username,
116                                                         password);
117             m_receiveConnection.start();
118         }
119     }
120
121     public int getNumRetries()
122     {
123         return m_numRetries;
124     }
125
126     public int numSessions()
127     {
128         return m_numSessions;
129     }
130
131     public ConnectionFactory JavaDoc getConnectionFactory()
132     {
133         // there is always a send connection
134
return getSendConnection().getConnectionFactory();
135     }
136
137     public String JavaDoc getClientID()
138     {
139         return getSendConnection().getClientID();
140     }
141
142     public String JavaDoc getUsername()
143     {
144         return getSendConnection().getUsername();
145     }
146
147     public String JavaDoc getPassword()
148     {
149         return getSendConnection().getPassword();
150     }
151
152     public JMSVendorAdapter getVendorAdapter()
153     {
154         return m_adapter;
155     }
156
157     public JMSURLHelper getJMSURL()
158     {
159         return m_jmsurl;
160     }
161
162     protected javax.jms.Connection JavaDoc createConnectionWithRetry(
163                                             ConnectionFactory JavaDoc connectionFactory,
164                                             String JavaDoc username,
165                                             String JavaDoc password)
166         throws JMSException JavaDoc
167     {
168         javax.jms.Connection JavaDoc connection = null;
169         for(int numTries = 1; connection == null; numTries++)
170         {
171             try
172             {
173                 connection = internalConnect(connectionFactory, username, password);
174             }
175             catch(JMSException JavaDoc jmse)
176             {
177                 if(!m_adapter.isRecoverable(jmse, JMSVendorAdapter.CONNECT_ACTION) || numTries == m_numRetries)
178                     throw jmse;
179                 else
180                     try{Thread.sleep(m_connectRetryInterval);}catch(InterruptedException JavaDoc ie){};
181             }
182         }
183         return connection;
184     }
185
186     public void stop()
187     {
188         JMSConnectorManager.getInstance().removeConnectorFromPool(this);
189
190         m_sendConnection.stopConnection();
191         if(m_allowReceive)
192             m_receiveConnection.stopConnection();
193     }
194
195     public void start()
196     {
197         m_sendConnection.startConnection();
198         if(m_allowReceive)
199             m_receiveConnection.startConnection();
200
201         JMSConnectorManager.getInstance().addConnectorToPool(this);
202     }
203
204     public void shutdown()
205     {
206         m_sendConnection.shutdown();
207         if(m_allowReceive)
208             m_receiveConnection.shutdown();
209     }
210
211     public abstract JMSEndpoint createEndpoint(String JavaDoc destinationName)
212         throws JMSException JavaDoc;
213
214     public abstract JMSEndpoint createEndpoint(Destination JavaDoc destination)
215         throws JMSException JavaDoc;
216
217
218     protected abstract javax.jms.Connection JavaDoc internalConnect(
219                                                 ConnectionFactory JavaDoc connectionFactory,
220                                                 String JavaDoc username,
221                                                 String JavaDoc password)
222         throws JMSException JavaDoc;
223
224     private abstract class Connection extends Thread JavaDoc implements ExceptionListener JavaDoc
225     {
226         private ConnectionFactory JavaDoc m_connectionFactory;
227         protected javax.jms.Connection JavaDoc m_connection;
228
229         protected boolean m_isActive;
230         private boolean m_needsToConnect;
231         private boolean m_startConnection;
232         private String JavaDoc m_clientID;
233         private String JavaDoc m_username;
234         private String JavaDoc m_password;
235
236         private Object JavaDoc m_jmsLock;
237         private Object JavaDoc m_lifecycleLock;
238
239         protected Connection(ConnectionFactory JavaDoc connectionFactory,
240                              javax.jms.Connection JavaDoc connection,
241                              String JavaDoc threadName,
242                              String JavaDoc clientID,
243                              String JavaDoc username,
244                              String JavaDoc password)
245             throws JMSException JavaDoc
246         {
247             super(threadName);
248             m_connectionFactory = connectionFactory;
249
250             m_clientID = clientID;
251             m_username = username;
252             m_password = password;
253
254             m_jmsLock = new Object JavaDoc();
255             m_lifecycleLock = new Object JavaDoc();
256
257             if (connection != null)
258             {
259                 m_needsToConnect = false;
260                 m_connection = connection;
261                 m_connection.setExceptionListener(this);
262                 if(m_clientID != null)
263                     m_connection.setClientID(m_clientID);
264             }
265             else
266             {
267                 m_needsToConnect = true;
268             }
269
270             m_isActive = true;
271         }
272
273         public ConnectionFactory JavaDoc getConnectionFactory()
274         {
275             return m_connectionFactory;
276         }
277
278         public String JavaDoc getClientID()
279         {
280             return m_clientID;
281         }
282         public String JavaDoc getUsername()
283         {
284             return m_username;
285         }
286         public String JavaDoc getPassword()
287         {
288             return m_password;
289         }
290
291         /**
292          * @todo handle non-recoverable errors
293          */

294
295         public void run()
296         {
297             // loop until a connection is made and when a connection is made (re)establish
298
// any subscriptions
299
while (m_isActive)
300             {
301                 if (m_needsToConnect)
302                 {
303                     m_connection = null;
304                     try
305                     {
306                         m_connection = internalConnect(m_connectionFactory,
307                                                        m_username, m_password);
308                         m_connection.setExceptionListener(this);
309                         if(m_clientID != null)
310                             m_connection.setClientID(m_clientID);
311                     }
312                     catch(JMSException JavaDoc e)
313                     {
314                         // simply backoff for a while and then retry
315
try { Thread.sleep(m_connectRetryInterval); } catch(InterruptedException JavaDoc ie) { }
316                         continue;
317                     }
318                 }
319                 else
320                     m_needsToConnect = true; // When we'll get to the "if (needsToConnect)" statement the next time it will be because
321
// we lost the connection
322

323                 // we now have a valid connection so establish some context
324
try
325                 {
326                     internalOnConnect();
327                 }
328                 catch(Exception JavaDoc e)
329                 {
330                     // insert code to handle non recoverable errors
331
// simply retry
332
continue;
333                 }
334
335                 synchronized(m_jmsLock)
336                 {
337                     try { m_jmsLock.wait(); } catch(InterruptedException JavaDoc ie) { } // until notified due to some change in status
338
}
339             }
340
341             // no longer staying connected, so see what we can cleanup
342
internalOnShutdown();
343         }
344
345
346
347         void startConnection()
348         {
349             synchronized(m_lifecycleLock)
350             {
351                 if(m_startConnection)
352                     return;
353                 m_startConnection = true;
354                 try {m_connection.start();}catch(Throwable JavaDoc e) { } // ignore
355
}
356         }
357
358         void stopConnection()
359         {
360             synchronized(m_lifecycleLock)
361             {
362                 if(!m_startConnection)
363                     return;
364                 m_startConnection = false;
365                 try {m_connection.stop();}catch(Throwable JavaDoc e) { } // ignore
366
}
367         }
368
369         void shutdown()
370         {
371             m_isActive = false;
372             synchronized(m_jmsLock)
373             {
374                 m_jmsLock.notifyAll();
375             }
376         }
377
378
379
380         public void onException(JMSException JavaDoc exception)
381         {
382             if(m_adapter.isRecoverable(exception,
383                                        JMSVendorAdapter.ON_EXCEPTION_ACTION))
384                 return;
385             onException();
386             synchronized(m_jmsLock)
387             {
388                 m_jmsLock.notifyAll();
389             }
390         }
391
392         private final void internalOnConnect()
393             throws Exception JavaDoc
394         {
395             onConnect();
396             synchronized(m_lifecycleLock)
397             {
398                 if(m_startConnection)
399                 {
400                     try {m_connection.start();}catch(Throwable JavaDoc e) { } // ignore
401
}
402             }
403         }
404
405         private final void internalOnShutdown()
406         {
407             stopConnection();
408             onShutdown();
409             try { m_connection.close(); } catch(Throwable JavaDoc e) { } // ignore
410
}
411
412         protected abstract void onConnect()throws Exception JavaDoc;
413         protected abstract void onShutdown();
414         protected abstract void onException();
415     }
416
417     protected abstract SyncConnection createSyncConnection(ConnectionFactory JavaDoc factory,
418                                                            javax.jms.Connection JavaDoc connection,
419                                                            int numSessions,
420                                                            String JavaDoc threadName,
421                                                            String JavaDoc clientID,
422                                                            String JavaDoc username,
423                                                            String JavaDoc password)
424
425         throws JMSException JavaDoc;
426
427     SyncConnection getSendConnection()
428     {
429         return m_sendConnection;
430     }
431
432     protected abstract class SyncConnection extends Connection
433     {
434         LinkedList JavaDoc m_senders;
435         int m_numSessions;
436         Object JavaDoc m_senderLock;
437
438         SyncConnection(ConnectionFactory JavaDoc connectionFactory,
439                        javax.jms.Connection JavaDoc connection,
440                        int numSessions,
441                        String JavaDoc threadName,
442                        String JavaDoc clientID,
443                        String JavaDoc username,
444                        String JavaDoc password)
445             throws JMSException JavaDoc
446         {
447             super(connectionFactory, connection, threadName,
448                   clientID, username, password);
449             m_senders = new LinkedList JavaDoc();
450             m_numSessions = numSessions;
451             m_senderLock = new Object JavaDoc();
452         }
453
454         protected abstract SendSession createSendSession(javax.jms.Connection JavaDoc connection)
455             throws JMSException JavaDoc;
456
457         protected void onConnect()
458           throws JMSException JavaDoc
459         {
460             synchronized(m_senderLock)
461             {
462                 for(int i = 0; i < m_numSessions; i++)
463                 {
464                     m_senders.add(createSendSession(m_connection));
465                 }
466                 m_senderLock.notifyAll();
467             }
468         }
469
470         byte[] call(JMSEndpoint endpoint, byte[] message, long timeout, HashMap JavaDoc properties)
471             throws Exception JavaDoc
472         {
473             long timeoutTime = System.currentTimeMillis() + timeout;
474             while(true)
475             {
476                 if(System.currentTimeMillis() > timeoutTime)
477                 {
478                     throw new InvokeTimeoutException("Unable to complete call in time allotted");
479                 }
480
481                 SendSession sendSession = null;
482                 try
483                 {
484                     sendSession = getSessionFromPool(m_poolTimeout);
485                     byte[] response = sendSession.call(endpoint,
486                                                         message,
487                                                         timeoutTime - System.currentTimeMillis(),
488                                                         properties);
489                     returnSessionToPool(sendSession);
490                     if(response == null)
491                     {
492                         throw new InvokeTimeoutException("Unable to complete call in time allotted");
493                     }
494                     return response;
495                 }
496                 catch(JMSException JavaDoc jmse)
497                 {
498                     if(!m_adapter.isRecoverable(jmse, JMSVendorAdapter.SEND_ACTION))
499                     {
500                         //this we cannot recover from
501
//but it does not invalidate the session
502
returnSessionToPool(sendSession);
503                         throw jmse;
504                     }
505
506                     //for now we will assume this is a reconnect related issue
507
//and let the sender be collected
508
//give the reconnect thread a chance to fill the pool
509
Thread.yield();
510                     continue;
511                 }
512                 catch(NullPointerException JavaDoc npe)
513                 {
514                     Thread.yield();
515                     continue;
516                 }
517             }
518         }
519
520         /** @todo add in handling for security exceptions
521          * @todo add support for timeouts */

522         void send(JMSEndpoint endpoint, byte[] message, HashMap JavaDoc properties)
523             throws Exception JavaDoc
524         {
525             long timeoutTime = System.currentTimeMillis() + m_timeoutTime;
526             while(true)
527             {
528                 if(System.currentTimeMillis() > timeoutTime)
529                 {
530                     throw new InvokeTimeoutException("Cannot complete send in time allotted");
531                 }
532
533                 SendSession sendSession = null;
534                 try
535                 {
536                     sendSession = getSessionFromPool(m_poolTimeout);
537                     sendSession.send(endpoint, message, properties);
538                     returnSessionToPool(sendSession);
539                 }
540                 catch(JMSException JavaDoc jmse)
541                 {
542                     if(!m_adapter.isRecoverable(jmse, JMSVendorAdapter.SEND_ACTION))
543                     {
544                         //this we cannot recover from
545
//but it does not invalidate the session
546
returnSessionToPool(sendSession);
547                         throw jmse;
548                     }
549                     //for now we will assume this is a reconnect related issue
550
//and let the sender be collected
551
//give the reconnect thread a chance to fill the pool
552
Thread.yield();
553                     continue;
554                 }
555                 catch(NullPointerException JavaDoc npe)
556                 {
557                     //give the reconnect thread a chance to fill the pool
558
Thread.yield();
559                     continue;
560                 }
561                 break;
562             }
563         }
564
565         protected void onException()
566         {
567             synchronized(m_senderLock)
568             {
569                 m_senders.clear();
570             }
571         }
572
573         protected void onShutdown()
574         {
575             synchronized(m_senderLock)
576             {
577                 Iterator JavaDoc senders = m_senders.iterator();
578                 while(senders.hasNext())
579                 {
580                     SendSession session = (SendSession)senders.next();
581                     session.cleanup();
582                 }
583                 m_senders.clear();
584             }
585         }
586
587         private SendSession getSessionFromPool(long timeout)
588         {
589             synchronized(m_senderLock)
590             {
591                 while(m_senders.size() == 0)
592                 {
593                     try
594                     {
595                         m_senderLock.wait(timeout);
596                         if(m_senders.size() == 0)
597                         {
598                             return null;
599                         }
600                     }
601                     catch(InterruptedException JavaDoc ignore)
602                     {
603                         return null;
604                     }
605                 }
606                 return (SendSession)m_senders.removeFirst();
607             }
608         }
609
610         private void returnSessionToPool(SendSession sendSession)
611         {
612             synchronized(m_senderLock)
613             {
614                 m_senders.addLast(sendSession);
615                 m_senderLock.notifyAll();
616             }
617         }
618
619         protected abstract class SendSession extends ConnectorSession
620         {
621             MessageProducer JavaDoc m_producer;
622
623             SendSession(Session JavaDoc session,
624                         MessageProducer JavaDoc producer)
625               throws JMSException JavaDoc
626             {
627                 super(session);
628                 m_producer = producer;
629             }
630
631             protected abstract Destination JavaDoc createTemporaryDestination()
632                 throws JMSException JavaDoc;
633
634             protected abstract void deleteTemporaryDestination(Destination JavaDoc destination)
635                 throws JMSException JavaDoc;
636
637             protected abstract MessageConsumer JavaDoc createConsumer(Destination JavaDoc destination)
638                 throws JMSException JavaDoc;
639
640             protected abstract void send(Destination JavaDoc destination,
641                                          Message JavaDoc message,
642                                          int deliveryMode,
643                                          int priority,
644                                          long timeToLive)
645                 throws JMSException JavaDoc;
646
647             void send(JMSEndpoint endpoint, byte[] message, HashMap JavaDoc properties)
648                 throws Exception JavaDoc
649             {
650                 BytesMessage JavaDoc jmsMessage = m_session.createBytesMessage();
651                 jmsMessage.writeBytes(message);
652                 int deliveryMode = extractDeliveryMode(properties);
653                 int priority = extractPriority(properties);
654                 long timeToLive = extractTimeToLive(properties);
655
656                 if(properties != null && !properties.isEmpty())
657                     setProperties(properties, jmsMessage);
658
659                 send(endpoint.getDestination(m_session), jmsMessage, deliveryMode,
660                      priority, timeToLive);
661             }
662
663
664             void cleanup()
665             {
666                 try{m_producer.close();}catch(Throwable JavaDoc t){}
667                 try{m_session.close();}catch(Throwable JavaDoc t){}
668             }
669
670             byte[] call(JMSEndpoint endpoint, byte[] message, long timeout,
671                         HashMap JavaDoc properties)
672                 throws Exception JavaDoc
673             {
674                 Destination JavaDoc reply = createTemporaryDestination();
675                 MessageConsumer JavaDoc subscriber = createConsumer(reply);
676                 BytesMessage JavaDoc jmsMessage = m_session.createBytesMessage();
677                 jmsMessage.writeBytes(message);
678                 jmsMessage.setJMSReplyTo(reply);
679
680                 int deliveryMode = extractDeliveryMode(properties);
681                 int priority = extractPriority(properties);
682                 long timeToLive = extractTimeToLive(properties);
683
684                 if(properties != null && !properties.isEmpty())
685                     setProperties(properties, jmsMessage);
686
687                 send(endpoint.getDestination(m_session), jmsMessage, deliveryMode,
688                      priority, timeToLive);
689                 BytesMessage JavaDoc response = null;
690                 try {
691                     response = (BytesMessage JavaDoc)subscriber.receive(timeout);
692                 } catch (ClassCastException JavaDoc cce) {
693                     throw new InvokeException
694                             ("Error: unexpected message type received - expected BytesMessage");
695                 }
696                 byte[] respBytes = null;
697                 if(response != null)
698                 {
699                     byte[] buffer = new byte[8 * 1024];
700                     ByteArrayOutputStream JavaDoc out = new ByteArrayOutputStream JavaDoc();
701                     for(int bytesRead = response.readBytes(buffer);
702                         bytesRead != -1; bytesRead = response.readBytes(buffer))
703                     {
704                         out.write(buffer, 0, bytesRead);
705                     }
706                     respBytes = out.toByteArray();
707                 }
708                 subscriber.close();
709                 deleteTemporaryDestination(reply);
710                 return respBytes;
711             }
712
713             private int extractPriority(HashMap JavaDoc properties)
714             {
715                 return MapUtils.removeIntProperty(properties, JMSConstants.PRIORITY,
716                                          JMSConstants.DEFAULT_PRIORITY);
717             }
718
719             private int extractDeliveryMode(HashMap JavaDoc properties)
720             {
721                 return MapUtils.removeIntProperty(properties, JMSConstants.DELIVERY_MODE,
722                                          JMSConstants.DEFAULT_DELIVERY_MODE);
723             }
724
725             private long extractTimeToLive(HashMap JavaDoc properties)
726             {
727                 return MapUtils.removeLongProperty(properties, JMSConstants.TIME_TO_LIVE,
728                                           JMSConstants.DEFAULT_TIME_TO_LIVE);
729             }
730
731             private void setProperties(HashMap JavaDoc properties, Message JavaDoc message)
732                 throws JMSException JavaDoc
733             {
734                 Iterator JavaDoc propertyIter = properties.entrySet().iterator();
735                 while(propertyIter.hasNext())
736                 {
737                     Map.Entry JavaDoc property = (Map.Entry JavaDoc)propertyIter.next();
738                     setProperty((String JavaDoc)property.getKey(), property.getValue(),
739                                 message);
740                 }
741             }
742
743             private void setProperty(String JavaDoc property, Object JavaDoc value, Message JavaDoc message)
744                 throws JMSException JavaDoc
745             {
746                 if(property == null)
747                     return;
748                 if(property.equals(JMSConstants.JMS_CORRELATION_ID))
749                     message.setJMSCorrelationID((String JavaDoc)value);
750                 else if(property.equals(JMSConstants.JMS_CORRELATION_ID_AS_BYTES))
751                     message.setJMSCorrelationIDAsBytes((byte[])value);
752                 else if(property.equals(JMSConstants.JMS_TYPE))
753                     message.setJMSType((String JavaDoc)value);
754                 else
755                     message.setObjectProperty(property, value);
756             }
757         }
758     }
759
760     AsyncConnection getReceiveConnection()
761     {
762         return m_receiveConnection;
763     }
764
765     protected abstract AsyncConnection createAsyncConnection(ConnectionFactory JavaDoc factory,
766                                                              javax.jms.Connection JavaDoc connection,
767                                                              String JavaDoc threadName,
768                                                              String JavaDoc clientID,
769                                                              String JavaDoc username,
770                                                              String JavaDoc password)
771
772         throws JMSException JavaDoc;
773
774     protected abstract class AsyncConnection extends Connection
775     {
776         HashMap JavaDoc m_subscriptions;
777         Object JavaDoc m_subscriptionLock;
778
779         protected AsyncConnection(ConnectionFactory JavaDoc connectionFactory,
780                                   javax.jms.Connection JavaDoc connection,
781                                   String JavaDoc threadName,
782                                   String JavaDoc clientID,
783                                   String JavaDoc username,
784                                   String JavaDoc password)
785             throws JMSException JavaDoc
786         {
787             super(connectionFactory, connection, threadName,
788                   clientID, username, password);
789             m_subscriptions = new HashMap JavaDoc();
790             m_subscriptionLock = new Object JavaDoc();
791         }
792
793         protected abstract ListenerSession createListenerSession(
794                                                 javax.jms.Connection JavaDoc connection,
795                                                 Subscription subscription)
796             throws Exception JavaDoc;
797
798         protected void onShutdown()
799         {
800             synchronized(m_subscriptionLock)
801             {
802                 Iterator JavaDoc subscriptions = m_subscriptions.keySet().iterator();
803                 while(subscriptions.hasNext())
804                 {
805                     Subscription subscription = (Subscription)subscriptions.next();
806                     ListenerSession session = (ListenerSession)
807                                                 m_subscriptions.get(subscription);
808                     if(session != null)
809                     {
810                         session.cleanup();
811                     }
812
813                 }
814                 m_subscriptions.clear();
815             }
816         }
817
818         /**
819          * @todo add in security exception propagation
820          * @param subscription
821          */

822         void subscribe(Subscription subscription)
823             throws Exception JavaDoc
824         {
825             long timeoutTime = System.currentTimeMillis() + m_timeoutTime;
826             synchronized(m_subscriptionLock)
827             {
828                 if(m_subscriptions.containsKey(subscription))
829                     return;
830                 while(true)
831                 {
832                     if(System.currentTimeMillis() > timeoutTime)
833                     {
834                         throw new InvokeTimeoutException("Cannot subscribe listener");
835                     }
836
837                     try
838                     {
839                         ListenerSession session = createListenerSession(m_connection,
840                                                                         subscription);
841                         m_subscriptions.put(subscription, session);
842                         break;
843                     }
844                     catch(JMSException JavaDoc jmse)
845                     {
846                         if(!m_adapter.isRecoverable(jmse, JMSVendorAdapter.SUBSCRIBE_ACTION))
847                         {
848                             throw jmse;
849                         }
850
851                         try{m_subscriptionLock.wait(m_interactRetryInterval);}
852                         catch(InterruptedException JavaDoc ignore){}
853                         //give reconnect a chance
854
Thread.yield();
855                         continue;
856                     }
857                     catch(NullPointerException JavaDoc jmse)
858                     {
859                         //we ARE reconnecting
860
try{m_subscriptionLock.wait(m_interactRetryInterval);}
861                         catch(InterruptedException JavaDoc ignore){}
862                         //give reconnect a chance
863
Thread.yield();
864                         continue;
865                     }
866                 }
867             }
868         }
869
870         void unsubscribe(Subscription subscription)
871         {
872             long timeoutTime = System.currentTimeMillis() + m_timeoutTime;
873             synchronized(m_subscriptionLock)
874             {
875                 if(!m_subscriptions.containsKey(subscription))
876                     return;
877                 while(true)
878                 {
879                     if(System.currentTimeMillis() > timeoutTime)
880                     {
881                         throw new InvokeTimeoutException("Cannot unsubscribe listener");
882                     }
883
884                     //give reconnect a chance
885
Thread.yield();
886                     try
887                     {
888                         ListenerSession session = (ListenerSession)
889                                                 m_subscriptions.get(subscription);
890                         session.cleanup();
891                         m_subscriptions.remove(subscription);
892                         break;
893                     }
894                     catch(NullPointerException JavaDoc jmse)
895                     {
896                         //we are reconnecting
897
try{m_subscriptionLock.wait(m_interactRetryInterval);}
898                         catch(InterruptedException JavaDoc ignore){}
899                         continue;
900                     }
901                 }
902             }
903         }
904
905         protected void onConnect()
906             throws Exception JavaDoc
907         {
908             synchronized(m_subscriptionLock)
909             {
910                 Iterator JavaDoc subscriptions = m_subscriptions.keySet().iterator();
911                 while(subscriptions.hasNext())
912                 {
913                     Subscription subscription = (Subscription)subscriptions.next();
914
915                     if(m_subscriptions.get(subscription) == null)
916                     {
917                         m_subscriptions.put(subscription,
918                             createListenerSession(m_connection, subscription));
919                     }
920                 }
921                 m_subscriptionLock.notifyAll();
922             }
923         }
924
925         protected void onException()
926         {
927             synchronized(m_subscriptionLock)
928             {
929                 Iterator JavaDoc subscriptions = m_subscriptions.keySet().iterator();
930                 while(subscriptions.hasNext())
931                 {
932                     Subscription subscription = (Subscription)subscriptions.next();
933                     m_subscriptions.put(subscription, null);
934                 }
935             }
936         }
937
938
939
940         protected class ListenerSession extends ConnectorSession
941         {
942             protected MessageConsumer JavaDoc m_consumer;
943             protected Subscription m_subscription;
944
945             ListenerSession(Session JavaDoc session,
946                             MessageConsumer JavaDoc consumer,
947                             Subscription subscription)
948                 throws Exception JavaDoc
949             {
950                 super(session);
951                 m_subscription = subscription;
952                 m_consumer = consumer;
953                 Destination JavaDoc destination = subscription.m_endpoint.getDestination(m_session);
954                 m_consumer.setMessageListener(subscription.m_listener);
955             }
956
957             void cleanup()
958             {
959                 try{m_consumer.close();}catch(Exception JavaDoc ignore){}
960                 try{m_session.close();}catch(Exception JavaDoc ignore){}
961             }
962
963         }
964     }
965
966     private abstract class ConnectorSession
967     {
968         Session JavaDoc m_session;
969
970         ConnectorSession(Session JavaDoc session)
971           throws JMSException JavaDoc
972         {
973             m_session = session;
974         }
975     }
976 }
Popular Tags