1 18 package org.apache.activemq.broker.region; 19 20 import java.io.IOException ; 21 import java.util.Set ; 22 23 import javax.jms.JMSException ; 24 25 import org.apache.activemq.advisory.AdvisorySupport; 26 import org.apache.activemq.broker.ConnectionContext; 27 import org.apache.activemq.broker.region.policy.PolicyEntry; 28 import org.apache.activemq.command.ActiveMQDestination; 29 import org.apache.activemq.command.ActiveMQQueue; 30 import org.apache.activemq.command.ActiveMQTempDestination; 31 import org.apache.activemq.command.ActiveMQTopic; 32 import org.apache.activemq.command.SubscriptionInfo; 33 import org.apache.activemq.memory.UsageManager; 34 import org.apache.activemq.store.MessageStore; 35 import org.apache.activemq.store.PersistenceAdapter; 36 import org.apache.activemq.store.TopicMessageStore; 37 import org.apache.activemq.thread.TaskRunnerFactory; 38 39 45 public class DestinationFactoryImpl extends DestinationFactory { 46 47 protected final UsageManager memoryManager; 48 protected final TaskRunnerFactory taskRunnerFactory; 49 protected final PersistenceAdapter persistenceAdapter; 50 protected RegionBroker broker; 51 52 public DestinationFactoryImpl(UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, 53 PersistenceAdapter persistenceAdapter) { 54 this.memoryManager = memoryManager; 55 this.taskRunnerFactory = taskRunnerFactory; 56 if (persistenceAdapter == null) { 57 throw new IllegalArgumentException ("null persistenceAdapter"); 58 } 59 this.persistenceAdapter = persistenceAdapter; 60 } 61 62 public void setRegionBroker(RegionBroker broker) { 63 if (broker == null) { 64 throw new IllegalArgumentException ("null broker"); 65 } 66 this.broker = broker; 67 } 68 69 public Set getDestinations() { 70 return persistenceAdapter.getDestinations(); 71 } 72 73 76 public Destination createDestination(ConnectionContext context, ActiveMQDestination destination, DestinationStatistics destinationStatistics) throws Exception { 77 if (destination.isQueue()) { 78 if (destination.isTemporary()) { 79 final ActiveMQTempDestination tempDest = (ActiveMQTempDestination) destination; 80 return new Queue(destination, memoryManager, null, destinationStatistics, taskRunnerFactory,broker.getTempDataStore()) { 81 82 public void addSubscription(ConnectionContext context,Subscription sub) throws Exception { 83 if( !tempDest.getConnectionId().equals( sub.getConsumerInfo().getConsumerId().getConnectionId() ) ) { 86 throw new JMSException ("Cannot subscribe to remote temporary destination: "+tempDest); 87 } 88 super.addSubscription(context, sub); 89 }; 90 }; 91 } else { 92 MessageStore store = persistenceAdapter.createQueueMessageStore((ActiveMQQueue) destination); 93 Queue queue = new Queue(destination, memoryManager, store, destinationStatistics, taskRunnerFactory,broker.getTempDataStore()); 94 configureQueue(queue, destination); 95 queue.initialize(); 96 return queue; 97 } 98 } else if (destination.isTemporary()){ 99 final ActiveMQTempDestination tempDest = (ActiveMQTempDestination) destination; 100 return new Topic(destination, null, memoryManager, destinationStatistics, taskRunnerFactory) { 101 public void addSubscription(ConnectionContext context,Subscription sub) throws Exception { 102 if( !tempDest.getConnectionId().equals( sub.getConsumerInfo().getConsumerId().getConnectionId() ) ) { 105 throw new JMSException ("Cannot subscribe to remote temporary destination: "+tempDest); 106 } 107 super.addSubscription(context, sub); 108 }; 109 }; 110 } else { 111 TopicMessageStore store = null; 112 if (!AdvisorySupport.isAdvisoryTopic(destination)) { 113 store = persistenceAdapter.createTopicMessageStore((ActiveMQTopic) destination); 114 } 115 116 Topic topic = new Topic(destination, store, memoryManager, destinationStatistics, taskRunnerFactory); 117 configureTopic(topic, destination); 118 119 return topic; 120 } 121 } 122 123 protected void configureQueue(Queue queue, ActiveMQDestination destination) { 124 if (broker == null) { 125 throw new IllegalStateException ("broker property is not set"); 126 } 127 if (broker.getDestinationPolicy() != null) { 128 PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination); 129 if (entry != null) { 130 entry.configure(queue,broker.getTempDataStore()); 131 } 132 } 133 } 134 135 protected void configureTopic(Topic topic, ActiveMQDestination destination) { 136 if (broker == null) { 137 throw new IllegalStateException ("broker property is not set"); 138 } 139 if (broker.getDestinationPolicy() != null) { 140 PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination); 141 if (entry != null) { 142 entry.configure(topic); 143 } 144 } 145 } 146 147 public long getLastMessageBrokerSequenceId() throws IOException { 148 return persistenceAdapter.getLastMessageBrokerSequenceId(); 149 } 150 151 public PersistenceAdapter getPersistenceAdapter() { 152 return persistenceAdapter; 153 } 154 155 public SubscriptionInfo[] getAllDurableSubscriptions(ActiveMQTopic topic) throws IOException { 156 return persistenceAdapter.createTopicMessageStore(topic).getAllSubscriptions(); 157 } 158 159 } 160 | Popular Tags |