KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > exolab > jms > client > JmsSession


1 /**
2  * Redistribution and use of this software and associated documentation
3  * ("Software"), with or without modification, are permitted provided
4  * that the following conditions are met:
5  *
6  * 1. Redistributions of source code must retain copyright
7  * statements and notices. Redistributions must also contain a
8  * copy of this document.
9  *
10  * 2. Redistributions in binary form must reproduce the
11  * above copyright notice, this list of conditions and the
12  * following disclaimer in the documentation and/or other
13  * materials provided with the distribution.
14  *
15  * 3. The name "Exolab" must not be used to endorse or promote
16  * products derived from this Software without prior written
17  * permission of Exoffice Technologies. For written permission,
18  * please contact info@exolab.org.
19  *
20  * 4. Products derived from this Software may not be called "Exolab"
21  * nor may "Exolab" appear in their names without prior written
22  * permission of Exoffice Technologies. Exolab is a registered
23  * trademark of Exoffice Technologies.
24  *
25  * 5. Due credit should be given to the Exolab Project
26  * (http://www.exolab.org/).
27  *
28  * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
29  * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
30  * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
31  * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
32  * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
33  * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
34  * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
35  * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
37  * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
38  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
39  * OF THE POSSIBILITY OF SUCH DAMAGE.
40  *
41  * Copyright 2000-2004 (C) Exoffice Technologies Inc. All Rights Reserved.
42  *
43  * $Id: JmsSession.java,v 1.2 2005/03/18 03:36:37 tanderson Exp $
44  */

45 package org.exolab.jms.client;
46
47 import java.io.Serializable JavaDoc;
48 import java.util.ArrayList JavaDoc;
49 import java.util.HashMap JavaDoc;
50 import java.util.List JavaDoc;
51 import java.util.Vector JavaDoc;
52 import javax.jms.BytesMessage JavaDoc;
53 import javax.jms.Connection JavaDoc;
54 import javax.jms.Destination JavaDoc;
55 import javax.jms.IllegalStateException JavaDoc;
56 import javax.jms.InvalidDestinationException JavaDoc;
57 import javax.jms.InvalidSelectorException JavaDoc;
58 import javax.jms.JMSException JavaDoc;
59 import javax.jms.MapMessage JavaDoc;
60 import javax.jms.Message JavaDoc;
61 import javax.jms.MessageConsumer JavaDoc;
62 import javax.jms.MessageListener JavaDoc;
63 import javax.jms.MessageProducer JavaDoc;
64 import javax.jms.ObjectMessage JavaDoc;
65 import javax.jms.Queue JavaDoc;
66 import javax.jms.QueueBrowser JavaDoc;
67 import javax.jms.Session JavaDoc;
68 import javax.jms.StreamMessage JavaDoc;
69 import javax.jms.TemporaryQueue JavaDoc;
70 import javax.jms.TemporaryTopic JavaDoc;
71 import javax.jms.TextMessage JavaDoc;
72 import javax.jms.Topic JavaDoc;
73 import javax.jms.TopicSubscriber JavaDoc;
74
75 import org.apache.commons.logging.Log;
76 import org.apache.commons.logging.LogFactory;
77
78 import org.exolab.jms.message.BytesMessageImpl;
79 import org.exolab.jms.message.MapMessageImpl;
80 import org.exolab.jms.message.MessageConverter;
81 import org.exolab.jms.message.MessageConverterFactory;
82 import org.exolab.jms.message.MessageImpl;
83 import org.exolab.jms.message.MessageSessionIfc;
84 import org.exolab.jms.message.ObjectMessageImpl;
85 import org.exolab.jms.message.StreamMessageImpl;
86 import org.exolab.jms.message.TextMessageImpl;
87 import org.exolab.jms.server.ServerSession;
88
89
90 /**
91  * Client implementation of the <code>javax.jms.Session</code> interface.
92  *
93  * @author <a HREF="mailto:jima@exoffice.com">Jim Alateras</a>
94  * @author <a HREF="mailto:tma@netspace.net.au">Tim Anderson</a>
95  * @version $Revision: 1.2 $ $Date: 2005/03/18 03:36:37 $
96  */

97 class JmsSession implements Session JavaDoc, JmsMessageListener, MessageSessionIfc {
98
99     /**
100      * The owner of the session.
101      */

102     private JmsConnection _connection;
103
104     /**
105      * The proxy to the remote session implementation
106      */

107     private ServerSession _session = null;
108
109     /**
110      * If true, indicates that the session has been closed.
111      */

112     private volatile boolean _closed = false;
113
114     /**
115      * If true, indicates that the session is in the process of being closed.
116      */

117     private volatile boolean _closing = false;
118
119     /**
120      * This flag determines whether message delivery is enabled or disabled.
121      * Message delivery is disabled if the enclosing connection is stopped.
122      */

123     private volatile boolean _stopped = true;
124
125     /**
126      * A transacted session is bounded by successive commit. If this variable
127      * set to true then this session is transacted. This implies that the
128      * session is always in a transaction and transactions are demarcated by
129      * commit or rollback.
130      */

131     private final boolean _transacted;
132
133     /**
134      * Indicates whether the consumer or the client will acknowledge any
135      * messages it receives. Ignored if the session is transacted. Legal values
136      * are <code>Session.AUTO_ACKNOWLEDGE</code>,
137      * <code>Session.CLIENT_ACKNOWLEDGE</code> and
138      * <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
139      */

140     private final int _ackMode;
141
142     /**
143      * Maintains a map of JmsMessageConsumer.getConsumerId() ->
144      * JmsMessageConsumer objects.
145      */

146     private HashMap JavaDoc _consumers = new HashMap JavaDoc();
147
148     /**
149      * Maintains a list of producers for the session.
150      */

151     private List JavaDoc _producers = new ArrayList JavaDoc();
152
153     /**
154      * Maintain a collection of acked messages for a transacted session.
155      * These messages are only sent to the server on commit.
156      */

157     private List JavaDoc _messagesToSend = new ArrayList JavaDoc();
158
159     /**
160      * This is the session's session listener which is used to receive all
161      * messages associated with all consumers registered with this session.
162      */

163     private MessageListener JavaDoc _listener = null;
164
165     /**
166      * The message cache holds all messages for the session, allocated by a
167      * JmsConnectionConsumer.
168      */

169     private Vector JavaDoc _messageCache = new Vector JavaDoc();
170
171     /**
172      * Monitor used to block consumers, if the session has been stopped, or no
173      * messages are available
174      */

175     private final Object JavaDoc _receiveLock = new Object JavaDoc();
176
177     /**
178      * The logger
179      */

180     private static final Log _log = LogFactory.getLog(JmsSession.class);
181
182
183     /**
184      * Construct a new <code>JmsSession</code>
185      *
186      * @param connection the owner of the session
187      * @param transacted if <code>true</code>, the session is transacted.
188      * @param ackMode indicates whether the consumer or the client will
189      * acknowledge any messages it receives. This parameter
190      * will be ignored if the session is transacted. Legal
191      * values are <code>Session.AUTO_ACKNOWLEDGE</code>,
192      * <code>Session.CLIENT_ACKNOWLEDGE</code> and
193      * <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
194      * @throws JMSException if the session cannot be created
195      */

196     public JmsSession(JmsConnection connection, boolean transacted,
197                       int ackMode) throws JMSException JavaDoc {
198         if (connection == null) {
199             throw new IllegalArgumentException JavaDoc("Argument 'connection' is null");
200         }
201
202         _connection = connection;
203         _transacted = transacted;
204         _ackMode = ackMode;
205
206         // construct the remote stub
207
_session = connection.getServerConnection().createSession(_ackMode,
208                                                                 transacted);
209
210         // set up this instance to be a message listener
211
_session.setMessageListener(this);
212
213         // now we need to check whether we should start the session
214
if (!connection.isStopped()) {
215             start();
216         }
217     }
218
219     /**
220      * Creates a <code>BytesMessage</code> object. A <code>BytesMessage</code>
221      * object is used to send a message containing a stream of uninterpreted
222      * bytes.
223      *
224      * @throws JMSException if the JMS provider fails to create this message due
225      * to some internal error.
226      */

227     public BytesMessage JavaDoc createBytesMessage() throws JMSException JavaDoc {
228         ensureOpen();
229         return new BytesMessageImpl();
230     }
231
232     /**
233      * Creates a <code>MapMessage</code> object. A <code>MapMessage</code>
234      * object is used to send a self-defining set of name-value pairs, where
235      * names are <code>String</code> objects and values are primitive values in
236      * the Java programming language.
237      *
238      * @throws JMSException if the JMS provider fails to create this message due
239      * to some internal error.
240      */

241     public MapMessage JavaDoc createMapMessage() throws JMSException JavaDoc {
242         ensureOpen();
243         return new MapMessageImpl();
244     }
245
246     /**
247      * Creates a <code>Message</code> object. The <code>Message</code> interface
248      * is the root interface of all JMS messages. A <code>Message</code> object
249      * holds all the standard message header information. It can be sent when a
250      * message containing only header information is sufficient.
251      *
252      * @throws JMSException if the JMS provider fails to create this message due
253      * to some internal error.
254      */

255     public Message JavaDoc createMessage() throws JMSException JavaDoc {
256         ensureOpen();
257         return new MessageImpl();
258     }
259
260     /**
261      * Creates an <code>ObjectMessage</code> object. An <code>ObjectMessage</code>
262      * object is used to send a message that contains a serializable Java
263      * object.
264      *
265      * @throws JMSException if the JMS provider fails to create this message due
266      * to some internal error.
267      */

268     public ObjectMessage JavaDoc createObjectMessage() throws JMSException JavaDoc {
269         ensureOpen();
270         return new ObjectMessageImpl();
271     }
272
273     /**
274      * Creates an initialized <code>ObjectMessage</code> object. An
275      * <code>ObjectMessage</code> object is used to send a message that contains
276      * a serializable Java object.
277      *
278      * @param object the object to use to initialize this message
279      * @throws JMSException if the JMS provider fails to create this message due
280      * to some internal error.
281      */

282     public ObjectMessage JavaDoc createObjectMessage(Serializable JavaDoc object)
283             throws JMSException JavaDoc {
284         ensureOpen();
285         ObjectMessageImpl result = new ObjectMessageImpl();
286         result.setObject(object);
287         return result;
288     }
289
290     /**
291      * Creates a <code>StreamMessage</code> object. A <code>StreamMessage</code>
292      * object is used to send a self-defining stream of primitive values in the
293      * Java programming language.
294      *
295      * @throws JMSException if the JMS provider fails to create this message due
296      * to some internal error.
297      */

298     public StreamMessage JavaDoc createStreamMessage() throws JMSException JavaDoc {
299         ensureOpen();
300         return new StreamMessageImpl();
301     }
302
303     /**
304      * Creates a <code>TextMessage</code> object. A <code>TextMessage</code>
305      * object is used to send a message containing a <code>String</code>
306      * object.
307      *
308      * @throws JMSException if the JMS provider fails to create this message due
309      * to some internal error.
310      */

311     public TextMessage JavaDoc createTextMessage() throws JMSException JavaDoc {
312         ensureOpen();
313         return new TextMessageImpl();
314     }
315
316     /**
317      * Creates an initialized <code>TextMessage</code> object. A
318      * <code>TextMessage</code> object is used to send a message containing a
319      * <code>String</code>.
320      *
321      * @param text the string used to initialize this message
322      * @throws JMSException if the JMS provider fails to create this message due
323      * to some internal error.
324      */

325     public TextMessage JavaDoc createTextMessage(String JavaDoc text) throws JMSException JavaDoc {
326         ensureOpen();
327         TextMessageImpl result = new TextMessageImpl();
328         result.setText(text);
329         return result;
330     }
331
332     /**
333      * Determines if the session is transacted
334      *
335      * @return <code>true</code> if the session is transacted
336      * @throws JMSException if the session is closed
337      */

338     public boolean getTransacted() throws JMSException JavaDoc {
339         ensureOpen();
340         return _transacted;
341     }
342
343     /**
344      * Returns the acknowledgement mode of the session. The acknowledgement mode
345      * is set at the time that the session is created. If the session is
346      * transacted, the acknowledgement mode is ignored.
347      *
348      * @return If the session is not transacted, returns the current
349      * acknowledgement mode for the session. If the session is
350      * transacted, returns SESSION_TRANSACTED.
351      * @throws JMSException if the JMS provider fails to return the
352      * acknowledgment mode due to some internal error.
353      * @see Connection#createSession
354      */

355     public int getAcknowledgeMode() throws JMSException JavaDoc {
356         ensureOpen();
357         return _ackMode;
358     }
359
360     /**
361      * Creates a <code>MessageProducer</code> to send messages to the specified
362      * destination.
363      *
364      * @param destination the <code>Destination</code> to send to, or null if
365      * this is a producer which does not have a specified
366      * destination.
367      * @throws JMSException if the session fails to create a
368      * MessageProducer due to some internal
369      * error.
370      * @throws InvalidDestinationException if an invalid destination is
371      * specified.
372      */

373     public MessageProducer JavaDoc createProducer(Destination JavaDoc destination)
374             throws JMSException JavaDoc {
375         return new JmsMessageProducer(this, destination);
376     }
377
378     /**
379      * Creates a <code>MessageConsumer</code> for the specified destination.
380      *
381      * @param destination the <code>Destination</code> to access.
382      * @throws JMSException if the session fails to create a
383      * consumer due to some internal error.
384      * @throws InvalidDestinationException if an invalid destination is
385      * specified.
386      */

387
388     public MessageConsumer JavaDoc createConsumer(Destination JavaDoc destination)
389             throws JMSException JavaDoc {
390         return createConsumer(destination, null);
391     }
392
393     /**
394      * Creates a <code>MessageProducer</code> to to receive messages from the
395      * specified destination, matching particular selection criteria
396      *
397      * @param destination the <code>Destination</code> to access
398      * @param messageSelector only messages with properties matching the message
399      * selector expression are delivered. A value of null
400      * or an empty string indicates that there is no
401      * message selector for the message consumer.
402      * @throws JMSException if the session fails to create a
403      * MessageConsumer due to some internal
404      * error.
405      * @throws InvalidDestinationException if an invalid destination is
406      * specified.
407      * @throws InvalidSelectorException if the message selector is invalid.
408      */

409     public MessageConsumer JavaDoc createConsumer(Destination JavaDoc destination,
410                                           String JavaDoc messageSelector)
411             throws JMSException JavaDoc {
412         return createConsumer(destination, messageSelector, false);
413     }
414
415     /**
416      * Creates a <code>MessageConsumer</code> to to receive messages from the
417      * specified destination, matching particular selection criteria. This
418      * method can specify whether messages published by its own connection
419      * should be delivered to it, if the destination is a topic. <P>In some
420      * cases, a connection may both publish and subscribe to a topic. The
421      * consumer <code>noLocal</code> attribute allows a consumer to inhibit the
422      * delivery of messages published by its own connection. The default value
423      * for this attribute is false. The <code>noLocal</code> value must be
424      * supported by destinations that are topics.
425      *
426      * @param destination the <code>Destination</code> to access
427      * @param messageSelector only messages with properties matching the message
428      * selector expression are delivered. A value of null
429      * or an empty string indicates that there is no
430      * message selector for the message consumer.
431      * @param noLocal if true, and the destination is a topic, inhibits
432      * the delivery of messages published by its own
433      * connection. The behavior for <code>noLocal</code>
434      * is not specified if the destination is a queue.
435      * @throws JMSException if the session fails to create a
436      * MessageConsumer due to some internal
437      * error.
438      * @throws InvalidDestinationException if an invalid destination is
439      * specified.
440      * @throws InvalidSelectorException if the message selector is invalid.
441      */

442     public MessageConsumer JavaDoc createConsumer(Destination JavaDoc destination,
443                                           String JavaDoc messageSelector,
444                                           boolean noLocal) throws JMSException JavaDoc {
445         long consumerId = allocateConsumer(destination, messageSelector,
446                                            noLocal);
447         JmsMessageConsumer consumer = new JmsMessageConsumer(this, consumerId,
448                                                              destination,
449                                                              messageSelector);
450         addConsumer(consumer);
451         return consumer;
452     }
453
454     /**
455      * Creates a queue identity given a <code>Queue</code> name.
456      * <p/>
457      * <P>This facility is provided for the rare cases where clients need to
458      * dynamically manipulate queue identity. It allows the creation of a queue
459      * identity with a provider-specific name. Clients that depend on this
460      * ability are not portable.
461      * <p/>
462      * <P>Note that this method is not for creating the physical queue. The
463      * physical creation of queues is an administrative task and is not to be
464      * initiated by the JMS API. The one exception is the creation of temporary
465      * queues, which is accomplished with the <code>createTemporaryQueue</code>
466      * method.
467      *
468      * @param queueName the name of this <code>Queue</code>
469      * @return a <code>Queue</code> with the given name
470      * @throws JMSException if the session fails to create a queue due to some
471      * internal error.
472      */

473     public Queue JavaDoc createQueue(String JavaDoc queueName) throws JMSException JavaDoc {
474         ensureOpen();
475
476         JmsQueue queue = null;
477
478         if (queueName != null && queueName.length() > 0) {
479             queue = new JmsQueue(queueName);
480         } else {
481             throw new JMSException JavaDoc(
482                     "Cannot create a queue with null or empty name");
483         }
484
485         return queue;
486     }
487
488     /**
489      * Creates a topic identity given a <code>Topic</code> name.
490      * <p/>
491      * <P>This facility is provided for the rare cases where clients need to
492      * dynamically manipulate topic identity. This allows the creation of a
493      * topic identity with a provider-specific name. Clients that depend on this
494      * ability are not portable.
495      * <p/>
496      * <P>Note that this method is not for creating the physical topic. The
497      * physical creation of topics is an administrative task and is not to be
498      * initiated by the JMS API. The one exception is the creation of temporary
499      * topics, which is accomplished with the <code>createTemporaryTopic</code>
500      * method.
501      *
502      * @param topicName the name of this <code>Topic</code>
503      * @return a <code>Topic</code> with the given name
504      * @throws JMSException if the session fails to create a topic due to some
505      * internal error.
506      */

507     public Topic JavaDoc createTopic(String JavaDoc topicName) throws JMSException JavaDoc {
508         ensureOpen();
509
510         JmsTopic topic = null;
511
512         if (topicName != null && topicName.length() > 0) {
513             topic = new JmsTopic(topicName);
514         } else {
515             throw new JMSException JavaDoc("Invalid or null topic name specified");
516         }
517
518         return topic;
519     }
520
521     /**
522      * Creates a durable subscriber to the specified topic.
523      * <p/>
524      * <P>If a client needs to receive all the messages published on a topic,
525      * including the ones published while the subscriber is inactive, it uses a
526      * durable <code>TopicSubscriber</code>. The JMS provider retains a record
527      * of this durable subscription and insures that all messages from the
528      * topic's publishers are retained until they are acknowledged by this
529      * durable subscriber or they have expired.
530      * <p/>
531      * <P>Sessions with durable subscribers must always provide the same client
532      * identifier. In addition, each client must specify a name that uniquely
533      * identifies (within client identifier) each durable subscription it
534      * creates. Only one session at a time can have a <code>TopicSubscriber</code>
535      * for a particular durable subscription.
536      * <p/>
537      * <P>A client can change an existing durable subscription by creating a
538      * durable <code>TopicSubscriber</code> with the same name and a new topic
539      * and/or message selector. Changing a durable subscriber is equivalent to
540      * unsubscribing (deleting) the old one and creating a new one.
541      * <p/>
542      * <P>In some cases, a connection may both publish and subscribe to a topic.
543      * The subscriber <code>noLocal</code> attribute allows a subscriber to
544      * inhibit the delivery of messages published by its own connection. The
545      * default value for this attribute is false.
546      *
547      * @param topic the non-temporary <code>Topic</code> to subscribe to
548      * @param name the name used to identify this subscription
549      * @throws JMSException if the session fails to create a
550      * subscriber due to some internal
551      * error.
552      * @throws InvalidDestinationException if an invalid topic is specified.
553      */

554     public TopicSubscriber JavaDoc createDurableSubscriber(Topic JavaDoc topic, String JavaDoc name)
555             throws JMSException JavaDoc {
556         return createDurableSubscriber(topic, name, null, false);
557     }
558
559     /**
560      * Creates a durable subscriber to the specified topic, using a message
561      * selector and specifying whether messages published by its own connection
562      * should be delivered to it.
563      * <p/>
564      * <P>If a client needs to receive all the messages published on a topic,
565      * including the ones published while the subscriber is inactive, it uses a
566      * durable <code>TopicSubscriber</code>. The JMS provider retains a record
567      * of this durable subscription and insures that all messages from the
568      * topic's publishers are retained until they are acknowledged by this
569      * durable subscriber or they have expired.
570      * <p/>
571      * <P>Sessions with durable subscribers must always provide the same client
572      * identifier. In addition, each client must specify a name which uniquely
573      * identifies (within client identifier) each durable subscription it
574      * creates. Only one session at a time can have a <code>TopicSubscriber</code>
575      * for a particular durable subscription. An inactive durable subscriber is
576      * one that exists but does not currently have a message consumer associated
577      * with it.
578      * <p/>
579      * <P>A client can change an existing durable subscription by creating a
580      * durable <code>TopicSubscriber</code> with the same name and a new topic
581      * and/or message selector. Changing a durable subscriber is equivalent to
582      * unsubscribing (deleting) the old one and creating a new one.
583      *
584      * @param topic the non-temporary <code>Topic</code> to subscribe
585      * to
586      * @param name the name used to identify this subscription
587      * @param messageSelector only messages with properties matching the message
588      * selector expression are delivered. A value of
589      * null or an empty string indicates that there is no
590      * message selector for the message consumer.
591      * @param noLocal if set, inhibits the delivery of messages
592      * published by its own connection
593      * @throws JMSException if the session fails to create a
594      * subscriber due to some internal
595      * error.
596      * @throws InvalidDestinationException if an invalid topic is specified.
597      * @throws InvalidSelectorException if the message selector is invalid.
598      */

599     public TopicSubscriber JavaDoc createDurableSubscriber(Topic JavaDoc topic, String JavaDoc name,
600                                                    String JavaDoc messageSelector,
601                                                    boolean noLocal)
602             throws JMSException JavaDoc {
603         ensureOpen();
604
605         if (topic == null) {
606             throw new InvalidDestinationException JavaDoc(
607                     "Cannot create durable subscriber: argument 'topic' is "
608                     + " null");
609         }
610         if (name == null || name.trim().length() == 0) {
611             throw new JMSException JavaDoc("Invalid subscription name specified");
612         }
613
614         // check to see if the topic is a temporary topic. You cannot
615
// create a durable subscriber for a temporary topic
616
if (((JmsTopic) topic).isTemporaryDestination()) {
617             throw new InvalidDestinationException JavaDoc(
618                     "Cannot create a durable subscriber for a temporary topic");
619         }
620
621         long consumerId = _session.createDurableConsumer((JmsTopic) topic, name,
622                                                          messageSelector,
623                                                          noLocal);
624         JmsTopicSubscriber subscriber = new JmsTopicSubscriber(this,
625                                                                consumerId,
626                                                                topic,
627                                                                messageSelector,
628                                                                noLocal);
629         addConsumer(subscriber);
630
631         return subscriber;
632     }
633
634     /**
635      * Creates a <code>QueueBrowser</code> object to peek at the messages on the
636      * specified queue.
637      *
638      * @param queue the queue to access
639      * @throws JMSException if the session fails to create a
640      * browser due to some internal error.
641      * @throws InvalidDestinationException if an invalid destination is
642      * specified
643      */

644     public QueueBrowser JavaDoc createBrowser(Queue JavaDoc queue) throws JMSException JavaDoc {
645         return createBrowser(queue, null);
646     }
647
648     /**
649      * Creates a <code>QueueBrowser</code> object to peek at the messages on the
650      * specified queue using a message selector.
651      *
652      * @param queue the <code>queue</code> to access
653      * @param messageSelector only messages with properties matching the message
654      * selector expression are delivered. A value of null
655      * or an empty string indicates that there is no
656      * message selector for the message consumer.
657      * @throws JMSException if the session fails to create a
658      * browser due to some internal error.
659      * @throws InvalidDestinationException if an invalid destination is
660      * specified
661      * @throws InvalidSelectorException if the message selector is invalid.
662      */

663     public QueueBrowser JavaDoc createBrowser(Queue JavaDoc queue, String JavaDoc messageSelector)
664             throws JMSException JavaDoc {
665         ensureOpen();
666         if (!(queue instanceof JmsQueue)) {
667             throw new InvalidDestinationException JavaDoc("Cannot create QueueBrowser for destination="
668                                                   + queue);
669         }
670
671         JmsQueue dest = (JmsQueue) queue;
672         // check to see if the queue is temporary. A temporary queue
673
// can only be used within the context of the owning connection
674
if (!checkForValidTemporaryDestination(dest)) {
675             throw new InvalidDestinationException JavaDoc(
676                     "Cannot create a queue browser for a temporary queue "
677                     + "that is not bound to this connection");
678         }
679
680         long consumerId = _session.createBrowser(dest, messageSelector);
681         JmsQueueBrowser browser = new JmsQueueBrowser(this, consumerId, queue,
682                                                       messageSelector);
683         addConsumer(browser);
684         return browser;
685     }
686
687     /**
688      * Creates a <code>TemporaryQueue</code> object. Its lifetime will be that
689      * of the <code>Connection</code> unless it is deleted earlier.
690      *
691      * @return a temporary queue identity
692      * @throws JMSException if the session fails to create a temporary queue due
693      * to some internal error.
694      */

695     public TemporaryQueue JavaDoc createTemporaryQueue() throws JMSException JavaDoc {
696         ensureOpen();
697
698         JmsTemporaryQueue queue = new JmsTemporaryQueue();
699         queue.setOwningConnection(getConnection());
700         return queue;
701     }
702
703     /**
704      * Creates a <code>TemporaryTopic</code> object. Its lifetime will be that
705      * of the <code>Connection</code> unless it is deleted earlier.
706      *
707      * @return a temporary topic identity
708      * @throws JMSException if the session fails to create a temporary topic due
709      * to some internal error.
710      */

711     public TemporaryTopic JavaDoc createTemporaryTopic() throws JMSException JavaDoc {
712         ensureOpen();
713
714         JmsTemporaryTopic topic = new JmsTemporaryTopic();
715         topic.setOwningConnection(getConnection());
716         return topic;
717     }
718
719     /**
720      * Unsubscribes a durable subscription that has been created by a client.
721      * <p/>
722      * <P>This method deletes the state being maintained on behalf of the
723      * subscriber by its provider.
724      * <p/>
725      * <P>It is erroneous for a client to delete a durable subscription while
726      * there is an active <code>MessageConsumer</code> or
727      * <code>TopicSubscriber</code> for the subscription, or while a consumed
728      * message is part of a pending transaction or has not been acknowledged in
729      * the session.
730      *
731      * @param name the name used to identify this subscription
732      * @throws JMSException if the session fails to unsubscribe
733      * to the durable subscription due to
734      * some internal error.
735      * @throws InvalidDestinationException if an invalid subscription name is
736      * specified.
737      */

738     public void unsubscribe(String JavaDoc name) throws JMSException JavaDoc {
739         ensureOpen();
740         _session.unsubscribe(name);
741     }
742
743     /**
744      * Commit all messages done in this transaction
745      *
746      * @throws JMSException if the transaction cannot be committed
747      */

748     public synchronized void commit() throws JMSException JavaDoc {
749         ensureOpen();
750         ensureTransactional();
751
752         // send all the cached messages to the server
753
getServerSession().send(_messagesToSend);
754         _messagesToSend.clear();
755
756         // commit the session
757
getServerSession().commit();
758     }
759
760     /**
761      * Rollback any messages done in this transaction
762      *
763      * @throws JMSException if the transaction cannot be rolled back
764      */

765     public synchronized void rollback() throws JMSException JavaDoc {
766         ensureOpen();
767         ensureTransactional();
768
769         // clear all the cached messages
770
_messagesToSend.clear();
771
772         // rollback the session
773
getServerSession().rollback();
774     }
775
776     /**
777      * Close the session. This call will block until a receive or message
778      * listener in progress has completed. A blocked message consumer receive
779      * call returns <code>null</code> when this session is closed.
780      *
781      * @throws JMSException if the session can't be closed
782      */

783     public synchronized void close() throws JMSException JavaDoc {
784         if (!_closed) {
785             _closing = true;
786
787             // must stop first before we close
788
stop();
789
790             // wake up any blocking consumers
791
notifyConsumers();
792
793             // go through all the producer and call close on them
794
// respectively
795
JmsMessageProducer[] producers =
796                (JmsMessageProducer[]) _producers.toArray(
797                        new JmsMessageProducer[0]);
798             for (int i = 0; i < producers.length; ++i) {
799                 JmsMessageProducer producer = producers[i];
800                 producer.close();
801             }
802
803             // go through all the consumer and call close on them
804
// respectively
805
JmsMessageConsumer[] consumers =
806                     (JmsMessageConsumer[]) _consumers.values().toArray(
807                             new JmsMessageConsumer[0]);
808             for (int i = 0; i < consumers.length; ++i) {
809                 JmsMessageConsumer consumer = consumers[i];
810                 consumer.close();
811             }
812
813             // deregister this with the connection
814
_connection.removeSession(this);
815             _connection = null;
816
817             // clear any cached messages or acks
818
_messagesToSend.clear();
819
820             // issue a close to the remote session. This will release any
821
// allocated remote resources
822
getServerSession().close();
823             _session = null;
824
825             // update the session state
826
_closed = true;
827             _closing = false;
828         }
829     }
830
831     /**
832      * Stop message delivery in this session, and restart sending messages with
833      * the oldest unacknowledged message
834      *
835      * @throws JMSException if the session can't be recovered
836      */

837     public synchronized void recover() throws JMSException JavaDoc {
838         ensureOpen();
839         if (!_transacted) {
840             // let the server handle the recovery
841
getServerSession().recover();
842         } else {
843             throw new IllegalStateException JavaDoc(
844                     "Cannot recover from a transacted session");
845         }
846     }
847
848     /**
849      * Returns the message listener associated with the session
850      *
851      * @return the message listener associated with the session, or
852      * <code>null</code> if no listener is registered
853      * @throws JMSException if the session is closed
854      */

855     public MessageListener JavaDoc getMessageListener() throws JMSException JavaDoc {
856         ensureOpen();
857         return _listener;
858     }
859
860     /**
861      * Sets the session's message listener.
862      *
863      * @param listener the session's message listener
864      * @throws JMSException if the session is closed
865      */

866     public void setMessageListener(MessageListener JavaDoc listener)
867             throws JMSException JavaDoc {
868         ensureOpen();
869         _listener = listener;
870     }
871
872     /**
873      * Iterates through the list of messages added by an {@link
874      * JmsConnectionConsumer}, sending them to the registered listener
875      */

876     public void run() {
877         try {
878             while (!_messageCache.isEmpty()) {
879                 Message JavaDoc message = (Message JavaDoc) _messageCache.remove(0);
880                 _listener.onMessage(message);
881             }
882         } catch (Exception JavaDoc exception) {
883             _log.error("Error in the Session.run()", exception);
884         } finally {
885             // Clear message cache
886
_messageCache.clear();
887         }
888     }
889
890     /**
891      * Set the message listener for a particular consumer.
892      * <p/>
893      * If a listener is already registered for the consumer, it will be
894      * automatically overwritten
895      *
896      * @param listener the message listener
897      * @throws JMSException if the listener can't be set
898      */

899     public void setMessageListener(JmsMessageConsumer listener)
900             throws JMSException JavaDoc {
901         ensureOpen();
902         enableAsynchronousDelivery(listener.getConsumerId(), true);
903     }
904
905     /**
906      * Remove a message listener
907      *
908      * @param listener the message listener to remove
909      * @throws JMSException if the listener can't be removed
910      */

911     public void removeMessageListener(JmsMessageConsumer listener)
912             throws JMSException JavaDoc {
913
914         ensureOpen();
915         enableAsynchronousDelivery(listener.getConsumerId(), false);
916     }
917
918     /**
919      * This will start message delivery to this session. If message delivery has
920      * already started then this is a no-op.
921      *
922      * @throws JMSException if message delivery can't be started
923      */

924     public void start() throws JMSException JavaDoc {
925         ensureOpen();
926         if (_stopped) {
927             getServerSession().start();
928             _stopped = false;
929
930             // wake up any blocking consumers
931
notifyConsumers();
932         }
933     }
934
935     /**
936      * This will stop message delivery to this session. If message delivery has
937      * already stoped then this is a no-op.
938      *
939      * @throws JMSException if message delivery can't be stopped
940      */

941     public void stop() throws JMSException JavaDoc {
942         ensureOpen();
943         if (!_stopped) {
944             getServerSession().stop();
945             _stopped = true;
946
947             // wake up any blocking consumers
948
notifyConsumers();
949         }
950     }
951
952     /**
953      * Acknowledge the specified message. This is only applicable for
954      * CLIENT_ACKNOWLEDGE sessions. For other session types, the request is
955      * ignored.
956      * <p/>
957      * Acking a message automatically acks all those that have come before it.
958      *
959      * @param message the message to acknowledge
960      * @throws JMSException if the message can't be acknowledged
961      */

962     public void acknowledgeMessage(Message JavaDoc message) throws JMSException JavaDoc {
963         ensureOpen();
964         if (_ackMode == Session.CLIENT_ACKNOWLEDGE) {
965             MessageImpl impl = (MessageImpl) message;
966             getServerSession().acknowledgeMessage(impl.getConsumerId(),
967                                                    impl.getAckMessageID());
968         }
969     }
970
971     /**
972      * Enable or disable asynchronous message delivery for the specified
973      * client.
974      *
975      * @param consumerId the consumer identifier
976      * @param enable <code>true</code> to enable; <code>false</code> to
977      * disable
978      * @throws JMSException if message delivery cannot be enabled or disabled
979      */

980     public void enableAsynchronousDelivery(long consumerId, boolean enable)
981             throws JMSException JavaDoc {
982
983         ensureOpen();
984         getServerSession().enableAsynchronousDelivery(consumerId, enable);
985     }
986
987     /**
988      * Asynchronously deliver a message to a <code>MessageConsumer</code>
989      *
990      * @param message the message to deliver
991      */

992     public void onMessage(Message JavaDoc message) {
993         if (message != null) {
994             MessageImpl impl = (MessageImpl) message;
995             impl.setJMSXRcvTimestamp(System.currentTimeMillis());
996
997             // dispatch the message;
998
execute(message);
999         }
1000    }
1001
1002    /**
1003     * Inform the session that there is a message available for a particular
1004     * consumer
1005     *
1006     * @param consumerId the consumer identity
1007     */

1008    public void onMessageAvailable(long consumerId) {
1009        // wake up any blocking consumers
1010
notifyConsumers();
1011    }
1012
1013    /**
1014     * This is the called to process messages asynchronously delivered by the
1015     * server. The session is then responsible for delivering it to the
1016     * appropriate registered consumer. If it cannot resolve the consumer then
1017     * it must log an exception
1018     * <p/>
1019     * If the session has a registered listener then all messages will be
1020     * delivered to the session's listener instead of the individual consumer
1021     * message listeners.
1022     *
1023     * @param object received message
1024     */

1025    public synchronized void execute(Object JavaDoc object) {
1026        // if the session is closed then drop the object
1027
if (_closed) {
1028            _log.error("Received a message for a closed session");
1029            return;
1030        }
1031
1032        MessageImpl message = (MessageImpl) object;
1033        long consumerId = message.getConsumerId();
1034        JmsMessageConsumer consumer =
1035                (JmsMessageConsumer) _consumers.get(new Long JavaDoc(consumerId));
1036
1037        // tag the session that received this message
1038
message.setSession(this);
1039        if (consumer != null) {
1040            // if a listener is defined for the session then send all the
1041
// messages to that listener regardless if any consumers are
1042
// have registered listeners...bit confusing but this is what
1043
// I believe it should do
1044
if (_listener != null) {
1045                _listener.onMessage(message);
1046            } else {
1047                // send it to the appropriate consumer
1048
consumer.onMessage(message);
1049            }
1050        } else {
1051            // consumer no longer active...so drop the message
1052
_log.error("Received a message for an inactive consumer");
1053        }
1054    }
1055
1056    /**
1057     * Fetch the next message for this client. If the session's ackMode is
1058     * client acknowledge then set the session for the message, othwerwise ack
1059     * the message before returning it.
1060     *
1061     * @param consumerId the consumer identififer
1062     * @param wait the maximum time to wait for a message, in
1063     * milliseconds. If <code>-1</code>, don't wait, if
1064     * <code>0</code> wait indefinitely, otherwise wait the
1065     * specified time.
1066     * @return the received message, or <code>null</code>, if no message is
1067     * available
1068     * @throws JMSException if an error occurs retrieving the message
1069     */

1070    public Message JavaDoc retrieveMessage(long consumerId, long wait)
1071            throws JMSException JavaDoc {
1072
1073        ensureOpen();
1074
1075        boolean breakOnNextRead = false;
1076        long start = System.currentTimeMillis();
1077        long end = start + wait;
1078        MessageImpl message = null;
1079        while (true) {
1080            synchronized (_receiveLock) {
1081                if (_closing || _closed) {
1082                    // session is in the process of closing, or has been
1083
// closed. Need to return null.
1084
break;
1085                } else if (_stopped) {
1086                    // connection has been stopped. No message can be returned,
1087
// but receives continue to time out
1088
} else {
1089                    // connection is started. Messages may be returned.
1090
message = (MessageImpl) getServerSession().receive(
1091                            consumerId, wait);
1092                }
1093                if (message != null) {
1094                    message.setSession(this);
1095                    break;
1096                } else {
1097                    // if we have instructed to break, then exit the loop.
1098
if (breakOnNextRead) {
1099                        break;
1100                    }
1101
1102                    // no message was received. Block for the specified time
1103
// until one of the following occurs:
1104
// . a message is received
1105
// . the receive times out
1106
// . the session is closed
1107
if (wait >= 0) {
1108                        try {
1109                            if (wait > 0) {
1110                                // wait for a specific period of time
1111
_receiveLock.wait(wait);
1112                                long current = System.currentTimeMillis();
1113                                if (current >= end) {
1114                                    breakOnNextRead = true;
1115                                } else {
1116                                    // update the time to wait. If the value
1117
// is zero then break on the next read
1118
wait = end - current;
1119                                    if (wait == 0) {
1120                                        breakOnNextRead = true;
1121                                    }
1122                                }
1123                            } else {
1124                                // wait indefinitely
1125
_receiveLock.wait();
1126                            }
1127                        } catch (InterruptedException JavaDoc ignore) {
1128                            // no-op
1129
}
1130                    } else {
1131                        // exit the loop since the client is performing a non
1132
// blocking read
1133
break;
1134                    }
1135                }
1136            }
1137        }
1138
1139        return message;
1140    }
1141
1142    /**
1143     * Browse up to count messages.
1144     *
1145     * @param consumerId the consumer identifier
1146     * @param count the maximum number of messages to receive
1147     * @return a list of {@link MessageImpl} instances
1148     * @throws JMSException for any JMS error
1149     */

1150    public synchronized List JavaDoc browse(long consumerId, int count)
1151            throws JMSException JavaDoc {
1152        ensureOpen();
1153        return getServerSession().browse(consumerId, count);
1154    }
1155
1156    /**
1157     * Send the specified message to the server.
1158     *
1159     * @param message the message to send
1160     * @throws JMSException if the message can't be sent
1161     */

1162    protected synchronized void sendMessage(Message JavaDoc message)
1163            throws JMSException JavaDoc {
1164
1165        if (_transacted) {
1166            // if the session is transacted then cache the message locally.
1167
// and wait for a commit or a rollback
1168
if (message instanceof MessageImpl) {
1169                try {
1170                    message = (Message JavaDoc) ((MessageImpl) message).clone();
1171                } catch (CloneNotSupportedException JavaDoc error) {
1172                    throw new JMSException JavaDoc(error.getMessage());
1173                }
1174            } else {
1175                message = convert(message);
1176            }
1177            _messagesToSend.add(message);
1178        } else {
1179            if (!(message instanceof MessageImpl)) {
1180                message = convert(message);
1181            }
1182            getServerSession().send((MessageImpl) message);
1183        }
1184    }
1185
1186    /**
1187     * Returns the server session.
1188     *
1189     * @return the server session
1190     */

1191    protected ServerSession getServerSession() {
1192        return _session;
1193    }
1194
1195    /**
1196     * Return a reference to the connection that created this session.
1197     *
1198     * @return the owning connection
1199     */

1200    protected JmsConnection getConnection() {
1201        return _connection;
1202    }
1203
1204    /**
1205     * Creates a new message consumer, returning its identity.
1206     *
1207     * @param destination the destination to access
1208     * @param selector the message selector. May be <code>null</code>
1209     * @param noLocal if true, and the destination is a topic, inhibits the
1210     * delivery of messages published by its own connection.
1211     * The behavior for <code>noLocal</code> is not specified
1212     * if the destination is a queue.
1213     * @throws JMSException if the session fails to create a
1214     * MessageConsumer due to some internal
1215     * error.
1216     * @throws InvalidDestinationException if an invalid destination is
1217     * specified.
1218     * @throws InvalidSelectorException if the message selector is invalid.
1219     */

1220    protected long allocateConsumer(Destination JavaDoc destination,
1221                                      String JavaDoc selector, boolean noLocal)
1222            throws JMSException JavaDoc {
1223        ensureOpen();
1224
1225        if (!(destination instanceof JmsDestination)) {
1226            throw new InvalidDestinationException JavaDoc(
1227                    "Cannot create MessageConsumer for destination="
1228                     + destination);
1229        }
1230        JmsDestination dest = (JmsDestination) destination;
1231
1232        // check to see if the destination is temporary. A temporary destination
1233
// can only be used within the context of the owning connection
1234
if (!checkForValidTemporaryDestination(dest)) {
1235            throw new InvalidDestinationException JavaDoc(
1236                    "Trying to create a MessageConsumer for a temporary "
1237                    + "destination that is not bound to this connection");
1238        }
1239
1240        long consumerId = _session.createConsumer(dest, selector, noLocal);
1241        return consumerId;
1242    }
1243
1244    /**
1245     * This method checks the destination. If the destination is not temporary
1246     * then return true. If it is a temporary destination and it is owned by
1247     * this session's connection then it returns true. If it is a tmeporary
1248     * destination and it is owned by another connection then it returns false
1249     *
1250     * @param destination the destination to check
1251     * @return <code>true</code> if the destination is valid
1252     */

1253    protected boolean checkForValidTemporaryDestination(
1254            JmsDestination destination) {
1255        boolean result = false;
1256
1257        if (destination.isTemporaryDestination()) {
1258            JmsTemporaryDestination temp =
1259                    (JmsTemporaryDestination) destination;
1260
1261            // check that this temp destination is owned by the session's
1262
// connection.
1263
if (temp.validForConnection(getConnection())) {
1264                result = true;
1265            }
1266        } else {
1267            result = true;
1268        }
1269
1270        return result;
1271    }
1272
1273    /**
1274     * Add a consumer to the list of consumers managed by this session.
1275     *
1276     * @param consumer the consumer to add
1277     */

1278    protected void addConsumer(JmsMessageConsumer consumer) {
1279        _consumers.put(new Long JavaDoc(consumer.getConsumerId()), consumer);
1280    }
1281
1282    /**
1283     * Remove a consumer, deregistering it on the server.
1284     *
1285     * @param consumer the consumer to remove
1286     * @throws JMSException if removal fails
1287     */

1288    protected void removeConsumer(JmsMessageConsumer consumer)
1289            throws JMSException JavaDoc {
1290        long consumerId = consumer.getConsumerId();
1291        try {
1292            if (!(consumer instanceof JmsQueueBrowser)) {
1293                removeMessageListener(consumer);
1294            }
1295            _session.removeConsumer(consumerId);
1296        } finally {
1297            _consumers.remove(new Long JavaDoc(consumerId));
1298        }
1299    }
1300
1301    /**
1302     * Add a producer to the list of producers managed by this session.
1303     *
1304     * @param producer the producer to add
1305     */

1306    protected void addProducer(JmsMessageProducer producer) {
1307        _producers.add(producer);
1308    }
1309
1310    /**
1311     * Remove the producer from the list of managed producers.
1312     *
1313     * @param producer the producer to remove
1314     */

1315    protected void removeProducer(JmsMessageProducer producer) {
1316        _producers.remove(producer);
1317    }
1318
1319    /**
1320     * Check if the session is closed.
1321     *
1322     * @return <code>true</code> if the session is closed
1323     */

1324    protected final boolean isClosed() {
1325        return _closed;
1326    }
1327
1328    /**
1329     * Add a message to the message cache. This message will be processed when
1330     * the run() method is called.
1331     *
1332     * @param message the message to add.
1333     */

1334    protected void addMessage(Message JavaDoc message) {
1335        _messageCache.add(message);
1336    }
1337
1338    /**
1339     * Verifies that the session isn't closed.
1340     *
1341     * @throws IllegalStateException if the session is closed
1342     */

1343    protected void ensureOpen() throws IllegalStateException JavaDoc {
1344        if (_closed) {
1345            throw new IllegalStateException JavaDoc(
1346                    "Cannot perform operation - session has been closed");
1347        }
1348    }
1349
1350    /**
1351     * Verifies that the session is transactional.
1352     *
1353     * @throws IllegalStateException if the session isn't transactional
1354     */

1355    private void ensureTransactional() throws IllegalStateException JavaDoc {
1356        if (!_transacted) {
1357            throw new IllegalStateException JavaDoc(
1358                    "Cannot perform operatiorn - session is not transactional");
1359        }
1360    }
1361
1362    /**
1363     * Notifies any blocking synchronous consumers.
1364     */

1365    private void notifyConsumers() {
1366        synchronized (_receiveLock) {
1367            _receiveLock.notifyAll();
1368        }
1369    }
1370
1371    /**
1372     * Convert a message to its corresponding OpenJMS implementation.
1373     *
1374     * @param message the message to convert
1375     * @return the OpenJMS implementation of the message
1376     * @throws JMSException for any error
1377     */

1378    private Message JavaDoc convert(Message JavaDoc message) throws JMSException JavaDoc {
1379        MessageConverter converter =
1380                MessageConverterFactory.create(message);
1381        return converter.convert(message);
1382    }
1383
1384}
1385
1386
Popular Tags