1 22 package org.jboss.mq.server; 23 24 import java.util.ArrayList ; 25 import java.util.Date ; 26 import java.util.HashMap ; 27 import java.util.HashSet ; 28 import java.util.Iterator ; 29 import java.util.LinkedList ; 30 import java.util.List ; 31 import java.util.Map ; 32 import java.util.Set ; 33 import java.util.SortedSet ; 34 import java.util.TreeSet ; 35 36 import javax.jms.IllegalStateException ; 37 import javax.jms.JMSException ; 38 39 import org.jboss.logging.Logger; 40 import org.jboss.mq.AcknowledgementRequest; 41 import org.jboss.mq.DestinationFullException; 42 import org.jboss.mq.SpyDestination; 43 import org.jboss.mq.SpyJMSException; 44 import org.jboss.mq.SpyMessage; 45 import org.jboss.mq.Subscription; 46 import org.jboss.mq.pm.Tx; 47 import org.jboss.mq.pm.TxManager; 48 import org.jboss.mq.selectors.Selector; 49 import org.jboss.util.NestedRuntimeException; 50 import org.jboss.util.timeout.Timeout; 51 import org.jboss.util.timeout.TimeoutTarget; 52 53 import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap; 54 import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArraySet; 55 56 76 public class BasicQueue 77 { 78 static final Logger log = Logger.getLogger(BasicQueue.class); 79 80 82 SortedSet messages = new TreeSet (); 83 84 85 ConcurrentHashMap events = new ConcurrentHashMap(); 86 87 88 CopyOnWriteArraySet scheduledMessages = new CopyOnWriteArraySet(); 89 90 91 JMSDestinationManager server; 92 93 94 Receivers receivers; 95 96 97 String description; 98 99 100 MessageCounter counter; 101 102 104 HashMap unacknowledgedMessages = new HashMap (); 105 107 HashMap unackedByMessageRef = new HashMap (); 108 110 HashMap unackedBySubscription = new HashMap (); 111 112 114 HashSet subscribers = new HashSet (); 115 116 118 HashSet removedSubscribers = new HashSet (); 119 120 121 BasicQueueParameters parameters; 122 123 124 boolean stopped = false; 125 126 134 public BasicQueue(JMSDestinationManager server, String description, BasicQueueParameters parameters) 135 throws JMSException 136 { 137 this.server = server; 138 this.description = description; 139 this.parameters = parameters; 140 141 Class receiversImpl = parameters.receiversImpl; 142 if (receiversImpl == null) 143 receiversImpl = ReceiversImpl.class; 144 145 try 146 { 147 receivers = (Receivers) receiversImpl.newInstance(); 148 } 149 catch (Throwable t) 150 { 151 throw new SpyJMSException("Error instantiating receivers implementation: " + receiversImpl, t); 152 } 153 } 154 155 160 public String getDescription() 161 { 162 return description; 163 } 164 165 170 public int getReceiversCount() 171 { 172 return receivers.size(); 173 } 174 175 180 public ArrayList getReceivers() 181 { 182 synchronized (receivers) 183 { 184 return receivers.listReceivers(); 185 } 186 } 187 188 193 public boolean isInUse() 194 { 195 synchronized (receivers) 196 { 197 return subscribers.size() > 0; 198 } 199 } 200 201 207 public void addReceiver(Subscription sub) throws JMSException 208 { 209 boolean trace = log.isTraceEnabled(); 210 if (trace) 211 log.trace("addReceiver " + sub + " " + this); 212 213 MessageReference found = null; 214 synchronized (messages) 215 { 216 if (messages.size() != 0) 217 { 218 for (Iterator it = messages.iterator(); it.hasNext();) 219 { 220 MessageReference message = (MessageReference) it.next(); 221 try 222 { 223 if (message.isExpired()) 224 { 225 it.remove(); 226 expireMessageAsync(message); 227 } 228 else if (sub.accepts(message.getHeaders())) 229 { 230 it.remove(); 232 found = message; 233 break; 234 } 235 } 236 catch (JMSException ignore) 237 { 238 log.info("Caught unusual exception in addToReceivers.", ignore); 239 } 240 } 241 } 242 } 243 if (found != null) 244 queueMessageForSending(sub, found); 245 else 246 addToReceivers(sub); 247 } 248 249 254 public Set getSubscribers() 255 { 256 synchronized (receivers) 257 { 258 return (Set ) subscribers.clone(); 259 } 260 } 261 262 268 public void addSubscriber(Subscription sub) throws JMSException 269 { 270 boolean trace = log.isTraceEnabled(); 271 if (trace) 272 log.trace("addSubscriber " + sub + " " + this); 273 synchronized (receivers) 274 { 275 if (stopped) 276 throw new IllegalStateException ("The destination is stopped " + getDescription()); 277 subscribers.add(sub); 278 } 279 } 280 281 286 public void removeSubscriber(Subscription sub) 287 { 288 boolean trace = log.isTraceEnabled(); 289 if (trace) 290 log.trace("removeSubscriber " + sub + " " + this); 291 synchronized (receivers) 292 { 293 removeReceiver(sub); 294 synchronized (messages) 295 { 296 if (hasUnackedMessages(sub)) 297 { 298 if (trace) 299 log.trace("Delaying removal of subscriber is has unacked messages " + sub); 300 removedSubscribers.add(sub); 301 } 302 else 303 { 304 if (trace) 305 log.trace("Removing subscriber " + sub); 306 subscribers.remove(sub); 307 ((ClientConsumer) sub.clientConsumer).removeRemovedSubscription(sub.subscriptionId); 308 } 309 } 310 } 311 } 312 313 318 public int getQueueDepth() 319 { 320 return messages.size(); 321 } 322 323 328 public int getScheduledMessageCount() 329 { 330 return scheduledMessages.size(); 331 } 332 333 338 public int getInProcessMessageCount() 339 { 340 synchronized (messages) 341 { 342 return unacknowledgedMessages.size(); 343 } 344 } 345 346 353 public void addMessage(MessageReference mes, Tx txId) throws JMSException 354 { 355 boolean trace = log.isTraceEnabled(); 356 if (trace) 357 log.trace("addMessage " + mes + " " + txId + " " + this); 358 359 try 360 { 361 synchronized (receivers) 362 { 363 if (stopped) 364 throw new IllegalStateException ("The destination is stopped " + getDescription()); 365 } 366 367 if (parameters.maxDepth > 0) 368 { 369 synchronized (messages) 370 { 371 if (messages.size() >= parameters.maxDepth) 372 { 373 dropMessage(mes); 374 String message = "Maximum size " + parameters.maxDepth + 375 " exceeded for " + description; 376 log.warn(message); 377 throw new DestinationFullException(message); 378 } 379 } 380 } 381 382 performOrPrepareAddMessage(mes, txId); 383 } 384 catch (Throwable t) 385 { 386 String error = "Error in addMessage " + mes; 387 log.trace(error, t); 388 dropMessage(mes, txId); 389 SpyJMSException.rethrowAsJMSException(error, t); 390 } 391 } 392 393 400 protected void performOrPrepareAddMessage(MessageReference mes, Tx txId) throws Exception 401 { 402 TxManager txManager = server.getPersistenceManager().getTxManager(); 403 404 Runnable task = new AddMessagePostRollBackTask(mes); 406 txManager.addPostRollbackTask(txId, task); 407 408 task = new AddMessagePostCommitTask(mes); 410 txManager.addPostCommitTask(txId, task); 411 } 412 413 418 public void restoreMessage(MessageReference mes) 419 { 420 restoreMessage(mes, null, Tx.UNKNOWN); 421 } 422 423 430 public void restoreMessage(MessageReference mes, Tx txid, int type) 431 { 432 boolean trace = log.isTraceEnabled(); 433 if (trace) 434 log.trace("restoreMessage " + mes + " " + this + " txid=" + txid + " type=" + type); 435 436 try 437 { 438 if (txid == null) 439 { 440 internalAddMessage(mes); 441 } 442 else if (type == Tx.ADD) 443 { 444 performOrPrepareAddMessage(mes, txid); 445 } 446 else if (type == Tx.REMOVE) 447 { 448 performOrPrepareAcknowledgeMessage(mes, txid); 449 } 450 else 451 { 452 throw new IllegalStateException ("Unknown restore type " + type + " for message " + mes + " txid=" + txid); 453 } 454 } 455 catch (RuntimeException e) 456 { 457 throw e; 458 } 459 catch (Exception e) 460 { 461 throw new NestedRuntimeException("Unable to restore message " + mes, e); 462 } 463 } 464 465 468 protected void nackMessage(MessageReference message) 469 { 470 if (log.isTraceEnabled()) 471 log.trace("Restoring message: " + message); 472 473 try 474 { 475 message.redelivered(); 476 message.invalidate(); 478 if (message.isPersistent()) 481 server.getPersistenceManager().update(message, null); 482 } 483 catch (JMSException e) 484 { 485 log.error("Caught unusual exception in nackMessage for " + message, e); 486 } 487 488 internalAddMessage(message); 489 } 490 491 499 public SpyMessage[] browse(String selector) throws JMSException 500 { 501 if (selector == null) 502 { 503 SpyMessage list[]; 504 synchronized (messages) 505 { 506 list = new SpyMessage[messages.size()]; 507 Iterator iter = messages.iterator(); 508 for (int i = 0; iter.hasNext(); i++) 509 list[i] = ((MessageReference) iter.next()).getMessageForDelivery(); 510 } 511 return list; 512 } 513 else 514 { 515 Selector s = new Selector(selector); 516 LinkedList selection = new LinkedList (); 517 518 synchronized (messages) 519 { 520 Iterator i = messages.iterator(); 521 while (i.hasNext()) 522 { 523 MessageReference m = (MessageReference) i.next(); 524 if (s.test(m.getHeaders())) 525 selection.add(m.getMessageForDelivery()); 526 } 527 } 528 529 SpyMessage list[]; 530 list = new SpyMessage[selection.size()]; 531 list = (SpyMessage[]) selection.toArray(list); 532 return list; 533 } 534 } 535 536 544 public List browseScheduled(String selector) throws JMSException 545 { 546 if (selector == null) 547 { 548 ArrayList list; 549 synchronized (messages) 550 { 551 list = new ArrayList (scheduledMessages.size()); 552 Iterator iter = scheduledMessages.iterator(); 553 while (iter.hasNext()) 554 { 555 MessageReference ref = (MessageReference) iter.next(); 556 list.add(ref.getMessageForDelivery()); 557 } 558 } 559 return list; 560 } 561 else 562 { 563 Selector s = new Selector(selector); 564 LinkedList selection = new LinkedList (); 565 566 synchronized (messages) 567 { 568 Iterator iter = scheduledMessages.iterator(); 569 while (iter.hasNext()) 570 { 571 MessageReference ref = (MessageReference) iter.next(); 572 if (s.test(ref.getHeaders())) 573 selection.add(ref.getMessageForDelivery()); 574 } 575 } 576 577 return selection; 578 } 579 } 580 581 589 public List browseInProcess(String selector) throws JMSException 590 { 591 if (selector == null) 592 { 593 ArrayList list; 594 synchronized (messages) 595 { 596 list = new ArrayList (unacknowledgedMessages.size()); 597 Iterator iter = unacknowledgedMessages.values().iterator(); 598 while (iter.hasNext()) 599 { 600 UnackedMessageInfo unacked = (UnackedMessageInfo) iter.next(); 601 MessageReference ref = unacked.messageRef; 602 list.add(ref.getMessageForDelivery()); 603 } 604 } 605 return list; 606 } 607 else 608 { 609 Selector s = new Selector(selector); 610 LinkedList selection = new LinkedList (); 611 612 synchronized (messages) 613 { 614 Iterator iter = unacknowledgedMessages.values().iterator(); 615 while (iter.hasNext()) 616 { 617 UnackedMessageInfo unacked = (UnackedMessageInfo) iter.next(); 618 MessageReference ref = unacked.messageRef; 619 if (s.test(ref.getHeaders())) 620 selection.add(ref.getMessageForDelivery()); 621 } 622 } 623 624 return selection; 625 } 626 } 627 628 636 public SpyMessage receive(Subscription sub, boolean wait) throws JMSException 637 { 638 boolean trace = log.isTraceEnabled(); 639 if (trace) 640 log.trace("receive " + sub + " wait=" + wait + " " + this); 641 642 MessageReference messageRef = null; 643 synchronized (receivers) 644 { 645 if (stopped) 646 throw new IllegalStateException ("The destination is stopped " + getDescription()); 647 if (sub.getSelector() == null && sub.noLocal == false) 649 { 650 synchronized (messages) 651 { 652 while (messages.size() != 0) 654 { 655 messageRef = (MessageReference) messages.first(); 656 messages.remove(messageRef); 657 658 if (messageRef.isExpired()) 659 { 660 expireMessageAsync(messageRef); 661 messageRef = null; 662 } 663 else 664 break; 665 } 666 } 667 } 668 else 669 { 670 synchronized (messages) 672 { 673 Iterator i = messages.iterator(); 674 while (i.hasNext()) 675 { 676 MessageReference mr = (MessageReference) i.next(); 677 if (mr.isExpired()) 678 { 679 i.remove(); 680 expireMessageAsync(mr); 681 } 682 else if (sub.accepts(mr.getHeaders())) 683 { 684 messageRef = mr; 685 i.remove(); 686 break; 687 } 688 } 689 } 690 } 691 692 if (messageRef == null) 693 { 694 if (wait) 695 addToReceivers(sub); 696 } 697 else 698 { 699 setupMessageAcknowledgement(sub, messageRef); 700 } 701 } 702 703 if (messageRef == null) 704 return null; 705 return messageRef.getMessageForDelivery(); 706 } 707 708 715 public void acknowledge(AcknowledgementRequest item, Tx txId) throws JMSException 716 { 717 boolean trace = log.isTraceEnabled(); 718 if (trace) 719 log.trace("acknowledge " + item + " " + txId + " " + this); 720 721 UnackedMessageInfo unacked = null; 722 synchronized (messages) 723 { 724 unacked = (UnackedMessageInfo) unacknowledgedMessages.remove(item); 725 if (unacked == null) 726 return; 727 unackedByMessageRef.remove(unacked.messageRef); 728 HashMap map = (HashMap ) unackedBySubscription.get(unacked.sub); 729 if (map != null) 730 map.remove(unacked.messageRef); 731 if (map == null || map.isEmpty()) 732 unackedBySubscription.remove(unacked.sub); 733 } 734 735 MessageReference m = unacked.messageRef; 736 737 if (!item.isAck) 739 { 740 Runnable task = new RestoreMessageTask(m); 741 server.getPersistenceManager().getTxManager().addPostCommitTask(txId, task); 742 server.getPersistenceManager().getTxManager().addPostRollbackTask(txId, task); 743 } 744 else 745 { 746 try 747 { 748 if (m.isPersistent()) 749 server.getPersistenceManager().remove(m, txId); 750 } 751 catch (Throwable t) 752 { 753 Runnable task = new RestoreMessageTask(m); 756 server.getPersistenceManager().getTxManager().addPostCommitTask(txId, task); 757 server.getPersistenceManager().getTxManager().addPostRollbackTask(txId, task); 758 SpyJMSException.rethrowAsJMSException("Error during ACK ref=" + m, t); 759 } 760 761 performOrPrepareAcknowledgeMessage(m, txId); 762 } 763 764 synchronized (receivers) 765 { 766 synchronized (messages) 767 { 768 checkRemovedSubscribers(unacked.sub); 769 } 770 } 771 } 772 773 780 protected void performOrPrepareAcknowledgeMessage(MessageReference mes, Tx txId) throws JMSException 781 { 782 TxManager txManager = server.getPersistenceManager().getTxManager(); 783 784 Runnable task = new RestoreMessageTask(mes); 786 txManager.addPostRollbackTask(txId, task); 787 788 task = new RemoveMessageTask(mes); 790 txManager.addPostCommitTask(txId, task); 791 } 792 793 798 public void nackMessages(Subscription sub) 799 { 800 boolean trace = log.isTraceEnabled(); 801 if (trace) 802 log.trace("nackMessages " + sub + " " + this); 803 804 synchronized (receivers) 806 { 807 synchronized (messages) 808 { 809 int count = 0; 810 HashMap map = (HashMap ) unackedBySubscription.get(sub); 811 if (map != null) 812 { 813 Iterator i = ((HashMap ) map.clone()).values().iterator(); 814 while (i.hasNext()) 815 { 816 AcknowledgementRequest item = (AcknowledgementRequest) i.next(); 817 try 818 { 819 acknowledge(item, null); 820 count++; 821 } 822 catch (JMSException ignore) 823 { 824 log.debug("Unable to nack message: " + item, ignore); 825 } 826 } 827 if (log.isDebugEnabled()) 828 log.debug("Nacked " + count + " messages for removed subscription " + sub); 829 } 830 } 831 } 832 } 833 834 public void removeAllMessages() throws JMSException 835 { 836 boolean trace = log.isTraceEnabled(); 837 if (trace) 838 log.trace("removeAllMessages " + this); 839 840 for (Iterator i = events.entrySet().iterator(); i.hasNext();) 842 { 843 Map.Entry entry = (Map.Entry ) i.next(); 844 MessageReference message = (MessageReference) entry.getKey(); 845 Timeout timeout = (Timeout) entry.getValue(); 846 if (timeout != null) 847 { 848 timeout.cancel(); 849 i.remove(); 850 dropMessage(message); 851 } 852 } 853 scheduledMessages.clear(); 854 855 synchronized (receivers) 856 { 857 synchronized (messages) 858 { 859 Iterator i = ((HashMap ) unacknowledgedMessages.clone()).keySet().iterator(); 860 while (i.hasNext()) 861 { 862 AcknowledgementRequest item = (AcknowledgementRequest) i.next(); 863 try 864 { 865 acknowledge(item, null); 866 } 867 catch (JMSException ignore) 868 { 869 } 870 } 871 872 i = messages.iterator(); 874 while (i.hasNext()) 875 { 876 MessageReference message = (MessageReference) i.next(); 877 i.remove(); 878 dropMessage(message); 879 } 880 } 881 } 882 } 883 884 public void stop() 885 { 886 HashSet subs; 887 synchronized (receivers) 888 { 889 stopped = true; 890 subs = new HashSet (subscribers); 891 if (log.isTraceEnabled()) 892 log.trace("Stopping " + this + " with subscribers " + subs); 893 clearEvents(); 894 } 895 896 for (Iterator i = subs.iterator(); i.hasNext();) 897 { 898 Subscription sub = (Subscription) i.next(); 899 ClientConsumer consumer = (ClientConsumer) sub.clientConsumer; 900 try 901 { 902 consumer.removeSubscription(sub.subscriptionId); 903 } 904 catch (Throwable t) 905 { 906 log.warn("Error during stop - removing subscriber " + sub, t); 907 } 908 nackMessages(sub); 909 } 910 911 MessageCache cache = server.getMessageCache(); 912 synchronized (messages) 913 { 914 for (Iterator i = messages.iterator(); i.hasNext();) 915 { 916 MessageReference message = (MessageReference) i.next(); 917 try 918 { 919 cache.remove(message); 920 } 921 catch (JMSException ignored) 922 { 923 log.trace("Ignored error removing message from cache", ignored); 924 } 925 } 926 } 927 928 messages.clear(); 930 unacknowledgedMessages.clear(); 931 unackedByMessageRef.clear(); 932 unackedBySubscription.clear(); 933 subscribers.clear(); 934 removedSubscribers.clear(); 935 } 936 937 949 public void createMessageCounter(String name, String subscription, boolean topic, boolean durable, int daycountmax) 950 { 951 counter = new MessageCounter(name, subscription, this, topic, durable, daycountmax); 953 } 954 955 960 public MessageCounter getMessageCounter() 961 { 962 return counter; 963 } 964 965 public String toString() 966 { 967 return super.toString() + "{id=" + description + '}'; 968 } 969 970 973 protected void clearEvents() 974 { 975 for (Iterator i = events.entrySet().iterator(); i.hasNext();) 976 { 977 Map.Entry entry = (Map.Entry ) i.next(); 978 Timeout timeout = (Timeout) entry.getValue(); 979 if (timeout != null) 980 { 981 timeout.cancel(); 982 i.remove(); 983 } 984 } 985 scheduledMessages.clear(); 986 } 987 988 993 protected void clearEvent(MessageReference message) 994 { 995 Timeout timeout = (Timeout) events.remove(message); 996 if (timeout != null) 997 timeout.cancel(); 998 scheduledMessages.remove(message); 999 } 1000 1001 1006 protected void addToReceivers(Subscription sub) throws JMSException 1007 { 1008 boolean trace = log.isTraceEnabled(); 1009 if (trace) 1010 log.trace("addReceiver " + " " + sub + " " + this); 1011 1012 synchronized (receivers) 1013 { 1014 if (stopped) 1015 throw new IllegalStateException ("The destination is stopped " + getDescription()); 1016 receivers.add(sub); 1017 } 1018 } 1019 1020 1025 protected void removeReceiver(Subscription sub) 1026 { 1027 boolean trace = log.isTraceEnabled(); 1028 if (trace) 1029 log.trace("removeReceiver " + " " + sub + " " + this); 1030 1031 synchronized (receivers) 1032 { 1033 receivers.remove(sub); 1034 } 1035 } 1036 1037 private void addTimeout(MessageReference message, TimeoutTarget t, long ts) 1038 { 1039 Timeout timeout = server.getTimeoutFactory().schedule(ts, t); 1040 events.put(message, timeout); 1041 } 1042 1043 1048 private void internalAddMessage(MessageReference message) 1049 { 1050 boolean trace = log.isTraceEnabled(); 1051 if (trace) 1052 log.trace("internalAddMessage " + " " + message + " " + this); 1053 1054 long ts = message.messageScheduledDelivery; 1056 if (ts > 0 && ts > System.currentTimeMillis()) 1057 { 1058 scheduledMessages.add(message); 1059 addTimeout(message, new EnqueueMessageTask(message), ts); 1060 if (trace) 1061 log.trace("scheduled message at " + new Date (ts) + ": " + message); 1062 return; 1064 } 1065 1066 if (message.isExpired()) 1068 { 1069 expireMessageAsync(message); 1070 return; 1071 } 1072 1073 try 1074 { 1075 Subscription found = null; 1076 synchronized (receivers) 1077 { 1078 if (receivers.size() != 0) 1079 { 1080 for (Iterator it = receivers.iterator(); it.hasNext();) 1081 { 1082 Subscription sub = (Subscription) it.next(); 1083 if (sub.accepts(message.getHeaders())) 1084 { 1085 it.remove(); 1086 found = sub; 1087 break; 1088 } 1089 } 1090 } 1091 1092 if (found == null) 1093 { 1094 synchronized (messages) 1095 { 1096 messages.add(message); 1097 1098 if (message.messageExpiration > 0) 1101 { 1102 addTimeout(message, new ExpireMessageTask(message), message.messageExpiration); 1103 } 1104 } 1105 } 1106 } 1107 1108 if (found != null) 1110 queueMessageForSending(found, message); 1111 } 1112 catch (JMSException e) 1113 { 1114 log.error("Caught unusual exception in internalAddMessage.", e); 1116 dropMessage(message); 1118 } 1119 } 1120 1121 1127 protected void queueMessageForSending(Subscription sub, MessageReference message) 1128 { 1129 boolean trace = log.isTraceEnabled(); 1130 if (trace) 1131 log.trace("queueMessageForSending " + " " + sub + " " + message + " " + this); 1132 1133 try 1134 { 1135 setupMessageAcknowledgement(sub, message); 1136 RoutedMessage r = new RoutedMessage(); 1137 r.message = message; 1138 r.subscriptionId = new Integer (sub.subscriptionId); 1139 ((ClientConsumer) sub.clientConsumer).queueMessageForSending(r); 1140 } 1141 catch (Throwable t) 1142 { 1143 log.warn("Caught unusual exception sending message to receiver.", t); 1144 } 1145 } 1146 1147 1154 protected void setupMessageAcknowledgement(Subscription sub, MessageReference messageRef) throws JMSException 1155 { 1156 SpyMessage message = messageRef.getMessage(); 1157 AcknowledgementRequest nack = new AcknowledgementRequest(false); 1158 nack.destination = message.getJMSDestination(); 1159 nack.messageID = message.getJMSMessageID(); 1160 nack.subscriberId = sub.subscriptionId; 1161 1162 synchronized (messages) 1163 { 1164 UnackedMessageInfo unacked = new UnackedMessageInfo(messageRef, sub); 1165 unacknowledgedMessages.put(nack, unacked); 1166 unackedByMessageRef.put(messageRef, nack); 1167 HashMap map = (HashMap ) unackedBySubscription.get(sub); 1168 if (map == null) 1169 { 1170 map = new HashMap (); 1171 unackedBySubscription.put(sub, map); 1172 } 1173 map.put(messageRef, nack); 1174 } 1175 } 1176 1177 1182 protected void dropMessage(MessageReference message) 1183 { 1184 dropMessage(message, null); 1185 } 1186 1187 1193 protected void dropMessage(MessageReference message, Tx txid) 1194 { 1195 boolean trace = log.isTraceEnabled(); 1196 if (trace) 1197 log.trace("dropMessage " + this + " txid=" + txid); 1198 1199 clearEvent(message); 1200 try 1201 { 1202 if (message.isPersistent()) 1203 { 1204 try 1205 { 1206 server.getPersistenceManager().remove(message, txid); 1207 } 1208 catch (JMSException e) 1209 { 1210 try 1211 { 1212 log.warn("Message removed from queue, but not from the persistent store: " + message.getMessage(), e); 1213 } 1214 catch (JMSException x) 1215 { 1216 log.warn("Message removed from queue, but not from the persistent store: " + message, e); 1217 } 1218 } 1219 } 1220 server.getMessageCache().remove(message); 1221 } 1222 catch (JMSException e) 1223 { 1224 log.warn("Error dropping message " + message, e); 1225 } 1226 } 1227 1228 1233 protected void expireMessageAsync(MessageReference messageRef) 1234 { 1235 server.getThreadPool().run(new ExpireMessageTask(messageRef)); 1236 } 1237 1238 1243 protected void expireMessage(MessageReference messageRef) 1244 { 1245 boolean trace = log.isTraceEnabled(); 1246 if (trace) 1247 log.trace("message expired: " + messageRef); 1248 1249 SpyDestination ed = parameters.expiryDestination; 1250 if (ed == null) 1251 { 1252 dropMessage(messageRef); 1253 return; 1254 } 1255 1256 if (trace) 1257 log.trace("sending to: " + ed); 1258 1259 try 1260 { 1261 SpyMessage orig = messageRef.getMessage(); 1262 SpyMessage copy = orig.myClone(); 1263 copy.header.jmsPropertiesReadWrite = true; 1264 copy.setJMSExpiration(0); 1265 copy.setJMSDestination(ed); 1266 copy.setLongProperty(SpyMessage.PROPERTY_ORIG_EXPIRATION, orig.getJMSExpiration()); 1267 copy.setStringProperty(SpyMessage.PROPERTY_ORIG_DESTINATION, orig.getJMSDestination().toString()); 1268 TxManager tm = server.getPersistenceManager().getTxManager(); 1269 Tx tx = tm.createTx(); 1270 try 1271 { 1272 server.addMessage(null, copy, tx); 1273 dropMessage(messageRef, tx); 1274 tm.commitTx(tx); 1275 } 1276 catch (JMSException e) 1277 { 1278 tm.rollbackTx(tx); 1279 throw e; 1280 } 1281 } 1282 catch (JMSException e) 1283 { 1284 log.error("Could not move expired message: " + messageRef, e); 1285 } 1286 } 1287 1288 1294 private void checkRemovedSubscribers(Subscription sub) 1295 { 1296 boolean trace = log.isTraceEnabled(); 1297 if (removedSubscribers.contains(sub) && hasUnackedMessages(sub) == false) 1298 { 1299 if (trace) 1300 log.trace("Removing subscriber " + sub); 1301 removedSubscribers.remove(sub); 1302 subscribers.remove(sub); 1303 ((ClientConsumer) sub.clientConsumer).removeRemovedSubscription(sub.subscriptionId); 1304 } 1305 } 1306 1307 1314 private boolean hasUnackedMessages(Subscription sub) 1315 { 1316 return unackedBySubscription.containsKey(sub); 1317 } 1318 1319 1322 class AddMessagePostRollBackTask implements Runnable 1323 { 1324 MessageReference message; 1325 1326 AddMessagePostRollBackTask(MessageReference m) 1327 { 1328 message = m; 1329 } 1330 1331 public void run() 1332 { 1333 try 1334 { 1335 server.getMessageCache().remove(message); 1336 } 1337 catch (JMSException e) 1338 { 1339 log.error("Could not remove message from the message cache after an add rollback: ", e); 1340 } 1341 } 1342 } 1343 1344 1347 class AddMessagePostCommitTask implements Runnable 1348 { 1349 MessageReference message; 1350 1351 AddMessagePostCommitTask(MessageReference m) 1352 { 1353 message = m; 1354 } 1355 1356 public void run() 1357 { 1358 internalAddMessage(message); 1359 1360 if (counter != null) 1362 { 1363 counter.incrementCounter(); 1364 } 1365 } 1366 } 1367 1368 1371 class RestoreMessageTask implements Runnable 1372 { 1373 MessageReference message; 1374 1375 RestoreMessageTask(MessageReference m) 1376 { 1377 message = m; 1378 } 1379 1380 public void run() 1381 { 1382 nackMessage(message); 1383 } 1384 } 1385 1386 1389 class RemoveMessageTask implements Runnable 1390 { 1391 MessageReference message; 1392 1393 RemoveMessageTask(MessageReference m) 1394 { 1395 message = m; 1396 } 1397 1398 public void run() 1399 { 1400 try 1401 { 1402 clearEvent(message); 1403 server.getMessageCache().remove(message); 1404 } 1405 catch (JMSException e) 1406 { 1407 log.error("Could not remove an acknowleged message from the message cache: ", e); 1408 } 1409 } 1410 } 1411 1412 1415 private class EnqueueMessageTask implements TimeoutTarget 1416 { 1417 private MessageReference messageRef; 1418 1419 public EnqueueMessageTask(MessageReference messageRef) 1420 { 1421 this.messageRef = messageRef; 1422 } 1423 1424 public void timedOut(Timeout timeout) 1425 { 1426 if (log.isTraceEnabled()) 1427 log.trace("scheduled message delivery: " + messageRef); 1428 events.remove(messageRef); 1429 scheduledMessages.remove(messageRef); 1430 internalAddMessage(messageRef); 1431 } 1432 } 1433 1434 1437 private class ExpireMessageTask implements TimeoutTarget, Runnable 1438 { 1439 private MessageReference messageRef; 1440 1441 public ExpireMessageTask(MessageReference messageRef) 1442 { 1443 this.messageRef = messageRef; 1444 } 1445 1446 public void timedOut(Timeout timout) 1447 { 1448 events.remove(messageRef); 1449 scheduledMessages.remove(messageRef); 1450 synchronized (messages) 1451 { 1452 if (messages.remove(messageRef) == false) 1455 return; 1456 } 1457 expireMessage(messageRef); 1458 } 1459 1460 public void run() 1461 { 1462 expireMessage(messageRef); 1463 } 1464 } 1465 1466 1469 private static class UnackedMessageInfo 1470 { 1471 public MessageReference messageRef; 1472 public Subscription sub; 1473 public UnackedMessageInfo(MessageReference messageRef, Subscription sub) 1474 { 1475 this.messageRef = messageRef; 1476 this.sub = sub; 1477 } 1478 } 1479} 1480 | Popular Tags |