KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > advisory > ProducerEventSource


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 package org.apache.activemq.advisory;
19
20 import javax.jms.Connection JavaDoc;
21 import javax.jms.Destination JavaDoc;
22 import javax.jms.JMSException JavaDoc;
23 import javax.jms.Message JavaDoc;
24 import javax.jms.MessageConsumer JavaDoc;
25 import javax.jms.MessageListener JavaDoc;
26 import javax.jms.Session JavaDoc;
27 import org.apache.activemq.Service;
28 import org.apache.activemq.command.ActiveMQDestination;
29 import org.apache.activemq.command.ActiveMQMessage;
30 import org.apache.activemq.command.ActiveMQTopic;
31 import org.apache.activemq.command.ProducerId;
32 import org.apache.activemq.command.ProducerInfo;
33 import org.apache.activemq.command.RemoveInfo;
34 import org.apache.commons.logging.Log;
35 import org.apache.commons.logging.LogFactory;
36 import java.util.concurrent.atomic.AtomicBoolean JavaDoc;
37 import java.util.concurrent.atomic.AtomicInteger JavaDoc;
38
39 /**
40  * An object which can be used to listen to the number of active consumers
41  * available on a given destination.
42  *
43  * @version $Revision: 359679 $
44  */

45 public class ProducerEventSource implements Service, MessageListener JavaDoc {
46     private static final Log log = LogFactory.getLog(ProducerEventSource.class);
47
48     private final Connection JavaDoc connection;
49     private final ActiveMQDestination destination;
50     private ProducerListener listener;
51     private AtomicBoolean JavaDoc started = new AtomicBoolean JavaDoc(false);
52     private AtomicInteger JavaDoc producerCount = new AtomicInteger JavaDoc();
53     private Session JavaDoc session;
54     private MessageConsumer JavaDoc consumer;
55
56     public ProducerEventSource(Connection JavaDoc connection, Destination JavaDoc destination) throws JMSException JavaDoc {
57         this.connection = connection;
58         this.destination = ActiveMQDestination.transform(destination);
59     }
60
61     public void setProducerListener(ProducerListener listener) {
62         this.listener = listener;
63     }
64
65     public void start() throws Exception JavaDoc {
66         if (started.compareAndSet(false, true)) {
67             session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
68             ActiveMQTopic advisoryTopic = AdvisorySupport.getProducerAdvisoryTopic(destination);
69             consumer = session.createConsumer(advisoryTopic);
70             consumer.setMessageListener(this);
71         }
72     }
73
74     public void stop() throws Exception JavaDoc {
75         if (started.compareAndSet(true, false)) {
76             if (session != null) {
77                 session.close();
78             }
79         }
80     }
81
82     public void onMessage(Message JavaDoc message) {
83         if (message instanceof ActiveMQMessage) {
84             ActiveMQMessage activeMessage = (ActiveMQMessage) message;
85             Object JavaDoc command = activeMessage.getDataStructure();
86             int count = 0;
87             if (command instanceof ProducerInfo) {
88                 count = producerCount.incrementAndGet();
89                 count = extractProducerCountFromMessage(message, count);
90                 fireProducerEvent(new ProducerStartedEvent(this, destination, (ProducerInfo) command, count));
91             }
92             else if (command instanceof RemoveInfo) {
93                 RemoveInfo removeInfo = (RemoveInfo) command;
94                 if (removeInfo.isProducerRemove()) {
95                     count = producerCount.decrementAndGet();
96                     count = extractProducerCountFromMessage(message, count);
97                     fireProducerEvent(new ProducerStoppedEvent(this, destination, (ProducerId) removeInfo.getObjectId(), count));
98                 }
99             }
100             else {
101                 log.warn("Unknown command: " + command);
102             }
103         }
104         else {
105             log.warn("Unknown message type: " + message + ". Message ignored");
106         }
107     }
108
109     protected int extractProducerCountFromMessage(Message JavaDoc message, int count) {
110         try {
111             Object JavaDoc value = message.getObjectProperty("producerCount");
112             if (value instanceof Number JavaDoc) {
113                 Number JavaDoc n = (Number JavaDoc) value;
114                 return n.intValue();
115             }
116             log.warn("No producerCount header available on the message: " + message);
117         }
118         catch (Exception JavaDoc e) {
119             log.warn("Failed to extract producerCount from message: " + message + ".Reason: " + e, e);
120         }
121         return count;
122     }
123
124     protected void fireProducerEvent(ProducerEvent event) {
125         if (listener != null) {
126             listener.onProducerEvent(event);
127         }
128     }
129
130 }
131
Popular Tags