1 33 34 package EDU.oswego.cs.dl.util.concurrent; 35 import java.util.*; 36 37 334 335 public class PooledExecutor extends ThreadFactoryUser implements Executor { 336 337 341 public static final int DEFAULT_MAXIMUMPOOLSIZE = Integer.MAX_VALUE; 342 343 347 public static final int DEFAULT_MINIMUMPOOLSIZE = 1; 348 349 354 public static final long DEFAULT_KEEPALIVETIME = 60 * 1000; 355 356 357 protected int maximumPoolSize_ = DEFAULT_MAXIMUMPOOLSIZE; 358 359 360 protected int minimumPoolSize_ = DEFAULT_MINIMUMPOOLSIZE; 361 362 363 protected int poolSize_ = 0; 364 365 366 protected long keepAliveTime_ = DEFAULT_KEEPALIVETIME; 367 368 372 protected boolean shutdown_ = false; 373 374 377 protected final Channel handOff_; 378 379 385 protected final Map threads_; 386 387 388 protected BlockedExecutionHandler blockedExecutionHandler_; 389 390 393 394 public PooledExecutor() { 395 this (new SynchronousChannel(), DEFAULT_MAXIMUMPOOLSIZE); 396 } 397 398 402 403 public PooledExecutor(int maxPoolSize) { 404 this(new SynchronousChannel(), maxPoolSize); 405 } 406 407 411 412 public PooledExecutor(Channel channel) { 413 this(channel, DEFAULT_MAXIMUMPOOLSIZE); 414 } 415 416 420 421 public PooledExecutor(Channel channel, int maxPoolSize) { 422 maximumPoolSize_ = maxPoolSize; 423 handOff_ = channel; 424 runWhenBlocked(); 425 threads_ = new HashMap(); 426 } 427 428 433 public synchronized int getMaximumPoolSize() { 434 return maximumPoolSize_; 435 } 436 437 447 public synchronized void setMaximumPoolSize(int newMaximum) { 448 if (newMaximum <= 0) throw new IllegalArgumentException (); 449 maximumPoolSize_ = newMaximum; 450 } 451 452 458 public synchronized int getMinimumPoolSize() { 459 return minimumPoolSize_; 460 } 461 462 469 public synchronized void setMinimumPoolSize(int newMinimum) { 470 if (newMinimum < 0) throw new IllegalArgumentException (); 471 minimumPoolSize_ = newMinimum; 472 } 473 474 479 public synchronized int getPoolSize() { 480 return poolSize_; 481 } 482 483 488 public synchronized long getKeepAliveTime() { 489 return keepAliveTime_; 490 } 491 492 497 public synchronized void setKeepAliveTime(long msecs) { 498 keepAliveTime_ = msecs; 499 } 500 501 502 public synchronized BlockedExecutionHandler getBlockedExecutionHandler() { 503 return blockedExecutionHandler_; 504 } 505 506 507 public synchronized void setBlockedExecutionHandler(BlockedExecutionHandler h) { 508 blockedExecutionHandler_ = h; 509 } 510 511 515 protected void addThread(Runnable command) { 516 Worker worker = new Worker(command); 517 Thread thread = getThreadFactory().newThread(worker); 518 threads_.put(worker, thread); 519 ++poolSize_; 520 thread.start(); 521 } 522 523 528 public int createThreads(int numberOfThreads) { 529 int ncreated = 0; 530 for (int i = 0; i < numberOfThreads; ++i) { 531 synchronized(this) { 532 if (poolSize_ < maximumPoolSize_) { 533 addThread(null); 534 ++ncreated; 535 } 536 else 537 break; 538 } 539 } 540 return ncreated; 541 } 542 543 550 public synchronized void interruptAll() { 551 for (Iterator it = threads_.values().iterator(); it.hasNext(); ) { 552 Thread t = (Thread )(it.next()); 553 t.interrupt(); 554 } 555 } 556 557 562 public void shutdownNow() { 563 shutdownNow(new DiscardWhenBlocked()); 564 } 565 566 572 public synchronized void shutdownNow(BlockedExecutionHandler handler) { 573 setBlockedExecutionHandler(handler); 574 shutdown_ = true; minimumPoolSize_ = maximumPoolSize_ = 0; interruptAll(); } 578 579 584 public void shutdownAfterProcessingCurrentlyQueuedTasks() { 585 shutdownAfterProcessingCurrentlyQueuedTasks(new DiscardWhenBlocked()); 586 } 587 588 594 public synchronized void shutdownAfterProcessingCurrentlyQueuedTasks(BlockedExecutionHandler handler) { 595 setBlockedExecutionHandler(handler); 596 shutdown_ = true; 597 if (poolSize_ == 0) minimumPoolSize_ = maximumPoolSize_ = 0; 599 } 600 601 605 public synchronized boolean isTerminatedAfterShutdown() { 606 return shutdown_ && poolSize_ == 0; 607 } 608 609 620 public synchronized boolean awaitTerminationAfterShutdown(long maxWaitTime) throws InterruptedException { 621 if (!shutdown_) 622 throw new IllegalStateException (); 623 if (poolSize_ == 0) 624 return true; 625 long waitTime = maxWaitTime; 626 if (waitTime <= 0) 627 return false; 628 long start = System.currentTimeMillis(); 629 for (;;) { 630 wait(waitTime); 631 if (poolSize_ == 0) 632 return true; 633 waitTime = maxWaitTime - (System.currentTimeMillis() - start); 634 if (waitTime <= 0) 635 return false; 636 } 637 } 638 639 647 public synchronized void awaitTerminationAfterShutdown() throws InterruptedException { 648 if (!shutdown_) 649 throw new IllegalStateException (); 650 while (poolSize_ > 0) 651 wait(); 652 } 653 654 671 public List drain() { 672 boolean wasInterrupted = false; 673 Vector tasks = new Vector(); 674 for (;;) { 675 try { 676 Object x = handOff_.poll(0); 677 if (x == null) 678 break; 679 else 680 tasks.addElement(x); 681 } 682 catch (InterruptedException ex) { 683 wasInterrupted = true; } 685 } 686 if (wasInterrupted) Thread.currentThread().interrupt(); 687 return tasks; 688 } 689 690 693 protected synchronized void workerDone(Worker w) { 694 threads_.remove(w); 695 if (--poolSize_ == 0 && shutdown_) { 696 maximumPoolSize_ = minimumPoolSize_ = 0; notifyAll(); } 699 700 if (poolSize_ == 0 || poolSize_ < minimumPoolSize_) { 702 try { 703 Runnable r = (Runnable )(handOff_.poll(0)); 704 if (r != null && !shutdown_) addThread(r); 706 } catch(InterruptedException ie) { 707 return; 708 } 709 } 710 } 711 712 715 protected Runnable getTask() throws InterruptedException { 716 long waitTime; 717 synchronized(this) { 718 if (poolSize_ > maximumPoolSize_) return null; 720 waitTime = (shutdown_)? 0 : keepAliveTime_; 721 } 722 if (waitTime >= 0) 723 return (Runnable )(handOff_.poll(waitTime)); 724 else 725 return (Runnable )(handOff_.take()); 726 } 727 728 729 732 protected class Worker implements Runnable { 733 protected Runnable firstTask_; 734 735 protected Worker(Runnable firstTask) { firstTask_ = firstTask; } 736 737 public void run() { 738 try { 739 Runnable task = firstTask_; 740 firstTask_ = null; 742 if (task != null) { 743 task.run(); 744 task = null; 745 } 746 747 while ( (task = getTask()) != null) { 748 task.run(); 749 task = null; 750 } 751 } 752 catch (InterruptedException ex) { } finally { 754 workerDone(this); 755 } 756 } 757 } 758 759 766 public interface BlockedExecutionHandler { 767 771 boolean blockedAction(Runnable command) throws InterruptedException ; 772 } 773 774 775 protected class RunWhenBlocked implements BlockedExecutionHandler { 776 public boolean blockedAction(Runnable command) { 777 command.run(); 778 return true; 779 } 780 } 781 782 787 public void runWhenBlocked() { 788 setBlockedExecutionHandler(new RunWhenBlocked()); 789 } 790 791 792 protected class WaitWhenBlocked implements BlockedExecutionHandler { 793 public boolean blockedAction(Runnable command) throws InterruptedException { 794 synchronized(PooledExecutor.this) { 795 if (shutdown_) 796 return true; 797 } 798 handOff_.put(command); 799 return true; 800 } 801 } 802 803 808 public void waitWhenBlocked() { 809 setBlockedExecutionHandler(new WaitWhenBlocked()); 810 } 811 812 813 protected class DiscardWhenBlocked implements BlockedExecutionHandler { 814 public boolean blockedAction(Runnable command) { 815 return true; 816 } 817 } 818 819 823 public void discardWhenBlocked() { 824 setBlockedExecutionHandler(new DiscardWhenBlocked()); 825 } 826 827 828 829 protected class AbortWhenBlocked implements BlockedExecutionHandler { 830 public boolean blockedAction(Runnable command) { 831 throw new RuntimeException ("Pool is blocked"); 832 } 833 } 834 835 839 public void abortWhenBlocked() { 840 setBlockedExecutionHandler(new AbortWhenBlocked()); 841 } 842 843 844 850 protected class DiscardOldestWhenBlocked implements BlockedExecutionHandler { 851 public boolean blockedAction(Runnable command) throws InterruptedException { 852 handOff_.poll(0); 853 if (!handOff_.offer(command, 0)) 854 command.run(); 855 return true; 856 } 857 } 858 859 863 public void discardOldestWhenBlocked() { 864 setBlockedExecutionHandler(new DiscardOldestWhenBlocked()); 865 } 866 867 872 public void execute(Runnable command) throws InterruptedException { 873 for (;;) { 874 synchronized(this) { 875 if (!shutdown_) { 876 int size = poolSize_; 877 878 if (size < minimumPoolSize_) { 880 addThread(command); 881 return; 882 } 883 884 if (handOff_.offer(command, 0)) { 886 return; 887 } 888 889 if (size < maximumPoolSize_) { 891 addThread(command); 892 return; 893 } 894 } 895 } 896 897 if (getBlockedExecutionHandler().blockedAction(command)) { 899 return; 900 } 901 } 902 } 903 } 904 | Popular Tags |