1 18 package org.apache.activemq.advisory; 19 20 import javax.jms.Connection ; 21 import javax.jms.Destination ; 22 import javax.jms.JMSException ; 23 import javax.jms.Message ; 24 import javax.jms.MessageConsumer ; 25 import javax.jms.MessageListener ; 26 import javax.jms.Session ; 27 import org.apache.activemq.Service; 28 import org.apache.activemq.command.ActiveMQDestination; 29 import org.apache.activemq.command.ActiveMQMessage; 30 import org.apache.activemq.command.ActiveMQTopic; 31 import org.apache.activemq.command.ProducerId; 32 import org.apache.activemq.command.ProducerInfo; 33 import org.apache.activemq.command.RemoveInfo; 34 import org.apache.commons.logging.Log; 35 import org.apache.commons.logging.LogFactory; 36 import java.util.concurrent.atomic.AtomicBoolean ; 37 import java.util.concurrent.atomic.AtomicInteger ; 38 39 45 public class ProducerEventSource implements Service, MessageListener { 46 private static final Log log = LogFactory.getLog(ProducerEventSource.class); 47 48 private final Connection connection; 49 private final ActiveMQDestination destination; 50 private ProducerListener listener; 51 private AtomicBoolean started = new AtomicBoolean (false); 52 private AtomicInteger producerCount = new AtomicInteger (); 53 private Session session; 54 private MessageConsumer consumer; 55 56 public ProducerEventSource(Connection connection, Destination destination) throws JMSException { 57 this.connection = connection; 58 this.destination = ActiveMQDestination.transform(destination); 59 } 60 61 public void setProducerListener(ProducerListener listener) { 62 this.listener = listener; 63 } 64 65 public void start() throws Exception { 66 if (started.compareAndSet(false, true)) { 67 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 68 ActiveMQTopic advisoryTopic = AdvisorySupport.getProducerAdvisoryTopic(destination); 69 consumer = session.createConsumer(advisoryTopic); 70 consumer.setMessageListener(this); 71 } 72 } 73 74 public void stop() throws Exception { 75 if (started.compareAndSet(true, false)) { 76 if (session != null) { 77 session.close(); 78 } 79 } 80 } 81 82 public void onMessage(Message message) { 83 if (message instanceof ActiveMQMessage) { 84 ActiveMQMessage activeMessage = (ActiveMQMessage) message; 85 Object command = activeMessage.getDataStructure(); 86 int count = 0; 87 if (command instanceof ProducerInfo) { 88 count = producerCount.incrementAndGet(); 89 count = extractProducerCountFromMessage(message, count); 90 fireProducerEvent(new ProducerStartedEvent(this, destination, (ProducerInfo) command, count)); 91 } 92 else if (command instanceof RemoveInfo) { 93 RemoveInfo removeInfo = (RemoveInfo) command; 94 if (removeInfo.isProducerRemove()) { 95 count = producerCount.decrementAndGet(); 96 count = extractProducerCountFromMessage(message, count); 97 fireProducerEvent(new ProducerStoppedEvent(this, destination, (ProducerId) removeInfo.getObjectId(), count)); 98 } 99 } 100 else { 101 log.warn("Unknown command: " + command); 102 } 103 } 104 else { 105 log.warn("Unknown message type: " + message + ". Message ignored"); 106 } 107 } 108 109 protected int extractProducerCountFromMessage(Message message, int count) { 110 try { 111 Object value = message.getObjectProperty("producerCount"); 112 if (value instanceof Number ) { 113 Number n = (Number ) value; 114 return n.intValue(); 115 } 116 log.warn("No producerCount header available on the message: " + message); 117 } 118 catch (Exception e) { 119 log.warn("Failed to extract producerCount from message: " + message + ".Reason: " + e, e); 120 } 121 return count; 122 } 123 124 protected void fireProducerEvent(ProducerEvent event) { 125 if (listener != null) { 126 listener.onProducerEvent(event); 127 } 128 } 129 130 } 131 | Popular Tags |