KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jboss > cache > invalidation > bridges > JGCacheInvalidationBridge


1 /*
2   * JBoss, Home of Professional Open Source
3   * Copyright 2005, JBoss Inc., and individual contributors as indicated
4   * by the @authors tag. See the copyright.txt in the distribution for a
5   * full listing of individual contributors.
6   *
7   * This is free software; you can redistribute it and/or modify it
8   * under the terms of the GNU Lesser General Public License as
9   * published by the Free Software Foundation; either version 2.1 of
10   * the License, or (at your option) any later version.
11   *
12   * This software is distributed in the hope that it will be useful,
13   * but WITHOUT ANY WARRANTY; without even the implied warranty of
14   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15   * Lesser General Public License for more details.
16   *
17   * You should have received a copy of the GNU Lesser General Public
18   * License along with this software; if not, write to the Free
19   * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
20   * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
21   */

22 package org.jboss.cache.invalidation.bridges;
23
24 import java.io.Serializable JavaDoc;
25 import java.util.ArrayList JavaDoc;
26 import java.util.Vector JavaDoc;
27 import java.util.Collection JavaDoc;
28 import javax.management.MBeanServerInvocationHandler JavaDoc;
29 import javax.management.ObjectName JavaDoc;
30
31 import org.jboss.cache.invalidation.BatchInvalidation;
32 import org.jboss.cache.invalidation.InvalidationManager;
33 import org.jboss.cache.invalidation.InvalidationGroup;
34 import org.jboss.cache.invalidation.InvalidationManagerMBean;
35 import org.jboss.cache.invalidation.BridgeInvalidationSubscription;
36 import org.jboss.cache.invalidation.InvalidationBridgeListener;
37 import org.jboss.ha.framework.interfaces.HAPartition;
38 import org.jboss.ha.framework.interfaces.DistributedState;
39 import org.jboss.ha.framework.interfaces.DistributedReplicantManager;
40 import org.jboss.ha.framework.server.ClusterPartitionMBean;
41 import org.jboss.system.server.ServerConfigUtil;
42
43 /**
44  * JGroups implementation of a cache invalidation bridge
45  *
46  * @see JGCacheInvalidationBridgeMBean
47  *
48  * @author <a HREF="mailto:sacha.labourey@cogito-info.ch">Sacha Labourey</a>.
49  * @version $Revision: 58617 $
50  *
51  * <p><b>Revisions:</b>
52  *
53  * <p><b>24 septembre 2002 Sacha Labourey:</b>
54  * <ul>
55  * <li> First implementation </li>
56  * </ul>
57  */

58
59 public class JGCacheInvalidationBridge
60    extends org.jboss.system.ServiceMBeanSupport
61    implements JGCacheInvalidationBridgeMBean,
62       DistributedState.DSListenerEx,
63       InvalidationBridgeListener,
64       DistributedReplicantManager.ReplicantListener
65 {
66    
67    // Constants -----------------------------------------------------
68

69    // Attributes ----------------------------------------------------
70

71    protected String JavaDoc partitionName = ServerConfigUtil.getDefaultPartitionName();
72    /**
73     * The ClusterPartition with which we are associated.
74     */

75    protected ClusterPartitionMBean clusterPartition;
76    protected String JavaDoc invalidationManagerName = InvalidationManager.DEFAULT_JMX_SERVICE_NAME;
77    protected String JavaDoc bridgeName = "DefaultJGCacheIB";
78    
79    protected HAPartition partition = null;
80    protected DistributedState ds = null;
81    protected DistributedReplicantManager drm = null;
82    protected String JavaDoc RPC_HANLE_NAME = null;
83    protected String JavaDoc nodeName = null;
84    
85    protected InvalidationManagerMBean invalMgr = null;
86    protected BridgeInvalidationSubscription invalidationSubscription = null;
87    protected Collection JavaDoc localGroups = null;
88    protected Vector JavaDoc bridgedGroups = new Vector JavaDoc ();
89    
90
91    protected final Class JavaDoc[] rpc_invalidate_types=new Class JavaDoc[]{String JavaDoc.class, Serializable JavaDoc.class};
92    protected final Class JavaDoc[] rpc_invalidates_types=new Class JavaDoc[]{String JavaDoc.class, Serializable JavaDoc[].class};
93    protected final Class JavaDoc[] rpc_invalidate_all_types=new Class JavaDoc[]{String JavaDoc.class};
94    protected final Class JavaDoc[] rpc_batch_invalidate_types=new Class JavaDoc[]{BatchInvalidation[].class};
95
96
97    // Static --------------------------------------------------------
98

99    // Constructors --------------------------------------------------
100

101    public JGCacheInvalidationBridge ()
102    {
103    }
104       
105    // Public --------------------------------------------------------
106

107    // JGCacheInvalidationBridgeMBean implementation ----------------------------------------------
108

109    public String JavaDoc getInvalidationManager ()
110    {
111       return this.invalidationManagerName;
112    }
113
114    public ClusterPartitionMBean getClusterPartition()
115    {
116       return clusterPartition;
117    }
118
119    public void setClusterPartition(ClusterPartitionMBean clusterPartition)
120    {
121       this.clusterPartition = clusterPartition;
122    }
123    
124    public String JavaDoc getPartitionName ()
125    {
126       return this.partitionName;
127    }
128    
129    public void setInvalidationManager (String JavaDoc objectName)
130    {
131       this.invalidationManagerName = objectName;
132    }
133    
134    public void setPartitionName (String JavaDoc partitionName)
135    {
136       this.partitionName = partitionName;
137    }
138    
139    public String JavaDoc getBridgeName ()
140    {
141       return this.bridgeName;
142    }
143    
144    public void setBridgeName (String JavaDoc name)
145    {
146       this.bridgeName = name;
147    }
148    
149    // DistributedReplicantManager.ReplicantListener implementation ---------------------------
150

151    /**
152     * @todo examine thread safety. synchronized keyword was added to method
153     * signature when internal behavior of DistributedReplicantManagerImpl was
154     * changed so that multiple threads could concurrently send replicantsChanged
155     * notifications. Need to examine in detail how this method interacts with
156     * DistributedState to see if we can remove/narrow the synchronization.
157     */

158    public synchronized void replicantsChanged (String JavaDoc key, java.util.List JavaDoc newReplicants, int newReplicantsViewId)
159    {
160       if (key.equals (this.RPC_HANLE_NAME) && this.drm.isMasterReplica (this.RPC_HANLE_NAME))
161       {
162          log.debug ("The list of replicant for the JG bridge has changed, computing and updating local info...");
163          
164          // we remove any entry from the DS whose node is dead
165
//
166
java.util.Collection JavaDoc coll = this.ds.getAllKeys (this.RPC_HANLE_NAME);
167          if (coll == null)
168          {
169             log.debug ("... No bridge info was associated to this node");
170             return;
171          }
172          
173          // to avoid ConcurrentModificationException, we copy the list of keys in a new structure
174
//
175
ArrayList JavaDoc collCopy = new java.util.ArrayList JavaDoc (coll);
176          java.util.List JavaDoc newReplicantsNodeNames = this.drm.lookupReplicantsNodeNames (this.RPC_HANLE_NAME);
177          
178
179          for (int i = 0; i < collCopy.size(); i++)
180          {
181             String JavaDoc nodeEntry = (String JavaDoc)collCopy.get(i);
182             if (!newReplicantsNodeNames.contains (nodeEntry))
183             {
184                // the list of bridged topic contains a dead member: we remove it
185
//
186
try
187                {
188                   log.debug ("removing bridge information associated to this node from the DS");
189                   this.ds.remove (this.RPC_HANLE_NAME, nodeEntry, true);
190                }
191                catch (Exception JavaDoc e)
192                {
193                   log.info ("Unable to remove a node entry from the distributed cache", e);
194                }
195             }
196          }
197       }
198    }
199       
200    // DistributedState.DSListener implementation ----------------------------------------------
201

202     public void valueHasChanged (String JavaDoc category, Serializable JavaDoc key, Serializable JavaDoc value, boolean locallyModified)
203     {
204        this.updatedBridgedInvalidationGroupsInfo ();
205     }
206     
207     public void keyHasBeenRemoved (String JavaDoc category, Serializable JavaDoc key, Serializable JavaDoc previousContent, boolean locallyModified)
208     {
209        this.updatedBridgedInvalidationGroupsInfo ();
210     }
211
212    // InvalidationBridgeListener implementation ----------------------------------------------
213

214    public void batchInvalidate (BatchInvalidation[] invalidations, boolean asynchronous)
215    {
216       if (invalidations == null) return;
217       
218       // we need to sort which group other nodes accept or refuse and propagate through the net
219
//
220
ArrayList JavaDoc acceptedGroups = new ArrayList JavaDoc();
221       
222       for (int i=0; i<invalidations.length; i++)
223       {
224          BatchInvalidation currBI = invalidations[i];
225          if (groupExistsRemotely (currBI.getInvalidationGroupName ()))
226             acceptedGroups.add (currBI);
227       }
228       
229       if (acceptedGroups.size () > 0)
230       {
231          BatchInvalidation[] result = new BatchInvalidation[acceptedGroups.size ()];
232          result = (BatchInvalidation[])acceptedGroups.toArray (result);
233          
234          if (log.isTraceEnabled ())
235             log.trace ("Transmitting batch invalidation: " + result);
236          this._do_rpc_batchInvalidate (result, asynchronous);
237       }
238    }
239    
240    public void invalidate (String JavaDoc invalidationGroupName, Serializable JavaDoc[] keys, boolean asynchronous)
241    {
242       // if the group exists on another node, we simply propagate to other nodes
243
//
244
if (log.isTraceEnabled ())
245          log.trace ("Transmitting invalidations for group: " + invalidationGroupName);
246       
247       if (groupExistsRemotely (invalidationGroupName))
248          _do_rpc_invalidates (invalidationGroupName, keys, asynchronous);
249    }
250    
251    public void invalidate (String JavaDoc invalidationGroupName, Serializable JavaDoc key, boolean asynchronous)
252    {
253       // if the group exists on another node, we simply propagate to other nodes
254
//
255
if (log.isTraceEnabled ())
256          log.trace ("Transmitting invalidation for group: " + invalidationGroupName);
257
258       if (groupExistsRemotely (invalidationGroupName))
259          _do_rpc_invalidate (invalidationGroupName, key, asynchronous);
260    }
261
262    public void invalidateAll(String JavaDoc groupName, boolean async)
263    {
264       if (log.isTraceEnabled ())
265          log.trace ("Transmitting for all entries for invalidation for group: " + groupName);
266       if (groupExistsRemotely (groupName))
267          _do_rpc_invalidate_all (groupName, async);
268    }
269
270    public void newGroupCreated (String JavaDoc groupInvalidationName)
271    {
272       try
273       {
274          this.publishLocalInvalidationGroups ();
275          //this.updatedBridgedInvalidationGroupsInfo ();
276
}
277       catch (Exception JavaDoc e)
278       {
279          log.info ("Problem while registering a new invalidation group over the cluster", e);
280       }
281    }
282    
283    public void groupIsDropped (String JavaDoc groupInvalidationName)
284    {
285       try
286       {
287          this.publishLocalInvalidationGroups ();
288          //this.updatedBridgedInvalidationGroupsInfo ();
289
}
290       catch (Exception JavaDoc e)
291       {
292          log.info ("Problem while un-registering a new invalidation group over the cluster", e);
293       }
294    }
295    
296    // ServiceMBeanSupport overrides ---------------------------------------------------
297

298    public void startService () throws Exception JavaDoc
299    {
300       RPC_HANLE_NAME = "DCacheBridge-" + this.bridgeName;
301       
302       // Support old-style partition lookup for configs that don't
303
// inject the partition.
304
// TODO remove this after a while; deprecated in 4.0.4
305
if (this.clusterPartition == null)
306       {
307          javax.naming.Context JavaDoc ctx = new javax.naming.InitialContext JavaDoc ();
308          this.partition = (HAPartition)ctx.lookup("/HAPartition/" + this.partitionName);
309       }
310       else
311       {
312          this.partition = this.clusterPartition.getHAPartition();
313          this.partitionName = this.partition.getPartitionName();
314       }
315          
316       this.ds = this.partition.getDistributedStateService ();
317       this.drm = this.partition.getDistributedReplicantManager ();
318       this.nodeName = this.partition.getNodeName();
319       
320       this.drm.add (this.RPC_HANLE_NAME, "");
321       this.drm.registerListener (this.RPC_HANLE_NAME, this);
322       this.ds.registerDSListenerEx (RPC_HANLE_NAME, this);
323       this.partition.registerRPCHandler(RPC_HANLE_NAME, this);
324       
325       // we now publish the list of caches we have access to
326
//
327
this.invalMgr = (org.jboss.cache.invalidation.InvalidationManagerMBean)
328          org.jboss.system.Registry.lookup (this.invalidationManagerName);
329       if( invalMgr == null )
330          throw new IllegalStateException JavaDoc("Failed to find: "+invalidationManagerName+", check dependency");
331
332       publishLocalInvalidationGroups ();
333       this.updatedBridgedInvalidationGroupsInfo ();
334       
335       this.invalidationSubscription = invalMgr.registerBridgeListener (this);
336       
337    }
338    
339    public void stopService ()
340    {
341       try
342       {
343          this.partition.unregisterRPCHandler (this.RPC_HANLE_NAME, this);
344          this.ds.unregisterDSListenerEx (this.RPC_HANLE_NAME, this);
345          this.drm.unregisterListener (this.RPC_HANLE_NAME, this);
346          this.drm.remove (this.RPC_HANLE_NAME);
347          
348          this.invalidationSubscription.unregister ();
349                   
350          this.ds.remove (this.RPC_HANLE_NAME, this.nodeName, true);
351          
352          this.invalMgr = null;
353          this.partition = null;
354          this.drm = null;
355          this.ds = null;
356          this.invalidationSubscription = null;
357          this.RPC_HANLE_NAME = null;
358          this.nodeName = null;
359          this.localGroups = null;
360          this.bridgedGroups = new Vector JavaDoc ();
361       }
362       catch (Exception JavaDoc e)
363       {
364          log.info ("Problem while shuting down invalidation cache bridge", e);
365       }
366    }
367    
368    // RPC calls ---------------------------------------------
369

370    public void _rpc_invalidate (String JavaDoc invalidationGroupName, Serializable JavaDoc key)
371    {
372       if (log.isTraceEnabled ())
373          log.trace ("Received remote invalidation for group: " + invalidationGroupName);
374
375       this.invalidationSubscription.invalidate (invalidationGroupName, key);
376    }
377    
378    public void _rpc_invalidates (String JavaDoc invalidationGroupName, Serializable JavaDoc[] keys)
379    {
380       if (log.isTraceEnabled ())
381          log.trace ("Received remote invalidations for group: " + invalidationGroupName);
382
383       this.invalidationSubscription.invalidate (invalidationGroupName, keys);
384    }
385
386    public void _rpc_invalidate_all (String JavaDoc invalidationGroupName)
387    {
388       if (log.isTraceEnabled ())
389          log.trace ("Received remote invalidate_all for group: " + invalidationGroupName);
390
391       this.invalidationSubscription.invalidateAll (invalidationGroupName);
392    }
393
394    public void _rpc_batchInvalidate (BatchInvalidation[] invalidations)
395    {
396       if (log.isTraceEnabled () && invalidations != null)
397          log.trace ("Received remote batch invalidation for this number of groups: " + invalidations.length);
398
399       this.invalidationSubscription.batchInvalidate (invalidations);
400    }
401
402    protected void _do_rpc_invalidate (String JavaDoc invalidationGroupName, Serializable JavaDoc key, boolean asynch)
403    {
404       Object JavaDoc[] params = new Object JavaDoc[] {invalidationGroupName, key};
405       try
406       {
407          if (asynch)
408             this.partition.callAsynchMethodOnCluster (this.RPC_HANLE_NAME,
409                                                       "_rpc_invalidate",
410                                                       params, rpc_invalidate_types, true);
411          else
412             this.partition.callMethodOnCluster (this.RPC_HANLE_NAME,
413                                                 "_rpc_invalidate",
414                                                 params, rpc_invalidate_types, true);
415       }
416       catch (Exception JavaDoc e)
417       {
418          log.debug ("Distributed invalidation (1) has failed for group " +
419                     invalidationGroupName + " (Bridge: " + this.bridgeName + ")");
420       }
421    }
422    
423    protected void _do_rpc_invalidates (String JavaDoc invalidationGroupName, Serializable JavaDoc[] keys, boolean asynch)
424    {
425       Object JavaDoc[] params = new Object JavaDoc[] {invalidationGroupName, keys};
426       try
427       {
428          if (asynch)
429             this.partition.callAsynchMethodOnCluster (this.RPC_HANLE_NAME,
430                                                       "_rpc_invalidates", params, rpc_invalidates_types, true);
431          else
432             this.partition.callMethodOnCluster (this.RPC_HANLE_NAME,
433                                                 "_rpc_invalidates", params, rpc_invalidates_types, true);
434       }
435       catch (Exception JavaDoc e)
436       {
437          log.debug ("Distributed invalidation (2) has failed for group " +
438                     invalidationGroupName + " (Bridge: " + this.bridgeName + ")");
439       }
440    }
441
442    protected void _do_rpc_invalidate_all (String JavaDoc invalidationGroupName, boolean asynch)
443    {
444       Object JavaDoc[] params = new Object JavaDoc[] {invalidationGroupName};
445       try
446       {
447          if (asynch)
448             this.partition.callAsynchMethodOnCluster (this.RPC_HANLE_NAME,
449                                                       "_rpc_invalidate_all", params, rpc_invalidate_all_types, true);
450          else
451             this.partition.callMethodOnCluster (this.RPC_HANLE_NAME,
452                                                 "_rpc_invalidate_all", params, rpc_invalidate_all_types, true);
453       }
454       catch (Exception JavaDoc e)
455       {
456          log.debug ("Distributed invalidation (2) has failed for group " +
457                     invalidationGroupName + " (Bridge: " + this.bridgeName + ")");
458       }
459    }
460    
461    protected void _do_rpc_batchInvalidate (BatchInvalidation[] invalidations, boolean asynch)
462    {
463       Object JavaDoc[] params = new Object JavaDoc[] {invalidations};
464       try
465       {
466          if (asynch)
467             this.partition.callAsynchMethodOnCluster (this.RPC_HANLE_NAME,
468                                                       "_rpc_batchInvalidate", params, rpc_batch_invalidate_types, true);
469          else
470             this.partition.callMethodOnCluster (this.RPC_HANLE_NAME,
471                                                 "_rpc_batchInvalidate", params, rpc_batch_invalidate_types, true);
472       }
473       catch (Exception JavaDoc e)
474       {
475          log.debug ("Distributed invalidation (3) has failed (Bridge: " + this.bridgeName + ")");
476       }
477    }
478
479    
480    // Package protected ---------------------------------------------
481

482    // Protected -----------------------------------------------------
483

484    protected synchronized void publishLocalInvalidationGroups () throws Exception JavaDoc
485    {
486       this.localGroups = invalMgr.getInvalidationGroups ();
487       
488       log.debug ("Publishing locally available invalidation groups: " + this.localGroups);
489
490       ArrayList JavaDoc content = new ArrayList JavaDoc (this.localGroups);
491       ArrayList JavaDoc result = new ArrayList JavaDoc (content.size ());
492       
493
494       for (int i = 0; i < content.size(); i++)
495       {
496          String JavaDoc aGroup = ((InvalidationGroup)content.get(i)).getGroupName ();
497          result.add (aGroup);
498       }
499       
500       if (result.size () > 0)
501       {
502          NodeInfo info = new NodeInfo (result, this.nodeName);
503          this.ds.set (this.RPC_HANLE_NAME, this.nodeName, info, true);
504       }
505       else
506          this.ds.remove (this.RPC_HANLE_NAME, this.nodeName, true);
507    }
508    
509    protected void updatedBridgedInvalidationGroupsInfo ()
510    {
511       Collection JavaDoc bridgedByNode = this.ds.getAllValues (this.RPC_HANLE_NAME);
512       
513       log.debug ("Updating list of invalidation groups that are bridged...");
514       
515       if (bridgedByNode != null)
516       {
517          // Make a copy
518
//
519
ArrayList JavaDoc copy = new ArrayList JavaDoc (bridgedByNode);
520
521          Vector JavaDoc result = new Vector JavaDoc ();
522          
523
524          for (int i = 0; i < copy.size(); i++)
525          {
526             NodeInfo infoForNode = (NodeInfo)copy.get(i);
527             log.trace ("InfoForNode: " + infoForNode);
528             
529             if (infoForNode != null && !infoForNode.groupName.equals (this.nodeName))
530             {
531                ArrayList JavaDoc groupsForNode = infoForNode.groups;
532                log.trace ("Groups for node: " + groupsForNode);
533                
534
535                for (int j = 0; j < groupsForNode.size(); j++)
536                {
537                   String JavaDoc aGroup = (String JavaDoc)groupsForNode.get(j);
538                   if (!result.contains (aGroup))
539                   {
540                      log.trace ("Adding: " + aGroup);
541                      result.add (aGroup);
542                   }
543                }
544                
545             }
546             
547          }
548          // atomic assignation of the result
549
//
550
this.bridgedGroups = result;
551          
552          log.debug ("... computed list of bridged groups: " + result);
553       }
554       else
555       {
556          log.debug ("... nothing needs to be bridged.");
557       }
558          
559    }
560    
561    protected boolean groupExistsRemotely (String JavaDoc groupName)
562    {
563       return this.bridgedGroups.contains (groupName);
564    }
565    
566    // Private -------------------------------------------------------
567

568    // Inner classes -------------------------------------------------
569

570 }
571
572 class NodeInfo implements java.io.Serializable JavaDoc
573 {
574    static final long serialVersionUID = -3215712955134929006L;
575    
576    public ArrayList JavaDoc groups = null;
577    public String JavaDoc groupName = null;
578    
579    public NodeInfo (){}
580    
581    public NodeInfo (ArrayList JavaDoc groups, String JavaDoc groupName)
582    {
583       this.groups = groups;
584       this.groupName = groupName;
585    }
586    
587 }
588
Popular Tags