1 18 package org.apache.activemq; 19 20 import org.apache.activemq.command.*; 21 import org.apache.activemq.management.JMSConsumerStatsImpl; 22 import org.apache.activemq.management.StatsCapable; 23 import org.apache.activemq.management.StatsImpl; 24 import org.apache.activemq.selector.SelectorParser; 25 import org.apache.activemq.thread.Scheduler; 26 import org.apache.activemq.transaction.Synchronization; 27 import org.apache.activemq.util.Callback; 28 import org.apache.activemq.util.IntrospectionSupport; 29 import org.apache.activemq.util.JMSExceptionSupport; 30 import org.apache.commons.logging.Log; 31 import org.apache.commons.logging.LogFactory; 32 33 import javax.jms.IllegalStateException ; 34 import javax.jms.*; 35 import javax.jms.Message ; 36 import java.util.HashMap ; 37 import java.util.Iterator ; 38 import java.util.LinkedList ; 39 import java.util.concurrent.ExecutorService ; 40 import java.util.concurrent.Executors ; 41 import java.util.concurrent.TimeUnit ; 42 import java.util.concurrent.atomic.AtomicBoolean ; 43 44 79 public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsCapable, ActiveMQDispatcher { 80 81 private static final Log log = LogFactory.getLog(ActiveMQMessageConsumer.class); 82 83 protected final ActiveMQSession session; 84 protected final ConsumerInfo info; 85 86 private final MessageDispatchChannel unconsumedMessages = new MessageDispatchChannel(); 88 89 private final LinkedList deliveredMessages = new LinkedList (); 93 private int deliveredCounter = 0; 94 private int additionalWindowSize = 0; 95 private int rollbackCounter = 0; 96 private long redeliveryDelay = 0; 97 private int ackCounter = 0; 98 private int dispatchedCount = 0; 99 private MessageListener messageListener; 100 private JMSConsumerStatsImpl stats; 101 102 private final String selector; 103 private boolean synchronizationRegistered = false; 104 private AtomicBoolean started = new AtomicBoolean (false); 105 106 private MessageAvailableListener availableListener; 107 108 private RedeliveryPolicy redeliveryPolicy; 109 private boolean optimizeAcknowledge; 110 private AtomicBoolean deliveryingAcknowledgements = new AtomicBoolean (); 111 private ExecutorService executorService = null; 112 private MessageTransformer transformer; 113 private boolean clearDispatchList; 114 115 129 public ActiveMQMessageConsumer(ActiveMQSession session, ConsumerId consumerId, ActiveMQDestination dest, 130 String name, String selector, int prefetch, int maximumPendingMessageCount, boolean noLocal, boolean browser, boolean dispatchAsync) 131 throws JMSException { 132 if (dest == null) { 133 throw new InvalidDestinationException("Don't understand null destinations"); 134 } else if (dest.getPhysicalName() == null) { 135 throw new InvalidDestinationException("The destination object was not given a physical name."); 136 } else if (dest.isTemporary()) { 137 String physicalName = dest.getPhysicalName(); 138 139 if (physicalName == null) { 140 throw new IllegalArgumentException ("Physical name of Destination should be valid: " + dest); 141 } 142 143 String connectionID = session.connection.getConnectionInfo().getConnectionId().getValue(); 144 145 if (physicalName.indexOf(connectionID) < 0) { 146 throw new InvalidDestinationException("Cannot use a Temporary destination from another Connection"); 147 } 148 149 if (session.connection.isDeleted(dest)) { 150 throw new InvalidDestinationException("Cannot use a Temporary destination that has been deleted"); 151 } 152 } 153 154 this.session = session; 155 this.redeliveryPolicy = session.connection.getRedeliveryPolicy(); 156 setTransformer(session.getTransformer()); 157 158 this.info = new ConsumerInfo(consumerId); 159 this.info.setSubscriptionName(name); 160 this.info.setPrefetchSize(prefetch); 161 this.info.setCurrentPrefetchSize(prefetch); 162 this.info.setMaximumPendingMessageLimit(maximumPendingMessageCount); 163 this.info.setNoLocal(noLocal); 164 this.info.setDispatchAsync(dispatchAsync); 165 this.info.setRetroactive(this.session.connection.isUseRetroactiveConsumer()); 166 this.info.setSelector(null); 167 168 if (dest.getOptions() != null) { 170 HashMap options = new HashMap (dest.getOptions()); 171 IntrospectionSupport.setProperties(this.info, options, "consumer."); 172 } 173 174 this.info.setDestination(dest); 175 this.info.setBrowser(browser); 176 if (selector != null && selector.trim().length() != 0) { 177 new SelectorParser().parse(selector); 179 this.info.setSelector(selector); 180 this.selector = selector; 181 } else if (info.getSelector() != null) { 182 new SelectorParser().parse(this.info.getSelector()); 184 this.selector = this.info.getSelector(); 185 } else { 186 this.selector = null; 187 } 188 189 this.stats = new JMSConsumerStatsImpl(session.getSessionStats(), dest); 190 this.optimizeAcknowledge=session.connection.isOptimizeAcknowledge()&&session.isAutoAcknowledge() 191 &&!info.isBrowser(); 192 this.info.setOptimizedAcknowledge(this.optimizeAcknowledge); 193 try { 194 this.session.addConsumer(this); 195 this.session.syncSendPacket(info); 196 } catch (JMSException e) { 197 this.session.removeConsumer(this); 198 throw e; 199 } 200 201 if(session.connection.isStarted()) 202 start(); 203 } 204 205 public StatsImpl getStats() { 206 return stats; 207 } 208 209 public JMSConsumerStatsImpl getConsumerStats() { 210 return stats; 211 } 212 213 public RedeliveryPolicy getRedeliveryPolicy() { 214 return redeliveryPolicy; 215 } 216 217 220 public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) { 221 this.redeliveryPolicy = redeliveryPolicy; 222 } 223 224 public MessageTransformer getTransformer() { 225 return transformer; 226 } 227 228 231 public void setTransformer(MessageTransformer transformer) { 232 this.transformer = transformer; 233 } 234 235 236 239 protected ConsumerId getConsumerId() { 240 return info.getConsumerId(); 241 } 242 243 246 protected String getConsumerName() { 247 return this.info.getSubscriptionName(); 248 } 249 250 253 protected boolean isNoLocal() { 254 return info.isNoLocal(); 255 } 256 257 262 protected boolean isBrowser() { 263 return info.isBrowser(); 264 } 265 266 269 protected ActiveMQDestination getDestination() { 270 return info.getDestination(); 271 } 272 273 276 public int getPrefetchNumber() { 277 return info.getPrefetchSize(); 278 } 279 280 283 public boolean isDurableSubscriber() { 284 return info.getSubscriptionName()!=null && info.getDestination().isTopic(); 285 } 286 287 297 public String getMessageSelector() throws JMSException { 298 checkClosed(); 299 return selector; 300 } 301 302 312 public MessageListener getMessageListener() throws JMSException { 313 checkClosed(); 314 return this.messageListener; 315 } 316 317 334 public void setMessageListener(MessageListener listener) throws JMSException { 335 checkClosed(); 336 if (info.getPrefetchSize() == 0) { 337 throw new JMSException("Illegal prefetch size of zero. This setting is not supported for asynchronous consumers please set a value of at least 1"); 338 } 339 this.messageListener = listener; 340 if (listener != null) { 341 boolean wasRunning = session.isRunning(); 342 if (wasRunning) 343 session.stop(); 344 345 session.redispatch(unconsumedMessages); 346 347 if (wasRunning) 348 session.start(); 349 350 } 351 } 352 353 354 public MessageAvailableListener getAvailableListener() { 355 return availableListener; 356 } 357 358 362 public void setAvailableListener(MessageAvailableListener availableListener) { 363 this.availableListener = availableListener; 364 } 365 366 379 private MessageDispatch dequeue(long timeout) throws JMSException { 380 try { 381 long deadline = 0; 382 if (timeout > 0) { 383 deadline = System.currentTimeMillis() + timeout; 384 } 385 while (true) { 386 MessageDispatch md = unconsumedMessages.dequeue(timeout); 387 if (md == null) { 388 if (timeout > 0 && !unconsumedMessages.isClosed()) { 389 timeout = Math.max(deadline - System.currentTimeMillis(), 0); 390 } else { 391 return null; 392 } 393 } else if ( md.getMessage()==null ) { 394 return null; 395 } else if (md.getMessage().isExpired()) { 396 if (log.isDebugEnabled()) { 397 log.debug("Received expired message: " + md); 398 } 399 beforeMessageIsConsumed(md); 400 afterMessageIsConsumed(md, true); 401 if (timeout > 0) { 402 timeout = Math.max(deadline - System.currentTimeMillis(), 0); 403 } 404 } else { 405 if (log.isDebugEnabled()) { 406 log.debug("Received message: " + md); 407 } 408 return md; 409 } 410 } 411 } catch (InterruptedException e) { 412 Thread.currentThread().interrupt(); 413 throw JMSExceptionSupport.create(e); 414 } 415 } 416 417 429 public Message receive() throws JMSException { 430 checkClosed(); 431 checkMessageListener(); 432 433 sendPullCommand(0); 434 MessageDispatch md = dequeue(-1); 435 if (md == null) 436 return null; 437 438 beforeMessageIsConsumed(md); 439 afterMessageIsConsumed(md, false); 440 441 return createActiveMQMessage(md); 442 } 443 444 448 private ActiveMQMessage createActiveMQMessage(final MessageDispatch md) throws JMSException { 449 ActiveMQMessage m = (ActiveMQMessage) md.getMessage().copy(); 450 if (transformer != null) { 451 Message transformedMessage = transformer.consumerTransform(session, this, m); 452 if (transformedMessage != null) { 453 m = ActiveMQMessageTransformation.transformMessage(transformedMessage, session.connection); 454 } 455 } 456 if (session.isClientAcknowledge()) { 457 m.setAcknowledgeCallback(new Callback() { 458 public void execute() throws Exception { 459 session.checkClosed(); 460 session.acknowledge(); 461 } 462 }); 463 } 464 return m; 465 } 466 467 481 public Message receive(long timeout) throws JMSException { 482 checkClosed(); 483 checkMessageListener(); 484 if (timeout == 0) { 485 return this.receive(); 486 487 } 488 489 sendPullCommand(timeout); 490 while (timeout > 0) { 491 492 MessageDispatch md; 493 if (info.getPrefetchSize() == 0) { 494 md = dequeue(-1); } else { 496 md = dequeue(timeout); 497 } 498 499 if (md == null) 500 return null; 501 502 beforeMessageIsConsumed(md); 503 afterMessageIsConsumed(md, false); 504 return createActiveMQMessage(md); 505 } 506 return null; 507 } 508 509 518 public Message receiveNoWait() throws JMSException { 519 checkClosed(); 520 checkMessageListener(); 521 sendPullCommand(-1); 522 523 MessageDispatch md; 524 if (info.getPrefetchSize() == 0) { 525 md = dequeue(-1); } else { 527 md = dequeue(0); 528 } 529 530 if (md == null) 531 return null; 532 533 beforeMessageIsConsumed(md); 534 afterMessageIsConsumed(md, false); 535 return createActiveMQMessage(md); 536 } 537 538 554 public void close() throws JMSException { 555 if (!unconsumedMessages.isClosed()) { 556 dispose(); 557 this.session.asyncSendPacket(info.createRemoveCommand()); 558 } 559 } 560 561 void clearMessagesInProgress(){ 562 clearDispatchList= true; 570 } 571 572 void deliverAcks(){ 573 MessageAck ack=null; 574 if(deliveryingAcknowledgements.compareAndSet(false,true)){ 575 if(this.optimizeAcknowledge){ 576 if(!deliveredMessages.isEmpty()){ 577 MessageDispatch md=(MessageDispatch) deliveredMessages.getFirst(); 578 ack=new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,deliveredMessages.size()); 579 deliveredMessages.clear(); 580 ackCounter=0; 581 } 582 } 583 if(ack!=null){ 584 final MessageAck ackToSend=ack; 585 if(executorService==null){ 586 executorService=Executors.newSingleThreadExecutor(); 587 } 588 executorService.submit(new Runnable (){ 589 public void run(){ 590 try{ 591 session.asyncSendPacket(ackToSend); 592 }catch(JMSException e){ 593 log.error("Failed to delivered acknowledgements",e); 594 }finally{ 595 deliveryingAcknowledgements.set(false); 596 } 597 } 598 }); 599 }else{ 600 deliveryingAcknowledgements.set(false); 601 } 602 } 603 } 604 605 public void dispose() throws JMSException { 606 if (!unconsumedMessages.isClosed()) { 607 deliverAcks(); if (executorService!=null){ 612 executorService.shutdown(); 613 try { 614 executorService.awaitTermination(60, TimeUnit.SECONDS); 615 } catch (InterruptedException e) { 616 Thread.currentThread().interrupt(); 617 } 618 } 619 if ((session.isTransacted() || session.isDupsOkAcknowledge())) { 620 acknowledge(); 621 } 622 deliveredMessages.clear(); 623 unconsumedMessages.close(); 624 this.session.removeConsumer(this); 625 } 626 } 627 628 631 protected void checkClosed() throws IllegalStateException { 632 if (unconsumedMessages.isClosed()) { 633 throw new IllegalStateException ("The Consumer is closed"); 634 } 635 } 636 637 642 protected void sendPullCommand(long timeout) throws JMSException { 643 if (info.getPrefetchSize() == 0 && unconsumedMessages.isEmpty()) { 644 MessagePull messagePull = new MessagePull(); 645 messagePull.configure(info); 646 messagePull.setTimeout(timeout); 647 session.asyncSendPacket(messagePull); 648 } 649 } 650 651 protected void checkMessageListener() throws JMSException { 652 session.checkMessageListener(); 653 } 654 655 protected void setOptimizeAcknowledge(boolean value){ 656 if (optimizeAcknowledge && !value){ 657 deliverAcks(); 658 } 659 optimizeAcknowledge=value; 660 } 661 662 protected void setPrefetchSize(int prefetch){ 663 deliverAcks(); 664 this.info.setCurrentPrefetchSize(prefetch); 665 } 666 667 private void beforeMessageIsConsumed(MessageDispatch md) throws JMSException { 668 md.setDeliverySequenceId(session.getNextDeliveryId()); 669 if (!session.isDupsOkAcknowledge()) { 670 deliveredMessages.addFirst(md); 671 if( session.isTransacted() ) { 672 ackLater(md,MessageAck.DELIVERED_ACK_TYPE); 673 } 674 } 675 } 676 677 private void afterMessageIsConsumed(MessageDispatch md,boolean messageExpired) throws JMSException{ 678 if(unconsumedMessages.isClosed()) 679 return; 680 if(messageExpired){ 681 ackLater(md,MessageAck.DELIVERED_ACK_TYPE); 682 }else{ 683 stats.onMessage(); 684 if( session.isTransacted() ) { 685 } else if(session.isAutoAcknowledge()) { 686 if(!deliveredMessages.isEmpty()){ 687 if(optimizeAcknowledge){ 688 if(deliveryingAcknowledgements.compareAndSet(false,true)){ 689 ackCounter++; 690 if(ackCounter>=(info.getCurrentPrefetchSize()*.65)){ 691 MessageAck ack=new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,deliveredMessages.size()); 692 session.asyncSendPacket(ack); 693 ackCounter=0; 694 deliveredMessages.clear(); 695 } 696 deliveryingAcknowledgements.set(false); 697 } 698 }else{ 699 MessageAck ack=new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,deliveredMessages.size()); 700 session.asyncSendPacket(ack); 701 deliveredMessages.clear(); 702 } 703 } 704 } else if(session.isDupsOkAcknowledge()){ 705 ackLater(md,MessageAck.STANDARD_ACK_TYPE); 706 } else if(session.isClientAcknowledge()){ 707 ackLater(md,MessageAck.DELIVERED_ACK_TYPE); 708 } else{ 709 throw new IllegalStateException ("Invalid session state."); 710 } 711 } 712 } 713 714 private void ackLater(MessageDispatch md, byte ackType) throws JMSException { 715 716 if (session.isTransacted()) { 720 session.doStartTransaction(); 721 if (!synchronizationRegistered) { 722 synchronizationRegistered = true; 723 session.getTransactionContext().addSynchronization(new Synchronization() { 724 public void beforeEnd() throws Exception { 725 acknowledge(); 726 synchronizationRegistered = false; 727 } 728 729 public void afterCommit() throws Exception { 730 commit(); 731 synchronizationRegistered = false; 732 } 733 734 public void afterRollback() throws Exception { 735 rollback(); 736 synchronizationRegistered = false; 737 } 738 }); 739 } 740 } 741 742 deliveredCounter++; 745 if ((0.5 * info.getPrefetchSize()) <= (deliveredCounter - additionalWindowSize)) { 746 MessageAck ack = new MessageAck(md, ackType, deliveredCounter); 747 ack.setTransactionId(session.getTransactionContext().getTransactionId()); 748 session.asyncSendPacket(ack); 749 additionalWindowSize = deliveredCounter; 750 751 if (ackType == MessageAck.STANDARD_ACK_TYPE) { 753 deliveredCounter = additionalWindowSize = 0; 754 } 755 } 756 } 757 758 764 public void acknowledge() throws JMSException { 765 if (deliveredMessages.isEmpty()) 766 return; 767 768 MessageDispatch lastMd = (MessageDispatch) deliveredMessages.get(0); 770 MessageAck ack = new MessageAck(lastMd, MessageAck.STANDARD_ACK_TYPE, deliveredMessages.size()); 771 if (session.isTransacted()) { 772 session.doStartTransaction(); 773 ack.setTransactionId(session.getTransactionContext().getTransactionId()); 774 } 775 session.asyncSendPacket(ack); 776 777 deliveredCounter -= deliveredMessages.size(); 779 additionalWindowSize = Math.max(0, additionalWindowSize - deliveredMessages.size()); 780 781 if (!session.isTransacted()) { 782 deliveredMessages.clear(); 783 } 784 } 785 786 public void commit() throws JMSException { 787 deliveredMessages.clear(); 788 rollbackCounter = 0; 789 redeliveryDelay = 0; 790 } 791 792 public void rollback() throws JMSException{ 793 synchronized(unconsumedMessages.getMutex()){ 794 if(optimizeAcknowledge){ 795 for(int i=0;(i<deliveredMessages.size())&&(i<ackCounter);i++){ 797 deliveredMessages.removeLast(); 798 } 799 } 800 if(deliveredMessages.isEmpty()) 801 return; 802 803 if( rollbackCounter > 0 ) 805 redeliveryDelay = redeliveryPolicy.getRedeliveryDelay(redeliveryDelay); 806 807 rollbackCounter++; 808 if(redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES 809 && rollbackCounter>redeliveryPolicy.getMaximumRedeliveries()){ 810 MessageDispatch lastMd=(MessageDispatch) deliveredMessages.get(0); 814 MessageAck ack=new MessageAck(lastMd,MessageAck.POSION_ACK_TYPE,deliveredMessages.size()); 815 session.asyncSendPacket(ack); 816 additionalWindowSize=Math.max(0,additionalWindowSize-deliveredMessages.size()); 818 rollbackCounter=0; 819 redeliveryDelay=0; 820 }else{ 821 unconsumedMessages.stop(); 823 824 for(Iterator iter=deliveredMessages.iterator();iter.hasNext();){ 825 MessageDispatch md=(MessageDispatch) iter.next(); 826 md.getMessage().onMessageRolledBack(); 827 unconsumedMessages.enqueueFirst(md); 828 } 829 830 if( redeliveryDelay > 0 ) { 831 Scheduler.executeAfterDelay(new Runnable (){ 833 public void run(){ 834 try{ 835 if(started.get()) 836 start(); 837 }catch(JMSException e){ 838 session.connection.onAsyncException(e); 839 } 840 } 841 },redeliveryDelay); 842 } else { 843 start(); 844 } 845 846 } 847 deliveredCounter-=deliveredMessages.size(); 848 deliveredMessages.clear(); 849 } 850 if(messageListener!=null){ 851 session.redispatch(unconsumedMessages); 852 } 853 } 854 855 public void dispatch(MessageDispatch md) { 856 MessageListener listener = this.messageListener; 857 try { 858 synchronized(unconsumedMessages.getMutex()){ 859 if (clearDispatchList) { 860 clearDispatchList = false; 862 unconsumedMessages.clear(); 863 } 864 865 if (!unconsumedMessages.isClosed()) { 866 if (listener != null && unconsumedMessages.isRunning() ) { 867 ActiveMQMessage message = createActiveMQMessage(md); 868 beforeMessageIsConsumed(md); 869 try { 870 listener.onMessage(message); 871 afterMessageIsConsumed(md, false); 872 } catch (RuntimeException e) { 873 if ( session.isDupsOkAcknowledge() || session.isAutoAcknowledge() ) { 874 } else { 876 afterMessageIsConsumed(md, false); 878 } 879 log.error("Exception while processing message: " + e, e); 880 } 881 } else { 882 unconsumedMessages.enqueue(md); 883 if (availableListener != null) { 884 availableListener.onMessageAvailable(this); 885 } 886 } 887 } 888 } 889 if (++dispatchedCount%1000==0) { 890 dispatchedCount=0; 891 Thread.yield(); 892 } 893 } catch (Exception e) { 894 session.connection.onAsyncException(e); 895 } 896 } 897 898 public int getMessageSize() { 899 return unconsumedMessages.size(); 900 } 901 902 public void start() throws JMSException { 903 if (unconsumedMessages.isClosed()) { 904 return; 905 } 906 started.set(true); 907 unconsumedMessages.start(); 908 session.executor.wakeup(); 909 } 910 911 public void stop() { 912 started.set(false); 913 unconsumedMessages.stop(); 914 } 915 916 public String toString() { 917 return "ActiveMQMessageConsumer { value=" +info.getConsumerId()+", started=" +started.get()+" }"; 918 } 919 920 925 public boolean iterate() { 926 MessageListener listener = this.messageListener; 927 if( listener!=null ) { 928 MessageDispatch md = unconsumedMessages.dequeueNoWait(); 929 if( md!=null ) { 930 try { 931 ActiveMQMessage message = createActiveMQMessage(md); 932 beforeMessageIsConsumed(md); 933 listener.onMessage(message); 934 afterMessageIsConsumed(md, false); 935 } catch (JMSException e) { 936 session.connection.onAsyncException(e); 937 } 938 return true; 939 } 940 } 941 return false; 942 } 943 944 public boolean isInUse(ActiveMQTempDestination destination) { 945 return info.getDestination().equals(destination); 946 } 947 948 } 949 | Popular Tags |