| 1 17 18 21 package org.quartz.impl.jdbcjobstore; 22 23 import java.io.IOException ; 24 import java.lang.reflect.Constructor ; 25 import java.lang.reflect.InvocationHandler ; 26 import java.lang.reflect.InvocationTargetException ; 27 import java.lang.reflect.Proxy ; 28 import java.sql.Connection ; 29 import java.sql.SQLException ; 30 import java.util.ArrayList ; 31 import java.util.Date ; 32 import java.util.HashMap ; 33 import java.util.HashSet ; 34 import java.util.Iterator ; 35 import java.util.LinkedList ; 36 import java.util.List ; 37 import java.util.Set ; 38 39 import org.apache.commons.logging.Log; 40 import org.apache.commons.logging.LogFactory; 41 import org.quartz.Calendar; 42 import org.quartz.CronTrigger; 43 import org.quartz.JobDataMap; 44 import org.quartz.JobDetail; 45 import org.quartz.JobPersistenceException; 46 import org.quartz.ObjectAlreadyExistsException; 47 import org.quartz.Scheduler; 48 import org.quartz.SchedulerConfigException; 49 import org.quartz.SchedulerException; 50 import org.quartz.SimpleTrigger; 51 import org.quartz.Trigger; 52 import org.quartz.core.SchedulingContext; 53 import org.quartz.spi.ClassLoadHelper; 54 import org.quartz.spi.JobStore; 55 import org.quartz.spi.SchedulerSignaler; 56 import org.quartz.spi.TriggerFiredBundle; 57 import org.quartz.utils.DBConnectionManager; 58 import org.quartz.utils.Key; 59 import org.quartz.utils.TriggerStatus; 60 61 62 70 public abstract class JobStoreSupport implements JobStore, Constants { 71 72 79 80 protected static String LOCK_TRIGGER_ACCESS = "TRIGGER_ACCESS"; 81 82 protected static String LOCK_JOB_ACCESS = "JOB_ACCESS"; 83 84 protected static String LOCK_CALENDAR_ACCESS = "CALENDAR_ACCESS"; 85 86 protected static String LOCK_STATE_ACCESS = "STATE_ACCESS"; 87 88 protected static String LOCK_MISFIRE_ACCESS = "MISFIRE_ACCESS"; 89 90 97 98 protected String dsName; 99 100 protected String tablePrefix = DEFAULT_TABLE_PREFIX; 101 102 protected boolean useProperties = false; 103 104 protected String instanceId; 105 106 protected String instanceName; 107 108 protected String delegateClassName; 109 protected Class delegateClass = StdJDBCDelegate.class; 110 111 protected HashMap calendarCache = new HashMap (); 112 113 private DriverDelegate delegate; 114 115 private long misfireThreshold = 60000L; 117 private boolean dontSetAutoCommitFalse = false; 118 119 private boolean isClustered = false; 120 121 private boolean useDBLocks = false; 122 123 private boolean lockOnInsert = true; 124 125 private Semaphore lockHandler = null; 127 private String selectWithLockSQL = null; 128 129 private long clusterCheckinInterval = 7500L; 130 131 private ClusterManager clusterManagementThread = null; 132 133 private MisfireHandler misfireHandler = null; 134 135 private ClassLoadHelper classLoadHelper; 136 137 private SchedulerSignaler signaler; 138 139 protected int maxToRecoverAtATime = 20; 140 141 private boolean setTxIsolationLevelSequential = false; 142 143 private long dbRetryInterval = 10000; 144 145 private boolean makeThreadsDaemons = false; 146 147 private boolean doubleCheckLockMisfireHandler = true; 148 149 private final Log log = LogFactory.getLog(getClass()); 150 151 158 159 165 public void setDataSource(String dsName) { 166 this.dsName = dsName; 167 } 168 169 175 public String getDataSource() { 176 return dsName; 177 } 178 179 184 public void setTablePrefix(String prefix) { 185 if (prefix == null) { 186 prefix = ""; 187 } 188 189 this.tablePrefix = prefix; 190 } 191 192 197 public String getTablePrefix() { 198 return tablePrefix; 199 } 200 201 206 public void setUseProperties(String useProp) { 207 if (useProp == null) { 208 useProp = "false"; 209 } 210 211 this.useProperties = Boolean.valueOf(useProp).booleanValue(); 212 } 213 214 219 public boolean canUseProperties() { 220 return useProperties; 221 } 222 223 228 public void setInstanceId(String instanceId) { 229 this.instanceId = instanceId; 230 } 231 232 237 public String getInstanceId() { 238 239 return instanceId; 240 } 241 242 245 public void setInstanceName(String instanceName) { 246 this.instanceName = instanceName; 247 } 248 249 252 public String getInstanceName() { 253 254 return instanceName; 255 } 256 257 262 public void setIsClustered(boolean isClustered) { 263 this.isClustered = isClustered; 264 } 265 266 271 public boolean isClustered() { 272 return isClustered; 273 } 274 275 282 public long getClusterCheckinInterval() { 283 return clusterCheckinInterval; 284 } 285 286 293 public void setClusterCheckinInterval(long l) { 294 clusterCheckinInterval = l; 295 } 296 297 304 public int getMaxMisfiresToHandleAtATime() { 305 return maxToRecoverAtATime; 306 } 307 308 315 public void setMaxMisfiresToHandleAtATime(int maxToRecoverAtATime) { 316 this.maxToRecoverAtATime = maxToRecoverAtATime; 317 } 318 319 322 public long getDbRetryInterval() { 323 return dbRetryInterval; 324 } 325 328 public void setDbRetryInterval(long dbRetryInterval) { 329 this.dbRetryInterval = dbRetryInterval; 330 } 331 332 338 public void setUseDBLocks(boolean useDBLocks) { 339 this.useDBLocks = useDBLocks; 340 } 341 342 348 public boolean getUseDBLocks() { 349 return useDBLocks; 350 } 351 352 public boolean isLockOnInsert() { 353 return lockOnInsert; 354 } 355 356 368 public void setLockOnInsert(boolean lockOnInsert) { 369 this.lockOnInsert = lockOnInsert; 370 } 371 372 public long getMisfireThreshold() { 373 return misfireThreshold; 374 } 375 376 383 public void setMisfireThreshold(long misfireThreshold) { 384 if (misfireThreshold < 1) { 385 throw new IllegalArgumentException ( 386 "Misfirethreshold must be larger than 0"); 387 } 388 this.misfireThreshold = misfireThreshold; 389 } 390 391 public boolean isDontSetAutoCommitFalse() { 392 return dontSetAutoCommitFalse; 393 } 394 395 402 public void setDontSetAutoCommitFalse(boolean b) { 403 dontSetAutoCommitFalse = b; 404 } 405 406 public boolean isTxIsolationLevelSerializable() { 407 return setTxIsolationLevelSequential; 408 } 409 410 415 public void setTxIsolationLevelSerializable(boolean b) { 416 setTxIsolationLevelSequential = b; 417 } 418 419 420 428 public void setDriverDelegateClass(String delegateClassName) 429 throws InvalidConfigurationException { 430 this.delegateClassName = delegateClassName; 431 } 432 433 440 public String getDriverDelegateClass() { 441 return delegateClassName; 442 } 443 444 public String getSelectWithLockSQL() { 445 return selectWithLockSQL; 446 } 447 448 456 public void setSelectWithLockSQL(String string) { 457 selectWithLockSQL = string; 458 } 459 460 protected ClassLoadHelper getClassLoadHelper() { 461 return classLoadHelper; 462 } 463 464 471 public boolean getMakeThreadsDaemons() { 472 return makeThreadsDaemons; 473 } 474 475 482 public void setMakeThreadsDaemons(boolean makeThreadsDaemons) { 483 this.makeThreadsDaemons = makeThreadsDaemons; 484 } 485 486 492 public boolean getDoubleCheckLockMisfireHandler() { 493 return doubleCheckLockMisfireHandler; 494 } 495 496 502 public void setDoubleCheckLockMisfireHandler( 503 boolean doubleCheckLockMisfireHandler) { 504 this.doubleCheckLockMisfireHandler = doubleCheckLockMisfireHandler; 505 } 506 507 511 protected Log getLog() { 512 return log; 513 } 514 515 521 public void initialize(ClassLoadHelper loadHelper, 522 SchedulerSignaler signaler) throws SchedulerConfigException { 523 524 if (dsName == null) { 525 throw new SchedulerConfigException("DataSource name not set."); 526 } 527 528 classLoadHelper = loadHelper; 529 this.signaler = signaler; 530 531 if (getLockHandler() == null) { 534 535 if (isClustered()) { 538 setUseDBLocks(true); 539 } 540 541 if (getUseDBLocks()) { 542 getLog().info( 543 "Using db table-based data access locking (synchronization)."); 544 setLockHandler( 545 new StdRowLockSemaphore(getTablePrefix(), getSelectWithLockSQL())); 546 } else { 547 getLog().info( 548 "Using thread monitor-based data access locking (synchronization)."); 549 setLockHandler(new SimpleSemaphore()); 550 } 551 } 552 553 if (!isClustered()) { 554 try { 555 cleanVolatileTriggerAndJobs(); 556 } catch (SchedulerException se) { 557 throw new SchedulerConfigException( 558 "Failure occured during job recovery.", se); 559 } 560 } 561 } 562 563 566 public void schedulerStarted() throws SchedulerException { 567 568 if (isClustered()) { 569 clusterManagementThread = new ClusterManager(); 570 clusterManagementThread.initialize(); 571 } else { 572 try { 573 recoverJobs(); 574 } catch (SchedulerException se) { 575 throw new SchedulerConfigException( 576 "Failure occured during job recovery.", se); 577 } 578 } 579 580 misfireHandler = new MisfireHandler(); 581 misfireHandler.initialize(); 582 } 583 584 591 public void shutdown() { 592 if (clusterManagementThread != null) { 593 clusterManagementThread.shutdown(); 594 } 595 596 if (misfireHandler != null) { 597 misfireHandler.shutdown(); 598 } 599 600 try { 601 DBConnectionManager.getInstance().shutdown(getDataSource()); 602 } catch (SQLException sqle) { 603 getLog().warn("Database connection shutdown unsuccessful.", sqle); 604 } 605 } 606 607 public boolean supportsPersistence() { 608 return true; 609 } 610 611 615 protected abstract Connection getNonManagedTXConnection() 616 throws JobPersistenceException; 617 618 623 protected Connection getAttributeRestoringConnection(Connection conn) { 624 return (Connection )Proxy.newProxyInstance( 625 Thread.currentThread().getContextClassLoader(), 626 new Class [] { Connection .class }, 627 new AttributeRestoringConnectionInvocationHandler(conn)); 628 } 629 630 protected Connection getConnection() throws JobPersistenceException { 631 Connection conn = null; 632 try { 633 conn = DBConnectionManager.getInstance().getConnection( 634 getDataSource()); 635 } catch (SQLException sqle) { 636 throw new JobPersistenceException( 637 "Failed to obtain DB connection from data source '" 638 + getDataSource() + "': " + sqle.toString(), sqle); 639 } catch (Throwable e) { 640 throw new JobPersistenceException( 641 "Failed to obtain DB connection from data source '" 642 + getDataSource() + "': " + e.toString(), e, 643 JobPersistenceException.ERR_PERSISTENCE_CRITICAL_FAILURE); 644 } 645 646 if (conn == null) { 647 throw new JobPersistenceException( 648 "Could not get connection from DataSource '" 649 + getDataSource() + "'"); 650 } 651 652 conn = getAttributeRestoringConnection(conn); 654 655 try { 657 if (!isDontSetAutoCommitFalse()) { 658 conn.setAutoCommit(false); 659 } 660 661 if(isTxIsolationLevelSerializable()) { 662 conn.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE); 663 } 664 } catch (SQLException sqle) { 665 getLog().warn("Failed to override connection auto commit/transaction isolation.", sqle); 666 } catch (Throwable e) { 667 try { conn.close(); } catch(Throwable tt) {} 668 669 throw new JobPersistenceException( 670 "Failure setting up connection.", e); 671 } 672 673 return conn; 674 } 675 676 protected void releaseLock(Connection conn, String lockName, boolean doIt) { 677 if (doIt && conn != null) { 678 try { 679 getLockHandler().releaseLock(conn, lockName); 680 } catch (LockException le) { 681 getLog().error("Error returning lock: " + le.getMessage(), le); 682 } 683 } 684 } 685 686 691 protected void cleanVolatileTriggerAndJobs() 692 throws JobPersistenceException { 693 executeInNonManagedTXLock( 694 LOCK_TRIGGER_ACCESS, 695 new VoidTransactionCallback() { 696 public void execute(Connection conn) throws JobPersistenceException { 697 cleanVolatileTriggerAndJobs(conn); 698 } 699 }); 700 } 701 702 710 protected void cleanVolatileTriggerAndJobs(Connection conn) 711 throws JobPersistenceException { 712 try { 713 Key[] volatileTriggers = getDelegate().selectVolatileTriggers(conn); 715 Key[] volatileJobs = getDelegate().selectVolatileJobs(conn); 716 717 for (int i = 0; i < volatileTriggers.length; i++) { 718 removeTrigger(conn, null, volatileTriggers[i].getName(), 719 volatileTriggers[i].getGroup()); 720 } 721 getLog().info( 722 "Removed " + volatileTriggers.length 723 + " Volatile Trigger(s)."); 724 725 for (int i = 0; i < volatileJobs.length; i++) { 726 removeJob(conn, null, volatileJobs[i].getName(), 727 volatileJobs[i].getGroup(), true); 728 } 729 getLog().info( 730 "Removed " + volatileJobs.length + " Volatile Job(s)."); 731 732 getDelegate().deleteVolatileFiredTriggers(conn); 734 735 } catch (Exception e) { 736 throw new JobPersistenceException("Couldn't clean volatile data: " 737 + e.getMessage(), e); 738 } 739 } 740 741 747 protected void recoverJobs() throws JobPersistenceException { 748 executeInNonManagedTXLock( 749 LOCK_TRIGGER_ACCESS, 750 new VoidTransactionCallback() { 751 public void execute(Connection conn) throws JobPersistenceException { 752 recoverJobs(conn); 753 } 754 }); 755 } 756 757 766 protected void recoverJobs(Connection conn) throws JobPersistenceException { 767 try { 768 int rows = getDelegate().updateTriggerStatesFromOtherStates(conn, 770 STATE_WAITING, STATE_ACQUIRED, STATE_BLOCKED); 771 772 rows += getDelegate().updateTriggerStatesFromOtherStates(conn, 773 STATE_PAUSED, STATE_PAUSED_BLOCKED, STATE_PAUSED_BLOCKED); 774 775 getLog().info( 776 "Freed " + rows 777 + " triggers from 'acquired' / 'blocked' state."); 778 779 recoverMisfiredJobs(conn, true); 781 782 Trigger[] recoveringJobTriggers = getDelegate() 784 .selectTriggersForRecoveringJobs(conn); 785 getLog() 786 .info( 787 "Recovering " 788 + recoveringJobTriggers.length 789 + " jobs that were in-progress at the time of the last shut-down."); 790 791 for (int i = 0; i < recoveringJobTriggers.length; ++i) { 792 if (jobExists(conn, recoveringJobTriggers[i].getJobName(), 793 recoveringJobTriggers[i].getJobGroup())) { 794 recoveringJobTriggers[i].computeFirstFireTime(null); 795 storeTrigger(conn, null, recoveringJobTriggers[i], null, false, 796 STATE_WAITING, false, true); 797 } 798 } 799 getLog().info("Recovery complete."); 800 801 Key[] ct = getDelegate().selectTriggersInState(conn, STATE_COMPLETE); 803 for(int i=0; ct != null && i < ct.length; i++) { 804 removeTrigger(conn, null, ct[i].getName(), ct[i].getGroup()); 805 } 806 getLog().info( 807 "Removed " + ct.length 808 + " 'complete' triggers."); 809 810 int n = getDelegate().deleteFiredTriggers(conn); 812 getLog().info("Removed " + n + " stale fired job entries."); 813 } catch (JobPersistenceException e) { 814 throw e; 815 } catch (Exception e) { 816 throw new JobPersistenceException("Couldn't recover jobs: " 817 + e.getMessage(), e); 818 } 819 } 820 821 protected long getMisfireTime() { 822 long misfireTime = System.currentTimeMillis(); 823 if (getMisfireThreshold() > 0) { 824 misfireTime -= getMisfireThreshold(); 825 } 826 827 return (misfireTime > 0) ? misfireTime : 0; 828 } 829 830 834 protected static class RecoverMisfiredJobsResult { 835 public static final RecoverMisfiredJobsResult NO_OP = 836 new RecoverMisfiredJobsResult(false, 0); 837 838 private boolean _hasMoreMisfiredTriggers; 839 private int _processedMisfiredTriggerCount; 840 841 public RecoverMisfiredJobsResult( 842 boolean hasMoreMisfiredTriggers, int processedMisfiredTriggerCount) { 843 _hasMoreMisfiredTriggers = hasMoreMisfiredTriggers; 844 _processedMisfiredTriggerCount = processedMisfiredTriggerCount; 845 } 846 847 public boolean hasMoreMisfiredTriggers() { 848 return _hasMoreMisfiredTriggers; 849 } 850 public int getProcessedMisfiredTriggerCount() { 851 return _processedMisfiredTriggerCount; 852 } 853 } 854 855 protected RecoverMisfiredJobsResult recoverMisfiredJobs( 856 Connection conn, boolean recovering) 857 throws JobPersistenceException, SQLException { 858 859 int maxMisfiresToHandleAtATime = 862 (recovering) ? -1 : getMaxMisfiresToHandleAtATime(); 863 864 List misfiredTriggers = new ArrayList (); 865 866 boolean hasMoreMisfiredTriggers = 869 getDelegate().selectMisfiredTriggersInStates( 870 conn, STATE_MISFIRED, STATE_WAITING, getMisfireTime(), 871 maxMisfiresToHandleAtATime, misfiredTriggers); 872 873 if (hasMoreMisfiredTriggers) { 874 getLog().info( 875 "Handling the first " + misfiredTriggers.size() + 876 " triggers that missed their scheduled fire-time. " + 877 "More misfired triggers remain to be processed."); 878 } else if (misfiredTriggers.size() > 0) { 879 getLog().info( 880 "Handling " + misfiredTriggers.size() + 881 " trigger(s) that missed their scheduled fire-time."); 882 } else { 883 getLog().debug( 884 "Found 0 triggers that missed their scheduled fire-time."); 885 return RecoverMisfiredJobsResult.NO_OP; 886 } 887 888 for (Iterator misfiredTriggerIter = misfiredTriggers.iterator(); misfiredTriggerIter.hasNext();) { 889 Key triggerKey = (Key) misfiredTriggerIter.next(); 890 891 Trigger trig = 892 retrieveTrigger(conn, triggerKey.getName(), triggerKey.getGroup()); 893 894 if (trig == null) { 895 continue; 896 } 897 898 doUpdateOfMisfiredTrigge
|