1 18 package org.apache.activemq.advisory; 19 20 import java.util.concurrent.ArrayBlockingQueue ; 21 import java.util.concurrent.BlockingQueue ; 22 import java.util.concurrent.TimeUnit ; 23 24 import org.apache.activemq.EmbeddedBrokerTestSupport; 25 import org.apache.activemq.advisory.ConsumerEvent; 26 import org.apache.activemq.advisory.ConsumerEventSource; 27 import org.apache.activemq.advisory.ConsumerListener; 28 29 import javax.jms.Connection ; 30 import javax.jms.JMSException ; 31 import javax.jms.Message ; 32 import javax.jms.MessageConsumer ; 33 import javax.jms.MessageListener ; 34 import javax.jms.Session ; 35 36 40 public class ConsumerListenerTest extends EmbeddedBrokerTestSupport implements ConsumerListener { 41 42 protected Session consumerSession1; 43 protected Session consumerSession2; 44 protected int consumerCounter; 45 protected ConsumerEventSource consumerEventSource; 46 protected BlockingQueue eventQueue = new ArrayBlockingQueue (1000); 47 private Connection connection; 48 49 public void testConsumerEvents() throws Exception { 50 consumerEventSource.start(); 51 52 consumerSession1 = createConsumer(); 53 assertConsumerEvent(1, true); 54 55 consumerSession2 = createConsumer(); 56 assertConsumerEvent(2, true); 57 58 consumerSession1.close(); 59 consumerSession1 = null; 60 assertConsumerEvent(1, false); 61 62 consumerSession2.close(); 63 consumerSession2 = null; 64 assertConsumerEvent(0, false); 65 } 66 67 public void testListenWhileAlreadyConsumersActive() throws Exception { 68 consumerSession1 = createConsumer(); 69 consumerSession2 = createConsumer(); 70 71 consumerEventSource.start(); 72 assertConsumerEvent(2, true); 73 assertConsumerEvent(2, true); 74 75 consumerSession1.close(); 76 consumerSession1 = null; 77 assertConsumerEvent(1, false); 78 79 consumerSession2.close(); 80 consumerSession2 = null; 81 assertConsumerEvent(0, false); 82 } 83 84 public void onConsumerEvent(ConsumerEvent event) { 85 eventQueue.add(event); 86 } 87 88 protected void setUp() throws Exception { 89 super.setUp(); 90 91 connection = createConnection(); 92 connection.start(); 93 consumerEventSource = new ConsumerEventSource(connection, destination); 94 consumerEventSource.setConsumerListener(this); 95 } 96 97 protected void tearDown() throws Exception { 98 if (consumerEventSource != null) { 99 consumerEventSource.stop(); 100 } 101 if (consumerSession2 != null) { 102 consumerSession2.close(); 103 } 104 if (consumerSession1 != null) { 105 consumerSession1.close(); 106 } 107 if (connection != null) { 108 connection.close(); 109 } 110 super.tearDown(); 111 } 112 113 protected void assertConsumerEvent(int count, boolean started) throws InterruptedException { 114 ConsumerEvent event = waitForConsumerEvent(); 115 assertEquals("Consumer count", count, event.getConsumerCount()); 116 assertEquals("started", started, event.isStarted()); 117 } 118 119 protected Session createConsumer() throws JMSException { 120 final String consumerText = "Consumer: " + (++consumerCounter); 121 log.info("Creating consumer: " + consumerText + " on destination: " + destination); 122 123 Session answer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 124 MessageConsumer consumer = answer.createConsumer(destination); 125 consumer.setMessageListener(new MessageListener () { 126 public void onMessage(Message message) { 127 log.info("Received message by: " + consumerText + " message: " + message); 128 } 129 }); 130 return answer; 131 } 132 133 protected ConsumerEvent waitForConsumerEvent() throws InterruptedException { 134 ConsumerEvent answer = (ConsumerEvent) eventQueue.poll(100000, TimeUnit.MILLISECONDS); 135 assertTrue("Should have received a consumer event!", answer != null); 136 return answer; 137 } 138 139 } 140 | Popular Tags |