KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > ubermq > jms > client > test > RegressionTestCase


1 package com.ubermq.jms.client.test;
2
3 import EDU.oswego.cs.dl.util.concurrent.*;
4 import com.ubermq.jms.client.*;
5 import com.ubermq.jms.client.impl.*;
6 import com.ubermq.jms.common.routing.impl.*;
7 import com.ubermq.kernel.*;
8 import com.ubermq.kernel.event.*;
9 import java.util.*;
10 import java.util.regex.*;
11 import javax.jms.*;
12 import junit.framework.*;
13
14 import javax.jms.Connection JavaDoc;
15 import javax.jms.Queue JavaDoc;
16 import javax.jms.Session JavaDoc;
17 import javax.jms.TopicSession JavaDoc;
18
19 /**
20  * A JUnit test case that exercises significant portions of the
21  * UberMQ system.
22  */

23 public class RegressionTestCase
24     extends TestCase
25 {
26     private static final org.apache.log4j.Logger log =
27         org.apache.log4j.Logger.getLogger(RegressionTestCase.class);
28
29     public static TestSuite suite() {
30         return new TestSuite(RegressionTestCase.class);
31     }
32
/**
 * Creates a test case with the given name.
 *
 * @param sz the name handed to the <code>TestCase</code> constructor
 */
public RegressionTestCase(String sz)
{
    super(sz);
}
36
/**
 * Creates a test case using the fixed name "Regression".
 */
public RegressionTestCase() {
    super("Regression");
}
41
42     private static final int RECV_TIMEOUT = 1000;
43     private static final int SEND_TIMEOUT = 200;
44
45     private TopicConnectionFactory f;
46     private TopicConnection tc1, tc2;
47     private TopicSession JavaDoc ts_client, ts_auto;
48     private Topic theTopic, theTopic2, asyncTopic, contentTopic;
49
50     public void setUp()
51         throws Exception JavaDoc
52     {
53         // connect
54
f = new UnicastConnectionFactory("localhost", 3999);
55         tc1 = f.createTopicConnection();
56         tc2 = f.createTopicConnection();
57         ts_client = tc1.createTopicSession(false, Session.CLIENT_ACKNOWLEDGE);
58         ts_auto = tc2.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
59
60         tc1.start();
61         tc2.start();
62
63         theTopic = ts_client.createTopic(THE_TOPIC);
64         theTopic2 = ts_client.createTopic(THE_TOPIC2);
65         asyncTopic = ts_client.createTopic(ASYNC_TOPIC);
66         contentTopic = ts_client.createTopic("content");
67     }
68
69     // a topic used for general testing.
70
public static final String JavaDoc THE_TOPIC = "TheTopic";
71     public static final String JavaDoc THE_TOPIC2 = "TheTopic2";
72     public static final String JavaDoc ASYNC_TOPIC = "AsyncTopic";
73
74     public void testPubSub()
75         throws Exception JavaDoc
76     {
77         TopicPublisher p = ts_auto.createPublisher(theTopic),
78             p2 = ts_auto.createPublisher(theTopic2);
79         TopicSubscriber s = ts_client.createSubscriber(theTopic),
80             s2 = ts_client.createSubscriber(theTopic2);
81
82         // send 10, receive at least 10
83
sendExactly(ts_auto, p, theTopic, 10);
84         receiveAtLeast(s, 10);
85
86         // send 50, receive exactly 50, s2 gets none
87
sendExactly(ts_auto, p, theTopic, 50);
88         receiveExactly(s, 50);
89         receiveExactly(s2, 0);
90
91         // send 10, receive exactly ten on s2
92
sendExactly(ts_auto, p2, theTopic2, 10);
93         receiveExactly(s2, 10);
94
95         // set up another subscriber on the same cxn and make sure both subs get 10 msgs
96
TopicSubscriber all = ts_client.createSubscriber(theTopic);
97         sendExactly(ts_auto, p, theTopic, 10);
98         receiveExactly(s, 10);
99         receiveExactly(all, 10);
100         all.close();
101
102         // set up another subscriber, noLocal = false, on the pub cxn and make sure
103
// both get 10
104
all = ts_auto.createSubscriber(theTopic, "", false);
105         sendExactly(ts_auto, p, theTopic, 10);
106         receiveExactly(s, 10);
107         receiveExactly(all, 10);
108         all.close();
109
110         // set up another subscriber on a new cxnand make sure
111
// both get 10
112
TopicConnection temp = f.createTopicConnection();
113         all = temp.createTopicSession(false, Session.AUTO_ACKNOWLEDGE).createSubscriber(theTopic);
114         temp.start();
115         sendExactly(ts_auto, p, theTopic, 10);
116         receiveExactly(s, 10);
117         receiveExactly(all, 10);
118         all.close();
119         temp.close();
120
121         // test ordering
122
sendExactly(ts_auto, p, theTopic, 50);
123         receiveOrdered(s, 0, 50);
124
125         // close out
126
s2.close();
127         s.close();
128         p2.close();
129         p.close();
130     }
131
132
133     public void testLongMessage()
134         throws Exception JavaDoc
135     {
136         TopicPublisher p = ts_auto.createPublisher(contentTopic);
137         TopicSubscriber s = ts_client.createSubscriber(contentTopic);
138
139         // std message
140
StringBuffer JavaDoc sb = new StringBuffer JavaDoc();
141         for (int i = 0; i < 80000; i++)
142         {
143             sb.append('h');
144         }
145
146         Message m = ts_auto.createTextMessage(sb.toString());
147         setProps(m);
148         p.publish(m);
149
150         TextMessage rm = (TextMessage)s.receive(RECV_TIMEOUT);
151         Assert.assertTrue(rm.getText().length() == sb.length());
152         Assert.assertEquals(rm.getText(), sb.toString());
153     }
154
155     public void testMessages()
156         throws Exception JavaDoc
157     {
158         TopicPublisher p = ts_auto.createPublisher(contentTopic);
159         TopicSubscriber s = ts_client.createSubscriber(contentTopic);
160
161         // std message
162
Message m = ts_auto.createMessage();
163         setProps(m);
164         p.publish(m,
165                   javax.jms.DeliveryMode.NON_PERSISTENT,
166                   9,
167                   150);
168
169         Message rm = s.receive(RECV_TIMEOUT);
170         verifyProps(rm);
171         Assert.assertEquals(javax.jms.DeliveryMode.NON_PERSISTENT, rm.getJMSDeliveryMode());
172         Assert.assertEquals(9, rm.getJMSPriority());
173
174         int actualTTL = (int)(rm.getJMSExpiration() - rm.getJMSTimestamp());
175         Assert.assertTrue(Math.abs(actualTTL - 150) < 100);
176
177         // bytes
178
BytesMessage bm = ts_auto.createBytesMessage();
179         setProps(bm);
180         bm.writeUTF("some data");
181         bm.writeInt(12345);
182         bm.writeLong(98765L);
183         p.publish(bm);
184
185         BytesMessage rbm = (BytesMessage)s.receive(RECV_TIMEOUT);
186         Assert.assertEquals("some data", rbm.readUTF());
187         Assert.assertEquals(12345, rbm.readInt());
188         Assert.assertEquals(98765L, rbm.readLong());
189         verifyProps(rbm);
190
191         // stream msg
192
// TODO
193

194         // map msg
195
// TODO
196

197         // text msg
198
TextMessage tm = ts_auto.createTextMessage("hola bandito");
199         setProps(tm);
200         p.publish(tm);
201
202         TextMessage rtm = (TextMessage)s.receive(RECV_TIMEOUT);
203         verifyProps(rtm);
204         Assert.assertEquals("hola bandito", rtm.getText());
205
206         // object msg
207
ObjectMessage om = ts_auto.createObjectMessage((java.io.Serializable JavaDoc)
208                                                                           Collections.singleton(new String JavaDoc("this is a collection")));
209         setProps(om);
210         p.publish(om);
211
212         ObjectMessage rom = (ObjectMessage)s.receive(RECV_TIMEOUT);
213         verifyProps(rom);
214         Assert.assertEquals(Collections.singleton(new String JavaDoc("this is a collection")),
215                             rom.getObject());
216
217
218         // done, close.
219
s.close();
220         p.close();
221     }
222
223     private void setProps(Message m)
224         throws Exception JavaDoc
225     {
226         m.setBooleanProperty("bool-true", true);
227         m.setBooleanProperty("bool-false", false);
228         m.setByteProperty("byte", (byte)0xcc);
229         m.setDoubleProperty("double", 45.4545D);
230         m.setFloatProperty("float", 123.456F);
231         m.setIntProperty("int", 98765);
232         m.setLongProperty("long", Long.MAX_VALUE);
233         m.setObjectProperty("object", new String JavaDoc("blah"));
234         m.setShortProperty("short", Short.MAX_VALUE);
235         m.setStringProperty("string", "hello there");
236
237         m.setJMSCorrelationID("My Correlation");
238         m.setJMSReplyTo(ts_client.createTopic("reply"));
239     }
240
241     private void verifyProps(Message m)
242         throws Exception JavaDoc
243     {
244         Assert.assertEquals(true, m.getBooleanProperty("bool-true"));
245         Assert.assertEquals(false, m.getBooleanProperty("bool-false"));
246         Assert.assertEquals((byte)0xcc, m.getByteProperty("byte"));
247         Assert.assertTrue(45.4545D == m.getDoubleProperty("double"));
248         Assert.assertTrue(123.456F == m.getFloatProperty("float"));
249         Assert.assertEquals(98765, m.getIntProperty("int"));
250         Assert.assertEquals(Long.MAX_VALUE, m.getLongProperty("long"));
251         Assert.assertEquals("blah", m.getObjectProperty("object"));
252         Assert.assertEquals(Short.MAX_VALUE, m.getShortProperty("short"));
253         Assert.assertEquals("hello there", m.getStringProperty("string"));
254
255         Assert.assertEquals("My Correlation", m.getJMSCorrelationID());
256         Assert.assertEquals(false, m.getJMSRedelivered());
257         Assert.assertEquals(ts_client.createTopic("reply"), m.getJMSReplyTo());
258     }
259
260
261     public void testStress()
262         throws Exception JavaDoc
263     {
264         doAsync(30, 100);
265     }
266
267     public void testAsyncSubscriber()
268         throws Exception JavaDoc
269     {
270         doAsync(15, 1);
271     }
272
273     private void doAsync(int N, int LOOPS)
274         throws Exception JavaDoc
275     {
276         final SynchronizedInt i = new SynchronizedInt(0);
277
278         TopicPublisher p = ts_auto.createPublisher(asyncTopic);
279         TopicSubscriber s = ts_client.createSubscriber(asyncTopic);
280
281         s.setMessageListener(new MessageListener() {
282                     public void onMessage(Message p0)
283                     {
284                         i.increment();
285                     }
286                 });
287
288         for (int j = 0; j < LOOPS; j++)
289         {
290             sendExactly(ts_auto, p, asyncTopic, N);
291         }
292
293         // wait a little while...
294
Thread.sleep(RECV_TIMEOUT);
295
296         // make sure we got all the messages
297
Assert.assertEquals(N * LOOPS, i.get());
298
299         // go away
300
s.close();
301         p.close();
302     }
303
304     public void testAsyncClose()
305         throws Exception JavaDoc
306     {
307         final TopicConnection tc1 = f.createTopicConnection();
308         final TopicSession JavaDoc ts_client = tc1.createTopicSession(false, Session.CLIENT_ACKNOWLEDGE);
309         final TopicSubscriber s = ts_client.createSubscriber(asyncTopic);
310
311         tc1.start();
312
313         TopicPublisher p = ts_auto.createPublisher(asyncTopic);
314
315         s.setMessageListener(new MessageListener()
316                              {
317                     public void onMessage(Message message)
318                     {
319                         try
320                         {
321                             s.close();
322                             ts_client.close();
323                             tc1.close();
324                             System.out.println("done");
325                         }
326                         catch (JMSException e) {
327                             log.error("", e);;
328                         }
329                     }
330                 });
331
332         p.publish(ts_client.createMessage());
333         p.close();
334     }
335
336     public void testJMS11()
337         throws JMSException
338     {
339         ConnectionFactory f = new UnicastConnectionFactory("localhost");
340         Connection JavaDoc c = f.createConnection(), c2 = f.createConnection();
341
342         Session JavaDoc s = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
343         Topic t = s.createTopic("jms11-topic");
344         Queue JavaDoc q = s.createQueue("jms11-queue");
345
346         MessageProducer mp = s.createProducer(t);
347
348         Session JavaDoc s2 = c2.createSession(false, Session.AUTO_ACKNOWLEDGE);
349         MessageConsumer mc = s2.createConsumer(t);
350
351         c.start();
352         c2.start();
353
354         // send & recv
355
sendExactly(s, mp, t, 50);
356         receiveOrdered(mc, 50);
357
358         // do queue stuff
359
mp.close();
360         mp = s.createProducer(q);
361
362         mc.close();
363         mc = s2.createConsumer(q);
364
365         // drain the queue first.
366
while(mc.receiveNoWait() != null);
367
368         // send & recv
369
sendExactly(s, mp, q, 50);
370         receiveOrdered(mc, 50);
371
372         // close out.
373
mc.close();
374         mp.close();
375         s.close();
376         s2.close();
377         c.close();
378         c2.close();
379     }
380
381     public void testNamespace()
382         throws JMSException
383     {
384         // test single level
385
sendAndReceive(f, "test", "test", 10, 10);
386         sendAndReceive(f, "test", "ubermq", 10, 0);
387
388         // test dots
389
sendAndReceive(f, "test.blah.blah", "test.blah.blah", 10, 10);
390
391         // test that # does not catch things that begin with dot
392
sendAndReceive(f, "howdy", "#", 10, 10);
393         sendAndReceive(f, ".secret", "#", 10, 0);
394         sendAndReceive(f, ".secret", "*", 10, 0);
395
396         // test regex pass thru
397
sendAndReceive(f, "howdy", "~[^x]*", 10, 10);
398         sendAndReceive(f, "ydwoh", "~[howdy]*", 10, 10);
399
400         // test general hierarchical stuff
401
sendAndReceive(f, "test.thisone", "test.*", 10, 10);
402         sendAndReceive(f, "test.thisone", "test.#", 10, 10);
403         sendAndReceive(f, "test.thisone", "test.thisone", 10, 10);
404         sendAndReceive(f, "test.thisone.another", "test.*", 10, 0);
405         sendAndReceive(f, "test.thisone.another", "test.#", 10, 10);
406         sendAndReceive(f, "test..secret", "test.#", 10, 0);
407     }
408
409     public void testSelectors()
410         throws Exception JavaDoc
411     {
412         sendAndReceive(f, "test", "test", "ordinal = 2", 10, 1);
413         sendAndReceive(f, "test2", "test2", "where ordinal != 2", 10, 9);
414
415         // do a more sophisticated selector
416
TopicPublisher pub = ts_auto.createPublisher(contentTopic);
417         TopicSubscriber sub = ts_client.createSubscriber(contentTopic, "string='hello there'", true);
418
419         Message msg = this.ts_auto.createMessage();
420         setProps(msg);
421
422         pub.publish(msg);
423         Assert.assertNotNull(sub.receive(RECV_TIMEOUT));
424         pub.publish(ts_auto.createMessage());
425         Assert.assertNull(sub.receive(RECV_TIMEOUT));
426     }
427
428     /**
429      * A test case to make sure that sender ID and sequence numbers are being
430      * correctly set.
431      */

432     public void testPublishSenderIdentifiers()
433         throws JMSException
434     {
435         Message m = ts_auto.createMessage(), r = null, r2 = null;
436         TopicPublisher p = ts_auto.createPublisher(null);
437         TopicSubscriber s = ts_client.createSubscriber(ts_client.createTopic("#"));
438
439         p.publish(ts_auto.createTopic("A"), m);
440         r = s.receive(RECV_TIMEOUT);
441         p.publish(ts_auto.createTopic("B"), m);
442         r2 = s.receive(RECV_TIMEOUT);
443         Assert.assertTrue(((LocalMessage)r).getSenderId() !=
444                               ((LocalMessage)r2).getSenderId());
445
446         TopicPublisher p2 = ts_auto.createPublisher(ts_auto.createTopic("C"));
447         p2.publish(m);
448         r = s.receive(RECV_TIMEOUT);
449         p2.publish(m);
450         r2 = s.receive(RECV_TIMEOUT);
451
452         Assert.assertEquals(((LocalMessage)r).getSenderId(), ((LocalMessage)r2).getSenderId());
453         Assert.assertEquals(((LocalMessage)r).getSequence() + 1, ((LocalMessage)r2).getSequence());
454
455         s.close();
456         p.close();
457     }
458
459     /**
460      * A test to make sure that receiving a message, then turingin around
461      * and republishing it works as intended.
462      */

463     public void testResending()
464         throws JMSException
465     {
466         Message m = ts_auto.createMessage(), r = null;
467         TopicPublisher p = ts_auto.createPublisher(theTopic),
468             p2 = ts_client.createPublisher(theTopic2);
469         TopicSubscriber s = ts_client.createSubscriber(theTopic),
470             s2 = ts_auto.createSubscriber(theTopic2);
471
472         p.publish(m);
473         r = s.receive(RECV_TIMEOUT);
474
475         // should be the same message
476
Assert.assertEquals(m.getJMSMessageID(), r.getJMSMessageID());
477
478         String JavaDoc originalID = r.getJMSMessageID();
479         p2.publish(r);
480         m = s2.receive(RECV_TIMEOUT);
481
482         // should be the same message, but different from the original ID
483
Assert.assertEquals(m.getJMSMessageID(), r.getJMSMessageID());
484         Assert.assertTrue(!originalID.equals(m.getJMSMessageID()));
485
486         // close out
487
s2.close();
488         s.close();
489         p2.close();
490         p.close();
491     }
492
493     /**
494      * Tests SSL connections and makes sure they work w/ themselves, and others.
495      */

496     public void testSSL()
497         throws JMSException
498     {
499         System.setProperty("javax.net.ssl.trustStore", "bin/sample.keystore");
500         TopicConnectionFactory localFactory = new URLTopicConnectionFactory("ubermqs://localhost"),
501             remoteFactory = new URLTopicConnectionFactory("ubermqs://localhost");
502
503         System.out.println("testPipes");
504         sendAndReceive(localFactory, remoteFactory, "A", "A", null, 20, 20);
505         sendAndReceive(localFactory, "hello", "hello", null, 20, 20);
506         sendAndReceive(localFactory, "B", "B", "where ordinal != 2", 10, 9);
507         sendAndReceive(remoteFactory, localFactory, "B", "B", null, 20, 20);
508     }
509     /**
510      * Tests queue APIs.
511      */

512     public void testQueues()
513         throws Exception JavaDoc
514     {
515         QueueConnectionFactory f = new UnicastConnectionFactory("localhost");
516
517         javax.jms.QueueConnection JavaDoc qc = f.createQueueConnection();
518         javax.jms.QueueSession JavaDoc qs = qc.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
519         javax.jms.QueueSender JavaDoc sender = qs.createSender(qs.createQueue("my-queue"));
520         qc.start();
521
522         javax.jms.QueueConnection JavaDoc qc2 = f.createQueueConnection();
523         javax.jms.QueueSession JavaDoc qs2 = qc2.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
524         javax.jms.QueueReceiver JavaDoc r = qs2.createReceiver(qs2.createQueue("my-queue"));
525         qc2.start();
526
527         Message m = qs.createMessage();
528
529         // send & recv
530
sender.send(m);
531         Assert.assertNotNull(r.receive(RECV_TIMEOUT));
532     }
533
534     /**
535      * Tests overflow handling for catatonic message listeners. This is named
536      * particuarly so that it does not get called in standard regression tests.
537      */

538     public void xtestOverflows()
539         throws Exception JavaDoc
540     {
541         TopicConnection tc1 = f.createTopicConnection();
542         TopicConnection tc2 = f.createTopicConnection();
543         TopicSession JavaDoc ts_client = tc1.createTopicSession(false, Session.CLIENT_ACKNOWLEDGE);
544         TopicSession JavaDoc ts_client2 = tc1.createTopicSession(false, Session.CLIENT_ACKNOWLEDGE);
545         TopicSession JavaDoc ts_auto = tc2.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
546         Topic overflowTopic = ts_auto.createTopic("F");
547         TopicSubscriber tsub = ts_client.createSubscriber(overflowTopic );
548         TopicSubscriber tsub2 = ts_client2.createSubscriber(overflowTopic );
549         TopicPublisher tpub = ts_auto.createPublisher(overflowTopic );
550
551         tc1.start();
552         tc2.start();
553
554         // a boolean indicating which mode we are in.
555
final SynchronizedBoolean assertThisValue = new SynchronizedBoolean(true);
556         final SynchronizedBoolean shouldBlock = new SynchronizedBoolean(true);
557         final Object JavaDoc bogusLock = new Object JavaDoc();
558
559         // create a badly behaved subscriber
560
// never do in production code, obviously!
561
tsub.setMessageListener(new MessageListener() {
562                     public void onMessage(Message p0)
563                     {
564                         try
565                         {
566                             if (shouldBlock.get()) {
567                                 synchronized(bogusLock) {
568                                     bogusLock.wait();
569                                 }
570                             }
571                             p0.acknowledge();
572                         }
573                         catch (Exception JavaDoc e) {
574                             log.error("", e);;
575                         }
576                     }
577                 });
578         tsub2.setMessageListener(new MessageListener() {
579                     public void onMessage(Message p0)
580                     {
581                         Assert.assertTrue(assertThisValue.get());
582                     }
583                 });
584
585         // bah ok... send a bunch of messages
586
sendExactly(ts_auto, tpub, overflowTopic, 40000);
587         System.out.println("just finished sending");
588
589         // let the listener proceed, and wait a while for the
590
// enqueued messages to drain. this is ugly, but hey, it's test code.
591
shouldBlock.set(false);
592         synchronized(bogusLock) {
593             bogusLock.notifyAll();
594         }
595         try
596         {
597             Thread.sleep(10000);
598         }
599         catch (InterruptedException JavaDoc e)
600         {
601             log.error("", e);;
602         }
603
604         // at this point, the overflow manager should have
605
// killed the receiver connection, so that the
606
// message server remains viable.
607
// in addition, the second subscriber should receive no
608
// messages as we are operating on a connection level.
609
assertThisValue.set(false);
610         sendAndReceive(f, overflowTopic.getTopicName(), overflowTopic.getTopicName(), 20, 20);
611
612         // close
613
tsub.close();
614         tpub.close();
615     }
616
617     /**
618      * Test event handling.
619      */

620     public void testEvents()
621         throws Exception JavaDoc
622     {
623         final SynchronizedInt i = new SynchronizedInt(0);
624
625         ConnectionEventListener l = new ConnectionEventListener() {
626             public void connectionEvent(ConnectionEvent e)
627             {
628                 if (e.getEventCode() == ConnectionEvent.CONNECTION_CLOSED)
629                     i.increment();
630             }
631         }, l2 = new ConnectionEventListener() {
632
633             public void connectionEvent(ConnectionEvent e)
634             {
635                 if (e.getEventCode() == ConnectionEvent.CONNECTION_CLOSED)
636                     i.decrement();
637             }
638         };
639
640         TopicConnection tc1 = f.createTopicConnection();
641         ((IConnectionInfo)tc1).addEventListener(l);
642         tc1.close();
643         Assert.assertEquals(i.get(), 1);
644         i.set(0);
645
646         tc1 = f.createTopicConnection();
647         ((IConnectionInfo)tc1).addEventListener(l);
648         ((IConnectionInfo)tc1).addEventListener(l2);
649         tc1.close();
650         Assert.assertEquals(i.get(), 0);
651         i.set(0);
652
653         tc1 = f.createTopicConnection();
654         ((IConnectionInfo)tc1).addEventListener(l);
655         ((IConnectionInfo)tc1).addEventListener(l2);
656         ((IConnectionInfo)tc1).removeEventListener(l2);
657         tc1.close();
658         Assert.assertEquals(i.get(), 1);
659         i.set(0);
660
661     }
662
663     public void testMulticast()
664         throws Exception JavaDoc
665     {
666         TopicConnectionFactory mcf = new MulticastTopicConnectionFactory("234.2.3.4", 1234);
667
668         sendAndReceive(mcf, "hello", "hello", null, 20, 20);
669         sendAndReceive(mcf, "A", "A", null, 20, 20);
670         sendAndReceive(mcf, "B", "B", "where ordinal != 2", 10, 9);
671         sendAndReceive(mcf, "B", "B", "where ordinal = 2", 10, 1);
672         sendAndReceive(mcf, "B", "B", null, 20, 20);
673     }
674
675     public static void sendAndReceive(TopicConnectionFactory f,
676                                       String JavaDoc pub,
677                                       String JavaDoc sub,
678                                       int send,
679                                       int recv)
680         throws JMSException
681     {
682         sendAndReceive(f, pub, sub, null, send, recv);
683     }
684
685     public static void sendAndReceive(TopicConnectionFactory f,
686                                       String JavaDoc pub,
687                                       String JavaDoc sub,
688                                       String JavaDoc selector,
689                                       int send,
690                                       int recv)
691         throws JMSException
692     {
693         sendAndReceive(f, f, pub, sub, selector, send, recv);
694     }
695
696     public static void sendAndReceive(TopicConnectionFactory f,
697                                       TopicConnectionFactory g,
698                                       String JavaDoc pub,
699                                       String JavaDoc sub,
700                                       String JavaDoc selector,
701                                       int send,
702                                       int recv)
703         throws JMSException
704     {
705         TopicConnection tc1 = f.createTopicConnection();
706         TopicConnection tc2 = g.createTopicConnection();
707         TopicSession JavaDoc ts_client = tc1.createTopicSession(false, Session.CLIENT_ACKNOWLEDGE);
708         TopicSession JavaDoc ts_auto = tc2.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
709
710         TopicPublisher p = ts_auto.createPublisher(ts_auto.createTopic(pub));
711         TopicSubscriber s = ts_client.createSubscriber(ts_client.createTopic(sub),
712                                                        selector,
713                                                        true);
714
715         tc1.start();
716         tc2.start();
717
718         sendExactly(ts_auto, p, ts_auto.createTopic(pub), send);
719
720         // we can assume ordering holds, unless tehre is a selector.
721
if (selector == null && recv > 0)
722             receiveOrdered(s, recv);
723         else receiveExactly(s, recv);
724
725         s.close();
726         p.close();
727         ts_client.close();
728         ts_auto.close();
729         tc1.close();
730         tc2.close();
731     }
732
733     public static void sendExactly(Session JavaDoc s,
734                                    MessageProducer p,
735                                    Destination t,
736                                    int n)
737         throws JMSException
738     {
739         for (int i = 0; i < n; i++)
740         {
741             Message m = s.createMessage();
742             m.setIntProperty("ordinal", i);
743             p.send(t, m);
744
745             if ((i % 1000) == 0 && i > 0)
746                 System.out.println("sendExactly " + i);
747         }
748
749         try
750         {
751             Thread.sleep(SEND_TIMEOUT);
752         }
753         catch (InterruptedException JavaDoc e) {}
754     }
755
756     public static void receiveExactly(MessageConsumer s,
757                                       int n)
758         throws JMSException
759     {
760         receiveAtLeast(s, n);
761         Assert.assertNull(s.receiveNoWait());
762     }
763
764     public static void receiveAtLeast(MessageConsumer s,
765                                       int n)
766         throws JMSException
767     {
768         for (int i = 0; i < n; i++)
769         {
770             Message m = s.receive(RECV_TIMEOUT);
771             Assert.assertNotNull(m);
772             m.acknowledge();
773             log.debug("got message " + i + " ordinal was " + m.getIntProperty("ordinal"));
774         }
775     }
776
777     public static void receiveOrdered(MessageConsumer s,
778                                       int n)
779         throws JMSException
780     {
781         if (n >= 1) {
782             Message m = s.receive(RECV_TIMEOUT);
783             Assert.assertNotNull(m);
784             m.acknowledge();
785             receiveOrdered(s, m.getIntProperty("ordinal")+1, n-1);
786         }
787     }
788
789     public static void receiveOrdered(MessageConsumer s,
790                                       int startingAt,
791                                       int n)
792         throws JMSException
793     {
794         int lastSequence=0;
795
796         for (int i = 0; i < n; i++)
797         {
798             Message m = s.receive(RECV_TIMEOUT);
799             Assert.assertNotNull(m);
800             Assert.assertEquals(i + startingAt, m.getIntProperty("ordinal"));
801             Assert.assertTrue( ((LocalMessage)m).getSequence() > lastSequence );
802             m.acknowledge();
803
804             lastSequence = ((LocalMessage)m).getSequence();
805         }
806     }
807 }
808
Popular Tags