package org.apache.activemq.broker.region;

import java.io.IOException;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import javax.jms.InvalidSelectorException;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.cursors.StoreDurableSubscriberCursor;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.memory.UsageListener;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.util.SubscriptionKey;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * A {@link PrefetchSubscription} for a durable topic subscriber, keyed by the
 * client id and subscription name of the consumer that created it.
 * <p>
 * The subscription can be activated and deactivated over its lifetime. While
 * inactive it never dispatches (see {@link #canDispatch(MessageReference)}) and,
 * unless {@code keepDurableSubsActive} is set, it ignores newly offered
 * messages (see {@link #add(MessageReference)}).
 */
public class DurableTopicSubscription extends PrefetchSubscription implements UsageListener {

    private static final Log log = LogFactory.getLog(DurableTopicSubscription.class);

    /**
     * Message id -> number of times the message was still in the dispatched
     * list when the subscription was deactivated. Used as the redelivery
     * counter of subsequent dispatches.
     */
    private final ConcurrentHashMap redeliveredMessages = new ConcurrentHashMap();
    /** ActiveMQDestination -> Destination for every destination added to this subscription. */
    private final ConcurrentHashMap destinations = new ConcurrentHashMap();
    private final SubscriptionKey subscriptionKey;
    private final boolean keepDurableSubsActive;
    private final UsageManager usageManager;
    private boolean active = false;

    /**
     * Creates the subscription backed by a {@link StoreDurableSubscriberCursor}
     * built from the client id, subscription name, the broker's temp data store
     * and the consumer's prefetch size.
     *
     * @param broker the owning broker
     * @param usageManager the usage manager this subscription listens to while active
     * @param context the connection context of the subscribing client
     * @param info the consumer description
     * @param keepDurableSubsActive whether the subscription keeps accepting
     *        messages while no consumer is attached
     * @throws InvalidSelectorException propagated from the superclass constructor
     */
    public DurableTopicSubscription(Broker broker, UsageManager usageManager, ConnectionContext context,
            ConsumerInfo info, boolean keepDurableSubsActive) throws InvalidSelectorException {
        super(broker, context, info, new StoreDurableSubscriberCursor(context.getClientId(),
                info.getSubscriptionName(), broker.getTempDataStore(), info.getPrefetchSize()));
        this.usageManager = usageManager;
        this.keepDurableSubsActive = keepDurableSubsActive;
        subscriptionKey = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());
    }

    /**
     * @return {@code true} while the subscription is activated
     */
    public synchronized boolean isActive() {
        return active;
    }

    /**
     * @return {@code true} when the subscription is inactive or the superclass
     *         reports it as full
     */
    protected boolean isFull() {
        return !active || super.isFull();
    }

    /**
     * Intentionally a no-op for this subscription type.
     */
    public synchronized void gc() {
    }

    /**
     * Registers a destination with this subscription. When the subscription is
     * active (or kept active), the topic is activated for it and, if nothing is
     * pending for that topic, retroactive messages are recovered. Finishes by
     * attempting a dispatch.
     *
     * @param context the connection context
     * @param destination the destination to add; cast to {@link Topic} when activated
     * @throws Exception propagated from the superclass or the topic
     */
    public synchronized void add(ConnectionContext context, Destination destination) throws Exception {
        super.add(context, destination);
        destinations.put(destination.getActiveMQDestination(), destination);
        if (active || keepDurableSubsActive) {
            Topic topic = (Topic) destination;
            topic.activate(context, this);
            if (pending.isEmpty(topic)) {
                topic.recoverRetroactiveMessages(context, this);
            }
        }
        dispatchMatched();
    }

    /**
     * Activates the subscription for a (re)connecting consumer. Does nothing
     * beyond logging when the subscription is already active.
     *
     * @param memoryManager the usage manager handed to the pending cursor
     * @param context the connection context of the consumer
     * @param info the consumer description
     * @throws Exception propagated from the topics or the pending cursor
     */
    public void activate(UsageManager memoryManager, ConnectionContext context, ConsumerInfo info)
            throws Exception {
        // Guarded so the (non-trivial) toString() is only evaluated when needed.
        if (log.isDebugEnabled()) {
            log.debug("Activating " + this);
        }
        if (!active) {
            this.active = true;
            this.context = context;
            this.info = info;
            if (!keepDurableSubsActive) {
                // Topics were not activated in add(...) while inactive, so do it now.
                for (Iterator iter = destinations.values().iterator(); iter.hasNext();) {
                    Topic topic = (Topic) iter.next();
                    topic.activate(context, this);
                }
            }
            synchronized (pending) {
                pending.setUsageManager(memoryManager);
                pending.start();
            }
            if (pending.isEmpty()) {
                for (Iterator iter = destinations.values().iterator(); iter.hasNext();) {
                    Topic topic = (Topic) iter.next();
                    topic.recoverRetroactiveMessages(context, this);
                }
            }
            dispatchMatched();
            this.usageManager.addUsageListener(this);
        }
    }

    /**
     * Deactivates the subscription: stops the pending cursor, records a
     * redelivery for every message still in the dispatched list and resets the
     * prefetch extension.
     *
     * @param keepDurableSubsActive when {@code true}, dispatched messages are
     *        pushed back to the front of the pending cursor; when {@code false},
     *        the topics are deactivated, and both dispatched and pending
     *        messages have their reference counts released. Note that this
     *        parameter shadows the field of the same name.
     * @throws Exception propagated from the topics or the pending cursor
     */
    public synchronized void deactivate(boolean keepDurableSubsActive) throws Exception {
        active = false;
        this.usageManager.removeUsageListener(this);
        synchronized (pending) {
            pending.stop();
        }
        if (!keepDurableSubsActive) {
            for (Iterator iter = destinations.values().iterator(); iter.hasNext();) {
                Topic topic = (Topic) iter.next();
                topic.deactivate(context, this);
            }
        }
        synchronized (dispatched) {
            for (Iterator iter = dispatched.iterator(); iter.hasNext();) {
                MessageReference node = (MessageReference) iter.next();
                incrementRedeliveryCount(node);
                if (keepDurableSubsActive) {
                    synchronized (pending) {
                        pending.addMessageFirst(node);
                    }
                } else {
                    node.decrementReferenceCount();
                }
                iter.remove();
            }
        }
        if (!keepDurableSubsActive) {
            synchronized (pending) {
                try {
                    pending.reset();
                    while (pending.hasNext()) {
                        MessageReference node = pending.next();
                        node.decrementReferenceCount();
                        pending.remove();
                    }
                } finally {
                    pending.release();
                }
            }
        }
        prefetchExtension = 0;
    }

    /** Bumps (or initialises to 1) the recorded redelivery count for the given message. */
    private void incrementRedeliveryCount(MessageReference node) {
        Integer count = (Integer) redeliveredMessages.get(node.getMessageId());
        int next = (count != null) ? count.intValue() + 1 : 1;
        redeliveredMessages.put(node.getMessageId(), Integer.valueOf(next));
    }

    /**
     * Builds the dispatch via the superclass and, when a redelivery count has
     * been recorded for the message, copies it onto the dispatch.
     */
    protected MessageDispatch createMessageDispatch(MessageReference node, Message message) {
        MessageDispatch md = super.createMessageDispatch(node, message);
        Integer count = (Integer) redeliveredMessages.get(node.getMessageId());
        if (count != null) {
            md.setRedeliveryCounter(count.intValue());
        }
        return md;
    }

    /**
     * Offers a message to the subscription. The message is ignored when the
     * subscription is neither active nor kept active; otherwise its reference
     * count is incremented before it is handed to the superclass.
     *
     * @param node the message reference to add
     * @throws Exception propagated from the superclass
     */
    public void add(MessageReference node) throws Exception {
        if (!active && !keepDurableSubsActive) {
            return;
        }
        node.incrementReferenceCount();
        super.add(node);
    }

    /**
     * Hands a recovered message to the pending cursor.
     */
    protected void doAddRecoveredMessage(MessageReference message) throws Exception {
        pending.addRecoveredMessage(message);
    }

    /**
     * @return the superclass' pending size while active or kept active,
     *         otherwise {@code 0}
     */
    public int getPendingQueueSize() {
        if (active || keepDurableSubsActive) {
            return super.getPendingQueueSize();
        }
        return 0;
    }

    /**
     * Not supported for durable topic subscriptions.
     *
     * @throws UnsupportedOperationException always
     */
    public void setSelector(String selector) throws InvalidSelectorException {
        throw new UnsupportedOperationException(
                "You cannot dynamically change the selector for durable topic subscriptions");
    }

    /**
     * @return {@code true} only while the subscription is active
     */
    protected boolean canDispatch(MessageReference node) {
        return active;
    }

    /**
     * Acknowledges the message on its region destination, forgets its recorded
     * redelivery count and decrements its reference count.
     */
    protected void acknowledge(ConnectionContext context, MessageAck ack, MessageReference node)
            throws IOException {
        node.getRegionDestination().acknowledge(context, this, ack, node);
        redeliveredMessages.remove(node.getMessageId());
        node.decrementReferenceCount();
    }

    /**
     * @return the subscription name held by the subscription key
     */
    public String getSubscriptionName() {
        return subscriptionKey.getSubscriptionName();
    }

    public String toString() {
        return "DurableTopicSubscription:" + " consumer=" + info.getConsumerId() + ", destinations="
                + destinations.size() + ", total=" + enqueueCounter + ", pending=" + getPendingQueueSize()
                + ", dispatched=" + dispatchCounter + ", inflight=" + dispatched.size()
                + ", prefetchExtension=" + this.prefetchExtension;
    }

    /**
     * @return the client id held by the subscription key
     */
    public String getClientId() {
        return subscriptionKey.getClientId();
    }

    /**
     * @return the key (client id + subscription name) of this subscription
     */
    public SubscriptionKey getSubscriptionKey() {
        return subscriptionKey;
    }

    /**
     * Releases the reference counts of all pending and dispatched messages and
     * clears both collections.
     * <p>
     * The pending cursor is released and cleared while its monitor is still
     * held, and the dispatched list is walked under its own monitor, matching
     * the locking used by {@link #deactivate(boolean)}.
     */
    public void destroy() {
        synchronized (pending) {
            try {
                pending.reset();
                while (pending.hasNext()) {
                    MessageReference node = pending.next();
                    node.decrementReferenceCount();
                }
            } finally {
                pending.release();
                pending.clear();
            }
        }
        synchronized (dispatched) {
            for (Iterator iter = dispatched.iterator(); iter.hasNext();) {
                MessageReference node = (MessageReference) iter.next();
                node.decrementReferenceCount();
            }
            dispatched.clear();
        }
    }

    /**
     * Usage callback: when usage drops from 90% or above, retries dispatching
     * matched messages.
     *
     * @param memoryManager the usage manager reporting the change
     * @param oldPercentUsage the previous usage percentage
     * @param newPercentUsage the new usage percentage
     */
    public void onMemoryUseChanged(UsageManager memoryManager, int oldPercentUsage, int newPercentUsage) {
        if (oldPercentUsage > newPercentUsage && oldPercentUsage >= 90) {
            try {
                dispatchMatched();
            } catch (IOException e) {
                log.warn("problem calling dispatchMatched", e);
            }
        }
    }
}