KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > broker > region > RegionBroker


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 package org.apache.activemq.broker.region;
19
20 import java.io.IOException JavaDoc;
21 import java.net.URI JavaDoc;
22 import java.util.ArrayList JavaDoc;
23 import java.util.Collections JavaDoc;
24 import java.util.HashMap JavaDoc;
25 import java.util.Map JavaDoc;
26 import java.util.Set JavaDoc;
27
28 import javax.jms.InvalidClientIDException JavaDoc;
29 import javax.jms.JMSException JavaDoc;
30
31 import org.apache.activemq.broker.Broker;
32 import org.apache.activemq.broker.BrokerService;
33 import org.apache.activemq.broker.Connection;
34 import org.apache.activemq.broker.ConnectionContext;
35 import org.apache.activemq.broker.ConsumerBrokerExchange;
36 import org.apache.activemq.broker.DestinationAlreadyExistsException;
37 import org.apache.activemq.broker.ProducerBrokerExchange;
38 import org.apache.activemq.broker.region.policy.PendingDurableSubscriberMessageStoragePolicy;
39 import org.apache.activemq.broker.region.policy.PolicyMap;
40 import org.apache.activemq.broker.region.policy.VMPendingDurableSubscriberMessageStoragePolicy;
41 import org.apache.activemq.command.ActiveMQDestination;
42 import org.apache.activemq.command.BrokerId;
43 import org.apache.activemq.command.BrokerInfo;
44 import org.apache.activemq.command.ConnectionId;
45 import org.apache.activemq.command.ConnectionInfo;
46 import org.apache.activemq.command.ConsumerInfo;
47 import org.apache.activemq.command.DestinationInfo;
48 import org.apache.activemq.command.Message;
49 import org.apache.activemq.command.MessageAck;
50 import org.apache.activemq.command.MessageDispatch;
51 import org.apache.activemq.command.MessageDispatchNotification;
52 import org.apache.activemq.command.MessagePull;
53 import org.apache.activemq.command.ProducerInfo;
54 import org.apache.activemq.command.RemoveSubscriptionInfo;
55 import org.apache.activemq.command.Response;
56 import org.apache.activemq.command.SessionInfo;
57 import org.apache.activemq.command.TransactionId;
58 import org.apache.activemq.kaha.Store;
59 import org.apache.activemq.memory.UsageManager;
60 import org.apache.activemq.store.PersistenceAdapter;
61 import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
62 import org.apache.activemq.thread.TaskRunnerFactory;
63 import org.apache.activemq.util.IdGenerator;
64 import org.apache.activemq.util.LongSequenceGenerator;
65 import org.apache.activemq.util.ServiceStopper;
66
67 import java.util.concurrent.ConcurrentHashMap JavaDoc;
68 import java.util.concurrent.CopyOnWriteArrayList JavaDoc;
69
70
71 /**
72  * Routes Broker operations to the correct messaging regions for processing.
73  *
74  * @version $Revision$
75  */

76 public class RegionBroker implements Broker {
77
78     private static final IdGenerator brokerIdGenerator = new IdGenerator();
79
80     private final Region queueRegion;
81     private final Region topicRegion;
82     private final Region tempQueueRegion;
83     private final Region tempTopicRegion;
84     private BrokerService brokerService;
85     private boolean started = false;
86     private boolean keepDurableSubsActive=false;
87     
88     protected final DestinationStatistics destinationStatistics = new DestinationStatistics();
89     
90     private final CopyOnWriteArrayList JavaDoc connections = new CopyOnWriteArrayList JavaDoc();
91     private final HashMap JavaDoc destinations = new HashMap JavaDoc();
92     private final CopyOnWriteArrayList JavaDoc brokerInfos = new CopyOnWriteArrayList JavaDoc();
93
94     private final LongSequenceGenerator sequenceGenerator = new LongSequenceGenerator();
95     private BrokerId brokerId;
96     private String JavaDoc brokerName;
97     private Map JavaDoc clientIdSet = new HashMap JavaDoc(); // we will synchronize access
98
private final DestinationInterceptor destinationInterceptor;
99     private ConnectionContext adminConnectionContext;
100     protected DestinationFactory destinationFactory;
101     protected final ConcurrentHashMap JavaDoc connectionStates = new ConcurrentHashMap JavaDoc();
102     
103         
104     public RegionBroker(BrokerService brokerService,TaskRunnerFactory taskRunnerFactory, UsageManager memoryManager, DestinationFactory destinationFactory, DestinationInterceptor destinationInterceptor) throws IOException JavaDoc {
105         this.brokerService = brokerService;
106         if (destinationFactory == null) {
107             throw new IllegalArgumentException JavaDoc("null destinationFactory");
108         }
109         this.sequenceGenerator.setLastSequenceId( destinationFactory.getLastMessageBrokerSequenceId() );
110         this.destinationFactory = destinationFactory;
111         queueRegion = createQueueRegion(memoryManager, taskRunnerFactory, destinationFactory);
112         topicRegion = createTopicRegion(memoryManager, taskRunnerFactory, destinationFactory);
113         this.destinationInterceptor = destinationInterceptor;
114         tempQueueRegion = createTempQueueRegion(memoryManager, taskRunnerFactory, destinationFactory);
115         tempTopicRegion = createTempTopicRegion(memoryManager, taskRunnerFactory, destinationFactory);
116     }
117     
118     public Map JavaDoc getDestinationMap() {
119         Map JavaDoc answer = getQueueRegion().getDestinationMap();
120         answer.putAll(getTopicRegion().getDestinationMap());
121         return answer;
122     }
123
124     public Set JavaDoc getDestinations(ActiveMQDestination destination) {
125         switch(destination.getDestinationType()) {
126         case ActiveMQDestination.QUEUE_TYPE:
127             return queueRegion.getDestinations(destination);
128         case ActiveMQDestination.TOPIC_TYPE:
129             return topicRegion.getDestinations(destination);
130         case ActiveMQDestination.TEMP_QUEUE_TYPE:
131             return tempQueueRegion.getDestinations(destination);
132         case ActiveMQDestination.TEMP_TOPIC_TYPE:
133             return tempTopicRegion.getDestinations(destination);
134         default:
135             return Collections.EMPTY_SET;
136         }
137     }
138
139     public Broker getAdaptor(Class JavaDoc type){
140         if (type.isInstance(this)){
141             return this;
142         }
143         return null;
144     }
145
146     public Region getQueueRegion() {
147         return queueRegion;
148     }
149
150     public Region getTempQueueRegion() {
151         return tempQueueRegion;
152     }
153
154     public Region getTempTopicRegion() {
155         return tempTopicRegion;
156     }
157
158     public Region getTopicRegion() {
159         return topicRegion;
160     }
161
162     protected Region createTempTopicRegion(UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
163         return new TempTopicRegion(this,destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
164     }
165
166     protected Region createTempQueueRegion(UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
167         return new TempQueueRegion(this,destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
168     }
169
170     protected Region createTopicRegion(UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
171         return new TopicRegion(this,destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
172     }
173
174     protected Region createQueueRegion(UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
175         return new QueueRegion(this,destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
176     }
177     
178     private static PersistenceAdapter createDefaultPersistenceAdapter(UsageManager memoryManager) throws IOException JavaDoc {
179         return new MemoryPersistenceAdapter();
180     }
181     
182     
183     public void start() throws Exception JavaDoc {
184         ((TopicRegion)topicRegion).setKeepDurableSubsActive(keepDurableSubsActive);
185         started = true;
186         queueRegion.start();
187         topicRegion.start();
188         tempQueueRegion.start();
189         tempTopicRegion.start();
190     }
191
192     public void stop() throws Exception JavaDoc {
193         started = false;
194         ServiceStopper ss = new ServiceStopper();
195         doStop(ss);
196         ss.throwFirstException();
197     }
198     
199     public PolicyMap getDestinationPolicy(){
200         return brokerService != null ? brokerService.getDestinationPolicy() : null;
201     }
202
203     public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception JavaDoc {
204         String JavaDoc clientId = info.getClientId();
205         if (clientId == null) {
206             throw new InvalidClientIDException JavaDoc("No clientID specified for connection request");
207         }
208         synchronized (clientIdSet ) {
209             if (clientIdSet.containsKey(clientId)) {
210                 throw new InvalidClientIDException JavaDoc("Broker: " + getBrokerName() + " - Client: " + clientId + " already connected");
211             }
212             else {
213                 clientIdSet.put(clientId, info);
214             }
215         }
216
217         connections.add(context.getConnection());
218     }
219
220     public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable JavaDoc error) throws Exception JavaDoc {
221         String JavaDoc clientId = info.getClientId();
222         if (clientId == null) {
223             throw new InvalidClientIDException JavaDoc("No clientID specified for connection disconnect request");
224         }
225         synchronized (clientIdSet) {
226             ConnectionInfo oldValue = (ConnectionInfo) clientIdSet.get(clientId);
227             // we may be removing the duplicate connection, not the first connection to be created
228
// so lets check that their connection IDs are the same
229
if (oldValue != null) {
230                 if (isEqual(oldValue.getConnectionId(), info.getConnectionId())) {
231                     clientIdSet.remove(clientId);
232                 }
233             }
234         }
235         connections.remove(context.getConnection());
236     }
237
238     protected boolean isEqual(ConnectionId connectionId, ConnectionId connectionId2) {
239         return connectionId == connectionId2 || (connectionId != null && connectionId.equals(connectionId2));
240     }
241
242     public Connection[] getClients() throws Exception JavaDoc {
243         ArrayList JavaDoc l = new ArrayList JavaDoc(connections);
244         Connection rc[] = new Connection[l.size()];
245         l.toArray(rc);
246         return rc;
247     }
248
249     public Destination addDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception JavaDoc {
250   
251         Destination answer;
252         synchronized(destinations) {
253             answer = (Destination) destinations.get(destination);
254             if( answer!=null )
255                 return answer;
256         }
257         
258         switch(destination.getDestinationType()) {
259         case ActiveMQDestination.QUEUE_TYPE:
260             answer = queueRegion.addDestination(context, destination);
261             break;
262         case ActiveMQDestination.TOPIC_TYPE:
263             answer = topicRegion.addDestination(context, destination);
264             break;
265         case ActiveMQDestination.TEMP_QUEUE_TYPE:
266             answer = tempQueueRegion.addDestination(context, destination);
267             break;
268         case ActiveMQDestination.TEMP_TOPIC_TYPE:
269             answer = tempTopicRegion.addDestination(context, destination);
270             break;
271         default:
272             throw createUnknownDestinationTypeException(destination);
273         }
274
275         synchronized(destinations) {
276             destinations.put(destination, answer);
277             return answer;
278         }
279     }
280
281     public void removeDestination(ConnectionContext context,ActiveMQDestination destination,long timeout) throws Exception JavaDoc{
282         synchronized(destinations) {
283             if( destinations.remove(destination)!=null ){
284                 switch(destination.getDestinationType()){
285                 case ActiveMQDestination.QUEUE_TYPE:
286                     queueRegion.removeDestination(context,destination,timeout);
287                     break;
288                 case ActiveMQDestination.TOPIC_TYPE:
289                     topicRegion.removeDestination(context,destination,timeout);
290                     break;
291                 case ActiveMQDestination.TEMP_QUEUE_TYPE:
292                     tempQueueRegion.removeDestination(context,destination,timeout);
293                     break;
294                 case ActiveMQDestination.TEMP_TOPIC_TYPE:
295                     tempTopicRegion.removeDestination(context,destination,timeout);
296                     break;
297                 default:
298                     throw createUnknownDestinationTypeException(destination);
299                 }
300             }
301         }
302     }
303     
304     public void addDestinationInfo(ConnectionContext context,DestinationInfo info) throws Exception JavaDoc{
305         addDestination(context,info.getDestination());
306         
307     }
308
309     public void removeDestinationInfo(ConnectionContext context,DestinationInfo info) throws Exception JavaDoc{
310         removeDestination(context,info.getDestination(), info.getTimeout());
311         
312     }
313
314     public ActiveMQDestination[] getDestinations() throws Exception JavaDoc {
315         ArrayList JavaDoc l;
316         synchronized(destinations) {
317             l = new ArrayList JavaDoc(destinations.values());
318         }
319         ActiveMQDestination rc[] = new ActiveMQDestination[l.size()];
320         l.toArray(rc);
321         return rc;
322     }
323
324
325     public void addSession(ConnectionContext context, SessionInfo info) throws Exception JavaDoc {
326     }
327
328     public void removeSession(ConnectionContext context, SessionInfo info) throws Exception JavaDoc {
329     }
330
331     public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception JavaDoc {
332     }
333
334     public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception JavaDoc {
335     }
336
337     public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception JavaDoc {
338         ActiveMQDestination destination = info.getDestination();
339         switch(destination.getDestinationType()) {
340         case ActiveMQDestination.QUEUE_TYPE:
341             return queueRegion.addConsumer(context, info);
342             
343         case ActiveMQDestination.TOPIC_TYPE:
344             return topicRegion.addConsumer(context, info);
345         
346         case ActiveMQDestination.TEMP_QUEUE_TYPE:
347             return tempQueueRegion.addConsumer(context, info);
348             
349         case ActiveMQDestination.TEMP_TOPIC_TYPE:
350             return tempTopicRegion.addConsumer(context, info);
351             
352         default:
353             throw createUnknownDestinationTypeException(destination);
354         }
355     }
356
357     public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception JavaDoc {
358         ActiveMQDestination destination = info.getDestination();
359         switch(destination.getDestinationType()) {
360         case ActiveMQDestination.QUEUE_TYPE:
361             queueRegion.removeConsumer(context, info);
362             break;
363         case ActiveMQDestination.TOPIC_TYPE:
364             topicRegion.removeConsumer(context, info);
365             break;
366         case ActiveMQDestination.TEMP_QUEUE_TYPE:
367             tempQueueRegion.removeConsumer(context, info);
368             break;
369         case ActiveMQDestination.TEMP_TOPIC_TYPE:
370             tempTopicRegion.removeConsumer(context, info);
371             break;
372         default:
373             throw createUnknownDestinationTypeException(destination);
374         }
375     }
376
377     public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception JavaDoc {
378         topicRegion.removeSubscription(context, info);
379     }
380
381     public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception JavaDoc {
382         long si = sequenceGenerator.getNextSequenceId();
383         message.getMessageId().setBrokerSequenceId(si);
384         if (producerExchange.isMutable() || producerExchange.getRegion()==null) {
385             ActiveMQDestination destination = message.getDestination();
386             //ensure the destination is registered with the RegionBroker
387
addDestination(producerExchange.getConnectionContext(),destination);
388             Region region = null;
389             switch(destination.getDestinationType()) {
390             case ActiveMQDestination.QUEUE_TYPE:
391                 region = queueRegion;
392                 break;
393             case ActiveMQDestination.TOPIC_TYPE:
394                 region = topicRegion;
395                 break;
396             case ActiveMQDestination.TEMP_QUEUE_TYPE:
397                 region = tempQueueRegion;
398                 break;
399             case ActiveMQDestination.TEMP_TOPIC_TYPE:
400                 region = tempTopicRegion;
401                 break;
402             default:
403                 throw createUnknownDestinationTypeException(destination);
404             }
405             producerExchange.setRegion(region);
406         }
407         producerExchange.getRegion().send(producerExchange,message);
408     }
409
410     public void acknowledge(ConsumerBrokerExchange consumerExchange,MessageAck ack) throws Exception JavaDoc{
411         if(consumerExchange.isWildcard() || consumerExchange.getRegion()==null){
412             ActiveMQDestination destination=ack.getDestination();
413             Region region=null;
414             switch(destination.getDestinationType()){
415             case ActiveMQDestination.QUEUE_TYPE:
416                 region=queueRegion;
417                 break;
418             case ActiveMQDestination.TOPIC_TYPE:
419                 region=topicRegion;
420                 break;
421             case ActiveMQDestination.TEMP_QUEUE_TYPE:
422                 region=tempQueueRegion;
423                 break;
424             case ActiveMQDestination.TEMP_TOPIC_TYPE:
425                 region=tempTopicRegion;
426                 break;
427             default:
428                 throw createUnknownDestinationTypeException(destination);
429             }
430             consumerExchange.setRegion(region);
431         }
432         consumerExchange.getRegion().acknowledge(consumerExchange,ack);
433     }
434
435     
436     public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception JavaDoc {
437         ActiveMQDestination destination = pull.getDestination();
438         switch (destination.getDestinationType()) {
439         case ActiveMQDestination.QUEUE_TYPE:
440             return queueRegion.messagePull(context, pull);
441
442         case ActiveMQDestination.TOPIC_TYPE:
443             return topicRegion.messagePull(context, pull);
444
445         case ActiveMQDestination.TEMP_QUEUE_TYPE:
446             return tempQueueRegion.messagePull(context, pull);
447
448         case ActiveMQDestination.TEMP_TOPIC_TYPE:
449             return tempTopicRegion.messagePull(context, pull);
450         default:
451             throw createUnknownDestinationTypeException(destination);
452         }
453     }
454
455     public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception JavaDoc {
456         throw new IllegalAccessException JavaDoc("Transaction operation not implemented by this broker.");
457     }
458
459     public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception JavaDoc {
460         throw new IllegalAccessException JavaDoc("Transaction operation not implemented by this broker.");
461     }
462
463     public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception JavaDoc {
464         throw new IllegalAccessException JavaDoc("Transaction operation not implemented by this broker.");
465     }
466
467     public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception JavaDoc {
468         throw new IllegalAccessException JavaDoc("Transaction operation not implemented by this broker.");
469     }
470
471     public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception JavaDoc {
472         throw new IllegalAccessException JavaDoc("Transaction operation not implemented by this broker.");
473     }
474     
475     public void forgetTransaction(ConnectionContext context, TransactionId transactionId) throws Exception JavaDoc {
476         throw new IllegalAccessException JavaDoc("Transaction operation not implemented by this broker.");
477     }
478
479
480     public void gc() {
481         queueRegion.gc();
482         topicRegion.gc();
483     }
484
485     public BrokerId getBrokerId() {
486         if( brokerId==null ) {
487             // TODO: this should persist the broker id so that subsequent startup
488
// uses the same broker id.
489
brokerId=new BrokerId(brokerIdGenerator.generateId());
490         }
491         return brokerId;
492     }
493     
494     public void setBrokerId(BrokerId brokerId) {
495         this.brokerId = brokerId;
496     }
497
498     public String JavaDoc getBrokerName() {
499         if( brokerName==null ) {
500             try {
501                 brokerName = java.net.InetAddress.getLocalHost().getHostName().toLowerCase();
502             } catch (Exception JavaDoc e) {
503                 brokerName="localhost";
504             }
505         }
506         return brokerName;
507     }
508     
509     public void setBrokerName(String JavaDoc brokerName) {
510         this.brokerName = brokerName;
511     }
512     
513     public DestinationStatistics getDestinationStatistics() {
514         return destinationStatistics;
515     }
516
517     protected JMSException JavaDoc createUnknownDestinationTypeException(ActiveMQDestination destination) {
518         return new JMSException JavaDoc("Unknown destination type: " + destination.getDestinationType());
519     }
520
521     public synchronized void addBroker(Connection connection,BrokerInfo info){
522             brokerInfos.add(info);
523     }
524     
525     public synchronized void removeBroker(Connection connection,BrokerInfo info){
526         if (info != null){
527             brokerInfos.remove(info);
528         }
529     }
530
531     public synchronized BrokerInfo[] getPeerBrokerInfos(){
532         BrokerInfo[] result = new BrokerInfo[brokerInfos.size()];
533         result = (BrokerInfo[])brokerInfos.toArray(result);
534         return result;
535     }
536     
537     public void processDispatch(MessageDispatch messageDispatch){
538         
539     }
540     
541     public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception JavaDoc {
542         ActiveMQDestination destination = messageDispatchNotification.getDestination();
543         switch(destination.getDestinationType()) {
544         case ActiveMQDestination.QUEUE_TYPE:
545             queueRegion.processDispatchNotification(messageDispatchNotification);
546             break;
547         case ActiveMQDestination.TOPIC_TYPE:
548             topicRegion.processDispatchNotification(messageDispatchNotification);
549             break;
550         case ActiveMQDestination.TEMP_QUEUE_TYPE:
551             tempQueueRegion.processDispatchNotification(messageDispatchNotification);
552             break;
553         case ActiveMQDestination.TEMP_TOPIC_TYPE:
554             tempTopicRegion.processDispatchNotification(messageDispatchNotification);
555             break;
556         default:
557             throw createUnknownDestinationTypeException(destination);
558         }
559     }
560     
561     public boolean isSlaveBroker(){
562         return brokerService.isSlave();
563     }
564     
565     public boolean isStopped(){
566         return !started;
567     }
568     
569     public Set JavaDoc getDurableDestinations(){
570         return destinationFactory.getDestinations();
571     }
572     
573     public boolean isFaultTolerantConfiguration(){
574         return false;
575     }
576
577
578     protected void doStop(ServiceStopper ss) {
579         ss.stop(queueRegion);
580         ss.stop(topicRegion);
581         ss.stop(tempQueueRegion);
582         ss.stop(tempTopicRegion);
583     }
584
585     public boolean isKeepDurableSubsActive() {
586         return keepDurableSubsActive;
587     }
588
589     public void setKeepDurableSubsActive(boolean keepDurableSubsActive) {
590         this.keepDurableSubsActive = keepDurableSubsActive;
591     }
592
593     public DestinationInterceptor getDestinationInterceptor() {
594         return destinationInterceptor;
595     }
596
597     public ConnectionContext getAdminConnectionContext() {
598         return adminConnectionContext;
599     }
600  
601     public void setAdminConnectionContext(ConnectionContext adminConnectionContext) {
602         this.adminConnectionContext = adminConnectionContext;
603     }
604     
605     public Map JavaDoc getConnectionStates() {
606         return connectionStates;
607     }
608
609     public Store getTempDataStore() {
610         return brokerService.getTempDataStore();
611     }
612     
613     public URI JavaDoc getVmConnectorURI(){
614         return brokerService.getVmConnectorURI();
615     }
616 }
617
Popular Tags