1 18 package org.apache.activemq.broker.region; 19 20 import java.util.HashMap ; 21 import java.util.Iterator ; 22 import java.util.Map ; 23 import java.util.Set ; 24 25 import javax.jms.JMSException ; 26 27 import org.apache.activemq.broker.ConnectionContext; 28 import org.apache.activemq.broker.ConsumerBrokerExchange; 29 import org.apache.activemq.broker.DestinationAlreadyExistsException; 30 import org.apache.activemq.broker.ProducerBrokerExchange; 31 import org.apache.activemq.command.ActiveMQDestination; 32 import org.apache.activemq.command.ConsumerInfo; 33 import org.apache.activemq.command.Message; 34 import org.apache.activemq.command.MessageAck; 35 import org.apache.activemq.command.MessageDispatchNotification; 36 import org.apache.activemq.command.MessagePull; 37 import org.apache.activemq.command.RemoveSubscriptionInfo; 38 import org.apache.activemq.command.Response; 39 import org.apache.activemq.filter.DestinationMap; 40 import org.apache.activemq.memory.UsageManager; 41 import org.apache.activemq.thread.TaskRunnerFactory; 42 import org.apache.commons.logging.Log; 43 import org.apache.commons.logging.LogFactory; 44 import sun.security.x509.IssuerAlternativeNameExtension; 45 46 import java.util.concurrent.ConcurrentHashMap ; 47 48 52 abstract public class AbstractRegion implements Region { 53 54 private static final Log log = LogFactory.getLog(AbstractRegion.class); 55 56 protected final ConcurrentHashMap destinations = new ConcurrentHashMap (); 57 protected final DestinationMap destinationMap = new DestinationMap(); 58 protected final ConcurrentHashMap subscriptions = new ConcurrentHashMap (); 59 protected final UsageManager memoryManager; 60 protected final DestinationFactory destinationFactory; 61 protected final DestinationStatistics destinationStatistics; 62 protected final RegionBroker broker; 63 protected boolean autoCreateDestinations=true; 64 protected final TaskRunnerFactory taskRunnerFactory; 65 protected final Object destinationsMutex = new Object (); 66 protected final Map consumerChangeMutexMap = new HashMap (); 67 protected boolean started = false; 68 69 public AbstractRegion(RegionBroker broker,DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) { 70 if (broker == null) { 71 throw new IllegalArgumentException ("null broker"); 72 } 73 this.broker = broker; 74 this.destinationStatistics = destinationStatistics; 75 this.memoryManager = memoryManager; 76 this.taskRunnerFactory = taskRunnerFactory; 77 if (broker == null) { 78 throw new IllegalArgumentException ("null destinationFactory"); 79 } 80 this.destinationFactory = destinationFactory; 81 } 82 83 public void start() throws Exception { 84 started = true; 85 for (Iterator i = destinations.values().iterator();i.hasNext();) { 86 Destination dest = (Destination)i.next(); 87 dest.start(); 88 } 89 } 90 91 public void stop() throws Exception { 92 started = false; 93 for (Iterator i = destinations.values().iterator();i.hasNext();) { 94 Destination dest = (Destination)i.next(); 95 dest.stop(); 96 } 97 destinations.clear(); 98 } 99 100 public Destination addDestination(ConnectionContext context,ActiveMQDestination destination) throws Exception { 101 log.debug("Adding destination: "+destination); 102 synchronized(destinationsMutex){ 103 Destination dest=(Destination)destinations.get(destination); 104 if(dest==null){ 105 dest=createDestination(context,destination); 106 DestinationInterceptor destinationInterceptor=broker.getDestinationInterceptor(); 108 if(destinationInterceptor!=null){ 109 dest=destinationInterceptor.intercept(dest); 110 } 111 dest.start(); 112 destinations.put(destination,dest); 113 destinationMap.put(destination,dest); 114 for(Iterator iter=subscriptions.values().iterator();iter.hasNext();){ 116 Subscription sub=(Subscription)iter.next(); 117 if(sub.matches(destination)){ 118 dest.addSubscription(context,sub); 119 } 120 } 121 } 122 return dest; 123 } 124 } 125 126 public void removeDestination(ConnectionContext context,ActiveMQDestination destination,long timeout) 127 throws Exception { 128 129 if( timeout == 0 ) { 131 for(Iterator iter=subscriptions.values().iterator();iter.hasNext();){ 132 Subscription sub=(Subscription) iter.next(); 133 if(sub.matches(destination)){ 134 throw new JMSException ("Destination still has an active subscription: "+destination); 135 } 136 } 137 } 138 139 if( timeout > 0 ) { 140 144 } 145 146 log.debug("Removing destination: "+destination); 147 synchronized(destinationsMutex){ 148 Destination dest=(Destination) destinations.remove(destination); 149 if(dest!=null){ 150 151 for(Iterator iter=subscriptions.values().iterator();iter.hasNext();){ 153 Subscription sub=(Subscription) iter.next(); 154 if(sub.matches(destination)){ 155 dest.removeSubscription(context, sub); 156 } 157 } 158 159 destinationMap.removeAll(destination); 160 dest.dispose(context); 161 dest.stop(); 162 163 }else{ 164 log.debug("Destination doesn't exist: " + dest); 165 } 166 } 167 } 168 169 174 public Set getDestinations(ActiveMQDestination destination) { 175 synchronized(destinationsMutex){ 176 return destinationMap.get(destination); 177 } 178 } 179 180 public Map getDestinationMap() { 181 synchronized(destinationsMutex){ 182 return new HashMap (destinations); 183 } 184 } 185 186 public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { 187 log.debug("Adding consumer: "+info.getConsumerId()); 188 ActiveMQDestination destination = info.getDestination(); 189 if (destination != null && ! destination.isPattern() && ! destination.isComposite()) { 190 lookup(context, destination); 192 } 193 194 Object addGuard; 195 synchronized(consumerChangeMutexMap) { 196 addGuard = consumerChangeMutexMap.get(info.getConsumerId()); 197 if (addGuard == null) { 198 addGuard = new Object (); 199 consumerChangeMutexMap.put(info.getConsumerId(), addGuard); 200 } 201 } 202 synchronized (addGuard) { 203 Object o = subscriptions.get(info.getConsumerId()); 204 if (o != null) { 205 log.warn("A duplicate subscription was detected. Clients may be misbehaving. Later warnings you may see about subscription removal are a consequence of this."); 206 return (Subscription)o; 207 } 208 209 Subscription sub = createSubscription(context, info); 210 211 Set inactiveDests = getInactiveDestinations(); 221 for (Iterator iter = inactiveDests.iterator(); iter.hasNext();) { 222 ActiveMQDestination dest = (ActiveMQDestination) iter.next(); 223 if( sub.matches(dest) ) { 224 context.getBroker().addDestination(context, dest); 225 } 226 } 227 228 229 subscriptions.put(info.getConsumerId(), sub); 230 231 238 for (Iterator iter = destinationMap.get(info.getDestination()).iterator(); iter.hasNext();) { 240 Destination dest = (Destination) iter.next(); 241 dest.addSubscription(context, sub); 242 } 243 244 if( info.isBrowser() ) { 245 ((QueueBrowserSubscription)sub).browseDone(); 246 } 247 248 return sub; 249 } 250 } 251 252 256 public Set getDurableDestinations(){ 257 return destinationFactory.getDestinations(); 258 } 259 260 263 protected Set getInactiveDestinations() { 264 Set inactiveDests = destinationFactory.getDestinations(); 265 inactiveDests.removeAll( destinations.keySet() ); 266 return inactiveDests; 267 } 268 269 public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { 270 log.debug("Removing consumer: "+info.getConsumerId()); 271 272 Subscription sub = (Subscription) subscriptions.remove(info.getConsumerId()); 273 if( sub==null ) 274 throw new IllegalArgumentException ("The subscription does not exist: "+info.getConsumerId()); 275 276 for (Iterator iter = destinationMap.get(info.getDestination()).iterator(); iter.hasNext();) { 278 Destination dest = (Destination) iter.next(); 279 dest.removeSubscription(context, sub); 280 } 281 282 destroySubscription(sub); 283 284 synchronized (consumerChangeMutexMap) { 285 consumerChangeMutexMap.remove(info.getConsumerId()); 286 } 287 } 288 289 protected void destroySubscription(Subscription sub) { 290 sub.destroy(); 291 } 292 293 public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception { 294 throw new JMSException ("Invalid operation."); 295 } 296 297 public void send(final ProducerBrokerExchange producerExchange, Message messageSend) 298 throws Exception { 299 final ConnectionContext context = producerExchange.getConnectionContext(); 300 301 if (producerExchange.isMutable() || producerExchange.getRegionDestination()==null) { 302 final Destination regionDestination = lookup(context,messageSend.getDestination()); 303 producerExchange.setRegionDestination(regionDestination); 304 } 305 306 producerExchange.getRegionDestination().send(producerExchange, messageSend); 307 } 308 309 public void acknowledge(ConsumerBrokerExchange consumerExchange,MessageAck ack) throws Exception { 310 Subscription sub=consumerExchange.getSubscription(); 311 if(sub==null){ 312 sub=(Subscription)subscriptions.get(ack.getConsumerId()); 313 if(sub==null){ 314 throw new IllegalArgumentException ("The subscription does not exist: "+ack.getConsumerId()); 315 } 316 consumerExchange.setSubscription(sub); 317 } 318 sub.acknowledge(consumerExchange.getConnectionContext(),ack); 319 } 320 321 public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception { 322 Subscription sub = (Subscription) subscriptions.get(pull.getConsumerId()); 323 if( sub==null ) 324 throw new IllegalArgumentException ("The subscription does not exist: "+pull.getConsumerId()); 325 return sub.pullMessage(context, pull); 326 } 327 328 protected Destination lookup(ConnectionContext context, ActiveMQDestination destination) throws Exception { 329 synchronized(destinationsMutex){ 330 Destination dest=(Destination) destinations.get(destination); 331 if(dest==null){ 332 if(autoCreateDestinations){ 333 try { 336 dest = addDestination(context, destination); 337 } 339 catch (DestinationAlreadyExistsException e) { 340 } 342 } 345 if(dest==null){ 346 throw new JMSException ("The destination "+destination+" does not exist."); 347 } 348 } 349 return dest; 350 } 351 } 352 353 public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception { 354 Subscription sub = (Subscription) subscriptions.get(messageDispatchNotification.getConsumerId()); 355 if (sub != null){ 356 sub.processMessageDispatchNotification(messageDispatchNotification); 357 } 358 } 359 public void gc() { 360 for (Iterator iter = subscriptions.values().iterator(); iter.hasNext();) { 361 Subscription sub = (Subscription) iter.next(); 362 sub.gc(); 363 } 364 for (Iterator iter = destinations.values() .iterator(); iter.hasNext();) { 365 Destination dest = (Destination) iter.next(); 366 dest.gc(); 367 } 368 } 369 370 protected abstract Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws Exception ; 371 372 protected Destination createDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception { 373 return destinationFactory.createDestination(context, destination, destinationStatistics); 374 } 375 376 public boolean isAutoCreateDestinations() { 377 return autoCreateDestinations; 378 } 379 380 public void setAutoCreateDestinations(boolean autoCreateDestinations) { 381 this.autoCreateDestinations = autoCreateDestinations; 382 } 383 384 385 } 386 | Popular Tags |