1 22 package org.jboss.ha.framework.server; 23 24 import java.util.Set ; 25 import java.util.Vector ; 26 import java.util.ArrayList ; 27 import java.util.HashMap ; 28 import java.util.Iterator ; 29 import java.util.Collection ; 30 import java.util.HashSet ; 31 import java.util.List ; 32 import java.util.Map ; 33 34 import java.io.Serializable ; 35 36 import javax.management.MBeanServer ; 37 import javax.management.ObjectName ; 38 39 import EDU.oswego.cs.dl.util.concurrent.Latch; 40 import EDU.oswego.cs.dl.util.concurrent.ConcurrentReaderHashMap; 41 42 import org.jboss.logging.Logger; 43 44 import org.jboss.ha.framework.interfaces.ClusterNode; 45 import org.jboss.ha.framework.interfaces.DistributedReplicantManager; 46 import org.jboss.ha.framework.interfaces.HAPartition; 47 48 49 57 public class DistributedReplicantManagerImpl 58 implements DistributedReplicantManagerImplMBean, 59 HAPartition.HAMembershipExtendedListener, 60 HAPartition.HAPartitionStateTransfer, 61 AsynchEventHandler.AsynchEventProcessor 62 { 63 65 protected final static String SERVICE_NAME = "DistributedReplicantManager"; 66 67 protected static int threadID; 69 70 protected ConcurrentReaderHashMap localReplicants = new ConcurrentReaderHashMap(); 71 protected ConcurrentReaderHashMap replicants = new ConcurrentReaderHashMap(); 72 protected ConcurrentReaderHashMap keyListeners = new ConcurrentReaderHashMap(); 73 protected HashMap intraviewIdCache = new HashMap (); 74 protected HAPartition partition; 75 76 protected AsynchEventHandler asynchHandler; 77 78 protected Logger log; 79 80 protected String nodeName = null; 81 82 protected Latch partitionNameKnown = new Latch (); 83 protected boolean trace; 84 85 protected Class [] add_types=new Class []{String .class, String .class, Serializable .class}; 86 protected Class [] remove_types=new Class []{String .class, String .class}; 87 88 90 92 100 public DistributedReplicantManagerImpl(ClusterPartition partition) 101 { 102 this(partition.getHAPartition()); 103 partition.setDistributedReplicantManager(this); 105 } 106 107 114 public DistributedReplicantManagerImpl(HAPartition partition) 115 { 116 this.partition = partition; 117 this.log = Logger.getLogger(DistributedReplicantManagerImpl.class.getName() + 118 "." + partition.getPartitionName()); 119 this.trace = log.isTraceEnabled(); 120 } 121 122 124 public void create() throws Exception 125 { 126 log.debug("registerRPCHandler"); 127 partition.registerRPCHandler(SERVICE_NAME, this); 128 log.debug("subscribeToStateTransferEvents"); 129 partition.subscribeToStateTransferEvents(SERVICE_NAME, this); 130 log.debug("registerMembershipListener"); 131 partition.registerMembershipListener(this); 132 } 133 134 public void start() throws Exception 135 { 136 this.nodeName = this.partition.getNodeName(); 137 138 asynchHandler = new AsynchEventHandler(this, "AsynchKeyChangeHandler"); 140 asynchHandler.start(); 141 142 partitionNameKnown.release (); 144 } 147 148 public void stop() throws Exception 149 { 150 try 152 { 153 asynchHandler.stop(); 154 } 155 catch( Exception e) 156 { 157 log.warn("Failed to stop asynchHandler", e); 158 } 159 160 } 162 163 public void destroy() throws Exception 165 { 166 if (localReplicants != null) 168 { 169 synchronized(localReplicants) 170 { 171 String [] keys = new String [localReplicants.size()]; 172 localReplicants.keySet().toArray(keys); 173 for(int n = 0; n < keys.length; n ++) 174 { 175 this.removeLocal(keys[n]); } 178 } 179 } 180 181 partition.unregisterRPCHandler(SERVICE_NAME, this); 182 partition.unsubscribeFromStateTransferEvents(SERVICE_NAME, this); 183 partition.unregisterMembershipListener(this); 184 } 185 186 public void registerWithJmx(MBeanServer server) throws Exception 187 { 188 server.registerMBean(this, getObjectName()); 189 } 190 191 public void unregisterWithJmx(MBeanServer server) throws Exception 192 { 193 server.unregisterMBean(getObjectName()); 194 } 195 196 private ObjectName getObjectName() throws Exception 197 { 198 return new ObjectName ("jboss:service=" + SERVICE_NAME + ",partition=" + partition.getPartitionName()); 199 } 200 201 public String listContent () throws Exception 202 { 203 java.util.Collection services = this.getAllServices (); 206 207 StringBuffer result = new StringBuffer (); 208 java.util.Iterator catsIter = services.iterator (); 209 210 result.append ("<pre>"); 211 212 while (catsIter.hasNext ()) 213 { 214 String category = (String )catsIter.next (); 215 HashMap content = (HashMap )this.replicants.get (category); 216 if (content == null) 217 content = new HashMap (); 218 java.util.Iterator keysIter = content.keySet ().iterator (); 219 220 result.append ("-----------------------------------------------\n"); 221 result.append ("Service : ").append (category).append ("\n\n"); 222 223 Serializable local = lookupLocalReplicant(category); 224 if (local == null) 225 result.append ("\t- Service is *not* available locally\n"); 226 else 227 result.append ("\t- Service *is* also available locally\n"); 228 229 while (keysIter.hasNext ()) 230 { 231 String location = (String )keysIter.next (); 232 result.append ("\t- ").append(location).append ("\n"); 233 } 234 235 result.append ("\n"); 236 237 } 238 239 result.append ("</pre>"); 240 241 return result.toString (); 242 } 243 244 public String listXmlContent () throws Exception 245 { 246 java.util.Collection services = this.getAllServices (); 249 StringBuffer result = new StringBuffer (); 250 251 result.append ("<ReplicantManager>\n"); 252 253 java.util.Iterator catsIter = services.iterator (); 254 while (catsIter.hasNext ()) 255 { 256 String category = (String )catsIter.next (); 257 HashMap content = (HashMap )this.replicants.get (category); 258 if (content == null) 259 content = new HashMap (); 260 java.util.Iterator keysIter = content.keySet ().iterator (); 261 262 result.append ("\t<Service>\n"); 263 result.append ("\t\t<ServiceName>").append (category).append ("</ServiceName>\n"); 264 265 266 Serializable local = lookupLocalReplicant(category); 267 if (local != null) 268 { 269 result.append ("\t\t<Location>\n"); 270 result.append ("\t\t\t<Name local=\"True\">").append (this.nodeName).append ("</Name>\n"); 271 result.append ("\t\t</Location>\n"); 272 } 273 274 while (keysIter.hasNext ()) 275 { 276 String location = (String )keysIter.next (); 277 result.append ("\t\t<Location>\n"); 278 result.append ("\t\t\t<Name local=\"False\">").append (location).append ("</Name>\n"); 279 result.append ("\t\t</Location>\n"); 280 } 281 282 result.append ("\t<Service>\n"); 283 284 } 285 286 result.append ("<ReplicantManager>\n"); 287 288 return result.toString (); 289 } 290 291 293 public Serializable getCurrentState () 294 { 295 java.util.Collection services = this.getAllServices (); 296 HashMap result = new HashMap (); 297 298 java.util.Iterator catsIter = services.iterator (); 299 while (catsIter.hasNext ()) 300 { 301 String category = (String )catsIter.next (); 302 HashMap content = (HashMap )this.replicants.get (category); 303 if (content == null) 304 content = new HashMap (); 305 else 306 content = (HashMap )content.clone (); 307 308 Serializable local = lookupLocalReplicant(category); 309 if (local != null) 310 content.put (this.nodeName, local); 311 312 result.put (category, content); 313 } 314 315 Object [] globalResult = new Object [] {result, intraviewIdCache}; 318 return globalResult; 319 } 320 321 public void setCurrentState(Serializable newState) 322 { 323 Object [] globalState = (Object [])newState; 324 325 HashMap map = (HashMap )globalState[0]; 326 this.replicants.putAll(map); 327 this.intraviewIdCache = (HashMap )globalState[1]; 328 329 if( trace ) 330 { 331 log.trace(nodeName + ": received new state, will republish local replicants"); 332 } 333 MembersPublisher publisher = new MembersPublisher(); 334 publisher.start(); 335 } 336 337 public Collection getAllServices () 338 { 339 HashSet services = new HashSet (); 340 services.addAll (localReplicants.keySet ()); 341 services.addAll (replicants.keySet ()); 342 return services; 343 } 344 345 347 public void membershipChangedDuringMerge(Vector deadMembers, Vector newMembers, Vector allMembers, Vector originatingGroups) 348 { 349 log.info("Merging partitions..."); 353 log.info("Dead members: " + deadMembers.size()); 354 log.info("Originating groups: " + originatingGroups); 355 purgeDeadMembers(deadMembers); 356 if (newMembers.size() > 0) 357 { 358 new MergeMembers().start(); 359 } 360 } 361 362 public void membershipChanged(Vector deadMembers, Vector newMembers, Vector allMembers) 363 { 364 log.info("I am (" + nodeName + ") received membershipChanged event:"); 368 log.info("Dead members: " + deadMembers.size() + " (" + deadMembers + ")"); 369 log.info("New Members : " + newMembers.size() + " (" + newMembers + ")"); 370 log.info("All Members : " + allMembers.size() + " (" + allMembers + ")"); 371 purgeDeadMembers(deadMembers); 372 373 } 375 376 378 public void processEvent(Object event) 379 { 380 KeyChangeEvent kce = (KeyChangeEvent) event; 381 notifyKeyListeners(kce.key, kce.replicants); 382 } 383 384 static class KeyChangeEvent 385 { 386 String key; 387 List replicants; 388 } 389 390 392 public void add(String key, Serializable replicant) throws Exception 393 { 394 if( trace ) 395 log.trace("add, key="+key+", value="+replicant); 396 partitionNameKnown.acquire (); 398 Object [] args = {key, this.nodeName, replicant}; 399 partition.callMethodOnCluster(SERVICE_NAME, "_add", args, add_types, true); 400 synchronized(localReplicants) 401 { 402 localReplicants.put(key, replicant); 403 notifyKeyListeners(key, lookupReplicants(key)); 404 } 405 } 406 407 public void remove(String key) throws Exception 408 { 409 partitionNameKnown.acquire (); 411 if (localReplicants.containsKey(key)) 414 { 415 Object [] args = {key, this.nodeName}; 416 partition.callAsynchMethodOnCluster(SERVICE_NAME, "_remove", args, remove_types, true); 417 removeLocal(key); 418 } 419 } 420 421 private void removeLocal(String key) 422 { 423 synchronized(localReplicants) 424 { 425 localReplicants.remove(key); 426 List result = lookupReplicants(key); 427 if (result == null) 428 result = new ArrayList (); notifyKeyListeners(key, result); 430 } 431 } 432 433 public Serializable lookupLocalReplicant(String key) 434 { 435 return (Serializable )localReplicants.get(key); 436 } 437 438 public List lookupReplicants(String key) 439 { 440 Serializable local = lookupLocalReplicant(key); 441 HashMap replicant = (HashMap )replicants.get(key); 442 if (replicant == null && local == null) 443 return null; 444 445 ArrayList rtn = new ArrayList (); 446 447 if (replicant == null) 448 { 449 if (local != null) 450 rtn.add(local); 451 } 452 else 453 { 454 ClusterNode[] nodes = partition.getClusterNodes(); 456 String replNode; 457 Object replVal; 458 for (int i = 0; i < nodes.length; i++) 459 { 460 replNode = nodes[i].getName(); 461 if (local != null && nodeName.equals(replNode)) 462 { 463 rtn.add(local); 464 continue; 465 } 466 467 replVal = replicant.get(replNode); 468 if (replVal != null) 469 rtn.add(replVal); 470 } 471 } 472 473 return rtn; 474 } 475 476 public List lookupReplicantsNodeNames(String key) 477 { 478 boolean locallyReplicated = localReplicants.containsKey (key); 479 HashMap replicant = (HashMap )replicants.get(key); 480 if (replicant == null && !locallyReplicated) 481 return null; 482 483 ArrayList rtn = new ArrayList (); 484 485 if (replicant == null) 486 { 487 if (locallyReplicated) 488 rtn.add(this.nodeName); 489 } 490 else 491 { 492 Set keys = replicant.keySet(); 494 ClusterNode[] nodes = partition.getClusterNodes(); 495 String keyOwner; 496 for (int i = 0; i < nodes.length; i++) 497 { 498 keyOwner = nodes[i].getName(); 499 if (locallyReplicated && nodeName.equals(keyOwner)) 500 { 501 rtn.add(this.nodeName); 502 continue; 503 } 504 505 if (keys.contains(keyOwner)) 506 rtn.add(keyOwner); 507 } 508 } 509 510 return rtn; 511 } 512 513 public void registerListener(String key, DistributedReplicantManager.ReplicantListener subscriber) 514 { 515 synchronized(keyListeners) 516 { 517 ArrayList listeners = (ArrayList )keyListeners.get(key); 518 if (listeners == null) 519 { 520 listeners = new ArrayList (); 521 keyListeners.put(key, listeners); 522 } 523 listeners.add(subscriber); 524 } 525 } 526 527 public void unregisterListener(String key, DistributedReplicantManager.ReplicantListener subscriber) 528 { 529 synchronized(keyListeners) 530 { 531 ArrayList listeners = (ArrayList )keyListeners.get (key); 532 if (listeners == null) return; 533 534 listeners.remove(subscriber); 535 if (listeners.size() == 0) 536 keyListeners.remove(key); 537 538 } 539 } 540 541 public int getReplicantsViewId(String key) 542 { 543 Integer result = (Integer )this.intraviewIdCache.get (key); 544 545 if (result == null) 546 return 0; 547 else 548 return result.intValue (); 549 } 550 551 public boolean isMasterReplica (String key) 552 { 553 if( trace ) 554 log.trace("isMasterReplica, key="+key); 555 if (!localReplicants.containsKey (key)) 558 { 559 if( trace ) 560 log.trace("no localReplicants, key="+key+", isMasterReplica=false"); 561 return false; 562 } 563 564 Vector allNodes = this.partition.getCurrentView (); 565 HashMap repForKey = (HashMap )replicants.get(key); 566 if (repForKey==null) 567 { 568 if( trace ) 569 log.trace("no replicants, key="+key+", isMasterReplica=true"); 570 return true; 571 } 572 Vector replicaNodes = new Vector ((repForKey).keySet ()); 573 boolean isMasterReplica = false; 574 for (int i=0; i<allNodes.size (); i++) 575 { 576 String aMember = (String )allNodes.elementAt (i); 577 if( trace ) 578 log.trace("Testing member: "+aMember); 579 if (replicaNodes.contains (aMember)) 580 { 581 if( trace ) 582 log.trace("Member found in replicaNodes, isMasterReplica=false"); 583 break; 584 } 585 else if (aMember.equals (this.nodeName)) 586 { 587 if( trace ) 588 log.trace("Member == nodeName, isMasterReplica=true"); 589 isMasterReplica = true; 590 break; 591 } 592 } 593 return isMasterReplica; 594 } 595 596 598 604 public void _add(String key, String nodeName, Serializable replicant) 605 { 606 if( trace ) 607 log.trace("_add(" + key + ", " + nodeName); 608 609 try 610 { 611 addReplicant(key, nodeName, replicant); 612 KeyChangeEvent kce = new KeyChangeEvent(); 614 kce.key = key; 615 kce.replicants = lookupReplicants(key); 616 asynchHandler.queueEvent(kce); 617 } 618 catch (Exception ex) 619 { 620 log.error("_add failed", ex); 621 } 622 } 623 624 629 public void _remove(String key, String nodeName) 630 { 631 try 632 { 633 if (removeReplicant (key, nodeName)) { 634 KeyChangeEvent kce = new KeyChangeEvent(); 636 kce.key = key; 637 kce.replicants = lookupReplicants(key); 638 asynchHandler.queueEvent(kce); 639 } 640 } 641 catch (Exception ex) 642 { 643 log.error("_remove failed", ex); 644 } 645 } 646 647 protected boolean removeReplicant (String key, String nodeName) throws Exception 648 { 649 synchronized(replicants) 650 { 651 HashMap replicant = (HashMap )replicants.get(key); 652 if (replicant == null) return false; 653 Object removed = replicant.remove(nodeName); 654 if (removed != null) 655 { 656 Collection values = replicant.values(); 657 if (values.size() == 0) 658 { 659 replicants.remove(key); 660 } 661 return true; 662 } 663 } 664 return false; 665 } 666 667 672 public Object [] lookupLocalReplicants() throws Exception 673 { 674 partitionNameKnown.acquire (); 676 Object [] rtn = {this.nodeName, localReplicants}; 677 if( trace ) 678 log.trace ("lookupLocalReplicants called ("+ rtn[0] + "). Return: " + localReplicants.size ()); 679 return rtn; 680 } 681 682 684 686 protected int calculateReplicantsHash (List members) 687 { 688 int result = 0; 689 Object obj = null; 690 691 for (int i=0; i<members.size (); i++) 692 { 693 obj = members.get (i); 694 if (obj != null) 695 result+= obj.hashCode (); } 697 698 return result; 699 } 700 701 protected int updateReplicantsHashId (String key) 702 { 703 List nodes = this.lookupReplicantsNodeNames (key); 706 int result = 0; 707 708 if ( (nodes == null) || (nodes.size () == 0) ) 709 { 710 this.intraviewIdCache.remove (key); 713 } 714 else 715 { 716 result = this.calculateReplicantsHash (nodes); 717 this.intraviewIdCache.put (key, new Integer (result)); 718 } 719 720 return result; 721 722 } 723 724 728 734 protected void addReplicant(String key, String nodeName, Serializable replicant) 735 { 736 addReplicant(replicants, key, nodeName, replicant); 737 } 738 739 746 protected void addReplicant(Map map, String key, String nodeName, Serializable replicant) 747 { 748 synchronized(map) 749 { 750 HashMap rep = (HashMap )map.get(key); 751 if (rep == null) 752 { 753 if( trace ) 754 log.trace("_adding new HashMap"); 755 rep = new HashMap (); 756 map.put(key, rep); 757 } 758 rep.put(nodeName, replicant); 759 } 760 } 761 762 protected Vector getKeysReplicatedByNode (String nodeName) 763 { 764 Vector result = new Vector (); 765 synchronized (replicants) 766 { 767 Iterator keysIter = replicants.keySet ().iterator (); 768 while (keysIter.hasNext ()) 769 { 770 String key = (String )keysIter.next (); 771 HashMap values = (HashMap )replicants.get (key); 772 if ( (values != null) && values.containsKey (nodeName) ) 773 { 774 result.add (key); 775 } 776 } 777 } 778 return result; 779 } 780 781 787 protected boolean replicantEntryAlreadyExists (String key, String nodeName) 788 { 789 return replicantEntryAlreadyExists (replicants, key, nodeName); 790 } 791 792 795 protected boolean replicantEntryAlreadyExists (Map map, String key, String nodeName) 796 { 797 HashMap rep = (HashMap )map.get(key); 798 if (rep == null) 799 return false; 800 else 801 return rep.containsKey (nodeName); 802 } 803 804 810 protected void notifyKeyListeners(String key, List newReplicants) 811 { 812 if( trace ) 813 log.trace("notifyKeyListeners"); 814 815 int newId = updateReplicantsHashId (key); 818 819 ArrayList listeners = (ArrayList )keyListeners.get(key); 820 if (listeners == null) 821 { 822 if( trace ) 823 log.trace("listeners is null"); 824 return; 825 } 826 827 DistributedReplicantManager.ReplicantListener[] toNotify = null; 829 synchronized(listeners) 830 { 831 toNotify = new DistributedReplicantManager.ReplicantListener[listeners.size()]; 832 toNotify = (DistributedReplicantManager.ReplicantListener[]) listeners.toArray(toNotify); 833 } 834 835 if( trace ) 836 log.trace("notifying " + toNotify.length + " listeners for key change: " + key); 837 for (int i = 0; i < toNotify.length; i++) 838 { 839 if (toNotify[i] != null) 840 toNotify[i].replicantsChanged(key, newReplicants, newId); 841 } 842 } 843 844 protected void republishLocalReplicants() 845 { 846 try 847 { 848 if( trace ) 849 log.trace("Start Re-Publish local replicants in DRM"); 850 851 HashMap localReplicants; 852 synchronized (this.localReplicants) 853 { 854 localReplicants = new HashMap (this.localReplicants); 855 } 856 857 Iterator entries = localReplicants.entrySet().iterator(); 858 while( entries.hasNext() ) 859 { 860 Map.Entry entry = (Map.Entry ) entries.next(); 861 String key = (String ) entry.getKey(); 862 Object replicant = entry.getValue(); 863 if (replicant != null) 864 { 865 if( trace ) 866 log.trace("publishing, key=" + key + ", value=" + replicant); 867 868 Object [] args = {key, this.nodeName, replicant}; 869 870 partition.callAsynchMethodOnCluster(SERVICE_NAME, "_add", args, add_types, true); 871 notifyKeyListeners(key, lookupReplicants(key)); 872 } 873 } 874 if( trace ) 875 log.trace("End Re-Publish local replicants"); 876 } 877 catch (Exception e) 878 { 879 log.error("Re-Publish failed", e); 880 } 881 } 882 883 887 protected void mergeMembers() 888 { 889 try 890 { 891 log.debug("Start merging members in DRM service..."); 892 java.util.HashSet notifies = new java.util.HashSet (); 893 ArrayList rsp = partition.callMethodOnCluster(SERVICE_NAME, 894 "lookupLocalReplicants", 895 new Object []{}, new Class []{}, true); 896 if (rsp.size() == 0) 897 log.debug("No responses from other nodes during the DRM merge process."); 898 else 899 { 900 log.debug("The DRM merge process has received " + rsp.size() + " answers"); 901 } 902 for (int i = 0; i < rsp.size(); i++) 903 { 904 Object o = rsp.get(i); 905 if (o == null) 906 { 907 log.warn("As part of the answers received during the DRM merge process, a NULL message was received!"); 908 continue; 909 } 910 else if (o instanceof Throwable ) 911 { 912 log.warn("As part of the answers received during the DRM merge process, a Throwable was received!", (Throwable ) o); 913 continue; 914 } 915 916 Object [] objs = (Object []) o; 917 String node = (String )objs[0]; 918 Map replicants = (Map )objs[1]; 919 Iterator keys = replicants.keySet().iterator(); 920 921 while (keys.hasNext()) 923 { 924 String key = (String )keys.next(); 925 if (!replicantEntryAlreadyExists (key, node)) 927 { 928 addReplicant(key, node, (Serializable )replicants.get(key)); 929 notifies.add (key); 930 } 931 } 932 933 Vector currentStatus = getKeysReplicatedByNode (node); 934 if (currentStatus.size () > replicants.size ()) 935 { 936 for (int currentKeysId=0, currentKeysMax=currentStatus.size (); currentKeysId<currentKeysMax; currentKeysId++) 940 { 941 String theKey = (String )currentStatus.elementAt (currentKeysId); 942 if (!replicants.containsKey (theKey)) 943 { 944 removeReplicant (theKey, node); 945 notifies.add(theKey); 946 } 947 } 948 } 949 } 950 951 Iterator notifIter = notifies.iterator (); 952 while (notifIter.hasNext ()) 953 { 954 String key = (String )notifIter.next (); 955 notifyKeyListeners(key, lookupReplicants(key)); 956 } 957 log.debug ("..Finished merging members in DRM service"); 958 959 } 960 catch (Exception ex) 961 { 962 log.error("merge failed", ex); 963 } 964 } 965 966 970 protected void purgeDeadMembers(Vector deadMembers) 971 { 972 if (deadMembers.size() <= 0) 973 return; 974 975 log.debug("purgeDeadMembers, "+deadMembers); 976 try 977 { 978 synchronized(replicants) 979 { 980 Iterator keys = replicants.keySet().iterator(); 981 while (keys.hasNext()) 982 { 983 String key = (String )keys.next(); 984 HashMap replicant = (HashMap )replicants.get(key); 985 boolean modified = false; 986 for (int i = 0; i < deadMembers.size(); i++) 987 { 988 String node = deadMembers.elementAt(i).toString(); 989 log.debug("trying to remove deadMember " + node + " for key " + key); 990 Object removed = replicant.remove(node); 991 if (removed != null) 992 { 993 log.debug(node + " was removed"); 994 modified = true; 995 } 996 else 997 { 998 log.debug(node + " was NOT removed!!!"); 999 } 1000 } 1001 if (modified) 1002 { 1003 notifyKeyListeners(key, lookupReplicants(key)); 1004 } 1005 } 1006 } 1007 } 1008 catch (Exception ex) 1009 { 1010 log.error("purgeDeadMembers failed", ex); 1011 } 1012 } 1013 1014 1016 protected void cleanupKeyListeners() 1017 { 1018 } 1020 1021 protected synchronized static int nextThreadID() 1022 { 1023 return threadID ++; 1024 } 1025 1026 1028 1030 protected class MergeMembers extends Thread 1031 { 1032 public MergeMembers() 1033 { 1034 super("DRM Async Merger#"+nextThreadID()); 1035 } 1036 1037 1041 public void run() 1042 { 1043 log.debug("Sleeping for 50ms before mergeMembers"); 1044 try 1045 { 1046 Thread.sleep(50); 1050 } 1051 catch (Exception ignored) 1052 { 1053 } 1054 mergeMembers(); 1055 } 1056 } 1057 1058 protected class MembersPublisher extends Thread 1059 { 1060 public MembersPublisher() 1061 { 1062 super("DRM Async Publisher#"+nextThreadID()); 1063 } 1064 1065 1069 public void run() 1070 { 1071 log.debug("DRM: Sleeping before re-publishing for 50ms just in case"); 1072 try 1073 { 1074 Thread.sleep(50); 1078 } 1079 catch (Exception ignored) 1080 { 1081 } 1082 republishLocalReplicants(); 1083 } 1084 } 1085} 1086 | Popular Tags |