KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > broker > region > AbstractRegion


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 package org.apache.activemq.broker.region;
19
20 import java.util.HashMap JavaDoc;
21 import java.util.Iterator JavaDoc;
22 import java.util.Map JavaDoc;
23 import java.util.Set JavaDoc;
24
25 import javax.jms.JMSException JavaDoc;
26
27 import org.apache.activemq.broker.ConnectionContext;
28 import org.apache.activemq.broker.ConsumerBrokerExchange;
29 import org.apache.activemq.broker.DestinationAlreadyExistsException;
30 import org.apache.activemq.broker.ProducerBrokerExchange;
31 import org.apache.activemq.command.ActiveMQDestination;
32 import org.apache.activemq.command.ConsumerInfo;
33 import org.apache.activemq.command.Message;
34 import org.apache.activemq.command.MessageAck;
35 import org.apache.activemq.command.MessageDispatchNotification;
36 import org.apache.activemq.command.MessagePull;
37 import org.apache.activemq.command.RemoveSubscriptionInfo;
38 import org.apache.activemq.command.Response;
39 import org.apache.activemq.filter.DestinationMap;
40 import org.apache.activemq.memory.UsageManager;
41 import org.apache.activemq.thread.TaskRunnerFactory;
42 import org.apache.commons.logging.Log;
43 import org.apache.commons.logging.LogFactory;
44 import sun.security.x509.IssuerAlternativeNameExtension;
45
46 import java.util.concurrent.ConcurrentHashMap JavaDoc;
47
48 /**
49  *
50  * @version $Revision: 1.14 $
51  */

52 abstract public class AbstractRegion implements Region {
53
54     private static final Log log = LogFactory.getLog(AbstractRegion.class);
55
56     protected final ConcurrentHashMap JavaDoc destinations = new ConcurrentHashMap JavaDoc();
57     protected final DestinationMap destinationMap = new DestinationMap();
58     protected final ConcurrentHashMap JavaDoc subscriptions = new ConcurrentHashMap JavaDoc();
59     protected final UsageManager memoryManager;
60     protected final DestinationFactory destinationFactory;
61     protected final DestinationStatistics destinationStatistics;
62     protected final RegionBroker broker;
63     protected boolean autoCreateDestinations=true;
64     protected final TaskRunnerFactory taskRunnerFactory;
65     protected final Object JavaDoc destinationsMutex = new Object JavaDoc();
66     protected final Map JavaDoc consumerChangeMutexMap = new HashMap JavaDoc();
67     protected boolean started = false;
68
69     public AbstractRegion(RegionBroker broker,DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
70         if (broker == null) {
71             throw new IllegalArgumentException JavaDoc("null broker");
72         }
73         this.broker = broker;
74         this.destinationStatistics = destinationStatistics;
75         this.memoryManager = memoryManager;
76         this.taskRunnerFactory = taskRunnerFactory;
77         if (broker == null) {
78             throw new IllegalArgumentException JavaDoc("null destinationFactory");
79         }
80         this.destinationFactory = destinationFactory;
81     }
82
83     public void start() throws Exception JavaDoc {
84         started = true;
85         for (Iterator JavaDoc i = destinations.values().iterator();i.hasNext();) {
86             Destination dest = (Destination)i.next();
87             dest.start();
88         }
89     }
90     
91     public void stop() throws Exception JavaDoc {
92         started = false;
93         for (Iterator JavaDoc i = destinations.values().iterator();i.hasNext();) {
94             Destination dest = (Destination)i.next();
95             dest.stop();
96         }
97         destinations.clear();
98     }
99     
100     public Destination addDestination(ConnectionContext context,ActiveMQDestination destination) throws Exception JavaDoc{
101         log.debug("Adding destination: "+destination);
102         synchronized(destinationsMutex){
103             Destination dest=(Destination)destinations.get(destination);
104             if(dest==null){
105                 dest=createDestination(context,destination);
106                 // intercept if there is a valid interceptor defined
107
DestinationInterceptor destinationInterceptor=broker.getDestinationInterceptor();
108                 if(destinationInterceptor!=null){
109                     dest=destinationInterceptor.intercept(dest);
110                 }
111                 dest.start();
112                 destinations.put(destination,dest);
113                 destinationMap.put(destination,dest);
114                 // Add all consumers that are interested in the destination.
115
for(Iterator JavaDoc iter=subscriptions.values().iterator();iter.hasNext();){
116                     Subscription sub=(Subscription)iter.next();
117                     if(sub.matches(destination)){
118                         dest.addSubscription(context,sub);
119                     }
120                 }
121             }
122             return dest;
123         }
124     }
125
126     public void removeDestination(ConnectionContext context,ActiveMQDestination destination,long timeout)
127                     throws Exception JavaDoc{
128
129         // No timeout.. then try to shut down right way, fails if there are current subscribers.
130
if( timeout == 0 ) {
131             for(Iterator JavaDoc iter=subscriptions.values().iterator();iter.hasNext();){
132                 Subscription sub=(Subscription) iter.next();
133                 if(sub.matches(destination)){
134                     throw new JMSException JavaDoc("Destination still has an active subscription: "+destination);
135                 }
136             }
137         }
138
139         if( timeout > 0 ) {
140             // TODO: implement a way to notify the subscribers that we want to take the down
141
// the destination and that they should un-subscribe.. Then wait up to timeout time before
142
// dropping the subscription.
143

144         }
145
146         log.debug("Removing destination: "+destination);
147         synchronized(destinationsMutex){
148             Destination dest=(Destination) destinations.remove(destination);
149             if(dest!=null){
150
151                 // timeout<0 or we timed out, we now force any remaining subscriptions to un-subscribe.
152
for(Iterator JavaDoc iter=subscriptions.values().iterator();iter.hasNext();){
153                     Subscription sub=(Subscription) iter.next();
154                     if(sub.matches(destination)){
155                         dest.removeSubscription(context, sub);
156                     }
157                 }
158
159                 destinationMap.removeAll(destination);
160                 dest.dispose(context);
161                 dest.stop();
162
163             }else{
164                 log.debug("Destination doesn't exist: " + dest);
165             }
166         }
167     }
168
169     /**
170      * Provide an exact or wildcard lookup of destinations in the region
171      *
172      * @return a set of matching destination objects.
173      */

174     public Set JavaDoc getDestinations(ActiveMQDestination destination) {
175         synchronized(destinationsMutex){
176             return destinationMap.get(destination);
177         }
178     }
179
180     public Map JavaDoc getDestinationMap() {
181         synchronized(destinationsMutex){
182             return new HashMap JavaDoc(destinations);
183         }
184     }
185
186     public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception JavaDoc {
187         log.debug("Adding consumer: "+info.getConsumerId());
188         ActiveMQDestination destination = info.getDestination();
189         if (destination != null && ! destination.isPattern() && ! destination.isComposite()) {
190             // lets auto-create the destination
191
lookup(context, destination);
192         }
193
194         Object JavaDoc addGuard;
195         synchronized(consumerChangeMutexMap) {
196             addGuard = consumerChangeMutexMap.get(info.getConsumerId());
197             if (addGuard == null) {
198                 addGuard = new Object JavaDoc();
199                 consumerChangeMutexMap.put(info.getConsumerId(), addGuard);
200             }
201         }
202         synchronized (addGuard) {
203             Object JavaDoc o = subscriptions.get(info.getConsumerId());
204             if (o != null) {
205                 log.warn("A duplicate subscription was detected. Clients may be misbehaving. Later warnings you may see about subscription removal are a consequence of this.");
206                 return (Subscription)o;
207             }
208
209             Subscription sub = createSubscription(context, info);
210
211             // We may need to add some destinations that are in persistent store but not active
212
// in the broker.
213
//
214
// TODO: think about this a little more. This is good cause destinations are not loaded into
215
// memory until a client needs to use the queue, but a management agent viewing the
216
// broker will not see a destination that exists in persistent store. We may want to
217
// eagerly load all destinations into the broker but have an inactive state for the
218
// destination which has reduced memory usage.
219
//
220
Set JavaDoc inactiveDests = getInactiveDestinations();
221             for (Iterator JavaDoc iter = inactiveDests.iterator(); iter.hasNext();) {
222                 ActiveMQDestination dest = (ActiveMQDestination) iter.next();
223                 if( sub.matches(dest) ) {
224                     context.getBroker().addDestination(context, dest);
225                 }
226             }
227             
228
229             subscriptions.put(info.getConsumerId(), sub);
230
231             // At this point we're done directly manipulating subscriptions,
232
// but we need to retain the synchronized block here. Consider
233
// otherwise what would happen if at this point a second
234
// thread added, then removed, as would be allowed with
235
// no mutex held. Remove is only essentially run once
236
// so everything after this point would be leaked.
237

238             // Add the subscription to all the matching queues.
239
for (Iterator JavaDoc iter = destinationMap.get(info.getDestination()).iterator(); iter.hasNext();) {
240                 Destination dest = (Destination) iter.next();
241                 dest.addSubscription(context, sub);
242             }
243
244             if( info.isBrowser() ) {
245                 ((QueueBrowserSubscription)sub).browseDone();
246             }
247
248             return sub;
249         }
250     }
251
252     /**
253      * Get all the Destinations that are in storage
254      * @return Set of all stored destinations
255      */

256     public Set JavaDoc getDurableDestinations(){
257         return destinationFactory.getDestinations();
258     }
259
260     /**
261      * @return all Destinations that don't have active consumers
262      */

263     protected Set JavaDoc getInactiveDestinations() {
264         Set JavaDoc inactiveDests = destinationFactory.getDestinations();
265         inactiveDests.removeAll( destinations.keySet() );
266         return inactiveDests;
267     }
268
269     public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception JavaDoc {
270         log.debug("Removing consumer: "+info.getConsumerId());
271
272         Subscription sub = (Subscription) subscriptions.remove(info.getConsumerId());
273         if( sub==null )
274             throw new IllegalArgumentException JavaDoc("The subscription does not exist: "+info.getConsumerId());
275
276         // remove the subscription from all the matching queues.
277
for (Iterator JavaDoc iter = destinationMap.get(info.getDestination()).iterator(); iter.hasNext();) {
278             Destination dest = (Destination) iter.next();
279             dest.removeSubscription(context, sub);
280         }
281
282         destroySubscription(sub);
283
284         synchronized (consumerChangeMutexMap) {
285             consumerChangeMutexMap.remove(info.getConsumerId());
286         }
287     }
288
289     protected void destroySubscription(Subscription sub) {
290         sub.destroy();
291     }
292
293     public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception JavaDoc {
294         throw new JMSException JavaDoc("Invalid operation.");
295     }
296
297     public void send(final ProducerBrokerExchange producerExchange, Message messageSend)
298             throws Exception JavaDoc {
299         final ConnectionContext context = producerExchange.getConnectionContext();
300                
301         if (producerExchange.isMutable() || producerExchange.getRegionDestination()==null) {
302             final Destination regionDestination = lookup(context,messageSend.getDestination());
303             producerExchange.setRegionDestination(regionDestination);
304         }
305         
306         producerExchange.getRegionDestination().send(producerExchange, messageSend);
307     }
308
309     public void acknowledge(ConsumerBrokerExchange consumerExchange,MessageAck ack) throws Exception JavaDoc{
310         Subscription sub=consumerExchange.getSubscription();
311         if(sub==null){
312             sub=(Subscription)subscriptions.get(ack.getConsumerId());
313             if(sub==null){
314                 throw new IllegalArgumentException JavaDoc("The subscription does not exist: "+ack.getConsumerId());
315             }
316             consumerExchange.setSubscription(sub);
317         }
318         sub.acknowledge(consumerExchange.getConnectionContext(),ack);
319     }
320
321     public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception JavaDoc {
322         Subscription sub = (Subscription) subscriptions.get(pull.getConsumerId());
323         if( sub==null )
324             throw new IllegalArgumentException JavaDoc("The subscription does not exist: "+pull.getConsumerId());
325         return sub.pullMessage(context, pull);
326     }
327
328     protected Destination lookup(ConnectionContext context, ActiveMQDestination destination) throws Exception JavaDoc {
329         synchronized(destinationsMutex){
330             Destination dest=(Destination) destinations.get(destination);
331             if(dest==null){
332                 if(autoCreateDestinations){
333                     // Try to auto create the destination... re-invoke broker from the
334
// top so that the proper security checks are performed.
335
try {
336                         dest = addDestination(context, destination);
337                         //context.getBroker().addDestination(context,destination);
338
}
339                     catch (DestinationAlreadyExistsException e) {
340                         // if the destination already exists then lets ignore this error
341
}
342                     // We should now have the dest created.
343
//dest=(Destination) destinations.get(destination);
344
}
345                 if(dest==null){
346                     throw new JMSException JavaDoc("The destination "+destination+" does not exist.");
347                 }
348             }
349             return dest;
350         }
351     }
352
353     public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception JavaDoc{
354         Subscription sub = (Subscription) subscriptions.get(messageDispatchNotification.getConsumerId());
355         if (sub != null){
356             sub.processMessageDispatchNotification(messageDispatchNotification);
357         }
358     }
359     public void gc() {
360         for (Iterator JavaDoc iter = subscriptions.values().iterator(); iter.hasNext();) {
361             Subscription sub = (Subscription) iter.next();
362             sub.gc();
363         }
364         for (Iterator JavaDoc iter = destinations.values() .iterator(); iter.hasNext();) {
365             Destination dest = (Destination) iter.next();
366             dest.gc();
367         }
368     }
369
370     protected abstract Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws Exception JavaDoc;
371     
372     protected Destination createDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception JavaDoc {
373         return destinationFactory.createDestination(context, destination, destinationStatistics);
374     }
375
376     public boolean isAutoCreateDestinations() {
377         return autoCreateDestinations;
378     }
379
380     public void setAutoCreateDestinations(boolean autoCreateDestinations) {
381         this.autoCreateDestinations = autoCreateDestinations;
382     }
383
384
385 }
386
Popular Tags