KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jboss > mq > SpyMessageConsumer


1 /*
2  * JBoss, Home of Professional Open Source.
3  * Copyright 2006, Red Hat Middleware LLC, and individual contributors
4  * as indicated by the @author tags. See the copyright.txt file in the
5  * distribution for a full listing of individual contributors.
6  *
7  * This is free software; you can redistribute it and/or modify it
8  * under the terms of the GNU Lesser General Public License as
9  * published by the Free Software Foundation; either version 2.1 of
10  * the License, or (at your option) any later version.
11  *
12  * This software is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15  * Lesser General Public License for more details.
16  *
17  * You should have received a copy of the GNU Lesser General Public
18  * License along with this software; if not, write to the Free
19  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
20  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
21  */

22 package org.jboss.mq;
23
24 import java.util.LinkedList JavaDoc;
25
26 import javax.jms.Destination JavaDoc;
27 import javax.jms.IllegalStateException JavaDoc;
28 import javax.jms.InvalidSelectorException JavaDoc;
29 import javax.jms.JMSException JavaDoc;
30 import javax.jms.Message JavaDoc;
31 import javax.jms.MessageConsumer JavaDoc;
32 import javax.jms.MessageListener JavaDoc;
33 import javax.jms.Session JavaDoc;
34
35 import org.jboss.logging.Logger;
36 import org.jboss.util.UnreachableStatementException;
37
38 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
39
40 /**
41  * This class implements <tt>javax.jms.MessageConsumer</tt>.
42  *
43  * @author Norbert Lataille (Norbert.Lataille@m4x.org)
44  * @author Hiram Chirino (Cojonudo14@hotmail.com)
45  * @author David Maplesden (David.Maplesden@orion.co.nz)
46  * @author <a HREF="mailto:adrian@jboss.org">Adrian Brock</a>
47  * @version $Revision: 58404 $
48  */

49 public class SpyMessageConsumer implements MessageConsumer JavaDoc, SpyConsumer, Runnable JavaDoc
50 {
51    /** The log */
52    static Logger log = Logger.getLogger(SpyMessageConsumer.class);
53
54    /** Is trace enabled */
55    static boolean trace = log.isTraceEnabled();
56
57    /** Delivered once */
58    static final Integer JavaDoc ONCE = new Integer JavaDoc(1);
59    
60    /** Link to my session */
61    public SpySession session;
62    /** The subscription structure should be fill out by the descendent */
63    public Subscription subscription = new Subscription();
64    /** Are we closed ? */
65    private SynchronizedBoolean closed = new SynchronizedBoolean(false);
66    /** The state lock */
67    protected Object JavaDoc stateLock = new Object JavaDoc();
68    /** Are we receiving a message */
69    protected boolean receiving = false;
70    /** Are we waiting for a message */
71    protected boolean waitingForMessage = false;
72    /** Are we listening */
73    protected boolean listening = false;
74    /** The listener thread */
75    protected Thread JavaDoc listenerThread = null;
76    /** My message listener (null if none) */
77    MessageListener JavaDoc messageListener;
78    /** List of Pending messages (not yet delivered) */
79    LinkedList JavaDoc messages;
80    /** Is this a session consumer? */
81    boolean sessionConsumer;
82
/**
 * Create a new SpyMessageConsumer with an empty pending-message list and
 * no message listener.
 *
 * @param s the session
 * @param sessionConsumer true for a session consumer, false otherwise
 */
SpyMessageConsumer(SpySession s, boolean sessionConsumer)
{
   // Refresh the shared trace flag from the logger
   trace = log.isTraceEnabled();

   session = s;
   this.sessionConsumer = sessionConsumer;
   messageListener = null;
   messages = new LinkedList();

   if (trace)
      log.trace("New message consumer " + this);
}
101
/**
 * Create a new SpyMessageConsumer and fill in its subscription.
 *
 * @param s the session
 * @param sessionConsumer true for a session consumer, false otherwise
 * @param destination the destination
 * @param selector the selector
 * @param noLocal true for noLocal, false otherwise
 * @throws InvalidSelectorException when the selector is not valid
 */
SpyMessageConsumer(SpySession s, boolean sessionConsumer, SpyDestination destination, String selector, boolean noLocal) throws InvalidSelectorException
{
   // Refresh the shared trace flag from the logger
   trace = log.isTraceEnabled();

   session = s;
   this.sessionConsumer = sessionConsumer;
   subscription.destination = destination;
   subscription.messageSelector = selector;
   subscription.noLocal = noLocal;

   // If the selector is set, try to build it, throws an
   // InvalidSelectorException if it is not valid.
   if (subscription.messageSelector != null)
      subscription.getSelector();

   messageListener = null;
   messages = new LinkedList();

   if (trace)
      log.trace("New message consumer " + this);
}
133
134    /**
135     * Get the subscription
136     *
137     * @return the subscription
138     */

139    public Subscription getSubscription()
140    {
141       return subscription;
142    }
143
144    /**
145     * Add a message
146     *
147     * @param message the message to add
148     * @throws JMSException for any error
149     */

150    public void addMessage(SpyMessage message) throws JMSException JavaDoc
151    {
152       if (isClosed())
153       {
154          if (trace)
155             log.trace("WARNING: NACK issued. message=" + message.header.jmsMessageID +
156                       " The message consumer was closed. " + this);
157          session.connection.send(message.getAcknowledgementRequest(false));
158          return;
159       }
160
161       //Add a message to the queue
162

163       // Consider removing this test (subscription.accepts). I don't think it
164
// will ever fail
165
// because the test is also done by the server before message is even
166
// sent.
167
if (subscription.accepts(message.header))
168       {
169          if (sessionConsumer)
170             sessionConsumerProcessMessage(message);
171          else
172          {
173             synchronized (messages)
174             {
175                if (waitingForMessage)
176                {
177                   if (trace)
178                      log.trace("Adding message=" + message.header.jmsMessageID + " " + this);
179                   messages.addLast(message);
180                   messages.notifyAll();
181                }
182                else
183                {
184                   //unwanted message (due to consumer receive timing out) Nack
185
// it.
186
if (trace)
187                      log.trace("WARNING: NACK issued. message=" + message.header.jmsMessageID +
188                            " The message consumer was not waiting for a message. " + this);
189                   session.connection.send(message.getAcknowledgementRequest(false));
190                }
191             }
192          }
193       }
194       else
195       {
196          if (trace)
197             log.trace("WARNING: NACK issued. message=" + message.header.jmsMessageID +
198                   " The subscription did not accept the message. " + this);
199          session.connection.send(message.getAcknowledgementRequest(false));
200       }
201    }
202
203    /**
204      * Restarts the processing of the messages in case of a recovery
205      */

206    public void restartProcessing()
207    {
208       synchronized (messages)
209       {
210          if (trace)
211             log.trace("Restarting processing " + this);
212          messages.notifyAll();
213       }
214    }
215
216    public void setMessageListener(MessageListener JavaDoc listener) throws JMSException JavaDoc
217    {
218       checkClosed();
219
220       synchronized (stateLock)
221       {
222          if (receiving)
223             throw new JMSException JavaDoc("Another thread is already in receive.");
224
225          if (trace)
226             log.trace("Set message listener=" + listener + " old listener=" + messageListener + " " + this);
227
228          boolean oldListening = listening;
229          listening = (listener != null);
230          messageListener = listener;
231
232          if (!sessionConsumer && listening && !oldListening)
233          {
234             //Start listener thread (if one is not already running)
235
if (listenerThread == null)
236             {
237                listenerThread = new Thread JavaDoc(this, "MessageListenerThread - " + subscription.destination.getName());
238                listenerThread.start();
239             }
240          }
241       }
242    }
243
244    public String JavaDoc getMessageSelector() throws JMSException JavaDoc
245    {
246       checkClosed();
247       return subscription.messageSelector;
248    }
249
250    public MessageListener JavaDoc getMessageListener() throws JMSException JavaDoc
251    {
252       checkClosed();
253       return messageListener;
254    }
255
256    public Message JavaDoc receive() throws JMSException JavaDoc
257    {
258       checkClosed();
259       synchronized (stateLock)
260       {
261          if (receiving)
262             throw new JMSException JavaDoc("Another thread is already in receive.");
263          if (listening)
264             throw new JMSException JavaDoc("A message listener is already registered");
265          receiving = true;
266          
267          if (trace)
268             log.trace("receive() " + this);
269       }
270
271       try
272       {
273          synchronized (messages)
274          {
275             //see if we have any undelivered messages before we go to the JMS
276
//server to look.
277
Message JavaDoc message = getMessage();
278             if (message != null)
279             {
280                if (trace)
281                   log.trace("receive() message in list " + message.getJMSMessageID() + " " + this);
282                return message;
283             }
284             
285             // Loop through expired messages
286
while (true)
287             {
288                SpyMessage msg = session.connection.receive(subscription, 0);
289                if (msg != null)
290                {
291                   Message JavaDoc mes = preProcessMessage(msg);
292                   if (mes != null)
293                   {
294                      if (trace)
295                         log.trace("receive() message from server " + mes.getJMSMessageID() + " " + this);
296                      return mes;
297                   }
298                }
299                else
300                   break;
301             }
302
303             if (trace)
304                log.trace("No message in receive(), waiting " + this);
305             
306             try
307             {
308                waitingForMessage = true;
309                while (true)
310                {
311                   if (isClosed())
312                   {
313                      if (trace)
314                         log.trace("Consumer closed in receive() " + this);
315                      return null;
316                   }
317                   Message JavaDoc mes = getMessage();
318                   if (mes != null)
319                   {
320                      if (trace)
321                         log.trace("receive() message from list after wait " + this);
322                      return mes;
323                   }
324                   messages.wait();
325                }
326             }
327             catch (Throwable JavaDoc t)
328             {
329                SpyJMSException.rethrowAsJMSException("Receive interupted", t);
330                throw new UnreachableStatementException();
331             }
332             finally
333             {
334                waitingForMessage = false;
335             }
336          }
337       }
338       finally
339       {
340          synchronized (stateLock)
341          {
342             receiving = false;
343          }
344       }
345    }
346
347    public Message JavaDoc receive(long timeOut) throws JMSException JavaDoc
348    {
349       if (timeOut == 0)
350       {
351          if (trace)
352             log.trace("Timeout is zero in receive(long) using receive() " + this);
353          return receive();
354       }
355
356       checkClosed();
357       synchronized (stateLock)
358       {
359          if (receiving)
360             throw new JMSException JavaDoc("Another thread is already in receive.");
361          if (listening)
362             throw new JMSException JavaDoc("A message listener is already registered");
363          receiving = true;
364          
365          if (trace)
366             log.trace("receive(long) " + this);
367       }
368
369       long endTime = System.currentTimeMillis() + timeOut;
370       
371       if (trace)
372          log.trace("receive(long) endTime=" + endTime + " " + this);
373       
374       try
375       {
376          synchronized (messages)
377          {
378             //see if we have any undelivered messages before we go to the JMS
379
//server to look.
380
Message JavaDoc message = getMessage();
381             if (message != null)
382             {
383                if (trace)
384                   log.trace("receive(long) message in list " + message.getJMSMessageID() + " " + this);
385                return message;
386             }
387             // Loop through expired messages
388
while (true)
389             {
390                SpyMessage msg = session.connection.receive(subscription, timeOut);
391                if (msg != null)
392                {
393                   Message JavaDoc mes = preProcessMessage(msg);
394                   if (mes != null)
395                   {
396                      if (trace)
397                         log.trace("receive(long) message from server " + mes.getJMSMessageID() + " " + this);
398                      return mes;
399                   }
400                }
401                else
402                   break;
403             }
404
405             if (trace)
406                log.trace("No message in receive(), waiting " + this);
407             
408             try
409             {
410                waitingForMessage = true;
411                while (true)
412                {
413                   if (isClosed())
414                   {
415                      if (trace)
416                         log.trace("Consumer closed in receive(long) " + this);
417                      return null;
418                   }
419
420                   Message JavaDoc mes = getMessage();
421                   if (mes != null)
422                   {
423                      if (trace)
424                         log.trace("receive(long) message from list after wait " + this);
425                      return mes;
426                   }
427
428                   long att = endTime - System.currentTimeMillis();
429                   if (att <= 0)
430                   {
431                      if (trace)
432                         log.trace("receive(long) timed out endTime=" + endTime + " " + this);
433                      return null;
434                   }
435
436                   messages.wait(att);
437                }
438             }
439             catch (Throwable JavaDoc t)
440             {
441                SpyJMSException.rethrowAsJMSException("Receive interupted", t);
442                throw new UnreachableStatementException();
443             }
444             finally
445             {
446                waitingForMessage = false;
447             }
448          }
449       }
450       finally
451       {
452          synchronized (stateLock)
453          {
454             receiving = false;
455          }
456       }
457    }
458
459    public Message JavaDoc receiveNoWait() throws JMSException JavaDoc
460    {
461       checkClosed();
462       synchronized (stateLock)
463       {
464          if (receiving)
465             throw new JMSException JavaDoc("Another thread is already in receive.");
466          if (listening)
467             throw new JMSException JavaDoc("A message listener is already registered");
468          receiving = true;
469          
470          if (trace)
471             log.trace("receiveNoWait() " + this);
472       }
473
474       try
475       {
476          //see if we have any undelivered messages before we go to the JMS
477
//server to look.
478
synchronized (messages)
479          {
480             Message JavaDoc mes = getMessage();
481             if (mes != null)
482             {
483                if (trace)
484                   log.trace("receiveNoWait() message in list " + mes.getJMSMessageID() + " " + this);
485                return mes;
486             }
487          }
488          // Loop through expired messages
489
while (true)
490          {
491             SpyMessage msg = session.connection.receive(subscription, -1);
492             if (msg != null)
493             {
494                Message JavaDoc mes = preProcessMessage(msg);
495                if (mes != null)
496                {
497                   if (trace)
498                      log.trace("receiveNoWait() message from server " + mes.getJMSMessageID() + " " + this);
499                   return mes;
500                }
501             }
502             else
503             {
504                if (trace)
505                   log.trace("receiveNoWait() no message " + this);
506                return null;
507             }
508          }
509       }
510       finally
511       {
512          synchronized (stateLock)
513          {
514             receiving = false;
515          }
516       }
517    }
518
519    public void close() throws JMSException JavaDoc
520    {
521       synchronized (messages)
522       {
523          if (closed.set(true))
524             return;
525
526          if (trace)
527             log.trace("Message consumer closing. " + this);
528          messages.notifyAll();
529       }
530       
531       // Notification to break out of delivery lock loop
532
session.interruptDeliveryLockWaiters();
533
534       if (listenerThread != null && !Thread.currentThread().equals(listenerThread))
535       {
536          try
537          {
538             if (trace)
539                log.trace("Joining listener thread. " + this);
540             listenerThread.join();
541          }
542          catch (InterruptedException JavaDoc e)
543          {
544          }
545       }
546
547       if (!sessionConsumer)
548       {
549          session.removeConsumer(this);
550       }
551
552       if (trace)
553          log.trace("Closed. " + this);
554    }
555
556    public void run()
557    {
558       SpyMessage mes = null;
559       try
560       {
561          outer : while (true)
562          {
563             //get Message
564
while (mes == null)
565             {
566                synchronized (messages)
567                {
568                   if (isClosed())
569                   {
570                      waitingForMessage = false;
571                      if (trace)
572                         log.trace("Consumer closed in run() " + this);
573                      break outer;
574                   }
575                   if (messages.isEmpty())
576                      mes = session.connection.receive(subscription, 0);
577                   if (mes == null)
578                   {
579                      waitingForMessage = true;
580                      if (trace)
581                         log.trace("waiting in run() " + this);
582                      while ((messages.isEmpty() && isClosed() == false) || (!session.running))
583                      {
584                         try
585                         {
586                            messages.wait();
587                         }
588                         catch (InterruptedException JavaDoc e)
589                         {
590                            log.trace("Ignored interruption waiting for messages");
591                         }
592                      }
593                      if (isClosed())
594                      {
595                         waitingForMessage = false;
596                         if (trace)
597                            log.trace("Consumer closed while waiting in run() " + this);
598                         break outer;
599                      }
600                      mes = (SpyMessage) messages.removeFirst();
601                      waitingForMessage = false;
602                   }
603                   else
604                   {
605                      if (trace)
606                         log.trace("run() message from server mes=" + mes.getJMSMessageID() + " " + this);
607                   }
608                }
609                mes.session = session;
610             }
611
612             MessageListener JavaDoc thisListener;
613             synchronized (stateLock)
614             {
615                if (!isListening())
616                {
617                   //send NACK cause we have closed listener
618
if (mes != null)
619                   {
620                      if (trace)
621                         log.trace("run() nacking not listening message mes=" + mes.getJMSMessageID() + " " + this);
622                      session.connection.send(mes.getAcknowledgementRequest(false));
623                   }
624                   //this thread is about to die, so we will need a new one if
625
// a new listener is added
626
listenerThread = null;
627                   mes = null;
628                   break;
629                }
630                thisListener = messageListener;
631             }
632             Message JavaDoc message = mes;
633             if (mes instanceof SpyEncapsulatedMessage)
634                message = ((SpyEncapsulatedMessage) mes).getMessage();
635
636             // Try to obtain the session delivery lock
637
// This avoids concurrent delivery to message listeners in the same session as per spec
638
boolean gotDeliveryLock = false;
639             while (gotDeliveryLock == false)
640             {
641                gotDeliveryLock = session.tryDeliveryLock();
642                // We didn't get the lock, check whether we are closing
643
if (gotDeliveryLock == false)
644                {
645                   synchronized (messages)
646                   {
647                      if (isClosed())
648                         break;
649                   }
650                }
651             }
652             if (gotDeliveryLock == false)
653             {
654                if (trace)
655                   log.trace("run() nacking didn't get delivery lock mes=" + mes.getJMSMessageID() + " " + this);
656                session.connection.send(mes.getAcknowledgementRequest(false));
657             }
658             else
659             {
660                //Handle runtime exceptions. These are handled as per the spec if
661
// you assume
662
//the number of times erroneous messages are redelivered in
663
// auto_acknowledge mode
664
//is 0. :)
665
try
666                {
667                   if (session.transacted)
668                   {
669                      // REVIEW: for an XASession without a transaction this will ack the message
670
// before it has been processed. Plain message listeners
671
// are not supported in a j2ee environment, but what if somebody is trying
672
// to be clever?
673
if (trace)
674                         log.trace("run() acknowledging message in tx mes=" + mes.getJMSMessageID() + " " + this);
675                      session.connection.spyXAResourceManager.ackMessage(session.getCurrentTransactionId(), mes);
676                   }
677
678                   try
679                   {
680                      prepareDelivery((SpyMessage) message);
681                      session.addUnacknowlegedMessage((SpyMessage) message);
682                      thisListener.onMessage(message);
683                   }
684                   catch (Throwable JavaDoc t)
685                   {
686                      log.warn("Message listener " + thisListener + " threw a throwable.", t);
687                   }
688                }
689                finally
690                {
691                   session.releaseDeliveryLock();
692                }
693
694                if (!session.transacted
695                      && (session.acknowledgeMode == Session.AUTO_ACKNOWLEDGE || session.acknowledgeMode == Session.DUPS_OK_ACKNOWLEDGE))
696                {
697                   // Only acknowledge the message if the message wasn't recovered
698
boolean recovered;
699                   synchronized (messages)
700                   {
701                      recovered = messages.contains(message);
702                   }
703                   if (recovered == false)
704                      mes.doAcknowledge();
705                }
706                mes = null;
707             }
708          }
709       }
710       catch (Throwable JavaDoc t)
711       {
712          log.warn("Message consumer closing due to error in listening thread.", t);
713          try
714          {
715             close();
716          }
717          catch (Throwable JavaDoc ignore)
718          {
719          }
720          session.asynchFailure("Message consumer closing due to error in listening thread.", t);
721       }
722    }
723
724    public String JavaDoc toString()
725    {
726       StringBuffer JavaDoc buffer = new StringBuffer JavaDoc(100);
727       buffer.append("SpyMessageConsumer@").append(System.identityHashCode(this));
728       buffer.append("[sub=").append(subscription);
729       if (isClosed())
730          buffer.append(" CLOSED");
731       buffer.append(" listening=").append(listening);
732       buffer.append(" receiving=").append(receiving);
733       buffer.append(" sessionConsumer=").append(sessionConsumer);
734       buffer.append(" waitingForMessage=").append(waitingForMessage);
735       buffer.append(" messages=").append(messages.size());
736       if (listenerThread != null)
737          buffer.append(" thread=").append(listenerThread);
738       if (messageListener != null)
739          buffer.append(" listener=").append(messageListener);
740       buffer.append(" session=").append(session);
741       buffer.append(']');
742       return buffer.toString();
743    }
744
745    Message JavaDoc getMessage()
746    {
747       synchronized (messages)
748       {
749          if (trace)
750             log.trace("Getting message from list " + this);
751          while (true)
752          {
753             try
754             {
755                if (messages.size() == 0)
756                   return null;
757
758                SpyMessage mes = (SpyMessage) messages.removeFirst();
759
760                Message JavaDoc rc = preProcessMessage(mes);
761                // could happen if the message has expired.
762
if (rc == null)
763                   continue;
764
765                return rc;
766             }
767             catch (Throwable JavaDoc t)
768             {
769                log.error("Ignoring error", t);
770             }
771          }
772       }
773    }
774
775    Message JavaDoc preProcessMessage(SpyMessage message) throws JMSException JavaDoc
776    {
777       message.session = session;
778       session.addUnacknowlegedMessage(message);
779
780       prepareDelivery(message);
781       
782       // Should we try to ack before the message is processed?
783
if (!isListening())
784       {
785          if (session.transacted)
786          {
787             if (trace)
788                log.trace("preprocess() acking message in tx message=" + message.getJMSMessageID() + " " + this);
789             session.connection.spyXAResourceManager.ackMessage(session.getCurrentTransactionId(), message);
790          }
791          else if (session.acknowledgeMode == Session.AUTO_ACKNOWLEDGE
792                || session.acknowledgeMode == Session.DUPS_OK_ACKNOWLEDGE)
793          {
794             message.doAcknowledge();
795          }
796
797          if (message instanceof SpyEncapsulatedMessage)
798          {
799             return ((SpyEncapsulatedMessage) message).getMessage();
800          }
801          return message;
802       }
803       else
804       {
805          return message;
806       }
807    }
808
809    /**
810     * Prepare the message for delivery
811     *
812     * @param message the message
813     * @throws JMSException for any error
814     */

815    void prepareDelivery(SpyMessage message) throws JMSException JavaDoc
816    {
817       Integer JavaDoc delivery = ONCE;
818       Integer JavaDoc redelivery = (Integer JavaDoc) message.header.jmsProperties.get(SpyMessage.PROPERTY_REDELIVERY_COUNT);
819       if (redelivery != null)
820       {
821          int value = redelivery.intValue();
822          if (value != 0)
823             delivery = new Integer JavaDoc(value + 1);
824       }
825       message.header.jmsProperties.put(SpyMessage.PROPERTY_DELIVERY_COUNT, delivery);
826    }
827    
828    protected Destination JavaDoc getDestination() throws JMSException JavaDoc
829    {
830       checkClosed();
831       return subscription.destination;
832    }
833
834    protected boolean getNoLocal() throws JMSException JavaDoc
835    {
836       checkClosed();
837       return subscription.noLocal;
838    }
839
840    /**
841      * Are we listening
842      *
843      * @return true when listening, false otherwise
844      */

845    protected boolean isListening()
846    {
847       synchronized (stateLock)
848       {
849          return listening;
850       }
851    }
852
853    protected void sessionConsumerProcessMessage(SpyMessage message) throws JMSException JavaDoc
854    {
855       message.session = session;
856       //simply pass on to messageListener (if there is one)
857
MessageListener JavaDoc thisListener;
858       synchronized (stateLock)
859       {
860          thisListener = messageListener;
861       }
862
863       // Add the message to XAResource manager before we call onMessages since
864
// the
865
// resource may get elisted IN the onMessage method.
866
// This gives onMessage a chance to roll the message back.
867
Object JavaDoc anonymousTXID = null;
868       if (session.transacted)
869       {
870          // Only happens with XA transactions
871
if (session.getCurrentTransactionId() == null)
872          {
873             anonymousTXID = session.connection.spyXAResourceManager.startTx();
874             session.setCurrentTransactionId(anonymousTXID);
875          }
876          if (trace)
877             log.trace("consumer() acking message in tx message=" + message.getJMSMessageID() + " " + this);
878          session.connection.spyXAResourceManager.ackMessage(session.getCurrentTransactionId(), message);
879       }
880
881       if (thisListener != null)
882       {
883          Message JavaDoc mes = message;
884          if (message instanceof SpyEncapsulatedMessage)
885          {
886             mes = ((SpyEncapsulatedMessage) message).getMessage();
887          }
888          session.addUnacknowlegedMessage((SpyMessage) mes);
889          if (trace)
890             log.trace("consumer() before onMessage=" + message.getJMSMessageID() + " " + this);
891          thisListener.onMessage(mes);
892          if (trace)
893             log.trace("consumer() after onMessage=" + message.getJMSMessageID() + " " + this);
894       }
895
896       if (session.transacted)
897       {
898          // If we started an anonymous tx
899
if (anonymousTXID != null)
900          {
901             if (session.getCurrentTransactionId() == anonymousTXID)
902             {
903                // We never got enlisted, so just commit the transaction
904
try
905                {
906                   if (trace)
907                      log.trace("XASession was not enlisted - Committing work using anonymous xid: " + anonymousTXID);
908                   session.connection.spyXAResourceManager.endTx(anonymousTXID, true);
909                   session.connection.spyXAResourceManager.commit(anonymousTXID, true);
910                }
911                catch (Throwable JavaDoc t)
912                {
913                   log.error("Could not commit", t);
914                }
915                finally
916                {
917                   session.unsetCurrentTransactionId(anonymousTXID);
918                }
919             }
920          }
921       }
922       else
923       {
924          // Should we Auto-ack the message since the message has now been
925
// processesed
926
if (session.acknowledgeMode == Session.AUTO_ACKNOWLEDGE
927                || session.acknowledgeMode == Session.DUPS_OK_ACKNOWLEDGE)
928          {
929             message.doAcknowledge();
930          }
931       }
932    }
933    
934    /**
935     * Check whether we are closed
936     *
937     * @return true when closed
938     */

939    private boolean isClosed()
940    {
941       return closed.get();
942    }
943    
944    /**
945     * Check whether we are closed
946     *
947     * @throws IllegalStateException when the session is closed
948     */

949    private void checkClosed() throws IllegalStateException JavaDoc
950    {
951       if (closed.get())
952          throw new IllegalStateException JavaDoc("The consumer is closed");
953    }
954 }
955
Popular Tags