package org.apache.activemq.advisory;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

import org.apache.activemq.EmbeddedBrokerTestSupport;
import org.apache.activemq.advisory.ConsumerEvent;
import org.apache.activemq.advisory.ConsumerEventSource;
import org.apache.activemq.advisory.ConsumerListener;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;

/**
 * Verifies that a {@link ProducerEventSource} notifies its {@link ProducerListener}
 * when producers on the test destination are created and when their sessions are closed.
 *
 * <p>The test registers itself as the listener; every received {@link ProducerEvent} is
 * pushed onto {@link #eventQueue} and later pulled off by the assertion helpers.
 *
 * <p>NOTE(review): several member names still say "consumer" although they hold or check
 * producer-related state. They are left unchanged because they are {@code protected} and
 * may be referenced by subclasses.
 */
public class ProducerListenerTest extends EmbeddedBrokerTestSupport implements ProducerListener {

    /** Maximum time, in milliseconds, to wait for a single producer event. */
    private static final long EVENT_TIMEOUT_MILLIS = 100000;

    /** Session owning the first producer; {@code null} once closed by a test. */
    protected Session consumerSession1;
    /** Session owning the second producer; {@code null} once closed by a test. */
    protected Session consumerSession2;
    /** Number of producers created so far; only used to label log output. */
    protected int consumerCounter;
    /** Event source under test; created in {@link #setUp()} and started by each test. */
    protected ProducerEventSource producerEventSource;
    /** Events handed to {@link #onProducerEvent(ProducerEvent)}, in arrival order. */
    protected BlockingQueue<ProducerEvent> eventQueue = new ArrayBlockingQueue<ProducerEvent>(1000);
    private Connection connection;

    /**
     * Starts listening first, then creates two producers and closes them again,
     * expecting the reported producer count to go 1, 2, 1, 0.
     */
    public void testProducerEvents() throws Exception {
        producerEventSource.start();

        consumerSession1 = createProducer();
        assertConsumerEvent(1, true);

        consumerSession2 = createProducer();
        assertConsumerEvent(2, true);

        consumerSession1.close();
        consumerSession1 = null;
        assertConsumerEvent(1, false);

        consumerSession2.close();
        consumerSession2 = null;
        assertConsumerEvent(0, false);
    }

    /**
     * Creates two producers before the event source is started. After starting, two
     * "started" events are expected, each already reporting a producer count of 2,
     * followed by the usual 1 and 0 counts as the sessions are closed.
     */
    public void testListenWhileAlreadyConsumersActive() throws Exception {
        consumerSession1 = createProducer();
        consumerSession2 = createProducer();

        producerEventSource.start();
        assertConsumerEvent(2, true);
        assertConsumerEvent(2, true);

        consumerSession1.close();
        consumerSession1 = null;
        assertConsumerEvent(1, false);

        consumerSession2.close();
        consumerSession2 = null;
        assertConsumerEvent(0, false);
    }

    /**
     * Listener callback: records the event so the test thread can assert on it.
     *
     * @param event the event delivered by the event source
     */
    public void onProducerEvent(ProducerEvent event) {
        eventQueue.add(event);
    }

    /**
     * Opens and starts a connection and wires this test in as the listener of a new
     * {@link ProducerEventSource} for the test destination. The source itself is not
     * started here; each test decides when to start it.
     */
    @Override
    protected void setUp() throws Exception {
        super.setUp();

        connection = createConnection();
        connection.start();
        producerEventSource = new ProducerEventSource(connection, destination);
        producerEventSource.setProducerListener(this);
    }

    /**
     * Stops the event source and closes any sessions and the connection that are still open.
     * {@code super.tearDown()} runs even if one of those cleanup calls throws.
     */
    @Override
    protected void tearDown() throws Exception {
        try {
            if (producerEventSource != null) {
                producerEventSource.stop();
            }
            if (consumerSession2 != null) {
                consumerSession2.close();
            }
            if (consumerSession1 != null) {
                consumerSession1.close();
            }
            if (connection != null) {
                connection.close();
            }
        } finally {
            super.tearDown();
        }
    }

    /**
     * Waits for the next producer event and checks its producer count and started flag.
     * Despite its name, this method asserts on a {@link ProducerEvent}.
     *
     * @param count the expected value of {@code event.getProducerCount()}
     * @param started the expected value of {@code event.isStarted()}
     * @throws InterruptedException if interrupted while waiting for the event
     */
    protected void assertConsumerEvent(int count, boolean started) throws InterruptedException {
        ProducerEvent event = waitForProducerEvent();
        assertEquals("Producer count", count, event.getProducerCount());
        assertEquals("started", started, event.isStarted());
    }

    /**
     * Creates a new auto-acknowledge session on the shared connection and a producer on
     * the test destination within it.
     *
     * @return the session that owns the new producer; closing it is how the tests
     *         trigger the "stopped" event
     * @throws JMSException if the session or producer cannot be created
     */
    protected Session createProducer() throws JMSException {
        final String producerText = "Producer: " + (++consumerCounter);
        log.info("Creating producer: " + producerText + " on destination: " + destination);

        Session answer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // The producer reference is not needed; it only has to exist in the session.
        answer.createProducer(destination);
        return answer;
    }

    /**
     * Blocks until the listener has received an event, failing the test if none arrives
     * within {@link #EVENT_TIMEOUT_MILLIS}.
     *
     * @return the next queued event, never {@code null}
     * @throws InterruptedException if interrupted while waiting
     */
    protected ProducerEvent waitForProducerEvent() throws InterruptedException {
        ProducerEvent answer = eventQueue.poll(EVENT_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
        assertNotNull("Should have received a producer event!", answer);
        return answer;
    }

}