1 18 package org.apache.activemq.broker.region; 19 20 import java.io.IOException ; 21 import java.util.Iterator ; 22 import java.util.Set ; 23 24 import javax.jms.InvalidDestinationException ; 25 import javax.jms.JMSException ; 26 27 import org.apache.activemq.advisory.AdvisorySupport; 28 import org.apache.activemq.broker.ConnectionContext; 29 import org.apache.activemq.broker.region.cursors.PendingMessageCursor; 30 import org.apache.activemq.broker.region.policy.PolicyEntry; 31 import org.apache.activemq.command.ActiveMQDestination; 32 import org.apache.activemq.command.ConnectionId; 33 import org.apache.activemq.command.ConsumerId; 34 import org.apache.activemq.command.ConsumerInfo; 35 import org.apache.activemq.command.RemoveSubscriptionInfo; 36 import org.apache.activemq.command.SessionId; 37 import org.apache.activemq.command.SubscriptionInfo; 38 import org.apache.activemq.memory.UsageManager; 39 import org.apache.activemq.store.TopicMessageStore; 40 import org.apache.activemq.thread.TaskRunnerFactory; 41 import org.apache.activemq.util.LongSequenceGenerator; 42 import org.apache.activemq.util.SubscriptionKey; 43 import org.apache.commons.logging.Log; 44 import org.apache.commons.logging.LogFactory; 45 46 import java.util.concurrent.ConcurrentHashMap ; 47 48 52 public class TopicRegion extends AbstractRegion { 53 private static final Log log = LogFactory.getLog(TopicRegion.class); 54 protected final ConcurrentHashMap durableSubscriptions = new ConcurrentHashMap (); 55 private final LongSequenceGenerator recoveredDurableSubIdGenerator = new LongSequenceGenerator(); 56 private final SessionId recoveredDurableSubSessionId = new SessionId(new ConnectionId("OFFLINE"), recoveredDurableSubIdGenerator.getNextSequenceId()); 57 private boolean keepDurableSubsActive=false; 58 59 public TopicRegion(RegionBroker broker,DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, 60 DestinationFactory destinationFactory) { 61 super(broker,destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory); 62 63 } 64 65 public Subscription addConsumer(ConnectionContext context,ConsumerInfo info) throws Exception { 66 if(info.isDurable()){ 67 ActiveMQDestination destination=info.getDestination(); 68 if(!destination.isPattern()){ 69 lookup(context,destination); 71 } 72 String clientId=context.getClientId(); 73 String subcriptionName=info.getSubscriptionName(); 74 SubscriptionKey key=new SubscriptionKey(clientId,subcriptionName); 75 DurableTopicSubscription sub=(DurableTopicSubscription)durableSubscriptions.get(key); 76 if(sub!=null){ 77 if(sub.isActive()){ 78 throw new JMSException ("Durable consumer is in use for client: "+clientId+" and subscriptionName: " 79 +subcriptionName); 80 } 81 if(hasDurableSubChanged(info,sub.getConsumerInfo())){ 83 durableSubscriptions.remove(key); 85 for(Iterator iter=destinations.values().iterator();iter.hasNext();){ 86 Topic topic=(Topic)iter.next(); 87 topic.deleteSubscription(context,key); 88 } 89 super.removeConsumer(context,sub.getConsumerInfo()); 90 super.addConsumer(context,info); 91 sub=(DurableTopicSubscription)durableSubscriptions.get(key); 92 }else{ 93 if(sub.getConsumerInfo().getConsumerId()!=null) 95 subscriptions.remove(sub.getConsumerInfo().getConsumerId()); 96 subscriptions.put(info.getConsumerId(),sub); 97 } 98 }else{ 99 super.addConsumer(context,info); 100 sub=(DurableTopicSubscription)durableSubscriptions.get(key); 101 if(sub==null){ 102 throw new JMSException ("Cannot use the same consumerId: "+info.getConsumerId() 103 +" for two different durable subscriptions clientID: "+key.getClientId() 104 +" subscriberName: "+key.getSubscriptionName()); 105 } 106 } 107 sub.activate(memoryManager,context,info); 108 return sub; 109 }else{ 110 return super.addConsumer(context,info); 111 } 112 } 113 114 public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { 115 if (info.isDurable()) { 116 117 SubscriptionKey key = new SubscriptionKey(context.getClientId(), info.getSubscriptionName()); 118 DurableTopicSubscription sub = (DurableTopicSubscription) durableSubscriptions.get(key); 119 if (sub != null) { 120 sub.deactivate(keepDurableSubsActive); 121 } 122 123 } 124 else { 125 super.removeConsumer(context, info); 126 } 127 } 128 129 public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception { 130 SubscriptionKey key = new SubscriptionKey(info.getClientId(), info.getSubcriptionName()); 131 DurableTopicSubscription sub = (DurableTopicSubscription) durableSubscriptions.get(key); 132 if (sub == null) { 133 throw new InvalidDestinationException ("No durable subscription exists for: " + info.getSubcriptionName()); 134 } 135 if (sub.isActive()) { 136 throw new JMSException ("Durable consumer is in use"); 137 } 138 139 durableSubscriptions.remove(key); 140 for (Iterator iter = destinations.values().iterator(); iter.hasNext();) { 141 Topic topic = (Topic) iter.next(); 142 topic.deleteSubscription(context, key); 143 } 144 super.removeConsumer(context, sub.getConsumerInfo()); 145 } 146 147 public String toString() { 148 return "TopicRegion: destinations=" + destinations.size() + ", subscriptions=" + subscriptions.size() + ", memory=" + memoryManager.getPercentUsage() 149 + "%"; 150 } 151 152 protected Destination createDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception { 155 Topic topic = (Topic) super.createDestination(context, destination); 156 157 recoverDurableSubscriptions(context, topic); 158 159 return topic; 160 } 161 162 private void recoverDurableSubscriptions(ConnectionContext context, Topic topic) throws IOException , JMSException , Exception { 163 TopicMessageStore store = (TopicMessageStore) topic.getMessageStore(); 164 if (store != null) { 166 SubscriptionInfo[] infos = store.getAllSubscriptions(); 167 for (int i = 0; i < infos.length; i++) { 168 169 SubscriptionInfo info = infos[i]; 170 log.debug("Restoring durable subscription: "+infos); 171 SubscriptionKey key = new SubscriptionKey(info); 172 173 DurableTopicSubscription sub = (DurableTopicSubscription) durableSubscriptions.get(key); 175 ConsumerInfo consumerInfo = createInactiveConsumerInfo(info); 176 if( sub == null ) { 177 ConnectionContext c = new ConnectionContext(); 178 c.setBroker(context.getBroker()); 179 c.setClientId(key.getClientId()); 180 c.setConnectionId(consumerInfo.getConsumerId().getParentId().getParentId()); 181 sub = (DurableTopicSubscription) createSubscription(c, consumerInfo ); 182 } 183 184 topic.addSubscription(context, sub); 185 } 186 } 187 } 188 189 private ConsumerInfo createInactiveConsumerInfo(SubscriptionInfo info) { 190 ConsumerInfo rc = new ConsumerInfo(); 191 rc.setSelector(info.getSelector()); 192 rc.setSubscriptionName(info.getSubcriptionName()); 193 rc.setDestination(info.getDestination()); 194 rc.setConsumerId(createConsumerId()); 195 return rc; 196 } 197 198 private ConsumerId createConsumerId() { 199 return new ConsumerId(recoveredDurableSubSessionId,recoveredDurableSubIdGenerator.getNextSequenceId()); 200 } 201 202 protected void configureTopic(Topic topic, ActiveMQDestination destination) { 203 if (broker.getDestinationPolicy() != null) { 204 PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination); 205 if (entry != null) { 206 entry.configure(topic); 207 } 208 } 209 } 210 211 protected Subscription createSubscription(ConnectionContext context,ConsumerInfo info) throws JMSException { 212 if(info.isDurable()){ 213 if(AdvisorySupport.isAdvisoryTopic(info.getDestination())){ 214 throw new JMSException ("Cannot create a durable subscription for an advisory Topic"); 215 } 216 SubscriptionKey key=new SubscriptionKey(context.getClientId(),info.getSubscriptionName()); 217 DurableTopicSubscription sub=(DurableTopicSubscription)durableSubscriptions.get(key); 218 if(sub==null){ 219 sub=new DurableTopicSubscription(broker,memoryManager,context,info,keepDurableSubsActive); 220 ActiveMQDestination destination=info.getDestination(); 221 if(destination!=null&&broker.getDestinationPolicy()!=null){ 222 PolicyEntry entry=broker.getDestinationPolicy().getEntryFor(destination); 223 if(entry!=null){ 224 entry.configure(broker,memoryManager,sub); 225 } 226 } 227 durableSubscriptions.put(key,sub); 228 }else{ 229 throw new JMSException ("That durable subscription is already active."); 230 } 231 return sub; 232 } 233 try{ 234 TopicSubscription answer=new TopicSubscription(broker,context,info,memoryManager); 235 ActiveMQDestination destination=info.getDestination(); 237 if(destination!=null&&broker.getDestinationPolicy()!=null){ 238 PolicyEntry entry=broker.getDestinationPolicy().getEntryFor(destination); 239 if(entry!=null){ 240 entry.configure(broker,memoryManager,answer); 241 } 242 } 243 answer.init(); 244 return answer; 245 }catch(Exception e){ 246 log.error("Failed to create TopicSubscription ",e); 247 JMSException jmsEx=new JMSException ("Couldn't create TopicSubscription"); 248 jmsEx.setLinkedException(e); 249 throw jmsEx; 250 } 251 } 252 253 255 private boolean hasDurableSubChanged(ConsumerInfo info1, ConsumerInfo info2) { 256 if (info1.getSelector() != null ^ info2.getSelector() != null) 257 return true; 258 if (info1.getSelector() != null && !info1.getSelector().equals(info2.getSelector())) 259 return true; 260 return !info1.getDestination().equals(info2.getDestination()); 261 } 262 263 protected Set getInactiveDestinations() { 264 Set inactiveDestinations = super.getInactiveDestinations(); 265 for (Iterator iter = inactiveDestinations.iterator(); iter.hasNext();) { 266 ActiveMQDestination dest = (ActiveMQDestination) iter.next(); 267 if (!dest.isTopic()) 268 iter.remove(); 269 } 270 return inactiveDestinations; 271 } 272 273 public boolean isKeepDurableSubsActive() { 274 return keepDurableSubsActive; 275 } 276 277 public void setKeepDurableSubsActive(boolean keepDurableSubsActive) { 278 this.keepDurableSubsActive = keepDurableSubsActive; 279 } 280 281 } 282 | Popular Tags |