1 18 package org.apache.activemq.advisory; 19 20 import java.util.concurrent.atomic.AtomicBoolean ; 21 import java.util.concurrent.atomic.AtomicInteger ; 22 23 import org.apache.activemq.Service; 24 import org.apache.activemq.command.ActiveMQDestination; 25 import org.apache.activemq.command.ActiveMQMessage; 26 import org.apache.activemq.command.ActiveMQTopic; 27 import org.apache.activemq.command.ConsumerId; 28 import org.apache.activemq.command.ConsumerInfo; 29 import org.apache.activemq.command.RemoveInfo; 30 import org.apache.commons.logging.Log; 31 import org.apache.commons.logging.LogFactory; 32 33 import javax.jms.Connection ; 34 import javax.jms.Destination ; 35 import javax.jms.JMSException ; 36 import javax.jms.Message ; 37 import javax.jms.MessageConsumer ; 38 import javax.jms.MessageListener ; 39 import javax.jms.Session ; 40 41 47 public class ConsumerEventSource implements Service, MessageListener { 48 private static final Log log = LogFactory.getLog(ConsumerEventSource.class); 49 50 private final Connection connection; 51 private final ActiveMQDestination destination; 52 private ConsumerListener listener; 53 private AtomicBoolean started = new AtomicBoolean (false); 54 private AtomicInteger consumerCount = new AtomicInteger (); 55 private Session session; 56 private MessageConsumer consumer; 57 58 public ConsumerEventSource(Connection connection, Destination destination) throws JMSException { 59 this.connection = connection; 60 this.destination = ActiveMQDestination.transform(destination); 61 } 62 63 public void setConsumerListener(ConsumerListener listener) { 64 this.listener = listener; 65 } 66 67 public void start() throws Exception { 68 if (started.compareAndSet(false, true)) { 69 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 70 ActiveMQTopic advisoryTopic = AdvisorySupport.getConsumerAdvisoryTopic(destination); 71 consumer = session.createConsumer(advisoryTopic); 72 consumer.setMessageListener(this); 73 } 74 } 75 76 public void stop() throws Exception { 77 if (started.compareAndSet(true, false)) { 78 if (session != null) { 79 session.close(); 80 } 81 } 82 } 83 84 public void onMessage(Message message) { 85 if (message instanceof ActiveMQMessage) { 86 ActiveMQMessage activeMessage = (ActiveMQMessage) message; 87 Object command = activeMessage.getDataStructure(); 88 int count = 0; 89 if (command instanceof ConsumerInfo) { 90 count = consumerCount.incrementAndGet(); 91 count = extractConsumerCountFromMessage(message, count); 92 fireConsumerEvent(new ConsumerStartedEvent(this, destination, (ConsumerInfo) command, count)); 93 } 94 else if (command instanceof RemoveInfo) { 95 RemoveInfo removeInfo = (RemoveInfo) command; 96 if (removeInfo.isConsumerRemove()) { 97 count = consumerCount.decrementAndGet(); 98 count = extractConsumerCountFromMessage(message, count); 99 fireConsumerEvent(new ConsumerStoppedEvent(this, destination, (ConsumerId) removeInfo.getObjectId(), count)); 100 } 101 } 102 else { 103 log.warn("Unknown command: " + command); 104 } 105 } 106 else { 107 log.warn("Unknown message type: " + message + ". Message ignored"); 108 } 109 } 110 111 116 protected int extractConsumerCountFromMessage(Message message, int count) { 117 try { 118 Object value = message.getObjectProperty("consumerCount"); 119 if (value instanceof Number ) { 120 Number n = (Number ) value; 121 return n.intValue(); 122 } 123 log.warn("No consumerCount header available on the message: " + message); 124 } 125 catch (Exception e) { 126 log.warn("Failed to extract consumerCount from message: " + message + ".Reason: " + e, e); 127 } 128 return count; 129 } 130 131 protected void fireConsumerEvent(ConsumerEvent event) { 132 if (listener != null) { 133 listener.onConsumerEvent(event); 134 } 135 } 136 137 } 138 | Popular Tags |