1 7 package org.jboss.cache.eviction; 8 9 import org.apache.commons.logging.Log; 10 import org.apache.commons.logging.LogFactory; 11 import org.jboss.cache.Fqn; 12 import org.jboss.cache.Region; 13 import org.jboss.cache.lock.TimeoutException; 14 15 import java.util.concurrent.BlockingQueue ; 16 import java.util.concurrent.LinkedBlockingQueue ; 17 import java.util.concurrent.TimeUnit ; 18 19 28 public abstract class BaseEvictionAlgorithm implements EvictionAlgorithm 29 { 30 private static final Log log = LogFactory.getLog(BaseEvictionAlgorithm.class); 31 32 35 protected Region region; 36 37 40 protected BlockingQueue recycleQueue; 41 42 45 protected EvictionQueue evictionQueue; 46 47 55 protected abstract EvictionQueue setupEvictionQueue(Region region) throws EvictionException; 56 57 63 protected abstract boolean shouldEvictNode(NodeEntry ne); 64 65 protected BaseEvictionAlgorithm() 66 { 67 recycleQueue = new LinkedBlockingQueue (500000); 68 } 69 70 protected void initialize(Region region) throws EvictionException 71 { 72 if (region == null) 73 throw new IllegalArgumentException ("region"); 74 this.region = region; 75 evictionQueue = setupEvictionQueue(region); 76 log.debug("initialized: " + this); 77 } 78 79 91 public void process(Region region) throws EvictionException 92 { 93 if (this.region == null) 94 { 95 this.initialize(region); 96 } 97 98 if (log.isTraceEnabled()) 99 { 100 log.trace("process(): region: " + region.getFqn()); 101 } 102 103 this.processQueues(region); 104 this.emptyRecycleQueue(); 105 this.prune(); 106 } 107 108 public void resetEvictionQueue(Region region) 109 { 110 } 111 112 118 public EvictionQueue getEvictionQueue() 119 { 120 return this.evictionQueue; 121 } 122 123 133 protected void processQueues(Region region) throws EvictionException 134 { 135 EvictedEventNode node; 136 int count = 0; 137 while ((node = region.takeLastEventNode()) != null) 138 { 139 Fqn fqn = node.getFqn(); 140 141 count++; 142 switch (node.getEventType()) 143 { 144 case ADD_NODE_EVENT: 145 this.processAddedNodes(fqn, 146 node.getElementDifference(), 147 node.isResetElementCount()); 148 break; 149 case REMOVE_NODE_EVENT: 150 this.processRemovedNodes(fqn); 151 break; 152 case VISIT_NODE_EVENT: 153 this.processVisitedNodes(fqn); 154 break; 155 case ADD_ELEMENT_EVENT: 156 this.processAddedElement(fqn); 157 break; 158 case REMOVE_ELEMENT_EVENT: 159 this.processRemovedElement(fqn); 160 break; 161 case MARK_IN_USE_EVENT: 162 this.processMarkInUseNodes(fqn, node.getInUseTimeout()); 163 break; 164 case UNMARK_USE_EVENT: 165 this.processUnmarkInUseNodes(fqn); 166 break; 167 default: 168 throw new RuntimeException ("Illegal Eviction Event type " + node.getEventType()); 169 } 170 } 171 172 if (log.isTraceEnabled()) 173 { 174 log.trace("processed " + count + " node events in region: " + region.getFqn()); 175 } 176 177 } 178 179 protected void evict(NodeEntry ne) 180 { 181 if (ne != null) 183 { 184 evictionQueue.removeNodeEntry(ne); 185 if (!this.evictCacheNode(ne.getFqn())) 186 { 187 try 188 { 189 recycleQueue.put(ne.getFqn()); 190 } 191 catch (InterruptedException e) 192 { 193 log.debug("InterruptedException", e); 194 } 195 } 196 } 197 } 198 199 205 protected boolean evictCacheNode(Fqn fqn) 206 { 207 if (log.isTraceEnabled()) 208 { 209 log.trace("Attempting to evict cache node with fqn of " + fqn); 210 } 211 EvictionPolicy policy = region.getEvictionPolicy(); 212 214 try 215 { 216 policy.evict(fqn); 217 } 218 catch (Exception e) 219 { 220 if (e instanceof TimeoutException) 221 { 222 log.warn("eviction of " + fqn + " timed out. Will retry later."); 223 return false; 224 } 225 e.printStackTrace(); 226 return false; 227 } 228 229 if (log.isTraceEnabled()) 230 { 231 log.trace("Eviction of cache node with fqn of " + fqn + " successful"); 232 } 233 234 return true; 235 } 236 237 protected void processMarkInUseNodes(Fqn fqn, long inUseTimeout) throws EvictionException 238 { 239 if (log.isTraceEnabled()) 240 { 241 log.trace("Marking node " + fqn + " as in use with a usage timeout of " + inUseTimeout); 242 } 243 244 NodeEntry ne = evictionQueue.getNodeEntry(fqn); 245 if (ne != null) 246 { 247 ne.setCurrentlyInUse(true, inUseTimeout); 248 } 249 } 250 251 protected void processUnmarkInUseNodes(Fqn fqn) throws EvictionException 252 { 253 if (log.isTraceEnabled()) 254 { 255 log.trace("Unmarking node " + fqn + " as in use"); 256 } 257 258 NodeEntry ne = evictionQueue.getNodeEntry(fqn); 259 if (ne != null) 260 { 261 ne.setCurrentlyInUse(false, 0); 262 } 263 } 264 265 protected void processAddedNodes(Fqn fqn, int numAddedElements, boolean resetElementCount) throws EvictionException 266 { 267 if (log.isTraceEnabled()) 268 { 269 log.trace("Adding node " + fqn + " with " + numAddedElements + " elements to eviction queue"); 270 } 271 272 NodeEntry ne = evictionQueue.getNodeEntry(fqn); 273 if (ne != null) 274 { 275 ne.setModifiedTimeStamp(System.currentTimeMillis()); 276 ne.setNumberOfNodeVisits(ne.getNumberOfNodeVisits() + 1); 277 if (resetElementCount) 278 { 279 ne.setNumberOfElements(numAddedElements); 280 } 281 else 282 { 283 ne.setNumberOfElements(ne.getNumberOfElements() + numAddedElements); 284 } 285 return; 286 } 287 288 long stamp = System.currentTimeMillis(); 289 ne = new NodeEntry(fqn); 290 ne.setModifiedTimeStamp(stamp); 291 ne.setNumberOfNodeVisits(1); 292 ne.setNumberOfElements(numAddedElements); 293 if (evictionQueue.containsNodeEntry(ne)) 295 { 296 if (log.isTraceEnabled()) 297 { 298 log.trace("Queue already contains " + ne.getFqn() + " processing it as visited"); 299 } 300 this.processVisitedNodes(ne.getFqn()); 301 return; 302 } 303 304 evictionQueue.addNodeEntry(ne); 305 306 if (log.isTraceEnabled()) 307 { 308 log.trace(ne.getFqn() + " added successfully to eviction queue"); 309 } 310 } 311 312 328 protected void processRemovedNodes(Fqn fqn) throws EvictionException 329 { 330 if (log.isTraceEnabled()) 331 { 332 log.trace("Removing node " + fqn + " from eviction queue and attempting eviction"); 333 } 334 335 NodeEntry ne = evictionQueue.getNodeEntry(fqn); 336 if (ne != null) 337 { 338 evictionQueue.removeNodeEntry(ne); 339 } 340 else 341 { 342 if (log.isDebugEnabled()) 343 { 344 log.debug("processRemoveNodes(): Can't find node associated with fqn: " + fqn 345 + "Could have been evicted earlier. Will just continue."); 346 } 347 return; 348 } 349 350 if (log.isTraceEnabled()) 351 { 352 log.trace(fqn + " removed from eviction queue"); 353 } 354 } 355 356 369 protected void processVisitedNodes(Fqn fqn) throws EvictionException 370 { 371 NodeEntry ne = evictionQueue.getNodeEntry(fqn); 372 if (ne == null) 373 { 374 if (log.isDebugEnabled()) 375 { 376 log.debug("Visiting node that was not added to eviction queues. Assuming that it has 1 element."); 377 } 378 this.processAddedNodes(fqn, 1, false); 379 return; 380 } 381 ne.setNumberOfNodeVisits(ne.getNumberOfNodeVisits() + 1); 385 ne.setModifiedTimeStamp(System.currentTimeMillis()); 386 } 387 388 protected void processRemovedElement(Fqn fqn) throws EvictionException 389 { 390 NodeEntry ne = evictionQueue.getNodeEntry(fqn); 391 392 if (ne == null) 393 { 394 if (log.isDebugEnabled()) 395 { 396 log.debug("Removing element from " + fqn + " but eviction queue does not contain this node. " + 397 "Ignoring removeElement event."); 398 } 399 return; 400 } 401 402 ne.setNumberOfElements(ne.getNumberOfElements() - 1); 403 ne.setNumberOfNodeVisits(ne.getNumberOfNodeVisits() + 1); 405 ne.setModifiedTimeStamp(System.currentTimeMillis()); 406 } 407 408 protected void processAddedElement(Fqn fqn) throws EvictionException 409 { 410 NodeEntry ne = evictionQueue.getNodeEntry(fqn); 411 if (ne == null) 412 { 413 if (log.isDebugEnabled()) 414 { 415 log.debug("Adding element " + fqn + " for a node that doesn't exist yet. Process as an add."); 416 } 417 this.processAddedNodes(fqn, 1, false); 418 return; 419 } 420 421 ne.setNumberOfElements(ne.getNumberOfElements() + 1); 422 423 ne.setNumberOfNodeVisits(ne.getNumberOfNodeVisits() + 1); 425 ne.setModifiedTimeStamp(System.currentTimeMillis()); 426 } 427 428 429 436 protected void emptyRecycleQueue() throws EvictionException 437 { 438 while (true) 439 { 440 Fqn fqn; 441 442 try 443 { 444 fqn = (Fqn) recycleQueue.poll(0, TimeUnit.SECONDS); 446 } 447 catch (InterruptedException e) 448 { 449 e.printStackTrace(); 450 break; 451 } 452 453 if (fqn == null) 454 { 455 if (log.isTraceEnabled()) 456 { 457 log.trace("Recycle queue is empty"); 458 } 459 break; 460 } 461 462 if (log.isTraceEnabled()) 463 { 464 log.trace("emptying recycle bin. Evict node " + fqn); 465 } 466 467 if (!evictCacheNode(fqn)) 469 { 470 try 471 { 472 recycleQueue.put(fqn); 473 } 474 catch (InterruptedException e) 475 { 476 e.printStackTrace(); 477 } 478 break; 479 } 480 } 481 } 482 483 protected boolean isNodeInUseAndNotTimedOut(NodeEntry ne) 484 { 485 if (ne.isCurrentlyInUse()) 486 { 487 if (ne.getInUseTimeoutTimestamp() == 0) 488 { 489 return true; 490 } 491 492 if (System.currentTimeMillis() < ne.getInUseTimeoutTimestamp()) 493 { 494 return true; 495 } 496 } 497 return false; 498 } 499 500 501 protected void prune() throws EvictionException 502 { 503 NodeEntry entry; 504 while ((entry = evictionQueue.getFirstNodeEntry()) != null) 505 { 506 if (this.shouldEvictNode(entry)) 507 { 508 this.evict(entry); 509 } 510 else 511 { 512 break; 513 } 514 } 515 } 516 517 520 public String toString() 521 { 522 return super.toString() + 523 " reqion=" + region.getFqn() + 524 " recycle=" + recycleQueue.size() + 525 " evict=" + evictionQueue.getNumberOfNodes(); 526 } 527 528 } 529 | Popular Tags |