KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > usecases > TwoBrokerMulticastQueueTest


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 package org.apache.activemq.usecases;
19
20 import org.apache.activemq.ActiveMQConnectionFactory;
21 import org.apache.activemq.CombinationTestSupport;
22 import org.apache.activemq.util.MessageIdList;
23 import org.apache.activemq.command.ActiveMQQueue;
24 import org.apache.activemq.broker.BrokerService;
25 import org.apache.activemq.xbean.XBeanBrokerFactory;
26
27 import java.net.URI JavaDoc;
28 import java.util.Arrays JavaDoc;
29
30 import junit.framework.Test;
31
32 import javax.jms.Destination JavaDoc;
33 import javax.jms.ConnectionFactory JavaDoc;
34 import javax.jms.Connection JavaDoc;
35 import javax.jms.Session JavaDoc;
36 import javax.jms.JMSException JavaDoc;
37 import javax.jms.MessageConsumer JavaDoc;
38 import javax.jms.MessageProducer JavaDoc;
39 import javax.jms.TextMessage JavaDoc;
40
41 public class TwoBrokerMulticastQueueTest extends CombinationTestSupport {
42
43     public static Test suite() {
44         return suite(TwoBrokerMulticastQueueTest.class);
45     }
46
47     public static void main(String JavaDoc[] args) {
48         junit.textui.TestRunner.run(suite());
49     }
50
51     public static final int MESSAGE_COUNT = 100;
52     public static final int BROKER_COUNT = 2;
53     public static final int CONSUMER_COUNT = 20;
54
55     private BrokerService[] brokers;
56     public String JavaDoc sendUri, recvUri;
57
58     public void setUp() throws Exception JavaDoc {
59         super.setAutoFail(true);
60         super.setUp();
61     }
62
63     public void tearDown() throws Exception JavaDoc {
64         if (brokers != null) {
65             for (int i=0; i<BROKER_COUNT; i++) {
66                 if (brokers[i] != null) {
67                     brokers[i].stop();
68                 }
69             }
70             super.tearDown();
71         }
72     }
73
74     private void doSendReceiveTest() throws Exception JavaDoc {
75         Destination JavaDoc dest = new ActiveMQQueue("TEST.FOO");
76
77         ConnectionFactory sendFactory = createConnectionFactory(sendUri);
78
79         Connection JavaDoc conn = createConnection(sendFactory);
80         sendMessages(conn, dest, MESSAGE_COUNT);
81
82         Thread.sleep(500);
83
84         ConnectionFactory recvFactory = createConnectionFactory(recvUri);
85         assertEquals(MESSAGE_COUNT, receiveMessages(createConnection(recvFactory), dest, 0));
86     }
87
88     private void doMultipleConsumersConnectTest() throws Exception JavaDoc {
89         Destination JavaDoc dest = new ActiveMQQueue("TEST.FOO");
90
91         ConnectionFactory sendFactory = createConnectionFactory(sendUri);
92
93         Connection JavaDoc conn = createConnection(sendFactory);
94         sendMessages(conn, dest, MESSAGE_COUNT);
95
96         Thread.sleep(500);
97
98         ConnectionFactory recvFactory = createConnectionFactory(recvUri);
99         assertEquals(MESSAGE_COUNT, receiveMessages(createConnection(recvFactory), dest, 0));
100
101         for (int i=0; i<(CONSUMER_COUNT-1); i++) {
102             assertEquals(0, receiveMessages(createConnection(recvFactory), dest, 200));
103         }
104     }
105
106     public void initCombosForTestSendReceive() {
107         addCombinationValues("sendUri", new Object JavaDoc[] {
108             "tcp://localhost:61616", "tcp://localhost:61617"
109         });
110         addCombinationValues("recvUri", new Object JavaDoc[] {
111             "tcp://localhost:61616", "tcp://localhost:61617"
112         });
113     }
114
115     public void testSendReceive() throws Exception JavaDoc {
116         createMulticastBrokerNetwork();
117         doSendReceiveTest();
118     }
119
120     public void initCombosForTestMultipleConsumersConnect() {
121         addCombinationValues("sendUri", new Object JavaDoc[] {
122             "tcp://localhost:61616", "tcp://localhost:61617",
123         });
124         addCombinationValues("recvUri", new Object JavaDoc[] {
125             "tcp://localhost:61616", "tcp://localhost:61617"
126         });
127     }
128
129     public void testMultipleConsumersConnect() throws Exception JavaDoc {
130         createMulticastBrokerNetwork();
131         doMultipleConsumersConnectTest();
132     }
133
134     public void testSendReceiveUsingFailover() throws Exception JavaDoc {
135         sendUri = "failover:(tcp://localhost:61616,tcp://localhost:61617)";
136         recvUri = "failover:(tcp://localhost:61616,tcp://localhost:61617)";
137         createMulticastBrokerNetwork();
138         doSendReceiveTest();
139     }
140
141     public void testMultipleConsumersConnectUsingFailover() throws Exception JavaDoc {
142         sendUri = "failover:(tcp://localhost:61616,tcp://localhost:61617)";
143         recvUri = "failover:(tcp://localhost:61616,tcp://localhost:61617)";
144         createMulticastBrokerNetwork();
145         doMultipleConsumersConnectTest();
146     }
147
148     public void testSendReceiveUsingDiscovery() throws Exception JavaDoc {
149         sendUri = "discovery:multicast://default";
150         recvUri = "discovery:multicast://default";
151         createMulticastBrokerNetwork();
152         doSendReceiveTest();
153     }
154
155     public void testMultipleConsumersConnectUsingDiscovery() throws Exception JavaDoc {
156         sendUri = "discovery:multicast://default";
157         recvUri = "discovery:multicast://default";
158         createMulticastBrokerNetwork();
159         doMultipleConsumersConnectTest();
160     }
161
162     public void testSendReceiveUsingAutoAssignFailover() throws Exception JavaDoc {
163         sendUri = "failover:(discovery:multicast://default)";
164         recvUri = "failover:(discovery:multicast://default)";
165         createAutoAssignMulticastBrokerNetwork();
166         doSendReceiveTest();
167     }
168
169     public void testMultipleConsumersConnectUsingAutoAssignFailover() throws Exception JavaDoc {
170         sendUri = "failover:(discovery:multicast://default)";
171         recvUri = "failover:(discovery:multicast://default)";
172         createAutoAssignMulticastBrokerNetwork();
173         doMultipleConsumersConnectTest();
174     }
175
176     public void testSendReceiveUsingAutoAssignDiscovery() throws Exception JavaDoc {
177         sendUri = "discovery:multicast://default";
178         recvUri = "discovery:multicast://default";
179         createAutoAssignMulticastBrokerNetwork();
180         doSendReceiveTest();
181     }
182
183     public void testMultipleConsumersConnectUsingAutoAssignDiscovery() throws Exception JavaDoc {
184         sendUri = "discovery:multicast://default";
185         recvUri = "discovery:multicast://default";
186         createAutoAssignMulticastBrokerNetwork();
187         doMultipleConsumersConnectTest();
188     }
189
190     protected void createMulticastBrokerNetwork() throws Exception JavaDoc {
191
192         brokers = new BrokerService[BROKER_COUNT];
193         for (int i=0; i<BROKER_COUNT; i++) {
194             brokers[i] = createBroker("org/apache/activemq/usecases/multicast-broker-" + (i+1)+ ".xml");
195             brokers[i].start();
196         }
197
198         // Let the brokers discover each other first
199
Thread.sleep(1000);
200     }
201
202     protected void createAutoAssignMulticastBrokerNetwork() throws Exception JavaDoc {
203         brokers = new BrokerService[BROKER_COUNT];
204         for (int i=0; i<BROKER_COUNT; i++) {
205             brokers[i] = createBroker("org/apache/activemq/usecases/multicast-broker-auto.xml");
206             brokers[i].start();
207         }
208
209         // Let the brokers discover each other first
210
Thread.sleep(1000);
211     }
212
213     protected BrokerService createBroker(String JavaDoc uri) throws Exception JavaDoc {
214         return (new XBeanBrokerFactory()).createBroker(new URI JavaDoc(uri));
215     }
216
217     protected ConnectionFactory createConnectionFactory(String JavaDoc uri) {
218         return new ActiveMQConnectionFactory(uri);
219     }
220
221     protected Connection JavaDoc createConnection(ConnectionFactory factory) throws JMSException JavaDoc {
222         Connection JavaDoc conn = factory.createConnection();
223         return conn;
224     }
225
226     protected int receiveMessages(Connection JavaDoc conn, Destination JavaDoc dest, int waitTime) throws JMSException JavaDoc, InterruptedException JavaDoc {
227         conn.start();
228         MessageIdList list = new MessageIdList();
229         Session JavaDoc sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
230         MessageConsumer JavaDoc consumer = sess.createConsumer(dest);
231         consumer.setMessageListener(list);
232
233         if (waitTime > 0) {
234             Thread.sleep(waitTime);
235         } else {
236             list.waitForMessagesToArrive(MESSAGE_COUNT);
237         }
238
239         conn.close();
240
241         return list.getMessageCount();
242     }
243
244     protected void sendMessages(Connection JavaDoc conn, Destination JavaDoc dest, int count) throws JMSException JavaDoc {
245         conn.start();
246         Session JavaDoc sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
247         MessageProducer JavaDoc prod = sess.createProducer(dest);
248
249         for (int i=0; i<count; i++) {
250             prod.send(createTextMessage(sess, "Message " + i, 1024));
251         }
252
253         conn.close();
254     }
255
256     protected TextMessage JavaDoc createTextMessage(Session JavaDoc session, String JavaDoc initText, int messageSize) throws JMSException JavaDoc {
257         TextMessage JavaDoc msg = session.createTextMessage();
258
259         // Pad message text
260
if (initText.length() < messageSize) {
261             char[] data = new char[messageSize - initText.length()];
262             Arrays.fill(data, '*');
263             String JavaDoc str = new String JavaDoc(data);
264             msg.setText(initText + str);
265
266         // Do not pad message text
267
} else {
268             msg.setText(initText);
269         }
270
271         return msg;
272     }
273
274 }
275
Popular Tags