KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > AMQDeadlockTest3


1 package org.apache.activemq;
2
3 import java.net.URI JavaDoc;
4 import java.util.ArrayList JavaDoc;
5 import java.util.Random JavaDoc;
6 import java.util.concurrent.CountDownLatch JavaDoc;
7 import java.util.concurrent.ExecutorService JavaDoc;
8 import java.util.concurrent.Executors JavaDoc;
9 import java.util.concurrent.TimeUnit JavaDoc;
10 import java.util.concurrent.atomic.AtomicInteger JavaDoc;
11
12 import javax.jms.BytesMessage JavaDoc;
13 import javax.jms.ConnectionFactory JavaDoc;
14 import javax.jms.DeliveryMode JavaDoc;
15 import javax.jms.JMSException JavaDoc;
16 import javax.jms.Message JavaDoc;
17 import javax.jms.MessageListener JavaDoc;
18 import javax.jms.Session JavaDoc;
19
20 import junit.framework.Assert;
21 import junit.framework.TestCase;
22
23 import org.apache.activemq.broker.BrokerService;
24 import org.apache.activemq.broker.TransportConnector;
25 import org.apache.activemq.broker.region.policy.PolicyEntry;
26 import org.apache.activemq.broker.region.policy.PolicyMap;
27 import org.apache.activemq.memory.UsageManager;
28 import org.apache.activemq.network.DiscoveryNetworkConnector;
29 import org.apache.activemq.network.NetworkConnector;
30 import org.apache.activemq.pool.PooledConnectionFactory;
31 import org.springframework.jms.core.JmsTemplate;
32 import org.springframework.jms.core.MessageCreator;
33 import org.springframework.jms.listener.DefaultMessageListenerContainer;
34
35
36 public class AMQDeadlockTest3 extends TestCase {
37
38     private static final String JavaDoc URL1 = "tcp://localhost:61616";
39
40     private static final String JavaDoc URL2 = "tcp://localhost:61617";
41
42     private static final String JavaDoc QUEUE1_NAME = "test.queue.1";
43
44     private static final String JavaDoc QUEUE2_NAME = "test.queue.2";
45
46     private static final int MAX_CONSUMERS = 1;
47
48     private static final int MAX_PRODUCERS = 1;
49
50     private static final int NUM_MESSAGE_TO_SEND = 10;
51
52     private AtomicInteger JavaDoc messageCount = new AtomicInteger JavaDoc();
53     private CountDownLatch JavaDoc doneLatch;
54
55     public void setUp() throws Exception JavaDoc {
56     }
57
58     public void tearDown() throws Exception JavaDoc {
59     }
60
61     // This should fail with incubator-activemq-fuse-4.1.0.5
62
public void testQueueLimitsWithOneBrokerSameConnection() throws Exception JavaDoc {
63
64         BrokerService brokerService1 = null;
65         ActiveMQConnectionFactory acf = null;
66         PooledConnectionFactory pcf = null;
67         DefaultMessageListenerContainer container1 = null;
68
69         try {
70             brokerService1 = createBrokerService("broker1", URL1, null);
71             brokerService1.start();
72
73             acf = createConnectionFactory(URL1);
74             pcf = new PooledConnectionFactory(acf);
75
76             // Only listen on the first queue.. let the 2nd queue fill up.
77
doneLatch = new CountDownLatch JavaDoc(NUM_MESSAGE_TO_SEND);
78             container1 = createDefaultMessageListenerContainer(acf, new TestMessageListener1(500), QUEUE1_NAME);
79             container1.afterPropertiesSet();
80
81             Thread.sleep(2000);
82
83             final ExecutorService JavaDoc executor = Executors.newCachedThreadPool();
84             for (int i = 0; i < MAX_PRODUCERS; i++) {
85                 executor.submit(new PooledProducerTask(pcf, QUEUE2_NAME));
86                 Thread.sleep(1000);
87                 executor.submit(new PooledProducerTask(pcf, QUEUE1_NAME));
88             }
89             
90             // Wait for all message to arrive.
91
assertTrue(doneLatch.await(20, TimeUnit.SECONDS));
92             executor.shutdownNow();
93
94             Assert.assertEquals(NUM_MESSAGE_TO_SEND, messageCount.get());
95
96         } finally {
97
98             container1.stop();
99             container1.destroy();
100             container1 = null;
101             brokerService1.stop();
102             brokerService1 = null;
103
104         }
105
106     }
107     
108
109
110     
111     // This should fail with incubator-activemq-fuse-4.1.0.5
112
public void testQueueLimitsWithTwoBrokerProduceandConsumeonDifferentBrokersWithOneConnectionForProducing()
113             throws Exception JavaDoc {
114
115         BrokerService brokerService1 = null;
116         BrokerService brokerService2 = null;
117         ActiveMQConnectionFactory acf1 = null;
118         ActiveMQConnectionFactory acf2 = null;
119         PooledConnectionFactory pcf = null;
120         DefaultMessageListenerContainer container1 = null;
121
122         try {
123             brokerService1 = createBrokerService("broker1", URL1, URL2);
124             brokerService1.start();
125             brokerService2 = createBrokerService("broker2", URL2, URL1);
126             brokerService2.start();
127
128             acf1 = createConnectionFactory(URL1);
129             acf2 = createConnectionFactory(URL2);
130
131             pcf = new PooledConnectionFactory(acf1);
132
133             Thread.sleep(1000);
134
135             doneLatch = new CountDownLatch JavaDoc(MAX_PRODUCERS * NUM_MESSAGE_TO_SEND);
136             container1 = createDefaultMessageListenerContainer(acf2, new TestMessageListener1(500), QUEUE1_NAME);
137             container1.afterPropertiesSet();
138
139             final ExecutorService JavaDoc executor = Executors.newCachedThreadPool();
140             for (int i = 0; i < MAX_PRODUCERS; i++) {
141                 executor.submit(new PooledProducerTask(pcf, QUEUE2_NAME));
142                 Thread.sleep(1000);
143                 executor.submit(new PooledProducerTask(pcf, QUEUE1_NAME));
144             }
145
146             assertTrue(doneLatch.await(20, TimeUnit.SECONDS));
147             executor.shutdownNow();
148
149             Assert.assertEquals(MAX_PRODUCERS * NUM_MESSAGE_TO_SEND,
150                     messageCount.get());
151         } finally {
152
153             container1.stop();
154             container1.destroy();
155             container1 = null;
156
157             brokerService1.stop();
158             brokerService1 = null;
159             brokerService2.stop();
160             brokerService2 = null;
161         }
162     }
163     
164     
165     // This should fail with incubator-activemq-fuse-4.1.0.5
166
public void testQueueLimitsWithTwoBrokerProduceandConsumeonDifferentBrokersWithSeperateConnectionsForProducing()
167             throws Exception JavaDoc {
168
169         BrokerService brokerService1 = null;
170         BrokerService brokerService2 = null;
171         ActiveMQConnectionFactory acf1 = null;
172         ActiveMQConnectionFactory acf2 = null;
173         DefaultMessageListenerContainer container1 = null;
174         DefaultMessageListenerContainer container2 = null;
175         
176         try {
177             brokerService1 = createBrokerService("broker1", URL1, URL2);
178             brokerService1.start();
179             brokerService2 = createBrokerService("broker2", URL2, URL1);
180             brokerService2.start();
181
182             acf1 = createConnectionFactory(URL1);
183             acf2 = createConnectionFactory(URL2);
184
185             Thread.sleep(1000);
186
187             doneLatch = new CountDownLatch JavaDoc(NUM_MESSAGE_TO_SEND*MAX_PRODUCERS);
188
189             container1 = createDefaultMessageListenerContainer(acf2, new TestMessageListener1(500), QUEUE1_NAME);
190             container1.afterPropertiesSet();
191             container2 = createDefaultMessageListenerContainer(acf2, new TestMessageListener1(30000), QUEUE2_NAME);
192             container2.afterPropertiesSet();
193
194             final ExecutorService JavaDoc executor = Executors.newCachedThreadPool();
195             for (int i = 0; i < MAX_PRODUCERS; i++) {
196                 executor.submit(new NonPooledProducerTask(acf1, QUEUE2_NAME));
197                 Thread.sleep(1000);
198                 executor.submit(new NonPooledProducerTask(acf1, QUEUE1_NAME));
199             }
200
201             assertTrue(doneLatch.await(20, TimeUnit.SECONDS));
202             executor.shutdownNow();
203
204             Assert.assertEquals(MAX_PRODUCERS * NUM_MESSAGE_TO_SEND, messageCount.get());
205         } finally {
206
207             container1.stop();
208             container1.destroy();
209             container1 = null;
210             
211             container2.stop();
212             container2.destroy();
213             container2 = null;
214
215             brokerService1.stop();
216             brokerService1 = null;
217             brokerService2.stop();
218             brokerService2 = null;
219         }
220     }
221
222
223
224
225     private BrokerService createBrokerService(final String JavaDoc brokerName,
226             final String JavaDoc uri1, final String JavaDoc uri2) throws Exception JavaDoc {
227         final BrokerService brokerService = new BrokerService();
228
229         brokerService.setBrokerName(brokerName);
230         brokerService.setPersistent(false);
231         brokerService.setUseJmx(true);
232
233         final UsageManager memoryManager = new UsageManager();
234         memoryManager.setLimit(5000000);
235         brokerService.setMemoryManager(memoryManager);
236
237         final ArrayList JavaDoc policyEntries = new ArrayList JavaDoc();
238
239         final PolicyEntry entry = new PolicyEntry();
240         entry.setQueue(">");
241         // entry.setQueue(QUEUE1_NAME);
242
entry.setMemoryLimit(1000);
243         policyEntries.add(entry);
244
245         final PolicyMap policyMap = new PolicyMap();
246         policyMap.setPolicyEntries(policyEntries);
247         brokerService.setDestinationPolicy(policyMap);
248
249         final TransportConnector tConnector = new TransportConnector();
250         tConnector.setUri(new URI JavaDoc(uri1));
251         tConnector.setBrokerName(brokerName);
252         tConnector.setName(brokerName + ".transportConnector");
253         brokerService.addConnector(tConnector);
254
255         if (uri2 != null) {
256             final NetworkConnector nc = new DiscoveryNetworkConnector(new URI JavaDoc("static:" + uri2));
257             nc.setBridgeTempDestinations(true);
258             nc.setBrokerName(brokerName);
259             brokerService.addNetworkConnector(nc);
260         }
261
262         return brokerService;
263
264     }
265
266     public DefaultMessageListenerContainer createDefaultMessageListenerContainer(
267             final ConnectionFactory JavaDoc acf, final MessageListener JavaDoc listener,
268             final String JavaDoc queue) {
269         final DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
270         container.setConnectionFactory(acf);
271         container.setDestinationName(queue);
272         container.setMessageListener(listener);
273         container.setSessionTransacted(false);
274         container.setSessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE);
275         container.setConcurrentConsumers(MAX_CONSUMERS);
276         return container;
277     }
278
279     public ActiveMQConnectionFactory createConnectionFactory(final String JavaDoc url) {
280         final ActiveMQConnectionFactory acf = new ActiveMQConnectionFactory(url);
281         acf.setCopyMessageOnSend(false);
282         acf.setUseAsyncSend(false);
283         acf.setDispatchAsync(true);
284         acf.setUseCompression(false);
285         acf.setOptimizeAcknowledge(false);
286         acf.setOptimizedMessageDispatch(true);
287         acf.setAlwaysSyncSend(true);
288         return acf;
289     }
290
291     private class TestMessageListener1 implements MessageListener JavaDoc {
292
293         private final long waitTime;
294
295         public TestMessageListener1(long waitTime) {
296             this.waitTime = waitTime;
297         
298         }
299
300         public void onMessage(Message msg) {
301
302             try {
303                 System.out.println("Listener1 Consumed message "+ msg.getIntProperty("count"));
304
305                 messageCount.incrementAndGet();
306                 doneLatch.countDown();
307                 
308                 Thread.sleep(waitTime);
309             } catch (JMSException JavaDoc e) {
310                 // TODO Auto-generated catch block
311
e.printStackTrace();
312             } catch (InterruptedException JavaDoc e) {
313                 // TODO Auto-generated catch block
314
e.printStackTrace();
315             }
316
317         }
318     }
319
320
321     private class PooledProducerTask implements Runnable JavaDoc {
322
323         private final String JavaDoc queueName;
324
325         private final PooledConnectionFactory pcf;
326
327         public PooledProducerTask(final PooledConnectionFactory pcf,
328                 final String JavaDoc queueName) {
329             this.pcf = pcf;
330             this.queueName = queueName;
331         }
332
333         public void run() {
334
335             try {
336
337                 final JmsTemplate jmsTemplate = new JmsTemplate(pcf);
338                 jmsTemplate.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
339                 jmsTemplate.setExplicitQosEnabled(true);
340                 jmsTemplate.setMessageIdEnabled(false);
341                 jmsTemplate.setMessageTimestampEnabled(false);
342                 jmsTemplate.afterPropertiesSet();
343
344                 final byte[] bytes = new byte[2048];
345                 final Random JavaDoc r = new Random JavaDoc();
346                 r.nextBytes(bytes);
347
348                 Thread.sleep(2000);
349
350                 final AtomicInteger JavaDoc count = new AtomicInteger JavaDoc();
351                 for (int i = 0; i < NUM_MESSAGE_TO_SEND; i++) {
352                     jmsTemplate.send(queueName, new MessageCreator() {
353
354                         public Message createMessage(Session JavaDoc session)
355                                 throws JMSException JavaDoc {
356
357                             final BytesMessage JavaDoc message = session.createBytesMessage();
358
359                             message.writeBytes(bytes);
360                             message.setIntProperty("count", count.incrementAndGet());
361                             message.setStringProperty("producer", "pooled");
362                             return message;
363                         }
364                     });
365
366                     System.out.println("PooledProducer sent message: "+ count.get());
367                     // Thread.sleep(1000);
368
}
369
370             } catch (final Throwable JavaDoc e) {
371                 System.err.println("Producer 1 is exiting.");
372                 e.printStackTrace();
373             }
374         }
375     }
376     
377     
378     private class NonPooledProducerTask implements Runnable JavaDoc {
379
380         private final String JavaDoc queueName;
381
382         private final ConnectionFactory JavaDoc cf;
383
384         public NonPooledProducerTask(final ConnectionFactory JavaDoc cf,
385                 final String JavaDoc queueName) {
386             this.cf = cf;
387             this.queueName = queueName;
388         }
389
390         public void run() {
391
392             try {
393
394                 final JmsTemplate jmsTemplate = new JmsTemplate(cf);
395                 jmsTemplate.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
396                 jmsTemplate.setExplicitQosEnabled(true);
397                 jmsTemplate.setMessageIdEnabled(false);
398                 jmsTemplate.setMessageTimestampEnabled(false);
399                 jmsTemplate.afterPropertiesSet();
400
401                 final byte[] bytes = new byte[2048];
402                 final Random JavaDoc r = new Random JavaDoc();
403                 r.nextBytes(bytes);
404
405                 Thread.sleep(2000);
406
407                 final AtomicInteger JavaDoc count = new AtomicInteger JavaDoc();
408                 for (int i = 0; i < NUM_MESSAGE_TO_SEND; i++) {
409                     jmsTemplate.send(queueName, new MessageCreator() {
410
411                         public Message createMessage(Session JavaDoc session)
412                                 throws JMSException JavaDoc {
413
414                             final BytesMessage JavaDoc message = session
415                                     .createBytesMessage();
416
417                             message.writeBytes(bytes);
418                             message.setIntProperty("count", count
419                                     .incrementAndGet());
420                             message.setStringProperty("producer", "non-pooled");
421                             return message;
422                         }
423                     });
424
425                     System.out.println("Non-PooledProducer sent message: " + count.get());
426
427                     // Thread.sleep(1000);
428
}
429
430             } catch (final Throwable JavaDoc e) {
431                 System.err.println("Producer 1 is exiting.");
432                 e.printStackTrace();
433             }
434         }
435     }
436
437 }
438
Popular Tags