KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > turbine > services > schedule > TurbineSchedulerService


1 package org.apache.turbine.services.schedule;
2
3 /*
4  * Copyright 2001-2004 The Apache Software Foundation.
5  *
6  * Licensed under the Apache License, Version 2.0 (the "License")
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18
19 import java.util.Iterator JavaDoc;
20 import java.util.List JavaDoc;
21
22 import javax.servlet.ServletConfig JavaDoc;
23
24 import org.apache.commons.logging.Log;
25 import org.apache.commons.logging.LogFactory;
26
27 import org.apache.torque.TorqueException;
28 import org.apache.torque.util.Criteria;
29
30 import org.apache.turbine.services.InitializationException;
31 import org.apache.turbine.services.TurbineBaseService;
32 import org.apache.turbine.util.TurbineException;
33
34 /**
35  * Service for a cron like scheduler.
36  *
37  * @author <a HREF="mailto:mbryson@mont.mindspring.com">Dave Bryson</a>
38  * @author <a HREF="mailto:quintonm@bellsouth.net">Quinton McCombs</a>
39  * @version $Id: TurbineSchedulerService.java,v 1.15.2.2 2004/05/20 03:06:49 seade Exp $
40  */

41 public class TurbineSchedulerService
42         extends TurbineBaseService
43         implements ScheduleService
44 {
45     /** Logging */
46     private static Log log = LogFactory.getLog(ScheduleService.LOGGER_NAME);
47
48     /** The queue */
49     protected JobQueue scheduleQueue = null;
50
51     /** Current status of the scheduler */
52     private boolean enabled = false;
53
54     /** The main loop for starting jobs. */
55     protected MainLoop mainLoop;
56
57     /** The thread used to process commands. */
58     protected Thread JavaDoc thread;
59
60     /**
61      * Creates a new instance.
62      */

63     public TurbineSchedulerService()
64     {
65         mainLoop = null;
66         thread = null;
67     }
68
69     /**
70      * Initializes the SchedulerService.
71      *
72      * @throws InitializationException Something went wrong in the init
73      * stage
74      */

75     public void init()
76             throws InitializationException
77     {
78         try
79         {
80             setEnabled(getConfiguration().getBoolean("enabled", true));
81             scheduleQueue = new JobQueue();
82             mainLoop = new MainLoop();
83
84             // Load all from cold storage.
85
List JavaDoc jobs = JobEntryPeer.doSelect(new Criteria());
86
87             if (jobs != null && jobs.size() > 0)
88             {
89                 Iterator JavaDoc it = jobs.iterator();
90                 while (it.hasNext())
91                 {
92                     ((JobEntry) it.next()).calcRunTime();
93                 }
94                 scheduleQueue.batchLoad(jobs);
95
96                 restart();
97             }
98
99             setInit(true);
100         }
101         catch (Exception JavaDoc e)
102         {
103             String JavaDoc errorMessage = "Could not initialize the scheduler service";
104             log.error(errorMessage, e);
105             throw new InitializationException(errorMessage, e);
106         }
107     }
108
109     /**
110      * Called the first time the Service is used.<br>
111      *
112      * Load all the jobs from cold storage. Add jobs to the queue
113      * (sorted in ascending order by runtime) and start the scheduler
114      * thread.
115      *
116      * @param config A ServletConfig.
117      * @deprecated use init() instead.
118      */

119     public void init(ServletConfig JavaDoc config) throws InitializationException
120     {
121         init();
122     }
123
124     /**
125      * Shutdowns the service.
126      *
127      * This methods interrupts the housekeeping thread.
128      */

129     public void shutdown()
130     {
131         if (getThread() != null)
132         {
133             getThread().interrupt();
134         }
135     }
136
137     /**
138      * Get a specific Job from Storage.
139      *
140      * @param oid The int id for the job.
141      * @return A JobEntry.
142      * @exception TurbineException job could not be retreived.
143      */

144     public JobEntry getJob(int oid)
145             throws TurbineException
146     {
147         try
148         {
149             JobEntry je = JobEntryPeer.retrieveByPK(oid);
150             return scheduleQueue.getJob(je);
151         }
152         catch (TorqueException e)
153         {
154             String JavaDoc errorMessage = "Error retrieving job from persistent storage.";
155             log.error(errorMessage, e);
156             throw new TurbineException(errorMessage, e);
157         }
158     }
159
160     /**
161      * Add a new job to the queue.
162      *
163      * @param je A JobEntry with the job to add.
164      * @throws TurbineException job could not be added
165      */

166     public void addJob(JobEntry je)
167             throws TurbineException
168     {
169         updateJob(je);
170     }
171
172     /**
173      * Remove a job from the queue.
174      *
175      * @param je A JobEntry with the job to remove.
176      * @exception TurbineException job could not be removed
177      */

178     public void removeJob(JobEntry je)
179             throws TurbineException
180     {
181         try
182         {
183             // First remove from DB.
184
Criteria c = new Criteria().add(JobEntryPeer.JOB_ID, je.getPrimaryKey());
185             JobEntryPeer.doDelete(c);
186
187             // Remove from the queue.
188
scheduleQueue.remove(je);
189
190             // restart the scheduler
191
restart();
192         }
193         catch (Exception JavaDoc e)
194         {
195             String JavaDoc errorMessage = "Problem removing Scheduled Job: " + je.getTask();
196             log.error(errorMessage, e);
197             throw new TurbineException(errorMessage, e);
198         }
199     }
200
201     /**
202      * Add or update a job.
203      *
204      * @param je A JobEntry with the job to modify
205      * @throws TurbineException job could not be updated
206      */

207     public void updateJob(JobEntry je)
208             throws TurbineException
209     {
210         try
211         {
212             je.calcRunTime();
213
214             // Update the queue.
215
if (je.isNew())
216             {
217                 scheduleQueue.add(je);
218             }
219             else
220             {
221                 scheduleQueue.modify(je);
222             }
223
224             je.save();
225
226             restart();
227         }
228         catch (Exception JavaDoc e)
229         {
230             String JavaDoc errorMessage = "Problem updating Scheduled Job: " + je.getTask();
231             log.error(errorMessage, e);
232             throw new TurbineException(errorMessage, e);
233         }
234     }
235
236     /**
237      * List jobs in the queue. This is used by the scheduler UI.
238      *
239      * @return A List of jobs.
240      */

241     public List JavaDoc listJobs()
242     {
243         return scheduleQueue.list();
244     }
245
246     /**
247      * Sets the enabled status of the scheduler
248      *
249      * @param enabled
250      *
251      */

252     protected void setEnabled(boolean enabled)
253     {
254         this.enabled = enabled;
255     }
256
257     /**
258      * Determines if the scheduler service is currently enabled.
259      *
260      * @return Status of the scheduler service.
261      */

262     public boolean isEnabled()
263     {
264         return enabled;
265     }
266
267     /**
268      * Starts or restarts the scheduler if not already running.
269      */

270     public synchronized void startScheduler()
271     {
272         setEnabled(true);
273         restart();
274     }
275
276     /**
277      * Stops the scheduler if it is currently running.
278      */

279     public synchronized void stopScheduler()
280     {
281         log.info("Stopping job scheduler");
282         Thread JavaDoc thread = getThread();
283         if (thread != null)
284         {
285             thread.interrupt();
286         }
287         enabled = false;
288     }
289
290     /**
291      * Return the thread being used to process commands, or null if
292      * there is no such thread. You can use this to invoke any
293      * special methods on the thread, for example, to interrupt it.
294      *
295      * @return A Thread.
296      */

297     public synchronized Thread JavaDoc getThread()
298     {
299         return thread;
300     }
301
302     /**
303      * Set thread to null to indicate termination.
304      */

305     private synchronized void clearThread()
306     {
307         thread = null;
308     }
309
310     /**
311      * Start (or restart) a thread to process commands, or wake up an
312      * existing thread if one is already running. This method can be
313      * invoked if the background thread crashed due to an
314      * unrecoverable exception in an executed command.
315      */

316     public synchronized void restart()
317     {
318         if (enabled)
319         {
320             log.info("Starting job scheduler");
321             if (thread == null)
322             {
323                 // Create the the housekeeping thread of the scheduler. It will wait
324
// for the time when the next task needs to be started, and then
325
// launch a worker thread to execute the task.
326
thread = new Thread JavaDoc(mainLoop, ScheduleService.SERVICE_NAME);
327                 // Indicate that this is a system thread. JVM will quit only when there
328
// are no more enabled user threads. Settings threads spawned internally
329
// by Turbine as daemons allows commandline applications using Turbine
330
// to terminate in an orderly manner.
331
thread.setDaemon(true);
332                 thread.start();
333             }
334             else
335             {
336                 notify();
337             }
338         }
339     }
340
341     /**
342      * Return the next Job to execute, or null if thread is
343      * interrupted.
344      *
345      * @return A JobEntry.
346      * @exception TurbineException a generic exception.
347      */

348     private synchronized JobEntry nextJob()
349             throws TurbineException
350     {
351         try
352         {
353             while (!Thread.interrupted())
354             {
355                 // Grab the next job off the queue.
356
JobEntry je = scheduleQueue.getNext();
357
358                 if (je == null)
359                 {
360                     // Queue must be empty. Wait on it.
361
wait();
362                 }
363                 else
364                 {
365                     long now = System.currentTimeMillis();
366                     long when = je.getNextRuntime();
367
368                     if (when > now)
369                     {
370                         // Wait till next runtime.
371
wait(when - now);
372                     }
373                     else
374                     {
375                         // Update the next runtime for the job.
376
scheduleQueue.updateQueue(je);
377                         // Return the job to run it.
378
return je;
379                     }
380                 }
381             }
382         }
383         catch (InterruptedException JavaDoc ex)
384         {
385         }
386
387         // On interrupt.
388
return null;
389     }
390
391     /**
392      * Inner class. This is isolated in its own Runnable class just
393      * so that the main class need not implement Runnable, which would
394      * allow others to directly invoke run, which is not supported.
395      */

396     protected class MainLoop
397             implements Runnable JavaDoc
398     {
399         /**
400          * Method to run the class.
401          */

402         public void run()
403         {
404             String JavaDoc taskName = null;
405             try
406             {
407                 while (enabled)
408                 {
409                     JobEntry je = nextJob();
410                     if (je != null)
411                     {
412                         taskName = je.getTask();
413
414                         // Start the thread to run the job.
415
Runnable JavaDoc wt = new WorkerThread(je);
416                         Thread JavaDoc helper = new Thread JavaDoc(wt);
417                         helper.start();
418                     }
419                     else
420                     {
421                         break;
422                     }
423                 }
424             }
425             catch (Exception JavaDoc e)
426             {
427                 log.error("Error running a Scheduled Job: " + taskName, e);
428                 enabled = false;
429             }
430             finally
431             {
432                 clearThread();
433             }
434         }
435     }
436 }
437
Popular Tags