// KickJava: Java API By Example, From Geeks To Geeks.
//
// Java > Open Source Codes > org > jboss > test > jbossmessaging > test > UnackedUnitTestCase


/*
 * JBoss, Home of Professional Open Source
 * Copyright 2005, JBoss Inc., and individual contributors as indicated
 * by the @authors tag. See the copyright.txt in the distribution for a
 * full listing of individual contributors.
 *
 * This is free software; you can redistribute it and/or modify it
 * under the terms of the GNU Lesser General Public License as
 * published by the Free Software Foundation; either version 2.1 of
 * the License, or (at your option) any later version.
 *
 * This software is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this software; if not, write to the Free
 * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
 * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
 */

package org.jboss.test.jbossmessaging.test;

import java.net.URL;

import javax.jms.BytesMessage;
import javax.jms.DeliveryMode;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.naming.Context;

import junit.framework.Test;
import junit.framework.TestSuite;

import org.jboss.logging.Logger;
import org.jboss.test.jbossmessaging.JMSTestCase;
48
49 /**
50  * Rollback tests
51  *
52  * @author <a HREF="mailto:richard.achmatowicz@jboss.com">Richard Achmatowicz</a>
53  * @author
54  * @version $Revision: 56718 $
55  */

56 public class UnackedUnitTestCase extends JMSTestCase
57 {
58    // Provider specific
59
static String JavaDoc TOPIC_FACTORY = "ConnectionFactory";
60    static String JavaDoc QUEUE_FACTORY = "ConnectionFactory";
61
62    static String JavaDoc TEST_QUEUE = "queue/testQueue";
63    static String JavaDoc TEST_TOPIC = "topic/testTopic";
64    static String JavaDoc TEST_DURABLE_TOPIC = "topic/testDurableTopic";
65
66    static byte[] PAYLOAD = new byte[10];
67
68    static Context JavaDoc context;
69    static QueueConnection JavaDoc queueConnection;
70    static TopicConnection JavaDoc topicConnection;
71    static TopicConnection JavaDoc topicDurableConnection;
72
73    public static Test suite() throws Exception JavaDoc
74    {
75       // JBAS-3580, the execution order of tests in this test case is important
76
// so it must be defined explicitly when running under some JVMs
77
TestSuite suite = new TestSuite();
78       suite.addTest(new UnackedUnitTestCase("testUnackedQueue"));
79       suite.addTest(new UnackedUnitTestCase("testUnackedMultipleSession"));
80       suite.addTest(new UnackedUnitTestCase("testUnackedMultipleConnection"));
81       suite.addTest(new UnackedUnitTestCase("testUnackedTopic"));
82       suite.addTest(new UnackedUnitTestCase("testUnackedDurableTopic"));
83       suite.addTest(new UnackedUnitTestCase("testDummyLast"));
84       
85       ClassLoader JavaDoc loader = Thread.currentThread().getContextClassLoader();
86       String JavaDoc resourceName = getJMSResourceRelativePathname("test-destinations-service.xml") ;
87       String JavaDoc module = loader.getResource(resourceName).toString();
88
89       return getDeploySetup(suite, module);
90    }
91    
92    /**
93     * Constructor the test
94     *
95     * @param name Description of Parameter
96     * @exception Exception Description of Exception
97     */

98    public UnackedUnitTestCase(String JavaDoc name) throws Exception JavaDoc
99    {
100       super(name);
101    }
102
103    /**
104     * #Description of the Method
105     *
106     * @param persistence Description of Parameter
107     * @exception Exception Description of Exception
108     */

109    public void runUnackedQueue(final int persistence) throws Exception JavaDoc
110    {
111       drainQueue();
112
113       final int iterationCount = getIterationCount();
114
115       QueueSession JavaDoc session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
116       Queue JavaDoc queue = (Queue JavaDoc)context.lookup(TEST_QUEUE);
117
118       QueueSender JavaDoc sender = session.createSender(queue);
119
120       Message JavaDoc message = session.createBytesMessage();
121       ((BytesMessage JavaDoc)message).writeBytes(PAYLOAD);
122
123       for (int i = 0; i < iterationCount; i++)
124          sender.send(message, persistence, 4, 0);
125
126       session.close();
127
128       session = queueConnection.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE);
129       queue = (Queue JavaDoc)context.lookup(TEST_QUEUE);
130       QueueReceiver JavaDoc receiver = session.createReceiver(queue);
131       queueConnection.start();
132       message = receiver.receive(50);
133       int c = 0;
134       while (message != null)
135       {
136          message = receiver.receive(50);
137          c++;
138       }
139       assertTrue("Should have received all data unacked", c == iterationCount);
140
141       queueConnection.close();
142       QueueConnectionFactory JavaDoc queueFactory = (QueueConnectionFactory JavaDoc)context.lookup(QUEUE_FACTORY);
143       queueConnection = queueFactory.createQueueConnection();
144
145       assertTrue("Queue should be full", drainQueue() == iterationCount);
146
147    }
148
149    /**
150     * #Description of the Method
151     *
152     * @param persistence Description of Parameter
153     * @exception Exception Description of Exception
154     */

155    public void runUnackedMultipleSession(final int persistence) throws Exception JavaDoc
156    {
157       drainQueue();
158
159       final int iterationCount = getIterationCount();
160
161       QueueSession JavaDoc session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
162       Queue JavaDoc queue = (Queue JavaDoc)context.lookup(TEST_QUEUE);
163
164       QueueSender JavaDoc sender = session.createSender(queue);
165
166       Message JavaDoc message = session.createBytesMessage();
167       ((BytesMessage JavaDoc)message).writeBytes(PAYLOAD);
168
169       for (int i = 0; i < iterationCount; i++)
170          sender.send(message, persistence, 4, 0);
171
172       session.close();
173
174       QueueSession JavaDoc session1 = queueConnection.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE);
175       queue = (Queue JavaDoc)context.lookup(TEST_QUEUE);
176       QueueReceiver JavaDoc receiver1 = session1.createReceiver(queue);
177       QueueSession JavaDoc session2 = queueConnection.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE);
178       QueueReceiver JavaDoc receiver2 = session2.createReceiver(queue);
179       queueConnection.start();
180
181       // Read half from session1
182
int c = 0;
183       for (int l = 0; l < iterationCount/2; l++)
184       {
185          message = receiver1.receive(50);
186          if (message != null)
187             c++;
188       }
189       assertTrue("Should have received half data unacked", c == iterationCount/2);
190
191       // Read the rest from session2
192
c = 0;
193       Message JavaDoc lastMessage = null;
194       while (message != null)
195       {
196          message = receiver2.receive(50);
197          if (message != null)
198          {
199             c++;
200             lastMessage = message;
201          }
202       }
203       assertTrue("Should have received all data unacked", c == iterationCount - iterationCount/2);
204
205       // Close session1, the messages are unacked and should go back in the queue
206
session1.close();
207
208       // Acknowledge messages on session2 and close it
209
lastMessage.acknowledge();
210       session2.close();
211
212       queueConnection.stop();
213
214       assertTrue("Session1 messages should be available", drainQueue() == iterationCount/2);
215
216    }
217
218    /**
219     * #Description of the Method
220     *
221     * @param persistence Description of Parameter
222     * @exception Exception Description of Exception
223     */

224    public void runUnackedMultipleConnection(final int persistence) throws Exception JavaDoc
225    {
226       drainQueue();
227
228       final int iterationCount = getIterationCount();
229
230       QueueSession JavaDoc session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
231       Queue JavaDoc queue = (Queue JavaDoc)context.lookup(TEST_QUEUE);
232
233       QueueSender JavaDoc sender = session.createSender(queue);
234
235       Message JavaDoc message = session.createBytesMessage();
236       ((BytesMessage JavaDoc)message).writeBytes(PAYLOAD);
237
238       for (int i = 0; i < iterationCount; i++)
239          sender.send(message, persistence, 4, 0);
240
241       session.close();
242
243       QueueConnectionFactory JavaDoc queueFactory = (QueueConnectionFactory JavaDoc)context.lookup(QUEUE_FACTORY);
244       QueueConnection JavaDoc queueConnection1 = queueFactory.createQueueConnection();
245       QueueSession JavaDoc session1 = queueConnection1.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE);
246       queue = (Queue JavaDoc)context.lookup(TEST_QUEUE);
247       QueueReceiver JavaDoc receiver1 = session1.createReceiver(queue);
248
249       QueueConnection JavaDoc queueConnection2 = queueFactory.createQueueConnection();
250       QueueSession JavaDoc session2 = queueConnection2.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE);
251       QueueReceiver JavaDoc receiver2 = session2.createReceiver(queue);
252
253       queueConnection1.start();
254       queueConnection2.start();
255
256       // Read half from session1
257
int c = 0;
258       for (int l = 0; l < iterationCount/2; l++)
259       {
260          message = receiver1.receive(50);
261          if (message != null)
262             c++;
263       }
264       assertTrue("Should have received half data unacked", c == iterationCount/2);
265
266       // Read the rest from session2
267
Message JavaDoc lastMessage = null;
268       c = 0;
269       while (message != null)
270       {
271          message = receiver2.receive(50);
272          if (message != null)
273          {
274             c++;
275             lastMessage = message;
276          }
277       }
278       assertTrue("Should have received all data unacked", c == iterationCount - iterationCount/2);
279
280       // Close session1, the messages are unacked and should go back in the queue
281
queueConnection1.close();
282
283       // Acknowledge messages for connection 2 and close it
284
lastMessage.acknowledge();
285       queueConnection2.close();
286
287       assertTrue("Connection1 messages should be available", drainQueue() == iterationCount/2);
288
289    }
290
291    /**
292     * #Description of the Method
293     *
294     * @param persistence Description of Parameter
295     * @exception Exception Description of Exception
296     */

297    public void runUnackedTopic(final int persistence) throws Exception JavaDoc
298    {
299       drainQueue();
300       drainTopic();
301
302       final int iterationCount = getIterationCount();
303       final Logger log = getLog();
304
305       Thread JavaDoc sendThread =
306          new Thread JavaDoc()
307          {
308             public void run()
309             {
310                try
311                {
312
313                   TopicSession JavaDoc session = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
314                   Topic JavaDoc topic = (Topic JavaDoc)context.lookup(TEST_TOPIC);
315
316                   TopicPublisher JavaDoc publisher = session.createPublisher(topic);
317
318                   waitForSynchMessage();
319
320                   BytesMessage JavaDoc message = session.createBytesMessage();
321                   message.writeBytes(PAYLOAD);
322
323                   for (int i = 0; i < iterationCount; i++)
324                   {
325                      publisher.publish(message, persistence, 4, 0);
326                   }
327
328                   session.close();
329                }
330                catch (Exception JavaDoc e)
331                {
332                   log.error("error", e);
333                }
334             }
335          };
336
337       TopicSession JavaDoc session = topicConnection.createTopicSession(false, Session.CLIENT_ACKNOWLEDGE);
338       Topic JavaDoc topic = (Topic JavaDoc)context.lookup(TEST_TOPIC);
339       TopicSubscriber JavaDoc subscriber = session.createSubscriber(topic);
340
341
342       MyMessageListener listener = new MyMessageListener(iterationCount, log);
343
344       queueConnection.start();
345       sendThread.start();
346       subscriber.setMessageListener(listener);
347       topicConnection.start();
348       sendSynchMessage();
349       synchronized (listener)
350       {
351          if (listener.i < iterationCount)
352             listener.wait();
353       }
354       sendThread.join();
355       topicConnection.close();
356       TopicConnectionFactory JavaDoc topicFactory = (TopicConnectionFactory JavaDoc)context.lookup(TOPIC_FACTORY);
357       topicConnection = topicFactory.createTopicConnection();
358       queueConnection.stop();
359       assertTrue("Topic should be empty", drainTopic() == 0);
360    }
361
362    /**
363     * #Description of the Method
364     *
365     * @param persistence Description of Parameter
366     * @exception Exception Description of Exception
367     */

368    public void runUnackedDurableTopic(final int persistence) throws Exception JavaDoc
369    {
370       drainQueue();
371       drainDurableTopic();
372
373       final int iterationCount = getIterationCount();
374       final Logger log = getLog();
375
376       Thread JavaDoc sendThread =
377          new Thread JavaDoc()
378          {
379             public void run()
380             {
381                try
382                {
383
384                   TopicSession JavaDoc session = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
385                   Topic JavaDoc topic = (Topic JavaDoc)context.lookup(TEST_DURABLE_TOPIC);
386
387                   TopicPublisher JavaDoc publisher = session.createPublisher(topic);
388
389                   waitForSynchMessage();
390
391                   BytesMessage JavaDoc message = session.createBytesMessage();
392                   message.writeBytes(PAYLOAD);
393
394                   for (int i = 0; i < iterationCount; i++)
395                   {
396                      publisher.publish(message, persistence, 4, 0);
397                   }
398
399                   session.close();
400                }
401                catch (Exception JavaDoc e)
402                {
403                   log.error("error", e);
404                }
405             }
406          };
407
408       TopicSession JavaDoc session = topicDurableConnection.createTopicSession(false, Session.CLIENT_ACKNOWLEDGE);
409       Topic JavaDoc topic = (Topic JavaDoc)context.lookup(TEST_DURABLE_TOPIC);
410       TopicSubscriber JavaDoc subscriber = session.createDurableSubscriber(topic, "test");
411
412       MyMessageListener listener = new MyMessageListener(iterationCount, log);
413
414       queueConnection.start();
415       sendThread.start();
416       subscriber.setMessageListener(listener);
417       topicDurableConnection.start();
418       sendSynchMessage();
419       synchronized (listener)
420       {
421          if (listener.i < iterationCount)
422             listener.wait();
423       }
424
425       sendThread.join();
426       topicDurableConnection.close();
427       TopicConnectionFactory JavaDoc topicFactory = (TopicConnectionFactory JavaDoc)context.lookup(TOPIC_FACTORY);
428       topicDurableConnection = topicFactory.createTopicConnection("john", "needle");
429       queueConnection.stop();
430       assertTrue("Topic should be full", drainDurableTopic() == iterationCount);
431    }
432
433    /**
434     * A unit test for JUnit
435     *
436     * @exception Exception Description of Exception
437     */

438    public void testUnackedQueue() throws Exception JavaDoc
439    {
440
441       getLog().debug("Starting UnackedQueue test");
442
443       runUnackedQueue(DeliveryMode.NON_PERSISTENT);
444       runUnackedQueue(DeliveryMode.PERSISTENT);
445
446       getLog().debug("UnackedQueue passed");
447    }
448
449    /**
450     * A unit test for JUnit
451     *
452     * @exception Exception Description of Exception
453     */

454    public void testUnackedMultipleSession() throws Exception JavaDoc
455    {
456
457       getLog().debug("Starting UnackedMultipleSession test");
458
459       runUnackedMultipleSession(DeliveryMode.NON_PERSISTENT);
460       runUnackedMultipleSession(DeliveryMode.PERSISTENT);
461
462       getLog().debug("UnackedMultipleSession passed");
463    }
464
465    /**
466     * A unit test for JUnit
467     *
468     * @exception Exception Description of Exception
469     */

470    public void testUnackedMultipleConnection() throws Exception JavaDoc
471    {
472
473       getLog().debug("Starting UnackedMultipleConnection test");
474
475       runUnackedMultipleConnection(DeliveryMode.NON_PERSISTENT);
476       runUnackedMultipleConnection(DeliveryMode.PERSISTENT);
477
478       getLog().debug("UnackedMultipleConnection passed");
479    }
480
481    /**
482     * A unit test for JUnit
483     *
484     * @exception Exception Description of Exception
485     */

486    public void testUnackedTopic() throws Exception JavaDoc
487    {
488
489       getLog().debug("Starting UnackedTopic test");
490
491       runUnackedTopic(DeliveryMode.NON_PERSISTENT);
492       runUnackedTopic(DeliveryMode.PERSISTENT);
493
494       getLog().debug("UnackedTopic passed");
495    }
496
497    /**
498     * A unit test for JUnit
499     *
500     * @exception Exception Description of Exception
501     */

502    public void testUnackedDurableTopic() throws Exception JavaDoc
503    {
504
505       getLog().debug("Starting UnackedDurableTopic test");
506
507       runUnackedDurableTopic(DeliveryMode.NON_PERSISTENT);
508       runUnackedDurableTopic(DeliveryMode.PERSISTENT);
509
510       getLog().debug("UnackedDurableTopic passed");
511    }
512
513    /**
514     * A unit test for JUnit
515     *
516     * @exception Exception Description of Exception
517     */

518    public void testDummyLast() throws Exception JavaDoc
519    {
520
521       TopicSession JavaDoc session = topicDurableConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
522       session.unsubscribe("test");
523
524       queueConnection.close();
525       topicConnection.close();
526       topicDurableConnection.close();
527    }
528
529    /**
530     * The JUnit setup method
531     *
532     * @exception Exception Description of Exception
533     */

534    protected void setUp() throws Exception JavaDoc
535    {
536        // call setUp() in superclass
537
super.setUp() ;
538
539       if (context == null)
540       {
541          context = getInitialContext();
542
543          QueueConnectionFactory JavaDoc queueFactory = (QueueConnectionFactory JavaDoc)context.lookup(QUEUE_FACTORY);
544          queueConnection = queueFactory.createQueueConnection();
545
546          TopicConnectionFactory JavaDoc topicFactory = (TopicConnectionFactory JavaDoc)context.lookup(TOPIC_FACTORY);
547          topicConnection = topicFactory.createTopicConnection();
548          topicDurableConnection = topicFactory.createTopicConnection("john", "needle");
549
550          getLog().debug("Connection to JBossMQ established.");
551       }
552    }
553
554    // Emptys out all the messages in a queue
555
private int drainQueue() throws Exception JavaDoc
556    {
557       getLog().debug("Draining Queue");
558       queueConnection.start();
559
560       QueueSession JavaDoc session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
561       Queue JavaDoc queue = (Queue JavaDoc)context.lookup(TEST_QUEUE);
562
563       QueueReceiver JavaDoc receiver = session.createReceiver(queue);
564       Message JavaDoc message = receiver.receive(1000);
565       int c = 0;
566       while (message != null)
567       {
568          message = receiver.receive(1000);
569          c++;
570       }
571
572       getLog().debug(" Drained " + c + " messages from the queue");
573
574       session.close();
575
576       queueConnection.stop();
577
578       return c;
579    }
580
581    // Emptys out all the messages in a topic
582
private int drainTopic() throws Exception JavaDoc
583    {
584       getLog().debug("Draining Topic");
585       topicConnection.start();
586
587       final TopicSession JavaDoc session = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
588       Topic JavaDoc topic = (Topic JavaDoc)context.lookup(TEST_TOPIC);
589       TopicSubscriber JavaDoc subscriber = session.createSubscriber(topic);
590
591       Message JavaDoc message = subscriber.receive(1000);
592       int c = 0;
593       while (message != null)
594       {
595          message = subscriber.receive(1000);
596          c++;
597       }
598
599       getLog().debug(" Drained " + c + " messages from the topic");
600
601       session.close();
602
603       topicConnection.stop();
604
605       return c;
606    }
607
608    // Emptys out all the messages in a durable topic
609
private int drainDurableTopic() throws Exception JavaDoc
610    {
611       getLog().debug("Draining Durable Topic");
612       topicDurableConnection.start();
613
614       final TopicSession JavaDoc session = topicDurableConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
615       Topic JavaDoc topic = (Topic JavaDoc)context.lookup(TEST_DURABLE_TOPIC);
616       TopicSubscriber JavaDoc subscriber = session.createDurableSubscriber(topic, "test");
617
618       Message JavaDoc message = subscriber.receive(1000);
619       int c = 0;
620       while (message != null)
621       {
622          message = subscriber.receive(1000);
623          c++;
624       }
625
626       getLog().debug(" Drained " + c + " messages from the durable topic");
627
628       session.close();
629
630       topicDurableConnection.stop();
631
632       return c;
633    }
634
635    private void waitForSynchMessage() throws Exception JavaDoc
636    {
637       getLog().debug("Waiting for Synch Message");
638       QueueSession JavaDoc session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
639       Queue JavaDoc queue = (Queue JavaDoc)context.lookup(TEST_QUEUE);
640
641       QueueReceiver JavaDoc receiver = session.createReceiver(queue);
642       receiver.receive();
643       session.close();
644       getLog().debug("Got Synch Message");
645    }
646
647    private void sendSynchMessage() throws Exception JavaDoc
648    {
649       getLog().debug("Sending Synch Message");
650       QueueSession JavaDoc session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
651       Queue JavaDoc queue = (Queue JavaDoc)context.lookup(TEST_QUEUE);
652
653       QueueSender JavaDoc sender = session.createSender(queue);
654
655       Message JavaDoc message = session.createMessage();
656       sender.send(message);
657
658       session.close();
659       getLog().debug("Sent Synch Message");
660    }
661
662    public class MyMessageListener
663       implements MessageListener JavaDoc
664    {
665       public int i = 0;
666
667       public int iterationCount;
668
669       public Logger log;
670
671       public MyMessageListener(int iterationCount, Logger log)
672       {
673          this.iterationCount = iterationCount;
674          this.log = log;
675       }
676
677       public void onMessage(Message JavaDoc message)
678       {
679          synchronized (this)
680          {
681             i++;
682             log.debug("Got message " + i);
683             if (i >= iterationCount)
684                this.notify();
685          }
686       }
687    }
688
689    public int getIterationCount()
690    {
691       return 5;
692    }
693 }
// Popular Tags