| 1 22 package org.jboss.test.jbossmq.test; 23 24 import java.util.Enumeration ; 25 26 import javax.jms.DeliveryMode ; 27 import javax.jms.InvalidDestinationException ; 28 import javax.jms.JMSException ; 29 import javax.jms.Message ; 30 import javax.jms.MessageListener ; 31 import javax.jms.Queue ; 32 import javax.jms.QueueBrowser ; 33 import javax.jms.QueueConnection ; 34 import javax.jms.QueueConnectionFactory ; 35 import javax.jms.QueueReceiver ; 36 import javax.jms.QueueRequestor ; 37 import javax.jms.QueueSender ; 38 import javax.jms.QueueSession ; 39 import javax.jms.ServerSession ; 40 import javax.jms.ServerSessionPool ; 41 import javax.jms.Session ; 42 import javax.jms.TemporaryQueue ; 43 import javax.jms.TemporaryTopic ; 44 import javax.jms.TextMessage ; 45 import javax.jms.Topic ; 46 import javax.jms.TopicConnection ; 47 import javax.jms.TopicConnectionFactory ; 48 import javax.jms.TopicPublisher ; 49 import javax.jms.TopicSession ; 50 import javax.jms.TopicSubscriber ; 51 import javax.naming.Context ; 52 import javax.naming.InitialContext ; 53 54 import org.jboss.logging.Logger; 55 import org.jboss.test.JBossTestCase; 56 57 import EDU.oswego.cs.dl.util.concurrent.CountDown; 58 59 65 public class JBossMQUnitTest extends JBossTestCase 66 { 67 68 static String TOPIC_FACTORY = "ConnectionFactory"; 69 70 static String QUEUE_FACTORY = "ConnectionFactory"; 71 72 static String TEST_QUEUE = "queue/testQueue"; 73 static String TEST_TOPIC = "topic/testTopic"; 74 static String TEST_DURABLE_TOPIC = "topic/testDurableTopic"; 75 76 static Context context; 78 static QueueConnection queueConnection; 79 static TopicConnection topicConnection; 80 81 public JBossMQUnitTest(String name) throws Exception  82 { 83 super(name); 84 } 85 86 protected void drainQueue() throws Exception  88 { 89 QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 90 Queue queue = (Queue ) context.lookup(TEST_QUEUE); 91 92 QueueReceiver receiver = session.createReceiver(queue); 93 Message message = receiver.receive(50); 94 int c = 0; 95 while (message != null) 96 { 97 message = receiver.receive(50); 98 c++; 99 } 100 101 if (c != 0) 102 getLog().debug(" Drained " + c + " messages from the queue"); 103 104 session.close(); 105 } 106 107 protected void connect() throws Exception  108 { 109 110 if (context == null) 111 { 112 113 context = new InitialContext (); 114 115 } 116 QueueConnectionFactory queueFactory = (QueueConnectionFactory ) context.lookup(QUEUE_FACTORY); 117 queueConnection = queueFactory.createQueueConnection(); 118 119 TopicConnectionFactory topicFactory = (TopicConnectionFactory ) context.lookup(TOPIC_FACTORY); 120 topicConnection = topicFactory.createTopicConnection(); 121 122 getLog().debug("Connection to spyderMQ established."); 123 124 } 125 126 protected void disconnect() throws Exception  127 { 128 queueConnection.close(); 129 topicConnection.close(); 130 } 131 132 141 public void testQueueMessageOrder() throws Exception  142 { 143 144 getLog().debug("Starting QueueMessageOrder test"); 145 146 connect(); 147 148 queueConnection.start(); 149 150 drainQueue(); 151 152 QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 153 Queue queue = (Queue ) context.lookup(TEST_QUEUE); 154 QueueSender sender = session.createSender(queue); 155 156 TextMessage message = session.createTextMessage(); 157 message.setText("Normal message"); 158 sender.send(message, DeliveryMode.NON_PERSISTENT, 4, 0); 159 message.setText("Persistent message"); 161 sender.send(message, DeliveryMode.PERSISTENT, 4, 0); 162 message.setText("High Priority Persistent message"); 164 sender.send(message, DeliveryMode.PERSISTENT, 10, 0); 165 167 170 QueueBrowser browser = session.createBrowser(queue); 171 Enumeration i = browser.getEnumeration(); 172 getLog().debug(message.getText()); 176 177 message = (TextMessage ) i.nextElement(); 178 getLog().debug(message.getText()); 181 182 message = (TextMessage ) i.nextElement(); 183 getLog().debug(message.getText()); 186 187 190 disconnect(); 191 getLog().debug("QueueMessageOrder passed"); 192 } 193 194 199 public void testRequestReplyQueue() throws Exception  200 { 201 202 getLog().debug("Starting RequestReplyQueue test"); 203 connect(); 204 205 { 206 queueConnection.start(); 207 drainQueue(); 208 } 209 210 Thread serverThread = new Thread () 211 { 212 public void run() 213 { 214 Logger log = Logger.getLogger(getClass().getName()); 215 try 216 { 217 log.debug("Server Thread Started"); 218 QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 219 Queue queue = (Queue ) context.lookup(TEST_QUEUE); 220 221 QueueReceiver queueReceiver = session.createReceiver(queue); 222 223 boolean done = false; 224 while (!done) 225 { 226 TextMessage message = (TextMessage ) queueReceiver.receive(); 227 Queue tempQueue = (Queue ) message.getJMSReplyTo(); 228 229 QueueSender replySender = session.createSender(tempQueue); 230 TextMessage reply = session.createTextMessage(); 231 reply.setText("Request Processed"); 232 reply.setJMSCorrelationID(message.getJMSMessageID()); 233 replySender.send(reply); 234 235 if (message.getText().equals("Quit")) 236 done = true; 237 } 238 239 session.close(); 240 log.debug("Server Thread Finished"); 241 242 } 243 catch (Exception e) 244 { 245 log.error("Error", e); 246 } 247 } 248 }; 249 250 serverThread.start(); 251 252 QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 253 Queue queue = (Queue ) context.lookup(TEST_QUEUE); 254 255 QueueRequestor queueRequestor = new QueueRequestor (session, queue); 256 TextMessage message = session.createTextMessage(); 257 message.setText("Request Test"); 258 259 for (int i = 0; i < 5; i++) 260 { 261 262 getLog().debug("Making client request #" + i); 263 TextMessage reply = (TextMessage ) queueRequestor.request(message); 264 String replyID = new String (reply.getJMSCorrelationID()); 265 if (!replyID.equals(message.getJMSMessageID())) 266 throw new Exception ("REQUEST: ERROR: Reply does not match sent message"); 267 268 } 269 270 getLog().debug("Making client request to shut server down."); 271 message.setText("Quit"); 272 queueRequestor.request(message); 273 274 serverThread.join(); 275 disconnect(); 276 277 getLog().debug("RequestReplyQueue passed"); 278 } 279 280 283 public void testTemporaryQueueDelete() throws Exception  284 { 285 286 getLog().debug("Starting TemporaryQueueDelete test"); 287 connect(); 288 289 QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 290 TemporaryQueue queue = session.createTemporaryQueue(); 291 292 queue.delete(); 293 294 disconnect(); 295 296 getLog().debug("TemporaryQueueDelete passed"); 297 } 298 299 302 public void testTemporaryTopicDelete() throws Exception  303 { 304 305 getLog().debug("Starting TemporaryTopicDelete test"); 306 connect(); 307 308 TopicSession session = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); 309 TemporaryTopic topic = session.createTemporaryTopic(); 310 311 topic.delete(); 312 313 disconnect(); 314 315 getLog().debug("TemporaryTopicDelete passed"); 316 } 317 318 321 public void testInvalidDestinationQueueSend() throws Exception  322 { 323 324 getLog().debug("Starting InvaidDestinationQueueSend test"); 325 connect(); 326 327 QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 328 TemporaryQueue queue = session.createTemporaryQueue(); 329 QueueSender sender = session.createSender(queue); 330 queue.delete(); 331 332 TextMessage message = session.createTextMessage("hello"); 333 boolean caught = false; 334 try 335 { 336 sender.send(message); 337 } 338 catch (InvalidDestinationException expected) 339 { 340 caught = true; 341 } 342 343 disconnect(); 344 345 assertTrue("Expected an InvalidDestinationException", caught); 346 347 getLog().debug("InvaldDestinationQueueSend passed"); 348 } 349 350 353 public void testInvalidDestinationQueueBrowse() throws Exception  354 { 355 356 getLog().debug("Starting InvalidDestinationQueueBrowse test"); 357 connect(); 358 359 QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 360 TemporaryQueue queue = session.createTemporaryQueue(); 361 QueueBrowser browser = session.createBrowser(queue); 362 queue.delete(); 363 364 boolean caught = false; 365 try 366 { 367 browser.getEnumeration(); 368 } 369 catch (InvalidDestinationException expected) 370 { 371 caught = true; 372 } 373 374 disconnect(); 375 376 assertTrue("Expected an InvalidDestinationException", caught); 377 378 getLog().debug("InvalidDestinationQueueBrowse passed"); 379 } 380 381 384 public void testInvalidDestinationTopicPublish() throws Exception  385 { 386 387 getLog().debug("Starting InvaidDestinationTopicPublish test"); 388 connect(); 389 390 TopicSession session = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); 391 TemporaryTopic topic = session.createTemporaryTopic(); 392 TopicPublisher publisher = session.createPublisher(topic); 393 topic.delete(); 394 395 TextMessage message = session.createTextMessage("hello"); 396 boolean caught = false; 397 try 398 { 399 publisher.publish(message); 400 } 401 catch (InvalidDestinationException expected) 402 { 403 caught = true; 404 } 405 406 disconnect(); 407 408 assertTrue("Expected an InvalidDestinationException", caught); 409 410 getLog().debug("InvaldDestinationTopicPublish passed"); 411 } 412 413 416 public void testErrorsTopicSubscribe() throws Exception  417 { 418 419 getLog().debug("Starting InvalidDestinationTopicSubscribe test"); 420 connect(); 421 422 try 423 { 424 TopicSession session = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); 425 Topic topic = (Topic ) context.lookup(TEST_TOPIC); 426 TemporaryTopic temp = session.createTemporaryTopic(); 427 428 boolean caught = false; 429 try 430 { 431 session.createSubscriber(null); 432 } 433 catch (InvalidDestinationException expected) 434 { 435 caught = true; 436 } 437 assertTrue("Expected an InvalidDestinationException for a null topic", caught); 438 439 caught = false; 440 try 441 { 442 session.createSubscriber(null, null, true); 443 } 444 catch (InvalidDestinationException expected) 445 { 446 caught = true; 447 } 448 assertTrue("Expected an InvalidDestinationException for a null topic", caught); 449 450 caught = false; 451 try 452 { 453 session.createDurableSubscriber(null, "NotUsed"); 454 } 455 catch (InvalidDestinationException expected) 456 { 457 caught = true; 458 } 459 assertTrue("Expected an InvalidDestinationException for a null topic", caught); 460 461 caught = false; 462 try 463 { 464 session.createDurableSubscriber(temp, "NotUsed"); 465 } 466 catch (InvalidDestinationException expected) 467 { 468 caught = true; 469 } 470 assertTrue("Expected an InvalidDestinationException for a temporary topic", caught); 471 472 caught = false; 473 try 474 { 475 session.createDurableSubscriber(null, "NotUsed", null, true); 476 } 477 catch (InvalidDestinationException expected) 478 { 479 caught = true; 480 } 481 assertTrue("Expected an InvalidDestinationException for a null topic", caught); 482 483 caught = false; 484 try 485 { 486 session.createDurableSubscriber(temp, "NotUsed", null, true); 487 } 488 catch (InvalidDestinationException expected) 489 { 490 caught = true; 491 } 492 assertTrue("Expected an InvalidDestinationException for a temporary topic", caught); 493 494 caught = false; 495 try 496 { 497 session.createDurableSubscriber(topic, null); 498 } 499 catch (Exception expected) 500 { 501 caught = true; 502 } 503 assertTrue("Expected a Exception for a null subscription", caught); 504 505 caught = false; 506 try 507 { 508 session.createDurableSubscriber(topic, null, null, false); 509 } 510 catch (Exception expected) 511 { 512 caught = true; 513 } 514 assertTrue("Expected a Exception for a null subscription", caught); 515 516 caught = false; 517 try 518 { 519 session.createDurableSubscriber(topic, " "); 520 } 521 catch (Exception expected) 522 { 523 caught = true; 524 } 525 assertTrue("Expected a Exception for an empty subscription", caught); 526 527 caught = false; 528 try 529 { 530 session.createDurableSubscriber(topic, " ", null, false); 531 } 532 catch (Exception expected) 533 { 534 caught = true; 535 } 536 assertTrue("Expected a Exception for an empty subscription", caught); 537 } 538 finally 539 { 540 disconnect(); 541 } 542 543 getLog().debug("InvalidDestinationTopicSubscriber passed"); 544 } 545 546 549 public void testCreateQueue() throws Exception  550 { 551 552 getLog().debug("Starting create queue test"); 553 connect(); 554 555 QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 556 557 Queue jndiQueue = (Queue ) getInitialContext().lookup("queue/testQueue"); 558 Queue createQueue = session.createQueue(jndiQueue.getQueueName()); 559 assertTrue("Failed for " + QUEUE_FACTORY, jndiQueue.equals(createQueue)); 560 561 getLog().debug("InvalidDestinationTopicSubscriber passed"); 562 } 563 564 public void testMessageListener() throws Exception  565 { 566 getLog().debug("Starting create queue test"); 567 568 connect(); 569 queueConnection.start(); 570 drainQueue(); 571 final CountDown counter1 = new CountDown(3); 572 573 QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 574 Queue queue = (Queue ) context.lookup(TEST_QUEUE); 575 576 QueueReceiver receiver = session.createReceiver(queue); 577 receiver.setMessageListener(new MessageListener () 578 { 579 public void onMessage(Message msg) 580 { 581 Logger log = Logger.getLogger(getClass().getName()); 582 log.debug("ML"); 583 try 584 { 585 if (msg instanceof TextMessage ) 586 { 587 log.debug(((TextMessage ) msg).getText()); 588 counter1.release(); 589 } 590 } 591 catch (Exception e) 592 { 593 } 594 } 595 }); 596 597 QueueSender sender = session.createSender(queue); 598 599 TextMessage message = session.createTextMessage(); 600 message.setText("Normal message"); 601 sender.send(message, DeliveryMode.NON_PERSISTENT, 4, 0); 602 message.setText("Persistent message"); 604 sender.send(message, DeliveryMode.PERSISTENT, 4, 0); 605 message.setText("High Priority Persistent message"); 607 sender.send(message, DeliveryMode.PERSISTENT, 10, 0); 608 610 counter1.acquire(); 612 log.debug("MessageListener1 received the TMs sent"); 613 614 final CountDown counter2 = new CountDown(2); 615 receiver.setMessageListener(new MessageListener () 616 { 617 public void onMessage(Message msg) 618 { 619 Logger log = Logger.getLogger(getClass().getName()); 620 log.debug("ML 2"); 621 try 622 { 623 if (msg instanceof TextMessage ) 624 { 625 log.debug(((TextMessage ) msg).getText()); 626 counter2.release(); 627 } 628 } 629 catch (Exception e) 630 { 631 } 632 } 633 }); 634 635 message.setText("Persistent message"); 636 sender.send(message, DeliveryMode.PERSISTENT, 4, 0); 637 message.setText("High Priority Persistent message"); 639 sender.send(message, DeliveryMode.PERSISTENT, 10, 0); 640 642 counter2.acquire(); 644 log.debug("MessageListener2 received the TMs sent"); 645 646 receiver.setMessageListener(null); 647 648 message.setText("Persistent message"); 649 sender.send(message, DeliveryMode.PERSISTENT, 4, 0); 650 message.setText("High Priority Persistent message"); 652 sender.send(message, DeliveryMode.PERSISTENT, 10, 0); 653 655 sender.close(); 656 drainQueue(); 657 disconnect(); 658 getLog().debug("MessageListener test passed"); 659 } 660 661 public void testApplicationServerStuff() throws Exception  662 { 663 getLog().debug("Starting testing app server stuff"); 664 connect(); 665 666 Queue testQueue = (Queue ) context.lookup(TEST_QUEUE); 667 final QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 668 669 session.setMessageListener(new MessageListener () 670 { 671 public void onMessage(Message mess) 672 { 673 Logger log = Logger.getLogger(getClass().getName()); 674 log.debug("Processing message"); 675 try 676 { 677 if (mess instanceof TextMessage ) 678 log.debug(((TextMessage ) mess).getText()); 679 } 680 catch (Exception e) 681 { 682 log.error("Error", e); 683 } 684 } 685 }); 686 687 QueueSender sender = session.createSender(testQueue); 688 sender.send(session.createTextMessage("Hi")); 689 sender.send(session.createTextMessage("There")); 690 sender.send(session.createTextMessage("Guys")); 691 queueConnection.createConnectionConsumer(testQueue, null, new ServerSessionPool () 692 { 693 public ServerSession getServerSession() 694 { 695 Logger.getLogger(getClass().getName()).debug("Getting server session."); 696 return new ServerSession () 697 { 698 public Session getSession() 699 { 700 return session; 701 } 702 public void start() 703 { 704 Logger.getLogger(getClass().getName()).debug("Starting server session."); 705 session.run(); 706 } 707 }; 708 } 709 }, 10); 710 711 queueConnection.start(); 712 713 try 714 { 715 Thread.sleep(5 * 1000); 716 } 717 catch (Exception e) 718 { 719 } 720 721 disconnect(); 722 getLog().debug("Testing app server stuff passed"); 723 } 724 725 743 private void drainMessagesForTopic(TopicSubscriber sub) throws JMSException  744 { 745 Message msg = sub.receive(50); 746 int c = 0; 747 while (msg != null) 748 { 749 c++; 750 if (msg instanceof TextMessage ) 751 getLog().debug(((TextMessage ) msg).getText()); 752 msg = sub.receive(50); 753 } 754 getLog().debug("Received " + c + " messages from topic."); 755 } 756 757 public void testTopics() throws Exception  758 { 759 getLog().debug("Starting Topic test"); 760 connect(); 761 762 TopicConnectionFactory topicFactory = (TopicConnectionFactory ) context.lookup(TOPIC_FACTORY); 763 topicConnection = topicFactory.createTopicConnection("john", "needle"); 764 765 topicConnection.start(); 766 767 TopicSession session = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); 769 Topic topic = (Topic ) context.lookup(TEST_TOPIC); 770 771 TopicSubscriber sub1 = session.createDurableSubscriber(topic, "sub1"); 772 TopicSubscriber sub2 = session.createSubscriber(topic); 773 TopicSubscriber sub3 = session.createSubscriber(topic); 774 775 TopicPublisher sender = session.createPublisher(topic); 777 778 sender.publish(session.createTextMessage("Message 1")); 780 sender.publish(session.createTextMessage("Message 2")); 781 sender.publish(session.createTextMessage("Message 3")); 782 drainMessagesForTopic(sub1); 783 drainMessagesForTopic(sub2); 784 drainMessagesForTopic(sub3); 785 786 sub1.close(); 788 sub2.close(); 789 790 sender.publish(session.createTextMessage("Message 4")); 792 sender.publish(session.createTextMessage("Message 5")); 793 sender.publish(session.createTextMessage("Message 6")); 794 795 try 797 { 798 Thread.sleep(5 * 1000); 799 } 800 catch (InterruptedException e) 801 { 802 } 803 804 drainMessagesForTopic(sub3); 805 806 sub1 = session.createDurableSubscriber(topic, "sub1"); 808 sub2 = session.createSubscriber(topic); 809 810 sender.publish(session.createTextMessage("Final message")); 812 sender.close(); 813 814 drainMessagesForTopic(sub1); 815 drainMessagesForTopic(sub2); 816 drainMessagesForTopic(sub3); 817 818
|