1 18 package org.apache.activemq.broker.jmx; 19 20 import java.io.IOException ; 21 import java.util.ArrayList ; 22 import java.util.HashMap ; 23 import java.util.Hashtable ; 24 import java.util.Iterator ; 25 import java.util.List ; 26 import java.util.Map ; 27 import java.util.Set ; 28 import java.util.Map.Entry; 29 30 import javax.management.InstanceNotFoundException ; 31 import javax.management.MBeanServer ; 32 import javax.management.MalformedObjectNameException ; 33 import javax.management.ObjectName ; 34 import javax.management.openmbean.CompositeData ; 35 import javax.management.openmbean.CompositeDataSupport ; 36 import javax.management.openmbean.CompositeType ; 37 import javax.management.openmbean.OpenDataException ; 38 import javax.management.openmbean.TabularData ; 39 import javax.management.openmbean.TabularDataSupport ; 40 import javax.management.openmbean.TabularType ; 41 42 import org.apache.activemq.broker.Broker; 43 import org.apache.activemq.broker.BrokerService; 44 import org.apache.activemq.broker.ConnectionContext; 45 import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory; 46 import org.apache.activemq.broker.region.Destination; 47 import org.apache.activemq.broker.region.DestinationFactory; 48 import org.apache.activemq.broker.region.DestinationFactoryImpl; 49 import org.apache.activemq.broker.region.DestinationInterceptor; 50 import org.apache.activemq.broker.region.Queue; 51 import org.apache.activemq.broker.region.Region; 52 import org.apache.activemq.broker.region.RegionBroker; 53 import org.apache.activemq.broker.region.Subscription; 54 import org.apache.activemq.broker.region.Topic; 55 import org.apache.activemq.broker.region.TopicSubscription; 56 import org.apache.activemq.command.ActiveMQDestination; 57 import org.apache.activemq.command.ActiveMQMessage; 58 import org.apache.activemq.command.ActiveMQTopic; 59 import org.apache.activemq.command.ConsumerInfo; 60 import org.apache.activemq.command.Message; 61 import org.apache.activemq.command.MessageId; 62 import org.apache.activemq.command.SubscriptionInfo; 63 import org.apache.activemq.memory.UsageManager; 64 import org.apache.activemq.store.MessageRecoveryListener; 65 import org.apache.activemq.store.PersistenceAdapter; 66 import org.apache.activemq.store.TopicMessageStore; 67 import org.apache.activemq.thread.TaskRunnerFactory; 68 import org.apache.activemq.util.JMXSupport; 69 import org.apache.activemq.util.ServiceStopper; 70 import org.apache.activemq.util.SubscriptionKey; 71 import org.apache.commons.logging.Log; 72 import org.apache.commons.logging.LogFactory; 73 74 import java.util.concurrent.ConcurrentHashMap ; 75 import java.util.concurrent.CopyOnWriteArraySet ; 76 77 public class ManagedRegionBroker extends RegionBroker { 78 private static final Log log = LogFactory.getLog(ManagedRegionBroker.class); 79 private final MBeanServer mbeanServer; 80 private final ObjectName brokerObjectName; 81 private final Map topics = new ConcurrentHashMap (); 82 private final Map queues = new ConcurrentHashMap (); 83 private final Map temporaryQueues = new ConcurrentHashMap (); 84 private final Map temporaryTopics = new ConcurrentHashMap (); 85 private final Map queueSubscribers = new ConcurrentHashMap (); 86 private final Map topicSubscribers = new ConcurrentHashMap (); 87 private final Map durableTopicSubscribers = new ConcurrentHashMap (); 88 private final Map inactiveDurableTopicSubscribers = new ConcurrentHashMap (); 89 private final Map temporaryQueueSubscribers = new ConcurrentHashMap (); 90 private final Map temporaryTopicSubscribers = new ConcurrentHashMap (); 91 private final Map subscriptionKeys = new ConcurrentHashMap (); 92 private final Map subscriptionMap = new ConcurrentHashMap (); 93 private final Set registeredMBeans = new CopyOnWriteArraySet (); 94 95 private Broker contextBroker; 96 97 public ManagedRegionBroker(BrokerService brokerService,MBeanServer mbeanServer,ObjectName brokerObjectName, 98 TaskRunnerFactory taskRunnerFactory,UsageManager memoryManager, DestinationFactory destinationFactory, DestinationInterceptor destinationInterceptor) 99 throws IOException { 100 super(brokerService,taskRunnerFactory,memoryManager, destinationFactory, destinationInterceptor); 101 this.mbeanServer=mbeanServer; 102 this.brokerObjectName=brokerObjectName; 103 } 104 105 public void start() throws Exception { 106 super.start(); 107 buildExistingSubscriptions(); 109 } 110 111 protected void doStop(ServiceStopper stopper){ 112 super.doStop(stopper); 113 for(Iterator iter=registeredMBeans.iterator();iter.hasNext();){ 115 ObjectName name=(ObjectName ) iter.next(); 116 try{ 117 mbeanServer.unregisterMBean(name); 118 }catch(InstanceNotFoundException e){ 119 log.warn("The MBean: "+name+" is no longer registered with JMX"); 120 }catch(Exception e){ 121 stopper.onException(this,e); 122 } 123 } 124 registeredMBeans.clear(); 125 } 126 127 protected Region createQueueRegion(UsageManager memoryManager,TaskRunnerFactory taskRunnerFactory, 128 DestinationFactory destinationFactory){ 129 return new ManagedQueueRegion(this,destinationStatistics,memoryManager,taskRunnerFactory,destinationFactory); 130 } 131 132 protected Region createTempQueueRegion(UsageManager memoryManager,TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory){ 133 return new ManagedTempQueueRegion(this,destinationStatistics,memoryManager,taskRunnerFactory, destinationFactory); 134 } 135 136 protected Region createTempTopicRegion(UsageManager memoryManager,TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory){ 137 return new ManagedTempTopicRegion(this,destinationStatistics,memoryManager,taskRunnerFactory, destinationFactory); 138 } 139 140 protected Region createTopicRegion(UsageManager memoryManager,TaskRunnerFactory taskRunnerFactory, 141 DestinationFactory destinationFactory){ 142 return new ManagedTopicRegion(this,destinationStatistics,memoryManager,taskRunnerFactory, destinationFactory); 143 } 144 145 public void register(ActiveMQDestination destName,Destination destination){ 146 try{ 148 ObjectName objectName=createObjectName(destName); 149 DestinationView view; 150 if (destination instanceof Queue) { 151 view=new QueueView(this,(Queue) destination); 152 } else if (destination instanceof Topic){ 153 view=new TopicView(this,(Topic) destination); 154 } else { 155 view = null; 156 log.warn("JMX View is not supported for custom destination: " + destination); 157 } 158 if (view != null) { 159 registerDestination(objectName,destName,view); 160 } 161 }catch(Exception e){ 162 log.error("Failed to register destination "+destName,e); 163 } 164 } 165 166 public void unregister(ActiveMQDestination destName){ 167 try{ 168 ObjectName objectName=createObjectName(destName); 169 unregisterDestination(objectName); 170 }catch(Exception e){ 171 log.error("Failed to unregister "+destName,e); 172 } 173 } 174 175 public ObjectName registerSubscription(ConnectionContext context,Subscription sub){ 176 Hashtable map=brokerObjectName.getKeyPropertyList(); 177 String objectNameStr=brokerObjectName.getDomain()+":"+"BrokerName="+map.get("BrokerName")+",Type=Subscription,"; 178 String destinationType="destinationType="+sub.getConsumerInfo().getDestination().getDestinationTypeAsString(); 179 String destinationName="destinationName=" 180 +JMXSupport.encodeObjectNamePart(sub.getConsumerInfo().getDestination().getPhysicalName()); 181 String clientId="clientId="+JMXSupport.encodeObjectNamePart(context.getClientId()); 182 String persistentMode="persistentMode="; 183 String consumerId=""; 184 SubscriptionKey key=new SubscriptionKey(context.getClientId(),sub.getConsumerInfo().getSubscriptionName()); 185 if(sub.getConsumerInfo().isDurable()){ 186 persistentMode+="Durable, subscriptionID=" 187 +JMXSupport.encodeObjectNamePart(sub.getConsumerInfo().getSubscriptionName()); 188 }else{ 189 persistentMode+="Non-Durable"; 190 if(sub.getConsumerInfo()!=null&&sub.getConsumerInfo().getConsumerId()!=null){ 191 consumerId=",consumerId=" 192 +JMXSupport.encodeObjectNamePart(sub.getConsumerInfo().getConsumerId().toString()); 193 } 194 } 195 objectNameStr+=persistentMode+","; 196 objectNameStr+=destinationType+","; 197 objectNameStr+=destinationName+","; 198 objectNameStr+=clientId; 199 objectNameStr+=consumerId; 200 try{ 201 ObjectName objectName=new ObjectName (objectNameStr); 202 SubscriptionView view; 203 if(sub.getConsumerInfo().isDurable()){ 204 view=new DurableSubscriptionView(this,context.getClientId(),sub); 205 }else{ 206 if(sub instanceof TopicSubscription){ 207 view=new TopicSubscriptionView(context.getClientId(),(TopicSubscription)sub); 208 }else{ 209 view=new SubscriptionView(context.getClientId(),sub); 210 } 211 } 212 registerSubscription(objectName,sub.getConsumerInfo(),key,view); 213 subscriptionMap.put(sub,objectName); 214 return objectName; 215 }catch(Exception e){ 216 log.error("Failed to register subscription "+sub,e); 217 return null; 218 } 219 } 220 221 public void unregisterSubscription(Subscription sub){ 222 ObjectName name=(ObjectName ) subscriptionMap.remove(sub); 223 if(name!=null){ 224 try{ 225 unregisterSubscription(name); 226 }catch(Exception e){ 227 log.error("Failed to unregister subscription "+sub,e); 228 } 229 } 230 } 231 232 protected void registerDestination(ObjectName key,ActiveMQDestination dest,DestinationView view) throws Exception { 233 if(dest.isQueue()){ 234 if(dest.isTemporary()){ 235 temporaryQueues.put(key,view); 236 }else{ 237 queues.put(key,view); 238 } 239 }else{ 240 if(dest.isTemporary()){ 241 temporaryTopics.put(key,view); 242 }else{ 243 topics.put(key,view); 244 } 245 } 246 try { 247 mbeanServer.registerMBean(view,key); 248 registeredMBeans.add(key); 249 } catch (Throwable e) { 250 log.warn("Failed to register MBean: "+key); 251 log.debug("Failure reason: "+e,e); 252 } 253 } 254 255 protected void unregisterDestination(ObjectName key) throws Exception { 256 topics.remove(key); 257 queues.remove(key); 258 temporaryQueues.remove(key); 259 temporaryTopics.remove(key); 260 if(registeredMBeans.remove(key)){ 261 try { 262 mbeanServer.unregisterMBean(key); 263 } catch (Throwable e) { 264 log.warn("Failed to unregister MBean: "+key); 265 log.debug("Failure reason: "+e,e); 266 } 267 } 268 } 269 270 protected void registerSubscription(ObjectName key,ConsumerInfo info,SubscriptionKey subscriptionKey, 271 SubscriptionView view) throws Exception { 272 ActiveMQDestination dest=info.getDestination(); 273 if(dest.isQueue()){ 274 if(dest.isTemporary()){ 275 temporaryQueueSubscribers.put(key,view); 276 }else{ 277 queueSubscribers.put(key,view); 278 } 279 }else{ 280 if(dest.isTemporary()){ 281 temporaryTopicSubscribers.put(key,view); 282 }else{ 283 if(info.isDurable()){ 284 durableTopicSubscribers.put(key,view); 285 try{ 287 ObjectName inactiveName=(ObjectName ) subscriptionKeys.get(subscriptionKey); 288 if(inactiveName!=null){ 289 inactiveDurableTopicSubscribers.remove(inactiveName); 290 registeredMBeans.remove(inactiveName); 291 mbeanServer.unregisterMBean(inactiveName); 292 } 293 }catch(Throwable e){ 294 log.error("Unable to unregister inactive durable subscriber: "+subscriptionKey,e); 295 } 296 }else{ 297 topicSubscribers.put(key,view); 298 } 299 } 300 } 301 302 try { 303 mbeanServer.registerMBean(view,key); 304 registeredMBeans.add(key); 305 } catch (Throwable e) { 306 log.warn("Failed to register MBean: "+key); 307 log.debug("Failure reason: "+e,e); 308 } 309 310 } 311 312 protected void unregisterSubscription(ObjectName key) throws Exception { 313 queueSubscribers.remove(key); 314 topicSubscribers.remove(key); 315 inactiveDurableTopicSubscribers.remove(key); 316 temporaryQueueSubscribers.remove(key); 317 temporaryTopicSubscribers.remove(key); 318 if(registeredMBeans.remove(key)){ 319 try { 320 mbeanServer.unregisterMBean(key); 321 } catch (Throwable e) { 322 log.warn("Failed to unregister MBean: "+key); 323 log.debug("Failure reason: "+e,e); 324 } 325 } 326 DurableSubscriptionView view=(DurableSubscriptionView) durableTopicSubscribers.remove(key); 327 if(view!=null){ 328 SubscriptionKey subscriptionKey=new SubscriptionKey(view.getClientId(),view.getSubscriptionName()); 330 SubscriptionInfo info=new SubscriptionInfo(); 331 info.setClientId(subscriptionKey.getClientId()); 332 info.setSubcriptionName(subscriptionKey.getSubscriptionName()); 333 info.setDestination(new ActiveMQTopic(view.getDestinationName())); 334 addInactiveSubscription(subscriptionKey,info); 335 } 336 } 337 338 protected void buildExistingSubscriptions() throws Exception { 339 Map subscriptions=new HashMap (); 340 Set destinations=destinationFactory.getDestinations(); 341 if(destinations!=null){ 342 for(Iterator iter=destinations.iterator();iter.hasNext();){ 343 ActiveMQDestination dest=(ActiveMQDestination) iter.next(); 344 if(dest.isTopic()){ 345 SubscriptionInfo[] infos= destinationFactory.getAllDurableSubscriptions((ActiveMQTopic) dest); 346 if(infos!=null){ 347 for(int i=0;i<infos.length;i++){ 348 SubscriptionInfo info=infos[i]; 349 log.debug("Restoring durable subscription: "+infos); 350 SubscriptionKey key=new SubscriptionKey(info); 351 subscriptions.put(key,info); 352 } 353 } 354 } 355 } 356 } 357 for(Iterator i=subscriptions.entrySet().iterator();i.hasNext();){ 358 Map.Entry entry=(Entry) i.next(); 359 SubscriptionKey key=(SubscriptionKey) entry.getKey(); 360 SubscriptionInfo info=(SubscriptionInfo) entry.getValue(); 361 addInactiveSubscription(key,info); 362 } 363 } 364 365 protected void addInactiveSubscription(SubscriptionKey key,SubscriptionInfo info){ 366 Hashtable map=brokerObjectName.getKeyPropertyList(); 367 try{ 368 ObjectName objectName=new ObjectName (brokerObjectName.getDomain()+":"+"BrokerName="+map.get("BrokerName") 369 +","+"Type=Subscription,"+"active=false,"+"name=" 370 +JMXSupport.encodeObjectNamePart(key.toString())+""); 371 SubscriptionView view=new InactiveDurableSubscriptionView(this,key.getClientId(),info); 372 373 try { 374 mbeanServer.registerMBean(view,objectName); 375 registeredMBeans.add(objectName); 376 } catch (Throwable e) { 377 log.warn("Failed to register MBean: "+key); 378 log.debug("Failure reason: "+e,e); 379 } 380 381 inactiveDurableTopicSubscribers.put(objectName,view); 382 subscriptionKeys.put(key,objectName); 383 }catch(Exception e){ 384 log.error("Failed to register subscription "+info,e); 385 } 386 } 387 388 public CompositeData [] browse(SubscriptionView view) throws OpenDataException { 389 List messages=getSubscriberMessages(view); 390 CompositeData c[]=new CompositeData [messages.size()]; 391 for(int i=0;i<c.length;i++){ 392 try{ 393 c[i]=OpenTypeSupport.convert((Message) messages.get(i)); 394 }catch(Throwable e){ 395 log.error("failed to browse : " + view,e); 396 } 397 } 398 return c; 399 } 400 401 public TabularData browseAsTable(SubscriptionView view) throws OpenDataException { 402 OpenTypeFactory factory=OpenTypeSupport.getFactory(ActiveMQMessage.class); 403 List messages=getSubscriberMessages(view); 404 CompositeType ct=factory.getCompositeType(); 405 TabularType tt=new TabularType ("MessageList","MessageList",ct,new String [] { "JMSMessageID" }); 406 TabularDataSupport rc=new TabularDataSupport (tt); 407 for(int i=0;i<messages.size();i++){ 408 rc.put(new CompositeDataSupport (ct,factory.getFields(messages.get(i)))); 409 } 410 return rc; 411 } 412 413 protected List getSubscriberMessages(SubscriptionView view){ 414 if (!(destinationFactory instanceof DestinationFactoryImpl)) { 416 throw new RuntimeException ("unsupported by " + destinationFactory); 417 } 418 PersistenceAdapter adapter = ((DestinationFactoryImpl)destinationFactory).getPersistenceAdapter(); 419 final List result=new ArrayList (); 420 try{ 421 ActiveMQTopic topic=new ActiveMQTopic(view.getDestinationName()); 422 TopicMessageStore store=adapter.createTopicMessageStore(topic); 423 store.recover(new MessageRecoveryListener(){ 424 public void recoverMessage(Message message) throws Exception { 425 result.add(message); 426 } 427 428 public void recoverMessageReference(MessageId messageReference) throws Exception { 429 throw new RuntimeException ("Should not be called."); 430 } 431 432 public void finished(){} 433 434 public boolean hasSpace(){ 435 return true; 436 } 437 }); 438 }catch(Throwable e){ 439 log.error("Failed to browse messages for Subscription "+view,e); 440 } 441 return result; 442 443 } 444 445 protected ObjectName [] getTopics(){ 446 Set set=topics.keySet(); 447 return (ObjectName []) set.toArray(new ObjectName [set.size()]); 448 } 449 450 protected ObjectName [] getQueues(){ 451 Set set=queues.keySet(); 452 return (ObjectName []) set.toArray(new ObjectName [set.size()]); 453 } 454 455 protected ObjectName [] getTemporaryTopics(){ 456 Set set=temporaryTopics.keySet(); 457 return (ObjectName []) set.toArray(new ObjectName [set.size()]); 458 } 459 460 protected ObjectName [] getTemporaryQueues(){ 461 Set set=temporaryQueues.keySet(); 462 return (ObjectName []) set.toArray(new ObjectName [set.size()]); 463 } 464 465 protected ObjectName [] getTopicSubscribers(){ 466 Set set=topicSubscribers.keySet(); 467 return (ObjectName []) set.toArray(new ObjectName [set.size()]); 468 } 469 470 protected ObjectName [] getDurableTopicSubscribers(){ 471 Set set=durableTopicSubscribers.keySet(); 472 return (ObjectName []) set.toArray(new ObjectName [set.size()]); 473 } 474 475 protected ObjectName [] getQueueSubscribers(){ 476 Set set=queueSubscribers.keySet(); 477 return (ObjectName []) set.toArray(new ObjectName [set.size()]); 478 } 479 480 protected ObjectName [] getTemporaryTopicSubscribers(){ 481 Set set=temporaryTopicSubscribers.keySet(); 482 return (ObjectName []) set.toArray(new ObjectName [set.size()]); 483 } 484 485 protected ObjectName [] getTemporaryQueueSubscribers(){ 486 Set set=temporaryQueueSubscribers.keySet(); 487 return (ObjectName []) set.toArray(new ObjectName [set.size()]); 488 } 489 490 protected ObjectName [] getInactiveDurableTopicSubscribers(){ 491 Set set=inactiveDurableTopicSubscribers.keySet(); 492 return (ObjectName []) set.toArray(new ObjectName [set.size()]); 493 } 494 495 public Broker getContextBroker(){ 496 return contextBroker; 497 } 498 499 public void setContextBroker(Broker contextBroker){ 500 this.contextBroker=contextBroker; 501 } 502 503 protected ObjectName createObjectName(ActiveMQDestination destName) throws MalformedObjectNameException { 504 Hashtable map=brokerObjectName.getKeyPropertyList(); 506 ObjectName objectName=new ObjectName (brokerObjectName.getDomain()+":"+"BrokerName="+map.get("BrokerName")+"," 507 +"Type="+JMXSupport.encodeObjectNamePart(destName.getDestinationTypeAsString())+"," 508 +"Destination="+JMXSupport.encodeObjectNamePart(destName.getPhysicalName())); 509 return objectName; 510 } 511 } 512 | Popular Tags |