KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jboss > test > cluster > test > DRMTestCase


1 /*
2   * JBoss, Home of Professional Open Source
3   * Copyright 2005, JBoss Inc., and individual contributors as indicated
4   * by the @authors tag. See the copyright.txt in the distribution for a
5   * full listing of individual contributors.
6   *
7   * This is free software; you can redistribute it and/or modify it
8   * under the terms of the GNU Lesser General Public License as
9   * published by the Free Software Foundation; either version 2.1 of
10   * the License, or (at your option) any later version.
11   *
12   * This software is distributed in the hope that it will be useful,
13   * but WITHOUT ANY WARRANTY; without even the implied warranty of
14   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15   * Lesser General Public License for more details.
16   *
17   * You should have received a copy of the GNU Lesser General Public
18   * License along with this software; if not, write to the Free
19   * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
20   * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
21   */

22 package org.jboss.test.cluster.test;
23
24 import java.rmi.RemoteException JavaDoc;
25 import java.rmi.server.UnicastRemoteObject JavaDoc;
26 import java.security.SecureRandom JavaDoc;
27 import java.util.ArrayList JavaDoc;
28 import java.util.HashMap JavaDoc;
29 import java.util.Iterator JavaDoc;
30 import java.util.Vector JavaDoc;
31 import java.util.List JavaDoc;
32 import java.util.HashSet JavaDoc;
33
34 import javax.management.MBeanServer JavaDoc;
35 import javax.management.MBeanServerFactory JavaDoc;
36 import javax.management.MBeanServerInvocationHandler JavaDoc;
37 import javax.management.ObjectName JavaDoc;
38 import javax.management.Notification JavaDoc;
39
40 import junit.framework.Test;
41
42 import org.jboss.test.JBossClusteredTestCase;
43 import org.jboss.test.cluster.drm.IReplicants;
44 import org.jboss.test.cluster.drm.MockHAPartition;
45 import org.jboss.ha.framework.interfaces.ClusterNode;
46 import org.jboss.ha.framework.interfaces.DistributedReplicantManager;
47 import org.jboss.ha.framework.interfaces.DistributedReplicantManager.ReplicantListener;
48 import org.jboss.ha.framework.server.DistributedReplicantManagerImpl;
49 import org.jboss.jmx.adaptor.rmi.RMIAdaptor;
50 import org.jboss.jmx.adaptor.rmi.RMIAdaptorExt;
51 import org.jboss.jmx.adaptor.rmi.RMINotificationListener;
52 import org.jboss.logging.Logger;
53 import org.jgroups.stack.IpAddress;
54
55 import EDU.oswego.cs.dl.util.concurrent.Semaphore;
56
57 /** Tests of the DistributedReplicantManagerImpl
58  *
59  * @author Scott.Stark@jboss.org
60  * @author Brian.Stansberry@jboss.com
61  * @version $Revision: 58603 $
62  */

63 public class DRMTestCase extends JBossClusteredTestCase
64 {
65    static class TestListener extends UnicastRemoteObject JavaDoc
66       implements RMINotificationListener
67    {
68       private static final long serialVersionUID = 1;
69       private Logger log;
70
71       public TestListener(Logger log) throws RemoteException JavaDoc
72       {
73          this.log = log;
74       }
75       public void handleNotification(Notification JavaDoc notification, Object JavaDoc handback)
76          throws RemoteException JavaDoc
77       {
78          log.info("handleNotification, "+notification);
79       }
80    }
81    
82    /**
83     * Thread that will first register a DRM ReplicantLister that synchronizes
84     * on the test class' lock object, and then calls DRM add or remove,
85     * causing the thread to block if the lock object's monitor is held.
86     */

87    static class BlockingListenerThread extends Thread JavaDoc
88       implements DistributedReplicantManager.ReplicantListener
89    {
90       private DistributedReplicantManagerImpl drm;
91       private String JavaDoc nodeName;
92       private boolean add;
93       private boolean blocking;
94       private Exception JavaDoc ex;
95       
96       BlockingListenerThread(DistributedReplicantManagerImpl drm,
97                              boolean add,
98                              String JavaDoc nodeName)
99       {
100          this.drm = drm;
101          this.add =add;
102          this.nodeName = nodeName;
103          drm.registerListener("TEST", this);
104       }
105
106       public void replicantsChanged(String JavaDoc key, List JavaDoc newReplicants, int newReplicantsViewId)
107       {
108          blocking = true;
109          synchronized(lock)
110          {
111             blocking = false;
112          }
113       }
114       
115       public void run()
116       {
117          try
118          {
119             if (add)
120             {
121                if (nodeName == null)
122                   drm.add("TEST", "local-replicant");
123                else
124                   drm._add("TEST", nodeName, "remote-replicant");
125             }
126             else
127             {
128                if (nodeName == null)
129                   drm.remove("TEST");
130                else
131                   drm._remove("TEST", nodeName);
132             }
133          }
134          catch (Exception JavaDoc e)
135          {
136             ex = e;
137          }
138       }
139       
140       public boolean isBlocking()
141       {
142          return blocking;
143       }
144       
145       public Exception JavaDoc getException()
146       {
147          return ex;
148       }
149       
150    }
151    
152    /**
153     * Thread that registers and then unregisters a DRM ReplicantListener.
154     */

155    static class RegistrationThread extends Thread JavaDoc
156    {
157       private DistributedReplicantManager drm;
158       private boolean registered = false;
159       private boolean unregistered = true;
160       
161       RegistrationThread(DistributedReplicantManager drm)
162       {
163          this.drm = drm;
164       }
165       
166       public void run()
167       {
168          NullListener listener = new NullListener();
169          drm.registerListener("DEADLOCK", listener);
170          registered = true;
171          drm.unregisterListener("DEADLOCK", listener);
172          unregistered = true;
173       }
174       
175       public boolean isRegistered()
176       {
177          return registered;
178       }
179       
180       public boolean isUnregistered()
181       {
182          return unregistered;
183       }
184       
185    }
186    
187    /**
188     * A DRM ReplicantListener that does nothing.
189     */

190    static class NullListener
191       implements DistributedReplicantManager.ReplicantListener
192    {
193       public void replicantsChanged(String JavaDoc key, List JavaDoc newReplicants,
194                                     int newReplicantsViewId)
195       {
196          // no-op
197
}
198    }
199    
200    /**
201     * DRM ReplicantListener that mimics the HASingletonDeployer service
202     * by deploying/undeploying a service if it's notified that by that DRM
203     * that it is the master replica for its key.
204     */

205    static class MockHASingletonDeployer
206       implements DistributedReplicantManager.ReplicantListener
207    {
208       DistributedReplicantManager drm;
209       MockDeployer deployer;
210       String JavaDoc key;
211       boolean master = false;
212       NullListener deploymentListener = new NullListener();
213       Exception JavaDoc ex;
214       Logger log;
215       Object JavaDoc mutex = new Object JavaDoc();
216       
217       MockHASingletonDeployer(MockDeployer deployer, String JavaDoc key, Logger log)
218       {
219          this.drm = deployer.getDRM();
220          this.deployer = deployer;
221          this.key = key;
222          this.log = log;
223       }
224
225       public void replicantsChanged(String JavaDoc key,
226                                     List JavaDoc newReplicants,
227                                     int newReplicantsViewId)
228       {
229          if (this.key.equals(key))
230          {
231             synchronized(mutex)
232             {
233                boolean nowMaster = drm.isMasterReplica(key);
234                
235                try
236                {
237                   if (!master && nowMaster) {
238                      log.debug(Thread.currentThread().getName() +
239                                " Deploying " + key);
240                      deployer.deploy(key + "A", key, deploymentListener);
241                   }
242                   else if (master && !nowMaster) {
243                      log.debug(Thread.currentThread().getName() +
244                                " undeploying " + key);
245                      deployer.undeploy(key + "A", deploymentListener);
246                   }
247                   else
248                   {
249                      log.debug(Thread.currentThread().getName() +
250                                " -- no status change in " + key +
251                                " -- master = " + master);
252                   }
253                   master = nowMaster;
254                }
255                catch (Exception JavaDoc e)
256                {
257                   e.printStackTrace();
258                   if (ex == null)
259                      ex = e;
260                }
261             }
262          }
263       }
264       
265       public Exception JavaDoc getException()
266       {
267          return ex;
268       }
269       
270    }
271    
272    /**
273     * Thread the repeatedly deploys and undeploys a MockHASingletonDeployer.
274     */

275    static class DeployerThread extends Thread JavaDoc
276    {
277       Semaphore semaphore;
278       MockDeployer deployer;
279       DistributedReplicantManager.ReplicantListener listener;
280       String JavaDoc key;
281       Exception JavaDoc ex;
282       int count = -1;
283       Logger log;
284       
285       DeployerThread(MockDeployer deployer,
286                      String JavaDoc key,
287                      DistributedReplicantManager.ReplicantListener listener,
288                      Semaphore semaphore,
289                      Logger log)
290       {
291          super("Deployer " + key);
292          this.deployer = deployer;
293          this.listener = listener;
294          this.key = key;
295          this.semaphore = semaphore;
296          this.log = log;
297       }
298       
299       public void run()
300       {
301          boolean acquired = false;
302          try
303          {
304             acquired = semaphore.attempt(60000);
305             if (!acquired)
306                throw new Exception JavaDoc("Cannot acquire semaphore");
307             SecureRandom JavaDoc random = new SecureRandom JavaDoc();
308             for (count = 0; count < LOOP_COUNT; count++)
309             {
310                deployer.deploy(key, "JGroups", listener);
311
312                sleepThread(random.nextInt(50));
313                deployer.undeploy(key, listener);
314             }
315          }
316          catch (Exception JavaDoc e)
317          {
318             e.printStackTrace();
319             ex = e;
320          }
321          finally
322          {
323             if (acquired)
324                semaphore.release();
325          }
326       }
327       
328       public Exception JavaDoc getException()
329       {
330          return ex;
331       }
332       
333       public int getCount()
334       {
335          return count;
336       }
337    }
338    
339    /**
340     * Thread that mimics the JGroups up-handler thread that calls into the DRM.
341     * Repeatedly and randomly calls adds or removes a replicant for a set
342     * of keys.
343     */

344    static class JGroupsThread extends Thread JavaDoc
345    {
346       Semaphore semaphore;
347       DistributedReplicantManagerImpl drm;
348       String JavaDoc[] keys;
349       String JavaDoc nodeName;
350       Exception JavaDoc ex;
351       int count = -1;
352       int weightFactor;
353       
354       JGroupsThread(DistributedReplicantManagerImpl drm,
355                     String JavaDoc[] keys,
356                     String JavaDoc nodeName,
357                     Semaphore semaphore)
358       {
359          super("JGroups");
360          this.drm = drm;
361          this.keys = keys;
362          this.semaphore = semaphore;
363          this.nodeName = nodeName;
364          this.weightFactor = (int) 2.5 * keys.length;
365       }
366       
367       public void run()
368       {
369          boolean acquired = false;
370          try
371          {
372             acquired = semaphore.attempt(60000);
373             if (!acquired)
374                throw new Exception JavaDoc("Cannot acquire semaphore");
375             boolean[] added = new boolean[keys.length];
376             SecureRandom JavaDoc random = new SecureRandom JavaDoc();
377             
378             for (count = 0; count < weightFactor * LOOP_COUNT; count++)
379             {
380                int pos = random.nextInt(keys.length);
381                if (added[pos])
382                {
383                   drm._remove(keys[pos], nodeName);
384                   added[pos] = false;
385                }
386                else
387                {
388                   drm._add(keys[pos], nodeName, "");
389                   added[pos] = true;
390                }
391                sleepThread(random.nextInt(30));
392             }
393          }
394          catch (Exception JavaDoc e)
395          {
396             e.printStackTrace();
397             ex = e;
398          }
399          finally
400          {
401             if (acquired)
402                semaphore.release();
403          }
404       }
405       
406       public Exception JavaDoc getException()
407       {
408          return ex;
409       }
410       
411       public int getCount()
412       {
413          return (count / weightFactor);
414       }
415       
416    }
417    
418    /**
419     * Mocks the deployer of a service that registers/unregisters DRM listeners
420     * and replicants. Only allows a single thread of execution, a la the
421     * org.jboss.system.ServiceController.
422     */

423    static class MockDeployer
424    {
425       DistributedReplicantManager drm;
426       
427       MockDeployer(DistributedReplicantManager drm)
428       {
429          this.drm = drm;
430       }
431       
432       void deploy(String JavaDoc key, String JavaDoc replicant,
433                   DistributedReplicantManager.ReplicantListener listener)
434             throws Exception JavaDoc
435       {
436          synchronized(this)
437          {
438             drm.registerListener(key, listener);
439             drm.add(key, replicant);
440             sleepThread(10);
441          }
442       }
443       
444       void undeploy(String JavaDoc key,
445                     DistributedReplicantManager.ReplicantListener listener)
446          throws Exception JavaDoc
447       {
448          synchronized(this)
449          {
450             drm.remove(key);
451             drm.unregisterListener(key, listener);
452             sleepThread(10);
453          }
454       }
455       
456       DistributedReplicantManager getDRM()
457       {
458          return drm;
459       }
460    }
461    
462    /** ReplicantListener that caches the list of replicants */
463    static class CachingListener implements ReplicantListener
464    {
465       List JavaDoc replicants = null;
466       boolean clean = true;
467       
468       public void replicantsChanged(String JavaDoc key, List JavaDoc newReplicants,
469                                     int newReplicantsViewId)
470       {
471          this.replicants = newReplicants;
472          if (clean && newReplicants != null)
473          {
474             int last = Integer.MIN_VALUE;
475             for (Iterator JavaDoc iter = newReplicants.iterator(); iter.hasNext(); )
476             {
477                int cur = ((Integer JavaDoc) iter.next()).intValue();
478                if (last >= cur)
479                {
480                   clean = false;
481                   break;
482                }
483                
484                last = cur;
485             }
486          }
487       }
488       
489    }
490
491    private static Object JavaDoc lock = new Object JavaDoc();
492    private static int LOOP_COUNT = 30;
493    
494    public static Test suite() throws Exception JavaDoc
495    {
496       Test t1 = getDeploySetup(DRMTestCase.class, "drm-tests.sar");
497       return t1;
498    }
499
/**
 * Creates the test case.
 *
 * @param name the test name, passed through to the superclass
 */
public DRMTestCase(String name)
{
   super(name);
}
504
505    public void testStateReplication()
506       throws Exception JavaDoc
507    {
508       log.debug("+++ testStateReplication");
509       log.info("java.rmi.server.hostname="+System.getProperty("java.rmi.server.hostname"));
510       RMIAdaptor[] adaptors = getAdaptors();
511       String JavaDoc[] servers = super.getServers();
512       RMIAdaptorExt server0 = (RMIAdaptorExt) adaptors[0];
513       log.info("server0: "+server0);
514       ObjectName JavaDoc clusterService = new ObjectName JavaDoc("jboss:service=DefaultPartition");
515       Vector JavaDoc view0 = (Vector JavaDoc) server0.getAttribute(clusterService, "CurrentView");
516       log.info("server0: CurrentView, "+view0);
517       ObjectName JavaDoc drmService = new ObjectName JavaDoc("jboss.test:service=DRMTestCase");
518       IReplicants drm0 = (IReplicants)
519          MBeanServerInvocationHandler.newProxyInstance(server0, drmService,
520          IReplicants.class, true);
521       log.info(MBeanServerInvocationHandler JavaDoc.class.getProtectionDomain());
522       TestListener listener = new TestListener(log);
523       server0.addNotificationListener(drmService, listener, null, null);
524       log.info("server0 addNotificationListener");
525       String JavaDoc address = (String JavaDoc) drm0.lookupLocalReplicant();
526       log.info("server0: lookupLocalReplicant: "+address);
527       assertTrue("server0: address("+address+") == server0("+servers[0]+")",
528          address.equals(servers[0]));
529
530       RMIAdaptorExt server1 = (RMIAdaptorExt) adaptors[1];
531       log.info("server1: "+server1);
532       Vector JavaDoc view1 = (Vector JavaDoc) server1.getAttribute(clusterService, "CurrentView");
533       log.info("server1: CurrentView, "+view1);
534       IReplicants drm1 = (IReplicants)
535          MBeanServerInvocationHandler.newProxyInstance(server1, drmService,
536          IReplicants.class, true);
537       server1.addNotificationListener(drmService, listener, null, null);
538       log.info("server1 addNotificationListener");
539       address = (String JavaDoc) drm1.lookupLocalReplicant();
540       log.info("server1: lookupLocalReplicant: "+address);
541       assertTrue("server1: address("+address+") == server1("+servers[1]+")",
542          address.equals(servers[1]));
543
544       List JavaDoc replicants0 = drm0.lookupReplicants();
545       List JavaDoc replicants1 = drm1.lookupReplicants();
546       assertTrue("size of replicants0 == replicants1)",
547          replicants0.size() == replicants1.size());
548       HashSet JavaDoc testSet = new HashSet JavaDoc(replicants0);
549       for(int n = 0; n < replicants0.size(); n ++)
550       {
551          Object JavaDoc entry = replicants1.get(n);
552          assertTrue("replicants0 contains:"+entry, testSet.contains(entry));
553       }
554
555       //
556
for(int n = 0; n < 10; n ++)
557       {
558          drm0.add("key"+n, "data"+n+".0");
559          drm1.add("key"+n, "data"+n+".1");
560       }
561       for(int n = 0; n < 10; n ++)
562       {
563          String JavaDoc key = "key"+n;
564          log.info("key: "+key);
565          replicants0 = drm0.lookupReplicants(key);
566          replicants1 = drm1.lookupReplicants(key);
567          log.info("replicants0: "+replicants0);
568          log.info("replicants1: "+replicants1);
569          HashSet JavaDoc testSet0 = new HashSet JavaDoc(replicants0);
570          HashSet JavaDoc testSet1 = new HashSet JavaDoc(replicants1);
571          assertTrue("size of replicants0 == replicants1)",
572             replicants0.size() == replicants1.size());
573          Object JavaDoc entry = drm0.lookupLocalReplicant(key);
574          log.info("drm0.lookupLocalReplicant, key="+key+", entry="+entry);
575          assertTrue("replicants0 contains:"+entry, testSet0.contains(entry));
576          assertTrue("replicants1 contains:"+entry, testSet1.contains(entry));
577       }
578
579       for(int n = 0; n < 10; n ++)
580          drm0.remove("key"+n);
581
582       server0.removeNotificationListener(drmService, listener);
583       server1.removeNotificationListener(drmService, listener);
584    }
585    
586    /**
587     * Tests the functionality of isMasterReplica(), also testing merge
588     * handling.
589     *
590     * TODO move this test out of the testsuite and into the cluster module
591     * itself, since it doesn't rely on the container.
592     *
593     * @throws Exception
594     */

595    public void testIsMasterReplica() throws Exception JavaDoc
596    {
597       log.debug("+++ testIsMasterReplica()");
598       
599       MBeanServer JavaDoc mbeanServer =
600          MBeanServerFactory.createMBeanServer("mockPartition");
601       try {
602          ClusterNode localAddress = new ClusterNode(new IpAddress("127.0.0.1", 12345));
603          MockHAPartition partition = new MockHAPartition(localAddress);
604       
605          DistributedReplicantManagerImpl drm =
606                new DistributedReplicantManagerImpl(partition);
607
608          drm.create();
609          
610          // Create a fake view for the MockHAPartition
611

612          Vector JavaDoc remoteAddresses = new Vector JavaDoc();
613          for (int i = 1; i < 5; i++)
614             remoteAddresses.add(new ClusterNode(new IpAddress("127.0.0.1", 12340 + i)));
615          
616          Vector JavaDoc allNodes = new Vector JavaDoc(remoteAddresses);
617          allNodes.add(localAddress);
618          partition.setCurrentViewClusterNodes(allNodes);
619          
620          // Pass fake state to DRMImpl
621

622          HashMap JavaDoc replicants = new HashMap JavaDoc();
623          ArrayList JavaDoc remoteResponses = new ArrayList JavaDoc();
624          for (int i = 0; i < remoteAddresses.size(); i++)
625          {
626             ClusterNode node = (ClusterNode) remoteAddresses.elementAt(i);
627             Integer JavaDoc replicant = new Integer JavaDoc(i + 1);
628             replicants.put(node.getName(), replicant);
629             HashMap JavaDoc localReplicant = new HashMap JavaDoc();
630             localReplicant.put("Mock", replicant);
631             remoteResponses.add(new Object JavaDoc[] {node.getName(), localReplicant});
632          }
633          HashMap JavaDoc services = new HashMap JavaDoc();
634          services.put("Mock", replicants);
635          
636          int hash = 0;
637          for (int i = 1; i < 5; i++)
638             hash += (new Integer JavaDoc(i)).hashCode();
639          
640          HashMap JavaDoc intraviewIds = new HashMap JavaDoc();
641          intraviewIds.put("Mock", new Integer JavaDoc(hash));
642       
643          partition.setRemoteReplicants(remoteResponses);
644          
645          drm.setCurrentState(new Object JavaDoc[] {services, intraviewIds });
646          
647          drm.start();
648          
649          // add a local replicant
650

651          drm.add("Mock", new Integer JavaDoc(5));
652          
653          // test that this node is not the master replica
654

655          assertFalse("Local node is not master after startup",
656                      drm.isMasterReplica("Mock"));
657       
658          // simulate a split where this node is the coord
659

660          Vector JavaDoc localOnly = new Vector JavaDoc();
661          localOnly.add(localAddress);
662          
663          partition.setCurrentViewClusterNodes(localOnly);
664          partition.setRemoteReplicants(new ArrayList JavaDoc());
665          
666          drm.membershipChanged(remoteAddresses, new Vector JavaDoc(), localOnly);
667          
668          // test that this node is the master replica
669

670          assertTrue("Local node is master after split", drm.isMasterReplica("Mock"));
671          
672          // Remove our local replicant
673

674          drm.remove("Mock");
675          
676          // test that this node is not the master replica
677

678          assertFalse("Local node is not master after dropping replicant",
679                      drm.isMasterReplica("Mock"));
680          
681          // Restore the local replicant
682

683          drm.add("Mock", new Integer JavaDoc(5));
684          
685          // simulate a merge
686

687          Vector JavaDoc mergeGroups = new Vector JavaDoc();
688          mergeGroups.add(remoteAddresses);
689          mergeGroups.add(localOnly);
690          
691          partition.setCurrentViewClusterNodes(allNodes);
692          partition.setRemoteReplicants(remoteResponses);
693          
694          drm.membershipChangedDuringMerge(new Vector JavaDoc(), remoteAddresses,
695                                           allNodes, mergeGroups);
696          
697          // Merge processing is done asynchronously, so pause a bit
698
sleepThread(100);
699          
700          // test that this node is not the master replica
701

702          assertFalse("Local node is not master after merge",
703                      drm.isMasterReplica("Mock"));
704       }
705       finally {
706          MBeanServerFactory.releaseMBeanServer(mbeanServer);
707       }
708    }
709    
710    
711    /**
712     * Tests that one thread blocking in DRM.notifyKeyListeners() does not
713     * prevent other threads registering/unregistering listeners. JBAS-2539
714     *
715     * TODO move this test out of the testsuite and into the cluster module
716     * itself, since it doesn't rely on the container.
717     *
718     * @throws Exception
719     */

720    public void testKeyListenerDeadlock() throws Exception JavaDoc
721    {
722       log.debug("+++ testKeyListenerDeadlock()");
723       
724       MBeanServer JavaDoc mbeanServer =
725          MBeanServerFactory.createMBeanServer("mockPartition");
726       try {
727          ClusterNode localAddress = new ClusterNode(new IpAddress("127.0.0.1", 12345));
728          MockHAPartition partition = new MockHAPartition(localAddress);
729       
730          DistributedReplicantManagerImpl drm =
731                new DistributedReplicantManagerImpl(partition);
732
733          drm.create();
734          
735          // Create a fake view for the MockHAPartition
736

737          Vector JavaDoc remoteAddresses = new Vector JavaDoc();
738          for (int i = 1; i < 5; i++)
739             remoteAddresses.add(new ClusterNode(new IpAddress("127.0.0.1", 12340 + i)));
740          
741          Vector JavaDoc allNodes = new Vector JavaDoc(remoteAddresses);
742          allNodes.add(localAddress);
743