package org.apache.activemq.usecases;

import java.net.URI;
import java.util.HashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueReceiver;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.test.TestSupport;
import org.apache.activemq.util.IdGenerator;

/**
 * Checks that a durable topic subscriber connected through the
 * {@code failover://} transport keeps receiving messages after the broker it
 * is talking to has been stopped and started again.
 *
 * <p>The test publishes {@link #MESSAGE_COUNT} messages, lets a background
 * consumer read the first {@link #firstBatch} of them, bounces the broker and
 * then asserts that the consumer got more than that first batch.
 */
public class ReliableReconnectTest extends TestSupport {

    protected static final int MESSAGE_COUNT = 100;
    protected static final String DEFAULT_BROKER_URL = "vm://localhost";

    /** Per-message receive timeout, in milliseconds. */
    private static final int RECEIVE_TIMEOUT = 10000;

    /** Longest time, in milliseconds, the test waits for the consumer after the broker restart. */
    private static final long COMPLETION_TIMEOUT = 60000;

    protected int deliveryMode = DeliveryMode.PERSISTENT;
    protected String consumerClientId;
    protected Destination destination;

    /** Set (and notified on) by the consumer thread once the broker may be bounced. */
    protected AtomicBoolean closeBroker = new AtomicBoolean(false);

    /** Number of non-null messages the background consumer has received so far. */
    protected AtomicInteger messagesReceived = new AtomicInteger(0);

    protected BrokerService broker;

    /** Number of receive attempts the consumer makes before the broker is bounced. */
    protected int firstBatch = MESSAGE_COUNT / 10;

    private IdGenerator idGen = new IdGenerator();

    /**
     * Set by the consumer thread when it has ended, successfully or not.
     * Guarded by the {@link #messagesReceived} monitor so that the waiting
     * test thread cannot miss the notification.
     */
    private final AtomicBoolean consumerFinished = new AtomicBoolean(false);

    public ReliableReconnectTest() {
    }

    public ReliableReconnectTest(String n) {
        super(n);
    }

    /**
     * Generates the consumer client id, selects topic mode and creates the
     * destination named after this class.
     */
    protected void setUp() throws Exception {
        consumerClientId = idGen.generateId();
        super.setUp();
        topic = true;
        destination = createDestination(getClass().getName());
    }

    /**
     * Stops the broker started by the test, if any, so it does not leak into
     * subsequent tests.
     */
    protected void tearDown() throws Exception {
        try {
            if (broker != null) {
                broker.stop();
                broker = null;
            }
        } finally {
            super.tearDown();
        }
    }

    /**
     * @return a connection factory that reaches {@link #DEFAULT_BROKER_URL}
     *         through the failover transport
     */
    public ActiveMQConnectionFactory getConnectionFactory() throws Exception {
        String url = "failover://" + DEFAULT_BROKER_URL;
        return new ActiveMQConnectionFactory(url);
    }

    /**
     * Creates a broker, adds a connector on {@link #DEFAULT_BROKER_URL} and
     * starts it.
     *
     * @throws JMSException if the broker cannot be created or started; the
     *         underlying failure is attached as linked exception and cause
     */
    protected void startBroker() throws JMSException {
        try {
            broker = BrokerFactory.createBroker(new URI("broker://()/localhost"));
            broker.addConnector(DEFAULT_BROKER_URL);
            broker.start();
        } catch (Exception e) {
            // Surface the failure instead of letting the test continue
            // against a broker that never came up.
            JMSException failure = new JMSException("Failed to start broker: " + e);
            failure.setLinkedException(e);
            failure.initCause(e);
            throw failure;
        }
    }

    /**
     * @return a started connection carrying {@link #consumerClientId}
     */
    protected Connection createConsumerConnection() throws Exception {
        Connection consumerConnection = getConnectionFactory().createConnection();
        consumerConnection.setClientID(consumerClientId);
        consumerConnection.start();
        return consumerConnection;
    }

    /**
     * Creates the durable subscriber "TestFred" on {@link #destination} using
     * an auto-acknowledge session of the given connection.
     *
     * @param con connection to create the session on
     * @return the durable subscriber
     */
    protected MessageConsumer createConsumer(Connection con) throws Exception {
        Session s = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        return s.createDurableSubscriber((Topic) destination, "TestFred");
    }

    /**
     * Starts a background thread that makes {@link #firstBatch} receive
     * attempts, signals {@link #closeBroker}, pauses for two seconds and then
     * makes the remaining receive attempts up to {@link #MESSAGE_COUNT}.
     *
     * <p>Whatever happens on that thread, both {@link #closeBroker} and
     * {@link #messagesReceived} are notified when it ends and the consumer
     * connection is closed, so the test thread is never left waiting on a
     * consumer that has already died.
     */
    protected void spawnConsumer() {
        Thread thread = new Thread(new Runnable() {
            public void run() {
                Connection consumerConnection = null;
                try {
                    consumerConnection = createConsumerConnection();
                    MessageConsumer consumer = createConsumer(consumerConnection);

                    receiveMessages(consumer, firstBatch);
                    signalCloseBroker();

                    // Give the test thread time to take the broker down.
                    Thread.sleep(2000);
                    receiveMessages(consumer, MESSAGE_COUNT - firstBatch);
                } catch (Throwable e) {
                    e.printStackTrace();
                } finally {
                    closeQuietly(consumerConnection);
                    // No-op if already signalled; unblocks the test thread
                    // when the consumer failed before the first batch ended.
                    signalCloseBroker();
                    signalConsumerFinished();
                }
            }
        });
        thread.start();
    }

    /**
     * Registers the durable subscription, publishes {@link #MESSAGE_COUNT}
     * messages while the subscriber is offline, then restarts the broker in
     * the middle of consumption and verifies that more than
     * {@link #firstBatch} messages were received.
     */
    public void testReconnect() throws Exception {
        startBroker();

        // Register an interest in the topic, then go offline.
        Connection consumerConnection = createConsumerConnection();
        try {
            createConsumer(consumerConnection);
        } finally {
            consumerConnection.close();
        }

        // Publish while the durable subscriber is disconnected.
        Connection connection = createConnection();
        try {
            connection.setClientID(idGen.generateId());
            connection.start();
            Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageProducer producer = producerSession.createProducer(destination);
            TextMessage msg = producerSession.createTextMessage();
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                msg.setText("msg: " + i);
                producer.send(msg);
            }
        } finally {
            connection.close();
        }

        spawnConsumer();
        synchronized (closeBroker) {
            // Loop guards against spurious wakeups.
            while (!closeBroker.get()) {
                closeBroker.wait();
            }
        }

        // Bounce the broker underneath the consumer.
        broker.stop();
        startBroker();

        long deadline = System.currentTimeMillis() + COMPLETION_TIMEOUT;
        synchronized (messagesReceived) {
            while (messagesReceived.get() < MESSAGE_COUNT && !consumerFinished.get()) {
                long remaining = deadline - System.currentTimeMillis();
                if (remaining <= 0) {
                    break;
                }
                messagesReceived.wait(remaining);
            }
        }

        int count = messagesReceived.get();
        assertTrue("Not enough messages received: " + count, count > firstBatch);
    }

    /** Makes the given number of receive attempts, counting every message that arrives. */
    private void receiveMessages(MessageConsumer consumer, int attempts) throws JMSException {
        for (int i = 0; i < attempts; i++) {
            Message msg = consumer.receive(RECEIVE_TIMEOUT);
            if (msg != null) {
                messagesReceived.incrementAndGet();
            }
        }
    }

    /** Tells the test thread that the broker may now be stopped. */
    private void signalCloseBroker() {
        synchronized (closeBroker) {
            closeBroker.set(true);
            closeBroker.notify();
        }
    }

    /** Tells the test thread that the consumer thread has ended. */
    private void signalConsumerFinished() {
        synchronized (messagesReceived) {
            consumerFinished.set(true);
            messagesReceived.notify();
        }
    }

    /** Closes the connection if there is one, reporting but not propagating failures. */
    private void closeQuietly(Connection connection) {
        if (connection == null) {
            return;
        }
        try {
            connection.close();
        } catch (Throwable e) {
            e.printStackTrace();
        }
    }
}