KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jboss > ha > framework > server > DistributedReplicantManagerImpl


1 /*
2   * JBoss, Home of Professional Open Source
3   * Copyright 2005, JBoss Inc., and individual contributors as indicated
4   * by the @authors tag. See the copyright.txt in the distribution for a
5   * full listing of individual contributors.
6   *
7   * This is free software; you can redistribute it and/or modify it
8   * under the terms of the GNU Lesser General Public License as
9   * published by the Free Software Foundation; either version 2.1 of
10   * the License, or (at your option) any later version.
11   *
12   * This software is distributed in the hope that it will be useful,
13   * but WITHOUT ANY WARRANTY; without even the implied warranty of
14   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15   * Lesser General Public License for more details.
16   *
17   * You should have received a copy of the GNU Lesser General Public
18   * License along with this software; if not, write to the Free
19   * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
20   * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
21   */

22 package org.jboss.ha.framework.server;
23
24 import java.util.Set JavaDoc;
25 import java.util.Vector JavaDoc;
26 import java.util.ArrayList JavaDoc;
27 import java.util.HashMap JavaDoc;
28 import java.util.Iterator JavaDoc;
29 import java.util.Collection JavaDoc;
30 import java.util.HashSet JavaDoc;
31 import java.util.List JavaDoc;
32 import java.util.Map JavaDoc;
33
34 import java.io.Serializable JavaDoc;
35
36 import javax.management.MBeanServer JavaDoc;
37 import javax.management.ObjectName JavaDoc;
38
39 import EDU.oswego.cs.dl.util.concurrent.Latch;
40 import EDU.oswego.cs.dl.util.concurrent.ConcurrentReaderHashMap;
41
42 import org.jboss.logging.Logger;
43
44 import org.jboss.ha.framework.interfaces.ClusterNode;
45 import org.jboss.ha.framework.interfaces.DistributedReplicantManager;
46 import org.jboss.ha.framework.interfaces.HAPartition;
47
48
49 /**
50  * This class manages replicated objects.
51  *
52  * @author <a HREF="mailto:bill@burkecentral.com">Bill Burke</a>.
53  * @author <a HREF="mailto:sacha.labourey@cogito-info.ch">Sacha Labourey</a>.
54  * @author Scott.stark@jboss.org
55  * @version $Revision: 58574 $
56  */

57 public class DistributedReplicantManagerImpl
58    implements DistributedReplicantManagerImplMBean,
59               HAPartition.HAMembershipExtendedListener,
60               HAPartition.HAPartitionStateTransfer,
61               AsynchEventHandler.AsynchEventProcessor
62 {
63    // Constants -----------------------------------------------------
64

   // Name under which this service registers its RPC handler and state
   // transfer subscription with the partition, and which is embedded in the
   // JMX object name.
65    protected final static String JavaDoc SERVICE_NAME = "DistributedReplicantManager";
66    
67    // Attributes ----------------------------------------------------
68
   // NOTE(review): not referenced anywhere in the visible part of this file.
protected static int threadID;
69    
   // key -> replicant published by this node.
70    protected ConcurrentReaderHashMap localReplicants = new ConcurrentReaderHashMap();
   // key -> HashMap of (node name -> replicant) for replicants published by other nodes.
71    protected ConcurrentReaderHashMap replicants = new ConcurrentReaderHashMap();
   // key -> ArrayList of ReplicantListener registered for that key.
72    protected ConcurrentReaderHashMap keyListeners = new ConcurrentReaderHashMap();
   // key -> Integer view id, computed from the hash codes of the node names replicating the key.
73    protected HashMap JavaDoc intraviewIdCache = new HashMap JavaDoc();
   // Partition through which replicants are exchanged; set by the constructor.
74    protected HAPartition partition;
75    /** The handler used to send replicant change notifications asynchronously */
76    protected AsynchEventHandler asynchHandler;
77    
   // Logger named after this class plus the partition name.
78    protected Logger log;
79    
   // Name of this node inside the partition; assigned in start().
80    protected String JavaDoc nodeName = null;
81    
   // Released by start(); add(), remove() and lookupLocalReplicants() acquire
   // it first so they wait until nodeName has been assigned.
82    protected Latch partitionNameKnown = new Latch ();
   // Value of log.isTraceEnabled() captured once in the constructor.
83    protected boolean trace;
84
   // Parameter types passed along with the "_add" and "_remove" cluster calls.
85    protected Class JavaDoc[] add_types=new Class JavaDoc[]{String JavaDoc.class, String JavaDoc.class, Serializable JavaDoc.class};
86    protected Class JavaDoc[] remove_types=new Class JavaDoc[]{String JavaDoc.class, String JavaDoc.class};
87
88    // Static --------------------------------------------------------
89

90    // Constructors --------------------------------------------------
91

92    /**
93     * Creates a DistributedReplicantManager that manages replicated objects
94     * through the given partition. Injects a back reference to this object
95     * into the partition.
96     *
97     * @param partition {@link ClusterPartition} through which replicated
98     * objects will be exchanged
99     */

100    public DistributedReplicantManagerImpl(ClusterPartition partition)
101    {
102       this(partition.getHAPartition());
103       // Set a back ref to ourself
104
partition.setDistributedReplicantManager(this);
105    }
106    
107    /**
108     * Creates a DistributedReplicantManager that manages replicated objects
109     * through the given partition.
110     *
111     * @param partition {@link HAPartition} through which replicated objects
112     * will be exchanged
113     */

114    public DistributedReplicantManagerImpl(HAPartition partition)
115    {
116      this.partition = partition;
117      this.log = Logger.getLogger(DistributedReplicantManagerImpl.class.getName() +
118                  "." + partition.getPartitionName());
119      this.trace = log.isTraceEnabled();
120   }
121
122    // Public --------------------------------------------------------
123

124    public void create() throws Exception JavaDoc
125    {
126       log.debug("registerRPCHandler");
127       partition.registerRPCHandler(SERVICE_NAME, this);
128       log.debug("subscribeToStateTransferEvents");
129       partition.subscribeToStateTransferEvents(SERVICE_NAME, this);
130       log.debug("registerMembershipListener");
131       partition.registerMembershipListener(this);
132    }
133    
134    public void start() throws Exception JavaDoc
135    {
136       this.nodeName = this.partition.getNodeName();
137       
138       // Create the asynch listener handler thread
139
asynchHandler = new AsynchEventHandler(this, "AsynchKeyChangeHandler");
140       asynchHandler.start();
141
142       partitionNameKnown.release (); // partition name is now known!
143

144       //log.info("mergemembers");
145
//mergeMembers();
146
}
147    
148    public void stop() throws Exception JavaDoc
149    {
150       // Stop the asynch handler thread
151
try
152       {
153          asynchHandler.stop();
154       }
155       catch( Exception JavaDoc e)
156       {
157          log.warn("Failed to stop asynchHandler", e);
158       }
159       
160       // TODO reset the latch
161
}
162
163    // NR 200505 : [JBCLUSTER-38] unbind at destroy
164
public void destroy() throws Exception JavaDoc
165    {
166       // we cleanly shutdown. This should be optimized.
167
if (localReplicants != null)
168       {
169          synchronized(localReplicants)
170          {
171             String JavaDoc[] keys = new String JavaDoc[localReplicants.size()];
172             localReplicants.keySet().toArray(keys);
173             for(int n = 0; n < keys.length; n ++)
174             {
175                this.removeLocal(keys[n]); // channel is disconnected, so
176
// don't try to notify cluster
177
}
178          }
179       }
180       
181       partition.unregisterRPCHandler(SERVICE_NAME, this);
182       partition.unsubscribeFromStateTransferEvents(SERVICE_NAME, this);
183       partition.unregisterMembershipListener(this);
184    }
185
186    public void registerWithJmx(MBeanServer JavaDoc server) throws Exception JavaDoc
187    {
188       server.registerMBean(this, getObjectName());
189    }
190    
191    public void unregisterWithJmx(MBeanServer JavaDoc server) throws Exception JavaDoc
192    {
193       server.unregisterMBean(getObjectName());
194    }
195    
196    private ObjectName JavaDoc getObjectName() throws Exception JavaDoc
197    {
198       return new ObjectName JavaDoc("jboss:service=" + SERVICE_NAME + ",partition=" + partition.getPartitionName());
199    }
200    
201    public String JavaDoc listContent () throws Exception JavaDoc
202    {
203       // we merge all replicants services: local only or not
204
//
205
java.util.Collection JavaDoc services = this.getAllServices ();
206
207       StringBuffer JavaDoc result = new StringBuffer JavaDoc ();
208       java.util.Iterator JavaDoc catsIter = services.iterator ();
209       
210       result.append ("<pre>");
211       
212       while (catsIter.hasNext ())
213       {
214          String JavaDoc category = (String JavaDoc)catsIter.next ();
215          HashMap JavaDoc content = (HashMap JavaDoc)this.replicants.get (category);
216          if (content == null)
217             content = new HashMap JavaDoc ();
218          java.util.Iterator JavaDoc keysIter = content.keySet ().iterator ();
219                   
220          result.append ("-----------------------------------------------\n");
221          result.append ("Service : ").append (category).append ("\n\n");
222          
223          Serializable JavaDoc local = lookupLocalReplicant(category);
224          if (local == null)
225             result.append ("\t- Service is *not* available locally\n");
226          else
227             result.append ("\t- Service *is* also available locally\n");
228
229          while (keysIter.hasNext ())
230          {
231             String JavaDoc location = (String JavaDoc)keysIter.next ();
232             result.append ("\t- ").append(location).append ("\n");
233          }
234          
235          result.append ("\n");
236          
237       }
238       
239       result.append ("</pre>");
240       
241       return result.toString ();
242    }
243    
244    public String JavaDoc listXmlContent () throws Exception JavaDoc
245    {
246       // we merge all replicants services: local only or not
247
//
248
java.util.Collection JavaDoc services = this.getAllServices ();
249       StringBuffer JavaDoc result = new StringBuffer JavaDoc ();
250
251       result.append ("<ReplicantManager>\n");
252
253       java.util.Iterator JavaDoc catsIter = services.iterator ();
254       while (catsIter.hasNext ())
255       {
256          String JavaDoc category = (String JavaDoc)catsIter.next ();
257          HashMap JavaDoc content = (HashMap JavaDoc)this.replicants.get (category);
258          if (content == null)
259             content = new HashMap JavaDoc ();
260          java.util.Iterator JavaDoc keysIter = content.keySet ().iterator ();
261                   
262          result.append ("\t<Service>\n");
263          result.append ("\t\t<ServiceName>").append (category).append ("</ServiceName>\n");
264
265          
266          Serializable JavaDoc local = lookupLocalReplicant(category);
267          if (local != null)
268          {
269             result.append ("\t\t<Location>\n");
270             result.append ("\t\t\t<Name local=\"True\">").append (this.nodeName).append ("</Name>\n");
271             result.append ("\t\t</Location>\n");
272          }
273
274          while (keysIter.hasNext ())
275          {
276             String JavaDoc location = (String JavaDoc)keysIter.next ();
277             result.append ("\t\t<Location>\n");
278             result.append ("\t\t\t<Name local=\"False\">").append (location).append ("</Name>\n");
279             result.append ("\t\t</Location>\n");
280          }
281          
282          result.append ("\t<Service>\n");
283          
284       }
285
286       result.append ("<ReplicantManager>\n");
287       
288       return result.toString ();
289    }
290
291    // HAPartition.HAPartitionStateTransfer implementation ----------------------------------------------
292

293    public Serializable JavaDoc getCurrentState ()
294    {
295       java.util.Collection JavaDoc services = this.getAllServices ();
296       HashMap JavaDoc result = new HashMap JavaDoc ();
297       
298       java.util.Iterator JavaDoc catsIter = services.iterator ();
299       while (catsIter.hasNext ())
300       {
301          String JavaDoc category = (String JavaDoc)catsIter.next ();
302          HashMap JavaDoc content = (HashMap JavaDoc)this.replicants.get (category);
303          if (content == null)
304             content = new HashMap JavaDoc ();
305          else
306             content = (HashMap JavaDoc)content.clone ();
307          
308          Serializable JavaDoc local = lookupLocalReplicant(category);
309          if (local != null)
310             content.put (this.nodeName, local);
311          
312          result.put (category, content);
313       }
314       
315       // we add the intraviewid cache to the global result
316
//
317
Object JavaDoc[] globalResult = new Object JavaDoc[] {result, intraviewIdCache};
318       return globalResult;
319    }
320
321    public void setCurrentState(Serializable JavaDoc newState)
322    {
323       Object JavaDoc[] globalState = (Object JavaDoc[])newState;
324       
325       HashMap JavaDoc map = (HashMap JavaDoc)globalState[0];
326       this.replicants.putAll(map);
327       this.intraviewIdCache = (HashMap JavaDoc)globalState[1];
328
329       if( trace )
330       {
331          log.trace(nodeName + ": received new state, will republish local replicants");
332       }
333       MembersPublisher publisher = new MembersPublisher();
334       publisher.start();
335    }
336       
337    public Collection JavaDoc getAllServices ()
338    {
339       HashSet JavaDoc services = new HashSet JavaDoc();
340       services.addAll (localReplicants.keySet ());
341       services.addAll (replicants.keySet ());
342       return services;
343    }
344    
345    // HAPartition.HAMembershipListener implementation ----------------------------------------------
346

347    public void membershipChangedDuringMerge(Vector JavaDoc deadMembers, Vector JavaDoc newMembers, Vector JavaDoc allMembers, Vector JavaDoc originatingGroups)
348    {
349       // Here we only care about deadMembers. Purge all replicant lists of deadMembers
350
// and then notify all listening nodes.
351
//
352
log.info("Merging partitions...");
353       log.info("Dead members: " + deadMembers.size());
354       log.info("Originating groups: " + originatingGroups);
355       purgeDeadMembers(deadMembers);
356       if (newMembers.size() > 0)
357       {
358          new MergeMembers().start();
359       }
360    }
361    
362    public void membershipChanged(Vector JavaDoc deadMembers, Vector JavaDoc newMembers, Vector JavaDoc allMembers)
363    {
364       // Here we only care about deadMembers. Purge all replicant lists of deadMembers
365
// and then notify all listening nodes.
366
//
367
log.info("I am (" + nodeName + ") received membershipChanged event:");
368      log.info("Dead members: " + deadMembers.size() + " (" + deadMembers + ")");
369      log.info("New Members : " + newMembers.size() + " (" + newMembers + ")");
370      log.info("All Members : " + allMembers.size() + " (" + allMembers + ")");
371       purgeDeadMembers(deadMembers);
372       
373       // we don't need to merge members anymore
374
}
375    
376    // AsynchEventHandler.AsynchEventProcessor implementation -----------------
377

378    public void processEvent(Object JavaDoc event)
379    {
380       KeyChangeEvent kce = (KeyChangeEvent) event;
381       notifyKeyListeners(kce.key, kce.replicants);
382    }
383    
   // Event queued on the asynchronous handler by _add/_remove and consumed by
   // processEvent(), which passes both fields to notifyKeyListeners().
384    static class KeyChangeEvent
385    {
   // Key whose replicant list changed.
386       String JavaDoc key;
   // Result of lookupReplicants(key) at the time the event was created.
387       List JavaDoc replicants;
388    }
389    
390    // DistributedReplicantManager implementation ----------------------------------------------
391

392    public void add(String JavaDoc key, Serializable JavaDoc replicant) throws Exception JavaDoc
393    {
394       if( trace )
395          log.trace("add, key="+key+", value="+replicant);
396       partitionNameKnown.acquire (); // we don't propagate until our name is known
397

398       Object JavaDoc[] args = {key, this.nodeName, replicant};
399       partition.callMethodOnCluster(SERVICE_NAME, "_add", args, add_types, true);
400       synchronized(localReplicants)
401       {
402          localReplicants.put(key, replicant);
403          notifyKeyListeners(key, lookupReplicants(key));
404       }
405    }
406    
407    public void remove(String JavaDoc key) throws Exception JavaDoc
408    {
409       partitionNameKnown.acquire (); // we don't propagate until our name is known
410

411       // optimisation: we don't make a costly network call
412
// if there is nothing to remove
413
if (localReplicants.containsKey(key))
414       {
415          Object JavaDoc[] args = {key, this.nodeName};
416          partition.callAsynchMethodOnCluster(SERVICE_NAME, "_remove", args, remove_types, true);
417          removeLocal(key);
418       }
419    }
420    
421    private void removeLocal(String JavaDoc key)
422    {
423       synchronized(localReplicants)
424       {
425          localReplicants.remove(key);
426          List JavaDoc result = lookupReplicants(key);
427          if (result == null)
428             result = new ArrayList JavaDoc (); // don't pass null but an empty list
429
notifyKeyListeners(key, result);
430       }
431    }
432    
433    public Serializable JavaDoc lookupLocalReplicant(String JavaDoc key)
434    {
435       return (Serializable JavaDoc)localReplicants.get(key);
436    }
437    
438    public List JavaDoc lookupReplicants(String JavaDoc key)
439    {
440       Serializable JavaDoc local = lookupLocalReplicant(key);
441       HashMap JavaDoc replicant = (HashMap JavaDoc)replicants.get(key);
442       if (replicant == null && local == null)
443          return null;
444
445       ArrayList JavaDoc rtn = new ArrayList JavaDoc();
446
447       if (replicant == null)
448       {
449          if (local != null)
450             rtn.add(local);
451       }
452       else
453       {
454          // JBAS-2677. Put the replicants in view order.
455
ClusterNode[] nodes = partition.getClusterNodes();
456          String JavaDoc replNode;
457          Object JavaDoc replVal;
458          for (int i = 0; i < nodes.length; i++)
459          {
460             replNode = nodes[i].getName();
461             if (local != null && nodeName.equals(replNode))
462             {
463                rtn.add(local);
464                continue;
465             }
466             
467             replVal = replicant.get(replNode);
468             if (replVal != null)
469                rtn.add(replVal);
470          }
471       }
472       
473       return rtn;
474    }
475    
476    public List JavaDoc lookupReplicantsNodeNames(String JavaDoc key)
477    {
478       boolean locallyReplicated = localReplicants.containsKey (key);
479       HashMap JavaDoc replicant = (HashMap JavaDoc)replicants.get(key);
480       if (replicant == null && !locallyReplicated)
481          return null;
482
483       ArrayList JavaDoc rtn = new ArrayList JavaDoc();
484       
485       if (replicant == null)
486       {
487          if (locallyReplicated)
488             rtn.add(this.nodeName);
489       }
490       else
491       {
492          // JBAS-2677. Put the replicants in view order.
493
Set JavaDoc keys = replicant.keySet();
494          ClusterNode[] nodes = partition.getClusterNodes();
495          String JavaDoc keyOwner;
496          for (int i = 0; i < nodes.length; i++)
497          {
498             keyOwner = nodes[i].getName();
499             if (locallyReplicated && nodeName.equals(keyOwner))
500             {
501                rtn.add(this.nodeName);
502                continue;
503             }
504             
505             if (keys.contains(keyOwner))
506                rtn.add(keyOwner);
507          }
508       }
509       
510       return rtn;
511    }
512    
513    public void registerListener(String JavaDoc key, DistributedReplicantManager.ReplicantListener subscriber)
514    {
515       synchronized(keyListeners)
516       {
517          ArrayList JavaDoc listeners = (ArrayList JavaDoc)keyListeners.get(key);
518          if (listeners == null)
519          {
520             listeners = new ArrayList JavaDoc();
521             keyListeners.put(key, listeners);
522          }
523          listeners.add(subscriber);
524       }
525    }
526    
527    public void unregisterListener(String JavaDoc key, DistributedReplicantManager.ReplicantListener subscriber)
528    {
529       synchronized(keyListeners)
530       {
531          ArrayList JavaDoc listeners = (ArrayList JavaDoc)keyListeners.get (key);
532          if (listeners == null) return;
533          
534          listeners.remove(subscriber);
535          if (listeners.size() == 0)
536             keyListeners.remove(key);
537
538       }
539    }
540    
541    public int getReplicantsViewId(String JavaDoc key)
542    {
543       Integer JavaDoc result = (Integer JavaDoc)this.intraviewIdCache.get (key);
544       
545       if (result == null)
546          return 0;
547       else
548          return result.intValue ();
549    }
550    
551    public boolean isMasterReplica (String JavaDoc key)
552    {
553       if( trace )
554          log.trace("isMasterReplica, key="+key);
555       // if I am not a replicat, I cannot be the master...
556
//
557
if (!localReplicants.containsKey (key))
558       {
559          if( trace )
560             log.trace("no localReplicants, key="+key+", isMasterReplica=false");
561          return false;
562       }
563
564       Vector JavaDoc allNodes = this.partition.getCurrentView ();
565       HashMap JavaDoc repForKey = (HashMap JavaDoc)replicants.get(key);
566       if (repForKey==null)
567       {
568          if( trace )
569             log.trace("no replicants, key="+key+", isMasterReplica=true");
570          return true;
571       }
572       Vector JavaDoc replicaNodes = new Vector JavaDoc ((repForKey).keySet ());
573       boolean isMasterReplica = false;
574       for (int i=0; i<allNodes.size (); i++)
575       {
576          String JavaDoc aMember = (String JavaDoc)allNodes.elementAt (i);
577          if( trace )
578             log.trace("Testing member: "+aMember);
579          if (replicaNodes.contains (aMember))
580          {
581             if( trace )
582                log.trace("Member found in replicaNodes, isMasterReplica=false");
583             break;
584          }
585          else if (aMember.equals (this.nodeName))
586          {
587             if( trace )
588                log.trace("Member == nodeName, isMasterReplica=true");
589             isMasterReplica = true;
590             break;
591          }
592       }
593       return isMasterReplica;
594    }
595
596    // DistributedReplicantManager cluster callbacks ----------------------------------------------
597

598    /**
599     * Cluster callback called when a new replicant is added on another node
600     * @param key Replicant key
601     * @param nodeName Node that add the current replicant
602     * @param replicant Serialized representation of the replicant
603     */

604    public void _add(String JavaDoc key, String JavaDoc nodeName, Serializable JavaDoc replicant)
605    {
606       if( trace )
607          log.trace("_add(" + key + ", " + nodeName);
608       
609       try
610       {
611          addReplicant(key, nodeName, replicant);
612          // Notify listeners asynchronously
613
KeyChangeEvent kce = new KeyChangeEvent();
614          kce.key = key;
615          kce.replicants = lookupReplicants(key);
616          asynchHandler.queueEvent(kce);
617       }
618       catch (Exception JavaDoc ex)
619       {
620          log.error("_add failed", ex);
621       }
622    }
623    
624    /**
625     * Cluster callback called when a replicant is removed by another node
626     * @param key Name of the replicant key
627     * @param nodeName Node that wants to remove its replicant for the give key
628     */

629    public void _remove(String JavaDoc key, String JavaDoc nodeName)
630    {
631       try
632       {
633          if (removeReplicant (key, nodeName)) {
634             // Notify listeners asynchronously
635
KeyChangeEvent kce = new KeyChangeEvent();
636             kce.key = key;
637             kce.replicants = lookupReplicants(key);
638             asynchHandler.queueEvent(kce);
639          }
640       }
641       catch (Exception JavaDoc ex)
642       {
643          log.error("_remove failed", ex);
644       }
645    }
646    
647    protected boolean removeReplicant (String JavaDoc key, String JavaDoc nodeName) throws Exception JavaDoc
648    {
649       synchronized(replicants)
650       {
651          HashMap JavaDoc replicant = (HashMap JavaDoc)replicants.get(key);
652          if (replicant == null) return false;
653          Object JavaDoc removed = replicant.remove(nodeName);
654          if (removed != null)
655          {
656             Collection JavaDoc values = replicant.values();
657             if (values.size() == 0)
658             {
659                replicants.remove(key);
660             }
661             return true;
662          }
663       }
664       return false;
665    }
666    
667    /**
668     * Cluster callback called when a node wants to know our complete list of local replicants
669     * @throws Exception Thrown if a cluster communication exception occurs
670     * @return A java array of size 2 containing the name of our node in this cluster and the serialized representation of our state
671     */

672    public Object JavaDoc[] lookupLocalReplicants() throws Exception JavaDoc
673    {
674       partitionNameKnown.acquire (); // we don't answer until our name is known
675

676       Object JavaDoc[] rtn = {this.nodeName, localReplicants};
677       if( trace )
678          log.trace ("lookupLocalReplicants called ("+ rtn[0] + "). Return: " + localReplicants.size ());
679       return rtn;
680    }
681    
682    // Package protected ---------------------------------------------
683

684    // Protected -----------------------------------------------------
685

686    protected int calculateReplicantsHash (List JavaDoc members)
687    {
688       int result = 0;
689       Object JavaDoc obj = null;
690       
691       for (int i=0; i<members.size (); i++)
692       {
693          obj = members.get (i);
694          if (obj != null)
695             result+= obj.hashCode (); // no explicit overflow with int addition
696
}
697       
698       return result;
699    }
700    
701    protected int updateReplicantsHashId (String JavaDoc key)
702    {
703       // we first get a list of all nodes names that replicate this key
704
//
705
List JavaDoc nodes = this.lookupReplicantsNodeNames (key);
706       int result = 0;
707       
708       if ( (nodes == null) || (nodes.size () == 0) )
709       {
710          // no nore replicants for this key: we uncache our view id
711
//
712
this.intraviewIdCache.remove (key);
713       }
714       else
715       {
716          result = this.calculateReplicantsHash (nodes);
717          this.intraviewIdCache.put (key, new Integer JavaDoc (result));
718       }
719       
720       return result;
721       
722    }
723    
724    ///////////////
725
// DistributedReplicantManager API
726
///////////////
727

728    /**
729     * Add a replicant to the replicants map.
730     * @param key replicant key name
731     * @param nodeName name of the node that adds this replicant
732     * @param replicant Serialized representation of the replica
733     */

734    protected void addReplicant(String JavaDoc key, String JavaDoc nodeName, Serializable JavaDoc replicant)
735    {
736       addReplicant(replicants, key, nodeName, replicant);
737    }
738    
739    /**
740     * Logic for adding replicant to any map.
741     * @param map structure in which adding the new replicant
742     * @param key name of the replicant key
743     * @param nodeName name of the node adding the replicant
744     * @param replicant serialized representation of the replicant that is added
745     */

746    protected void addReplicant(Map JavaDoc map, String JavaDoc key, String JavaDoc nodeName, Serializable JavaDoc replicant)
747    {
748       synchronized(map)
749       {
750          HashMap JavaDoc rep = (HashMap JavaDoc)map.get(key);
751          if (rep == null)
752          {
753             if( trace )
754                log.trace("_adding new HashMap");
755             rep = new HashMap JavaDoc();
756             map.put(key, rep);
757          }
758          rep.put(nodeName, replicant);
759       }
760    }
761    
762    protected Vector JavaDoc getKeysReplicatedByNode (String JavaDoc nodeName)
763    {
764       Vector JavaDoc result = new Vector JavaDoc ();
765       synchronized (replicants)
766       {
767          Iterator JavaDoc keysIter = replicants.keySet ().iterator ();
768          while (keysIter.hasNext ())
769          {
770             String JavaDoc key = (String JavaDoc)keysIter.next ();
771             HashMap JavaDoc values = (HashMap JavaDoc)replicants.get (key);
772