KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jboss > test > jbossmessaging > test > RollBackUnitTestCase


1 /*
2   * JBoss, Home of Professional Open Source
3   * Copyright 2005, JBoss Inc., and individual contributors as indicated
4   * by the @authors tag. See the copyright.txt in the distribution for a
5   * full listing of individual contributors.
6   *
7   * This is free software; you can redistribute it and/or modify it
8   * under the terms of the GNU Lesser General Public License as
9   * published by the Free Software Foundation; either version 2.1 of
10   * the License, or (at your option) any later version.
11   *
12   * This software is distributed in the hope that it will be useful,
13   * but WITHOUT ANY WARRANTY; without even the implied warranty of
14   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15   * Lesser General Public License for more details.
16   *
17   * You should have received a copy of the GNU Lesser General Public
18   * License along with this software; if not, write to the Free
19   * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
20   * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
21   */

22 package org.jboss.test.jbossmessaging.test;
23
24 import javax.jms.BytesMessage JavaDoc;
25 import javax.jms.DeliveryMode JavaDoc;
26 import javax.jms.JMSException JavaDoc;
27 import javax.jms.Message JavaDoc;
28 import javax.jms.MessageListener JavaDoc;
29 import javax.jms.QueueConnection JavaDoc;
30 import javax.jms.QueueConnectionFactory JavaDoc;
31 import javax.jms.QueueReceiver JavaDoc;
32 import javax.jms.QueueSender JavaDoc;
33 import javax.jms.QueueSession JavaDoc;
34 import javax.jms.Session JavaDoc;
35 import javax.jms.Topic JavaDoc;
36 import javax.jms.TopicConnection JavaDoc;
37 import javax.jms.TopicConnectionFactory JavaDoc;
38 import javax.jms.TopicPublisher JavaDoc;
39 import javax.jms.TopicSession JavaDoc;
40 import javax.jms.TopicSubscriber JavaDoc;
41 import javax.jms.Queue JavaDoc;
42 import javax.naming.Context JavaDoc;
43
44 import org.jboss.logging.Logger;
45 import org.jboss.test.jbossmessaging.JMSTestCase;
46
47 /**
48  * Rollback tests
49  *
50  * @author <a HREF="mailto:richard.achmatowicz@jboss.com">Richard Achmatowicz</a>
51  * @author
52  * @version
53  */

54 public class RollBackUnitTestCase extends JMSTestCase
55 {
56
57    // Provider specific
58
static String JavaDoc TOPIC_FACTORY = "ConnectionFactory";
59
60    static String JavaDoc QUEUE_FACTORY = "ConnectionFactory";
61
62    static String JavaDoc TEST_QUEUE = "queue/testQueue";
63
64    static String JavaDoc TEST_TOPIC = "topic/testTopic";
65
66    static String JavaDoc TEST_DURABLE_TOPIC = "topic/testDurableTopic";
67
68    static byte[] PAYLOAD = new byte[10];
69
70    static Context JavaDoc context;
71
72    static QueueConnection JavaDoc queueConnection;
73
74    static TopicConnection JavaDoc topicConnection;
75
76    static TopicConnection JavaDoc topicDurableConnection;
77
/**
 * Constructs the test case.
 *
 * @param name the test name, passed unchanged to the superclass constructor
 * @exception Exception if the superclass constructor throws
 */
public RollBackUnitTestCase(String name) throws Exception
{
   super(name);
}
88
89    /**
90     * #Description of the Method
91     *
92     * @param persistence Description of Parameter
93     * @exception Exception Description of Exception
94     */

95    public void runQueueSendRollBack(final int persistence, final boolean explicit) throws Exception JavaDoc
96    {
97       drainQueue();
98       final int iterationCount = getIterationCount();
99       final Logger log = getLog();
100
101       Thread JavaDoc sendThread = new Thread JavaDoc()
102       {
103          public void run()
104          {
105             try
106             {
107                QueueSession JavaDoc session = queueConnection.createQueueSession(true, Session.AUTO_ACKNOWLEDGE);
108                Queue JavaDoc queue = (Queue JavaDoc) context.lookup(TEST_QUEUE);
109
110                QueueSender JavaDoc sender = session.createSender(queue);
111
112                BytesMessage JavaDoc message = session.createBytesMessage();
113                message.writeBytes(PAYLOAD);
114                message.setStringProperty("TEST_NAME", "runQueueSendRollback");
115                message.setIntProperty("TEST_PERSISTENCE", persistence);
116                message.setBooleanProperty("TEST_EXPLICIT", explicit);
117                
118                for (int i = 0; i < iterationCount; i++)
119                {
120                   sender.send(message, persistence, 4, 0);
121                }
122
123                if (explicit)
124                   session.rollback();
125                session.close();
126             }
127             catch (Exception JavaDoc e)
128             {
129                log.error("error", e);
130             }
131          }
132       };
133
134       sendThread.start();
135       sendThread.join();
136       assertTrue("Queue should be empty", drainQueue() == 0);
137    }
138
139    /**
140     * #Description of the Method
141     *
142     * @param persistence Description of Parameter
143     * @exception Exception Description of Exception
144     */

145    public void runTopicSendRollBack(final int persistence, final boolean explicit) throws Exception JavaDoc
146    {
147       drainQueue();
148       drainTopic();
149
150       final int iterationCount = getIterationCount();
151       final Logger log = getLog();
152
153       Thread JavaDoc sendThread = new Thread JavaDoc()
154       {
155          public void run()
156          {
157             try
158             {
159
160                TopicSession JavaDoc session = topicConnection.createTopicSession(true, Session.AUTO_ACKNOWLEDGE);
161                Topic JavaDoc topic = (Topic JavaDoc) context.lookup(TEST_TOPIC);
162
163                TopicPublisher JavaDoc publisher = session.createPublisher(topic);
164
165                BytesMessage JavaDoc message = session.createBytesMessage();
166                message.writeBytes(PAYLOAD);
167                message.setStringProperty("TEST_NAME", "runTopicSendRollback");
168                message.setIntProperty("TEST_PERSISTENCE", persistence);
169                message.setBooleanProperty("TEST_EXPLICIT", explicit);
170
171                for (int i = 0; i < iterationCount; i++)
172                {
173                   publisher.publish(message, persistence, 4, 0);
174                }
175
176                session.close();
177             }
178             catch (Exception JavaDoc e)
179             {
180                log.error("error", e);
181             }
182          }
183       };
184
185       sendThread.start();
186       sendThread.join();
187       assertTrue("Topic should be empty", drainTopic() == 0);
188    }
189
190    /**
191     * #Description of the Method
192     *
193     * @param persistence Description of Parameter
194     * @exception Exception Description of Exception
195     */

196    public void runAsynchQueueReceiveRollBack(final int persistence, final boolean explicit) throws Exception JavaDoc
197    {
198       drainQueue();
199
200       final int iterationCount = getIterationCount();
201       final Logger log = getLog();
202
203       Thread JavaDoc sendThread = new Thread JavaDoc()
204       {
205          public void run()
206          {
207             try
208             {
209                QueueSession JavaDoc session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
210                Queue JavaDoc queue = (Queue JavaDoc) context.lookup(TEST_QUEUE);
211
212                QueueSender JavaDoc sender = session.createSender(queue);
213
214                BytesMessage JavaDoc message = session.createBytesMessage();
215                message.writeBytes(PAYLOAD);
216                message.setStringProperty("TEST_NAME", "runAsynchQueueReceiveRollback");
217                message.setIntProperty("TEST_PERSISTENCE", persistence);
218                message.setBooleanProperty("TEST_EXPLICIT", explicit);
219
220                for (int i = 0; i < iterationCount; i++)
221                {
222                   sender.send(message, persistence, 4, 0);
223                }
224
225                session.close();
226             }
227             catch (Exception JavaDoc e)
228             {
229                log.error("error", e);
230             }
231          }
232       };
233
234       QueueSession JavaDoc session = queueConnection.createQueueSession(true, Session.AUTO_ACKNOWLEDGE);
235       Queue JavaDoc queue = (Queue JavaDoc) context.lookup(TEST_QUEUE);
236       QueueReceiver JavaDoc receiver = session.createReceiver(queue);
237
238       MyMessageListener listener = new MyMessageListener(iterationCount, log);
239
240       sendThread.start();
241       receiver.setMessageListener(listener);
242       queueConnection.start();
243       synchronized (listener)
244       {
245          if (listener.i < iterationCount)
246             listener.wait();
247       }
248       receiver.setMessageListener(null);
249
250       if (explicit)
251          session.rollback();
252       session.close();
253
254       queueConnection.stop();
255
256       sendThread.join();
257
258       assertTrue("Queue should be full", drainQueue() == iterationCount);
259
260    }
261
262    /**
263     * #Description of the Method
264     *
265     * @param persistence Description of Parameter
266     * @exception Exception Description of Exception
267     */

268    public void runAsynchTopicReceiveRollBack(final int persistence, final boolean explicit) throws Exception JavaDoc
269    {
270       drainQueue();
271       drainTopic();
272
273       final int iterationCount = getIterationCount();
274       final Logger log = getLog();
275
276       Thread JavaDoc sendThread = new Thread JavaDoc()
277       {
278          public void run()
279          {
280             try
281             {
282
283                TopicSession JavaDoc session = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
284                Topic JavaDoc topic = (Topic JavaDoc) context.lookup(TEST_TOPIC);
285
286                TopicPublisher JavaDoc publisher = session.createPublisher(topic);
287
288                waitForSynchMessage();
289
290                BytesMessage JavaDoc message = session.createBytesMessage();
291                message.writeBytes(PAYLOAD);
292                message.setStringProperty("TEST_NAME", "runAsynchTopicReceiveRollback");
293                message.setIntProperty("TEST_PERSISTENCE", persistence);
294                message.setBooleanProperty("TEST_EXPLICIT", explicit);
295
296                for (int i = 0; i < iterationCount; i++)
297                {
298                   publisher.publish(message, persistence, 4, 0);
299                   log.debug("Published message " + i);
300                }
301
302                session.close();
303             }
304             catch (Exception JavaDoc e)
305             {
306                log.error("error", e);
307             }
308          }
309       };
310
311       TopicSession JavaDoc session = topicConnection.createTopicSession(true, Session.AUTO_ACKNOWLEDGE);
312       Topic JavaDoc topic = (Topic JavaDoc) context.lookup(TEST_TOPIC);
313       TopicSubscriber JavaDoc subscriber = session.createSubscriber(topic);
314
315       MyMessageListener listener = new MyMessageListener(iterationCount, log);
316
317       queueConnection.start();
318       sendThread.start();
319       subscriber.setMessageListener(listener);
320       topicConnection.start();
321       sendSynchMessage();
322       getLog().debug("Waiting for all messages");
323       synchronized (listener)
324       {
325          if (listener.i < iterationCount)
326             listener.wait();
327       }
328       getLog().debug("Got all messages");
329       subscriber.setMessageListener(null);
330
331       if (explicit)
332          session.rollback();
333       session.close();
334
335       sendThread.join();
336       topicConnection.stop();
337       queueConnection.stop();
338       assertTrue("Topic should be empty", drainTopic() == 0);
339    }
340
341    /**
342     * #Description of the Method
343     *
344     * @param persistence Description of Parameter
345     * @exception Exception Description of Exception
346     */

347    public void runAsynchDurableTopicReceiveRollBack(final int persistence, final boolean explicit) throws Exception JavaDoc
348    {
349       getLog().debug("====> runAsynchDurableTopicReceiveRollBack persistence=" + persistence + " explicit=" + explicit);
350       drainQueue();
351       drainDurableTopic();
352
353       final int iterationCount = getIterationCount();
354       final Logger log = getLog();
355
356       Thread JavaDoc sendThread = new Thread JavaDoc()
357       {
358          public void run()
359          {
360             try
361             {
362
363                TopicSession JavaDoc session = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
364                Topic JavaDoc topic = (Topic JavaDoc) context.lookup(TEST_DURABLE_TOPIC);
365
366                TopicPublisher JavaDoc publisher = session.createPublisher(topic);
367
368                waitForSynchMessage();
369
370                BytesMessage JavaDoc message = session.createBytesMessage();
371                message.writeBytes(PAYLOAD);
372                message.setStringProperty("TEST_NAME", "runAsynchDurableTopicReceiveRollback");
373                message.setIntProperty("TEST_PERSISTENCE", persistence);
374                message.setBooleanProperty("TEST_EXPLICIT", explicit);
375
376                for (int i = 0; i < iterationCount; i++)
377                {
378                   publisher.publish(message, persistence, 4, 0);
379                   log.debug("Published message " + i);
380                }
381
382                session.close();
383             }
384             catch (Exception JavaDoc e)
385             {
386                log.error("error", e);
387             }
388          }
389       };
390
391       TopicSession JavaDoc session = topicDurableConnection.createTopicSession(true, Session.AUTO_ACKNOWLEDGE);
392       Topic JavaDoc topic = (Topic JavaDoc) context.lookup(TEST_DURABLE_TOPIC);
393       TopicSubscriber JavaDoc subscriber = session.createDurableSubscriber(topic, "test");
394       try
395       {
396          MyMessageListener listener = new MyMessageListener(iterationCount, log);
397
398          queueConnection.start();
399          sendThread.start();
400          subscriber.setMessageListener(listener);
401          topicDurableConnection.start();
402          sendSynchMessage();
403          getLog().debug("Waiting for all messages");
404          synchronized (listener)
405          {
406             if (listener.i < iterationCount)
407                listener.wait();
408          }
409          getLog().debug("Got all messages");
410          subscriber.setMessageListener(null);
411          subscriber.close();
412
413          if (explicit)
414             session.rollback();
415          session.close();
416
417          sendThread.join();
418          topicDurableConnection.stop();
419          queueConnection.stop();
420          assertTrue("Topic should be full", drainDurableTopic() == iterationCount);
421       }
422       finally
423       {
424          removeDurableSubscription();
425       }
426    }
427
428    /**
429     * A unit test for JUnit
430     *
431     * @exception Exception Description of Exception
432     */

433    public void testQueueSendRollBack() throws Exception JavaDoc
434    {
435
436       getLog().debug("Starting AsynchQueueSendRollBack test");
437
438       runQueueSendRollBack(DeliveryMode.NON_PERSISTENT, false);
439       runQueueSendRollBack(DeliveryMode.PERSISTENT, false);
440       runQueueSendRollBack(DeliveryMode.NON_PERSISTENT, true);
441       runQueueSendRollBack(DeliveryMode.PERSISTENT, true);
442
443       getLog().debug("AsynchQueueSendRollBack passed");
444    }
445
446    /**
447     * A unit test for JUnit
448     *
449     * @exception Exception Description of Exception
450     */

451    public void testAsynchQueueReceiveBack() throws Exception JavaDoc
452    {
453
454       getLog().debug("Starting AsynchQueueReceiveRollBack test");
455
456       runAsynchQueueReceiveRollBack(DeliveryMode.NON_PERSISTENT, false);
457       runAsynchQueueReceiveRollBack(DeliveryMode.PERSISTENT, false);
458       runQueueSendRollBack(DeliveryMode.NON_PERSISTENT, true);
459       runQueueSendRollBack(DeliveryMode.PERSISTENT, true);
460
461       getLog().debug("AsynchQueueReceiveRollBack passed");
462    }
463
464    /**
465     * A unit test for JUnit
466     *
467     * @exception Exception Description of Exception
468     */

469    public void testTopicSendRollBack() throws Exception JavaDoc
470    {
471
472       getLog().debug("Starting AsynchTopicSendRollBack test");
473
474       runTopicSendRollBack(DeliveryMode.NON_PERSISTENT, false);
475       runTopicSendRollBack(DeliveryMode.PERSISTENT, false);
476       runTopicSendRollBack(DeliveryMode.NON_PERSISTENT, true);
477       runTopicSendRollBack(DeliveryMode.PERSISTENT, true);
478
479       getLog().debug("AsynchTopicSendRollBack passed");
480    }
481
482    /**
483     * A unit test for JUnit
484     *
485     * @exception Exception Description of Exception
486     */

487    public void testAsynchTopicReceiveRollBack() throws Exception JavaDoc
488    {
489
490       getLog().debug("Starting AsynchTopicReceiveRollBack test");
491
492       runAsynchTopicReceiveRollBack(DeliveryMode.NON_PERSISTENT, false);
493       runAsynchTopicReceiveRollBack(DeliveryMode.PERSISTENT, false);
494       runAsynchTopicReceiveRollBack(DeliveryMode.NON_PERSISTENT, true);
495       runAsynchTopicReceiveRollBack(DeliveryMode.PERSISTENT, true);
496
497       getLog().debug("AsynchTopicReceiveRollBack passed");
498    }
499
500    /**
501     * A unit test for JUnit
502     *
503     * @exception Exception Description of Exception
504     */

505    public void testAsynchDurableTopicReceiveRollBack() throws Exception JavaDoc
506    {
507
508       getLog().debug("Starting AsynchDurableTopicReceiveRollBack test");
509
510       runAsynchDurableTopicReceiveRollBack(DeliveryMode.NON_PERSISTENT, false);
511       runAsynchDurableTopicReceiveRollBack(DeliveryMode.PERSISTENT, false);
512       runAsynchDurableTopicReceiveRollBack(DeliveryMode.NON_PERSISTENT, true);
513       runAsynchDurableTopicReceiveRollBack(DeliveryMode.PERSISTENT, true);
514
515       getLog().debug("AsynchDurableTopicReceiveRollBack passed");
516    }
517
518    /**
519     * A unit test for JUnit
520     *
521     * @exception Exception Description of Exception
522     */

523    public void removeDurableSubscription() throws Exception JavaDoc
524    {
525
526       TopicSession JavaDoc session = topicDurableConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
527       session.unsubscribe("test");
528    }
529
530    /**
531     * The JUnit setup method
532     *
533     * @exception Exception Description of Exception
534     */

535    protected void setUp() throws Exception JavaDoc
536    {
537       super.setUp();
538       
539       getLog().debug("START TEST " + getName());
540       context = getInitialContext();
541
542       QueueConnectionFactory JavaDoc queueFactory = (QueueConnectionFactory JavaDoc) context.lookup(QUEUE_FACTORY);
543       queueConnection = queueFactory.createQueueConnection();
544
545       TopicConnectionFactory JavaDoc topicFactory = (TopicConnectionFactory JavaDoc) context.lookup(TOPIC_FACTORY);
546       topicConnection = topicFactory.createTopicConnection();
547       topicDurableConnection = topicFactory.createTopicConnection("john", "needle");
548
549       getLog().debug("Connection to JBossMQ established.");
550    }
551    
552    protected void tearDown() throws Exception JavaDoc
553    {
554       try
555       {
556          if (topicDurableConnection != null)
557          {
558             topicDurableConnection.close();
559             topicDurableConnection = null;
560          }
561       }
562       catch (JMSException JavaDoc ignored)
563       {
564       }
565       try
566       {
567          if (topicConnection != null)
568          {
569             topicConnection.close();
570             topicConnection = null;
571          }
572       }
573       catch (JMSException JavaDoc ignored)
574       {
575       }
576       try
577       {
578          if (queueConnection != null)
579          {
580             queueConnection.close();
581             queueConnection = null;
582          }
583       }
584       catch (JMSException JavaDoc ignored)
585       {
586       }
587       super.tearDown();
588    }
589
590    // Emptys out all the messages in a queue
591
private int drainQueue() throws Exception JavaDoc
592    {
593       getLog().debug("Draining Queue");
594       queueConnection.start();
595
596       QueueSession JavaDoc session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
597       Queue JavaDoc queue = (Queue JavaDoc) context.lookup(TEST_QUEUE);
598
599       QueueReceiver JavaDoc receiver = session.createReceiver(queue);
600       Message JavaDoc message = receiver.receive(50);
601       int c = 0;
602       while (message != null)
603       {
604          c++;
605          message = receiver.receive(50);
606       }
607
608       getLog().debug(" Drained " + c + " messages from the queue");
609
610       session.close();
611
612       queueConnection.stop();
613
614       return c;
615    }
616
617    // Emptys out all the messages in a topic
618
private int drainTopic() throws Exception JavaDoc
619    {
620       getLog().debug("Draining Topic");
621       topicConnection.start();
622
623       final TopicSession JavaDoc session = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
624       Topic JavaDoc topic = (Topic JavaDoc) context.lookup(TEST_TOPIC);
625       TopicSubscriber JavaDoc subscriber = session.createSubscriber(topic);
626
627       Message JavaDoc message = subscriber.receive(50);
628       int c = 0;
629       while (message != null)
630       {
631          c++;
632          message = subscriber.receive(50);
633       }
634
635       getLog().debug(" Drained " + c + " messages from the topic");
636
637       session.close();
638
639       topicConnection.stop();
640
641       return c;
642    }
643
644    // Emptys out all the messages in a durable topic
645
private int drainDurableTopic() throws Exception JavaDoc
646    {
647       getLog().debug("Draining Durable Topic");
648       topicDurableConnection.start();
649
650       final TopicSession JavaDoc session = topicDurableConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
651       Topic JavaDoc topic = (Topic JavaDoc) context.lookup(TEST_DURABLE_TOPIC);
652       TopicSubscriber JavaDoc subscriber = session.createDurableSubscriber(topic, "test");
653
654       Message JavaDoc message = subscriber.receive(50);
655       int c = 0;
656       while (message != null)
657       {
658          c++;
659          message = subscriber.receive(50);
660       }
661
662       getLog().debug(" Drained " + c + " messages from the durable topic");
663
664       session.close();
665
666       topicDurableConnection.stop();
667
668       return c;
669    }
670
671    private void waitForSynchMessage() throws Exception JavaDoc
672    {
673       getLog().debug("Waiting for Synch Message");
674       QueueSession JavaDoc session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
675       Queue JavaDoc queue = (Queue JavaDoc) context.lookup(TEST_QUEUE);
676
677       QueueReceiver JavaDoc receiver = session.createReceiver(queue);
678       receiver.receive();
679       session.close();
680       getLog().debug("Got Synch Message");
681    }
682
683    private void sendSynchMessage() throws Exception JavaDoc
684    {
685       getLog().debug("Sending Synch Message");
686       QueueSession JavaDoc session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
687       Queue JavaDoc queue = (Queue JavaDoc) context.lookup(TEST_QUEUE);
688
689       QueueSender JavaDoc sender = session.createSender(queue);
690
691       Message JavaDoc message = session.createMessage();
692       sender.send(message);
693
694       session.close();
695       getLog().debug("Sent Synch Message");
696    }
697
698    public class MyMessageListener implements MessageListener JavaDoc
699    {
700       public int i = 0;
701
702       public int iterationCount;
703
704       public Logger log;
705
706       public MyMessageListener(int iterationCount, Logger log)
707       {
708          this.iterationCount = iterationCount;
709          this.log = log;
710       }
711
712       public void onMessage(Message JavaDoc message)
713       {
714          synchronized (this)
715          {
716             i++;
717             log.debug("Got message " + i);
718             if (i >= iterationCount)
719                this.notify();
720          }
721       }
722    }
723
724    public int getIterationCount()
725    {
726       return 5;
727    }
728
729    public static junit.framework.Test suite() throws Exception JavaDoc
730    {
731        ClassLoader JavaDoc loader = Thread.currentThread().getContextClassLoader();
732        String JavaDoc resourceName = getJMSResourceRelativePathname("test-destinations-service.xml") ;
733
734        return getDeploySetup(RollBackUnitTestCase.class,
735                loader.getResource(resourceName).toString());
736    }
737 }
738
Popular Tags