KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > transport > fanout > FanoutTransportBrokerTest


/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.activemq.transport.fanout;

import java.io.IOException;
import java.net.URI;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import javax.jms.DeliveryMode;

import junit.framework.Test;

import org.apache.activemq.broker.StubConnection;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.network.NetworkTestSupport;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.TransportFilter;
import org.apache.activemq.transport.mock.MockTransport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

45
46 public class FanoutTransportBrokerTest extends NetworkTestSupport {
47
48     private static final Log log = LogFactory.getLog(FanoutTransportBrokerTest.class);
49
50     public ActiveMQDestination destination;
51     public int deliveryMode;
52     
53     private String JavaDoc remoteURI = "tcp://localhost:0?wireFormat.tcpNoDelayEnabled=true";
54
55     public static Test suite() {
56         return suite(FanoutTransportBrokerTest.class);
57     }
58
59     public static void main(String JavaDoc[] args) {
60         junit.textui.TestRunner.run(suite());
61     }
62
63     public void initCombosForTestPublisherFansout() {
64         addCombinationValues("deliveryMode", new Object JavaDoc[] { new Integer JavaDoc(DeliveryMode.NON_PERSISTENT),
65                 new Integer JavaDoc(DeliveryMode.PERSISTENT) });
66         addCombinationValues("destination", new Object JavaDoc[] { new ActiveMQQueue("TEST"), new ActiveMQTopic("TEST"), });
67     }
68
69     public void xtestPublisherFansout() throws Exception JavaDoc {
70
71         // Start a normal consumer on the local broker
72
StubConnection connection1 = createConnection();
73         ConnectionInfo connectionInfo1 = createConnectionInfo();
74         SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
75         ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
76         connection1.send(connectionInfo1);
77         connection1.send(sessionInfo1);
78         connection1.request(consumerInfo1);
79
80         // Start a normal consumer on a remote broker
81
StubConnection connection2 = createRemoteConnection();
82         ConnectionInfo connectionInfo2 = createConnectionInfo();
83         SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
84         ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destination);
85         connection2.send(connectionInfo2);
86         connection2.send(sessionInfo2);
87         connection2.request(consumerInfo2);
88
89         // Start a fanout publisher.
90
log.info("Starting the fanout connection.");
91         StubConnection connection3 = createFanoutConnection();
92         ConnectionInfo connectionInfo3 = createConnectionInfo();
93         SessionInfo sessionInfo3 = createSessionInfo(connectionInfo3);
94         ProducerInfo producerInfo3 = createProducerInfo(sessionInfo3);
95         connection3.send(connectionInfo3);
96         connection3.send(sessionInfo3);
97         connection3.send(producerInfo3);
98
99         // Send the message using the fail over publisher.
100
connection3.request(createMessage(producerInfo3, destination, deliveryMode));
101
102         assertNotNull(receiveMessage(connection1));
103         assertNoMessagesLeft(connection1);
104
105         assertNotNull(receiveMessage(connection2));
106         assertNoMessagesLeft(connection2);
107
108     }
109
110     
111     public void initCombosForTestPublisherWaitsForServerToBeUp() {
112         addCombinationValues("deliveryMode", new Object JavaDoc[] { new Integer JavaDoc(DeliveryMode.NON_PERSISTENT),
113                 new Integer JavaDoc(DeliveryMode.PERSISTENT) });
114         addCombinationValues("destination", new Object JavaDoc[] { new ActiveMQQueue("TEST"), new ActiveMQTopic("TEST"), });
115     }
116     public void testPublisherWaitsForServerToBeUp() throws Exception JavaDoc {
117
118         // Start a normal consumer on the local broker
119
StubConnection connection1 = createConnection();
120         ConnectionInfo connectionInfo1 = createConnectionInfo();
121         SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
122         ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
123         connection1.send(connectionInfo1);
124         connection1.send(sessionInfo1);
125         connection1.request(consumerInfo1);
126
127         // Start a normal consumer on a remote broker
128
StubConnection connection2 = createRemoteConnection();
129         ConnectionInfo connectionInfo2 = createConnectionInfo();
130         SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
131         ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destination);
132         connection2.send(connectionInfo2);
133         connection2.send(sessionInfo2);
134         connection2.request(consumerInfo2);
135
136         // Start a fanout publisher.
137
log.info("Starting the fanout connection.");
138         final StubConnection connection3 = createFanoutConnection();
139         ConnectionInfo connectionInfo3 = createConnectionInfo();
140         SessionInfo sessionInfo3 = createSessionInfo(connectionInfo3);
141         final ProducerInfo producerInfo3 = createProducerInfo(sessionInfo3);
142         connection3.send(connectionInfo3);
143         connection3.send(sessionInfo3);
144         connection3.send(producerInfo3);
145
146         // Send the message using the fail over publisher.
147
connection3.request(createMessage(producerInfo3, destination, deliveryMode));
148
149         assertNotNull(receiveMessage(connection1));
150         assertNoMessagesLeft(connection1);
151
152         assertNotNull(receiveMessage(connection2));
153         assertNoMessagesLeft(connection2);
154         
155         final CountDownLatch JavaDoc publishDone = new CountDownLatch JavaDoc(1);
156         
157         // The MockTransport is on the remote connection.
158
// Slip in a new transport filter after the MockTransport
159
MockTransport mt = (MockTransport) connection3.getTransport().narrow(MockTransport.class);
160         mt.install(new TransportFilter(mt.getNext()) {
161             public void oneway(Object JavaDoc command) throws IOException JavaDoc {
162                 log.info("Dropping: "+command);
163                 // just eat it! to simulate a recent failure.
164
}
165         });
166                 
167         // Send a message (async) as this will block
168
new Thread JavaDoc() {
169             public void run() {
170                 // Send the message using the fail over publisher.
171
try {
172                     connection3.request(createMessage(producerInfo3, destination, deliveryMode));
173                 } catch (Throwable JavaDoc e) {
174                     e.printStackTrace();
175                 }
176                 publishDone.countDown();
177             }
178         }.start();
179         
180         // Assert that we block:
181
assertFalse( publishDone.await(3, TimeUnit.SECONDS) );
182         
183         // Restart the remote server. State should be re-played and the publish should continue.
184
remoteURI = remoteConnector.getServer().getConnectURI().toString();
185         restartRemoteBroker();
186
187         // This should reconnect, and resend
188
assertTrue( publishDone.await(10, TimeUnit.SECONDS) );
189
190     }
191
192     protected String JavaDoc getLocalURI() {
193         return "tcp://localhost:0?wireFormat.tcpNoDelayEnabled=true";
194     }
195
196     protected String JavaDoc getRemoteURI() {
197         return remoteURI;
198     }
199
200     protected StubConnection createFanoutConnection() throws Exception JavaDoc {
201         URI JavaDoc fanoutURI = new URI JavaDoc("fanout://static://(" + connector.getServer().getConnectURI() + ","
202                 + "mock://"+remoteConnector.getServer().getConnectURI() + ")");
203         Transport transport = TransportFactory.connect(fanoutURI);
204         StubConnection connection = new StubConnection(transport);
205         connections.add(connection);
206         return connection;
207     }
208
209 }
210
Popular Tags