KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jboss > cache > notifications > Notifier


1 /*
2  * JBoss, Home of Professional Open Source
3  *
4  * Distributable under LGPL license.
5  * See terms of license at gnu.org.
6  */

7 package org.jboss.cache.notifications;
8
9 import org.apache.commons.logging.Log;
10 import org.apache.commons.logging.LogFactory;
11 import org.jboss.cache.CacheImpl;
12 import org.jboss.cache.CacheListener;
13 import org.jboss.cache.CacheSPI;
14 import org.jboss.cache.Fqn;
15 import org.jboss.cache.InvocationContext;
16 import org.jboss.cache.marshall.MethodCall;
17 import org.jboss.cache.util.MapCopy;
18 import org.jgroups.View;
19
20 import java.lang.reflect.Method JavaDoc;
21 import java.util.Collections JavaDoc;
22 import java.util.List JavaDoc;
23 import java.util.Map JavaDoc;
24 import java.util.Set JavaDoc;
25 import java.util.concurrent.CopyOnWriteArraySet JavaDoc;
26
27 /**
28  * Helper class that handles all notifications to registered listeners.
29  *
30  * @author <a HREF="mailto:manik@jboss.org">Manik Surtani (manik@jboss.org)</a>
31  */

32 public class Notifier
33 {
34
35    // calling iterator on a Concurrent Set is expensive due to synchronization - same problem
36
// with calling isEmpty so hasListeners is an optimization to indicate whether or not listeners
37
// is empty
38
//
39
private boolean hasListeners = false;
40
41    // store this seperately from other listeners to avoid concurrency penalty of
42
// iterating through Concurrent Set - eviction listener is always there (or almost always)
43
// and there are less frequently other listeners so optimization is justified
44
private CacheListener evictionPolicyListener;
45
46    private final Set JavaDoc<CacheListener> listeners = new CopyOnWriteArraySet JavaDoc<CacheListener>();
47    private CacheImpl cache;
48    private InvocationContext tempCtx;
49    private static final Log log = LogFactory.getLog(Notifier.class);
50
51    // --- the java.lang.reflect.Methods of CacheListener
52
private static Method JavaDoc nodeCreated, nodeModified, nodeRemoved, nodeVisited, nodeEvicted, nodeLoaded, nodePassivated, nodeActivated, nodeMoved, cacheStarted, cacheStopped, viewChange;
53
54    static
55    {
56       try
57       {
58          nodeCreated = CacheListener.class.getMethod("nodeCreated", Fqn.class, boolean.class, boolean.class);
59          nodeModified = CacheListener.class.getMethod("nodeModified", Fqn.class, boolean.class, boolean.class, CacheListener.ModificationType.class, Map JavaDoc.class);
60          nodeRemoved = CacheListener.class.getMethod("nodeRemoved", Fqn.class, boolean.class, boolean.class, Map JavaDoc.class);
61          nodeVisited = CacheListener.class.getMethod("nodeVisited", Fqn.class, boolean.class);
62          nodeEvicted = CacheListener.class.getMethod("nodeEvicted", Fqn.class, boolean.class, boolean.class);
63          nodeLoaded = CacheListener.class.getMethod("nodeLoaded", Fqn.class, boolean.class, Map JavaDoc.class);
64          nodePassivated = CacheListener.class.getMethod("nodePassivated", Fqn.class, boolean.class);
65          nodeActivated = CacheListener.class.getMethod("nodeActivated", Fqn.class, boolean.class);
66          nodeMoved = CacheListener.class.getMethod("nodeMoved", Fqn.class, Fqn.class, boolean.class, boolean.class);
67          cacheStarted = CacheListener.class.getMethod("cacheStarted", CacheSPI.class);
68          cacheStopped = CacheListener.class.getMethod("cacheStopped", CacheSPI.class);
69          viewChange = CacheListener.class.getMethod("viewChange", View.class);
70       }
71       catch (Exception JavaDoc e)
72       {
73          log.error("Unable to initialise Notifier - unable to get Methods on CacheListener.class", e);
74       }
75    }
76
77
78    public Notifier(CacheImpl cache)
79    {
80       this.cache = cache;
81    }
82
83    /**
84     * Sets an eviction policy listener.
85     *
86     * @param l
87     */

88    public void setEvictionPolicyListener(CacheListener l)
89    {
90       evictionPolicyListener = l;
91    }
92
93    /**
94     * Adds a cache listener to the list of cache listeners registered.
95     *
96     * @param l
97     */

98    public void addCacheListener(CacheListener l)
99    {
100       synchronized (listeners)
101       {
102          listeners.add(l);
103          hasListeners = true;
104       }
105    }
106
107    /**
108     * Removes a cache listener from the list of cache listeners registered.
109     *
110     * @param l
111     */

112    public void removeCacheListener(CacheListener l)
113    {
114       synchronized (listeners)
115       {
116          listeners.remove(l);
117          hasListeners = !listeners.isEmpty();
118       }
119    }
120
121    /**
122     * Removes all listeners from the notifier, including the evictionPolicyListener.
123     */

124    public void removeAllCacheListeners()
125    {
126       listeners.clear();
127       hasListeners = false;
128    }
129
130    /**
131     * Retrieves an (unmodifiable) set of cache listeners registered.
132     */

133    public Set JavaDoc<CacheListener> getCacheListeners()
134    {
135       return Collections.unmodifiableSet(listeners);
136    }
137
138    // TODO: MANIK: Revisit these sync statements on these methods. Needed to make sure all notifications from OptimisticCreateIfNotExistsInterceptor are delivered properly, but a very bad idea since a user's Listener may cause the cache to hang.
139

140    /**
141     * Notifies all registered listeners of a nodeCreated event.
142     *
143     * @param fqn
144     * @param pre
145     * @param sendImmediately
146     */

147    public synchronized void notifyNodeCreated(Fqn fqn, boolean pre, boolean sendImmediately)
148    {
149
150       boolean originLocal = cache.getInvocationContext().isOriginLocal();
151       if (sendImmediately)
152       {
153          resetInvocationContext();
154          if (evictionPolicyListener != null)
155          {
156             evictionPolicyListener.nodeCreated(fqn, pre, originLocal);
157          }
158          if (hasListeners)
159          {
160             for (CacheListener listener : listeners)
161             {
162                listener.nodeCreated(fqn, pre, originLocal);
163             }
164          }
165          restoreInvocationContext();
166       }
167       else
168       {
169          MethodCall call = new MethodCall(nodeCreated, new Object JavaDoc[]{fqn, pre, originLocal});
170          cache.getInvocationContext().addCacheListenerEvent(call);
171       }
172    }
173
174    /**
175     * Notifies all registered listeners of a nodeModified event.
176     *
177     * @param fqn
178     * @param pre
179     * @param modificationType
180     * @param data
181     * @param sendImmediately
182     */

183    public synchronized void notifyNodeModified(Fqn fqn, boolean pre, CacheListener.ModificationType modificationType, Map JavaDoc<Object JavaDoc, Object JavaDoc> data, boolean sendImmediately)
184    {
185       boolean originLocal = cache.getInvocationContext().isOriginLocal();
186       Map JavaDoc dataCopy = copy(data);
187       if (sendImmediately)
188       {
189          resetInvocationContext();
190          if (evictionPolicyListener != null)
191          {
192             evictionPolicyListener.nodeModified(fqn, pre, originLocal, modificationType, dataCopy);
193          }
194          if (hasListeners)
195          {
196             for (CacheListener listener : listeners)
197             {
198                listener.nodeModified(fqn, pre, originLocal, modificationType, dataCopy);
199             }
200          }
201          restoreInvocationContext();
202       }
203       else
204       {
205          MethodCall call = new MethodCall(nodeModified, new Object JavaDoc[]{fqn, pre, originLocal, modificationType, dataCopy});
206          cache.getInvocationContext().addCacheListenerEvent(call);
207       }
208    }
209
210    /**
211     * Notifies all registered listeners of a nodeRemoved event.
212     *
213     * @param fqn
214     * @param pre
215     * @param data
216     * @param sendImmediately
217     */

218    public synchronized void notifyNodeRemoved(Fqn fqn, boolean pre, Map JavaDoc<Object JavaDoc, Object JavaDoc> data, boolean sendImmediately)
219    {
220       boolean originLocal = cache.getInvocationContext().isOriginLocal();
221       Map JavaDoc dataCopy = copy(data);
222       if (sendImmediately)
223       {
224          resetInvocationContext();
225          if (evictionPolicyListener != null)
226          {
227             evictionPolicyListener.nodeRemoved(fqn, pre, originLocal, dataCopy);
228          }
229          if (hasListeners)
230          {
231             for (CacheListener listener : listeners)
232             {
233                listener.nodeRemoved(fqn, pre, originLocal, dataCopy);
234             }
235          }
236          restoreInvocationContext();
237       }
238       else
239       {
240          MethodCall call = new MethodCall(nodeRemoved, new Object JavaDoc[]{fqn, pre, originLocal, dataCopy});
241          cache.getInvocationContext().addCacheListenerEvent(call);
242       }
243
244    }
245
246    /**
247     * Notifies all registered listeners of a nodeVisited event.
248     *
249     * @param fqn
250     * @param pre
251     * @param sendImmediately
252     */

253    public synchronized void notifyNodeVisited(Fqn fqn, boolean pre, boolean sendImmediately)
254    {
255       if (sendImmediately)
256       {
257          resetInvocationContext();
258          if (evictionPolicyListener != null)
259          {
260             evictionPolicyListener.nodeVisited(fqn, pre);
261          }
262          if (hasListeners)
263          {
264             for (CacheListener listener : listeners)
265             {
266                listener.nodeVisited(fqn, pre);
267             }
268          }
269          restoreInvocationContext();
270       }
271       else
272       {
273          MethodCall call = new MethodCall(nodeVisited, new Object JavaDoc[]{fqn, pre});
274          cache.getInvocationContext().addCacheListenerEvent(call);
275       }
276
277    }
278
279    public synchronized void notifyNodeMoved(Fqn originalFqn, Fqn newFqn, boolean pre, boolean sendImmediately)
280    {
281       boolean originLocal = cache.getInvocationContext().isOriginLocal();
282       if (sendImmediately)
283       {
284          resetInvocationContext();
285          if (evictionPolicyListener != null)
286          {
287             evictionPolicyListener.nodeMoved(originalFqn, newFqn, pre, originLocal);
288          }
289          if (hasListeners)
290          {
291             for (CacheListener listener : listeners)
292             {
293                listener.nodeMoved(originalFqn, newFqn, pre, originLocal);
294             }
295          }
296          restoreInvocationContext();
297       }
298       else
299       {
300          MethodCall call = new MethodCall(nodeMoved, new Object JavaDoc[]{originalFqn, newFqn, pre, originLocal});
301          cache.getInvocationContext().addCacheListenerEvent(call);
302       }
303    }
304
305
306    /**
307     * Notifies all registered listeners of a nodeEvicted event.
308     *
309     * @param fqn
310     * @param pre
311     * @param sendImmediately
312     */

313    public synchronized void notifyNodeEvicted(Fqn fqn, boolean pre, boolean sendImmediately)
314    {
315       boolean originLocal = cache.getInvocationContext().isOriginLocal();
316       if (sendImmediately)
317       {
318          resetInvocationContext();
319          if (evictionPolicyListener != null)
320          {
321             evictionPolicyListener.nodeEvicted(fqn, pre, originLocal);
322          }
323          if (hasListeners)
324          {
325             for (CacheListener listener : listeners)
326             {
327                listener.nodeEvicted(fqn, pre, originLocal);
328             }
329          }
330          restoreInvocationContext();
331       }
332       else
333       {
334          MethodCall call = new MethodCall(nodeEvicted, new Object JavaDoc[]{fqn, pre, originLocal});
335          cache.getInvocationContext().addCacheListenerEvent(call);
336       }
337    }
338
339    /**
340     * Notifies all registered listeners of a nodeLoaded event.
341     *
342     * @param fqn
343     * @param pre
344     * @param data
345     * @param sendImmediately
346     */

347    public synchronized void notifyNodeLoaded(Fqn fqn, boolean pre, Map JavaDoc<Object JavaDoc, Object JavaDoc> data, boolean sendImmediately)
348    {
349       Map JavaDoc dataCopy = copy(data);
350       if (sendImmediately)
351       {
352          resetInvocationContext();
353          if (evictionPolicyListener != null)
354          {
355             evictionPolicyListener.nodeLoaded(fqn, pre, dataCopy);
356          }
357          if (hasListeners)
358          {
359             for (CacheListener listener : listeners)
360             {
361                listener.nodeLoaded(fqn, pre, dataCopy);
362             }
363          }
364          restoreInvocationContext();
365       }
366       else
367       {
368          MethodCall call = new MethodCall(nodeLoaded, new Object JavaDoc[]{fqn, pre, dataCopy});
369          cache.getInvocationContext().addCacheListenerEvent(call);
370       }
371    }
372
373    /**
374     * Notifies all registered listeners of a nodeActivated event.
375     *
376     * @param fqn
377     * @param pre
378     * @param sendImmediately
379     */

380    public synchronized void notifyNodeActivated(Fqn fqn, boolean pre, boolean sendImmediately)
381    {
382       if (sendImmediately)
383       {
384          resetInvocationContext();
385          if (evictionPolicyListener != null)
386          {
387             evictionPolicyListener.nodeActivated(fqn, pre);
388          }
389          if (hasListeners)
390          {
391             for (CacheListener listener : listeners)
392             {
393                listener.nodeActivated(fqn, pre);
394             }
395          }
396          restoreInvocationContext();
397       }
398       else
399       {
400          MethodCall call = new MethodCall(nodeActivated, new Object JavaDoc[]{fqn, pre});
401          cache.getInvocationContext().addCacheListenerEvent(call);
402       }
403    }
404
405    /**
406     * Notifies all registered listeners of a nodePassivated event.
407     *
408     * @param fqn
409     * @param pre
410     * @param sendImmediately
411     */

412    public synchronized void notifyNodePassivated(Fqn fqn, boolean pre, boolean sendImmediately)
413    {
414       if (sendImmediately)
415       {
416          resetInvocationContext();
417          if (evictionPolicyListener != null)
418          {
419             evictionPolicyListener.nodePassivated(fqn, pre);
420          }
421          if (hasListeners)
422          {
423             for (CacheListener listener : listeners)
424             {
425                listener.nodePassivated(fqn, pre);
426             }
427          }
428          restoreInvocationContext();
429       }
430       else
431       {
432          MethodCall call = new MethodCall(nodePassivated, new Object JavaDoc[]{fqn, pre});
433          cache.getInvocationContext().addCacheListenerEvent(call);
434       }
435    }
436
437    /**
438     * Notifies all registered listeners of a cacheStarted event.
439     *
440     * @param cache
441     * @param sendImmediately
442     */

443    public synchronized void notifyCacheStarted(CacheSPI cache, boolean sendImmediately)
444    {
445       if (sendImmediately)
446       {
447          resetInvocationContext();
448          if (evictionPolicyListener != null)
449          {
450             evictionPolicyListener.cacheStarted(cache);
451          }
452          if (hasListeners)
453          {
454             for (CacheListener listener : listeners)
455             {
456                listener.cacheStarted(cache);
457             }
458          }
459          restoreInvocationContext();
460       }
461       else
462       {
463          MethodCall call = new MethodCall(cacheStarted, new Object JavaDoc[]{cache});
464          cache.getInvocationContext().addCacheListenerEvent(call);
465       }
466    }
467
468    /**
469     * Notifies all registered listeners of a cacheStopped event.
470     *
471     * @param cache
472     * @param sendImmediately
473     */

474    public synchronized void notifyCacheStopped(CacheSPI cache, boolean sendImmediately)
475    {
476       if (sendImmediately)
477       {
478          resetInvocationContext();
479          if (evictionPolicyListener != null)
480          {
481             evictionPolicyListener.cacheStopped(cache);
482          }
483          if (hasListeners)
484          {
485             for (CacheListener listener : listeners)
486             {
487                listener.cacheStopped(cache);
488             }
489          }
490          restoreInvocationContext();
491       }
492       else
493       {
494          MethodCall call = new MethodCall(cacheStopped, new Object JavaDoc[]{cache});
495          cache.getInvocationContext().addCacheListenerEvent(call);
496       }
497    }
498
499    /**
500     * Notifies all registered listeners of a viewChange event.
501     *
502     * @param new_view
503     * @param sendImmediately
504     */

505    public synchronized void notifyViewChange(View new_view, boolean sendImmediately)
506    {
507       if (sendImmediately)
508       {
509          resetInvocationContext();
510          if (evictionPolicyListener != null)
511          {
512             evictionPolicyListener.viewChange(new_view);
513          }
514          if (hasListeners)
515          {
516             for (CacheListener listener : listeners)
517             {
518                listener.viewChange(new_view);
519             }
520          }
521          restoreInvocationContext();
522       }
523       else
524       {
525          MethodCall call = new MethodCall(viewChange, new Object JavaDoc[]{new_view});
526          cache.getInvocationContext().addCacheListenerEvent(call);
527       }
528    }
529
530    private Map JavaDoc copy(Map JavaDoc<Object JavaDoc, Object JavaDoc> data)
531    {
532       if (safe(data)) return data;
533       return new MapCopy<Object JavaDoc, Object JavaDoc>(data);
534    }
535
536    private void restoreInvocationContext()
537    {
538       cache.getInvocationContext().setState(tempCtx);
539    }
540
541    private void resetInvocationContext()
542    {
543       try
544       {
545          tempCtx = cache.getInvocationContext().clone();
546       }
547       catch (CloneNotSupportedException JavaDoc e)
548       {
549          log.warn("Unable to clone Invocation Context " + cache.getInvocationContext(), e);
550       }
551       cache.getInvocationContext().reset();
552    }
553
554    /**
555     * A map is deemed 'safe' to be passed as-is to a listener, if either of the following are true:
556     * <ul>
557     * <li>It is null</li>
558     * <li>It is an instance of {@link org.jboss.cache.util.MapCopy}, which is immutable</li>
559     * <li>It is an instance of {@link java.util.Collections#emptyMap()}, which is also immutable</li>
560     * </ul>
561     *
562     * @param map
563     * @return
564     */

565    private boolean safe(Map JavaDoc map)
566    {
567       return map == null || map instanceof MapCopy || map.getClass().equals(Collections.emptyMap().getClass());
568    }
569
570    /**
571     * Fires off all notifications for a given queue.
572     *
573     * @param queue queue to process.
574     */

575    public void invokeQueuedNotifications(List JavaDoc<MethodCall> queue)
576    {
577       resetInvocationContext();
578       for (MethodCall c : queue)
579       {
580          if (evictionPolicyListener != null)
581          {
582             try
583             {
584                c.invoke(evictionPolicyListener);
585             }
586             catch (Throwable JavaDoc throwable)
587             {
588                log.error("Unable to deliver queued notification " + c + " to eviction policy listener", throwable);
589             }
590          }
591          if (hasListeners)
592          {
593             for (CacheListener listener : listeners)
594             {
595                try
596                {
597                   c.invoke(listener);
598                }
599                catch (Throwable JavaDoc throwable)
600                {
601                   log.error("Unable to deliver queued notification " + c + " to listener " + listener, throwable);
602                }
603             }
604          }
605       }
606       restoreInvocationContext();
607    }
608
609    /**
610     * Fires off all notifications that have been registered within the current invocation, with sendImmediately set to false.
611     */

612    public void invokeQueuedNotifications()
613    {
614       invokeQueuedNotifications(cache.getInvocationContext().getCacheListenerEvents());
615    }
616 }
617
Popular Tags