1 22 package org.jboss.test.jbossmessaging.perf; 23 24 import javax.jms.BytesMessage ; 25 import javax.jms.DeliveryMode ; 26 import javax.jms.JMSException ; 27 import javax.jms.Message ; 28 import javax.jms.MessageListener ; 29 import javax.jms.QueueConnection ; 30 import javax.jms.QueueConnectionFactory ; 31 import javax.jms.QueueReceiver ; 32 import javax.jms.QueueSender ; 33 import javax.jms.QueueSession ; 34 import javax.jms.Session ; 35 import javax.jms.Topic ; 36 import javax.jms.TopicConnection ; 37 import javax.jms.TopicConnectionFactory ; 38 import javax.jms.TopicPublisher ; 39 import javax.jms.TopicSession ; 40 import javax.jms.TopicSubscriber ; 41 import javax.jms.Queue ; 42 import javax.naming.Context ; 43 44 import org.jboss.logging.Logger; 45 import org.jboss.test.jbossmessaging.JMSTestCase; 46 53 54 public class JMSPerfStressTestCase extends JMSTestCase 55 { 56 57 static String TOPIC_FACTORY = "ConnectionFactory"; 59 static String QUEUE_FACTORY = "ConnectionFactory"; 60 61 static String TEST_QUEUE = "queue/testQueue"; 62 static String TEST_TOPIC = "topic/testTopic"; 63 64 static byte[] PERFORMANCE_TEST_DATA_PAYLOAD = new byte[10 * 1024]; 66 67 static int TRANS_NONE = 0; 68 static int TRANS_INDIVIDUAL = 1; 69 static int TRANS_TOTAL = 2; 70 static String [] TRANS_DESC = {"NOT", "individually", "totally"}; 71 72 static Context context; 74 static QueueConnection queueConnection; 75 static TopicConnection topicConnection; 76 77 83 public JMSPerfStressTestCase(String name) throws Exception 84 { 85 super(name); 86 } 87 88 89 96 public void runAsynchQueuePerformance(final int transacted, final int persistence) throws Exception 97 { 98 { 99 queueConnection.start(); 100 drainQueue(); 101 queueConnection.stop(); 102 } 103 final int iterationCount = getIterationCount(); 104 final Logger log = getLog(); 105 106 Thread sendThread = 107 new Thread () 108 { 109 112 public void run() 113 { 114 try 115 { 116 QueueSession session = queueConnection.createQueueSession(transacted != TRANS_NONE, Session.AUTO_ACKNOWLEDGE); 117 Queue queue = (Queue )context.lookup(TEST_QUEUE); 118 119 QueueSender sender = session.createSender(queue); 120 121 BytesMessage message = session.createBytesMessage(); 122 message.writeBytes(PERFORMANCE_TEST_DATA_PAYLOAD); 123 124 long startTime = System.currentTimeMillis(); 125 for (int i = 0; i < iterationCount; i++) 126 { 127 sender.send(message, persistence, 4, 0); 129 if (transacted == TRANS_INDIVIDUAL) 131 { 132 session.commit(); 133 } 134 } 135 136 if (transacted == TRANS_TOTAL) 137 { 138 session.commit(); 139 } 140 141 long endTime = System.currentTimeMillis(); 142 143 session.close(); 144 145 long pTime = endTime - startTime; 146 log.debug(" sent all messages in " + ((double)pTime / 1000) + " seconds. "); 147 } 148 catch (Exception e) 149 { 150 log.error("error", e); 151 } 152 } 153 }; 154 155 final QueueSession session = queueConnection.createQueueSession(transacted != TRANS_NONE, Session.AUTO_ACKNOWLEDGE); 156 Queue queue = (Queue )context.lookup(TEST_QUEUE); 157 QueueReceiver receiver = session.createReceiver(queue); 158 159 MessageListener listener = 160 new MessageListener () 161 { 162 long startTime = System.currentTimeMillis(); 163 int i = 0; 164 165 170 public void onMessage(Message message) 171 { 172 try 173 { 174 if( transacted == TRANS_INDIVIDUAL ) 175 session.commit(); 176 i++; 177 } 178 catch (JMSException e) 179 { 180 getLog().error("Unable to commit", e); 181 synchronized (this) 182 { 183 this.notify(); 184 } 185 } 186 if (i >= iterationCount) 187 { 188 long endTime = System.currentTimeMillis(); 189 long pTime = endTime - startTime; 190 log.debug(" received all messages in " + ((double)pTime / 1000) + " seconds. "); 191 192 synchronized (this) 193 { 194 this.notify(); 195 } 196 } 197 } 198 }; 199 200 getLog().debug(" Asynch Queue: This test will send " + getIterationCount() + " " 201 + (persistence == DeliveryMode.PERSISTENT ? "persistent" : "non-persistent") + " messages. Each with a payload of " 202 + ((double)PERFORMANCE_TEST_DATA_PAYLOAD.length / 1024) + "Kb" 203 + " Session is " + TRANS_DESC[transacted] + " transacted"); 204 long startTime = System.currentTimeMillis(); 205 sendThread.start(); 206 receiver.setMessageListener(listener); 207 synchronized (listener) 208 { 209 queueConnection.start(); 210 listener.wait(); 211 } 212 213 if (transacted == TRANS_TOTAL) 214 { 215 session.commit(); 216 } 217 218 session.close(); 219 sendThread.join(); 220 long endTime = System.currentTimeMillis(); 221 long pTime = endTime - startTime; 222 getLog().debug(" All threads finished after: " + ((double)pTime / 1000) + " seconds. "); 223 224 } 225 226 233 public void runAsynchTopicPerformance(final int transacted, final int persistence) throws Exception 234 { 235 { 236 queueConnection.start(); 237 drainQueue(); 238 } 239 240 final int iterationCount = getIterationCount(); 241 final Logger log = getLog(); 242 243 Thread sendThread = 244 new Thread () 245 { 246 249 public void run() 250 { 251 try 252 { 253 254 TopicSession session = topicConnection.createTopicSession(transacted != TRANS_NONE, Session.AUTO_ACKNOWLEDGE); 255 Topic topic = (Topic )context.lookup(TEST_TOPIC); 256 257 TopicPublisher publisher = session.createPublisher(topic); 258 259 waitForSynchMessage(); 260 261 BytesMessage message = session.createBytesMessage(); 262 message.writeBytes(PERFORMANCE_TEST_DATA_PAYLOAD); 263 264 long startTime = System.currentTimeMillis(); 265 for (int i = 0; i < iterationCount; i++) 266 { 267 publisher.publish(message, persistence, 4, 0); 268 if (transacted == TRANS_INDIVIDUAL) 271 { 272 session.commit(); 273 } 274 } 275 276 if (transacted == TRANS_TOTAL) 277 { 278 session.commit(); 279 } 280 281 long endTime = System.currentTimeMillis(); 282 session.close(); 283 284 long pTime = endTime - startTime; 285 log.debug(" sent all messages in " + ((double)pTime / 1000) + " seconds. "); 286 } 287 catch (Exception e) 288 { 289 log.error("error", e); 290 } 291 } 292 }; 293 294 final TopicSession session = topicConnection.createTopicSession(transacted != TRANS_NONE, Session.AUTO_ACKNOWLEDGE); 295 Topic topic = (Topic )context.lookup(TEST_TOPIC); 296 TopicSubscriber subscriber = session.createSubscriber(topic); 297 298 MessageListener listener = 299 new MessageListener () 300 { 301 long startTime = System.currentTimeMillis(); 302 int i = 0; 303 304 309 public void onMessage(Message message) 310 { 311 try 312 { 313 if( transacted == TRANS_INDIVIDUAL ) 314 session.commit(); 315 i++; 316 } 317 catch (JMSException e) 318 { 319 getLog().error("Unable to commit", e); 320 synchronized (this) 321 { 322 this.notify(); 323 } 324 } 325 if (i >= iterationCount) 326 { 327 long endTime = System.currentTimeMillis(); 328 long pTime = endTime - startTime; 329 log.debug(" received all messages in " + ((double)pTime / 1000) + " seconds. "); 330 331 synchronized (this) 332 { 333 this.notify(); 334 } 335 } 336 } 337 }; 338 339 getLog().debug(" Asynch Topic: This test will send " + getIterationCount() + " " 340 + (persistence == DeliveryMode.PERSISTENT ? "persistent" : "non-persistent") + " messages. Each with a payload of " 341 + ((double)PERFORMANCE_TEST_DATA_PAYLOAD.length / 1024) + "Kb" 342 + " Session is " + TRANS_DESC[transacted] + " transacted"); 343 long startTime = System.currentTimeMillis(); 344 sendThread.start(); 345 subscriber.setMessageListener(listener); 346 sendSynchMessage(); 347 synchronized (listener) 348 { 349 topicConnection.start(); 350 listener.wait(); 351 } 352 353 if (transacted == TRANS_TOTAL) 354 { 355 session.commit(); 356 } 357 358 session.close(); 359 sendThread.join(); 360 long endTime = System.currentTimeMillis(); 361 long pTime = endTime - startTime; 362 getLog().debug(" All threads finished after: " + ((double)pTime / 1000) + " seconds. "); 363 364 } 365 366 373 public void runSynchQueuePerformance(final int transacted, final int persistence) throws Exception 374 { 375 { 376 queueConnection.start(); 377 drainQueue(); 378 } 379 final int iterationCount = getIterationCount(); 380 final Logger log = getLog(); 381 382 Thread sendThread = 383 new Thread () 384 { 385 388 public void run() 389 { 390 try 391 { 392 QueueSession session = queueConnection.createQueueSession(transacted != TRANS_NONE, Session.AUTO_ACKNOWLEDGE); 393 Queue queue = (Queue )context.lookup(TEST_QUEUE); 394 395 QueueSender sender = session.createSender(queue); 396 397 BytesMessage message = session.createBytesMessage(); 398 message.writeBytes(PERFORMANCE_TEST_DATA_PAYLOAD); 399 400 long startTime = System.currentTimeMillis(); 401 for (int i = 0; i < iterationCount; i++) 402 { 403 sender.send( message, persistence, 4, 0); 404 if (transacted == TRANS_INDIVIDUAL) 407 { 408 session.commit(); 409 } 410 } 411 412 if (transacted == TRANS_TOTAL) 413 { 414 session.commit(); 415 } 416 417 session.close(); 418 419 long endTime = System.currentTimeMillis(); 420 421 long pTime = endTime - startTime; 422 log.debug(" sent all messages in " + ((double)pTime / 1000) + " seconds. "); 423 } 424 catch (Exception e) 425 { 426 log.error("error", e); 427 } 428 } 429 }; 430 431 Thread recvThread = 432 new Thread () 433 { 434 437 public void run() 438 { 439 try 440 { 441 442 QueueSession session = queueConnection.createQueueSession(transacted != TRANS_NONE, Session.AUTO_ACKNOWLEDGE); 443 Queue queue = (Queue )context.lookup(TEST_QUEUE); 444 445 QueueReceiver receiver = session.createReceiver(queue); 446 long startTime = System.currentTimeMillis(); 447 for (int i = 0; i < iterationCount; i++) 448 { 449 receiver.receive(); 450 if (transacted == TRANS_INDIVIDUAL) 452 { 453 session.commit(); 454 } 455 } 456 457 if (transacted == TRANS_TOTAL) 458 { 459 session.commit(); 460 } 461 462 long endTime = System.currentTimeMillis(); 463 464 session.close(); 465 466 long pTime = endTime - startTime; 467 log.debug(" received all messages in " + ((double)pTime / 1000) + " seconds. "); 468 469 } 470 catch (Exception e) 471 { 472 log.error("error", e); 473 } 474 } 475 }; 476 477 getLog().debug(" Synch Queue: This test will send " + getIterationCount() + " " 478 + (persistence == DeliveryMode.PERSISTENT ? "persistent" : "non-persistent") + " messages. Each with a payload of " 479 + ((double)PERFORMANCE_TEST_DATA_PAYLOAD.length / 1024) + "Kb" 480 + " Session is " + TRANS_DESC[transacted] + " transacted"); 481 long startTime = System.currentTimeMillis(); 482 sendThread.start(); 483 recvThread.start(); 484 sendThread.join(); 485 recvThread.join(); 486 long endTime = System.currentTimeMillis(); 487 long pTime = endTime - startTime; 488 getLog().debug(" All threads finished after: " + ((double)pTime / 1000) + " seconds. "); 489 490 } 491 492 499 public void runSynchTopicPerformance(final int transacted, final int persistence) throws Exception 500 { 501 { 502 queueConnection.start(); 503 topicConnection.start(); 504 drainQueue(); 505 } 506 final int iterationCount = getIterationCount(); 507 final Logger log = getLog(); 508 509 Thread sendThread = 510 new Thread () 511 { 512 515 public void run() 516 { 517 try 518 { 519 520 TopicSession session = topicConnection.createTopicSession(transacted != TRANS_NONE, Session.AUTO_ACKNOWLEDGE); 521 Topic topic = (Topic )context.lookup(TEST_TOPIC); 522 523 TopicPublisher publisher = session.createPublisher(topic); 524 525 waitForSynchMessage(); 526 527 BytesMessage message = session.createBytesMessage(); 528 message.writeBytes(PERFORMANCE_TEST_DATA_PAYLOAD); 529 530 long startTime = System.currentTimeMillis(); 531 for (int i = 0; i < iterationCount; i++) 532 { 533 publisher.publish(message, persistence, 4, 0); 534 if (transacted == TRANS_INDIVIDUAL) 537 { 538 session.commit(); 539 } 540 } 541 542 if (transacted == TRANS_TOTAL) 543 { 544 session.commit(); 545 } 546 547 long endTime = System.currentTimeMillis(); 548 549 session.close(); 550 551 long pTime = endTime - startTime; 552 log.debug(" sent all messages in " + ((double)pTime / 1000) + " seconds. "); 553 } 554 catch (Exception e) 555 { 556 log.error("error", e); 557 } 558 } 559 }; 560 561 Thread recvThread = 562 new Thread () 563 { 564 567 public void run() 568 { 569 try 570 { 571 572 TopicSession session = topicConnection.createTopicSession(transacted != TRANS_NONE, Session.AUTO_ACKNOWLEDGE); 573 Topic topic = (Topic )context.lookup(TEST_TOPIC); 574 TopicSubscriber subscriber = session.createSubscriber(topic); 575 576 sendSynchMessage(); 577 578 long startTime = System.currentTimeMillis(); 579 for (int i = 0; i < iterationCount; i++) 580 { 581 subscriber.receive(); 582 if (transacted == TRANS_INDIVIDUAL) 584 { 585 session.commit(); 586 } 587 } 588 589 if (transacted == TRANS_TOTAL) 590 { 591 session.commit(); 592 } 593 594 long endTime = System.currentTimeMillis(); 595 596 session.close(); 597 598 long pTime = endTime - startTime; 599 log.debug(" received all messages in " + ((double)pTime / 1000) + " seconds. "); 600 601 } 602 catch (Exception e) 603 { 604 log.error("error", e); 605 } 606 } 607 }; 608 609 getLog().debug(" Synch Topic: This test will send " + getIterationCount() + " " 610 + (persistence == DeliveryMode.PERSISTENT ? "persistent" : "non-persistent") + " messages. Each with a payload of " 611 + ((double)PERFORMANCE_TEST_DATA_PAYLOAD.length / 1024) + "Kb" 612 + " Session is " + TRANS_DESC[transacted] + " transacted"); 613 long startTime = System.currentTimeMillis(); 614 sendThread.start(); 615 recvThread.start(); 616 sendThread.join(); 617 recvThread.join(); 618 long endTime = System.currentTimeMillis(); 619 long pTime = endTime - startTime; 620 getLog().debug(" All threads finished after: " + ((double)pTime / 1000) + " seconds. "); 621 622 } 623 624 629 public void testAsynchQueuePerformance() throws Exception 630 { 631 632 getLog().debug("Starting AsynchQueuePerformance test"); 633 634 runAsynchQueuePerformance(TRANS_NONE, DeliveryMode.NON_PERSISTENT); 635 runAsynchQueuePerformance(TRANS_NONE, DeliveryMode.PERSISTENT); 636 runAsynchQueuePerformance(TRANS_INDIVIDUAL, DeliveryMode.NON_PERSISTENT); 637 runAsynchQueuePerformance(TRANS_INDIVIDUAL, DeliveryMode.PERSISTENT); 638 runAsynchQueuePerformance(TRANS_TOTAL, DeliveryMode.NON_PERSISTENT); 639 runAsynchQueuePerformance(TRANS_TOTAL, DeliveryMode.PERSISTENT); 640 641 getLog().debug("AsynchQueuePerformance passed"); 642 } 643 644 649 public void testAsynchTopicPerformance() throws Exception 650 { 651 652 getLog().debug("Starting AsynchTopicPerformance test"); 653 654 runAsynchTopicPerformance(TRANS_NONE, DeliveryMode.NON_PERSISTENT); 655 runAsynchTopicPerformance(TRANS_NONE, DeliveryMode.PERSISTENT); 656 runAsynchTopicPerformance(TRANS_INDIVIDUAL, DeliveryMode.NON_PERSISTENT); 657 runAsynchTopicPerformance(TRANS_INDIVIDUAL, DeliveryMode.PERSISTENT); 658 runAsynchTopicPerformance(TRANS_TOTAL, DeliveryMode.NON_PERSISTENT); 659 runAsynchTopicPerformance(TRANS_TOTAL, DeliveryMode.PERSISTENT); 660 661 getLog().debug("AsynchTopicPerformance passed"); 662 } 663 664 669 public void testSynchQueuePerformance() throws Exception 670 { 671 672 getLog().debug("Starting SynchQueuePerformance test"); 673 674 runSynchQueuePerformance(TRANS_NONE, DeliveryMode.NON_PERSISTENT); 675 runSynchQueuePerformance(TRANS_NONE, DeliveryMode.PERSISTENT); 676 runSynchQueuePerformance(TRANS_INDIVIDUAL, DeliveryMode.NON_PERSISTENT); 677 runSynchQueuePerformance(TRANS_INDIVIDUAL, DeliveryMode.PERSISTENT); 678 runSynchQueuePerformance(TRANS_TOTAL, DeliveryMode.NON_PERSISTENT); 679 runSynchQueuePerformance(TRANS_TOTAL, DeliveryMode.PERSISTENT); 680 681 getLog().debug("SynchQueuePerformance passed"); 682 } 683 684 689 public void testSynchTopicPerformance() throws Exception 690 { 691 692 getLog().debug("Starting SynchTopicPerformance test"); 693 694 runSynchTopicPerformance(TRANS_NONE, DeliveryMode.NON_PERSISTENT); 695 runSynchTopicPerformance(TRANS_NONE, DeliveryMode.PERSISTENT); 696 runSynchTopicPerformance(TRANS_INDIVIDUAL, DeliveryMode.NON_PERSISTENT); 697 runSynchTopicPerformance(TRANS_INDIVIDUAL, DeliveryMode.PERSISTENT); 698 runSynchTopicPerformance(TRANS_TOTAL, DeliveryMode.NON_PERSISTENT); 699 runSynchTopicPerformance(TRANS_TOTAL, DeliveryMode.PERSISTENT); 700 701 getLog().debug("SynchTopicPerformance passed"); 702 } 703 704 709 protected void setUp() throws Exception 710 { 711 super.setUp() ; 713 714 if (context == null) 715 { 716 Logger log = getLog() ; 717 if (log == null) 718 System.out.println("JMSPerfStressTestCase: getLog() returned null") ; 719 720 getLog().debug("JMSPerfStresTestCase - setUp") ; 721 722 context = getInitialContext(); 723 724 QueueConnectionFactory queueFactory = (QueueConnectionFactory )context.lookup(QUEUE_FACTORY); 725 queueConnection = queueFactory.createQueueConnection(); 726 727 TopicConnectionFactory topicFactory = (TopicConnectionFactory )context.lookup(TOPIC_FACTORY); 728 topicConnection = topicFactory.createTopicConnection(); 729 730 getLog().debug("Connection to JMS provider established."); 731 } 732 733 } 734 735 736 private void drainQueue() throws Exception 738 { 739 740 QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 741 Queue queue = (Queue )context.lookup(TEST_QUEUE); 742 743 QueueReceiver receiver = session.createReceiver(queue); 744 Message message = receiver.receive(50); 745 int c = 0; 746 while (message != null) 747 { 748 message = receiver.receive(50); 749 c++; 750 } 751 752 if (c != 0) 753 { 754 getLog().debug(" Drained " + c + " messages from the queue"); 755 } 756 757 session.close(); 758 759 } 760 761 private void waitForSynchMessage() throws Exception 762 { 763 QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 764 Queue queue = (Queue )context.lookup(TEST_QUEUE); 765 766 QueueReceiver receiver = session.createReceiver(queue); 767 receiver.receive(); 768 session.close(); 769 } 770 771 private void sendSynchMessage() throws Exception 772 { 773 QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 774 Queue queue = (Queue )context.lookup(TEST_QUEUE); 775 776 QueueSender sender = session.createSender(queue); 777 778 Message message = session.createMessage(); 779 sender.send(message); 780 781 session.close(); 782 } 783 784 789 public static void main(String [] args) 790 { 791 792 String newArgs[] = {"org.jboss.test.jbossmessaging.perf.JMSPerfStressTestCase"}; 793 junit.swingui.TestRunner.main(newArgs); 794 795 } 796 797 public static junit.framework.Test suite() throws Exception 798 { 799 ClassLoader loader = Thread.currentThread().getContextClassLoader(); 800 String resourceName = getJMSResourceRelativePathname("test-destinations-service.xml") ; 801 802 return getDeploySetup(JMSPerfStressTestCase.class, 803 loader.getResource(resourceName).toString()); 804 } 805 } 806 | Popular Tags |