1 2 18 19 22 package org.quartz.core; 23 24 import org.apache.commons.logging.Log; 25 import org.apache.commons.logging.LogFactory; 26 import org.quartz.JobPersistenceException; 27 import org.quartz.SchedulerException; 28 import org.quartz.Trigger; 29 import org.quartz.spi.TriggerFiredBundle; 30 31 import java.util.Random ; 32 33 45 public class QuartzSchedulerThread extends Thread { 46 53 private QuartzScheduler qs; 54 55 private QuartzSchedulerResources qsRsrcs; 56 57 private Object pauseLock = new Object (); 58 59 private Object idleLock = new Object (); 60 61 private boolean signaled; 62 63 private boolean paused; 64 65 private boolean halted; 66 67 private SchedulingContext ctxt = null; 68 69 private Random random = new Random (System.currentTimeMillis()); 70 71 private static long DEFAULT_IDLE_WAIT_TIME = 30L * 1000L; 74 75 private long idleWaitTime = DEFAULT_IDLE_WAIT_TIME; 76 77 private int idleWaitVariablness = 7 * 1000; 78 79 private long dbFailureRetryInterval = 15L * 1000L; 80 81 private final Log log = LogFactory.getLog(getClass()); 82 83 90 91 98 QuartzSchedulerThread(QuartzScheduler qs, QuartzSchedulerResources qsRsrcs, 99 SchedulingContext ctxt) { 100 this(qs, qsRsrcs, ctxt, qsRsrcs.getMakeSchedulerThreadDaemon(), Thread.NORM_PRIORITY); 101 } 102 103 110 QuartzSchedulerThread(QuartzScheduler qs, QuartzSchedulerResources qsRsrcs, 111 SchedulingContext ctxt, boolean setDaemon, int threadPrio) { 112 super(qs.getSchedulerThreadGroup(), qsRsrcs.getThreadName()); 113 this.qs = qs; 114 this.qsRsrcs = qsRsrcs; 115 this.ctxt = ctxt; 116 this.setDaemon(setDaemon); 117 this.setPriority(threadPrio); 118 119 paused = true; 123 halted = false; 124 this.start(); 125 } 126 127 134 135 void setIdleWaitTime(long waitTime) { 136 idleWaitTime = waitTime; 137 idleWaitVariablness = (int) (waitTime * 0.2); 138 } 139 140 private long getDbFailureRetryInterval() { 141 return dbFailureRetryInterval; 142 } 143 144 public void setDbFailureRetryInterval(long dbFailureRetryInterval) { 145 this.dbFailureRetryInterval = dbFailureRetryInterval; 146 } 147 148 private long getRandomizedIdleWaitTime() { 149 return idleWaitTime - random.nextInt(idleWaitVariablness); 150 } 151 152 157 void togglePause(boolean pause) { 158 synchronized (pauseLock) { 159 paused = pause; 160 161 if (paused) { 162 signalSchedulingChange(); 163 } else { 164 pauseLock.notify(); 165 } 166 } 167 } 168 169 174 void halt() { 175 synchronized (pauseLock) { 176 halted = true; 177 178 if (paused) { 179 pauseLock.notify(); 180 } else { 181 signalSchedulingChange(); 182 } 183 } 184 } 185 186 boolean isPaused() { 187 return paused; 188 } 189 190 197 void signalSchedulingChange() { 198 signaled = true; 199 } 200 201 206 public void run() { 207 boolean lastAcquireFailed = false; 208 209 while (!halted) { 210 try { 211 synchronized (pauseLock) { 213 while (paused && !halted) { 214 try { 215 pauseLock.wait(100L); 217 } catch (InterruptedException ignore) { 218 } 219 } 220 221 if (halted) { 222 break; 223 } 224 } 225 226 int availTreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads(); 227 if(availTreadCount > 0) { 228 229 Trigger trigger = null; 230 231 long now = System.currentTimeMillis(); 232 233 signaled = false; 234 try { 235 trigger = qsRsrcs.getJobStore().acquireNextTrigger( 236 ctxt, now + idleWaitTime); 237 lastAcquireFailed = false; 238 } catch (JobPersistenceException jpe) { 239 if(!lastAcquireFailed) { 240 qs.notifySchedulerListenersError( 241 "An error occured while scanning for the next trigger to fire.", 242 jpe); 243 } 244 lastAcquireFailed = true; 245 } catch (RuntimeException e) { 246 if(!lastAcquireFailed) { 247 getLog().error("quartzSchedulerThreadLoop: RuntimeException " 248 +e.getMessage(), e); 249 } 250 lastAcquireFailed = true; 251 } 252 253 if (trigger != null) { 254 255 now = System.currentTimeMillis(); 256 long triggerTime = trigger.getNextFireTime().getTime(); 257 long timeUntilTrigger = triggerTime - now; 258 long spinInterval = 10; 259 260 int numPauses = (int) (timeUntilTrigger / spinInterval); 272 while (numPauses >= 0 && !signaled) { 273 274 try { 275 Thread.sleep(spinInterval); 276 } catch (InterruptedException ignore) { 277 } 278 279 now = System.currentTimeMillis(); 280 timeUntilTrigger = triggerTime - now; 281 numPauses = (int) (timeUntilTrigger / spinInterval); 282 } 283 if (signaled) { 284 try { 285 qsRsrcs.getJobStore().releaseAcquiredTrigger( 286 ctxt, trigger); 287 } catch (JobPersistenceException jpe) { 288 qs.notifySchedulerListenersError( 289 "An error occured while releasing trigger '" 290 + trigger.getFullName() + "'", 291 jpe); 292 releaseTriggerRetryLoop(trigger); 295 } catch (RuntimeException e) { 296 getLog().error( 297 "releaseTriggerRetryLoop: RuntimeException " 298 +e.getMessage(), e); 299 releaseTriggerRetryLoop(trigger); 302 } 303 signaled = false; 304 continue; 305 } 306 307 TriggerFiredBundle bndle = null; 309 310 synchronized(pauseLock) { 311 if(!halted) { 312 try { 313 bndle = qsRsrcs.getJobStore().triggerFired(ctxt, 314 trigger); 315 } catch (SchedulerException se) { 316 qs.notifySchedulerListenersError( 317 "An error occured while firing trigger '" 318 + trigger.getFullName() + "'", se); 319 } catch (RuntimeException e) { 320 getLog().error( 321 "RuntimeException while firing trigger " + 322 trigger.getFullName(), e); 323 releaseTriggerRetryLoop(trigger); 326 } 327 } 328 329 if (bndle == null) { 333 try { 334 qsRsrcs.getJobStore().releaseAcquiredTrigger(ctxt, 335 trigger); 336 } catch (SchedulerException se) { 337 qs.notifySchedulerListenersError( 338 "An error occured while releasing trigger '" 339 + trigger.getFullName() + "'", se); 340 releaseTriggerRetryLoop(trigger); 343 } 344 continue; 345 } 346 347 354 355 JobRunShell shell = null; 356 try { 357 shell = qsRsrcs.getJobRunShellFactory().borrowJobRunShell(); 358 shell.initialize(qs, bndle); 359 } catch (SchedulerException se) { 360 try { 361 qsRsrcs.getJobStore().triggeredJobComplete(ctxt, 362 trigger, bndle.getJobDetail(), Trigger.INSTRUCTION_SET_ALL_JOB_TRIGGERS_ERROR); 363 } catch (SchedulerException se2) { 364 qs.notifySchedulerListenersError( 365 "An error occured while placing job's triggers in error state '" 366 + trigger.getFullName() + "'", se2); 367 errorTriggerRetryLoop(bndle); 370 } 371 continue; 372 } 373 374 if (qsRsrcs.getThreadPool().runInThread(shell) == false) { 375 try { 376 getLog().error("ThreadPool.runInThread() return false!"); 381 qsRsrcs.getJobStore().triggeredJobComplete(ctxt, 382 trigger, bndle.getJobDetail(), Trigger.INSTRUCTION_SET_ALL_JOB_TRIGGERS_ERROR); 383 } catch (SchedulerException se2) { 384 qs.notifySchedulerListenersError( 385 "An error occured while placing job's triggers in error state '" 386 + trigger.getFullName() + "'", se2); 387 releaseTriggerRetryLoop(trigger); 390 } 391 } 392 } 393 394 continue; 395 } 396 } else { continue; } 399 400 long now = System.currentTimeMillis(); 412 long waitTime = now + getRandomizedIdleWaitTime(); 413 long timeUntilContinue = waitTime - now; 414 long spinInterval = 10; 415 int numPauses = (int) (timeUntilContinue / spinInterval); 416 417 while (numPauses > 0 && !signaled) { 418 419 try { 420 Thread.sleep(10L); 421 } catch (InterruptedException ignore) { 422 } 423 424 now = System.currentTimeMillis(); 425 timeUntilContinue = waitTime - now; 426 numPauses = (int) (timeUntilContinue / spinInterval); 427 } 428 } catch(RuntimeException re) { 429 getLog().error("Runtime error occured in main trigger firing loop.", re); 430 } 431 } 433 qs = null; 435 qsRsrcs = null; 436 } 437 438 public void errorTriggerRetryLoop(TriggerFiredBundle bndle) { 439 int retryCount = 0; 440 try { 441 while (!halted) { 442 try { 443 Thread.sleep(getDbFailureRetryInterval()); retryCount++; 448 qsRsrcs.getJobStore().triggeredJobComplete(ctxt, 449 bndle.getTrigger(), bndle.getJobDetail(), Trigger.INSTRUCTION_SET_ALL_JOB_TRIGGERS_ERROR); 450 retryCount = 0; 451 break; 452 } catch (JobPersistenceException jpe) { 453 if(retryCount % 4 == 0) { 454 qs.notifySchedulerListenersError( 455 "An error occured while releasing trigger '" 456 + bndle.getTrigger().getFullName() + "'", jpe); 457 } 458 } catch (RuntimeException e) { 459 getLog().error("releaseTriggerRetryLoop: RuntimeException "+e.getMessage(), e); 460 } catch (InterruptedException e) { 461 getLog().error("releaseTriggerRetryLoop: InterruptedException "+e.getMessage(), e); 462 } 463 } 464 } finally { 465 if(retryCount == 0) { 466 getLog().info("releaseTriggerRetryLoop: connection restored."); 467 } 468 } 469 } 470 471 public void releaseTriggerRetryLoop(Trigger trigger) { 472 int retryCount = 0; 473 try { 474 while (!halted) { 475 try { 476 Thread.sleep(getDbFailureRetryInterval()); retryCount++; 481 qsRsrcs.getJobStore().releaseAcquiredTrigger(ctxt, trigger); 482 retryCount = 0; 483 break; 484 } catch (JobPersistenceException jpe) { 485 if(retryCount % 4 == 0) { 486 qs.notifySchedulerListenersError( 487 "An error occured while releasing trigger '" 488 + trigger.getFullName() + "'", jpe); 489 } 490 } catch (RuntimeException e) { 491 getLog().error("releaseTriggerRetryLoop: RuntimeException "+e.getMessage(), e); 492 } catch (InterruptedException e) { 493 getLog().error("releaseTriggerRetryLoop: InterruptedException "+e.getMessage(), e); 494 } 495 } 496 } finally { 497 if(retryCount == 0) { 498 getLog().info("releaseTriggerRetryLoop: connection restored."); 499 } 500 } 501 } 502 503 public Log getLog() { 504 return log; 505 } 506 507 } | Popular Tags |