package com.Ostermiller.util;

import java.util.*;

/**
 * Runs submitted {@link Runnable} jobs on their own threads, optionally
 * capping how many of those threads execute at the same time.
 * <p>
 * Each call to one of the <code>run</code> methods wraps the job in a new
 * {@link Thread} and places it on a FIFO queue. A queued thread is started
 * as soon as the number of running threads is below the configured limit.
 * When a job finishes, the thread that ran it starts the next queued job.
 * <p>
 * A {@link RuntimeException} or {@link Error} thrown by a job is captured
 * rather than lost with the worker thread. Captured errors are rethrown
 * (one per call, oldest first) from most public methods of this class;
 * captured runtime exceptions are rethrown from {@link #join()}.
 * <p>
 * All mutable state of an instance is guarded by the monitor of the
 * <code>runningThreads</code> set.
 */
public class Parallelizer
{
    /**
     * Thread limit value meaning that there is no cap on the number of
     * jobs running concurrently.
     */
    public static final int INFINITE_THREAD_LIMIT = 0;

    /**
     * Counter used to generate default thread names; shared by all
     * instances and guarded by the class monitor.
     */
    private static int threadNameCount = 0;

    /**
     * Maximum number of jobs allowed to run at once, or
     * {@link #INFINITE_THREAD_LIMIT} for no cap.
     */
    private final int concurrentThreadLimit;

    /**
     * Threads that have been started and have not yet finished their job.
     * This object is also the lock and the wait/notify condition for every
     * other piece of state in this class, so it must never be reassigned.
     */
    private final HashSet<Thread> runningThreads = new HashSet<Thread>();

    /**
     * Threads that have been created but not yet started, oldest first.
     */
    private final LinkedList<Thread> toRunQueue = new LinkedList<Thread>();

    /**
     * Runtime exceptions thrown by jobs and not yet rethrown to a caller.
     */
    private final LinkedList<RuntimeException> exceptionList = new LinkedList<RuntimeException>();

    /**
     * Errors thrown by jobs and not yet rethrown to a caller.
     */
    private final LinkedList<Error> errorList = new LinkedList<Error>();

    /**
     * Creates a Parallelizer with no limit on the number of concurrently
     * running jobs.
     */
    public Parallelizer(){
        this(INFINITE_THREAD_LIMIT);
    }

    /**
     * Creates a Parallelizer that runs at most the given number of jobs at
     * the same time.
     *
     * @param concurrentThreadLimit maximum number of concurrently running
     *        jobs, or {@link #INFINITE_THREAD_LIMIT} for no cap.
     * @throws IllegalArgumentException if the limit is negative.
     */
    public Parallelizer(int concurrentThreadLimit){
        if (concurrentThreadLimit < INFINITE_THREAD_LIMIT){
            throw new IllegalArgumentException("Bad concurrent thread limit: " + concurrentThreadLimit);
        }
        this.concurrentThreadLimit = concurrentThreadLimit;
    }

    /**
     * Queues a job using no explicit thread group, a generated thread name
     * and a stack size of zero.
     *
     * @param job the job to run.
     * @throws Error if a previously run job threw an Error that has not yet
     *         been reported.
     */
    public void run(Runnable job){
        run(null, job, null, 0);
    }

    /**
     * Queues a job using no explicit thread group and a stack size of zero.
     *
     * @param job the job to run.
     * @param threadName name for the job's thread, or null to generate one.
     * @throws Error if a previously run job threw an Error that has not yet
     *         been reported.
     */
    public void run(Runnable job, String threadName){
        run(null, job, threadName, 0);
    }

    /**
     * Queues a job using a generated thread name and a stack size of zero.
     *
     * @param threadGroup group passed to the {@link Thread} constructor;
     *        may be null.
     * @param job the job to run.
     * @throws Error if a previously run job threw an Error that has not yet
     *         been reported.
     */
    public void run(ThreadGroup threadGroup, Runnable job){
        run(threadGroup, job, null, 0);
    }

    /**
     * Queues a job using a stack size of zero.
     *
     * @param threadGroup group passed to the {@link Thread} constructor;
     *        may be null.
     * @param job the job to run.
     * @param threadName name for the job's thread, or null to generate one.
     * @throws Error if a previously run job threw an Error that has not yet
     *         been reported.
     */
    public void run(ThreadGroup threadGroup, Runnable job, String threadName){
        run(threadGroup, job, threadName, 0);
    }

    /**
     * Queues a job on a new thread and starts a queued job right away if
     * the concurrency limit allows it.
     * <p>
     * A RuntimeException or Error thrown by the job is stored so that it
     * can be rethrown later to a caller of this class.
     *
     * @param threadGroup group passed to the {@link Thread} constructor;
     *        may be null.
     * @param job the job to run.
     * @param threadName name for the job's thread, or null to generate a
     *        name of the form <code>Parallelizer-N</code>.
     * @param stackSize stack size passed to the {@link Thread} constructor.
     * @throws Error if a previously run job threw an Error that has not yet
     *         been reported.
     */
    public void run(ThreadGroup threadGroup, final Runnable job, String threadName, long stackSize){
        throwFirstError();

        Runnable jobWrapper = new Runnable(){
            public void run(){
                try {
                    job.run();
                } catch (RuntimeException runtimeException){
                    synchronized(runningThreads){
                        exceptionList.add(runtimeException);
                    }
                } catch (Error error){
                    synchronized(runningThreads){
                        errorList.add(error);
                    }
                } finally {
                    synchronized(runningThreads){
                        // This thread is finished: free its slot and wake
                        // anybody blocked in join().
                        runningThreads.remove(Thread.currentThread());
                        runningThreads.notifyAll();
                    }
                    // The freed slot may allow a queued job to start.
                    startAJobIfNeeded();
                }
            }
        };

        threadName = getNextThreadName(threadName);

        synchronized(runningThreads){
            toRunQueue.add(new Thread(threadGroup, jobWrapper, threadName, stackSize));
        }

        startAJobIfNeeded();
    }

    /**
     * Returns the name to use for a new thread.
     * <p>
     * Synchronized because the counter is static and shared by every
     * Parallelizer; an unsynchronized increment could hand the same name to
     * two threads.
     *
     * @param threadName the requested name, or null.
     * @return the requested name if it is not null, otherwise a generated
     *         name of the form <code>Parallelizer-N</code>.
     */
    private static synchronized String getNextThreadName(String threadName){
        if (threadName != null){
            return threadName;
        }
        return "Parallelizer-" + (threadNameCount++);
    }

    /**
     * Removes and throws the oldest stored RuntimeException, if any.
     */
    private void throwFirstException(){
        synchronized(runningThreads){
            if (!exceptionList.isEmpty()){
                throw exceptionList.removeFirst();
            }
        }
    }

    /**
     * Removes and throws the oldest stored Error, if any.
     *
     * @throws Error the oldest Error thrown by a job and not yet reported.
     */
    private void throwFirstError() throws Error {
        synchronized(runningThreads){
            if (!errorList.isEmpty()){
                throw errorList.removeFirst();
            }
        }
    }

    /**
     * Starts the oldest queued thread, unless the queue is empty or the
     * concurrency limit has already been reached.
     */
    private void startAJobIfNeeded(){
        synchronized(runningThreads){
            if (concurrentThreadLimit != INFINITE_THREAD_LIMIT
                    && runningThreads.size() >= concurrentThreadLimit){
                return;
            }
            if (toRunQueue.isEmpty()){
                return;
            }
            Thread thread = toRunQueue.removeFirst();
            // Register before starting so the thread is counted as running
            // from the moment it exists.
            runningThreads.add(thread);
            thread.start();
        }
    }

    /**
     * Reports whether every submitted job has finished.
     *
     * @return true if no job is queued and no job is running.
     * @throws Error if a job threw an Error that has not yet been reported.
     */
    public boolean done(){
        throwFirstError();
        synchronized(runningThreads){
            return toRunQueue.isEmpty() && runningThreads.isEmpty();
        }
    }

    /**
     * Interrupts every thread that is currently running a job. Jobs that
     * are still queued are not affected.
     *
     * @throws Error if a job threw an Error that has not yet been reported.
     */
    public void interrupt(){
        throwFirstError();
        synchronized(runningThreads){
            for (Thread thread : runningThreads){
                thread.interrupt();
                throwFirstError();
            }
        }
    }

    /**
     * Prints the stack trace of every thread that is currently running a
     * job to the standard error stream.
     * <p>
     * {@link Thread#dumpStack()} is static and only ever reports the stack
     * of the calling thread, so each running thread's frames are obtained
     * through {@link Thread#getStackTrace()} instead.
     *
     * @throws Error if a job threw an Error that has not yet been reported.
     */
    public void dumpStack(){
        throwFirstError();
        String lineSeparator = System.getProperty("line.separator");
        synchronized(runningThreads){
            for (Thread thread : runningThreads){
                // Build the whole trace first so it is written in one call.
                StringBuilder dump = new StringBuilder();
                dump.append("Stack trace of thread \"").append(thread.getName()).append("\":");
                for (StackTraceElement frame : thread.getStackTrace()){
                    dump.append(lineSeparator).append("\tat ").append(frame);
                }
                System.err.println(dump);
                throwFirstError();
            }
        }
    }

    /**
     * Returns a snapshot of the threads that are currently running a job.
     *
     * @return a new array holding the running threads; empty if none.
     * @throws Error if a job threw an Error that has not yet been reported.
     */
    public Thread[] getRunningThreads(){
        throwFirstError();
        synchronized(runningThreads){
            return runningThreads.toArray(new Thread[0]);
        }
    }

    /**
     * Blocks until no job is queued or running.
     * <p>
     * The completion check and the wait happen while holding the same
     * monitor that finishing jobs use to notify, so a job that finishes
     * between the check and the wait cannot leave this method blocked
     * forever. Stored errors and runtime exceptions are rethrown before
     * returning, including those from jobs that had already finished when
     * this method was called.
     *
     * @throws InterruptedException if interrupted while waiting.
     * @throws RuntimeException the oldest RuntimeException thrown by a job
     *         and not yet reported.
     * @throws Error the oldest Error thrown by a job and not yet reported.
     */
    public void join() throws InterruptedException {
        synchronized(runningThreads){
            while (true){
                throwFirstError();
                throwFirstException();
                if (toRunQueue.isEmpty() && runningThreads.isEmpty()){
                    return;
                }
                runningThreads.wait();
            }
        }
    }
}