KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > usecases > AMQDeadlockTestW4Brokers


1 /**
2  *
3  * Copyright 2005-2006 The Apache Software Foundation
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */

17
18 package org.apache.activemq.usecases;
19
20 import java.net.URI JavaDoc;
21 import java.util.ArrayList JavaDoc;
22 import java.util.Random JavaDoc;
23 import java.util.concurrent.CountDownLatch JavaDoc;
24 import java.util.concurrent.ExecutorService JavaDoc;
25 import java.util.concurrent.Executors JavaDoc;
26 import java.util.concurrent.TimeUnit JavaDoc;
27 import java.util.concurrent.atomic.AtomicInteger JavaDoc;
28
29 import javax.jms.BytesMessage JavaDoc;
30 import javax.jms.ConnectionFactory JavaDoc;
31 import javax.jms.DeliveryMode JavaDoc;
32 import javax.jms.JMSException JavaDoc;
33 import javax.jms.Message JavaDoc;
34 import javax.jms.MessageListener JavaDoc;
35 import javax.jms.Session JavaDoc;
36
37 import junit.framework.Assert;
38 import junit.framework.TestCase;
39
40 import org.apache.activemq.ActiveMQConnectionFactory;
41 import org.apache.activemq.broker.BrokerService;
42 import org.apache.activemq.broker.TransportConnector;
43 import org.apache.activemq.broker.region.policy.PolicyEntry;
44 import org.apache.activemq.broker.region.policy.PolicyMap;
45 import org.apache.activemq.memory.UsageManager;
46 import org.apache.activemq.network.DiscoveryNetworkConnector;
47 import org.apache.activemq.network.NetworkConnector;
48 import org.apache.activemq.pool.PooledConnectionFactory;
49 import org.springframework.jms.core.JmsTemplate;
50 import org.springframework.jms.core.MessageCreator;
51 import org.springframework.jms.listener.DefaultMessageListenerContainer;
52
53 public class AMQDeadlockTestW4Brokers extends TestCase {
54
55     private static final String JavaDoc BROKER_URL1 = "tcp://localhost:61616";
56
57     private static final String JavaDoc BROKER_URL2 = "tcp://localhost:61617";
58
59     private static final String JavaDoc BROKER_URL3 = "tcp://localhost:61618";
60
61     private static final String JavaDoc BROKER_URL4 = "tcp://localhost:61619";
62
63     private static final String JavaDoc URL1 = "tcp://localhost:61616?wireFormat.cacheEnabled=false&wireFormat.tightEncodingEnabled=false&wireFormat.maxInactivityDuration=30000&wireFormat.tcpNoDelayEnabled=false";
64
65     private static final String JavaDoc URL2 = "tcp://localhost:61617?wireFormat.cacheEnabled=false&wireFormat.tightEncodingEnabled=false&wireFormat.maxInactivityDuration=30000&wireFormat.tcpNoDelayEnabled=false";
66
67     private static final String JavaDoc URL3 = "tcp://localhost:61618?wireFormat.cacheEnabled=false&wireFormat.tightEncodingEnabled=false&wireFormat.maxInactivityDuration=30000&wireFormat.tcpNoDelayEnabled=false";
68
69     private static final String JavaDoc URL4 = "tcp://localhost:61619?wireFormat.cacheEnabled=false&wireFormat.tightEncodingEnabled=false&wireFormat.maxInactivityDuration=30000&wireFormat.tcpNoDelayEnabled=false";
70
71     private static final String JavaDoc QUEUE1_NAME = "test.queue.1";
72
73     private static final int MAX_CONSUMERS = 5;
74
75     private static final int NUM_MESSAGE_TO_SEND = 10000;
76     private static final CountDownLatch JavaDoc latch = new CountDownLatch JavaDoc(MAX_CONSUMERS*NUM_MESSAGE_TO_SEND);
77
78     @Override JavaDoc
79     public void setUp() throws Exception JavaDoc {
80
81     }
82
83     @Override JavaDoc
84     public void tearDown() throws Exception JavaDoc {
85
86     }
87
88     public void test4BrokerWithOutLingo() throws Exception JavaDoc {
89
90         BrokerService brokerService1 = null;
91         BrokerService brokerService2 = null;
92         BrokerService brokerService3 = null;
93         BrokerService brokerService4 = null;
94         ActiveMQConnectionFactory acf1 = null;
95         ActiveMQConnectionFactory acf2 = null;
96         PooledConnectionFactory pcf1 = null;
97         PooledConnectionFactory pcf2 = null;
98         ActiveMQConnectionFactory acf3 = null;
99         ActiveMQConnectionFactory acf4 = null;
100         PooledConnectionFactory pcf3 = null;
101         PooledConnectionFactory pcf4 = null;
102         DefaultMessageListenerContainer container1 = null;
103
104         try {
105
106             //Test with and without queue limits.
107
brokerService1 = createBrokerService("broker1", BROKER_URL1,
108                     BROKER_URL2, BROKER_URL3, BROKER_URL4, 0 /* 10000000 */);
109             brokerService1.start();
110             brokerService2 = createBrokerService("broker2", BROKER_URL2,
111                     BROKER_URL1, BROKER_URL3, BROKER_URL4, 0/* 40000000 */);
112             brokerService2.start();
113             brokerService3 = createBrokerService("broker3", BROKER_URL3,
114                     BROKER_URL2, BROKER_URL1, BROKER_URL4, 0/* 10000000 */);
115             brokerService3.start();
116             brokerService4 = createBrokerService("broker4", BROKER_URL4,
117                     BROKER_URL1, BROKER_URL3, BROKER_URL2, 0/* 10000000 */);
118             brokerService4.start();
119
120             final String JavaDoc failover1 = "failover:("
121                     + URL1
122                     + ")?initialReconnectDelay=10&maxReconnectDelay=30000&useExponentialBackOff=true&backOffMultiplier=2&maxReconnectAttempts=0&randomize=false";
123             final String JavaDoc failover2 = "failover:("
124                     + URL2
125                     + ")?initialReconnectDelay=10&maxReconnectDelay=30000&useExponentialBackOff=true&backOffMultiplier=2&maxReconnectAttempts=0&randomize=false";
126
127             final String JavaDoc failover3 = "failover:("
128                     + URL3
129                     + ")?initialReconnectDelay=10&maxReconnectDelay=30000&useExponentialBackOff=true&backOffMultiplier=2&maxReconnectAttempts=0&randomize=false";
130
131             final String JavaDoc failover4 = "failover:("
132                     + URL4
133                     + ")?initialReconnectDelay=10&maxReconnectDelay=30000&useExponentialBackOff=true&backOffMultiplier=2&maxReconnectAttempts=0&randomize=false";
134
135             acf1 = createConnectionFactory(failover1);
136             acf2 = createConnectionFactory(failover2);
137             acf3 = createConnectionFactory(failover3);
138             acf4 = createConnectionFactory(failover4);
139
140             pcf1 = new PooledConnectionFactory(acf1);
141             pcf2 = new PooledConnectionFactory(acf2);
142             pcf3 = new PooledConnectionFactory(acf3);
143             pcf4 = new PooledConnectionFactory(acf4);
144
145
146             container1 = createDefaultMessageListenerContainer(acf2,
147                     new TestMessageListener1(0), QUEUE1_NAME);
148             container1.afterPropertiesSet();
149
150             final PooledProducerTask[] task = new PooledProducerTask[4];
151             task[0] = new PooledProducerTask(pcf1, QUEUE1_NAME, "producer1");
152             task[1] = new PooledProducerTask(pcf2, QUEUE1_NAME, "producer2");
153             task[2] = new PooledProducerTask(pcf3, QUEUE1_NAME, "producer3");
154             task[3] = new PooledProducerTask(pcf4, QUEUE1_NAME, "producer4");
155
156             final ExecutorService JavaDoc executor = Executors.newCachedThreadPool();
157
158             for (int i = 0; i < 4; i++) {
159                 executor.submit(task[i]);
160             }
161
162             latch.await(15,TimeUnit.SECONDS);
163             assertTrue(latch.getCount()==MAX_CONSUMERS*NUM_MESSAGE_TO_SEND);
164
165         } catch (Exception JavaDoc e) {
166             e.printStackTrace();
167         } finally {
168
169             container1.stop();
170             container1.destroy();
171             container1 = null;
172
173             brokerService1.stop();
174             brokerService1 = null;
175             brokerService2.stop();
176             brokerService2 = null;
177             brokerService3.stop();
178             brokerService3 = null;
179             brokerService4.stop();
180             brokerService4 = null;
181         }
182
183     }
184
185     private BrokerService createBrokerService(final String JavaDoc brokerName,
186             final String JavaDoc uri1, final String JavaDoc uri2, final String JavaDoc uri3,
187             final String JavaDoc uri4, final int queueLimit) throws Exception JavaDoc {
188         final BrokerService brokerService = new BrokerService();
189
190         brokerService.setBrokerName(brokerName);
191         brokerService.setPersistent(false);
192         brokerService.setUseJmx(true);
193
194         final UsageManager memoryManager = new UsageManager();
195         memoryManager.setLimit(100000000);
196         brokerService.setMemoryManager(memoryManager);
197
198         final ArrayList JavaDoc<PolicyEntry> policyEntries = new ArrayList JavaDoc<PolicyEntry>();
199
200         final PolicyEntry entry = new PolicyEntry();
201         entry.setQueue(">");
202         entry.setMemoryLimit(queueLimit);
203         policyEntries.add(entry);
204
205         final PolicyMap policyMap = new PolicyMap();
206         policyMap.setPolicyEntries(policyEntries);
207         brokerService.setDestinationPolicy(policyMap);
208
209         final TransportConnector tConnector = new TransportConnector();
210         tConnector.setUri(new URI JavaDoc(uri1));
211         tConnector.setBrokerName(brokerName);
212         tConnector.setName(brokerName + ".transportConnector");
213         brokerService.addConnector(tConnector);
214
215         if (uri2 != null) {
216             final NetworkConnector nc = new DiscoveryNetworkConnector(new URI JavaDoc(
217                     "static:" + uri2 + "," + uri3 + "," + uri4));
218             nc.setBridgeTempDestinations(true);
219             nc.setBrokerName(brokerName);
220                         
221             // When using queue limits set this to 1
222
nc.setPrefetchSize(1000);
223             nc.setNetworkTTL(1);
224             brokerService.addNetworkConnector(nc);
225         }
226
227         return brokerService;
228
229     }
230
231     public DefaultMessageListenerContainer createDefaultMessageListenerContainer(
232             final ConnectionFactory JavaDoc acf, final MessageListener JavaDoc listener,
233             final String JavaDoc queue) {
234         final DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
235         container.setConnectionFactory(acf);
236         container.setDestinationName(queue);
237         container.setMessageListener(listener);
238         container.setSessionTransacted(false);
239         container.setSessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE);
240         container.setConcurrentConsumers(MAX_CONSUMERS);
241         return container;
242     }
243
244     public ActiveMQConnectionFactory createConnectionFactory(final String JavaDoc url) {
245         final ActiveMQConnectionFactory acf = new ActiveMQConnectionFactory(url);
246         acf.setCopyMessageOnSend(false);
247         acf.setUseAsyncSend(false);
248         acf.setDispatchAsync(true);
249         acf.setUseCompression(false);
250         acf.setOptimizeAcknowledge(false);
251         acf.setOptimizedMessageDispatch(true);
252         acf.setUseAsyncSend(false);
253         
254         return acf;
255     }
256
257     private class TestMessageListener1 implements MessageListener JavaDoc {
258
259         private final long waitTime;
260
261         final AtomicInteger JavaDoc count = new AtomicInteger JavaDoc(0);
262         public TestMessageListener1(long waitTime) {
263             this.waitTime = waitTime;
264
265         }
266
267         public void onMessage(Message JavaDoc msg) {
268
269             try {
270                 /*System.out.println("Listener1 Consumed message "
271                         + msg.getIntProperty("count") + " from "
272                         + msg.getStringProperty("producerName"));*/

273                 int value = count.incrementAndGet();
274                 if (value%1000==0){
275                 System.out.println("Consumed message: " + value);
276                 }
277              
278                 Thread.sleep(waitTime);
279                 latch.countDown();
280             /*} catch (JMSException e) {
281                 e.printStackTrace();*/

282             } catch (InterruptedException JavaDoc e) {
283                 e.printStackTrace();
284             }
285
286         }
287     }
288
289     private class PooledProducerTask implements Runnable JavaDoc {
290
291         private final String JavaDoc queueName;
292
293         private final PooledConnectionFactory pcf;
294
295         private final String JavaDoc producerName;
296
297         public PooledProducerTask(final PooledConnectionFactory pcf,
298                 final String JavaDoc queueName, final String JavaDoc producerName) {
299             this.pcf = pcf;
300             this.queueName = queueName;
301             this.producerName = producerName;
302         }
303
304         public void run() {
305
306             try {
307
308                 final JmsTemplate jmsTemplate = new JmsTemplate(pcf);
309                 jmsTemplate.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
310                 jmsTemplate.setExplicitQosEnabled(true);
311                 jmsTemplate.setMessageIdEnabled(false);
312                 jmsTemplate.setMessageTimestampEnabled(false);
313                 jmsTemplate.afterPropertiesSet();
314
315                 final byte[] bytes = new byte[2048];
316                 final Random JavaDoc r = new Random JavaDoc();
317                 r.nextBytes(bytes);
318
319                 for (int i = 0; i < NUM_MESSAGE_TO_SEND; i++) {
320                     final int count = i;
321                     jmsTemplate.send(queueName, new MessageCreator() {
322
323                         public Message JavaDoc createMessage(Session JavaDoc session)
324                                 throws JMSException JavaDoc {
325
326                             final BytesMessage JavaDoc message = session
327                                     .createBytesMessage();
328
329                             message.writeBytes(bytes);
330                             message.setIntProperty("count", count);
331                             message.setStringProperty("producerName",
332                                     producerName);
333                             return message;
334                         }
335                     });
336
337                 // System.out.println("PooledProducer " + producerName + " sent message: " + count);
338

339                     // Thread.sleep(1000);
340
}
341
342             } catch (final Throwable JavaDoc e) {
343                 System.err.println("Producer 1 is exiting.");
344                 e.printStackTrace();
345             }
346         }
347     }
348
349 }
350
Popular Tags