KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > objectweb > joram > client > jms > Session


1 /*
2  * JORAM: Java(TM) Open Reliable Asynchronous Messaging
3  * Copyright (C) 2001 - 2006 ScalAgent Distributed Technologies
4  * Copyright (C) 1996 - 2000 Dyade
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Lesser General Public
8  * License as published by the Free Software Foundation; either
9  * version 2.1 of the License, or any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14  * Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General Public
17  * License along with this library; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
19  * USA.
20  *
21  * Initial developer(s): Frederic Maistre (INRIA)
22  * Contributor(s): ScalAgent Distributed Technologies
23  */

24 package org.objectweb.joram.client.jms;
25
26 import java.util.*;
27
28 import javax.jms.JMSException JavaDoc;
29 import javax.jms.TransactionRolledBackException JavaDoc;
30 import javax.jms.IllegalStateException JavaDoc;
31 import javax.jms.MessageFormatException JavaDoc;
32
33 import org.objectweb.joram.client.jms.connection.RequestMultiplexer;
34 import org.objectweb.joram.client.jms.connection.Requestor;
35
36 import org.objectweb.joram.shared.client.*;
37
38 import org.objectweb.util.monolog.api.BasicLevel;
39 import org.objectweb.util.monolog.api.Logger;
40 import fr.dyade.aaa.util.Debug;
41
42 /**
43  * Implements the <code>javax.jms.Session</code> interface.
44  */

45 public class Session implements javax.jms.Session JavaDoc {
46
47   public static Logger logger = Debug.getLogger(Session.class.getName());
48   
49   
50   public static final String JavaDoc RECEIVE_ACK =
51       "org.objectweb.joram.client.jms.receiveAck";
52
53   public static boolean receiveAck = Boolean.getBoolean(RECEIVE_ACK);
54
55   /**
56    * Status of the session
57    */

58   private static class Status {
59     /**
60      * Status of the session when the connection is stopped.
61      * This is the initial status.
62      */

63     public static final int STOP = 0;
64
65     /**
66      * Status of the session when the connection is started.
67      */

68     public static final int START = 1;
69
70     /**
71      * Status of the connection when it is closed.
72      */

73     public static final int CLOSE = 2;
74
75     private static final String JavaDoc[] names = {
76       "STOP", "START", "CLOSE"};
77
78     public static String JavaDoc toString(int status) {
79       return names[status];
80     }
81   }
82
83   /**
84    * The way the session is used.
85    */

86   private static class SessionMode {
87     /**
88      * The session is still not used.
89      * This is the initial mode.
90      */

91     public static final int NONE = 0;
92
93     /**
94      * The session is used to synchronously receive messages.
95      */

96     public static final int RECEIVE = 1;
97
98     /**
99      * The session is used to asynchronously listen to messages.
100      */

101     public static final int LISTENER = 2;
102
103     /**
104      * The session is used by an application server.
105      */

106     public static final int APP_SERVER = 3;
107
108     private static final String JavaDoc[] names = {
109       "NONE", "RECEIVE", "LISTENER", "APP_SERVER"};
110
111     public static String JavaDoc toString(int status) {
112       return names[status];
113     }
114   }
115
116   /**
117    * The status of the current request.
118    * Only valid in the mode RECEIVE.
119    */

120   private static class RequestStatus {
121     /** No request. This is the initial status. */
122     public static final int NONE = 0;
123     /** A request is running (pending). */
124     public static final int RUN = 1;
125     /** The request is done. */
126     public static final int DONE = 2;
127
128     private static final String JavaDoc[] names = {
129       "NONE", "RUN", "DONE"};
130
131     public static String JavaDoc toString(int status) {
132       return names[status];
133     }
134   }
135
136   /** Task for closing the session if it becomes pending. */
137   private SessionCloseTask closingTask;
138
139   /** <code>true</code> if the session's transaction is scheduled. */
140   private boolean scheduled;
141
142   /** The message listener of the session, if any. */
143   protected javax.jms.MessageListener JavaDoc messageListener;
144
145   /** The identifier of the session. */
146   private String JavaDoc ident;
147
148   /** The connection the session belongs to. */
149   private Connection cnx;
150
151   /** <code>true</code> if the session is transacted. */
152   boolean transacted;
153
154   /** The acknowledgement mode of the session. */
155   private int acknowledgeMode;
156
157   /** <code>true</code> if the session's acknowledgements are automatic. */
158   private boolean autoAck;
159
160   /** Vector of message consumers. */
161   private Vector consumers;
162
163   /** Vector of message producers. */
164   private Vector producers;
165
166   /** Vector of queue browsers. */
167   private Vector browsers;
168
169   /** FIFO queue holding the asynchronous server deliveries. */
170   private fr.dyade.aaa.util.Queue repliesIn;
171
172   /** Daemon distributing asynchronous server deliveries. */
173   private SessionDaemon daemon;
174
175   /** Counter of message listeners. */
176   private int listenerCount;
177
178   /**
179    * Table holding the <code>ProducerMessages</code> holding producers'
180    * messages and destinated to be sent at commit.
181    * <p>
182    * <b>Key:</b> destination name<br>
183    * <b>Object:</b> <code>ProducerMessages</code>
184    */

185   Hashtable sendings;
186
187   /**
188    * Table holding the identifiers of the messages delivered per
189    * destination or subscription, and not acknowledged.
190    * <p>
191    * <b>Key:</b> destination or subscription name<br>
192    * <b>Object:</b> <code>MessageAcks</code> instance
193    */

194   Hashtable deliveries;
195
196   /**
197    * The request multiplexer used to communicate with the user proxy.
198    */

199   private RequestMultiplexer mtpx;
200
201   /**
202    * The requestor used by the session to communicate with the user proxy.
203    */

204   private Requestor requestor;
205
206   /**
207    * The requestor used by the session to make 'receive' with the user
208    * proxy. This second requestor is necessary because it must be closed
209    * during the session close (see method close).
210    */

211   private Requestor receiveRequestor;
212
213   /**
214    * Indicates that the session has been recovered by a message listener.
215    * Doesn't need to be volatile because it is only used by the SessionDaemon
216    * thread.
217    */

218   private boolean recover;
219
220   /**
221    * Status of the session: STOP, START, CLOSE
222    */

223   private int status;
224
225   /**
226    * Mode of the session: NONE, RECEIVE, LISTENER, APP_SERVER
227    */

228   private int sessionMode;
229
230   /**
231    * Status of the request: NONE, RUN, DONE.
232    */

233   private int requestStatus;
234
235   /**
236    * The message consumer currently making a request (null if none).
237    */

238   private MessageConsumer pendingMessageConsumer;
239
240   /**
241    * The current active control thread.
242    */

243   private Thread JavaDoc singleThreadOfControl;
244
245   /**
246    * Status boolean indicating whether the message input is activated or not
247    * for the message listeners.
248    */

249   private boolean passiveMsgInput;
250   
251   /**
252    * Used to synchronize the method close()
253    */

254   private Closer closer;
255   
256   /**
257    * Indicates whether the messages produced are asynchronously
258    * sent or not (without or with acknowledgement)
259    */

260   private boolean asyncSend;
261
262   /**
263    * Maximum number of messages that can be read at once from a queue.
264    */

265   private int queueMessageReadMax;
266   
267   /**
268    * Maximum number of acknowledgements that can be buffered in
269    * Session.DUPS_OK_ACKNOWLEDGE mode, default is 0.
270    */

271   private int topicAckBufferMax;
272   
273   /**
274    * This threshold is the maximum messages number over which the
275    * subscription is passivated.
276    */

277   private int topicPassivationThreshold;
278   
279   /**
280    * This threshold is the minimum messages number below which
281    * the subscription is activated.
282    */

283   private int topicActivationThreshold;
284   
285   private MessageConsumerListener messageConsumerListener;
286   
/**
 * Opens a session.
 *
 * @param cnx The connection the session belongs to.
 * @param transacted <code>true</code> for a transacted session.
 * @param acknowledgeMode 1 (auto), 2 (client) or 3 (dups ok).
 * @param mtpx The request multiplexer used to build the session's
 *             requestors.
 *
 * @exception JMSException In case of an invalid acknowledge mode.
 */
Session(Connection cnx,
        boolean transacted,
        int acknowledgeMode,
        RequestMultiplexer mtpx)
  throws JMSException {
  // The acknowledge mode is only validated for a non transacted session
  // opened on a connection that is not an XA connection.
  if (! transacted
      && acknowledgeMode != javax.jms.Session.AUTO_ACKNOWLEDGE
      && acknowledgeMode != javax.jms.Session.CLIENT_ACKNOWLEDGE
      && acknowledgeMode != javax.jms.Session.DUPS_OK_ACKNOWLEDGE
      && !(cnx instanceof XAQueueConnection)
      && !(cnx instanceof XATopicConnection)
      && !(cnx instanceof XAConnection))
    throw new JMSException("Can't create a non transacted session with an"
                           + " invalid acknowledge mode.");

  this.ident = cnx.nextSessionId();
  this.cnx = cnx;
  this.transacted = transacted;
  this.acknowledgeMode = acknowledgeMode;
  this.mtpx = mtpx;
  requestor = new Requestor(mtpx);
  receiveRequestor = new Requestor(mtpx);

  // Acknowledgements are automatic unless the session is transacted or
  // in CLIENT_ACKNOWLEDGE mode.
  autoAck = ! transacted
    && acknowledgeMode != javax.jms.Session.CLIENT_ACKNOWLEDGE;

  consumers = new Vector();
  producers = new Vector();
  browsers = new Vector();
  repliesIn = new fr.dyade.aaa.util.Queue();
  sendings = new Hashtable();
  deliveries = new Hashtable();

  closer = new Closer();

  // If the session is transacted and the transactions limited by a timer,
  // a closing task might be useful.
  if (transacted && cnx.getTxPendingTimer() > 0) {
    closingTask = new SessionCloseTask(
      cnx.getTxPendingTimer() * 1000);
  }

  // Sending and delivery parameters are taken from the connection.
  asyncSend = cnx.getAsyncSend();
  queueMessageReadMax = cnx.getQueueMessageReadMax();
  topicAckBufferMax = cnx.getTopicAckBufferMax();
  topicActivationThreshold = cnx.getTopicActivationThreshold();
  topicPassivationThreshold = cnx.getTopicPassivationThreshold();

  setStatus(Status.STOP);
  setSessionMode(SessionMode.NONE);
  setRequestStatus(RequestStatus.NONE);
}
348
349   /**
350    * Sets the status of the session.
351    */

352   private void setStatus(int status) {
353     if (logger.isLoggable(BasicLevel.DEBUG))
354       logger.log(
355         BasicLevel.DEBUG,
356         "Session.setStatus(" +
357         Status.toString(status) + ')');
358     this.status = status;
359   }
360
361   boolean isStarted() {
362     return (status == Status.START);
363   }
364
365   /**
366    * Sets the session mode.
367    */

368   private void setSessionMode(int sessionMode) {
369     if (logger.isLoggable(BasicLevel.DEBUG))
370       logger.log(
371         BasicLevel.DEBUG,
372         "Session.setSessionMode(" +
373         SessionMode.toString(sessionMode) + ')');
374     this.sessionMode = sessionMode;
375   }
376
377   /**
378    * Sets the request status.
379    */

380   private void setRequestStatus(int requestStatus) {
381     if (logger.isLoggable(BasicLevel.DEBUG))
382       logger.log(
383         BasicLevel.DEBUG,
384         "Session.setRequestStatus(" +
385         RequestStatus.toString(requestStatus) + ')');
386     this.requestStatus = requestStatus;
387   }
388   
389   /**
390    * Checks if the session is closed.
391    * If true, an IllegalStateException is raised.
392    */

393   protected synchronized void checkClosed()
394     throws IllegalStateException JavaDoc {
395     if (status == Status.CLOSE)
396       throw new IllegalStateException JavaDoc(
397         "Forbidden call on a closed session.");
398   }
399
400   /**
401    * Checks if the calling thread is the thread of control. If not,
402    * an IllegalStateException is raised.
403    */

404   private synchronized void checkThreadOfControl()
405     throws IllegalStateException JavaDoc {
406     if (singleThreadOfControl != null &&
407         Thread.currentThread() != singleThreadOfControl)
408       throw new IllegalStateException JavaDoc("Illegal control thread");
409   }
410
411   /**
412    * Checks the session mode. If it is not the expected session mode,
413    * raises an IllegalStateException.
414    *
415    * @param expectedSessionMode the expected session mode.
416    */

417   private void checkSessionMode(
418     int expectedSessionMode)
419     throws IllegalStateException JavaDoc {
420     if (sessionMode == SessionMode.NONE) {
421       setSessionMode(sessionMode);
422     } else if (sessionMode != expectedSessionMode) {
423       throw new IllegalStateException JavaDoc("Bad session mode");
424     }
425   }
426
427   /** Returns a String image of this session. */
428   public String JavaDoc toString() {
429     return "Sess:" + ident;
430   }
431
432   /**
433    * API method.
434    *
435    * @exception JMSException Actually never thrown.
436    */

437   public final int getAcknowledgeMode() throws JMSException JavaDoc {
438     checkClosed();
439     return getAckMode();
440   }
441   
442   int getAckMode() {
443     if (transacted)
444       return Session.SESSION_TRANSACTED;
445     return acknowledgeMode;
446   }
447
448   /**
449    * API method.
450    *
451    * @exception IllegalStateException If the session is closed.
452    */

453   public synchronized final boolean getTransacted()
454     throws JMSException JavaDoc {
455     checkClosed();
456     return transacted;
457   }
458
459   /**
460    * set transacted.
461    * see connector ManagedConnectionImpl (Connector).
462    */

463   public void setTransacted(boolean t) {
464     if (status != Status.CLOSE) {
465       transacted = t;
466     }
467     // else should throw an exception but not expected in
468
// the connector.
469
}
470
471   /**
472    * API method.
473    *
474    * @exception JMSException Actually never thrown.
475    */

476   public synchronized void setMessageListener(
477     javax.jms.MessageListener JavaDoc messageListener)
478     throws JMSException JavaDoc {
479     checkSessionMode(SessionMode.APP_SERVER);
480     this.messageListener = messageListener;
481   }
482
483   /**
484    * API method.
485    *
486    * @exception JMSException Actually never thrown.
487    */

488   public synchronized javax.jms.MessageListener JavaDoc
489       getMessageListener()
490     throws JMSException JavaDoc {
491     return messageListener;
492   }
493
494   /**
495    * Creates a Message object.
496    * API method.
497    *
498    * @exception IllegalStateException If the session is closed.
499    */

500   public synchronized javax.jms.Message JavaDoc createMessage()
501     throws JMSException JavaDoc {
502     checkClosed();
503     return new Message();
504   }
505
506   /**
507    * Creates a <code>TextMessage</code> object.
508    * API method.
509    *
510    * @exception IllegalStateException If the session is closed.
511    */

512   public synchronized javax.jms.TextMessage JavaDoc createTextMessage()
513     throws JMSException JavaDoc {
514     checkClosed();
515     return new TextMessage();
516   }
517
518   /**
519    * Creates a <code>TextMessage</code> object with the specified text.
520    * API method.
521    *
522    * @exception IllegalStateException If the session is closed.
523    */

524   public synchronized javax.jms.TextMessage JavaDoc createTextMessage(String JavaDoc text)
525     throws JMSException JavaDoc {
526     checkClosed();
527     TextMessage message = new TextMessage();
528     message.setText(text);
529     return message;
530   }
531   
532   /**
533    * Creates a <code>BytesMessage</code> object.
534    * API method.
535    *
536    * @exception IllegalStateException If the session is closed.
537    */

538   public synchronized javax.jms.BytesMessage JavaDoc createBytesMessage()
539     throws JMSException JavaDoc {
540     checkClosed();
541     return new BytesMessage();
542   }
543
544   /**
545    * Creates a <code>MapMessage</code> object.
546    * API method.
547    *
548    * @exception IllegalStateException If the session is closed.
549    */

550   public synchronized javax.jms.MapMessage JavaDoc createMapMessage()
551     throws JMSException JavaDoc {
552     checkClosed();
553     return new MapMessage();
554   }
555
556   /**
557    * Creates a <code>ObjectMessage</code> object.
558    * API method.
559    *
560    * @exception IllegalStateException If the session is closed.
561    */

562   public synchronized javax.jms.ObjectMessage JavaDoc createObjectMessage()
563     throws JMSException JavaDoc {
564     checkClosed();
565     return new ObjectMessage();
566   }
567
568   /**
569    * Creates a <code>ObjectMessage</code> object.
570    * API method.
571    *
572    * @exception IllegalStateException If the session is closed.
573    */

574   public synchronized javax.jms.ObjectMessage JavaDoc createObjectMessage(
575     java.io.Serializable JavaDoc obj)
576     throws JMSException JavaDoc {
577     checkClosed();
578     ObjectMessage message = new ObjectMessage();
579     message.setObject(obj);
580     return message;
581   }
582
583   /**
584    * Creates a <code>StreamMessage</code> object.
585    * API method.
586    *
587    * @exception IllegalStateException If the session is closed.
588    */

589   public synchronized javax.jms.StreamMessage JavaDoc createStreamMessage()
590     throws JMSException JavaDoc {
591     checkClosed();
592     return new StreamMessage();
593   }
594
595   /**
596    * API method
597    *
598    * @exception IllegalStateException If the session is closed.
599    */

600   public synchronized javax.jms.QueueBrowser JavaDoc
601       createBrowser(javax.jms.Queue JavaDoc queue,
602                     String JavaDoc selector)
603     throws JMSException JavaDoc {
604     checkClosed();
605     checkThreadOfControl();
606     QueueBrowser qb = new QueueBrowser(this, (Queue) queue, selector);
607     browsers.addElement(qb);
608     return qb;
609   }
610
611   /**
612    * API method
613    *
614    * @exception IllegalStateException If the session is closed.
615    */

616   public synchronized javax.jms.QueueBrowser JavaDoc
617       createBrowser(javax.jms.Queue JavaDoc queue)
618     throws JMSException JavaDoc {
619     checkClosed();
620     checkThreadOfControl();
621     QueueBrowser qb = new QueueBrowser(this, (Queue) queue, null);
622     browsers.addElement(qb);
623     return qb;
624   }
625
626   /**
627    * Creates a MessageProducer to send messages to the specified destination.
628    * API method.
629    *
630    * @exception IllegalStateException If the session is closed or if the
631    * connection is broken.
632    * @exception JMSException If the creation fails for any other reason.
633    */

634   public synchronized javax.jms.MessageProducer JavaDoc createProducer(
635     javax.jms.Destination JavaDoc dest)
636     throws JMSException JavaDoc {
637     checkClosed();
638     checkThreadOfControl();
639     MessageProducer mp = new MessageProducer(
640       this,
641       (Destination) dest);
642     addProducer(mp);
643     return mp;
644   }
645
646   /**
647    * Creates a MessageConsumer for the specified destination using a
648    * message selector.
649    * API method.
650    *
651    * @exception IllegalStateException If the session is closed or if the
652    * connection is broken.
653    * @exception JMSException If the creation fails for any other reason.
654    */

655   public synchronized javax.jms.MessageConsumer JavaDoc
656       createConsumer(javax.jms.Destination JavaDoc dest,
657                      String JavaDoc selector,
658                      boolean noLocal)
659     throws JMSException JavaDoc {
660     checkClosed();
661     checkThreadOfControl();
662     MessageConsumer mc = new MessageConsumer(
663       this, (Destination) dest,
664       selector, null,
665       noLocal);
666     addConsumer(mc);
667     return mc;
668   }
669
670   /**
671    * Creates a MessageConsumer for the specified destination using a
672    * message selector.
673    * API method.
674    *
675    * @exception IllegalStateException If the session is closed or if the
676    * connection is broken.
677    * @exception JMSException If the creation fails for any other reason.
678    */

679   public synchronized javax.jms.MessageConsumer JavaDoc
680       createConsumer(javax.jms.Destination JavaDoc dest,
681                      String JavaDoc selector)
682     throws JMSException JavaDoc {
683     checkClosed();
684     checkThreadOfControl();
685     MessageConsumer mc = new MessageConsumer(
686       this, (Destination) dest, selector);
687     addConsumer(mc);
688     return mc;
689   }
690
691   /**
692    * Creates a MessageConsumer for the specified destination.
693    * API method.
694    *
695    * @exception IllegalStateException If the session is closed or if the
696    * connection is broken.
697    * @exception JMSException If the creation fails for any other reason.
698    */

699   public synchronized javax.jms.MessageConsumer JavaDoc
700       createConsumer(javax.jms.Destination JavaDoc dest)
701     throws JMSException JavaDoc {
702     checkClosed();
703     checkThreadOfControl();
704     MessageConsumer mc = new MessageConsumer(
705       this, (Destination) dest, null);
706     addConsumer(mc);
707     return mc;
708   }
709
710   /**
711    * API method.
712    *
713    * @exception IllegalStateException If the session is closed or if the
714    * connection is broken.
715    * @exception JMSException If the creation fails for any other reason.
716    */

717   public synchronized javax.jms.TopicSubscriber JavaDoc
718       createDurableSubscriber(javax.jms.Topic JavaDoc topic,
719                               String JavaDoc name,
720                               String JavaDoc selector,
721                               boolean noLocal)
722     throws JMSException JavaDoc {
723     if (logger.isLoggable(BasicLevel.DEBUG))
724       logger.log(
725         BasicLevel.DEBUG,
726         "Session.createDurableSubscriber(" +
727         topic + ',' + name + ',' +
728         selector + ',' + noLocal + ')');
729     checkClosed();
730     checkThreadOfControl();
731     TopicSubscriber ts = new TopicSubscriber(
732       this, (Topic) topic, name, selector, noLocal);
733     addConsumer(ts);
734     return ts;
735   }
736
737   /**
738    * API method.
739    *
740    * @exception IllegalStateException If the session is closed or if the
741    * connection is broken.
742    * @exception JMSException If the creation fails for any other reason.
743    */

744   public synchronized javax.jms.TopicSubscriber JavaDoc
745       createDurableSubscriber(javax.jms.Topic JavaDoc topic,
746                               String JavaDoc name)
747     throws JMSException JavaDoc {
748     if (logger.isLoggable(BasicLevel.DEBUG))
749       logger.log(
750         BasicLevel.DEBUG,
751         "Session.createDurableSubscriber(" +
752         topic + ',' + name + ')');
753     checkClosed();
754     checkThreadOfControl();
755     TopicSubscriber ts = new TopicSubscriber(
756       this, (Topic) topic, name, null, false);
757     addConsumer(ts);
758     return ts;
759   }
760
761   /**
762    * API method.
763    *
764    * @exception IllegalStateException If the session is closed.
765    */

766   public synchronized javax.jms.Queue JavaDoc createQueue(
767     String JavaDoc queueName)
768     throws JMSException JavaDoc {
769     checkClosed();
770     return new Queue(queueName);
771   }
772
773   /**
774    * API method.
775    *
776    * @exception IllegalStateException If the session is closed.
777    * @exception JMSException If the topic creation failed.
778    */

779   public synchronized javax.jms.Topic JavaDoc createTopic(
780     String JavaDoc topicName)
781     throws JMSException JavaDoc {
782     checkClosed();
783     checkThreadOfControl();
784
785     // Checks if the topic to retrieve is the administration topic:
786
if (topicName.equals("#AdminTopic")) {
787       try {
788         GetAdminTopicReply reply =
789           (GetAdminTopicReply) requestor.request(new GetAdminTopicRequest());
790         if (reply.getId() != null)
791           return new Topic(reply.getId());
792         else
793           throw new JMSException JavaDoc("AdminTopic could not be retrieved.");
794       }
795       catch (JMSException JavaDoc exc) {
796         throw exc;
797       }
798       catch (Exception JavaDoc exc) {
799         throw new JMSException JavaDoc("AdminTopic could not be retrieved: " + exc);
800       }
801     }
802     return new Topic(topicName);
803   }
804
805   /**
806    * API method.
807    *
808    * @exception IllegalStateException If the session is closed or if the
809    * connection is broken.
810    * @exception JMSException If the request fails for any other reason.
811    */

812   public synchronized javax.jms.TemporaryQueue JavaDoc createTemporaryQueue()
813     throws JMSException JavaDoc {
814     checkClosed();
815     checkThreadOfControl();
816
817     SessCreateTDReply reply =
818       (SessCreateTDReply) requestor.request(new SessCreateTQRequest());
819     String JavaDoc tempDest = reply.getAgentId();
820     return new TemporaryQueue(tempDest, cnx);
821   }
822
823   /**
824    * API method.
825    *
826    * @exception IllegalStateException If the session is closed or if the
827    * connection is broken.
828    * @exception JMSException If the request fails for any other reason.
829    */

830   public synchronized javax.jms.TemporaryTopic JavaDoc createTemporaryTopic()
831     throws JMSException JavaDoc {
832     checkClosed();
833     checkThreadOfControl();
834
835     SessCreateTDReply reply =
836       (SessCreateTDReply) requestor.request(new SessCreateTTRequest());
837     String JavaDoc tempDest = reply.getAgentId();
838     return new TemporaryTopic(tempDest, cnx);
839   }
840
841   /** API method. */
842   public synchronized void run() {
843     int load = repliesIn.size();
844
845     if (logger.isLoggable(BasicLevel.DEBUG))
846       logger.log(BasicLevel.DEBUG,
847                  "-- " + this + ": loaded with " + load +
848                  " message(s) and started.");
849
850     try {
851       // Processing the current number of messages in the queue:
852
for (int i = 0; i < load; i++) {
853         org.objectweb.joram.shared.messages.Message momMsg =
854           (org.objectweb.joram.shared.messages.Message) repliesIn.pop();
855         String JavaDoc msgId = momMsg.id;
856         
857         onMessage(momMsg, messageConsumerListener);
858       }
859     } catch (Exception JavaDoc exc) {
860       if (logger.isLoggable(BasicLevel.ERROR))
861         logger.log(BasicLevel.ERROR, "", exc);
862     }
863   }
864   
865   /**
866    * Called by MultiSessionConsumer
867    * ASF mode
868    */

869   void setMessageConsumerListener(MessageConsumerListener mcl) {
870     messageConsumerListener = mcl;
871   }
872       
873   /**
874    * API method.
875    *
876    * @exception IllegalStateException If the session is closed, or not
877    * transacted, or if the connection is broken.
878    */

879   public synchronized void commit() throws JMSException JavaDoc {
880     if (logger.isLoggable(BasicLevel.DEBUG))
881       logger.log(
882         BasicLevel.DEBUG,
883         "Session.commit()");
884
885     checkClosed();
886     checkThreadOfControl();
887
888     if (! transacted)
889       throw new IllegalStateException JavaDoc("Can't commit a non transacted"
890                                       + " session.");
891
892     if (logger.isLoggable(BasicLevel.DEBUG))
893       logger.log(BasicLevel.DEBUG, "--- " + this
894                                  + ": committing...");
895
896     // If the transaction was scheduled: cancelling.
897
if (scheduled) {
898       closingTask.cancel();
899       scheduled = false;
900     }
901
902     // Sending client messages:
903
try {
904       CommitRequest commitReq= new CommitRequest();
905       
906       Enumeration producerMessages = sendings.elements();
907       while (producerMessages.hasMoreElements()) {
908         ProducerMessages pM =
909           (ProducerMessages) producerMessages.nextElement();
910         commitReq.addProducerMessages(pM);
911       }
912       sendings.clear();
913       
914       // Acknowledging the received messages:
915
Enumeration targets = deliveries.keys();
916       while (targets.hasMoreElements()) {
917         String JavaDoc target = (String JavaDoc) targets.nextElement();
918         MessageAcks acks = (MessageAcks) deliveries.get(target);
919         commitReq.addAckRequest(
920           new SessAckRequest(
921             target,
922             acks.getIds(),
923             acks.getQueueMode()));
924       }
925       deliveries.clear();
926       
927       if (asyncSend) {
928         // Asynchronous sending
929
commitReq.setAsyncSend(true);
930         mtpx.sendRequest(commitReq);
931       } else {
932         requestor.request(commitReq);
933       }
934
935       if (logger.isLoggable(BasicLevel.DEBUG))
936         logger.log(BasicLevel.DEBUG, this + ": committed.");
937     }
938     // Catching an exception if the sendings or acknowledgement went wrong:
939
catch (JMSException JavaDoc jE) {
940       if (logger.isLoggable(BasicLevel.ERROR))
941         logger.log(BasicLevel.ERROR, "", jE);
942       TransactionRolledBackException JavaDoc tE =
943         new TransactionRolledBackException JavaDoc("A JMSException was thrown during"
944                                            + " the commit.");
945       tE.setLinkedException(jE);
946
947       if (logger.isLoggable(BasicLevel.ERROR))
948         logger.log(BasicLevel.ERROR, "Exception: " + tE);
949
950       rollback();
951       throw tE;
952     }
953   }
954
955   /**
956    * API method.
957    *
958    * @exception IllegalStateException If the session is closed, or not
959    * transacted.
960    */

961   public synchronized void rollback() throws JMSException JavaDoc {
962     if (logger.isLoggable(BasicLevel.DEBUG))
963       logger.log(
964         BasicLevel.DEBUG,
965         "Session.rollback()");
966
967     checkClosed();
968     checkThreadOfControl();
969
970     if (! transacted)
971