|                                                                                                              1
 18  package org.apache.activemq.broker.region;
 19
 20  import java.io.IOException
  ; 21  import java.net.URI
  ; 22  import java.util.ArrayList
  ; 23  import java.util.Collections
  ; 24  import java.util.HashMap
  ; 25  import java.util.Map
  ; 26  import java.util.Set
  ; 27
 28  import javax.jms.InvalidClientIDException
  ; 29  import javax.jms.JMSException
  ; 30
 31  import org.apache.activemq.broker.Broker;
 32  import org.apache.activemq.broker.BrokerService;
 33  import org.apache.activemq.broker.Connection;
 34  import org.apache.activemq.broker.ConnectionContext;
 35  import org.apache.activemq.broker.ConsumerBrokerExchange;
 36  import org.apache.activemq.broker.DestinationAlreadyExistsException;
 37  import org.apache.activemq.broker.ProducerBrokerExchange;
 38  import org.apache.activemq.broker.region.policy.PendingDurableSubscriberMessageStoragePolicy;
 39  import org.apache.activemq.broker.region.policy.PolicyMap;
 40  import org.apache.activemq.broker.region.policy.VMPendingDurableSubscriberMessageStoragePolicy;
 41  import org.apache.activemq.command.ActiveMQDestination;
 42  import org.apache.activemq.command.BrokerId;
 43  import org.apache.activemq.command.BrokerInfo;
 44  import org.apache.activemq.command.ConnectionId;
 45  import org.apache.activemq.command.ConnectionInfo;
 46  import org.apache.activemq.command.ConsumerInfo;
 47  import org.apache.activemq.command.DestinationInfo;
 48  import org.apache.activemq.command.Message;
 49  import org.apache.activemq.command.MessageAck;
 50  import org.apache.activemq.command.MessageDispatch;
 51  import org.apache.activemq.command.MessageDispatchNotification;
 52  import org.apache.activemq.command.MessagePull;
 53  import org.apache.activemq.command.ProducerInfo;
 54  import org.apache.activemq.command.RemoveSubscriptionInfo;
 55  import org.apache.activemq.command.Response;
 56  import org.apache.activemq.command.SessionInfo;
 57  import org.apache.activemq.command.TransactionId;
 58  import org.apache.activemq.kaha.Store;
 59  import org.apache.activemq.memory.UsageManager;
 60  import org.apache.activemq.store.PersistenceAdapter;
 61  import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
 62  import org.apache.activemq.thread.TaskRunnerFactory;
 63  import org.apache.activemq.util.IdGenerator;
 64  import org.apache.activemq.util.LongSequenceGenerator;
 65  import org.apache.activemq.util.ServiceStopper;
 66
 67  import java.util.concurrent.ConcurrentHashMap
  ; 68  import java.util.concurrent.CopyOnWriteArrayList
  ; 69
 70
 71
 76  public class RegionBroker implements Broker {
 77
 78      private static final IdGenerator brokerIdGenerator = new IdGenerator();
 79
 80      private final Region queueRegion;
 81      private final Region topicRegion;
 82      private final Region tempQueueRegion;
 83      private final Region tempTopicRegion;
 84      private BrokerService brokerService;
 85      private boolean started = false;
 86      private boolean keepDurableSubsActive=false;
 87
 88      protected final DestinationStatistics destinationStatistics = new DestinationStatistics();
 89
 90      private final CopyOnWriteArrayList
  connections = new CopyOnWriteArrayList  (); 91      private final HashMap
  destinations = new HashMap  (); 92      private final CopyOnWriteArrayList
  brokerInfos = new CopyOnWriteArrayList  (); 93
 94      private final LongSequenceGenerator sequenceGenerator = new LongSequenceGenerator();
 95      private BrokerId brokerId;
 96      private String
  brokerName; 97      private Map
  clientIdSet = new HashMap  ();     private final DestinationInterceptor destinationInterceptor; 99      private ConnectionContext adminConnectionContext;
 100     protected DestinationFactory destinationFactory;
 101     protected final ConcurrentHashMap
  connectionStates = new ConcurrentHashMap  (); 102
 103
 104     public RegionBroker(BrokerService brokerService,TaskRunnerFactory taskRunnerFactory, UsageManager memoryManager, DestinationFactory destinationFactory, DestinationInterceptor destinationInterceptor) throws IOException
  { 105         this.brokerService = brokerService;
 106         if (destinationFactory == null) {
 107             throw new IllegalArgumentException
  ("null destinationFactory"); 108         }
 109         this.sequenceGenerator.setLastSequenceId( destinationFactory.getLastMessageBrokerSequenceId() );
 110         this.destinationFactory = destinationFactory;
 111         queueRegion = createQueueRegion(memoryManager, taskRunnerFactory, destinationFactory);
 112         topicRegion = createTopicRegion(memoryManager, taskRunnerFactory, destinationFactory);
 113         this.destinationInterceptor = destinationInterceptor;
 114         tempQueueRegion = createTempQueueRegion(memoryManager, taskRunnerFactory, destinationFactory);
 115         tempTopicRegion = createTempTopicRegion(memoryManager, taskRunnerFactory, destinationFactory);
 116     }
 117
 118     public Map
  getDestinationMap() { 119         Map
  answer = getQueueRegion().getDestinationMap(); 120         answer.putAll(getTopicRegion().getDestinationMap());
 121         return answer;
 122     }
 123
 124     public Set
  getDestinations(ActiveMQDestination destination) { 125         switch(destination.getDestinationType()) {
 126         case ActiveMQDestination.QUEUE_TYPE:
 127             return queueRegion.getDestinations(destination);
 128         case ActiveMQDestination.TOPIC_TYPE:
 129             return topicRegion.getDestinations(destination);
 130         case ActiveMQDestination.TEMP_QUEUE_TYPE:
 131             return tempQueueRegion.getDestinations(destination);
 132         case ActiveMQDestination.TEMP_TOPIC_TYPE:
 133             return tempTopicRegion.getDestinations(destination);
 134         default:
 135             return Collections.EMPTY_SET;
 136         }
 137     }
 138
 139     public Broker getAdaptor(Class
  type){ 140         if (type.isInstance(this)){
 141             return this;
 142         }
 143         return null;
 144     }
 145
 146     public Region getQueueRegion() {
 147         return queueRegion;
 148     }
 149
 150     public Region getTempQueueRegion() {
 151         return tempQueueRegion;
 152     }
 153
 154     public Region getTempTopicRegion() {
 155         return tempTopicRegion;
 156     }
 157
 158     public Region getTopicRegion() {
 159         return topicRegion;
 160     }
 161
 162     protected Region createTempTopicRegion(UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
 163         return new TempTopicRegion(this,destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
 164     }
 165
 166     protected Region createTempQueueRegion(UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
 167         return new TempQueueRegion(this,destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
 168     }
 169
 170     protected Region createTopicRegion(UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
 171         return new TopicRegion(this,destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
 172     }
 173
 174     protected Region createQueueRegion(UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
 175         return new QueueRegion(this,destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
 176     }
 177
 178     private static PersistenceAdapter createDefaultPersistenceAdapter(UsageManager memoryManager) throws IOException
  { 179         return new MemoryPersistenceAdapter();
 180     }
 181
 182
 183     public void start() throws Exception
  { 184         ((TopicRegion)topicRegion).setKeepDurableSubsActive(keepDurableSubsActive);
 185         started = true;
 186         queueRegion.start();
 187         topicRegion.start();
 188         tempQueueRegion.start();
 189         tempTopicRegion.start();
 190     }
 191
 192     public void stop() throws Exception
  { 193         started = false;
 194         ServiceStopper ss = new ServiceStopper();
 195         doStop(ss);
 196         ss.throwFirstException();
 197     }
 198
 199     public PolicyMap getDestinationPolicy(){
 200         return brokerService != null ? brokerService.getDestinationPolicy() : null;
 201     }
 202
 203     public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception
  { 204         String
  clientId = info.getClientId(); 205         if (clientId == null) {
 206             throw new InvalidClientIDException
  ("No clientID specified for connection request"); 207         }
 208         synchronized (clientIdSet ) {
 209             if (clientIdSet.containsKey(clientId)) {
 210                 throw new InvalidClientIDException
  ("Broker: " + getBrokerName() + " - Client: " + clientId + " already connected"); 211             }
 212             else {
 213                 clientIdSet.put(clientId, info);
 214             }
 215         }
 216
 217         connections.add(context.getConnection());
 218     }
 219
 220     public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable
  error) throws Exception  { 221         String
  clientId = info.getClientId(); 222         if (clientId == null) {
 223             throw new InvalidClientIDException
  ("No clientID specified for connection disconnect request"); 224         }
 225         synchronized (clientIdSet) {
 226             ConnectionInfo oldValue = (ConnectionInfo) clientIdSet.get(clientId);
 227                                     if (oldValue != null) {
 230                 if (isEqual(oldValue.getConnectionId(), info.getConnectionId())) {
 231                     clientIdSet.remove(clientId);
 232                 }
 233             }
 234         }
 235         connections.remove(context.getConnection());
 236     }
 237
 238     protected boolean isEqual(ConnectionId connectionId, ConnectionId connectionId2) {
 239         return connectionId == connectionId2 || (connectionId != null && connectionId.equals(connectionId2));
 240     }
 241
 242     public Connection[] getClients() throws Exception
  { 243         ArrayList
  l = new ArrayList  (connections); 244         Connection rc[] = new Connection[l.size()];
 245         l.toArray(rc);
 246         return rc;
 247     }
 248
 249     public Destination addDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception
  { 250
 251         Destination answer;
 252         synchronized(destinations) {
 253             answer = (Destination) destinations.get(destination);
 254             if( answer!=null )
 255                 return answer;
 256         }
 257
 258         switch(destination.getDestinationType()) {
 259         case ActiveMQDestination.QUEUE_TYPE:
 260             answer  = queueRegion.addDestination(context, destination);
 261             break;
 262         case ActiveMQDestination.TOPIC_TYPE:
 263             answer = topicRegion.addDestination(context, destination);
 264             break;
 265         case ActiveMQDestination.TEMP_QUEUE_TYPE:
 266             answer = tempQueueRegion.addDestination(context, destination);
 267             break;
 268         case ActiveMQDestination.TEMP_TOPIC_TYPE:
 269             answer = tempTopicRegion.addDestination(context, destination);
 270             break;
 271         default:
 272             throw createUnknownDestinationTypeException(destination);
 273         }
 274
 275         synchronized(destinations) {
 276             destinations.put(destination, answer);
 277             return answer;
 278         }
 279     }
 280
 281     public void removeDestination(ConnectionContext context,ActiveMQDestination destination,long timeout) throws Exception
  { 282         synchronized(destinations) {
 283             if( destinations.remove(destination)!=null ){
 284                 switch(destination.getDestinationType()){
 285                 case ActiveMQDestination.QUEUE_TYPE:
 286                     queueRegion.removeDestination(context,destination,timeout);
 287                     break;
 288                 case ActiveMQDestination.TOPIC_TYPE:
 289                     topicRegion.removeDestination(context,destination,timeout);
 290                     break;
 291                 case ActiveMQDestination.TEMP_QUEUE_TYPE:
 292                     tempQueueRegion.removeDestination(context,destination,timeout);
 293                     break;
 294                 case ActiveMQDestination.TEMP_TOPIC_TYPE:
 295                     tempTopicRegion.removeDestination(context,destination,timeout);
 296                     break;
 297                 default:
 298                     throw createUnknownDestinationTypeException(destination);
 299                 }
 300             }
 301         }
 302     }
 303
 304     public void addDestinationInfo(ConnectionContext context,DestinationInfo info) throws Exception
  { 305         addDestination(context,info.getDestination());
 306
 307     }
 308
 309     public void removeDestinationInfo(ConnectionContext context,DestinationInfo info) throws Exception
  { 310         removeDestination(context,info.getDestination(), info.getTimeout());
 311
 312     }
 313
 314     public ActiveMQDestination[] getDestinations() throws Exception
  { 315         ArrayList
  l; 316         synchronized(destinations) {
 317             l = new ArrayList
  (destinations.values()); 318         }
 319         ActiveMQDestination rc[] = new ActiveMQDestination[l.size()];
 320         l.toArray(rc);
 321         return rc;
 322     }
 323
 324
 325     public void addSession(ConnectionContext context, SessionInfo info) throws Exception
  { 326     }
 327
 328     public void removeSession(ConnectionContext context, SessionInfo info) throws Exception
  { 329     }
 330
 331     public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception
  { 332     }
 333
 334     public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception
  { 335     }
 336
 337     public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception
  { 338         ActiveMQDestination destination = info.getDestination();
 339         switch(destination.getDestinationType()) {
 340         case ActiveMQDestination.QUEUE_TYPE:
 341             return queueRegion.addConsumer(context, info);
 342
 343         case ActiveMQDestination.TOPIC_TYPE:
 344             return topicRegion.addConsumer(context, info);
 345
 346         case ActiveMQDestination.TEMP_QUEUE_TYPE:
 347             return tempQueueRegion.addConsumer(context, info);
 348
 349         case ActiveMQDestination.TEMP_TOPIC_TYPE:
 350             return tempTopicRegion.addConsumer(context, info);
 351
 352         default:
 353             throw createUnknownDestinationTypeException(destination);
 354         }
 355     }
 356
 357     public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception
  { 358         ActiveMQDestination destination = info.getDestination();
 359         switch(destination.getDestinationType()) {
 360         case ActiveMQDestination.QUEUE_TYPE:
 361             queueRegion.removeConsumer(context, info);
 362             break;
 363         case ActiveMQDestination.TOPIC_TYPE:
 364             topicRegion.removeConsumer(context, info);
 365             break;
 366         case ActiveMQDestination.TEMP_QUEUE_TYPE:
 367             tempQueueRegion.removeConsumer(context, info);
 368             break;
 369         case ActiveMQDestination.TEMP_TOPIC_TYPE:
 370             tempTopicRegion.removeConsumer(context, info);
 371             break;
 372         default:
 373             throw createUnknownDestinationTypeException(destination);
 374         }
 375     }
 376
 377     public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception
  { 378         topicRegion.removeSubscription(context, info);
 379     }
 380
 381     public void send(ProducerBrokerExchange producerExchange,  Message message) throws Exception
  { 382         long si = sequenceGenerator.getNextSequenceId();
 383         message.getMessageId().setBrokerSequenceId(si);
 384         if (producerExchange.isMutable() || producerExchange.getRegion()==null) {
 385             ActiveMQDestination destination = message.getDestination();
 386                         addDestination(producerExchange.getConnectionContext(),destination);
 388             Region region = null;
 389             switch(destination.getDestinationType()) {
 390             case ActiveMQDestination.QUEUE_TYPE:
 391                 region = queueRegion;
 392                 break;
 393             case ActiveMQDestination.TOPIC_TYPE:
 394                 region = topicRegion;
 395                 break;
 396             case ActiveMQDestination.TEMP_QUEUE_TYPE:
 397                 region = tempQueueRegion;
 398                 break;
 399             case ActiveMQDestination.TEMP_TOPIC_TYPE:
 400                 region = tempTopicRegion;
 401                 break;
 402             default:
 403                 throw createUnknownDestinationTypeException(destination);
 404             }
 405             producerExchange.setRegion(region);
 406         }
 407         producerExchange.getRegion().send(producerExchange,message);
 408     }
 409
 410     public void acknowledge(ConsumerBrokerExchange consumerExchange,MessageAck ack) throws Exception
  { 411         if(consumerExchange.isWildcard() || consumerExchange.getRegion()==null){
 412             ActiveMQDestination destination=ack.getDestination();
 413             Region region=null;
 414             switch(destination.getDestinationType()){
 415             case ActiveMQDestination.QUEUE_TYPE:
 416                 region=queueRegion;
 417                 break;
 418             case ActiveMQDestination.TOPIC_TYPE:
 419                 region=topicRegion;
 420                 break;
 421             case ActiveMQDestination.TEMP_QUEUE_TYPE:
 422                 region=tempQueueRegion;
 423                 break;
 424             case ActiveMQDestination.TEMP_TOPIC_TYPE:
 425                 region=tempTopicRegion;
 426                 break;
 427             default:
 428                 throw createUnknownDestinationTypeException(destination);
 429             }
 430             consumerExchange.setRegion(region);
 431         }
 432         consumerExchange.getRegion().acknowledge(consumerExchange,ack);
 433     }
 434
 435
 436     public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception
  { 437         ActiveMQDestination destination = pull.getDestination();
 438         switch (destination.getDestinationType()) {
 439         case ActiveMQDestination.QUEUE_TYPE:
 440             return queueRegion.messagePull(context, pull);
 441
 442         case ActiveMQDestination.TOPIC_TYPE:
 443             return topicRegion.messagePull(context, pull);
 444
 445         case ActiveMQDestination.TEMP_QUEUE_TYPE:
 446             return tempQueueRegion.messagePull(context, pull);
 447
 448         case ActiveMQDestination.TEMP_TOPIC_TYPE:
 449             return tempTopicRegion.messagePull(context, pull);
 450         default:
 451             throw createUnknownDestinationTypeException(destination);
 452         }
 453     }
 454
 455     public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception
  { 456         throw new IllegalAccessException
  ("Transaction operation not implemented by this broker."); 457     }
 458
 459     public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception
  { 460         throw new IllegalAccessException
  ("Transaction operation not implemented by this broker."); 461     }
 462
 463     public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception
  { 464         throw new IllegalAccessException
  ("Transaction operation not implemented by this broker."); 465     }
 466
 467     public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception
  { 468         throw new IllegalAccessException
  ("Transaction operation not implemented by this broker."); 469     }
 470
 471     public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception
  { 472         throw new IllegalAccessException
  ("Transaction operation not implemented by this broker."); 473     }
 474
 475     public void forgetTransaction(ConnectionContext context, TransactionId transactionId) throws Exception
  { 476         throw new IllegalAccessException
  ("Transaction operation not implemented by this broker."); 477     }
 478
 479
 480     public void gc() {
 481         queueRegion.gc();
 482         topicRegion.gc();
 483     }
 484
 485     public BrokerId getBrokerId() {
 486         if( brokerId==null ) {
 487                                     brokerId=new BrokerId(brokerIdGenerator.generateId());
 490         }
 491         return brokerId;
 492     }
 493
 494     public void setBrokerId(BrokerId brokerId) {
 495         this.brokerId = brokerId;
 496     }
 497
 498     public String
  getBrokerName() { 499         if( brokerName==null ) {
 500             try {
 501                 brokerName = java.net.InetAddress.getLocalHost().getHostName().toLowerCase();
 502             } catch (Exception
  e) { 503                 brokerName="localhost";
 504             }
 505         }
 506         return brokerName;
 507     }
 508
 509     public void setBrokerName(String
  brokerName) { 510         this.brokerName = brokerName;
 511     }
 512
 513     public DestinationStatistics getDestinationStatistics() {
 514         return destinationStatistics;
 515     }
 516
 517     protected JMSException
  createUnknownDestinationTypeException(ActiveMQDestination destination) { 518         return new JMSException
  ("Unknown destination type: " + destination.getDestinationType()); 519     }
 520
 521     public synchronized void addBroker(Connection connection,BrokerInfo info){
 522             brokerInfos.add(info);
 523     }
 524
 525     public synchronized void removeBroker(Connection connection,BrokerInfo info){
 526         if (info != null){
 527             brokerInfos.remove(info);
 528         }
 529     }
 530
 531     public synchronized BrokerInfo[] getPeerBrokerInfos(){
 532         BrokerInfo[] result = new BrokerInfo[brokerInfos.size()];
 533         result = (BrokerInfo[])brokerInfos.toArray(result);
 534         return result;
 535     }
 536
 537     public void processDispatch(MessageDispatch messageDispatch){
 538
 539     }
 540
 541     public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception
  { 542         ActiveMQDestination destination = messageDispatchNotification.getDestination();
 543         switch(destination.getDestinationType()) {
 544         case ActiveMQDestination.QUEUE_TYPE:
 545             queueRegion.processDispatchNotification(messageDispatchNotification);
 546             break;
 547         case ActiveMQDestination.TOPIC_TYPE:
 548             topicRegion.processDispatchNotification(messageDispatchNotification);
 549             break;
 550         case ActiveMQDestination.TEMP_QUEUE_TYPE:
 551             tempQueueRegion.processDispatchNotification(messageDispatchNotification);
 552             break;
 553         case ActiveMQDestination.TEMP_TOPIC_TYPE:
 554             tempTopicRegion.processDispatchNotification(messageDispatchNotification);
 555             break;
 556         default:
 557             throw createUnknownDestinationTypeException(destination);
 558         }
 559     }
 560
 561     public boolean isSlaveBroker(){
 562         return brokerService.isSlave();
 563     }
 564
 565     public boolean isStopped(){
 566         return !started;
 567     }
 568
 569     public Set
  getDurableDestinations(){ 570         return destinationFactory.getDestinations();
 571     }
 572
 573     public boolean isFaultTolerantConfiguration(){
 574         return false;
 575     }
 576
 577
 578     protected void doStop(ServiceStopper ss) {
 579         ss.stop(queueRegion);
 580         ss.stop(topicRegion);
 581         ss.stop(tempQueueRegion);
 582         ss.stop(tempTopicRegion);
 583     }
 584
 585     public boolean isKeepDurableSubsActive() {
 586         return keepDurableSubsActive;
 587     }
 588
 589     public void setKeepDurableSubsActive(boolean keepDurableSubsActive) {
 590         this.keepDurableSubsActive = keepDurableSubsActive;
 591     }
 592
 593     public DestinationInterceptor getDestinationInterceptor() {
 594         return destinationInterceptor;
 595     }
 596
 597     public ConnectionContext getAdminConnectionContext() {
 598         return adminConnectionContext;
 599     }
 600
 601     public void setAdminConnectionContext(ConnectionContext adminConnectionContext) {
 602         this.adminConnectionContext = adminConnectionContext;
 603     }
 604
 605     public Map
  getConnectionStates() { 606         return connectionStates;
 607     }
 608
 609     public Store getTempDataStore() {
 610         return brokerService.getTempDataStore();
 611     }
 612
 613     public URI
  getVmConnectorURI(){ 614         return brokerService.getVmConnectorURI();
 615     }
 616 }
 617
                                                                                                                                                                                                             |                                                                       
 
 
 
 
 
                                                                                   Popular Tags                                                                                                                                                                                              |