1 52 53 package com.go.trove.util; 54 55 import java.util.*; 56 57 68 public class ThreadPool extends ThreadGroup { 69 private static int cThreadID; 70 71 private synchronized static int nextThreadID() { 72 return cThreadID++; 73 } 74 75 77 private long mTimeout = -1; 78 private long mIdleTimeout = -1; 79 80 82 private Collection mListeners = new LinkedList(); 83 84 86 private LinkedList mPool; 88 private int mMax; 89 private int mActive; 90 private boolean mDaemon; 91 private int mPriority; 92 private boolean mClosed; 93 94 102 public ThreadPool(String name, int max) 103 throws IllegalArgumentException { 104 105 this(name, max, true); 106 } 107 108 117 public ThreadPool(ThreadGroup parent, String name, int max) 118 throws IllegalArgumentException { 119 120 this(parent, name, max, true); 121 } 122 123 132 public ThreadPool(String name, int max, boolean daemon) 133 throws IllegalArgumentException { 134 135 super(name); 136 137 init(max, daemon); 138 } 139 140 150 public ThreadPool(ThreadGroup parent, String name, int max,boolean daemon) 151 throws IllegalArgumentException { 152 153 super(parent, name); 154 155 init(max, daemon); 156 } 157 158 private void init(int max, boolean daemon) 159 throws IllegalArgumentException { 160 161 if (max <= 0) { 162 throw new IllegalArgumentException 163 ("Maximum number of threads must be greater than zero: " + 164 max); 165 } 166 167 mMax = max; 168 169 mDaemon = daemon; 170 mPriority = Thread.currentThread().getPriority(); 171 mClosed = false; 172 173 mPool = new LinkedList(); 174 } 175 176 182 public synchronized void setTimeout(long timeout) { 183 mTimeout = timeout; 184 } 185 186 190 public synchronized long getTimeout() { 191 return mTimeout; 192 } 193 194 198 public synchronized void setIdleTimeout(long timeout) { 199 mIdleTimeout = timeout; 200 } 201 202 206 public synchronized long getIdleTimeout() { 207 return mIdleTimeout; 208 } 209 210 public void addThreadPoolListener(ThreadPoolListener listener) { 211 synchronized (mListeners) { 212 mListeners.add(listener); 213 } 214 } 215 216 public void removeThreadPoolListener(ThreadPoolListener listener) { 217 synchronized (mListeners) { 218 mListeners.remove(listener); 219 } 220 } 221 222 226 public int getPriority() { 227 synchronized (mPool) { 228 return mPriority; 229 } 230 } 231 232 237 public void setPriority(int priority) throws IllegalArgumentException { 238 if (priority < Thread.MIN_PRIORITY || priority > Thread.MAX_PRIORITY) { 239 throw new IllegalArgumentException 240 ("Priority out of range: " + priority); 241 } 242 243 synchronized (mPool) { 244 mPriority = priority; 245 } 246 } 247 248 251 public int getMaximumAllowed() { 252 synchronized (mPool) { 253 return mMax; 254 } 255 } 256 257 260 public int getAvailableCount() { 261 synchronized (mPool) { 262 return mPool.size(); 263 } 264 } 265 266 270 public int getPooledCount() { 271 synchronized (mPool) { 272 return mActive; 273 } 274 } 275 276 279 public int getThreadCount() { 280 return activeCount(); 281 } 282 283 286 public Thread [] getAllThreads() { 287 int count = activeCount(); 288 Thread [] threads = new Thread [count]; 289 count = enumerate(threads); 290 if (count >= threads.length) { 291 return sort(threads); 292 } 293 else { 294 Thread [] newThreads = new Thread [count]; 295 System.arraycopy(threads, 0, newThreads, 0, count); 296 return sort(newThreads); 297 } 298 } 299 300 private Thread [] sort(Thread [] threads) { 301 Comparator c = BeanComparator.forClass(Thread .class) 302 .orderBy("threadGroup.name") 303 .orderBy("name") 304 .orderBy("priority"); 305 Arrays.sort(threads, c); 306 return threads; 307 } 308 309 321 public Thread start(Runnable target) 322 throws NoThreadException, InterruptedException 323 { 324 try { 325 return start0(target, getTimeout(), null); 326 } 327 catch (NoThreadException e) { 328 e.fillInStackTrace(); 329 throw e; 330 } 331 } 332 333 347 public Thread start(Runnable target, long timeout) 348 throws NoThreadException, InterruptedException 349 { 350 try { 351 return start0(target, timeout, null); 352 } 353 catch (NoThreadException e) { 354 e.fillInStackTrace(); 355 throw e; 356 } 357 } 358 359 360 373 public Thread start(Runnable target, String name) 374 throws NoThreadException, InterruptedException 375 { 376 try { 377 return start0(target, getTimeout(), name); 378 } 379 catch (NoThreadException e) { 380 e.fillInStackTrace(); 381 throw e; 382 } 383 } 384 385 400 public Thread start(Runnable target, long timeout, String name) 401 throws NoThreadException, InterruptedException 402 { 403 try { 404 return start0(target, timeout, name); 405 } 406 catch (NoThreadException e) { 407 e.fillInStackTrace(); 408 throw e; 409 } 410 } 411 412 private Thread start0(Runnable target, long timeout, String name) 413 throws NoThreadException, InterruptedException 414 { 415 PooledThread thread; 416 417 while (true) { 418 synchronized (mPool) { 419 closeCheck(); 420 421 if (mPool.size() > 0) { 423 thread = (PooledThread)mPool.removeLast(); 424 } 425 else { 426 if (mActive < mMax) { 429 return startThread(target, name); 430 } 431 else { 432 break; 433 } 434 } 435 } 436 437 if (name != null) { 438 thread.setName(name); 439 } 440 441 if (thread.setTarget(target)) { 442 return thread; 443 } 444 445 thread.join(); 449 } 450 451 if (timeout == 0) { 452 throw new NoThreadException("No thread available from " + this); 453 } 454 455 synchronized (mPool) { 457 closeCheck(); 458 459 if (timeout < 0) { 460 while (mPool.size() <= 0) { 461 mPool.wait(0); 462 closeCheck(); 463 } 464 } 465 else { 466 long expireTime = System.currentTimeMillis() + timeout; 467 while (mPool.size() <= 0) { 468 mPool.wait(timeout); 469 closeCheck(); 470 471 if (mPool.size() <= 0 && 474 System.currentTimeMillis() > expireTime) { 475 476 throw new NoThreadException 477 ("No thread available after waiting " + 478 timeout + " milliseconds: " + this); 479 } 480 } 481 } 482 483 thread = (PooledThread)mPool.removeLast(); 484 if (name != null) { 485 thread.setName(name); 486 } 487 488 if (thread.setTarget(target)) { 489 return thread; 490 } 491 } 492 493 thread.join(); 497 return startThread(target, name); 498 } 499 500 public boolean isClosed() { 501 return mClosed; 502 } 503 504 509 public void close() throws InterruptedException { 510 close(getTimeout()); 511 } 512 513 522 public void close(long timeout) throws InterruptedException { 523 synchronized (mPool) { 524 mClosed = true; 525 mPool.notifyAll(); 526 527 if (timeout != 0) { 528 if (timeout < 0) { 529 while (mActive > 0) { 530 mPool.wait(0); 532 } 533 } 534 else { 535 long expireTime = System.currentTimeMillis() + timeout; 536 while (mActive > 0) { 537 mPool.wait(timeout); 538 if (System.currentTimeMillis() > expireTime) { 539 break; 540 } 541 } 542 } 543 } 544 } 545 546 interrupt(); 547 } 548 549 private PooledThread startThread(Runnable target, String name) { 550 PooledThread thread; 551 552 synchronized (mPool) { 553 mActive++; 554 thread = new PooledThread(getName() + ' ' + nextThreadID()); 555 thread.setPriority(mPriority); 556 thread.setDaemon(mDaemon); 557 558 if (name != null) { 559 thread.setName(name); 560 } 561 562 thread.setTarget(target); 563 thread.start(); 564 } 565 566 ThreadPoolEvent event = new ThreadPoolEvent(this, thread); 567 synchronized (mListeners) { 568 for (Iterator it = mListeners.iterator(); it.hasNext();) { 569 ((ThreadPoolListener)it.next()).threadStarted(event); 570 } 571 } 572 573 return thread; 574 } 575 576 private void closeCheck() throws NoThreadException { 577 if (mClosed) { 578 throw new NoThreadException("Thread pool is closed", true); 579 } 580 } 581 582 void threadAvailable(PooledThread thread) { 583 synchronized (mPool) { 584 if (thread.getPriority() != mPriority) { 585 thread.setPriority(mPriority); 586 } 587 mPool.addLast(thread); 588 mPool.notify(); 589 } 590 } 591 592 void threadExiting(PooledThread thread) { 593 synchronized (mPool) { 594 if (mPool.remove(thread)) { 595 mActive--; 596 597 ThreadPoolEvent event = new ThreadPoolEvent(this, thread); 598 synchronized (mListeners) { 599 for (Iterator it = mListeners.iterator(); it.hasNext();) { 600 ((ThreadPoolListener)it.next()).threadExiting(event); 601 } 602 } 603 604 mPool.notify(); 605 } 606 } 607 } 608 609 private class PooledThread extends Thread { 610 private String mOriginalName; 611 private Runnable mTarget; 612 private boolean mExiting; 613 614 public PooledThread(String name) { 615 super(ThreadPool.this, name); 616 mOriginalName = name; 617 } 618 619 synchronized boolean setTarget(Runnable target) { 620 if (mTarget != null) { 621 throw new IllegalStateException 622 ("Target runnable in pooled thread is already set"); 623 } 624 625 if (mExiting) { 626 return false; 627 } 628 else { 629 mTarget = target; 630 notify(); 631 return true; 632 } 633 } 634 635 private synchronized Runnable waitForTarget() { 636 Runnable target; 637 638 if ((target = mTarget) == null) { 639 long idle = getIdleTimeout(); 640 641 if ((target = mTarget) == null) { 642 if (idle != 0) { 643 try { 644 if (idle < 0) { 645 wait(0); 646 } 647 else { 648 wait(idle); 649 } 650 } 651 catch (InterruptedException e) { 652 } 653 } 654 655 if ((target = mTarget) == null) { 656 mExiting = true; 657 } 658 } 659 } 660 661 return target; 662 } 663 664 public void run() { 665 try { 666 while (!isClosed()) { 667 if (Thread.interrupted()) { 668 continue; 669 } 670 671 Runnable target; 672 673 if ((target = waitForTarget()) == null) { 674 break; 675 } 676 677 try { 678 target.run(); 679 } 680 catch (ThreadDeath death) { 681 break; 682 } 683 catch (Throwable e) { 684 uncaughtException(Thread.currentThread(), e); 685 e = null; 686 } 687 688 target = null; 691 692 mTarget = null; 693 setName(mOriginalName); 694 threadAvailable(this); 695 } 696 } 697 finally { 698 threadExiting(this); 699 } 700 } 701 } 702 } 703 | Popular Tags |