1 7 8 package java.util.concurrent; 9 10 import java.util.*; 11 12 26 public abstract class AbstractExecutorService implements ExecutorService { 27 28 public Future <?> submit(Runnable task) { 29 if (task == null) throw new NullPointerException (); 30 FutureTask <Object > ftask = new FutureTask <Object >(task, null); 31 execute(ftask); 32 return ftask; 33 } 34 35 public <T> Future <T> submit(Runnable task, T result) { 36 if (task == null) throw new NullPointerException (); 37 FutureTask <T> ftask = new FutureTask <T>(task, result); 38 execute(ftask); 39 return ftask; 40 } 41 42 public <T> Future <T> submit(Callable <T> task) { 43 if (task == null) throw new NullPointerException (); 44 FutureTask <T> ftask = new FutureTask <T>(task); 45 execute(ftask); 46 return ftask; 47 } 48 49 52 private <T> T doInvokeAny(Collection<Callable <T>> tasks, 53 boolean timed, long nanos) 54 throws InterruptedException , ExecutionException , TimeoutException { 55 if (tasks == null) 56 throw new NullPointerException (); 57 int ntasks = tasks.size(); 58 if (ntasks == 0) 59 throw new IllegalArgumentException (); 60 List<Future <T>> futures= new ArrayList<Future <T>>(ntasks); 61 ExecutorCompletionService <T> ecs = 62 new ExecutorCompletionService <T>(this); 63 64 70 try { 71 ExecutionException ee = null; 74 long lastTime = (timed)? System.nanoTime() : 0; 75 Iterator<Callable <T>> it = tasks.iterator(); 76 77 futures.add(ecs.submit(it.next())); 79 --ntasks; 80 int active = 1; 81 82 for (;;) { 83 Future <T> f = ecs.poll(); 84 if (f == null) { 85 if (ntasks > 0) { 86 --ntasks; 87 futures.add(ecs.submit(it.next())); 88 ++active; 89 } 90 else if (active == 0) 91 break; 92 else if (timed) { 93 f = ecs.poll(nanos, TimeUnit.NANOSECONDS); 94 if (f == null) 95 throw new TimeoutException (); 96 long now = System.nanoTime(); 97 nanos -= now - lastTime; 98 lastTime = now; 99 } 100 else 101 f = ecs.take(); 102 } 103 if (f != null) { 104 --active; 105 try { 106 return f.get(); 107 } catch(InterruptedException ie) { 108 throw ie; 109 } catch(ExecutionException eex) { 110 ee = eex; 111 } catch(RuntimeException rex) { 112 ee = new ExecutionException (rex); 113 } 114 } 115 } 116 117 if (ee == null) 118 ee = new ExecutionException (); 119 throw ee; 120 121 } finally { 122 for (Future <T> f : futures) 123 f.cancel(true); 124 } 125 } 126 127 public <T> T invokeAny(Collection<Callable <T>> tasks) 128 throws InterruptedException , ExecutionException { 129 try { 130 return doInvokeAny(tasks, false, 0); 131 } catch (TimeoutException cannotHappen) { 132 assert false; 133 return null; 134 } 135 } 136 137 public <T> T invokeAny(Collection<Callable <T>> tasks, 138 long timeout, TimeUnit unit) 139 throws InterruptedException , ExecutionException , TimeoutException { 140 return doInvokeAny(tasks, true, unit.toNanos(timeout)); 141 } 142 143 public <T> List<Future <T>> invokeAll(Collection<Callable <T>> tasks) 144 throws InterruptedException { 145 if (tasks == null) 146 throw new NullPointerException (); 147 List<Future <T>> futures = new ArrayList<Future <T>>(tasks.size()); 148 boolean done = false; 149 try { 150 for (Callable <T> t : tasks) { 151 FutureTask <T> f = new FutureTask <T>(t); 152 futures.add(f); 153 execute(f); 154 } 155 for (Future <T> f : futures) { 156 if (!f.isDone()) { 157 try { 158 f.get(); 159 } catch(CancellationException ignore) { 160 } catch(ExecutionException ignore) { 161 } 162 } 163 } 164 done = true; 165 return futures; 166 } finally { 167 if (!done) 168 for (Future <T> f : futures) 169 f.cancel(true); 170 } 171 } 172 173 public <T> List<Future <T>> invokeAll(Collection<Callable <T>> tasks, 174 long timeout, TimeUnit unit) 175 throws InterruptedException { 176 if (tasks == null || unit == null) 177 throw new NullPointerException (); 178 long nanos = unit.toNanos(timeout); 179 List<Future <T>> futures = new ArrayList<Future <T>>(tasks.size()); 180 boolean done = false; 181 try { 182 for (Callable <T> t : tasks) 183 futures.add(new FutureTask <T>(t)); 184 185 long lastTime = System.nanoTime(); 186 187 Iterator<Future <T>> it = futures.iterator(); 190 while (it.hasNext()) { 191 execute((Runnable )(it.next())); 192 long now = System.nanoTime(); 193 nanos -= now - lastTime; 194 lastTime = now; 195 if (nanos <= 0) 196 return futures; 197 } 198 199 for (Future <T> f : futures) { 200 if (!f.isDone()) { 201 if (nanos <= 0) 202 return futures; 203 try { 204 f.get(nanos, TimeUnit.NANOSECONDS); 205 } catch(CancellationException ignore) { 206 } catch(ExecutionException ignore) { 207 } catch(TimeoutException toe) { 208 return futures; 209 } 210 long now = System.nanoTime(); 211 nanos -= now - lastTime; 212 lastTime = now; 213 } 214 } 215 done = true; 216 return futures; 217 } finally { 218 if (!done) 219 for (Future <T> f : futures) 220 f.cancel(true); 221 } 222 } 223 224 } 225 | Popular Tags |