1 22 package org.jboss.test.jbossmessaging.test; 23 24 import javax.jms.BytesMessage ; 25 import javax.jms.DeliveryMode ; 26 import javax.jms.Message ; 27 import javax.jms.MessageListener ; 28 import javax.jms.Queue ; 29 import javax.jms.QueueConnection ; 30 import javax.jms.QueueConnectionFactory ; 31 import javax.jms.QueueReceiver ; 32 import javax.jms.QueueSender ; 33 import javax.jms.QueueSession ; 34 import javax.jms.Session ; 35 import javax.jms.Topic ; 36 import javax.jms.TopicConnection ; 37 import javax.jms.TopicConnectionFactory ; 38 import javax.jms.TopicPublisher ; 39 import javax.jms.TopicSession ; 40 import javax.jms.TopicSubscriber ; 41 import javax.naming.Context ; 42 43 import junit.framework.Test; 44 import junit.framework.TestSuite; 45 46 import org.jboss.logging.Logger; 47 import org.jboss.test.jbossmessaging.JMSTestCase; 48 49 56 public class UnackedUnitTestCase extends JMSTestCase 57 { 58 static String TOPIC_FACTORY = "ConnectionFactory"; 60 static String QUEUE_FACTORY = "ConnectionFactory"; 61 62 static String TEST_QUEUE = "queue/testQueue"; 63 static String TEST_TOPIC = "topic/testTopic"; 64 static String TEST_DURABLE_TOPIC = "topic/testDurableTopic"; 65 66 static byte[] PAYLOAD = new byte[10]; 67 68 static Context context; 69 static QueueConnection queueConnection; 70 static TopicConnection topicConnection; 71 static TopicConnection topicDurableConnection; 72 73 public static Test suite() throws Exception 74 { 75 TestSuite suite = new TestSuite(); 78 suite.addTest(new UnackedUnitTestCase("testUnackedQueue")); 79 suite.addTest(new UnackedUnitTestCase("testUnackedMultipleSession")); 80 suite.addTest(new UnackedUnitTestCase("testUnackedMultipleConnection")); 81 suite.addTest(new UnackedUnitTestCase("testUnackedTopic")); 82 suite.addTest(new UnackedUnitTestCase("testUnackedDurableTopic")); 83 suite.addTest(new UnackedUnitTestCase("testDummyLast")); 84 85 ClassLoader loader = Thread.currentThread().getContextClassLoader(); 86 String resourceName = getJMSResourceRelativePathname("test-destinations-service.xml") ; 87 String module = loader.getResource(resourceName).toString(); 88 89 return getDeploySetup(suite, module); 90 } 91 92 98 public UnackedUnitTestCase(String name) throws Exception 99 { 100 super(name); 101 } 102 103 109 public void runUnackedQueue(final int persistence) throws Exception 110 { 111 drainQueue(); 112 113 final int iterationCount = getIterationCount(); 114 115 QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 116 Queue queue = (Queue )context.lookup(TEST_QUEUE); 117 118 QueueSender sender = session.createSender(queue); 119 120 Message message = session.createBytesMessage(); 121 ((BytesMessage )message).writeBytes(PAYLOAD); 122 123 for (int i = 0; i < iterationCount; i++) 124 sender.send(message, persistence, 4, 0); 125 126 session.close(); 127 128 session = queueConnection.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE); 129 queue = (Queue )context.lookup(TEST_QUEUE); 130 QueueReceiver receiver = session.createReceiver(queue); 131 queueConnection.start(); 132 message = receiver.receive(50); 133 int c = 0; 134 while (message != null) 135 { 136 message = receiver.receive(50); 137 c++; 138 } 139 assertTrue("Should have received all data unacked", c == iterationCount); 140 141 queueConnection.close(); 142 QueueConnectionFactory queueFactory = (QueueConnectionFactory )context.lookup(QUEUE_FACTORY); 143 queueConnection = queueFactory.createQueueConnection(); 144 145 assertTrue("Queue should be full", drainQueue() == iterationCount); 146 147 } 148 149 155 public void runUnackedMultipleSession(final int persistence) throws Exception 156 { 157 drainQueue(); 158 159 final int iterationCount = getIterationCount(); 160 161 QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 162 Queue queue = (Queue )context.lookup(TEST_QUEUE); 163 164 QueueSender sender = session.createSender(queue); 165 166 Message message = session.createBytesMessage(); 167 ((BytesMessage )message).writeBytes(PAYLOAD); 168 169 for (int i = 0; i < iterationCount; i++) 170 sender.send(message, persistence, 4, 0); 171 172 session.close(); 173 174 QueueSession session1 = queueConnection.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE); 175 queue = (Queue )context.lookup(TEST_QUEUE); 176 QueueReceiver receiver1 = session1.createReceiver(queue); 177 QueueSession session2 = queueConnection.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE); 178 QueueReceiver receiver2 = session2.createReceiver(queue); 179 queueConnection.start(); 180 181 int c = 0; 183 for (int l = 0; l < iterationCount/2; l++) 184 { 185 message = receiver1.receive(50); 186 if (message != null) 187 c++; 188 } 189 assertTrue("Should have received half data unacked", c == iterationCount/2); 190 191 c = 0; 193 Message lastMessage = null; 194 while (message != null) 195 { 196 message = receiver2.receive(50); 197 if (message != null) 198 { 199 c++; 200 lastMessage = message; 201 } 202 } 203 assertTrue("Should have received all data unacked", c == iterationCount - iterationCount/2); 204 205 session1.close(); 207 208 lastMessage.acknowledge(); 210 session2.close(); 211 212 queueConnection.stop(); 213 214 assertTrue("Session1 messages should be available", drainQueue() == iterationCount/2); 215 216 } 217 218 224 public void runUnackedMultipleConnection(final int persistence) throws Exception 225 { 226 drainQueue(); 227 228 final int iterationCount = getIterationCount(); 229 230 QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 231 Queue queue = (Queue )context.lookup(TEST_QUEUE); 232 233 QueueSender sender = session.createSender(queue); 234 235 Message message = session.createBytesMessage(); 236 ((BytesMessage )message).writeBytes(PAYLOAD); 237 238 for (int i = 0; i < iterationCount; i++) 239 sender.send(message, persistence, 4, 0); 240 241 session.close(); 242 243 QueueConnectionFactory queueFactory = (QueueConnectionFactory )context.lookup(QUEUE_FACTORY); 244 QueueConnection queueConnection1 = queueFactory.createQueueConnection(); 245 QueueSession session1 = queueConnection1.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE); 246 queue = (Queue )context.lookup(TEST_QUEUE); 247 QueueReceiver receiver1 = session1.createReceiver(queue); 248 249 QueueConnection queueConnection2 = queueFactory.createQueueConnection(); 250 QueueSession session2 = queueConnection2.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE); 251 QueueReceiver receiver2 = session2.createReceiver(queue); 252 253 queueConnection1.start(); 254 queueConnection2.start(); 255 256 int c = 0; 258 for (int l = 0; l < iterationCount/2; l++) 259 { 260 message = receiver1.receive(50); 261 if (message != null) 262 c++; 263 } 264 assertTrue("Should have received half data unacked", c == iterationCount/2); 265 266 Message lastMessage = null; 268 c = 0; 269 while (message != null) 270 { 271 message = receiver2.receive(50); 272 if (message != null) 273 { 274 c++; 275 lastMessage = message; 276 } 277 } 278 assertTrue("Should have received all data unacked", c == iterationCount - iterationCount/2); 279 280 queueConnection1.close(); 282 283 lastMessage.acknowledge(); 285 queueConnection2.close(); 286 287 assertTrue("Connection1 messages should be available", drainQueue() == iterationCount/2); 288 289 } 290 291 297 public void runUnackedTopic(final int persistence) throws Exception 298 { 299 drainQueue(); 300 drainTopic(); 301 302 final int iterationCount = getIterationCount(); 303 final Logger log = getLog(); 304 305 Thread sendThread = 306 new Thread () 307 { 308 public void run() 309 { 310 try 311 { 312 313 TopicSession session = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); 314 Topic topic = (Topic )context.lookup(TEST_TOPIC); 315 316 TopicPublisher publisher = session.createPublisher(topic); 317 318 waitForSynchMessage(); 319 320 BytesMessage message = session.createBytesMessage(); 321 message.writeBytes(PAYLOAD); 322 323 for (int i = 0; i < iterationCount; i++) 324 { 325 publisher.publish(message, persistence, 4, 0); 326 } 327 328 session.close(); 329 } 330 catch (Exception e) 331 { 332 log.error("error", e); 333 } 334 } 335 }; 336 337 TopicSession session = topicConnection.createTopicSession(false, Session.CLIENT_ACKNOWLEDGE); 338 Topic topic = (Topic )context.lookup(TEST_TOPIC); 339 TopicSubscriber subscriber = session.createSubscriber(topic); 340 341 342 MyMessageListener listener = new MyMessageListener(iterationCount, log); 343 344 queueConnection.start(); 345 sendThread.start(); 346 subscriber.setMessageListener(listener); 347 topicConnection.start(); 348 sendSynchMessage(); 349 synchronized (listener) 350 { 351 if (listener.i < iterationCount) 352 listener.wait(); 353 } 354 sendThread.join(); 355 topicConnection.close(); 356 TopicConnectionFactory topicFactory = (TopicConnectionFactory )context.lookup(TOPIC_FACTORY); 357 topicConnection = topicFactory.createTopicConnection(); 358 queueConnection.stop(); 359 assertTrue("Topic should be empty", drainTopic() == 0); 360 } 361 362 368 public void runUnackedDurableTopic(final int persistence) throws Exception 369 { 370 drainQueue(); 371 drainDurableTopic(); 372 373 final int iterationCount = getIterationCount(); 374 final Logger log = getLog(); 375 376 Thread sendThread = 377 new Thread () 378 { 379 public void run() 380 { 381 try 382 { 383 384 TopicSession session = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); 385 Topic topic = (Topic )context.lookup(TEST_DURABLE_TOPIC); 386 387 TopicPublisher publisher = session.createPublisher(topic); 388 389 waitForSynchMessage(); 390 391 BytesMessage message = session.createBytesMessage(); 392 message.writeBytes(PAYLOAD); 393 394 for (int i = 0; i < iterationCount; i++) 395 { 396 publisher.publish(message, persistence, 4, 0); 397 } 398 399 session.close(); 400 } 401 catch (Exception e) 402 { 403 log.error("error", e); 404 } 405 } 406 }; 407 408 TopicSession session = topicDurableConnection.createTopicSession(false, Session.CLIENT_ACKNOWLEDGE); 409 Topic topic = (Topic )context.lookup(TEST_DURABLE_TOPIC); 410 TopicSubscriber subscriber = session.createDurableSubscriber(topic, "test"); 411 412 MyMessageListener listener = new MyMessageListener(iterationCount, log); 413 414 queueConnection.start(); 415 sendThread.start(); 416 subscriber.setMessageListener(listener); 417 topicDurableConnection.start(); 418 sendSynchMessage(); 419 synchronized (listener) 420 { 421 if (listener.i < iterationCount) 422 listener.wait(); 423 } 424 425 sendThread.join(); 426 topicDurableConnection.close(); 427 TopicConnectionFactory topicFactory = (TopicConnectionFactory )context.lookup(TOPIC_FACTORY); 428 topicDurableConnection = topicFactory.createTopicConnection("john", "needle"); 429 queueConnection.stop(); 430 assertTrue("Topic should be full", drainDurableTopic() == iterationCount); 431 } 432 433 438 public void testUnackedQueue() throws Exception 439 { 440 441 getLog().debug("Starting UnackedQueue test"); 442 443 runUnackedQueue(DeliveryMode.NON_PERSISTENT); 444 runUnackedQueue(DeliveryMode.PERSISTENT); 445 446 getLog().debug("UnackedQueue passed"); 447 } 448 449 454 public void testUnackedMultipleSession() throws Exception 455 { 456 457 getLog().debug("Starting UnackedMultipleSession test"); 458 459 runUnackedMultipleSession(DeliveryMode.NON_PERSISTENT); 460 runUnackedMultipleSession(DeliveryMode.PERSISTENT); 461 462 getLog().debug("UnackedMultipleSession passed"); 463 } 464 465 470 public void testUnackedMultipleConnection() throws Exception 471 { 472 473 getLog().debug("Starting UnackedMultipleConnection test"); 474 475 runUnackedMultipleConnection(DeliveryMode.NON_PERSISTENT); 476 runUnackedMultipleConnection(DeliveryMode.PERSISTENT); 477 478 getLog().debug("UnackedMultipleConnection passed"); 479 } 480 481 486 public void testUnackedTopic() throws Exception 487 { 488 489 getLog().debug("Starting UnackedTopic test"); 490 491 runUnackedTopic(DeliveryMode.NON_PERSISTENT); 492 runUnackedTopic(DeliveryMode.PERSISTENT); 493 494 getLog().debug("UnackedTopic passed"); 495 } 496 497 502 public void testUnackedDurableTopic() throws Exception 503 { 504 505 getLog().debug("Starting UnackedDurableTopic test"); 506 507 runUnackedDurableTopic(DeliveryMode.NON_PERSISTENT); 508 runUnackedDurableTopic(DeliveryMode.PERSISTENT); 509 510 getLog().debug("UnackedDurableTopic passed"); 511 } 512 513 518 public void testDummyLast() throws Exception 519 { 520 521 TopicSession session = topicDurableConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); 522 session.unsubscribe("test"); 523 524 queueConnection.close(); 525 topicConnection.close(); 526 topicDurableConnection.close(); 527 } 528 529 534 protected void setUp() throws Exception 535 { 536 super.setUp() ; 538 539 if (context == null) 540 { 541 context = getInitialContext(); 542 543 QueueConnectionFactory queueFactory = (QueueConnectionFactory )context.lookup(QUEUE_FACTORY); 544 queueConnection = queueFactory.createQueueConnection(); 545 546 TopicConnectionFactory topicFactory = (TopicConnectionFactory )context.lookup(TOPIC_FACTORY); 547 topicConnection = topicFactory.createTopicConnection(); 548 topicDurableConnection = topicFactory.createTopicConnection("john", "needle"); 549 550 getLog().debug("Connection to JBossMQ established."); 551 } 552 } 553 554 private int drainQueue() throws Exception 556 { 557 getLog().debug("Draining Queue"); 558 queueConnection.start(); 559 560 QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 561 Queue queue = (Queue )context.lookup(TEST_QUEUE); 562 563 QueueReceiver receiver = session.createReceiver(queue); 564 Message message = receiver.receive(1000); 565 int c = 0; 566 while (message != null) 567 { 568 message = receiver.receive(1000); 569 c++; 570 } 571 572 getLog().debug(" Drained " + c + " messages from the queue"); 573 574 session.close(); 575 576 queueConnection.stop(); 577 578 return c; 579 } 580 581 private int drainTopic() throws Exception 583 { 584 getLog().debug("Draining Topic"); 585 topicConnection.start(); 586 587 final TopicSession session = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); 588 Topic topic = (Topic )context.lookup(TEST_TOPIC); 589 TopicSubscriber subscriber = session.createSubscriber(topic); 590 591 Message message = subscriber.receive(1000); 592 int c = 0; 593 while (message != null) 594 { 595 message = subscriber.receive(1000); 596 c++; 597 } 598 599 getLog().debug(" Drained " + c + " messages from the topic"); 600 601 session.close(); 602 603 topicConnection.stop(); 604 605 return c; 606 } 607 608 private int drainDurableTopic() throws Exception 610 { 611 getLog().debug("Draining Durable Topic"); 612 topicDurableConnection.start(); 613 614 final TopicSession session = topicDurableConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); 615 Topic topic = (Topic )context.lookup(TEST_DURABLE_TOPIC); 616 TopicSubscriber subscriber = session.createDurableSubscriber(topic, "test"); 617 618 Message message = subscriber.receive(1000); 619 int c = 0; 620 while (message != null) 621 { 622 message = subscriber.receive(1000); 623 c++; 624 } 625 626 getLog().debug(" Drained " + c + " messages from the durable topic"); 627 628 session.close(); 629 630 topicDurableConnection.stop(); 631 632 return c; 633 } 634 635 private void waitForSynchMessage() throws Exception 636 { 637 getLog().debug("Waiting for Synch Message"); 638 QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 639 Queue queue = (Queue )context.lookup(TEST_QUEUE); 640 641 QueueReceiver receiver = session.createReceiver(queue); 642 receiver.receive(); 643 session.close(); 644 getLog().debug("Got Synch Message"); 645 } 646 647 private void sendSynchMessage() throws Exception 648 { 649 getLog().debug("Sending Synch Message"); 650 QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 651 Queue queue = (Queue )context.lookup(TEST_QUEUE); 652 653 QueueSender sender = session.createSender(queue); 654 655 Message message = session.createMessage(); 656 sender.send(message); 657 658 session.close(); 659 getLog().debug("Sent Synch Message"); 660 } 661 662 public class MyMessageListener 663 implements MessageListener 664 { 665 public int i = 0; 666 667 public int iterationCount; 668 669 public Logger log; 670 671 public MyMessageListener(int iterationCount, Logger log) 672 { 673 this.iterationCount = iterationCount; 674 this.log = log; 675 } 676 677 public void onMessage(Message message) 678 { 679 synchronized (this) 680 { 681 i++; 682 log.debug("Got message " + i); 683 if (i >= iterationCount) 684 this.notify(); 685 } 686 } 687 } 688 689 public int getIterationCount() 690 { 691 return 5; 692 } 693 } 694 | Popular Tags |