1 23 package org.objectweb.joram.mom.proxies; 24 25 import java.util.Enumeration ; 26 import java.util.Hashtable ; 27 import java.util.Vector ; 28 import java.io.ObjectInputStream ; 29 import java.io.ObjectOutputStream ; 30 import java.io.IOException ; 31 32 import org.objectweb.joram.mom.dest.DeadMQueueImpl; 33 import org.objectweb.joram.mom.notifications.ClientMessages; 34 import org.objectweb.joram.mom.messages.Message; 35 import org.objectweb.joram.shared.client.ConsumerMessages; 36 import org.objectweb.joram.shared.selectors.Selector; 37 38 import fr.dyade.aaa.agent.AgentId; 39 import fr.dyade.aaa.agent.Channel; 40 41 import org.objectweb.joram.shared.JoramTracing; 42 import org.objectweb.util.monolog.api.BasicLevel; 43 44 49 class ClientSubscription implements java.io.Serializable { 50 51 private AgentId proxyId; 52 53 private boolean durable; 54 55 private AgentId topicId; 56 57 private String name; 58 59 private String selector; 60 64 private AgentId dmqId; 65 69 private Integer threshold; 70 71 72 protected int nbMaxMsg = -1; 73 74 75 private Vector messageIds; 76 77 private Hashtable deliveredIds; 78 79 private Hashtable deniedMsgs; 80 81 82 private transient int contextId; 83 84 private transient int subRequestId; 85 89 private transient boolean noLocal; 90 94 private transient boolean noFiltering; 95 96 97 private transient boolean active; 98 102 private transient int requestId; 103 104 private transient boolean toListener; 105 106 private transient long requestExpTime; 107 108 112 private transient Hashtable messagesTable; 113 114 115 private transient String proxyStringId; 116 117 private transient ProxyAgentItf proxy; 118 119 135 ClientSubscription(AgentId proxyId, 136 int contextId, 137 int reqId, 138 boolean durable, 139 AgentId topicId, 140 String name, 141 String selector, 142 boolean noLocal, 143 AgentId dmqId, 144 Integer threshold, 145 Hashtable messagesTable) 146 { 147 this.proxyId = proxyId; 148 this.contextId = contextId; 149 this.subRequestId = reqId; 150 this.durable = durable; 151 this.topicId = topicId; 152 this.name = name; 153 this.selector = selector; 154 this.noLocal = noLocal; 155 this.dmqId = dmqId; 156 this.threshold = threshold; 157 this.messagesTable = messagesTable; 158 159 messageIds = new Vector (); 160 deliveredIds = new Hashtable (); 161 deniedMsgs = new Hashtable (); 162 163 noFiltering = (! noLocal) && (selector == null || selector.equals("")); 164 165 active = true; 166 requestId = -1; 167 toListener = false; 168 169 proxyStringId = proxyId.toString(); 170 171 if (JoramTracing.dbgProxy.isLoggable(BasicLevel.DEBUG)) 172 JoramTracing.dbgProxy.log(BasicLevel.DEBUG, 173 this + ": created."); 174 } 175 176 202 public String toString() 203 { 204 return "ClientSubscription" + proxyId + name; 205 } 206 207 208 209 int getContextId() 210 { 211 return contextId; 212 } 213 214 215 int getSubRequestId() 216 { 217 return subRequestId; 218 } 219 220 221 String getName() 222 { 223 return name; 224 } 225 226 227 AgentId getTopicId() 228 { 229 return topicId; 230 } 231 232 233 String getSelector() 234 { 235 return selector; 236 } 237 238 239 boolean getDurable() 240 { 241 return durable; 242 } 243 244 245 boolean getActive() 246 { 247 return active; 248 } 249 250 257 public int getNbMaxMsg() { 258 return nbMaxMsg; 259 } 260 261 267 public void setNbMaxMsg(int nbMaxMsg) { 268 this.nbMaxMsg = nbMaxMsg; 269 } 270 271 276 int getMessageCount() { 277 return messageIds.size(); 278 } 279 280 285 String [] getMessageIds() { 286 String [] res = new String [messageIds.size()]; 287 messageIds.copyInto(res); 288 return res; 289 } 290 291 void setProxyAgent(ProxyAgentItf px) { 292 proxy = px; 293 } 294 295 303 void reinitialize(String proxyStringId, 304 Hashtable messagesTable, 305 Vector persistedMessages, 306 boolean denyDeliveredMessages) 307 { 308 if (JoramTracing.dbgProxy.isLoggable(BasicLevel.DEBUG)) 309 JoramTracing.dbgProxy.log(BasicLevel.DEBUG, 310 "ClientSubscription[" + this + 311 "].reinitialize()"); 312 313 this.proxyStringId = proxyStringId; 314 this.messagesTable = messagesTable; 315 316 Message message; 318 String msgId; 319 for (Enumeration e = persistedMessages.elements(); e.hasMoreElements();) { 320 message = (Message) e.nextElement(); 321 msgId = message.getIdentifier(); 322 323 if (messageIds.contains(msgId) || deliveredIds.contains(msgId)) { 324 if (JoramTracing.dbgProxy.isLoggable(BasicLevel.DEBUG)) 325 JoramTracing.dbgProxy.log( 326 BasicLevel.DEBUG, 327 " -> contains message " + msgId); 328 message.acksCounter++; 329 message.durableAcksCounter++; 330 331 if (message.acksCounter == 1) { 332 if (JoramTracing.dbgProxy.isLoggable(BasicLevel.DEBUG)) 333 JoramTracing.dbgProxy.log( 334 BasicLevel.DEBUG, 335 " -> messagesTable.put(" + msgId + ')'); 336 messagesTable.put(msgId, message); 337 } 338 } 347 } 348 349 if (denyDeliveredMessages) { 350 deny(deliveredIds.keys()); 352 deliveredIds.clear(); 353 } 354 } 355 356 366 void reactivate(int contextId, 367 int reqId, 368 AgentId topicId, 369 String selector, 370 boolean noLocal) 371 { 372 this.contextId = contextId; 373 this.subRequestId = reqId; 374 this.topicId = topicId; 375 this.selector = selector; 376 this.noLocal = noLocal; 377 378 noFiltering = (! noLocal) && (selector == null || selector.equals("")); 379 380 active = true; 381 requestId = -1; 382 toListener = false; 383 384 save(); 386 387 if (JoramTracing.dbgProxy.isLoggable(BasicLevel.DEBUG)) 388 JoramTracing.dbgProxy.log(BasicLevel.DEBUG, 389 this + ": reactivated."); 390 } 391 392 393 void deactivate() { 394 if (JoramTracing.dbgProxy.isLoggable(BasicLevel.DEBUG)) 395 JoramTracing.dbgProxy.log(BasicLevel.DEBUG, 396 "ClientSubscription.deactivate()"); 397 398 unsetListener(); 399 unsetReceiver(); 400 active = false; 401 402 deny(deliveredIds.keys()); 404 deliveredIds.clear(); 405 406 save(); 408 409 if (JoramTracing.dbgProxy.isLoggable(BasicLevel.DEBUG)) 410 JoramTracing.dbgProxy.log(BasicLevel.DEBUG, 411 this + ": deactivated."); 412 } 413 414 void setActive(boolean active) { 415 this.active = active; 416 } 417 418 423 void setListener(int requestId) 424 { 425 this.requestId = requestId; 426 toListener = true; 427 428 if (JoramTracing.dbgProxy.isLoggable(BasicLevel.DEBUG)) 429 JoramTracing.dbgProxy.log(BasicLevel.DEBUG, 430 this + ": listener set."); 431 } 432 433 434 void unsetListener() 435 { 436 requestId = -1; 437 toListener = false; 438 439 if (JoramTracing.dbgProxy.isLoggable(BasicLevel.DEBUG)) 440 JoramTracing.dbgProxy.log(BasicLevel.DEBUG, 441 this + ": listener unset."); 442 } 443 444 450 void setReceiver(int requestId, long timeToLive) { 451 if (JoramTracing.dbgProxy.isLoggable(BasicLevel.DEBUG)) 452 JoramTracing.dbgProxy.log(BasicLevel.DEBUG, 453 this + ".setReceiver(" + requestId + 454 "," + timeToLive + ")"); 455 456 this.requestId = requestId; 457 toListener = false; 458 459 if (timeToLive > 0) 460 requestExpTime = System.currentTimeMillis() + timeToLive; 461 else 462 requestExpTime = 0; 463 } 464 465 466 void unsetReceiver() { 467 if (JoramTracing.dbgProxy.isLoggable(BasicLevel.DEBUG)) 468 JoramTracing.dbgProxy.log(BasicLevel.DEBUG, 469 this + ".unsetReceiver()"); 470 requestId = -1; 471 requestExpTime = 0; 472 } 473 474 475 void setDMQId(AgentId dmqId) 476 { 477 this.dmqId = dmqId; 478 save(); 479 } 480 481 482 void setThreshold(Integer threshold) 483 { 484 this.threshold = threshold; 485 save(); 486 } 487 488 489 493 void browseNewMessages(Vector newMessages) { 496 if (JoramTracing.dbgProxy.isLoggable(BasicLevel.DEBUG)) 497 JoramTracing.dbgProxy.log(BasicLevel.DEBUG, 498 this + ".browseNewMessages(" + 499 newMessages + ')'); 500 Message message; 502 String msgId; 503 for (Enumeration e = newMessages.elements(); e.hasMoreElements();) { 504 message = (Message) e.nextElement(); 505 msgId = message.getIdentifier(); 506 507 if (nbMaxMsg > -1 && nbMaxMsg <= messageIds.size()) { 509 ClientMessages deadMessages = new ClientMessages(); 510 deadMessages.addMessage(message.msg); 511 sendToDMQ(deadMessages); 512 continue; 513 } 514 515 if (noFiltering || 517 (Selector.matches(message.msg, selector) && 518 (! noLocal || ! msgId.startsWith(proxyId.toString().substring(1) + "c" + contextId + "m", 3)))) { 519 520 if (message.acksCounter == 0) 522 523 messagesTable.put(msgId, message); 524 525 message.acksCounter++; 526 if (durable) 527 message.durableAcksCounter++; 528 529 messageIds.add(msgId); 530 save(); 531 532 if (JoramTracing.dbgProxy.isLoggable(BasicLevel.DEBUG)) 533 JoramTracing.dbgProxy.log(BasicLevel.DEBUG, 534 this + ": added msg " + msgId 535 + " for delivery."); 536 } 537 } 538 } 539 540 543 ConsumerMessages deliver() { 544 if (JoramTracing.dbgProxy.isLoggable(BasicLevel.DEBUG)) 545 JoramTracing.dbgProxy.log( 546 BasicLevel.DEBUG, 547 "ClientSubscription[" + proxyId + ',' + 548 topicId + ',' + name + "].deliver()"); 549 550 if (requestId == -1) 552 return null; 553 554 if (! toListener 556 && requestExpTime > 0 557 && System.currentTimeMillis() >= requestExpTime) { 558 if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG)) 559 JoramTracing.dbgDestination.log(BasicLevel.DEBUG, 560 this + ": receive request " + requestId 561 + " expired."); 562 requestId = -1; 563 requestExpTime = 0; 564 return null; 565 } 566 567 String id; 568 Message message; 569 Integer deliveryAttempts = null; 570 int lastPrior = -1; 571 int insertionIndex = -1; 572 int prior; 573 Vector deliverables = new Vector (); 574 ClientMessages deadMessages = null; 575 576 if (JoramTracing.dbgProxy.isLoggable(BasicLevel.DEBUG)) 577 JoramTracing.dbgProxy.log(BasicLevel.DEBUG, 578 " -> messageIds.size() = " + messageIds.size()); 579 580 if (toListener) { 582 while (! messageIds.isEmpty()) { 584 id = (String ) messageIds.remove(0); 585 save(); 586 message = (Message) messagesTable.get(id); 587 588 if (message != null) { 590 if (message.isValid(System.currentTimeMillis())) { 592 deliveredIds.put(id, id); 593 594 deliveryAttempts = (Integer ) deniedMsgs.get(id); 596 if (deliveryAttempts == null) 597 message.msg.deliveryCount = 1; 598 else { 599 message.msg.deliveryCount = deliveryAttempts.intValue() + 1; 600 message.msg.redelivered = true; 601 } 602 603 if (lastPrior == -1 || message.getPriority() == lastPrior) 605 insertionIndex++; 606 else { 607 insertionIndex = 0; 608 while (insertionIndex < deliverables.size()) { 609 prior = 610 ((Message) deliverables.get(insertionIndex)).getPriority(); 611 if (prior >= message.getPriority()) 612 insertionIndex++; 613 else 614 break; 615 } 616 } 617 lastPrior = message.getPriority(); 618 deliverables.insertElementAt(message.msg.clone(), insertionIndex); 619 620 if (JoramTracing.dbgProxy.isLoggable(BasicLevel.DEBUG)) 621 JoramTracing.dbgProxy.log(BasicLevel.DEBUG, 622 this + ": message " + id 623 + " added for delivery."); 624 } else { 625 messagesTable.remove(id); 628 if (durable) 630 message.delete(); 631 632 deliveryAttempts = (Integer ) deniedMsgs.remove(id); 634 if (deliveryAttempts != null) { 635 message.msg.deliveryCount = deliveryAttempts.intValue(); 636 message.msg.redelivered = true; 637 } 638 message.msg.expired = true; 639 if (deadMessages == null) 640 deadMessages = new ClientMessages(); 641 deadMessages.addMessage(message.msg); 642 } 643 } else { 644 deniedMsgs.remove(id); 646 } 647 } 648 } else { 649 int highestP = -1; 651 Message keptMsg = null; 652 int i = 0; 654 while (i < messageIds.size()) { 655 id = (String ) messageIds.elementAt(i); 656 message = (Message) messagesTable.get(id); 657 if (JoramTracing.dbgProxy.isLoggable(BasicLevel.DEBUG)) 658 JoramTracing.dbgProxy.log(BasicLevel.DEBUG, " -> message = " + message); 659 660 if (message != null) { 662 if (message.isValid(System.currentTimeMillis())) { 664 if (JoramTracing.dbgProxy.isLoggable(BasicLevel.DEBUG)) 665 JoramTracing.dbgProxy.log(BasicLevel.DEBUG, " -> valid message"); 666 if (message.getPriority() > highestP) { 668 highestP = message.getPriority(); 669 keptMsg = message; 670 } 671 672 i++; 674 } else { 675 if (JoramTracing.dbgProxy.isLoggable(BasicLevel.DEBUG)) 678 JoramTracing.dbgProxy.log(BasicLevel.DEBUG, " -> invalid message"); 679 messageIds.remove(id); 680 save(); 681 messagesTable.remove(id); 682 if (durable) 684 message.delete(); 685 686 deliveryAttempts = (Integer ) deniedMsgs.remove(id); 688 if (deliveryAttempts != null) { 689 message.msg.deliveryCount = deliveryAttempts.intValue(); 690 message.msg.redelivered = true; 691 } 692 message.msg.expired = true; 693 deadMessages = new ClientMessages(); 694 deadMessages.addMessage(message.msg); 695 } 696 } else { 697 if (JoramTracing.dbgProxy.isLoggable(BasicLevel.DEBUG)) 699 JoramTracing.dbgProxy.log(BasicLevel.DEBUG, " -> deleted message"); 700 701 messageIds.remove(id); 702 deniedMsgs.remove(id); 703 save(); 704 } 705 } 706 707 if (keptMsg != null) { 709 messageIds.remove(keptMsg.getIdentifier()); 710 deliveredIds.put(keptMsg.getIdentifier(), keptMsg.getIdentifier()); 711 save(); 712 713 deliveryAttempts = (Integer ) deniedMsgs.get(keptMsg.getIdentifier()); 715 if (deliveryAttempts == null) 716 keptMsg.msg.deliveryCount = 1; 717 else { 718 keptMsg.msg.deliveryCount = deliveryAttempts.intValue() + 1; 719 keptMsg.msg.redelivered = true; 720 } 721 deliverables.add(keptMsg.msg.clone()); 722 723 if (JoramTracing.dbgProxy.isLoggable(BasicLevel.DEBUG)) 724 JoramTracing.dbgProxy.log(BasicLevel.DEBUG, 725 this + ": message " + keptMsg.getIdentifier() 726 + " added for delivery."); 727 } else { 728 i++; 729 } 730 } 731 732 if (deadMessages != null) 734 sendToDMQ(deadMessages); 735 736 if (! deliverables.isEmpty()) { 738 ConsumerMessages consM = new ConsumerMessages(requestId, 739 deliverables, 740 name, 741 false); 742 if (! toListener) requestId = -1; 743 744 return consM; 745 } 746 return null; 747 } 748 749 752 void acknowledge(Enumeration acks) { 753 while (acks.hasMoreElements()) { 754 String id = (String ) acks.nextElement(); 755 acknowledge(id); 756 } 757 } 758 759 void acknowledge(String id) { 760 if (JoramTracing.dbgProxy.isLoggable(BasicLevel.DEBUG)) 761 JoramTracing.dbgProxy.log(BasicLevel.DEBUG, 762 this + ": acknowledges message: " + id); 763 764 deliveredIds.remove(id); 765 deniedMsgs.remove(id); 766 save(); 767 Message msg = (Message) messagesTable.get(id); 768 769 if (msg != null) { 771 msg.acksCounter--; 772 if (msg.acksCounter == 0) 773 messagesTable.remove(id); 774 if (durable) { 775 msg.durableAcksCounter--; 776 777 if (msg.durableAcksCounter == 0) 778 msg.delete(); 779 } 780 } 781 } 782 783 786 void deny(Enumeration denies) { 787 if (JoramTracing.dbgProxy.isLoggable(BasicLevel.DEBUG)) 788 JoramTracing.dbgProxy.log(BasicLevel.DEBUG, 789 this + ".deny(" + denies + ')'); 790 String id; 791 Message message; 792 ClientMessages deadMessages = null; 793 int deliveryAttempts = 1; 794 int i; 795 String currentId; 796 long currentO; 797 798 denyLoop: 799 while (denies.hasMoreElements()) { 800 id = (String ) denies.nextElement(); 801 802 String deliveredMsgId = (String )deliveredIds.remove(id); 803 if (deliveredMsgId == null) { 804 if (JoramTracing.dbgProxy.isLoggable(BasicLevel.DEBUG)) 805 JoramTracing.dbgProxy.log(BasicLevel.DEBUG, 806 this + ": cannot denies message: " + id); 807 808 continue denyLoop; 809 } 810 save(); 811 812 if (JoramTracing.dbgProxy.isLoggable(BasicLevel.DEBUG)) 813 JoramTracing.dbgProxy.log(BasicLevel.DEBUG, 814 this + ": denies message: " + id); 815 816 message = (Message) messagesTable.get(id); 817 818 if (message == null) continue denyLoop; 820 821 Integer value = (Integer ) deniedMsgs.get(id); 822 if (value != null) 823 deliveryAttempts = value.intValue() + 1; 824 825 if (isUndeliverable(deliveryAttempts)) { 828 deniedMsgs.remove(id); 829 message.msg.deliveryCount = deliveryAttempts; 830 message.msg.undeliverable = true; 831 if (deadMessages == null) 832 deadMessages = new ClientMessages(); 833 deadMessages.addMessage(message.msg); 834 835 message.acksCounter--; 836 if (message.acksCounter == 0) 837 messagesTable.remove(id); 838 839 if (durable) { 840 message.durableAcksCounter--; 841 if (message.durableAcksCounter == 0) 842 message.delete(); 843 } 844 } else { 845 if (JoramTracing.dbgProxy.isLoggable(BasicLevel.DEBUG)) 849 JoramTracing.dbgProxy.log(BasicLevel.DEBUG, 850 " -> put back to the messages to deliver"); 851 852 i = 0; 853 insertLoop: 854 while (i < messageIds.size()) { 855 currentId = (String ) messageIds.elementAt(i); 856 Message currentMessage = (Message) messagesTable.get(currentId); 857 858 if (currentMessage != null) { 860 currentO = currentMessage.order; 861 if (currentO > message.order) { 862 break insertLoop; 863 } else { 864 i++; 865 } 866 } else { 867 messageIds.removeElementAt(i); 869 } 870 } 871 872 messageIds.insertElementAt(id, i); 873 deniedMsgs.put(id, new Integer (deliveryAttempts)); 874 } 875 } 876 877 if (deadMessages != null) 879 sendToDMQ(deadMessages); 880 881 } 882 883 887 void delete() { 888 for (Enumeration e = deliveredIds.keys(); e.hasMoreElements();) 889 messageIds.add(e.nextElement()); 890 891 for (Enumeration allMessageIds = messageIds.elements(); 892 allMessageIds.hasMoreElements();) { 893 removeMessage((String ) allMessageIds.nextElement()); 894 } 895 } 896 897 898 902 private boolean isUndeliverable(int deliveryAttempts) { 903 if (threshold != null) 904 return deliveryAttempts == threshold.intValue(); 905 else if (DeadMQueueImpl.getDefaultThreshold() != null) 906 return deliveryAttempts == DeadMQueueImpl.getDefaultThreshold().intValue(); 907 return false; 908 } 909 910 913 private void sendToDMQ(ClientMessages messages) 914 { 915 if (dmqId != null) 916 Channel.sendTo(dmqId, messages); 917 else if (DeadMQueueImpl.getId() != null) 918 Channel.sendTo(DeadMQueueImpl.getId(), messages); 919 } 920 921 Message getMessage(String msgId) { 922 if (JoramTracing.dbgProxy.isLoggable(BasicLevel.DEBUG)) 923 JoramTracing.dbgProxy.log( 924 BasicLevel.DEBUG, 925 "ClientSubscription.getMessage(" + msgId + ')'); 926 int index = messageIds.indexOf(msgId); 927 if (index < 0) { 928 if (JoramTracing.dbgProxy.isLoggable(BasicLevel.DEBUG)) 930 JoramTracing.dbgProxy.log( 931 BasicLevel.DEBUG, " -> message not found"); 932 return null; 933 } else { 934 return (Message) messagesTable.get(msgId); 935 } 936 } 937 938 void deleteMessage(String msgId) { 939 messageIds.remove(msgId); 940 Message message = removeMessage(msgId); 941 save(); 942 if (message != null) { 943 ClientMessages deadMessages = new ClientMessages(); 944 deadMessages.addMessage(message.msg); 945 sendToDMQ(deadMessages); 946 } 947 } 948 949 void clear() { 950 ClientMessages deadMessages = null; 951 for (int i = 0; i < messageIds.size(); i++) { 952 String msgId = (String )messageIds.elementAt(i); 953 Message message = removeMessage(msgId); 954 if (message != null) { 955 if (deadMessages == null) 956 deadMessages = new ClientMessages(); 957 deadMessages.addMessage(message.msg); 958 } 959 } 960 if (deadMessages != null) 961 sendToDMQ(deadMessages); 962 messageIds.clear(); 963 save(); 964 } 965 966 972 Message removeMessage(String msgId) { 973 Message message = (Message) messagesTable.get(msgId); 974 if (message != null) { 975 message.acksCounter--; 976 if (message.acksCounter == 0) 977 messagesTable.remove(msgId); 978 if (durable) { 979 message.durableAcksCounter--; 980 if (message.durableAcksCounter == 0) 981 message.delete(); 982 } 983 } 984 return message; 985 } 986 987 private void save() { 988 if (durable) proxy.setSave(); 989 } 990 991 public void readBag(ObjectInputStream in) 992 throws IOException , ClassNotFoundException { 993 if (JoramTracing.dbgProxy.isLoggable(BasicLevel.DEBUG)) 994 JoramTracing.dbgProxy.log( 995 BasicLevel.DEBUG, 996 "ClientSubscription[" + 997 proxyId + 998 "].readbag()"); 999 1000 contextId = in.readInt(); 1001 subRequestId = in.readInt(); 1002 noLocal = in.readBoolean(); 1003 noFiltering = in.readBoolean(); 1004 active = in.readBoolean(); 1005 requestId = in.readInt(); 1006 toListener = in.readBoolean(); 1007 requestExpTime = in.readLong(); 1008 } 1009 1010 public void writeBag(ObjectOutputStream out) 1011 throws IOException { 1012 if (JoramTracing.dbgProxy.isLoggable(BasicLevel.DEBUG)) 1013 JoramTracing.dbgProxy.log( 1014 BasicLevel.DEBUG, 1015 "ClientSubscription[" + 1016 proxyId + 1017 "].writeBag()"); 1018 1019 out.writeInt(contextId); 1020 out.writeInt(subRequestId); 1021 out.writeBoolean(noLocal); 1022 out.writeBoolean(noFiltering); 1023 out.writeBoolean(active); 1024 out.writeInt(requestId); 1025 out.writeBoolean(toListener); 1026 out.writeLong(requestExpTime); 1027 } 1028} 1029 | Popular Tags |