KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jboss > test > jbossmq > test > RollBackUnitTestCase


1 /*
2   * JBoss, Home of Professional Open Source
3   * Copyright 2005, JBoss Inc., and individual contributors as indicated
4   * by the @authors tag. See the copyright.txt in the distribution for a
5   * full listing of individual contributors.
6   *
7   * This is free software; you can redistribute it and/or modify it
8   * under the terms of the GNU Lesser General Public License as
9   * published by the Free Software Foundation; either version 2.1 of
10   * the License, or (at your option) any later version.
11   *
12   * This software is distributed in the hope that it will be useful,
13   * but WITHOUT ANY WARRANTY; without even the implied warranty of
14   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15   * Lesser General Public License for more details.
16   *
17   * You should have received a copy of the GNU Lesser General Public
18   * License along with this software; if not, write to the Free
19   * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
20   * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
21   */

22 package org.jboss.test.jbossmq.test;
23
24 import javax.jms.BytesMessage JavaDoc;
25 import javax.jms.DeliveryMode JavaDoc;
26 import javax.jms.JMSException JavaDoc;
27 import javax.jms.Message JavaDoc;
28 import javax.jms.MessageListener JavaDoc;
29 import javax.jms.QueueConnection JavaDoc;
30 import javax.jms.QueueConnectionFactory JavaDoc;
31 import javax.jms.QueueReceiver JavaDoc;
32 import javax.jms.QueueSender JavaDoc;
33 import javax.jms.QueueSession JavaDoc;
34 import javax.jms.Session JavaDoc;
35 import javax.jms.Topic JavaDoc;
36 import javax.jms.TopicConnection JavaDoc;
37 import javax.jms.TopicConnectionFactory JavaDoc;
38 import javax.jms.TopicPublisher JavaDoc;
39 import javax.jms.TopicSession JavaDoc;
40 import javax.jms.TopicSubscriber JavaDoc;
41 import javax.jms.Queue JavaDoc;
42 import javax.naming.Context JavaDoc;
43
44 import org.jboss.logging.Logger;
45 import org.jboss.test.JBossTestCase;
46
47 /**
48  * Rollback tests
49  *
50  * @author
51  * @version
52  */

53 public class RollBackUnitTestCase extends JBossTestCase
54 {
55
56    // Provider specific
57
static String JavaDoc TOPIC_FACTORY = "ConnectionFactory";
58
59    static String JavaDoc QUEUE_FACTORY = "ConnectionFactory";
60
61    static String JavaDoc TEST_QUEUE = "queue/testQueue";
62
63    static String JavaDoc TEST_TOPIC = "topic/testTopic";
64
65    static String JavaDoc TEST_DURABLE_TOPIC = "topic/testDurableTopic";
66
67    static byte[] PAYLOAD = new byte[10];
68
69    static Context JavaDoc context;
70
71    static QueueConnection JavaDoc queueConnection;
72
73    static TopicConnection JavaDoc topicConnection;
74
75    static TopicConnection JavaDoc topicDurableConnection;
76
77    /**
78     * Constructor the test
79     *
80     * @param name Description of Parameter
81     * @exception Exception Description of Exception
82     */

83    public RollBackUnitTestCase(String JavaDoc name) throws Exception JavaDoc
84    {
85       super(name);
86    }
87
88    /**
89     * #Description of the Method
90     *
91     * @param persistence Description of Parameter
92     * @exception Exception Description of Exception
93     */

94    public void runQueueSendRollBack(final int persistence, final boolean explicit) throws Exception JavaDoc
95    {
96       drainQueue();
97       final int iterationCount = getIterationCount();
98       final Logger log = getLog();
99
100       Thread JavaDoc sendThread = new Thread JavaDoc()
101       {
102          public void run()
103          {
104             try
105             {
106                QueueSession JavaDoc session = queueConnection.createQueueSession(true, Session.AUTO_ACKNOWLEDGE);
107                Queue JavaDoc queue = (Queue JavaDoc) context.lookup(TEST_QUEUE);
108
109                QueueSender JavaDoc sender = session.createSender(queue);
110
111                BytesMessage JavaDoc message = session.createBytesMessage();
112                message.writeBytes(PAYLOAD);
113                message.setStringProperty("TEST_NAME", "runQueueSendRollback");
114                message.setIntProperty("TEST_PERSISTENCE", persistence);
115                message.setBooleanProperty("TEST_EXPLICIT", explicit);
116                
117                for (int i = 0; i < iterationCount; i++)
118                {
119                   sender.send(message, persistence, 4, 0);
120                }
121
122                if (explicit)
123                   session.rollback();
124                session.close();
125             }
126             catch (Exception JavaDoc e)
127             {
128                log.error("error", e);
129             }
130          }
131       };
132
133       sendThread.start();
134       sendThread.join();
135       assertTrue("Queue should be empty", drainQueue() == 0);
136    }
137
138    /**
139     * #Description of the Method
140     *
141     * @param persistence Description of Parameter
142     * @exception Exception Description of Exception
143     */

144    public void runTopicSendRollBack(final int persistence, final boolean explicit) throws Exception JavaDoc
145    {
146       drainQueue();
147       drainTopic();
148
149       final int iterationCount = getIterationCount();
150       final Logger log = getLog();
151
152       Thread JavaDoc sendThread = new Thread JavaDoc()
153       {
154          public void run()
155          {
156             try
157             {
158
159                TopicSession JavaDoc session = topicConnection.createTopicSession(true, Session.AUTO_ACKNOWLEDGE);
160                Topic JavaDoc topic = (Topic JavaDoc) context.lookup(TEST_TOPIC);
161
162                TopicPublisher JavaDoc publisher = session.createPublisher(topic);
163
164                BytesMessage JavaDoc message = session.createBytesMessage();
165                message.writeBytes(PAYLOAD);
166                message.setStringProperty("TEST_NAME", "runTopicSendRollback");
167                message.setIntProperty("TEST_PERSISTENCE", persistence);
168                message.setBooleanProperty("TEST_EXPLICIT", explicit);
169
170                for (int i = 0; i < iterationCount; i++)
171                {
172                   publisher.publish(message, persistence, 4, 0);
173                }
174
175                session.close();
176             }
177             catch (Exception JavaDoc e)
178             {
179                log.error("error", e);
180             }
181          }
182       };
183
184       sendThread.start();
185       sendThread.join();
186       assertTrue("Topic should be empty", drainTopic() == 0);
187    }
188
189    /**
190     * #Description of the Method
191     *
192     * @param persistence Description of Parameter
193     * @exception Exception Description of Exception
194     */

195    public void runAsynchQueueReceiveRollBack(final int persistence, final boolean explicit) throws Exception JavaDoc
196    {
197       drainQueue();
198
199       final int iterationCount = getIterationCount();
200       final Logger log = getLog();
201
202       Thread JavaDoc sendThread = new Thread JavaDoc()
203       {
204          public void run()
205          {
206             try
207             {
208                QueueSession JavaDoc session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
209                Queue JavaDoc queue = (Queue JavaDoc) context.lookup(TEST_QUEUE);
210
211                QueueSender JavaDoc sender = session.createSender(queue);
212
213                BytesMessage JavaDoc message = session.createBytesMessage();
214                message.writeBytes(PAYLOAD);
215                message.setStringProperty("TEST_NAME", "runAsynchQueueReceiveRollback");
216                message.setIntProperty("TEST_PERSISTENCE", persistence);
217                message.setBooleanProperty("TEST_EXPLICIT", explicit);
218
219                for (int i = 0; i < iterationCount; i++)
220                {
221                   sender.send(message, persistence, 4, 0);
222                }
223
224                session.close();
225             }
226             catch (Exception JavaDoc e)
227             {
228                log.error("error", e);
229             }
230          }
231       };
232
233       QueueSession JavaDoc session = queueConnection.createQueueSession(true, Session.AUTO_ACKNOWLEDGE);
234       Queue JavaDoc queue = (Queue JavaDoc) context.lookup(TEST_QUEUE);
235       QueueReceiver JavaDoc receiver = session.createReceiver(queue);
236
237       MyMessageListener listener = new MyMessageListener(iterationCount, log);
238
239       sendThread.start();
240       receiver.setMessageListener(listener);
241       queueConnection.start();
242       synchronized (listener)
243       {
244          if (listener.i < iterationCount)
245             listener.wait();
246       }
247       receiver.setMessageListener(null);
248
249       if (explicit)
250          session.rollback();
251       session.close();
252
253       queueConnection.stop();
254
255       sendThread.join();
256
257       assertTrue("Queue should be full", drainQueue() == iterationCount);
258
259    }
260
261    /**
262     * #Description of the Method
263     *
264     * @param persistence Description of Parameter
265     * @exception Exception Description of Exception
266     */

267    public void runAsynchTopicReceiveRollBack(final int persistence, final boolean explicit) throws Exception JavaDoc
268    {
269       drainQueue();
270       drainTopic();
271
272       final int iterationCount = getIterationCount();
273       final Logger log = getLog();
274
275       Thread JavaDoc sendThread = new Thread JavaDoc()
276       {
277          public void run()
278          {
279             try
280             {
281
282                TopicSession JavaDoc session = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
283                Topic JavaDoc topic = (Topic JavaDoc) context.lookup(TEST_TOPIC);
284
285                TopicPublisher JavaDoc publisher = session.createPublisher(topic);
286
287                waitForSynchMessage();
288
289                BytesMessage JavaDoc message = session.createBytesMessage();
290                message.writeBytes(PAYLOAD);
291                message.setStringProperty("TEST_NAME", "runAsynchTopicReceiveRollback");
292                message.setIntProperty("TEST_PERSISTENCE", persistence);
293                message.setBooleanProperty("TEST_EXPLICIT", explicit);
294
295                for (int i = 0; i < iterationCount; i++)
296                {
297                   publisher.publish(message, persistence, 4, 0);
298                   log.debug("Published message " + i);
299                }
300
301                session.close();
302             }
303             catch (Exception JavaDoc e)
304             {
305                log.error("error", e);
306             }
307          }
308       };
309
310       TopicSession JavaDoc session = topicConnection.createTopicSession(true, Session.AUTO_ACKNOWLEDGE);
311       Topic JavaDoc topic = (Topic JavaDoc) context.lookup(TEST_TOPIC);
312       TopicSubscriber JavaDoc subscriber = session.createSubscriber(topic);
313
314       MyMessageListener listener = new MyMessageListener(iterationCount, log);
315
316       queueConnection.start();
317       sendThread.start();
318       subscriber.setMessageListener(listener);
319       topicConnection.start();
320       sendSynchMessage();
321       getLog().debug("Waiting for all messages");
322       synchronized (listener)
323       {
324          if (listener.i < iterationCount)
325             listener.wait();
326       }
327       getLog().debug("Got all messages");
328       subscriber.setMessageListener(null);
329
330       if (explicit)
331          session.rollback();
332       session.close();
333
334       sendThread.join();
335       topicConnection.stop();
336       queueConnection.stop();
337       assertTrue("Topic should be empty", drainTopic() == 0);
338    }
339
340    /**
341     * #Description of the Method
342     *
343     * @param persistence Description of Parameter
344     * @exception Exception Description of Exception
345     */

346    public void runAsynchDurableTopicReceiveRollBack(final int persistence, final boolean explicit) throws Exception JavaDoc
347    {
348       getLog().debug("====> runAsynchDurableTopicReceiveRollBack persistence=" + persistence + " explicit=" + explicit);
349       drainQueue();
350       drainDurableTopic();
351
352       final int iterationCount = getIterationCount();
353       final Logger log = getLog();
354
355       Thread JavaDoc sendThread = new Thread JavaDoc()
356       {
357          public void run()
358          {
359             try
360             {
361
362                TopicSession JavaDoc session = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
363                Topic JavaDoc topic = (Topic JavaDoc) context.lookup(TEST_DURABLE_TOPIC);
364
365                TopicPublisher JavaDoc publisher = session.createPublisher(topic);
366
367                waitForSynchMessage();
368
369                BytesMessage JavaDoc message = session.createBytesMessage();
370                message.writeBytes(PAYLOAD);
371                message.setStringProperty("TEST_NAME", "runAsynchDurableTopicReceiveRollback");
372                message.setIntProperty("TEST_PERSISTENCE", persistence);
373                message.setBooleanProperty("TEST_EXPLICIT", explicit);
374
375                for (int i = 0; i < iterationCount; i++)
376                {
377                   publisher.publish(message, persistence, 4, 0);
378                   log.debug("Published message " + i);
379                }
380
381                session.close();
382             }
383             catch (Exception JavaDoc e)
384             {
385                log.error("error", e);
386             }
387          }
388       };
389
390       TopicSession JavaDoc session = topicDurableConnection.createTopicSession(true, Session.AUTO_ACKNOWLEDGE);
391       Topic JavaDoc topic = (Topic JavaDoc) context.lookup(TEST_DURABLE_TOPIC);
392       TopicSubscriber JavaDoc subscriber = session.createDurableSubscriber(topic, "test");
393       try
394       {
395          MyMessageListener listener = new MyMessageListener(iterationCount, log);
396
397          queueConnection.start();
398          sendThread.start();
399          subscriber.setMessageListener(listener);
400          topicDurableConnection.start();
401          sendSynchMessage();
402          getLog().debug("Waiting for all messages");
403          synchronized (listener)
404          {
405             if (listener.i < iterationCount)
406                listener.wait();
407          }
408          getLog().debug("Got all messages");
409          subscriber.setMessageListener(null);
410          subscriber.close();
411
412          if (explicit)
413             session.rollback();
414          session.close();
415
416          sendThread.join();
417          topicDurableConnection.stop();
418          queueConnection.stop();
419          assertTrue("Topic should be full", drainDurableTopic() == iterationCount);
420       }
421       finally
422       {
423          removeDurableSubscription();
424       }
425    }
426
427    /**
428     * A unit test for JUnit
429     *
430     * @exception Exception Description of Exception
431     */

432    public void testQueueSendRollBack() throws Exception JavaDoc
433    {
434
435       getLog().debug("Starting AsynchQueueSendRollBack test");
436
437       runQueueSendRollBack(DeliveryMode.NON_PERSISTENT, false);
438       runQueueSendRollBack(DeliveryMode.PERSISTENT, false);
439       runQueueSendRollBack(DeliveryMode.NON_PERSISTENT, true);
440       runQueueSendRollBack(DeliveryMode.PERSISTENT, true);
441
442       getLog().debug("AsynchQueueSendRollBack passed");
443    }
444
445    /**
446     * A unit test for JUnit
447     *
448     * @exception Exception Description of Exception
449     */

450    public void testAsynchQueueReceiveBack() throws Exception JavaDoc
451    {
452
453       getLog().debug("Starting AsynchQueueReceiveRollBack test");
454
455       runAsynchQueueReceiveRollBack(DeliveryMode.NON_PERSISTENT, false);
456       runAsynchQueueReceiveRollBack(DeliveryMode.PERSISTENT, false);
457       runQueueSendRollBack(DeliveryMode.NON_PERSISTENT, true);
458       runQueueSendRollBack(DeliveryMode.PERSISTENT, true);
459
460       getLog().debug("AsynchQueueReceiveRollBack passed");
461    }
462
463    /**
464     * A unit test for JUnit
465     *
466     * @exception Exception Description of Exception
467     */

468    public void testTopicSendRollBack() throws Exception JavaDoc
469    {
470
471       getLog().debug("Starting AsynchTopicSendRollBack test");
472
473       runTopicSendRollBack(DeliveryMode.NON_PERSISTENT, false);
474       runTopicSendRollBack(DeliveryMode.PERSISTENT, false);
475       runTopicSendRollBack(DeliveryMode.NON_PERSISTENT, true);
476       runTopicSendRollBack(DeliveryMode.PERSISTENT, true);
477
478       getLog().debug("AsynchTopicSendRollBack passed");
479    }
480
481    /**
482     * A unit test for JUnit
483     *
484     * @exception Exception Description of Exception
485     */

486    public void testAsynchTopicReceiveRollBack() throws Exception JavaDoc
487    {
488
489       getLog().debug("Starting AsynchTopicReceiveRollBack test");
490
491       runAsynchTopicReceiveRollBack(DeliveryMode.NON_PERSISTENT, false);
492       runAsynchTopicReceiveRollBack(DeliveryMode.PERSISTENT, false);
493       runAsynchTopicReceiveRollBack(DeliveryMode.NON_PERSISTENT, true);
494       runAsynchTopicReceiveRollBack(DeliveryMode.PERSISTENT, true);
495
496       getLog().debug("AsynchTopicReceiveRollBack passed");
497    }
498
499    /**
500     * A unit test for JUnit
501     *
502     * @exception Exception Description of Exception
503     */

504    public void testAsynchDurableTopicReceiveRollBack() throws Exception JavaDoc
505    {
506
507       getLog().debug("Starting AsynchDurableTopicReceiveRollBack test");
508
509       runAsynchDurableTopicReceiveRollBack(DeliveryMode.NON_PERSISTENT, false);
510       runAsynchDurableTopicReceiveRollBack(DeliveryMode.PERSISTENT, false);
511       runAsynchDurableTopicReceiveRollBack(DeliveryMode.NON_PERSISTENT, true);
512       runAsynchDurableTopicReceiveRollBack(DeliveryMode.PERSISTENT, true);
513
514       getLog().debug("AsynchDurableTopicReceiveRollBack passed");
515    }
516
517    /**
518     * A unit test for JUnit
519     *
520     * @exception Exception Description of Exception
521     */

522    public void removeDurableSubscription() throws Exception JavaDoc
523    {
524
525       TopicSession JavaDoc session = topicDurableConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
526       session.unsubscribe("test");
527    }
528
529    /**
530     * The JUnit setup method
531     *
532     * @exception Exception Description of Exception
533     */

534    protected void setUp() throws Exception JavaDoc
535    {
536       super.setUp();
537       
538       getLog().debug("START TEST " + getName());
539       context = getInitialContext();
540
541       QueueConnectionFactory JavaDoc queueFactory = (QueueConnectionFactory JavaDoc) context.lookup(QUEUE_FACTORY);
542       queueConnection = queueFactory.createQueueConnection();
543
544       TopicConnectionFactory JavaDoc topicFactory = (TopicConnectionFactory JavaDoc) context.lookup(TOPIC_FACTORY);
545       topicConnection = topicFactory.createTopicConnection();
546       topicDurableConnection = topicFactory.createTopicConnection("john", "needle");
547
548       getLog().debug("Connection to JBossMQ established.");
549    }
550    
551    protected void tearDown() throws Exception JavaDoc
552    {
553       try
554       {
555          if (topicDurableConnection != null)
556          {
557             topicDurableConnection.close();
558             topicDurableConnection = null;
559          }
560       }
561       catch (JMSException JavaDoc ignored)
562       {
563       }
564       try
565       {
566          if (topicConnection != null)
567          {
568             topicConnection.close();
569             topicConnection = null;
570          }
571       }
572       catch (JMSException JavaDoc ignored)
573       {
574       }
575       try
576       {
577          if (queueConnection != null)
578          {
579             queueConnection.close();
580             queueConnection = null;
581          }
582       }
583       catch (JMSException JavaDoc ignored)
584       {
585       }
586       super.tearDown();
587    }
588
589    // Emptys out all the messages in a queue
590
private int drainQueue() throws Exception JavaDoc
591    {
592       getLog().debug("Draining Queue");
593       queueConnection.start();
594
595       QueueSession JavaDoc session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
596       Queue JavaDoc queue = (Queue JavaDoc) context.lookup(TEST_QUEUE);
597
598       QueueReceiver JavaDoc receiver = session.createReceiver(queue);
599       Message JavaDoc message = receiver.receive(50);
600       int c = 0;
601       while (message != null)
602       {
603          c++;
604          message = receiver.receive(50);
605       }
606
607       getLog().debug(" Drained " + c + " messages from the queue");
608
609       session.close();
610
611       queueConnection.stop();
612
613       return c;
614    }
615
616    // Emptys out all the messages in a topic
617
private int drainTopic() throws Exception JavaDoc
618    {
619       getLog().debug("Draining Topic");
620       topicConnection.start();
621
622       final TopicSession JavaDoc session = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
623       Topic JavaDoc topic = (Topic JavaDoc) context.lookup(TEST_TOPIC);
624       TopicSubscriber JavaDoc subscriber = session.createSubscriber(topic);
625
626       Message JavaDoc message = subscriber.receive(50);
627       int c = 0;
628       while (message != null)
629       {
630          c++;
631          message = subscriber.receive(50);
632       }
633
634       getLog().debug(" Drained " + c + " messages from the topic");
635
636       session.close();
637
638       topicConnection.stop();
639
640       return c;
641    }
642
643    // Emptys out all the messages in a durable topic
644
private int drainDurableTopic() throws Exception JavaDoc
645    {
646       getLog().debug("Draining Durable Topic");
647       topicDurableConnection.start();
648
649       final TopicSession JavaDoc session = topicDurableConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
650       Topic JavaDoc topic = (Topic JavaDoc) context.lookup(TEST_DURABLE_TOPIC);
651       TopicSubscriber JavaDoc subscriber = session.createDurableSubscriber(topic, "test");
652
653       Message JavaDoc message = subscriber.receive(50);
654       int c = 0;
655       while (message != null)
656       {
657          c++;
658          message = subscriber.receive(50);
659       }
660
661       getLog().debug(" Drained " + c + " messages from the durable topic");
662
663       session.close();
664
665       topicDurableConnection.stop();
666
667       return c;
668    }
669
670    private void waitForSynchMessage() throws Exception JavaDoc
671    {
672       getLog().debug("Waiting for Synch Message");
673       QueueSession JavaDoc session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
674       Queue JavaDoc queue = (Queue JavaDoc) context.lookup(TEST_QUEUE);
675
676       QueueReceiver JavaDoc receiver = session.createReceiver(queue);
677       receiver.receive();
678       session.close();
679       getLog().debug("Got Synch Message");
680    }
681
682    private void sendSynchMessage() throws Exception JavaDoc
683    {
684       getLog().debug("Sending Synch Message");
685       QueueSession JavaDoc session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
686       Queue JavaDoc queue = (Queue JavaDoc) context.lookup(TEST_QUEUE);
687
688       QueueSender JavaDoc sender = session.createSender(queue);
689
690       Message JavaDoc message = session.createMessage();
691       sender.send(message);
692
693       session.close();
694       getLog().debug("Sent Synch Message");
695    }
696
697    public class MyMessageListener implements MessageListener JavaDoc
698    {
699       public int i = 0;
700
701       public int iterationCount;
702
703       public Logger log;
704
705       public MyMessageListener(int iterationCount, Logger log)
706       {
707          this.iterationCount = iterationCount;
708          this.log = log;
709       }
710
711       public void onMessage(Message JavaDoc message)
712       {
713          synchronized (this)
714          {
715             i++;
716             log.debug("Got message " + i);
717             if (i >= iterationCount)
718                this.notify();
719          }
720       }
721    }
722
723    public int getIterationCount()
724    {
725       return 5;
726    }
727
728    public static junit.framework.Test suite() throws Exception JavaDoc
729    {
730        ClassLoader JavaDoc loader = Thread.currentThread().getContextClassLoader();
731        return getDeploySetup(RollBackUnitTestCase.class,
732                loader.getResource("messaging/test-destinations-service.xml").toString());
733    }
734 }
735
Popular Tags