KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jboss > mq > Connection


1 /*
2 * JBoss, Home of Professional Open Source
3 * Copyright 2005, JBoss Inc., and individual contributors as indicated
4 * by the @authors tag. See the copyright.txt in the distribution for a
5 * full listing of individual contributors.
6 *
7 * This is free software; you can redistribute it and/or modify it
8 * under the terms of the GNU Lesser General Public License as
9 * published by the Free Software Foundation; either version 2.1 of
10 * the License, or (at your option) any later version.
11 *
12 * This software is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this software; if not, write to the Free
19 * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
20 * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
21 */

22 package org.jboss.mq;
23
24 import java.io.IOException JavaDoc;
25 import java.io.Serializable JavaDoc;
26 import java.util.Arrays JavaDoc;
27 import java.util.HashMap JavaDoc;
28 import java.util.HashSet JavaDoc;
29 import java.util.LinkedList JavaDoc;
30
31 import javax.jms.ConnectionMetaData JavaDoc;
32 import javax.jms.Destination JavaDoc;
33 import javax.jms.ExceptionListener JavaDoc;
34 import javax.jms.IllegalStateException JavaDoc;
35 import javax.jms.JMSException JavaDoc;
36 import javax.jms.JMSSecurityException JavaDoc;
37 import javax.jms.Queue JavaDoc;
38 import javax.jms.TemporaryQueue JavaDoc;
39 import javax.jms.TemporaryTopic JavaDoc;
40 import javax.transaction.xa.Xid JavaDoc;
41
42 import org.jboss.logging.Logger;
43 import org.jboss.mq.il.ClientILService;
44 import org.jboss.mq.il.ServerIL;
45 import org.jboss.util.UnreachableStatementException;
46
47 import EDU.oswego.cs.dl.util.concurrent.ClockDaemon;
48 import EDU.oswego.cs.dl.util.concurrent.Semaphore;
49 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
50 import EDU.oswego.cs.dl.util.concurrent.ThreadFactory;
51
52 /**
53  * This class implements javax.jms.Connection.
54  *
55  * <p>
56  * It is also the gateway through wich all calls to the JMS server is done. To
57  * do its work it needs a ServerIL to invoke (@see
58  * org.jboss.mq.server.ServerIL).
59  * </p>
60  *
61  * <p>
62  * The (new from february 2002) logic for clientID is the following: if logging
63  * in with a user and passwork a preconfigured clientID may be automatically
64  * delivered from the server.
65  * </p>
66  *
67  * <p>
68  * If the client wants to set it's own clientID it must do so on a connection
69  * wich does not have a prefonfigured clientID and it must do so before it
70  * calls any other methods on the connection (even getClientID()). It is not
71  * allowable to use a clientID that either looks like JBossMQ internal one
72  * (beginning with ID) or a clientID that is allready in use by someone, or a
73  * clientID that is already preconfigured in the server.
74  * </p>
75  *
76  * <p>
77  * If a preconfigured ID is not get, or a valid one is not set, the server will
78  * set an internal ID. This ID is NEVER possible to use for durable
79  * subscriptions. If a prefconfigured ID or one manually set is possible to use
80  * to create a durable subscriptions is governed by the security configuration
81  * of JBossMQ. In the default setup, only preconfigured clientID's are possible
82  * to use. If using a SecurityManager, permissions to create a surable
83  * subscriptions is * the resiult of a combination of the following:
84  * </p>
85  * <p>- The clientID is not one of JBossMQ's internal.
86  * </p>
87  * <p>- The user is authenticated and has a role that has create set to true
88  * in the security config of the destination.
89  * </p>
90  *
91  * <p>
92  * Notes for JBossMQ developers: All calls, except close(), that is possible to
93  * do on a connection must call checkClientID()
94  * </p>
95  *
96  * @author Norbert Lataille (Norbert.Lataille@m4x.org)
97  * @author Hiram Chirino (Cojonudo14@hotmail.com)
98  * @author <a HREF="pra@tim.se">Peter Antman</a>
99  * @author <a HREF="mailto:adrian@jboss.org">Adrian Brock</a>
100  * @version $Revision: 45317 $
101  */

102 public abstract class Connection implements Serializable JavaDoc, javax.jms.Connection JavaDoc
103 {
104    /** The serialVersionUID */
105    private static final long serialVersionUID = 87938199839407082L;
106
107    /** The threadGroup */
108    private static ThreadGroup JavaDoc threadGroup = new ThreadGroup JavaDoc("JBossMQ Client Threads");
109
110    /** The log */
111    static Logger log = Logger.getLogger(Connection.class);
112
113    /** Whether trace is enabled */
114    static boolean trace = log.isTraceEnabled();
115
116    /** Manages the thread that pings the connection to see if it is 'alive' */
117    static protected ClockDaemon clockDaemon = new ClockDaemon();
118
119    /** Maps a destination to a LinkedList of Subscriptions */
120    public HashMap JavaDoc destinationSubscriptions = new HashMap JavaDoc();
121
122    /** Maps a subscription id to a Subscription */
123    public HashMap JavaDoc subscriptions = new HashMap JavaDoc();
124
125    /** Is the connection stopped ? */
126    public boolean modeStop;
127
128    /** This is our connection to the JMS server */
129    protected ServerIL serverIL;
130
131    /** This is the clientID */
132    protected String JavaDoc clientID;
133
134    /** The connection token is used to identify our connection to the server. */
135    protected ConnectionToken connectionToken;
136
137    /** The object that sets up the client IL */
138    protected ClientILService clientILService;
139
140    /** How often to ping the connection */
141    protected long pingPeriod = 1000 * 60;
142
143    /** This field is reset when a ping is sent, set when ponged. */
144    protected boolean ponged = true;
145
146    /** This is used to know when the PingTask is running */
147    Semaphore pingTaskSemaphore = new Semaphore(1);
148
149    /** Identifies the PinkTask in the ClockDaemon */
150    Object JavaDoc pingTaskId;
151
152    /** Set a soon as close() is called on the connection. */
153    private SynchronizedBoolean closing = new SynchronizedBoolean(false);
154
155    /** Whether setClientId is Allowed */
156    private volatile boolean setClientIdAllowed = true;
157
158    /** LinkedList of all created sessions by this connection */
159    HashSet JavaDoc createdSessions;
160
161    /** Numbers subscriptions */
162    int subscriptionCounter = Integer.MIN_VALUE;
163
164    /** The lock for subscriptionCounter */
165    Object JavaDoc subCountLock = new Object JavaDoc();
166
167    /** Is the connection closed */
168    private SynchronizedBoolean closed = new SynchronizedBoolean(false);
169
170    /** Used to control tranactions */
171    SpyXAResourceManager spyXAResourceManager;
172
173    /** The class that created this connection */
174    GenericConnectionFactory genericConnectionFactory;
175
176    /** Last message ID returned */
177    private int lastMessageID;
178
179    /** the exceptionListener */
180    private ExceptionListener JavaDoc exceptionListener;
181
182    /** The exception listener lock */
183    private Object JavaDoc elLock = new Object JavaDoc();
184    
185    /** The exception listener invocation thread */
186    private Thread JavaDoc elThread;
187    
188    /** Used in message id generation */
189    private StringBuffer JavaDoc sb = new StringBuffer JavaDoc();
190
191    /** Used in message id generation */
192    private char[] charStack = new char[22];
193
194    /** The next session id */
195    String JavaDoc sessionId;
196
197    /** Temporary destinations created by this connection */
198    protected HashSet JavaDoc temps = new HashSet JavaDoc();
199    
200    static
201    {
202       log.debug("Setting the clockDaemon's thread factory");
203       clockDaemon.setThreadFactory(new ThreadFactory()
204       {
205          public Thread JavaDoc newThread(Runnable JavaDoc r)
206          {
207             Thread JavaDoc t = new Thread JavaDoc(getThreadGroup(), r, "Connection Monitor Thread");
208             t.setDaemon(true);
209             return t;
210          }
211       });
212    }
213
214    public static ThreadGroup JavaDoc getThreadGroup()
215    {
216       if (threadGroup.isDestroyed())
217          threadGroup = new ThreadGroup JavaDoc("JBossMQ Client Threads");
218       return threadGroup;
219    }
220
221    /**
222      * Create a new Connection
223      *
224      * @param userName the username
225      * @param password the password
226      * @param genericConnectionFactory the constructing class
227      * @throws JMSException for any error
228      */

229    Connection(String JavaDoc userName, String JavaDoc password, GenericConnectionFactory genericConnectionFactory) throws JMSException JavaDoc
230    {
231       //Set the attributes
232
createdSessions = new HashSet JavaDoc();
233       connectionToken = null;
234       lastMessageID = 0;
235       modeStop = true;
236
237       if (trace)
238          log.trace("Connection Initializing userName=" + userName + " " + this);
239       this.genericConnectionFactory = genericConnectionFactory;
240       genericConnectionFactory.initialise(this);
241
242       // Connect to the server
243
if (trace)
244          log.trace("Getting the serverIL " + this);
245       serverIL = genericConnectionFactory.createServerIL();
246       if (trace)
247          log.trace("serverIL=" + serverIL + " " + this);
248
249       // Register ourselves as a client
250
try
251       {
252          authenticate(userName, password);
253
254          if (userName != null)
255             askForAnID(userName, password);
256
257          startILService();
258       }
259       catch (Throwable JavaDoc t)
260       {
261          // Client registeration failed, close the connection
262
try
263          {
264             serverIL.connectionClosing(null);
265          }
266          catch (Throwable JavaDoc t2)
267          {
268             log.debug("Error closing the connection", t2);
269          }
270
271          SpyJMSException.rethrowAsJMSException("Failed to create connection", t);
272       }
273
274       // Finish constructing the connection
275
try
276       {
277          if (trace)
278             log.trace("Creating XAResourceManager " + this);
279
280          // Setup the XA Resource manager,
281
spyXAResourceManager = new SpyXAResourceManager(this);
282
283          if (trace)
284             log.trace("Starting the ping thread " + this);
285          startPingThread();
286
287          if (trace)
288             log.trace("Connection establishment successful " + this);
289       }
290       catch (Throwable JavaDoc t)
291       {
292          // Could not complete the connection, tidy up
293
// the server and client ILs.
294
try
295          {
296             serverIL.connectionClosing(connectionToken);
297          }
298          catch (Throwable JavaDoc t2)
299          {
300             log.debug("Error closing the connection", t2);
301          }
302          try
303          {
304             stopILService();
305          }
306          catch (Throwable JavaDoc t2)
307          {
308             log.debug("Error stopping the client IL", t2);
309          }
310
311          SpyJMSException.rethrowAsJMSException("Failed to create connection", t);
312       }
313    }
314
315    /**
316      * Create a new Connection
317      *
318      * @param genericConnectionFactory the constructing class
319      * @throws JMSException for any error
320      */

321    Connection(GenericConnectionFactory genericConnectionFactory) throws JMSException JavaDoc
322    {
323       this(null, null, genericConnectionFactory);
324    }
325
326    /**
327      * Gets the ServerIL attribute of the Connection object
328      *
329      * @return The ServerIL value
330      */

331    public ServerIL getServerIL()
332    {
333       return serverIL;
334    }
335
336    /**
337      * Notification from the server that the connection is closed
338      */

339    public void asynchClose()
340    {
341       // This obviously did something at some point?
342
}
343
344    /**
345      * Called by a TemporaryDestination which is going to be deleted()
346      *
347      * @param dest the temporary destination
348      */

349    public void asynchDeleteTemporaryDestination(SpyDestination dest)
350    {
351       if (trace)
352          log.trace("Deleting temporary destination " + dest);
353       try
354       {
355          deleteTemporaryDestination(dest);
356       }
357       catch (Throwable JavaDoc t)
358       {
359          asynchFailure("Error deleting temporary destination " + dest, t);
360       }
361    }
362
363    /**
364      * Gets the first consumer that is listening to a destination.
365      *
366      * @param requests the receive requests
367      */

368    public void asynchDeliver(ReceiveRequest requests[])
369    {
370       // If we are closing the connection, the server will nack the messages
371
if (closing.get())
372          return;
373
374       if (trace)
375          log.trace("Async deliver requests=" + Arrays.asList(requests) + " " + this);
376       
377       try
378       {
379          for (int i = 0; i < requests.length; i++)
380          {
381             ReceiveRequest r = requests[i];
382             if (trace)
383                log.trace("Processing request=" + r + " " + this);
384             
385             SpyConsumer consumer = (SpyConsumer) subscriptions.get(r.subscriptionId);
386             r.message.createAcknowledgementRequest(r.subscriptionId.intValue());
387
388             if (consumer == null)
389             {
390                send(r.message.getAcknowledgementRequest(false));
391                log.debug("WARNING: NACK issued due to non existent subscription " + r.message.header.messageId);
392                continue;
393             }
394
395             if (trace)
396                log.trace("Delivering messageid=" + r.message.header.messageId + " to consumer=" + consumer);
397             
398             consumer.addMessage(r.message);
399          }
400       }
401       catch (Throwable JavaDoc t)
402       {
403          asynchFailure("Error during async delivery", t);
404       }
405    }
406    /**
407      * Notification of a failure on this connection
408      *
409      * @param reason the reason for the failure
410      * @param t the throwable
411      */

412    public void asynchFailure(String JavaDoc reason, Throwable JavaDoc t)
413    {
414       if (trace)
415          log.trace("Notified of failure reason=" + reason + " " + this, t);
416
417       // Exceptions due to closing will be ignored.
418
if (closing.get())
419          return;
420
421       JMSException JavaDoc excep = SpyJMSException.getAsJMSException(reason, t);
422
423       synchronized (elLock)
424       {
425          ExceptionListener JavaDoc el = exceptionListener;
426          if (el != null && elThread == null)
427          {
428             try
429             {
430                Runnable JavaDoc run = new ExceptionListenerRunnable(el, excep);
431                elThread = new Thread JavaDoc(getThreadGroup(), run, "ExceptionListener " + this);
432                elThread.setDaemon(false);
433                elThread.start();
434             }
435             catch (Throwable JavaDoc t1)
436             {
437                log.warn("Connection failure: ", excep);
438                log.warn("Unable to start exception listener thread: ", t1);
439             }
440          }
441          else if (elThread != null)
442             log.warn("Connection failure, already in the exception listener", excep);
443          else
444             log.warn("Connection failure, use javax.jms.Connection.setExceptionListener() to handle this error and reconnect", excep);
445       }
446    }
447
448    /**
449      * Invoked when the server pong us
450      *
451      * @param serverTime the server time
452      */

453    public void asynchPong(long serverTime)
454    {
455       if (trace)
456          log.trace("PONG serverTime=" + serverTime + " " + this);
457       ponged = true;
458    }
459
460    /**
461      * Called by a TemporaryDestination which is going to be deleted
462      *
463      * @param dest the temporary destination
464      * @exception JMSException for any error
465      */

466    public void deleteTemporaryDestination(SpyDestination dest) throws JMSException JavaDoc
467    {
468       checkClosed();
469       if (trace)
470          log.trace("DeleteDestination dest=" + dest + " " + this);
471       try
472       {
473          //Ask the broker to delete() this TemporaryDestination
474
serverIL.deleteTemporaryDestination(connectionToken, dest);
475
476          //Remove it from the destinations list
477
synchronized (subscriptions)
478          {
479             destinationSubscriptions.remove(dest);
480          }
481
482          // Remove it from the temps list
483
synchronized (temps)
484          {
485             temps.remove(dest);
486          }
487       }
488       catch (Throwable JavaDoc t)
489       {
490          
491          SpyJMSException.rethrowAsJMSException("Cannot delete the TemporaryDestination", t);
492       }
493    }
494
495    public void setClientID(String JavaDoc cID) throws JMSException JavaDoc
496    {
497       checkClosed();
498       if (clientID != null)
499          throw new IllegalStateException JavaDoc("The connection has already a clientID");
500       if (setClientIdAllowed == false)
501          throw new IllegalStateException JavaDoc("SetClientID was not called emediately after creation of connection");
502
503       if (trace)
504          log.trace("SetClientID clientID=" + clientID + " " + this);
505
506       try
507       {
508          serverIL.checkID(cID);
509       }
510       catch (Throwable JavaDoc t)
511       {
512          SpyJMSException.rethrowAsJMSException("Cannot connect to the JMSServer", t);
513       }
514
515       clientID = cID;
516       connectionToken.setClientID(clientID);
517    }
518
519    public String JavaDoc getClientID() throws JMSException JavaDoc
520    {
521       checkClosed();
522       return clientID;
523    }
524
525    public ExceptionListener JavaDoc getExceptionListener() throws JMSException JavaDoc
526    {
527       checkClosed();
528       checkClientID();
529       return exceptionListener;
530    }
531
532    public void setExceptionListener(ExceptionListener JavaDoc listener) throws JMSException JavaDoc
533    {
534       checkClosed();
535       checkClientID();
536
537       exceptionListener = listener;
538    }
539
540    public ConnectionMetaData JavaDoc getMetaData() throws JMSException JavaDoc
541    {
542       checkClosed();
543       checkClientID();
544
545       return new SpyConnectionMetaData();
546    }
547
548    public synchronized void close() throws JMSException JavaDoc
549    {
550       if (closed.get())
551          return;
552       if (trace)
553          log.trace("Closing connection " + this);
554       
555       closing.set(true);
556
557       // We don't want to notify the exception listener
558
exceptionListener = null;
559
560       // The first exception
561
JMSException JavaDoc exception = null;
562
563       try
564       {
565          doStop();
566       }
567       catch (Throwable JavaDoc t)
568       {
569          log.trace("Error during stop", t);
570       }
571       
572       if (trace)
573          log.trace("Closing sessions " + this);
574       Object JavaDoc[] vect = null;
575       synchronized (createdSessions)
576       {
577          vect = createdSessions.toArray();
578       }
579       for (int i = 0; i < vect.length; i++)
580       {
581          SpySession session = (SpySession) vect[i];
582          try
583          {
584             session.close();
585          }
586          catch (Throwable JavaDoc t)
587          {
588             if (trace)
589                log.trace("Error closing session " + session, t);
590          }
591       }
592       if (trace)
593          log.trace("Closed sessions " + this);
594
595       if (trace)
596          log.trace("Notifying the server of close " + this);
597       try
598       {
599          serverIL.connectionClosing(connectionToken);
600       }
601       catch (Throwable JavaDoc t)
602       {
603          log.trace("Cannot close properly the connection", t);
604       }
605
606       if (trace)
607          log.trace("Stopping ping thread " + this);
608       try
609       {
610          stopPingThread();
611       }
612       catch (Throwable JavaDoc t)
613       {
614          if (exception == null)
615             exception = SpyJMSException.getAsJMSException("Cannot stop the ping thread", t);
616       }
617
618       if (trace)
619          log.trace("Stopping the ClientIL service " + this);
620       try
621       {
622          stopILService();
623       }
624       catch (Throwable JavaDoc t)
625       {
626          log.trace("Cannot stop the client il service", t);
627       }
628
629       // Only set the closed flag after all the objects that depend
630
// on this connection have been closed.
631
closed.set(true);
632
633       if (trace)
634          log.trace("Disconnected from server " + this);
635
636       // Throw the first exception
637
if (exception != null)
638          throw exception;
639    }
640
641    public void start() throws JMSException JavaDoc
642    {
643       checkClosed();
644       checkClientID();
645
646       if (modeStop == false)
647          return;
648       modeStop = false;
649
650       if (trace)
651          log.trace("Starting connection " + this);
652
653       try
654       {
655          serverIL.setEnabled(connectionToken, true);
656       }
657       catch (Throwable JavaDoc t)
658       {
659          SpyJMSException.rethrowAsJMSException("Cannot enable the connection with the JMS server", t);
660       }
661    }
662
663    public void stop() throws JMSException JavaDoc
664    {
665       checkClosed();
666       checkClientID();
667       doStop();
668    }
669
670    public String JavaDoc toString()
671    {
672       StringBuffer JavaDoc buffer = new StringBuffer JavaDoc();
673       buffer.append("Connection@").append(System.identityHashCode(this));
674       buffer.append('[');
675       if (connectionToken != null)
676          buffer.append("token=").append(connectionToken);
677       else
678          buffer.append("clientID=").append(clientID);
679       if (closed.get())
680          buffer.append(" CLOSED");
681       else if (closing.get())
682          buffer.append(" CLOSING");
683       buffer.append(" rcvstate=");
684       if (modeStop)
685          buffer.append("STOPPED");
686       else
687          buffer.append("STARTED");
688       buffer.append(']');
689       return buffer.toString();
690    }
691
692    /**
693      * Get the next message id
694      * <p>
695      *
696      * All longs are less than 22 digits long
697      * <p>
698      *
699      * Note that in this routine we assume that System.currentTimeMillis() is
700      * non-negative always be non-negative (so don't set lastMessageID to a
701      * positive for a start).
702      *
703      * @return the next message id
704      * @throws JMSException for any error
705      */

706    String JavaDoc getNewMessageID() throws JMSException JavaDoc
707    {
708       checkClosed();
709       synchronized (sb)
710       {
711          sb.setLength(0);
712          sb.append(clientID);
713          sb.append('-');
714          long time = System.currentTimeMillis();
715          int count = 0;
716          do
717          {
718             charStack[count] = (char) ('0' + (time % 10));
719             time = time / 10;
720             ++count;
721          }
722          while (time != 0);
723          --count;
724          for (; count >= 0; --count)
725          {
726             sb.append(charStack[count]);
727          }
728          ++lastMessageID;
729          //avoid having to deal with negative numbers.
730
if (lastMessageID < 0)
731          {
732             lastMessageID = 0;
733          }
734          int id = lastMessageID;
735          count = 0;
736          do
737          {
738             charStack[count] = (char) ('0' + (id % 10));
739             id = id / 10;
740             ++count;
741          }
742          while (id != 0);
743          --count;
744          for (; count >= 0; --count)
745          {
746             sb.append(charStack[count]);
747          }
748          return sb.toString();
749       }
750    }
751
752    /**
753      * A new Consumer has been created.
754      * <p>
755      * We have to handle security issues, a consumer may actually not be allowed
756      * to be created
757      *
758      * @param consumer the consumer added
759      * @throws JMSException for any error
760      */

761    void addConsumer(SpyConsumer consumer) throws JMSException JavaDoc
762    {
763       checkClosed();
764       Subscription req = consumer.getSubscription();
765       synchronized (subCountLock)
766       {
767          req.subscriptionId = subscriptionCounter++;
768       }
769       req.connectionToken = connectionToken;
770       if (trace)
771          log.trace("addConsumer sub=" + req);
772
773       try
774       {
775          synchronized (subscriptions)
776          {
777             subscriptions.put(new Integer JavaDoc(req.subscriptionId), consumer);
778
779             LinkedList JavaDoc ll = (LinkedList JavaDoc) destinationSubscriptions.get(req.destination);
780             if (ll == null)
781             {
782                ll = new LinkedList JavaDoc();
783                destinationSubscriptions.put(req.destination, ll);
784             }
785
786             ll.add(consumer);
787          }
788
789          serverIL.subscribe(connectionToken, req);
790       }
791       catch (JMSSecurityException JavaDoc ex)
792       {
793          removeConsumerInternal(consumer);
794          throw ex;
795       }
796       catch (Throwable JavaDoc t)
797       {
798          SpyJMSException.rethrowAsJMSException("Cannot subscribe to this Destination: ", t);
799       }
800    }
801
802    /**
803      * Browse a queue
804      *
805      * @param queue the queue
806      * @param selector the selector
807      * @return an array of messages
808      * @exception JMSException for any error
809      */

810    SpyMessage[] browse(Queue JavaDoc queue, String JavaDoc selector) throws JMSException JavaDoc
811    {
812       checkClosed();
813       if (trace)
814          log.trace("Browsing queue=" + queue + " selector=" + selector + " " + this);
815
816       try
817       {
818          return serverIL.browse(connectionToken, queue, selector);
819       }
820       catch (Throwable JavaDoc t)
821       {
822          SpyJMSException.rethrowAsJMSException("Cannot browse the Queue", t);
823          throw new UnreachableStatementException();
824       }
825    }
826
827    /**
828      * Ping the server
829      *
830      * @param clientTime the start of the ping
831      * @throws JMSException for any error
832      */

833    void pingServer(long clientTime) throws JMSException JavaDoc
834    {
835       checkClosed();
836       trace = log.isTraceEnabled();
837       if (trace)
838          log.trace("PING " + clientTime + " " + this);
839
840       try
841       {
842          serverIL.ping(connectionToken, clientTime);
843       }
844       catch (Throwable JavaDoc t)
845       {
846          SpyJMSException.rethrowAsJMSException("Cannot ping the JMS server", t);
847       }
848    }
849
850    /**
851      * Receive a message
852      *
853      * @param sub the subscription
854      * @param wait the wait time
855      * @return the message or null if there isn't one
856      * @throws JMSException for any error
857      */

858    SpyMessage receive(Subscription sub, long wait) throws JMSException JavaDoc
859    {
860       checkClosed();
861       if (trace)
862          log.trace("Receive subscription=" + sub + " wait=" + wait);
863
864       try
865       {
866          SpyMessage message = serverIL.receive(connectionToken, sub.subscriptionId, wait);
867          if (message != null)
868             message.createAcknowledgementRequest(sub.subscriptionId);
869          return message;
870       }
871       catch (Throwable JavaDoc t)
872       {
873          SpyJMSException.rethrowAsJMSException("Cannot receive ", t);
874          throw new UnreachableStatementException();
875       }
876    }
877
878    /**
879     * Remove a consumer
880     *
881     * @param consumer the consumer
882     * @throws JMSException for any error
883     */

884    void removeConsumer(SpyConsumer consumer) throws JMSException JavaDoc
885    {
886       checkClosed();
887       Subscription req = consumer.getSubscription();
888       if (trace)
889          log.trace("removeConsumer req=" + req);
890
891       try
892       {
893          serverIL.unsubscribe(connectionToken, req.subscriptionId);
894
895          removeConsumerInternal(consumer);
896       }
897       catch (Throwable JavaDoc t)
898       {
899          SpyJMSException.rethrowAsJMSException("Cannot unsubscribe to this destination", t);
900       }
901
902    }
903
904    /**
905     * Send a message to the server
906     *
907     * @param mes the message
908     * @throws JMSException for any error
909     */

910    void sendToServer(SpyMessage mes) throws JMSException JavaDoc
911    {
912       checkClosed();
913       if (trace)
914          log.trace("SendToServer message=" + mes.header.jmsMessageID + " " + this);
915       
916       try
917       {
918          serverIL.addMessage(connectionToken, mes);
919       }
920       catch (Throwable JavaDoc t)
921       {
922          SpyJMSException.rethrowAsJMSException("Cannot send a message to the JMS server", t);
923       }
924    }
925
926    /**
927     * Closing a session
928     *
929     * @param who the session
930     */

931    void sessionClosing(SpySession who)
932    {
933       if (trace)
934          log.trace("Closing session " + who);
935       
936       synchronized (createdSessions)
93