1 22 package org.jboss.test.jbossmq; 23 24 import java.util.Enumeration ; 25 26 import javax.jms.Connection ; 27 import javax.jms.DeliveryMode ; 28 import javax.jms.Destination ; 29 import javax.jms.ExceptionListener ; 30 import javax.jms.JMSException ; 31 import javax.jms.Message ; 32 import javax.jms.MessageConsumer ; 33 import javax.jms.MessageListener ; 34 import javax.jms.MessageProducer ; 35 import javax.jms.Queue ; 36 import javax.jms.QueueBrowser ; 37 import javax.jms.QueueConnection ; 38 import javax.jms.QueueConnectionFactory ; 39 import javax.jms.QueueSender ; 40 import javax.jms.QueueSession ; 41 import javax.jms.Session ; 42 import javax.jms.Topic ; 43 import javax.jms.TopicConnection ; 44 import javax.jms.TopicConnectionFactory ; 45 import javax.jms.TopicPublisher ; 46 import javax.jms.TopicSession ; 47 import javax.naming.Context ; 48 import javax.naming.NamingException ; 49 50 import org.jboss.test.JBossTestCase; 51 import org.jboss.logging.Logger; 52 53 69 public class MQBase extends JBossTestCase 70 { 71 public static final int PUBLISHER = 0; 72 public static final int SUBSCRIBER = 1; 73 public static final int GETTER = 2; 74 public static final int CONNECTOR = 3; 75 public static final int FAILSAFE_SUBSCRIBER = 4; 76 public static final int TRANS_NONE = 0; 77 public static final int TRANS_INDIVIDUAL = 1; 78 public static final int TRANS_TOTAL = 2; 79 public static final String [] TRANS_DESC = {"NOT", "individually", "totally"}; 80 public static final int DEFAULT_RUNSLEEP = 50; 81 public final Logger log = getLog(); 82 83 public String TOPIC_FACTORY = "ConnectionFactory"; 85 public String QUEUE_FACTORY = "ConnectionFactory"; 86 87 public String TEST_QUEUE = "queue/testQueue"; 88 public String TEST_TOPIC = "topic/testTopic"; 89 90 public Context context; 91 public QueueConnectionFactory queueFactory; 92 public TopicConnectionFactory topicFactory; 93 94 public MQBase(String name) 95 { 96 super(name); 97 } 98 99 public long getRunSleep() 100 { 101 log.info("run.sleep: " + System.getProperty("run.sleep")); 102 return 1000L * Integer.getInteger("run.sleep", DEFAULT_RUNSLEEP).intValue(); 103 } 104 105 public void sleep(long sleep) 106 { 107 try 108 { 109 Thread.sleep(sleep); 110 } 111 catch (InterruptedException e) 112 { 113 } 114 } 115 116 public void drainTopic() throws JMSException 117 { 118 TopicWorker sub1 = new TopicWorker(GETTER, 119 TRANS_NONE, 120 null 121 ); 122 sub1.connect(); 123 sub1.get(); 124 sub1.close(); 125 } 126 127 public void drainQueue() throws JMSException 128 { 129 QueueWorker sub1 = new QueueWorker(GETTER, 130 TRANS_NONE, 131 null 132 ); 133 sub1.connect(); 134 sub1.get(); 135 sub1.close(); 136 } 137 138 143 protected void setUp() throws Exception 144 { 145 QUEUE_FACTORY = System.getProperty("jbosstest.queuefactory", QUEUE_FACTORY); 147 TOPIC_FACTORY = System.getProperty("jbosstest.topicfactory", TOPIC_FACTORY); 148 TEST_QUEUE = System.getProperty("jbosstest.queue", TEST_QUEUE); 149 TEST_TOPIC = System.getProperty("jbosstest.topic", TEST_TOPIC); 150 151 if (context == null) 152 { 153 154 context = getInitialContext(); 155 156 queueFactory = (QueueConnectionFactory ) context.lookup(QUEUE_FACTORY); 157 topicFactory = (TopicConnectionFactory ) context.lookup(TOPIC_FACTORY); 158 159 getLog().debug("Connection to JBossMQ established."); 160 } 161 162 } 163 164 165 public static void main(String [] args) 166 { 167 168 } 169 170 public abstract class JMSWorker implements Runnable , MessageListener , ExceptionListener 171 { 172 173 protected boolean stopRequested = false; 174 protected int messageHandled = 0; 175 protected Exception runEx = null; 176 protected MessageFilter filter; 177 protected MessageCreator creator; 178 protected int number = 1; 179 protected int type = -1; 180 protected int transacted; 181 protected QosConfig qosConfig = new QosConfig(); 182 protected String userName; 183 protected String password; 184 protected String clientID; 185 186 public Connection connection; 188 public Destination destination; 189 public Session session; 190 public MessageProducer producer; 191 public MessageConsumer consumer; 192 193 196 public JMSWorker() 197 { 198 } 199 200 public JMSWorker(int type, int transacted, MessageFilter filter) 201 { 202 this.type = type; 203 this.transacted = transacted; 204 this.filter = filter; 205 } 206 207 public JMSWorker(int type, 208 int transacted, 209 MessageCreator creator, 210 int number 211 ) 212 { 213 this.type = type; 214 this.transacted = transacted; 215 this.creator = creator; 216 this.number = number; 217 } 218 219 public void setSubscriberAttrs(int type, int transacted, MessageFilter filter) 220 { 221 this.type = type; 222 this.transacted = transacted; 223 this.filter = filter; 224 } 225 226 public void setPublisherAttrs(int type, 227 int transacted, 228 MessageCreator creator, 229 int number) 230 { 231 this.type = type; 232 this.transacted = transacted; 233 this.creator = creator; 234 this.number = number; 235 } 236 237 public void setUser(String userName, String password) 238 { 239 this.userName = userName; 240 this.password = password; 241 } 242 243 public void setClientID(String ID) 244 { 245 this.clientID = ID; 246 } 247 248 abstract public void publish() throws JMSException ; 249 250 abstract public void publish(int nr) throws JMSException ; 251 252 255 public void subscribe() throws JMSException 256 { 257 subscribe(false); 258 } 259 260 263 public void subscribe(boolean failsafe) throws JMSException 264 { 265 if (consumer == null) 266 throw new JMSException ("No messageConsumer created"); 267 268 if (failsafe) 269 connection.setExceptionListener(this); 270 271 consumer.setMessageListener(this); 272 273 } 274 275 public void get() throws JMSException 276 { 277 Message msg = consumer.receive(2000); 278 while (msg != null) 279 { 280 if (filter != null) 281 { 282 if (filter.ok(msg)) 283 messageHandled++; 284 } 285 else 286 { 287 messageHandled++; 288 } 289 msg = consumer.receive(2000); 290 } 291 } 292 293 abstract public void connect() throws JMSException ; 294 295 public void setQosConfig(QosConfig qosConfig) 296 { 297 this.qosConfig = qosConfig; 298 } 299 300 public void setStoped() throws JMSException 301 { 302 stopRequested = true; 303 } 304 305 public int getMessageHandled() 306 { 307 return messageHandled; 308 } 309 310 public Exception getException() 311 { 312 return runEx; 313 } 314 315 public void reset() 316 { 317 messageHandled = 0; 318 stopRequested = false; 319 runEx = null; 320 } 321 322 public void close() 323 { 324 try 325 { 326 if (consumer != null) 327 consumer.close(); 328 if (producer != null) 329 producer.close(); 330 if (session != null) 331 session.close(); 332 } 333 catch (JMSException ex) 334 { 335 } 336 finally 337 { 338 if (connection != null) 339 { 340 try 341 { 342 connection.close(); 343 } 344 catch (JMSException ex) 345 { 346 } 347 } 348 } 349 } 350 351 public void onMessage(Message msg) 352 { 353 try 354 { 355 if (filter != null) 356 { 357 if (filter.ok(msg)) 358 messageHandled++; 359 } 360 else 361 { 362 messageHandled++; 363 } 364 if (session.getTransacted()) 365 session.commit(); 366 } 367 catch (Exception ex) 368 { 369 log.warn("Exception in on message: " + ex, ex); 370 runEx = ex; 371 } 372 } 373 374 378 public void onException(JMSException ex) 379 { 380 log.error("Ex in connection: " + ex); 381 382 try 383 { 384 connection.setExceptionListener(null); 385 close(); 386 } 387 catch (JMSException c) 388 { 389 } 390 391 try 393 { 394 boolean tryIt = true; 395 while (tryIt && !stopRequested) 396 { 397 log.info("Trying reconnect..."); 398 try 399 { 400 Thread.sleep(10000); 401 } 402 catch (InterruptedException ie) 403 { 404 } 405 try 406 { 407 connect(); 408 subscribe(true); 409 tryIt = false; 410 log.info("Reconnect OK"); 411 } 413 catch (JMSException e) 414 { 415 log.error("Error in reconnect: " + e); 416 } 417 } 418 419 } 420 catch (Exception je) 421 { 422 log.error("Strange error in failsafe handling" + je, je); 423 } 424 } 425 426 public void run() 427 { 428 try 429 { 430 switch (type) 431 { 432 case -1: 433 log.info("Nothing to do for type " + type); 434 break; 435 case PUBLISHER: 436 connect(); 437 publish(); 438 break; 439 case SUBSCRIBER: 440 connect(); 441 subscribe(); 442 break; 443 case GETTER: 444 connect(); 445 get(); 446 break; 447 case CONNECTOR: 448 connect(); 449 break; 450 case FAILSAFE_SUBSCRIBER: 451 connect(); 452 subscribe(true); 453 break; 454 } 455 456 while (!stopRequested) 458 { 459 try 460 { 461 Thread.sleep(1000); 462 } 463 catch (InterruptedException ex) 464 { 465 466 } 467 } 468 } 469 catch (JMSException ex) 470 { 471 runEx = ex; 472 log.error("Could not run: " + ex, ex); 473 } 474 } 475 } 476 477 public interface MessageCreator 478 { 479 public void setSession(Session session); 480 481 public Message createMessage(int nr) throws JMSException ; 482 } 483 484 public abstract class BaseMessageCreator implements MessageCreator 485 { 486 protected Session session; 487 protected String property; 488 489 public BaseMessageCreator(String property) 490 { 491 this.property = property; 492 } 493 494 public void setSession(Session session) 495 { 496 this.session = session; 497 } 498 499 abstract public Message createMessage(int nr) throws JMSException ; 500 } 501 502 503 public class IntRangeMessageCreator extends BaseMessageCreator 504 { 505 int start = 0; 506 507 public IntRangeMessageCreator(String property) 508 { 509 super(property); 510 } 511 512 public IntRangeMessageCreator(String property, int start) 513 { 514 super(property); 515 this.start = start; 516 } 517 518 public Message createMessage(int nr) throws JMSException 519 { 520 if (session == null) 521 throw new JMSException ("Session not allowed to be null"); 522 523 Message msg = session.createMessage(); 524 msg.setStringProperty(property, String.valueOf(start + nr)); 525 return msg; 526 } 527 } 528 529 public interface MessageFilter 530 { 531 public boolean ok(Message msg) throws JMSException ; 532 } 533 534 public class IntRangeMessageFilter implements MessageFilter 535 { 536 Class messageClass; 537 String className; 538 String property; 539 int low; 540 int max; 541 int counter = 0; 542 int report = 1000; 543 544 public IntRangeMessageFilter(Class messageClass, String property, int low, int max) 545 { 546 this.messageClass = messageClass; 547 this.property = property; 548 className = messageClass.getName(); 549 this.low = low; 550 this.max = max; 551 } 552 553 private boolean validateClass(Message msg) 554 { 555 Class clazz = null; 556 if (msg instanceof javax.jms.TextMessage ) 557 clazz = javax.jms.TextMessage .class; 558 else if (msg instanceof javax.jms.BytesMessage ) 559 clazz = javax.jms.BytesMessage .class; 560 else if (msg instanceof javax.jms.MapMessage ) 561 clazz = javax.jms.MapMessage .class; 562 else if (msg instanceof javax.jms.ObjectMessage ) 563 clazz = javax.jms.ObjectMessage .class; 564 else if (msg instanceof javax.jms.StreamMessage ) 565 clazz = javax.jms.StreamMessage .class; 566 else 567 clazz = javax.jms.Message .class; 568 569 return clazz.equals(messageClass); 570 } 571 572 public boolean ok(Message msg) throws JMSException 573 { 574 boolean res = false; 575 if (validateClass(msg)) 576 { 577 if (msg.propertyExists(property)) 578 { 579 String p = msg.getStringProperty(property); 580 try 581 { 582 int i = Integer.parseInt(p); 583 if (i >= low && i < max) 585 res = true; 586 } 587 catch (NumberFormatException ex) 588 { 589 throw new JMSException ("Property " + property + " was not int: " + p); 590 } 591 } 592 } 593 counter++; 594 int mod = counter % report; 595 if (mod == 0) 596 log.debug("Have received " + counter + " messages"); 597 return res; 598 } 599 600 } 601 602 628 632 public class QosConfig 633 { 634 int deliveryMode = DeliveryMode.PERSISTENT; 635 int priority = 4; 636 long ttl = 0; 637 } 638 639 public class TopicWorker extends JMSWorker 640 { 641 String durableHandle; 642 643 646 public TopicWorker() 647 { 648 super(); 649 } 650 651 public TopicWorker(int type, int transacted, MessageFilter filter) 652 { 653 super(type, transacted, filter); 654 } 655 656 public TopicWorker(int type, 657 int transacted, 658 MessageCreator creator, 659 int number 660 ) 661 { 662 super(type, transacted, creator, number); 663 } 664 665 666 public void publish() throws JMSException 667 { 668 publish(number); 669 } 670 671 public void publish(int nr) throws JMSException 672 { 673 if (producer == null) 674 producer = ((TopicSession ) session).createPublisher((Topic ) destination); 675 if (creator == null) 676 throw new JMSException ("Publish must have a MessageCreator set"); 677 678 creator.setSession(session); 679 log.debug("Publishing " + nr + " messages"); 680 for (int i = 0; i < nr; i++) 681 { 682 if (qosConfig != null) 683 { 684 ((TopicPublisher ) producer).publish(creator.createMessage(i), 685 qosConfig.deliveryMode, 686 qosConfig.priority, 687 qosConfig.ttl); 688 } 689 else 690 { 691 ((TopicPublisher ) producer).publish(creator.createMessage(i)); 692 } 693 694 messageHandled++; 695 } 696 if (session.getTransacted()) 697 session.commit(); 698 log.debug("Finished publishing"); 699 } 700 701 public void subscribe() throws JMSException 702 { 703 subscribe(false); 704 } 705 706 public void subscribe(boolean failsafe) throws JMSException 707 { 708 if (durableHandle != null) 709 consumer = ((TopicSession ) session).createDurableSubscriber((Topic ) destination, durableHandle); 710 else 711 consumer = ((TopicSession ) session).createSubscriber((Topic ) destination); 712 super.subscribe(failsafe); 713 connection.start(); 714 } 715 716 public void get() throws JMSException 717 { 718 consumer = ((TopicSession ) session).createSubscriber((Topic ) destination); 719 super.subscribe(); 720 connection.start(); 721 } 722 723 public void connect() throws JMSException 724 { 725 log.debug("Connecting: " + this.toString()); 726 if (userName != null) 727 connection = topicFactory.createTopicConnection(userName, password); 728 else 729 connection = topicFactory.createTopicConnection(); 730 731 if (clientID != null) 732 { 733 log.debug("Setting clientID" + clientID); 734 connection.setClientID(clientID); 735 } 736 737 session = ((TopicConnection ) connection).createTopicSession(transacted != TRANS_NONE, Session.AUTO_ACKNOWLEDGE); 738 try 739 { 740 destination = (Destination ) context.lookup(TEST_TOPIC); 741 } 742 catch (NamingException ex) 743 { 744 throw new JMSException ("Could not lookup topic " + ex); 745 } 746 } 747 748 public void setDurable(String userId, String pwd, String handle) 750 { 751 this.userName = userId; 752 this.password = pwd; 753 this.durableHandle = handle; 754 } 755 756 public void setDurable(String handle) 757 { 758 this.durableHandle = handle; 759 } 760 761 public void unsubscribe() throws JMSException 762 { 763 if (durableHandle != null) 764 ((TopicSession ) session).unsubscribe(durableHandle); 765 } 766 767 public String toString() 768 { 769 return "(userId=" + userName + " pwd=" + password + " handle=" + durableHandle + ")"; 770 } 771 772 } 773 774 public class QueueWorker extends JMSWorker 775 { 776 String userId; 777 String pwd; 778 String handle; 779 780 783 public QueueWorker() 784 { 785 super(); 786 } 787 788 public QueueWorker(int type, int transacted, MessageFilter filter) 789 { 790 super(type, transacted, filter); 791 } 792 793 public QueueWorker(int type, 794 int transacted, 795 MessageCreator creator, 796 int number 797 ) 798 { 799 super(type, transacted, creator, number); 800 } 801 802 803 public void publish() throws JMSException 804 { 805 publish(number); 806 } 807 808 public void publish(int nr) throws JMSException 809 { 810 if (producer == null) 811 producer = ((QueueSession ) session).createSender((Queue ) destination); 812 if (creator == null) 813 throw new JMSException ("Publish must have a MessageCreator set"); 814 815 creator.setSession(session); 816 log.debug("Publishing " + nr + " messages"); 817 for (int i = 0; i < nr; i++) 818 { 819 if (qosConfig != null) 820 { 821 ((QueueSender ) producer).send(creator.createMessage(i), 822 qosConfig.deliveryMode, 823 qosConfig.priority, 824 qosConfig.ttl); 825 } 826 else 827 { 828 ((QueueSender ) producer).send(creator.createMessage(i)); 829 } 830 831 messageHandled++; 832 } 833 if (session.getTransacted()) 834 session.commit(); 835 log.debug("Finished publishing"); 836 } 837 838 public void subscribe() throws JMSException 839 { 840 subscribe(false); 841 } 842 843 public void subscribe(boolean failsafe) throws JMSException 844 { 845 846 consumer = ((QueueSession ) session).createReceiver((Queue ) destination); 847 super.subscribe(failsafe); 848 connection.start(); 849 } 850 851 public void get() throws JMSException 852 { 853 consumer = ((QueueSession ) session).createReceiver((Queue ) destination); 854 super.subscribe(); 855 connection.start(); 856 } 857 858 public void connect() throws JMSException 859 { 860 log.debug("Connecting: " + this.toString()); 861 if (userName != null) 862 connection = queueFactory.createQueueConnection(userName, password); 863 else 864 connection = queueFactory.createQueueConnection(); 865 866 if (clientID != null) 867 connection.setClientID(clientID); 868 869 session = ((QueueConnection ) connection).createQueueSession(transacted != TRANS_NONE, Session.AUTO_ACKNOWLEDGE); 870 try 871 { 872 destination = (Destination ) context.lookup(TEST_QUEUE); 873 } 874 catch (NamingException ex) 875 { 876 throw new JMSException ("Could not lookup topic " + ex); 877 } 878 } 879 880 881 public Enumeration browse() throws JMSException 883 { 884 QueueBrowser b = ((QueueSession ) session).createBrowser((Queue ) destination); 885 return b.getEnumeration(); 886 } 887 } 888 } | Popular Tags |