KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > broker > region > PrefetchSubscription


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
4  * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
5  * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
6  * License. You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
11  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
12  * specific language governing permissions and limitations under the License.
13  */

14
15 package org.apache.activemq.broker.region;
16
17 import java.io.IOException JavaDoc;
18 import java.util.Iterator JavaDoc;
19 import java.util.LinkedList JavaDoc;
20 import java.util.concurrent.atomic.AtomicBoolean JavaDoc;
21 import javax.jms.InvalidSelectorException JavaDoc;
22 import javax.jms.JMSException JavaDoc;
23 import org.apache.activemq.broker.Broker;
24 import org.apache.activemq.broker.ConnectionContext;
25 import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
26 import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
27 import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
28 import org.apache.activemq.command.ActiveMQDestination;
29 import org.apache.activemq.command.ConsumerControl;
30 import org.apache.activemq.command.ConsumerInfo;
31 import org.apache.activemq.command.Message;
32 import org.apache.activemq.command.MessageAck;
33 import org.apache.activemq.command.MessageDispatch;
34 import org.apache.activemq.command.MessageDispatchNotification;
35 import org.apache.activemq.command.MessageId;
36 import org.apache.activemq.command.MessagePull;
37 import org.apache.activemq.command.Response;
38 import org.apache.activemq.thread.Scheduler;
39 import org.apache.activemq.transaction.Synchronization;
40 import org.apache.activemq.util.BrokerSupport;
41 import org.apache.commons.logging.Log;
42 import org.apache.commons.logging.LogFactory;
43
44 /**
45  * A subscription that honors the pre-fetch option of the ConsumerInfo.
46  *
47  * @version $Revision: 1.15 $
48  */

49 abstract public class PrefetchSubscription extends AbstractSubscription{
50
51     static private final Log log=LogFactory.getLog(PrefetchSubscription.class);
52     protected PendingMessageCursor pending;
53     final protected LinkedList JavaDoc dispatched=new LinkedList JavaDoc();
54     protected int prefetchExtension=0;
55     protected long enqueueCounter;
56     protected long dispatchCounter;
57     protected long dequeueCounter;
58     private AtomicBoolean JavaDoc dispatching=new AtomicBoolean JavaDoc();
59
60     public PrefetchSubscription(Broker broker,ConnectionContext context,ConsumerInfo info,PendingMessageCursor cursor)
61             throws InvalidSelectorException JavaDoc{
62         super(broker,context,info);
63         pending=cursor;
64     }
65
66     public PrefetchSubscription(Broker broker,ConnectionContext context,ConsumerInfo info)
67             throws InvalidSelectorException JavaDoc{
68         this(broker,context,info,new VMPendingMessageCursor());
69     }
70
71     /**
72      * Allows a message to be pulled on demand by a client
73      */

74     public synchronized Response pullMessage(ConnectionContext context,MessagePull pull) throws Exception JavaDoc{
75         // The slave should not deliver pull messages. TODO: when the slave becomes a master,
76
// He should send a NULL message to all the consumers to 'wake them up' in case
77
// they were waiting for a message.
78
if(getPrefetchSize()==0&&!isSlaveBroker()){
79             prefetchExtension++;
80             final long dispatchCounterBeforePull=dispatchCounter;
81             dispatchMatched();
82             // If there was nothing dispatched.. we may need to setup a timeout.
83
if(dispatchCounterBeforePull==dispatchCounter){
84                 // imediate timeout used by receiveNoWait()
85
if(pull.getTimeout()==-1){
86                     // Send a NULL message.
87
add(QueueMessageReference.NULL_MESSAGE);
88                     dispatchMatched();
89                 }
90                 if(pull.getTimeout()>0){
91                     Scheduler.executeAfterDelay(new Runnable JavaDoc(){
92
93                         public void run(){
94                             pullTimeout(dispatchCounterBeforePull);
95                         }
96                     },pull.getTimeout());
97                 }
98             }
99         }
100         return null;
101     }
102
103     /**
104      * Occurs when a pull times out. If nothing has been dispatched since the timeout was setup, then send the NULL
105      * message.
106      */

107     private synchronized void pullTimeout(long dispatchCounterBeforePull){
108         if(dispatchCounterBeforePull==dispatchCounter){
109             try{
110                 add(QueueMessageReference.NULL_MESSAGE);
111                 dispatchMatched();
112             }catch(Exception JavaDoc e){
113                 context.getConnection().serviceException(e);
114             }
115         }
116     }
117
118     public synchronized void add(MessageReference node) throws Exception JavaDoc{
119         boolean pendingEmpty=false;
120         pendingEmpty=pending.isEmpty();
121         enqueueCounter++;
122        
123         if(!isFull()&&pendingEmpty&&!broker.isSlaveBroker()){
124             dispatch(node);
125         }else{
126             optimizePrefetch();
127             synchronized(pending){
128                 if(pending.isEmpty()&&log.isDebugEnabled()){
129                     log.debug("Prefetch limit.");
130                 }
131                 pending.addMessageLast(node);
132             }
133         }
134     }
135
136     public synchronized void processMessageDispatchNotification(MessageDispatchNotification mdn) throws Exception JavaDoc{
137         try{
138             pending.reset();
139             while(pending.hasNext()){
140                 MessageReference node=pending.next();
141                 if(node.getMessageId().equals(mdn.getMessageId())){
142                     pending.remove();
143                     createMessageDispatch(node,node.getMessage());
144                     dispatched.addLast(node);
145                     return;
146                 }
147             }
148         }finally{
149             pending.release();
150         }
151         throw new JMSException JavaDoc("Slave broker out of sync with master: Dispatched message ("+mdn.getMessageId()
152                 +") was not in the pending list");
153     }
154
155     public synchronized void acknowledge(final ConnectionContext context,final MessageAck ack) throws Exception JavaDoc{
156         // Handle the standard acknowledgment case.
157
boolean callDispatchMatched=false;
158         if(ack.isStandardAck()){
159             // Acknowledge all dispatched messages up till the message id of the acknowledgment.
160
int index=0;
161             boolean inAckRange=false;
162             for(Iterator JavaDoc iter=dispatched.iterator();iter.hasNext();){
163                 final MessageReference node=(MessageReference)iter.next();
164                 MessageId messageId=node.getMessageId();
165                 if(ack.getFirstMessageId()==null||ack.getFirstMessageId().equals(messageId)){
166                     inAckRange=true;
167                 }
168                 if(inAckRange){
169                     // Don't remove the nodes until we are committed.
170
if(!context.isInTransaction()){
171                         dequeueCounter++;
172                         node.getRegionDestination().getDestinationStatistics().getDequeues().increment();
173                         iter.remove();
174                     }else{
175                         // setup a Synchronization to remove nodes from the dispatched list.
176
context.getTransaction().addSynchronization(new Synchronization(){
177
178                             public void afterCommit() throws Exception JavaDoc{
179                                 synchronized(PrefetchSubscription.this){
180                                     dequeueCounter++;
181                                     dispatched.remove(node);
182                                     node.getRegionDestination().getDestinationStatistics().getDequeues().increment();
183                                     prefetchExtension--;
184                                 }
185                             }
186
187                             public void afterRollback() throws Exception JavaDoc{
188                                 super.afterRollback();
189                             }
190                         });
191                     }
192                     index++;
193                     acknowledge(context,ack,node);
194                     if(ack.getLastMessageId().equals(messageId)){
195                         if(context.isInTransaction()){
196                             // extend prefetch window only if not a pulling consumer
197
if(getPrefetchSize()!=0){
198                                 prefetchExtension=Math.max(prefetchExtension,index+1);
199                             }
200                         }else{
201                             prefetchExtension=Math.max(0,prefetchExtension-(index+1));
202                         }
203                         callDispatchMatched=true;
204                         break;
205                     }
206                 }
207             }
208             // this only happens after a reconnect - get an ack which is not valid
209
if(!callDispatchMatched){
210                 log.info("Could not correlate acknowledgment with dispatched message: "+ack);
211             }
212         }else if(ack.isDeliveredAck()){
213             // Message was delivered but not acknowledged: update pre-fetch counters.
214
// Acknowledge all dispatched messages up till the message id of the acknowledgment.
215
int index=0;
216             for(Iterator JavaDoc iter=dispatched.iterator();iter.hasNext();index++){
217                 final MessageReference node=(MessageReference)iter.next();
218                 if(ack.getLastMessageId().equals(node.getMessageId())){
219                     prefetchExtension=Math.max(prefetchExtension,index+1);
220                     callDispatchMatched=true;
221                     break;
222                 }
223             }
224             if(!callDispatchMatched){
225                 throw new JMSException JavaDoc("Could not correlate acknowledgment with dispatched message: "+ack);
226             }
227         }else if(ack.isPoisonAck()){
228             // TODO: what if the message is already in a DLQ???
229
// Handle the poison ACK case: we need to send the message to a DLQ
230
if(ack.isInTransaction())
231                 throw new JMSException JavaDoc("Poison ack cannot be transacted: "+ack);
232             // Acknowledge all dispatched messages up till the message id of the acknowledgment.
233
int index=0;
234             boolean inAckRange=false;
235             for(Iterator JavaDoc iter=dispatched.iterator();iter.hasNext();){
236                 final MessageReference node=(MessageReference)iter.next();
237                 MessageId messageId=node.getMessageId();
238                 if(ack.getFirstMessageId()==null||ack.getFirstMessageId().equals(messageId)){
239                     inAckRange=true;
240                 }
241                 if(inAckRange){
242                     sendToDLQ(context,node);
243                     node.getRegionDestination().getDestinationStatistics().getDequeues().increment();
244                     iter.remove();
245                     dequeueCounter++;
246                     index++;
247                     acknowledge(context,ack,node);
248                     if(ack.getLastMessageId().equals(messageId)){
249                         prefetchExtension=Math.max(0,prefetchExtension-(index+1));
250                         callDispatchMatched=true;
251                         break;
252                     }
253                 }
254             }
255             if(!callDispatchMatched){
256                 throw new JMSException JavaDoc("Could not correlate acknowledgment with dispatched message: "+ack);
257             }
258         }
259         if(callDispatchMatched){
260             dispatchMatched();
261         }else{
262             if(isSlaveBroker()){
263                 throw new JMSException JavaDoc("Slave broker out of sync with master: Acknowledgment ("+ack
264                         +") was not in the dispatch list: "+dispatched);
265             }else{
266                 log.debug("Acknowledgment out of sync (Normally occurs when failover connection reconnects): "+ack);
267             }
268         }
269     }
270
271     /**
272      * @param context
273      * @param node
274      * @throws IOException
275      * @throws Exception
276      */

277     protected void sendToDLQ(final ConnectionContext context,final MessageReference node) throws IOException JavaDoc,Exception JavaDoc{
278         // Send the message to the DLQ
279
Message message=node.getMessage();
280         if(message!=null){
281             // The original destination and transaction id do not get filled when the message is first
282
// sent,
283
// it is only populated if the message is routed to another destination like the DLQ
284
DeadLetterStrategy deadLetterStrategy=node.getRegionDestination().getDeadLetterStrategy();
285             ActiveMQDestination deadLetterDestination=deadLetterStrategy
286                     .getDeadLetterQueueFor(message.getDestination());
287             BrokerSupport.resend(context,message,deadLetterDestination);
288         }
289     }
290
291     /**
292      * Used to determine if the broker can dispatch to the consumer.
293      *
294      * @return
295      */

296     protected synchronized boolean isFull(){
297         return isSlaveBroker()||dispatched.size()-prefetchExtension>=info.getPrefetchSize();
298     }
299
300     /**
301      * @return true when 60% or more room is left for dispatching messages
302      */

303     public boolean isLowWaterMark(){
304         return (dispatched.size()-prefetchExtension)<=(info.getPrefetchSize()*.4);
305     }
306
307     /**
308      * @return true when 10% or less room is left for dispatching messages
309      */

310     public boolean isHighWaterMark(){
311         return (dispatched.size()-prefetchExtension)>=(info.getPrefetchSize()*.9);
312     }
313
314     public synchronized int countBeforeFull(){
315         return info.getPrefetchSize()+prefetchExtension-dispatched.size();
316     }
317
318     public int getPendingQueueSize(){
319         synchronized(pending){
320             return pending.size();
321         }
322     }
323
324     public int getDispatchedQueueSize(){
325         synchronized(dispatched){
326             return dispatched.size();
327         }
328     }
329
330     synchronized public long getDequeueCounter(){
331         return dequeueCounter;
332     }
333
334     synchronized public long getDispatchedCounter(){
335         return dispatchCounter;
336     }
337
338     synchronized public long getEnqueueCounter(){
339         return enqueueCounter;
340     }
341
342     public boolean isRecoveryRequired(){
343         return pending.isRecoveryRequired();
344     }
345     
346    
347     public PendingMessageCursor getPending(){
348         return this.pending;
349     }
350
351     public void setPending(PendingMessageCursor pending){
352         this.pending=pending;
353     }
354     
355    
356
357     /**
358      * optimize message consumer prefetch if the consumer supports it
359      *
360      */

361     public void optimizePrefetch(){
362         /*
363          * if(info!=null&&info.isOptimizedAcknowledge()&&context!=null&&context.getConnection()!=null
364          * &&context.getConnection().isManageable()){ if(info.getCurrentPrefetchSize()!=info.getPrefetchSize() &&
365          * isLowWaterMark()){ info.setCurrentPrefetchSize(info.getPrefetchSize());
366          * updateConsumerPrefetch(info.getPrefetchSize()); }else
367          * if(info.getCurrentPrefetchSize()==info.getPrefetchSize() && isHighWaterMark()){ // want to purge any
368          * outstanding acks held by the consumer info.setCurrentPrefetchSize(1); updateConsumerPrefetch(1); } }
369          */

370     }
371
372     public synchronized void add(ConnectionContext context,Destination destination) throws Exception JavaDoc{
373         super.add(context,destination);
374         pending.add(context,destination);
375     }
376
377     public synchronized void remove(ConnectionContext context,Destination destination) throws Exception JavaDoc{
378         super.remove(context,destination);
379         pending.remove(context,destination);
380     }
381
382     protected synchronized void dispatchMatched() throws IOException JavaDoc{
383         if(!broker.isSlaveBroker()&&dispatching.compareAndSet(false,true)){
384             try{
385                 try{
386                     int numberToDispatch=countBeforeFull();
387                     if(numberToDispatch>0){
388                         pending.setMaxBatchSize(numberToDispatch);
389                         int count=0;
390                         pending.reset();
391                         while(pending.hasNext()&&!isFull()&&count<numberToDispatch){
392                             MessageReference node=pending.next();
393                             if(node==null)
394                                 break;
395                             if(canDispatch(node)){
396                                 pending.remove();
397                                 // Message may have been sitting in the pending list a while
398
// waiting for the consumer to ak the message.
399
if(node!=QueueMessageReference.NULL_MESSAGE&&node.isExpired()){
400                                     continue; // just drop it.
401
}
402                                 dispatch(node);
403                                 count++;
404                             }
405                         }
406                     }
407                 }finally{
408                     pending.release();
409                 }
410             }finally{
411                 dispatching.set(false);
412             }
413         }
414     }
415
416     protected boolean dispatch(final MessageReference node) throws IOException JavaDoc{
417         final Message message=node.getMessage();
418         if(message==null){
419             return false;
420         }
421         // Make sure we can dispatch a message.
422
if(canDispatch(node)&&!isSlaveBroker()){
423             MessageDispatch md=createMessageDispatch(node,message);
424             // NULL messages don't count... they don't get Acked.
425
if(node!=QueueMessageReference.NULL_MESSAGE){
426                 dispatchCounter++;
427                 dispatched.addLast(node);
428             }else{
429                 prefetchExtension=Math.max(0,prefetchExtension-1);
430             }
431             if(info.isDispatchAsync()){
432                 md.setTransmitCallback(new Runnable JavaDoc(){
433
434                     public void run(){
435                         // Since the message gets queued up in async dispatch, we don't want to
436
// decrease the reference count until it gets put on the wire.
437
onDispatch(node,message);
438                     }
439                 });
440                 context.getConnection().dispatchAsync(md);
441             }else{
442                 context.getConnection().dispatchSync(md);
443                 onDispatch(node,message);
444             }
445             //System.err.println(broker.getBrokerName() + " " + this + " (" + enqueueCounter + ", " + dispatchCounter +") " + node);
446
return true;
447         }else{
448             return false;
449         }
450     }
451
452     protected void onDispatch(final MessageReference node,final Message message){
453         if(node.getRegionDestination()!=null){
454             if(node!=QueueMessageReference.NULL_MESSAGE){
455                 node.getRegionDestination().getDestinationStatistics().getDispatched().increment();
456             }
457             try{
458                 dispatchMatched();
459             }catch(IOException JavaDoc e){
460                 context.getConnection().serviceExceptionAsync(e);
461             }
462         }
463     }
464
465     /**
466      * inform the MessageConsumer on the client to change it's prefetch
467      *
468      * @param newPrefetch
469      */

470     public void updateConsumerPrefetch(int newPrefetch){
471         if(context!=null&&context.getConnection()!=null&&context.getConnection().isManageable()){
472             ConsumerControl cc=new ConsumerControl();
473             cc.setConsumerId(info.getConsumerId());
474             cc.setPrefetch(newPrefetch);
475             context.getConnection().dispatchAsync(cc);
476         }
477     }
478
479     /**
480      * @param node
481      * @param message
482      * @return MessageDispatch
483      */

484     protected MessageDispatch createMessageDispatch(MessageReference node,Message message){
485         if(node==QueueMessageReference.NULL_MESSAGE){
486             MessageDispatch md=new MessageDispatch();
487             md.setMessage(null);
488             md.setConsumerId(info.getConsumerId());
489             md.setDestination(null);
490             return md;
491         }else{
492             MessageDispatch md=new MessageDispatch();
493             md.setConsumerId(info.getConsumerId());
494             md.setDestination(node.getRegionDestination().getActiveMQDestination());
495             md.setMessage(message);
496             md.setRedeliveryCounter(node.getRedeliveryCounter());
497             return md;
498         }
499     }
500
501     /**
502      * Use when a matched message is about to be dispatched to the client.
503      *
504      * @param node
505      * @return false if the message should not be dispatched to the client (another sub may have already dispatched it
506      * for example).
507      * @throws IOException
508      */

509     abstract protected boolean canDispatch(MessageReference node) throws IOException JavaDoc;
510
511     /**
512      * Used during acknowledgment to remove the message.
513      *
514      * @throws IOException
515      */

516     protected void acknowledge(ConnectionContext context,final MessageAck ack,final MessageReference node)
517             throws IOException JavaDoc{
518     }
519
520     
521     
522 }
523
Popular Tags