package org.apache.activemq.usecases;

import java.net.URI;
import java.util.ArrayList;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.BytesMessage;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
import junit.framework.Assert;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.network.DiscoveryNetworkConnector;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.pool.PooledConnectionFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.jms.listener.DefaultMessageListenerContainer;

/**
 * Use-case test that starts an embedded, non-persistent broker, attaches a Spring
 * {@link DefaultMessageListenerContainer} with {@link #MAX_CONSUMERS} concurrent consumers
 * and then lets {@link #MAX_PRODUCERS} pooled producers send {@link #NUM_MESSAGE_TO_SEND}
 * messages each to a single queue. The test passes when every message has been
 * consumed within the timeout.
 */
public class AMQFailoverIssue extends TestCase {

    private static final String URL1 = "tcp://localhost:61616";
    private static final String QUEUE1_NAME = "test.queue.1";
    private static final int MAX_CONSUMERS = 10;
    private static final int MAX_PRODUCERS = 5;
    private static final int NUM_MESSAGE_TO_SEND = 10000;
    private static final int TOTAL_MESSAGES = MAX_PRODUCERS * NUM_MESSAGE_TO_SEND;
    /** When true the client URL is wrapped as {@code failover:(url)}. */
    private static final boolean USE_FAILOVER = true;

    /** Number of messages seen by the listener(s). */
    private final AtomicInteger messageCount = new AtomicInteger();
    /** Counted down once per consumed message; created by the test before producers start. */
    private CountDownLatch doneLatch;

    @Override
    public void setUp() throws Exception {
    }

    @Override
    public void tearDown() throws Exception {
    }

    /**
     * Runs the producer/consumer scenario and asserts that all
     * {@link #TOTAL_MESSAGES} messages are received within 45 seconds.
     *
     * @throws Exception if the broker, the container or the connection pool fails
     *         to start or stop
     */
    public void testFailoverIssue() throws Exception {
        BrokerService brokerService1 = null;
        ActiveMQConnectionFactory acf = null;
        PooledConnectionFactory pcf = null;
        DefaultMessageListenerContainer container1 = null;
        ExecutorService executor = null;
        try {
            brokerService1 = createBrokerService("broker1", URL1, null);
            brokerService1.start();
            acf = createConnectionFactory(URL1, USE_FAILOVER);
            pcf = new PooledConnectionFactory(acf);
            doneLatch = new CountDownLatch(TOTAL_MESSAGES);
            container1 = createDefaultMessageListenerContainer(acf, new TestMessageListener1(0), QUEUE1_NAME);
            container1.afterPropertiesSet();
            // Presumably gives the consumers time to connect before producers start.
            Thread.sleep(5000);
            executor = Executors.newCachedThreadPool();
            for (int i = 0; i < MAX_PRODUCERS; i++) {
                executor.submit(new PooledProducerTask(pcf, QUEUE1_NAME));
            }
            assertTrue(doneLatch.await(45, TimeUnit.SECONDS));
            Assert.assertEquals(TOTAL_MESSAGES, messageCount.get());
        } finally {
            // Every resource is null-checked and released in its own finally step so
            // that a failure during start-up (or during an earlier clean-up step)
            // neither masks the original exception with a NullPointerException nor
            // leaves the broker, pooled connections or producer threads running.
            if (executor != null) {
                executor.shutdownNow();
            }
            try {
                if (container1 != null) {
                    container1.stop();
                    container1.destroy();
                }
            } finally {
                try {
                    if (pcf != null) {
                        pcf.stop();
                    }
                } finally {
                    if (brokerService1 != null) {
                        brokerService1.stop();
                    }
                }
            }
        }
    }

    /**
     * Builds (but does not start) a non-persistent, JMX-enabled broker with one
     * transport connector and, optionally, one static network connector.
     *
     * @param brokerName name given to the broker and used to derive connector names
     * @param uri1 URI the transport connector listens on
     * @param uri2 URI of a remote broker to bridge to, or {@code null} for no network connector
     * @return the configured broker
     * @throws Exception if a URI is malformed or a connector cannot be added
     */
    private BrokerService createBrokerService(final String brokerName, final String uri1, final String uri2)
            throws Exception {
        final BrokerService brokerService = new BrokerService();
        brokerService.setBrokerName(brokerName);
        brokerService.setPersistent(false);
        brokerService.setUseJmx(true);

        final UsageManager memoryManager = new UsageManager();
        memoryManager.setLimit(5000000);
        brokerService.setMemoryManager(memoryManager);

        // Apply a deliberately tiny memory limit to every queue (">" wildcard).
        final ArrayList<PolicyEntry> policyEntries = new ArrayList<PolicyEntry>();
        final PolicyEntry entry = new PolicyEntry();
        entry.setQueue(">");
        entry.setMemoryLimit(1);
        policyEntries.add(entry);
        final PolicyMap policyMap = new PolicyMap();
        policyMap.setPolicyEntries(policyEntries);
        brokerService.setDestinationPolicy(policyMap);

        final TransportConnector tConnector = new TransportConnector();
        tConnector.setUri(new URI(uri1));
        tConnector.setBrokerName(brokerName);
        tConnector.setName(brokerName + ".transportConnector");
        brokerService.addConnector(tConnector);

        if (uri2 != null) {
            final NetworkConnector nc = new DiscoveryNetworkConnector(new URI("static:" + uri2));
            nc.setBridgeTempDestinations(true);
            nc.setBrokerName(brokerName);
            nc.setPrefetchSize(1);
            brokerService.addNetworkConnector(nc);
        }
        return brokerService;
    }

    /**
     * Creates a non-transacted, auto-acknowledging listener container with
     * {@link #MAX_CONSUMERS} concurrent consumers. The caller is responsible for
     * calling {@code afterPropertiesSet()} to initialise it.
     *
     * @param acf connection factory the container obtains connections from
     * @param listener listener invoked for each received message
     * @param queue name of the destination to consume from
     * @return the configured, not yet initialised container
     */
    public DefaultMessageListenerContainer createDefaultMessageListenerContainer(final ConnectionFactory acf,
            final MessageListener listener, final String queue) {
        final DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
        container.setConnectionFactory(acf);
        container.setDestinationName(queue);
        container.setMessageListener(listener);
        container.setSessionTransacted(false);
        container.setSessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE);
        container.setConcurrentConsumers(MAX_CONSUMERS);
        return container;
    }

    /**
     * Creates a connection factory for the given broker URL.
     *
     * @param url broker transport URL
     * @param useFailover when true the URL is wrapped as {@code failover:(url)}
     * @return the configured connection factory
     */
    public ActiveMQConnectionFactory createConnectionFactory(final String url, final boolean useFailover) {
        final String failoverUrl = "failover:(" + url + ")";
        final ActiveMQConnectionFactory acf = new ActiveMQConnectionFactory(useFailover ? failoverUrl : url);
        acf.setCopyMessageOnSend(false);
        acf.setUseAsyncSend(false);
        acf.setDispatchAsync(true);
        acf.setUseCompression(false);
        acf.setOptimizeAcknowledge(false);
        acf.setOptimizedMessageDispatch(true);
        return acf;
    }

    /**
     * Listener that counts each message, counts down the test latch and then
     * sleeps for a configurable time.
     */
    private class TestMessageListener1 implements MessageListener {

        private final long waitTime;

        /**
         * @param waitTime milliseconds to sleep after each message
         */
        public TestMessageListener1(long waitTime) {
            this.waitTime = waitTime;
        }

        public void onMessage(Message msg) {
            try {
                messageCount.incrementAndGet();
                doneLatch.countDown();
                Thread.sleep(waitTime);
            } catch (InterruptedException e) {
                // Restore the interrupt flag so the calling consumer thread can
                // observe that it was asked to stop.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
        }
    }

    /**
     * Producer that sends {@link #NUM_MESSAGE_TO_SEND} non-persistent 2048-byte
     * messages to a queue through a {@link JmsTemplate} backed by the pooled
     * connection factory.
     */
    private static final class PooledProducerTask implements Runnable {

        private final String queueName;
        private final PooledConnectionFactory pcf;

        /**
         * @param pcf pooled connection factory used for sending
         * @param queueName name of the queue to send to
         */
        public PooledProducerTask(final PooledConnectionFactory pcf, final String queueName) {
            this.pcf = pcf;
            this.queueName = queueName;
        }

        public void run() {
            try {
                final JmsTemplate jmsTemplate = new JmsTemplate(pcf);
                jmsTemplate.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
                // Explicit QoS must be enabled for the delivery mode above to be applied.
                jmsTemplate.setExplicitQosEnabled(true);
                jmsTemplate.setMessageIdEnabled(false);
                jmsTemplate.setMessageTimestampEnabled(false);
                jmsTemplate.afterPropertiesSet();

                // One random payload, reused for every message.
                final byte[] bytes = new byte[2048];
                final Random r = new Random();
                r.nextBytes(bytes);

                Thread.sleep(2000);

                final AtomicInteger count = new AtomicInteger();
                for (int i = 0; i < NUM_MESSAGE_TO_SEND; i++) {
                    jmsTemplate.send(queueName, new MessageCreator() {

                        public Message createMessage(Session session) throws JMSException {
                            final BytesMessage message = session.createBytesMessage();
                            message.writeBytes(bytes);
                            message.setIntProperty("count", count.incrementAndGet());
                            message.setStringProperty("producer", "pooled");
                            return message;
                        }
                    });
                }
            } catch (final Throwable e) {
                // A failed producer surfaces in the test as a latch timeout; the
                // stack trace is printed so the cause is visible in the output.
                e.printStackTrace();
            }
        }
    }
}