1 package org.jgroups.tests; 2 3 4 import junit.framework.Test; 5 import junit.framework.TestCase; 6 import junit.framework.TestSuite; 7 import org.jgroups.TimeoutException; 8 import org.jgroups.stack.Interval; 9 import org.jgroups.util.Promise; 10 import org.jgroups.util.TimeScheduler; 11 import org.jgroups.util.Util; 12 13 import java.util.Date ; 14 import java.util.HashMap ; 15 import java.util.concurrent.ScheduledFuture ; 16 import java.util.concurrent.TimeUnit ; 17 import java.util.concurrent.Future ; 18 19 20 25 public class TimeSchedulerTest extends TestCase { 26 TimeScheduler timer=null; 27 static final int NUM_MSGS=1000; 28 static long[] xmit_timeouts={1000, 2000, 4000, 8000}; 29 static double PERCENTAGE_OFF=0.3; HashMap <Long ,Entry> msgs=new HashMap <Long ,Entry>(); 32 33 public TimeSchedulerTest(String name) { 34 super(name); 35 } 36 37 38 public void setUp() throws Exception { 39 super.setUp(); 40 timer=new TimeScheduler(); 41 } 42 43 public void tearDown() throws Exception { 44 super.tearDown(); 45 try { 46 timer.stop(); 47 } 48 catch(InterruptedException e) { 49 } 50 } 51 52 53 public void testCancel() throws InterruptedException { 54 for(int i=0; i < 10; i++) 55 timer.scheduleWithDynamicInterval(new OneTimeTask(1000)); 56 assertEquals(10, timer.size()); 57 timer.stop(); 58 assertEquals(0, timer.size()); 59 } 60 61 62 public void testTaskCancellationBeforeTaskHasRun() { 63 Future future; 64 StressTask task=new StressTask(); 65 future=timer.scheduleWithDynamicInterval(task); 66 assertEquals(1, timer.size()); 67 future.cancel(true); 68 assertEquals(1, timer.size()); 69 70 Util.sleep(200); 71 int num_executions=task.getNum_executions(); 72 System.out.println("number of task executions=" + num_executions); 73 assertEquals("task should never have executed as it was cancelled before execution", 0, num_executions); 74 75 timer.purge(); assertEquals(0, timer.size()); 77 } 78 79 public void testTaskCancellationAfterHasRun() { 80 Future future; 81 StressTask task=new StressTask(); 82 future=timer.scheduleWithDynamicInterval(task); 83 assertEquals(1, timer.size()); 84 85 Util.sleep(200); future.cancel(true); 87 assertEquals(1, timer.size()); 88 89 int num_executions=task.getNum_executions(); 90 System.out.println("number of task executions=" + num_executions); 91 assertTrue("task should have executed at least 1 time, as it was cancelled after 200ms", num_executions >= 1); 92 timer.purge(); assertEquals(0, timer.size()); 94 } 95 96 97 98 public void testRepeatingTask() { 99 Future future; 100 System.out.println(System.currentTimeMillis() + ": adding task"); 101 RepeatingTask task=new RepeatingTask(500); 102 future=timer.scheduleWithDynamicInterval(task); 103 Util.sleep(5000); 104 105 System.out.println("<<< cancelling task"); 106 future.cancel(true); 107 Util.sleep(2000); 108 int num=task.getNum(); 109 System.out.println("task executed " + num + " times"); 110 assertTrue(num >= 9 && num < 11); 111 } 112 113 public void testStress() { 114 StressTask t; 115 116 for(int i=0; i < 1000; i++) { 117 for(int j=0; j < 1000; j++) { 118 t=new StressTask(); 119 Future future=timer.scheduleWithDynamicInterval(t); 120 future.cancel(true); 121 } 122 System.out.println(i + ": " + timer.size()); 123 } 124 for(int i=0; i < 10; i++) { 125 System.out.println(timer.size()); 126 Util.sleep(500); 127 } 128 assertEquals(0, timer.size()); 129 } 130 131 132 public void testDynamicTask() throws InterruptedException { 133 TimeScheduler.Task task=new DynamicTask(); 134 ScheduledFuture <?> future=timer.scheduleWithDynamicInterval(task); 135 assertEquals(1, timer.getQueue().size()); 136 137 assertFalse(future.isCancelled()); 138 assertFalse(future.isDone()); 139 140 Thread.sleep(3000); 141 assertFalse(future.isCancelled()); 142 assertFalse(future.isDone()); 143 144 future.cancel(true); 145 assertTrue(future.isCancelled()); 146 assertTrue(future.isDone()); 147 } 148 149 150 151 public void testDynamicTaskCancel() throws InterruptedException { 152 153 TimeScheduler.Task task=new DynamicTask(); 154 ScheduledFuture <?> future=timer.scheduleWithDynamicInterval(task); 155 156 assertFalse(future.isCancelled()); 157 assertFalse(future.isDone()); 158 159 Thread.sleep(3000); 160 assertFalse(future.isCancelled()); 161 assertFalse(future.isDone()); 162 163 boolean success=future.cancel(true); 164 assertTrue(success); 165 assertTrue(future.isCancelled()); 166 assertTrue(future.isDone()); 167 168 success=future.cancel(true); 169 assertTrue(success); } 171 172 173 public void testIsDone() throws InterruptedException { 174 TimeScheduler.Task task=new DynamicTask(); 175 ScheduledFuture <?> future=timer.scheduleWithDynamicInterval(task); 176 177 assertFalse(future.isCancelled()); 178 assertFalse(future.isDone()); 179 180 Thread.sleep(3000); 181 assertFalse(future.isCancelled()); 182 assertFalse(future.isDone()); 183 184 future.cancel(true); 185 assertTrue(future.isCancelled()); 186 assertTrue(future.isDone()); 187 } 188 189 public void testIsDone2() throws InterruptedException { 190 TimeScheduler.Task task=new DynamicTask(new long[]{1000,2000,-1}); 191 ScheduledFuture <?> future=timer.scheduleWithDynamicInterval(task); 192 193 assertFalse(future.isCancelled()); 194 assertFalse(future.isDone()); 195 196 Thread.sleep(3500); 197 assertFalse(future.isCancelled()); 198 assertTrue(future.isDone()); 199 200 boolean success=future.cancel(true); 201 if(success) 202 assertTrue(future.isCancelled()); 203 else 204 assertFalse(future.isCancelled()); 205 assertTrue(future.isDone()); 206 } 207 208 209 public void testIsDone3() throws InterruptedException { 210 TimeScheduler.Task task=new DynamicTask(new long[]{-1}); 211 ScheduledFuture <?> future=timer.scheduleWithDynamicInterval(task); 212 Thread.sleep(100); 213 assertFalse(future.isCancelled()); 214 assertTrue(future.isDone()); 215 216 boolean success=future.cancel(true); 217 if(success) 218 assertTrue(future.isCancelled()); 219 else 220 assertFalse(future.isCancelled()); 221 assertTrue(future.isDone()); 222 } 223 224 225 public void testImmediateExecution() { 226 Promise p=new Promise(); 227 ImmediateTask task=new ImmediateTask(p); 228 timer.execute(task); 229 try { 230 long start=System.currentTimeMillis(), stop; 231 p.getResultWithTimeout(5); 232 stop=System.currentTimeMillis(); 233 System.out.println("task took " + (stop-start) + "ms"); 234 } 235 catch(TimeoutException e) { 236 fail("ran into timeout - task should have executed immediately"); 237 } 238 } 239 240 241 public void test2Tasks() throws InterruptedException { 242 int size; 243 244 System.out.println(System.currentTimeMillis() + ": adding task"); 245 timer.schedule(new MyTask(), 500, TimeUnit.MILLISECONDS); 246 size=timer.size(); 247 System.out.println("queue size=" + size); 248 assertEquals(1, size); 249 Thread.sleep(1000); 250 size=timer.size(); 251 System.out.println("queue size=" + size); 252 assertEquals(0, size); 253 254 Thread.sleep(1500); 255 System.out.println(System.currentTimeMillis() + ": adding task"); 256 timer.schedule(new MyTask(), 500, TimeUnit.MILLISECONDS); 257 258 System.out.println(System.currentTimeMillis() + ": adding task"); 259 timer.schedule(new MyTask(), 500, TimeUnit.MILLISECONDS); 260 261 System.out.println(System.currentTimeMillis() + ": adding task"); 262 timer.schedule(new MyTask(), 500, TimeUnit.MILLISECONDS); 263 264 size=timer.size(); 265 System.out.println("queue size=" + size); 266 assertEquals(3, size); 267 268 Thread.sleep(1000); 269 size=timer.size(); 270 System.out.println("queue size=" + size); 271 assertEquals(0, size); 272 } 273 274 275 276 277 278 282 public void testRetransmits() throws InterruptedException { 283 Entry entry; 284 int num_non_correct_entries=0; 285 286 System.out.println("-- adding " + NUM_MSGS + " messages:"); 288 for(long i=0; i < NUM_MSGS; i++) { 289 entry=new Entry(i); 290 msgs.put(new Long (i), entry); 291 timer.scheduleWithDynamicInterval(entry); 292 } 293 System.out.println("-- done"); 294 295 System.out.println("-- waiting for 20 secs for all retransmits"); 297 Thread.sleep(20000); 298 299 for(long i=0; i < NUM_MSGS; i++) { 301 entry=msgs.get(new Long (i)); 302 if(!entry.isCorrect()) { 303 num_non_correct_entries++; 304 } 305 } 306 307 if(num_non_correct_entries > 0) 308 System.err.println("Number of incorrect retransmission timeouts: " + num_non_correct_entries); 309 else { 310 for(long i=0; i < NUM_MSGS; i++) { 311 entry=msgs.get(new Long (i)); 312 if(entry != null) 313 System.out.println(i + ": " + entry); 314 } 315 } 316 assertEquals(0, num_non_correct_entries); 317 } 318 319 320 321 322 static private class ImmediateTask implements Runnable { 323 Promise p; 324 325 public ImmediateTask(Promise p) { 326 this.p=p; 327 } 328 329 public void run() { 330 p.setResult(Boolean.TRUE); 331 } 332 } 333 334 static class MyTask implements Runnable { 335 public void run() { 336 System.out.println(System.currentTimeMillis() + ": this is MyTask running - done"); 337 } 338 } 339 340 341 static class DynamicTask implements TimeScheduler.Task { 342 long[] times={1000,2000,4000}; 343 int index=0; 344 345 public DynamicTask() { 346 } 347 348 public DynamicTask(long[] times) { 349 this.times=times; 350 } 351 352 public long nextInterval() { 353 if(index == times.length -1) 354 return times[index]; 355 else 356 return times[index++]; 357 } 358 359 public void run() { 360 System.out.println("dynamic task run at " + new Date ()); 361 } 362 } 363 364 365 366 static class OneTimeTask implements TimeScheduler.Task { 367 boolean done=false; 368 private long timeout=0; 369 370 OneTimeTask(long timeout) { 371 this.timeout=timeout; 372 } 373 374 public long nextInterval() { 375 return timeout; 376 } 377 378 public void run() { 379 System.out.println(System.currentTimeMillis() + ": this is MyTask running - done"); 380 done=true; 381 } 382 } 383 384 385 static class RepeatingTask extends OneTimeTask { 386 int num=0; 387 388 RepeatingTask(long timeout) { 389 super(timeout); 390 } 391 392 public int getNum() { 393 return num; 394 } 395 396 public void run() { 397 System.out.println((num +1) + ": this is the repeating task"); 398 num++; 399 } 400 } 401 402 403 static class StressTask implements TimeScheduler.Task { 404 boolean cancelled=false; 405 int num_executions=0; 406 407 public int getNum_executions() { 408 return num_executions; 409 } 410 411 public long nextInterval() { 412 return 50; 413 } 414 415 public void run() { 416 num_executions++; 417 } 418 } 419 420 421 422 private static class Entry implements TimeScheduler.Task { 423 long start_time=0; long first_xmit=0; long second_xmit=0; long third_xmit=0; long fourth_xmit=0; boolean cancelled=false; 429 Interval interval=new Interval(xmit_timeouts); 430 long seqno=0; 431 432 433 Entry(long seqno) { 434 this.seqno=seqno; 435 start_time=System.currentTimeMillis(); 436 } 437 438 439 public long nextInterval() { 440 return interval.next(); 441 } 442 443 public void run() { 444 if(first_xmit == 0) 445 first_xmit=System.currentTimeMillis(); 446 else 447 if(second_xmit == 0) 448 second_xmit=System.currentTimeMillis(); 449 else 450 if(third_xmit == 0) 451 third_xmit=System.currentTimeMillis(); 452 else 453 if(fourth_xmit == 0) 454 fourth_xmit=System.currentTimeMillis(); 455 } 456 457 458 461 boolean isCorrect() { 462 long t; 463 long expected; 464 long diff, delta; 465 boolean off=false; 466 467 t=first_xmit - start_time; 468 expected=xmit_timeouts[0]; 469 diff=Math.abs(expected - t); 470 delta=(long)(expected * PERCENTAGE_OFF); 471 if(diff >= delta) off=true; 472 473 t=second_xmit - first_xmit; 474 expected=xmit_timeouts[1]; 475 diff=Math.abs(expected - t); 476 delta=(long)(expected * PERCENTAGE_OFF); 477 if(diff >= delta) off=true; 478 479 t=third_xmit - second_xmit; 480 expected=xmit_timeouts[2]; 481 diff=Math.abs(expected - t); 482 delta=(long)(expected * PERCENTAGE_OFF); 483 if(diff >= delta) off=true; 484 485 t=fourth_xmit - third_xmit; 486 expected=xmit_timeouts[3]; 487 diff=Math.abs(expected - t); 488 delta=(long)(expected * PERCENTAGE_OFF); 489 if(diff >= delta) off=true; 490 491 if(off) { 492 System.err.println("#" + seqno + ": " + this + ": (" + "entry is more than " + 493 PERCENTAGE_OFF + " percentage off "); 494 return false; 495 } 496 return true; 497 } 498 499 public String toString() { 500 StringBuilder sb=new StringBuilder (); 501 sb.append(first_xmit - start_time).append(", ").append(second_xmit - first_xmit).append(", "); 502 sb.append(third_xmit - second_xmit).append(", ").append(fourth_xmit - third_xmit); 503 return sb.toString(); 504 } 505 } 506 507 508 509 public static Test suite() { 510 TestSuite suite; 511 suite=new TestSuite(TimeSchedulerTest.class); 512 return (suite); 513 } 514 515 public static void main(String [] args) { 516 String [] name={TimeSchedulerTest.class.getName()}; 517 junit.textui.TestRunner.main(name); 518 } 519 } 520 | Popular Tags |