1 24 package org.mr.kernel.services.queues; 25 26 import java.io.IOException ; 27 import java.util.ArrayList ; 28 import java.util.Enumeration ; 29 import java.util.HashMap ; 30 import java.util.Iterator ; 31 import java.util.LinkedList ; 32 import java.util.List ; 33 import java.util.Map ; 34 35 import javax.jms.JMSException ; 36 37 import org.apache.commons.logging.Log; 38 import org.apache.commons.logging.LogFactory; 39 import org.mr.MantaAgent; 40 import org.mr.MantaAgentConstants; 41 import org.mr.MantaException; 42 import org.mr.api.jms.MantaConnection; 43 import org.mr.api.jms.MantaMessage; 44 import org.mr.api.jms.MantaTextMessage; 45 import org.mr.core.net.MantaAddress; 46 import org.mr.core.persistent.PersistentMap; 47 import org.mr.core.protocol.MantaBusMessage; 48 import org.mr.core.protocol.MantaBusMessageConsts; 49 import org.mr.core.protocol.MantaBusMessageUtil; 50 import org.mr.core.util.PrioritizedList; 51 import org.mr.core.util.SystemTime; 52 import org.mr.core.util.byteable.ByteableList; 53 import org.mr.kernel.delivery.DeliveryAckListener; 54 import org.mr.kernel.delivery.DeliveryAckNotifier; 55 import org.mr.kernel.delivery.PostOffice; 56 import org.mr.kernel.services.MantaService; 57 import org.mr.kernel.services.ServiceActorControlCenter; 58 import org.mr.kernel.services.ServiceActorStatusListener; 59 import org.mr.kernel.services.ServiceConsumer; 60 import org.mr.kernel.services.ServiceProducer; 61 import org.mr.kernel.services.DeadLetterHandler; 62 63 70 81 82 class ForeignQueueService extends AbstractQueueService implements DeliveryAckListener, ServiceActorStatusListener, QueueServiceMBean { 83 public static final int throttleDelay = 5; 89 90 protected Log log; 91 private LinkedList sentMessages; 93 private QueueSubscriberManager subscriberManager; 95 private boolean active; 96 private QueueDispatcher dispatch; 97 private LinkedList queueListeners; 99 104 private ServiceConsumer currentServiceConsumer = null; 105 private Object currentServiceConsumerLock = null; 106 107 private QueueMaster queueMaster; 108 109 private boolean iAmQueueMaster = false; 110 111 private Object queueMasterLockObject = new Object (); 112 113 private DeliveryAckNotifier ackNotifier; 114 115 private static long maxQueueSize = Long.MAX_VALUE; 116 private boolean pause; 120 private Object pauseLockObject = new Object (); 121 private boolean isTempQueue = false; 122 private ForeignQueueImpl impl; 123 124 129 public ForeignQueueService(String serviceName, ForeignQueueImpl impl) { 130 super(serviceName); 131 this.impl = impl; 132 log = LogFactory.getLog("QueueService"); 133 isTempQueue = getServiceName().startsWith(MantaConnection.TMP_DESTINATION_PREFIX); 134 subscriberManager = new QueueSubscriberManager(this); 135 ackNotifier = MantaAgent.getInstance().getSingletonRepository().getDeliveryAckNotifier(); 136 maxQueueSize = MantaAgent.getInstance().getSingletonRepository().getConfigManager().getLongProperty("jms.max_queue_size", 1000000); 137 overflowStrategy = MantaAgent.getInstance().getSingletonRepository().getConfigManager().getIntProperty("jms.queue_overflow_strategy", 2); 138 if (!isTempQueue) { 139 } 147 148 149 } 150 151 154 public boolean isPaused() { 155 return this.pause; 156 } 157 158 163 public void pause() { 164 synchronized (pauseLockObject) { 165 this.pause = true; 166 } 167 } 168 169 172 public void resume() { 173 synchronized (pauseLockObject) { 174 this.pause = false; 175 pauseLockObject.notifyAll(); 176 } 177 } 178 179 183 public void purge() { 184 } 192 193 198 public void close() throws MantaException { 199 if (dispatch != null) { 200 dispatch.stopIt(); 201 } 202 if (iAmQueueMaster) { 203 MantaAgent.getInstance().recallService(this.getQueueMaster()); 204 ServiceActorControlCenter.removeConsumerStatusListeners(this); 205 synchronized (this.impl) { 207 Iterator i = sentMessages.iterator(); 208 while (i.hasNext()) { 209 MantaBusMessage msg = (MantaBusMessage) i.next(); 210 ackNotifier.removeTempListener(msg); 211 ackNotifier = null; 212 } 213 214 } 215 } 216 this.active = false; 217 purge(); 218 219 } 220 221 224 public List examineMessages() { 225 System.err.println("ForeignQueueService: not implemented: " + 226 "examineMessages"); 227 231 238 251 return new ArrayList (); 268 } 269 270 272 284 286 289 public long getMaxQueueSize() { 290 return maxQueueSize; 291 } 292 293 296 public byte getServiceType() { 297 return super.SERVICE_TYPE_QUEUE; 298 } 299 300 301 305 public synchronized void active() { 306 if (!this.active) { 308 currentServiceConsumerLock = new Object (); 309 boolean persistent = this.getPersistentMode() == MantaAgentConstants.PERSISTENT; 310 316 this.impl.init(); 318 sentMessages = new LinkedList (); 319 queueListeners = new LinkedList (); 320 active = true; 321 ServiceActorControlCenter.addConsumerStatusListeners(this); 322 recover(); 323 dispatch = new QueueDispatcher(this); 324 dispatch.start(); 325 } 326 327 } 329 334 public void waitForListeners() throws InterruptedException { 335 synchronized (queueListeners) { 336 if (queueListeners.size() > 0) 337 return; 338 queueListeners.wait(); 339 return; 340 } 341 } 342 343 344 350 protected void registerReceiverToQueue(ServiceConsumer consumer, long numberOfReceive) { 351 352 QueueReceiver receiver = new QueueReceiver(consumer, numberOfReceive); 353 if (numberOfReceive == 0) { 354 doReceiveNoWait(receiver); 355 356 } else { 357 doHandleReceiver(receiver); 358 } 359 361 } 363 private void doHandleReceiver(QueueReceiver receiver) { 364 synchronized (queueListeners) { 365 queueListeners.add(receiver); 366 queueListeners.notifyAll(); 367 } 368 } 370 375 private void doReceiveNoWait(QueueReceiver receiver) { 376 MantaBusMessage msg = null; 377 if (!pause) { 378 synchronized (currentServiceConsumerLock) { 379 if (currentServiceConsumer == null || currentServiceConsumer.getId().equals(receiver.getConsumer().getId())) { 380 currentServiceConsumer = receiver.getConsumer(); 381 synchronized (this.impl) { 383 msg = this.impl.receiveNoWait(); 384 msg.setSource(this.queueMaster); 385 System.out.println("doReceiveNoWait: master = " + 386 queueMaster); 387 } 401 } 402 } 403 } 405 406 if (msg == null) { 407 msg = MantaBusMessage.getInstance(); 408 msg.setPayload(null); 409 msg.setSource(this.queueMaster); 410 msg.setPriority(MantaAgentConstants.HIGH); 411 msg.setDeliveryMode(MantaAgentConstants.NON_PERSISTENT); 412 msg.setMessageType(MantaBusMessageConsts.MESSAGE_TYPE_CLIENT); 413 msg.addHeader(MantaBusMessageConsts.HEADER_NAME_IS_EMPTY, MantaBusMessageConsts.HEADER_VALUE_TRUE); 414 415 } 416 if (ackNotifier != null) { 417 ackNotifier.setTempListener(msg, this); 418 } 419 420 receiver.receive(msg); 421 } 423 424 429 protected void unregisterConsumerToQueue(ServiceConsumer consumer) { 430 synchronized (this.impl) { 433 synchronized (currentServiceConsumerLock) { 434 synchronized (queueListeners) { 435 Iterator receivers = queueListeners.iterator(); 436 437 while (receivers.hasNext()) { 438 QueueReceiver rec = (QueueReceiver) receivers.next(); 439 if (rec.getConsumer().getId().equals(consumer.getId())) { 440 receivers.remove(); 441 } 443 } 444 445 if (currentServiceConsumer != null && 446 currentServiceConsumer.getId().equals(consumer.getId())) { 447 rollback(); 448 } 449 450 451 } 452 } 453 } 454 } 456 457 public void unregisterReceiverToQueue(ServiceConsumer consumer) { 458 synchronized (currentServiceConsumerLock) { 459 synchronized (queueListeners) { 460 Iterator receivers = queueListeners.iterator(); 461 462 while (receivers.hasNext()) { 463 QueueReceiver rec = (QueueReceiver) receivers.next(); 464 if (rec.getConsumer().getId().equals(consumer.getId())) { 465 receivers.remove(); 466 } } 470 } 471 } 472 473 } 475 476 private void rollback() { 477 478 481 Iterator sentIter = sentMessages.iterator(); 484 boolean shouldNotify = false; 485 while (sentIter.hasNext()) { 486 try { 487 MantaBusMessage rollBackMsg = (MantaBusMessage) sentIter.next(); 488 if (ackNotifier != null) { 489 ackNotifier.removeTempListener(rollBackMsg); 490 } 491 this.impl.sendToHead(PostOffice. 492 prepareMessageShallowCopy(rollBackMsg)); 493 shouldNotify = true; 494 } catch (IOException e) { 495 log.error("Rollback error", e); 496 497 } 498 } 499 if (shouldNotify) { 501 this.impl.notifyAll(); 502 } 503 sentMessages.clear(); 504 505 synchronized (currentServiceConsumerLock) { 506 currentServiceConsumerLock.notifyAll(); 507 currentServiceConsumer = null; 508 509 } 510 } 511 512 517 protected void sendQueueCopy(ServiceConsumer consumer) { 518 ByteableList underlineCopy = new ByteableList(); 519 underlineCopy.addAll(this.impl.getCopy()); 520 521 int size = underlineCopy.size(); 522 for (int i = 0; i < size; i++) { 523 MantaBusMessage msg = (MantaBusMessage) underlineCopy.get(i); 524 if (!checkValidMessage(msg, consumer)) { 525 underlineCopy.remove(i); 527 i--; 528 size--; 529 } 530 } 531 532 QueueReceiver receiver = new QueueReceiver(consumer, 0); 533 534 MantaBusMessage msg = MantaBusMessage.getInstance(); 535 msg.setPayload(underlineCopy); 536 msg.setPriority(MantaAgentConstants.HIGH); 537 msg.setDeliveryMode(MantaAgentConstants.NON_PERSISTENT); 538 msg.setMessageType(MantaBusMessageConsts.MESSAGE_TYPE_CLIENT); 539 540 MantaAddress address = new ServiceProducer(MantaAgent.getInstance().getAgentName(), this.getServiceName(), MantaService.SERVICE_TYPE_QUEUE); 541 msg.setSource(address); 542 543 receiver.receive(msg); 544 545 } 547 548 551 public boolean isActive() { 552 return active; 553 } 554 555 558 public QueueMaster getQueueMaster() { 559 return queueMaster; 560 } 561 562 565 public void setQueueMaster(QueueMaster queueMaster) { 566 synchronized (queueMasterLockObject) { 567 if (this.queueMaster != null) { 568 MantaAgent.getInstance().getSingletonRepository().getPostOffice().handleCoordinatorDown(this.queueMaster, queueMaster); 570 } 571 this.queueMaster = queueMaster; 572 if (queueMaster != null) { 573 queueMasterLockObject.notifyAll(); 574 if (queueMaster.getAgentName().equals(MantaAgent.getInstance().getAgentName())) { 575 iAmQueueMaster = true; 576 } 577 subscriberManager.queueCoordinatorFound(queueMaster); 578 } else { 579 if (iAmQueueMaster) { 580 iAmQueueMaster = false; 581 } else if (isTempQueue) { 582 try { 583 MantaAgent.getInstance().getSingletonRepository().getVirtualQueuesManager().closeQueue(getServiceName()); 584 } catch (MantaException e) { 585 log.error("Failed to close temp queue", e); 586 } 587 } 588 } 589 590 } 591 } 592 593 public boolean amIQueueMaster() { 594 595 return iAmQueueMaster; 596 597 } 598 599 public void waitForQueueMaster(long timeToWait) throws InterruptedException { 600 synchronized (queueMasterLockObject) { 601 if (queueMaster == null) { 602 queueMasterLockObject.wait(timeToWait); 603 604 } } } 607 608 public QueueSubscriberManager getSubscriberManager() { 609 return subscriberManager; 610 } 611 612 613 616 public void doDequeue() throws InterruptedException { 617 waitForMessages(); 619 QueueReceiver receiver = findEligibleReceiver(); 620 621 if (receiver == null) { 622 synchronized (this.impl) { 624 synchronized (currentServiceConsumerLock) { 625 if (currentServiceConsumer != null) { 626 if (!ServiceActorControlCenter.isConsumerUp(currentServiceConsumer)) { 627 rollback(); 628 } 629 } 630 } 631 } 632 Thread.sleep(100); 633 return; 634 } 635 boolean fed; 636 synchronized (this.impl) { 637 fed = feedReceiver(receiver); 638 } 639 if (receiver.getNumberOfReceive() > 0) { 641 synchronized (queueListeners) { 642 if (ServiceActorControlCenter.isConsumerUp(currentServiceConsumer)) { 643 queueListeners.addLast(receiver); 644 645 } else { 646 Thread.sleep(1500); 649 if (ServiceActorControlCenter.isConsumerUp(currentServiceConsumer)) { 650 queueListeners.addLast(receiver); 651 } 655 } 656 } 657 } 661 if (fed) { 662 synchronized (this.impl) { 663 if (sentMessages.size() > 0) { 664 synchronized (currentServiceConsumerLock) { 665 currentServiceConsumer = receiver.getConsumer(); 666 } 667 } 668 } 669 670 } else { 671 Thread.sleep(100); 673 } 674 } 676 private void checkNotPause() { 677 synchronized (pauseLockObject) { 678 if (this.pause) { 679 try { 680 pauseLockObject.wait(); 681 } catch (InterruptedException e) { 682 } 684 } 685 } 686 687 688 } 689 690 private QueueReceiver findEligibleReceiver() throws InterruptedException { 691 waitForListeners(); 692 checkNotPause(); 693 synchronized (currentServiceConsumerLock) { 694 695 synchronized (queueListeners) { 696 if (currentServiceConsumer == null) { 697 try { 698 return (QueueReceiver) queueListeners.removeFirst(); 699 } catch (Exception e) { 700 return null; 701 } 702 703 } 704 705 Iterator receivers = queueListeners.iterator(); 706 while (receivers.hasNext()) { 707 QueueReceiver receiver = (QueueReceiver) receivers.next(); 708 if (receiver.getConsumer().getId().equals(currentServiceConsumer.getId())) { 709 receivers.remove(); 710 return receiver; 711 } 712 } 713 } } 715 return null; 716 717 } 718 719 private void waitForMessages() throws InterruptedException { 720 this.impl.waitForMessages(); 721 } 722 723 724 732 private boolean feedReceiver(QueueReceiver receiver) { 733 MantaBusMessage msg = null; 734 735 msg = this.impl.receiveNoWait(); 736 msg.setSource(this.queueMaster); 737 if (msg != null) { 738 if (msg.getValidUntil() < SystemTime.gmtCurrentTimeMillis()) { 739 if (this.log.isInfoEnabled()) { 740 log.info("Not sending message " + msg + 741 " msg.getValidUntil()=" + msg.getValidUntil() + 742 " SystemTime.gmtCurrentTimeMillis()=" + 743 SystemTime.gmtCurrentTimeMillis() + "."); 744 } 745 DeadLetterHandler.HandleDeadMessage(msg); 746 return false; 747 } else { 748 sentMessages.addFirst(msg); 749 if (ackNotifier != null) { 750 ackNotifier.setTempListener(msg, this); 751 } 752 receiver.receive(msg); 753 return true; 754 } 755 } 756 757 return false; 758 } 759 760 protected void enqueue(MantaBusMessage enqueuedMessage, boolean persistent) { 761 synchronized (this.impl) { 762 this.impl.send(enqueuedMessage); 763 this.impl.notifyAll(); 764 } 765 } 766 767 772 public boolean isOverflow() { 773 return false; 774 } 775 776 779 public void gotAck(MantaBusMessage msg, MantaAddress source) { 780 synchronized (this.impl) { 781 sentMessages.remove(msg); 782 if (sentMessages.size() == 0) { 783 synchronized (currentServiceConsumerLock) { 784 if (currentServiceConsumer != null) { 785 currentServiceConsumerLock.notifyAll(); 786 currentServiceConsumer = null; 787 } 788 } 789 } 790 } 791 } 793 797 public void gotAckReject(MantaBusMessage msg, MantaAddress source) { 798 synchronized (this.impl) { 799 msg.setDeliveryCount(msg.getDeliveryCount() + 1); 800 this.impl.sendToHead(msg); 801 sentMessages.remove(msg); 802 if (sentMessages.size() == 0) { 803 synchronized (currentServiceConsumerLock) { 804 if (currentServiceConsumer != null) { 805 currentServiceConsumerLock.notifyAll(); 806 currentServiceConsumer = null; 807 } 808 } 809 } 810 } 811 } 812 813 816 public synchronized void recover() { 817 } 819 820 821 public void handleConsumerUp(ServiceConsumer consumer) { 822 } 824 825 826 public void handleConsumerDown(ServiceConsumer consumer) { 827 unregisterConsumerToQueue(consumer); 828 } 829 830 831 public int getUnsentCount() { 832 return 0; 834 } 835 836 837 public String toString() { 838 StringBuffer buff = new StringBuffer (); 839 buff.append(" service{"); 840 buff.append(" service name="); 841 buff.append(logicalName); 842 buff.append(" serviceType="); 843 buff.append(getServiceType()); 844 buff.append(" consumers="); 845 buff.append(consumers); 846 buff.append(" producers="); 847 buff.append(producers); 848 buff.append(" coordinator="); 849 buff.append(queueMaster); 850 buff.append(" persistentMode="); 851 buff.append(super.getPersistentMode()); 852 buff.append(" }"); 853 return buff.toString(); 854 } 855 856 857 public boolean isTempQueue() { 858 return isTempQueue; 859 } 860 } 861 | Popular Tags |