package org.apache.activemq;

import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import javax.jms.BytesMessage;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;

import junit.framework.Assert;
import junit.framework.TestCase;

import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.network.DiscoveryNetworkConnector;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.pool.PooledConnectionFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.jms.listener.DefaultMessageListenerContainer;

/**
 * Integration tests that start one or two in-VM brokers with a very small
 * per-queue memory limit, produce messages to two queues and verify that a
 * slow consumer on {@link #QUEUE1_NAME} still receives every message within
 * the timeout.
 *
 * <p>Each test binds real TCP ports ({@link #URL1}, {@link #URL2}), so every
 * test releases its executor, listener containers and brokers in a
 * {@code finally} block, whatever the outcome.
 */
public class AMQDeadlockTest3 extends TestCase {

    private static final String URL1 = "tcp://localhost:61616";

    private static final String URL2 = "tcp://localhost:61617";

    private static final String QUEUE1_NAME = "test.queue.1";

    private static final String QUEUE2_NAME = "test.queue.2";

    private static final int MAX_CONSUMERS = 1;

    private static final int MAX_PRODUCERS = 1;

    private static final int NUM_MESSAGE_TO_SEND = 10;

    /** Number of messages seen by any {@link TestMessageListener1} of this test instance. */
    private final AtomicInteger messageCount = new AtomicInteger();

    /** Counted down once per consumed message; assigned by each test before its containers start. */
    private CountDownLatch doneLatch;

    @Override
    public void setUp() throws Exception {
    }

    @Override
    public void tearDown() throws Exception {
    }

    /**
     * One broker; the consumer listens on queue 1 while pooled producers write
     * to queue 2 (which has no consumer) and to queue 1 through the same
     * pooled connection factory.
     *
     * @throws Exception if broker start-up, messaging or clean-up fails
     */
    public void testQueueLimitsWithOneBrokerSameConnection() throws Exception {

        final ExecutorService executor = Executors.newCachedThreadPool();
        BrokerService brokerService1 = null;
        DefaultMessageListenerContainer container1 = null;

        try {
            brokerService1 = createBrokerService("broker1", URL1, null);
            brokerService1.start();

            final ActiveMQConnectionFactory acf = createConnectionFactory(URL1);
            final PooledConnectionFactory pcf = new PooledConnectionFactory(acf);

            doneLatch = new CountDownLatch(NUM_MESSAGE_TO_SEND);

            container1 = createDefaultMessageListenerContainer(acf, new TestMessageListener1(500), QUEUE1_NAME);
            container1.afterPropertiesSet();

            Thread.sleep(2000);

            for (int i = 0; i < MAX_PRODUCERS; i++) {
                executor.submit(new PooledProducerTask(pcf, QUEUE2_NAME));
                Thread.sleep(1000);
                executor.submit(new PooledProducerTask(pcf, QUEUE1_NAME));
            }

            assertTrue(doneLatch.await(20, TimeUnit.SECONDS));

            executor.shutdownNow();

            Assert.assertEquals(NUM_MESSAGE_TO_SEND, messageCount.get());

        } finally {
            releaseResources(executor,
                    new DefaultMessageListenerContainer[] {container1},
                    new BrokerService[] {brokerService1});
        }
    }

    /**
     * Two networked brokers; pooled producers write to broker 1 through one
     * pooled connection factory while the consumer listens on queue 1 of
     * broker 2.
     *
     * @throws Exception if broker start-up, messaging or clean-up fails
     */
    public void testQueueLimitsWithTwoBrokerProduceandConsumeonDifferentBrokersWithOneConnectionForProducing()
            throws Exception {

        final ExecutorService executor = Executors.newCachedThreadPool();
        BrokerService brokerService1 = null;
        BrokerService brokerService2 = null;
        DefaultMessageListenerContainer container1 = null;

        try {
            brokerService1 = createBrokerService("broker1", URL1, URL2);
            brokerService1.start();
            brokerService2 = createBrokerService("broker2", URL2, URL1);
            brokerService2.start();

            final ActiveMQConnectionFactory acf1 = createConnectionFactory(URL1);
            final ActiveMQConnectionFactory acf2 = createConnectionFactory(URL2);

            final PooledConnectionFactory pcf = new PooledConnectionFactory(acf1);

            Thread.sleep(1000);

            doneLatch = new CountDownLatch(MAX_PRODUCERS * NUM_MESSAGE_TO_SEND);
            container1 = createDefaultMessageListenerContainer(acf2, new TestMessageListener1(500), QUEUE1_NAME);
            container1.afterPropertiesSet();

            for (int i = 0; i < MAX_PRODUCERS; i++) {
                executor.submit(new PooledProducerTask(pcf, QUEUE2_NAME));
                Thread.sleep(1000);
                executor.submit(new PooledProducerTask(pcf, QUEUE1_NAME));
            }

            assertTrue(doneLatch.await(20, TimeUnit.SECONDS));
            executor.shutdownNow();

            Assert.assertEquals(MAX_PRODUCERS * NUM_MESSAGE_TO_SEND,
                    messageCount.get());
        } finally {
            releaseResources(executor,
                    new DefaultMessageListenerContainer[] {container1},
                    new BrokerService[] {brokerService1, brokerService2});
        }
    }

    /**
     * Two networked brokers; non-pooled producers write to broker 1 while two
     * containers on broker 2 consume queue 1 (500 ms per message) and queue 2
     * (30 s per message). Both listeners share the same latch and counter.
     *
     * @throws Exception if broker start-up, messaging or clean-up fails
     */
    public void testQueueLimitsWithTwoBrokerProduceandConsumeonDifferentBrokersWithSeperateConnectionsForProducing()
            throws Exception {

        final ExecutorService executor = Executors.newCachedThreadPool();
        BrokerService brokerService1 = null;
        BrokerService brokerService2 = null;
        DefaultMessageListenerContainer container1 = null;
        DefaultMessageListenerContainer container2 = null;

        try {
            brokerService1 = createBrokerService("broker1", URL1, URL2);
            brokerService1.start();
            brokerService2 = createBrokerService("broker2", URL2, URL1);
            brokerService2.start();

            final ActiveMQConnectionFactory acf1 = createConnectionFactory(URL1);
            final ActiveMQConnectionFactory acf2 = createConnectionFactory(URL2);

            Thread.sleep(1000);

            doneLatch = new CountDownLatch(NUM_MESSAGE_TO_SEND * MAX_PRODUCERS);

            container1 = createDefaultMessageListenerContainer(acf2, new TestMessageListener1(500), QUEUE1_NAME);
            container1.afterPropertiesSet();
            container2 = createDefaultMessageListenerContainer(acf2, new TestMessageListener1(30000), QUEUE2_NAME);
            container2.afterPropertiesSet();

            for (int i = 0; i < MAX_PRODUCERS; i++) {
                executor.submit(new NonPooledProducerTask(acf1, QUEUE2_NAME));
                Thread.sleep(1000);
                executor.submit(new NonPooledProducerTask(acf1, QUEUE1_NAME));
            }

            assertTrue(doneLatch.await(20, TimeUnit.SECONDS));
            executor.shutdownNow();

            Assert.assertEquals(MAX_PRODUCERS * NUM_MESSAGE_TO_SEND, messageCount.get());
        } finally {
            releaseResources(executor,
                    new DefaultMessageListenerContainer[] {container1, container2},
                    new BrokerService[] {brokerService1, brokerService2});
        }
    }

    /**
     * Releases everything a test may have started. Every step is attempted
     * even if an earlier one fails, and entries that were never created
     * ({@code null}) are skipped, so a failure during set-up is not replaced
     * by a {@link NullPointerException} and no broker is left bound to its
     * port for the next test.
     *
     * @param executor   producer executor; {@code shutdownNow()} may safely be called more than once
     * @param containers listener containers to stop and destroy; elements may be {@code null}
     * @param brokers    brokers to stop; elements may be {@code null}
     * @throws Exception the first failure raised while cleaning up, if any
     */
    private static void releaseResources(final ExecutorService executor,
            final DefaultMessageListenerContainer[] containers,
            final BrokerService[] brokers) throws Exception {
        Exception firstFailure = null;

        executor.shutdownNow();

        for (final DefaultMessageListenerContainer container : containers) {
            if (container == null) {
                continue;
            }
            try {
                container.stop();
            } catch (final Exception e) {
                firstFailure = (firstFailure != null) ? firstFailure : e;
            }
            try {
                container.destroy();
            } catch (final Exception e) {
                firstFailure = (firstFailure != null) ? firstFailure : e;
            }
        }

        for (final BrokerService broker : brokers) {
            if (broker == null) {
                continue;
            }
            try {
                broker.stop();
            } catch (final Exception e) {
                firstFailure = (firstFailure != null) ? firstFailure : e;
            }
        }

        if (firstFailure != null) {
            throw firstFailure;
        }
    }

    /**
     * Builds (but does not start) a non-persistent broker with a small memory
     * budget and a catch-all queue policy.
     *
     * @param brokerName name of the broker and prefix of its transport connector name
     * @param uri1       URI the broker's transport connector listens on
     * @param uri2       URI of a peer broker to bridge to via a static network
     *                   connector, or {@code null} for a stand-alone broker
     * @return the configured broker
     * @throws Exception if a URI is malformed or a connector cannot be added
     */
    private BrokerService createBrokerService(final String brokerName,
            final String uri1, final String uri2) throws Exception {
        final BrokerService brokerService = new BrokerService();

        brokerService.setBrokerName(brokerName);
        brokerService.setPersistent(false);
        brokerService.setUseJmx(true);

        final UsageManager memoryManager = new UsageManager();
        memoryManager.setLimit(5000000);
        brokerService.setMemoryManager(memoryManager);

        final List<PolicyEntry> policyEntries = new ArrayList<PolicyEntry>();

        final PolicyEntry entry = new PolicyEntry();
        // ">" applies the policy to every queue.
        entry.setQueue(">");
        // Deliberately tiny compared with the 2048-byte payloads the producers
        // send (limit is presumably in bytes - confirm against PolicyEntry docs).
        entry.setMemoryLimit(1000);
        policyEntries.add(entry);

        final PolicyMap policyMap = new PolicyMap();
        policyMap.setPolicyEntries(policyEntries);
        brokerService.setDestinationPolicy(policyMap);

        final TransportConnector tConnector = new TransportConnector();
        tConnector.setUri(new URI(uri1));
        tConnector.setBrokerName(brokerName);
        tConnector.setName(brokerName + ".transportConnector");
        brokerService.addConnector(tConnector);

        if (uri2 != null) {
            final NetworkConnector nc = new DiscoveryNetworkConnector(new URI("static:" + uri2));
            nc.setBridgeTempDestinations(true);
            nc.setBrokerName(brokerName);
            brokerService.addNetworkConnector(nc);
        }

        return brokerService;
    }

    /**
     * Creates an auto-acknowledging, non-transacted listener container with
     * {@link #MAX_CONSUMERS} concurrent consumers. The caller is responsible
     * for calling {@code afterPropertiesSet()} on the result.
     *
     * @param acf      connection factory the container consumes through
     * @param listener listener invoked for each message
     * @param queue    name of the destination to consume from
     * @return the configured container
     */
    public DefaultMessageListenerContainer createDefaultMessageListenerContainer(
            final ConnectionFactory acf, final MessageListener listener,
            final String queue) {
        final DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
        container.setConnectionFactory(acf);
        container.setDestinationName(queue);
        container.setMessageListener(listener);
        container.setSessionTransacted(false);
        container.setSessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE);
        container.setConcurrentConsumers(MAX_CONSUMERS);
        return container;
    }

    /**
     * Creates a connection factory for the given broker URL that uses
     * synchronous sends and asynchronous dispatch.
     *
     * @param url broker URL to connect to
     * @return the configured connection factory
     */
    public ActiveMQConnectionFactory createConnectionFactory(final String url) {
        final ActiveMQConnectionFactory acf = new ActiveMQConnectionFactory(url);
        acf.setCopyMessageOnSend(false);
        acf.setUseAsyncSend(false);
        acf.setDispatchAsync(true);
        acf.setUseCompression(false);
        acf.setOptimizeAcknowledge(false);
        acf.setOptimizedMessageDispatch(true);
        acf.setAlwaysSyncSend(true);
        return acf;
    }

    /**
     * Sends {@link #NUM_MESSAGE_TO_SEND} non-persistent 2048-byte messages to
     * a queue after a two-second delay. Shared by both producer tasks.
     *
     * @param cf          connection factory to send through
     * @param queueName   destination queue name
     * @param producerTag value of the {@code "producer"} string property set on every message
     * @param logPrefix   text printed before the running count after each send
     * @throws InterruptedException if the thread is interrupted during the initial delay
     */
    private static void sendMessages(final ConnectionFactory cf, final String queueName,
            final String producerTag, final String logPrefix) throws InterruptedException {
        final JmsTemplate jmsTemplate = new JmsTemplate(cf);
        jmsTemplate.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        jmsTemplate.setExplicitQosEnabled(true);
        jmsTemplate.setMessageIdEnabled(false);
        jmsTemplate.setMessageTimestampEnabled(false);
        jmsTemplate.afterPropertiesSet();

        final byte[] bytes = new byte[2048];
        final Random r = new Random();
        r.nextBytes(bytes);

        Thread.sleep(2000);

        final AtomicInteger count = new AtomicInteger();
        for (int i = 0; i < NUM_MESSAGE_TO_SEND; i++) {
            jmsTemplate.send(queueName, new MessageCreator() {

                public Message createMessage(Session session) throws JMSException {
                    final BytesMessage message = session.createBytesMessage();

                    message.writeBytes(bytes);
                    message.setIntProperty("count", count.incrementAndGet());
                    message.setStringProperty("producer", producerTag);
                    return message;
                }
            });

            System.out.println(logPrefix + count.get());
        }
    }

    /**
     * Listener that records each message on the enclosing test's counter and
     * latch, then sleeps to simulate a slow consumer.
     */
    private class TestMessageListener1 implements MessageListener {

        private final long waitTime;

        /**
         * @param waitTime milliseconds to sleep after each consumed message
         */
        public TestMessageListener1(long waitTime) {
            this.waitTime = waitTime;
        }

        public void onMessage(Message msg) {
            try {
                System.out.println("Listener1 Consumed message "+ msg.getIntProperty("count"));

                messageCount.incrementAndGet();
                doneLatch.countDown();

                Thread.sleep(waitTime);
            } catch (JMSException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                // Restore the flag so the owning container thread can see the interrupt.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
        }
    }

    /** Producer that sends through a {@link PooledConnectionFactory}. */
    private static class PooledProducerTask implements Runnable {

        private final String queueName;

        private final PooledConnectionFactory pcf;

        public PooledProducerTask(final PooledConnectionFactory pcf,
                final String queueName) {
            this.pcf = pcf;
            this.queueName = queueName;
        }

        public void run() {
            try {
                sendMessages(pcf, queueName, "pooled", "PooledProducer sent message: ");
            } catch (final Throwable e) {
                // Best effort: a failing producer is reported, the test outcome is
                // decided by the latch in the test method.
                System.err.println("Producer 1 is exiting.");
                e.printStackTrace();
            }
        }
    }

    /** Producer that sends through a plain (non-pooled) {@link ConnectionFactory}. */
    private static class NonPooledProducerTask implements Runnable {

        private final String queueName;

        private final ConnectionFactory cf;

        public NonPooledProducerTask(final ConnectionFactory cf,
                final String queueName) {
            this.cf = cf;
            this.queueName = queueName;
        }

        public void run() {
            try {
                sendMessages(cf, queueName, "non-pooled", "Non-PooledProducer sent message: ");
            } catch (final Throwable e) {
                // Best effort: a failing producer is reported, the test outcome is
                // decided by the latch in the test method.
                System.err.println("Producer 1 is exiting.");
                e.printStackTrace();
            }
        }
    }

}