1 4 package com.tctest; 5 6 import com.tc.exception.TCRuntimeException; 7 import com.tc.object.config.ConfigVisitor; 8 import com.tc.object.config.DSOClientConfigHelper; 9 import com.tc.object.config.TransparencyClassSpec; 10 import com.tc.simulator.app.ApplicationConfig; 11 import com.tc.simulator.listener.ListenerProvider; 12 import com.tc.util.Assert; 13 import com.tctest.runner.AbstractTransparentApp; 14 15 import java.util.ArrayList ; 16 import java.util.List ; 17 import java.util.concurrent.Callable ; 18 import java.util.concurrent.CancellationException ; 19 import java.util.concurrent.CyclicBarrier ; 20 import java.util.concurrent.ExecutionException ; 21 import java.util.concurrent.ExecutorService ; 22 import java.util.concurrent.FutureTask ; 23 import java.util.concurrent.LinkedBlockingQueue ; 24 import java.util.concurrent.ThreadPoolExecutor ; 25 import java.util.concurrent.TimeUnit ; 26 import java.util.concurrent.TimeoutException ; 27 28 public class FutureTaskTestApp extends AbstractTransparentApp { 29 30 private final CyclicBarrier barrier; 31 private final CyclicBarrier barrier2 = new CyclicBarrier (2); 32 33 private final int NUM_OF_ITEMS = 500; 34 private final DataRoot root = new DataRoot(); 35 private final LinkedBlockingQueue workerQueue = new LinkedBlockingQueue (); 36 private final LinkedBlockingQueue resultQueue = new LinkedBlockingQueue (); 37 38 public FutureTaskTestApp(String appId, ApplicationConfig cfg, ListenerProvider listenerProvider) { 39 super(appId, cfg, listenerProvider); 40 barrier = new CyclicBarrier (getParticipantCount()); 41 } 42 43 public void run() { 44 try { 45 int index = barrier.await(); 46 47 testSharedDuringRunning(index); 48 testWithCallable(index); 49 testWithRunnable(index); 50 testWithMyFutureTask(index); 51 testWithLinkedBlockingQueue(index); 52 testWithExecutorService(index); 53 54 } catch (Throwable t) { 55 notifyError(t); 56 } 57 } 58 59 private void testSharedDuringRunning(int index) throws Exception { 60 if (index == 0) { 61 FutureTask task = new MyFutureTask(new MyCallable()); 62 Thread thread1 = new Thread (new GetUnSharedRunnable(task)); 63 thread1.start(); 64 Thread.sleep(1000); 65 root.setTask(task); 66 task.run(); 67 barrier2.await(); 68 } 69 70 barrier.await(); 71 } 72 73 private void testWithExecutorService(int index) throws Exception { 74 long startTime = System.currentTimeMillis(); 75 if (index == 0) { 76 List list = new ArrayList (); 77 for (int i = 0; i < NUM_OF_ITEMS; i++) { 78 Callable callable = new MyCallable(i); 79 list.add(callable); 80 } 81 root.setList(list); 82 } 83 84 barrier.await(); 85 86 if (index == 1) { 87 ExecutorService service = new ThreadPoolExecutor (10, 10, 10L, TimeUnit.SECONDS, new LinkedBlockingQueue ()); 88 List futures = service.invokeAll(root.getList()); 89 root.setTasksList(futures); 90 } else { 91 List tasksList = root.getTasksList(); 92 while (tasksList == null) { 93 tasksList = root.getTasksList(); 94 } 95 for (int i = 0; i < NUM_OF_ITEMS; i++) { 96 System.err.println("Getting Task " + i); 97 Assert.assertEquals(root, ((FutureTask ) tasksList.get(i)).get()); 98 } 99 } 100 101 long endTime = System.currentTimeMillis(); 102 System.err.println("Elapsed time in ExecutorService: " + (endTime - startTime)); 103 104 barrier.await(); 105 } 106 107 private void testWithLinkedBlockingQueue(int index) throws Exception { 108 long startTime = System.currentTimeMillis(); 109 if (index == 0) { 110 for (int i = 0; i < NUM_OF_ITEMS; i++) { 111 System.err.println("Putting task " + i); 112 FutureTask task = new MyFutureTask(new MyCallable(i)); 113 workerQueue.put(task); 114 } 115 workerQueue.put("STOP"); 116 workerQueue.put("STOP"); 117 } else { 118 while (true) { 119 Object o = workerQueue.take(); 120 if ("STOP".equals(o)) { 121 break; 122 } else { 123 FutureTask task = (FutureTask ) o; 124 task.run(); 125 resultQueue.put(task); 126 } 127 } 128 } 129 if (index == 0) { 130 for (int i = 0; i < NUM_OF_ITEMS; i++) { 131 FutureTask task = (FutureTask ) resultQueue.take(); 132 133 Assert.assertEquals(root, task.get()); 134 } 135 } 136 137 long endTime = System.currentTimeMillis(); 138 System.err.println("Elapsed time in LinkedBlockingQueue: " + (endTime - startTime)); 139 140 barrier.await(); 141 } 142 143 private void testWithMyFutureTask(int index) throws Exception { 144 FutureTask task = new MyFutureTask(new MyCallable()); 145 basicRunTask(index, task); 146 147 task = new MyFutureTask(new MyLongCallable()); 148 basicCancelTask(index, task); 149 150 task = new MyFutureTask(new MySemiLongCallable()); 151 basicCancelTaskWithCompletion(index, task); 152 153 task = new MyFutureTask(new MyLongCallable()); 154 timeoutGetTask(index, task); 155 156 task = new MyFutureTask(new MyCallable()); 157 basicSet(index, (MyFutureTask) task); 158 159 task = new MyFutureTask(new MyCallable()); 160 basicSetException(index, (MyFutureTask) task); 161 162 task = new MyFutureTask(new MyCallable()); 163 basicRunAndResetException(index, (MyFutureTask) task); 164 } 165 166 private void testWithRunnable(int index) throws Exception { 167 FutureTask task = new FutureTask (new MyRunnable(), root); 168 basicRunTask(index, task); 169 170 task = new FutureTask (new MyLongRunnable(), root); 171 basicCancelTask(index, task); 172 173 task = new FutureTask (new MySemiLongRunnable(), root); 174 basicCancelTaskWithCompletion(index, task); 175 176 task = new FutureTask (new MyLongRunnable(), root); 177 basicCancelTask(index, task); 178 179 task = new FutureTask (new MyLongRunnable(), root); 180 timeoutGetTask(index, task); 181 } 182 183 private void testWithCallable(int index) throws Exception { 184 FutureTask task = new FutureTask (new MyCallable()); 185 basicRunTask(index, task); 186 187 task = new FutureTask (new MyLongCallable()); 188 basicCancelTask(index, task); 189 190 task = new FutureTask (new MySemiLongCallable()); 191 basicCancelTaskWithCompletion(index, task); 192 193 task = new FutureTask (new MyLongCallable()); 194 timeoutGetTask(index, task); 195 196 task = new FutureTask (new MyThrowable()); 197 basicRunTaskWithException(index, task); 198 } 199 200 private void basicSet(int index, MyFutureTask task) throws Exception { 201 if (index == 0) { 202 root.setTask(task); 203 } 204 205 barrier.await(); 206 207 if (index == 1) { 208 ((MyFutureTask) root.getTask()).set(root); 209 } 210 211 Object o = root.getTask().get(); 212 while (o == null) { 213 o = root.getTask().get(); 214 } 215 Assert.assertEquals(root, o); 216 217 Assert.assertTrue(root.getTask().isDone()); 218 219 barrier.await(); 220 } 221 222 private void basicRunAndResetException(int index, MyFutureTask task) throws Exception { 223 if (index == 0) { 224 root.setTask(task); 225 } 226 227 barrier.await(); 228 229 if (index == 1) { 230 boolean flag = ((MyFutureTask) root.getTask()).runAndReset(); 231 Assert.assertTrue(flag); 232 } 233 234 barrier.await(); 235 236 Assert.assertFalse(root.getTask().isDone()); 237 238 barrier.await(); 239 } 240 241 private void basicSetException(int index, MyFutureTask task) throws Exception { 242 final String exceptionMsg = "Test setting InterruptedException"; 243 if (index == 0) { 244 root.setTask(task); 245 } 246 247 barrier.await(); 248 249 if (index == 1) { 250 ((MyFutureTask) root.getTask()).setException(new InterruptedException (exceptionMsg)); 251 } 252 253 barrier.await(); 254 255 try { 256 root.getTask().get(); 257 throw new AssertionError ("Should have thrown an ExecutionException."); 258 } catch (ExecutionException e) { 259 Assert.assertEquals(exceptionMsg, e.getCause().getMessage()); 260 } 261 262 barrier.await(); 263 } 264 265 private void timeoutGetTask(int index, FutureTask longTask) throws Exception { 266 if (index == 0) { 267 root.setTask(longTask); 268 } 269 270 barrier.await(); 271 272 if (index == 1) { 273 root.getTask().run(); 274 } else if (index == 0) { 275 try { 276 root.getTask().get(10000, TimeUnit.MILLISECONDS); 277 throw new AssertionError ("Should have thrown a TimeoutException."); 278 } catch (TimeoutException e) { 279 root.getTask().cancel(true); 280 } 281 } 282 283 barrier.await(); 284 285 Assert.assertTrue(root.getTask().isCancelled()); 286 287 Assert.assertTrue(root.getTask().isDone()); 288 289 barrier.await(); 290 291 } 292 293 private void basicCancelTask(int index, FutureTask longTask) throws Exception { 294 if (index == 0) { 295 root.setTask(longTask); 296 } 297 298 barrier.await(); 299 300 if (index == 1) { 301 root.getTask().run(); 302 } else if (index == 0) { 303 root.getTask().cancel(true); 304 } 305 306 barrier.await(); 307 308 Assert.assertTrue(root.getTask().isCancelled()); 309 310 Assert.assertTrue(root.getTask().isDone()); 311 312 try { 313 root.getTask().get(); 314 throw new AssertionError ("Could have thrown a CancellationException."); 315 } catch (CancellationException e) { 316 } 318 319 barrier.await(); 320 321 } 322 323 private void basicCancelTaskWithCompletion(int index, FutureTask longTask) throws Exception { 324 if (index == 0) { 325 root.setTask(longTask); 326 } 327 328 barrier.await(); 329 330 if (index == 1) { 331 root.getTask().run(); 332 } else if (index == 0) { 333 root.getTask().cancel(false); 334 } 335 336 barrier.await(); 337 338 Assert.assertTrue(root.getTask().isCancelled()); 339 340 Assert.assertTrue(root.getTask().isDone()); 341 342 try { 343 root.getTask().get(); 344 throw new AssertionError ("Could have thrown a CancellationException."); 345 } catch (CancellationException e) { 346 } 348 349 barrier.await(); 350 351 } 352 353 private void basicRunTaskWithException(int index, FutureTask task) throws Exception { 354 if (index == 0) { 355 root.setTask(task); 356 } 357 358 barrier.await(); 359 360 if (index == 1) { 361 root.getTask().run(); 362 } 363 364 try { 365 root.getTask().get(); 366 throw new AssertionError ("Should have thrown an ExecutionException"); 367 } catch (ExecutionException e) { 368 Assert.assertTrue(e.getCause() instanceof InterruptedException ); 369 Assert.assertEquals(MyThrowable.EXCEPTION_MSG, e.getCause().getMessage()); 370 } 371 372 barrier.await(); 373 } 374 375 private void basicRunTask(int index, FutureTask task) throws Exception { 376 if (index == 0) { 377 root.setTask(task); 378 } 379 380 barrier.await(); 381 382 if (index == 1) { 383 384 root.getTask().run(); 385 } 386 387 Assert.assertEquals(root, root.getTask().get()); 388 389 Assert.assertTrue(root.getTask().isDone()); 390 391 barrier.await(); 392 } 393 394 public static void visitL1DSOConfig(ConfigVisitor visitor, DSOClientConfigHelper config) { 395 String testClass = FutureTaskTestApp.class.getName(); 396 TransparencyClassSpec spec = config.getOrCreateSpec(testClass); 397 398 config.addIncludePattern(testClass + "$*"); 399 400 String methodExpression = "* " + testClass + "*.*(..)"; 401 config.addWriteAutolock(methodExpression); 402 403 spec.addRoot("barrier", "barrier"); 404 spec.addRoot("barrier2", "barrier2"); 405 spec.addRoot("root", "root"); 406 spec.addRoot("workerQueue", "workerQueue"); 407 spec.addRoot("resultQueue", "resultQueue"); 408 } 409 410 private static class DataRoot { 411 private FutureTask task; 412 private List list; 413 private List tasksList; 414 415 public DataRoot() { 416 super(); 417 } 418 419 public synchronized void setTask(FutureTask task) { 420 this.task = task; 421 } 422 423 public synchronized FutureTask getTask() { 424 return this.task; 425 } 426 427 public List getList() { 428 return list; 429 } 430 431 public synchronized void setList(List list) { 432 this.list = list; 433 } 434 435 public synchronized List getTasksList() { 436 return tasksList; 437 } 438 439 public synchronized void setTasksList(List tasksList) { 440 this.tasksList = tasksList; 441 } 442 } 443 444 private class MyLongCallable implements Callable { 445 public Object call() throws Exception { 446 while (true) { 447 if (Thread.interrupted()) { 448 throw new InterruptedException (); 449 } 450 Thread.sleep(10000); 451 } 452 } 453 } 454 455 private class MySemiLongCallable implements Callable { 456 public Object call() throws Exception { 457 Thread.sleep(30000); 458 return root; 459 } 460 } 461 462 private class MyCallable implements Callable { 463 private Integer value; 464 465 public MyCallable(int i) { 466 this.value = new Integer (i); 467 } 468 469 public MyCallable() { 470 super(); 471 } 472 473 public Object call() throws Exception { 474 if (value != null) { 475 System.err.println("Running call() in MyCallable: " + value); 476 } 477 return root; 478 } 479 } 480 481 private class MyThrowable implements Callable { 482 public static final String EXCEPTION_MSG = "Test InterruptException"; 483 484 public Object call() throws Exception { 485 throw new InterruptedException (EXCEPTION_MSG); 486 } 487 } 488 489 private class MyLongRunnable implements Runnable { 490 public void run() { 491 try { 492 while (true) { 493 if (Thread.interrupted()) { 494 throw new InterruptedException (); 495 } 496 Thread.sleep(10000); 497 } 498 } catch (Exception e) { 499 throw new TCRuntimeException(e); 500 } 501 } 502 } 503 504 private class MySemiLongRunnable implements Runnable { 505 public void run() { 506 try { 507 Thread.sleep(30000); 508 } catch (Exception e) { 509 throw new TCRuntimeException(e); 510 } 511 } 512 } 513 514 private class MyRunnable implements Runnable { 515 public void run() { 516 } 518 } 519 520 private class MyFutureTask extends FutureTask { 521 public MyFutureTask(Callable callable) { 522 super(callable); 523 } 524 525 public MyFutureTask(Runnable runnable, Object result) { 526 super(runnable, result); 527 } 528 529 public synchronized void set(Object v) { 530 super.set(v); 531 } 532 533 public synchronized void setException(Throwable t) { 534 super.setException(t); 535 } 536 537 public boolean runAndReset() { 538 return super.runAndReset(); 539 } 540 } 541 542 private class GetUnSharedRunnable implements Runnable { 543 private FutureTask task; 544 545 public GetUnSharedRunnable(FutureTask task) { 546 this.task = task; 547 } 548 549 public void run() { 550 try { 551 Assert.assertEquals(root, task.get()); 552 barrier2.await(); 553 } catch (Exception e) { 554 throw new TCRuntimeException(e); 555 } 556 } 557 } 558 559 } 560 | Popular Tags |