| 1 22 package org.jboss.ha.framework.server; 23 24 import java.util.Set ; 25 import java.util.Vector ; 26 import java.util.ArrayList ; 27 import java.util.HashMap ; 28 import java.util.Iterator ; 29 import java.util.Collection ; 30 import java.util.HashSet ; 31 import java.util.List ; 32 import java.util.Map ; 33 34 import java.io.Serializable ; 35 36 import javax.management.MBeanServer ; 37 import javax.management.ObjectName ; 38 39 import EDU.oswego.cs.dl.util.concurrent.Latch; 40 import EDU.oswego.cs.dl.util.concurrent.ConcurrentReaderHashMap; 41 42 import org.jboss.logging.Logger; 43 44 import org.jboss.ha.framework.interfaces.ClusterNode; 45 import org.jboss.ha.framework.interfaces.DistributedReplicantManager; 46 import org.jboss.ha.framework.interfaces.HAPartition; 47 48 49 57 public class DistributedReplicantManagerImpl 58 implements DistributedReplicantManagerImplMBean, 59 HAPartition.HAMembershipExtendedListener, 60 HAPartition.HAPartitionStateTransfer, 61 AsynchEventHandler.AsynchEventProcessor 62 { 63 65 protected final static String SERVICE_NAME = "DistributedReplicantManager"; 66 67 protected static int threadID; 69 70 protected ConcurrentReaderHashMap localReplicants = new ConcurrentReaderHashMap(); 71 protected ConcurrentReaderHashMap replicants = new ConcurrentReaderHashMap(); 72 protected ConcurrentReaderHashMap keyListeners = new ConcurrentReaderHashMap(); 73 protected HashMap intraviewIdCache = new HashMap (); 74 protected HAPartition partition; 75 76 protected AsynchEventHandler asynchHandler; 77 78 protected Logger log; 79 80 protected String nodeName = null; 81 82 protected Latch partitionNameKnown = new Latch (); 83 protected boolean trace; 84 85 protected Class [] add_types=new Class []{String .class, String .class, Serializable .class}; 86 protected Class [] remove_types=new Class []{String .class, String .class}; 87 88 90 92 100 public DistributedReplicantManagerImpl(ClusterPartition partition) 101 { 102 this(partition.getHAPartition()); 103 partition.setDistributedReplicantManager(this); 105 } 106 107 114 public DistributedReplicantManagerImpl(HAPartition partition) 115 { 116 this.partition = partition; 117 this.log = Logger.getLogger(DistributedReplicantManagerImpl.class.getName() + 118 "." + partition.getPartitionName()); 119 this.trace = log.isTraceEnabled(); 120 } 121 122 124 public void create() throws Exception  125 { 126 log.debug("registerRPCHandler"); 127 partition.registerRPCHandler(SERVICE_NAME, this); 128 log.debug("subscribeToStateTransferEvents"); 129 partition.subscribeToStateTransferEvents(SERVICE_NAME, this); 130 log.debug("registerMembershipListener"); 131 partition.registerMembershipListener(this); 132 } 133 134 public void start() throws Exception  135 { 136 this.nodeName = this.partition.getNodeName(); 137 138 asynchHandler = new AsynchEventHandler(this, "AsynchKeyChangeHandler"); 140 asynchHandler.start(); 141 142 partitionNameKnown.release (); 144 } 147 148 public void stop() throws Exception  149 { 150 try 152 { 153 asynchHandler.stop(); 154 } 155 catch( Exception e) 156 { 157 log.warn("Failed to stop asynchHandler", e); 158 } 159 160 } 162 163 public void destroy() throws Exception  165 { 166 if (localReplicants != null) 168 { 169 synchronized(localReplicants) 170 { 171 String [] keys = new String [localReplicants.size()]; 172 localReplicants.keySet().toArray(keys); 173 for(int n = 0; n < keys.length; n ++) 174 { 175 this.removeLocal(keys[n]); } 178 } 179 } 180 181 partition.unregisterRPCHandler(SERVICE_NAME, this); 182 partition.unsubscribeFromStateTransferEvents(SERVICE_NAME, this); 183 partition.unregisterMembershipListener(this); 184 } 185 186 public void registerWithJmx(MBeanServer server) throws Exception  187 { 188 server.registerMBean(this, getObjectName()); 189 } 190 191 public void unregisterWithJmx(MBeanServer server) throws Exception  192 { 193 server.unregisterMBean(getObjectName()); 194 } 195 196 private ObjectName getObjectName() throws Exception  197 { 198 return new ObjectName ("jboss:service=" + SERVICE_NAME + ",partition=" + partition.getPartitionName()); 199 } 200 201 public String listContent () throws Exception  202 { 203 java.util.Collection services = this.getAllServices (); 206 207 StringBuffer result = new StringBuffer (); 208 java.util.Iterator catsIter = services.iterator (); 209 210 result.append ("<pre>"); 211 212 while (catsIter.hasNext ()) 213 { 214 String category = (String )catsIter.next (); 215 HashMap content = (HashMap )this.replicants.get (category); 216 if (content == null) 217 content = new HashMap (); 218 java.util.Iterator keysIter = content.keySet ().iterator (); 219 220 result.append ("-----------------------------------------------\n"); 221 result.append ("Service : ").append (category).append ("\n\n"); 222 223 Serializable local = lookupLocalReplicant(category); 224 if (local == null) 225 result.append ("\t- Service is *not* available locally\n"); 226 else 227 result.append ("\t- Service *is* also available locally\n"); 228 229 while (keysIter.hasNext ()) 230 { 231 String location = (String )keysIter.next (); 232 result.append ("\t- ").append(location).append ("\n"); 233 } 234 235 result.append ("\n"); 236 237 } 238 239 result.append ("</pre>"); 240 241 return result.toString (); 242 } 243 244 public String listXmlContent () throws Exception  245 { 246 java.util.Collection services = this.getAllServices (); 249 StringBuffer result = new StringBuffer (); 250 251 result.append ("<ReplicantManager>\n"); 252 253 java.util.Iterator catsIter = services.iterator (); 254 while (catsIter.hasNext ()) 255 { 256 String category = (String )catsIter.next (); 257 HashMap content = (HashMap )this.replicants.get (category); 258 if (content == null) 259 content = new HashMap (); 260 java.util.Iterator keysIter = content.keySet ().iterator (); 261 262 result.append ("\t<Service>\n"); 263 result.append ("\t\t<ServiceName>").append (category).append ("</ServiceName>\n"); 264 265 266 Serializable local = lookupLocalReplicant(category); 267 if (local != null) 268 { 269 result.append ("\t\t<Location>\n"); 270 result.append ("\t\t\t<Name local=\"True\">").append (this.nodeName).append ("</Name>\n"); 271 result.append ("\t\t</Location>\n"); 272 } 273 274 while (keysIter.hasNext ()) 275 { 276 String location = (String )keysIter.next (); 277 result.append ("\t\t<Location>\n"); 278 result.append ("\t\t\t<Name local=\"False\">").append (location).append ("</Name>\n"); 279 result.append ("\t\t</Location>\n"); 280 } 281 282 result.append ("\t<Service>\n"); 283 284 } 285 286 result.append ("<ReplicantManager>\n"); 287 288 return result.toString (); 289 } 290 291 293 public Serializable getCurrentState () 294 { 295 java.util.Collection services = this.getAllServices (); 296 HashMap result = new HashMap (); 297 298 java.util.Iterator catsIter = services.iterator (); 299 while (catsIter.hasNext ()) 300 { 301 String category = (String )catsIter.next (); 302 HashMap content = (HashMap )this.replicants.get (category); 303 if (content == null) 304 content = new HashMap (); 305 else 306 content = (HashMap )content.clone (); 307 308 Serializable local = lookupLocalReplicant(category); 309 if (local != null) 310 content.put (this.nodeName, local); 311 312 result.put (category, content); 313 } 314 315 Object [] globalResult = new Object [] {result, intraviewIdCache}; 318 return globalResult; 319 } 320 321 public void setCurrentState(Serializable newState) 322 { 323 Object [] globalState = (Object [])newState; 324 325 HashMap map = (HashMap )globalState[0]; 326 this.replicants.putAll(map); 327 this.intraviewIdCache = (HashMap )globalState[1]; 328 329 if( trace ) 330 { 331 log.trace(nodeName + ": received new state, will republish local replicants"); 332 } 333 MembersPublisher publisher = new MembersPublisher(); 334 publisher.start(); 335 } 336 337 public Collection getAllServices () 338 { 339 HashSet services = new HashSet (); 340 services.addAll (localReplicants.keySet ()); 341 services.addAll (replicants.keySet ()); 342 return services; 343 } 344 345 347 public void membershipChangedDuringMerge(Vector deadMembers, Vector newMembers, Vector allMembers, Vector originatingGroups) 348 { 349 log.info("Merging partitions..."); 353 log.info("Dead members: " + deadMembers.size()); 354 log.info("Originating groups: " + originatingGroups); 355 purgeDeadMembers(deadMembers); 356 if (newMembers.size() > 0) 357 { 358 new MergeMembers().start(); 359 } 360 } 361 362 public void membershipChanged(Vector deadMembers, Vector newMembers, Vector allMembers) 363 { 364 log.info("I am (" + nodeName + ") received membershipChanged event:"); 368 log.info("Dead members: " + deadMembers.size() + " (" + deadMembers + ")"); 369 log.info("New Members : " + newMembers.size() + " (" + newMembers + ")"); 370 log.info("All Members : " + allMembers.size() + " (" + allMembers + ")"); 371 purgeDeadMembers(deadMembers); 372 373 } 375 376 378 public void processEvent(Object event) 379 { 380 KeyChangeEvent kce = (KeyChangeEvent) event; 381 notifyKeyListeners(kce.key, kce.replicants); 382 } 383 384 static class KeyChangeEvent 385 { 386 String key; 387 List replicants; 388 } 389 390 392 public void add(String key, Serializable replicant) throws Exception  393 { 394 if( trace ) 395 log.trace("add, key="+key+", value="+replicant); 396 partitionNameKnown.acquire (); 398 Object [] args = {key, this.nodeName, replicant}; 399 partition.callMethodOnCluster(SERVICE_NAME, "_add", args, add_types, true); 400 synchronized(localReplicants) 401 { 402 localReplicants.put(key, replicant); 403 notifyKeyListeners(key, lookupReplicants(key)); 404 } 405 } 406 407 public void remove(String key) throws Exception  408 { 409 partitionNameKnown.acquire (); 411 if (localReplicants.containsKey(key)) 414 { 415 Object [] args = {key, this.nodeName}; 416 partition.callAsynchMethodOnCluster(SERVICE_NAME, "_remove", args, remove_types, true); 417 removeLocal(key); 418 } 419 } 420 421 private void removeLocal(String key) 422 { 423 synchronized(localReplicants) 424 { 425 localReplicants.remove(key); 426 List result = lookupReplicants(key); 427 if (result == null) 428 result = new ArrayList (); notifyKeyListeners(key, result); 430 } 431 } 432 433 public Serializable lookupLocalReplicant(String key) 434 { 435 return (Serializable )localReplicants.get(key); 436 } 437 438 public List lookupReplicants(String key) 439 { 440 Serializable local = lookupLocalReplicant(key); 441 HashMap replicant = (HashMap )replicants.get(key); 442 if (replicant == null && local == null) 443 return null; 444 445 ArrayList rtn = new ArrayList (); 446 447 if (replicant == null) 448 { 449 if (local != null) 450 rtn.add(local); 451 } 452 else 453 { 454 ClusterNode[] nodes = partition.getClusterNodes(); 456 String replNode; 457 Object replVal; 458 for (int i = 0; i < nodes.length; i++) 459 { 460 replNode = nodes[i].getName(); 461 if (local != null && nodeName.equals(replNode)) 462 { 463 rtn.add(local); 464 continue; 465 } 466 467 replVal = replicant.get(replNode); 468 if (replVal != null) 469 rtn.add(replVal); 470 } 471 } 472 473 return rtn; 474 } 475 476 public List lookupReplicantsNodeNames(String key) 477 { 478 boolean locallyReplicated = localReplicants.containsKey (key); 479 HashMap replicant = (HashMap )replicants.get(key); 480 if (replicant == null && !locallyReplicated) 481 return null; 482 483 ArrayList rtn = new ArrayList (); 484 485 if (replicant == null) 486 { 487 if (locallyReplicated) 488 rtn.add(this.nodeName); 489 } 490 else 491 { 492 Set keys = replicant.keySet(); 494 ClusterNode[] nodes = partition.getClusterNodes(); 495 String keyOwner; 496 for (int i = 0; i < nodes.length; i++) 497 { 498 keyOwner = nodes[i].getName(); 499 if (locallyReplicated && nodeName.equals(keyOwner)) 500 { 501 rtn.add(this.nodeName); 502 continue; 503 } 504 505 if (keys.contains(keyOwner)) 506 rtn.add(keyOwner); 507 } 508 } 509 510 return rtn; 511 } 512 513 public void registerListener(String key, DistributedReplicantManager.ReplicantListener subscriber) 514 { 515 synchronized(keyListeners) 516 { 517 ArrayList listeners = (ArrayList )keyListeners.get(key); 518 if (listeners == null) 519 { 520 listeners = new ArrayList (); 521 keyListeners.put(key, listeners); 522 } 523 listeners.add(subscriber); 524 } 525 } 526 527 public void unregisterListener(String key, DistributedReplicantManager.ReplicantListener subscriber) 528 { 529 synchronized(keyListeners) 530 { 531 ArrayList listeners = (ArrayList )keyListeners.get (key); 532 if (listeners == null) return; 533 534 listeners.remove(subscriber); 535 if (listeners.size() == 0) 536 keyListeners.remove(key); 537 538 } 539 } 540 541 public int getReplicantsViewId(String key) 542 { 543 Integer result = (Integer )this.intraviewIdCache.get (key); 544 545 if (result == null) 546 return 0; 547 else 548 return result.intValue (); 549 } 550 551 public boolean isMasterReplica (String key) 552 { 553 if( trace ) 554 log.trace("isMasterReplica, key="+key); 555 if (!localReplicants.containsKey (key)) 558 { 559 if( trace ) 560 log.trace("no localReplicants, key="+key+", isMasterReplica=false"); 561 return false; 562 } 563 564 Vector allNodes = this.partition.getCurrentView (); 565 HashMap repForKey = (HashMap )replicants.get(key); 566 if (repForKey==null) 567 { 568 if( trace ) 569 log.trace("no replicants, key="+key+", isMasterReplica=true"); 570 return true; 571 } 572 Vector replicaNodes = new Vector ((repForKey).keySet ()); 573 boolean isMasterReplica = false; 574 for (int i=0; i<allNodes.size (); i++) 575 { 576 String aMember = (String )allNodes.elementAt (i); 577 if( trace ) 578 log.trace("Testing member: "+aMember); 579 if (replicaNodes.contains (aMember)) 580 { 581 if( trace ) 582 log.trace("Member found in replicaNodes, isMasterReplica=false"); 583 break; 584 } 585 else if (aMember.equals (this.nodeName)) 586 { 587 if( trace ) 588 log.trace("Member == nodeName, isMasterReplica=true"); 589 isMasterReplica = true; 590 break; 591 } 592 } 593 return isMasterReplica; 594 } 595 596 598 604 public void _add(String key, String nodeName, Serializable replicant) 605 { 606 if( trace ) 607 log.trace("_add(" + key + ", " + nodeName); 608 609 try 610 { 611 addReplicant(key, nodeName, replicant); 612 KeyChangeEvent kce = new KeyChangeEvent(); 614 kce.key = key; 615 kce.replicants = lookupReplicants(key); 616 asynchHandler.queueEvent(kce); 617 } 618 catch (Exception ex) 619 { 620 log.error("_add failed", ex); 621 } 622 } 623 624 629 public void _remove(String key, String nodeName) 630 { 631 try 632 { 633 if (removeReplicant (key, nodeName)) { 634 KeyChangeEvent kce = new KeyChangeEvent(); 636 kce.key = key; 637 kce.replicants = lookupReplicants(key); 638 asynchHandler.queueEvent(kce); 639 } 640 } 641 catch (Exception ex) 642 { 643 log.error("_remove failed", ex); 644 } 645 } 646 647 protected boolean removeReplicant (String key, String nodeName) throws Exception  648 { 649 synchronized(replicants) 650 { 651 HashMap replicant = (HashMap )replicants.get(key); 652 if (replicant == null) return false; 653 Object removed = replicant.remove(nodeName); 654 if (removed != null) 655 { 656 Collection values = replicant.values(); 657 if (values.size() == 0) 658 { 659 replicants.remove(key); 660 } 661 return true; 662 } 663 } 664 return false; 665 } 666 667 672 public Object [] lookupLocalReplicants() throws Exception  673 { 674 partitionNameKnown.acquire (); 676 Object [] rtn = {this.nodeName, localReplicants}; 677 if( trace ) 678 log.trace ("lookupLocalReplicants called ("+ rtn[0] + "). Return: " + localReplicants.size ()); 679 return rtn; 680 } 681 682 684 686 protected int calculateReplicantsHash (List members) 687 { 688 int result = 0; 689 Object obj = null; 690 691 for (int i=0; i<members.size (); i++) 692 { 693 obj = members.get (i); 694 if (obj != null) 695 result+= obj.hashCode (); } 697 698 return result; 699 } 700 701 protected int updateReplicantsHashId (String key) 702 { 703 List nodes = this.lookupReplicantsNodeNames (key); 706 int result = 0; 707 708 if ( (nodes == null) || (nodes.size () == 0) ) 709 { 710 this.intraviewIdCache.remove (key); 713 } 714 else 715 { 716 result = this.calculateReplicantsHash (nodes); 717 this.intraviewIdCache.put (key, new Integer (result)); 718 } 719 720 return result; 721 722 } 723 724 728 734 protected void addReplicant(String key, String nodeName, Serializable replicant) 735 { 736 addReplicant(replicants, key, nodeName, replicant); 737 } 738 739 746 protected void addReplicant(Map map, String key, String nodeName, Serializable replicant) 747 { 748 synchronized(map) 749 { 750 HashMap rep = (HashMap )map.get(key); 751 if (rep == null) 752 { 753 if( trace ) 754 log.trace("_adding new HashMap"); 755 rep = new HashMap (); 756 map.put(key, rep); 757 } 758 rep.put(nodeName, replicant); 759 } 760 } 761 762 protected Vector getKeysReplicatedByNode (String nodeName) 763 { 764 Vector result = new Vector (); 765 synchronized (replicants) 766 { 767 Iterator keysIter = replicants.keySet ().iterator (); 768 while (keysIter.hasNext ()) 769 { 770 String key = (String )keysIter.next (); 771 HashMap values = (HashMap )replicants.get (key); 772 |