KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jboss > ha > framework > server > DistributedReplicantManagerImpl


1 /*
2   * JBoss, Home of Professional Open Source
3   * Copyright 2005, JBoss Inc., and individual contributors as indicated
4   * by the @authors tag. See the copyright.txt in the distribution for a
5   * full listing of individual contributors.
6   *
7   * This is free software; you can redistribute it and/or modify it
8   * under the terms of the GNU Lesser General Public License as
9   * published by the Free Software Foundation; either version 2.1 of
10   * the License, or (at your option) any later version.
11   *
12   * This software is distributed in the hope that it will be useful,
13   * but WITHOUT ANY WARRANTY; without even the implied warranty of
14   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15   * Lesser General Public License for more details.
16   *
17   * You should have received a copy of the GNU Lesser General Public
18   * License along with this software; if not, write to the Free
19   * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
20   * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
21   */

22 package org.jboss.ha.framework.server;
23
24 import java.util.Set JavaDoc;
25 import java.util.Vector JavaDoc;
26 import java.util.ArrayList JavaDoc;
27 import java.util.HashMap JavaDoc;
28 import java.util.Iterator JavaDoc;
29 import java.util.Collection JavaDoc;
30 import java.util.HashSet JavaDoc;
31 import java.util.List JavaDoc;
32 import java.util.Map JavaDoc;
33
34 import java.io.Serializable JavaDoc;
35
36 import javax.management.MBeanServer JavaDoc;
37 import javax.management.ObjectName JavaDoc;
38
39 import EDU.oswego.cs.dl.util.concurrent.Latch;
40 import EDU.oswego.cs.dl.util.concurrent.ConcurrentReaderHashMap;
41
42 import org.jboss.logging.Logger;
43
44 import org.jboss.ha.framework.interfaces.ClusterNode;
45 import org.jboss.ha.framework.interfaces.DistributedReplicantManager;
46 import org.jboss.ha.framework.interfaces.HAPartition;
47
48
49 /**
50  * This class manages replicated objects.
51  *
52  * @author <a HREF="mailto:bill@burkecentral.com">Bill Burke</a>.
53  * @author <a HREF="mailto:sacha.labourey@cogito-info.ch">Sacha Labourey</a>.
54  * @author Scott.stark@jboss.org
55  * @version $Revision: 58574 $
56  */

57 public class DistributedReplicantManagerImpl
58    implements DistributedReplicantManagerImplMBean,
59               HAPartition.HAMembershipExtendedListener,
60               HAPartition.HAPartitionStateTransfer,
61               AsynchEventHandler.AsynchEventProcessor
62 {
63    // Constants -----------------------------------------------------
64

65    protected final static String JavaDoc SERVICE_NAME = "DistributedReplicantManager";
66    
67    // Attributes ----------------------------------------------------
68
protected static int threadID;
69    
70    protected ConcurrentReaderHashMap localReplicants = new ConcurrentReaderHashMap();
71    protected ConcurrentReaderHashMap replicants = new ConcurrentReaderHashMap();
72    protected ConcurrentReaderHashMap keyListeners = new ConcurrentReaderHashMap();
73    protected HashMap JavaDoc intraviewIdCache = new HashMap JavaDoc();
74    protected HAPartition partition;
75    /** The handler used to send replicant change notifications asynchronously */
76    protected AsynchEventHandler asynchHandler;
77    
78    protected Logger log;
79    
80    protected String JavaDoc nodeName = null;
81    
82    protected Latch partitionNameKnown = new Latch ();
83    protected boolean trace;
84
85    protected Class JavaDoc[] add_types=new Class JavaDoc[]{String JavaDoc.class, String JavaDoc.class, Serializable JavaDoc.class};
86    protected Class JavaDoc[] remove_types=new Class JavaDoc[]{String JavaDoc.class, String JavaDoc.class};
87
88    // Static --------------------------------------------------------
89

90    // Constructors --------------------------------------------------
91

92    /**
93     * Creates a DistributedReplicantManager that manages replicated objects
94     * through the given partition. Injects a back reference to this object
95     * into the partition.
96     *
97     * @param partition {@link ClusterPartition} through which replicated
98     * objects will be exchanged
99     */

100    public DistributedReplicantManagerImpl(ClusterPartition partition)
101    {
102       this(partition.getHAPartition());
103       // Set a back ref to ourself
104
partition.setDistributedReplicantManager(this);
105    }
106    
107    /**
108     * Creates a DistributedReplicantManager that manages replicated objects
109     * through the given partition.
110     *
111     * @param partition {@link HAPartition} through which replicated objects
112     * will be exchanged
113     */

114    public DistributedReplicantManagerImpl(HAPartition partition)
115    {
116      this.partition = partition;
117      this.log = Logger.getLogger(DistributedReplicantManagerImpl.class.getName() +
118                  "." + partition.getPartitionName());
119      this.trace = log.isTraceEnabled();
120   }
121
122    // Public --------------------------------------------------------
123

124    public void create() throws Exception JavaDoc
125    {
126       log.debug("registerRPCHandler");
127       partition.registerRPCHandler(SERVICE_NAME, this);
128       log.debug("subscribeToStateTransferEvents");
129       partition.subscribeToStateTransferEvents(SERVICE_NAME, this);
130       log.debug("registerMembershipListener");
131       partition.registerMembershipListener(this);
132    }
133    
134    public void start() throws Exception JavaDoc
135    {
136       this.nodeName = this.partition.getNodeName();
137       
138       // Create the asynch listener handler thread
139
asynchHandler = new AsynchEventHandler(this, "AsynchKeyChangeHandler");
140       asynchHandler.start();
141
142       partitionNameKnown.release (); // partition name is now known!
143

144       //log.info("mergemembers");
145
//mergeMembers();
146
}
147    
148    public void stop() throws Exception JavaDoc
149    {
150       // Stop the asynch handler thread
151
try
152       {
153          asynchHandler.stop();
154       }
155       catch( Exception JavaDoc e)
156       {
157          log.warn("Failed to stop asynchHandler", e);
158       }
159       
160       // TODO reset the latch
161
}
162
163    // NR 200505 : [JBCLUSTER-38] unbind at destroy
164
public void destroy() throws Exception JavaDoc
165    {
166       // we cleanly shutdown. This should be optimized.
167
if (localReplicants != null)
168       {
169          synchronized(localReplicants)
170          {
171             String JavaDoc[] keys = new String JavaDoc[localReplicants.size()];
172             localReplicants.keySet().toArray(keys);
173             for(int n = 0; n < keys.length; n ++)
174             {
175                this.removeLocal(keys[n]); // channel is disconnected, so
176
// don't try to notify cluster
177
}
178          }
179       }
180       
181       partition.unregisterRPCHandler(SERVICE_NAME, this);
182       partition.unsubscribeFromStateTransferEvents(SERVICE_NAME, this);
183       partition.unregisterMembershipListener(this);
184    }
185
186    public void registerWithJmx(MBeanServer JavaDoc server) throws Exception JavaDoc
187    {
188       server.registerMBean(this, getObjectName());
189    }
190    
191    public void unregisterWithJmx(MBeanServer JavaDoc server) throws Exception JavaDoc
192    {
193       server.unregisterMBean(getObjectName());
194    }
195    
196    private ObjectName JavaDoc getObjectName() throws Exception JavaDoc
197    {
198       return new ObjectName JavaDoc("jboss:service=" + SERVICE_NAME + ",partition=" + partition.getPartitionName());
199    }
200    
201    public String JavaDoc listContent () throws Exception JavaDoc
202    {
203       // we merge all replicants services: local only or not
204
//
205
java.util.Collection JavaDoc services = this.getAllServices ();
206
207       StringBuffer JavaDoc result = new StringBuffer JavaDoc ();
208       java.util.Iterator JavaDoc catsIter = services.iterator ();
209       
210       result.append ("<pre>");
211       
212       while (catsIter.hasNext ())
213       {
214          String JavaDoc category = (String JavaDoc)catsIter.next ();
215          HashMap JavaDoc content = (HashMap JavaDoc)this.replicants.get (category);
216          if (content == null)
217             content = new HashMap JavaDoc ();
218          java.util.Iterator JavaDoc keysIter = content.keySet ().iterator ();
219                   
220          result.append ("-----------------------------------------------\n");
221          result.append ("Service : ").append (category).append ("\n\n");
222          
223          Serializable JavaDoc local = lookupLocalReplicant(category);
224          if (local == null)
225             result.append ("\t- Service is *not* available locally\n");
226          else
227             result.append ("\t- Service *is* also available locally\n");
228
229          while (keysIter.hasNext ())
230          {
231             String JavaDoc location = (String JavaDoc)keysIter.next ();
232             result.append ("\t- ").append(location).append ("\n");
233          }
234          
235          result.append ("\n");
236          
237       }
238       
239       result.append ("</pre>");
240       
241       return result.toString ();
242    }
243    
244    public String JavaDoc listXmlContent () throws Exception JavaDoc
245    {
246       // we merge all replicants services: local only or not
247
//
248
java.util.Collection JavaDoc services = this.getAllServices ();
249       StringBuffer JavaDoc result = new StringBuffer JavaDoc ();
250
251       result.append ("<ReplicantManager>\n");
252
253       java.util.Iterator JavaDoc catsIter = services.iterator ();
254       while (catsIter.hasNext ())
255       {
256          String JavaDoc category = (String JavaDoc)catsIter.next ();
257          HashMap JavaDoc content = (HashMap JavaDoc)this.replicants.get (category);
258          if (content == null)
259             content = new HashMap JavaDoc ();
260          java.util.Iterator JavaDoc keysIter = content.keySet ().iterator ();
261                   
262          result.append ("\t<Service>\n");
263          result.append ("\t\t<ServiceName>").append (category).append ("</ServiceName>\n");
264
265          
266          Serializable JavaDoc local = lookupLocalReplicant(category);
267          if (local != null)
268          {
269             result.append ("\t\t<Location>\n");
270             result.append ("\t\t\t<Name local=\"True\">").append (this.nodeName).append ("</Name>\n");
271             result.append ("\t\t</Location>\n");
272          }
273
274          while (keysIter.hasNext ())
275          {
276             String JavaDoc location = (String JavaDoc)keysIter.next ();
277             result.append ("\t\t<Location>\n");
278             result.append ("\t\t\t<Name local=\"False\">").append (location).append ("</Name>\n");
279             result.append ("\t\t</Location>\n");
280          }
281          
282          result.append ("\t<Service>\n");
283          
284       }
285
286       result.append ("<ReplicantManager>\n");
287       
288       return result.toString ();
289    }
290
291    // HAPartition.HAPartitionStateTransfer implementation ----------------------------------------------
292

293    public Serializable JavaDoc getCurrentState ()
294    {
295       java.util.Collection JavaDoc services = this.getAllServices ();
296       HashMap JavaDoc result = new HashMap JavaDoc ();
297       
298       java.util.Iterator JavaDoc catsIter = services.iterator ();
299       while (catsIter.hasNext ())
300       {
301          String JavaDoc category = (String JavaDoc)catsIter.next ();
302          HashMap JavaDoc content = (HashMap JavaDoc)this.replicants.get (category);
303          if (content == null)
304             content = new HashMap JavaDoc ();
305          else
306             content = (HashMap JavaDoc)content.clone ();
307          
308          Serializable JavaDoc local = lookupLocalReplicant(category);
309          if (local != null)
310             content.put (this.nodeName, local);
311          
312          result.put (category, content);
313       }
314       
315       // we add the intraviewid cache to the global result
316
//
317
Object JavaDoc[] globalResult = new Object JavaDoc[] {result, intraviewIdCache};
318       return globalResult;
319    }
320
321    public void setCurrentState(Serializable JavaDoc newState)
322    {
323       Object JavaDoc[] globalState = (Object JavaDoc[])newState;
324       
325       HashMap JavaDoc map = (HashMap JavaDoc)globalState[0];
326       this.replicants.putAll(map);
327       this.intraviewIdCache = (HashMap JavaDoc)globalState[1];
328
329       if( trace )
330       {
331          log.trace(nodeName + ": received new state, will republish local replicants");
332       }
333       MembersPublisher publisher = new MembersPublisher();
334       publisher.start();
335    }
336       
337    public Collection JavaDoc getAllServices ()
338    {
339       HashSet JavaDoc services = new HashSet JavaDoc();
340       services.addAll (localReplicants.keySet ());
341       services.addAll (replicants.keySet ());
342       return services;
343    }
344    
345    // HAPartition.HAMembershipListener implementation ----------------------------------------------
346

347    public void membershipChangedDuringMerge(Vector JavaDoc deadMembers, Vector JavaDoc newMembers, Vector JavaDoc allMembers, Vector JavaDoc originatingGroups)
348    {
349       // Here we only care about deadMembers. Purge all replicant lists of deadMembers
350
// and then notify all listening nodes.
351
//
352
log.info("Merging partitions...");
353       log.info("Dead members: " + deadMembers.size());
354       log.info("Originating groups: " + originatingGroups);
355       purgeDeadMembers(deadMembers);
356       if (newMembers.size() > 0)
357       {
358          new MergeMembers().start();
359       }
360    }
361    
362    public void membershipChanged(Vector JavaDoc deadMembers, Vector JavaDoc newMembers, Vector JavaDoc allMembers)
363    {
364       // Here we only care about deadMembers. Purge all replicant lists of deadMembers
365
// and then notify all listening nodes.
366
//
367
log.info("I am (" + nodeName + ") received membershipChanged event:");
368      log.info("Dead members: " + deadMembers.size() + " (" + deadMembers + ")");
369      log.info("New Members : " + newMembers.size() + " (" + newMembers + ")");
370      log.info("All Members : " + allMembers.size() + " (" + allMembers + ")");
371       purgeDeadMembers(deadMembers);
372       
373       // we don't need to merge members anymore
374
}
375    
376    // AsynchEventHandler.AsynchEventProcessor implementation -----------------
377

378    public void processEvent(Object JavaDoc event)
379    {
380       KeyChangeEvent kce = (KeyChangeEvent) event;
381       notifyKeyListeners(kce.key, kce.replicants);
382    }
383    
384    static class KeyChangeEvent
385    {
386       String JavaDoc key;
387       List JavaDoc replicants;
388    }
389    
390    // DistributedReplicantManager implementation ----------------------------------------------
391

392    public void add(String JavaDoc key, Serializable JavaDoc replicant) throws Exception JavaDoc
393    {
394       if( trace )
395          log.trace("add, key="+key+", value="+replicant);
396       partitionNameKnown.acquire (); // we don't propagate until our name is known
397

398       Object JavaDoc[] args = {key, this.nodeName, replicant};
399       partition.callMethodOnCluster(SERVICE_NAME, "_add", args, add_types, true);
400       synchronized(localReplicants)
401       {
402          localReplicants.put(key, replicant);
403          notifyKeyListeners(key, lookupReplicants(key));
404       }
405    }
406    
407    public void remove(String JavaDoc key) throws Exception JavaDoc
408    {
409       partitionNameKnown.acquire (); // we don't propagate until our name is known
410

411       // optimisation: we don't make a costly network call
412
// if there is nothing to remove
413
if (localReplicants.containsKey(key))
414       {
415          Object JavaDoc[] args = {key, this.nodeName};
416          partition.callAsynchMethodOnCluster(SERVICE_NAME, "_remove", args, remove_types, true);
417          removeLocal(key);
418       }
419    }
420    
421    private void removeLocal(String JavaDoc key)
422    {
423       synchronized(localReplicants)
424       {
425          localReplicants.remove(key);
426          List JavaDoc result = lookupReplicants(key);
427          if (result == null)
428             result = new ArrayList JavaDoc (); // don't pass null but an empty list
429
notifyKeyListeners(key, result);
430       }
431    }
432    
433    public Serializable JavaDoc lookupLocalReplicant(String JavaDoc key)
434    {
435       return (Serializable JavaDoc)localReplicants.get(key);
436    }
437    
438    public List JavaDoc lookupReplicants(String JavaDoc key)
439    {
440       Serializable JavaDoc local = lookupLocalReplicant(key);
441       HashMap JavaDoc replicant = (HashMap JavaDoc)replicants.get(key);
442       if (replicant == null && local == null)
443          return null;
444
445       ArrayList JavaDoc rtn = new ArrayList JavaDoc();
446
447       if (replicant == null)
448       {
449          if (local != null)
450             rtn.add(local);
451       }
452       else
453       {
454          // JBAS-2677. Put the replicants in view order.
455
ClusterNode[] nodes = partition.getClusterNodes();
456          String JavaDoc replNode;
457          Object JavaDoc replVal;
458          for (int i = 0; i < nodes.length; i++)
459          {
460             replNode = nodes[i].getName();
461             if (local != null && nodeName.equals(replNode))
462             {
463                rtn.add(local);
464                continue;
465             }
466             
467             replVal = replicant.get(replNode);
468             if (replVal != null)
469                rtn.add(replVal);
470          }
471       }
472       
473       return rtn;
474    }
475    
476    public List JavaDoc lookupReplicantsNodeNames(String JavaDoc key)
477    {
478       boolean locallyReplicated = localReplicants.containsKey (key);
479       HashMap JavaDoc replicant = (HashMap JavaDoc)replicants.get(key);
480       if (replicant == null && !locallyReplicated)
481          return null;
482
483       ArrayList JavaDoc rtn = new ArrayList JavaDoc();
484       
485       if (replicant == null)
486       {
487          if (locallyReplicated)
488             rtn.add(this.nodeName);
489       }
490       else
491       {
492          // JBAS-2677. Put the replicants in view order.
493
Set JavaDoc keys = replicant.keySet();
494          ClusterNode[] nodes = partition.getClusterNodes();
495          String JavaDoc keyOwner;
496          for (int i = 0; i < nodes.length; i++)
497          {
498             keyOwner = nodes[i].getName();
499             if (locallyReplicated && nodeName.equals(keyOwner))
500             {
501                rtn.add(this.nodeName);
502                continue;
503             }
504             
505             if (keys.contains(keyOwner))
506                rtn.add(keyOwner);
507          }
508       }
509       
510       return rtn;
511    }
512    
513    public void registerListener(String JavaDoc key, DistributedReplicantManager.ReplicantListener subscriber)
514    {
515       synchronized(keyListeners)
516       {
517          ArrayList JavaDoc listeners = (ArrayList JavaDoc)keyListeners.get(key);
518          if (listeners == null)
519          {
520             listeners = new ArrayList JavaDoc();
521             keyListeners.put(key, listeners);
522          }
523          listeners.add(subscriber);
524       }
525    }
526    
527    public void unregisterListener(String JavaDoc key, DistributedReplicantManager.ReplicantListener subscriber)
528    {
529       synchronized(keyListeners)
530       {
531          ArrayList JavaDoc listeners = (ArrayList JavaDoc)keyListeners.get (key);
532          if (listeners == null) return;
533          
534          listeners.remove(subscriber);
535          if (listeners.size() == 0)
536             keyListeners.remove(key);
537
538       }
539    }
540    
541    public int getReplicantsViewId(String JavaDoc key)
542    {
543       Integer JavaDoc result = (Integer JavaDoc)this.intraviewIdCache.get (key);
544       
545       if (result == null)
546          return 0;
547       else
548          return result.intValue ();
549    }
550    
551    public boolean isMasterReplica (String JavaDoc key)
552    {
553       if( trace )
554          log.trace("isMasterReplica, key="+key);
555       // if I am not a replicat, I cannot be the master...
556
//
557
if (!localReplicants.containsKey (key))
558       {
559          if( trace )
560             log.trace("no localReplicants, key="+key+", isMasterReplica=false");
561          return false;
562       }
563
564       Vector JavaDoc allNodes = this.partition.getCurrentView ();
565       HashMap JavaDoc repForKey = (HashMap JavaDoc)replicants.get(key);
566       if (repForKey==null)
567       {
568          if( trace )
569             log.trace("no replicants, key="+key+", isMasterReplica=true");
570          return true;
571       }
572       Vector JavaDoc replicaNodes = new Vector JavaDoc ((repForKey).keySet ());
573       boolean isMasterReplica = false;
574       for (int i=0; i<allNodes.size (); i++)
575       {
576          String JavaDoc aMember = (String JavaDoc)allNodes.elementAt (i);
577          if( trace )
578             log.trace("Testing member: "+aMember);
579          if (replicaNodes.contains (aMember))
580          {
581             if( trace )
582                log.trace("Member found in replicaNodes, isMasterReplica=false");
583             break;
584          }
585          else if (aMember.equals (this.nodeName))
586          {
587             if( trace )
588                log.trace("Member == nodeName, isMasterReplica=true");
589             isMasterReplica = true;
590             break;
591          }
592       }
593       return isMasterReplica;
594    }
595
596    // DistributedReplicantManager cluster callbacks ----------------------------------------------
597

598    /**
599     * Cluster callback called when a new replicant is added on another node
600     * @param key Replicant key
601     * @param nodeName Node that add the current replicant
602     * @param replicant Serialized representation of the replicant
603     */

604    public void _add(String JavaDoc key, String JavaDoc nodeName, Serializable JavaDoc replicant)
605    {
606       if( trace )
607          log.trace("_add(" + key + ", " + nodeName);
608       
609       try
610       {
611          addReplicant(key, nodeName, replicant);
612          // Notify listeners asynchronously
613
KeyChangeEvent kce = new KeyChangeEvent();
614          kce.key = key;
615          kce.replicants = lookupReplicants(key);
616          asynchHandler.queueEvent(kce);
617       }
618       catch (Exception JavaDoc ex)
619       {
620          log.error("_add failed", ex);
621       }
622    }
623    
624    /**
625     * Cluster callback called when a replicant is removed by another node
626     * @param key Name of the replicant key
627     * @param nodeName Node that wants to remove its replicant for the give key
628     */

629    public void _remove(String JavaDoc key, String JavaDoc nodeName)
630    {
631       try
632       {
633          if (removeReplicant (key, nodeName)) {
634             // Notify listeners asynchronously
635
KeyChangeEvent kce = new KeyChangeEvent();
636             kce.key = key;
637             kce.replicants = lookupReplicants(key);
638             asynchHandler.queueEvent(kce);
639          }
640       }
641       catch (Exception JavaDoc ex)
642       {
643          log.error("_remove failed", ex);
644       }
645    }
646    
647    protected boolean removeReplicant (String JavaDoc key, String JavaDoc nodeName) throws Exception JavaDoc
648    {
649       synchronized(replicants)
650       {
651          HashMap JavaDoc replicant = (HashMap JavaDoc)replicants.get(key);
652          if (replicant == null) return false;
653          Object JavaDoc removed = replicant.remove(nodeName);
654          if (removed != null)
655          {
656             Collection JavaDoc values = replicant.values();
657             if (values.size() == 0)
658             {
659                replicants.remove(key);
660             }
661             return true;
662          }
663       }
664       return false;
665    }
666    
667    /**
668     * Cluster callback called when a node wants to know our complete list of local replicants
669     * @throws Exception Thrown if a cluster communication exception occurs
670     * @return A java array of size 2 containing the name of our node in this cluster and the serialized representation of our state
671     */

672    public Object JavaDoc[] lookupLocalReplicants() throws Exception JavaDoc
673    {
674       partitionNameKnown.acquire (); // we don't answer until our name is known
675

676       Object JavaDoc[] rtn = {this.nodeName, localReplicants};
677       if( trace )
678          log.trace ("lookupLocalReplicants called ("+ rtn[0] + "). Return: " + localReplicants.size ());
679       return rtn;
680    }
681    
682    // Package protected ---------------------------------------------
683

684    // Protected -----------------------------------------------------
685

686    protected int calculateReplicantsHash (List JavaDoc members)
687    {
688       int result = 0;
689       Object JavaDoc obj = null;
690       
691       for (int i=0; i<members.size (); i++)
692       {
693          obj = members.get (i);
694          if (obj != null)
695             result+= obj.hashCode (); // no explicit overflow with int addition
696
}
697       
698       return result;
699    }
700    
701    protected int updateReplicantsHashId (String JavaDoc key)
702    {
703       // we first get a list of all nodes names that replicate this key
704
//
705
List JavaDoc nodes = this.lookupReplicantsNodeNames (key);
706       int result = 0;
707       
708       if ( (nodes == null) || (nodes.size () == 0) )
709       {
710          // no nore replicants for this key: we uncache our view id
711
//
712
this.intraviewIdCache.remove (key);
713       }
714       else
715       {
716          result = this.calculateReplicantsHash (nodes);
717          this.intraviewIdCache.put (key, new Integer JavaDoc (result));
718       }
719       
720       return result;
721       
722    }
723    
724    ///////////////
725
// DistributedReplicantManager API
726
///////////////
727

728    /**
729     * Add a replicant to the replicants map.
730     * @param key replicant key name
731     * @param nodeName name of the node that adds this replicant
732     * @param replicant Serialized representation of the replica
733     */

734    protected void addReplicant(String JavaDoc key, String JavaDoc nodeName, Serializable JavaDoc replicant)
735    {
736       addReplicant(replicants, key, nodeName, replicant);
737    }
738    
739    /**
740     * Logic for adding replicant to any map.
741     * @param map structure in which adding the new replicant
742     * @param key name of the replicant key
743     * @param nodeName name of the node adding the replicant
744     * @param replicant serialized representation of the replicant that is added
745     */

746    protected void addReplicant(Map JavaDoc map, String JavaDoc key, String JavaDoc nodeName, Serializable JavaDoc replicant)
747    {
748       synchronized(map)
749       {
750          HashMap JavaDoc rep = (HashMap JavaDoc)map.get(key);
751          if (rep == null)
752          {
753             if( trace )
754                log.trace("_adding new HashMap");
755             rep = new HashMap JavaDoc();
756             map.put(key, rep);
757          }
758          rep.put(nodeName, replicant);
759       }
760    }
761    
762    protected Vector JavaDoc getKeysReplicatedByNode (String JavaDoc nodeName)
763    {
764       Vector JavaDoc result = new Vector JavaDoc ();
765       synchronized (replicants)
766       {
767          Iterator JavaDoc keysIter = replicants.keySet ().iterator ();
768          while (keysIter.hasNext ())
769          {
770             String JavaDoc key = (String JavaDoc)keysIter.next ();
771             HashMap JavaDoc values = (HashMap JavaDoc)replicants.get (key);
772             if ( (values != null) && values.containsKey (nodeName) )
773             {
774                result.add (key);
775             }
776          }
777       }
778       return result;
779    }
780    
781    /**
782     * Indicates if the a replicant already exists for a given key/node pair
783     * @param key replicant key name
784     * @param nodeName name of the node
785     * @return a boolean indicating if a replicant for the given node exists for the given key
786     */

787    protected boolean replicantEntryAlreadyExists (String JavaDoc key, String JavaDoc nodeName)
788    {
789       return replicantEntryAlreadyExists (replicants, key, nodeName);
790    }
791    
792    /**
793     * Indicates if the a replicant already exists for a given key/node pair in the give data structure
794     */

795    protected boolean replicantEntryAlreadyExists (Map JavaDoc map, String JavaDoc key, String JavaDoc nodeName)
796    {
797          HashMap JavaDoc rep = (HashMap JavaDoc)map.get(key);
798          if (rep == null)
799             return false;
800          else
801             return rep.containsKey (nodeName);
802    }
803    
804    /**
805     * Notifies, through a callback, the listeners for a given replicant that the set of replicants has changed
806     * @param key The replicant key name
807     * @param newReplicants The new list of replicants
808     *
809     */

810    protected void notifyKeyListeners(String JavaDoc key, List JavaDoc newReplicants)
811    {
812       if( trace )
813          log.trace("notifyKeyListeners");
814
815       // we first update the intra-view id for this particular key
816
//
817
int newId = updateReplicantsHashId (key);
818       
819       ArrayList JavaDoc listeners = (ArrayList JavaDoc)keyListeners.get(key);
820       if (listeners == null)
821       {
822          if( trace )
823             log.trace("listeners is null");
824          return;
825       }
826       
827       // ArrayList's iterator is not thread safe
828
DistributedReplicantManager.ReplicantListener[] toNotify = null;
829       synchronized(listeners)
830       {
831          toNotify = new DistributedReplicantManager.ReplicantListener[listeners.size()];
832          toNotify = (DistributedReplicantManager.ReplicantListener[]) listeners.toArray(toNotify);
833       }
834       
835       if( trace )
836          log.trace("notifying " + toNotify.length + " listeners for key change: " + key);
837       for (int i = 0; i < toNotify.length; i++)
838       {
839          if (toNotify[i] != null)
840             toNotify[i].replicantsChanged(key, newReplicants, newId);
841       }
842    }
843
844    protected void republishLocalReplicants()
845    {
846       try
847       {
848          if( trace )
849             log.trace("Start Re-Publish local replicants in DRM");
850
851          HashMap JavaDoc localReplicants;
852          synchronized (this.localReplicants)
853          {
854             localReplicants = new HashMap JavaDoc(this.localReplicants);
855          }
856
857          Iterator JavaDoc entries = localReplicants.entrySet().iterator();
858          while( entries.hasNext() )
859          {
860             Map.Entry JavaDoc entry = (Map.Entry JavaDoc) entries.next();
861             String JavaDoc key = (String JavaDoc) entry.getKey();
862             Object JavaDoc replicant = entry.getValue();
863             if (replicant != null)
864             {
865                if( trace )
866                   log.trace("publishing, key=" + key + ", value=" + replicant);
867
868                Object JavaDoc[] args = {key, this.nodeName, replicant};
869
870                partition.callAsynchMethodOnCluster(SERVICE_NAME, "_add", args, add_types, true);
871                notifyKeyListeners(key, lookupReplicants(key));
872             }
873          }
874          if( trace )
875             log.trace("End Re-Publish local replicants");
876       }
877       catch (Exception JavaDoc e)
878       {
879          log.error("Re-Publish failed", e);
880       }
881    }
882
883    ////////////////////
884
// Group membership API
885
////////////////////
886

887    protected void mergeMembers()
888    {
889       try
890       {
891          log.debug("Start merging members in DRM service...");
892          java.util.HashSet JavaDoc notifies = new java.util.HashSet JavaDoc ();
893          ArrayList JavaDoc rsp = partition.callMethodOnCluster(SERVICE_NAME,
894                                         "lookupLocalReplicants",
895                                         new Object JavaDoc[]{}, new Class JavaDoc[]{}, true);
896          if (rsp.size() == 0)
897             log.debug("No responses from other nodes during the DRM merge process.");
898          else
899          {
900             log.debug("The DRM merge process has received " + rsp.size() + " answers");
901          }
902          for (int i = 0; i < rsp.size(); i++)
903          {
904             Object JavaDoc o = rsp.get(i);
905             if (o == null)
906             {
907                log.warn("As part of the answers received during the DRM merge process, a NULL message was received!");
908                continue;
909             }
910             else if (o instanceof Throwable JavaDoc)
911             {
912                log.warn("As part of the answers received during the DRM merge process, a Throwable was received!", (Throwable JavaDoc) o);
913                continue;
914             }
915             
916             Object JavaDoc[] objs = (Object JavaDoc[]) o;
917             String JavaDoc node = (String JavaDoc)objs[0];
918             Map JavaDoc replicants = (Map JavaDoc)objs[1];
919             Iterator JavaDoc keys = replicants.keySet().iterator();
920             
921             //FIXME: We don't remove keys in the merge process but only add new keys!
922
while (keys.hasNext())
923             {
924                String JavaDoc key = (String JavaDoc)keys.next();
925                // done to reduce duplicate notifications
926
if (!replicantEntryAlreadyExists (key, node))
927                {
928                   addReplicant(key, node, (Serializable JavaDoc)replicants.get(key));
929                   notifies.add (key);
930                }
931             }
932             
933             Vector JavaDoc currentStatus = getKeysReplicatedByNode (node);
934             if (currentStatus.size () > replicants.size ())
935             {
936                // The merge process needs to remove some (now)
937
// unexisting keys
938
//
939
for (int currentKeysId=0, currentKeysMax=currentStatus.size (); currentKeysId<currentKeysMax; currentKeysId++)
940                {
941                   String JavaDoc theKey = (String JavaDoc)currentStatus.elementAt (currentKeysId);
942                   if (!replicants.containsKey (theKey))
943                   {
944                      removeReplicant (theKey, node);
945                      notifies.add(theKey);
946                   }
947                }
948             }
949          }
950          
951          Iterator JavaDoc notifIter = notifies.iterator ();
952          while (notifIter.hasNext ())
953          {
954             String JavaDoc key = (String JavaDoc)notifIter.next ();
955             notifyKeyListeners(key, lookupReplicants(key));
956          }
957          log.debug ("..Finished merging members in DRM service");
958
959       }
960       catch (Exception JavaDoc ex)
961       {
962          log.error("merge failed", ex);
963       }
964    }
965
966    /**
967     * get rid of dead members from replicant list
968     * return true if anything was purged.
969     */

970    protected void purgeDeadMembers(Vector JavaDoc deadMembers)
971    {
972       if (deadMembers.size() <= 0)
973          return;
974
975       log.debug("purgeDeadMembers, "+deadMembers);
976       try
977       {
978          synchronized(replicants)
979          {
980             Iterator JavaDoc keys = replicants.keySet().iterator();
981             while (keys.hasNext())
982             {
983                String JavaDoc key = (String JavaDoc)keys.next();
984                HashMap JavaDoc replicant = (HashMap JavaDoc)replicants.get(key);
985                boolean modified = false;
986                for (int i = 0; i < deadMembers.size(); i++)
987                {
988                   String JavaDoc node = deadMembers.elementAt(i).toString();
989                   log.debug("trying to remove deadMember " + node + " for key " + key);
990                   Object JavaDoc removed = replicant.remove(node);
991                   if (removed != null)
992                   {
993                      log.debug(node + " was removed");
994                      modified = true;
995                   }
996                   else
997                   {
998                      log.debug(node + " was NOT removed!!!");
999                   }
1000               }
1001               if (modified)
1002               {
1003                  notifyKeyListeners(key, lookupReplicants(key));
1004               }
1005            }
1006         }
1007      }
1008      catch (Exception JavaDoc ex)
1009      {
1010         log.error("purgeDeadMembers failed", ex);
1011      }
1012   }
1013
1014   /**
1015    */

1016   protected void cleanupKeyListeners()
1017   {
1018      // NOT IMPLEMENTED YET
1019
}
1020
1021   protected synchronized static int nextThreadID()
1022   {
1023      return threadID ++;
1024   }
1025
1026   // Private -------------------------------------------------------
1027

1028   // Inner classes -------------------------------------------------
1029

1030   protected class MergeMembers extends Thread JavaDoc
1031   {
1032      public MergeMembers()
1033      {
1034         super("DRM Async Merger#"+nextThreadID());
1035      }
1036
1037      /**
1038       * Called when the service needs to merge with another partition. This
1039       * process is performed asynchronously
1040       */

1041      public void run()
1042      {
1043         log.debug("Sleeping for 50ms before mergeMembers");
1044         try
1045         {
1046            // if this thread invokes a cluster method call before
1047
// membershipChanged event completes, it could timeout/hang
1048
// we need to discuss this with Bela.
1049
Thread.sleep(50);
1050         }
1051         catch (Exception JavaDoc ignored)
1052         {
1053         }
1054         mergeMembers();
1055      }
1056   }
1057
1058   protected class MembersPublisher extends Thread JavaDoc
1059   {
1060      public MembersPublisher()
1061      {
1062         super("DRM Async Publisher#"+nextThreadID());
1063      }
1064
1065      /**
1066       * Called when service needs to re-publish its local replicants to other
1067       * cluster members after this node has joined the cluster.
1068       */

1069      public void run()
1070      {
1071         log.debug("DRM: Sleeping before re-publishing for 50ms just in case");
1072         try
1073         {
1074            // if this thread invokes a cluster method call before
1075
// membershipChanged event completes, it could timeout/hang
1076
// we need to discuss this with Bela.
1077
Thread.sleep(50);
1078         }
1079         catch (Exception JavaDoc ignored)
1080         {
1081         }
1082         republishLocalReplicants();
1083      }
1084   }
1085}
1086
Popular Tags