KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > JmsTransactionTestSupport


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 package org.apache.activemq;
19
20 import java.net.URI JavaDoc;
21 import java.net.URISyntaxException JavaDoc;
22 import java.util.ArrayList JavaDoc;
23 import java.util.List JavaDoc;
24 import javax.jms.Connection JavaDoc;
25 import javax.jms.ConnectionFactory JavaDoc;
26 import javax.jms.Destination JavaDoc;
27 import javax.jms.JMSException JavaDoc;
28 import javax.jms.Message JavaDoc;
29 import javax.jms.MessageConsumer JavaDoc;
30 import javax.jms.MessageListener JavaDoc;
31 import javax.jms.MessageProducer JavaDoc;
32 import javax.jms.ObjectMessage JavaDoc;
33 import javax.jms.Session JavaDoc;
34 import javax.jms.TextMessage JavaDoc;
35 import org.apache.activemq.broker.BrokerFactory;
36 import org.apache.activemq.broker.BrokerService;
37 import org.apache.activemq.test.JmsResourceProvider;
38 import org.apache.activemq.test.TestSupport;
39
40 /**
41  * @version $Revision: 1.9 $
42  */

43 abstract public class JmsTransactionTestSupport extends TestSupport implements MessageListener JavaDoc {
44     
45     private static final org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory
46             .getLog(JmsTransactionTestSupport.class);
47
48     protected ConnectionFactory JavaDoc connectionFactory;
49     protected Connection JavaDoc connection;
50     protected Session JavaDoc session;
51     protected MessageConsumer JavaDoc consumer;
52     protected MessageProducer JavaDoc producer;
53     protected JmsResourceProvider resourceProvider;
54     protected Destination JavaDoc destination;
55     
56     // for message listener test
57
private final int messageCount = 5;
58     private final String JavaDoc messageText = "message";
59     private List JavaDoc unackMessages = new ArrayList JavaDoc(messageCount);
60     private List JavaDoc ackMessages = new ArrayList JavaDoc(messageCount);
61     private boolean resendPhase = false;
62     protected int batchCount = 10;
63     protected int batchSize = 20;
64
65     protected BrokerService broker;
66
67     public JmsTransactionTestSupport() {
68         super();
69     }
70
71     public JmsTransactionTestSupport(String JavaDoc name) {
72         super(name);
73     }
74
75
76     /* (non-Javadoc)
77      * @see junit.framework.TestCase#setUp()
78      */

79     protected void setUp() throws Exception JavaDoc {
80         broker = createBroker();
81         broker.start();
82         
83         resourceProvider = getJmsResourceProvider();
84         topic = resourceProvider.isTopic();
85         // We will be using transacted sessions.
86
resourceProvider.setTransacted(true);
87         connectionFactory = resourceProvider.createConnectionFactory();
88         reconnect();
89     }
90
91     /**
92      */

93     protected BrokerService createBroker() throws Exception JavaDoc, URISyntaxException JavaDoc {
94         return BrokerFactory.createBroker(new URI JavaDoc("broker://()/localhost?persistent=false"));
95     }
96
97     /* (non-Javadoc)
98      * @see junit.framework.TestCase#tearDown()
99      */

100     protected void tearDown() throws Exception JavaDoc {
101         log.info("Closing down connection");
102
103         session.close();
104         session=null;
105         connection.close();
106         connection=null;
107         broker.stop();
108         broker=null;
109         
110         log.info("Connection closed.");
111     }
112
113     protected abstract JmsResourceProvider getJmsResourceProvider();
114
115     /**
116      * Sends a batch of messages and validates that the messages are received.
117      *
118      * @throws Exception
119      */

120     public void testSendReceiveTransactedBatches() throws Exception JavaDoc {
121        
122         TextMessage JavaDoc message = session.createTextMessage("Batch Message");
123
124         for (int j = 0; j < batchCount; j++) {
125             log.info("Producing bacth " + j + " of " + batchSize + " messages");
126
127             for (int i = 0; i < batchSize; i++) {
128                 producer.send(message);
129             }
130
131             session.commit();
132             log.info("Consuming bacth " + j + " of " + batchSize + " messages");
133
134             for (int i = 0; i < batchSize; i++) {
135                 message = (TextMessage JavaDoc) consumer.receive(1000 * 5);
136                 assertNotNull("Received only " + i + " messages in batch " + j, message);
137                 assertEquals("Batch Message", message.getText());
138             }
139
140             session.commit();
141         }
142     }
143
144     /**
145      * Sends a batch of messages and validates that the rollbacked message was not consumed.
146      *
147      * @throws Exception
148      */

149     public void testSendRollback() throws Exception JavaDoc {
150         Message[] outbound = new Message[]{
151             session.createTextMessage("First Message"),
152             session.createTextMessage("Second Message")
153         };
154
155         //sends a message
156
producer.send(outbound[0]);
157         session.commit();
158
159         //sends a message that gets rollbacked
160
producer.send(session.createTextMessage("I'm going to get rolled back."));
161         session.rollback();
162         
163         //sends a message
164
producer.send(outbound[1]);
165         session.commit();
166
167         //receives the first message
168
ArrayList JavaDoc messages = new ArrayList JavaDoc();
169         log.info("About to consume message 1");
170         Message message = consumer.receive(1000);
171         messages.add(message);
172         log.info("Received: " + message);
173
174         //receives the second message
175
log.info("About to consume message 2");
176         message = consumer.receive(4000);
177         messages.add(message);
178         log.info("Received: " + message);
179
180         //validates that the rollbacked was not consumed
181
session.commit();
182         Message inbound[] = new Message[messages.size()];
183         messages.toArray(inbound);
184         assertTextMessagesEqual("Rollback did not work.", outbound, inbound);
185     }
186     
187     /**
188      * Sends a batch of messages and validates that the message sent before session close is not consumed.
189      *
190      * @throws Exception
191      */

192     public void testSendSessionClose() throws Exception JavaDoc {
193         Message[] outbound = new Message[]{
194                 session.createTextMessage("First Message"),
195                 session.createTextMessage("Second Message")
196         };
197         
198         //sends a message
199
producer.send(outbound[0]);
200         session.commit();
201         
202         //sends a message that gets rollbacked
203
producer.send(session.createTextMessage("I'm going to get rolled back."));
204         consumer.close();
205         
206         reconnectSession();
207         
208         //sends a message
209
producer.send(outbound[1]);
210         session.commit();
211         
212         //receives the first message
213
ArrayList JavaDoc messages = new ArrayList JavaDoc();
214         log.info("About to consume message 1");
215         Message message = consumer.receive(1000);
216         messages.add(message);
217         log.info("Received: " + message);
218         
219         //receives the second message
220
log.info("About to consume message 2");
221         message = consumer.receive(4000);
222         messages.add(message);
223         log.info("Received: " + message);
224         
225         //validates that the rollbacked was not consumed
226
session.commit();
227         Message inbound[] = new Message[messages.size()];
228         messages.toArray(inbound);
229         assertTextMessagesEqual("Rollback did not work.", outbound, inbound);
230     }
231
232     /**
233      * Sends a batch of messages and validates that the message sent before session close is not consumed.
234      *
235      * @throws Exception
236      */

237     public void testSendSessionAndConnectionClose() throws Exception JavaDoc {
238         Message[] outbound = new Message[]{
239             session.createTextMessage("First Message"),
240             session.createTextMessage("Second Message")
241         };
242
243         //sends a message
244
producer.send(outbound[0]);
245         session.commit();
246
247         //sends a message that gets rollbacked
248
producer.send(session.createTextMessage("I'm going to get rolled back."));
249         consumer.close();
250         session.close();
251
252         reconnect();
253
254         //sends a message
255
producer.send(outbound[1]);
256         session.commit();
257
258         //receives the first message
259
ArrayList JavaDoc messages = new ArrayList JavaDoc();
260         log.info("About to consume message 1");
261         Message message = consumer.receive(1000);
262         messages.add(message);
263         log.info("Received: " + message);
264
265         //receives the second message
266
log.info("About to consume message 2");
267         message = consumer.receive(4000);
268         messages.add(message);
269         log.info("Received: " + message);
270
271         //validates that the rollbacked was not consumed
272
session.commit();
273         Message inbound[] = new Message[messages.size()];
274         messages.toArray(inbound);
275         assertTextMessagesEqual("Rollback did not work.", outbound, inbound);
276     }
277
278     /**
279      * Sends a batch of messages and validates that the rollbacked message was redelivered.
280      *
281      * @throws Exception
282      */

283     public void testReceiveRollback() throws Exception JavaDoc {
284         Message[] outbound = new Message[]{
285             session.createTextMessage("First Message"),
286             session.createTextMessage("Second Message")
287         };
288
289         // lets consume any outstanding messages from prev test runs
290
while (consumer.receive(1000) != null) {
291         }
292         session.commit();
293
294         //sent both messages
295
producer.send(outbound[0]);
296         producer.send(outbound[1]);
297         session.commit();
298
299         log.info("Sent 0: " + outbound[0]);
300         log.info("Sent 1: " + outbound[1]);
301
302         ArrayList JavaDoc messages = new ArrayList JavaDoc();
303         Message message = consumer.receive(1000);
304         messages.add(message);
305         assertEquals(outbound[0], message);
306         session.commit();
307
308         // rollback so we can get that last message again.
309
message = consumer.receive(1000);
310         assertNotNull(message);
311         assertEquals(outbound[1], message);
312         session.rollback();
313
314         // Consume again.. the prev message should
315
// get redelivered.
316
message = consumer.receive(5000);
317         assertNotNull("Should have re-received the message again!", message);
318         messages.add(message);
319         session.commit();
320
321         Message inbound[] = new Message[messages.size()];
322         messages.toArray(inbound);
323         assertTextMessagesEqual("Rollback did not work", outbound, inbound);
324     }
325
326     /**
327      * Sends a batch of messages and validates that the rollbacked message was redelivered.
328      *
329      * @throws Exception
330      */

331     public void testReceiveTwoThenRollback() throws Exception JavaDoc {
332         Message[] outbound = new Message[]{
333             session.createTextMessage("First Message"),
334             session.createTextMessage("Second Message")
335         };
336
337         // lets consume any outstanding messages from prev test runs
338
while (consumer.receive(1000) != null) {
339         }
340         session.commit();
341
342         //
343
producer.send(outbound[0]);
344         producer.send(outbound[1]);
345         session.commit();
346
347         log.info("Sent 0: " + outbound[0]);
348         log.info("Sent 1: " + outbound[1]);
349
350         ArrayList JavaDoc messages = new ArrayList JavaDoc();
351         Message message = consumer.receive(1000);
352         assertEquals(outbound[0], message);
353
354         message = consumer.receive(1000);
355         assertNotNull(message);
356         assertEquals(outbound[1], message);
357         session.rollback();
358
359         // Consume again.. the prev message should
360
// get redelivered.
361
message = consumer.receive(5000);
362         assertNotNull("Should have re-received the first message again!", message);
363         messages.add(message);
364         assertEquals(outbound[0], message);
365         message = consumer.receive(5000);
366         assertNotNull("Should have re-received the second message again!", message);
367         messages.add(message);
368         assertEquals(outbound[1], message);
369         
370         assertNull(consumer.receiveNoWait());
371         session.commit();
372
373         Message inbound[] = new Message[messages.size()];
374         messages.toArray(inbound);
375         assertTextMessagesEqual("Rollback did not work", outbound, inbound);
376     }
377
378     /**
379      * Sends a batch of messages and validates that the rollbacked message was not consumed.
380      *
381      * @throws Exception
382      */

383     public void testSendReceiveWithPrefetchOne() throws Exception JavaDoc {
384         setPrefetchToOne();
385         Message[] outbound = new Message[]{
386             session.createTextMessage("First Message"),
387             session.createTextMessage("Second Message"),
388             session.createTextMessage("Third Message"),
389             session.createTextMessage("Fourth Message")
390         };
391
392         for (int i = 0; i < outbound.length; i++) {
393             //sends a message
394
producer.send(outbound[i]);
395         }
396         session.commit();
397
398         //receives the first message
399
for (int i = 0; i < outbound.length; i++) {
400             log.info("About to consume message 1");
401             Message message = consumer.receive(1000);
402             assertNotNull(message);
403             log.info("Received: " + message);
404         }
405
406         //validates that the rollbacked was not consumed
407
session.commit();
408     }
409
410     /**
411      * Perform the test that validates if the rollbacked message was redelivered multiple times.
412      *
413      * @throws Exception
414      */

415     public void testReceiveTwoThenRollbackManyTimes() throws Exception JavaDoc {
416         for (int i = 0; i < 5; i++)
417             testReceiveTwoThenRollback();
418     }
419
420     /**
421      * Sends a batch of messages and validates that the rollbacked message was not consumed. This test differs by
422      * setting the message prefetch to one.
423      *
424      * @throws Exception
425      */

426     public void testSendRollbackWithPrefetchOfOne() throws Exception JavaDoc {
427         setPrefetchToOne();
428         testSendRollback();
429     }
430
431     /**
432      * Sends a batch of messages and and validates that the rollbacked message was redelivered. This test differs by
433      * setting the message prefetch to one.
434      *
435      * @throws Exception
436      */

437     public void testReceiveRollbackWithPrefetchOfOne() throws Exception JavaDoc {
438         setPrefetchToOne();
439         testReceiveRollback();
440     }
441
442     /**
443      * Tests if the messages can still be received if the consumer is closed (session is not closed).
444      *
445      * @throws Exception see http://jira.codehaus.org/browse/AMQ-143
446      */

447     public void testCloseConsumerBeforeCommit() throws Exception JavaDoc {
448         TextMessage JavaDoc[] outbound = new TextMessage JavaDoc[]{
449             session.createTextMessage("First Message"),
450             session.createTextMessage("Second Message")
451         };
452
453         // lets consume any outstanding messages from prev test runs
454
while (consumer.receiveNoWait() != null) {
455         }
456
457         session.commit();
458
459         //sends the messages
460
producer.send(outbound[0]);
461         producer.send(outbound[1]);
462         session.commit();
463         log.info("Sent 0: " + outbound[0]);
464         log.info("Sent 1: " + outbound[1]);
465
466         TextMessage JavaDoc message = (TextMessage JavaDoc) consumer.receive(1000);
467         assertEquals(outbound[0].getText(), message.getText());
468         // Close the consumer before the commit. This should not cause the received message
469
// to rollback.
470
consumer.close();
471         session.commit();
472
473         // Create a new consumer
474
consumer = resourceProvider.createConsumer(session, destination);
475         log.info("Created consumer: " + consumer);
476
477         message = (TextMessage JavaDoc) consumer.receive(1000);
478         assertEquals(outbound[1].getText(), message.getText());
479         session.commit();
480     }
481
482
483     public void testChangeMutableObjectInObjectMessageThenRollback() throws Exception JavaDoc {
484         ArrayList JavaDoc list = new ArrayList JavaDoc();
485         list.add("First");
486         Message outbound = session.createObjectMessage(list);
487         outbound.setStringProperty("foo", "abc");
488
489         producer.send(outbound);
490         session.commit();
491
492         log.info("About to consume message 1");
493         Message message = consumer.receive(5000);
494
495         List JavaDoc body = assertReceivedObjectMessageWithListBody(message);
496
497         // now lets try mutate it
498
try {
499             message.setStringProperty("foo", "def");
500             fail("Cannot change properties of the object!");
501         }
502         catch (JMSException JavaDoc e) {
503             log.info("Caught expected exception: " + e, e);
504         }
505         body.clear();
506         body.add("This should never be seen!");
507         session.rollback();
508
509         message = consumer.receive(5000);
510         List JavaDoc secondBody = assertReceivedObjectMessageWithListBody(message);
511         assertNotSame("Second call should return a different body", secondBody, body);
512         session.commit();
513     }
514
515     protected List JavaDoc assertReceivedObjectMessageWithListBody(Message message) throws JMSException JavaDoc {
516         assertNotNull("Should have received a message!", message);
517         assertEquals("foo header", "abc", message.getStringProperty("foo"));
518
519         assertTrue("Should be an object message but was: " + message, message instanceof ObjectMessage JavaDoc);
520         ObjectMessage JavaDoc objectMessage = (ObjectMessage JavaDoc) message;
521         List JavaDoc body = (List JavaDoc) objectMessage.getObject();
522         log.info("Received body: " + body);
523
524         assertEquals("Size of list should be 1", 1, body.size());
525         assertEquals("element 0 of list", "First", body.get(0));
526         return body;
527     }
528
529     /**
530      * Recreates the connection.
531      *
532      * @throws JMSException
533      */

534     protected void reconnect() throws JMSException JavaDoc {
535         
536         if (connection != null) {
537             // Close the prev connection.
538
connection.close();
539         }
540         session = null;
541         connection = resourceProvider.createConnection(connectionFactory);
542         reconnectSession();
543         connection.start();
544     }
545     
546     /**
547      * Recreates the connection.
548      *
549      * @throws JMSException
550      */

551     protected void reconnectSession() throws JMSException JavaDoc {
552         if (session != null) {
553             session.close();
554         }
555         
556         session = resourceProvider.createSession(connection);
557         destination = resourceProvider.createDestination(session, getSubject());
558         producer = resourceProvider.createProducer(session, destination);
559         consumer = resourceProvider.createConsumer(session, destination);
560     }
561
562     /**
563      * Sets the prefeftch policy to one.
564      */

565     protected void setPrefetchToOne() {
566         ActiveMQPrefetchPolicy prefetchPolicy = ((ActiveMQConnection) connection).getPrefetchPolicy();
567         prefetchPolicy.setQueuePrefetch(1);
568         prefetchPolicy.setTopicPrefetch(1);
569         prefetchPolicy.setDurableTopicPrefetch(1);
570         prefetchPolicy.setOptimizeDurableTopicPrefetch(1);
571     }
572
573     public void testMessageListener() throws Exception JavaDoc {
574         //send messages
575
for(int i = 0;i<messageCount;i++) {
576             producer.send(session.createTextMessage(messageText+i));
577         }
578         session.commit();
579         consumer.setMessageListener(this);
580         //wait receive
581
waitReceiveUnack();
582         assertEquals(unackMessages.size(),messageCount);
583         //resend phase
584
waitReceiveAck();
585         assertEquals(ackMessages.size(),messageCount);
586         //should no longer re-receive
587
consumer.setMessageListener(null);
588         assertNull(consumer.receive(500));
589         reconnect();
590     }
591
592     public void onMessage(Message message) {
593         if(!resendPhase) {
594             unackMessages.add(message);
595             if(unackMessages.size() == messageCount) {
596                 try {
597                     session.rollback();
598                     resendPhase = true;
599                 } catch (Exception JavaDoc e) {
600                     e.printStackTrace();
601                 }
602             }
603         } else {
604             ackMessages.add(message);
605             if(ackMessages.size() == messageCount) {
606                 try {
607                     session.commit();
608                 } catch (Exception JavaDoc e) {
609                     e.printStackTrace();
610                 }
611             }
612         }
613     }
614
615     private void waitReceiveUnack() throws Exception JavaDoc {
616         for(int i=0; i < 100 && !resendPhase; i++) {
617             Thread.sleep(100);
618         }
619         assertTrue(resendPhase);
620     }
621
622     private void waitReceiveAck() throws Exception JavaDoc {
623         for(int i=0; i < 100 && ackMessages.size() < messageCount; i++) {
624             Thread.sleep(100);
625         }
626         assertFalse(ackMessages.size() < messageCount);
627     }
628 }
629
Popular Tags