1 14 15 package org.apache.activemq.broker.region; 16 17 import java.io.IOException ; 18 import java.util.LinkedList ; 19 import java.util.concurrent.atomic.AtomicLong ; 20 import javax.jms.InvalidSelectorException ; 21 import javax.jms.JMSException ; 22 import org.apache.activemq.broker.Broker; 23 import org.apache.activemq.broker.ConnectionContext; 24 import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor; 25 import org.apache.activemq.broker.region.cursors.PendingMessageCursor; 26 import org.apache.activemq.broker.region.policy.MessageEvictionStrategy; 27 import org.apache.activemq.broker.region.policy.OldestMessageEvictionStrategy; 28 import org.apache.activemq.command.ConsumerControl; 29 import org.apache.activemq.command.ConsumerInfo; 30 import org.apache.activemq.command.Message; 31 import org.apache.activemq.command.MessageAck; 32 import org.apache.activemq.command.MessageDispatch; 33 import org.apache.activemq.command.MessageDispatchNotification; 34 import org.apache.activemq.command.MessagePull; 35 import org.apache.activemq.command.Response; 36 import org.apache.activemq.memory.UsageManager; 37 import org.apache.activemq.transaction.Synchronization; 38 import org.apache.commons.logging.Log; 39 import org.apache.commons.logging.LogFactory; 40 41 public class TopicSubscription extends AbstractSubscription{ 42 43 private static final Log log=LogFactory.getLog(TopicSubscription.class); 44 private static final AtomicLong cursorNameCounter=new AtomicLong (0); 45 protected PendingMessageCursor matched; 46 final protected UsageManager usageManager; 47 protected AtomicLong dispatchedCounter=new AtomicLong (); 48 protected AtomicLong prefetchExtension=new AtomicLong (); 49 private int maximumPendingMessages=-1; 50 private MessageEvictionStrategy messageEvictionStrategy=new OldestMessageEvictionStrategy(); 51 private int discarded=0; 52 private final Object matchedListMutex=new Object (); 53 private final AtomicLong enqueueCounter=new AtomicLong (0); 54 private final AtomicLong dequeueCounter=new AtomicLong (0); 55 boolean singleDestination=true; 56 Destination destination; 57 private int memoryUsageHighWaterMark=95; 58 59 public TopicSubscription(Broker broker,ConnectionContext context,ConsumerInfo info,UsageManager usageManager) 60 throws Exception { 61 super(broker,context,info); 62 this.usageManager=usageManager; 63 String matchedName="TopicSubscription:"+cursorNameCounter.getAndIncrement()+"["+info.getConsumerId().toString() 64 +"]"; 65 this.matched=new FilePendingMessageCursor(matchedName,broker.getTempDataStore()); 66 67 } 68 69 public void init() throws Exception { 70 this.matched.setUsageManager(usageManager); 71 this.matched.start(); 72 } 73 74 public void add(MessageReference node) throws Exception { 75 enqueueCounter.incrementAndGet(); 76 node.incrementReferenceCount(); 77 if(!isFull()&&!isSlaveBroker()){ 78 optimizePrefetch(); 79 dispatch(node); 82 }else{ 83 if(maximumPendingMessages!=0){ 84 synchronized(matchedListMutex){ 85 matched.addMessageLast(node); 86 if(maximumPendingMessages>0){ 88 int max=messageEvictionStrategy.getEvictExpiredMessagesHighWatermark(); 90 if(maximumPendingMessages>0&&maximumPendingMessages<max){ 91 max=maximumPendingMessages; 92 } 93 if(!matched.isEmpty()&&matched.size()>max){ 94 removeExpiredMessages(); 95 } 96 while(!matched.isEmpty()&&matched.size()>maximumPendingMessages){ 98 int pageInSize=matched.size()-maximumPendingMessages; 99 pageInSize=Math.max(1000,pageInSize); 101 LinkedList list=matched.pageInList(pageInSize); 102 MessageReference[] oldMessages=messageEvictionStrategy.evictMessages(list); 103 int messagesToEvict=oldMessages.length; 104 for(int i=0;i<messagesToEvict;i++){ 105 MessageReference oldMessage=oldMessages[i]; 106 oldMessage.decrementReferenceCount(); 107 matched.remove(oldMessage); 108 discarded++; 109 if(log.isDebugEnabled()){ 110 log.debug("Discarding message "+oldMessages[i]); 111 } 112 } 113 if(messagesToEvict==0){ 116 log.warn("No messages to evict returned from eviction strategy: " 117 +messageEvictionStrategy); 118 break; 119 } 120 } 121 } 122 } 123 } 124 } 125 } 126 127 132 protected void removeExpiredMessages() throws IOException { 133 try{ 134 matched.reset(); 135 while(matched.hasNext()){ 136 MessageReference node=matched.next(); 137 if(node.isExpired()){ 138 matched.remove(); 139 dispatchedCounter.incrementAndGet(); 140 node.decrementReferenceCount(); 141 break; 142 } 143 } 144 }finally{ 145 matched.release(); 146 } 147 } 148 149 public void processMessageDispatchNotification(MessageDispatchNotification mdn){ 150 synchronized(matchedListMutex){ 151 try{ 152 matched.reset(); 153 while(matched.hasNext()){ 154 MessageReference node=matched.next(); 155 if(node.getMessageId().equals(mdn.getMessageId())){ 156 matched.remove(); 157 dispatchedCounter.incrementAndGet(); 158 node.decrementReferenceCount(); 159 break; 160 } 161 } 162 }finally{ 163 matched.release(); 164 } 165 } 166 } 167 168 synchronized public void acknowledge(final ConnectionContext context,final MessageAck ack) throws Exception { 169 boolean wasFull=isFull(); 171 if(ack.isStandardAck()||ack.isPoisonAck()){ 172 if(context.isInTransaction()){ 173 prefetchExtension.addAndGet(ack.getMessageCount()); 174 context.getTransaction().addSynchronization(new Synchronization(){ 175 176 public void afterCommit() throws Exception { 177 synchronized(TopicSubscription.this){ 178 if( singleDestination && destination!=null) { 179 destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount()); 180 } 181 } 182 dequeueCounter.addAndGet(ack.getMessageCount()); 183 prefetchExtension.addAndGet(ack.getMessageCount()); 184 } 185 }); 186 }else{ 187 if( singleDestination && destination!=null) { 188 destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount()); 189 } 190 dequeueCounter.addAndGet(ack.getMessageCount()); 191 prefetchExtension.addAndGet(ack.getMessageCount()); 192 } 193 if(wasFull&&!isFull()){ 194 dispatchMatched(); 195 } 196 return; 197 }else if(ack.isDeliveredAck()){ 198 prefetchExtension.addAndGet(ack.getMessageCount()); 200 if(wasFull&&!isFull()){ 201 dispatchMatched(); 202 } 203 return; 204 } 205 throw new JMSException ("Invalid acknowledgment: "+ack); 206 } 207 208 public Response pullMessage(ConnectionContext context,MessagePull pull) throws Exception { 209 return null; 211 } 212 213 public int getPendingQueueSize(){ 214 return matched(); 215 } 216 217 public int getDispatchedQueueSize(){ 218 return (int)(dispatchedCounter.get()-dequeueCounter.get()); 219 } 220 221 public int getMaximumPendingMessages(){ 222 return maximumPendingMessages; 223 } 224 225 public long getDispatchedCounter(){ 226 return dispatchedCounter.get(); 227 } 228 229 public long getEnqueueCounter(){ 230 return enqueueCounter.get(); 231 } 232 233 public long getDequeueCounter(){ 234 return dequeueCounter.get(); 235 } 236 237 240 public int discarded(){ 241 synchronized(matchedListMutex){ 242 return discarded; 243 } 244 } 245 246 250 public int matched(){ 251 synchronized(matchedListMutex){ 252 return matched.size(); 253 } 254 } 255 256 260 public void setMaximumPendingMessages(int maximumPendingMessages){ 261 this.maximumPendingMessages=maximumPendingMessages; 262 } 263 264 public MessageEvictionStrategy getMessageEvictionStrategy(){ 265 return messageEvictionStrategy; 266 } 267 268 271 public void setMessageEvictionStrategy(MessageEvictionStrategy messageEvictionStrategy){ 272 this.messageEvictionStrategy=messageEvictionStrategy; 273 } 274 275 private boolean isFull(){ 278 return getDispatchedQueueSize()-prefetchExtension.get()>=info.getPrefetchSize(); 279 } 280 281 284 public boolean isLowWaterMark(){ 285 return (getDispatchedQueueSize()-prefetchExtension.get()) <= (info.getPrefetchSize() *.4); 286 } 287 288 291 public boolean isHighWaterMark(){ 292 return (getDispatchedQueueSize()-prefetchExtension.get()) >= (info.getPrefetchSize() *.9); 293 } 294 295 298 public void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark){ 299 this.memoryUsageHighWaterMark=memoryUsageHighWaterMark; 300 } 301 302 305 public int getMemoryUsageHighWaterMark(){ 306 return this.memoryUsageHighWaterMark; 307 } 308 309 312 public UsageManager getUsageManager(){ 313 return this.usageManager; 314 } 315 316 319 public PendingMessageCursor getMatched(){ 320 return this.matched; 321 } 322 323 326 public void setMatched(PendingMessageCursor matched){ 327 this.matched=matched; 328 } 329 330 335 public void updateConsumerPrefetch(int newPrefetch){ 336 if(context!=null&&context.getConnection()!=null&&context.getConnection().isManageable()){ 337 ConsumerControl cc=new ConsumerControl(); 338 cc.setConsumerId(info.getConsumerId()); 339 cc.setPrefetch(newPrefetch); 340 context.getConnection().dispatchAsync(cc); 341 } 342 } 343 344 348 public void optimizePrefetch(){ 349 357 } 358 359 private void dispatchMatched() throws IOException { 360 synchronized(matchedListMutex){ 361 try{ 362 matched.reset(); 363 while(matched.hasNext()&&!isFull()){ 364 MessageReference message=(MessageReference)matched.next(); 365 matched.remove(); 366 if(message.isExpired()){ 369 message.decrementReferenceCount(); 370 continue; } 372 dispatch(message); 373 } 374 }finally{ 375 matched.release(); 376 } 377 } 378 } 379 380 private void dispatch(final MessageReference node) throws IOException { 381 Message message=(Message)node; 382 MessageDispatch md=new MessageDispatch(); 384 md.setMessage(message); 385 md.setConsumerId(info.getConsumerId()); 386 md.setDestination(node.getRegionDestination().getActiveMQDestination()); 387 dispatchedCounter.incrementAndGet(); 388 if(singleDestination){ 390 if(destination==null){ 391 destination=node.getRegionDestination(); 392 }else{ 393 if(destination!=node.getRegionDestination()){ 394 singleDestination=false; 395 } 396 } 397 } 398 if(info.isDispatchAsync()){ 399 md.setTransmitCallback(new Runnable (){ 400 401 public void run(){ 402 node.getRegionDestination().getDestinationStatistics().getDispatched().increment(); 403 node.decrementReferenceCount(); 404 } 405 }); 406 context.getConnection().dispatchAsync(md); 407 }else{ 408 context.getConnection().dispatchSync(md); 409 node.getRegionDestination().getDestinationStatistics().getDispatched().increment(); 410 node.decrementReferenceCount(); 411 } 412 } 413 414 public String toString(){ 415 return "TopicSubscription:"+" consumer="+info.getConsumerId()+", destinations="+destinations.size() 416 +", dispatched="+getDispatchedQueueSize()+", delivered="+getDequeueCounter()+", matched="+matched() 417 +", discarded="+discarded(); 418 } 419 420 public void destroy(){ 421 synchronized(matchedListMutex){ 422 try{ 423 matched.destroy(); 424 }catch(Exception e){ 425 log.warn("Failed to destroy cursor",e); 426 } 427 } 428 } 429 430 public int getPrefetchSize() { 431 return (int) (info.getPrefetchSize() + prefetchExtension.get()); 432 } 433 434 } 435 | Popular Tags |