KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > caucho > jms > session > SessionImpl


1 /*
2  * Copyright (c) 1998-2006 Caucho Technology -- all rights reserved
3  *
4  * This file is part of Resin(R) Open Source
5  *
6  * Each copy or derived work must preserve the copyright notice and this
7  * notice unmodified.
8  *
9  * Resin Open Source is free software; you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation; either version 2 of the License, or
12  * (at your option) any later version.
13  *
14  * Resin Open Source is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE, or any warranty
17  * of NON-INFRINGEMENT. See the GNU General Public License for more
18  * details.
19  *
20  * You should have received a copy of the GNU General Public License
21  * along with Resin Open Source; if not, write to the
22  *
23  * Free Software Foundation, Inc.
24  * 59 Temple Place, Suite 330
25  * Boston, MA 02111-1307 USA
26  *
27  * @author Scott Ferguson
28  */

29
30 package com.caucho.jms.session;
31
32 import com.caucho.jms.AbstractDestination;
33 import com.caucho.jms.message.BytesMessageImpl;
34 import com.caucho.jms.message.MapMessageImpl;
35 import com.caucho.jms.message.MessageImpl;
36 import com.caucho.jms.message.ObjectMessageImpl;
37 import com.caucho.jms.message.StreamMessageImpl;
38 import com.caucho.jms.message.TextMessageImpl;
39 import com.caucho.log.Log;
40 import com.caucho.util.Alarm;
41 import com.caucho.util.L10N;
42 import com.caucho.util.ThreadPool;
43 import com.caucho.util.ThreadTask;
44
45 import javax.jms.*;
46 import javax.jms.IllegalStateException JavaDoc;
47 import java.io.Serializable JavaDoc;
48 import java.util.ArrayList JavaDoc;
49 import java.util.logging.Level JavaDoc;
50 import java.util.logging.Logger JavaDoc;
51
52 /**
53  * Manages the JMS session.
54  */

55 public class SessionImpl implements Session, ThreadTask {
56   protected static final Logger JavaDoc log = Log.open(SessionImpl.class);
57   protected static final L10N L = new L10N(SessionImpl.class);
58
59   private static final long SHUTDOWN_WAIT_TIME = 10000;
60
61   private boolean _isTransacted;
62   private int _acknowledgeMode;
63
64   private ClassLoader JavaDoc _classLoader;
65   
66   private ConnectionImpl _connection;
67   private ArrayList JavaDoc<MessageConsumerImpl> _consumers =
68     new ArrayList JavaDoc<MessageConsumerImpl>();
69   private MessageListener _messageListener;
70   private boolean _isAsynchronous;
71
72   // 4.4.1 - client's responsibility
73
private Thread JavaDoc _thread;
74
75   // transacted messages
76
private ArrayList JavaDoc<TransactedMessage> _transactedMessages;
77
78   private volatile boolean _isRunning;
79   private volatile boolean _isClosed;
80   private volatile boolean _hasMessage;
81
82   public SessionImpl(ConnectionImpl connection,
83              boolean isTransacted, int ackMode)
84     throws JMSException
85   {
86     _classLoader = Thread.currentThread().getContextClassLoader();
87     
88     _connection = connection;
89     _isTransacted = isTransacted;
90     if (isTransacted)
91       _acknowledgeMode = SESSION_TRANSACTED;
92     else {
93       switch (ackMode) {
94       case CLIENT_ACKNOWLEDGE:
95       case DUPS_OK_ACKNOWLEDGE:
96       case AUTO_ACKNOWLEDGE:
97     _acknowledgeMode = ackMode;
98     break;
99       default:
100     throw new JMSException(L.l("{0} is an illegal acknowledge mode",
101                    ackMode));
102       }
103     }
104     
105     _connection.addSession(this);
106   }
107
108   /**
109    * Returns the connection.
110    */

111   ConnectionImpl getConnection()
112   {
113     return _connection;
114   }
115
116   /**
117    * Returns the connection's clientID
118    */

119   public String JavaDoc getClientID()
120     throws JMSException
121   {
122     return _connection.getClientID();
123   }
124
125   /**
126    * Returns true if the connection is active.
127    */

128   public boolean isActive()
129   {
130     return ! _isClosed && _connection.isActive();
131   }
132
133   /**
134    * Returns true if the connection is active.
135    */

136   boolean isStopping()
137   {
138     return _connection.isStopping();
139   }
140
141   /**
142    * Returns true if the session is in a transaction.
143    */

144   public boolean getTransacted()
145     throws JMSException
146   {
147     checkOpen();
148     
149     return _isTransacted;
150   }
151
152   /**
153    * Returns the acknowledge mode for the session.
154    */

155   public int getAcknowledgeMode()
156     throws JMSException
157   {
158     checkOpen();
159     
160     return _acknowledgeMode;
161   }
162
163   /**
164    * Returns the message listener
165    */

166   public MessageListener getMessageListener()
167     throws JMSException
168   {
169     checkOpen();
170     
171     return _messageListener;
172   }
173
174   /**
175    * Sets the message listener
176    */

177   public void setMessageListener(MessageListener listener)
178     throws JMSException
179   {
180     checkOpen();
181     
182     _messageListener = listener;
183     setAsynchronous();
184   }
185
186   /**
187    * Set true for a synchronous session.
188    */

189   void setAsynchronous()
190   {
191     boolean oldAsynchronous = _isAsynchronous;
192     
193     _isAsynchronous = true;
194
195     notifyListener();
196   }
197
198   /**
199    * Set true for a synchronous session.
200    */

201   boolean isAsynchronous()
202   {
203     return _isAsynchronous;
204   }
205
206   /**
207    * Creates a new byte[] message.
208    */

209   public BytesMessage createBytesMessage()
210     throws JMSException
211   {
212     checkOpen();
213     
214     return new BytesMessageImpl();
215   }
216
217   /**
218    * Creates a new map message.
219    */

220   public MapMessage createMapMessage()
221     throws JMSException
222   {
223     checkOpen();
224     
225     return new MapMessageImpl();
226   }
227
228   /**
229    * Creates a message. Used when only header info is important.
230    */

231   public Message createMessage()
232     throws JMSException
233   {
234     checkOpen();
235     
236     return new MessageImpl();
237   }
238
239   /**
240    * Creates an object message.
241    */

242   public ObjectMessage createObjectMessage()
243     throws JMSException
244   {
245     checkOpen();
246     
247     return new ObjectMessageImpl();
248   }
249
250   /**
251    * Creates an object message.
252    *
253    * @param obj a serializable message.
254    */

255   public ObjectMessage createObjectMessage(Serializable JavaDoc obj)
256     throws JMSException
257   {
258     checkOpen();
259     
260     ObjectMessage msg = createObjectMessage();
261
262     msg.setObject(obj);
263
264     return msg;
265   }
266
267   /**
268    * Creates a stream message.
269    */

270   public StreamMessage createStreamMessage()
271     throws JMSException
272   {
273     checkOpen();
274     
275     return new StreamMessageImpl();
276   }
277
278   /**
279    * Creates a text message.
280    */

281   public TextMessage createTextMessage()
282     throws JMSException
283   {
284     checkOpen();
285     
286     return new TextMessageImpl();
287   }
288
289   /**
290    * Creates a text message.
291    */

292   public TextMessage createTextMessage(String JavaDoc message)
293     throws JMSException
294   {
295     checkOpen();
296     
297     TextMessage msg = createTextMessage();
298
299     msg.setText(message);
300
301     return msg;
302   }
303
304   /**
305    * Creates a consumer to receive messages.
306    *
307    * @param destination the destination to receive messages from.
308    */

309   public MessageConsumer createConsumer(Destination destination)
310     throws JMSException
311   {
312     checkOpen();
313
314     return createConsumer(destination, null, false);
315   }
316
317   /**
318    * Creates a consumer to receive messages.
319    *
320    * @param destination the destination to receive messages from.
321    * @param messageSelector query to restrict the messages.
322    */

323   public MessageConsumer createConsumer(Destination destination,
324                                         String JavaDoc messageSelector)
325     throws JMSException
326   {
327     checkOpen();
328     
329     return createConsumer(destination, messageSelector, false);
330   }
331
332   /**
333    * Creates a consumer to receive messages.
334    *
335    * @param destination the destination to receive messages from.
336    * @param messageSelector query to restrict the messages.
337    */

338   public MessageConsumer createConsumer(Destination destination,
339                                         String JavaDoc messageSelector,
340                                         boolean noLocal)
341     throws JMSException
342   {
343     checkOpen();
344
345     AbstractDestination dest = (AbstractDestination) destination;
346     
347     MessageConsumer consumer;
348     consumer = dest.createConsumer(this, messageSelector, noLocal);
349     
350     addConsumer((MessageConsumerImpl) consumer);
351
352     return consumer;
353   }
354
355   /**
356    * Creates a producer to produce messages.
357    *
358    * @param destination the destination to send messages from.
359    */

360   public MessageProducer createProducer(Destination destination)
361     throws JMSException
362   {
363     checkOpen();
364
365     AbstractDestination dest = (AbstractDestination) destination;
366
367     return dest.createProducer(this);
368   }
369
370   /**
371    * Creates a QueueBrowser to browse messages in the queue.
372    *
373    * @param queue the queue to send messages to.
374    */

375   public QueueBrowser createBrowser(Queue queue)
376     throws JMSException
377   {
378     checkOpen();
379     
380     return createBrowser(queue, null);
381   }
382
383   /**
384    * Creates a QueueBrowser to browse messages in the queue.
385    *
386    * @param queue the queue to send messages to.
387    */

388   public QueueBrowser createBrowser(Queue queue, String JavaDoc messageSelector)
389     throws JMSException
390   {
391     checkOpen();
392     
393     return ((AbstractDestination) queue).createBrowser(this, messageSelector);
394   }
395
396   /**
397    * Creates a new queue.
398    */

399   public Queue createQueue(String JavaDoc queueName)
400     throws JMSException
401   {
402     checkOpen();
403
404     return _connection.getConnectionFactory().createQueue(queueName);
405   }
406
407   /**
408    * Creates a temporary queue.
409    */

410   public TemporaryQueue createTemporaryQueue()
411     throws JMSException
412   {
413     checkOpen();
414     
415     return new TemporaryQueueImpl();
416   }
417
418   /**
419    * Creates a new topic.
420    */

421   public Topic createTopic(String JavaDoc topicName)
422     throws JMSException
423   {
424     checkOpen();
425
426     return _connection.getConnectionFactory().createTopic(topicName);
427   }
428
429   /**
430    * Creates a temporary topic.
431    */

432   public TemporaryTopic createTemporaryTopic()
433     throws JMSException
434   {
435     checkOpen();
436     
437     return new TemporaryTopicImpl();
438   }
439
440   /**
441    * Creates a durable subscriber to receive messages.
442    *
443    * @param topic the topic to receive messages from.
444    */

445   public TopicSubscriber createDurableSubscriber(Topic topic, String JavaDoc name)
446     throws JMSException
447   {
448     checkOpen();
449     
450     if (getClientID() == null)
451       throw new JMSException(L.l("connection may not create a durable subscriber because it does not have an assigned ClientID."));
452
453     return createDurableSubscriber(topic, name, null, false);
454   }
455
456   /**
457    * Creates a subscriber to receive messages.
458    *
459    * @param topic the topic to receive messages from.
460    * @param messageSelector topic to restrict the messages.
461    * @param noLocal if true, don't receive messages we've sent
462    */

463   public TopicSubscriber createDurableSubscriber(Topic topic,
464                                                  String JavaDoc name,
465                                                  String JavaDoc messageSelector,
466                                                  boolean noLocal)
467     throws JMSException
468   {
469     checkOpen();
470     
471     AbstractDestination topicImpl = (AbstractDestination) topic;
472
473     if (_connection.getDurableSubscriber(name) != null)
474       throw new JMSException(L.l("'{0}' is already an active durable subscriber",
475                  name));
476     
477     TopicSubscriber consumer;
478     consumer = topicImpl.createDurableSubscriber(this, messageSelector,
479                          noLocal, name);
480     
481     _connection.putDurableSubscriber(name, consumer);
482     
483     addConsumer((MessageConsumerImpl) consumer);
484
485     return consumer;
486   }
487
488   /**
489    * Unsubscribe from a durable subscription.
490    */

491   public void unsubscribe(String JavaDoc name)
492     throws JMSException
493   {
494     checkOpen();
495
496     _connection.removeDurableSubscriber(name);
497   }
498
499   /**
500    * Starts the session.
501    */

502   void start()
503   {
504     notifyListener();
505   }
506
507   /**
508    * Stops the session.
509    */

510   void stop()
511   {
512     synchronized (_consumers) {
513       _consumers.notifyAll();
514
515       long timeout = Alarm.getCurrentTime() + SHUTDOWN_WAIT_TIME;
516       while (_isRunning && Alarm.getCurrentTime() < timeout) {
517     try {
518       _consumers.wait(SHUTDOWN_WAIT_TIME);
519     
520       if (Alarm.isTest()) {
521         return;
522       }
523     } catch (Throwable JavaDoc e) {
524     }
525       }
526     }
527   }
528   
529   /**
530    * Commits the messages.
531    */

532   public void commit()
533     throws JMSException
534   {
535     checkOpen();
536
537     if (! _isTransacted)
538       throw new IllegalStateException JavaDoc(L.l("commit() can only be called on a transacted session."));
539
540
541     ArrayList JavaDoc<TransactedMessage> messages = _transactedMessages;
542     if (messages != null) {
543       try {
544     for (int i = 0; i < messages.size(); i++) {
545       messages.get(i).send();
546     }
547       } finally {
548     messages.clear();
549       }
550     }
551
552     acknowledge();
553   }
554   
555   /**
556    * Commits the messages.
557    */

558   public void acknowledge()
559     throws JMSException
560   {
561     checkOpen();
562
563     for (int i = 0; i < _consumers.size(); i++) {
564       MessageConsumerImpl consumer = _consumers.get(i);
565
566       try {
567     consumer.acknowledge();
568       } catch (Throwable JavaDoc e) {
569     log.log(Level.WARNING, e.toString(), e);
570       }
571     }
572   }
573   
574   /**
575    * Rollsback the messages.
576    */

577   public void rollback()
578     throws JMSException
579   {
580     checkOpen();
581
582     if (! _isTransacted)
583       throw new IllegalStateException JavaDoc(L.l("rollback() can only be called on a transacted session."));
584     
585     if (_transactedMessages != null)
586       _transactedMessages.clear();
587
588     
589     for (int i = 0; i < _consumers.size(); i++) {
590       MessageConsumerImpl consumer = _consumers.get(i);
591
592       try {
593     consumer.rollback();
594       } catch (Throwable JavaDoc e) {
595     log.log(Level.WARNING, e.toString(), e);
596       }
597     }
598   }
599   
600   /**
601    * Recovers the messages.
602    */

603   public void recover()
604     throws JMSException
605   {
606     checkOpen();
607
608     if (_isTransacted)
609       throw new IllegalStateException JavaDoc(L.l("recover() may not be called on a transacted session."));
610     
611     for (int i = 0; i < _consumers.size(); i++) {
612       MessageConsumerImpl consumer = _consumers.get(i);
613
614       try {
615     consumer.rollback();
616       } catch (Throwable JavaDoc e) {
617     log.log(Level.WARNING, e.toString(), e);
618       }
619     }
620   }
621   
622   /**
623    * Closes the session
624    */

625   public void close()
626     throws JMSException
627   {
628     if (_isClosed)
629       return;
630
631     try {
632       stop();
633     } catch (Throwable JavaDoc e) {
634       log.log(Level.WARNING, e.toString(), e);
635     }
636
637     for (int i = 0; i < _consumers.size(); i++) {
638       MessageConsumerImpl consumer = _consumers.get(i);
639
640       try {
641     consumer.rollback();
642       } catch (Throwable JavaDoc e) {
643     log.log(Level.WARNING, e.toString(), e);
644       }
645
646       try {
647     consumer.close();
648       } catch (Throwable JavaDoc e) {
649     log.log(Level.WARNING, e.toString(), e);
650       }
651     }
652
653     try {
654       _connection.removeSession(this);
655     } finally {
656       _isClosed = true;
657     }
658
659     _classLoader = null;
660   }
661
662   protected void addConsumer(MessageConsumerImpl consumer)
663   {
664     if (_consumers == null)
665       _consumers = new ArrayList JavaDoc<MessageConsumerImpl>();
666
667     _consumers.add(consumer);
668
669     notifyListener();
670   }
671
672   protected void removeConsumer(MessageConsumerImpl consumer)
673   {
674     if (_consumers != null)
675       _consumers.remove(consumer);
676   }
677
678   /**
679    * Notifies the receiver.
680    */

681   void notifyListener()
682   {
683     _hasMessage = true;
684
685     synchronized (_consumers) {
686       _consumers.notifyAll();
687     }
688     
689     if (_isAsynchronous) {
690       ThreadPool.getThreadPool().schedule(this);
691       // the yield is only needed for the regressions
692
Thread.yield();
693     }
694   }
695
696   /**
697    * Adds a message to the session message queue.
698    */

699   public void send(AbstractDestination queue,
700                    MessageImpl message,
701                    int deliveryMode,
702                    int priority,
703                    long expiration)
704     throws JMSException
705   {
706     checkOpen();
707     
708     message.setJMSMessageID(queue.generateMessageID());
709     message.setJMSDestination(queue);
710     message.setJMSDeliveryMode(deliveryMode);
711     message.setJMSTimestamp(Alarm.getCurrentTime());
712     message.setJMSExpiration(expiration);
713     message.setJMSPriority(priority);
714     
715     MessageImpl destMessage = ((MessageImpl) message).copy();
716     destMessage.setSession(this);
717     destMessage.setReceive();
718
719     if (_isTransacted) {
720       if (_transactedMessages == null)
721     _transactedMessages = new ArrayList JavaDoc<TransactedMessage>();
722
723       TransactedMessage transMsg = new TransactedMessage(queue, message);
724       
725       _transactedMessages.add(transMsg);
726     }
727     else
728       queue.send(destMessage);
729   }
730
731   /**
732    * Called to synchronously receive a message.
733    */

734   protected Message receive(MessageConsumerImpl consumer,
735                 long timeout)
736     throws JMSException
737   {
738     throw new UnsupportedOperationException JavaDoc();
739     /*
740     checkOpen();
741     
742     if (Long.MAX_VALUE / 2 < timeout || timeout < 0)
743       timeout = Long.MAX_VALUE / 2;
744     
745     long now = Alarm.getCurrentTime();
746     long failTime = Alarm.getCurrentTime() + timeout;
747     
748     Selector selector = consumer.getSelector();
749     AbstractDestination queue;
750     queue = (AbstractDestination) consumer.getDestination();
751
752     // 4.4.1 user's reponsibility
753     // checkThread();
754
755     Thread oldThread = Thread.currentThread();
756     try {
757       // _thread = Thread.currentThread();
758       
759       while (! consumer.isClosed()) {
760     if (isActive()) {
761       Message msg = queue.receive(selector);
762       if (msg != null)
763         return msg;
764       _hasMessage = false;
765     }
766       
767     long delta = failTime - Alarm.getCurrentTime();
768
769     if (delta <= 0 || _isClosed || Alarm.isTest())
770       return null;
771
772     synchronized (_consumers) {
773       if (! _hasMessage || ! isActive()) {
774         try {
775           _consumers.wait(delta);
776         } catch (Throwable e) {
777         }
778       }
779     }
780       }
781     } finally {
782       // _thread = oldThread;
783     }
784
785     return null;
786     */

787   }
788
789   /**
790    * Called to synchronously receive messages
791    */

792   public void run()
793   {
794     _hasMessage = true;
795     Thread JavaDoc thread = Thread.currentThread();
796
797     while (_hasMessage && isActive() && ! isStopping()) {
798       synchronized (_consumers) {
799     if (_isRunning)
800       return;
801
802     _isRunning = true;
803       }
804
805       try {
806     // _thread = Thread.currentThread();
807
_hasMessage = false;
808
809     for (int i = 0; i < _consumers.size(); i++) {
810       MessageConsumerImpl consumer = _consumers.get(i);
811       //AbstractDestination queue;
812
//queue = (AbstractDestination) consumer.getDestination();
813
//Selector selector = consumer.getSelector();
814
MessageListener listener = consumer.getMessageListener();
815         
816       if (_messageListener != null)
817         listener = _messageListener;
818         
819       if (consumer.isActive() && ! isStopping() && listener != null) {
820         try {
821           Message msg = consumer.receiveNoWait();
822
823           if (msg != null) {
824         _hasMessage = true;
825
826         if (log.isLoggable(Level.FINE))
827           log.fine("JMS " + msg + " delivered to " + listener);
828
829         ClassLoader JavaDoc oldLoader = thread.getContextClassLoader();
830         try {
831           thread.setContextClassLoader(_classLoader);
832           listener.onMessage(msg);
833         } finally {
834           thread.setContextClassLoader(oldLoader);
835         }
836           }
837         } catch (Throwable JavaDoc e) {
838           log.log(Level.WARNING, e.toString(), e);
839         }
840       }
841     }
842       } finally {
843     // _thread = null;
844

845     synchronized (_consumers) {
846       _isRunning = false;
847       
848       _consumers.notifyAll();
849     }
850       }
851     }
852   }
853
854   /**
855    * Checks that the session is open.
856    */

857   public void checkOpen()
858     throws IllegalStateException JavaDoc
859   {
860     if (_isClosed)
861       throw new IllegalStateException JavaDoc(L.l("session is closed"));
862   }
863
864   /**
865    * Verifies that multiple threads aren't using the session.
866    *
867    * 4.4.1 the client takes the responsibility. There's no
868    * validation check.
869    */

870   void checkThread()
871     throws JMSException
872   {
873     Thread JavaDoc thread = _thread;
874     
875     if (thread != Thread.currentThread() && thread != null) {
876       Exception JavaDoc e = new IllegalStateException JavaDoc(L.l("Can't use session from concurrent threads."));
877       log.log(Level.WARNING, e.toString(), e);
878     }
879   }
880
881   static class TransactedMessage {
882     private AbstractDestination _queue;
883     private MessageImpl _message;
884
885     TransactedMessage(AbstractDestination queue, MessageImpl message)
886     {
887       _queue = queue;
888       _message = message;
889     }
890
891     void send()
892       throws JMSException
893     {
894       _queue.send(_message);
895     }
896   }
897 }
898
Popular Tags