KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > advisory > AdvisoryBroker


/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.activemq.advisory;

import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerFilter;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.DestinationInfo;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.util.LongSequenceGenerator;
43
44 /**
45  * This broker filter handles tracking the state of the broker for purposes of publishing advisory messages
46  * to advisory consumers.
47  *
48  * @version $Revision$
49  */

50 public class AdvisoryBroker extends BrokerFilter {
51     
52     //private static final Log log = LogFactory.getLog(AdvisoryBroker.class);
53

54     protected final ConcurrentHashMap JavaDoc connections = new ConcurrentHashMap JavaDoc();
55     protected final ConcurrentHashMap JavaDoc consumers = new ConcurrentHashMap JavaDoc();
56     protected final ConcurrentHashMap JavaDoc producers = new ConcurrentHashMap JavaDoc();
57     protected final ConcurrentHashMap JavaDoc destinations = new ConcurrentHashMap JavaDoc();
58
59     static final private IdGenerator idGenerator = new IdGenerator();
60     protected final ProducerId advisoryProducerId = new ProducerId();
61     final private LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
62     
63     public AdvisoryBroker(Broker next) {
64         super(next);
65         advisoryProducerId.setConnectionId(idGenerator.generateId());
66     }
67     
68     
69
70     public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception JavaDoc {
71         next.addConnection(context, info);
72
73         ActiveMQTopic topic = AdvisorySupport.getConnectionAdvisoryTopic();
74         fireAdvisory(context, topic, info);
75         connections.put(info.getConnectionId(), info);
76     }
77
78     public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception JavaDoc {
79         Subscription answer = next.addConsumer(context, info);
80
81         // Don't advise advisory topics.
82
if( !AdvisorySupport.isAdvisoryTopic(info.getDestination()) ) {
83             ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(info.getDestination());
84             consumers.put(info.getConsumerId(), info);
85             fireConsumerAdvisory(context, topic, info);
86         } else {
87             
88             // We need to replay all the previously collected state objects
89
// for this newly added consumer.
90
if( AdvisorySupport.isConnectionAdvisoryTopic(info.getDestination()) ) {
91                 // Replay the connections.
92
for (Iterator JavaDoc iter = connections.values().iterator(); iter.hasNext();) {
93                     ConnectionInfo value = (ConnectionInfo) iter.next();
94                     ActiveMQTopic topic = AdvisorySupport.getConnectionAdvisoryTopic();
95                     fireAdvisory(context, topic, value, info.getConsumerId());
96                 }
97             }
98             
99             // We need to replay all the previously collected destination objects
100
// for this newly added consumer.
101
if( AdvisorySupport.isDestinationAdvisoryTopic(info.getDestination()) ) {
102                 // Replay the destinations.
103
for (Iterator JavaDoc iter = destinations.values().iterator(); iter.hasNext();) {
104                     DestinationInfo value = (DestinationInfo) iter.next();
105                     ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(value.getDestination());
106                     fireAdvisory(context, topic, value, info.getConsumerId());
107                 }
108             }
109
110             // Replay the producers.
111
if( AdvisorySupport.isProducerAdvisoryTopic(info.getDestination()) ) {
112                 for (Iterator JavaDoc iter = producers.values().iterator(); iter.hasNext();) {
113                     ProducerInfo value = (ProducerInfo) iter.next();
114                     ActiveMQTopic topic = AdvisorySupport.getProducerAdvisoryTopic(value.getDestination());
115                     fireProducerAdvisory(context, topic, value, info.getConsumerId());
116                 }
117             }
118             
119             // Replay the consumers.
120
if( AdvisorySupport.isConsumerAdvisoryTopic(info.getDestination()) ) {
121                 for (Iterator JavaDoc iter = consumers.values().iterator(); iter.hasNext();) {
122                     ConsumerInfo value = (ConsumerInfo) iter.next();
123                     ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(value.getDestination());
124                     fireConsumerAdvisory(context, topic, value, info.getConsumerId());
125                 }
126             }
127         }
128         return answer;
129     }
130
131     public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception JavaDoc {
132         next.addProducer(context, info);
133
134         // Don't advise advisory topics.
135
if( info.getDestination()!=null && !AdvisorySupport.isAdvisoryTopic(info.getDestination()) ) {
136             ActiveMQTopic topic = AdvisorySupport.getProducerAdvisoryTopic(info.getDestination());
137             fireAdvisory(context, topic, info);
138             producers.put(info.getProducerId(), info);
139         }
140     }
141     
142     public Destination addDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception JavaDoc {
143         Destination answer = next.addDestination(context, destination);
144       
145         ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destination);
146         DestinationInfo info = new DestinationInfo(context.getConnectionId(), DestinationInfo.ADD_OPERATION_TYPE, destination);
147         fireAdvisory(context, topic, info);
148         destinations.put(destination, info);
149         return answer;
150     }
151     
152     public void addDestinationInfo(ConnectionContext context,DestinationInfo info) throws Exception JavaDoc{
153         ActiveMQDestination destination = info.getDestination();
154         next.addDestinationInfo(context, info);
155         
156         ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destination);
157         fireAdvisory(context, topic, info);
158         destinations.put(destination, info);
159     }
160
161     public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception JavaDoc {
162         next.removeDestination(context, destination, timeout);
163         DestinationInfo info = (DestinationInfo) destinations.remove(destination);
164         if( info !=null ) {
165             info.setDestination(destination);
166             info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
167             ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destination);
168             fireAdvisory(context, topic, info);
169             try {
170                 next.removeDestination(context, AdvisorySupport.getConsumerAdvisoryTopic(info.getDestination()), -1);
171             } catch (Exception JavaDoc expectedIfDestinationDidNotExistYet) {
172             }
173             try {
174                 next.removeDestination(context, AdvisorySupport.getProducerAdvisoryTopic(info.getDestination()), -1);
175             } catch (Exception JavaDoc expectedIfDestinationDidNotExistYet) {
176             }
177         }
178        
179     }
180     
181     public void removeDestinationInfo(ConnectionContext context, DestinationInfo destInfo) throws Exception JavaDoc{
182         next.removeDestinationInfo(context, destInfo);
183         DestinationInfo info = (DestinationInfo) destinations.remove(destInfo.getDestination());
184
185         if( info !=null ) {
186             info.setDestination(destInfo.getDestination());
187             info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
188             ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destInfo.getDestination());
189             fireAdvisory(context, topic, info);
190             try {
191                 next.removeDestination(context, AdvisorySupport.getConsumerAdvisoryTopic(info.getDestination()), -1);
192             } catch (Exception JavaDoc expectedIfDestinationDidNotExistYet) {
193             }
194             try {
195                 next.removeDestination(context, AdvisorySupport.getProducerAdvisoryTopic(info.getDestination()), -1);
196             } catch (Exception JavaDoc expectedIfDestinationDidNotExistYet) {
197             }
198         }
199
200     }
201
202     public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable JavaDoc error) throws Exception JavaDoc {
203         next.removeConnection(context, info, error);
204
205         ActiveMQTopic topic = AdvisorySupport.getConnectionAdvisoryTopic();
206         fireAdvisory(context, topic, info.createRemoveCommand());
207         connections.remove(info.getConnectionId());
208     }
209
210     public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception JavaDoc {
211         next.removeConsumer(context, info);
212
213         // Don't advise advisory topics.
214
if( !AdvisorySupport.isAdvisoryTopic(info.getDestination()) ) {
215             ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(info.getDestination());
216             consumers.remove(info.getConsumerId());
217             fireConsumerAdvisory(context, topic, info.createRemoveCommand());
218         }
219     }
220
221     public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception JavaDoc {
222         next.removeProducer(context, info);
223
224         // Don't advise advisory topics.
225
if( info.getDestination()!=null && !AdvisorySupport.isAdvisoryTopic(info.getDestination()) ) {
226             ActiveMQTopic topic = AdvisorySupport.getProducerAdvisoryTopic(info.getDestination());
227             producers.remove(info.getProducerId());
228             fireProducerAdvisory(context, topic, info.createRemoveCommand());
229         }
230     }
231     
232     protected void fireAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command) throws Exception JavaDoc {
233         fireAdvisory(context, topic, command, null);
234     }
235     
236     protected void fireAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command, ConsumerId targetConsumerId) throws Exception JavaDoc {
237         ActiveMQMessage advisoryMessage = new ActiveMQMessage();
238         fireAdvisory(context, topic, command, targetConsumerId, advisoryMessage);
239     }
240     
241     protected void fireConsumerAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command) throws Exception JavaDoc {
242         fireConsumerAdvisory(context, topic, command, null);
243     }
244     protected void fireConsumerAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command, ConsumerId targetConsumerId) throws Exception JavaDoc {
245         ActiveMQMessage advisoryMessage = new ActiveMQMessage();
246         advisoryMessage.setIntProperty("consumerCount", consumers.size());
247         fireAdvisory(context, topic, command, targetConsumerId, advisoryMessage);
248     }
249     
250     protected void fireProducerAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command) throws Exception JavaDoc {
251         fireProducerAdvisory(context, topic, command, null);
252     }
253     protected void fireProducerAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command, ConsumerId targetConsumerId) throws Exception JavaDoc {
254         ActiveMQMessage advisoryMessage = new ActiveMQMessage();
255         advisoryMessage.setIntProperty("producerCount", producers.size());
256         fireAdvisory(context, topic, command, targetConsumerId, advisoryMessage);
257     }
258
259     protected void fireAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command, ConsumerId targetConsumerId, ActiveMQMessage advisoryMessage) throws Exception JavaDoc {
260         advisoryMessage.setDataStructure(command);
261         advisoryMessage.setPersistent(false);
262         advisoryMessage.setType(AdvisorySupport.ADIVSORY_MESSAGE_TYPE);
263         advisoryMessage.setMessageId(new MessageId(advisoryProducerId, messageIdGenerator.getNextSequenceId()));
264         advisoryMessage.setTargetConsumerId(targetConsumerId);
265
266         advisoryMessage.setDestination(topic);
267         advisoryMessage.setResponseRequired(false);
268         advisoryMessage.setProducerId(advisoryProducerId);
269         boolean originalFlowControl = context.isProducerFlowControl();
270         final ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
271         producerExchange.setConnectionContext(context);
272         producerExchange.setMutable(true);
273         try {
274             context.setProducerFlowControl(false);
275             next.send(producerExchange, advisoryMessage);
276         } finally {
277             context.setProducerFlowControl(originalFlowControl);
278         }
279     }
280
281 }
282
Popular Tags