package org.apache.cocoon.components.cron;

import java.text.ParseException;
import java.util.Arrays;
import java.util.Date;
import java.util.Map;
import java.util.NoSuchElementException;

import org.apache.avalon.framework.CascadingException;
import org.apache.avalon.framework.activity.Disposable;
import org.apache.avalon.framework.activity.Initializable;
import org.apache.avalon.framework.activity.Startable;
import org.apache.avalon.framework.component.Component;
import org.apache.avalon.framework.configuration.Configurable;
import org.apache.avalon.framework.configuration.Configuration;
import org.apache.avalon.framework.configuration.ConfigurationException;
import org.apache.avalon.framework.context.Context;
import org.apache.avalon.framework.context.ContextException;
import org.apache.avalon.framework.context.Contextualizable;
import org.apache.avalon.framework.logger.AbstractLogEnabled;
import org.apache.avalon.framework.parameters.Parameters;
import org.apache.avalon.framework.service.ServiceException;
import org.apache.avalon.framework.service.ServiceManager;
import org.apache.avalon.framework.service.Serviceable;
import org.apache.avalon.framework.thread.ThreadSafe;
import org.apache.cocoon.components.thread.RunnableManager;
import org.apache.cocoon.components.thread.ThreadPool;
import org.quartz.CronTrigger;
import org.quartz.Job;
import org.quartz.JobDataMap;
import org.quartz.JobDetail;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.SimpleTrigger;
import org.quartz.Trigger;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.quartz.impl.DirectSchedulerFactory;
import org.quartz.impl.jdbcjobstore.InvalidConfigurationException;
import org.quartz.impl.jdbcjobstore.JobStoreSupport;
import org.quartz.simpl.RAMJobStore;
import org.quartz.spi.JobStore;
import org.quartz.spi.TriggerFiredBundle;
import org.quartz.utils.ConnectionProvider;
import org.quartz.utils.DBConnectionManager;
import org.quartz.utils.JNDIConnectionProvider;

/**
 * {@link JobScheduler} implementation that delegates scheduling to a Quartz {@link Scheduler}.
 *
 * <p>The scheduler is created in {@link #initialize()} from the configuration handed to
 * {@link #configure(Configuration)}: a {@code thread-pool} child configures the worker pool, a
 * {@code store} child selects the Quartz job store and the {@code triggers/trigger} children are
 * registered as cron jobs. All jobs and triggers are placed in the Quartz group
 * {@link #DEFAULT_QUARTZ_JOB_GROUP}.</p>
 */
public class QuartzJobScheduler extends AbstractLogEnabled
        implements JobScheduler, Component, ThreadSafe,
                   Serviceable, Configurable, Startable,
                   Disposable, Contextualizable, Initializable {

    // NOTE(review): the POLICY_* names are not referenced anywhere in this class; presumably they
    // enumerate the accepted values of the "block-policy" thread pool setting - confirm before removing.

    /** Policy name "RUN". */
    private static final String POLICY_RUN = "RUN";

    /** Policy name "WAIT". */
    private static final String POLICY_WAIT = "WAIT";

    /** Policy name "ABORT". */
    private static final String POLICY_ABORT = "ABORT";

    /** Policy name "DISCARD". */
    private static final String POLICY_DISCARD = "DISCARD";

    /** Policy name "DISCARDOLDEST". */
    private static final String POLICY_DISCARD_OLDEST = "DISCARDOLDEST";

    /** {@link JobDataMap} key under which the role name of a job is stored. */
    static final String DATA_MAP_ROLE = "QuartzJobScheduler.ROLE";

    /** {@link JobDataMap} key under which a job object is stored. */
    static final String DATA_MAP_OBJECT = "QuartzJobScheduler.Object";

    /** {@link JobDataMap} key under which the job name is stored. */
    static final String DATA_MAP_NAME = "QuartzJobScheduler.JobName";

    /** {@link JobDataMap} key under which the Avalon {@link Context} is stored. */
    static final String DATA_MAP_CONTEXT = "QuartzJobScheduler.Context";

    /** {@link JobDataMap} key under which the {@link ServiceManager} is stored. */
    static final String DATA_MAP_MANAGER = "QuartzJobScheduler.ServiceManager";

    /** {@link JobDataMap} key under which this component's logger is stored. */
    static final String DATA_MAP_LOGGER = "QuartzJobScheduler.Logger";

    /** {@link JobDataMap} key under which the "may run concurrently" flag is stored as a {@link Boolean}. */
    static final String DATA_MAP_RUN_CONCURRENT = "QuartzJobScheduler.RunConcurrently";

    /** {@link JobDataMap} key under which the optional {@link Parameters} of a job are stored. */
    static final String DATA_MAP_PARAMETERS = "QuartzJobScheduler.Parameters";

    /** {@link JobDataMap} key under which the optional object {@link Map} of a job is stored. */
    static final String DATA_MAP_OBJECTMAP = "QuartzJobScheduler.Map";

    /**
     * {@link JobDataMap} key "QuartzJobExecutor.isRunning". Not written or read in this class;
     * presumably maintained by the job executor - verify against QuartzJobExecutor.
     */
    static final String DATA_MAP_KEY_ISRUNNING = "QuartzJobExecutor.isRunning";

    /** Quartz group used for every job and trigger registered by this scheduler. */
    static final String DEFAULT_QUARTZ_JOB_GROUP = "Cocoon";

    /** Name under which the Quartz scheduler instance is created and looked up. */
    static final String DEFAULT_QUARTZ_SCHEDULER_NAME = "Cocoon";

    /** The Avalon context, handed on to jobs through the job data map. */
    private Context context;

    /** The thread pool that executes jobs; created in {@link #createThreadPool(Configuration)}. */
    private ThreadPool executor;

    /** The Quartz scheduler; created in {@link #initialize()} and cleared in {@link #dispose()}. */
    private Scheduler scheduler;

    /** The service manager used to look up job roles and the {@link RunnableManager}. */
    private ServiceManager manager;

    /** The configuration; only held between {@link #configure(Configuration)} and {@link #initialize()}. */
    private Configuration config;

    /** Whether the scheduler is asked to wait for running jobs when shutting down. */
    private boolean m_shutdownGraceful;

    /**
     * Returns the names of all jobs in the {@link #DEFAULT_QUARTZ_JOB_GROUP} group, sorted.
     *
     * @return the sorted job names, or an empty array if the scheduler could not be queried
     */
    public String[] getJobNames() {
        try {
            final String[] names = scheduler.getJobNames(DEFAULT_QUARTZ_JOB_GROUP);
            Arrays.sort(names);

            return names;
        } catch (final SchedulerException se) {
            getLogger().error("could not gather job names", se);
        }

        return new String[0];
    }

    /**
     * Creates a {@link JobSchedulerEntry} describing the named job.
     *
     * @param jobname the name of the job
     * @return the entry, or {@code null} if it could not be created (the failure is logged)
     */
    public JobSchedulerEntry getJobSchedulerEntry(String jobname) {
        try {
            return new QuartzJobSchedulerEntry(jobname, scheduler);
        } catch (final Exception e) {
            getLogger().error("cannot create QuartzJobSchedulerEntry", e);
        }

        return null;
    }

    /**
     * Schedules a job object according to a cron expression.
     *
     * @param name the name of the job; an existing job with that name is replaced
     * @param job the job object
     * @param cronSpec the cron expression
     * @param canRunConcurrently flag stored with the job under {@link #DATA_MAP_RUN_CONCURRENT}
     * @param params optional parameters stored with the job, may be {@code null}
     * @param objects optional objects stored with the job, may be {@code null}
     * @throws CascadingException if the cron expression cannot be parsed or scheduling fails
     */
    public void addJob(final String name, final Object job, final String cronSpec, final boolean canRunConcurrently,
                       final Parameters params, final Map objects)
    throws CascadingException {
        final JobDataMap jobDataMap = new JobDataMap();
        jobDataMap.put(DATA_MAP_OBJECT, job);
        addJob(name, jobDataMap, cronSpec, canRunConcurrently, params, objects);
    }

    /**
     * Schedules a job identified by its role according to a cron expression.
     *
     * @param name the name of the job; an existing job with that name is replaced
     * @param jobrole the role of the job
     * @param cronSpec the cron expression
     * @param canRunConcurrently flag stored with the job under {@link #DATA_MAP_RUN_CONCURRENT}
     * @param params optional parameters stored with the job, may be {@code null}
     * @param objects optional objects stored with the job, may be {@code null}
     * @throws CascadingException if the cron expression cannot be parsed or scheduling fails
     */
    public void addJob(final String name, final String jobrole, final String cronSpec,
                       final boolean canRunConcurrently, final Parameters params, final Map objects)
    throws CascadingException {
        final JobDataMap jobDataMap = new JobDataMap();
        jobDataMap.put(DATA_MAP_ROLE, jobrole);
        addJob(name, jobDataMap, cronSpec, canRunConcurrently, params, objects);
    }

    /**
     * Schedules a job object according to a cron expression, without parameters or objects.
     *
     * @param name the name of the job; an existing job with that name is replaced
     * @param job the job object; must be a {@link CronJob}, {@link Runnable} or {@link Job}
     * @param cronSpec the cron expression
     * @param canRunConcurrently flag stored with the job under {@link #DATA_MAP_RUN_CONCURRENT}
     * @throws CascadingException if the job is of an unsupported type, the cron expression cannot
     *         be parsed or scheduling fails
     */
    public void addJob(final String name, final Object job, final String cronSpec, final boolean canRunConcurrently)
    throws CascadingException {
        checkJobType(job);
        addJob(name, job, cronSpec, canRunConcurrently, null, null);
    }

    /**
     * Schedules a job identified by its role according to a cron expression, without parameters or objects.
     *
     * @param name the name of the job; an existing job with that name is replaced
     * @param jobrole the role of the job
     * @param cronSpec the cron expression
     * @param canRunConcurrently flag stored with the job under {@link #DATA_MAP_RUN_CONCURRENT}
     * @throws CascadingException if the cron expression cannot be parsed or scheduling fails
     */
    public void addJob(final String name, final String jobrole, final String cronSpec, final boolean canRunConcurrently)
    throws CascadingException {
        addJob(name, jobrole, cronSpec, canRunConcurrently, null, null);
    }

    /**
     * Schedules a job identified by its role to run periodically.
     * The first run is one period from now; the job then repeats indefinitely.
     *
     * @param name the name of the job; an existing job with that name is replaced
     * @param jobrole the role of the job
     * @param period the period in seconds
     * @param canRunConcurrently flag stored with the job under {@link #DATA_MAP_RUN_CONCURRENT}
     * @param params optional parameters stored with the job, may be {@code null}
     * @param objects optional objects stored with the job, may be {@code null}
     * @throws CascadingException if scheduling fails
     */
    public void addPeriodicJob(String name, String jobrole, long period, boolean canRunConcurrently, Parameters params,
                               Map objects)
    throws CascadingException {
        final JobDataMap jobDataMap = new JobDataMap();
        jobDataMap.put(DATA_MAP_ROLE, jobrole);

        addJob(name, jobDataMap, createPeriodicTrigger(name, period), canRunConcurrently, params, objects);
    }

    /**
     * Schedules a job object to run periodically.
     * The first run is one period from now; the job then repeats indefinitely.
     *
     * @param name the name of the job; an existing job with that name is replaced
     * @param job the job object; must be a {@link CronJob}, {@link Runnable} or {@link Job}
     * @param period the period in seconds
     * @param canRunConcurrently flag stored with the job under {@link #DATA_MAP_RUN_CONCURRENT}
     * @param params optional parameters stored with the job, may be {@code null}
     * @param objects optional objects stored with the job, may be {@code null}
     * @throws CascadingException if the job is of an unsupported type or scheduling fails
     */
    public void addPeriodicJob(String name, Object job, long period, boolean canRunConcurrently, Parameters params,
                               Map objects)
    throws CascadingException {
        checkJobType(job);

        final JobDataMap jobDataMap = new JobDataMap();
        jobDataMap.put(DATA_MAP_OBJECT, job);

        addJob(name, jobDataMap, createPeriodicTrigger(name, period), canRunConcurrently, params, objects);
    }

    /**
     * Stores the configuration for later use by {@link #initialize()}.
     *
     * @param config the component configuration
     * @throws ConfigurationException declared by the interface; never thrown here
     */
    public void configure(final Configuration config)
    throws ConfigurationException {
        this.config = config;
    }

    /**
     * Shuts the Quartz scheduler down and drops the references to scheduler and thread pool.
     * Safe to call when no scheduler was ever created.
     */
    public void dispose() {
        if (scheduler != null) {
            try {
                if (getLogger().isInfoEnabled()) {
                    getLogger().info("shutting down scheduler " +
                                     (m_shutdownGraceful ? "graceful (waiting for running jobs to complete)"
                                                         : "immediately (killing running jobs)"));
                }

                scheduler.shutdown(m_shutdownGraceful);
            } catch (final SchedulerException se) {
                getLogger().error("failure during scheduler shutdown", se);
            } finally {
                // drop the reference even if shutdown failed, so a disposed component never reuses it
                scheduler = null;
            }
        }

        this.executor = null;
    }

    /**
     * Stores the Avalon context, which is later handed to jobs through the job data map.
     *
     * @param context the Avalon context
     * @throws ContextException declared by the interface; never thrown here
     */
    public void contextualize(Context context) throws ContextException {
        this.context = context;
    }

    /**
     * Creates the Quartz scheduler from the stored configuration and registers the configured triggers.
     * The configuration reference is released afterwards.
     *
     * @throws Exception if the thread pool, job store, scheduler or a trigger cannot be set up; a
     *         {@link SchedulerException} is reported as {@link ConfigurationException}
     */
    public void initialize() throws Exception {
        try {
            // the instance id is derived from the current date, with blanks replaced by underscores
            final String runID = new Date().toString().replace(' ', '_');
            final QuartzThreadPool pool = createThreadPool(this.config.getChild("thread-pool"));
            final JobStore store = createJobStore(DEFAULT_QUARTZ_SCHEDULER_NAME, runID, this.config.getChild("store"));
            DirectSchedulerFactory.getInstance().createScheduler(DEFAULT_QUARTZ_SCHEDULER_NAME, runID, pool, store);
            scheduler = DirectSchedulerFactory.getInstance().getScheduler(DEFAULT_QUARTZ_SCHEDULER_NAME);
        } catch (final SchedulerException se) {
            throw new ConfigurationException("cannot create a quartz scheduler", se);
        }

        final Configuration[] triggers = this.config.getChild("triggers").getChildren("trigger");
        createTriggers(triggers);

        // the configuration is not needed any longer
        this.config = null;

        if (getLogger().isDebugEnabled() && (triggers.length == 0)) {
            getLogger().debug("no triggers configured at startup");
        }
    }

    /**
     * Runs a job object immediately on the thread pool, using its class name as the job name.
     *
     * @param job the job object; a {@link CronJob} or {@link Runnable}
     * @return {@code true} if the job was handed to the thread pool
     */
    public boolean fireJob(final Object job) {
        return fireJob(job.getClass().getName(), job);
    }

    /**
     * Looks up the job with the given role and runs it immediately on the thread pool.
     *
     * @param jobrole the role of the job
     * @return {@code true} if the job was handed to the thread pool, {@code false} if the lookup
     *         or the hand-off failed
     */
    public boolean fireJob(final String jobrole) {
        Object job = null;

        try {
            job = manager.lookup(jobrole);

            return fireJob(jobrole, job);
        } catch (final ServiceException se) {
            getLogger().error("cannot fire job " + jobrole, se);
        } finally {
            // nothing to release when the lookup itself failed
            if (job != null) {
                manager.release(job);
            }
        }

        return false;
    }

    /**
     * Runs a job object immediately on the thread pool, after setting it up with the given
     * parameters and objects if it is a {@link ConfigurableCronJob}.
     *
     * @param job the job object; a {@link CronJob} or {@link Runnable}
     * @param params parameters passed to {@link ConfigurableCronJob#setup}
     * @param objects objects passed to {@link ConfigurableCronJob#setup}
     * @return {@code true} if the job was handed to the thread pool
     * @throws CascadingException declared by the interface
     */
    public boolean fireJob(final Object job, final Parameters params, final Map objects)
    throws CascadingException {
        if (job instanceof ConfigurableCronJob) {
            ((ConfigurableCronJob)job).setup(params, objects);
        }

        return fireJob(job);
    }

    /**
     * Looks up the job with the given role, sets it up with the given parameters and objects if it
     * is a {@link ConfigurableCronJob} and runs it immediately on the thread pool.
     *
     * @param jobrole the role of the job
     * @param params parameters passed to {@link ConfigurableCronJob#setup}
     * @param objects objects passed to {@link ConfigurableCronJob#setup}
     * @return {@code true} if the job was handed to the thread pool, {@code false} if the lookup
     *         or the hand-off failed
     * @throws CascadingException declared by the interface
     */
    public boolean fireJob(final String jobrole, final Parameters params, final Map objects)
    throws CascadingException {
        Object job = null;

        try {
            job = manager.lookup(jobrole);

            if (job instanceof ConfigurableCronJob) {
                ((ConfigurableCronJob)job).setup(params, objects);
            }

            return fireJob(jobrole, job);
        } catch (final ServiceException se) {
            getLogger().error("cannot fire job " + jobrole, se);
        } finally {
            // nothing to release when the lookup itself failed
            if (job != null) {
                manager.release(job);
            }
        }

        return false;
    }

    /**
     * Schedules a job object to run once at the given date.
     *
     * @param date the date at which the job is to run
     * @param name the name of the job; an existing job with that name is replaced
     * @param job the job object
     * @throws CascadingException if scheduling fails
     */
    public void fireJobAt(final Date date, final String name, final Object job)
    throws CascadingException {
        fireJobAt(date, name, job, null, null);
    }

    /**
     * Schedules a job identified by its role to run once at the given date.
     *
     * @param date the date at which the job is to run
     * @param name the name of the job; an existing job with that name is replaced
     * @param jobrole the role of the job
     * @throws CascadingException if scheduling fails
     */
    public void fireJobAt(final Date date, final String name, final String jobrole)
    throws CascadingException {
        fireJobAt(date, name, jobrole, null, null);
    }

    /**
     * Schedules a job object to run once at the given date. The job is registered as allowed to
     * run concurrently.
     *
     * @param date the date at which the job is to run
     * @param name the name of the job; an existing job with that name is replaced
     * @param job the job object
     * @param params optional parameters stored with the job, may be {@code null}
     * @param objects optional objects stored with the job, may be {@code null}
     * @throws CascadingException if scheduling fails
     */
    public void fireJobAt(final Date date, final String name, final Object job, final Parameters params,
                          final Map objects)
    throws CascadingException {
        final JobDataMap jobDataMap = new JobDataMap();
        jobDataMap.put(DATA_MAP_OBJECT, job);
        addJob(name, jobDataMap, date, true, params, objects);
    }

    /**
     * Schedules a job identified by its role to run once at the given date. The job is registered
     * as allowed to run concurrently.
     *
     * @param date the date at which the job is to run
     * @param name the name of the job; an existing job with that name is replaced
     * @param jobrole the role of the job
     * @param params optional parameters stored with the job, may be {@code null}
     * @param objects optional objects stored with the job, may be {@code null}
     * @throws CascadingException if scheduling fails
     */
    public void fireJobAt(final Date date, final String name, final String jobrole, final Parameters params,
                          final Map objects)
    throws CascadingException {
        final JobDataMap jobDataMap = new JobDataMap();
        jobDataMap.put(DATA_MAP_ROLE, jobrole);
        addJob(name, jobDataMap, date, true, params, objects);
    }

    /**
     * Removes the named job from the scheduler. Whether or not a job was actually deleted is
     * only logged.
     *
     * @param name the name of the job
     * @throws NoSuchElementException if the scheduler reports a failure while deleting the job
     */
    public void removeJob(final String name)
    throws NoSuchElementException {
        try {
            if (scheduler.deleteJob(name, DEFAULT_QUARTZ_JOB_GROUP)) {
                getLogger().info("job " + name + " removed by request");
            } else {
                getLogger().error("couldn't remove requested job " + name);
            }
        } catch (final SchedulerException se) {
            getLogger().error("cannot remove job " + name, se);
            throw new NoSuchElementException(se.getMessage());
        }
    }

    /**
     * Stores the service manager used to look up job roles and the {@link RunnableManager}.
     *
     * @param manager the service manager
     * @throws ServiceException declared by the interface; never thrown here
     */
    public void service(final ServiceManager manager)
    throws ServiceException {
        this.manager = manager;
    }

    /**
     * Starts the Quartz scheduler.
     *
     * @throws Exception if the scheduler cannot be started
     */
    public void start()
    throws Exception {
        scheduler.start();
    }

    /**
     * Pauses the Quartz scheduler; the actual shutdown happens in {@link #dispose()}.
     *
     * @throws Exception if the scheduler cannot be paused
     */
    public void stop()
    throws Exception {
        scheduler.pause();
    }

    /**
     * Verifies that a job object is of a type this scheduler accepts.
     *
     * @param job the job object to check
     * @throws CascadingException if the job is neither a {@link CronJob}, a {@link Runnable} nor a {@link Job}
     */
    private static void checkJobType(final Object job)
    throws CascadingException {
        if (!(job instanceof CronJob) && !(job instanceof Runnable) && !(job instanceof Job)) {
            throw new CascadingException("Job object is neither an instance of " + CronJob.class.getName() + "," +
                                         Runnable.class.getName() + " nor " + Job.class.getName());
        }
    }

    /**
     * Creates a trigger that first fires one period from now and then repeats indefinitely.
     *
     * @param name the trigger name
     * @param period the period in seconds
     * @return the trigger
     */
    private static SimpleTrigger createPeriodicTrigger(final String name, final long period) {
        final long ms = period * 1000;

        return new SimpleTrigger(name, DEFAULT_QUARTZ_JOB_GROUP, new Date(System.currentTimeMillis() + ms), null,
                                 SimpleTrigger.REPEAT_INDEFINITELY, ms);
    }

    /**
     * Schedules a job to run once at the given date.
     *
     * @param name the name of the job and its trigger
     * @param jobDataMap the data map already holding the job object or role
     * @param date the date at which the job is to run
     * @param canRunConcurrently flag stored with the job under {@link #DATA_MAP_RUN_CONCURRENT}
     * @param params optional parameters stored with the job, may be {@code null}
     * @param objects optional objects stored with the job, may be {@code null}
     * @throws CascadingException if scheduling fails
     */
    private void addJob(final String name, final JobDataMap jobDataMap, final Date date,
                        final boolean canRunConcurrently, final Parameters params, final Map objects)
    throws CascadingException {
        final SimpleTrigger trigger = new SimpleTrigger(name, DEFAULT_QUARTZ_JOB_GROUP, date);
        addJob(name, jobDataMap, trigger, canRunConcurrently, params, objects);
    }

    /**
     * Schedules a job according to a cron expression.
     *
     * @param name the name of the job and its trigger
     * @param jobDataMap the data map already holding the job object or role
     * @param cronSpec the cron expression
     * @param canRunConcurrently flag stored with the job under {@link #DATA_MAP_RUN_CONCURRENT}
     * @param params optional parameters stored with the job, may be {@code null}
     * @param objects optional objects stored with the job, may be {@code null}
     * @throws CascadingException if the cron expression cannot be parsed or scheduling fails
     */
    private void addJob(final String name, final JobDataMap jobDataMap, final String cronSpec,
                        final boolean canRunConcurrently, final Parameters params, final Map objects)
    throws CascadingException {
        final CronTrigger cronJobEntry = new CronTrigger(name, DEFAULT_QUARTZ_JOB_GROUP);

        try {
            cronJobEntry.setCronExpression(cronSpec);
        } catch (final ParseException pe) {
            throw new CascadingException(pe.getMessage(), pe);
        }

        addJob(name, jobDataMap, cronJobEntry, canRunConcurrently, params, objects);
    }

    /**
     * Registers a job with the given trigger, replacing any job already registered under the same name.
     *
     * @param name the name of the job
     * @param jobDataMap the data map already holding the job object or role
     * @param trigger the trigger that fires the job
     * @param canRunConcurrently flag stored with the job under {@link #DATA_MAP_RUN_CONCURRENT}
     * @param params optional parameters stored with the job, may be {@code null}
     * @param objects optional objects stored with the job, may be {@code null}
     * @throws CascadingException if the scheduler rejects the job
     */
    private void addJob(final String name, final JobDataMap jobDataMap, final Trigger trigger,
                        final boolean canRunConcurrently, final Parameters params, final Map objects)
    throws CascadingException {
        try {
            final JobDetail jobdetail = scheduler.getJobDetail(name, DEFAULT_QUARTZ_JOB_GROUP);
            if (jobdetail != null) {
                removeJob(name);
            }
        } catch (final SchedulerException se) {
            // Best effort only: failing to look up a previous job must not prevent scheduling the
            // new one; scheduleJob() below reports any real problem.
            if (getLogger().isDebugEnabled()) {
                getLogger().debug("could not check for an existing job named " + name, se);
            }
        }

        initDataMap(jobDataMap, name, canRunConcurrently, params, objects);

        final JobDetail detail = createJobDetail(name, jobDataMap);

        if (getLogger().isInfoEnabled()) {
            getLogger().info("Adding CronJob '" + trigger.getFullName() + "'");
        }

        try {
            scheduler.scheduleJob(detail, trigger);
        } catch (final SchedulerException se) {
            throw new CascadingException(se.getMessage(), se);
        }

        if (getLogger().isDebugEnabled()) {
            if (trigger instanceof CronTrigger) {
                getLogger().debug("Time schedule summary:\n" + ((CronTrigger)trigger).getExpressionSummary());
            } else {
                getLogger().debug("Next scheduled time: " + trigger.getNextFireTime());
            }
        }
    }

    /**
     * Fills a job data map with the entries shared by all jobs: name, logger, context, service
     * manager, concurrency flag and - when given - parameters and objects.
     *
     * @param jobDataMap the map to fill
     * @param jobName the name of the job
     * @param concurent whether the job may run concurrently
     * @param params optional parameters, only stored when not {@code null}
     * @param objects optional objects, only stored when not {@code null}
     * @return the same {@code jobDataMap} instance
     */
    protected JobDataMap initDataMap(JobDataMap jobDataMap, String jobName, boolean concurent,
                                     Parameters params, Map objects) {
        jobDataMap.put(DATA_MAP_NAME, jobName);
        jobDataMap.put(DATA_MAP_LOGGER, getLogger());
        jobDataMap.put(DATA_MAP_CONTEXT, this.context);
        jobDataMap.put(DATA_MAP_MANAGER, this.manager);
        jobDataMap.put(DATA_MAP_RUN_CONCURRENT, concurent ? Boolean.TRUE : Boolean.FALSE);
        if (null != params) {
            jobDataMap.put(DATA_MAP_PARAMETERS, params);
        }
        if (null != objects) {
            jobDataMap.put(DATA_MAP_OBJECTMAP, objects);
        }
        return jobDataMap;
    }

    /**
     * Creates the Quartz job detail for a job; the job class is always {@link QuartzJobExecutor}.
     *
     * @param name the name of the job
     * @param jobDataMap the data map attached to the job
     * @return the job detail
     */
    protected JobDetail createJobDetail(String name, JobDataMap jobDataMap) {
        final JobDetail detail = new JobDetail(name, DEFAULT_QUARTZ_JOB_GROUP, QuartzJobExecutor.class);
        detail.setJobDataMap(jobDataMap);
        return detail;
    }

    /**
     * Creates the thread pool used by Quartz from the {@code thread-pool} configuration.
     * Also records the {@code shutdown-graceful} setting used by {@link #dispose()}.
     *
     * @param poolConfig the thread pool configuration
     * @return the Quartz thread pool adapter wrapping the created pool
     * @throws ServiceException if the {@link RunnableManager} cannot be looked up
     */
    private QuartzThreadPool createThreadPool(final Configuration poolConfig)
    throws ServiceException {
        final int queueSize = poolConfig.getChild("queue-size").getValueAsInteger(-1);
        final int maxPoolSize = poolConfig.getChild("max-pool-size").getValueAsInteger(-1);
        final int minPoolSize = poolConfig.getChild("min-pool-size").getValueAsInteger(-1);
        final int keepAliveTimeMs = poolConfig.getChild("keep-alive-time-ms").getValueAsInteger(-1);
        final String blockPolicy = poolConfig.getChild("block-policy").getValue(null);
        m_shutdownGraceful = poolConfig.getChild("shutdown-graceful").getValueAsBoolean(true);

        final int shutdownWaitTimeMs = poolConfig.getChild("shutdown-wait-time-ms").getValueAsInteger(-1);
        // NOTE(review): the RunnableManager is never released; confirm whether releasing it here is
        // safe while the pool it created is still in use.
        final RunnableManager runnableManager = (RunnableManager)this.manager.lookup(RunnableManager.ROLE);
        this.executor = runnableManager.createPool(queueSize,
                                                   maxPoolSize,
                                                   minPoolSize,
                                                   Thread.NORM_PRIORITY,
                                                   false,
                                                   keepAliveTimeMs,
                                                   blockPolicy,
                                                   m_shutdownGraceful,
                                                   shutdownWaitTimeMs);

        final QuartzThreadPool pool = new QuartzThreadPool(this.executor);
        // without a logger the adapter would fail with a NullPointerException when reporting errors
        pool.enableLogging(getLogger());
        return pool;
    }

    /**
     * Registers one cron job per {@code trigger} configuration. The cron expression is taken from
     * the {@code cron} child or, if absent, assembled from the {@code seconds}, {@code minutes},
     * {@code hours}, {@code days}, {@code months}, {@code weekdays} and {@code years} children.
     *
     * @param triggers the trigger configurations
     * @throws ConfigurationException if a required attribute is missing or a trigger cannot be added
     */
    private void createTriggers(final Configuration[] triggers)
    throws ConfigurationException {
        for (int i = 0; i < triggers.length; i++) {
            String cron = triggers[i].getChild("cron").getValue(null);

            if (null == cron) {
                final String seconds = triggers[i].getChild("seconds").getValue("0");
                final String minutes = triggers[i].getChild("minutes").getValue("*");
                final String hours = triggers[i].getChild("hours").getValue("*");
                final String days = triggers[i].getChild("days").getValue("*");
                final String months = triggers[i].getChild("months").getValue("*");
                final String weekdays = triggers[i].getChild("weekdays").getValue("?");
                final String years = triggers[i].getChild("years").getValue("*");
                cron = seconds + " " + minutes + " " + hours + " " + days + " " + months + " " + weekdays + " " +
                       years;
            }

            try {
                addJob(triggers[i].getAttribute("name"), triggers[i].getAttribute("target"), cron,
                       triggers[i].getAttributeAsBoolean("concurrent-runs", true));
            } catch (final CascadingException ce) {
                throw new ConfigurationException("failed adding trigger to scheduler", ce);
            }
        }
    }

    /**
     * Creates the Quartz job store selected by the {@code type} attribute of the store configuration:
     * {@code ram} (default) for an in-memory store, {@code tx} or {@code cmt} for a JDBC store.
     * JDBC stores require a {@code datasource} child whose {@code provider} attribute is
     * {@code jndi} (default), {@code excalibur} or the class name of a {@link ConnectionProvider}.
     *
     * @param instanceName the scheduler instance name set on a JDBC store
     * @param instanceID the scheduler instance id set on a JDBC store
     * @param configuration the store configuration
     * @return the job store
     * @throws ConfigurationException if the store type is unknown, the datasource configuration is
     *         missing, or the connection provider or driver delegate cannot be set up
     */
    private JobStore createJobStore(String instanceName, String instanceID, final Configuration configuration)
    throws ConfigurationException {
        String type = configuration.getAttribute("type", "ram");
        if (type.equals("ram")) {
            return new RAMJobStore();
        }

        JobStoreSupport store = null;
        if (type.equals("tx")) {
            store = new QuartzJobStoreTX(getLogger(), this.manager, this.context);
        } else if (type.equals("cmt")) {
            store = new QuartzJobStoreCMT(getLogger(), this.manager, this.context);
        } else {
            throw new ConfigurationException("Unknown store type: " + type);
        }

        Configuration dsConfig = configuration.getChild("datasource", false);
        if (dsConfig == null) {
            throw new ConfigurationException("Store " + type + " requires datasource configuration.");
        }

        String dsName = dsConfig.getValue();
        String dsType = dsConfig.getAttribute("provider", "jndi");

        ConnectionProvider provider;
        if (dsType.equals("jndi")) {
            provider = new JNDIConnectionProvider(dsName, false);
        } else if (dsType.equals("excalibur")) {
            provider = new DataSourceComponentConnectionProvider(dsName, this.manager);
        } else {
            try {
                // any other provider value is taken as the class name of a ConnectionProvider
                provider = (ConnectionProvider)Class.forName(dsType).newInstance();
            } catch (Exception e) {
                // keep the cause so the reason for the failure is not lost
                throw new ConfigurationException("Could not instantiate ConnectionProvider class " + dsType, e);
            }
        }

        store.setInstanceName(instanceName);
        store.setInstanceId(instanceID);
        store.setDataSource(dsType + ":" + dsName);
        DBConnectionManager.getInstance().addConnectionProvider(dsType + ":" + dsName, provider);

        String delegate = configuration.getAttribute("delegate", null);
        try {
            if (delegate != null) {
                store.setDriverDelegateClass(delegate);
            }
        } catch (InvalidConfigurationException e) {
            throw new ConfigurationException("Could not instantiate DriverDelegate class " + delegate, e);
        }

        return store;
    }

    /**
     * Hands a job to the thread pool for immediate execution. A {@link CronJob} is run through a
     * job executor created by {@link #createJobExecutor()}; a plain {@link Runnable} is executed
     * directly. Any other type is rejected.
     *
     * @param name the name of the job
     * @param job the job object
     * @return {@code true} if the job was handed to the thread pool, {@code false} if it is of an
     *         unsupported type or the hand-off was interrupted
     */
    private boolean fireJob(final String name, final Object job) {
        try {
            if (job instanceof CronJob) {
                JobDataMap jobDataMap = new JobDataMap();
                jobDataMap.put(DATA_MAP_OBJECT, job);
                initDataMap(jobDataMap, name, true, null, null);

                final JobDetail detail = createJobDetail(name, jobDataMap);

                // there is no real trigger behind a manual run, so the bundle only carries the job detail
                final TriggerFiredBundle bundle =
                    new TriggerFiredBundle(detail, null, null, false, null, null, null, null);

                final Job jobExecutor = createJobExecutor();
                final JobExecutionContext executionContext =
                    new JobExecutionContext(this.scheduler, bundle, jobExecutor);

                this.executor.execute(new Runnable() {
                        public void run() {
                            try {
                                jobExecutor.execute(executionContext);
                            } catch (JobExecutionException e) {
                                getLogger().error("Job '" + job + "' died.", e);
                            }
                        }
                    });
            } else if (job instanceof Runnable) {
                this.executor.execute((Runnable)job);
            } else {
                getLogger().error("Job named '" + name + "' is of invalid class: " + job.getClass().getName());
                return false;
            }

            return true;
        } catch (final InterruptedException ie) {
            getLogger().error("job " + name + " interrupted", ie);
            // restore the interrupt status so callers further up can react to it
            Thread.currentThread().interrupt();
        }

        return false;
    }

    /**
     * Creates the Quartz job that executes a manually fired {@link CronJob}.
     *
     * @return a new {@link QuartzJobExecutor}
     */
    protected Job createJobExecutor() {
        return new QuartzJobExecutor();
    }

    /**
     * Adapter that lets Quartz run its jobs on a Cocoon {@link ThreadPool}.
     */
    private static class QuartzThreadPool extends AbstractLogEnabled implements org.quartz.spi.ThreadPool {

        /** The Cocoon thread pool all work is delegated to. */
        private final ThreadPool executor;

        /**
         * Creates the adapter.
         *
         * @param executor the Cocoon thread pool to delegate to
         */
        public QuartzThreadPool(final ThreadPool executor) {
            super();
            this.executor = executor;
        }

        /**
         * Returns the maximum pool size of the wrapped pool.
         *
         * @return the maximum pool size
         */
        public int getPoolSize() {
            return this.executor.getMaximumPoolSize();
        }

        /**
         * Does nothing; the wrapped pool is handed in ready for use.
         */
        public void initialize() {
        }

        /**
         * Hands a job to the wrapped pool.
         *
         * @param job the job to run
         * @return {@code true} if the job was handed to the pool, {@code false} if the hand-off
         *         was interrupted and the job was therefore not run
         */
        public boolean runInThread(final Runnable job) {
            try {
                this.executor.execute(job);
                return true;
            } catch (final InterruptedException ie) {
                getLogger().error("Cronjob failed", ie);
                // restore the interrupt status and tell Quartz the job was not run
                Thread.currentThread().interrupt();
                return false;
            }
        }

        /**
         * Shuts the wrapped pool down. The flag is ignored; the pool's own shutdown settings apply.
         *
         * @param waitForJobsToComplete ignored
         */
        public void shutdown(final boolean waitForJobsToComplete) {
            this.executor.shutdown();
        }
    }
}