KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > broker > region > DestinationFactoryImpl


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 package org.apache.activemq.broker.region;
19
20 import java.io.IOException JavaDoc;
21 import java.util.Set JavaDoc;
22
23 import javax.jms.JMSException JavaDoc;
24
25 import org.apache.activemq.advisory.AdvisorySupport;
26 import org.apache.activemq.broker.ConnectionContext;
27 import org.apache.activemq.broker.region.policy.PolicyEntry;
28 import org.apache.activemq.command.ActiveMQDestination;
29 import org.apache.activemq.command.ActiveMQQueue;
30 import org.apache.activemq.command.ActiveMQTempDestination;
31 import org.apache.activemq.command.ActiveMQTopic;
32 import org.apache.activemq.command.SubscriptionInfo;
33 import org.apache.activemq.memory.UsageManager;
34 import org.apache.activemq.store.MessageStore;
35 import org.apache.activemq.store.PersistenceAdapter;
36 import org.apache.activemq.store.TopicMessageStore;
37 import org.apache.activemq.thread.TaskRunnerFactory;
38
39 /**
40  * Creates standard ActiveMQ implementations of {@link org.apache.activemq.broker.region.Destination}.
41  *
42  * @author fateev@amazon.com
43  * @version $Revision: 490814 $
44  */

45 public class DestinationFactoryImpl extends DestinationFactory {
46
47     protected final UsageManager memoryManager;
48     protected final TaskRunnerFactory taskRunnerFactory;
49     protected final PersistenceAdapter persistenceAdapter;
50     protected RegionBroker broker;
51
52     public DestinationFactoryImpl(UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory,
53             PersistenceAdapter persistenceAdapter) {
54         this.memoryManager = memoryManager;
55         this.taskRunnerFactory = taskRunnerFactory;
56         if (persistenceAdapter == null) {
57             throw new IllegalArgumentException JavaDoc("null persistenceAdapter");
58         }
59         this.persistenceAdapter = persistenceAdapter;
60     }
61
62     public void setRegionBroker(RegionBroker broker) {
63         if (broker == null) {
64             throw new IllegalArgumentException JavaDoc("null broker");
65         }
66         this.broker = broker;
67     }
68
69     public Set JavaDoc getDestinations() {
70         return persistenceAdapter.getDestinations();
71     }
72
73     /**
74      * @return instance of {@link Queue} or {@link Topic}
75      */

76     public Destination createDestination(ConnectionContext context, ActiveMQDestination destination, DestinationStatistics destinationStatistics) throws Exception JavaDoc {
77         if (destination.isQueue()) {
78             if (destination.isTemporary()) {
79                 final ActiveMQTempDestination tempDest = (ActiveMQTempDestination) destination;
80                 return new Queue(destination, memoryManager, null, destinationStatistics, taskRunnerFactory,broker.getTempDataStore()) {
81                    
82                     public void addSubscription(ConnectionContext context,Subscription sub) throws Exception JavaDoc {
83                         // Only consumers on the same connection can consume from
84
// the temporary destination
85
if( !tempDest.getConnectionId().equals( sub.getConsumerInfo().getConsumerId().getConnectionId() ) ) {
86                             throw new JMSException JavaDoc("Cannot subscribe to remote temporary destination: "+tempDest);
87                         }
88                         super.addSubscription(context, sub);
89                     };
90                 };
91             } else {
92                 MessageStore store = persistenceAdapter.createQueueMessageStore((ActiveMQQueue) destination);
93                 Queue queue = new Queue(destination, memoryManager, store, destinationStatistics, taskRunnerFactory,broker.getTempDataStore());
94                 configureQueue(queue, destination);
95                 queue.initialize();
96                 return queue;
97             }
98         } else if (destination.isTemporary()){
99             final ActiveMQTempDestination tempDest = (ActiveMQTempDestination) destination;
100             return new Topic(destination, null, memoryManager, destinationStatistics, taskRunnerFactory) {
101                 public void addSubscription(ConnectionContext context,Subscription sub) throws Exception JavaDoc {
102                     // Only consumers on the same connection can consume from
103
// the temporary destination
104
if( !tempDest.getConnectionId().equals( sub.getConsumerInfo().getConsumerId().getConnectionId() ) ) {
105                         throw new JMSException JavaDoc("Cannot subscribe to remote temporary destination: "+tempDest);
106                     }
107                     super.addSubscription(context, sub);
108                 };
109             };
110         } else {
111             TopicMessageStore store = null;
112             if (!AdvisorySupport.isAdvisoryTopic(destination)) {
113                 store = persistenceAdapter.createTopicMessageStore((ActiveMQTopic) destination);
114             }
115             
116             Topic topic = new Topic(destination, store, memoryManager, destinationStatistics, taskRunnerFactory);
117             configureTopic(topic, destination);
118             
119             return topic;
120         }
121     }
122
123     protected void configureQueue(Queue queue, ActiveMQDestination destination) {
124         if (broker == null) {
125             throw new IllegalStateException JavaDoc("broker property is not set");
126         }
127         if (broker.getDestinationPolicy() != null) {
128             PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination);
129             if (entry != null) {
130                 entry.configure(queue,broker.getTempDataStore());
131             }
132         }
133     }
134
135     protected void configureTopic(Topic topic, ActiveMQDestination destination) {
136         if (broker == null) {
137             throw new IllegalStateException JavaDoc("broker property is not set");
138         }
139         if (broker.getDestinationPolicy() != null) {
140             PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination);
141             if (entry != null) {
142                 entry.configure(topic);
143             }
144         }
145     }
146
147     public long getLastMessageBrokerSequenceId() throws IOException JavaDoc {
148         return persistenceAdapter.getLastMessageBrokerSequenceId();
149     }
150
151     public PersistenceAdapter getPersistenceAdapter() {
152         return persistenceAdapter;
153     }
154
155     public SubscriptionInfo[] getAllDurableSubscriptions(ActiveMQTopic topic) throws IOException JavaDoc {
156         return persistenceAdapter.createTopicMessageStore(topic).getAllSubscriptions();
157     }
158
159 }
160
Popular Tags