KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > usecases > TransactionRollbackOrderTest


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 package org.apache.activemq.usecases;
19
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import junit.framework.TestCase;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
41
42
43 /**
44  * Test case for AMQ-268
45  *
46  * @author Paul Smith
47  * @version $Revision: 1.1 $
48  */

49 public final class TransactionRollbackOrderTest extends TestCase {
50     private static final Log log = LogFactory.getLog(TransactionRollbackOrderTest.class);
51
52     private volatile String JavaDoc receivedText;
53
54     private Session JavaDoc producerSession;
55     private Session JavaDoc consumerSession;
56     private Destination JavaDoc queue;
57
58     private MessageProducer JavaDoc producer;
59     private MessageConsumer JavaDoc consumer;
60     private Connection JavaDoc connection;
61     private CountDownLatch JavaDoc latch = new CountDownLatch JavaDoc(1);
62     private int NUM_MESSAGES = 5;
63     private List JavaDoc msgSent = new ArrayList JavaDoc();
64     private List JavaDoc msgCommitted = new ArrayList JavaDoc();
65     private List JavaDoc msgRolledBack = new ArrayList JavaDoc();
66     private List JavaDoc msgRedelivered = new ArrayList JavaDoc();
67
68     public void testTransaction() throws Exception JavaDoc {
69
70         ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
71
72         connection = factory.createConnection();
73         queue = new ActiveMQQueue(getClass().getName() + "." + getName());
74
75         producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
76         consumerSession = connection.createSession(true, 0);
77
78         producer = producerSession.createProducer(queue);
79
80         consumer = consumerSession.createConsumer(queue);
81         consumer.setMessageListener(new MessageListener JavaDoc() {
82
83             int msgCount = 0;
84             int msgCommittedCount = 0;
85
86             public void onMessage(Message JavaDoc m) {
87                 try {
88                     msgCount++;
89                     TextMessage JavaDoc tm = (TextMessage JavaDoc) m;
90                     receivedText = tm.getText();
91
92                     if (tm.getJMSRedelivered()) {
93                         msgRedelivered.add(receivedText);
94                     }
95
96                     log.info("consumer received message: " + receivedText + (tm.getJMSRedelivered() ? " ** Redelivered **" : ""));
97                     if (msgCount == 3) {
98                         msgRolledBack.add(receivedText);
99                         consumerSession.rollback();
100                         log.info("[msg: " + receivedText + "] ** rolled back **");
101                     }
102                     else {
103                         msgCommittedCount++;
104                         msgCommitted.add(receivedText);
105                         consumerSession.commit();
106                         log.info("[msg: " + receivedText + "] committed transaction ");
107                     }
108                     if (msgCommittedCount == NUM_MESSAGES) {
109                         latch.countDown();
110                     }
111                 }
112                 catch (JMSException JavaDoc e) {
113                     try {
114                         consumerSession.rollback();
115                         log.info("rolled back transaction");
116                     }
117                     catch (JMSException JavaDoc e1) {
118                         log.info(e1);
119                         e1.printStackTrace();
120                     }
121                     log.info(e);
122                     e.printStackTrace();
123                 }
124             }
125         });
126         connection.start();
127
128         TextMessage JavaDoc tm = null;
129         try {
130             for (int i = 1; i <= NUM_MESSAGES; i++) {
131                 tm = producerSession.createTextMessage();
132                 tm.setText("Hello " + i);
133                 msgSent.add(tm.getText());
134                 producer.send(tm);
135                 log.info("producer sent message: " + tm.getText());
136             }
137         }
138         catch (JMSException JavaDoc e) {
139             e.printStackTrace();
140         }
141
142         log.info("Waiting for latch");
143         latch.await();
144
145         assertEquals(1, msgRolledBack.size());
146         assertEquals(1, msgRedelivered.size());
147
148         log.info("msg RolledBack = " + msgRolledBack.get(0));
149         log.info("msg Redelivered = " + msgRedelivered.get(0));
150
151         assertEquals(msgRolledBack.get(0), msgRedelivered.get(0));
152
153         assertEquals(NUM_MESSAGES, msgSent.size());
154         assertEquals(NUM_MESSAGES, msgCommitted.size());
155
156         assertEquals(msgSent, msgCommitted);
157
158     }
159
160     protected void tearDown() throws Exception JavaDoc {
161         if (connection != null) {
162             log.info("Closing the connection");
163             connection.close();
164         }
165         super.tearDown();
166     }
167 }
168
Popular Tags