1 22 package org.jboss.test.jbossmq.test; 23 24 import javax.jms.JMSException ; 25 import javax.jms.Message ; 26 import javax.jms.MessageListener ; 27 import javax.jms.ObjectMessage ; 28 import javax.jms.Queue ; 29 import javax.jms.QueueConnection ; 30 import javax.jms.QueueConnectionFactory ; 31 import javax.jms.QueueReceiver ; 32 import javax.jms.QueueSender ; 33 import javax.jms.QueueSession ; 34 import javax.jms.Session ; 35 import javax.naming.Context ; 36 import javax.naming.InitialContext ; 37 38 import org.jboss.test.JBossTestCase; 39 40 41 42 43 44 59 60 public class JBossSessionRecoverUnitTestCase 61 62 extends JBossTestCase 63 64 { 65 66 String QUEUE_FACTORY = "ConnectionFactory"; 67 68 String TEST_QUEUE = "queue/testQueue"; 69 70 71 72 Context context; 73 74 QueueConnection queueConnection; 75 76 QueueSession session; 77 78 79 80 int counter=0; 81 82 Exception exception=null; 83 84 85 86 public JBossSessionRecoverUnitTestCase(String name) throws Exception 87 88 { 89 90 super(name); 91 92 } 93 94 95 96 protected void setUp() 97 98 throws Exception 99 100 { 101 102 this.getLog().debug("JBossSessionRecoverUnitTestCase, ConnectionFactory started"); 103 104 } 105 106 107 108 protected void tearDown() throws Exception 109 110 { 111 112 this.getLog().debug("JBossSessionRecoverUnitTestCase, ConnectionFactory done"); 113 114 } 115 116 117 118 120 private void drainQueue() throws Exception 121 122 { 123 124 QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 125 126 Queue queue = (Queue )context.lookup(TEST_QUEUE); 127 128 129 130 QueueReceiver receiver = session.createReceiver(queue); 131 132 Message message = receiver.receive( 1000 ); 133 134 int c=0; 135 136 while( message != null ) 137 138 { 139 140 message = receiver.receive( 1000 ); 141 142 c++; 143 144 } 145 146 147 148 if( c!=0 ) 149 150 getLog().debug(" Drained "+c+" messages from the queue"); 151 152 153 154 session.close(); 155 156 } 157 158 159 160 static public void main ( String []args ) 161 162 { 163 164 String newArgs[] = { "org.jboss.test.jbossmq.test.JBossSessionRecoverUnitTestCase" }; 165 166 junit.swingui.TestRunner.main(newArgs); 167 168 } 169 170 171 172 protected void connect() throws Exception 173 174 { 175 176 if( context == null ) 177 178 { 179 180 context = new InitialContext (); 181 182 } 183 184 QueueConnectionFactory queueFactory = (QueueConnectionFactory ) context.lookup(QUEUE_FACTORY); 185 186 queueConnection = queueFactory.createQueueConnection(); 187 188 189 190 getLog().debug("Connection to JBossMQ established."); 191 192 } 193 194 195 196 201 202 public void testQueueSessionRecovermessageListener() throws Exception 203 204 { 205 206 counter = 0; 207 208 getLog().debug("Starting session.recover() Message Listener test"); 209 210 211 212 connect(); 213 214 215 216 queueConnection.start(); 217 218 219 220 drainQueue(); 221 222 223 224 session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 225 226 Queue queue = (Queue )context.lookup(TEST_QUEUE); 227 228 QueueSender sender = session.createSender(queue); 229 230 231 232 234 for ( int i=0; i<20; i++ ) 235 236 { 237 238 sender.send(session.createObjectMessage(new Integer (i))); 239 240 } 241 242 243 244 246 session.close(); 247 248 queueConnection.stop(); 249 250 session = queueConnection.createQueueSession( false, Session.CLIENT_ACKNOWLEDGE ); 251 252 253 254 256 QueueReceiver receiver = session.createReceiver( queue ); 257 258 MessageListener messagelistener = new MessageListener () 259 260 { 261 262 public void onMessage(Message message) 263 264 { 265 266 processMessage( message ); 267 268 } 269 270 }; 271 272 receiver.setMessageListener( messagelistener ); 273 274 queueConnection.start(); 275 276 277 278 280 282 while ( counter < 40 && exception == null ) 283 284 { 285 286 try 287 288 { 289 290 Thread.sleep( 500 ); 291 292 } 293 294 catch ( InterruptedException ie ) 295 296 { 297 298 } 299 300 } 301 302 303 304 if ( exception != null ) 305 306 { 307 308 queueConnection.close(); 309 310 throw exception; 311 312 } 313 314 315 316 queueConnection.close(); 317 318 getLog().debug("session.recover() Message Listener passed"); 319 320 } 321 322 323 324 private void processMessage ( Message message ) 325 326 { 327 328 try 329 330 { 331 332 if ( message instanceof ObjectMessage ) 333 334 { 335 336 counter++; 337 338 ObjectMessage objectmessage = (ObjectMessage )message; 339 340 Integer integer = (Integer )objectmessage.getObject(); 341 342 int mynumber = integer.intValue(); 343 344 getLog().debug("message object " + integer + " counter=" + counter ); 345 346 if ( mynumber == 19 ) 347 348 { 349 350 if (counter == 20) 351 352 { 353 354 session.recover(); 355 356 } 357 358 else 359 360 { 361 362 message.acknowledge(); 363 364 } 365 366 } 367 368 } 369 370 } 371 372 catch ( JMSException e ) 373 374 { 375 376 exception = e; 377 378 } 379 380 } 381 382 383 class Synch 384 { 385 boolean waiting = false; 386 public synchronized void doWait(long timeout) 387 throws InterruptedException 388 { 389 waiting = true; 390 this.wait(timeout); 391 } 392 public synchronized void doNotify() 393 throws InterruptedException 394 { 395 while (waiting == false) 396 wait(100); 397 this.notifyAll(); 398 } 399 } 400 401 402 407 408 public void testQueueSessionRecoverMessageListenerOrder() 409 throws Exception 410 411 { 412 413 counter = 0; 414 415 exception = null; 416 getLog().debug("Starting session.recover() Message Listener Order test"); 417 418 419 420 connect(); 421 422 423 424 queueConnection.start(); 425 426 427 428 drainQueue(); 429 430 431 432 session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 433 434 Queue queue = (Queue )context.lookup(TEST_QUEUE); 435 436 QueueSender sender = session.createSender(queue); 437 438 439 440 442 for (int i=0; i<4; ++i) 443 444 { 445 446 sender.send(session.createObjectMessage(new Integer (i))); 447 448 } 449 450 451 452 454 QueueReceiver receiver = session.createReceiver( queue ); 455 456 final Synch synch = new Synch(); 457 MessageListener messagelistener = new MessageListener () 458 459 { 460 461 public void onMessage(Message message) 462 463 { 464 465 checkMessagesInOrder(session, message, synch); 466 467 } 468 469 }; 470 471 receiver.setMessageListener( messagelistener ); 472 473 queueConnection.start(); 474 475 476 synch.doWait(10000); 477 478 479 if ( exception != null ) 480 481 { 482 483 queueConnection.close(); 484 485 throw exception; 486 487 } 488 489 490 491 queueConnection.close(); 492 493 getLog().debug("session.recover() Message Listener Order passed"); 494 495 } 496 497 498 499 private void checkMessagesInOrder(Session session, Message message, Synch synch) 500 501 { 502 503 try 504 505 { 506 507 ObjectMessage objectmessage = (ObjectMessage )message; 508 509 Integer integer = (Integer )objectmessage.getObject(); 510 511 int mynumber = integer.intValue(); 512 513 if (message.getJMSRedelivered() == false) 514 { 515 log.debug("Recovering " + mynumber); 516 session.recover(); 517 return; 518 } 519 520 log.debug("Checking " + mynumber); 521 assertTrue("Expected messages in order", mynumber == counter); 522 counter++; 523 if (counter == 4) 524 synch.doNotify(); 525 } 526 527 catch (Exception e) 528 529 { 530 531 exception = e; 532 533 } 534 535 } 536 537 538 539 544 545 public void testQueueSessionRecoverReceive() throws Exception 546 547 { 548 549 counter = 0; 550 551 getLog().debug("Starting session.recover() receive test"); 552 553 554 555 connect(); 556 557 558 559 queueConnection.start(); 560 561 562 563 drainQueue(); 564 565 566 567 session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 568 569 Queue queue = (Queue )context.lookup(TEST_QUEUE); 570 571 QueueSender sender = session.createSender(queue); 572 573 574 575 577 for ( int i=0; i<20; i++ ) 578 579 { 580 581 sender.send(session.createObjectMessage(new Integer (i))); 582 583 } 584 585 586 587 589 session.close(); 590 591 queueConnection.stop(); 592 593 session = queueConnection.createQueueSession( false, Session.CLIENT_ACKNOWLEDGE ); 594 595 596 597 599 QueueReceiver receiver = session.createReceiver( queue ); 600 601 queueConnection.start(); 602 603 604 605 Message message = receiver.receive( 1000 ); 606 607 int messagecounter=0; 608 609 while( message != null ) 610 611 { 612 613 message = receiver.receive( 1000 ); 614 615 messagecounter++; 616 617 } 618 619 620 621 if ( messagecounter != 20 ) 622 623 { 624 625 throw new Exception ( "Not all sent messages were delivered! messagecounter=" + messagecounter ); 626 627 } 628 629 630 631 633 session.recover(); 634 635 636 637 message = receiver.receive(); 638 639 messagecounter=0; 640 641 while( message != null ) 642 643 { 644 645 if ( !message.getJMSRedelivered() ) 646 647 { 648 649 throw new Exception ( "Message was not marked as redelivered! messagecounter=" + messagecounter ); 650 651 } 652 653 message.acknowledge(); 654 655 656 657 messagecounter++; 658 659 661 if ( messagecounter < 15 ) 662 663 { 664 665 message = receiver.receive(); 666 667 } 668 669 else 670 671 { 672 673 message = receiver.receive ( 1000 ); 674 675 } 676 677 } 678 679 680 681 if ( messagecounter != 20 ) 682 683 { 684 685 throw new Exception ( "Not all unacknowledged messages were redelivered! messagecounter=" + messagecounter ); 686 687 } 688 689 690 691 queueConnection.close(); 692 693 getLog().debug("session.recover() receive passed"); 694 695 } 696 697 698 699 704 705 public void testQueueSessionRecoverReceiveTimeout() throws Exception 706 707 { 708 709 counter = 0; 710 711 getLog().debug("Starting session.recover() receive(timeout) test"); 712 713 714 715 connect(); 716 717 718 719 queueConnection.start(); 720 721 722 723 drainQueue(); 724 725 726 727 session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 728 729 Queue queue = (Queue )context.lookup(TEST_QUEUE); 730 731 QueueSender sender = session.createSender(queue); 732 733 734 735 737 for ( int i=0; i<20; i++ ) 738 739 { 740 741 sender.send(session.createObjectMessage(new Integer (i))); 742 743 } 744 745 746 747 749 session.close(); 750 751 queueConnection.stop(); 752 753 session = queueConnection.createQueueSession( false, Session.CLIENT_ACKNOWLEDGE ); 754 755 756 757 759 QueueReceiver receiver = session.createReceiver( queue ); 760 761 queueConnection.start(); 762 763 764 765 Message message = receiver.receive( 1000 ); 766 767 int messagecounter=0; 768 769 while( message != null ) 770 771 { 772 773 message = receiver.receive( 1000 ); 774 775 messagecounter++; 776 777 } 778 779 780 781 if ( messagecounter != 20 ) 782 783 { 784 785 throw new Exception ( "Not all sent messages were delivered! messagecounter=" + messagecounter ); 786 787 } 788 789 790 791 793 session.recover(); 794 795 796 797 message = receiver.receive(1000); 798 799 messagecounter=0; 800 801 while( message != null ) 802 803 { 804 805 if ( !message.getJMSRedelivered() ) 806 807 { 808 809 throw new Exception ( "Message was not marked as redelivered! messagecounter=" + messagecounter ); 810 811 } 812 813 message.acknowledge(); 814 815 816 817 messagecounter++; 818 819 message = receiver.receive( 1000 ); 820 821 } 822 823 824 825 if ( messagecounter != 20 ) 826 827 { 828 829 throw new Exception ( "Not all unacknowledged messages were redelivered! messagecounter=" + messagecounter ); 830 831 } 832 833 834 835 queueConnection.close(); 836 837 getLog().debug("session.recover() receive(timeout) passed"); 838 839 } 840 841 842 843 848 849 public void testQueueSessionRecoverReceiveNoWait() throws Exception 850 851 { 852 853 counter = 0; 854 855 getLog().debug("Starting session.recover() receiveNoWait test"); 856 857 858 859 connect(); 860 861 862 863 queueConnection.start(); 864 865 866 867 drainQueue(); 868 869 870 871 session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 872 873 Queue queue = (Queue )context.lookup(TEST_QUEUE); 874 875 QueueSender sender = session.createSender(queue); 876 877 878 879 881 for ( int i=0; i<20; i++ ) 882 883 { 884 885 sender.send(session.createObjectMessage(new Integer (i))); 886 887 } 888 889 890 891 893 session.close(); 894 895 queueConnection.stop(); 896 897 session = queueConnection.createQueueSession( false, Session.CLIENT_ACKNOWLEDGE ); 898 899 900 901 903 QueueReceiver receiver = session.createReceiver( queue ); 904 905 queueConnection.start(); 906 907 908 909 Message message = receiver.receiveNoWait(); 910 911 int messagecounter=0; 912 913 while( message != null ) 914 915 { 916 917 message = receiver.receiveNoWait(); 918 919 messagecounter++; 920 921 } 922 923 924 925 if ( messagecounter != 20 ) 926 927 { 928 929 throw new Exception ( "Not all sent messages were delivered! messagecounter=" + messagecounter ); 930 931 } 932 933 934 935 937 session.recover(); 938 939 940 941 message = receiver.receiveNoWait(); 942 943 messagecounter=0; 944 945 while( message != null ) 946 947 { 948 949 if ( !message.getJMSRedelivered() ) 950 951 { 952 953 throw new Exception ( "Message was not marked as redelivered! messagecounter=" + messagecounter ); 954 955 } 956 957 message.acknowledge(); 958 959 960 961 messagecounter++; 962 963 message = receiver.receiveNoWait(); 964 965 } 966 967 968 969 if ( messagecounter != 20 ) 970 971 { 972 973 throw new Exception ( "Not all unacknowledged messages were redelivered! messagecounter=" + messagecounter ); 974 975 } 976 977 978 979 queueConnection.close(); 980 981 getLog().debug("session.recover() receiveNoWait passed"); 982 983 } 984 985 public static junit.framework.Test suite() throws Exception 986 { 987 ClassLoader loader = Thread.currentThread().getContextClassLoader(); 988 return getDeploySetup(JBossSessionRecoverUnitTestCase.class, 989 loader.getResource("messaging/test-destinations-service.xml").toString()); 990 } 991 992 } | Popular Tags |