1 22 package org.jboss.test.jbossmq.test; 23 24 import java.util.Enumeration ; 25 26 import javax.jms.DeliveryMode ; 27 import javax.jms.InvalidDestinationException ; 28 import javax.jms.JMSException ; 29 import javax.jms.Message ; 30 import javax.jms.MessageListener ; 31 import javax.jms.Queue ; 32 import javax.jms.QueueBrowser ; 33 import javax.jms.QueueConnection ; 34 import javax.jms.QueueConnectionFactory ; 35 import javax.jms.QueueReceiver ; 36 import javax.jms.QueueRequestor ; 37 import javax.jms.QueueSender ; 38 import javax.jms.QueueSession ; 39 import javax.jms.ServerSession ; 40 import javax.jms.ServerSessionPool ; 41 import javax.jms.Session ; 42 import javax.jms.TemporaryQueue ; 43 import javax.jms.TemporaryTopic ; 44 import javax.jms.TextMessage ; 45 import javax.jms.Topic ; 46 import javax.jms.TopicConnection ; 47 import javax.jms.TopicConnectionFactory ; 48 import javax.jms.TopicPublisher ; 49 import javax.jms.TopicSession ; 50 import javax.jms.TopicSubscriber ; 51 import javax.naming.Context ; 52 import javax.naming.InitialContext ; 53 54 import org.jboss.logging.Logger; 55 import org.jboss.test.JBossTestCase; 56 57 import EDU.oswego.cs.dl.util.concurrent.CountDown; 58 59 65 public class JBossMQUnitTest extends JBossTestCase 66 { 67 68 static String TOPIC_FACTORY = "ConnectionFactory"; 69 70 static String QUEUE_FACTORY = "ConnectionFactory"; 71 72 static String TEST_QUEUE = "queue/testQueue"; 73 static String TEST_TOPIC = "topic/testTopic"; 74 static String TEST_DURABLE_TOPIC = "topic/testDurableTopic"; 75 76 static Context context; 78 static QueueConnection queueConnection; 79 static TopicConnection topicConnection; 80 81 public JBossMQUnitTest(String name) throws Exception 82 { 83 super(name); 84 } 85 86 protected void drainQueue() throws Exception 88 { 89 QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 90 Queue queue = (Queue ) context.lookup(TEST_QUEUE); 91 92 QueueReceiver receiver = session.createReceiver(queue); 93 Message message = receiver.receive(50); 94 int c = 0; 95 while (message != null) 96 { 97 message = receiver.receive(50); 98 c++; 99 } 100 101 if (c != 0) 102 getLog().debug(" Drained " + c + " messages from the queue"); 103 104 session.close(); 105 } 106 107 protected void connect() throws Exception 108 { 109 110 if (context == null) 111 { 112 113 context = new InitialContext (); 114 115 } 116 QueueConnectionFactory queueFactory = (QueueConnectionFactory ) context.lookup(QUEUE_FACTORY); 117 queueConnection = queueFactory.createQueueConnection(); 118 119 TopicConnectionFactory topicFactory = (TopicConnectionFactory ) context.lookup(TOPIC_FACTORY); 120 topicConnection = topicFactory.createTopicConnection(); 121 122 getLog().debug("Connection to spyderMQ established."); 123 124 } 125 126 protected void disconnect() throws Exception 127 { 128 queueConnection.close(); 129 topicConnection.close(); 130 } 131 132 141 public void testQueueMessageOrder() throws Exception 142 { 143 144 getLog().debug("Starting QueueMessageOrder test"); 145 146 connect(); 147 148 queueConnection.start(); 149 150 drainQueue(); 151 152 QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 153 Queue queue = (Queue ) context.lookup(TEST_QUEUE); 154 QueueSender sender = session.createSender(queue); 155 156 TextMessage message = session.createTextMessage(); 157 message.setText("Normal message"); 158 sender.send(message, DeliveryMode.NON_PERSISTENT, 4, 0); 159 message.setText("Persistent message"); 161 sender.send(message, DeliveryMode.PERSISTENT, 4, 0); 162 message.setText("High Priority Persistent message"); 164 sender.send(message, DeliveryMode.PERSISTENT, 10, 0); 165 167 170 QueueBrowser browser = session.createBrowser(queue); 171 Enumeration i = browser.getEnumeration(); 172 getLog().debug(message.getText()); 176 177 message = (TextMessage ) i.nextElement(); 178 getLog().debug(message.getText()); 181 182 message = (TextMessage ) i.nextElement(); 183 getLog().debug(message.getText()); 186 187 190 disconnect(); 191 getLog().debug("QueueMessageOrder passed"); 192 } 193 194 199 public void testRequestReplyQueue() throws Exception 200 { 201 202 getLog().debug("Starting RequestReplyQueue test"); 203 connect(); 204 205 { 206 queueConnection.start(); 207 drainQueue(); 208 } 209 210 Thread serverThread = new Thread () 211 { 212 public void run() 213 { 214 Logger log = Logger.getLogger(getClass().getName()); 215 try 216 { 217 log.debug("Server Thread Started"); 218 QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 219 Queue queue = (Queue ) context.lookup(TEST_QUEUE); 220 221 QueueReceiver queueReceiver = session.createReceiver(queue); 222 223 boolean done = false; 224 while (!done) 225 { 226 TextMessage message = (TextMessage ) queueReceiver.receive(); 227 Queue tempQueue = (Queue ) message.getJMSReplyTo(); 228 229 QueueSender replySender = session.createSender(tempQueue); 230 TextMessage reply = session.createTextMessage(); 231 reply.setText("Request Processed"); 232 reply.setJMSCorrelationID(message.getJMSMessageID()); 233 replySender.send(reply); 234 235 if (message.getText().equals("Quit")) 236 done = true; 237 } 238 239 session.close(); 240 log.debug("Server Thread Finished"); 241 242 } 243 catch (Exception e) 244 { 245 log.error("Error", e); 246 } 247 } 248 }; 249 250 serverThread.start(); 251 252 QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 253 Queue queue = (Queue ) context.lookup(TEST_QUEUE); 254 255 QueueRequestor queueRequestor = new QueueRequestor (session, queue); 256 TextMessage message = session.createTextMessage(); 257 message.setText("Request Test"); 258 259 for (int i = 0; i < 5; i++) 260 { 261 262 getLog().debug("Making client request #" + i); 263 TextMessage reply = (TextMessage ) queueRequestor.request(message); 264 String replyID = new String (reply.getJMSCorrelationID()); 265 if (!replyID.equals(message.getJMSMessageID())) 266 throw new Exception ("REQUEST: ERROR: Reply does not match sent message"); 267 268 } 269 270 getLog().debug("Making client request to shut server down."); 271 message.setText("Quit"); 272 queueRequestor.request(message); 273 274 serverThread.join(); 275 disconnect(); 276 277 getLog().debug("RequestReplyQueue passed"); 278 } 279 280 283 public void testTemporaryQueueDelete() throws Exception 284 { 285 286 getLog().debug("Starting TemporaryQueueDelete test"); 287 connect(); 288 289 QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 290 TemporaryQueue queue = session.createTemporaryQueue(); 291 292 queue.delete(); 293 294 disconnect(); 295 296 getLog().debug("TemporaryQueueDelete passed"); 297 } 298 299 302 public void testTemporaryTopicDelete() throws Exception 303 { 304 305 getLog().debug("Starting TemporaryTopicDelete test"); 306 connect(); 307 308 TopicSession session = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); 309 TemporaryTopic topic = session.createTemporaryTopic(); 310 311 topic.delete(); 312 313 disconnect(); 314 315 getLog().debug("TemporaryTopicDelete passed"); 316 } 317 318 321 public void testInvalidDestinationQueueSend() throws Exception 322 { 323 324 getLog().debug("Starting InvaidDestinationQueueSend test"); 325 connect(); 326 327 QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 328 TemporaryQueue queue = session.createTemporaryQueue(); 329 QueueSender sender = session.createSender(queue); 330 queue.delete(); 331 332 TextMessage message = session.createTextMessage("hello"); 333 boolean caught = false; 334 try 335 { 336 sender.send(message); 337 } 338 catch (InvalidDestinationException expected) 339 { 340 caught = true; 341 } 342 343 disconnect(); 344 345 assertTrue("Expected an InvalidDestinationException", caught); 346 347 getLog().debug("InvaldDestinationQueueSend passed"); 348 } 349 350 353 public void testInvalidDestinationQueueBrowse() throws Exception 354 { 355 356 getLog().debug("Starting InvalidDestinationQueueBrowse test"); 357 connect(); 358 359 QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 360 TemporaryQueue queue = session.createTemporaryQueue(); 361 QueueBrowser browser = session.createBrowser(queue); 362 queue.delete(); 363 364 boolean caught = false; 365 try 366 { 367 browser.getEnumeration(); 368 } 369 catch (InvalidDestinationException expected) 370 { 371 caught = true; 372 } 373 374 disconnect(); 375 376 assertTrue("Expected an InvalidDestinationException", caught); 377 378 getLog().debug("InvalidDestinationQueueBrowse passed"); 379 } 380 381 384 public void testInvalidDestinationTopicPublish() throws Exception 385 { 386 387 getLog().debug("Starting InvaidDestinationTopicPublish test"); 388 connect(); 389 390 TopicSession session = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); 391 TemporaryTopic topic = session.createTemporaryTopic(); 392 TopicPublisher publisher = session.createPublisher(topic); 393 topic.delete(); 394 395 TextMessage message = session.createTextMessage("hello"); 396 boolean caught = false; 397 try 398 { 399 publisher.publish(message); 400 } 401 catch (InvalidDestinationException expected) 402 { 403 caught = true; 404 } 405 406 disconnect(); 407 408 assertTrue("Expected an InvalidDestinationException", caught); 409 410 getLog().debug("InvaldDestinationTopicPublish passed"); 411 } 412 413 416 public void testErrorsTopicSubscribe() throws Exception 417 { 418 419 getLog().debug("Starting InvalidDestinationTopicSubscribe test"); 420 connect(); 421 422 try 423 { 424 TopicSession session = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); 425 Topic topic = (Topic ) context.lookup(TEST_TOPIC); 426 TemporaryTopic temp = session.createTemporaryTopic(); 427 428 boolean caught = false; 429 try 430 { 431 session.createSubscriber(null); 432 } 433 catch (InvalidDestinationException expected) 434 { 435 caught = true; 436 } 437 assertTrue("Expected an InvalidDestinationException for a null topic", caught); 438 439 caught = false; 440 try 441 { 442 session.createSubscriber(null, null, true); 443 } 444 catch (InvalidDestinationException expected) 445 { 446 caught = true; 447 } 448 assertTrue("Expected an InvalidDestinationException for a null topic", caught); 449 450 caught = false; 451 try 452 { 453 session.createDurableSubscriber(null, "NotUsed"); 454 } 455 catch (InvalidDestinationException expected) 456 { 457 caught = true; 458 } 459 assertTrue("Expected an InvalidDestinationException for a null topic", caught); 460 461 caught = false; 462 try 463 { 464 session.createDurableSubscriber(temp, "NotUsed"); 465 } 466 catch (InvalidDestinationException expected) 467 { 468 caught = true; 469 } 470 assertTrue("Expected an InvalidDestinationException for a temporary topic", caught); 471 472 caught = false; 473 try 474 { 475 session.createDurableSubscriber(null, "NotUsed", null, true); 476 } 477 catch (InvalidDestinationException expected) 478 { 479 caught = true; 480 } 481 assertTrue("Expected an InvalidDestinationException for a null topic", caught); 482 483 caught = false; 484 try 485 { 486 session.createDurableSubscriber(temp, "NotUsed", null, true); 487 } 488 catch (InvalidDestinationException expected) 489 { 490 caught = true; 491 } 492 assertTrue("Expected an InvalidDestinationException for a temporary topic", caught); 493 494 caught = false; 495 try 496 { 497 session.createDurableSubscriber(topic, null); 498 } 499 catch (Exception expected) 500 { 501 caught = true; 502 } 503 assertTrue("Expected a Exception for a null subscription", caught); 504 505 caught = false; 506 try 507 { 508 session.createDurableSubscriber(topic, null, null, false); 509 } 510 catch (Exception expected) 511 { 512 caught = true; 513 } 514 assertTrue("Expected a Exception for a null subscription", caught); 515 516 caught = false; 517 try 518 { 519 session.createDurableSubscriber(topic, " "); 520 } 521 catch (Exception expected) 522 { 523 caught = true; 524 } 525 assertTrue("Expected a Exception for an empty subscription", caught); 526 527 caught = false; 528 try 529 { 530 session.createDurableSubscriber(topic, " ", null, false); 531 } 532 catch (Exception expected) 533 { 534 caught = true; 535 } 536 assertTrue("Expected a Exception for an empty subscription", caught); 537 } 538 finally 539 { 540 disconnect(); 541 } 542 543 getLog().debug("InvalidDestinationTopicSubscriber passed"); 544 } 545 546 549 public void testCreateQueue() throws Exception 550 { 551 552 getLog().debug("Starting create queue test"); 553 connect(); 554 555 QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 556 557 Queue jndiQueue = (Queue ) getInitialContext().lookup("queue/testQueue"); 558 Queue createQueue = session.createQueue(jndiQueue.getQueueName()); 559 assertTrue("Failed for " + QUEUE_FACTORY, jndiQueue.equals(createQueue)); 560 561 getLog().debug("InvalidDestinationTopicSubscriber passed"); 562 } 563 564 public void testMessageListener() throws Exception 565 { 566 getLog().debug("Starting create queue test"); 567 568 connect(); 569 queueConnection.start(); 570 drainQueue(); 571 final CountDown counter1 = new CountDown(3); 572 573 QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 574 Queue queue = (Queue ) context.lookup(TEST_QUEUE); 575 576 QueueReceiver receiver = session.createReceiver(queue); 577 receiver.setMessageListener(new MessageListener () 578 { 579 public void onMessage(Message msg) 580 { 581 Logger log = Logger.getLogger(getClass().getName()); 582 log.debug("ML"); 583 try 584 { 585 if (msg instanceof TextMessage ) 586 { 587 log.debug(((TextMessage ) msg).getText()); 588 counter1.release(); 589 } 590 } 591 catch (Exception e) 592 { 593 } 594 } 595 }); 596 597 QueueSender sender = session.createSender(queue); 598 599 TextMessage message = session.createTextMessage(); 600 message.setText("Normal message"); 601 sender.send(message, DeliveryMode.NON_PERSISTENT, 4, 0); 602 message.setText("Persistent message"); 604 sender.send(message, DeliveryMode.PERSISTENT, 4, 0); 605 message.setText("High Priority Persistent message"); 607 sender.send(message, DeliveryMode.PERSISTENT, 10, 0); 608 610 counter1.acquire(); 612 log.debug("MessageListener1 received the TMs sent"); 613 614 final CountDown counter2 = new CountDown(2); 615 receiver.setMessageListener(new MessageListener () 616 { 617 public void onMessage(Message msg) 618 { 619 Logger log = Logger.getLogger(getClass().getName()); 620 log.debug("ML 2"); 621 try 622 { 623 if (msg instanceof TextMessage ) 624 { 625 log.debug(((TextMessage ) msg).getText()); 626 counter2.release(); 627 } 628 } 629 catch (Exception e) 630 { 631 } 632 } 633 }); 634 635 message.setText("Persistent message"); 636 sender.send(message, DeliveryMode.PERSISTENT, 4, 0); 637 message.setText("High Priority Persistent message"); 639 sender.send(message, DeliveryMode.PERSISTENT, 10, 0); 640 642 counter2.acquire(); 644 log.debug("MessageListener2 received the TMs sent"); 645 646 receiver.setMessageListener(null); 647 648 message.setText("Persistent message"); 649 sender.send(message, DeliveryMode.PERSISTENT, 4, 0); 650 message.setText("High Priority Persistent message"); 652 sender.send(message, DeliveryMode.PERSISTENT, 10, 0); 653 655 sender.close(); 656 drainQueue(); 657 disconnect(); 658 getLog().debug("MessageListener test passed"); 659 } 660 661 public void testApplicationServerStuff() throws Exception 662 { 663 getLog().debug("Starting testing app server stuff"); 664 connect(); 665 666 Queue testQueue = (Queue ) context.lookup(TEST_QUEUE); 667 final QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 668 669 session.setMessageListener(new MessageListener () 670 { 671 public void onMessage(Message mess) 672 { 673 Logger log = Logger.getLogger(getClass().getName()); 674 log.debug("Processing message"); 675 try 676 { 677 if (mess instanceof TextMessage ) 678 log.debug(((TextMessage ) mess).getText()); 679 } 680 catch (Exception e) 681 { 682 log.error("Error", e); 683 } 684 } 685 }); 686 687 QueueSender sender = session.createSender(testQueue); 688 sender.send(session.createTextMessage("Hi")); 689 sender.send(session.createTextMessage("There")); 690 sender.send(session.createTextMessage("Guys")); 691 queueConnection.createConnectionConsumer(testQueue, null, new ServerSessionPool () 692 { 693 public ServerSession getServerSession() 694 { 695 Logger.getLogger(getClass().getName()).debug("Getting server session."); 696 return new ServerSession () 697 { 698 public Session getSession() 699 { 700 return session; 701 } 702 public void start() 703 { 704 Logger.getLogger(getClass().getName()).debug("Starting server session."); 705 session.run(); 706 } 707 }; 708 } 709 }, 10); 710 711 queueConnection.start(); 712 713 try 714 { 715 Thread.sleep(5 * 1000); 716 } 717 catch (Exception e) 718 { 719 } 720 721 disconnect(); 722 getLog().debug("Testing app server stuff passed"); 723 } 724 725 743 private void drainMessagesForTopic(TopicSubscriber sub) throws JMSException 744 { 745 Message msg = sub.receive(50); 746 int c = 0; 747 while (msg != null) 748 { 749 c++; 750 if (msg instanceof TextMessage ) 751 getLog().debug(((TextMessage ) msg).getText()); 752 msg = sub.receive(50); 753 } 754 getLog().debug("Received " + c + " messages from topic."); 755 } 756 757 public void testTopics() throws Exception 758 { 759 getLog().debug("Starting Topic test"); 760 connect(); 761 762 TopicConnectionFactory topicFactory = (TopicConnectionFactory ) context.lookup(TOPIC_FACTORY); 763 topicConnection = topicFactory.createTopicConnection("john", "needle"); 764 765 topicConnection.start(); 766 767 TopicSession session = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); 769 Topic topic = (Topic ) context.lookup(TEST_TOPIC); 770 771 TopicSubscriber sub1 = session.createDurableSubscriber(topic, "sub1"); 772 TopicSubscriber sub2 = session.createSubscriber(topic); 773 TopicSubscriber sub3 = session.createSubscriber(topic); 774 775 TopicPublisher sender = session.createPublisher(topic); 777 778 sender.publish(session.createTextMessage("Message 1")); 780 sender.publish(session.createTextMessage("Message 2")); 781 sender.publish(session.createTextMessage("Message 3")); 782 drainMessagesForTopic(sub1); 783 drainMessagesForTopic(sub2); 784 drainMessagesForTopic(sub3); 785 786 sub1.close(); 788 sub2.close(); 789 790 sender.publish(session.createTextMessage("Message 4")); 792 sender.publish(session.createTextMessage("Message 5")); 793 sender.publish(session.createTextMessage("Message 6")); 794 795 try 797 { 798 Thread.sleep(5 * 1000); 799 } 800 catch (InterruptedException e) 801 { 802 } 803 804 drainMessagesForTopic(sub3); 805 806 sub1 = session.createDurableSubscriber(topic, "sub1"); 808 sub2 = session.createSubscriber(topic); 809 810 sender.publish(session.createTextMessage("Final message")); 812 sender.close(); 813 814 drainMessagesForTopic(sub1); 815 drainMessagesForTopic(sub2); 816 drainMessagesForTopic(sub3); 817 818 sub1.close(); 819 sub2.close(); 820 sub3.close(); 821 822 session.unsubscribe("sub1"); 823 824 topicConnection.stop(); 825 topicConnection.close(); 826 827 disconnect(); 828 getLog().debug("Topic test passed"); 829 } 830 831 836 public void testTopicNoLocal() throws Exception 837 { 838 getLog().debug("Starting TopicNoLocal test"); 839 connect(); 840 841 TopicConnectionFactory topicFactory = (TopicConnectionFactory ) context.lookup(TOPIC_FACTORY); 842 TopicConnection topicConnection1 = topicFactory.createTopicConnection(); 843 topicConnection1.start(); 844 TopicConnection topicConnection2 = topicFactory.createTopicConnection(); 845 topicConnection2.start(); 846 847 TopicSession session1 = topicConnection1.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); 849 Topic topic = (Topic ) context.lookup(TEST_TOPIC); 850 TopicSubscriber subscriber1 = session1.createSubscriber(topic, null, true); 851 TopicPublisher sender1 = session1.createPublisher(topic); 852 853 TopicSession session2 = topicConnection2.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); 855 TopicPublisher sender2 = session2.createPublisher(topic); 856 857 drainMessagesForTopic(subscriber1); 858 859 sender1.publish(session1.createTextMessage("Local Message")); 861 sender2.publish(session2.createTextMessage("Remote Message")); 862 863 TextMessage msg1 = (TextMessage ) subscriber1.receive(2000); 866 if (msg1 == null) 867 { 868 fail("Did not get any messages"); 869 } 870 else 871 { 872 getLog().debug("Got message: " + msg1); 873 if (msg1.getText().equals("Local Message")) 874 { 875 fail("Got a local message"); 876 } 877 TextMessage msg2 = (TextMessage ) subscriber1.receive(2000); 878 if (msg2 != null) 879 { 880 getLog().debug("Got message: " + msg2); 881 fail("Got an extra message. msg1:" + msg1 + ", msg2:" + msg2); 882 } 883 } 884 885 topicConnection1.stop(); 886 topicConnection1.close(); 887 topicConnection2.stop(); 888 topicConnection2.close(); 889 890 disconnect(); 891 getLog().debug("TopicNoLocal test passed"); 892 } 893 894 898 public void testTopicNoLocalBounce() throws Exception 899 { 900 getLog().debug("Starting TopicNoLocalBounce test"); 901 connect(); 902 903 TopicConnectionFactory topicFactory = (TopicConnectionFactory ) context.lookup(TOPIC_FACTORY); 904 TopicConnection topicConnection1 = topicFactory.createTopicConnection(); 905 topicConnection1.start(); 906 TopicConnection topicConnection2 = topicFactory.createTopicConnection(); 907 topicConnection2.start(); 908 909 TopicSession session1 = topicConnection1.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); 911 Topic topic = (Topic ) context.lookup(TEST_TOPIC); 912 TopicSubscriber subscriber1 = session1.createSubscriber(topic, null, true); 913 TopicPublisher sender1 = session1.createPublisher(topic); 914 915 TopicSession session2 = topicConnection2.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); 917 TopicSubscriber subscriber2 = session2.createSubscriber(topic, null, true); 918 TopicPublisher sender2 = session2.createPublisher(topic); 919 920 drainMessagesForTopic(subscriber1); 921 drainMessagesForTopic(subscriber2); 922 923 sender1.publish(session1.createTextMessage("Message")); 925 926 assertTrue("Subscriber1 should not get a message", subscriber1.receiveNoWait() == null); 927 TextMessage msg = (TextMessage ) subscriber2.receive(2000); 928 assertTrue("Subscriber2 should get a message, got " + msg, msg != null && msg.getText().equals("Message")); 929 930 sender2.publish(msg); 932 933 msg = (TextMessage ) subscriber1.receive(2000); 934 assertTrue("Subscriber1 should get a message, got " + msg, msg != null && msg.getText().equals("Message")); 935 assertTrue("Subscriber2 should not get a message", subscriber2.receiveNoWait() == null); 936 937 topicConnection1.stop(); 938 topicConnection1.close(); 939 topicConnection2.stop(); 940 topicConnection2.close(); 941 942 disconnect(); 943 getLog().debug("TopicNoLocalBounce test passed"); 944 } 945 946 949 public void testTopicSelectorChange() throws Exception 950 { 951 getLog().debug("Starting TopicSelectorChange test"); 952 953 getLog().debug("Create topic connection"); 954 TopicConnectionFactory topicFactory = (TopicConnectionFactory ) context.lookup(TOPIC_FACTORY); 955 topicConnection = topicFactory.createTopicConnection("john", "needle"); 956 topicConnection.start(); 957 958 try 959 { 960 getLog().debug("Retrieving Topic"); 961 Topic topic = (Topic ) context.lookup(TEST_DURABLE_TOPIC); 962 963 getLog().debug("Creating a send session"); 964 TopicSession sendSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); 965 TopicPublisher sender = sendSession.createPublisher(topic); 966 967 getLog().debug("Clearing the topic"); 968 TopicSession subSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); 969 TopicSubscriber subscriber = subSession.createDurableSubscriber(topic, "test"); 970 Message message = subscriber.receive(50); 971 while (message != null) 972 message = subscriber.receive(50); 973 subSession.close(); 974 975 getLog().debug("Subscribing to topic, looking for Value = 'A'"); 976 subSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); 977 subscriber = subSession.createDurableSubscriber(topic, "test", "Value = 'A'", false); 978 979 getLog().debug("Send some messages"); 980 message = sendSession.createTextMessage("Message1"); 981 message.setStringProperty("Value", "A"); 982 sender.publish(message); 983 message = sendSession.createTextMessage("Message2"); 984 message.setStringProperty("Value", "A"); 985 sender.publish(message); 986 message = sendSession.createTextMessage("Message3"); 987 message.setStringProperty("Value", "B"); 988 sender.publish(message); 989 990 getLog().debug("Retrieving the A messages"); 991 message = subscriber.receive(2000); 992 assertTrue("Expected message 1", message != null); 993 assertTrue("Should get an A", message.getStringProperty("Value").equals("A")); 994 message = subscriber.receive(2000); 995 assertTrue("Expected message 2", message != null); 996 assertTrue("Should get a second A", message.getStringProperty("Value").equals("A")); 997 assertTrue("That should be it for A", subscriber.receive(2000) == null); 998 999 getLog().debug("Closing the subscriber without acknowledgement"); 1000 subSession.close(); 1001 1002 getLog().debug("Subscribing to topic, looking for Value = 'B'"); 1003 subSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); 1004 subscriber = subSession.createDurableSubscriber(topic, "test", "Value = 'B'", false); 1005 1006 getLog().debug("Retrieving the non-existent B messages"); 1007 assertTrue("B should not be there", subscriber.receive(2000) == null); 1008 1009 getLog().debug("Closing the subscriber."); 1010 subSession.close(); 1011 1012 getLog().debug("Subscribing to topic, looking for those Value = 'A'"); 1013 subSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); 1014 subscriber = subSession.createDurableSubscriber(topic, "test", "Value = 'A'", false); 1015 assertTrue("Should not be any A the subscription was changed", subscriber.receive(2000) == null); 1016 subSession.close(); 1017 1018 getLog().debug("Subscribing to topic, looking for everything"); 1019 subSession = topicConnection.createTopicSession(false, Session.CLIENT_ACKNOWLEDGE); 1020 subscriber = subSession.createDurableSubscriber(topic, "test", null, false); 1021 1022 message = sendSession.createTextMessage("Message4"); 1023 message.setStringProperty("Value", "A"); 1024 sender.publish(message); 1025 1026 message = subscriber.receive(2000); 1027 assertTrue("Expected message 4", message != null); 1028 assertTrue("Should be an A which we don't acknowledge", message.getStringProperty("Value").equals("A")); 1029 subSession.close(); 1030 1031 getLog().debug("Subscribing to topic, looking for the Value = 'A'"); 1032 subSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); 1033 subscriber = subSession.createDurableSubscriber(topic, "test", "Value = 'A'", false); 1034 assertTrue( 1035 "Should not be any A, the subscription was changed. Even though the old and new selectors match the message", 1036 subscriber.receive(2000) == null); 1037 subSession.close(); 1038 1039 getLog().debug("Closing the send session"); 1040 sendSession.close(); 1041 1042 getLog().debug("Removing the subscription"); 1043 subSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); 1044 subSession.unsubscribe("test"); 1045 1046 } 1047 finally 1048 { 1049 getLog().debug("Closing the connection"); 1050 topicConnection.close(); 1051 } 1052 1053 getLog().debug("TopicSelectorChange test passed"); 1054 } 1055 1056 1059 public void testTopicSelectorNullOrEmpty() throws Exception 1060 { 1061 getLog().debug("Starting TopicSelectorNullOrEmpty test"); 1062 1063 getLog().debug("Create topic connection"); 1064 TopicConnectionFactory topicFactory = (TopicConnectionFactory ) context.lookup(TOPIC_FACTORY); 1065 topicConnection = topicFactory.createTopicConnection("john", "needle"); 1066 topicConnection.start(); 1067 1068 try 1069 { 1070 getLog().debug("Retrieving Topic"); 1071 Topic topic = (Topic ) context.lookup(TEST_DURABLE_TOPIC); 1072 1073 getLog().debug("Creating a send session"); 1074 TopicSession sendSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); 1075 TopicPublisher sender = sendSession.createPublisher(topic); 1076 1077 getLog().debug("Clearing the topic"); 1078 TopicSession subSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); 1079 TopicSubscriber subscriber = subSession.createDurableSubscriber(topic, "test"); 1080 TextMessage message = (TextMessage ) subscriber.receive(50); 1081 while (message != null) 1082 message = (TextMessage ) subscriber.receive(50); 1083 subSession.close(); 1084 1085 getLog().debug("Subscribing to topic, with null selector"); 1086 subSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); 1087 subscriber = subSession.createDurableSubscriber(topic, "test", null, false); 1088 1089 getLog().debug("Send a message"); 1090 message = sendSession.createTextMessage("Message1"); 1091 sender.publish(message); 1092 1093 getLog().debug("Retrieving the message"); 1094 message = (TextMessage ) subscriber.receive(2000); 1095 assertTrue("Expected message 1", message != null); 1096 assertTrue("Should get Message1", message.getText().equals("Message1")); 1097 getLog().debug("Closing the subscriber"); 1098 subSession.close(); 1099 1100 getLog().debug("Subscribing to topic, with an empty selector"); 1101 subSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); 1102 subscriber = subSession.createDurableSubscriber(topic, "test", " ", false); 1103 1104 getLog().debug("Send a message"); 1105 message = sendSession.createTextMessage("Message2"); 1106 sender.publish(message); 1107 1108 getLog().debug("Retrieving the message"); 1109 message = (TextMessage ) subscriber.receive(2000); 1110 assertTrue("Expected message 2", message != null); 1111 assertTrue("Should get Message2", message.getText().equals("Message2")); 1112 getLog().debug("Closing the subscriber"); 1113 1114 getLog().debug("Removing the subscription"); 1115 subSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); 1116 subSession.unsubscribe("test"); 1117 subSession.close(); 1118 1119 } 1120 finally 1121 { 1122 getLog().debug("Closing the connection"); 1123 topicConnection.close(); 1124 } 1125 1126 getLog().debug("TopicSelectorNullOrEmpty test passed"); 1127 } 1128 1129 1132 public void testSendReceiveOutdated() throws Exception 1133 { 1134 getLog().debug("Starting SendReceiveOutdated test"); 1135 1136 connect(); 1137 try 1138 { 1139 queueConnection.start(); 1140 drainQueue(); 1141 queueConnection.stop(); 1142 1143 QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 1144 Queue queue = (Queue ) context.lookup(TEST_QUEUE); 1145 QueueSender sender = session.createSender(queue); 1146 QueueReceiver receiver = session.createReceiver(queue); 1147 1148 TextMessage message = session.createTextMessage("Outdated"); 1150 sender.send(message, DeliveryMode.PERSISTENT, 4, 1); 1151 Thread.sleep(100); 1152 1153 message = session.createTextMessage("OK"); 1155 sender.send(message); 1156 1157 queueConnection.start(); 1159 message = (TextMessage ) receiver.receiveNoWait(); 1160 assertEquals("OK", message.getText()); 1161 1162 assertTrue("Didn't expect anymore messages", receiver.receiveNoWait() == null); 1164 } 1165 finally 1166 { 1167 disconnect(); 1168 } 1169 1170 getLog().debug("SendReceiveOutdated test passed"); 1171 } 1172 1173 public void testSendReceiveExpired() throws Exception 1174 { 1175 getLog().debug("Starting testSendReceiveExpired test"); 1176 1177 connect(); 1178 try 1179 { 1180 queueConnection.start(); 1181 drainQueue(); 1182 1183 QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 1184 Queue queue = (Queue ) context.lookup(TEST_QUEUE); 1185 QueueSender sender = session.createSender(queue); 1186 QueueReceiver receiver = session.createReceiver(queue); 1187 1188 TextMessage message = session.createTextMessage("5 Second Expiration"); 1190 sender.send(message, DeliveryMode.PERSISTENT, 4, 5*1000); 1191 message = session.createTextMessage("OK"); 1193 sender.send(message); 1194 Thread.sleep(6*1000); 1196 message = (TextMessage ) receiver.receiveNoWait(); 1198 assertEquals("OK", message.getText()); 1199 1200 assertTrue("Didn't expect anymore messages", receiver.receiveNoWait() == null); 1202 1203 message = session.createTextMessage("10 Second Expiration"); 1205 sender.send(message, DeliveryMode.PERSISTENT, 4, 10*1000); 1206 message = session.createTextMessage("OK"); 1208 sender.send(message); 1209 Thread.sleep(1*1000); 1211 message = (TextMessage ) receiver.receiveNoWait(); 1213 assertEquals("10 Second Expiration", message.getText()); 1214 message = (TextMessage ) receiver.receiveNoWait(); 1215 assertEquals("OK", message.getText()); 1216 1217 assertTrue("Didn't expect anymore messages", receiver.receiveNoWait() == null); 1219 1220 message = session.createTextMessage("5 Second Expiration"); 1222 message.setJMSExpiration(System.currentTimeMillis() + 5*1000); 1223 sender.send(message, DeliveryMode.PERSISTENT, 4, 0); 1224 message = session.createTextMessage("OK"); 1226 sender.send(message); 1227 Thread.sleep(6*1000); 1229 message = (TextMessage ) receiver.receiveNoWait(); 1231 assertEquals("5 Second Expiration", message.getText()); 1232 message = (TextMessage ) receiver.receiveNoWait(); 1233 assertEquals("OK", message.getText()); 1234 assertTrue("Didn't expect anymore messages", receiver.receiveNoWait() == null); 1235 } 1236 finally 1237 { 1238 disconnect(); 1239 } 1240 } 1241 1242 class Synch 1243 { 1244 boolean waiting = false; 1245 String text; 1246 public synchronized void doWait(long timeout) throws InterruptedException 1247 { 1248 waiting = true; 1249 this.wait(timeout); 1250 } 1251 public synchronized void doNotify() throws InterruptedException 1252 { 1253 while (waiting == false) 1254 wait(100); 1255 this.notifyAll(); 1256 } 1257 public String getText() 1258 { 1259 return text; 1260 } 1261 public void setText(String text) 1262 { 1263 this.text = text; 1264 } 1265 } 1266 1267 1270 public void testSendListenOutdated() throws Exception 1271 { 1272 getLog().debug("Starting SendListenOutdated test"); 1273 1274 connect(); 1275 try 1276 { 1277 queueConnection.start(); 1278 drainQueue(); 1279 queueConnection.stop(); 1280 1281 QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 1282 Queue queue = (Queue ) context.lookup(TEST_QUEUE); 1283 QueueSender sender = session.createSender(queue); 1284 QueueReceiver receiver = session.createReceiver(queue); 1285 1286 TextMessage message = session.createTextMessage("Outdated"); 1288 sender.send(message, DeliveryMode.PERSISTENT, 4, 1); 1289 Thread.sleep(100); 1290 1291 message = session.createTextMessage("OK"); 1293 sender.send(message); 1294 1295 final Synch synch = new Synch(); 1297 MessageListener messagelistener = new MessageListener () 1298 { 1299 public void onMessage(Message message) 1300 { 1301 listenOutdated(message, synch); 1302 } 1303 }; 1304 receiver.setMessageListener(messagelistener); 1305 queueConnection.start(); 1306 1307 synch.doWait(10000); 1308 assertEquals("OK", synch.getText()); 1309 } 1310 finally 1311 { 1312 disconnect(); 1313 } 1314 1315 getLog().debug("SendListenOutdated test passed"); 1316 } 1317 1318 private void listenOutdated(Message message, Synch synch) 1319 { 1320 try 1321 { 1322 synch.setText(((TextMessage ) message).getText()); 1323 } 1324 catch (Throwable t) 1325 { 1326 log.error("Error:", t); 1327 } 1328 finally 1329 { 1330 try 1331 { 1332 synch.doNotify(); 1333 } 1334 catch (Throwable t) 1335 { 1336 log.error("Error:", t); 1337 } 1338 } 1339 } 1340} 1341 | Popular Tags |