1 18 package org.apache.activemq.thread; 19 20 import java.util.concurrent.Executor ; 21 22 26 class PooledTaskRunner implements TaskRunner { 27 28 private final int maxIterationsPerRun; 29 private final Executor executor; 30 private final Task task; 31 private final Runnable runable; 32 private boolean queued; 33 private boolean shutdown; 34 private boolean iterating; 35 private Thread runningThread; 36 37 public PooledTaskRunner(Executor executor, Task task, int maxIterationsPerRun) { 38 this.executor = executor; 39 this.maxIterationsPerRun = maxIterationsPerRun; 40 this.task = task; 41 runable = new Runnable () { 42 public void run() { 43 runningThread = Thread.currentThread(); 44 runTask(); 45 runningThread = null; 46 } 47 }; 48 } 49 50 51 52 55 public void wakeup() throws InterruptedException { 56 synchronized( runable ) { 57 58 64 if( queued || shutdown ) 65 return; 66 67 queued=true; 68 69 if( !iterating ) { 71 executor.execute(runable); 72 } 73 } 74 } 75 76 80 public void shutdown(long timeout) throws InterruptedException { 81 synchronized(runable){ 82 shutdown=true; 83 if(runningThread!=Thread.currentThread()){ 88 if(iterating==true){ 89 runable.wait(timeout); 90 } 91 } 92 } 93 } 94 95 96 public void shutdown() throws InterruptedException { 97 shutdown(0); 98 } 99 private void runTask() { 100 101 synchronized (runable) { 102 queued = false; 103 if( shutdown ) { 104 iterating = false; 105 runable.notifyAll(); 106 return; 107 } 108 iterating = true; 109 } 110 111 boolean done=false; 114 for (int i = 0; i < maxIterationsPerRun; i++) { 115 if( !task.iterate() ) { 116 done=true; 117 break; 118 } 119 } 120 121 synchronized (runable) { 122 iterating=false; 123 if( shutdown ) { 124 queued=false; 125 runable.notifyAll(); 126 return; 127 } 128 129 if( !done ) 132 queued = true; 133 134 if( queued ) { 135 executor.execute(runable); 136 } 137 } 138 } 139 } 140 | Popular Tags |