1 22 package org.jboss.test.jbossmq.test; 23 24 import java.util.Enumeration ; 25 import javax.jms.Connection ; 26 import javax.jms.ConnectionFactory ; 27 import javax.jms.DeliveryMode ; 28 import javax.jms.InvalidDestinationException ; 29 import javax.jms.JMSException ; 30 import javax.jms.Message ; 31 import javax.jms.MessageConsumer ; 32 import javax.jms.MessageListener ; 33 import javax.jms.MessageProducer ; 34 import javax.jms.Queue ; 35 import javax.jms.QueueBrowser ; 36 import javax.jms.ServerSession ; 37 import javax.jms.ServerSessionPool ; 38 import javax.jms.Session ; 39 import javax.jms.TemporaryQueue ; 40 import javax.jms.TemporaryTopic ; 41 import javax.jms.TextMessage ; 42 import javax.jms.Topic ; 43 import javax.jms.TopicConnection ; 44 import javax.jms.TopicConnectionFactory ; 45 import javax.jms.TopicSubscriber ; 46 import javax.naming.Context ; 47 import javax.naming.InitialContext ; 48 49 import EDU.oswego.cs.dl.util.concurrent.CountDown; 50 import org.jboss.logging.Logger; 51 import org.jboss.test.JBossTestCase; 52 53 59 public class Jms11UnitTest extends JBossTestCase 60 { 61 62 static String TOPIC_FACTORY = "ConnectionFactory"; 63 64 static String QUEUE_FACTORY = "ConnectionFactory"; 65 66 static String TEST_QUEUE = "queue/testQueue"; 67 static String TEST_TOPIC = "topic/testTopic"; 68 static String TEST_DURABLE_TOPIC = "topic/testDurableTopic"; 69 70 static Context context; 72 static Connection queueConnection; 73 static Connection topicConnection; 74 75 public Jms11UnitTest(String name) throws Exception 76 { 77 super(name); 78 } 79 80 protected void drainQueue() throws Exception 82 { 83 Session session = queueConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); 84 Queue queue = (Queue ) context.lookup(TEST_QUEUE); 85 86 MessageConsumer receiver = session.createConsumer(queue); 87 Message message = receiver.receive(50); 88 int c = 0; 89 while (message != null) 90 { 91 message = receiver.receive(50); 92 c++; 93 } 94 95 if (c != 0) 96 getLog().debug(" Drained " + c + " messages from the queue"); 97 98 session.close(); 99 } 100 101 protected void connect() throws Exception 102 { 103 if (context == null) 104 { 105 context = new InitialContext (); 106 } 107 ConnectionFactory queueFactory = (ConnectionFactory ) context.lookup(QUEUE_FACTORY); 108 queueConnection = queueFactory.createConnection(); 109 110 ConnectionFactory topicFactory = (ConnectionFactory ) context.lookup(TOPIC_FACTORY); 111 topicConnection = topicFactory.createConnection(); 112 getLog().debug("Connection to JBossMQ established."); 113 } 114 115 protected void disconnect() throws Exception 116 { 117 queueConnection.close(); 118 topicConnection.close(); 119 } 120 121 130 public void testQueueMessageOrder() throws Exception 131 { 132 133 getLog().debug("Starting QueueMessageOrder test"); 134 135 connect(); 136 137 queueConnection.start(); 138 139 drainQueue(); 140 141 Session session = queueConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); 142 Queue queue = (Queue ) context.lookup(TEST_QUEUE); 143 MessageProducer sender = session.createProducer(queue); 144 145 TextMessage message = session.createTextMessage(); 146 message.setText("Normal message"); 147 sender.send(message, DeliveryMode.NON_PERSISTENT, 4, 0); 148 message.setText("Persistent message"); 149 sender.send(message, DeliveryMode.PERSISTENT, 4, 0); 150 message.setText("High Priority Persistent message"); 151 sender.send(message, DeliveryMode.PERSISTENT, 10, 0); 152 153 QueueBrowser browser = session.createBrowser(queue); 154 Enumeration i = browser.getEnumeration(); 155 getLog().debug(message.getText()); 156 157 message = (TextMessage ) i.nextElement(); 158 getLog().debug(message.getText()); 159 160 message = (TextMessage ) i.nextElement(); 161 getLog().debug(message.getText()); 162 163 disconnect(); 164 getLog().debug("QueueMessageOrder passed"); 165 } 166 167 170 public void testTemporaryQueueDelete() throws Exception 171 { 172 173 getLog().debug("Starting TemporaryQueueDelete test"); 174 connect(); 175 176 Session session = queueConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); 177 TemporaryQueue queue = session.createTemporaryQueue(); 178 179 queue.delete(); 180 181 disconnect(); 182 183 getLog().debug("TemporaryQueueDelete passed"); 184 } 185 186 189 public void testTemporaryTopicDelete() throws Exception 190 { 191 192 getLog().debug("Starting TemporaryTopicDelete test"); 193 connect(); 194 195 Session session = topicConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); 196 TemporaryTopic topic = session.createTemporaryTopic(); 197 198 topic.delete(); 199 200 disconnect(); 201 202 getLog().debug("TemporaryTopicDelete passed"); 203 } 204 205 208 public void testInvalidDestinationQueueSend() throws Exception 209 { 210 211 getLog().debug("Starting InvaidDestinationQueueSend test"); 212 connect(); 213 214 Session session = queueConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); 215 TemporaryQueue queue = session.createTemporaryQueue(); 216 MessageProducer sender = session.createProducer(queue); 217 queue.delete(); 218 219 TextMessage message = session.createTextMessage("hello"); 220 boolean caught = false; 221 try 222 { 223 sender.send(message); 224 } 225 catch (InvalidDestinationException expected) 226 { 227 caught = true; 228 } 229 230 disconnect(); 231 232 assertTrue("Expected an InvalidDestinationException", caught); 233 234 getLog().debug("InvaldDestinationQueueSend passed"); 235 } 236 237 240 public void testInvalidDestinationQueueBrowse() throws Exception 241 { 242 243 getLog().debug("Starting InvalidDestinationQueueBrowse test"); 244 connect(); 245 246 Session session = queueConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); 247 TemporaryQueue queue = session.createTemporaryQueue(); 248 QueueBrowser browser = session.createBrowser(queue); 249 queue.delete(); 250 251 boolean caught = false; 252 try 253 { 254 browser.getEnumeration(); 255 } 256 catch (InvalidDestinationException expected) 257 { 258 caught = true; 259 } 260 261 disconnect(); 262 263 assertTrue("Expected an InvalidDestinationException", caught); 264 265 getLog().debug("InvalidDestinationQueueBrowse passed"); 266 } 267 268 271 public void testInvalidDestinationTopicPublish() throws Exception 272 { 273 274 getLog().debug("Starting InvaidDestinationTopicPublish test"); 275 connect(); 276 277 Session session = topicConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); 278 TemporaryTopic topic = session.createTemporaryTopic(); 279 MessageProducer publisher = session.createProducer(topic); 280 topic.delete(); 281 282 TextMessage message = session.createTextMessage("hello"); 283 boolean caught = false; 284 try 285 { 286 publisher.send(message); 287 } 288 catch (InvalidDestinationException expected) 289 { 290 caught = true; 291 } 292 293 disconnect(); 294 295 assertTrue("Expected an InvalidDestinationException", caught); 296 297 getLog().debug("InvaldDestinationTopicPublish passed"); 298 } 299 300 303 public void testErrorsTopicSubscribe() throws Exception 304 { 305 306 getLog().debug("Starting InvalidDestinationTopicSubscribe test"); 307 connect(); 308 309 try 310 { 311 Session session = topicConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); 312 Topic topic = (Topic ) context.lookup(TEST_TOPIC); 313 TemporaryTopic temp = session.createTemporaryTopic(); 314 315 boolean caught = false; 316 try 317 { 318 session.createConsumer(null); 319 } 320 catch (InvalidDestinationException expected) 321 { 322 caught = true; 323 } 324 assertTrue("Expected an InvalidDestinationException for a null topic", caught); 325 326 caught = false; 327 try 328 { 329 session.createConsumer(null, null, true); 330 } 331 catch (InvalidDestinationException expected) 332 { 333 caught = true; 334 } 335 assertTrue("Expected an InvalidDestinationException for a null topic", caught); 336 337 caught = false; 338 try 339 { 340 session.createDurableSubscriber(null, "NotUsed"); 341 } 342 catch (InvalidDestinationException expected) 343 { 344 caught = true; 345 } 346 assertTrue("Expected an InvalidDestinationException for a null topic", caught); 347 348 caught = false; 349 try 350 { 351 session.createDurableSubscriber(temp, "NotUsed"); 352 } 353 catch (InvalidDestinationException expected) 354 { 355 caught = true; 356 } 357 assertTrue("Expected an InvalidDestinationException for a temporary topic", caught); 358 359 caught = false; 360 try 361 { 362 session.createDurableSubscriber(null, "NotUsed", null, true); 363 } 364 catch (InvalidDestinationException expected) 365 { 366 caught = true; 367 } 368 assertTrue("Expected an InvalidDestinationException for a null topic", caught); 369 370 caught = false; 371 try 372 { 373 session.createDurableSubscriber(temp, "NotUsed", null, true); 374 } 375 catch (InvalidDestinationException expected) 376 { 377 caught = true; 378 } 379 assertTrue("Expected an InvalidDestinationException for a temporary topic", caught); 380 381 caught = false; 382 try 383 { 384 session.createDurableSubscriber(topic, null); 385 } 386 catch (Exception expected) 387 { 388 caught = true; 389 } 390 assertTrue("Expected a Exception for a null subscription", caught); 391 392 caught = false; 393 try 394 { 395 session.createDurableSubscriber(topic, null, null, false); 396 } 397 catch (Exception expected) 398 { 399 caught = true; 400 } 401 assertTrue("Expected a Exception for a null subscription", caught); 402 403 caught = false; 404 try 405 { 406 session.createDurableSubscriber(topic, " "); 407 } 408 catch (Exception expected) 409 { 410 caught = true; 411 } 412 assertTrue("Expected a Exception for an empty subscription", caught); 413 414 caught = false; 415 try 416 { 417 session.createDurableSubscriber(topic, " ", null, false); 418 } 419 catch (Exception expected) 420 { 421 caught = true; 422 } 423 assertTrue("Expected a Exception for an empty subscription", caught); 424 } 425 finally 426 { 427 disconnect(); 428 } 429 430 getLog().debug("InvalidDestinationTopicSubscriber passed"); 431 } 432 433 436 public void testCreateQueue() throws Exception 437 { 438 439 getLog().debug("Starting create queue test"); 440 connect(); 441 442 Session session = queueConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); 443 444 Queue jndiQueue = (Queue ) getInitialContext().lookup("queue/testQueue"); 445 Queue createQueue = session.createQueue(jndiQueue.getQueueName()); 446 assertTrue("Failed for " + QUEUE_FACTORY, jndiQueue.equals(createQueue)); 447 448 getLog().debug("InvalidDestinationTopicSubscriber passed"); 449 } 450 451 public void testMessageListener() throws Exception 452 { 453 getLog().debug("Starting create queue test"); 454 455 connect(); 456 queueConnection.start(); 457 drainQueue(); 458 final CountDown counter1 = new CountDown(3); 459 460 Session session = queueConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); 461 Queue queue = (Queue ) context.lookup(TEST_QUEUE); 462 463 MessageConsumer receiver = session.createConsumer(queue); 464 receiver.setMessageListener(new MessageListener () 465 { 466 public void onMessage(Message msg) 467 { 468 Logger log = Logger.getLogger(getClass().getName()); 469 log.debug("ML"); 470 try 471 { 472 if (msg instanceof TextMessage ) 473 { 474 log.debug(((TextMessage ) msg).getText()); 475 counter1.release(); 476 } 477 } 478 catch (Exception e) 479 { 480 } 481 } 482 }); 483 484 MessageProducer sender = session.createProducer(queue); 485 486 TextMessage message = session.createTextMessage(); 487 message.setText("Normal message"); 488 sender.send(message, DeliveryMode.NON_PERSISTENT, 4, 0); 489 message.setText("Persistent message"); 491 sender.send(message, DeliveryMode.PERSISTENT, 4, 0); 492 message.setText("High Priority Persistent message"); 494 sender.send(message, DeliveryMode.PERSISTENT, 10, 0); 495 497 counter1.acquire(); 499 log.debug("MessageListener1 received the TMs sent"); 500 501 final CountDown counter2 = new CountDown(2); 502 receiver.setMessageListener(new MessageListener () 503 { 504 public void onMessage(Message msg) 505 { 506 Logger log = Logger.getLogger(getClass().getName()); 507 log.debug("ML 2"); 508 try 509 { 510 if (msg instanceof TextMessage ) 511 { 512 log.debug(((TextMessage ) msg).getText()); 513 counter2.release(); 514 } 515 } 516 catch (Exception e) 517 { 518 } 519 } 520 }); 521 522 message.setText("Persistent message"); 523 sender.send(message, DeliveryMode.PERSISTENT, 4, 0); 524 message.setText("High Priority Persistent message"); 526 sender.send(message, DeliveryMode.PERSISTENT, 10, 0); 527 529 counter2.acquire(); 531 log.debug("MessageListener2 received the TMs sent"); 532 533 receiver.setMessageListener(null); 534 535 message.setText("Persistent message"); 536 sender.send(message, DeliveryMode.PERSISTENT, 4, 0); 537 message.setText("High Priority Persistent message"); 539 sender.send(message, DeliveryMode.PERSISTENT, 10, 0); 540 542 sender.close(); 543 drainQueue(); 544 disconnect(); 545 getLog().debug("MessageListener test passed"); 546 } 547 548 public void testApplicationServerStuff() throws Exception 549 { 550 getLog().debug("Starting testing app server stuff"); 551 connect(); 552 553 Queue testQueue = (Queue ) context.lookup(TEST_QUEUE); 554 final Session session = queueConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); 555 556 session.setMessageListener(new MessageListener () 557 { 558 public void onMessage(Message mess) 559 { 560 Logger log = Logger.getLogger(getClass().getName()); 561 log.debug("Processing message"); 562 try 563 { 564 if (mess instanceof TextMessage ) 565 log.debug(((TextMessage ) mess).getText()); 566 } 567 catch (Exception e) 568 { 569 log.error("Error", e); 570 } 571 } 572 }); 573 574 MessageProducer sender = session.createProducer(testQueue); 575 sender.send(session.createTextMessage("Hi")); 576 sender.send(session.createTextMessage("There")); 577 sender.send(session.createTextMessage("Guys")); 578 queueConnection.createConnectionConsumer(testQueue, null, new ServerSessionPool () 579 { 580 public ServerSession getServerSession() 581 { 582 Logger.getLogger(getClass().getName()).debug("Getting server session."); 583 return new ServerSession () 584 { 585 public Session getSession() 586 { 587 return session; 588 } 589 public void start() 590 { 591 Logger.getLogger(getClass().getName()).debug("Starting server session."); 592 session.run(); 593 } 594 }; 595 } 596 }, 10); 597 598 queueConnection.start(); 599 600 try 601 { 602 Thread.sleep(5 * 1000); 603 } 604 catch (Exception e) 605 { 606 } 607 608 disconnect(); 609 getLog().debug("Testing app server stuff passed"); 610 } 611 612 private void drainMessagesForTopic(MessageConsumer sub) throws JMSException 613 { 614 Message msg = sub.receive(50); 615 int c = 0; 616 while (msg != null) 617 { 618 c++; 619 if (msg instanceof TextMessage ) 620 getLog().debug(((TextMessage ) msg).getText()); 621 msg = sub.receive(50); 622 } 623 getLog().debug("Received " + c + " messages from topic."); 624 } 625 626 public void testTopics() throws Exception 627 { 628 getLog().debug("Starting Topic test"); 629 connect(); 630 631 TopicConnectionFactory topicFactory = (TopicConnectionFactory ) context.lookup(TOPIC_FACTORY); 632 topicConnection = topicFactory.createTopicConnection("john", "needle"); 633 634 topicConnection.start(); 635 636 Session session = topicConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); 638 Topic topic = (Topic ) context.lookup(TEST_TOPIC); 639 640 TopicSubscriber sub1 = session.createDurableSubscriber(topic, "sub1"); 641 MessageConsumer sub2 = session.createConsumer(topic); 642 MessageConsumer sub3 = session.createConsumer(topic); 643 644 MessageProducer sender = session.createProducer(topic); 646 647 sender.send(session.createTextMessage("Message 1")); 649 sender.send(session.createTextMessage("Message 2")); 650 sender.send(session.createTextMessage("Message 3")); 651 drainMessagesForTopic(sub1); 652 drainMessagesForTopic(sub2); 653 drainMessagesForTopic(sub3); 654 655 sub1.close(); 657 sub2.close(); 658 659 sender.send(session.createTextMessage("Message 4")); 661 sender.send(session.createTextMessage("Message 5")); 662 sender.send(session.createTextMessage("Message 6")); 663 664 try 666 { 667 Thread.sleep(5 * 1000); 668 } 669 catch (InterruptedException e) 670 { 671 } 672 673 drainMessagesForTopic(sub3); 674 675 sub1 = session.createDurableSubscriber(topic, "sub1"); 677 sub2 = session.createConsumer(topic); 678 679 sender.send(session.createTextMessage("Final message")); 681 sender.close(); 682 683 drainMessagesForTopic(sub1); 684 drainMessagesForTopic(sub2); 685 drainMessagesForTopic(sub3); 686 687 sub1.close(); 688 sub2.close(); 689 sub3.close(); 690 691 session.unsubscribe("sub1"); 692 693 topicConnection.stop(); 694 topicConnection.close(); 695 696 disconnect(); 697 getLog().debug("Topic test passed"); 698 } 699 700 705 public void testTopicNoLocal() throws Exception 706 { 707 getLog().debug("Starting TopicNoLocal test"); 708 connect(); 709 710 TopicConnectionFactory topicFactory = (TopicConnectionFactory ) context.lookup(TOPIC_FACTORY); 711 TopicConnection topicConnection1 = topicFactory.createTopicConnection(); 712 topicConnection1.start(); 713 TopicConnection topicConnection2 = topicFactory.createTopicConnection(); 714 topicConnection2.start(); 715 716 Session session1 = topicConnection1.createSession(false, Session.AUTO_ACKNOWLEDGE); 718 Topic topic = (Topic ) context.lookup(TEST_TOPIC); 719 MessageConsumer subscriber1 = session1.createConsumer(topic, null, true); 720 MessageProducer sender1 = session1.createProducer(topic); 721 722 Session session2 = topicConnection2.createSession(false, Session.AUTO_ACKNOWLEDGE); 724 MessageProducer sender2 = session2.createProducer(topic); 725 726 drainMessagesForTopic(subscriber1); 727 728 sender1.send(session1.createTextMessage("Local Message")); 730 sender2.send(session2.createTextMessage("Remote Message")); 731 732 TextMessage msg1 = (TextMessage ) subscriber1.receive(2000); 735 if (msg1 == null) 736 { 737 fail("Did not get any messages"); 738 } 739 else 740 { 741 getLog().debug("Got message: " + msg1); 742 if (msg1.getText().equals("Local Message")) 743 { 744 fail("Got a local message"); 745 } 746 TextMessage msg2 = (TextMessage ) subscriber1.receive(2000); 747 if (msg2 != null) 748 { 749 getLog().debug("Got message: " + msg2); 750 fail("Got an extra message. msg1:" + msg1 + ", msg2:" + msg2); 751 } 752 } 753 754 topicConnection1.stop(); 755 topicConnection1.close(); 756 topicConnection2.stop(); 757 topicConnection2.close(); 758 759 disconnect(); 760 getLog().debug("TopicNoLocal test passed"); 761 } 762 763 767 public void testTopicNoLocalBounce() throws Exception 768 { 769 getLog().debug("Starting TopicNoLocalBounce test"); 770 connect(); 771 772 TopicConnectionFactory topicFactory = (TopicConnectionFactory ) context.lookup(TOPIC_FACTORY); 773 TopicConnection topicConnection1 = topicFactory.createTopicConnection(); 774 topicConnection1.start(); 775 TopicConnection topicConnection2 = topicFactory.createTopicConnection(); 776 topicConnection2.start(); 777 778 Session session1 = topicConnection1.createSession(false, Session.AUTO_ACKNOWLEDGE); 780 Topic topic = (Topic ) context.lookup(TEST_TOPIC); 781 MessageConsumer subscriber1 = session1.createConsumer(topic, null, true); 782 MessageProducer sender1 = session1.createProducer(topic); 783 784 Session session2 = topicConnection2.createSession(false, Session.AUTO_ACKNOWLEDGE); 786 MessageConsumer subscriber2 = session2.createConsumer(topic, null, true); 787 MessageProducer sender2 = session2.createProducer(topic); 788 789 drainMessagesForTopic(subscriber1); 790 drainMessagesForTopic(subscriber2); 791 792 sender1.send(session1.createTextMessage("Message")); 794 795 assertTrue("Subscriber1 should not get a message", subscriber1.receiveNoWait() == null); 796 TextMessage msg = (TextMessage ) subscriber2.receive(2000); 797 assertTrue("Subscriber2 should get a message, got " + msg, msg != null && msg.getText().equals("Message")); 798 799 sender2.send(msg); 801 802 msg = (TextMessage ) subscriber1.receive(2000); 803 assertTrue("Subscriber1 should get a message, got " + msg, msg != null && msg.getText().equals("Message")); 804 assertTrue("Subscriber2 should not get a message", subscriber2.receiveNoWait() == null); 805 806 topicConnection1.stop(); 807 topicConnection1.close(); 808 topicConnection2.stop(); 809 topicConnection2.close(); 810 811 disconnect(); 812 getLog().debug("TopicNoLocalBounce test passed"); 813 } 814 815 818 public void testTopicSelectorChange() throws Exception 819 { 820 getLog().debug("Starting TopicSelectorChange test"); 821 822 getLog().debug("Create topic connection"); 823 TopicConnectionFactory topicFactory = (TopicConnectionFactory ) context.lookup(TOPIC_FACTORY); 824 topicConnection = topicFactory.createTopicConnection("john", "needle"); 825 topicConnection.start(); 826 827 try 828 { 829 getLog().debug("Retrieving Topic"); 830 Topic topic = (Topic ) context.lookup(TEST_DURABLE_TOPIC); 831 832 getLog().debug("Creating a send session"); 833 Session sendSession = topicConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); 834 MessageProducer sender = sendSession.createProducer(topic); 835 836 getLog().debug("Clearing the topic"); 837 Session subSession = topicConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); 838 MessageConsumer subscriber = subSession.createDurableSubscriber(topic, "test"); 839 Message message = subscriber.receive(50); 840 while (message != null) 841 message = subscriber.receive(50); 842 subSession.close(); 843 844 getLog().debug("Subscribing to topic, looking for Value = 'A'"); 845 subSession = topicConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); 846 subscriber = subSession.createDurableSubscriber(topic, "test", "Value = 'A'", false); 847 848 getLog().debug("Send some messages"); 849 message = sendSession.createTextMessage("Message1"); 850 message.setStringProperty("Value", "A"); 851 sender.send(message); 852 message = sendSession.createTextMessage("Message2"); 853 message.setStringProperty("Value", "A"); 854 sender.send(message); 855 message = sendSession.createTextMessage("Message3"); 856 message.setStringProperty("Value", "B"); 857 sender.send(message); 858 859 getLog().debug("Retrieving the A messages"); 860 message = subscriber.receive(2000); 861 assertTrue("Expected message 1", message != null); 862 assertTrue("Should get an A", message.getStringProperty("Value").equals("A")); 863 message = subscriber.receive(2000); 864 assertTrue("Expected message 2", message != null); 865 assertTrue("Should get a second A", message.getStringProperty("Value").equals("A")); 866 assertTrue("That should be it for A", subscriber.receive(2000) == null); 867 868 getLog().debug("Closing the subscriber without acknowledgement"); 869 subSession.close(); 870 871 getLog().debug("Subscribing to topic, looking for Value = 'B'"); 872 subSession = topicConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); 873 subscriber = subSession.createDurableSubscriber(topic, "test", "Value = 'B'", false); 874 875 getLog().debug("Retrieving the non-existent B messages"); 876 assertTrue("B should not be there", subscriber.receive(2000) == null); 877 878 getLog().debug("Closing the subscriber."); 879 subSession.close(); 880 881 getLog().debug("Subscribing to topic, looking for those Value = 'A'"); 882 subSession = topicConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); 883 subscriber = subSession.createDurableSubscriber(topic, "test", "Value = 'A'", false); 884 assertTrue("Should not be any A the subscription was changed", subscriber.receive(2000) == null); 885 subSession.close(); 886 887 getLog().debug("Subscribing to topic, looking for everything"); 888 subSession = topicConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE); 889 subscriber = subSession.createDurableSubscriber(topic, "test", null, false); 890 891 message = sendSession.createTextMessage("Message4"); 892 message.setStringProperty("Value", "A"); 893 sender.send(message); 894 895 message = subscriber.receive(2000); 896 assertTrue("Expected message 4", message != null); 897 assertTrue("Should be an A which we don't acknowledge", message.getStringProperty("Value").equals("A")); 898 subSession.close(); 899 900 getLog().debug("Subscribing to topic, looking for the Value = 'A'"); 901 subSession = topicConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); 902 subscriber = subSession.createDurableSubscriber(topic, "test", "Value = 'A'", false); 903 assertTrue( 904 "Should not be any A, the subscription was changed. Even though the old and new selectors match the message", 905 subscriber.receive(2000) == null); 906 subSession.close(); 907 908 getLog().debug("Closing the send session"); 909 sendSession.close(); 910 911 getLog().debug("Removing the subscription"); 912 subSession = topicConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); 913 subSession.unsubscribe("test"); 914 915 } 916 finally 917 { 918 getLog().debug("Closing the connection"); 919 topicConnection.close(); 920 } 921 922 getLog().debug("TopicSelectorChange test passed"); 923 } 924 925 928 public void testTopicSelectorNullOrEmpty() throws Exception 929 { 930 getLog().debug("Starting TopicSelectorNullOrEmpty test"); 931 932 getLog().debug("Create topic connection"); 933 TopicConnectionFactory topicFactory = (TopicConnectionFactory ) context.lookup(TOPIC_FACTORY); 934 topicConnection = topicFactory.createTopicConnection("john", "needle"); 935 topicConnection.start(); 936 937 try 938 { 939 getLog().debug("Retrieving Topic"); 940 Topic topic = (Topic ) context.lookup(TEST_DURABLE_TOPIC); 941 942 getLog().debug("Creating a send session"); 943 Session sendSession = topicConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); 944 MessageProducer sender = sendSession.createProducer(topic); 945 946 getLog().debug("Clearing the topic"); 947 Session subSession = topicConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); 948 MessageConsumer subscriber = subSession.createDurableSubscriber(topic, "test"); 949 TextMessage message = (TextMessage ) subscriber.receive(50); 950 while (message != null) 951 message = (TextMessage ) subscriber.receive(50); 952 subSession.close(); 953 954 getLog().debug("Subscribing to topic, with null selector"); 955 subSession = topicConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); 956 subscriber = subSession.createDurableSubscriber(topic, "test", null, false); 957 958 getLog().debug("Send a message"); 959 message = sendSession.createTextMessage("Message1"); 960 sender.send(message); 961 962 getLog().debug("Retrieving the message"); 963 message = (TextMessage ) subscriber.receive(2000); 964 assertTrue("Expected message 1", message != null); 965 assertTrue("Should get Message1", message.getText().equals("Message1")); 966 getLog().debug("Closing the subscriber"); 967 subSession.close(); 968 969 getLog().debug("Subscribing to topic, with an empty selector"); 970 subSession = topicConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); 971 subscriber = subSession.createDurableSubscriber(topic, "test", " ", false); 972 973 getLog().debug("Send a message"); 974 message = sendSession.createTextMessage("Message2"); 975 sender.send(message); 976 977 getLog().debug("Retrieving the message"); 978 message = (TextMessage ) subscriber.receive(2000); 979 assertTrue("Expected message 2", message != null); 980 assertTrue("Should get Message2", message.getText().equals("Message2")); 981 getLog().debug("Closing the subscriber"); 982 983 getLog().debug("Removing the subscription"); 984 subSession = topicConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); 985 subSession.unsubscribe("test"); 986 subSession.close(); 987 988 } 989 finally 990 { 991 getLog().debug("Closing the connection"); 992 topicConnection.close(); 993 } 994 995 getLog().debug("TopicSelectorNullOrEmpty test passed"); 996 } 997 998 1001 public void testSendReceiveOutdated() throws Exception 1002 { 1003 getLog().debug("Starting SendReceiveOutdated test"); 1004 1005 connect(); 1006 try 1007 { 1008 queueConnection.start(); 1009 drainQueue(); 1010 queueConnection.stop(); 1011 1012 Session session = queueConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); 1013 Queue queue = (Queue ) context.lookup(TEST_QUEUE); 1014 MessageProducer sender = session.createProducer(queue); 1015 MessageConsumer receiver = session.createConsumer(queue); 1016 1017 TextMessage message = session.createTextMessage("Outdated"); 1019 sender.send(message, DeliveryMode.PERSISTENT, 4, 1); 1020 Thread.sleep(100); 1021 1022 message = session.createTextMessage("OK"); 1024 sender.send(message); 1025 1026 queueConnection.start(); 1028 message = (TextMessage ) receiver.receiveNoWait(); 1029 assertEquals("OK", message.getText()); 1030 1031 assertTrue("Didn't expect anymore messages", receiver.receiveNoWait() == null); 1033 } 1034 finally 1035 { 1036 disconnect(); 1037 } 1038 1039 getLog().debug("SendReceiveOutdated test passed"); 1040 } 1041 1042 public void testSendReceiveExpired() throws Exception 1043 { 1044 getLog().debug("Starting testSendReceiveExpired test"); 1045 1046 connect(); 1047 try 1048 { 1049 queueConnection.start(); 1050 drainQueue(); 1051 1052 Session session = queueConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); 1053 Queue queue = (Queue ) context.lookup(TEST_QUEUE); 1054 MessageProducer sender = session.createProducer(queue); 1055 MessageConsumer receiver = session.createConsumer(queue); 1056 1057 TextMessage message = session.createTextMessage("5 Second Expiration"); 1059 sender.send(message, DeliveryMode.PERSISTENT, 4, 5*1000); 1060 message = session.createTextMessage("OK"); 1062 sender.send(message); 1063 Thread.sleep(6*1000); 1065 message = (TextMessage ) receiver.receiveNoWait(); 1067 assertEquals("OK", message.getText()); 1068 1069 assertTrue("Didn't expect anymore messages", receiver.receiveNoWait() == null); 1071 1072 message = session.createTextMessage("10 Second Expiration"); 1074 sender.send(message, DeliveryMode.PERSISTENT, 4, 10*1000); 1075 message = session.createTextMessage("OK"); 1077 sender.send(message); 1078 Thread.sleep(1*1000); 1080 message = (TextMessage ) receiver.receiveNoWait(); 1082 assertEquals("10 Second Expiration", message.getText()); 1083 message = (TextMessage ) receiver.receiveNoWait(); 1084 assertEquals("OK", message.getText()); 1085 1086 assertTrue("Didn't expect anymore messages", receiver.receiveNoWait() == null); 1088 1089 message = session.createTextMessage("5 Second Expiration"); 1091 message.setJMSExpiration(System.currentTimeMillis() + 5*1000); 1092 sender.send(message, DeliveryMode.PERSISTENT, 4, 0); 1093 message = session.createTextMessage("OK"); 1095 sender.send(message); 1096 Thread.sleep(6*1000); 1098 message = (TextMessage ) receiver.receiveNoWait(); 1100 assertEquals("5 Second Expiration", message.getText()); 1101 message = (TextMessage ) receiver.receiveNoWait(); 1102 assertEquals("OK", message.getText()); 1103 assertTrue("Didn't expect anymore messages", receiver.receiveNoWait() == null); 1104 } 1105 finally 1106 { 1107 disconnect(); 1108 } 1109 } 1110 1111 class Synch 1112 { 1113 boolean waiting = false; 1114 String text; 1115 public synchronized void doWait(long timeout) throws InterruptedException 1116 { 1117 waiting = true; 1118 this.wait(timeout); 1119 } 1120 public synchronized void doNotify() throws InterruptedException 1121 { 1122 while (waiting == false) 1123 wait(100); 1124 this.notifyAll(); 1125 } 1126 public String getText() 1127 { 1128 return text; 1129 } 1130 public void setText(String text) 1131 { 1132 this.text = text; 1133 } 1134 } 1135 1136 1139 public void testSendListenOutdated() throws Exception 1140 { 1141 getLog().debug("Starting SendListenOutdated test"); 1142 1143 connect(); 1144 try 1145 { 1146 queueConnection.start(); 1147 drainQueue(); 1148 queueConnection.stop(); 1149 1150 Session session = queueConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); 1151 Queue queue = (Queue ) context.lookup(TEST_QUEUE); 1152 MessageProducer sender = session.createProducer(queue); 1153 MessageConsumer receiver = session.createConsumer(queue); 1154 1155 TextMessage message = session.createTextMessage("Outdated"); 1157 sender.send(message, DeliveryMode.PERSISTENT, 4, 1); 1158 Thread.sleep(100); 1159 1160 message = session.createTextMessage("OK"); 1162 sender.send(message); 1163 1164 final Synch synch = new Synch(); 1166 MessageListener messagelistener = new MessageListener () 1167 { 1168 public void onMessage(Message message) 1169 { 1170 listenOutdated(message, synch); 1171 } 1172 }; 1173 receiver.setMessageListener(messagelistener); 1174 queueConnection.start(); 1175 1176 synch.doWait(10000); 1177 assertEquals("OK", synch.getText()); 1178 } 1179 finally 1180 { 1181 disconnect(); 1182 } 1183 1184 getLog().debug("SendListenOutdated test passed"); 1185 } 1186 1187 private void listenOutdated(Message message, Synch synch) 1188 { 1189 try 1190 { 1191 synch.setText(((TextMessage ) message).getText()); 1192 } 1193 catch (Throwable t) 1194 { 1195 log.error("Error:", t); 1196 } 1197 finally 1198 { 1199 try 1200 { 1201 synch.doNotify(); 1202 } 1203 catch (Throwable t) 1204 { 1205 log.error("Error:", t); 1206 } 1207 } 1208 } 1209} 1210 | Popular Tags |