KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > JmsSessionRecoverTest


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 package org.apache.activemq;
19
20 import javax.jms.Connection JavaDoc;
21 import javax.jms.DeliveryMode JavaDoc;
22 import javax.jms.Destination JavaDoc;
23 import javax.jms.JMSException JavaDoc;
24 import javax.jms.Message JavaDoc;
25 import javax.jms.MessageConsumer JavaDoc;
26 import javax.jms.MessageListener JavaDoc;
27 import javax.jms.MessageProducer JavaDoc;
28 import javax.jms.Session JavaDoc;
29 import javax.jms.TextMessage JavaDoc;
30
31 import org.apache.activemq.ActiveMQConnectionFactory;
32 import org.apache.activemq.command.ActiveMQQueue;
33 import org.apache.activemq.command.ActiveMQTopic;
34
35 import junit.framework.TestCase;
36 import java.util.concurrent.CountDownLatch JavaDoc;
37 import java.util.concurrent.TimeUnit JavaDoc;
38
39 /**
40  * Testcases to see if Session.recover() work.
41  * @version $Revision: 1.3 $
42  */

43 public class JmsSessionRecoverTest extends TestCase {
44
45     private Connection JavaDoc connection;
46     private ActiveMQConnectionFactory factory;
47     private Destination JavaDoc dest;
48
49     /**
50      * @see junit.framework.TestCase#setUp()
51      */

52     protected void setUp() throws Exception JavaDoc {
53         factory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
54         connection = factory.createConnection();
55     }
56
57     /**
58      * @see junit.framework.TestCase#tearDown()
59      */

60     protected void tearDown() throws Exception JavaDoc {
61         if (connection != null) {
62             connection.close();
63             connection = null;
64         }
65     }
66
67     /**
68      *
69      * @throws JMSException
70      * @throws InterruptedException
71      */

72     public void testQueueSynchRecover() throws JMSException JavaDoc, InterruptedException JavaDoc {
73         dest = new ActiveMQQueue("Queue-"+System.currentTimeMillis());
74         doTestSynchRecover();
75     }
76
77     /**
78      *
79      * @throws JMSException
80      * @throws InterruptedException
81      */

82     public void testQueueAsynchRecover() throws JMSException JavaDoc, InterruptedException JavaDoc {
83         dest = new ActiveMQQueue("Queue-"+System.currentTimeMillis());
84         doTestAsynchRecover();
85     }
86
87     /**
88      *
89      * @throws JMSException
90      * @throws InterruptedException
91      */

92     public void testTopicSynchRecover() throws JMSException JavaDoc, InterruptedException JavaDoc {
93         dest = new ActiveMQTopic("Topic-"+System.currentTimeMillis());
94         doTestSynchRecover();
95     }
96
97     /**
98      *
99      * @throws JMSException
100      * @throws InterruptedException
101      */

102     public void testTopicAsynchRecover() throws JMSException JavaDoc, InterruptedException JavaDoc {
103         dest = new ActiveMQTopic("Topic-"+System.currentTimeMillis());
104         doTestAsynchRecover();
105     }
106
107     /**
108      *
109      * @throws JMSException
110      * @throws InterruptedException
111      */

112     public void testQueueAsynchRecoverWithAutoAck() throws JMSException JavaDoc, InterruptedException JavaDoc {
113         dest = new ActiveMQQueue("Queue-"+System.currentTimeMillis());
114         doTestAsynchRecoverWithAutoAck();
115     }
116
117     /**
118      *
119      * @throws JMSException
120      * @throws InterruptedException
121      */

122     public void testTopicAsynchRecoverWithAutoAck() throws JMSException JavaDoc, InterruptedException JavaDoc {
123         dest = new ActiveMQTopic("Topic-"+System.currentTimeMillis());
124         doTestAsynchRecoverWithAutoAck();
125     }
126     
127     /**
128      * Test to make sure that a Sync recover works.
129      *
130      * @throws JMSException
131      */

132     public void doTestSynchRecover() throws JMSException JavaDoc {
133         Session JavaDoc session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
134         MessageConsumer JavaDoc consumer = session.createConsumer(dest);
135         connection.start();
136                 
137         MessageProducer JavaDoc producer = session.createProducer(dest);
138         producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
139         producer.send(session.createTextMessage("First"));
140         producer.send(session.createTextMessage("Second"));
141         
142         TextMessage JavaDoc message = (TextMessage JavaDoc)consumer.receive(1000);
143         assertEquals("First", message.getText());
144         assertFalse(message.getJMSRedelivered());
145         message.acknowledge();
146         
147         message = (TextMessage JavaDoc)consumer.receive(1000);
148         assertEquals("Second", message.getText());
149         assertFalse(message.getJMSRedelivered());
150         
151         session.recover();
152
153         message = (TextMessage JavaDoc)consumer.receive(2000);
154         assertEquals("Second", message.getText());
155         assertTrue(message.getJMSRedelivered());
156         
157         message.acknowledge();
158     }
159     
160     /**
161      * Test to make sure that a Async recover works.
162      *
163      * @throws JMSException
164      * @throws InterruptedException
165      */

166     public void doTestAsynchRecover() throws JMSException JavaDoc, InterruptedException JavaDoc {
167         
168         final Session JavaDoc session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
169         final String JavaDoc errorMessage[] = new String JavaDoc[]{null};
170         final CountDownLatch JavaDoc doneCountDownLatch = new CountDownLatch JavaDoc(1);
171         
172         MessageConsumer JavaDoc consumer = session.createConsumer(dest);
173         
174         MessageProducer JavaDoc producer = session.createProducer(dest);
175         producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
176         producer.send(session.createTextMessage("First"));
177         producer.send(session.createTextMessage("Second"));
178         
179         consumer.setMessageListener(new MessageListener JavaDoc(){
180             int counter;
181             public void onMessage(Message msg) {
182                 counter++;
183                 try {
184                     TextMessage JavaDoc message = (TextMessage JavaDoc)msg;
185                     switch( counter ) {
186                         case 1:
187                             assertEquals("First", message.getText());
188                             assertFalse(message.getJMSRedelivered());
189                             message.acknowledge();
190                             
191                             break;
192                         case 2:
193                             assertEquals("Second", message.getText());
194                             assertFalse(message.getJMSRedelivered());
195                             session.recover();
196                             break;
197                             
198                         case 3:
199                             assertEquals("Second", message.getText());
200                             assertTrue(message.getJMSRedelivered());
201                             message.acknowledge();
202                             doneCountDownLatch.countDown();
203                             break;
204                             
205                         default:
206                             errorMessage[0]="Got too many messages: "+counter;
207                             doneCountDownLatch.countDown();
208                     }
209                 } catch (Throwable JavaDoc e) {
210                     e.printStackTrace();
211                     errorMessage[0]="Got exception: "+e;
212                     doneCountDownLatch.countDown();
213                 }
214             }
215         });
216         connection.start();
217         
218         if( doneCountDownLatch.await(5, TimeUnit.SECONDS) ) {
219             if( errorMessage[0]!=null ) {
220                 fail(errorMessage[0]);
221             }
222         } else {
223             fail("Timeout waiting for async message delivery to complete.");
224         }
225
226     }
227     
228     /**
229      * Test to make sure that a Async recover works when using AUTO_ACKNOWLEDGE.
230      *
231      * @throws JMSException
232      * @throws InterruptedException
233      */

234     public void doTestAsynchRecoverWithAutoAck() throws JMSException JavaDoc, InterruptedException JavaDoc {
235         
236         final Session JavaDoc session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
237         final String JavaDoc errorMessage[] = new String JavaDoc[]{null};
238         final CountDownLatch JavaDoc doneCountDownLatch = new CountDownLatch JavaDoc(1);
239         
240         MessageConsumer JavaDoc consumer = session.createConsumer(dest);
241         
242         MessageProducer JavaDoc producer = session.createProducer(dest);
243         producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
244         producer.send(session.createTextMessage("First"));
245         producer.send(session.createTextMessage("Second"));
246         
247         consumer.setMessageListener(new MessageListener JavaDoc(){
248             int counter;
249             public void onMessage(Message msg) {
250                 counter++;
251                 try {
252                     TextMessage JavaDoc message = (TextMessage JavaDoc)msg;
253                     switch( counter ) {
254                         case 1:
255                             assertEquals("First", message.getText());
256                             assertFalse(message.getJMSRedelivered());
257                             break;
258                         case 2:
259                             // This should rollback the delivery of this message.. and re-deliver.
260
assertEquals("Second", message.getText());
261                             assertFalse(message.getJMSRedelivered());
262                             session.recover();
263                             break;
264                             
265                         case 3:
266                             assertEquals("Second", message.getText());
267                             assertTrue(message.getJMSRedelivered());
268                             doneCountDownLatch.countDown();
269                             break;
270                             
271                         default:
272                             errorMessage[0]="Got too many messages: "+counter;
273                             doneCountDownLatch.countDown();
274                     }
275                 } catch (Throwable JavaDoc e) {
276                     e.printStackTrace();
277                     errorMessage[0]="Got exception: "+e;
278                     doneCountDownLatch.countDown();
279                 }
280             }
281         });
282         connection.start();
283         
284         if( doneCountDownLatch.await(5000, TimeUnit.SECONDS) ) {
285             if( errorMessage[0]!=null ) {
286                 fail(errorMessage[0]);
287             }
288         } else {
289             fail("Timeout waiting for async message delivery to complete.");
290         }
291     }
292 }
293
Popular Tags