KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > advisory > ConsumerEventSource


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 package org.apache.activemq.advisory;
19
20 import java.util.concurrent.atomic.AtomicBoolean JavaDoc;
21 import java.util.concurrent.atomic.AtomicInteger JavaDoc;
22
23 import org.apache.activemq.Service;
24 import org.apache.activemq.command.ActiveMQDestination;
25 import org.apache.activemq.command.ActiveMQMessage;
26 import org.apache.activemq.command.ActiveMQTopic;
27 import org.apache.activemq.command.ConsumerId;
28 import org.apache.activemq.command.ConsumerInfo;
29 import org.apache.activemq.command.RemoveInfo;
30 import org.apache.commons.logging.Log;
31 import org.apache.commons.logging.LogFactory;
32
33 import javax.jms.Connection JavaDoc;
34 import javax.jms.Destination JavaDoc;
35 import javax.jms.JMSException JavaDoc;
36 import javax.jms.Message JavaDoc;
37 import javax.jms.MessageConsumer JavaDoc;
38 import javax.jms.MessageListener JavaDoc;
39 import javax.jms.Session JavaDoc;
40
41 /**
42  * An object which can be used to listen to the number of active consumers
43  * available on a given destination.
44  *
45  * @version $Revision: 475999 $
46  */

47 public class ConsumerEventSource implements Service, MessageListener JavaDoc {
48     private static final Log log = LogFactory.getLog(ConsumerEventSource.class);
49
50     private final Connection JavaDoc connection;
51     private final ActiveMQDestination destination;
52     private ConsumerListener listener;
53     private AtomicBoolean JavaDoc started = new AtomicBoolean JavaDoc(false);
54     private AtomicInteger JavaDoc consumerCount = new AtomicInteger JavaDoc();
55     private Session JavaDoc session;
56     private MessageConsumer JavaDoc consumer;
57
58     public ConsumerEventSource(Connection JavaDoc connection, Destination destination) throws JMSException JavaDoc {
59         this.connection = connection;
60         this.destination = ActiveMQDestination.transform(destination);
61     }
62
63     public void setConsumerListener(ConsumerListener listener) {
64         this.listener = listener;
65     }
66
67     public void start() throws Exception JavaDoc {
68         if (started.compareAndSet(false, true)) {
69             session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
70             ActiveMQTopic advisoryTopic = AdvisorySupport.getConsumerAdvisoryTopic(destination);
71             consumer = session.createConsumer(advisoryTopic);
72             consumer.setMessageListener(this);
73         }
74     }
75
76     public void stop() throws Exception JavaDoc {
77         if (started.compareAndSet(true, false)) {
78             if (session != null) {
79                 session.close();
80             }
81         }
82     }
83
84     public void onMessage(Message message) {
85         if (message instanceof ActiveMQMessage) {
86             ActiveMQMessage activeMessage = (ActiveMQMessage) message;
87             Object JavaDoc command = activeMessage.getDataStructure();
88             int count = 0;
89             if (command instanceof ConsumerInfo) {
90                 count = consumerCount.incrementAndGet();
91                 count = extractConsumerCountFromMessage(message, count);
92                 fireConsumerEvent(new ConsumerStartedEvent(this, destination, (ConsumerInfo) command, count));
93             }
94             else if (command instanceof RemoveInfo) {
95                 RemoveInfo removeInfo = (RemoveInfo) command;
96                 if (removeInfo.isConsumerRemove()) {
97                     count = consumerCount.decrementAndGet();
98                     count = extractConsumerCountFromMessage(message, count);
99                     fireConsumerEvent(new ConsumerStoppedEvent(this, destination, (ConsumerId) removeInfo.getObjectId(), count));
100                 }
101             }
102             else {
103                 log.warn("Unknown command: " + command);
104             }
105         }
106         else {
107             log.warn("Unknown message type: " + message + ". Message ignored");
108         }
109     }
110
111     /**
112      * Lets rely by default on the broker telling us what the consumer count is
113      * as it can ensure that we are up to date at all times and have not
114      * received messages out of order etc.
115      */

116     protected int extractConsumerCountFromMessage(Message message, int count) {
117         try {
118             Object JavaDoc value = message.getObjectProperty("consumerCount");
119             if (value instanceof Number JavaDoc) {
120                 Number JavaDoc n = (Number JavaDoc) value;
121                 return n.intValue();
122             }
123             log.warn("No consumerCount header available on the message: " + message);
124         }
125         catch (Exception JavaDoc e) {
126             log.warn("Failed to extract consumerCount from message: " + message + ".Reason: " + e, e);
127         }
128         return count;
129     }
130
131     protected void fireConsumerEvent(ConsumerEvent event) {
132         if (listener != null) {
133             listener.onConsumerEvent(event);
134         }
135     }
136
137 }
138
Popular Tags