|                                                                                                              1
 18  package org.apache.tools.ant.taskdefs;
 19
 20  import java.lang.reflect.Method
  ; 21  import java.util.Enumeration
  ; 22  import java.util.Vector
  ; 23  import java.util.List
  ; 24  import java.util.ArrayList
  ; 25  import org.apache.tools.ant.BuildException;
 26  import org.apache.tools.ant.Location;
 27  import org.apache.tools.ant.Task;
 28  import org.apache.tools.ant.TaskContainer;
 29  import org.apache.tools.ant.util.StringUtils;
 30
 31
 49  public class Parallel extends Task
 50                        implements TaskContainer {
 51
 52
 53      public static class TaskList implements TaskContainer {
 54
 55          private List
  tasks = new ArrayList  (); 56
 57
 63          public void addTask(Task nestedTask) {
 64              tasks.add(nestedTask);
 65          }
 66      }
 67
 68
 69      private Vector
  nestedTasks = new Vector  (); 70
 71
 72      private final Object
  semaphore = new Object  (); 73
 74
 75      private int numThreads = 0;
 76
 77
 78      private int numThreadsPerProcessor = 0;
 79
 80
 81      private long timeout;
 82
 83
 84      private volatile boolean stillRunning;
 85
 86
 87      private boolean timedOut;
 88
 89
 93      private boolean failOnAny;
 94
 95
 96      private TaskList daemonTasks;
 97
 98
 99      private StringBuffer
  exceptionMessage; 100
 101
 102     private int numExceptions = 0;
 103
 104
 105     private Throwable
  firstException; 106
 107
 108     private Location firstLocation;
 109
 110
 114     public void addDaemons(TaskList daemonTasks) {
 115         if (this.daemonTasks != null) {
 116             throw new BuildException("Only one daemon group is supported");
 117         }
 118         this.daemonTasks = daemonTasks;
 119     }
 120
 121
 127     public void setPollInterval(int pollInterval) {
 128     }
 129
 130
 138     public void setFailOnAny(boolean failOnAny) {
 139         this.failOnAny = failOnAny;
 140     }
 141
 142
 146     public void addTask(Task nestedTask) {
 147         nestedTasks.addElement(nestedTask);
 148     }
 149
 150
 161     public void setThreadsPerProcessor(int numThreadsPerProcessor) {
 162         this.numThreadsPerProcessor = numThreadsPerProcessor;
 163     }
 164
 165
 176     public void setThreadCount(int numThreads) {
 177         this.numThreads = numThreads;
 178     }
 179
 180
 189     public void setTimeout(long timeout) {
 190         this.timeout = timeout;
 191     }
 192
 193
 194
 195
 200     public void execute() throws BuildException {
 201         updateThreadCounts();
 202         if (numThreads == 0) {
 203             numThreads = nestedTasks.size();
 204         }
 205         spinThreads();
 206     }
 207
 208
 211     private void updateThreadCounts() {
 212         if (numThreadsPerProcessor != 0) {
 213             int numProcessors = getNumProcessors();
 214             if (numProcessors != 0) {
 215                 numThreads = numProcessors * numThreadsPerProcessor;
 216             }
 217         }
 218     }
 219
 220     private void processExceptions(TaskRunnable[] runnables) {
 221         if (runnables == null) {
 222             return;
 223         }
 224         for (int i = 0; i < runnables.length; ++i) {
 225             Throwable
  t = runnables[i].getException(); 226             if (t != null) {
 227                 numExceptions++;
 228                 if (firstException == null) {
 229                     firstException = t;
 230                 }
 231                 if (t instanceof BuildException
 232                     && firstLocation == Location.UNKNOWN_LOCATION) {
 233                     firstLocation = ((BuildException) t).getLocation();
 234                 }
 235                 exceptionMessage.append(StringUtils.LINE_SEP);
 236                 exceptionMessage.append(t.getMessage());
 237             }
 238         }
 239     }
 240
 241
 246     private void spinThreads() throws BuildException {
 247         final int numTasks = nestedTasks.size();
 248         TaskRunnable[] runnables = new TaskRunnable[numTasks];
 249         stillRunning = true;
 250         timedOut = false;
 251
 252         int threadNumber = 0;
 253         for (Enumeration
  e = nestedTasks.elements(); e.hasMoreElements(); 254              threadNumber++) {
 255             Task nestedTask = (Task) e.nextElement();
 256             runnables[threadNumber]
 257                 = new TaskRunnable(nestedTask);
 258         }
 259
 260         final int maxRunning = numTasks < numThreads ? numTasks : numThreads;
 261         TaskRunnable[] running = new TaskRunnable[maxRunning];
 262
 263         threadNumber = 0;
 264         ThreadGroup
  group = new ThreadGroup  ("parallel"); 265
 266         TaskRunnable[] daemons = null;
 267         if (daemonTasks != null && daemonTasks.tasks.size() != 0) {
 268             daemons = new TaskRunnable[daemonTasks.tasks.size()];
 269         }
 270
 271         synchronized (semaphore) {
 272                                                                     }
 278
 279         synchronized (semaphore) {
 280                         if (daemons != null) {
 282                 for (int i = 0; i < daemons.length; ++i) {
 283                     daemons[i] = new TaskRunnable((Task) daemonTasks.tasks.get(i));
 284                     Thread
  daemonThread =  new Thread  (group, daemons[i]); 285                     daemonThread.setDaemon(true);
 286                     daemonThread.start();
 287                 }
 288             }
 289
 290                                     for (int i = 0; i < maxRunning; ++i) {
 293                 running[i] = runnables[threadNumber++];
 294                 Thread
  thread =  new Thread  (group, running[i]); 295                 thread.start();
 296             }
 297
 298             if (timeout != 0) {
 299                                 Thread
  timeoutThread = new Thread  () { 301                     public synchronized void run() {
 302                         try {
 303                             wait(timeout);
 304                             synchronized (semaphore) {
 305                                 stillRunning = false;
 306                                 timedOut = true;
 307                                 semaphore.notifyAll();
 308                             }
 309                         } catch (InterruptedException
  e) { 310                                                     }
 312                     }
 313                 };
 314                 timeoutThread.start();
 315             }
 316
 317                         outer:
 319             while (threadNumber < numTasks && stillRunning) {
 320                 for (int i = 0; i < maxRunning; i++) {
 321                     if (running[i] == null || running[i].isFinished()) {
 322                         running[i] = runnables[threadNumber++];
 323                         Thread
  thread =  new Thread  (group, running[i]); 324                         thread.start();
 325                                                                         continue outer;
 328                     }
 329                 }
 330
 331                                                 try {
 334                     semaphore.wait();
 335                 } catch (InterruptedException
  ie) { 336                                                                             }
 340             }
 341
 342                         outer2:
 344             while (stillRunning) {
 345                 for (int i = 0; i < maxRunning; ++i) {
 346                     if (running[i] != null && !running[i].isFinished()) {
 347                                                                         try {
 350                             semaphore.wait();
 351                         } catch (InterruptedException
  ie) { 352                                                     }
 354                         continue outer2;
 355                     }
 356                 }
 357                 stillRunning = false;
 358             }
 359         }
 360
 361         if (timedOut) {
 362             throw new BuildException("Parallel execution timed out");
 363         }
 364
 365                 exceptionMessage = new StringBuffer
  (); 367         numExceptions = 0;
 368         firstException = null;
 369         firstLocation = Location.UNKNOWN_LOCATION;
 370         processExceptions(daemons);
 371         processExceptions(runnables);
 372
 373         if (numExceptions == 1) {
 374             if (firstException instanceof BuildException) {
 375                 throw (BuildException) firstException;
 376             } else {
 377                 throw new BuildException(firstException);
 378             }
 379         } else if (numExceptions > 1) {
 380             throw new BuildException(exceptionMessage.toString(),
 381                                      firstLocation);
 382         }
 383     }
 384
 385
 390     private int getNumProcessors() {
 391         try {
 392             Class
  [] paramTypes = {}; 393             Method
  availableProcessors = 394                 Runtime
  .class.getMethod("availableProcessors", paramTypes); 395
 396             Object
  [] args = {}; 397             Integer
  ret = (Integer  ) availableProcessors.invoke(Runtime.getRuntime(), args); 398             return ret.intValue();
 399         } catch (Exception
  e) { 400                         return 0;
 402         }
 403     }
 404
 405
 408     private class TaskRunnable implements Runnable
  { 409         private Throwable
  exception; 410         private Task task;
 411         private boolean finished;
 412
 413
 418         TaskRunnable(Task task) {
 419             this.task = task;
 420         }
 421
 422
 426         public void run() {
 427             try {
 428                 task.perform();
 429             } catch (Throwable
  t) { 430                 exception = t;
 431                 if (failOnAny) {
 432                     stillRunning = false;
 433                 }
 434             } finally {
 435                 synchronized (semaphore) {
 436                     finished = true;
 437                     semaphore.notifyAll();
 438                 }
 439             }
 440         }
 441
 442
 446         public Throwable
  getException() { 447             return exception;
 448         }
 449
 450
 454         boolean isFinished() {
 455             return finished;
 456         }
 457     }
 458
 459 }
 460
                                                                                                                                                                                                             |                                                                       
 
 
 
 
 
                                                                                   Popular Tags                                                                                                                                                                                              |