1 package org.apache.turbine.services.schedule; 2 3 18 19 import java.util.Iterator ; 20 import java.util.List ; 21 22 import javax.servlet.ServletConfig ; 23 24 import org.apache.commons.logging.Log; 25 import org.apache.commons.logging.LogFactory; 26 27 import org.apache.torque.TorqueException; 28 import org.apache.torque.util.Criteria; 29 30 import org.apache.turbine.services.InitializationException; 31 import org.apache.turbine.services.TurbineBaseService; 32 import org.apache.turbine.util.TurbineException; 33 34 41 public class TurbineSchedulerService 42 extends TurbineBaseService 43 implements ScheduleService 44 { 45 46 private static Log log = LogFactory.getLog(ScheduleService.LOGGER_NAME); 47 48 49 protected JobQueue scheduleQueue = null; 50 51 52 private boolean enabled = false; 53 54 55 protected MainLoop mainLoop; 56 57 58 protected Thread thread; 59 60 63 public TurbineSchedulerService() 64 { 65 mainLoop = null; 66 thread = null; 67 } 68 69 75 public void init() 76 throws InitializationException 77 { 78 try 79 { 80 setEnabled(getConfiguration().getBoolean("enabled", true)); 81 scheduleQueue = new JobQueue(); 82 mainLoop = new MainLoop(); 83 84 List jobs = JobEntryPeer.doSelect(new Criteria()); 86 87 if (jobs != null && jobs.size() > 0) 88 { 89 Iterator it = jobs.iterator(); 90 while (it.hasNext()) 91 { 92 ((JobEntry) it.next()).calcRunTime(); 93 } 94 scheduleQueue.batchLoad(jobs); 95 96 restart(); 97 } 98 99 setInit(true); 100 } 101 catch (Exception e) 102 { 103 String errorMessage = "Could not initialize the scheduler service"; 104 log.error(errorMessage, e); 105 throw new InitializationException(errorMessage, e); 106 } 107 } 108 109 119 public void init(ServletConfig config) throws InitializationException 120 { 121 init(); 122 } 123 124 129 public void shutdown() 130 { 131 if (getThread() != null) 132 { 133 getThread().interrupt(); 134 } 135 } 136 137 144 public JobEntry getJob(int oid) 145 throws TurbineException 146 { 147 try 148 { 149 JobEntry je = JobEntryPeer.retrieveByPK(oid); 150 return scheduleQueue.getJob(je); 151 } 152 catch (TorqueException e) 153 { 154 String errorMessage = "Error retrieving job from persistent storage."; 155 log.error(errorMessage, e); 156 throw new TurbineException(errorMessage, e); 157 } 158 } 159 160 166 public void addJob(JobEntry je) 167 throws TurbineException 168 { 169 updateJob(je); 170 } 171 172 178 public void removeJob(JobEntry je) 179 throws TurbineException 180 { 181 try 182 { 183 Criteria c = new Criteria().add(JobEntryPeer.JOB_ID, je.getPrimaryKey()); 185 JobEntryPeer.doDelete(c); 186 187 scheduleQueue.remove(je); 189 190 restart(); 192 } 193 catch (Exception e) 194 { 195 String errorMessage = "Problem removing Scheduled Job: " + je.getTask(); 196 log.error(errorMessage, e); 197 throw new TurbineException(errorMessage, e); 198 } 199 } 200 201 207 public void updateJob(JobEntry je) 208 throws TurbineException 209 { 210 try 211 { 212 je.calcRunTime(); 213 214 if (je.isNew()) 216 { 217 scheduleQueue.add(je); 218 } 219 else 220 { 221 scheduleQueue.modify(je); 222 } 223 224 je.save(); 225 226 restart(); 227 } 228 catch (Exception e) 229 { 230 String errorMessage = "Problem updating Scheduled Job: " + je.getTask(); 231 log.error(errorMessage, e); 232 throw new TurbineException(errorMessage, e); 233 } 234 } 235 236 241 public List listJobs() 242 { 243 return scheduleQueue.list(); 244 } 245 246 252 protected void setEnabled(boolean enabled) 253 { 254 this.enabled = enabled; 255 } 256 257 262 public boolean isEnabled() 263 { 264 return enabled; 265 } 266 267 270 public synchronized void startScheduler() 271 { 272 setEnabled(true); 273 restart(); 274 } 275 276 279 public synchronized void stopScheduler() 280 { 281 log.info("Stopping job scheduler"); 282 Thread thread = getThread(); 283 if (thread != null) 284 { 285 thread.interrupt(); 286 } 287 enabled = false; 288 } 289 290 297 public synchronized Thread getThread() 298 { 299 return thread; 300 } 301 302 305 private synchronized void clearThread() 306 { 307 thread = null; 308 } 309 310 316 public synchronized void restart() 317 { 318 if (enabled) 319 { 320 log.info("Starting job scheduler"); 321 if (thread == null) 322 { 323 thread = new Thread (mainLoop, ScheduleService.SERVICE_NAME); 327 thread.setDaemon(true); 332 thread.start(); 333 } 334 else 335 { 336 notify(); 337 } 338 } 339 } 340 341 348 private synchronized JobEntry nextJob() 349 throws TurbineException 350 { 351 try 352 { 353 while (!Thread.interrupted()) 354 { 355 JobEntry je = scheduleQueue.getNext(); 357 358 if (je == null) 359 { 360 wait(); 362 } 363 else 364 { 365 long now = System.currentTimeMillis(); 366 long when = je.getNextRuntime(); 367 368 if (when > now) 369 { 370 wait(when - now); 372 } 373 else 374 { 375 scheduleQueue.updateQueue(je); 377 return je; 379 } 380 } 381 } 382 } 383 catch (InterruptedException ex) 384 { 385 } 386 387 return null; 389 } 390 391 396 protected class MainLoop 397 implements Runnable 398 { 399 402 public void run() 403 { 404 String taskName = null; 405 try 406 { 407 while (enabled) 408 { 409 JobEntry je = nextJob(); 410 if (je != null) 411 { 412 taskName = je.getTask(); 413 414 Runnable wt = new WorkerThread(je); 416 Thread helper = new Thread (wt); 417 helper.start(); 418 } 419 else 420 { 421 break; 422 } 423 } 424 } 425 catch (Exception e) 426 { 427 log.error("Error running a Scheduled Job: " + taskName, e); 428 enabled = false; 429 } 430 finally 431 { 432 clearThread(); 433 } 434 } 435 } 436 } 437 | Popular Tags |