package org.apache.activemq.advisory;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.Topic;

import org.apache.activemq.EmbeddedBrokerTestSupport;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.command.ActiveMQTempQueue;
import org.apache.activemq.command.ActiveMQTempTopic;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * Verifies that deleting a temporary destination also removes the consumer
 * advisory topic that was created for it.
 *
 * <p>Each test creates a consumer on a temporary topic or queue, waits for the
 * resulting {@link ConsumerEvent}, checks that the advisory topic is registered
 * in one of the broker's regions, then deletes the temporary destination and
 * checks that the advisory topic is no longer registered.
 */
public class TempDestDeleteTest extends EmbeddedBrokerTestSupport implements ConsumerListener {

    /** Maximum number of consumer events buffered between listener and test thread. */
    private static final int EVENT_QUEUE_CAPACITY = 1000;

    /** How long {@link #waitForConsumerEvent()} waits before failing the test. */
    private static final long EVENT_TIMEOUT_MILLIS = 1000;

    protected int consumerCounter;
    protected ConsumerEventSource topicConsumerEventSource;
    private ConsumerEventSource queueConsumerEventSource;

    /** Events handed over by {@link #onConsumerEvent(ConsumerEvent)}, consumed by the test thread. */
    protected BlockingQueue<ConsumerEvent> eventQueue =
            new ArrayBlockingQueue<ConsumerEvent>(EVENT_QUEUE_CAPACITY);
    private Connection connection;
    private Session session;
    private ActiveMQTempTopic tempTopic;
    private ActiveMQTempQueue tempQueue;

    /**
     * Deleting a temporary topic must remove its consumer advisory topic.
     *
     * @throws Exception if any JMS or broker operation fails
     */
    public void testDeleteTempTopicDeletesAvisoryTopics() throws Exception {
        topicConsumerEventSource.start();

        MessageConsumer consumer = createConsumer(tempTopic);
        assertConsumerEvent(1, true);

        Topic advisoryTopic = AdvisorySupport.getConsumerAdvisoryTopic(tempTopic);
        assertTrue(destinationExists(advisoryTopic));

        // The consumer is closed before the destination is deleted.
        consumer.close();

        tempTopic.delete();

        assertFalse(destinationExists(advisoryTopic));
    }

    /**
     * Deleting a temporary queue must remove its consumer advisory topic.
     *
     * @throws Exception if any JMS or broker operation fails
     */
    public void testDeleteTempQueueDeletesAvisoryTopics() throws Exception {
        queueConsumerEventSource.start();

        MessageConsumer consumer = createConsumer(tempQueue);
        assertConsumerEvent(1, true);

        Topic advisoryTopic = AdvisorySupport.getConsumerAdvisoryTopic(tempQueue);
        assertTrue(destinationExists(advisoryTopic));

        // The consumer is closed before the destination is deleted.
        consumer.close();

        tempQueue.delete();

        assertFalse(destinationExists(advisoryTopic));
    }

    /**
     * Checks whether the destination is a key in the destination map of any of
     * the broker's four regions (topic, queue, temp topic, temp queue).
     *
     * @param dest the destination to look up
     * @return {@code true} if at least one region's destination map contains {@code dest}
     * @throws Exception if the region broker cannot be obtained
     */
    private boolean destinationExists(Destination dest) throws Exception {
        RegionBroker rb = (RegionBroker) broker.getBroker().getAdaptor(RegionBroker.class);
        return rb.getTopicRegion().getDestinationMap().containsKey(dest)
                || rb.getQueueRegion().getDestinationMap().containsKey(dest)
                || rb.getTempTopicRegion().getDestinationMap().containsKey(dest)
                || rb.getTempQueueRegion().getDestinationMap().containsKey(dest);
    }

    /**
     * Listener callback: queues the event so the test thread can pick it up in
     * {@link #waitForConsumerEvent()}.
     *
     * @param event the consumer event that was delivered
     */
    public void onConsumerEvent(ConsumerEvent event) {
        eventQueue.add(event);
    }

    /**
     * Opens and starts a connection, creates a session, and creates one
     * temporary topic and one temporary queue, each with a
     * {@link ConsumerEventSource} that reports to this test. The event sources
     * are created here but only started by the individual tests.
     *
     * @throws Exception if the superclass set-up or any JMS call fails
     */
    @Override
    protected void setUp() throws Exception {
        super.setUp();
        connection = createConnection();
        connection.start();

        // Non-transacted session with an explicit acknowledge mode. The literal 0
        // used previously is the value of Session.SESSION_TRANSACTED, which is not
        // a legal acknowledge mode for a non-transacted session.
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        tempTopic = (ActiveMQTempTopic) session.createTemporaryTopic();
        topicConsumerEventSource = new ConsumerEventSource(connection, tempTopic);
        topicConsumerEventSource.setConsumerListener(this);

        tempQueue = (ActiveMQTempQueue) session.createTemporaryQueue();
        queueConsumerEventSource = new ConsumerEventSource(connection, tempQueue);
        queueConsumerEventSource.setConsumerListener(this);
    }

    /**
     * Closes the connection (if one was opened) and then runs the superclass
     * tear-down.
     *
     * @throws Exception if closing the connection or the superclass tear-down fails
     */
    @Override
    protected void tearDown() throws Exception {
        try {
            if (connection != null) {
                connection.close();
            }
        } finally {
            // Always run the superclass tear-down, even when close() throws.
            super.tearDown();
        }
    }

    /**
     * Waits for the next consumer event and checks its consumer count and
     * started flag.
     *
     * @param count   expected value of {@link ConsumerEvent#getConsumerCount()}
     * @param started expected value of {@link ConsumerEvent#isStarted()}
     * @throws InterruptedException if interrupted while waiting for the event
     */
    protected void assertConsumerEvent(int count, boolean started) throws InterruptedException {
        ConsumerEvent event = waitForConsumerEvent();
        assertEquals("Consumer count", count, event.getConsumerCount());
        assertEquals("started", started, event.isStarted());
    }

    /**
     * Creates a consumer on the given destination with a listener that only
     * logs each received message. Increments {@link #consumerCounter} to build
     * the label used in the log output.
     *
     * @param dest the destination to consume from
     * @return the newly created consumer
     * @throws JMSException if the consumer cannot be created or the listener cannot be set
     */
    protected MessageConsumer createConsumer(Destination dest) throws JMSException {
        final String consumerText = "Consumer: " + (++consumerCounter);
        log.info("Creating consumer: " + consumerText + " on destination: " + dest);

        MessageConsumer consumer = session.createConsumer(dest);
        consumer.setMessageListener(new MessageListener() {
            public void onMessage(Message message) {
                log.info("Received message by: " + consumerText + " message: " + message);
            }
        });
        return consumer;
    }

    /**
     * Waits up to {@link #EVENT_TIMEOUT_MILLIS} milliseconds for a consumer
     * event to arrive on {@link #eventQueue}; fails the test if none arrives.
     *
     * @return the received event, never {@code null}
     * @throws InterruptedException if interrupted while waiting
     */
    protected ConsumerEvent waitForConsumerEvent() throws InterruptedException {
        ConsumerEvent answer = eventQueue.poll(EVENT_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
        assertNotNull("Should have received a consumer event!", answer);
        return answer;
    }

}