1 18 package org.apache.activemq.usecases; 19 20 import org.apache.activemq.broker.BrokerService; 21 import org.apache.activemq.broker.TransportConnector; 22 import org.apache.activemq.network.DemandForwardingBridge; 23 import org.apache.activemq.network.NetworkBridgeConfiguration; 24 import org.apache.activemq.transport.TransportFactory; 25 import org.apache.activemq.JmsMultipleBrokersTestSupport; 26 import org.apache.activemq.command.Command; 27 import org.apache.activemq.util.MessageIdList; 28 29 import javax.jms.Destination ; 30 import javax.jms.MessageConsumer ; 31 import java.util.List ; 32 import java.util.ArrayList ; 33 import java.net.URI ; 34 35 import java.util.concurrent.atomic.AtomicInteger ; 36 37 40 public class TwoBrokerMessageNotSentToRemoteWhenNoConsumerTest extends JmsMultipleBrokersTestSupport { 41 protected static final int MESSAGE_COUNT = 10; 42 43 protected List bridges; 44 protected AtomicInteger msgDispatchCount; 45 46 49 public void testRemoteBrokerHasConsumer() throws Exception { 50 bridgeBrokers("BrokerA", "BrokerB"); 52 53 startAllBrokers(); 54 55 Destination dest = createDestination("TEST.FOO", true); 57 58 MessageConsumer clientA = createConsumer("BrokerA", dest); 60 MessageConsumer clientB = createConsumer("BrokerB", dest); 61 62 sendMessages("BrokerA", dest, MESSAGE_COUNT); 64 65 MessageIdList msgsA = getConsumerMessages("BrokerA", clientA); 67 MessageIdList msgsB = getConsumerMessages("BrokerB", clientB); 68 69 msgsA.waitForMessagesToArrive(MESSAGE_COUNT); 70 msgsB.waitForMessagesToArrive(MESSAGE_COUNT); 71 72 assertEquals(MESSAGE_COUNT, msgsA.getMessageCount()); 73 assertEquals(MESSAGE_COUNT, msgsB.getMessageCount()); 74 75 assertEquals(MESSAGE_COUNT, msgDispatchCount.get()); 77 } 78 79 82 public void testRemoteBrokerHasNoConsumer() throws Exception { 83 bridgeBrokers("BrokerA", "BrokerB"); 85 86 startAllBrokers(); 87 88 Destination dest = createDestination("TEST.FOO", true); 90 91 MessageConsumer clientA = createConsumer("BrokerA", dest); 93 94 sendMessages("BrokerA", dest, MESSAGE_COUNT); 96 97 MessageIdList msgsA = getConsumerMessages("BrokerA", clientA); 99 100 msgsA.waitForMessagesToArrive(MESSAGE_COUNT); 101 102 assertEquals(MESSAGE_COUNT, msgsA.getMessageCount()); 103 104 assertEquals(0, msgDispatchCount.get()); 106 } 107 108 protected void bridgeBrokers(BrokerService localBroker, BrokerService remoteBroker) throws Exception { 109 List remoteTransports = remoteBroker.getTransportConnectors(); 110 List localTransports = localBroker.getTransportConnectors(); 111 112 URI remoteURI, localURI; 113 if (!remoteTransports.isEmpty() && !localTransports.isEmpty()) { 114 remoteURI = ((TransportConnector)remoteTransports.get(0)).getConnectUri(); 115 localURI = ((TransportConnector)localTransports.get(0)).getConnectUri(); 116 117 if (remoteURI.toString().startsWith("tcp:") && localURI.toString().startsWith("tcp:")) { 119 NetworkBridgeConfiguration config = new NetworkBridgeConfiguration(); 120 config.setBrokerName(localBroker.getBrokerName()); 121 DemandForwardingBridge bridge = new DemandForwardingBridge(config,TransportFactory.connect(localURI), 122 TransportFactory.connect(remoteURI)) { 123 protected void serviceLocalCommand(Command command) { 124 if (command.isMessageDispatch()) { 125 msgDispatchCount.incrementAndGet(); 127 } 128 129 super.serviceLocalCommand(command); 130 } 131 }; 132 bridges.add(bridge); 133 134 bridge.start(); 135 } else { 136 throw new Exception ("Remote broker or local broker is not using tcp connectors"); 137 } 138 } else { 139 throw new Exception ("Remote broker or local broker has no registered connectors."); 140 } 141 142 MAX_SETUP_TIME = 2000; 143 } 144 145 public void setUp() throws Exception { 146 super.setAutoFail(true); 147 super.setUp(); 148 createBroker(new URI ("broker:(tcp://localhost:61616)/BrokerA?persistent=false&useJmx=false")); 149 createBroker(new URI ("broker:(tcp://localhost:61617)/BrokerB?persistent=false&useJmx=false")); 150 151 bridges = new ArrayList (); 152 msgDispatchCount = new AtomicInteger (0); 153 } 154 } 155 | Popular Tags |