1 22 package org.jboss.test.jbossmq.test; 23 24 import javax.jms.BytesMessage ; 25 import javax.jms.DeliveryMode ; 26 import javax.jms.Message ; 27 import javax.jms.MessageListener ; 28 import javax.jms.Queue ; 29 import javax.jms.QueueConnection ; 30 import javax.jms.QueueConnectionFactory ; 31 import javax.jms.QueueReceiver ; 32 import javax.jms.QueueSender ; 33 import javax.jms.QueueSession ; 34 import javax.jms.Session ; 35 import javax.jms.Topic ; 36 import javax.jms.TopicConnection ; 37 import javax.jms.TopicConnectionFactory ; 38 import javax.jms.TopicPublisher ; 39 import javax.jms.TopicSession ; 40 import javax.jms.TopicSubscriber ; 41 import javax.naming.Context ; 42 43 import junit.framework.Test; 44 import junit.framework.TestSuite; 45 46 import org.jboss.logging.Logger; 47 import org.jboss.test.JBossTestCase; 48 49 55 public class UnackedUnitTestCase extends JBossTestCase 56 { 57 static String TOPIC_FACTORY = "ConnectionFactory"; 59 static String QUEUE_FACTORY = "ConnectionFactory"; 60 61 static String TEST_QUEUE = "queue/testQueue"; 62 static String TEST_TOPIC = "topic/testTopic"; 63 static String TEST_DURABLE_TOPIC = "topic/testDurableTopic"; 64 65 static byte[] PAYLOAD = new byte[10]; 66 67 static Context context; 68 static QueueConnection queueConnection; 69 static TopicConnection topicConnection; 70 static TopicConnection topicDurableConnection; 71 72 public static Test suite() throws Exception 73 { 74 TestSuite suite = new TestSuite(); 77 suite.addTest(new UnackedUnitTestCase("testUnackedQueue")); 78 suite.addTest(new UnackedUnitTestCase("testUnackedMultipleSession")); 79 suite.addTest(new UnackedUnitTestCase("testUnackedMultipleConnection")); 80 suite.addTest(new UnackedUnitTestCase("testUnackedTopic")); 81 suite.addTest(new UnackedUnitTestCase("testUnackedDurableTopic")); 82 suite.addTest(new UnackedUnitTestCase("testDummyLast")); 83 84 ClassLoader loader = Thread.currentThread().getContextClassLoader(); 85 String module = loader.getResource("messaging/test-destinations-service.xml").toString(); 86 87 return getDeploySetup(suite, module); 88 } 89 90 96 public UnackedUnitTestCase(String name) throws Exception 97 { 98 super(name); 99 } 100 101 107 public void runUnackedQueue(final int persistence) throws Exception 108 { 109 drainQueue(); 110 111 final int iterationCount = getIterationCount(); 112 113 QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 114 Queue queue = (Queue )context.lookup(TEST_QUEUE); 115 116 QueueSender sender = session.createSender(queue); 117 118 Message message = session.createBytesMessage(); 119 ((BytesMessage )message).writeBytes(PAYLOAD); 120 121 for (int i = 0; i < iterationCount; i++) 122 sender.send(message, persistence, 4, 0); 123 124 session.close(); 125 126 session = queueConnection.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE); 127 queue = (Queue )context.lookup(TEST_QUEUE); 128 QueueReceiver receiver = session.createReceiver(queue); 129 queueConnection.start(); 130 message = receiver.receive(50); 131 int c = 0; 132 while (message != null) 133 { 134 message = receiver.receive(50); 135 c++; 136 } 137 assertTrue("Should have received all data unacked", c == iterationCount); 138 139 queueConnection.close(); 140 QueueConnectionFactory queueFactory = (QueueConnectionFactory )context.lookup(QUEUE_FACTORY); 141 queueConnection = queueFactory.createQueueConnection(); 142 143 assertTrue("Queue should be full", drainQueue() == iterationCount); 144 145 } 146 147 153 public void runUnackedMultipleSession(final int persistence) throws Exception 154 { 155 drainQueue(); 156 157 final int iterationCount = getIterationCount(); 158 159 QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 160 Queue queue = (Queue )context.lookup(TEST_QUEUE); 161 162 QueueSender sender = session.createSender(queue); 163 164 Message message = session.createBytesMessage(); 165 ((BytesMessage )message).writeBytes(PAYLOAD); 166 167 for (int i = 0; i < iterationCount; i++) 168 sender.send(message, persistence, 4, 0); 169 170 session.close(); 171 172 QueueSession session1 = queueConnection.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE); 173 queue = (Queue )context.lookup(TEST_QUEUE); 174 QueueReceiver receiver1 = session1.createReceiver(queue); 175 QueueSession session2 = queueConnection.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE); 176 QueueReceiver receiver2 = session2.createReceiver(queue); 177 queueConnection.start(); 178 179 int c = 0; 181 for (int l = 0; l < iterationCount/2; l++) 182 { 183 message = receiver1.receive(50); 184 if (message != null) 185 c++; 186 } 187 assertTrue("Should have received half data unacked", c == iterationCount/2); 188 189 c = 0; 191 Message lastMessage = null; 192 while (message != null) 193 { 194 message = receiver2.receive(50); 195 if (message != null) 196 { 197 c++; 198 lastMessage = message; 199 } 200 } 201 assertTrue("Should have received all data unacked", c == iterationCount - iterationCount/2); 202 203 session1.close(); 205 206 lastMessage.acknowledge(); 208 session2.close(); 209 210 queueConnection.stop(); 211 212 assertTrue("Session1 messages should be available", drainQueue() == iterationCount/2); 213 214 } 215 216 222 public void runUnackedMultipleConnection(final int persistence) throws Exception 223 { 224 drainQueue(); 225 226 final int iterationCount = getIterationCount(); 227 228 QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 229 Queue queue = (Queue )context.lookup(TEST_QUEUE); 230 231 QueueSender sender = session.createSender(queue); 232 233 Message message = session.createBytesMessage(); 234 ((BytesMessage )message).writeBytes(PAYLOAD); 235 236 for (int i = 0; i < iterationCount; i++) 237 sender.send(message, persistence, 4, 0); 238 239 session.close(); 240 241 QueueConnectionFactory queueFactory = (QueueConnectionFactory )context.lookup(QUEUE_FACTORY); 242 QueueConnection queueConnection1 = queueFactory.createQueueConnection(); 243 QueueSession session1 = queueConnection1.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE); 244 queue = (Queue )context.lookup(TEST_QUEUE); 245 QueueReceiver receiver1 = session1.createReceiver(queue); 246 247 QueueConnection queueConnection2 = queueFactory.createQueueConnection(); 248 QueueSession session2 = queueConnection2.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE); 249 QueueReceiver receiver2 = session2.createReceiver(queue); 250 251 queueConnection1.start(); 252 queueConnection2.start(); 253 254 int c = 0; 256 for (int l = 0; l < iterationCount/2; l++) 257 { 258 message = receiver1.receive(50); 259 if (message != null) 260 c++; 261 } 262 assertTrue("Should have received half data unacked", c == iterationCount/2); 263 264 Message lastMessage = null; 266 c = 0; 267 while (message != null) 268 { 269 message = receiver2.receive(50); 270 if (message != null) 271 { 272 c++; 273 lastMessage = message; 274 } 275 } 276 assertTrue("Should have received all data unacked", c == iterationCount - iterationCount/2); 277 278 queueConnection1.close(); 280 281 lastMessage.acknowledge(); 283 queueConnection2.close(); 284 285 assertTrue("Connection1 messages should be available", drainQueue() == iterationCount/2); 286 287 } 288 289 295 public void runUnackedTopic(final int persistence) throws Exception 296 { 297 drainQueue(); 298 drainTopic(); 299 300 final int iterationCount = getIterationCount(); 301 final Logger log = getLog(); 302 303 Thread sendThread = 304 new Thread () 305 { 306 public void run() 307 { 308 try 309 { 310 311 TopicSession session = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); 312 Topic topic = (Topic )context.lookup(TEST_TOPIC); 313 314 TopicPublisher publisher = session.createPublisher(topic); 315 316 waitForSynchMessage(); 317 318 BytesMessage message = session.createBytesMessage(); 319 message.writeBytes(PAYLOAD); 320 321 for (int i = 0; i < iterationCount; i++) 322 { 323 publisher.publish(message, persistence, 4, 0); 324 } 325 326 session.close(); 327 } 328 catch (Exception e) 329 { 330 log.error("error", e); 331 } 332 } 333 }; 334 335 TopicSession session = topicConnection.createTopicSession(false, Session.CLIENT_ACKNOWLEDGE); 336 Topic topic = (Topic )context.lookup(TEST_TOPIC); 337 TopicSubscriber subscriber = session.createSubscriber(topic); 338 339 340 MyMessageListener listener = new MyMessageListener(iterationCount, log); 341 342 queueConnection.start(); 343 sendThread.start(); 344 subscriber.setMessageListener(listener); 345 topicConnection.start(); 346 sendSynchMessage(); 347 synchronized (listener) 348 { 349 if (listener.i < iterationCount) 350 listener.wait(); 351 } 352 sendThread.join(); 353 topicConnection.close(); 354 TopicConnectionFactory topicFactory = (TopicConnectionFactory )context.lookup(TOPIC_FACTORY); 355 topicConnection = topicFactory.createTopicConnection(); 356 queueConnection.stop(); 357 assertTrue("Topic should be empty", drainTopic() == 0); 358 } 359 360 366 public void runUnackedDurableTopic(final int persistence) throws Exception 367 { 368 drainQueue(); 369 drainDurableTopic(); 370 371 final int iterationCount = getIterationCount(); 372 final Logger log = getLog(); 373 374 Thread sendThread = 375 new Thread () 376 { 377 public void run() 378 { 379 try 380 { 381 382 TopicSession session = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); 383 Topic topic = (Topic )context.lookup(TEST_DURABLE_TOPIC); 384 385 TopicPublisher publisher = session.createPublisher(topic); 386 387 waitForSynchMessage(); 388 389 BytesMessage message = session.createBytesMessage(); 390 message.writeBytes(PAYLOAD); 391 392 for (int i = 0; i < iterationCount; i++) 393 { 394 publisher.publish(message, persistence, 4, 0); 395 } 396 397 session.close(); 398 } 399 catch (Exception e) 400 { 401 log.error("error", e); 402 } 403 } 404 }; 405 406 TopicSession session = topicDurableConnection.createTopicSession(false, Session.CLIENT_ACKNOWLEDGE); 407 Topic topic = (Topic )context.lookup(TEST_DURABLE_TOPIC); 408 TopicSubscriber subscriber = session.createDurableSubscriber(topic, "test"); 409 410 MyMessageListener listener = new MyMessageListener(iterationCount, log); 411 412 queueConnection.start(); 413 sendThread.start(); 414 subscriber.setMessageListener(listener); 415 topicDurableConnection.start(); 416 sendSynchMessage(); 417 synchronized (listener) 418 { 419 if (listener.i < iterationCount) 420 listener.wait(); 421 } 422 423 sendThread.join(); 424 topicDurableConnection.close(); 425 TopicConnectionFactory topicFactory = (TopicConnectionFactory )context.lookup(TOPIC_FACTORY); 426 topicDurableConnection = topicFactory.createTopicConnection("john", "needle"); 427 queueConnection.stop(); 428 assertTrue("Topic should be full", drainDurableTopic() == iterationCount); 429 } 430 431 436 public void testUnackedQueue() throws Exception 437 { 438 439 getLog().debug("Starting UnackedQueue test"); 440 441 runUnackedQueue(DeliveryMode.NON_PERSISTENT); 442 runUnackedQueue(DeliveryMode.PERSISTENT); 443 444 getLog().debug("UnackedQueue passed"); 445 } 446 447 452 public void testUnackedMultipleSession() throws Exception 453 { 454 455 getLog().debug("Starting UnackedMultipleSession test"); 456 457 runUnackedMultipleSession(DeliveryMode.NON_PERSISTENT); 458 runUnackedMultipleSession(DeliveryMode.PERSISTENT); 459 460 getLog().debug("UnackedMultipleSession passed"); 461 } 462 463 468 public void testUnackedMultipleConnection() throws Exception 469 { 470 471 getLog().debug("Starting UnackedMultipleConnection test"); 472 473 runUnackedMultipleConnection(DeliveryMode.NON_PERSISTENT); 474 runUnackedMultipleConnection(DeliveryMode.PERSISTENT); 475 476 getLog().debug("UnackedMultipleConnection passed"); 477 } 478 479 484 public void testUnackedTopic() throws Exception 485 { 486 487 getLog().debug("Starting UnackedTopic test"); 488 489 runUnackedTopic(DeliveryMode.NON_PERSISTENT); 490 runUnackedTopic(DeliveryMode.PERSISTENT); 491 492 getLog().debug("UnackedTopic passed"); 493 } 494 495 500 public void testUnackedDurableTopic() throws Exception 501 { 502 503 getLog().debug("Starting UnackedDurableTopic test"); 504 505 runUnackedDurableTopic(DeliveryMode.NON_PERSISTENT); 506 runUnackedDurableTopic(DeliveryMode.PERSISTENT); 507 508 getLog().debug("UnackedDurableTopic passed"); 509 } 510 511 516 public void testDummyLast() throws Exception 517 { 518 519 TopicSession session = topicDurableConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); 520 session.unsubscribe("test"); 521 522 queueConnection.close(); 523 topicConnection.close(); 524 topicDurableConnection.close(); 525 } 526 527 532 protected void setUp() throws Exception 533 { 534 if (context == null) 535 { 536 context = getInitialContext(); 537 538 QueueConnectionFactory queueFactory = (QueueConnectionFactory )context.lookup(QUEUE_FACTORY); 539 queueConnection = queueFactory.createQueueConnection(); 540 541 TopicConnectionFactory topicFactory = (TopicConnectionFactory )context.lookup(TOPIC_FACTORY); 542 topicConnection = topicFactory.createTopicConnection(); 543 topicDurableConnection = topicFactory.createTopicConnection("john", "needle"); 544 545 getLog().debug("Connection to JBossMQ established."); 546 } 547 } 548 549 private int drainQueue() throws Exception 551 { 552 getLog().debug("Draining Queue"); 553 queueConnection.start(); 554 555 QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 556 Queue queue = (Queue )context.lookup(TEST_QUEUE); 557 558 QueueReceiver receiver = session.createReceiver(queue); 559 Message message = receiver.receive(1000); 560 int c = 0; 561 while (message != null) 562 { 563 message = receiver.receive(1000); 564 c++; 565 } 566 567 getLog().debug(" Drained " + c + " messages from the queue"); 568 569 session.close(); 570 571 queueConnection.stop(); 572 573 return c; 574 } 575 576 private int drainTopic() throws Exception 578 { 579 getLog().debug("Draining Topic"); 580 topicConnection.start(); 581 582 final TopicSession session = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); 583 Topic topic = (Topic )context.lookup(TEST_TOPIC); 584 TopicSubscriber subscriber = session.createSubscriber(topic); 585 586 Message message = subscriber.receive(1000); 587 int c = 0; 588 while (message != null) 589 { 590 message = subscriber.receive(1000); 591 c++; 592 } 593 594 getLog().debug(" Drained " + c + " messages from the topic"); 595 596 session.close(); 597 598 topicConnection.stop(); 599 600 return c; 601 } 602 603 private int drainDurableTopic() throws Exception 605 { 606 getLog().debug("Draining Durable Topic"); 607 topicDurableConnection.start(); 608 609 final TopicSession session = topicDurableConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); 610 Topic topic = (Topic )context.lookup(TEST_DURABLE_TOPIC); 611 TopicSubscriber subscriber = session.createDurableSubscriber(topic, "test"); 612 613 Message message = subscriber.receive(1000); 614 int c = 0; 615 while (message != null) 616 { 617 message = subscriber.receive(1000); 618 c++; 619 } 620 621 getLog().debug(" Drained " + c + " messages from the durable topic"); 622 623 session.close(); 624 625 topicDurableConnection.stop(); 626 627 return c; 628 } 629 630 private void waitForSynchMessage() throws Exception 631 { 632 getLog().debug("Waiting for Synch Message"); 633 QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 634 Queue queue = (Queue )context.lookup(TEST_QUEUE); 635 636 QueueReceiver receiver = session.createReceiver(queue); 637 receiver.receive(); 638 session.close(); 639 getLog().debug("Got Synch Message"); 640 } 641 642 private void sendSynchMessage() throws Exception 643 { 644 getLog().debug("Sending Synch Message"); 645 QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 646 Queue queue = (Queue )context.lookup(TEST_QUEUE); 647 648 QueueSender sender = session.createSender(queue); 649 650 Message message = session.createMessage(); 651 sender.send(message); 652 653 session.close(); 654 getLog().debug("Sent Synch Message"); 655 } 656 657 public class MyMessageListener 658 implements MessageListener 659 { 660 public int i = 0; 661 662 public int iterationCount; 663 664 public Logger log; 665 666 public MyMessageListener(int iterationCount, Logger log) 667 { 668 this.iterationCount = iterationCount; 669 this.log = log; 670 } 671 672 public void onMessage(Message message) 673 { 674 synchronized (this) 675 { 676 i++; 677 log.debug("Got message " + i); 678 if (i >= iterationCount) 679 this.notify(); 680 } 681 } 682 } 683 684 public int getIterationCount() 685 { 686 return 5; 687 } 688 } 689 | Popular Tags |