KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > broker > RecoveryBrokerTest


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 package org.apache.activemq.broker;
19
20 import javax.jms.DeliveryMode JavaDoc;
21
22 import junit.framework.Test;
23
24 import org.apache.activemq.command.ActiveMQDestination;
25 import org.apache.activemq.command.ActiveMQQueue;
26 import org.apache.activemq.command.ActiveMQTopic;
27 import org.apache.activemq.command.ConnectionInfo;
28 import org.apache.activemq.command.ConsumerInfo;
29 import org.apache.activemq.command.LocalTransactionId;
30 import org.apache.activemq.command.Message;
31 import org.apache.activemq.command.MessageAck;
32 import org.apache.activemq.command.ProducerInfo;
33 import org.apache.activemq.command.SessionInfo;
34
35 /**
36  * Used to simulate the recovery that occurs when a broker shuts down.
37  *
38  * @version $Revision$
39  */

40 public class RecoveryBrokerTest extends BrokerRestartTestSupport {
41         
42     public void testConsumedQueuePersistentMessagesLostOnRestart() throws Exception JavaDoc {
43         
44         ActiveMQDestination destination = new ActiveMQQueue("TEST");
45         
46         // Setup the producer and send the message.
47
StubConnection connection = createConnection();
48         ConnectionInfo connectionInfo = createConnectionInfo();
49         SessionInfo sessionInfo = createSessionInfo(connectionInfo);
50         ProducerInfo producerInfo = createProducerInfo(sessionInfo);
51         connection.send(connectionInfo);
52         connection.send(sessionInfo);
53         connection.send(producerInfo);
54         
55         for( int i=0; i < 4; i++) {
56             Message message = createMessage(producerInfo, destination);
57             message.setPersistent(true);
58             connection.send(message);
59         }
60         
61         // Setup the consumer and receive the message.
62
connection = createConnection();
63         connectionInfo = createConnectionInfo();
64         sessionInfo = createSessionInfo(connectionInfo);
65         connection.send(connectionInfo);
66         connection.send(sessionInfo);
67         ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
68         connection.send(consumerInfo);
69
70         // The we should get the messages.
71
for( int i=0; i < 4 ; i++ ) {
72             Message m2 = receiveMessage(connection);
73             assertNotNull(m2);
74         }
75
76         // restart the broker.
77
restartBroker();
78         
79         // No messages should be delivered.
80
Message m = receiveMessage(connection);
81         assertNull(m);
82     }
83
84     public void testQueuePersistentUncommitedMessagesLostOnRestart() throws Exception JavaDoc {
85         
86         ActiveMQDestination destination = new ActiveMQQueue("TEST");
87         
88         // Setup the producer and send the message.
89
StubConnection connection = createConnection();
90         ConnectionInfo connectionInfo = createConnectionInfo();
91         SessionInfo sessionInfo = createSessionInfo(connectionInfo);
92         ProducerInfo producerInfo = createProducerInfo(sessionInfo);
93         connection.send(connectionInfo);
94         connection.send(sessionInfo);
95         connection.send(producerInfo);
96         
97         // Begin the transaction.
98
LocalTransactionId txid = createLocalTransaction(sessionInfo);
99         connection.send(createBeginTransaction(connectionInfo, txid));
100
101         for( int i=0; i < 4; i++) {
102             Message message = createMessage(producerInfo, destination);
103             message.setPersistent(true);
104             message.setTransactionId(txid);
105             connection.send(message);
106         }
107         
108         // Don't commit
109

110         // restart the broker.
111
restartBroker();
112         
113         // Setup the consumer and receive the message.
114
connection = createConnection();
115         connectionInfo = createConnectionInfo();
116         sessionInfo = createSessionInfo(connectionInfo);
117         connection.send(connectionInfo);
118         connection.send(sessionInfo);
119         ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
120         connection.send(consumerInfo);
121
122         // No messages should be delivered.
123
Message m = receiveMessage(connection);
124         assertNull(m);
125     }
126
127     public void testTopicDurableConsumerHoldsPersistentMessageAfterRestart() throws Exception JavaDoc {
128         
129         ActiveMQDestination destination = new ActiveMQTopic("TEST");
130         
131         // Setup a first connection
132
StubConnection connection1 = createConnection();
133         ConnectionInfo connectionInfo1 = createConnectionInfo();
134         connectionInfo1.setClientId("A");
135         SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
136         ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1);
137         connection1.send(connectionInfo1);
138         connection1.send(sessionInfo1);
139         connection1.send(producerInfo1);
140
141         // Create the durable subscription.
142
ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
143         consumerInfo1.setSubscriptionName("test");
144         consumerInfo1.setPrefetchSize(100);
145         connection1.send(consumerInfo1);
146         
147         // Close the subscription.
148
connection1.send(closeConsumerInfo(consumerInfo1));
149
150         // Send the messages
151
connection1.send(createMessage(producerInfo1, destination, DeliveryMode.PERSISTENT));
152         connection1.send(createMessage(producerInfo1, destination, DeliveryMode.PERSISTENT));
153         connection1.send(createMessage(producerInfo1, destination, DeliveryMode.PERSISTENT));
154         connection1.send(createMessage(producerInfo1, destination, DeliveryMode.PERSISTENT));
155         connection1.request(closeConnectionInfo(connectionInfo1));
156         // Restart the broker.
157
restartBroker();
158         
159         // Get a connection to the new broker.
160
StubConnection connection2 = createConnection();
161         ConnectionInfo connectionInfo2 = createConnectionInfo();
162         connectionInfo2.setClientId("A");
163         SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
164         connection2.send(connectionInfo2);
165         connection2.send(sessionInfo2);
166
167         // Re-open the subscription.
168
ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destination);
169         consumerInfo2.setSubscriptionName("test");
170         consumerInfo2.setPrefetchSize(100);
171         connection2.send(consumerInfo2);
172         
173         // The we should get the messages.
174
for( int i=0; i < 4 ; i++ ) {
175             Message m2 = receiveMessage(connection2);
176             assertNotNull(m2);
177         }
178         assertNoMessagesLeft(connection2);
179     }
180
181     public void testQueuePersistentMessagesNotLostOnRestart() throws Exception JavaDoc {
182         
183         ActiveMQDestination destination = new ActiveMQQueue("TEST");
184         
185         // Setup the producer and send the message.
186
StubConnection connection = createConnection();
187         ConnectionInfo connectionInfo = createConnectionInfo();
188         SessionInfo sessionInfo = createSessionInfo(connectionInfo);
189         ProducerInfo producerInfo = createProducerInfo(sessionInfo);
190         connection.send(connectionInfo);
191         connection.send(sessionInfo);
192         connection.send(producerInfo);
193         Message message = createMessage(producerInfo, destination);
194         message.setPersistent(true);
195         connection.send(message);
196         connection.request(closeConnectionInfo(connectionInfo));
197         
198         // restart the broker.
199
restartBroker();
200         
201         // Setup the consumer and receive the message.
202
connection = createConnection();
203         connectionInfo = createConnectionInfo();
204         sessionInfo = createSessionInfo(connectionInfo);
205         connection.send(connectionInfo);
206         connection.send(sessionInfo);
207         ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
208         connection.send(consumerInfo);
209         
210         // Message should have been dropped due to broker restart.
211
Message m = receiveMessage(connection);
212         assertNotNull("Should have received a message by now!", m);
213         assertEquals( m.getMessageId(), message.getMessageId() );
214     }
215     
216     public void testQueueNonPersistentMessagesLostOnRestart() throws Exception JavaDoc {
217         
218         ActiveMQDestination destination = new ActiveMQQueue("TEST");
219         
220         // Setup the producer and send the message.
221
StubConnection connection = createConnection();
222         ConnectionInfo connectionInfo = createConnectionInfo();
223         SessionInfo sessionInfo = createSessionInfo(connectionInfo);
224         ProducerInfo producerInfo = createProducerInfo(sessionInfo);
225         connection.send(connectionInfo);
226         connection.send(sessionInfo);
227         connection.send(producerInfo);
228         Message message = createMessage(producerInfo, destination);
229         message.setPersistent(false);
230         connection.send(message);
231         
232         // restart the broker.
233
restartBroker();
234         
235         // Setup the consumer and receive the message.
236
connection = createConnection();
237         connectionInfo = createConnectionInfo();
238         sessionInfo = createSessionInfo(connectionInfo);
239         connection.send(connectionInfo);
240         connection.send(sessionInfo);
241         ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
242         connection.send(consumerInfo);
243         
244         // Message should have been dropped due to broker restart.
245
assertNoMessagesLeft(connection);
246     }
247     
248     public void testQueuePersistentCommitedMessagesNotLostOnRestart() throws Exception JavaDoc {
249         
250         ActiveMQDestination destination = new ActiveMQQueue("TEST");
251         
252         // Setup the producer and send the message.
253
StubConnection connection = createConnection();
254         ConnectionInfo connectionInfo = createConnectionInfo();
255         SessionInfo sessionInfo = createSessionInfo(connectionInfo);
256         ProducerInfo producerInfo = createProducerInfo(sessionInfo);
257         connection.send(connectionInfo);
258         connection.send(sessionInfo);
259         connection.send(producerInfo);
260         
261         // Begin the transaction.
262
LocalTransactionId txid = createLocalTransaction(sessionInfo);
263         connection.send(createBeginTransaction(connectionInfo, txid));
264
265         for( int i=0; i < 4; i++) {
266             Message message = createMessage(producerInfo, destination);
267             message.setPersistent(true);
268             message.setTransactionId(txid);
269             connection.send(message);
270         }
271         
272         // Commit
273
connection.send(createCommitTransaction1Phase(connectionInfo, txid));
274         connection.request(closeConnectionInfo(connectionInfo));
275         // restart the broker.
276
restartBroker();
277         
278         // Setup the consumer and receive the message.
279
connection = createConnection();
280         connectionInfo = createConnectionInfo();
281         sessionInfo = createSessionInfo(connectionInfo);
282         connection.send(connectionInfo);
283         connection.send(sessionInfo);
284         ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
285         connection.send(consumerInfo);
286
287         for( int i=0; i < 4 ;i ++ ) {
288             Message m = receiveMessage(connection);
289             assertNotNull(m);
290         }
291         
292         assertNoMessagesLeft(connection);
293     }
294     
295     public void testQueuePersistentCommitedAcksNotLostOnRestart() throws Exception JavaDoc {
296         
297         ActiveMQDestination destination = new ActiveMQQueue("TEST");
298         
299         // Setup the producer and send the message.
300
StubConnection connection = createConnection();
301         ConnectionInfo connectionInfo = createConnectionInfo();
302         SessionInfo sessionInfo = createSessionInfo(connectionInfo);
303         ProducerInfo producerInfo = createProducerInfo(sessionInfo);
304         connection.send(connectionInfo);
305         connection.send(sessionInfo);
306         connection.send(producerInfo);
307         
308         for( int i=0; i < 4; i++) {
309             Message message = createMessage(producerInfo, destination);
310             message.setPersistent(true);
311             connection.send(message);
312         }
313         
314         // Setup the consumer and receive the message.
315
ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
316         connection.send(consumerInfo);
317
318         // Begin the transaction.
319
LocalTransactionId txid = createLocalTransaction(sessionInfo);
320         connection.send(createBeginTransaction(connectionInfo, txid));
321         for( int i=0; i < 4 ;i ++ ) {
322             Message m = receiveMessage(connection);
323             assertNotNull(m);
324             MessageAck ack = createAck(consumerInfo, m, 1, MessageAck.STANDARD_ACK_TYPE);
325             ack.setTransactionId(txid);
326             connection.send(ack);
327         }
328         // Commit
329
connection.send(createCommitTransaction1Phase(connectionInfo, txid));
330         connection.request(closeConnectionInfo(connectionInfo));
331         // restart the broker.
332
restartBroker();
333         
334         // Setup the consumer and receive the message.
335
connection = createConnection();
336         connectionInfo = createConnectionInfo();
337         sessionInfo = createSessionInfo(connectionInfo);
338         connection.send(connectionInfo);
339         connection.send(sessionInfo);
340         consumerInfo = createConsumerInfo(sessionInfo, destination);
341         connection.send(consumerInfo);
342
343         // No messages should be delivered.
344
Message m = receiveMessage(connection);
345         assertNull(m);
346     }
347     
348     public void testQueuePersistentUncommitedAcksLostOnRestart() throws Exception JavaDoc {
349         
350         ActiveMQDestination destination = new ActiveMQQueue("TEST");
351         
352         // Setup the producer and send the message.
353
StubConnection connection = createConnection();
354         ConnectionInfo connectionInfo = createConnectionInfo();
355         SessionInfo sessionInfo = createSessionInfo(connectionInfo);
356         ProducerInfo producerInfo = createProducerInfo(sessionInfo);
357         connection.send(connectionInfo);
358         connection.send(sessionInfo);
359         connection.send(producerInfo);
360         
361         for( int i=0; i < 4; i++) {
362             Message message = createMessage(producerInfo, destination);
363             message.setPersistent(true);
364             connection.send(message);
365         }
366         
367         // Setup the consumer and receive the message.
368
ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
369         connection.send(consumerInfo);
370
371         // Begin the transaction.
372
LocalTransactionId txid = createLocalTransaction(sessionInfo);
373         connection.send(createBeginTransaction(connectionInfo, txid));
374         for( int i=0; i < 4 ;i ++ ) {
375             Message m = receiveMessage(connection);
376             assertNotNull(m);
377             MessageAck ack = createAck(consumerInfo, m, 1, MessageAck.STANDARD_ACK_TYPE);
378             ack.setTransactionId(txid);
379             connection.send(ack);
380         }
381         // Don't commit
382

383         // restart the broker.
384
restartBroker();
385         
386         // Setup the consumer and receive the message.
387
connection = createConnection();
388         connectionInfo = createConnectionInfo();
389         sessionInfo = createSessionInfo(connectionInfo);
390         connection.send(connectionInfo);
391         connection.send(sessionInfo);
392         consumerInfo = createConsumerInfo(sessionInfo, destination);
393         connection.send(consumerInfo);
394
395         // All messages should be re-delivered.
396
for( int i=0; i < 4 ;i ++ ) {
397             Message m = receiveMessage(connection);
398             assertNotNull(m);
399         }
400         
401         assertNoMessagesLeft(connection);
402     }
403
404     public static Test suite() {
405         return suite(RecoveryBrokerTest.class);
406     }
407     
408     public static void main(String JavaDoc[] args) {
409         junit.textui.TestRunner.run(suite());
410     }
411
412 }
413
Popular Tags