1 package net.walend.somnifugi.test; 2 3 import java.util.List ; 4 import java.util.ArrayList ; 5 import java.util.Properties ; 6 import java.util.Date ; 7 import java.util.Hashtable ; 8 9 import javax.naming.InitialContext ; 10 import javax.naming.Context ; 11 import javax.naming.NamingException ; 12 13 import javax.jms.TopicConnectionFactory ; 14 import javax.jms.TopicConnection ; 15 import javax.jms.Topic ; 16 import javax.jms.TopicSession ; 17 import javax.jms.TopicPublisher ; 18 import javax.jms.Message ; 19 import javax.jms.ObjectMessage ; 20 import javax.jms.TopicSubscriber ; 21 import javax.jms.Session ; 22 import javax.jms.JMSException ; 23 import javax.jms.MessageListener ; 24 import javax.jms.TopicRequestor ; 25 26 import junit.framework.TestSuite; 27 import junit.framework.Test; 28 29 import net.walend.toolkit.junit.TestCase; 30 31 import net.walend.somnifugi.SomniJNDIBypass; 32 import net.walend.somnifugi.SomniTopicConnectionFactory; 33 import net.walend.somnifugi.SomniMessageSelector; 34 import net.walend.somnifugi.SomniTopicSession; 35 import net.walend.somnifugi.SomniTopicSubscriber; 36 import net.walend.somnifugi.SomniProperties; 37 38 import net.walend.somnifugi.channel.ChannelFactory; 39 40 import net.walend.somnifugi.juc.PriorityChannelFactory; 41 import net.walend.somnifugi.juc.SimpleChannelFactory; 42 43 48 49 public class TopicTest extends TestCase 50 { 51 public List <Message > sent = new ArrayList <Message >(); 53 54 public TopicTest(String testName) 55 { 56 super(testName); 57 } 58 59 protected class TenObjectPublisher 60 implements Runnable 61 { 62 private TopicConnection connection; 63 private Topic topic; 64 65 public TenObjectPublisher(TopicConnection connection,Topic topic) 66 { 67 this.connection = connection; 68 this.topic = topic; 69 } 70 71 public void run() 72 { 73 try 74 { 75 sent.clear(); 77 78 Thread.sleep(10); 79 TopicSession session = connection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE); 80 TopicPublisher publisher = session.createPublisher(topic); 81 for(int i=0;i<10;i++) 82 { 83 Message message = session.createObjectMessage(new Integer (i)); 84 85 sent.add(message); 86 87 publisher.publish(message); 88 } 89 } 90 catch(InterruptedException ie) 91 { 92 fail(ie); 93 } 94 catch(JMSException jmse) 95 { 96 fail(jmse); 97 } 98 } 99 } 100 101 protected class TenObjectSubscriber 102 implements Runnable 103 { 104 private TopicConnection connection; 105 private Topic topic; 106 private List <Object > received = new ArrayList <Object >(); 107 private final Object guard = new Object (); 108 private boolean durable = false; 109 private String copyMode = SomniProperties.DEEPCOPY; 110 private boolean ready = false; 111 private final Object readyGuard = new Object (); 112 113 public TenObjectSubscriber(TopicConnection connection,Topic topic) 114 { 115 this.connection = connection; 116 this.topic = topic; 117 } 118 119 public TenObjectSubscriber(TopicConnection connection,Topic topic,String copyMode) 120 { 121 this.connection = connection; 122 this.topic = topic; 123 this.copyMode = copyMode; 124 } 125 126 public TenObjectSubscriber(TopicConnection connection,Topic topic,boolean durable) 127 { 128 this.connection = connection; 129 this.topic = topic; 130 this.durable = durable; 131 } 132 133 public void test() 134 { 135 try 136 { 137 synchronized(guard) 138 { 139 if(copyMode==SomniProperties.NOCOPY) 140 { 141 assertTrue("Expected results to be "+sent+", but got "+received,received.equals(sent)); 142 } 143 else if(copyMode==SomniProperties.SHALLOWCOPY) 144 { 145 assertTrue("received and sent should be the same size but received is "+received.size()+" and sent is "+sent.size()+".",received.size()==sent.size()); 146 assertTrue("Shallow copied messages should be different objects.",!received.equals(sent)); 147 148 for(int i=0;i<sent.size();i++) 149 { 150 ObjectMessage sentMessage = (ObjectMessage )sent.get(i); 151 ObjectMessage receivedMessage = (ObjectMessage )received.get(i); 152 153 assertTrue("sentMessage.getObject() should be receivedMessage.getObject().",sentMessage.getObject()==receivedMessage.getObject()); 154 } 155 } 156 else if(copyMode==SomniProperties.DEEPCOPY) 157 { 158 assertTrue("received and sent should be the same size but received is "+received.size()+" and sent is "+sent.size()+".",received.size()==sent.size()); 159 assertTrue("Deep copied messages should be different objects.",!received.equals(sent)); 160 for(int i=0;i<sent.size();i++) 161 { 162 ObjectMessage sentMessage = (ObjectMessage )sent.get(i); 163 ObjectMessage receivedMessage = (ObjectMessage )received.get(i); 164 165 assertTrue("sentMessage.getObject() should have the same location in memory as receivedMessage.getObject().",sentMessage.getObject()!=receivedMessage.getObject()); 166 assertTrue("sentMessage.getObject() should be equal to receivedMessage.getObject().",sentMessage.getObject().equals(receivedMessage.getObject())); 167 } 168 } 169 170 else if(copyMode.equals("separate process")) 172 { 173 assertTrue("received size should be 10 but received.size() is "+received.size(),received.size()==10); 174 assertTrue("Deep copied messages should be different objects.",!received.equals(sent)); 175 177 } 178 } 179 } 180 catch(JMSException jmse) 181 { 182 jmse.printStackTrace(); 183 fail(jmse.getMessage()); 184 } 185 } 186 187 public void run() 188 { 189 190 synchronized(guard) 191 { 192 try 193 { 194 TopicSession session = connection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE); 195 TopicSubscriber subscriber; 196 if(durable) 197 { 198 subscriber = session.createDurableSubscriber(topic,"durableTestSub"); 199 } 200 else 201 { 202 subscriber = session.createSubscriber(topic); 203 } 204 205 synchronized(readyGuard) 206 { 207 ready = true; 208 readyGuard.notifyAll(); 209 } 210 211 for(int i=0;i<10;i++) 212 { 213 ObjectMessage message = (ObjectMessage )subscriber.receive(20000); 214 assertTrue("Message not received after 20000 ms at "+new Date (),message!=null); 215 216 received.add(message); 217 } 218 assertTrue("The subscriber should have 0 but has "+((SomniTopicSubscriber)subscriber).guessSize()+" pending messages",((SomniTopicSubscriber)subscriber).guessSize()==0); 219 } 220 catch(JMSException jmse) 221 { 222 fail(jmse); 223 } 224 } 225 } 226 227 public void waitForReady() 228 throws InterruptedException 229 { 230 synchronized(readyGuard) 231 { 232 if(!ready) 233 { 234 readyGuard.wait(); 235 } 236 } 237 } 238 } 239 240 protected class NoObjectSubscriber 241 implements Runnable 242 { 243 private TopicConnection connection; 244 private Topic topic; 245 private List <Object > received = new ArrayList <Object >(); 246 private final Object guard = new Object (); 247 private boolean durable = false; 248 private boolean noLocal = true; 249 250 public NoObjectSubscriber(TopicConnection connection,Topic topic) 251 { 252 this.connection = connection; 253 this.topic = topic; 254 } 255 256 public NoObjectSubscriber(TopicConnection connection,Topic topic,String copyMode) 257 { 258 this.connection = connection; 259 this.topic = topic; 260 } 261 262 public NoObjectSubscriber(TopicConnection connection,Topic topic,boolean durable) 263 { 264 this.connection = connection; 265 this.topic = topic; 266 this.durable = durable; 267 } 268 269 public NoObjectSubscriber(TopicConnection connection,Topic topic,boolean durable,boolean noLocal) 270 { 271 this.connection = connection; 272 this.topic = topic; 273 this.durable = durable; 274 this.noLocal = noLocal; 275 } 276 277 public void test() 278 { 279 synchronized(guard) 280 { 281 assertTrue("received should be empty but is "+received,received.isEmpty()); 282 } 283 } 284 285 public void run() 286 { 287 synchronized(guard) 288 { 289 try 290 { 291 TopicSession session = connection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE); 292 TopicSubscriber subscriber; 293 if(durable) 294 { 295 subscriber = session.createDurableSubscriber(topic,"durableTestSub","",noLocal); 296 } 297 else 298 { 299 subscriber = session.createSubscriber(topic,"",noLocal); 300 } 301 302 for(int i=0;i<10;i++) 303 { 304 ObjectMessage message = (ObjectMessage )subscriber.receive(200); 305 assertTrue("Message received within 200 ms.",message==null); 306 if(message!=null) 307 { 308 received.add(message); 309 } 310 } 311 } 312 catch(JMSException jmse) 313 { 314 fail(jmse); 315 } 316 } 317 } 318 } 319 320 public void testTwoThreads() 321 { 322 try 323 { 324 TopicConnection connection = SomniJNDIBypass.IT.getTopicConnectionFactory().createTopicConnection(); 325 326 TestExceptionListener exceptionListener = new TestExceptionListener(); 327 connection.setExceptionListener(exceptionListener); 328 329 connection.start(); 330 331 Topic topic = SomniJNDIBypass.IT.getTopic("twoThreadTest"); 332 TenObjectSubscriber tos = new TenObjectSubscriber(connection,topic); 333 334 335 Thread subscribeThread = new Thread (tos); 336 subscribeThread.start(); 337 338 tos.waitForReady(); 340 341 Thread publishThread = new Thread (new TenObjectPublisher(connection,topic)); 342 publishThread.start(); 343 344 publishThread.join(10000); 345 subscribeThread.join(10000); 346 347 tos.test(); 348 349 connection.close(); 350 connection.close(); 352 353 if(exceptionListener.caughtException()) 354 { 355 fail(exceptionListener.getException()); 356 } 357 } 358 catch(JMSException jmse) 359 { 360 fail(jmse); 361 } 362 catch(InterruptedException ie) 363 { 364 fail(ie); 365 } 366 } 367 368 public void testTwoSubscribers() 369 { 370 try 371 { 372 TopicConnection connection = SomniJNDIBypass.IT.getTopicConnectionFactory().createTopicConnection(); 373 374 TestExceptionListener exceptionListener = new TestExceptionListener(); 375 connection.setExceptionListener(exceptionListener); 376 377 connection.start(); 378 379 Topic topic = SomniJNDIBypass.IT.getTopic("twoThreadTest"); 380 TenObjectSubscriber tos1 = new TenObjectSubscriber(connection,topic); 381 TenObjectSubscriber tos2 = new TenObjectSubscriber(connection,topic); 382 383 384 Thread subscribeThread1 = new Thread (tos1); 385 subscribeThread1.start(); 386 387 Thread.sleep(10); 388 389 Thread subscribeThread2 = new Thread (tos2); 390 subscribeThread2.start(); 391 392 Thread.sleep(10); 393 394 Thread publishThread = new Thread (new TenObjectPublisher(connection,topic)); 395 publishThread.start(); 396 397 publishThread.join(10000); 398 subscribeThread1.join(10000); 399 subscribeThread2.join(10000); 400 401 tos1.test(); 402 tos2.test(); 403 404 connection.close(); 405 406 if(exceptionListener.caughtException()) 407 { 408 fail(exceptionListener.getException()); 409 } 410 } 411 catch(JMSException jmse) 412 { 413 fail(jmse); 414 } 415 catch(InterruptedException ie) 416 { 417 fail(ie); 418 } 419 } 420 421 protected class TestMessageListener 422 implements MessageListener 423 { 424 private List <Object > results = new ArrayList <Object >(); 425 private final Object guard = new Object (); 426 427 protected TestMessageListener() 428 { 429 430 } 431 432 public void onMessage(Message message) 433 { 434 ObjectMessage om = (ObjectMessage )message; 435 436 try 437 { 438 Thread.sleep(10); 439 synchronized(guard) 440 { 441 results.add(om.getObject()); 442 } 443 } 444 catch(JMSException jmse) 445 { 446 throw new RuntimeException (jmse); 447 } 448 catch(InterruptedException ie) 449 { 450 Thread.currentThread().interrupt(); 451 } 452 } 453 454 public void check() 455 { 456 check(getExpected()); 457 } 458 459 public void check(List expected) 460 { 461 synchronized(guard) 462 { 463 assertTrue("Expected results to be "+expected+", but got "+results,results.equals(expected)); 464 } 465 } 466 467 public List getResults() 468 { 469 return results; 470 } 471 472 public List <Object > getExpected() 473 { 474 List <Object > expected = new ArrayList <Object >(); 475 for(int i=0;i<10;i++) 476 { 477 expected.add(new Integer (i)); 478 } 479 return expected; 480 } 481 482 } 483 484 public void testMessageListener() 485 { 486 try 487 { 488 TopicConnection connection = SomniJNDIBypass.IT.getTopicConnectionFactory().createTopicConnection(); 489 490 TestExceptionListener exceptionListener = new TestExceptionListener(); 491 connection.setExceptionListener(exceptionListener); 492 493 connection.start(); 494 495 Topic topic = SomniJNDIBypass.IT.getTopic("twoThreadTest"); 496 497 TestMessageListener messageListener = new TestMessageListener(); 498 TopicSession session = connection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE); 499 TopicSubscriber subscriber = session.createSubscriber(topic); 500 subscriber.setMessageListener(messageListener); 501 502 Thread publishThread = new Thread (new TenObjectPublisher(connection,topic)); 503 publishThread.start(); 504 505 publishThread.join(10000); 506 Thread.sleep(1000); 507 508 connection.close(); 509 510 messageListener.check(); 511 512 if(exceptionListener.caughtException()) 513 { 514 fail(exceptionListener.getException()); 515 } 516 } 517 catch(JMSException jmse) 518 { 519 fail(jmse); 520 } 521 catch(InterruptedException ie) 522 { 523 fail(ie); 524 } 525 } 526 527 public void testTwoMessageListenersInTwoSessions() 528 { 529 try 530 { 531 TopicConnection connection = SomniJNDIBypass.IT.getTopicConnectionFactory().createTopicConnection(); 532 533 TestExceptionListener exceptionListener = new TestExceptionListener(); 534 connection.setExceptionListener(exceptionListener); 535 536 connection.start(); 537 538 Topic topic = SomniJNDIBypass.IT.getTopic("testTwoMessageListenersInTwoThreads"); 539 540 TestMessageListener messageListener1 = new TestMessageListener(); 541 TopicSession session1 = connection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE); 542 TopicSubscriber subscriber1 = session1.createSubscriber(topic); 543 subscriber1.setMessageListener(messageListener1); 544 545 TestMessageListener messageListener2 = new TestMessageListener(); 546 TopicSession session2 = connection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE); 547 TopicSubscriber subscriber2 = session2.createSubscriber(topic); 548 subscriber2.setMessageListener(messageListener2); 549 550 Thread publishThread = new Thread (new TenObjectPublisher(connection,topic)); 551 publishThread.start(); 552 553 publishThread.join(10000); 554 Thread.sleep(2000); 555 556 connection.close(); 557 558 messageListener1.check(); 559 messageListener2.check(); 560 561 if(exceptionListener.caughtException()) 562 { 563 fail(exceptionListener.getException()); 564 } 565 } 566 catch(JMSException jmse) 567 { 568 fail(jmse); 569 } 570 catch(InterruptedException ie) 571 { 572 fail(ie); 573 } 574 } 575 576 public void testTwoMessageListenersInOneSession() 577 { 578 try 579 { 580 TopicConnection connection = SomniJNDIBypass.IT.getTopicConnectionFactory().createTopicConnection(); 581 582 TestExceptionListener exceptionListener = new TestExceptionListener(); 583 connection.setExceptionListener(exceptionListener); 584 585 connection.start(); 586 587 Topic topic = SomniJNDIBypass.IT.getTopic("twoThreadTest"); 588 589 TestMessageListener messageListener1 = new TestMessageListener(); 590 TopicSession session1 = connection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE); 591 TopicSubscriber subscriber1 = session1.createSubscriber(topic); 592 subscriber1.setMessageListener(messageListener1); 593 594 Thread.sleep(10); 595 596 TestMessageListener messageListener2 = new TestMessageListener(); 597 TopicSubscriber subscriber2 = session1.createSubscriber(topic); 598 subscriber2.setMessageListener(messageListener2); 599 600 Thread.sleep(10); 601 602 Thread publishThread = new Thread (new TenObjectPublisher(connection,topic)); 603 publishThread.start(); 604 605 publishThread.join(10000); 606 Thread.sleep(1000); 607 608 connection.close(); 609 610 messageListener1.check(); 611 messageListener2.check(); 612 613 if(exceptionListener.caughtException()) 614 { 615 fail(exceptionListener.getException()); 616 } 617 } 618 catch(JMSException jmse) 619 { 620 fail(jmse); 621 } 622 catch(InterruptedException ie) 623 { 624 fail(ie); 625 } 626 } 627 628 public void testJNDI() 629 { 630 try 631 { 632 Hashtable <String ,Object > env = getEnvironment(); 634 635 env.put(Context.INITIAL_CONTEXT_FACTORY,"com.sun.jndi.fscontext.RefFSContextFactory"); 636 env.put(Context.PROVIDER_URL,"file://"+System.getProperty("jndifs")); 637 638 Context ctx = new InitialContext (env); 640 641 Topic topicJNDI = SomniJNDIBypass.IT.getTopic("jndiTopicTest"); 643 TopicConnection connectionJNDI = SomniJNDIBypass.IT.getTopicConnectionFactory().createTopicConnection(); 644 645 ctx.rebind(topicJNDI.getTopicName(),topicJNDI); 647 ctx.rebind("jndiTopicConnection",connectionJNDI); 648 649 Topic topic = (Topic )ctx.lookup("jndiTopicTest"); 651 TopicConnection connection = (TopicConnection )ctx.lookup("jndiTopicConnection"); 652 653 ctx.close(); 654 655 TestExceptionListener exceptionListener = new TestExceptionListener(); 656 connection.setExceptionListener(exceptionListener); 657 658 connection.start(); 659 660 TenObjectSubscriber tos = new TenObjectSubscriber(connection,topic); 661 662 Thread subscribeThread = new Thread (tos); 663 subscribeThread.start(); 664 665 Thread.sleep(10); 666 667 Thread publishThread = new Thread (new TenObjectPublisher(connection,topic)); 668 publishThread.start(); 669 670 publishThread.join(10000); 671 subscribeThread.join(10000); 672 673 tos.test(); 674 675 connection.close(); 676 677 if(exceptionListener.caughtException()) 678 { 679 fail(exceptionListener.getException()); 680 } 681 } 682 catch(JMSException jmse) 683 { 684 fail(jmse); 685 } 686 catch(NamingException ioe) 687 { 688 fail(ioe); 689 } 690 catch(InterruptedException ie) 691 { 692 fail(ie); 693 } 694 } 695 696 public void testStopStart() 697 { 698 try 699 { 700 TopicConnection connection = SomniJNDIBypass.IT.getTopicConnectionFactory().createTopicConnection(); 701 702 TestExceptionListener exceptionListener = new TestExceptionListener(); 703 connection.setExceptionListener(exceptionListener); 704 705 connection.start(); 706 707 Topic topic = SomniJNDIBypass.IT.getTopic("twoThreadTest"); 708 TenObjectSubscriber tos = new TenObjectSubscriber(connection,topic); 709 710 711 Thread subscribeThread = new Thread (tos); 712 subscribeThread.start(); 713 714 Thread.sleep(10); 715 716 Thread publishThread = new Thread (new TenObjectPublisher(connection,topic)); 717 publishThread.start(); 718 719 connection.stop(); 720 721 Thread.sleep(10); 722 723 connection.start(); 724 725 publishThread.join(10000); 726 subscribeThread.join(10000); 727 728 tos.test(); 729 730 connection.close(); 731 if(exceptionListener.caughtException()) 732 { 733 fail(exceptionListener.getException()); 734 } 735 } 736 catch(JMSException jmse) 737 { 738 fail(jmse); 739 } 740 catch(InterruptedException ie) 741 { 742 fail(ie); 743 } 744 } 745 746 public void testDurable() 747 { 748 try 749 { 750 TopicConnection connection = SomniJNDIBypass.IT.getTopicConnectionFactory().createTopicConnection(); 751 752 TestExceptionListener exceptionListener = new TestExceptionListener(); 753 connection.setExceptionListener(exceptionListener); 754 755 connection.start(); 756 757 Topic topic = SomniJNDIBypass.IT.getTopic("durableTest"); 758 TenObjectSubscriber tos1 = new TenObjectSubscriber(connection,topic,true); 759 760 Thread subscribeThread1 = new Thread (tos1); 761 subscribeThread1.start(); 762 763 Thread.sleep(10); 764 765 Thread publishThread1 = new Thread (new TenObjectPublisher(connection,topic)); 766 publishThread1.start(); 767 768 publishThread1.join(10000); 769 subscribeThread1.join(10000); 770 771 tos1.test(); 772 773 Thread publishThread2 = new Thread (new TenObjectPublisher(connection,topic)); 774 publishThread2.start(); 775 776 publishThread2.join(10000); 777 778 TenObjectSubscriber tos2 = new TenObjectSubscriber(connection,topic,true); 779 780 Thread subscribeThread2 = new Thread (tos2); 781 subscribeThread2.start(); 782 subscribeThread2.join(10000); 783 784 tos2.test(); 785 786 TopicSession session = connection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE); 787 session.unsubscribe("durableTestSub"); 788 789 connection.close(); 790 if(exceptionListener.caughtException()) 791 { 792 fail(exceptionListener.getException()); 793 } 794 } 795 catch(JMSException jmse) 796 { 797 fail(jmse); 798 } 799 catch(InterruptedException ie) 800 { 801 fail(ie); 802 } 803 } 804 805 private class Replier 806 implements MessageListener 807 { 808 809 protected Replier() 810 {} 811 812 public void onMessage(Message message) 813 { 814 ObjectMessage om = (ObjectMessage )message; 815 816 try 817 { 818 assertTrue(om.getObject().equals("request")); 819 TopicConnection connection = SomniJNDIBypass.IT.getTopicConnectionFactory().createTopicConnection(); 820 connection.start(); 821 822 Topic topic = (Topic )om.getJMSReplyTo(); 823 TopicSession session = connection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE); 824 TopicPublisher publisher = session.createPublisher(topic); 825 Message reply = session.createObjectMessage("reply"); 826 reply.setJMSCorrelationID(om.getJMSMessageID()); 827 828 publisher.publish(reply); 829 830 connection.close(); 831 } 832 catch(JMSException jmse) 833 { 834 throw new RuntimeException (jmse); 835 } 836 } 837 } 838 839 private void startReplier() 840 throws JMSException 841 { 842 TopicConnection connection = SomniJNDIBypass.IT.getTopicConnectionFactory().createTopicConnection(); 843 844 Topic topic = SomniJNDIBypass.IT.getTopic("requestTopic"); 845 TopicSession session = connection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE); 846 TopicSubscriber subscriber = session.createSubscriber(topic); 847 subscriber.setMessageListener(new Replier()); 848 849 connection.start(); 850 } 851 852 public void testRequestReply() 853 { 854 try 855 { 856 TopicConnection connection = SomniJNDIBypass.IT.getTopicConnectionFactory().createTopicConnection(); 857 858 TestExceptionListener exceptionListener = new TestExceptionListener(); 859 connection.setExceptionListener(exceptionListener); 860 861 connection.start(); 862 863 Topic topic = SomniJNDIBypass.IT.getTopic("requestTopic"); 864 TopicSession session = connection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE); 865 866 startReplier(); 867 868 TopicRequestor requestor = new TopicRequestor (session,topic); 869 Message request = session.createObjectMessage("request"); 870 ObjectMessage reply = (ObjectMessage )requestor.request(request); 871 872 assertTrue(reply.getObject().equals("reply")); 873 874 requestor.close(); 875 connection.close(); 876 if(exceptionListener.caughtException()) 877 { 878 fail(exceptionListener.getException()); 879 } 880 } 881 catch(JMSException jmse) 882 { 883 fail(jmse); 884 } 885 } 886 887 private class TestMessageSelector 888 implements SomniMessageSelector 889 { 890 private String propName; 891 892 public TestMessageSelector(String propName) 893 { 894 this.propName = propName; 895 } 896 897 public boolean matches(Message message) 898 { 899 try 900 { 901 return message.getBooleanProperty(propName); 902 } 903 catch(JMSException jmsException) 904 { 905 throw new RuntimeException (jmsException); 906 } 907 } 908 } 909 910 protected class SelectorVictemPublisher 911 implements Runnable 912 { 913 private TopicConnection connection; 914 private Topic topic; 915 916 public SelectorVictemPublisher(TopicConnection connection,Topic topic) 917 { 918 this.connection = connection; 919 this.topic = topic; 920 } 921 922 public void run() 923 { 924 try 925 { 926 Thread.sleep(10); 927 TopicSession session = connection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE); 928 TopicPublisher publisher = session.createPublisher(topic); 929 for(int i=0;i<13;i++) 930 { 931 Message message = session.createObjectMessage(new Integer (i)); 932 if(i%2==0) 933 { 934 message.setBooleanProperty("a",true); 935 } 936 if(i%3==0) 937 { 938 message.setBooleanProperty("b",true); 939 } 940 941 publisher.publish(message); 942 } 943 } 944 catch(InterruptedException ie) 945 { 946 fail(ie); 947 } 948 catch(JMSException jmse) 949 { 950 fail(jmse); 951 } 952 } 953 } 954 955 public void testMessageSelectors() 956 { 957 try 958 { 959 TopicConnection connection = SomniJNDIBypass.IT.getTopicConnectionFactory().createTopicConnection(); 960 961 TestExceptionListener exceptionListener = new TestExceptionListener(); 962 connection.setExceptionListener(exceptionListener); 963 964 connection.start(); 965 966 Topic topic = SomniJNDIBypass.IT.getTopic("testMessageSelectors"); 967 968 TestMessageListener messageListener1 = new TestMessageListener(); 969 SomniTopicSession session1 = (SomniTopicSession)connection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE); 970 TopicSubscriber subscriber1 = session1.createSubscriber(topic,new TestMessageSelector("a")); 971 subscriber1.setMessageListener(messageListener1); 972 973 TestMessageListener messageListener2 = new TestMessageListener(); 974 SomniTopicSession session2 = (SomniTopicSession)connection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE); 975 TopicSubscriber subscriber2 = session2.createSubscriber(topic,new TestMessageSelector("b")); 976 subscriber2.setMessageListener(messageListener2); 977 978 Thread publishThread = new Thread (new SelectorVictemPublisher(connection,topic)); 979 publishThread.start(); 980 981 publishThread.join(10000); 982 Thread.sleep(2000); 983 984 connection.close(); 985 986 List <Object > aList = new ArrayList <Object >(7); 987 List <Object > bList = new ArrayList <Object >(7); 988 989 for(int i=0;i<13;i++) 990 { 991 if(i%2==0) 992 { 993 aList.add(new Integer (i)); 994 } 995 if(i%3==0) 996 { 997 bList.add(new Integer (i)); 998 } 999 } 1000 messageListener1.check(aList); 1001 messageListener2.check(bList); 1002 1003 if(exceptionListener.caughtException()) 1004 { 1005 fail(exceptionListener.getException()); 1006 } 1007 } 1008 catch(JMSException jmse) 1009 { 1010 fail(jmse); 1011 } 1012 catch(InterruptedException ie) 1013 { 1014 fail(ie); 1015 } 1016 } 1017 1018 protected class SQL92SelectorVictemPublisher 1019 implements Runnable 1020 { 1021 private TopicConnection connection; 1022 private Topic topic; 1023 1024 public SQL92SelectorVictemPublisher(TopicConnection connection,Topic topic) 1025 { 1026 this.connection = connection; 1027 this.topic = topic; 1028 } 1029 1030 public void run() 1031 { 1032 try 1033 { 1034 Thread.sleep(10); 1035 TopicSession session = connection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE); 1036 TopicPublisher publisher = session.createPublisher(topic); 1037 for(int i=0;i<13;i++) 1038 { 1039 Message message = session.createObjectMessage(new Integer (i)); 1040 if(i%2==0) 1041 { 1042 message.setStringProperty("color","red"); 1043 } 1044 if(i%3==0) 1045 { 1046 message.setStringProperty("flavor","blue"); 1047 } 1048 if(i%13==0) 1049 { 1050 message.setJMSPriority(2); 1051 } 1052 1053 publisher.publish(message); 1054 } 1055 } 1056 catch(InterruptedException ie) 1057 { 1058 fail(ie); 1059 } 1060 catch(JMSException jmse) 1061 { 1062 fail(jmse); 1063 } 1064 } 1065 } 1066 1067 public void testSQL92MessageSelectors() 1068 { 1069 try 1070 { 1071 TopicConnection connection = SomniJNDIBypass.IT.getTopicConnectionFactory().createTopicConnection(); 1072 1073 TestExceptionListener exceptionListener = new TestExceptionListener(); 1074 connection.setExceptionListener(exceptionListener); 1075 1076 connection.start(); 1077 1078 Topic topic = SomniJNDIBypass.IT.getTopic("testSQL92MessageSelectors"); 1079 1080 TestMessageListener messageListener1 = new TestMessageListener(); 1081 SomniTopicSession session1 = (SomniTopicSession)connection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE); 1082 TopicSubscriber subscriber1 = session1.createSubscriber(topic,"color = 'red'",false); 1083 subscriber1.setMessageListener(messageListener1); 1084 1085 TestMessageListener messageListener2 = new TestMessageListener(); 1086 SomniTopicSession session2 = (SomniTopicSession)connection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE); 1087 TopicSubscriber subscriber2 = session2.createSubscriber(topic,"flavor = 'blue'",false); 1088 subscriber2.setMessageListener(messageListener2); 1089 1090 TestMessageListener messageListener3 = new TestMessageListener(); 1091 SomniTopicSession session3 = (SomniTopicSession)connection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE); 1092 TopicSubscriber subscriber3 = session3.createSubscriber(topic,"JMSPriority > 0",false); 1093 subscriber3.setMessageListener(messageListener3); 1094 1095 Thread publishThread = new Thread (new SQL92SelectorVictemPublisher(connection,topic)); 1096 publishThread.start(); 1097 1098 publishThread.join(10000); 1099 Thread.sleep(2000); 1100 1101 connection.close(); 1102 1103 List <Object > redList = new ArrayList <Object >(7); 1104 List <Object > blueList = new ArrayList <Object >(7); 1105 List <Object > sevenList = new ArrayList <Object >(7); 1106 1107 for(int i=0;i<13;i++) 1108 { 1109 if(i%2==0) 1110 { 1111 redList.add(new Integer (i)); 1112 } 1113 if(i%3==0) 1114 { 1115 blueList.add(new Integer (i)); 1116 } 1117 if(i%13==0) 1118 { 1119 sevenList.add(new Integer (i)); 1120 } 1121 } 1122 messageListener1.check(redList); 1123 messageListener2.check(blueList); 1124 messageListener3.check(sevenList); 1125 1126 if(exceptionListener.caughtException()) 1127 { 1128 fail(exceptionListener.getException()); 1129 } 1130 } 1131 catch(JMSException jmse) 1132 { 1133 fail(jmse); 1134 } 1135 catch(InterruptedException ie) 1136 { 1137 fail(ie); 1138 } 1139 } 1140 1141 public void testTwoThreadsNoCopy() 1142 { 1143 try 1144 { 1145 SomniJNDIBypass.IT.getTopicContext().getEnvironment().put("testTwoThreadsNoCopy."+SomniProperties.COPYMODE,SomniProperties.NOCOPY); 1146 1147 TopicConnection connection = SomniJNDIBypass.IT.getTopicConnectionFactory().createTopicConnection(); 1148 1149 TestExceptionListener exceptionListener = new TestExceptionListener(); 1150 connection.setExceptionListener(exceptionListener); 1151 1152 connection.start(); 1153 1154 Topic topic = SomniJNDIBypass.IT.getTopic("testTwoThreadsNoCopy"); 1155 TenObjectSubscriber tos = new TenObjectSubscriber(connection,topic,SomniProperties.NOCOPY); 1156 1157 Thread subscribeThread = new Thread (tos); 1158 subscribeThread.start(); 1159 1160 Thread.sleep(10); 1161 1162 Thread publishThread = new Thread (new TenObjectPublisher(connection,topic)); 1163 publishThread.start(); 1164 1165 publishThread.join(10000); 1166 subscribeThread.join(10000); 1167 1168 tos.test(); 1169 1170 connection.close(); 1171 1172 if(exceptionListener.caughtException()) 1173 { 1174 fail(exceptionListener.getException()); 1175 } 1176 } 1177 catch(JMSException jmse) 1178 { 1179 fail(jmse); 1180 } 1181 catch(InterruptedException ie) 1182 { 1183 fail(ie); 1184 } 1185 catch(NamingException ne) 1186 { 1187 fail(ne); 1188 } 1189 } 1190 1191 public void testTwoThreadsShallowCopy() 1192 { 1193 try 1194 { 1195 SomniJNDIBypass.IT.getTopicContext().getEnvironment().put("testTwoThreadsShallowCopy."+SomniProperties.COPYMODE,SomniProperties.SHALLOWCOPY); 1196 1197 TopicConnection connection = SomniJNDIBypass.IT.getTopicConnectionFactory().createTopicConnection(); 1198 1199 TestExceptionListener exceptionListener = new TestExceptionListener(); 1200 connection.setExceptionListener(exceptionListener); 1201 1202 connection.start(); 1203 1204 Topic topic = SomniJNDIBypass.IT.getTopic("testTwoThreadsShallowCopy"); 1205 TenObjectSubscriber tos = new TenObjectSubscriber(connection,topic,SomniProperties.SHALLOWCOPY); 1206 1207 Thread subscribeThread = new Thread (tos); 1208 subscribeThread.start(); 1209 1210 Thread.sleep(10); 1211 1212 Thread publishThread = new Thread (new TenObjectPublisher(connection,topic)); 1213 publishThread.start(); 1214 1215 publishThread.join(10000); 1216 subscribeThread.join(10000); 1217 1218 tos.test(); 1219 1220 connection.close(); 1221 1222 if(exceptionListener.caughtException()) 1223 { 1224 fail(exceptionListener.getException()); 1225 } 1226 } 1227 catch(JMSException jmse) 1228 { 1229 fail(jmse); 1230 } 1231 catch(InterruptedException ie) 1232 { 1233 fail(ie); 1234 } 1235 catch(NamingException ne) 1236 { 1237 fail(ne); 1238 } 1239 } 1240 1241 1242 public void testTwoThreadsDeepCopy() 1243 { 1244 try 1245 { 1246 SomniJNDIBypass.IT.getTopicContext().getEnvironment().put("testTwoThreadsDeepCopy."+SomniProperties.COPYMODE,SomniProperties.DEEPCOPY); 1247 1248 TopicConnection connection = SomniJNDIBypass.IT.getTopicConnectionFactory().createTopicConnection(); 1249 1250 TestExceptionListener exceptionListener = new TestExceptionListener(); 1251 connection.setExceptionListener(exceptionListener); 1252 1253 connection.start(); 1254 1255 Topic topic = SomniJNDIBypass.IT.getTopic("testTwoThreadsDeepCopy"); 1256 TenObjectSubscriber tos = new TenObjectSubscriber(connection,topic,SomniProperties.DEEPCOPY); 1257 1258 Thread subscribeThread = new Thread (tos); 1259 subscribeThread.start(); 1260 1261 Thread.sleep(10); 1262 1263 Thread publishThread = new Thread (new TenObjectPublisher(connection,topic)); 1264 publishThread.start(); 1265 1266 publishThread.join(10000); 1267 subscribeThread.join(10000); 1268 1269 tos.test(); 1270 1271 connection.close(); 1272 1273 if(exceptionListener.caughtException()) 1274 { 1275 fail(exceptionListener.getException()); 1276 } 1277 } 1278 catch(JMSException jmse) 1279 { 1280 fail(jmse); 1281 } 1282 catch(InterruptedException ie) 1283 { 1284 fail(ie); 1285 } 1286 catch(NamingException ne) 1287 { 1288 fail(ne); 1289 } 1290 } 1291 1292 public void testNoLocalNoMessagesWhenLocal() 1293 { 1294 try 1295 { 1296 TopicConnection connection = SomniJNDIBypass.IT.getTopicConnectionFactory().createTopicConnection(); 1297 1298 TestExceptionListener exceptionListener = new TestExceptionListener(); 1299 connection.setExceptionListener(exceptionListener); 1300 1301 connection.start(); 1302 1303 Topic topic = SomniJNDIBypass.IT.getTopic("testNoLocalNoMessagesWhenLocal"); 1304 NoObjectSubscriber nos = new NoObjectSubscriber(connection,topic); 1305 1306 Thread subscribeThread = new Thread (nos); 1307 subscribeThread.start(); 1308 1309 Thread.sleep(10); 1310 1311 Thread publishThread = new Thread (new TenObjectPublisher(connection,topic)); 1312 publishThread.start(); 1313 1314 publishThread.join(10000); 1315 subscribeThread.join(10000); 1316 1317 nos.test(); 1318 1319 connection.close(); 1320 1321 if(exceptionListener.caughtException()) 1322 { 1323 fail(exceptionListener.getException()); 1324 } 1325 } 1326 catch(JMSException jmse) 1327 { 1328 fail(jmse); 1329 } 1330 catch(InterruptedException ie) 1331 { 1332 fail(ie); 1333 } 1334 } 1335 1336 protected static final String SIMPLECHANNELFACTORYCLASS = "simpleChannelFactoryClass"; 1337 protected static final String PRIORITYCHANNELFACTORYCLASS = "priorityChannelFactoryClass"; 1338 1339 protected Hashtable <String ,Object > getEnvironment() 1341 { 1342 Hashtable <String ,Object > env = new Hashtable <String ,Object >(); 1343 1344 env.put(SIMPLECHANNELFACTORYCLASS,SimpleChannelFactory.class.getName()); 1345 env.put(PRIORITYCHANNELFACTORYCLASS,PriorityChannelFactory.class.getName()); 1346 env.put(SomniProperties.DEFAULTTOPICCHANNELFACTORYCLASSNAME,SimpleChannelFactory.class.getName()); 1347 1348 return env; 1349 } 1350 1351 protected void setUp() 1352 { 1353 System.getProperties().putAll(getEnvironment()); 1354 } 1356 1357 1358 public static Test suite() 1359 { 1360 TestSuite suite = new TestSuite() ; 1361 1362 suite.addTest(new TopicTest("testTwoThreads")); 1363 suite.addTest(new TopicTest("testTwoSubscribers")); 1364 suite.addTest(new TopicTest("testMessageListener")); 1365 suite.addTest(new TopicTest("testTwoMessageListenersInTwoSessions")); 1366 suite.addTest(new TopicTest("testTwoMessageListenersInOneSession")); 1367 suite.addTest(new TopicTest("testJNDI")); 1368 suite.addTest(new TopicTest("testStopStart")); 1369 suite.addTest(new TopicTest("testDurable")); 1370 suite.addTest(new TopicTest("testRequestReply")); 1371 suite.addTest(new TopicTest("testMessageSelectors")); 1372 suite.addTest(new TopicTest("testSQL92MessageSelectors")); 1373 suite.addTest(new TopicTest("testTwoThreadsNoCopy")); 1374 suite.addTest(new TopicTest("testTwoThreadsShallowCopy")); 1375 suite.addTest(new TopicTest("testTwoThreadsDeepCopy")); 1376 suite.addTest(new TopicTest("testNoLocalNoMessagesWhenLocal")); 1377 1378 return suite; 1379 } 1380} 1381 1382 1383 1405 1406 1407 | Popular Tags |