KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > broker > region > TopicSubscription


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
4  * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
5  * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
6  * License. You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
11  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
12  * specific language governing permissions and limitations under the License.
13  */

14
15 package org.apache.activemq.broker.region;
16
17 import java.io.IOException JavaDoc;
18 import java.util.LinkedList JavaDoc;
19 import java.util.concurrent.atomic.AtomicLong JavaDoc;
20 import javax.jms.InvalidSelectorException JavaDoc;
21 import javax.jms.JMSException JavaDoc;
22 import org.apache.activemq.broker.Broker;
23 import org.apache.activemq.broker.ConnectionContext;
24 import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor;
25 import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
26 import org.apache.activemq.broker.region.policy.MessageEvictionStrategy;
27 import org.apache.activemq.broker.region.policy.OldestMessageEvictionStrategy;
28 import org.apache.activemq.command.ConsumerControl;
29 import org.apache.activemq.command.ConsumerInfo;
30 import org.apache.activemq.command.Message;
31 import org.apache.activemq.command.MessageAck;
32 import org.apache.activemq.command.MessageDispatch;
33 import org.apache.activemq.command.MessageDispatchNotification;
34 import org.apache.activemq.command.MessagePull;
35 import org.apache.activemq.command.Response;
36 import org.apache.activemq.memory.UsageManager;
37 import org.apache.activemq.transaction.Synchronization;
38 import org.apache.commons.logging.Log;
39 import org.apache.commons.logging.LogFactory;
40
41 public class TopicSubscription extends AbstractSubscription{
42
43     private static final Log log=LogFactory.getLog(TopicSubscription.class);
44     private static final AtomicLong JavaDoc cursorNameCounter=new AtomicLong JavaDoc(0);
45     protected PendingMessageCursor matched;
46     final protected UsageManager usageManager;
47     protected AtomicLong JavaDoc dispatchedCounter=new AtomicLong JavaDoc();
48     protected AtomicLong JavaDoc prefetchExtension=new AtomicLong JavaDoc();
49     private int maximumPendingMessages=-1;
50     private MessageEvictionStrategy messageEvictionStrategy=new OldestMessageEvictionStrategy();
51     private int discarded=0;
52     private final Object JavaDoc matchedListMutex=new Object JavaDoc();
53     private final AtomicLong JavaDoc enqueueCounter=new AtomicLong JavaDoc(0);
54     private final AtomicLong JavaDoc dequeueCounter=new AtomicLong JavaDoc(0);
55     boolean singleDestination=true;
56     Destination destination;
57     private int memoryUsageHighWaterMark=95;
58
59     public TopicSubscription(Broker broker,ConnectionContext context,ConsumerInfo info,UsageManager usageManager)
60             throws Exception JavaDoc{
61         super(broker,context,info);
62         this.usageManager=usageManager;
63         String JavaDoc matchedName="TopicSubscription:"+cursorNameCounter.getAndIncrement()+"["+info.getConsumerId().toString()
64                 +"]";
65         this.matched=new FilePendingMessageCursor(matchedName,broker.getTempDataStore());
66        
67     }
68     
69     public void init() throws Exception JavaDoc {
70         this.matched.setUsageManager(usageManager);
71         this.matched.start();
72     }
73     
74     public void add(MessageReference node) throws Exception JavaDoc{
75         enqueueCounter.incrementAndGet();
76         node.incrementReferenceCount();
77         if(!isFull()&&!isSlaveBroker()){
78             optimizePrefetch();
79             // if maximumPendingMessages is set we will only discard messages which
80
// have not been dispatched (i.e. we allow the prefetch buffer to be filled)
81
dispatch(node);
82         }else{
83             if(maximumPendingMessages!=0){
84                 synchronized(matchedListMutex){
85                     matched.addMessageLast(node);
86                     // NOTE - be careful about the slaveBroker!
87
if(maximumPendingMessages>0){
88                         // calculate the high water mark from which point we will eagerly evict expired messages
89
int max=messageEvictionStrategy.getEvictExpiredMessagesHighWatermark();
90                         if(maximumPendingMessages>0&&maximumPendingMessages<max){
91                             max=maximumPendingMessages;
92                         }
93                         if(!matched.isEmpty()&&matched.size()>max){
94                             removeExpiredMessages();
95                         }
96                         // lets discard old messages as we are a slow consumer
97
while(!matched.isEmpty()&&matched.size()>maximumPendingMessages){
98                             int pageInSize=matched.size()-maximumPendingMessages;
99                             // only page in a 1000 at a time - else we could blow da memory
100
pageInSize=Math.max(1000,pageInSize);
101                             LinkedList JavaDoc list=matched.pageInList(pageInSize);
102                             MessageReference[] oldMessages=messageEvictionStrategy.evictMessages(list);
103                             int messagesToEvict=oldMessages.length;
104                             for(int i=0;i<messagesToEvict;i++){
105                                 MessageReference oldMessage=oldMessages[i];
106                                 oldMessage.decrementReferenceCount();
107                                 matched.remove(oldMessage);
108                                 discarded++;
109                                 if(log.isDebugEnabled()){
110                                     log.debug("Discarding message "+oldMessages[i]);
111                                 }
112                             }
113                             // lets avoid an infinite loop if we are given a bad eviction strategy
114
// for a bad strategy lets just not evict
115
if(messagesToEvict==0){
116                                 log.warn("No messages to evict returned from eviction strategy: "
117                                         +messageEvictionStrategy);
118                                 break;
119                             }
120                         }
121                     }
122                 }
123             }
124         }
125     }
126
127     /**
128      * Discard any expired messages from the matched list. Called from a synchronized block.
129      *
130      * @throws IOException
131      */

132     protected void removeExpiredMessages() throws IOException JavaDoc{
133         try{
134             matched.reset();
135             while(matched.hasNext()){
136                 MessageReference node=matched.next();
137                 if(node.isExpired()){
138                     matched.remove();
139                     dispatchedCounter.incrementAndGet();
140                     node.decrementReferenceCount();
141                     break;
142                 }
143             }
144         }finally{
145             matched.release();
146         }
147     }
148
149     public void processMessageDispatchNotification(MessageDispatchNotification mdn){
150         synchronized(matchedListMutex){
151             try{
152                 matched.reset();
153                 while(matched.hasNext()){
154                     MessageReference node=matched.next();
155                     if(node.getMessageId().equals(mdn.getMessageId())){
156                         matched.remove();
157                         dispatchedCounter.incrementAndGet();
158                         node.decrementReferenceCount();
159                         break;
160                     }
161                 }
162             }finally{
163                 matched.release();
164             }
165         }
166     }
167
168     synchronized public void acknowledge(final ConnectionContext context,final MessageAck ack) throws Exception JavaDoc{
169         // Handle the standard acknowledgment case.
170
boolean wasFull=isFull();
171         if(ack.isStandardAck()||ack.isPoisonAck()){
172             if(context.isInTransaction()){
173                 prefetchExtension.addAndGet(ack.getMessageCount());
174                 context.getTransaction().addSynchronization(new Synchronization(){
175
176                     public void afterCommit() throws Exception JavaDoc{
177                         synchronized(TopicSubscription.this){
178                             if( singleDestination && destination!=null) {
179                                 destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount());
180                             }
181                         }
182                         dequeueCounter.addAndGet(ack.getMessageCount());
183                         prefetchExtension.addAndGet(ack.getMessageCount());
184                     }
185                 });
186             }else{
187                 if( singleDestination && destination!=null) {
188                     destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount());
189                 }
190                 dequeueCounter.addAndGet(ack.getMessageCount());
191                 prefetchExtension.addAndGet(ack.getMessageCount());
192             }
193             if(wasFull&&!isFull()){
194                 dispatchMatched();
195             }
196             return;
197         }else if(ack.isDeliveredAck()){
198             // Message was delivered but not acknowledged: update pre-fetch counters.
199
prefetchExtension.addAndGet(ack.getMessageCount());
200             if(wasFull&&!isFull()){
201                 dispatchMatched();
202             }
203             return;
204         }
205         throw new JMSException JavaDoc("Invalid acknowledgment: "+ack);
206     }
207
208     public Response pullMessage(ConnectionContext context,MessagePull pull) throws Exception JavaDoc{
209         // not supported for topics
210
return null;
211     }
212
213     public int getPendingQueueSize(){
214         return matched();
215     }
216
217     public int getDispatchedQueueSize(){
218         return (int)(dispatchedCounter.get()-dequeueCounter.get());
219     }
220
221     public int getMaximumPendingMessages(){
222         return maximumPendingMessages;
223     }
224
225     public long getDispatchedCounter(){
226         return dispatchedCounter.get();
227     }
228
229     public long getEnqueueCounter(){
230         return enqueueCounter.get();
231     }
232
233     public long getDequeueCounter(){
234         return dequeueCounter.get();
235     }
236
237     /**
238      * @return the number of messages discarded due to being a slow consumer
239      */

240     public int discarded(){
241         synchronized(matchedListMutex){
242             return discarded;
243         }
244     }
245
246     /**
247      * @return the number of matched messages (messages targeted for the subscription but not yet able to be dispatched
248      * due to the prefetch buffer being full).
249      */

250     public int matched(){
251         synchronized(matchedListMutex){
252             return matched.size();
253         }
254     }
255
256     /**
257      * Sets the maximum number of pending messages that can be matched against this consumer before old messages are
258      * discarded.
259      */

260     public void setMaximumPendingMessages(int maximumPendingMessages){
261         this.maximumPendingMessages=maximumPendingMessages;
262     }
263
264     public MessageEvictionStrategy getMessageEvictionStrategy(){
265         return messageEvictionStrategy;
266     }
267
268     /**
269      * Sets the eviction strategy used to decide which message to evict when the slow consumer needs to discard messages
270      */

271     public void setMessageEvictionStrategy(MessageEvictionStrategy messageEvictionStrategy){
272         this.messageEvictionStrategy=messageEvictionStrategy;
273     }
274
275     // Implementation methods
276
// -------------------------------------------------------------------------
277
private boolean isFull(){
278         return getDispatchedQueueSize()-prefetchExtension.get()>=info.getPrefetchSize();
279     }
280
281     /**
282      * @return true when 60% or more room is left for dispatching messages
283      */

284     public boolean isLowWaterMark(){
285         return (getDispatchedQueueSize()-prefetchExtension.get()) <= (info.getPrefetchSize() *.4);
286     }
287
288     /**
289      * @return true when 10% or less room is left for dispatching messages
290      */

291     public boolean isHighWaterMark(){
292         return (getDispatchedQueueSize()-prefetchExtension.get()) >= (info.getPrefetchSize() *.9);
293     }
294
295     /**
296      * @param memoryUsageHighWaterMark the memoryUsageHighWaterMark to set
297      */

298     public void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark){
299         this.memoryUsageHighWaterMark=memoryUsageHighWaterMark;
300     }
301
302     /**
303      * @return the memoryUsageHighWaterMark
304      */

305     public int getMemoryUsageHighWaterMark(){
306         return this.memoryUsageHighWaterMark;
307     }
308
309     /**
310      * @return the usageManager
311      */

312     public UsageManager getUsageManager(){
313         return this.usageManager;
314     }
315     
316     /**
317      * @return the matched
318      */

319     public PendingMessageCursor getMatched(){
320         return this.matched;
321     }
322
323     /**
324      * @param matched the matched to set
325      */

326     public void setMatched(PendingMessageCursor matched){
327         this.matched=matched;
328     }
329
330     /**
331      * inform the MessageConsumer on the client to change it's prefetch
332      *
333      * @param newPrefetch
334      */

335     public void updateConsumerPrefetch(int newPrefetch){
336         if(context!=null&&context.getConnection()!=null&&context.getConnection().isManageable()){
337             ConsumerControl cc=new ConsumerControl();
338             cc.setConsumerId(info.getConsumerId());
339             cc.setPrefetch(newPrefetch);
340             context.getConnection().dispatchAsync(cc);
341         }
342     }
343
344     /**
345      * optimize message consumer prefetch if the consumer supports it
346      *
347      */

348     public void optimizePrefetch(){
349         /*
350          * if(info!=null&&info.isOptimizedAcknowledge()&&context!=null&&context.getConnection()!=null
351          * &&context.getConnection().isManageable()){ if(info.getCurrentPrefetchSize()!=info.getPrefetchSize() &&
352          * isLowWaterMark()){ info.setCurrentPrefetchSize(info.getPrefetchSize());
353          * updateConsumerPrefetch(info.getPrefetchSize()); }else
354          * if(info.getCurrentPrefetchSize()==info.getPrefetchSize() && isHighWaterMark()){ // want to purge any
355          * outstanding acks held by the consumer info.setCurrentPrefetchSize(1); updateConsumerPrefetch(1); } }
356          */

357     }
358
359     private void dispatchMatched() throws IOException JavaDoc{
360         synchronized(matchedListMutex){
361             try{
362                 matched.reset();
363                 while(matched.hasNext()&&!isFull()){
364                     MessageReference message=(MessageReference)matched.next();
365                     matched.remove();
366                     // Message may have been sitting in the matched list a while
367
// waiting for the consumer to ak the message.
368
if(message.isExpired()){
369                         message.decrementReferenceCount();
370                         continue; // just drop it.
371
}
372                     dispatch(message);
373                 }
374             }finally{
375                 matched.release();
376             }
377         }
378     }
379
380     private void dispatch(final MessageReference node) throws IOException JavaDoc{
381         Message message=(Message)node;
382         // Make sure we can dispatch a message.
383
MessageDispatch md=new MessageDispatch();
384         md.setMessage(message);
385         md.setConsumerId(info.getConsumerId());
386         md.setDestination(node.getRegionDestination().getActiveMQDestination());
387         dispatchedCounter.incrementAndGet();
388         // Keep track if this subscription is receiving messages from a single destination.
389
if(singleDestination){
390             if(destination==null){
391                 destination=node.getRegionDestination();
392             }else{
393                 if(destination!=node.getRegionDestination()){
394                     singleDestination=false;
395                 }
396             }
397         }
398         if(info.isDispatchAsync()){
399             md.setTransmitCallback(new Runnable JavaDoc(){
400
401                 public void run(){
402                     node.getRegionDestination().getDestinationStatistics().getDispatched().increment();
403                     node.decrementReferenceCount();
404                 }
405             });
406             context.getConnection().dispatchAsync(md);
407         }else{
408             context.getConnection().dispatchSync(md);
409             node.getRegionDestination().getDestinationStatistics().getDispatched().increment();
410             node.decrementReferenceCount();
411         }
412     }
413
414     public String JavaDoc toString(){
415         return "TopicSubscription:"+" consumer="+info.getConsumerId()+", destinations="+destinations.size()
416                 +", dispatched="+getDispatchedQueueSize()+", delivered="+getDequeueCounter()+", matched="+matched()
417                 +", discarded="+discarded();
418     }
419
420     public void destroy(){
421         synchronized(matchedListMutex){
422             try{
423                 matched.destroy();
424             }catch(Exception JavaDoc e){
425                log.warn("Failed to destroy cursor",e);
426             }
427         }
428     }
429
430     public int getPrefetchSize() {
431         return (int) (info.getPrefetchSize() + prefetchExtension.get());
432     }
433     
434 }
435
Popular Tags