KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > transport > failover > ReconnectTest


1 /**
2  *
3  * Copyright 2005-2006 The Apache Software Foundation
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */

17 package org.apache.activemq.transport.failover;
18
19 import java.io.IOException JavaDoc;
20 import java.net.URI JavaDoc;
21 import java.net.URISyntaxException JavaDoc;
22 import java.util.concurrent.atomic.AtomicInteger JavaDoc;
23
24 import javax.jms.DeliveryMode JavaDoc;
25 import javax.jms.ExceptionListener JavaDoc;
26 import javax.jms.JMSException JavaDoc;
27 import javax.jms.MessageConsumer JavaDoc;
28 import javax.jms.MessageProducer JavaDoc;
29 import javax.jms.Session JavaDoc;
30
31 import junit.framework.TestCase;
32
33 import org.apache.activemq.ActiveMQConnection;
34 import org.apache.activemq.ActiveMQConnectionFactory;
35 import org.apache.activemq.broker.BrokerService;
36 import org.apache.activemq.broker.TransportConnector;
37 import org.apache.activemq.command.ActiveMQQueue;
38 import org.apache.activemq.transport.TransportListener;
39 import org.apache.activemq.transport.mock.MockTransport;
40 import org.apache.activemq.util.ServiceStopper;
41 import org.apache.commons.logging.Log;
42 import org.apache.commons.logging.LogFactory;
43
44 import java.util.concurrent.CountDownLatch JavaDoc;
45 import java.util.concurrent.TimeUnit JavaDoc;
46 import java.util.concurrent.atomic.AtomicBoolean JavaDoc;
47
48 /**
49  *
50  * @version $Revision: 1.1 $
51  */

52 public class ReconnectTest extends TestCase {
53     
54     protected static final Log log = LogFactory.getLog(ReconnectTest.class);
55     public static final int MESSAGES_PER_ITTERATION = 10;
56     public static final int WORKER_COUNT = 10;
57     private BrokerService bs;
58     private URI JavaDoc tcpUri;
59     private AtomicInteger JavaDoc interruptedCount = new AtomicInteger JavaDoc();
60     private Worker[] workers;
61
62     class Worker implements Runnable JavaDoc, ExceptionListener JavaDoc {
63         
64         private ActiveMQConnection connection;
65         private AtomicBoolean JavaDoc stop=new AtomicBoolean JavaDoc(false);
66         public AtomicInteger JavaDoc iterations = new AtomicInteger JavaDoc();
67         public CountDownLatch JavaDoc stopped = new CountDownLatch JavaDoc(1);
68         private Throwable JavaDoc error;
69         
70         public Worker() throws URISyntaxException JavaDoc, JMSException JavaDoc {
71             URI JavaDoc uri = new URI JavaDoc("failover://(mock://("+tcpUri+"))");
72             ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(uri);
73             connection = (ActiveMQConnection)factory.createConnection();
74             connection.setExceptionListener(this);
75             connection.addTransportListener(new TransportListener() {
76                 public void onCommand(Object JavaDoc command) {
77                 }
78                 public void onException(IOException JavaDoc error) {
79                     setError(error);
80                 }
81                 public void transportInterupted() {
82                     interruptedCount.incrementAndGet();
83                 }
84                 public void transportResumed() {
85                 }});
86             connection.start();
87         }
88         
89         public void failConnection() {
90             MockTransport mockTransport = (MockTransport)connection.getTransportChannel().narrow(MockTransport.class);
91             mockTransport.onException(new IOException JavaDoc("Simulated error"));
92         }
93         
94         public void start() {
95             new Thread JavaDoc(this).start();
96         }
97         public void stop() {
98             stop.set(true);
99             try {
100                 if( !stopped.await(5, TimeUnit.SECONDS) ) {
101                     connection.close();
102                     stopped.await();
103                 } else {
104                     connection.close();
105                 }
106             } catch (Exception JavaDoc e) {
107                 e.printStackTrace();
108             }
109         }
110         
111         public void run() {
112             try {
113                 ActiveMQQueue queue = new ActiveMQQueue("FOO");
114                 Session JavaDoc session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
115                 MessageConsumer JavaDoc consumer = session.createConsumer(queue);
116                 MessageProducer JavaDoc producer = session.createProducer(queue);
117                 producer.setDeliveryMode(DeliveryMode.PERSISTENT);
118                 while( !stop.get() ) {
119                     for( int i=0; i < MESSAGES_PER_ITTERATION; i++) {
120                         producer.send(session.createTextMessage("TEST:"+i));
121                     }
122                     for( int i=0; i < MESSAGES_PER_ITTERATION; i++) {
123                         consumer.receive();
124                     }
125                     iterations.incrementAndGet();
126                 }
127                 session.close();
128             } catch (JMSException JavaDoc e) {
129                 setError(e);
130             } finally {
131                 stopped.countDown();
132             }
133         }
134
135         public void onException(JMSException JavaDoc error) {
136             setError(error);
137             stop();
138         }
139
140
141         public synchronized Throwable JavaDoc getError() {
142             return error;
143         }
144         public synchronized void setError(Throwable JavaDoc error) {
145             this.error = error;
146         }
147
148         public synchronized void assertNoErrors() {
149             if( error !=null ) {
150                 error.printStackTrace();
151                 fail("Got Exception: "+error);
152             }
153         }
154         
155     }
156     
157     public void testReconnects() throws Exception JavaDoc {
158         
159         for( int k=1; k < 5; k++ ) {
160             
161             System.out.println("Test run: "+k);
162             
163             // Wait for at least one iteration to occur...
164
for (int i=0; i < WORKER_COUNT; i++) {
165                 for( int j=0; workers[i].iterations.get() == 0 && j < 5; j++ ) {
166                     workers[i].assertNoErrors();
167                     System.out.println("Waiting for worker "+i+" to finish an iteration.");
168                     Thread.sleep(1000);
169                 }
170                 assertTrue("Worker "+i+" never completed an interation.", workers[i].iterations.get()!=0);
171                 workers[i].assertNoErrors();
172             }
173             
174             System.out.println("Simulating transport error to cause reconnect.");
175             
176             // Simulate a transport failure.
177
for (int i=0; i < WORKER_COUNT; i++) {
178                 workers[i].failConnection();
179             }
180             
181             // Wait for the connections to get interrupted...
182
while ( interruptedCount.get() < WORKER_COUNT ) {
183                 System.out.println("Waiting for connections to get interrupted.. at: "+interruptedCount.get());
184                 Thread.sleep(1000);
185             }
186
187             // let things stablize..
188
System.out.println("Pausing before starting next iterations...");
189             Thread.sleep(1000);
190
191             // Reset the counters..
192
interruptedCount.set(0);
193             for (int i=0; i < WORKER_COUNT; i++) {
194                 workers[i].iterations.set(0);
195             }
196
197         }
198         
199     }
200
201     protected void setUp() throws Exception JavaDoc {
202         bs = new BrokerService();
203         bs.setPersistent(false);
204         bs.setUseJmx(true);
205         TransportConnector connector = bs.addConnector("tcp://localhost:0");
206         bs.start();
207         tcpUri = connector.getConnectUri();
208         
209         workers = new Worker[WORKER_COUNT];
210         for (int i=0; i < WORKER_COUNT; i++) {
211             workers[i] = new Worker();
212             workers[i].start();
213         }
214                 
215     }
216
217     protected void tearDown() throws Exception JavaDoc {
218         for (int i=0; i < WORKER_COUNT; i++) {
219             workers[i].stop();
220         }
221         new ServiceStopper().stop(bs);
222     }
223
224 }
225
Popular Tags