KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > quartz > core > QuartzSchedulerThread


1
2 /*
3  * Copyright 2004-2005 OpenSymphony
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License"); you may not
6  * use this file except in compliance with the License. You may obtain a copy
7  * of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13  * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14  * License for the specific language governing permissions and limitations
15  * under the License.
16  *
17  */

18
19 /*
20  * Previously Copyright (c) 2001-2004 James House
21  */

22 package org.quartz.core;
23
24 import org.apache.commons.logging.Log;
25 import org.apache.commons.logging.LogFactory;
26 import org.quartz.JobPersistenceException;
27 import org.quartz.SchedulerException;
28 import org.quartz.Trigger;
29 import org.quartz.spi.TriggerFiredBundle;
30
31 import java.util.Random JavaDoc;
32
33 /**
34  * <p>
35  * The thread responsible for performing the work of firing <code>{@link Trigger}</code>
36  * s that are registered with the <code>{@link QuartzScheduler}</code>.
37  * </p>
38  *
39  * @see QuartzScheduler
40  * @see org.quartz.Job
41  * @see Trigger
42  *
43  * @author James House
44  */

45 public class QuartzSchedulerThread extends Thread JavaDoc {
46     /*
47      * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
48      *
49      * Data members.
50      *
51      * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
52      */

53     private QuartzScheduler qs;
54
55     private QuartzSchedulerResources qsRsrcs;
56
57     private Object JavaDoc pauseLock = new Object JavaDoc();
58
59     private Object JavaDoc idleLock = new Object JavaDoc();
60
61     private boolean signaled;
62
63     private boolean paused;
64
65     private boolean halted;
66
67     private SchedulingContext ctxt = null;
68
69     private Random JavaDoc random = new Random JavaDoc(System.currentTimeMillis());
70
71     // When the scheduler finds there is no current trigger to fire, how long
72
// it should wait until checking again...
73
private static long DEFAULT_IDLE_WAIT_TIME = 30L * 1000L;
74
75     private long idleWaitTime = DEFAULT_IDLE_WAIT_TIME;
76
77     private int idleWaitVariablness = 7 * 1000;
78
79     private long dbFailureRetryInterval = 15L * 1000L;
80
81     private final Log log = LogFactory.getLog(getClass());
82
83     /*
84      * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
85      *
86      * Constructors.
87      *
88      * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
89      */

90
91     /**
92      * <p>
93      * Construct a new <code>QuartzSchedulerThread</code> for the given
94      * <code>QuartzScheduler</code> as a non-daemon <code>Thread</code>
95      * with normal priority.
96      * </p>
97      */

98     QuartzSchedulerThread(QuartzScheduler qs, QuartzSchedulerResources qsRsrcs,
99             SchedulingContext ctxt) {
100         this(qs, qsRsrcs, ctxt, qsRsrcs.getMakeSchedulerThreadDaemon(), Thread.NORM_PRIORITY);
101     }
102
103     /**
104      * <p>
105      * Construct a new <code>QuartzSchedulerThread</code> for the given
106      * <code>QuartzScheduler</code> as a <code>Thread</code> with the given
107      * attributes.
108      * </p>
109      */

110     QuartzSchedulerThread(QuartzScheduler qs, QuartzSchedulerResources qsRsrcs,
111             SchedulingContext ctxt, boolean setDaemon, int threadPrio) {
112         super(qs.getSchedulerThreadGroup(), qsRsrcs.getThreadName());
113         this.qs = qs;
114         this.qsRsrcs = qsRsrcs;
115         this.ctxt = ctxt;
116         this.setDaemon(setDaemon);
117         this.setPriority(threadPrio);
118
119         // start the underlying thread, but put this object into the 'paused'
120
// state
121
// so processing doesn't start yet...
122
paused = true;
123         halted = false;
124         this.start();
125     }
126
127     /*
128      * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
129      *
130      * Interface.
131      *
132      * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
133      */

134
135     void setIdleWaitTime(long waitTime) {
136         idleWaitTime = waitTime;
137         idleWaitVariablness = (int) (waitTime * 0.2);
138     }
139
140     private long getDbFailureRetryInterval() {
141         return dbFailureRetryInterval;
142     }
143
144     public void setDbFailureRetryInterval(long dbFailureRetryInterval) {
145         this.dbFailureRetryInterval = dbFailureRetryInterval;
146     }
147
148     private long getRandomizedIdleWaitTime() {
149         return idleWaitTime - random.nextInt(idleWaitVariablness);
150     }
151
152     /**
153      * <p>
154      * Signals the main processing loop to pause at the next possible point.
155      * </p>
156      */

157     void togglePause(boolean pause) {
158         synchronized (pauseLock) {
159             paused = pause;
160
161             if (paused) {
162                 signalSchedulingChange();
163             } else {
164                 pauseLock.notify();
165             }
166         }
167     }
168
169     /**
170      * <p>
171      * Signals the main processing loop to pause at the next possible point.
172      * </p>
173      */

174     void halt() {
175         synchronized (pauseLock) {
176             halted = true;
177
178             if (paused) {
179                 pauseLock.notify();
180             } else {
181                 signalSchedulingChange();
182             }
183         }
184     }
185
186     boolean isPaused() {
187         return paused;
188     }
189
190     /**
191      * <p>
192      * Signals the main processing loop that a change in scheduling has been
193      * made - in order to interrupt any sleeping that may be occuring while
194      * waiting for the fire time to arrive.
195      * </p>
196      */

197     void signalSchedulingChange() {
198         signaled = true;
199     }
200
201     /**
202      * <p>
203      * The main processing loop of the <code>QuartzSchedulerThread</code>.
204      * </p>
205      */

206     public void run() {
207         boolean lastAcquireFailed = false;
208         
209         while (!halted) {
210             try {
211                 // check if we're supposed to pause...
212
synchronized (pauseLock) {
213                     while (paused && !halted) {
214                         try {
215                             // wait until togglePause(false) is called...
216
pauseLock.wait(100L);
217                         } catch (InterruptedException JavaDoc ignore) {
218                         }
219                     }
220     
221                     if (halted) {
222                         break;
223                     }
224                 }
225
226                 int availTreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
227                 if(availTreadCount > 0) {
228
229                     Trigger trigger = null;
230
231                     long now = System.currentTimeMillis();
232
233                     signaled = false;
234                     try {
235                         trigger = qsRsrcs.getJobStore().acquireNextTrigger(
236                                 ctxt, now + idleWaitTime);
237                         lastAcquireFailed = false;
238                     } catch (JobPersistenceException jpe) {
239                         if(!lastAcquireFailed) {
240                             qs.notifySchedulerListenersError(
241                                 "An error occured while scanning for the next trigger to fire.",
242                                 jpe);
243                         }
244                         lastAcquireFailed = true;
245                     } catch (RuntimeException JavaDoc e) {
246                         if(!lastAcquireFailed) {
247                             getLog().error("quartzSchedulerThreadLoop: RuntimeException "
248                                     +e.getMessage(), e);
249                         }
250                         lastAcquireFailed = true;
251                     }
252
253                     if (trigger != null) {
254
255                         now = System.currentTimeMillis();
256                         long triggerTime = trigger.getNextFireTime().getTime();
257                         long timeUntilTrigger = triggerTime - now;
258                         long spinInterval = 10;
259
260                         // this looping may seem a bit silly, but it's the
261
// current work-around
262
// for a dead-lock that can occur if the Thread.sleep()
263
// is replaced with
264
// a obj.wait() that gets notified when the signal is
265
// set...
266
// so to be able to detect the signal change without
267
// sleeping the entire
268
// timeUntilTrigger, we spin here... don't worry
269
// though, this spinning
270
// doesn't even register 0.2% cpu usage on a pentium 4.
271
int numPauses = (int) (timeUntilTrigger / spinInterval);
272                         while (numPauses >= 0 && !signaled) {
273
274                             try {
275                                 Thread.sleep(spinInterval);
276                             } catch (InterruptedException JavaDoc ignore) {
277                             }
278
279                             now = System.currentTimeMillis();
280                             timeUntilTrigger = triggerTime - now;
281                             numPauses = (int) (timeUntilTrigger / spinInterval);
282                         }
283                         if (signaled) {
284                             try {
285                                 qsRsrcs.getJobStore().releaseAcquiredTrigger(
286                                         ctxt, trigger);
287                             } catch (JobPersistenceException jpe) {
288                                 qs.notifySchedulerListenersError(
289                                         "An error occured while releasing trigger '"
290                                                 + trigger.getFullName() + "'",
291                                         jpe);
292                                 // db connection must have failed... keep
293
// retrying until it's up...
294
releaseTriggerRetryLoop(trigger);
295                             } catch (RuntimeException JavaDoc e) {
296                                 getLog().error(
297                                     "releaseTriggerRetryLoop: RuntimeException "
298                                     +e.getMessage(), e);
299                                 // db connection must have failed... keep
300
// retrying until it's up...
301
releaseTriggerRetryLoop(trigger);
302                             }
303                             signaled = false;
304                             continue;
305                         }
306
307                         // set trigger to 'executing'
308
TriggerFiredBundle bndle = null;
309
310                         synchronized(pauseLock) {
311                             if(!halted) {
312                                 try {
313                                     bndle = qsRsrcs.getJobStore().triggerFired(ctxt,
314                                             trigger);
315                                 } catch (SchedulerException se) {
316                                     qs.notifySchedulerListenersError(
317                                             "An error occured while firing trigger '"
318                                                     + trigger.getFullName() + "'", se);
319                                 } catch (RuntimeException JavaDoc e) {
320                                     getLog().error(
321                                         "RuntimeException while firing trigger " +
322                                         trigger.getFullName(), e);
323                                     // db connection must have failed... keep
324
// retrying until it's up...
325
releaseTriggerRetryLoop(trigger);
326                                 }
327                             }
328
329                             // it's possible to get 'null' if the trigger was paused,
330
// blocked, or other similar occurances that prevent it being
331
// fired at this time... or if the scheduler was shutdown (halted)
332
if (bndle == null) {
333                                 try {
334                                     qsRsrcs.getJobStore().releaseAcquiredTrigger(ctxt,
335                                             trigger);
336                                 } catch (SchedulerException se) {
337                                     qs.notifySchedulerListenersError(
338                                             "An error occured while releasing trigger '"
339                                                     + trigger.getFullName() + "'", se);
340                                     // db connection must have failed... keep retrying
341
// until it's up...
342
releaseTriggerRetryLoop(trigger);
343                                 }
344                                 continue;
345                             }
346
347                             // TODO: improvements:
348
//
349
// 2- make sure we can get a job runshell before firing trigger, or
350
// don't let that throw an exception (right now it never does,
351
// but the signature says it can).
352
// 3- acquire more triggers at a time (based on num threads available?)
353

354
355                             JobRunShell shell = null;
356                             try {
357                                 shell = qsRsrcs.getJobRunShellFactory().borrowJobRunShell();
358                                 shell.initialize(qs, bndle);
359                             } catch (SchedulerException se) {
360                                 try {
361                                     qsRsrcs.getJobStore().triggeredJobComplete(ctxt,
362                                             trigger, bndle.getJobDetail(), Trigger.INSTRUCTION_SET_ALL_JOB_TRIGGERS_ERROR);
363                                 } catch (SchedulerException se2) {
364                                     qs.notifySchedulerListenersError(
365                                             "An error occured while placing job's triggers in error state '"
366                                                     + trigger.getFullName() + "'", se2);
367                                     // db connection must have failed... keep retrying
368
// until it's up...
369
errorTriggerRetryLoop(bndle);
370                                 }
371                                 continue;
372                             }
373
374                             if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
375                                 try {
376                                     // this case should never happen, as it is indicative of the
377
// scheduler being shutdown or a bug in the thread pool or
378
// a thread pool being used concurrently - which the docs
379
// say not to do...
380
getLog().error("ThreadPool.runInThread() return false!");
381                                     qsRsrcs.getJobStore().triggeredJobComplete(ctxt,
382                                             trigger, bndle.getJobDetail(), Trigger.INSTRUCTION_SET_ALL_JOB_TRIGGERS_ERROR);
383                                 } catch (SchedulerException se2) {
384                                     qs.notifySchedulerListenersError(
385                                             "An error occured while placing job's triggers in error state '"
386                                                     + trigger.getFullName() + "'", se2);
387                                     // db connection must have failed... keep retrying
388
// until it's up...
389
releaseTriggerRetryLoop(trigger);
390                                 }
391                             }
392                         }
393
394                         continue;
395                     }
396                 } else { // if(availTreadCount > 0)
397
continue; // should never happen, if threadPool.blockForAvailableThreads() follows contract
398
}
399
400                 // this looping may seem a bit silly, but it's the current
401
// work-around
402
// for a dead-lock that can occur if the Thread.sleep() is replaced
403
// with
404
// a obj.wait() that gets notified when the signal is set...
405
// so to be able to detect the signal change without sleeping the
406
// entier
407
// getRandomizedIdleWaitTime(), we spin here... don't worry though,
408
// the
409
// CPU usage of this spinning can't even be measured on a pentium
410
// 4.
411
long now = System.currentTimeMillis();
412                 long waitTime = now + getRandomizedIdleWaitTime();
413                 long timeUntilContinue = waitTime - now;
414                 long spinInterval = 10;
415                 int numPauses = (int) (timeUntilContinue / spinInterval);
416     
417                 while (numPauses > 0 && !signaled) {
418     
419                     try {
420                         Thread.sleep(10L);
421                     } catch (InterruptedException JavaDoc ignore) {
422                     }
423     
424                     now = System.currentTimeMillis();
425                     timeUntilContinue = waitTime - now;
426                     numPauses = (int) (timeUntilContinue / spinInterval);
427                 }
428             } catch(RuntimeException JavaDoc re) {
429                 getLog().error("Runtime error occured in main trigger firing loop.", re);
430             }
431         } // loop...
432

433         // drop references to scheduler stuff to aid garbage collection...
434
qs = null;
435         qsRsrcs = null;
436     }
437
438     public void errorTriggerRetryLoop(TriggerFiredBundle bndle) {
439         int retryCount = 0;
440         try {
441             while (!halted) {
442                 try {
443                     Thread.sleep(getDbFailureRetryInterval()); // retry every N
444
// seconds (the db
445
// connection must
446
// be failed)
447
retryCount++;
448                     qsRsrcs.getJobStore().triggeredJobComplete(ctxt,
449                             bndle.getTrigger(), bndle.getJobDetail(), Trigger.INSTRUCTION_SET_ALL_JOB_TRIGGERS_ERROR);
450                     retryCount = 0;
451                     break;
452                 } catch (JobPersistenceException jpe) {
453                     if(retryCount % 4 == 0) {
454                         qs.notifySchedulerListenersError(
455                             "An error occured while releasing trigger '"
456                                     + bndle.getTrigger().getFullName() + "'", jpe);
457                     }
458                 } catch (RuntimeException JavaDoc e) {
459                     getLog().error("releaseTriggerRetryLoop: RuntimeException "+e.getMessage(), e);
460                 } catch (InterruptedException JavaDoc e) {
461                     getLog().error("releaseTriggerRetryLoop: InterruptedException "+e.getMessage(), e);
462                 }
463             }
464         } finally {
465             if(retryCount == 0) {
466                 getLog().info("releaseTriggerRetryLoop: connection restored.");
467             }
468         }
469     }
470     
471     public void releaseTriggerRetryLoop(Trigger trigger) {
472         int retryCount = 0;
473         try {
474             while (!halted) {
475                 try {
476                     Thread.sleep(getDbFailureRetryInterval()); // retry every N
477
// seconds (the db
478
// connection must
479
// be failed)
480
retryCount++;
481                     qsRsrcs.getJobStore().releaseAcquiredTrigger(ctxt, trigger);
482                     retryCount = 0;
483                     break;
484                 } catch (JobPersistenceException jpe) {
485                     if(retryCount % 4 == 0) {
486                         qs.notifySchedulerListenersError(
487                             "An error occured while releasing trigger '"
488                                     + trigger.getFullName() + "'", jpe);
489                     }
490                 } catch (RuntimeException JavaDoc e) {
491                     getLog().error("releaseTriggerRetryLoop: RuntimeException "+e.getMessage(), e);
492                 } catch (InterruptedException JavaDoc e) {
493                     getLog().error("releaseTriggerRetryLoop: InterruptedException "+e.getMessage(), e);
494                 }
495             }
496         } finally {
497             if(retryCount == 0) {
498                 getLog().info("releaseTriggerRetryLoop: connection restored.");
499             }
500         }
501     }
502     
503     public Log getLog() {
504         return log;
505     }
506
507 } // end of QuartzSchedulerThread
508
Popular Tags