KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > JmsMultipleBrokersTestSupport


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 package org.apache.activemq;
19
20 import javax.jms.*;
21
22 import org.apache.activemq.network.DiscoveryNetworkConnector;
23 import org.apache.activemq.network.NetworkConnector;
24 import org.apache.activemq.util.MessageIdList;
25 import org.apache.activemq.util.IdGenerator;
26 import org.apache.activemq.command.ActiveMQDestination;
27 import org.apache.activemq.command.ActiveMQTopic;
28 import org.apache.activemq.command.ActiveMQQueue;
29 import org.apache.activemq.xbean.BrokerFactoryBean;
30 import org.apache.activemq.broker.BrokerService;
31 import org.apache.activemq.broker.BrokerFactory;
32 import org.apache.activemq.broker.TransportConnector;
33 import org.apache.activemq.CombinationTestSupport;
34 import org.apache.activemq.ActiveMQConnectionFactory;
35 import org.apache.activemq.ConnectionClosedException;
36 import org.springframework.core.io.Resource;
37
38 import java.util.List JavaDoc;
39 import java.util.Map JavaDoc;
40 import java.util.HashMap JavaDoc;
41 import java.util.ArrayList JavaDoc;
42 import java.util.Iterator JavaDoc;
43 import java.util.Collections JavaDoc;
44 import java.util.Arrays JavaDoc;
45 import java.util.Collection JavaDoc;
46 import java.util.concurrent.CountDownLatch JavaDoc;
47 import java.net.URI JavaDoc;
48
49 /**
50  * Test case support that allows the easy management and connection of several brokers.
51  *
52  * @version $Revision$
53  */

54 public class JmsMultipleBrokersTestSupport extends CombinationTestSupport {
55     public static final String JavaDoc AUTO_ASSIGN_TRANSPORT = "tcp://localhost:0";
56     public static int MAX_SETUP_TIME = 5000;
57
58     protected Map JavaDoc brokers;
59     protected Map JavaDoc destinations;
60
61     protected int messageSize = 1;
62
63     protected boolean persistentDelivery = true;
64     protected boolean verbose = false;
65
66     protected NetworkConnector bridgeBrokers(String JavaDoc localBrokerName, String JavaDoc remoteBrokerName) throws Exception JavaDoc {
67        return bridgeBrokers(localBrokerName,remoteBrokerName,false,1);
68     }
69     
70     
71     protected void bridgeBrokers(String JavaDoc localBrokerName, String JavaDoc remoteBrokerName,boolean dynamicOnly) throws Exception JavaDoc {
72         BrokerService localBroker = ((BrokerItem)brokers.get(localBrokerName)).broker;
73         BrokerService remoteBroker = ((BrokerItem)brokers.get(remoteBrokerName)).broker;
74
75         bridgeBrokers(localBroker, remoteBroker,dynamicOnly,1);
76     }
77     
78     protected NetworkConnector bridgeBrokers(String JavaDoc localBrokerName, String JavaDoc remoteBrokerName,boolean dynamicOnly, int networkTTL) throws Exception JavaDoc {
79         BrokerService localBroker = ((BrokerItem)brokers.get(localBrokerName)).broker;
80         BrokerService remoteBroker = ((BrokerItem)brokers.get(remoteBrokerName)).broker;
81
82         return bridgeBrokers(localBroker, remoteBroker,dynamicOnly,networkTTL);
83     }
84     
85    
86
87     // Overwrite this method to specify how you want to bridge the two brokers
88
// By default, bridge them using add network connector of the local broker and the first connector of the remote broker
89
protected NetworkConnector bridgeBrokers(BrokerService localBroker, BrokerService remoteBroker,boolean dynamicOnly, int networkTTL) throws Exception JavaDoc {
90         List transportConnectors = remoteBroker.getTransportConnectors();
91         URI JavaDoc remoteURI;
92         if (!transportConnectors.isEmpty()) {
93             remoteURI = ((TransportConnector)transportConnectors.get(0)).getConnectUri();
94             NetworkConnector connector=new DiscoveryNetworkConnector(new URI JavaDoc("static:" + remoteURI));
95             connector.setDynamicOnly(dynamicOnly);
96             connector.setNetworkTTL(networkTTL);
97             localBroker.addNetworkConnector(connector);
98             MAX_SETUP_TIME = 2000;
99             return connector;
100         } else {
101             throw new Exception JavaDoc("Remote broker has no registered connectors.");
102         }
103
104     }
105
106     // This will interconnect all brokes using multicast
107
protected void bridgeAllBrokers() throws Exception JavaDoc {
108         bridgeAllBrokers("default");
109     }
110
111     protected void bridgeAllBrokers(String JavaDoc groupName) throws Exception JavaDoc {
112         Collection JavaDoc brokerList = brokers.values();
113         for (Iterator JavaDoc i=brokerList.iterator(); i.hasNext();) {
114             BrokerService broker = ((BrokerItem)i.next()).broker;
115             List transportConnectors = broker.getTransportConnectors();
116
117             if (transportConnectors.isEmpty()) {
118                 broker.addConnector(new URI JavaDoc(AUTO_ASSIGN_TRANSPORT));
119                 transportConnectors = broker.getTransportConnectors();
120             }
121
122             TransportConnector transport = (TransportConnector)transportConnectors.get(0);
123             transport.setDiscoveryUri(new URI JavaDoc("multicast://" + groupName));
124             broker.addNetworkConnector("multicast://" + groupName);
125         }
126
127         // Multicasting may take longer to setup
128
MAX_SETUP_TIME = 8000;
129     }
130
131     protected void startAllBrokers() throws Exception JavaDoc {
132         Collection JavaDoc brokerList = brokers.values();
133         for (Iterator JavaDoc i=brokerList.iterator(); i.hasNext();) {
134             BrokerService broker = ((BrokerItem)i.next()).broker;
135             broker.start();
136         }
137
138         Thread.sleep(MAX_SETUP_TIME);
139     }
140
141     protected BrokerService createBroker(String JavaDoc brokerName) throws Exception JavaDoc {
142         BrokerService broker = new BrokerService();
143         broker.setBrokerName(brokerName);
144         brokers.put(brokerName, new BrokerItem(broker));
145
146         return broker;
147     }
148
149     protected BrokerService createBroker(URI JavaDoc brokerUri) throws Exception JavaDoc {
150         BrokerService broker = BrokerFactory.createBroker(brokerUri);
151         brokers.put(broker.getBrokerName(), new BrokerItem(broker));
152
153         return broker;
154     }
155
156     protected BrokerService createBroker(Resource configFile) throws Exception JavaDoc {
157         BrokerFactoryBean brokerFactory = new BrokerFactoryBean(configFile);
158         brokerFactory.afterPropertiesSet();
159
160         BrokerService broker = brokerFactory.getBroker();
161         brokers.put(broker.getBrokerName(), new BrokerItem(broker));
162
163         return broker;
164     }
165
166     protected ConnectionFactory getConnectionFactory(String JavaDoc brokerName) throws Exception JavaDoc {
167         BrokerItem brokerItem = (BrokerItem)brokers.get(brokerName);
168         if (brokerItem != null) {
169             return brokerItem.factory;
170         }
171         return null;
172     }
173
174     protected Connection createConnection(String JavaDoc brokerName) throws Exception JavaDoc {
175         BrokerItem brokerItem = (BrokerItem)brokers.get(brokerName);
176         if (brokerItem != null) {
177             return brokerItem.createConnection();
178         }
179         return null;
180     }
181
182     protected MessageConsumer createConsumer(String JavaDoc brokerName, Destination dest) throws Exception JavaDoc {
183         return createConsumer(brokerName, dest, null);
184     }
185     
186     protected MessageConsumer createConsumer(String JavaDoc brokerName, Destination dest, CountDownLatch JavaDoc latch) throws Exception JavaDoc {
187         BrokerItem brokerItem = (BrokerItem)brokers.get(brokerName);
188         if (brokerItem != null) {
189             return brokerItem.createConsumer(dest, latch);
190         }
191         return null;
192     }
193
194     protected MessageConsumer createDurableSubscriber(String JavaDoc brokerName, Topic dest, String JavaDoc name) throws Exception JavaDoc {
195         BrokerItem brokerItem = (BrokerItem)brokers.get(brokerName);
196         if (brokerItem != null) {
197             return brokerItem.createDurableSubscriber(dest, name);
198         }
199         return null;
200     }
201
202     protected MessageIdList getBrokerMessages(String JavaDoc brokerName) {
203         BrokerItem brokerItem = (BrokerItem)brokers.get(brokerName);
204         if (brokerItem != null) {
205             return brokerItem.getAllMessages();
206         }
207         return null;
208     }
209
210     protected MessageIdList getConsumerMessages(String JavaDoc brokerName, MessageConsumer consumer) {
211         BrokerItem brokerItem = (BrokerItem)brokers.get(brokerName);
212         if (brokerItem != null) {
213             return brokerItem.getConsumerMessages(consumer);
214         }
215         return null;
216     }
217
218     protected void sendMessages(String JavaDoc brokerName, Destination destination, int count) throws Exception JavaDoc {
219         BrokerItem brokerItem = (BrokerItem)brokers.get(brokerName);
220
221         Connection conn = brokerItem.createConnection();
222         conn.start();
223         Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
224
225         MessageProducer producer = brokerItem.createProducer(destination, sess);
226         producer.setDeliveryMode(persistentDelivery ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
227
228         for (int i = 0; i < count; i++) {
229             TextMessage msg = createTextMessage(sess, conn.getClientID() + ": Message-" + i);
230             producer.send(msg);
231         }
232
233         producer.close();
234         sess.close();
235         conn.close();
236         brokerItem.connections.remove(conn);
237     }
238
239     protected TextMessage createTextMessage(Session session, String JavaDoc initText) throws Exception JavaDoc {
240         TextMessage msg = session.createTextMessage();
241
242         // Pad message text
243
if (initText.length() < messageSize) {
244             char[] data = new char[messageSize - initText.length()];
245             Arrays.fill(data, '*');
246             String JavaDoc str = new String JavaDoc(data);
247             msg.setText(initText + str);
248
249         // Do not pad message text
250
} else {
251             msg.setText(initText);
252         }
253
254         return msg;
255     }
256
257     protected ActiveMQDestination createDestination(String JavaDoc name, boolean topic) throws JMSException {
258         Destination dest;
259         if (topic) {
260             dest = new ActiveMQTopic(name);
261             destinations.put(name, dest);
262             return (ActiveMQDestination)dest;
263         } else {
264             dest = new ActiveMQQueue(name);
265             destinations.put(name, dest);
266             return (ActiveMQDestination)dest;
267         }
268     }
269
270     protected void setUp() throws Exception JavaDoc {
271         super.setUp();
272         brokers = new HashMap JavaDoc();
273         destinations = new HashMap JavaDoc();
274     }
275
276     protected void tearDown() throws Exception JavaDoc {
277         destroyAllBrokers();
278         super.tearDown();
279     }
280
281     protected void destroyBroker(String JavaDoc brokerName) throws Exception JavaDoc {
282         BrokerItem brokerItem = (BrokerItem)brokers.remove(brokerName);
283
284         if (brokerItem != null) {
285             brokerItem.destroy();
286         }
287     }
288
289     protected void destroyAllBrokers() throws Exception JavaDoc {
290         for (Iterator JavaDoc i=brokers.values().iterator(); i.hasNext();) {
291             BrokerItem brokerItem = (BrokerItem)i.next();
292             brokerItem.destroy();
293         }
294         brokers.clear();
295     }
296
297
298     // Class to group broker components together
299
public class BrokerItem {
300         public BrokerService broker;
301         public ActiveMQConnectionFactory factory;
302         public List connections;
303         public Map JavaDoc consumers;
304         public MessageIdList allMessages = new MessageIdList();
305
306         private IdGenerator id;
307
308         public boolean persistent = false;
309
310         public BrokerItem(BrokerService broker) throws Exception JavaDoc {
311             this.broker = broker;
312
313             factory = new ActiveMQConnectionFactory(broker.getVmConnectorURI());
314             consumers = Collections.synchronizedMap(new HashMap JavaDoc());
315             connections = Collections.synchronizedList(new ArrayList JavaDoc());
316             allMessages.setVerbose(verbose);
317             id = new IdGenerator(broker.getBrokerName() + ":");
318         }
319
320         public Connection createConnection() throws Exception JavaDoc {
321             Connection conn = factory.createConnection();
322             conn.setClientID(id.generateId());
323
324             connections.add(conn);
325             return conn;
326         }
327
328         public MessageConsumer createConsumer(Destination dest) throws Exception JavaDoc {
329             return createConsumer(dest, null);
330         }
331         
332         public MessageConsumer createConsumer(Destination dest, CountDownLatch JavaDoc latch) throws Exception JavaDoc {
333             Connection c = createConnection();
334             c.start();
335             Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
336             return createConsumerWithSession(dest, s, latch);
337         }
338
339         public MessageConsumer createConsumerWithSession(Destination dest, Session sess) throws Exception JavaDoc {
340             return createConsumerWithSession(dest, sess, null);
341         }
342         public MessageConsumer createConsumerWithSession(Destination dest, Session sess, CountDownLatch JavaDoc latch) throws Exception JavaDoc {
343             MessageConsumer client = sess.createConsumer(dest);
344             MessageIdList messageIdList = new MessageIdList();
345             messageIdList.setCountDownLatch(latch);
346             messageIdList.setParent(allMessages);
347             client.setMessageListener(messageIdList);
348             consumers.put(client, messageIdList);
349             return client;
350         }
351
352         public MessageConsumer createDurableSubscriber(Topic dest, String JavaDoc name) throws Exception JavaDoc {
353             Connection c = createConnection();
354             c.start();
355             Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
356             return createDurableSubscriber(dest, s, name);
357         }
358
359         public MessageConsumer createDurableSubscriber(Topic dest, Session sess, String JavaDoc name) throws Exception JavaDoc {
360             MessageConsumer client = sess.createDurableSubscriber((Topic)dest, name);
361             MessageIdList messageIdList = new MessageIdList();
362             messageIdList.setParent(allMessages);
363             client.setMessageListener(messageIdList);
364             consumers.put(client, messageIdList);
365
366             return client;
367         }
368
369         public MessageIdList getAllMessages() {
370             return allMessages;
371         }
372
373         public MessageIdList getConsumerMessages(MessageConsumer consumer) {
374             return (MessageIdList)consumers.get(consumer);
375         }
376
377         public MessageProducer createProducer(Destination dest) throws Exception JavaDoc {
378             Connection c = createConnection();
379             c.start();
380             Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
381             return createProducer(dest, s);
382         }
383
384         public MessageProducer createProducer(Destination dest, Session sess) throws Exception JavaDoc {
385             MessageProducer client = sess.createProducer(dest);
386             client.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
387             return client;
388         }
389
390         public void destroy() throws Exception JavaDoc {
391             while (!connections.isEmpty()) {
392                 Connection c = (Connection)connections.remove(0);
393                 try {
394                     c.close();
395                 } catch (ConnectionClosedException e) {
396                 }
397             }
398
399             broker.stop();
400             consumers.clear();
401
402             broker = null;
403             connections = null;
404             consumers = null;
405             factory = null;
406         }
407     }
408
409 }
410
Popular Tags