|                                                                                                              1
 14
 15  package org.apache.activemq.broker.region;
 16
 17  import java.io.IOException
  ; 18  import java.util.Iterator
  ; 19  import java.util.LinkedList
  ; 20  import java.util.concurrent.atomic.AtomicBoolean
  ; 21  import javax.jms.InvalidSelectorException
  ; 22  import javax.jms.JMSException
  ; 23  import org.apache.activemq.broker.Broker;
 24  import org.apache.activemq.broker.ConnectionContext;
 25  import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
 26  import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
 27  import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
 28  import org.apache.activemq.command.ActiveMQDestination;
 29  import org.apache.activemq.command.ConsumerControl;
 30  import org.apache.activemq.command.ConsumerInfo;
 31  import org.apache.activemq.command.Message;
 32  import org.apache.activemq.command.MessageAck;
 33  import org.apache.activemq.command.MessageDispatch;
 34  import org.apache.activemq.command.MessageDispatchNotification;
 35  import org.apache.activemq.command.MessageId;
 36  import org.apache.activemq.command.MessagePull;
 37  import org.apache.activemq.command.Response;
 38  import org.apache.activemq.thread.Scheduler;
 39  import org.apache.activemq.transaction.Synchronization;
 40  import org.apache.activemq.util.BrokerSupport;
 41  import org.apache.commons.logging.Log;
 42  import org.apache.commons.logging.LogFactory;
 43
 44
 49  abstract public class PrefetchSubscription extends AbstractSubscription{
 50
 51      static private final Log log=LogFactory.getLog(PrefetchSubscription.class);
 52      protected PendingMessageCursor pending;
 53      final protected LinkedList
  dispatched=new LinkedList  (); 54      protected int prefetchExtension=0;
 55      protected long enqueueCounter;
 56      protected long dispatchCounter;
 57      protected long dequeueCounter;
 58      private AtomicBoolean
  dispatching=new AtomicBoolean  (); 59
 60      public PrefetchSubscription(Broker broker,ConnectionContext context,ConsumerInfo info,PendingMessageCursor cursor)
 61              throws InvalidSelectorException
  { 62          super(broker,context,info);
 63          pending=cursor;
 64      }
 65
 66      public PrefetchSubscription(Broker broker,ConnectionContext context,ConsumerInfo info)
 67              throws InvalidSelectorException
  { 68          this(broker,context,info,new VMPendingMessageCursor());
 69      }
 70
 71
 74      public synchronized Response pullMessage(ConnectionContext context,MessagePull pull) throws Exception
  { 75                                  if(getPrefetchSize()==0&&!isSlaveBroker()){
 79              prefetchExtension++;
 80              final long dispatchCounterBeforePull=dispatchCounter;
 81              dispatchMatched();
 82                          if(dispatchCounterBeforePull==dispatchCounter){
 84                                  if(pull.getTimeout()==-1){
 86                                          add(QueueMessageReference.NULL_MESSAGE);
 88                      dispatchMatched();
 89                  }
 90                  if(pull.getTimeout()>0){
 91                      Scheduler.executeAfterDelay(new Runnable
  (){ 92
 93                          public void run(){
 94                              pullTimeout(dispatchCounterBeforePull);
 95                          }
 96                      },pull.getTimeout());
 97                  }
 98              }
 99          }
 100         return null;
 101     }
 102
 103
 107     private synchronized void pullTimeout(long dispatchCounterBeforePull){
 108         if(dispatchCounterBeforePull==dispatchCounter){
 109             try{
 110                 add(QueueMessageReference.NULL_MESSAGE);
 111                 dispatchMatched();
 112             }catch(Exception
  e){ 113                 context.getConnection().serviceException(e);
 114             }
 115         }
 116     }
 117
 118     public synchronized void add(MessageReference node) throws Exception
  { 119         boolean pendingEmpty=false;
 120         pendingEmpty=pending.isEmpty();
 121         enqueueCounter++;
 122
 123         if(!isFull()&&pendingEmpty&&!broker.isSlaveBroker()){
 124             dispatch(node);
 125         }else{
 126             optimizePrefetch();
 127             synchronized(pending){
 128                 if(pending.isEmpty()&&log.isDebugEnabled()){
 129                     log.debug("Prefetch limit.");
 130                 }
 131                 pending.addMessageLast(node);
 132             }
 133         }
 134     }
 135
 136     public synchronized void processMessageDispatchNotification(MessageDispatchNotification mdn) throws Exception
  { 137         try{
 138             pending.reset();
 139             while(pending.hasNext()){
 140                 MessageReference node=pending.next();
 141                 if(node.getMessageId().equals(mdn.getMessageId())){
 142                     pending.remove();
 143                     createMessageDispatch(node,node.getMessage());
 144                     dispatched.addLast(node);
 145                     return;
 146                 }
 147             }
 148         }finally{
 149             pending.release();
 150         }
 151         throw new JMSException
  ("Slave broker out of sync with master: Dispatched message ("+mdn.getMessageId() 152                 +") was not in the pending list");
 153     }
 154
 155     public synchronized void acknowledge(final ConnectionContext context,final MessageAck ack) throws Exception
  { 156                 boolean callDispatchMatched=false;
 158         if(ack.isStandardAck()){
 159                         int index=0;
 161             boolean inAckRange=false;
 162             for(Iterator
  iter=dispatched.iterator();iter.hasNext();){ 163                 final MessageReference node=(MessageReference)iter.next();
 164                 MessageId messageId=node.getMessageId();
 165                 if(ack.getFirstMessageId()==null||ack.getFirstMessageId().equals(messageId)){
 166                     inAckRange=true;
 167                 }
 168                 if(inAckRange){
 169                                         if(!context.isInTransaction()){
 171                         dequeueCounter++;
 172                         node.getRegionDestination().getDestinationStatistics().getDequeues().increment();
 173                         iter.remove();
 174                     }else{
 175                                                 context.getTransaction().addSynchronization(new Synchronization(){
 177
 178                             public void afterCommit() throws Exception
  { 179                                 synchronized(PrefetchSubscription.this){
 180                                     dequeueCounter++;
 181                                     dispatched.remove(node);
 182                                     node.getRegionDestination().getDestinationStatistics().getDequeues().increment();
 183                                     prefetchExtension--;
 184                                 }
 185                             }
 186
 187                             public void afterRollback() throws Exception
  { 188                                 super.afterRollback();
 189                             }
 190                         });
 191                     }
 192                     index++;
 193                     acknowledge(context,ack,node);
 194                     if(ack.getLastMessageId().equals(messageId)){
 195                         if(context.isInTransaction()){
 196                                                         if(getPrefetchSize()!=0){
 198                                 prefetchExtension=Math.max(prefetchExtension,index+1);
 199                             }
 200                         }else{
 201                             prefetchExtension=Math.max(0,prefetchExtension-(index+1));
 202                         }
 203                         callDispatchMatched=true;
 204                         break;
 205                     }
 206                 }
 207             }
 208                         if(!callDispatchMatched){
 210                 log.info("Could not correlate acknowledgment with dispatched message: "+ack);
 211             }
 212         }else if(ack.isDeliveredAck()){
 213                                     int index=0;
 216             for(Iterator
  iter=dispatched.iterator();iter.hasNext();index++){ 217                 final MessageReference node=(MessageReference)iter.next();
 218                 if(ack.getLastMessageId().equals(node.getMessageId())){
 219                     prefetchExtension=Math.max(prefetchExtension,index+1);
 220                     callDispatchMatched=true;
 221                     break;
 222                 }
 223             }
 224             if(!callDispatchMatched){
 225                 throw new JMSException
  ("Could not correlate acknowledgment with dispatched message: "+ack); 226             }
 227         }else if(ack.isPoisonAck()){
 228                                     if(ack.isInTransaction())
 231                 throw new JMSException
  ("Poison ack cannot be transacted: "+ack); 232                         int index=0;
 234             boolean inAckRange=false;
 235             for(Iterator
  iter=dispatched.iterator();iter.hasNext();){ 236                 final MessageReference node=(MessageReference)iter.next();
 237                 MessageId messageId=node.getMessageId();
 238                 if(ack.getFirstMessageId()==null||ack.getFirstMessageId().equals(messageId)){
 239                     inAckRange=true;
 240                 }
 241                 if(inAckRange){
 242                     sendToDLQ(context,node);
 243                     node.getRegionDestination().getDestinationStatistics().getDequeues().increment();
 244                     iter.remove();
 245                     dequeueCounter++;
 246                     index++;
 247                     acknowledge(context,ack,node);
 248                     if(ack.getLastMessageId().equals(messageId)){
 249                         prefetchExtension=Math.max(0,prefetchExtension-(index+1));
 250                         callDispatchMatched=true;
 251                         break;
 252                     }
 253                 }
 254             }
 255             if(!callDispatchMatched){
 256                 throw new JMSException
  ("Could not correlate acknowledgment with dispatched message: "+ack); 257             }
 258         }
 259         if(callDispatchMatched){
 260             dispatchMatched();
 261         }else{
 262             if(isSlaveBroker()){
 263                 throw new JMSException
  ("Slave broker out of sync with master: Acknowledgment ("+ack 264                         +") was not in the dispatch list: "+dispatched);
 265             }else{
 266                 log.debug("Acknowledgment out of sync (Normally occurs when failover connection reconnects): "+ack);
 267             }
 268         }
 269     }
 270
 271
 277     protected void sendToDLQ(final ConnectionContext context,final MessageReference node) throws IOException
  ,Exception  { 278                 Message message=node.getMessage();
 280         if(message!=null){
 281                                                 DeadLetterStrategy deadLetterStrategy=node.getRegionDestination().getDeadLetterStrategy();
 285             ActiveMQDestination deadLetterDestination=deadLetterStrategy
 286                     .getDeadLetterQueueFor(message.getDestination());
 287             BrokerSupport.resend(context,message,deadLetterDestination);
 288         }
 289     }
 290
 291
 296     protected synchronized boolean isFull(){
 297         return isSlaveBroker()||dispatched.size()-prefetchExtension>=info.getPrefetchSize();
 298     }
 299
 300
 303     public boolean isLowWaterMark(){
 304         return (dispatched.size()-prefetchExtension)<=(info.getPrefetchSize()*.4);
 305     }
 306
 307
 310     public boolean isHighWaterMark(){
 311         return (dispatched.size()-prefetchExtension)>=(info.getPrefetchSize()*.9);
 312     }
 313
 314     public synchronized int countBeforeFull(){
 315         return info.getPrefetchSize()+prefetchExtension-dispatched.size();
 316     }
 317
 318     public int getPendingQueueSize(){
 319         synchronized(pending){
 320             return pending.size();
 321         }
 322     }
 323
 324     public int getDispatchedQueueSize(){
 325         synchronized(dispatched){
 326             return dispatched.size();
 327         }
 328     }
 329
 330     synchronized public long getDequeueCounter(){
 331         return dequeueCounter;
 332     }
 333
 334     synchronized public long getDispatchedCounter(){
 335         return dispatchCounter;
 336     }
 337
 338     synchronized public long getEnqueueCounter(){
 339         return enqueueCounter;
 340     }
 341
 342     public boolean isRecoveryRequired(){
 343         return pending.isRecoveryRequired();
 344     }
 345
 346
 347     public PendingMessageCursor getPending(){
 348         return this.pending;
 349     }
 350
 351     public void setPending(PendingMessageCursor pending){
 352         this.pending=pending;
 353     }
 354
 355
 356
 357
 361     public void optimizePrefetch(){
 362
 370     }
 371
 372     public synchronized void add(ConnectionContext context,Destination destination) throws Exception
  { 373         super.add(context,destination);
 374         pending.add(context,destination);
 375     }
 376
 377     public synchronized void remove(ConnectionContext context,Destination destination) throws Exception
  { 378         super.remove(context,destination);
 379         pending.remove(context,destination);
 380     }
 381
 382     protected synchronized void dispatchMatched() throws IOException
  { 383         if(!broker.isSlaveBroker()&&dispatching.compareAndSet(false,true)){
 384             try{
 385                 try{
 386                     int numberToDispatch=countBeforeFull();
 387                     if(numberToDispatch>0){
 388                         pending.setMaxBatchSize(numberToDispatch);
 389                         int count=0;
 390                         pending.reset();
 391                         while(pending.hasNext()&&!isFull()&&count<numberToDispatch){
 392                             MessageReference node=pending.next();
 393                             if(node==null)
 394                                 break;
 395                             if(canDispatch(node)){
 396                                 pending.remove();
 397                                                                                                 if(node!=QueueMessageReference.NULL_MESSAGE&&node.isExpired()){
 400                                     continue;                                 }
 402                                 dispatch(node);
 403                                 count++;
 404                             }
 405                         }
 406                     }
 407                 }finally{
 408                     pending.release();
 409                 }
 410             }finally{
 411                 dispatching.set(false);
 412             }
 413         }
 414     }
 415
 416     protected boolean dispatch(final MessageReference node) throws IOException
  { 417         final Message message=node.getMessage();
 418         if(message==null){
 419             return false;
 420         }
 421                 if(canDispatch(node)&&!isSlaveBroker()){
 423             MessageDispatch md=createMessageDispatch(node,message);
 424                         if(node!=QueueMessageReference.NULL_MESSAGE){
 426                 dispatchCounter++;
 427                 dispatched.addLast(node);
 428             }else{
 429                 prefetchExtension=Math.max(0,prefetchExtension-1);
 430             }
 431             if(info.isDispatchAsync()){
 432                 md.setTransmitCallback(new Runnable
  (){ 433
 434                     public void run(){
 435                                                                         onDispatch(node,message);
 438                     }
 439                 });
 440                 context.getConnection().dispatchAsync(md);
 441             }else{
 442                 context.getConnection().dispatchSync(md);
 443                 onDispatch(node,message);
 444             }
 445                         return true;
 447         }else{
 448             return false;
 449         }
 450     }
 451
 452     protected void onDispatch(final MessageReference node,final Message message){
 453         if(node.getRegionDestination()!=null){
 454             if(node!=QueueMessageReference.NULL_MESSAGE){
 455                 node.getRegionDestination().getDestinationStatistics().getDispatched().increment();
 456             }
 457             try{
 458                 dispatchMatched();
 459             }catch(IOException
  e){ 460                 context.getConnection().serviceExceptionAsync(e);
 461             }
 462         }
 463     }
 464
 465
 470     public void updateConsumerPrefetch(int newPrefetch){
 471         if(context!=null&&context.getConnection()!=null&&context.getConnection().isManageable()){
 472             ConsumerControl cc=new ConsumerControl();
 473             cc.setConsumerId(info.getConsumerId());
 474             cc.setPrefetch(newPrefetch);
 475             context.getConnection().dispatchAsync(cc);
 476         }
 477     }
 478
 479
 484     protected MessageDispatch createMessageDispatch(MessageReference node,Message message){
 485         if(node==QueueMessageReference.NULL_MESSAGE){
 486             MessageDispatch md=new MessageDispatch();
 487             md.setMessage(null);
 488             md.setConsumerId(info.getConsumerId());
 489             md.setDestination(null);
 490             return md;
 491         }else{
 492             MessageDispatch md=new MessageDispatch();
 493             md.setConsumerId(info.getConsumerId());
 494             md.setDestination(node.getRegionDestination().getActiveMQDestination());
 495             md.setMessage(message);
 496             md.setRedeliveryCounter(node.getRedeliveryCounter());
 497             return md;
 498         }
 499     }
 500
 501
 509     abstract protected boolean canDispatch(MessageReference node) throws IOException
  ; 510
 511
 516     protected void acknowledge(ConnectionContext context,final MessageAck ack,final MessageReference node)
 517             throws IOException
  { 518     }
 519
 520
 521
 522 }
 523
                                                                                                                                                                                                             |                                                                       
 
 
 
 
 
                                                                                   Popular Tags                                                                                                                                                                                              |