1 17 18 package org.sape.carbon.services.threadpool.test; 19 20 import org.sape.carbon.core.component.Lookup; 21 import org.sape.carbon.core.component.lifecycle.LifecycleInterceptor; 22 import org.sape.carbon.core.config.interceptor.ConfigurationInterceptor; 23 24 import org.sape.carbon.services.threadpool.QueueFullPolicyEnum; 25 import org.sape.carbon.services.threadpool.TaskCallback; 26 import org.sape.carbon.services.threadpool.TaskInfo; 27 import org.sape.carbon.services.threadpool.TaskStatusEnum; 28 import org.sape.carbon.services.threadpool.ThreadPool; 29 import org.sape.carbon.services.threadpool.ThreadPoolConfiguration; 30 import org.sape.carbon.services.threadpool.ThreadPoolRuntimeException; 31 32 import junit.extensions.ActiveTestSuite; 33 import junit.framework.Test; 34 import junit.framework.TestCase; 35 import junit.framework.TestSuite; 36 37 44 public class ThreadPoolTest extends TestCase implements TaskCallback { 45 private boolean taskFailedCalled = false; 46 private boolean taskSucceededCalled = false; 47 48 private int callCount = 0; 49 private int waitingCount = 0; 50 private boolean waiterNotified = false; 51 private Thread executingThread; 52 53 54 public ThreadPoolTest(String name) { 55 super(name); 56 } 57 58 public void testConcurrentExecution() throws InterruptedException { 59 ThreadPool testPool = 60 (ThreadPool) Lookup.getInstance().fetchComponent( 61 ThreadPoolTest.TEST_THREAD_POOL); 62 63 Runnable testTask = new TestTask(); 65 TaskInfo[] taskInfos = new TaskInfo[10]; 66 for (int i = 0; i < taskInfos.length; i++) { 67 taskInfos[i] = testPool.execute(testTask, null); 68 } 69 70 synchronized (this) { 72 long start = System.currentTimeMillis(); 73 while (this.waitingCount != 5) { 74 assertTrue( 75 "Tasks did not start in acceptable time", 76 System.currentTimeMillis() - start < 1000); 77 wait(1); 78 } 79 } 80 81 int pendingCount = 0; 83 int executingCount = 0; 84 85 for (int i = 0; i < taskInfos.length; i++) { 86 if (taskInfos[i].getStatus() == TaskStatusEnum.PENDING) { 87 pendingCount++; 88 } else if (taskInfos[i].getStatus() == TaskStatusEnum.EXECUTING) { 89 executingCount++; 90 } 91 } 92 93 assertTrue( 94 "Incorrect number of tasks in queue, expected [5], actual [" + testPool.getQueueSize() + "]", 95 testPool.getPoolSize().equals(new Integer (5))); 96 assertTrue( 97 "Incorrect pending task count: expected [5] actual [" + pendingCount + "]", 98 pendingCount == 5); 99 assertTrue( 100 "Incorrect executing task count: expected [5] actual [" + executingCount + "]", 101 executingCount == 5); 102 103 synchronized (this) { 106 notifyAll(); 107 108 long start = System.currentTimeMillis(); 109 while (this.callCount != 5) { 110 assertTrue( 111 "Tasks did not execute in acceptable time", 112 System.currentTimeMillis() - start < 1000); 113 wait(1); 114 } 115 116 start = System.currentTimeMillis(); 117 while (this.waitingCount != 5) { 118 assertTrue( 119 "Tasks did not start in acceptable time", 120 System.currentTimeMillis() - start < 1000); 121 wait(1); 122 } 123 } 124 125 executingCount = 0; 127 int successCount = 0; 128 129 for (int i = 0; i < taskInfos.length; i++) { 130 if (taskInfos[i].getStatus() == TaskStatusEnum.SUCCESS) { 131 successCount++; 132 } else if (taskInfos[i].getStatus() == TaskStatusEnum.EXECUTING) { 133 executingCount++; 134 } 135 } 136 137 assertTrue( 138 "Incorrect successCount task count: expected [5] actual [" + successCount + "]", 139 successCount == 5); 140 assertTrue( 141 "Incorrect executing task count: expected [5] actual [" + executingCount + "]", 142 executingCount == 5); 143 144 synchronized (this) { 146 notifyAll(); 147 148 long start = System.currentTimeMillis(); 149 while (this.callCount != 10) { 150 assertTrue( 151 "Tasks did not execute in acceptable time", 152 System.currentTimeMillis() - start < 1000); 153 wait(1); 154 } 155 } 156 157 for (int i = 0; i < taskInfos.length; i++) { 159 assertTrue("Some tasks were not in success state", 160 taskInfos[i].getStatus() == TaskStatusEnum.SUCCESS); 161 } 162 } 163 164 public void testTaskFailures() throws InterruptedException { 165 ThreadPool testPool = 166 (ThreadPool) Lookup.getInstance().fetchComponent( 167 ThreadPoolTest.TEST_THREAD_POOL); 168 169 TaskInfo task = testPool.execute(new TestFailureTask(), null, this); 170 171 synchronized (this) { 173 wait(1000); 174 } 175 assertTrue( 176 "Task was not in failed state", 177 task.getStatus() == TaskStatusEnum.FAILED); 178 assertTrue( 179 "TaskInfo failure cause was not captured", 180 task.getFailureCause() != null); 181 assertTrue( 182 "Test thread pool did not capture failed task", 183 testPool.getFailedTasks().contains(task)); 184 assertTrue( 185 "Failure callback was not called", 186 getTaskFailedCalled()); 187 } 188 189 public void testWaitForTaskCompletionOnSuccess() throws InterruptedException { 190 ThreadPool testPool = 191 (ThreadPool) Lookup.getInstance().fetchComponent( 192 ThreadPoolTest.TEST_THREAD_POOL); 193 194 TaskInfo task = testPool.execute(new TestTask(), null); 196 testPool.execute(new WaitForTask(task), null); 197 198 synchronized (this) { 199 while (this.waitingCount != 1) { 200 wait(1); 201 } 202 notify(); 203 204 long start = System.currentTimeMillis(); 205 while (!this.waiterNotified) { 206 assertTrue( 207 "Thead waiting for task completion was not notified", 208 System.currentTimeMillis() - start < 1000); 209 wait(1); 210 } 211 } 212 } 213 214 public void testWaitForTaskCompletionOnFailure() throws InterruptedException { 215 ThreadPool testPool = 216 (ThreadPool) Lookup.getInstance().fetchComponent( 217 ThreadPoolTest.TEST_THREAD_POOL); 218 219 TaskInfo task = testPool.execute(new TestFailureTask(), "Test Task"); 221 testPool.execute(new WaitForTask(task), null); 222 223 synchronized (this) { 224 long start = System.currentTimeMillis(); 225 while (!this.waiterNotified) { 226 assertTrue( 227 "Thead waiting for task completion was not notified", 228 System.currentTimeMillis() - start < 1000); 229 wait(1); 230 } 231 } 232 233 } 234 235 public void testStopService() throws InterruptedException { 236 ThreadPool testPool = 237 (ThreadPool) Lookup.getInstance().fetchComponent( 238 ThreadPoolTest.TEST_THREAD_POOL); 239 240 testPool.execute(new TestFailureTask(), null); 242 Runnable testTask = new TestTask(); 243 TaskInfo[] taskInfos = new TaskInfo[10]; 244 for (int i = 0; i < taskInfos.length; i++) { 245 taskInfos[i] = testPool.execute(testTask, null); 246 } 247 248 synchronized (this) { 250 long start = System.currentTimeMillis(); 251 while (this.waitingCount != 5) { 252 assertTrue( 253 "Tasks did not start in acceptable time", 254 System.currentTimeMillis() - start < 1000); 255 wait(1); 256 } 257 } 258 259 ((LifecycleInterceptor) testPool).stopComponent(); 260 ((LifecycleInterceptor) testPool).startComponent(); 261 262 int pendingCount = 0; 263 for (int i = 0; i < taskInfos.length; i++) { 264 if (taskInfos[i].getStatus() == TaskStatusEnum.PENDING) { 265 pendingCount++; 266 } 267 } 268 269 assertTrue( 270 "Incorrect pending task count: expected [5] actual [" + pendingCount + "]", 271 pendingCount == 5); 272 273 assertTrue( 274 "Tasks still pending after service stopped", 275 testPool.getQueueSize().equals(new Integer (0))); 276 277 assertTrue( 278 "Failed tasks not cleared when service stopped", 279 testPool.getFailedTasks().isEmpty()); 280 } 281 282 public void testSuspendService() throws InterruptedException { 283 ThreadPool testPool = 284 (ThreadPool) Lookup.getInstance().fetchComponent( 285 ThreadPoolTest.TEST_THREAD_POOL); 286 287 testPool.execute(new TestFailureTask(), null); 289 Runnable testTask = new TestTask(); 290 TaskInfo[] taskInfos = new TaskInfo[10]; 291 for (int i = 0; i < taskInfos.length; i++) { 292 taskInfos[i] = testPool.execute(testTask, "Test Task"); 293 } 294 295 synchronized (this) { 297 long start = System.currentTimeMillis(); 298 while (this.waitingCount != 5) { 299 assertTrue( 300 "Tasks did not start in acceptable time", 301 System.currentTimeMillis() - start < 1000); 302 wait(1); 303 } 304 } 305 306 ((LifecycleInterceptor) testPool).suspendComponent(); 307 308 int pendingCount = 0; 309 for (int i = 0; i < taskInfos.length; i++) { 310 if (taskInfos[i].getStatus() == TaskStatusEnum.PENDING) { 311 pendingCount++; 312 } 313 } 314 315 assertTrue( 316 "Incorrect pending task count: expected [5] actual [" + pendingCount + "]", 317 pendingCount == 5); 318 319 ((LifecycleInterceptor) testPool).resumeComponent(); 320 321 synchronized (this) { 323 long start = System.currentTimeMillis(); 324 while (this.waitingCount != testPool.getPoolSize().intValue()) { 325 assertTrue( 326 "Tasks did not start in acceptable time", 327 System.currentTimeMillis() - start < 1000); 328 wait(1); 329 } 330 } 331 332 int executingCount = 0; 333 for (int i = 0; i < taskInfos.length; i++) { 334 if (taskInfos[i].getStatus() == TaskStatusEnum.EXECUTING) { 335 executingCount++; 336 } 337 } 338 339 assertTrue( 340 "Incorrect number of tasks executing, expected [" + 341 testPool.getPoolSize().intValue() + 342 "], actual [" + 343 executingCount + "]", 344 executingCount == testPool.getPoolSize().intValue()); 345 346 347 ((LifecycleInterceptor) testPool).destroyComponent(); 349 } 350 351 public void testRunWhenBlocked() throws InterruptedException { 352 ThreadPool testPool = 353 (ThreadPool) Lookup.getInstance().fetchComponent( 354 ThreadPoolTest.TEST_THREAD_POOL); 355 356 ((ThreadPoolConfiguration) testPool).setQueueFullPolicy(QueueFullPolicyEnum.RUN); 357 ((ConfigurationInterceptor) testPool).applyConfiguration(); 358 359 try { 360 fillQueue(testPool); 361 362 testPool.execute(new Runnable () { 364 public void run() { 365 ThreadPoolTest.this.executingThread = Thread.currentThread(); 366 } 367 }, null); 368 369 assertTrue( 370 "Task did not execute on current thread", 371 this.executingThread == Thread.currentThread()); 372 373 } finally { 374 synchronized (this) { 375 ((LifecycleInterceptor) testPool).destroyComponent(); 377 } 378 } 379 } 380 381 public void testWaitWhenBlocked() throws InterruptedException { 382 final ThreadPool testPool = 383 (ThreadPool) Lookup.getInstance().fetchComponent( 384 ThreadPoolTest.TEST_THREAD_POOL); 385 386 ((ThreadPoolConfiguration) testPool).setQueueFullPolicy(QueueFullPolicyEnum.WAIT); 387 ((ConfigurationInterceptor) testPool).applyConfiguration(); 388 389 Thread testThread = null; 390 try { 391 fillQueue(testPool); 392 393 testThread = new Thread (new Runnable () { 394 public void run() { 395 try { 396 TaskInfo info = testPool.execute(new Runnable () { 397 public void run() { 398 ThreadPoolTest.this.executingThread = Thread.currentThread(); 399 } 400 }, null); 401 } catch (ThreadPoolRuntimeException tpre) { 402 } 404 405 } 406 }); 407 408 testThread.start(); 409 testThread.join(1000); 410 411 assertTrue( 412 "Task was executed when it should have waited", 413 testThread.isAlive()); 414 415 } finally { 416 synchronized (this) { 417 ((LifecycleInterceptor) testPool).destroyComponent(); 419 if (testThread != null) testThread.interrupt(); 420 } 421 } 422 } 423 424 public void testAbortWhenBlocked() throws InterruptedException { 425 ThreadPool testPool = 426 (ThreadPool) Lookup.getInstance().fetchComponent( 427 ThreadPoolTest.TEST_THREAD_POOL); 428 429 ((ThreadPoolConfiguration) testPool).setQueueFullPolicy(QueueFullPolicyEnum.ABORT); 430 ((ConfigurationInterceptor) testPool).applyConfiguration(); 431 432 try { 433 fillQueue(testPool); 434 435 testPool.execute(new Runnable () { 437 public void run() { 438 } 439 }, null); 440 441 fail("Did not catch exception when queue was full"); 442 443 } catch (RuntimeException re) { 444 } finally { 446 synchronized (this) { 447 ((LifecycleInterceptor) testPool).destroyComponent(); 449 } 450 } 451 } 452 453 public void testDiscardWhenBlocked() throws InterruptedException { 454 ThreadPool testPool = 455 (ThreadPool) Lookup.getInstance().fetchComponent( 456 ThreadPoolTest.TEST_THREAD_POOL); 457 458 ((ThreadPoolConfiguration) testPool).setQueueFullPolicy(QueueFullPolicyEnum.DISCARD); 459 ((ConfigurationInterceptor) testPool).applyConfiguration(); 460 461 try { 462 fillQueue(testPool); 463 464 TaskInfo info = testPool.execute(new Runnable () { 466 public void run() { 467 } 468 }, null); 469 470 while (testPool.getQueueSize().intValue() > 0) { 472 synchronized (this) { 473 notifyAll(); 474 } 475 Thread.yield(); 476 } 477 478 assertTrue( 479 "Task was not discarded", 480 info.getStatus() == TaskStatusEnum.PENDING); 481 482 } finally { 483 synchronized (this) { 484 ((LifecycleInterceptor) testPool).destroyComponent(); 486 } 487 } 488 } 489 490 public void testDiscardOldestWhenBlocked() throws InterruptedException { 491 ThreadPool testPool = 492 (ThreadPool) Lookup.getInstance().fetchComponent( 493 ThreadPoolTest.TEST_THREAD_POOL); 494 495 ((ThreadPoolConfiguration) testPool).setQueueFullPolicy(QueueFullPolicyEnum.DISCARD_OLDEST); 496 ((ConfigurationInterceptor) testPool).applyConfiguration(); 497 498 try { 499 fillQueue(testPool); 500 501 TaskInfo info = testPool.execute(new Runnable () { 503 public void run() { 504 } 505 }, null); 506 507 while (testPool.getQueueSize().intValue() > 0) { 509 synchronized (this) { 510 notifyAll(); 511 } 512 Thread.yield(); 513 } 514 515 assertTrue( 516 "Task was discarded", 517 info.getStatus() != TaskStatusEnum.PENDING); 518 519 } finally { 520 synchronized (this) { 521 ((LifecycleInterceptor) testPool).destroyComponent(); 523 } 524 } 525 } 526 527 private void fillQueue(ThreadPool pool) throws InterruptedException { 528 int maxQueueSize = ((ThreadPoolConfiguration) pool).getTaskQueueSize(); 529 int poolSize = ((ThreadPoolConfiguration) pool).getThreadPoolSize(); 530 Runnable testTask = new TestTask(); 531 532 synchronized (this) { 533 while (this.waitingCount != poolSize) { 534 pool.execute(testTask, null); 535 wait(1); 536 } 537 } 538 539 while (pool.getQueueSize().intValue() < maxQueueSize) { 540 pool.execute(testTask, null); 541 } 542 } 543 544 public void testThreadPoolSize() { 545 ThreadPool testPool = 546 (ThreadPool) Lookup.getInstance().fetchComponent( 547 ThreadPoolTest.TEST_THREAD_POOL); 548 ((LifecycleInterceptor) testPool).destroyComponent(); 550 testPool = (ThreadPool) Lookup.getInstance().fetchComponent( 551 ThreadPoolTest.TEST_THREAD_POOL); 552 553 assertTrue( 554 "Wrong number of active threads", 555 testPool.getPoolSize().equals(new Integer (5))); 556 } 557 558 public void testCallback() throws InterruptedException { 559 ThreadPool testPool = 560 (ThreadPool) Lookup.getInstance().fetchComponent( 561 ThreadPoolTest.TEST_THREAD_POOL); 562 563 TaskInfo task = testPool.execute(new TestTask(), null, this); 564 565 synchronized (this) { 567 long start = System.currentTimeMillis(); 568 while (this.waitingCount != 1) { 569 assertTrue( 570 "Tasks did not start in acceptable time", 571 System.currentTimeMillis() - start < 1000); 572 wait(1); 573 } 574 notify(); 575 wait(1000); 576 } 577 578 assertTrue("Callback not called", getTaskSucceededCalled()); 579 } 580 581 private static final String TEST_THREAD_POOL = 582 "/threadpool/test/TestThreadPool"; 583 584 585 public static Test suite() { 586 TestSuite masterSuite = new TestSuite(); 587 Test singleThreadedTests = getSingleThreadedTests(); 589 if (singleThreadedTests != null) { 590 masterSuite.addTest(singleThreadedTests); 591 } 592 Test multiThreadedTests = getMultiThreadedTests(); 594 if (multiThreadedTests != null) { 595 masterSuite.addTest(multiThreadedTests); 596 } 597 return masterSuite; 598 } 599 600 606 private static Test getSingleThreadedTests() { 607 TestSuite suite = new TestSuite(); 608 609 suite.addTest(new ThreadPoolTest("testConcurrentExecution")); 610 suite.addTest(new ThreadPoolTest("testTaskFailures")); 611 suite.addTest(new ThreadPoolTest("testWaitForTaskCompletionOnSuccess")); 612 suite.addTest(new ThreadPoolTest("testWaitForTaskCompletionOnFailure")); 613 suite.addTest(new ThreadPoolTest("testStopService")); 614 suite.addTest(new ThreadPoolTest("testSuspendService")); 615 suite.addTest(new ThreadPoolTest("testRunWhenBlocked")); 616 suite.addTest(new ThreadPoolTest("testWaitWhenBlocked")); 617 suite.addTest(new ThreadPoolTest("testAbortWhenBlocked")); 618 suite.addTest(new ThreadPoolTest("testDiscardWhenBlocked")); 619 suite.addTest(new ThreadPoolTest("testDiscardOldestWhenBlocked")); 620 suite.addTest(new ThreadPoolTest("testThreadPoolSize")); 621 suite.addTest(new ThreadPoolTest("testCallback")); 622 623 return suite; 624 } 625 626 635 private static Test getMultiThreadedTests() { 636 TestSuite suite = new ActiveTestSuite(); 637 638 642 return suite; 643 } 644 645 650 private static void addTest(TestSuite suite, String testName, int number) { 651 for (int count = 0; count < number; count++) { 652 suite.addTest(new ThreadPoolTest(testName)); 653 } 654 } 655 656 private class TestTask implements Runnable { 657 public void run() { 658 synchronized (ThreadPoolTest.this) { 659 try { 660 ThreadPoolTest.this.waitingCount++; 661 ThreadPoolTest.this.wait(); 662 } catch (InterruptedException e) { 663 Thread.currentThread().interrupt(); 664 } finally { 665 ThreadPoolTest.this.waitingCount--; 666 ThreadPoolTest.this.callCount++; 667 } 668 } 669 } 670 } 671 672 private static class TestFailureTask implements Runnable { 673 public void run() { 674 throw new RuntimeException (); 675 } 676 } 677 678 private class WaitForTask implements Runnable { 679 private TaskInfo task; 680 681 public WaitForTask(TaskInfo task) { 682 this.task = task; 683 } 684 685 public void run() { 686 try { 687 task.waitUntilExecuted(); 688 synchronized (ThreadPoolTest.this) { 689 ThreadPoolTest.this.waiterNotified = true; 690 } 691 } catch (InterruptedException e) { 692 } 693 } 694 } 695 696 699 public synchronized void taskFailed(TaskInfo taskInfo) { 700 this.taskFailedCalled = true; 701 notifyAll(); 702 } 703 704 707 public synchronized void taskSucceeded(TaskInfo taskInfo) { 708 this.taskSucceededCalled = true; 709 notifyAll(); 710 } 711 712 public synchronized boolean getTaskFailedCalled() { 713 return this.taskFailedCalled; 714 } 715 716 public synchronized boolean getTaskSucceededCalled() { 717 return this.taskSucceededCalled; 718 } 719 720 } 721 | Popular Tags |