package org.apache.activemq.usecases;

import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import javax.jms.BytesMessage;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;

import junit.framework.Assert;
import junit.framework.TestCase;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.network.DiscoveryNetworkConnector;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.pool.PooledConnectionFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.jms.listener.DefaultMessageListenerContainer;

/**
 * Use-case test that starts four in-process brokers, each with a network
 * connector pointing at the other three, attaches a Spring listener container
 * to broker 2 and lets four pooled producers (one per broker) each send
 * {@link #NUM_MESSAGE_TO_SEND} messages to the same queue.
 */
public class AMQDeadlockTestW4Brokers extends TestCase {

    private static final String BROKER_URL1 = "tcp://localhost:61616";

    private static final String BROKER_URL2 = "tcp://localhost:61617";

    private static final String BROKER_URL3 = "tcp://localhost:61618";

    private static final String BROKER_URL4 = "tcp://localhost:61619";

    /** Wire-format query string appended to every client-side broker URL. */
    private static final String WIRE_FORMAT_OPTIONS = "?wireFormat.cacheEnabled=false&wireFormat.tightEncodingEnabled=false&wireFormat.maxInactivityDuration=30000&wireFormat.tcpNoDelayEnabled=false";

    /** Failover query string appended to every failover URL used by the clients. */
    private static final String FAILOVER_OPTIONS = "?initialReconnectDelay=10&maxReconnectDelay=30000&useExponentialBackOff=true&backOffMultiplier=2&maxReconnectAttempts=0&randomize=false";

    private static final String URL1 = BROKER_URL1 + WIRE_FORMAT_OPTIONS;

    private static final String URL2 = BROKER_URL2 + WIRE_FORMAT_OPTIONS;

    private static final String URL3 = BROKER_URL3 + WIRE_FORMAT_OPTIONS;

    private static final String URL4 = BROKER_URL4 + WIRE_FORMAT_OPTIONS;

    private static final String QUEUE1_NAME = "test.queue.1";

    private static final int MAX_CONSUMERS = 5;

    private static final int NUM_MESSAGE_TO_SEND = 10000;

    /**
     * Counted down once per message handled by {@link TestMessageListener1}.
     * Kept per test instance so that a repeated run in the same JVM starts
     * from a fresh count.
     */
    private final CountDownLatch latch = new CountDownLatch(MAX_CONSUMERS * NUM_MESSAGE_TO_SEND);

    @Override
    public void setUp() throws Exception {
        // Nothing to prepare: the test method builds and tears down its own brokers.
    }

    @Override
    public void tearDown() throws Exception {
        // Nothing to release: the test method cleans up in its own finally block.
    }

    /**
     * Starts the four networked brokers, wires one listener container and four
     * producers, waits up to 15 seconds and then checks the latch count.
     * Every resource that was created is released afterwards, even when setup
     * fails part-way through.
     *
     * @throws Exception if a broker, connection factory or container cannot be
     *         set up, or if the wait is interrupted
     */
    public void test4BrokerWithOutLingo() throws Exception {

        BrokerService brokerService1 = null;
        BrokerService brokerService2 = null;
        BrokerService brokerService3 = null;
        BrokerService brokerService4 = null;
        PooledConnectionFactory pcf1 = null;
        PooledConnectionFactory pcf2 = null;
        PooledConnectionFactory pcf3 = null;
        PooledConnectionFactory pcf4 = null;
        DefaultMessageListenerContainer container1 = null;
        final ExecutorService executor = Executors.newCachedThreadPool();

        try {
            brokerService1 = createBrokerService("broker1", BROKER_URL1,
                    BROKER_URL2, BROKER_URL3, BROKER_URL4, 0);
            brokerService1.start();
            brokerService2 = createBrokerService("broker2", BROKER_URL2,
                    BROKER_URL1, BROKER_URL3, BROKER_URL4, 0);
            brokerService2.start();
            brokerService3 = createBrokerService("broker3", BROKER_URL3,
                    BROKER_URL2, BROKER_URL1, BROKER_URL4, 0);
            brokerService3.start();
            brokerService4 = createBrokerService("broker4", BROKER_URL4,
                    BROKER_URL1, BROKER_URL3, BROKER_URL2, 0);
            brokerService4.start();

            final ActiveMQConnectionFactory acf1 = createConnectionFactory(failoverUrl(URL1));
            final ActiveMQConnectionFactory acf2 = createConnectionFactory(failoverUrl(URL2));
            final ActiveMQConnectionFactory acf3 = createConnectionFactory(failoverUrl(URL3));
            final ActiveMQConnectionFactory acf4 = createConnectionFactory(failoverUrl(URL4));

            pcf1 = new PooledConnectionFactory(acf1);
            pcf2 = new PooledConnectionFactory(acf2);
            pcf3 = new PooledConnectionFactory(acf3);
            pcf4 = new PooledConnectionFactory(acf4);

            // The single consumer container listens on broker 2 only.
            container1 = createDefaultMessageListenerContainer(acf2,
                    new TestMessageListener1(0), QUEUE1_NAME);
            container1.afterPropertiesSet();

            final PooledProducerTask[] tasks = {
                new PooledProducerTask(pcf1, QUEUE1_NAME, "producer1"),
                new PooledProducerTask(pcf2, QUEUE1_NAME, "producer2"),
                new PooledProducerTask(pcf3, QUEUE1_NAME, "producer3"),
                new PooledProducerTask(pcf4, QUEUE1_NAME, "producer4")
            };
            for (final PooledProducerTask task : tasks) {
                executor.submit(task);
            }

            latch.await(15, TimeUnit.SECONDS);
            // NOTE(review): this requires the latch to still hold its initial
            // value, i.e. that no message reached the listener. The condition
            // is kept exactly as it was; confirm that this is the intended
            // expectation before relying on the result.
            assertEquals("latch count after waiting",
                    (long) MAX_CONSUMERS * NUM_MESSAGE_TO_SEND, latch.getCount());

        } finally {
            // Interrupt producers first so they do not keep sending into
            // resources that are being torn down.
            executor.shutdownNow();

            stopQuietly(container1);

            stopQuietly(pcf1);
            stopQuietly(pcf2);
            stopQuietly(pcf3);
            stopQuietly(pcf4);

            stopQuietly(brokerService1);
            stopQuietly(brokerService2);
            stopQuietly(brokerService3);
            stopQuietly(brokerService4);
        }
    }

    /**
     * Wraps a broker URL in a failover transport URL with the reconnect
     * settings shared by all clients in this test.
     *
     * @param url the broker URL, including its wire-format options
     * @return the failover URL
     */
    private static String failoverUrl(final String url) {
        return "failover:(" + url + ")" + FAILOVER_OPTIONS;
    }

    /**
     * Stops and destroys a listener container; a {@code null} container is
     * ignored. Failures are reported but not rethrown so that the remaining
     * cleanup still runs and the original test failure is not masked.
     */
    private static void stopQuietly(final DefaultMessageListenerContainer container) {
        if (container == null) {
            return;
        }
        try {
            container.stop();
            container.destroy();
        } catch (Exception e) {
            System.err.println("Failed to stop listener container.");
            e.printStackTrace();
        }
    }

    /**
     * Stops a pooled connection factory; a {@code null} factory is ignored.
     * Failures are reported but not rethrown so that the remaining cleanup
     * still runs.
     */
    private static void stopQuietly(final PooledConnectionFactory factory) {
        if (factory == null) {
            return;
        }
        try {
            factory.stop();
        } catch (Exception e) {
            System.err.println("Failed to stop pooled connection factory.");
            e.printStackTrace();
        }
    }

    /**
     * Stops a broker; a {@code null} broker is ignored. Failures are reported
     * but not rethrown so that the remaining brokers are still stopped.
     */
    private static void stopQuietly(final BrokerService broker) {
        if (broker == null) {
            return;
        }
        try {
            broker.stop();
        } catch (Exception e) {
            System.err.println("Failed to stop broker.");
            e.printStackTrace();
        }
    }

    /**
     * Builds (but does not start) a non-persistent, JMX-enabled broker with
     * one transport connector and, when {@code uri2} is given, one static
     * network connector to the three peer URIs.
     *
     * @param brokerName name given to the broker and its connectors
     * @param uri1 URI the broker's transport connector listens on
     * @param uri2 first peer URI; when {@code null} no network connector is added
     * @param uri3 second peer URI
     * @param uri4 third peer URI
     * @param queueLimit memory limit applied to every queue through the {@code ">"} policy entry
     * @return the configured broker
     * @throws Exception if a URI is malformed or a connector cannot be added
     */
    private BrokerService createBrokerService(final String brokerName,
            final String uri1, final String uri2, final String uri3,
            final String uri4, final int queueLimit) throws Exception {
        final BrokerService brokerService = new BrokerService();

        brokerService.setBrokerName(brokerName);
        brokerService.setPersistent(false);
        brokerService.setUseJmx(true);

        final UsageManager memoryManager = new UsageManager();
        memoryManager.setLimit(100000000);
        brokerService.setMemoryManager(memoryManager);

        final List<PolicyEntry> policyEntries = new ArrayList<PolicyEntry>();

        final PolicyEntry entry = new PolicyEntry();
        entry.setQueue(">");
        entry.setMemoryLimit(queueLimit);
        policyEntries.add(entry);

        final PolicyMap policyMap = new PolicyMap();
        policyMap.setPolicyEntries(policyEntries);
        brokerService.setDestinationPolicy(policyMap);

        final TransportConnector tConnector = new TransportConnector();
        tConnector.setUri(new URI(uri1));
        tConnector.setBrokerName(brokerName);
        tConnector.setName(brokerName + ".transportConnector");
        brokerService.addConnector(tConnector);

        if (uri2 != null) {
            final NetworkConnector nc = new DiscoveryNetworkConnector(new URI(
                    "static:" + uri2 + "," + uri3 + "," + uri4));
            nc.setBridgeTempDestinations(true);
            nc.setBrokerName(brokerName);
            nc.setPrefetchSize(1000);
            nc.setNetworkTTL(1);
            brokerService.addNetworkConnector(nc);
        }

        return brokerService;
    }

    /**
     * Creates a non-transacted, auto-acknowledging listener container with
     * {@link #MAX_CONSUMERS} concurrent consumers on the given queue.
     *
     * @param acf connection factory the container consumes through
     * @param listener listener invoked for each message
     * @param queue name of the destination to consume from
     * @return the configured container; the caller is responsible for initialising it
     */
    public DefaultMessageListenerContainer createDefaultMessageListenerContainer(
            final ConnectionFactory acf, final MessageListener listener,
            final String queue) {
        final DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
        container.setConnectionFactory(acf);
        container.setDestinationName(queue);
        container.setMessageListener(listener);
        container.setSessionTransacted(false);
        container.setSessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE);
        container.setConcurrentConsumers(MAX_CONSUMERS);
        return container;
    }

    /**
     * Creates a connection factory for the given URL with the send, dispatch,
     * compression and acknowledge settings used throughout this test.
     *
     * @param url broker URL to connect to
     * @return the configured connection factory
     */
    public ActiveMQConnectionFactory createConnectionFactory(final String url) {
        final ActiveMQConnectionFactory acf = new ActiveMQConnectionFactory(url);
        acf.setCopyMessageOnSend(false);
        acf.setUseAsyncSend(false);
        acf.setDispatchAsync(true);
        acf.setUseCompression(false);
        acf.setOptimizeAcknowledge(false);
        acf.setOptimizedMessageDispatch(true);

        return acf;
    }

    /**
     * Listener that counts received messages, prints progress every 1000
     * messages, sleeps for the configured time and then counts down the
     * test's latch.
     */
    private class TestMessageListener1 implements MessageListener {

        private final long waitTime;

        private final AtomicInteger count = new AtomicInteger(0);

        /**
         * @param waitTime milliseconds to sleep after each received message
         */
        public TestMessageListener1(long waitTime) {
            this.waitTime = waitTime;
        }

        public void onMessage(Message msg) {
            try {
                final int value = count.incrementAndGet();
                if (value % 1000 == 0) {
                    System.out.println("Consumed message: " + value);
                }

                Thread.sleep(waitTime);
                latch.countDown();
            } catch (InterruptedException e) {
                // Restore the flag so the thread that owns this listener can
                // observe the interruption.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
        }
    }

    /**
     * Task that sends {@link #NUM_MESSAGE_TO_SEND} non-persistent 2048-byte
     * messages to one queue through a pooled connection factory.
     */
    private class PooledProducerTask implements Runnable {

        private final String queueName;

        private final PooledConnectionFactory pcf;

        private final String producerName;

        /**
         * @param pcf pooled connection factory to send through
         * @param queueName destination queue name
         * @param producerName value of the {@code producerName} property set on every message
         */
        public PooledProducerTask(final PooledConnectionFactory pcf,
                final String queueName, final String producerName) {
            this.pcf = pcf;
            this.queueName = queueName;
            this.producerName = producerName;
        }

        public void run() {
            try {
                final JmsTemplate jmsTemplate = new JmsTemplate(pcf);
                jmsTemplate.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
                jmsTemplate.setExplicitQosEnabled(true);
                jmsTemplate.setMessageIdEnabled(false);
                jmsTemplate.setMessageTimestampEnabled(false);
                jmsTemplate.afterPropertiesSet();

                // One random payload, reused for every message.
                final byte[] bytes = new byte[2048];
                final Random r = new Random();
                r.nextBytes(bytes);

                for (int i = 0; i < NUM_MESSAGE_TO_SEND; i++) {
                    final int count = i;
                    jmsTemplate.send(queueName, new MessageCreator() {

                        public Message createMessage(Session session)
                                throws JMSException {
                            final BytesMessage message = session.createBytesMessage();

                            message.writeBytes(bytes);
                            message.setIntProperty("count", count);
                            message.setStringProperty("producerName", producerName);
                            return message;
                        }
                    });
                }
            } catch (final Throwable e) {
                // Report which producer stopped; the task ends here.
                System.err.println(producerName + " is exiting.");
                e.printStackTrace();
            }
        }
    }

}