1 18 package org.apache.activemq.usecases; 19 20 import java.net.URI ; 21 import java.util.HashMap ; 22 import java.util.Map ; 23 import java.util.concurrent.CountDownLatch ; 24 import java.util.concurrent.TimeUnit ; 25 26 import javax.jms.Destination ; 27 import javax.jms.MessageConsumer ; 28 29 import org.apache.activemq.JmsMultipleBrokersTestSupport; 30 import org.apache.activemq.util.MessageIdList; 31 32 35 public class MultiBrokersMultiClientsTest extends JmsMultipleBrokersTestSupport { 36 public static final int BROKER_COUNT = 2; public static final int CONSUMER_COUNT = 3; public static final int PRODUCER_COUNT = 3; public static final int MESSAGE_COUNT = 10; 41 protected Map consumerMap; 42 43 public void testTopicAllConnected() throws Exception { 44 bridgeAllBrokers(); 45 startAllBrokers(); 46 47 48 Destination dest = createDestination("TEST.FOO", true); 50 51 CountDownLatch latch = new CountDownLatch ( 52 BROKER_COUNT * PRODUCER_COUNT * 53 BROKER_COUNT * CONSUMER_COUNT * 54 MESSAGE_COUNT); 55 56 for (int i=1; i<=BROKER_COUNT; i++) { 58 for (int j=0; j<CONSUMER_COUNT; j++) { 59 consumerMap.put("Consumer:" + i + ":" + j, createConsumer("Broker" + i, dest, latch)); 60 } 61 } 62 63 Thread.sleep(5000); 65 66 for (int i=1; i<=BROKER_COUNT; i++) { 68 for (int j=0; j<PRODUCER_COUNT; j++) { 69 sendMessages("Broker" + i, dest, MESSAGE_COUNT); 70 } 71 } 72 73 assertTrue("Missing "+latch.getCount()+ " messages", latch.await(30, TimeUnit.SECONDS)); 74 75 for (int i=1; i<=BROKER_COUNT; i++) { 77 for (int j=0; j<CONSUMER_COUNT; j++) { 78 MessageIdList msgs = getConsumerMessages("Broker" + i, (MessageConsumer )consumerMap.get("Consumer:" + i + ":" + j)); 79 assertEquals(BROKER_COUNT * PRODUCER_COUNT * MESSAGE_COUNT, msgs.getMessageCount()); 80 } 81 } 82 83 } 84 85 public void testQueueAllConnected() throws Exception { 86 bridgeAllBrokers(); 87 startAllBrokers(); 88 89 Destination dest = createDestination("TEST.FOO", false); 91 92 CountDownLatch latch = new CountDownLatch (BROKER_COUNT * PRODUCER_COUNT * MESSAGE_COUNT); 93 94 for (int i=1; i<=BROKER_COUNT; i++) { 96 for (int j=0; j<CONSUMER_COUNT; j++) { 97 consumerMap.put("Consumer:" + i + ":" + j, createConsumer("Broker" + i, dest, latch)); 98 } 99 } 100 101 Thread.sleep(5000); 103 104 for (int i=1; i<=BROKER_COUNT; i++) { 106 for (int j=0; j<PRODUCER_COUNT; j++) { 107 sendMessages("Broker" + i, dest, MESSAGE_COUNT); 108 } 109 } 110 111 assertTrue("Missing "+latch.getCount()+ " messages", latch.await(30, TimeUnit.SECONDS)); 113 114 int totalMsg = 0; 116 for (int i=1; i<=BROKER_COUNT; i++) { 117 for (int j=0; j<CONSUMER_COUNT; j++) { 118 MessageIdList msgs = getConsumerMessages("Broker" + i, (MessageConsumer )consumerMap.get("Consumer:" + i + ":" + j)); 119 totalMsg += msgs.getMessageCount(); 120 } 121 } 122 assertEquals(BROKER_COUNT * PRODUCER_COUNT * MESSAGE_COUNT, totalMsg); 123 } 124 125 public void setUp() throws Exception { 126 super.setAutoFail(true); 127 super.setUp(); 128 129 for (int i=1; i<=BROKER_COUNT; i++) { 131 createBroker(new URI ("broker:()/Broker" + i + "?persistent=false&useJmx=false")); 132 } 133 134 consumerMap = new HashMap (); 135 } 136 } 137 | Popular Tags |