1 18 package org.apache.activemq.broker.region.policy; 19 20 import java.util.ArrayList ; 21 import java.util.List ; 22 import org.apache.activemq.broker.ConnectionContext; 23 import org.apache.activemq.broker.region.MessageReference; 24 import org.apache.activemq.broker.region.Subscription; 25 import org.apache.activemq.broker.region.SubscriptionRecovery; 26 import org.apache.activemq.broker.region.Topic; 27 import org.apache.activemq.command.ActiveMQDestination; 28 import org.apache.activemq.command.Message; 29 import org.apache.activemq.filter.DestinationFilter; 30 import org.apache.activemq.filter.MessageEvaluationContext; 31 39 public class FixedCountSubscriptionRecoveryPolicy implements SubscriptionRecoveryPolicy{ 40 volatile private MessageReference messages[]; 41 private int maximumSize=100; 42 private int tail=0; 43 44 public SubscriptionRecoveryPolicy copy() { 45 FixedCountSubscriptionRecoveryPolicy rc = new FixedCountSubscriptionRecoveryPolicy(); 46 rc.setMaximumSize(maximumSize); 47 return rc; 48 } 49 50 synchronized public boolean add(ConnectionContext context,MessageReference node) throws Exception { 51 messages[tail++]=node; 52 if(tail>=messages.length) 53 tail=0; 54 return true; 55 } 56 57 synchronized public void recover(ConnectionContext context,Topic topic,SubscriptionRecovery sub) throws Exception { 58 int t=tail; 60 if(messages[t]==null) 62 t=0; 63 if(messages[t]==null) 65 return; 66 do{ 68 MessageReference node=messages[t]; 69 sub.addRecoveredMessage(context,node); 70 t++; 71 if(t>=messages.length) 72 t=0; 73 }while(t!=tail); 74 } 75 76 public void start() throws Exception { 77 messages=new MessageReference[maximumSize]; 78 } 79 80 public void stop() throws Exception { 81 messages=null; 82 } 83 84 public int getMaximumSize(){ 85 return maximumSize; 86 } 87 88 91 public void setMaximumSize(int maximumSize){ 92 this.maximumSize=maximumSize; 93 } 94 95 public Message[] browse(ActiveMQDestination destination) throws Exception { 96 List result=new ArrayList (); 97 DestinationFilter filter=DestinationFilter.parseFilter(destination); 98 int t=tail; 99 if(messages[t]==null) 100 t=0; 101 if(messages[t]!=null){ 102 do{ 103 MessageReference ref=messages[t]; 104 Message message=ref.getMessage(); 105 if(filter.matches(message.getDestination())){ 106 result.add(message); 107 } 108 t++; 109 if(t>=messages.length) 110 t=0; 111 }while(t!=tail); 112 } 113 return (Message[]) result.toArray(new Message[result.size()]); 114 } 115 116 } 117 | Popular Tags |