1 18 package org.apache.activemq.transport.failover; 19 20 import java.net.URI ; 21 22 import javax.jms.DeliveryMode ; 23 24 import junit.framework.Test; 25 26 import org.apache.activemq.broker.StubConnection; 27 import org.apache.activemq.broker.TransportConnector; 28 import org.apache.activemq.command.ActiveMQDestination; 29 import org.apache.activemq.command.ActiveMQQueue; 30 import org.apache.activemq.command.ActiveMQTopic; 31 import org.apache.activemq.command.ConnectionInfo; 32 import org.apache.activemq.command.ConsumerInfo; 33 import org.apache.activemq.command.ProducerInfo; 34 import org.apache.activemq.command.SessionInfo; 35 import org.apache.activemq.network.NetworkTestSupport; 36 import org.apache.activemq.transport.Transport; 37 import org.apache.activemq.transport.TransportFactory; 38 import org.apache.activemq.transport.failover.FailoverTransport; 39 40 public class FailoverTransportBrokerTest extends NetworkTestSupport { 41 42 private static final org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory 43 .getLog(FailoverTransportBrokerTest.class); 44 45 public ActiveMQDestination destination; 46 public int deliveryMode; 47 48 public void initCombosForTestPublisherFailsOver() { 49 addCombinationValues( "deliveryMode", new Object []{ 50 new Integer (DeliveryMode.NON_PERSISTENT), 51 new Integer (DeliveryMode.PERSISTENT) 52 } ); 53 addCombinationValues( "destination", new Object []{ 54 new ActiveMQQueue("TEST"), 55 new ActiveMQTopic("TEST"), 56 } ); 57 } 58 public void testPublisherFailsOver() throws Exception { 59 60 StubConnection connection1 = createConnection(); 62 ConnectionInfo connectionInfo1 = createConnectionInfo(); 63 SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1); 64 ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination); 65 connection1.send(connectionInfo1); 66 connection1.send(sessionInfo1); 67 connection1.request(consumerInfo1); 68 69 StubConnection connection2 = createRemoteConnection(); 71 ConnectionInfo connectionInfo2 = createConnectionInfo(); 72 SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2); 73 ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destination); 74 connection2.send(connectionInfo2); 75 connection2.send(sessionInfo2); 76 connection2.request(consumerInfo2); 77 78 log.info("Starting the failover connection."); 80 StubConnection connection3 = createFailoverConnection(); 81 ConnectionInfo connectionInfo3 = createConnectionInfo(); 82 SessionInfo sessionInfo3 = createSessionInfo(connectionInfo3); 83 ProducerInfo producerInfo3 = createProducerInfo(sessionInfo3); 84 connection3.send(connectionInfo3); 85 connection3.send(sessionInfo3); 86 connection3.send(producerInfo3); 87 88 connection3.request(createMessage(producerInfo3, destination, deliveryMode)); 90 91 FailoverTransport ft = (FailoverTransport) connection3.getTransport().narrow(FailoverTransport.class); 93 94 StubConnection connectionA; 96 StubConnection connectionB; 97 TransportConnector serverA; 98 if( connector.getServer().getConnectURI().equals(ft.getConnectedTransportURI() ) ) { 99 connectionA=connection1; 100 connectionB=connection2; 101 serverA = connector; 102 } else { 103 connectionA=connection2; 104 connectionB=connection1; 105 serverA = remoteConnector; 106 } 107 108 assertNotNull(receiveMessage(connectionA)); 109 assertNoMessagesLeft(connectionB); 110 111 log.info("Disconnecting the active connection"); 113 serverA.stop(); 114 115 connection3.request(createMessage(producerInfo3, destination, deliveryMode)); 116 117 assertNotNull(receiveMessage(connectionB)); 118 assertNoMessagesLeft(connectionA); 119 120 } 121 122 protected String getLocalURI() { 123 return "tcp://localhost:0?wireFormat.tcpNoDelayEnabled=true"; 124 } 125 126 protected String getRemoteURI() { 127 return "tcp://localhost:0?wireFormat.tcpNoDelayEnabled=true"; 128 } 129 130 protected StubConnection createFailoverConnection() throws Exception { 131 URI failoverURI = new URI ("failover://"+connector.getServer().getConnectURI()+","+remoteConnector.getServer().getConnectURI()+""); 132 Transport transport = TransportFactory.connect(failoverURI); 133 StubConnection connection = new StubConnection(transport); 134 connections.add(connection); 135 return connection; 136 } 137 138 139 public static Test suite() { 140 return suite(FailoverTransportBrokerTest.class); 141 } 142 143 public static void main(String [] args) { 144 junit.textui.TestRunner.run(suite()); 145 } 146 147 } 148 | Popular Tags |