KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > ActiveMQSession


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 package org.apache.activemq;
19
20 import java.io.File JavaDoc;
21 import java.io.InputStream JavaDoc;
22 import java.io.Serializable JavaDoc;
23 import java.net.URL JavaDoc;
24 import java.util.Collections JavaDoc;
25 import java.util.Iterator JavaDoc;
26 import java.util.List JavaDoc;
27 import java.util.concurrent.CopyOnWriteArrayList JavaDoc;
28 import java.util.concurrent.atomic.AtomicBoolean JavaDoc;
29
30 import javax.jms.BytesMessage JavaDoc;
31 import javax.jms.Destination JavaDoc;
32 import javax.jms.IllegalStateException JavaDoc;
33 import javax.jms.InvalidDestinationException JavaDoc;
34 import javax.jms.InvalidSelectorException JavaDoc;
35 import javax.jms.JMSException JavaDoc;
36 import javax.jms.MapMessage JavaDoc;
37 import javax.jms.Message JavaDoc;
38 import javax.jms.MessageConsumer JavaDoc;
39 import javax.jms.MessageListener JavaDoc;
40 import javax.jms.MessageProducer JavaDoc;
41 import javax.jms.ObjectMessage JavaDoc;
42 import javax.jms.Queue JavaDoc;
43 import javax.jms.QueueBrowser JavaDoc;
44 import javax.jms.QueueReceiver JavaDoc;
45 import javax.jms.QueueSender JavaDoc;
46 import javax.jms.QueueSession JavaDoc;
47 import javax.jms.Session JavaDoc;
48 import javax.jms.StreamMessage JavaDoc;
49 import javax.jms.TemporaryQueue JavaDoc;
50 import javax.jms.TemporaryTopic JavaDoc;
51 import javax.jms.TextMessage JavaDoc;
52 import javax.jms.Topic JavaDoc;
53 import javax.jms.TopicPublisher JavaDoc;
54 import javax.jms.TopicSession JavaDoc;
55 import javax.jms.TopicSubscriber JavaDoc;
56 import javax.jms.TransactionRolledBackException JavaDoc;
57
58 import org.apache.activemq.blob.BlobTransferPolicy;
59 import org.apache.activemq.blob.BlobUploader;
60 import org.apache.activemq.command.ActiveMQBlobMessage;
61 import org.apache.activemq.command.ActiveMQBytesMessage;
62 import org.apache.activemq.command.ActiveMQDestination;
63 import org.apache.activemq.command.ActiveMQMapMessage;
64 import org.apache.activemq.command.ActiveMQMessage;
65 import org.apache.activemq.command.ActiveMQObjectMessage;
66 import org.apache.activemq.command.ActiveMQQueue;
67 import org.apache.activemq.command.ActiveMQStreamMessage;
68 import org.apache.activemq.command.ActiveMQTempDestination;
69 import org.apache.activemq.command.ActiveMQTextMessage;
70 import org.apache.activemq.command.ActiveMQTopic;
71 import org.apache.activemq.command.Command;
72 import org.apache.activemq.command.ConsumerId;
73 import org.apache.activemq.command.MessageAck;
74 import org.apache.activemq.command.MessageDispatch;
75 import org.apache.activemq.command.MessageId;
76 import org.apache.activemq.command.ProducerId;
77 import org.apache.activemq.command.Response;
78 import org.apache.activemq.command.SessionId;
79 import org.apache.activemq.command.SessionInfo;
80 import org.apache.activemq.command.TransactionId;
81 import org.apache.activemq.management.JMSSessionStatsImpl;
82 import org.apache.activemq.management.StatsCapable;
83 import org.apache.activemq.management.StatsImpl;
84 import org.apache.activemq.memory.UsageManager;
85 import org.apache.activemq.thread.Scheduler;
86 import org.apache.activemq.transaction.Synchronization;
87 import org.apache.activemq.util.Callback;
88 import org.apache.activemq.util.LongSequenceGenerator;
89 import org.apache.commons.logging.Log;
90 import org.apache.commons.logging.LogFactory;
91
92 /**
93  * <P>
94  * A <CODE>Session</CODE> object is a single-threaded context for producing
95  * and consuming messages. Although it may allocate provider resources outside
96  * the Java virtual machine (JVM), it is considered a lightweight JMS object.
97  * <P>
98  * A session serves several purposes:
99  * <UL>
100  * <LI>It is a factory for its message producers and consumers.
101  * <LI>It supplies provider-optimized message factories.
102  * <LI>It is a factory for <CODE>TemporaryTopics</CODE> and <CODE>TemporaryQueues</CODE>.
103  * <LI>It provides a way to create <CODE>Queue</CODE> or <CODE>Topic</CODE>
104  * objects for those clients that need to dynamically manipulate
105  * provider-specific destination names.
106  * <LI>It supports a single series of transactions that combine work spanning
107  * its producers and consumers into atomic units.
108  * <LI>It defines a serial order for the messages it consumes and the messages
109  * it produces.
110  * <LI>It retains messages it consumes until they have been acknowledged.
111  * <LI>It serializes execution of message listeners registered with its message
112  * consumers.
113  * <LI>It is a factory for <CODE>QueueBrowsers</CODE>.
114  * </UL>
115  * <P>
116  * A session can create and service multiple message producers and consumers.
117  * <P>
118  * One typical use is to have a thread block on a synchronous <CODE>MessageConsumer</CODE>
119  * until a message arrives. The thread may then use one or more of the <CODE>Session</CODE>'s <CODE>MessageProducer</CODE>s.
120  * <P>
121  * If a client desires to have one thread produce messages while others consume
122  * them, the client should use a separate session for its producing thread.
123  * <P>
124  * Once a connection has been started, any session with one or more registered
125  * message listeners is dedicated to the thread of control that delivers
126  * messages to it. It is erroneous for client code to use this session or any of
127  * its constituent objects from another thread of control. The only exception to
128  * this rule is the use of the session or connection <CODE>close</CODE>
129  * method.
130  * <P>
131  * It should be easy for most clients to partition their work naturally into
132  * sessions. This model allows clients to start simply and incrementally add
133  * message processing complexity as their need for concurrency grows.
134  * <P>
135  * The <CODE>close</CODE> method is the only session method that can be called
136  * while some other session method is being executed in another thread.
137  * <P>
138  * A session may be specified as transacted. Each transacted session supports a
139  * single series of transactions. Each transaction groups a set of message sends
140  * and a set of message receives into an atomic unit of work. In effect,
141  * transactions organize a session's input message stream and output message
142  * stream into series of atomic units. When a transaction commits, its atomic
143  * unit of input is acknowledged and its associated atomic unit of output is
144  * sent. If a transaction rollback is done, the transaction's sent messages are
145  * destroyed and the session's input is automatically recovered.
146  * <P>
147  * The content of a transaction's input and output units is simply those
148  * messages that have been produced and consumed within the session's current
149  * transaction.
150  * <P>
151  * A transaction is completed using either its session's <CODE>commit</CODE>
152  * method or its session's <CODE>rollback </CODE> method. The completion of a
153  * session's current transaction automatically begins the next. The result is
154  * that a transacted session always has a current transaction within which its
155  * work is done.
156  * <P>
157  * The Java Transaction Service (JTS) or some other transaction monitor may be
158  * used to combine a session's transaction with transactions on other resources
159  * (databases, other JMS sessions, etc.). Since Java distributed transactions
160  * are controlled via the Java Transaction API (JTA), use of the session's
161  * <CODE>commit</CODE> and <CODE>rollback</CODE> methods in this context is
162  * prohibited.
163  * <P>
164  * The JMS API does not require support for JTA; however, it does define how a
165  * provider supplies this support.
166  * <P>
167  * Although it is also possible for a JMS client to handle distributed
168  * transactions directly, it is unlikely that many JMS clients will do this.
169  * Support for JTA in the JMS API is targeted at systems vendors who will be
170  * integrating the JMS API into their application server products.
171  *
172  * @version $Revision: 1.34 $
173  * @see javax.jms.Session
174  * @see javax.jms.QueueSession
175  * @see javax.jms.TopicSession
176  * @see javax.jms.XASession
177  */

178 public class ActiveMQSession implements Session JavaDoc, QueueSession JavaDoc, TopicSession JavaDoc, StatsCapable, ActiveMQDispatcher {
179
180     public static interface DeliveryListener {
181         public void beforeDelivery(ActiveMQSession session, Message msg);
182         public void afterDelivery(ActiveMQSession session, Message msg);
183     }
184
185     private static final Log log = LogFactory.getLog(ActiveMQSession.class);
186
187     protected int acknowledgementMode;
188
189     private MessageListener JavaDoc messageListener;
190     private JMSSessionStatsImpl stats;
191     private TransactionContext transactionContext;
192     private DeliveryListener deliveryListener;
193     private MessageTransformer transformer;
194     private BlobTransferPolicy blobTransferPolicy;
195
196     protected final ActiveMQConnection connection;
197     protected final SessionInfo info;
198     protected final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
199     protected final LongSequenceGenerator producerIdGenerator = new LongSequenceGenerator();
200     protected final LongSequenceGenerator deliveryIdGenerator = new LongSequenceGenerator();
201     protected final ActiveMQSessionExecutor executor = new ActiveMQSessionExecutor(this);
202     protected final AtomicBoolean JavaDoc started = new AtomicBoolean JavaDoc(false);
203     
204     protected final CopyOnWriteArrayList JavaDoc consumers = new CopyOnWriteArrayList JavaDoc();
205     protected final CopyOnWriteArrayList JavaDoc producers = new CopyOnWriteArrayList JavaDoc();
206
207     protected boolean closed;
208     protected boolean asyncDispatch;
209     protected boolean sessionAsyncDispatch;
210     protected final boolean debug;
211     protected Object JavaDoc sendMutex = new Object JavaDoc();
212
213     /**
214      * Construct the Session
215      *
216      * @param connection
217      * @param sessionId
218      * @param acknowledgeMode
219      * n.b if transacted - the acknowledgeMode ==
220      * Session.SESSION_TRANSACTED
221      * @param asyncDispatch
222      * @param sessionAsyncDispatch
223      * @throws JMSException
224      * on internal error
225      */

226     protected ActiveMQSession(ActiveMQConnection connection, SessionId sessionId, int acknowledgeMode, boolean asyncDispatch,boolean sessionAsyncDispatch)
227             throws JMSException JavaDoc {
228         this.debug = log.isDebugEnabled();
229         this.connection = connection;
230         this.acknowledgementMode = acknowledgeMode;
231         this.asyncDispatch=asyncDispatch;
232         this.sessionAsyncDispatch = sessionAsyncDispatch;
233         this.info = new SessionInfo(connection.getConnectionInfo(), sessionId.getValue());
234         setTransactionContext(new TransactionContext(connection));
235         connection.addSession(this);
236         stats = new JMSSessionStatsImpl(producers, consumers);
237         this.connection.asyncSendPacket(info);
238         setTransformer(connection.getTransformer());
239         setBlobTransferPolicy(connection.getBlobTransferPolicy());
240         
241         if( connection.isStarted() )
242             start();
243
244     }
245     
246     protected ActiveMQSession(ActiveMQConnection connection, SessionId sessionId, int acknowledgeMode, boolean asyncDispatch)throws JMSException JavaDoc {
247         this(connection,sessionId,acknowledgeMode,asyncDispatch,true);
248     }
249
250     /**
251      * Sets the transaction context of the session.
252      *
253      * @param transactionContext -
254      * provides the means to control a JMS transaction.
255      */

256     public void setTransactionContext(TransactionContext transactionContext) {
257         this.transactionContext = transactionContext;
258     }
259
260     /**
261      * Returns the transaction context of the session.
262      *
263      * @return transactionContext - session's transaction context.
264      */

265     public TransactionContext getTransactionContext() {
266         return transactionContext;
267     }
268
269     /*
270      * (non-Javadoc)
271      *
272      * @see org.apache.activemq.management.StatsCapable#getStats()
273      */

274     public StatsImpl getStats() {
275         return stats;
276     }
277
278     /**
279      * Returns the session's statistics.
280      *
281      * @return stats - session's statistics.
282      */

283     public JMSSessionStatsImpl getSessionStats() {
284         return stats;
285     }
286
287     /**
288      * Creates a <CODE>BytesMessage</CODE> object. A <CODE>BytesMessage</CODE>
289      * object is used to send a message containing a stream of uninterpreted
290      * bytes.
291      *
292      * @return the an ActiveMQBytesMessage
293      * @throws JMSException
294      * if the JMS provider fails to create this message due to some
295      * internal error.
296      */

297     public BytesMessage JavaDoc createBytesMessage() throws JMSException JavaDoc {
298         ActiveMQBytesMessage message = new ActiveMQBytesMessage();
299         configureMessage(message);
300         return message;
301     }
302
303     /**
304      * Creates a <CODE>MapMessage</CODE> object. A <CODE>MapMessage</CODE>
305      * object is used to send a self-defining set of name-value pairs, where
306      * names are <CODE>String</CODE> objects and values are primitive values
307      * in the Java programming language.
308      *
309      * @return an ActiveMQMapMessage
310      * @throws JMSException
311      * if the JMS provider fails to create this message due to some
312      * internal error.
313      */

314     public MapMessage JavaDoc createMapMessage() throws JMSException JavaDoc {
315         ActiveMQMapMessage message = new ActiveMQMapMessage();
316         configureMessage(message);
317         return message;
318     }
319
320     /**
321      * Creates a <CODE>Message</CODE> object. The <CODE>Message</CODE>
322      * interface is the root interface of all JMS messages. A <CODE>Message</CODE>
323      * object holds all the standard message header information. It can be sent
324      * when a message containing only header information is sufficient.
325      *
326      * @return an ActiveMQMessage
327      * @throws JMSException
328      * if the JMS provider fails to create this message due to some
329      * internal error.
330      */

331     public Message createMessage() throws JMSException JavaDoc {
332         ActiveMQMessage message = new ActiveMQMessage();
333         configureMessage(message);
334         return message;
335     }
336
337     /**
338      * Creates an <CODE>ObjectMessage</CODE> object. An <CODE>ObjectMessage</CODE>
339      * object is used to send a message that contains a serializable Java
340      * object.
341      *
342      * @return an ActiveMQObjectMessage
343      * @throws JMSException
344      * if the JMS provider fails to create this message due to some
345      * internal error.
346      */

347     public ObjectMessage JavaDoc createObjectMessage() throws JMSException JavaDoc {
348         ActiveMQObjectMessage message = new ActiveMQObjectMessage();
349         configureMessage(message);
350         return message;
351     }
352
353     /**
354      * Creates an initialized <CODE>ObjectMessage</CODE> object. An <CODE>ObjectMessage</CODE>
355      * object is used to send a message that contains a serializable Java
356      * object.
357      *
358      * @param object
359      * the object to use to initialize this message
360      * @return an ActiveMQObjectMessage
361      * @throws JMSException
362      * if the JMS provider fails to create this message due to some
363      * internal error.
364      */

365     public ObjectMessage JavaDoc createObjectMessage(Serializable JavaDoc object) throws JMSException JavaDoc {
366         ActiveMQObjectMessage message = new ActiveMQObjectMessage();
367         configureMessage(message);
368         message.setObject(object);
369         return message;
370     }
371
372     /**
373      * Creates a <CODE>StreamMessage</CODE> object. A <CODE>StreamMessage</CODE>
374      * object is used to send a self-defining stream of primitive values in the
375      * Java programming language.
376      *
377      * @return an ActiveMQStreamMessage
378      * @throws JMSException
379      * if the JMS provider fails to create this message due to some
380      * internal error.
381      */

382     public StreamMessage JavaDoc createStreamMessage() throws JMSException JavaDoc {
383         ActiveMQStreamMessage message = new ActiveMQStreamMessage();
384         configureMessage(message);
385         return message;
386     }
387
388     /**
389      * Creates a <CODE>TextMessage</CODE> object. A <CODE>TextMessage</CODE>
390      * object is used to send a message containing a <CODE>String</CODE>
391      * object.
392      *
393      * @return an ActiveMQTextMessage
394      * @throws JMSException
395      * if the JMS provider fails to create this message due to some
396      * internal error.
397      */

398     public TextMessage JavaDoc createTextMessage() throws JMSException JavaDoc {
399         ActiveMQTextMessage message = new ActiveMQTextMessage();
400         configureMessage(message);
401         return message;
402     }
403
404     /**
405      * Creates an initialized <CODE>TextMessage</CODE> object. A <CODE>TextMessage</CODE>
406      * object is used to send a message containing a <CODE>String</CODE>.
407      *
408      * @param text
409      * the string used to initialize this message
410      * @return an ActiveMQTextMessage
411      * @throws JMSException
412      * if the JMS provider fails to create this message due to some
413      * internal error.
414      */

415     public TextMessage JavaDoc createTextMessage(String JavaDoc text) throws JMSException JavaDoc {
416         ActiveMQTextMessage message = new ActiveMQTextMessage();
417         message.setText(text);
418         configureMessage(message);
419         return message;
420     }
421
422     /**
423      * Creates an initialized <CODE>BlobMessage</CODE> object. A <CODE>BlobMessage</CODE>
424      * object is used to send a message containing a <CODE>URL</CODE> which points to some
425      * network addressible BLOB.
426      *
427      * @param url
428      * the network addressable URL used to pass directly to the consumer
429      * @return a BlobMessage
430      * @throws JMSException
431      * if the JMS provider fails to create this message due to some
432      * internal error.
433      */

434     public BlobMessage createBlobMessage(URL JavaDoc url) throws JMSException JavaDoc {
435         return createBlobMessage(url, false);
436     }
437
438
439     /**
440      * Creates an initialized <CODE>BlobMessage</CODE> object. A <CODE>BlobMessage</CODE>
441      * object is used to send a message containing a <CODE>URL</CODE> which points to some
442      * network addressible BLOB.
443      *
444      * @param url
445      * the network addressable URL used to pass directly to the consumer
446      * @param deletedByBroker
447      * indicates whether or not the resource is deleted by the broker when the message
448      * is acknowledged
449      * @return a BlobMessage
450      * @throws JMSException
451      * if the JMS provider fails to create this message due to some
452      * internal error.
453      */

454     public BlobMessage createBlobMessage(URL JavaDoc url, boolean deletedByBroker) throws JMSException JavaDoc {
455         ActiveMQBlobMessage message = new ActiveMQBlobMessage();
456         configureMessage(message);
457         message.setURL(url);
458         message.setDeletedByBroker(deletedByBroker);
459         return message;
460     }
461
462     /**
463      * Creates an initialized <CODE>BlobMessage</CODE> object. A <CODE>BlobMessage</CODE>
464      * object is used to send a message containing the <CODE>File</CODE> content. Before the
465      * message is sent the file conent will be uploaded to the broker or some other remote repository
466      * depending on the {@link #getBlobTransferPolicy()}.
467      *
468      * @param file
469      * the file to be uploaded to some remote repo (or the broker) depending on the strategy
470      *
471      * @return a BlobMessage
472      * @throws JMSException
473      * if the JMS provider fails to create this message due to some
474      * internal error.
475      */

476     public BlobMessage createBlobMessage(File JavaDoc file) throws JMSException JavaDoc {
477         ActiveMQBlobMessage message = new ActiveMQBlobMessage();
478         configureMessage(message);
479         message.setBlobUploader(new BlobUploader(getBlobTransferPolicy(), file));
480         message.setDeletedByBroker(true);
481         message.setName(file.getName());
482         return message;
483     }
484
485
486     /**
487      * Creates an initialized <CODE>BlobMessage</CODE> object. A <CODE>BlobMessage</CODE>
488      * object is used to send a message containing the <CODE>File</CODE> content. Before the
489      * message is sent the file conent will be uploaded to the broker or some other remote repository
490      * depending on the {@link #getBlobTransferPolicy()}.
491      *
492      * @param in
493      * the stream to be uploaded to some remote repo (or the broker) depending on the strategy
494      *
495      * @return a BlobMessage
496      * @throws JMSException
497      * if the JMS provider fails to create this message due to some
498      * internal error.
499      */

500     public BlobMessage createBlobMessage(InputStream JavaDoc in) throws JMSException JavaDoc {
501         ActiveMQBlobMessage message = new ActiveMQBlobMessage();
502         configureMessage(message);
503         message.setBlobUploader(new BlobUploader(getBlobTransferPolicy(), in));
504         message.setDeletedByBroker(true);
505         return message;
506     }
507
508
509     /**
510      * Indicates whether the session is in transacted mode.
511      *
512      * @return true if the session is in transacted mode
513      * @throws JMSException
514      * if there is some internal error.
515      */

516     public boolean getTransacted() throws JMSException JavaDoc {
517         checkClosed();
518         return ((acknowledgementMode == Session.SESSION_TRANSACTED) || (transactionContext.isInXATransaction()));
519     }
520
521     /**
522      * Returns the acknowledgement mode of the session. The acknowledgement mode
523      * is set at the time that the session is created. If the session is
524      * transacted, the acknowledgement mode is ignored.
525      *
526      * @return If the session is not transacted, returns the current
527      * acknowledgement mode for the session. If the session is
528      * transacted, returns SESSION_TRANSACTED.
529      * @throws JMSException
530      * @see javax.jms.Connection#createSession(boolean,int)
531      * @since 1.1 exception JMSException if there is some internal error.
532      */

533     public int getAcknowledgeMode() throws JMSException JavaDoc {
534         checkClosed();
535         return this.acknowledgementMode;
536     }
537
538     /**
539      * Commits all messages done in this transaction and releases any locks
540      * currently held.
541      *
542      * @throws JMSException
543      * if the JMS provider fails to commit the transaction due to
544      * some internal error.
545      * @throws TransactionRolledBackException
546      * if the transaction is rolled back due to some internal error
547      * during commit.
548      * @throws javax.jms.IllegalStateException
549      * if the method is not called by a transacted session.
550      */

551     public void commit() throws JMSException JavaDoc {
552         checkClosed();
553         if (!getTransacted()) {
554             throw new javax.jms.IllegalStateException JavaDoc("Not a transacted session");
555         }
556         transactionContext.commit();
557     }
558
559     /**
560      * Rolls back any messages done in this transaction and releases any locks
561      * currently held.
562      *
563      * @throws JMSException
564      * if the JMS provider fails to roll back the transaction due to
565      * some internal error.
566      * @throws javax.jms.IllegalStateException
567      * if the method is not called by a transacted session.
568      */

569     public void rollback() throws JMSException JavaDoc {
570         checkClosed();
571         if (!getTransacted()) {
572             throw new javax.jms.IllegalStateException JavaDoc("Not a transacted session");
573         }
574         transactionContext.rollback();
575     }
576     
577     /**
578      * Closes the session.
579      * <P>
580      * Since a provider may allocate some resources on behalf of a session
581      * outside the JVM, clients should close the resources when they are not
582      * needed. Relying on garbage collection to eventually reclaim these
583      * resources may not be timely enough.
584      * <P>
585      * There is no need to close the producers and consumers of a closed
586      * session.
587      * <P>
588      * This call will block until a <CODE>receive</CODE> call or message
589      * listener in progress has completed. A blocked message consumer <CODE>receive</CODE>
590      * call returns <CODE>null</CODE> when this session is closed.
591      * <P>
592      * Closing a transacted session must roll back the transaction in progress.
593      * <P>
594      * This method is the only <CODE>Session</CODE> method that can be called
595      * concurrently.
596      * <P>
597      * Invoking any other <CODE>Session</CODE> method on a closed session must
598      * throw a <CODE> JMSException.IllegalStateException</CODE>. Closing a
599      * closed session must <I>not </I> throw an exception.
600      *
601      * @throws JMSException
602      * if the JMS provider fails to close the session due to some
603      * internal error.
604      */

605     public void close() throws JMSException JavaDoc {
606         if (!closed) {
607             dispose();
608             connection.asyncSendPacket(info.createRemoveCommand());
609         }
610     }
611     
612     void clearMessagesInProgress(){
613         executor.clearMessagesInProgress();
614         for (Iterator JavaDoc iter = consumers.iterator(); iter.hasNext();) {
615             ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) iter.next();
616             consumer.clearMessagesInProgress();
617         }
618     }
619     
620     void deliverAcks(){
621         for (Iterator JavaDoc iter = consumers.iterator(); iter.hasNext();) {
622             ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) iter.next();
623             consumer.deliverAcks();
624         }
625     }
626
627     synchronized public void dispose() throws JMSException JavaDoc {
628         if (!closed) {
629
630             try {
631                 executor.stop();
632
633                 for (Iterator JavaDoc iter = consumers.iterator(); iter.hasNext();) {
634                     ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) iter.next();
635                     consumer.dispose();
636                 }
637                 consumers.clear();
638
639                 for (Iterator JavaDoc iter = producers.iterator(); iter.hasNext();) {
640                     ActiveMQMessageProducer producer = (ActiveMQMessageProducer) iter.next();
641                     producer.dispose();
642                 }
643                 producers.clear();
644
645                 try {
646                     if (getTransactionContext().isInLocalTransaction()) {
647                         rollback();
648                     }
649                 }
650                 catch (JMSException JavaDoc e) {
651                 }
652
653
654             }
655             finally {
656                 connection.removeSession(this);
657                 this.transactionContext = null;
658                 closed = true;
659             }
660         }
661     }
662
663     /**
664      * Checks that the session is not closed then configures the message
665      */

666     protected void configureMessage(ActiveMQMessage message) throws IllegalStateException JavaDoc {
667         checkClosed();
668         message.setConnection(connection);
669     }
670
671
672     /**
673      * Check if the session is closed. It is used for ensuring that the session
674      * is open before performing various operations.
675      *
676      * @throws IllegalStateException
677      * if the Session is closed
678      */

679     protected void checkClosed() throws IllegalStateException JavaDoc {
680         if (closed) {
681             throw new IllegalStateException JavaDoc("The Session is closed");
682         }
683     }
684
685     /**
686      * Stops message delivery in this session, and restarts message delivery
687      * with the oldest unacknowledged message.
688      * <P>
689      * All consumers deliver messages in a serial order. Acknowledging a
690      * received message automatically acknowledges all messages that have been
691      * delivered to the client.
692      * <P>
693      * Restarting a session causes it to take the following actions:
694      * <UL>
695      * <LI>Stop message delivery
696      * <LI>Mark all messages that might have been delivered but not
697      * acknowledged as "redelivered"
698      * <LI>Restart the delivery sequence including all unacknowledged messages
699      * that had been previously delivered. Redelivered messages do not have to
700      * be delivered in exactly their original delivery order.
701      * </UL>
702      *
703      * @throws JMSException
704      * if the JMS provider fails to stop and restart message
705      * delivery due to some internal error.
706      * @throws IllegalStateException
707      * if the method is called by a transacted session.
708      */

709     public void recover() throws JMSException JavaDoc {
710
711         checkClosed();
712         if (getTransacted()) {
713             throw new IllegalStateException JavaDoc("This session is transacted");
714         }
715
716         for (Iterator JavaDoc iter = consumers.iterator(); iter.hasNext();) {
717             ActiveMQMessageConsumer c = (ActiveMQMessageConsumer) iter.next();
718             c.rollback();
719         }
720
721     }
722
723     /**
724      * Returns the session's distinguished message listener (optional).
725      *
726      * @return the message listener associated with this session
727      * @throws JMSException
728      * if the JMS provider fails to get the message listener due to
729      * an internal error.
730      * @see javax.jms.Session#setMessageListener(javax.jms.MessageListener)
731      * @see javax.jms.ServerSessionPool
732      * @see javax.jms.ServerSession
733      */

734     public MessageListener JavaDoc getMessageListener() throws JMSException JavaDoc {
735         checkClosed();
736         return this.messageListener;
737     }
738
739     /**
740      * Sets the session's distinguished message listener (optional).
741      * <P>
742      * When the distinguished message listener is set, no other form of message
743      * receipt in the session can be used; however, all forms of sending
744      * messages are still supported.
745      * <P>
746      * This is an expert facility not used by regular JMS clients.
747      *
748      * @param listener
749      * the message listener to associate with this session
750      * @throws JMSException
751      * if the JMS provider fails to set the message listener due to
752      * an internal error.
753      * @see javax.jms.Session#getMessageListener()
754      * @see javax.jms.ServerSessionPool
755      * @see javax.jms.ServerSession
756      */

757     public void setMessageListener(MessageListener JavaDoc listener) throws JMSException JavaDoc {
758         checkClosed();
759         this.messageListener = listener;
760
761         if (listener != null) {
762             executor.setDispatchedBySessionPool(true);
763         }
764     }
765
766     /**
767      * Optional operation, intended to be used only by Application Servers, not
768      * by ordinary JMS clients.
769      *
770      * @see javax.jms.ServerSession
771      */

772     public void run() {
773         MessageDispatch messageDispatch;
774         while ((messageDispatch = executor.dequeueNoWait()) != null) {
775             final MessageDispatch md = messageDispatch;
776             ActiveMQMessage message = (ActiveMQMessage)md.getMessage();
777             if( message.isExpired() ) {
778                 //TODO: Ack it without delivery to client
779
continue;
780             }
781             
782             if( isClientAcknowledge() ) {
783                 message.setAcknowledgeCallback(new Callback() {
784                     public void execute() throws Exception JavaDoc {
785                     }
786                 });
787             }
788             
789             if (deliveryListener != null) {
790                 deliveryListener.beforeDelivery(this, message);
791             }
792
793             md.setDeliverySequenceId(getNextDeliveryId());
794
795             try {
796                 messageListener.onMessage(message);
797             } catch ( Throwable JavaDoc e ) {
798                 // TODO: figure out proper way to handle error.
799
log.error("error dispatching message: ",e);
800                 connection.onAsyncException(e);
801             }
802
803             try {
804                 MessageAck ack = new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,1);
805                 ack.setFirstMessageId(md.getMessage().getMessageId());
806                 doStartTransaction();
807                 ack.setTransactionId(getTransactionContext().getTransactionId());
808                 if( ack.getTransactionId()!=null ) {
809                     getTransactionContext().addSynchronization(new Synchronization(){
810                         public void afterRollback() throws Exception JavaDoc {
811
812                             md.getMessage().onMessageRolledBack();
813                             
814                             RedeliveryPolicy redeliveryPolicy = connection.getRedeliveryPolicy();
815                             int redeliveryCounter = md.getMessage().getRedeliveryCounter();
816                             if (redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES
817                                     && redeliveryCounter > redeliveryPolicy.getMaximumRedeliveries()) {
818                                 
819                                 // We need to NACK the messages so that they get sent to the
820
// DLQ.
821

822                                 // Acknowledge the last message.
823
MessageAck ack = new MessageAck(md,MessageAck.POSION_ACK_TYPE,1);
824                                 ack.setFirstMessageId(md.getMessage().getMessageId());
825                                 asyncSendPacket(ack);
826
827                             } else {
828                                 
829                                 // Figure out how long we should wait to resend this message.
830
long redeliveryDelay=0;
831                                 for( int i=0; i < redeliveryCounter; i++) {
832                                     redeliveryDelay = redeliveryPolicy.getRedeliveryDelay(redeliveryDelay);
833                                 }
834                                 
835                                 Scheduler.executeAfterDelay(new Runnable JavaDoc() {
836                                     public void run() {
837                                         ((ActiveMQDispatcher)md.getConsumer()).dispatch(md);
838                                     }
839                                 }, redeliveryDelay);
840                                 
841                             }
842                         }
843                     });
844                 }
845                 asyncSendPacket(ack);
846             } catch ( Throwable JavaDoc e ) {
847                 connection.onAsyncException(e);
848             }
849
850             if (deliveryListener != null) {
851                 deliveryListener.afterDelivery(this, message);
852             }
853         }
854     }
855
856     /**
857      * Creates a <CODE>MessageProducer</CODE> to send messages to the
858      * specified destination.
859      * <P>
860      * A client uses a <CODE>MessageProducer</CODE> object to send messages to
861      * a destination. Since <CODE>Queue </CODE> and <CODE>Topic</CODE> both
862      * inherit from <CODE>Destination</CODE>, they can be used in the
863      * destination parameter to create a <CODE>MessageProducer</CODE> object.
864      *
865      * @param destination
866      * the <CODE>Destination</CODE> to send to, or null if this is
867      * a producer which does not have a specified destination.
868      * @return the MessageProducer
869      * @throws JMSException
870      * if the session fails to create a MessageProducer due to some
871      * internal error.
872      * @throws InvalidDestinationException
873      * if an invalid destination is specified.
874      * @since 1.1
875      */

876     public MessageProducer JavaDoc createProducer(Destination JavaDoc destination) throws JMSException JavaDoc {
877         checkClosed();
878         return new ActiveMQMessageProducer(this, getNextProducerId(), ActiveMQMessageTransformation
879                 .transformDestination(destination));
880     }
881
882     /**
883      * Creates a <CODE>MessageConsumer</CODE> for the specified destination.
884      * Since <CODE>Queue</CODE> and <CODE> Topic</CODE> both inherit from
885      * <CODE>Destination</CODE>, they can be used in the destination
886      * parameter to create a <CODE>MessageConsumer</CODE>.
887      *
888      * @param destination
889      * the <CODE>Destination</CODE> to access.
890      * @return the MessageConsumer
891      * @throws JMSException
892      * if the session fails to create a consumer due to some
893      * internal error.
894      * @throws InvalidDestinationException
895      * if an invalid destination is specified.
896      * @since 1.1
897      */

898     public MessageConsumer JavaDoc createConsumer(Destination JavaDoc destination) throws JMSException JavaDoc {
899         checkClosed();
900         return createConsumer(destination, null);
901     }
902
903     /**
904      * Creates a <CODE>MessageConsumer</CODE> for the specified destination,
905      * using a message selector. Since <CODE> Queue</CODE> and <CODE>Topic</CODE>
906      * both inherit from <CODE>Destination</CODE>, they can be used in the
907      * destination parameter to create a <CODE>MessageConsumer</CODE>.
908      * <P>
909      * A client uses a <CODE>MessageConsumer</CODE> object to receive messages
910      * that have been sent to a destination.
911      *
912      * @param destination
913      * the <CODE>Destination</CODE> to access
914      * @param messageSelector
915      * only messages with properties matching the message selector
916      * expression are delivered. A value of null or an empty string
917      * indicates that there is no message selector for the message
918      * consumer.
919      * @return the MessageConsumer
920      * @throws JMSException
921      * if the session fails to create a MessageConsumer due to some
922      * internal error.
923      * @throws InvalidDestinationException
924      * if an invalid destination is specified.
925      * @throws InvalidSelectorException
926      * if the message selector is invalid.
927      * @since 1.1
928      */

929     public MessageConsumer JavaDoc createConsumer(Destination JavaDoc destination, String JavaDoc messageSelector) throws JMSException JavaDoc {
930         checkClosed();
931         int prefetch = 0;
932
933         ActiveMQPrefetchPolicy prefetchPolicy = connection.getPrefetchPolicy();
934         if (destination instanceof Topic JavaDoc) {
935             prefetch = prefetchPolicy.getTopicPrefetch();
936         } else {
937             prefetch = prefetchPolicy.getQueuePrefetch();
938         }
939
940         return new ActiveMQMessageConsumer(this, getNextConsumerId(),
941                 ActiveMQMessageTransformation.transformDestination(destination), null, messageSelector, prefetch,
942                 prefetchPolicy.getMaximumPendingMessageLimit(), false, false, asyncDispatch);
943     }
944
945     /**
946      * @return
947      */

948     protected ConsumerId getNextConsumerId() {
949         return new ConsumerId(info.getSessionId(), consumerIdGenerator.getNextSequenceId());
950     }
951
952     /**
953      * @return
954      */

955     protected ProducerId getNextProducerId() {
956         return new ProducerId(info.getSessionId(), producerIdGenerator.getNextSequenceId());
957     }
958
959     /**
960      * Creates <CODE>MessageConsumer</CODE> for the specified destination,
961      * using a message selector. This method can specify whether messages
962      * published by its own connection should be delivered to it, if the
963      * destination is a topic.
964      * <P>
965      * Since <CODE>Queue</CODE> and <CODE>Topic</CODE> both inherit from
966      * <CODE>Destination</CODE>, they can be used in the destination
967      * parameter to create a <CODE>MessageConsumer</CODE>.
968      * <P>
969      * A client uses a <CODE>MessageConsumer</CODE> object to receive messages
970      * that have been published to a destination.
971      * <P>
972      * In some cases, a connection may both publish and subscribe to a topic.
973      * The consumer <CODE>NoLocal</CODE> attribute allows a consumer to
974      * inhibit the delivery of messages published by its own connection. The
975      * default value for this attribute is False. The <CODE>noLocal</CODE>
976      * value must be supported by destinations that are topics.
977      *
978      * @param destination
979      * the <CODE>Destination</CODE> to access
980      * @param messageSelector
981      * only messages with properties matching the message selector
982      * expression are delivered. A value of null or an empty string
983      * indicates that there is no message selector for the message
984      * consumer.
985      * @param NoLocal -
986      * if true, and the destination is a topic, inhibits the delivery
987      * of messages published by its own connection. The behavior for
988      * <CODE>NoLocal</CODE> is not specified if the destination is
989      * a queue.
990      * @return the MessageConsumer
991      * @throws JMSException
992      * if the session fails to create a MessageConsumer due to some
993      * internal error.
994      * @throws InvalidDestinationException
995      * if an invalid destination is specified.
996      * @throws InvalidSelectorException
997      * if the message selector is invalid.
998      * @since 1.1
999      */

1000    public MessageConsumer JavaDoc createConsumer(Destination JavaDoc destination, String JavaDoc messageSelector, boolean NoLocal)
1001            throws JMSException JavaDoc {
1002        checkClosed();
1003        ActiveMQPrefetchPolicy prefetchPolicy = connection.getPrefetchPolicy();
1004        return new ActiveMQMessageConsumer(this, getNextConsumerId(),
1005                ActiveMQMessageTransformation.transformDestination(destination), null, messageSelector,
1006                prefetchPolicy.getTopicPrefetch(), prefetchPolicy.getMaximumPendingMessageLimit(), NoLocal, false, asyncDispatch);
1007    }
1008
1009    /**
1010     * Creates a queue identity given a <CODE>Queue</CODE> name.
1011     * <P>
1012     * This facility is provided for the rare cases where clients need to
1013     * dynamically manipulate queue identity. It allows the creation of a queue
1014     * identity with a provider-specific name. Clients that depend on this
1015     * ability are not portable.
1016     * <P>
1017     * Note that this method is not for creating the physical queue. The
1018     * physical creation of queues is an administrative task and is not to be
1019     * initiated by the JMS API. The one exception is the creation of temporary
1020     * queues, which is accomplished with the <CODE>createTemporaryQueue</CODE>
1021     * method.
1022     *
1023     * @param queueName
1024     * the name of this <CODE>Queue</CODE>
1025     * @return a <CODE>Queue</CODE> with the given name
1026     * @throws JMSException
1027     * if the session fails to create a queue due to some internal
1028     * error.
1029     * @since 1.1
1030     */

1031    public Queue JavaDoc createQueue(String JavaDoc queueName) throws JMSException JavaDoc {
1032        checkClosed();
1033        return new ActiveMQQueue(queueName);
1034    }
1035
1036    /**
1037     * Creates a topic identity given a <CODE>Topic</CODE> name.
1038     * <P>
1039     * This facility is provided for the rare cases where clients need to
1040     * dynamically manipulate topic identity. This allows the creation of a
1041     * topic identity with a provider-specific name. Clients that depend on this
1042     * ability are not portable.
1043     * <P>
1044     * Note that this method is not for creating the physical topic. The
1045     * physical creation of topics is an administrative task and is not to be
1046     * initiated by the JMS API. The one exception is the creation of temporary
1047     * topics, which is accomplished with the <CODE>createTemporaryTopic</CODE>
1048     * method.
1049     *
1050     * @param topicName
1051     * the name of this <CODE>Topic</CODE>
1052     * @return a <CODE>Topic</CODE> with the given name
1053     * @throws JMSException
1054     * if the session fails to create a topic due to some internal
1055     * error.
1056     * @since 1.1
1057     */

1058    public Topic JavaDoc createTopic(String JavaDoc topicName) throws JMSException JavaDoc {
1059        checkClosed();
1060        return new ActiveMQTopic(topicName);
1061    }
1062
1063    /**
1064     * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on
1065     * the specified queue.
1066     *
1067     * @param queue
1068     * the <CODE>queue</CODE> to access
1069     * @exception InvalidDestinationException
1070     * if an invalid destination is specified
1071     * @since 1.1
1072     */

1073    /**
1074     * Creates a durable subscriber to the specified topic.
1075     * <P>
1076     * If a client needs to receive all the messages published on a topic,
1077     * including the ones published while the subscriber is inactive, it uses a
1078     * durable <CODE>TopicSubscriber</CODE>. The JMS provider retains a
1079     * record of this durable subscription and insures that all messages from
1080     * the topic's publishers are retained until they are acknowledged by this
1081     * durable subscriber or they have expired.
1082     * <P>
1083     * Sessions with durable subscribers must always provide the same client
1084     * identifier. In addition, each client must specify a name that uniquely
1085     * identifies (within client identifier) each durable subscription it
1086     * creates. Only one session at a time can have a <CODE>TopicSubscriber</CODE>
1087     * for a particular durable subscription.
1088     * <P>
1089     * A client can change an existing durable subscription by creating a
1090     * durable <CODE>TopicSubscriber</CODE> with the same name and a new topic
1091     * and/or message selector. Changing a durable subscriber is equivalent to
1092     * unsubscribing (deleting) the old one and creating a new one.
1093     * <P>
1094     * In some cases, a connection may both publish and subscribe to a topic.
1095     * The subscriber <CODE>NoLocal</CODE> attribute allows a subscriber to
1096     * inhibit the delivery of messages published by its own connection. The
1097     * default value for this attribute is false.
1098     *
1099     * @param topic
1100     * the non-temporary <CODE>Topic</CODE> to subscribe to
1101     * @param name
1102     * the name used to identify this subscription
1103     * @return the TopicSubscriber
1104     * @throws JMSException
1105     * if the session fails to create a subscriber due to some
1106     * internal error.
1107     * @throws InvalidDestinationException
1108     * if an invalid topic is specified.
1109     * @since 1.1
1110     */

1111    public TopicSubscriber JavaDoc createDurableSubscriber(Topic JavaDoc topic, String JavaDoc name) throws JMSException JavaDoc {
1112        checkClosed();
1113        return createDurableSubscriber(topic, name, null, false);
1114    }
1115
1116    /**
1117     * Creates a durable subscriber to the specified topic, using a message
1118     * selector and specifying whether messages published by its own connection
1119     * should be delivered to it.
1120     * <P>
1121     * If a client needs to receive all the messages published on a topic,
1122     * including the ones published while the subscriber is inactive, it uses a
1123     * durable <CODE>TopicSubscriber</CODE>. The JMS provider retains a
1124     * record of this durable subscription and insures that all messages from
1125     * the topic's publishers are retained until they are acknowledged by this
1126     * durable subscriber or they have expired.
1127     * <P>
1128     * Sessions with durable subscribers must always provide the same client
1129     * identifier. In addition, each client must specify a name which uniquely
1130     * identifies (within client identifier) each durable subscription it
1131     * creates. Only one session at a time can have a <CODE>TopicSubscriber</CODE>
1132     * for a particular durable subscription. An inactive durable subscriber is
1133     * one that exists but does not currently have a message consumer associated
1134     * with it.
1135     * <P>
1136     * A client can change an existing durable subscription by creating a
1137     * durable <CODE>TopicSubscriber</CODE> with the same name and a new topic
1138     * and/or message selector. Changing a durable subscriber is equivalent to
1139     * unsubscribing (deleting) the old one and creating a new one.
1140     *
1141     * @param topic
1142     * the non-temporary <CODE>Topic</CODE> to subscribe to
1143     * @param name
1144     * the name used to identify this subscription
1145     * @param messageSelector
1146     * only messages with properties matching the message selector
1147     * expression are delivered. A value of null or an empty string
1148     * indicates that there is no message selector for the message
1149     * consumer.
1150     * @param noLocal
1151     * if set, inhibits the delivery of messages published by its own
1152     * connection
1153     * @return the TopicSubscriber
1154     * @throws JMSException
1155     * if the session fails to create a subscriber due to some
1156     * internal error.
1157     * @throws InvalidDestinationException
1158     * if an invalid topic is specified.
1159     * @throws InvalidSelectorException
1160     * if the message selector is invalid.
1161     * @since 1.1
1162     */

1163    public TopicSubscriber JavaDoc createDurableSubscriber(Topic JavaDoc topic,String JavaDoc name,String JavaDoc messageSelector,boolean noLocal)
1164                    throws JMSException JavaDoc{
1165        checkClosed();
1166        connection.checkClientIDWasManuallySpecified();
1167        ActiveMQPrefetchPolicy prefetchPolicy=this.connection.getPrefetchPolicy();
1168        int prefetch=isAutoAcknowledge()&&connection.isOptimizedMessageDispatch()?prefetchPolicy
1169                        .getOptimizeDurableTopicPrefetch():prefetchPolicy.getDurableTopicPrefetch();
1170        int maxPrendingLimit=prefetchPolicy.getMaximumPendingMessageLimit();
1171        return new ActiveMQTopicSubscriber(this,getNextConsumerId(),ActiveMQMessageTransformation
1172                        .transformDestination(topic),name,messageSelector,prefetch,maxPrendingLimit,noLocal,false,
1173                        asyncDispatch);
1174    }
1175
1176    /**
1177     * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on
1178     * the specified queue.
1179     *
1180     * @param queue
1181     * the <CODE>queue</CODE> to access
1182     * @return the Queue Browser
1183     * @throws JMSException
1184     * if the session fails to create a browser due to some internal
1185     * error.
1186     * @throws InvalidDestinationException
1187     * if an invalid destination is specified
1188     * @since 1.1
1189     */

1190    public QueueBrowser JavaDoc createBrowser(Queue JavaDoc queue) throws JMSException JavaDoc {
1191        checkClosed();
1192        return createBrowser(queue, null);
1193    }
1194
1195    /**
1196     * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on
1197     * the specified queue using a message selector.
1198     *
1199     * @param queue
1200     * the <CODE>queue</CODE> to access
1201     * @param messageSelector
1202     * only messages with properties matching the message selector
1203     * expression are delivered. A value of null or an empty string
1204     * indicates that there is no message selector for the message
1205     * consumer.
1206     * @return the Queue Browser
1207     * @throws JMSException
1208     * if the session fails to create a browser due to some internal
1209     * error.
1210     * @throws InvalidDestinationException
1211     * if an invalid destination is specified
1212     * @throws InvalidSelectorException
1213     * if the message selector is invalid.
1214     * @since 1.1
1215     */

1216    public QueueBrowser JavaDoc createBrowser(Queue JavaDoc queue, String JavaDoc messageSelector) throws JMSException JavaDoc {
1217        checkClosed();
1218        return new ActiveMQQueueBrowser(this, getNextConsumerId(), ActiveMQMessageTransformation
1219                .transformDestination(queue), messageSelector, asyncDispatch);
1220    }
1221
1222    /**
1223     * Creates a <CODE>TemporaryQueue</CODE> object. Its lifetime will be that
1224     * of the <CODE>Connection</CODE> unless it is deleted earlier.
1225     *
1226     * @return a temporary queue identity
1227     * @throws JMSException
1228     * if the session fails to create a temporary queue due to some
1229     * internal error.
1230     * @since 1.1
1231     */

1232    public TemporaryQueue JavaDoc createTemporaryQueue() throws JMSException JavaDoc {
1233        checkClosed();
1234        return (TemporaryQueue JavaDoc) connection.createTempDestination(false);
1235    }
1236
1237    /**
1238     * Creates a <CODE>TemporaryTopic</CODE> object. Its lifetime will be that
1239     * of the <CODE>Connection</CODE> unless it is deleted earlier.
1240     *
1241     * @return a temporary topic identity
1242     * @throws JMSException
1243     * if the session fails to create a temporary topic due to some
1244     * internal error.
1245     * @since 1.1
1246     */

1247    public TemporaryTopic JavaDoc createTemporaryTopic() throws JMSException JavaDoc {
1248        checkClosed();
1249        return (TemporaryTopic JavaDoc)connection.createTempDestination(true);
1250    }
1251
1252    /**
1253     * Creates a <CODE>QueueReceiver</CODE> object to receive messages from
1254     * the specified queue.
1255     *
1256     * @param queue
1257     * the <CODE>Queue</CODE> to access
1258     * @return QueueReceiver
1259     * @throws JMSException
1260     * if the session fails to create a receiver due to some
1261     * internal error.
1262     * @throws JMSException
1263     * @throws InvalidDestinationException
1264     * if an invalid queue is specified.
1265     */

1266    public QueueReceiver JavaDoc createReceiver(Queue JavaDoc queue) throws JMSException JavaDoc {
1267        checkClosed();
1268        return createReceiver(queue, null);
1269    }
1270
1271    /**
1272     * Creates a <CODE>QueueReceiver</CODE> object to receive messages from
1273     * the specified queue using a message selector.
1274     *
1275     * @param queue
1276     * the <CODE>Queue</CODE> to access
1277     * @param messageSelector
1278     * only messages with properties matching the message selector
1279     * expression are delivered. A value of null or an empty string
1280     * indicates that there is no message selector for the message
1281     * consumer.
1282     * @return QueueReceiver
1283     * @throws JMSException
1284     * if the session fails to create a receiver due to some
1285     * internal error.
1286     * @throws InvalidDestinationException
1287     * if an invalid queue is specified.
1288     * @throws InvalidSelectorException
1289     * if the message selector is invalid.
1290     */

1291    public QueueReceiver JavaDoc createReceiver(Queue JavaDoc queue, String JavaDoc messageSelector) throws JMSException JavaDoc {
1292        checkClosed();
1293        ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy();
1294        return new ActiveMQQueueReceiver(this, getNextConsumerId(), ActiveMQMessageTransformation
1295                .transformDestination(queue), messageSelector, prefetchPolicy.getQueuePrefetch(),
1296                prefetchPolicy.getMaximumPendingMessageLimit(), asyncDispatch);
1297    }
1298
1299    /**
1300     * Creates a <CODE>QueueSender</CODE> object to send messages to the
1301     * specified queue.
1302     *
1303     * @param queue
1304     * the <CODE>Queue</CODE> to access, or null if this is an
1305     * unidentified producer
1306     * @return QueueSender
1307     * @throws JMSException
1308     * if the session fails to create a sender due to some internal
1309     * error.
1310     * @throws InvalidDestinationException
1311     * if an invalid queue is specified.
1312     */

1313    public QueueSender JavaDoc createSender(Queue JavaDoc queue) throws JMSException JavaDoc {
1314        checkClosed();
1315        return new ActiveMQQueueSender(this, ActiveMQMessageTransformation.transformDestination(queue));
1316    }
1317
1318    /**
1319     * Creates a nondurable subscriber to the specified topic. <p/>
1320     * <P>
1321     * A client uses a <CODE>TopicSubscriber</CODE> object to receive messages
1322     * that have been published to a topic. <p/>
1323     * <P>
1324     * Regular <CODE>TopicSubscriber</CODE> objects are not durable. They
1325     * receive only messages that are published while they are active. <p/>
1326     * <P>
1327     * In some cases, a connection may both publish and subscribe to a topic.
1328     * The subscriber <CODE>NoLocal</CODE> attribute allows a subscriber to
1329     * inhibit the delivery of messages published by its own connection. The
1330     * default value for this attribute is false.
1331     *
1332     * @param topic
1333     * the <CODE>Topic</CODE> to subscribe to
1334     * @return TopicSubscriber
1335     * @throws JMSException
1336     * if the session fails to create a subscriber due to some
1337     * internal error.
1338     * @throws InvalidDestinationException
1339     * if an invalid topic is specified.
1340     */

1341    public TopicSubscriber JavaDoc createSubscriber(Topic JavaDoc topic) throws JMSException JavaDoc {
1342        checkClosed();
1343        return createSubscriber(topic, null, false);
1344    }
1345
1346    /**
1347     * Creates a nondurable subscriber to the specified topic, using a message
1348     * selector or specifying whether messages published by its own connection
1349     * should be delivered to it. <p/>
1350     * <P>
1351     * A client uses a <CODE>TopicSubscriber</CODE> object to receive messages
1352     * that have been published to a topic. <p/>
1353     * <P>
1354     * Regular <CODE>TopicSubscriber</CODE> objects are not durable. They
1355     * receive only messages that are published while they are active. <p/>
1356     * <P>
1357     * Messages filtered out by a subscriber's message selector will never be
1358     * delivered to the subscriber. From the subscriber's perspective, they do
1359     * not exist. <p/>
1360     * <P>
1361     * In some cases, a connection may both publish and subscribe to a topic.
1362     * The subscriber <CODE>NoLocal</CODE> attribute allows a subscriber to
1363     * inhibit the delivery of messages published by its own connection. The
1364     * default value for this attribute is false.
1365     *
1366     * @param topic
1367     * the <CODE>Topic</CODE> to subscribe to
1368     * @param messageSelector
1369     * only messages with properties matching the message selector
1370     * expression are delivered. A value of null or an empty string
1371     * indicates that there is no message selector for the message
1372     * consumer.
1373     * @param noLocal
1374     * if set, inhibits the delivery of messages published by its own
1375     * connection
1376     * @return TopicSubscriber
1377     * @throws JMSException
1378     * if the session fails to create a subscriber due to some
1379     * internal error.
1380     * @throws InvalidDestinationException
1381     * if an invalid topic is specified.
1382     * @throws InvalidSelectorException
1383     * if the message selector is invalid.
1384     */

1385    public TopicSubscriber JavaDoc createSubscriber(Topic JavaDoc topic, String JavaDoc messageSelector, boolean noLocal) throws JMSException JavaDoc {
1386        checkClosed();
1387        ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy();
1388        return new ActiveMQTopicSubscriber(this, getNextConsumerId(), ActiveMQMessageTransformation
1389                .transformDestination(topic), null, messageSelector, prefetchPolicy.getTopicPrefetch(),
1390                prefetchPolicy.getMaximumPendingMessageLimit(), noLocal, false, asyncDispatch);
1391    }
1392
1393    /**
1394     * Creates a publisher for the specified topic. <p/>
1395     * <P>
1396     * A client uses a <CODE>TopicPublisher</CODE> object to publish messages
1397     * on a topic. Each time a client creates a <CODE>TopicPublisher</CODE> on
1398     * a topic, it defines a new sequence of messages that have no ordering
1399     * relationship with the messages it has previously sent.
1400     *
1401     * @param topic
1402     * the <CODE>Topic</CODE> to publish to, or null if this is an
1403     * unidentified producer
1404     * @return TopicPublisher
1405     * @throws JMSException
1406     * if the session fails to create a publisher due to some
1407     * internal error.
1408     * @throws InvalidDestinationException
1409     * if an invalid topic is specified.
1410     */

1411    public TopicPublisher JavaDoc createPublisher(Topic JavaDoc topic) throws JMSException JavaDoc {
1412        checkClosed();
1413        return new ActiveMQTopicPublisher(this, ActiveMQMessageTransformation.transformDestination(topic));
1414    }
1415
1416    /**
1417     * Unsubscribes a durable subscription that has been created by a client.
1418     * <P>
1419     * This method deletes the state being maintained on behalf of the
1420     * subscriber by its provider.
1421     * <P>
1422     * It is erroneous for a client to delete a durable subscription while there
1423     * is an active <CODE>MessageConsumer </CODE> or <CODE>TopicSubscriber</CODE>
1424     * for the subscription, or while a consumed message is part of a pending
1425     * transaction or has not been acknowledged in the session.
1426     *
1427     * @param name
1428     * the name used to identify this subscription
1429     * @throws JMSException
1430     * if the session fails to unsubscribe to the durable
1431     * subscription due to some internal error.
1432     * @throws InvalidDestinationException
1433     * if an invalid subscription name is specified.
1434     * @since 1.1
1435     */

1436    public void unsubscribe(String JavaDoc name) throws JMSException JavaDoc {
1437        checkClosed();
1438        connection.unsubscribe(name);
1439    }
1440
1441    
1442    public void dispatch(MessageDispatch messageDispatch) {
1443        try {
1444            executor.execute(messageDispatch);
1445        } catch (InterruptedException JavaDoc e) {
1446            Thread.currentThread().interrupt();
1447            connection.onAsyncException(e);
1448        }
1449    }
1450    
1451    
1452    
1453    /**
1454     * Acknowledges all consumed messages of the session of this consumed
1455     * message.
1456     * <P>
1457     * All consumed JMS messages support the <CODE>acknowledge</CODE> method
1458     * for use when a client has specified that its JMS session's consumed
1459     * messages are to be explicitly acknowledged. By invoking <CODE>acknowledge</CODE>
1460     * on a consumed message, a client acknowledges all messages consumed by the
1461     * session that the message was delivered to.
1462     * <P>
1463     * Calls to <CODE>acknowledge</CODE> are ignored for both transacted
1464     * sessions and sessions specified to use implicit acknowledgement modes.
1465     * <P>
1466     * A client may individually acknowledge each message as it is consumed, or
1467     * it may choose to acknowledge messages as an application-defined group
1468     * (which is done by calling acknowledge on the last received message of the
1469     * group, thereby acknowledging all messages consumed by the session.)
1470     * <P>
1471     * Messages that have been received but not acknowledged may be redelivered.
1472     *
1473     * @throws JMSException
1474     * if the JMS provider fails to acknowledge the messages due to
1475     * some internal error.
1476     * @throws javax.jms.IllegalStateException
1477     * if this method is called on a closed session.
1478     * @see javax.jms.Session#CLIENT_ACKNOWLEDGE
1479     */

1480    public void acknowledge() throws JMSException JavaDoc {
1481        for (Iterator JavaDoc iter = consumers.iterator(); iter.hasNext();) {
1482            ActiveMQMessageConsumer c = (ActiveMQMessageConsumer) iter.next();
1483            c.acknowledge();
1484        }
1485    }
1486
1487   
1488    /**
1489     * Add a message consumer.
1490     *
1491     * @param consumer -
1492     * message consumer.
1493     * @throws JMSException
1494     */

1495    protected void addConsumer(ActiveMQMessageConsumer consumer) throws JMSException JavaDoc {
1496        this.consumers.add(consumer);
1497        if (consumer.isDurableSubscriber()) {
1498            stats.onCreateDurableSubscriber();
1499        }
1500        this.connection.addDispatcher(consumer.getConsumerId(), this);
1501    }
1502
1503    /**
1504     * Remove the message consumer.
1505     *
1506     * @param consumer -
1507     * consumer to be removed.
1508     * @throws JMSException
1509     */

1510    protected void removeConsumer(ActiveMQMessageConsumer consumer) {
1511        this.connection.removeDispatcher(consumer.getConsumerId());
1512        if (consumer.isDurableSubscriber()) {
1513            stats.onRemoveDurableSubscriber();
1514        }
1515        this.consumers.remove(consumer);
1516    }
1517
1518    /**
1519     * Adds a message producer.
1520     *
1521     * @param producer -
1522     * message producer to be added.
1523     * @throws JMSException
1524     */

1525    protected void addProducer(ActiveMQMessageProducer producer) throws JMSException JavaDoc {
1526        this.producers.add(producer);
1527        this.connection.addProducer(producer.getProducerInfo().getProducerId(), producer);
1528    }
1529
1530    /**
1531     * Removes a message producer.
1532     *
1533     * @param producer -
1534     * message producer to be removed.
1535     * @throws JMSException
1536     */

1537    protected void removeProducer(ActiveMQMessageProducer producer) {
1538        this.connection.removeProducer(producer.getProducerInfo().getProducerId());
1539        this.producers.remove(producer);
1540    }
1541
1542    /**
1543     * Start this Session.
1544     *
1545     * @throws JMSException
1546     */

1547    protected void start() throws JMSException JavaDoc {
1548        started.set(true);
1549        for (Iterator JavaDoc iter = consumers.iterator(); iter.hasNext();) {
1550            ActiveMQMessageConsumer c = (ActiveMQMessageConsumer) iter.next();
1551            c.start();
1552        }
1553        executor.start();
1554    }
1555
1556    /**
1557     * Stops this session.
1558     * @throws JMSException
1559     */

1560    protected void stop() throws JMSException JavaDoc {
1561        
1562        for (Iterator JavaDoc iter = consumers.iterator(); iter.hasNext();) {
1563            ActiveMQMessageConsumer c = (ActiveMQMessageConsumer) iter.next();
1564            c.stop();
1565        }
1566
1567        started.set(false);
1568        executor.stop();
1569    }
1570
1571    /**
1572     * Returns the session id.
1573     *
1574     * @return value - session id.
1575     */

1576    protected SessionId getSessionId() {
1577        return info.getSessionId();
1578    }
1579
1580    /**
1581     * Sends the message for dispatch by the broker.
1582     *
1583     * @param producer -
1584     * message producer.
1585     * @param destination -
1586     * message destination.
1587     * @param message -
1588     * message to be sent.
1589     * @param deliveryMode -
1590     * JMS messsage delivery mode.
1591     * @param priority -
1592     * message priority.
1593     * @param timeToLive -
1594     * message expiration.
1595     * @param producerWindow
1596     * @throws JMSException
1597     */

1598    protected void send(ActiveMQMessageProducer producer,
1599            ActiveMQDestination destination,Message message,int deliveryMode,
1600            int priority,long timeToLive, UsageManager producerWindow) throws JMSException JavaDoc{
1601        
1602        checkClosed();
1603        if(destination.isTemporary()&&connection.isDeleted(destination)){
1604            throw new JMSException JavaDoc("Cannot publish to a deleted Destination: "
1605                    +destination);
1606        }
1607        synchronized(sendMutex){
1608            // tell the Broker we are about to start a new transaction
1609
doStartTransaction();
1610            TransactionId txid=transactionContext.getTransactionId();
1611            message.setJMSDestination(destination);
1612            message.setJMSDeliveryMode(deliveryMode);
1613            long expiration=0L;
1614            if(!producer.getDisableMessageTimestamp()){
1615                long timeStamp=System.currentTimeMillis();
1616                message.setJMSTimestamp(timeStamp);
1617                if(timeToLive>0){
1618                    expiration=timeToLive+timeStamp;
1619                }
1620            }
1621            message.setJMSExpiration(expiration);
1622            message.setJMSPriority(priority);
1623            long sequenceNumber=producer.getMessageSequence();
1624            message.setJMSRedelivered(false);
1625            // transform to our own message format here
1626
ActiveMQMessage msg=ActiveMQMessageTransformation.transformMessage(
1627                    message,connection);
1628            // Set the message id.
1629
if(msg==message){
1630                msg.setMessageId(new MessageId(producer.getProducerInfo()
1631                        .getProducerId(),sequenceNumber));
1632            }else{
1633                msg.setMessageId(new MessageId(producer.getProducerInfo()
1634                        .getProducerId(),sequenceNumber));
1635                message.setJMSMessageID(msg.getMessageId().toString());
1636            }
1637            msg.setTransactionId(txid);
1638            if(connection.isCopyMessageOnSend()){
1639                msg=(ActiveMQMessage)msg.copy();
1640            }
1641            msg.setConnection(connection);
1642            msg.onSend();
1643            msg.setProducerId(msg.getMessageId().getProducerId());
1644            if(this.debug){
1645                log.debug("Sending message: "+msg);
1646            }
1647            if(!connection.isAlwaysSyncSend()&&(!msg.isPersistent()||connection.isUseAsyncSend()||txid!=null)){
1648                this.connection.asyncSendPacket(msg);
1649                if( producerWindow!=null ) {
1650                    // Since we defer lots of the marshaling till we hit the wire, this might not
1651
// provide and accurate size. We may change over to doing more aggressive marshaling,
1652
// to get more accurate sizes.. this is more important once users start using producer window
1653
// flow control.
1654
int size = msg.getSize();
1655                    producerWindow.increaseUsage(size);
1656                }
1657            }else{
1658                this.connection.syncSendPacket(msg);
1659            }
1660
1661        }
1662    }
1663
1664    /**
1665     * Send TransactionInfo to indicate transaction has started
1666     *
1667     * @throws JMSException
1668     * if some internal error occurs
1669     */

1670    protected void doStartTransaction() throws JMSException JavaDoc{
1671        if(getTransacted()&&!transactionContext.isInXATransaction()){
1672            transactionContext.begin();
1673        }
1674    }
1675
1676    /**
1677     * Checks whether the session has unconsumed messages.
1678     *
1679     * @return true - if there are unconsumed messages.
1680     */

1681    public boolean hasUncomsumedMessages() {
1682        return executor.hasUncomsumedMessages();
1683    }
1684
1685    /**
1686     * Checks whether the session uses transactions.
1687     *
1688     * @return true - if the session uses transactions.
1689     */

1690    public boolean isTransacted() {
1691        return this.acknowledgementMode == Session.SESSION_TRANSACTED;
1692    }
1693
1694    /**
1695     * Checks whether the session used client acknowledgment.
1696     *
1697     * @return true - if the session uses client acknowledgment.
1698     */

1699    protected boolean isClientAcknowledge() {
1700        return this.acknowledgementMode == Session.CLIENT_ACKNOWLEDGE;
1701    }
1702    
1703    /**
1704     * Checks whether the session used auto acknowledgment.
1705     *
1706     * @return true - if the session uses client acknowledgment.
1707     */

1708    public boolean isAutoAcknowledge() {
1709        return acknowledgementMode==Session.AUTO_ACKNOWLEDGE;
1710    }
1711    
1712    /**
1713     * Checks whether the session used dup ok acknowledgment.
1714     *
1715     * @return true - if the session uses client acknowledgment.
1716     */

1717    public boolean isDupsOkAcknowledge() {
1718        return acknowledgementMode==Session.DUPS_OK_ACKNOWLEDGE;
1719    }
1720
1721    /**
1722     * Returns the message delivery listener.
1723     *
1724     * @return deliveryListener - message delivery listener.
1725     */

1726    public DeliveryListener getDeliveryListener() {
1727        return deliveryListener;
1728    }
1729
1730    /**
1731     * Sets the message delivery listener.
1732     *
1733     * @param deliveryListener -
1734     * message delivery listener.
1735     */

1736    public void setDeliveryListener(DeliveryListener deliveryListener) {
1737        this.deliveryListener = deliveryListener;
1738    }
1739
1740    /**
1741     * Returns the SessionInfo bean.
1742     *
1743     * @return info - SessionInfo bean.
1744     * @throws JMSException
1745     */

1746    protected SessionInfo getSessionInfo() throws JMSException JavaDoc {
1747        SessionInfo info = new SessionInfo(connection.getConnectionInfo(), getSessionId().getValue());
1748        return info;
1749    }
1750
1751    /**
1752     * Send the asynchronus command.
1753     *
1754     * @param command -
1755     * command to be executed.
1756     * @throws JMSException
1757     */

1758    public void asyncSendPacket(Command command) throws JMSException JavaDoc {
1759        connection.asyncSendPacket(command);
1760    }
1761
1762    /**
1763     * Send the synchronus command.
1764     *
1765     * @param command -
1766     * command to be executed.
1767     * @return Response
1768     * @throws JMSException
1769     */

1770    public Response syncSendPacket(Command command) throws JMSException JavaDoc {
1771        return connection.syncSendPacket(command);
1772    }
1773
1774    public long getNextDeliveryId() {
1775        return deliveryIdGenerator.getNextSequenceId();
1776    }
1777
1778    public void redispatch(MessageDispatchChannel unconsumedMessages) throws JMSException JavaDoc {
1779        
1780        List JavaDoc c = unconsumedMessages.removeAll();
1781        Collections.reverse(c);
1782        
1783        for (Iterator JavaDoc iter = c.iterator(); iter.hasNext();) {
1784            MessageDispatch md = (MessageDispatch) iter.next();
1785            executor.executeFirst(md);
1786        }
1787                
1788    }
1789
1790    public boolean isRunning() {
1791        return started.get();
1792    }
1793
1794    public boolean isAsyncDispatch() {
1795        return asyncDispatch;
1796    }
1797
1798    public void setAsyncDispatch(boolean asyncDispatch) {
1799        this.asyncDispatch = asyncDispatch;
1800    }
1801    
1802    /**
1803     * @return Returns the sessionAsyncDispatch.
1804     */

1805    public boolean isSessionAsyncDispatch(){
1806        return sessionAsyncDispatch;
1807    }
1808
1809    /**
1810     * @param sessionAsyncDispatch The sessionAsyncDispatch to set.
1811     */

1812    public void setSessionAsyncDispatch(boolean sessionAsyncDispatch){
1813        this.sessionAsyncDispatch=sessionAsyncDispatch;
1814    }
1815
1816    public MessageTransformer getTransformer() {
1817        return transformer;
1818    }
1819
1820    /**
1821     * Sets the transformer used to transform messages before they are sent on to the JMS bus
1822     * or when they are received from the bus but before they are delivered to the JMS client
1823     */

1824    public void setTransformer(MessageTransformer transformer) {
1825        this.transformer = transformer;
1826    }
1827
1828    public BlobTransferPolicy getBlobTransferPolicy() {
1829        return blobTransferPolicy;
1830    }
1831
1832    /**
1833     * Sets the policy used to describe how out-of-band BLOBs (Binary Large OBjects)
1834     * are transferred from producers to brokers to consumers
1835     */

1836    public void setBlobTransferPolicy(BlobTransferPolicy blobTransferPolicy) {
1837        this.blobTransferPolicy = blobTransferPolicy;
1838    }
1839
1840    public List JavaDoc getUnconsumedMessages() {
1841        return executor.getUnconsumedMessages();
1842    }
1843
1844
1845    public String JavaDoc toString() {
1846        return "ActiveMQSession {id="+info.getSessionId()+",started="+started.get()+"}";
1847    }
1848
1849    public void checkMessageListener() throws JMSException JavaDoc {
1850        if (messageListener != null) {
1851            throw new IllegalStateException JavaDoc("Cannot synchronously receive a message when a MessageListener is set");
1852        }
1853        for (Iterator JavaDoc i = consumers.iterator(); i.hasNext();) {
1854            ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) i.next();
1855            if( consumer.getMessageListener()!=null ) {
1856                throw new IllegalStateException JavaDoc("Cannot synchronously receive a message when a MessageListener is set");
1857            }
1858        }
1859    }
1860    
1861    protected void setOptimizeAcknowledge(boolean value){
1862        for (Iterator JavaDoc iter = consumers.iterator(); iter.hasNext();) {
1863            ActiveMQMessageConsumer c = (ActiveMQMessageConsumer) iter.next();
1864            c.setOptimizeAcknowledge(value);
1865        }
1866    }
1867    
1868    protected void setPrefetchSize(ConsumerId id,int prefetch){
1869        for(Iterator JavaDoc iter=consumers.iterator();iter.hasNext();){
1870            ActiveMQMessageConsumer c=(ActiveMQMessageConsumer) iter.next();
1871            if(c.getConsumerId().equals(id)){
1872                c.setPrefetchSize(prefetch);
1873                break;
1874            }
1875        }
1876    }
1877    
1878    protected void close(ConsumerId id){
1879        for(Iterator JavaDoc iter=consumers.iterator();iter.hasNext();){
1880            ActiveMQMessageConsumer c=(ActiveMQMessageConsumer) iter.next();
1881            if(c.getConsumerId().equals(id)){
1882                try{
1883                    c.close();
1884                }catch(JMSException JavaDoc e){
1885                    log.warn("Exception closing consumer",e);
1886                }
1887                log.warn("Closed consumer on Command");
1888                break;
1889            }
1890        }
1891    }
1892
1893    public boolean isInUse(ActiveMQTempDestination destination) {
1894        for(Iterator JavaDoc iter=consumers.iterator();iter.hasNext();){
1895            ActiveMQMessageConsumer c=(ActiveMQMessageConsumer) iter.next();
1896            if( c.isInUse(destination) ) {
1897                return true;
1898            }
1899        }
1900        return false;
1901    }
1902
1903}
1904
Popular Tags