1 7 8 package java.util.concurrent; 9 import java.util.concurrent.atomic.*; 10 import java.util.*; 11 12 35 public class ScheduledThreadPoolExecutor 36 extends ThreadPoolExecutor 37 implements ScheduledExecutorService { 38 39 42 private volatile boolean continueExistingPeriodicTasksAfterShutdown; 43 44 47 private volatile boolean executeExistingDelayedTasksAfterShutdown = true; 48 49 53 private static final AtomicLong sequencer = new AtomicLong(0); 54 55 56 private static final long NANO_ORIGIN = System.nanoTime(); 57 58 61 final long now() { 62 return System.nanoTime() - NANO_ORIGIN; 63 } 64 65 private class ScheduledFutureTask<V> 66 extends FutureTask <V> implements ScheduledFuture <V> { 67 68 69 private final long sequenceNumber; 70 71 private long time; 72 78 private final long period; 79 80 83 ScheduledFutureTask(Runnable r, V result, long ns) { 84 super(r, result); 85 this.time = ns; 86 this.period = 0; 87 this.sequenceNumber = sequencer.getAndIncrement(); 88 } 89 90 93 ScheduledFutureTask(Runnable r, V result, long ns, long period) { 94 super(r, result); 95 this.time = ns; 96 this.period = period; 97 this.sequenceNumber = sequencer.getAndIncrement(); 98 } 99 100 103 ScheduledFutureTask(Callable <V> callable, long ns) { 104 super(callable); 105 this.time = ns; 106 this.period = 0; 107 this.sequenceNumber = sequencer.getAndIncrement(); 108 } 109 110 public long getDelay(TimeUnit unit) { 111 long d = unit.convert(time - now(), TimeUnit.NANOSECONDS); 112 return d; 113 } 114 115 public int compareTo(Delayed other) { 116 if (other == this) return 0; 118 ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other; 119 long diff = time - x.time; 120 if (diff < 0) 121 return -1; 122 else if (diff > 0) 123 return 1; 124 else if (sequenceNumber < x.sequenceNumber) 125 return -1; 126 else 127 return 1; 128 } 129 130 134 boolean isPeriodic() { 135 return period != 0; 136 } 137 138 141 private void runPeriodic() { 142 boolean ok = ScheduledFutureTask.super.runAndReset(); 143 boolean down = isShutdown(); 144 if (ok && (!down || 146 (getContinueExistingPeriodicTasksAfterShutdownPolicy() && 147 !isTerminating()))) { 148 long p = period; 149 if (p > 0) 150 time += p; 151 else 152 time = now() - p; 153 ScheduledThreadPoolExecutor.super.getQueue().add(this); 154 } 155 else if (down) 158 interruptIdleWorkers(); 159 } 160 161 164 public void run() { 165 if (isPeriodic()) 166 runPeriodic(); 167 else 168 ScheduledFutureTask.super.run(); 169 } 170 } 171 172 175 private void delayedExecute(Runnable command) { 176 if (isShutdown()) { 177 reject(command); 178 return; 179 } 180 if (getPoolSize() < getCorePoolSize()) 184 prestartCoreThread(); 185 186 super.getQueue().add(command); 187 } 188 189 193 private void cancelUnwantedTasks() { 194 boolean keepDelayed = getExecuteExistingDelayedTasksAfterShutdownPolicy(); 195 boolean keepPeriodic = getContinueExistingPeriodicTasksAfterShutdownPolicy(); 196 if (!keepDelayed && !keepPeriodic) 197 super.getQueue().clear(); 198 else if (keepDelayed || keepPeriodic) { 199 Object [] entries = super.getQueue().toArray(); 200 for (int i = 0; i < entries.length; ++i) { 201 Object e = entries[i]; 202 if (e instanceof ScheduledFutureTask) { 203 ScheduledFutureTask<?> t = (ScheduledFutureTask<?>)e; 204 if (t.isPeriodic()? !keepPeriodic : !keepDelayed) 205 t.cancel(false); 206 } 207 } 208 entries = null; 209 purge(); 210 } 211 } 212 213 public boolean remove(Runnable task) { 214 if (!(task instanceof ScheduledFutureTask)) 215 return false; 216 return getQueue().remove(task); 217 } 218 219 228 public ScheduledThreadPoolExecutor(int corePoolSize) { 229 super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS, 230 new DelayedWorkQueue()); 231 } 232 233 243 public ScheduledThreadPoolExecutor(int corePoolSize, 244 ThreadFactory threadFactory) { 245 super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS, 246 new DelayedWorkQueue(), threadFactory); 247 } 248 249 259 public ScheduledThreadPoolExecutor(int corePoolSize, 260 RejectedExecutionHandler handler) { 261 super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS, 262 new DelayedWorkQueue(), handler); 263 } 264 265 277 public ScheduledThreadPoolExecutor(int corePoolSize, 278 ThreadFactory threadFactory, 279 RejectedExecutionHandler handler) { 280 super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS, 281 new DelayedWorkQueue(), threadFactory, handler); 282 } 283 284 public ScheduledFuture <?> schedule(Runnable command, 285 long delay, 286 TimeUnit unit) { 287 if (command == null || unit == null) 288 throw new NullPointerException (); 289 long triggerTime = now() + unit.toNanos(delay); 290 ScheduledFutureTask<?> t = 291 new ScheduledFutureTask<Boolean >(command, null, triggerTime); 292 delayedExecute(t); 293 return t; 294 } 295 296 public <V> ScheduledFuture <V> schedule(Callable <V> callable, 297 long delay, 298 TimeUnit unit) { 299 if (callable == null || unit == null) 300 throw new NullPointerException (); 301 if (delay < 0) delay = 0; 302 long triggerTime = now() + unit.toNanos(delay); 303 ScheduledFutureTask<V> t = 304 new ScheduledFutureTask<V>(callable, triggerTime); 305 delayedExecute(t); 306 return t; 307 } 308 309 public ScheduledFuture <?> scheduleAtFixedRate(Runnable command, 310 long initialDelay, 311 long period, 312 TimeUnit unit) { 313 if (command == null || unit == null) 314 throw new NullPointerException (); 315 if (period <= 0) 316 throw new IllegalArgumentException (); 317 if (initialDelay < 0) initialDelay = 0; 318 long triggerTime = now() + unit.toNanos(initialDelay); 319 ScheduledFutureTask<?> t = 320 new ScheduledFutureTask<Object >(command, 321 null, 322 triggerTime, 323 unit.toNanos(period)); 324 delayedExecute(t); 325 return t; 326 } 327 328 public ScheduledFuture <?> scheduleWithFixedDelay(Runnable command, 329 long initialDelay, 330 long delay, 331 TimeUnit unit) { 332 if (command == null || unit == null) 333 throw new NullPointerException (); 334 if (delay <= 0) 335 throw new IllegalArgumentException (); 336 if (initialDelay < 0) initialDelay = 0; 337 long triggerTime = now() + unit.toNanos(initialDelay); 338 ScheduledFutureTask<?> t = 339 new ScheduledFutureTask<Boolean >(command, 340 null, 341 triggerTime, 342 unit.toNanos(-delay)); 343 delayedExecute(t); 344 return t; 345 } 346 347 348 361 public void execute(Runnable command) { 362 if (command == null) 363 throw new NullPointerException (); 364 schedule(command, 0, TimeUnit.NANOSECONDS); 365 } 366 367 369 public Future <?> submit(Runnable task) { 370 return schedule(task, 0, TimeUnit.NANOSECONDS); 371 } 372 373 public <T> Future <T> submit(Runnable task, T result) { 374 return schedule(Executors.callable(task, result), 375 0, TimeUnit.NANOSECONDS); 376 } 377 378 public <T> Future <T> submit(Callable <T> task) { 379 return schedule(task, 0, TimeUnit.NANOSECONDS); 380 } 381 382 392 public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) { 393 continueExistingPeriodicTasksAfterShutdown = value; 394 if (!value && isShutdown()) 395 cancelUnwantedTasks(); 396 } 397 398 408 public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() { 409 return continueExistingPeriodicTasksAfterShutdown; 410 } 411 412 422 public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) { 423 executeExistingDelayedTasksAfterShutdown = value; 424 if (!value && isShutdown()) 425 cancelUnwantedTasks(); 426 } 427 428 438 public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() { 439 return executeExistingDelayedTasksAfterShutdown; 440 } 441 442 443 453 public void shutdown() { 454 cancelUnwantedTasks(); 455 super.shutdown(); 456 } 457 458 474 public List<Runnable > shutdownNow() { 475 return super.shutdownNow(); 476 } 477 478 489 public BlockingQueue <Runnable > getQueue() { 490 return super.getQueue(); 491 } 492 493 497 private static class DelayedWorkQueue 498 extends AbstractCollection<Runnable > 499 implements BlockingQueue <Runnable > { 500 501 private final DelayQueue <ScheduledFutureTask> dq = new DelayQueue <ScheduledFutureTask>(); 502 public Runnable poll() { return dq.poll(); } 503 public Runnable peek() { return dq.peek(); } 504 public Runnable take() throws InterruptedException { return dq.take(); } 505 public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException { 506 return dq.poll(timeout, unit); 507 } 508 509 public boolean add(Runnable x) { return dq.add((ScheduledFutureTask)x); } 510 public boolean offer(Runnable x) { return dq.offer((ScheduledFutureTask)x); } 511 public void put(Runnable x) { 512 dq.put((ScheduledFutureTask)x); 513 } 514 public boolean offer(Runnable x, long timeout, TimeUnit unit) { 515 return dq.offer((ScheduledFutureTask)x, timeout, unit); 516 } 517 518 public Runnable remove() { return dq.remove(); } 519 public Runnable element() { return dq.element(); } 520 public void clear() { dq.clear(); } 521 public int drainTo(Collection<? super Runnable > c) { return dq.drainTo(c); } 522 public int drainTo(Collection<? super Runnable > c, int maxElements) { 523 return dq.drainTo(c, maxElements); 524 } 525 526 public int remainingCapacity() { return dq.remainingCapacity(); } 527 public boolean remove(Object x) { return dq.remove(x); } 528 public boolean contains(Object x) { return dq.contains(x); } 529 public int size() { return dq.size(); } 530 public boolean isEmpty() { return dq.isEmpty(); } 531 public Object [] toArray() { return dq.toArray(); } 532 public <T> T[] toArray(T[] array) { return dq.toArray(array); } 533 public Iterator<Runnable > iterator() { 534 return new Iterator<Runnable >() { 535 private Iterator<ScheduledFutureTask> it = dq.iterator(); 536 public boolean hasNext() { return it.hasNext(); } 537 public Runnable next() { return it.next(); } 538 public void remove() { it.remove(); } 539 }; 540 } 541 } 542 } 543 | Popular Tags |