package org.riotfamily.riot.job;

import java.util.Collection;
import java.util.Date;
import java.util.Iterator;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.riotfamily.riot.job.persistence.JobDao;
import org.riotfamily.riot.job.persistence.JobDetail;
import org.riotfamily.riot.job.persistence.JobLogEntry;
import org.riotfamily.riot.job.support.ExecutionTimeUpdater;
import org.riotfamily.riot.job.support.JobTask;
import org.riotfamily.riot.job.support.TaskList;
import org.riotfamily.riot.job.ui.JobUIUpdater;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.core.task.SimpleAsyncTaskExecutor;

/**
 * Central entry point for setting up, executing, interrupting and canceling
 * {@link Job}s. Job state is persisted through a {@link JobDao}, changes are
 * pushed to a {@link JobUIUpdater}, and the work itself is handed to a
 * {@link SimpleAsyncTaskExecutor}.
 * <p>
 * The available jobs are looked up from the Spring
 * {@link ApplicationContext}; the key under which a job bean is registered in
 * that lookup is used as the job's <em>type</em>.
 */
public class JobManager implements ApplicationContextAware, DisposableBean {

    /** Prefix passed to the task executor for naming the threads it creates. */
    private static final String THREAD_NAME_PREFIX = "JobThread";

    private static final Log log = LogFactory.getLog(JobManager.class);

    /**
     * All {@link Job} beans found in the application context, as returned by
     * {@link ApplicationContext#getBeansOfType(Class)}. Remains
     * <code>null</code> until {@link #setApplicationContext} has been called.
     */
    private Map jobs;

    // NOTE(review): dao and uiUpdater are package-private in the original
    // source; other classes in this package may depend on that, so the
    // visibility is left untouched.
    JobDao dao;

    JobUIUpdater uiUpdater;

    private final SimpleAsyncTaskExecutor taskExecutor =
            new SimpleAsyncTaskExecutor(THREAD_NAME_PREFIX);

    /** Registry of job tasks, shared with every {@link JobTask} created here. */
    private final TaskList taskList = new TaskList();

    /** Background task started by the constructor and stopped by {@link #destroy()}. */
    private final ExecutionTimeUpdater executionTimeUpdater =
            new ExecutionTimeUpdater(taskList);

    /**
     * Creates a new manager, configures the task executor to use
     * {@link Thread#MIN_PRIORITY} and immediately starts the
     * {@link ExecutionTimeUpdater} on it.
     *
     * @param dao the DAO used to load and store job details and log entries
     * @param uiUpdater the updater that is notified about job changes and
     *        log entries
     */
    public JobManager(JobDao dao, JobUIUpdater uiUpdater) {
        this.dao = dao;
        this.uiUpdater = uiUpdater;
        taskExecutor.setThreadPriority(Thread.MIN_PRIORITY);
        taskExecutor.execute(executionTimeUpdater);
    }

    /**
     * Collects all beans of type {@link Job} from the given context.
     *
     * @param context the application context to look the jobs up in
     */
    public void setApplicationContext(ApplicationContext context) {
        jobs = context.getBeansOfType(Job.class);
    }

    /**
     * Marks every pending job detail that has no corresponding task in the
     * task list as {@link JobDetail#INTERRUPTED} and persists that change.
     * <p>
     * This method is not invoked from within this class.
     */
    protected void checkForAbortedJobs() {
        log.info("Checking for interrupted jobs ...");
        Collection pendingJobs = dao.getPendingJobDetails();
        Iterator it = pendingJobs.iterator();
        while (it.hasNext()) {
            JobDetail jd = (JobDetail) it.next();
            if (taskList.getJobTask(jd) == null) {
                log.info("Job " + jd + " is not running - marking as INTERRUPTED.");
                jd.setState(JobDetail.INTERRUPTED);
                dao.updateJobDetail(jd);
            }
        }
    }

    /**
     * Returns the job registered under the given type.
     *
     * @param type the job type, i.e. the key of the job in the bean lookup
     * @return the job, or <code>null</code> if the map holds no entry for
     *         that type
     */
    protected Job getJob(String type) {
        return (Job) jobs.get(type);
    }

    /**
     * Returns the job detail to work with for the given type and object.
     * <ol>
     * <li>If the DAO reports a pending detail, that detail is returned.</li>
     * <li>Otherwise the last completed detail is looked up. If there is
     * none, or if the job is repeatable, a new job is set up and its detail
     * is returned.</li>
     * <li>Otherwise the last completed detail is returned.</li>
     * </ol>
     *
     * @param type the job type
     * @param objectId the id of the object the job operates on
     * @return the pending, newly created or last completed job detail
     * @throws JobCreationException declared by the signature; this class
     *         does not throw it explicitly
     */
    public JobDetail getOrCreateJob(String type, String objectId)
            throws JobCreationException {

        JobDetail detail = dao.getPendingJobDetail(type, objectId);
        if (detail == null) {
            detail = dao.getLastCompletedJobDetail(type, objectId);
            if (detail == null || getJob(type).isRepeatable()) {
                detail = setupJob(type, objectId);
                log.debug("No pending job found of type " + type
                        + " for objectId '" + objectId + "' - a new job has "
                        + " been set up with id " + detail.getId());
            }
        }
        else {
            log.debug("Found pending job: " + detail.getId());
        }
        return detail;
    }

    /**
     * Creates and saves a new job detail, seeded with the average step time
     * the DAO reports for the job type, and hands a {@link JobSetupTask} for
     * it to the task executor.
     *
     * @param type the job type
     * @param objectId the id of the object the job operates on
     * @return the newly saved job detail
     * @throws JobCreationException declared by the signature; this class
     *         does not throw it explicitly
     */
    private JobDetail setupJob(String type, String objectId)
            throws JobCreationException {

        int averageStepTime = dao.getAverageStepTime(type);
        JobDetail detail = new JobDetail(type, objectId, averageStepTime);
        dao.saveJobDetail(detail);
        taskExecutor.execute(new JobSetupTask(detail));
        return detail;
    }

    /**
     * Wraps the job belonging to the given detail in a {@link JobTask} and
     * passes it to the task executor.
     *
     * @param detail the detail of the job to execute
     */
    public void executeJob(JobDetail detail) {
        log.info("Executing job " + detail);
        Job job = getJob(detail.getType());
        JobTask task = new JobTask(job, detail, dao, uiUpdater, taskList);
        taskExecutor.execute(task);
    }

    /**
     * Interrupts the task for the given detail via the task list and records
     * a "Job interrupted" log entry in both the DAO and the UI updater.
     *
     * @param detail the detail of the job to interrupt
     */
    public void interruptJob(JobDetail detail) {
        log.info("Interrupting " + detail);
        taskList.interrupt(detail);
        JobLogEntry logEntry = new JobLogEntry(detail, "Job interrupted");
        dao.log(logEntry);
        uiUpdater.log(logEntry);
    }

    /**
     * Cancels the given job: interrupts its task, calls the job's
     * <code>tearDown</code> with the detail's object id, sets the state to
     * {@link JobDetail#CANCELED} with the current time as end date, and
     * publishes the updated detail plus a "Job canceled" log entry to the
     * DAO and the UI updater.
     *
     * @param detail the detail of the job to cancel
     */
    public void cancelJob(JobDetail detail) {
        log.info("Canceling " + detail);
        taskList.interrupt(detail);
        Job job = getJob(detail.getType());
        job.tearDown(detail.getObjectId());

        detail.setState(JobDetail.CANCELED);
        detail.setEndDate(new Date());
        dao.updateJobDetail(detail);
        uiUpdater.updateJob(detail);

        JobLogEntry logEntry = new JobLogEntry(detail, "Job canceled");
        dao.log(logEntry);
        uiUpdater.log(logEntry);
    }

    /**
     * Stops the {@link ExecutionTimeUpdater} and interrupts all tasks in the
     * task list.
     *
     * @throws Exception if stopping the updater or interrupting the tasks
     *         fails
     */
    public void destroy() throws Exception {
        try {
            executionTimeUpdater.stop();
        }
        finally {
            // Interrupt the tasks even if stopping the updater failed, so
            // that a failure there does not skip the rest of the shutdown.
            taskList.interruptAll();
        }
        log.debug("JobManager has been shut down.");
    }

    /**
     * Runnable that performs the setup of a job: it calls the job's
     * <code>setup</code> with the detail's object id, initializes the detail
     * with the result and publishes the updated detail to the DAO and the UI
     * updater.
     */
    private class JobSetupTask implements Runnable {

        private final JobDetail detail;

        private final Job job;

        /**
         * Resolves the job for the detail's type at construction time.
         *
         * @param detail the detail of the job to set up
         */
        public JobSetupTask(JobDetail detail) {
            this.detail = detail;
            this.job = getJob(detail.getType());
        }

        public void run() {
            detail.init(job.setup(detail.getObjectId()));
            dao.updateJobDetail(detail);
            uiUpdater.updateJob(detail);
        }
    }
}