package org.jboss.cache;


import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jboss.cache.marshall.MethodCall;
import org.jboss.cache.marshall.MethodDeclarations;

import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;


/**
 * Buffers {@link MethodCall}s and hands them to the cache in batches via
 * {@code cache.callRemoteMethods(...)} with
 * {@code MethodDeclarations.replicateAllMethod}.
 * <p>
 * A batch is sent either when the number of queued calls reaches
 * {@code max_elements} (checked on every {@link #add(MethodCall)}) or when the
 * periodic timer task fires (every {@code interval} milliseconds once
 * {@link #start()} has been called with a positive interval).
 * <p>
 * The pending-call list is guarded by its own monitor; timer/task lifecycle is
 * guarded by the monitor of this instance.
 */
public class ReplicationQueue
{

   private static final Log log = LogFactory.getLog(ReplicationQueue.class);

   /**
    * Cache used to perform the remote call. Remains {@code null} when the
    * no-arg constructor is used; {@link #flush()} then fails on a non-empty
    * batch and the failure is logged.
    */
   private CacheImpl cache = null;

   /**
    * Period, in milliseconds, between timer-driven flushes. A value
    * {@code <= 0} makes {@link #start()} a no-op.
    */
   private long interval = 5000;

   /**
    * Queue size at which {@link #add(MethodCall)} triggers a flush.
    */
   private long max_elements = 500;

   /**
    * Calls waiting to be sent. Every access happens while holding this
    * list's monitor.
    */
   private final List<MethodCall> elements = new LinkedList<MethodCall>();

   /**
    * Timer running the periodic flush; {@code null} while stopped.
    */
   private Timer timer = null;

   /**
    * Task scheduled on {@link #timer}; {@code null} while stopped.
    */
   private MyTask task = null;

   /**
    * Creates a queue with the default interval (5000 ms) and threshold
    * (500 elements) and no cache attached.
    */
   public ReplicationQueue()
   {
   }

   /**
    * Creates a queue bound to the given cache. The timer is not started;
    * call {@link #start()} for periodic flushing.
    *
    * @param cache        cache used to send the batched calls
    * @param interval     period between timer-driven flushes, in milliseconds
    * @param max_elements queue size at which {@code add} triggers a flush
    */
   public ReplicationQueue(CacheImpl cache, long interval, long max_elements)
   {
      this.cache = cache;
      this.interval = interval;
      this.max_elements = max_elements;
   }

   /**
    * @return the period between timer-driven flushes, in milliseconds
    */
   public long getInterval()
   {
      return interval;
   }

   /**
    * Sets the flush period and restarts the timer so the new value takes
    * effect. Note that this starts the timer even if it was not running
    * before (provided the new interval is positive).
    * <p>
    * Synchronized so that the stop/start pair cannot interleave with a
    * concurrent {@link #start()} or {@link #stop()}, and so that the new
    * interval is visible to {@code start()}.
    *
    * @param interval new period in milliseconds; {@code <= 0} leaves the
    *                 timer stopped
    */
   public synchronized void setInterval(long interval)
   {
      this.interval = interval;
      stop();
      start();
   }

   /**
    * @return the queue size at which {@code add} triggers a flush
    */
   public long getMax_elements()
   {
      return max_elements;
   }

   /**
    * @param max_elements new queue size at which {@code add} triggers a flush
    */
   public void setMax_elements(long max_elements)
   {
      this.max_elements = max_elements;
   }

   /**
    * Starts the periodic flush if {@code interval > 0} and it is not already
    * running. The timer uses a daemon thread; the first flush runs after
    * 500 ms and subsequent ones every {@code interval} ms.
    */
   public synchronized void start()
   {
      if (interval > 0)
      {
         if (task == null)
            task = new MyTask();
         if (timer == null)
         {
            timer = new Timer(true);
            timer.schedule(task, 500, interval);
         }
      }
   }

   /**
    * Cancels the periodic flush, if running. Queued elements are left in
    * place; they are not flushed by this method.
    */
   public synchronized void stop()
   {
      if (task != null)
      {
         task.cancel();
         task = null;
      }
      if (timer != null)
      {
         timer.cancel();
         timer = null;
      }
   }


   /**
    * Appends a call to the queue and flushes the queue if it has reached
    * {@code max_elements}.
    *
    * @param job the call to enqueue
    * @throws NullPointerException if {@code job} is {@code null}
    */
   public void add(MethodCall job)
   {
      if (job == null)
         throw new NullPointerException("job is null");

      boolean thresholdReached;
      synchronized (elements)
      {
         elements.add(job);
         thresholdReached = elements.size() >= max_elements;
      }

      // Flush only after releasing the lock: flush() performs a remote call,
      // and holding the monitor across it would block every other add().
      if (thresholdReached)
         flush();
   }

   /**
    * Drains the queue and sends the drained calls in a single remote call.
    * Does nothing beyond trace logging when the queue is empty. Any failure
    * of the remote call is logged and swallowed; the drained calls are not
    * re-queued.
    */
   public void flush()
   {
      List<MethodCall> batch;
      synchronized (elements)
      {
         if (log.isTraceEnabled())
            log.trace("flush(): flushing repl queue (num elements=" + elements.size() + ")");
         batch = new ArrayList<MethodCall>(elements);
         elements.clear();
      }

      // Nothing queued (e.g. an idle timer tick, or another thread drained
      // the queue first): avoid sending an empty replication message.
      if (batch.isEmpty())
         return;

      try
      {
         // NOTE(review): argument meanings (null recipient list, the two
         // boolean flags, 5000 presumably a timeout in ms) are defined by
         // CacheImpl.callRemoteMethods - confirm against that method.
         cache.callRemoteMethods(null, MethodDeclarations.replicateAllMethod, new Object[]{batch}, false, true, 5000);
      }
      catch (Throwable t)
      {
         // Catch everything so that a failure never kills the timer thread.
         log.error("failed replicating " + batch.size() + " elements in replication queue", t);
      }
   }

   /**
    * Timer task that flushes the enclosing queue on every tick.
    */
   class MyTask extends TimerTask
   {
      @Override
      public void run()
      {
         flush();
      }
   }

}