KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > quartz > impl > jdbcjobstore > JobStoreSupport


1 /*
2  * Copyright 2004-2005 OpenSymphony
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License"); you may not
5  * use this file except in compliance with the License. You may obtain a copy
6  * of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12  * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13  * License for the specific language governing permissions and limitations
14  * under the License.
15  *
16  */

17
18 /*
19  * Previously Copyright (c) 2001-2004 James House
20  */

21 package org.quartz.impl.jdbcjobstore;
22
23 import java.io.IOException JavaDoc;
24 import java.lang.reflect.Constructor JavaDoc;
25 import java.lang.reflect.InvocationHandler JavaDoc;
26 import java.lang.reflect.InvocationTargetException JavaDoc;
27 import java.lang.reflect.Proxy JavaDoc;
28 import java.sql.Connection JavaDoc;
29 import java.sql.SQLException JavaDoc;
30 import java.util.ArrayList JavaDoc;
31 import java.util.Date JavaDoc;
32 import java.util.HashMap JavaDoc;
33 import java.util.HashSet JavaDoc;
34 import java.util.Iterator JavaDoc;
35 import java.util.LinkedList JavaDoc;
36 import java.util.List JavaDoc;
37 import java.util.Set JavaDoc;
38
39 import org.apache.commons.logging.Log;
40 import org.apache.commons.logging.LogFactory;
41 import org.quartz.Calendar;
42 import org.quartz.CronTrigger;
43 import org.quartz.JobDataMap;
44 import org.quartz.JobDetail;
45 import org.quartz.JobPersistenceException;
46 import org.quartz.ObjectAlreadyExistsException;
47 import org.quartz.Scheduler;
48 import org.quartz.SchedulerConfigException;
49 import org.quartz.SchedulerException;
50 import org.quartz.SimpleTrigger;
51 import org.quartz.Trigger;
52 import org.quartz.core.SchedulingContext;
53 import org.quartz.spi.ClassLoadHelper;
54 import org.quartz.spi.JobStore;
55 import org.quartz.spi.SchedulerSignaler;
56 import org.quartz.spi.TriggerFiredBundle;
57 import org.quartz.utils.DBConnectionManager;
58 import org.quartz.utils.Key;
59 import org.quartz.utils.TriggerStatus;
60
61
62 /**
63  * <p>
64  * Contains base functionality for JDBC-based JobStore implementations.
65  * </p>
66  *
67  * @author <a HREF="mailto:jeff@binaryfeed.org">Jeffrey Wescott</a>
68  * @author James House
69  */

70 public abstract class JobStoreSupport implements JobStore, Constants {
71
72     /*
73      * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
74      *
75      * Constants.
76      *
77      * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
78      */

79
80     protected static String JavaDoc LOCK_TRIGGER_ACCESS = "TRIGGER_ACCESS";
81
82     protected static String JavaDoc LOCK_JOB_ACCESS = "JOB_ACCESS";
83
84     protected static String JavaDoc LOCK_CALENDAR_ACCESS = "CALENDAR_ACCESS";
85
86     protected static String JavaDoc LOCK_STATE_ACCESS = "STATE_ACCESS";
87
88     protected static String JavaDoc LOCK_MISFIRE_ACCESS = "MISFIRE_ACCESS";
89
90     /*
91      * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
92      *
93      * Data members.
94      *
95      * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
96      */

97
98     protected String JavaDoc dsName;
99
100     protected String JavaDoc tablePrefix = DEFAULT_TABLE_PREFIX;
101
102     protected boolean useProperties = false;
103
104     protected String JavaDoc instanceId;
105
106     protected String JavaDoc instanceName;
107     
108     protected String JavaDoc delegateClassName;
109     protected Class JavaDoc delegateClass = StdJDBCDelegate.class;
110
111     protected HashMap JavaDoc calendarCache = new HashMap JavaDoc();
112
113     private DriverDelegate delegate;
114
115     private long misfireThreshold = 60000L; // one minute
116

117     private boolean dontSetAutoCommitFalse = false;
118
119     private boolean isClustered = false;
120
121     private boolean useDBLocks = false;
122     
123     private boolean lockOnInsert = true;
124
125     private Semaphore lockHandler = null; // set in initialize() method...
126

127     private String JavaDoc selectWithLockSQL = null;
128
129     private long clusterCheckinInterval = 7500L;
130
131     private ClusterManager clusterManagementThread = null;
132
133     private MisfireHandler misfireHandler = null;
134
135     private ClassLoadHelper classLoadHelper;
136
137     private SchedulerSignaler signaler;
138
139     protected int maxToRecoverAtATime = 20;
140     
141     private boolean setTxIsolationLevelSequential = false;
142     
143     private long dbRetryInterval = 10000;
144     
145     private boolean makeThreadsDaemons = false;
146     
147     private boolean doubleCheckLockMisfireHandler = true;
148     
149     private final Log log = LogFactory.getLog(getClass());
150     
151     /*
152      * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
153      *
154      * Interface.
155      *
156      * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
157      */

158
159     /**
160      * <p>
161      * Set the name of the <code>DataSource</code> that should be used for
162      * performing database functions.
163      * </p>
164      */

165     public void setDataSource(String JavaDoc dsName) {
166         this.dsName = dsName;
167     }
168
169     /**
170      * <p>
171      * Get the name of the <code>DataSource</code> that should be used for
172      * performing database functions.
173      * </p>
174      */

175     public String JavaDoc getDataSource() {
176         return dsName;
177     }
178
179     /**
180      * <p>
181      * Set the prefix that should be pre-pended to all table names.
182      * </p>
183      */

184     public void setTablePrefix(String JavaDoc prefix) {
185         if (prefix == null) {
186             prefix = "";
187         }
188
189         this.tablePrefix = prefix;
190     }
191
192     /**
193      * <p>
194      * Get the prefix that should be pre-pended to all table names.
195      * </p>
196      */

197     public String JavaDoc getTablePrefix() {
198         return tablePrefix;
199     }
200
201     /**
202      * <p>
203      * Set whether String-only properties will be handled in JobDataMaps.
204      * </p>
205      */

206     public void setUseProperties(String JavaDoc useProp) {
207         if (useProp == null) {
208             useProp = "false";
209         }
210
211         this.useProperties = Boolean.valueOf(useProp).booleanValue();
212     }
213
214     /**
215      * <p>
216      * Get whether String-only properties will be handled in JobDataMaps.
217      * </p>
218      */

219     public boolean canUseProperties() {
220         return useProperties;
221     }
222
223     /**
224      * <p>
225      * Set the instance Id of the Scheduler (must be unique within a cluster).
226      * </p>
227      */

228     public void setInstanceId(String JavaDoc instanceId) {
229         this.instanceId = instanceId;
230     }
231
232     /**
233      * <p>
234      * Get the instance Id of the Scheduler (must be unique within a cluster).
235      * </p>
236      */

237     public String JavaDoc getInstanceId() {
238
239         return instanceId;
240     }
241
242     /**
243      * Set the instance name of the Scheduler (must be unique within this server instance).
244      */

245     public void setInstanceName(String JavaDoc instanceName) {
246         this.instanceName = instanceName;
247     }
248
249     /**
250      * Get the instance name of the Scheduler (must be unique within this server instance).
251      */

252     public String JavaDoc getInstanceName() {
253
254         return instanceName;
255     }
256
257     /**
258      * <p>
259      * Set whether this instance is part of a cluster.
260      * </p>
261      */

262     public void setIsClustered(boolean isClustered) {
263         this.isClustered = isClustered;
264     }
265
266     /**
267      * <p>
268      * Get whether this instance is part of a cluster.
269      * </p>
270      */

271     public boolean isClustered() {
272         return isClustered;
273     }
274
275     /**
276      * <p>
277      * Get the frequency (in milliseconds) at which this instance "checks-in"
278      * with the other instances of the cluster. -- Affects the rate of
279      * detecting failed instances.
280      * </p>
281      */

282     public long getClusterCheckinInterval() {
283         return clusterCheckinInterval;
284     }
285
286     /**
287      * <p>
288      * Set the frequency (in milliseconds) at which this instance "checks-in"
289      * with the other instances of the cluster. -- Affects the rate of
290      * detecting failed instances.
291      * </p>
292      */

293     public void setClusterCheckinInterval(long l) {
294         clusterCheckinInterval = l;
295     }
296
297     /**
298      * <p>
299      * Get the maximum number of misfired triggers that the misfire handling
300      * thread will try to recover at one time (within one transaction). The
301      * default is 20.
302      * </p>
303      */

304     public int getMaxMisfiresToHandleAtATime() {
305         return maxToRecoverAtATime;
306     }
307
308     /**
309      * <p>
310      * Set the maximum number of misfired triggers that the misfire handling
311      * thread will try to recover at one time (within one transaction). The
312      * default is 20.
313      * </p>
314      */

315     public void setMaxMisfiresToHandleAtATime(int maxToRecoverAtATime) {
316         this.maxToRecoverAtATime = maxToRecoverAtATime;
317     }
318
319     /**
320      * @return Returns the dbRetryInterval.
321      */

322     public long getDbRetryInterval() {
323         return dbRetryInterval;
324     }
325     /**
326      * @param dbRetryInterval The dbRetryInterval to set.
327      */

328     public void setDbRetryInterval(long dbRetryInterval) {
329         this.dbRetryInterval = dbRetryInterval;
330     }
331     
332     /**
333      * <p>
334      * Set whether this instance should use database-based thread
335      * synchronization.
336      * </p>
337      */

338     public void setUseDBLocks(boolean useDBLocks) {
339         this.useDBLocks = useDBLocks;
340     }
341
342     /**
343      * <p>
344      * Get whether this instance should use database-based thread
345      * synchronization.
346      * </p>
347      */

348     public boolean getUseDBLocks() {
349         return useDBLocks;
350     }
351
352     public boolean isLockOnInsert() {
353         return lockOnInsert;
354     }
355     
356     /**
357      * Whether or not to obtain locks when inserting new jobs/triggers.
358      * Defaults to <code>true</code>, which is safest - some db's (such as
359      * MS SQLServer) seem to require this to avoid deadlocks under high load,
360      * while others seem to do fine without.
361      *
362      * <p>Setting this property to <code>false</code> will provide a
363      * significant performance increase during the addition of new jobs
364      * and triggers.</p>
365      *
366      * @param lockOnInsert
367      */

368     public void setLockOnInsert(boolean lockOnInsert) {
369         this.lockOnInsert = lockOnInsert;
370     }
371     
372     public long getMisfireThreshold() {
373         return misfireThreshold;
374     }
375
376     /**
377      * The the number of milliseconds by which a trigger must have missed its
378      * next-fire-time, in order for it to be considered "misfired" and thus
379      * have its misfire instruction applied.
380      *
381      * @param misfireThreshold
382      */

383     public void setMisfireThreshold(long misfireThreshold) {
384         if (misfireThreshold < 1) {
385             throw new IllegalArgumentException JavaDoc(
386                     "Misfirethreshold must be larger than 0");
387         }
388         this.misfireThreshold = misfireThreshold;
389     }
390
391     public boolean isDontSetAutoCommitFalse() {
392         return dontSetAutoCommitFalse;
393     }
394
395     /**
396      * Don't call set autocommit(false) on connections obtained from the
397      * DataSource. This can be helpfull in a few situations, such as if you
398      * have a driver that complains if it is called when it is already off.
399      *
400      * @param b
401      */

402     public void setDontSetAutoCommitFalse(boolean b) {
403         dontSetAutoCommitFalse = b;
404     }
405
406     public boolean isTxIsolationLevelSerializable() {
407         return setTxIsolationLevelSequential;
408     }
409
410     /**
411      * Set the transaction isolation level of DB connections to sequential.
412      *
413      * @param b
414      */

415     public void setTxIsolationLevelSerializable(boolean b) {
416         setTxIsolationLevelSequential = b;
417     }
418
419     
420     /**
421      * <p>
422      * Set the JDBC driver delegate class.
423      * </p>
424      *
425      * @param delegateClassName
426      * the delegate class name
427      */

428     public void setDriverDelegateClass(String JavaDoc delegateClassName)
429         throws InvalidConfigurationException {
430         this.delegateClassName = delegateClassName;
431     }
432
433     /**
434      * <p>
435      * Get the JDBC driver delegate class name.
436      * </p>
437      *
438      * @return the delegate class name
439      */

440     public String JavaDoc getDriverDelegateClass() {
441         return delegateClassName;
442     }
443
444     public String JavaDoc getSelectWithLockSQL() {
445         return selectWithLockSQL;
446     }
447
448     /**
449      * <p>
450      * set the SQL statement to use to select and lock a row in the "locks"
451      * table.
452      * </p>
453      *
454      * @see StdRowLockSemaphore
455      */

456     public void setSelectWithLockSQL(String JavaDoc string) {
457         selectWithLockSQL = string;
458     }
459
460     protected ClassLoadHelper getClassLoadHelper() {
461         return classLoadHelper;
462     }
463
464     /**
465      * Get whether the threads spawned by this JobStore should be
466      * marked as daemon. Possible threads include the <code>MisfireHandler</code>
467      * and the <code>ClusterManager</code>.
468      *
469      * @see Thread#setDaemon(boolean)
470      */

471     public boolean getMakeThreadsDaemons() {
472         return makeThreadsDaemons;
473     }
474
475     /**
476      * Set whether the threads spawned by this JobStore should be
477      * marked as daemon. Possible threads include the <code>MisfireHandler</code>
478      * and the <code>ClusterManager</code>.
479      *
480      * @see Thread#setDaemon(boolean)
481      */

482     public void setMakeThreadsDaemons(boolean makeThreadsDaemons) {
483         this.makeThreadsDaemons = makeThreadsDaemons;
484     }
485     
486     /**
487      * Get whether to check to see if there are Triggers that have misfired
488      * before actually acquiring the lock to recover them. This should be
489      * set to false if the majority of the time, there are are misfired
490      * Triggers.
491      */

492     public boolean getDoubleCheckLockMisfireHandler() {
493         return doubleCheckLockMisfireHandler;
494     }
495
496     /**
497      * Set whether to check to see if there are Triggers that have misfired
498      * before actually acquiring the lock to recover them. This should be
499      * set to false if the majority of the time, there are are misfired
500      * Triggers.
501      */

502     public void setDoubleCheckLockMisfireHandler(
503             boolean doubleCheckLockMisfireHandler) {
504         this.doubleCheckLockMisfireHandler = doubleCheckLockMisfireHandler;
505     }
506     
507     //---------------------------------------------------------------------------
508
// interface methods
509
//---------------------------------------------------------------------------
510

511     protected Log getLog() {
512         return log;
513     }
514
515     /**
516      * <p>
517      * Called by the QuartzScheduler before the <code>JobStore</code> is
518      * used, in order to give it a chance to initialize.
519      * </p>
520      */

521     public void initialize(ClassLoadHelper loadHelper,
522             SchedulerSignaler signaler) throws SchedulerConfigException {
523
524         if (dsName == null) {
525             throw new SchedulerConfigException("DataSource name not set.");
526         }
527
528         classLoadHelper = loadHelper;
529         this.signaler = signaler;
530
531         // If the user hasn't specified an explicit lock handler, then
532
// choose one based on CMT/Clustered/UseDBLocks.
533
if (getLockHandler() == null) {
534             
535             // If the user hasn't specified an explicit lock handler,
536
// then we *must* use DB locks with clustering
537
if (isClustered()) {
538                 setUseDBLocks(true);
539             }
540             
541             if (getUseDBLocks()) {
542                 getLog().info(
543                     "Using db table-based data access locking (synchronization).");
544                 setLockHandler(
545                     new StdRowLockSemaphore(getTablePrefix(), getSelectWithLockSQL()));
546             } else {
547                 getLog().info(
548                     "Using thread monitor-based data access locking (synchronization).");
549                 setLockHandler(new SimpleSemaphore());
550             }
551         }
552
553         if (!isClustered()) {
554             try {
555                 cleanVolatileTriggerAndJobs();
556             } catch (SchedulerException se) {
557                 throw new SchedulerConfigException(
558                         "Failure occured during job recovery.", se);
559             }
560         }
561     }
562    
563     /**
564      * @see org.quartz.spi.JobStore#schedulerStarted()
565      */

566     public void schedulerStarted() throws SchedulerException {
567
568         if (isClustered()) {
569             clusterManagementThread = new ClusterManager();
570             clusterManagementThread.initialize();
571         } else {
572             try {
573                 recoverJobs();
574             } catch (SchedulerException se) {
575                 throw new SchedulerConfigException(
576                         "Failure occured during job recovery.", se);
577             }
578         }
579
580         misfireHandler = new MisfireHandler();
581         misfireHandler.initialize();
582     }
583     
584     /**
585      * <p>
586      * Called by the QuartzScheduler to inform the <code>JobStore</code> that
587      * it should free up all of it's resources because the scheduler is
588      * shutting down.
589      * </p>
590      */

591     public void shutdown() {
592         if (clusterManagementThread != null) {
593             clusterManagementThread.shutdown();
594         }
595
596         if (misfireHandler != null) {
597             misfireHandler.shutdown();
598         }
599         
600         try {
601             DBConnectionManager.getInstance().shutdown(getDataSource());
602         } catch (SQLException JavaDoc sqle) {
603             getLog().warn("Database connection shutdown unsuccessful.", sqle);
604         }
605     }
606
607     public boolean supportsPersistence() {
608         return true;
609     }
610
611     //---------------------------------------------------------------------------
612
// helper methods for subclasses
613
//---------------------------------------------------------------------------
614

615     protected abstract Connection JavaDoc getNonManagedTXConnection()
616         throws JobPersistenceException;
617
618     /**
619      * Wrap the given <code>Connection</code> in a Proxy such that attributes
620      * that might be set will be restored before the connection is closed
621      * (and potentially restored to a pool).
622      */

623     protected Connection JavaDoc getAttributeRestoringConnection(Connection JavaDoc conn) {
624         return (Connection JavaDoc)Proxy.newProxyInstance(
625                 Thread.currentThread().getContextClassLoader(),
626                 new Class JavaDoc[] { Connection JavaDoc.class },
627                 new AttributeRestoringConnectionInvocationHandler(conn));
628     }
629     
630     protected Connection JavaDoc getConnection() throws JobPersistenceException {
631         Connection JavaDoc conn = null;
632         try {
633             conn = DBConnectionManager.getInstance().getConnection(
634                     getDataSource());
635         } catch (SQLException JavaDoc sqle) {
636             throw new JobPersistenceException(
637                     "Failed to obtain DB connection from data source '"
638                     + getDataSource() + "': " + sqle.toString(), sqle);
639         } catch (Throwable JavaDoc e) {
640             throw new JobPersistenceException(
641                     "Failed to obtain DB connection from data source '"
642                     + getDataSource() + "': " + e.toString(), e,
643                     JobPersistenceException.ERR_PERSISTENCE_CRITICAL_FAILURE);
644         }
645
646         if (conn == null) {
647             throw new JobPersistenceException(
648                 "Could not get connection from DataSource '"
649                 + getDataSource() + "'");
650         }
651
652         // Protect connection attributes we might change.
653
conn = getAttributeRestoringConnection(conn);
654
655         // Set any connection connection attributes we are to override.
656
try {
657             if (!isDontSetAutoCommitFalse()) {
658                 conn.setAutoCommit(false);
659             }
660
661             if(isTxIsolationLevelSerializable()) {
662                 conn.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
663             }
664         } catch (SQLException JavaDoc sqle) {
665             getLog().warn("Failed to override connection auto commit/transaction isolation.", sqle);
666         } catch (Throwable JavaDoc e) {
667             try { conn.close(); } catch(Throwable JavaDoc tt) {}
668             
669             throw new JobPersistenceException(
670                 "Failure setting up connection.", e);
671         }
672     
673         return conn;
674     }
675     
676     protected void releaseLock(Connection JavaDoc conn, String JavaDoc lockName, boolean doIt) {
677         if (doIt && conn != null) {
678             try {
679                 getLockHandler().releaseLock(conn, lockName);
680             } catch (LockException le) {
681                 getLog().error("Error returning lock: " + le.getMessage(), le);
682             }
683         }
684     }
685     
686     /**
687      * Removes all volatile data.
688      *
689      * @throws JobPersistenceException If jobs could not be recovered.
690      */

691     protected void cleanVolatileTriggerAndJobs()
692         throws JobPersistenceException {
693         executeInNonManagedTXLock(
694             LOCK_TRIGGER_ACCESS,
695             new VoidTransactionCallback() {
696                 public void execute(Connection JavaDoc conn) throws JobPersistenceException {
697                     cleanVolatileTriggerAndJobs(conn);
698                 }
699             });
700     }
701     
702     /**
703      * <p>
704      * Removes all volatile data.
705      * </p>
706      *
707      * @throws JobPersistenceException
708      * if jobs could not be recovered
709      */

710     protected void cleanVolatileTriggerAndJobs(Connection JavaDoc conn)
711         throws JobPersistenceException {
712         try {
713             // find volatile jobs & triggers...
714
Key[] volatileTriggers = getDelegate().selectVolatileTriggers(conn);
715             Key[] volatileJobs = getDelegate().selectVolatileJobs(conn);
716
717             for (int i = 0; i < volatileTriggers.length; i++) {
718                 removeTrigger(conn, null, volatileTriggers[i].getName(),
719                         volatileTriggers[i].getGroup());
720             }
721             getLog().info(
722                     "Removed " + volatileTriggers.length
723                             + " Volatile Trigger(s).");
724
725             for (int i = 0; i < volatileJobs.length; i++) {
726                 removeJob(conn, null, volatileJobs[i].getName(),
727                         volatileJobs[i].getGroup(), true);
728             }
729             getLog().info(
730                     "Removed " + volatileJobs.length + " Volatile Job(s).");
731
732             // clean up any fired trigger entries
733
getDelegate().deleteVolatileFiredTriggers(conn);
734
735         } catch (Exception JavaDoc e) {
736             throw new JobPersistenceException("Couldn't clean volatile data: "
737                     + e.getMessage(), e);
738         }
739     }
740
741     /**
742      * Recover any failed or misfired jobs and clean up the data store as
743      * appropriate.
744      *
745      * @throws JobPersistenceException if jobs could not be recovered
746      */

747     protected void recoverJobs() throws JobPersistenceException {
748         executeInNonManagedTXLock(
749             LOCK_TRIGGER_ACCESS,
750             new VoidTransactionCallback() {
751                 public void execute(Connection JavaDoc conn) throws JobPersistenceException {
752                     recoverJobs(conn);
753                 }
754             });
755     }
756     
757     /**
758      * <p>
759      * Will recover any failed or misfired jobs and clean up the data store as
760      * appropriate.
761      * </p>
762      *
763      * @throws JobPersistenceException
764      * if jobs could not be recovered
765      */

766     protected void recoverJobs(Connection JavaDoc conn) throws JobPersistenceException {
767         try {
768             // update inconsistent job states
769
int rows = getDelegate().updateTriggerStatesFromOtherStates(conn,
770                     STATE_WAITING, STATE_ACQUIRED, STATE_BLOCKED);
771
772             rows += getDelegate().updateTriggerStatesFromOtherStates(conn,
773                         STATE_PAUSED, STATE_PAUSED_BLOCKED, STATE_PAUSED_BLOCKED);
774             
775             getLog().info(
776                     "Freed " + rows
777                             + " triggers from 'acquired' / 'blocked' state.");
778
779             // clean up misfired jobs
780
recoverMisfiredJobs(conn, true);
781             
782             // recover jobs marked for recovery that were not fully executed
783
Trigger[] recoveringJobTriggers = getDelegate()
784                     .selectTriggersForRecoveringJobs(conn);
785             getLog()
786                     .info(
787                             "Recovering "
788                                     + recoveringJobTriggers.length
789                                     + " jobs that were in-progress at the time of the last shut-down.");
790
791             for (int i = 0; i < recoveringJobTriggers.length; ++i) {
792                 if (jobExists(conn, recoveringJobTriggers[i].getJobName(),
793                         recoveringJobTriggers[i].getJobGroup())) {
794                     recoveringJobTriggers[i].computeFirstFireTime(null);
795                     storeTrigger(conn, null, recoveringJobTriggers[i], null, false,
796                             STATE_WAITING, false, true);
797                 }
798             }
799             getLog().info("Recovery complete.");
800
801             // remove lingering 'complete' triggers...
802
Key[] ct = getDelegate().selectTriggersInState(conn, STATE_COMPLETE);
803             for(int i=0; ct != null && i < ct.length; i++) {
804                 removeTrigger(conn, null, ct[i].getName(), ct[i].getGroup());
805             }
806             getLog().info(
807                 "Removed " + ct.length
808                 + " 'complete' triggers.");
809             
810             // clean up any fired trigger entries
811
int n = getDelegate().deleteFiredTriggers(conn);
812             getLog().info("Removed " + n + " stale fired job entries.");
813         } catch (JobPersistenceException e) {
814             throw e;
815         } catch (Exception JavaDoc e) {
816             throw new JobPersistenceException("Couldn't recover jobs: "
817                     + e.getMessage(), e);
818         }
819     }
820
821     protected long getMisfireTime() {
822         long misfireTime = System.currentTimeMillis();
823         if (getMisfireThreshold() > 0) {
824             misfireTime -= getMisfireThreshold();
825         }
826
827         return (misfireTime > 0) ? misfireTime : 0;
828     }
829
830     /**
831      * Helper class for returning the composite result of trying
832      * to recover misfired jobs.
833      */

834     protected static class RecoverMisfiredJobsResult {
835         public static final RecoverMisfiredJobsResult NO_OP =
836             new RecoverMisfiredJobsResult(false, 0);
837         
838         private boolean _hasMoreMisfiredTriggers;
839         private int _processedMisfiredTriggerCount;
840         
841         public RecoverMisfiredJobsResult(
842             boolean hasMoreMisfiredTriggers, int processedMisfiredTriggerCount) {
843             _hasMoreMisfiredTriggers = hasMoreMisfiredTriggers;
844             _processedMisfiredTriggerCount = processedMisfiredTriggerCount;
845         }
846         
847         public boolean hasMoreMisfiredTriggers() {
848             return _hasMoreMisfiredTriggers;
849         }
850         public int getProcessedMisfiredTriggerCount() {
851             return _processedMisfiredTriggerCount;
852         }
853     }
854     
855     protected RecoverMisfiredJobsResult recoverMisfiredJobs(
856         Connection JavaDoc conn, boolean recovering)
857         throws JobPersistenceException, SQLException JavaDoc {
858
859         // If recovering, we want to handle all of the misfired
860
// triggers right away.
861
int maxMisfiresToHandleAtATime =
862             (recovering) ? -1 : getMaxMisfiresToHandleAtATime();
863         
864         List JavaDoc misfiredTriggers = new ArrayList JavaDoc();
865         
866         // We must still look for the MISFIRED state in case triggers were left
867
// in this state when upgrading to this version that does not support it.
868
boolean hasMoreMisfiredTriggers =
869             getDelegate().selectMisfiredTriggersInStates(
870                 conn, STATE_MISFIRED, STATE_WAITING, getMisfireTime(),
871                 maxMisfiresToHandleAtATime, misfiredTriggers);
872
873         if (hasMoreMisfiredTriggers) {
874             getLog().info(
875                 "Handling the first " + misfiredTriggers.size() +
876                 " triggers that missed their scheduled fire-time. " +
877                 "More misfired triggers remain to be processed.");
878         } else if (misfiredTriggers.size() > 0) {
879             getLog().info(
880                 "Handling " + misfiredTriggers.size() +
881                 " trigger(s) that missed their scheduled fire-time.");
882         } else {
883             getLog().debug(
884                 "Found 0 triggers that missed their scheduled fire-time.");
885             return RecoverMisfiredJobsResult.NO_OP;
886         }
887
888         for (Iterator JavaDoc misfiredTriggerIter = misfiredTriggers.iterator(); misfiredTriggerIter.hasNext();) {
889             Key triggerKey = (Key) misfiredTriggerIter.next();
890             
891             Trigger trig =
892                 retrieveTrigger(conn, triggerKey.getName(), triggerKey.getGroup());
893
894             if (trig == null) {
895                 continue;
896             }
897
898             doUpdateOfMisfiredTrigger(conn, null, trig, false, STATE_WAITING, recovering);
899         }
900
901         return new RecoverMisfiredJobsResult(
902                 hasMoreMisfiredTriggers, misfiredTriggers.size());
903     }
904
905     protected boolean updateMisfiredTrigger(Connection JavaDoc conn,
906             SchedulingContext ctxt, String JavaDoc triggerName, String JavaDoc groupName,
907             String JavaDoc newStateIfNotComplete, boolean forceState) // TODO: probably
908
// get rid of
909
// this
910
throws JobPersistenceException {
911         try {
912
913             Trigger trig = getDelegate().selectTrigger(conn, triggerName,
914                     groupName);
915
916             long misfireTime = System.currentTimeMillis();
917             if (getMisfireThreshold() > 0) {
918                 misfireTime -= getMisfireThreshold();
919             }
920
921             if (trig.getNextFireTime().getTime() > misfireTime) {
922                 return false;
923             }
924
925             doUpdateOfMisfiredTrigger(conn, ctxt, trig, forceState, newStateIfNotComplete, false);
926             
927             return true;
928
929         } catch (Exception JavaDoc e) {
930             throw new JobPersistenceException(
931                     "Couldn't update misfired trigger '" + groupName + "."
932                             + triggerName + "': " + e.getMessage(), e);
933         }
934     }
935
936     private void doUpdateOfMisfiredTrigger(Connection JavaDoc conn, SchedulingContext ctxt, Trigger trig, boolean forceState, String JavaDoc newStateIfNotComplete, boolean recovering) throws JobPersistenceException {
937             Calendar cal = null;
938             if (trig.getCalendarName() != null) {
939                 cal = retrieveCalendar(conn, ctxt, trig.getCalendarName());
940             }
941
942             signaler.notifyTriggerListenersMisfired(trig);
943
944             trig.updateAfterMisfire(cal);
945
946             if (trig.getNextFireTime() == null) {
947                 storeTrigger(conn, ctxt, trig,
948                     null, true, STATE_COMPLETE, forceState, recovering);
949             } else {
950                 storeTrigger(conn, ctxt, trig, null, true, newStateIfNotComplete,
951                         forceState, false);
952             }
953     }
954
955     /**
956      * <p>
957      * Store the given <code>{@link org.quartz.JobDetail}</code> and <code>{@link org.quartz.Trigger}</code>.
958      * </p>
959      *
960      * @param newJob
961      * The <code>JobDetail</code> to be stored.
962      * @param newTrigger
963      * The <code>Trigger</code> to be stored.
964      * @throws ObjectAlreadyExistsException
965      * if a <code>Job</code> with the same name/group already
966      * exists.
967      */

968     public void storeJobAndTrigger(final SchedulingContext ctxt, final JobDetail newJob,
969             final Trigger newTrigger)
970         throws ObjectAlreadyExistsException, JobPersistenceException {
971         executeInLock(
972             (isLockOnInsert()) ? LOCK_TRIGGER_ACCESS : null,
973             new VoidTransactionCallback() {
974                 public void execute(Connection JavaDoc conn) throws JobPersistenceException {
975                     if (newJob.isVolatile() && !newTrigger.isVolatile()) {
976                         JobPersistenceException jpe =
977                             new JobPersistenceException(
978                                 "Cannot associate non-volatile trigger with a volatile job!");
979                         jpe.setErrorCode(SchedulerException.ERR_CLIENT_ERROR);
980                         throw jpe;
981                     }
982
983                     storeJob(conn, ctxt, newJob, false);
984                     storeTrigger(conn, ctxt, newTrigger, newJob, false,
985                             Constants.STATE_WAITING, false, false);
986                 }
987             });
988     }
989     
990     /**
991      * <p>
992      * Store the given <code>{@link org.quartz.JobDetail}</code>.
993      * </p>
994      *
995      * @param newJob
996      * The <code>JobDetail</code> to be stored.
997      * @param replaceExisting
998      * If <code>true</code>, any <code>Job</code> existing in the
999      * <code>JobStore</code> with the same name & group should be
1000     * over-written.
1001     * @throws ObjectAlreadyExistsException
1002     * if a <code>Job</code> with the same name/group already
1003     * exists, and replaceExisting is set to false.
1004     */

1005    public void storeJob(final SchedulingContext ctxt, final JobDetail newJob,
1006        final boolean replaceExisting) throws ObjectAlreadyExistsException, JobPersistenceException {
1007        executeInLock(
1008            (isLockOnInsert() || replaceExisting) ? LOCK_TRIGGER_ACCESS : null,
1009            new VoidTransactionCallback() {
1010                public void execute(Connection JavaDoc conn) throws JobPersistenceException {
1011                    storeJob(conn, ctxt, newJob, replaceExisting);
1012                }
1013            });
1014    }
1015    
1016    /**
1017     * <p>
1018     * Insert or update a job.
1019     * </p>
1020     */

1021    protected void storeJob(Connection JavaDoc conn, SchedulingContext ctxt,
1022            JobDetail newJob, boolean replaceExisting)
1023        throws ObjectAlreadyExistsException, JobPersistenceException {
1024        if (newJob.isVolatile() && isClustered()) {
1025            getLog().info(
1026                "note: volatile jobs are effectively non-volatile in a clustered environment.");
1027        }
1028
1029        boolean existingJob = jobExists(conn, newJob.getName(), newJob
1030                .getGroup());
1031        try {
1032            if (existingJob) {
1033                if (!replaceExisting) {
1034                    throw new ObjectAlreadyExistsException(newJob);
1035                }
1036                getDelegate().updateJobDetail(conn, newJob);
1037            } else {
1038                getDelegate().insertJobDetail(conn, newJob);
1039            }
1040        } catch (IOException JavaDoc e) {
1041            throw new JobPersistenceException("Couldn't store job: "
1042                    + e.getMessage(), e);
1043        } catch (SQLException JavaDoc e) {
1044            throw new JobPersistenceException("Couldn't store job: "
1045                    + e.getMessage(), e);
1046        }
1047    }
1048
1049    /**
1050     * <p>
1051     * Check existence of a given job.
1052     * </p>
1053     */

1054    protected boolean jobExists(Connection JavaDoc conn, String JavaDoc jobName,
1055            String JavaDoc groupName) throws JobPersistenceException {
1056        try {
1057            return getDelegate().jobExists(conn, jobName, groupName);
1058        } catch (SQLException JavaDoc e) {
1059            throw new JobPersistenceException(
1060                    "Couldn't determine job existence (" + groupName + "."
1061                            + jobName + "): " + e.getMessage(), e);
1062        }
1063    }
1064
1065
1066    /**
1067     * <p>
1068     * Store the given <code>{@link org.quartz.Trigger}</code>.
1069     * </p>
1070     *
1071     * @param newTrigger
1072     * The <code>Trigger</code> to be stored.
1073     * @param replaceExisting
1074     * If <code>true</code>, any <code>Trigger</code> existing in
1075     * the <code>JobStore</code> with the same name & group should
1076     * be over-written.
1077     * @throws ObjectAlreadyExistsException
1078     * if a <code>Trigger</code> with the same name/group already
1079     * exists, and replaceExisting is set to false.
1080     */

1081    public void storeTrigger(final SchedulingContext ctxt, final Trigger newTrigger,
1082        final boolean replaceExisting) throws ObjectAlreadyExistsException,
1083            JobPersistenceException {
1084        executeInLock(
1085            (isLockOnInsert() || replaceExisting) ? LOCK_TRIGGER_ACCESS : null,
1086            new VoidTransactionCallback() {
1087                public void execute(Connection JavaDoc conn) throws JobPersistenceException {
1088                    storeTrigger(conn, ctxt, newTrigger, null, replaceExisting,
1089                        STATE_WAITING, false, false);
1090                }
1091            });
1092    }
1093    
1094    /**
1095     * <p>
1096     * Insert or update a trigger.
1097     * </p>
1098     */

1099    protected void storeTrigger(Connection JavaDoc conn, SchedulingContext ctxt,
1100            Trigger newTrigger, JobDetail job, boolean replaceExisting, String JavaDoc state,
1101            boolean forceState, boolean recovering)
1102        throws ObjectAlreadyExistsException, JobPersistenceException {
1103        if (newTrigger.isVolatile() && isClustered()) {
1104            getLog().info(
1105                "note: volatile triggers are effectively non-volatile in a clustered environment.");
1106        }
1107
1108        boolean existingTrigger = triggerExists(conn, newTrigger.getName(),
1109                newTrigger.getGroup());
1110
1111        if ((existingTrigger) && (!replaceExisting)) {
1112            throw new ObjectAlreadyExistsException(newTrigger);
1113        }
1114        
1115        try {
1116
1117            boolean shouldBepaused = false;
1118
1119            if (!forceState) {
1120                shouldBepaused = getDelegate().isTriggerGroupPaused(
1121                        conn, newTrigger.getGroup());
1122
1123                if(!shouldBepaused) {
1124                    shouldBepaused = getDelegate().isTriggerGroupPaused(conn,
1125                            ALL_GROUPS_PAUSED);
1126
1127                    if (shouldBepaused) {
1128                        getDelegate().insertPausedTriggerGroup(conn, newTrigger.getGroup());
1129                    }
1130                }
1131
1132                if (shouldBepaused && (state.equals(STATE_WAITING) || state.equals(STATE_ACQUIRED))) {
1133                    state = STATE_PAUSED;
1134                }
1135            }
1136
1137            if(job == null) {
1138                job = getDelegate().selectJobDetail(conn,
1139                    newTrigger.getJobName(), newTrigger.getJobGroup(),
1140                    getClassLoadHelper());
1141            }
1142            if (job == null) {
1143                throw new JobPersistenceException("The job ("
1144                        + newTrigger.getFullJobName()
1145                        + ") referenced by the trigger does not exist.");
1146            }
1147            if (job.isVolatile() && !newTrigger.isVolatile()) {
1148                throw new JobPersistenceException(
1149                        "It does not make sense to "
1150                                + "associate a non-volatile Trigger with a volatile Job!");
1151            }
1152
1153            if (job.isStateful() && !recovering) {
1154                state = checkBlockedState(conn, ctxt, job.getName(),
1155                        job.getGroup(), state);
1156            }
1157            
1158            if (existingTrigger) {
1159                if (newTrigger.getClass() == SimpleTrigger.class) {
1160                    getDelegate().updateSimpleTrigger(conn,
1161                            (SimpleTrigger) newTrigger);
1162                } else if (newTrigger.getClass() == CronTrigger.class) {
1163                    getDelegate().updateCronTrigger(conn,
1164                            (CronTrigger) newTrigger);
1165                } else {
1166                    getDelegate().updateBlobTrigger(conn, newTrigger);
1167                }
1168                getDelegate().updateTrigger(conn, newTrigger, state, job);
1169            } else {
1170                getDelegate().insertTrigger(conn, newTrigger, state, job);
1171                if (newTrigger.getClass() == SimpleTrigger.class) {
1172                    getDelegate().insertSimpleTrigger(conn,
1173                            (SimpleTrigger) newTrigger);
1174                } else if (newTrigger.getClass() == CronTrigger.class) {
1175                    getDelegate().insertCronTrigger(conn,
1176                            (CronTrigger) newTrigger);
1177                } else {
1178                    getDelegate().insertBlobTrigger(conn, newTrigger);
1179                }
1180            }
1181        } catch (Exception JavaDoc e) {
1182            throw new JobPersistenceException("Couldn't store trigger: "
1183                    + e.getMessage(), e);
1184        }
1185    }
1186
1187    /**
1188     * <p>
1189     * Check existence of a given trigger.
1190     * </p>
1191     */

1192    protected boolean triggerExists(Connection JavaDoc conn, String JavaDoc triggerName,
1193            String JavaDoc groupName) throws JobPersistenceException {
1194        try {
1195            return getDelegate().triggerExists(conn, triggerName, groupName);
1196        } catch (SQLException JavaDoc e) {
1197            throw new JobPersistenceException(
1198                    "Couldn't determine trigger existence (" + groupName + "."
1199                            + triggerName + "): " + e.getMessage(), e);
1200        }
1201    }
1202
1203    /**
1204     * <p>
1205     * Remove (delete) the <code>{@link org.quartz.Job}</code> with the given
1206     * name, and any <code>{@link org.quartz.Trigger}</code> s that reference
1207     * it.
1208     * </p>
1209     *
1210     * <p>
1211     * If removal of the <code>Job</code> results in an empty group, the
1212     * group should be removed from the <code>JobStore</code>'s list of
1213     * known group names.
1214     * </p>
1215     *
1216     * @param jobName
1217     * The name of the <code>Job</code> to be removed.
1218     * @param groupName
1219     * The group name of the <code>Job</code> to be removed.
1220     * @return <code>true</code> if a <code>Job</code> with the given name &
1221     * group was found and removed from the store.
1222     */

1223    public boolean removeJob(final SchedulingContext ctxt, final String JavaDoc jobName,
1224        final String JavaDoc groupName) throws JobPersistenceException {
1225        return ((Boolean JavaDoc)executeInLock(
1226                LOCK_TRIGGER_ACCESS,
1227                new TransactionCallback() {
1228                    public Object JavaDoc execute(Connection JavaDoc conn) throws JobPersistenceException {
1229                        return removeJob(conn, ctxt, jobName, groupName, true) ?
1230                                Boolean.TRUE : Boolean.FALSE;
1231                    }
1232                })).booleanValue();
1233    }
1234    
1235    protected boolean removeJob(Connection JavaDoc conn, SchedulingContext ctxt,
1236            String JavaDoc jobName, String JavaDoc groupName, boolean activeDeleteSafe)
1237        throws JobPersistenceException {
1238
1239        try {
1240            Key[] jobTriggers = getDelegate().selectTriggerNamesForJob(conn,
1241                    jobName, groupName);
1242            for (int i = 0; i < jobTriggers.length; ++i) {
1243                deleteTriggerAndChildren(
1244                    conn, jobTriggers[i].getName(), jobTriggers[i].getGroup());
1245            }
1246
1247            return deleteJobAndChildren(conn, ctxt, jobName, groupName);
1248        } catch (SQLException JavaDoc e) {
1249            throw new JobPersistenceException("Couldn't remove job: "
1250                    + e.getMessage(), e);
1251        }
1252    }
1253
1254    /**
1255     * Delete a job and its listeners.
1256     *
1257     * @see #removeJob(Connection, SchedulingContext, String, String, boolean)
1258     * @see #removeTrigger(Connection, SchedulingContext, String, String)
1259     */

1260    private boolean deleteJobAndChildren(Connection JavaDoc conn,
1261            SchedulingContext ctxt, String JavaDoc jobName, String JavaDoc groupName)
1262        throws NoSuchDelegateException, SQLException JavaDoc {
1263        getDelegate().deleteJobListeners(conn, jobName, groupName);
1264
1265        return (getDelegate().deleteJobDetail(conn, jobName, groupName) > 0);
1266    }
1267    
1268    /**
1269     * Delete a trigger, its listeners, and its Simple/Cron/BLOB sub-table entry.
1270     *
1271     * @see #removeJob(Connection, SchedulingContext, String, String, boolean)
1272     * @see #removeTrigger(Connection, SchedulingContext, String, String)
1273     * @see #replaceTrigger(Connection, SchedulingContext, String, String, Trigger)
1274     */

1275    private boolean deleteTriggerAndChildren(
1276            Connection JavaDoc conn, String JavaDoc triggerName, String JavaDoc triggerGroupName)
1277        throws SQLException JavaDoc, NoSuchDelegateException {
1278        DriverDelegate delegate = getDelegate();
1279
1280        // Once it succeeds in deleting one sub-table entry it will not try the others.
1281
if ((delegate.deleteSimpleTrigger(conn, triggerName, triggerGroupName) == 0) &&
1282            (delegate.deleteCronTrigger(conn, triggerName, triggerGroupName) == 0)) {
1283            delegate.deleteBlobTrigger(conn, triggerName, triggerGroupName);
1284        }
1285        
1286        delegate.deleteTriggerListeners(conn, triggerName, triggerGroupName);
1287        
1288        return (delegate.deleteTrigger(conn, triggerName, triggerGroupName) > 0);
1289    }
1290    
1291    /**
1292     * <p>
1293     * Retrieve the <code>{@link org.quartz.JobDetail}</code> for the given
1294     * <code>{@link org.quartz.Job}</code>.
1295     * </p>
1296     *
1297     * @param jobName
1298     * The name of the <code>Job</code> to be retrieved.
1299     * @param groupName
1300     * The group name of the <code>Job</code> to be retrieved.
1301     * @return The desired <code>Job</code>, or null if there is no match.
1302     */

1303    public JobDetail retrieveJob(final SchedulingContext ctxt, final String JavaDoc jobName,
1304            final String JavaDoc groupName) throws JobPersistenceException {
1305        return (JobDetail)executeWithoutLock( // no locks necessary for read...
1306
new TransactionCallback() {
1307                public Object JavaDoc execute(Connection JavaDoc conn) throws JobPersistenceException {
1308                    return retrieveJob(conn, ctxt, jobName, groupName);
1309                }
1310            });
1311    }
1312    
1313    protected JobDetail retrieveJob(Connection JavaDoc conn, SchedulingContext ctxt,
1314            String JavaDoc jobName, String JavaDoc groupName) throws JobPersistenceException {
1315        try {
1316            JobDetail job = getDelegate().selectJobDetail(conn, jobName,
1317                    groupName, getClassLoadHelper());
1318            if (job != null) {
1319                String JavaDoc[] listeners = getDelegate().selectJobListeners(conn,
1320                        jobName, groupName);
1321                for (int i = 0; i < listeners.length; ++i) {
1322                    job.addJobListener(listeners[i]);
1323                }
1324            }
1325
1326            return job;
1327        } catch (ClassNotFoundException JavaDoc e) {
1328            throw new JobPersistenceException(
1329                    "Couldn't retrieve job because a required class was not found: "
1330                            + e.getMessage(), e,
1331                    SchedulerException.ERR_PERSISTENCE_JOB_DOES_NOT_EXIST);
1332        } catch (IOException JavaDoc e) {
1333            throw new JobPersistenceException(
1334                    "Couldn't retrieve job because the BLOB couldn't be deserialized: "
1335                            + e.getMessage(), e,
1336                    SchedulerException.ERR_PERSISTENCE_JOB_DOES_NOT_EXIST);
1337        } catch (SQLException JavaDoc e) {
1338            throw new JobPersistenceException("Couldn't retrieve job: "
1339                    + e.getMessage(), e);
1340        }
1341    }
1342
1343    /**
1344     * <p>
1345     * Remove (delete) the <code>{@link org.quartz.Trigger}</code> with the
1346     * given name.
1347     * </p>
1348     *
1349     * <p>
1350     * If removal of the <code>Trigger</code> results in an empty group, the
1351     * group should be removed from the <code>JobStore</code>'s list of
1352     * known group names.
1353     * </p>
1354     *
1355     * <p>
1356     * If removal of the <code>Trigger</code> results in an 'orphaned' <code>Job</code>
1357     * that is not 'durable', then the <code>Job</code> should be deleted
1358     * also.
1359     * </p>
1360     *
1361     * @param triggerName
1362     * The name of the <code>Trigger</code> to be removed.
1363     * @param groupName
1364     * The group name of the <code>Trigger</code> to be removed.
1365     * @return <code>true</code> if a <code>Trigger</code> with the given
1366     * name & group was found and removed from the store.
1367     */

1368    public boolean removeTrigger(final SchedulingContext ctxt, final String JavaDoc triggerName,
1369        final String JavaDoc groupName) throws JobPersistenceException {
1370        return ((Boolean JavaDoc)executeInLock(
1371                LOCK_TRIGGER_ACCESS,
1372                new TransactionCallback() {
1373                    public Object JavaDoc execute(Connection JavaDoc conn) throws JobPersistenceException {
1374                        return removeTrigger(conn, ctxt, triggerName, groupName) ?
1375                                Boolean.TRUE : Boolean.FALSE;
1376                    }
1377                })).booleanValue();
1378    }
1379    
1380    protected boolean removeTrigger(Connection JavaDoc conn, SchedulingContext ctxt,
1381            String JavaDoc triggerName, String JavaDoc groupName)
1382        throws JobPersistenceException {
1383        boolean removedTrigger = false;
1384        try {
1385            // this must be called before we delete the trigger, obviously
1386
JobDetail job = getDelegate().selectJobForTrigger(conn,
1387                    triggerName, groupName, getClassLoadHelper());
1388
1389            removedTrigger =
1390                deleteTriggerAndChildren(conn, triggerName, groupName);
1391
1392            if (null != job && !job.isDurable()) {
1393                int numTriggers = getDelegate().selectNumTriggersForJob(conn,
1394                        job.getName(), job.getGroup());
1395                if (numTriggers == 0) {
1396                    // Don't call removeJob() because we don't want to check for
1397
// triggers again.
1398
deleteJobAndChildren(conn, ctxt, job.getName(), job.getGroup());
1399                }
1400            }
1401        } catch (ClassNotFoundException JavaDoc e) {
1402            throw new JobPersistenceException("Couldn't remove trigger: "
1403                    + e.getMessage(), e);
1404        } catch (SQLException JavaDoc e) {
1405            throw new JobPersistenceException("Couldn't remove trigger: "
1406                    + e.getMessage(), e);
1407        }
1408
1409        return removedTrigger;
1410    }
1411
1412    /**
1413     * @see org.quartz.spi.JobStore#replaceTrigger(org.quartz.core.SchedulingContext, java.lang.String, java.lang.String, org.quartz.Trigger)
1414     */

1415    public boolean replaceTrigger(final SchedulingContext ctxt, final String JavaDoc triggerName,
1416            final String JavaDoc groupName, final Trigger newTrigger) throws JobPersistenceException {
1417        return ((Boolean JavaDoc)executeInLock(
1418                LOCK_TRIGGER_ACCESS,
1419                new TransactionCallback() {
1420                    public Object JavaDoc execute(Connection JavaDoc conn) throws JobPersistenceException {
1421                        return replaceTrigger(conn, ctxt, triggerName, groupName, newTrigger) ?
1422                                Boolean.TRUE : Boolean.FALSE;
1423                    }
1424                })).booleanValue();
1425    }
1426    
1427    protected boolean replaceTrigger(Connection JavaDoc conn, SchedulingContext ctxt,
1428            String JavaDoc triggerName, String JavaDoc groupName, Trigger newTrigger)
1429        throws JobPersistenceException {
1430        try {
1431            // this must be called before we delete the trigger, obviously
1432
JobDetail job = getDelegate().selectJobForTrigger(conn,
1433                    triggerName, groupName, getClassLoadHelper());
1434
1435            if (job == null) {
1436                return false;
1437            }
1438            
1439            if (!newTrigger.getJobName().equals(job.getName()) ||
1440                !newTrigger.getJobGroup().equals(job.getGroup())) {
1441                throw new JobPersistenceException("New trigger is not related to the same job as the old trigger.");
1442            }
1443            
1444            boolean removedTrigger =
1445                deleteTriggerAndChildren(conn, triggerName, groupName);
1446            
1447            storeTrigger(conn, ctxt, newTrigger, job, false, STATE_WAITING, false, false);
1448
1449            return removedTrigger;
1450        } catch (ClassNotFoundException JavaDoc e) {
1451            throw new JobPersistenceException("Couldn't remove trigger: "
1452                    + e.getMessage(), e);
1453        } catch (SQLException JavaDoc e) {
1454            throw new JobPersistenceException("Couldn't remove trigger: "
1455                    + e.getMessage(), e);
1456        }
1457    }
1458
1459    /**
1460     * <p>
1461     * Retrieve the given <code>{@link org.quartz.Trigger}</code>.
1462     * </p>
1463     *
1464     * @param triggerName
1465     * The name of the <code>Trigger</code> to be retrieved.
1466     * @param groupName
1467     * The group name of the <code>Trigger</code> to be retrieved.
1468     * @return The desired <code>Trigger</code>, or null if there is no
1469     * match.
1470     */

1471    public Trigger retrieveTrigger(final SchedulingContext ctxt, final String JavaDoc triggerName,
1472        final String JavaDoc groupName) throws JobPersistenceException {
1473        return (Trigger)executeWithoutLock( // no locks necessary for read...
1474
new TransactionCallback() {
1475                public Object JavaDoc execute(Connection JavaDoc conn) throws JobPersistenceException {
1476                    return retrieveTrigger(conn, ctxt, triggerName, groupName);
1477                }
1478            });
1479    }
1480    
1481    protected Trigger retrieveTrigger(Connection JavaDoc conn, SchedulingContext ctxt,
1482            String JavaDoc triggerName, String JavaDoc groupName)
1483        throws JobPersistenceException {
1484        return retrieveTrigger(conn, triggerName, groupName);
1485    }
1486    
1487    protected Trigger retrieveTrigger(Connection JavaDoc conn, String JavaDoc triggerName, String JavaDoc groupName)
1488        throws JobPersistenceException {
1489        try {
1490            Trigger trigger = getDelegate().selectTrigger(conn, triggerName,
1491                    groupName);
1492            if (trigger == null) {
1493                return null;
1494            }
1495            
1496            // In case Trigger was BLOB, clear out any listeners that might
1497
// have been serialized.
1498
trigger.clearAllTriggerListeners();
1499            
1500            String JavaDoc[] listeners = getDelegate().selectTriggerListeners(conn,
1501                    triggerName, groupName);
1502            for (int i = 0; i < listeners.length; ++i) {
1503                trigger.addTriggerListener(listeners[i]);
1504            }
1505
1506            return trigger;
1507        } catch (Exception JavaDoc e) {
1508            throw new JobPersistenceException("Couldn't retrieve trigger: "
1509                    + e.getMessage(), e);
1510        }
1511    }
1512
1513    /**
1514     * <p>
1515     * Get the current state of the identified <code>{@link Trigger}</code>.
1516     * </p>
1517     *
1518     * @see Trigger#STATE_NORMAL
1519     * @see Trigger#STATE_PAUSED
1520     * @see Trigger#STATE_COMPLETE
1521     * @see Trigger#STATE_ERROR
1522     * @see Trigger#STATE_NONE
1523     */

1524    public int getTriggerState(final SchedulingContext ctxt, final String JavaDoc triggerName,
1525            final String JavaDoc groupName) throws JobPersistenceException {
1526        return ((Integer JavaDoc)executeWithoutLock( // no locks necessary for read...
1527
new TransactionCallback() {
1528                    public Object JavaDoc execute(Connection JavaDoc conn) throws JobPersistenceException {
1529                        return new Integer JavaDoc(getTriggerState(conn, ctxt, triggerName, groupName));
1530                    }
1531                })).intValue();
1532    }
1533    
1534    public int getTriggerState(Connection JavaDoc conn, SchedulingContext ctxt,
1535            String JavaDoc triggerName, String JavaDoc groupName)
1536        throws JobPersistenceException {
1537        try {
1538            String JavaDoc ts = getDelegate().selectTriggerState(conn, triggerName,
1539                    groupName);
1540
1541            if (ts == null) {
1542                return Trigger.STATE_NONE;
1543            }
1544
1545            if (ts.equals(STATE_DELETED)) {
1546                return Trigger.STATE_NONE;
1547            }
1548
1549            if (ts.equals(STATE_COMPLETE)) {
1550                return Trigger.STATE_COMPLETE;
1551            }
1552
1553            if (ts.equals(STATE_PAUSED)) {
1554                return Trigger.STATE_PAUSED;
1555            }
1556
1557            if (ts.equals(STATE_PAUSED_BLOCKED)) {
1558                return Trigger.STATE_PAUSED;
1559            }
1560
1561            if (ts.equals(STATE_ERROR)) {
1562                return Trigger.STATE_ERROR;
1563            }
1564
1565            if (ts.equals(STATE_BLOCKED)) {
1566                return Trigger.STATE_BLOCKED;
1567            }
1568
1569            return Trigger.STATE_NORMAL;
1570
1571        } catch (SQLException JavaDoc e) {
1572            throw new JobPersistenceException(
1573                    "Couldn't determine state of trigger (" + groupName + "."
1574                            + triggerName + "): " + e.getMessage(), e);
1575        }
1576    }
1577
1578    /**
1579     * <p>
1580     * Store the given <code>{@link org.quartz.Calendar}</code>.
1581     * </p>
1582     *
1583     * @param calName
1584     * The name of the calendar.
1585     * @param calendar
1586     * The <code>Calendar</code> to be stored.
1587     * @param replaceExisting
1588     * If <code>true</code>, any <code>Calendar</code> existing
1589     * in the <code>JobStore</code> with the same name & group
1590     * should be over-written.
1591     * @throws ObjectAlreadyExistsException
1592     * if a <code>Calendar</code> with the same name already
1593     * exists, and replaceExisting is set to false.
1594     */

1595    public void storeCalendar(final SchedulingContext ctxt, final String JavaDoc calName,
1596        final Calendar calendar, final boolean replaceExisting, final boolean updateTriggers)
1597        throws ObjectAlreadyExistsException, JobPersistenceException {
1598        executeInLock(
1599            (isLockOnInsert() || updateTriggers) ? LOCK_TRIGGER_ACCESS : null,
1600            new VoidTransactionCallback() {
1601                public void execute(Connection JavaDoc conn) throws JobPersistenceException {
1602                    storeCalendar(conn, ctxt, calName, calendar, replaceExisting, updateTriggers);
1603                }
1604            });
1605    }
1606    
1607    protected void storeCalendar(Connection JavaDoc conn, SchedulingContext ctxt,
1608            String JavaDoc calName, Calendar calendar, boolean replaceExisting, boolean updateTriggers)
1609        throws ObjectAlreadyExistsException, JobPersistenceException {
1610        try {
1611            boolean existingCal = calendarExists(conn, calName);
1612            if (existingCal && !replaceExisting) {
1613                throw new ObjectAlreadyExistsException(
1614                    "Calendar with name '" + calName + "' already exists.");
1615            }
1616
1617            if (existingCal) {
1618                if (getDelegate().updateCalendar(conn, calName, calendar) < 1) {
1619                    throw new JobPersistenceException(
1620                        "Couldn't store calendar. Update failed.");
1621                }
1622                
1623                if(updateTriggers) {
1624                    Trigger[] trigs = getDelegate().selectTriggersForCalendar(conn, calName);
1625                    
1626                    for(int i=0; i < trigs.length; i++) {
1627                        trigs[i].updateWithNewCalendar(calendar, getMisfireThreshold());
1628                        storeTrigger(conn, ctxt, trigs[i], null, true, STATE_WAITING, false, false);
1629                    }
1630                }
1631            } else {
1632                if (getDelegate().insertCalendar(conn, calName, calendar) < 1) {
1633                    throw new JobPersistenceException(
1634                        "Couldn't store calendar. Insert failed.");
1635                }
1636            }
1637
1638            if (isClustered == false) {
1639                calendarCache.put(calName, calendar); // lazy-cache
1640
}
1641
1642        } catch (IOException JavaDoc e) {
1643            throw new JobPersistenceException(
1644                    "Couldn't store calendar because the BLOB couldn't be serialized: "
1645                            + e.getMessage(), e);
1646        } catch (ClassNotFoundException JavaDoc e) {
1647            throw new JobPersistenceException("Couldn't store calendar: "
1648                    + e.getMessage(), e);
1649        }catch (SQLException JavaDoc e) {
1650            throw new JobPersistenceException("Couldn't store calendar: "
1651                    + e.getMessage(), e);
1652        }
1653    }
1654    
1655    protected boolean calendarExists(Connection JavaDoc conn, String JavaDoc calName)
1656        throws JobPersistenceException {
1657        try {
1658            return getDelegate().calendarExists(conn, calName);
1659        } catch (SQLException JavaDoc e) {
1660            throw new JobPersistenceException(
1661                    "Couldn't determine calendar existence (" + calName + "): "
1662                            + e.getMessage(), e);
1663        }
1664    }
1665
1666    /**
1667     * <p>
1668     * Remove (delete) the <code>{@link org.quartz.Calendar}</code> with the
1669     * given name.
1670     * </p>
1671     *
1672     * <p>
1673     * If removal of the <code>Calendar</code> would result in
1674     * <code.Trigger</code>s pointing to non-existent calendars, then a
1675     * <code>JobPersistenceException</code> will be thrown.</p>
1676     * *
1677     * @param calName The name of the <code>Calendar</code> to be removed.
1678     * @return <code>true</code> if a <code>Calendar</code> with the given name
1679     * was found and removed from the store.
1680     */

1681    public boolean removeCalendar(final SchedulingContext ctxt, final String JavaDoc calName)
1682        throws JobPersistenceException {
1683        return ((Boolean JavaDoc)executeInLock(
1684                LOCK_TRIGGER_ACCESS,
1685                new TransactionCallback() {
1686                    public Object JavaDoc execute(Connection JavaDoc conn) throws JobPersistenceException {
1687                        return removeCalendar(conn, ctxt, calName) ?
1688                                Boolean.TRUE : Boolean.FALSE;
1689                    }
1690                })).booleanValue();
1691    }
1692    
1693    protected boolean removeCalendar(Connection JavaDoc conn, SchedulingContext ctxt,
1694            String JavaDoc calName) throws JobPersistenceException {
1695        try {
1696            if (getDelegate().calendarIsReferenced(conn, calName)) {
1697                throw new JobPersistenceException(
1698                    "Calender cannot be removed if it referenced by a trigger!");
1699            }
1700
1701            if (isClustered == false) {
1702                calendarCache.remove(calName);
1703            }
1704
1705            return (getDelegate().deleteCalendar(conn, calName) > 0);
1706        } catch (SQLException JavaDoc e) {
1707            throw new JobPersistenceException("Couldn't remove calendar: "
1708                    + e.getMessage(), e);
1709        }
1710    }
1711
1712    /**
1713     * <p>
1714     * Retrieve the given <code>{@link org.quartz.Trigger}</code>.
1715     * </p>
1716     *
1717     * @param calName
1718     * The name of the <code>Calendar</code> to be retrieved.
1719     * @return The desired <code>Calendar</code>, or null if there is no
1720     * match.
1721     */

1722    public Calendar retrieveCalendar(final SchedulingContext ctxt, final String JavaDoc calName)
1723        throws JobPersistenceException {
1724        return (Calendar)executeWithoutLock( // no locks necessary for read...
1725
new TransactionCallback() {
1726                public Object JavaDoc execute(Connection JavaDoc conn) throws JobPersistenceException {
1727                    return retrieveCalendar(conn, ctxt, calName);
1728                }
1729            });
1730    }
1731    
1732    protected Calendar retrieveCalendar(Connection JavaDoc conn,
1733            SchedulingContext ctxt, String JavaDoc calName)
1734        throws JobPersistenceException {
1735        // all calendars are persistent, but we can lazy-cache them during run
1736
// time as long as we aren't running clustered.
1737
Calendar cal = (isClustered) ? null : (Calendar) calendarCache.get(calName);
1738        if (cal != null) {
1739            return cal;
1740        }
1741
1742        try {
1743            cal = getDelegate().selectCalendar(conn, calName);
1744            if (isClustered == false) {
1745                calendarCache.put(calName, cal); // lazy-cache...
1746
}
1747            return cal;
1748        } catch (ClassNotFoundException JavaDoc e) {
1749            throw new JobPersistenceException(
1750                    "Couldn't retrieve calendar because a required class was not found: "
1751                            + e.getMessage(), e);
1752        } catch (IOException JavaDoc e) {
1753            throw new JobPersistenceException(
1754                    "Couldn't retrieve calendar because the BLOB couldn't be deserialized: "
1755                            + e.getMessage(), e);
1756        } catch (SQLException JavaDoc e) {
1757            throw new JobPersistenceException("Couldn't retrieve calendar: "
1758                    + e.getMessage(), e);
1759        }
1760    }
1761
1762    /**
1763     * <p>
1764     * Get the number of <code>{@link org.quartz.Job}</code> s that are
1765     * stored in the <code>JobStore</code>.
1766     * </p>
1767     */

1768    public int getNumberOfJobs(final SchedulingContext ctxt)
1769        throws JobPersistenceException {
1770        return ((Integer JavaDoc)executeWithoutLock( // no locks necessary for read...
1771
new TransactionCallback() {
1772                    public Object JavaDoc execute(Connection JavaDoc conn) throws JobPersistenceException {
1773                        return new Integer JavaDoc(getNumberOfJobs(conn, ctxt));
1774                    }
1775                })).intValue();
1776    }
1777    
1778    protected int getNumberOfJobs(Connection JavaDoc conn, SchedulingContext ctxt)
1779        throws JobPersistenceException {
1780        try {
1781            return getDelegate().selectNumJobs(conn);
1782        } catch (SQLException JavaDoc e) {
1783            throw new JobPersistenceException(
1784                    "Couldn't obtain number of jobs: " + e.getMessage(), e);
1785        }
1786    }
1787
1788    /**
1789     * <p>
1790     * Get the number of <code>{@link org.quartz.Trigger}</code> s that are
1791     * stored in the <code>JobsStore</code>.
1792     * </p>
1793     */

1794    public int getNumberOfTriggers(final SchedulingContext ctxt)
1795        throws JobPersistenceException {
1796        return ((Integer JavaDoc)executeWithoutLock( // no locks necessary for read...
1797
new TransactionCallback() {
1798                    public Object JavaDoc execute(Connection JavaDoc conn) throws JobPersistenceException {
1799                        return new Integer JavaDoc(getNumberOfTriggers(conn, ctxt));
1800                    }
1801                })).intValue();
1802    }
1803    
1804    protected int getNumberOfTriggers(Connection JavaDoc conn, SchedulingContext ctxt)
1805        throws JobPersistenceException {
1806        try {
1807            return getDelegate().selectNumTriggers(conn);
1808        } catch (SQLException JavaDoc e) {
1809            throw new JobPersistenceException(
1810                    "Couldn't obtain number of triggers: " + e.getMessage(), e);
1811        }
1812    }
1813
1814    /**
1815     * <p>
1816     * Get the number of <code>{@link org.quartz.Calendar}</code> s that are
1817     * stored in the <code>JobsStore</code>.
1818     * </p>
1819     */

1820    public int getNumberOfCalendars(final SchedulingContext ctxt)
1821        throws JobPersistenceException {
1822        return ((Integer JavaDoc)executeWithoutLock( // no locks necessary for read...
1823
new TransactionCallback() {
1824                    public Object JavaDoc execute(Connection JavaDoc conn) throws JobPersistenceException {
1825                        return new Integer JavaDoc(getNumberOfCalendars(conn, ctxt));
1826                    }
1827                })).intValue();
1828    }
1829    
1830    protected int getNumberOfCalendars(Connection JavaDoc conn, SchedulingContext ctxt)
1831        throws JobPersistenceException {
1832        try {
1833            return getDelegate().selectNumCalendars(conn);
1834        } catch (SQLException JavaDoc e) {
1835            throw new JobPersistenceException(
1836                    "Couldn't obtain number of calendars: " + e.getMessage(), e);
1837        }
1838    }
1839
1840    /**
1841     * <p>
1842     * Get the names of all of the <code>{@link org.quartz.Job}</code> s that
1843     * have the given group name.
1844     * </p>
1845     *
1846     * <p>
1847     * If there are no jobs in the given group name, the result should be a
1848     * zero-length array (not <code>null</code>).
1849     * </p>
1850     */

1851    public String JavaDoc[] getJobNames(final SchedulingContext ctxt, final String JavaDoc groupName)
1852        throws JobPersistenceException {
1853        return (String JavaDoc[])executeWithoutLock( // no locks necessary for read...
1854
new TransactionCallback() {
1855                public Object JavaDoc execute(Connection JavaDoc conn) throws JobPersistenceException {
1856                    return getJobNames(conn, ctxt, groupName);
1857                }
1858            });
1859    }
1860    
1861    protected String JavaDoc[] getJobNames(Connection JavaDoc conn, SchedulingContext ctxt,
1862            String JavaDoc groupName) throws JobPersistenceException {
1863        String JavaDoc[] jobNames = null;
1864
1865        try {
1866            jobNames = getDelegate().selectJobsInGroup(conn, groupName);
1867        } catch (SQLException JavaDoc e) {
1868            throw new JobPersistenceException("Couldn't obtain job names: "
1869                    + e.getMessage(), e);
1870        }
1871
1872        return jobNames;
1873    }
1874
1875    /**
1876     * <p>
1877     * Get the names of all of the <code>{@link org.quartz.Trigger}</code> s
1878     * that have the given group name.
1879     * </p>
1880     *
1881     * <p>
1882     * If there are no triggers in the given group name, the result should be a
1883     * zero-length array (not <code>null</code>).
1884     * </p>
1885     */

1886    public String JavaDoc[] getTriggerNames(final SchedulingContext ctxt, final String JavaDoc groupName)
1887        throws JobPersistenceException {
1888        return (String JavaDoc[])executeWithoutLock( // no locks necessary for read...
1889
new TransactionCallback() {
1890                public Object JavaDoc execute(Connection JavaDoc conn) throws JobPersistenceException {
1891                    return getTriggerNames(conn, ctxt, groupName);
1892                }
1893            });
1894    }
1895    
1896    protected String JavaDoc[] getTriggerNames(Connection JavaDoc conn, SchedulingContext ctxt,
1897            String JavaDoc groupName) throws JobPersistenceException {
1898
1899        String JavaDoc[] trigNames = null;
1900
1901        try {
1902            trigNames = getDelegate().selectTriggersInGroup(conn, groupName);
1903        } catch (SQLException JavaDoc e) {
1904            throw new JobPersistenceException("Couldn't obtain trigger names: "
1905                    + e.getMessage(), e);
1906        }
1907
1908        return trigNames;
1909    }
1910
1911
1912    /**
1913     * <p>
1914     * Get the names of all of the <code>{@link org.quartz.Job}</code>
1915     * groups.
1916     * </p>
1917     *
1918     * <p>
1919     * If there are no known group names, the result should be a zero-length
1920     * array (not <code>null</code>).
1921     * </p>
1922     */

1923    public String JavaDoc[] getJobGroupNames(final SchedulingContext ctxt)
1924        throws JobPersistenceException {
1925        return (String JavaDoc[])executeWithoutLock( // no locks necessary for read...
1926
new TransactionCallback() {
1927                public Object JavaDoc execute(Connection JavaDoc conn) throws JobPersistenceException {
1928                    return getJobGroupNames(conn, ctxt);
1929                }
1930            });
1931    }
1932    
1933    protected String JavaDoc[] getJobGroupNames(Connection JavaDoc conn, SchedulingContext ctxt)
1934        throws JobPersistenceException {
1935
1936        String JavaDoc[] groupNames = null;
1937
1938        try {
1939            groupNames = getDelegate().selectJobGroups(conn);
1940        } catch (SQLException JavaDoc e) {
1941            throw new JobPersistenceException("Couldn't obtain job groups: "
1942                    + e.getMessage(), e);
1943        }
1944
1945        return groupNames;
1946    }
1947
1948    /**
1949     * <p>
1950     * Get the names of all of the <code>{@link org.quartz.Trigger}</code>
1951     * groups.
1952     * </p>
1953     *
1954     * <p>
1955     * If there are no known group names, the result should be a zero-length
1956     * array (not <code>null</code>).
1957     * </p>
1958     */

1959    public String JavaDoc[] getTriggerGroupNames(final SchedulingContext ctxt)
1960        throws JobPersistenceException {
1961        return (String JavaDoc[])executeWithoutLock( // no locks necessary for read...
1962
new TransactionCallback() {
1963                public Object JavaDoc execute(Connection JavaDoc conn) throws JobPersistenceException {
1964                    return getTriggerGroupNames(conn, ctxt);
1965                }
1966            });
1967    }
1968    
1969    protected String JavaDoc[] getTriggerGroupNames(Connection JavaDoc conn,
1970            SchedulingContext ctxt) throws JobPersistenceException {
1971
1972        String JavaDoc[] groupNames = null;
1973
1974        try {
1975            groupNames = getDelegate().selectTriggerGroups(conn);
1976        } catch (SQLException JavaDoc e) {
1977            throw new JobPersistenceException(
1978                    "Couldn't obtain trigger groups: " + e.getMessage(), e);
1979        }
1980
1981        return groupNames;
1982    }
1983
1984    /**
1985     * <p>
1986     * Get the names of all of the <code>{@link org.quartz.Calendar}</code> s
1987     * in the <code>JobStore</code>.
1988     * </p>
1989     *
1990     * <p>
1991     * If there are no Calendars in the given group name, the result should be
1992     * a zero-length array (not <code>null</code>).
1993     * </p>
1994     */

1995    public String JavaDoc[] getCalendarNames(final SchedulingContext ctxt)
1996        throws JobPersistenceException {
1997        return (String JavaDoc[])executeWithoutLock( // no locks necessary for read...
1998
new TransactionCallback() {
1999                public Object JavaDoc execute(Connection JavaDoc conn) throws JobPersistenceException {
2000                    return getCalendarNames(conn, ctxt);
2001                }
2002            });
2003    }
2004    
2005    protected String JavaDoc[] getCalendarNames(Connection JavaDoc conn, SchedulingContext ctxt)
2006        throws JobPersistenceException {
2007        try {
2008            return getDelegate().selectCalendars(conn);
2009        } catch (SQLException JavaDoc e) {
2010            throw new JobPersistenceException(
2011                    "Couldn't obtain trigger groups: " + e.getMessage(), e);
2012        }
2013    }
2014
2015    /**
2016     * <p>
2017     * Get all of the Triggers that are associated to the given Job.
2018     * </p>
2019     *
2020     * <p>
2021     * If there are no matches, a zero-length array should be returned.
2022     * </p>
2023     */

2024    public Trigger[] getTriggersForJob(final SchedulingContext ctxt, final String JavaDoc jobName,
2025        final String JavaDoc groupName) throws JobPersistenceException {
2026        return (Trigger[])executeWithoutLock( // no locks necessary for read...
2027
new TransactionCallback() {
2028                public Object JavaDoc execute(Connection JavaDoc conn) throws JobPersistenceException {
2029                    return getTriggersForJob(conn, ctxt, jobName, groupName);
2030                }
2031            });
2032    }
2033    
2034    protected Trigger[] getTriggersForJob(Connection JavaDoc conn,
2035            SchedulingContext ctxt, String JavaDoc jobName, String JavaDoc groupName)
2036        throws JobPersistenceException {
2037        Trigger[] array = null;
2038
2039        try {
2040            array = getDelegate()
2041                    .selectTriggersForJob(conn, jobName, groupName);
2042        } catch (Exception JavaDoc e) {
2043            throw new JobPersistenceException(
2044                    "Couldn't obtain triggers for job: " + e.getMessage(), e);
2045        }
2046
2047        return array;
2048    }
2049
2050    /**
2051     * <p>
2052     * Pause the <code>{@link org.quartz.Trigger}</code> with the given name.
2053     * </p>
2054     *
2055     * @see #resumeTrigger(SchedulingContext, String, String)
2056     */

2057    public void pauseTrigger(final SchedulingContext ctxt, final String JavaDoc triggerName,
2058            final String JavaDoc groupName) throws JobPersistenceException {
2059        executeInLock(
2060            LOCK_TRIGGER_ACCESS,
2061            new VoidTransactionCallback() {
2062                public void execute(Connection JavaDoc conn) throws JobPersistenceException {
2063                    pauseTrigger(conn, ctxt, triggerName, groupName);
2064                }
2065            });
2066    }
2067    
2068    /**
2069     * <p>
2070     * Pause the <code>{@link org.quartz.Trigger}</code> with the given name.
2071     * </p>
2072     *
2073     * @see #resumeTrigger(Connection, SchedulingContext, String, String)
2074     */

2075    public void pauseTrigger(Connection JavaDoc conn, SchedulingContext ctxt,
2076            String JavaDoc triggerName, String JavaDoc groupName)
2077        throws JobPersistenceException {
2078
2079        try {
2080            String JavaDoc oldState = getDelegate().selectTriggerState(conn,
2081                    triggerName, groupName);
2082
2083            if (oldState.equals(STATE_WAITING)
2084                    || oldState.equals(STATE_ACQUIRED)) {
2085
2086                getDelegate().updateTriggerState(conn, triggerName,
2087                        groupName, STATE_PAUSED);
2088            } else if (oldState.equals(STATE_BLOCKED)) {
2089                getDelegate().updateTriggerState(conn, triggerName,
2090                        groupName, STATE_PAUSED_BLOCKED);
2091            }
2092        } catch (SQLException JavaDoc e) {
2093            throw new JobPersistenceException("Couldn't pause trigger '"
2094                    + groupName + "." + triggerName + "': " + e.getMessage(), e);
2095        }
2096    }
2097
2098    /**
2099     * <p>
2100     * Pause the <code>{@link org.quartz.Job}</code> with the given name - by
2101     * pausing all of its current <code>Trigger</code>s.
2102     * </p>
2103     *
2104     * @see #resumeJob(SchedulingContext, String, String)
2105     */

2106    public void pauseJob(final SchedulingContext ctxt, final String JavaDoc jobName,
2107            final String JavaDoc groupName) throws JobPersistenceException {
2108        executeInLock(
2109            LOCK_TRIGGER_ACCESS,
2110            new VoidTransactionCallback() {
2111                public void execute(Connection JavaDoc conn) throws JobPersistenceException {
2112                    Trigger[] triggers = getTriggersForJob(conn, ctxt, jobName, groupName);
2113                    for (int j = 0; j < triggers.length; j++) {
2114                        pauseTrigger(conn, ctxt, triggers[j].getName(), triggers[j].getGroup());
2115                    }
2116                }
2117            });
2118    }
2119    
2120    /**
2121     * <p>
2122     * Pause all of the <code>{@link org.quartz.Job}s</code> in the given
2123     * group - by pausing all of their <code>Trigger</code>s.
2124     * </p>
2125     *
2126     * @see #resumeJobGroup(SchedulingContext, String)
2127     */

2128    public void pauseJobGroup(final SchedulingContext ctxt, final String JavaDoc groupName)
2129        throws JobPersistenceException {
2130        executeInLock(
2131            LOCK_TRIGGER_ACCESS,
2132            new VoidTransactionCallback() {
2133                public void execute(Connection JavaDoc conn) throws JobPersistenceException {
2134                    String JavaDoc[] jobNames = getJobNames(conn, ctxt, groupName);
2135
2136                    for (int i = 0; i < jobNames.length; i++) {
2137                        Trigger[] triggers = getTriggersForJob(conn, ctxt, jobNames[i], groupName);
2138                        for (int j = 0; j < triggers.length; j++) {
2139                            pauseTrigger(conn, ctxt, triggers[j].getName(), triggers[j].getGroup());
2140                        }
2141                    }
2142                }
2143            });
2144    }
2145    
2146    /**
2147     * Determines if a Trigger for the given job should be blocked.
2148     * State can only transition to STATE_PAUSED_BLOCKED/STATE_BLOCKED from
2149     * STATE_PAUSED/STATE_WAITING respectively.
2150     *
2151     * @return STATE_PAUSED_BLOCKED, STATE_BLOCKED, or the currentState.
2152     */

2153    protected String JavaDoc checkBlockedState(
2154            Connection JavaDoc conn, SchedulingContext ctxt, String JavaDoc jobName,
2155            String JavaDoc jobGroupName, String JavaDoc currentState)
2156        throws JobPersistenceException {
2157
2158        // State can only transition to BLOCKED from PAUSED or WAITING.
2159
if ((currentState.equals(STATE_WAITING) == false) &&
2160            (currentState.equals(STATE_PAUSED) == false)) {
2161            return currentState;
2162        }
2163        
2164        try {
2165            List JavaDoc lst = getDelegate().selectFiredTriggerRecordsByJob(conn,
2166                    jobName, jobGroupName);
2167
2168            if (lst.size() > 0) {
2169                FiredTriggerRecord rec = (FiredTriggerRecord) lst.get(0);
2170                if (rec.isJobIsStateful()) { // TODO: worry about
2171
// failed/recovering/volatile job
2172
// states?
2173
return (STATE_PAUSED.equals(currentState)) ? STATE_PAUSED_BLOCKED : STATE_BLOCKED;
2174                }
2175            }
2176
2177            return currentState;
2178        } catch (SQLException JavaDoc e) {
2179            throw new JobPersistenceException(
2180                "Couldn't determine if trigger should be in a blocked state '"
2181                    + jobGroupName + "."
2182                    + jobName + "': "
2183                    + e.getMessage(), e);
2184        }
2185
2186    }
2187
2188    /*
2189     * private List findTriggersToBeBlocked(Connection conn, SchedulingContext
2190     * ctxt, String groupName) throws JobPersistenceException {
2191     *
2192     * try { List blockList = new LinkedList();
2193     *
2194     * List affectingJobs =
2195     * getDelegate().selectStatefulJobsOfTriggerGroup(conn, groupName);
2196     *
2197     * Iterator itr = affectingJobs.iterator(); while(itr.hasNext()) { Key
2198     * jobKey = (Key) itr.next();
2199     *
2200     * List lst = getDelegate().selectFiredTriggerRecordsByJob(conn,
2201     * jobKey.getName(), jobKey.getGroup());
2202     *
2203     * This logic is BROKEN...
2204     *
2205     * if(lst.size() > 0) { FiredTriggerRecord rec =
2206     * (FiredTriggerRecord)lst.get(0); if(rec.isJobIsStateful()) // TODO: worry
2207     * about failed/recovering/volatile job states? blockList.add(
2208     * rec.getTriggerKey() ); } }
2209     *
2210     *
2211     * return blockList; } catch (SQLException e) { throw new
2212     * JobPersistenceException ("Couldn't determine states of resumed triggers
2213     * in group '" + groupName + "': " + e.getMessage(), e); } }
2214     */

2215
2216    /**
2217     * <p>
2218     * Resume (un-pause) the <code>{@link org.quartz.Trigger}</code> with the
2219     * given name.
2220     * </p>
2221     *
2222     * <p>
2223     * If the <code>Trigger</code> missed one or more fire-times, then the
2224     * <code>Trigger</code>'s misfire instruction will be applied.
2225     * </p>
2226     *
2227     * @see #pauseTrigger(SchedulingContext, String, String)
2228     */

2229    public void resumeTrigger(final SchedulingContext ctxt, final String JavaDoc triggerName,
2230            final String JavaDoc groupName) throws JobPersistenceException {
2231        executeInLock(
2232            LOCK_TRIGGER_ACCESS,
2233            new VoidTransactionCallback() {
2234                public void execute(Connection JavaDoc conn) throws JobPersistenceException {
2235                    resumeTrigger(conn, ctxt, triggerName, groupName);
2236                }
2237            });
2238    }
2239    
2240    /**
2241     * <p>
2242     * Resume (un-pause) the <code>{@link org.quartz.Trigger}</code> with the
2243     * given name.
2244     * </p>
2245     *
2246     * <p>
2247     * If the <code>Trigger</code> missed one or more fire-times, then the
2248     * <code>Trigger</code>'s misfire instruction will be applied.
2249     * </p>
2250     *
2251     * @see #pauseTrigger(Connection, SchedulingContext, String, String)
2252     */

2253    public void resumeTrigger(Connection JavaDoc conn, SchedulingContext ctxt,
2254            String JavaDoc triggerName, String JavaDoc groupName)
2255        throws JobPersistenceException {
2256        try {
2257
2258            TriggerStatus status = getDelegate().selectTriggerStatus(conn,
2259                    triggerName, groupName);
2260
2261            if (status == null || status.getNextFireTime() == null) {
2262                return;
2263            }
2264
2265            boolean blocked = false;
2266            if(STATE_PAUSED_BLOCKED.equals(status.getStatus())) {
2267                blocked = true;
2268            }
2269
2270            String JavaDoc newState = checkBlockedState(conn, ctxt, status.getJobKey().getName(),
2271                    status.getJobKey().getGroup(), STATE_WAITING);
2272
2273            boolean misfired = false;
2274
2275            if (status.getNextFireTime().before(new Date JavaDoc())) {
2276                misfired = updateMisfiredTrigger(conn, ctxt, triggerName, groupName,
2277                    newState, true);
2278            }
2279
2280            if(!misfired) {
2281                if(blocked) {
2282                    getDelegate().updateTriggerStateFromOtherState(conn,
2283                            triggerName, groupName, newState, STATE_PAUSED_BLOCKED);
2284                } else {
2285                    getDelegate().updateTriggerStateFromOtherState(conn,
2286                            triggerName, groupName, newState, STATE_PAUSED);
2287                }
2288            }
2289
2290        } catch (SQLException JavaDoc e) {
2291            throw new JobPersistenceException("Couldn't resume trigger '"
2292                    + groupName + "." + triggerName + "': " + e.getMessage(), e);
2293        }
2294    }
2295
2296    /**
2297     * <p>
2298     * Resume (un-pause) the <code>{@link org.quartz.Job}</code> with the
2299     * given name.
2300     * </p>
2301     *
2302     * <p>
2303     * If any of the <code>Job</code>'s<code>Trigger</code> s missed one
2304     * or more fire-times, then the <code>Trigger</code>'s misfire
2305     * instruction will be applied.
2306     * </p>
2307     *
2308     * @see #pauseJob(SchedulingContext, String, String)
2309     */

2310    public void resumeJob(final SchedulingContext ctxt, final String JavaDoc jobName,
2311        final String JavaDoc groupName) throws JobPersistenceException {
2312        executeInLock(
2313            LOCK_TRIGGER_ACCESS,
2314            new VoidTransactionCallback() {
2315                public void execute(Connection JavaDoc conn) throws JobPersistenceException {
2316                    Trigger[] triggers = getTriggersForJob(conn, ctxt, jobName, groupName);
2317                    for (int j = 0; j < triggers.length; j++) {
2318                        resumeTrigger(conn, ctxt, triggers[j].getName(), triggers[j].getGroup());
2319                    }
2320                }
2321            });
2322    }
2323    
2324    /**
2325     * <p>
2326     * Resume (un-pause) all of the <code>{@link org.quartz.Job}s</code> in
2327     * the given group.
2328     * </p>
2329     *
2330     * <p>
2331     * If any of the <code>Job</code> s had <code>Trigger</code> s that
2332     * missed one or more fire-times, then the <code>Trigger</code>'s
2333     * misfire instruction will be applied.
2334     * </p>
2335     *
2336     * @see #pauseJobGroup(SchedulingContext, String)
2337     */

2338    public void resumeJobGroup(final SchedulingContext ctxt, final String JavaDoc groupName)
2339        throws JobPersistenceException {
2340        executeInLock(
2341            LOCK_TRIGGER_ACCESS,
2342            new VoidTransactionCallback() {
2343                public void execute(Connection JavaDoc conn) throws JobPersistenceException {
2344                    String JavaDoc[] jobNames = getJobNames(conn, ctxt, groupName);
2345
2346                    for (int i = 0; i < jobNames.length; i++) {
2347                        Trigger[] triggers = getTriggersForJob(conn, ctxt, jobNames[i], groupName);
2348                        for (int j = 0; j < triggers.length; j++) {
2349                            resumeTrigger(conn, ctxt, triggers[j].getName(), triggers[j].getGroup());
2350                        }
2351                    }
2352                }
2353            });
2354    }
2355    
2356    /**
2357     * <p>
2358     * Pause all of the <code>{@link org.quartz.Trigger}s</code> in the
2359     * given group.
2360     * </p>
2361     *
2362     * @see #resumeTriggerGroup(SchedulingContext, String)
2363     */

2364    public void pauseTriggerGroup(final SchedulingContext ctxt, final String JavaDoc groupName)
2365        throws JobPersistenceException {
2366        executeInLock(
2367            LOCK_TRIGGER_ACCESS,
2368            new VoidTransactionCallback() {
2369                public void execute(Connection JavaDoc conn) throws JobPersistenceException {
2370                    pauseTriggerGroup(conn, ctxt, groupName);
2371                }
2372            });
2373    }
2374    
2375    /**
2376     * <p>
2377     * Pause all of the <code>{@link org.quartz.Trigger}s</code> in the
2378     * given group.
2379     * </p>
2380     *
2381     * @see #resumeTriggerGroup(Connection, SchedulingContext, String)
2382     */

2383    public void pauseTriggerGroup(Connection JavaDoc conn, SchedulingContext ctxt,
2384            String JavaDoc groupName) throws JobPersistenceException {
2385
2386        try {
2387
2388            getDelegate().updateTriggerGroupStateFromOtherStates(
2389                    conn, groupName, STATE_PAUSED, STATE_ACQUIRED,
2390                    STATE_WAITING, STATE_WAITING);
2391
2392            getDelegate().updateTriggerGroupStateFromOtherState(
2393                    conn, groupName, STATE_PAUSED_BLOCKED, STATE_BLOCKED);
2394            
2395            if (!getDelegate().isTriggerGroupPaused(conn, groupName)) {
2396                getDelegate().insertPausedTriggerGroup(conn, groupName);
2397            }
2398
2399        } catch (SQLException JavaDoc e) {
2400            throw new JobPersistenceException("Couldn't pause trigger group '"
2401                    + groupName + "': " + e.getMessage(), e);
2402        }
2403    }
2404
2405    public Set JavaDoc getPausedTriggerGroups(final SchedulingContext ctxt)
2406        throws JobPersistenceException {
2407        return (Set JavaDoc)executeWithoutLock( // no locks necessary for read...
2408
new TransactionCallback() {
2409                public Object JavaDoc execute(Connection JavaDoc conn) throws JobPersistenceException {
2410                    return getPausedTriggerGroups(conn, ctxt);
2411                }
2412            });
2413    }
2414    
2415    /**
2416     * <p>
2417     * Pause all of the <code>{@link org.quartz.Trigger}s</code> in the
2418     * given group.
2419     * </p>
2420     *
2421     * @see #resumeTriggerGroup(Connection, SchedulingContext, String)
2422     */

2423    public Set JavaDoc getPausedTriggerGroups(Connection JavaDoc conn, SchedulingContext ctxt)
2424        throws JobPersistenceException {
2425
2426        try {
2427            return getDelegate().selectPausedTriggerGroups(conn);
2428        } catch (SQLException JavaDoc e) {
2429            throw new JobPersistenceException(
2430                    "Couldn't determine paused trigger groups: " + e.getMessage(), e);
2431        }
2432    }
2433    
2434    /**
2435     * <p>
2436     * Resume (un-pause) all of the <code>{@link org.quartz.Trigger}s</code>
2437     * in the given group.
2438     * </p>
2439     *
2440     * <p>
2441     * If any <code>Trigger</code> missed one or more fire-times, then the
2442     * <code>Trigger</code>'s misfire instruction will be applied.
2443     * </p>
2444     *
2445     * @see #pauseTriggerGroup(SchedulingContext, String)
2446     */

2447    public void resumeTriggerGroup(final SchedulingContext ctxt, final String JavaDoc groupName)
2448        throws JobPersistenceException {
2449        executeInLock(
2450            LOCK_TRIGGER_ACCESS,
2451            new VoidTransactionCallback() {
2452                public void execute(Connection JavaDoc conn) throws JobPersistenceException {
2453                    resumeTriggerGroup(conn, ctxt, groupName);
2454                }
2455            });
2456    }
2457    
2458    /**
2459     * <p>
2460     * Resume (un-pause) all of the <code>{@link org.quartz.Trigger}s</code>
2461     * in the given group.
2462     * </p>
2463     *
2464     * <p>
2465     * If any <code>Trigger</code> missed one or more fire-times, then the
2466     * <code>Trigger</code>'s misfire instruction will be applied.
2467     * </p>
2468     *
2469     * @see #pauseTriggerGroup(Connection, SchedulingContext, String)
2470     */

2471    public void resumeTriggerGroup(Connection JavaDoc conn, SchedulingContext ctxt,
2472            String JavaDoc groupName) throws JobPersistenceException {
2473
2474        try {
2475
2476            getDelegate().deletePausedTriggerGroup(conn, groupName);
2477
2478            String JavaDoc[] trigNames = getDelegate().selectTriggersInGroup(conn,
2479                    groupName);
2480
2481            for (int i = 0; i < trigNames.length; i++) {
2482                resumeTrigger(conn, ctxt, trigNames[i], groupName);
2483            }
2484
2485            // TODO: find an efficient way to resume triggers (better than the
2486
// above)... logic below is broken because of
2487
// findTriggersToBeBlocked()
2488
/*
2489             * int res =
2490             * getDelegate().updateTriggerGroupStateFromOtherState(conn,
2491             * groupName, STATE_WAITING, STATE_PAUSED);
2492             *
2493             * if(res > 0) {
2494             *
2495             * long misfireTime = System.currentTimeMillis();
2496             * if(getMisfireThreshold() > 0) misfireTime -=
2497             * getMisfireThreshold();
2498             *
2499             * Key[] misfires =
2500             * getDelegate().selectMisfiredTriggersInGroupInState(conn,
2501             * groupName, STATE_WAITING, misfireTime);
2502             *
2503             * List blockedTriggers = findTriggersToBeBlocked(conn, ctxt,
2504             * groupName);
2505             *
2506             * Iterator itr = blockedTriggers.iterator(); while(itr.hasNext()) {
2507             * Key key = (Key)itr.next();
2508             * getDelegate().updateTriggerState(conn, key.getName(),
2509             * key.getGroup(), STATE_BLOCKED); }
2510             *
2511             * for(int i=0; i < misfires.length; i++) { String
2512             * newState = STATE_WAITING;
2513             * if(blockedTriggers.contains(misfires[i])) newState =
2514             * STATE_BLOCKED; updateMisfiredTrigger(conn, ctxt,
2515             * misfires[i].getName(), misfires[i].getGroup(), newState, true); } }
2516             */

2517
2518        } catch (SQLException JavaDoc e) {
2519            throw new JobPersistenceException("Couldn't pause trigger group '"
2520                    + groupName + "': " + e.getMessage(), e);
2521        }
2522    }
2523
2524    /**
2525     * <p>
2526     * Pause all triggers - equivalent of calling <code>pauseTriggerGroup(group)</code>
2527     * on every group.
2528     * </p>
2529     *
2530     * <p>
2531     * When <code>resumeAll()</code> is called (to un-pause), trigger misfire
2532     * instructions WILL be applied.
2533     * </p>
2534     *
2535     * @see #resumeAll(SchedulingContext)
2536     * @see #pauseTriggerGroup(SchedulingContext, String)
2537     */

2538    public void pauseAll(final SchedulingContext ctxt) throws JobPersistenceException {
2539        executeInLock(
2540            LOCK_TRIGGER_ACCESS,
2541            new VoidTransactionCallback() {
2542                public void execute(Connection JavaDoc conn) throws JobPersistenceException {
2543                    pauseAll(conn, ctxt);
2544                }
2545            });
2546    }
2547    
2548    /**
2549     * <p>
2550     * Pause all triggers - equivalent of calling <code>pauseTriggerGroup(group)</code>
2551     * on every group.
2552     * </p>
2553     *
2554     * <p>
2555     * When <code>resumeAll()</code> is called (to un-pause), trigger misfire
2556     * instructions WILL be applied.
2557     * </p>
2558     *
2559     * @see #resumeAll(SchedulingContext)
2560     * @see #pauseTriggerGroup(SchedulingContext, String)
2561     */

2562    public void pauseAll(Connection JavaDoc conn, SchedulingContext ctxt)
2563        throws JobPersistenceException {
2564
2565        String JavaDoc[] names = getTriggerGroupNames(conn, ctxt);
2566
2567        for (int i = 0; i < names.length; i++) {
2568            pauseTriggerGroup(conn, ctxt, names[i]);
2569        }
2570
2571        try {
2572            if (!getDelegate().isTriggerGroupPaused(conn, ALL_GROUPS_PAUSED)) {
2573                getDelegate().insertPausedTriggerGroup(conn, ALL_GROUPS_PAUSED);
2574            }
2575
2576        } catch (SQLException JavaDoc e) {
2577            throw new JobPersistenceException(
2578                    "Couldn't pause all trigger groups: " + e.getMessage(), e);
2579        }
2580
2581    }
2582
2583    /**
2584     * <p>
2585     * Resume (un-pause) all triggers - equivalent of calling <code>resumeTriggerGroup(group)</code>
2586     * on every group.
2587     * </p>
2588     *
2589     * <p>
2590     * If any <code>Trigger</code> missed one or more fire-times, then the
2591     * <code>Trigger</code>'s misfire instruction will be applied.
2592     * </p>
2593     *
2594     * @see #pauseAll(SchedulingContext)
2595     */

2596    public void resumeAll(final SchedulingContext ctxt)
2597        throws JobPersistenceException {
2598        executeInLock(
2599            LOCK_TRIGGER_ACCESS,
2600            new VoidTransactionCallback() {
2601                public void execute(Connection JavaDoc conn) throws JobPersistenceException {
2602                    resumeAll(conn, ctxt);
2603                }
2604            });
2605    }
2606    
2607    /**
2608     * protected
2609     * <p>
2610     * Resume (un-pause) all triggers - equivalent of calling <code>resumeTriggerGroup(group)</code>
2611     * on every group.
2612     * </p>
2613     *
2614     * <p>
2615     * If any <code>Trigger</code> missed one or more fire-times, then the
2616     * <code>Trigger</code>'s misfire instruction will be applied.
2617     * </p>
2618     *
2619     * @see #pauseAll(SchedulingContext)
2620     */

2621    public void resumeAll(Connection JavaDoc conn, SchedulingContext ctxt)
2622        throws JobPersistenceException {
2623
2624        String JavaDoc[] names = getTriggerGroupNames(conn, ctxt);
2625
2626        for (int i = 0; i < names.length; i++) {
2627            resumeTriggerGroup(conn, ctxt, names[i]);
2628        }
2629
2630        try {
2631            getDelegate().deletePausedTriggerGroup(conn, ALL_GROUPS_PAUSED);
2632        } catch (SQLException JavaDoc e) {
2633            throw new JobPersistenceException(
2634                    "Couldn't resume all trigger groups: " + e.getMessage(), e);
2635        }
2636    }
2637
2638    private static long ftrCtr = System.currentTimeMillis();
2639
2640    protected synchronized String JavaDoc getFiredTriggerRecordId() {
2641        return getInstanceId() + ftrCtr++;
2642    }
2643
2644    /**
2645     * <p>
2646     * Get a handle to the next N triggers to be fired, and mark them as 'reserved'
2647     * by the calling scheduler.
2648     * </p>
2649     *
2650     * @see #releaseAcquiredTrigger(SchedulingContext, Trigger)
2651     */

2652    public Trigger acquireNextTrigger(final SchedulingContext ctxt, final long noLaterThan)
2653        throws JobPersistenceException {
2654        return (Trigger)executeInNonManagedTXLock(
2655                LOCK_TRIGGER_ACCESS,
2656                new TransactionCallback() {
2657                    public Object JavaDoc execute(Connection JavaDoc conn) throws JobPersistenceException {
2658                        return acquireNextTrigger(conn, ctxt, noLaterThan);
2659                    }
2660                });
2661    }
2662    
2663    // TODO: this really ought to return something like a FiredTriggerBundle,
2664
// so that the fireInstanceId doesn't have to be on the trigger...
2665
protected Trigger acquireNextTrigger(Connection JavaDoc conn, SchedulingContext ctxt, long noLaterThan)
2666        throws JobPersistenceException {
2667        do {
2668            try {
2669                Key triggerKey = getDelegate().selectTriggerToAcquire(conn, noLaterThan, getMisfireTime());
2670
2671                // No trigger is ready to fire yet.
2672
if (triggerKey == null) {
2673                    return null;
2674                }
2675                
2676                int rowsUpdated =
2677                    getDelegate().updateTriggerStateFromOtherState(
2678                        conn,
2679                        triggerKey.getName(), triggerKey.getGroup(),
2680                        STATE_ACQUIRED, STATE_WAITING);
2681
2682                // If our trigger was no longer in the expected state, try a new one.
2683
if (rowsUpdated <= 0) {
2684                    continue;
2685                }
2686
2687                Trigger nextTrigger =
2688                    retrieveTrigger(conn, ctxt, triggerKey.getName(), triggerKey.getGroup());
2689
2690                // If our trigger is no longer available, try a new one.
2691
if(nextTrigger == null) {
2692                    continue;
2693                }
2694                  
2695                nextTrigger.setFireInstanceId(getFiredTriggerRecordId());
2696                getDelegate().insertFiredTrigger(conn, nextTrigger, STATE_ACQUIRED, null);
2697                
2698                return nextTrigger;
2699            } catch (Exception JavaDoc e) {
2700                throw new JobPersistenceException(
2701                          "Couldn't acquire next trigger: " + e.getMessage(), e);
2702            }
2703        } while (true);
2704    }
2705    
2706    /**
2707     * <p>
2708     * Inform the <code>JobStore</code> that the scheduler no longer plans to
2709     * fire the given <code>Trigger</code>, that it had previously acquired
2710     * (reserved).
2711     * </p>
2712     */

2713    public void releaseAcquiredTrigger(final SchedulingContext ctxt, final Trigger trigger)
2714        throws JobPersistenceException {
2715        executeInNonManagedTXLock(
2716            LOCK_TRIGGER_ACCESS,
2717            new VoidTransactionCallback() {
2718                public void execute(Connection JavaDoc conn) throws JobPersistenceException {
2719                    releaseAcquiredTrigger(conn, ctxt, trigger);
2720                }
2721            });
2722    }
2723    
2724    protected void releaseAcquiredTrigger(Connection JavaDoc conn,
2725            SchedulingContext ctxt, Trigger trigger)
2726        throws JobPersistenceException {
2727        try {
2728            getDelegate().updateTriggerStateFromOtherState(conn,
2729                    trigger.getName(), trigger.getGroup(), STATE_WAITING,
2730                    STATE_ACQUIRED);
2731            getDelegate().deleteFiredTrigger(conn, trigger.getFireInstanceId());
2732        } catch (SQLException JavaDoc e) {
2733            throw new JobPersistenceException(
2734                    "Couldn't release acquired trigger: " + e.getMessage(), e);
2735        }
2736    }
2737
2738    /**
2739     * <p>
2740     * Inform the <code>JobStore</code> that the scheduler is now firing the
2741     * given <code>Trigger</code> (executing its associated <code>Job</code>),
2742     * that it had previously acquired (reserved).
2743     * </p>
2744     *
2745     * @return null if the trigger or its job or calendar no longer exist, or
2746     * if the trigger was not successfully put into the 'executing'
2747     * state.
2748     */

2749    public TriggerFiredBundle triggerFired(
2750            final SchedulingContext ctxt, final Trigger trigger) throws JobPersistenceException {
2751        return
2752            (TriggerFiredBundle)executeInNonManagedTXLock(
2753                LOCK_TRIGGER_ACCESS,
2754                new TransactionCallback() {
2755                    public Object JavaDoc execute(Connection JavaDoc conn) throws JobPersistenceException {
2756                        try {
2757                            return triggerFired(conn, ctxt, trigger);
2758                        } catch (JobPersistenceException jpe) {
2759                            // If job didn't exisit, we still want to commit our work and return null.
2760
if (jpe.getErrorCode() == SchedulerException.ERR_PERSISTENCE_JOB_DOES_NOT_EXIST) {
2761                                return null;
2762                            } else {
2763                                throw jpe;
2764                            }
2765                        }
2766                    }
2767                });
2768    }
2769
2770    protected TriggerFiredBundle triggerFired(Connection JavaDoc conn,
2771            SchedulingContext ctxt, Trigger trigger)
2772        throws JobPersistenceException {
2773        JobDetail job = null;
2774        Calendar cal = null;
2775
2776        // Make sure trigger wasn't deleted, paused, or completed...
2777
try { // if trigger was deleted, state will be STATE_DELETED
2778
String JavaDoc state = getDelegate().selectTriggerState(conn,
2779                    trigger.getName(), trigger.getGroup());
2780            if (!state.equals(STATE_ACQUIRED)) {
2781                return null;
2782            }
2783        } catch (SQLException JavaDoc e) {
2784            throw new JobPersistenceException("Couldn't select trigger state: "
2785                    + e.getMessage(), e);
2786        }
2787
2788        try {
2789            job = retrieveJob(conn, ctxt, trigger.getJobName(), trigger
2790                    .getJobGroup());
2791            if (job == null) { return null; }
2792        } catch (JobPersistenceException jpe) {
2793            try {
2794                getLog().error("Error retrieving job, setting trigger state to ERROR.", jpe);
2795                getDelegate().updateTriggerState(conn, trigger.getName(),
2796                        trigger.getGroup(), STATE_ERROR);
2797            } catch (SQLException JavaDoc sqle) {
2798                getLog().error("Unable to set trigger state to ERROR.", sqle);
2799            }
2800            throw jpe;
2801        }
2802
2803        if (trigger.getCalendarName() != null) {
2804            cal = retrieveCalendar(conn, ctxt, trigger.getCalendarName());
2805            if (cal == null) { return null; }
2806        }
2807
2808        try {
2809            getDelegate().deleteFiredTrigger(conn, trigger.getFireInstanceId());
2810            getDelegate().insertFiredTrigger(conn, trigger, STATE_EXECUTING,
2811                    job);
2812        } catch (SQLException JavaDoc e) {
2813            throw new JobPersistenceException("Couldn't insert fired trigger: "
2814                    + e.getMessage(), e);
2815        }
2816
2817        Date JavaDoc prevFireTime = trigger.getPreviousFireTime();
2818
2819        // call triggered - to update the trigger's next-fire-time state...
2820
trigger.triggered(cal);
2821
2822        String JavaDoc state = STATE_WAITING;
2823        boolean force = true;
2824        
2825        if (job.isStateful()) {
2826            state = STATE_BLOCKED;
2827            force = false;
2828            try {
2829                getDelegate().updateTriggerStatesForJobFromOtherState(conn, job.getName(),
2830                        job.getGroup(), STATE_BLOCKED, STATE_WAITING);
2831                getDelegate().updateTriggerStatesForJobFromOtherState(conn, job.getName(),
2832                        job.getGroup(), STATE_BLOCKED, STATE_ACQUIRED);
2833                getDelegate().updateTriggerStatesForJobFromOtherState(conn, job.getName(),
2834                        job.getGroup(), STATE_PAUSED_BLOCKED, STATE_PAUSED);
2835            } catch (SQLException JavaDoc e) {
2836                throw new JobPersistenceException(
2837                        "Couldn't update states of blocked triggers: "
2838                                + e.getMessage(), e);
2839            }
2840        }
2841            
2842        if (trigger.getNextFireTime() == null) {
2843            state = STATE_COMPLETE;
2844            force = true;
2845        }
2846
2847        storeTrigger(conn, ctxt, trigger, job, true, state, force, false);
2848
2849        job.getJobDataMap().clearDirtyFlag();
2850
2851        return new TriggerFiredBundle(job, trigger, cal, trigger.getGroup()
2852                .equals(Scheduler.DEFAULT_RECOVERY_GROUP), new Date JavaDoc(), trigger
2853                .getPreviousFireTime(), prevFireTime, trigger.getNextFireTime());
2854    }
2855
2856    /**
2857     * <p>
2858     * Inform the <code>JobStore</code> that the scheduler has completed the
2859     * firing of the given <code>Trigger</code> (and the execution its
2860     * associated <code>Job</code>), and that the <code>{@link org.quartz.JobDataMap}</code>
2861     * in the given <code>JobDetail</code> should be updated if the <code>Job</code>
2862     * is stateful.
2863     * </p>
2864     */

2865    public void triggeredJobComplete(final SchedulingContext ctxt, final Trigger trigger,
2866            final JobDetail jobDetail, final int triggerInstCode)
2867        throws JobPersistenceException {
2868        executeInNonManagedTXLock(
2869            LOCK_TRIGGER_ACCESS,
2870            new VoidTransactionCallback() {
2871                public void execute(Connection JavaDoc conn) throws JobPersistenceException {
2872                    triggeredJobComplete(conn, ctxt, trigger, jobDetail,triggerInstCode);
2873                }
2874            });
2875    }
2876    
2877    protected void triggeredJobComplete(Connection JavaDoc conn,
2878            SchedulingContext ctxt, Trigger trigger, JobDetail jobDetail,
2879            int triggerInstCode) throws JobPersistenceException {
2880        try {
2881            if (triggerInstCode == Trigger.INSTRUCTION_DELETE_TRIGGER) {
2882                if(trigger.getNextFireTime() == null) {
2883                    // double check for possible reschedule within job
2884
// execution, which would cancel the need to delete...
2885
TriggerStatus stat = getDelegate().selectTriggerStatus(
2886                            conn, trigger.getName(), trigger.getGroup());
2887                    if(stat != null && stat.getNextFireTime() == null) {
2888                        removeTrigger(conn, ctxt, trigger.getName(), trigger.getGroup());
2889                    }
2890                } else{
2891                    removeTrigger(conn, ctxt, trigger.getName(), trigger.getGroup());
2892                }
2893            } else if (triggerInstCode == Trigger.INSTRUCTION_SET_TRIGGER_COMPLETE) {
2894                getDelegate().updateTriggerState(conn, trigger.getName(),
2895                        trigger.getGroup(), STATE_COMPLETE);
2896            } else if (triggerInstCode == Trigger.INSTRUCTION_SET_TRIGGER_ERROR) {
2897                getLog().info("Trigger " + trigger.getFullName() + " set to ERROR state.");
2898                getDelegate().updateTriggerState(conn, trigger.getName(),
2899                        trigger.getGroup(), STATE_ERROR);
2900            } else if (triggerInstCode == Trigger.INSTRUCTION_SET_ALL_JOB_TRIGGERS_COMPLETE) {
2901                getDelegate().updateTriggerStatesForJob(conn,
2902                        trigger.getJobName(), trigger.getJobGroup(),
2903                        STATE_COMPLETE);
2904            } else if (triggerInstCode == Trigger.INSTRUCTION_SET_ALL_JOB_TRIGGERS_ERROR) {
2905                getLog().info("All triggers of Job " +
2906                        trigger.getFullJobName() + " set to ERROR state.");
2907                getDelegate().updateTriggerStatesForJob(conn,
2908                        trigger.getJobName(), trigger.getJobGroup(),
2909                        STATE_ERROR);
2910            }
2911
2912            if (jobDetail.isStateful()) {
2913                getDelegate().updateTriggerStatesForJobFromOtherState(conn,
2914                        jobDetail.getName(), jobDetail.getGroup(),
2915                        STATE_WAITING, STATE_BLOCKED);
2916
2917                getDelegate().updateTriggerStatesForJobFromOtherState(conn,
2918                        jobDetail.getName(), jobDetail.getGroup(),
2919                        STATE_PAUSED, STATE_PAUSED_BLOCKED);
2920
2921                try {
2922                    if (jobDetail.getJobDataMap().isDirty()) {
2923                        getDelegate().updateJobData(conn, jobDetail);
2924                    }
2925                } catch (IOException JavaDoc e) {
2926                    throw new JobPersistenceException(
2927                            "Couldn't serialize job data: " + e.getMessage(), e);
2928                } catch (SQLException JavaDoc e) {
2929                    throw new JobPersistenceException(
2930                            "Couldn't update job data: " + e.getMessage(), e);
2931                }
2932            }
2933        } catch (SQLException JavaDoc e) {
2934            throw new JobPersistenceException(
2935                    "Couldn't update trigger state(s): " + e.getMessage(), e);
2936        }
2937
2938        try {
2939            getDelegate().deleteFiredTrigger(conn, trigger.getFireInstanceId());
2940        } catch (SQLException JavaDoc e) {
2941            throw new JobPersistenceException("Couldn't delete fired trigger: "
2942                    + e.getMessage(), e);
2943        }
2944    }
2945
2946    /**
2947     * <P>
2948     * Get the driver delegate for DB operations.
2949     * </p>
2950     */

2951    protected DriverDelegate getDelegate() throws NoSuchDelegateException {
2952        if (null == delegate) {
2953            try {
2954                if(delegateClassName != null) {
2955                    delegateClass =
2956                        getClassLoadHelper().loadClass(delegateClassName);
2957                }
2958                
2959                Constructor JavaDoc ctor = null;
2960                Object JavaDoc[] ctorParams = null;
2961                if (canUseProperties()) {
2962                    Class JavaDoc[] ctorParamTypes = new Class JavaDoc[]{
2963                        Log.class, String JavaDoc.class, String JavaDoc.class, Boolean JavaDoc.class};
2964                    ctor = delegateClass.getConstructor(ctorParamTypes);
2965                    ctorParams = new Object JavaDoc[]{
2966                        getLog(), tablePrefix,
2967                        instanceId, new Boolean JavaDoc(canUseProperties())};
2968                } else {
2969                    Class JavaDoc[] ctorParamTypes = new Class JavaDoc[]{
2970                        Log.class, String JavaDoc.class, String JavaDoc.class};
2971                    ctor = delegateClass.getConstructor(ctorParamTypes);
2972                    ctorParams = new Object JavaDoc[]{getLog(), tablePrefix, instanceId};
2973                }
2974
2975                delegate = (DriverDelegate) ctor.newInstance(ctorParams);
2976            } catch (NoSuchMethodException JavaDoc e) {
2977                throw new NoSuchDelegateException(
2978                        "Couldn't find delegate constructor: " + e.getMessage());
2979            } catch (InstantiationException JavaDoc e) {
2980                throw new NoSuchDelegateException("Couldn't create delegate: "
2981                        + e.getMessage());
2982            } catch (IllegalAccessException JavaDoc e) {
2983                throw new NoSuchDelegateException("Couldn't create delegate: "
2984                        + e.getMessage());
2985            } catch (InvocationTargetException JavaDoc e) {
2986                throw new NoSuchDelegateException("Couldn't create delegate: "
2987                        + e.getMessage());
2988            } catch (ClassNotFoundException JavaDoc e) {
2989                throw new NoSuchDelegateException("Couldn't load delegate class: "
2990                        + e.getMessage());
2991            }
2992        }
2993
2994        return delegate;
2995    }
2996
2997    protected Semaphore getLockHandler() {
2998        return lockHandler;
2999    }
3000
3001    public void setLockHandler(Semaphore lockHandler) {
3002        this.lockHandler = lockHandler;
3003    }
3004
3005    //---------------------------------------------------------------------------
3006
// Management methods
3007
//---------------------------------------------------------------------------
3008

3009    protected RecoverMisfiredJobsResult doRecoverMisfires() throws JobPersistenceException {
3010        boolean transOwner = false;
3011        Connection JavaDoc conn = getNonManagedTXConnection();
3012        try {
3013            RecoverMisfiredJobsResult result = RecoverMisfiredJobsResult.NO_OP;
3014            
3015            // Before we make the potentially expensive call to acquire the
3016
// trigger lock, peek ahead to see if it is likely we would find
3017
// misfired triggers requiring recovery.
3018
int misfireCount = (getDoubleCheckLockMisfireHandler()) ?
3019                getDelegate().countMisfiredTriggersInStates(
3020                    conn, STATE_MISFIRED, STATE_WAITING, getMisfireTime()) :
3021                Integer.MAX_VALUE;
3022            
3023            if (misfireCount == 0) {
3024                getLog().debug(
3025                    "Found 0 triggers that missed their scheduled fire-time.");
3026            } else {
3027                transOwner = getLockHandler().obtainLock(conn, LOCK_TRIGGER_ACCESS);
3028                
3029                result = recoverMisfiredJobs(conn, false);
3030            }
3031            
3032            commitConnection(conn);
3033            return result;
3034        } catch (JobPersistenceException e) {
3035            rollbackConnection(conn);
3036            throw e;
3037        } catch (SQLException JavaDoc e) {
3038            rollbackConnection(conn);
3039            throw new JobPersistenceException("Database error recovering from misfires.", e);
3040        } catch (RuntimeException JavaDoc e) {
3041            rollbackConnection(conn);
3042            throw new JobPersistenceException("Unexpected runtime exception: "
3043                    + e.getMessage(), e);
3044        } finally {
3045            try {
3046                releaseLock(conn, LOCK_TRIGGER_ACCESS, transOwner);
3047            } finally {
3048                cleanupConnection(conn);
3049            }
3050        }
3051    }
3052
3053    protected void signalSchedulingChange() {
3054        signaler.signalSchedulingChange();
3055    }
3056
3057    //---------------------------------------------------------------------------
3058
// Cluster management methods
3059
//---------------------------------------------------------------------------
3060

3061    protected boolean firstCheckIn = true;
3062
3063    protected long lastCheckin = System.currentTimeMillis();
3064    
3065    protected boolean doCheckin() throws JobPersistenceException {
3066        boolean transOwner = false;
3067        boolean transStateOwner = false;
3068        boolean recovered = false;
3069
3070        Connection JavaDoc conn = getNonManagedTXConnection();
3071        try {
3072            // Other than the first time, always checkin first to make sure there is
3073
// work to be done before we aquire the lock (since that is expensive,
3074
// and is almost never necessary). This must be done in a separate
3075
// transaction to prevent a deadlock under recovery conditions.
3076
List JavaDoc failedRecords = null;
3077            if (firstCheckIn == false) {
3078                boolean succeeded = false;
3079                try {
3080                    failedRecords = clusterCheckIn(conn);
3081                    commitConnection(conn);
3082                    succeeded = true;
3083                } catch (JobPersistenceException e) {
3084                    rollbackConnection(conn);
3085                    throw e;
3086                } finally {
3087                    // Only cleanup the connection if we failed and are bailing
3088
// as we will otherwise continue to use it.
3089
if (succeeded == false) {
3090                        cleanupConnection(conn);
3091                    }
3092                }
3093            }
3094            
3095            if (firstCheckIn || (failedRecords.size() > 0)) {
3096                getLockHandler().obtainLock(conn, LOCK_STATE_ACCESS);
3097                transStateOwner = true;
3098    
3099                // Now that we own the lock, make sure we still have work to do.
3100
// The first time through, we also need to make sure we update/create our state record
3101
failedRecords = (firstCheckIn) ? clusterCheckIn(conn) : findFailedInstances(conn);
3102    
3103                if (failedRecords.size() > 0) {
3104                    getLockHandler().obtainLock(conn, LOCK_TRIGGER_ACCESS);
3105                    //getLockHandler().obtainLock(conn, LOCK_JOB_ACCESS);
3106
transOwner = true;
3107    
3108                    clusterRecover(conn, failedRecords);
3109                    recovered = true;
3110                }
3111            }
3112            
3113            commitConnection(conn);
3114        } catch (JobPersistenceException e) {
3115            rollbackConnection(conn);
3116            throw e;
3117        } finally {
3118            try {
3119                releaseLock(conn, LOCK_TRIGGER_ACCESS, transOwner);
3120            } finally {
3121                try {
3122                    releaseLock(conn, LOCK_STATE_ACCESS, transStateOwner);
3123                } finally {
3124                    cleanupConnection(conn);
3125                }
3126            }
3127        }
3128
3129        firstCheckIn = false;
3130
3131        return recovered;
3132    }
3133
3134    /**
3135     * Get a list of all scheduler instances in the cluster that may have failed.
3136     * This includes this scheduler if it is checking in for the first time.
3137     */

3138    protected List JavaDoc findFailedInstances(Connection JavaDoc conn)
3139        throws JobPersistenceException {
3140        try {
3141            List JavaDoc failedInstances = new LinkedList JavaDoc();
3142            boolean foundThisScheduler = false;
3143            long timeNow = System.currentTimeMillis();
3144            
3145            List JavaDoc states = getDelegate().selectSchedulerStateRecords(conn, null);
3146
3147            for (Iterator JavaDoc itr = states.iterator(); itr.hasNext();) {
3148                SchedulerStateRecord rec = (SchedulerStateRecord) itr.next();
3149        
3150                // find own record...
3151
if (rec.getSchedulerInstanceId().equals(getInstanceId())) {
3152                    foundThisScheduler = true;
3153                    if (firstCheckIn) {
3154                        failedInstances.add(rec);
3155                    }
3156                } else {
3157                    // find failed instances...
3158
if (calcFailedIfAfter(rec) < timeNow) {
3159                        failedInstances.add(rec);
3160                    }
3161                }
3162            }
3163            
3164            // The first time through, also check for orphaned fired triggers.
3165
if (firstCheckIn) {
3166                failedInstances.addAll(findOrphanedFailedInstances(conn, states));
3167            }
3168            
3169            // If not the first time but we didn't find our own instance, then
3170
// Someone must have done recovery for us.
3171
if ((foundThisScheduler == false) && (firstCheckIn == false)) {
3172                // TODO: revisit when handle self-failed-out implied (see TODO in clusterCheckIn() below)
3173
getLog().warn(
3174                    "This scheduler instance (" + getInstanceId() + ") is still " +
3175                    "active but was recovered by another instance in the cluster. " +
3176                    "This may cause inconsistent behavior.");
3177            }
3178            
3179            return failedInstances;
3180        } catch (Exception JavaDoc e) {
3181            lastCheckin = System.currentTimeMillis();
3182            throw new JobPersistenceException("Failure identifying failed instances when checking-in: "
3183                    + e.getMessage(), e);
3184        }
3185    }
3186    
3187    /**
3188     * Create dummy <code>SchedulerStateRecord</code> objects for fired triggers
3189     * that have no scheduler state record. Checkin timestamp and interval are
3190     * left as zero on these dummy <code>SchedulerStateRecord</code> objects.
3191     *
3192     * @param schedulerStateRecords List of all current <code>SchedulerStateRecords</code>
3193     */

3194    private List JavaDoc findOrphanedFailedInstances(
3195            Connection JavaDoc conn,
3196            List JavaDoc schedulerStateRecords)
3197        throws SQLException JavaDoc, NoSuchDelegateException {
3198        List JavaDoc orphanedInstances = new ArrayList JavaDoc();
3199        
3200        Set JavaDoc allFiredTriggerInstanceNames = getDelegate().selectFiredTriggerInstanceNames(conn);
3201        if (allFiredTriggerInstanceNames.isEmpty() == false) {
3202            for (Iterator JavaDoc schedulerStateIter = schedulerStateRecords.iterator();
3203                 schedulerStateIter.hasNext();) {
3204                SchedulerStateRecord rec = (SchedulerStateRecord)schedulerStateIter.next();
3205                
3206                allFiredTriggerInstanceNames.remove(rec.getSchedulerInstanceId());
3207            }
3208            
3209            for (Iterator JavaDoc orphanIter = allFiredTriggerInstanceNames.iterator();
3210                 orphanIter.hasNext();) {
3211                
3212                SchedulerStateRecord orphanedInstance = new SchedulerStateRecord();
3213                orphanedInstance.setSchedulerInstanceId((String JavaDoc)orphanIter.next());
3214                
3215                orphanedInstances.add(orphanedInstance);
3216                
3217                getLog().warn(
3218                    "Found orphaned fired triggers for instance: " + orphanedInstance.getSchedulerInstanceId());
3219            }
3220        }
3221        
3222        return orphanedInstances;
3223    }
3224    
3225    protected long calcFailedIfAfter(SchedulerStateRecord rec) {
3226        return rec.getCheckinTimestamp() +
3227            Math.max(rec.getCheckinInterval(),
3228                    (System.currentTimeMillis() - lastCheckin)) +
3229            7500L;
3230    }
3231    
3232    protected List JavaDoc clusterCheckIn(Connection JavaDoc conn)
3233        throws JobPersistenceException {
3234
3235        List JavaDoc failedInstances = findFailedInstances(conn);
3236        
3237        try {
3238            // TODO: handle self-failed-out
3239

3240            // check in...
3241
lastCheckin = System.currentTimeMillis();
3242            if(getDelegate().updateSchedulerState(conn, getInstanceId(), lastCheckin) == 0) {
3243                getDelegate().insertSchedulerState(conn, getInstanceId(),
3244                        lastCheckin, getClusterCheckinInterval());
3245            }
3246            
3247        } catch (Exception JavaDoc e) {
3248            throw new JobPersistenceException("Failure updating scheduler state when checking-in: "
3249                    + e.getMessage(), e);
3250        }
3251
3252        return failedInstances;
3253    }
3254
3255    protected void clusterRecover(Connection JavaDoc conn, List JavaDoc failedInstances)
3256        throws JobPersistenceException {
3257
3258        if (failedInstances.size() > 0) {
3259
3260            long recoverIds = System.currentTimeMillis();
3261
3262            logWarnIfNonZero(failedInstances.size(),
3263                    "ClusterManager: detected " + failedInstances.size()
3264                            + " failed or restarted instances.");
3265            try {
3266                Iterator JavaDoc itr = failedInstances.iterator();
3267                while (itr.hasNext()) {
3268                    SchedulerStateRecord rec = (SchedulerStateRecord) itr
3269                            .next();
3270
3271                    getLog().info(
3272                            "ClusterManager: Scanning for instance \""
3273                                    + rec.getSchedulerInstanceId()
3274                                    + "\"'s failed in-progress jobs.");
3275
3276                    List JavaDoc firedTriggerRecs = getDelegate()
3277                            .selectInstancesFiredTriggerRecords(conn,
3278                                    rec.getSchedulerInstanceId());
3279
3280                    int acquiredCount = 0;
3281                    int recoveredCount = 0;
3282                    int otherCount = 0;
3283
3284                    Set JavaDoc triggerKeys = new HashSet JavaDoc();
3285                    
3286                    Iterator JavaDoc ftItr = firedTriggerRecs.iterator();
3287                    while (ftItr.hasNext()) {
3288                        FiredTriggerRecord ftRec = (FiredTriggerRecord) ftItr
3289                                .next();
3290
3291                        Key tKey = ftRec.getTriggerKey();
3292                        Key jKey = ftRec.getJobKey();
3293
3294                        triggerKeys.add(tKey);
3295                        
3296                        // release blocked triggers..
3297
if (ftRec.getFireInstanceState().equals(STATE_BLOCKED)) {
3298                            getDelegate()
3299                                    .updateTriggerStatesForJobFromOtherState(
3300                                            conn, jKey.getName(),
3301                                            jKey.getGroup(), STATE_WAITING,
3302                                            STATE_BLOCKED);
3303                        } else if (ftRec.getFireInstanceState().equals(STATE_PAUSED_BLOCKED)) {
3304                            getDelegate()
3305                                    .updateTriggerStatesForJobFromOtherState(
3306                                            conn, jKey.getName(),
3307                                            jKey.getGroup(), STATE_PAUSED,
3308                                            STATE_PAUSED_BLOCKED);
3309                        }
3310
3311                        // release acquired triggers..
3312
if (ftRec.getFireInstanceState().equals(STATE_ACQUIRED)) {
3313                            getDelegate().updateTriggerStateFromOtherState(
3314                                    conn, tKey.getName(), tKey.getGroup(),
3315                                    STATE_WAITING, STATE_ACQUIRED);
3316                            acquiredCount++;
3317                        } else if (ftRec.isJobRequestsRecovery()) {
3318                            // handle jobs marked for recovery that were not fully
3319
// executed..
3320
if (jobExists(conn, jKey.getName(), jKey.getGroup())) {
3321                                SimpleTrigger rcvryTrig = new SimpleTrigger(
3322                                        "recover_"
3323                                                + rec.getSchedulerInstanceId()
3324                                                + "_"
3325                                                + String.valueOf(recoverIds++),
3326                                        Scheduler.DEFAULT_RECOVERY_GROUP,
3327                                        new Date JavaDoc(ftRec.getFireTimestamp()));
3328                                rcvryTrig.setJobName(jKey.getName());
3329                                rcvryTrig.setJobGroup(jKey.getGroup());
3330                                rcvryTrig.setMisfireInstruction(SimpleTrigger.MISFIRE_INSTRUCTION_FIRE_NOW);
3331                                rcvryTrig.setPriority(ftRec.getPriority());
3332                                JobDataMap jd = getDelegate().selectTriggerJobDataMap(conn, tKey.getName(), tKey.getGroup());
3333                                jd.put(Scheduler.FAILED_JOB_ORIGINAL_TRIGGER_NAME, tKey.getName());
3334                                jd.put(Scheduler.FAILED_JOB_ORIGINAL_TRIGGER_GROUP, tKey.getGroup());
3335                                jd.put(Scheduler.FAILED_JOB_ORIGINAL_TRIGGER_FIRETIME_IN_MILLISECONDS, String.valueOf(ftRec.getFireTimestamp()));
3336                                rcvryTrig.setJobDataMap(jd);
3337
3338                                rcvryTrig.computeFirstFireTime(null);
3339                                storeTrigger(conn, null, rcvryTrig, null, false,
3340                                        STATE_WAITING, false, true);
3341                                recoveredCount++;
3342                            } else {
3343                                getLog()
3344                                        .warn(
3345                                                "ClusterManager: failed job '"
3346                                                        + jKey
3347                                                        + "' no longer exists, cannot schedule recovery.");
3348                                otherCount++;
3349                            }
3350                        } else {
3351                            otherCount++;
3352                        }
3353
3354                        // free up stateful job's triggers
3355
if (ftRec.isJobIsStateful()) {
3356                            getDelegate()
3357                                .updateTriggerStatesForJobFromOtherState(
3358                                        conn, jKey.getName(),
3359                                        jKey.getGroup(), STATE_WAITING,
3360                                        STATE_BLOCKED);
3361                            getDelegate()
3362                                .updateTriggerStatesForJobFromOtherState(
3363                                        conn, jKey.getName(),
3364                                        jKey.getGroup(), STATE_PAUSED,
3365                                        STATE_PAUSED_BLOCKED);
3366                        }
3367                    }
3368
3369                    getDelegate().deleteFiredTriggers(conn,
3370                            rec.getSchedulerInstanceId());
3371
3372                    // Check if any of the fired triggers we just deleted were the last fired trigger
3373
// records of a COMPLETE trigger.
3374
int completeCount = 0;
3375                    for (Iterator JavaDoc triggerKeyIter = triggerKeys.iterator(); triggerKeyIter.hasNext();) {
3376                        Key triggerKey = (Key)triggerKeyIter.next();
3377                        
3378                        if (getDelegate().selectTriggerState(conn, triggerKey.getName(), triggerKey.getGroup()).
3379                                equals(STATE_COMPLETE)) {
3380                            List JavaDoc firedTriggers =
3381                                getDelegate().selectFiredTriggerRecords(conn, triggerKey.getName(), triggerKey.getGroup());
3382                            if (firedTriggers.isEmpty()) {
3383                                SchedulingContext schedulingContext = new SchedulingContext();
3384                                schedulingContext.setInstanceId(instanceId);
3385                                
3386                                if (removeTrigger(conn, schedulingContext, triggerKey.getName(), triggerKey.getGroup())) {
3387                                    completeCount++;
3388                                }
3389                            }
3390                        }
3391                    }
3392                    
3393                    logWarnIfNonZero(acquiredCount,
3394                            "ClusterManager: ......Freed " + acquiredCount
3395                                    + " acquired trigger(s).");
3396                    logWarnIfNonZero(completeCount,
3397                            "ClusterManager: ......Deleted " + completeCount
3398                                    + " complete triggers(s).");
3399                    logWarnIfNonZero(recoveredCount,
3400                            "ClusterManager: ......Scheduled " + recoveredCount
3401                                    + " recoverable job(s) for recovery.");
3402                    logWarnIfNonZero(otherCount,
3403                            "ClusterManager: ......Cleaned-up " + otherCount
3404                                    + " other failed job(s).");
3405
3406                    if (rec.getSchedulerInstanceId().equals(getInstanceId()) == false) {
3407                        getDelegate().deleteSchedulerState(conn,
3408                                rec.getSchedulerInstanceId());
3409                    }
3410                }
3411            } catch (Exception JavaDoc e) {
3412                throw new JobPersistenceException("Failure recovering jobs: "
3413                        + e.getMessage(), e);
3414            }
3415        }
3416    }
3417
3418    protected void logWarnIfNonZero(int val, String JavaDoc warning) {
3419        if (val > 0) {
3420            getLog().info(warning);
3421        } else {
3422            getLog().debug(warning);
3423        }
3424    }
3425
3426    /**
3427     * <p>
3428     * Cleanup the given database connection. This means restoring
3429     * any modified auto commit or transaction isolation connection
3430     * attributes, and then closing the underlying connection.
3431     * </p>
3432     *
3433     * <p>
3434     * This is separate from closeConnection() because the Spring
3435     * integration relies on being able to overload closeConnection() and
3436     * expects the same connection back that it originally returned
3437     * from the datasource.
3438     * </p>
3439     *
3440     * @see #closeConnection(Connection)
3441     */

3442    protected void cleanupConnection(Connection JavaDoc conn) {
3443        if (conn != null) {
3444            if (conn instanceof Proxy JavaDoc) {
3445                Proxy JavaDoc connProxy = (Proxy JavaDoc)conn;
3446                
3447                InvocationHandler JavaDoc invocationHandler =
3448                    Proxy.getInvocationHandler(connProxy);
3449                if (invocationHandler instanceof AttributeRestoringConnectionInvocationHandler) {
3450                    AttributeRestoringConnectionInvocationHandler connHandler =
3451                        (AttributeRestoringConnectionInvocationHandler)invocationHandler;
3452                        
3453                    connHandler.restoreOriginalAtributes();
3454                    closeConnection(connHandler.getWrappedConnection());
3455                    return;
3456                }
3457            }
3458            
3459            // Wan't a Proxy, or was a Proxy, but wasn't ours.
3460
closeConnection(conn);
3461        }
3462    }
3463    
3464    
3465    /**
3466     * Closes the supplied <code>Connection</code>.
3467     * <p>
3468     * Ignores a <code>null Connection</code>.
3469     * Any exception thrown trying to close the <code>Connection</code> is
3470     * logged and ignored.
3471     * </p>
3472     *
3473     * @param conn The <code>Connection</code> to close (Optional).
3474     */

3475    protected void closeConnection(Connection JavaDoc conn) {
3476        if (conn != null) {
3477            try {
3478                conn.close();
3479            } catch (SQLException JavaDoc e) {
3480                getLog().error("Failed to close Connection", e);
3481            } catch (Throwable JavaDoc e) {
3482                getLog().error(
3483                    "Unexpected exception closing Connection." +
3484                    " This is often due to a Connection being returned after or during shutdown.", e);
3485            }
3486        }
3487    }
3488
3489    /**
3490     * Rollback the supplied connection.
3491     *
3492     * <p>
3493     * Logs any SQLException it gets trying to rollback, but will not propogate
3494     * the exception lest it mask the exception that caused the caller to
3495     * need to rollback in the first place.
3496     * </p>
3497     *
3498     * @param conn (Optional)
3499     */

3500    protected void rollbackConnection(Connection JavaDoc conn) {
3501        if (conn != null) {
3502            try {
3503                conn.rollback();
3504            } catch (SQLException JavaDoc e) {
3505                getLog().error(
3506                    "Couldn't rollback jdbc connection. "+e.getMessage(), e);
3507            }
3508        }
3509    }
3510    
3511    /**
3512     * Commit the supplied connection
3513     *
3514     * @param conn (Optional)
3515     * @throws JobPersistenceException thrown if a SQLException occurs when the
3516     * connection is committed
3517     */

3518    protected void commitConnection(Connection JavaDoc conn)
3519        throws JobPersistenceException {
3520
3521        if (conn != null) {
3522            try {
3523                conn.commit();
3524            } catch (SQLException JavaDoc e) {
3525                throw new JobPersistenceException(
3526                    "Couldn't commit jdbc connection. "+e.getMessage(), e);
3527            }
3528        }
3529    }
3530    
3531    /**
3532     * Implement this interface to provide the code to execute within
3533     * the a transaction template. If no return value is required, execute
3534     * should just return null.
3535     *
3536     * @see JobStoreSupport#executeInNonManagedTXLock(String, TransactionCallback)
3537     * @see JobStoreSupport#executeInLock(String, TransactionCallback)
3538     * @see JobStoreSupport#executeWithoutLock(TransactionCallback)
3539     */

3540    protected interface TransactionCallback {
3541        Object JavaDoc execute(Connection JavaDoc conn) throws JobPersistenceException;
3542    }
3543
3544    /**
3545     * Implement this interface to provide the code to execute within
3546     * the a transaction template that has no return value.
3547     *
3548     * @see JobStoreSupport#executeInNonManagedTXLock(String, TransactionCallback)
3549     */

3550    protected interface VoidTransactionCallback {
3551        void execute(Connection JavaDoc conn) throws JobPersistenceException;
3552    }
3553
3554    /**
3555     * Execute the given callback in a transaction. Depending on the JobStore,
3556     * the surrounding transaction may be assumed to be already present
3557     * (managed).
3558     *
3559     * <p>
3560     * This method just forwards to executeInLock() with a null lockName.
3561     * </p>
3562     *
3563     * @see #executeInLock(String, TransactionCallback)
3564     */

3565    public Object JavaDoc executeWithoutLock(
3566        TransactionCallback txCallback) throws JobPersistenceException {
3567        return executeInLock(null, txCallback);
3568    }
3569
3570    /**
3571     * Execute the given callback having aquired the given lock.
3572     * Depending on the JobStore, the surrounding transaction may be
3573     * assumed to be already present (managed). This version is just a
3574     * handy wrapper around executeInLock that doesn't require a return
3575     * value.
3576     *
3577     * @param lockName The name of the lock to aquire, for example
3578     * "TRIGGER_ACCESS". If null, then no lock is aquired, but the
3579     * lockCallback is still executed in a transaction.
3580     *
3581     * @see #executeInLock(String, TransactionCallback)
3582     */

3583    protected void executeInLock(
3584            final String JavaDoc lockName,
3585            final VoidTransactionCallback txCallback) throws JobPersistenceException {
3586        executeInLock(
3587            lockName,
3588            new TransactionCallback() {
3589                public Object JavaDoc execute(Connection JavaDoc conn) throws JobPersistenceException {
3590                    txCallback.execute(conn);
3591                    return null;
3592                }
3593            });
3594    }
3595    
3596    /**
3597     * Execute the given callback having aquired the given lock.
3598     * Depending on the JobStore, the surrounding transaction may be
3599     * assumed to be already present (managed).
3600     *
3601     * @param lockName The name of the lock to aquire, for example
3602     * "TRIGGER_ACCESS". If null, then no lock is aquired, but the
3603     * lockCallback is still executed in a transaction.
3604     */

3605    protected abstract Object JavaDoc executeInLock(
3606        String JavaDoc lockName,
3607        TransactionCallback txCallback) throws JobPersistenceException;
3608    
3609    /**
3610     * Execute the given callback having optionally aquired the given lock.
3611     * This uses the non-managed transaction connection. This version is just a
3612     * handy wrapper around executeInNonManagedTXLock that doesn't require a return
3613     * value.
3614     *
3615     * @param lockName The name of the lock to aquire, for example
3616     * "TRIGGER_ACCESS". If null, then no lock is aquired, but the
3617     * lockCallback is still executed in a non-managed transaction.
3618     *
3619     * @see #executeInNonManagedTXLock(String, TransactionCallback)
3620     */

3621    protected void executeInNonManagedTXLock(
3622            final String JavaDoc lockName,
3623            final VoidTransactionCallback txCallback) throws JobPersistenceException {
3624        executeInNonManagedTXLock(
3625            lockName,
3626            new TransactionCallback() {
3627                public Object JavaDoc execute(Connection JavaDoc conn) throws JobPersistenceException {
3628                    txCallback.execute(conn);
3629                    return null;
3630                }
3631            });
3632    }
3633    
3634    /**
3635     * Execute the given callback having optionally aquired the given lock.
3636     * This uses the non-managed transaction connection.
3637     *
3638     * @param lockName The name of the lock to aquire, for example
3639     * "TRIGGER_ACCESS". If null, then no lock is aquired, but the
3640     * lockCallback is still executed in a non-managed transaction.
3641     */

3642    protected Object JavaDoc executeInNonManagedTXLock(
3643            String JavaDoc lockName,
3644            TransactionCallback txCallback) throws JobPersistenceException {
3645        boolean transOwner = false;
3646        Connection JavaDoc conn = null;
3647        try {
3648            if (lockName != null) {
3649                // If we aren't using db locks, then delay getting DB connection
3650
// until after aquiring the lock since it isn't needed.
3651
if (getLockHandler().requiresConnection()) {
3652                    conn = getNonManagedTXConnection();
3653                }
3654                
3655                transOwner = getLockHandler().obtainLock(conn, lockName);
3656            }
3657            
3658            if (conn == null) {
3659                conn = getNonManagedTXConnection();
3660            }
3661            
3662            Object JavaDoc result = txCallback.execute(conn);
3663            commitConnection(conn);
3664            return result;
3665        } catch (JobPersistenceException e) {
3666            rollbackConnection(conn);
3667            throw e;
3668        } catch (RuntimeException JavaDoc e) {
3669            rollbackConnection(conn);
3670            throw new JobPersistenceException("Unexpected runtime exception: "
3671                    + e.getMessage(), e);
3672        } finally {
3673            try {
3674                releaseLock(conn, lockName, transOwner);
3675            } finally {
3676                cleanupConnection(conn);
3677            }
3678        }
3679    }
3680    
3681    /////////////////////////////////////////////////////////////////////////////
3682
//
3683
// ClusterManager Thread
3684
//
3685
/////////////////////////////////////////////////////////////////////////////
3686

3687    class ClusterManager extends Thread JavaDoc {
3688
3689        private boolean shutdown = false;
3690
3691        private int numFails = 0;
3692        
3693        ClusterManager() {
3694            this.setPriority(Thread.NORM_PRIORITY + 2);
3695            this.setName("QuartzScheduler_" + instanceName + "-" + instanceId + "_ClusterManager");
3696            this.setDaemon(getMakeThreadsDaemons());
3697        }
3698
3699        public void initialize() {
3700            this.manage();
3701            this.start();
3702        }
3703
3704        public void shutdown() {
3705            shutdown = true;
3706            this.interrupt();
3707        }
3708
3709        private boolean manage() {
3710            boolean res = false;
3711            try {
3712
3713                res = doCheckin();
3714
3715                numFails = 0;
3716                getLog().debug("ClusterManager: Check-in complete.");
3717            } catch (Exception JavaDoc e) {
3718                if(numFails % 4 == 0) {
3719                    getLog().error(
3720                        "ClusterManager: Error managing cluster: "
3721                                + e.getMessage(), e);
3722                }
3723                numFails++;
3724            }
3725            return res;
3726        }
3727
3728        public void run() {
3729            while (!shutdown) {
3730
3731                if (!shutdown) {
3732                    long timeToSleep = getClusterCheckinInterval();
3733                    long transpiredTime = (System.currentTimeMillis() - lastCheckin);
3734                    timeToSleep = timeToSleep - transpiredTime;
3735                    if (timeToSleep <= 0) {
3736                        timeToSleep = 100L;
3737                    }
3738
3739                    if(numFails > 0) {
3740                        timeToSleep = Math.max(getDbRetryInterval(), timeToSleep);
3741                    }
3742                    
3743                    try {
3744                        Thread.sleep(timeToSleep);
3745                    } catch (Exception JavaDoc ignore) {
3746                    }
3747                }
3748
3749                if (!shutdown && this.manage()) {
3750                    signalSchedulingChange();
3751                }
3752
3753            }//while !shutdown
3754
}
3755    }
3756
3757    /////////////////////////////////////////////////////////////////////////////
3758
//
3759
// MisfireHandler Thread
3760
//
3761
/////////////////////////////////////////////////////////////////////////////
3762

3763    class MisfireHandler extends Thread JavaDoc {
3764
3765        private boolean shutdown = false;
3766
3767        private int numFails = 0;
3768        
3769
3770        MisfireHandler() {
3771            this.setName("QuartzScheduler_" + instanceName + "-" + instanceId + "_MisfireHandler");
3772            this.setDaemon(getMakeThreadsDaemons());
3773        }
3774
3775        public void initialize() {
3776            //this.manage();
3777
this.start();
3778        }
3779
3780        public void shutdown() {
3781            shutdown = true;
3782            this.interrupt();
3783        }
3784
3785        private RecoverMisfiredJobsResult manage() {
3786            try {
3787                getLog().debug("MisfireHandler: scanning for misfires...");
3788
3789                RecoverMisfiredJobsResult res = doRecoverMisfires();
3790                numFails = 0;
3791                return res;
3792            } catch (Exception JavaDoc e) {
3793                if(numFails % 4 == 0) {
3794                    getLog().error(
3795                        "MisfireHandler: Error handling misfires: "
3796                                + e.getMessage(), e);
3797                }
3798                numFails++;
3799            }
3800            return RecoverMisfiredJobsResult.NO_OP;
3801        }
3802
3803        public void run() {
3804            
3805            while (!shutdown) {
3806
3807                long sTime = System.currentTimeMillis();
3808
3809                RecoverMisfiredJobsResult recoverMisfiredJobsResult = manage();
3810
3811                if (recoverMisfiredJobsResult.getProcessedMisfiredTriggerCount() > 0) {
3812                    signalSchedulingChange();
3813                }
3814
3815                if (!shutdown) {
3816                    long timeToSleep = 50l; // At least a short pause to help balance threads
3817
if (!recoverMisfiredJobsResult.hasMoreMisfiredTriggers()) {
3818                        timeToSleep = getMisfireThreshold() - (System.currentTimeMillis() - sTime);
3819                        if (timeToSleep <= 0) {
3820                            timeToSleep = 50l;
3821                        }
3822
3823                        if(numFails > 0) {
3824                            timeToSleep = Math.max(getDbRetryInterval(), timeToSleep);
3825                        }
3826                    }
3827                    
3828                    try {
3829                        Thread.sleep(timeToSleep);
3830                    } catch (Exception JavaDoc ignore) {
3831                    }
3832                }//while !shutdown
3833
}
3834        }
3835    }
3836}
3837
3838// EOF
3839
Popular Tags