package net.sf.mybatchfwk;

import java.util.Arrays;
import java.util.ConcurrentModificationException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * A thread pool whose workers can be paused and resumed.
 *
 * <p>The pool management code has the same structure as
 * {@link ThreadPoolExecutor} (it appears to be adapted from it), with three
 * differences visible in this class:
 * <ul>
 * <li>{@link #execute(Runnable)} and {@link #execute(ITask)} block on
 *     {@link BlockingQueue#put(Object)} when the work queue is full instead
 *     of rejecting the task;</li>
 * <li>{@link #beforeExecute(Thread, Runnable)} blocks the worker while the
 *     pool is paused (see {@link #pause()} / {@link #resume()});</li>
 * <li>{@link #afterExecute(Runnable, Throwable)} reports finished
 *     {@link ITask}s to the {@link BatchService}.</li>
 * </ul>
 */
public class PausableThreadPoolExecutor extends AbstractExecutorService implements ITaskExecutor {

    /** True while the pool is paused; guarded by {@link #pauseLock}. */
    private boolean isPaused;

    /** Lock guarding {@link #isPaused}. */
    private final ReentrantLock pauseLock = new ReentrantLock();

    /** Signalled by {@link #resume()} to wake workers blocked in {@link #beforeExecute}. */
    private final Condition unpaused = pauseLock.newCondition();

    /** Service consulted before a task is accepted and notified after a task ran. */
    protected BatchService batchService = null;

    /** Typed empty array used to size the result of {@code workQueue.toArray}. */
    private static final Runnable[] EMPTY_RUNNABLE_ARRAY = new Runnable[0];

    /** Permission checked by {@link #shutdown()} and {@link #shutdownNow()}. */
    private static final RuntimePermission shutdownPerm =
        new RuntimePermission("modifyThread");

    /** Queue holding tasks that are waiting for a worker. */
    protected final BlockingQueue<Runnable> workQueue;

    /** Lock guarding the worker set, pool size bookkeeping and state transitions. */
    private final ReentrantLock mainLock = new ReentrantLock();

    /** Signalled when the pool reaches {@link #TERMINATED}. */
    private final Condition termination = mainLock.newCondition();

    /** All live workers; accessed only while holding {@link #mainLock}. */
    private final HashSet<Worker> workers = new HashSet<Worker>();

    /** Idle timeout, in nanoseconds, for workers above the core pool size. */
    private volatile long keepAliveTime;

    /** Core pool size; updated only while holding {@link #mainLock}. */
    protected volatile int corePoolSize;

    /** Maximum pool size; updated only while holding {@link #mainLock}. */
    private volatile int maximumPoolSize;

    /** Current number of workers; updated only while holding {@link #mainLock}. */
    protected volatile int poolSize;

    /** Lifecycle state: one of {@link #RUNNING}, {@link #SHUTDOWN}, {@link #STOP}, {@link #TERMINATED}. */
    protected volatile int runState;

    /** Accepting new tasks. */
    static protected final int RUNNING = 0;

    /** No new tasks accepted; queued tasks are still executed. */
    static protected final int SHUTDOWN = 1;

    /** No new tasks accepted and workers stop fetching queued tasks. */
    static protected final int STOP = 2;

    /** All workers have exited. */
    static protected final int TERMINATED = 3;

    /** Factory used to create worker threads. */
    private volatile ThreadFactory threadFactory;

    /** Largest value {@link #poolSize} ever reached; guarded by {@link #mainLock}. */
    private int largestPoolSize;

    /** Tasks completed by workers that have already exited; guarded by {@link #mainLock}. */
    private long completedTaskCount;

    /**
     * Creates a worker for {@code firstTask}, registers it and updates the
     * pool size counters. Must be called while holding {@link #mainLock}.
     * The returned thread is not started.
     *
     * @param firstTask the first task the worker runs, or null for none
     * @return the new thread, or null if the thread factory returned null
     */
    private Thread addThread(Runnable firstTask) {
        Worker w = new Worker(firstTask);
        Thread t = threadFactory.newThread(w);
        if (t != null) {
            w.thread = t;
            workers.add(w);
            int nt = ++poolSize;
            if (nt > largestPoolSize)
                largestPoolSize = nt;
        }
        return t;
    }

    /**
     * Creates and starts a new thread running {@code firstTask} only if fewer
     * than {@code corePoolSize} threads exist.
     *
     * @param firstTask the first task the new thread runs, or null for none
     * @return true if a thread was created and started
     */
    protected boolean addIfUnderCorePoolSize(Runnable firstTask) {
        Thread t = null;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (poolSize < corePoolSize)
                t = addThread(firstTask);
        } finally {
            mainLock.unlock();
        }
        if (t == null)
            return false;
        t.start();
        return true;
    }

    /**
     * Creates and starts a new thread only if fewer than
     * {@code maximumPoolSize} threads exist. The new thread is given the head
     * of the work queue if there is one, otherwise {@code firstTask}.
     *
     * <p>Callers must not pass a task that has already been added to the
     * work queue: if another worker has taken it in the meantime the queue
     * looks empty here and the task would be run a second time.
     *
     * @param firstTask the task to run if the queue is empty
     * @return the task handed to the new thread, or null if no thread was
     *         started (also null when a thread was started with no task)
     */
    protected Runnable addIfUnderMaximumPoolSize(Runnable firstTask) {
        Thread t = null;
        Runnable next = null;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (poolSize < maximumPoolSize) {
                next = workQueue.poll();
                if (next == null)
                    next = firstTask;
                t = addThread(next);
            }
        } finally {
            mainLock.unlock();
        }
        if (t == null)
            return null;
        t.start();
        return next;
    }

    /**
     * Starts an extra thread, up to {@code maximumPoolSize}, for a task that
     * is still waiting in the work queue. Unlike
     * {@link #addIfUnderMaximumPoolSize(Runnable)} this never falls back to a
     * caller-supplied task, so a task that was queued and already picked up
     * by another worker cannot be executed twice.
     */
    private void addThreadForQueuedTask() {
        Thread t = null;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (poolSize < maximumPoolSize) {
                Runnable next = workQueue.poll();
                if (next != null) {
                    t = addThread(next);
                    // The factory produced no thread: best effort to hand the
                    // task back to the queue rather than dropping it.
                    if (t == null)
                        workQueue.offer(next);
                }
            }
        } finally {
            mainLock.unlock();
        }
        if (t != null)
            t.start();
    }

    /**
     * Fetches the next task for a worker, blocking or timing out according to
     * the current run state and pool size.
     *
     * @return the next task, or null if the calling worker must exit
     * @throws InterruptedException if interrupted while waiting in the
     *         {@link #RUNNING} state
     */
    Runnable getTask() throws InterruptedException {
        for (;;) {
            switch (runState) {
            case RUNNING: {
                // Core threads wait indefinitely.
                if (poolSize <= corePoolSize)
                    return workQueue.take();

                // Threads above the core size wait at most keepAliveTime.
                long timeout = keepAliveTime;
                if (timeout <= 0)
                    return null;
                Runnable r = workQueue.poll(timeout, TimeUnit.NANOSECONDS);
                if (r != null)
                    return r;
                if (poolSize > corePoolSize)
                    return null;
                // Pool shrank to the core size while waiting: retry.
                break;
            }

            case SHUTDOWN: {
                // Drain whatever is still queued.
                Runnable r = workQueue.poll();
                if (r != null)
                    return r;

                // Queue drained: wake the other idle workers so they exit too.
                if (workQueue.isEmpty()) {
                    interruptIdleWorkers();
                    return null;
                }

                // A task may have appeared between poll() and isEmpty().
                // An interrupt here only makes the loop re-read runState.
                try {
                    return workQueue.take();
                } catch (InterruptedException ignore) {
                }
                break;
            }

            case STOP:
                return null;
            default:
                assert false;
            }
        }
    }

    /**
     * Interrupts every worker that is not currently running a task.
     */
    void interruptIdleWorkers() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers)
                w.interruptIfIdle();
        } finally {
            mainLock.unlock();
        }
    }

    /**
     * Performs bookkeeping for an exiting worker. When the last worker exits
     * this either starts a replacement (tasks are still queued and the pool
     * is not stopped), leaves the pool empty (still running), or moves the
     * pool to {@link #TERMINATED} and calls {@link #terminated()}.
     *
     * @param w the worker that is exiting
     */
    void workerDone(Worker w) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            workers.remove(w);
            if (--poolSize > 0)
                return;

            // This was the last worker.
            int state = runState;
            assert state != TERMINATED;

            if (state != STOP) {
                // Queued tasks remain: keep one thread alive to drain them.
                if (!workQueue.isEmpty()) {
                    Thread t = addThread(null);
                    if (t != null)
                        t.start();
                    return;
                }

                // Running with an empty queue: nothing to do.
                if (state == RUNNING)
                    return;
            }

            // Shut down (or stopped) with no workers left: terminate.
            termination.signalAll();
            runState = TERMINATED;
        } finally {
            mainLock.unlock();
        }

        assert runState == TERMINATED;
        terminated();
    }

    /**
     * Body of a pool thread: runs its first task, then keeps fetching tasks
     * from {@link #getTask()} until that returns null or the thread is
     * interrupted while waiting.
     */
    private class Worker implements Runnable {

        /**
         * Held while a task (including the pause check in
         * {@code beforeExecute}) is running, so that
         * {@link #interruptIfIdle()} only interrupts idle workers.
         */
        private final ReentrantLock runLock = new ReentrantLock();

        /** Task to run before fetching from the queue; may be null. */
        private Runnable firstTask;

        /** Number of tasks this worker completed; merged into the pool total on exit. */
        volatile long completedTasks;

        /** Thread running this worker; set by {@code addThread}. */
        Thread thread;

        Worker(Runnable firstTask) {
            this.firstTask = firstTask;
        }

        /** Returns true while this worker holds its run lock. */
        boolean isActive() {
            return runLock.isLocked();
        }

        /** Interrupts the worker thread only if it is not running a task. */
        void interruptIfIdle() {
            final ReentrantLock runLock = this.runLock;
            if (runLock.tryLock()) {
                try {
                    thread.interrupt();
                } finally {
                    runLock.unlock();
                }
            }
        }

        /** Interrupts the worker thread even if it is running a task. */
        void interruptNow() {
            thread.interrupt();
        }

        /**
         * Runs one task between the {@code beforeExecute} and
         * {@code afterExecute} hooks. A {@link RuntimeException} thrown by
         * the task is passed to {@code afterExecute} and not rethrown, so
         * the worker keeps running.
         */
        private void runTask(Runnable task) {
            final ReentrantLock runLock = this.runLock;
            runLock.lock();
            try {
                // Abort now if the pool was stopped after the task was fetched.
                if (runState == STOP)
                    return;

                // Clear any interrupt left over from idle-worker interruption.
                Thread.interrupted();
                boolean ran = false;
                beforeExecute(thread, task);
                try {
                    task.run();
                    ran = true;
                    afterExecute(task, null);
                    ++completedTasks;
                } catch (RuntimeException ex) {
                    // If ran is true the exception came from afterExecute
                    // itself, which must not be called a second time.
                    if (!ran)
                        afterExecute(task, ex);
                }
            } finally {
                runLock.unlock();
            }
        }

        /** Main worker loop. */
        public void run() {
            try {
                Runnable task = firstTask;
                firstTask = null;
                while (task != null || (task = getTask()) != null) {
                    runTask(task);
                    task = null;
                }
            } catch (InterruptedException ie) {
                // Interrupted while waiting for work: fall through and exit.
            } finally {
                workerDone(this);
            }
        }
    }

    /**
     * Creates a new pool using {@link Executors#defaultThreadFactory()}.
     *
     * @param batchService service consulted by {@link #execute(ITask)} and
     *        notified by {@link #afterExecute(Runnable, Throwable)}
     * @param corePoolSize number of threads kept in the pool even when idle
     * @param maximumPoolSize maximum number of threads allowed in the pool
     * @param keepAliveTime how long threads above the core size wait for
     *        work before exiting
     * @param unit time unit of {@code keepAliveTime}
     * @param workQueue queue holding tasks before they are executed
     * @throws IllegalArgumentException if {@code corePoolSize} or
     *         {@code keepAliveTime} is negative, {@code maximumPoolSize} is
     *         not positive, or {@code maximumPoolSize < corePoolSize}
     * @throws NullPointerException if {@code workQueue} is null
     */
    public PausableThreadPoolExecutor(BatchService batchService, int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null)
            throw new NullPointerException();
        this.batchService = batchService;
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = Executors.defaultThreadFactory();
    }

    /**
     * Submits a task for execution. The task is silently dropped if the pool
     * is not {@link #RUNNING}. If no core thread can be started for it, the
     * call blocks until the work queue accepts the task.
     *
     * <p>If the calling thread is interrupted while waiting for queue space,
     * the task is not queued and the thread's interrupt status is restored.
     *
     * @param command the task to execute
     * @throws NullPointerException if {@code command} is null
     */
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        if (runState != RUNNING) {
            return;
        }
        if (poolSize < corePoolSize && addIfUnderCorePoolSize(command))
            return;
        try {
            workQueue.put(command);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        }
        // The task is now in the queue; only add a thread for work that is
        // still waiting there, never for `command` directly.
        addThreadForQueuedTask();
    }

    /**
     * Submits a batch task for execution if the pool is running and the
     * batch service allows it. If no core thread can be started for it, the
     * call blocks until the work queue accepts the task.
     *
     * @param task the task to execute
     * @return true if the task was handed to a thread or queued; false if
     *         the pool is not running, the batch service refused the task,
     *         or the caller was interrupted while waiting for queue space
     *         (the interrupt status is restored in that case)
     * @throws NullPointerException if {@code task} is null
     * @throws BatchException declared for the batch service check
     */
    public boolean execute(ITask task) throws BatchException {
        if (task == null)
            throw new NullPointerException();
        if (runState != RUNNING) {
            return false;
        }
        if (!batchService.canBeExecuted(task)) {
            return false;
        }
        if (poolSize < corePoolSize && addIfUnderCorePoolSize(task))
            return true;
        try {
            workQueue.put(task);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        // See execute(Runnable): never start a thread with `task` itself.
        addThreadForQueuedTask();
        return true;
    }

    /**
     * Starts an orderly shutdown: no new tasks are accepted, queued tasks
     * are still executed and idle workers are interrupted. If there are no
     * workers the pool terminates immediately.
     *
     * @throws SecurityException if a security manager is installed and
     *         denies the "modifyThread" permission or access to a worker
     *         thread
     */
    public void shutdown() {
        SecurityManager security = System.getSecurityManager();
        if (security != null)
            java.security.AccessController.checkPermission(shutdownPerm);

        boolean fullyTerminated = false;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (workers.size() > 0) {
                // Check access to every thread before changing any state.
                if (security != null) {
                    for (Worker w : workers)
                        security.checkAccess(w.thread);
                }

                int state = runState;
                if (state == RUNNING)
                    runState = SHUTDOWN;

                try {
                    for (Worker w : workers)
                        w.interruptIfIdle();
                } catch (SecurityException se) {
                    // Roll back the state change before propagating.
                    runState = state;
                    throw se;
                }
            }
            else {
                // No workers: terminate directly.
                fullyTerminated = true;
                runState = TERMINATED;
                termination.signalAll();
            }
        } finally {
            mainLock.unlock();
        }
        if (fullyTerminated)
            terminated();
    }

    /**
     * Attempts to stop the pool: no new tasks are accepted, workers stop
     * fetching queued tasks and every worker thread is interrupted.
     *
     * @return a snapshot of the tasks that were in the work queue; the
     *         queue itself is not cleared by this method
     * @throws SecurityException if a security manager is installed and
     *         denies the "modifyThread" permission or access to a worker
     *         thread
     */
    public List<Runnable> shutdownNow() {
        SecurityManager security = System.getSecurityManager();
        if (security != null)
            java.security.AccessController.checkPermission(shutdownPerm);

        boolean fullyTerminated = false;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (workers.size() > 0) {
                if (security != null) {
                    for (Worker w : workers)
                        security.checkAccess(w.thread);
                }

                int state = runState;
                if (state != TERMINATED)
                    runState = STOP;
                try {
                    for (Worker w : workers)
                        w.interruptNow();
                } catch (SecurityException se) {
                    // Roll back the state change before propagating.
                    runState = state;
                    throw se;
                }
            }
            else {
                // No workers: terminate directly.
                fullyTerminated = true;
                runState = TERMINATED;
                termination.signalAll();
            }
        } finally {
            mainLock.unlock();
        }
        if (fullyTerminated)
            terminated();
        return Arrays.asList(workQueue.toArray(EMPTY_RUNNABLE_ARRAY));
    }

    /** Returns true once the pool has left the {@link #RUNNING} state. */
    public boolean isShutdown() {
        return runState != RUNNING;
    }

    /** Returns true while the pool is in the {@link #STOP} state. */
    public boolean isTerminating() {
        return runState == STOP;
    }

    /** Returns true once the pool has reached the {@link #TERMINATED} state. */
    public boolean isTerminated() {
        return runState == TERMINATED;
    }

    /**
     * Blocks until the pool has terminated, the timeout elapses or the
     * calling thread is interrupted.
     *
     * @param timeout maximum time to wait
     * @param unit time unit of {@code timeout}
     * @return true if the pool terminated, false if the timeout elapsed first
     * @throws InterruptedException if interrupted while waiting
     */
    public boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (;;) {
                if (runState == TERMINATED)
                    return true;
                if (nanos <= 0)
                    return false;
                nanos = termination.awaitNanos(nanos);
            }
        } finally {
            mainLock.unlock();
        }
    }

    /** Calls {@link #shutdown()} when this executor is garbage collected. */
    protected void finalize() {
        shutdown();
    }

    /**
     * Sets the factory used to create new worker threads.
     *
     * @param threadFactory the new thread factory
     * @throws NullPointerException if {@code threadFactory} is null
     */
    public void setThreadFactory(ThreadFactory threadFactory) {
        if (threadFactory == null)
            throw new NullPointerException();
        this.threadFactory = threadFactory;
    }

    /** Returns the factory used to create new worker threads. */
    public ThreadFactory getThreadFactory() {
        return threadFactory;
    }

    /** Returns the work queue itself (not a copy). */
    public BlockingQueue<Runnable> getQueue() {
        return workQueue;
    }

    /**
     * Removes a task from the work queue if it is present.
     *
     * @param task the task to remove
     * @return true if the task was removed
     */
    public boolean remove(Runnable task) {
        return getQueue().remove(task);
    }

    /**
     * Removes cancelled {@link Future} tasks from the work queue. Gives up
     * quietly if the queue is modified concurrently.
     */
    public void purge() {
        try {
            Iterator<Runnable> it = getQueue().iterator();
            while (it.hasNext()) {
                Runnable r = it.next();
                if (r instanceof Future<?>) {
                    Future<?> c = (Future<?>) r;
                    if (c.isCancelled())
                        it.remove();
                }
            }
        }
        catch (ConcurrentModificationException ex) {
            return;
        }
    }

    /**
     * Sets the core pool size. When it grows, new threads are started for
     * queued tasks; when it shrinks, idle workers may be interrupted.
     *
     * @param corePoolSize the new core size
     * @throws IllegalArgumentException if {@code corePoolSize} is negative
     */
    public void setCorePoolSize(int corePoolSize) {
        if (corePoolSize < 0)
            throw new IllegalArgumentException();
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            int extra = this.corePoolSize - corePoolSize;
            this.corePoolSize = corePoolSize;
            if (extra < 0) {
                // Core size grew: start at most one thread per queued task.
                int n = workQueue.size();
                while (extra++ < 0 && n-- > 0 && poolSize < corePoolSize) {
                    Thread t = addThread(null);
                    if (t != null)
                        t.start();
                    else
                        break;
                }
            }
            else if (extra > 0 && poolSize > corePoolSize) {
                // Core size shrank: interrupt idle workers, but only while
                // the queue reports no remaining capacity.
                Iterator<Worker> it = workers.iterator();
                while (it.hasNext() &&
                       extra-- > 0 &&
                       poolSize > corePoolSize &&
                       workQueue.remainingCapacity() == 0)
                    it.next().interruptIfIdle();
            }
        } finally {
            mainLock.unlock();
        }
    }

    /** Returns the core pool size. */
    public int getCorePoolSize() {
        return corePoolSize;
    }

    /**
     * Starts one idle core thread if the pool is below its core size.
     *
     * @return true if a thread was started
     */
    public boolean prestartCoreThread() {
        return addIfUnderCorePoolSize(null);
    }

    /**
     * Starts idle core threads until the pool reaches its core size.
     *
     * @return the number of threads started
     */
    public int prestartAllCoreThreads() {
        int n = 0;
        while (addIfUnderCorePoolSize(null))
            ++n;
        return n;
    }

    /**
     * Sets the maximum pool size. If the pool currently holds more threads
     * than the new maximum, idle workers are interrupted.
     *
     * @param maximumPoolSize the new maximum
     * @throws IllegalArgumentException if {@code maximumPoolSize} is not
     *         positive or is less than the core pool size
     */
    public void setMaximumPoolSize(int maximumPoolSize) {
        if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)
            throw new IllegalArgumentException();
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            int extra = this.maximumPoolSize - maximumPoolSize;
            this.maximumPoolSize = maximumPoolSize;
            if (extra > 0 && poolSize > maximumPoolSize) {
                Iterator<Worker> it = workers.iterator();
                while (it.hasNext() &&
                       extra > 0 &&
                       poolSize > maximumPoolSize) {
                    it.next().interruptIfIdle();
                    --extra;
                }
            }
        } finally {
            mainLock.unlock();
        }
    }

    /** Returns the maximum pool size. */
    public int getMaximumPoolSize() {
        return maximumPoolSize;
    }

    /**
     * Sets how long threads above the core size wait for work before exiting.
     *
     * @param time the time to wait; zero makes such threads exit as soon as
     *        they find no task handed to them
     * @param unit time unit of {@code time}
     * @throws IllegalArgumentException if {@code time} is negative
     */
    public void setKeepAliveTime(long time, TimeUnit unit) {
        if (time < 0)
            throw new IllegalArgumentException();
        this.keepAliveTime = unit.toNanos(time);
    }

    /**
     * Returns the keep-alive time converted to the given unit.
     *
     * @param unit the desired time unit
     * @return the keep-alive time
     */
    public long getKeepAliveTime(TimeUnit unit) {
        return unit.convert(keepAliveTime, TimeUnit.NANOSECONDS);
    }

    /** Returns the current number of threads in the pool. */
    public int getPoolSize() {
        return poolSize;
    }

    /**
     * Returns the number of workers currently holding their run lock, i.e.
     * running a task or blocked by {@link #pause()} just before running one.
     */
    public int getActiveCount() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            int n = 0;
            for (Worker w : workers) {
                if (w.isActive())
                    ++n;
            }
            return n;
        } finally {
            mainLock.unlock();
        }
    }

    /** Returns the largest number of threads that were ever in the pool at once. */
    public int getLargestPoolSize() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            return largestPoolSize;
        } finally {
            mainLock.unlock();
        }
    }

    /**
     * Returns the sum of completed, currently active and queued tasks. The
     * value is a snapshot: workers keep running while it is computed.
     */
    public long getTaskCount() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            long n = completedTaskCount;
            for (Worker w : workers) {
                n += w.completedTasks;
                if (w.isActive())
                    ++n;
            }
            return n + workQueue.size();
        } finally {
            mainLock.unlock();
        }
    }

    /**
     * Returns the number of tasks completed so far, counting both exited
     * and live workers. The value is a snapshot.
     */
    public long getCompletedTaskCount() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            long n = completedTaskCount;
            for (Worker w : workers)
                n += w.completedTasks;
            return n;
        } finally {
            mainLock.unlock();
        }
    }

    /** Hook called once the pool has terminated; does nothing by default. */
    protected void terminated() { }

    /**
     * Hook called by a worker just before it runs a task. Blocks while the
     * pool is paused. If the wait is interrupted, {@code t} is
     * re-interrupted and the method returns, so the task still runs.
     *
     * @param t the thread that will run the task
     * @param r the task about to run
     */
    protected void beforeExecute(Thread t, Runnable r) {
        pauseLock.lock();
        try {
            while (isPaused) {
                unpaused.await();
            }
        } catch (InterruptedException ie) {
            t.interrupt();
        } finally {
            pauseLock.unlock();
        }
    }

    /**
     * Hook called by a worker after a task ran. Notifies the batch service
     * when the task is an {@link ITask}; other runnables (for example those
     * passed to {@link #execute(Runnable)}) are ignored here, as is a pool
     * built without a batch service.
     *
     * @param r the task that ran
     * @param t the {@link RuntimeException} thrown by the task, or null if
     *        it completed normally
     */
    protected void afterExecute(Runnable r, Throwable t) {
        if (batchService != null && r instanceof ITask) {
            batchService.afterExecute((ITask) r, t);
        }
    }

    /**
     * Pauses the pool: workers block before starting their next task.
     * Tasks that are already running are not affected. A worker blocked by
     * the pause holds its run lock, so it is not interrupted by
     * {@link #shutdown()}.
     */
    public void pause() {
        pauseLock.lock();
        try {
            isPaused = true;
        } finally {
            pauseLock.unlock();
        }
    }

    /** Resumes the pool and wakes every worker blocked by {@link #pause()}. */
    public void resume() {
        pauseLock.lock();
        try {
            isPaused = false;
            unpaused.signalAll();
        } finally {
            pauseLock.unlock();
        }
    }

    /** Returns true while the pool is in the {@link #RUNNING} state. */
    public boolean isRunning() {
        return (runState == RUNNING);
    }
}