1 7 package org.jboss.cache.buddyreplication; 8 9 import org.apache.commons.logging.Log; 10 import org.apache.commons.logging.LogFactory; 11 import org.jboss.cache.AbstractCacheListener; 12 import org.jboss.cache.CacheException; 13 import org.jboss.cache.CacheImpl; 14 import org.jboss.cache.Fqn; 15 import org.jboss.cache.Region; 16 import org.jboss.cache.config.BuddyReplicationConfig; 17 import org.jboss.cache.config.BuddyReplicationConfig.BuddyLocatorConfig; 18 import org.jboss.cache.lock.TimeoutException; 19 import org.jboss.cache.marshall.MethodCall; 20 import org.jboss.cache.marshall.MethodCallFactory; 21 import org.jboss.cache.marshall.MethodDeclarations; 22 import org.jboss.cache.marshall.VersionAwareMarshaller; 23 import org.jboss.cache.statetransfer.StateTransferManager; 24 import org.jboss.util.stream.MarshalledValueInputStream; 25 import org.jgroups.Address; 26 import org.jgroups.View; 27 28 import java.io.ByteArrayInputStream ; 29 import java.util.ArrayList ; 30 import java.util.Arrays ; 31 import java.util.Collection ; 32 import java.util.HashMap ; 33 import java.util.HashSet ; 34 import java.util.Iterator ; 35 import java.util.List ; 36 import java.util.Map ; 37 import java.util.Set ; 38 import java.util.Vector ; 39 import java.util.concurrent.BlockingQueue ; 40 import java.util.concurrent.ConcurrentHashMap ; 41 import java.util.concurrent.LinkedBlockingQueue ; 42 import java.util.concurrent.atomic.AtomicInteger ; 43 44 49 public class BuddyManager 50 { 51 private static Log log = LogFactory.getLog(BuddyManager.class); 52 53 56 final BuddyReplicationConfig config; 57 58 61 BuddyLocator buddyLocator; 62 63 66 private CacheImpl cache; 67 68 71 BuddyGroup buddyGroup; 72 73 76 Map <Address, String > buddyPool = new ConcurrentHashMap <Address, String >(); 77 78 81 final Set <Address> nullBuddyPool = new HashSet <Address>(); 82 83 88 Map <String , BuddyGroup> buddyGroupsIParticipateIn = new ConcurrentHashMap <String , BuddyGroup>(); 89 90 93 private final BlockingQueue <MembershipChange> queue = new LinkedBlockingQueue <MembershipChange>(); 94 95 98 private AsyncViewChangeHandlerThread asyncViewChangeHandler = new AsyncViewChangeHandlerThread(); 99 private static AtomicInteger threadId = new AtomicInteger (0); 100 101 104 public static final String BUDDY_BACKUP_SUBTREE = "_BUDDY_BACKUP_"; 105 public static final Fqn BUDDY_BACKUP_SUBTREE_FQN = Fqn.fromString(BUDDY_BACKUP_SUBTREE); 106 107 110 private static int UNINIT_BUDDIES_RETRIES = 5; 111 114 private static final long[] UNINIT_BUDDIES_RETRY_NAPTIME = {500, 1000, 1500, 2000, 2500}; 115 116 119 private final Object poolInfoNotifierLock = new Object (); 120 121 122 125 private boolean initialised = false; 126 128 public BuddyManager(BuddyReplicationConfig config) 129 { 130 this.config = config; 131 132 BuddyLocatorConfig blc = config.getBuddyLocatorConfig(); 133 try 134 { 135 buddyLocator = (blc == null) ? createDefaultBuddyLocator() : createBuddyLocator(blc); 137 } 138 catch (Exception e) 139 { 140 log.warn("Caught exception instantiating buddy locator", e); 141 log.error("Unable to instantiate specified buddyLocatorClass [" + blc + "]. Using default buddyLocator [" + NextMemberBuddyLocator.class.getName() + "] instead, with default properties."); 142 buddyLocator = createDefaultBuddyLocator(); 143 } 144 145 if (blc != buddyLocator.getConfig()) 147 { 148 config.setBuddyLocatorConfig(buddyLocator.getConfig()); 149 } 150 } 151 152 public BuddyReplicationConfig getConfig() 153 { 154 return config; 155 } 156 157 protected BuddyLocator createBuddyLocator(BuddyLocatorConfig config) throws ClassNotFoundException , IllegalAccessException , InstantiationException 158 { 159 BuddyLocator bl = (BuddyLocator) Class.forName(config.getBuddyLocatorClass()).newInstance(); 160 bl.init(config); 161 return bl; 162 } 163 164 protected BuddyLocator createDefaultBuddyLocator() 165 { 166 BuddyLocator bl = new NextMemberBuddyLocator(); 167 bl.init(null); 168 return bl; 169 } 170 171 public boolean isEnabled() 172 { 173 return config.isEnabled(); 174 } 175 176 public String getBuddyPoolName() 177 { 178 return config.getBuddyPoolName(); 179 } 180 181 public static String getGroupNameFromAddress(Object address) 182 { 183 String s = address.toString(); 184 return s.replace(':', '_'); 185 } 186 187 public void init(CacheImpl cache) throws Exception 188 { 189 log.debug("Starting buddy manager"); 190 this.cache = cache; 191 buddyGroup = new BuddyGroup(); 192 buddyGroup.setDataOwner(cache.getLocalAddress()); 193 buddyGroup.setGroupName(getGroupNameFromAddress(cache.getLocalAddress())); 194 195 if (config.getBuddyPoolName() != null) 196 { 197 buddyPool.put(buddyGroup.getDataOwner(), config.getBuddyPoolName()); 198 } 199 200 broadcastBuddyPoolMembership(); 201 202 initialised = true; 204 205 cache.getNotifier().addCacheListener(new AbstractCacheListener() 207 { 208 private Vector <Address> oldMembers; 209 210 public void viewChange(View newView) 211 { 212 Vector <Address> newMembers = newView.getMembers(); 213 214 if (config.getBuddyPoolName() == null) 216 { 217 enqueueViewChange(null, newMembers); 218 } 219 else 220 { 221 enqueueViewChange(oldMembers == null ? null : new Vector <Address>(oldMembers), new Vector <Address>(newMembers)); 222 if (oldMembers == null) oldMembers = new Vector <Address>(); 223 oldMembers.clear(); 224 oldMembers.addAll(newMembers); 225 } 226 } 227 }); 228 229 reassignBuddies(cache.getMembers()); 231 asyncViewChangeHandler.start(); 232 } 233 234 public boolean isAutoDataGravitation() 235 { 236 return config.isAutoDataGravitation(); 237 } 238 239 public boolean isDataGravitationRemoveOnFind() 240 { 241 return config.isDataGravitationRemoveOnFind(); 242 } 243 244 public boolean isDataGravitationSearchBackupTrees() 245 { 246 return config.isDataGravitationSearchBackupTrees(); 247 } 248 249 public int getBuddyCommunicationTimeout() 250 { 251 return config.getBuddyCommunicationTimeout(); 252 } 253 254 256 static class MembershipChange 257 { 258 List <Address> oldMembers; 259 List <Address> newMembers; 260 261 public MembershipChange(List <Address> oldMembers, List <Address> newMembers) 262 { 263 this.oldMembers = oldMembers; 264 this.newMembers = newMembers; 265 } 266 } 267 268 private void enqueueViewChange(List <Address> oldMembers, List <Address> newMembers) 269 { 270 try 272 { 273 queue.put(new MembershipChange(oldMembers, newMembers)); 274 } 275 catch (InterruptedException e) 276 { 277 log.warn("Caught interrupted exception trying to enqueue a view change event", e); 278 } 279 } 280 281 288 private void reassignBuddies(List <Address> membership) throws Exception 289 { 290 if (log.isDebugEnabled()) 291 { 292 log.debug("Data owner address " + cache.getLocalAddress()); 293 log.debug("Entering updateGroup. Current group: " + buddyGroup + ". Current View membership: " + membership); 294 } 295 List <Address> newBuddies = buddyLocator.locateBuddies(buddyPool, membership, buddyGroup.getDataOwner()); 297 List <Address> uninitialisedBuddies = new ArrayList <Address>(); 298 for (Address newBuddy : newBuddies) 299 { 300 if (!buddyGroup.buddies.contains(newBuddy)) 301 { 302 uninitialisedBuddies.add(newBuddy); 303 } 304 } 305 306 List <Address> obsoleteBuddies = new ArrayList <Address>(); 307 for (Address origBuddy : buddyGroup.buddies) 309 { 310 if (!newBuddies.contains(origBuddy)) 311 { 312 obsoleteBuddies.add(origBuddy); 313 } 314 } 315 316 if (!obsoleteBuddies.isEmpty()) 318 { 319 removeFromGroup(obsoleteBuddies); 320 } 321 else 322 { 323 log.trace("No obsolete buddies found, nothing to announce."); 324 } 325 if (!uninitialisedBuddies.isEmpty()) 326 { 327 addBuddies(uninitialisedBuddies); 328 } 329 else 330 { 331 log.trace("No uninitialized buddies found, nothing to announce."); 332 } 333 334 log.info("New buddy group: " + buddyGroup); 335 } 336 337 339 343 public void handlePoolNameBroadcast(Address address, String poolName) 344 { 345 if (log.isDebugEnabled()) 346 { 347 log.debug(buddyGroup.getDataOwner() + ": received announcement that cache instance " + address + " is in buddy pool " + poolName); 348 } 349 if (poolName != null) 350 { 351 buddyPool.put(address, poolName); 352 } 353 else 354 { 355 synchronized (nullBuddyPool) 356 { 357 if (!nullBuddyPool.contains(address)) nullBuddyPool.add(address); 358 } 359 } 360 361 synchronized (poolInfoNotifierLock) 363 { 364 log.trace("Notifying any waiting view change threads that we have received buddy pool info."); 365 poolInfoNotifierLock.notifyAll(); 366 } 367 } 368 369 373 public void handleRemoveFromBuddyGroup(String groupName) throws BuddyNotInitException 374 { 375 if (!initialised) throw new BuddyNotInitException("Not yet initialised"); 376 377 if (log.isInfoEnabled()) log.info("Removing self from buddy group " + groupName); 378 buddyGroupsIParticipateIn.remove(groupName); 379 380 if (log.isInfoEnabled()) log.info("Removing backup data for group " + groupName); 382 try 383 { 384 cache.remove(new Fqn(BUDDY_BACKUP_SUBTREE_FQN, groupName)); 385 } 386 catch (CacheException e) 387 { 388 log.error("Unable to remove backup data for group " + groupName, e); 389 } 390 } 391 392 400 public void handleAssignToBuddyGroup(BuddyGroup newGroup, Map <Fqn, byte[]> state) throws Exception 401 { 402 if (!initialised) throw new BuddyNotInitException("Not yet initialised"); 403 404 if (log.isInfoEnabled()) log.info("Assigning self to buddy group " + newGroup); 405 buddyGroupsIParticipateIn.put(newGroup.getGroupName(), newGroup); 406 407 Fqn integrationBase = new Fqn(BuddyManager.BUDDY_BACKUP_SUBTREE_FQN, 409 newGroup.getGroupName()); 410 VersionAwareMarshaller marshaller = null; 411 if (cache.getConfiguration().isUseRegionBasedMarshalling()) 412 { 413 marshaller = cache.getMarshaller(); 414 } 415 416 StateTransferManager stateMgr = cache.getStateTransferManager(); 417 418 for (Iterator it = state.entrySet().iterator(); it.hasNext();) 419 { 420 Map.Entry entry = (Map.Entry ) it.next(); 421 Fqn fqn = (Fqn) entry.getKey(); 422 String fqnS = fqn.toString(); 423 if (marshaller == null || !marshaller.isInactive(fqn.toString())) 424 { 425 ClassLoader cl = (marshaller == null) ? null : marshaller.getClassLoader(fqnS); 426 Fqn integrationRoot = new Fqn(integrationBase, fqn); 427 428 byte[] stateBuffer = (byte[]) entry.getValue(); 429 MarshalledValueInputStream in = null; 430 try 431 { 432 ByteArrayInputStream bais = new ByteArrayInputStream (stateBuffer); 433 in = new MarshalledValueInputStream(bais); 434 stateMgr.setState(in, integrationRoot, cl); 435 } 436 catch (Throwable t) 437 { 438 log.error("State for fqn " + fqn + " could not be transferred to a buddy at " + cache.getLocalAddress()); 439 } 440 finally 441 { 442 if (in != null) 443 { 444 in.close(); 445 } 446 } 447 } 448 } 449 } 450 451 453 public static Fqn getBackupFqn(Object buddyGroupName, Fqn origFqn) 454 { 455 List <Object > elements = new ArrayList <Object >(); 456 elements.add(BUDDY_BACKUP_SUBTREE); 457 elements.add(buddyGroupName); 458 elements.addAll(origFqn.peekElements()); 459 460 return new Fqn(elements); 461 } 462 463 public static Fqn getBackupFqn(Fqn buddyGroupRoot, Fqn origFqn) 464 { 465 if (origFqn.isChildOf(buddyGroupRoot)) 466 { 467 return origFqn; 468 } 469 470 List <Object > elements = new ArrayList <Object >(); 471 elements.add(BUDDY_BACKUP_SUBTREE); 472 elements.add(buddyGroupRoot.get(1)); 473 elements.addAll(origFqn.peekElements()); 474 475 return new Fqn(elements); 476 } 477 478 public static boolean isBackupFqn(Fqn name) 479 { 480 return name != null && name.hasElement(BuddyManager.BUDDY_BACKUP_SUBTREE); 481 } 482 483 485 490 public List <Address> getBuddyAddresses() 491 { 492 return buddyGroup.buddies; 493 } 494 495 501 public MethodCall transformFqns(MethodCall call) 502 { 503 return transformFqns(call, call.getMethodId() != MethodDeclarations.dataGravitationCleanupMethod_id); 504 } 505 506 public MethodCall transformFqns(MethodCall call, boolean transformForCurrentCall) 507 { 508 if (call != null && call.getArgs() != null) 509 { 510 MethodCall call2 = new MethodCall(call.getMethod(), call.getArgs().clone(), call.getMethodId()); 511 handleArgs(call2.getArgs(), transformForCurrentCall); 512 return call2; 513 } 514 else 515 { 516 return call; 517 } 518 } 519 520 522 private void removeFromGroup(List <Address> buddies) throws InterruptedException 523 { 524 if (log.isDebugEnabled()) 525 { 526 log.debug("Removing obsolete buddies from buddy group [" + buddyGroup.getGroupName() + "]. Obsolete buddies are " + buddies); 527 } 528 buddyGroup.buddies.removeAll(buddies); 529 MethodCall membershipCall = MethodCallFactory.create(MethodDeclarations.remoteRemoveFromBuddyGroupMethod, buddyGroup.getGroupName()); 531 MethodCall replicateCall = MethodCallFactory.create(MethodDeclarations.replicateMethod, membershipCall); 532 533 int attemptsLeft = UNINIT_BUDDIES_RETRIES; 534 int currentAttempt = 0; 535 536 while (attemptsLeft-- > 0) 537 { 538 try 539 { 540 makeRemoteCall(buddies, replicateCall); 541 break; 542 } 543 catch (Exception e) 544 { 545 if (e instanceof BuddyNotInitException || e.getCause() instanceof BuddyNotInitException) 546 { 547 if (attemptsLeft > 0) 548 { 549 log.info("One of the buddies have not been initialised. Will retry after a short nap."); 550 Thread.sleep(UNINIT_BUDDIES_RETRY_NAPTIME[currentAttempt++]); 551 } 552 else 553 { 554 throw new BuddyNotInitException("Unable to contact buddy after " + UNINIT_BUDDIES_RETRIES + " retries"); 555 } 556 } 557 else 558 { 559 log.error("Unable to communicate with Buddy for some reason", e); 560 } 561 } 562 } 563 log.trace("removeFromGroup notification complete"); 564 } 565 566 private void addBuddies(List <Address> buddies) throws Exception 567 { 568 570 573 574 if (log.isDebugEnabled()) 575 { 576 log.debug("Assigning new buddies to buddy group [" + buddyGroup.getGroupName() + "]. New buddies are " + buddies); 577 } 578 579 580 buddyGroup.buddies.addAll(buddies); 581 582 584 Map <Fqn, byte[]> stateMap = new HashMap <Fqn, byte[]>(); 585 byte[] state; 586 if (cache.getConfiguration().isUseRegionBasedMarshalling()) 587 { 588 Collection <Region> regions = cache.getRegionManager().getAllMarshallingRegions(); 589 if (regions.size() > 0) 590 { 591 for (Region r : regions) 592 { 593 Fqn f = r.getFqn(); 594 state = acquireState(f); 595 if (state != null) 596 { 597 stateMap.put(f, state); 598 } 599 } 600 } 601 else if (!cache.getConfiguration().isInactiveOnStartup()) 602 { 603 state = acquireState(Fqn.ROOT); 605 if (state != null) 606 { 607 stateMap.put(Fqn.ROOT, state); 608 } 609 } 610 } 611 else 612 { 613 state = acquireState(Fqn.ROOT); 614 if (state != null) 615 { 616 stateMap.put(Fqn.ROOT, state); 617 } 618 } 619 620 MethodCall membershipCall = MethodCallFactory.create(MethodDeclarations.remoteAssignToBuddyGroupMethod, buddyGroup, stateMap); 622 MethodCall replicateCall = MethodCallFactory.create(MethodDeclarations.replicateMethod, membershipCall); 623 624 int attemptsLeft = UNINIT_BUDDIES_RETRIES; 625 int currentAttempt = 0; 626 627 while (attemptsLeft-- > 0) 628 { 629 try 630 { 631 makeRemoteCall(buddies, replicateCall); 632 break; 633 } 634 catch (Exception e) 635 { 636 if (e instanceof BuddyNotInitException || e.getCause() instanceof BuddyNotInitException) 637 { 638 if (attemptsLeft > 0) 639 { 640 log.info("One of the buddies have not been initialised. Will retry after a short nap."); 641 Thread.sleep(UNINIT_BUDDIES_RETRY_NAPTIME[currentAttempt++]); 642 643 } 644 else 645 { 646 throw new BuddyNotInitException("Unable to contact buddy after " + UNINIT_BUDDIES_RETRIES + " retries"); 647 } 648 } 649 else 650 { 651 log.error("Unable to communicate with Buddy for some reason", e); 652 } 653 } 654 } 655 656 log.trace("addToGroup notification complete"); 657 } 658 659 private byte[] acquireState(Fqn fqn) throws Exception 660 { 661 long[] timeouts = {400, 800, 1600}; 664 TimeoutException timeoutException = null; 665 666 boolean trace = log.isTraceEnabled(); 667 668 for (int i = 0; i < timeouts.length; i++) 669 { 670 timeoutException = null; 671 672 boolean force = (i == timeouts.length - 1); 673 674 try 675 { 676 byte[] state = cache.generateState(fqn, timeouts[i], force, false); 677 if (log.isDebugEnabled()) 678 { 679 log.debug("acquireState(): got state"); 680 } 681 return state; 682 } 683 catch (TimeoutException t) 684 { 685 timeoutException = t; 686 if (trace) 687 { 688 log.trace("acquireState(): got a TimeoutException"); 689 } 690 } 691 catch (Exception e) 692 { 693 throw e; 694 } 695 catch (Throwable t) 696 { 697 throw new RuntimeException (t); 698 } 699 } 700 701 if (timeoutException != null) 704 { 705 throw new CacheException("acquireState(): Failed getting state due to timeout", 706 timeoutException); 707 } 708 709 if (log.isDebugEnabled()) 710 { 711 log.debug("acquireState(): Unable to give state"); 712 } 713 714 return null; 715 } 716 717 720 private void broadcastBuddyPoolMembership() 721 { 722 broadcastBuddyPoolMembership(null); 723 } 724 725 private void broadcastBuddyPoolMembership(List <Address> recipients) 726 { 727 if (log.isDebugEnabled()) 729 { 730 log.debug("Instance " + buddyGroup.getDataOwner() + " broadcasting membership in buddy pool " + config.getBuddyPoolName() + " to recipients " + recipients); 731 } 732 733 MethodCall membershipCall = MethodCallFactory.create(MethodDeclarations.remoteAnnounceBuddyPoolNameMethod, buddyGroup.getDataOwner(), config.getBuddyPoolName()); 734 MethodCall replicateCall = MethodCallFactory.create(MethodDeclarations.replicateMethod, membershipCall); 735 736 try 737 { 738 makeRemoteCall(recipients, replicateCall); 739 } 740 catch (Exception e) 741 { 742 log.error("Problems broadcasting buddy pool membership info to cluster", e); 743 } 744 } 745 746 private void makeRemoteCall(List <Address> recipients, MethodCall call) throws Exception 747 { 748 if (recipients != null) 750 { 751 Iterator <Address> recipientsIt = recipients.iterator(); 752 List <Address> members = cache.getMembers(); 753 while (recipientsIt.hasNext()) 754 { 755 if (!members.contains(recipientsIt.next())) 756 { 757 recipientsIt.remove(); 758 759 } 760 } 761 } 762 763 cache.callRemoteMethods(recipients, call, true, true, config.getBuddyCommunicationTimeout()); 764 } 765 766 767 private void handleArgs(Object [] args, boolean transformForCurrentCall) 768 { 769 for (int i = 0; i < args.length; i++) 770 { 771 if (args[i] instanceof MethodCall) 772 { 773 MethodCall call = (MethodCall) args[i]; 774 boolean transformFqns = true; 775 if (call.getMethodId() == MethodDeclarations.dataGravitationCleanupMethod_id) 776 { 777 transformFqns = false; 778 } 779 780 args[i] = transformFqns((MethodCall) args[i], transformFqns); 781 } 782 783 if (args[i] instanceof List && args[i] != null) 784 { 785 Object [] asArray = ((List ) args[i]).toArray(); 786 handleArgs(asArray, transformForCurrentCall); 787 List newList = new ArrayList (asArray.length); 788 newList.addAll(Arrays.asList(asArray)); 791 args[i] = newList; 792 } 793 794 if (args[i] instanceof Fqn) 795 { 796 Fqn fqn = (Fqn) args[i]; 797 if (transformForCurrentCall) args[i] = getBackupFqn(fqn); 798 } 799 } 800 } 801 802 808 public Fqn getBackupFqn(Fqn originalFqn) 809 { 810 return getBackupFqn(buddyGroup == null || buddyGroup.getGroupName() == null ? "null" : buddyGroup.getGroupName(), originalFqn); 811 } 812 813 816 private void waitForInit() 817 { 818 while (!initialised) 819 { 820 try 821 { 822 Thread.sleep(100); 823 } 824 catch (InterruptedException e) 825 { 826 } 827 } 828 } 829 830 public static Fqn getActualFqn(Fqn fqn) 831 { 832 if (!isBackupFqn(fqn)) return fqn; 833 List elements = new ArrayList (fqn.peekElements()); 834 835 elements.remove(0); 837 elements.remove(0); 838 839 return new Fqn(elements); 840 } 841 842 843 846 private class AsyncViewChangeHandlerThread implements Runnable 847 { 848 private Thread t; 849 850 public void start() 851 { 852 if (t == null || !t.isAlive()) 853 { 854 t = new Thread (this); 855 t.setName("AsyncViewChangeHandlerThread-" + threadId.getAndIncrement()); 856 t.setDaemon(true); 857 t.start(); 858 } 859 } 860 861 public void run() 862 { 863 waitForInit(); 865 while (!Thread.interrupted()) 866 { 867 try 868 { 869 handleEnqueuedViewChange(); 870 } 871 catch (InterruptedException e) 872 { 873 break; 874 } 875 catch (Throwable t) 876 { 877 log.error("Caught exception handling view change", t); 879 } 880 } 881 log.trace("Exiting run()"); 882 } 883 884 private void handleEnqueuedViewChange() throws Exception 885 { 886 log.trace("Waiting for enqueued view change events"); 887 MembershipChange members = queue.take(); 888 889 broadcastPoolMembership(members); 890 891 boolean rebroadcast = false; 892 893 while (!buddyPoolInfoAvailable(members.newMembers)) 895 { 896 rebroadcast = true; 897 synchronized (poolInfoNotifierLock) 898 { 899 log.trace("Not received necessary buddy pool info for all new members yet; waiting on poolInfoNotifierLock."); 900 poolInfoNotifierLock.wait(); 901 } 902 } 903 904 if (rebroadcast) broadcastPoolMembership(members); 905 906 reassignBuddies(members.newMembers); 908 } 909 910 private void broadcastPoolMembership(MembershipChange members) 911 { 912 log.trace("Broadcasting pool membership details, triggered by view change."); 913 if (members.oldMembers == null) 914 { 915 broadcastBuddyPoolMembership(); 916 } 917 else 918 { 919 List <Address> delta = new ArrayList <Address>(); 920 delta.addAll(members.newMembers); 921 delta.removeAll(members.oldMembers); 922 broadcastBuddyPoolMembership(delta); 923 } 924 } 925 926 private boolean buddyPoolInfoAvailable(List <Address> newMembers) 927 { 928 boolean infoReceived = true; 929 for (Address address : newMembers) 930 { 931 synchronized (nullBuddyPool) 933 { 934 infoReceived = infoReceived && (address.equals(cache.getLocalAddress()) || buddyPool.keySet().contains(address) || nullBuddyPool.contains(address)); 939 } 940 } 941 942 if (log.isTraceEnabled()) 943 { 944 log.trace(buddyGroup.getDataOwner() + " received buddy pool info for new members " + newMembers + "? " + infoReceived); 945 } 946 947 return infoReceived; 948 } 949 } 950 } | Popular Tags |