1 46 package org.mr.kernel.services.queues; 47 48 import java.io.IOException ; 49 import java.util.ArrayList ; 50 import java.util.Enumeration ; 51 import java.util.HashMap ; 52 import java.util.Iterator ; 53 import java.util.LinkedList ; 54 import java.util.List ; 55 import java.util.Map ; 56 57 import javax.jms.JMSException ; 58 59 import org.apache.commons.logging.Log; 60 import org.apache.commons.logging.LogFactory; 61 import org.mr.MantaAgent; 62 import org.mr.MantaAgentConstants; 63 import org.mr.MantaException; 64 import org.mr.api.jms.MantaConnection; 65 import org.mr.api.jms.MantaMessage; 66 import org.mr.api.jms.MantaTextMessage; 67 import org.mr.core.net.MantaAddress; 68 import org.mr.core.persistent.PersistentMap; 69 import org.mr.core.protocol.MantaBusMessage; 70 import org.mr.core.protocol.MantaBusMessageConsts; 71 import org.mr.core.protocol.MantaBusMessageUtil; 72 import org.mr.core.util.PrioritizedList; 73 import org.mr.core.util.SystemTime; 74 import org.mr.core.util.byteable.ByteableList; 75 import org.mr.kernel.delivery.DeliveryAckListener; 76 import org.mr.kernel.delivery.DeliveryAckNotifier; 77 import org.mr.kernel.delivery.PostOffice; 78 import org.mr.kernel.services.DeadLetterHandler; 79 import org.mr.kernel.services.MantaService; 80 import org.mr.kernel.services.ServiceActorControlCenter; 81 import org.mr.kernel.services.ServiceActorStatusListener; 82 import org.mr.kernel.services.ServiceConsumer; 83 import org.mr.kernel.services.ServiceProducer; 84 85 92 class QueueService extends AbstractQueueService implements DeliveryAckListener, ServiceActorStatusListener, QueueServiceMBean{ 93 public static final int throttleDelay = 5; 99 100 protected Log log; 101 private PrioritizedList unsentMessages; 102 private LinkedList sentMessages; 103 private Map savedMessages; 104 private QueueSubscriberManager subscriberManager; 105 private boolean active; 106 private QueueDispatcher dispatch; 107 private LinkedList queueListeners ; 109 114 private ServiceConsumer currentServiceConsumer = null; 115 private Object currentServiceConsumerLock = null; 116 117 private QueueMaster queueMaster; 118 119 private boolean iAmQueueMaster = false; 120 121 private Object queueMasterLockObject = new Object (); 122 123 private DeliveryAckNotifier ackNotifier; 124 125 private static long maxQueueSize = Long.MAX_VALUE ; 126 private boolean pause; 130 private Object pauseLockObject = new Object (); 131 private boolean isTempQueue = false; 132 133 134 138 public QueueService(String serviceName ) { 139 super(serviceName); 140 log=LogFactory.getLog("QueueService"); 141 isTempQueue = getServiceName().startsWith(MantaConnection.TMP_DESTINATION_PREFIX); 142 subscriberManager = new QueueSubscriberManager(this); 143 ackNotifier = MantaAgent.getInstance().getSingletonRepository().getDeliveryAckNotifier(); 144 maxQueueSize = MantaAgent.getInstance().getSingletonRepository().getConfigManager().getLongProperty("jms.max_queue_size", 1000000); 145 overflowStrategy = MantaAgent.getInstance().getSingletonRepository().getConfigManager().getIntProperty("jms.queue_overflow_strategy",2); 146 if(!isTempQueue){ 147 try { 148 MantaAgent.getInstance().getSingletonRepository().getMantaJMXManagment().addManagedObject(this, "MantaRay:queue="+this.getServiceName()); 149 } catch (MantaException e) { 150 if(log.isErrorEnabled()){ 151 log.error("Could not create the JMX MBean 'MantaRay:queue="+this.getServiceName()+"'.",e); 152 } 153 } 154 } 155 156 157 } 158 159 162 public boolean isPaused(){ 163 return this.pause; 164 } 165 166 171 public void pause(){ 172 synchronized(pauseLockObject){ 173 this.pause = true; 174 } 175 } 176 177 180 public void resume(){ 181 synchronized(pauseLockObject){ 182 this.pause = false; 183 pauseLockObject.notifyAll(); 184 } 185 } 186 187 191 public void purge(){ 192 if(unsentMessages != null) 193 this.unsentMessages.clear(); 194 if(sentMessages != null) 195 this.sentMessages.clear(); 196 if(savedMessages != null) 197 this.savedMessages.clear(); 198 } 199 200 205 public void close() throws MantaException{ 206 if(dispatch !=null){ 207 dispatch.stopIt(); 208 } 209 if(iAmQueueMaster){ 210 MantaAgent.getInstance().recallService(this.getQueueMaster()); 211 ServiceActorControlCenter.removeConsumerStatusListeners(this); 212 synchronized (unsentMessages) { 213 Iterator i = sentMessages.iterator(); 214 while(i.hasNext()){ 215 MantaBusMessage msg = (MantaBusMessage) i.next(); 216 ackNotifier.removeTempListener(msg); 217 ackNotifier=null; 218 } 219 220 } 221 } 222 this.active = false; 223 purge(); 224 225 } 226 227 231 public List examineMessages(){ 232 ArrayList list = new ArrayList (); 233 ByteableList underlineCopy =new ByteableList(); 234 underlineCopy.addAll(unsentMessages) ; 235 236 int size = underlineCopy.size(); 237 for (int i = 0; i < size; i++) { 238 HashMap details = new HashMap (); 239 MantaBusMessage msg =(MantaBusMessage) underlineCopy.get(i); 240 MantaMessage payload = (MantaMessage) msg.getPayload(); 241 try { 242 243 String id = payload.getJMSMessageID(); 244 details.put(QueueServiceMBean.MESSAGE_ID, id); 245 if(payload instanceof MantaTextMessage){ 246 details.put(QueueServiceMBean.MESSAGE_TEXT,((MantaTextMessage)payload).getText()); 247 } 248 HashMap properties = new HashMap (); 249 Enumeration propNames = payload.getPropertyNames(); 250 while(propNames.hasMoreElements()){ 251 String key = (String ) propNames.nextElement(); 252 properties.put(key, payload.getStringProperty(key)); 253 } 254 details.put(QueueServiceMBean.MESSAGE_PROPERTIES, properties); 255 256 HashMap headers = new HashMap (); 257 headers.put("JMSCorrelationID",String.valueOf(payload.getJMSCorrelationID())); 258 headers.put("JMSDestination",String.valueOf(payload.getJMSDestination())); 259 headers.put("JMSReplyTo",String.valueOf(payload.getJMSReplyTo())); 260 headers.put("JMSType",String.valueOf(payload.getJMSType())); 261 headers.put("JMSDeliveryMode",String.valueOf(payload.getJMSDeliveryMode()) ); 262 headers.put("JMSExpiration",String.valueOf(payload.getJMSExpiration())); 263 headers.put("JMSPriority",String.valueOf(payload.getJMSPriority())); 264 details.put(QueueServiceMBean.MESSAGE_HEADERS, headers); 265 } catch (JMSException e) { 266 e.printStackTrace(); 268 } 269 list.add(details); 270 } 271 return list; 272 } 273 274 286 290 public long getMaxQueueSize(){ 291 return maxQueueSize; 292 } 293 294 297 public byte getServiceType() { 298 return super.SERVICE_TYPE_QUEUE; 299 } 300 301 302 303 304 308 public synchronized void active(){ 309 if(unsentMessages == null){ 310 currentServiceConsumerLock = new Object (); 311 boolean persistent = this.getPersistentMode() ==MantaAgentConstants.PERSISTENT; 312 if(!isTempQueue){ 313 savedMessages = new PersistentMap("queueService_"+this.getServiceName(),persistent,true); 314 }else{ 315 savedMessages = new HashMap (); 316 } 317 318 unsentMessages = new PrioritizedList(MantaAgentConstants.TOTAL_PRIORITIES); 319 sentMessages = new LinkedList (); 320 queueListeners = new LinkedList (); 321 active =true; 322 ServiceActorControlCenter.addConsumerStatusListeners(this); 323 recover(); 324 dispatch = new QueueDispatcher(this); 325 dispatch.start(); 326 } 327 328 } 330 334 public void waitForListeners() throws InterruptedException { 335 synchronized(queueListeners){ 336 if(queueListeners.size()>0) 337 return; 338 queueListeners.wait(); 339 return; 340 } 341 } 342 343 344 349 protected void registerReceiverToQueue( ServiceConsumer consumer,long numberOfReceive ){ 350 351 QueueReceiver receiver = new QueueReceiver(consumer , numberOfReceive); 352 if(numberOfReceive == 0){ 353 doReceiveNoWait(receiver); 354 355 }else{ 356 doHandleReceiver(receiver); 357 } 358 360 } 362 private void doHandleReceiver(QueueReceiver receiver){ 363 synchronized(queueListeners){ 364 queueListeners.add(receiver); 365 queueListeners.notifyAll(); 366 } 367 } 369 373 private void doReceiveNoWait(QueueReceiver receiver){ 374 MantaBusMessage msg = null; 375 if(!pause){ 376 synchronized(currentServiceConsumerLock){ 377 if(currentServiceConsumer == null ||currentServiceConsumer.getId().equals(receiver.getConsumer().getId())){ 378 currentServiceConsumer = receiver.getConsumer(); 379 synchronized(unsentMessages){ 380 int size = unsentMessages.size(); 381 for (int index = 0; index < size; index++) { 382 msg =(MantaBusMessage) unsentMessages.get(index); 383 if(checkValidMessage(msg ,receiver.getConsumer() )){ 384 unsentMessages.remove(index); 386 sentMessages.addFirst(msg); 387 break; 388 }else{ 389 msg = null; 390 } 391 } 392 } 393 } 394 } 395 } 397 398 if(msg == null ){ 399 msg = MantaBusMessage.getInstance(); 400 msg.setPayload(null); 401 msg.setSource(this.queueMaster); 402 msg.setPriority(MantaAgentConstants.HIGH); 403 msg.setDeliveryMode(MantaAgentConstants.NON_PERSISTENT); 404 msg.setMessageType(MantaBusMessageConsts.MESSAGE_TYPE_CLIENT); 405 msg.addHeader(MantaBusMessageConsts.HEADER_NAME_IS_EMPTY,MantaBusMessageConsts.HEADER_VALUE_TRUE); 406 407 } 408 if(ackNotifier != null){ 409 ackNotifier.setTempListener(msg, this); 410 } 411 412 receiver.receive(msg); 413 } 415 416 420 protected void unregisterConsumerToQueue(ServiceConsumer consumer ){ 421 synchronized(unsentMessages){ 423 synchronized(currentServiceConsumerLock){ 424 synchronized(queueListeners){ 425 Iterator receivers = queueListeners.iterator(); 426 427 while (receivers.hasNext()) { 428 QueueReceiver rec = (QueueReceiver) receivers.next(); 429 if(rec.getConsumer().getId().equals(consumer.getId())){ 430 receivers.remove(); 431 } 433 } 434 435 if(currentServiceConsumer != null && 436 currentServiceConsumer.getId().equals(consumer.getId())){ 437 rollback(); 438 } 439 440 441 } 442 } 443 } 444 } 446 447 public void unregisterReceiverToQueue(ServiceConsumer consumer) { 448 synchronized(currentServiceConsumerLock){ 449 synchronized(queueListeners){ 450 Iterator receivers = queueListeners.iterator(); 451 452 while (receivers.hasNext()) { 453 QueueReceiver rec = (QueueReceiver) receivers.next(); 454 if(rec.getConsumer().getId().equals(consumer.getId())){ 455 receivers.remove(); 456 } } 460 } 461 } 462 463 } 465 466 private void rollback(){ 467 468 471 LinkedList tempCopy = new LinkedList (); 474 Iterator sentIter = sentMessages.iterator(); 475 while(sentIter.hasNext()){ 476 try { 477 MantaBusMessage rollBackMsg = (MantaBusMessage) sentIter.next(); 478 if(ackNotifier != null){ 479 ackNotifier.removeTempListener(rollBackMsg); 480 } 481 tempCopy.add(PostOffice.prepareMessageShallowCopy(rollBackMsg)); 482 } catch (IOException e) { 483 log.error("Rollback error",e); 484 485 } 486 } 487 unsentMessages.addAllToHead(tempCopy); 488 unsentMessages.notifyAll(); 489 sentMessages.clear(); 490 491 synchronized(currentServiceConsumerLock){ 492 currentServiceConsumerLock.notifyAll(); 493 currentServiceConsumer = null; 494 495 } 496 } 497 498 502 protected void sendQueueCopy(ServiceConsumer consumer ) { 503 ByteableList underlineCopy =new ByteableList(); 504 underlineCopy.addAll(unsentMessages) ; 505 506 int size = underlineCopy.size(); 507 for (int i = 0; i < size; i++) { 508 MantaBusMessage msg =(MantaBusMessage) underlineCopy.get(i); 509 if(!checkValidMessage(msg ,consumer )){ 510 underlineCopy.remove(i); 512 i--; 513 size--; 514 } 515 } 516 517 QueueReceiver receiver = new QueueReceiver(consumer, 0); 518 519 MantaBusMessage msg = MantaBusMessage.getInstance(); 520 msg.setPayload(underlineCopy); 521 msg.setPriority(MantaAgentConstants.HIGH); 522 msg.setDeliveryMode(MantaAgentConstants.NON_PERSISTENT); 523 msg.setMessageType(MantaBusMessageConsts.MESSAGE_TYPE_CLIENT); 524 525 MantaAddress address = new ServiceProducer(MantaAgent.getInstance().getAgentName(),this.getServiceName(),MantaService.SERVICE_TYPE_QUEUE); 526 msg.setSource(address); 527 528 receiver.receive(msg); 529 530 } 532 533 534 537 public boolean isActive() { 538 return active; 539 } 540 541 544 public QueueMaster getQueueMaster() { 545 return queueMaster; 546 } 547 550 public void setQueueMaster(QueueMaster queueMaster) { 551 QueueMaster oldMaster = null; 552 synchronized(queueMasterLockObject){ 553 if(this.queueMaster!=null){ 554 oldMaster = this.queueMaster; 555 } 556 this.queueMaster = queueMaster; 557 if(queueMaster != null){ 558 queueMasterLockObject.notifyAll(); 559 if(queueMaster.getAgentName().equals(MantaAgent.getInstance().getAgentName())){ 560 iAmQueueMaster = true; 561 } 562 subscriberManager.queueCoordinatorFound(queueMaster); 563 }else{ 564 if(iAmQueueMaster){ 565 iAmQueueMaster = false; 566 }else if(isTempQueue){ 567 try { 568 MantaAgent.getInstance().getSingletonRepository().getVirtualQueuesManager().closeQueue(getServiceName()); 569 } catch (MantaException e) { 570 log.error("Failed to close temp queue",e); 571 } 572 } 573 } 574 if (oldMaster != null) { 575 MantaAgent.getInstance().getSingletonRepository().getPostOffice().handleCoordinatorDown(oldMaster, this.queueMaster); 577 } 578 579 } 580 } 581 582 public boolean amIQueueMaster(){ 583 584 return iAmQueueMaster; 585 586 } 587 588 public void waitForQueueMaster(long timeToWait) throws InterruptedException { 589 synchronized(queueMasterLockObject){ 590 if(queueMaster == null){ 591 queueMasterLockObject.wait(timeToWait); 592 593 } } } 596 597 public QueueSubscriberManager getSubscriberManager() { 598 return subscriberManager; 599 } 600 601 602 605 public void doDequeue() throws InterruptedException { 606 waitForMessages(); 608 QueueReceiver receiver = findEligibleReceiver(); 609 610 if(receiver == null){ 611 synchronized(unsentMessages){ 613 synchronized(currentServiceConsumerLock){ 614 if(currentServiceConsumer !=null){ 615 if(!ServiceActorControlCenter.isConsumerUp(currentServiceConsumer)){ 616 rollback(); 617 } 618 } 619 } 620 } 621 Thread.sleep(100); 622 return; 623 } 624 boolean fed; 625 synchronized(unsentMessages){ 626 fed = feedReceiver(receiver); 627 } 628 if(receiver.getNumberOfReceive()>0){ 630 synchronized(queueListeners){ 631 if(ServiceActorControlCenter.isConsumerUp(currentServiceConsumer)){ 632 queueListeners.addLast(receiver); 633 634 }else{ 635 Thread.sleep(1500); 638 if(ServiceActorControlCenter.isConsumerUp(currentServiceConsumer)){ 639 queueListeners.addLast(receiver); 640 } 644 } 645 } 646 } 650 if(fed){ 651 synchronized(unsentMessages){ 652 if(sentMessages.size()>0){ 653 synchronized(currentServiceConsumerLock){ 654 currentServiceConsumer = receiver.getConsumer(); 655 } 656 } 657 } 658 659 }else{ 660 Thread.sleep(100); 662 } 663 } 665 private void checkNotPause() { 666 synchronized(pauseLockObject){ 667 if(this.pause){ 668 try { 669 pauseLockObject.wait(); 670 } catch (InterruptedException e) { 671 } 673 } 674 } 675 676 677 } 678 679 private QueueReceiver findEligibleReceiver() throws InterruptedException { 680 waitForListeners(); 681 checkNotPause(); 682 synchronized(currentServiceConsumerLock){ 683 684 synchronized(queueListeners){ 685 if(currentServiceConsumer ==null){ 686 try{ 687 return (QueueReceiver) queueListeners.removeFirst(); 688 }catch(Exception e){ 689 return null; 690 } 691 692 } 693 694 Iterator receivers = queueListeners.iterator(); 695 while(receivers.hasNext()){ 696 QueueReceiver receiver = (QueueReceiver) receivers.next(); 697 if(receiver.getConsumer().getId().equals(currentServiceConsumer.getId())){ 698 receivers.remove(); 699 return receiver; 700 } 701 } 702 } } 704 return null; 705 706 } 707 708 private void waitForMessages() throws InterruptedException { 709 synchronized(unsentMessages){ 710 if(unsentMessages.size()>0) 711 return; 712 unsentMessages.wait(); 713 return; 714 } 715 716 } 717 718 719 724 private boolean feedReceiver(QueueReceiver receiver) { 725 int size = unsentMessages.size(); 726 MantaBusMessage msg = null; 727 728 for (int index = 0; index < size; index++) { 729 msg =(MantaBusMessage) unsentMessages.get(index); 730 if(msg.getValidUntil() <SystemTime.gmtCurrentTimeMillis()){ 732 if(log.isInfoEnabled()){ 733 log.info("Not sending message "+msg +" msg.getValidUntil()=" +msg.getValidUntil()+ " SystemTime.gmtCurrentTimeMillis()=" +SystemTime.gmtCurrentTimeMillis()+"."); 734 } 735 unsentMessages.remove(index); 736 DeadLetterHandler.HandleDeadMessage(msg); 737 return false; 738 } 739 if(checkValidMessage(msg ,receiver.getConsumer() )){ 740 unsentMessages.remove(index); 742 sentMessages.addFirst(msg); 743 if(ackNotifier!=null) 744 ackNotifier.setTempListener(msg, this); 745 receiver.receive(msg); 746 return true; 747 } 748 } 749 750 return false; 751 } 752 753 protected void enqueue(MantaBusMessage enqueuedMessage, boolean persistent) { 754 synchronized(unsentMessages){ 755 if(!isTempQueue){ 756 ((PersistentMap)savedMessages).put(enqueuedMessage.getMessageId(),enqueuedMessage, persistent); 757 }else{ 758 savedMessages.put(enqueuedMessage.getMessageId(),enqueuedMessage); 759 } 760 761 unsentMessages.add(enqueuedMessage); 762 unsentMessages.notifyAll(); 763 } 764 } 765 766 770 public boolean isOverflow() { 771 int size = unsentMessages.size(); 772 if(size < maxQueueSize ){ 773 return false; 774 } 775 return true; 776 777 } 778 779 782 public void gotAck(MantaBusMessage msg, MantaAddress source) { 783 synchronized(unsentMessages){ 784 savedMessages.remove(msg.getMessageId()); 785 sentMessages.remove(msg); 786 if(sentMessages.size() == 0){ 787 synchronized( currentServiceConsumerLock){ 788 if(currentServiceConsumer!= null ){ 789 currentServiceConsumerLock.notifyAll(); 790 currentServiceConsumer = null; 791 } 792 } 793 } 794 } 795 } 797 801 public void gotAckReject(MantaBusMessage msg, MantaAddress source) { 802 synchronized(unsentMessages) { 803 msg.setDeliveryCount(msg.getDeliveryCount()+1); 804 unsentMessages.addToHead(msg); 805 sentMessages.remove(msg); 806 if (sentMessages.size() == 0) { 807 synchronized (currentServiceConsumerLock) { 808 if (currentServiceConsumer!= null) { 809 currentServiceConsumerLock.notifyAll(); 810 currentServiceConsumer = null; 811 } 812 } 813 } 814 } 815 } 816 817 820 public synchronized void recover() { 821 822 if(savedMessages.isEmpty()){ 823 return; 824 } 825 ArrayList tempList = new ArrayList (); 826 synchronized(savedMessages){ 827 tempList.addAll(savedMessages.values()); 828 } 829 MantaBusMessageUtil.sortMessagesByEnqueueTime(tempList,VirtualQueuesManager.ENQUEUE_TIME); 831 833 int size = tempList.size(); 834 for (int i = 0; i < size; i++) { 835 MantaBusMessage msg = (MantaBusMessage)tempList.get(i); 836 long now = SystemTime.gmtCurrentTimeMillis() ; 837 if( (msg.getValidUntil() <now ) ){ 838 if(log.isInfoEnabled()){ 839 log.info("Not sending message "+msg +" msg.getValidUntil()=" +msg.getValidUntil()+ " SystemTime.gmtCurrentTimeMillis()=" +SystemTime.gmtCurrentTimeMillis()+"."); 840 } 841 savedMessages.remove(msg.getMessageId()); 842 DeadLetterHandler.HandleDeadMessage(msg); 843 844 } else 846 847 unsentMessages.add(msg); 849 850 } 852 } 854 855 public void handleConsumerUp(ServiceConsumer consumer) { 856 858 } 859 860 861 public void handleConsumerDown(ServiceConsumer consumer) { 862 unregisterConsumerToQueue(consumer); 863 864 } 865 866 867 868 public int getUnsentCount() { 869 return unsentMessages.size(); 870 } 871 872 873 874 public String toString(){ 875 StringBuffer buff = new StringBuffer (); 876 buff.append(" service{"); 877 buff.append(" service name="); 878 buff.append(logicalName); 879 buff.append(" serviceType="); 880 buff.append(getServiceType()); 881 buff.append(" consumers="); 882 buff.append(consumers); 883 buff.append(" producers="); 884 buff.append(producers); 885 buff.append(" coordinator="); 886 buff.append(queueMaster); 887 buff.append(" persistentMode="); 888 buff.append(super.getPersistentMode()); 889 buff.append(" }"); 890 return buff.toString(); 891 } 892 893 894 public boolean isTempQueue() { 895 return isTempQueue; 896 } 897 } 898 | Popular Tags |