1 7 8 package java.util.concurrent; 9 import java.util.concurrent.locks.*; 10 import java.util.*; 11 12 277 public class ThreadPoolExecutor extends AbstractExecutorService { 278 281 private static final Runnable [] EMPTY_RUNNABLE_ARRAY = new Runnable [0]; 282 283 286 private static final RuntimePermission shutdownPerm = 287 new RuntimePermission ("modifyThread"); 288 289 292 private final BlockingQueue <Runnable > workQueue; 293 294 298 private final ReentrantLock mainLock = new ReentrantLock(); 299 300 303 private final Condition termination = mainLock.newCondition(); 304 305 308 private final HashSet<Worker> workers = new HashSet<Worker>(); 309 310 315 private volatile long keepAliveTime; 316 317 322 private volatile int corePoolSize; 323 324 329 private volatile int maximumPoolSize; 330 331 336 private volatile int poolSize; 337 338 341 volatile int runState; 342 343 345 static final int RUNNING = 0; 346 347 static final int SHUTDOWN = 1; 348 349 static final int STOP = 2; 350 351 static final int TERMINATED = 3; 352 353 356 private volatile RejectedExecutionHandler handler; 357 358 361 private volatile ThreadFactory threadFactory; 362 363 366 private int largestPoolSize; 367 368 372 private long completedTaskCount; 373 374 377 private static final RejectedExecutionHandler defaultHandler = 378 new AbortPolicy(); 379 380 383 void reject(Runnable command) { 384 handler.rejectedExecution(command, this); 385 } 386 387 394 private Thread addThread(Runnable firstTask) { 395 Worker w = new Worker(firstTask); 396 Thread t = threadFactory.newThread(w); 397 if (t != null) { 398 w.thread = t; 399 workers.add(w); 400 int nt = ++poolSize; 401 if (nt > largestPoolSize) 402 largestPoolSize = nt; 403 } 404 return t; 405 } 406 407 414 private boolean addIfUnderCorePoolSize(Runnable firstTask) { 415 Thread t = null; 416 final ReentrantLock mainLock = this.mainLock; 417 mainLock.lock(); 418 try { 419 if (poolSize < corePoolSize) 420 t = addThread(firstTask); 421 } finally { 422 mainLock.unlock(); 423 } 424 if (t == null) 425 return false; 426 t.start(); 427 return true; 428 } 429 430 438 private Runnable addIfUnderMaximumPoolSize(Runnable firstTask) { 439 Thread t = null; 440 Runnable next = null; 441 final ReentrantLock mainLock = this.mainLock; 442 mainLock.lock(); 443 try { 444 if (poolSize < maximumPoolSize) { 445 next = workQueue.poll(); 446 if (next == null) 447 next = firstTask; 448 t = addThread(next); 449 } 450 } finally { 451 mainLock.unlock(); 452 } 453 if (t == null) 454 return null; 455 t.start(); 456 return next; 457 } 458 459 460 465 Runnable getTask() throws InterruptedException { 466 for (;;) { 467 switch(runState) { 468 case RUNNING: { 469 if (poolSize <= corePoolSize) return workQueue.take(); 471 472 long timeout = keepAliveTime; 473 if (timeout <= 0) return null; 475 Runnable r = workQueue.poll(timeout, TimeUnit.NANOSECONDS); 476 if (r != null) 477 return r; 478 if (poolSize > corePoolSize) return null; 480 break; 482 } 483 484 case SHUTDOWN: { 485 Runnable r = workQueue.poll(); 487 if (r != null) 488 return r; 489 490 if (workQueue.isEmpty()) { 492 interruptIdleWorkers(); 493 return null; 494 } 495 496 try { 499 return workQueue.take(); 500 } catch(InterruptedException ignore) {} 501 break; 502 } 503 504 case STOP: 505 return null; 506 default: 507 assert false; 508 } 509 } 510 } 511 512 515 void interruptIdleWorkers() { 516 final ReentrantLock mainLock = this.mainLock; 517 mainLock.lock(); 518 try { 519 for (Worker w : workers) 520 w.interruptIfIdle(); 521 } finally { 522 mainLock.unlock(); 523 } 524 } 525 526 530 void workerDone(Worker w) { 531 final ReentrantLock mainLock = this.mainLock; 532 mainLock.lock(); 533 try { 534 completedTaskCount += w.completedTasks; 535 workers.remove(w); 536 if (--poolSize > 0) 537 return; 538 539 541 int state = runState; 542 assert state != TERMINATED; 543 544 if (state != STOP) { 545 if (!workQueue.isEmpty()) { 551 Thread t = addThread(null); 552 if (t != null) 553 t.start(); 554 return; 555 } 556 557 if (state == RUNNING) 559 return; 560 } 561 562 termination.signalAll(); 565 runState = TERMINATED; 566 } finally { 568 mainLock.unlock(); 569 } 570 571 assert runState == TERMINATED; 572 terminated(); 573 } 574 575 578 private class Worker implements Runnable { 579 580 586 private final ReentrantLock runLock = new ReentrantLock(); 587 588 591 private Runnable firstTask; 592 593 597 volatile long completedTasks; 598 599 603 Thread thread; 604 605 Worker(Runnable firstTask) { 606 this.firstTask = firstTask; 607 } 608 609 boolean isActive() { 610 return runLock.isLocked(); 611 } 612 613 616 void interruptIfIdle() { 617 final ReentrantLock runLock = this.runLock; 618 if (runLock.tryLock()) { 619 try { 620 thread.interrupt(); 621 } finally { 622 runLock.unlock(); 623 } 624 } 625 } 626 627 630 void interruptNow() { 631 thread.interrupt(); 632 } 633 634 637 private void runTask(Runnable task) { 638 final ReentrantLock runLock = this.runLock; 639 runLock.lock(); 640 try { 641 if (runState == STOP) 644 return; 645 646 Thread.interrupted(); boolean ran = false; 648 beforeExecute(thread, task); 649 try { 650 task.run(); 651 ran = true; 652 afterExecute(task, null); 653 ++completedTasks; 654 } catch(RuntimeException ex) { 655 if (!ran) 656 afterExecute(task, ex); 657 throw ex; 661 } 662 } finally { 663 runLock.unlock(); 664 } 665 } 666 667 670 public void run() { 671 try { 672 Runnable task = firstTask; 673 firstTask = null; 674 while (task != null || (task = getTask()) != null) { 675 runTask(task); 676 task = null; } 678 } catch(InterruptedException ie) { 679 } finally { 681 workerDone(this); 682 } 683 } 684 } 685 686 688 711 public ThreadPoolExecutor(int corePoolSize, 712 int maximumPoolSize, 713 long keepAliveTime, 714 TimeUnit unit, 715 BlockingQueue <Runnable > workQueue) { 716 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, 717 Executors.defaultThreadFactory(), defaultHandler); 718 } 719 720 744 public ThreadPoolExecutor(int corePoolSize, 745 int maximumPoolSize, 746 long keepAliveTime, 747 TimeUnit unit, 748 BlockingQueue <Runnable > workQueue, 749 ThreadFactory threadFactory) { 750 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, 751 threadFactory, defaultHandler); 752 } 753 754 778 public ThreadPoolExecutor(int corePoolSize, 779 int maximumPoolSize, 780 long keepAliveTime, 781 TimeUnit unit, 782 BlockingQueue <Runnable > workQueue, 783 RejectedExecutionHandler handler) { 784 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, 785 Executors.defaultThreadFactory(), handler); 786 } 787 788 814 public ThreadPoolExecutor(int corePoolSize, 815 int maximumPoolSize, 816 long keepAliveTime, 817 TimeUnit unit, 818 BlockingQueue <Runnable > workQueue, 819 ThreadFactory threadFactory, 820 RejectedExecutionHandler handler) { 821 if (corePoolSize < 0 || 822 maximumPoolSize <= 0 || 823 maximumPoolSize < corePoolSize || 824 keepAliveTime < 0) 825 throw new IllegalArgumentException (); 826 if (workQueue == null || threadFactory == null || handler == null) 827 throw new NullPointerException (); 828 this.corePoolSize = corePoolSize; 829 this.maximumPoolSize = maximumPoolSize; 830 this.workQueue = workQueue; 831 this.keepAliveTime = unit.toNanos(keepAliveTime); 832 this.threadFactory = threadFactory; 833 this.handler = handler; 834 } 835 836 837 851 public void execute(Runnable command) { 852 if (command == null) 853 throw new NullPointerException (); 854 for (;;) { 855 if (runState != RUNNING) { 856 reject(command); 857 return; 858 } 859 if (poolSize < corePoolSize && addIfUnderCorePoolSize(command)) 860 return; 861 if (workQueue.offer(command)) 862 return; 863 Runnable r = addIfUnderMaximumPoolSize(command); 864 if (r == command) 865 return; 866 if (r == null) { 867 reject(command); 868 return; 869 } 870 } 872 } 873 874 885 public void shutdown() { 886 SecurityManager security = System.getSecurityManager(); 892 if (security != null) 893 java.security.AccessController.checkPermission(shutdownPerm); 894 895 boolean fullyTerminated = false; 896 final ReentrantLock mainLock = this.mainLock; 897 mainLock.lock(); 898 try { 899 if (workers.size() > 0) { 900 if (security != null) { 904 for (Worker w: workers) 905 security.checkAccess(w.thread); 906 } 907 908 int state = runState; 909 if (state == RUNNING) runState = SHUTDOWN; 911 912 try { 913 for (Worker w: workers) 914 w.interruptIfIdle(); 915 } catch(SecurityException se) { 916 runState = state; 923 throw se; 924 } 925 } 926 else { fullyTerminated = true; 928 runState = TERMINATED; 929 termination.signalAll(); 930 } 931 } finally { 932 mainLock.unlock(); 933 } 934 if (fullyTerminated) 935 terminated(); 936 } 937 938 939 955 public List<Runnable > shutdownNow() { 956 SecurityManager security = System.getSecurityManager(); 958 if (security != null) 959 java.security.AccessController.checkPermission(shutdownPerm); 960 961 boolean fullyTerminated = false; 962 final ReentrantLock mainLock = this.mainLock; 963 mainLock.lock(); 964 try { 965 if (workers.size() > 0) { 966 if (security != null) { 967 for (Worker w: workers) 968 security.checkAccess(w.thread); 969 } 970 971 int state = runState; 972 if (state != TERMINATED) 973 runState = STOP; 974 try { 975 for (Worker w : workers) 976 w.interruptNow(); 977 } catch(SecurityException se) { 978 runState = state; throw se; 980 } 981 } 982 else { fullyTerminated = true; 984 runState = TERMINATED; 985 termination.signalAll(); 986 } 987 } finally { 988 mainLock.unlock(); 989 } 990 if (fullyTerminated) 991 terminated(); 992 return Arrays.asList(workQueue.toArray(EMPTY_RUNNABLE_ARRAY)); 993 } 994 995 public boolean isShutdown() { 996 return runState != RUNNING; 997 } 998 999 1009 public boolean isTerminating() { 1010 return runState == STOP; 1011 } 1012 1013 public boolean isTerminated() { 1014 return runState == TERMINATED; 1015 } 1016 1017 public boolean awaitTermination(long timeout, TimeUnit unit) 1018 throws InterruptedException { 1019 long nanos = unit.toNanos(timeout); 1020 final ReentrantLock mainLock = this.mainLock; 1021 mainLock.lock(); 1022 try { 1023 for (;;) { 1024 if (runState == TERMINATED) 1025 return true; 1026 if (nanos <= 0) 1027 return false; 1028 nanos = termination.awaitNanos(nanos); 1029 } 1030 } finally { 1031 mainLock.unlock(); 1032 } 1033 } 1034 1035 1039 protected void finalize() { 1040 shutdown(); 1041 } 1042 1043 1050 public void setThreadFactory(ThreadFactory threadFactory) { 1051 if (threadFactory == null) 1052 throw new NullPointerException (); 1053 this.threadFactory = threadFactory; 1054 } 1055 1056 1062 public ThreadFactory getThreadFactory() { 1063 return threadFactory; 1064 } 1065 1066 1073 public void setRejectedExecutionHandler(RejectedExecutionHandler handler) { 1074 if (handler == null) 1075 throw new NullPointerException (); 1076 this.handler = handler; 1077 } 1078 1079 1085 public RejectedExecutionHandler getRejectedExecutionHandler() { 1086 return handler; 1087 } 1088 1089 1097 public BlockingQueue <Runnable > getQueue() { 1098 return workQueue; 1099 } 1100 1101 1118 public boolean remove(Runnable task) { 1119 return getQueue().remove(task); 1120 } 1121 1122 1123 1133 public void purge() { 1134 try { 1136 Iterator<Runnable > it = getQueue().iterator(); 1137 while (it.hasNext()) { 1138 Runnable r = it.next(); 1139 if (r instanceof Future <?>) { 1140 Future <?> c = (Future <?>)r; 1141 if (c.isCancelled()) 1142 it.remove(); 1143 } 1144 } 1145 } 1146 catch(ConcurrentModificationException ex) { 1147 return; 1148 } 1149 } 1150 1151 1163 public void setCorePoolSize(int corePoolSize) { 1164 if (corePoolSize < 0) 1165 throw new IllegalArgumentException (); 1166 final ReentrantLock mainLock = this.mainLock; 1167 mainLock.lock(); 1168 try { 1169 int extra = this.corePoolSize - corePoolSize; 1170 this.corePoolSize = corePoolSize; 1171 if (extra < 0) { 1172 int n = workQueue.size(); 1173 while (extra++ < 0 && n-- > 0 && poolSize < corePoolSize ) { 1177 Thread t = addThread(null); 1178 if (t != null) 1179 t.start(); 1180 else 1181 break; 1182 } 1183 } 1184 else if (extra > 0 && poolSize > corePoolSize) { 1185 Iterator<Worker> it = workers.iterator(); 1186 while (it.hasNext() && 1187 extra-- > 0 && 1188 poolSize > corePoolSize && 1189 workQueue.remainingCapacity() == 0) 1190 it.next().interruptIfIdle(); 1191 } 1192 } finally { 1193 mainLock.unlock(); 1194 } 1195 } 1196 1197 1203 public int getCorePoolSize() { 1204 return corePoolSize; 1205 } 1206 1207 1214 public boolean prestartCoreThread() { 1215 return addIfUnderCorePoolSize(null); 1216 } 1217 1218 1224 public int prestartAllCoreThreads() { 1225 int n = 0; 1226 while (addIfUnderCorePoolSize(null)) 1227 ++n; 1228 return n; 1229 } 1230 1231 1242 public void setMaximumPoolSize(int maximumPoolSize) { 1243 if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize) 1244 throw new IllegalArgumentException (); 1245 final ReentrantLock mainLock = this.mainLock; 1246 mainLock.lock(); 1247 try { 1248 int extra = this.maximumPoolSize - maximumPoolSize; 1249 this.maximumPoolSize = maximumPoolSize; 1250 if (extra > 0 && poolSize > maximumPoolSize) { 1251 Iterator<Worker> it = workers.iterator(); 1252 while (it.hasNext() && 1253 extra > 0 && 1254 poolSize > maximumPoolSize) { 1255 it.next().interruptIfIdle(); 1256 --extra; 1257 } 1258 } 1259 } finally { 1260 mainLock.unlock(); 1261 } 1262 } 1263 1264 1270 public int getMaximumPoolSize() { 1271 return maximumPoolSize; 1272 } 1273 1274 1286 public void setKeepAliveTime(long time, TimeUnit unit) { 1287 if (time < 0) 1288 throw new IllegalArgumentException (); 1289 this.keepAliveTime = unit.toNanos(time); 1290 } 1291 1292 1301 public long getKeepAliveTime(TimeUnit unit) { 1302 return unit.convert(keepAliveTime, TimeUnit.NANOSECONDS); 1303 } 1304 1305 1306 1307 1312 public int getPoolSize() { 1313 return poolSize; 1314 } 1315 1316 1322 public int getActiveCount() { 1323 final ReentrantLock mainLock = this.mainLock; 1324 mainLock.lock(); 1325 try { 1326 int n = 0; 1327 for (Worker w : workers) { 1328 if (w.isActive()) 1329 ++n; 1330 } 1331 return n; 1332 } finally { 1333 mainLock.unlock(); 1334 } 1335 } 1336 1337 1343 public int getLargestPoolSize() { 1344 final ReentrantLock mainLock = this.mainLock; 1345 mainLock.lock(); 1346 try { 1347 return largestPoolSize; 1348 } finally { 1349 mainLock.unlock(); 1350 } 1351 } 1352 1353 1362 public long getTaskCount() { 1363 final ReentrantLock mainLock = this.mainLock; 1364 mainLock.lock(); 1365 try { 1366 long n = completedTaskCount; 1367 for (Worker w : workers) { 1368 n += w.completedTasks; 1369 if (w.isActive()) 1370 ++n; 1371 } 1372 return n + workQueue.size(); 1373 } finally { 1374 mainLock.unlock(); 1375 } 1376 } 1377 1378 1387 public long getCompletedTaskCount() { 1388 final ReentrantLock mainLock = this.mainLock; 1389 mainLock.lock(); 1390 try { 1391 long n = completedTaskCount; 1392 for (Worker w : workers) 1393 n += w.completedTasks; 1394 return n; 1395 } finally { 1396 mainLock.unlock(); 1397 } 1398 } 1399 1400 1411 protected void beforeExecute(Thread t, Runnable r) { } 1412 1413 1425 protected void afterExecute(Runnable r, Throwable t) { } 1426 1427 1433 protected void terminated() { } 1434 1435 1441 public static class CallerRunsPolicy implements RejectedExecutionHandler { 1442 1445 public CallerRunsPolicy() { } 1446 1447 1453 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { 1454 if (!e.isShutdown()) { 1455 r.run(); 1456 } 1457 } 1458 } 1459 1460 1464 public static class AbortPolicy implements RejectedExecutionHandler { 1465 1468 public AbortPolicy() { } 1469 1470 1476 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { 1477 throw new RejectedExecutionException (); 1478 } 1479 } 1480 1481 1485 public static class DiscardPolicy implements RejectedExecutionHandler { 1486 1489 public DiscardPolicy() { } 1490 1491 1496 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { 1497 } 1498 } 1499 1500 1505 public static class DiscardOldestPolicy implements RejectedExecutionHandler { 1506 1509 public DiscardOldestPolicy() { } 1510 1511 1519 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { 1520 if (!e.isShutdown()) { 1521 e.getQueue().poll(); 1522 e.execute(r); 1523 } 1524 } 1525 } 1526} 1527 | Popular Tags |