1 24 package org.objectweb.joram.mom.dest; 25 26 import java.io.IOException ; 27 import java.io.ObjectInputStream ; 28 import java.io.ObjectOutputStream ; 29 import java.util.ArrayList ; 30 import java.util.Enumeration ; 31 import java.util.Hashtable ; 32 import java.util.Iterator ; 33 import java.util.List ; 34 import java.util.Properties ; 35 import java.util.Vector ; 36 37 import org.objectweb.joram.mom.messages.Message; 38 import org.objectweb.joram.mom.notifications.AbortReceiveRequest; 39 import org.objectweb.joram.mom.notifications.AcknowledgeRequest; 40 import org.objectweb.joram.mom.notifications.AdminReply; 41 import org.objectweb.joram.mom.notifications.BrowseReply; 42 import org.objectweb.joram.mom.notifications.BrowseRequest; 43 import org.objectweb.joram.mom.notifications.ClientMessages; 44 import org.objectweb.joram.mom.notifications.DenyRequest; 45 import org.objectweb.joram.mom.notifications.DestinationAdminRequestNot; 46 import org.objectweb.joram.mom.notifications.ExceptionReply; 47 import org.objectweb.joram.mom.notifications.Monit_GetDMQSettings; 48 import org.objectweb.joram.mom.notifications.Monit_GetDMQSettingsRep; 49 import org.objectweb.joram.mom.notifications.Monit_GetNbMaxMsg; 50 import org.objectweb.joram.mom.notifications.Monit_GetNbMaxMsgRep; 51 import org.objectweb.joram.mom.notifications.Monit_GetNumberRep; 52 import org.objectweb.joram.mom.notifications.Monit_GetPendingMessages; 53 import org.objectweb.joram.mom.notifications.Monit_GetPendingRequests; 54 import org.objectweb.joram.mom.notifications.QueueMsgReply; 55 import org.objectweb.joram.mom.notifications.ReceiveRequest; 56 import org.objectweb.joram.mom.notifications.SetNbMaxMsgRequest; 57 import org.objectweb.joram.mom.notifications.SetRightRequest; 58 import org.objectweb.joram.mom.notifications.SetThreshRequest; 59 import org.objectweb.joram.mom.notifications.WakeUpNot; 60 import org.objectweb.joram.shared.admin.ClearQueue; 61 import org.objectweb.joram.shared.admin.DeleteQueueMessage; 62 import org.objectweb.joram.shared.admin.GetQueueMessage; 63 import org.objectweb.joram.shared.admin.GetQueueMessageIds; 64 import org.objectweb.joram.shared.admin.GetQueueMessageIdsRep; 65 import org.objectweb.joram.shared.admin.GetQueueMessageRep; 66 import org.objectweb.joram.shared.excepts.AccessException; 67 import org.objectweb.joram.shared.excepts.DestinationException; 68 import org.objectweb.joram.shared.selectors.Selector; 69 import org.objectweb.util.monolog.api.BasicLevel; 70 import org.objectweb.util.monolog.api.Logger; 71 72 import fr.dyade.aaa.agent.AgentId; 73 import fr.dyade.aaa.agent.DeleteNot; 74 import fr.dyade.aaa.agent.Notification; 75 import fr.dyade.aaa.agent.UnknownAgent; 76 import fr.dyade.aaa.util.Debug; 77 78 82 public class QueueImpl extends DestinationImpl implements QueueImplMBean { 83 public static Logger logger = Debug.getLogger(QueueImpl.class.getName()); 84 85 86 protected long period = -1; 87 88 93 public long getPeriod() { 94 return period; 95 } 96 97 103 public void setPeriod(long period) { 104 if ((this.period == -1L) && (period != -1L)) { 105 forward(destId, new WakeUpNot()); 107 } 108 this.period = period; 109 } 110 111 116 private Integer threshold = null; 117 118 123 public int getThreshold() { 124 if (threshold == null) 125 return -1; 126 else 127 return threshold.intValue(); 128 } 129 130 135 public void setThreshold(int threshold) { 136 if (threshold < 0) 137 this.threshold = null; 138 else 139 this.threshold = new Integer (threshold); 140 } 141 142 143 private boolean samePriorities; 144 145 private int priority; 146 147 148 protected Hashtable consumers; 149 150 protected Hashtable contexts; 151 152 153 protected long arrivalsCounter = 0; 154 155 160 public int getMessageCounter() { 161 if (messages != null) { 162 return messages.size(); 163 } 164 return 0; 165 } 166 167 168 protected Vector requests; 169 170 177 protected void cleanWaitingRequest(long currentTime) { 178 int index = 0; 179 while (index < requests.size()) { 180 if (! ((ReceiveRequest) requests.get(index)).isValid(currentTime)) { 181 requests.remove(index); 183 } else { 186 index++; 187 } 188 } 189 } 190 191 196 public int getWaitingRequestCount() { 197 if (requests != null) { 198 cleanWaitingRequest(System.currentTimeMillis()); 199 return requests.size(); 200 } 201 return 0; 202 } 203 204 205 protected transient boolean receiving = false; 206 207 208 protected transient Vector messages; 209 210 218 protected ClientMessages cleanPendingMessage(long currentTime) { 219 int index = 0; 220 221 ClientMessages deadMessages = null; 222 223 Message message = null; 224 while (index < messages.size()) { 225 message = (Message) messages.get(index); 226 if (! message.isValid(currentTime)) { 227 messages.remove(index); 228 message.delete(); 229 230 message.msg.expired = true; 231 232 if (deadMessages == null) 233 deadMessages = new ClientMessages(); 234 deadMessages.addMessage(message.msg); 235 236 if (logger.isLoggable(BasicLevel.DEBUG)) 237 logger.log(BasicLevel.DEBUG, 238 "Removes expired message " + message.getIdentifier()); 239 } else { 240 index++; 241 } 242 } 243 return deadMessages; 244 } 245 246 251 public int getPendingMessageCount() { 252 if (messages != null) { 253 return messages.size(); 254 } 255 return 0; 256 } 257 258 259 protected transient Hashtable deliveredMsgs; 260 261 266 public int getDeliveredMessageCount() { 267 if (deliveredMsgs != null) { 268 return deliveredMsgs.size(); 269 } 270 return 0; 271 } 272 273 274 protected int nbMaxMsg = -1; 275 276 283 public int getNbMaxMsg() { 284 return nbMaxMsg; 285 } 286 287 292 public void setNbMaxMsg(int nbMaxMsg) { 293 setSave(); 295 this.nbMaxMsg = nbMaxMsg; 296 } 297 298 305 public QueueImpl(AgentId destId, AgentId adminId, Properties prop) { 306 super(destId, adminId, prop); 307 308 try { 309 if (prop != null) 310 period = Long.valueOf(prop.getProperty("period")).longValue(); 311 } catch (NumberFormatException exc) { 312 period = -1L; 313 } 314 315 consumers = new Hashtable (); 316 contexts = new Hashtable (); 317 requests = new Vector (); 318 } 319 320 323 public String toString() { 324 return "QueueImpl:" + (destId == null ? "null" : destId.toString()); 325 } 326 327 330 public void wakeUpNot(WakeUpNot not) { 331 long current = System.currentTimeMillis(); 332 cleanWaitingRequest(current); 333 ClientMessages deadMessages = cleanPendingMessage(current); 335 if (deadMessages != null) 337 sendToDMQ(deadMessages, null); 338 } 339 340 346 public void setThreshRequest(AgentId from, SetThreshRequest req) throws AccessException { 347 if (! isAdministrator(from)) 348 throw new AccessException("ADMIN right not granted"); 349 350 setSave(); 352 353 threshold = req.getThreshold(); 354 355 String info = strbuf.append("Request [").append(req.getClass().getName()) 356 .append("], sent to Queue [").append(destId) 357 .append("], successful [true]: threshold [") 358 .append(threshold).append("] set").toString(); 359 strbuf.setLength(0); 360 forward(from, new AdminReply(req, true, info)); 361 362 if (logger.isLoggable(BasicLevel.DEBUG)) 363 logger.log(BasicLevel.DEBUG, info); 364 } 365 366 372 public void setNbMaxMsgRequest(AgentId from, SetNbMaxMsgRequest req) throws AccessException { 373 if (! isAdministrator(from)) 374 throw new AccessException("ADMIN right not granted"); 375 376 nbMaxMsg = req.getNbMaxMsg(); 377 378 String info = strbuf.append("Request [").append(req.getClass().getName()) 379 .append("], sent to Queue [").append(destId) 380 .append("], successful [true]: nbMaxMsg [") 381 .append(nbMaxMsg).append("] set").toString(); 382 strbuf.setLength(0); 383 forward(from, new AdminReply(req, true, info)); 384 385 if (logger.isLoggable(BasicLevel.DEBUG)) 386 logger.log(BasicLevel.DEBUG, info); 387 } 388 389 395 public void MonitGetDMQSettings(AgentId from, Monit_GetDMQSettings not) throws AccessException { 396 if (! isAdministrator(from)) 397 throw new AccessException("ADMIN right not granted"); 398 399 String id = null; 400 if (dmqId != null) 401 id = dmqId.toString(); 402 forward(from, new Monit_GetDMQSettingsRep(not, id, threshold)); 403 } 404 405 412 public void monitGetPendingMessages(AgentId from, Monit_GetPendingMessages not) throws AccessException { 413 if (! isAdministrator(from)) 414 throw new AccessException("ADMIN right not granted"); 415 416 ClientMessages deadMessages = cleanPendingMessage(System.currentTimeMillis()); 418 if (deadMessages != null) 420 sendToDMQ(deadMessages, null); 421 422 forward(from, new Monit_GetNumberRep(not, messages.size())); 423 } 424 425 432 public void monitGetPendingRequests(AgentId from, Monit_GetPendingRequests not) throws AccessException { 433 if (! isAdministrator(from)) 434 throw new AccessException("ADMIN right not granted"); 435 436 437 forward(from, new Monit_GetNumberRep(not, getWaitingRequestCount())); 438 } 439 440 447 public void monitGetNbMaxMsg(AgentId from, Monit_GetNbMaxMsg not) throws AccessException { 448 if (! isAdministrator(from)) 449 throw new AccessException("ADMIN right not granted"); 450 451 forward(from, new Monit_GetNbMaxMsgRep(not,nbMaxMsg)); 452 } 453 454 462 public void receiveRequest(AgentId from, ReceiveRequest not) throws AccessException { 463 if (! isReader(from)) 465 throw new AccessException("READ right not granted"); 466 467 String [] toAck = not.getMessageIds(); 468 if (toAck != null) { 469 for (int i = 0; i < toAck.length; i++) { 470 acknowledge(toAck[i]); 471 } 472 } 473 474 long current = System.currentTimeMillis(); 475 cleanWaitingRequest(current); 476 not.requester = from; 478 not.setExpiration(current); 479 if (not.isPersistent()) { 480 setSave(); 482 } 483 requests.add(not); 484 485 if (logger.isLoggable(BasicLevel.DEBUG)) 486 logger.log(BasicLevel.DEBUG, " -> requests count = " + requests.size()); 487 488 int reqIndex = requests.size() - 1; 490 deliverMessages(reqIndex); 491 492 if ((requests.size() - 1) == reqIndex && not.getTimeOut() == -1) { 495 requests.remove(reqIndex); 496 QueueMsgReply reply = new QueueMsgReply(not); 497 if (isLocal(from)) { 498 reply.setPersistent(false); 499 } 500 forward(from, reply); 501 502 if (logger.isLoggable(BasicLevel.DEBUG)) 503 logger.log(BasicLevel.DEBUG, "Receive answered by a null."); 504 } 505 } 506 507 516 public void browseRequest(AgentId from, BrowseRequest not) throws AccessException { 517 if (! isReader(from)) 519 throw new AccessException("READ right not granted"); 520 521 BrowseReply rep = new BrowseReply(not); 523 524 ClientMessages deadMessages = cleanPendingMessage(System.currentTimeMillis()); 526 int i = 0; 528 Message message; 529 while (i < messages.size()) { 530 message = (Message) messages.get(i); 531 if (Selector.matches(message.msg, not.getSelector())) { 532 rep.addMessage(message.msg); 534 } 535 i++; 536 } 537 538 if (deadMessages != null) 540 sendToDMQ(deadMessages, null); 541 542 forward(from, rep); 544 545 if (logger.isLoggable(BasicLevel.DEBUG)) 546 logger.log(BasicLevel.DEBUG, "Request answered."); 547 } 548 549 553 public void acknowledgeRequest(AgentId from, AcknowledgeRequest not) { 554 for (Enumeration ids = not.getIds(); ids.hasMoreElements();) { 555 String msgId = (String ) ids.nextElement(); 556 acknowledge(msgId); 557 } 558 } 559 560 private void acknowledge(String msgId) { 561 Message msg = (Message) deliveredMsgs.remove(msgId); 562 if ((msg != null) && msg.getPersistent()) { 563 setSave(); 565 } 566 consumers.remove(msgId); 567 contexts.remove(msgId); 568 if (msg != null) { 569 msg.delete(); 570 571 if (logger.isLoggable(BasicLevel.DEBUG)) { 572 logger.log(BasicLevel.DEBUG, "Message " + msgId + " acknowledged."); 573 } 574 } else if (logger.isLoggable(BasicLevel.WARN)) { 575 logger.log(BasicLevel.WARN, 576 "Message " + msgId + " not found for acknowledgement."); 577 } 578 } 579 580 587 public void denyRequest(AgentId from, DenyRequest not) { 588 if (logger.isLoggable(BasicLevel.DEBUG)) 589 logger.log(BasicLevel.DEBUG, 590 "QueueImpl.DenyRequest(" + from + ',' + not + ')'); 591 592 Enumeration ids = not.getIds(); 593 594 String msgId; 595 Message message; 596 AgentId consId; 597 int consCtx; 598 ClientMessages deadMessages = null; 599 600 if (! ids.hasMoreElements()) { 604 for (Enumeration delIds = deliveredMsgs.keys(); 606 delIds.hasMoreElements();) { 607 msgId = (String ) delIds.nextElement(); 608 609 message = (Message) deliveredMsgs.get(msgId); 610 consId = (AgentId) consumers.get(msgId); 611 consCtx = ((Integer ) contexts.get(msgId)).intValue(); 612 613 if (logger.isLoggable(BasicLevel.DEBUG)) 614 logger.log(BasicLevel.DEBUG, 615 " -> deny msg " + msgId + "(consId = " + consId + ')'); 616 617 if (consId.equals(from) && consCtx == not.getClientContext()) { 620 setSave(); 622 consumers.remove(msgId); 623 contexts.remove(msgId); 624 deliveredMsgs.remove(msgId); 625 message.msg.redelivered = true; 626 627 if (isUndeliverable(message)) { 630 message.delete(); 631 message.msg.undeliverable = true; 632 633 if (deadMessages == null) 634 deadMessages = new ClientMessages(); 635 deadMessages.addMessage(message.msg); 636 } else { 637 storeMessage(message); 639 } 640 641 if (logger.isLoggable(BasicLevel.DEBUG)) 642 logger.log(BasicLevel.DEBUG, "Message " + msgId + " denied."); 643 } 644 } 645 } 646 647 for (ids = not.getIds(); ids.hasMoreElements();) { 649 msgId = (String ) ids.nextElement(); 650 message = (Message) deliveredMsgs.remove(msgId); 651 652 if (message == null) { 658 if (logger.isLoggable(BasicLevel.ERROR)) 659 logger.log(BasicLevel.ERROR, " -> already denied message " + msgId); 660 break; 661 } 662 663 message.msg.redelivered = true; 664 665 666 if (logger.isLoggable(BasicLevel.DEBUG)) 667 logger.log(BasicLevel.DEBUG, " -> deny " + msgId); 668 669 setSave(); 671 consumers.remove(msgId); 672 contexts.remove(msgId); 673 674 if (isUndeliverable(message)) { 677 message.delete(); 678 679 message.msg.undeliverable = true; 680 681 if (deadMessages == null) 682 deadMessages = new ClientMessages(); 683 deadMessages.addMessage(message.msg); 684 } else { 685 storeMessage(message); 687 } 688 689 if (logger.isLoggable(BasicLevel.DEBUG)) 690 logger.log(BasicLevel.DEBUG, "Message " + msgId + " denied."); 691 } 692 if (deadMessages != null) 694 sendToDMQ(deadMessages, null); 695 696 deliverMessages(0); 698 } 699 700 public void abortReceiveRequest(AgentId from, 701 AbortReceiveRequest not) { 702 for (int i = 0; i < requests.size(); i++) { 703 ReceiveRequest request = (ReceiveRequest) requests.get(i); 704 if (request.requester.equals(from) && 705 request.getClientContext() == not.getClientContext() && 706 request.getRequestId() == not.getAbortedRequestId()) { 707 if (not.isPersistent()) { 708 setSave(); 710 } 711 requests.remove(i); 712 break; 713 } 714 } 715 } 716 717 public void destinationAdminRequestNot(AgentId from, DestinationAdminRequestNot not) { 718 org.objectweb.joram.shared.admin.AdminRequest adminRequest = 719 not.getRequest(); 720 if (adminRequest instanceof GetQueueMessageIds) { 721 getQueueMessageIds((GetQueueMessageIds)adminRequest, 722 not.getReplyTo(), 723 not.getRequestMsgId(), 724 not.getReplyMsgId()); 725 } else if (adminRequest instanceof GetQueueMessage) { 726 getQueueMessage((GetQueueMessage)adminRequest, 727 not.getReplyTo(), 728 not.getRequestMsgId(), 729 not.getReplyMsgId()); 730 } else if (adminRequest instanceof DeleteQueueMessage) { 731 deleteQueueMessage((DeleteQueueMessage)adminRequest, 732 not.getReplyTo(), 733 not.getRequestMsgId(), 734 not.getReplyMsgId()); 735 } else if (adminRequest instanceof ClearQueue) { 736 clearQueue((ClearQueue)adminRequest, 737 not.getReplyTo(), 738 not.getRequestMsgId(), 739 not.getReplyMsgId()); 740 } 741 } 742 743 private void getQueueMessageIds(GetQueueMessageIds request, 744 AgentId replyTo, 745 String requestMsgId, 746 String replyMsgId) { 747 String [] res = new String [messages.size()]; 748 for (int i = 0; i < messages.size(); i++) { 749 Message msg = (Message)messages.elementAt(i); 750 res[i] = msg.getIdentifier(); 751 } 752 GetQueueMessageIdsRep reply = 753 new GetQueueMessageIdsRep(res); 754 replyToTopic(reply, replyTo, requestMsgId, replyMsgId); 755 } 756 757 private void getQueueMessage(GetQueueMessage request, 758 AgentId replyTo, 759 String requestMsgId, 760 String replyMsgId) { 761 Message message = null; 762 for (int i = 0; i < messages.size(); i++) { 763 message = (Message) messages.elementAt(i); 764 if (message.getIdentifier().equals(request.getMessageId())) 765 break; 766 } 767 if (message != null) { 768 replyToTopic(new GetQueueMessageRep(message.msg), 769 replyTo, requestMsgId, replyMsgId); 770 } else { 771 772 replyToTopic( 773 new org.objectweb.joram.shared.admin.AdminReply( 774 false, "Message not found: " + message.getIdentifier()), 775 replyTo, requestMsgId, replyMsgId); 776 } 777 } 778 779 private void deleteQueueMessage(DeleteQueueMessage request, 780 AgentId replyTo, 781 String requestMsgId, 782 String replyMsgId) { 783 for (int i = 0; i < messages.size(); i++) { 784 Message message = (Message) messages.elementAt(i); 785 if (message.getIdentifier().equals(request.getMessageId())) { 786 messages.removeElementAt(i); 787 message.delete(); 788 ClientMessages deadMessages = new ClientMessages(); 789 deadMessages.addMessage(message.msg); 790 sendToDMQ(deadMessages, null); 791 break; 792 } 793 } 794 replyToTopic(new org.objectweb.joram.shared.admin.AdminReply(true, null), 795 replyTo, requestMsgId, replyMsgId); 796 } 797 798 private void clearQueue(ClearQueue request, 799 AgentId replyTo, 800 String requestMsgId, 801 String replyMsgId) { 802 if (messages.size() > 0) { 803 ClientMessages deadMessages = new ClientMessages(); 804 for (int i = 0; i < messages.size(); i++) { 805 Message message = (Message) messages.elementAt(i); 806 message.delete(); 807 deadMessages.addMessage(message.msg); 808 } 809 sendToDMQ(deadMessages, null); 810 messages.clear(); 811 } 812 replyToTopic(new org.objectweb.joram.shared.admin.AdminReply(true, null), 813 replyTo, requestMsgId, replyMsgId); 814 } 815 816 822 protected void doRightRequest(SetRightRequest not) { 823 if (not.getRight() != -READ) 825 return; 826 827 SetRightRequest rightRequest = preProcess(not); 828 if (rightRequest != null) { 829 AgentId user = rightRequest.getClient(); 830 831 ReceiveRequest request; 832 AccessException exc; 833 ExceptionReply reply; 834 835 if (user == null) { 838 for (int i = 0; i < requests.size(); i++) { 839 request = (ReceiveRequest) requests.get(i); 840 if (! isReader(request.requester)) { 841 exc = new AccessException("Free READ access removed"); 842 reply = new ExceptionReply(request, exc); 843 forward(request.requester, reply); 844 setSave(); 846 requests.remove(i); 847 i--; 848 } 849 } 850 } else { 851 for (int i = 0; i < requests.size(); i++) { 854 request = (ReceiveRequest) requests.get(i); 855 if (user.equals(request.requester)) { 856 exc = new AccessException("READ right removed"); 857 reply = new ExceptionReply(request, exc); 858 forward(request.requester, reply); 859 setSave(); 861 requests.remove(i); 862 i--; 863 } 864 } 865 } 866 postProcess(rightRequest); 867 } 868 } 869 870 875 protected void doClientMessages(AgentId from, ClientMessages not) { 876 receiving = true; 877 ClientMessages clientMsgs = preProcess(from, not); 878 879 if (clientMsgs != null) { 880 Message msg; 881 for (Enumeration msgs = clientMsgs.getMessages().elements(); 883 msgs.hasMoreElements();) { 884 885 msg = new Message((org.objectweb.joram.shared.messages.Message) msgs.nextElement()); 886 if (clientMsgs.isPersistent()) { 887 setSave(); 889 } 890 msg.order = arrivalsCounter++; 891 storeMessage(msg); 892 } 893 } 894 895 deliverMessages(0); 897 898 if (clientMsgs != null) 899 postProcess(clientMsgs); 900 901 receiving = false; 902 } 903 904 914 protected void doUnknownAgent(UnknownAgent uA) { 915 AgentId client = uA.agent; 916 Notification not = uA.not; 917 918 if (! (not instanceof QueueMsgReply)) 920 return; 921 922 String msgId; 923 Message message; 924 AgentId consId; 925 ClientMessages deadMessages = null; 926 for (Enumeration e = deliveredMsgs.keys(); e.hasMoreElements();) { 927 msgId = (String ) e.nextElement(); 928 message = (Message) deliveredMsgs.get(msgId); 929 consId = (AgentId) consumers.get(msgId); 930 if (consId.equals(client)) { 933 deliveredMsgs.remove(msgId); 934 message.msg.redelivered = true; 935 936 setSave(); 938 consumers.remove(msgId); 939 contexts.remove(msgId); 940 941 if (isUndeliverable(message)) { 944 message.delete(); 945 message.msg.undeliverable = true; 946 if (deadMessages == null) 947 deadMessages = new ClientMessages(); 948 deadMessages.addMessage(message.msg); 949 } else { 950 storeMessage(message); 952 } 953 954 if (logger.isLoggable(BasicLevel.WARN)) 955 logger.log(BasicLevel.WARN, 956 "Message " + message.getIdentifier() + " denied."); 957 } 958 } 959 if (deadMessages != null) 961 sendToDMQ(deadMessages, null); 962 963 deliverMessages(0); 965 } 966 967 974 protected void doDeleteNot(DeleteNot not) { 975 DestinationException exc = new DestinationException("Queue " + destId 977 + " is deleted."); 978 ReceiveRequest rec; 979 ExceptionReply excRep; 980 cleanWaitingRequest(System.currentTimeMillis()); 982 for (int i = 0; i < requests.size(); i++) { 983 rec = (ReceiveRequest) requests.elementAt(i); 984 985 excRep = new ExceptionReply(rec, exc); 986 if (logger.isLoggable(BasicLevel.DEBUG)) 987 logger.log(BasicLevel.DEBUG, 988 "Requester " + rec.requester + 989 " notified of the queue deletion."); 990 forward(rec.requester, excRep); 991 } 992 if (! messages.isEmpty()) { 994 Message message; 995 ClientMessages deadMessages = new ClientMessages(); 996 while (! messages.isEmpty()) { 997 message = (Message) messages.remove(0); 998 message.msg.deletedDest = true; 999 deadMessages.addMessage(message.msg); 1000 } 1001 sendToDMQ(deadMessages, null); 1002 } 1003 1004 Message.deleteAll(getMsgTxname()); 1006 } 1007 1008 transient String msgTxname = null; 1009 1010 protected final String getMsgTxname() { 1011 if (msgTxname == null) 1012 msgTxname = 'M' + getDestinationId() + '_'; 1013 return msgTxname; 1014 } 1015 1016 protected final void setMsgTxName(Message msg) { 1017 if (msg.getTxName() == null) 1018 msg.setTxName(getMsgTxname() + msg.order); 1019 } 1020 1021 1026 protected final synchronized void storeMessage(Message message) { 1027 addMessage(message); 1028 1029 setMsgTxName(message); 1031 message.save(); 1032 1033 if (logger.isLoggable(BasicLevel.DEBUG)) 1034 logger.log(BasicLevel.DEBUG, 1035 "Message " + message.getIdentifier() + " stored."); 1036 1037 } 1038 1039 protected final synchronized void addMessage(Message message) { 1040 nbMsgsReceiveSinceCreation++; 1041 1042 if (nbMaxMsg > -1 && nbMaxMsg <= messages.size()) { 1043 ClientMessages deadMessages = new ClientMessages(); 1044 deadMessages.addMessage(message.msg); 1045 sendToDMQ(deadMessages, null); 1046 return; 1047 } 1048 1049 if (messages.isEmpty()) { 1050 samePriorities = true; 1051 priority = message.getPriority(); 1052 } else if (samePriorities && priority != message.getPriority()) { 1053 samePriorities = false; 1054 } 1055 1056 if (samePriorities) { 1057 if (receiving) { 1060 messages.add(message); 1062 } else { 1063 long currentO; 1066 int i = 0; 1067 for (Enumeration e = messages.elements(); e.hasMoreElements();) { 1068 currentO = ((Message) e.nextElement()).order; 1069 if (currentO > message.order) break; 1070 i++; 1071 } 1072 messages.insertElementAt(message, i); 1073 } 1074 } else { 1075 Message currentMsg; 1078 int currentP; 1079 long currentO; 1080 int i = 0; 1081 for (Enumeration e = messages.elements(); e.hasMoreElements();) { 1082 currentMsg = (Message) e.nextElement(); 1083 currentP = currentMsg.getPriority(); 1084 currentO = currentMsg.order; 1085 1086 if (! receiving && currentP == message.getPriority()) { 1087 if (currentO > message.order) break; 1090 } else if (currentP < message.getPriority()) { 1091 break; 1093 } 1094 i++; 1095 } 1096 messages.insertElementAt(message, i); 1097 } 1098 } 1099 1100 1108 protected ClientMessages getClientMessages(int nb, String selector, boolean remove) { 1109 if (logger.isLoggable(BasicLevel.DEBUG)) 1110 logger.log(BasicLevel.DEBUG, "QueueImpl.getClientMessages(" + nb + ',' + selector + ',' + remove + ')'); 1111 1112 ClientMessages cm = null ; 1113 List lsMessages = getMessages(nb, selector, remove); 1114 if (lsMessages.size() > 0) { 1115 cm = new ClientMessages(); 1116 } 1117 Message message = null; 1118 Iterator itMessages = lsMessages.iterator(); 1119 while (itMessages.hasNext()) { 1120 message = (Message) itMessages.next(); 1121 cm.addMessage(message.msg); 1122 } 1123 return cm; 1124 } 1125 1126 protected ClientMessages getClientMessages(List lsMsgId, boolean remove) { 1127 if (logger.isLoggable(BasicLevel.DEBUG)) 1128 logger.log(BasicLevel.DEBUG, "QueueImpl.getClientMessages(" + lsMsgId + ',' + remove + ')'); 1129 1130 ClientMessages cm = new ClientMessages(); 1131 Message message = null; 1132 String msgId = null; 1133 Iterator itMsgId = lsMsgId.iterator(); 1134 while (itMsgId.hasNext()) { 1136 msgId = (String ) itMsgId.next(); 1137 message = getMessage(msgId, remove); 1138 1139 if (checkDelivery(message)) { 1140 message.msg.deliveryCount++; 1141 nbMsgsDeliverSinceCreation++; 1142 1143 messageDelivered(message.getIdentifier()); 1145 1146 if (logger.isLoggable(BasicLevel.DEBUG)) 1147 logger.log(BasicLevel.DEBUG, "Message " + msgId); 1148 1149 cm.addMessage(message.msg); 1150 } 1151 } 1152 return cm; 1153 } 1154 1155 1160 protected void removeMessages(List msgIds) { 1161 String id = null; 1162 Iterator itMessages = msgIds.iterator(); 1163 while (itMessages.hasNext()) { 1164 id = (String ) itMessages.next(); 1165 int i = 0; 1166 Message message = null; 1167 while (i < messages.size()) { 1168 message = (Message) messages.get(i); 1169 if (id.equals(message.getIdentifier())) { 1170 messages.remove(i); 1171 message.delete(); 1172 break; 1173 } 1174 } 1175 } 1176 } 1177 1178 1185 private List getMessages(int nb, String selector, boolean remove) { 1186 if (logger.isLoggable(BasicLevel.DEBUG)) 1187 logger.log(BasicLevel.DEBUG, "QueueImpl.getMessage(" + nb + ',' + selector + ',' + remove +')'); 1188 1189 List lsMessages = new ArrayList (); 1190 Message message; 1191 int j = 0; 1192 while ((lsMessages.size() < nb || nb == -1) && j < messages.size()) { 1194 message = (Message) messages.get(j); 1195 1196 if (Selector.matches(message.msg, selector) && 1198 checkDelivery(message)) { 1199 message.msg.deliveryCount++; 1200 nbMsgsDeliverSinceCreation++; 1201 1202 messageDelivered(message.getIdentifier()); 1204 1205 if (logger.isLoggable(BasicLevel.DEBUG)) 1206 logger.log(BasicLevel.DEBUG, "Message " + message.msg.id); 1207 1208 lsMessages.add(message); 1209 1210 if (remove) { 1211 messages.remove(message); 1212 message.delete(); 1213 } 1214 1215 } else { 1216 j++; 1218 } 1219 } 1220 return lsMessages; 1221 } 1222 1223 private Message getMomMessage(String msgId) { 1224 Message msg = null; 1225 for (Enumeration e = messages.elements(); e.hasMoreElements(); ) { 1226 msg = (Message) e.nextElement(); 1227 if (msgId.equals(msg.getIdentifier())) 1228 return msg; 1229 } 1230 return msg; 1231 } 1232 1233 1240 protected Message getMessage(String msgId, boolean remove) { 1241 if (logger.isLoggable(BasicLevel.DEBUG)) 1242 logger.log(BasicLevel.DEBUG, "QueueImpl.getMessage(" + msgId + ',' + remove + ')'); 1243 1244 Message message = getMomMessage(msgId); 1245 if (checkDelivery(message)) { 1246 message.msg.deliveryCount++; 1247 nbMsgsDeliverSinceCreation++; 1248 1249 messageDelivered(message.getIdentifier()); 1251 1252 if (logger.isLoggable(BasicLevel.DEBUG)) 1253 logger.log(BasicLevel.DEBUG, "Message " + msgId); 1254 1255 if (remove) { 1256 messages.remove(message); 1257 message.delete(); 1258 } 1259 } 1260 return message; 1261 } 1262 1263 1270 protected void deliverMessages(int index) { 1271 if (logger.isLoggable(BasicLevel.DEBUG)) 1272 logger.log(BasicLevel.DEBUG, "QueueImpl.deliverMessages(" + index + ')'); 1273 1274 ReceiveRequest notRec = null; 1275 Message message; 1276 QueueMsgReply notMsg; 1277 ClientMessages deadMessages = null; 1278 List lsMessages = null; 1279 1280 if (logger.isLoggable(BasicLevel.DEBUG)) 1281 logger.log(BasicLevel.DEBUG, " -> requests = " + requests + ')'); 1282 1283 long current = System.currentTimeMillis(); 1284 cleanWaitingRequest(current); 1285 deadMessages = cleanPendingMessage(current); 1287 1288 while (! messages.isEmpty() && index < requests.size()) { 1290 notRec = (ReceiveRequest) requests.get(index); 1291 notMsg = new QueueMsgReply(notRec); 1292 1293 lsMessages = getMessages(notRec.getMessageCount(), notRec.getSelector(), notRec.getAutoAck()); 1294 1295 if (!notRec.getAutoAck()) { 1296 Iterator itMessages = lsMessages.iterator(); 1297 while (itMessages.hasNext()) { 1298 message = (Message) itMessages.next(); 1299 notMsg.addMessage(message.msg); 1300 consumers.put(message.getIdentifier(), notRec.requester); 1302 contexts.put(message.getIdentifier(), 1303 new Integer (notRec.getClientContext())); 1304 deliveredMsgs.put(message.getIdentifier(), message); 1305 messages.remove(message); 1306 1307 if (logger.isLoggable(BasicLevel.DEBUG)) 1308 logger.log(BasicLevel.DEBUG, 1309 "Message " + message.msg.id + " to " + notRec.requester + 1310 " as reply to " + notRec.getRequestId()); 1311 } 1312 } 1313 1314 if (isLocal(notRec.requester)) { 1315 notMsg.setPersistent(false); 1316 } 1317 1318 if (notMsg.isPersistent() && !notRec.getAutoAck()) { 1319 setSave(); 1321 } 1322 1323 if (notMsg.getSize() > 0) { 1325 requests.remove(index); 1326 forward(notRec.requester, notMsg); 1327 } else { 1328 index++; 1329 } 1330 } 1331 if (deadMessages != null) 1333 sendToDMQ(deadMessages, null); 1334 } 1335 1336 1431 protected boolean checkDelivery(Message msg) { 1432 return true; 1433 } 1434 1435 1439 protected void messageDelivered(String msgId) {} 1440 1441 1445 protected void messageRemoved(String msgId) {} 1446 1447 1452 protected boolean isUndeliverable(Message message) { 1453 if (threshold != null) 1454 return message.msg.deliveryCount == threshold.intValue(); 1455 else if (DeadMQueueImpl.threshold != null) 1456 return message.msg.deliveryCount == DeadMQueueImpl.threshold.intValue(); 1457 return false; 1458 } 1459 1460 1461 private void readObject(java.io.ObjectInputStream in) 1462 throws IOException , ClassNotFoundException { 1463 if (logger.isLoggable(BasicLevel.DEBUG)) 1464 logger.log(BasicLevel.DEBUG, "QueueImpl.readObject()"); 1465 in.defaultReadObject(); 1466 1467 cleanWaitingRequest(System.currentTimeMillis()); 1468 1469 receiving = false; 1470 messages = new Vector (); 1471 deliveredMsgs = new Hashtable (); 1472 1473 Vector persistedMsgs = null; 1475 persistedMsgs = Message.loadAll(getMsgTxname()); 1476 1477 if (persistedMsgs != null) { 1478 Message persistedMsg; 1479 AgentId consId; 1480 while (! persistedMsgs.isEmpty()) { 1481 persistedMsg = (Message) persistedMsgs.remove(0); 1482 consId = (AgentId) consumers.get(persistedMsg.getIdentifier()); 1483 if (consId == null) { 1484 addMessage(persistedMsg); 1485 } else if (isLocal(consId)) { 1486 if (logger.isLoggable(BasicLevel.DEBUG)) 1487 logger.log(BasicLevel.DEBUG, 1488 " -> deny " + persistedMsg.getIdentifier()); 1489 consumers.remove(persistedMsg.getIdentifier()); 1490 contexts.remove(persistedMsg.getIdentifier()); 1491 addMessage(persistedMsg); 1492 } else { 1493 deliveredMsgs.put(persistedMsg.getIdentifier(), persistedMsg); 1494 } 1495 } 1496 } 1497 } 1498 1499 public void readBag(ObjectInputStream in) throws IOException , ClassNotFoundException { 1500 receiving = in.readBoolean(); 1501 messages = (Vector ) in.readObject(); 1502 deliveredMsgs = (Hashtable ) in.readObject(); 1503 1504 for (int i = 0; i < messages.size(); i++) { 1505 Message message = (Message)messages.elementAt(i); 1506 setMsgTxName(message); 1508 message.save(); 1509 } 1510 } 1511 1512 public void writeBag(ObjectOutputStream out) throws IOException { 1513 out.writeBoolean(receiving); 1514 out.writeObject(messages); 1515 out.writeObject(deliveredMsgs); 1516 } 1517} 1518 | Popular Tags |