Code - Class EDU.oswego.cs.dl.util.concurrent.PooledExecutor

/*
  File: PooledExecutor.java

  Originally written by Doug Lea and released into the public domain.
  This may be used for any purposes whatsoever without acknowledgment.
  Thanks for the assistance and support of Sun Microsystems Labs,
  and everyone contributing, testing, and using this code.

  History:
  Date       Who  What
  19Jun1998  dl   Create public version
  29aug1998  dl   rely on ThreadFactoryUser,
                  remove ThreadGroup-based methods
                  adjusted locking policies
  3mar1999   dl   Worker threads sense decreases in pool size
  31mar1999  dl   Allow supplied channel in constructor;
                  add methods createThreads, drain
  15may1999  dl   Allow infinite keepalives
  21oct1999  dl   add minimumPoolSize methods
  7sep2000   dl   BlockedExecutionHandler now an interface,
                  new DiscardOldestWhenBlocked policy
  12oct2000  dl   add shutdownAfterProcessingCurrentlyQueuedTasks
  13nov2000  dl   null out task ref after run
  08apr2001  dl   declare inner class ctor protected
  12nov2001  dl   Better shutdown support
                  Blocked exec handlers can throw IE
                  Simplify locking scheme
  25jan2001  dl   {get,set}BlockedExecutionHandler now public
  17may2002  dl   null out task var in worker run to enable GC.
  30aug2003  dl   check for new tasks when timing out
  18feb2004  dl   replace dead thread if no others left
*/

package EDU.oswego.cs.dl.util.concurrent;
import java.util.*;

/**
 * A tunable, extensible thread pool class. The main supported public
 * method is <code>execute(Runnable command)</code>, which can be
 * called instead of directly creating threads to execute commands.
 *
 * <p>
 * Thread pools can be useful for several, usually intertwined
 * reasons:
 *
 * <ul>
 *
 * <li> To bound resource use. A limit can be placed on the maximum
 * number of simultaneously executing threads.
 *
 * <li> To manage concurrency levels.
 * A targeted number of threads
 * can be allowed to execute simultaneously.
 *
 * <li> To manage a set of threads performing related tasks.
 *
 * <li> To minimize overhead, by reusing previously constructed
 * Thread objects rather than creating new ones. (Note however
 * that pools are hardly ever cure-alls for performance problems
 * associated with thread construction, especially on JVMs that
 * themselves internally pool or recycle threads.)
 *
 * </ul>
 *
 * These goals introduce a number of policy parameters that are
 * encapsulated in this class. All of these parameters have defaults
 * and are tunable, either via get/set methods, or, in cases where
 * decisions should hold across lifetimes, via methods that can be
 * easily overridden in subclasses. The main, most commonly set
 * parameters can be established in constructors. Policy choices
 * across these dimensions can and do interact. Be careful, and
 * please read this documentation completely before using! See also
 * the usage examples below.
 *
 * <dl>
 * <dt> Queueing
 *
 * <dd> By default, this pool uses queueless synchronous channels
 * to hand off work to threads. This is a safe, conservative policy
 * that avoids lockups when handling sets of requests that might
 * have internal dependencies. (In these cases, queuing one task
 * could lock up another that would be able to continue if the
 * queued task were to run.) If you are sure that this cannot
 * happen, then you can instead supply a queue of some sort (for
 * example, a BoundedBuffer or LinkedQueue) in the constructor.
 * This will cause new commands to be queued in cases where all
 * MaximumPoolSize threads are busy. Queues are sometimes
 * appropriate when each task is completely independent of others,
 * so tasks cannot affect each other's execution. For example, in an
 * http server.
 * <p>
 *
 * When given a choice, this pool always prefers adding a new thread
 * rather than queueing if there are currently fewer than the
 * current getMinimumPoolSize threads running, but otherwise always
 * prefers queuing a request rather than adding a new thread. Thus,
 * if you use an unbounded buffer, you will never have more than
 * getMinimumPoolSize threads running. (Since the default
 * minimumPoolSize is one, you will probably want to explicitly
 * setMinimumPoolSize.) <p>
 *
 * While queuing can be useful in smoothing out transient bursts of
 * requests, especially in socket-based services, it is not very
 * well behaved when commands continue to arrive on average faster
 * than they can be processed. Using bounds for both the queue and
 * the pool size, along with a run-when-blocked policy, is often a
 * reasonable response to such possibilities. <p>
 *
 * Queue sizes and maximum pool sizes can often be traded off for
 * each other. Using large queues and small pools minimizes CPU
 * usage, OS resources, and context-switching overhead, but can lead
 * to artificially low throughput. Especially if tasks frequently
 * block (for example if they are I/O bound), a JVM and underlying
 * OS may be able to schedule time for more threads than you
 * otherwise allow. Use of small queues or queueless handoffs
 * generally requires larger pool sizes, which keeps CPUs busier but
 * may encounter unacceptable scheduling overhead, which also
 * decreases throughput. <p>
 *
 * <dt> Maximum Pool size
 *
 * <dd> The maximum number of threads to use, when needed. The pool
 * does not by default preallocate threads. Instead, a thread is
 * created, if necessary and if there are fewer than the maximum,
 * only when an <code>execute</code> request arrives.
 * The default value is (for all practical purposes) infinite --
 * <code>Integer.MAX_VALUE</code>, so should be set in the
 * constructor or the set method unless you are just using the pool
 * to minimize construction overhead. Because task handoffs to idle
 * worker threads require synchronization that in turn relies on JVM
 * scheduling policies to ensure progress, it is possible that a new
 * thread will be created even though an existing worker thread has
 * just become idle but has not progressed to the point at which it
 * can accept a new task. This phenomenon tends to occur on some
 * JVMs when bursts of short tasks are executed. <p>
 *
 * <dt> Minimum Pool size
 *
 * <dd> The minimum number of threads to use, when needed (default
 * 1). When a new request is received, and fewer than the minimum
 * number of threads are running, a new thread is always created to
 * handle the request even if other worker threads are idly waiting
 * for work. Otherwise, a new thread is created only if there are
 * fewer than the maximum and the request cannot immediately be
 * queued. <p>
 *
 * <dt> Preallocation
 *
 * <dd> You can override lazy thread construction policies via
 * method createThreads, which establishes a given number of warm
 * threads. Be aware that these preallocated threads will time out
 * and die (and later be replaced with others if needed) if not used
 * within the keep-alive time window. If you use preallocation, you
 * probably want to increase the keepalive time. The difference
 * between setMinimumPoolSize and createThreads is that
 * createThreads immediately establishes threads, while setting the
 * minimum pool size waits until requests arrive. <p>
 *
 * <dt> Keep-alive time
 *
 * <dd> If the pool maintained references to a fixed set of threads
 * in the pool, then it would impede garbage collection of otherwise
 * idle threads.
 * This would defeat the resource-management aspects
 * of pools. One solution would be to use weak references. However,
 * this would impose costly and difficult synchronization issues.
 * Instead, threads are simply allowed to terminate and thus be
 * GCable if they have been idle for the given keep-alive time. The
 * value of this parameter represents a trade-off between GCability
 * and construction time. In most current Java VMs, thread
 * construction and cleanup overhead is on the order of
 * milliseconds. The default keep-alive value is one minute, which
 * means that the time needed to construct and then GC a thread is
 * expended at most once per minute.
 * <p>
 *
 * To establish worker threads permanently, use a <em>negative</em>
 * argument to setKeepAliveTime. <p>
 *
 * <dt> Blocked execution policy
 *
 * <dd> If the maximum pool size or queue size is bounded, then it
 * is possible for incoming <code>execute</code> requests to
 * block. There are five supported policies for handling this
 * problem, and mechanics (based on the Strategy Object pattern) to
 * allow others in subclasses: <p>
 *
 * <dl>
 * <dt> Run (the default)
 * <dd> The thread making the <code>execute</code> request
 * runs the task itself. This policy helps guard against lockup.
 * <dt> Wait
 * <dd> Wait until a thread becomes available. This
 * policy should, in general, not be used if the minimum number
 * of threads is zero, in which case a thread may never become
 * available.
 * <dt> Abort
 * <dd> Throw a RuntimeException
 * <dt> Discard
 * <dd> Throw away the current request and return.
 * <dt> DiscardOldest
 * <dd> Throw away the oldest request and return.
 * </dl>
 *
 * Other plausible policies include raising the maximum pool size
 * after checking with some other objects that this is OK.
 * <p>
 *
 * These cases can never occur if the maximum pool size is unbounded
 * or the queue is unbounded. (In these cases you instead face
 * potential resource exhaustion.) The execute method does not
 * throw any checked exceptions in any of these cases since any
 * errors associated with them must normally be dealt with via
 * handlers or callbacks. (Although in some cases, these might be
 * associated with throwing unchecked exceptions.) You may wish to
 * add special implementations even if you choose one of the listed
 * policies. For example, the supplied Discard policy does not
 * inform the caller of the drop. You could add your own version
 * that does so. Since choice of policies is normally a system-wide
 * decision, selecting a policy affects all calls to
 * <code>execute</code>. If for some reason you would instead like
 * to make per-call decisions, you could add variant versions of the
 * <code>execute</code> method (for example,
 * <code>executeIfWouldNotBlock</code>) in subclasses. <p>
 *
 * <dt> Thread construction parameters
 *
 * <dd> A settable ThreadFactory establishes each new thread. By
 * default, it merely generates a new instance of class Thread, but
 * can be changed to use a Thread subclass, to set priorities,
 * ThreadLocals, etc. <p>
 *
 * <dt> Interruption policy
 *
 * <dd> Worker threads check for interruption after processing each
 * command, and terminate upon interruption. Fresh threads will
 * replace them if needed. Thus, new tasks will not start out in an
 * interrupted state due to an uncleared interruption in a previous
 * task. Also, unprocessed commands are never dropped upon
 * interruption. It would conceptually suffice simply to clear
 * interruption between tasks, but implementation characteristics of
 * interruption-based methods are uncertain enough to warrant this
 * conservative strategy.
 * It is a good idea to be equally
 * conservative in your code for the tasks running within pools.
 * <p>
 *
 * <dt> Shutdown policy
 *
 * <dd> The interruptAll method interrupts, but does not disable the
 * pool. Two different shutdown methods are supported for use when
 * you do want to (permanently) stop processing tasks. Method
 * shutdownAfterProcessingCurrentlyQueuedTasks waits until all
 * current tasks are finished. The shutdownNow method interrupts
 * current threads and leaves other queued requests unprocessed.
 * <p>
 *
 * <dt> Handling requests after shutdown
 *
 * <dd> When the pool is shut down, new incoming requests are handled
 * by the blockedExecutionHandler. By default, the handler is set to
 * discard new requests, but this can be set with an optional
 * argument to method
 * shutdownAfterProcessingCurrentlyQueuedTasks. <p> Also, if you are
 * using some form of queuing, you may wish to call method drain()
 * to remove (and return) unprocessed commands from the queue after
 * shutting down the pool and its clients. If you need to be sure
 * these commands are processed, you can then run() each of the
 * commands in the list returned by drain().
 *
 * </dl>
 * <p>
 *
 * <b>Usage examples.</b>
 * <p>
 *
 * Probably the most common use of pools is in statics or singletons
 * accessible from a number of classes in a package; for example:
 *
 * <pre>
 * class MyPool {
 *   // initialize to use a maximum of 8 threads.
 *   static PooledExecutor pool = new PooledExecutor(8);
 * }
 * </pre>
 * Here are some sample variants in initialization:
 * <ol>
 * <li> Using a bounded buffer of 10 tasks, at least 4 threads (started only
 * when needed due to incoming requests), but allowing
 * up to 100 threads if the buffer gets full.
 * <pre>
 *   pool = new PooledExecutor(new BoundedBuffer(10), 100);
 *   pool.setMinimumPoolSize(4);
 * </pre>
 * <li> Same as (1), except pre-start 9 threads, allowing them to
 * die if they are not used for five minutes.
 * <pre>
 *   pool = new PooledExecutor(new BoundedBuffer(10), 100);
 *   pool.setMinimumPoolSize(4);
 *   pool.setKeepAliveTime(1000 * 60 * 5);
 *   pool.createThreads(9);
 * </pre>
 * <li> Same as (2) except clients abort if both the buffer is full and
 * all 100 threads are busy:
 * <pre>
 *   pool = new PooledExecutor(new BoundedBuffer(10), 100);
 *   pool.setMinimumPoolSize(4);
 *   pool.setKeepAliveTime(1000 * 60 * 5);
 *   pool.abortWhenBlocked();
 *   pool.createThreads(9);
 * </pre>
 * <li> An unbounded queue serviced by exactly 5 threads:
 * <pre>
 *   pool = new PooledExecutor(new LinkedQueue());
 *   pool.setKeepAliveTime(-1); // live forever
 *   pool.createThreads(5);
 * </pre>
 * </ol>
 *
 * <p>
 * <b>Usage notes.</b>
 * <p>
 *
 * Pools do not mesh well with using thread-specific storage via
 * java.lang.ThreadLocal. ThreadLocal relies on the identity of a
 * thread executing a particular task. Pools use the same thread to
 * perform different tasks. <p>
 *
 * If you need a policy not handled by the parameters in this class
 * consider writing a subclass. <p>
 *
 * Version note: Previous versions of this class relied on
 * ThreadGroups for aggregate control. This has been removed, and the
 * method interruptAll added, to avoid differences in behavior across
 * JVMs.
 *
 * <p>[<a HREF="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>]
 **/

public class PooledExecutor extends ThreadFactoryUser implements Executor {

  /**
   * The maximum pool size; used if not otherwise specified.
   * Default value is essentially infinite (Integer.MAX_VALUE)
   **/
  public static final int DEFAULT_MAXIMUMPOOLSIZE = Integer.MAX_VALUE;

  /**
   * The minimum pool size; used if not otherwise specified. Default
   * value is 1.
   **/
  public static final int DEFAULT_MINIMUMPOOLSIZE = 1;

  /**
   * The maximum time to keep worker threads alive waiting for new
   * tasks; used if not otherwise specified. Default value is one
   * minute (60000 milliseconds).
   **/
  public static final long DEFAULT_KEEPALIVETIME = 60 * 1000;

  /** The maximum number of threads allowed in pool. **/
  protected int maximumPoolSize_ = DEFAULT_MAXIMUMPOOLSIZE;

  /** The minimum number of threads to maintain in pool. **/
  protected int minimumPoolSize_ = DEFAULT_MINIMUMPOOLSIZE;

  /** Current pool size. **/
  protected int poolSize_ = 0;

  /** The maximum time for an idle thread to wait for new task. **/
  protected long keepAliveTime_ = DEFAULT_KEEPALIVETIME;

  /**
   * Shutdown flag - latches true when a shutdown method is called
   * in order to disable queuing/handoffs of new tasks.
   **/
  protected boolean shutdown_ = false;

  /**
   * The channel used to hand off the command to a thread in the pool.
   **/
  protected final Channel handOff_;

  /**
   * The set of active threads, declared as a map from workers to
   * their threads. This is needed by the interruptAll method. It
   * may also be useful in subclasses that need to perform other
   * thread management chores.
   **/
  protected final Map threads_;

  /** The current handler for unserviceable requests.
  **/
  protected BlockedExecutionHandler blockedExecutionHandler_;

  /**
   * Create a new pool with all default settings
   **/

  public PooledExecutor() {
    this (new SynchronousChannel(), DEFAULT_MAXIMUMPOOLSIZE);
  }

  /**
   * Create a new pool with all default settings except
   * for maximum pool size.
   **/

  public PooledExecutor(int maxPoolSize) {
    this(new SynchronousChannel(), maxPoolSize);
  }

  /**
   * Create a new pool that uses the supplied Channel for queuing, and
   * with all default parameter settings.
   **/

  public PooledExecutor(Channel channel) {
    this(channel, DEFAULT_MAXIMUMPOOLSIZE);
  }

  /**
   * Create a new pool that uses the supplied Channel for queuing, and
   * with all default parameter settings except for maximum pool size.
   **/

  public PooledExecutor(Channel channel, int maxPoolSize) {
    maximumPoolSize_ = maxPoolSize;
    handOff_ = channel;
    runWhenBlocked();
    threads_ = new HashMap();
  }

  /**
   * Return the maximum number of threads to simultaneously execute.
   * New unqueued requests will be handled according to the current
   * blocking policy once this limit is exceeded.
   **/
  public synchronized int getMaximumPoolSize() {
    return maximumPoolSize_;
  }

  /**
   * Set the maximum number of threads to use. Decreasing the pool
   * size will not immediately kill existing threads, but they may
   * later die when idle.
   * @exception IllegalArgumentException if less or equal to zero.
   * (It is
   * not considered an error to set the maximum to be less than
   * the minimum. However, in this case there are no guarantees
   * about behavior.)
   **/
  public synchronized void setMaximumPoolSize(int newMaximum) {
    if (newMaximum <= 0) throw new IllegalArgumentException();
    maximumPoolSize_ = newMaximum;
  }

  /**
   * Return the minimum number of threads to simultaneously execute.
   * (Default value is 1). If fewer than the minimum number are
   * running upon reception of a new request, a new thread is started
   * to handle this request.
   **/
  public synchronized int getMinimumPoolSize() {
    return minimumPoolSize_;
  }

  /**
   * Set the minimum number of threads to use.
   * @exception IllegalArgumentException if less than zero. (It is not
   * considered an error to set the minimum to be greater than the
   * maximum. However, in this case there are no guarantees about
   * behavior.)
   **/
  public synchronized void setMinimumPoolSize(int newMinimum) {
    if (newMinimum < 0) throw new IllegalArgumentException();
    minimumPoolSize_ = newMinimum;
  }

  /**
   * Return the current number of active threads in the pool. This
   * number is just a snapshot, and may change immediately upon
   * returning
   **/
  public synchronized int getPoolSize() {
    return poolSize_;
  }

  /**
   * Return the number of milliseconds to keep threads alive waiting
   * for new commands. A negative value means to wait forever. A zero
   * value means not to wait at all.
   **/
  public synchronized long getKeepAliveTime() {
    return keepAliveTime_;
  }

  /**
   * Set the number of milliseconds to keep threads alive waiting for
   * new commands. A negative value means to wait forever. A zero
   * value means not to wait at all.
   **/
  public synchronized void setKeepAliveTime(long msecs) {
    keepAliveTime_ = msecs;
  }

  /** Get the handler for blocked execution **/
  public synchronized BlockedExecutionHandler getBlockedExecutionHandler() {
    return blockedExecutionHandler_;
  }

  /** Set the handler for blocked execution **/
  public synchronized void setBlockedExecutionHandler(BlockedExecutionHandler h) {
    blockedExecutionHandler_ = h;
  }

  /**
   * Create and start a thread to handle a new command. Call only
   * when holding lock.
   **/
  protected void addThread(Runnable command) {
    Worker worker = new Worker(command);
    Thread thread = getThreadFactory().newThread(worker);
    threads_.put(worker, thread);
    ++poolSize_;
    thread.start();
  }

  /**
   * Create and start up to numberOfThreads threads in the pool.
   * Return the number created. This may be less than the number
   * requested if creating more would exceed maximum pool size bound.
   **/
  public int createThreads(int numberOfThreads) {
    int ncreated = 0;
    for (int i = 0; i < numberOfThreads; ++i) {
      synchronized(this) {
        if (poolSize_ < maximumPoolSize_) {
          addThread(null);
          ++ncreated;
        }
        else
          break;
      }
    }
    return ncreated;
  }

  /**
   * Interrupt all threads in the pool, causing them all to
   * terminate. Assuming that executed tasks do not disable (clear)
   * interruptions, each thread will terminate after processing its
   * current task. Threads will terminate sooner if the executed tasks
   * themselves respond to interrupts.
   **/
  public synchronized void interruptAll() {
    for (Iterator it = threads_.values().iterator(); it.hasNext(); ) {
      Thread t = (Thread)(it.next());
      t.interrupt();
    }
  }

  /**
   * Interrupt all threads and disable construction of new
   * threads. Any tasks entered after this point will be discarded.
   * A shut down pool cannot be restarted.
   */
  public void shutdownNow() {
    shutdownNow(new DiscardWhenBlocked());
  }

  /**
   * Interrupt all threads and disable construction of new
   * threads. Any tasks entered after this point will be handled by
   * the given BlockedExecutionHandler. A shut down pool cannot be
   * restarted.
   */
  public synchronized void shutdownNow(BlockedExecutionHandler handler) {
    setBlockedExecutionHandler(handler);
    shutdown_ = true; // don't allow new tasks
    minimumPoolSize_ = maximumPoolSize_ = 0; // don't make new threads
    interruptAll(); // interrupt all existing threads
  }

  /**
   * Terminate threads after processing all elements currently in
   * queue. Any tasks entered after this point will be discarded. A
   * shut down pool cannot be restarted.
   **/
  public void shutdownAfterProcessingCurrentlyQueuedTasks() {
    shutdownAfterProcessingCurrentlyQueuedTasks(new DiscardWhenBlocked());
  }

  /**
   * Terminate threads after processing all elements currently in
   * queue. Any tasks entered after this point will be handled by the
   * given BlockedExecutionHandler. A shut down pool cannot be
   * restarted.
   **/
  public synchronized void shutdownAfterProcessingCurrentlyQueuedTasks(BlockedExecutionHandler handler) {
    setBlockedExecutionHandler(handler);
    shutdown_ = true;
    if (poolSize_ == 0) // disable new thread construction when idle
      minimumPoolSize_ = maximumPoolSize_ = 0;
  }

  /**
   * Return true if a shutDown method has succeeded in terminating all
   * threads.
   */
  public synchronized boolean isTerminatedAfterShutdown() {
    return shutdown_ && poolSize_ == 0;
  }

  /**
   * Wait for a shutdown pool to fully terminate, or until the timeout
   * has expired. This method may only be called <em>after</em>
   * invoking shutdownNow or
   * shutdownAfterProcessingCurrentlyQueuedTasks.
   *
   * @param maxWaitTime the maximum time in milliseconds to wait
   * @return true if the pool has terminated within the max wait period
   * @exception IllegalStateException if shutdown has not been requested
   * @exception InterruptedException if the current thread has been interrupted in the course of waiting
   */
  public synchronized boolean awaitTerminationAfterShutdown(long maxWaitTime) throws InterruptedException {
    if (!shutdown_)
      throw new IllegalStateException();
    if (poolSize_ == 0)
      return true;
    long waitTime = maxWaitTime;
    if (waitTime <= 0)
      return false;
    long start = System.currentTimeMillis();
    for (;;) {
      wait(waitTime);
      if (poolSize_ == 0)
        return true;
      waitTime = maxWaitTime - (System.currentTimeMillis() - start);
      if (waitTime <= 0)
        return false;
    }
  }

  /**
   * Wait for a shutdown pool to fully terminate. This method may
   * only be called <em>after</em> invoking shutdownNow or
   * shutdownAfterProcessingCurrentlyQueuedTasks.
   *
   * @exception IllegalStateException if shutdown has not been requested
   * @exception InterruptedException if the current thread has been interrupted in the course of waiting
   */
  public synchronized void awaitTerminationAfterShutdown() throws InterruptedException {
    if (!shutdown_)
      throw new IllegalStateException();
    while (poolSize_ > 0)
      wait();
  }

  /**
   * Remove all unprocessed tasks from pool queue, and return them in
   * a java.util.List. This method should be used only when there are
   * not any active clients of the pool. Otherwise you face the
   * possibility that the method will loop pulling out tasks as
   * clients are putting them in. This method can be useful after
   * shutting down a pool (via shutdownNow) to determine whether there
   * are any pending tasks that were not processed.
   * You can then, for example execute all unprocessed commands via
   * code along the lines of:
   *
   * <pre>
   *   List tasks = pool.drain();
   *   for (Iterator it = tasks.iterator(); it.hasNext();)
   *     ( (Runnable)(it.next()) ).run();
   * </pre>
   **/
  public List drain() {
    boolean wasInterrupted = false;
    Vector tasks = new Vector();
    for (;;) {
      try {
        Object x = handOff_.poll(0);
        if (x == null)
          break;
        else
          tasks.addElement(x);
      }
      catch (InterruptedException ex) {
        wasInterrupted = true; // postpone re-interrupt until drained
      }
    }
    if (wasInterrupted) Thread.currentThread().interrupt();
    return tasks;
  }

  /**
   * Cleanup method called upon termination of worker thread.
   **/
  protected synchronized void workerDone(Worker w) {
    threads_.remove(w);
    if (--poolSize_ == 0 && shutdown_) {
      maximumPoolSize_ = minimumPoolSize_ = 0; // disable new threads
      notifyAll(); // notify awaitTerminationAfterShutdown
    }

    // Create a replacement if needed
    if (poolSize_ == 0 || poolSize_ < minimumPoolSize_) {
      try {
        Runnable r = (Runnable)(handOff_.poll(0));
        if (r != null && !shutdown_) // just consume task if shut down
          addThread(r);
      } catch(InterruptedException ie) {
        return;
      }
    }
  }

  /**
   * Get a task from the handoff queue, or null if shutting down.
   **/
  protected Runnable getTask() throws InterruptedException {
    long waitTime;
    synchronized(this) {
      if (poolSize_ > maximumPoolSize_) // Cause to die if too many threads
        return null;
      waitTime = (shutdown_)? 0 : keepAliveTime_;
    }
    if (waitTime >= 0)
      return (Runnable)(handOff_.poll(waitTime));
    else
      return (Runnable)(handOff_.take());
  }


  /**
   * Class defining the basic run loop for pooled threads.
   **/
  protected class Worker implements Runnable {
    protected Runnable firstTask_;

    protected Worker(Runnable firstTask) { firstTask_ = firstTask; }

    public void run() {
      try {
        Runnable task = firstTask_;
        firstTask_ = null; // enable GC

        if (task != null) {
          task.run();
          task = null;
        }

        while ( (task = getTask()) != null) {
          task.run();
          task = null;
        }
      }
      catch (InterruptedException ex) { } // fall through
      finally {
        workerDone(this);
      }
    }
  }

  /**
   * Class for actions to take when execute() blocks. Uses Strategy
   * pattern to represent different actions. You can add more in
   * subclasses, and/or create subclasses of these. If so, you will
   * also want to add or modify the corresponding methods that set the
   * current blockedExecutionHandler_.
   **/
  public interface BlockedExecutionHandler {
    /**
     * Return true if successfully handled, so execute should
     * terminate; else return false if execute loop should be retried.
     **/
    boolean blockedAction(Runnable command) throws InterruptedException;
  }

  /** Class defining Run action. **/
  protected class RunWhenBlocked implements BlockedExecutionHandler {
    public boolean blockedAction(Runnable command) {
      command.run();
      return true;
    }
  }

  /**
   * Set the policy for blocked execution to be that the current
   * thread executes the command if there are no available threads in
   * the pool.
   **/
  public void runWhenBlocked() {
    setBlockedExecutionHandler(new RunWhenBlocked());
  }

  /** Class defining Wait action.
  **/
  protected class WaitWhenBlocked implements BlockedExecutionHandler {
    public boolean blockedAction(Runnable command) throws InterruptedException{
      synchronized(PooledExecutor.this) {
        if (shutdown_)
          return true;
      }
      handOff_.put(command);
      return true;
    }
  }

  /**
   * Set the policy for blocked execution to be to wait until a thread
   * is available, unless the pool has been shut down, in which case
   * the action is discarded.
   **/
  public void waitWhenBlocked() {
    setBlockedExecutionHandler(new WaitWhenBlocked());
  }

  /** Class defining Discard action. **/
  protected class DiscardWhenBlocked implements BlockedExecutionHandler {
    public boolean blockedAction(Runnable command) {
      return true;
    }
  }

  /**
   * Set the policy for blocked execution to be to return without
   * executing the request.
   **/
  public void discardWhenBlocked() {
    setBlockedExecutionHandler(new DiscardWhenBlocked());
  }


  /** Class defining Abort action. **/
  protected class AbortWhenBlocked implements BlockedExecutionHandler {
    public boolean blockedAction(Runnable command) {
      throw new RuntimeException("Pool is blocked");
    }
  }

  /**
   * Set the policy for blocked execution to be to
   * throw a RuntimeException.
   **/
  public void abortWhenBlocked() {
    setBlockedExecutionHandler(new AbortWhenBlocked());
  }


  /**
   * Class defining DiscardOldest action. Under this policy, at most
   * one old unhandled task is discarded. If the new task can then be
   * handed off, it is. Otherwise, the new task is run in the current
   * thread (i.e., RunWhenBlocked is used as a backup policy.)
   **/
  protected class DiscardOldestWhenBlocked implements BlockedExecutionHandler {
    public boolean blockedAction(Runnable command) throws InterruptedException{
      handOff_.poll(0);
      if (!handOff_.offer(command, 0))
        command.run();
      return true;
    }
  }

  /**
   * Set the policy for blocked execution to be to discard the oldest
   * unhandled request
   **/
  public void discardOldestWhenBlocked() {
    setBlockedExecutionHandler(new DiscardOldestWhenBlocked());
  }

  /**
   * Arrange for the given command to be executed by a thread in this
   * pool. The method normally returns when the command has been
   * handed off for (possibly later) execution.
   **/
  public void execute(Runnable command) throws InterruptedException {
    for (;;) {
      synchronized(this) {
        if (!shutdown_) {
          int size = poolSize_;

          // Ensure minimum number of threads
          if (size < minimumPoolSize_) {
            addThread(command);
            return;
          }

          // Try to give to existing thread
          if (handOff_.offer(command, 0)) {
            return;
          }

          // If cannot handoff and still under maximum, create new thread
          if (size < maximumPoolSize_) {
            addThread(command);
            return;
          }
        }
      }

      // Cannot hand off and cannot create -- ask for help
      if (getBlockedExecutionHandler().blockedAction(command)) {
        return;
      }
    }
  }
}
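
The first initialization variant in the class documentation (bounded buffer of 10 tasks, at least 4 threads, up to 100, run-when-blocked) can be sketched without the EDU.oswego package on the classpath. The sketch below is only an approximation: it swaps in the standard library's java.util.concurrent.ThreadPoolExecutor rather than PooledExecutor itself, and the class name PoolSketch, the method runTasks, and the mapping noted in the comments are assumptions made for illustration, not part of the listing above.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration class; not part of PooledExecutor.java.
class PoolSketch {

    /** Submits n trivial tasks to a bounded pool and returns how many ran. */
    static int runTasks(int n) throws InterruptedException {
        final AtomicInteger done = new AtomicInteger();

        // Assumed rough equivalents of the PooledExecutor settings:
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            4,                                          // ~ setMinimumPoolSize(4)
            100,                                        // ~ maximum pool size of 100
            5, TimeUnit.MINUTES,                        // ~ setKeepAliveTime(1000 * 60 * 5)
            new ArrayBlockingQueue<Runnable>(10),       // ~ new BoundedBuffer(10)
            new ThreadPoolExecutor.CallerRunsPolicy()); // ~ runWhenBlocked()

        for (int i = 0; i < n; i++) {
            // Each task either starts a thread, is queued, or (when both the
            // queue and the pool are full) runs in the submitting thread.
            pool.execute(() -> done.incrementAndGet());
        }

        // Stop accepting new tasks but let queued ones finish, in the spirit
        // of shutdownAfterProcessingCurrentlyQueuedTasks().
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return done.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("tasks completed: " + runTasks(50));
    }
}
```

The caller-runs handler is set explicitly because ThreadPoolExecutor rejects by default, whereas the constructor in the listing above installs the run-when-blocked handler.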