1 package com.ubermq.jms.client.test; 2 3 import EDU.oswego.cs.dl.util.concurrent.*; 4 import com.ubermq.jms.client.*; 5 import com.ubermq.jms.client.impl.*; 6 import com.ubermq.jms.common.routing.impl.*; 7 import com.ubermq.kernel.*; 8 import com.ubermq.kernel.event.*; 9 import java.util.*; 10 import java.util.regex.*; 11 import javax.jms.*; 12 import junit.framework.*; 13 14 import javax.jms.Connection ; 15 import javax.jms.Queue ; 16 import javax.jms.Session ; 17 import javax.jms.TopicSession ; 18 19 23 public class RegressionTestCase 24 extends TestCase 25 { 26 private static final org.apache.log4j.Logger log = 27 org.apache.log4j.Logger.getLogger(RegressionTestCase.class); 28 29 public static TestSuite suite() { 30 return new TestSuite(RegressionTestCase.class); 31 } 32 33 public RegressionTestCase(String sz) { 34 super(sz); 35 } 36 37 public RegressionTestCase() 38 { 39 super("Regression"); 40 } 41 42 private static final int RECV_TIMEOUT = 1000; 43 private static final int SEND_TIMEOUT = 200; 44 45 private TopicConnectionFactory f; 46 private TopicConnection tc1, tc2; 47 private TopicSession ts_client, ts_auto; 48 private Topic theTopic, theTopic2, asyncTopic, contentTopic; 49 50 public void setUp() 51 throws Exception 52 { 53 f = new UnicastConnectionFactory("localhost", 3999); 55 tc1 = f.createTopicConnection(); 56 tc2 = f.createTopicConnection(); 57 ts_client = tc1.createTopicSession(false, Session.CLIENT_ACKNOWLEDGE); 58 ts_auto = tc2.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); 59 60 tc1.start(); 61 tc2.start(); 62 63 theTopic = ts_client.createTopic(THE_TOPIC); 64 theTopic2 = ts_client.createTopic(THE_TOPIC2); 65 asyncTopic = ts_client.createTopic(ASYNC_TOPIC); 66 contentTopic = ts_client.createTopic("content"); 67 } 68 69 public static final String THE_TOPIC = "TheTopic"; 71 public static final String THE_TOPIC2 = "TheTopic2"; 72 public static final String ASYNC_TOPIC = "AsyncTopic"; 73 74 public void testPubSub() 75 throws Exception 76 { 77 TopicPublisher p = ts_auto.createPublisher(theTopic), 78 p2 = ts_auto.createPublisher(theTopic2); 79 TopicSubscriber s = ts_client.createSubscriber(theTopic), 80 s2 = ts_client.createSubscriber(theTopic2); 81 82 sendExactly(ts_auto, p, theTopic, 10); 84 receiveAtLeast(s, 10); 85 86 sendExactly(ts_auto, p, theTopic, 50); 88 receiveExactly(s, 50); 89 receiveExactly(s2, 0); 90 91 sendExactly(ts_auto, p2, theTopic2, 10); 93 receiveExactly(s2, 10); 94 95 TopicSubscriber all = ts_client.createSubscriber(theTopic); 97 sendExactly(ts_auto, p, theTopic, 10); 98 receiveExactly(s, 10); 99 receiveExactly(all, 10); 100 all.close(); 101 102 all = ts_auto.createSubscriber(theTopic, "", false); 105 sendExactly(ts_auto, p, theTopic, 10); 106 receiveExactly(s, 10); 107 receiveExactly(all, 10); 108 all.close(); 109 110 TopicConnection temp = f.createTopicConnection(); 113 all = temp.createTopicSession(false, Session.AUTO_ACKNOWLEDGE).createSubscriber(theTopic); 114 temp.start(); 115 sendExactly(ts_auto, p, theTopic, 10); 116 receiveExactly(s, 10); 117 receiveExactly(all, 10); 118 all.close(); 119 temp.close(); 120 121 sendExactly(ts_auto, p, theTopic, 50); 123 receiveOrdered(s, 0, 50); 124 125 s2.close(); 127 s.close(); 128 p2.close(); 129 p.close(); 130 } 131 132 133 public void testLongMessage() 134 throws Exception 135 { 136 TopicPublisher p = ts_auto.createPublisher(contentTopic); 137 TopicSubscriber s = ts_client.createSubscriber(contentTopic); 138 139 StringBuffer sb = new StringBuffer (); 141 for (int i = 0; i < 80000; i++) 142 { 143 sb.append('h'); 144 } 145 146 Message m = ts_auto.createTextMessage(sb.toString()); 147 setProps(m); 148 p.publish(m); 149 150 TextMessage rm = (TextMessage)s.receive(RECV_TIMEOUT); 151 Assert.assertTrue(rm.getText().length() == sb.length()); 152 Assert.assertEquals(rm.getText(), sb.toString()); 153 } 154 155 public void testMessages() 156 throws Exception 157 { 158 TopicPublisher p = ts_auto.createPublisher(contentTopic); 159 TopicSubscriber s = ts_client.createSubscriber(contentTopic); 160 161 Message m = ts_auto.createMessage(); 163 setProps(m); 164 p.publish(m, 165 javax.jms.DeliveryMode.NON_PERSISTENT, 166 9, 167 150); 168 169 Message rm = s.receive(RECV_TIMEOUT); 170 verifyProps(rm); 171 Assert.assertEquals(javax.jms.DeliveryMode.NON_PERSISTENT, rm.getJMSDeliveryMode()); 172 Assert.assertEquals(9, rm.getJMSPriority()); 173 174 int actualTTL = (int)(rm.getJMSExpiration() - rm.getJMSTimestamp()); 175 Assert.assertTrue(Math.abs(actualTTL - 150) < 100); 176 177 BytesMessage bm = ts_auto.createBytesMessage(); 179 setProps(bm); 180 bm.writeUTF("some data"); 181 bm.writeInt(12345); 182 bm.writeLong(98765L); 183 p.publish(bm); 184 185 BytesMessage rbm = (BytesMessage)s.receive(RECV_TIMEOUT); 186 Assert.assertEquals("some data", rbm.readUTF()); 187 Assert.assertEquals(12345, rbm.readInt()); 188 Assert.assertEquals(98765L, rbm.readLong()); 189 verifyProps(rbm); 190 191 194 197 TextMessage tm = ts_auto.createTextMessage("hola bandito"); 199 setProps(tm); 200 p.publish(tm); 201 202 TextMessage rtm = (TextMessage)s.receive(RECV_TIMEOUT); 203 verifyProps(rtm); 204 Assert.assertEquals("hola bandito", rtm.getText()); 205 206 ObjectMessage om = ts_auto.createObjectMessage((java.io.Serializable ) 208 Collections.singleton(new String ("this is a collection"))); 209 setProps(om); 210 p.publish(om); 211 212 ObjectMessage rom = (ObjectMessage)s.receive(RECV_TIMEOUT); 213 verifyProps(rom); 214 Assert.assertEquals(Collections.singleton(new String ("this is a collection")), 215 rom.getObject()); 216 217 218 s.close(); 220 p.close(); 221 } 222 223 private void setProps(Message m) 224 throws Exception 225 { 226 m.setBooleanProperty("bool-true", true); 227 m.setBooleanProperty("bool-false", false); 228 m.setByteProperty("byte", (byte)0xcc); 229 m.setDoubleProperty("double", 45.4545D); 230 m.setFloatProperty("float", 123.456F); 231 m.setIntProperty("int", 98765); 232 m.setLongProperty("long", Long.MAX_VALUE); 233 m.setObjectProperty("object", new String ("blah")); 234 m.setShortProperty("short", Short.MAX_VALUE); 235 m.setStringProperty("string", "hello there"); 236 237 m.setJMSCorrelationID("My Correlation"); 238 m.setJMSReplyTo(ts_client.createTopic("reply")); 239 } 240 241 private void verifyProps(Message m) 242 throws Exception 243 { 244 Assert.assertEquals(true, m.getBooleanProperty("bool-true")); 245 Assert.assertEquals(false, m.getBooleanProperty("bool-false")); 246 Assert.assertEquals((byte)0xcc, m.getByteProperty("byte")); 247 Assert.assertTrue(45.4545D == m.getDoubleProperty("double")); 248 Assert.assertTrue(123.456F == m.getFloatProperty("float")); 249 Assert.assertEquals(98765, m.getIntProperty("int")); 250 Assert.assertEquals(Long.MAX_VALUE, m.getLongProperty("long")); 251 Assert.assertEquals("blah", m.getObjectProperty("object")); 252 Assert.assertEquals(Short.MAX_VALUE, m.getShortProperty("short")); 253 Assert.assertEquals("hello there", m.getStringProperty("string")); 254 255 Assert.assertEquals("My Correlation", m.getJMSCorrelationID()); 256 Assert.assertEquals(false, m.getJMSRedelivered()); 257 Assert.assertEquals(ts_client.createTopic("reply"), m.getJMSReplyTo()); 258 } 259 260 261 public void testStress() 262 throws Exception 263 { 264 doAsync(30, 100); 265 } 266 267 public void testAsyncSubscriber() 268 throws Exception 269 { 270 doAsync(15, 1); 271 } 272 273 private void doAsync(int N, int LOOPS) 274 throws Exception 275 { 276 final SynchronizedInt i = new SynchronizedInt(0); 277 278 TopicPublisher p = ts_auto.createPublisher(asyncTopic); 279 TopicSubscriber s = ts_client.createSubscriber(asyncTopic); 280 281 s.setMessageListener(new MessageListener() { 282 public void onMessage(Message p0) 283 { 284 i.increment(); 285 } 286 }); 287 288 for (int j = 0; j < LOOPS; j++) 289 { 290 sendExactly(ts_auto, p, asyncTopic, N); 291 } 292 293 Thread.sleep(RECV_TIMEOUT); 295 296 Assert.assertEquals(N * LOOPS, i.get()); 298 299 s.close(); 301 p.close(); 302 } 303 304 public void testAsyncClose() 305 throws Exception 306 { 307 final TopicConnection tc1 = f.createTopicConnection(); 308 final TopicSession ts_client = tc1.createTopicSession(false, Session.CLIENT_ACKNOWLEDGE); 309 final TopicSubscriber s = ts_client.createSubscriber(asyncTopic); 310 311 tc1.start(); 312 313 TopicPublisher p = ts_auto.createPublisher(asyncTopic); 314 315 s.setMessageListener(new MessageListener() 316 { 317 public void onMessage(Message message) 318 { 319 try 320 { 321 s.close(); 322 ts_client.close(); 323 tc1.close(); 324 System.out.println("done"); 325 } 326 catch (JMSException e) { 327 log.error("", e);; 328 } 329 } 330 }); 331 332 p.publish(ts_client.createMessage()); 333 p.close(); 334 } 335 336 public void testJMS11() 337 throws JMSException 338 { 339 ConnectionFactory f = new UnicastConnectionFactory("localhost"); 340 Connection c = f.createConnection(), c2 = f.createConnection(); 341 342 Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE); 343 Topic t = s.createTopic("jms11-topic"); 344 Queue q = s.createQueue("jms11-queue"); 345 346 MessageProducer mp = s.createProducer(t); 347 348 Session s2 = c2.createSession(false, Session.AUTO_ACKNOWLEDGE); 349 MessageConsumer mc = s2.createConsumer(t); 350 351 c.start(); 352 c2.start(); 353 354 sendExactly(s, mp, t, 50); 356 receiveOrdered(mc, 50); 357 358 mp.close(); 360 mp = s.createProducer(q); 361 362 mc.close(); 363 mc = s2.createConsumer(q); 364 365 while(mc.receiveNoWait() != null); 367 368 sendExactly(s, mp, q, 50); 370 receiveOrdered(mc, 50); 371 372 mc.close(); 374 mp.close(); 375 s.close(); 376 s2.close(); 377 c.close(); 378 c2.close(); 379 } 380 381 public void testNamespace() 382 throws JMSException 383 { 384 sendAndReceive(f, "test", "test", 10, 10); 386 sendAndReceive(f, "test", "ubermq", 10, 0); 387 388 sendAndReceive(f, "test.blah.blah", "test.blah.blah", 10, 10); 390 391 sendAndReceive(f, "howdy", "#", 10, 10); 393 sendAndReceive(f, ".secret", "#", 10, 0); 394 sendAndReceive(f, ".secret", "*", 10, 0); 395 396 sendAndReceive(f, "howdy", "~[^x]*", 10, 10); 398 sendAndReceive(f, "ydwoh", "~[howdy]*", 10, 10); 399 400 sendAndReceive(f, "test.thisone", "test.*", 10, 10); 402 sendAndReceive(f, "test.thisone", "test.#", 10, 10); 403 sendAndReceive(f, "test.thisone", "test.thisone", 10, 10); 404 sendAndReceive(f, "test.thisone.another", "test.*", 10, 0); 405 sendAndReceive(f, "test.thisone.another", "test.#", 10, 10); 406 sendAndReceive(f, "test..secret", "test.#", 10, 0); 407 } 408 409 public void testSelectors() 410 throws Exception 411 { 412 sendAndReceive(f, "test", "test", "ordinal = 2", 10, 1); 413 sendAndReceive(f, "test2", "test2", "where ordinal != 2", 10, 9); 414 415 TopicPublisher pub = ts_auto.createPublisher(contentTopic); 417 TopicSubscriber sub = ts_client.createSubscriber(contentTopic, "string='hello there'", true); 418 419 Message msg = this.ts_auto.createMessage(); 420 setProps(msg); 421 422 pub.publish(msg); 423 Assert.assertNotNull(sub.receive(RECV_TIMEOUT)); 424 pub.publish(ts_auto.createMessage()); 425 Assert.assertNull(sub.receive(RECV_TIMEOUT)); 426 } 427 428 432 public void testPublishSenderIdentifiers() 433 throws JMSException 434 { 435 Message m = ts_auto.createMessage(), r = null, r2 = null; 436 TopicPublisher p = ts_auto.createPublisher(null); 437 TopicSubscriber s = ts_client.createSubscriber(ts_client.createTopic("#")); 438 439 p.publish(ts_auto.createTopic("A"), m); 440 r = s.receive(RECV_TIMEOUT); 441 p.publish(ts_auto.createTopic("B"), m); 442 r2 = s.receive(RECV_TIMEOUT); 443 Assert.assertTrue(((LocalMessage)r).getSenderId() != 444 ((LocalMessage)r2).getSenderId()); 445 446 TopicPublisher p2 = ts_auto.createPublisher(ts_auto.createTopic("C")); 447 p2.publish(m); 448 r = s.receive(RECV_TIMEOUT); 449 p2.publish(m); 450 r2 = s.receive(RECV_TIMEOUT); 451 452 Assert.assertEquals(((LocalMessage)r).getSenderId(), ((LocalMessage)r2).getSenderId()); 453 Assert.assertEquals(((LocalMessage)r).getSequence() + 1, ((LocalMessage)r2).getSequence()); 454 455 s.close(); 456 p.close(); 457 } 458 459 463 public void testResending() 464 throws JMSException 465 { 466 Message m = ts_auto.createMessage(), r = null; 467 TopicPublisher p = ts_auto.createPublisher(theTopic), 468 p2 = ts_client.createPublisher(theTopic2); 469 TopicSubscriber s = ts_client.createSubscriber(theTopic), 470 s2 = ts_auto.createSubscriber(theTopic2); 471 472 p.publish(m); 473 r = s.receive(RECV_TIMEOUT); 474 475 Assert.assertEquals(m.getJMSMessageID(), r.getJMSMessageID()); 477 478 String originalID = r.getJMSMessageID(); 479 p2.publish(r); 480 m = s2.receive(RECV_TIMEOUT); 481 482 Assert.assertEquals(m.getJMSMessageID(), r.getJMSMessageID()); 484 Assert.assertTrue(!originalID.equals(m.getJMSMessageID())); 485 486 s2.close(); 488 s.close(); 489 p2.close(); 490 p.close(); 491 } 492 493 496 public void testSSL() 497 throws JMSException 498 { 499 System.setProperty("javax.net.ssl.trustStore", "bin/sample.keystore"); 500 TopicConnectionFactory localFactory = new URLTopicConnectionFactory("ubermqs://localhost"), 501 remoteFactory = new URLTopicConnectionFactory("ubermqs://localhost"); 502 503 System.out.println("testPipes"); 504 sendAndReceive(localFactory, remoteFactory, "A", "A", null, 20, 20); 505 sendAndReceive(localFactory, "hello", "hello", null, 20, 20); 506 sendAndReceive(localFactory, "B", "B", "where ordinal != 2", 10, 9); 507 sendAndReceive(remoteFactory, localFactory, "B", "B", null, 20, 20); 508 } 509 512 public void testQueues() 513 throws Exception 514 { 515 QueueConnectionFactory f = new UnicastConnectionFactory("localhost"); 516 517 javax.jms.QueueConnection qc = f.createQueueConnection(); 518 javax.jms.QueueSession qs = qc.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 519 javax.jms.QueueSender sender = qs.createSender(qs.createQueue("my-queue")); 520 qc.start(); 521 522 javax.jms.QueueConnection qc2 = f.createQueueConnection(); 523 javax.jms.QueueSession qs2 = qc2.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 524 javax.jms.QueueReceiver r = qs2.createReceiver(qs2.createQueue("my-queue")); 525 qc2.start(); 526 527 Message m = qs.createMessage(); 528 529 sender.send(m); 531 Assert.assertNotNull(r.receive(RECV_TIMEOUT)); 532 } 533 534 538 public void xtestOverflows() 539 throws Exception 540 { 541 TopicConnection tc1 = f.createTopicConnection(); 542 TopicConnection tc2 = f.createTopicConnection(); 543 TopicSession ts_client = tc1.createTopicSession(false, Session.CLIENT_ACKNOWLEDGE); 544 TopicSession ts_client2 = tc1.createTopicSession(false, Session.CLIENT_ACKNOWLEDGE); 545 TopicSession ts_auto = tc2.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); 546 Topic overflowTopic = ts_auto.createTopic("F"); 547 TopicSubscriber tsub = ts_client.createSubscriber(overflowTopic ); 548 TopicSubscriber tsub2 = ts_client2.createSubscriber(overflowTopic ); 549 TopicPublisher tpub = ts_auto.createPublisher(overflowTopic ); 550 551 tc1.start(); 552 tc2.start(); 553 554 final SynchronizedBoolean assertThisValue = new SynchronizedBoolean(true); 556 final SynchronizedBoolean shouldBlock = new SynchronizedBoolean(true); 557 final Object bogusLock = new Object (); 558 559 tsub.setMessageListener(new MessageListener() { 562 public void onMessage(Message p0) 563 { 564 try 565 { 566 if (shouldBlock.get()) { 567 synchronized(bogusLock) { 568 bogusLock.wait(); 569 } 570 } 571 p0.acknowledge(); 572 } 573 catch (Exception e) { 574 log.error("", e);; 575 } 576 } 577 }); 578 tsub2.setMessageListener(new MessageListener() { 579 public void onMessage(Message p0) 580 { 581 Assert.assertTrue(assertThisValue.get()); 582 } 583 }); 584 585 sendExactly(ts_auto, tpub, overflowTopic, 40000); 587 System.out.println("just finished sending"); 588 589 shouldBlock.set(false); 592 synchronized(bogusLock) { 593 bogusLock.notifyAll(); 594 } 595 try 596 { 597 Thread.sleep(10000); 598 } 599 catch (InterruptedException e) 600 { 601 log.error("", e);; 602 } 603 604 assertThisValue.set(false); 610 sendAndReceive(f, overflowTopic.getTopicName(), overflowTopic.getTopicName(), 20, 20); 611 612 tsub.close(); 614 tpub.close(); 615 } 616 617 620 public void testEvents() 621 throws Exception 622 { 623 final SynchronizedInt i = new SynchronizedInt(0); 624 625 ConnectionEventListener l = new ConnectionEventListener() { 626 public void connectionEvent(ConnectionEvent e) 627 { 628 if (e.getEventCode() == ConnectionEvent.CONNECTION_CLOSED) 629 i.increment(); 630 } 631 }, l2 = new ConnectionEventListener() { 632 633 public void connectionEvent(ConnectionEvent e) 634 { 635 if (e.getEventCode() == ConnectionEvent.CONNECTION_CLOSED) 636 i.decrement(); 637 } 638 }; 639 640 TopicConnection tc1 = f.createTopicConnection(); 641 ((IConnectionInfo)tc1).addEventListener(l); 642 tc1.close(); 643 Assert.assertEquals(i.get(), 1); 644 i.set(0); 645 646 tc1 = f.createTopicConnection(); 647 ((IConnectionInfo)tc1).addEventListener(l); 648 ((IConnectionInfo)tc1).addEventListener(l2); 649 tc1.close(); 650 Assert.assertEquals(i.get(), 0); 651 i.set(0); 652 653 tc1 = f.createTopicConnection(); 654 ((IConnectionInfo)tc1).addEventListener(l); 655 ((IConnectionInfo)tc1).addEventListener(l2); 656 ((IConnectionInfo)tc1).removeEventListener(l2); 657 tc1.close(); 658 Assert.assertEquals(i.get(), 1); 659 i.set(0); 660 661 } 662 663 public void testMulticast() 664 throws Exception 665 { 666 TopicConnectionFactory mcf = new MulticastTopicConnectionFactory("234.2.3.4", 1234); 667 668 sendAndReceive(mcf, "hello", "hello", null, 20, 20); 669 sendAndReceive(mcf, "A", "A", null, 20, 20); 670 sendAndReceive(mcf, "B", "B", "where ordinal != 2", 10, 9); 671 sendAndReceive(mcf, "B", "B", "where ordinal = 2", 10, 1); 672 sendAndReceive(mcf, "B", "B", null, 20, 20); 673 } 674 675 public static void sendAndReceive(TopicConnectionFactory f, 676 String pub, 677 String sub, 678 int send, 679 int recv) 680 throws JMSException 681 { 682 sendAndReceive(f, pub, sub, null, send, recv); 683 } 684 685 public static void sendAndReceive(TopicConnectionFactory f, 686 String pub, 687 String sub, 688 String selector, 689 int send, 690 int recv) 691 throws JMSException 692 { 693 sendAndReceive(f, f, pub, sub, selector, send, recv); 694 } 695 696 public static void sendAndReceive(TopicConnectionFactory f, 697 TopicConnectionFactory g, 698 String pub, 699 String sub, 700 String selector, 701 int send, 702 int recv) 703 throws JMSException 704 { 705 TopicConnection tc1 = f.createTopicConnection(); 706 TopicConnection tc2 = g.createTopicConnection(); 707 TopicSession ts_client = tc1.createTopicSession(false, Session.CLIENT_ACKNOWLEDGE); 708 TopicSession ts_auto = tc2.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); 709 710 TopicPublisher p = ts_auto.createPublisher(ts_auto.createTopic(pub)); 711 TopicSubscriber s = ts_client.createSubscriber(ts_client.createTopic(sub), 712 selector, 713 true); 714 715 tc1.start(); 716 tc2.start(); 717 718 sendExactly(ts_auto, p, ts_auto.createTopic(pub), send); 719 720 if (selector == null && recv > 0) 722 receiveOrdered(s, recv); 723 else receiveExactly(s, recv); 724 725 s.close(); 726 p.close(); 727 ts_client.close(); 728 ts_auto.close(); 729 tc1.close(); 730 tc2.close(); 731 } 732 733 public static void sendExactly(Session s, 734 MessageProducer p, 735 Destination t, 736 int n) 737 throws JMSException 738 { 739 for (int i = 0; i < n; i++) 740 { 741 Message m = s.createMessage(); 742 m.setIntProperty("ordinal", i); 743 p.send(t, m); 744 745 if ((i % 1000) == 0 && i > 0) 746 System.out.println("sendExactly " + i); 747 } 748 749 try 750 { 751 Thread.sleep(SEND_TIMEOUT); 752 } 753 catch (InterruptedException e) {} 754 } 755 756 public static void receiveExactly(MessageConsumer s, 757 int n) 758 throws JMSException 759 { 760 receiveAtLeast(s, n); 761 Assert.assertNull(s.receiveNoWait()); 762 } 763 764 public static void receiveAtLeast(MessageConsumer s, 765 int n) 766 throws JMSException 767 { 768 for (int i = 0; i < n; i++) 769 { 770 Message m = s.receive(RECV_TIMEOUT); 771 Assert.assertNotNull(m); 772 m.acknowledge(); 773 log.debug("got message " + i + " ordinal was " + m.getIntProperty("ordinal")); 774 } 775 } 776 777 public static void receiveOrdered(MessageConsumer s, 778 int n) 779 throws JMSException 780 { 781 if (n >= 1) { 782 Message m = s.receive(RECV_TIMEOUT); 783 Assert.assertNotNull(m); 784 m.acknowledge(); 785 receiveOrdered(s, m.getIntProperty("ordinal")+1, n-1); 786 } 787 } 788 789 public static void receiveOrdered(MessageConsumer s, 790 int startingAt, 791 int n) 792 throws JMSException 793 { 794 int lastSequence=0; 795 796 for (int i = 0; i < n; i++) 797 { 798 Message m = s.receive(RECV_TIMEOUT); 799 Assert.assertNotNull(m); 800 Assert.assertEquals(i + startingAt, m.getIntProperty("ordinal")); 801 Assert.assertTrue( ((LocalMessage)m).getSequence() > lastSequence ); 802 m.acknowledge(); 803 804 lastSequence = ((LocalMessage)m).getSequence(); 805 } 806 } 807 } 808 | Popular Tags |