KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > broker > jmx > ManagedRegionBroker


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 package org.apache.activemq.broker.jmx;
19
20 import java.io.IOException JavaDoc;
21 import java.util.ArrayList JavaDoc;
22 import java.util.HashMap JavaDoc;
23 import java.util.Hashtable JavaDoc;
24 import java.util.Iterator JavaDoc;
25 import java.util.List JavaDoc;
26 import java.util.Map JavaDoc;
27 import java.util.Set JavaDoc;
28 import java.util.Map.Entry;
29
30 import javax.management.InstanceNotFoundException JavaDoc;
31 import javax.management.MBeanServer JavaDoc;
32 import javax.management.MalformedObjectNameException JavaDoc;
33 import javax.management.ObjectName JavaDoc;
34 import javax.management.openmbean.CompositeData JavaDoc;
35 import javax.management.openmbean.CompositeDataSupport JavaDoc;
36 import javax.management.openmbean.CompositeType JavaDoc;
37 import javax.management.openmbean.OpenDataException JavaDoc;
38 import javax.management.openmbean.TabularData JavaDoc;
39 import javax.management.openmbean.TabularDataSupport JavaDoc;
40 import javax.management.openmbean.TabularType JavaDoc;
41
42 import org.apache.activemq.broker.Broker;
43 import org.apache.activemq.broker.BrokerService;
44 import org.apache.activemq.broker.ConnectionContext;
45 import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory;
46 import org.apache.activemq.broker.region.Destination;
47 import org.apache.activemq.broker.region.DestinationFactory;
48 import org.apache.activemq.broker.region.DestinationFactoryImpl;
49 import org.apache.activemq.broker.region.DestinationInterceptor;
50 import org.apache.activemq.broker.region.Queue;
51 import org.apache.activemq.broker.region.Region;
52 import org.apache.activemq.broker.region.RegionBroker;
53 import org.apache.activemq.broker.region.Subscription;
54 import org.apache.activemq.broker.region.Topic;
55 import org.apache.activemq.broker.region.TopicSubscription;
56 import org.apache.activemq.command.ActiveMQDestination;
57 import org.apache.activemq.command.ActiveMQMessage;
58 import org.apache.activemq.command.ActiveMQTopic;
59 import org.apache.activemq.command.ConsumerInfo;
60 import org.apache.activemq.command.Message;
61 import org.apache.activemq.command.MessageId;
62 import org.apache.activemq.command.SubscriptionInfo;
63 import org.apache.activemq.memory.UsageManager;
64 import org.apache.activemq.store.MessageRecoveryListener;
65 import org.apache.activemq.store.PersistenceAdapter;
66 import org.apache.activemq.store.TopicMessageStore;
67 import org.apache.activemq.thread.TaskRunnerFactory;
68 import org.apache.activemq.util.JMXSupport;
69 import org.apache.activemq.util.ServiceStopper;
70 import org.apache.activemq.util.SubscriptionKey;
71 import org.apache.commons.logging.Log;
72 import org.apache.commons.logging.LogFactory;
73
74 import java.util.concurrent.ConcurrentHashMap JavaDoc;
75 import java.util.concurrent.CopyOnWriteArraySet JavaDoc;
76
77 public class ManagedRegionBroker extends RegionBroker {
78     private static final Log log = LogFactory.getLog(ManagedRegionBroker.class);
79     private final MBeanServer JavaDoc mbeanServer;
80     private final ObjectName JavaDoc brokerObjectName;
81     private final Map JavaDoc topics = new ConcurrentHashMap JavaDoc();
82     private final Map JavaDoc queues = new ConcurrentHashMap JavaDoc();
83     private final Map JavaDoc temporaryQueues = new ConcurrentHashMap JavaDoc();
84     private final Map JavaDoc temporaryTopics = new ConcurrentHashMap JavaDoc();
85     private final Map JavaDoc queueSubscribers = new ConcurrentHashMap JavaDoc();
86     private final Map JavaDoc topicSubscribers = new ConcurrentHashMap JavaDoc();
87     private final Map JavaDoc durableTopicSubscribers = new ConcurrentHashMap JavaDoc();
88     private final Map JavaDoc inactiveDurableTopicSubscribers = new ConcurrentHashMap JavaDoc();
89     private final Map JavaDoc temporaryQueueSubscribers = new ConcurrentHashMap JavaDoc();
90     private final Map JavaDoc temporaryTopicSubscribers = new ConcurrentHashMap JavaDoc();
91     private final Map JavaDoc subscriptionKeys = new ConcurrentHashMap JavaDoc();
92     private final Map JavaDoc subscriptionMap = new ConcurrentHashMap JavaDoc();
93     private final Set JavaDoc registeredMBeans = new CopyOnWriteArraySet JavaDoc();
94     /* This is the first broker in the broker interceptor chain. */
95     private Broker contextBroker;
96
97     public ManagedRegionBroker(BrokerService brokerService,MBeanServer JavaDoc mbeanServer,ObjectName JavaDoc brokerObjectName,
98                     TaskRunnerFactory taskRunnerFactory,UsageManager memoryManager, DestinationFactory destinationFactory, DestinationInterceptor destinationInterceptor)
99                     throws IOException JavaDoc{
100         super(brokerService,taskRunnerFactory,memoryManager, destinationFactory, destinationInterceptor);
101         this.mbeanServer=mbeanServer;
102         this.brokerObjectName=brokerObjectName;
103     }
104
105     public void start() throws Exception JavaDoc{
106         super.start();
107         // build all existing durable subscriptions
108
buildExistingSubscriptions();
109     }
110
111     protected void doStop(ServiceStopper stopper){
112         super.doStop(stopper);
113         // lets remove any mbeans not yet removed
114
for(Iterator JavaDoc iter=registeredMBeans.iterator();iter.hasNext();){
115             ObjectName JavaDoc name=(ObjectName JavaDoc) iter.next();
116             try{
117                 mbeanServer.unregisterMBean(name);
118             }catch(InstanceNotFoundException JavaDoc e){
119                 log.warn("The MBean: "+name+" is no longer registered with JMX");
120             }catch(Exception JavaDoc e){
121                 stopper.onException(this,e);
122             }
123         }
124         registeredMBeans.clear();
125     }
126
127     protected Region createQueueRegion(UsageManager memoryManager,TaskRunnerFactory taskRunnerFactory,
128             DestinationFactory destinationFactory){
129         return new ManagedQueueRegion(this,destinationStatistics,memoryManager,taskRunnerFactory,destinationFactory);
130     }
131
132     protected Region createTempQueueRegion(UsageManager memoryManager,TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory){
133         return new ManagedTempQueueRegion(this,destinationStatistics,memoryManager,taskRunnerFactory, destinationFactory);
134     }
135
136     protected Region createTempTopicRegion(UsageManager memoryManager,TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory){
137         return new ManagedTempTopicRegion(this,destinationStatistics,memoryManager,taskRunnerFactory, destinationFactory);
138     }
139
140     protected Region createTopicRegion(UsageManager memoryManager,TaskRunnerFactory taskRunnerFactory,
141             DestinationFactory destinationFactory){
142         return new ManagedTopicRegion(this,destinationStatistics,memoryManager,taskRunnerFactory, destinationFactory);
143     }
144
145     public void register(ActiveMQDestination destName,Destination destination){
146         // TODO refactor to allow views for custom destinations
147
try{
148             ObjectName JavaDoc objectName=createObjectName(destName);
149             DestinationView view;
150             if (destination instanceof Queue) {
151                 view=new QueueView(this,(Queue) destination);
152             } else if (destination instanceof Topic){
153                 view=new TopicView(this,(Topic) destination);
154             } else {
155                 view = null;
156                 log.warn("JMX View is not supported for custom destination: " + destination);
157             }
158             if (view != null) {
159                 registerDestination(objectName,destName,view);
160             }
161         }catch(Exception JavaDoc e){
162             log.error("Failed to register destination "+destName,e);
163         }
164     }
165
166     public void unregister(ActiveMQDestination destName){
167         try{
168             ObjectName JavaDoc objectName=createObjectName(destName);
169             unregisterDestination(objectName);
170         }catch(Exception JavaDoc e){
171             log.error("Failed to unregister "+destName,e);
172         }
173     }
174
175     public ObjectName JavaDoc registerSubscription(ConnectionContext context,Subscription sub){
176         Hashtable JavaDoc map=brokerObjectName.getKeyPropertyList();
177         String JavaDoc objectNameStr=brokerObjectName.getDomain()+":"+"BrokerName="+map.get("BrokerName")+",Type=Subscription,";
178         String JavaDoc destinationType="destinationType="+sub.getConsumerInfo().getDestination().getDestinationTypeAsString();
179         String JavaDoc destinationName="destinationName="
180                 +JMXSupport.encodeObjectNamePart(sub.getConsumerInfo().getDestination().getPhysicalName());
181         String JavaDoc clientId="clientId="+JMXSupport.encodeObjectNamePart(context.getClientId());
182         String JavaDoc persistentMode="persistentMode=";
183         String JavaDoc consumerId="";
184         SubscriptionKey key=new SubscriptionKey(context.getClientId(),sub.getConsumerInfo().getSubscriptionName());
185         if(sub.getConsumerInfo().isDurable()){
186             persistentMode+="Durable, subscriptionID="
187                     +JMXSupport.encodeObjectNamePart(sub.getConsumerInfo().getSubscriptionName());
188         }else{
189             persistentMode+="Non-Durable";
190             if(sub.getConsumerInfo()!=null&&sub.getConsumerInfo().getConsumerId()!=null){
191                 consumerId=",consumerId="
192                         +JMXSupport.encodeObjectNamePart(sub.getConsumerInfo().getConsumerId().toString());
193             }
194         }
195         objectNameStr+=persistentMode+",";
196         objectNameStr+=destinationType+",";
197         objectNameStr+=destinationName+",";
198         objectNameStr+=clientId;
199         objectNameStr+=consumerId;
200         try{
201             ObjectName JavaDoc objectName=new ObjectName JavaDoc(objectNameStr);
202             SubscriptionView view;
203             if(sub.getConsumerInfo().isDurable()){
204                 view=new DurableSubscriptionView(this,context.getClientId(),sub);
205             }else{
206                 if(sub instanceof TopicSubscription){
207                     view=new TopicSubscriptionView(context.getClientId(),(TopicSubscription)sub);
208                 }else{
209                     view=new SubscriptionView(context.getClientId(),sub);
210                 }
211             }
212             registerSubscription(objectName,sub.getConsumerInfo(),key,view);
213             subscriptionMap.put(sub,objectName);
214             return objectName;
215         }catch(Exception JavaDoc e){
216             log.error("Failed to register subscription "+sub,e);
217             return null;
218         }
219     }
220
221     public void unregisterSubscription(Subscription sub){
222         ObjectName JavaDoc name=(ObjectName JavaDoc) subscriptionMap.remove(sub);
223         if(name!=null){
224             try{
225                 unregisterSubscription(name);
226             }catch(Exception JavaDoc e){
227                 log.error("Failed to unregister subscription "+sub,e);
228             }
229         }
230     }
231
232     protected void registerDestination(ObjectName JavaDoc key,ActiveMQDestination dest,DestinationView view) throws Exception JavaDoc{
233         if(dest.isQueue()){
234             if(dest.isTemporary()){
235                 temporaryQueues.put(key,view);
236             }else{
237                 queues.put(key,view);
238             }
239         }else{
240             if(dest.isTemporary()){
241                 temporaryTopics.put(key,view);
242             }else{
243                 topics.put(key,view);
244             }
245         }
246         try {
247             mbeanServer.registerMBean(view,key);
248             registeredMBeans.add(key);
249         } catch (Throwable JavaDoc e) {
250             log.warn("Failed to register MBean: "+key);
251             log.debug("Failure reason: "+e,e);
252         }
253     }
254
255     protected void unregisterDestination(ObjectName JavaDoc key) throws Exception JavaDoc{
256         topics.remove(key);
257         queues.remove(key);
258         temporaryQueues.remove(key);
259         temporaryTopics.remove(key);
260         if(registeredMBeans.remove(key)){
261             try {
262                 mbeanServer.unregisterMBean(key);
263             } catch (Throwable JavaDoc e) {
264                 log.warn("Failed to unregister MBean: "+key);
265                 log.debug("Failure reason: "+e,e);
266             }
267         }
268     }
269
270     protected void registerSubscription(ObjectName JavaDoc key,ConsumerInfo info,SubscriptionKey subscriptionKey,
271                     SubscriptionView view) throws Exception JavaDoc{
272         ActiveMQDestination dest=info.getDestination();
273         if(dest.isQueue()){
274             if(dest.isTemporary()){
275                 temporaryQueueSubscribers.put(key,view);
276             }else{
277                 queueSubscribers.put(key,view);
278             }
279         }else{
280             if(dest.isTemporary()){
281                 temporaryTopicSubscribers.put(key,view);
282             }else{
283                 if(info.isDurable()){
284                     durableTopicSubscribers.put(key,view);
285                     // unregister any inactive durable subs
286
try{
287                         ObjectName JavaDoc inactiveName=(ObjectName JavaDoc) subscriptionKeys.get(subscriptionKey);
288                         if(inactiveName!=null){
289                             inactiveDurableTopicSubscribers.remove(inactiveName);
290                             registeredMBeans.remove(inactiveName);
291                             mbeanServer.unregisterMBean(inactiveName);
292                         }
293                     }catch(Throwable JavaDoc e){
294                         log.error("Unable to unregister inactive durable subscriber: "+subscriptionKey,e);
295                     }
296                 }else{
297                     topicSubscribers.put(key,view);
298                 }
299             }
300         }
301         
302         try {
303             mbeanServer.registerMBean(view,key);
304             registeredMBeans.add(key);
305         } catch (Throwable JavaDoc e) {
306             log.warn("Failed to register MBean: "+key);
307             log.debug("Failure reason: "+e,e);
308         }
309         
310     }
311
312     protected void unregisterSubscription(ObjectName JavaDoc key) throws Exception JavaDoc{
313         queueSubscribers.remove(key);
314         topicSubscribers.remove(key);
315         inactiveDurableTopicSubscribers.remove(key);
316         temporaryQueueSubscribers.remove(key);
317         temporaryTopicSubscribers.remove(key);
318         if(registeredMBeans.remove(key)){
319             try {
320                 mbeanServer.unregisterMBean(key);
321             } catch (Throwable JavaDoc e) {
322                 log.warn("Failed to unregister MBean: "+key);
323                 log.debug("Failure reason: "+e,e);
324             }
325         }
326         DurableSubscriptionView view=(DurableSubscriptionView) durableTopicSubscribers.remove(key);
327         if(view!=null){
328             // need to put this back in the inactive list
329
SubscriptionKey subscriptionKey=new SubscriptionKey(view.getClientId(),view.getSubscriptionName());
330             SubscriptionInfo info=new SubscriptionInfo();
331             info.setClientId(subscriptionKey.getClientId());
332             info.setSubcriptionName(subscriptionKey.getSubscriptionName());
333             info.setDestination(new ActiveMQTopic(view.getDestinationName()));
334             addInactiveSubscription(subscriptionKey,info);
335         }
336     }
337
338     protected void buildExistingSubscriptions() throws Exception JavaDoc{
339         Map JavaDoc subscriptions=new HashMap JavaDoc();
340         Set JavaDoc destinations=destinationFactory.getDestinations();
341         if(destinations!=null){
342             for(Iterator JavaDoc iter=destinations.iterator();iter.hasNext();){
343                 ActiveMQDestination dest=(ActiveMQDestination) iter.next();
344                 if(dest.isTopic()){
345                     SubscriptionInfo[] infos= destinationFactory.getAllDurableSubscriptions((ActiveMQTopic) dest);
346                     if(infos!=null){
347                         for(int i=0;i<infos.length;i++){
348                             SubscriptionInfo info=infos[i];
349                             log.debug("Restoring durable subscription: "+infos);
350                             SubscriptionKey key=new SubscriptionKey(info);
351                             subscriptions.put(key,info);
352                         }
353                     }
354                 }
355             }
356         }
357         for(Iterator JavaDoc i=subscriptions.entrySet().iterator();i.hasNext();){
358             Map.Entry JavaDoc entry=(Entry) i.next();
359             SubscriptionKey key=(SubscriptionKey) entry.getKey();
360             SubscriptionInfo info=(SubscriptionInfo) entry.getValue();
361             addInactiveSubscription(key,info);
362         }
363     }
364
365     protected void addInactiveSubscription(SubscriptionKey key,SubscriptionInfo info){
366         Hashtable JavaDoc map=brokerObjectName.getKeyPropertyList();
367         try{
368             ObjectName JavaDoc objectName=new ObjectName JavaDoc(brokerObjectName.getDomain()+":"+"BrokerName="+map.get("BrokerName")
369                             +","+"Type=Subscription,"+"active=false,"+"name="
370                             +JMXSupport.encodeObjectNamePart(key.toString())+"");
371             SubscriptionView view=new InactiveDurableSubscriptionView(this,key.getClientId(),info);
372             
373             try {
374                 mbeanServer.registerMBean(view,objectName);
375                 registeredMBeans.add(objectName);
376             } catch (Throwable JavaDoc e) {
377                 log.warn("Failed to register MBean: "+key);
378                 log.debug("Failure reason: "+e,e);
379             }
380             
381             inactiveDurableTopicSubscribers.put(objectName,view);
382             subscriptionKeys.put(key,objectName);
383         }catch(Exception JavaDoc e){
384             log.error("Failed to register subscription "+info,e);
385         }
386     }
387
388     public CompositeData JavaDoc[] browse(SubscriptionView view) throws OpenDataException JavaDoc{
389         List JavaDoc messages=getSubscriberMessages(view);
390         CompositeData JavaDoc c[]=new CompositeData JavaDoc[messages.size()];
391         for(int i=0;i<c.length;i++){
392             try{
393                 c[i]=OpenTypeSupport.convert((Message) messages.get(i));
394             }catch(Throwable JavaDoc e){
395                 log.error("failed to browse : " + view,e);
396             }
397         }
398         return c;
399     }
400
401     public TabularData JavaDoc browseAsTable(SubscriptionView view) throws OpenDataException JavaDoc{
402         OpenTypeFactory factory=OpenTypeSupport.getFactory(ActiveMQMessage.class);
403         List JavaDoc messages=getSubscriberMessages(view);
404         CompositeType JavaDoc ct=factory.getCompositeType();
405         TabularType JavaDoc tt=new TabularType JavaDoc("MessageList","MessageList",ct,new String JavaDoc[] { "JMSMessageID" });
406         TabularDataSupport JavaDoc rc=new TabularDataSupport JavaDoc(tt);
407         for(int i=0;i<messages.size();i++){
408             rc.put(new CompositeDataSupport JavaDoc(ct,factory.getFields(messages.get(i))));
409         }
410         return rc;
411     }
412
413     protected List JavaDoc getSubscriberMessages(SubscriptionView view){
414         //TODO It is very dangerous operation for big backlogs
415
if (!(destinationFactory instanceof DestinationFactoryImpl)) {
416             throw new RuntimeException JavaDoc("unsupported by " + destinationFactory);
417         }
418         PersistenceAdapter adapter = ((DestinationFactoryImpl)destinationFactory).getPersistenceAdapter();
419         final List JavaDoc result=new ArrayList JavaDoc();
420         try{
421             ActiveMQTopic topic=new ActiveMQTopic(view.getDestinationName());
422             TopicMessageStore store=adapter.createTopicMessageStore(topic);
423             store.recover(new MessageRecoveryListener(){
424                 public void recoverMessage(Message message) throws Exception JavaDoc{
425                     result.add(message);
426                 }
427
428                 public void recoverMessageReference(MessageId messageReference) throws Exception JavaDoc{
429                     throw new RuntimeException JavaDoc("Should not be called.");
430                 }
431
432                 public void finished(){}
433
434                 public boolean hasSpace(){
435                     return true;
436                 }
437             });
438         }catch(Throwable JavaDoc e){
439             log.error("Failed to browse messages for Subscription "+view,e);
440         }
441         return result;
442         
443     }
444
445     protected ObjectName JavaDoc[] getTopics(){
446         Set JavaDoc set=topics.keySet();
447         return (ObjectName JavaDoc[]) set.toArray(new ObjectName JavaDoc[set.size()]);
448     }
449
450     protected ObjectName JavaDoc[] getQueues(){
451         Set JavaDoc set=queues.keySet();
452         return (ObjectName JavaDoc[]) set.toArray(new ObjectName JavaDoc[set.size()]);
453     }
454
455     protected ObjectName JavaDoc[] getTemporaryTopics(){
456         Set JavaDoc set=temporaryTopics.keySet();
457         return (ObjectName JavaDoc[]) set.toArray(new ObjectName JavaDoc[set.size()]);
458     }
459
460     protected ObjectName JavaDoc[] getTemporaryQueues(){
461         Set JavaDoc set=temporaryQueues.keySet();
462         return (ObjectName JavaDoc[]) set.toArray(new ObjectName JavaDoc[set.size()]);
463     }
464
465     protected ObjectName JavaDoc[] getTopicSubscribers(){
466         Set JavaDoc set=topicSubscribers.keySet();
467         return (ObjectName JavaDoc[]) set.toArray(new ObjectName JavaDoc[set.size()]);
468     }
469
470     protected ObjectName JavaDoc[] getDurableTopicSubscribers(){
471         Set JavaDoc set=durableTopicSubscribers.keySet();
472         return (ObjectName JavaDoc[]) set.toArray(new ObjectName JavaDoc[set.size()]);
473     }
474
475     protected ObjectName JavaDoc[] getQueueSubscribers(){
476         Set JavaDoc set=queueSubscribers.keySet();
477         return (ObjectName JavaDoc[]) set.toArray(new ObjectName JavaDoc[set.size()]);
478     }
479
480     protected ObjectName JavaDoc[] getTemporaryTopicSubscribers(){
481         Set JavaDoc set=temporaryTopicSubscribers.keySet();
482         return (ObjectName JavaDoc[]) set.toArray(new ObjectName JavaDoc[set.size()]);
483     }
484
485     protected ObjectName JavaDoc[] getTemporaryQueueSubscribers(){
486         Set JavaDoc set=temporaryQueueSubscribers.keySet();
487         return (ObjectName JavaDoc[]) set.toArray(new ObjectName JavaDoc[set.size()]);
488     }
489
490     protected ObjectName JavaDoc[] getInactiveDurableTopicSubscribers(){
491         Set JavaDoc set=inactiveDurableTopicSubscribers.keySet();
492         return (ObjectName JavaDoc[]) set.toArray(new ObjectName JavaDoc[set.size()]);
493     }
494
495     public Broker getContextBroker(){
496         return contextBroker;
497     }
498
499     public void setContextBroker(Broker contextBroker){
500         this.contextBroker=contextBroker;
501     }
502
503     protected ObjectName JavaDoc createObjectName(ActiveMQDestination destName) throws MalformedObjectNameException JavaDoc{
504         // Build the object name for the destination
505
Hashtable JavaDoc map=brokerObjectName.getKeyPropertyList();
506         ObjectName JavaDoc objectName=new ObjectName JavaDoc(brokerObjectName.getDomain()+":"+"BrokerName="+map.get("BrokerName")+","
507                         +"Type="+JMXSupport.encodeObjectNamePart(destName.getDestinationTypeAsString())+","
508                         +"Destination="+JMXSupport.encodeObjectNamePart(destName.getPhysicalName()));
509         return objectName;
510     }
511 }
512
Popular Tags