KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jboss > test > jbossmq > test > UnackedUnitTestCase


/*
 * JBoss, Home of Professional Open Source
 * Copyright 2005, JBoss Inc., and individual contributors as indicated
 * by the @authors tag. See the copyright.txt in the distribution for a
 * full listing of individual contributors.
 *
 * This is free software; you can redistribute it and/or modify it
 * under the terms of the GNU Lesser General Public License as
 * published by the Free Software Foundation; either version 2.1 of
 * the License, or (at your option) any later version.
 *
 * This software is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this software; if not, write to the Free
 * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
 * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
 */

package org.jboss.test.jbossmq.test;

import javax.jms.BytesMessage;
import javax.jms.DeliveryMode;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.naming.Context;

import junit.framework.Test;
import junit.framework.TestSuite;

import org.jboss.logging.Logger;
import org.jboss.test.JBossTestCase;

49 /**
50  * Rollback tests
51  *
52  * @author
53  * @version $Revision: 58115 $
54  */

55 public class UnackedUnitTestCase extends JBossTestCase
56 {
57    // Provider specific
58
static String JavaDoc TOPIC_FACTORY = "ConnectionFactory";
59    static String JavaDoc QUEUE_FACTORY = "ConnectionFactory";
60
61    static String JavaDoc TEST_QUEUE = "queue/testQueue";
62    static String JavaDoc TEST_TOPIC = "topic/testTopic";
63    static String JavaDoc TEST_DURABLE_TOPIC = "topic/testDurableTopic";
64
65    static byte[] PAYLOAD = new byte[10];
66
67    static Context JavaDoc context;
68    static QueueConnection JavaDoc queueConnection;
69    static TopicConnection JavaDoc topicConnection;
70    static TopicConnection JavaDoc topicDurableConnection;
71
72    public static Test suite() throws Exception JavaDoc
73    {
74       // JBAS-3580, the execution order of tests in this test case is important
75
// so it must be defined explicitly when running under some JVMs
76
TestSuite suite = new TestSuite();
77       suite.addTest(new UnackedUnitTestCase("testUnackedQueue"));
78       suite.addTest(new UnackedUnitTestCase("testUnackedMultipleSession"));
79       suite.addTest(new UnackedUnitTestCase("testUnackedMultipleConnection"));
80       suite.addTest(new UnackedUnitTestCase("testUnackedTopic"));
81       suite.addTest(new UnackedUnitTestCase("testUnackedDurableTopic"));
82       suite.addTest(new UnackedUnitTestCase("testDummyLast"));
83       
84       ClassLoader JavaDoc loader = Thread.currentThread().getContextClassLoader();
85       String JavaDoc module = loader.getResource("messaging/test-destinations-service.xml").toString();
86
87       return getDeploySetup(suite, module);
88    }
89    
90    /**
91     * Constructor the test
92     *
93     * @param name Description of Parameter
94     * @exception Exception Description of Exception
95     */

96    public UnackedUnitTestCase(String JavaDoc name) throws Exception JavaDoc
97    {
98       super(name);
99    }
100
101    /**
102     * #Description of the Method
103     *
104     * @param persistence Description of Parameter
105     * @exception Exception Description of Exception
106     */

107    public void runUnackedQueue(final int persistence) throws Exception JavaDoc
108    {
109       drainQueue();
110
111       final int iterationCount = getIterationCount();
112
113       QueueSession JavaDoc session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
114       Queue JavaDoc queue = (Queue JavaDoc)context.lookup(TEST_QUEUE);
115
116       QueueSender JavaDoc sender = session.createSender(queue);
117
118       Message JavaDoc message = session.createBytesMessage();
119       ((BytesMessage JavaDoc)message).writeBytes(PAYLOAD);
120
121       for (int i = 0; i < iterationCount; i++)
122          sender.send(message, persistence, 4, 0);
123
124       session.close();
125
126       session = queueConnection.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE);
127       queue = (Queue JavaDoc)context.lookup(TEST_QUEUE);
128       QueueReceiver JavaDoc receiver = session.createReceiver(queue);
129       queueConnection.start();
130       message = receiver.receive(50);
131       int c = 0;
132       while (message != null)
133       {
134          message = receiver.receive(50);
135          c++;
136       }
137       assertTrue("Should have received all data unacked", c == iterationCount);
138
139       queueConnection.close();
140       QueueConnectionFactory JavaDoc queueFactory = (QueueConnectionFactory JavaDoc)context.lookup(QUEUE_FACTORY);
141       queueConnection = queueFactory.createQueueConnection();
142
143       assertTrue("Queue should be full", drainQueue() == iterationCount);
144
145    }
146
147    /**
148     * #Description of the Method
149     *
150     * @param persistence Description of Parameter
151     * @exception Exception Description of Exception
152     */

153    public void runUnackedMultipleSession(final int persistence) throws Exception JavaDoc
154    {
155       drainQueue();
156
157       final int iterationCount = getIterationCount();
158
159       QueueSession JavaDoc session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
160       Queue JavaDoc queue = (Queue JavaDoc)context.lookup(TEST_QUEUE);
161
162       QueueSender JavaDoc sender = session.createSender(queue);
163
164       Message JavaDoc message = session.createBytesMessage();
165       ((BytesMessage JavaDoc)message).writeBytes(PAYLOAD);
166
167       for (int i = 0; i < iterationCount; i++)
168          sender.send(message, persistence, 4, 0);
169
170       session.close();
171
172       QueueSession JavaDoc session1 = queueConnection.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE);
173       queue = (Queue JavaDoc)context.lookup(TEST_QUEUE);
174       QueueReceiver JavaDoc receiver1 = session1.createReceiver(queue);
175       QueueSession JavaDoc session2 = queueConnection.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE);
176       QueueReceiver JavaDoc receiver2 = session2.createReceiver(queue);
177       queueConnection.start();
178
179       // Read half from session1
180
int c = 0;
181       for (int l = 0; l < iterationCount/2; l++)
182       {
183          message = receiver1.receive(50);
184          if (message != null)
185             c++;
186       }
187       assertTrue("Should have received half data unacked", c == iterationCount/2);
188
189       // Read the rest from session2
190
c = 0;
191       Message JavaDoc lastMessage = null;
192       while (message != null)
193       {
194          message = receiver2.receive(50);
195          if (message != null)
196          {
197             c++;
198             lastMessage = message;
199          }
200       }
201       assertTrue("Should have received all data unacked", c == iterationCount - iterationCount/2);
202
203       // Close session1, the messages are unacked and should go back in the queue
204
session1.close();
205
206       // Acknowledge messages on session2 and close it
207
lastMessage.acknowledge();
208       session2.close();
209
210       queueConnection.stop();
211
212       assertTrue("Session1 messages should be available", drainQueue() == iterationCount/2);
213
214    }
215
216    /**
217     * #Description of the Method
218     *
219     * @param persistence Description of Parameter
220     * @exception Exception Description of Exception
221     */

222    public void runUnackedMultipleConnection(final int persistence) throws Exception JavaDoc
223    {
224       drainQueue();
225
226       final int iterationCount = getIterationCount();
227
228       QueueSession JavaDoc session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
229       Queue JavaDoc queue = (Queue JavaDoc)context.lookup(TEST_QUEUE);
230
231       QueueSender JavaDoc sender = session.createSender(queue);
232
233       Message JavaDoc message = session.createBytesMessage();
234       ((BytesMessage JavaDoc)message).writeBytes(PAYLOAD);
235
236       for (int i = 0; i < iterationCount; i++)
237          sender.send(message, persistence, 4, 0);
238
239       session.close();
240
241       QueueConnectionFactory JavaDoc queueFactory = (QueueConnectionFactory JavaDoc)context.lookup(QUEUE_FACTORY);
242       QueueConnection JavaDoc queueConnection1 = queueFactory.createQueueConnection();
243       QueueSession JavaDoc session1 = queueConnection1.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE);
244       queue = (Queue JavaDoc)context.lookup(TEST_QUEUE);
245       QueueReceiver JavaDoc receiver1 = session1.createReceiver(queue);
246
247       QueueConnection JavaDoc queueConnection2 = queueFactory.createQueueConnection();
248       QueueSession JavaDoc session2 = queueConnection2.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE);
249       QueueReceiver JavaDoc receiver2 = session2.createReceiver(queue);
250
251       queueConnection1.start();
252       queueConnection2.start();
253
254       // Read half from session1
255
int c = 0;
256       for (int l = 0; l < iterationCount/2; l++)
257       {
258          message = receiver1.receive(50);
259          if (message != null)
260             c++;
261       }
262       assertTrue("Should have received half data unacked", c == iterationCount/2);
263
264       // Read the rest from session2
265
Message JavaDoc lastMessage = null;
266       c = 0;
267       while (message != null)
268       {
269          message = receiver2.receive(50);
270          if (message != null)
271          {
272             c++;
273             lastMessage = message;
274          }
275       }
276       assertTrue("Should have received all data unacked", c == iterationCount - iterationCount/2);
277
278       // Close session1, the messages are unacked and should go back in the queue
279
queueConnection1.close();
280
281       // Acknowledge messages for connection 2 and close it
282
lastMessage.acknowledge();
283       queueConnection2.close();
284
285       assertTrue("Connection1 messages should be available", drainQueue() == iterationCount/2);
286
287    }
288
289    /**
290     * #Description of the Method
291     *
292     * @param persistence Description of Parameter
293     * @exception Exception Description of Exception
294     */

295    public void runUnackedTopic(final int persistence) throws Exception JavaDoc
296    {
297       drainQueue();
298       drainTopic();
299
300       final int iterationCount = getIterationCount();
301       final Logger log = getLog();
302
303       Thread JavaDoc sendThread =
304          new Thread JavaDoc()
305          {
306             public void run()
307             {
308                try
309                {
310
311                   TopicSession JavaDoc session = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
312                   Topic JavaDoc topic = (Topic JavaDoc)context.lookup(TEST_TOPIC);
313
314                   TopicPublisher JavaDoc publisher = session.createPublisher(topic);
315
316                   waitForSynchMessage();
317
318                   BytesMessage JavaDoc message = session.createBytesMessage();
319                   message.writeBytes(PAYLOAD);
320
321                   for (int i = 0; i < iterationCount; i++)
322                   {
323                      publisher.publish(message, persistence, 4, 0);
324                   }
325
326                   session.close();
327                }
328                catch (Exception JavaDoc e)
329                {
330                   log.error("error", e);
331                }
332             }
333          };
334
335       TopicSession JavaDoc session = topicConnection.createTopicSession(false, Session.CLIENT_ACKNOWLEDGE);
336       Topic JavaDoc topic = (Topic JavaDoc)context.lookup(TEST_TOPIC);
337       TopicSubscriber JavaDoc subscriber = session.createSubscriber(topic);
338
339
340       MyMessageListener listener = new MyMessageListener(iterationCount, log);
341
342       queueConnection.start();
343       sendThread.start();
344       subscriber.setMessageListener(listener);
345       topicConnection.start();
346       sendSynchMessage();
347       synchronized (listener)
348       {
349          if (listener.i < iterationCount)
350             listener.wait();
351       }
352       sendThread.join();
353       topicConnection.close();
354       TopicConnectionFactory JavaDoc topicFactory = (TopicConnectionFactory JavaDoc)context.lookup(TOPIC_FACTORY);
355       topicConnection = topicFactory.createTopicConnection();
356       queueConnection.stop();
357       assertTrue("Topic should be empty", drainTopic() == 0);
358    }
359
360    /**
361     * #Description of the Method
362     *
363     * @param persistence Description of Parameter
364     * @exception Exception Description of Exception
365     */

366    public void runUnackedDurableTopic(final int persistence) throws Exception JavaDoc
367    {
368       drainQueue();
369       drainDurableTopic();
370
371       final int iterationCount = getIterationCount();
372       final Logger log = getLog();
373
374       Thread JavaDoc sendThread =
375          new Thread JavaDoc()
376          {
377             public void run()
378             {
379                try
380                {
381
382                   TopicSession JavaDoc session = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
383                   Topic JavaDoc topic = (Topic JavaDoc)context.lookup(TEST_DURABLE_TOPIC);
384
385                   TopicPublisher JavaDoc publisher = session.createPublisher(topic);
386
387                   waitForSynchMessage();
388
389                   BytesMessage JavaDoc message = session.createBytesMessage();
390                   message.writeBytes(PAYLOAD);
391
392                   for (int i = 0; i < iterationCount; i++)
393                   {
394                      publisher.publish(message, persistence, 4, 0);
395                   }
396
397                   session.close();
398                }
399                catch (Exception JavaDoc e)
400                {
401                   log.error("error", e);
402                }
403             }
404          };
405
406       TopicSession JavaDoc session = topicDurableConnection.createTopicSession(false, Session.CLIENT_ACKNOWLEDGE);
407       Topic JavaDoc topic = (Topic JavaDoc)context.lookup(TEST_DURABLE_TOPIC);
408       TopicSubscriber JavaDoc subscriber = session.createDurableSubscriber(topic, "test");
409
410       MyMessageListener listener = new MyMessageListener(iterationCount, log);
411
412       queueConnection.start();
413       sendThread.start();
414       subscriber.setMessageListener(listener);
415       topicDurableConnection.start();
416       sendSynchMessage();
417       synchronized (listener)
418       {
419          if (listener.i < iterationCount)
420             listener.wait();
421       }
422
423       sendThread.join();
424       topicDurableConnection.close();
425       TopicConnectionFactory JavaDoc topicFactory = (TopicConnectionFactory JavaDoc)context.lookup(TOPIC_FACTORY);
426       topicDurableConnection = topicFactory.createTopicConnection("john", "needle");
427       queueConnection.stop();
428       assertTrue("Topic should be full", drainDurableTopic() == iterationCount);
429    }
430
431    /**
432     * A unit test for JUnit
433     *
434     * @exception Exception Description of Exception
435     */

436    public void testUnackedQueue() throws Exception JavaDoc
437    {
438
439       getLog().debug("Starting UnackedQueue test");
440
441       runUnackedQueue(DeliveryMode.NON_PERSISTENT);
442       runUnackedQueue(DeliveryMode.PERSISTENT);
443
444       getLog().debug("UnackedQueue passed");
445    }
446
447    /**
448     * A unit test for JUnit
449     *
450     * @exception Exception Description of Exception
451     */

452    public void testUnackedMultipleSession() throws Exception JavaDoc
453    {
454
455       getLog().debug("Starting UnackedMultipleSession test");
456
457       runUnackedMultipleSession(DeliveryMode.NON_PERSISTENT);
458       runUnackedMultipleSession(DeliveryMode.PERSISTENT);
459
460       getLog().debug("UnackedMultipleSession passed");
461    }
462
463    /**
464     * A unit test for JUnit
465     *
466     * @exception Exception Description of Exception
467     */

468    public void testUnackedMultipleConnection() throws Exception JavaDoc
469    {
470
471       getLog().debug("Starting UnackedMultipleConnection test");
472
473       runUnackedMultipleConnection(DeliveryMode.NON_PERSISTENT);
474       runUnackedMultipleConnection(DeliveryMode.PERSISTENT);
475
476       getLog().debug("UnackedMultipleConnection passed");
477    }
478
479    /**
480     * A unit test for JUnit
481     *
482     * @exception Exception Description of Exception
483     */

484    public void testUnackedTopic() throws Exception JavaDoc
485    {
486
487       getLog().debug("Starting UnackedTopic test");
488
489       runUnackedTopic(DeliveryMode.NON_PERSISTENT);
490       runUnackedTopic(DeliveryMode.PERSISTENT);
491
492       getLog().debug("UnackedTopic passed");
493    }
494
495    /**
496     * A unit test for JUnit
497     *
498     * @exception Exception Description of Exception
499     */

500    public void testUnackedDurableTopic() throws Exception JavaDoc
501    {
502
503       getLog().debug("Starting UnackedDurableTopic test");
504
505       runUnackedDurableTopic(DeliveryMode.NON_PERSISTENT);
506       runUnackedDurableTopic(DeliveryMode.PERSISTENT);
507
508       getLog().debug("UnackedDurableTopic passed");
509    }
510
511    /**
512     * A unit test for JUnit
513     *
514     * @exception Exception Description of Exception
515     */

516    public void testDummyLast() throws Exception JavaDoc
517    {
518
519       TopicSession JavaDoc session = topicDurableConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
520       session.unsubscribe("test");
521
522       queueConnection.close();
523       topicConnection.close();
524       topicDurableConnection.close();
525    }
526
527    /**
528     * The JUnit setup method
529     *
530     * @exception Exception Description of Exception
531     */

532    protected void setUp() throws Exception JavaDoc
533    {
534       if (context == null)
535       {
536          context = getInitialContext();
537
538          QueueConnectionFactory JavaDoc queueFactory = (QueueConnectionFactory JavaDoc)context.lookup(QUEUE_FACTORY);
539          queueConnection = queueFactory.createQueueConnection();
540
541          TopicConnectionFactory JavaDoc topicFactory = (TopicConnectionFactory JavaDoc)context.lookup(TOPIC_FACTORY);
542          topicConnection = topicFactory.createTopicConnection();
543          topicDurableConnection = topicFactory.createTopicConnection("john", "needle");
544
545          getLog().debug("Connection to JBossMQ established.");
546       }
547    }
548
549    // Emptys out all the messages in a queue
550
private int drainQueue() throws Exception JavaDoc
551    {
552       getLog().debug("Draining Queue");
553       queueConnection.start();
554
555       QueueSession JavaDoc session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
556       Queue JavaDoc queue = (Queue JavaDoc)context.lookup(TEST_QUEUE);
557
558       QueueReceiver JavaDoc receiver = session.createReceiver(queue);
559       Message JavaDoc message = receiver.receive(1000);
560       int c = 0;
561       while (message != null)
562       {
563          message = receiver.receive(1000);
564          c++;
565       }
566
567       getLog().debug(" Drained " + c + " messages from the queue");
568
569       session.close();
570
571       queueConnection.stop();
572
573       return c;
574    }
575
576    // Emptys out all the messages in a topic
577
private int drainTopic() throws Exception JavaDoc
578    {
579       getLog().debug("Draining Topic");
580       topicConnection.start();
581
582       final TopicSession JavaDoc session = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
583       Topic JavaDoc topic = (Topic JavaDoc)context.lookup(TEST_TOPIC);
584       TopicSubscriber JavaDoc subscriber = session.createSubscriber(topic);
585
586       Message JavaDoc message = subscriber.receive(1000);
587       int c = 0;
588       while (message != null)
589       {
590          message = subscriber.receive(1000);
591          c++;
592       }
593
594       getLog().debug(" Drained " + c + " messages from the topic");
595
596       session.close();
597
598       topicConnection.stop();
599
600       return c;
601    }
602
603    // Emptys out all the messages in a durable topic
604
private int drainDurableTopic() throws Exception JavaDoc
605    {
606       getLog().debug("Draining Durable Topic");
607       topicDurableConnection.start();
608
609       final TopicSession JavaDoc session = topicDurableConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
610       Topic JavaDoc topic = (Topic JavaDoc)context.lookup(TEST_DURABLE_TOPIC);
611       TopicSubscriber JavaDoc subscriber = session.createDurableSubscriber(topic, "test");
612
613       Message JavaDoc message = subscriber.receive(1000);
614       int c = 0;
615       while (message != null)
616       {
617          message = subscriber.receive(1000);
618          c++;
619       }
620
621       getLog().debug(" Drained " + c + " messages from the durable topic");
622
623       session.close();
624
625       topicDurableConnection.stop();
626
627       return c;
628    }
629
630    private void waitForSynchMessage() throws Exception JavaDoc
631    {
632       getLog().debug("Waiting for Synch Message");
633       QueueSession JavaDoc session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
634       Queue JavaDoc queue = (Queue JavaDoc)context.lookup(TEST_QUEUE);
635
636       QueueReceiver JavaDoc receiver = session.createReceiver(queue);
637       receiver.receive();
638       session.close();
639       getLog().debug("Got Synch Message");
640    }
641
642    private void sendSynchMessage() throws Exception JavaDoc
643    {
644       getLog().debug("Sending Synch Message");
645       QueueSession JavaDoc session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
646       Queue JavaDoc queue = (Queue JavaDoc)context.lookup(TEST_QUEUE);
647
648       QueueSender JavaDoc sender = session.createSender(queue);
649
650       Message JavaDoc message = session.createMessage();
651       sender.send(message);
652
653       session.close();
654       getLog().debug("Sent Synch Message");
655    }
656
657    public class MyMessageListener
658       implements MessageListener JavaDoc
659    {
660       public int i = 0;
661
662       public int iterationCount;
663
664       public Logger log;
665
666       public MyMessageListener(int iterationCount, Logger log)
667       {
668          this.iterationCount = iterationCount;
669          this.log = log;
670       }
671
672       public void onMessage(Message JavaDoc message)
673       {
674          synchronized (this)
675          {
676             i++;
677             log.debug("Got message " + i);
678             if (i >= iterationCount)
679                this.notify();
680          }
681       }
682    }
683
684    public int getIterationCount()
685    {
686       return 5;
687    }
688 }
689
Popular Tags