1 18 package org.apache.activemq.usecases; 19 20 import org.apache.activemq.util.MessageIdList; 21 import org.apache.activemq.JmsMultipleBrokersTestSupport; 22 23 import javax.jms.MessageConsumer ; 24 import javax.jms.Destination ; 25 import java.net.URI ; 26 27 30 public class ThreeBrokerTopicNetworkTest extends JmsMultipleBrokersTestSupport { 31 protected static final int MESSAGE_COUNT = 100; 32 33 36 public void test_AB_BC_BrokerNetwork() throws Exception { 37 bridgeBrokers("BrokerA", "BrokerB"); 39 bridgeBrokers("BrokerB", "BrokerC"); 40 41 startAllBrokers(); 42 43 Destination dest = createDestination("TEST.FOO", true); 45 46 MessageConsumer clientA = createConsumer("BrokerA", dest); 48 MessageConsumer clientB = createConsumer("BrokerB", dest); 49 MessageConsumer clientC = createConsumer("BrokerC", dest); 50 51 Thread.sleep(2000); 53 sendMessages("BrokerA", dest, MESSAGE_COUNT); 55 sendMessages("BrokerB", dest, MESSAGE_COUNT); 56 sendMessages("BrokerC", dest, MESSAGE_COUNT); 57 58 MessageIdList msgsA = getConsumerMessages("BrokerA", clientA); 60 MessageIdList msgsB = getConsumerMessages("BrokerB", clientB); 61 MessageIdList msgsC = getConsumerMessages("BrokerC", clientC); 62 63 msgsA.waitForMessagesToArrive(MESSAGE_COUNT); 64 msgsB.waitForMessagesToArrive(MESSAGE_COUNT * 2); 65 msgsC.waitForMessagesToArrive(MESSAGE_COUNT * 2); 66 67 assertEquals(MESSAGE_COUNT, msgsA.getMessageCount()); 68 assertEquals(MESSAGE_COUNT * 2, msgsB.getMessageCount()); 69 assertEquals(MESSAGE_COUNT * 2, msgsC.getMessageCount()); 70 } 71 72 75 public void test_BA_BC_BrokerNetwork() throws Exception { 76 bridgeBrokers("BrokerB", "BrokerA"); 78 bridgeBrokers("BrokerB", "BrokerC"); 79 80 startAllBrokers(); 81 82 Destination dest = createDestination("TEST.FOO", true); 84 85 MessageConsumer clientA = createConsumer("BrokerA", dest); 87 MessageConsumer clientB = createConsumer("BrokerB", dest); 88 MessageConsumer clientC = createConsumer("BrokerC", dest); 89 90 Thread.sleep(2000); 92 sendMessages("BrokerA", dest, MESSAGE_COUNT); 94 sendMessages("BrokerB", dest, MESSAGE_COUNT); 95 sendMessages("BrokerC", dest, MESSAGE_COUNT); 96 97 MessageIdList msgsA = getConsumerMessages("BrokerA", clientA); 99 MessageIdList msgsB = getConsumerMessages("BrokerB", clientB); 100 MessageIdList msgsC = getConsumerMessages("BrokerC", clientC); 101 102 msgsA.waitForMessagesToArrive(MESSAGE_COUNT * 2); 103 msgsB.waitForMessagesToArrive(MESSAGE_COUNT); 104 msgsC.waitForMessagesToArrive(MESSAGE_COUNT * 2); 105 106 assertEquals(MESSAGE_COUNT * 2, msgsA.getMessageCount()); 107 assertEquals(MESSAGE_COUNT, msgsB.getMessageCount()); 108 assertEquals(MESSAGE_COUNT * 2, msgsC.getMessageCount()); 109 } 110 111 114 public void test_AB_CB_BrokerNetwork() throws Exception { 115 bridgeBrokers("BrokerA", "BrokerB"); 117 bridgeBrokers("BrokerC", "BrokerB"); 118 119 startAllBrokers(); 120 121 Destination dest = createDestination("TEST.FOO", true); 123 124 MessageConsumer clientA = createConsumer("BrokerA", dest); 126 MessageConsumer clientB = createConsumer("BrokerB", dest); 127 MessageConsumer clientC = createConsumer("BrokerC", dest); 128 129 Thread.sleep(2000); 131 132 sendMessages("BrokerA", dest, MESSAGE_COUNT); 134 sendMessages("BrokerB", dest, MESSAGE_COUNT); 135 sendMessages("BrokerC", dest, MESSAGE_COUNT); 136 137 MessageIdList msgsA = getConsumerMessages("BrokerA", clientA); 139 MessageIdList msgsB = getConsumerMessages("BrokerB", clientB); 140 MessageIdList msgsC = getConsumerMessages("BrokerC", clientC); 141 142 msgsA.waitForMessagesToArrive(MESSAGE_COUNT); 143 msgsB.waitForMessagesToArrive(MESSAGE_COUNT * 3); 144 msgsC.waitForMessagesToArrive(MESSAGE_COUNT); 145 146 assertEquals(MESSAGE_COUNT, msgsA.getMessageCount()); 147 assertEquals(MESSAGE_COUNT * 3, msgsB.getMessageCount()); 148 assertEquals(MESSAGE_COUNT, msgsC.getMessageCount()); 149 } 150 151 154 public void testAllConnectedBrokerNetwork() throws Exception { 155 bridgeBrokers("BrokerA", "BrokerB"); 157 bridgeBrokers("BrokerB", "BrokerA"); 158 bridgeBrokers("BrokerB", "BrokerC"); 159 bridgeBrokers("BrokerC", "BrokerB"); 160 bridgeBrokers("BrokerA", "BrokerC"); 161 bridgeBrokers("BrokerC", "BrokerA"); 162 163 startAllBrokers(); 164 165 Destination dest = createDestination("TEST.FOO", true); 167 168 MessageConsumer clientA = createConsumer("BrokerA", dest); 170 MessageConsumer clientB = createConsumer("BrokerB", dest); 171 MessageConsumer clientC = createConsumer("BrokerC", dest); 172 Thread.sleep(2000); 174 175 sendMessages("BrokerA", dest, MESSAGE_COUNT); 177 sendMessages("BrokerB", dest, MESSAGE_COUNT); 178 sendMessages("BrokerC", dest, MESSAGE_COUNT); 179 180 MessageIdList msgsA = getConsumerMessages("BrokerA", clientA); 182 MessageIdList msgsB = getConsumerMessages("BrokerB", clientB); 183 MessageIdList msgsC = getConsumerMessages("BrokerC", clientC); 184 185 msgsA.waitForMessagesToArrive(MESSAGE_COUNT * 3); 186 msgsB.waitForMessagesToArrive(MESSAGE_COUNT * 3); 187 msgsC.waitForMessagesToArrive(MESSAGE_COUNT * 3); 188 189 assertEquals(MESSAGE_COUNT * 3, msgsA.getMessageCount()); 190 assertEquals(MESSAGE_COUNT * 3, msgsB.getMessageCount()); 191 assertEquals(MESSAGE_COUNT * 3, msgsC.getMessageCount()); 192 } 193 194 197 public void testAllConnectedUsingMulticast() throws Exception { 198 bridgeAllBrokers(); 200 201 startAllBrokers(); 202 203 Destination dest = createDestination("TEST.FOO", true); 205 206 MessageConsumer clientA = createConsumer("BrokerA", dest); 208 MessageConsumer clientB = createConsumer("BrokerB", dest); 209 MessageConsumer clientC = createConsumer("BrokerC", dest); 210 211 Thread.sleep(2000); 213 214 sendMessages("BrokerA", dest, MESSAGE_COUNT); 216 sendMessages("BrokerB", dest, MESSAGE_COUNT); 217 sendMessages("BrokerC", dest, MESSAGE_COUNT); 218 219 MessageIdList msgsA = getConsumerMessages("BrokerA", clientA); 221 MessageIdList msgsB = getConsumerMessages("BrokerB", clientB); 222 MessageIdList msgsC = getConsumerMessages("BrokerC", clientC); 223 224 msgsA.waitForMessagesToArrive(MESSAGE_COUNT * 3); 225 msgsB.waitForMessagesToArrive(MESSAGE_COUNT * 3); 226 msgsC.waitForMessagesToArrive(MESSAGE_COUNT * 3); 227 228 assertEquals(MESSAGE_COUNT * 3, msgsA.getMessageCount()); 229 assertEquals(MESSAGE_COUNT * 3, msgsB.getMessageCount()); 230 assertEquals(MESSAGE_COUNT * 3, msgsC.getMessageCount()); 231 } 232 233 public void setUp() throws Exception { 234 super.setAutoFail(true); 235 super.setUp(); 236 createBroker(new URI ("broker:(tcp://localhost:61616)/BrokerA?persistent=false&useJmx=false")); 237 createBroker(new URI ("broker:(tcp://localhost:61617)/BrokerB?persistent=false&useJmx=false")); 238 createBroker(new URI ("broker:(tcp://localhost:61618)/BrokerC?persistent=false&useJmx=false")); 239 } 240 } 241 | Popular Tags |