1 22 package org.jboss.test.jbossmq.test; 23 24 import javax.jms.BytesMessage ; 25 import javax.jms.DeliveryMode ; 26 import javax.jms.JMSException ; 27 import javax.jms.Message ; 28 import javax.jms.MessageListener ; 29 import javax.jms.QueueConnection ; 30 import javax.jms.QueueConnectionFactory ; 31 import javax.jms.QueueReceiver ; 32 import javax.jms.QueueSender ; 33 import javax.jms.QueueSession ; 34 import javax.jms.Session ; 35 import javax.jms.Topic ; 36 import javax.jms.TopicConnection ; 37 import javax.jms.TopicConnectionFactory ; 38 import javax.jms.TopicPublisher ; 39 import javax.jms.TopicSession ; 40 import javax.jms.TopicSubscriber ; 41 import javax.jms.Queue ; 42 import javax.naming.Context ; 43 44 import org.jboss.logging.Logger; 45 import org.jboss.test.JBossTestCase; 46 47 53 public class RollBackUnitTestCase extends JBossTestCase 54 { 55 56 static String TOPIC_FACTORY = "ConnectionFactory"; 58 59 static String QUEUE_FACTORY = "ConnectionFactory"; 60 61 static String TEST_QUEUE = "queue/testQueue"; 62 63 static String TEST_TOPIC = "topic/testTopic"; 64 65 static String TEST_DURABLE_TOPIC = "topic/testDurableTopic"; 66 67 static byte[] PAYLOAD = new byte[10]; 68 69 static Context context; 70 71 static QueueConnection queueConnection; 72 73 static TopicConnection topicConnection; 74 75 static TopicConnection topicDurableConnection; 76 77 83 public RollBackUnitTestCase(String name) throws Exception 84 { 85 super(name); 86 } 87 88 94 public void runQueueSendRollBack(final int persistence, final boolean explicit) throws Exception 95 { 96 drainQueue(); 97 final int iterationCount = getIterationCount(); 98 final Logger log = getLog(); 99 100 Thread sendThread = new Thread () 101 { 102 public void run() 103 { 104 try 105 { 106 QueueSession session = queueConnection.createQueueSession(true, Session.AUTO_ACKNOWLEDGE); 107 Queue queue = (Queue ) context.lookup(TEST_QUEUE); 108 109 QueueSender sender = session.createSender(queue); 110 111 BytesMessage message = session.createBytesMessage(); 112 message.writeBytes(PAYLOAD); 113 message.setStringProperty("TEST_NAME", "runQueueSendRollback"); 114 message.setIntProperty("TEST_PERSISTENCE", persistence); 115 message.setBooleanProperty("TEST_EXPLICIT", explicit); 116 117 for (int i = 0; i < iterationCount; i++) 118 { 119 sender.send(message, persistence, 4, 0); 120 } 121 122 if (explicit) 123 session.rollback(); 124 session.close(); 125 } 126 catch (Exception e) 127 { 128 log.error("error", e); 129 } 130 } 131 }; 132 133 sendThread.start(); 134 sendThread.join(); 135 assertTrue("Queue should be empty", drainQueue() == 0); 136 } 137 138 144 public void runTopicSendRollBack(final int persistence, final boolean explicit) throws Exception 145 { 146 drainQueue(); 147 drainTopic(); 148 149 final int iterationCount = getIterationCount(); 150 final Logger log = getLog(); 151 152 Thread sendThread = new Thread () 153 { 154 public void run() 155 { 156 try 157 { 158 159 TopicSession session = topicConnection.createTopicSession(true, Session.AUTO_ACKNOWLEDGE); 160 Topic topic = (Topic ) context.lookup(TEST_TOPIC); 161 162 TopicPublisher publisher = session.createPublisher(topic); 163 164 BytesMessage message = session.createBytesMessage(); 165 message.writeBytes(PAYLOAD); 166 message.setStringProperty("TEST_NAME", "runTopicSendRollback"); 167 message.setIntProperty("TEST_PERSISTENCE", persistence); 168 message.setBooleanProperty("TEST_EXPLICIT", explicit); 169 170 for (int i = 0; i < iterationCount; i++) 171 { 172 publisher.publish(message, persistence, 4, 0); 173 } 174 175 session.close(); 176 } 177 catch (Exception e) 178 { 179 log.error("error", e); 180 } 181 } 182 }; 183 184 sendThread.start(); 185 sendThread.join(); 186 assertTrue("Topic should be empty", drainTopic() == 0); 187 } 188 189 195 public void runAsynchQueueReceiveRollBack(final int persistence, final boolean explicit) throws Exception 196 { 197 drainQueue(); 198 199 final int iterationCount = getIterationCount(); 200 final Logger log = getLog(); 201 202 Thread sendThread = new Thread () 203 { 204 public void run() 205 { 206 try 207 { 208 QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 209 Queue queue = (Queue ) context.lookup(TEST_QUEUE); 210 211 QueueSender sender = session.createSender(queue); 212 213 BytesMessage message = session.createBytesMessage(); 214 message.writeBytes(PAYLOAD); 215 message.setStringProperty("TEST_NAME", "runAsynchQueueReceiveRollback"); 216 message.setIntProperty("TEST_PERSISTENCE", persistence); 217 message.setBooleanProperty("TEST_EXPLICIT", explicit); 218 219 for (int i = 0; i < iterationCount; i++) 220 { 221 sender.send(message, persistence, 4, 0); 222 } 223 224 session.close(); 225 } 226 catch (Exception e) 227 { 228 log.error("error", e); 229 } 230 } 231 }; 232 233 QueueSession session = queueConnection.createQueueSession(true, Session.AUTO_ACKNOWLEDGE); 234 Queue queue = (Queue ) context.lookup(TEST_QUEUE); 235 QueueReceiver receiver = session.createReceiver(queue); 236 237 MyMessageListener listener = new MyMessageListener(iterationCount, log); 238 239 sendThread.start(); 240 receiver.setMessageListener(listener); 241 queueConnection.start(); 242 synchronized (listener) 243 { 244 if (listener.i < iterationCount) 245 listener.wait(); 246 } 247 receiver.setMessageListener(null); 248 249 if (explicit) 250 session.rollback(); 251 session.close(); 252 253 queueConnection.stop(); 254 255 sendThread.join(); 256 257 assertTrue("Queue should be full", drainQueue() == iterationCount); 258 259 } 260 261 267 public void runAsynchTopicReceiveRollBack(final int persistence, final boolean explicit) throws Exception 268 { 269 drainQueue(); 270 drainTopic(); 271 272 final int iterationCount = getIterationCount(); 273 final Logger log = getLog(); 274 275 Thread sendThread = new Thread () 276 { 277 public void run() 278 { 279 try 280 { 281 282 TopicSession session = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); 283 Topic topic = (Topic ) context.lookup(TEST_TOPIC); 284 285 TopicPublisher publisher = session.createPublisher(topic); 286 287 waitForSynchMessage(); 288 289 BytesMessage message = session.createBytesMessage(); 290 message.writeBytes(PAYLOAD); 291 message.setStringProperty("TEST_NAME", "runAsynchTopicReceiveRollback"); 292 message.setIntProperty("TEST_PERSISTENCE", persistence); 293 message.setBooleanProperty("TEST_EXPLICIT", explicit); 294 295 for (int i = 0; i < iterationCount; i++) 296 { 297 publisher.publish(message, persistence, 4, 0); 298 log.debug("Published message " + i); 299 } 300 301 session.close(); 302 } 303 catch (Exception e) 304 { 305 log.error("error", e); 306 } 307 } 308 }; 309 310 TopicSession session = topicConnection.createTopicSession(true, Session.AUTO_ACKNOWLEDGE); 311 Topic topic = (Topic ) context.lookup(TEST_TOPIC); 312 TopicSubscriber subscriber = session.createSubscriber(topic); 313 314 MyMessageListener listener = new MyMessageListener(iterationCount, log); 315 316 queueConnection.start(); 317 sendThread.start(); 318 subscriber.setMessageListener(listener); 319 topicConnection.start(); 320 sendSynchMessage(); 321 getLog().debug("Waiting for all messages"); 322 synchronized (listener) 323 { 324 if (listener.i < iterationCount) 325 listener.wait(); 326 } 327 getLog().debug("Got all messages"); 328 subscriber.setMessageListener(null); 329 330 if (explicit) 331 session.rollback(); 332 session.close(); 333 334 sendThread.join(); 335 topicConnection.stop(); 336 queueConnection.stop(); 337 assertTrue("Topic should be empty", drainTopic() == 0); 338 } 339 340 346 public void runAsynchDurableTopicReceiveRollBack(final int persistence, final boolean explicit) throws Exception 347 { 348 getLog().debug("====> runAsynchDurableTopicReceiveRollBack persistence=" + persistence + " explicit=" + explicit); 349 drainQueue(); 350 drainDurableTopic(); 351 352 final int iterationCount = getIterationCount(); 353 final Logger log = getLog(); 354 355 Thread sendThread = new Thread () 356 { 357 public void run() 358 { 359 try 360 { 361 362 TopicSession session = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); 363 Topic topic = (Topic ) context.lookup(TEST_DURABLE_TOPIC); 364 365 TopicPublisher publisher = session.createPublisher(topic); 366 367 waitForSynchMessage(); 368 369 BytesMessage message = session.createBytesMessage(); 370 message.writeBytes(PAYLOAD); 371 message.setStringProperty("TEST_NAME", "runAsynchDurableTopicReceiveRollback"); 372 message.setIntProperty("TEST_PERSISTENCE", persistence); 373 message.setBooleanProperty("TEST_EXPLICIT", explicit); 374 375 for (int i = 0; i < iterationCount; i++) 376 { 377 publisher.publish(message, persistence, 4, 0); 378 log.debug("Published message " + i); 379 } 380 381 session.close(); 382 } 383 catch (Exception e) 384 { 385 log.error("error", e); 386 } 387 } 388 }; 389 390 TopicSession session = topicDurableConnection.createTopicSession(true, Session.AUTO_ACKNOWLEDGE); 391 Topic topic = (Topic ) context.lookup(TEST_DURABLE_TOPIC); 392 TopicSubscriber subscriber = session.createDurableSubscriber(topic, "test"); 393 try 394 { 395 MyMessageListener listener = new MyMessageListener(iterationCount, log); 396 397 queueConnection.start(); 398 sendThread.start(); 399 subscriber.setMessageListener(listener); 400 topicDurableConnection.start(); 401 sendSynchMessage(); 402 getLog().debug("Waiting for all messages"); 403 synchronized (listener) 404 { 405 if (listener.i < iterationCount) 406 listener.wait(); 407 } 408 getLog().debug("Got all messages"); 409 subscriber.setMessageListener(null); 410 subscriber.close(); 411 412 if (explicit) 413 session.rollback(); 414 session.close(); 415 416 sendThread.join(); 417 topicDurableConnection.stop(); 418 queueConnection.stop(); 419 assertTrue("Topic should be full", drainDurableTopic() == iterationCount); 420 } 421 finally 422 { 423 removeDurableSubscription(); 424 } 425 } 426 427 432 public void testQueueSendRollBack() throws Exception 433 { 434 435 getLog().debug("Starting AsynchQueueSendRollBack test"); 436 437 runQueueSendRollBack(DeliveryMode.NON_PERSISTENT, false); 438 runQueueSendRollBack(DeliveryMode.PERSISTENT, false); 439 runQueueSendRollBack(DeliveryMode.NON_PERSISTENT, true); 440 runQueueSendRollBack(DeliveryMode.PERSISTENT, true); 441 442 getLog().debug("AsynchQueueSendRollBack passed"); 443 } 444 445 450 public void testAsynchQueueReceiveBack() throws Exception 451 { 452 453 getLog().debug("Starting AsynchQueueReceiveRollBack test"); 454 455 runAsynchQueueReceiveRollBack(DeliveryMode.NON_PERSISTENT, false); 456 runAsynchQueueReceiveRollBack(DeliveryMode.PERSISTENT, false); 457 runQueueSendRollBack(DeliveryMode.NON_PERSISTENT, true); 458 runQueueSendRollBack(DeliveryMode.PERSISTENT, true); 459 460 getLog().debug("AsynchQueueReceiveRollBack passed"); 461 } 462 463 468 public void testTopicSendRollBack() throws Exception 469 { 470 471 getLog().debug("Starting AsynchTopicSendRollBack test"); 472 473 runTopicSendRollBack(DeliveryMode.NON_PERSISTENT, false); 474 runTopicSendRollBack(DeliveryMode.PERSISTENT, false); 475 runTopicSendRollBack(DeliveryMode.NON_PERSISTENT, true); 476 runTopicSendRollBack(DeliveryMode.PERSISTENT, true); 477 478 getLog().debug("AsynchTopicSendRollBack passed"); 479 } 480 481 486 public void testAsynchTopicReceiveRollBack() throws Exception 487 { 488 489 getLog().debug("Starting AsynchTopicReceiveRollBack test"); 490 491 runAsynchTopicReceiveRollBack(DeliveryMode.NON_PERSISTENT, false); 492 runAsynchTopicReceiveRollBack(DeliveryMode.PERSISTENT, false); 493 runAsynchTopicReceiveRollBack(DeliveryMode.NON_PERSISTENT, true); 494 runAsynchTopicReceiveRollBack(DeliveryMode.PERSISTENT, true); 495 496 getLog().debug("AsynchTopicReceiveRollBack passed"); 497 } 498 499 504 public void testAsynchDurableTopicReceiveRollBack() throws Exception 505 { 506 507 getLog().debug("Starting AsynchDurableTopicReceiveRollBack test"); 508 509 runAsynchDurableTopicReceiveRollBack(DeliveryMode.NON_PERSISTENT, false); 510 runAsynchDurableTopicReceiveRollBack(DeliveryMode.PERSISTENT, false); 511 runAsynchDurableTopicReceiveRollBack(DeliveryMode.NON_PERSISTENT, true); 512 runAsynchDurableTopicReceiveRollBack(DeliveryMode.PERSISTENT, true); 513 514 getLog().debug("AsynchDurableTopicReceiveRollBack passed"); 515 } 516 517 522 public void removeDurableSubscription() throws Exception 523 { 524 525 TopicSession session = topicDurableConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); 526 session.unsubscribe("test"); 527 } 528 529 534 protected void setUp() throws Exception 535 { 536 super.setUp(); 537 538 getLog().debug("START TEST " + getName()); 539 context = getInitialContext(); 540 541 QueueConnectionFactory queueFactory = (QueueConnectionFactory ) context.lookup(QUEUE_FACTORY); 542 queueConnection = queueFactory.createQueueConnection(); 543 544 TopicConnectionFactory topicFactory = (TopicConnectionFactory ) context.lookup(TOPIC_FACTORY); 545 topicConnection = topicFactory.createTopicConnection(); 546 topicDurableConnection = topicFactory.createTopicConnection("john", "needle"); 547 548 getLog().debug("Connection to JBossMQ established."); 549 } 550 551 protected void tearDown() throws Exception 552 { 553 try 554 { 555 if (topicDurableConnection != null) 556 { 557 topicDurableConnection.close(); 558 topicDurableConnection = null; 559 } 560 } 561 catch (JMSException ignored) 562 { 563 } 564 try 565 { 566 if (topicConnection != null) 567 { 568 topicConnection.close(); 569 topicConnection = null; 570 } 571 } 572 catch (JMSException ignored) 573 { 574 } 575 try 576 { 577 if (queueConnection != null) 578 { 579 queueConnection.close(); 580 queueConnection = null; 581 } 582 } 583 catch (JMSException ignored) 584 { 585 } 586 super.tearDown(); 587 } 588 589 private int drainQueue() throws Exception 591 { 592 getLog().debug("Draining Queue"); 593 queueConnection.start(); 594 595 QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 596 Queue queue = (Queue ) context.lookup(TEST_QUEUE); 597 598 QueueReceiver receiver = session.createReceiver(queue); 599 Message message = receiver.receive(50); 600 int c = 0; 601 while (message != null) 602 { 603 c++; 604 message = receiver.receive(50); 605 } 606 607 getLog().debug(" Drained " + c + " messages from the queue"); 608 609 session.close(); 610 611 queueConnection.stop(); 612 613 return c; 614 } 615 616 private int drainTopic() throws Exception 618 { 619 getLog().debug("Draining Topic"); 620 topicConnection.start(); 621 622 final TopicSession session = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); 623 Topic topic = (Topic ) context.lookup(TEST_TOPIC); 624 TopicSubscriber subscriber = session.createSubscriber(topic); 625 626 Message message = subscriber.receive(50); 627 int c = 0; 628 while (message != null) 629 { 630 c++; 631 message = subscriber.receive(50); 632 } 633 634 getLog().debug(" Drained " + c + " messages from the topic"); 635 636 session.close(); 637 638 topicConnection.stop(); 639 640 return c; 641 } 642 643 private int drainDurableTopic() throws Exception 645 { 646 getLog().debug("Draining Durable Topic"); 647 topicDurableConnection.start(); 648 649 final TopicSession session = topicDurableConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); 650 Topic topic = (Topic ) context.lookup(TEST_DURABLE_TOPIC); 651 TopicSubscriber subscriber = session.createDurableSubscriber(topic, "test"); 652 653 Message message = subscriber.receive(50); 654 int c = 0; 655 while (message != null) 656 { 657 c++; 658 message = subscriber.receive(50); 659 } 660 661 getLog().debug(" Drained " + c + " messages from the durable topic"); 662 663 session.close(); 664 665 topicDurableConnection.stop(); 666 667 return c; 668 } 669 670 private void waitForSynchMessage() throws Exception 671 { 672 getLog().debug("Waiting for Synch Message"); 673 QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 674 Queue queue = (Queue ) context.lookup(TEST_QUEUE); 675 676 QueueReceiver receiver = session.createReceiver(queue); 677 receiver.receive(); 678 session.close(); 679 getLog().debug("Got Synch Message"); 680 } 681 682 private void sendSynchMessage() throws Exception 683 { 684 getLog().debug("Sending Synch Message"); 685 QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 686 Queue queue = (Queue ) context.lookup(TEST_QUEUE); 687 688 QueueSender sender = session.createSender(queue); 689 690 Message message = session.createMessage(); 691 sender.send(message); 692 693 session.close(); 694 getLog().debug("Sent Synch Message"); 695 } 696 697 public class MyMessageListener implements MessageListener 698 { 699 public int i = 0; 700 701 public int iterationCount; 702 703 public Logger log; 704 705 public MyMessageListener(int iterationCount, Logger log) 706 { 707 this.iterationCount = iterationCount; 708 this.log = log; 709 } 710 711 public void onMessage(Message message) 712 { 713 synchronized (this) 714 { 715 i++; 716 log.debug("Got message " + i); 717 if (i >= iterationCount) 718 this.notify(); 719 } 720 } 721 } 722 723 public int getIterationCount() 724 { 725 return 5; 726 } 727 728 public static junit.framework.Test suite() throws Exception 729 { 730 ClassLoader loader = Thread.currentThread().getContextClassLoader(); 731 return getDeploySetup(RollBackUnitTestCase.class, 732 loader.getResource("messaging/test-destinations-service.xml").toString()); 733 } 734 } 735 | Popular Tags |