KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > scalagent > kjoram > Session


/*
 * JORAM: Java(TM) Open Reliable Asynchronous Messaging
 * Copyright (C) 2001 - ScalAgent Distributed Technologies
 * Copyright (C) 1996 - Dyade
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
 * USA.
 *
 * Initial developer(s): Frederic Maistre (INRIA)
 * Contributor(s): Nicolas Tachker (ScalAgent)
 */

24 package com.scalagent.kjoram;
25
26 import com.scalagent.kjoram.jms.*;
27 import com.scalagent.kjoram.util.TimerTask;
28
29 import java.util.*;
30
31 import com.scalagent.kjoram.excepts.IllegalStateException;
32 import com.scalagent.kjoram.excepts.*;
33
34
35 public class Session
36 {
37   public static final int SESSION_TRANSACTED = 0;
38   public static final int AUTO_ACKNOWLEDGE = 1;
39   public static final int CLIENT_ACKNOWLEDGE = 2;
40   public static final int DUPS_OK_ACKNOWLEDGE = 3;
41   
42   /** Task for closing the session if it becomes pending. */
43   private TimerTask closingTask = null;
44   /** <code>true</code> if the session's transaction is scheduled. */
45   private boolean scheduled = false;
46
47   /** Timer for replying to expired consumers' requests. */
48   private com.scalagent.kjoram.util.Timer consumersTimer = null;
49
50   /** The message listener of the session, if any. */
51   protected MessageListener messageListener = null;
52
53   /** The identifier of the session. */
54   String JavaDoc ident;
55   /** The connection the session belongs to. */
56   Connection cnx;
57   /** <code>true</code> if the session is transacted. */
58   boolean transacted;
59   /** The acknowledgement mode of the session. */
60   int acknowledgeMode;
61   /** <code>true</code> if the session is closed. */
62   boolean closed = false;
63   /** <code>true</code> if the session is started. */
64   boolean started = false;
65
66   /** <code>true</code> if the session's acknowledgements are automatic. */
67   boolean autoAck;
68
69   /** Vector of message consumers. */
70   Vector consumers;
71   /** Vector of message producers. */
72   Vector producers;
73   /** Vector of queue browsers. */
74   Vector browsers;
75   /** FIFO queue holding the asynchronous server deliveries. */
76   com.scalagent.kjoram.util.Queue repliesIn;
77   /** Daemon distributing asynchronous server deliveries. */
78   SessionDaemon daemon = null;
79   /** Counter of message listeners. */
80   int msgListeners = 0;
81   /**
82    * Table holding the <code>ProducerMessages</code> holding producers'
83    * messages and destinated to be sent at commit.
84    * <p>
85    * <b>Key:</b> destination name<br>
86    * <b>Object:</b> <code>ProducerMessages</code>
87    */

88   Hashtable sendings;
89   /**
90    * Table holding the identifiers of the messages delivered per
91    * destination or subscription, and not acknowledged.
92    * <p>
93    * <b>Key:</b> destination or subscription name<br>
94    * <b>Object:</b> <code>MessageAcks</code> instance
95    */

96   Hashtable deliveries;
97
98   /** The connection consumer delivering messages to the session, if any. */
99   ConnectionConsumer connectionConsumer = null;
100
101
102   /**
103    * Opens a session.
104    *
105    * @param cnx The connection the session belongs to.
106    * @param transacted <code>true</code> for a transacted session.
107    * @param acknowledgeMode 1 (auto), 2 (client) or 3 (dups ok).
108    *
109    * @exception JMSException In case of an invalid acknowledge mode.
110    */

111   Session(Connection cnx, boolean transacted,
112           int acknowledgeMode) throws JMSException
113   {
114     if (! transacted
115         && acknowledgeMode != Session.AUTO_ACKNOWLEDGE
116         && acknowledgeMode != Session.CLIENT_ACKNOWLEDGE
117         && acknowledgeMode != Session.DUPS_OK_ACKNOWLEDGE)
118       throw new JMSException("Can't create a non transacted session with an"
119                              + " invalid acknowledge mode.");
120
121     this.ident = cnx.nextSessionId();
122     this.cnx = cnx;
123     this.transacted = transacted;
124     this.acknowledgeMode = acknowledgeMode;
125
126     autoAck = ! transacted
127               && acknowledgeMode != Session.CLIENT_ACKNOWLEDGE;
128
129     consumers = new Vector();
130     producers = new Vector();
131     browsers = new Vector();
132     repliesIn = new com.scalagent.kjoram.util.Queue();
133     sendings = new Hashtable();
134     deliveries = new Hashtable();
135
136     // If the session is transacted and the transactions limited by a timer,
137
// a closing task might be useful.
138
if (transacted && cnx.factoryParameters.txPendingTimer != 0)
139       closingTask = new SessionCloseTask();
140
141     cnx.sessions.addElement(this);
142
143     if (JoramTracing.dbgClient)
144       JoramTracing.log(JoramTracing.DEBUG,this + ": created.");
145   }
146
147   /** Returns a String image of this session. */
148   public String JavaDoc toString()
149   {
150     return "Sess:" + ident;
151   }
152
153
154   /**
155    * API method.
156    *
157    * @exception JMSException Actually never thrown.
158    */

159   public int getAcknowledgeMode() throws JMSException
160   {
161     return acknowledgeMode;
162   }
163
164   /**
165    * API method.
166    *
167    * @exception JMSException Actually never thrown.
168    */

169   public boolean getTransacted() throws JMSException
170   {
171     return transacted;
172   }
173
174   /**
175    * API method.
176    *
177    * @exception JMSException Actually never thrown.
178    */

179   public void setMessageListener(MessageListener messageListener)
180               throws JMSException
181   {
182     this.messageListener = messageListener;
183   }
184
185   /**
186    * API method.
187    *
188    * @exception JMSException Actually never thrown.
189    */

190   public MessageListener getMessageListener() throws JMSException
191   {
192     return messageListener;
193   }
194
195   /**
196    * API method.
197    *
198    * @exception IllegalStateException If the session is closed.
199    */

200   public Message createMessage() throws JMSException
201   {
202     if (closed)
203       throw new IllegalStateException JavaDoc("Forbidden call on a closed session.");
204     
205     return new Message();
206   }
207
208   /**
209    * API method.
210    *
211    * @exception IllegalStateException If the session is closed.
212    */

213   public TextMessage createTextMessage() throws JMSException
214   {
215     if (closed)
216       throw new IllegalStateException JavaDoc("Forbidden call on a closed session.");
217     
218     return new TextMessage();
219   }
220
221   /**
222    * API method.
223    *
224    * @exception IllegalStateException If the session is closed.
225    */

226   public TextMessage createTextMessage(String JavaDoc text)
227          throws JMSException
228   {
229     if (closed)
230       throw new IllegalStateException JavaDoc("Forbidden call on a closed session.");
231    
232     TextMessage message = new TextMessage();
233     message.setText(text);
234     return message;
235   }
236
237   /**
238    * API method.
239    *
240    * @exception IllegalStateException If the session is closed.
241    */

242   public BytesMessage createBytesMessage()
243          throws JMSException
244   {
245     if (closed)
246       throw new IllegalStateException JavaDoc("Forbidden call on a closed session.");
247     
248     return new BytesMessage();
249   }
250
251   /**
252    * API method.
253    *
254    * @exception IllegalStateException If the session is closed.
255    */

256   public MapMessage createMapMessage()
257          throws JMSException
258   {
259     if (closed)
260       throw new IllegalStateException JavaDoc("Forbidden call on a closed session.");
261
262     return new MapMessage();
263   }
264
265   /**
266    * API method
267    *
268    * @exception IllegalStateException If the session is closed.
269    */

270   public QueueBrowser
271          createBrowser(Queue queue, String JavaDoc selector)
272          throws JMSException
273   {
274     if (closed)
275       throw new IllegalStateException JavaDoc("Forbidden call on a closed session.");
276
277     return new QueueBrowser(this, (Queue) queue, selector);
278   }
279
280   /**
281    * API method
282    *
283    * @exception IllegalStateException If the session is closed.
284    */

285   public QueueBrowser createBrowser(Queue queue)
286          throws JMSException
287   {
288     if (closed)
289       throw new IllegalStateException JavaDoc("Forbidden call on a closed session.");
290
291     return new QueueBrowser(this, (Queue) queue, null);
292   }
293
294   /**
295    * API method.
296    *
297    * @exception IllegalStateException If the session is closed or if the
298    * connection is broken.
299    * @exception JMSException If the creation fails for any other reason.
300    */

301   public MessageProducer createProducer(Destination dest)
302          throws JMSException
303   {
304     if (closed)
305       throw new IllegalStateException JavaDoc("Forbidden call on a closed session.");
306
307     return new MessageProducer(this, (Destination) dest);
308   }
309
310   /**
311    * API method.
312    *
313    * @exception IllegalStateException If the session is closed or if the
314    * connection is broken.
315    * @exception JMSException If the creation fails for any other reason.
316    */

317   public MessageConsumer
318          createConsumer(Destination dest, String JavaDoc selector,
319                         boolean noLocal) throws JMSException
320   {
321     if (closed)
322       throw new IllegalStateException JavaDoc("Forbidden call on a closed session.");
323
324     return new MessageConsumer(this, (Destination) dest, selector, null,
325                                noLocal);
326   }
327
328   /**
329    * API method.
330    *
331    * @exception IllegalStateException If the session is closed or if the
332    * connection is broken.
333    * @exception JMSException If the creation fails for any other reason.
334    */

335   public MessageConsumer
336          createConsumer(Destination dest, String JavaDoc selector)
337          throws JMSException
338   {
339     if (closed)
340       throw new IllegalStateException JavaDoc("Forbidden call on a closed session.");
341
342     return new MessageConsumer(this, (Destination) dest, selector);
343   }
344
345   /**
346    * API method.
347    *
348    * @exception IllegalStateException If the session is closed or if the
349    * connection is broken.
350    * @exception JMSException If the creation fails for any other reason.
351    */

352   public MessageConsumer createConsumer(Destination dest)
353          throws JMSException
354   {
355     if (closed)
356       throw new IllegalStateException JavaDoc("Forbidden call on a closed session.");
357
358     return new MessageConsumer(this, (Destination) dest, null);
359   }
360
361   /**
362    * API method.
363    *
364    * @exception IllegalStateException If the session is closed or if the
365    * connection is broken.
366    * @exception JMSException If the creation fails for any other reason.
367    */

368   public TopicSubscriber
369          createDurableSubscriber(Topic topic, String JavaDoc name,
370                                  String JavaDoc selector,
371                                  boolean noLocal) throws JMSException
372   {
373     if (closed)
374       throw new IllegalStateException JavaDoc("Forbidden call on a closed session.");
375
376     return new TopicSubscriber(this, (Topic) topic, name, selector, noLocal);
377   }
378
379   /**
380    * API method.
381    *
382    * @exception IllegalStateException If the session is closed or if the
383    * connection is broken.
384    * @exception JMSException If the creation fails for any other reason.
385    */

386   public TopicSubscriber
387          createDurableSubscriber(Topic topic, String JavaDoc name)
388          throws JMSException
389   {
390     if (closed)
391       throw new IllegalStateException JavaDoc("Forbidden call on a closed session.");
392
393     return new TopicSubscriber(this, (Topic) topic, name, null, false);
394   }
395
396   /**
397    * API method.
398    *
399    * @exception IllegalStateException If the session is closed.
400    */

401   public Queue createQueue(String JavaDoc queueName) throws JMSException
402   {
403     if (closed)
404       throw new IllegalStateException JavaDoc("Forbidden call on a closed session.");
405
406     return new Queue(queueName);
407   }
408
409   /**
410    * API method.
411    *
412    * @exception IllegalStateException If the session is closed.
413    * @exception JMSException If the topic creation failed.
414    */

415   public Topic createTopic(String JavaDoc topicName) throws JMSException
416   {
417     if (closed)
418       throw new IllegalStateException JavaDoc("Forbidden call on a closed session.");
419
420     // Checks if the topic to retrieve is the administration topic:
421
if (topicName.equals("#AdminTopic")) {
422       try {
423         GetAdminTopicReply reply =
424           (GetAdminTopicReply) cnx.syncRequest(new GetAdminTopicRequest());
425         if (reply.getId() != null)
426           return new Topic(reply.getId());
427         else
428           throw new JMSException("AdminTopic could not be retrieved.");
429       }
430       catch (JMSException exc) {
431         throw exc;
432       }
433       catch (Exception JavaDoc exc) {
434         throw new JMSException("AdminTopic could not be retrieved: " + exc);
435       }
436     }
437     return new Topic(topicName);
438   }
439
440   /**
441    * API method.
442    *
443    * @exception IllegalStateException If the session is closed or if the
444    * connection is broken.
445    * @exception JMSException If the request fails for any other reason.
446    */

447   public TemporaryQueue createTemporaryQueue() throws JMSException
448   {
449     if (closed)
450       throw new IllegalStateException JavaDoc("Forbidden call on a closed session.");
451
452     SessCreateTDReply reply =
453       (SessCreateTDReply) cnx.syncRequest(new SessCreateTQRequest());
454     String JavaDoc tempDest = reply.getAgentId();
455     return new TemporaryQueue(tempDest, cnx);
456   }
457
458   /**
459    * API method.
460    *
461    * @exception IllegalStateException If the session is closed or if the
462    * connection is broken.
463    * @exception JMSException If the request fails for any other reason.
464    */

465   public TemporaryTopic createTemporaryTopic() throws JMSException
466   {
467     if (closed)
468       throw new IllegalStateException JavaDoc("Forbidden call on a closed session.");
469
470     SessCreateTDReply reply =
471       (SessCreateTDReply) cnx.syncRequest(new SessCreateTTRequest());
472     String JavaDoc tempDest = reply.getAgentId();
473     return new TemporaryTopic(tempDest, cnx);
474   }
475
476   /** API method. */
477   public synchronized void run()
478   {
479     int load = repliesIn.size();
480     com.scalagent.kjoram.messages.Message momMsg;
481     String JavaDoc msgId;
482     String JavaDoc targetName = connectionConsumer.targetName;
483     boolean queueMode = connectionConsumer.queueMode;
484
485     if (JoramTracing.dbgClient)
486       JoramTracing.log(JoramTracing.DEBUG, "-- " + this
487                        + ": loaded with " + load
488                        + " message(s) and started.");
489     try {
490       // Processing the current number of messages in the queue:
491
for (int i = 0; i < load; i++) {
492         momMsg = (com.scalagent.kjoram.messages.Message) repliesIn.pop();
493         msgId = momMsg.getIdentifier();
494
495         // If no message listener has been set for the session, denying the
496
// processed message:
497
if (messageListener == null) {
498           JoramTracing.log(JoramTracing.ERROR,this + ": an"
499                            + " asynchronous delivery arrived for"
500                            + " a non existing session listener:"
501                            + " denying the message.");
502           
503           if (queueMode)
504             cnx.syncRequest(new ConsumerDenyRequest(targetName, msgId, true));
505           else
506             cnx.asyncRequest(new ConsumerDenyRequest(targetName, msgId,false));
507         }
508         // Else:
509
else {
510           // Preparing ack for manual sessions:
511
if (! autoAck)
512             prepareAck(targetName, msgId, queueMode);
513   
514           // Passing the current message:
515
try {
516             messageListener.onMessage(Message.wrapMomMessage(this, momMsg));
517   
518             // Auto ack: acknowledging the message:
519
if (autoAck)
520               cnx.asyncRequest(new ConsumerAckRequest(targetName, msgId,
521                                                       queueMode));
522           }
523           // Catching a JMSException means that the building of the Joram
524
// message went wrong: denying the message:
525
catch (JMSException jE) {
526             JoramTracing.log(JoramTracing.ERROR, this
527                              + ": error while processing the"
528                              + " received message: " + jE);
529             if (queueMode)
530               cnx.syncRequest(new ConsumerDenyRequest(targetName, msgId,
531                                                       queueMode));
532             else
533               cnx.asyncRequest(new ConsumerDenyRequest(targetName, msgId,
534                                                        queueMode));
535           }
536           // Catching a RuntimeException means that the client onMessage() code
537
// is incorrect; denying the message if needed:
538
catch (RuntimeException JavaDoc rE) {
539             JoramTracing.log(JoramTracing.ERROR,this
540                              + ": RuntimeException thrown"
541                              + " by the listener: " + rE);
542             if (autoAck && queueMode)
543               cnx.syncRequest(new ConsumerDenyRequest(targetName, msgId,
544                                                       queueMode));
545             else if (autoAck && ! queueMode)
546               cnx.asyncRequest(new ConsumerDenyRequest(targetName, msgId,
547                                                        queueMode));
548           }
549         }
550       }
551     }
552     catch (JMSException e) {}
553   }
554
555   /**
556    * API method.
557    *
558    * @exception IllegalStateException If the session is closed, or not
559    * transacted, or if the connection is broken.
560    */

561   public void commit() throws JMSException
562   {
563     if (closed)
564       throw new IllegalStateException JavaDoc("Forbidden call on a closed session.");
565
566     if (! transacted)
567       throw new IllegalStateException JavaDoc("Can't commit a non transacted"
568                                       + " session.");
569
570     if (JoramTracing.dbgClient)
571       JoramTracing.log(JoramTracing.DEBUG, "--- " + this
572                        + ": committing...");
573
574     // If the transaction was scheduled: cancelling.
575
if (scheduled) {
576       closingTask.cancel();
577       scheduled = false;
578     }
579
580     // Sending client messages:
581
try {
582       Enumeration dests = sendings.keys();
583       String JavaDoc dest;
584       ProducerMessages pM;
585       while (dests.hasMoreElements()) {
586         dest = (String JavaDoc) dests.nextElement();
587         pM = (ProducerMessages) sendings.remove(dest);
588         cnx.syncRequest(pM);
589       }
590       // Acknowledging the received messages:
591
acknowledge();
592
593       if (JoramTracing.dbgClient)
594         JoramTracing.log(JoramTracing.DEBUG, this + ": committed.");
595     }
596     // Catching an exception if the sendings or acknowledgement went wrong:
597
catch (JMSException jE) {
598       TransactionRolledBackException tE =
599         new TransactionRolledBackException("A JMSException was thrown during"
600                                            + " the commit.");
601       tE.setLinkedException(jE);
602
603       JoramTracing.log(JoramTracing.ERROR, "Exception: " + tE);
604       
605       rollback();
606       throw tE;
607     }
608   }
609
610   /**
611    * API method.
612    *
613    * @exception IllegalStateException If the session is closed, or not
614    * transacted.
615    */

616   public void rollback() throws JMSException
617   {
618     if (closed)
619       throw new IllegalStateException JavaDoc("Forbidden call on a closed session.");
620
621     if (! transacted)
622       throw new IllegalStateException JavaDoc("Can't rollback a non transacted"
623                                       + " session.");
624
625     if (JoramTracing.dbgClient)
626       JoramTracing.log(JoramTracing.DEBUG, "--- " + this
627                        + ": rolling back...");
628
629     // If the transaction was scheduled: cancelling.
630
if (scheduled) {
631       closingTask.cancel();
632       scheduled = false;
633     }
634
635     // Denying the received messages:
636
deny();
637     // Deleting the produced messages:
638
sendings.clear();
639
640     if (JoramTracing.dbgClient)
641       JoramTracing.log(JoramTracing.DEBUG, this + ": rolled back.");
642   }
643
644   /**
645    * API method.
646    *
647    * @exception IllegalStateException If the session is closed, or transacted.
648    */

649   public void recover() throws JMSException
650   {
651     if (transacted)
652       throw new IllegalStateException JavaDoc("Can't recover a transacted session.");
653
654     if (JoramTracing.dbgClient)
655       JoramTracing.log(JoramTracing.DEBUG, "--- " + this
656                        + " recovering...");
657
658     // Stopping the session, denying the received messages:
659
stop();
660     deny();
661     // Re-starting the session:
662
start();
663
664     if (JoramTracing.dbgClient)
665       JoramTracing.log(JoramTracing.DEBUG, this + ": recovered.");
666   }
667
668
669   /**
670    * API method.
671    *
672    * @exception IllegalStateException If the session is closed or if the
673    * connection is broken.
674    * @exception JMSException If the request fails for any other reason.
675    */

676   public void unsubscribe(String JavaDoc name) throws JMSException
677   {
678     MessageConsumer cons;
679     for (int i = 0; i < consumers.size(); i++) {
680       cons = (MessageConsumer) consumers.elementAt(i);
681       if (! cons.queueMode && cons.targetName.equals(name))
682         throw new JMSException("Can't delete durable subscription " + name
683                                + " as long as an active subscriber exists.");
684     }
685     cnx.syncRequest(new ConsumerUnsubRequest(name));
686   }
687
688   /**
689    * API method.
690    *
691    * @exception JMSException Actually never thrown.
692    */

693   public synchronized void close() throws JMSException
694   {
695     // Ignoring the call if the session is already closed:
696
if (closed)
697       return;
698
699     if (JoramTracing.dbgClient)
700       JoramTracing.log(JoramTracing.DEBUG, "--- " + this
701                        + ": closing...");
702
703     // Finishing the timer, if any:
704
if (consumersTimer != null)
705       consumersTimer.cancel();
706
707     // Emptying the current pending deliveries:
708
try {
709       repliesIn.stop();
710     }
711     catch (InterruptedException JavaDoc iE) {}
712
713     // Stopping the session:
714
stop();
715
716     // Denying the non acknowledged messages:
717
if (transacted)
718       rollback();
719     else
720       deny();
721       
722     // Closing the session's resources:
723
while (! browsers.isEmpty())
724       ((QueueBrowser) browsers.elementAt(0)).close();
725     while (! consumers.isEmpty())
726       ((MessageConsumer) consumers.elementAt(0)).close();
727     while (! producers.isEmpty())
728       ((MessageProducer) producers.elementAt(0)).close();
729
730     cnx.sessions.removeElement(this);
731
732     closed = true;
733
734     if (JoramTracing.dbgClient)
735       JoramTracing.log(JoramTracing.DEBUG, this + ": closed.");
736   }
737
738   /** Schedules a consumer task to the session's timer. */
739   synchronized void schedule(TimerTask task, long timer)
740   {
741     if (consumersTimer == null)
742       consumersTimer = new com.scalagent.kjoram.util.Timer();
743
744     try {
745       consumersTimer.schedule(task, timer);
746     }
747     catch (Exception JavaDoc exc) {}
748   }
749   
750   /**
751    * Starts the asynchronous deliveries in the session.
752    * <p>
753    * This method is called either by a consumer when setting the first
754    * message listener of the session, if the connection is started, or
755    * by the starting connection if at least one listener has previously
756    * been set by a consumer.
757    * <p>
758    * It creates and starts a daemon dedicated to distributing the
759    * asynchronous deliveries arriving on the connection to their consumers.
760    *
761    * @exception IllegalStateException If the session is closed.
762    */

763   void start() throws IllegalStateException JavaDoc
764   {
765     if (closed)
766       throw new IllegalStateException JavaDoc("Forbidden call on a closed session.");
767
768     if (JoramTracing.dbgClient)
769       JoramTracing.log(JoramTracing.DEBUG, "--- " + this
770                        + ": starting...");
771
772     repliesIn.start();
773
774     // Starting the daemon if needed:
775
if (! started && msgListeners > 0) {
776       daemon = new SessionDaemon(this);
777       daemon.setDaemon(false);
778       daemon.start();
779     }
780     started = true;
781
782     if (JoramTracing.dbgClient)
783       JoramTracing.log(JoramTracing.DEBUG, this + ": started.");
784   }
785
786   /**
787    * Stops the asynchronous deliveries processing in the session.
788    * <p>
789    * This method must be carefully used. When the session is stopped, the
790    * connection might very well going on pushing deliveries in the
791    * session's queue. If the session is never re-started, these deliveries
792    * will never be poped out, and this may lead to a situation of consumed
793    * but never acknowledged messages.
794    * <p>
795    * This fatal situation never occurs as the <code>stop()</code> method is
796    * either called by the <code>recover()</code> method, which then calls
797    * the <code>start()</code> method, or by the <code>Session.close()</code>
798    * and <code>Connection.stop()</code> methods, which first empty the
799    * session's deliveries and forbid any further push.
800    */

801   void stop()
802   {
803     // Ignoring the call if the session is already stopped:
804
if (! started)
805       return;
806
807     if (JoramTracing.dbgClient)
808       JoramTracing.log(JoramTracing.DEBUG, "--- " + this
809                        + ": stopping...");
810
811     // Stopping the daemon if needed:
812
if (daemon != null) {
813       daemon.stop();
814       daemon = null;
815     }
816     // Synchronizing the stop() with the consumers:
817
if (consumers != null) {
818       MessageConsumer consumer;
819       for (int i = 0; i < consumers.size(); i++) {
820         consumer = (MessageConsumer) consumers.elementAt(i);
821         consumer.syncro();
822       }
823     }
824
825     started = false;
826
827     if (JoramTracing.dbgClient)
828       JoramTracing.log(JoramTracing.DEBUG, this + ": stopped.");
829   }
830
831   /**
832    * Method called by message producers when producing a message for
833    * preparing the session to later commit it.
834    *
835    * @param dest The destination the message is destinated to.
836    * @param msg The message.
837    */

838   void prepareSend(Destination dest, com.scalagent.kjoram.messages.Message msg)
839   {
840     // If the transaction was scheduled, cancelling:
841
if (scheduled)
842       closingTask.cancel();
843
844     ProducerMessages pM = (ProducerMessages) sendings.get(dest.getName());
845     if (pM == null) {
846       pM = new ProducerMessages(dest.getName());
847       sendings.put(dest.getName(), pM);
848     }
849     pM.addMessage(msg);
850
851     // If the transaction was scheduled, re-scheduling it:
852
if (scheduled)
853       cnx.schedule(closingTask);
854   }
855
856   /**
857    * Method called by message consumers when receiving a message for
858    * preparing the session to later acknowledge or deny it.
859    *
860    * @param name Name of the destination or of the proxy subscription
861    * the message comes from.
862    * @param id Identifier of the consumed message.
863    * @param queueMode <code>true</code> if the message consumed comes from
864    * a queue.
865    */

866   void prepareAck(String JavaDoc name, String JavaDoc id, boolean queueMode)
867   {
868     // If the transaction was scheduled, cancelling:
869
if (scheduled)
870       closingTask.cancel();
871
872     MessageAcks acks = (MessageAcks) deliveries.get(name);
873     if (acks == null) {
874       acks = new MessageAcks(queueMode);
875       deliveries.put(name, acks);
876     }
877     acks.addId(id);
878
879     // If the transaction must be scheduled, scheduling it:
880
if (closingTask != null) {
881       scheduled = true;
882       cnx.schedule(closingTask);
883     }
884   }
885
886   /**
887    * Method acknowledging the received messages.
888    *
889    * @exception IllegalStateException If the connection is broken.
890    */

891   void acknowledge() throws IllegalStateException JavaDoc
892   {
893     String JavaDoc target;
894     MessageAcks acks;
895
896     Enumeration targets = deliveries.keys();
897     while (targets.hasMoreElements()) {
898       target = (String JavaDoc) targets.nextElement();
899       acks = (MessageAcks) deliveries.remove(target);
900       cnx.asyncRequest(new SessAckRequest(target, acks.getIds(),
901                                           acks.getQueueMode()));
902     }
903   }
904
905   /** Method denying the received messages. */
906   void deny()
907   {
908     try {
909       String JavaDoc target;
910       MessageAcks acks;
911       SessDenyRequest deny;
912
913       Enumeration targets = deliveries.keys();
914       while (targets.hasMoreElements()) {
915         target = (String JavaDoc) targets.nextElement();
916         acks = (MessageAcks) deliveries.remove(target);
917         deny = new SessDenyRequest(target, acks.getIds(), acks.getQueueMode());
918         if (acks.getQueueMode())
919           cnx.syncRequest(deny);
920         else
921           cnx.asyncRequest(deny);
922       }
923     }
924     catch (JMSException jE) {}
925   }
926
927   /**
928    * Method called by the session daemon for passing an
929    * asynchronous message delivery to the appropriate consumer.
930    */

931   void distribute(AbstractJmsReply asyncReply)
932   {
933     // Getting the message:
934
ConsumerMessages reply = (ConsumerMessages) asyncReply;
935
936     // Getting the consumer:
937
MessageConsumer cons = null;
938     if (reply.getQueueMode()) {
939       cons =
940         (MessageConsumer) cnx.requestsTable.remove(reply.getKey());
941     }
942     else
943       cons = (MessageConsumer) cnx.requestsTable.get(reply.getKey());
944
945     // Passing the message(s) to the consumer:
946
if (cons != null) {
947       Vector msgs = reply.getMessages();
948       for (int i = 0; i < msgs.size(); i++)
949         cons.onMessage((com.scalagent.kjoram.messages.Message) msgs.elementAt(i));
950     }
951     // The target consumer of the received message may be null if it has
952
// been closed without having stopped the connection: denying the
953
// deliveries.
954
else {
955       if (JoramTracing.dbgClient)
956         JoramTracing.log(JoramTracing.WARN, this + ": an asynchronous"
957                          + " delivery arrived for an improperly"
958                          + " closed consumer: denying the"
959                          + " messages.");
960
961       Vector msgs = reply.getMessages();
962       com.scalagent.kjoram.messages.Message msg;
963       Vector ids = new Vector();
964       for (int i = 0; i < msgs.size(); i++) {
965         msg = (com.scalagent.kjoram.messages.Message) msgs.elementAt(i);
966         ids.addElement(msg.getIdentifier());
967       }
968   
969       if (ids.isEmpty())
970         return;
971   
972       try {
973         cnx.asyncRequest(new SessDenyRequest(reply.comesFrom(), ids,
974                                              reply.getQueueMode(), true));
975       }
976       catch (JMSException jE) {}
977     }
978   }
979
980   /**
981    * The <code>SessionCloseTask</code> class is used by non-XA transacted
982    * sessions for taking care of closing them if they tend to be pending,
983    * and if a transaction timer has been set.
984    */

985   private class SessionCloseTask extends TimerTask
986   {
987     /** Method called when the timer expires, actually closing the session. */
988     public void run()
989     {
990       try {
991         if (JoramTracing.dbgClient)
992           JoramTracing.log(JoramTracing.WARN, "Session closed "
993                            + "because of pending transaction");
994         close();
995       }
996       catch (Exception JavaDoc e) {}
997     }
998   }
999 }
1000
Popular Tags