KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jboss > test > cluster > test > DRMTestCase


1 /*
2   * JBoss, Home of Professional Open Source
3   * Copyright 2005, JBoss Inc., and individual contributors as indicated
4   * by the @authors tag. See the copyright.txt in the distribution for a
5   * full listing of individual contributors.
6   *
7   * This is free software; you can redistribute it and/or modify it
8   * under the terms of the GNU Lesser General Public License as
9   * published by the Free Software Foundation; either version 2.1 of
10   * the License, or (at your option) any later version.
11   *
12   * This software is distributed in the hope that it will be useful,
13   * but WITHOUT ANY WARRANTY; without even the implied warranty of
14   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15   * Lesser General Public License for more details.
16   *
17   * You should have received a copy of the GNU Lesser General Public
18   * License along with this software; if not, write to the Free
19   * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
20   * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
21   */

22 package org.jboss.test.cluster.test;
23
24 import java.rmi.RemoteException JavaDoc;
25 import java.rmi.server.UnicastRemoteObject JavaDoc;
26 import java.security.SecureRandom JavaDoc;
27 import java.util.ArrayList JavaDoc;
28 import java.util.HashMap JavaDoc;
29 import java.util.Iterator JavaDoc;
30 import java.util.Vector JavaDoc;
31 import java.util.List JavaDoc;
32 import java.util.HashSet JavaDoc;
33
34 import javax.management.MBeanServer JavaDoc;
35 import javax.management.MBeanServerFactory JavaDoc;
36 import javax.management.MBeanServerInvocationHandler JavaDoc;
37 import javax.management.ObjectName JavaDoc;
38 import javax.management.Notification JavaDoc;
39
40 import junit.framework.Test;
41
42 import org.jboss.test.JBossClusteredTestCase;
43 import org.jboss.test.cluster.drm.IReplicants;
44 import org.jboss.test.cluster.drm.MockHAPartition;
45 import org.jboss.ha.framework.interfaces.ClusterNode;
46 import org.jboss.ha.framework.interfaces.DistributedReplicantManager;
47 import org.jboss.ha.framework.interfaces.DistributedReplicantManager.ReplicantListener;
48 import org.jboss.ha.framework.server.DistributedReplicantManagerImpl;
49 import org.jboss.jmx.adaptor.rmi.RMIAdaptor;
50 import org.jboss.jmx.adaptor.rmi.RMIAdaptorExt;
51 import org.jboss.jmx.adaptor.rmi.RMINotificationListener;
52 import org.jboss.logging.Logger;
53 import org.jgroups.stack.IpAddress;
54
55 import EDU.oswego.cs.dl.util.concurrent.Semaphore;
56
57 /** Tests of the DistributedReplicantManagerImpl
58  *
59  * @author Scott.Stark@jboss.org
60  * @author Brian.Stansberry@jboss.com
61  * @version $Revision: 58603 $
62  */

63 public class DRMTestCase extends JBossClusteredTestCase
64 {
65    static class TestListener extends UnicastRemoteObject JavaDoc
66       implements RMINotificationListener
67    {
68       private static final long serialVersionUID = 1;
69       private Logger log;
70
71       public TestListener(Logger log) throws RemoteException JavaDoc
72       {
73          this.log = log;
74       }
75       public void handleNotification(Notification JavaDoc notification, Object JavaDoc handback)
76          throws RemoteException JavaDoc
77       {
78          log.info("handleNotification, "+notification);
79       }
80    }
81    
82    /**
83     * Thread that will first register a DRM ReplicantLister that synchronizes
84     * on the test class' lock object, and then calls DRM add or remove,
85     * causing the thread to block if the lock object's monitor is held.
86     */

87    static class BlockingListenerThread extends Thread JavaDoc
88       implements DistributedReplicantManager.ReplicantListener
89    {
90       private DistributedReplicantManagerImpl drm;
91       private String JavaDoc nodeName;
92       private boolean add;
93       private boolean blocking;
94       private Exception JavaDoc ex;
95       
96       BlockingListenerThread(DistributedReplicantManagerImpl drm,
97                              boolean add,
98                              String JavaDoc nodeName)
99       {
100          this.drm = drm;
101          this.add =add;
102          this.nodeName = nodeName;
103          drm.registerListener("TEST", this);
104       }
105
106       public void replicantsChanged(String JavaDoc key, List JavaDoc newReplicants, int newReplicantsViewId)
107       {
108          blocking = true;
109          synchronized(lock)
110          {
111             blocking = false;
112          }
113       }
114       
115       public void run()
116       {
117          try
118          {
119             if (add)
120             {
121                if (nodeName == null)
122                   drm.add("TEST", "local-replicant");
123                else
124                   drm._add("TEST", nodeName, "remote-replicant");
125             }
126             else
127             {
128                if (nodeName == null)
129                   drm.remove("TEST");
130                else
131                   drm._remove("TEST", nodeName);
132             }
133          }
134          catch (Exception JavaDoc e)
135          {
136             ex = e;
137          }
138       }
139       
140       public boolean isBlocking()
141       {
142          return blocking;
143       }
144       
145       public Exception JavaDoc getException()
146       {
147          return ex;
148       }
149       
150    }
151    
152    /**
153     * Thread that registers and then unregisters a DRM ReplicantListener.
154     */

155    static class RegistrationThread extends Thread JavaDoc
156    {
157       private DistributedReplicantManager drm;
158       private boolean registered = false;
159       private boolean unregistered = true;
160       
161       RegistrationThread(DistributedReplicantManager drm)
162       {
163          this.drm = drm;
164       }
165       
166       public void run()
167       {
168          NullListener listener = new NullListener();
169          drm.registerListener("DEADLOCK", listener);
170          registered = true;
171          drm.unregisterListener("DEADLOCK", listener);
172          unregistered = true;
173       }
174       
175       public boolean isRegistered()
176       {
177          return registered;
178       }
179       
180       public boolean isUnregistered()
181       {
182          return unregistered;
183       }
184       
185    }
186    
187    /**
188     * A DRM ReplicantListener that does nothing.
189     */

190    static class NullListener
191       implements DistributedReplicantManager.ReplicantListener
192    {
193       public void replicantsChanged(String JavaDoc key, List JavaDoc newReplicants,
194                                     int newReplicantsViewId)
195       {
196          // no-op
197
}
198    }
199    
200    /**
201     * DRM ReplicantListener that mimics the HASingletonDeployer service
202     * by deploying/undeploying a service if it's notified that by that DRM
203     * that it is the master replica for its key.
204     */

205    static class MockHASingletonDeployer
206       implements DistributedReplicantManager.ReplicantListener
207    {
208       DistributedReplicantManager drm;
209       MockDeployer deployer;
210       String JavaDoc key;
211       boolean master = false;
212       NullListener deploymentListener = new NullListener();
213       Exception JavaDoc ex;
214       Logger log;
215       Object JavaDoc mutex = new Object JavaDoc();
216       
217       MockHASingletonDeployer(MockDeployer deployer, String JavaDoc key, Logger log)
218       {
219          this.drm = deployer.getDRM();
220          this.deployer = deployer;
221          this.key = key;
222          this.log = log;
223       }
224
225       public void replicantsChanged(String JavaDoc key,
226                                     List JavaDoc newReplicants,
227                                     int newReplicantsViewId)
228       {
229          if (this.key.equals(key))
230          {
231             synchronized(mutex)
232             {
233                boolean nowMaster = drm.isMasterReplica(key);
234                
235                try
236                {
237                   if (!master && nowMaster) {
238                      log.debug(Thread.currentThread().getName() +
239                                " Deploying " + key);
240                      deployer.deploy(key + "A", key, deploymentListener);
241                   }
242                   else if (master && !nowMaster) {
243                      log.debug(Thread.currentThread().getName() +
244                                " undeploying " + key);
245                      deployer.undeploy(key + "A", deploymentListener);
246                   }
247                   else
248                   {
249                      log.debug(Thread.currentThread().getName() +
250                                " -- no status change in " + key +
251                                " -- master = " + master);
252                   }
253                   master = nowMaster;
254                }
255                catch (Exception JavaDoc e)
256                {
257                   e.printStackTrace();
258                   if (ex == null)
259                      ex = e;
260                }
261             }
262          }
263       }
264       
265       public Exception JavaDoc getException()
266       {
267          return ex;
268       }
269       
270    }
271    
272    /**
273     * Thread the repeatedly deploys and undeploys a MockHASingletonDeployer.
274     */

275    static class DeployerThread extends Thread JavaDoc
276    {
277       Semaphore semaphore;
278       MockDeployer deployer;
279       DistributedReplicantManager.ReplicantListener listener;
280       String JavaDoc key;
281       Exception JavaDoc ex;
282       int count = -1;
283       Logger log;
284       
285       DeployerThread(MockDeployer deployer,
286                      String JavaDoc key,
287                      DistributedReplicantManager.ReplicantListener listener,
288                      Semaphore semaphore,
289                      Logger log)
290       {
291          super("Deployer " + key);
292          this.deployer = deployer;
293          this.listener = listener;
294          this.key = key;
295          this.semaphore = semaphore;
296          this.log = log;
297       }
298       
299       public void run()
300       {
301          boolean acquired = false;
302          try
303          {
304             acquired = semaphore.attempt(60000);
305             if (!acquired)
306                throw new Exception JavaDoc("Cannot acquire semaphore");
307             SecureRandom JavaDoc random = new SecureRandom JavaDoc();
308             for (count = 0; count < LOOP_COUNT; count++)
309             {
310                deployer.deploy(key, "JGroups", listener);
311
312                sleepThread(random.nextInt(50));
313                deployer.undeploy(key, listener);
314             }
315          }
316          catch (Exception JavaDoc e)
317          {
318             e.printStackTrace();
319             ex = e;
320          }
321          finally
322          {
323             if (acquired)
324                semaphore.release();
325          }
326       }
327       
328       public Exception JavaDoc getException()
329       {
330          return ex;
331       }
332       
333       public int getCount()
334       {
335          return count;
336       }
337    }
338    
339    /**
340     * Thread that mimics the JGroups up-handler thread that calls into the DRM.
341     * Repeatedly and randomly calls adds or removes a replicant for a set
342     * of keys.
343     */

344    static class JGroupsThread extends Thread JavaDoc
345    {
346       Semaphore semaphore;
347       DistributedReplicantManagerImpl drm;
348       String JavaDoc[] keys;
349       String JavaDoc nodeName;
350       Exception JavaDoc ex;
351       int count = -1;
352       int weightFactor;
353       
354       JGroupsThread(DistributedReplicantManagerImpl drm,
355                     String JavaDoc[] keys,
356                     String JavaDoc nodeName,
357                     Semaphore semaphore)
358       {
359          super("JGroups");
360          this.drm = drm;
361          this.keys = keys;
362          this.semaphore = semaphore;
363          this.nodeName = nodeName;
364          this.weightFactor = (int) 2.5 * keys.length;
365       }
366       
367       public void run()
368       {
369          boolean acquired = false;
370          try
371          {
372             acquired = semaphore.attempt(60000);
373             if (!acquired)
374                throw new Exception JavaDoc("Cannot acquire semaphore");
375             boolean[] added = new boolean[keys.length];
376             SecureRandom JavaDoc random = new SecureRandom JavaDoc();
377             
378             for (count = 0; count < weightFactor * LOOP_COUNT; count++)
379             {
380                int pos = random.nextInt(keys.length);
381                if (added[pos])
382                {
383                   drm._remove(keys[pos], nodeName);
384                   added[pos] = false;
385                }
386                else
387                {
388                   drm._add(keys[pos], nodeName, "");
389                   added[pos] = true;
390                }
391                sleepThread(random.nextInt(30));
392             }
393          }
394          catch (Exception JavaDoc e)
395          {
396             e.printStackTrace();
397             ex = e;
398          }
399          finally
400          {
401             if (acquired)
402                semaphore.release();
403          }
404       }
405       
406       public Exception JavaDoc getException()
407       {
408          return ex;
409       }
410       
411       public int getCount()
412       {
413          return (count / weightFactor);
414       }
415       
416    }
417    
418    /**
419     * Mocks the deployer of a service that registers/unregisters DRM listeners
420     * and replicants. Only allows a single thread of execution, a la the
421     * org.jboss.system.ServiceController.
422     */

423    static class MockDeployer
424    {
425       DistributedReplicantManager drm;
426       
427       MockDeployer(DistributedReplicantManager drm)
428       {
429          this.drm = drm;
430       }
431       
432       void deploy(String JavaDoc key, String JavaDoc replicant,
433                   DistributedReplicantManager.ReplicantListener listener)
434             throws Exception JavaDoc
435       {
436          synchronized(this)
437          {
438             drm.registerListener(key, listener);
439             drm.add(key, replicant);
440             sleepThread(10);
441          }
442       }
443       
444       void undeploy(String JavaDoc key,
445                     DistributedReplicantManager.ReplicantListener listener)
446          throws Exception JavaDoc
447       {
448          synchronized(this)
449          {
450             drm.remove(key);
451             drm.unregisterListener(key, listener);
452             sleepThread(10);
453          }
454       }
455       
456       DistributedReplicantManager getDRM()
457       {
458          return drm;
459       }
460    }
461    
462    /** ReplicantListener that caches the list of replicants */
463    static class CachingListener implements ReplicantListener
464    {
465       List JavaDoc replicants = null;
466       boolean clean = true;
467       
468       public void replicantsChanged(String JavaDoc key, List JavaDoc newReplicants,
469                                     int newReplicantsViewId)
470       {
471          this.replicants = newReplicants;
472          if (clean && newReplicants != null)
473          {
474             int last = Integer.MIN_VALUE;
475             for (Iterator JavaDoc iter = newReplicants.iterator(); iter.hasNext(); )
476             {
477                int cur = ((Integer JavaDoc) iter.next()).intValue();
478                if (last >= cur)
479                {
480                   clean = false;
481                   break;
482                }
483                
484                last = cur;
485             }
486          }
487       }
488       
489    }
490
491    private static Object JavaDoc lock = new Object JavaDoc();
492    private static int LOOP_COUNT = 30;
493    
494    public static Test suite() throws Exception JavaDoc
495    {
496       Test t1 = getDeploySetup(DRMTestCase.class, "drm-tests.sar");
497       return t1;
498    }
499
500    public DRMTestCase(String JavaDoc name)
501    {
502       super(name);
503    }
504
505    public void testStateReplication()
506       throws Exception JavaDoc
507    {
508       log.debug("+++ testStateReplication");
509       log.info("java.rmi.server.hostname="+System.getProperty("java.rmi.server.hostname"));
510       RMIAdaptor[] adaptors = getAdaptors();
511       String JavaDoc[] servers = super.getServers();
512       RMIAdaptorExt server0 = (RMIAdaptorExt) adaptors[0];
513       log.info("server0: "+server0);
514       ObjectName JavaDoc clusterService = new ObjectName JavaDoc("jboss:service=DefaultPartition");
515       Vector JavaDoc view0 = (Vector JavaDoc) server0.getAttribute(clusterService, "CurrentView");
516       log.info("server0: CurrentView, "+view0);
517       ObjectName JavaDoc drmService = new ObjectName JavaDoc("jboss.test:service=DRMTestCase");
518       IReplicants drm0 = (IReplicants)
519          MBeanServerInvocationHandler.newProxyInstance(server0, drmService,
520          IReplicants.class, true);
521       log.info(MBeanServerInvocationHandler JavaDoc.class.getProtectionDomain());
522       TestListener listener = new TestListener(log);
523       server0.addNotificationListener(drmService, listener, null, null);
524       log.info("server0 addNotificationListener");
525       String JavaDoc address = (String JavaDoc) drm0.lookupLocalReplicant();
526       log.info("server0: lookupLocalReplicant: "+address);
527       assertTrue("server0: address("+address+") == server0("+servers[0]+")",
528          address.equals(servers[0]));
529
530       RMIAdaptorExt server1 = (RMIAdaptorExt) adaptors[1];
531       log.info("server1: "+server1);
532       Vector JavaDoc view1 = (Vector JavaDoc) server1.getAttribute(clusterService, "CurrentView");
533       log.info("server1: CurrentView, "+view1);
534       IReplicants drm1 = (IReplicants)
535          MBeanServerInvocationHandler.newProxyInstance(server1, drmService,
536          IReplicants.class, true);
537       server1.addNotificationListener(drmService, listener, null, null);
538       log.info("server1 addNotificationListener");
539       address = (String JavaDoc) drm1.lookupLocalReplicant();
540       log.info("server1: lookupLocalReplicant: "+address);
541       assertTrue("server1: address("+address+") == server1("+servers[1]+")",
542          address.equals(servers[1]));
543
544       List JavaDoc replicants0 = drm0.lookupReplicants();
545       List JavaDoc replicants1 = drm1.lookupReplicants();
546       assertTrue("size of replicants0 == replicants1)",
547          replicants0.size() == replicants1.size());
548       HashSet JavaDoc testSet = new HashSet JavaDoc(replicants0);
549       for(int n = 0; n < replicants0.size(); n ++)
550       {
551          Object JavaDoc entry = replicants1.get(n);
552          assertTrue("replicants0 contains:"+entry, testSet.contains(entry));
553       }
554
555       //
556
for(int n = 0; n < 10; n ++)
557       {
558          drm0.add("key"+n, "data"+n+".0");
559          drm1.add("key"+n, "data"+n+".1");
560       }
561       for(int n = 0; n < 10; n ++)
562       {
563          String JavaDoc key = "key"+n;
564          log.info("key: "+key);
565          replicants0 = drm0.lookupReplicants(key);
566          replicants1 = drm1.lookupReplicants(key);
567          log.info("replicants0: "+replicants0);
568          log.info("replicants1: "+replicants1);
569          HashSet JavaDoc testSet0 = new HashSet JavaDoc(replicants0);
570          HashSet JavaDoc testSet1 = new HashSet JavaDoc(replicants1);
571          assertTrue("size of replicants0 == replicants1)",
572             replicants0.size() == replicants1.size());
573          Object JavaDoc entry = drm0.lookupLocalReplicant(key);
574          log.info("drm0.lookupLocalReplicant, key="+key+", entry="+entry);
575          assertTrue("replicants0 contains:"+entry, testSet0.contains(entry));
576          assertTrue("replicants1 contains:"+entry, testSet1.contains(entry));
577       }
578
579       for(int n = 0; n < 10; n ++)
580          drm0.remove("key"+n);
581
582       server0.removeNotificationListener(drmService, listener);
583       server1.removeNotificationListener(drmService, listener);
584    }
585    
586    /**
587     * Tests the functionality of isMasterReplica(), also testing merge
588     * handling.
589     *
590     * TODO move this test out of the testsuite and into the cluster module
591     * itself, since it doesn't rely on the container.
592     *
593     * @throws Exception
594     */

595    public void testIsMasterReplica() throws Exception JavaDoc
596    {
597       log.debug("+++ testIsMasterReplica()");
598       
599       MBeanServer JavaDoc mbeanServer =
600          MBeanServerFactory.createMBeanServer("mockPartition");
601       try {
602          ClusterNode localAddress = new ClusterNode(new IpAddress("127.0.0.1", 12345));
603          MockHAPartition partition = new MockHAPartition(localAddress);
604       
605          DistributedReplicantManagerImpl drm =
606                new DistributedReplicantManagerImpl(partition);
607
608          drm.create();
609          
610          // Create a fake view for the MockHAPartition
611

612          Vector JavaDoc remoteAddresses = new Vector JavaDoc();
613          for (int i = 1; i < 5; i++)
614             remoteAddresses.add(new ClusterNode(new IpAddress("127.0.0.1", 12340 + i)));
615          
616          Vector JavaDoc allNodes = new Vector JavaDoc(remoteAddresses);
617          allNodes.add(localAddress);
618          partition.setCurrentViewClusterNodes(allNodes);
619          
620          // Pass fake state to DRMImpl
621

622          HashMap JavaDoc replicants = new HashMap JavaDoc();
623          ArrayList JavaDoc remoteResponses = new ArrayList JavaDoc();
624          for (int i = 0; i < remoteAddresses.size(); i++)
625          {
626             ClusterNode node = (ClusterNode) remoteAddresses.elementAt(i);
627             Integer JavaDoc replicant = new Integer JavaDoc(i + 1);
628             replicants.put(node.getName(), replicant);
629             HashMap JavaDoc localReplicant = new HashMap JavaDoc();
630             localReplicant.put("Mock", replicant);
631             remoteResponses.add(new Object JavaDoc[] {node.getName(), localReplicant});
632          }
633          HashMap JavaDoc services = new HashMap JavaDoc();
634          services.put("Mock", replicants);
635          
636          int hash = 0;
637          for (int i = 1; i < 5; i++)
638             hash += (new Integer JavaDoc(i)).hashCode();
639          
640          HashMap JavaDoc intraviewIds = new HashMap JavaDoc();
641          intraviewIds.put("Mock", new Integer JavaDoc(hash));
642       
643          partition.setRemoteReplicants(remoteResponses);
644          
645          drm.setCurrentState(new Object JavaDoc[] {services, intraviewIds });
646          
647          drm.start();
648          
649          // add a local replicant
650

651          drm.add("Mock", new Integer JavaDoc(5));
652          
653          // test that this node is not the master replica
654

655          assertFalse("Local node is not master after startup",
656                      drm.isMasterReplica("Mock"));
657       
658          // simulate a split where this node is the coord
659

660          Vector JavaDoc localOnly = new Vector JavaDoc();
661          localOnly.add(localAddress);
662          
663          partition.setCurrentViewClusterNodes(localOnly);
664          partition.setRemoteReplicants(new ArrayList JavaDoc());
665          
666          drm.membershipChanged(remoteAddresses, new Vector JavaDoc(), localOnly);
667          
668          // test that this node is the master replica
669

670          assertTrue("Local node is master after split", drm.isMasterReplica("Mock"));
671          
672          // Remove our local replicant
673

674          drm.remove("Mock");
675          
676          // test that this node is not the master replica
677

678          assertFalse("Local node is not master after dropping replicant",
679                      drm.isMasterReplica("Mock"));
680          
681          // Restore the local replicant
682

683          drm.add("Mock", new Integer JavaDoc(5));
684          
685          // simulate a merge
686

687          Vector JavaDoc mergeGroups = new Vector JavaDoc();
688          mergeGroups.add(remoteAddresses);
689          mergeGroups.add(localOnly);
690          
691          partition.setCurrentViewClusterNodes(allNodes);
692          partition.setRemoteReplicants(remoteResponses);
693          
694          drm.membershipChangedDuringMerge(new Vector JavaDoc(), remoteAddresses,
695                                           allNodes, mergeGroups);
696          
697          // Merge processing is done asynchronously, so pause a bit
698
sleepThread(100);
699          
700          // test that this node is not the master replica
701

702          assertFalse("Local node is not master after merge",
703                      drm.isMasterReplica("Mock"));
704       }
705       finally {
706          MBeanServerFactory.releaseMBeanServer(mbeanServer);
707       }
708    }
709    
710    
711    /**
712     * Tests that one thread blocking in DRM.notifyKeyListeners() does not
713     * prevent other threads registering/unregistering listeners. JBAS-2539
714     *
715     * TODO move this test out of the testsuite and into the cluster module
716     * itself, since it doesn't rely on the container.
717     *
718     * @throws Exception
719     */

720    public void testKeyListenerDeadlock() throws Exception JavaDoc
721    {
722       log.debug("+++ testKeyListenerDeadlock()");
723       
724       MBeanServer JavaDoc mbeanServer =
725          MBeanServerFactory.createMBeanServer("mockPartition");
726       try {
727          ClusterNode localAddress = new ClusterNode(new IpAddress("127.0.0.1", 12345));
728          MockHAPartition partition = new MockHAPartition(localAddress);
729       
730          DistributedReplicantManagerImpl drm =
731                new DistributedReplicantManagerImpl(partition);
732
733          drm.create();
734          
735          // Create a fake view for the MockHAPartition
736

737          Vector JavaDoc remoteAddresses = new Vector JavaDoc();
738          for (int i = 1; i < 5; i++)
739             remoteAddresses.add(new ClusterNode(new IpAddress("127.0.0.1", 12340 + i)));
740          
741          Vector JavaDoc allNodes = new Vector JavaDoc(remoteAddresses);
742          allNodes.add(localAddress);
743          partition.setCurrentViewClusterNodes(allNodes);
744          
745          drm.start();
746          
747          BlockingListenerThread blt =
748             new BlockingListenerThread(drm, true, null);
749          
750          // Hold the lock monitor so the test thread can't acquire it
751
// This keeps the blocking thread alive.
752
synchronized(lock) {
753             // Spawn a thread that will change a key and then block on the
754
// notification back to itself
755
blt.start();
756
757             sleepThread(50);
758             
759             assertTrue("Test thread is alive", blt.isAlive());
760             assertTrue("Test thread is blocking", blt.isBlocking());
761             
762             RegistrationThread rt = new RegistrationThread(drm);
763             rt.start();
764
765             sleepThread(50);
766             
767             assertTrue("No deadlock on listener registration", rt.isRegistered());
768             
769             assertTrue("No deadlock on listener unregistration", rt.isUnregistered());
770             
771             assertNull("No exception in deadlock tester", blt.getException());
772             
773             assertTrue("Test thread is still blocking", blt.isBlocking());
774             assertTrue("Test thread is still alive", blt.isAlive());
775          }
776          
777          drm.unregisterListener("TEST", blt);
778          
779          sleepThread(50);
780          
781          // Test going through remove
782
blt = new BlockingListenerThread(drm, false, null);
783          
784          // Hold the lock monitor so the test thread can't acquire it
785
// This keeps the blocking thread alive.
786
synchronized(lock) {
787             // Spawn a thread that will change a key and then block on the
788
// notification back to itself
789
blt.start();
790
791             sleepThread(50);
792             
793             assertTrue("Test thread is alive", blt.isAlive());
794             assertTrue("Test thread is blocking", blt.isBlocking());
795             
796             RegistrationThread rt = new RegistrationThread(drm);
797             rt.start();
798
799             sleepThread(50);
800             
801             assertTrue("No deadlock on listener registration", rt.isRegistered());
802             
803             assertTrue("No deadlock on listener unregistration", rt.isUnregistered());
804             
805             assertNull("No exception in deadlock tester", blt.getException());
806             
807             assertTrue("Test thread is still blocking", blt.isBlocking());
808             assertTrue("Test thread is still alive", blt.isAlive());
809          }
810       }
811       finally {
812          MBeanServerFactory.releaseMBeanServer(mbeanServer);
813       }
814    }
815    
816    
817    /**
818     * Tests that remotely-originated calls don't block.
819     *
820     * TODO move this test out of the testsuite and into the cluster module
821     * itself, since it doesn't rely on the container.
822     *
823     * @throws Exception
824     */

825    public void testRemoteCallBlocking() throws Exception JavaDoc
826    {
827       log.debug("+++ testRemoteCallBlocking()");
828       
829       MBeanServer JavaDoc mbeanServer =
830          MBeanServerFactory.createMBeanServer("mockPartition");
831       try {
832          ClusterNode localAddress = new ClusterNode(new IpAddress("127.0.0.1", 12345));
833          MockHAPartition partition = new MockHAPartition(localAddress);
834       
835          DistributedReplicantManagerImpl drm =
836                new DistributedReplicantManagerImpl(partition);
837
838          drm.create();
839          
840          // Create a fake view for the MockHAPartition
841

842          Vector JavaDoc remoteAddresses = new Vector JavaDoc();
843          for (int i = 1; i < 5; i++)
844             remoteAddresses.add(new ClusterNode(new IpAddress("127.0.0.1", 12340 + i)));
845          
846          Vector JavaDoc allNodes = new Vector JavaDoc(remoteAddresses);
847          allNodes.add(localAddress);
848          partition.setCurrentViewClusterNodes(allNodes);
849          
850          drm.start();
851          
852          String JavaDoc sender = ((ClusterNode)remoteAddresses.get(0)).getName();
853          BlockingListenerThread blt =
854             new BlockingListenerThread(drm, true, sender);
855          
856          // Hold the lock monitor so the test thread can't acquire it
857
// This keeps the blocking thread alive.
858
synchronized(lock) {
859             // Spawn a thread that will change a key and then block on the
860
// notification back to itself
861
blt.start();
862
863             sleepThread(50);
864             
865             assertFalse("JGroups thread is not alive", blt.isAlive());
866             assertTrue("Async handler thread is blocking", blt.isBlocking());
867             
868             assertNull("No exception in JGroups thread", blt.getException());
869          }
870          
871          drm.unregisterListener("TEST", blt);
872          
873          sleepThread(50);
874          
875          // Test going through remove
876
blt = new BlockingListenerThread(drm, false, sender);
877          
878          // Hold the lock monitor so the test thread can't acquire it
879
// This keeps the blocking thread alive.
880
synchronized(lock) {
881             // Spawn a thread that will change a key and then block on the
882
// notification back to itself
883
blt.start();
884
885             sleepThread(50);
886             
887             assertFalse("JGroups thread is not alive", blt.isAlive());
888             assertTrue("Async handler thread is blocking", blt.isBlocking());
889             
890             assertNull("No exception in JGroups thread", blt.getException());
891          }
892       }
893       finally {
894          MBeanServerFactory.releaseMBeanServer(mbeanServer);
895       }
896    }
897    
898    /**
899     * Tests that one thread blocking in DRM.notifyKeyListeners() does not
900     * prevent other threads that use different keys adding/removing
901     * replicants. JBAS-2169
902     *
903     * TODO move this test out of the testsuite and into the cluster module
904     * itself, since it doesn't rely on the container.
905     *
906     * @throws Exception
907     */

908    public void testNonConflictingAddRemoveDeadlock() throws Exception JavaDoc
909    {
910
911       log.debug("+++ testNonConflictingAddRemoveDeadlock()");
912       
913       addRemoveDeadlockTest(false);
914    }
915    
916    /**
917     * Tests that one thread blocking in DRM.notifyKeyListeners() does not
918     * prevent other threads that use the same keys adding/removing
919     * replicants. JBAS-1151
920     *
921     * NOTE: This test basically demonstrates a small race condition that can
922     * happen with the way HASingletonSupport's startService() method is
923     * implemented (actually HAServiceMBeanSupport, but relevant in the case
924     * of subclass HASingletonSupport, and in particular in its use in the
925     * HASingletonDeployer service). However, since the test doesn't actually
926     * use the relevant code, but rather uses mock objects that work the same
927     * way, this test is disabled -- its purpose has been achieved. JIRA issue
928     * JBAS-1151 tracks the real problem; when it's resolved we'll create a test
929     * case against the real code that proves that fact.
930     *
931     * TODO move this test out of the testsuite and into the cluster module
932     * itself, since it doesn't rely on the container.
933     *
934     * @throws Exception
935     */

936    public void badtestConflictingAddRemoveDeadlock() throws Exception JavaDoc
937    {
938       log.debug("+++ testConflictingAddRemoveDeadlock()");
939       
940       addRemoveDeadlockTest(true);
941    }
942    
943    private void addRemoveDeadlockTest(boolean conflicting) throws Exception JavaDoc
944    {
945       String JavaDoc[] keys = { "A", "B", "C", "D", "E" };
946       int count = keys.length;
947       
948       MBeanServer JavaDoc mbeanServer =
949          MBeanServerFactory.createMBeanServer("mockPartition");
950       try {
951          ClusterNode localAddress = new ClusterNode(new IpAddress("127.0.0.1", 12345));
952          MockHAPartition partition = new MockHAPartition(localAddress);
953       
954          DistributedReplicantManagerImpl drm =
955                new DistributedReplicantManagerImpl(partition);
956
957          drm.create();
958          
959          // Create a fake view for the MockHAPartition
960

961          Vector JavaDoc remoteAddresses = new Vector JavaDoc();
962          ClusterNode remote = new ClusterNode(new IpAddress("127.0.0.1", 12341));
963          remoteAddresses.add(remote);
964          
965          Vector JavaDoc allNodes = new Vector JavaDoc(remoteAddresses);
966          allNodes.add(localAddress);
967          partition.setCurrentViewClusterNodes(allNodes);
968          
969          drm.start();
970          
971          MockDeployer deployer = new MockDeployer(drm);
972          
973          if (!conflicting)
974          {
975             // Register a MockHASingletonDeployer, but since we're in
976
// non-conflicting mode, the DeployerThreads won't deal with it
977
MockHASingletonDeployer listener =
978                   new MockHASingletonDeployer(deployer, "HASingleton", log);
979             
980             drm.registerListener("HASingleton", listener);
981             drm.add("HASingleton", "HASingleton");
982          }
983          
984          // Create a semaphore to gate the threads and acquire all its permits
985
Semaphore semaphore = new Semaphore(count + 1);
986          for (int i = 0; i <= count; i++)
987             semaphore.acquire();
988          
989          DeployerThread[] deployers = new DeployerThread[keys.length];
990          for (int i = 0; i < count; i++)
991          {
992             DistributedReplicantManager.ReplicantListener listener = null;
993             if (conflicting)
994             {
995                listener = new MockHASingletonDeployer(deployer, keys[i], log);
996             }
997             else
998             {
999                listener = new NullListener();
1000            }
1001            deployers[i] = new DeployerThread(deployer, keys[i], listener, semaphore, log);
1002            deployers[i].start();
1003         }
1004         
1005         String JavaDoc[] jgKeys = keys;
1006         if (!conflicting)
1007         {
1008            // The JGroups thread also deals with the MockHASingletonDeployer
1009
// key that the DeployerThreads don't
1010
jgKeys = new String JavaDoc[keys.length + 1];
1011            System.arraycopy(keys, 0, jgKeys, 0, keys.length);
1012            jgKeys[keys.length] = "HASingleton";
1013         }
1014         JGroupsThread jgThread = new JGroupsThread(drm, jgKeys, remote.getName(), semaphore);
1015         jgThread.start();
1016         
1017         // Launch the threads
1018
semaphore.release(count + 1);
1019         
1020         boolean reacquired = false;
1021         try
1022         {
1023            // Give the threads 5 secs to acquire the semaphore
1024
long maxElapsed = System.currentTimeMillis() + 5000;
1025            for (int i = 0; i < keys.length; i++)
1026            {
1027               if (deployers[i].getCount() < 0)
1028               {
1029                  assertTrue("Thread " + keys[i] + " started in time",
1030                              maxElapsed - System.currentTimeMillis() > 0);
1031                  sleepThread(10);
1032                  i--; // try again
1033
}
1034            }
1035            
1036            while (jgThread.getCount() < 0)
1037            {
1038               assertTrue("jgThread started in time",
1039                           maxElapsed - System.currentTimeMillis() > 0);
1040               sleepThread(10);
1041            }
1042            // Reaquire all the permits, thus showing the threads didn't deadlock
1043

1044            // Give them 500 ms per loop
1045
maxElapsed = System.currentTimeMillis() + (500 * LOOP_COUNT);
1046            for (int i = 0; i <= count; i++)
1047            {
1048               long waitTime = maxElapsed - System.currentTimeMillis();
1049               assertTrue("Acquired thread " + i, semaphore.attempt(waitTime));
1050            }
1051            
1052            reacquired = true;
1053            
1054            // Ensure there were no exceptions
1055
for (int i = 0; i < keys.length; i++)
1056            {
1057               assertEquals("Thread " + keys[i] + " finished", LOOP_COUNT, deployers[i].getCount());
1058               assertNull("Thread " + keys[i] + " saw no exceptions", deployers[i].getException());
1059            }
1060            assertEquals("JGroups Thread finished", LOOP_COUNT, jgThread.getCount());
1061            assertNull("JGroups Thread saw no exceptions", jgThread.getException());
1062         }
1063         finally
1064         {
1065
1066            if (!reacquired)
1067            {
1068               for (int i = 0; i < keys.length; i++)
1069               {
1070                  if (deployers[i].getException() != null)
1071                  {
1072                     System.out.println("Exception in deployer " + i);
1073                     deployers[i].getException().printStackTrace(System.out);
1074                  }
1075                  else
1076                  {
1077                     System.out.println("Thread " + i + " completed " + deployers[i].getCount());
1078                  }
1079               }
1080               if (jgThread.getException() != null)
1081               {
1082                  System.out.println("Exception in jgThread");
1083                  jgThread.getException().printStackTrace(System.out);
1084               }
1085               else
1086               {
1087                  System.out.println("jgThread completed " + jgThread.getCount());
1088               }
1089            }
1090            
1091            // Be sure the threads are dead
1092
if (jgThread.isAlive())
1093            {
1094               jgThread.interrupt();
1095               sleepThread(5);
1096               printStackTrace(jgThread.getName(), jgThread.getException());
1097            }
1098            for (int i = 0; i < keys.length; i++)
1099            {
1100               if (deployers[i].isAlive())
1101               {
1102                  deployers[i].interrupt();
1103                  sleepThread(5);
1104                  printStackTrace(deployers[i].getName(), deployers[i].getException());
1105               }
1106            }
1107               
1108         }
1109      }
1110      finally {
1111         MBeanServerFactory.releaseMBeanServer(mbeanServer);
1112      }
1113   }
1114   
1115   public void testReplicantOrder() throws Exception JavaDoc
1116   {
1117      MBeanServer JavaDoc mbeanServer =
1118         MBeanServerFactory.createMBeanServer("mockPartitionA");
1119      try {
1120         
1121         // Create a fake view for the MockHAPartition
1122
ClusterNode[] nodes = new ClusterNode[5];
1123         String JavaDoc[] names = new String JavaDoc[nodes.length];
1124         Integer JavaDoc[] replicants = new Integer JavaDoc[nodes.length];
1125         Vector JavaDoc allNodes = new Vector JavaDoc();
1126         for (int i = 0; i < nodes.length; i++)
1127         {
1128            nodes[i] = new ClusterNode(new IpAddress("127.0.0.1", 12340 + i));
1129            allNodes.add(nodes[i]);
1130            names[i] = nodes[i].getName();
1131            replicants[i] = new Integer JavaDoc(i);
1132         }
1133         
1134         MockHAPartition partition = new MockHAPartition(nodes[2]);
1135         partition.setCurrentViewClusterNodes(allNodes);
1136         
1137         DistributedReplicantManagerImpl drm =
1138               new DistributedReplicantManagerImpl(partition);
1139         drm.create();
1140         drm.start();
1141         
1142         CachingListener listener = new CachingListener();
1143         drm.registerListener("TEST", listener);
1144         
1145         SecureRandom JavaDoc random = new SecureRandom JavaDoc();
1146         boolean[] added = new boolean[nodes.length];
1147         List JavaDoc lookup = null;
1148         for (int i = 0; i < 10; i++)
1149         {
1150            int node = random.nextInt(nodes.length);
1151            if (added[node])
1152            {
1153               if (node == 2)
1154                  drm.remove("TEST");
1155               else
1156                  drm._remove("TEST", nodes[node].getName());
1157               added[node] = false;
1158            }
1159            else
1160            {
1161               if (node == 2)
1162                  drm.add("TEST", replicants[node]);
1163               else
1164                  drm._add("TEST", nodes[node].getName(), replicants[node]);
1165               added[node] = true;
1166            }
1167            
1168            // Confirm the proper order of the replicant node names
1169
lookup = maskListClass(drm.lookupReplicantsNodeNames("TEST"));
1170            confirmReplicantList(lookup, names, added);
1171            
1172            // Confirm the proper order of the replicants via lookupReplicants
1173
lookup = maskListClass(drm.lookupReplicants("TEST"));
1174            confirmReplicantList(lookup, replicants, added);
1175            
1176            // Confirm the listener got the same list
1177
// assertEquals("Listener received a correct list", lookup,
1178
// maskListClass(listener.replicants));
1179
}
1180         
1181         // Let the asynchronous notification thread catch up
1182
sleep(25);
1183         
1184         // Confirm all lists presented to the listener were properly ordered
1185
assertTrue("Listener saw no misordered lists", listener.clean);
1186         
1187      }
1188      finally {
1189         MBeanServerFactory.releaseMBeanServer(mbeanServer);
1190      }
1191   }
1192   
1193   private void confirmReplicantList(List JavaDoc current, Object JavaDoc[] all, boolean[] added)
1194   {
1195      Iterator JavaDoc iter = current.iterator();
1196      for (int i = 0; i < added.length; i++)
1197      {
1198         if (added[i])
1199         {
1200            assertTrue("List has more replicants", iter.hasNext());
1201            assertEquals("Replicant for node " + i + " is next",
1202                         all[i], iter.next());
1203         }
1204      }
1205      assertFalse("List has no extra replicants", iter.hasNext());
1206   }
1207   
1208   /** Converts the given list to an ArrayList, if it isn't already */
1209   private List JavaDoc maskListClass(List JavaDoc toMask)
1210   {
1211      if (toMask instanceof ArrayList JavaDoc)
1212         return toMask;
1213      else if (toMask == null)
1214         return new ArrayList JavaDoc();
1215      else
1216         return new ArrayList JavaDoc(toMask);
1217   }
1218
1219   private static void sleepThread(long millis)
1220   {
1221      try
1222      {
1223         Thread.sleep(millis);
1224      }
1225      catch (InterruptedException JavaDoc e) {
1226         e.printStackTrace();
1227      }
1228   }
1229   
1230   private static void printStackTrace(String JavaDoc threadName, Exception JavaDoc e)
1231   {
1232      if (e instanceof InterruptedException JavaDoc)
1233      {
1234         System.out.println("Stack trace for " + threadName);
1235         e.printStackTrace(System.out);
1236         System.out.println();
1237      }
1238   }
1239
1240}
1241
Popular Tags