KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jboss > cache > buddyreplication > BuddyManager


1 /*
2  * JBoss, Home of Professional Open Source
3  *
4  * Distributable under LGPL license.
5  * See terms of license at gnu.org.
6  */

7 package org.jboss.cache.buddyreplication;
8
9 import org.apache.commons.logging.Log;
10 import org.apache.commons.logging.LogFactory;
11 import org.jboss.cache.AbstractCacheListener;
12 import org.jboss.cache.CacheException;
13 import org.jboss.cache.CacheImpl;
14 import org.jboss.cache.Fqn;
15 import org.jboss.cache.Region;
16 import org.jboss.cache.config.BuddyReplicationConfig;
17 import org.jboss.cache.config.BuddyReplicationConfig.BuddyLocatorConfig;
18 import org.jboss.cache.lock.TimeoutException;
19 import org.jboss.cache.marshall.MethodCall;
20 import org.jboss.cache.marshall.MethodCallFactory;
21 import org.jboss.cache.marshall.MethodDeclarations;
22 import org.jboss.cache.marshall.VersionAwareMarshaller;
23 import org.jboss.cache.statetransfer.StateTransferManager;
24 import org.jboss.util.stream.MarshalledValueInputStream;
25 import org.jgroups.Address;
26 import org.jgroups.View;
27
28 import java.io.ByteArrayInputStream JavaDoc;
29 import java.util.ArrayList JavaDoc;
30 import java.util.Arrays JavaDoc;
31 import java.util.Collection JavaDoc;
32 import java.util.HashMap JavaDoc;
33 import java.util.HashSet JavaDoc;
34 import java.util.Iterator JavaDoc;
35 import java.util.List JavaDoc;
36 import java.util.Map JavaDoc;
37 import java.util.Set JavaDoc;
38 import java.util.Vector JavaDoc;
39 import java.util.concurrent.BlockingQueue JavaDoc;
40 import java.util.concurrent.ConcurrentHashMap JavaDoc;
41 import java.util.concurrent.LinkedBlockingQueue JavaDoc;
42 import java.util.concurrent.atomic.AtomicInteger JavaDoc;
43
44 /**
45  * Class that manages buddy replication groups.
46  *
47  * @author <a HREF="mailto:manik@jboss.org">Manik Surtani (manik@jboss.org)</a>
48  */

49 public class BuddyManager
50 {
51    private static Log log = LogFactory.getLog(BuddyManager.class);
52
53    /**
54     * Configuration object.
55     */

56    final BuddyReplicationConfig config;
57
58    /**
59     * Buddy locator class
60     */

61    BuddyLocator buddyLocator;
62
63    /**
64     * back-refernce to the CacheImpl object
65     */

66    private CacheImpl cache;
67
68    /**
69     * The buddy group set up for this instance
70     */

71    BuddyGroup buddyGroup;
72
73    /**
74     * Map of buddy pools received from broadcasts
75     */

76    Map JavaDoc<Address, String JavaDoc> buddyPool = new ConcurrentHashMap JavaDoc<Address, String JavaDoc>();
77
78    /**
79     * The nullBuddyPool is a set of addresses that have not specified buddy pools.
80     */

81    final Set JavaDoc<Address> nullBuddyPool = new HashSet JavaDoc<Address>();
82
83    /**
84     * Map of bddy groups the current instance participates in as a backup node.
85     * Keyed on String group name, values are BuddyGroup objects.
86     * Needs to deal with concurrent access - concurrent assignTo/removeFrom buddy grp
87     */

88    Map JavaDoc<String JavaDoc, BuddyGroup> buddyGroupsIParticipateIn = new ConcurrentHashMap JavaDoc<String JavaDoc, BuddyGroup>();
89
90    /**
91     * Queue to deal with queued up view change requests - which are handled asynchronously
92     */

93    private final BlockingQueue JavaDoc<MembershipChange> queue = new LinkedBlockingQueue JavaDoc<MembershipChange>();
94
95    /**
96     * Async thread that handles items on the view change queue
97     */

98    private AsyncViewChangeHandlerThread asyncViewChangeHandler = new AsyncViewChangeHandlerThread();
99    private static AtomicInteger JavaDoc threadId = new AtomicInteger JavaDoc(0);
100
101    /**
102     * Constants representng the buddy backup subtree
103     */

104    public static final String JavaDoc BUDDY_BACKUP_SUBTREE = "_BUDDY_BACKUP_";
105    public static final Fqn BUDDY_BACKUP_SUBTREE_FQN = Fqn.fromString(BUDDY_BACKUP_SUBTREE);
106
107    /**
108     * number of times to retry communicating with a selected buddy if the buddy has not been initialised.
109     */

110    private static int UNINIT_BUDDIES_RETRIES = 5;
111    /**
112     * wait time between retries
113     */

114    private static final long[] UNINIT_BUDDIES_RETRY_NAPTIME = {500, 1000, 1500, 2000, 2500};
115
116    /**
117     * Lock to synchronise on to ensure buddy pool info is received before buddies are assigned to groups.
118     */

119    private final Object JavaDoc poolInfoNotifierLock = new Object JavaDoc();
120
121
122    /**
123     * Flag to prevent us receiving and processing remote calls before we've started
124     */

125    private boolean initialised = false;
126    // private Latch initLatch = new Latch();
127

128    public BuddyManager(BuddyReplicationConfig config)
129    {
130       this.config = config;
131
132       BuddyLocatorConfig blc = config.getBuddyLocatorConfig();
133       try
134       {
135          // it's OK if the buddy locator config is null.
136
buddyLocator = (blc == null) ? createDefaultBuddyLocator() : createBuddyLocator(blc);
137       }
138       catch (Exception JavaDoc e)
139       {
140          log.warn("Caught exception instantiating buddy locator", e);
141          log.error("Unable to instantiate specified buddyLocatorClass [" + blc + "]. Using default buddyLocator [" + NextMemberBuddyLocator.class.getName() + "] instead, with default properties.");
142          buddyLocator = createDefaultBuddyLocator();
143       }
144
145       // Update the overall config with the BuddyLocatorConfig actually used
146
if (blc != buddyLocator.getConfig())
147       {
148          config.setBuddyLocatorConfig(buddyLocator.getConfig());
149       }
150    }
151
152    public BuddyReplicationConfig getConfig()
153    {
154       return config;
155    }
156
157    protected BuddyLocator createBuddyLocator(BuddyLocatorConfig config) throws ClassNotFoundException JavaDoc, IllegalAccessException JavaDoc, InstantiationException JavaDoc
158    {
159       BuddyLocator bl = (BuddyLocator) Class.forName(config.getBuddyLocatorClass()).newInstance();
160       bl.init(config);
161       return bl;
162    }
163
164    protected BuddyLocator createDefaultBuddyLocator()
165    {
166       BuddyLocator bl = new NextMemberBuddyLocator();
167       bl.init(null);
168       return bl;
169    }
170
171    public boolean isEnabled()
172    {
173       return config.isEnabled();
174    }
175
176    public String JavaDoc getBuddyPoolName()
177    {
178       return config.getBuddyPoolName();
179    }
180
181    public static String JavaDoc getGroupNameFromAddress(Object JavaDoc address)
182    {
183       String JavaDoc s = address.toString();
184       return s.replace(':', '_');
185    }
186
187    public void init(CacheImpl cache) throws Exception JavaDoc
188    {
189       log.debug("Starting buddy manager");
190       this.cache = cache;
191       buddyGroup = new BuddyGroup();
192       buddyGroup.setDataOwner(cache.getLocalAddress());
193       buddyGroup.setGroupName(getGroupNameFromAddress(cache.getLocalAddress()));
194
195       if (config.getBuddyPoolName() != null)
196       {
197          buddyPool.put(buddyGroup.getDataOwner(), config.getBuddyPoolName());
198       }
199
200       broadcastBuddyPoolMembership();
201
202       // allow waiting threads to process.
203
initialised = true;
204
205       // register a CacheImpl Listener to reassign buddies as and when view changes occur
206
cache.getNotifier().addCacheListener(new AbstractCacheListener()
207       {
208          private Vector JavaDoc<Address> oldMembers;
209
210          public void viewChange(View newView)
211          {
212             Vector JavaDoc<Address> newMembers = newView.getMembers();
213
214             // the whole 'oldMembers' concept is only used for buddy pool announcements.
215
if (config.getBuddyPoolName() == null)
216             {
217                enqueueViewChange(null, newMembers);
218             }
219             else
220             {
221                enqueueViewChange(oldMembers == null ? null : new Vector JavaDoc<Address>(oldMembers), new Vector JavaDoc<Address>(newMembers));
222                if (oldMembers == null) oldMembers = new Vector JavaDoc<Address>();
223                oldMembers.clear();
224                oldMembers.addAll(newMembers);
225             }
226          }
227       });
228
229       // assign buddies based on what we know now
230
reassignBuddies(cache.getMembers());
231       asyncViewChangeHandler.start();
232    }
233
234    public boolean isAutoDataGravitation()
235    {
236       return config.isAutoDataGravitation();
237    }
238
239    public boolean isDataGravitationRemoveOnFind()
240    {
241       return config.isDataGravitationRemoveOnFind();
242    }
243
244    public boolean isDataGravitationSearchBackupTrees()
245    {
246       return config.isDataGravitationSearchBackupTrees();
247    }
248
249    public int getBuddyCommunicationTimeout()
250    {
251       return config.getBuddyCommunicationTimeout();
252    }
253
254    // -------------- methods to be called by the tree cache listener --------------------
255

256    static class MembershipChange
257    {
258       List JavaDoc<Address> oldMembers;
259       List JavaDoc<Address> newMembers;
260
261       public MembershipChange(List JavaDoc<Address> oldMembers, List JavaDoc<Address> newMembers)
262       {
263          this.oldMembers = oldMembers;
264          this.newMembers = newMembers;
265       }
266    }
267
268    private void enqueueViewChange(List JavaDoc<Address> oldMembers, List JavaDoc<Address> newMembers)
269    {
270       // put this on a queue
271
try
272       {
273          queue.put(new MembershipChange(oldMembers, newMembers));
274       }
275       catch (InterruptedException JavaDoc e)
276       {
277          log.warn("Caught interrupted exception trying to enqueue a view change event", e);
278       }
279    }
280
281    /**
282     * Called by the TreeCacheListener when a
283     * view change is detected. Used to find new buddies if
284     * existing buddies have died or if new members to the cluster
285     * have been added. Makes use of the BuddyLocator and then
286     * makes RPC calls to remote nodes to assign/remove buddies.
287     */

288    private void reassignBuddies(List JavaDoc<Address> membership) throws Exception JavaDoc
289    {
290       if (log.isDebugEnabled())
291       {
292          log.debug("Data owner address " + cache.getLocalAddress());
293          log.debug("Entering updateGroup. Current group: " + buddyGroup + ". Current View membership: " + membership);
294       }
295       // some of my buddies have died!
296
List JavaDoc<Address> newBuddies = buddyLocator.locateBuddies(buddyPool, membership, buddyGroup.getDataOwner());
297       List JavaDoc<Address> uninitialisedBuddies = new ArrayList JavaDoc<Address>();
298       for (Address newBuddy : newBuddies)
299       {
300          if (!buddyGroup.buddies.contains(newBuddy))
301          {
302             uninitialisedBuddies.add(newBuddy);
303          }
304       }
305
306       List JavaDoc<Address> obsoleteBuddies = new ArrayList JavaDoc<Address>();
307       // find obsolete buddies
308
for (Address origBuddy : buddyGroup.buddies)
309       {
310          if (!newBuddies.contains(origBuddy))
311          {
312             obsoleteBuddies.add(origBuddy);
313          }
314       }
315
316       // Update buddy list
317
if (!obsoleteBuddies.isEmpty())
318       {
319          removeFromGroup(obsoleteBuddies);
320       }
321       else
322       {
323          log.trace("No obsolete buddies found, nothing to announce.");
324       }
325       if (!uninitialisedBuddies.isEmpty())
326       {
327          addBuddies(uninitialisedBuddies);
328       }
329       else
330       {
331          log.trace("No uninitialized buddies found, nothing to announce.");
332       }
333
334       log.info("New buddy group: " + buddyGroup);
335    }
336
337    // -------------- methods to be called by the tree cache --------------------
338

339    /**
340     * Called by CacheImpl._remoteAnnounceBuddyPoolName(Address address, String buddyPoolName)
341     * when a view change occurs and caches need to inform the cluster of which buddy pool it is in.
342     */

343    public void handlePoolNameBroadcast(Address address, String JavaDoc poolName)
344    {
345       if (log.isDebugEnabled())
346       {
347          log.debug(buddyGroup.getDataOwner() + ": received announcement that cache instance " + address + " is in buddy pool " + poolName);
348       }
349       if (poolName != null)
350       {
351          buddyPool.put(address, poolName);
352       }
353       else
354       {
355          synchronized (nullBuddyPool)
356          {
357             if (!nullBuddyPool.contains(address)) nullBuddyPool.add(address);
358          }
359       }
360
361       // notify any waiting view change threads that buddy pool info has been received.
362
synchronized (poolInfoNotifierLock)
363       {
364          log.trace("Notifying any waiting view change threads that we have received buddy pool info.");
365          poolInfoNotifierLock.notifyAll();
366       }
367    }
368
369    /**
370     * Called by CacheImpl._remoteRemoveFromBuddyGroup(String groupName)
371     * when a method call for this is received from a remote cache.
372     */

373    public void handleRemoveFromBuddyGroup(String JavaDoc groupName) throws BuddyNotInitException
374    {
375       if (!initialised) throw new BuddyNotInitException("Not yet initialised");
376
377       if (log.isInfoEnabled()) log.info("Removing self from buddy group " + groupName);
378       buddyGroupsIParticipateIn.remove(groupName);
379
380       // remove backup data for this group
381
if (log.isInfoEnabled()) log.info("Removing backup data for group " + groupName);
382       try
383       {
384          cache.remove(new Fqn(BUDDY_BACKUP_SUBTREE_FQN, groupName));
385       }
386       catch (CacheException e)
387       {
388          log.error("Unable to remove backup data for group " + groupName, e);
389       }
390    }
391
392    /**
393     * Called by CacheImpl._remoteAssignToBuddyGroup(BuddyGroup g) when a method
394     * call for this is received from a remote cache.
395     *
396     * @param newGroup the buddy group
397     * @param state Map<Fqn, byte[]> of any state from the DataOwner. Cannot
398     * be <code>null</code>.
399     */

400    public void handleAssignToBuddyGroup(BuddyGroup newGroup, Map JavaDoc<Fqn, byte[]> state) throws Exception JavaDoc
401    {
402       if (!initialised) throw new BuddyNotInitException("Not yet initialised");
403
404       if (log.isInfoEnabled()) log.info("Assigning self to buddy group " + newGroup);
405       buddyGroupsIParticipateIn.put(newGroup.getGroupName(), newGroup);
406
407       // Integrate state transfer from the data owner of the buddy group
408
Fqn integrationBase = new Fqn(BuddyManager.BUDDY_BACKUP_SUBTREE_FQN,
409               newGroup.getGroupName());
410       VersionAwareMarshaller marshaller = null;
411       if (cache.getConfiguration().isUseRegionBasedMarshalling())
412       {
413          marshaller = cache.getMarshaller();
414       }
415
416       StateTransferManager stateMgr = cache.getStateTransferManager();
417
418       for (Iterator JavaDoc it = state.entrySet().iterator(); it.hasNext();)
419       {
420          Map.Entry JavaDoc entry = (Map.Entry JavaDoc) it.next();
421          Fqn fqn = (Fqn) entry.getKey();
422          String JavaDoc fqnS = fqn.toString();
423          if (marshaller == null || !marshaller.isInactive(fqn.toString()))
424          {
425             ClassLoader JavaDoc cl = (marshaller == null) ? null : marshaller.getClassLoader(fqnS);
426             Fqn integrationRoot = new Fqn(integrationBase, fqn);
427
428             byte[] stateBuffer = (byte[]) entry.getValue();
429             MarshalledValueInputStream in = null;
430             try
431             {
432                ByteArrayInputStream JavaDoc bais = new ByteArrayInputStream JavaDoc(stateBuffer);
433                in = new MarshalledValueInputStream(bais);
434                stateMgr.setState(in, integrationRoot, cl);
435             }
436             catch (Throwable JavaDoc t)
437             {
438                log.error("State for fqn " + fqn + " could not be transferred to a buddy at " + cache.getLocalAddress());
439             }
440             finally
441             {
442                if (in != null)
443                {
444                   in.close();
445                }
446             }
447          }
448       }
449    }
450
451    // -------------- static util methods ------------------
452

453    public static Fqn getBackupFqn(Object JavaDoc buddyGroupName, Fqn origFqn)
454    {
455       List JavaDoc<Object JavaDoc> elements = new ArrayList JavaDoc<Object JavaDoc>();
456       elements.add(BUDDY_BACKUP_SUBTREE);
457       elements.add(buddyGroupName);
458       elements.addAll(origFqn.peekElements());
459
460       return new Fqn(elements);
461    }
462
463    public static Fqn getBackupFqn(Fqn buddyGroupRoot, Fqn origFqn)
464    {
465       if (origFqn.isChildOf(buddyGroupRoot))
466       {
467          return origFqn;
468       }
469
470       List JavaDoc<Object JavaDoc> elements = new ArrayList JavaDoc<Object JavaDoc>();
471       elements.add(BUDDY_BACKUP_SUBTREE);
472       elements.add(buddyGroupRoot.get(1));
473       elements.addAll(origFqn.peekElements());
474
475       return new Fqn(elements);
476    }
477
478    public static boolean isBackupFqn(Fqn name)
479    {
480       return name != null && name.hasElement(BuddyManager.BUDDY_BACKUP_SUBTREE);
481    }
482
483    // -------------- methods to be called by the BaseRPCINterceptor --------------------
484

485    /**
486     * Returns a list of buddies for which this instance is Data Owner.
487     * List excludes self. Used by the BaseRPCInterceptor when deciding
488     * who to replicate to.
489     */

490    public List JavaDoc<Address> getBuddyAddresses()
491    {
492       return buddyGroup.buddies;
493    }
494
495    /**
496     * Introspects method call for Fqns and changes them such that they
497     * are under the current buddy group's backup subtree
498     * (e.g., /_buddy_backup_/my_host:7890/) rather than the root (/).
499     * Called by BaseRPCInterceptor to transform method calls before broadcasting.
500     */

501    public MethodCall transformFqns(MethodCall call)
502    {
503       return transformFqns(call, call.getMethodId() != MethodDeclarations.dataGravitationCleanupMethod_id);
504    }
505
506    public MethodCall transformFqns(MethodCall call, boolean transformForCurrentCall)
507    {
508       if (call != null && call.getArgs() != null)
509       {
510          MethodCall call2 = new MethodCall(call.getMethod(), call.getArgs().clone(), call.getMethodId());
511          handleArgs(call2.getArgs(), transformForCurrentCall);
512          return call2;
513       }
514       else
515       {
516          return call;
517       }
518    }
519
520    // -------------- internal helpers methods --------------------
521

522    private void removeFromGroup(List JavaDoc<Address> buddies) throws InterruptedException JavaDoc
523    {
524       if (log.isDebugEnabled())
525       {
526          log.debug("Removing obsolete buddies from buddy group [" + buddyGroup.getGroupName() + "]. Obsolete buddies are " + buddies);
527       }
528       buddyGroup.buddies.removeAll(buddies);
529       // now broadcast a message to the removed buddies.
530
MethodCall membershipCall = MethodCallFactory.create(MethodDeclarations.remoteRemoveFromBuddyGroupMethod, buddyGroup.getGroupName());
531       MethodCall replicateCall = MethodCallFactory.create(MethodDeclarations.replicateMethod, membershipCall);
532
533       int attemptsLeft = UNINIT_BUDDIES_RETRIES;
534       int currentAttempt = 0;
535
536       while (attemptsLeft-- > 0)
537       {
538          try
539          {
540             makeRemoteCall(buddies, replicateCall);
541             break;
542          }
543          catch (Exception JavaDoc e)
544          {
545             if (e instanceof BuddyNotInitException || e.getCause() instanceof BuddyNotInitException)
546             {
547                if (attemptsLeft > 0)
548                {
549                   log.info("One of the buddies have not been initialised. Will retry after a short nap.");
550                   Thread.sleep(UNINIT_BUDDIES_RETRY_NAPTIME[currentAttempt++]);
551                }
552                else
553                {
554                   throw new BuddyNotInitException("Unable to contact buddy after " + UNINIT_BUDDIES_RETRIES + " retries");
555                }
556             }
557             else
558             {
559                log.error("Unable to communicate with Buddy for some reason", e);
560             }
561          }
562       }
563       log.trace("removeFromGroup notification complete");
564    }
565
566    private void addBuddies(List JavaDoc<Address> buddies) throws Exception JavaDoc
567    {
568       // this check is redundant - if buddies is empty this method will not be called. - Manik
569

570       // if (buddies.size() == 0)
571
// return;
572

573
574       if (log.isDebugEnabled())
575       {
576          log.debug("Assigning new buddies to buddy group [" + buddyGroup.getGroupName() + "]. New buddies are " + buddies);
577       }
578
579
580       buddyGroup.buddies.addAll(buddies);
581
582       // Create the state transfer map
583

584       Map JavaDoc<Fqn, byte[]> stateMap = new HashMap JavaDoc<Fqn, byte[]>();
585       byte[] state;
586       if (cache.getConfiguration().isUseRegionBasedMarshalling())
587       {
588          Collection JavaDoc<Region> regions = cache.getRegionManager().getAllMarshallingRegions();
589          if (regions.size() > 0)
590          {
591             for (Region r : regions)
592             {
593                Fqn f = r.getFqn();
594                state = acquireState(f);
595                if (state != null)
596                {
597                   stateMap.put(f, state);
598                }
599             }
600          }
601          else if (!cache.getConfiguration().isInactiveOnStartup())
602          {
603             // No regions defined; try the root
604
state = acquireState(Fqn.ROOT);
605             if (state != null)
606             {
607                stateMap.put(Fqn.ROOT, state);
608             }
609          }
610       }
611       else
612       {
613          state = acquireState(Fqn.ROOT);
614          if (state != null)
615          {
616             stateMap.put(Fqn.ROOT, state);
617          }
618       }
619
620       // now broadcast a message to the newly assigned buddies.
621
MethodCall membershipCall = MethodCallFactory.create(MethodDeclarations.remoteAssignToBuddyGroupMethod, buddyGroup, stateMap);
622       MethodCall replicateCall = MethodCallFactory.create(MethodDeclarations.replicateMethod, membershipCall);
623
624       int attemptsLeft = UNINIT_BUDDIES_RETRIES;
625       int currentAttempt = 0;
626
627       while (attemptsLeft-- > 0)
628       {
629          try
630          {
631             makeRemoteCall(buddies, replicateCall);
632             break;
633          }
634          catch (Exception JavaDoc e)
635          {
636             if (e instanceof BuddyNotInitException || e.getCause() instanceof BuddyNotInitException)
637             {
638                if (attemptsLeft > 0)
639                {
640                   log.info("One of the buddies have not been initialised. Will retry after a short nap.");
641                   Thread.sleep(UNINIT_BUDDIES_RETRY_NAPTIME[currentAttempt++]);
642
643                }
644                else
645                {
646                   throw new BuddyNotInitException("Unable to contact buddy after " + UNINIT_BUDDIES_RETRIES + " retries");
647                }
648             }
649             else
650             {
651                log.error("Unable to communicate with Buddy for some reason", e);
652             }
653          }
654       }
655
656       log.trace("addToGroup notification complete");
657    }
658
659    private byte[] acquireState(Fqn fqn) throws Exception JavaDoc
660    {
661       // Call _getState with progressively longer timeouts until we
662
// get state or it doesn't throw a TimeoutException
663
long[] timeouts = {400, 800, 1600};
664       TimeoutException timeoutException = null;
665
666       boolean trace = log.isTraceEnabled();
667
668       for (int i = 0; i < timeouts.length; i++)
669       {
670          timeoutException = null;
671
672          boolean force = (i == timeouts.length - 1);
673
674          try
675          {
676             byte[] state = cache.generateState(fqn, timeouts[i], force, false);
677             if (log.isDebugEnabled())
678             {
679                log.debug("acquireState(): got state");
680             }
681             return state;
682          }
683          catch (TimeoutException t)
684          {
685             timeoutException = t;
686             if (trace)
687             {
688                log.trace("acquireState(): got a TimeoutException");
689             }
690          }
691          catch (Exception JavaDoc e)
692          {
693             throw e;
694          }
695          catch (Throwable JavaDoc t)
696          {
697             throw new RuntimeException JavaDoc(t);
698          }
699       }
700
701       // If we got a timeout exception on the final try,
702
// this is a failure condition
703
if (timeoutException != null)
704       {
705          throw new CacheException("acquireState(): Failed getting state due to timeout",
706                  timeoutException);
707       }
708
709       if (log.isDebugEnabled())
710       {
711          log.debug("acquireState(): Unable to give state");
712       }
713
714       return null;
715    }
716
717    /**
718     * Called by the BuddyGroupMembershipMonitor every time a view change occurs.
719     */

720    private void broadcastBuddyPoolMembership()
721    {
722       broadcastBuddyPoolMembership(null);
723    }
724
725    private void broadcastBuddyPoolMembership(List JavaDoc<Address> recipients)
726    {
727       // broadcast to other caches
728
if (log.isDebugEnabled())
729       {
730          log.debug("Instance " + buddyGroup.getDataOwner() + " broadcasting membership in buddy pool " + config.getBuddyPoolName() + " to recipients " + recipients);
731       }
732
733       MethodCall membershipCall = MethodCallFactory.create(MethodDeclarations.remoteAnnounceBuddyPoolNameMethod, buddyGroup.getDataOwner(), config.getBuddyPoolName());
734       MethodCall replicateCall = MethodCallFactory.create(MethodDeclarations.replicateMethod, membershipCall);
735
736       try
737       {
738          makeRemoteCall(recipients, replicateCall);
739       }
740       catch (Exception JavaDoc e)
741       {
742          log.error("Problems broadcasting buddy pool membership info to cluster", e);
743       }
744    }
745
746    private void makeRemoteCall(List JavaDoc<Address> recipients, MethodCall call) throws Exception JavaDoc
747    {
748       // remove non-members from dest list
749
if (recipients != null)
750       {
751          Iterator JavaDoc<Address> recipientsIt = recipients.iterator();
752          List JavaDoc<Address> members = cache.getMembers();
753          while (recipientsIt.hasNext())
754          {
755             if (!members.contains(recipientsIt.next()))
756             {
757                recipientsIt.remove();
758
759             }
760          }
761       }
762
763       cache.callRemoteMethods(recipients, call, true, true, config.getBuddyCommunicationTimeout());
764    }
765
766
767    private void handleArgs(Object JavaDoc[] args, boolean transformForCurrentCall)
768    {
769       for (int i = 0; i < args.length; i++)
770       {
771          if (args[i] instanceof MethodCall)
772          {
773             MethodCall call = (MethodCall) args[i];
774             boolean transformFqns = true;
775             if (call.getMethodId() == MethodDeclarations.dataGravitationCleanupMethod_id)
776             {
777                transformFqns = false;
778             }
779
780             args[i] = transformFqns((MethodCall) args[i], transformFqns);
781          }
782
783          if (args[i] instanceof List JavaDoc && args[i] != null)
784          {
785             Object JavaDoc[] asArray = ((List JavaDoc) args[i]).toArray();
786             handleArgs(asArray, transformForCurrentCall);
787             List JavaDoc newList = new ArrayList JavaDoc(asArray.length);
788             // Oops! JDK 5.0!
789
//Collections.addAll(newList, asArray);
790
newList.addAll(Arrays.asList(asArray));
791             args[i] = newList;
792          }
793
794          if (args[i] instanceof Fqn)
795          {
796             Fqn fqn = (Fqn) args[i];
797             if (transformForCurrentCall) args[i] = getBackupFqn(fqn);
798          }
799       }
800    }
801
802    /**
803     * Assumes the backup Fqn if the current instance is the data owner
804     *
805     * @param originalFqn
806     * @return backup fqn
807     */

808    public Fqn getBackupFqn(Fqn originalFqn)
809    {
810       return getBackupFqn(buddyGroup == null || buddyGroup.getGroupName() == null ? "null" : buddyGroup.getGroupName(), originalFqn);
811    }
812
813    /**
814     * Blocks until the BuddyManager has finished initialising
815     */

816    private void waitForInit()
817    {
818       while (!initialised)
819       {
820          try
821          {
822             Thread.sleep(100);
823          }
824          catch (InterruptedException JavaDoc e)
825          {
826          }
827       }
828    }
829
830    public static Fqn getActualFqn(Fqn fqn)
831    {
832       if (!isBackupFqn(fqn)) return fqn;
833       List JavaDoc elements = new ArrayList JavaDoc(fqn.peekElements());
834
835       // remove the first 2 elements
836
elements.remove(0);
837       elements.remove(0);
838
839       return new Fqn(elements);
840    }
841
842
843    /**
844     * Asynchronous thread that deals with handling view changes placed on a queue
845     */

846    private class AsyncViewChangeHandlerThread implements Runnable JavaDoc
847    {
848       private Thread JavaDoc t;
849
850       public void start()
851       {
852          if (t == null || !t.isAlive())
853          {
854             t = new Thread JavaDoc(this);
855             t.setName("AsyncViewChangeHandlerThread-" + threadId.getAndIncrement());
856             t.setDaemon(true);
857             t.start();
858          }
859       }
860
861       public void run()
862       {
863          // don't start this thread until the Buddy Manager has initialised as it cocks things up.
864
waitForInit();
865          while (!Thread.interrupted())
866          {
867             try
868             {
869                handleEnqueuedViewChange();
870             }
871             catch (InterruptedException JavaDoc e)
872             {
873                break;
874             }
875             catch (Throwable JavaDoc t)
876             {
877                // Don't let the thread die
878
log.error("Caught exception handling view change", t);
879             }
880          }
881          log.trace("Exiting run()");
882       }
883
884       private void handleEnqueuedViewChange() throws Exception JavaDoc
885       {
886          log.trace("Waiting for enqueued view change events");
887          MembershipChange members = queue.take();
888
889          broadcastPoolMembership(members);
890
891          boolean rebroadcast = false;
892
893          // make sure new buddies have broadcast their pool memberships.
894
while (!buddyPoolInfoAvailable(members.newMembers))
895          {
896             rebroadcast = true;
897             synchronized (poolInfoNotifierLock)
898             {
899                log.trace("Not received necessary buddy pool info for all new members yet; waiting on poolInfoNotifierLock.");
900                poolInfoNotifierLock.wait();
901             }
902          }
903
904          if (rebroadcast) broadcastPoolMembership(members);
905
906          // always refresh buddy list.
907
reassignBuddies(members.newMembers);
908       }
909
910       private void broadcastPoolMembership(MembershipChange members)
911       {
912          log.trace("Broadcasting pool membership details, triggered by view change.");
913          if (members.oldMembers == null)
914          {
915             broadcastBuddyPoolMembership();
916          }
917          else
918          {
919             List JavaDoc<Address> delta = new ArrayList JavaDoc<Address>();
920             delta.addAll(members.newMembers);
921             delta.removeAll(members.oldMembers);
922             broadcastBuddyPoolMembership(delta);
923          }
924       }
925
926       private boolean buddyPoolInfoAvailable(List JavaDoc<Address> newMembers)
927       {
928          boolean infoReceived = true;
929          for (Address address : newMembers)
930          {
931             // make sure no one is concurrently writing to nullBuddyPool.
932
synchronized (nullBuddyPool)
933             {
934                // log.trace("Testing on node " + buddyGroup.getDataOwner() + " for candidate " + address);
935
// log.trace("Is me? " + address.equals(cache.getLocalAddress()));
936
// log.trace("is in bP? " + buddyPool.keySet().contains(address));
937
// log.trace("is in nBP? " + nullBuddyPool.contains(address));
938
infoReceived = infoReceived && (address.equals(cache.getLocalAddress()) || buddyPool.keySet().contains(address) || nullBuddyPool.contains(address));
939             }
940          }
941
942          if (log.isTraceEnabled())
943          {
944             log.trace(buddyGroup.getDataOwner() + " received buddy pool info for new members " + newMembers + "? " + infoReceived);
945          }
946
947          return infoReceived;
948       }
949    }
950 }
Popular Tags