1 14 15 package org.apache.activemq.broker.region; 16 17 import java.io.IOException ; 18 import java.util.Iterator ; 19 import java.util.LinkedList ; 20 import java.util.concurrent.atomic.AtomicBoolean ; 21 import javax.jms.InvalidSelectorException ; 22 import javax.jms.JMSException ; 23 import org.apache.activemq.broker.Broker; 24 import org.apache.activemq.broker.ConnectionContext; 25 import org.apache.activemq.broker.region.cursors.PendingMessageCursor; 26 import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor; 27 import org.apache.activemq.broker.region.policy.DeadLetterStrategy; 28 import org.apache.activemq.command.ActiveMQDestination; 29 import org.apache.activemq.command.ConsumerControl; 30 import org.apache.activemq.command.ConsumerInfo; 31 import org.apache.activemq.command.Message; 32 import org.apache.activemq.command.MessageAck; 33 import org.apache.activemq.command.MessageDispatch; 34 import org.apache.activemq.command.MessageDispatchNotification; 35 import org.apache.activemq.command.MessageId; 36 import org.apache.activemq.command.MessagePull; 37 import org.apache.activemq.command.Response; 38 import org.apache.activemq.thread.Scheduler; 39 import org.apache.activemq.transaction.Synchronization; 40 import org.apache.activemq.util.BrokerSupport; 41 import org.apache.commons.logging.Log; 42 import org.apache.commons.logging.LogFactory; 43 44 49 abstract public class PrefetchSubscription extends AbstractSubscription{ 50 51 static private final Log log=LogFactory.getLog(PrefetchSubscription.class); 52 protected PendingMessageCursor pending; 53 final protected LinkedList dispatched=new LinkedList (); 54 protected int prefetchExtension=0; 55 protected long enqueueCounter; 56 protected long dispatchCounter; 57 protected long dequeueCounter; 58 private AtomicBoolean dispatching=new AtomicBoolean (); 59 60 public PrefetchSubscription(Broker broker,ConnectionContext context,ConsumerInfo info,PendingMessageCursor cursor) 61 throws InvalidSelectorException { 62 super(broker,context,info); 63 pending=cursor; 64 } 65 66 public PrefetchSubscription(Broker broker,ConnectionContext context,ConsumerInfo info) 67 throws InvalidSelectorException { 68 this(broker,context,info,new VMPendingMessageCursor()); 69 } 70 71 74 public synchronized Response pullMessage(ConnectionContext context,MessagePull pull) throws Exception { 75 if(getPrefetchSize()==0&&!isSlaveBroker()){ 79 prefetchExtension++; 80 final long dispatchCounterBeforePull=dispatchCounter; 81 dispatchMatched(); 82 if(dispatchCounterBeforePull==dispatchCounter){ 84 if(pull.getTimeout()==-1){ 86 add(QueueMessageReference.NULL_MESSAGE); 88 dispatchMatched(); 89 } 90 if(pull.getTimeout()>0){ 91 Scheduler.executeAfterDelay(new Runnable (){ 92 93 public void run(){ 94 pullTimeout(dispatchCounterBeforePull); 95 } 96 },pull.getTimeout()); 97 } 98 } 99 } 100 return null; 101 } 102 103 107 private synchronized void pullTimeout(long dispatchCounterBeforePull){ 108 if(dispatchCounterBeforePull==dispatchCounter){ 109 try{ 110 add(QueueMessageReference.NULL_MESSAGE); 111 dispatchMatched(); 112 }catch(Exception e){ 113 context.getConnection().serviceException(e); 114 } 115 } 116 } 117 118 public synchronized void add(MessageReference node) throws Exception { 119 boolean pendingEmpty=false; 120 pendingEmpty=pending.isEmpty(); 121 enqueueCounter++; 122 123 if(!isFull()&&pendingEmpty&&!broker.isSlaveBroker()){ 124 dispatch(node); 125 }else{ 126 optimizePrefetch(); 127 synchronized(pending){ 128 if(pending.isEmpty()&&log.isDebugEnabled()){ 129 log.debug("Prefetch limit."); 130 } 131 pending.addMessageLast(node); 132 } 133 } 134 } 135 136 public synchronized void processMessageDispatchNotification(MessageDispatchNotification mdn) throws Exception { 137 try{ 138 pending.reset(); 139 while(pending.hasNext()){ 140 MessageReference node=pending.next(); 141 if(node.getMessageId().equals(mdn.getMessageId())){ 142 pending.remove(); 143 createMessageDispatch(node,node.getMessage()); 144 dispatched.addLast(node); 145 return; 146 } 147 } 148 }finally{ 149 pending.release(); 150 } 151 throw new JMSException ("Slave broker out of sync with master: Dispatched message ("+mdn.getMessageId() 152 +") was not in the pending list"); 153 } 154 155 public synchronized void acknowledge(final ConnectionContext context,final MessageAck ack) throws Exception { 156 boolean callDispatchMatched=false; 158 if(ack.isStandardAck()){ 159 int index=0; 161 boolean inAckRange=false; 162 for(Iterator iter=dispatched.iterator();iter.hasNext();){ 163 final MessageReference node=(MessageReference)iter.next(); 164 MessageId messageId=node.getMessageId(); 165 if(ack.getFirstMessageId()==null||ack.getFirstMessageId().equals(messageId)){ 166 inAckRange=true; 167 } 168 if(inAckRange){ 169 if(!context.isInTransaction()){ 171 dequeueCounter++; 172 node.getRegionDestination().getDestinationStatistics().getDequeues().increment(); 173 iter.remove(); 174 }else{ 175 context.getTransaction().addSynchronization(new Synchronization(){ 177 178 public void afterCommit() throws Exception { 179 synchronized(PrefetchSubscription.this){ 180 dequeueCounter++; 181 dispatched.remove(node); 182 node.getRegionDestination().getDestinationStatistics().getDequeues().increment(); 183 prefetchExtension--; 184 } 185 } 186 187 public void afterRollback() throws Exception { 188 super.afterRollback(); 189 } 190 }); 191 } 192 index++; 193 acknowledge(context,ack,node); 194 if(ack.getLastMessageId().equals(messageId)){ 195 if(context.isInTransaction()){ 196 if(getPrefetchSize()!=0){ 198 prefetchExtension=Math.max(prefetchExtension,index+1); 199 } 200 }else{ 201 prefetchExtension=Math.max(0,prefetchExtension-(index+1)); 202 } 203 callDispatchMatched=true; 204 break; 205 } 206 } 207 } 208 if(!callDispatchMatched){ 210 log.info("Could not correlate acknowledgment with dispatched message: "+ack); 211 } 212 }else if(ack.isDeliveredAck()){ 213 int index=0; 216 for(Iterator iter=dispatched.iterator();iter.hasNext();index++){ 217 final MessageReference node=(MessageReference)iter.next(); 218 if(ack.getLastMessageId().equals(node.getMessageId())){ 219 prefetchExtension=Math.max(prefetchExtension,index+1); 220 callDispatchMatched=true; 221 break; 222 } 223 } 224 if(!callDispatchMatched){ 225 throw new JMSException ("Could not correlate acknowledgment with dispatched message: "+ack); 226 } 227 }else if(ack.isPoisonAck()){ 228 if(ack.isInTransaction()) 231 throw new JMSException ("Poison ack cannot be transacted: "+ack); 232 int index=0; 234 boolean inAckRange=false; 235 for(Iterator iter=dispatched.iterator();iter.hasNext();){ 236 final MessageReference node=(MessageReference)iter.next(); 237 MessageId messageId=node.getMessageId(); 238 if(ack.getFirstMessageId()==null||ack.getFirstMessageId().equals(messageId)){ 239 inAckRange=true; 240 } 241 if(inAckRange){ 242 sendToDLQ(context,node); 243 node.getRegionDestination().getDestinationStatistics().getDequeues().increment(); 244 iter.remove(); 245 dequeueCounter++; 246 index++; 247 acknowledge(context,ack,node); 248 if(ack.getLastMessageId().equals(messageId)){ 249 prefetchExtension=Math.max(0,prefetchExtension-(index+1)); 250 callDispatchMatched=true; 251 break; 252 } 253 } 254 } 255 if(!callDispatchMatched){ 256 throw new JMSException ("Could not correlate acknowledgment with dispatched message: "+ack); 257 } 258 } 259 if(callDispatchMatched){ 260 dispatchMatched(); 261 }else{ 262 if(isSlaveBroker()){ 263 throw new JMSException ("Slave broker out of sync with master: Acknowledgment ("+ack 264 +") was not in the dispatch list: "+dispatched); 265 }else{ 266 log.debug("Acknowledgment out of sync (Normally occurs when failover connection reconnects): "+ack); 267 } 268 } 269 } 270 271 277 protected void sendToDLQ(final ConnectionContext context,final MessageReference node) throws IOException ,Exception { 278 Message message=node.getMessage(); 280 if(message!=null){ 281 DeadLetterStrategy deadLetterStrategy=node.getRegionDestination().getDeadLetterStrategy(); 285 ActiveMQDestination deadLetterDestination=deadLetterStrategy 286 .getDeadLetterQueueFor(message.getDestination()); 287 BrokerSupport.resend(context,message,deadLetterDestination); 288 } 289 } 290 291 296 protected synchronized boolean isFull(){ 297 return isSlaveBroker()||dispatched.size()-prefetchExtension>=info.getPrefetchSize(); 298 } 299 300 303 public boolean isLowWaterMark(){ 304 return (dispatched.size()-prefetchExtension)<=(info.getPrefetchSize()*.4); 305 } 306 307 310 public boolean isHighWaterMark(){ 311 return (dispatched.size()-prefetchExtension)>=(info.getPrefetchSize()*.9); 312 } 313 314 public synchronized int countBeforeFull(){ 315 return info.getPrefetchSize()+prefetchExtension-dispatched.size(); 316 } 317 318 public int getPendingQueueSize(){ 319 synchronized(pending){ 320 return pending.size(); 321 } 322 } 323 324 public int getDispatchedQueueSize(){ 325 synchronized(dispatched){ 326 return dispatched.size(); 327 } 328 } 329 330 synchronized public long getDequeueCounter(){ 331 return dequeueCounter; 332 } 333 334 synchronized public long getDispatchedCounter(){ 335 return dispatchCounter; 336 } 337 338 synchronized public long getEnqueueCounter(){ 339 return enqueueCounter; 340 } 341 342 public boolean isRecoveryRequired(){ 343 return pending.isRecoveryRequired(); 344 } 345 346 347 public PendingMessageCursor getPending(){ 348 return this.pending; 349 } 350 351 public void setPending(PendingMessageCursor pending){ 352 this.pending=pending; 353 } 354 355 356 357 361 public void optimizePrefetch(){ 362 370 } 371 372 public synchronized void add(ConnectionContext context,Destination destination) throws Exception { 373 super.add(context,destination); 374 pending.add(context,destination); 375 } 376 377 public synchronized void remove(ConnectionContext context,Destination destination) throws Exception { 378 super.remove(context,destination); 379 pending.remove(context,destination); 380 } 381 382 protected synchronized void dispatchMatched() throws IOException { 383 if(!broker.isSlaveBroker()&&dispatching.compareAndSet(false,true)){ 384 try{ 385 try{ 386 int numberToDispatch=countBeforeFull(); 387 if(numberToDispatch>0){ 388 pending.setMaxBatchSize(numberToDispatch); 389 int count=0; 390 pending.reset(); 391 while(pending.hasNext()&&!isFull()&&count<numberToDispatch){ 392 MessageReference node=pending.next(); 393 if(node==null) 394 break; 395 if(canDispatch(node)){ 396 pending.remove(); 397 if(node!=QueueMessageReference.NULL_MESSAGE&&node.isExpired()){ 400 continue; } 402 dispatch(node); 403 count++; 404 } 405 } 406 } 407 }finally{ 408 pending.release(); 409 } 410 }finally{ 411 dispatching.set(false); 412 } 413 } 414 } 415 416 protected boolean dispatch(final MessageReference node) throws IOException { 417 final Message message=node.getMessage(); 418 if(message==null){ 419 return false; 420 } 421 if(canDispatch(node)&&!isSlaveBroker()){ 423 MessageDispatch md=createMessageDispatch(node,message); 424 if(node!=QueueMessageReference.NULL_MESSAGE){ 426 dispatchCounter++; 427 dispatched.addLast(node); 428 }else{ 429 prefetchExtension=Math.max(0,prefetchExtension-1); 430 } 431 if(info.isDispatchAsync()){ 432 md.setTransmitCallback(new Runnable (){ 433 434 public void run(){ 435 onDispatch(node,message); 438 } 439 }); 440 context.getConnection().dispatchAsync(md); 441 }else{ 442 context.getConnection().dispatchSync(md); 443 onDispatch(node,message); 444 } 445 return true; 447 }else{ 448 return false; 449 } 450 } 451 452 protected void onDispatch(final MessageReference node,final Message message){ 453 if(node.getRegionDestination()!=null){ 454 if(node!=QueueMessageReference.NULL_MESSAGE){ 455 node.getRegionDestination().getDestinationStatistics().getDispatched().increment(); 456 } 457 try{ 458 dispatchMatched(); 459 }catch(IOException e){ 460 context.getConnection().serviceExceptionAsync(e); 461 } 462 } 463 } 464 465 470 public void updateConsumerPrefetch(int newPrefetch){ 471 if(context!=null&&context.getConnection()!=null&&context.getConnection().isManageable()){ 472 ConsumerControl cc=new ConsumerControl(); 473 cc.setConsumerId(info.getConsumerId()); 474 cc.setPrefetch(newPrefetch); 475 context.getConnection().dispatchAsync(cc); 476 } 477 } 478 479 484 protected MessageDispatch createMessageDispatch(MessageReference node,Message message){ 485 if(node==QueueMessageReference.NULL_MESSAGE){ 486 MessageDispatch md=new MessageDispatch(); 487 md.setMessage(null); 488 md.setConsumerId(info.getConsumerId()); 489 md.setDestination(null); 490 return md; 491 }else{ 492 MessageDispatch md=new MessageDispatch(); 493 md.setConsumerId(info.getConsumerId()); 494 md.setDestination(node.getRegionDestination().getActiveMQDestination()); 495 md.setMessage(message); 496 md.setRedeliveryCounter(node.getRedeliveryCounter()); 497 return md; 498 } 499 } 500 501 509 abstract protected boolean canDispatch(MessageReference node) throws IOException ; 510 511 516 protected void acknowledge(ConnectionContext context,final MessageAck ack,final MessageReference node) 517 throws IOException { 518 } 519 520 521 522 } 523 | Popular Tags |