1 18 package org.apache.activemq.broker.region.policy; 19 20 import java.util.ArrayList ; 21 import java.util.Collections ; 22 import java.util.Iterator ; 23 import java.util.LinkedList ; 24 import java.util.List ; 25 import org.apache.activemq.broker.ConnectionContext; 26 import org.apache.activemq.broker.region.MessageReference; 27 import org.apache.activemq.broker.region.SubscriptionRecovery; 28 import org.apache.activemq.broker.region.Topic; 29 import org.apache.activemq.command.ActiveMQDestination; 30 import org.apache.activemq.command.Message; 31 import org.apache.activemq.filter.DestinationFilter; 32 import org.apache.activemq.filter.MessageEvaluationContext; 33 import org.apache.activemq.thread.Scheduler; 34 35 44 public class TimedSubscriptionRecoveryPolicy implements SubscriptionRecoveryPolicy { 45 46 private static final int GC_INTERVAL = 1000; 47 48 private final List buffer = Collections.synchronizedList(new LinkedList ()); 51 private volatile long lastGCRun = System.currentTimeMillis(); 52 53 private long recoverDuration = 60 * 1000; 55 static class TimestampWrapper { 56 public MessageReference message; 57 public long timestamp; 58 59 public TimestampWrapper(MessageReference message, long timestamp) { 60 this.message = message; 61 this.timestamp = timestamp; 62 } 63 } 64 65 private final Runnable gcTask = new Runnable () { 66 public void run() { 67 gc(); 68 } 69 }; 70 71 public SubscriptionRecoveryPolicy copy() { 72 TimedSubscriptionRecoveryPolicy rc = new TimedSubscriptionRecoveryPolicy(); 73 rc.setRecoverDuration(recoverDuration); 74 return rc; 75 } 76 77 public boolean add(ConnectionContext context, MessageReference message) throws Exception { 78 buffer.add(new TimestampWrapper(message, lastGCRun)); 79 return true; 80 } 81 82 public void recover(ConnectionContext context,Topic topic,SubscriptionRecovery sub) throws Exception { 83 ArrayList copy=new ArrayList (buffer); 85 if(!copy.isEmpty()){ 86 MessageEvaluationContext msgContext=context.getMessageEvaluationContext(); 87 for(Iterator iter=copy.iterator();iter.hasNext();){ 88 TimestampWrapper timestampWrapper=(TimestampWrapper)iter.next(); 89 MessageReference message=timestampWrapper.message; 90 sub.addRecoveredMessage(context,message); 91 } 92 } 93 } 94 95 public void start() throws Exception { 96 Scheduler.executePeriodically(gcTask, GC_INTERVAL); 97 } 98 99 public void stop() throws Exception { 100 Scheduler.cancel(gcTask); 101 } 102 103 public void gc() { 104 lastGCRun = System.currentTimeMillis(); 105 while (buffer.size() > 0) { 106 TimestampWrapper timestampWrapper = (TimestampWrapper) buffer.get(0); 107 if( lastGCRun > timestampWrapper.timestamp+recoverDuration ) { 108 buffer.remove(0); 110 } 111 else { 112 break; 113 } 114 } 115 } 116 117 public long getRecoverDuration() { 118 return recoverDuration; 119 } 120 121 public void setRecoverDuration(long recoverDuration) { 122 this.recoverDuration = recoverDuration; 123 } 124 125 public Message[] browse(ActiveMQDestination destination) throws Exception { 126 List result = new ArrayList (); 127 ArrayList copy = new ArrayList (buffer); 128 DestinationFilter filter=DestinationFilter.parseFilter(destination); 129 for (Iterator iter = copy.iterator(); iter.hasNext();) { 130 TimestampWrapper timestampWrapper = (TimestampWrapper) iter.next(); 131 MessageReference ref = timestampWrapper.message; 132 Message message=ref.getMessage(); 133 if (filter.matches(message.getDestination())){ 134 result.add(message); 135 } 136 } 137 return (Message[]) result.toArray(new Message[result.size()]); 138 } 139 140 } 141 | Popular Tags |