1 17 package org.apache.activemq.transport.failover; 18 19 import java.io.IOException ; 20 import java.net.URI ; 21 import java.net.URISyntaxException ; 22 import java.util.concurrent.atomic.AtomicInteger ; 23 24 import javax.jms.DeliveryMode ; 25 import javax.jms.ExceptionListener ; 26 import javax.jms.JMSException ; 27 import javax.jms.MessageConsumer ; 28 import javax.jms.MessageProducer ; 29 import javax.jms.Session ; 30 31 import junit.framework.TestCase; 32 33 import org.apache.activemq.ActiveMQConnection; 34 import org.apache.activemq.ActiveMQConnectionFactory; 35 import org.apache.activemq.broker.BrokerService; 36 import org.apache.activemq.broker.TransportConnector; 37 import org.apache.activemq.command.ActiveMQQueue; 38 import org.apache.activemq.transport.TransportListener; 39 import org.apache.activemq.transport.mock.MockTransport; 40 import org.apache.activemq.util.ServiceStopper; 41 import org.apache.commons.logging.Log; 42 import org.apache.commons.logging.LogFactory; 43 44 import java.util.concurrent.CountDownLatch ; 45 import java.util.concurrent.TimeUnit ; 46 import java.util.concurrent.atomic.AtomicBoolean ; 47 48 52 public class ReconnectTest extends TestCase { 53 54 protected static final Log log = LogFactory.getLog(ReconnectTest.class); 55 public static final int MESSAGES_PER_ITTERATION = 10; 56 public static final int WORKER_COUNT = 10; 57 private BrokerService bs; 58 private URI tcpUri; 59 private AtomicInteger interruptedCount = new AtomicInteger (); 60 private Worker[] workers; 61 62 class Worker implements Runnable , ExceptionListener { 63 64 private ActiveMQConnection connection; 65 private AtomicBoolean stop=new AtomicBoolean (false); 66 public AtomicInteger iterations = new AtomicInteger (); 67 public CountDownLatch stopped = new CountDownLatch (1); 68 private Throwable error; 69 70 public Worker() throws URISyntaxException , JMSException { 71 URI uri = new URI ("failover://(mock://("+tcpUri+"))"); 72 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(uri); 73 connection = (ActiveMQConnection)factory.createConnection(); 74 connection.setExceptionListener(this); 75 connection.addTransportListener(new TransportListener() { 76 public void onCommand(Object command) { 77 } 78 public void onException(IOException error) { 79 setError(error); 80 } 81 public void transportInterupted() { 82 interruptedCount.incrementAndGet(); 83 } 84 public void transportResumed() { 85 }}); 86 connection.start(); 87 } 88 89 public void failConnection() { 90 MockTransport mockTransport = (MockTransport)connection.getTransportChannel().narrow(MockTransport.class); 91 mockTransport.onException(new IOException ("Simulated error")); 92 } 93 94 public void start() { 95 new Thread (this).start(); 96 } 97 public void stop() { 98 stop.set(true); 99 try { 100 if( !stopped.await(5, TimeUnit.SECONDS) ) { 101 connection.close(); 102 stopped.await(); 103 } else { 104 connection.close(); 105 } 106 } catch (Exception e) { 107 e.printStackTrace(); 108 } 109 } 110 111 public void run() { 112 try { 113 ActiveMQQueue queue = new ActiveMQQueue("FOO"); 114 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 115 MessageConsumer consumer = session.createConsumer(queue); 116 MessageProducer producer = session.createProducer(queue); 117 producer.setDeliveryMode(DeliveryMode.PERSISTENT); 118 while( !stop.get() ) { 119 for( int i=0; i < MESSAGES_PER_ITTERATION; i++) { 120 producer.send(session.createTextMessage("TEST:"+i)); 121 } 122 for( int i=0; i < MESSAGES_PER_ITTERATION; i++) { 123 consumer.receive(); 124 } 125 iterations.incrementAndGet(); 126 } 127 session.close(); 128 } catch (JMSException e) { 129 setError(e); 130 } finally { 131 stopped.countDown(); 132 } 133 } 134 135 public void onException(JMSException error) { 136 setError(error); 137 stop(); 138 } 139 140 141 public synchronized Throwable getError() { 142 return error; 143 } 144 public synchronized void setError(Throwable error) { 145 this.error = error; 146 } 147 148 public synchronized void assertNoErrors() { 149 if( error !=null ) { 150 error.printStackTrace(); 151 fail("Got Exception: "+error); 152 } 153 } 154 155 } 156 157 public void testReconnects() throws Exception { 158 159 for( int k=1; k < 5; k++ ) { 160 161 System.out.println("Test run: "+k); 162 163 for (int i=0; i < WORKER_COUNT; i++) { 165 for( int j=0; workers[i].iterations.get() == 0 && j < 5; j++ ) { 166 workers[i].assertNoErrors(); 167 System.out.println("Waiting for worker "+i+" to finish an iteration."); 168 Thread.sleep(1000); 169 } 170 assertTrue("Worker "+i+" never completed an interation.", workers[i].iterations.get()!=0); 171 workers[i].assertNoErrors(); 172 } 173 174 System.out.println("Simulating transport error to cause reconnect."); 175 176 for (int i=0; i < WORKER_COUNT; i++) { 178 workers[i].failConnection(); 179 } 180 181 while ( interruptedCount.get() < WORKER_COUNT ) { 183 System.out.println("Waiting for connections to get interrupted.. at: "+interruptedCount.get()); 184 Thread.sleep(1000); 185 } 186 187 System.out.println("Pausing before starting next iterations..."); 189 Thread.sleep(1000); 190 191 interruptedCount.set(0); 193 for (int i=0; i < WORKER_COUNT; i++) { 194 workers[i].iterations.set(0); 195 } 196 197 } 198 199 } 200 201 protected void setUp() throws Exception { 202 bs = new BrokerService(); 203 bs.setPersistent(false); 204 bs.setUseJmx(true); 205 TransportConnector connector = bs.addConnector("tcp://localhost:0"); 206 bs.start(); 207 tcpUri = connector.getConnectUri(); 208 209 workers = new Worker[WORKER_COUNT]; 210 for (int i=0; i < WORKER_COUNT; i++) { 211 workers[i] = new Worker(); 212 workers[i].start(); 213 } 214 215 } 216 217 protected void tearDown() throws Exception { 218 for (int i=0; i < WORKER_COUNT; i++) { 219 workers[i].stop(); 220 } 221 new ServiceStopper().stop(bs); 222 } 223 224 } 225 | Popular Tags |