KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jboss > mq > SpySession


1 /*
2 * JBoss, Home of Professional Open Source
3 * Copyright 2005, JBoss Inc., and individual contributors as indicated
4 * by the @authors tag. See the copyright.txt in the distribution for a
5 * full listing of individual contributors.
6 *
7 * This is free software; you can redistribute it and/or modify it
8 * under the terms of the GNU Lesser General Public License as
9 * published by the Free Software Foundation; either version 2.1 of
10 * the License, or (at your option) any later version.
11 *
12 * This software is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this software; if not, write to the Free
19 * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
20 * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
21 */

22 package org.jboss.mq;
23
24 import java.io.Serializable JavaDoc;
25 import java.util.ArrayList JavaDoc;
26 import java.util.HashSet JavaDoc;
27 import java.util.Iterator JavaDoc;
28 import java.util.LinkedList JavaDoc;
29
30 import javax.jms.BytesMessage JavaDoc;
31 import javax.jms.Destination JavaDoc;
32 import javax.jms.IllegalStateException JavaDoc;
33 import javax.jms.InvalidDestinationException JavaDoc;
34 import javax.jms.JMSException JavaDoc;
35 import javax.jms.JMSSecurityException JavaDoc;
36 import javax.jms.MapMessage JavaDoc;
37 import javax.jms.Message JavaDoc;
38 import javax.jms.MessageConsumer JavaDoc;
39 import javax.jms.MessageListener JavaDoc;
40 import javax.jms.MessageProducer JavaDoc;
41 import javax.jms.ObjectMessage JavaDoc;
42 import javax.jms.Queue JavaDoc;
43 import javax.jms.QueueBrowser JavaDoc;
44 import javax.jms.QueueReceiver JavaDoc;
45 import javax.jms.QueueSender JavaDoc;
46 import javax.jms.Session JavaDoc;
47 import javax.jms.StreamMessage JavaDoc;
48 import javax.jms.TemporaryQueue JavaDoc;
49 import javax.jms.TemporaryTopic JavaDoc;
50 import javax.jms.TextMessage JavaDoc;
51 import javax.jms.Topic JavaDoc;
52 import javax.jms.TopicPublisher JavaDoc;
53 import javax.jms.TopicSubscriber JavaDoc;
54 import javax.jms.XASession JavaDoc;
55 import javax.transaction.xa.XAResource JavaDoc;
56
57 import org.jboss.logging.Logger;
58
59 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
60
61 /**
62  * This class implements javax.jms.Session and javax.jms.XASession
63  *
64  * @author Norbert Lataille (Norbert.Lataille@m4x.org)
65  * @author Hiram Chirino (Cojonudo14@hotmail.com) @created August 16, 2001
66  * @version $Revision: 41667 $
67  */

68 public class SpySession implements Session JavaDoc, XASession JavaDoc
69 {
70    /** The log */
71    static Logger log = Logger.getLogger(SpySession.class);
72
73    /** Whether trace is enabled; re-read from the logger in the constructor, commit() and rollback() */
74    static boolean trace = log.isTraceEnabled();
75
76    /** The connection object to which this session is linked */
77    public Connection connection;
78
79    /** Is this session running right now? Set in the constructor and toggled by recover() */
80    public boolean running;
81    /** Is this session transacted? */
82    protected boolean transacted;
83    /** The acknowledgement mode requested when the session was created */
84    protected int acknowledgeMode;
85    /** MessageConsumers created by this session */
86    protected HashSet JavaDoc consumers;
87    /** MessageProducers created by this session */
88    protected HashSet JavaDoc producers;
89    /** The delivery lock */
90    protected Object JavaDoc deliveryLock = new Object JavaDoc();
91    /** Whether we are doing asynchronous delivery */
92    protected boolean inDelivery = false;
93    
94    /**
95      * This consumer receives messages for the MessageListener assigned to
96      * the session: run() hands every message queued on the session to it.
97      * Presumably the SpyConnectionConsumer is what queues those messages - TODO confirm.
98      */

99    SpyMessageConsumer sessionConsumer;
100
101    /** Is the session closed? */
102    SynchronizedBoolean closed = new SynchronizedBoolean(false);
103
104    /** Held by commit(), rollback() and recover() while they work; note that run() itself locks on messages */
105    Object JavaDoc runLock = new Object JavaDoc();
106
107    /**
108      * The transactionId of the current transaction (registered with the
109      * SpyXAResourceManager).
110      */

111    private Object JavaDoc currentTransactionId;
112
113    /** If this is an XASession, we have an associated XAResource */
114    SpyXAResource spyXAResource;
115
116    /** Messages queued by addMessage() and drained by run() (optional connection consumer support) */
117    LinkedList JavaDoc messages = new LinkedList JavaDoc();
118
119    /** Keeps track of unacknowledged messages; addUnacknowlegedMessage() and the ack path of doAcknowledge() lock on this list */
120    ArrayList JavaDoc unacknowledgedMessages = new ArrayList JavaDoc();
121
/**
 * Builds a session bound to the given connection.
 * <p>
 * A local (non-XA) transacted session immediately opens a transaction with
 * the connection's resource manager so one is always available.
 *
 * @param conn the owning connection
 * @param trans whether the session is transacted
 * @param acknowledge the acknowledgement mode
 * @param xaSession whether an XAResource should be attached to the session
 */
SpySession(Connection conn, boolean trans, int acknowledge, boolean xaSession)
{
   trace = log.isTraceEnabled();

   this.connection = conn;
   this.transacted = trans;
   this.acknowledgeMode = acknowledge;
   if (xaSession)
   {
      this.spyXAResource = new SpyXAResource(this);
   }

   this.running = true;
   this.consumers = new HashSet();
   this.producers = new HashSet();

   // Only a local transacted session starts its own transaction up front
   if (trans && this.spyXAResource == null)
   {
      this.currentTransactionId = conn.spyXAResourceManager.startTx();
   }

   if (trace)
   {
      log.trace("New session " + this);
   }
}
151
152    /**
153      * JMS 11.2.21.2 Note that the acknowledge method of Message acknowledges
154      * all messages received on that messages session.
155      *
156      * JMS 11.3.2.2.3 Message.acknowledge method: Clarify that the method
157      * applies to all consumed messages of the session. Rationale for this
158      * change: A possible misinterpretation of the existing Java API
159      * documentation for Message.acknowledge assumed that only messages received
160      * prior to this message should be acknowledged. The updated Java API
161      * documentation statement emphasizes that message acknowledgement is really
162      * a session-level activity and that this message is only being used to
163      * identify the session in order to acknowledge all messages consumed by the
164      * session. The acknowledge method was placed in the message object only to
165      * enable easy access to acknowledgement capability within a message
166      * listeners onMessage method. This change aligns the specification and Java
167      * API documentation to define Message.acknowledge in the same manner.
168      *
169      * @param message the message to acknowledge
170      * @param ack the acknowledgement request
171      * @throws JMSException for any error
172      */

173    public void doAcknowledge(Message JavaDoc message, AcknowledgementRequest ack) throws JMSException JavaDoc
174    {
175       checkClosed();
176       //if we are acking, ack all messages consumed by this session
177
if (ack.isAck())
178       {
179          synchronized (unacknowledgedMessages)
180          {
181             if (trace)
182                log.trace("Acknowledging message " + ack);
183
184             //ack the current message
185
connection.send(((SpyMessage) message).getAcknowledgementRequest(true));
186             unacknowledgedMessages.remove(message);
187
188             //ack the other messages consumed in this session
189
Iterator JavaDoc i = unacknowledgedMessages.iterator();
190             while (i.hasNext())
191             {
192                Message JavaDoc mess = (Message JavaDoc) i.next();
193                i.remove();
194                connection.send(((SpyMessage) mess).getAcknowledgementRequest(true));
195             }
196          }
197       }
198       //if we are nacking, only nack the one message
199
else
200       {
201          if (trace)
202             log.trace("Nacking message " + message.getJMSMessageID());
203
204          //nack the current message
205
unacknowledgedMessages.remove(message);
206          connection.send(ack);
207       }
208    }
209
210    /**
211      * Retrieve the XA resource manager
212      *
213      * @return the resource manager
214      */

215    public SpyXAResourceManager getXAResourceManager()
216    {
217       return connection.spyXAResourceManager;
218    }
219
220    public void setMessageListener(MessageListener JavaDoc listener) throws JMSException JavaDoc
221    {
222       checkClosed();
223
224       if (trace)
225          log.trace("Set message listener " + listener + " " + this);
226
227       sessionConsumer = new SpyMessageConsumer(this, true);
228       sessionConsumer.setMessageListener(listener);
229    }
230
231    public boolean getTransacted() throws JMSException JavaDoc
232    {
233       checkClosed();
234       return transacted;
235    }
236
237    public MessageListener JavaDoc getMessageListener() throws JMSException JavaDoc
238    {
239       checkClosed();
240       if (sessionConsumer == null)
241          return null;
242
243       return sessionConsumer.getMessageListener();
244    }
245
246    public BytesMessage JavaDoc createBytesMessage() throws JMSException JavaDoc
247    {
248       checkClosed();
249       SpyBytesMessage message = MessagePool.getBytesMessage();
250       message.header.producerClientId = connection.getClientID();
251       return message;
252    }
253
254    public MapMessage JavaDoc createMapMessage() throws JMSException JavaDoc
255    {
256       checkClosed();
257       SpyMapMessage message = MessagePool.getMapMessage();
258       message.header.producerClientId = connection.getClientID();
259       return message;
260    }
261
262    public Message JavaDoc createMessage() throws JMSException JavaDoc
263    {
264       checkClosed();
265       SpyMessage message = MessagePool.getMessage();
266       message.header.producerClientId = connection.getClientID();
267       return message;
268    }
269
270    public ObjectMessage JavaDoc createObjectMessage() throws JMSException JavaDoc
271    {
272       checkClosed();
273       SpyObjectMessage message = MessagePool.getObjectMessage();
274       message.header.producerClientId = connection.getClientID();
275       return message;
276    }
277
278    public ObjectMessage JavaDoc createObjectMessage(Serializable JavaDoc object) throws JMSException JavaDoc
279    {
280       checkClosed();
281       SpyObjectMessage message = MessagePool.getObjectMessage();
282       message.setObject(object);
283       message.header.producerClientId = connection.getClientID();
284       return message;
285    }
286
287    public StreamMessage JavaDoc createStreamMessage() throws JMSException JavaDoc
288    {
289       checkClosed();
290       SpyStreamMessage message = MessagePool.getStreamMessage();
291       message.header.producerClientId = connection.getClientID();
292       return message;
293    }
294
295    public TextMessage JavaDoc createTextMessage() throws JMSException JavaDoc
296    {
297       checkClosed();
298       SpyTextMessage message = MessagePool.getTextMessage();
299       message.header.producerClientId = connection.getClientID();
300       return message;
301    }
302
303    // Delivers messages queued by ConnectionConsumer to the message listener
304
public void run()
305    {
306       synchronized (messages)
307       {
308          if (trace)
309             log.trace("Run messages=" + messages.size() + " " + this);
310          while (messages.size() > 0)
311          {
312             SpyMessage message = (SpyMessage) messages.removeFirst();
313             try
314             {
315                if (sessionConsumer == null)
316                {
317                   log.warn("Session has no message listener set, cannot process message. " + this);
318                   //Nack message
319
connection.send(message.getAcknowledgementRequest(false));
320                }
321                else
322                {
323                   sessionConsumer.addMessage(message);
324                }
325             }
326             catch (Throwable JavaDoc ignore)
327             {
328                if (trace)
329                   log.trace("Ignored error from session consumer", ignore);
330             }
331          }
332       }
333    }
334
335    public void close() throws JMSException JavaDoc
336    {
337       if (closed.set(true))
338          return;
339
340       if (trace)
341          log.trace("Session closing " + this);
342
343       JMSException JavaDoc exception = null;
344
345       if (trace)
346          log.trace("Closing consumers " + this);
347
348       Iterator JavaDoc i;
349       synchronized (consumers)
350       {
351          //notify the sleeping synchronous listeners
352
if (sessionConsumer != null)
353          {
354             try
355             {
356                sessionConsumer.close();
357             }
358             catch (Throwable JavaDoc t)
359             {
360                log.trace("Error closing session consumer", t);
361             }
362          }
363
364          i = new ArrayList JavaDoc(consumers).iterator();
365       }
366
367       while (i.hasNext())
368       {
369          SpyMessageConsumer messageConsumer = (SpyMessageConsumer) i.next();
370          try
371          {
372             messageConsumer.close();
373          }
374          catch (Throwable JavaDoc t)
375          {
376             log.trace("Error closing message consumer", t);
377          }
378       }
379
380       synchronized (producers)
381       {
382          i = new ArrayList JavaDoc(producers).iterator();
383       }
384
385       while (i.hasNext())
386       {
387          SpyMessageProducer messageProducer = (SpyMessageProducer) i.next();
388          try
389          {
390             messageProducer.close();
391          }
392          catch (InvalidDestinationException JavaDoc ignored)
393          {
394             log.warn(ignored.getMessage(), ignored);
395          }
396          catch (Throwable JavaDoc t)
397          {
398             log.trace("Error closing message producer", t);
399          }
400       }
401
402       if (trace)
403          log.trace("Close handling unacknowledged messages " + this);
404       try
405       {
406          if (spyXAResource == null)
407          {
408             if (transacted)
409                internalRollback();
410             else
411             {
412                i = unacknowledgedMessages.iterator();
413                while (i.hasNext())
414                {
415                   SpyMessage message = (SpyMessage) i.next();
416                   connection.send(message.getAcknowledgementRequest(false));
417                   i.remove();
418                }
419             }
420          }
421       }
422       catch (Throwable JavaDoc t)
423       {
424          if (exception == null)
425             exception = SpyJMSException.getAsJMSException("Error nacking message", t);
426       }
427
428       if (trace)
429          log.trace("Informing connection of close " + this);
430       connection.sessionClosing(this);
431
432       // Throw the first exception
433
if (exception != null)
434          throw exception;
435    }
436
437    //Commit a transacted session
438
public void commit() throws JMSException JavaDoc
439    {
440       checkClosed();
441       trace = log.isTraceEnabled();
442
443       //Don't deliver any more messages while commiting
444
synchronized (runLock)
445       {
446          if (spyXAResource != null)
447             throw new javax.jms.TransactionInProgressException JavaDoc("Should not be call from a XASession");
448          if (!transacted)
449             throw new IllegalStateException JavaDoc("The session is not transacted");
450
451          if (trace)
452             log.trace("Committing transaction " + this);
453          try
454          {
455             connection.spyXAResourceManager.endTx(currentTransactionId, true);
456             connection.spyXAResourceManager.commit(currentTransactionId, true);
457          }
458          catch (Throwable JavaDoc t)
459          {
460             SpyJMSException.rethrowAsJMSException("Could not commit", t);
461          }
462          finally
463          {
464             unacknowledgedMessages.clear();
465             try
466             {
467                currentTransactionId = connection.spyXAResourceManager.startTx();
468
469                if (trace)
470                   log.trace("Current transaction id: " + currentTransactionId + " " + this);
471             }
472             catch (Throwable JavaDoc ignore)
473             {
474                if (trace)
475                   log.trace("Failed to start tx " + this, ignore);
476             }
477          }
478       }
479    }
480
481    public void rollback() throws JMSException JavaDoc
482    {
483       checkClosed();
484       trace = log.isTraceEnabled();
485
486       synchronized (runLock)
487       {
488          internalRollback();
489       }
490    }
491
492    public void recover() throws JMSException JavaDoc
493    {
494       checkClosed();
495       boolean stopped = connection.modeStop;
496       
497       synchronized (runLock)
498       {
499          if (currentTransactionId != null)
500             throw new IllegalStateException JavaDoc("The session is transacted");
501
502          if (trace)
503             log.trace("Session recovery stopping delivery " + this);
504          try
505          {
506             connection.stop();
507             running = false;
508          }
509          catch (Throwable JavaDoc t)
510          {
511             SpyJMSException.rethrowAsJMSException("Could not stop message delivery", t);
512          }
513
514          // Loop over all consumers, check their unacknowledged messages, set
515
// then as redelivered and add back to the list of messages
516
try
517          {
518             synchronized (messages)
519             {
520                if (stopped == false)
521                {
522                   if (trace)
523                      log.trace("Recovering: unacknowledged messages=" + unacknowledgedMessages + " " + this);
524                   Iterator JavaDoc i = consumers.iterator();
525                   while (i.hasNext())
526                   {
527                      SpyMessageConsumer consumer = (SpyMessageConsumer) i.next();
528
529                      Iterator JavaDoc ii = unacknowledgedMessages.iterator();
530                      while (ii.hasNext())
531                      {
532                         SpyMessage message = (SpyMessage) ii.next();
533
534                         if (consumer.getSubscription().accepts(message.header))
535                         {
536                            message.setJMSRedelivered(true);
537                            consumer.messages.addLast(message);
538                            ii.remove();
539                            if (trace)
540                               log.trace("Recovered: message=" + message + " consumer=" + consumer);
541                         }
542                      }
543                   }
544                }
545
546                // We no longer have consumers for the remaining messages
547
Iterator JavaDoc i = unacknowledgedMessages.iterator();
548                while (i.hasNext())
549                {
550                   SpyMessage message = (SpyMessage) i.next();
551                   connection.send(message.getAcknowledgementRequest(false));
552                   i.remove();
553                   if (trace)
554                      log.trace("Recovered: nacked with no consumer message=" + message + " " + this);
555                }
556             }
557          }
558          catch (Throwable JavaDoc t)
559          {
560             SpyJMSException.rethrowAsJMSException("Unable to recover session ", t);
561          }
562          // Restart the delivery sequence including all unacknowledged messages
563
// that had
564
// been previously delivered. Redelivered messages do not have to be
565
// delivered
566
// in exactly their original delivery order.
567

568          if (stopped == false)
569          {
570             if (trace)
571                log.trace("Recovery restarting message delivery " + this);
572             try
573             {
574                running = true;
575                connection.start();
576
577                Iterator JavaDoc i = consumers.iterator();
578                while (i.hasNext())
579                   ((SpyMessageConsumer) i.next()).restartProcessing();
580             }
581             catch (Throwable JavaDoc t)
582             {
583                SpyJMSException.rethrowAsJMSException("Could not resume message delivery", t);
584             }
585          }
586       }
587    }
588
589    public TextMessage JavaDoc createTextMessage(String JavaDoc string) throws JMSException JavaDoc
590    {
591       checkClosed();
592       SpyTextMessage message = new SpyTextMessage();
593       message.setText(string);
594       message.header.producerClientId = connection.getClientID();
595       return message;
596    }
597
598    public int getAcknowledgeMode() throws JMSException JavaDoc
599    {
600       return acknowledgeMode;
601    }
602
603    public MessageConsumer JavaDoc createConsumer(Destination JavaDoc destination) throws JMSException JavaDoc
604    {
605       return createConsumer(destination, null, false);
606    }
607
608    public MessageConsumer JavaDoc createConsumer(Destination JavaDoc destination, String JavaDoc messageSelector) throws JMSException JavaDoc
609    {
610       return createConsumer(destination, messageSelector, false);
611    }
612
613    public MessageConsumer JavaDoc createConsumer(Destination JavaDoc destination, String JavaDoc messageSelector, boolean noLocal)
614          throws JMSException JavaDoc
615    {
616       if (destination instanceof Topic JavaDoc)
617          return createSubscriber((Topic JavaDoc) destination, messageSelector, noLocal);
618       else
619          return createReceiver((Queue JavaDoc) destination, messageSelector);
620    }
621
622    public MessageProducer JavaDoc createProducer(Destination JavaDoc destination) throws JMSException JavaDoc
623    {
624       if (destination instanceof Topic JavaDoc)
625          return createPublisher((Topic JavaDoc) destination);
626       else
627          return createSender((Queue JavaDoc) destination);
628    }
629
630    public QueueBrowser JavaDoc createBrowser(Queue JavaDoc queue) throws JMSException JavaDoc
631    {
632       return createBrowser(queue, null);
633    }
634
635    public QueueBrowser JavaDoc createBrowser(Queue JavaDoc queue, String JavaDoc messageSelector) throws JMSException JavaDoc
636    {
637       checkClosed();
638       if (this instanceof SpyTopicSession)
639          throw new IllegalStateException JavaDoc("Not allowed for a TopicSession");
640       if (queue == null)
641          throw new InvalidDestinationException JavaDoc("Cannot browse a null queue.");
642       return new SpyQueueBrowser(this, queue, messageSelector);
643    }
644
645    public QueueReceiver JavaDoc createReceiver(Queue JavaDoc queue) throws JMSException JavaDoc
646    {
647       return createReceiver(queue, null);
648    }
649
650    public QueueReceiver JavaDoc createReceiver(Queue JavaDoc queue, String JavaDoc messageSelector) throws JMSException JavaDoc
651    {
652       checkClosed();
653       if (queue == null)
654          throw new InvalidDestinationException JavaDoc("Queue cannot be null.");
655
656       connection.checkTemporary(queue);
657       SpyQueueReceiver receiver = new SpyQueueReceiver(this, queue, messageSelector);
658       addConsumer(receiver);
659
660       return receiver;
661    }
662
663    public QueueSender JavaDoc createSender(Queue JavaDoc queue) throws JMSException JavaDoc
664    {
665       checkClosed();
666       SpyQueueSender producer = new SpyQueueSender(this, queue);
667       addProducer(producer);
668       return producer;
669    }
670
671    public TopicSubscriber JavaDoc createDurableSubscriber(Topic JavaDoc topic, String JavaDoc name) throws JMSException JavaDoc
672    {
673       return createDurableSubscriber(topic, name, null, false);
674    }
675
676    public TopicSubscriber JavaDoc createDurableSubscriber(Topic JavaDoc topic, String JavaDoc name, String JavaDoc messageSelector, boolean noLocal)
677          throws JMSException JavaDoc
678    {
679       checkClosed();
680       if (this instanceof SpyQueueSession)
681          throw new IllegalStateException JavaDoc("Not allowed for a QueueSession");
682       if (topic == null)
683          throw new InvalidDestinationException JavaDoc("Topic cannot be null");
684       if (topic instanceof TemporaryTopic JavaDoc)
685          throw new InvalidDestinationException JavaDoc("Attempt to create a durable subscription for a temporary topic");
686
687       if (name == null || name.trim().length() == 0)
688          throw new JMSException JavaDoc("Null or empty subscription");
689
690       SpyTopic t = new SpyTopic((SpyTopic) topic, connection.getClientID(), name, messageSelector);
691       SpyTopicSubscriber sub = new SpyTopicSubscriber(this, t, noLocal, messageSelector);
692       addConsumer(sub);
693
694       return sub;
695    }
696
697    public TopicSubscriber JavaDoc createSubscriber(Topic JavaDoc topic) throws JMSException JavaDoc
698    {
699       return createSubscriber(topic, null, false);
700    }
701
702    public TopicSubscriber JavaDoc createSubscriber(Topic JavaDoc topic, String JavaDoc messageSelector, boolean noLocal) throws JMSException JavaDoc
703    {
704       checkClosed();
705       if (topic == null)
706          throw new InvalidDestinationException JavaDoc("Topic cannot be null");
707
708       connection.checkTemporary(topic);
709       SpyTopicSubscriber sub = new SpyTopicSubscriber(this, (SpyTopic) topic, noLocal, messageSelector);
710       addConsumer(sub);
711
712       return sub;
713    }
714
715    public TopicPublisher JavaDoc createPublisher(Topic JavaDoc topic) throws JMSException JavaDoc
716    {
717       checkClosed();
718       SpyTopicPublisher producer = new SpyTopicPublisher(this, topic);
719       addProducer(producer);
720       return producer;
721    }
722
723    public Queue JavaDoc createQueue(String JavaDoc queueName) throws JMSException JavaDoc
724    {
725       checkClosed();
726       if (this instanceof SpyTopicSession)
727          throw new IllegalStateException JavaDoc("Not allowed for a TopicSession");
728       if (queueName == null)
729          throw new InvalidDestinationException JavaDoc("Queue name cannot be null.");
730       return ((SpyConnection) connection).createQueue(queueName);
731    }
732
733    public Topic JavaDoc createTopic(String JavaDoc topicName) throws JMSException JavaDoc
734    {
735       checkClosed();
736       if (this instanceof SpyQueueSession)
737          throw new IllegalStateException JavaDoc("Not allowed for a QueueSession");
738       if (topicName == null)
739          throw new InvalidDestinationException JavaDoc("The topic name cannot be null");
740
741       return ((SpyConnection) connection).createTopic(topicName);
742    }
743
744    public TemporaryQueue JavaDoc createTemporaryQueue() throws JMSException JavaDoc
745    {
746       checkClosed();
747       if (this instanceof SpyTopicSession)
748          throw new IllegalStateException JavaDoc("Not allowed for a TopicSession");
749
750       return ((SpyConnection) connection).getTemporaryQueue();
751    }
752
753    public TemporaryTopic JavaDoc createTemporaryTopic() throws JMSException JavaDoc
754    {
755       checkClosed();
756       if (this instanceof SpyQueueSession)
757          throw new IllegalStateException JavaDoc("Not allowed for a QueueSession");
758       return ((SpyConnection) connection).getTemporaryTopic();
759    }
760
761    public void unsubscribe(String JavaDoc name) throws JMSException JavaDoc
762    {
763       checkClosed();
764       if (this instanceof SpyQueueSession)
765          throw new IllegalStateException JavaDoc("Not allowed for a QueueSession");
766
767       // @todo Not yet implemented
768
DurableSubscriptionID id = new DurableSubscriptionID(connection.getClientID(), name, null);
769       connection.unsubscribe(id);
770    }
771    
772    public XAResource JavaDoc getXAResource()
773    {
774       return spyXAResource;
775    }
776    
777    public Session JavaDoc getSession() throws JMSException JavaDoc
778    {
779       checkClosed();
780       return this;
781    }
782
783    public String JavaDoc toString()
784    {
785       StringBuffer JavaDoc buffer = new StringBuffer JavaDoc(100);
786       buffer.append("SpySession@").append(System.identityHashCode(this));
787       buffer.append('[');
788       buffer.append("tx=").append(transacted);
789       if (transacted == false)
790       {
791          if (acknowledgeMode == AUTO_ACKNOWLEDGE)
792             buffer.append(" ack=").append("AUTO");
793          else if (acknowledgeMode == CLIENT_ACKNOWLEDGE)
794             buffer.append(" ack=").append("CLIENT");
795          else if (acknowledgeMode == DUPS_OK_ACKNOWLEDGE)
796             buffer.append(" ack=").append("DUPSOK");
797       }
798       buffer.append(" txid=" + currentTransactionId);
799       if (spyXAResource != null)
800          buffer.append(" XA");
801       if (running)
802          buffer.append(" RUNNING");
803       if (closed.get())
804          buffer.append(" CLOSED");
805       buffer.append(" connection=").append(connection);
806       buffer.append(']');
807       return buffer.toString();
808    }
809
810    /**
811      * Set the session's transaction id
812      *
813      * @param xid the transaction id
814      */

815    void setCurrentTransactionId(final Object JavaDoc xid)
816    {
817       if (xid == null)
818          throw new org.jboss.util.NullArgumentException("xid");
819
820       if (trace)
821          log.trace("Setting current tx xid=" + xid + " previous: " + currentTransactionId + " " + this);
822
823       this.currentTransactionId = xid;
824    }
825
826    /**
827      * Remove the session's transaction id
828      *
829      * @param xid the transaction id
830      */

831    void unsetCurrentTransactionId(final Object JavaDoc xid)
832    {
833       if (xid == null)
834          throw new org.jboss.util.NullArgumentException("xid");
835
836       if (trace)
837          log.trace("Unsetting current tx xid=" + xid + " previous: " + currentTransactionId + " " + this);
838
839       // Don't unset the xid if it has previously been suspended
840
// The session could have been recycled
841
if (xid.equals(currentTransactionId))
842          this.currentTransactionId = null;
843    }
844
845    /**
846      * Get the session's transaction id
847      *
848      * @param xid the transaction id
849      */

850    Object JavaDoc getCurrentTransactionId()
851    {
852       return currentTransactionId;
853    }
854
855    /**
856      * Get a new message
857      *
858      * @return the new message id
859      * @throws JMSException for any error
860      */

861    String JavaDoc getNewMessageID() throws JMSException JavaDoc
862    {
863       checkClosed();
864       return connection.getNewMessageID();
865    }
866
867    /**
868      * Add a message tot the session
869      *
870      * @param message the message
871      */

872    void addMessage(SpyMessage message)
873    {
874       synchronized (messages)
875       {
876          if (trace)
877             log.trace("Add message msgid=" + message.header.jmsMessageID + " " + this);
878          messages.addLast(message);
879       }
880    }
881
882    /**
883      * Add an unacknowledged message
884      *
885      * @param message the message
886      */

887    void addUnacknowlegedMessage(SpyMessage message)
888    {
889       if (!transacted)
890       {
891          synchronized (unacknowledgedMessages)
892          {
893             if (trace)
894                log.trace("Add unacked message msgid=" + message.header.jmsMessageID + " " + this);
895
896             unacknowledgedMessages.add(message);
897          }
898       }
899    }
900
901    /**
902      * Send a message
903      *
904      * @param m the message
905      * @throws JMSException for any error
906      */

907    void sendMessage(SpyMessage m) throws JMSException JavaDoc
908    {
909       checkClosed();
910
911       // Make sure the message has the correct client id
912
m.header.producerClientId = connection.getClientID();
913
914       if (transacted)
915       {
916          if (trace)
917             log.trace("Adding message to transaction " + m.header.jmsMessageID + " " + this);
918          connection.spyXAResourceManager.addMessage(currentTransactionId, m.myClone());
919       }
920       else
921       {
922          if (trace)
923             log.trace("Sending message to server " + m.header.jmsMessageID + " " + this);
924          connection.sendToServer(m);
925       }
926    }
927
928    /**
929      * Add a consumer
930      *
931      * @param who the consumer
932      * @throws JMSException for any error
933      */

934    void addConsumer(SpyMessageConsumer who) throws JMSException JavaDoc
935    {
936       checkClosed();
937
938       synchronized (consumers)
939       {
940          if (trace)
941             log.trace("Adding consumer " + who);
942
943          consumers.add(who);
944       }
945       try
946       {
947          connection.addConsumer(who);
948       }
949       catch (JMSSecurityException JavaDoc ex)
950       {
951          removeConsumerInternal(who);
952          throw ex;
953       }
954       catch (Throwable JavaDoc t)
955       {
956          SpyJMSException.rethrowAsJMSException("Error adding consumer", t);
957       }
958    }
959
960    /**
961      * Remove a consumer
962      *
963      * @param who the consumer
964      * @throws JMSException for any error
965      */

966    void removeConsumer(SpyMessageConsumer who) throws JMSException JavaDoc
967    {
968       connection.removeConsumer(who);
969       removeConsumerInternal(who);
970    }
971
972    /**
973      * Add a producer
974      *
975      * @param who the producer
976      * @throws JMSException for any error
977      */

978    void addProducer(SpyMessageProducer who) throws JMSException JavaDoc
979    {
980       checkClosed();
981
982       synchronized (producers)
983       {
984          if (trace)
985             log.trace("Adding producer " + who);
986
987          producers.add(who);
988       }
989    }
990
991    /**
992      * Remove a producer
993      *
994      * @param who the producer
995      * @throws JMSException for any error
996      */

997    void removeProducer(SpyMessageProducer who) throws JMSException JavaDoc
998    {
999       removeProducerInternal(who);
1000   }
1001
1002   /**
1003    * Try to lock the session for asynchronous delivery
1004    *
1005    * @return true when the lock was obtained
1006    */

1007   boolean tryDeliveryLock()
1008   {
1009      synchronized (deliveryLock)
1010      {
1011         if (inDelivery)
1012         {
1013            try
1014            {
1015               deliveryLock.wait();
1016            }
1017            catch (InterruptedException JavaDoc e)
1018            {
1019               log.trace("Ignored interruption waiting for delivery lock");
1020            }
1021         }
1022         // We got the lock
1023
if (inDelivery == false)
1024         {
1025            inDelivery = true;
1026            return true;
1027         }
1028      }
1029      return false;
1030   }
1031
1032   /**
1033    * Release the delivery lock
1034    */

1035   void releaseDeliveryLock()
1036   {
1037      synchronized (deliveryLock)
1038      {
1039         inDelivery = false;
1040         deliveryLock.notifyAll();
1041      }
1042   }
1043
1044   /**
1045    * Interrupt threads waiting for the delivery lock
1046    */

1047   void interruptDeliveryLockWaiters()
1048   {
1049      synchronized (deliveryLock)
1050      {
1051         deliveryLock.notifyAll();
1052      }
1053   }
1054   
1055   /**
1056    * Invoked to notify of asynchronous failure
1057    *
1058    * @param message the message
1059    * @param t the throwable
1060    */

1061   void asynchFailure(String JavaDoc message, Throwable JavaDoc t)
1062   {
1063      connection.asynchFailure(message, t);
1064   }
1065
1066   /**
1067     * Rollback a transaction
1068     *
1069     * @throws JMSException for any error
1070     */

1071   private void internalRollback() throws JMSException JavaDoc
1072   {
1073      synchronized (runLock)
1074      {
1075         if (spyXAResource != null)
1076            throw new javax.jms.TransactionInProgressException JavaDoc("Should not be call from a XASession");
1077         if (!transacted)
1078            throw new IllegalStateException JavaDoc("The session is not transacted");
1079
1080         if (trace)
1081            log.trace("Rollback transaction " + this);
1082         try
1083         {
1084            connection.spyXAResourceManager.endTx(currentTransactionId, true);
1085            connection.spyXAResourceManager.rollback(currentTransactionId);
1086         }
1087         catch (Throwable JavaDoc t)
1088         {
1089            SpyJMSException.rethrowAsJMSException("Could not rollback", t);
1090         }
1091         finally
1092         {
1093            unacknowledgedMessages.clear();
1094            try
1095            {
1096               currentTransactionId = connection.spyXAResourceManager.startTx();
1097               if (trace)
1098                  log.trace("Current transaction id: " + currentTransactionId + " " + this);
1099            }
1100            catch (Throwable JavaDoc ignore)
1101            {
1102               if (trace)
1103                  log.trace("Failed to start tx " + this, ignore);
1104            }
1105         }
1106      }
1107   }
1108
1109   /**
1110     * Remove a consumer
1111     *
1112     * @param who the consumer
1113     */

1114   private void removeConsumerInternal(SpyMessageConsumer who)
1115   {
1116      synchronized (consumers)
1117      {
1118         if (trace)
1119            log.trace("Remove consumer " + who);
1120
1121         consumers.remove(who);
1122      }
1123   }
1124
1125   /**
1126     * Remove a producer
1127     *
1128     * @param who the producer
1129     */

1130   private void removeProducerInternal(SpyMessageProducer who)
1131   {
1132      synchronized (producers)
1133      {
1134         if (trace)
1135            log.trace("Remove producer " + who);
1136
1137         producers.remove(who);
1138      }
1139   }
1140   
1141   /**
1142    * Check whether we are closed
1143    *
1144    * @throws IllegalStateException when the session is closed
1145    */

1146   private void checkClosed() throws IllegalStateException JavaDoc
1147   {
1148      if (closed.get())
1149         throw new IllegalStateException JavaDoc("The session is closed");
1150   }
1151}
1152
Popular Tags