1 22 package org.jboss.test.util.test; 23 24 import java.util.Arrays ; 25 import java.util.HashMap ; 26 import java.util.HashSet ; 27 28 import org.jboss.util.threadpool.BasicThreadPool; 29 import org.jboss.util.threadpool.Task; 30 import org.jboss.logging.Logger; 31 import junit.framework.TestCase; 32 33 41 public class ThreadPoolTaskUnitTestCase extends TestCase 42 { 43 private static Logger log = Logger.getLogger(ThreadPoolTaskUnitTestCase.class); 44 45 46 static final int BASIC = 0; 47 48 49 static final int HOLD_START = 1; 50 51 52 Stats accepted = new Stats("Accepted"); 53 54 55 Stats rejected = new Stats("Rejected"); 56 57 58 Stats started = new Stats("Started"); 59 60 61 Stats completed = new Stats("Completed"); 62 63 64 Stats stopped = new Stats("Stopped"); 65 66 67 HashMap threadNames = new HashMap (); 68 69 74 public ThreadPoolTaskUnitTestCase(String name) 75 { 76 super(name); 77 } 78 79 protected void setUp() throws Exception 80 { 81 log.debug("====> Starting test: " + getName()); 82 } 83 84 protected void tearDown() throws Exception 85 { 86 log.debug("=====> Stopping test: " + getName()); 87 } 88 89 92 public void testBasic() throws Exception 93 { 94 BasicThreadPool pool = new BasicThreadPool(); 95 try 96 { 97 pool.runTask(new TestTask(BASIC, "test")); 98 completed.wait(1); 99 HashSet expected = makeExpected(new Object [] {"test"}); 100 assertEquals(expected, accepted.tasks); 101 assertEquals(expected, started.tasks); 102 assertEquals(expected, completed.tasks); 103 } 104 finally 105 { 106 pool.stop(true); 107 } 108 } 109 110 113 public void testMultipleBasic() throws Exception 114 { 115 BasicThreadPool pool = new BasicThreadPool(); 116 try 117 { 118 pool.runTask(new TestTask(BASIC, "test1")); 119 pool.runTask(new TestTask(BASIC, "test2")); 120 pool.runTask(new TestTask(BASIC, "test3")); 121 completed.wait(3); 122 HashSet expected = makeExpected(new Object [] {"test1", "test2", "test3"}); 123 assertEquals(expected, accepted.tasks); 124 assertEquals(expected, started.tasks); 125 assertEquals(expected, completed.tasks); 126 } 127 finally 128 { 129 pool.stop(true); 130 } 131 } 132 133 136 public void testSimplePooling() throws Exception 137 { 138 BasicThreadPool pool = new BasicThreadPool(); 139 pool.setMaximumPoolSize(1); 140 try 141 { 142 pool.runTask(new TestTask(BASIC, "test1")); 143 completed.wait(1); 144 pool.runTask(new TestTask(BASIC, "test2")); 145 completed.wait(2); 146 assertEquals(threadNames.get("test1"), threadNames.get("test2")); 147 } 148 finally 149 { 150 pool.stop(true); 151 } 152 } 153 154 157 public void testMultiplePooling() throws Exception 158 { 159 BasicThreadPool pool = new BasicThreadPool(); 160 try 161 { 162 pool.runTask(new TestTask(HOLD_START, "test1")); 163 started.wait(1); 164 pool.runTask(new TestTask(BASIC, "test2")); 165 completed.wait(1); 166 started.release("test1"); 167 completed.wait(2); 168 assertTrue("Shouldn't run on the same thread", threadNames.get("test1").equals(threadNames.get("test2")) == false); 169 } 170 finally 171 { 172 pool.stop(true); 173 } 174 } 175 176 179 public void testMaximumPool() throws Exception 180 { 181 BasicThreadPool pool = new BasicThreadPool(); 182 pool.setMaximumPoolSize(1); 183 try 184 { 185 pool.runTask(new TestTask(HOLD_START, "test1")); 186 started.wait(1); 187 pool.runTask(new TestTask(BASIC, "test2")); 188 Thread.sleep(1000); 189 assertEquals(0, completed.tasks.size()); 190 started.release("test1"); 191 completed.wait(2); 192 assertEquals(makeExpected(new Object [] {"test1", "test2"}), completed.tasks); 193 } 194 finally 195 { 196 pool.stop(true); 197 } 198 } 199 200 203 public void testMaximumQueue() throws Exception 204 { 205 BasicThreadPool pool = new BasicThreadPool(); 206 pool.setMaximumQueueSize(1); 207 pool.setMaximumPoolSize(1); 208 try 209 { 210 pool.runTask(new TestTask(HOLD_START, "test1")); 211 started.wait(1); 212 pool.runTask(new TestTask(BASIC, "test2")); 213 assertEquals(0, rejected.tasks.size()); 214 pool.runTask(new TestTask(BASIC, "test3")); 215 assertEquals(makeExpected(new Object [] {"test3"}), rejected.tasks); 216 217 started.release("test1"); 218 completed.wait(2); 219 assertEquals(makeExpected(new Object [] {"test1", "test2"}), completed.tasks); 220 } 221 finally 222 { 223 pool.stop(true); 224 } 225 } 226 227 230 public void testCompleteTimeout() throws Exception 231 { 232 BasicThreadPool pool = new BasicThreadPool(); 233 pool.setMaximumQueueSize(1); 234 pool.setMaximumPoolSize(1); 235 try 236 { 237 240 TestTask task = new TestTask(HOLD_START, "test1", 0, 10*1000, Task.WAIT_NONE); 241 pool.runTask(task); 242 started.wait(1); 243 started.release("test1"); 244 completed.wait(1); 245 246 249 task = new TestTask(HOLD_START, "test2", 0, 10*1000, Task.WAIT_NONE); 250 task.setRunSleepTime(12*1000); 251 pool.runTask(task); 252 started.wait(1); 253 started.release("test2"); 254 stopped.wait(1); 255 completed.wait(1); 256 257 task = new TestTask(HOLD_START, "test3", 0, 0, Task.WAIT_NONE); 259 pool.runTask(task); 260 started.wait(1); 261 started.release("test3"); 262 completed.wait(1); 263 264 267 task = new TestTask(HOLD_START, "test4", 0, 10*1000, Task.WAIT_NONE); 268 task.setRunSleepTime(12*1000); 269 pool.runTask(task); 270 started.wait(1); 271 started.release("test4"); 272 stopped.wait(1); 273 completed.wait(1); 274 } 275 finally 276 { 277 pool.stop(true); 278 } 279 } 280 281 public void testCompleteTimeoutWithSpinLoop() throws Exception 282 { 283 BasicThreadPool pool = new BasicThreadPool(); 284 pool.setMaximumQueueSize(1); 285 pool.setMaximumPoolSize(1); 286 try 287 { 288 291 TestTask task = new TestTask(HOLD_START, "test1", 0, 10*1000, Task.WAIT_NONE); 292 task.setRunSleepTime(Long.MAX_VALUE); 293 pool.runTask(task); 294 started.wait(1); 295 started.release("test1"); 296 stopped.wait(1); 297 completed.wait(1); 298 } 299 finally 300 { 301 pool.stop(true); 302 } 303 } 304 305 311 public synchronized void saveRunnableThreadName(String data, String name) 312 { 313 threadNames.put(data, name); 314 } 315 316 322 public HashSet makeExpected(Object [] expected) 323 { 324 return new HashSet (Arrays.asList(expected)); 325 } 326 327 330 public class TestTask implements Task 331 { 332 333 private int test; 334 335 private String data; 336 337 private long startTimeout; 338 339 private long completionTimeout; 340 341 private long runSleepTime; 342 343 private int waitType; 344 345 351 public TestTask(int test, String data) 352 { 353 this(test, data, 0, Task.WAIT_NONE); 354 } 355 356 364 public TestTask(int test, String data, long startTimeout, int waitType) 365 { 366 this(test, data, startTimeout, 0, waitType); 367 } 368 public TestTask(int test, String data, long startTimeout, 369 long completionTimeout, int waitType) 370 { 371 this.test = test; 372 this.data = data; 373 this.startTimeout = startTimeout; 374 this.completionTimeout = completionTimeout; 375 this.waitType = waitType; 376 } 377 378 public void execute() 379 { 380 saveThreadName(); 381 log.info("Start execute"); 382 if( runSleepTime > 0 ) 383 { 384 log.info("Begin spin loop"); 385 if( runSleepTime == Long.MAX_VALUE ) 386 { 387 while( true ) 388 ; 389 } 390 else 391 { 392 log.info("Begin sleep"); 393 try 394 { 395 Thread.sleep(runSleepTime); 396 } 397 catch(InterruptedException e) 398 { 399 } 400 } 401 } 402 log.info("End execute"); 403 } 404 405 public void saveThreadName() 406 { 407 saveRunnableThreadName(data, Thread.currentThread().getName()); 408 } 409 410 public void accepted(long time) 411 { 412 accepted.notify(data, time); 413 } 414 415 public void rejected(long time, Throwable throwable) 416 { 417 rejected.notify(data, time, throwable); 418 } 419 420 public void started(long time) 421 { 422 started.notify(data, time); 423 if (test == HOLD_START) 424 started.waitForRelease(data); 425 } 426 427 public void completed(long time, Throwable throwable) 428 { 429 completed.notify(data, time, throwable); 430 } 431 432 public long getCompletionTimeout() 433 { 434 return completionTimeout; 435 } 436 437 public int getPriority() 438 { 439 return Thread.NORM_PRIORITY; 440 } 441 442 public long getStartTimeout() 443 { 444 return startTimeout; 445 } 446 447 public int getWaitType() 448 { 449 return waitType; 450 } 451 452 public void stop() 453 { 454 stopped.notify(data); 455 } 456 457 public void setRunSleepTime(long runSleepTime) 458 { 459 this.runSleepTime = runSleepTime; 460 } 461 } 462 463 public class Stats 464 { 465 468 String name; 469 470 471 HashSet tasks = new HashSet (); 472 473 474 HashMap times = new HashMap (); 475 476 477 HashMap errors = new HashMap (); 478 479 480 HashSet releases = new HashSet (); 481 482 public Stats(String name) 483 { 484 this.name = name; 485 } 486 487 490 public void wait(int target) 491 throws InterruptedException 492 { 493 log.debug(Thread.currentThread().getName() + ": Waiting for " + name + " target=" + target); 494 synchronized (ThreadPoolTaskUnitTestCase.this) 495 { 496 while (tasks.size() < target) 497 ThreadPoolTaskUnitTestCase.this.wait(); 498 log.debug(Thread.currentThread().getName() + ": Waited for " + name + " target=" + target); 499 } 500 } 501 502 507 public void release(String data) 508 { 509 log.debug(Thread.currentThread().getName() + ": Releasing " + name + " data=" + data); 510 synchronized (ThreadPoolTaskUnitTestCase.this) 511 { 512 releases.add(data); 513 ThreadPoolTaskUnitTestCase.this.notifyAll(); 514 log.debug(Thread.currentThread().getName() + ": Released " + name + " data=" + data); 515 } 516 } 517 518 521 public void waitForRelease(String data) 522 { 523 log.debug(Thread.currentThread().getName() + ": Waiting for release " + name + " data=" + data); 524 synchronized (ThreadPoolTaskUnitTestCase.this) 525 { 526 try 527 { 528 while (releases.contains(data) == false) 529 ThreadPoolTaskUnitTestCase.this.wait(); 530 } 531 catch (InterruptedException ignored) 532 { 533 } 534 log.debug(Thread.currentThread().getName() + ": Waited for release " + name + " data=" + data); 535 } 536 } 537 538 541 public void notify(String data) 542 { 543 log.debug(Thread.currentThread().getName() + ": Notifying " + name + " data=" + data); 544 synchronized (ThreadPoolTaskUnitTestCase.this) 545 { 546 tasks.add(data); 547 ThreadPoolTaskUnitTestCase.this.notifyAll(); 548 log.debug(Thread.currentThread().getName() + ": Notified " + name + " data=" + data); 549 } 550 } 551 552 555 public void notify(String data, long time) 556 { 557 log.debug(Thread.currentThread().getName() + ": Notifying " + name + " data=" + data + " time=" + time); 558 synchronized (ThreadPoolTaskUnitTestCase.this) 559 { 560 tasks.add(data); 561 times.put(data, new Long (time)); 562 ThreadPoolTaskUnitTestCase.this.notifyAll(); 563 } 564 log.debug(Thread.currentThread().getName() + ": Notified " + name + " data=" + data + " time=" + time); 565 } 566 567 570 public void notify(String data, long time, Throwable throwable) 571 { 572 if (throwable != null) 573 log.debug(Thread.currentThread().getName() + ": Notifying " + name + " data=" + data + " time=" + time, throwable); 574 else 575 log.debug(Thread.currentThread().getName() + ": Notifying " + name + " data=" + data + " time=" + time + " throwable=null"); 576 synchronized (ThreadPoolTaskUnitTestCase.this) 577 { 578 tasks.add(data); 579 times.put(data, new Long (time)); 580 errors.put(data, throwable); 581 ThreadPoolTaskUnitTestCase.this.notifyAll(); 582 } 583 if (throwable != null) 584 log.debug(Thread.currentThread().getName() + ": Notified " + name + " data=" + data + " time=" + time + " throwable=" + throwable.getMessage()); 585 else 586 log.debug(Thread.currentThread().getName() + ": Notified " + name + " data=" + data + " time=" + time + " throwable=null"); 587 } 588 589 592 public void clear() 593 { 594 log.debug(Thread.currentThread().getName() + ": Clearing " + name); 595 synchronized (ThreadPoolTaskUnitTestCase.this) 596 { 597 tasks.clear(); 598 log.debug(Thread.currentThread().getName() + ": Cleared " + name); 599 } 600 } 601 } 602 } 603 | Popular Tags |