KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jboss > dtf > DistributedTestCase


1 /*
2 * JBoss, the OpenSource J2EE webOS
3 *
4 * Distributable under LGPL license.
5 * See terms of license at gnu.org.
6 */

7 package org.jboss.dtf;
8
9 import junit.framework.AssertionFailedError;
10 import junit.framework.Test;
11 import junit.framework.TestListener;
12 import junit.framework.TestResult;
13 import org.apache.log4j.Level;
14 import org.jboss.logging.Logger;
15 import org.jgroups.Address;
16 import org.jgroups.Channel;
17 import org.jgroups.ChannelException;
18 import org.jgroups.JChannel;
19 import org.jgroups.MembershipListener;
20 import org.jgroups.View;
21 import org.jgroups.blocks.GroupRequest;
22 import org.jgroups.blocks.MethodCall;
23 import org.jgroups.blocks.RpcDispatcher;
24
25 import java.io.IOException;
26 import java.util.Enumeration;
27 import java.util.Vector;
28
29 /**
30  * This class should be used as the base class for any distributed test classes
31  * or other classes where you need to have a remote barrier so all of the
32  * classes can start at the same time. Note: If want to continually run,
33  * then set the number of instances to 1, which will by-pass the shutdown.
34  * <br>
35  * This class requires JGroups 2.0 to run
36  * <br>
37  * Since this uses the JUnit framework, can create test just as you would
38  * a normal JUnit test, but make sure that if you use setUp() and tearDown()
39  * to call the super as well since these are used to control the remote
40  * synchronization.
41  *
42  * @author <a HREF="mailto:telrod@vocalocity.net">Tom Elrod</a>
43  * @version $Revision: 1.6.2.3 $
44  */

45 public class DistributedTestCase extends MultipleTestCase implements MembershipListener
46 {
47    private static final Logger log = Logger.getLogger(DistributedTestCase.class);
48
49    private int parties = 2; //defaults to 2 since most common number of instances (since will always be > 1)
50

51    private Channel channel;
52    private RpcDispatcher disp;
53    private Address localAddress;
54
55    private int numOfMembers;
56
57
58 /*
59    // multicast version of jgroups config
60    private String props =
61          "UDP(mcast_recv_buf_size=64000;mcast_send_buf_size=32000;" +
62          "mcast_port=45566;use_packet_handler=false;ucast_recv_buf_size=64000;" +
63          "mcast_addr=228.8.8.8;loopback=false;ucast_send_buf_size=32000;ip_ttl=32):" +
64          "PING(timeout=2000;num_initial_members=3):" +
65          "MERGE2(max_interval=10000;min_interval=5000):" +
66          "FD(timeout=2000;max_tries=3;shun=true):" +
67          "VERIFY_SUSPECT(timeout=1500):" +
68          "pbcast.NAKACK(max_xmit_size=8192;gc_lag=50;retransmit_timeout=600,1200,2400,4800):" +
69          "pbcast.STABLE(desired_avg_gossip=20000):" +
70          "UNICAST(timeout=1200,2400,3600):" +
71          "FRAG(frag_size=8192;down_thread=false;up_thread=false):" +
72          "pbcast.GMS(print_local_addr=true;join_timeout=3000;join_retry_timeout=2000;shun=true)";
73 */

74    // tcp version of jgroups config
75
private String props =
76          "TCP(start_port=7800):" +
77          "TCPPING(initial_hosts=localhost[7800];port_range=3;timeout=3000;" +
78          "num_initial_members=3;up_thread=true;down_thread=true):" +
79          "MERGE2(max_interval=3000;min_interval=1500):" +
80          "VERIFY_SUSPECT(timeout=1500;down_thread=false;up_thread=false):" +
81          "pbcast.NAKACK(down_thread=true;up_thread=true;gc_lag=100;retransmit_timeout=3000):" +
82          "pbcast.STABLE(desired_avg_gossip=20000;down_thread=false;up_thread=false):" +
83          "pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;shun=false;" +
84          "print_local_addr=false;down_thread=true;up_thread=true)";
85
86
87    // How long should wait for everyone to be ready to start. Default 10 seconds
88
private long startupTimeout = 60000;
89    // How long should wait for everyone to be ready to shutdown.
90
// Default 2 minutes since could take a long time to run tests.
91
private long shutdownTimeout = 120000;
92
93    private boolean shouldShutdown = true;
94
95    private int shutdownCount;
96
97    private boolean startupWaitFlag = false;
98    private boolean shutdownWaitFlag = false;
99
100    private boolean startupCalledFlag = false;
101    private boolean shutdownCalledFlag = false;
102
103    //JUnit related variables
104
// Used to indicate the number of runs since have to be sure to not call
105
// startup() for each test method run.
106
private int testRunCount = 0;
107    // flag to indicate if should disconnect from JG using shutdown() or endTest()
108
private boolean runningAsUnitTest = false;
109
110    private final Object waitObj = new Object();
111    private DistributedTestListener testListener = new DistributedTestListener();
112
113    public DistributedTestCase(String name)
114    {
115
116       super(name);
117       //setLogging();
118
}
119
120    public long getShutdownTimeout()
121    {
122       return shutdownTimeout;
123    }
124
125    protected void setShutdownTimeout(long timeout)
126    {
127       this.shutdownTimeout = timeout;
128    }
129
130    public void setLogging()
131    {
132       org.apache.log4j.BasicConfigurator.configure();
133       org.apache.log4j.Category.getRoot().setLevel(Level.INFO);
134       org.apache.log4j.Category.getInstance("org.jboss.remoting").setLevel(Level.DEBUG);
135       org.apache.log4j.Category.getInstance("org.jboss.dtf").setLevel(Level.DEBUG);
136       org.apache.log4j.Category.getInstance("org.jgroups").setLevel(Level.FATAL);
137
138       org.apache.log4j.SimpleLayout layout = new org.apache.log4j.SimpleLayout();
139       try
140       {
141          org.apache.log4j.FileAppender fileAppender = new org.apache.log4j.FileAppender(layout, getClass().getName() + "_output.log");
142          fileAppender.setThreshold(Level.DEBUG);
143          org.apache.log4j.Category.getRoot().addAppender(fileAppender);
144       }
145       catch(IOException e)
146       {
147          e.printStackTrace();
148       }
149
150 // org.apache.log4j.ConsoleAppender consoleAppender = new org.apache.log4j.ConsoleAppender();
151
// consoleAppender.setThreshold(Level.INFO);
152
// org.apache.log4j.Category.getRoot().addAppender(consoleAppender);
153

154       //System.out.println("Root log level = " + org.apache.log4j.Category.getRoot().getLevel());
155
Enumeration appenders = org.apache.log4j.Category.getRoot().getAllAppenders();
156       while(appenders.hasMoreElements())
157       {
158          org.apache.log4j.Appender appender = (org.apache.log4j.Appender) appenders.nextElement();
159          //System.out.println(appender.getName());
160
if(appender instanceof org.apache.log4j.ConsoleAppender)
161          {
162             ((org.apache.log4j.ConsoleAppender) appender).setThreshold(Level.INFO);
163          }
164       }
165    }
166
167    /**
168     * Number of members connected.
169     *
170     * @return
171     */

172    public int getNumberOfMembers()
173    {
174       return numOfMembers;
175    }
176
177    /**
178     * Sets the number of total number of remote instances (including this instance).
179     *
180     * @param numOfInstances
181     */

182    protected void init(int numOfInstances)
183    {
184       parties = numOfInstances;
185    }
186
187    /**
188     * Get the total number of instances running in test case.
189     *
190     * @return
191     */

192    public int getNumberOfInstances()
193    {
194       return parties;
195    }
196
197    /**
198     * Sends JG message to let other remote test instances know this instance is
199     * ready to run. Will block until all instances are ready.
200     *
201     * @param numOfInstances - indicates total number of instnaces for remote test.
202     * @throws Exception
203     */

204    public void startup(int numOfInstances) throws Exception
205    {
206       init(numOfInstances);
207       startup();
208    }
209
210    /**
211     * Sends JG message to let other remote test instances know this instance is
212     * ready to run. Will block until all instances are ready.
213     *
214     * @throws Exception
215     */

216    public void startup() throws Exception
217    {
218       shutdownCount = parties;
219       // if more than 1 party, then should shutdown
220
shouldShutdown = parties > 1;
221
222       startupWaitFlag = true;
223       startupCalledFlag = true;
224       log.debug("Sending startup notification");
225       sendStartupNotification();
226
227       long startTime = System.currentTimeMillis();
228       while(startupWaitFlag)
229       {
230          try
231          {
232             synchronized(waitObj)
233             {
234                waitObj.wait(1000);
235             }
236
237             if(timeoutExpired(startTime, startupTimeout))
238             {
239                break;
240             }
241          }
242          catch(InterruptedException e)
243          {
244             break;
245          }
246       }
247
248       if(startupWaitFlag)
249       {
250          // we timed out and still not everyone joined
251
disp.stop();
252          channel.disconnect();
253          throw new Exception("Timed out waiting for other instances to start.");
254       }
255    }
256
257    /**
258     * Should be called when ready to shutdown. Will notify all other remote test
259     * instances and will then block until all other instances have made the same call.
260     *
261     * @throws Exception
262     */

263    public void shutdown() throws Exception
264    {
265       try
266       {
267          shutdownWaitFlag = true;
268          shutdownCalledFlag = true;
269          Thread.sleep(1000);
270          sendShutdownNotification();
271
272          long startTime = System.currentTimeMillis();
273          while(shutdownWaitFlag)
274          {
275             try
276             {
277                //TODO: Need to same waitObj.wait(1000) as is done in startup()
278
Thread.sleep(1000);
279                if(timeoutExpired(startTime, shutdownTimeout))
280                {
281                   if(shouldShutdown)
282                   {
283                      break;
284                   }
285                }
286             }
287             catch(InterruptedException e)
288             {
289             }
290          }
291
292          if(shutdownWaitFlag)
293          {
294             // we timed out
295
throw new Exception("Timed out waiting for other instances to stop.");
296          }
297       }
298       finally
299       {
300          // if not running as unit test, can disconnect now.
301
// otherwise, need to wait till test has ended.
302
if(!runningAsUnitTest)
303          {
304             log.debug("calling disconnect. runningAsUnitTest = " + runningAsUnitTest);
305             disconnect();
306          }
307       }
308    }
309
310    /**
311     * Disconnects from JGroups
312     */

313    protected void disconnect()
314    {
315       //need to give JG a few seconds to send test report
316
try
317       {
318          Thread.sleep(5000);
319       }
320       catch(InterruptedException e)
321       {
322          e.printStackTrace();
323       }
324       try
325       {
326          log.debug("Disconnecting from JGroups. Will not be able to send any more messages.");
327          disp.stop();
328          channel.disconnect();
329          /**
330           * Can not call close since it will prevent any of the other
331           * instances from receiving or sending shutdown notifications.
332           */

333          //channel.close();
334
}
335       catch(Exception e)
336       {
337          log.warn("Exception in disconnect() when stopping and closing channel.", e);
338       }
339    }
340
341    private boolean timeoutExpired(long startTime, long timeout)
342    {
343       long duration = System.currentTimeMillis() - startTime;
344       if(duration > timeout)
345       {
346          return true;
347       }
348       else
349       {
350          return false;
351       }
352    }
353
354    private void sendStartupNotification() throws ChannelException
355    {
356       //JGroups code
357
channel = new JChannel(props);
358       disp = new RpcDispatcher(channel, null, this, this);
359       channel.connect("DistributedTestCase");
360       localAddress = channel.getLocalAddress();
361    }
362
363    private void sendShutdownNotification()
364    {
365       MethodCall call = new MethodCall("receiveShutdownNotification",
366                                        new Object[]{localAddress}, new Class[]{Address.class});
367       disp.callRemoteMethods(null, call, GroupRequest.GET_NONE, 0);
368       log.debug("sent shutdown notification " + call);
369    }
370
371    /**
372     * Used to indicate when members have joined the JGroups channel for this
373     * test case run.
374     *
375     * @param view
376     */

377    public void viewAccepted(View view)
378    {
379       // has everyone joined
380
Vector members = view.getMembers();
381       log.debug("members.size() = " + members.size());
382       Enumeration enum = members.elements();
383       while(enum.hasMoreElements())
384       {
385          log.debug(enum.nextElement());
386       }
387       numOfMembers = members.size();
388       if(numOfMembers >= parties && startupWaitFlag) // waiting for everyone to start
389
{
390          startupWaitFlag = false;
391          synchronized(waitObj)
392          {
393             waitObj.notify();
394          }
395       }
396    }
397
398    /**
399     * Called using JGroups by other instances when they are ready to shutdown.
400     *
401     * @param address
402     */

403    public void receiveShutdownNotification(Address address)
404    {
405       log.debug("receiveShutdownNotification() from " + address);
406       log.debug("shutdownCount = " + (shutdownCount - 1) +
407                 " and shutdownWaitFlag = " + shutdownWaitFlag);
408       if(--shutdownCount == 0 && shutdownWaitFlag) // waiting for everyone to stop
409
{
410          if(shouldShutdown)
411          {
412             shutdownWaitFlag = false;
413          }
414       }
415    }
416
417    private void callRemoteAssert(String methodName, Object[] params, Class[] types)
418    {
419       int len = params != null ? params.length : 0;
420       Object[] new_args = new Object[len + 1];
421       new_args[0] = localAddress;
422       for(int i = 0; i < params.length; i++)
423       {
424          new_args[i + 1] = params[i];
425       }
426       MethodCall call = new MethodCall(methodName, new_args, types);
427       disp.callRemoteMethods(null, call, GroupRequest.GET_NONE, 0);
428    }
429
430    /*************************************
431     * Driver callback for JUnit asserts *
432     *************************************/

433    /**
434     * JGroups callback when a test fails.
435     *
436     * @param source
437     * @param message
438     */

439    public void receiveAssert(Address source, String message)
440    {
441       log.warn("Assert source: " + source + "\tmessage = " + message);
442    }
443
444
445    /**
446     * ***********************
447     * JUnit methods *
448     * ************************
449     */

450    public void run(TestResult testResult)
451    {
452       log.debug("DistributedTestCase::run(TestResult testResult) called.");
453       log.debug("countTestCases() = " + countTestCases());
454       testResult.addListener(testListener);
455       super.run(testResult);
456    }
457
458    /**
459     * Will check to see if this is the first test method to be run, if it is, will
460     * then call startup() to let other instances know ready to run.
461     *
462     * @throws Exception
463     */

464    protected void setUp() throws Exception
465    {
466
467       log.debug("setUp() - testRunCount = " + testRunCount);
468       if(testRunCount == 0)
469       {
470          // have to make sure startup not already explicitly called
471
if(!startupCalledFlag)
472          {
473             log.debug("calling startup()");
474             startup(getNumberOfInstances());
475          }
476       }
477       testRunCount++;
478    }
479
480    /**
481     * Will call shutdown if this is the last test method to be run.
482     *
483     * @throws Exception
484     */

485    protected void tearDown() throws Exception
486    {
487       log.debug("tearDown() - testRunCount = " + testRunCount);
488       log.debug("tearDown() - countTestCases() = " + countTestCases());
489       if(testRunCount == countTestCases())
490       {
491          // need to make sure shutdown not already explicitly called
492
if(!shutdownCalledFlag)
493          {
494             log.debug("calling shutdown()");
495             shutdown();
496          }
497       }
498    }
499
500    /**
501     * *********************
502     * JGroups methods *
503     * **********************
504     */

505    public void suspect(Address address)
506    {
507    }
508
509    public void block()
510    {
511    }
512
513    /**
514     * Listener of the test results which then forward failures or errors
515     * on to the other remote instances via JG so they can report it.
516     */

517    public class DistributedTestListener implements TestListener
518    {
519       public void addError(Test test, Throwable throwable)
520       {
521          String message = throwable.getMessage();
522          String methodName = "receiveAssert";
523          log.debug("addError() called with " + message);
524          callRemoteAssert(methodName, new Object[]{message}, new Class[]{Address.class, String.class});
525       }
526
527       public void addFailure(Test test, AssertionFailedError assertionFailedError)
528       {
529          String message = assertionFailedError.getMessage();
530          String methodName = "receiveAssert";
531          log.debug("addFailure() called with " + message);
532          callRemoteAssert(methodName, new Object[]{message}, new Class[]{Address.class, String.class});
533       }
534
535       public void endTest(Test test)
536       {
537          log.debug("endTest() called. Calling disconnect().");
538          disconnect();
539       }
540
541       public void startTest(Test test)
542       {
543          runningAsUnitTest = true;
544          log.debug("startTest() called");
545       }
546
547    }
548
549
550    public static void main(String[] args)
551    {
552
553       org.apache.log4j.BasicConfigurator.configure();
554       org.apache.log4j.Category.getRoot().setLevel(Level.INFO);
555       org.apache.log4j.Category.getInstance(DistributedTestCase.class).setLevel(Level.DEBUG);
556
557       try
558       {
559          final DistributedTestCase testCase = new DistributedTestCase(DistributedTestCase.class.getName());
560          final DistributedTestCase testCase2 = new DistributedTestCase(DistributedTestCase.class.getName() + "2");
561
562          new Thread()
563          {
564             public void run()
565             {
566                try
567                {
568                   testCase.startup(2);
569                }
570                catch(Exception e)
571                {
572                   e.printStackTrace();
573                }
574             }
575          }.start();
576          new Thread()
577          {
578             public void run()
579             {
580                try
581                {
582                   testCase2.startup(2);
583                }
584                catch(Exception e)
585                {
586                   e.printStackTrace();
587                }
588             }
589          }.start();
590
591          int x = 100;
592          while(x-- > 0)
593          {
594             Thread.sleep(200);
595          }
596
597          System.out.println("Number of members connected: " + testCase.getNumberOfMembers());
598          System.out.println("Number of members connected: " + testCase2.getNumberOfMembers());
599
600          DistributedTest.assertTrue(2 == testCase.getNumberOfMembers());
601
602 // testCase.shutdown();
603
// testCase2.shutdown();
604
}
605       catch(Exception e)
606       {
607          e.printStackTrace();
608          System.exit(1);
609       }
610       System.exit(0);
611    }
612
613 }
614
Popular Tags