package org.apache.activemq.usecases;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueReceiver;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.Topic;

import junit.framework.TestCase;

/**
 * Exercises creation of temporary destinations, senders and receivers on
 * connections that have not been started yet.
 * <p>
 * Each test runs against a non-persistent {@link BrokerService} that is
 * started in {@link #setUp()} on {@link #bindAddress} and stopped again in
 * {@link #tearDown()}.
 */
public class CreateTemporaryQueueBeforeStartTest extends TestCase {

    /** Upper bound, in milliseconds, for waiting on the concurrent consumers. */
    private static final long MAX_WAIT_MILLIS = 20000L;

    /** Address the embedded broker listens on and clients connect to. */
    protected String bindAddress = "tcp://localhost:61621";

    /** Connection owned by the running test; closed in {@link #tearDown()} when set. */
    private Connection connection;

    private final BrokerService broker = new BrokerService();

    /**
     * Creates a temporary queue and a temporary topic on a session whose
     * connection is never started.
     */
    public void testCreateTemporaryQueue() throws Exception {
        connection = createConnection();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        Queue queue = session.createTemporaryQueue();
        assertNotNull("No queue created!", queue);

        Topic topic = session.createTemporaryTopic();
        assertNotNull("No topic created!", topic);
    }

    /**
     * Creates a sender without a destination and a receiver on a temporary
     * queue, and only then starts the connection. The test passes when none
     * of these calls throws.
     */
    public void testTryToReproduceNullPointerBug() throws Exception {
        QueueConnection queueConnection = createConnection();
        // Hand ownership to the fixture so tearDown() closes it.
        this.connection = queueConnection;

        QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
        // A null queue is passed on purpose: the sender is created without a destination.
        QueueSender sender = session.createSender(null);
        Queue receiverQueue = session.createTemporaryQueue();
        QueueReceiver receiver = session.createReceiver(receiverQueue);
        queueConnection.start();
    }

    /**
     * Starts 20 threads that each open their own connection, create a
     * receiver on a fresh temporary queue and then start the connection.
     * The test succeeds when every thread completes without an exception
     * within {@link #MAX_WAIT_MILLIS}.
     */
    public void testTemporaryQueueConsumer() throws Exception {
        final int consumerCount = 20;
        final AtomicInteger succeeded = new AtomicInteger(0);
        final CountDownLatch finished = new CountDownLatch(consumerCount);
        // Every connection a worker opens is recorded here so it can be closed afterwards.
        final List<QueueConnection> opened =
            Collections.synchronizedList(new ArrayList<QueueConnection>());

        try {
            for (int i = 0; i < consumerCount; i++) {
                Thread worker = new Thread(new Runnable() {
                    public void run() {
                        try {
                            QueueConnection consumerConnection = createConnection();
                            opened.add(consumerConnection);
                            QueueSession session =
                                consumerConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
                            Queue queue = session.createTemporaryQueue();
                            QueueReceiver consumer = session.createReceiver(queue);
                            consumerConnection.start();
                            succeeded.incrementAndGet();
                        } catch (Exception ex) {
                            ex.printStackTrace();
                        } finally {
                            // Signal on failure too, so the test does not sit out the
                            // full timeout when a worker has already given up.
                            finished.countDown();
                        }
                    }
                });
                worker.start();
            }

            finished.await(MAX_WAIT_MILLIS, TimeUnit.MILLISECONDS);
            assertEquals("Unexpected count: " + succeeded, consumerCount, succeeded.get());
        } finally {
            closeAll(opened);
        }
    }

    /**
     * Closes every connection in the given list, continuing past individual
     * failures so that one bad connection does not keep the others open.
     */
    private static void closeAll(List<QueueConnection> connections) {
        // toArray() takes a snapshot; a worker that is still running after a
        // timeout may add to the list concurrently.
        QueueConnection[] snapshot = connections.toArray(new QueueConnection[0]);
        for (int i = 0; i < snapshot.length; i++) {
            try {
                snapshot[i].close();
            } catch (JMSException ex) {
                ex.printStackTrace();
            }
        }
    }

    /** Opens a new, not yet started queue connection using {@link #createConnectionFactory()}. */
    protected QueueConnection createConnection() throws Exception {
        ActiveMQConnectionFactory factory = createConnectionFactory();
        return factory.createQueueConnection();
    }

    /** Builds a connection factory that targets {@link #bindAddress}. */
    protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
        return new ActiveMQConnectionFactory(bindAddress);
    }

    /** Configures the broker as non-persistent, binds it to {@link #bindAddress} and starts it. */
    @Override
    protected void setUp() throws Exception {
        broker.setPersistent(false);
        broker.addConnector(bindAddress);
        broker.start();
        super.setUp();
    }

    /**
     * Closes the test's connection, if any, and stops the broker. The broker
     * is stopped even when closing the connection fails, so its connector
     * does not stay bound for the following tests.
     */
    @Override
    protected void tearDown() throws Exception {
        try {
            if (connection != null) {
                connection.close();
                connection = null;
            }
        } finally {
            try {
                broker.stop();
            } finally {
                super.tearDown();
            }
        }
    }
}