1 7 package org.jboss.cache.notifications; 8 9 import org.apache.commons.logging.Log; 10 import org.apache.commons.logging.LogFactory; 11 import org.jboss.cache.CacheImpl; 12 import org.jboss.cache.CacheListener; 13 import org.jboss.cache.CacheSPI; 14 import org.jboss.cache.Fqn; 15 import org.jboss.cache.InvocationContext; 16 import org.jboss.cache.marshall.MethodCall; 17 import org.jboss.cache.util.MapCopy; 18 import org.jgroups.View; 19 20 import java.lang.reflect.Method ; 21 import java.util.Collections ; 22 import java.util.List ; 23 import java.util.Map ; 24 import java.util.Set ; 25 import java.util.concurrent.CopyOnWriteArraySet ; 26 27 32 public class Notifier 33 { 34 35 private boolean hasListeners = false; 40 41 private CacheListener evictionPolicyListener; 45 46 private final Set <CacheListener> listeners = new CopyOnWriteArraySet <CacheListener>(); 47 private CacheImpl cache; 48 private InvocationContext tempCtx; 49 private static final Log log = LogFactory.getLog(Notifier.class); 50 51 private static Method nodeCreated, nodeModified, nodeRemoved, nodeVisited, nodeEvicted, nodeLoaded, nodePassivated, nodeActivated, nodeMoved, cacheStarted, cacheStopped, viewChange; 53 54 static 55 { 56 try 57 { 58 nodeCreated = CacheListener.class.getMethod("nodeCreated", Fqn.class, boolean.class, boolean.class); 59 nodeModified = CacheListener.class.getMethod("nodeModified", Fqn.class, boolean.class, boolean.class, CacheListener.ModificationType.class, Map .class); 60 nodeRemoved = CacheListener.class.getMethod("nodeRemoved", Fqn.class, boolean.class, boolean.class, Map .class); 61 nodeVisited = CacheListener.class.getMethod("nodeVisited", Fqn.class, boolean.class); 62 nodeEvicted = CacheListener.class.getMethod("nodeEvicted", Fqn.class, boolean.class, boolean.class); 63 nodeLoaded = CacheListener.class.getMethod("nodeLoaded", Fqn.class, boolean.class, Map .class); 64 nodePassivated = CacheListener.class.getMethod("nodePassivated", Fqn.class, boolean.class); 65 nodeActivated = CacheListener.class.getMethod("nodeActivated", Fqn.class, boolean.class); 66 nodeMoved = CacheListener.class.getMethod("nodeMoved", Fqn.class, Fqn.class, boolean.class, boolean.class); 67 cacheStarted = CacheListener.class.getMethod("cacheStarted", CacheSPI.class); 68 cacheStopped = CacheListener.class.getMethod("cacheStopped", CacheSPI.class); 69 viewChange = CacheListener.class.getMethod("viewChange", View.class); 70 } 71 catch (Exception e) 72 { 73 log.error("Unable to initialise Notifier - unable to get Methods on CacheListener.class", e); 74 } 75 } 76 77 78 public Notifier(CacheImpl cache) 79 { 80 this.cache = cache; 81 } 82 83 88 public void setEvictionPolicyListener(CacheListener l) 89 { 90 evictionPolicyListener = l; 91 } 92 93 98 public void addCacheListener(CacheListener l) 99 { 100 synchronized (listeners) 101 { 102 listeners.add(l); 103 hasListeners = true; 104 } 105 } 106 107 112 public void removeCacheListener(CacheListener l) 113 { 114 synchronized (listeners) 115 { 116 listeners.remove(l); 117 hasListeners = !listeners.isEmpty(); 118 } 119 } 120 121 124 public void removeAllCacheListeners() 125 { 126 listeners.clear(); 127 hasListeners = false; 128 } 129 130 133 public Set <CacheListener> getCacheListeners() 134 { 135 return Collections.unmodifiableSet(listeners); 136 } 137 138 140 147 public synchronized void notifyNodeCreated(Fqn fqn, boolean pre, boolean sendImmediately) 148 { 149 150 boolean originLocal = cache.getInvocationContext().isOriginLocal(); 151 if (sendImmediately) 152 { 153 resetInvocationContext(); 154 if (evictionPolicyListener != null) 155 { 156 evictionPolicyListener.nodeCreated(fqn, pre, originLocal); 157 } 158 if (hasListeners) 159 { 160 for (CacheListener listener : listeners) 161 { 162 listener.nodeCreated(fqn, pre, originLocal); 163 } 164 } 165 restoreInvocationContext(); 166 } 167 else 168 { 169 MethodCall call = new MethodCall(nodeCreated, new Object []{fqn, pre, originLocal}); 170 cache.getInvocationContext().addCacheListenerEvent(call); 171 } 172 } 173 174 183 public synchronized void notifyNodeModified(Fqn fqn, boolean pre, CacheListener.ModificationType modificationType, Map <Object , Object > data, boolean sendImmediately) 184 { 185 boolean originLocal = cache.getInvocationContext().isOriginLocal(); 186 Map dataCopy = copy(data); 187 if (sendImmediately) 188 { 189 resetInvocationContext(); 190 if (evictionPolicyListener != null) 191 { 192 evictionPolicyListener.nodeModified(fqn, pre, originLocal, modificationType, dataCopy); 193 } 194 if (hasListeners) 195 { 196 for (CacheListener listener : listeners) 197 { 198 listener.nodeModified(fqn, pre, originLocal, modificationType, dataCopy); 199 } 200 } 201 restoreInvocationContext(); 202 } 203 else 204 { 205 MethodCall call = new MethodCall(nodeModified, new Object []{fqn, pre, originLocal, modificationType, dataCopy}); 206 cache.getInvocationContext().addCacheListenerEvent(call); 207 } 208 } 209 210 218 public synchronized void notifyNodeRemoved(Fqn fqn, boolean pre, Map <Object , Object > data, boolean sendImmediately) 219 { 220 boolean originLocal = cache.getInvocationContext().isOriginLocal(); 221 Map dataCopy = copy(data); 222 if (sendImmediately) 223 { 224 resetInvocationContext(); 225 if (evictionPolicyListener != null) 226 { 227 evictionPolicyListener.nodeRemoved(fqn, pre, originLocal, dataCopy); 228 } 229 if (hasListeners) 230 { 231 for (CacheListener listener : listeners) 232 { 233 listener.nodeRemoved(fqn, pre, originLocal, dataCopy); 234 } 235 } 236 restoreInvocationContext(); 237 } 238 else 239 { 240 MethodCall call = new MethodCall(nodeRemoved, new Object []{fqn, pre, originLocal, dataCopy}); 241 cache.getInvocationContext().addCacheListenerEvent(call); 242 } 243 244 } 245 246 253 public synchronized void notifyNodeVisited(Fqn fqn, boolean pre, boolean sendImmediately) 254 { 255 if (sendImmediately) 256 { 257 resetInvocationContext(); 258 if (evictionPolicyListener != null) 259 { 260 evictionPolicyListener.nodeVisited(fqn, pre); 261 } 262 if (hasListeners) 263 { 264 for (CacheListener listener : listeners) 265 { 266 listener.nodeVisited(fqn, pre); 267 } 268 } 269 restoreInvocationContext(); 270 } 271 else 272 { 273 MethodCall call = new MethodCall(nodeVisited, new Object []{fqn, pre}); 274 cache.getInvocationContext().addCacheListenerEvent(call); 275 } 276 277 } 278 279 public synchronized void notifyNodeMoved(Fqn originalFqn, Fqn newFqn, boolean pre, boolean sendImmediately) 280 { 281 boolean originLocal = cache.getInvocationContext().isOriginLocal(); 282 if (sendImmediately) 283 { 284 resetInvocationContext(); 285 if (evictionPolicyListener != null) 286 { 287 evictionPolicyListener.nodeMoved(originalFqn, newFqn, pre, originLocal); 288 } 289 if (hasListeners) 290 { 291 for (CacheListener listener : listeners) 292 { 293 listener.nodeMoved(originalFqn, newFqn, pre, originLocal); 294 } 295 } 296 restoreInvocationContext(); 297 } 298 else 299 { 300 MethodCall call = new MethodCall(nodeMoved, new Object []{originalFqn, newFqn, pre, originLocal}); 301 cache.getInvocationContext().addCacheListenerEvent(call); 302 } 303 } 304 305 306 313 public synchronized void notifyNodeEvicted(Fqn fqn, boolean pre, boolean sendImmediately) 314 { 315 boolean originLocal = cache.getInvocationContext().isOriginLocal(); 316 if (sendImmediately) 317 { 318 resetInvocationContext(); 319 if (evictionPolicyListener != null) 320 { 321 evictionPolicyListener.nodeEvicted(fqn, pre, originLocal); 322 } 323 if (hasListeners) 324 { 325 for (CacheListener listener : listeners) 326 { 327 listener.nodeEvicted(fqn, pre, originLocal); 328 } 329 } 330 restoreInvocationContext(); 331 } 332 else 333 { 334 MethodCall call = new MethodCall(nodeEvicted, new Object []{fqn, pre, originLocal}); 335 cache.getInvocationContext().addCacheListenerEvent(call); 336 } 337 } 338 339 347 public synchronized void notifyNodeLoaded(Fqn fqn, boolean pre, Map <Object , Object > data, boolean sendImmediately) 348 { 349 Map dataCopy = copy(data); 350 if (sendImmediately) 351 { 352 resetInvocationContext(); 353 if (evictionPolicyListener != null) 354 { 355 evictionPolicyListener.nodeLoaded(fqn, pre, dataCopy); 356 } 357 if (hasListeners) 358 { 359 for (CacheListener listener : listeners) 360 { 361 listener.nodeLoaded(fqn, pre, dataCopy); 362 } 363 } 364 restoreInvocationContext(); 365 } 366 else 367 { 368 MethodCall call = new MethodCall(nodeLoaded, new Object []{fqn, pre, dataCopy}); 369 cache.getInvocationContext().addCacheListenerEvent(call); 370 } 371 } 372 373 380 public synchronized void notifyNodeActivated(Fqn fqn, boolean pre, boolean sendImmediately) 381 { 382 if (sendImmediately) 383 { 384 resetInvocationContext(); 385 if (evictionPolicyListener != null) 386 { 387 evictionPolicyListener.nodeActivated(fqn, pre); 388 } 389 if (hasListeners) 390 { 391 for (CacheListener listener : listeners) 392 { 393 listener.nodeActivated(fqn, pre); 394 } 395 } 396 restoreInvocationContext(); 397 } 398 else 399 { 400 MethodCall call = new MethodCall(nodeActivated, new Object []{fqn, pre}); 401 cache.getInvocationContext().addCacheListenerEvent(call); 402 } 403 } 404 405 412 public synchronized void notifyNodePassivated(Fqn fqn, boolean pre, boolean sendImmediately) 413 { 414 if (sendImmediately) 415 { 416 resetInvocationContext(); 417 if (evictionPolicyListener != null) 418 { 419 evictionPolicyListener.nodePassivated(fqn, pre); 420 } 421 if (hasListeners) 422 { 423 for (CacheListener listener : listeners) 424 { 425 listener.nodePassivated(fqn, pre); 426 } 427 } 428 restoreInvocationContext(); 429 } 430 else 431 { 432 MethodCall call = new MethodCall(nodePassivated, new Object []{fqn, pre}); 433 cache.getInvocationContext().addCacheListenerEvent(call); 434 } 435 } 436 437 443 public synchronized void notifyCacheStarted(CacheSPI cache, boolean sendImmediately) 444 { 445 if (sendImmediately) 446 { 447 resetInvocationContext(); 448 if (evictionPolicyListener != null) 449 { 450 evictionPolicyListener.cacheStarted(cache); 451 } 452 if (hasListeners) 453 { 454 for (CacheListener listener : listeners) 455 { 456 listener.cacheStarted(cache); 457 } 458 } 459 restoreInvocationContext(); 460 } 461 else 462 { 463 MethodCall call = new MethodCall(cacheStarted, new Object []{cache}); 464 cache.getInvocationContext().addCacheListenerEvent(call); 465 } 466 } 467 468 474 public synchronized void notifyCacheStopped(CacheSPI cache, boolean sendImmediately) 475 { 476 if (sendImmediately) 477 { 478 resetInvocationContext(); 479 if (evictionPolicyListener != null) 480 { 481 evictionPolicyListener.cacheStopped(cache); 482 } 483 if (hasListeners) 484 { 485 for (CacheListener listener : listeners) 486 { 487 listener.cacheStopped(cache); 488 } 489 } 490 restoreInvocationContext(); 491 } 492 else 493 { 494 MethodCall call = new MethodCall(cacheStopped, new Object []{cache}); 495 cache.getInvocationContext().addCacheListenerEvent(call); 496 } 497 } 498 499 505 public synchronized void notifyViewChange(View new_view, boolean sendImmediately) 506 { 507 if (sendImmediately) 508 { 509 resetInvocationContext(); 510 if (evictionPolicyListener != null) 511 { 512 evictionPolicyListener.viewChange(new_view); 513 } 514 if (hasListeners) 515 { 516 for (CacheListener listener : listeners) 517 { 518 listener.viewChange(new_view); 519 } 520 } 521 restoreInvocationContext(); 522 } 523 else 524 { 525 MethodCall call = new MethodCall(viewChange, new Object []{new_view}); 526 cache.getInvocationContext().addCacheListenerEvent(call); 527 } 528 } 529 530 private Map copy(Map <Object , Object > data) 531 { 532 if (safe(data)) return data; 533 return new MapCopy<Object , Object >(data); 534 } 535 536 private void restoreInvocationContext() 537 { 538 cache.getInvocationContext().setState(tempCtx); 539 } 540 541 private void resetInvocationContext() 542 { 543 try 544 { 545 tempCtx = cache.getInvocationContext().clone(); 546 } 547 catch (CloneNotSupportedException e) 548 { 549 log.warn("Unable to clone Invocation Context " + cache.getInvocationContext(), e); 550 } 551 cache.getInvocationContext().reset(); 552 } 553 554 565 private boolean safe(Map map) 566 { 567 return map == null || map instanceof MapCopy || map.getClass().equals(Collections.emptyMap().getClass()); 568 } 569 570 575 public void invokeQueuedNotifications(List <MethodCall> queue) 576 { 577 resetInvocationContext(); 578 for (MethodCall c : queue) 579 { 580 if (evictionPolicyListener != null) 581 { 582 try 583 { 584 c.invoke(evictionPolicyListener); 585 } 586 catch (Throwable throwable) 587 { 588 log.error("Unable to deliver queued notification " + c + " to eviction policy listener", throwable); 589 } 590 } 591 if (hasListeners) 592 { 593 for (CacheListener listener : listeners) 594 { 595 try 596 { 597 c.invoke(listener); 598 } 599 catch (Throwable throwable) 600 { 601 log.error("Unable to deliver queued notification " + c + " to listener " + listener, throwable); 602 } 603 } 604 } 605 } 606 restoreInvocationContext(); 607 } 608 609 612 public void invokeQueuedNotifications() 613 { 614 invokeQueuedNotifications(cache.getInvocationContext().getCacheListenerEvents()); 615 } 616 } 617 | Popular Tags |