KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > ActiveMQMessageConsumer


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 package org.apache.activemq;
19
20 import org.apache.activemq.command.*;
21 import org.apache.activemq.management.JMSConsumerStatsImpl;
22 import org.apache.activemq.management.StatsCapable;
23 import org.apache.activemq.management.StatsImpl;
24 import org.apache.activemq.selector.SelectorParser;
25 import org.apache.activemq.thread.Scheduler;
26 import org.apache.activemq.transaction.Synchronization;
27 import org.apache.activemq.util.Callback;
28 import org.apache.activemq.util.IntrospectionSupport;
29 import org.apache.activemq.util.JMSExceptionSupport;
30 import org.apache.commons.logging.Log;
31 import org.apache.commons.logging.LogFactory;
32
33 import javax.jms.IllegalStateException JavaDoc;
34 import javax.jms.*;
35 import javax.jms.Message JavaDoc;
36 import java.util.HashMap JavaDoc;
37 import java.util.Iterator JavaDoc;
38 import java.util.LinkedList JavaDoc;
39 import java.util.concurrent.ExecutorService JavaDoc;
40 import java.util.concurrent.Executors JavaDoc;
41 import java.util.concurrent.TimeUnit JavaDoc;
42 import java.util.concurrent.atomic.AtomicBoolean JavaDoc;
43
44 /**
45  * A client uses a <CODE>MessageConsumer</CODE> object to receive messages
46  * from a destination. A <CODE> MessageConsumer</CODE> object is created by
47  * passing a <CODE>Destination</CODE> object to a message-consumer creation
48  * method supplied by a session.
49  * <P>
50  * <CODE>MessageConsumer</CODE> is the parent interface for all message
51  * consumers.
52  * <P>
53  * A message consumer can be created with a message selector. A message selector
54  * allows the client to restrict the messages delivered to the message consumer
55  * to those that match the selector.
56  * <P>
57  * A client may either synchronously receive a message consumer's messages or
58  * have the consumer asynchronously deliver them as they arrive.
59  * <P>
60  * For synchronous receipt, a client can request the next message from a message
61  * consumer using one of its <CODE> receive</CODE> methods. There are several
62  * variations of <CODE>receive</CODE> that allow a client to poll or wait for
63  * the next message.
64  * <P>
65  * For asynchronous delivery, a client can register a <CODE>MessageListener</CODE>
66  * object with a message consumer. As messages arrive at the message consumer,
67  * it delivers them by calling the <CODE>MessageListener</CODE>'s<CODE>
68  * onMessage</CODE> method.
69  * <P>
70  * It is a client programming error for a <CODE>MessageListener</CODE> to
71  * throw an exception.
72  *
73  * @version $Revision: 1.22 $
74  * @see javax.jms.MessageConsumer
75  * @see javax.jms.QueueReceiver
76  * @see javax.jms.TopicSubscriber
77  * @see javax.jms.Session
78  */

79 public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsCapable, ActiveMQDispatcher {
80
81     private static final Log log = LogFactory.getLog(ActiveMQMessageConsumer.class);
82
83     protected final ActiveMQSession session;
84     protected final ConsumerInfo info;
85
86     // These are the messages waiting to be delivered to the client
87
private final MessageDispatchChannel unconsumedMessages = new MessageDispatchChannel();
88
89     // The are the messages that were delivered to the consumer but that have
90
// not been acknowledged. It's kept in reverse order since we
91
// Always walk list in reverse order. Only used when session is client ack.
92
private final LinkedList JavaDoc deliveredMessages = new LinkedList JavaDoc();
93     private int deliveredCounter = 0;
94     private int additionalWindowSize = 0;
95     private int rollbackCounter = 0;
96     private long redeliveryDelay = 0;
97     private int ackCounter = 0;
98     private int dispatchedCount = 0;
99     private MessageListener messageListener;
100     private JMSConsumerStatsImpl stats;
101
102     private final String JavaDoc selector;
103     private boolean synchronizationRegistered = false;
104     private AtomicBoolean JavaDoc started = new AtomicBoolean JavaDoc(false);
105
106     private MessageAvailableListener availableListener;
107
108     private RedeliveryPolicy redeliveryPolicy;
109     private boolean optimizeAcknowledge;
110     private AtomicBoolean JavaDoc deliveryingAcknowledgements = new AtomicBoolean JavaDoc();
111     private ExecutorService JavaDoc executorService = null;
112     private MessageTransformer transformer;
113     private boolean clearDispatchList;
114
115     /**
116      * Create a MessageConsumer
117      *
118      * @param session
119      * @param dest
120      * @param name
121      * @param selector
122      * @param prefetch
123      * @param maximumPendingMessageCount TODO
124      * @param noLocal
125      * @param browser
126      * @param dispatchAsync
127      * @throws JMSException
128      */

129     public ActiveMQMessageConsumer(ActiveMQSession session, ConsumerId consumerId, ActiveMQDestination dest,
130             String JavaDoc name, String JavaDoc selector, int prefetch, int maximumPendingMessageCount, boolean noLocal, boolean browser, boolean dispatchAsync)
131             throws JMSException {
132         if (dest == null) {
133             throw new InvalidDestinationException("Don't understand null destinations");
134         } else if (dest.getPhysicalName() == null) {
135                 throw new InvalidDestinationException("The destination object was not given a physical name.");
136         } else if (dest.isTemporary()) {
137             String JavaDoc physicalName = dest.getPhysicalName();
138
139             if (physicalName == null) {
140                 throw new IllegalArgumentException JavaDoc("Physical name of Destination should be valid: " + dest);
141             }
142
143             String JavaDoc connectionID = session.connection.getConnectionInfo().getConnectionId().getValue();
144
145             if (physicalName.indexOf(connectionID) < 0) {
146                 throw new InvalidDestinationException("Cannot use a Temporary destination from another Connection");
147             }
148
149             if (session.connection.isDeleted(dest)) {
150                 throw new InvalidDestinationException("Cannot use a Temporary destination that has been deleted");
151             }
152         }
153
154         this.session = session;
155         this.redeliveryPolicy = session.connection.getRedeliveryPolicy();
156         setTransformer(session.getTransformer());
157
158         this.info = new ConsumerInfo(consumerId);
159         this.info.setSubscriptionName(name);
160         this.info.setPrefetchSize(prefetch);
161         this.info.setCurrentPrefetchSize(prefetch);
162         this.info.setMaximumPendingMessageLimit(maximumPendingMessageCount);
163         this.info.setNoLocal(noLocal);
164         this.info.setDispatchAsync(dispatchAsync);
165         this.info.setRetroactive(this.session.connection.isUseRetroactiveConsumer());
166         this.info.setSelector(null);
167
168         // Allows the options on the destination to configure the consumerInfo
169
if (dest.getOptions() != null) {
170             HashMap JavaDoc options = new HashMap JavaDoc(dest.getOptions());
171             IntrospectionSupport.setProperties(this.info, options, "consumer.");
172         }
173
174         this.info.setDestination(dest);
175         this.info.setBrowser(browser);
176         if (selector != null && selector.trim().length() != 0) {
177             // Validate the selector
178
new SelectorParser().parse(selector);
179             this.info.setSelector(selector);
180             this.selector = selector;
181         } else if (info.getSelector() != null) {
182             // Validate the selector
183
new SelectorParser().parse(this.info.getSelector());
184             this.selector = this.info.getSelector();
185         } else {
186             this.selector = null;
187         }
188
189         this.stats = new JMSConsumerStatsImpl(session.getSessionStats(), dest);
190         this.optimizeAcknowledge=session.connection.isOptimizeAcknowledge()&&session.isAutoAcknowledge()
191                         &&!info.isBrowser();
192         this.info.setOptimizedAcknowledge(this.optimizeAcknowledge);
193         try {
194             this.session.addConsumer(this);
195             this.session.syncSendPacket(info);
196         } catch (JMSException e) {
197             this.session.removeConsumer(this);
198             throw e;
199         }
200         
201         if(session.connection.isStarted())
202             start();
203     }
204
205     public StatsImpl getStats() {
206         return stats;
207     }
208
209     public JMSConsumerStatsImpl getConsumerStats() {
210         return stats;
211     }
212
213     public RedeliveryPolicy getRedeliveryPolicy() {
214         return redeliveryPolicy;
215     }
216
217     /**
218      * Sets the redelivery policy used when messages are redelivered
219      */

220     public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
221         this.redeliveryPolicy = redeliveryPolicy;
222     }
223
224     public MessageTransformer getTransformer() {
225         return transformer;
226     }
227
228     /**
229      * Sets the transformer used to transform messages before they are sent on to the JMS bus
230      */

231     public void setTransformer(MessageTransformer transformer) {
232         this.transformer = transformer;
233     }
234
235
236     /**
237      * @return Returns the value.
238      */

239     protected ConsumerId getConsumerId() {
240         return info.getConsumerId();
241     }
242
243     /**
244      * @return the consumer name - used for durable consumers
245      */

246     protected String JavaDoc getConsumerName() {
247         return this.info.getSubscriptionName();
248     }
249
250     /**
251      * @return true if this consumer does not accept locally produced messages
252      */

253     protected boolean isNoLocal() {
254         return info.isNoLocal();
255     }
256
257     /**
258      * Retrieve is a browser
259      *
260      * @return true if a browser
261      */

262     protected boolean isBrowser() {
263         return info.isBrowser();
264     }
265
266     /**
267      * @return ActiveMQDestination
268      */

269     protected ActiveMQDestination getDestination() {
270         return info.getDestination();
271     }
272
273     /**
274      * @return Returns the prefetchNumber.
275      */

276     public int getPrefetchNumber() {
277         return info.getPrefetchSize();
278     }
279
280     /**
281      * @return true if this is a durable topic subscriber
282      */

283     public boolean isDurableSubscriber() {
284         return info.getSubscriptionName()!=null && info.getDestination().isTopic();
285     }
286
287     /**
288      * Gets this message consumer's message selector expression.
289      *
290      * @return this message consumer's message selector, or null if no message
291      * selector exists for the message consumer (that is, if the message
292      * selector was not set or was set to null or the empty string)
293      * @throws JMSException
294      * if the JMS provider fails to receive the next message due to
295      * some internal error.
296      */

297     public String JavaDoc getMessageSelector() throws JMSException {
298         checkClosed();
299         return selector;
300     }
301
302     /**
303      * Gets the message consumer's <CODE>MessageListener</CODE>.
304      *
305      * @return the listener for the message consumer, or null if no listener is
306      * set
307      * @throws JMSException
308      * if the JMS provider fails to get the message listener due to
309      * some internal error.
310      * @see javax.jms.MessageConsumer#setMessageListener(javax.jms.MessageListener)
311      */

312     public MessageListener getMessageListener() throws JMSException {
313         checkClosed();
314         return this.messageListener;
315     }
316
317     /**
318      * Sets the message consumer's <CODE>MessageListener</CODE>.
319      * <P>
320      * Setting the message listener to null is the equivalent of unsetting the
321      * message listener for the message consumer.
322      * <P>
323      * The effect of calling <CODE>MessageConsumer.setMessageListener</CODE>
324      * while messages are being consumed by an existing listener or the consumer
325      * is being used to consume messages synchronously is undefined.
326      *
327      * @param listener
328      * the listener to which the messages are to be delivered
329      * @throws JMSException
330      * if the JMS provider fails to receive the next message due to
331      * some internal error.
332      * @see javax.jms.MessageConsumer#getMessageListener
333      */

334     public void setMessageListener(MessageListener listener) throws JMSException {
335         checkClosed();
336         if (info.getPrefetchSize() == 0) {
337             throw new JMSException("Illegal prefetch size of zero. This setting is not supported for asynchronous consumers please set a value of at least 1");
338         }
339         this.messageListener = listener;
340         if (listener != null) {
341             boolean wasRunning = session.isRunning();
342             if (wasRunning)
343                 session.stop();
344
345             session.redispatch(unconsumedMessages);
346
347             if (wasRunning)
348                 session.start();
349
350         }
351     }
352
353     
354     public MessageAvailableListener getAvailableListener() {
355         return availableListener;
356     }
357
358     /**
359      * Sets the listener used to notify synchronous consumers that there is a message
360      * available so that the {@link MessageConsumer#receiveNoWait()} can be called.
361      */

362     public void setAvailableListener(MessageAvailableListener availableListener) {
363         this.availableListener = availableListener;
364     }
365
366     /**
367      * Used to get an enqueued message from the unconsumedMessages list. The
368      * amount of time this method blocks is based on the timeout value. - if
369      * timeout==-1 then it blocks until a message is received. - if timeout==0
370      * then it it tries to not block at all, it returns a message if it is
371      * available - if timeout>0 then it blocks up to timeout amount of time.
372      *
373      * Expired messages will consumed by this method.
374      *
375      * @throws JMSException
376      *
377      * @return null if we timeout or if the consumer is closed.
378      */

379     private MessageDispatch dequeue(long timeout) throws JMSException {
380         try {
381             long deadline = 0;
382             if (timeout > 0) {
383                 deadline = System.currentTimeMillis() + timeout;
384             }
385             while (true) {
386                 MessageDispatch md = unconsumedMessages.dequeue(timeout);
387                 if (md == null) {
388                     if (timeout > 0 && !unconsumedMessages.isClosed()) {
389                         timeout = Math.max(deadline - System.currentTimeMillis(), 0);
390                     } else {
391                         return null;
392                     }
393                 } else if ( md.getMessage()==null ) {
394                     return null;
395                 } else if (md.getMessage().isExpired()) {
396                     if (log.isDebugEnabled()) {
397                         log.debug("Received expired message: " + md);
398                     }
399                     beforeMessageIsConsumed(md);
400                     afterMessageIsConsumed(md, true);
401                     if (timeout > 0) {
402                         timeout = Math.max(deadline - System.currentTimeMillis(), 0);
403                     }
404                 } else {
405                     if (log.isDebugEnabled()) {
406                         log.debug("Received message: " + md);
407                     }
408                     return md;
409                 }
410             }
411         } catch (InterruptedException JavaDoc e) {
412             Thread.currentThread().interrupt();
413             throw JMSExceptionSupport.create(e);
414         }
415     }
416
417     /**
418      * Receives the next message produced for this message consumer.
419      * <P>
420      * This call blocks indefinitely until a message is produced or until this
421      * message consumer is closed.
422      * <P>
423      * If this <CODE>receive</CODE> is done within a transaction, the consumer
424      * retains the message until the transaction commits.
425      *
426      * @return the next message produced for this message consumer, or null if
427      * this message consumer is concurrently closed
428      */

429     public Message receive() throws JMSException {
430         checkClosed();
431         checkMessageListener();
432         
433         sendPullCommand(0);
434         MessageDispatch md = dequeue(-1);
435         if (md == null)
436             return null;
437
438         beforeMessageIsConsumed(md);
439         afterMessageIsConsumed(md, false);
440
441         return createActiveMQMessage(md);
442     }
443
444     /**
445      * @param md
446      * @return
447      */

448     private ActiveMQMessage createActiveMQMessage(final MessageDispatch md) throws JMSException {
449         ActiveMQMessage m = (ActiveMQMessage) md.getMessage().copy();
450         if (transformer != null) {
451             Message transformedMessage = transformer.consumerTransform(session, this, m);
452             if (transformedMessage != null) {
453                 m = ActiveMQMessageTransformation.transformMessage(transformedMessage, session.connection);
454             }
455         }
456         if (session.isClientAcknowledge()) {
457             m.setAcknowledgeCallback(new Callback() {
458                 public void execute() throws Exception JavaDoc {
459                     session.checkClosed();
460                     session.acknowledge();
461                 }
462             });
463         }
464         return m;
465     }
466
467     /**
468      * Receives the next message that arrives within the specified timeout
469      * interval.
470      * <P>
471      * This call blocks until a message arrives, the timeout expires, or this
472      * message consumer is closed. A <CODE>timeout</CODE> of zero never
473      * expires, and the call blocks indefinitely.
474      *
475      * @param timeout
476      * the timeout value (in milliseconds), a time out of zero never expires.
477      * @return the next message produced for this message consumer, or null if
478      * the timeout expires or this message consumer is concurrently
479      * closed
480      */

481     public Message receive(long timeout) throws JMSException {
482         checkClosed();
483         checkMessageListener();
484         if (timeout == 0) {
485             return this.receive();
486
487         }
488         
489         sendPullCommand(timeout);
490         while (timeout > 0) {
491             
492             MessageDispatch md;
493             if (info.getPrefetchSize() == 0) {
494                 md = dequeue(-1); // We let the broker let us know when we timeout.
495
} else {
496                 md = dequeue(timeout);
497             }
498
499             if (md == null)
500                 return null;
501
502             beforeMessageIsConsumed(md);
503             afterMessageIsConsumed(md, false);
504             return createActiveMQMessage(md);
505         }
506         return null;
507     }
508
509     /**
510      * Receives the next message if one is immediately available.
511      *
512      * @return the next message produced for this message consumer, or null if
513      * one is not available
514      * @throws JMSException
515      * if the JMS provider fails to receive the next message due to
516      * some internal error.
517      */

518     public Message receiveNoWait() throws JMSException {
519         checkClosed();
520         checkMessageListener();
521         sendPullCommand(-1);
522         
523         MessageDispatch md;
524         if (info.getPrefetchSize() == 0) {
525             md = dequeue(-1); // We let the broker let us know when we timeout.
526
} else {
527             md = dequeue(0);
528         }
529         
530         if (md == null)
531             return null;
532
533         beforeMessageIsConsumed(md);
534         afterMessageIsConsumed(md, false);
535         return createActiveMQMessage(md);
536     }
537
538     /**
539      * Closes the message consumer.
540      * <P>
541      * Since a provider may allocate some resources on behalf of a <CODE>
542      * MessageConsumer</CODE> outside the Java virtual machine, clients should
543      * close them when they are not needed. Relying on garbage collection to
544      * eventually reclaim these resources may not be timely enough.
545      * <P>
546      * This call blocks until a <CODE>receive</CODE> or message listener in
547      * progress has completed. A blocked message consumer <CODE>receive </CODE>
548      * call returns null when this message consumer is closed.
549      *
550      * @throws JMSException
551      * if the JMS provider fails to close the consumer due to some
552      * internal error.
553      */

554     public void close() throws JMSException {
555         if (!unconsumedMessages.isClosed()) {
556             dispose();
557             this.session.asyncSendPacket(info.createRemoveCommand());
558         }
559     }
560     
561     void clearMessagesInProgress(){
562         // we are called from inside the transport reconnection logic
563
// which involves us clearing all the connections' consumers
564
// dispatch lists and clearing them
565
// so rather than trying to grab a mutex (which could be already
566
// owned by the message listener calling the send) we will just set
567
// a flag so that the list can be cleared as soon as the
568
// dispatch thread is ready to flush the dispatch list
569
clearDispatchList= true;
570     }
571     
572     void deliverAcks(){
573         MessageAck ack=null;
574         if(deliveryingAcknowledgements.compareAndSet(false,true)){
575             if(this.optimizeAcknowledge){
576                 if(!deliveredMessages.isEmpty()){
577                     MessageDispatch md=(MessageDispatch) deliveredMessages.getFirst();
578                     ack=new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,deliveredMessages.size());
579                     deliveredMessages.clear();
580                     ackCounter=0;
581                 }
582             }
583             if(ack!=null){
584                 final MessageAck ackToSend=ack;
585                 if(executorService==null){
586                     executorService=Executors.newSingleThreadExecutor();
587                 }
588                 executorService.submit(new Runnable JavaDoc(){
589                     public void run(){
590                         try{
591                             session.asyncSendPacket(ackToSend);
592                         }catch(JMSException e){
593                             log.error("Failed to delivered acknowledgements",e);
594                         }finally{
595                             deliveryingAcknowledgements.set(false);
596                         }
597                     }
598                 });
599             }else{
600                 deliveryingAcknowledgements.set(false);
601             }
602         }
603     }
604
605     public void dispose() throws JMSException {
606         if (!unconsumedMessages.isClosed()) {
607             // Do we have any acks we need to send out before closing?
608
// Ack any delivered messages now. (session may still
609
// commit/rollback the acks).
610
deliverAcks();//only processes optimized acknowledgements
611
if (executorService!=null){
612                 executorService.shutdown();
613                 try {
614                     executorService.awaitTermination(60, TimeUnit.SECONDS);
615                 } catch (InterruptedException JavaDoc e) {
616                     Thread.currentThread().interrupt();
617                 }
618             }
619             if ((session.isTransacted() || session.isDupsOkAcknowledge())) {
620                 acknowledge();
621             }
622             deliveredMessages.clear();
623             unconsumedMessages.close();
624             this.session.removeConsumer(this);
625         }
626     }
627
628     /**
629      * @throws IllegalStateException
630      */

631     protected void checkClosed() throws IllegalStateException JavaDoc {
632         if (unconsumedMessages.isClosed()) {
633             throw new IllegalStateException JavaDoc("The Consumer is closed");
634         }
635     }
636     
637     /**
638      * If we have a zero prefetch specified then send a pull command to the broker to pull a message
639      * we are about to receive
640      *
641      */

642     protected void sendPullCommand(long timeout) throws JMSException {
643         if (info.getPrefetchSize() == 0 && unconsumedMessages.isEmpty()) {
644             MessagePull messagePull = new MessagePull();
645             messagePull.configure(info);
646             messagePull.setTimeout(timeout);
647             session.asyncSendPacket(messagePull);
648         }
649     }
650
651     protected void checkMessageListener() throws JMSException {
652         session.checkMessageListener();
653     }
654     
655     protected void setOptimizeAcknowledge(boolean value){
656         if (optimizeAcknowledge && !value){
657             deliverAcks();
658         }
659         optimizeAcknowledge=value;
660     }
661     
662     protected void setPrefetchSize(int prefetch){
663         deliverAcks();
664         this.info.setCurrentPrefetchSize(prefetch);
665     }
666
667     private void beforeMessageIsConsumed(MessageDispatch md) throws JMSException {
668         md.setDeliverySequenceId(session.getNextDeliveryId());
669         if (!session.isDupsOkAcknowledge()) {
670             deliveredMessages.addFirst(md);
671             if( session.isTransacted() ) {
672                 ackLater(md,MessageAck.DELIVERED_ACK_TYPE);
673             }
674         }
675     }
676
677     private void afterMessageIsConsumed(MessageDispatch md,boolean messageExpired) throws JMSException{
678         if(unconsumedMessages.isClosed())
679             return;
680         if(messageExpired){
681             ackLater(md,MessageAck.DELIVERED_ACK_TYPE);
682         }else{
683             stats.onMessage();
684             if( session.isTransacted() ) {
685             } else if(session.isAutoAcknowledge()) {
686                 if(!deliveredMessages.isEmpty()){
687                     if(optimizeAcknowledge){
688                         if(deliveryingAcknowledgements.compareAndSet(false,true)){
689                             ackCounter++;
690                             if(ackCounter>=(info.getCurrentPrefetchSize()*.65)){
691                                 MessageAck ack=new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,deliveredMessages.size());
692                                 session.asyncSendPacket(ack);
693                                 ackCounter=0;
694                                 deliveredMessages.clear();
695                             }
696                             deliveryingAcknowledgements.set(false);
697                         }
698                     }else{
699                         MessageAck ack=new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,deliveredMessages.size());
700                         session.asyncSendPacket(ack);
701                         deliveredMessages.clear();
702                     }
703                 }
704             } else if(session.isDupsOkAcknowledge()){
705                 ackLater(md,MessageAck.STANDARD_ACK_TYPE);
706             } else if(session.isClientAcknowledge()){
707                 ackLater(md,MessageAck.DELIVERED_ACK_TYPE);
708             } else{
709                 throw new IllegalStateException JavaDoc("Invalid session state.");
710             }
711         }
712     }
713
714     private void ackLater(MessageDispatch md, byte ackType) throws JMSException {
715
716         // Don't acknowledge now, but we may need to let the broker know the
717
// consumer got the message
718
// to expand the pre-fetch window
719
if (session.isTransacted()) {
720             session.doStartTransaction();
721             if (!synchronizationRegistered) {
722                 synchronizationRegistered = true;
723                 session.getTransactionContext().addSynchronization(new Synchronization() {
724                     public void beforeEnd() throws Exception JavaDoc {
725                         acknowledge();
726                         synchronizationRegistered = false;
727                     }
728
729                     public void afterCommit() throws Exception JavaDoc {
730                         commit();
731                         synchronizationRegistered = false;
732                     }
733
734                     public void afterRollback() throws Exception JavaDoc {
735                         rollback();
736                         synchronizationRegistered = false;
737                     }
738                 });
739             }
740         }
741
742         // The delivered message list is only needed for the recover method
743
// which is only used with client ack.
744
deliveredCounter++;
745         if ((0.5 * info.getPrefetchSize()) <= (deliveredCounter - additionalWindowSize)) {
746             MessageAck ack = new MessageAck(md, ackType, deliveredCounter);
747             ack.setTransactionId(session.getTransactionContext().getTransactionId());
748             session.asyncSendPacket(ack);
749             additionalWindowSize = deliveredCounter;
750
751             // When using DUPS ok, we do a real ack.
752
if (ackType == MessageAck.STANDARD_ACK_TYPE) {
753                 deliveredCounter = additionalWindowSize = 0;
754             }
755         }
756     }
757
758     /**
759      * Acknowledge all the messages that have been delivered to the client upto
760      * this point.
761      *
762      * @throws JMSException
763      */

764     public void acknowledge() throws JMSException {
765         if (deliveredMessages.isEmpty())
766             return;
767
768         // Acknowledge the last message.
769
MessageDispatch lastMd = (MessageDispatch) deliveredMessages.get(0);
770         MessageAck ack = new MessageAck(lastMd, MessageAck.STANDARD_ACK_TYPE, deliveredMessages.size());
771         if (session.isTransacted()) {
772             session.doStartTransaction();
773             ack.setTransactionId(session.getTransactionContext().getTransactionId());
774         }
775         session.asyncSendPacket(ack);
776
777         // Adjust the counters
778
deliveredCounter -= deliveredMessages.size();
779         additionalWindowSize = Math.max(0, additionalWindowSize - deliveredMessages.size());
780
781         if (!session.isTransacted()) {
782             deliveredMessages.clear();
783         }
784     }
785
786     public void commit() throws JMSException {
787         deliveredMessages.clear();
788         rollbackCounter = 0;
789         redeliveryDelay = 0;
790     }
791
792     public void rollback() throws JMSException{
793         synchronized(unconsumedMessages.getMutex()){
794             if(optimizeAcknowledge){
795                 // remove messages read but not acked at the broker yet through optimizeAcknowledge
796
for(int i=0;(i<deliveredMessages.size())&&(i<ackCounter);i++){
797                     deliveredMessages.removeLast();
798                 }
799             }
800             if(deliveredMessages.isEmpty())
801                 return;
802
803             // Only increase the redlivery delay after the first redelivery..
804
if( rollbackCounter > 0 )
805                 redeliveryDelay = redeliveryPolicy.getRedeliveryDelay(redeliveryDelay);
806
807             rollbackCounter++;
808             if(redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES
809                     && rollbackCounter>redeliveryPolicy.getMaximumRedeliveries()){
810                 // We need to NACK the messages so that they get sent to the
811
// DLQ.
812
// Acknowledge the last message.
813
MessageDispatch lastMd=(MessageDispatch) deliveredMessages.get(0);
814                 MessageAck ack=new MessageAck(lastMd,MessageAck.POSION_ACK_TYPE,deliveredMessages.size());
815                 session.asyncSendPacket(ack);
816                 // Adjust the window size.
817
additionalWindowSize=Math.max(0,additionalWindowSize-deliveredMessages.size());
818                 rollbackCounter=0;
819                 redeliveryDelay=0;
820             }else{
821                 // stop the delivery of messages.
822
unconsumedMessages.stop();
823                                 
824                 for(Iterator JavaDoc iter=deliveredMessages.iterator();iter.hasNext();){
825                     MessageDispatch md=(MessageDispatch) iter.next();
826                     md.getMessage().onMessageRolledBack();
827                     unconsumedMessages.enqueueFirst(md);
828                 }
829                                 
830                 if( redeliveryDelay > 0 ) {
831                     // Start up the delivery again a little later.
832
Scheduler.executeAfterDelay(new Runnable JavaDoc(){
833                         public void run(){
834                             try{
835                                 if(started.get())
836                                     start();
837                             }catch(JMSException e){
838                                 session.connection.onAsyncException(e);
839                             }
840                         }
841                     },redeliveryDelay);
842                 } else {
843                     start();
844                 }
845
846             }
847             deliveredCounter-=deliveredMessages.size();
848             deliveredMessages.clear();
849         }
850         if(messageListener!=null){
851             session.redispatch(unconsumedMessages);
852         }
853     }
854
855     public void dispatch(MessageDispatch md) {
856         MessageListener listener = this.messageListener;
857         try {
858             synchronized(unconsumedMessages.getMutex()){
859                 if (clearDispatchList) {
860                     // we are reconnecting so lets flush the in progress messages
861
clearDispatchList = false;
862                     unconsumedMessages.clear();
863                 }
864
865                 if (!unconsumedMessages.isClosed()) {
866                     if (listener != null && unconsumedMessages.isRunning() ) {
867                         ActiveMQMessage message = createActiveMQMessage(md);
868                         beforeMessageIsConsumed(md);
869                         try {
870                             listener.onMessage(message);
871                             afterMessageIsConsumed(md, false);
872                         } catch (RuntimeException JavaDoc e) {
873                             if ( session.isDupsOkAcknowledge() || session.isAutoAcknowledge() ) {
874                                 // Redeliver the message
875
} else {
876                                 // Transacted or Client ack: Deliver the next message.
877
afterMessageIsConsumed(md, false);
878                             }
879                             log.error("Exception while processing message: " + e, e);
880                         }
881                     } else {
882                         unconsumedMessages.enqueue(md);
883                         if (availableListener != null) {
884                             availableListener.onMessageAvailable(this);
885                         }
886                     }
887                 }
888             }
889             if (++dispatchedCount%1000==0) {
890                 dispatchedCount=0;
891             Thread.yield();
892             }
893         } catch (Exception JavaDoc e) {
894             session.connection.onAsyncException(e);
895         }
896     }
897
898     public int getMessageSize() {
899         return unconsumedMessages.size();
900     }
901
902     public void start() throws JMSException {
903         if (unconsumedMessages.isClosed()) {
904             return;
905         }
906         started.set(true);
907         unconsumedMessages.start();
908         session.executor.wakeup();
909     }
910
911     public void stop() {
912         started.set(false);
913         unconsumedMessages.stop();
914     }
915     
916     public String JavaDoc toString() {
917         return "ActiveMQMessageConsumer { value=" +info.getConsumerId()+", started=" +started.get()+" }";
918     }
919
920     /**
921      * Delivers a message to the message listener.
922      * @return
923      * @throws JMSException
924      */

925     public boolean iterate() {
926         MessageListener listener = this.messageListener;
927         if( listener!=null ) {
928             MessageDispatch md = unconsumedMessages.dequeueNoWait();
929             if( md!=null ) {
930                 try {
931                     ActiveMQMessage message = createActiveMQMessage(md);
932                     beforeMessageIsConsumed(md);
933                     listener.onMessage(message);
934                     afterMessageIsConsumed(md, false);
935                 } catch (JMSException e) {
936                     session.connection.onAsyncException(e);
937                 }
938                 return true;
939             }
940         }
941         return false;
942     }
943     
944     public boolean isInUse(ActiveMQTempDestination destination) {
945         return info.getDestination().equals(destination);
946     }
947
948 }
949
Popular Tags