KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > transport > failover > FailoverTransportBrokerTest


/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

18 package org.apache.activemq.transport.failover;
19
20 import java.net.URI JavaDoc;
21
22 import javax.jms.DeliveryMode JavaDoc;
23
24 import junit.framework.Test;
25
26 import org.apache.activemq.broker.StubConnection;
27 import org.apache.activemq.broker.TransportConnector;
28 import org.apache.activemq.command.ActiveMQDestination;
29 import org.apache.activemq.command.ActiveMQQueue;
30 import org.apache.activemq.command.ActiveMQTopic;
31 import org.apache.activemq.command.ConnectionInfo;
32 import org.apache.activemq.command.ConsumerInfo;
33 import org.apache.activemq.command.ProducerInfo;
34 import org.apache.activemq.command.SessionInfo;
35 import org.apache.activemq.network.NetworkTestSupport;
36 import org.apache.activemq.transport.Transport;
37 import org.apache.activemq.transport.TransportFactory;
38 import org.apache.activemq.transport.failover.FailoverTransport;
39
40 public class FailoverTransportBrokerTest extends NetworkTestSupport {
41     
42     private static final org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory
43             .getLog(FailoverTransportBrokerTest.class);
44     
45     public ActiveMQDestination destination;
46     public int deliveryMode;
47
48     public void initCombosForTestPublisherFailsOver() {
49         addCombinationValues( "deliveryMode", new Object JavaDoc[]{
50                 new Integer JavaDoc(DeliveryMode.NON_PERSISTENT),
51                 new Integer JavaDoc(DeliveryMode.PERSISTENT)
52                 } );
53         addCombinationValues( "destination", new Object JavaDoc[]{
54                 new ActiveMQQueue("TEST"),
55                 new ActiveMQTopic("TEST"),
56                 } );
57     }
58     public void testPublisherFailsOver() throws Exception JavaDoc {
59         
60         // Start a normal consumer on the local broker
61
StubConnection connection1 = createConnection();
62         ConnectionInfo connectionInfo1 = createConnectionInfo();
63         SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
64         ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
65         connection1.send(connectionInfo1);
66         connection1.send(sessionInfo1);
67         connection1.request(consumerInfo1);
68
69         // Start a normal consumer on a remote broker
70
StubConnection connection2 = createRemoteConnection();
71         ConnectionInfo connectionInfo2 = createConnectionInfo();
72         SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
73         ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destination);
74         connection2.send(connectionInfo2);
75         connection2.send(sessionInfo2);
76         connection2.request(consumerInfo2);
77         
78         // Start a failover publisher.
79
log.info("Starting the failover connection.");
80         StubConnection connection3 = createFailoverConnection();
81         ConnectionInfo connectionInfo3 = createConnectionInfo();
82         SessionInfo sessionInfo3 = createSessionInfo(connectionInfo3);
83         ProducerInfo producerInfo3 = createProducerInfo(sessionInfo3);
84         connection3.send(connectionInfo3);
85         connection3.send(sessionInfo3);
86         connection3.send(producerInfo3);
87
88         // Send the message using the fail over publisher.
89
connection3.request(createMessage(producerInfo3, destination, deliveryMode));
90
91         // The message will be sent to one of the brokers.
92
FailoverTransport ft = (FailoverTransport) connection3.getTransport().narrow(FailoverTransport.class);
93
94         // See which broker we were connected to.
95
StubConnection connectionA;
96         StubConnection connectionB;
97         TransportConnector serverA;
98         if( connector.getServer().getConnectURI().equals(ft.getConnectedTransportURI() ) ) {
99             connectionA=connection1;
100             connectionB=connection2;
101             serverA = connector;
102         } else {
103             connectionA=connection2;
104             connectionB=connection1;
105             serverA = remoteConnector;
106         }
107         
108         assertNotNull(receiveMessage(connectionA));
109         assertNoMessagesLeft(connectionB);
110         
111         // Dispose the server so that it fails over to the other server.
112
log.info("Disconnecting the active connection");
113         serverA.stop();
114         
115         connection3.request(createMessage(producerInfo3, destination, deliveryMode));
116         
117         assertNotNull(receiveMessage(connectionB));
118         assertNoMessagesLeft(connectionA);
119             
120     }
121     
122     protected String JavaDoc getLocalURI() {
123         return "tcp://localhost:0?wireFormat.tcpNoDelayEnabled=true";
124     }
125     
126     protected String JavaDoc getRemoteURI() {
127         return "tcp://localhost:0?wireFormat.tcpNoDelayEnabled=true";
128     }
129     
130     protected StubConnection createFailoverConnection() throws Exception JavaDoc {
131         URI JavaDoc failoverURI = new URI JavaDoc("failover://"+connector.getServer().getConnectURI()+","+remoteConnector.getServer().getConnectURI()+"");
132         Transport transport = TransportFactory.connect(failoverURI);
133         StubConnection connection = new StubConnection(transport);
134         connections.add(connection);
135         return connection;
136     }
137
138     
139     public static Test suite() {
140         return suite(FailoverTransportBrokerTest.class);
141     }
142     
143     public static void main(String JavaDoc[] args) {
144         junit.textui.TestRunner.run(suite());
145     }
146
147 }
148
Popular Tags