KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > usecases > TwoBrokerQueueClientsReconnectTest


/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

18 package org.apache.activemq.usecases;
19
20 import org.apache.activemq.JmsMultipleBrokersTestSupport;
21 import org.apache.activemq.ActiveMQConnectionFactory;
22 import org.apache.activemq.ActiveMQPrefetchPolicy;
23
24 import javax.jms.Destination JavaDoc;
25 import javax.jms.Message JavaDoc;
26 import javax.jms.Connection JavaDoc;
27 import javax.jms.Session JavaDoc;
28 import javax.jms.MessageConsumer JavaDoc;
29 import java.net.URI JavaDoc;
30
31 /**
32  * @version $Revision: 1.1.1.1 $
33  */

34 public class TwoBrokerQueueClientsReconnectTest extends JmsMultipleBrokersTestSupport {
35     protected static final int MESSAGE_COUNT = 100; // Best if a factor of 100
36
protected static final int PREFETCH_COUNT = 1;
37
38     protected int msgsClient1, msgsClient2;
39     protected String JavaDoc broker1, broker2;
40
41     public void testClientAReceivesOnly() throws Exception JavaDoc {
42         broker1 = "BrokerA";
43         broker2 = "BrokerB";
44
45         doOneClientReceivesOnly();
46     }
47
48     public void testClientBReceivesOnly() throws Exception JavaDoc {
49         broker1 = "BrokerB";
50         broker2 = "BrokerA";
51
52         doOneClientReceivesOnly();
53     }
54
55     public void doOneClientReceivesOnly() throws Exception JavaDoc {
56         // Bridge brokers
57
bridgeBrokers(broker1, broker2);
58         bridgeBrokers(broker2, broker1);
59
60         // Run brokers
61
startAllBrokers();
62
63         // Create queue
64
Destination JavaDoc dest = createDestination("TEST.FOO", false);
65
66         // Create consumers
67
MessageConsumer JavaDoc client1 = createConsumer(broker1, dest);
68         MessageConsumer JavaDoc client2 = createConsumer(broker2, dest);
69
70         // Give clients time to register with broker
71
Thread.sleep(500);
72
73         // Always send messages to broker A
74
sendMessages("BrokerA", dest, MESSAGE_COUNT);
75
76         // Close the second client, messages should be sent to the first client
77
client2.close();
78
79         // Let the first client receive all messages
80
msgsClient1 += receiveAllMessages(client1);
81         client1.close();
82
83         // First client should have received 100 messages
84
assertEquals("Client for " + broker1 + " should have receive all messages.", MESSAGE_COUNT, msgsClient1);
85     }
86
87     public void testClientAReceivesOnlyAfterReconnect() throws Exception JavaDoc {
88         broker1 = "BrokerA";
89         broker2 = "BrokerB";
90
91         doOneClientReceivesOnlyAfterReconnect();
92     }
93
94     public void testClientBReceivesOnlyAfterReconnect() throws Exception JavaDoc {
95         broker1 = "BrokerB";
96         broker2 = "BrokerA";
97
98         doOneClientReceivesOnlyAfterReconnect();
99     }
100
101     public void doOneClientReceivesOnlyAfterReconnect() throws Exception JavaDoc {
102         // Bridge brokers
103
bridgeBrokers(broker1, broker2);
104         bridgeBrokers(broker2, broker1);
105
106         // Run brokers
107
startAllBrokers();
108
109         // Create queue
110
Destination JavaDoc dest = createDestination("TEST.FOO", false);
111
112         // Create first consumer
113
MessageConsumer JavaDoc client1 = createConsumer(broker1, dest);
114         MessageConsumer JavaDoc client2 = createConsumer(broker2, dest);
115
116         // Give clients time to register with broker
117
Thread.sleep(500);
118
119         // Always send message to broker A
120
sendMessages("BrokerA", dest, MESSAGE_COUNT);
121
122         // Let the first client receive the first 20% of messages
123
msgsClient1 += receiveExactMessages(client1, (int)(MESSAGE_COUNT * 0.20));
124
125         // Disconnect the first client
126
client1.close();
127
128         // Create another client for the first broker
129
client1 = createConsumer(broker1, dest);
130         Thread.sleep(500);
131
132         // Close the second client, messages should be sent to the first client
133
client2.close();
134
135         // Receive the rest of the messages
136
msgsClient1 += receiveAllMessages(client1);
137         client1.close();
138
139         // The first client should have received 100 messages
140
assertEquals("Client for " + broker1 + " should have received all messages.", MESSAGE_COUNT, msgsClient1);
141     }
142
143     public void testTwoClientsReceiveClientADisconnects() throws Exception JavaDoc {
144         broker1 = "BrokerA";
145         broker2 = "BrokerB";
146
147         doTwoClientsReceiveOneClientDisconnects();
148     }
149
150     public void testTwoClientsReceiveClientBDisconnects() throws Exception JavaDoc {
151         broker1 = "BrokerB";
152         broker2 = "BrokerA";
153
154         doTwoClientsReceiveOneClientDisconnects();
155     }
156
157     public void doTwoClientsReceiveOneClientDisconnects() throws Exception JavaDoc {
158         // Bridge brokers
159
bridgeBrokers(broker1, broker2);
160         bridgeBrokers(broker2, broker1);
161
162         // Run brokers
163
startAllBrokers();
164
165         // Create queue
166
Destination JavaDoc dest = createDestination("TEST.FOO", false);
167
168         // Create first client
169
MessageConsumer JavaDoc client1 = createConsumer(broker1, dest);
170         MessageConsumer JavaDoc client2 = createConsumer(broker2, dest);
171
172         // Give clients time to register with broker
173
Thread.sleep(500);
174
175         // Always send messages to broker A
176
sendMessages("BrokerA", dest, MESSAGE_COUNT);
177
178         // Let each client receive 20% of the messages - 40% total
179
msgsClient1 += receiveExactMessages(client1, (int)(MESSAGE_COUNT * 0.20));
180         msgsClient2 += receiveExactMessages(client2, (int)(MESSAGE_COUNT * 0.20));
181
182         // Disconnect the first client
183
client1.close();
184
185         // Let the second client receive the rest of the messages
186
msgsClient2 += receiveAllMessages(client2);
187         client2.close();
188
189         // First client should have received 20% of the messages
190
assertEquals("Client for " + broker1 + " should have received 20% of the messages.", (int)(MESSAGE_COUNT * 0.20), msgsClient1);
191
192         // Second client should have received 80% of the messages
193
assertEquals("Client for " + broker2 + " should have received 80% of the messages.", (int)(MESSAGE_COUNT * 0.80), msgsClient2);
194     }
195
196     public void testTwoClientsReceiveClientAReconnects() throws Exception JavaDoc {
197         broker1 = "BrokerA";
198         broker2 = "BrokerB";
199
200         doTwoClientsReceiveOneClientReconnects();
201     }
202
203     public void testTwoClientsReceiveClientBReconnects() throws Exception JavaDoc {
204         broker1 = "BrokerB";
205         broker2 = "BrokerA";
206
207         doTwoClientsReceiveOneClientReconnects();
208     }
209
210     public void doTwoClientsReceiveOneClientReconnects() throws Exception JavaDoc {
211         // Bridge brokers
212
bridgeBrokers(broker1, broker2);
213         bridgeBrokers(broker2, broker1);
214
215         // Run brokers
216
startAllBrokers();
217
218         // Create queue
219
Destination JavaDoc dest = createDestination("TEST.FOO", false);
220
221         // Create the first client
222
MessageConsumer JavaDoc client1 = createConsumer(broker1, dest);
223         MessageConsumer JavaDoc client2 = createConsumer(broker2, dest);
224
225         // Give clients time to register with broker
226
Thread.sleep(500);
227
228         // Always send messages to broker A
229
sendMessages("BrokerA", dest, MESSAGE_COUNT);
230
231         // Let each client receive 20% of the messages - 40% total
232
msgsClient1 += receiveExactMessages(client1, (int)(MESSAGE_COUNT * 0.20));
233         msgsClient2 += receiveExactMessages(client2, (int)(MESSAGE_COUNT * 0.20));
234
235         // Disconnect the first client
236
client1.close();
237
238         // Let the second client receive 20% more of the total messages
239
msgsClient2 += receiveExactMessages(client2, (int)(MESSAGE_COUNT * 0.20));
240
241         // Create another client for broker 1
242
client1 = createConsumer(broker1, dest);
243         Thread.sleep(500);
244
245         // Let each client receive 20% of the messages - 40% total
246
msgsClient1 += receiveExactMessages(client1, (int)(MESSAGE_COUNT * 0.20));
247         client1.close();
248
249         msgsClient2 += receiveExactMessages(client2, (int)(MESSAGE_COUNT * 0.20));
250         client2.close();
251
252         // First client should have received 40 messages
253
assertEquals("Client for " + broker1 + " should have received 40% of the messages.", (int)(MESSAGE_COUNT * 0.40), msgsClient1);
254
255         // Second client should have received 60 messages
256
assertEquals("Client for " + broker2 + " should have received 60% of the messages.", (int)(MESSAGE_COUNT * 0.60), msgsClient2);
257     }
258
259     public void testTwoClientsReceiveTwoClientReconnects() throws Exception JavaDoc {
260         broker1 = "BrokerA";
261         broker2 = "BrokerB";
262
263         // Bridge brokers
264
bridgeBrokers(broker1, broker2);
265         bridgeBrokers(broker2, broker1);
266
267         // Run brokers
268
startAllBrokers();
269
270         // Create queue
271
Destination JavaDoc dest = createDestination("TEST.FOO", false);
272
273         // Create the first client
274
MessageConsumer JavaDoc client1 = createConsumer(broker1, dest);
275         MessageConsumer JavaDoc client2 = createConsumer(broker2, dest);
276
277         // Give clients time to register with broker
278
Thread.sleep(500);
279
280         // Always send messages to broker A
281
sendMessages("BrokerA", dest, MESSAGE_COUNT);
282
283         // Let each client receive 20% of the messages - 40% total
284
msgsClient1 += receiveExactMessages(client1, (int)(MESSAGE_COUNT * 0.20));
285         msgsClient2 += receiveExactMessages(client2, (int)(MESSAGE_COUNT * 0.20));
286
287         // Disconnect both clients
288
client1.close();
289         client2.close();
290
291         // Create another two clients for each broker
292
client1 = createConsumer(broker1, dest);
293         client2 = createConsumer(broker2, dest);
294         Thread.sleep(500);
295
296         // Let each client receive 30% more of the total messages - 60% total
297
msgsClient1 += receiveExactMessages(client1, (int)(MESSAGE_COUNT * 0.30));
298         client1.close();
299
300         msgsClient2 += receiveExactMessages(client2, (int)(MESSAGE_COUNT * 0.30));
301         client2.close();
302
303         // First client should have received 50% of the messages
304
assertEquals("Client for " + broker1 + " should have received 50% of the messages.", (int)(MESSAGE_COUNT * 0.50), msgsClient1);
305
306         // Second client should have received 50% of the messages
307
assertEquals("Client for " + broker2 + " should have received 50% of the messages.", (int)(MESSAGE_COUNT * 0.50), msgsClient2);
308     }
309
310     protected int receiveExactMessages(MessageConsumer JavaDoc consumer, int msgCount) throws Exception JavaDoc {
311         Message JavaDoc msg;
312         int i;
313         for (i=0; i<msgCount; i++) {
314             msg = consumer.receive(1000);
315             if (msg == null) {
316                 System.err.println("Consumer failed to receive exactly " + msgCount + " messages. Actual messages received is: " + i);
317                 break;
318             }
319         }
320
321         return i;
322     }
323
324     protected int receiveAllMessages(MessageConsumer JavaDoc consumer) throws Exception JavaDoc {
325         int msgsReceived = 0;
326
327         Message JavaDoc msg;
328         do {
329             msg = consumer.receive(1000);
330             if (msg != null) {
331                 msgsReceived++;
332             }
333         } while (msg != null);
334
335         return msgsReceived;
336     }
337
338     protected MessageConsumer JavaDoc createConsumer(String JavaDoc brokerName, Destination JavaDoc dest) throws Exception JavaDoc {
339         Connection JavaDoc conn = createConnection(brokerName);
340         conn.start();
341         Session JavaDoc sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
342         return sess.createConsumer(dest);
343     }
344
345     public void setUp() throws Exception JavaDoc {
346         super.setAutoFail(true);
347         super.setUp();
348         createBroker(new URI JavaDoc("broker:(tcp://localhost:61616)/BrokerA?persistent=false&useJmx=false"));
349         createBroker(new URI JavaDoc("broker:(tcp://localhost:61617)/BrokerB?persistent=false&useJmx=false"));
350
351         // Configure broker connection factory
352
ActiveMQConnectionFactory factoryA, factoryB;
353         factoryA = (ActiveMQConnectionFactory)getConnectionFactory("BrokerA");
354         factoryB = (ActiveMQConnectionFactory)getConnectionFactory("BrokerB");
355
356         // Set prefetch policy
357
ActiveMQPrefetchPolicy policy = new ActiveMQPrefetchPolicy();
358         policy.setAll(PREFETCH_COUNT);
359
360         factoryA.setPrefetchPolicy(policy);
361         factoryB.setPrefetchPolicy(policy);
362
363         msgsClient1 = 0;
364         msgsClient2 = 0;
365     }
366 }
367
Popular Tags