package org.apache.tools.ant.taskdefs;

import java.lang.reflect.Method;
import java.util.Enumeration;
import java.util.Vector;
import java.util.List;
import java.util.ArrayList;
import org.apache.tools.ant.BuildException;
import org.apache.tools.ant.Location;
import org.apache.tools.ant.Task;
import org.apache.tools.ant.TaskContainer;
import org.apache.tools.ant.util.StringUtils;

/**
 * Task container that runs each of its nested tasks in a separate thread.
 *
 * <p>At most {@code numThreads} nested tasks run at the same time; further
 * tasks are started as running ones finish. An optional group of "daemon"
 * tasks is started in daemon threads and is never waited for. Exceptions
 * thrown by the tasks are collected once waiting ends and reported as a
 * single {@link BuildException}.</p>
 */
public class Parallel extends Task
                      implements TaskContainer {

    /** Simple container for the tasks that are to be run in daemon threads. */
    public static class TaskList implements TaskContainer {
        /** The tasks collected by this container, in insertion order. */
        private List tasks = new ArrayList();

        /**
         * Adds a task to this container.
         *
         * @param nestedTask the task to add
         */
        public void addTask(Task nestedTask) {
            tasks.add(nestedTask);
        }
    }

    /** The nested tasks to be executed in parallel. */
    private Vector nestedTasks = new Vector();

    /** Monitor used to signal task completion and timeout to the waiting thread. */
    private final Object semaphore = new Object();

    /** Maximum number of tasks run concurrently; 0 means "one per nested task". */
    private int numThreads = 0;

    /** Threads per detected processor; 0 means "not set". */
    private int numThreadsPerProcessor = 0;

    /** Timeout in milliseconds; 0 means "no timeout". */
    private long timeout;

    /** True while the executing thread should keep starting and awaiting tasks. */
    private volatile boolean stillRunning;

    /** Set by the timeout watchdog when the timeout elapsed before completion. */
    private boolean timedOut;

    /** Whether the first failing task ends the wait for the remaining tasks. */
    private boolean failOnAny;

    /** The optional group of daemon tasks. */
    private TaskList daemonTasks;

    /** Accumulates the messages of all exceptions thrown by the tasks. */
    private StringBuffer exceptionMessage;

    /** Number of tasks that threw an exception. */
    private int numExceptions = 0;

    /** The first exception encountered while collecting results. */
    private Throwable firstException;

    /** The first known location taken from a collected BuildException. */
    private Location firstLocation;

    /**
     * Sets the group of tasks that are run in daemon threads.
     *
     * @param daemonTasks the daemon task group
     * @throws BuildException if a daemon group has already been set
     */
    public void addDaemons(TaskList daemonTasks) {
        if (this.daemonTasks != null) {
            throw new BuildException("Only one daemon group is supported");
        }
        this.daemonTasks = daemonTasks;
    }

    /**
     * Accepts a poll interval. The value is ignored; this method has no effect.
     *
     * @param pollInterval ignored
     */
    public void setPollInterval(int pollInterval) {
    }

    /**
     * Controls whether a failure in any nested task ends the wait for the
     * others. Tasks that are already running are not stopped by this class.
     *
     * @param failOnAny true to stop waiting as soon as one task throws
     */
    public void setFailOnAny(boolean failOnAny) {
        this.failOnAny = failOnAny;
    }

    /**
     * Adds a nested task to be executed in its own thread.
     *
     * @param nestedTask the task to add
     */
    public void addTask(Task nestedTask) {
        nestedTasks.addElement(nestedTask);
    }

    /**
     * Sets the number of threads per processor. When this value is non-zero
     * and the processor count can be determined, the product of the two
     * replaces any value given to {@link #setThreadCount(int)}.
     *
     * @param numThreadsPerProcessor threads per processor; 0 leaves the
     *        thread count untouched
     */
    public void setThreadsPerProcessor(int numThreadsPerProcessor) {
        this.numThreadsPerProcessor = numThreadsPerProcessor;
    }

    /**
     * Sets the maximum number of nested tasks that run concurrently.
     *
     * @param numThreads the maximum number of concurrent tasks; 0 means one
     *        thread per nested task
     */
    public void setThreadCount(int numThreads) {
        this.numThreads = numThreads;
    }

    /**
     * Sets the timeout for the parallel execution.
     *
     * @param timeout timeout in milliseconds; 0 disables the timeout
     */
    public void setTimeout(long timeout) {
        this.timeout = timeout;
    }

    /**
     * Runs all nested tasks in parallel and waits until they have finished,
     * one of them has failed (with fail-on-any set) or the timeout elapsed.
     *
     * @throws BuildException if the resulting thread count is negative, the
     *         execution timed out, or any task threw an exception
     */
    public void execute() throws BuildException {
        updateThreadCounts();
        if (numThreads < 0) {
            // A negative count would otherwise surface as a
            // NegativeArraySizeException when the slot array is allocated.
            throw new BuildException("Thread count must not be negative: "
                + numThreads);
        }
        if (numThreads == 0) {
            numThreads = nestedTasks.size();
        }
        spinThreads();
    }

    /**
     * Derives the thread count from the per-processor setting when that
     * setting is non-zero and the processor count is known.
     */
    private void updateThreadCounts() {
        if (numThreadsPerProcessor != 0) {
            int numProcessors = getNumProcessors();
            if (numProcessors != 0) {
                numThreads = numProcessors * numThreadsPerProcessor;
            }
        }
    }

    /**
     * Folds the exceptions recorded by the given runnables into the
     * exception bookkeeping fields of this task.
     *
     * @param runnables the runnables to inspect; may be null, in which case
     *        nothing happens
     */
    private void processExceptions(TaskRunnable[] runnables) {
        if (runnables == null) {
            return;
        }
        for (int i = 0; i < runnables.length; ++i) {
            Throwable t = runnables[i].getException();
            if (t != null) {
                numExceptions++;
                if (firstException == null) {
                    firstException = t;
                }
                if (t instanceof BuildException
                    && firstLocation == Location.UNKNOWN_LOCATION) {
                    firstLocation = ((BuildException) t).getLocation();
                }
                exceptionMessage.append(StringUtils.LINE_SEP);
                // Exceptions without a message would otherwise show up as
                // the literal text "null" in the aggregated message.
                String message = t.getMessage();
                exceptionMessage.append(message != null ? message : t.toString());
            }
        }
    }

    /**
     * Creates (but does not start) the watchdog thread that ends the wait
     * once the timeout has elapsed. Interrupting the thread cancels it.
     *
     * @return the new, unstarted watchdog thread
     */
    private Thread createTimeoutThread() {
        return new Thread() {
            public void run() {
                try {
                    // sleep() rather than wait(): it cannot wake up
                    // spuriously and needs no monitor on the thread object.
                    Thread.sleep(timeout);
                } catch (InterruptedException e) {
                    // cancelled because the execution ended first
                    return;
                }
                synchronized (semaphore) {
                    if (isInterrupted()) {
                        // cancelled while blocked on the semaphore; the
                        // execution has already ended, so do not flag a
                        // timeout
                        return;
                    }
                    stillRunning = false;
                    timedOut = true;
                    semaphore.notifyAll();
                }
            }
        };
    }

    /**
     * Starts the daemon tasks and the nested tasks, keeps at most
     * {@code numThreads} nested tasks running at once, and waits for
     * completion, failure (with fail-on-any) or timeout. Afterwards the
     * exceptions recorded by the tasks are reported.
     *
     * @throws BuildException if the execution timed out or any task threw
     */
    private void spinThreads() throws BuildException {
        final int numTasks = nestedTasks.size();
        TaskRunnable[] runnables = new TaskRunnable[numTasks];
        stillRunning = true;
        timedOut = false;

        int threadNumber = 0;
        for (Enumeration e = nestedTasks.elements(); e.hasMoreElements();
             threadNumber++) {
            Task nestedTask = (Task) e.nextElement();
            runnables[threadNumber]
                = new TaskRunnable(nestedTask);
        }

        final int maxRunning = numTasks < numThreads ? numTasks : numThreads;
        TaskRunnable[] running = new TaskRunnable[maxRunning];

        threadNumber = 0;
        ThreadGroup group = new ThreadGroup("parallel");

        TaskRunnable[] daemons = null;
        if (daemonTasks != null && daemonTasks.tasks.size() != 0) {
            daemons = new TaskRunnable[daemonTasks.tasks.size()];
        }

        synchronized (semaphore) {
            // NOTE(review): intentionally empty; presumably acts as a memory
            // barrier so the state set up above is visible to the threads
            // started below - confirm before removing.
        }

        // Set when this thread is interrupted while waiting; the interrupt
        // status is restored once waiting is over.
        boolean interrupted = false;

        synchronized (semaphore) {
            Thread timeoutThread = null;
            try {
                // start any daemon threads
                if (daemons != null) {
                    for (int i = 0; i < daemons.length; ++i) {
                        daemons[i] = new TaskRunnable((Task) daemonTasks.tasks.get(i));
                        Thread daemonThread = new Thread(group, daemons[i]);
                        daemonThread.setDaemon(true);
                        daemonThread.start();
                    }
                }

                // fill every slot with an initial task
                for (int i = 0; i < maxRunning; ++i) {
                    running[i] = runnables[threadNumber++];
                    Thread thread = new Thread(group, running[i]);
                    thread.start();
                }

                if (timeout != 0) {
                    timeoutThread = createTimeoutThread();
                    timeoutThread.start();
                }

                // hand the remaining tasks to slots as they become free
                outer:
                while (threadNumber < numTasks && stillRunning) {
                    for (int i = 0; i < maxRunning; i++) {
                        if (running[i] == null || running[i].isFinished()) {
                            running[i] = runnables[threadNumber++];
                            Thread thread = new Thread(group, running[i]);
                            thread.start();
                            // rescan from the first slot
                            continue outer;
                        }
                    }

                    // no free slot: wait until a task (or the watchdog)
                    // signals the semaphore
                    try {
                        semaphore.wait();
                    } catch (InterruptedException ie) {
                        // Keep waiting as before, but remember the
                        // interrupt so it is not lost. Re-asserting it here
                        // would make every following wait() fail at once.
                        interrupted = true;
                    }
                }

                // wait until every started task has finished
                outer2:
                while (stillRunning) {
                    for (int i = 0; i < maxRunning; ++i) {
                        if (running[i] != null && !running[i].isFinished()) {
                            try {
                                semaphore.wait();
                            } catch (InterruptedException ie) {
                                interrupted = true;
                            }
                            continue outer2;
                        }
                    }
                    stillRunning = false;
                }
            } finally {
                // Cancel the watchdog while still holding the semaphore so
                // that it can neither linger after this execution nor flag
                // a timeout once waiting has ended.
                if (timeoutThread != null) {
                    timeoutThread.interrupt();
                }
            }
        }

        if (interrupted) {
            Thread.currentThread().interrupt();
        }

        if (timedOut) {
            throw new BuildException("Parallel execution timed out");
        }

        // now did any of the threads throw an exception
        exceptionMessage = new StringBuffer();
        numExceptions = 0;
        firstException = null;
        firstLocation = Location.UNKNOWN_LOCATION;
        processExceptions(daemons);
        processExceptions(runnables);

        if (numExceptions == 1) {
            if (firstException instanceof BuildException) {
                throw (BuildException) firstException;
            } else {
                throw new BuildException(firstException);
            }
        } else if (numExceptions > 1) {
            throw new BuildException(exceptionMessage.toString(),
                                     firstLocation);
        }
    }

    /**
     * Determines the number of processors by reflectively invoking
     * {@code Runtime.availableProcessors()}.
     * NOTE(review): the reflective lookup presumably exists so the class
     * still loads on runtimes lacking that method - confirm.
     *
     * @return the number of processors, or 0 if the lookup fails for any
     *         reason
     */
    private int getNumProcessors() {
        try {
            Class[] paramTypes = {};
            Method availableProcessors =
                Runtime.class.getMethod("availableProcessors", paramTypes);

            Object[] args = {};
            Integer ret = (Integer) availableProcessors.invoke(Runtime.getRuntime(), args);
            return ret.intValue();
        } catch (Exception e) {
            // return a bogus number
            return 0;
        }
    }

    /** Runs a single task and records its outcome. */
    private class TaskRunnable implements Runnable {
        /** The throwable raised by the task, or null if none was raised. */
        private Throwable exception;
        /** The task to perform. */
        private Task task;
        /** Set (under the semaphore) once the task has ended. */
        private boolean finished;

        /**
         * Creates a runnable for the given task.
         *
         * @param task the task to perform when run
         */
        TaskRunnable(Task task) {
            this.task = task;
        }

        /**
         * Performs the task, records anything it throws, and signals the
         * semaphore when done. With fail-on-any set, a failure also clears
         * the still-running flag of the enclosing task.
         */
        public void run() {
            try {
                task.perform();
            } catch (Throwable t) {
                exception = t;
                if (failOnAny) {
                    stillRunning = false;
                }
            } finally {
                synchronized (semaphore) {
                    finished = true;
                    semaphore.notifyAll();
                }
            }
        }

        /**
         * Returns what the task threw.
         *
         * @return the recorded throwable, or null if the task has not thrown
         */
        public Throwable getException() {
            return exception;
        }

        /**
         * Reports whether the task has ended.
         *
         * @return true once {@link #run()} has completed
         */
        boolean isFinished() {
            return finished;
        }
    }

}