1 22 package org.jboss.test.jbossmessaging; 23 24 import java.util.Enumeration ; 25 26 import javax.jms.Connection ; 27 import javax.jms.DeliveryMode ; 28 import javax.jms.Destination ; 29 import javax.jms.ExceptionListener ; 30 import javax.jms.JMSException ; 31 import javax.jms.Message ; 32 import javax.jms.MessageConsumer ; 33 import javax.jms.MessageListener ; 34 import javax.jms.MessageProducer ; 35 import javax.jms.Queue ; 36 import javax.jms.QueueBrowser ; 37 import javax.jms.QueueConnection ; 38 import javax.jms.QueueConnectionFactory ; 39 import javax.jms.QueueSender ; 40 import javax.jms.QueueSession ; 41 import javax.jms.Session ; 42 import javax.jms.Topic ; 43 import javax.jms.TopicConnection ; 44 import javax.jms.TopicConnectionFactory ; 45 import javax.jms.TopicPublisher ; 46 import javax.jms.TopicSession ; 47 import javax.naming.Context ; 48 import javax.naming.NamingException ; 49 50 import org.jboss.test.jbossmessaging.JMSTestCase; 51 import org.jboss.logging.Logger; 52 53 69 public class JMSBase extends JMSTestCase 70 { 71 public static final int PUBLISHER = 0; 72 public static final int SUBSCRIBER = 1; 73 public static final int GETTER = 2; 74 public static final int CONNECTOR = 3; 75 public static final int FAILSAFE_SUBSCRIBER = 4; 76 public static final int TRANS_NONE = 0; 77 public static final int TRANS_INDIVIDUAL = 1; 78 public static final int TRANS_TOTAL = 2; 79 public static final String [] TRANS_DESC = {"NOT", "individually", "totally"}; 80 public static final int DEFAULT_RUNSLEEP = 50; 81 public final Logger log = getLog(); 82 83 public String TOPIC_FACTORY = "ConnectionFactory"; 85 public String QUEUE_FACTORY = "ConnectionFactory"; 86 87 public String TEST_QUEUE = "queue/testQueue"; 88 public String TEST_TOPIC = "topic/testTopic"; 89 90 public Context context; 91 public QueueConnectionFactory queueFactory; 92 public TopicConnectionFactory topicFactory; 93 94 public JMSBase(String name) 95 { 96 super(name); 97 } 98 99 public long getRunSleep() 100 { 101 log.info("run.sleep: " + System.getProperty("run.sleep")); 102 return 1000L * Integer.getInteger("run.sleep", DEFAULT_RUNSLEEP).intValue(); 103 } 104 105 public void sleep(long sleep) 106 { 107 try 108 { 109 Thread.sleep(sleep); 110 } 111 catch (InterruptedException e) 112 { 113 } 114 } 115 116 public void drainTopic() throws JMSException 117 { 118 TopicWorker sub1 = new TopicWorker(GETTER, 119 TRANS_NONE, 120 null 121 ); 122 sub1.connect(); 123 sub1.get(); 124 sub1.close(); 125 } 126 127 public void drainQueue() throws JMSException 128 { 129 QueueWorker sub1 = new QueueWorker(GETTER, 130 TRANS_NONE, 131 null 132 ); 133 sub1.connect(); 134 sub1.get(); 135 sub1.close(); 136 } 137 138 143 protected void setUp() throws Exception 144 { 145 super.setUp() ; 147 148 QUEUE_FACTORY = System.getProperty("jbosstest.queuefactory", QUEUE_FACTORY); 150 TOPIC_FACTORY = System.getProperty("jbosstest.topicfactory", TOPIC_FACTORY); 151 TEST_QUEUE = System.getProperty("jbosstest.queue", TEST_QUEUE); 152 TEST_TOPIC = System.getProperty("jbosstest.topic", TEST_TOPIC); 153 154 if (context == null) 155 { 156 157 context = getInitialContext(); 158 159 queueFactory = (QueueConnectionFactory ) context.lookup(QUEUE_FACTORY); 160 topicFactory = (TopicConnectionFactory ) context.lookup(TOPIC_FACTORY); 161 162 getLog().debug("Connection to JMS provider established."); 163 } 164 165 } 166 167 168 public static void main(String [] args) 169 { 170 171 } 172 173 public abstract class JMSWorker implements Runnable , MessageListener , ExceptionListener 174 { 175 176 protected boolean stopRequested = false; 177 protected int messageHandled = 0; 178 protected Exception runEx = null; 179 protected MessageFilter filter; 180 protected MessageCreator creator; 181 protected int number = 1; 182 protected int type = -1; 183 protected int transacted; 184 protected QosConfig qosConfig = new QosConfig(); 185 protected String userName; 186 protected String password; 187 protected String clientID; 188 189 public Connection connection; 191 public Destination destination; 192 public Session session; 193 public MessageProducer producer; 194 public MessageConsumer consumer; 195 196 199 public JMSWorker() 200 { 201 } 202 203 public JMSWorker(int type, int transacted, MessageFilter filter) 204 { 205 this.type = type; 206 this.transacted = transacted; 207 this.filter = filter; 208 } 209 210 public JMSWorker(int type, 211 int transacted, 212 MessageCreator creator, 213 int number 214 ) 215 { 216 this.type = type; 217 this.transacted = transacted; 218 this.creator = creator; 219 this.number = number; 220 } 221 222 public void setSubscriberAttrs(int type, int transacted, MessageFilter filter) 223 { 224 this.type = type; 225 this.transacted = transacted; 226 this.filter = filter; 227 } 228 229 public void setPublisherAttrs(int type, 230 int transacted, 231 MessageCreator creator, 232 int number) 233 { 234 this.type = type; 235 this.transacted = transacted; 236 this.creator = creator; 237 this.number = number; 238 } 239 240 public void setUser(String userName, String password) 241 { 242 this.userName = userName; 243 this.password = password; 244 } 245 246 public void setClientID(String ID) 247 { 248 this.clientID = ID; 249 } 250 251 abstract public void publish() throws JMSException ; 252 253 abstract public void publish(int nr) throws JMSException ; 254 255 258 public void subscribe() throws JMSException 259 { 260 subscribe(false); 261 } 262 263 266 public void subscribe(boolean failsafe) throws JMSException 267 { 268 if (consumer == null) 269 throw new JMSException ("No messageConsumer created"); 270 271 if (failsafe) 272 connection.setExceptionListener(this); 273 274 consumer.setMessageListener(this); 275 276 } 277 278 public void get() throws JMSException 279 { 280 Message msg = consumer.receive(2000); 281 while (msg != null) 282 { 283 if (filter != null) 284 { 285 if (filter.ok(msg)) 286 messageHandled++; 287 } 288 else 289 { 290 messageHandled++; 291 } 292 msg = consumer.receive(2000); 293 } 294 } 295 296 abstract public void connect() throws JMSException ; 297 298 public void setQosConfig(QosConfig qosConfig) 299 { 300 this.qosConfig = qosConfig; 301 } 302 303 public void setStoped() throws JMSException 304 { 305 stopRequested = true; 306 } 307 308 public int getMessageHandled() 309 { 310 return messageHandled; 311 } 312 313 public Exception getException() 314 { 315 return runEx; 316 } 317 318 public void reset() 319 { 320 messageHandled = 0; 321 stopRequested = false; 322 runEx = null; 323 } 324 325 public void close() 326 { 327 try 328 { 329 if (consumer != null) 330 consumer.close(); 331 if (producer != null) 332 producer.close(); 333 if (session != null) 334 session.close(); 335 } 336 catch (JMSException ex) 337 { 338 } 339 finally 340 { 341 if (connection != null) 342 { 343 try 344 { 345 connection.close(); 346 } 347 catch (JMSException ex) 348 { 349 } 350 } 351 } 352 } 353 354 public void onMessage(Message msg) 355 { 356 try 357 { 358 if (filter != null) 359 { 360 if (filter.ok(msg)) 361 messageHandled++; 362 } 363 else 364 { 365 messageHandled++; 366 } 367 if (session.getTransacted()) 368 session.commit(); 369 } 370 catch (Exception ex) 371 { 372 log.warn("Exception in on message: " + ex, ex); 373 runEx = ex; 374 } 375 } 376 377 381 public void onException(JMSException ex) 382 { 383 log.error("Ex in connection: " + ex); 384 385 try 386 { 387 connection.setExceptionListener(null); 388 close(); 389 } 390 catch (JMSException c) 391 { 392 } 393 394 try 396 { 397 boolean tryIt = true; 398 while (tryIt && !stopRequested) 399 { 400 log.info("Trying reconnect..."); 401 try 402 { 403 Thread.sleep(10000); 404 } 405 catch (InterruptedException ie) 406 { 407 } 408 try 409 { 410 connect(); 411 subscribe(true); 412 tryIt = false; 413 log.info("Reconnect OK"); 414 } 416 catch (JMSException e) 417 { 418 log.error("Error in reconnect: " + e); 419 } 420 } 421 422 } 423 catch (Exception je) 424 { 425 log.error("Strange error in failsafe handling" + je, je); 426 } 427 } 428 429 public void run() 430 { 431 try 432 { 433 switch (type) 434 { 435 case -1: 436 log.info("Nothing to do for type " + type); 437 break; 438 case PUBLISHER: 439 connect(); 440 publish(); 441 break; 442 case SUBSCRIBER: 443 connect(); 444 subscribe(); 445 break; 446 case GETTER: 447 connect(); 448 get(); 449 break; 450 case CONNECTOR: 451 connect(); 452 break; 453 case FAILSAFE_SUBSCRIBER: 454 connect(); 455 subscribe(true); 456 break; 457 } 458 459 while (!stopRequested) 461 { 462 try 463 { 464 Thread.sleep(1000); 465 } 466 catch (InterruptedException ex) 467 { 468 469 } 470 } 471 } 472 catch (JMSException ex) 473 { 474 runEx = ex; 475 log.error("Could not run: " + ex, ex); 476 } 477 } 478 } 479 480 public interface MessageCreator 481 { 482 public void setSession(Session session); 483 484 public Message createMessage(int nr) throws JMSException ; 485 } 486 487 public abstract class BaseMessageCreator implements MessageCreator 488 { 489 protected Session session; 490 protected String property; 491 492 public BaseMessageCreator(String property) 493 { 494 this.property = property; 495 } 496 497 public void setSession(Session session) 498 { 499 this.session = session; 500 } 501 502 abstract public Message createMessage(int nr) throws JMSException ; 503 } 504 505 506 public class IntRangeMessageCreator extends BaseMessageCreator 507 { 508 int start = 0; 509 510 public IntRangeMessageCreator(String property) 511 { 512 super(property); 513 } 514 515 public IntRangeMessageCreator(String property, int start) 516 { 517 super(property); 518 this.start = start; 519 } 520 521 public Message createMessage(int nr) throws JMSException 522 { 523 if (session == null) 524 throw new JMSException ("Session not allowed to be null"); 525 526 Message msg = session.createMessage(); 527 msg.setStringProperty(property, String.valueOf(start + nr)); 528 return msg; 529 } 530 } 531 532 public interface MessageFilter 533 { 534 public boolean ok(Message msg) throws JMSException ; 535 } 536 537 public class IntRangeMessageFilter implements MessageFilter 538 { 539 Class messageClass; 540 String className; 541 String property; 542 int low; 543 int max; 544 int counter = 0; 545 int report = 1000; 546 547 public IntRangeMessageFilter(Class messageClass, String property, int low, int max) 548 { 549 this.messageClass = messageClass; 550 this.property = property; 551 className = messageClass.getName(); 552 this.low = low; 553 this.max = max; 554 } 555 556 private boolean validateClass(Message msg) 557 { 558 Class clazz = null; 559 if (msg instanceof javax.jms.TextMessage ) 560 clazz = javax.jms.TextMessage .class; 561 else if (msg instanceof javax.jms.BytesMessage ) 562 clazz = javax.jms.BytesMessage .class; 563 else if (msg instanceof javax.jms.MapMessage ) 564 clazz = javax.jms.MapMessage .class; 565 else if (msg instanceof javax.jms.ObjectMessage ) 566 clazz = javax.jms.ObjectMessage .class; 567 else if (msg instanceof javax.jms.StreamMessage ) 568 clazz = javax.jms.StreamMessage .class; 569 else 570 clazz = javax.jms.Message .class; 571 572 return clazz.equals(messageClass); 573 } 574 575 public boolean ok(Message msg) throws JMSException 576 { 577 boolean res = false; 578 if (validateClass(msg)) 579 { 580 if (msg.propertyExists(property)) 581 { 582 String p = msg.getStringProperty(property); 583 try 584 { 585 int i = Integer.parseInt(p); 586 if (i >= low && i < max) 588 res = true; 589 } 590 catch (NumberFormatException ex) 591 { 592 throw new JMSException ("Property " + property + " was not int: " + p); 593 } 594 } 595 } 596 counter++; 597 int mod = counter % report; 598 if (mod == 0) 599 log.debug("Have received " + counter + " messages"); 600 return res; 601 } 602 603 } 604 605 631 635 public class QosConfig 636 { 637 int deliveryMode = DeliveryMode.PERSISTENT; 638 int priority = 4; 639 long ttl = 0; 640 } 641 642 public class TopicWorker extends JMSWorker 643 { 644 String durableHandle; 645 646 649 public TopicWorker() 650 { 651 super(); 652 } 653 654 public TopicWorker(int type, int transacted, MessageFilter filter) 655 { 656 super(type, transacted, filter); 657 } 658 659 public TopicWorker(int type, 660 int transacted, 661 MessageCreator creator, 662 int number 663 ) 664 { 665 super(type, transacted, creator, number); 666 } 667 668 669 public void publish() throws JMSException 670 { 671 publish(number); 672 } 673 674 public void publish(int nr) throws JMSException 675 { 676 if (producer == null) 677 producer = ((TopicSession ) session).createPublisher((Topic ) destination); 678 if (creator == null) 679 throw new JMSException ("Publish must have a MessageCreator set"); 680 681 creator.setSession(session); 682 log.debug("Publishing " + nr + " messages"); 683 for (int i = 0; i < nr; i++) 684 { 685 if (qosConfig != null) 686 { 687 ((TopicPublisher ) producer).publish(creator.createMessage(i), 688 qosConfig.deliveryMode, 689 qosConfig.priority, 690 qosConfig.ttl); 691 } 692 else 693 { 694 ((TopicPublisher ) producer).publish(creator.createMessage(i)); 695 } 696 697 messageHandled++; 698 } 699 if (session.getTransacted()) 700 session.commit(); 701 log.debug("Finished publishing"); 702 } 703 704 public void subscribe() throws JMSException 705 { 706 subscribe(false); 707 } 708 709 public void subscribe(boolean failsafe) throws JMSException 710 { 711 if (durableHandle != null) 712 consumer = ((TopicSession ) session).createDurableSubscriber((Topic ) destination, durableHandle); 713 else 714 consumer = ((TopicSession ) session).createSubscriber((Topic ) destination); 715 super.subscribe(failsafe); 716 connection.start(); 717 } 718 719 public void get() throws JMSException 720 { 721 consumer = ((TopicSession ) session).createSubscriber((Topic ) destination); 722 super.subscribe(); 723 connection.start(); 724 } 725 726 public void connect() throws JMSException 727 { 728 log.debug("Connecting: " + this.toString()); 729 if (userName != null) 730 connection = topicFactory.createTopicConnection(userName, password); 731 else 732 connection = topicFactory.createTopicConnection(); 733 734 if (clientID != null) 735 { 736 log.debug("Setting clientID" + clientID); 737 connection.setClientID(clientID); 738 } 739 740 session = ((TopicConnection ) connection).createTopicSession(transacted != TRANS_NONE, Session.AUTO_ACKNOWLEDGE); 741 try 742 { 743 destination = (Destination ) context.lookup(TEST_TOPIC); 744 } 745 catch (NamingException ex) 746 { 747 throw new JMSException ("Could not lookup topic " + ex); 748 } 749 } 750 751 public void setDurable(String userId, String pwd, String handle) 753 { 754 this.userName = userId; 755 this.password = pwd; 756 this.durableHandle = handle; 757 } 758 759 public void setDurable(String handle) 760 { 761 this.durableHandle = handle; 762 } 763 764 public void unsubscribe() throws JMSException 765 { 766 if (durableHandle != null) 767 ((TopicSession ) session).unsubscribe(durableHandle); 768 } 769 770 public String toString() 771 { 772 return "(userId=" + userName + " pwd=" + password + " handle=" + durableHandle + ")"; 773 } 774 775 } 776 777 public class QueueWorker extends JMSWorker 778 { 779 String userId; 780 String pwd; 781 String handle; 782 783 786 public QueueWorker() 787 { 788 super(); 789 } 790 791 public QueueWorker(int type, int transacted, MessageFilter filter) 792 { 793 super(type, transacted, filter); 794 } 795 796 public QueueWorker(int type, 797 int transacted, 798 MessageCreator creator, 799 int number 800 ) 801 { 802 super(type, transacted, creator, number); 803 } 804 805 806 public void publish() throws JMSException 807 { 808 publish(number); 809 } 810 811 public void publish(int nr) throws JMSException 812 { 813 if (producer == null) 814 producer = ((QueueSession ) session).createSender((Queue ) destination); 815 if (creator == null) 816 throw new JMSException ("Publish must have a MessageCreator set"); 817 818 creator.setSession(session); 819 log.debug("Publishing " + nr + " messages"); 820 for (int i = 0; i < nr; i++) 821 { 822 if (qosConfig != null) 823 { 824 ((QueueSender ) producer).send(creator.createMessage(i), 825 qosConfig.deliveryMode, 826 qosConfig.priority, 827 qosConfig.ttl); 828 } 829 else 830 { 831 ((QueueSender ) producer).send(creator.createMessage(i)); 832 } 833 834 messageHandled++; 835 } 836 if (session.getTransacted()) 837 session.commit(); 838 log.debug("Finished publishing"); 839 } 840 841 public void subscribe() throws JMSException 842 { 843 subscribe(false); 844 } 845 846 public void subscribe(boolean failsafe) throws JMSException 847 { 848 849 consumer = ((QueueSession ) session).createReceiver((Queue ) destination); 850 super.subscribe(failsafe); 851 connection.start(); 852 } 853 854 public void get() throws JMSException 855 { 856 consumer = ((QueueSession ) session).createReceiver((Queue ) destination); 857 super.subscribe(); 858 connection.start(); 859 } 860 861 public void connect() throws JMSException 862 { 863 log.debug("Connecting: " + this.toString()); 864 if (userName != null) 865 connection = queueFactory.createQueueConnection(userName, password); 866 else 867 connection = queueFactory.createQueueConnection(); 868 869 if (clientID != null) 870 connection.setClientID(clientID); 871 872 session = ((QueueConnection ) connection).createQueueSession(transacted != TRANS_NONE, Session.AUTO_ACKNOWLEDGE); 873 try 874 { 875 destination = (Destination ) context.lookup(TEST_QUEUE); 876 } 877 catch (NamingException ex) 878 { 879 throw new JMSException ("Could not lookup topic " + ex); 880 } 881 } 882 883 884 public Enumeration browse() throws JMSException 886 { 887 QueueBrowser b = ((QueueSession ) session).createBrowser((Queue ) destination); 888 return b.getEnumeration(); 889 } 890 } 891 } | Popular Tags |