1 22 package org.jboss.test.jbossmq.perf; 23 import javax.jms.BytesMessage ; 24 import javax.jms.DeliveryMode ; 25 import javax.jms.JMSException ; 26 import javax.jms.Message ; 27 import javax.jms.MessageListener ; 28 import javax.jms.QueueConnection ; 29 import javax.jms.QueueConnectionFactory ; 30 import javax.jms.QueueReceiver ; 31 import javax.jms.QueueSender ; 32 import javax.jms.QueueSession ; 33 import javax.jms.Session ; 34 import javax.jms.Topic ; 35 import javax.jms.TopicConnection ; 36 import javax.jms.TopicConnectionFactory ; 37 import javax.jms.TopicPublisher ; 38 import javax.jms.TopicSession ; 39 import javax.jms.TopicSubscriber ; 40 import javax.jms.Queue ; 41 import javax.naming.Context ; 42 43 import org.jboss.logging.Logger; 44 import org.jboss.test.JBossTestCase; 45 51 52 public class JBossMQPerfStressTestCase extends JBossTestCase 53 { 54 55 static String TOPIC_FACTORY = "ConnectionFactory"; 57 static String QUEUE_FACTORY = "ConnectionFactory"; 58 59 static String TEST_QUEUE = "queue/testQueue"; 60 static String TEST_TOPIC = "topic/testTopic"; 61 62 static byte[] PERFORMANCE_TEST_DATA_PAYLOAD = new byte[10 * 1024]; 64 65 static int TRANS_NONE = 0; 66 static int TRANS_INDIVIDUAL = 1; 67 static int TRANS_TOTAL = 2; 68 static String [] TRANS_DESC = {"NOT", "individually", "totally"}; 69 70 static Context context; 72 static QueueConnection queueConnection; 73 static TopicConnection topicConnection; 74 75 81 public JBossMQPerfStressTestCase(String name) throws Exception 82 { 83 super(name); 84 } 85 86 87 94 public void runAsynchQueuePerformance(final int transacted, final int persistence) throws Exception 95 { 96 { 97 queueConnection.start(); 98 drainQueue(); 99 queueConnection.stop(); 100 } 101 final int iterationCount = getIterationCount(); 102 final Logger log = getLog(); 103 104 Thread sendThread = 105 new Thread () 106 { 107 110 public void run() 111 { 112 try 113 { 114 QueueSession session = queueConnection.createQueueSession(transacted != TRANS_NONE, Session.AUTO_ACKNOWLEDGE); 115 Queue queue = (Queue )context.lookup(TEST_QUEUE); 116 117 QueueSender sender = session.createSender(queue); 118 119 BytesMessage message = session.createBytesMessage(); 120 message.writeBytes(PERFORMANCE_TEST_DATA_PAYLOAD); 121 122 long startTime = System.currentTimeMillis(); 123 for (int i = 0; i < iterationCount; i++) 124 { 125 sender.send(message, persistence, 4, 0); 127 if (transacted == TRANS_INDIVIDUAL) 129 { 130 session.commit(); 131 } 132 } 133 134 if (transacted == TRANS_TOTAL) 135 { 136 session.commit(); 137 } 138 139 long endTime = System.currentTimeMillis(); 140 141 session.close(); 142 143 long pTime = endTime - startTime; 144 log.debug(" sent all messages in " + ((double)pTime / 1000) + " seconds. "); 145 } 146 catch (Exception e) 147 { 148 log.error("error", e); 149 } 150 } 151 }; 152 153 final QueueSession session = queueConnection.createQueueSession(transacted != TRANS_NONE, Session.AUTO_ACKNOWLEDGE); 154 Queue queue = (Queue )context.lookup(TEST_QUEUE); 155 QueueReceiver receiver = session.createReceiver(queue); 156 157 MessageListener listener = 158 new MessageListener () 159 { 160 long startTime = System.currentTimeMillis(); 161 int i = 0; 162 163 168 public void onMessage(Message message) 169 { 170 try 171 { 172 if( transacted == TRANS_INDIVIDUAL ) 173 session.commit(); 174 i++; 175 } 176 catch (JMSException e) 177 { 178 getLog().error("Unable to commit", e); 179 synchronized (this) 180 { 181 this.notify(); 182 } 183 } 184 if (i >= iterationCount) 185 { 186 long endTime = System.currentTimeMillis(); 187 long pTime = endTime - startTime; 188 log.debug(" received all messages in " + ((double)pTime / 1000) + " seconds. "); 189 190 synchronized (this) 191 { 192 this.notify(); 193 } 194 } 195 } 196 }; 197 198 getLog().debug(" Asynch Queue: This test will send " + getIterationCount() + " " 199 + (persistence == DeliveryMode.PERSISTENT ? "persistent" : "non-persistent") + " messages. Each with a payload of " 200 + ((double)PERFORMANCE_TEST_DATA_PAYLOAD.length / 1024) + "Kb" 201 + " Session is " + TRANS_DESC[transacted] + " transacted"); 202 long startTime = System.currentTimeMillis(); 203 sendThread.start(); 204 receiver.setMessageListener(listener); 205 synchronized (listener) 206 { 207 queueConnection.start(); 208 listener.wait(); 209 } 210 211 if (transacted == TRANS_TOTAL) 212 { 213 session.commit(); 214 } 215 216 session.close(); 217 sendThread.join(); 218 long endTime = System.currentTimeMillis(); 219 long pTime = endTime - startTime; 220 getLog().debug(" All threads finished after: " + ((double)pTime / 1000) + " seconds. "); 221 222 } 223 224 231 public void runAsynchTopicPerformance(final int transacted, final int persistence) throws Exception 232 { 233 { 234 queueConnection.start(); 235 drainQueue(); 236 } 237 238 final int iterationCount = getIterationCount(); 239 final Logger log = getLog(); 240 241 Thread sendThread = 242 new Thread () 243 { 244 247 public void run() 248 { 249 try 250 { 251 252 TopicSession session = topicConnection.createTopicSession(transacted != TRANS_NONE, Session.AUTO_ACKNOWLEDGE); 253 Topic topic = (Topic )context.lookup(TEST_TOPIC); 254 255 TopicPublisher publisher = session.createPublisher(topic); 256 257 waitForSynchMessage(); 258 259 BytesMessage message = session.createBytesMessage(); 260 message.writeBytes(PERFORMANCE_TEST_DATA_PAYLOAD); 261 262 long startTime = System.currentTimeMillis(); 263 for (int i = 0; i < iterationCount; i++) 264 { 265 publisher.publish(message, persistence, 4, 0); 266 if (transacted == TRANS_INDIVIDUAL) 269 { 270 session.commit(); 271 } 272 } 273 274 if (transacted == TRANS_TOTAL) 275 { 276 session.commit(); 277 } 278 279 long endTime = System.currentTimeMillis(); 280 session.close(); 281 282 long pTime = endTime - startTime; 283 log.debug(" sent all messages in " + ((double)pTime / 1000) + " seconds. "); 284 } 285 catch (Exception e) 286 { 287 log.error("error", e); 288 } 289 } 290 }; 291 292 final TopicSession session = topicConnection.createTopicSession(transacted != TRANS_NONE, Session.AUTO_ACKNOWLEDGE); 293 Topic topic = (Topic )context.lookup(TEST_TOPIC); 294 TopicSubscriber subscriber = session.createSubscriber(topic); 295 296 MessageListener listener = 297 new MessageListener () 298 { 299 long startTime = System.currentTimeMillis(); 300 int i = 0; 301 302 307 public void onMessage(Message message) 308 { 309 try 310 { 311 if( transacted == TRANS_INDIVIDUAL ) 312 session.commit(); 313 i++; 314 } 315 catch (JMSException e) 316 { 317 getLog().error("Unable to commit", e); 318 synchronized (this) 319 { 320 this.notify(); 321 } 322 } 323 if (i >= iterationCount) 324 { 325 long endTime = System.currentTimeMillis(); 326 long pTime = endTime - startTime; 327 log.debug(" received all messages in " + ((double)pTime / 1000) + " seconds. "); 328 329 synchronized (this) 330 { 331 this.notify(); 332 } 333 } 334 } 335 }; 336 337 getLog().debug(" Asynch Topic: This test will send " + getIterationCount() + " " 338 + (persistence == DeliveryMode.PERSISTENT ? "persistent" : "non-persistent") + " messages. Each with a payload of " 339 + ((double)PERFORMANCE_TEST_DATA_PAYLOAD.length / 1024) + "Kb" 340 + " Session is " + TRANS_DESC[transacted] + " transacted"); 341 long startTime = System.currentTimeMillis(); 342 sendThread.start(); 343 subscriber.setMessageListener(listener); 344 sendSynchMessage(); 345 synchronized (listener) 346 { 347 topicConnection.start(); 348 listener.wait(); 349 } 350 351 if (transacted == TRANS_TOTAL) 352 { 353 session.commit(); 354 } 355 356 session.close(); 357 sendThread.join(); 358 long endTime = System.currentTimeMillis(); 359 long pTime = endTime - startTime; 360 getLog().debug(" All threads finished after: " + ((double)pTime / 1000) + " seconds. "); 361 362 } 363 364 371 public void runSynchQueuePerformance(final int transacted, final int persistence) throws Exception 372 { 373 { 374 queueConnection.start(); 375 drainQueue(); 376 } 377 final int iterationCount = getIterationCount(); 378 final Logger log = getLog(); 379 380 Thread sendThread = 381 new Thread () 382 { 383 386 public void run() 387 { 388 try 389 { 390 QueueSession session = queueConnection.createQueueSession(transacted != TRANS_NONE, Session.AUTO_ACKNOWLEDGE); 391 Queue queue = (Queue )context.lookup(TEST_QUEUE); 392 393 QueueSender sender = session.createSender(queue); 394 395 BytesMessage message = session.createBytesMessage(); 396 message.writeBytes(PERFORMANCE_TEST_DATA_PAYLOAD); 397 398 long startTime = System.currentTimeMillis(); 399 for (int i = 0; i < iterationCount; i++) 400 { 401 sender.send( message, persistence, 4, 0); 402 if (transacted == TRANS_INDIVIDUAL) 405 { 406 session.commit(); 407 } 408 } 409 410 if (transacted == TRANS_TOTAL) 411 { 412 session.commit(); 413 } 414 415 session.close(); 416 417 long endTime = System.currentTimeMillis(); 418 419 long pTime = endTime - startTime; 420 log.debug(" sent all messages in " + ((double)pTime / 1000) + " seconds. "); 421 } 422 catch (Exception e) 423 { 424 log.error("error", e); 425 } 426 } 427 }; 428 429 Thread recvThread = 430 new Thread () 431 { 432 435 public void run() 436 { 437 try 438 { 439 440 QueueSession session = queueConnection.createQueueSession(transacted != TRANS_NONE, Session.AUTO_ACKNOWLEDGE); 441 Queue queue = (Queue )context.lookup(TEST_QUEUE); 442 443 QueueReceiver receiver = session.createReceiver(queue); 444 long startTime = System.currentTimeMillis(); 445 for (int i = 0; i < iterationCount; i++) 446 { 447 receiver.receive(); 448 if (transacted == TRANS_INDIVIDUAL) 450 { 451 session.commit(); 452 } 453 } 454 455 if (transacted == TRANS_TOTAL) 456 { 457 session.commit(); 458 } 459 460 long endTime = System.currentTimeMillis(); 461 462 session.close(); 463 464 long pTime = endTime - startTime; 465 log.debug(" received all messages in " + ((double)pTime / 1000) + " seconds. "); 466 467 } 468 catch (Exception e) 469 { 470 log.error("error", e); 471 } 472 } 473 }; 474 475 getLog().debug(" Synch Queue: This test will send " + getIterationCount() + " " 476 + (persistence == DeliveryMode.PERSISTENT ? "persistent" : "non-persistent") + " messages. Each with a payload of " 477 + ((double)PERFORMANCE_TEST_DATA_PAYLOAD.length / 1024) + "Kb" 478 + " Session is " + TRANS_DESC[transacted] + " transacted"); 479 long startTime = System.currentTimeMillis(); 480 sendThread.start(); 481 recvThread.start(); 482 sendThread.join(); 483 recvThread.join(); 484 long endTime = System.currentTimeMillis(); 485 long pTime = endTime - startTime; 486 getLog().debug(" All threads finished after: " + ((double)pTime / 1000) + " seconds. "); 487 488 } 489 490 497 public void runSynchTopicPerformance(final int transacted, final int persistence) throws Exception 498 { 499 { 500 queueConnection.start(); 501 topicConnection.start(); 502 drainQueue(); 503 } 504 final int iterationCount = getIterationCount(); 505 final Logger log = getLog(); 506 507 Thread sendThread = 508 new Thread () 509 { 510 513 public void run() 514 { 515 try 516 { 517 518 TopicSession session = topicConnection.createTopicSession(transacted != TRANS_NONE, Session.AUTO_ACKNOWLEDGE); 519 Topic topic = (Topic )context.lookup(TEST_TOPIC); 520 521 TopicPublisher publisher = session.createPublisher(topic); 522 523 waitForSynchMessage(); 524 525 BytesMessage message = session.createBytesMessage(); 526 message.writeBytes(PERFORMANCE_TEST_DATA_PAYLOAD); 527 528 long startTime = System.currentTimeMillis(); 529 for (int i = 0; i < iterationCount; i++) 530 { 531 publisher.publish(message, persistence, 4, 0); 532 if (transacted == TRANS_INDIVIDUAL) 535 { 536 session.commit(); 537 } 538 } 539 540 if (transacted == TRANS_TOTAL) 541 { 542 session.commit(); 543 } 544 545 long endTime = System.currentTimeMillis(); 546 547 session.close(); 548 549 long pTime = endTime - startTime; 550 log.debug(" sent all messages in " + ((double)pTime / 1000) + " seconds. "); 551 } 552 catch (Exception e) 553 { 554 log.error("error", e); 555 } 556 } 557 }; 558 559 Thread recvThread = 560 new Thread () 561 { 562 565 public void run() 566 { 567 try 568 { 569 570 TopicSession session = topicConnection.createTopicSession(transacted != TRANS_NONE, Session.AUTO_ACKNOWLEDGE); 571 Topic topic = (Topic )context.lookup(TEST_TOPIC); 572 TopicSubscriber subscriber = session.createSubscriber(topic); 573 574 sendSynchMessage(); 575 576 long startTime = System.currentTimeMillis(); 577 for (int i = 0; i < iterationCount; i++) 578 { 579 subscriber.receive(); 580 if (transacted == TRANS_INDIVIDUAL) 582 { 583 session.commit(); 584 } 585 } 586 587 if (transacted == TRANS_TOTAL) 588 { 589 session.commit(); 590 } 591 592 long endTime = System.currentTimeMillis(); 593 594 session.close(); 595 596 long pTime = endTime - startTime; 597 log.debug(" received all messages in " + ((double)pTime / 1000) + " seconds. "); 598 599 } 600 catch (Exception e) 601 { 602 log.error("error", e); 603 } 604 } 605 }; 606 607 getLog().debug(" Synch Topic: This test will send " + getIterationCount() + " " 608 + (persistence == DeliveryMode.PERSISTENT ? "persistent" : "non-persistent") + " messages. Each with a payload of " 609 + ((double)PERFORMANCE_TEST_DATA_PAYLOAD.length / 1024) + "Kb" 610 + " Session is " + TRANS_DESC[transacted] + " transacted"); 611 long startTime = System.currentTimeMillis(); 612 sendThread.start(); 613 recvThread.start(); 614 sendThread.join(); 615 recvThread.join(); 616 long endTime = System.currentTimeMillis(); 617 long pTime = endTime - startTime; 618 getLog().debug(" All threads finished after: " + ((double)pTime / 1000) + " seconds. "); 619 620 } 621 622 627 public void testAsynchQueuePerformance() throws Exception 628 { 629 630 getLog().debug("Starting AsynchQueuePerformance test"); 631 632 runAsynchQueuePerformance(TRANS_NONE, DeliveryMode.NON_PERSISTENT); 633 runAsynchQueuePerformance(TRANS_NONE, DeliveryMode.PERSISTENT); 634 runAsynchQueuePerformance(TRANS_INDIVIDUAL, DeliveryMode.NON_PERSISTENT); 635 runAsynchQueuePerformance(TRANS_INDIVIDUAL, DeliveryMode.PERSISTENT); 636 runAsynchQueuePerformance(TRANS_TOTAL, DeliveryMode.NON_PERSISTENT); 637 runAsynchQueuePerformance(TRANS_TOTAL, DeliveryMode.PERSISTENT); 638 639 getLog().debug("AsynchQueuePerformance passed"); 640 } 641 642 647 public void testAsynchTopicPerformance() throws Exception 648 { 649 650 getLog().debug("Starting AsynchTopicPerformance test"); 651 652 runAsynchTopicPerformance(TRANS_NONE, DeliveryMode.NON_PERSISTENT); 653 runAsynchTopicPerformance(TRANS_NONE, DeliveryMode.PERSISTENT); 654 runAsynchTopicPerformance(TRANS_INDIVIDUAL, DeliveryMode.NON_PERSISTENT); 655 runAsynchTopicPerformance(TRANS_INDIVIDUAL, DeliveryMode.PERSISTENT); 656 runAsynchTopicPerformance(TRANS_TOTAL, DeliveryMode.NON_PERSISTENT); 657 runAsynchTopicPerformance(TRANS_TOTAL, DeliveryMode.PERSISTENT); 658 659 getLog().debug("AsynchTopicPerformance passed"); 660 } 661 662 667 public void testSynchQueuePerformance() throws Exception 668 { 669 670 getLog().debug("Starting SynchQueuePerformance test"); 671 672 runSynchQueuePerformance(TRANS_NONE, DeliveryMode.NON_PERSISTENT); 673 runSynchQueuePerformance(TRANS_NONE, DeliveryMode.PERSISTENT); 674 runSynchQueuePerformance(TRANS_INDIVIDUAL, DeliveryMode.NON_PERSISTENT); 675 runSynchQueuePerformance(TRANS_INDIVIDUAL, DeliveryMode.PERSISTENT); 676 runSynchQueuePerformance(TRANS_TOTAL, DeliveryMode.NON_PERSISTENT); 677 runSynchQueuePerformance(TRANS_TOTAL, DeliveryMode.PERSISTENT); 678 679 getLog().debug("SynchQueuePerformance passed"); 680 } 681 682 687 public void testSynchTopicPerformance() throws Exception 688 { 689 690 getLog().debug("Starting SynchTopicPerformance test"); 691 692 runSynchTopicPerformance(TRANS_NONE, DeliveryMode.NON_PERSISTENT); 693 runSynchTopicPerformance(TRANS_NONE, DeliveryMode.PERSISTENT); 694 runSynchTopicPerformance(TRANS_INDIVIDUAL, DeliveryMode.NON_PERSISTENT); 695 runSynchTopicPerformance(TRANS_INDIVIDUAL, DeliveryMode.PERSISTENT); 696 runSynchTopicPerformance(TRANS_TOTAL, DeliveryMode.NON_PERSISTENT); 697 runSynchTopicPerformance(TRANS_TOTAL, DeliveryMode.PERSISTENT); 698 699 getLog().debug("SynchTopicPerformance passed"); 700 } 701 702 707 protected void setUp() throws Exception 708 { 709 if (context == null) 710 { 711 712 context = getInitialContext(); 713 714 QueueConnectionFactory queueFactory = (QueueConnectionFactory )context.lookup(QUEUE_FACTORY); 715 queueConnection = queueFactory.createQueueConnection(); 716 717 TopicConnectionFactory topicFactory = (TopicConnectionFactory )context.lookup(TOPIC_FACTORY); 718 topicConnection = topicFactory.createTopicConnection(); 719 720 getLog().debug("Connection to JBossMQ established."); 721 } 722 723 } 724 725 726 private void drainQueue() throws Exception 728 { 729 730 QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 731 Queue queue = (Queue )context.lookup(TEST_QUEUE); 732 733 QueueReceiver receiver = session.createReceiver(queue); 734 Message message = receiver.receive(50); 735 int c = 0; 736 while (message != null) 737 { 738 message = receiver.receive(50); 739 c++; 740 } 741 742 if (c != 0) 743 { 744 getLog().debug(" Drained " + c + " messages from the queue"); 745 } 746 747 session.close(); 748 749 } 750 751 private void waitForSynchMessage() throws Exception 752 { 753 QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 754 Queue queue = (Queue )context.lookup(TEST_QUEUE); 755 756 QueueReceiver receiver = session.createReceiver(queue); 757 receiver.receive(); 758 session.close(); 759 } 760 761 private void sendSynchMessage() throws Exception 762 { 763 QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 764 Queue queue = (Queue )context.lookup(TEST_QUEUE); 765 766 QueueSender sender = session.createSender(queue); 767 768 Message message = session.createMessage(); 769 sender.send(message); 770 771 session.close(); 772 } 773 774 779 public static void main(String [] args) 780 { 781 782 String newArgs[] = {"org.jboss.test.jbossmq.perf.JBossMQPerfStressTestCase"}; 783 junit.swingui.TestRunner.main(newArgs); 784 785 } 786 787 public static junit.framework.Test suite() throws Exception 788 { 789 ClassLoader loader = Thread.currentThread().getContextClassLoader(); 790 return getDeploySetup(JBossMQPerfStressTestCase.class, 791 loader.getResource("messaging/test-destinations-service.xml").toString()); 792 } 793 } 794 | Popular Tags |