package org.jboss.dtf;

import junit.framework.AssertionFailedError;
import junit.framework.Test;
import junit.framework.TestListener;
import junit.framework.TestResult;
import org.apache.log4j.Level;
import org.jboss.logging.Logger;
import org.jgroups.Address;
import org.jgroups.Channel;
import org.jgroups.ChannelException;
import org.jgroups.JChannel;
import org.jgroups.MembershipListener;
import org.jgroups.View;
import org.jgroups.blocks.GroupRequest;
import org.jgroups.blocks.MethodCall;
import org.jgroups.blocks.RpcDispatcher;

import java.io.IOException;
import java.util.Enumeration;
import java.util.Vector;

/**
 * Test case that coordinates its start and stop with other instances of itself
 * through a JGroups channel.
 * <p/>
 * {@link #startup()} connects to the "DistributedTestCase" group and blocks until
 * the group view contains at least {@link #getNumberOfInstances()} members.
 * {@link #shutdown()} broadcasts a shutdown notification and blocks until the
 * expected number of such notifications has been received.  Test errors and
 * failures are broadcast to the group through {@link #receiveAssert(Address, String)}.
 * <p/>
 * The JGroups callbacks ({@link #viewAccepted(View)},
 * {@link #receiveShutdownNotification(Address)}) are presumably invoked on JGroups
 * threads, so the flags they share with the waiting test thread are volatile.
 */
public class DistributedTestCase extends MultipleTestCase implements MembershipListener
{
   private static final Logger log = Logger.getLogger(DistributedTestCase.class);

   /** Number of instances that must be present in the group before startup() returns. */
   private int parties = 2;

   private Channel channel;
   private RpcDispatcher disp;
   private Address localAddress;

   /** Size of the most recently accepted view; written by the view callback. */
   private volatile int numOfMembers;

   /** JGroups protocol stack used to create the channel. */
   private String props =
         "TCP(start_port=7800):" +
         "TCPPING(initial_hosts=localhost[7800];port_range=3;timeout=3000;" +
         "num_initial_members=3;up_thread=true;down_thread=true):" +
         "MERGE2(max_interval=3000;min_interval=1500):" +
         "VERIFY_SUSPECT(timeout=1500;down_thread=false;up_thread=false):" +
         "pbcast.NAKACK(down_thread=true;up_thread=true;gc_lag=100;retransmit_timeout=3000):" +
         "pbcast.STABLE(desired_avg_gossip=20000;down_thread=false;up_thread=false):" +
         "pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;shun=false;" +
         "print_local_addr=false;down_thread=true;up_thread=true)";

   /** Maximum time, in milliseconds, startup() waits for the other instances. */
   private long startupTimeout = 60000;
   /** Maximum time, in milliseconds, shutdown() waits for the other instances. */
   private long shutdownTimeout = 120000;

   /** Set by startup() to <code>parties > 1</code>; gates both exits of the shutdown wait. */
   private boolean shouldShutdown = true;

   /** Number of shutdown notifications still expected. */
   private int shutdownCount;

   // Written by JGroups callbacks while the test thread polls them, hence volatile.
   private volatile boolean startupWaitFlag = false;
   private volatile boolean shutdownWaitFlag = false;

   private boolean startupCalledFlag = false;
   private boolean shutdownCalledFlag = false;

   /** Number of times setUp() has run on this instance. */
   private int testRunCount = 0;

   /** Set once the JUnit listener sees a test start; shutdown() then leaves disconnecting to the listener. */
   private volatile boolean runningAsUnitTest = false;

   private final Object waitObj = new Object();
   private final DistributedTestListener testListener = new DistributedTestListener();

   public DistributedTestCase(String name)
   {
      super(name);
   }

   /**
    * @return the maximum time, in milliseconds, that shutdown() waits
    */
   public long getShutdownTimeout()
   {
      return shutdownTimeout;
   }

   /**
    * @param timeout the maximum time, in milliseconds, that shutdown() should wait
    */
   protected void setShutdownTimeout(long timeout)
   {
      this.shutdownTimeout = timeout;
   }

   /**
    * Configures log4j: console output through BasicConfigurator (console appenders
    * limited to INFO), DEBUG for the remoting and dtf categories, FATAL for JGroups,
    * plus a file appender writing to "&lt;class name&gt;_output.log".
    */
   public void setLogging()
   {
      org.apache.log4j.BasicConfigurator.configure();
      org.apache.log4j.Category.getRoot().setLevel(Level.INFO);
      org.apache.log4j.Category.getInstance("org.jboss.remoting").setLevel(Level.DEBUG);
      org.apache.log4j.Category.getInstance("org.jboss.dtf").setLevel(Level.DEBUG);
      org.apache.log4j.Category.getInstance("org.jgroups").setLevel(Level.FATAL);

      org.apache.log4j.SimpleLayout layout = new org.apache.log4j.SimpleLayout();
      try
      {
         org.apache.log4j.FileAppender fileAppender =
               new org.apache.log4j.FileAppender(layout, getClass().getName() + "_output.log");
         fileAppender.setThreshold(Level.DEBUG);
         org.apache.log4j.Category.getRoot().addAppender(fileAppender);
      }
      catch(IOException e)
      {
         // Logging to file is optional; carry on with console logging only.
         log.warn("Unable to create log file appender.", e);
      }

      // Keep the console quieter than the log file.
      Enumeration appenders = org.apache.log4j.Category.getRoot().getAllAppenders();
      while(appenders.hasMoreElements())
      {
         org.apache.log4j.Appender appender = (org.apache.log4j.Appender) appenders.nextElement();
         if(appender instanceof org.apache.log4j.ConsoleAppender)
         {
            ((org.apache.log4j.ConsoleAppender) appender).setThreshold(Level.INFO);
         }
      }
   }

   /**
    * @return the number of members in the most recently accepted group view
    */
   public int getNumberOfMembers()
   {
      return numOfMembers;
   }

   /**
    * Sets the number of instances that startup() waits for.
    *
    * @param numOfInstances the number of expected group members
    */
   protected void init(int numOfInstances)
   {
      parties = numOfInstances;
   }

   /**
    * @return the number of instances that startup() waits for
    */
   public int getNumberOfInstances()
   {
      return parties;
   }

   /**
    * Convenience for {@link #init(int)} followed by {@link #startup()}.
    *
    * @param numOfInstances the number of expected group members
    * @throws Exception see {@link #startup()}
    */
   public void startup(int numOfInstances) throws Exception
   {
      init(numOfInstances);
      startup();
   }

   /**
    * Joins the group and blocks until the view holds at least the configured
    * number of instances.
    *
    * @throws Exception if the channel cannot be created or connected, or if the
    *                   wait ends (startup timeout elapsed or thread interrupted)
    *                   before enough members have joined; in the latter case the
    *                   dispatcher is stopped and the channel disconnected first
    */
   public void startup() throws Exception
   {
      shutdownCount = parties;
      shouldShutdown = parties > 1;

      startupWaitFlag = true;
      startupCalledFlag = true;
      log.debug("Sending startup notification");
      sendStartupNotification();

      boolean interrupted = false;
      long startTime = System.currentTimeMillis();
      while(startupWaitFlag)
      {
         try
         {
            synchronized(waitObj)
            {
               // Re-check under the monitor so a notify() issued between the loop
               // test and the wait() is not missed.
               if(startupWaitFlag)
               {
                  waitObj.wait(1000);
               }
            }

            if(timeoutExpired(startTime, startupTimeout))
            {
               break;
            }
         }
         catch(InterruptedException e)
         {
            interrupted = true;
            break;
         }
      }

      try
      {
         if(startupWaitFlag)
         {
            disp.stop();
            channel.disconnect();
            throw new Exception("Timed out waiting for other instances to start.");
         }
      }
      finally
      {
         // Restore the interrupt status only after the channel clean up above.
         if(interrupted)
         {
            Thread.currentThread().interrupt();
         }
      }
   }

   /**
    * Broadcasts a shutdown notification and blocks until the expected number of
    * notifications has been received, then disconnects from the group unless
    * running under the JUnit listener (which disconnects in endTest()).
    * <p/>
    * NOTE(review): when only one instance is configured (shouldShutdown is false)
    * neither the timeout nor an incoming notification ends the wait, so this
    * method does not return.  Looks intentional - confirm.
    *
    * @throws Exception if the shutdown timeout elapses before all notifications
    *                   arrive, or if the thread is interrupted during the initial
    *                   one second pause
    */
   public void shutdown() throws Exception
   {
      boolean interrupted = false;
      try
      {
         shutdownWaitFlag = true;
         shutdownCalledFlag = true;
         Thread.sleep(1000);
         sendShutdownNotification();

         long startTime = System.currentTimeMillis();
         while(shutdownWaitFlag)
         {
            try
            {
               Thread.sleep(1000);
               if(shouldShutdown && timeoutExpired(startTime, shutdownTimeout))
               {
                  break;
               }
            }
            catch(InterruptedException e)
            {
               // Keep waiting, as before, but remember the interrupt so it can be
               // restored once the wait is over.
               interrupted = true;
            }
         }

         if(shutdownWaitFlag)
         {
            throw new Exception("Timed out waiting for other instances to stop.");
         }
      }
      finally
      {
         if(!runningAsUnitTest)
         {
            log.debug("calling disconnect.  runningAsUnitTest = " + runningAsUnitTest);
            disconnect();
         }
         if(interrupted)
         {
            Thread.currentThread().interrupt();
         }
      }
   }

   /**
    * Pauses for five seconds, then stops the dispatcher and disconnects the
    * channel.  Failures while doing so are logged, never thrown.
    */
   protected void disconnect()
   {
      boolean interrupted = false;
      try
      {
         // Presumably gives in-flight messages time to be delivered - TODO confirm.
         Thread.sleep(5000);
      }
      catch(InterruptedException e)
      {
         log.debug("Interrupted while pausing before disconnect.");
         interrupted = true;
      }
      try
      {
         log.debug("Disconnecting from JGroups.  Will not be able to send any more messages.");
         // Either may still be null if startup() never got as far as connecting.
         if(disp != null)
         {
            disp.stop();
         }
         if(channel != null)
         {
            channel.disconnect();
         }
      }
      catch(Exception e)
      {
         log.warn("Exception in disconnect() when stopping and closing channel.", e);
      }
      finally
      {
         if(interrupted)
         {
            Thread.currentThread().interrupt();
         }
      }
   }

   /**
    * @return true if more than <code>timeout</code> milliseconds have passed since
    *         <code>startTime</code>
    */
   private boolean timeoutExpired(long startTime, long timeout)
   {
      return System.currentTimeMillis() - startTime > timeout;
   }

   /**
    * Creates the channel and dispatcher and connects to the "DistributedTestCase"
    * group.  Joining the group is what announces this instance to the others:
    * they see it through their viewAccepted() callback.
    */
   private void sendStartupNotification() throws ChannelException
   {
      channel = new JChannel(props);
      disp = new RpcDispatcher(channel, null, this, this);
      channel.connect("DistributedTestCase");
      localAddress = channel.getLocalAddress();
   }

   /**
    * Invokes receiveShutdownNotification() through the dispatcher without waiting
    * for responses (null destination list, GET_NONE).
    */
   private void sendShutdownNotification()
   {
      MethodCall call = new MethodCall("receiveShutdownNotification",
                                       new Object[]{localAddress}, new Class[]{Address.class});
      disp.callRemoteMethods(null, call, GroupRequest.GET_NONE, 0);
      log.debug("sent shutdown notification " + call);
   }

   /**
    * MembershipListener callback.  Records the view size and releases startup()
    * once enough members are present.
    *
    * @param view the new group view
    */
   public void viewAccepted(View view)
   {
      Vector members = view.getMembers();
      log.debug("members.size() = " + members.size());
      // Not named "enum": that is a reserved word from Java 5 onwards.
      Enumeration memberEnum = members.elements();
      while(memberEnum.hasMoreElements())
      {
         log.debug(memberEnum.nextElement());
      }
      numOfMembers = members.size();
      if(numOfMembers >= parties && startupWaitFlag)
      {
         synchronized(waitObj)
         {
            startupWaitFlag = false;
            waitObj.notify();
         }
      }
   }

   /**
    * Target of the call made by sendShutdownNotification().  Counts down the
    * expected notifications and releases shutdown() when the count reaches zero.
    *
    * @param address the address of the instance that is shutting down
    */
   public void receiveShutdownNotification(Address address)
   {
      log.debug("receiveShutdownNotification() from " + address);
      log.debug("shutdownCount = " + (shutdownCount - 1) +
                " and shutdownWaitFlag = " + shutdownWaitFlag);
      if(--shutdownCount == 0 && shutdownWaitFlag && shouldShutdown)
      {
         shutdownWaitFlag = false;
      }
   }

   /**
    * Invokes <code>methodName</code> through the dispatcher with the local address
    * prepended to <code>params</code>, without waiting for responses.
    *
    * @param methodName name of the method to invoke
    * @param params     arguments following the address; may be null
    * @param types      parameter types of the target method, address included
    */
   private void callRemoteAssert(String methodName, Object[] params, Class[] types)
   {
      if(disp == null)
      {
         // Reached when an error is reported before the channel was connected,
         // e.g. startup() itself failed.  Must not throw from a TestListener.
         log.warn("Not connected to group; unable to send " + methodName + " to other instances.");
         return;
      }
      int len = params != null ? params.length : 0;
      Object[] newArgs = new Object[len + 1];
      newArgs[0] = localAddress;
      if(len > 0)
      {
         System.arraycopy(params, 0, newArgs, 1, len);
      }
      MethodCall call = new MethodCall(methodName, newArgs, types);
      disp.callRemoteMethods(null, call, GroupRequest.GET_NONE, 0);
   }

   /**
    * Target of the call made when an instance reports a test error or failure.
    * Logs the report at WARN level.
    *
    * @param source  address of the instance reporting the problem
    * @param message message of the error or failure; may be null
    */
   public void receiveAssert(Address source, String message)
   {
      log.warn("Assert source: " + source + "\tmessage = " + message);
   }

   /**
    * Registers the distributed test listener on the result, then runs the test.
    *
    * @param testResult collects the results of the run
    */
   public void run(TestResult testResult)
   {
      log.debug("DistributedTestCase::run(TestResult testResult) called.");
      log.debug("countTestCases() = " + countTestCases());
      testResult.addListener(testListener);
      super.run(testResult);
   }

   /**
    * Calls startup() before the first test run, unless it has been called already.
    *
    * @throws Exception see {@link #startup()}
    */
   protected void setUp() throws Exception
   {
      log.debug("setUp() - testRunCount = " + testRunCount);
      if(testRunCount == 0 && !startupCalledFlag)
      {
         log.debug("calling startup()");
         startup(getNumberOfInstances());
      }
      testRunCount++;
   }

   /**
    * Calls shutdown() after the last test run, unless it has been called already.
    *
    * @throws Exception see {@link #shutdown()}
    */
   protected void tearDown() throws Exception
   {
      log.debug("tearDown() - testRunCount = " + testRunCount);
      log.debug("tearDown() - countTestCases() = " + countTestCases());
      if(testRunCount == countTestCases() && !shutdownCalledFlag)
      {
         log.debug("calling shutdown()");
         shutdown();
      }
   }

   /**
    * MembershipListener callback; intentionally does nothing.
    */
   public void suspect(Address address)
   {
   }

   /**
    * MembershipListener callback; intentionally does nothing.
    */
   public void block()
   {
   }

   /**
    * JUnit listener that broadcasts errors and failures to the group and
    * disconnects from the group at the end of each test.
    */
   public class DistributedTestListener implements TestListener
   {
      public void addError(Test test, Throwable throwable)
      {
         String message = throwable.getMessage();
         log.debug("addError() called with " + message);
         broadcastAssert(message);
      }

      public void addFailure(Test test, AssertionFailedError assertionFailedError)
      {
         String message = assertionFailedError.getMessage();
         log.debug("addFailure() called with " + message);
         broadcastAssert(message);
      }

      public void endTest(Test test)
      {
         log.debug("endTest() called.  Calling disconnect().");
         disconnect();
      }

      public void startTest(Test test)
      {
         runningAsUnitTest = true;
         log.debug("startTest() called");
      }

      /** Sends the message to receiveAssert() through the dispatcher. */
      private void broadcastAssert(String message)
      {
         callRemoteAssert("receiveAssert", new Object[]{message},
                          new Class[]{Address.class, String.class});
      }
   }

   /**
    * Manual smoke test: starts two instances in this JVM, waits twenty seconds
    * and checks that the first one sees two group members.
    */
   public static void main(String[] args)
   {
      org.apache.log4j.BasicConfigurator.configure();
      org.apache.log4j.Category.getRoot().setLevel(Level.INFO);
      org.apache.log4j.Category.getInstance(DistributedTestCase.class).setLevel(Level.DEBUG);

      try
      {
         final DistributedTestCase testCase =
               new DistributedTestCase(DistributedTestCase.class.getName());
         final DistributedTestCase testCase2 =
               new DistributedTestCase(DistributedTestCase.class.getName() + "2");

         startInNewThread(testCase);
         startInNewThread(testCase2);

         // 100 x 200ms: give both instances time to find each other.
         for(int x = 100; x > 0; x--)
         {
            Thread.sleep(200);
         }

         System.out.println("Number of members connected: " + testCase.getNumberOfMembers());
         System.out.println("Number of members connected: " + testCase2.getNumberOfMembers());

         DistributedTest.assertTrue(2 == testCase.getNumberOfMembers());
      }
      catch(Exception e)
      {
         log.error("Distributed test case smoke test failed.", e);
         System.exit(1);
      }
      System.exit(0);
   }

   /** Runs <code>startup(2)</code> for the given test case on a thread of its own. */
   private static void startInNewThread(final DistributedTestCase testCase)
   {
      new Thread()
      {
         public void run()
         {
            try
            {
               testCase.startup(2);
            }
            catch(Exception e)
            {
               log.error("Startup of distributed test case failed.", e);
            }
         }
      }.start();
   }
}