KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > ubermq > jms > client > impl > Connection


1 package com.ubermq.jms.client.impl;
2
3 import com.ubermq.*;
4 import com.ubermq.jms.client.*;
5 import com.ubermq.jms.common.datagram.*;
6 import com.ubermq.kernel.*;
7 import com.ubermq.kernel.event.*;
8 import com.ubermq.kernel.overflow.*;
9 import com.ubermq.util.*;
10
11 import java.io.*;
12 import java.util.*;
13 import javax.jms.*;
14
15 /**
16  * The base Connection class encapsulates information about UberMQ and
17  * contains generally useful resources for all connections.<P>
18  *
19  * This implementation of the Connection class also can serve
20  * as either a topic or queue connection. There is no meaningful distinction
21  * functionally between the two.<P>
22  */

23 public class Connection
24     implements javax.jms.Connection JavaDoc,
25     javax.jms.QueueConnection JavaDoc,
26     javax.jms.TopicConnection JavaDoc,
27     IConnectionInfo,
28     ConnectionEventListener
29 {
30     private static final org.apache.log4j.Logger log = org.apache.log4j.Logger.getLogger(Connection.class);
31     
32     // version information
33
public static final int UBERMQ_MAJOR_VERSION = 2;
34     public static final int UBERMQ_MINOR_VERSION = 7;
35     public static final int UBERMQ_REVISION = 0;
36     public static final String JavaDoc UBERMQ_PROVIDER_NAME = "UberMQ";
37     public static final String JavaDoc UBERMQ_PROVIDER_VERSION = UBERMQ_MAJOR_VERSION + "." +
38         UBERMQ_MINOR_VERSION +
39         ((UBERMQ_REVISION > 0) ? ("." + UBERMQ_REVISION) : "");
40     public static final int JMS_MAJOR_VERSION = 1;
41     public static final int JMS_MINOR_VERSION = 1;
42     public static final String JavaDoc JMS_VERSION = "1.1";
43
44     /**
45      * Timeout between reconnection attempts when connections fail.
46      */

47     public static final long DEFAULT_TIMEOUT = 30000L;
48
49     /**
50      * The client session used for this connection.
51      */

52     final IClientSession theClient;
53
54     /**
55      * The connection info used to read and write data.
56      */

57     IConnectionInfo theConn;
58
59     /**
60      * The client processor for client-side datagram and state management.
61      */

62     final IClientProcessor clientProc;
63
64     /**
65      * The delivery manager to manage message delivery.
66      */

67     final IDeliveryManager delivery;
68
69     /**
70      * The exception listener which receives indications of failures and
71      * abnormal conditions.
72      */

73     ExceptionListener exceptionListener;
74
75     /**
76      * Registered event handlers.
77      */

78     private List eventHandlers;
79
80     /**
81      * An Identifier for this client.
82      */

83     private String JavaDoc clientId;
84
85     /**
86      * The sessions of this connection.
87      */

88     private List sessions;
89
90     /**
91      * the set of local sender IDs.
92      */

93     private Set localSenders;
94
95     /**
96      * The descriptor used to open the connection.
97      */

98     ConnectionDescriptor connDescriptor;
99
100     /**
101      * Whether the connection is currently active.
102      */

103     private volatile boolean open;
104
105     /**
106      * Whether the connection is started.
107      */

108     private volatile boolean started;
109
110     /**
111      * Datagram factories.
112      */

113     final DatagramFactoryHolder factories;
114
115     /**
116      * Creates a connection.
117      *
118      * @param theClient the client session
119      * @param clientProc the client processor
120      * @param delivery the delivery manager
121      * @param factories the datagram factories
122      * @param conn describes how to connect
123      */

124     public Connection(IClientSession theClient,
125                       IClientProcessor clientProc,
126                       IDeliveryManager delivery,
127                       DatagramFactoryHolder factories,
128                       ConnectionDescriptor conn)
129         throws IOException
130     {
131         this.theClient = theClient;
132         this.clientProc = clientProc;
133         this.delivery = delivery;
134         this.connDescriptor = conn;
135         this.factories = factories;
136         this.eventHandlers = new LinkedList();
137         this.sessions = new ArrayList();
138         this.localSenders = new HashSet();
139         this.clientId = null;
140
141         this.theConn = theClient.connect(this, connDescriptor, clientProc);
142         theConn.addEventListener(this);
143
144         theClient.started(theConn);
145
146         open = true;
147         started = false;
148     }
149
150     public void close()
151     {
152         if (open) {
153             open = false;
154             started = false;
155
156             // close the underlying
157
theConn.close();
158
159             // close event
160
sendEvent(ConnectionEvent.CONNECTION_CLOSED);
161         } else {
162             // per JMS Spec, we ignore this.
163
// 4.3.5
164
}
165     }
166
167     /**
168      * Attempt to reconnect to the original destination.
169      * @throws IOException if the connection attempt fails.
170      */

171     public void reconnect()
172         throws IOException
173     {
174         reconnect(connDescriptor);
175     }
176
177     /**
178      * Attempts to reconnect to a different target than originally specified.
179      * @throws IOException if the reconnect failed.
180      */

181     public synchronized void reconnect(ConnectionDescriptor conn)
182         throws IOException
183     {
184         theConn = theClient.connect(this, conn, clientProc);
185         theConn.addEventListener(this);
186
187         open = true;
188
189         // add to I/O thread
190
theClient.started(theConn);
191
192         // notify the client proc, so that subscriptions
193
// can be restored.
194
clientProc.reconnected();
195         start();
196
197         // send the reconnected event to listeners.
198
sendEvent(ConnectionEvent.CONNECTION_RECONNECTED);
199     }
200
201     public void setClientID(String JavaDoc id)
202         throws JMSException
203     {
204         if (clientId == null &&
205             !started)
206         {
207             clientId = id;
208         }
209         else
210             throw new javax.jms.IllegalStateException JavaDoc("Cannot set Client ID at this time.");
211     }
212
213     public String JavaDoc getClientID()
214     {
215         return clientId;
216     }
217
218     void addSession(Session s)
219     {
220         sessions.add(s);
221     }
222
223     void removeSession(Session s)
224     {
225         sessions.remove(s);
226     }
227
228     public void stop()
229     {
230         if (started)
231         {
232             started = false;
233
234             // propagate the pause message to our sessions.
235
Iterator iter = sessions.iterator();
236             while (iter.hasNext())
237             {
238                 Session ts = (Session)iter.next();
239                 ts.pause();
240             }
241
242             // stop the connection
243
theConn.stop();
244         }
245     }
246
247     public void start()
248     {
249         if (!started)
250         {
251             theConn.start();
252
253             Iterator iter = sessions.iterator();
254             while (iter.hasNext())
255             {
256                 Session ts = (Session)iter.next();
257                 ts.resume();
258             }
259
260             started = true;
261         }
262     }
263
264     public boolean isOpen() {return open;}
265
266     public ConnectionMetaData getMetaData()
267     {
268         return new ConnectionMetaData() {
269             public String JavaDoc getJMSProviderName() throws JMSException
270             {
271                 return UBERMQ_PROVIDER_NAME;
272             }
273
274             public String JavaDoc getProviderVersion() throws JMSException
275             {
276                 return UBERMQ_PROVIDER_VERSION;
277             }
278
279             public String JavaDoc getJMSVersion() throws JMSException
280             {
281                 return JMS_VERSION;
282             }
283
284             public java.util.Enumeration JavaDoc getJMSXPropertyNames() throws JMSException
285             {
286                 return new java.util.Vector JavaDoc().elements();
287             }
288
289             public int getProviderMinorVersion() throws JMSException
290             {
291                 return UBERMQ_MINOR_VERSION;
292             }
293
294             public int getJMSMajorVersion() throws JMSException
295             {
296                 return JMS_MAJOR_VERSION;
297             }
298
299             public int getProviderMajorVersion() throws JMSException
300             {
301                 return UBERMQ_MAJOR_VERSION;
302             }
303
304             public int getJMSMinorVersion() throws JMSException
305             {
306                 return JMS_MINOR_VERSION;
307             }
308         };
309     }
310
311     public void setExceptionListener(ExceptionListener el)
312     {
313         exceptionListener = el;
314     }
315
316     public ExceptionListener getExceptionListener() {return exceptionListener;}
317
318     public void addEventListener(ConnectionEventListener l)
319     {
320         eventHandlers.add(l);
321     }
322
323     public void removeEventListener(ConnectionEventListener l)
324     {
325         eventHandlers.remove(l);
326     }
327
328     /**
329      * Sends an event to all the registered event listeners.
330      *
331      * @param event the event object
332      */

333     void sendEvent(ConnectionEvent event)
334     {
335         log.debug("sending connection event " + event);
336
337         Iterator iter = eventHandlers.iterator();
338         while (iter.hasNext())
339         {
340             ConnectionEventListener l = (ConnectionEventListener)iter.next();
341             try {
342                 l.connectionEvent(event);
343             } catch(RuntimeException JavaDoc x) {
344                 // listeners should not throw a runtime exception.
345
// move on.
346
}
347         }
348     }
349
350     /**
351      * Sends an event, using the code specified.
352      * @param eventCode an event code
353      */

354     void sendEvent(int eventCode)
355     {
356         sendEvent(new ConnectionEvent(this, eventCode));
357     }
358
359     /**
360      * Receives connection events from the underlying IConnectionInfo
361      * object. The JMS connection uses failure notifications to
362      * attempt reconnection. <p>
363      *
364      * Only certain events are propagated to listeners on this
365      * connection. Close and reconnect events from the underlying
366      * connection are not forwarded because the JMS connection has
367      * an independent open/closed state. All failure events are propagated.
368      */

369     public void connectionEvent(ConnectionEvent e)
370     {
371         // we route the abnormal close message to the exception listener
372
// to support JMS conventions
373
if (e.getEventCode() == ConnectionEvent.CONNECTION_IO_EXCEPTION &&
374             exceptionListener != null)
375         {
376             exceptionListener.onException(new JMSIOException(""));
377         }
378
379         // ok, if we got an IO or protocol exception,
380
// propagate the event and reconnect
381
if (e.isFailure())
382         {
383             open = false;
384             sendEvent(e);
385
386             new Thread JavaDoc(new Runnable JavaDoc() {
387                         public void run() {
388                             while(!Thread.interrupted())
389                             {
390                                 try
391                                 {
392                                     reconnect();
393                                     break;
394                                 }
395                                 catch (IOException iox)
396                                 {
397                                     // didn't work... try again,
398
// and again, ad infinitum.
399
log.error("",iox);
400                                     try
401                                     {
402                                         Thread.sleep(DEFAULT_TIMEOUT);
403                                     }
404                                     catch (InterruptedException JavaDoc e) {}
405                                 }
406                             }
407                         }
408                     },
409                        "Connection Reconnector").start();
410         }
411     }
412
413     // additional IConnectionInfo methods.
414
public void output(IDatagram d, IOverflowHandler h)
415         throws IOException
416     {
417         if (!isOpen())
418             throw new java.lang.IllegalStateException JavaDoc();
419
420         synchronized(this) {
421             theConn.output(d, h);
422         }
423     }
424
425     /**
426      * Sends a control sequence via the client processor.
427      */

428     public boolean controlSequence(IControlDatagram d, IOverflowHandler h)
429         throws IOException
430     {
431         return clientProc.controlSequence(d, h);
432     }
433
434     /**
435      * Returns the client processor for this connection. Subclasses
436      * can directly modify the client processor behavior this way,
437      * if appropriate.
438      */

439     public IClientProcessor getClientProcessor()
440     {
441         return clientProc;
442     }
443
444     public String JavaDoc getId() {return theConn.getId();}
445
446     ///// IMPLEMENTATION of queue and topic APIs.
447

448     public ConnectionConsumer createConnectionConsumer(Destination p0, String JavaDoc p1, ServerSessionPool p2, int p3) throws JMSException
449     {
450         throw new JMSUnsupportedOperationException();
451     }
452
453     public javax.jms.Session JavaDoc createSession(boolean transacted, int ackMode) throws JMSException
454     {
455         return new Session(this, factories.messageFactory(), ackMode);
456     }
457
458     public javax.jms.QueueSession JavaDoc createQueueSession(boolean transacted, int ackMode) throws javax.jms.JMSException JavaDoc
459     {
460         if (transacted)
461             throw new UnsupportedOperationException JavaDoc();
462         if (!open)
463             throw new javax.jms.IllegalStateException JavaDoc("not open");
464
465         QueueSession qs = new QueueSession(this, this.factories.messageFactory(), ackMode);
466         addSession(qs);
467         return qs;
468     }
469
470     public ConnectionConsumer createConnectionConsumer(javax.jms.Queue JavaDoc p0,
471                                                        String JavaDoc p1,
472                                                        ServerSessionPool p2,
473                                                        int p3)
474         throws JMSException
475     {
476         throw new JMSUnsupportedOperationException();
477     }
478
479
480     public ConnectionConsumer createConnectionConsumer(Topic topic,
481                                                        java.lang.String JavaDoc messageSelector,
482                                                        ServerSessionPool sessionPool,
483                                                        int maxMessages)
484         throws JMSException
485     {
486         throw new JMSUnsupportedOperationException();
487     }
488
489     public ConnectionConsumer createDurableConnectionConsumer(Topic topic,
490                                                               java.lang.String JavaDoc subscriptionName,
491                                                               java.lang.String JavaDoc messageSelector,
492                                                               ServerSessionPool sessionPool,
493                                                               int maxMessages)
494         throws JMSException
495     {
496         throw new JMSUnsupportedOperationException();
497     }
498
499     public javax.jms.TopicSession JavaDoc createTopicSession(boolean transacted,
500                                                      int acknowledgeMode)
501         throws JMSException
502     {
503         // no transacted sessions yet
504
if (transacted)
505             throw new JMSUnsupportedOperationException();
506         if (!open)
507             throw new javax.jms.IllegalStateException JavaDoc("not open");
508
509         TopicSession ts = new TopicSession(this,
510                                            factories.messageFactory(),
511                                            acknowledgeMode);
512         addSession(ts);
513         return ts;
514     }
515
516     // LOCAL SENDERS
517

518     /**
519      * Adds a local sender to our set of local
520      * senders.
521      */

522     void addLocalSender(long senderId)
523     {
524         localSenders.add(new Long JavaDoc(senderId));
525     }
526
527     /**
528      * Removes a local sender from our set of local senders.
529      */

530     void removeLocalSender(long senderId)
531     {
532         localSenders.remove(new Long JavaDoc(senderId));
533     }
534
535     /**
536      * Determines if the specified sender is
537      * local to this connection or not.
538      * @return true if the sender registered itself
539      * as local
540      */

541     boolean isSenderLocal(long senderId)
542     {
543         return localSenders.contains(new Long JavaDoc(senderId));
544     }
545     
546     public String JavaDoc toString()
547     {
548         return connDescriptor.toString();
549     }
550 }
551
Popular Tags