KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > riotfamily > riot > job > JobManager


/* ***** BEGIN LICENSE BLOCK *****
 * Version: MPL 1.1
 * The contents of this file are subject to the Mozilla Public License Version
 * 1.1 (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 * http://www.mozilla.org/MPL/
 *
 * Software distributed under the License is distributed on an "AS IS" basis,
 * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
 * for the specific language governing rights and limitations under the
 * License.
 *
 * The Original Code is Riot.
 *
 * The Initial Developer of the Original Code is
 * Neteye GmbH.
 * Portions created by the Initial Developer are Copyright (C) 2006
 * the Initial Developer. All Rights Reserved.
 *
 * Contributor(s):
 * Felix Gnass [fgnass at neteye dot de]
 *
 * ***** END LICENSE BLOCK ***** */

package org.riotfamily.riot.job;

import java.util.Collection;
import java.util.Date;
import java.util.Iterator;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.riotfamily.riot.job.persistence.JobDao;
import org.riotfamily.riot.job.persistence.JobDetail;
import org.riotfamily.riot.job.persistence.JobLogEntry;
import org.riotfamily.riot.job.support.ExecutionTimeUpdater;
import org.riotfamily.riot.job.support.JobTask;
import org.riotfamily.riot.job.support.TaskList;
import org.riotfamily.riot.job.ui.JobUIUpdater;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.core.task.SimpleAsyncTaskExecutor;

45 public class JobManager implements ApplicationContextAware, DisposableBean {
46
47     private static final String JavaDoc THREAD_NAME_PREFIX = "JobThread";
48     
49     private static Log log = LogFactory.getLog(JobManager.class);
50     
51     private Map JavaDoc jobs;
52     
53     JobDao dao;
54     
55     JobUIUpdater uiUpdater;
56     
57     private SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor(
58             THREAD_NAME_PREFIX);
59     
60     private TaskList taskList = new TaskList();
61     
62     private ExecutionTimeUpdater executionTimeUpdater = new ExecutionTimeUpdater(taskList);
63     
64     public JobManager(JobDao dao, JobUIUpdater uiUpdater) {
65         this.dao = dao;
66         this.uiUpdater = uiUpdater;
67         taskExecutor.setThreadPriority(Thread.MIN_PRIORITY);
68         taskExecutor.execute(executionTimeUpdater);
69     }
70
71     public void setApplicationContext(ApplicationContext context) {
72         jobs = context.getBeansOfType(Job.class);
73     }
74     
75     /**
76      * Checks whether there are any pending jobs which are not being executed.
77      * This implementation assumes that all jobs are run within the same
78      * virtual machine. To work in a clustered environment, this method must
79      * be overridden to check whether a job is run by another node before it
80      * is marked as aborted.
81      */

82     protected void checkForAbortedJobs() {
83         log.info("Checking for interrupted jobs ...");
84         Collection JavaDoc pendingJobs = dao.getPendingJobDetails();
85         Iterator JavaDoc it = pendingJobs.iterator();
86         while (it.hasNext()) {
87             JobDetail jd = (JobDetail) it.next();
88             if (taskList.getJobTask(jd) == null) {
89                 log.info("Job " + jd + " is not running - marking as INTERRUPTED.");
90                 jd.setState(JobDetail.INTERRUPTED);
91                 dao.updateJobDetail(jd);
92             }
93         }
94     }
95     
96     protected Job getJob(String JavaDoc type) {
97         return (Job) jobs.get(type);
98     }
99     
100     public JobDetail getOrCreateJob(String JavaDoc type, String JavaDoc objectId)
101             throws JobCreationException {
102         
103         JobDetail detail = dao.getPendingJobDetail(type, objectId);
104         if (detail == null) {
105             detail = dao.getLastCompletedJobDetail(type, objectId);
106             if (detail == null || getJob(type).isRepeatable()) {
107                 detail = setupJob(type, objectId);
108                 log.debug("No pending job found of type " + type
109                         + " for objectId '" + objectId + "' - a new job has "
110                         + " been set up with id " + detail.getId());
111             }
112         }
113         else {
114             log.debug("Found pending job: " + detail.getId());
115         }
116         return detail;
117     }
118     
119     private JobDetail setupJob(String JavaDoc type, String JavaDoc objectId)
120             throws JobCreationException {
121         
122         int averageStepTime = dao.getAverageStepTime(type);
123         JobDetail detail = new JobDetail(type, objectId, averageStepTime);
124         dao.saveJobDetail(detail);
125         taskExecutor.execute(new JobSetupTask(detail));
126         return detail;
127     }
128         
129     public void executeJob(JobDetail detail) {
130         log.info("Executing job " + detail);
131         Job job = getJob(detail.getType());
132         JobTask task = new JobTask(job, detail, dao, uiUpdater, taskList);
133         taskExecutor.execute(task);
134     }
135     
136     public void interruptJob(JobDetail detail) {
137         log.info("Interrupting " + detail);
138         taskList.interrupt(detail);
139         JobLogEntry logEntry = new JobLogEntry(detail, "Job interrupted");
140         dao.log(logEntry);
141         uiUpdater.log(logEntry);
142     }
143     
144     public void cancelJob(JobDetail detail) {
145         log.info("Canceling " + detail);
146         taskList.interrupt(detail);
147         Job job = getJob(detail.getType());
148         job.tearDown(detail.getObjectId());
149         detail.setState(JobDetail.CANCELED);
150         detail.setEndDate(new Date JavaDoc());
151         dao.updateJobDetail(detail);
152         uiUpdater.updateJob(detail);
153         JobLogEntry logEntry = new JobLogEntry(detail, "Job canceled");
154         dao.log(logEntry);
155         uiUpdater.log(logEntry);
156     }
157     
158     public void destroy() throws Exception JavaDoc {
159         executionTimeUpdater.stop();
160         taskList.interruptAll();
161         log.debug("JobManager has been shut down.");
162     }
163     
164     private class JobSetupTask implements Runnable JavaDoc {
165
166         private JobDetail detail;
167         
168         private Job job;
169         
170         public JobSetupTask(JobDetail detail) {
171             this.detail = detail;
172             this.job = getJob(detail.getType());
173         }
174
175         public void run() {
176             detail.init(job.setup(detail.getObjectId()));
177             dao.updateJobDetail(detail);
178             uiUpdater.updateJob(detail);
179         }
180     }
181 }
182
Popular Tags