1 package net.walend.somnifugi.test; 2 3 import java.util.List ; 4 import java.util.ArrayList ; 5 import java.util.Set ; 6 import java.util.HashSet ; 7 import java.util.Iterator ; 8 import java.util.Hashtable ; 9 import java.util.Properties ; 10 import java.util.Enumeration ; 11 12 import javax.naming.InitialContext ; 13 import javax.naming.Context ; 14 import javax.naming.NamingException ; 15 16 import javax.jms.QueueConnectionFactory ; 17 import javax.jms.QueueConnection ; 18 import javax.jms.Queue ; 19 import javax.jms.QueueSession ; 20 import javax.jms.QueueSender ; 21 import javax.jms.Message ; 22 import javax.jms.ObjectMessage ; 23 import javax.jms.QueueReceiver ; 24 import javax.jms.Session ; 25 import javax.jms.JMSException ; 26 import javax.jms.MessageListener ; 27 import javax.jms.DeliveryMode ; 28 import javax.jms.QueueRequestor ; 29 import javax.jms.QueueBrowser ; 30 31 import junit.framework.TestSuite; 32 import junit.framework.Test; 33 34 import net.walend.toolkit.junit.TestCase; 35 36 import net.walend.somnifugi.SomniJNDIBypass; 37 import net.walend.somnifugi.SomniQueueConnectionFactory; 38 import net.walend.somnifugi.TimeoutChannelFactory; 39 import net.walend.somnifugi.SomniProperties; 40 import net.walend.somnifugi.SomniQueueRequestor; 41 import net.walend.somnifugi.SomniQueueReceiver; 42 import net.walend.somnifugi.SomniConnection; 43 44 import net.walend.somnifugi.channel.ChannelFactory; 45 46 import net.walend.somnifugi.juc.PriorityChannelFactory; 47 import net.walend.somnifugi.juc.MessageSelectingPriorityChannelFactory; 48 import net.walend.somnifugi.juc.SimpleChannelFactory; 49 50 56 57 public class QueueTest extends TestCase 58 { 59 private List <Message > sent = new ArrayList <Message >(); 60 private List <Message > received = new ArrayList <Message >(); 61 62 public QueueTest(String testName) 63 { 64 super(testName); 65 } 66 67 protected void sendObjects(QueueConnection connection,Queue queue,int howMany) 68 { 69 sent.clear(); 70 try 71 { 72 QueueSession session = connection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE); 73 QueueSender sender = session.createSender(queue); 74 for(int i=0;i<howMany;i++) 75 { 76 Integer object = new Integer (i); 77 78 Message message = session.createObjectMessage(object); 79 80 sent.add(message); 81 82 sender.send(message); 83 } 84 } 85 catch(JMSException jmse) 86 { 87 fail(jmse); 88 } 89 } 90 91 protected void sendPriorityObjects(QueueConnection connection,Queue queue,int howMany) 92 { 93 sent.clear(); 94 try 95 { 96 QueueSession session = connection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE); 97 QueueSender sender = session.createSender(queue); 98 for(int i=0;i<howMany;i++) 99 { 100 Integer object = new Integer (i); 101 102 Message message = session.createObjectMessage(object); 103 message.setJMSPriority(i); 104 105 sent.add(message); 106 107 sender.send(message); 108 } 109 } 110 catch(JMSException jmse) 111 { 112 fail(jmse); 113 } 114 } 115 116 protected void sendObjectsToExpire(QueueConnection connection,Queue queue,long time,int howMany) 117 { 118 sent.clear(); 119 try 120 { 121 QueueSession session = connection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE); 122 QueueSender sender = session.createSender(queue); 123 for(int i=0;i<howMany;i++) 124 { 125 Message message = session.createObjectMessage(new Integer (i)); 126 127 sent.add(message); 128 129 sender.send(message,DeliveryMode.NON_PERSISTENT,0,time); 130 } 131 } 132 catch(JMSException jmse) 133 { 134 fail(jmse); 135 } 136 } 137 138 protected void receiveObjects(QueueConnection connection,Queue queue,int howMany) 139 { 140 receiveObjects(connection,queue,howMany,SomniProperties.DEEPCOPY); 141 } 142 143 protected void receiveObjects(QueueConnection connection,Queue queue,int howMany,String copyType) 144 { 145 received.clear(); 146 try 147 { 148 QueueSession session = connection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE); 149 QueueReceiver receiver = session.createReceiver(queue); 150 151 assertTrue("The receiver should have 10 but has "+((SomniQueueReceiver)receiver).guessSize()+" pending messages",((SomniQueueReceiver)receiver).guessSize()==10); 152 153 for(int i=0;i<howMany;i++) 154 { 155 ObjectMessage message = (ObjectMessage )receiver.receive(100); 156 157 received.add(message); 158 } 159 160 assertTrue("The receiver should be empty but has "+((SomniQueueReceiver)receiver).guessSize()+" pending messages",((SomniQueueReceiver)receiver).guessSize()==0); 161 162 if(copyType.equals(SomniProperties.NOCOPY)) 163 { 164 assertTrue("Expected results to be "+sent+", but got "+received,received.equals(sent)); 165 } 166 else if(copyType.equals(SomniProperties.SHALLOWCOPY)) 167 { 168 assertTrue("Shallow copied messages should be different objects.",!received.equals(sent)); 169 assertTrue("received and sent should be the same size.",received.size()==sent.size()); 170 171 for(int i=0;i<sent.size();i++) 172 { 173 ObjectMessage sentMessage = (ObjectMessage )sent.get(i); 174 ObjectMessage receivedMessage = (ObjectMessage )received.get(i); 175 176 assertTrue("sentMessage.getObject() should be receivedMessage.getObject().",sentMessage.getObject()==receivedMessage.getObject()); 177 } 178 } 179 else if(copyType.equals(SomniProperties.DEEPCOPY)) 180 { 181 assertTrue("Deep copied messages should be different objects.",!received.equals(sent)); 182 assertTrue("received and sent should be the same size.",received.size()==sent.size()); 183 184 for(int i=0;i<sent.size();i++) 185 { 186 ObjectMessage sentMessage = (ObjectMessage )sent.get(i); 187 ObjectMessage receivedMessage = (ObjectMessage )received.get(i); 188 189 assertNotNull("receivedMessage should not be null.",receivedMessage); 190 assertTrue("sentMessage.getObject() should have the same location in memory as receivedMessage.getObject().",sentMessage.getObject()!=receivedMessage.getObject()); 191 assertTrue("sentMessage.getObject() should be equal to receivedMessage.getObject(), but "+sentMessage.getObject()+" does not equal "+receivedMessage.getObject(), sentMessage.getObject().equals(receivedMessage.getObject())); 192 } 193 } 194 } 195 catch(JMSException jmse) 196 { 197 fail(jmse); 198 } 199 } 200 201 private ObjectMessage findMessageInSet(int i,Set <ObjectMessage > received) 202 throws JMSException 203 { 204 for(ObjectMessage message : received) 205 { 206 Integer messageContents = (Integer )message.getObject(); 207 if(messageContents == i) 208 { 209 return message; 210 } 211 } 212 fail("Message not found for "+i+" in "+received); 213 return null; 214 } 215 216 217 protected void receivePriorityObjects(QueueConnection connection,Queue queue,int howMany) 218 { 219 String copyType = SomniProperties.DEEPCOPY; 220 221 Set <ObjectMessage > received = new HashSet <ObjectMessage >(); 222 223 try 224 { 225 QueueSession session = connection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE); 226 QueueReceiver receiver = session.createReceiver(queue); 227 228 for(int i=0;i<howMany;i++) 229 { 230 ObjectMessage message = (ObjectMessage )receiver.receive(100); 231 232 received.add(message); 233 } 234 235 if(copyType.equals(SomniProperties.NOCOPY)) 236 { 237 assertTrue("Expected results to be "+sent+", but got "+received,received.equals(sent)); 238 } 239 else if(copyType.equals(SomniProperties.SHALLOWCOPY)) 240 { 241 assertTrue("Shallow copied messages should be different objects.",!received.equals(sent)); 242 assertTrue("received and sent should be the same size.",received.size()==sent.size()); 243 244 for(int i=0;i<sent.size();i++) 245 { 246 ObjectMessage sentMessage = (ObjectMessage )sent.get(i); 247 ObjectMessage receivedMessage = (ObjectMessage )findMessageInSet(i,received); 248 249 assertTrue("sentMessage.getObject() should be receivedMessage.getObject().",sentMessage.getObject()==receivedMessage.getObject()); 250 } 251 } 252 else if(copyType.equals(SomniProperties.DEEPCOPY)) 253 { 254 assertTrue("Deep copied messages should be different objects.",!received.equals(sent)); 255 assertTrue("received and sent should be the same size.",received.size()==sent.size()); 256 257 for(int i=0;i<sent.size();i++) 258 { 259 ObjectMessage sentMessage = (ObjectMessage )sent.get(i); 260 ObjectMessage receivedMessage = (ObjectMessage )findMessageInSet(i,received); 261 262 assertTrue("sentMessage.getObject() should have the same location in memory as receivedMessage.getObject().",sentMessage.getObject()!=receivedMessage.getObject()); 263 assertTrue("sentMessage.getObject() should be equal to receivedMessage.getObject(), but "+sentMessage.getObject()+" does not equal "+receivedMessage.getObject(), sentMessage.getObject().equals(receivedMessage.getObject())); 264 } 265 } 266 } 267 catch(JMSException jmse) 268 { 269 fail(jmse); 270 } 271 } 272 273 274 protected void receiveNoObjects(QueueConnection connection,Queue queue) 275 { 276 try 277 { 278 QueueSession session = connection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE); 279 QueueReceiver receiver = session.createReceiver(queue); 280 281 for(int i=0;i<10;i++) 282 { 283 ObjectMessage message = (ObjectMessage )receiver.receive(10); 284 285 assertTrue("Message should be null but is "+message,message==null); 286 } 287 } 288 catch(JMSException jmse) 289 { 290 fail(jmse); 291 } 292 ((SomniConnection)connection).createSessionReport(); 294 } 295 296 297 public void testHotStart() 298 { 299 try 300 { 301 QueueConnection connection = SomniJNDIBypass.IT.getQueueConnectionFactory().createQueueConnection(); 302 303 TestExceptionListener exceptionListener = new TestExceptionListener(); 304 connection.setExceptionListener(exceptionListener); 305 306 connection.start(); 307 308 Queue queue = SomniJNDIBypass.IT.getQueue("hotStartTest"); 309 310 sendObjects(connection,queue,10); 311 312 receiveObjects(connection,queue,10); 313 314 receiveNoObjects(connection,queue); 315 316 connection.close(); 317 connection.close(); 319 320 if(exceptionListener.caughtException()) 321 { 322 fail(exceptionListener.getException()); 323 } 324 } 325 catch(JMSException jmse) 326 { 327 fail(jmse); 328 } 329 } 330 331 public void testExpire() 332 { 333 try 334 { 335 QueueConnection connection = SomniJNDIBypass.IT.getQueueConnectionFactory().createQueueConnection(); 336 337 TestExceptionListener exceptionListener = new TestExceptionListener(); 338 connection.setExceptionListener(exceptionListener); 339 340 connection.start(); 341 342 Queue queue = SomniJNDIBypass.IT.getQueue("expireTest"); 343 344 sendObjectsToExpire(connection,queue,1000,10); 345 receiveObjects(connection,queue,10); 346 347 sendObjectsToExpire(connection,queue,1,10); 348 Thread.sleep(10); 349 receiveNoObjects(connection,queue); 350 351 connection.close(); 352 353 if(exceptionListener.caughtException()) 354 { 355 fail(exceptionListener.getException()); 356 } 357 } 358 catch(JMSException jmse) 359 { 360 fail(jmse); 361 } 362 catch(InterruptedException ie) 363 { 364 fail(ie); 365 } 366 } 367 368 369 protected class ObjectSender 370 implements Runnable 371 { 372 private QueueConnection connection; 373 private Queue queue; 374 private int howMany; 375 376 public ObjectSender(QueueConnection connection,Queue queue,int howMany) 377 { 378 this.connection = connection; 379 this.queue = queue; 380 this.howMany = howMany; 381 } 382 383 public void run() 384 { 385 try 386 { 387 Thread.sleep(10); 388 QueueSession session = connection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE); 389 QueueSender sender = session.createSender(queue); 390 for(int i=0;i<howMany;i++) 391 { 392 Message message = session.createObjectMessage(new Integer (i)); 393 sender.send(message); 394 } 395 } 396 catch(InterruptedException ie) 397 { 398 fail(ie); 399 } 400 catch(JMSException jmse) 401 { 402 fail(jmse); 403 } 404 } 405 } 406 407 protected class ObjectReceiver 408 implements Runnable 409 { 410 private QueueConnection connection; 411 private Queue queue; 412 private int howMany; 413 414 public ObjectReceiver(QueueConnection connection,Queue queue,int howMany) 415 { 416 this.connection = connection; 417 this.queue = queue; 418 this.howMany = howMany; 419 } 420 421 public void run() 422 { 423 try 424 { 425 QueueSession session = connection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE); 426 QueueReceiver receiver = session.createReceiver(queue); 427 List <Object > results = new ArrayList <Object >(); 428 List <Object > expected = new ArrayList <Object >(); 429 430 for(int i=0;i<howMany;i++) 431 { 432 expected.add(new Integer (i)); 433 ObjectMessage message = (ObjectMessage )receiver.receive(100); 434 435 results.add(message.getObject()); 436 } 437 assertTrue("Expected results to be "+expected+", but got "+results,results.equals(expected)); 438 } 439 catch(JMSException jmse) 440 { 441 fail(jmse); 442 } 443 catch(RuntimeException re) 444 { 445 fail(re); 446 } 447 } 448 } 449 450 451 452 public void testTwoThreads() 453 { 454 try 455 { 456 QueueConnection connection = SomniJNDIBypass.IT.getQueueConnectionFactory().createQueueConnection(); 457 458 TestExceptionListener exceptionListener = new TestExceptionListener(); 459 connection.setExceptionListener(exceptionListener); 460 461 connection.start(); 462 463 Queue queue = SomniJNDIBypass.IT.getQueue("twoThreadTest"); 464 465 Thread sendThread = new Thread (new ObjectSender(connection,queue,10)); 466 sendThread.start(); 467 468 Thread receiveThread = new Thread (new ObjectReceiver(connection,queue,10)); 469 receiveThread.start(); 470 471 sendThread.join(10000); 472 receiveThread.join(10000); 473 474 receiveNoObjects(connection,queue); 475 476 connection.close(); 477 478 if(exceptionListener.caughtException()) 479 { 480 fail(exceptionListener.getException()); 481 } 482 } 483 catch(JMSException jmse) 484 { 485 fail(jmse); 486 } 487 catch(InterruptedException ie) 488 { 489 fail(ie); 490 } 491 } 492 493 protected class ObjectReceiveNoWaiter 494 implements Runnable 495 { 496 private QueueConnection connection; 497 private Queue queue; 498 private int howMany; 499 500 public ObjectReceiveNoWaiter(QueueConnection connection,Queue queue,int howMany) 501 { 502 this.connection = connection; 503 this.queue = queue; 504 this.howMany = howMany; 505 } 506 507 public void run() 508 { 509 try 510 { 511 QueueSession session = connection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE); 512 QueueReceiver receiver = session.createReceiver(queue); 513 List <Object > results = new ArrayList <Object >(); 514 List <Object > expected = new ArrayList <Object >(); 515 516 for(int i=0;i<howMany;i++) 517 { 518 expected.add(new Integer (i)); 519 } 520 while(results.size()<howMany) 521 { 522 ObjectMessage message = (ObjectMessage )receiver.receiveNoWait(); 523 if(message!=null) 524 { 525 results.add(message.getObject()); 526 } 527 else 528 { 529 Thread.sleep(100); 530 } 531 } 532 assertTrue("Expected results to be "+expected+", but got "+results,results.equals(expected)); 533 Message nullMessage = receiver.receiveNoWait(); 534 535 assertTrue("nullMessage should be null but is "+nullMessage,nullMessage==null); 536 537 } 538 catch(JMSException jmse) 539 { 540 fail(jmse); 541 } 542 catch(InterruptedException ie) 543 { 544 fail(ie); 545 } 546 catch(RuntimeException re) 547 { 548 fail(re); 549 } 550 } 551 } 552 553 public void testNoWait() 554 { 555 try 556 { 557 QueueConnection connection = SomniJNDIBypass.IT.getQueueConnectionFactory().createQueueConnection(); 558 559 TestExceptionListener exceptionListener = new TestExceptionListener(); 560 connection.setExceptionListener(exceptionListener); 561 562 connection.start(); 563 564 Queue queue = SomniJNDIBypass.IT.getQueue("testNoWait"); 565 566 Thread sendThread = new Thread (new ObjectSender(connection,queue,10)); 567 sendThread.start(); 568 569 sendThread.join(5000); 570 571 Thread receiveThread = new Thread (new ObjectReceiveNoWaiter(connection,queue,10)); 572 receiveThread.start(); 573 574 receiveThread.join(5000); 575 576 receiveNoObjects(connection,queue); 577 578 connection.close(); 579 580 if(exceptionListener.caughtException()) 581 { 582 fail(exceptionListener.getException()); 583 } 584 } 585 catch(JMSException jmse) 586 { 587 fail(jmse); 588 } 589 catch(InterruptedException ie) 590 { 591 fail(ie); 592 } 593 } 594 595 596 protected class TestMessageListener 597 implements MessageListener 598 { 599 private List <Object > results = new ArrayList <Object >(); 600 private final Object guard = new Object (); 601 602 protected TestMessageListener() 603 { 604 605 } 606 607 public void onMessage(Message message) 608 { 609 ObjectMessage om = (ObjectMessage )message; 610 611 try 612 { 613 Thread.sleep(10); 614 synchronized(guard) 615 { 616 results.add(om.getObject()); 617 } 618 } 619 catch(JMSException jmse) 620 { 621 throw new RuntimeException (jmse); 622 } 623 catch(InterruptedException ie) 624 { 625 Thread.currentThread().interrupt(); 626 } 627 } 628 629 public void check() 630 { 631 List expected = getExpected(); 632 synchronized(guard) 633 { 634 assertTrue("Expected results to be "+expected+", but got "+results,results.equals(expected)); 635 } 636 } 637 638 public List <Object > getResults() 639 { 640 return results; 641 } 642 643 public List <Object > getExpected() 644 { 645 List <Object > expected = new ArrayList <Object >(); 646 for(int i=0;i<10;i++) 647 { 648 expected.add(new Integer (i)); 649 } 650 return expected; 651 } 652 653 } 654 655 public void testMessageListener() 656 { 657 try 658 { 659 QueueConnection connection = SomniJNDIBypass.IT.getQueueConnectionFactory().createQueueConnection(); 660 661 TestExceptionListener exceptionListener = new TestExceptionListener(); 662 connection.setExceptionListener(exceptionListener); 663 664 connection.start(); 665 666 Queue queue = SomniJNDIBypass.IT.getQueue("twoThreadTest"); 667 668 Thread sendThread = new Thread (new ObjectSender(connection,queue,10)); 669 sendThread.start(); 670 671 TestMessageListener messageListener = new TestMessageListener(); 672 QueueSession session = connection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE); 673 QueueReceiver receiver = session.createReceiver(queue); 674 receiver.setMessageListener(messageListener); 675 676 sendThread.join(10000); 677 678 Thread.sleep(1000); 679 680 receiveNoObjects(connection,queue); 681 682 connection.close(); 683 684 messageListener.check(); 685 686 if(exceptionListener.caughtException()) 687 { 688 fail(exceptionListener.getException()); 689 } 690 691 } 692 catch(JMSException jmse) 693 { 694 fail(jmse); 695 } 696 catch(InterruptedException ie) 697 { 698 fail(ie); 699 } 700 } 701 702 protected class ClosingMessageListener 703 implements MessageListener 704 { 705 private QueueSession session; 706 707 public ClosingMessageListener(QueueSession session) 708 { 709 this.session = session; 710 } 711 712 public void onMessage(Message message) 713 { 714 try 715 { 716 session.close(); 717 } 718 catch(JMSException jmse) 719 { 720 jmse.printStackTrace(); 721 fail(jmse.getMessage()); 722 } 723 } 724 } 725 726 public void testClosingMessageListener() 727 { 728 try 729 { 730 QueueConnection connection = SomniJNDIBypass.IT.getQueueConnectionFactory().createQueueConnection(); 731 732 TestExceptionListener exceptionListener = new TestExceptionListener(); 733 connection.setExceptionListener(exceptionListener); 734 735 connection.start(); 736 737 Queue queue = SomniJNDIBypass.IT.getQueue("testClosingMessageListener"); 738 739 Thread sendThread = new Thread (new ObjectSender(connection,queue,10)); 740 sendThread.start(); 741 742 QueueSession session = connection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE); 743 ClosingMessageListener messageListener = new ClosingMessageListener(session); 744 QueueReceiver receiver = session.createReceiver(queue); 745 receiver.setMessageListener(messageListener); 746 747 sendThread.join(10000); 748 749 Thread.sleep(1000); 750 751 connection.close(); 752 753 if(exceptionListener.caughtException()) 754 { 755 fail(exceptionListener.getException()); 756 } 757 758 } 759 catch(JMSException jmse) 760 { 761 fail(jmse); 762 } 763 catch(InterruptedException ie) 764 { 765 fail(ie); 766 } 767 } 768 769 public void testTwoMessageListenersInTwoSessions() 770 { 771 try 772 { 773 QueueConnection connection = SomniJNDIBypass.IT.getQueueConnectionFactory().createQueueConnection(); 774 connection.start(); 775 776 Queue queue = SomniJNDIBypass.IT.getQueue("testTwoMessageListenersInTwoSessions"); 777 778 Thread sendThread = new Thread (new ObjectSender(connection,queue,10)); 779 sendThread.start(); 780 781 TestMessageListener messageListener1 = new TestMessageListener(); 782 QueueSession session1 = connection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE); 783 QueueReceiver receiver1 = session1.createReceiver(queue); 784 receiver1.setMessageListener(messageListener1); 785 786 TestMessageListener messageListener2 = new TestMessageListener(); 787 QueueSession session2 = connection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE); 788 QueueReceiver receiver2 = session2.createReceiver(queue); 789 receiver2.setMessageListener(messageListener2); 790 791 sendThread.join(10000); 792 Thread.sleep(1000); 793 794 receiveNoObjects(connection,queue); 795 796 connection.close(); 797 798 Set <Object > results = new HashSet <Object >(); 799 Iterator it1 = messageListener1.getResults().iterator(); 800 while(it1.hasNext()) 801 { 802 Object ob = it1.next(); 803 assertTrue("results already contained "+ob,results.add(ob)); 804 } 805 Iterator it2 = messageListener2.getResults().iterator(); 806 while(it2.hasNext()) 807 { 808 Object ob = it2.next(); 809 assertTrue("results already contained "+ob,results.add(ob)); 810 } 811 812 Set <Object > expected = new HashSet <Object >(messageListener1.getExpected()); 813 assertTrue("Expected results to be "+expected+", but got "+results,results.equals(expected)); 814 815 } 816 catch(JMSException jmse) 817 { 818 fail(jmse); 819 } 820 catch(InterruptedException ie) 821 { 822 fail(ie); 823 } 824 } 825 826 public void testTwoMessageListenersInOneSession() 827 { 828 try 829 { 830 QueueConnection connection = SomniJNDIBypass.IT.getQueueConnectionFactory().createQueueConnection(); 831 832 TestExceptionListener exceptionListener = new TestExceptionListener(); 833 connection.setExceptionListener(exceptionListener); 834 835 connection.start(); 836 837 Queue queue = SomniJNDIBypass.IT.getQueue("twoThreadTest"); 838 839 Thread sendThread = new Thread (new ObjectSender(connection,queue,10)); 840 sendThread.start(); 841 842 TestMessageListener messageListener1 = new TestMessageListener(); 843 QueueSession session1 = connection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE); 844 QueueReceiver receiver1 = session1.createReceiver(queue); 845 receiver1.setMessageListener(messageListener1); 846 847 TestMessageListener messageListener2 = new TestMessageListener(); 848 QueueReceiver receiver2 = session1.createReceiver(queue); 849 receiver2.setMessageListener(messageListener2); 850 851 sendThread.join(10000); 852 Thread.sleep(1000); 853 854 receiveNoObjects(connection,queue); 855 856 connection.close(); 857 858 Set <Object > results = new HashSet <Object >(); 859 Iterator it1 = messageListener1.getResults().iterator(); 860 while(it1.hasNext()) 861 { 862 Object ob = it1.next(); 863 assertTrue("results already contained "+ob,results.add(ob)); 864 } 865 Iterator it2 = messageListener2.getResults().iterator(); 866 while(it2.hasNext()) 867 { 868 Object ob = it2.next(); 869 assertTrue("results already contained "+ob,results.add(ob)); 870 } 871 872 Set <Object > expected = new HashSet <Object >(messageListener1.getExpected()); 873 assertTrue("Expected results to be "+expected+", but got "+results,results.equals(expected)); 874 875 if(exceptionListener.caughtException()) 876 { 877 fail(exceptionListener.getException()); 878 } 879 } 880 catch(JMSException jmse) 881 { 882 fail(jmse); 883 } 884 catch(InterruptedException ie) 885 { 886 fail(ie); 887 } 888 } 889 890 public void testJNDI() 891 { 892 try 893 { 894 Hashtable <String ,Object > env = getEnvironment(); 896 897 env.put(Context.INITIAL_CONTEXT_FACTORY,"com.sun.jndi.fscontext.RefFSContextFactory"); 898 env.put(Context.PROVIDER_URL,"file://"+System.getProperty("jndifs")); 899 900 Context ctx = new InitialContext (env); 902 903 Queue queueJNDI = SomniJNDIBypass.IT.getQueue("jndiQueueTest"); 905 QueueConnection connectionJNDI = SomniJNDIBypass.IT.getQueueConnectionFactory().createQueueConnection(); 906 907 ctx.rebind(queueJNDI.getQueueName(),queueJNDI); 909 ctx.rebind("jndiQueueConnection",connectionJNDI); 910 911 Queue queue = (Queue )ctx.lookup("jndiQueueTest"); 913 QueueConnection connection = (QueueConnection )ctx.lookup("jndiQueueConnection"); 914 915 TestExceptionListener exceptionListener = new TestExceptionListener(); 916 connection.setExceptionListener(exceptionListener); 917 918 ctx.close(); 919 920 connection.start(); 921 922 sendObjects(connection,queue,10); 923 924 receiveObjects(connection,queue,10); 925 receiveNoObjects(connection,queue); 926 927 connection.close(); 928 929 if(exceptionListener.caughtException()) 930 { 931 fail(exceptionListener.getException()); 932 } 933 } 934 catch(JMSException jmse) 935 { 936 fail(jmse); 937 } 938 catch(NamingException ioe) 939 { 940 fail(ioe); 941 } 942 } 943 944 public void testMinimalJNDI() 945 { 946 try 947 { 948 Hashtable <String ,Object > env = getEnvironment(); 950 951 env.put(Context.INITIAL_CONTEXT_FACTORY,"net.walend.somnifugi.SomniQueueContextFactory"); 952 env.put(Context.PROVIDER_URL,"<not-used>"); 953 954 Context ctx = new InitialContext (env); 956 957 QueueConnection connection = (QueueConnection )ctx.lookup("Connection"); 959 Queue queue = (Queue )ctx.lookup("testMinimalJNDI"); 960 961 TestExceptionListener exceptionListener = new TestExceptionListener(); 962 connection.setExceptionListener(exceptionListener); 963 964 ctx.close(); 965 966 connection.start(); 967 968 sendObjects(connection,queue,10); 969 970 receiveObjects(connection,queue,10); 971 receiveNoObjects(connection,queue); 972 973 connection.close(); 974 975 if(exceptionListener.caughtException()) 976 { 977 fail(exceptionListener.getException()); 978 } 979 } 980 catch(JMSException jmse) 981 { 982 fail(jmse); 983 } 984 catch(NamingException ioe) 985 { 986 fail(ioe); 987 } 988 } 989 990 991 public void testStopStart() 992 { 993 try 994 { 995 QueueConnection connection = SomniJNDIBypass.IT.getQueueConnectionFactory().createQueueConnection(); 996 997 TestExceptionListener exceptionListener = new TestExceptionListener(); 998 connection.setExceptionListener(exceptionListener); 999 1000 connection.start(); 1001 1002 Queue queue = SomniJNDIBypass.IT.getQueue("twoThreadTest"); 1003 1004 Thread sendThread = new Thread (new ObjectSender(connection,queue,10)); 1005 sendThread.start(); 1006 1007 Thread receiveThread = new Thread (new ObjectReceiver(connection,queue,10)); 1008 receiveThread.start(); 1009 1010 connection.stop(); 1011 1012 Thread.sleep(10); 1013 1014 connection.start(); 1015 1016 sendThread.join(10000); 1017 receiveThread.join(10000); 1018 receiveNoObjects(connection,queue); 1019 1020 connection.close(); 1021 1022 if(exceptionListener.caughtException()) 1023 { 1024 fail(exceptionListener.getException()); 1025 } 1026 } 1027 catch(JMSException jmse) 1028 { 1029 fail(jmse); 1030 } 1031 catch(InterruptedException ie) 1032 { 1033 fail(ie); 1034 } 1035 } 1036 1037 private class Replier 1038 implements MessageListener 1039 { 1040 1041 protected Replier() 1042 {} 1043 1044 public void onMessage(Message message) 1045 { 1046 ObjectMessage om = (ObjectMessage )message; 1047 1048 try 1049 { 1050 assertTrue(om.getObject().equals("request")); 1051 QueueConnection connection = SomniJNDIBypass.IT.getQueueConnectionFactory().createQueueConnection(); 1052 connection.start(); 1053 1054 Queue queue = (Queue )om.getJMSReplyTo(); 1055 QueueSession session = connection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE); 1056 QueueSender sender = session.createSender(queue); 1057 Message reply = session.createObjectMessage("reply"); 1058 reply.setJMSCorrelationID(om.getJMSMessageID()); 1059 1060 sender.send(reply); 1061 1062 connection.close(); 1063 } 1064 catch(JMSException jmse) 1065 { 1066 throw new RuntimeException (jmse); 1067 } 1068 } 1069 } 1070 1071 private void startReplier(String queueName) 1072 throws JMSException 1073 { 1074 QueueConnection connection = SomniJNDIBypass.IT.getQueueConnectionFactory().createQueueConnection(); 1075 1076 Queue queue = SomniJNDIBypass.IT.getQueue(queueName); 1077 QueueSession session = connection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE); 1078 QueueReceiver receiver = session.createReceiver(queue); 1079 receiver.setMessageListener(new Replier()); 1080 1081 connection.start(); 1082 } 1083 1084 public void testRequestReply() 1085 { 1086 try 1087 { 1088 QueueConnection connection = SomniJNDIBypass.IT.getQueueConnectionFactory().createQueueConnection(); 1089 connection.start(); 1090 1091 TestExceptionListener exceptionListener = new TestExceptionListener(); 1092 connection.setExceptionListener(exceptionListener); 1093 1094 String requestQueueName = "requestQueue"; 1095 Queue queue = SomniJNDIBypass.IT.getQueue(requestQueueName); 1096 QueueSession session = connection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE); 1097 1098 startReplier(requestQueueName); 1099 1100 QueueRequestor requestor = new QueueRequestor (session,queue); 1101 Message request = session.createObjectMessage("request"); 1102 ObjectMessage reply = (ObjectMessage )requestor.request(request); 1103 1104 assertTrue(reply.getObject().equals("reply")); 1105 1106 receiveNoObjects(connection,queue); 1107 1108 requestor.close(); 1109 connection.close(); 1110 1111 if(exceptionListener.caughtException()) 1112 { 1113 fail(exceptionListener.getException()); 1114 } 1115 } 1116 catch(JMSException jmse) 1117 { 1118 fail(jmse); 1119 } 1120 } 1121 1122 public void testSomniRequestReply() 1123 { 1124 try 1125 { 1126 QueueConnection connection = SomniJNDIBypass.IT.getQueueConnectionFactory().createQueueConnection(); 1127 connection.start(); 1128 1129 TestExceptionListener exceptionListener = new TestExceptionListener(); 1130 connection.setExceptionListener(exceptionListener); 1131 1132 String requestQueueName = "requestQueue"; 1133 Queue queue = SomniJNDIBypass.IT.getQueue(requestQueueName); 1134 QueueSession session = connection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE); 1135 1136 startReplier(requestQueueName); 1137 1138 SomniQueueRequestor requestor = new SomniQueueRequestor(session,queue); 1139 Message request = session.createObjectMessage("request"); 1140 ObjectMessage reply = (ObjectMessage )requestor.request(request); 1141 1142 assertTrue(reply.getObject().equals("reply")); 1143 1144 receiveNoObjects(connection,queue); 1145 1146 requestor.close(); 1147 connection.close(); 1148 1149 if(exceptionListener.caughtException()) 1150 { 1151 fail(exceptionListener.getException()); 1152 } 1153 } 1154 catch(JMSException jmse) 1155 { 1156 fail(jmse); 1157 } 1158 } 1159 1160 1162 public void testSomniRequestReplyWithTimeout() 1163 { 1164 try 1165 { 1166 QueueConnection connection = SomniJNDIBypass.IT.getQueueConnectionFactory().createQueueConnection(); 1167 connection.start(); 1168 1169 TestExceptionListener exceptionListener = new TestExceptionListener(); 1170 connection.setExceptionListener(exceptionListener); 1171 1172 String requestQueueName = "requestQueue"; 1173 Queue queue = SomniJNDIBypass.IT.getQueue(requestQueueName); 1174 QueueSession session = connection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE); 1175 1176 startReplier(requestQueueName); 1177 1178 SomniQueueRequestor requestor = new SomniQueueRequestor(session,queue); 1179 Message request = session.createObjectMessage("request"); 1180 ObjectMessage reply = (ObjectMessage )requestor.request(request,1000); 1181 1182 assertTrue(reply.getObject().equals("reply")); 1183 1184 receiveNoObjects(connection,queue); 1185 1186 requestor.close(); 1187 connection.close(); 1188 1189 if(exceptionListener.caughtException()) 1190 { 1191 fail(exceptionListener.getException()); 1192 } 1193 } 1194 catch(JMSException jmse) 1195 { 1196 fail(jmse); 1197 } 1198 } 1199 1200 private class Forwarder 1201 implements MessageListener 1202 { 1203 1204 private String forwardQueueName; 1205 1206 protected Forwarder(String forwardQueueName) 1207 { 1208 this.forwardQueueName = forwardQueueName; 1209 } 1210 1211 public void onMessage(Message message) 1212 { 1213 try 1214 { 1215 QueueConnection connection = SomniJNDIBypass.IT.getQueueConnectionFactory().createQueueConnection(); 1216 connection.start(); 1217 1218 Queue queue = SomniJNDIBypass.IT.getQueue(forwardQueueName); 1219 QueueSession session = connection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE); 1220 QueueSender sender = session.createSender(queue); 1221 1222 sender.send(message); 1223 1224 connection.close(); 1225 } 1226 catch(JMSException jmse) 1227 { 1228 jmse.printStackTrace(); 1229 throw new RuntimeException (jmse); 1230 } 1231 } 1232 } 1233 1234 private void startForwarder(String requestQueueName,String forwardQueueName) 1235 throws JMSException 1236 { 1237 QueueConnection connection = SomniJNDIBypass.IT.getQueueConnectionFactory().createQueueConnection(); 1238 1239 Queue queue = SomniJNDIBypass.IT.getQueue(requestQueueName); 1240 QueueSession session = connection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE); 1241 QueueReceiver receiver = session.createReceiver(queue); 1242 receiver.setMessageListener(new Forwarder(forwardQueueName)); 1243 1244 connection.start(); 1245 } 1246 1247 1248 1249 public void testRequestForwardReply() 1250 { 1251 try 1252 { 1253 QueueConnection connection = SomniJNDIBypass.IT.getQueueConnectionFactory().createQueueConnection(); 1254 connection.start(); 1255 1256 TestExceptionListener exceptionListener = new TestExceptionListener(); 1257 connection.setExceptionListener(exceptionListener); 1258 1259 String requestQueueName = "testRequestForwardReply-RequestQueue"; 1260 String forwardQueueName = "testRequestForwardReply-ForwardQueue"; 1261 1262 startReplier(forwardQueueName); 1263 startForwarder(requestQueueName,forwardQueueName); 1264 1265 Queue queue = SomniJNDIBypass.IT.getQueue(requestQueueName); 1266 QueueSession session = connection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE); 1267 1268 QueueRequestor requestor = new QueueRequestor (session,queue); 1269 Message request = session.createObjectMessage("request"); 1270 ObjectMessage reply = (ObjectMessage )requestor.request(request); 1271 1272 assertTrue(reply.getObject().equals("reply")); 1273 1274 receiveNoObjects(connection,queue); 1275 1276 requestor.close(); 1277 connection.close(); 1278 1279 if(exceptionListener.caughtException()) 1280 { 1281 fail(exceptionListener.getException()); 1282 } 1283 } 1284 catch(JMSException jmse) 1285 { 1286 fail(jmse); 1287 } 1288 } 1289 1290 private static class DeadlockMessageListener 1291 implements MessageListener 1292 { 1293 private int i = 0; 1294 private long delay; 1295 1296 protected DeadlockMessageListener(long delay) 1297 { 1298 this.delay = delay; 1299 } 1300 1301 public void onMessage(Message message) 1302 { 1303 ObjectMessage om = (ObjectMessage )message; 1304 i++; 1305 1310 try 1311 { 1312 Thread.sleep(delay); 1313 } 1314 catch(InterruptedException ie) 1315 { 1316 Thread.currentThread().interrupt(); 1317 } 1318 } 1319 } 1320 1321 private static class DeadlockObjectSender 1322 implements Runnable 1323 { 1324 private int delay; 1325 private QueueSession session; 1326 private QueueSender sender; 1327 private Thread runningThread = null; 1328 private Exception exception = null; 1329 1330 public DeadlockObjectSender(QueueSession session, QueueSender sender,int delay) 1331 { 1332 this.sender = sender; 1333 this.session = session; 1334 this.delay = delay; 1335 } 1336 1337 public void run() 1338 { 1339 try 1340 { 1341 runningThread = Thread.currentThread(); 1342 1343 int i = 0; 1344 while(!runningThread.isInterrupted()) 1345 { 1346 byte[] bytes = new byte[10000]; 1347 i++; 1348 1353 Message message = session.createObjectMessage(bytes); 1354 sender.send(message); 1355 if ( delay > 0 ) 1356 { 1357 Thread.currentThread().sleep(delay); 1358 } 1359 } 1360 } 1361 catch(InterruptedException ie) 1362 { 1363 } 1365 catch(JMSException jmse) 1366 { 1367 exception = jmse; 1368 } 1369 } 1370 1371 public void stop() 1372 { 1373 runningThread.interrupt(); 1374 } 1375 1376 public Exception getException() 1377 { 1378 return exception; 1379 } 1380 } 1381 1382 public void testMessageListenerClosingDeadlock() 1383 { 1384 try 1385 { 1386 QueueConnection connection = SomniJNDIBypass.IT.getQueueConnectionFactory().createQueueConnection(); 1387 connection.start(); 1388 1389 TestExceptionListener exceptionListener = new TestExceptionListener(); 1390 connection.setExceptionListener(exceptionListener); 1391 1392 Queue queue = SomniJNDIBypass.IT.getQueue("testMessageListenerClosingDeadlock"); 1393 1394 DeadlockMessageListener messageListener = new DeadlockMessageListener(100); 1395 QueueSession session1 = connection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE); 1396 QueueReceiver receiver = session1.createReceiver(queue); 1397 receiver.setMessageListener(messageListener); 1398 1399 1400 QueueSession session2 = connection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE); 1401 QueueSender sender = session2.createSender(queue); 1402 DeadlockObjectSender senderHelper = new DeadlockObjectSender(session2,sender,10); 1403 Thread sendThread = new Thread (senderHelper); 1404 sendThread.start(); 1405 1406 Thread.currentThread().sleep(2000); 1407 senderHelper.stop(); 1408 sendThread.join(2000); 1409 if ( senderHelper.getException() != null ) 1410 { 1411 fail(senderHelper.getException()); 1412 } 1413 1414 sender.close(); 1415 receiver.close(); 1416 session1.close(); 1417 session2.close(); 1418 connection.close(); 1419 } 1420 catch(JMSException jmse) 1421 { 1422 fail(jmse); 1423 } 1424 catch(InterruptedException ie) 1425 { 1426 fail(ie); 1427 } 1428 } 1429 1430 public void testSimpleChannelFactory() 1431 { 1432 try 1433 { 1434 Hashtable <String ,Object > env = getEnvironment(); 1436 1437 env.put(Context.INITIAL_CONTEXT_FACTORY,"net.walend.somnifugi.SomniQueueContextFactory"); 1438 env.put(Context.PROVIDER_URL,"<not-used>"); 1439 env.put(SomniProperties.DEFAULT+"."+SomniProperties.CHANNELFACTORYCLASSNAMEPROP,env.get(SIMPLECHANNELFACTORYCLASS)); 1440 env.put(SimpleChannelFactory.DEFAULTCAPACITYPROP,"20"); 1441 1442 Context ctx = new InitialContext (env); 1444 1445 QueueConnection connection = (QueueConnection )ctx.lookup("Connection"); 1447 Queue queue = (Queue )ctx.lookup("testSimpleChannelFactory"); 1448 1449 TestExceptionListener exceptionListener = new TestExceptionListener(); 1450 connection.setExceptionListener(exceptionListener); 1451 1452 ctx.close(); 1453 1454 connection.start(); 1455 1456 sendObjects(connection,queue,10); 1457 1458 receiveObjects(connection,queue,10); 1459 receiveNoObjects(connection,queue); 1460 1461 connection.close(); 1462 1463 if(exceptionListener.caughtException()) 1464 { 1465 fail(exceptionListener.getException()); 1466 } 1467 } 1468 catch(JMSException jmse) 1469 { 1470 fail(jmse); 1471 } 1472 catch(NamingException ioe) 1473 { 1474 fail(ioe); 1475 } 1476 } 1477 1478 public void testPriorityChannelFactory() 1479 { 1480 try 1481 { 1482 Hashtable <String ,Object > env = getEnvironment(); 1484 1485 env.put(Context.INITIAL_CONTEXT_FACTORY,"net.walend.somnifugi.SomniQueueContextFactory"); 1486 env.put(Context.PROVIDER_URL,"<not-used>"); 1487 env.put(SomniProperties.DEFAULT+"."+SomniProperties.CHANNELFACTORYCLASSNAMEPROP,env.get(PRIORITYCHANNELFACTORYCLASS)); 1488 1489 Context ctx = new InitialContext (env); 1491 1492 QueueConnection connection = (QueueConnection )ctx.lookup("Connection"); 1494 Queue queue = (Queue )ctx.lookup("testPriorityChannelFactory"); 1495 1496 TestExceptionListener exceptionListener = new TestExceptionListener(); 1497 connection.setExceptionListener(exceptionListener); 1498 1499 ctx.close(); 1500 1501 connection.start(); 1502 1503 sendObjects(connection,queue,10); 1504 1505 receiveObjects(connection,queue,10); 1506 receiveNoObjects(connection,queue); 1507 1508 sendPriorityObjects(connection,queue,10); 1509 receivePriorityObjects(connection,queue,10); 1510 receiveNoObjects(connection,queue); 1511 1512 connection.close(); 1513 1514 if(exceptionListener.caughtException()) 1515 { 1516 fail(exceptionListener.getException()); 1517 } 1518 } 1519 catch(JMSException jmse) 1520 { 1521 fail(jmse); 1522 } 1523 catch(NamingException ioe) 1524 { 1525 fail(ioe); 1526 } 1527 } 1528 1529 public void testMessageSelectingPriorityChannelFactory() 1530 { 1531 try 1532 { 1533 Hashtable <String ,Object > env = getEnvironment(); 1535 1536 env.put(Context.INITIAL_CONTEXT_FACTORY,"net.walend.somnifugi.SomniQueueContextFactory"); 1537 env.put(Context.PROVIDER_URL,"<not-used>"); 1538 env.put(SomniProperties.DEFAULT+"."+SomniProperties.CHANNELFACTORYCLASSNAMEPROP,env.get(MESSAGESELECTINGPRIORITYCHANNELFACTORYCLASS)); 1539 1540 Context ctx = new InitialContext (env); 1542 1543 QueueConnection connection = (QueueConnection )ctx.lookup("Connection"); 1545 Queue queue = (Queue )ctx.lookup("testMessageSelectingPriorityChannelFactory"); 1546 1547 TestExceptionListener exceptionListener = new TestExceptionListener(); 1548 connection.setExceptionListener(exceptionListener); 1549 1550 ctx.close(); 1551 1552 connection.start(); 1553 1554 sendObjects(connection,queue,10); 1555 1556 receiveObjects(connection,queue,10); 1557 receiveNoObjects(connection,queue); 1558 1559 sendPriorityObjects(connection,queue,10); 1560 receivePriorityObjects(connection,queue,10); 1561 receiveNoObjects(connection,queue); 1562 1563 connection.close(); 1564 1565 if(exceptionListener.caughtException()) 1566 { 1567 fail(exceptionListener.getException()); 1568 } 1569 } 1570 catch(JMSException jmse) 1571 { 1572 fail(jmse); 1573 } 1574 catch(NamingException ioe) 1575 { 1576 fail(ioe); 1577 } 1578 } 1579 1580 private void sendObjectsWithBooleanProperties(QueueConnection connection,Queue queue,int howMany) 1581 { 1582 sent.clear(); 1583 try 1584 { 1585 QueueSession session = connection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE); 1586 QueueSender sender = session.createSender(queue); 1587 for(int i=0;i<howMany;i++) 1588 { 1589 Integer object = new Integer (i); 1590 1591 Message message = session.createObjectMessage(object); 1592 if(i%2==0) 1593 { 1594 message.setBooleanProperty("a",true); 1595 } 1596 if(i%3==0) 1597 { 1598 message.setBooleanProperty("b",true); 1599 } 1600 1601 sent.add(message); 1602 1603 sender.send(message); 1604 } 1605 } 1606 catch(JMSException jmse) 1607 { 1608 fail(jmse); 1609 } 1610 } 1611 1612 protected void receiveObjectsWithProperty(QueueConnection connection,Queue queue,int howMany,String propName) 1613 { 1614 Set <ObjectMessage > received = new HashSet <ObjectMessage >(); 1615 1616 try 1617 { 1618 QueueSession session = connection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE); 1619 QueueReceiver receiver = session.createReceiver(queue,propName+" = 'true'"); 1620 1621 for(int i=0;i<howMany;i++) 1622 { 1623 ObjectMessage message = (ObjectMessage )receiver.receive(100); 1624 1625 assertNotNull(message); 1626 1627 received.add(message); 1628 assertTrue("message should have "+propName+" be true, but "+propName+" is "+message.getBooleanProperty(propName),message.getBooleanProperty(propName)); 1629 } 1630 1631 assertTrue("Expected "+howMany+" but only got ",received.size()==howMany); 1632 } 1633 catch(JMSException jmse) 1634 { 1635 fail(jmse); 1636 } 1637 } 1638 1639 protected void receiveObjectsCount(QueueConnection connection,Queue queue,int howMany) 1640 { 1641 Set <ObjectMessage > received = new HashSet <ObjectMessage >(); 1642 1643 try 1644 { 1645 QueueSession session = connection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE); 1646 QueueReceiver receiver = session.createReceiver(queue); 1647 1648 for(int i=0;i<howMany;i++) 1649 { 1650 ObjectMessage message = (ObjectMessage )receiver.receive(100); 1651 1652 assertNotNull(message); 1653 1654 received.add(message); 1655 } 1656 1657 assertTrue("Expected "+howMany+" but only got ",received.size()==howMany); 1658 } 1659 catch(JMSException jmse) 1660 { 1661 fail(jmse); 1662 } 1663 } 1664 1665 public void testMessageSelectorQueues() 1666 { 1667 try 1668 { 1669 Hashtable <String ,Object > env = getEnvironment(); 1671 1672 env.put(Context.INITIAL_CONTEXT_FACTORY,"net.walend.somnifugi.SomniQueueContextFactory"); 1673 env.put(Context.PROVIDER_URL,"<not-used>"); 1674 env.put(SomniProperties.DEFAULT+"."+SomniProperties.CHANNELFACTORYCLASSNAMEPROP,env.get(MESSAGESELECTINGPRIORITYCHANNELFACTORYCLASS)); 1675 1676 Context ctx = new InitialContext (env); 1678 1679 QueueConnection connection = (QueueConnection )ctx.lookup("Connection"); 1681 Queue queue = (Queue )ctx.lookup("testMessageSelectorQueues"); 1682 1683 TestExceptionListener exceptionListener = new TestExceptionListener(); 1684 connection.setExceptionListener(exceptionListener); 1685 1686 ctx.close(); 1687 1688 connection.start(); 1689 1690 sendObjectsWithBooleanProperties(connection,queue,10); 1691 1692 receiveObjectsWithProperty(connection,queue,5,"a"); 1693 receiveObjectsWithProperty(connection,queue,2,"b"); 1694 receiveObjectsCount(connection,queue,3); 1695 receiveNoObjects(connection,queue); 1696 1697 connection.close(); 1698 1699 if(exceptionListener.caughtException()) 1700 { 1701 fail(exceptionListener.getException()); 1702 } 1703 } 1704 catch(JMSException jmse) 1705 { 1706 fail(jmse); 1707 } 1708 catch(NamingException ioe) 1709 { 1710 fail(ioe); 1711 } 1712 } 1713 1714 protected void receiveObjectsWithProperty(QueueReceiver receiver,int howMany,String propName) 1715 { 1716 Set <ObjectMessage > received = new HashSet <ObjectMessage >(); 1717 1718 try 1719 { 1720 for(int i=0;i<howMany;i++) 1721 { 1722 ObjectMessage message = (ObjectMessage )receiver.receive(100); 1723 1724 assertNotNull(message); 1725 1726 received.add(message); 1727 assertTrue("message should have "+propName+" be true, but "+propName+" is "+message.getBooleanProperty(propName),message.getBooleanProperty(propName)); 1728 } 1729 1730 assertTrue("Expected "+howMany+" but only got ",received.size()==howMany); 1731 } 1732 catch(JMSException jmse) 1733 { 1734 fail(jmse); 1735 } 1736 } 1737 1738 public void testMessageSelectorQueuesLater() 1739 { 1740 try 1741 { 1742 Hashtable <String ,Object > env = getEnvironment(); 1744 1745 env.put(Context.INITIAL_CONTEXT_FACTORY,"net.walend.somnifugi.SomniQueueContextFactory"); 1746 env.put(Context.PROVIDER_URL,"<not-used>"); 1747 env.put(SomniProperties.DEFAULT+"."+SomniProperties.CHANNELFACTORYCLASSNAMEPROP,env.get(MESSAGESELECTINGPRIORITYCHANNELFACTORYCLASS)); 1748 1749 Context ctx = new InitialContext (env); 1751 1752 QueueConnection connection = (QueueConnection )ctx.lookup("Connection"); 1754 Queue queue = (Queue )ctx.lookup("testMessageSelectorQueuesLater"); 1755 1756 TestExceptionListener exceptionListener = new TestExceptionListener(); 1757 connection.setExceptionListener(exceptionListener); 1758 1759 ctx.close(); 1760 1761 connection.start(); 1762 1763 QueueSession session = connection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE); 1764 QueueReceiver aReceiver = session.createReceiver(queue,"a = 'true'"); 1765 QueueReceiver bReceiver = session.createReceiver(queue,"b = 'true'"); 1766 1767 sendObjectsWithBooleanProperties(connection,queue,10); 1768 1769 receiveObjectsWithProperty(aReceiver,5,"a"); 1770 receiveObjectsWithProperty(bReceiver,2,"b"); 1771 receiveObjectsCount(connection,queue,3); 1772 receiveNoObjects(connection,queue); 1773 1774 connection.close(); 1775 1776 if(exceptionListener.caughtException()) 1777 { 1778 fail(exceptionListener.getException()); 1779 } 1780 } 1781 catch(JMSException jmse) 1782 { 1783 fail(jmse); 1784 } 1785 catch(NamingException ioe) 1786 { 1787 fail(ioe); 1788 } 1789 } 1790 1791 public void testTimeoutChannelFactory() 1792 { 1793 try 1794 { 1795 Hashtable <String ,Object > env = getEnvironment(); 1797 1798 env.put(Context.INITIAL_CONTEXT_FACTORY,"net.walend.somnifugi.SomniQueueContextFactory"); 1799 env.put(Context.PROVIDER_URL,"<not-used>"); 1800 env.put("testTimeoutChannelFactory"+"."+SomniProperties.CHANNELFACTORYCLASSNAMEPROP,TimeoutChannelFactory.class.getName()); 1801 env.put(TimeoutChannelFactory.DEFAULTTIMEOUTPROP,"200"); 1802 1803 Context ctx = new InitialContext (env); 1805 1806 QueueConnection connection = (QueueConnection )ctx.lookup("Connection"); 1808 Queue queue = (Queue )ctx.lookup("testTimeoutChannelFactory"); 1809 1810 TestExceptionListener exceptionListener = new TestExceptionListener(); 1811 connection.setExceptionListener(exceptionListener); 1812 1813 ctx.close(); 1814 1815 connection.start(); 1816 1817 sendObjects(connection,queue,10); 1818 1819 receiveObjects(connection,queue,10); 1820 receiveNoObjects(connection,queue); 1821 1822 connection.close(); 1823 1824 if(exceptionListener.caughtException()) 1825 { 1826 fail(exceptionListener.getException()); 1827 } 1828 } 1829 catch(JMSException jmse) 1830 { 1831 fail(jmse); 1832 } 1833 catch(NamingException ioe) 1834 { 1835 fail(ioe); 1836 } 1837 } 1838 1839 public void testNoCopyHotStart() 1840 { 1841 try 1842 { 1843 SomniJNDIBypass.IT.getQueueContext().getEnvironment().put("testNoCopyHotStart."+SomniProperties.COPYMODE,SomniProperties.NOCOPY); 1844 1845 QueueConnection connection = SomniJNDIBypass.IT.getQueueConnectionFactory().createQueueConnection(); 1846 1847 TestExceptionListener exceptionListener = new TestExceptionListener(); 1848 connection.setExceptionListener(exceptionListener); 1849 1850 connection.start(); 1851 1852 Queue queue = SomniJNDIBypass.IT.getQueue("testNoCopyHotStart"); 1853 1854 sendObjects(connection,queue,10); 1855 1856 receiveObjects(connection,queue,10,SomniProperties.NOCOPY); 1857 1858 receiveNoObjects(connection,queue); 1859 1860 connection.close(); 1861 connection.close(); 1863 1864 if(exceptionListener.caughtException()) 1865 { 1866 fail(exceptionListener.getException()); 1867 } 1868 } 1869 catch(JMSException jmse) 1870 { 1871 fail(jmse); 1872 } 1873 catch(NamingException ne) 1874 { 1875 fail(ne); 1876 } 1877 } 1878 1879 public void testShallowCopyHotStart() 1880 { 1881 try 1882 { 1883 SomniJNDIBypass.IT.getQueueContext().getEnvironment().put("testShallowCopyHotStart."+SomniProperties.COPYMODE,SomniProperties.SHALLOWCOPY); 1884 1885 QueueConnection connection = SomniJNDIBypass.IT.getQueueConnectionFactory().createQueueConnection(); 1886 1887 TestExceptionListener exceptionListener = new TestExceptionListener(); 1888 connection.setExceptionListener(exceptionListener); 1889 1890 connection.start(); 1891 1892 Queue queue = SomniJNDIBypass.IT.getQueue("testShallowCopyHotStart"); 1893 1894 sendObjects(connection,queue,10); 1895 1896 receiveObjects(connection,queue,10,SomniProperties.SHALLOWCOPY); 1897 1898 receiveNoObjects(connection,queue); 1899 1900 connection.close(); 1901 connection.close(); 1903 1904 if(exceptionListener.caughtException()) 1905 { 1906 fail(exceptionListener.getException()); 1907 } 1908 } 1909 catch(JMSException jmse) 1910 { 1911 fail(jmse); 1912 } 1913 catch(NamingException ne) 1914 { 1915 fail(ne); 1916 } 1917 } 1918 1919 public void testDeepCopyHotStart() 1920 { 1921 try 1922 { 1923 SomniJNDIBypass.IT.getQueueContext().getEnvironment().put("testDeepCopyHotStart."+SomniProperties.COPYMODE,SomniProperties.DEEPCOPY); 1924 1925 QueueConnection connection = SomniJNDIBypass.IT.getQueueConnectionFactory().createQueueConnection(); 1926 1927 TestExceptionListener exceptionListener = new TestExceptionListener(); 1928 connection.setExceptionListener(exceptionListener); 1929 1930 connection.start(); 1931 1932 Queue queue = SomniJNDIBypass.IT.getQueue("testDeepCopyHotStart"); 1933 1934 sendObjects(connection,queue,10); 1935 1936 receiveObjects(connection,queue,10,SomniProperties.DEEPCOPY); 1937 1938 receiveNoObjects(connection,queue); 1939 1940 connection.close(); 1941 connection.close(); 1943 1944 if(exceptionListener.caughtException()) 1945 { 1946 fail(exceptionListener.getException()); 1947 } 1948 } 1949 catch(JMSException jmse) 1950 { 1951 fail(jmse); 1952 } 1953 catch(NamingException ne) 1954 { 1955 fail(ne); 1956 } 1957 } 1958 1959 public void testQueueBrowser() 1960 { 1961 try 1962 { 1963 QueueConnection connection = SomniJNDIBypass.IT.getQueueConnectionFactory().createQueueConnection(); 1964 1965 TestExceptionListener exceptionListener = new TestExceptionListener(); 1966 connection.setExceptionListener(exceptionListener); 1967 1968 connection.start(); 1969 1970 Queue queue = SomniJNDIBypass.IT.getQueue("hotStartTest"); 1971 1972 sendObjects(connection,queue,10); 1973 1974 QueueSession session = connection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE); 1975 QueueBrowser queueBrowser = session.createBrowser(queue); 1976 1977 List <Message > browsed = new ArrayList <Message >(); 1978 Enumeration en = queueBrowser.getEnumeration(); 1979 while(en.hasMoreElements()) 1980 { 1981 browsed.add((Message )en.nextElement()); 1982 } 1983 1984 assertTrue("Expected 10 messages in the browser, but only found "+browsed.size(),browsed.size()==10); 1985 1986 receiveObjects(connection,queue,10); 1987 1988 connection.close(); 1989 1990 if(exceptionListener.caughtException()) 1991 { 1992 fail(exceptionListener.getException()); 1993 } 1994 } 1995 catch(JMSException jmse) 1996 { 1997 fail(jmse); 1998 } 1999 } 2000 2001 public void testQueueBrowserWithMessageSelector() 2002 { 2003 try 2004 { 2005 QueueConnection connection = SomniJNDIBypass.IT.getQueueConnectionFactory().createQueueConnection(); 2006 2007 TestExceptionListener exceptionListener = new TestExceptionListener(); 2008 connection.setExceptionListener(exceptionListener); 2009 2010 connection.start(); 2011 2012 Queue queue = SomniJNDIBypass.IT.getQueue("testQueueBrowserWithMessageSelector"); 2013 2014 sendObjectsWithBooleanProperties(connection,queue,10); 2015 2016 QueueSession session = connection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE); 2017 QueueBrowser queueBrowser = session.createBrowser(queue,"a = 'true'"); 2018 2019 List <Message > browsed = new ArrayList <Message >(); 2020 Enumeration en = queueBrowser.getEnumeration(); 2021 while(en.hasMoreElements()) 2022 { 2023 browsed.add((Message )en.nextElement()); 2024 } 2025 2026 assertTrue("Expected 5 messages in the browser, but only found "+browsed.size(),browsed.size()==5); 2027 2028 receiveObjects(connection,queue,10); 2029 2030 connection.close(); 2031 2032 if(exceptionListener.caughtException()) 2033 { 2034 fail(exceptionListener.getException()); 2035 } 2036 } 2037 catch(JMSException jmse) 2038 { 2039 fail(jmse); 2040 } 2041 } 2042 2043 protected static final String SIMPLECHANNELFACTORYCLASS = "simpleChannelFactoryClass"; 2044 protected static final String PRIORITYCHANNELFACTORYCLASS = "priorityChannelFactoryClass"; 2045 protected static final String MESSAGESELECTINGPRIORITYCHANNELFACTORYCLASS = "MessageSelectingPriorityChannelFactoryClass"; 2046 2047 protected Hashtable <String ,Object > getEnvironment() 2048 { 2049 Hashtable <String ,Object > env = new Hashtable <String ,Object >(); 2050 2051 env.put(SIMPLECHANNELFACTORYCLASS,SimpleChannelFactory.class.getName()); 2052 env.put(PRIORITYCHANNELFACTORYCLASS,PriorityChannelFactory.class.getName()); 2053 env.put(MESSAGESELECTINGPRIORITYCHANNELFACTORYCLASS,MessageSelectingPriorityChannelFactory.class.getName()); 2054 env.put(SomniProperties.DEFAULTQUEUECHANNELFACTORYCLASSNAME,SimpleChannelFactory.class.getName()); 2055 env.put(SomniProperties.DEFAULT+"."+SomniProperties.COPYMODE,SomniProperties.DEEPCOPY); 2056 2057 return env; 2058 } 2059 2060 protected void setUp() 2061 { 2062 System.getProperties().putAll(getEnvironment()); 2063 } 2064 2065 public static Test suite() 2066 { 2067 TestSuite suite = new TestSuite() ; 2068 2069 suite.addTest(new QueueTest("testHotStart")); 2070 suite.addTest(new QueueTest("testExpire")); 2071 suite.addTest(new QueueTest("testTwoThreads")); 2072 suite.addTest(new QueueTest("testNoWait")); 2073 suite.addTest(new QueueTest("testMessageListener")); 2074 suite.addTest(new QueueTest("testClosingMessageListener")); 2075 suite.addTest(new QueueTest("testTwoMessageListenersInTwoSessions")); 2076 suite.addTest(new QueueTest("testTwoMessageListenersInOneSession")); 2077 suite.addTest(new QueueTest("testJNDI")); 2078 suite.addTest(new QueueTest("testMinimalJNDI")); 2079 suite.addTest(new QueueTest("testStopStart")); 2080 suite.addTest(new QueueTest("testRequestReply")); 2081 suite.addTest(new QueueTest("testRequestForwardReply")); 2082 suite.addTest(new QueueTest("testMessageListenerClosingDeadlock")); 2083 suite.addTest(new QueueTest("testSimpleChannelFactory")); 2084 suite.addTest(new QueueTest("testPriorityChannelFactory")); 2085 suite.addTest(new QueueTest("testMessageSelectingPriorityChannelFactory")); 2086 suite.addTest(new QueueTest("testMessageSelectorQueues")); 2087 suite.addTest(new QueueTest("testMessageSelectorQueuesLater")); 2088 suite.addTest(new QueueTest("testTimeoutChannelFactory")); 2089 suite.addTest(new QueueTest("testNoCopyHotStart")); 2090 suite.addTest(new QueueTest("testShallowCopyHotStart")); 2091 suite.addTest(new QueueTest("testDeepCopyHotStart")); 2092 suite.addTest(new QueueTest("testQueueBrowser")); 2093 suite.addTest(new QueueTest("testQueueBrowserWithMessageSelector")); 2094 suite.addTest(new QueueTest("testSomniRequestReply")); 2095 suite.addTest(new QueueTest("testSomniRequestReplyWithTimeout")); 2096 2097 return suite; 2098 } 2099} 2100 2101 2102 2124 2125 2126 | Popular Tags |