KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > usecases > AMQFailoverIssue


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
4  * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
5  * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
6  * License. You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
11  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
12  * specific language governing permissions and limitations under the License.
13  */

14
15 package org.apache.activemq.usecases;
16
17 import java.net.URI JavaDoc;
18 import java.util.ArrayList JavaDoc;
19 import java.util.Random JavaDoc;
20 import java.util.concurrent.CountDownLatch JavaDoc;
21 import java.util.concurrent.ExecutorService JavaDoc;
22 import java.util.concurrent.Executors JavaDoc;
23 import java.util.concurrent.TimeUnit JavaDoc;
24 import java.util.concurrent.atomic.AtomicInteger JavaDoc;
25 import javax.jms.BytesMessage JavaDoc;
26 import javax.jms.ConnectionFactory JavaDoc;
27 import javax.jms.DeliveryMode JavaDoc;
28 import javax.jms.JMSException JavaDoc;
29 import javax.jms.Message JavaDoc;
30 import javax.jms.MessageListener JavaDoc;
31 import javax.jms.Session JavaDoc;
32 import junit.framework.Assert;
33 import junit.framework.TestCase;
34 import org.apache.activemq.ActiveMQConnectionFactory;
35 import org.apache.activemq.broker.BrokerService;
36 import org.apache.activemq.broker.TransportConnector;
37 import org.apache.activemq.broker.region.policy.PolicyEntry;
38 import org.apache.activemq.broker.region.policy.PolicyMap;
39 import org.apache.activemq.memory.UsageManager;
40 import org.apache.activemq.network.DiscoveryNetworkConnector;
41 import org.apache.activemq.network.NetworkConnector;
42 import org.apache.activemq.pool.PooledConnectionFactory;
43 import org.springframework.jms.core.JmsTemplate;
44 import org.springframework.jms.core.MessageCreator;
45 import org.springframework.jms.listener.DefaultMessageListenerContainer;
46
47 public class AMQFailoverIssue extends TestCase{
48
49     private static final String JavaDoc URL1="tcp://localhost:61616";
50     private static final String JavaDoc QUEUE1_NAME="test.queue.1";
51     private static final int MAX_CONSUMERS=10;
52     private static final int MAX_PRODUCERS=5;
53     private static final int NUM_MESSAGE_TO_SEND=10000;
54     private static final int TOTAL_MESSAGES=MAX_PRODUCERS * NUM_MESSAGE_TO_SEND;
55     private static final boolean USE_FAILOVER=true;
56     private AtomicInteger JavaDoc messageCount=new AtomicInteger JavaDoc();
57     private CountDownLatch JavaDoc doneLatch;
58
59     @Override JavaDoc public void setUp() throws Exception JavaDoc{
60     }
61
62     @Override JavaDoc public void tearDown() throws Exception JavaDoc{
63     }
64
65     // This should fail with incubator-activemq-fuse-4.1.0.5
66
public void testFailoverIssue() throws Exception JavaDoc{
67         BrokerService brokerService1=null;
68         ActiveMQConnectionFactory acf=null;
69         PooledConnectionFactory pcf=null;
70         DefaultMessageListenerContainer container1=null;
71         try{
72             brokerService1=createBrokerService("broker1",URL1,null);
73             brokerService1.start();
74             acf=createConnectionFactory(URL1,USE_FAILOVER);
75             pcf=new PooledConnectionFactory(acf);
76             // Only listen on the first queue.. let the 2nd queue fill up.
77
doneLatch=new CountDownLatch JavaDoc(TOTAL_MESSAGES);
78             container1=createDefaultMessageListenerContainer(acf,new TestMessageListener1(0),QUEUE1_NAME);
79             container1.afterPropertiesSet();
80             Thread.sleep(5000);
81             final ExecutorService JavaDoc executor=Executors.newCachedThreadPool();
82             for(int i=0;i<MAX_PRODUCERS;i++){
83                 executor.submit(new PooledProducerTask(pcf,QUEUE1_NAME));
84             }
85             // Wait for all message to arrive.
86
assertTrue(doneLatch.await(45,TimeUnit.SECONDS));
87             executor.shutdown();
88             // Thread.sleep(30000);
89
Assert.assertEquals(TOTAL_MESSAGES,messageCount.get());
90         }finally{
91             container1.stop();
92             container1.destroy();
93             container1=null;
94             brokerService1.stop();
95             brokerService1=null;
96         }
97     }
98
99     private BrokerService createBrokerService(final String JavaDoc brokerName,final String JavaDoc uri1,final String JavaDoc uri2)
100             throws Exception JavaDoc{
101         final BrokerService brokerService=new BrokerService();
102         brokerService.setBrokerName(brokerName);
103         brokerService.setPersistent(false);
104         brokerService.setUseJmx(true);
105         final UsageManager memoryManager=new UsageManager();
106         memoryManager.setLimit(5000000);
107         brokerService.setMemoryManager(memoryManager);
108         final ArrayList JavaDoc<PolicyEntry> policyEntries=new ArrayList JavaDoc<PolicyEntry>();
109         final PolicyEntry entry=new PolicyEntry();
110         entry.setQueue(">");
111         // entry.setQueue(QUEUE1_NAME);
112
entry.setMemoryLimit(1);
113         policyEntries.add(entry);
114         final PolicyMap policyMap=new PolicyMap();
115         policyMap.setPolicyEntries(policyEntries);
116         brokerService.setDestinationPolicy(policyMap);
117         final TransportConnector tConnector=new TransportConnector();
118         tConnector.setUri(new URI JavaDoc(uri1));
119         tConnector.setBrokerName(brokerName);
120         tConnector.setName(brokerName+".transportConnector");
121         brokerService.addConnector(tConnector);
122         if(uri2!=null){
123             final NetworkConnector nc=new DiscoveryNetworkConnector(new URI JavaDoc("static:"+uri2));
124             nc.setBridgeTempDestinations(true);
125             nc.setBrokerName(brokerName);
126             nc.setPrefetchSize(1);
127             brokerService.addNetworkConnector(nc);
128         }
129         return brokerService;
130     }
131
132     public DefaultMessageListenerContainer createDefaultMessageListenerContainer(final ConnectionFactory JavaDoc acf,
133             final MessageListener JavaDoc listener,final String JavaDoc queue){
134         final DefaultMessageListenerContainer container=new DefaultMessageListenerContainer();
135         container.setConnectionFactory(acf);
136         container.setDestinationName(queue);
137         container.setMessageListener(listener);
138         container.setSessionTransacted(false);
139         container.setSessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE);
140         container.setConcurrentConsumers(MAX_CONSUMERS);
141         return container;
142     }
143
144     public ActiveMQConnectionFactory createConnectionFactory(final String JavaDoc url,final boolean useFailover){
145         final String JavaDoc failoverUrl="failover:("+url+")";
146         final ActiveMQConnectionFactory acf=new ActiveMQConnectionFactory(useFailover?failoverUrl:url);
147         acf.setCopyMessageOnSend(false);
148         acf.setUseAsyncSend(false);
149         acf.setDispatchAsync(true);
150         acf.setUseCompression(false);
151         acf.setOptimizeAcknowledge(false);
152         acf.setOptimizedMessageDispatch(true);
153         acf.setUseAsyncSend(false);
154         return acf;
155     }
156
157     private class TestMessageListener1 implements MessageListener JavaDoc{
158
159         private final long waitTime;
160
161         public TestMessageListener1(long waitTime){
162             this.waitTime=waitTime;
163         }
164
165         public void onMessage(Message JavaDoc msg){
166             try{
167                 messageCount.incrementAndGet();
168                 doneLatch.countDown();
169                 Thread.sleep(waitTime);
170             }catch(InterruptedException JavaDoc e){
171                 // TODO Auto-generated catch block
172
e.printStackTrace();
173             }
174         }
175     }
176
177     private class PooledProducerTask implements Runnable JavaDoc{
178
179         private final String JavaDoc queueName;
180         private final PooledConnectionFactory pcf;
181
182         public PooledProducerTask(final PooledConnectionFactory pcf,final String JavaDoc queueName){
183             this.pcf=pcf;
184             this.queueName=queueName;
185         }
186
187         public void run(){
188             try{
189                 final JmsTemplate jmsTemplate=new JmsTemplate(pcf);
190                 jmsTemplate.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
191                 jmsTemplate.setExplicitQosEnabled(true);
192                 jmsTemplate.setMessageIdEnabled(false);
193                 jmsTemplate.setMessageTimestampEnabled(false);
194                 jmsTemplate.afterPropertiesSet();
195                 final byte[] bytes=new byte[2048];
196                 final Random JavaDoc r=new Random JavaDoc();
197                 r.nextBytes(bytes);
198                 Thread.sleep(2000);
199                 final AtomicInteger JavaDoc count=new AtomicInteger JavaDoc();
200                 for(int i=0;i<NUM_MESSAGE_TO_SEND;i++){
201                     jmsTemplate.send(queueName,new MessageCreator(){
202
203                         public Message JavaDoc createMessage(Session JavaDoc session) throws JMSException JavaDoc{
204                             final BytesMessage JavaDoc message=session.createBytesMessage();
205                             message.writeBytes(bytes);
206                             message.setIntProperty("count",count.incrementAndGet());
207                             message.setStringProperty("producer","pooled");
208                             return message;
209                         }
210                     });
211                 }
212             }catch(final Throwable JavaDoc e){
213                 e.printStackTrace();
214             }
215         }
216     }
217 }
218
Popular Tags