1 17 18 21 package org.quartz.impl.jdbcjobstore; 22 23 import java.io.IOException ; 24 import java.lang.reflect.Constructor ; 25 import java.lang.reflect.InvocationHandler ; 26 import java.lang.reflect.InvocationTargetException ; 27 import java.lang.reflect.Proxy ; 28 import java.sql.Connection ; 29 import java.sql.SQLException ; 30 import java.util.ArrayList ; 31 import java.util.Date ; 32 import java.util.HashMap ; 33 import java.util.HashSet ; 34 import java.util.Iterator ; 35 import java.util.LinkedList ; 36 import java.util.List ; 37 import java.util.Set ; 38 39 import org.apache.commons.logging.Log; 40 import org.apache.commons.logging.LogFactory; 41 import org.quartz.Calendar; 42 import org.quartz.CronTrigger; 43 import org.quartz.JobDataMap; 44 import org.quartz.JobDetail; 45 import org.quartz.JobPersistenceException; 46 import org.quartz.ObjectAlreadyExistsException; 47 import org.quartz.Scheduler; 48 import org.quartz.SchedulerConfigException; 49 import org.quartz.SchedulerException; 50 import org.quartz.SimpleTrigger; 51 import org.quartz.Trigger; 52 import org.quartz.core.SchedulingContext; 53 import org.quartz.spi.ClassLoadHelper; 54 import org.quartz.spi.JobStore; 55 import org.quartz.spi.SchedulerSignaler; 56 import org.quartz.spi.TriggerFiredBundle; 57 import org.quartz.utils.DBConnectionManager; 58 import org.quartz.utils.Key; 59 import org.quartz.utils.TriggerStatus; 60 61 62 70 public abstract class JobStoreSupport implements JobStore, Constants { 71 72 79 80 protected static String LOCK_TRIGGER_ACCESS = "TRIGGER_ACCESS"; 81 82 protected static String LOCK_JOB_ACCESS = "JOB_ACCESS"; 83 84 protected static String LOCK_CALENDAR_ACCESS = "CALENDAR_ACCESS"; 85 86 protected static String LOCK_STATE_ACCESS = "STATE_ACCESS"; 87 88 protected static String LOCK_MISFIRE_ACCESS = "MISFIRE_ACCESS"; 89 90 97 98 protected String dsName; 99 100 protected String tablePrefix = DEFAULT_TABLE_PREFIX; 101 102 protected boolean useProperties = false; 103 104 protected String instanceId; 105 106 protected String instanceName; 107 108 protected String delegateClassName; 109 protected Class delegateClass = StdJDBCDelegate.class; 110 111 protected HashMap calendarCache = new HashMap (); 112 113 private DriverDelegate delegate; 114 115 private long misfireThreshold = 60000L; 117 private boolean dontSetAutoCommitFalse = false; 118 119 private boolean isClustered = false; 120 121 private boolean useDBLocks = false; 122 123 private boolean lockOnInsert = true; 124 125 private Semaphore lockHandler = null; 127 private String selectWithLockSQL = null; 128 129 private long clusterCheckinInterval = 7500L; 130 131 private ClusterManager clusterManagementThread = null; 132 133 private MisfireHandler misfireHandler = null; 134 135 private ClassLoadHelper classLoadHelper; 136 137 private SchedulerSignaler signaler; 138 139 protected int maxToRecoverAtATime = 20; 140 141 private boolean setTxIsolationLevelSequential = false; 142 143 private long dbRetryInterval = 10000; 144 145 private boolean makeThreadsDaemons = false; 146 147 private boolean doubleCheckLockMisfireHandler = true; 148 149 private final Log log = LogFactory.getLog(getClass()); 150 151 158 159 165 public void setDataSource(String dsName) { 166 this.dsName = dsName; 167 } 168 169 175 public String getDataSource() { 176 return dsName; 177 } 178 179 184 public void setTablePrefix(String prefix) { 185 if (prefix == null) { 186 prefix = ""; 187 } 188 189 this.tablePrefix = prefix; 190 } 191 192 197 public String getTablePrefix() { 198 return tablePrefix; 199 } 200 201 206 public void setUseProperties(String useProp) { 207 if (useProp == null) { 208 useProp = "false"; 209 } 210 211 this.useProperties = Boolean.valueOf(useProp).booleanValue(); 212 } 213 214 219 public boolean canUseProperties() { 220 return useProperties; 221 } 222 223 228 public void setInstanceId(String instanceId) { 229 this.instanceId = instanceId; 230 } 231 232 237 public String getInstanceId() { 238 239 return instanceId; 240 } 241 242 245 public void setInstanceName(String instanceName) { 246 this.instanceName = instanceName; 247 } 248 249 252 public String getInstanceName() { 253 254 return instanceName; 255 } 256 257 262 public void setIsClustered(boolean isClustered) { 263 this.isClustered = isClustered; 264 } 265 266 271 public boolean isClustered() { 272 return isClustered; 273 } 274 275 282 public long getClusterCheckinInterval() { 283 return clusterCheckinInterval; 284 } 285 286 293 public void setClusterCheckinInterval(long l) { 294 clusterCheckinInterval = l; 295 } 296 297 304 public int getMaxMisfiresToHandleAtATime() { 305 return maxToRecoverAtATime; 306 } 307 308 315 public void setMaxMisfiresToHandleAtATime(int maxToRecoverAtATime) { 316 this.maxToRecoverAtATime = maxToRecoverAtATime; 317 } 318 319 322 public long getDbRetryInterval() { 323 return dbRetryInterval; 324 } 325 328 public void setDbRetryInterval(long dbRetryInterval) { 329 this.dbRetryInterval = dbRetryInterval; 330 } 331 332 338 public void setUseDBLocks(boolean useDBLocks) { 339 this.useDBLocks = useDBLocks; 340 } 341 342 348 public boolean getUseDBLocks() { 349 return useDBLocks; 350 } 351 352 public boolean isLockOnInsert() { 353 return lockOnInsert; 354 } 355 356 368 public void setLockOnInsert(boolean lockOnInsert) { 369 this.lockOnInsert = lockOnInsert; 370 } 371 372 public long getMisfireThreshold() { 373 return misfireThreshold; 374 } 375 376 383 public void setMisfireThreshold(long misfireThreshold) { 384 if (misfireThreshold < 1) { 385 throw new IllegalArgumentException ( 386 "Misfirethreshold must be larger than 0"); 387 } 388 this.misfireThreshold = misfireThreshold; 389 } 390 391 public boolean isDontSetAutoCommitFalse() { 392 return dontSetAutoCommitFalse; 393 } 394 395 402 public void setDontSetAutoCommitFalse(boolean b) { 403 dontSetAutoCommitFalse = b; 404 } 405 406 public boolean isTxIsolationLevelSerializable() { 407 return setTxIsolationLevelSequential; 408 } 409 410 415 public void setTxIsolationLevelSerializable(boolean b) { 416 setTxIsolationLevelSequential = b; 417 } 418 419 420 428 public void setDriverDelegateClass(String delegateClassName) 429 throws InvalidConfigurationException { 430 this.delegateClassName = delegateClassName; 431 } 432 433 440 public String getDriverDelegateClass() { 441 return delegateClassName; 442 } 443 444 public String getSelectWithLockSQL() { 445 return selectWithLockSQL; 446 } 447 448 456 public void setSelectWithLockSQL(String string) { 457 selectWithLockSQL = string; 458 } 459 460 protected ClassLoadHelper getClassLoadHelper() { 461 return classLoadHelper; 462 } 463 464 471 public boolean getMakeThreadsDaemons() { 472 return makeThreadsDaemons; 473 } 474 475 482 public void setMakeThreadsDaemons(boolean makeThreadsDaemons) { 483 this.makeThreadsDaemons = makeThreadsDaemons; 484 } 485 486 492 public boolean getDoubleCheckLockMisfireHandler() { 493 return doubleCheckLockMisfireHandler; 494 } 495 496 502 public void setDoubleCheckLockMisfireHandler( 503 boolean doubleCheckLockMisfireHandler) { 504 this.doubleCheckLockMisfireHandler = doubleCheckLockMisfireHandler; 505 } 506 507 511 protected Log getLog() { 512 return log; 513 } 514 515 521 public void initialize(ClassLoadHelper loadHelper, 522 SchedulerSignaler signaler) throws SchedulerConfigException { 523 524 if (dsName == null) { 525 throw new SchedulerConfigException("DataSource name not set."); 526 } 527 528 classLoadHelper = loadHelper; 529 this.signaler = signaler; 530 531 if (getLockHandler() == null) { 534 535 if (isClustered()) { 538 setUseDBLocks(true); 539 } 540 541 if (getUseDBLocks()) { 542 getLog().info( 543 "Using db table-based data access locking (synchronization)."); 544 setLockHandler( 545 new StdRowLockSemaphore(getTablePrefix(), getSelectWithLockSQL())); 546 } else { 547 getLog().info( 548 "Using thread monitor-based data access locking (synchronization)."); 549 setLockHandler(new SimpleSemaphore()); 550 } 551 } 552 553 if (!isClustered()) { 554 try { 555 cleanVolatileTriggerAndJobs(); 556 } catch (SchedulerException se) { 557 throw new SchedulerConfigException( 558 "Failure occured during job recovery.", se); 559 } 560 } 561 } 562 563 566 public void schedulerStarted() throws SchedulerException { 567 568 if (isClustered()) { 569 clusterManagementThread = new ClusterManager(); 570 clusterManagementThread.initialize(); 571 } else { 572 try { 573 recoverJobs(); 574 } catch (SchedulerException se) { 575 throw new SchedulerConfigException( 576 "Failure occured during job recovery.", se); 577 } 578 } 579 580 misfireHandler = new MisfireHandler(); 581 misfireHandler.initialize(); 582 } 583 584 591 public void shutdown() { 592 if (clusterManagementThread != null) { 593 clusterManagementThread.shutdown(); 594 } 595 596 if (misfireHandler != null) { 597 misfireHandler.shutdown(); 598 } 599 600 try { 601 DBConnectionManager.getInstance().shutdown(getDataSource()); 602 } catch (SQLException sqle) { 603 getLog().warn("Database connection shutdown unsuccessful.", sqle); 604 } 605 } 606 607 public boolean supportsPersistence() { 608 return true; 609 } 610 611 615 protected abstract Connection getNonManagedTXConnection() 616 throws JobPersistenceException; 617 618 623 protected Connection getAttributeRestoringConnection(Connection conn) { 624 return (Connection )Proxy.newProxyInstance( 625 Thread.currentThread().getContextClassLoader(), 626 new Class [] { Connection .class }, 627 new AttributeRestoringConnectionInvocationHandler(conn)); 628 } 629 630 protected Connection getConnection() throws JobPersistenceException { 631 Connection conn = null; 632 try { 633 conn = DBConnectionManager.getInstance().getConnection( 634 getDataSource()); 635 } catch (SQLException sqle) { 636 throw new JobPersistenceException( 637 "Failed to obtain DB connection from data source '" 638 + getDataSource() + "': " + sqle.toString(), sqle); 639 } catch (Throwable e) { 640 throw new JobPersistenceException( 641 "Failed to obtain DB connection from data source '" 642 + getDataSource() + "': " + e.toString(), e, 643 JobPersistenceException.ERR_PERSISTENCE_CRITICAL_FAILURE); 644 } 645 646 if (conn == null) { 647 throw new JobPersistenceException( 648 "Could not get connection from DataSource '" 649 + getDataSource() + "'"); 650 } 651 652 conn = getAttributeRestoringConnection(conn); 654 655 try { 657 if (!isDontSetAutoCommitFalse()) { 658 conn.setAutoCommit(false); 659 } 660 661 if(isTxIsolationLevelSerializable()) { 662 conn.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE); 663 } 664 } catch (SQLException sqle) { 665 getLog().warn("Failed to override connection auto commit/transaction isolation.", sqle); 666 } catch (Throwable e) { 667 try { conn.close(); } catch(Throwable tt) {} 668 669 throw new JobPersistenceException( 670 "Failure setting up connection.", e); 671 } 672 673 return conn; 674 } 675 676 protected void releaseLock(Connection conn, String lockName, boolean doIt) { 677 if (doIt && conn != null) { 678 try { 679 getLockHandler().releaseLock(conn, lockName); 680 } catch (LockException le) { 681 getLog().error("Error returning lock: " + le.getMessage(), le); 682 } 683 } 684 } 685 686 691 protected void cleanVolatileTriggerAndJobs() 692 throws JobPersistenceException { 693 executeInNonManagedTXLock( 694 LOCK_TRIGGER_ACCESS, 695 new VoidTransactionCallback() { 696 public void execute(Connection conn) throws JobPersistenceException { 697 cleanVolatileTriggerAndJobs(conn); 698 } 699 }); 700 } 701 702 710 protected void cleanVolatileTriggerAndJobs(Connection conn) 711 throws JobPersistenceException { 712 try { 713 Key[] volatileTriggers = getDelegate().selectVolatileTriggers(conn); 715 Key[] volatileJobs = getDelegate().selectVolatileJobs(conn); 716 717 for (int i = 0; i < volatileTriggers.length; i++) { 718 removeTrigger(conn, null, volatileTriggers[i].getName(), 719 volatileTriggers[i].getGroup()); 720 } 721 getLog().info( 722 "Removed " + volatileTriggers.length 723 + " Volatile Trigger(s)."); 724 725 for (int i = 0; i < volatileJobs.length; i++) { 726 removeJob(conn, null, volatileJobs[i].getName(), 727 volatileJobs[i].getGroup(), true); 728 } 729 getLog().info( 730 "Removed " + volatileJobs.length + " Volatile Job(s)."); 731 732 getDelegate().deleteVolatileFiredTriggers(conn); 734 735 } catch (Exception e) { 736 throw new JobPersistenceException("Couldn't clean volatile data: " 737 + e.getMessage(), e); 738 } 739 } 740 741 747 protected void recoverJobs() throws JobPersistenceException { 748 executeInNonManagedTXLock( 749 LOCK_TRIGGER_ACCESS, 750 new VoidTransactionCallback() { 751 public void execute(Connection conn) throws JobPersistenceException { 752 recoverJobs(conn); 753 } 754 }); 755 } 756 757 766 protected void recoverJobs(Connection conn) throws JobPersistenceException { 767 try { 768 int rows = getDelegate().updateTriggerStatesFromOtherStates(conn, 770 STATE_WAITING, STATE_ACQUIRED, STATE_BLOCKED); 771 772 rows += getDelegate().updateTriggerStatesFromOtherStates(conn, 773 STATE_PAUSED, STATE_PAUSED_BLOCKED, STATE_PAUSED_BLOCKED); 774 775 getLog().info( 776 "Freed " + rows 777 + " triggers from 'acquired' / 'blocked' state."); 778 779 recoverMisfiredJobs(conn, true); 781 782 Trigger[] recoveringJobTriggers = getDelegate() 784 .selectTriggersForRecoveringJobs(conn); 785 getLog() 786 .info( 787 "Recovering " 788 + recoveringJobTriggers.length 789 + " jobs that were in-progress at the time of the last shut-down."); 790 791 for (int i = 0; i < recoveringJobTriggers.length; ++i) { 792 if (jobExists(conn, recoveringJobTriggers[i].getJobName(), 793 recoveringJobTriggers[i].getJobGroup())) { 794 recoveringJobTriggers[i].computeFirstFireTime(null); 795 storeTrigger(conn, null, recoveringJobTriggers[i], null, false, 796 STATE_WAITING, false, true); 797 } 798 } 799 getLog().info("Recovery complete."); 800 801 Key[] ct = getDelegate().selectTriggersInState(conn, STATE_COMPLETE); 803 for(int i=0; ct != null && i < ct.length; i++) { 804 removeTrigger(conn, null, ct[i].getName(), ct[i].getGroup()); 805 } 806 getLog().info( 807 "Removed " + ct.length 808 + " 'complete' triggers."); 809 810 int n = getDelegate().deleteFiredTriggers(conn); 812 getLog().info("Removed " + n + " stale fired job entries."); 813 } catch (JobPersistenceException e) { 814 throw e; 815 } catch (Exception e) { 816 throw new JobPersistenceException("Couldn't recover jobs: " 817 + e.getMessage(), e); 818 } 819 } 820 821 protected long getMisfireTime() { 822 long misfireTime = System.currentTimeMillis(); 823 if (getMisfireThreshold() > 0) { 824 misfireTime -= getMisfireThreshold(); 825 } 826 827 return (misfireTime > 0) ? misfireTime : 0; 828 } 829 830 834 protected static class RecoverMisfiredJobsResult { 835 public static final RecoverMisfiredJobsResult NO_OP = 836 new RecoverMisfiredJobsResult(false, 0); 837 838 private boolean _hasMoreMisfiredTriggers; 839 private int _processedMisfiredTriggerCount; 840 841 public RecoverMisfiredJobsResult( 842 boolean hasMoreMisfiredTriggers, int processedMisfiredTriggerCount) { 843 _hasMoreMisfiredTriggers = hasMoreMisfiredTriggers; 844 _processedMisfiredTriggerCount = processedMisfiredTriggerCount; 845 } 846 847 public boolean hasMoreMisfiredTriggers() { 848 return _hasMoreMisfiredTriggers; 849 } 850 public int getProcessedMisfiredTriggerCount() { 851 return _processedMisfiredTriggerCount; 852 } 853 } 854 855 protected RecoverMisfiredJobsResult recoverMisfiredJobs( 856 Connection conn, boolean recovering) 857 throws JobPersistenceException, SQLException { 858 859 int maxMisfiresToHandleAtATime = 862 (recovering) ? -1 : getMaxMisfiresToHandleAtATime(); 863 864 List misfiredTriggers = new ArrayList (); 865 866 boolean hasMoreMisfiredTriggers = 869 getDelegate().selectMisfiredTriggersInStates( 870 conn, STATE_MISFIRED, STATE_WAITING, getMisfireTime(), 871 maxMisfiresToHandleAtATime, misfiredTriggers); 872 873 if (hasMoreMisfiredTriggers) { 874 getLog().info( 875 "Handling the first " + misfiredTriggers.size() + 876 " triggers that missed their scheduled fire-time. " + 877 "More misfired triggers remain to be processed."); 878 } else if (misfiredTriggers.size() > 0) { 879 getLog().info( 880 "Handling " + misfiredTriggers.size() + 881 " trigger(s) that missed their scheduled fire-time."); 882 } else { 883 getLog().debug( 884 "Found 0 triggers that missed their scheduled fire-time."); 885 return RecoverMisfiredJobsResult.NO_OP; 886 } 887 888 for (Iterator misfiredTriggerIter = misfiredTriggers.iterator(); misfiredTriggerIter.hasNext();) { 889 Key triggerKey = (Key) misfiredTriggerIter.next(); 890 891 Trigger trig = 892 retrieveTrigger(conn, triggerKey.getName(), triggerKey.getGroup()); 893 894 if (trig == null) { 895 continue; 896 } 897 898 doUpdateOfMisfiredTrigger(conn, null, trig, false, STATE_WAITING, recovering); 899 } 900 901 return new RecoverMisfiredJobsResult( 902 hasMoreMisfiredTriggers, misfiredTriggers.size()); 903 } 904 905 protected boolean updateMisfiredTrigger(Connection conn, 906 SchedulingContext ctxt, String triggerName, String groupName, 907 String newStateIfNotComplete, boolean forceState) throws JobPersistenceException { 911 try { 912 913 Trigger trig = getDelegate().selectTrigger(conn, triggerName, 914 groupName); 915 916 long misfireTime = System.currentTimeMillis(); 917 if (getMisfireThreshold() > 0) { 918 misfireTime -= getMisfireThreshold(); 919 } 920 921 if (trig.getNextFireTime().getTime() > misfireTime) { 922 return false; 923 } 924 925 doUpdateOfMisfiredTrigger(conn, ctxt, trig, forceState, newStateIfNotComplete, false); 926 927 return true; 928 929 } catch (Exception e) { 930 throw new JobPersistenceException( 931 "Couldn't update misfired trigger '" + groupName + "." 932 + triggerName + "': " + e.getMessage(), e); 933 } 934 } 935 936 private void doUpdateOfMisfiredTrigger(Connection conn, SchedulingContext ctxt, Trigger trig, boolean forceState, String newStateIfNotComplete, boolean recovering) throws JobPersistenceException { 937 Calendar cal = null; 938 if (trig.getCalendarName() != null) { 939 cal = retrieveCalendar(conn, ctxt, trig.getCalendarName()); 940 } 941 942 signaler.notifyTriggerListenersMisfired(trig); 943 944 trig.updateAfterMisfire(cal); 945 946 if (trig.getNextFireTime() == null) { 947 storeTrigger(conn, ctxt, trig, 948 null, true, STATE_COMPLETE, forceState, recovering); 949 } else { 950 storeTrigger(conn, ctxt, trig, null, true, newStateIfNotComplete, 951 forceState, false); 952 } 953 } 954 955 968 public void storeJobAndTrigger(final SchedulingContext ctxt, final JobDetail newJob, 969 final Trigger newTrigger) 970 throws ObjectAlreadyExistsException, JobPersistenceException { 971 executeInLock( 972 (isLockOnInsert()) ? LOCK_TRIGGER_ACCESS : null, 973 new VoidTransactionCallback() { 974 public void execute(Connection conn) throws JobPersistenceException { 975 if (newJob.isVolatile() && !newTrigger.isVolatile()) { 976 JobPersistenceException jpe = 977 new JobPersistenceException( 978 "Cannot associate non-volatile trigger with a volatile job!"); 979 jpe.setErrorCode(SchedulerException.ERR_CLIENT_ERROR); 980 throw jpe; 981 } 982 983 storeJob(conn, ctxt, newJob, false); 984 storeTrigger(conn, ctxt, newTrigger, newJob, false, 985 Constants.STATE_WAITING, false, false); 986 } 987 }); 988 } 989 990 1005 public void storeJob(final SchedulingContext ctxt, final JobDetail newJob, 1006 final boolean replaceExisting) throws ObjectAlreadyExistsException, JobPersistenceException { 1007 executeInLock( 1008 (isLockOnInsert() || replaceExisting) ? LOCK_TRIGGER_ACCESS : null, 1009 new VoidTransactionCallback() { 1010 public void execute(Connection conn) throws JobPersistenceException { 1011 storeJob(conn, ctxt, newJob, replaceExisting); 1012 } 1013 }); 1014 } 1015 1016 1021 protected void storeJob(Connection conn, SchedulingContext ctxt, 1022 JobDetail newJob, boolean replaceExisting) 1023 throws ObjectAlreadyExistsException, JobPersistenceException { 1024 if (newJob.isVolatile() && isClustered()) { 1025 getLog().info( 1026 "note: volatile jobs are effectively non-volatile in a clustered environment."); 1027 } 1028 1029 boolean existingJob = jobExists(conn, newJob.getName(), newJob 1030 .getGroup()); 1031 try { 1032 if (existingJob) { 1033 if (!replaceExisting) { 1034 throw new ObjectAlreadyExistsException(newJob); 1035 } 1036 getDelegate().updateJobDetail(conn, newJob); 1037 } else { 1038 getDelegate().insertJobDetail(conn, newJob); 1039 } 1040 } catch (IOException e) { 1041 throw new JobPersistenceException("Couldn't store job: " 1042 + e.getMessage(), e); 1043 } catch (SQLException e) { 1044 throw new JobPersistenceException("Couldn't store job: " 1045 + e.getMessage(), e); 1046 } 1047 } 1048 1049 1054 protected boolean jobExists(Connection conn, String jobName, 1055 String groupName) throws JobPersistenceException { 1056 try { 1057 return getDelegate().jobExists(conn, jobName, groupName); 1058 } catch (SQLException e) { 1059 throw new JobPersistenceException( 1060 "Couldn't determine job existence (" + groupName + "." 1061 + jobName + "): " + e.getMessage(), e); 1062 } 1063 } 1064 1065 1066 1081 public void storeTrigger(final SchedulingContext ctxt, final Trigger newTrigger, 1082 final boolean replaceExisting) throws ObjectAlreadyExistsException, 1083 JobPersistenceException { 1084 executeInLock( 1085 (isLockOnInsert() || replaceExisting) ? LOCK_TRIGGER_ACCESS : null, 1086 new VoidTransactionCallback() { 1087 public void execute(Connection conn) throws JobPersistenceException { 1088 storeTrigger(conn, ctxt, newTrigger, null, replaceExisting, 1089 STATE_WAITING, false, false); 1090 } 1091 }); 1092 } 1093 1094 1099 protected void storeTrigger(Connection conn, SchedulingContext ctxt, 1100 Trigger newTrigger, JobDetail job, boolean replaceExisting, String state, 1101 boolean forceState, boolean recovering) 1102 throws ObjectAlreadyExistsException, JobPersistenceException { 1103 if (newTrigger.isVolatile() && isClustered()) { 1104 getLog().info( 1105 "note: volatile triggers are effectively non-volatile in a clustered environment."); 1106 } 1107 1108 boolean existingTrigger = triggerExists(conn, newTrigger.getName(), 1109 newTrigger.getGroup()); 1110 1111 if ((existingTrigger) && (!replaceExisting)) { 1112 throw new ObjectAlreadyExistsException(newTrigger); 1113 } 1114 1115 try { 1116 1117 boolean shouldBepaused = false; 1118 1119 if (!forceState) { 1120 shouldBepaused = getDelegate().isTriggerGroupPaused( 1121 conn, newTrigger.getGroup()); 1122 1123 if(!shouldBepaused) { 1124 shouldBepaused = getDelegate().isTriggerGroupPaused(conn, 1125 ALL_GROUPS_PAUSED); 1126 1127 if (shouldBepaused) { 1128 getDelegate().insertPausedTriggerGroup(conn, newTrigger.getGroup()); 1129 } 1130 } 1131 1132 if (shouldBepaused && (state.equals(STATE_WAITING) || state.equals(STATE_ACQUIRED))) { 1133 state = STATE_PAUSED; 1134 } 1135 } 1136 1137 if(job == null) { 1138 job = getDelegate().selectJobDetail(conn, 1139 newTrigger.getJobName(), newTrigger.getJobGroup(), 1140 getClassLoadHelper()); 1141 } 1142 if (job == null) { 1143 throw new JobPersistenceException("The job (" 1144 + newTrigger.getFullJobName() 1145 + ") referenced by the trigger does not exist."); 1146 } 1147 if (job.isVolatile() && !newTrigger.isVolatile()) { 1148 throw new JobPersistenceException( 1149 "It does not make sense to " 1150 + "associate a non-volatile Trigger with a volatile Job!"); 1151 } 1152 1153 if (job.isStateful() && !recovering) { 1154 state = checkBlockedState(conn, ctxt, job.getName(), 1155 job.getGroup(), state); 1156 } 1157 1158 if (existingTrigger) { 1159 if (newTrigger.getClass() == SimpleTrigger.class) { 1160 getDelegate().updateSimpleTrigger(conn, 1161 (SimpleTrigger) newTrigger); 1162 } else if (newTrigger.getClass() == CronTrigger.class) { 1163 getDelegate().updateCronTrigger(conn, 1164 (CronTrigger) newTrigger); 1165 } else { 1166 getDelegate().updateBlobTrigger(conn, newTrigger); 1167 } 1168 getDelegate().updateTrigger(conn, newTrigger, state, job); 1169 } else { 1170 getDelegate().insertTrigger(conn, newTrigger, state, job); 1171 if (newTrigger.getClass() == SimpleTrigger.class) { 1172 getDelegate().insertSimpleTrigger(conn, 1173 (SimpleTrigger) newTrigger); 1174 } else if (newTrigger.getClass() == CronTrigger.class) { 1175 getDelegate().insertCronTrigger(conn, 1176 (CronTrigger) newTrigger); 1177 } else { 1178 getDelegate().insertBlobTrigger(conn, newTrigger); 1179 } 1180 } 1181 } catch (Exception e) { 1182 throw new JobPersistenceException("Couldn't store trigger: " 1183 + e.getMessage(), e); 1184 } 1185 } 1186 1187 1192 protected boolean triggerExists(Connection conn, String triggerName, 1193 String groupName) throws JobPersistenceException { 1194 try { 1195 return getDelegate().triggerExists(conn, triggerName, groupName); 1196 } catch (SQLException e) { 1197 throw new JobPersistenceException( 1198 "Couldn't determine trigger existence (" + groupName + "." 1199 + triggerName + "): " + e.getMessage(), e); 1200 } 1201 } 1202 1203 1223 public boolean removeJob(final SchedulingContext ctxt, final String jobName, 1224 final String groupName) throws JobPersistenceException { 1225 return ((Boolean )executeInLock( 1226 LOCK_TRIGGER_ACCESS, 1227 new TransactionCallback() { 1228 public Object execute(Connection conn) throws JobPersistenceException { 1229 return removeJob(conn, ctxt, jobName, groupName, true) ? 1230 Boolean.TRUE : Boolean.FALSE; 1231 } 1232 })).booleanValue(); 1233 } 1234 1235 protected boolean removeJob(Connection conn, SchedulingContext ctxt, 1236 String jobName, String groupName, boolean activeDeleteSafe) 1237 throws JobPersistenceException { 1238 1239 try { 1240 Key[] jobTriggers = getDelegate().selectTriggerNamesForJob(conn, 1241 jobName, groupName); 1242 for (int i = 0; i < jobTriggers.length; ++i) { 1243 deleteTriggerAndChildren( 1244 conn, jobTriggers[i].getName(), jobTriggers[i].getGroup()); 1245 } 1246 1247 return deleteJobAndChildren(conn, ctxt, jobName, groupName); 1248 } catch (SQLException e) { 1249 throw new JobPersistenceException("Couldn't remove job: " 1250 + e.getMessage(), e); 1251 } 1252 } 1253 1254 1260 private boolean deleteJobAndChildren(Connection conn, 1261 SchedulingContext ctxt, String jobName, String groupName) 1262 throws NoSuchDelegateException, SQLException { 1263 getDelegate().deleteJobListeners(conn, jobName, groupName); 1264 1265 return (getDelegate().deleteJobDetail(conn, jobName, groupName) > 0); 1266 } 1267 1268 1275 private boolean deleteTriggerAndChildren( 1276 Connection conn, String triggerName, String triggerGroupName) 1277 throws SQLException , NoSuchDelegateException { 1278 DriverDelegate delegate = getDelegate(); 1279 1280 if ((delegate.deleteSimpleTrigger(conn, triggerName, triggerGroupName) == 0) && 1282 (delegate.deleteCronTrigger(conn, triggerName, triggerGroupName) == 0)) { 1283 delegate.deleteBlobTrigger(conn, triggerName, triggerGroupName); 1284 } 1285 1286 delegate.deleteTriggerListeners(conn, triggerName, triggerGroupName); 1287 1288 return (delegate.deleteTrigger(conn, triggerName, triggerGroupName) > 0); 1289 } 1290 1291 1303 public JobDetail retrieveJob(final SchedulingContext ctxt, final String jobName, 1304 final String groupName) throws JobPersistenceException { 1305 return (JobDetail)executeWithoutLock( new TransactionCallback() { 1307 public Object execute(Connection conn) throws JobPersistenceException { 1308 return retrieveJob(conn, ctxt, jobName, groupName); 1309 } 1310 }); 1311 } 1312 1313 protected JobDetail retrieveJob(Connection conn, SchedulingContext ctxt, 1314 String jobName, String groupName) throws JobPersistenceException { 1315 try { 1316 JobDetail job = getDelegate().selectJobDetail(conn, jobName, 1317 groupName, getClassLoadHelper()); 1318 if (job != null) { 1319 String [] listeners = getDelegate().selectJobListeners(conn, 1320 jobName, groupName); 1321 for (int i = 0; i < listeners.length; ++i) { 1322 job.addJobListener(listeners[i]); 1323 } 1324 } 1325 1326 return job; 1327 } catch (ClassNotFoundException e) { 1328 throw new JobPersistenceException( 1329 "Couldn't retrieve job because a required class was not found: " 1330 + e.getMessage(), e, 1331 SchedulerException.ERR_PERSISTENCE_JOB_DOES_NOT_EXIST); 1332 } catch (IOException e) { 1333 throw new JobPersistenceException( 1334 "Couldn't retrieve job because the BLOB couldn't be deserialized: " 1335 + e.getMessage(), e, 1336 SchedulerException.ERR_PERSISTENCE_JOB_DOES_NOT_EXIST); 1337 } catch (SQLException e) { 1338 throw new JobPersistenceException("Couldn't retrieve job: " 1339 + e.getMessage(), e); 1340 } 1341 } 1342 1343 1368 public boolean removeTrigger(final SchedulingContext ctxt, final String triggerName, 1369 final String groupName) throws JobPersistenceException { 1370 return ((Boolean )executeInLock( 1371 LOCK_TRIGGER_ACCESS, 1372 new TransactionCallback() { 1373 public Object execute(Connection conn) throws JobPersistenceException { 1374 return removeTrigger(conn, ctxt, triggerName, groupName) ? 1375 Boolean.TRUE : Boolean.FALSE; 1376 } 1377 })).booleanValue(); 1378 } 1379 1380 protected boolean removeTrigger(Connection conn, SchedulingContext ctxt, 1381 String triggerName, String groupName) 1382 throws JobPersistenceException { 1383 boolean removedTrigger = false; 1384 try { 1385 JobDetail job = getDelegate().selectJobForTrigger(conn, 1387 triggerName, groupName, getClassLoadHelper()); 1388 1389 removedTrigger = 1390 deleteTriggerAndChildren(conn, triggerName, groupName); 1391 1392 if (null != job && !job.isDurable()) { 1393 int numTriggers = getDelegate().selectNumTriggersForJob(conn, 1394 job.getName(), job.getGroup()); 1395 if (numTriggers == 0) { 1396 deleteJobAndChildren(conn, ctxt, job.getName(), job.getGroup()); 1399 } 1400 } 1401 } catch (ClassNotFoundException e) { 1402 throw new JobPersistenceException("Couldn't remove trigger: " 1403 + e.getMessage(), e); 1404 } catch (SQLException e) { 1405 throw new JobPersistenceException("Couldn't remove trigger: " 1406 + e.getMessage(), e); 1407 } 1408 1409 return removedTrigger; 1410 } 1411 1412 1415 public boolean replaceTrigger(final SchedulingContext ctxt, final String triggerName, 1416 final String groupName, final Trigger newTrigger) throws JobPersistenceException { 1417 return ((Boolean )executeInLock( 1418 LOCK_TRIGGER_ACCESS, 1419 new TransactionCallback() { 1420 public Object execute(Connection conn) throws JobPersistenceException { 1421 return replaceTrigger(conn, ctxt, triggerName, groupName, newTrigger) ? 1422 Boolean.TRUE : Boolean.FALSE; 1423 } 1424 })).booleanValue(); 1425 } 1426 1427 protected boolean replaceTrigger(Connection conn, SchedulingContext ctxt, 1428 String triggerName, String groupName, Trigger newTrigger) 1429 throws JobPersistenceException { 1430 try { 1431 JobDetail job = getDelegate().selectJobForTrigger(conn, 1433 triggerName, groupName, getClassLoadHelper()); 1434 1435 if (job == null) { 1436 return false; 1437 } 1438 1439 if (!newTrigger.getJobName().equals(job.getName()) || 1440 !newTrigger.getJobGroup().equals(job.getGroup())) { 1441 throw new JobPersistenceException("New trigger is not related to the same job as the old trigger."); 1442 } 1443 1444 boolean removedTrigger = 1445 deleteTriggerAndChildren(conn, triggerName, groupName); 1446 1447 storeTrigger(conn, ctxt, newTrigger, job, false, STATE_WAITING, false, false); 1448 1449 return removedTrigger; 1450 } catch (ClassNotFoundException e) { 1451 throw new JobPersistenceException("Couldn't remove trigger: " 1452 + e.getMessage(), e); 1453 } catch (SQLException e) { 1454 throw new JobPersistenceException("Couldn't remove trigger: " 1455 + e.getMessage(), e); 1456 } 1457 } 1458 1459 1471 public Trigger retrieveTrigger(final SchedulingContext ctxt, final String triggerName, 1472 final String groupName) throws JobPersistenceException { 1473 return (Trigger)executeWithoutLock( new TransactionCallback() { 1475 public Object execute(Connection conn) throws JobPersistenceException { 1476 return retrieveTrigger(conn, ctxt, triggerName, groupName); 1477 } 1478 }); 1479 } 1480 1481 protected Trigger retrieveTrigger(Connection conn, SchedulingContext ctxt, 1482 String triggerName, String groupName) 1483 throws JobPersistenceException { 1484 return retrieveTrigger(conn, triggerName, groupName); 1485 } 1486 1487 protected Trigger retrieveTrigger(Connection conn, String triggerName, String groupName) 1488 throws JobPersistenceException { 1489 try { 1490 Trigger trigger = getDelegate().selectTrigger(conn, triggerName, 1491 groupName); 1492 if (trigger == null) { 1493 return null; 1494 } 1495 1496 trigger.clearAllTriggerListeners(); 1499 1500 String [] listeners = getDelegate().selectTriggerListeners(conn, 1501 triggerName, groupName); 1502 for (int i = 0; i < listeners.length; ++i) { 1503 trigger.addTriggerListener(listeners[i]); 1504 } 1505 1506 return trigger; 1507 } catch (Exception e) { 1508 throw new JobPersistenceException("Couldn't retrieve trigger: " 1509 + e.getMessage(), e); 1510 } 1511 } 1512 1513 1524 public int getTriggerState(final SchedulingContext ctxt, final String triggerName, 1525 final String groupName) throws JobPersistenceException { 1526 return ((Integer )executeWithoutLock( new TransactionCallback() { 1528 public Object execute(Connection conn) throws JobPersistenceException { 1529 return new Integer (getTriggerState(conn, ctxt, triggerName, groupName)); 1530 } 1531 })).intValue(); 1532 } 1533 1534 public int getTriggerState(Connection conn, SchedulingContext ctxt, 1535 String triggerName, String groupName) 1536 throws JobPersistenceException { 1537 try { 1538 String ts = getDelegate().selectTriggerState(conn, triggerName, 1539 groupName); 1540 1541 if (ts == null) { 1542 return Trigger.STATE_NONE; 1543 } 1544 1545 if (ts.equals(STATE_DELETED)) { 1546 return Trigger.STATE_NONE; 1547 } 1548 1549 if (ts.equals(STATE_COMPLETE)) { 1550 return Trigger.STATE_COMPLETE; 1551 } 1552 1553 if (ts.equals(STATE_PAUSED)) { 1554 return Trigger.STATE_PAUSED; 1555 } 1556 1557 if (ts.equals(STATE_PAUSED_BLOCKED)) { 1558 return Trigger.STATE_PAUSED; 1559 } 1560 1561 if (ts.equals(STATE_ERROR)) { 1562 return Trigger.STATE_ERROR; 1563 } 1564 1565 if (ts.equals(STATE_BLOCKED)) { 1566 return Trigger.STATE_BLOCKED; 1567 } 1568 1569 return Trigger.STATE_NORMAL; 1570 1571 } catch (SQLException e) { 1572 throw new JobPersistenceException( 1573 "Couldn't determine state of trigger (" + groupName + "." 1574 + triggerName + "): " + e.getMessage(), e); 1575 } 1576 } 1577 1578 1595 public void storeCalendar(final SchedulingContext ctxt, final String calName, 1596 final Calendar calendar, final boolean replaceExisting, final boolean updateTriggers) 1597 throws ObjectAlreadyExistsException, JobPersistenceException { 1598 executeInLock( 1599 (isLockOnInsert() || updateTriggers) ? LOCK_TRIGGER_ACCESS : null, 1600 new VoidTransactionCallback() { 1601 public void execute(Connection conn) throws JobPersistenceException { 1602 storeCalendar(conn, ctxt, calName, calendar, replaceExisting, updateTriggers); 1603 } 1604 }); 1605 } 1606 1607 protected void storeCalendar(Connection conn, SchedulingContext ctxt, 1608 String calName, Calendar calendar, boolean replaceExisting, boolean updateTriggers) 1609 throws ObjectAlreadyExistsException, JobPersistenceException { 1610 try { 1611 boolean existingCal = calendarExists(conn, calName); 1612 if (existingCal && !replaceExisting) { 1613 throw new ObjectAlreadyExistsException( 1614 "Calendar with name '" + calName + "' already exists."); 1615 } 1616 1617 if (existingCal) { 1618 if (getDelegate().updateCalendar(conn, calName, calendar) < 1) { 1619 throw new JobPersistenceException( 1620 "Couldn't store calendar. Update failed."); 1621 } 1622 1623 if(updateTriggers) { 1624 Trigger[] trigs = getDelegate().selectTriggersForCalendar(conn, calName); 1625 1626 for(int i=0; i < trigs.length; i++) { 1627 trigs[i].updateWithNewCalendar(calendar, getMisfireThreshold()); 1628 storeTrigger(conn, ctxt, trigs[i], null, true, STATE_WAITING, false, false); 1629 } 1630 } 1631 } else { 1632 if (getDelegate().insertCalendar(conn, calName, calendar) < 1) { 1633 throw new JobPersistenceException( 1634 "Couldn't store calendar. Insert failed."); 1635 } 1636 } 1637 1638 if (isClustered == false) { 1639 calendarCache.put(calName, calendar); } 1641 1642 } catch (IOException e) { 1643 throw new JobPersistenceException( 1644 "Couldn't store calendar because the BLOB couldn't be serialized: " 1645 + e.getMessage(), e); 1646 } catch (ClassNotFoundException e) { 1647 throw new JobPersistenceException("Couldn't store calendar: " 1648 + e.getMessage(), e); 1649 }catch (SQLException e) { 1650 throw new JobPersistenceException("Couldn't store calendar: " 1651 + e.getMessage(), e); 1652 } 1653 } 1654 1655 protected boolean calendarExists(Connection conn, String calName) 1656 throws JobPersistenceException { 1657 try { 1658 return getDelegate().calendarExists(conn, calName); 1659 } catch (SQLException e) { 1660 throw new JobPersistenceException( 1661 "Couldn't determine calendar existence (" + calName + "): " 1662 + e.getMessage(), e); 1663 } 1664 } 1665 1666 1681 public boolean removeCalendar(final SchedulingContext ctxt, final String calName) 1682 throws JobPersistenceException { 1683 return ((Boolean )executeInLock( 1684 LOCK_TRIGGER_ACCESS, 1685 new TransactionCallback() { 1686 public Object execute(Connection conn) throws JobPersistenceException { 1687 return removeCalendar(conn, ctxt, calName) ? 1688 Boolean.TRUE : Boolean.FALSE; 1689 } 1690 })).booleanValue(); 1691 } 1692 1693 protected boolean removeCalendar(Connection conn, SchedulingContext ctxt, 1694 String calName) throws JobPersistenceException { 1695 try { 1696 if (getDelegate().calendarIsReferenced(conn, calName)) { 1697 throw new JobPersistenceException( 1698 "Calender cannot be removed if it referenced by a trigger!"); 1699 } 1700 1701 if (isClustered == false) { 1702 calendarCache.remove(calName); 1703 } 1704 1705 return (getDelegate().deleteCalendar(conn, calName) > 0); 1706 } catch (SQLException e) { 1707 throw new JobPersistenceException("Couldn't remove calendar: " 1708 + e.getMessage(), e); 1709 } 1710 } 1711 1712 1722 public Calendar retrieveCalendar(final SchedulingContext ctxt, final String calName) 1723 throws JobPersistenceException { 1724 return (Calendar)executeWithoutLock( new TransactionCallback() { 1726 public Object execute(Connection conn) throws JobPersistenceException { 1727 return retrieveCalendar(conn, ctxt, calName); 1728 } 1729 }); 1730 } 1731 1732 protected Calendar retrieveCalendar(Connection conn, 1733 SchedulingContext ctxt, String calName) 1734 throws JobPersistenceException { 1735 Calendar cal = (isClustered) ? null : (Calendar) calendarCache.get(calName); 1738 if (cal != null) { 1739 return cal; 1740 } 1741 1742 try { 1743 cal = getDelegate().selectCalendar(conn, calName); 1744 if (isClustered == false) { 1745 calendarCache.put(calName, cal); } 1747 return cal; 1748 } catch (ClassNotFoundException e) { 1749 throw new JobPersistenceException( 1750 "Couldn't retrieve calendar because a required class was not found: " 1751 + e.getMessage(), e); 1752 } catch (IOException e) { 1753 throw new JobPersistenceException( 1754 "Couldn't retrieve calendar because the BLOB couldn't be deserialized: " 1755 + e.getMessage(), e); 1756 } catch (SQLException e) { 1757 throw new JobPersistenceException("Couldn't retrieve calendar: " 1758 + e.getMessage(), e); 1759 } 1760 } 1761 1762 1768 public int getNumberOfJobs(final SchedulingContext ctxt) 1769 throws JobPersistenceException { 1770 return ((Integer )executeWithoutLock( new TransactionCallback() { 1772 public Object execute(Connection conn) throws JobPersistenceException { 1773 return new Integer (getNumberOfJobs(conn, ctxt)); 1774 } 1775 })).intValue(); 1776 } 1777 1778 protected int getNumberOfJobs(Connection conn, SchedulingContext ctxt) 1779 throws JobPersistenceException { 1780 try { 1781 return getDelegate().selectNumJobs(conn); 1782 } catch (SQLException e) { 1783 throw new JobPersistenceException( 1784 "Couldn't obtain number of jobs: " + e.getMessage(), e); 1785 } 1786 } 1787 1788 1794 public int getNumberOfTriggers(final SchedulingContext ctxt) 1795 throws JobPersistenceException { 1796 return ((Integer )executeWithoutLock( new TransactionCallback() { 1798 public Object execute(Connection conn) throws JobPersistenceException { 1799 return new Integer (getNumberOfTriggers(conn, ctxt)); 1800 } 1801 })).intValue(); 1802 } 1803 1804 protected int getNumberOfTriggers(Connection conn, SchedulingContext ctxt) 1805 throws JobPersistenceException { 1806 try { 1807 return getDelegate().selectNumTriggers(conn); 1808 } catch (SQLException e) { 1809 throw new JobPersistenceException( 1810 "Couldn't obtain number of triggers: " + e.getMessage(), e); 1811 } 1812 } 1813 1814 1820 public int getNumberOfCalendars(final SchedulingContext ctxt) 1821 throws JobPersistenceException { 1822 return ((Integer )executeWithoutLock( new TransactionCallback() { 1824 public Object execute(Connection conn) throws JobPersistenceException { 1825 return new Integer (getNumberOfCalendars(conn, ctxt)); 1826 } 1827 })).intValue(); 1828 } 1829 1830 protected int getNumberOfCalendars(Connection conn, SchedulingContext ctxt) 1831 throws JobPersistenceException { 1832 try { 1833 return getDelegate().selectNumCalendars(conn); 1834 } catch (SQLException e) { 1835 throw new JobPersistenceException( 1836 "Couldn't obtain number of calendars: " + e.getMessage(), e); 1837 } 1838 } 1839 1840 1851 public String [] getJobNames(final SchedulingContext ctxt, final String groupName) 1852 throws JobPersistenceException { 1853 return (String [])executeWithoutLock( new TransactionCallback() { 1855 public Object execute(Connection conn) throws JobPersistenceException { 1856 return getJobNames(conn, ctxt, groupName); 1857 } 1858 }); 1859 } 1860 1861 protected String [] getJobNames(Connection conn, SchedulingContext ctxt, 1862 String groupName) throws JobPersistenceException { 1863 String [] jobNames = null; 1864 1865 try { 1866 jobNames = getDelegate().selectJobsInGroup(conn, groupName); 1867 } catch (SQLException e) { 1868 throw new JobPersistenceException("Couldn't obtain job names: " 1869 + e.getMessage(), e); 1870 } 1871 1872 return jobNames; 1873 } 1874 1875 1886 public String [] getTriggerNames(final SchedulingContext ctxt, final String groupName) 1887 throws JobPersistenceException { 1888 return (String [])executeWithoutLock( new TransactionCallback() { 1890 public Object execute(Connection conn) throws JobPersistenceException { 1891 return getTriggerNames(conn, ctxt, groupName); 1892 } 1893 }); 1894 } 1895 1896 protected String [] getTriggerNames(Connection conn, SchedulingContext ctxt, 1897 String groupName) throws JobPersistenceException { 1898 1899 String [] trigNames = null; 1900 1901 try { 1902 trigNames = getDelegate().selectTriggersInGroup(conn, groupName); 1903 } catch (SQLException e) { 1904 throw new JobPersistenceException("Couldn't obtain trigger names: " 1905 + e.getMessage(), e); 1906 } 1907 1908 return trigNames; 1909 } 1910 1911 1912 1923 public String [] getJobGroupNames(final SchedulingContext ctxt) 1924 throws JobPersistenceException { 1925 return (String [])executeWithoutLock( new TransactionCallback() { 1927 public Object execute(Connection conn) throws JobPersistenceException { 1928 return getJobGroupNames(conn, ctxt); 1929 } 1930 }); 1931 } 1932 1933 protected String [] getJobGroupNames(Connection conn, SchedulingContext ctxt) 1934 throws JobPersistenceException { 1935 1936 String [] groupNames = null; 1937 1938 try { 1939 groupNames = getDelegate().selectJobGroups(conn); 1940 } catch (SQLException e) { 1941 throw new JobPersistenceException("Couldn't obtain job groups: " 1942 + e.getMessage(), e); 1943 } 1944 1945 return groupNames; 1946 } 1947 1948 1959 public String [] getTriggerGroupNames(final SchedulingContext ctxt) 1960 throws JobPersistenceException { 1961 return (String [])executeWithoutLock( new TransactionCallback() { 1963 public Object execute(Connection conn) throws JobPersistenceException { 1964 return getTriggerGroupNames(conn, ctxt); 1965 } 1966 }); 1967 } 1968 1969 protected String [] getTriggerGroupNames(Connection conn, 1970 SchedulingContext ctxt) throws JobPersistenceException { 1971 1972 String [] groupNames = null; 1973 1974 try { 1975 groupNames = getDelegate().selectTriggerGroups(conn); 1976 } catch (SQLException e) { 1977 throw new JobPersistenceException( 1978 "Couldn't obtain trigger groups: " + e.getMessage(), e); 1979 } 1980 1981 return groupNames; 1982 } 1983 1984 1995 public String [] getCalendarNames(final SchedulingContext ctxt) 1996 throws JobPersistenceException { 1997 return (String [])executeWithoutLock( new TransactionCallback() { 1999 public Object execute(Connection conn) throws JobPersistenceException { 2000 return getCalendarNames(conn, ctxt); 2001 } 2002 }); 2003 } 2004 2005 protected String [] getCalendarNames(Connection conn, SchedulingContext ctxt) 2006 throws JobPersistenceException { 2007 try { 2008 return getDelegate().selectCalendars(conn); 2009 } catch (SQLException e) { 2010 throw new JobPersistenceException( 2011 "Couldn't obtain trigger groups: " + e.getMessage(), e); 2012 } 2013 } 2014 2015 2024 public Trigger[] getTriggersForJob(final SchedulingContext ctxt, final String jobName, 2025 final String groupName) throws JobPersistenceException { 2026 return (Trigger[])executeWithoutLock( new TransactionCallback() { 2028 public Object execute(Connection conn) throws JobPersistenceException { 2029 return getTriggersForJob(conn, ctxt, jobName, groupName); 2030 } 2031 }); 2032 } 2033 2034 protected Trigger[] getTriggersForJob(Connection conn, 2035 SchedulingContext ctxt, String jobName, String groupName) 2036 throws JobPersistenceException { 2037 Trigger[] array = null; 2038 2039 try { 2040 array = getDelegate() 2041 .selectTriggersForJob(conn, jobName, groupName); 2042 } catch (Exception e) { 2043 throw new JobPersistenceException( 2044 "Couldn't obtain triggers for job: " + e.getMessage(), e); 2045 } 2046 2047 return array; 2048 } 2049 2050 2057 public void pauseTrigger(final SchedulingContext ctxt, final String triggerName, 2058 final String groupName) throws JobPersistenceException { 2059 executeInLock( 2060 LOCK_TRIGGER_ACCESS, 2061 new VoidTransactionCallback() { 2062 public void execute(Connection conn) throws JobPersistenceException { 2063 pauseTrigger(conn, ctxt, triggerName, groupName); 2064 } 2065 }); 2066 } 2067 2068 2075 public void pauseTrigger(Connection conn, SchedulingContext ctxt, 2076 String triggerName, String groupName) 2077 throws JobPersistenceException { 2078 2079 try { 2080 String oldState = getDelegate().selectTriggerState(conn, 2081 triggerName, groupName); 2082 2083 if (oldState.equals(STATE_WAITING) 2084 || oldState.equals(STATE_ACQUIRED)) { 2085 2086 getDelegate().updateTriggerState(conn, triggerName, 2087 groupName, STATE_PAUSED); 2088 } else if (oldState.equals(STATE_BLOCKED)) { 2089 getDelegate().updateTriggerState(conn, triggerName, 2090 groupName, STATE_PAUSED_BLOCKED); 2091 } 2092 } catch (SQLException e) { 2093 throw new JobPersistenceException("Couldn't pause trigger '" 2094 + groupName + "." + triggerName + "': " + e.getMessage(), e); 2095 } 2096 } 2097 2098 2106 public void pauseJob(final SchedulingContext ctxt, final String jobName, 2107 final String groupName) throws JobPersistenceException { 2108 executeInLock( 2109 LOCK_TRIGGER_ACCESS, 2110 new VoidTransactionCallback() { 2111 public void execute(Connection conn) throws JobPersistenceException { 2112 Trigger[] triggers = getTriggersForJob(conn, ctxt, jobName, groupName); 2113 for (int j = 0; j < triggers.length; j++) { 2114 pauseTrigger(conn, ctxt, triggers[j].getName(), triggers[j].getGroup()); 2115 } 2116 } 2117 }); 2118 } 2119 2120 2128 public void pauseJobGroup(final SchedulingContext ctxt, final String groupName) 2129 throws JobPersistenceException { 2130 executeInLock( 2131 LOCK_TRIGGER_ACCESS, 2132 new VoidTransactionCallback() { 2133 public void execute(Connection conn) throws JobPersistenceException { 2134 String [] jobNames = getJobNames(conn, ctxt, groupName); 2135 2136 for (int i = 0; i < jobNames.length; i++) { 2137 Trigger[] triggers = getTriggersForJob(conn, ctxt, jobNames[i], groupName); 2138 for (int j = 0; j < triggers.length; j++) { 2139 pauseTrigger(conn, ctxt, triggers[j].getName(), triggers[j].getGroup()); 2140 } 2141 } 2142 } 2143 }); 2144 } 2145 2146 2153 protected String checkBlockedState( 2154 Connection conn, SchedulingContext ctxt, String jobName, 2155 String jobGroupName, String currentState) 2156 throws JobPersistenceException { 2157 2158 if ((currentState.equals(STATE_WAITING) == false) && 2160 (currentState.equals(STATE_PAUSED) == false)) { 2161 return currentState; 2162 } 2163 2164 try { 2165 List lst = getDelegate().selectFiredTriggerRecordsByJob(conn, 2166 jobName, jobGroupName); 2167 2168 if (lst.size() > 0) { 2169 FiredTriggerRecord rec = (FiredTriggerRecord) lst.get(0); 2170 if (rec.isJobIsStateful()) { return (STATE_PAUSED.equals(currentState)) ? STATE_PAUSED_BLOCKED : STATE_BLOCKED; 2174 } 2175 } 2176 2177 return currentState; 2178 } catch (SQLException e) { 2179 throw new JobPersistenceException( 2180 "Couldn't determine if trigger should be in a blocked state '" 2181 + jobGroupName + "." 2182 + jobName + "': " 2183 + e.getMessage(), e); 2184 } 2185 2186 } 2187 2188 2215 2216 2229 public void resumeTrigger(final SchedulingContext ctxt, final String triggerName, 2230 final String groupName) throws JobPersistenceException { 2231 executeInLock( 2232 LOCK_TRIGGER_ACCESS, 2233 new VoidTransactionCallback() { 2234 public void execute(Connection conn) throws JobPersistenceException { 2235 resumeTrigger(conn, ctxt, triggerName, groupName); 2236 } 2237 }); 2238 } 2239 2240 2253 public void resumeTrigger(Connection conn, SchedulingContext ctxt, 2254 String triggerName, String groupName) 2255 throws JobPersistenceException { 2256 try { 2257 2258 TriggerStatus status = getDelegate().selectTriggerStatus(conn, 2259 triggerName, groupName); 2260 2261 if (status == null || status.getNextFireTime() == null) { 2262 return; 2263 } 2264 2265 boolean blocked = false; 2266 if(STATE_PAUSED_BLOCKED.equals(status.getStatus())) { 2267 blocked = true; 2268 } 2269 2270 String newState = checkBlockedState(conn, ctxt, status.getJobKey().getName(), 2271 status.getJobKey().getGroup(), STATE_WAITING); 2272 2273 boolean misfired = false; 2274 2275 if (status.getNextFireTime().before(new Date ())) { 2276 misfired = updateMisfiredTrigger(conn, ctxt, triggerName, groupName, 2277 newState, true); 2278 } 2279 2280 if(!misfired) { 2281 if(blocked) { 2282 getDelegate().updateTriggerStateFromOtherState(conn, 2283 triggerName, groupName, newState, STATE_PAUSED_BLOCKED); 2284 } else { 2285 getDelegate().updateTriggerStateFromOtherState(conn, 2286 triggerName, groupName, newState, STATE_PAUSED); 2287 } 2288 } 2289 2290 } catch (SQLException e) { 2291 throw new JobPersistenceException("Couldn't resume trigger '" 2292 + groupName + "." + triggerName + "': " + e.getMessage(), e); 2293 } 2294 } 2295 2296 2310 public void resumeJob(final SchedulingContext ctxt, final String jobName, 2311 final String groupName) throws JobPersistenceException { 2312 executeInLock( 2313 LOCK_TRIGGER_ACCESS, 2314 new VoidTransactionCallback() { 2315 public void execute(Connection conn) throws JobPersistenceException { 2316 Trigger[] triggers = getTriggersForJob(conn, ctxt, jobName, groupName); 2317 for (int j = 0; j < triggers.length; j++) { 2318 resumeTrigger(conn, ctxt, triggers[j].getName(), triggers[j].getGroup()); 2319 } 2320 } 2321 }); 2322 } 2323 2324 2338 public void resumeJobGroup(final SchedulingContext ctxt, final String groupName) 2339 throws JobPersistenceException { 2340 executeInLock( 2341 LOCK_TRIGGER_ACCESS, 2342 new VoidTransactionCallback() { 2343 public void execute(Connection conn) throws JobPersistenceException { 2344 String [] jobNames = getJobNames(conn, ctxt, groupName); 2345 2346 for (int i = 0; i < jobNames.length; i++) { 2347 Trigger[] triggers = getTriggersForJob(conn, ctxt, jobNames[i], groupName); 2348 for (int j = 0; j < triggers.length; j++) { 2349 resumeTrigger(conn, ctxt, triggers[j].getName(), triggers[j].getGroup()); 2350 } 2351 } 2352 } 2353 }); 2354 } 2355 2356 2364 public void pauseTriggerGroup(final SchedulingContext ctxt, final String groupName) 2365 throws JobPersistenceException { 2366 executeInLock( 2367 LOCK_TRIGGER_ACCESS, 2368 new VoidTransactionCallback() { 2369 public void execute(Connection conn) throws JobPersistenceException { 2370 pauseTriggerGroup(conn, ctxt, groupName); 2371 } 2372 }); 2373 } 2374 2375 2383 public void pauseTriggerGroup(Connection conn, SchedulingContext ctxt, 2384 String groupName) throws JobPersistenceException { 2385 2386 try { 2387 2388 getDelegate().updateTriggerGroupStateFromOtherStates( 2389 conn, groupName, STATE_PAUSED, STATE_ACQUIRED, 2390 STATE_WAITING, STATE_WAITING); 2391 2392 getDelegate().updateTriggerGroupStateFromOtherState( 2393 conn, groupName, STATE_PAUSED_BLOCKED, STATE_BLOCKED); 2394 2395 if (!getDelegate().isTriggerGroupPaused(conn, groupName)) { 2396 getDelegate().insertPausedTriggerGroup(conn, groupName); 2397 } 2398 2399 } catch (SQLException e) { 2400 throw new JobPersistenceException("Couldn't pause trigger group '" 2401 + groupName + "': " + e.getMessage(), e); 2402 } 2403 } 2404 2405 public Set getPausedTriggerGroups(final SchedulingContext ctxt) 2406 throws JobPersistenceException { 2407 return (Set )executeWithoutLock( new TransactionCallback() { 2409 public Object execute(Connection conn) throws JobPersistenceException { 2410 return getPausedTriggerGroups(conn, ctxt); 2411 } 2412 }); 2413 } 2414 2415 2423 public Set getPausedTriggerGroups(Connection conn, SchedulingContext ctxt) 2424 throws JobPersistenceException { 2425 2426 try { 2427 return getDelegate().selectPausedTriggerGroups(conn); 2428 } catch (SQLException e) { 2429 throw new JobPersistenceException( 2430 "Couldn't determine paused trigger groups: " + e.getMessage(), e); 2431 } 2432 } 2433 2434 2447 public void resumeTriggerGroup(final SchedulingContext ctxt, final String groupName) 2448 throws JobPersistenceException { 2449 executeInLock( 2450 LOCK_TRIGGER_ACCESS, 2451 new VoidTransactionCallback() { 2452 public void execute(Connection conn) throws JobPersistenceException { 2453 resumeTriggerGroup(conn, ctxt, groupName); 2454 } 2455 }); 2456 } 2457 2458 2471 public void resumeTriggerGroup(Connection conn, SchedulingContext ctxt, 2472 String groupName) throws JobPersistenceException { 2473 2474 try { 2475 2476 getDelegate().deletePausedTriggerGroup(conn, groupName); 2477 2478 String [] trigNames = getDelegate().selectTriggersInGroup(conn, 2479 groupName); 2480 2481 for (int i = 0; i < trigNames.length; i++) { 2482 resumeTrigger(conn, ctxt, trigNames[i], groupName); 2483 } 2484 2485 2517 2518 } catch (SQLException e) { 2519 throw new JobPersistenceException("Couldn't pause trigger group '" 2520 + groupName + "': " + e.getMessage(), e); 2521 } 2522 } 2523 2524 2538 public void pauseAll(final SchedulingContext ctxt) throws JobPersistenceException { 2539 executeInLock( 2540 LOCK_TRIGGER_ACCESS, 2541 new VoidTransactionCallback() { 2542 public void execute(Connection conn) throws JobPersistenceException { 2543 pauseAll(conn, ctxt); 2544 } 2545 }); 2546 } 2547 2548 2562 public void pauseAll(Connection conn, SchedulingContext ctxt) 2563 throws JobPersistenceException { 2564 2565 String [] names = getTriggerGroupNames(conn, ctxt); 2566 2567 for (int i = 0; i < names.length; i++) { 2568 pauseTriggerGroup(conn, ctxt, names[i]); 2569 } 2570 2571 try { 2572 if (!getDelegate().isTriggerGroupPaused(conn, ALL_GROUPS_PAUSED)) { 2573 getDelegate().insertPausedTriggerGroup(conn, ALL_GROUPS_PAUSED); 2574 } 2575 2576 } catch (SQLException e) { 2577 throw new JobPersistenceException( 2578 "Couldn't pause all trigger groups: " + e.getMessage(), e); 2579 } 2580 2581 } 2582 2583 2596 public void resumeAll(final SchedulingContext ctxt) 2597 throws JobPersistenceException { 2598 executeInLock( 2599 LOCK_TRIGGER_ACCESS, 2600 new VoidTransactionCallback() { 2601 public void execute(Connection conn) throws JobPersistenceException { 2602 resumeAll(conn, ctxt); 2603 } 2604 }); 2605 } 2606 2607 2621 public void resumeAll(Connection conn, SchedulingContext ctxt) 2622 throws JobPersistenceException { 2623 2624 String [] names = getTriggerGroupNames(conn, ctxt); 2625 2626 for (int i = 0; i < names.length; i++) { 2627 resumeTriggerGroup(conn, ctxt, names[i]); 2628 } 2629 2630 try { 2631 getDelegate().deletePausedTriggerGroup(conn, ALL_GROUPS_PAUSED); 2632 } catch (SQLException e) { 2633 throw new JobPersistenceException( 2634 "Couldn't resume all trigger groups: " + e.getMessage(), e); 2635 } 2636 } 2637 2638 private static long ftrCtr = System.currentTimeMillis(); 2639 2640 protected synchronized String getFiredTriggerRecordId() { 2641 return getInstanceId() + ftrCtr++; 2642 } 2643 2644 2652 public Trigger acquireNextTrigger(final SchedulingContext ctxt, final long noLaterThan) 2653 throws JobPersistenceException { 2654 return (Trigger)executeInNonManagedTXLock( 2655 LOCK_TRIGGER_ACCESS, 2656 new TransactionCallback() { 2657 public Object execute(Connection conn) throws JobPersistenceException { 2658 return acquireNextTrigger(conn, ctxt, noLaterThan); 2659 } 2660 }); 2661 } 2662 2663 protected Trigger acquireNextTrigger(Connection conn, SchedulingContext ctxt, long noLaterThan) 2666 throws JobPersistenceException { 2667 do { 2668 try { 2669 Key triggerKey = getDelegate().selectTriggerToAcquire(conn, noLaterThan, getMisfireTime()); 2670 2671 if (triggerKey == null) { 2673 return null; 2674 } 2675 2676 int rowsUpdated = 2677 getDelegate().updateTriggerStateFromOtherState( 2678 conn, 2679 triggerKey.getName(), triggerKey.getGroup(), 2680 STATE_ACQUIRED, STATE_WAITING); 2681 2682 if (rowsUpdated <= 0) { 2684 continue; 2685 } 2686 2687 Trigger nextTrigger = 2688 retrieveTrigger(conn, ctxt, triggerKey.getName(), triggerKey.getGroup()); 2689 2690 if(nextTrigger == null) { 2692 continue; 2693 } 2694 2695 nextTrigger.setFireInstanceId(getFiredTriggerRecordId()); 2696 getDelegate().insertFiredTrigger(conn, nextTrigger, STATE_ACQUIRED, null); 2697 2698 return nextTrigger; 2699 } catch (Exception e) { 2700 throw new JobPersistenceException( 2701 "Couldn't acquire next trigger: " + e.getMessage(), e); 2702 } 2703 } while (true); 2704 } 2705 2706 2713 public void releaseAcquiredTrigger(final SchedulingContext ctxt, final Trigger trigger) 2714 throws JobPersistenceException { 2715 executeInNonManagedTXLock( 2716 LOCK_TRIGGER_ACCESS, 2717 new VoidTransactionCallback() { 2718 public void execute(Connection conn) throws JobPersistenceException { 2719 releaseAcquiredTrigger(conn, ctxt, trigger); 2720 } 2721 }); 2722 } 2723 2724 protected void releaseAcquiredTrigger(Connection conn, 2725 SchedulingContext ctxt, Trigger trigger) 2726 throws JobPersistenceException { 2727 try { 2728 getDelegate().updateTriggerStateFromOtherState(conn, 2729 trigger.getName(), trigger.getGroup(), STATE_WAITING, 2730 STATE_ACQUIRED); 2731 getDelegate().deleteFiredTrigger(conn, trigger.getFireInstanceId()); 2732 } catch (SQLException e) { 2733 throw new JobPersistenceException( 2734 "Couldn't release acquired trigger: " + e.getMessage(), e); 2735 } 2736 } 2737 2738 2749 public TriggerFiredBundle triggerFired( 2750 final SchedulingContext ctxt, final Trigger trigger) throws JobPersistenceException { 2751 return 2752 (TriggerFiredBundle)executeInNonManagedTXLock( 2753 LOCK_TRIGGER_ACCESS, 2754 new TransactionCallback() { 2755 public Object execute(Connection conn) throws JobPersistenceException { 2756 try { 2757 return triggerFired(conn, ctxt, trigger); 2758 } catch (JobPersistenceException jpe) { 2759 if (jpe.getErrorCode() == SchedulerException.ERR_PERSISTENCE_JOB_DOES_NOT_EXIST) { 2761 return null; 2762 } else { 2763 throw jpe; 2764 } 2765 } 2766 } 2767 }); 2768 } 2769 2770 protected TriggerFiredBundle triggerFired(Connection conn, 2771 SchedulingContext ctxt, Trigger trigger) 2772 throws JobPersistenceException { 2773 JobDetail job = null; 2774 Calendar cal = null; 2775 2776 try { String state = getDelegate().selectTriggerState(conn, 2779 trigger.getName(), trigger.getGroup()); 2780 if (!state.equals(STATE_ACQUIRED)) { 2781 return null; 2782 } 2783 } catch (SQLException e) { 2784 throw new JobPersistenceException("Couldn't select trigger state: " 2785 + e.getMessage(), e); 2786 } 2787 2788 try { 2789 job = retrieveJob(conn, ctxt, trigger.getJobName(), trigger 2790 .getJobGroup()); 2791 if (job == null) { return null; } 2792 } catch (JobPersistenceException jpe) { 2793 try { 2794 getLog().error("Error retrieving job, setting trigger state to ERROR.", jpe); 2795 getDelegate().updateTriggerState(conn, trigger.getName(), 2796 trigger.getGroup(), STATE_ERROR); 2797 } catch (SQLException sqle) { 2798 getLog().error("Unable to set trigger state to ERROR.", sqle); 2799 } 2800 throw jpe; 2801 } 2802 2803 if (trigger.getCalendarName() != null) { 2804 cal = retrieveCalendar(conn, ctxt, trigger.getCalendarName()); 2805 if (cal == null) { return null; } 2806 } 2807 2808 try { 2809 getDelegate().deleteFiredTrigger(conn, trigger.getFireInstanceId()); 2810 getDelegate().insertFiredTrigger(conn, trigger, STATE_EXECUTING, 2811 job); 2812 } catch (SQLException e) { 2813 throw new JobPersistenceException("Couldn't insert fired trigger: " 2814 + e.getMessage(), e); 2815 } 2816 2817 Date prevFireTime = trigger.getPreviousFireTime(); 2818 2819 trigger.triggered(cal); 2821 2822 String state = STATE_WAITING; 2823 boolean force = true; 2824 2825 if (job.isStateful()) { 2826 state = STATE_BLOCKED; 2827 force = false; 2828 try { 2829 getDelegate().updateTriggerStatesForJobFromOtherState(conn, job.getName(), 2830 job.getGroup(), STATE_BLOCKED, STATE_WAITING); 2831 getDelegate().updateTriggerStatesForJobFromOtherState(conn, job.getName(), 2832 job.getGroup(), STATE_BLOCKED, STATE_ACQUIRED); 2833 getDelegate().updateTriggerStatesForJobFromOtherState(conn, job.getName(), 2834 job.getGroup(), STATE_PAUSED_BLOCKED, STATE_PAUSED); 2835 } catch (SQLException e) { 2836 throw new JobPersistenceException( 2837 "Couldn't update states of blocked triggers: " 2838 + e.getMessage(), e); 2839 } 2840 } 2841 2842 if (trigger.getNextFireTime() == null) { 2843 state = STATE_COMPLETE; 2844 force = true; 2845 } 2846 2847 storeTrigger(conn, ctxt, trigger, job, true, state, force, false); 2848 2849 job.getJobDataMap().clearDirtyFlag(); 2850 2851 return new TriggerFiredBundle(job, trigger, cal, trigger.getGroup() 2852 .equals(Scheduler.DEFAULT_RECOVERY_GROUP), new Date (), trigger 2853 .getPreviousFireTime(), prevFireTime, trigger.getNextFireTime()); 2854 } 2855 2856 2865 public void triggeredJobComplete(final SchedulingContext ctxt, final Trigger trigger, 2866 final JobDetail jobDetail, final int triggerInstCode) 2867 throws JobPersistenceException { 2868 executeInNonManagedTXLock( 2869 LOCK_TRIGGER_ACCESS, 2870 new VoidTransactionCallback() { 2871 public void execute(Connection conn) throws JobPersistenceException { 2872 triggeredJobComplete(conn, ctxt, trigger, jobDetail,triggerInstCode); 2873 } 2874 }); 2875 } 2876 2877 protected void triggeredJobComplete(Connection conn, 2878 SchedulingContext ctxt, Trigger trigger, JobDetail jobDetail, 2879 int triggerInstCode) throws JobPersistenceException { 2880 try { 2881 if (triggerInstCode == Trigger.INSTRUCTION_DELETE_TRIGGER) { 2882 if(trigger.getNextFireTime() == null) { 2883 TriggerStatus stat = getDelegate().selectTriggerStatus( 2886 conn, trigger.getName(), trigger.getGroup()); 2887 if(stat != null && stat.getNextFireTime() == null) { 2888 removeTrigger(conn, ctxt, trigger.getName(), trigger.getGroup()); 2889 } 2890 } else{ 2891 removeTrigger(conn, ctxt, trigger.getName(), trigger.getGroup()); 2892 } 2893 } else if (triggerInstCode == Trigger.INSTRUCTION_SET_TRIGGER_COMPLETE) { 2894 getDelegate().updateTriggerState(conn, trigger.getName(), 2895 trigger.getGroup(), STATE_COMPLETE); 2896 } else if (triggerInstCode == Trigger.INSTRUCTION_SET_TRIGGER_ERROR) { 2897 getLog().info("Trigger " + trigger.getFullName() + " set to ERROR state."); 2898 getDelegate().updateTriggerState(conn, trigger.getName(), 2899 trigger.getGroup(), STATE_ERROR); 2900 } else if (triggerInstCode == Trigger.INSTRUCTION_SET_ALL_JOB_TRIGGERS_COMPLETE) { 2901 getDelegate().updateTriggerStatesForJob(conn, 2902 trigger.getJobName(), trigger.getJobGroup(), 2903 STATE_COMPLETE); 2904 } else if (triggerInstCode == Trigger.INSTRUCTION_SET_ALL_JOB_TRIGGERS_ERROR) { 2905 getLog().info("All triggers of Job " + 2906 trigger.getFullJobName() + " set to ERROR state."); 2907 getDelegate().updateTriggerStatesForJob(conn, 2908 trigger.getJobName(), trigger.getJobGroup(), 2909 STATE_ERROR); 2910 } 2911 2912 if (jobDetail.isStateful()) { 2913 getDelegate().updateTriggerStatesForJobFromOtherState(conn, 2914 jobDetail.getName(), jobDetail.getGroup(), 2915 STATE_WAITING, STATE_BLOCKED); 2916 2917 getDelegate().updateTriggerStatesForJobFromOtherState(conn, 2918 jobDetail.getName(), jobDetail.getGroup(), 2919 STATE_PAUSED, STATE_PAUSED_BLOCKED); 2920 2921 try { 2922 if (jobDetail.getJobDataMap().isDirty()) { 2923 getDelegate().updateJobData(conn, jobDetail); 2924 } 2925 } catch (IOException e) { 2926 throw new JobPersistenceException( 2927 "Couldn't serialize job data: " + e.getMessage(), e); 2928 } catch (SQLException e) { 2929 throw new JobPersistenceException( 2930 "Couldn't update job data: " + e.getMessage(), e); 2931 } 2932 } 2933 } catch (SQLException e) { 2934 throw new JobPersistenceException( 2935 "Couldn't update trigger state(s): " + e.getMessage(), e); 2936 } 2937 2938 try { 2939 getDelegate().deleteFiredTrigger(conn, trigger.getFireInstanceId()); 2940 } catch (SQLException e) { 2941 throw new JobPersistenceException("Couldn't delete fired trigger: " 2942 + e.getMessage(), e); 2943 } 2944 } 2945 2946 2951 protected DriverDelegate getDelegate() throws NoSuchDelegateException { 2952 if (null == delegate) { 2953 try { 2954 if(delegateClassName != null) { 2955 delegateClass = 2956 getClassLoadHelper().loadClass(delegateClassName); 2957 } 2958 2959 Constructor ctor = null; 2960 Object [] ctorParams = null; 2961 if (canUseProperties()) { 2962 Class [] ctorParamTypes = new Class []{ 2963 Log.class, String .class, String .class, Boolean .class}; 2964 ctor = delegateClass.getConstructor(ctorParamTypes); 2965 ctorParams = new Object []{ 2966 getLog(), tablePrefix, 2967 instanceId, new Boolean (canUseProperties())}; 2968 } else { 2969 Class [] ctorParamTypes = new Class []{ 2970 Log.class, String .class, String .class}; 2971 ctor = delegateClass.getConstructor(ctorParamTypes); 2972 ctorParams = new Object []{getLog(), tablePrefix, instanceId}; 2973 } 2974 2975 delegate = (DriverDelegate) ctor.newInstance(ctorParams); 2976 } catch (NoSuchMethodException e) { 2977 throw new NoSuchDelegateException( 2978 "Couldn't find delegate constructor: " + e.getMessage()); 2979 } catch (InstantiationException e) { 2980 throw new NoSuchDelegateException("Couldn't create delegate: " 2981 + e.getMessage()); 2982 } catch (IllegalAccessException e) { 2983 throw new NoSuchDelegateException("Couldn't create delegate: " 2984 + e.getMessage()); 2985 } catch (InvocationTargetException e) { 2986 throw new NoSuchDelegateException("Couldn't create delegate: " 2987 + e.getMessage()); 2988 } catch (ClassNotFoundException e) { 2989 throw new NoSuchDelegateException("Couldn't load delegate class: " 2990 + e.getMessage()); 2991 } 2992 } 2993 2994 return delegate; 2995 } 2996 2997 protected Semaphore getLockHandler() { 2998 return lockHandler; 2999 } 3000 3001 public void setLockHandler(Semaphore lockHandler) { 3002 this.lockHandler = lockHandler; 3003 } 3004 3005 3009 protected RecoverMisfiredJobsResult doRecoverMisfires() throws JobPersistenceException { 3010 boolean transOwner = false; 3011 Connection conn = getNonManagedTXConnection(); 3012 try { 3013 RecoverMisfiredJobsResult result = RecoverMisfiredJobsResult.NO_OP; 3014 3015 int misfireCount = (getDoubleCheckLockMisfireHandler()) ? 3019 getDelegate().countMisfiredTriggersInStates( 3020 conn, STATE_MISFIRED, STATE_WAITING, getMisfireTime()) : 3021 Integer.MAX_VALUE; 3022 3023 if (misfireCount == 0) { 3024 getLog().debug( 3025 "Found 0 triggers that missed their scheduled fire-time."); 3026 } else { 3027 transOwner = getLockHandler().obtainLock(conn, LOCK_TRIGGER_ACCESS); 3028 3029 result = recoverMisfiredJobs(conn, false); 3030 } 3031 3032 commitConnection(conn); 3033 return result; 3034 } catch (JobPersistenceException e) { 3035 rollbackConnection(conn); 3036 throw e; 3037 } catch (SQLException e) { 3038 rollbackConnection(conn); 3039 throw new JobPersistenceException("Database error recovering from misfires.", e); 3040 } catch (RuntimeException e) { 3041 rollbackConnection(conn); 3042 throw new JobPersistenceException("Unexpected runtime exception: " 3043 + e.getMessage(), e); 3044 } finally { 3045 try { 3046 releaseLock(conn, LOCK_TRIGGER_ACCESS, transOwner); 3047 } finally { 3048 cleanupConnection(conn); 3049 } 3050 } 3051 } 3052 3053 protected void signalSchedulingChange() { 3054 signaler.signalSchedulingChange(); 3055 } 3056 3057 3061 protected boolean firstCheckIn = true; 3062 3063 protected long lastCheckin = System.currentTimeMillis(); 3064 3065 protected boolean doCheckin() throws JobPersistenceException { 3066 boolean transOwner = false; 3067 boolean transStateOwner = false; 3068 boolean recovered = false; 3069 3070 Connection conn = getNonManagedTXConnection(); 3071 try { 3072 List failedRecords = null; 3077 if (firstCheckIn == false) { 3078 boolean succeeded = false; 3079 try { 3080 failedRecords = clusterCheckIn(conn); 3081 commitConnection(conn); 3082 succeeded = true; 3083 } catch (JobPersistenceException e) { 3084 rollbackConnection(conn); 3085 throw e; 3086 } finally { 3087 if (succeeded == false) { 3090 cleanupConnection(conn); 3091 } 3092 } 3093 } 3094 3095 if (firstCheckIn || (failedRecords.size() > 0)) { 3096 getLockHandler().obtainLock(conn, LOCK_STATE_ACCESS); 3097 transStateOwner = true; 3098 3099 failedRecords = (firstCheckIn) ? clusterCheckIn(conn) : findFailedInstances(conn); 3102 3103 if (failedRecords.size() > 0) { 3104 getLockHandler().obtainLock(conn, LOCK_TRIGGER_ACCESS); 3105 transOwner = true; 3107 3108 clusterRecover(conn, failedRecords); 3109 recovered = true; 3110 } 3111 } 3112 3113 commitConnection(conn); 3114 } catch (JobPersistenceException e) { 3115 rollbackConnection(conn); 3116 throw e; 3117 } finally { 3118 try { 3119 releaseLock(conn, LOCK_TRIGGER_ACCESS, transOwner); 3120 } finally { 3121 try { 3122 releaseLock(conn, LOCK_STATE_ACCESS, transStateOwner); 3123 } finally { 3124 cleanupConnection(conn); 3125 } 3126 } 3127 } 3128 3129 firstCheckIn = false; 3130 3131 return recovered; 3132 } 3133 3134 3138 protected List findFailedInstances(Connection conn) 3139 throws JobPersistenceException { 3140 try { 3141 List failedInstances = new LinkedList (); 3142 boolean foundThisScheduler = false; 3143 long timeNow = System.currentTimeMillis(); 3144 3145 List states = getDelegate().selectSchedulerStateRecords(conn, null); 3146 3147 for (Iterator itr = states.iterator(); itr.hasNext();) { 3148 SchedulerStateRecord rec = (SchedulerStateRecord) itr.next(); 3149 3150 if (rec.getSchedulerInstanceId().equals(getInstanceId())) { 3152 foundThisScheduler = true; 3153 if (firstCheckIn) { 3154 failedInstances.add(rec); 3155 } 3156 } else { 3157 if (calcFailedIfAfter(rec) < timeNow) { 3159 failedInstances.add(rec); 3160 } 3161 } 3162 } 3163 3164 if (firstCheckIn) { 3166 failedInstances.addAll(findOrphanedFailedInstances(conn, states)); 3167 } 3168 3169 if ((foundThisScheduler == false) && (firstCheckIn == false)) { 3172 getLog().warn( 3174 "This scheduler instance (" + getInstanceId() + ") is still " + 3175 "active but was recovered by another instance in the cluster. " + 3176 "This may cause inconsistent behavior."); 3177 } 3178 3179 return failedInstances; 3180 } catch (Exception e) { 3181 lastCheckin = System.currentTimeMillis(); 3182 throw new JobPersistenceException("Failure identifying failed instances when checking-in: " 3183 + e.getMessage(), e); 3184 } 3185 } 3186 3187 3194 private List findOrphanedFailedInstances( 3195 Connection conn, 3196 List schedulerStateRecords) 3197 throws SQLException , NoSuchDelegateException { 3198 List orphanedInstances = new ArrayList (); 3199 3200 Set allFiredTriggerInstanceNames = getDelegate().selectFiredTriggerInstanceNames(conn); 3201 if (allFiredTriggerInstanceNames.isEmpty() == false) { 3202 for (Iterator schedulerStateIter = schedulerStateRecords.iterator(); 3203 schedulerStateIter.hasNext();) { 3204 SchedulerStateRecord rec = (SchedulerStateRecord)schedulerStateIter.next(); 3205 3206 allFiredTriggerInstanceNames.remove(rec.getSchedulerInstanceId()); 3207 } 3208 3209 for (Iterator orphanIter = allFiredTriggerInstanceNames.iterator(); 3210 orphanIter.hasNext();) { 3211 3212 SchedulerStateRecord orphanedInstance = new SchedulerStateRecord(); 3213 orphanedInstance.setSchedulerInstanceId((String )orphanIter.next()); 3214 3215 orphanedInstances.add(orphanedInstance); 3216 3217 getLog().warn( 3218 "Found orphaned fired triggers for instance: " + orphanedInstance.getSchedulerInstanceId()); 3219 } 3220 } 3221 3222 return orphanedInstances; 3223 } 3224 3225 protected long calcFailedIfAfter(SchedulerStateRecord rec) { 3226 return rec.getCheckinTimestamp() + 3227 Math.max(rec.getCheckinInterval(), 3228 (System.currentTimeMillis() - lastCheckin)) + 3229 7500L; 3230 } 3231 3232 protected List clusterCheckIn(Connection conn) 3233 throws JobPersistenceException { 3234 3235 List failedInstances = findFailedInstances(conn); 3236 3237 try { 3238 3240 lastCheckin = System.currentTimeMillis(); 3242 if(getDelegate().updateSchedulerState(conn, getInstanceId(), lastCheckin) == 0) { 3243 getDelegate().insertSchedulerState(conn, getInstanceId(), 3244 lastCheckin, getClusterCheckinInterval()); 3245 } 3246 3247 } catch (Exception e) { 3248 throw new JobPersistenceException("Failure updating scheduler state when checking-in: " 3249 + e.getMessage(), e); 3250 } 3251 3252 return failedInstances; 3253 } 3254 3255 protected void clusterRecover(Connection conn, List failedInstances) 3256 throws JobPersistenceException { 3257 3258 if (failedInstances.size() > 0) { 3259 3260 long recoverIds = System.currentTimeMillis(); 3261 3262 logWarnIfNonZero(failedInstances.size(), 3263 "ClusterManager: detected " + failedInstances.size() 3264 + " failed or restarted instances."); 3265 try { 3266 Iterator itr = failedInstances.iterator(); 3267 while (itr.hasNext()) { 3268 SchedulerStateRecord rec = (SchedulerStateRecord) itr 3269 .next(); 3270 3271 getLog().info( 3272 "ClusterManager: Scanning for instance \"" 3273 + rec.getSchedulerInstanceId() 3274 + "\"'s failed in-progress jobs."); 3275 3276 List firedTriggerRecs = getDelegate() 3277 .selectInstancesFiredTriggerRecords(conn, 3278 rec.getSchedulerInstanceId()); 3279 3280 int acquiredCount = 0; 3281 int recoveredCount = 0; 3282 int otherCount = 0; 3283 3284 Set triggerKeys = new HashSet (); 3285 3286 Iterator ftItr = firedTriggerRecs.iterator(); 3287 while (ftItr.hasNext()) { 3288 FiredTriggerRecord ftRec = (FiredTriggerRecord) ftItr 3289 .next(); 3290 3291 Key tKey = ftRec.getTriggerKey(); 3292 Key jKey = ftRec.getJobKey(); 3293 3294 triggerKeys.add(tKey); 3295 3296 if (ftRec.getFireInstanceState().equals(STATE_BLOCKED)) { 3298 getDelegate() 3299 .updateTriggerStatesForJobFromOtherState( 3300 conn, jKey.getName(), 3301 jKey.getGroup(), STATE_WAITING, 3302 STATE_BLOCKED); 3303 } else if (ftRec.getFireInstanceState().equals(STATE_PAUSED_BLOCKED)) { 3304 getDelegate() 3305 .updateTriggerStatesForJobFromOtherState( 3306 conn, jKey.getName(), 3307 jKey.getGroup(), STATE_PAUSED, 3308 STATE_PAUSED_BLOCKED); 3309 } 3310 3311 if (ftRec.getFireInstanceState().equals(STATE_ACQUIRED)) { 3313 getDelegate().updateTriggerStateFromOtherState( 3314 conn, tKey.getName(), tKey.getGroup(), 3315 STATE_WAITING, STATE_ACQUIRED); 3316 acquiredCount++; 3317 } else if (ftRec.isJobRequestsRecovery()) { 3318 if (jobExists(conn, jKey.getName(), jKey.getGroup())) { 3321 SimpleTrigger rcvryTrig = new SimpleTrigger( 3322 "recover_" 3323 + rec.getSchedulerInstanceId() 3324 + "_" 3325 + String.valueOf(recoverIds++), 3326 Scheduler.DEFAULT_RECOVERY_GROUP, 3327 new Date (ftRec.getFireTimestamp())); 3328 rcvryTrig.setJobName(jKey.getName()); 3329 rcvryTrig.setJobGroup(jKey.getGroup()); 3330 rcvryTrig.setMisfireInstruction(SimpleTrigger.MISFIRE_INSTRUCTION_FIRE_NOW); 3331 rcvryTrig.setPriority(ftRec.getPriority()); 3332 JobDataMap jd = getDelegate().selectTriggerJobDataMap(conn, tKey.getName(), tKey.getGroup()); 3333 jd.put(Scheduler.FAILED_JOB_ORIGINAL_TRIGGER_NAME, tKey.getName()); 3334 jd.put(Scheduler.FAILED_JOB_ORIGINAL_TRIGGER_GROUP, tKey.getGroup()); 3335 jd.put(Scheduler.FAILED_JOB_ORIGINAL_TRIGGER_FIRETIME_IN_MILLISECONDS, String.valueOf(ftRec.getFireTimestamp())); 3336 rcvryTrig.setJobDataMap(jd); 3337 3338 rcvryTrig.computeFirstFireTime(null); 3339 storeTrigger(conn, null, rcvryTrig, null, false, 3340 STATE_WAITING, false, true); 3341 recoveredCount++; 3342 } else { 3343 getLog() 3344 .warn( 3345 "ClusterManager: failed job '" 3346 + jKey 3347 + "' no longer exists, cannot schedule recovery."); 3348 otherCount++; 3349 } 3350 } else { 3351 otherCount++; 3352 } 3353 3354 if (ftRec.isJobIsStateful()) { 3356 getDelegate() 3357 .updateTriggerStatesForJobFromOtherState( 3358 conn, jKey.getName(), 3359 jKey.getGroup(), STATE_WAITING, 3360 STATE_BLOCKED); 3361 getDelegate() 3362 .updateTriggerStatesForJobFromOtherState( 3363 conn, jKey.getName(), 3364 jKey.getGroup(), STATE_PAUSED, 3365 STATE_PAUSED_BLOCKED); 3366 } 3367 } 3368 3369 getDelegate().deleteFiredTriggers(conn, 3370 rec.getSchedulerInstanceId()); 3371 3372 int completeCount = 0; 3375 for (Iterator triggerKeyIter = triggerKeys.iterator(); triggerKeyIter.hasNext();) { 3376 Key triggerKey = (Key)triggerKeyIter.next(); 3377 3378 if (getDelegate().selectTriggerState(conn, triggerKey.getName(), triggerKey.getGroup()). 3379 equals(STATE_COMPLETE)) { 3380 List firedTriggers = 3381 getDelegate().selectFiredTriggerRecords(conn, triggerKey.getName(), triggerKey.getGroup()); 3382 if (firedTriggers.isEmpty()) { 3383 SchedulingContext schedulingContext = new SchedulingContext(); 3384 schedulingContext.setInstanceId(instanceId); 3385 3386 if (removeTrigger(conn, schedulingContext, triggerKey.getName(), triggerKey.getGroup())) { 3387 completeCount++; 3388 } 3389 } 3390 } 3391 } 3392 3393 logWarnIfNonZero(acquiredCount, 3394 "ClusterManager: ......Freed " + acquiredCount 3395 + " acquired trigger(s)."); 3396 logWarnIfNonZero(completeCount, 3397 "ClusterManager: ......Deleted " + completeCount 3398 + " complete triggers(s)."); 3399 logWarnIfNonZero(recoveredCount, 3400 "ClusterManager: ......Scheduled " + recoveredCount 3401 + " recoverable job(s) for recovery."); 3402 logWarnIfNonZero(otherCount, 3403 "ClusterManager: ......Cleaned-up " + otherCount 3404 + " other failed job(s)."); 3405 3406 if (rec.getSchedulerInstanceId().equals(getInstanceId()) == false) { 3407 getDelegate().deleteSchedulerState(conn, 3408 rec.getSchedulerInstanceId()); 3409 } 3410 } 3411 } catch (Exception e) { 3412 throw new JobPersistenceException("Failure recovering jobs: " 3413 + e.getMessage(), e); 3414 } 3415 } 3416 } 3417 3418 protected void logWarnIfNonZero(int val, String warning) { 3419 if (val > 0) { 3420 getLog().info(warning); 3421 } else { 3422 getLog().debug(warning); 3423 } 3424 } 3425 3426 3442 protected void cleanupConnection(Connection conn) { 3443 if (conn != null) { 3444 if (conn instanceof Proxy ) { 3445 Proxy connProxy = (Proxy )conn; 3446 3447 InvocationHandler invocationHandler = 3448 Proxy.getInvocationHandler(connProxy); 3449 if (invocationHandler instanceof AttributeRestoringConnectionInvocationHandler) { 3450 AttributeRestoringConnectionInvocationHandler connHandler = 3451 (AttributeRestoringConnectionInvocationHandler)invocationHandler; 3452 3453 connHandler.restoreOriginalAtributes(); 3454 closeConnection(connHandler.getWrappedConnection()); 3455 return; 3456 } 3457 } 3458 3459 closeConnection(conn); 3461 } 3462 } 3463 3464 3465 3475 protected void closeConnection(Connection conn) { 3476 if (conn != null) { 3477 try { 3478 conn.close(); 3479 } catch (SQLException e) { 3480 getLog().error("Failed to close Connection", e); 3481 } catch (Throwable e) { 3482 getLog().error( 3483 "Unexpected exception closing Connection." + 3484 " This is often due to a Connection being returned after or during shutdown.", e); 3485 } 3486 } 3487 } 3488 3489 3500 protected void rollbackConnection(Connection conn) { 3501 if (conn != null) { 3502 try { 3503 conn.rollback(); 3504 } catch (SQLException e) { 3505 getLog().error( 3506 "Couldn't rollback jdbc connection. "+e.getMessage(), e); 3507 } 3508 } 3509 } 3510 3511 3518 protected void commitConnection(Connection conn) 3519 throws JobPersistenceException { 3520 3521 if (conn != null) { 3522 try { 3523 conn.commit(); 3524 } catch (SQLException e) { 3525 throw new JobPersistenceException( 3526 "Couldn't commit jdbc connection. "+e.getMessage(), e); 3527 } 3528 } 3529 } 3530 3531 3540 protected interface TransactionCallback { 3541 Object execute(Connection conn) throws JobPersistenceException; 3542 } 3543 3544 3550 protected interface VoidTransactionCallback { 3551 void execute(Connection conn) throws JobPersistenceException; 3552 } 3553 3554 3565 public Object executeWithoutLock( 3566 TransactionCallback txCallback) throws JobPersistenceException { 3567 return executeInLock(null, txCallback); 3568 } 3569 3570 3583 protected void executeInLock( 3584 final String lockName, 3585 final VoidTransactionCallback txCallback) throws JobPersistenceException { 3586 executeInLock( 3587 lockName, 3588 new TransactionCallback() { 3589 public Object execute(Connection conn) throws JobPersistenceException { 3590 txCallback.execute(conn); 3591 return null; 3592 } 3593 }); 3594 } 3595 3596 3605 protected abstract Object executeInLock( 3606 String lockName, 3607 TransactionCallback txCallback) throws JobPersistenceException; 3608 3609 3621 protected void executeInNonManagedTXLock( 3622 final String lockName, 3623 final VoidTransactionCallback txCallback) throws JobPersistenceException { 3624 executeInNonManagedTXLock( 3625 lockName, 3626 new TransactionCallback() { 3627 public Object execute(Connection conn) throws JobPersistenceException { 3628 txCallback.execute(conn); 3629 return null; 3630 } 3631 }); 3632 } 3633 3634 3642 protected Object executeInNonManagedTXLock( 3643 String lockName, 3644 TransactionCallback txCallback) throws JobPersistenceException { 3645 boolean transOwner = false; 3646 Connection conn = null; 3647 try { 3648 if (lockName != null) { 3649 if (getLockHandler().requiresConnection()) { 3652 conn = getNonManagedTXConnection(); 3653 } 3654 3655 transOwner = getLockHandler().obtainLock(conn, lockName); 3656 } 3657 3658 if (conn == null) { 3659 conn = getNonManagedTXConnection(); 3660 } 3661 3662 Object result = txCallback.execute(conn); 3663 commitConnection(conn); 3664 return result; 3665 } catch (JobPersistenceException e) { 3666 rollbackConnection(conn); 3667 throw e; 3668 } catch (RuntimeException e) { 3669 rollbackConnection(conn); 3670 throw new JobPersistenceException("Unexpected runtime exception: " 3671 + e.getMessage(), e); 3672 } finally { 3673 try { 3674 releaseLock(conn, lockName, transOwner); 3675 } finally { 3676 cleanupConnection(conn); 3677 } 3678 } 3679 } 3680 3681 3687 class ClusterManager extends Thread { 3688 3689 private boolean shutdown = false; 3690 3691 private int numFails = 0; 3692 3693 ClusterManager() { 3694 this.setPriority(Thread.NORM_PRIORITY + 2); 3695 this.setName("QuartzScheduler_" + instanceName + "-" + instanceId + "_ClusterManager"); 3696 this.setDaemon(getMakeThreadsDaemons()); 3697 } 3698 3699 public void initialize() { 3700 this.manage(); 3701 this.start(); 3702 } 3703 3704 public void shutdown() { 3705 shutdown = true; 3706 this.interrupt(); 3707 } 3708 3709 private boolean manage() { 3710 boolean res = false; 3711 try { 3712 3713 res = doCheckin(); 3714 3715 numFails = 0; 3716 getLog().debug("ClusterManager: Check-in complete."); 3717 } catch (Exception e) { 3718 if(numFails % 4 == 0) { 3719 getLog().error( 3720 "ClusterManager: Error managing cluster: " 3721 + e.getMessage(), e); 3722 } 3723 numFails++; 3724 } 3725 return res; 3726 } 3727 3728 public void run() { 3729 while (!shutdown) { 3730 3731 if (!shutdown) { 3732 long timeToSleep = getClusterCheckinInterval(); 3733 long transpiredTime = (System.currentTimeMillis() - lastCheckin); 3734 timeToSleep = timeToSleep - transpiredTime; 3735 if (timeToSleep <= 0) { 3736 timeToSleep = 100L; 3737 } 3738 3739 if(numFails > 0) { 3740 timeToSleep = Math.max(getDbRetryInterval(), timeToSleep); 3741 } 3742 3743 try { 3744 Thread.sleep(timeToSleep); 3745 } catch (Exception ignore) { 3746 } 3747 } 3748 3749 if (!shutdown && this.manage()) { 3750 signalSchedulingChange(); 3751 } 3752 3753 } } 3755 } 3756 3757 3763 class MisfireHandler extends Thread { 3764 3765 private boolean shutdown = false; 3766 3767 private int numFails = 0; 3768 3769 3770 MisfireHandler() { 3771 this.setName("QuartzScheduler_" + instanceName + "-" + instanceId + "_MisfireHandler"); 3772 this.setDaemon(getMakeThreadsDaemons()); 3773 } 3774 3775 public void initialize() { 3776 this.start(); 3778 } 3779 3780 public void shutdown() { 3781 shutdown = true; 3782 this.interrupt(); 3783 } 3784 3785 private RecoverMisfiredJobsResult manage() { 3786 try { 3787 getLog().debug("MisfireHandler: scanning for misfires..."); 3788 3789 RecoverMisfiredJobsResult res = doRecoverMisfires(); 3790 numFails = 0; 3791 return res; 3792 } catch (Exception e) { 3793 if(numFails % 4 == 0) { 3794 getLog().error( 3795 "MisfireHandler: Error handling misfires: " 3796 + e.getMessage(), e); 3797 } 3798 numFails++; 3799 } 3800 return RecoverMisfiredJobsResult.NO_OP; 3801 } 3802 3803 public void run() { 3804 3805 while (!shutdown) { 3806 3807 long sTime = System.currentTimeMillis(); 3808 3809 RecoverMisfiredJobsResult recoverMisfiredJobsResult = manage(); 3810 3811 if (recoverMisfiredJobsResult.getProcessedMisfiredTriggerCount() > 0) { 3812 signalSchedulingChange(); 3813 } 3814 3815 if (!shutdown) { 3816 long timeToSleep = 50l; if (!recoverMisfiredJobsResult.hasMoreMisfiredTriggers()) { 3818 timeToSleep = getMisfireThreshold() - (System.currentTimeMillis() - sTime); 3819 if (timeToSleep <= 0) { 3820 timeToSleep = 50l; 3821 } 3822 3823 if(numFails > 0) { 3824 timeToSleep = Math.max(getDbRetryInterval(), timeToSleep); 3825 } 3826 } 3827 3828 try { 3829 Thread.sleep(timeToSleep); 3830 } catch (Exception ignore) { 3831 } 3832 } } 3834 } 3835 } 3836} 3837 3838 | Popular Tags |