| 1 22 package org.jboss.test.cluster.test; 23 24 import java.rmi.RemoteException ; 25 import java.rmi.server.UnicastRemoteObject ; 26 import java.security.SecureRandom ; 27 import java.util.ArrayList ; 28 import java.util.HashMap ; 29 import java.util.Iterator ; 30 import java.util.Vector ; 31 import java.util.List ; 32 import java.util.HashSet ; 33 34 import javax.management.MBeanServer ; 35 import javax.management.MBeanServerFactory ; 36 import javax.management.MBeanServerInvocationHandler ; 37 import javax.management.ObjectName ; 38 import javax.management.Notification ; 39 40 import junit.framework.Test; 41 42 import org.jboss.test.JBossClusteredTestCase; 43 import org.jboss.test.cluster.drm.IReplicants; 44 import org.jboss.test.cluster.drm.MockHAPartition; 45 import org.jboss.ha.framework.interfaces.ClusterNode; 46 import org.jboss.ha.framework.interfaces.DistributedReplicantManager; 47 import org.jboss.ha.framework.interfaces.DistributedReplicantManager.ReplicantListener; 48 import org.jboss.ha.framework.server.DistributedReplicantManagerImpl; 49 import org.jboss.jmx.adaptor.rmi.RMIAdaptor; 50 import org.jboss.jmx.adaptor.rmi.RMIAdaptorExt; 51 import org.jboss.jmx.adaptor.rmi.RMINotificationListener; 52 import org.jboss.logging.Logger; 53 import org.jgroups.stack.IpAddress; 54 55 import EDU.oswego.cs.dl.util.concurrent.Semaphore; 56 57 63 public class DRMTestCase extends JBossClusteredTestCase 64 { 65 static class TestListener extends UnicastRemoteObject  66 implements RMINotificationListener 67 { 68 private static final long serialVersionUID = 1; 69 private Logger log; 70 71 public TestListener(Logger log) throws RemoteException  72 { 73 this.log = log; 74 } 75 public void handleNotification(Notification notification, Object handback) 76 throws RemoteException  77 { 78 log.info("handleNotification, "+notification); 79 } 80 } 81 82 87 static class BlockingListenerThread extends Thread 88 implements DistributedReplicantManager.ReplicantListener 89 { 90 private DistributedReplicantManagerImpl drm; 91 private String nodeName; 92 private boolean add; 93 private boolean blocking; 94 private Exception ex; 95 96 BlockingListenerThread(DistributedReplicantManagerImpl drm, 97 boolean add, 98 String nodeName) 99 { 100 this.drm = drm; 101 this.add =add; 102 this.nodeName = nodeName; 103 drm.registerListener("TEST", this); 104 } 105 106 public void replicantsChanged(String key, List newReplicants, int newReplicantsViewId) 107 { 108 blocking = true; 109 synchronized(lock) 110 { 111 blocking = false; 112 } 113 } 114 115 public void run() 116 { 117 try 118 { 119 if (add) 120 { 121 if (nodeName == null) 122 drm.add("TEST", "local-replicant"); 123 else 124 drm._add("TEST", nodeName, "remote-replicant"); 125 } 126 else 127 { 128 if (nodeName == null) 129 drm.remove("TEST"); 130 else 131 drm._remove("TEST", nodeName); 132 } 133 } 134 catch (Exception e) 135 { 136 ex = e; 137 } 138 } 139 140 public boolean isBlocking() 141 { 142 return blocking; 143 } 144 145 public Exception getException() 146 { 147 return ex; 148 } 149 150 } 151 152 155 static class RegistrationThread extends Thread  156 { 157 private DistributedReplicantManager drm; 158 private boolean registered = false; 159 private boolean unregistered = true; 160 161 RegistrationThread(DistributedReplicantManager drm) 162 { 163 this.drm = drm; 164 } 165 166 public void run() 167 { 168 NullListener listener = new NullListener(); 169 drm.registerListener("DEADLOCK", listener); 170 registered = true; 171 drm.unregisterListener("DEADLOCK", listener); 172 unregistered = true; 173 } 174 175 public boolean isRegistered() 176 { 177 return registered; 178 } 179 180 public boolean isUnregistered() 181 { 182 return unregistered; 183 } 184 185 } 186 187 190 static class NullListener 191 implements DistributedReplicantManager.ReplicantListener 192 { 193 public void replicantsChanged(String key, List newReplicants, 194 int newReplicantsViewId) 195 { 196 } 198 } 199 200 205 static class MockHASingletonDeployer 206 implements DistributedReplicantManager.ReplicantListener 207 { 208 DistributedReplicantManager drm; 209 MockDeployer deployer; 210 String key; 211 boolean master = false; 212 NullListener deploymentListener = new NullListener(); 213 Exception ex; 214 Logger log; 215 Object mutex = new Object (); 216 217 MockHASingletonDeployer(MockDeployer deployer, String key, Logger log) 218 { 219 this.drm = deployer.getDRM(); 220 this.deployer = deployer; 221 this.key = key; 222 this.log = log; 223 } 224 225 public void replicantsChanged(String key, 226 List newReplicants, 227 int newReplicantsViewId) 228 { 229 if (this.key.equals(key)) 230 { 231 synchronized(mutex) 232 { 233 boolean nowMaster = drm.isMasterReplica(key); 234 235 try 236 { 237 if (!master && nowMaster) { 238 log.debug(Thread.currentThread().getName() + 239 " Deploying " + key); 240 deployer.deploy(key + "A", key, deploymentListener); 241 } 242 else if (master && !nowMaster) { 243 log.debug(Thread.currentThread().getName() + 244 " undeploying " + key); 245 deployer.undeploy(key + "A", deploymentListener); 246 } 247 else 248 { 249 log.debug(Thread.currentThread().getName() + 250 " -- no status change in " + key + 251 " -- master = " + master); 252 } 253 master = nowMaster; 254 } 255 catch (Exception e) 256 { 257 e.printStackTrace(); 258 if (ex == null) 259 ex = e; 260 } 261 } 262 } 263 } 264 265 public Exception getException() 266 { 267 return ex; 268 } 269 270 } 271 272 275 static class DeployerThread extends Thread  276 { 277 Semaphore semaphore; 278 MockDeployer deployer; 279 DistributedReplicantManager.ReplicantListener listener; 280 String key; 281 Exception ex; 282 int count = -1; 283 Logger log; 284 285 DeployerThread(MockDeployer deployer, 286 String key, 287 DistributedReplicantManager.ReplicantListener listener, 288 Semaphore semaphore, 289 Logger log) 290 { 291 super("Deployer " + key); 292 this.deployer = deployer; 293 this.listener = listener; 294 this.key = key; 295 this.semaphore = semaphore; 296 this.log = log; 297 } 298 299 public void run() 300 { 301 boolean acquired = false; 302 try 303 { 304 acquired = semaphore.attempt(60000); 305 if (!acquired) 306 throw new Exception ("Cannot acquire semaphore"); 307 SecureRandom random = new SecureRandom (); 308 for (count = 0; count < LOOP_COUNT; count++) 309 { 310 deployer.deploy(key, "JGroups", listener); 311 312 sleepThread(random.nextInt(50)); 313 deployer.undeploy(key, listener); 314 } 315 } 316 catch (Exception e) 317 { 318 e.printStackTrace(); 319 ex = e; 320 } 321 finally 322 { 323 if (acquired) 324 semaphore.release(); 325 } 326 } 327 328 public Exception getException() 329 { 330 return ex; 331 } 332 333 public int getCount() 334 { 335 return count; 336 } 337 } 338 339 344 static class JGroupsThread extends Thread  345 { 346 Semaphore semaphore; 347 DistributedReplicantManagerImpl drm; 348 String [] keys; 349 String nodeName; 350 Exception ex; 351 int count = -1; 352 int weightFactor; 353 354 JGroupsThread(DistributedReplicantManagerImpl drm, 355 String [] keys, 356 String nodeName, 357 Semaphore semaphore) 358 { 359 super("JGroups"); 360 this.drm = drm; 361 this.keys = keys; 362 this.semaphore = semaphore; 363 this.nodeName = nodeName; 364 this.weightFactor = (int) 2.5 * keys.length; 365 } 366 367 public void run() 368 { 369 boolean acquired = false; 370 try 371 { 372 acquired = semaphore.attempt(60000); 373 if (!acquired) 374 throw new Exception ("Cannot acquire semaphore"); 375 boolean[] added = new boolean[keys.length]; 376 SecureRandom random = new SecureRandom (); 377 378 for (count = 0; count < weightFactor * LOOP_COUNT; count++) 379 { 380 int pos = random.nextInt(keys.length); 381 if (added[pos]) 382 { 383 drm._remove(keys[pos], nodeName); 384 added[pos] = false; 385 } 386 else 387 { 388 drm._add(keys[pos], nodeName, ""); 389 added[pos] = true; 390 } 391 sleepThread(random.nextInt(30)); 392 } 393 } 394 catch (Exception e) 395 { 396 e.printStackTrace(); 397 ex = e; 398 } 399 finally 400 { 401 if (acquired) 402 semaphore.release(); 403 } 404 } 405 406 public Exception getException() 407 { 408 return ex; 409 } 410 411 public int getCount() 412 { 413 return (count / weightFactor); 414 } 415 416 } 417 418 423 static class MockDeployer 424 { 425 DistributedReplicantManager drm; 426 427 MockDeployer(DistributedReplicantManager drm) 428 { 429 this.drm = drm; 430 } 431 432 void deploy(String key, String replicant, 433 DistributedReplicantManager.ReplicantListener listener) 434 throws Exception 435 { 436 synchronized(this) 437 { 438 drm.registerListener(key, listener); 439 drm.add(key, replicant); 440 sleepThread(10); 441 } 442 } 443 444 void undeploy(String key, 445 DistributedReplicantManager.ReplicantListener listener) 446 throws Exception 447 { 448 synchronized(this) 449 { 450 drm.remove(key); 451 drm.unregisterListener(key, listener); 452 sleepThread(10); 453 } 454 } 455 456 DistributedReplicantManager getDRM() 457 { 458 return drm; 459 } 460 } 461 462 463 static class CachingListener implements ReplicantListener 464 { 465 List replicants = null; 466 boolean clean = true; 467 468 public void replicantsChanged(String key, List newReplicants, 469 int newReplicantsViewId) 470 { 471 this.replicants = newReplicants; 472 if (clean && newReplicants != null) 473 { 474 int last = Integer.MIN_VALUE; 475 for (Iterator iter = newReplicants.iterator(); iter.hasNext(); ) 476 { 477 int cur = ((Integer ) iter.next()).intValue(); 478 if (last >= cur) 479 { 480 clean = false; 481 break; 482 } 483 484 last = cur; 485 } 486 } 487 } 488 489 } 490 491 private static Object lock = new Object (); 492 private static int LOOP_COUNT = 30; 493 494 public static Test suite() throws Exception  495 { 496 Test t1 = getDeploySetup(DRMTestCase.class, "drm-tests.sar"); 497 return t1; 498 } 499 500 public DRMTestCase(String name) 501 { 502 super(name); 503 } 504 505 public void testStateReplication() 506 throws Exception  507 { 508 log.debug("+++ testStateReplication"); 509 log.info("java.rmi.server.hostname="+System.getProperty("java.rmi.server.hostname")); 510 RMIAdaptor[] adaptors = getAdaptors(); 511 String [] servers = super.getServers(); 512 RMIAdaptorExt server0 = (RMIAdaptorExt) adaptors[0]; 513 log.info("server0: "+server0); 514 ObjectName clusterService = new ObjectName ("jboss:service=DefaultPartition"); 515 Vector view0 = (Vector ) server0.getAttribute(clusterService, "CurrentView"); 516 log.info("server0: CurrentView, "+view0); 517 ObjectName drmService = new ObjectName ("jboss.test:service=DRMTestCase"); 518 IReplicants drm0 = (IReplicants) 519 MBeanServerInvocationHandler.newProxyInstance(server0, drmService, 520 IReplicants.class, true); 521 log.info(MBeanServerInvocationHandler .class.getProtectionDomain()); 522 TestListener listener = new TestListener(log); 523 server0.addNotificationListener(drmService, listener, null, null); 524 log.info("server0 addNotificationListener"); 525 String address = (String ) drm0.lookupLocalReplicant(); 526 log.info("server0: lookupLocalReplicant: "+address); 527 assertTrue("server0: address("+address+") == server0("+servers[0]+")", 528 address.equals(servers[0])); 529 530 RMIAdaptorExt server1 = (RMIAdaptorExt) adaptors[1]; 531 log.info("server1: "+server1); 532 Vector view1 = (Vector ) server1.getAttribute(clusterService, "CurrentView"); 533 log.info("server1: CurrentView, "+view1); 534 IReplicants drm1 = (IReplicants) 535 MBeanServerInvocationHandler.newProxyInstance(server1, drmService, 536 IReplicants.class, true); 537 server1.addNotificationListener(drmService, listener, null, null); 538 log.info("server1 addNotificationListener"); 539 address = (String ) drm1.lookupLocalReplicant(); 540 log.info("server1: lookupLocalReplicant: "+address); 541 assertTrue("server1: address("+address+") == server1("+servers[1]+")", 542 address.equals(servers[1])); 543 544 List replicants0 = drm0.lookupReplicants(); 545 List replicants1 = drm1.lookupReplicants(); 546 assertTrue("size of replicants0 == replicants1)", 547 replicants0.size() == replicants1.size()); 548 HashSet testSet = new HashSet (replicants0); 549 for(int n = 0; n < replicants0.size(); n ++) 550 { 551 Object entry = replicants1.get(n); 552 assertTrue("replicants0 contains:"+entry, testSet.contains(entry)); 553 } 554 555 for(int n = 0; n < 10; n ++) 557 { 558 drm0.add("key"+n, "data"+n+".0"); 559 drm1.add("key"+n, "data"+n+".1"); 560 } 561 for(int n = 0; n < 10; n ++) 562 { 563 String key = "key"+n; 564 log.info("key: "+key); 565 replicants0 = drm0.lookupReplicants(key); 566 replicants1 = drm1.lookupReplicants(key); 567 log.info("replicants0: "+replicants0); 568 log.info("replicants1: "+replicants1); 569 HashSet testSet0 = new HashSet (replicants0); 570 HashSet testSet1 = new HashSet (replicants1); 571 assertTrue("size of replicants0 == replicants1)", 572 replicants0.size() == replicants1.size()); 573 Object entry = drm0.lookupLocalReplicant(key); 574 log.info("drm0.lookupLocalReplicant, key="+key+", entry="+entry); 575 assertTrue("replicants0 contains:"+entry, testSet0.contains(entry)); 576 assertTrue("replicants1 contains:"+entry, testSet1.contains(entry)); 577 } 578 579 for(int n = 0; n < 10; n ++) 580 drm0.remove("key"+n); 581 582 server0.removeNotificationListener(drmService, listener); 583 server1.removeNotificationListener(drmService, listener); 584 } 585 586 595 public void testIsMasterReplica() throws Exception  596 { 597 log.debug("+++ testIsMasterReplica()"); 598 599 MBeanServer mbeanServer = 600 MBeanServerFactory.createMBeanServer("mockPartition"); 601 try { 602 ClusterNode localAddress = new ClusterNode(new IpAddress("127.0.0.1", 12345)); 603 MockHAPartition partition = new MockHAPartition(localAddress); 604 605 DistributedReplicantManagerImpl drm = 606 new DistributedReplicantManagerImpl(partition); 607 608 drm.create(); 609 610 612 Vector remoteAddresses = new Vector (); 613 for (int i = 1; i < 5; i++) 614 remoteAddresses.add(new ClusterNode(new IpAddress("127.0.0.1", 12340 + i))); 615 616 Vector allNodes = new Vector (remoteAddresses); 617 allNodes.add(localAddress); 618 partition.setCurrentViewClusterNodes(allNodes); 619 620 622 HashMap replicants = new HashMap (); 623 ArrayList remoteResponses = new ArrayList (); 624 for (int i = 0; i < remoteAddresses.size(); i++) 625 { 626 ClusterNode node = (ClusterNode) remoteAddresses.elementAt(i); 627 Integer replicant = new Integer (i + 1); 628 replicants.put(node.getName(), replicant); 629 HashMap localReplicant = new HashMap (); 630 localReplicant.put("Mock", replicant); 631 remoteResponses.add(new Object [] {node.getName(), localReplicant}); 632 } 633 HashMap services = new HashMap (); 634 services.put("Mock", replicants); 635 636 int hash = 0; 637 for (int i = 1; i < 5; i++) 638 hash += (new Integer (i)).hashCode(); 639 640 HashMap intraviewIds = new HashMap (); 641 intraviewIds.put("Mock", new Integer (hash)); 642 643 partition.setRemoteReplicants(remoteResponses); 644 645 drm.setCurrentState(new Object [] {services, intraviewIds }); 646 647 drm.start(); 648 649 651 drm.add("Mock", new Integer (5)); 652 653 655 assertFalse("Local node is not master after startup", 656 drm.isMasterReplica("Mock")); 657 658 660 Vector localOnly = new Vector (); 661 localOnly.add(localAddress); 662 663 partition.setCurrentViewClusterNodes(localOnly); 664 partition.setRemoteReplicants(new ArrayList ()); 665 666 drm.membershipChanged(remoteAddresses, new Vector (), localOnly); 667 668 670 assertTrue("Local node is master after split", drm.isMasterReplica("Mock")); 671 672 674 drm.remove("Mock"); 675 676 678 assertFalse("Local node is not master after dropping replicant", 679 drm.isMasterReplica("Mock")); 680 681 683 drm.add("Mock", new Integer (5)); 684 685 687 Vector mergeGroups = new Vector (); 688 mergeGroups.add(remoteAddresses); 689 mergeGroups.add(localOnly); 690 691 partition.setCurrentViewClusterNodes(allNodes); 692 partition.setRemoteReplicants(remoteResponses); 693 694 drm.membershipChangedDuringMerge(new Vector (), remoteAddresses, 695 allNodes, mergeGroups); 696 697 sleepThread(100); 699 700 702 assertFalse("Local node is not master after merge", 703 drm.isMasterReplica("Mock")); 704 } 705 finally { 706 MBeanServerFactory.releaseMBeanServer(mbeanServer); 707 } 708 } 709 710 711 720 public void testKeyListenerDeadlock() throws Exception  721 { 722 log.debug("+++ testKeyListenerDeadlock()"); 723 724 MBeanServer mbeanServer = 725 MBeanServerFactory.createMBeanServer("mockPartition"); 726 try { 727 ClusterNode localAddress = new ClusterNode(new IpAddress("127.0.0.1", 12345)); 728 MockHAPartition partition = new MockHAPartition(localAddress); 729 730 DistributedReplicantManagerImpl drm = 731 new DistributedReplicantManagerImpl(partition); 732 733 drm.create(); 734 735 737 Vector remoteAddresses = new Vector (); 738 for (int i = 1; i < 5; i++) 739 remoteAddresses.add(new ClusterNode(new IpAddress("127.0.0.1", 12340 + i))); 740 741 Vector allNodes = new Vector (remoteAddresses); 742 allNodes.add(localAddress); 743
|