KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jboss > mq > Connection


1 /*
2 * JBoss, Home of Professional Open Source
3 * Copyright 2005, JBoss Inc., and individual contributors as indicated
4 * by the @authors tag. See the copyright.txt in the distribution for a
5 * full listing of individual contributors.
6 *
7 * This is free software; you can redistribute it and/or modify it
8 * under the terms of the GNU Lesser General Public License as
9 * published by the Free Software Foundation; either version 2.1 of
10 * the License, or (at your option) any later version.
11 *
12 * This software is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this software; if not, write to the Free
19 * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
20 * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
21 */

22 package org.jboss.mq;
23
24 import java.io.IOException JavaDoc;
25 import java.io.Serializable JavaDoc;
26 import java.util.Arrays JavaDoc;
27 import java.util.HashMap JavaDoc;
28 import java.util.HashSet JavaDoc;
29 import java.util.LinkedList JavaDoc;
30
31 import javax.jms.ConnectionMetaData JavaDoc;
32 import javax.jms.Destination JavaDoc;
33 import javax.jms.ExceptionListener JavaDoc;
34 import javax.jms.IllegalStateException JavaDoc;
35 import javax.jms.JMSException JavaDoc;
36 import javax.jms.JMSSecurityException JavaDoc;
37 import javax.jms.Queue JavaDoc;
38 import javax.jms.TemporaryQueue JavaDoc;
39 import javax.jms.TemporaryTopic JavaDoc;
40 import javax.transaction.xa.Xid JavaDoc;
41
42 import org.jboss.logging.Logger;
43 import org.jboss.mq.il.ClientILService;
44 import org.jboss.mq.il.ServerIL;
45 import org.jboss.util.UnreachableStatementException;
46
47 import EDU.oswego.cs.dl.util.concurrent.ClockDaemon;
48 import EDU.oswego.cs.dl.util.concurrent.Semaphore;
49 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
50 import EDU.oswego.cs.dl.util.concurrent.ThreadFactory;
51
52 /**
53  * This class implements javax.jms.Connection.
54  *
55  * <p>
56  * It is also the gateway through which all calls to the JMS server are made. To
57  * do its work it needs a ServerIL to invoke (see
58  * {@link org.jboss.mq.il.ServerIL}).
59  * </p>
60  *
61  * <p>
62  * The (new from February 2002) logic for clientID is the following: if logging
63  * in with a user and password, a preconfigured clientID may be automatically
64  * delivered from the server.
65  * </p>
66  *
67  * <p>
68  * If the client wants to set its own clientID it must do so on a connection
69  * which does not have a preconfigured clientID and it must do so before it
70  * calls any other methods on the connection (even getClientID()). It is not
71  * allowable to use a clientID that either looks like a JBossMQ internal one
72  * (beginning with ID) or a clientID that is already in use by someone, or a
73  * clientID that is already preconfigured in the server.
74  * </p>
75  *
76  * <p>
77  * If a preconfigured ID is not obtained, or a valid one is not set, the server will
78  * set an internal ID. This ID can NEVER be used for durable
79  * subscriptions. Whether a preconfigured ID or one manually set can be used
80  * to create a durable subscription is governed by the security configuration
81  * of JBossMQ. In the default setup, only preconfigured clientIDs can be
82  * used. If using a SecurityManager, permission to create a durable
83  * subscription is the result of a combination of the following:
84  * </p>
85  * <p>- The clientID is not one of JBossMQ's internal.
86  * </p>
87  * <p>- The user is authenticated and has a role that has create set to true
88  * in the security config of the destination.
89  * </p>
90  *
91  * <p>
92  * Notes for JBossMQ developers: All calls, except close(), that are possible to
93  * make on a connection must call checkClientID()
94  * </p>
95  *
96  * @author Norbert Lataille (Norbert.Lataille@m4x.org)
97  * @author Hiram Chirino (Cojonudo14@hotmail.com)
98  * @author <a HREF="pra@tim.se">Peter Antman</a>
99  * @author <a HREF="mailto:adrian@jboss.org">Adrian Brock</a>
100  * @version $Revision: 45317 $
101  */

102 public abstract class Connection implements Serializable JavaDoc, javax.jms.Connection JavaDoc
103 {
104    /** The serialVersionUID */
105    private static final long serialVersionUID = 87938199839407082L;
106
107    /** The thread group for client threads; recreated by getThreadGroup() if it has been destroyed */
108    private static ThreadGroup JavaDoc threadGroup = new ThreadGroup JavaDoc("JBossMQ Client Threads");
109
110    /** The log */
111    static Logger log = Logger.getLogger(Connection.class);
112
113    /** Whether trace is enabled (refreshed from the logger each time pingServer() runs) */
114    static boolean trace = log.isTraceEnabled();
115
116    /** Manages the thread that pings the connection to see if it is 'alive' */
117    static protected ClockDaemon clockDaemon = new ClockDaemon();
118
119    /** Maps a destination to a LinkedList of the SpyConsumers subscribed to it (filled in by addConsumer()) */
120    public HashMap JavaDoc destinationSubscriptions = new HashMap JavaDoc();
121
122    /** Maps a subscription id (as an Integer) to its SpyConsumer; mutated while synchronized on this map */
123    public HashMap JavaDoc subscriptions = new HashMap JavaDoc();
124
125    /** Is the connection stopped ? */
126    public boolean modeStop;
127
128    /** This is our connection to the JMS server */
129    protected ServerIL serverIL;
130
131    /** This is the clientID */
132    protected String JavaDoc clientID;
133
134    /** The connection token is used to identify our connection to the server. */
135    protected ConnectionToken connectionToken;
136
137    /** The object that sets up the client IL */
138    protected ClientILService clientILService;
139
140    /** How often to ping the connection */
141    protected long pingPeriod = 1000 * 60;
142
143    /** This field is reset when a ping is sent, set when ponged. */
144    protected boolean ponged = true;
145
146    /** This is used to know when the PingTask is running */
147    Semaphore pingTaskSemaphore = new Semaphore(1);
148
149    /** Identifies the PingTask in the ClockDaemon */
150    Object JavaDoc pingTaskId;
151
152    /** Set as soon as close() is called on the connection. */
153    private SynchronizedBoolean closing = new SynchronizedBoolean(false);
154
155    /** Whether setClientId is Allowed */
156    private volatile boolean setClientIdAllowed = true;
157
158    /** Set of all sessions created by this connection; accessed while synchronized on the set itself */
159    HashSet JavaDoc createdSessions;
160
161    /** Numbers subscriptions */
162    int subscriptionCounter = Integer.MIN_VALUE;
163
164    /** The lock for subscriptionCounter */
165    Object JavaDoc subCountLock = new Object JavaDoc();
166
167    /** Is the connection closed */
168    private SynchronizedBoolean closed = new SynchronizedBoolean(false);
169
170    /** Used to control transactions */
171    SpyXAResourceManager spyXAResourceManager;
172
173    /** The class that created this connection */
174    GenericConnectionFactory genericConnectionFactory;
175
176    /** Last message ID returned */
177    private int lastMessageID;
178
179    /** the exceptionListener */
180    private ExceptionListener JavaDoc exceptionListener;
181
182    /** The exception listener lock */
183    private Object JavaDoc elLock = new Object JavaDoc();
184    
185    /** The exception listener invocation thread */
186    private Thread JavaDoc elThread;
187    
188    /** Used in message id generation; also the lock guarding message id generation */
189    private StringBuffer JavaDoc sb = new StringBuffer JavaDoc();
190
191    /** Used in message id generation as a scratch stack of decimal digits */
192    private char[] charStack = new char[22];
193
194    /** The next session id */
195    String JavaDoc sessionId;
196
197    /** Temporary destinations created by this connection */
198    protected HashSet JavaDoc temps = new HashSet JavaDoc();
199    
200    static
201    {
202       log.debug("Setting the clockDaemon's thread factory");
203       clockDaemon.setThreadFactory(new ThreadFactory()
204       {
205          public Thread JavaDoc newThread(Runnable JavaDoc r)
206          {
207             Thread JavaDoc t = new Thread JavaDoc(getThreadGroup(), r, "Connection Monitor Thread");
208             t.setDaemon(true);
209             return t;
210          }
211       });
212    }
213
214    public static ThreadGroup JavaDoc getThreadGroup()
215    {
216       if (threadGroup.isDestroyed())
217          threadGroup = new ThreadGroup JavaDoc("JBossMQ Client Threads");
218       return threadGroup;
219    }
220
/**
 * Create a new Connection.
 * <p>
 * The connection is created in the stopped state. It obtains a ServerIL from
 * the factory, authenticates, asks for a client id when a user name is given,
 * starts the client IL service, creates the XA resource manager and finally
 * starts the ping thread. If any step fails the server is told the connection
 * is closing and the failure is rethrown as a JMSException.
 *
 * @param userName the username, may be null
 * @param password the password
 * @param genericConnectionFactory the constructing class
 * @throws JMSException for any error
 */
Connection(String userName, String password, GenericConnectionFactory genericConnectionFactory) throws JMSException
{
   // Initial state: no sessions, no token, no message ids issued, delivery stopped
   createdSessions = new HashSet();
   connectionToken = null;
   lastMessageID = 0;
   modeStop = true;

   if (trace)
   {
      log.trace("Connection Initializing userName=" + userName + " " + this);
   }
   this.genericConnectionFactory = genericConnectionFactory;
   genericConnectionFactory.initialise(this);

   // Obtain the invocation layer used to reach the server
   if (trace)
   {
      log.trace("Getting the serverIL " + this);
   }
   serverIL = genericConnectionFactory.createServerIL();
   if (trace)
   {
      log.trace("serverIL=" + serverIL + " " + this);
   }

   // Phase one: register ourselves as a client
   try
   {
      authenticate(userName, password);

      if (userName != null)
      {
         askForAnID(userName, password);
      }

      startILService();
   }
   catch (Throwable registrationFailure)
   {
      // Registration failed; tell the server we are going away (best effort)
      try
      {
         serverIL.connectionClosing(null);
      }
      catch (Throwable closeFailure)
      {
         log.debug("Error closing the connection", closeFailure);
      }

      SpyJMSException.rethrowAsJMSException("Failed to create connection", registrationFailure);
   }

   // Phase two: finish constructing the connection
   try
   {
      if (trace)
      {
         log.trace("Creating XAResourceManager " + this);
      }
      spyXAResourceManager = new SpyXAResourceManager(this);

      if (trace)
      {
         log.trace("Starting the ping thread " + this);
      }
      startPingThread();

      if (trace)
      {
         log.trace("Connection establishment successful " + this);
      }
   }
   catch (Throwable setupFailure)
   {
      // Could not complete the connection; tidy up the server and client ILs,
      // logging (not propagating) anything that goes wrong while doing so.
      try
      {
         serverIL.connectionClosing(connectionToken);
      }
      catch (Throwable closeFailure)
      {
         log.debug("Error closing the connection", closeFailure);
      }
      try
      {
         stopILService();
      }
      catch (Throwable stopFailure)
      {
         log.debug("Error stopping the client IL", stopFailure);
      }

      SpyJMSException.rethrowAsJMSException("Failed to create connection", setupFailure);
   }
}
314
/**
 * Create a new Connection without a user name or password.
 * <p>
 * Delegates to the three argument constructor with a null user name and a
 * null password.
 *
 * @param genericConnectionFactory the constructing class
 * @throws JMSException for any error
 */
Connection(GenericConnectionFactory genericConnectionFactory) throws JMSException
{
   this((String) null, (String) null, genericConnectionFactory);
}
325
326    /**
327      * Gets the ServerIL attribute of the Connection object
328      *
329      * @return The ServerIL value
330      */

331    public ServerIL getServerIL()
332    {
333       return serverIL;
334    }
335
336    /**
337      * Notification from the server that the connection is closed
338      */

339    public void asynchClose()
340    {
341       // This obviously did something at some point?
342
}
343
344    /**
345      * Called by a TemporaryDestination which is going to be deleted()
346      *
347      * @param dest the temporary destination
348      */

349    public void asynchDeleteTemporaryDestination(SpyDestination dest)
350    {
351       if (trace)
352          log.trace("Deleting temporary destination " + dest);
353       try
354       {
355          deleteTemporaryDestination(dest);
356       }
357       catch (Throwable JavaDoc t)
358       {
359          asynchFailure("Error deleting temporary destination " + dest, t);
360       }
361    }
362
363    /**
364      * Gets the first consumer that is listening to a destination.
365      *
366      * @param requests the receive requests
367      */

368    public void asynchDeliver(ReceiveRequest requests[])
369    {
370       // If we are closing the connection, the server will nack the messages
371
if (closing.get())
372          return;
373
374       if (trace)
375          log.trace("Async deliver requests=" + Arrays.asList(requests) + " " + this);
376       
377       try
378       {
379          for (int i = 0; i < requests.length; i++)
380          {
381             ReceiveRequest r = requests[i];
382             if (trace)
383                log.trace("Processing request=" + r + " " + this);
384             
385             SpyConsumer consumer = (SpyConsumer) subscriptions.get(r.subscriptionId);
386             r.message.createAcknowledgementRequest(r.subscriptionId.intValue());
387
388             if (consumer == null)
389             {
390                send(r.message.getAcknowledgementRequest(false));
391                log.debug("WARNING: NACK issued due to non existent subscription " + r.message.header.messageId);
392                continue;
393             }
394
395             if (trace)
396                log.trace("Delivering messageid=" + r.message.header.messageId + " to consumer=" + consumer);
397             
398             consumer.addMessage(r.message);
399          }
400       }
401       catch (Throwable JavaDoc t)
402       {
403          asynchFailure("Error during async delivery", t);
404       }
405    }
406    /**
407      * Notification of a failure on this connection
408      *
409      * @param reason the reason for the failure
410      * @param t the throwable
411      */

412    public void asynchFailure(String JavaDoc reason, Throwable JavaDoc t)
413    {
414       if (trace)
415          log.trace("Notified of failure reason=" + reason + " " + this, t);
416
417       // Exceptions due to closing will be ignored.
418
if (closing.get())
419          return;
420
421       JMSException JavaDoc excep = SpyJMSException.getAsJMSException(reason, t);
422
423       synchronized (elLock)
424       {
425          ExceptionListener JavaDoc el = exceptionListener;
426          if (el != null && elThread == null)
427          {
428             try
429             {
430                Runnable JavaDoc run = new ExceptionListenerRunnable(el, excep);
431                elThread = new Thread JavaDoc(getThreadGroup(), run, "ExceptionListener " + this);
432                elThread.setDaemon(false);
433                elThread.start();
434             }
435             catch (Throwable JavaDoc t1)
436             {
437                log.warn("Connection failure: ", excep);
438                log.warn("Unable to start exception listener thread: ", t1);
439             }
440          }
441          else if (elThread != null)
442             log.warn("Connection failure, already in the exception listener", excep);
443          else
444             log.warn("Connection failure, use javax.jms.Connection.setExceptionListener() to handle this error and reconnect", excep);
445       }
446    }
447
448    /**
449      * Invoked when the server pong us
450      *
451      * @param serverTime the server time
452      */

453    public void asynchPong(long serverTime)
454    {
455       if (trace)
456          log.trace("PONG serverTime=" + serverTime + " " + this);
457       ponged = true;
458    }
459
460    /**
461      * Called by a TemporaryDestination which is going to be deleted
462      *
463      * @param dest the temporary destination
464      * @exception JMSException for any error
465      */

466    public void deleteTemporaryDestination(SpyDestination dest) throws JMSException JavaDoc
467    {
468       checkClosed();
469       if (trace)
470          log.trace("DeleteDestination dest=" + dest + " " + this);
471       try
472       {
473          //Ask the broker to delete() this TemporaryDestination
474
serverIL.deleteTemporaryDestination(connectionToken, dest);
475
476          //Remove it from the destinations list
477
synchronized (subscriptions)
478          {
479             destinationSubscriptions.remove(dest);
480          }
481
482          // Remove it from the temps list
483
synchronized (temps)
484          {
485             temps.remove(dest);
486          }
487       }
488       catch (Throwable JavaDoc t)
489       {
490          
491          SpyJMSException.rethrowAsJMSException("Cannot delete the TemporaryDestination", t);
492       }
493    }
494
495    public void setClientID(String JavaDoc cID) throws JMSException JavaDoc
496    {
497       checkClosed();
498       if (clientID != null)
499          throw new IllegalStateException JavaDoc("The connection has already a clientID");
500       if (setClientIdAllowed == false)
501          throw new IllegalStateException JavaDoc("SetClientID was not called emediately after creation of connection");
502
503       if (trace)
504          log.trace("SetClientID clientID=" + clientID + " " + this);
505
506       try
507       {
508          serverIL.checkID(cID);
509       }
510       catch (Throwable JavaDoc t)
511       {
512          SpyJMSException.rethrowAsJMSException("Cannot connect to the JMSServer", t);
513       }
514
515       clientID = cID;
516       connectionToken.setClientID(clientID);
517    }
518
519    public String JavaDoc getClientID() throws JMSException JavaDoc
520    {
521       checkClosed();
522       return clientID;
523    }
524
525    public ExceptionListener JavaDoc getExceptionListener() throws JMSException JavaDoc
526    {
527       checkClosed();
528       checkClientID();
529       return exceptionListener;
530    }
531
532    public void setExceptionListener(ExceptionListener JavaDoc listener) throws JMSException JavaDoc
533    {
534       checkClosed();
535       checkClientID();
536
537       exceptionListener = listener;
538    }
539
540    public ConnectionMetaData JavaDoc getMetaData() throws JMSException JavaDoc
541    {
542       checkClosed();
543       checkClientID();
544
545       return new SpyConnectionMetaData();
546    }
547
548    public synchronized void close() throws JMSException JavaDoc
549    {
550       if (closed.get())
551          return;
552       if (trace)
553          log.trace("Closing connection " + this);
554       
555       closing.set(true);
556
557       // We don't want to notify the exception listener
558
exceptionListener = null;
559
560       // The first exception
561
JMSException JavaDoc exception = null;
562
563       try
564       {
565          doStop();
566       }
567       catch (Throwable JavaDoc t)
568       {
569          log.trace("Error during stop", t);
570       }
571       
572       if (trace)
573          log.trace("Closing sessions " + this);
574       Object JavaDoc[] vect = null;
575       synchronized (createdSessions)
576       {
577          vect = createdSessions.toArray();
578       }
579       for (int i = 0; i < vect.length; i++)
580       {
581          SpySession session = (SpySession) vect[i];
582          try
583          {
584             session.close();
585          }
586          catch (Throwable JavaDoc t)
587          {
588             if (trace)
589                log.trace("Error closing session " + session, t);
590          }
591       }
592       if (trace)
593          log.trace("Closed sessions " + this);
594
595       if (trace)
596          log.trace("Notifying the server of close " + this);
597       try
598       {
599          serverIL.connectionClosing(connectionToken);
600       }
601       catch (Throwable JavaDoc t)
602       {
603          log.trace("Cannot close properly the connection", t);
604       }
605
606       if (trace)
607          log.trace("Stopping ping thread " + this);
608       try
609       {
610          stopPingThread();
611       }
612       catch (Throwable JavaDoc t)
613       {
614          if (exception == null)
615             exception = SpyJMSException.getAsJMSException("Cannot stop the ping thread", t);
616       }
617
618       if (trace)
619          log.trace("Stopping the ClientIL service " + this);
620       try
621       {
622          stopILService();
623       }
624       catch (Throwable JavaDoc t)
625       {
626          log.trace("Cannot stop the client il service", t);
627       }
628
629       // Only set the closed flag after all the objects that depend
630
// on this connection have been closed.
631
closed.set(true);
632
633       if (trace)
634          log.trace("Disconnected from server " + this);
635
636       // Throw the first exception
637
if (exception != null)
638          throw exception;
639    }
640
641    public void start() throws JMSException JavaDoc
642    {
643       checkClosed();
644       checkClientID();
645
646       if (modeStop == false)
647          return;
648       modeStop = false;
649
650       if (trace)
651          log.trace("Starting connection " + this);
652
653       try
654       {
655          serverIL.setEnabled(connectionToken, true);
656       }
657       catch (Throwable JavaDoc t)
658       {
659          SpyJMSException.rethrowAsJMSException("Cannot enable the connection with the JMS server", t);
660       }
661    }
662
663    public void stop() throws JMSException JavaDoc
664    {
665       checkClosed();
666       checkClientID();
667       doStop();
668    }
669
670    public String JavaDoc toString()
671    {
672       StringBuffer JavaDoc buffer = new StringBuffer JavaDoc();
673       buffer.append("Connection@").append(System.identityHashCode(this));
674       buffer.append('[');
675       if (connectionToken != null)
676          buffer.append("token=").append(connectionToken);
677       else
678          buffer.append("clientID=").append(clientID);
679       if (closed.get())
680          buffer.append(" CLOSED");
681       else if (closing.get())
682          buffer.append(" CLOSING");
683       buffer.append(" rcvstate=");
684       if (modeStop)
685          buffer.append("STOPPED");
686       else
687          buffer.append("STARTED");
688       buffer.append(']');
689       return buffer.toString();
690    }
691
692    /**
693      * Get the next message id
694      * <p>
695      *
696      * All longs are less than 22 digits long
697      * <p>
698      *
699      * Note that in this routine we assume that System.currentTimeMillis() is
700      * non-negative always be non-negative (so don't set lastMessageID to a
701      * positive for a start).
702      *
703      * @return the next message id
704      * @throws JMSException for any error
705      */

706    String JavaDoc getNewMessageID() throws JMSException JavaDoc
707    {
708       checkClosed();
709       synchronized (sb)
710       {
711          sb.setLength(0);
712          sb.append(clientID);
713          sb.append('-');
714          long time = System.currentTimeMillis();
715          int count = 0;
716          do
717          {
718             charStack[count] = (char) ('0' + (time % 10));
719             time = time / 10;
720             ++count;
721          }
722          while (time != 0);
723          --count;
724          for (; count >= 0; --count)
725          {
726             sb.append(charStack[count]);
727          }
728          ++lastMessageID;
729          //avoid having to deal with negative numbers.
730
if (lastMessageID < 0)
731          {
732             lastMessageID = 0;
733          }
734          int id = lastMessageID;
735          count = 0;
736          do
737          {
738             charStack[count] = (char) ('0' + (id % 10));
739             id = id / 10;
740             ++count;
741          }
742          while (id != 0);
743          --count;
744          for (; count >= 0; --count)
745          {
746             sb.append(charStack[count]);
747          }
748          return sb.toString();
749       }
750    }
751
752    /**
753      * A new Consumer has been created.
754      * <p>
755      * We have to handle security issues, a consumer may actually not be allowed
756      * to be created
757      *
758      * @param consumer the consumer added
759      * @throws JMSException for any error
760      */

761    void addConsumer(SpyConsumer consumer) throws JMSException JavaDoc
762    {
763       checkClosed();
764       Subscription req = consumer.getSubscription();
765       synchronized (subCountLock)
766       {
767          req.subscriptionId = subscriptionCounter++;
768       }
769       req.connectionToken = connectionToken;
770       if (trace)
771          log.trace("addConsumer sub=" + req);
772
773       try
774       {
775          synchronized (subscriptions)
776          {
777             subscriptions.put(new Integer JavaDoc(req.subscriptionId), consumer);
778
779             LinkedList JavaDoc ll = (LinkedList JavaDoc) destinationSubscriptions.get(req.destination);
780             if (ll == null)
781             {
782                ll = new LinkedList JavaDoc();
783                destinationSubscriptions.put(req.destination, ll);
784             }
785
786             ll.add(consumer);
787          }
788
789          serverIL.subscribe(connectionToken, req);
790       }
791       catch (JMSSecurityException JavaDoc ex)
792       {
793          removeConsumerInternal(consumer);
794          throw ex;
795       }
796       catch (Throwable JavaDoc t)
797       {
798          SpyJMSException.rethrowAsJMSException("Cannot subscribe to this Destination: ", t);
799       }
800    }
801
802    /**
803      * Browse a queue
804      *
805      * @param queue the queue
806      * @param selector the selector
807      * @return an array of messages
808      * @exception JMSException for any error
809      */

810    SpyMessage[] browse(Queue JavaDoc queue, String JavaDoc selector) throws JMSException JavaDoc
811    {
812       checkClosed();
813       if (trace)
814          log.trace("Browsing queue=" + queue + " selector=" + selector + " " + this);
815
816       try
817       {
818          return serverIL.browse(connectionToken, queue, selector);
819       }
820       catch (Throwable JavaDoc t)
821       {
822          SpyJMSException.rethrowAsJMSException("Cannot browse the Queue", t);
823          throw new UnreachableStatementException();
824       }
825    }
826
827    /**
828      * Ping the server
829      *
830      * @param clientTime the start of the ping
831      * @throws JMSException for any error
832      */

833    void pingServer(long clientTime) throws JMSException JavaDoc
834    {
835       checkClosed();
836       trace = log.isTraceEnabled();
837       if (trace)
838          log.trace("PING " + clientTime + " " + this);
839
840       try
841       {
842          serverIL.ping(connectionToken, clientTime);
843       }
844       catch (Throwable JavaDoc t)
845       {
846          SpyJMSException.rethrowAsJMSException("Cannot ping the JMS server", t);
847       }
848    }
849
850    /**
851      * Receive a message
852      *
853      * @param sub the subscription
854      * @param wait the wait time
855      * @return the message or null if there isn't one
856      * @throws JMSException for any error
857      */

858    SpyMessage receive(Subscription sub, long wait) throws JMSException JavaDoc
859    {
860       checkClosed();
861       if (trace)
862          log.trace("Receive subscription=" + sub + " wait=" + wait);
863
864       try
865       {
866          SpyMessage message = serverIL.receive(connectionToken, sub.subscriptionId, wait);
867          if (message != null)
868             message.createAcknowledgementRequest(sub.subscriptionId);
869          return message;
870       }
871       catch (Throwable JavaDoc t)
872       {
873          SpyJMSException.rethrowAsJMSException("Cannot receive ", t);
874          throw new UnreachableStatementException();
875       }
876    }
877
878    /**
879     * Remove a consumer
880     *
881     * @param consumer the consumer
882     * @throws JMSException for any error
883     */

884    void removeConsumer(SpyConsumer consumer) throws JMSException JavaDoc
885    {
886       checkClosed();
887       Subscription req = consumer.getSubscription();
888       if (trace)
889          log.trace("removeConsumer req=" + req);
890
891       try
892       {
893          serverIL.unsubscribe(connectionToken, req.subscriptionId);
894
895          removeConsumerInternal(consumer);
896       }
897       catch (Throwable JavaDoc t)
898       {
899          SpyJMSException.rethrowAsJMSException("Cannot unsubscribe to this destination", t);
900       }
901
902    }
903
904    /**
905     * Send a message to the server
906     *
907     * @param mes the message
908     * @throws JMSException for any error
909     */

910    void sendToServer(SpyMessage mes) throws JMSException JavaDoc
911    {
912       checkClosed();
913       if (trace)
914          log.trace("SendToServer message=" + mes.header.jmsMessageID + " " + this);
915       
916       try
917       {
918          serverIL.addMessage(connectionToken, mes);
919       }
920       catch (Throwable JavaDoc t)
921       {
922          SpyJMSException.rethrowAsJMSException("Cannot send a message to the JMS server", t);
923       }
924    }
925
926    /**
927     * Closing a session
928     *
929     * @param who the session
930     */

931    void sessionClosing(SpySession who)
932    {
933       if (trace)
934          log.trace("Closing session " + who);
935       
936       synchronized (createdSessions)
937       {
938          createdSessions.remove(who);
939       }
940
941       //This session should not be in the "destinations" object anymore.
942
//We could check this, though
943
}
944
945    void unsubscribe(DurableSubscriptionID id) throws JMSException JavaDoc
946    {
947       if (trace)
948          log.trace("Unsubscribe id=" + id + " " + this);
949       
950       try
951       {
952          serverIL.destroySubscription(connectionToken, id);
953       }
954       catch (Throwable JavaDoc t)
955       {
956          SpyJMSException.rethrowAsJMSException("Cannot destroy durable subscription " + id, t);
957       }
958    }
959
960    /**
961     * Check a tempoary destination
962     *
963     * @param destination the destination
964     */

965    void checkTemporary(Destination JavaDoc destination) throws JMSException JavaDoc
966    {
967       if (destination instanceof TemporaryQueue JavaDoc || destination instanceof TemporaryTopic JavaDoc)
968       {
969          synchronized (temps)
970          {
971             if (temps.contains(destination) == false)
972                throw new JMSException JavaDoc("Cannot create a consumer for a temporary destination from a different session. " + destination);
973          }
974       }
975    }
976
977    /**
978      * Check that a clientID exists. If not get one from server.
979      *
980      * Also sets the setClientIdAllowed to false.
981      *
982      * Check clientId, must be called by all public methods on the
983      * jacax.jmx.Connection interface and its children.
984      *
985      * @exception JMSException if clientID is null as post condition
986      */

987    synchronized protected void checkClientID() throws JMSException JavaDoc
988    {
989       if (setClientIdAllowed == false)
990          return;
991
992       setClientIdAllowed = false;
993       if (trace)
994          log.trace("Checking clientID=" + clientID + " " + this);
995       if (clientID == null)
996       {
997          askForAnID();//Request a random one
998
if (clientID == null)
999             throw new JMSException JavaDoc("Could not get a clientID");
1000         connectionToken.setClientID(clientID);
1001
1002         if (trace)
1003            log.trace("ClientID established " + this);
1004      }
1005   }
1006
1007   /**
1008     * Ask the server for an id
1009     *
1010     * @exception JMSException for any error
1011     */

1012   protected void askForAnID() throws JMSException JavaDoc
1013   {
1014      if (trace)
1015         log.trace("Ask for an id " + this);
1016      
1017      try
1018      {
1019         if (clientID == null)
1020            clientID = serverIL.getID();
1021      }
1022      catch (Throwable JavaDoc t)
1023      {
1024         SpyJMSException.rethrowAsJMSException("Cannot get a client ID", t);
1025      }
1026   }
1027
1028   /**
1029     * Ask the server for an id
1030     *
1031     * @param userName the user
1032     * @param password the password
1033     * @exception JMSException for any error
1034     */

1035   protected void askForAnID(String JavaDoc userName, String JavaDoc password) throws JMSException JavaDoc
1036   {
1037      if (trace)
1038         log.trace("Ask for an id user=" + userName + " " + this);
1039
1040      try
1041      {
1042         String JavaDoc configuredClientID = serverIL.checkUser(userName, password);
1043         if (configuredClientID != null)
1044            clientID = configuredClientID;
1045      }
1046      catch (Throwable JavaDoc t)
1047      {
1048         SpyJMSException.rethrowAsJMSException("Cannot get a client ID", t);
1049      }
1050   }
1051
1052   /**
1053    * Authenticate a user
1054    *
1055    * @param userName the user
1056    * @param password the password
1057    * @throws JMSException for any error
1058    */

1059   protected void authenticate(String JavaDoc userName, String JavaDoc password) throws JMSException JavaDoc
1060   {
1061      if (trace)
1062         log.trace("Authenticating user " + userName + " " + this);
1063      try
1064      {
1065         sessionId = serverIL.authenticate(userName, password);
1066      }
1067      catch (Throwable JavaDoc t)
1068      {
1069         SpyJMSException.rethrowAsJMSException("Cannot authenticate user", t);
1070      }
1071   }
1072
1073   // used to acknowledge a message
1074
/**
1075     * Acknowledge/Nack a message
1076     *
1077     * @param item the acknowledgement
1078     * @exception JMSException for any error
1079     */

1080   protected void send(AcknowledgementRequest item) throws JMSException JavaDoc
1081   {
1082      checkClosed();
1083      if (trace)
1084         log.trace("Acknowledge item=" + item + " " + this);
1085
1086      try
1087      {
1088         serverIL.acknowledge(connectionToken, item);
1089      }
1090      catch (Throwable JavaDoc t)
1091      {
1092         SpyJMSException.rethrowAsJMSException("Cannot acknowlege a message", t);
1093      }
1094   }
1095
1096   /**
1097     * Commit/rollback
1098     *
1099     * @param transaction the transaction request
1100     * @exception JMSException for any error
1101     */

1102   protected void send(TransactionRequest transaction) throws JMSException JavaDoc
1103   {
1104      checkClosed();
1105      if (trace)
1106         log.trace("Transact request=" + transaction + " " + this);
1107
1108      try
1109      {
1110         serverIL.transact(connectionToken, transaction);
1111      }
1112      catch (Throwable JavaDoc t)
1113      {
1114         SpyJMSException.rethrowAsJMSException("Cannot process a transaction", t);
1115      }
1116   }
1117
1118   /**
1119    * Recover
1120    *
1121    * @param flags the flags
1122    * @throws JMSException for any error
1123    */

1124   protected Xid JavaDoc[] recover(int flags) throws JMSException JavaDoc
1125   {
1126      checkClosed();
1127      if (trace)
1128         log.trace("Recover flags=" + flags + " " + this);
1129
1130      try
1131      {
1132         if (serverIL instanceof Recoverable)
1133         {
1134            Recoverable recoverableIL = (Recoverable) serverIL;
1135            return recoverableIL.recover(connectionToken, flags);
1136         }
1137      }
1138      catch (Throwable JavaDoc t)
1139      {
1140         SpyJMSException.rethrowAsJMSException("Cannot recover", t);
1141      }
1142      
1143      log.warn(serverIL + " does not implement " + Recoverable.class.getName());
1144      return new Xid JavaDoc[0];
1145   }
1146
1147   /**
1148     * Start the il
1149     *
1150     * @exception JMSException for any error
1151     */

1152   protected void startILService() throws JMSException JavaDoc
1153   {
1154      if (trace)
1155         log.trace("Starting the client il " + this);
1156      try
1157      {
1158         clientILService = genericConnectionFactory.createClientILService(this);
1159         clientILService.start();
1160         if (trace)
1161            log.trace("Using client id " + clientILService + " " + this);
1162         connectionToken = new ConnectionToken(clientID, clientILService.getClientIL(), sessionId);
1163         serverIL.setConnectionToken(connectionToken);
1164      }
1165      catch (Throwable JavaDoc t)
1166      {
1167         SpyJMSException.rethrowAsJMSException("Cannot start a the client IL service", t);
1168      }
1169   }
1170
1171   /**
1172     * Stop the il
1173     *
1174     * @exception JMSException for any error
1175     */

1176   protected void stopILService() throws JMSException JavaDoc
1177   {
1178      try
1179      {
1180         clientILService.stop();
1181      }
1182      catch (Throwable JavaDoc t)
1183      {
1184         SpyJMSException.rethrowAsJMSException("Cannot stop a the client IL service", t);
1185      }
1186   }
1187   
1188   /**
1189    * Stop delivery
1190    *
1191    * @param consumer the consumer
1192    */

1193   public void doStop() throws JMSException JavaDoc
1194   {
1195      if (modeStop)
1196         return;
1197      modeStop = true;
1198
1199      if (trace)
1200         log.trace("Stopping connection " + this);
1201
1202      try
1203      {
1204         serverIL.setEnabled(connectionToken, false);
1205      }
1206      catch (Throwable JavaDoc t)
1207      {
1208         SpyJMSException.rethrowAsJMSException("Cannot disable the connection with the JMS server", t);
1209      }
1210   }
1211   
1212   /**
1213    * Remove a consumer
1214    *
1215    * @param consumer the consumer
1216    */

1217   private void removeConsumerInternal(SpyConsumer consumer)
1218   {
1219      synchronized (subscriptions)
1220      {
1221         Subscription req = consumer.getSubscription();
1222         subscriptions.remove(new Integer JavaDoc(req.subscriptionId));
1223
1224         LinkedList JavaDoc ll = (LinkedList JavaDoc) destinationSubscriptions.get(req.destination);
1225         if (ll != null)
1226         {
1227            ll.remove(consumer);
1228            if (ll.size() == 0)
1229            {
1230               destinationSubscriptions.remove(req.destination);
1231            }
1232         }
1233      }
1234   }
1235   
1236   /**
1237    * Check whether we are closed
1238    *
1239    * @throws IllegalStateException when the session is closed
1240    */

1241   protected void checkClosed() throws IllegalStateException JavaDoc
1242   {
1243      if (closed.get())
1244         throw new IllegalStateException JavaDoc("The connection is closed");
1245   }
1246
1247   /**
1248    * Start the ping thread
1249    */

1250   private void startPingThread()
1251   {
1252      // Ping thread does not need to be running if the ping period is 0.
1253
if (pingPeriod == 0)
1254         return;
1255      pingTaskId = clockDaemon.executePeriodically(pingPeriod, new PingTask(), true);
1256   }
1257
1258   /**
1259    * Stop the ping thread
1260    */

1261   private void stopPingThread()
1262   {
1263      // Ping thread was not running if ping period is 0.
1264
if (pingPeriod == 0)
1265         return;
1266
1267      ClockDaemon.cancel(pingTaskId);
1268
1269      //Aquire the Semaphore to make sure the ping task is not running.
1270
try
1271      {
1272         pingTaskSemaphore.attempt(1000 * 10);
1273      }
1274      catch (InterruptedException JavaDoc e)
1275      {
1276         Thread.currentThread().interrupt();
1277      }
1278   }
1279
1280   /**
1281     * The ping task
1282     */

1283   class PingTask implements Runnable JavaDoc
1284   {
1285      /**
1286         * Main processing method for the PingTask object
1287         */

1288      public void run()
1289      {
1290         try
1291         {
1292            pingTaskSemaphore.acquire();
1293         }
1294         catch (InterruptedException JavaDoc e)
1295         {
1296            log.debug("Interrupted requesting ping semaphore");
1297            return;
1298         }
1299         try
1300         {
1301            if (ponged == false)
1302            {
1303               // Server did not pong use with in the timeout
1304
// period.. Assuming the connection is dead.
1305
throw new SpyJMSException("No pong received", new IOException JavaDoc("ping timeout."));
1306            }
1307
1308            ponged = false;
1309            pingServer(System.currentTimeMillis());
1310         }
1311         catch (Throwable JavaDoc t)
1312         {
1313            asynchFailure("Unexpected ping failure", t);
1314         }
1315         finally
1316         {
1317            pingTaskSemaphore.release();
1318         }
1319      }
1320   }
1321   
1322   /**
1323    * The Exception listener runnable
1324    */

1325   class ExceptionListenerRunnable implements Runnable JavaDoc
1326   {
1327      ExceptionListener JavaDoc el;
1328      JMSException JavaDoc excep;
1329      
1330      /**
1331       * Create a new ExceptionListener runnable
1332       *
1333       * @param el the exception exception
1334       * @param excep the jms exception
1335       */

1336      public ExceptionListenerRunnable(ExceptionListener JavaDoc el, JMSException JavaDoc excep)
1337      {
1338         this.el = el;
1339         this.excep = excep;
1340      }
1341      
1342      public void run()
1343      {
1344         try
1345         {
1346            synchronized (elLock)
1347            {
1348               el.onException(excep);
1349            }
1350         }
1351         catch (Throwable JavaDoc t)
1352         {
1353            log.warn("Connection failure: ", excep);
1354            log.warn("Exception listener ended abnormally: ", t);
1355         }
1356         
1357         synchronized (elLock)
1358         {
1359            elThread = null;
1360         }
1361      }
1362   }
1363}
1364
Popular Tags