KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > broker > XARecoveryBrokerTest


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 package org.apache.activemq.broker;
19
20 import junit.framework.Test;
21
22 import org.apache.activemq.command.ActiveMQDestination;
23 import org.apache.activemq.command.ActiveMQQueue;
24 import org.apache.activemq.command.ConnectionInfo;
25 import org.apache.activemq.command.ConsumerInfo;
26 import org.apache.activemq.command.DataArrayResponse;
27 import org.apache.activemq.command.Message;
28 import org.apache.activemq.command.MessageAck;
29 import org.apache.activemq.command.ProducerInfo;
30 import org.apache.activemq.command.Response;
31 import org.apache.activemq.command.SessionInfo;
32 import org.apache.activemq.command.TransactionId;
33 import org.apache.activemq.command.TransactionInfo;
34 import org.apache.activemq.command.XATransactionId;
35
36 /**
37  * Used to simulate the recovery that occurs when a broker shuts down.
38  *
39  * @version $Revision$
40  */

41 public class XARecoveryBrokerTest extends BrokerRestartTestSupport {
42         
43     
44     public void testPreparedTransactionRecoveredOnRestart() throws Exception JavaDoc {
45         
46         ActiveMQDestination destination = createDestination();
47         
48         // Setup the producer and send the message.
49
StubConnection connection = createConnection();
50         ConnectionInfo connectionInfo = createConnectionInfo();
51         SessionInfo sessionInfo = createSessionInfo(connectionInfo);
52         ProducerInfo producerInfo = createProducerInfo(sessionInfo);
53         connection.send(connectionInfo);
54         connection.send(sessionInfo);
55         connection.send(producerInfo);
56         ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
57         connection.send(consumerInfo);
58         
59         // Prepare 4 message sends.
60
for( int i=0; i < 4; i++) {
61             // Begin the transaction.
62
XATransactionId txid = createXATransaction(sessionInfo);
63             connection.send(createBeginTransaction(connectionInfo, txid));
64
65             Message message = createMessage(producerInfo, destination);
66             message.setPersistent(true);
67             message.setTransactionId(txid);
68             connection.send(message);
69             
70             // Prepare
71
connection.send(createPrepareTransaction(connectionInfo, txid));
72         }
73
74         // Since prepared but not committed.. they should not get delivered.
75
assertNoMessagesLeft(connection);
76         connection.request(closeConnectionInfo(connectionInfo));
77
78         // restart the broker.
79
restartBroker();
80         
81         // Setup the consumer and receive the message.
82
connection = createConnection();
83         connectionInfo = createConnectionInfo();
84         sessionInfo = createSessionInfo(connectionInfo);
85         connection.send(connectionInfo);
86         connection.send(sessionInfo);
87         consumerInfo = createConsumerInfo(sessionInfo, destination);
88         connection.send(consumerInfo);
89
90         // Since prepared but not committed.. they should not get delivered.
91
assertNoMessagesLeft(connection);
92         
93         Response response = connection.request( new TransactionInfo(connectionInfo.getConnectionId(), null, TransactionInfo.RECOVER) );
94         assertNotNull(response);
95         DataArrayResponse dar = (DataArrayResponse) response;
96         assertEquals(4, dar.getData().length);
97         
98         // Commit the prepared transactions.
99
for( int i=0; i < dar.getData().length ;i ++ ) {
100             connection.send( createCommitTransaction2Phase(connectionInfo, (TransactionId)dar.getData()[i]) );
101         }
102
103         // We should not get the committed transactions.
104
for( int i=0; i < 4 ;i ++ ) {
105             Message m = receiveMessage(connection);
106             assertNotNull(m);
107         }
108         
109         assertNoMessagesLeft(connection);
110     }
111
112     public void testQueuePersistentCommitedMessagesNotLostOnRestart() throws Exception JavaDoc {
113         
114         ActiveMQDestination destination = createDestination();
115         
116         // Setup the producer and send the message.
117
StubConnection connection = createConnection();
118         ConnectionInfo connectionInfo = createConnectionInfo();
119         SessionInfo sessionInfo = createSessionInfo(connectionInfo);
120         ProducerInfo producerInfo = createProducerInfo(sessionInfo);
121         connection.send(connectionInfo);
122         connection.send(sessionInfo);
123         connection.send(producerInfo);
124         
125         // Begin the transaction.
126
XATransactionId txid = createXATransaction(sessionInfo);
127         connection.send(createBeginTransaction(connectionInfo, txid));
128
129         for( int i=0; i < 4; i++) {
130             Message message = createMessage(producerInfo, destination);
131             message.setPersistent(true);
132             message.setTransactionId(txid);
133             connection.send(message);
134         }
135         
136         // Commit
137
connection.send(createCommitTransaction1Phase(connectionInfo, txid));
138         connection.request(closeConnectionInfo(connectionInfo));
139         // restart the broker.
140
restartBroker();
141         
142         // Setup the consumer and receive the message.
143
connection = createConnection();
144         connectionInfo = createConnectionInfo();
145         sessionInfo = createSessionInfo(connectionInfo);
146         connection.send(connectionInfo);
147         connection.send(sessionInfo);
148         ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
149         connection.send(consumerInfo);
150
151         for( int i=0; i < 4 ;i ++ ) {
152             Message m = receiveMessage(connection);
153             assertNotNull(m);
154         }
155         
156         assertNoMessagesLeft(connection);
157     }
158     
159     public void testQueuePersistentCommitedAcksNotLostOnRestart() throws Exception JavaDoc {
160         
161         ActiveMQDestination destination = createDestination();
162         
163         // Setup the producer and send the message.
164
StubConnection connection = createConnection();
165         ConnectionInfo connectionInfo = createConnectionInfo();
166         SessionInfo sessionInfo = createSessionInfo(connectionInfo);
167         ProducerInfo producerInfo = createProducerInfo(sessionInfo);
168         connection.send(connectionInfo);
169         connection.send(sessionInfo);
170         connection.send(producerInfo);
171         
172         for( int i=0; i < 4; i++) {
173             Message message = createMessage(producerInfo, destination);
174             message.setPersistent(true);
175             connection.send(message);
176         }
177         
178         // Setup the consumer and receive the message.
179
ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
180         connection.send(consumerInfo);
181
182         // Begin the transaction.
183
XATransactionId txid = createXATransaction(sessionInfo);
184         connection.send(createBeginTransaction(connectionInfo, txid));
185         for( int i=0; i < 4 ;i ++ ) {
186             Message m = receiveMessage(connection);
187             assertNotNull(m);
188             MessageAck ack = createAck(consumerInfo, m, 1, MessageAck.STANDARD_ACK_TYPE);
189             ack.setTransactionId(txid);
190             connection.send(ack);
191         }
192         // Commit
193
connection.request(createCommitTransaction1Phase(connectionInfo, txid));
194         
195         // restart the broker.
196
restartBroker();
197         
198         // Setup the consumer and receive the message.
199
connection = createConnection();
200         connectionInfo = createConnectionInfo();
201         sessionInfo = createSessionInfo(connectionInfo);
202         connection.send(connectionInfo);
203         connection.send(sessionInfo);
204         consumerInfo = createConsumerInfo(sessionInfo, destination);
205         connection.send(consumerInfo);
206
207         // No messages should be delivered.
208
assertNoMessagesLeft(connection);
209
210         Message m = receiveMessage(connection);
211         assertNull(m);
212     }
213
214     public void testQueuePersistentUncommittedAcksLostOnRestart() throws Exception JavaDoc {
215         
216         ActiveMQDestination destination = createDestination();
217         
218         // Setup the producer and send the message.
219
StubConnection connection = createConnection();
220         ConnectionInfo connectionInfo = createConnectionInfo();
221         SessionInfo sessionInfo = createSessionInfo(connectionInfo);
222         ProducerInfo producerInfo = createProducerInfo(sessionInfo);
223         connection.send(connectionInfo);
224         connection.send(sessionInfo);
225         connection.send(producerInfo);
226         
227         for( int i=0; i < 4; i++) {
228             Message message = createMessage(producerInfo, destination);
229             message.setPersistent(true);
230             connection.send(message);
231         }
232         
233         // Setup the consumer and receive the message.
234
ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
235         connection.send(consumerInfo);
236
237         // Begin the transaction.
238
XATransactionId txid = createXATransaction(sessionInfo);
239         connection.send(createBeginTransaction(connectionInfo, txid));
240         for( int i=0; i < 4 ;i ++ ) {
241             Message m = receiveMessage(connection);
242             assertNotNull(m);
243             MessageAck ack = createAck(consumerInfo, m, 1, MessageAck.STANDARD_ACK_TYPE);
244             ack.setTransactionId(txid);
245             connection.send(ack);
246         }
247         // Don't commit
248

249         // restart the broker.
250
restartBroker();
251         
252         // Setup the consumer and receive the message.
253
connection = createConnection();
254         connectionInfo = createConnectionInfo();
255         sessionInfo = createSessionInfo(connectionInfo);
256         connection.send(connectionInfo);
257         connection.send(sessionInfo);
258         consumerInfo = createConsumerInfo(sessionInfo, destination);
259         connection.send(consumerInfo);
260
261         // All messages should be re-delivered.
262
for( int i=0; i < 4 ;i ++ ) {
263             Message m = receiveMessage(connection);
264             assertNotNull(m);
265         }
266         
267         assertNoMessagesLeft(connection);
268     }
269
270     public static Test suite() {
271         return suite(XARecoveryBrokerTest.class);
272     }
273     
274     public static void main(String JavaDoc[] args) {
275         junit.textui.TestRunner.run(suite());
276     }
277
278     protected ActiveMQDestination createDestination() {
279         return new ActiveMQQueue(getClass().getName() + "." + getName());
280     }
281     
282 }
283
Popular Tags