1 17 18 21 package org.quartz.simpl; 22 23 import org.apache.commons.logging.Log; 24 import org.apache.commons.logging.LogFactory; 25 import org.quartz.SchedulerConfigException; 26 import org.quartz.spi.ThreadPool; 27 28 import java.util.Iterator ; 29 import java.util.LinkedList ; 30 import java.util.List ; 31 32 51 public class SimpleThreadPool implements ThreadPool { 52 53 60 61 private int count = -1; 62 63 private int prio = Thread.NORM_PRIORITY; 64 65 private boolean isShutdown = false; 66 private boolean handoffPending = false; 67 68 private boolean inheritLoader = false; 69 70 private boolean inheritGroup = true; 71 72 private boolean makeThreadsDaemons = false; 73 74 private ThreadGroup threadGroup; 75 76 private final Object nextRunnableLock = new Object (); 77 78 private List workers; 79 private LinkedList availWorkers = new LinkedList (); 80 private LinkedList busyWorkers = new LinkedList (); 81 82 private String threadNamePrefix = "SimpleThreadPoolWorker"; 83 84 private final Log log = LogFactory.getLog(getClass()); 85 86 93 94 102 public SimpleThreadPool() { 103 } 104 105 119 public SimpleThreadPool(int threadCount, int threadPriority) { 120 setThreadCount(threadCount); 121 setThreadPriority(threadPriority); 122 } 123 124 131 132 public Log getLog() { 133 return log; 134 } 135 136 public int getPoolSize() { 137 return getThreadCount(); 138 } 139 140 146 public void setThreadCount(int count) { 147 this.count = count; 148 } 149 150 155 public int getThreadCount() { 156 return count; 157 } 158 159 165 public void setThreadPriority(int prio) { 166 this.prio = prio; 167 } 168 169 174 public int getThreadPriority() { 175 return prio; 176 } 177 178 public void setThreadNamePrefix(String prfx) { 179 this.threadNamePrefix = prfx; 180 } 181 182 public String getThreadNamePrefix() { 183 return threadNamePrefix; 184 } 185 186 190 public boolean isThreadsInheritContextClassLoaderOfInitializingThread() { 191 return inheritLoader; 192 } 193 194 199 public void setThreadsInheritContextClassLoaderOfInitializingThread( 200 boolean inheritLoader) { 201 this.inheritLoader = inheritLoader; 202 } 203 204 public boolean isThreadsInheritGroupOfInitializingThread() { 205 return inheritGroup; 206 } 207 208 public void setThreadsInheritGroupOfInitializingThread( 209 boolean inheritGroup) { 210 this.inheritGroup = inheritGroup; 211 } 212 213 214 217 public boolean isMakeThreadsDaemons() { 218 return makeThreadsDaemons; 219 } 220 221 225 public void setMakeThreadsDaemons(boolean makeThreadsDaemons) { 226 this.makeThreadsDaemons = makeThreadsDaemons; 227 } 228 229 public void initialize() throws SchedulerConfigException { 230 231 if (count <= 0) { 232 throw new SchedulerConfigException( 233 "Thread count must be > 0"); 234 } 235 if (prio <= 0 || prio > 9) { 236 throw new SchedulerConfigException( 237 "Thread priority must be > 0 and <= 9"); 238 } 239 240 if(isThreadsInheritGroupOfInitializingThread()) { 241 threadGroup = Thread.currentThread().getThreadGroup(); 242 } else { 243 threadGroup = Thread.currentThread().getThreadGroup(); 245 ThreadGroup parent = threadGroup; 246 while ( !parent.getName().equals("main") ) { 247 threadGroup = parent; 248 parent = threadGroup.getParent(); 249 } 250 threadGroup = new ThreadGroup (parent, "SimpleThreadPool"); 251 if (isMakeThreadsDaemons()) { 252 threadGroup.setDaemon(true); 253 } 254 } 255 256 257 if (isThreadsInheritContextClassLoaderOfInitializingThread()) { 258 getLog().info( 259 "Job execution threads will use class loader of thread: " 260 + Thread.currentThread().getName()); 261 } 262 263 Iterator workerThreads = createWorkerThreads(count).iterator(); 265 while(workerThreads.hasNext()) { 266 WorkerThread wt = (WorkerThread) workerThreads.next(); 267 wt.start(); 268 availWorkers.add(wt); 269 } 270 } 271 272 protected List createWorkerThreads(int count) { 273 workers = new LinkedList (); 274 for (int i = 1; i<= count; ++i) { 275 WorkerThread wt = new WorkerThread(this, threadGroup, 276 getThreadNamePrefix() + "-" + i, 277 getThreadPriority(), 278 isMakeThreadsDaemons()); 279 if (isThreadsInheritContextClassLoaderOfInitializingThread()) { 280 wt.setContextClassLoader(Thread.currentThread() 281 .getContextClassLoader()); 282 } 283 workers.add(wt); 284 } 285 286 return workers; 287 } 288 289 298 public void shutdown() { 299 shutdown(true); 300 } 301 302 311 public void shutdown(boolean waitForJobsToComplete) { 312 313 synchronized (nextRunnableLock) { 314 isShutdown = true; 315 316 Iterator workerThreads = workers.iterator(); 318 while(workerThreads.hasNext()) { 319 WorkerThread wt = (WorkerThread) workerThreads.next(); 320 wt.shutdown(); 321 availWorkers.remove(wt); 322 } 323 324 nextRunnableLock.notifyAll(); 328 329 if (waitForJobsToComplete == true) { 330 331 while(handoffPending) 333 try { nextRunnableLock.wait(100); } catch(Throwable t) {} 334 335 while (busyWorkers.size() > 0) { 337 WorkerThread wt = (WorkerThread) busyWorkers.getFirst(); 338 try { 339 getLog().debug( 340 "Waiting for thread " + wt.getName() 341 + " to shut down"); 342 343 nextRunnableLock.wait(2000); 346 } catch (InterruptedException ex) { 347 } 348 } 349 350 int activeCount = threadGroup.activeCount(); 351 if (activeCount > 0) { 352 getLog().info( 353 "There are still " + activeCount + " worker threads active." 354 + " See javadoc runInThread(Runnable) for a possible explanation"); 355 } 356 357 getLog().debug("shutdown complete"); 358 } 359 } 360 } 361 362 373 public boolean runInThread(Runnable runnable) { 374 if (runnable == null) { 375 return false; 376 } 377 378 synchronized (nextRunnableLock) { 379 380 handoffPending = true; 381 382 while ((availWorkers.size() < 1) && !isShutdown) { 384 try { 385 nextRunnableLock.wait(500); 386 } catch (InterruptedException ignore) { 387 } 388 } 389 390 if (!isShutdown) { 391 WorkerThread wt = (WorkerThread)availWorkers.removeFirst(); 392 busyWorkers.add(wt); 393 wt.run(runnable); 394 } 395 else { 396 WorkerThread wt = new WorkerThread(this, threadGroup, 399 "WorkerThread-LastJob", prio, isMakeThreadsDaemons(), runnable); 400 busyWorkers.add(wt); 401 workers.add(wt); 402 wt.start(); 403 } 404 nextRunnableLock.notifyAll(); 405 handoffPending = false; 406 } 407 408 return true; 409 } 410 411 public int blockForAvailableThreads() { 412 synchronized(nextRunnableLock) { 413 414 while((availWorkers.size() < 1 || handoffPending) && !isShutdown) { 415 try { 416 nextRunnableLock.wait(500); 417 } catch (InterruptedException ignore) { 418 } 419 } 420 421 return availWorkers.size(); 422 } 423 } 424 425 protected void makeAvailable(WorkerThread wt) { 426 synchronized(nextRunnableLock) { 427 if(!isShutdown) 428 availWorkers.add(wt); 429 busyWorkers.remove(wt); 430 nextRunnableLock.notifyAll(); 431 } 432 } 433 434 441 442 447 class WorkerThread extends Thread { 448 449 private boolean run = true; 451 452 private SimpleThreadPool tp; 453 454 private Runnable runnable = null; 455 456 463 WorkerThread(SimpleThreadPool tp, ThreadGroup threadGroup, String name, 464 int prio, boolean isDaemon) { 465 466 this(tp, threadGroup, name, prio, isDaemon, null); 467 } 468 469 475 WorkerThread(SimpleThreadPool tp, ThreadGroup threadGroup, String name, 476 int prio, boolean isDaemon, Runnable runnable) { 477 478 super(threadGroup, name); 479 this.tp = tp; 480 this.runnable = runnable; 481 setPriority(prio); 482 setDaemon(isDaemon); 483 } 484 485 490 void shutdown() { 491 run = false; 492 493 } 499 500 public void run(Runnable newRunnable) { 501 synchronized(this) { 502 if(runnable != null) 503 throw new IllegalStateException ("Already running a Runnable!"); 504 505 runnable = newRunnable; 506 this.notifyAll(); 507 } 508 } 509 510 515 public void run() { 516 boolean runOnce = (runnable != null); 517 518 boolean ran = false; 519 while (run) { 520 try { 521 synchronized(this) { 522 while (runnable == null && run) { 523 this.wait(500); 524 } 525 } 526 527 if (runnable != null) { 528 ran = true; 529 runnable.run(); 530 } 531 } catch (InterruptedException unblock) { 532 try { 534 getLog().error("worker threat got 'interrupt'ed.", unblock); 535 } catch(Exception e) { 536 } 538 } catch (Exception exceptionInRunnable) { 539 try { 540 getLog().error("Error while executing the Runnable: ", 541 exceptionInRunnable); 542 } catch(Exception e) { 543 } 545 } finally { 546 runnable = null; 547 if(getPriority() != tp.getThreadPriority()) 549 setPriority(tp.getThreadPriority()); 550 551 if (runOnce) { 552 run = false; 553 } 554 else if(ran) { 555 ran = false; 556 makeAvailable(this); 557 } 558 559 } 560 } 561 562 try { 564 getLog().debug("WorkerThread is shutting down"); 565 } catch(Exception e) { 566 } 568 } 569 } 570 } 571 | Popular Tags |