1 18 package org.apache.activemq.transport.discovery; 19 20 import java.io.IOException ; 21 import java.net.URI ; 22 import java.net.URISyntaxException ; 23 24 import javax.jms.DeliveryMode ; 25 26 import junit.framework.Test; 27 28 import org.apache.activemq.broker.StubConnection; 29 import org.apache.activemq.broker.TransportConnector; 30 import org.apache.activemq.command.ActiveMQDestination; 31 import org.apache.activemq.command.ActiveMQQueue; 32 import org.apache.activemq.command.ConnectionInfo; 33 import org.apache.activemq.command.ConsumerInfo; 34 import org.apache.activemq.command.ProducerInfo; 35 import org.apache.activemq.command.SessionInfo; 36 import org.apache.activemq.network.NetworkTestSupport; 37 import org.apache.activemq.transport.Transport; 38 import org.apache.activemq.transport.TransportFactory; 39 import org.apache.activemq.transport.failover.FailoverTransport; 40 import org.apache.commons.logging.Log; 41 import org.apache.commons.logging.LogFactory; 42 43 public class DiscoveryTransportBrokerTest extends NetworkTestSupport { 44 45 static final private Log log = LogFactory.getLog(DiscoveryTransportBrokerTest.class); 46 47 public void setUp() throws Exception { 48 super.setAutoFail(true); 49 super.setUp(); 50 } 51 52 public void testPublisherFailsOver() throws Exception { 53 ActiveMQDestination destination = new ActiveMQQueue("TEST"); 54 int deliveryMode = DeliveryMode.NON_PERSISTENT; 55 56 StubConnection connection1 = createConnection(); 58 ConnectionInfo connectionInfo1 = createConnectionInfo(); 59 SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1); 60 ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination); 61 connection1.send(connectionInfo1); 62 connection1.send(sessionInfo1); 63 connection1.request(consumerInfo1); 64 65 StubConnection connection2 = createRemoteConnection(); 67 ConnectionInfo connectionInfo2 = createConnectionInfo(); 68 SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2); 69 ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destination); 70 connection2.send(connectionInfo2); 71 connection2.send(sessionInfo2); 72 connection2.request(consumerInfo2); 73 74 StubConnection connection3 = createFailoverConnection(); 76 ConnectionInfo connectionInfo3 = createConnectionInfo(); 77 SessionInfo sessionInfo3 = createSessionInfo(connectionInfo3); 78 ProducerInfo producerInfo3 = createProducerInfo(sessionInfo3); 79 connection3.send(connectionInfo3); 80 connection3.send(sessionInfo3); 81 connection3.send(producerInfo3); 82 83 connection3.request(createMessage(producerInfo3, destination, deliveryMode)); 85 86 FailoverTransport ft = (FailoverTransport) connection3.getTransport().narrow(FailoverTransport.class); 88 89 StubConnection connectionA; 91 StubConnection connectionB; 92 TransportConnector serverA; 93 if( connector.getServer().getConnectURI().equals(ft.getConnectedTransportURI() ) ) { 94 connectionA=connection1; 95 connectionB=connection2; 96 serverA = connector; 97 } else { 98 connectionA=connection2; 99 connectionB=connection1; 100 serverA = remoteConnector; 101 } 102 103 assertNotNull(receiveMessage(connectionA)); 104 assertNoMessagesLeft(connectionB); 105 106 log.info("Disconnecting active server"); 108 serverA.stop(); 109 110 log.info("Sending request that should failover"); 111 connection3.request(createMessage(producerInfo3, destination, deliveryMode)); 112 113 assertNotNull(receiveMessage(connectionB)); 114 assertNoMessagesLeft(connectionA); 115 116 } 117 118 protected String getLocalURI() { 119 return "tcp://localhost:0?wireFormat.tcpNoDelayEnabled=true"; 120 } 121 122 protected String getRemoteURI() { 123 return "tcp://localhost:0?wireFormat.tcpNoDelayEnabled=true"; 124 } 125 126 protected TransportConnector createConnector() throws Exception , IOException , URISyntaxException { 127 TransportConnector x = super.createConnector(); 128 x.setDiscoveryUri(new URI (getDiscoveryUri())); 129 return x; 130 } 131 132 protected String getDiscoveryUri() { 133 return "multicast://default"; 134 } 135 136 protected TransportConnector createRemoteConnector() throws Exception , IOException , URISyntaxException { 137 TransportConnector x = super.createRemoteConnector(); 138 x.setDiscoveryUri(new URI (getDiscoveryUri())); 139 return x; 140 } 141 142 protected StubConnection createFailoverConnection() throws Exception { 143 URI failoverURI = new URI ("discovery:" + getDiscoveryUri()); 144 Transport transport = TransportFactory.connect(failoverURI); 145 StubConnection connection = new StubConnection(transport); 146 connections.add(connection); 147 return connection; 148 } 149 150 public static Test suite() { 151 return suite(DiscoveryTransportBrokerTest.class); 152 } 153 154 public static void main(String [] args) { 155 junit.textui.TestRunner.run(suite()); 156 } 157 158 } 159 | Popular Tags |