KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > broker > region > policy > PolicyEntry


/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
 * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
 * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
 * License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
 * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations under the License.
 */

14
15 package org.apache.activemq.broker.region.policy;
16
17 import org.apache.activemq.broker.Broker;
18 import org.apache.activemq.broker.region.DurableTopicSubscription;
19 import org.apache.activemq.broker.region.Queue;
20 import org.apache.activemq.broker.region.Topic;
21 import org.apache.activemq.broker.region.TopicSubscription;
22 import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
23 import org.apache.activemq.broker.region.group.MessageGroupHashBucketFactory;
24 import org.apache.activemq.broker.region.group.MessageGroupMapFactory;
25 import org.apache.activemq.filter.DestinationMapEntry;
26 import org.apache.activemq.kaha.Store;
27 import org.apache.activemq.memory.UsageManager;
28 import org.apache.commons.logging.Log;
29 import org.apache.commons.logging.LogFactory;
30
31 /**
32  * Represents an entry in a {@link PolicyMap} for assigning policies to a specific destination or a hierarchical
33  * wildcard area of destinations.
34  *
35  * @org.apache.xbean.XBean
36  *
37  * @version $Revision: 1.1 $
38  */

39 public class PolicyEntry extends DestinationMapEntry{
40
41     private static final Log log=LogFactory.getLog(PolicyEntry.class);
42     private DispatchPolicy dispatchPolicy;
43     private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy;
44     private boolean sendAdvisoryIfNoConsumers;
45     private DeadLetterStrategy deadLetterStrategy;
46     private PendingMessageLimitStrategy pendingMessageLimitStrategy;
47     private MessageEvictionStrategy messageEvictionStrategy;
48     private long memoryLimit;
49     private MessageGroupMapFactory messageGroupMapFactory;
50     private PendingQueueMessageStoragePolicy pendingQueuePolicy;
51     private PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy;
52     private PendingSubscriberMessageStoragePolicy pendingSubscriberPolicy;
53     public void configure(Queue queue,Store tmpStore){
54         if(dispatchPolicy!=null){
55             queue.setDispatchPolicy(dispatchPolicy);
56         }
57         if(deadLetterStrategy!=null){
58             queue.setDeadLetterStrategy(deadLetterStrategy);
59         }
60         queue.setMessageGroupMapFactory(getMessageGroupMapFactory());
61         if(memoryLimit>0){
62             queue.getUsageManager().setLimit(memoryLimit);
63         }
64         if(pendingQueuePolicy!=null){
65             PendingMessageCursor messages=pendingQueuePolicy.getQueuePendingMessageCursor(queue,tmpStore);
66             queue.setMessages(messages);
67         }
68     }
69
70     public void configure(Topic topic){
71         if(dispatchPolicy!=null){
72             topic.setDispatchPolicy(dispatchPolicy);
73         }
74         if(deadLetterStrategy!=null){
75             topic.setDeadLetterStrategy(deadLetterStrategy);
76         }
77         if(subscriptionRecoveryPolicy!=null){
78             topic.setSubscriptionRecoveryPolicy(subscriptionRecoveryPolicy.copy());
79         }
80         topic.setSendAdvisoryIfNoConsumers(sendAdvisoryIfNoConsumers);
81         if(memoryLimit>0){
82             topic.getUsageManager().setLimit(memoryLimit);
83         }
84     }
85
86     public void configure(Broker broker,UsageManager memoryManager,TopicSubscription subscription){
87         if(pendingMessageLimitStrategy!=null){
88             int value=pendingMessageLimitStrategy.getMaximumPendingMessageLimit(subscription);
89             int consumerLimit=subscription.getInfo().getMaximumPendingMessageLimit();
90             if(consumerLimit>0){
91                 if(value<0||consumerLimit<value){
92                     value=consumerLimit;
93                 }
94             }
95             if(value>=0){
96                 if(log.isDebugEnabled()){
97                     log.debug("Setting the maximumPendingMessages size to: "+value+" for consumer: "
98                             +subscription.getInfo().getConsumerId());
99                 }
100                 subscription.setMaximumPendingMessages(value);
101             }
102         }
103         if(messageEvictionStrategy!=null){
104             subscription.setMessageEvictionStrategy(messageEvictionStrategy);
105         }
106         if (pendingSubscriberPolicy!=null) {
107             String JavaDoc name = subscription.getContext().getClientId() + "_" + subscription.getConsumerInfo().getConsumerId();
108             int maxBatchSize = subscription.getConsumerInfo().getPrefetchSize();
109             subscription.setMatched(pendingSubscriberPolicy.getSubscriberPendingMessageCursor(name,broker.getTempDataStore(),maxBatchSize));
110         }
111     }
112
113     public void configure(Broker broker,UsageManager memoryManager,DurableTopicSubscription sub){
114         String JavaDoc clientId=sub.getClientId();
115         String JavaDoc subName=sub.getSubscriptionName();
116         int prefetch=sub.getPrefetchSize();
117         if(pendingDurableSubscriberPolicy!=null){
118             PendingMessageCursor cursor=pendingDurableSubscriberPolicy.getSubscriberPendingMessageCursor(clientId,
119                     subName,broker.getTempDataStore(),prefetch);
120             cursor.setUsageManager(memoryManager);
121             sub.setPending(cursor);
122         }
123     }
124
125     // Properties
126
// -------------------------------------------------------------------------
127
public DispatchPolicy getDispatchPolicy(){
128         return dispatchPolicy;
129     }
130
131     public void setDispatchPolicy(DispatchPolicy policy){
132         this.dispatchPolicy=policy;
133     }
134
135     public SubscriptionRecoveryPolicy getSubscriptionRecoveryPolicy(){
136         return subscriptionRecoveryPolicy;
137     }
138
139     public void setSubscriptionRecoveryPolicy(SubscriptionRecoveryPolicy subscriptionRecoveryPolicy){
140         this.subscriptionRecoveryPolicy=subscriptionRecoveryPolicy;
141     }
142
143     public boolean isSendAdvisoryIfNoConsumers(){
144         return sendAdvisoryIfNoConsumers;
145     }
146
147     /**
148      * Sends an advisory message if a non-persistent message is sent and there are no active consumers
149      */

150     public void setSendAdvisoryIfNoConsumers(boolean sendAdvisoryIfNoConsumers){
151         this.sendAdvisoryIfNoConsumers=sendAdvisoryIfNoConsumers;
152     }
153
154     public DeadLetterStrategy getDeadLetterStrategy(){
155         return deadLetterStrategy;
156     }
157
158     /**
159      * Sets the policy used to determine which dead letter queue destination should be used
160      */

161     public void setDeadLetterStrategy(DeadLetterStrategy deadLetterStrategy){
162         this.deadLetterStrategy=deadLetterStrategy;
163     }
164
165     public PendingMessageLimitStrategy getPendingMessageLimitStrategy(){
166         return pendingMessageLimitStrategy;
167     }
168
169     /**
170      * Sets the strategy to calculate the maximum number of messages that are allowed to be pending on consumers (in
171      * addition to their prefetch sizes).
172      *
173      * Once the limit is reached, non-durable topics can then start discarding old messages. This allows us to keep
174      * dispatching messages to slow consumers while not blocking fast consumers and discarding the messages oldest
175      * first.
176      */

177     public void setPendingMessageLimitStrategy(PendingMessageLimitStrategy pendingMessageLimitStrategy){
178         this.pendingMessageLimitStrategy=pendingMessageLimitStrategy;
179     }
180
181     public MessageEvictionStrategy getMessageEvictionStrategy(){
182         return messageEvictionStrategy;
183     }
184
185     /**
186      * Sets the eviction strategy used to decide which message to evict when the slow consumer needs to discard messages
187      */

188     public void setMessageEvictionStrategy(MessageEvictionStrategy messageEvictionStrategy){
189         this.messageEvictionStrategy=messageEvictionStrategy;
190     }
191
192     public long getMemoryLimit(){
193         return memoryLimit;
194     }
195
196     /**
197      *
198      * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
199      */

200     public void setMemoryLimit(long memoryLimit){
201         this.memoryLimit=memoryLimit;
202     }
203
204     public MessageGroupMapFactory getMessageGroupMapFactory(){
205         if(messageGroupMapFactory==null){
206             messageGroupMapFactory=new MessageGroupHashBucketFactory();
207         }
208         return messageGroupMapFactory;
209     }
210
211     /**
212      * Sets the factory used to create new instances of {MessageGroupMap} used to implement the <a
213      * HREF="http://activemq.apache.org/message-groups.html">Message Groups</a> functionality.
214      */

215     public void setMessageGroupMapFactory(MessageGroupMapFactory messageGroupMapFactory){
216         this.messageGroupMapFactory=messageGroupMapFactory;
217     }
218
219     
220     /**
221      * @return the pendingDurableSubscriberPolicy
222      */

223     public PendingDurableSubscriberMessageStoragePolicy getPendingDurableSubscriberPolicy(){
224         return this.pendingDurableSubscriberPolicy;
225     }
226
227     
228     /**
229      * @param pendingDurableSubscriberPolicy the pendingDurableSubscriberPolicy to set
230      */

231     public void setPendingDurableSubscriberPolicy(
232             PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy){
233         this.pendingDurableSubscriberPolicy=pendingDurableSubscriberPolicy;
234     }
235
236     
237     /**
238      * @return the pendingQueuePolicy
239      */

240     public PendingQueueMessageStoragePolicy getPendingQueuePolicy(){
241         return this.pendingQueuePolicy;
242     }
243
244     
245     /**
246      * @param pendingQueuePolicy the pendingQueuePolicy to set
247      */

248     public void setPendingQueuePolicy(PendingQueueMessageStoragePolicy pendingQueuePolicy){
249         this.pendingQueuePolicy=pendingQueuePolicy;
250     }
251
252     
253     /**
254      * @return the pendingSubscriberPolicy
255      */

256     public PendingSubscriberMessageStoragePolicy getPendingSubscriberPolicy(){
257         return this.pendingSubscriberPolicy;
258     }
259
260     
261     /**
262      * @param pendingSubscriberPolicy the pendingSubscriberPolicy to set
263      */

264     public void setPendingSubscriberPolicy(PendingSubscriberMessageStoragePolicy pendingSubscriberPolicy){
265         this.pendingSubscriberPolicy=pendingSubscriberPolicy;
266     }
267
268 }
269
Popular Tags