|                                                                                                              1
 18  package org.apache.activemq.broker.region;
 19
 20  import java.util.HashMap
  ; 21  import java.util.Iterator
  ; 22  import java.util.Map
  ; 23  import java.util.Set
  ; 24
 25  import javax.jms.JMSException
  ; 26
 27  import org.apache.activemq.broker.ConnectionContext;
 28  import org.apache.activemq.broker.ConsumerBrokerExchange;
 29  import org.apache.activemq.broker.DestinationAlreadyExistsException;
 30  import org.apache.activemq.broker.ProducerBrokerExchange;
 31  import org.apache.activemq.command.ActiveMQDestination;
 32  import org.apache.activemq.command.ConsumerInfo;
 33  import org.apache.activemq.command.Message;
 34  import org.apache.activemq.command.MessageAck;
 35  import org.apache.activemq.command.MessageDispatchNotification;
 36  import org.apache.activemq.command.MessagePull;
 37  import org.apache.activemq.command.RemoveSubscriptionInfo;
 38  import org.apache.activemq.command.Response;
 39  import org.apache.activemq.filter.DestinationMap;
 40  import org.apache.activemq.memory.UsageManager;
 41  import org.apache.activemq.thread.TaskRunnerFactory;
 42  import org.apache.commons.logging.Log;
 43  import org.apache.commons.logging.LogFactory;
 44  import sun.security.x509.IssuerAlternativeNameExtension;
 45
 46  import java.util.concurrent.ConcurrentHashMap
  ; 47
 48
 52  abstract public class AbstractRegion implements Region {
 53
 54      private static final Log log = LogFactory.getLog(AbstractRegion.class);
 55
 56      protected final ConcurrentHashMap
  destinations = new ConcurrentHashMap  (); 57      protected final DestinationMap destinationMap = new DestinationMap();
 58      protected final ConcurrentHashMap
  subscriptions = new ConcurrentHashMap  (); 59      protected final UsageManager memoryManager;
 60      protected final DestinationFactory destinationFactory;
 61      protected final DestinationStatistics destinationStatistics;
 62      protected final RegionBroker broker;
 63      protected boolean autoCreateDestinations=true;
 64      protected final TaskRunnerFactory taskRunnerFactory;
 65      protected final Object
  destinationsMutex = new Object  (); 66      protected final Map
  consumerChangeMutexMap = new HashMap  (); 67      protected boolean started = false;
 68
 69      public AbstractRegion(RegionBroker broker,DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
 70          if (broker == null) {
 71              throw new IllegalArgumentException
  ("null broker"); 72          }
 73          this.broker = broker;
 74          this.destinationStatistics = destinationStatistics;
 75          this.memoryManager = memoryManager;
 76          this.taskRunnerFactory = taskRunnerFactory;
 77          if (broker == null) {
 78              throw new IllegalArgumentException
  ("null destinationFactory"); 79          }
 80          this.destinationFactory = destinationFactory;
 81      }
 82
 83      public void start() throws Exception
  { 84          started = true;
 85          for (Iterator
  i = destinations.values().iterator();i.hasNext();) { 86              Destination dest = (Destination)i.next();
 87              dest.start();
 88          }
 89      }
 90
 91      public void stop() throws Exception
  { 92          started = false;
 93          for (Iterator
  i = destinations.values().iterator();i.hasNext();) { 94              Destination dest = (Destination)i.next();
 95              dest.stop();
 96          }
 97          destinations.clear();
 98      }
 99
 100     public Destination addDestination(ConnectionContext context,ActiveMQDestination destination) throws Exception
  { 101         log.debug("Adding destination: "+destination);
 102         synchronized(destinationsMutex){
 103             Destination dest=(Destination)destinations.get(destination);
 104             if(dest==null){
 105                 dest=createDestination(context,destination);
 106                                 DestinationInterceptor destinationInterceptor=broker.getDestinationInterceptor();
 108                 if(destinationInterceptor!=null){
 109                     dest=destinationInterceptor.intercept(dest);
 110                 }
 111                 dest.start();
 112                 destinations.put(destination,dest);
 113                 destinationMap.put(destination,dest);
 114                                 for(Iterator
  iter=subscriptions.values().iterator();iter.hasNext();){ 116                     Subscription sub=(Subscription)iter.next();
 117                     if(sub.matches(destination)){
 118                         dest.addSubscription(context,sub);
 119                     }
 120                 }
 121             }
 122             return dest;
 123         }
 124     }
 125
 126     public void removeDestination(ConnectionContext context,ActiveMQDestination destination,long timeout)
 127                     throws Exception
  { 128
 129                 if( timeout == 0 ) {
 131             for(Iterator
  iter=subscriptions.values().iterator();iter.hasNext();){ 132                 Subscription sub=(Subscription) iter.next();
 133                 if(sub.matches(destination)){
 134                     throw new JMSException
  ("Destination still has an active subscription: "+destination); 135                 }
 136             }
 137         }
 138
 139         if( timeout > 0 ) {
 140
 144         }
 145
 146         log.debug("Removing destination: "+destination);
 147         synchronized(destinationsMutex){
 148             Destination dest=(Destination) destinations.remove(destination);
 149             if(dest!=null){
 150
 151                                 for(Iterator
  iter=subscriptions.values().iterator();iter.hasNext();){ 153                     Subscription sub=(Subscription) iter.next();
 154                     if(sub.matches(destination)){
 155                         dest.removeSubscription(context, sub);
 156                     }
 157                 }
 158
 159                 destinationMap.removeAll(destination);
 160                 dest.dispose(context);
 161                 dest.stop();
 162
 163             }else{
 164                 log.debug("Destination doesn't exist: " + dest);
 165             }
 166         }
 167     }
 168
 169
 174     public Set
  getDestinations(ActiveMQDestination destination) { 175         synchronized(destinationsMutex){
 176             return destinationMap.get(destination);
 177         }
 178     }
 179
 180     public Map
  getDestinationMap() { 181         synchronized(destinationsMutex){
 182             return new HashMap
  (destinations); 183         }
 184     }
 185
 186     public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception
  { 187         log.debug("Adding consumer: "+info.getConsumerId());
 188         ActiveMQDestination destination = info.getDestination();
 189         if (destination != null && ! destination.isPattern() && ! destination.isComposite()) {
 190                         lookup(context, destination);
 192         }
 193
 194         Object
  addGuard; 195         synchronized(consumerChangeMutexMap) {
 196             addGuard = consumerChangeMutexMap.get(info.getConsumerId());
 197             if (addGuard == null) {
 198                 addGuard = new Object
  (); 199                 consumerChangeMutexMap.put(info.getConsumerId(), addGuard);
 200             }
 201         }
 202         synchronized (addGuard) {
 203             Object
  o = subscriptions.get(info.getConsumerId()); 204             if (o != null) {
 205                 log.warn("A duplicate subscription was detected. Clients may be misbehaving. Later warnings you may see about subscription removal are a consequence of this.");
 206                 return (Subscription)o;
 207             }
 208
 209             Subscription sub = createSubscription(context, info);
 210
 211                                                                                                                         Set
  inactiveDests = getInactiveDestinations(); 221             for (Iterator
  iter = inactiveDests.iterator(); iter.hasNext();) { 222                 ActiveMQDestination dest = (ActiveMQDestination) iter.next();
 223                 if( sub.matches(dest) ) {
 224                     context.getBroker().addDestination(context, dest);
 225                 }
 226             }
 227
 228
 229             subscriptions.put(info.getConsumerId(), sub);
 230
 231
 238                         for (Iterator
  iter = destinationMap.get(info.getDestination()).iterator(); iter.hasNext();) { 240                 Destination dest = (Destination) iter.next();
 241                 dest.addSubscription(context, sub);
 242             }
 243
 244             if( info.isBrowser() ) {
 245                 ((QueueBrowserSubscription)sub).browseDone();
 246             }
 247
 248             return sub;
 249         }
 250     }
 251
 252
 256     public Set
  getDurableDestinations(){ 257         return destinationFactory.getDestinations();
 258     }
 259
 260
 263     protected Set
  getInactiveDestinations() { 264         Set
  inactiveDests = destinationFactory.getDestinations(); 265         inactiveDests.removeAll( destinations.keySet() );
 266         return inactiveDests;
 267     }
 268
 269     public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception
  { 270         log.debug("Removing consumer: "+info.getConsumerId());
 271
 272         Subscription sub = (Subscription) subscriptions.remove(info.getConsumerId());
 273         if( sub==null )
 274             throw new IllegalArgumentException
  ("The subscription does not exist: "+info.getConsumerId()); 275
 276                 for (Iterator
  iter = destinationMap.get(info.getDestination()).iterator(); iter.hasNext();) { 278             Destination dest = (Destination) iter.next();
 279             dest.removeSubscription(context, sub);
 280         }
 281
 282         destroySubscription(sub);
 283
 284         synchronized (consumerChangeMutexMap) {
 285             consumerChangeMutexMap.remove(info.getConsumerId());
 286         }
 287     }
 288
 289     protected void destroySubscription(Subscription sub) {
 290         sub.destroy();
 291     }
 292
 293     public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception
  { 294         throw new JMSException
  ("Invalid operation."); 295     }
 296
 297     public void send(final ProducerBrokerExchange producerExchange, Message messageSend)
 298             throws Exception
  { 299         final ConnectionContext context = producerExchange.getConnectionContext();
 300
 301         if (producerExchange.isMutable() || producerExchange.getRegionDestination()==null) {
 302             final Destination regionDestination = lookup(context,messageSend.getDestination());
 303             producerExchange.setRegionDestination(regionDestination);
 304         }
 305
 306         producerExchange.getRegionDestination().send(producerExchange, messageSend);
 307     }
 308
 309     public void acknowledge(ConsumerBrokerExchange consumerExchange,MessageAck ack) throws Exception
  { 310         Subscription sub=consumerExchange.getSubscription();
 311         if(sub==null){
 312             sub=(Subscription)subscriptions.get(ack.getConsumerId());
 313             if(sub==null){
 314                 throw new IllegalArgumentException
  ("The subscription does not exist: "+ack.getConsumerId()); 315             }
 316             consumerExchange.setSubscription(sub);
 317         }
 318         sub.acknowledge(consumerExchange.getConnectionContext(),ack);
 319     }
 320
 321     public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception
  { 322         Subscription sub = (Subscription) subscriptions.get(pull.getConsumerId());
 323         if( sub==null )
 324             throw new IllegalArgumentException
  ("The subscription does not exist: "+pull.getConsumerId()); 325         return sub.pullMessage(context, pull);
 326     }
 327
 328     protected Destination lookup(ConnectionContext context, ActiveMQDestination destination) throws Exception
  { 329         synchronized(destinationsMutex){
 330             Destination dest=(Destination) destinations.get(destination);
 331             if(dest==null){
 332                 if(autoCreateDestinations){
 333                                                             try {
 336                         dest = addDestination(context, destination);
 337                                             }
 339                     catch (DestinationAlreadyExistsException e) {
 340                                             }
 342                                                         }
 345                 if(dest==null){
 346                     throw new JMSException
  ("The destination "+destination+" does not exist."); 347                 }
 348             }
 349             return dest;
 350         }
 351     }
 352
 353     public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception
  { 354         Subscription sub = (Subscription) subscriptions.get(messageDispatchNotification.getConsumerId());
 355         if (sub != null){
 356             sub.processMessageDispatchNotification(messageDispatchNotification);
 357         }
 358     }
 359     public void gc() {
 360         for (Iterator
  iter = subscriptions.values().iterator(); iter.hasNext();) { 361             Subscription sub = (Subscription) iter.next();
 362             sub.gc();
 363         }
 364         for (Iterator
  iter = destinations.values()  .iterator(); iter.hasNext();) { 365             Destination dest = (Destination) iter.next();
 366             dest.gc();
 367         }
 368     }
 369
 370     protected abstract Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws Exception
  ; 371
 372     protected Destination createDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception
  { 373         return destinationFactory.createDestination(context, destination, destinationStatistics);
 374     }
 375
 376     public boolean isAutoCreateDestinations() {
 377         return autoCreateDestinations;
 378     }
 379
 380     public void setAutoCreateDestinations(boolean autoCreateDestinations) {
 381         this.autoCreateDestinations = autoCreateDestinations;
 382     }
 383
 384
 385 }
 386
                                                                                                                                                                                                             |                                                                       
 
 
 
 
 
                                                                                   Popular Tags                                                                                                                                                                                              |