1 4 package org.jboss.cache.loader; 5 6 import org.apache.commons.logging.Log; 7 import org.apache.commons.logging.LogFactory; 8 import org.jboss.cache.CacheException; 9 import org.jboss.cache.Fqn; 10 import org.jboss.cache.Modification; 11 import org.jboss.cache.config.CacheLoaderConfig.IndividualCacheLoaderConfig; 12 import org.jboss.cache.util.MapCopy; 13 14 import java.io.IOException ; 15 import java.util.ArrayList ; 16 import java.util.HashMap ; 17 import java.util.Iterator ; 18 import java.util.List ; 19 import java.util.Map ; 20 import java.util.concurrent.BlockingQueue ; 21 import java.util.concurrent.LinkedBlockingQueue ; 22 import java.util.concurrent.TimeUnit ; 23 import java.util.concurrent.atomic.AtomicBoolean ; 24 import java.util.concurrent.atomic.AtomicInteger ; 25 26 81 public class AsyncCacheLoader extends AbstractDelegatingCacheLoader 82 { 83 84 private static final Log log = LogFactory.getLog(AsyncCacheLoader.class); 85 86 private static AtomicInteger threadId = new AtomicInteger (0); 87 88 89 92 public static final int DEFAULT_QUEUE_SIZE = 10000; 93 94 private AsynchCacheLoaderConfig config; 95 private AsyncProcessor processor; 96 private AtomicBoolean stopped = new AtomicBoolean (true); 97 private BlockingQueue <Modification> queue = new LinkedBlockingQueue (DEFAULT_QUEUE_SIZE); 98 99 100 public AsyncCacheLoader() 101 { 102 super(null); 103 } 104 105 public AsyncCacheLoader(CacheLoader cacheLoader) 106 { 107 super(cacheLoader); 108 } 109 110 public void setConfig(IndividualCacheLoaderConfig base) 111 { 112 if (base instanceof AsynchCacheLoaderConfig) 113 { 114 config = (AsynchCacheLoaderConfig) base; 115 } 116 else 117 { 118 config = new AsynchCacheLoaderConfig(base); 119 } 120 121 if (config.getQueueSize() > 0) 122 { 123 queue = new LinkedBlockingQueue (config.getQueueSize()); 124 } 125 126 super.setConfig(base); 127 } 128 129 public Map get(Fqn name) throws Exception 130 { 131 try 132 { 133 return super.get(name); 134 } 135 catch (IOException e) 136 { 137 log.trace(e); 139 return new HashMap (); } 141 } 142 143 Object get(Fqn name, Object key) throws Exception 144 { 145 if (config.getReturnOld()) 146 { 147 try 148 { 149 Map map = super.get(name); 150 if (map != null) 151 { 152 return map.get(key); 153 } 154 } 155 catch (IOException e) 156 { 157 log.trace(e); 159 } 160 } 161 return null; 162 } 163 164 public Object put(Fqn name, Object key, Object value) throws Exception 165 { 166 if (config.getUseAsyncPut()) 167 { 168 Object oldValue = get(name, key); 169 Modification mod = new Modification(Modification.ModificationType.PUT_KEY_VALUE, name, key, value); 170 enqueue(mod); 171 return oldValue; 172 } 173 else 174 { 175 return super.put(name, key, value); 176 } 177 } 178 179 public void put(Fqn name, Map attributes) throws Exception 180 { 181 if (config.getUseAsyncPut()) 182 { 183 Map attrs = (attributes == null ? null : new MapCopy(attributes)); 185 Modification mod = new Modification(Modification.ModificationType.PUT_DATA, name, attrs); 186 enqueue(mod); 187 } 188 else 189 { 190 super.put(name, attributes); } 192 } 193 194 public void put(List <Modification> modifications) throws Exception 195 { 196 if (config.getUseAsyncPut()) 197 { 198 Iterator i = modifications.iterator(); 199 while (i.hasNext()) 200 { 201 enqueue((Modification) i.next()); 202 } 203 } 204 else 205 { 206 super.put(modifications); 207 } 208 } 209 210 public Object remove(Fqn name, Object key) throws Exception 211 { 212 Object oldValue = get(name, key); 213 Modification mod = new Modification(Modification.ModificationType.REMOVE_KEY_VALUE, name, key); 214 enqueue(mod); 215 return oldValue; 216 } 217 218 public void remove(Fqn name) throws Exception 219 { 220 Modification mod = new Modification(Modification.ModificationType.REMOVE_NODE, name); 221 enqueue(mod); 222 } 223 224 public void removeData(Fqn name) throws Exception 225 { 226 Modification mod = new Modification(Modification.ModificationType.REMOVE_DATA, name); 227 enqueue(mod); 228 } 229 230 public void start() throws Exception 231 { 232 if (log.isInfoEnabled()) log.info("Async cache loader starting: " + this); 233 stopped.set(false); 234 super.start(); 235 processor = new AsyncProcessor(); 236 processor.start(); 237 } 238 239 public void stop() 240 { 241 stopped.set(true); 242 if (processor != null) 243 { 244 processor.stop(); 245 } 246 super.stop(); 247 } 248 249 private void enqueue(Modification mod) 250 throws CacheException, InterruptedException 251 { 252 if (stopped.get()) 253 { 254 throw new CacheException("AsyncCacheLoader stopped; no longer accepting more entries."); 255 } 256 queue.put(mod); 257 } 258 259 264 private class AsyncProcessor implements Runnable 265 { 266 private Thread t; 267 268 private final List mods = new ArrayList (config.getBatchSize()); 270 271 public void start() 272 { 273 if (t == null || !t.isAlive()) 274 { 275 t = new Thread (this, "AsyncCacheLoader-" + threadId.getAndIncrement()); 276 t.start(); 278 } 279 } 280 281 public void stop() 282 { 283 if (t != null) 284 { 285 t.interrupt(); 286 try 287 { 288 t.join(); 289 } 290 catch (InterruptedException e) 291 { 292 } 293 } 294 if (!queue.isEmpty()) 295 { 296 log.warn("Async queue not yet empty, possibly interrupted"); 297 } 298 } 299 300 public void run() 301 { 302 while (!Thread.interrupted()) 303 { 304 try 305 { 306 run0(); 307 } 308 catch (InterruptedException e) 309 { 310 break; 311 } 312 } 313 314 try 315 { 316 if (log.isTraceEnabled()) log.trace("process remaining batch " + mods.size()); 317 put(mods); 318 if (log.isTraceEnabled()) log.trace("process remaining queued " + queue.size()); 319 while (!queue.isEmpty()) 320 { 321 run0(); 322 } 323 } 324 catch (InterruptedException e) 325 { 326 log.trace("remaining interrupted"); 327 } 328 } 329 330 private void run0() throws InterruptedException 331 { 332 log.trace("run0"); 333 Object o = queue.take(); 334 addTaken(o); 335 while (mods.size() < config.getBatchSize()) 336 { 337 o = queue.poll(config.getPollWait(), TimeUnit.MILLISECONDS); 338 if (o == null) 339 { 340 break; 341 } 342 addTaken(o); 343 } 344 if (log.isTraceEnabled()) 345 { 346 log.trace("put " + mods.size()); 347 } 348 put(mods); 349 mods.clear(); 350 } 351 352 private void addTaken(Object o) 353 { 354 if (o instanceof List ) 355 { 356 mods.addAll((List ) o); 357 } 358 else 359 { 360 Modification mod = (Modification) o; 361 mods.add(mod); 362 } 363 } 364 365 private void put(List <Modification> mods) 366 { 367 try 368 { 369 AsyncCacheLoader.super.put(mods); 370 } 371 catch (Exception e) 372 { 373 if (log.isWarnEnabled()) log.warn("Failed to process async modifications: " + e); 374 log.debug("Exception: ", e); 375 } 376 } 377 378 public String toString() 379 { 380 return "TQ t=" + t; 381 } 382 383 } 384 385 public String toString() 386 { 387 return super.toString() + 388 " delegate=[" + super.getCacheLoader() + "]" + 389 " processor=" + processor + 390 " stopped=" + stopped + 391 " batchSize=" + config.getBatchSize() + 392 " pollWait=" + config.getPollWait() + 393 " returnOld=" + config.getReturnOld() + 394 " asyncPut=" + config.getUseAsyncPut() + 395 " queue.remainingCapacity()=" + queue.remainingCapacity() + 396 " queue.peek()=" + queue.peek(); 397 } 398 399 } 400 | Popular Tags |