KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > net > sf > mybatchfwk > PausableThreadPoolExecutor


1 /*
2  * MyBatchFramework - Open-source batch framework.
3  * Copyright (C) 2006 Jérôme Bertèche cyberteche@users.sourceforge.net
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2.1 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13  * Lesser General Public License for more details.
14  *
15  * Jérôme Bertèche
16  * Email: cyberteche@users.sourceforge.net
17  */

18 package net.sf.mybatchfwk;
19
20 import java.util.Arrays JavaDoc;
21 import java.util.ConcurrentModificationException JavaDoc;
22 import java.util.HashSet JavaDoc;
23 import java.util.Iterator JavaDoc;
24 import java.util.List JavaDoc;
25 import java.util.concurrent.AbstractExecutorService JavaDoc;
26 import java.util.concurrent.BlockingQueue JavaDoc;
27 import java.util.concurrent.Executors JavaDoc;
28 import java.util.concurrent.Future JavaDoc;
29 import java.util.concurrent.ThreadFactory JavaDoc;
30 import java.util.concurrent.ThreadPoolExecutor JavaDoc;
31 import java.util.concurrent.TimeUnit JavaDoc;
32 import java.util.concurrent.locks.Condition JavaDoc;
33 import java.util.concurrent.locks.ReentrantLock JavaDoc;
34
35 /**
36  * The thread pool executor service that executes the tasks
37  * @author Jérôme Bertèche (cyberteche@users.sourceforge.net)
38  */

39 public class PausableThreadPoolExecutor extends AbstractExecutorService JavaDoc implements ITaskExecutor {
40     
41     private boolean isPaused;
42     private ReentrantLock JavaDoc pauseLock = new ReentrantLock JavaDoc();
43     private Condition JavaDoc unpaused = pauseLock.newCondition();
44     protected BatchService batchService = null;
45
46     /**
47      * Only used to force toArray() to produce a Runnable[].
48      */

49     private static final Runnable JavaDoc[] EMPTY_RUNNABLE_ARRAY = new Runnable JavaDoc[0];
50
51     /**
52      * Permission for checking shutdown
53      */

54     private static final RuntimePermission JavaDoc shutdownPerm =
55         new RuntimePermission JavaDoc("modifyThread");
56
57     /**
58      * Queue used for holding tasks and handing off to worker threads.
59      */

60     protected final BlockingQueue JavaDoc<Runnable JavaDoc> workQueue;
61
62     /**
63      * Lock held on updates to poolSize, corePoolSize, maximumPoolSize, and
64      * workers set.
65      */

66     private final ReentrantLock JavaDoc mainLock = new ReentrantLock JavaDoc();
67
68     /**
69      * Wait condition to support awaitTermination
70      */

71     private final Condition JavaDoc termination = mainLock.newCondition();
72
73     /**
74      * Set containing all worker threads in pool.
75      */

76     private final HashSet JavaDoc<Worker> workers = new HashSet JavaDoc<Worker>();
77
78     /**
79      * Timeout in nanoseconds for idle threads waiting for work.
80      * Threads use this timeout only when there are more than
81      * corePoolSize present. Otherwise they wait forever for new work.
82      */

83     private volatile long keepAliveTime;
84
85     /**
86      * Core pool size, updated only while holding mainLock,
87      * but volatile to allow concurrent readability even
88      * during updates.
89      */

90     protected volatile int corePoolSize;
91
92     /**
93      * Maximum pool size, updated only while holding mainLock
94      * but volatile to allow concurrent readability even
95      * during updates.
96      */

97     private volatile int maximumPoolSize;
98
99     /**
100      * Current pool size, updated only while holding mainLock
101      * but volatile to allow concurrent readability even
102      * during updates.
103      */

104     protected volatile int poolSize;
105
106     /**
107      * Lifecycle state
108      */

109     protected volatile int runState;
110
111     // Special values for runState
112
/** Normal, not-shutdown mode */
113     static protected final int RUNNING = 0;
114     /** Controlled shutdown mode */
115     static protected final int SHUTDOWN = 1;
116     /** Immediate shutdown mode */
117     static protected final int STOP = 2;
118     /** Final state */
119     static protected final int TERMINATED = 3;
120
121     /**
122      * Factory for new threads.
123      */

124     private volatile ThreadFactory JavaDoc threadFactory;
125
126     /**
127      * Tracks largest attained pool size.
128      */

129     private int largestPoolSize;
130
131     /**
132      * Counter for completed tasks. Updated only on termination of
133      * worker threads.
134      */

135     private long completedTaskCount;
136     
137     /**
138      * Create and return a new thread running firstTask as its first
139      * task. Call only while holding mainLock
140      * @param firstTask the task the new thread should run first (or
141      * null if none)
142      * @return the new thread, or null if threadFactory fails to create thread
143      */

144     private Thread JavaDoc addThread(Runnable JavaDoc firstTask) {
145         Worker w = new Worker(firstTask);
146         Thread JavaDoc t = threadFactory.newThread(w);
147         if (t != null) {
148             w.thread = t;
149             workers.add(w);
150             int nt = ++poolSize;
151             if (nt > largestPoolSize)
152                 largestPoolSize = nt;
153         }
154         return t;
155     }
156
157     /**
158      * Create and start a new thread running firstTask as its first
159      * task, only if fewer than corePoolSize threads are running.
160      * @param firstTask the task the new thread should run first (or
161      * null if none)
162      * @return true if successful.
163      */

164     protected boolean addIfUnderCorePoolSize(Runnable JavaDoc firstTask) {
165         Thread JavaDoc t = null;
166         final ReentrantLock JavaDoc mainLock = this.mainLock;
167         mainLock.lock();
168         try {
169             if (poolSize < corePoolSize)
170                 t = addThread(firstTask);
171         } finally {
172             mainLock.unlock();
173         }
174         if (t == null)
175             return false;
176         t.start();
177         return true;
178     }
179
180     /**
181      * Create and start a new thread only if fewer than maximumPoolSize
182      * threads are running. The new thread runs as its first task the
183      * next task in queue, or if there is none, the given task.
184      * @param firstTask the task the new thread should run first (or
185      * null if none)
186      * @return null on failure, else the first task to be run by new thread.
187      */

188     protected Runnable JavaDoc addIfUnderMaximumPoolSize(Runnable JavaDoc firstTask) {
189         Thread JavaDoc t = null;
190         Runnable JavaDoc next = null;
191         final ReentrantLock JavaDoc mainLock = this.mainLock;
192         mainLock.lock();
193         try {
194             if (poolSize < maximumPoolSize) {
195                 next = workQueue.poll();
196                 if (next == null)
197                     next = firstTask;
198                 t = addThread(next);
199             }
200         } finally {
201             mainLock.unlock();
202         }
203         if (t == null)
204             return null;
205         t.start();
206         return next;
207     }
208
209
210     /**
211      * Get the next task for a worker thread to run.
212      * @return the task
213      * @throws InterruptedException if interrupted while waiting for task
214      */

215     Runnable JavaDoc getTask() throws InterruptedException JavaDoc {
216         for (;;) {
217             switch(runState) {
218             case RUNNING: {
219                 if (poolSize <= corePoolSize) // untimed wait if core
220
return workQueue.take();
221                 
222                 long timeout = keepAliveTime;
223                 if (timeout <= 0) // die immediately for 0 timeout
224
return null;
225                 Runnable JavaDoc r = workQueue.poll(timeout, TimeUnit.NANOSECONDS);
226                 if (r != null)
227                     return r;
228                 if (poolSize > corePoolSize) // timed out
229
return null;
230                 // else, after timeout, pool shrank so shouldn't die, so retry
231
break;
232             }
233
234             case SHUTDOWN: {
235                 // Help drain queue
236
Runnable JavaDoc r = workQueue.poll();
237                 if (r != null)
238                     return r;
239                     
240                 // Check if can terminate
241
if (workQueue.isEmpty()) {
242                     interruptIdleWorkers();
243                     return null;
244                 }
245
246                 // There could still be delayed tasks in queue.
247
// Wait for one, re-checking state upon interruption
248
try {
249                     return workQueue.take();
250                 } catch(InterruptedException JavaDoc ignore) {}
251                 break;
252             }
253
254             case STOP:
255                 return null;
256             default:
257                 assert false;
258             }
259         }
260     }
261
262     /**
263      * Wake up all threads that might be waiting for tasks.
264      */

265     void interruptIdleWorkers() {
266         final ReentrantLock JavaDoc mainLock = this.mainLock;
267         mainLock.lock();
268         try {
269             for (Worker w : workers)
270                 w.interruptIfIdle();
271         } finally {
272             mainLock.unlock();
273         }
274     }
275
276     /**
277      * Perform bookkeeping for a terminated worker thread.
278      * @param w the worker
279      */

280     void workerDone(Worker w) {
281         final ReentrantLock JavaDoc mainLock = this.mainLock;
282         mainLock.lock();
283         try {
284             completedTaskCount += w.completedTasks;
285             workers.remove(w);
286             if (--poolSize > 0)
287                 return;
288
289             // Else, this is the last thread. Deal with potential shutdown.
290

291             int state = runState;
292             assert state != TERMINATED;
293
294             if (state != STOP) {
295                 // If there are queued tasks but no threads, create
296
// replacement thread. We must create it initially
297
// idle to avoid orphaned tasks in case addThread
298
// fails. This also handles case of delayed tasks
299
// that will sometime later become runnable.
300
if (!workQueue.isEmpty()) {
301                     Thread JavaDoc t = addThread(null);
302                     if (t != null)
303                         t.start();
304                     return;
305                 }
306
307                 // Otherwise, we can exit without replacement
308
if (state == RUNNING)
309                     return;
310             }
311
312             // Either state is STOP, or state is SHUTDOWN and there is
313
// no work to do. So we can terminate.
314
termination.signalAll();
315             runState = TERMINATED;
316             // fall through to call terminate() outside of lock.
317
} finally {
318             mainLock.unlock();
319         }
320
321         assert runState == TERMINATED;
322         terminated();
323     }
324
325     /**
326      * Worker threads
327      */

328     private class Worker implements Runnable JavaDoc {
329
330         /**
331          * The runLock is acquired and released surrounding each task
332          * execution. It mainly protects against interrupts that are
333          * intended to cancel the worker thread from instead
334          * interrupting the task being run.
335          */

336         private final ReentrantLock JavaDoc runLock = new ReentrantLock JavaDoc();
337
338         /**
339          * Initial task to run before entering run loop
340          */

341         private Runnable JavaDoc firstTask;
342
343         /**
344          * Per thread completed task counter; accumulated
345          * into completedTaskCount upon termination.
346          */

347         volatile long completedTasks;
348
349         /**
350          * Thread this worker is running in. Acts as a final field,
351          * but cannot be set until thread is created.
352          */

353         Thread JavaDoc thread;
354
355         Worker(Runnable JavaDoc firstTask) {
356             this.firstTask = firstTask;
357         }
358
359         boolean isActive() {
360             return runLock.isLocked();
361         }
362
363         /**
364          * Interrupt thread if not running a task
365          */

366         void interruptIfIdle() {
367             final ReentrantLock JavaDoc runLock = this.runLock;
368             if (runLock.tryLock()) {
369                 try {
370                     thread.interrupt();
371                 } finally {
372                     runLock.unlock();
373                 }
374             }
375         }
376
377         /**
378          * Cause thread to die even if running a task.
379          */

380         void interruptNow() {
381             thread.interrupt();
382         }
383
384         /**
385          * Run a single task between before/after methods.
386          */

387         private void runTask(Runnable JavaDoc task) {
388             final ReentrantLock JavaDoc runLock = this.runLock;
389             runLock.lock();
390             try {
391                 // Abort now if immediate cancel. Otherwise, we have
392
// committed to run this task.
393
if (runState == STOP)
394                     return;
395
396                 Thread.interrupted(); // clear interrupt status on entry
397
boolean ran = false;
398                 beforeExecute(thread, task);
399                 try {
400                     task.run();
401                     ran = true;
402                     afterExecute(task, null);
403                     ++completedTasks;
404                 } catch(RuntimeException JavaDoc ex) {
405                     if (!ran)
406                         afterExecute(task, ex);
407                 }
408             } finally {
409                 runLock.unlock();
410             }
411         }
412
413         /**
414          * Main run loop
415          */

416         public void run() {
417             try {
418                 Runnable JavaDoc task = firstTask;
419                 firstTask = null;
420                 while (task != null || (task = getTask()) != null) {
421                     runTask(task);
422                     task = null; // unnecessary but can help GC
423
}
424             } catch(InterruptedException JavaDoc ie) {
425                 // fall through
426
} finally {
427                 workerDone(this);
428             }
429         }
430     }
431
432     // Public methods
433

434     /**
435      * Creates a new <tt>ThreadPoolExecutor</tt> with the given
436      * initial parameters and default thread factory and handler. It
437      * may be more convenient to use one of the {@link Executors}
438      * factory methods instead of this general purpose constructor.
439      *
440      * @param corePoolSize the number of threads to keep in the
441      * pool, even if they are idle.
442      * @param maximumPoolSize the maximum number of threads to allow in the
443      * pool.
444      * @param keepAliveTime when the number of threads is greater than
445      * the core, this is the maximum time that excess idle threads
446      * will wait for new tasks before terminating.
447      * @param unit the time unit for the keepAliveTime
448      * argument.
449      * @param workQueue the queue to use for holding tasks before they
450      * are executed. This queue will hold only the <tt>Runnable</tt>
451      * tasks submitted by the <tt>execute</tt> method.
452      * @throws IllegalArgumentException if corePoolSize, or
453      * keepAliveTime less than zero, or if maximumPoolSize less than or
454      * equal to zero, or if corePoolSize greater than maximumPoolSize.
455      * @throws NullPointerException if <tt>workQueue</tt> is null
456      */

457     public PausableThreadPoolExecutor(BatchService batchService, int corePoolSize,
458                               int maximumPoolSize,
459                               long keepAliveTime,
460                               TimeUnit JavaDoc unit,
461                               BlockingQueue JavaDoc<Runnable JavaDoc> workQueue) {
462         if (corePoolSize < 0 ||
463             maximumPoolSize <= 0 ||
464             maximumPoolSize < corePoolSize ||
465             keepAliveTime < 0)
466             throw new IllegalArgumentException JavaDoc();
467         if (workQueue == null)
468             throw new NullPointerException JavaDoc();
469         this.batchService = batchService;
470         this.corePoolSize = corePoolSize;
471         this.maximumPoolSize = maximumPoolSize;
472         this.workQueue = workQueue;
473         this.keepAliveTime = unit.toNanos(keepAliveTime);
474         this.threadFactory = Executors.defaultThreadFactory();
475     }
476
477
478     /**
479      * Executes the given task sometime in the future. The task
480      * may execute in a new thread or in an existing pooled thread.
481      *
482      * If the task cannot be submitted for execution, either because this
483      * executor has been shutdown or because its capacity has been reached,
484      * the task is handled by the current <tt>RejectedExecutionHandler</tt>.
485      *
486      * @param command the task to execute
487      * @throws RejectedExecutionException at discretion of
488      * <tt>RejectedExecutionHandler</tt>, if task cannot be accepted
489      * for execution
490      * @throws NullPointerException if command is null
491      */

492     public void execute(Runnable JavaDoc command) {
493         if (command == null)
494             throw new NullPointerException JavaDoc();
495         if (runState != RUNNING) {
496             return;
497         }
498         if (poolSize < corePoolSize && addIfUnderCorePoolSize(command))
499             return;
500         try {
501             workQueue.put(command);
502         } catch (InterruptedException JavaDoc e) {}
503         addIfUnderMaximumPoolSize(command);
504     }
505     
506     /* (non-Javadoc)
507      * @see net.sf.mybatchfwk.ITaskExecutor#execute(net.sf.mybatchfwk.ITask)
508      */

509     public boolean execute(ITask task) throws BatchException {
510         if (task == null)
511             throw new NullPointerException JavaDoc();
512         if (runState != RUNNING) {
513             return false;
514         }
515         if (!batchService.canBeExecuted(task)) {
516             return false;
517         }
518         if (poolSize < corePoolSize && addIfUnderCorePoolSize(task))
519             return true;
520         try {
521             workQueue.put(task);
522         } catch (InterruptedException JavaDoc e) {
523             return false;
524         }
525         addIfUnderMaximumPoolSize(task);
526         return true;
527     }
528
529     /**
530      * Initiates an orderly shutdown in which previously submitted
531      * tasks are executed, but no new tasks will be
532      * accepted. Invocation has no additional effect if already shut
533      * down.
534      * @throws SecurityException if a security manager exists and
535      * shutting down this ExecutorService may manipulate threads that
536      * the caller is not permitted to modify because it does not hold
537      * {@link java.lang.RuntimePermission}<tt>("modifyThread")</tt>,
538      * or the security manager's <tt>checkAccess</tt> method denies access.
539      */

540     public void shutdown() {
541         // Fail if caller doesn't have modifyThread permission. We
542
// explicitly check permissions directly because we can't trust
543
// implementations of SecurityManager to correctly override
544
// the "check access" methods such that our documented
545
// security policy is implemented.
546
SecurityManager JavaDoc security = System.getSecurityManager();
547     if (security != null)
548             java.security.AccessController.checkPermission(shutdownPerm);
549
550         boolean fullyTerminated = false;
551         final ReentrantLock JavaDoc mainLock = this.mainLock;
552         mainLock.lock();
553         try {
554             if (workers.size() > 0) {
555                 // Check if caller can modify worker threads. This
556
// might not be true even if passed above check, if
557
// the SecurityManager treats some threads specially.
558
if (security != null) {
559                     for (Worker w: workers)
560                         security.checkAccess(w.thread);
561                 }
562
563                 int state = runState;
564                 if (state == RUNNING) // don't override shutdownNow
565
runState = SHUTDOWN;
566
567                 try {
568                     for (Worker w: workers)
569                         w.interruptIfIdle();
570                 } catch(SecurityException JavaDoc se) {
571                     // If SecurityManager allows above checks, but
572
// then unexpectedly throws exception when
573
// interrupting threads (which it ought not do),
574
// back out as cleanly as we can. Some threads may
575
// have been killed but we remain in non-shutdown
576
// state.
577
runState = state;
578                     throw se;
579                 }
580             }
581             else { // If no workers, trigger full termination now
582
fullyTerminated = true;
583                 runState = TERMINATED;
584                 termination.signalAll();
585             }
586         } finally {
587             mainLock.unlock();
588         }
589         if (fullyTerminated)
590             terminated();
591     }
592
593
594     /**
595      * Attempts to stop all actively executing tasks, halts the
596      * processing of waiting tasks, and returns a list of the tasks that were
597      * awaiting execution.
598      *
599      * <p>This implementation cancels tasks via {@link
600      * Thread#interrupt}, so if any tasks mask or fail to respond to
601      * interrupts, they may never terminate.
602      *
603      * @return list of tasks that never commenced execution
604      * @throws SecurityException if a security manager exists and
605      * shutting down this ExecutorService may manipulate threads that
606      * the caller is not permitted to modify because it does not hold
607      * {@link java.lang.RuntimePermission}<tt>("modifyThread")</tt>,
608      * or the security manager's <tt>checkAccess</tt> method denies access.
609      */

610     public List JavaDoc<Runnable JavaDoc> shutdownNow() {
611         // Almost the same code as shutdown()
612
SecurityManager JavaDoc security = System.getSecurityManager();
613     if (security != null)
614             java.security.AccessController.checkPermission(shutdownPerm);
615
616         boolean fullyTerminated = false;
617         final ReentrantLock JavaDoc mainLock = this.mainLock;
618         mainLock.lock();
619         try {
620             if (workers.size() > 0) {
621                 if (security != null) {
622                     for (Worker w: workers)
623                         security.checkAccess(w.thread);
624                 }
625
626                 int state = runState;
627                 if (state != TERMINATED)
628                     runState = STOP;
629                 try {
630                     for (Worker w : workers)
631                         w.interruptNow();
632                 } catch(SecurityException JavaDoc se) {
633                     runState = state; // back out;
634
throw se;
635                 }
636             }
637             else { // If no workers, trigger full termination now
638
fullyTerminated = true;
639                 runState = TERMINATED;
640                 termination.signalAll();
641             }
642         } finally {
643             mainLock.unlock();
644         }
645         if (fullyTerminated)
646             terminated();
647         return Arrays.asList(workQueue.toArray(EMPTY_RUNNABLE_ARRAY));
648     }
649
650     public boolean isShutdown() {
651         return runState != RUNNING;
652     }
653
654     /**
655      * Returns true if this executor is in the process of terminating
656      * after <tt>shutdown</tt> or <tt>shutdownNow</tt> but has not
657      * completely terminated. This method may be useful for
658      * debugging. A return of <tt>true</tt> reported a sufficient
659      * period after shutdown may indicate that submitted tasks have
660      * ignored or suppressed interruption, causing this executor not
661      * to properly terminate.
662      * @return true if terminating but not yet terminated.
663      */

664     public boolean isTerminating() {
665         return runState == STOP;
666     }
667
668     public boolean isTerminated() {
669         return runState == TERMINATED;
670     }
671
672     public boolean awaitTermination(long timeout, TimeUnit JavaDoc unit)
673         throws InterruptedException JavaDoc {
674         long nanos = unit.toNanos(timeout);
675         final ReentrantLock JavaDoc mainLock = this.mainLock;
676         mainLock.lock();
677         try {
678             for (;;) {
679                 if (runState == TERMINATED)
680                     return true;
681                 if (nanos <= 0)
682                     return false;
683                 nanos = termination.awaitNanos(nanos);
684             }
685         } finally {
686             mainLock.unlock();
687         }
688     }
689
690     /**
691      * Invokes <tt>shutdown</tt> when this executor is no longer
692      * referenced.
693      */

694     protected void finalize() {
695         shutdown();
696     }
697
698     /**
699      * Sets the thread factory used to create new threads.
700      *
701      * @param threadFactory the new thread factory
702      * @throws NullPointerException if threadFactory is null
703      * @see #getThreadFactory
704      */

705     public void setThreadFactory(ThreadFactory JavaDoc threadFactory) {
706         if (threadFactory == null)
707             throw new NullPointerException JavaDoc();
708         this.threadFactory = threadFactory;
709     }
710
711     /**
712      * Returns the thread factory used to create new threads.
713      *
714      * @return the current thread factory
715      * @see #setThreadFactory
716      */

717     public ThreadFactory JavaDoc getThreadFactory() {
718         return threadFactory;
719     }
720
721     /**
722      * Returns the task queue used by this executor. Access to the
723      * task queue is intended primarily for debugging and monitoring.
724      * This queue may be in active use. Retrieving the task queue
725      * does not prevent queued tasks from executing.
726      *
727      * @return the task queue
728      */

729     public BlockingQueue JavaDoc<Runnable JavaDoc> getQueue() {
730         return workQueue;
731     }
732
733     /**
734      * Removes this task from the executor's internal queue if it is
735      * present, thus causing it not to be run if it has not already
736      * started.
737      *
738      * <p> This method may be useful as one part of a cancellation
739      * scheme. It may fail to remove tasks that have been converted
740      * into other forms before being placed on the internal queue. For
741      * example, a task entered using <tt>submit</tt> might be
742      * converted into a form that maintains <tt>Future</tt> status.
743      * However, in such cases, method {@link ThreadPoolExecutor#purge}
744      * may be used to remove those Futures that have been cancelled.
745      *
746      *
747      * @param task the task to remove
748      * @return true if the task was removed
749      */

750     public boolean remove(Runnable JavaDoc task) {
751         return getQueue().remove(task);
752     }
753
754
755     /**
756      * Tries to remove from the work queue all {@link Future}
757      * tasks that have been cancelled. This method can be useful as a
758      * storage reclamation operation, that has no other impact on
759      * functionality. Cancelled tasks are never executed, but may
760      * accumulate in work queues until worker threads can actively
761      * remove them. Invoking this method instead tries to remove them now.
762      * However, this method may fail to remove tasks in
763      * the presence of interference by other threads.
764      */

765     public void purge() {
766         // Fail if we encounter interference during traversal
767
try {
768             Iterator JavaDoc<Runnable JavaDoc> it = getQueue().iterator();
769             while (it.hasNext()) {
770                 Runnable JavaDoc r = it.next();
771                 if (r instanceof Future JavaDoc<?>) {
772                     Future JavaDoc<?> c = (Future JavaDoc<?>)r;
773                     if (c.isCancelled())
774                         it.remove();
775                 }
776             }
777         }
778         catch(ConcurrentModificationException JavaDoc ex) {
779             return;
780         }
781     }
782
783     /**
784      * Sets the core number of threads. This overrides any value set
785      * in the constructor. If the new value is smaller than the
786      * current value, excess existing threads will be terminated when
787      * they next become idle. If larger, new threads will, if needed,
788      * be started to execute any queued tasks.
789      *
790      * @param corePoolSize the new core size
791      * @throws IllegalArgumentException if <tt>corePoolSize</tt>
792      * less than zero
793      * @see #getCorePoolSize
794      */

795     public void setCorePoolSize(int corePoolSize) {
796         if (corePoolSize < 0)
797             throw new IllegalArgumentException JavaDoc();
798         final ReentrantLock JavaDoc mainLock = this.mainLock;
799         mainLock.lock();
800         try {
801             int extra = this.corePoolSize - corePoolSize;
802             this.corePoolSize = corePoolSize;
803             if (extra < 0) {
804                 int n = workQueue.size();
805                 // We have to create initially-idle threads here
806
// because we otherwise have no recourse about
807
// what to do with a dequeued task if addThread fails.
808
while (extra++ < 0 && n-- > 0 && poolSize < corePoolSize ) {
809                     Thread JavaDoc t = addThread(null);
810                     if (t != null)
811                         t.start();
812                     else
813                         break;
814                 }
815             }
816             else if (extra > 0 && poolSize > corePoolSize) {
817                 Iterator JavaDoc<Worker> it = workers.iterator();
818                 while (it.hasNext() &&
819                        extra-- > 0 &&
820                        poolSize > corePoolSize &&
821                        workQueue.remainingCapacity() == 0)
822                     it.next().interruptIfIdle();
823             }
824         } finally {
825             mainLock.unlock();
826         }
827     }
828
829     /**
830      * Returns the core number of threads.
831      *
832      * @return the core number of threads
833      * @see #setCorePoolSize
834      */

835     public int getCorePoolSize() {
836         return corePoolSize;
837     }
838
839     /**
840      * Starts a core thread, causing it to idly wait for work. This
841      * overrides the default policy of starting core threads only when
842      * new tasks are executed. This method will return <tt>false</tt>
843      * if all core threads have already been started.
844      * @return true if a thread was started
845      */

846     public boolean prestartCoreThread() {
847         return addIfUnderCorePoolSize(null);
848     }
849
850     /**
851      * Starts all core threads, causing them to idly wait for work. This
852      * overrides the default policy of starting core threads only when
853      * new tasks are executed.
854      * @return the number of threads started.
855      */

856     public int prestartAllCoreThreads() {
857         int n = 0;
858         while (addIfUnderCorePoolSize(null))
859             ++n;
860         return n;
861     }
862
863     /**
864      * Sets the maximum allowed number of threads. This overrides any
865      * value set in the constructor. If the new value is smaller than
866      * the current value, excess existing threads will be
867      * terminated when they next become idle.
868      *
869      * @param maximumPoolSize the new maximum
870      * @throws IllegalArgumentException if maximumPoolSize less than zero or
871      * the {@link #getCorePoolSize core pool size}
872      * @see #getMaximumPoolSize
873      */

874     public void setMaximumPoolSize(int maximumPoolSize) {
875         if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)
876             throw new IllegalArgumentException JavaDoc();
877         final ReentrantLock JavaDoc mainLock = this.mainLock;
878         mainLock.lock();
879         try {
880             int extra = this.maximumPoolSize - maximumPoolSize;
881             this.maximumPoolSize = maximumPoolSize;
882             if (extra > 0 && poolSize > maximumPoolSize) {
883                 Iterator JavaDoc<Worker> it = workers.iterator();
884                 while (it.hasNext() &&
885                        extra > 0 &&
886                        poolSize > maximumPoolSize) {
887                     it.next().interruptIfIdle();
888                     --extra;
889                 }
890             }
891         } finally {
892             mainLock.unlock();
893         }
894     }
895
896     /**
897      * Returns the maximum allowed number of threads.
898      *
899      * @return the maximum allowed number of threads
900      * @see #setMaximumPoolSize
901      */

902     public int getMaximumPoolSize() {
903         return maximumPoolSize;
904     }
905
906     /**
907      * Sets the time limit for which threads may remain idle before
908      * being terminated. If there are more than the core number of
909      * threads currently in the pool, after waiting this amount of
910      * time without processing a task, excess threads will be
911      * terminated. This overrides any value set in the constructor.
912      * @param time the time to wait. A time value of zero will cause
913      * excess threads to terminate immediately after executing tasks.
914      * @param unit the time unit of the time argument
915      * @throws IllegalArgumentException if time less than zero
916      * @see #getKeepAliveTime
917      */

918     public void setKeepAliveTime(long time, TimeUnit JavaDoc unit) {
919         if (time < 0)
920             throw new IllegalArgumentException JavaDoc();
921         this.keepAliveTime = unit.toNanos(time);
922     }
923
924     /**
925      * Returns the thread keep-alive time, which is the amount of time
926      * which threads in excess of the core pool size may remain
927      * idle before being terminated.
928      *
929      * @param unit the desired time unit of the result
930      * @return the time limit
931      * @see #setKeepAliveTime
932      */

933     public long getKeepAliveTime(TimeUnit JavaDoc unit) {
934         return unit.convert(keepAliveTime, TimeUnit.NANOSECONDS);
935     }
936
937     /* Statistics */
938
939     /**
940      * Returns the current number of threads in the pool.
941      *
942      * @return the number of threads
943      */

944     public int getPoolSize() {
945         return poolSize;
946     }
947
948     /**
949      * Returns the approximate number of threads that are actively
950      * executing tasks.
951      *
952      * @return the number of threads
953      */

954     public int getActiveCount() {
955         final ReentrantLock JavaDoc mainLock = this.mainLock;
956         mainLock.lock();
957         try {
958             int n = 0;
959             for (Worker w : workers) {
960                 if (w.isActive())
961                     ++n;
962             }
963             return n;
964         } finally {
965             mainLock.unlock();
966         }
967     }
968
969     /**
970      * Returns the largest number of threads that have ever
971      * simultaneously been in the pool.
972      *
973      * @return the number of threads
974      */

975     public int getLargestPoolSize() {
976         final ReentrantLock JavaDoc mainLock = this.mainLock;
977         mainLock.lock();
978         try {
979             return largestPoolSize;
980         } finally {
981             mainLock.unlock();
982         }
983     }
984
985     /**
986      * Returns the approximate total number of tasks that have been
987      * scheduled for execution. Because the states of tasks and
988      * threads may change dynamically during computation, the returned
989      * value is only an approximation, but one that does not ever
990      * decrease across successive calls.
991      *
992      * @return the number of tasks
993      */

994     public long getTaskCount() {
995         final ReentrantLock JavaDoc mainLock = this.mainLock;
996         mainLock.lock();
997         try {
998             long n = completedTaskCount;
999             for (Worker w : workers) {
1000                n += w.completedTasks;
1001                if (w.isActive())
1002                    ++n;
1003            }
1004            return n + workQueue.size();
1005        } finally {
1006            mainLock.unlock();
1007        }
1008    }
1009
1010    /**
1011     * Returns the approximate total number of tasks that have
1012     * completed execution. Because the states of tasks and threads
1013     * may change dynamically during computation, the returned value
1014     * is only an approximation, but one that does not ever decrease
1015     * across successive calls.
1016     *
1017     * @return the number of tasks
1018     */

1019    public long getCompletedTaskCount() {
1020        final ReentrantLock JavaDoc mainLock = this.mainLock;
1021        mainLock.lock();
1022        try {
1023            long n = completedTaskCount;
1024            for (Worker w : workers)
1025                n += w.completedTasks;
1026            return n;
1027        } finally {
1028            mainLock.unlock();
1029        }
1030    }
1031
1032    /**
1033     * Method invoked when the Executor has terminated. Default
1034     * implementation does nothing. Note: To properly nest multiple
1035     * overridings, subclasses should generally invoke
1036     * <tt>super.terminated</tt> within this method.
1037     */

1038    protected void terminated() { }
1039
1040    /**
1041     * Method invoked prior to executing the given Runnable in the
1042     * given thread. This method is invoked by thread <tt>t</tt> that
1043     * will execute task <tt>r</tt>, and may be used to re-initialize
1044     * ThreadLocals, or to perform logging. Note: To properly nest
1045     * multiple overridings, subclasses should generally invoke
1046     * <tt>super.beforeExecute</tt> at the end of this method.
1047     *
1048     * @param t the thread that will run task r.
1049     * @param r the task that will be executed.
1050     */

1051    protected void beforeExecute(Thread JavaDoc t, Runnable JavaDoc r) {
1052        pauseLock.lock();
1053        try {
1054            while (isPaused) {
1055                unpaused.await();
1056            }
1057        } catch(InterruptedException JavaDoc ie) {
1058            t.interrupt();
1059        } finally {
1060            pauseLock.unlock();
1061        }
1062    }
1063    
1064    /**
1065     * Method invoked upon completion of execution of the given
1066     * Runnable. This method is invoked by the thread that executed
1067     * the task. If non-null, the Throwable is the uncaught exception
1068     * that caused execution to terminate abruptly. Note: To properly
1069     * nest multiple overridings, subclasses should generally invoke
1070     * <tt>super.afterExecute</tt> at the beginning of this method.
1071     *
1072     * @param r the runnable that has completed.
1073     * @param t the exception that caused termination, or null if
1074     * execution completed normally.
1075     */

1076    protected void afterExecute(Runnable JavaDoc r, Throwable JavaDoc t) {
1077        batchService.afterExecute(((ITask)r), t);
1078    }
1079
1080    /**
1081     * Pause the execution of all tasks
1082     */

1083    public void pause() {
1084        pauseLock.lock();
1085        try {
1086            isPaused = true;
1087        } finally {
1088            pauseLock.unlock();
1089        }
1090    }
1091     
1092    /**
1093     * Resume the execution of all tasks
1094     */

1095    public void resume() {
1096        pauseLock.lock();
1097        try {
1098            isPaused = false;
1099            unpaused.signalAll();
1100        } finally {
1101            pauseLock.unlock();
1102        }
1103    }
1104
1105    /* (non-Javadoc)
1106     * @see net.sf.mybatchfwk.ITaskExecutor#isRunning()
1107     */

1108    public boolean isRunning() {
1109        return (runState == RUNNING);
1110    }
1111}
1112
Popular Tags