1 18 package org.apache.activemq.broker.region; 19 20 import java.io.IOException ; 21 import java.util.ArrayList ; 22 import java.util.Iterator ; 23 import java.util.LinkedList ; 24 import java.util.List ; 25 import java.util.concurrent.CopyOnWriteArrayList ; 26 27 import javax.jms.InvalidSelectorException ; 28 import javax.jms.JMSException ; 29 30 import org.apache.activemq.broker.ConnectionContext; 31 import org.apache.activemq.broker.ProducerBrokerExchange; 32 import org.apache.activemq.broker.region.cursors.PendingMessageCursor; 33 import org.apache.activemq.broker.region.cursors.StoreQueueCursor; 34 import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor; 35 import org.apache.activemq.broker.region.group.MessageGroupHashBucketFactory; 36 import org.apache.activemq.broker.region.group.MessageGroupMap; 37 import org.apache.activemq.broker.region.group.MessageGroupMapFactory; 38 import org.apache.activemq.broker.region.group.MessageGroupSet; 39 import org.apache.activemq.broker.region.policy.DeadLetterStrategy; 40 import org.apache.activemq.broker.region.policy.DispatchPolicy; 41 import org.apache.activemq.broker.region.policy.RoundRobinDispatchPolicy; 42 import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy; 43 import org.apache.activemq.command.ActiveMQDestination; 44 import org.apache.activemq.command.ConsumerId; 45 import org.apache.activemq.command.ExceptionResponse; 46 import org.apache.activemq.command.Message; 47 import org.apache.activemq.command.MessageAck; 48 import org.apache.activemq.command.MessageId; 49 import org.apache.activemq.command.ProducerAck; 50 import org.apache.activemq.command.Response; 51 import org.apache.activemq.filter.BooleanExpression; 52 import org.apache.activemq.filter.MessageEvaluationContext; 53 import org.apache.activemq.kaha.Store; 54 import org.apache.activemq.memory.UsageManager; 55 import org.apache.activemq.selector.SelectorParser; 56 import org.apache.activemq.store.MessageRecoveryListener; 57 import org.apache.activemq.store.MessageStore; 58 import org.apache.activemq.thread.Task; 59 import org.apache.activemq.thread.TaskRunner; 60 import org.apache.activemq.thread.TaskRunnerFactory; 61 import org.apache.activemq.thread.Valve; 62 import org.apache.activemq.transaction.Synchronization; 63 import org.apache.activemq.util.BrokerSupport; 64 import org.apache.commons.logging.Log; 65 import org.apache.commons.logging.LogFactory; 66 67 73 public class Queue implements Destination, Task { 74 75 private final Log log; 76 77 private final ActiveMQDestination destination; 78 private final List consumers = new CopyOnWriteArrayList (); 79 private final Valve dispatchValve = new Valve(true); 80 private final UsageManager usageManager; 81 private final DestinationStatistics destinationStatistics = new DestinationStatistics(); 82 private PendingMessageCursor messages; 83 private final LinkedList pagedInMessages = new LinkedList (); 84 private LockOwner exclusiveOwner; 85 private MessageGroupMap messageGroupOwners; 86 87 private int garbageSize = 0; 88 private int garbageSizeBeforeCollection = 1000; 89 private DispatchPolicy dispatchPolicy = new RoundRobinDispatchPolicy(); 90 private final MessageStore store; 91 private int highestSubscriptionPriority = Integer.MIN_VALUE; 92 private DeadLetterStrategy deadLetterStrategy = new SharedDeadLetterStrategy(); 93 private MessageGroupMapFactory messageGroupMapFactory = new MessageGroupHashBucketFactory(); 94 private int maximumPagedInMessages = garbageSizeBeforeCollection * 2; 95 private final MessageEvaluationContext queueMsgConext = new MessageEvaluationContext(); 96 private final Object exclusiveLockMutex = new Object (); 97 private final Object doDispatchMutex = new Object (); 98 private TaskRunner taskRunner; 99 private boolean started = false; 100 101 public Queue(ActiveMQDestination destination, final UsageManager memoryManager, MessageStore store, DestinationStatistics parentStats, 102 TaskRunnerFactory taskFactory, Store tmpStore) throws Exception { 103 this.destination = destination; 104 this.usageManager = new UsageManager(memoryManager,destination.toString()); 105 this.usageManager.setUsagePortion(1.0f); 106 this.store = store; 107 if(destination.isTemporary()){ 108 this.messages=new VMPendingMessageCursor(); 109 }else{ 110 this.messages=new StoreQueueCursor(this,tmpStore); 111 } 112 113 this.taskRunner = taskFactory.createTaskRunner(this, "Queue "+destination.getPhysicalName()); 114 115 if (store != null) { 119 store.setUsageManager(usageManager); 120 } 121 122 this.destinationStatistics.setEnabled(parentStats.isEnabled()); 124 destinationStatistics.setParent(parentStats); 125 this.log = LogFactory.getLog(getClass().getName() + "." + destination.getPhysicalName()); 126 127 128 } 129 130 public void initialize() throws Exception { 131 if(store!=null){ 132 messages.setUsageManager(getUsageManager()); 134 if(messages.isRecoveryRequired()){ 135 store.recover(new MessageRecoveryListener(){ 136 137 public void recoverMessage(Message message){ 138 if(message.isExpired()){ 140 return; 142 } 143 message.setRegionDestination(Queue.this); 144 synchronized(messages){ 145 try{ 146 messages.addMessageLast(message); 147 }catch(Exception e){ 148 log.fatal("Failed to add message to cursor",e); 149 } 150 } 151 destinationStatistics.getMessages().increment(); 152 } 153 154 public void recoverMessageReference(MessageId messageReference) throws Exception { 155 throw new RuntimeException ("Should not be called."); 156 } 157 158 public void finished(){ 159 } 160 161 public boolean hasSpace(){ 162 return true; 163 } 164 }); 165 } 166 } 167 } 168 169 178 public boolean lock(MessageReference node,LockOwner lockOwner){ 179 synchronized(exclusiveLockMutex){ 180 if(exclusiveOwner==lockOwner){ 181 return true; 182 } 183 if(exclusiveOwner!=null){ 184 return false; 185 } 186 if(lockOwner.getLockPriority()<highestSubscriptionPriority){ 187 return false; 188 } 189 if(lockOwner.isLockExclusive()){ 190 exclusiveOwner=lockOwner; 191 } 192 } 193 return true; 194 } 195 196 public void addSubscription(ConnectionContext context, Subscription sub) throws Exception { 197 sub.add(context, this); 198 destinationStatistics.getConsumers().increment(); 199 maximumPagedInMessages += sub.getConsumerInfo().getPrefetchSize(); 200 201 202 203 MessageEvaluationContext msgContext=context.getMessageEvaluationContext(); 204 try{ 205 synchronized(consumers){ 206 if (sub.getConsumerInfo().isExclusive()) { 207 consumers.add(0, sub); 210 } else { 211 consumers.add(sub); 212 } 213 } 214 doPageIn(); 216 dispatchValve.turnOff(); 221 if (sub.getConsumerInfo().getPriority() > highestSubscriptionPriority) { 222 highestSubscriptionPriority = sub.getConsumerInfo().getPriority(); 223 } 224 msgContext.setDestination(destination); 225 synchronized(pagedInMessages){ 226 for(Iterator i=pagedInMessages.iterator();i.hasNext();){ 229 QueueMessageReference node=(QueueMessageReference)i.next(); 230 if(node.isDropped()){ 231 continue; 232 } 233 try{ 234 msgContext.setMessageReference(node); 235 if(sub.matches(node,msgContext)){ 236 sub.add(node); 237 } 238 }catch(IOException e){ 239 log.warn("Could not load message: "+e,e); 240 } 241 } 242 } 243 }finally{ 244 msgContext.clear(); 245 dispatchValve.turnOn(); 246 } 247 } 248 249 public void removeSubscription(ConnectionContext context, Subscription sub) throws Exception { 250 251 destinationStatistics.getConsumers().decrement(); 252 maximumPagedInMessages -= sub.getConsumerInfo().getPrefetchSize(); 253 254 dispatchValve.turnOff(); 258 259 try { 260 261 synchronized (consumers) { 262 consumers.remove(sub); 263 if (consumers.isEmpty()) { 264 messages.gc(); 265 } 266 } 267 sub.remove(context, this); 268 269 highestSubscriptionPriority = calcHighestSubscriptionPriority(); 270 271 boolean wasExclusiveOwner = false; 272 if (exclusiveOwner == sub) { 273 exclusiveOwner = null; 274 wasExclusiveOwner = true; 275 } 276 277 ConsumerId consumerId = sub.getConsumerInfo().getConsumerId(); 278 MessageGroupSet ownedGroups = getMessageGroupOwners().removeConsumer(consumerId); 279 280 if (!sub.getConsumerInfo().isBrowser()) { 281 MessageEvaluationContext msgContext = context.getMessageEvaluationContext(); 282 try { 283 msgContext.setDestination(destination); 284 285 List messagesToDispatch = new ArrayList (); 287 synchronized (pagedInMessages) { 288 for(Iterator i = pagedInMessages.iterator();i.hasNext();) { 289 QueueMessageReference node = (QueueMessageReference) i.next(); 290 if (node.isDropped()) { 291 continue; 292 } 293 294 String groupID = node.getGroupID(); 295 296 if (node.getLockOwner() == sub || wasExclusiveOwner || (groupID != null && ownedGroups.contains(groupID))) { 298 messagesToDispatch.add(node); 299 } 300 } 301 } 302 303 for (Iterator iter = messagesToDispatch.iterator(); iter.hasNext();) { 306 QueueMessageReference node = (QueueMessageReference) iter.next(); 307 node.incrementRedeliveryCounter(); 308 node.unlock(); 309 msgContext.setMessageReference(node); 310 dispatchPolicy.dispatch(node, msgContext, consumers); 311 } 312 } 313 finally { 314 msgContext.clear(); 315 } 316 } 317 } 318 finally { 319 dispatchValve.turnOn(); 320 } 321 322 } 323 324 private final LinkedList <Runnable > messagesWaitingForSpace = new LinkedList <Runnable >(); 325 private final Runnable sendMessagesWaitingForSpaceTask = new Runnable () { 326 public void run() { 327 328 331 synchronized( messagesWaitingForSpace ) { 332 while( !usageManager.isFull() && !messagesWaitingForSpace.isEmpty()) { 333 Runnable op = messagesWaitingForSpace.removeFirst(); 334 op.run(); 335 } 336 } 337 338 }; 339 }; 340 341 public void send(final ProducerBrokerExchange producerExchange,final Message message) throws Exception { 342 final ConnectionContext context = producerExchange.getConnectionContext(); 343 if(message.isExpired()){ 346 if (log.isDebugEnabled()) { 347 log.debug("Expired message: " + message); 348 } 349 if( producerExchange.getProducerState().getInfo().getWindowSize() > 0 || !message.isResponseRequired() ) { 350 ProducerAck ack = new ProducerAck(producerExchange.getProducerState().getInfo().getProducerId(), message.getSize()); 351 context.getConnection().dispatchAsync(ack); 352 } 353 return; 354 } 355 if ( context.isProducerFlowControl() && usageManager.isFull() ) { 356 if(usageManager.isSendFailIfNoSpace()){ 357 throw new javax.jms.ResourceAllocationException ("Usage Manager memory limit reached"); 358 } 359 360 if( producerExchange.getProducerState().getInfo().getWindowSize() > 0 || message.isResponseRequired() ) { 363 synchronized( messagesWaitingForSpace ) { 364 messagesWaitingForSpace.add(new Runnable () { 365 public void run() { 366 367 if(message.isExpired()){ 369 if (log.isDebugEnabled()) { 370 log.debug("Expired message: " + message); 371 } 372 373 if( !message.isResponseRequired() ) { 374 ProducerAck ack = new ProducerAck(producerExchange.getProducerState().getInfo().getProducerId(), message.getSize()); 375 context.getConnection().dispatchAsync(ack); 376 } 377 return; 378 } 379 380 381 try { 382 doMessageSend(producerExchange, message); 383 } catch (Exception e) { 384 if( message.isResponseRequired() ) { 385 ExceptionResponse response = new ExceptionResponse(e); 386 response.setCorrelationId(message.getCommandId()); 387 context.getConnection().dispatchAsync(response); 388 } 389 } 390 } 391 }); 392 393 if( !usageManager.notifyCallbackWhenNotFull(sendMessagesWaitingForSpaceTask) ) { 395 sendMessagesWaitingForSpaceTask.run(); 397 } 398 context.setDontSendReponse(true); 399 return; 400 } 401 402 } else { 403 404 while( !usageManager.waitForSpace(1000) ) { 407 if( context.getStopping().get() ) 408 throw new IOException ("Connection closed, send aborted."); 409 } 410 411 if(message.isExpired()){ 414 if (log.isDebugEnabled()) { 415 log.debug("Expired message: " + message); 416 } 417 return; 418 } 419 } 420 } 421 doMessageSend(producerExchange, message); 422 } 423 424 private void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message) throws IOException , Exception { 425 final ConnectionContext context = producerExchange.getConnectionContext(); 426 message.setRegionDestination(this); 427 if(store!=null&&message.isPersistent()){ 428 store.addMessage(context,message); 429 } 430 if( producerExchange.getProducerState().getInfo().getWindowSize() > 0 || !message.isResponseRequired() ) { 431 ProducerAck ack = new ProducerAck(producerExchange.getProducerState().getInfo().getProducerId(), message.getSize()); 432 context.getConnection().dispatchAsync(ack); 433 } 434 if(context.isInTransaction()){ 435 message.incrementReferenceCount(); 438 context.getTransaction().addSynchronization(new Synchronization(){ 439 public void afterCommit() throws Exception { 440 try { 441 if(message.isExpired()){ 444 if (log.isDebugEnabled()) { 446 log.debug("Expired message: " + message); 447 } 448 return; 449 } 450 sendMessage(context,message); 451 } finally { 452 message.decrementReferenceCount(); 453 } 454 } 455 456 @Override 457 public void afterRollback() throws Exception { 458 message.decrementReferenceCount(); 459 } 460 }); 461 }else{ 462 sendMessage(context,message); 464 } 465 } 466 467 public void dispose(ConnectionContext context) throws IOException { 468 if (store != null) { 469 store.removeAllMessages(context); 470 } 471 destinationStatistics.setParent(null); 472 } 473 474 public void dropEvent() { 475 dropEvent(false); 476 } 477 478 public void dropEvent(boolean skipGc){ 479 destinationStatistics.getMessages().decrement(); 481 synchronized(pagedInMessages){ 482 garbageSize++; 483 } 484 if(!skipGc&&garbageSize>garbageSizeBeforeCollection){ 485 gc(); 486 } 487 try{ 488 taskRunner.wakeup(); 489 }catch(InterruptedException e){ 490 log.warn("Task Runner failed to wakeup ",e); 491 } 492 } 493 494 public void gc() { 495 synchronized (pagedInMessages) { 496 for(Iterator i = pagedInMessages.iterator(); i.hasNext();) { 497 QueueMessageReference node = (QueueMessageReference) i.next(); 499 if (node.isDropped()) { 500 garbageSize--; 501 i.remove(); 502 continue; 503 } 504 } 505 } 506 } 507 508 public void acknowledge(ConnectionContext context, Subscription sub, MessageAck ack, MessageReference node) throws IOException { 509 if (store != null && node.isPersistent()) { 510 if (ack.getMessageCount() > 0) { 514 MessageAck a = new MessageAck(); 516 ack.copy(a); 517 ack = a; 518 ack.setFirstMessageId(node.getMessageId()); 520 ack.setLastMessageId(node.getMessageId()); 521 ack.setMessageCount(1); 522 } 523 store.removeMessage(context, ack); 524 } 525 } 526 527 Message loadMessage(MessageId messageId) throws IOException { 528 Message msg = store.getMessage(messageId); 529 if (msg != null) { 530 msg.setRegionDestination(this); 531 } 532 return msg; 533 } 534 535 public String toString() { 536 int size = 0; 537 synchronized (messages) { 538 size = messages.size(); 539 } 540 return "Queue: destination=" + destination.getPhysicalName() + ", subscriptions=" + consumers.size() + ", memory=" + usageManager.getPercentUsage() 541 + "%, size=" + size + ", in flight groups=" + messageGroupOwners; 542 } 543 544 public void start() throws Exception { 545 started = true; 546 if (usageManager != null) { 547 usageManager.start(); 548 } 549 messages.start(); 550 doPageIn(false); 551 } 552 553 public void stop() throws Exception { 554 started = false; 555 if( taskRunner!=null ) { 556 taskRunner.shutdown(); 557 } 558 if(messages!=null){ 559 messages.stop(); 560 } 561 if (usageManager != null) { 562 usageManager.stop(); 563 } 564 } 565 566 public ActiveMQDestination getActiveMQDestination() { 569 return destination; 570 } 571 572 public String getDestination() { 573 return destination.getPhysicalName(); 574 } 575 576 public UsageManager getUsageManager() { 577 return usageManager; 578 } 579 580 public DestinationStatistics getDestinationStatistics() { 581 return destinationStatistics; 582 } 583 584 public MessageGroupMap getMessageGroupOwners() { 585 if (messageGroupOwners == null) { 586 messageGroupOwners = getMessageGroupMapFactory().createMessageGroupMap(); 587 } 588 return messageGroupOwners; 589 } 590 591 public DispatchPolicy getDispatchPolicy() { 592 return dispatchPolicy; 593 } 594 595 public void setDispatchPolicy(DispatchPolicy dispatchPolicy) { 596 this.dispatchPolicy = dispatchPolicy; 597 } 598 599 public DeadLetterStrategy getDeadLetterStrategy() { 600 return deadLetterStrategy; 601 } 602 603 public void setDeadLetterStrategy(DeadLetterStrategy deadLetterStrategy) { 604 this.deadLetterStrategy = deadLetterStrategy; 605 } 606 607 public MessageGroupMapFactory getMessageGroupMapFactory() { 608 return messageGroupMapFactory; 609 } 610 611 public void setMessageGroupMapFactory(MessageGroupMapFactory messageGroupMapFactory) { 612 this.messageGroupMapFactory = messageGroupMapFactory; 613 } 614 615 public String getName() { 616 return getActiveMQDestination().getPhysicalName(); 617 } 618 619 public PendingMessageCursor getMessages(){ 620 return this.messages; 621 } 622 public void setMessages(PendingMessageCursor messages){ 623 this.messages=messages; 624 } 625 626 private MessageReference createMessageReference(Message message) { 629 MessageReference result = new IndirectMessageReference(this, store, message); 630 result.decrementReferenceCount(); 631 return result; 632 } 633 634 635 private int calcHighestSubscriptionPriority() { 636 int rc = Integer.MIN_VALUE; 637 synchronized (consumers) { 638 for (Iterator iter = consumers.iterator(); iter.hasNext();) { 639 Subscription sub = (Subscription) iter.next(); 640 if (sub.getConsumerInfo().getPriority() > rc) { 641 rc = sub.getConsumerInfo().getPriority(); 642 } 643 } 644 } 645 return rc; 646 } 647 648 public MessageStore getMessageStore() { 649 return store; 650 } 651 652 public Message[] browse() { 653 ArrayList l = new ArrayList (); 654 try{ 655 doPageIn(true); 656 }catch(Exception e){ 657 log.error("caught an exception browsing " + this,e); 658 } 659 synchronized(pagedInMessages) { 660 for (Iterator i = pagedInMessages.iterator();i.hasNext();) { 661 MessageReference r = (MessageReference)i.next(); 662 r.incrementReferenceCount(); 663 try { 664 Message m = r.getMessage(); 665 if (m != null) { 666 l.add(m); 667 } 668 }catch(IOException e){ 669 log.error("caught an exception browsing " + this,e); 670 } 671 finally { 672 r.decrementReferenceCount(); 673 } 674 } 675 } 676 synchronized(messages){ 677 try{ 678 messages.reset(); 679 while(messages.hasNext()){ 680 try{ 681 MessageReference r=messages.next(); 682 r.incrementReferenceCount(); 683 try{ 684 Message m=r.getMessage(); 685 if(m!=null){ 686 l.add(m); 687 } 688 }finally{ 689 r.decrementReferenceCount(); 690 } 691 }catch(IOException e){ 692 log.error("caught an exception brwsing "+this,e); 693 } 694 } 695 }finally{ 696 messages.release(); 697 } 698 } 699 700 return (Message[]) l.toArray(new Message[l.size()]); 701 } 702 703 public Message getMessage(String messageId){ 704 synchronized(messages){ 705 try{ 706 messages.reset(); 707 while(messages.hasNext()){ 708 try{ 709 MessageReference r=messages.next(); 710 if(messageId.equals(r.getMessageId().toString())){ 711 r.incrementReferenceCount(); 712 try{ 713 Message m=r.getMessage(); 714 if(m!=null){ 715 return m; 716 } 717 }finally{ 718 r.decrementReferenceCount(); 719 } 720 break; 721 } 722 }catch(IOException e){ 723 log.error("got an exception retrieving message "+messageId); 724 } 725 } 726 }finally{ 727 messages.release(); 728 } 729 } 730 return null; 731 } 732 733 public void purge() throws Exception { 734 735 pageInMessages(); 736 737 synchronized (pagedInMessages) { 738 ConnectionContext c = createConnectionContext(); 739 for(Iterator i = pagedInMessages.iterator(); i.hasNext();){ 740 try { 741 QueueMessageReference r = (QueueMessageReference) i.next(); 742 743 if (r.lock(LockOwner.HIGH_PRIORITY_LOCK_OWNER)) { 745 MessageAck ack = new MessageAck(); 746 ack.setAckType(MessageAck.STANDARD_ACK_TYPE); 747 ack.setDestination(destination); 748 ack.setMessageID(r.getMessageId()); 749 acknowledge(c, null, ack, r); 750 r.drop(); 751 dropEvent(true); 752 } 753 } 754 catch (IOException e) { 755 } 756 } 757 758 gc(); 761 } 762 } 763 764 765 768 public boolean removeMessage(String messageId) throws Exception { 769 return removeMatchingMessages(createMessageIdFilter(messageId), 1) > 0; 770 } 771 772 777 public int removeMatchingMessages(String selector) throws Exception { 778 return removeMatchingMessages(selector, -1); 779 } 780 781 786 public int removeMatchingMessages(String selector, int maximumMessages) throws Exception { 787 return removeMatchingMessages(createSelectorFilter(selector), maximumMessages); 788 } 789 790 795 public int removeMatchingMessages(MessageReferenceFilter filter, int maximumMessages) throws Exception { 796 pageInMessages(); 797 int counter = 0; 798 synchronized (pagedInMessages) { 799 ConnectionContext c = createConnectionContext(); 800 for(Iterator i = pagedInMessages.iterator(); i.hasNext();) { 801 IndirectMessageReference r = (IndirectMessageReference) i.next(); 802 if (filter.evaluate(c, r)) { 803 removeMessage(c, r); 804 if (++counter >= maximumMessages && maximumMessages > 0) { 805 break; 806 } 807 808 } 809 } 810 } 811 return counter; 812 } 813 814 817 public boolean copyMessageTo(ConnectionContext context, String messageId, ActiveMQDestination dest) throws Exception { 818 return copyMatchingMessages(context, createMessageIdFilter(messageId), dest, 1) > 0; 819 } 820 821 826 public int copyMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest) throws Exception { 827 return copyMatchingMessagesTo(context, selector, dest, -1); 828 } 829 830 835 public int copyMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest, int maximumMessages) throws Exception { 836 return copyMatchingMessages(context, createSelectorFilter(selector), dest, maximumMessages); 837 } 838 839 844 public int copyMatchingMessages(ConnectionContext context, MessageReferenceFilter filter, ActiveMQDestination dest, int maximumMessages) throws Exception { 845 pageInMessages(); 846 int counter = 0; 847 synchronized (pagedInMessages) { 848 for(Iterator i = pagedInMessages.iterator(); i.hasNext();) { 849 MessageReference r = (MessageReference) i.next(); 850 if (filter.evaluate(context, r)) { 851 r.incrementReferenceCount(); 852 try { 853 Message m = r.getMessage(); 854 BrokerSupport.resend(context, m, dest); 855 if (++counter >= maximumMessages && maximumMessages > 0) { 856 break; 857 } 858 } 859 finally { 860 r.decrementReferenceCount(); 861 } 862 } 863 } 864 } 865 return counter; 866 } 867 868 871 public boolean moveMessageTo(ConnectionContext context, String messageId, ActiveMQDestination dest) throws Exception { 872 return moveMatchingMessagesTo(context, createMessageIdFilter(messageId), dest, 1) > 0; 873 } 874 875 880 public int moveMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest) throws Exception { 881 return moveMatchingMessagesTo(context, selector, dest, -1); 882 } 883 884 887 public int moveMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest, int maximumMessages) throws Exception { 888 return moveMatchingMessagesTo(context, createSelectorFilter(selector), dest, maximumMessages); 889 } 890 891 894 public int moveMatchingMessagesTo(ConnectionContext context, MessageReferenceFilter filter, ActiveMQDestination dest, int maximumMessages) throws Exception { 895 pageInMessages(); 896 int counter = 0; 897 synchronized (pagedInMessages) { 898 for(Iterator i = pagedInMessages.iterator(); i.hasNext();) { 899 IndirectMessageReference r = (IndirectMessageReference) i.next(); 900 if (filter.evaluate(context, r)) { 901 if (lockMessage(r)) { 903 r.incrementReferenceCount(); 904 try { 905 Message m = r.getMessage(); 906 BrokerSupport.resend(context, m, dest); 907 removeMessage(context, r); 908 if (++counter >= maximumMessages && maximumMessages > 0) { 909 break; 910 } 911 } 912 finally { 913 r.decrementReferenceCount(); 914 } 915 } 916 } 917 } 918 } 919 return counter; 920 } 921 922 926 public boolean iterate(){ 927 try{ 928 pageInMessages(false); 929 }catch(Exception e){ 930 log.error("Failed to page in more queue messages ",e); 931 } 932 return false; 933 } 934 935 protected MessageReferenceFilter createMessageIdFilter(final String messageId) { 936 return new MessageReferenceFilter() { 937 public boolean evaluate(ConnectionContext context, MessageReference r) { 938 return messageId.equals(r.getMessageId().toString()); 939 } 940 }; 941 } 942 943 protected MessageReferenceFilter createSelectorFilter(String selector) throws InvalidSelectorException { 944 final BooleanExpression selectorExpression = new SelectorParser().parse(selector); 945 946 return new MessageReferenceFilter() { 947 public boolean evaluate(ConnectionContext context, MessageReference r) throws JMSException { 948 MessageEvaluationContext messageEvaluationContext = context.getMessageEvaluationContext(); 949 950 messageEvaluationContext.setMessageReference(r); 951 if (messageEvaluationContext.getDestination() == null) { 952 messageEvaluationContext.setDestination(getActiveMQDestination()); 953 } 954 955 return selectorExpression.matches(messageEvaluationContext); 956 } 957 }; 958 } 959 960 961 protected void removeMessage(ConnectionContext c, IndirectMessageReference r) throws IOException { 962 MessageAck ack = new MessageAck(); 963 ack.setAckType(MessageAck.STANDARD_ACK_TYPE); 964 ack.setDestination(destination); 965 ack.setMessageID(r.getMessageId()); 966 acknowledge(c, null, ack, r); 967 r.drop(); 968 dropEvent(); 969 } 970 971 protected boolean lockMessage(IndirectMessageReference r) { 972 return r.lock(LockOwner.HIGH_PRIORITY_LOCK_OWNER); 973 } 974 975 protected ConnectionContext createConnectionContext() { 976 ConnectionContext answer = new ConnectionContext(); 977 answer.getMessageEvaluationContext().setDestination(getActiveMQDestination()); 978 return answer; 979 } 980 981 982 private void sendMessage(final ConnectionContext context,Message msg) throws Exception { 983 synchronized(messages){ 984 messages.addMessageLast(msg); 985 } 986 destinationStatistics.getEnqueues().increment(); 987 destinationStatistics.getMessages().increment(); 988 pageInMessages(false); 989 } 990 991 private List doPageIn() throws Exception { 992 return doPageIn(true); 993 } 994 995 private List doPageIn(boolean force) throws Exception { 996 final int toPageIn=maximumPagedInMessages-pagedInMessages.size(); 997 List result=null; 998 if((force||!consumers.isEmpty())&&toPageIn>0){ 999 messages.setMaxBatchSize(toPageIn); 1000 try{ 1001 dispatchValve.increment(); 1002 int count=0; 1003 result=new ArrayList (toPageIn); 1004 synchronized(messages){ 1005 try{ 1006 messages.reset(); 1007 while(messages.hasNext()&&count<toPageIn){ 1008 MessageReference node=messages.next(); 1009 messages.remove(); 1010 if(!node.isExpired()){ 1011 node=createMessageReference(node.getMessage()); 1012 result.add(node); 1013 count++; 1014 }else{ 1015 if (log.isDebugEnabled()) { 1016 log.debug("Expired message: " + node); 1017 } 1018 } 1019 } 1020 }finally{ 1021 messages.release(); 1022 } 1023 } 1024 synchronized(pagedInMessages){ 1025 pagedInMessages.addAll(result); 1026 } 1027 }finally{ 1028 queueMsgConext.clear(); 1029 dispatchValve.decrement(); 1030 } 1031 } 1032 return result; 1033 } 1034 1035 private void doDispatch(List list) throws Exception { 1036 if(list!=null&&!list.isEmpty()){ 1037 try{ 1038 dispatchValve.increment(); 1039 for(int i=0;i<list.size();i++){ 1040 MessageReference node=(MessageReference)list.get(i); 1041 queueMsgConext.setDestination(destination); 1042 queueMsgConext.setMessageReference(node); 1043 dispatchPolicy.dispatch(node,queueMsgConext,consumers); 1044 } 1045 }finally{ 1046 queueMsgConext.clear(); 1047 dispatchValve.decrement(); 1048 } 1049 } 1050 } 1051 1052 private void pageInMessages() throws Exception { 1053 pageInMessages(true); 1054 } 1055 private void pageInMessages(boolean force) throws Exception { 1056 synchronized(doDispatchMutex) { 1057 doDispatch(doPageIn(force)); 1058 } 1059 } 1060 1061 1062} 1063 | Popular Tags |