| 1 22 package org.jboss.mq.server; 23 24 import java.util.ArrayList ; 25 import java.util.Date ; 26 import java.util.HashMap ; 27 import java.util.HashSet ; 28 import java.util.Iterator ; 29 import java.util.LinkedList ; 30 import java.util.List ; 31 import java.util.Map ; 32 import java.util.Set ; 33 import java.util.SortedSet ; 34 import java.util.TreeSet ; 35 36 import javax.jms.IllegalStateException ; 37 import javax.jms.JMSException ; 38 39 import org.jboss.logging.Logger; 40 import org.jboss.mq.AcknowledgementRequest; 41 import org.jboss.mq.DestinationFullException; 42 import org.jboss.mq.SpyDestination; 43 import org.jboss.mq.SpyJMSException; 44 import org.jboss.mq.SpyMessage; 45 import org.jboss.mq.Subscription; 46 import org.jboss.mq.pm.Tx; 47 import org.jboss.mq.pm.TxManager; 48 import org.jboss.mq.selectors.Selector; 49 import org.jboss.util.NestedRuntimeException; 50 import org.jboss.util.timeout.Timeout; 51 import org.jboss.util.timeout.TimeoutTarget; 52 53 import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap; 54 import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArraySet; 55 56 76 public class BasicQueue 77 { 78 static final Logger log = Logger.getLogger(BasicQueue.class); 79 80 82 SortedSet messages = new TreeSet (); 83 84 85 ConcurrentHashMap events = new ConcurrentHashMap(); 86 87 88 CopyOnWriteArraySet scheduledMessages = new CopyOnWriteArraySet(); 89 90 91 JMSDestinationManager server; 92 93 94 Receivers receivers; 95 96 97 String description; 98 99 100 MessageCounter counter; 101 102 104 HashMap unacknowledgedMessages = new HashMap (); 105 107 HashMap unackedByMessageRef = new HashMap (); 108 110 HashMap unackedBySubscription = new HashMap (); 111 112 114 HashSet subscribers = new HashSet (); 115 116 118 HashSet removedSubscribers = new HashSet (); 119 120 121 BasicQueueParameters parameters; 122 123 124 boolean stopped = false; 125 126 134 public BasicQueue(JMSDestinationManager server, String description, BasicQueueParameters parameters) 135 throws JMSException  136 { 137 this.server = server; 138 this.description = description; 139 this.parameters = parameters; 140 141 Class receiversImpl = parameters.receiversImpl; 142 if (receiversImpl == null) 143 receiversImpl = ReceiversImpl.class; 144 145 try 146 { 147 receivers = (Receivers) receiversImpl.newInstance(); 148 } 149 catch (Throwable t) 150 { 151 throw new SpyJMSException("Error instantiating receivers implementation: " + receiversImpl, t); 152 } 153 } 154 155 160 public String getDescription() 161 { 162 return description; 163 } 164 165 170 public int getReceiversCount() 171 { 172 return receivers.size(); 173 } 174 175 180 public ArrayList getReceivers() 181 { 182 synchronized (receivers) 183 { 184 return receivers.listReceivers(); 185 } 186 } 187 188 193 public boolean isInUse() 194 { 195 synchronized (receivers) 196 { 197 return subscribers.size() > 0; 198 } 199 } 200 201 207 public void addReceiver(Subscription sub) throws JMSException  208 { 209 boolean trace = log.isTraceEnabled(); 210 if (trace) 211 log.trace("addReceiver " + sub + " " + this); 212 213 MessageReference found = null; 214 synchronized (messages) 215 { 216 if (messages.size() != 0) 217 { 218 for (Iterator it = messages.iterator(); it.hasNext();) 219 { 220 MessageReference message = (MessageReference) it.next(); 221 try 222 { 223 if (message.isExpired()) 224 { 225 it.remove(); 226 expireMessageAsync(message); 227 } 228 else if (sub.accepts(message.getHeaders())) 229 { 230 it.remove(); 232 found = message; 233 break; 234 } 235 } 236 catch (JMSException ignore) 237 { 238 log.info("Caught unusual exception in addToReceivers.", ignore); 239 } 240 } 241 } 242 } 243 if (found != null) 244 queueMessageForSending(sub, found); 245 else 246 addToReceivers(sub); 247 } 248 249 254 public Set getSubscribers() 255 { 256 synchronized (receivers) 257 { 258 return (Set ) subscribers.clone(); 259 } 260 } 261 262 268 public void addSubscriber(Subscription sub) throws JMSException  269 { 270 boolean trace = log.isTraceEnabled(); 271 if (trace) 272 log.trace("addSubscriber " + sub + " " + this); 273 synchronized (receivers) 274 { 275 if (stopped) 276 throw new IllegalStateException ("The destination is stopped " + getDescription()); 277 subscribers.add(sub); 278 } 279 } 280 281 286 public void removeSubscriber(Subscription sub) 287 { 288 boolean trace = log.isTraceEnabled(); 289 if (trace) 290 log.trace("removeSubscriber " + sub + " " + this); 291 synchronized (receivers) 292 { 293 removeReceiver(sub); 294 synchronized (messages) 295 { 296 if (hasUnackedMessages(sub)) 297 { 298 if (trace) 299 log.trace("Delaying removal of subscriber is has unacked messages " + sub); 300 removedSubscribers.add(sub); 301 } 302 else 303 { 304 if (trace) 305 log.trace("Removing subscriber " + sub); 306 subscribers.remove(sub); 307 ((ClientConsumer) sub.clientConsumer).removeRemovedSubscription(sub.subscriptionId); 308 } 309 } 310 } 311 } 312 313 318 public int getQueueDepth() 319 { 320 return messages.size(); 321 } 322 323 328 public int getScheduledMessageCount() 329 { 330 return scheduledMessages.size(); 331 } 332 333 338 public int getInProcessMessageCount() 339 { 340 synchronized (messages) 341 { 342 return unacknowledgedMessages.size(); 343 } 344 } 345 346 353 public void addMessage(MessageReference mes, Tx txId) throws JMSException  354 { 355 boolean trace = log.isTraceEnabled(); 356 if (trace) 357 log.trace("addMessage " + mes + " " + txId + " " + this); 358 359 try 360 { 361 synchronized (receivers) 362 { 363 if (stopped) 364 throw new IllegalStateException ("The destination is stopped " + getDescription()); 365 } 366 367 if (parameters.maxDepth > 0) 368 { 369 synchronized (messages) 370 { 371 if (messages.size() >= parameters.maxDepth) 372 { 373 dropMessage(mes); 374 String message = "Maximum size " + parameters.maxDepth + 375 " exceeded for " + description; 376 log.warn(message); 377 throw new DestinationFullException(message); 378 } 379 } 380 } 381 382 performOrPrepareAddMessage(mes, txId); 383 } 384 catch (Throwable t) 385 { 386 String error = "Error in addMessage " + mes; 387 log.trace(error, t); 388 dropMessage(mes, txId); 389 SpyJMSException.rethrowAsJMSException(error, t); 390 } 391 } 392 393 400 protected void performOrPrepareAddMessage(MessageReference mes, Tx txId) throws Exception  401 { 402 TxManager txManager = server.getPersistenceManager().getTxManager(); 403 404 Runnable task = new AddMessagePostRollBackTask(mes); 406 txManager.addPostRollbackTask(txId, task); 407 408 task = new AddMessagePostCommitTask(mes); 410 txManager.addPostCommitTask(txId, task); 411 } 412 413 418 public void restoreMessage(MessageReference mes) 419 { 420 restoreMessage(mes, null, Tx.UNKNOWN); 421 } 422 423 430 public void restoreMessage(MessageReference mes, Tx txid, int type) 431 { 432 boolean trace = log.isTraceEnabled(); 433 if (trace) 434 log.trace("restoreMessage " + mes + " " + this + " txid=" + txid + " type=" + type); 435 436 try 437 { 438 if (txid == null) 439 { 440 internalAddMessage(mes); 441 } 442 else if (type == Tx.ADD) 443 { 444 performOrPrepareAddMessage(mes, txid); 445 } 446 else if (type == Tx.REMOVE) 447 { 448 performOrPrepareAcknowledgeMessage(mes, txid); 449 } 450 else 451 { 452 throw new IllegalStateException ("Unknown restore type " + type + " for message " + mes + " txid=" + txid); 453 } 454 } 455 catch (RuntimeException e) 456 { 457 throw e; 458 } 459 catch (Exception e) 460 { 461 throw new NestedRuntimeException("Unable to restore message " + mes, e); 462 } 463 } 464 465 468 protected void nackMessage(MessageReference message) 469 { 470 if (log.isTraceEnabled()) 471 log.trace("Restoring message: " + message); 472 473 try 474 { 475 message.redelivered(); 476 message.invalidate(); 478 if (message.isPersistent()) 481 server.getPersistenceManager().update(message, null); 482 } 483 catch (JMSException e) 484 { 485 log.error("Caught unusual exception in nackMessage for " + message, e); 486 } 487 488 internalAddMessage(message); 489 } 490 491 499 public SpyMessage[] browse(String selector) throws JMSException  500 { 501 if (selector == null) 502 { 503 SpyMessage list[]; 504 synchronized (messages) 505 { 506 list = new SpyMessage[messages.size()]; 507 Iterator iter = messages.iterator(); 508 for (int i = 0; iter.hasNext(); i++) 509 list[i] = ((MessageReference) iter.next()).getMessageForDelivery(); 510 } 511 return list; 512 } 513 else 514 { 515 Selector s = new Selector(selector); 516 LinkedList selection = new LinkedList (); 517 518 synchronized (messages) 519 { 520 Iterator i = messages.iterator(); 521 while (i.hasNext()) 522 { 523 MessageReference m = (MessageReference) i.next(); 524 if (s.test(m.getHeaders())) 525 selection.add(m.getMessageForDelivery()); 526 } 527 } 528 529 SpyMessage list[]; 530 list = new SpyMessage[selection.size()]; 531 list = (SpyMessage[]) selection.toArray(list); 532 return list; 533 } 534 } 535 536 544 public List browseScheduled(String selector) throws JMSException  545 { 546 if (selector == null) 547 { 548 ArrayList list; 549 synchronized (messages) 550 { 551 list = new ArrayList (scheduledMessages.size()); 552 Iterator iter = scheduledMessages.iterator(); 553 while (iter.hasNext()) 554 { 555 MessageReference ref = (MessageReference) iter.next(); 556 list.add(ref.getMessageForDelivery()); 557 } 558 } 559 return list; 560 } 561 else 562 { 563 Selector s = new Selector(selector); 564 LinkedList selection = new LinkedList (); 565 566 synchronized (messages) 567 { 568 Iterator iter = scheduledMessages.iterator(); 569 while (iter.hasNext()) 570 { 571 MessageReference ref = (MessageReference) iter.next(); 572 if (s.test(ref.getHeaders())) 573 selection.add(ref.getMessageForDelivery()); 574 } 575 } 576 577 return selection; 578 } 579 } 580 581 589 public List browseInProcess(String selector) throws JMSException  590 { 591 if (selector == null) 592 { 593 ArrayList list; 594 synchronized (messages) 595 { 596 list = new ArrayList (unacknowledgedMessages.size()); 597 Iterator iter = unacknowledgedMessages.values().iterator(); 598 while (iter.hasNext()) 599 { 600 UnackedMessageInfo unacked = (UnackedMessageInfo) iter.next(); 601 MessageReference ref = unacked.messageRef; 602 list.add(ref.getMessageForDelivery()); 603 } 604 } 605 return list; 606 } 607 else 608 { 609 Selector s = new Selector(selector); 610 LinkedList selection = new LinkedList (); 611 612 synchronized (messages) 613 { 614 Iterator iter = unacknowledgedMessages.values().iterator(); 615 while (iter.hasNext()) 616 { 617 UnackedMessageInfo unacked = (UnackedMessageInfo) iter.next(); 618 MessageReference ref = unacked.messageRef; 619 if (s.test(ref.getHeaders())) 620 selection.add(ref.getMessageForDelivery()); 621 } 622 } 623 624 return selection; 625 } 626 } 627 628 636 public SpyMessage receive(Subscription sub, boolean wait) throws JMSException  637 { 638 boolean trace = log.isTraceEnabled(); 639 if (trace) 640 log.trace("receive " + sub + " wait=" + wait + " " + this); 641 642 MessageReference messageRef = null; 643 synchronized (receivers) 644 { 645 if (stopped) 646 throw new IllegalStateException ("The destination is stopped " + getDescription()); 647 if (sub.getSelector() == null && sub.noLocal == false) 649 { 650 synchronized (messages) 651 { 652 while (messages.size() != 0) 654 { 655 messageRef = (MessageReference) messages.first(); 656 messages.remove(messageRef); 657 658 if (messageRef.isExpired()) 659 { 660 expireMessageAsync(messageRef); 661 messageRef = null; 662 } 663 else 664 break; 665 } 666 } 667 } 668 else 669 { 670 synchronized (messages) 672 { 673 Iterator i = messages.iterator(); 674 while (i.hasNext()) 675 { 676 MessageReference mr = (MessageReference) i.next(); 677 if (mr.isExpired()) 678 { 679 i.remove(); 680 expireMessageAsync(mr); 681 } 682 else if (sub.accepts(mr.getHeaders())) 683 { 684 messageRef = mr; 685 i.remove(); 686 break; 687 } 688 } 689 } 690 } 691 692 if (messageRef == null) 693 { 694 if (wait) 695 addToReceivers(sub); 696 } 697 else 698 { 699 setupMessageAcknowledgement(sub, messageRef); 700 } 701 } 702 703 if (messageRef == null) 704 return null; 705 return messageRef.getMessageForDelivery(); 706 } 707 708 715 public void acknowledge(AcknowledgementRequest item, Tx txId) throws JMSException  716 { 717 boolean trace = log.isTraceEnabled(); 718 if (trace) 719 log.trace("acknowledge " + item + " " + txId + " " + this); 720 721 UnackedMessageInfo unacked = null; 722 synchronized (messages) 723 { 724 unacked = (UnackedMessageInfo) unacknowledgedMessages.remove(item); 725 if (unacked == null) 726 return; 727 unackedByMessageRef.remove(unacked.messageRef); 728 HashMap map = (HashMap ) unackedBySubscription.get(unacked.sub); 729 if (map != null) 730 map.remove(unacked.messageRef); 731 if (map == null || map.isEmpty()) 732 unackedBySubscription.remove(unacked.sub); 733 } 734 735 MessageReference m = unacked.messageRef; 736 737 if (!item.isAck) 739 { 740 Runnable task = new RestoreMessageTask(m); 741 server.getPersistenceManager().getTxManager().addPostCommitTask(txId, task); 742 server.getPersistenceManager().getTxManager().addPostRollbackTask(txId, task); 743 } 744 else 745 { 746 try 747 { 748 if (m.isPersistent()) 749 server.getPersistenceManager().remove(m, txId); 750 } 751 catch (Throwable t) 752 { 753 Runnable task = new RestoreMessageTask(m); 756 server.getPersistenceManager().getTxManager().addPostCommitTask(txId, task); 757 server.getPersistenceManager().getTxManager().addPostRollbackTask(txId, task); 758 SpyJMSException.rethrowAsJMSException("Error during ACK ref=" + m, t); 759 } 760 761 performOrPrepareAcknowledgeMessage(m, txId); 762 } 763 764 synchronized (receivers) 765 { 766 synchronized (messages) 767 { 768 checkRemovedSubscribers(unacked.sub); 769 } 770 } 771 } 772 773 780 protected void performOrPrepareAcknowledgeMessage(MessageReference mes, Tx txId) throws JMSException  781 { 782 TxManager txManager = server.getPersistenceManager().getTxManager(); 783 784 Runnable task = new RestoreMessageTask(mes); 786 txManager.addPostRollbackTask(txId, task); 787 788 task = new RemoveMessageTask(mes); 790 txManager.addPostCommitTask(txId, task); 791 } 792 793 798 public void nackMessages(Subscription sub) 799 { 800 boolean trace = log.isTraceEnabled(); 801 if (trace) 802 log.trace("nackMessages " + sub + " " + this); 803 804 synchronized (receivers) 806 { 807 synchronized (messages) 808 { 809 int count = 0; 810 HashMap map = (HashMap ) unackedBySubscription.get(sub); 811 if (map != null) 812 { 813 Iterator i = ((HashMap ) map.clone()).values().iterator(); 814 while (i.hasNext()) 815 { 816 AcknowledgementRequest item = (AcknowledgementRequest) i.next(); 817 try 818 { 819 acknowledge(item, null); 820 count++; 821 } 822 catch (JMSException ignore) 823 { 824 log.debug("Unable to nack message: " + item, ignore); 825 } 826 } 827 if (log.isDebugEnabled()) 828 log.debug("Nacked " + count + " messages for removed subscription " + sub); 829 } 830 } 831 } 832 } 833 834 public void removeAllMessages() throws JMSException  835 { 836 boolean trace = log.isTraceEnabled(); 837 if (trace) 838 log.trace("removeAllMessages " + this); 839 840 for (Iterator i = events.entrySet().iterator(); i.hasNext();) 842 { 843 Map.Entry entry = (Map.Entry ) i.next(); 844 MessageReference message = (MessageReference) entry.getKey(); 845 Timeout timeout = (Timeout) entry.getValue(); 846 if (timeout != null) 847 { 848 timeout.cancel(); 849 i.remove(); 850 dropMessage(message); 851 } 852 } 853 scheduledMessages.clear(); 854 855 synchronized (receivers) 856 { 857 synchronized (messages) 858 { 859 Iterator i = ((HashMap ) unacknowledgedMessages.clone()).keySet().iterator(); 860 while (i.hasNext()) 861 { 862 AcknowledgementRequest item = (AcknowledgementRequest) i.next(); 863 try 864 { 865 acknowledge(item, null); 866 } 867 catch (JMSException ignore) 868 { 869 } 870 } 871 872 i = messages.iterator(); 874 while (i.hasNext()) 875
|