1 18 package org.apache.activemq.usecases; 19 20 import org.apache.activemq.JmsMultipleBrokersTestSupport; 21 import org.apache.activemq.ActiveMQConnectionFactory; 22 import org.apache.activemq.ActiveMQPrefetchPolicy; 23 24 import javax.jms.Destination ; 25 import javax.jms.Message ; 26 import javax.jms.Connection ; 27 import javax.jms.Session ; 28 import javax.jms.MessageConsumer ; 29 import java.net.URI ; 30 31 34 public class TwoBrokerQueueClientsReconnectTest extends JmsMultipleBrokersTestSupport { 35 protected static final int MESSAGE_COUNT = 100; protected static final int PREFETCH_COUNT = 1; 37 38 protected int msgsClient1, msgsClient2; 39 protected String broker1, broker2; 40 41 public void testClientAReceivesOnly() throws Exception { 42 broker1 = "BrokerA"; 43 broker2 = "BrokerB"; 44 45 doOneClientReceivesOnly(); 46 } 47 48 public void testClientBReceivesOnly() throws Exception { 49 broker1 = "BrokerB"; 50 broker2 = "BrokerA"; 51 52 doOneClientReceivesOnly(); 53 } 54 55 public void doOneClientReceivesOnly() throws Exception { 56 bridgeBrokers(broker1, broker2); 58 bridgeBrokers(broker2, broker1); 59 60 startAllBrokers(); 62 63 Destination dest = createDestination("TEST.FOO", false); 65 66 MessageConsumer client1 = createConsumer(broker1, dest); 68 MessageConsumer client2 = createConsumer(broker2, dest); 69 70 Thread.sleep(500); 72 73 sendMessages("BrokerA", dest, MESSAGE_COUNT); 75 76 client2.close(); 78 79 msgsClient1 += receiveAllMessages(client1); 81 client1.close(); 82 83 assertEquals("Client for " + broker1 + " should have receive all messages.", MESSAGE_COUNT, msgsClient1); 85 } 86 87 public void testClientAReceivesOnlyAfterReconnect() throws Exception { 88 broker1 = "BrokerA"; 89 broker2 = "BrokerB"; 90 91 doOneClientReceivesOnlyAfterReconnect(); 92 } 93 94 public void testClientBReceivesOnlyAfterReconnect() throws Exception { 95 broker1 = "BrokerB"; 96 broker2 = "BrokerA"; 97 98 doOneClientReceivesOnlyAfterReconnect(); 99 } 100 101 public void doOneClientReceivesOnlyAfterReconnect() throws Exception { 102 bridgeBrokers(broker1, broker2); 104 bridgeBrokers(broker2, broker1); 105 106 startAllBrokers(); 108 109 Destination dest = createDestination("TEST.FOO", false); 111 112 MessageConsumer client1 = createConsumer(broker1, dest); 114 MessageConsumer client2 = createConsumer(broker2, dest); 115 116 Thread.sleep(500); 118 119 sendMessages("BrokerA", dest, MESSAGE_COUNT); 121 122 msgsClient1 += receiveExactMessages(client1, (int)(MESSAGE_COUNT * 0.20)); 124 125 client1.close(); 127 128 client1 = createConsumer(broker1, dest); 130 Thread.sleep(500); 131 132 client2.close(); 134 135 msgsClient1 += receiveAllMessages(client1); 137 client1.close(); 138 139 assertEquals("Client for " + broker1 + " should have received all messages.", MESSAGE_COUNT, msgsClient1); 141 } 142 143 public void testTwoClientsReceiveClientADisconnects() throws Exception { 144 broker1 = "BrokerA"; 145 broker2 = "BrokerB"; 146 147 doTwoClientsReceiveOneClientDisconnects(); 148 } 149 150 public void testTwoClientsReceiveClientBDisconnects() throws Exception { 151 broker1 = "BrokerB"; 152 broker2 = "BrokerA"; 153 154 doTwoClientsReceiveOneClientDisconnects(); 155 } 156 157 public void doTwoClientsReceiveOneClientDisconnects() throws Exception { 158 bridgeBrokers(broker1, broker2); 160 bridgeBrokers(broker2, broker1); 161 162 startAllBrokers(); 164 165 Destination dest = createDestination("TEST.FOO", false); 167 168 MessageConsumer client1 = createConsumer(broker1, dest); 170 MessageConsumer client2 = createConsumer(broker2, dest); 171 172 Thread.sleep(500); 174 175 sendMessages("BrokerA", dest, MESSAGE_COUNT); 177 178 msgsClient1 += receiveExactMessages(client1, (int)(MESSAGE_COUNT * 0.20)); 180 msgsClient2 += receiveExactMessages(client2, (int)(MESSAGE_COUNT * 0.20)); 181 182 client1.close(); 184 185 msgsClient2 += receiveAllMessages(client2); 187 client2.close(); 188 189 assertEquals("Client for " + broker1 + " should have received 20% of the messages.", (int)(MESSAGE_COUNT * 0.20), msgsClient1); 191 192 assertEquals("Client for " + broker2 + " should have received 80% of the messages.", (int)(MESSAGE_COUNT * 0.80), msgsClient2); 194 } 195 196 public void testTwoClientsReceiveClientAReconnects() throws Exception { 197 broker1 = "BrokerA"; 198 broker2 = "BrokerB"; 199 200 doTwoClientsReceiveOneClientReconnects(); 201 } 202 203 public void testTwoClientsReceiveClientBReconnects() throws Exception { 204 broker1 = "BrokerB"; 205 broker2 = "BrokerA"; 206 207 doTwoClientsReceiveOneClientReconnects(); 208 } 209 210 public void doTwoClientsReceiveOneClientReconnects() throws Exception { 211 bridgeBrokers(broker1, broker2); 213 bridgeBrokers(broker2, broker1); 214 215 startAllBrokers(); 217 218 Destination dest = createDestination("TEST.FOO", false); 220 221 MessageConsumer client1 = createConsumer(broker1, dest); 223 MessageConsumer client2 = createConsumer(broker2, dest); 224 225 Thread.sleep(500); 227 228 sendMessages("BrokerA", dest, MESSAGE_COUNT); 230 231 msgsClient1 += receiveExactMessages(client1, (int)(MESSAGE_COUNT * 0.20)); 233 msgsClient2 += receiveExactMessages(client2, (int)(MESSAGE_COUNT * 0.20)); 234 235 client1.close(); 237 238 msgsClient2 += receiveExactMessages(client2, (int)(MESSAGE_COUNT * 0.20)); 240 241 client1 = createConsumer(broker1, dest); 243 Thread.sleep(500); 244 245 msgsClient1 += receiveExactMessages(client1, (int)(MESSAGE_COUNT * 0.20)); 247 client1.close(); 248 249 msgsClient2 += receiveExactMessages(client2, (int)(MESSAGE_COUNT * 0.20)); 250 client2.close(); 251 252 assertEquals("Client for " + broker1 + " should have received 40% of the messages.", (int)(MESSAGE_COUNT * 0.40), msgsClient1); 254 255 assertEquals("Client for " + broker2 + " should have received 60% of the messages.", (int)(MESSAGE_COUNT * 0.60), msgsClient2); 257 } 258 259 public void testTwoClientsReceiveTwoClientReconnects() throws Exception { 260 broker1 = "BrokerA"; 261 broker2 = "BrokerB"; 262 263 bridgeBrokers(broker1, broker2); 265 bridgeBrokers(broker2, broker1); 266 267 startAllBrokers(); 269 270 Destination dest = createDestination("TEST.FOO", false); 272 273 MessageConsumer client1 = createConsumer(broker1, dest); 275 MessageConsumer client2 = createConsumer(broker2, dest); 276 277 Thread.sleep(500); 279 280 sendMessages("BrokerA", dest, MESSAGE_COUNT); 282 283 msgsClient1 += receiveExactMessages(client1, (int)(MESSAGE_COUNT * 0.20)); 285 msgsClient2 += receiveExactMessages(client2, (int)(MESSAGE_COUNT * 0.20)); 286 287 client1.close(); 289 client2.close(); 290 291 client1 = createConsumer(broker1, dest); 293 client2 = createConsumer(broker2, dest); 294 Thread.sleep(500); 295 296 msgsClient1 += receiveExactMessages(client1, (int)(MESSAGE_COUNT * 0.30)); 298 client1.close(); 299 300 msgsClient2 += receiveExactMessages(client2, (int)(MESSAGE_COUNT * 0.30)); 301 client2.close(); 302 303 assertEquals("Client for " + broker1 + " should have received 50% of the messages.", (int)(MESSAGE_COUNT * 0.50), msgsClient1); 305 306 assertEquals("Client for " + broker2 + " should have received 50% of the messages.", (int)(MESSAGE_COUNT * 0.50), msgsClient2); 308 } 309 310 protected int receiveExactMessages(MessageConsumer consumer, int msgCount) throws Exception { 311 Message msg; 312 int i; 313 for (i=0; i<msgCount; i++) { 314 msg = consumer.receive(1000); 315 if (msg == null) { 316 System.err.println("Consumer failed to receive exactly " + msgCount + " messages. Actual messages received is: " + i); 317 break; 318 } 319 } 320 321 return i; 322 } 323 324 protected int receiveAllMessages(MessageConsumer consumer) throws Exception { 325 int msgsReceived = 0; 326 327 Message msg; 328 do { 329 msg = consumer.receive(1000); 330 if (msg != null) { 331 msgsReceived++; 332 } 333 } while (msg != null); 334 335 return msgsReceived; 336 } 337 338 protected MessageConsumer createConsumer(String brokerName, Destination dest) throws Exception { 339 Connection conn = createConnection(brokerName); 340 conn.start(); 341 Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); 342 return sess.createConsumer(dest); 343 } 344 345 public void setUp() throws Exception { 346 super.setAutoFail(true); 347 super.setUp(); 348 createBroker(new URI ("broker:(tcp://localhost:61616)/BrokerA?persistent=false&useJmx=false")); 349 createBroker(new URI ("broker:(tcp://localhost:61617)/BrokerB?persistent=false&useJmx=false")); 350 351 ActiveMQConnectionFactory factoryA, factoryB; 353 factoryA = (ActiveMQConnectionFactory)getConnectionFactory("BrokerA"); 354 factoryB = (ActiveMQConnectionFactory)getConnectionFactory("BrokerB"); 355 356 ActiveMQPrefetchPolicy policy = new ActiveMQPrefetchPolicy(); 358 policy.setAll(PREFETCH_COUNT); 359 360 factoryA.setPrefetchPolicy(policy); 361 factoryB.setPrefetchPolicy(policy); 362 363 msgsClient1 = 0; 364 msgsClient2 = 0; 365 } 366 } 367 | Popular Tags |