KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > network > NetworkReconnectTest


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 package org.apache.activemq.network;
19
20 import java.net.URI JavaDoc;
21 import java.util.ArrayList JavaDoc;
22 import java.util.Iterator JavaDoc;
23
24 import javax.jms.Connection JavaDoc;
25 import javax.jms.Destination JavaDoc;
26 import javax.jms.JMSException JavaDoc;
27 import javax.jms.Message JavaDoc;
28 import javax.jms.MessageConsumer JavaDoc;
29 import javax.jms.MessageProducer JavaDoc;
30 import javax.jms.Session JavaDoc;
31
32 import junit.framework.TestCase;
33
34 import org.apache.activemq.ActiveMQConnectionFactory;
35 import org.apache.activemq.advisory.ConsumerEvent;
36 import org.apache.activemq.advisory.ConsumerEventSource;
37 import org.apache.activemq.advisory.ConsumerListener;
38 import org.apache.activemq.broker.BrokerFactory;
39 import org.apache.activemq.broker.BrokerService;
40 import org.apache.activemq.command.ActiveMQQueue;
41 import org.apache.commons.logging.Log;
42 import org.apache.commons.logging.LogFactory;
43
44 import java.util.concurrent.atomic.AtomicInteger JavaDoc;
45
46 /**
47  * These test cases are used to verifiy that network connections get re established in all broker
48  * restart scenarios.
49  *
50  * @author chirino
51  */

52 public class NetworkReconnectTest extends TestCase {
53
54     protected static final Log log = LogFactory.getLog(NetworkReconnectTest.class);
55     
56     private BrokerService producerBroker;
57     private BrokerService consumerBroker;
58     private ActiveMQConnectionFactory producerConnectionFactory;
59     private ActiveMQConnectionFactory consumerConnectionFactory;
60     private Destination JavaDoc destination;
61     private ArrayList JavaDoc connections = new ArrayList JavaDoc();
62     
63     public void testMultipleProducerBrokerRestarts() throws Exception JavaDoc {
64         for (int i = 0; i < 10; i++) {
65             testWithProducerBrokerRestart();
66             disposeConsumerConnections();
67         }
68     }
69     
70     public void testWithoutRestarts() throws Exception JavaDoc {
71         startProducerBroker();
72         startConsumerBroker();
73
74         MessageConsumer JavaDoc consumer = createConsumer();
75         AtomicInteger JavaDoc counter = createConsumerCounter(producerConnectionFactory);
76         waitForConsumerToArrive(counter);
77         
78         String JavaDoc messageId = sendMessage();
79         Message JavaDoc message = consumer.receive(1000);
80         
81         assertEquals(messageId, message.getJMSMessageID());
82         
83         assertNull( consumer.receiveNoWait() );
84         
85     }
86
87     public void testWithProducerBrokerRestart() throws Exception JavaDoc {
88         startProducerBroker();
89         startConsumerBroker();
90
91         MessageConsumer JavaDoc consumer = createConsumer();
92         AtomicInteger JavaDoc counter = createConsumerCounter(producerConnectionFactory);
93         waitForConsumerToArrive(counter);
94         
95         String JavaDoc messageId = sendMessage();
96         Message JavaDoc message = consumer.receive(1000);
97         
98         assertEquals(messageId, message.getJMSMessageID());
99         assertNull( consumer.receiveNoWait() );
100         
101         // Restart the first broker...
102
stopProducerBroker();
103         startProducerBroker();
104         
105         counter = createConsumerCounter(producerConnectionFactory);
106         waitForConsumerToArrive(counter);
107         
108         messageId = sendMessage();
109         message = consumer.receive(1000);
110         
111         assertEquals(messageId, message.getJMSMessageID());
112         assertNull( consumer.receiveNoWait() );
113         
114     }
115
116     public void testWithConsumerBrokerRestart() throws Exception JavaDoc {
117
118         startProducerBroker();
119         startConsumerBroker();
120
121         MessageConsumer JavaDoc consumer = createConsumer();
122         AtomicInteger JavaDoc counter = createConsumerCounter(producerConnectionFactory);
123         waitForConsumerToArrive(counter);
124         
125         String JavaDoc messageId = sendMessage();
126         Message JavaDoc message = consumer.receive(1000);
127         
128         assertEquals(messageId, message.getJMSMessageID());
129         assertNull( consumer.receiveNoWait() );
130         
131         // Restart the first broker...
132
stopConsumerBroker();
133         waitForConsumerToLeave(counter);
134         startConsumerBroker();
135         
136         consumer = createConsumer();
137         waitForConsumerToArrive(counter);
138         
139         messageId = sendMessage();
140         message = consumer.receive(1000);
141         
142         assertEquals(messageId, message.getJMSMessageID());
143         assertNull( consumer.receiveNoWait() );
144         
145     }
146     
147     public void testWithConsumerBrokerStartDelay() throws Exception JavaDoc {
148         
149         startConsumerBroker();
150         MessageConsumer JavaDoc consumer = createConsumer();
151         
152         Thread.sleep(1000*5);
153         
154         startProducerBroker();
155         AtomicInteger JavaDoc counter = createConsumerCounter(producerConnectionFactory);
156         waitForConsumerToArrive(counter);
157         
158         String JavaDoc messageId = sendMessage();
159         Message JavaDoc message = consumer.receive(1000);
160         
161         assertEquals(messageId, message.getJMSMessageID());
162         
163         assertNull( consumer.receiveNoWait() );
164
165     }
166
167     
168     public void testWithProducerBrokerStartDelay() throws Exception JavaDoc {
169         
170         startProducerBroker();
171         AtomicInteger JavaDoc counter = createConsumerCounter(producerConnectionFactory);
172
173         Thread.sleep(1000*5);
174         
175         startConsumerBroker();
176         MessageConsumer JavaDoc consumer = createConsumer();
177                 
178         waitForConsumerToArrive(counter);
179         
180         String JavaDoc messageId = sendMessage();
181         Message JavaDoc message = consumer.receive(1000);
182         
183         assertEquals(messageId, message.getJMSMessageID());
184         
185         assertNull( consumer.receiveNoWait() );
186
187     }
188
189     protected void setUp() throws Exception JavaDoc {
190         
191         log.info("===============================================================================");
192         log.info("Running Test Case: "+getName());
193         log.info("===============================================================================");
194         
195         producerConnectionFactory = createProducerConnectionFactory();
196         consumerConnectionFactory = createConsumerConnectionFactory();
197         destination = new ActiveMQQueue("RECONNECT.TEST.QUEUE");
198         
199     }
200     
201     protected void tearDown() throws Exception JavaDoc {
202         disposeConsumerConnections();
203         try {
204             stopProducerBroker();
205         } catch (Throwable JavaDoc e) {
206         }
207         try {
208             stopConsumerBroker();
209         } catch (Throwable JavaDoc e) {
210         }
211     }
212     
213     protected void disposeConsumerConnections() {
214         for (Iterator JavaDoc iter = connections.iterator(); iter.hasNext();) {
215             Connection JavaDoc connection = (Connection JavaDoc) iter.next();
216             try { connection.close(); } catch (Throwable JavaDoc ignore) {}
217         }
218     }
219     
220     protected void startProducerBroker() throws Exception JavaDoc {
221         if( producerBroker==null ) {
222             producerBroker = createFirstBroker();
223             producerBroker.start();
224         }
225     }
226     
227     protected void stopProducerBroker() throws Exception JavaDoc {
228         if( producerBroker!=null ) {
229             producerBroker.stop();
230             producerBroker=null;
231         }
232     }
233     
234     protected void startConsumerBroker() throws Exception JavaDoc {
235         if( consumerBroker==null ) {
236             consumerBroker = createSecondBroker();
237             consumerBroker.start();
238         }
239     }
240     
241     protected void stopConsumerBroker() throws Exception JavaDoc {
242         if( consumerBroker!=null ) {
243             consumerBroker.stop();
244             consumerBroker=null;
245         }
246     }
247     
248     protected BrokerService createFirstBroker() throws Exception JavaDoc {
249         return BrokerFactory.createBroker(new URI JavaDoc("xbean:org/apache/activemq/network/reconnect-broker1.xml"));
250     }
251     
252     protected BrokerService createSecondBroker() throws Exception JavaDoc {
253         return BrokerFactory.createBroker(new URI JavaDoc("xbean:org/apache/activemq/network/reconnect-broker2.xml"));
254     }
255
256     protected ActiveMQConnectionFactory createProducerConnectionFactory() {
257         return new ActiveMQConnectionFactory("vm://broker1");
258     }
259     
260     protected ActiveMQConnectionFactory createConsumerConnectionFactory() {
261         return new ActiveMQConnectionFactory("vm://broker2");
262     }
263     
264     protected String JavaDoc sendMessage() throws JMSException JavaDoc {
265         Connection JavaDoc connection = null;
266         try {
267             connection = producerConnectionFactory.createConnection();
268             Session JavaDoc session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
269             MessageProducer JavaDoc producer = session.createProducer(destination);
270             Message JavaDoc message = session.createMessage();
271             producer.send(message);
272             return message.getJMSMessageID();
273         } finally {
274             try { connection.close(); } catch (Throwable JavaDoc ignore) {}
275         }
276     }
277     
278     protected MessageConsumer JavaDoc createConsumer() throws JMSException JavaDoc {
279         Connection JavaDoc connection = consumerConnectionFactory.createConnection();
280         connections.add(connection);
281         connection.start();
282         
283         Session JavaDoc session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
284         return session.createConsumer(destination);
285     }
286     
287     protected AtomicInteger JavaDoc createConsumerCounter(ActiveMQConnectionFactory cf) throws Exception JavaDoc {
288         final AtomicInteger JavaDoc rc = new AtomicInteger JavaDoc(0);
289         Connection JavaDoc connection = cf.createConnection();
290         connections.add(connection);
291         connection.start();
292         
293         ConsumerEventSource source = new ConsumerEventSource(connection, destination);
294         source.setConsumerListener(new ConsumerListener(){
295             public void onConsumerEvent(ConsumerEvent event) {
296                 rc.set(event.getConsumerCount());
297             }
298         });
299         source.start();
300         
301         return rc;
302     }
303     
304     protected void waitForConsumerToArrive(AtomicInteger JavaDoc consumerCounter) throws InterruptedException JavaDoc {
305         for( int i=0; i < 100; i++ ) {
306             if( consumerCounter.get() > 0 ) {
307                 return;
308             }
309             Thread.sleep(100);
310         }
311         fail("The consumer did not arrive.");
312     }
313     
314     protected void waitForConsumerToLeave(AtomicInteger JavaDoc consumerCounter) throws InterruptedException JavaDoc {
315         for( int i=0; i < 100; i++ ) {
316             if( consumerCounter.get() == 0 ) {
317                 return;
318             }
319             Thread.sleep(100);
320         }
321         fail("The consumer did not leave.");
322     }
323
324 }
325
Popular Tags