KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > exolab > jms > server > JmsServerSession


1 /**
2  * Redistribution and use of this software and associated documentation
3  * ("Software"), with or without modification, are permitted provided
4  * that the following conditions are met:
5  *
6  * 1. Redistributions of source code must retain copyright
7  * statements and notices. Redistributions must also contain a
8  * copy of this document.
9  *
10  * 2. Redistributions in binary form must reproduce the
11  * above copyright notice, this list of conditions and the
12  * following disclaimer in the documentation and/or other
13  * materials provided with the distribution.
14  *
15  * 3. The name "Exolab" must not be used to endorse or promote
16  * products derived from this Software without prior written
17  * permission of Exoffice Technologies. For written permission,
18  * please contact info@exolab.org.
19  *
20  * 4. Products derived from this Software may not be called "Exolab"
21  * nor may "Exolab" appear in their names without prior written
22  * permission of Exoffice Technologies. Exolab is a registered
23  * trademark of Exoffice Technologies.
24  *
25  * 5. Due credit should be given to the Exolab Project
26  * (http://www.exolab.org/).
27  *
28  * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
29  * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
30  * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
31  * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
32  * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
33  * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
34  * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
35  * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
37  * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
38  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
39  * OF THE POSSIBILITY OF SUCH DAMAGE.
40  *
41  * Copyright 2000-2004 (C) Exoffice Technologies Inc. All Rights Reserved.
42  *
43  * $Id: JmsServerSession.java,v 1.2 2005/03/18 04:07:02 tanderson Exp $
44  */

45 package org.exolab.jms.server;
46
47 import java.rmi.RemoteException JavaDoc;
48 import java.util.ArrayList JavaDoc;
49 import java.util.HashMap JavaDoc;
50 import java.util.Iterator JavaDoc;
51 import java.util.List JavaDoc;
52 import java.util.Vector JavaDoc;
53 import javax.jms.DeliveryMode JavaDoc;
54 import javax.jms.InvalidDestinationException JavaDoc;
55 import javax.jms.JMSException JavaDoc;
56 import javax.jms.Session JavaDoc;
57 import javax.transaction.xa.XAException JavaDoc;
58 import javax.transaction.xa.XAResource JavaDoc;
59 import javax.transaction.xa.Xid JavaDoc;
60
61 import org.apache.commons.logging.Log;
62 import org.apache.commons.logging.LogFactory;
63
64 import org.exolab.jms.client.JmsDestination;
65 import org.exolab.jms.client.JmsMessageListener;
66 import org.exolab.jms.client.JmsQueue;
67 import org.exolab.jms.client.JmsTopic;
68 import org.exolab.jms.message.MessageImpl;
69 import org.exolab.jms.messagemgr.ConsumerEndpoint;
70 import org.exolab.jms.messagemgr.ConsumerManager;
71 import org.exolab.jms.messagemgr.DestinationManager;
72 import org.exolab.jms.messagemgr.ConsumerEndpointListener;
73 import org.exolab.jms.messagemgr.MessageHandle;
74 import org.exolab.jms.messagemgr.MessageMgr;
75 import org.exolab.jms.messagemgr.QueueBrowserEndpoint;
76 import org.exolab.jms.messagemgr.ResourceManager;
77 import org.exolab.jms.messagemgr.ResourceManagerException;
78 import org.exolab.jms.server.ServerSession;
79 import org.exolab.jms.server.JmsServerConnection;
80 import org.exolab.jms.server.SentMessageCache;
81
82
83 /**
84  * A session represents a server side endpoint to the JMSServer. A client can
85  * create producers, consumers and destinations through the session in addi-
86  * tion to other functions. A session has a unique identifer which is a comb-
87  * ination of clientId-connectionId-sessionId.
88  * <p/>
89  * A session represents a single-threaded context which implies that it cannot
90  * be used with more than one thread concurrently. Threads registered with this
91  * session are synchronized.
92  * <p/>
93  * Finally, instances of this object can only be created by classes within the
94  * same package.
95  *
96  * @author <a HREF="mailto:jima@exoffice.com">Jim Alateras</a>
97  * @author <a HREF="mailto:tma@netspace.net.au">Tim Anderson</a>
98  * @version $Revision: 1.2 $ $Date: 2005/03/18 04:07:02 $
99  * @see JmsServerConnection
100  */

101 public class JmsServerSession
102         implements ServerSession, ConsumerEndpointListener, XAResource JavaDoc {
103
104     /**
105      * Back pointer to the connection that created this session. This is set
106      * during object creation time
107      */

108     private JmsServerConnection _connection = null;
109
110     /**
111      * Maintain a set of ConsumerEndpoint instances, keyed on id
112      */

113     private HashMap JavaDoc _consumers = new HashMap JavaDoc();
114
115     /**
116      * The message listener is the reference to a remote client that will
117      * receive the messages
118      */

119     private JmsMessageListener _listener = null;
120
121     /**
122      * This is the acknowledgement mode for the session
123      */

124     private int _ackMode = Session.AUTO_ACKNOWLEDGE;
125
126     /**
127      * Indicates whether the session is transactional
128      */

129     private boolean _transacted = false;
130
131     /**
132      * Holds the current xid that this session is associated with. A session can
133      * olny be associated with one xid at any one time.
134      */

135     private Xid JavaDoc _xid = null;
136
137     /**
138      * Indicates if the underlying connection of this session has been stopped
139      */

140     private boolean _stopped = true;
141
142     /**
143      * Indicated that the session has been closed
144      */

145     private boolean _closed = false;
146
147     /**
148      * Caches all sent messages
149      */

150     private SentMessageCache _sentMessageCache;
151
152     /**
153      * The logger
154      */

155     private static final Log _log = LogFactory.getLog(JmsServerSession.class);
156
157
158     /**
159      * Construct a new <code>JmsServerSession</code>
160      *
161      * @param connection the connection that created this session
162      * @param ackMode the acknowledgement mode for the session
163      * @param transacted <code>true</code> if the session is transactional
164      */

165     public JmsServerSession(JmsServerConnection connection, int ackMode,
166                                boolean transacted) {
167         _connection = connection;
168         _ackMode = ackMode;
169         _transacted = transacted;
170         _stopped = true;
171         _sentMessageCache = new SentMessageCache(this);
172     }
173
174     /**
175      * Returns the identifier of the connection that created this session
176      *
177      * @return the connection identifier
178      */

179     public long getConnectionId() {
180         return _connection.getConnectionId();
181     }
182
183     /**
184      * Acknowledge that a message has been processed
185      *
186      * @param consumerId the identity of the consumer performing the ack
187      * @param messageId the message identifier
188      * @throws JMSException for any error
189      */

190     public void acknowledgeMessage(long consumerId, String JavaDoc messageId)
191             throws JMSException JavaDoc {
192         _sentMessageCache.acknowledgeMessage(messageId, consumerId);
193     }
194
195     /**
196      * Send a message
197      *
198      * @param message the message to send
199      * @throws JMSException for any error
200      */

201     public void send(MessageImpl message) throws JMSException JavaDoc {
202         if (message == null) {
203             throw new JMSException JavaDoc("Message is null");
204         }
205
206         try {
207             // check the delivery mode of the message
208
checkDeliveryMode((MessageImpl) message);
209
210             // set the connection identity and then let the message manager
211
// process it
212
((MessageImpl) message).setConnectionId(_connection.getConnectionId());
213
214             // if there is a global transaction currently in process then
215
// we must send the message to the resource manager, otherwise
216
// send it directly to the message manager
217
if (_xid != null) {
218                 ResourceManager.instance().logPublishedMessage(_xid,
219                                                                (MessageImpl) message);
220             } else {
221                 MessageMgr.instance().add((MessageImpl) message);
222             }
223         } catch (JMSException JavaDoc exception) {
224             _log.error("Failed to process message", exception);
225             throw exception;
226         } catch (OutOfMemoryError JavaDoc exception) {
227             String JavaDoc msg =
228                     "Failed to process message due to out-of-memory error";
229             _log.error(msg, exception);
230             throw new JMSException JavaDoc(msg);
231         } catch (Exception JavaDoc exception) {
232             String JavaDoc msg = "Failed to process message";
233             _log.error(msg, exception);
234             throw new JMSException JavaDoc(msg);
235         }
236     }
237
238     /**
239      * Send a set of messages
240      *
241      * @param messages a list of <code>MessageImpl</code> instances
242      * @throws JMSException for any JMS error
243      */

244     public void send(List JavaDoc messages) throws JMSException JavaDoc {
245         if (messages == null) {
246             throw new JMSException JavaDoc("Argument 'messages' is null");
247         }
248
249         Iterator JavaDoc iterator = messages.iterator();
250         while (iterator.hasNext()) {
251             MessageImpl message = (MessageImpl) iterator.next();
252             send(message);
253         }
254     }
255
256     /**
257      * Return the next available message to the specified consumer.
258      * <p/>
259      * The <code>wait</code> parameter indicates how many milliseconds to wait
260      * for a message before returning. If <code>wait</code> is <code>0</code>
261      * then do not wait. If <code>wait</code> is <code>-1</code> then wait
262      * indefinitely for the next message.
263      *
264      * @param consumerId the consumer identifier
265      * @param wait number of milliseconds to wait
266      * @return the next message or <code>null</code>
267      * @throws JMSException for any JMS error
268      */

269     public MessageImpl receive(long consumerId, long wait)
270             throws JMSException JavaDoc {
271         MessageImpl message = null;
272         ConsumerEndpoint consumer = getConsumerEndpoint(consumerId);
273         if (consumer == null) {
274             throw new JMSException JavaDoc("Can't receive message: no consumer registered with "
275                                    + "identifier "
276                                    + consumerId
277                                    + " on session");
278         }
279
280         // got a valid consumer, so retrieve a handle.
281
MessageHandle handle = consumer.receive(wait);
282
283         if (handle != null) {
284             // if we get a non-null handle, retrieve the message
285
MessageImpl orig = handle.getMessage();
286             if (orig != null) {
287                 // now clone the message to set client specific properties
288
try {
289                     message = (MessageImpl) orig.clone();
290                     message.setJMSRedelivered(handle.getDelivered());
291                     message.setConsumerId(handle.getConsumerId());
292                 } catch (Exception JavaDoc exception) {
293                     _log.error(exception);
294                 }
295             }
296         }
297
298         // if we have a non-null message then add it to the sent message
299
// cache. Additionally, if we are part of a global transaction then
300
// we must also sent it to the ResourceManager for recovery.
301
if (message != null) {
302             _sentMessageCache.process(handle);
303
304             if (_xid != null) {
305                 try {
306                     ResourceManager.instance().logReceivedMessage(_xid,
307                                                                   consumer.getId(),
308                                                                   handle);
309                 } catch (Exception JavaDoc exception) {
310                     _log.error(exception);
311                     JMSException JavaDoc error = new JMSException JavaDoc("Error in receive");
312                     error.setLinkedException(exception);
313                     throw error;
314                 }
315             }
316         }
317
318         return message;
319     }
320
321     /**
322      * Browse up to count messages
323      *
324      * @param consumerId the consumer identifier
325      * @param count the maximum number of messages to receive
326      * @return a list of {@link MessageImpl} instances
327      * @throws JMSException for any JMS error
328      */

329     public List JavaDoc browse(long consumerId, int count) throws JMSException JavaDoc {
330         ConsumerEndpoint consumer = getConsumerEndpoint(consumerId);
331         if (consumer == null) {
332             throw new JMSException JavaDoc("Can't browse messages: no browser registered with "
333                                    + "identifier "
334                                    + consumerId
335                                    + " on session");
336         }
337         if (!(consumer instanceof QueueBrowserEndpoint)) {
338             throw new JMSException JavaDoc("Can't browse messages: invalid consumer");
339         }
340
341         Vector JavaDoc handles = ((QueueBrowserEndpoint) consumer).receiveMessages(
342                 count);
343         List JavaDoc messages = new ArrayList JavaDoc(count);
344
345         Iterator JavaDoc iterator = handles.iterator();
346         while (iterator.hasNext()) {
347             MessageHandle handle = (MessageHandle) iterator.next();
348             MessageImpl orig = handle.getMessage();
349             if (orig != null) {
350                 // clone the message to set client specific properties
351
try {
352                     MessageImpl message = (MessageImpl) orig.clone();
353                     message.setJMSRedelivered(handle.getDelivered());
354                     message.setConsumerId(handle.getConsumerId());
355                     messages.add(message);
356                 } catch (Exception JavaDoc exception) {
357                     _log.error(exception);
358                 }
359                 if (messages.size() == count) {
360                     break;
361                 }
362             }
363         }
364         return messages;
365     }
366
367     /**
368      * Create a new message consumer
369      *
370      * @param destination the destination to consume messages from
371      * @param selector the message selector. May be <code>null</code>
372      * @param noLocal if true, and the destination is a topic, inhibits the
373      * delivery of messages published by its own connection.
374      * The behavior for <code>noLocal</code> is not specified
375      * if the destination is a queue.
376      * @return the identifty of the message consumer
377      * @throws JMSException for any JMS error
378      */

379     public long createConsumer(JmsDestination destination, String JavaDoc selector,
380                                  boolean noLocal) throws JMSException JavaDoc {
381         if (_log.isDebugEnabled()) {
382             _log.debug("createConsumer(destination=" + destination
383                        + ", selector=" + selector + ", noLocal=" + noLocal
384                        + ") [session=" + this + "]");
385         }
386
387         if (destination == null) {
388             throw new InvalidDestinationException JavaDoc(
389                     "Cannot create MessageConsumer for null destination");
390         }
391
392         // Retrieve the destination from the destination manager and use
393
// it to create the consumer
394
ConsumerEndpoint consumer =
395                 ConsumerManager.instance().createConsumerEndpoint(this,
396                                                                   destination,
397                                                                   selector, noLocal);
398         final long id = consumer.getId();
399         consumer.setStopped(_stopped);
400         _consumers.put(new Long JavaDoc(id), consumer);
401         return id;
402     }
403
404     /**
405      * Create a new durable consumer. Durable consumers may only consume from
406      * non-temporary <code>Topic</code> destinations.
407      *
408      * @param topic the non-temporary <code>Topic</code> to subscribe to
409      * @param name the name used to identify this subscription
410      * @param selector only messages with properties matching the message
411      * selector expression are delivered. A value of null or an
412      * empty string indicates that there is no message selector
413      * for the message consumer.
414      * @param noLocal if set, inhibits the delivery of messages published by
415      * its own connection
416      * @return the identity of the durable consumer
417      * @throws JMSException for any JMS error
418      */

419     public long createDurableConsumer(JmsTopic topic, String JavaDoc name,
420                                         String JavaDoc selector, boolean noLocal)
421             throws JMSException JavaDoc {
422         if (_log.isDebugEnabled()) {
423             _log.debug("createDurableConsumer(topic=" + topic + ", name="
424                        + name
425                        + ", selector=" + selector + ", noLocal=" + noLocal
426                        + ") [session=" + this + "]");
427         }
428
429         if (topic == null || topic.isTemporaryDestination()) {
430             throw new InvalidDestinationException JavaDoc("Invalid topic: " + topic);
431         }
432
433         if (name == null) {
434             throw new InvalidDestinationException JavaDoc("Invalid subscription name");
435         }
436
437         ConsumerManager manager = ConsumerManager.instance();
438
439         if (manager.durableConsumerExists(name)) {
440             // if the durable consumer exists then validate that
441
// it was the specified topic that it was registered
442
// under. If it is not registered for the topic then
443
// we must delete the existing entry and recreate it
444
// against the new topic
445
if (!manager.validSubscription(topic.getName(), name)) {
446                 unsubscribe(name);
447                 manager.createDurableConsumer(topic, name);
448             }
449         } else {
450             // the durable consumer does not exist. so create
451
// it
452
manager.createDurableConsumer(topic, name);
453         }
454
455         // if a durable subscriber with the specified name is
456
// already active then this method will throw an exception.
457
// attempt to create a durable consuinmer
458
ConsumerEndpoint consumer = manager.createDurableConsumerEndpoint(this,
459                                                                           topic,
460                                                                           name,
461                                                                           noLocal,
462                                                                           selector);
463         final long id = consumer.getId();
464         consumer.setStopped(_stopped);
465         _consumers.put(new Long JavaDoc(id), consumer);
466         return id;
467     }
468
469     /**
470      * Create a queue browser for this session. This allows clients to browse a
471      * queue without removing any messages.
472      *
473      * @param queue the queue to browse
474      * @param selector the message selector. May be <code>null</code>
475      * @return the identity of the queue browser
476      * @throws JMSException for any JMS error
477      */

478     public long createBrowser(JmsQueue queue, String JavaDoc selector)
479             throws JMSException JavaDoc {
480         if (_log.isDebugEnabled()) {
481             _log.debug("createBrowser(queue=" + queue + ", selector="
482                        + selector
483                        + ") [session=" + this + "]");
484         }
485
486         if (queue == null) {
487             throw new JMSException JavaDoc("Cannot create QueueBrowser for null queue");
488         }
489
490         ConsumerEndpoint consumer =
491                 ConsumerManager.instance().createQueueBrowserEndpoint(this,
492                                                                       queue,
493                                                                       selector);
494
495         final long id = consumer.getId();
496         consumer.setStopped(_stopped);
497         _consumers.put(new Long JavaDoc(id), consumer);
498         return id;
499     }
500
501     /**
502      * Delete the receiver with the specified identity and clean up all
503      * associated resources.
504      *
505      * @param consumerId the consumer identifier
506      * @throws JMSException if the consumer cannot be deleted
507      */

508     public void removeConsumer(long consumerId) throws JMSException JavaDoc {
509         if (_log.isDebugEnabled()) {
510             _log.debug("removeConsumer(consumerId=" + consumerId
511                        + ") [session="
512                        + this + "]");
513         }
514
515         ConsumerEndpoint consumer =
516                 (ConsumerEndpoint) _consumers.remove(new Long JavaDoc(consumerId));
517         if (consumer == null) {
518             throw new JMSException JavaDoc("No consuemr with id=" + consumerId);
519         }
520
521         // destroy the consumer endpoint
522
ConsumerManager.instance().deleteConsumerEndpoint(consumer);
523     }
524
525     /**
526      * Unsubscribe a durable subscription
527      *
528      * @param name the name used to identify the subscription
529      * @throws JMSException for any JMS error
530      */

531     public void unsubscribe(String JavaDoc name) throws JMSException JavaDoc {
532         if (_log.isDebugEnabled()) {
533             _log.debug("unsubscribe(name=" + name + ") [session=" + this + "]");
534         }
535
536         ConsumerManager manager = ConsumerManager.instance();
537
538         // check that the durable consumer actually exists. If it doesn't then
539
// throw an exception
540
if (!manager.durableConsumerExists(name)) {
541             throw new InvalidDestinationException JavaDoc(
542                     name + " is not a durable subscriber name");
543         }
544
545         // check that the durable consumer is not active before removing it. If
546
// it is then throw an exception
547
if (!manager.isDurableConsumerActive(name)) {
548             manager.removeDurableConsumer(name);
549         } else {
550             throw new JMSException JavaDoc("Failed to unsubscribe subscriber "
551                                    + name + " since is still active");
552         }
553     }
554
555     /**
556      * Start the message delivery for the session.
557      */

558     public void start() {
559         if (_log.isDebugEnabled()) {
560             _log.debug("start() [session=" + this + "]");
561         }
562
563         if (_stopped) {
564             pause(false);
565             _stopped = false;
566         }
567     }
568
569     /**
570      * Stop message delivery for the session
571      */

572     public void stop() {
573         if (_log.isDebugEnabled()) {
574             _log.debug("stop() [session=" + this + "]");
575         }
576         if (!_stopped) {
577             pause(true);
578             _stopped = true;
579         }
580     }
581
582     /**
583      * Set a message listener for the session. This is the channel used to
584      * asynchronously deliver messages to consumers created on this session.
585      *
586      * @param listener the message listener
587      */

588     public void setMessageListener(JmsMessageListener listener) {
589         _listener = listener;
590     }
591
592     /**
593      * Enable or disable asynchronous message delivery for a particular
594      * consumer
595      *
596      * @param consumerId the consumer identifier
597      * @param enable true to enable; false to disable
598      * @throws JMSException for any JMS error
599      */

600     public void enableAsynchronousDelivery(long consumerId, boolean enable)
601             throws JMSException JavaDoc {
602         ConsumerEndpoint consumer = getConsumerEndpoint(consumerId);
603         if (consumer == null) {
604             throw new JMSException JavaDoc(consumerId + " is not registered");
605         }
606
607         if (enable) {
608             consumer.setMessageListener(this);
609         } else {
610             consumer.setMessageListener(null);
611         }
612     }
613
614     /**
615      * Close and release any resource allocated to this session.
616      *
617      * @throws JMSException if the session cannot be closed
618      */

619     public void close() throws JMSException JavaDoc {
620         boolean closed = false;
621
622         synchronized (this) {
623             closed = _closed;
624             if (!closed) {
625                 _closed = true;
626             }
627         }
628
629         if (!closed) {
630             if (_log.isDebugEnabled()) {
631                 _log.debug("close() [session=" + this + "]");
632             }
633
634             // reset the listener
635
setMessageListener(null);
636
637             // iterate over the list of consumers and deregister the
638
// associated endpoints and then remove all the entries
639
Iterator JavaDoc consumers = _consumers.values().iterator();
640             while (consumers.hasNext()) {
641                 ConsumerEndpoint consumer = (ConsumerEndpoint) consumers.next();
642                 ConsumerManager.instance().deleteConsumerEndpoint(consumer);
643             }
644
645             // clear the unacked message cache
646
_sentMessageCache.clear();
647
648             // clear the consumers
649
_consumers.clear();
650
651             // de-register the session from the connection
652
_connection.closed(this);
653         } else {
654             if (_log.isDebugEnabled()) {
655                 _log.debug("close() [session=" + this +
656                            "]: session already closed");
657             }
658         }
659     }
660
661     /**
662      * Send the specified message to the client
663      *
664      * @param handle a handle to the message
665      * @throws JMSException if the message can't be resolved from the handle
666      * @throws RemoteException if the message can't be delivered to the client
667      */

668     public void onMessage(MessageHandle handle) throws JMSException JavaDoc,
669             RemoteException JavaDoc {
670         if (_listener != null) {
671             MessageImpl message = handle.getMessage();
672             MessageImpl m = null;
673
674             // get the message. It may be null if it has expired
675
if (message != null) {
676                 try {
677                     m = (MessageImpl) message.clone();
678                 } catch (CloneNotSupportedException JavaDoc exception) {
679                     throw new JMSException JavaDoc(exception.toString());
680                 }
681
682                 m.setConsumerId(handle.getConsumerId());
683                 m.setJMSRedelivered(handle.getDelivered());
684
685                 // if we are acking the message and the session is
686
// transacted and the acknowledge mode is
687
// CLIENT_ACKNOWLEDGE then send it to the cache before
688
// we send it to the listener. This will enable clients
689
// to ack the message while in the onMessage method
690
if (_transacted || (_ackMode == Session.CLIENT_ACKNOWLEDGE)) {
691                     _sentMessageCache.process(handle);
692                 }
693
694                 try {
695                     // send the message to the listener.
696
_listener.onMessage(m);
697
698                     // if the session is not transacted or the acknowledge mode
699
// is not CLIENT_ACKNOWLEDGE then process it through the
700
// sent message cache now.
701
if (!_transacted &&
702                             (_ackMode != Session.CLIENT_ACKNOWLEDGE)) {
703                         _sentMessageCache.process(handle);
704                     }
705                 } catch (RemoteException JavaDoc exception) {
706                     // close all resources and rethrow it
707
close();
708                     throw exception;
709                 }
710             }
711         } else {
712             _log.error("Failed to stop async consumer endpoints?");
713         }
714     }
715
716     /**
717      * Notifies that a message is available for a particular consumer
718      *
719      * @param consumerId the identity of the message consumer
720      * @throws RemoteException if the session can't be notified
721      */

722     public void onMessageAvailable(long consumerId) throws RemoteException JavaDoc {
723         _listener.onMessageAvailable(consumerId);
724     }
725
726     /**
727      * Call recover on all registered consumers. This will cause all
728      * unacknowledged messages to be redelivered. Before we recover we need to
729      * stop messages delivery. We then need to start redelivery when the
730      * recovery has been completed
731      *
732      * @throws JMSException if the session can't be recovered
733      */

734     public void recover() throws JMSException JavaDoc {
735         // stop message delivery
736
stop();
737
738         // clear the messages in the sent message cache
739
_sentMessageCache.clear();
740
741         // restart message delivery
742
start();
743     }
744
745     /**
746      * Commit this session, which will acknowledge all sent messages for all
747      * consumers.
748      *
749      * @throws JMSException - if there are any problems
750      */

751     public void commit() throws JMSException JavaDoc {
752         try {
753             _sentMessageCache.acknowledgeAllMessages();
754         } catch (OutOfMemoryError JavaDoc exception) {
755             String JavaDoc msg =
756                     "Failed to commit transaction due to out-of-memory error";
757             _log.error(msg, exception);
758             throw new JMSException JavaDoc(msg);
759         }
760     }
761
762     /**
763      * Abort, will return all unacked messages to their respective endpoints, if
764      * they are still active.
765      *
766      * @throws JMSException - if there are any problems
767      */

768     public void rollback() throws JMSException JavaDoc {
769         _sentMessageCache.clear();
770     }
771
772     // implementation of XAResource.setTransactionTimeout
773
public void start(Xid JavaDoc xid, int flags) throws XAException JavaDoc {
774         try {
775             ResourceManager.instance().start(xid, flags);
776
777             // set this as the current xid for this session
778
_xid = xid;
779         } catch (ResourceManagerException exception) {
780             throw new XAException JavaDoc("Failed in start " + exception);
781         }
782     }
783
784     // implementation XAResource.isSame
785
public int prepare(Xid JavaDoc xid) throws XAException JavaDoc {
786         try {
787             return ResourceManager.instance().prepare(xid);
788         } catch (ResourceManagerException exception) {
789             throw new XAException JavaDoc("Failed in prepare " + exception);
790         }
791     }
792
793     // implementation XAResource.commit
794
public void commit(Xid JavaDoc xid, boolean onePhase) throws XAException JavaDoc {
795         try {
796             ResourceManager.instance().commit(xid, onePhase);
797         } catch (ResourceManagerException exception) {
798             throw new XAException JavaDoc("Failed in commit " + exception);
799         } finally {
800             _xid = null;
801         }
802     }
803
804     // implementation of XAResource.end
805
public void end(Xid JavaDoc xid, int flags) throws XAException JavaDoc {
806         try {
807             ResourceManager.instance().end(xid, flags);
808         } catch (ResourceManagerException exception) {
809             throw new XAException JavaDoc("Failed in end " + exception);
810         } finally {
811             _xid = null;
812         }
813     }
814
815     // implementation of XAResource.forget
816
public void forget(Xid JavaDoc xid) throws XAException JavaDoc {
817         try {
818             ResourceManager.instance().forget(xid);
819         } catch (ResourceManagerException exception) {
820             throw new XAException JavaDoc("Failed in forget " + exception);
821         } finally {
822             _xid = null;
823         }
824     }
825
826     // implementation of XAResource.prepare
827
public Xid JavaDoc[] recover(int flag) throws XAException JavaDoc {
828         try {
829             return ResourceManager.instance().recover(flag);
830         } catch (ResourceManagerException exception) {
831             throw new XAException JavaDoc("Failed in recover " + exception);
832         }
833     }
834
835     // implementation of XAResource.recover
836
public void rollback(Xid JavaDoc xid) throws XAException JavaDoc {
837         try {
838             ResourceManager.instance().rollback(xid);
839         } catch (ResourceManagerException exception) {
840             throw new XAException JavaDoc("Failed in rollback " + exception);
841         } finally {
842              // clear the current xid
843
_xid = null;
844         }
845     }
846
847     // implementation of XAResource.getTransactionTimeout
848
public int getTransactionTimeout() throws XAException JavaDoc {
849         try {
850             return ResourceManager.instance().getTransactionTimeout();
851         } catch (ResourceManagerException exception) {
852             throw new XAException JavaDoc("Failed in getTransactionTimeout " +
853                                   exception);
854         }
855     }
856
857     // implementation of XAResource.isSameRM
858
public boolean isSameRM(XAResource JavaDoc xares) throws XAException JavaDoc {
859         return true;
860     }
861
862
863     // implementation of XAResource.rollback
864
public boolean setTransactionTimeout(int seconds) throws XAException JavaDoc {
865         try {
866             return ResourceManager.instance().setTransactionTimeout(seconds);
867         } catch (ResourceManagerException exception) {
868             throw new XAException JavaDoc("Failed in setTransactionTimeout "
869                                   + exception);
870         }
871     }
872
873     /**
874      * Return the xid that is currently associated with this session or null if
875      * this session is currently not part of a global transactions
876      *
877      * @return Xid
878      */

879     public Xid JavaDoc getXid() {
880         return _xid;
881     }
882
883     /**
884      * Return the identity of the {@link ResourceManager}. The transaction
885      * manager should be the only one to initiating this call.
886      *
887      * @return the identity of the resource manager
888      * @throws XAException - if it cannot retrieve the rid.
889      */

890     public String JavaDoc getResourceManagerId() throws XAException JavaDoc {
891         try {
892             return ResourceManager.instance().getResourceManagerId();
893         } catch (ResourceManagerException exception) {
894             throw new XAException JavaDoc("Failed in getResourceManagerId "
895                                   + exception);
896         }
897     }
898
899     /**
900      * Determines if the session is transacted
901      *
902      * @return <code>true</code> if the session is transacted
903      */

904     public boolean isTransacted() {
905         return _transacted;
906     }
907
908     /**
909      * Returns the message acknowledgement mode for the session
910      */

911     public int getAckMode() {
912         return _ackMode;
913     }
914
915     /**
916      * Returns the consumer endpoint given its identifier
917      *
918      * @param consumerId the consumer identifier
919      * @return the consumer endpoint corresponding to <code>consumerId</code>,
920      * or <code>null</code> if none exists
921      */

922     public ConsumerEndpoint getConsumerEndpoint(long consumerId) {
923         return (ConsumerEndpoint) _consumers.get(new Long JavaDoc(consumerId));
924     }
925
926     /**
927      * This method is used to stop and restart the session. Stopping the session
928      * should stop all message delivery to session consumers
929      *
930      * @param stop - true if we need to stop the session, false otherwise
931      */

932     private void pause(boolean stop) {
933         Iterator JavaDoc iter = _consumers.values().iterator();
934         while (iter.hasNext()) {
935             ((ConsumerEndpoint) iter.next()).setStopped(stop);
936         }
937     }
938
939     /**
940      * Check the delivery mode of the message. If the delivery mode is
941      * persistent and the destination is non-administered then change the
942      * delivery mode to non-persistent so that it can be processed correctly by
943      * the server
944      *
945      * @param message - the message to check
946      * @throws JMSException - propagate JMSException to client
947      */

948     private void checkDeliveryMode(MessageImpl message) throws JMSException JavaDoc {
949         if ((message.getJMSDeliveryMode() == DeliveryMode.PERSISTENT)
950                 &&
951                 (!DestinationManager.instance()
952                 .isMessageForAdministeredDestination(message))) {
953             message.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT);
954         }
955     }
956
957 }
958
Popular Tags