1 18 package org.apache.activemq.transport.fanout; 19 20 import java.io.IOException ; 21 import java.net.URI ; 22 23 import javax.jms.DeliveryMode ; 24 25 import junit.framework.Test; 26 27 import org.apache.activemq.broker.StubConnection; 28 import org.apache.activemq.command.ActiveMQDestination; 29 import org.apache.activemq.command.ActiveMQQueue; 30 import org.apache.activemq.command.ActiveMQTopic; 31 import org.apache.activemq.command.ConnectionInfo; 32 import org.apache.activemq.command.ConsumerInfo; 33 import org.apache.activemq.command.ProducerInfo; 34 import org.apache.activemq.command.SessionInfo; 35 import org.apache.activemq.network.NetworkTestSupport; 36 import org.apache.activemq.transport.Transport; 37 import org.apache.activemq.transport.TransportFactory; 38 import org.apache.activemq.transport.TransportFilter; 39 import org.apache.activemq.transport.mock.MockTransport; 40 import org.apache.commons.logging.Log; 41 import org.apache.commons.logging.LogFactory; 42 43 import java.util.concurrent.CountDownLatch ; 44 import java.util.concurrent.TimeUnit ; 45 46 public class FanoutTransportBrokerTest extends NetworkTestSupport { 47 48 private static final Log log = LogFactory.getLog(FanoutTransportBrokerTest.class); 49 50 public ActiveMQDestination destination; 51 public int deliveryMode; 52 53 private String remoteURI = "tcp://localhost:0?wireFormat.tcpNoDelayEnabled=true"; 54 55 public static Test suite() { 56 return suite(FanoutTransportBrokerTest.class); 57 } 58 59 public static void main(String [] args) { 60 junit.textui.TestRunner.run(suite()); 61 } 62 63 public void initCombosForTestPublisherFansout() { 64 addCombinationValues("deliveryMode", new Object [] { new Integer (DeliveryMode.NON_PERSISTENT), 65 new Integer (DeliveryMode.PERSISTENT) }); 66 addCombinationValues("destination", new Object [] { new ActiveMQQueue("TEST"), new ActiveMQTopic("TEST"), }); 67 } 68 69 public void xtestPublisherFansout() throws Exception { 70 71 StubConnection connection1 = createConnection(); 73 ConnectionInfo connectionInfo1 = createConnectionInfo(); 74 SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1); 75 ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination); 76 connection1.send(connectionInfo1); 77 connection1.send(sessionInfo1); 78 connection1.request(consumerInfo1); 79 80 StubConnection connection2 = createRemoteConnection(); 82 ConnectionInfo connectionInfo2 = createConnectionInfo(); 83 SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2); 84 ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destination); 85 connection2.send(connectionInfo2); 86 connection2.send(sessionInfo2); 87 connection2.request(consumerInfo2); 88 89 log.info("Starting the fanout connection."); 91 StubConnection connection3 = createFanoutConnection(); 92 ConnectionInfo connectionInfo3 = createConnectionInfo(); 93 SessionInfo sessionInfo3 = createSessionInfo(connectionInfo3); 94 ProducerInfo producerInfo3 = createProducerInfo(sessionInfo3); 95 connection3.send(connectionInfo3); 96 connection3.send(sessionInfo3); 97 connection3.send(producerInfo3); 98 99 connection3.request(createMessage(producerInfo3, destination, deliveryMode)); 101 102 assertNotNull(receiveMessage(connection1)); 103 assertNoMessagesLeft(connection1); 104 105 assertNotNull(receiveMessage(connection2)); 106 assertNoMessagesLeft(connection2); 107 108 } 109 110 111 public void initCombosForTestPublisherWaitsForServerToBeUp() { 112 addCombinationValues("deliveryMode", new Object [] { new Integer (DeliveryMode.NON_PERSISTENT), 113 new Integer (DeliveryMode.PERSISTENT) }); 114 addCombinationValues("destination", new Object [] { new ActiveMQQueue("TEST"), new ActiveMQTopic("TEST"), }); 115 } 116 public void testPublisherWaitsForServerToBeUp() throws Exception { 117 118 StubConnection connection1 = createConnection(); 120 ConnectionInfo connectionInfo1 = createConnectionInfo(); 121 SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1); 122 ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination); 123 connection1.send(connectionInfo1); 124 connection1.send(sessionInfo1); 125 connection1.request(consumerInfo1); 126 127 StubConnection connection2 = createRemoteConnection(); 129 ConnectionInfo connectionInfo2 = createConnectionInfo(); 130 SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2); 131 ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destination); 132 connection2.send(connectionInfo2); 133 connection2.send(sessionInfo2); 134 connection2.request(consumerInfo2); 135 136 log.info("Starting the fanout connection."); 138 final StubConnection connection3 = createFanoutConnection(); 139 ConnectionInfo connectionInfo3 = createConnectionInfo(); 140 SessionInfo sessionInfo3 = createSessionInfo(connectionInfo3); 141 final ProducerInfo producerInfo3 = createProducerInfo(sessionInfo3); 142 connection3.send(connectionInfo3); 143 connection3.send(sessionInfo3); 144 connection3.send(producerInfo3); 145 146 connection3.request(createMessage(producerInfo3, destination, deliveryMode)); 148 149 assertNotNull(receiveMessage(connection1)); 150 assertNoMessagesLeft(connection1); 151 152 assertNotNull(receiveMessage(connection2)); 153 assertNoMessagesLeft(connection2); 154 155 final CountDownLatch publishDone = new CountDownLatch (1); 156 157 MockTransport mt = (MockTransport) connection3.getTransport().narrow(MockTransport.class); 160 mt.install(new TransportFilter(mt.getNext()) { 161 public void oneway(Object command) throws IOException { 162 log.info("Dropping: "+command); 163 } 165 }); 166 167 new Thread () { 169 public void run() { 170 try { 172 connection3.request(createMessage(producerInfo3, destination, deliveryMode)); 173 } catch (Throwable e) { 174 e.printStackTrace(); 175 } 176 publishDone.countDown(); 177 } 178 }.start(); 179 180 assertFalse( publishDone.await(3, TimeUnit.SECONDS) ); 182 183 remoteURI = remoteConnector.getServer().getConnectURI().toString(); 185 restartRemoteBroker(); 186 187 assertTrue( publishDone.await(10, TimeUnit.SECONDS) ); 189 190 } 191 192 protected String getLocalURI() { 193 return "tcp://localhost:0?wireFormat.tcpNoDelayEnabled=true"; 194 } 195 196 protected String getRemoteURI() { 197 return remoteURI; 198 } 199 200 protected StubConnection createFanoutConnection() throws Exception { 201 URI fanoutURI = new URI ("fanout://static://(" + connector.getServer().getConnectURI() + "," 202 + "mock://"+remoteConnector.getServer().getConnectURI() + ")"); 203 Transport transport = TransportFactory.connect(fanoutURI); 204 StubConnection connection = new StubConnection(transport); 205 connections.add(connection); 206 return connection; 207 } 208 209 } 210 | Popular Tags |