KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > java > util > concurrent > ScheduledThreadPoolExecutor


/*
 * @(#)ScheduledThreadPoolExecutor.java 1.3 04/04/14
 *
 * Copyright 2004 Sun Microsystems, Inc. All rights reserved.
 * SUN PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
 */

7
8 package java.util.concurrent;
9 import java.util.concurrent.atomic.*;
10 import java.util.*;
11
12 /**
13  * A {@link ThreadPoolExecutor} that can additionally schedule
14  * commands to run after a given delay, or to execute
15  * periodically. This class is preferable to {@link java.util.Timer}
16  * when multiple worker threads are needed, or when the additional
17  * flexibility or capabilities of {@link ThreadPoolExecutor} (which
18  * this class extends) are required.
19  *
20  * <p> Delayed tasks execute no sooner than they are enabled, but
21  * without any real-time guarantees about when, after they are
22  * enabled, they will commence. Tasks scheduled for exactly the same
23  * execution time are enabled in first-in-first-out (FIFO) order of
24  * submission.
25  *
26  * <p>While this class inherits from {@link ThreadPoolExecutor}, a few
27  * of the inherited tuning methods are not useful for it. In
28  * particular, because it acts as a fixed-sized pool using
29  * <tt>corePoolSize</tt> threads and an unbounded queue, adjustments
30  * to <tt>maximumPoolSize</tt> have no useful effect.
31  *
32  * @since 1.5
33  * @author Doug Lea
34  */

35 public class ScheduledThreadPoolExecutor
36         extends ThreadPoolExecutor JavaDoc
37         implements ScheduledExecutorService JavaDoc {
38
39     /**
40      * False if should cancel/suppress periodic tasks on shutdown.
41      */

42     private volatile boolean continueExistingPeriodicTasksAfterShutdown;
43
44     /**
45      * False if should cancel non-periodic tasks on shutdown.
46      */

47     private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
48
49     /**
50      * Sequence number to break scheduling ties, and in turn to
51      * guarantee FIFO order among tied entries.
52      */

53     private static final AtomicLong sequencer = new AtomicLong(0);
54
55     /** Base of nanosecond timings, to avoid wrapping */
56     private static final long NANO_ORIGIN = System.nanoTime();
57
58     /**
59      * Returns nanosecond time offset by origin
60      */

61     final long now() {
62     return System.nanoTime() - NANO_ORIGIN;
63     }
64
65     private class ScheduledFutureTask<V>
66             extends FutureTask JavaDoc<V> implements ScheduledFuture JavaDoc<V> {
67         
68         /** Sequence number to break ties FIFO */
69         private final long sequenceNumber;
70         /** The time the task is enabled to execute in nanoTime units */
71         private long time;
72         /**
73          * Period in nanoseconds for repeating tasks. A positive
74          * value indicates fixed-rate execution. A negative value
75          * indicates fixed-delay execution. A value of 0 indicates a
76          * non-repeating task.
77          */

78         private final long period;
79
80         /**
81          * Creates a one-shot action with given nanoTime-based trigger time
82          */

83         ScheduledFutureTask(Runnable JavaDoc r, V result, long ns) {
84             super(r, result);
85             this.time = ns;
86             this.period = 0;
87             this.sequenceNumber = sequencer.getAndIncrement();
88         }
89
90         /**
91          * Creates a periodic action with given nano time and period
92          */

93         ScheduledFutureTask(Runnable JavaDoc r, V result, long ns, long period) {
94             super(r, result);
95             this.time = ns;
96             this.period = period;
97             this.sequenceNumber = sequencer.getAndIncrement();
98         }
99
100         /**
101          * Creates a one-shot action with given nanoTime-based trigger
102          */

103         ScheduledFutureTask(Callable JavaDoc<V> callable, long ns) {
104             super(callable);
105             this.time = ns;
106             this.period = 0;
107             this.sequenceNumber = sequencer.getAndIncrement();
108         }
109
110         public long getDelay(TimeUnit JavaDoc unit) {
111             long d = unit.convert(time - now(), TimeUnit.NANOSECONDS);
112             return d;
113         }
114
115         public int compareTo(Delayed JavaDoc other) {
116             if (other == this) // compare zero ONLY if same object
117
return 0;
118             ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
119             long diff = time - x.time;
120             if (diff < 0)
121                 return -1;
122             else if (diff > 0)
123                 return 1;
124             else if (sequenceNumber < x.sequenceNumber)
125                 return -1;
126             else
127                 return 1;
128         }
129
130         /**
131          * Returns true if this is a periodic (not a one-shot) action.
132          * @return true if periodic
133          */

134         boolean isPeriodic() {
135             return period != 0;
136         }
137
138         /**
139          * Run a periodic task
140          */

141         private void runPeriodic() {
142             boolean ok = ScheduledFutureTask.super.runAndReset();
143             boolean down = isShutdown();
144             // Reschedule if not cancelled and not shutdown or policy allows
145
if (ok && (!down ||
146                        (getContinueExistingPeriodicTasksAfterShutdownPolicy() &&
147                         !isTerminating()))) {
148                 long p = period;
149                 if (p > 0)
150                     time += p;
151                 else
152                     time = now() - p;
153                 ScheduledThreadPoolExecutor.super.getQueue().add(this);
154             }
155             // This might have been the final executed delayed
156
// task. Wake up threads to check.
157
else if (down)
158                 interruptIdleWorkers();
159         }
160
161         /**
162          * Overrides FutureTask version so as to reset/requeue if periodic.
163          */

164         public void run() {
165             if (isPeriodic())
166                 runPeriodic();
167             else
168                 ScheduledFutureTask.super.run();
169         }
170     }
171
172     /**
173      * Specialized variant of ThreadPoolExecutor.execute for delayed tasks.
174      */

175     private void delayedExecute(Runnable JavaDoc command) {
176         if (isShutdown()) {
177             reject(command);
178             return;
179         }
180         // Prestart a thread if necessary. We cannot prestart it
181
// running the task because the task (probably) shouldn't be
182
// run yet, so thread will just idle until delay elapses.
183
if (getPoolSize() < getCorePoolSize())
184             prestartCoreThread();
185             
186         super.getQueue().add(command);
187     }
188
189     /**
190      * Cancel and clear the queue of all tasks that should not be run
191      * due to shutdown policy.
192      */

193     private void cancelUnwantedTasks() {
194         boolean keepDelayed = getExecuteExistingDelayedTasksAfterShutdownPolicy();
195         boolean keepPeriodic = getContinueExistingPeriodicTasksAfterShutdownPolicy();
196         if (!keepDelayed && !keepPeriodic)
197             super.getQueue().clear();
198         else if (keepDelayed || keepPeriodic) {
199             Object JavaDoc[] entries = super.getQueue().toArray();
200             for (int i = 0; i < entries.length; ++i) {
201                 Object JavaDoc e = entries[i];
202                 if (e instanceof ScheduledFutureTask) {
203                     ScheduledFutureTask<?> t = (ScheduledFutureTask<?>)e;
204                     if (t.isPeriodic()? !keepPeriodic : !keepDelayed)
205                         t.cancel(false);
206                 }
207             }
208             entries = null;
209             purge();
210         }
211     }
212
213     public boolean remove(Runnable JavaDoc task) {
214     if (!(task instanceof ScheduledFutureTask))
215         return false;
216     return getQueue().remove(task);
217     }
218
219     /**
220      * Creates a new ScheduledThreadPoolExecutor with the given core
221      * pool size.
222      *
223      * @param corePoolSize the number of threads to keep in the pool,
224      * even if they are idle.
225      * @throws IllegalArgumentException if corePoolSize less than or
226      * equal to zero
227      */

228     public ScheduledThreadPoolExecutor(int corePoolSize) {
229         super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
230               new DelayedWorkQueue());
231     }
232
233     /**
234      * Creates a new ScheduledThreadPoolExecutor with the given
235      * initial parameters.
236      *
237      * @param corePoolSize the number of threads to keep in the pool,
238      * even if they are idle.
239      * @param threadFactory the factory to use when the executor
240      * creates a new thread.
241      * @throws NullPointerException if threadFactory is null
242      */

243     public ScheduledThreadPoolExecutor(int corePoolSize,
244                              ThreadFactory JavaDoc threadFactory) {
245         super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
246               new DelayedWorkQueue(), threadFactory);
247     }
248
249     /**
250      * Creates a new ScheduledThreadPoolExecutor with the given
251      * initial parameters.
252      *
253      * @param corePoolSize the number of threads to keep in the pool,
254      * even if they are idle.
255      * @param handler the handler to use when execution is blocked
256      * because the thread bounds and queue capacities are reached.
257      * @throws NullPointerException if handler is null
258      */

259     public ScheduledThreadPoolExecutor(int corePoolSize,
260                               RejectedExecutionHandler JavaDoc handler) {
261         super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
262               new DelayedWorkQueue(), handler);
263     }
264
265     /**
266      * Creates a new ScheduledThreadPoolExecutor with the given
267      * initial parameters.
268      *
269      * @param corePoolSize the number of threads to keep in the pool,
270      * even if they are idle.
271      * @param threadFactory the factory to use when the executor
272      * creates a new thread.
273      * @param handler the handler to use when execution is blocked
274      * because the thread bounds and queue capacities are reached.
275      * @throws NullPointerException if threadFactory or handler is null
276      */

277     public ScheduledThreadPoolExecutor(int corePoolSize,
278                               ThreadFactory JavaDoc threadFactory,
279                               RejectedExecutionHandler JavaDoc handler) {
280         super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
281               new DelayedWorkQueue(), threadFactory, handler);
282     }
283
284     public ScheduledFuture JavaDoc<?> schedule(Runnable JavaDoc command,
285                                        long delay,
286                                        TimeUnit JavaDoc unit) {
287         if (command == null || unit == null)
288             throw new NullPointerException JavaDoc();
289         long triggerTime = now() + unit.toNanos(delay);
290         ScheduledFutureTask<?> t =
291             new ScheduledFutureTask<Boolean JavaDoc>(command, null, triggerTime);
292         delayedExecute(t);
293         return t;
294     }
295
296     public <V> ScheduledFuture JavaDoc<V> schedule(Callable JavaDoc<V> callable,
297                                            long delay,
298                                            TimeUnit JavaDoc unit) {
299         if (callable == null || unit == null)
300             throw new NullPointerException JavaDoc();
301         if (delay < 0) delay = 0;
302         long triggerTime = now() + unit.toNanos(delay);
303         ScheduledFutureTask<V> t =
304             new ScheduledFutureTask<V>(callable, triggerTime);
305         delayedExecute(t);
306         return t;
307     }
308
309     public ScheduledFuture JavaDoc<?> scheduleAtFixedRate(Runnable JavaDoc command,
310                                                   long initialDelay,
311                                                   long period,
312                                                   TimeUnit JavaDoc unit) {
313         if (command == null || unit == null)
314             throw new NullPointerException JavaDoc();
315         if (period <= 0)
316             throw new IllegalArgumentException JavaDoc();
317         if (initialDelay < 0) initialDelay = 0;
318         long triggerTime = now() + unit.toNanos(initialDelay);
319         ScheduledFutureTask<?> t =
320             new ScheduledFutureTask<Object JavaDoc>(command,
321                                             null,
322                                             triggerTime,
323                                             unit.toNanos(period));
324         delayedExecute(t);
325         return t;
326     }
327     
328     public ScheduledFuture JavaDoc<?> scheduleWithFixedDelay(Runnable JavaDoc command,
329                                                      long initialDelay,
330                                                      long delay,
331                                                      TimeUnit JavaDoc unit) {
332         if (command == null || unit == null)
333             throw new NullPointerException JavaDoc();
334         if (delay <= 0)
335             throw new IllegalArgumentException JavaDoc();
336         if (initialDelay < 0) initialDelay = 0;
337         long triggerTime = now() + unit.toNanos(initialDelay);
338         ScheduledFutureTask<?> t =
339             new ScheduledFutureTask<Boolean JavaDoc>(command,
340                                              null,
341                                              triggerTime,
342                                              unit.toNanos(-delay));
343         delayedExecute(t);
344         return t;
345     }
346     
347
348     /**
349      * Execute command with zero required delay. This has effect
350      * equivalent to <tt>schedule(command, 0, anyUnit)</tt>. Note
351      * that inspections of the queue and of the list returned by
352      * <tt>shutdownNow</tt> will access the zero-delayed
353      * {@link ScheduledFuture}, not the <tt>command</tt> itself.
354      *
355      * @param command the task to execute
356      * @throws RejectedExecutionException at discretion of
357      * <tt>RejectedExecutionHandler</tt>, if task cannot be accepted
358      * for execution because the executor has been shut down.
359      * @throws NullPointerException if command is null
360      */

361     public void execute(Runnable JavaDoc command) {
362         if (command == null)
363             throw new NullPointerException JavaDoc();
364         schedule(command, 0, TimeUnit.NANOSECONDS);
365     }
366
367     // Override AbstractExecutorService methods
368

369     public Future JavaDoc<?> submit(Runnable JavaDoc task) {
370         return schedule(task, 0, TimeUnit.NANOSECONDS);
371     }
372
373     public <T> Future JavaDoc<T> submit(Runnable JavaDoc task, T result) {
374         return schedule(Executors.callable(task, result),
375                         0, TimeUnit.NANOSECONDS);
376     }
377
378     public <T> Future JavaDoc<T> submit(Callable JavaDoc<T> task) {
379         return schedule(task, 0, TimeUnit.NANOSECONDS);
380     }
381
382     /**
383      * Set policy on whether to continue executing existing periodic
384      * tasks even when this executor has been <tt>shutdown</tt>. In
385      * this case, these tasks will only terminate upon
386      * <tt>shutdownNow</tt>, or after setting the policy to
387      * <tt>false</tt> when already shutdown. This value is by default
388      * false.
389      * @param value if true, continue after shutdown, else don't.
390      * @see #getExecuteExistingDelayedTasksAfterShutdownPolicy
391      */

392     public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) {
393         continueExistingPeriodicTasksAfterShutdown = value;
394         if (!value && isShutdown())
395             cancelUnwantedTasks();
396     }
397
398     /**
399      * Get the policy on whether to continue executing existing
400      * periodic tasks even when this executor has been
401      * <tt>shutdown</tt>. In this case, these tasks will only
402      * terminate upon <tt>shutdownNow</tt> or after setting the policy
403      * to <tt>false</tt> when already shutdown. This value is by
404      * default false.
405      * @return true if will continue after shutdown.
406      * @see #setContinueExistingPeriodicTasksAfterShutdownPolicy
407      */

408     public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() {
409         return continueExistingPeriodicTasksAfterShutdown;
410     }
411
412     /**
413      * Set policy on whether to execute existing delayed
414      * tasks even when this executor has been <tt>shutdown</tt>. In
415      * this case, these tasks will only terminate upon
416      * <tt>shutdownNow</tt>, or after setting the policy to
417      * <tt>false</tt> when already shutdown. This value is by default
418      * true.
419      * @param value if true, execute after shutdown, else don't.
420      * @see #getExecuteExistingDelayedTasksAfterShutdownPolicy
421      */

422     public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) {
423         executeExistingDelayedTasksAfterShutdown = value;
424         if (!value && isShutdown())
425             cancelUnwantedTasks();
426     }
427
428     /**
429      * Get policy on whether to execute existing delayed
430      * tasks even when this executor has been <tt>shutdown</tt>. In
431      * this case, these tasks will only terminate upon
432      * <tt>shutdownNow</tt>, or after setting the policy to
433      * <tt>false</tt> when already shutdown. This value is by default
434      * true.
435      * @return true if will execute after shutdown.
436      * @see #setExecuteExistingDelayedTasksAfterShutdownPolicy
437      */

438     public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() {
439         return executeExistingDelayedTasksAfterShutdown;
440     }
441
442
443     /**
444      * Initiates an orderly shutdown in which previously submitted
445      * tasks are executed, but no new tasks will be accepted. If the
446      * <tt>ExecuteExistingDelayedTasksAfterShutdownPolicy</tt> has
447      * been set <tt>false</tt>, existing delayed tasks whose delays
448      * have not yet elapsed are cancelled. And unless the
449      * <tt>ContinueExistingPeriodicTasksAfterShutdownPolicy</tt> has
450      * been set <tt>true</tt>, future executions of existing periodic
451      * tasks will be cancelled.
452      */

453     public void shutdown() {
454         cancelUnwantedTasks();
455         super.shutdown();
456     }
457
458     /**
459      * Attempts to stop all actively executing tasks, halts the
460      * processing of waiting tasks, and returns a list of the tasks that were
461      * awaiting execution.
462      *
463      * <p>There are no guarantees beyond best-effort attempts to stop
464      * processing actively executing tasks. This implementation
465      * cancels tasks via {@link Thread#interrupt}, so if any tasks mask or
466      * fail to respond to interrupts, they may never terminate.
467      *
468      * @return list of tasks that never commenced execution. Each
469      * element of this list is a {@link ScheduledFuture},
470      * including those tasks submitted using <tt>execute</tt>, which
471      * are for scheduling purposes used as the basis of a zero-delay
472      * <tt>ScheduledFuture</tt>.
473      */

474     public List<Runnable JavaDoc> shutdownNow() {
475         return super.shutdownNow();
476     }
477
478     /**
479      * Returns the task queue used by this executor. Each element of
480      * this queue is a {@link ScheduledFuture}, including those
481      * tasks submitted using <tt>execute</tt> which are for scheduling
482      * purposes used as the basis of a zero-delay
483      * <tt>ScheduledFuture</tt>. Iteration over this queue is
484      * <em>not</em> guaranteed to traverse tasks in the order in
485      * which they will execute.
486      *
487      * @return the task queue
488      */

489     public BlockingQueue JavaDoc<Runnable JavaDoc> getQueue() {
490         return super.getQueue();
491     }
492
493     /**
494      * An annoying wrapper class to convince generics compiler to
495      * use a DelayQueue<ScheduledFutureTask> as a BlockingQueue<Runnable>
496      */

497     private static class DelayedWorkQueue
498         extends AbstractCollection<Runnable JavaDoc>
499         implements BlockingQueue JavaDoc<Runnable JavaDoc> {
500         
501         private final DelayQueue JavaDoc<ScheduledFutureTask> dq = new DelayQueue JavaDoc<ScheduledFutureTask>();
502         public Runnable JavaDoc poll() { return dq.poll(); }
503         public Runnable JavaDoc peek() { return dq.peek(); }
504         public Runnable JavaDoc take() throws InterruptedException JavaDoc { return dq.take(); }
505         public Runnable JavaDoc poll(long timeout, TimeUnit JavaDoc unit) throws InterruptedException JavaDoc {
506             return dq.poll(timeout, unit);
507         }
508
509         public boolean add(Runnable JavaDoc x) { return dq.add((ScheduledFutureTask)x); }
510         public boolean offer(Runnable JavaDoc x) { return dq.offer((ScheduledFutureTask)x); }
511         public void put(Runnable JavaDoc x) {
512             dq.put((ScheduledFutureTask)x);
513         }
514         public boolean offer(Runnable JavaDoc x, long timeout, TimeUnit JavaDoc unit) {
515             return dq.offer((ScheduledFutureTask)x, timeout, unit);
516         }
517
518         public Runnable JavaDoc remove() { return dq.remove(); }
519         public Runnable JavaDoc element() { return dq.element(); }
520         public void clear() { dq.clear(); }
521         public int drainTo(Collection<? super Runnable JavaDoc> c) { return dq.drainTo(c); }
522         public int drainTo(Collection<? super Runnable JavaDoc> c, int maxElements) {
523             return dq.drainTo(c, maxElements);
524         }
525
526         public int remainingCapacity() { return dq.remainingCapacity(); }
527         public boolean remove(Object JavaDoc x) { return dq.remove(x); }
528         public boolean contains(Object JavaDoc x) { return dq.contains(x); }
529         public int size() { return dq.size(); }
530         public boolean isEmpty() { return dq.isEmpty(); }
531         public Object JavaDoc[] toArray() { return dq.toArray(); }
532         public <T> T[] toArray(T[] array) { return dq.toArray(array); }
533         public Iterator<Runnable JavaDoc> iterator() {
534             return new Iterator<Runnable JavaDoc>() {
535                 private Iterator<ScheduledFutureTask> it = dq.iterator();
536                 public boolean hasNext() { return it.hasNext(); }
537                 public Runnable JavaDoc next() { return it.next(); }
538                 public void remove() { it.remove(); }
539             };
540         }
541     }
542 }
543
Popular Tags