1 18 package org.apache.activemq.broker.region.policy; 19 20 import java.util.Iterator ; 21 import java.util.List ; 22 import org.apache.activemq.broker.ConnectionContext; 23 import org.apache.activemq.broker.region.MessageReference; 24 import org.apache.activemq.broker.region.SubscriptionRecovery; 25 import org.apache.activemq.broker.region.Topic; 26 import org.apache.activemq.command.ActiveMQDestination; 27 import org.apache.activemq.command.Message; 28 import org.apache.activemq.memory.list.DestinationBasedMessageList; 29 import org.apache.activemq.memory.list.MessageList; 30 import org.apache.activemq.memory.list.SimpleMessageList; 31 32 41 public class FixedSizedSubscriptionRecoveryPolicy implements SubscriptionRecoveryPolicy { 42 43 private MessageList buffer; 44 private int maximumSize = 100 * 64 * 1024; 45 private boolean useSharedBuffer = true; 46 47 public SubscriptionRecoveryPolicy copy() { 48 FixedSizedSubscriptionRecoveryPolicy rc = new FixedSizedSubscriptionRecoveryPolicy(); 49 rc.setMaximumSize(maximumSize); 50 rc.setUseSharedBuffer(useSharedBuffer); 51 return rc; 52 } 53 54 public boolean add(ConnectionContext context, MessageReference message) throws Exception { 55 buffer.add(message); 56 return true; 57 } 58 59 public void recover(ConnectionContext context,Topic topic,SubscriptionRecovery sub) throws Exception { 60 List copy=buffer.getMessages(sub.getActiveMQDestination()); 62 if(!copy.isEmpty()){ 63 for(Iterator iter=copy.iterator();iter.hasNext();){ 64 MessageReference node=(MessageReference)iter.next(); 65 sub.addRecoveredMessage(context,node); 66 } 67 } 68 } 69 70 public void start() throws Exception { 71 buffer = createMessageList(); 72 } 73 74 public void stop() throws Exception { 75 buffer.clear(); 76 } 77 78 public MessageList getBuffer() { 81 return buffer; 82 } 83 84 public void setBuffer(MessageList buffer) { 85 this.buffer = buffer; 86 } 87 88 public int getMaximumSize() { 89 return maximumSize; 90 } 91 92 95 public void setMaximumSize(int maximumSize) { 96 this.maximumSize = maximumSize; 97 } 98 99 public boolean isUseSharedBuffer() { 100 return useSharedBuffer; 101 } 102 103 public void setUseSharedBuffer(boolean useSharedBuffer) { 104 this.useSharedBuffer = useSharedBuffer; 105 } 106 107 public Message[] browse(ActiveMQDestination destination) throws Exception { 108 return buffer.browse(destination); 109 } 110 111 113 protected MessageList createMessageList() { 115 if (useSharedBuffer) { 116 return new SimpleMessageList(maximumSize); 117 } 118 else { 119 return new DestinationBasedMessageList(maximumSize); 120 } 121 } 122 123 } 124 | Popular Tags |