1 4 package java.util.concurrent; 5 6 import java.util.concurrent.locks.Condition ; 7 import java.util.concurrent.locks.ReentrantLock ; 8 9 public class FutureTaskTC implements Future , Runnable { 10 private final Sync sync; 11 12 public FutureTaskTC(Callable callable) { 13 if (callable == null) throw new NullPointerException (); 14 sync = new Sync(callable); 15 } 16 17 public FutureTaskTC(Runnable runnable, Object result) { 18 sync = new Sync(Executors.callable(runnable, result)); 19 } 20 21 public boolean isCancelled() { 22 return sync.innerIsCancelled(); 23 } 24 25 public boolean isDone() { 26 return sync.innerIsDone(); 27 } 28 29 public boolean cancel(boolean mayInterruptIfRunning) { 30 return sync.innerCancel(mayInterruptIfRunning); 31 } 32 33 public Object get() throws InterruptedException , ExecutionException { 34 return sync.innerGet(); 35 } 36 37 public Object get(long timeout, TimeUnit unit) throws InterruptedException , ExecutionException , TimeoutException { 38 return sync.innerGet(unit.toNanos(timeout)); 39 } 40 41 protected void done() { 42 } 44 45 protected void set(Object v) { 46 sync.innerSet(v); 47 } 48 49 protected void setException(Throwable t) { 50 sync.innerSetException(t); 51 } 52 53 public void run() { 54 sync.innerRun(); 55 } 56 57 protected boolean runAndReset() { 58 return sync.innerRunAndReset(); 59 } 60 61 private final class Sync { 62 private static final int RUNNING = 1; 63 private static final int RAN = 2; 64 private static final int CANCELLED = 4; 65 66 private final Callable callable; 67 private Object result; 68 private Throwable exception; 69 70 private int state; 71 private final ReentrantLock lock; 72 private final Condition ran; 73 74 private transient volatile Thread runner; 75 private Object proxyRunner; 76 77 Sync(Callable callable) { 78 this.callable = callable; 79 lock = new ReentrantLock (); 80 ran = lock.newCondition(); 81 } 82 83 Sync() { 84 callable = null; 85 lock = null; 86 ran = null; 87 } 88 89 private boolean ranOrCancelled(int state) { 90 return (state & (RAN | CANCELLED)) != 0; 91 } 92 93 private int tryAcquireShared() { 94 return innerIsDone() ? 1 : -1; 95 } 96 97 protected boolean tryReleaseShared(int ignore) { 98 managedTryReleaseShared(); 99 return true; 100 } 101 102 private boolean managedTryReleaseShared() { 103 runner = null; 104 proxyRunner = null; 105 ran.signalAll(); 106 return true; 107 } 108 109 private final void setState(int state) { 110 this.state = state; 111 } 112 113 private final int getSynchronizedState() { 114 lock.lock(); 115 try { 116 return state; 117 } finally { 118 lock.unlock(); 119 } 120 } 121 122 private final boolean compareAndSetState(int expected, int newValue) { 123 lock.lock(); 124 try { 125 int s = state; 126 if (s == expected) { 127 setState(newValue); 128 return true; 129 } 130 } finally { 131 lock.unlock(); 132 } 133 return false; 134 } 135 136 boolean innerIsCancelled() { 137 return getSynchronizedState() == CANCELLED; 138 } 139 140 boolean innerIsDone() { 141 lock.lock(); 142 try { 143 return ranOrCancelled(state) && proxyRunner == null; 144 } finally { 145 lock.unlock(); 146 } 147 } 148 149 Object innerGet() throws InterruptedException , ExecutionException { 150 lock.lock(); 151 try { 152 while (tryAcquireShared() < 0) { 153 ran.await(); 154 } 155 } finally { 156 lock.unlock(); 157 } 158 159 if (getSynchronizedState() == CANCELLED) throw new CancellationException (); 160 if (exception != null) throw new ExecutionException (exception); 161 return result; 162 } 163 164 Object innerGet(long nanosTimeout) throws InterruptedException , ExecutionException , TimeoutException { 165 lock.lock(); 166 try { 167 long startTime = System.nanoTime(); 168 while ((tryAcquireShared() < 0) && (nanosTimeout > 0)) { 169 ran.await(nanosTimeout, TimeUnit.NANOSECONDS); 170 long endTime = System.nanoTime(); 171 nanosTimeout -= (endTime - startTime); 172 } 173 if (tryAcquireShared() < 0) { throw new TimeoutException (); } 174 } finally { 175 lock.unlock(); 176 } 177 178 if (getSynchronizedState() == CANCELLED) throw new CancellationException (); 179 if (exception != null) throw new ExecutionException (exception); 180 return result; 181 } 182 183 void innerSet(Object v) { 184 lock.lock(); 185 try { 186 managedInnerSet(v); 187 } finally { 188 lock.unlock(); 189 } 190 } 191 192 private void managedInnerSet(Object v) { 193 int s = state; 194 if (ranOrCancelled(s)) return; 195 setState(RAN); 196 result = v; 197 managedTryReleaseShared(); 198 done(); 199 } 200 201 void innerSetException(Throwable t) { 202 lock.lock(); 203 try { 204 int s = state; 205 if (ranOrCancelled(s)) return; 206 setState(RAN); 207 exception = t; 208 result = null; 209 managedTryReleaseShared(); 210 done(); 211 } finally { 212 lock.unlock(); 213 } 214 } 215 216 private void managedInnerCancel() { 217 Thread r = null; 218 lock.lock(); 219 try { 220 r = runner; 221 } finally { 222 lock.unlock(); 223 } 224 if (r != null) { 225 r.interrupt(); 226 } 227 } 228 229 boolean innerCancel(boolean mayInterruptIfRunning) { 230 lock.lock(); 231 try { 232 int s = state; 233 if (ranOrCancelled(s)) return false; 234 setState(CANCELLED); 235 } finally { 236 lock.unlock(); 237 } 238 lock.lock(); 239 try { 240 if (mayInterruptIfRunning) { 241 managedInnerCancel(); 242 } 243 tryReleaseShared(0); 244 } finally { 245 lock.unlock(); 246 } 247 done(); 248 return true; 249 } 250 251 void innerRun() { 252 if (!compareAndSetState(0, RUNNING)) return; 253 254 try { 255 boolean isRunning = false; 256 lock.lock(); 257 try { 258 isRunning = state == RUNNING; 259 if (isRunning) { 260 runner = Thread.currentThread(); 261 proxyRunner = runner.toString(); 262 } 263 } finally { 264 lock.unlock(); 265 } 266 if (isRunning) { 267 Object o = callable.call(); 268 lock.lock(); 269 try { 270 managedInnerSet(o); 271 } finally { 272 lock.unlock(); 273 } 274 } else { 275 lock.lock(); 276 try { 277 managedTryReleaseShared(); 278 } finally { 279 lock.unlock(); 280 } 281 } 282 } catch (Throwable ex) { 283 innerSetException(ex); 284 } 285 } 286 287 boolean innerRunAndReset() { 288 if (!compareAndSetState(0, RUNNING)) return false; 289 try { 290 runner = Thread.currentThread(); 291 callable.call(); 292 runner = null; 293 return compareAndSetState(RUNNING, 0); 294 } catch (Throwable ex) { 295 innerSetException(ex); 296 return false; 297 } 298 } 299 } 300 } 301 | Popular Tags |