1 14 15 package org.apache.activemq.broker.region.policy; 16 17 import org.apache.activemq.broker.Broker; 18 import org.apache.activemq.broker.region.DurableTopicSubscription; 19 import org.apache.activemq.broker.region.Queue; 20 import org.apache.activemq.broker.region.Topic; 21 import org.apache.activemq.broker.region.TopicSubscription; 22 import org.apache.activemq.broker.region.cursors.PendingMessageCursor; 23 import org.apache.activemq.broker.region.group.MessageGroupHashBucketFactory; 24 import org.apache.activemq.broker.region.group.MessageGroupMapFactory; 25 import org.apache.activemq.filter.DestinationMapEntry; 26 import org.apache.activemq.kaha.Store; 27 import org.apache.activemq.memory.UsageManager; 28 import org.apache.commons.logging.Log; 29 import org.apache.commons.logging.LogFactory; 30 31 39 public class PolicyEntry extends DestinationMapEntry{ 40 41 private static final Log log=LogFactory.getLog(PolicyEntry.class); 42 private DispatchPolicy dispatchPolicy; 43 private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy; 44 private boolean sendAdvisoryIfNoConsumers; 45 private DeadLetterStrategy deadLetterStrategy; 46 private PendingMessageLimitStrategy pendingMessageLimitStrategy; 47 private MessageEvictionStrategy messageEvictionStrategy; 48 private long memoryLimit; 49 private MessageGroupMapFactory messageGroupMapFactory; 50 private PendingQueueMessageStoragePolicy pendingQueuePolicy; 51 private PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy; 52 private PendingSubscriberMessageStoragePolicy pendingSubscriberPolicy; 53 public void configure(Queue queue,Store tmpStore){ 54 if(dispatchPolicy!=null){ 55 queue.setDispatchPolicy(dispatchPolicy); 56 } 57 if(deadLetterStrategy!=null){ 58 queue.setDeadLetterStrategy(deadLetterStrategy); 59 } 60 queue.setMessageGroupMapFactory(getMessageGroupMapFactory()); 61 if(memoryLimit>0){ 62 queue.getUsageManager().setLimit(memoryLimit); 63 } 64 if(pendingQueuePolicy!=null){ 65 PendingMessageCursor messages=pendingQueuePolicy.getQueuePendingMessageCursor(queue,tmpStore); 66 queue.setMessages(messages); 67 } 68 } 69 70 public void configure(Topic topic){ 71 if(dispatchPolicy!=null){ 72 topic.setDispatchPolicy(dispatchPolicy); 73 } 74 if(deadLetterStrategy!=null){ 75 topic.setDeadLetterStrategy(deadLetterStrategy); 76 } 77 if(subscriptionRecoveryPolicy!=null){ 78 topic.setSubscriptionRecoveryPolicy(subscriptionRecoveryPolicy.copy()); 79 } 80 topic.setSendAdvisoryIfNoConsumers(sendAdvisoryIfNoConsumers); 81 if(memoryLimit>0){ 82 topic.getUsageManager().setLimit(memoryLimit); 83 } 84 } 85 86 public void configure(Broker broker,UsageManager memoryManager,TopicSubscription subscription){ 87 if(pendingMessageLimitStrategy!=null){ 88 int value=pendingMessageLimitStrategy.getMaximumPendingMessageLimit(subscription); 89 int consumerLimit=subscription.getInfo().getMaximumPendingMessageLimit(); 90 if(consumerLimit>0){ 91 if(value<0||consumerLimit<value){ 92 value=consumerLimit; 93 } 94 } 95 if(value>=0){ 96 if(log.isDebugEnabled()){ 97 log.debug("Setting the maximumPendingMessages size to: "+value+" for consumer: " 98 +subscription.getInfo().getConsumerId()); 99 } 100 subscription.setMaximumPendingMessages(value); 101 } 102 } 103 if(messageEvictionStrategy!=null){ 104 subscription.setMessageEvictionStrategy(messageEvictionStrategy); 105 } 106 if (pendingSubscriberPolicy!=null) { 107 String name = subscription.getContext().getClientId() + "_" + subscription.getConsumerInfo().getConsumerId(); 108 int maxBatchSize = subscription.getConsumerInfo().getPrefetchSize(); 109 subscription.setMatched(pendingSubscriberPolicy.getSubscriberPendingMessageCursor(name,broker.getTempDataStore(),maxBatchSize)); 110 } 111 } 112 113 public void configure(Broker broker,UsageManager memoryManager,DurableTopicSubscription sub){ 114 String clientId=sub.getClientId(); 115 String subName=sub.getSubscriptionName(); 116 int prefetch=sub.getPrefetchSize(); 117 if(pendingDurableSubscriberPolicy!=null){ 118 PendingMessageCursor cursor=pendingDurableSubscriberPolicy.getSubscriberPendingMessageCursor(clientId, 119 subName,broker.getTempDataStore(),prefetch); 120 cursor.setUsageManager(memoryManager); 121 sub.setPending(cursor); 122 } 123 } 124 125 public DispatchPolicy getDispatchPolicy(){ 128 return dispatchPolicy; 129 } 130 131 public void setDispatchPolicy(DispatchPolicy policy){ 132 this.dispatchPolicy=policy; 133 } 134 135 public SubscriptionRecoveryPolicy getSubscriptionRecoveryPolicy(){ 136 return subscriptionRecoveryPolicy; 137 } 138 139 public void setSubscriptionRecoveryPolicy(SubscriptionRecoveryPolicy subscriptionRecoveryPolicy){ 140 this.subscriptionRecoveryPolicy=subscriptionRecoveryPolicy; 141 } 142 143 public boolean isSendAdvisoryIfNoConsumers(){ 144 return sendAdvisoryIfNoConsumers; 145 } 146 147 150 public void setSendAdvisoryIfNoConsumers(boolean sendAdvisoryIfNoConsumers){ 151 this.sendAdvisoryIfNoConsumers=sendAdvisoryIfNoConsumers; 152 } 153 154 public DeadLetterStrategy getDeadLetterStrategy(){ 155 return deadLetterStrategy; 156 } 157 158 161 public void setDeadLetterStrategy(DeadLetterStrategy deadLetterStrategy){ 162 this.deadLetterStrategy=deadLetterStrategy; 163 } 164 165 public PendingMessageLimitStrategy getPendingMessageLimitStrategy(){ 166 return pendingMessageLimitStrategy; 167 } 168 169 177 public void setPendingMessageLimitStrategy(PendingMessageLimitStrategy pendingMessageLimitStrategy){ 178 this.pendingMessageLimitStrategy=pendingMessageLimitStrategy; 179 } 180 181 public MessageEvictionStrategy getMessageEvictionStrategy(){ 182 return messageEvictionStrategy; 183 } 184 185 188 public void setMessageEvictionStrategy(MessageEvictionStrategy messageEvictionStrategy){ 189 this.messageEvictionStrategy=messageEvictionStrategy; 190 } 191 192 public long getMemoryLimit(){ 193 return memoryLimit; 194 } 195 196 200 public void setMemoryLimit(long memoryLimit){ 201 this.memoryLimit=memoryLimit; 202 } 203 204 public MessageGroupMapFactory getMessageGroupMapFactory(){ 205 if(messageGroupMapFactory==null){ 206 messageGroupMapFactory=new MessageGroupHashBucketFactory(); 207 } 208 return messageGroupMapFactory; 209 } 210 211 215 public void setMessageGroupMapFactory(MessageGroupMapFactory messageGroupMapFactory){ 216 this.messageGroupMapFactory=messageGroupMapFactory; 217 } 218 219 220 223 public PendingDurableSubscriberMessageStoragePolicy getPendingDurableSubscriberPolicy(){ 224 return this.pendingDurableSubscriberPolicy; 225 } 226 227 228 231 public void setPendingDurableSubscriberPolicy( 232 PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy){ 233 this.pendingDurableSubscriberPolicy=pendingDurableSubscriberPolicy; 234 } 235 236 237 240 public PendingQueueMessageStoragePolicy getPendingQueuePolicy(){ 241 return this.pendingQueuePolicy; 242 } 243 244 245 248 public void setPendingQueuePolicy(PendingQueueMessageStoragePolicy pendingQueuePolicy){ 249 this.pendingQueuePolicy=pendingQueuePolicy; 250 } 251 252 253 256 public PendingSubscriberMessageStoragePolicy getPendingSubscriberPolicy(){ 257 return this.pendingSubscriberPolicy; 258 } 259 260 261 264 public void setPendingSubscriberPolicy(PendingSubscriberMessageStoragePolicy pendingSubscriberPolicy){ 265 this.pendingSubscriberPolicy=pendingSubscriberPolicy; 266 } 267 268 } 269 | Popular Tags |