1 18 package org.apache.activemq.network; 19 20 import java.net.URI ; 21 import java.util.ArrayList ; 22 import java.util.Iterator ; 23 24 import javax.jms.Connection ; 25 import javax.jms.Destination ; 26 import javax.jms.JMSException ; 27 import javax.jms.Message ; 28 import javax.jms.MessageConsumer ; 29 import javax.jms.MessageProducer ; 30 import javax.jms.Session ; 31 32 import junit.framework.TestCase; 33 34 import org.apache.activemq.ActiveMQConnectionFactory; 35 import org.apache.activemq.advisory.ConsumerEvent; 36 import org.apache.activemq.advisory.ConsumerEventSource; 37 import org.apache.activemq.advisory.ConsumerListener; 38 import org.apache.activemq.broker.BrokerFactory; 39 import org.apache.activemq.broker.BrokerService; 40 import org.apache.activemq.command.ActiveMQQueue; 41 import org.apache.commons.logging.Log; 42 import org.apache.commons.logging.LogFactory; 43 44 import java.util.concurrent.atomic.AtomicInteger ; 45 46 52 public class NetworkReconnectTest extends TestCase { 53 54 protected static final Log log = LogFactory.getLog(NetworkReconnectTest.class); 55 56 private BrokerService producerBroker; 57 private BrokerService consumerBroker; 58 private ActiveMQConnectionFactory producerConnectionFactory; 59 private ActiveMQConnectionFactory consumerConnectionFactory; 60 private Destination destination; 61 private ArrayList connections = new ArrayList (); 62 63 public void testMultipleProducerBrokerRestarts() throws Exception { 64 for (int i = 0; i < 10; i++) { 65 testWithProducerBrokerRestart(); 66 disposeConsumerConnections(); 67 } 68 } 69 70 public void testWithoutRestarts() throws Exception { 71 startProducerBroker(); 72 startConsumerBroker(); 73 74 MessageConsumer consumer = createConsumer(); 75 AtomicInteger counter = createConsumerCounter(producerConnectionFactory); 76 waitForConsumerToArrive(counter); 77 78 String messageId = sendMessage(); 79 Message message = consumer.receive(1000); 80 81 assertEquals(messageId, message.getJMSMessageID()); 82 83 assertNull( consumer.receiveNoWait() ); 84 85 } 86 87 public void testWithProducerBrokerRestart() throws Exception { 88 startProducerBroker(); 89 startConsumerBroker(); 90 91 MessageConsumer consumer = createConsumer(); 92 AtomicInteger counter = createConsumerCounter(producerConnectionFactory); 93 waitForConsumerToArrive(counter); 94 95 String messageId = sendMessage(); 96 Message message = consumer.receive(1000); 97 98 assertEquals(messageId, message.getJMSMessageID()); 99 assertNull( consumer.receiveNoWait() ); 100 101 stopProducerBroker(); 103 startProducerBroker(); 104 105 counter = createConsumerCounter(producerConnectionFactory); 106 waitForConsumerToArrive(counter); 107 108 messageId = sendMessage(); 109 message = consumer.receive(1000); 110 111 assertEquals(messageId, message.getJMSMessageID()); 112 assertNull( consumer.receiveNoWait() ); 113 114 } 115 116 public void testWithConsumerBrokerRestart() throws Exception { 117 118 startProducerBroker(); 119 startConsumerBroker(); 120 121 MessageConsumer consumer = createConsumer(); 122 AtomicInteger counter = createConsumerCounter(producerConnectionFactory); 123 waitForConsumerToArrive(counter); 124 125 String messageId = sendMessage(); 126 Message message = consumer.receive(1000); 127 128 assertEquals(messageId, message.getJMSMessageID()); 129 assertNull( consumer.receiveNoWait() ); 130 131 stopConsumerBroker(); 133 waitForConsumerToLeave(counter); 134 startConsumerBroker(); 135 136 consumer = createConsumer(); 137 waitForConsumerToArrive(counter); 138 139 messageId = sendMessage(); 140 message = consumer.receive(1000); 141 142 assertEquals(messageId, message.getJMSMessageID()); 143 assertNull( consumer.receiveNoWait() ); 144 145 } 146 147 public void testWithConsumerBrokerStartDelay() throws Exception { 148 149 startConsumerBroker(); 150 MessageConsumer consumer = createConsumer(); 151 152 Thread.sleep(1000*5); 153 154 startProducerBroker(); 155 AtomicInteger counter = createConsumerCounter(producerConnectionFactory); 156 waitForConsumerToArrive(counter); 157 158 String messageId = sendMessage(); 159 Message message = consumer.receive(1000); 160 161 assertEquals(messageId, message.getJMSMessageID()); 162 163 assertNull( consumer.receiveNoWait() ); 164 165 } 166 167 168 public void testWithProducerBrokerStartDelay() throws Exception { 169 170 startProducerBroker(); 171 AtomicInteger counter = createConsumerCounter(producerConnectionFactory); 172 173 Thread.sleep(1000*5); 174 175 startConsumerBroker(); 176 MessageConsumer consumer = createConsumer(); 177 178 waitForConsumerToArrive(counter); 179 180 String messageId = sendMessage(); 181 Message message = consumer.receive(1000); 182 183 assertEquals(messageId, message.getJMSMessageID()); 184 185 assertNull( consumer.receiveNoWait() ); 186 187 } 188 189 protected void setUp() throws Exception { 190 191 log.info("==============================================================================="); 192 log.info("Running Test Case: "+getName()); 193 log.info("==============================================================================="); 194 195 producerConnectionFactory = createProducerConnectionFactory(); 196 consumerConnectionFactory = createConsumerConnectionFactory(); 197 destination = new ActiveMQQueue("RECONNECT.TEST.QUEUE"); 198 199 } 200 201 protected void tearDown() throws Exception { 202 disposeConsumerConnections(); 203 try { 204 stopProducerBroker(); 205 } catch (Throwable e) { 206 } 207 try { 208 stopConsumerBroker(); 209 } catch (Throwable e) { 210 } 211 } 212 213 protected void disposeConsumerConnections() { 214 for (Iterator iter = connections.iterator(); iter.hasNext();) { 215 Connection connection = (Connection ) iter.next(); 216 try { connection.close(); } catch (Throwable ignore) {} 217 } 218 } 219 220 protected void startProducerBroker() throws Exception { 221 if( producerBroker==null ) { 222 producerBroker = createFirstBroker(); 223 producerBroker.start(); 224 } 225 } 226 227 protected void stopProducerBroker() throws Exception { 228 if( producerBroker!=null ) { 229 producerBroker.stop(); 230 producerBroker=null; 231 } 232 } 233 234 protected void startConsumerBroker() throws Exception { 235 if( consumerBroker==null ) { 236 consumerBroker = createSecondBroker(); 237 consumerBroker.start(); 238 } 239 } 240 241 protected void stopConsumerBroker() throws Exception { 242 if( consumerBroker!=null ) { 243 consumerBroker.stop(); 244 consumerBroker=null; 245 } 246 } 247 248 protected BrokerService createFirstBroker() throws Exception { 249 return BrokerFactory.createBroker(new URI ("xbean:org/apache/activemq/network/reconnect-broker1.xml")); 250 } 251 252 protected BrokerService createSecondBroker() throws Exception { 253 return BrokerFactory.createBroker(new URI ("xbean:org/apache/activemq/network/reconnect-broker2.xml")); 254 } 255 256 protected ActiveMQConnectionFactory createProducerConnectionFactory() { 257 return new ActiveMQConnectionFactory("vm://broker1"); 258 } 259 260 protected ActiveMQConnectionFactory createConsumerConnectionFactory() { 261 return new ActiveMQConnectionFactory("vm://broker2"); 262 } 263 264 protected String sendMessage() throws JMSException { 265 Connection connection = null; 266 try { 267 connection = producerConnectionFactory.createConnection(); 268 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 269 MessageProducer producer = session.createProducer(destination); 270 Message message = session.createMessage(); 271 producer.send(message); 272 return message.getJMSMessageID(); 273 } finally { 274 try { connection.close(); } catch (Throwable ignore) {} 275 } 276 } 277 278 protected MessageConsumer createConsumer() throws JMSException { 279 Connection connection = consumerConnectionFactory.createConnection(); 280 connections.add(connection); 281 connection.start(); 282 283 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 284 return session.createConsumer(destination); 285 } 286 287 protected AtomicInteger createConsumerCounter(ActiveMQConnectionFactory cf) throws Exception { 288 final AtomicInteger rc = new AtomicInteger (0); 289 Connection connection = cf.createConnection(); 290 connections.add(connection); 291 connection.start(); 292 293 ConsumerEventSource source = new ConsumerEventSource(connection, destination); 294 source.setConsumerListener(new ConsumerListener(){ 295 public void onConsumerEvent(ConsumerEvent event) { 296 rc.set(event.getConsumerCount()); 297 } 298 }); 299 source.start(); 300 301 return rc; 302 } 303 304 protected void waitForConsumerToArrive(AtomicInteger consumerCounter) throws InterruptedException { 305 for( int i=0; i < 100; i++ ) { 306 if( consumerCounter.get() > 0 ) { 307 return; 308 } 309 Thread.sleep(100); 310 } 311 fail("The consumer did not arrive."); 312 } 313 314 protected void waitForConsumerToLeave(AtomicInteger consumerCounter) throws InterruptedException { 315 for( int i=0; i < 100; i++ ) { 316 if( consumerCounter.get() == 0 ) { 317 return; 318 } 319 Thread.sleep(100); 320 } 321 fail("The consumer did not leave."); 322 } 323 324 } 325 | Popular Tags |