1 22 package org.jboss.mq; 23 24 import java.util.LinkedList ; 25 26 import javax.jms.Destination ; 27 import javax.jms.IllegalStateException ; 28 import javax.jms.InvalidSelectorException ; 29 import javax.jms.JMSException ; 30 import javax.jms.Message ; 31 import javax.jms.MessageConsumer ; 32 import javax.jms.MessageListener ; 33 import javax.jms.Session ; 34 35 import org.jboss.logging.Logger; 36 import org.jboss.util.UnreachableStatementException; 37 38 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean; 39 40 49 public class SpyMessageConsumer implements MessageConsumer , SpyConsumer, Runnable 50 { 51 52 static Logger log = Logger.getLogger(SpyMessageConsumer.class); 53 54 55 static boolean trace = log.isTraceEnabled(); 56 57 58 static final Integer ONCE = new Integer (1); 59 60 61 public SpySession session; 62 63 public Subscription subscription = new Subscription(); 64 65 private SynchronizedBoolean closed = new SynchronizedBoolean(false); 66 67 protected Object stateLock = new Object (); 68 69 protected boolean receiving = false; 70 71 protected boolean waitingForMessage = false; 72 73 protected boolean listening = false; 74 75 protected Thread listenerThread = null; 76 77 MessageListener messageListener; 78 79 LinkedList messages; 80 81 boolean sessionConsumer; 82 83 89 SpyMessageConsumer(SpySession s, boolean sessionConsumer) 90 { 91 trace = log.isTraceEnabled(); 92 93 session = s; 94 this.sessionConsumer = sessionConsumer; 95 messageListener = null; 96 messages = new LinkedList (); 97 98 if (trace) 99 log.trace("New message consumer " + this); 100 } 101 102 111 SpyMessageConsumer(SpySession s, boolean sessionConsumer, SpyDestination destination, String selector, boolean noLocal) throws InvalidSelectorException 112 { 113 trace = log.isTraceEnabled(); 114 115 session = s; 116 this.sessionConsumer = sessionConsumer; 117 subscription.destination = destination; 118 subscription.messageSelector = selector; 119 subscription.noLocal = noLocal; 120 121 if (subscription.messageSelector != null) 125 subscription.getSelector(); 126 127 messageListener = null; 128 messages = new LinkedList (); 129 130 if (trace) 131 log.trace("New message consumer " + this); 132 } 133 134 139 public Subscription getSubscription() 140 { 141 return subscription; 142 } 143 144 150 public void addMessage(SpyMessage message) throws JMSException 151 { 152 if (isClosed()) 153 { 154 if (trace) 155 log.trace("WARNING: NACK issued. message=" + message.header.jmsMessageID + 156 " The message consumer was closed. " + this); 157 session.connection.send(message.getAcknowledgementRequest(false)); 158 return; 159 } 160 161 163 if (subscription.accepts(message.header)) 168 { 169 if (sessionConsumer) 170 sessionConsumerProcessMessage(message); 171 else 172 { 173 synchronized (messages) 174 { 175 if (waitingForMessage) 176 { 177 if (trace) 178 log.trace("Adding message=" + message.header.jmsMessageID + " " + this); 179 messages.addLast(message); 180 messages.notifyAll(); 181 } 182 else 183 { 184 if (trace) 187 log.trace("WARNING: NACK issued. message=" + message.header.jmsMessageID + 188 " The message consumer was not waiting for a message. " + this); 189 session.connection.send(message.getAcknowledgementRequest(false)); 190 } 191 } 192 } 193 } 194 else 195 { 196 if (trace) 197 log.trace("WARNING: NACK issued. message=" + message.header.jmsMessageID + 198 " The subscription did not accept the message. " + this); 199 session.connection.send(message.getAcknowledgementRequest(false)); 200 } 201 } 202 203 206 public void restartProcessing() 207 { 208 synchronized (messages) 209 { 210 if (trace) 211 log.trace("Restarting processing " + this); 212 messages.notifyAll(); 213 } 214 } 215 216 public void setMessageListener(MessageListener listener) throws JMSException 217 { 218 checkClosed(); 219 220 synchronized (stateLock) 221 { 222 if (receiving) 223 throw new JMSException ("Another thread is already in receive."); 224 225 if (trace) 226 log.trace("Set message listener=" + listener + " old listener=" + messageListener + " " + this); 227 228 boolean oldListening = listening; 229 listening = (listener != null); 230 messageListener = listener; 231 232 if (!sessionConsumer && listening && !oldListening) 233 { 234 if (listenerThread == null) 236 { 237 listenerThread = new Thread (this, "MessageListenerThread - " + subscription.destination.getName()); 238 listenerThread.start(); 239 } 240 } 241 } 242 } 243 244 public String getMessageSelector() throws JMSException 245 { 246 checkClosed(); 247 return subscription.messageSelector; 248 } 249 250 public MessageListener getMessageListener() throws JMSException 251 { 252 checkClosed(); 253 return messageListener; 254 } 255 256 public Message receive() throws JMSException 257 { 258 checkClosed(); 259 synchronized (stateLock) 260 { 261 if (receiving) 262 throw new JMSException ("Another thread is already in receive."); 263 if (listening) 264 throw new JMSException ("A message listener is already registered"); 265 receiving = true; 266 267 if (trace) 268 log.trace("receive() " + this); 269 } 270 271 try 272 { 273 synchronized (messages) 274 { 275 Message message = getMessage(); 278 if (message != null) 279 { 280 if (trace) 281 log.trace("receive() message in list " + message.getJMSMessageID() + " " + this); 282 return message; 283 } 284 285 while (true) 287 { 288 SpyMessage msg = session.connection.receive(subscription, 0); 289 if (msg != null) 290 { 291 Message mes = preProcessMessage(msg); 292 if (mes != null) 293 { 294 if (trace) 295 log.trace("receive() message from server " + mes.getJMSMessageID() + " " + this); 296 return mes; 297 } 298 } 299 else 300 break; 301 } 302 303 if (trace) 304 log.trace("No message in receive(), waiting " + this); 305 306 try 307 { 308 waitingForMessage = true; 309 while (true) 310 { 311 if (isClosed()) 312 { 313 if (trace) 314 log.trace("Consumer closed in receive() " + this); 315 return null; 316 } 317 Message mes = getMessage(); 318 if (mes != null) 319 { 320 if (trace) 321 log.trace("receive() message from list after wait " + this); 322 return mes; 323 } 324 messages.wait(); 325 } 326 } 327 catch (Throwable t) 328 { 329 SpyJMSException.rethrowAsJMSException("Receive interupted", t); 330 throw new UnreachableStatementException(); 331 } 332 finally 333 { 334 waitingForMessage = false; 335 } 336 } 337 } 338 finally 339 { 340 synchronized (stateLock) 341 { 342 receiving = false; 343 } 344 } 345 } 346 347 public Message receive(long timeOut) throws JMSException 348 { 349 if (timeOut == 0) 350 { 351 if (trace) 352 log.trace("Timeout is zero in receive(long) using receive() " + this); 353 return receive(); 354 } 355 356 checkClosed(); 357 synchronized (stateLock) 358 { 359 if (receiving) 360 throw new JMSException ("Another thread is already in receive."); 361 if (listening) 362 throw new JMSException ("A message listener is already registered"); 363 receiving = true; 364 365 if (trace) 366 log.trace("receive(long) " + this); 367 } 368 369 long endTime = System.currentTimeMillis() + timeOut; 370 371 if (trace) 372 log.trace("receive(long) endTime=" + endTime + " " + this); 373 374 try 375 { 376 synchronized (messages) 377 { 378 Message message = getMessage(); 381 if (message != null) 382 { 383 if (trace) 384 log.trace("receive(long) message in list " + message.getJMSMessageID() + " " + this); 385 return message; 386 } 387 while (true) 389 { 390 SpyMessage msg = session.connection.receive(subscription, timeOut); 391 if (msg != null) 392 { 393 Message mes = preProcessMessage(msg); 394 if (mes != null) 395 { 396 if (trace) 397 log.trace("receive(long) message from server " + mes.getJMSMessageID() + " " + this); 398 return mes; 399 } 400 } 401 else 402 break; 403 } 404 405 if (trace) 406 log.trace("No message in receive(), waiting " + this); 407 408 try 409 { 410 waitingForMessage = true; 411 while (true) 412 { 413 if (isClosed()) 414 { 415 if (trace) 416 log.trace("Consumer closed in receive(long) " + this); 417 return null; 418 } 419 420 Message mes = getMessage(); 421 if (mes != null) 422 { 423 if (trace) 424 log.trace("receive(long) message from list after wait " + this); 425 return mes; 426 } 427 428 long att = endTime - System.currentTimeMillis(); 429 if (att <= 0) 430 { 431 if (trace) 432 log.trace("receive(long) timed out endTime=" + endTime + " " + this); 433 return null; 434 } 435 436 messages.wait(att); 437 } 438 } 439 catch (Throwable t) 440 { 441 SpyJMSException.rethrowAsJMSException("Receive interupted", t); 442 throw new UnreachableStatementException(); 443 } 444 finally 445 { 446 waitingForMessage = false; 447 } 448 } 449 } 450 finally 451 { 452 synchronized (stateLock) 453 { 454 receiving = false; 455 } 456 } 457 } 458 459 public Message receiveNoWait() throws JMSException 460 { 461 checkClosed(); 462 synchronized (stateLock) 463 { 464 if (receiving) 465 throw new JMSException ("Another thread is already in receive."); 466 if (listening) 467 throw new JMSException ("A message listener is already registered"); 468 receiving = true; 469 470 if (trace) 471 log.trace("receiveNoWait() " + this); 472 } 473 474 try 475 { 476 synchronized (messages) 479 { 480 Message mes = getMessage(); 481 if (mes != null) 482 { 483 if (trace) 484 log.trace("receiveNoWait() message in list " + mes.getJMSMessageID() + " " + this); 485 return mes; 486 } 487 } 488 while (true) 490 { 491 SpyMessage msg = session.connection.receive(subscription, -1); 492 if (msg != null) 493 { 494 Message mes = preProcessMessage(msg); 495 if (mes != null) 496 { 497 if (trace) 498 log.trace("receiveNoWait() message from server " + mes.getJMSMessageID() + " " + this); 499 return mes; 500 } 501 } 502 else 503 { 504 if (trace) 505 log.trace("receiveNoWait() no message " + this); 506 return null; 507 } 508 } 509 } 510 finally 511 { 512 synchronized (stateLock) 513 { 514 receiving = false; 515 } 516 } 517 } 518 519 public void close() throws JMSException 520 { 521 synchronized (messages) 522 { 523 if (closed.set(true)) 524 return; 525 526 if (trace) 527 log.trace("Message consumer closing. " + this); 528 messages.notifyAll(); 529 } 530 531 session.interruptDeliveryLockWaiters(); 533 534 if (listenerThread != null && !Thread.currentThread().equals(listenerThread)) 535 { 536 try 537 { 538 if (trace) 539 log.trace("Joining listener thread. " + this); 540 listenerThread.join(); 541 } 542 catch (InterruptedException e) 543 { 544 } 545 } 546 547 if (!sessionConsumer) 548 { 549 session.removeConsumer(this); 550 } 551 552 if (trace) 553 log.trace("Closed. " + this); 554 } 555 556 public void run() 557 { 558 SpyMessage mes = null; 559 try 560 { 561 outer : while (true) 562 { 563 while (mes == null) 565 { 566 synchronized (messages) 567 { 568 if (isClosed()) 569 { 570 waitingForMessage = false; 571 if (trace) 572 log.trace("Consumer closed in run() " + this); 573 break outer; 574 } 575 if (messages.isEmpty()) 576 mes = session.connection.receive(subscription, 0); 577 if (mes == null) 578 { 579 waitingForMessage = true; 580 if (trace) 581 log.trace("waiting in run() " + this); 582 while ((messages.isEmpty() && isClosed() == false) || (!session.running)) 583 { 584 try 585 { 586 messages.wait(); 587 } 588 catch (InterruptedException e) 589 { 590 log.trace("Ignored interruption waiting for messages"); 591 } 592 } 593 if (isClosed()) 594 { 595 waitingForMessage = false; 596 if (trace) 597 log.trace("Consumer closed while waiting in run() " + this); 598 break outer; 599 } 600 mes = (SpyMessage) messages.removeFirst(); 601 waitingForMessage = false; 602 } 603 else 604 { 605 if (trace) 606 log.trace("run() message from server mes=" + mes.getJMSMessageID() + " " + this); 607 } 608 } 609 mes.session = session; 610 } 611 612 MessageListener thisListener; 613 synchronized (stateLock) 614 { 615 if (!isListening()) 616 { 617 if (mes != null) 619 { 620 if (trace) 621 log.trace("run() nacking not listening message mes=" + mes.getJMSMessageID() + " " + this); 622 session.connection.send(mes.getAcknowledgementRequest(false)); 623 } 624 listenerThread = null; 627 mes = null; 628 break; 629 } 630 thisListener = messageListener; 631 } 632 Message message = mes; 633 if (mes instanceof SpyEncapsulatedMessage) 634 message = ((SpyEncapsulatedMessage) mes).getMessage(); 635 636 boolean gotDeliveryLock = false; 639 while (gotDeliveryLock == false) 640 { 641 gotDeliveryLock = session.tryDeliveryLock(); 642 if (gotDeliveryLock == false) 644 { 645 synchronized (messages) 646 { 647 if (isClosed()) 648 break; 649 } 650 } 651 } 652 if (gotDeliveryLock == false) 653 { 654 if (trace) 655 log.trace("run() nacking didn't get delivery lock mes=" + mes.getJMSMessageID() + " " + this); 656 session.connection.send(mes.getAcknowledgementRequest(false)); 657 } 658 else 659 { 660 try 666 { 667 if (session.transacted) 668 { 669 if (trace) 674 log.trace("run() acknowledging message in tx mes=" + mes.getJMSMessageID() + " " + this); 675 session.connection.spyXAResourceManager.ackMessage(session.getCurrentTransactionId(), mes); 676 } 677 678 try 679 { 680 prepareDelivery((SpyMessage) message); 681 session.addUnacknowlegedMessage((SpyMessage) message); 682 thisListener.onMessage(message); 683 } 684 catch (Throwable t) 685 { 686 log.warn("Message listener " + thisListener + " threw a throwable.", t); 687 } 688 } 689 finally 690 { 691 session.releaseDeliveryLock(); 692 } 693 694 if (!session.transacted 695 && (session.acknowledgeMode == Session.AUTO_ACKNOWLEDGE || session.acknowledgeMode == Session.DUPS_OK_ACKNOWLEDGE)) 696 { 697 boolean recovered; 699 synchronized (messages) 700 { 701 recovered = messages.contains(message); 702 } 703 if (recovered == false) 704 mes.doAcknowledge(); 705 } 706 mes = null; 707 } 708 } 709 } 710 catch (Throwable t) 711 { 712 log.warn("Message consumer closing due to error in listening thread.", t); 713 try 714 { 715 close(); 716 } 717 catch (Throwable ignore) 718 { 719 } 720 session.asynchFailure("Message consumer closing due to error in listening thread.", t); 721 } 722 } 723 724 public String toString() 725 { 726 StringBuffer buffer = new StringBuffer (100); 727 buffer.append("SpyMessageConsumer@").append(System.identityHashCode(this)); 728 buffer.append("[sub=").append(subscription); 729 if (isClosed()) 730 buffer.append(" CLOSED"); 731 buffer.append(" listening=").append(listening); 732 buffer.append(" receiving=").append(receiving); 733 buffer.append(" sessionConsumer=").append(sessionConsumer); 734 buffer.append(" waitingForMessage=").append(waitingForMessage); 735 buffer.append(" messages=").append(messages.size()); 736 if (listenerThread != null) 737 buffer.append(" thread=").append(listenerThread); 738 if (messageListener != null) 739 buffer.append(" listener=").append(messageListener); 740 buffer.append(" session=").append(session); 741 buffer.append(']'); 742 return buffer.toString(); 743 } 744 745 Message getMessage() 746 { 747 synchronized (messages) 748 { 749 if (trace) 750 log.trace("Getting message from list " + this); 751 while (true) 752 { 753 try 754 { 755 if (messages.size() == 0) 756 return null; 757 758 SpyMessage mes = (SpyMessage) messages.removeFirst(); 759 760 Message rc = preProcessMessage(mes); 761 if (rc == null) 763 continue; 764 765 return rc; 766 } 767 catch (Throwable t) 768 { 769 log.error("Ignoring error", t); 770 } 771 } 772 } 773 } 774 775 Message preProcessMessage(SpyMessage message) throws JMSException 776 { 777 message.session = session; 778 session.addUnacknowlegedMessage(message); 779 780 prepareDelivery(message); 781 782 if (!isListening()) 784 { 785 if (session.transacted) 786 { 787 if (trace) 788 log.trace("preprocess() acking message in tx message=" + message.getJMSMessageID() + " " + this); 789 session.connection.spyXAResourceManager.ackMessage(session.getCurrentTransactionId(), message); 790 } 791 else if (session.acknowledgeMode == Session.AUTO_ACKNOWLEDGE 792 || session.acknowledgeMode == Session.DUPS_OK_ACKNOWLEDGE) 793 { 794 message.doAcknowledge(); 795 } 796 797 if (message instanceof SpyEncapsulatedMessage) 798 { 799 return ((SpyEncapsulatedMessage) message).getMessage(); 800 } 801 return message; 802 } 803 else 804 { 805 return message; 806 } 807 } 808 809 815 void prepareDelivery(SpyMessage message) throws JMSException 816 { 817 Integer delivery = ONCE; 818 Integer redelivery = (Integer ) message.header.jmsProperties.get(SpyMessage.PROPERTY_REDELIVERY_COUNT); 819 if (redelivery != null) 820 { 821 int value = redelivery.intValue(); 822 if (value != 0) 823 delivery = new Integer (value + 1); 824 } 825 message.header.jmsProperties.put(SpyMessage.PROPERTY_DELIVERY_COUNT, delivery); 826 } 827 828 protected Destination getDestination() throws JMSException 829 { 830 checkClosed(); 831 return subscription.destination; 832 } 833 834 protected boolean getNoLocal() throws JMSException 835 { 836 checkClosed(); 837 return subscription.noLocal; 838 } 839 840 845 protected boolean isListening() 846 { 847 synchronized (stateLock) 848 { 849 return listening; 850 } 851 } 852 853 protected void sessionConsumerProcessMessage(SpyMessage message) throws JMSException 854 { 855 message.session = session; 856 MessageListener thisListener; 858 synchronized (stateLock) 859 { 860 thisListener = messageListener; 861 } 862 863 Object anonymousTXID = null; 868 if (session.transacted) 869 { 870 if (session.getCurrentTransactionId() == null) 872 { 873 anonymousTXID = session.connection.spyXAResourceManager.startTx(); 874 session.setCurrentTransactionId(anonymousTXID); 875 } 876 if (trace) 877 log.trace("consumer() acking message in tx message=" + message.getJMSMessageID() + " " + this); 878 session.connection.spyXAResourceManager.ackMessage(session.getCurrentTransactionId(), message); 879 } 880 881 if (thisListener != null) 882 { 883 Message mes = message; 884 if (message instanceof SpyEncapsulatedMessage) 885 { 886 mes = ((SpyEncapsulatedMessage) message).getMessage(); 887 } 888 session.addUnacknowlegedMessage((SpyMessage) mes); 889 if (trace) 890 log.trace("consumer() before onMessage=" + message.getJMSMessageID() + " " + this); 891 thisListener.onMessage(mes); 892 if (trace) 893 log.trace("consumer() after onMessage=" + message.getJMSMessageID() + " " + this); 894 } 895 896 if (session.transacted) 897 { 898 if (anonymousTXID != null) 900 { 901 if (session.getCurrentTransactionId() == anonymousTXID) 902 { 903 try 905 { 906 if (trace) 907 log.trace("XASession was not enlisted - Committing work using anonymous xid: " + anonymousTXID); 908 session.connection.spyXAResourceManager.endTx(anonymousTXID, true); 909 session.connection.spyXAResourceManager.commit(anonymousTXID, true); 910 } 911 catch (Throwable t) 912 { 913 log.error("Could not commit", t); 914 } 915 finally 916 { 917 session.unsetCurrentTransactionId(anonymousTXID); 918 } 919 } 920 } 921 } 922 else 923 { 924 if (session.acknowledgeMode == Session.AUTO_ACKNOWLEDGE 927 || session.acknowledgeMode == Session.DUPS_OK_ACKNOWLEDGE) 928 { 929 message.doAcknowledge(); 930 } 931 } 932 } 933 934 939 private boolean isClosed() 940 { 941 return closed.get(); 942 } 943 944 949 private void checkClosed() throws IllegalStateException 950 { 951 if (closed.get()) 952 throw new IllegalStateException ("The consumer is closed"); 953 } 954 } 955 | Popular Tags |