1 18 package org.apache.activemq.broker.region; 19 20 import java.io.IOException ; 21 import java.net.URI ; 22 import java.util.ArrayList ; 23 import java.util.Collections ; 24 import java.util.HashMap ; 25 import java.util.Map ; 26 import java.util.Set ; 27 28 import javax.jms.InvalidClientIDException ; 29 import javax.jms.JMSException ; 30 31 import org.apache.activemq.broker.Broker; 32 import org.apache.activemq.broker.BrokerService; 33 import org.apache.activemq.broker.Connection; 34 import org.apache.activemq.broker.ConnectionContext; 35 import org.apache.activemq.broker.ConsumerBrokerExchange; 36 import org.apache.activemq.broker.DestinationAlreadyExistsException; 37 import org.apache.activemq.broker.ProducerBrokerExchange; 38 import org.apache.activemq.broker.region.policy.PendingDurableSubscriberMessageStoragePolicy; 39 import org.apache.activemq.broker.region.policy.PolicyMap; 40 import org.apache.activemq.broker.region.policy.VMPendingDurableSubscriberMessageStoragePolicy; 41 import org.apache.activemq.command.ActiveMQDestination; 42 import org.apache.activemq.command.BrokerId; 43 import org.apache.activemq.command.BrokerInfo; 44 import org.apache.activemq.command.ConnectionId; 45 import org.apache.activemq.command.ConnectionInfo; 46 import org.apache.activemq.command.ConsumerInfo; 47 import org.apache.activemq.command.DestinationInfo; 48 import org.apache.activemq.command.Message; 49 import org.apache.activemq.command.MessageAck; 50 import org.apache.activemq.command.MessageDispatch; 51 import org.apache.activemq.command.MessageDispatchNotification; 52 import org.apache.activemq.command.MessagePull; 53 import org.apache.activemq.command.ProducerInfo; 54 import org.apache.activemq.command.RemoveSubscriptionInfo; 55 import org.apache.activemq.command.Response; 56 import org.apache.activemq.command.SessionInfo; 57 import org.apache.activemq.command.TransactionId; 58 import org.apache.activemq.kaha.Store; 59 import org.apache.activemq.memory.UsageManager; 60 import org.apache.activemq.store.PersistenceAdapter; 61 import org.apache.activemq.store.memory.MemoryPersistenceAdapter; 62 import org.apache.activemq.thread.TaskRunnerFactory; 63 import org.apache.activemq.util.IdGenerator; 64 import org.apache.activemq.util.LongSequenceGenerator; 65 import org.apache.activemq.util.ServiceStopper; 66 67 import java.util.concurrent.ConcurrentHashMap ; 68 import java.util.concurrent.CopyOnWriteArrayList ; 69 70 71 76 public class RegionBroker implements Broker { 77 78 private static final IdGenerator brokerIdGenerator = new IdGenerator(); 79 80 private final Region queueRegion; 81 private final Region topicRegion; 82 private final Region tempQueueRegion; 83 private final Region tempTopicRegion; 84 private BrokerService brokerService; 85 private boolean started = false; 86 private boolean keepDurableSubsActive=false; 87 88 protected final DestinationStatistics destinationStatistics = new DestinationStatistics(); 89 90 private final CopyOnWriteArrayList connections = new CopyOnWriteArrayList (); 91 private final HashMap destinations = new HashMap (); 92 private final CopyOnWriteArrayList brokerInfos = new CopyOnWriteArrayList (); 93 94 private final LongSequenceGenerator sequenceGenerator = new LongSequenceGenerator(); 95 private BrokerId brokerId; 96 private String brokerName; 97 private Map clientIdSet = new HashMap (); private final DestinationInterceptor destinationInterceptor; 99 private ConnectionContext adminConnectionContext; 100 protected DestinationFactory destinationFactory; 101 protected final ConcurrentHashMap connectionStates = new ConcurrentHashMap (); 102 103 104 public RegionBroker(BrokerService brokerService,TaskRunnerFactory taskRunnerFactory, UsageManager memoryManager, DestinationFactory destinationFactory, DestinationInterceptor destinationInterceptor) throws IOException { 105 this.brokerService = brokerService; 106 if (destinationFactory == null) { 107 throw new IllegalArgumentException ("null destinationFactory"); 108 } 109 this.sequenceGenerator.setLastSequenceId( destinationFactory.getLastMessageBrokerSequenceId() ); 110 this.destinationFactory = destinationFactory; 111 queueRegion = createQueueRegion(memoryManager, taskRunnerFactory, destinationFactory); 112 topicRegion = createTopicRegion(memoryManager, taskRunnerFactory, destinationFactory); 113 this.destinationInterceptor = destinationInterceptor; 114 tempQueueRegion = createTempQueueRegion(memoryManager, taskRunnerFactory, destinationFactory); 115 tempTopicRegion = createTempTopicRegion(memoryManager, taskRunnerFactory, destinationFactory); 116 } 117 118 public Map getDestinationMap() { 119 Map answer = getQueueRegion().getDestinationMap(); 120 answer.putAll(getTopicRegion().getDestinationMap()); 121 return answer; 122 } 123 124 public Set getDestinations(ActiveMQDestination destination) { 125 switch(destination.getDestinationType()) { 126 case ActiveMQDestination.QUEUE_TYPE: 127 return queueRegion.getDestinations(destination); 128 case ActiveMQDestination.TOPIC_TYPE: 129 return topicRegion.getDestinations(destination); 130 case ActiveMQDestination.TEMP_QUEUE_TYPE: 131 return tempQueueRegion.getDestinations(destination); 132 case ActiveMQDestination.TEMP_TOPIC_TYPE: 133 return tempTopicRegion.getDestinations(destination); 134 default: 135 return Collections.EMPTY_SET; 136 } 137 } 138 139 public Broker getAdaptor(Class type){ 140 if (type.isInstance(this)){ 141 return this; 142 } 143 return null; 144 } 145 146 public Region getQueueRegion() { 147 return queueRegion; 148 } 149 150 public Region getTempQueueRegion() { 151 return tempQueueRegion; 152 } 153 154 public Region getTempTopicRegion() { 155 return tempTopicRegion; 156 } 157 158 public Region getTopicRegion() { 159 return topicRegion; 160 } 161 162 protected Region createTempTopicRegion(UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) { 163 return new TempTopicRegion(this,destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory); 164 } 165 166 protected Region createTempQueueRegion(UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) { 167 return new TempQueueRegion(this,destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory); 168 } 169 170 protected Region createTopicRegion(UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) { 171 return new TopicRegion(this,destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory); 172 } 173 174 protected Region createQueueRegion(UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) { 175 return new QueueRegion(this,destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory); 176 } 177 178 private static PersistenceAdapter createDefaultPersistenceAdapter(UsageManager memoryManager) throws IOException { 179 return new MemoryPersistenceAdapter(); 180 } 181 182 183 public void start() throws Exception { 184 ((TopicRegion)topicRegion).setKeepDurableSubsActive(keepDurableSubsActive); 185 started = true; 186 queueRegion.start(); 187 topicRegion.start(); 188 tempQueueRegion.start(); 189 tempTopicRegion.start(); 190 } 191 192 public void stop() throws Exception { 193 started = false; 194 ServiceStopper ss = new ServiceStopper(); 195 doStop(ss); 196 ss.throwFirstException(); 197 } 198 199 public PolicyMap getDestinationPolicy(){ 200 return brokerService != null ? brokerService.getDestinationPolicy() : null; 201 } 202 203 public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception { 204 String clientId = info.getClientId(); 205 if (clientId == null) { 206 throw new InvalidClientIDException ("No clientID specified for connection request"); 207 } 208 synchronized (clientIdSet ) { 209 if (clientIdSet.containsKey(clientId)) { 210 throw new InvalidClientIDException ("Broker: " + getBrokerName() + " - Client: " + clientId + " already connected"); 211 } 212 else { 213 clientIdSet.put(clientId, info); 214 } 215 } 216 217 connections.add(context.getConnection()); 218 } 219 220 public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception { 221 String clientId = info.getClientId(); 222 if (clientId == null) { 223 throw new InvalidClientIDException ("No clientID specified for connection disconnect request"); 224 } 225 synchronized (clientIdSet) { 226 ConnectionInfo oldValue = (ConnectionInfo) clientIdSet.get(clientId); 227 if (oldValue != null) { 230 if (isEqual(oldValue.getConnectionId(), info.getConnectionId())) { 231 clientIdSet.remove(clientId); 232 } 233 } 234 } 235 connections.remove(context.getConnection()); 236 } 237 238 protected boolean isEqual(ConnectionId connectionId, ConnectionId connectionId2) { 239 return connectionId == connectionId2 || (connectionId != null && connectionId.equals(connectionId2)); 240 } 241 242 public Connection[] getClients() throws Exception { 243 ArrayList l = new ArrayList (connections); 244 Connection rc[] = new Connection[l.size()]; 245 l.toArray(rc); 246 return rc; 247 } 248 249 public Destination addDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception { 250 251 Destination answer; 252 synchronized(destinations) { 253 answer = (Destination) destinations.get(destination); 254 if( answer!=null ) 255 return answer; 256 } 257 258 switch(destination.getDestinationType()) { 259 case ActiveMQDestination.QUEUE_TYPE: 260 answer = queueRegion.addDestination(context, destination); 261 break; 262 case ActiveMQDestination.TOPIC_TYPE: 263 answer = topicRegion.addDestination(context, destination); 264 break; 265 case ActiveMQDestination.TEMP_QUEUE_TYPE: 266 answer = tempQueueRegion.addDestination(context, destination); 267 break; 268 case ActiveMQDestination.TEMP_TOPIC_TYPE: 269 answer = tempTopicRegion.addDestination(context, destination); 270 break; 271 default: 272 throw createUnknownDestinationTypeException(destination); 273 } 274 275 synchronized(destinations) { 276 destinations.put(destination, answer); 277 return answer; 278 } 279 } 280 281 public void removeDestination(ConnectionContext context,ActiveMQDestination destination,long timeout) throws Exception { 282 synchronized(destinations) { 283 if( destinations.remove(destination)!=null ){ 284 switch(destination.getDestinationType()){ 285 case ActiveMQDestination.QUEUE_TYPE: 286 queueRegion.removeDestination(context,destination,timeout); 287 break; 288 case ActiveMQDestination.TOPIC_TYPE: 289 topicRegion.removeDestination(context,destination,timeout); 290 break; 291 case ActiveMQDestination.TEMP_QUEUE_TYPE: 292 tempQueueRegion.removeDestination(context,destination,timeout); 293 break; 294 case ActiveMQDestination.TEMP_TOPIC_TYPE: 295 tempTopicRegion.removeDestination(context,destination,timeout); 296 break; 297 default: 298 throw createUnknownDestinationTypeException(destination); 299 } 300 } 301 } 302 } 303 304 public void addDestinationInfo(ConnectionContext context,DestinationInfo info) throws Exception { 305 addDestination(context,info.getDestination()); 306 307 } 308 309 public void removeDestinationInfo(ConnectionContext context,DestinationInfo info) throws Exception { 310 removeDestination(context,info.getDestination(), info.getTimeout()); 311 312 } 313 314 public ActiveMQDestination[] getDestinations() throws Exception { 315 ArrayList l; 316 synchronized(destinations) { 317 l = new ArrayList (destinations.values()); 318 } 319 ActiveMQDestination rc[] = new ActiveMQDestination[l.size()]; 320 l.toArray(rc); 321 return rc; 322 } 323 324 325 public void addSession(ConnectionContext context, SessionInfo info) throws Exception { 326 } 327 328 public void removeSession(ConnectionContext context, SessionInfo info) throws Exception { 329 } 330 331 public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception { 332 } 333 334 public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception { 335 } 336 337 public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { 338 ActiveMQDestination destination = info.getDestination(); 339 switch(destination.getDestinationType()) { 340 case ActiveMQDestination.QUEUE_TYPE: 341 return queueRegion.addConsumer(context, info); 342 343 case ActiveMQDestination.TOPIC_TYPE: 344 return topicRegion.addConsumer(context, info); 345 346 case ActiveMQDestination.TEMP_QUEUE_TYPE: 347 return tempQueueRegion.addConsumer(context, info); 348 349 case ActiveMQDestination.TEMP_TOPIC_TYPE: 350 return tempTopicRegion.addConsumer(context, info); 351 352 default: 353 throw createUnknownDestinationTypeException(destination); 354 } 355 } 356 357 public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { 358 ActiveMQDestination destination = info.getDestination(); 359 switch(destination.getDestinationType()) { 360 case ActiveMQDestination.QUEUE_TYPE: 361 queueRegion.removeConsumer(context, info); 362 break; 363 case ActiveMQDestination.TOPIC_TYPE: 364 topicRegion.removeConsumer(context, info); 365 break; 366 case ActiveMQDestination.TEMP_QUEUE_TYPE: 367 tempQueueRegion.removeConsumer(context, info); 368 break; 369 case ActiveMQDestination.TEMP_TOPIC_TYPE: 370 tempTopicRegion.removeConsumer(context, info); 371 break; 372 default: 373 throw createUnknownDestinationTypeException(destination); 374 } 375 } 376 377 public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception { 378 topicRegion.removeSubscription(context, info); 379 } 380 381 public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception { 382 long si = sequenceGenerator.getNextSequenceId(); 383 message.getMessageId().setBrokerSequenceId(si); 384 if (producerExchange.isMutable() || producerExchange.getRegion()==null) { 385 ActiveMQDestination destination = message.getDestination(); 386 addDestination(producerExchange.getConnectionContext(),destination); 388 Region region = null; 389 switch(destination.getDestinationType()) { 390 case ActiveMQDestination.QUEUE_TYPE: 391 region = queueRegion; 392 break; 393 case ActiveMQDestination.TOPIC_TYPE: 394 region = topicRegion; 395 break; 396 case ActiveMQDestination.TEMP_QUEUE_TYPE: 397 region = tempQueueRegion; 398 break; 399 case ActiveMQDestination.TEMP_TOPIC_TYPE: 400 region = tempTopicRegion; 401 break; 402 default: 403 throw createUnknownDestinationTypeException(destination); 404 } 405 producerExchange.setRegion(region); 406 } 407 producerExchange.getRegion().send(producerExchange,message); 408 } 409 410 public void acknowledge(ConsumerBrokerExchange consumerExchange,MessageAck ack) throws Exception { 411 if(consumerExchange.isWildcard() || consumerExchange.getRegion()==null){ 412 ActiveMQDestination destination=ack.getDestination(); 413 Region region=null; 414 switch(destination.getDestinationType()){ 415 case ActiveMQDestination.QUEUE_TYPE: 416 region=queueRegion; 417 break; 418 case ActiveMQDestination.TOPIC_TYPE: 419 region=topicRegion; 420 break; 421 case ActiveMQDestination.TEMP_QUEUE_TYPE: 422 region=tempQueueRegion; 423 break; 424 case ActiveMQDestination.TEMP_TOPIC_TYPE: 425 region=tempTopicRegion; 426 break; 427 default: 428 throw createUnknownDestinationTypeException(destination); 429 } 430 consumerExchange.setRegion(region); 431 } 432 consumerExchange.getRegion().acknowledge(consumerExchange,ack); 433 } 434 435 436 public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception { 437 ActiveMQDestination destination = pull.getDestination(); 438 switch (destination.getDestinationType()) { 439 case ActiveMQDestination.QUEUE_TYPE: 440 return queueRegion.messagePull(context, pull); 441 442 case ActiveMQDestination.TOPIC_TYPE: 443 return topicRegion.messagePull(context, pull); 444 445 case ActiveMQDestination.TEMP_QUEUE_TYPE: 446 return tempQueueRegion.messagePull(context, pull); 447 448 case ActiveMQDestination.TEMP_TOPIC_TYPE: 449 return tempTopicRegion.messagePull(context, pull); 450 default: 451 throw createUnknownDestinationTypeException(destination); 452 } 453 } 454 455 public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception { 456 throw new IllegalAccessException ("Transaction operation not implemented by this broker."); 457 } 458 459 public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception { 460 throw new IllegalAccessException ("Transaction operation not implemented by this broker."); 461 } 462 463 public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception { 464 throw new IllegalAccessException ("Transaction operation not implemented by this broker."); 465 } 466 467 public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception { 468 throw new IllegalAccessException ("Transaction operation not implemented by this broker."); 469 } 470 471 public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception { 472 throw new IllegalAccessException ("Transaction operation not implemented by this broker."); 473 } 474 475 public void forgetTransaction(ConnectionContext context, TransactionId transactionId) throws Exception { 476 throw new IllegalAccessException ("Transaction operation not implemented by this broker."); 477 } 478 479 480 public void gc() { 481 queueRegion.gc(); 482 topicRegion.gc(); 483 } 484 485 public BrokerId getBrokerId() { 486 if( brokerId==null ) { 487 brokerId=new BrokerId(brokerIdGenerator.generateId()); 490 } 491 return brokerId; 492 } 493 494 public void setBrokerId(BrokerId brokerId) { 495 this.brokerId = brokerId; 496 } 497 498 public String getBrokerName() { 499 if( brokerName==null ) { 500 try { 501 brokerName = java.net.InetAddress.getLocalHost().getHostName().toLowerCase(); 502 } catch (Exception e) { 503 brokerName="localhost"; 504 } 505 } 506 return brokerName; 507 } 508 509 public void setBrokerName(String brokerName) { 510 this.brokerName = brokerName; 511 } 512 513 public DestinationStatistics getDestinationStatistics() { 514 return destinationStatistics; 515 } 516 517 protected JMSException createUnknownDestinationTypeException(ActiveMQDestination destination) { 518 return new JMSException ("Unknown destination type: " + destination.getDestinationType()); 519 } 520 521 public synchronized void addBroker(Connection connection,BrokerInfo info){ 522 brokerInfos.add(info); 523 } 524 525 public synchronized void removeBroker(Connection connection,BrokerInfo info){ 526 if (info != null){ 527 brokerInfos.remove(info); 528 } 529 } 530 531 public synchronized BrokerInfo[] getPeerBrokerInfos(){ 532 BrokerInfo[] result = new BrokerInfo[brokerInfos.size()]; 533 result = (BrokerInfo[])brokerInfos.toArray(result); 534 return result; 535 } 536 537 public void processDispatch(MessageDispatch messageDispatch){ 538 539 } 540 541 public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception { 542 ActiveMQDestination destination = messageDispatchNotification.getDestination(); 543 switch(destination.getDestinationType()) { 544 case ActiveMQDestination.QUEUE_TYPE: 545 queueRegion.processDispatchNotification(messageDispatchNotification); 546 break; 547 case ActiveMQDestination.TOPIC_TYPE: 548 topicRegion.processDispatchNotification(messageDispatchNotification); 549 break; 550 case ActiveMQDestination.TEMP_QUEUE_TYPE: 551 tempQueueRegion.processDispatchNotification(messageDispatchNotification); 552 break; 553 case ActiveMQDestination.TEMP_TOPIC_TYPE: 554 tempTopicRegion.processDispatchNotification(messageDispatchNotification); 555 break; 556 default: 557 throw createUnknownDestinationTypeException(destination); 558 } 559 } 560 561 public boolean isSlaveBroker(){ 562 return brokerService.isSlave(); 563 } 564 565 public boolean isStopped(){ 566 return !started; 567 } 568 569 public Set getDurableDestinations(){ 570 return destinationFactory.getDestinations(); 571 } 572 573 public boolean isFaultTolerantConfiguration(){ 574 return false; 575 } 576 577 578 protected void doStop(ServiceStopper ss) { 579 ss.stop(queueRegion); 580 ss.stop(topicRegion); 581 ss.stop(tempQueueRegion); 582 ss.stop(tempTopicRegion); 583 } 584 585 public boolean isKeepDurableSubsActive() { 586 return keepDurableSubsActive; 587 } 588 589 public void setKeepDurableSubsActive(boolean keepDurableSubsActive) { 590 this.keepDurableSubsActive = keepDurableSubsActive; 591 } 592 593 public DestinationInterceptor getDestinationInterceptor() { 594 return destinationInterceptor; 595 } 596 597 public ConnectionContext getAdminConnectionContext() { 598 return adminConnectionContext; 599 } 600 601 public void setAdminConnectionContext(ConnectionContext adminConnectionContext) { 602 this.adminConnectionContext = adminConnectionContext; 603 } 604 605 public Map getConnectionStates() { 606 return connectionStates; 607 } 608 609 public Store getTempDataStore() { 610 return brokerService.getTempDataStore(); 611 } 612 613 public URI getVmConnectorURI(){ 614 return brokerService.getVmConnectorURI(); 615 } 616 } 617 | Popular Tags |