KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > jcorporate > expresso > core > utility > JobHandler


1 /* ====================================================================
2  * The Jcorporate Apache Style Software License, Version 1.2 05-07-2002
3  *
4  * Copyright (c) 1995-2002 Jcorporate Ltd. All rights reserved.
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions
8  * are met:
9  *
10  * 1. Redistributions of source code must retain the above copyright
11  * notice, this list of conditions and the following disclaimer.
12  *
13  * 2. Redistributions in binary form must reproduce the above copyright
14  * notice, this list of conditions and the following disclaimer in
15  * the documentation and/or other materials provided with the
16  * distribution.
17  *
18  * 3. The end-user documentation included with the redistribution,
19  * if any, must include the following acknowledgment:
20  * "This product includes software developed by Jcorporate Ltd.
21  * (http://www.jcorporate.com/)."
22  * Alternately, this acknowledgment may appear in the software itself,
23  * if and wherever such third-party acknowledgments normally appear.
24  *
25  * 4. "Jcorporate" and product names such as "Expresso" must
26  * not be used to endorse or promote products derived from this
27  * software without prior written permission. For written permission,
28  * please contact info@jcorporate.com.
29  *
30  * 5. Products derived from this software may not be called "Expresso",
31  * or other Jcorporate product names; nor may "Expresso" or other
32  * Jcorporate product names appear in their name, without prior
33  * written permission of Jcorporate Ltd.
34  *
35  * 6. No product derived from this software may compete in the same
36  * market space, i.e. framework, without prior written permission
37  * of Jcorporate Ltd. For written permission, please contact
38  * partners@jcorporate.com.
39  *
40  * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
41  * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
42  * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
43  * DISCLAIMED. IN NO EVENT SHALL JCORPORATE LTD OR ITS CONTRIBUTORS
44  * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
45  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
46  * TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
47  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
48  * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
49  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
50  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
51  * SUCH DAMAGE.
52  * ====================================================================
53  *
54  * This software consists of voluntary contributions made by many
55  * individuals on behalf of the Jcorporate Ltd. Contributions back
56  * to the project(s) are encouraged when you make modifications.
57  * Please send them to support@jcorporate.com. For more information
58  * on Jcorporate Ltd. and its products, please see
59  * <http://www.jcorporate.com/>.
60  *
61  * Portions of this software are based upon other open source
62  * products and are subject to their respective licenses.
63  */

64
65 package com.jcorporate.expresso.core.utility;
66
67 import com.jcorporate.expresso.core.db.DBConnection;
68 import com.jcorporate.expresso.core.db.DBConnectionPool;
69 import com.jcorporate.expresso.core.db.DBException;
70 import com.jcorporate.expresso.core.dbobj.NextNumber;
71 import com.jcorporate.expresso.core.dbobj.SecuredDBObject;
72 import com.jcorporate.expresso.core.job.Job;
73 import com.jcorporate.expresso.core.job.ServerException;
74 import com.jcorporate.expresso.core.misc.ConfigManager;
75 import com.jcorporate.expresso.core.misc.DateTime;
76 import com.jcorporate.expresso.core.misc.StringUtil;
77 import com.jcorporate.expresso.core.registry.ExpressoThread;
78 import com.jcorporate.expresso.core.registry.MutableRequestRegistry;
79 import com.jcorporate.expresso.core.security.SuperUser;
80 import com.jcorporate.expresso.kernel.LogManager;
81 import com.jcorporate.expresso.kernel.util.FastStringBuffer;
82 import com.jcorporate.expresso.services.crontab.Crontab;
83 import com.jcorporate.expresso.services.crontab.CrontabEntry;
84 import com.jcorporate.expresso.services.crontab.CrontabListenerI;
85 import com.jcorporate.expresso.services.dbobj.JobHandlerControl;
86 import com.jcorporate.expresso.services.dbobj.JobHandlerRegistry;
87 import com.jcorporate.expresso.services.dbobj.JobQueue;
88 import com.jcorporate.expresso.services.dbobj.Setup;
89 import org.apache.log4j.Logger;
90
91 import java.io.BufferedReader JavaDoc;
92 import java.io.File JavaDoc;
93 import java.io.IOException JavaDoc;
94 import java.io.InputStreamReader JavaDoc;
95 import java.util.Enumeration JavaDoc;
96 import java.util.Hashtable JavaDoc;
97 import java.util.Iterator JavaDoc;
98 import java.util.StringTokenizer JavaDoc;
99 import java.util.Vector JavaDoc;
100
101
102 /**
103  * The JobHandler is an object which runs constantly on the server side,
104  * looking periodically in a job queue for work to do. If it finds work to do,
105  * it spawns a new server object as specified in the queue. It may either wait
106  * for that object to complete or spawn it as a seperate thread, depending on
107  * setup options for the object being spawned
108  *
109  * @author Michael Nash
110  */

111 public class JobHandler extends ExpressoThread {
112     /**
113      * The list of JobHandler servers
114      */

115     protected static Vector JavaDoc serverList = new Vector JavaDoc(3);
116
117     /**
118      * The log4j Logger
119      */

120     protected static final Logger log = Logger.getLogger(JobHandler.class);
121
122     /**
123      * Job Handler Register
124      */

125     private static JobHandlerRegistry m_jobHandlerRegistry = null;
126
127     /**
128      * Job Handler Control instance
129      */

130     private static JobHandlerControl m_jobHandlerControl = null;
131
132     /**
133      * The crontab for this job handler
134      */

135     protected Crontab m_cronMgr = null;
136
137     /**
138      * Current handler status
139      */

140     protected String JavaDoc m_jobHandlerStatus = "unknown";
141
142     /**
143      * Status flag for if the server is running
144      */

145     protected boolean running;
146
147     /**
148      * The server id
149      */

150     protected long m_serverID = -1; /* ID number of this server instance */
151
152     /**
153      * Data Context
154      */

155     private String JavaDoc dbName = null;
156
157     /**
158      * Scan criteria string
159      */

160     private String JavaDoc m_qScanCriteria = null;
161
162     /**
163      * default interval and max numbers - read from setup once we connect to
164      * database
165      */

166     private int interval = 30;
167
168     /**
169      * Maximum number of simultaneous jobs
170      */

171     private int maxJobs = 3;
172
173     /**
174      * Constructor
175      *
176      * @param dbName The data context for this job handler to run in.
177      * @throws DBException If the database cannot be contacted
178      * @throws ServerException If another uncaught exception occurs
179      */

180     public JobHandler(String JavaDoc dbName) throws DBException, ServerException {
181         if (log.isDebugEnabled()) {
182             log.debug("Job Handler for context '" + dbName + "' instantiated.");
183         }
184
185         setDataContext(dbName);
186
187         /* Read how often we are supposed to check the queue */
188         String JavaDoc intervalString = Setup.getValueRequired(getDataContext(),
189                 "TimerInterval");
190
191         if (!intervalString.equals("")) {
192             try {
193                 interval = new Integer JavaDoc(intervalString).intValue();
194             } catch (NumberFormatException JavaDoc ne) {
195                 throw new ServerException("Cannot convert " + intervalString +
196                         " to a number");
197             }
198         }
199
200         /* See how many jobs we are able to run at once */
201         String JavaDoc maxJobsString = Setup.getValueRequired(getDataContext(),
202                 "MaxJobs");
203
204         if (!maxJobsString.equals("")) {
205             try {
206                 maxJobs = new Integer JavaDoc(maxJobsString).intValue();
207             } catch (NumberFormatException JavaDoc ne) {
208                 throw new ServerException("Cannot convert " + maxJobsString +
209                         " to a number");
210             }
211         }
212
213         /* Register Job Handler */
214         registerJobHandler();
215
216         // init cron mgr
217
synchronized (JobHandler.class) {
218             m_cronMgr = new Crontab();
219             m_jobHandlerControl = new JobHandlerControl();
220             m_jobHandlerControl.setDataContext(dbName);
221         }
222     } /* JobHandler(String) */
223
224     /**
225      * Sets the Job Handler's data context
226      *
227      * @param newDBName The new data context name
228      */

229     public void setDataContext(String JavaDoc newDBName) {
230         if (StringUtil.notNull(newDBName).equals("")) {
231             log.error("Must specify db/context");
232             dbName = "default";
233         } else {
234             dbName = newDBName;
235         }
236     } /* setDBName(String) */
237
238     /**
239      * Sets the Job Handler Id
240      *
241      * @param num long integer.
242      */

243     public void setID(long num) {
244         m_serverID = num;
245     }
246
247     /**
248      * Retrieve the server id
249      *
250      * @return The Job Handler Server ID Number
251      */

252     public long getID() {
253         return m_serverID;
254     }
255
256
257     /**
258      * Retrieve the crontab for this job handler. Excessive use of this function
259      * may not be threadsafe. Use only for necessary management of the cron
260      * handler and sychronize all modification acccess against the instance.
261      *
262      * @return Crontab instance.
263      */

264     public Crontab getCronManager() {
265         return this.m_cronMgr;
266     }
267
268     /**
269      * Return a list of current Job objects
270      *
271      * @return Vector A list of the currently executing jobs
272      * @throws ServerException If the list cannot be obtained
273      */

274     public Vector JavaDoc getServerList() throws ServerException {
275         return serverList;
276     } /* getServerList() */
277
278     /**
279      * Main method so that JobHandler can be launched from a command line
280      *
281      * @param args Command line arguments to supply the information to connect
282      * to the database
283      */

284     public static void main(String JavaDoc[] args) {
285         System.out.println("JobHandler");
286
287         Hashtable JavaDoc commandArgs = new Hashtable JavaDoc(10);
288         String JavaDoc paramCode = null;
289         String JavaDoc paramValue = null;
290
291         for (int i = 0; i < args.length; i++) {
292             StringTokenizer JavaDoc stk = new StringTokenizer JavaDoc(args[i], "=");
293
294             if (!stk.hasMoreTokens()) {
295                 usage();
296             }
297
298             paramCode = stk.nextToken();
299
300             if (!stk.hasMoreTokens()) {
301                 usage();
302             }
303
304             paramValue = stk.nextToken();
305             commandArgs.put(paramCode, paramValue);
306         }
307         /* for each command-line argument */
308
309         try {
310             String JavaDoc logDir = (String JavaDoc) commandArgs.get("logDir");
311             String JavaDoc logConfig = (String JavaDoc) commandArgs.get("logConfig");
312
313             if (logConfig == null) {
314                 logConfig = (String JavaDoc) commandArgs.get("configDir") +
315                         "/expressoLogging.xml";
316             }
317
318             new LogManager(logConfig, logDir);
319
320             // set up ConfigManager first
321
ConfigManager.load(getConfigDir(commandArgs));
322
323             // set up ConfigManager first
324
ConfigManager.setWebAppDir(getAppDir(commandArgs));
325
326             //initialize the db pool
327
ConfigManager.dbInitialize();
328
329             //call the LoadLists method and load up the lists used by this application
330
String JavaDoc dbName = (String JavaDoc) commandArgs.get("db");
331
332             if (dbName == null) {
333                 dbName = "default";
334             }
335
336             resetQ(dbName);
337             log.info("Jobhandler running on database '" + dbName + "'");
338             log.info("JobHandler running...");
339
340             JobHandler ts = new JobHandler(dbName);
341             ts.run();
342         } catch (Exception JavaDoc de) {
343             de.printStackTrace(System.err);
344             log.error("Server Error", de);
345             System.exit(1);
346         }
347     } /* main(String) */
348
349     /**
350      * Resets any 'running' jobs on startup to available.
351      *
352      * @param dbName The data context to reset.
353      * @throws DBException upon error updating the appropriate fields
354      */

355     public static void resetQ(String JavaDoc dbName) throws DBException {
356         // reset JobQueue
357
JobQueue jql = new JobQueue(SecuredDBObject.SYSTEM_ACCOUNT);
358         jql.setDataContext(dbName);
359         jql.setField("StatusCode", JobQueue.JOB_STATUS_RUNNING);
360
361         JobQueue jq = null;
362
363         for (Iterator JavaDoc e = jql.searchAndRetrieveList().iterator(); e.hasNext();) {
364             jq = (JobQueue) e.next();
365             jq.setField("StatusCode", JobQueue.JOB_STATUS_AVAILABLE);
366             jq.setField("ServerID", 0);
367             jq.update();
368         }
369
370         // reset ControlQ
371
JobHandlerControl jhc = new JobHandlerControl(SecuredDBObject.SYSTEM_ACCOUNT);
372         jhc.setDataContext(dbName);
373         jhc.setField(JobHandlerControl.FLD_STATUS_CODE,
374                 JobHandlerControl.JOB_STATUS_RUNNING);
375
376         JobHandlerControl oneEntry = null;
377
378         for (Iterator JavaDoc e = jhc.searchAndRetrieveList().iterator(); e.hasNext();) {
379             oneEntry = (JobHandlerControl) e.next();
380             oneEntry.setField(JobHandlerControl.FLD_STATUS_CODE,
381                     JobHandlerControl.JOB_STATUS_AVAILABLE);
382             oneEntry.setField(JobHandlerControl.FLD_SERVERID, 0);
383             oneEntry.update();
384         }
385     }
386
387     /**
388      * Useful for determining current state from the outside. <b>Not thread
389      * safe!</b> Use it for debug purposes only, don't actually base decisions
390      * to start or stop the job handler based upon this.
391      *
392      * @return the current status code of the JobHandler.
393      */

394     public String JavaDoc getStatus() {
395         return m_jobHandlerStatus;
396     }
397
398     /**
399      * Register JobHandler in the DataBase.
400      */

401     public synchronized void registerJobHandler() {
402         synchronized (JobHandler.class) {
403             if (m_jobHandlerRegistry == null) {
404                 try {
405                     String JavaDoc osName = SystemInfo.getOSName();
406                     String JavaDoc hostName = SystemInfo.getHostName();
407
408                     if (log.isInfoEnabled()) {
409                         log.info("Registering Job Handler:");
410                         log.info("host: " + hostName + " serverID: " + getID() +
411                                 " OS: " + osName);
412                     }
413
414                     m_jobHandlerRegistry = new JobHandlerRegistry();
415                     m_jobHandlerRegistry.setDataContext(this.getDataContext());
416                     m_jobHandlerRegistry.setField(JobHandlerRegistry.FLD_OSNAME,
417                             osName);
418                     m_jobHandlerRegistry.setField(JobHandlerRegistry.FLD_HOSTNAME,
419                             hostName);
420
421                     boolean found = m_jobHandlerRegistry.find();
422                     String JavaDoc serverID = null;
423                     long uniqNum = -1;
424
425                     if (!found) {
426                         NextNumber myNext = NextNumber.getInstance();
427                         uniqNum = myNext.getNext(getDataContext(),
428                                 m_jobHandlerRegistry,
429                                 JobHandlerRegistry.FLD_SERVERID);
430                         setID(uniqNum);
431                         serverID = String.valueOf(getID());
432                         m_jobHandlerRegistry.setField(JobHandlerRegistry.FLD_SERVERID,
433                                 serverID);
434                         m_jobHandlerRegistry.add();
435                     } else {
436                         serverID = m_jobHandlerRegistry.getField(JobHandlerRegistry.FLD_SERVERID);
437                         uniqNum = Long.valueOf(serverID).longValue();
438                         setID(uniqNum);
439                     }
440
441                     if (getID() <= 0) {
442                         if (log.isInfoEnabled()) {
443                             log.error("Unable to allocate server ID");
444                         }
445                     } else {
446                         if (log.isInfoEnabled()) {
447                             log.info("Job Handler Server Instance ID:" +
448                                     uniqNum);
449                         }
450                     }
451
452                     setStatus("Enabled");
453                 } catch (DBException dbe) {
454                     log.error("Cannot registry Job Handler, reason: ");
455                     dbe.printStackTrace();
456                 }
457             }
458         }
459     }
460
461     /**
462      * Main thread process of the JobHandler process
463      */

464     public void run() {
465         super.run();
466
467         if (log.isDebugEnabled()) {
468             log.debug("Job Handler for '" + getDataContext() + "' started.");
469         }
470
471         this.running = true;
472
473         //Set the default statuses for this thread to superuser
474
//and the context specified.
475
new MutableRequestRegistry(dbName, SuperUser.SUPER_USER);
476
477         try {
478             resetCurrentJobs();
479
480             while (running) {
481                 try {
482                     updateJobHandlerStatus();
483                     checkQueue();
484                     checkControlQueue();
485                 } catch (ServerException ae) {
486                     // try {
487
log.error("Unable to check queue or run entry. " +
488                             "Entry retained for retry", ae);
489
490                     // Event myEvent = new Event(dbName, "SYSERROR",
491
// "Unable to check queue:" +
492
// ae.getMessage(), false);
493
log.error(ae);
494
495                     // } catch (DBException de2) {
496
// log.error(de2);
497
//}
498
}
499
500                 yield();
501                 sleep(interval * 1000);
502             } /* forever */
503         } catch (InterruptedException JavaDoc ie) {
504             log.info("JobHandler was interrupted."); // standard procedure during shutdown
505
} catch (Throwable JavaDoc t) {
506             log.error("Unable to start job handler correctly", t);
507         }
508     } /* run() */
509
510     /**
511      * mark thread for shutdown and interrupt
512      */

513     public void shutDown() {
514         m_cronMgr.removeAllCrontabEntries();
515         this.running = false;
516
517         if (log.isInfoEnabled()) {
518             log.info("JobHandler for context '" + getDataContext() +
519                     "' shutting down.");
520         }
521         interrupt();
522     }
523
524     /**
525      * Marks all current running jobs as "Available"
526      *
527      * @throws DBException upon data access error
528      */

529     protected synchronized void resetCurrentJobs() throws DBException {
530         /* Reset any current jobs */
531         JobQueue jql = new JobQueue(SecuredDBObject.SYSTEM_ACCOUNT);
532         jql.setDataContext(getDataContext());
533
534         try {
535             jql.setField(JobQueue.FLD_STATUS_CODE, JobQueue.JOB_STATUS_RUNNING);
536         } catch (DBException ex) {
537             //
538
//We've got this intermittant race condition that I can't seem
539
//to track down where the JobQueue Metadata isn't getting
540
//initialized before this thread attempts to set the field.
541
//So pending actually finding out where this race condition is
542
//taking place, we'll try a sleep for a second and then retry once.
543
//
544
try {
545                 sleep(1000);
546             } catch (InterruptedException JavaDoc ex1) {
547             }
548
549             jql.setField(JobQueue.FLD_STATUS_CODE, JobQueue.JOB_STATUS_RUNNING);
550         }
551
552         JobQueue jq = null;
553
554         for (Iterator JavaDoc j = jql.searchAndRetrieveList().iterator(); j.hasNext();) {
555             jq = (JobQueue) j.next();
556             jq.setField("StatusCode", JobQueue.JOB_STATUS_AVAILABLE);
557             jq.setField("ServerID", 0);
558             jq.update();
559         }
560     }
561
562     /**
563      * For startup command line parsing.
564      *
565      * @param commandArgs a hashtable parsed from the command line.
566      * @return a string that is the full path name to the application directory
567      * @throws IOException upon error
568      */

569     private static String JavaDoc getAppDir(Hashtable JavaDoc commandArgs)
570             throws IOException JavaDoc {
571         String JavaDoc webAppDir = StringUtil.notNull((String JavaDoc) commandArgs.get("webAppDir"));
572
573         if (webAppDir.equals("")) {
574             boolean noDir = true;
575             System.out.println("No webAppDir=xxx command line argument was found.");
576
577             while (noDir) {
578                 System.out.print("Please enter Expresso web application root directory:");
579
580                 BufferedReader JavaDoc br = new BufferedReader JavaDoc(new InputStreamReader JavaDoc(System.in));
581                 webAppDir = br.readLine();
582
583                 if (StringUtil.notNull(webAppDir).equals("")) {
584                     System.out.println("You must enter a directory name, not blank");
585
586                     continue;
587                 }
588
589                 File JavaDoc theDir = new File JavaDoc(webAppDir);
590
591                 if (!theDir.isDirectory()) {
592                     System.out.println(webAppDir +
593                             " is not a valid directory. ");
594
595                     continue;
596                 } else {
597                     noDir = false;
598                 }
599             }
600         }
601
602         return webAppDir;
603     } /* getWebAppDir(Hashtable) */
604
605     /**
606      * For startup command line parsing.
607      *
608      * @param commandArgs a hashtable that contains all the key/values parsed
609      * from the command line.
610      * @return the configuration directory
611      * @throws IOException upon error locating the config directory
612      */

613     private static String JavaDoc getConfigDir(Hashtable JavaDoc commandArgs)
614             throws IOException JavaDoc {
615         String JavaDoc configDir = StringUtil.notNull((String JavaDoc) commandArgs.get("configDir"));
616
617         if (configDir.equals("")) {
618             boolean noDir = true;
619             System.out.println("No configDir=xxx command line argument was found.");
620
621             while (noDir) {
622                 System.out.print("Please enter Expresso configuration directory:");
623
624                 BufferedReader JavaDoc br = new BufferedReader JavaDoc(new InputStreamReader JavaDoc(System.in));
625                 configDir = br.readLine();
626
627                 if (StringUtil.notNull(configDir).equals("")) {
628                     System.out.println("You must enter a directory name, not blank");
629
630                     continue;
631                 }
632
633                 File JavaDoc theDir = new File JavaDoc(configDir);
634
635                 if (!theDir.isDirectory()) {
636                     System.out.println(configDir +
637                             " is not a valid directory. ");
638
639                     continue;
640                 } else {
641                     noDir = false;
642                 }
643             }
644         }
645
646         return configDir;
647     } /* getConfigDir(Hashtable) */
648
649     private int getCurrJobNum() {
650         int load = getServerListSize();
651
652         return load;
653     }
654
655     /**
656      * @return the current data context for this JobHandler
657      */

658     private String JavaDoc getDataContext() {
659         if (StringUtil.notNull(dbName).equals("")) {
660             throw new IllegalArgumentException JavaDoc("No db/context was set");
661         }
662
663         return dbName;
664     } /* getDBName() */
665
666     /**
667      * Builds the custom where clause to locate jobs that are ready to be
668      * executed.
669      *
670      * @return a custom where clause for finding jobs that are ready to be used
671      */

672     private synchronized String JavaDoc getQueueScanCriteria() {
673         if (m_qScanCriteria == null) {
674             FastStringBuffer fsb = new FastStringBuffer(256);
675             fsb.append("StatusCode='");
676             fsb.append(JobQueue.JOB_STATUS_AVAILABLE);
677             fsb.append("' AND ServerID=0 AND JobOSName in (");
678             fsb.append("'");
679             fsb.append(SystemInfo.getOSName());
680             fsb.append("','");
681             fsb.append(SystemInfo.getOSClass());
682             fsb.append("','any')");
683
684             // inJobOSName += "'" + SystemInfo.getOSClass() + "',";
685
// inJobOSName += "'any'";
686
// m_qScanCriteria = "StatusCode='A' AND ServerID=0 AND JobOSName in (" +
687
// inJobOSName + ")";
688
m_qScanCriteria = fsb.toString();
689         }
690
691         return m_qScanCriteria;
692     }
693
694     private int getServerListSize() {
695         int jobListSize = 0;
696
697         try {
698             jobListSize = getServerList().size();
699         } catch (ServerException se) {
700             log.error("Cannot get size of server job list, reason:");
701             se.printStackTrace();
702         }
703
704         return jobListSize;
705     }
706
707     private void setStatus(String JavaDoc status) {
708         m_jobHandlerStatus = status;
709     }
710
711     private static String JavaDoc getTimeStamp() {
712         try {
713             String JavaDoc timeStamp = DateTime.getDateTimeForDB("default");
714
715             return timeStamp;
716         } catch (DBException dbe) {
717             return "";
718         }
719     }
720
721     /**
722      * Check the job control queue for special commands coming down the pipe
723      *
724      * @throws DBException if there's an error.
725      */

726     private synchronized void checkControlQueue() throws DBException {
727         synchronized (JobHandler.class) {
728             String JavaDoc sID = String.valueOf(getID());
729
730             try {
731                 m_jobHandlerControl.setField(JobHandlerControl.FLD_SERVERID, sID);
732                 m_jobHandlerControl.setField(JobHandlerControl.FLD_STATUS_CODE,
733                         JobHandlerControl.JOB_STATUS_AVAILABLE);
734                 m_jobHandlerControl.setDataContext(this.getDataContext());
735             } catch (DBException ex) {
736                 log.error("Error checking control queue", ex);
737                 throw ex;
738             }
739
740             for (Iterator JavaDoc e = m_jobHandlerControl.searchAndRetrieveList(JobHandlerControl.FLD_REQUEST_TIME).iterator();
741                  e.hasNext();) {
742                 JobHandlerControl ctrlEntry = (JobHandlerControl) e.next();
743                 String JavaDoc operation = ctrlEntry.getField(JobHandlerControl.FLD_COMMAND);
744
745                 if (!operation.equals("")) {
746                     // clear the operation field for next update
747
ctrlEntry.setField(JobHandlerControl.FLD_STATUS_CODE,
748                             JobHandlerControl.JOB_STATUS_RUNNING);
749                     ctrlEntry.update();
750
751                     if (log.isInfoEnabled()) {
752                         log.info("Going to proceed with Job Handler Operation: " +
753                                 operation);
754                     }
755
756                     doOperation(operation);
757                 }
758
759                 ctrlEntry.setField(JobHandlerControl.FLD_STATUS_CODE,
760                         JobHandlerControl.JOB_STATUS_COMPLETED);
761                 ctrlEntry.update();
762             }
763         }
764     }
765
766     /**
767      * Actually check the job queue and invoke the appropriate server module
768      */

769     private synchronized void checkQueue() throws ServerException {
770         if (log.isDebugEnabled()) {
771             System.out.print(".");
772         }
773
774         try {
775             JobQueue myQueueList = new JobQueue(SecuredDBObject.SYSTEM_ACCOUNT);
776             myQueueList.setDataContext(getDataContext());
777
778             JobQueue oneQueueEntry = null;
779             String JavaDoc crt = getQueueScanCriteria();
780             myQueueList.setCustomWhereClause(crt);
781
782             for (Iterator JavaDoc e = myQueueList.searchAndRetrieveList("Priority|JobNumber").iterator(); e.hasNext();) {
783                 oneQueueEntry = (JobQueue) e.next();
784
785                 if (log.isInfoEnabled()) {
786                     log.info("Processing queue entry " +
787                             oneQueueEntry.getField("JobNumber") + " from user " +
788                             oneQueueEntry.getField(JobQueue.FLD_UID));
789                 }
790
791                 String JavaDoc job = oneQueueEntry.getField("JobNumber");
792
793                 if (log.isInfoEnabled()) {
794                     log.info("Processing Job " + job);
795                 }
796
797                 try {
798                     oneQueueEntry.setField(JobQueue.FLD_JOBNUMBER, job);
799                     oneQueueEntry.setField(JobQueue.FLD_STATUS_CODE,
800                             JobQueue.JOB_STATUS_RUNNING);
801                     oneQueueEntry.setField(JobQueue.FLD_SERVERID,
802                             Long.toString(getID()));
803                 } catch (DBException ex) {
804                     log.error("Error setting Job Queue Entry fields", ex);
805                     throw ex;
806                 }
807
808                 if (!markJob(job)) {
809                     log.warn("Unable to lock job " + job);
810
811                     return;
812                 }
813
814                 String JavaDoc myServerName = null;
815
816                 try {
817                     myServerName = oneQueueEntry.getField("JobCode");
818
819                     if (myServerName == null) {
820                         throw new ServerException("Job name to run was null");
821                     }
822
823                     if (log.isInfoEnabled()) {
824                         log.info("Loading Job " + myServerName);
825                     }
826
827                     final Job myServer = Job.instantiate(myServerName);
828
829                     if (log.isInfoEnabled()) {
830                         log.info("Loaded Job " + myServerName);
831                     }
832
833                     if (myServer.multiThreaded()) {
834                         if (getServerListSize() >= maxJobs) {
835                             if (log.isInfoEnabled()) {
836                                 log.info("There are currently " +
837                                         getServerListSize() +
838                                         " jobs in job list, max jobs is " +
839                                         maxJobs +
840                                         ". Waiting for current jobs to complete");
841                             }
842
843                             waitForAllJobs();
844                         }
845
846                         myServer.setQueue(oneQueueEntry);
847                         serverList.addElement(myServer);
848
849                         // check if have Cron entry set
850
// if so - submit to cron, otherwise - run asis
851
if (oneQueueEntry.useCron()) {
852                             String JavaDoc cronEntry = oneQueueEntry.getCronEntry();
853                             StringTokenizer JavaDoc st = new StringTokenizer JavaDoc(cronEntry,
854                                     ",");
855
856                             int minute = 0;
857                             int hour = 0;
858                             int dayOfMonth = 0;
859                             int month = 0;
860                             int dayOfWeek = 0;
861                             int year = 0;
862                             minute = Integer.valueOf(st.nextToken()).intValue();
863                             hour = Integer.valueOf(st.nextToken()).intValue();
864                             dayOfMonth = Integer.valueOf(st.nextToken())
865                                     .intValue();
866                             month = Integer.valueOf(st.nextToken()).intValue();
867                             dayOfWeek = Integer.valueOf(st.nextToken())
868                                     .intValue();
869                             year = Integer.valueOf(st.nextToken()).intValue();
870
871                             // indicate that cron process will be fired for this job.
872
myServer.setUseCron(true);
873
874                             CrontabEntry cronID = m_cronMgr.addCrontabEntry(minute,
875                                     hour, dayOfMonth, month, dayOfWeek, year, myServer.getClass().getName(),
876                                     new CrontabListenerI() {
877                                         public void handleCrontabEntry(CrontabEntry entry) {
878                                             log.info("Starting Cron process");
879 // myServer.setUseCron(false);
880
myServer.start();
881                                         }
882                                     }, myServer.getJobNumber());
883
884                             myServer.setCronAlarmEntry(cronID);
885
886                             if (log.isInfoEnabled()) {
887                                 log.info("Job '" + myServerName +
888                                         "' submitted to the CRON");
889                             }
890                         } else {
891                             if (log.isInfoEnabled()) {
892                                 log.info("Job '" + myServerName + "' begins");
893                             }
894
895                             myServer.start();
896                         }
897                     } else {
898                         if (getServerListSize() > 0) {
899                             if (log.isInfoEnabled()) {
900                                 log.info("Job class " + myServerName +
901                                         " is single-threaded, waiting for all " +
902                                         "current jobs to complete");
903                             }
904
905                             waitForAllJobs();
906                         }
907
908                         myServer.setQueue(oneQueueEntry);
909                         serverList.addElement(myServer);
910
911                         if (oneQueueEntry.useCron()) {
912                             String JavaDoc cronEntry = oneQueueEntry.getCronEntry();
913                             StringTokenizer JavaDoc st = new StringTokenizer JavaDoc(cronEntry,
914                                     ",");
915                             int minute = Integer.valueOf(st.nextToken())
916                                     .intValue();
917                             int hour = Integer.valueOf(st.nextToken()).intValue();
918                             int dayOfMonth = Integer.valueOf(st.nextToken())
919                                     .intValue();
920                             int month = Integer.valueOf(st.nextToken())
921                                     .intValue();
922                             int dayOfWeek = Integer.valueOf(st.nextToken())
923                                     .intValue();
924                             int year = Integer.valueOf(st.nextToken()).intValue();
925
926                             // indicate that cron process will be fired for this job.
927
myServer.setUseCron(true);
928
929                             // create cron entry
930
CrontabEntry cronID = m_cronMgr.addCrontabEntry(minute,
931                                     hour, dayOfMonth, month, dayOfWeek, year, myServer.getClass().getName(),
932                                     new CrontabListenerI() {
933                                         public void handleCrontabEntry(CrontabEntry entry) {
934                                             log.info("Running Cron process");
935 // myServer.setUseCron(false);
936
myServer.run();
937                                         }
938                                     }, myServer.getJobNumber());
939
940                             myServer.setCronAlarmEntry(cronID);
941
942                             if (log.isInfoEnabled()) {
943                                 log.info("Job '" + myServerName +
944                                         "' submitted to the CRON");
945                             }
946                         } else {
947                             try {
948                                 if (log.isInfoEnabled()) {
949                                     log.info("Running (and waiting for) Job " +
950                                             myServerName);
951                                 }
952
953                                 myServer.run();
954
955                                 if (log.isInfoEnabled()) {
956                                     log.info("Job class " + myServerName +
957                                             " completed");
958                                 }
959                             } catch (Exception JavaDoc exp) {
960                                 log.error("Exception caught while running Job.",
961                                         exp);
962                             }
963                         }
964                     }
965                     /* else not multi-threaded */
966
967                     yield();
968                 } catch (Throwable JavaDoc eo) {
969                     log.error("Error loading job class", eo);
970                     throw new ServerException("Exception loading " +
971                             "job class '" + myServerName +
972                             "'- see detailed message in log", eo);
973                 }
974             }
975             /* for each queue entry */
976
977             if (!serverList.isEmpty()) {
978                 cleanFinishedJobs();
979             }
980
981             removeCronJobForRemovedQueueEntry();
982
983         } catch (DBException de) {
984             log.error(de);
985             throw new ServerException("Database Exception checking" + "queue",
986                     de);
987         }
988     } /* checkQueue() */
989
990     private void cleanFinishedJobs() {
991         /* go thru the server list & see if there are any */
992         /* finished ones */
993         Job oneService = null;
994
995         synchronized (Job.class) {
996             for (Enumeration JavaDoc ee = serverList.elements(); ee.hasMoreElements();) {
997                 oneService = (Job) ee.nextElement();
998
999                 // remove service only when it is finished its execution and
1000
// is not a part of cron process.
1001
if (!(oneService.isAlive() || oneService.useCron())) {
1002                    log.info("Job " + oneService.getClass().getName() +
1003                            " is completed");
1004                    serverList.removeElement(oneService);
1005                }
1006            }
1007        }
1008    }
1009
1010    /**
1011     * Iterate through all crontab entries, checking to make sure they exist in the JobQueue. If it doesn't
1012     * (probably because the JobQueue entry has been deleted), remove the crontab entry
1013     *
1014     * @throws DBException upon database access error.
1015     */

1016    private void removeCronJobForRemovedQueueEntry() throws DBException {
1017        JobQueue jobQueue = new JobQueue();
1018        for (Iterator JavaDoc i = m_cronMgr.getAllEntries().iterator(); i.hasNext();) {
1019            CrontabEntry oneCronEntry = (CrontabEntry) i.next();
1020
1021            String JavaDoc jobNumber = oneCronEntry.getJobNumber();
1022            // the jobNumber will be null if a cron entry was not added through the JobQueue
1023
if (jobNumber != null) {
1024                jobQueue.clear();
1025                jobQueue.setField(JobQueue.FLD_JOBNUMBER, jobNumber);
1026                if (!jobQueue.find()) {
1027                    m_cronMgr.removeCrontabEntry(oneCronEntry);
1028                }
1029            }
1030        }
1031    }
1032
1033    private void doOperation(String JavaDoc oper) throws DBException {
1034        if (log.isInfoEnabled()) {
1035            log.info("Starting operation: " + oper);
1036        }
1037
1038        StringTokenizer JavaDoc st = new StringTokenizer JavaDoc(oper, ", ");
1039        String JavaDoc cmd = st.nextToken();
1040
1041        if (cmd.startsWith(JobHandlerControl.STOP_COMMAND)) {
1042            Job oneService = null;
1043
1044            for (Enumeration JavaDoc ee = serverList.elements(); ee.hasMoreElements();) {
1045                oneService = (Job) ee.nextElement();
1046
1047                String JavaDoc pid = oneService.getJobNumber();
1048
1049                if (oper.indexOf(pid) != -1) {
1050                    oneService.interrupt();
1051
1052                    // remove job from cron queue
1053
if (oneService.useCron()) {
1054                        CrontabEntry cronID = (CrontabEntry) oneService.getCronAlarmEntry();
1055                        m_cronMgr.removeCrontabEntry(cronID);
1056                    }
1057
1058                    // mark as stopped
1059
JobQueue jq = oneService.getJobQueueEntry();
1060                    jq.setJobStatus(JobQueue.JOB_STATUS_STOPPED);
1061                    jq.update();
1062
1063                    // remove job object from server list
1064
serverList.removeElement(oneService);
1065                    log.info("Job was killed by request, job number: " + pid);
1066                }
1067            }
1068        } else if (cmd.startsWith(JobHandlerControl.RESTART_COMMAND)) {
1069            Job oneService = null;
1070
1071            for (Enumeration JavaDoc ee = serverList.elements(); ee.hasMoreElements();) {
1072                oneService = (Job) ee.nextElement();
1073
1074                String JavaDoc pid = oneService.getJobNumber();
1075
1076                if (oper.indexOf(pid) != -1) {
1077                    oneService.interrupt();
1078
1079                    // remove job from cron queue
1080
if (oneService.useCron()) {
1081                        CrontabEntry cronID = (CrontabEntry) oneService.getCronAlarmEntry();
1082                        m_cronMgr.removeCrontabEntry(cronID);
1083                    }
1084
1085                    serverList.removeElement(oneService);
1086
1087                    // mark as stopped
1088
JobQueue jq = oneService.getJobQueueEntry();
1089                    jq.setJobStatus(JobQueue.JOB_STATUS_AVAILABLE);
1090                    jq.update();
1091
1092                    // remove job object from server list
1093
if (log.isInfoEnabled()) {
1094                        log.info("Job was restarted by request, job number: " +
1095                                pid);
1096                    }
1097                }
1098            }
1099        }
1100    }
1101
1102    /**
1103     * Marks an available job as running.
1104     *
1105     * @param jobNumber The name of the job to mark
1106     * @return true if we are succesful
1107     */

1108    private synchronized boolean markJob(String JavaDoc jobNumber) {
1109        DBConnectionPool myPool = null;
1110        DBConnection myConnection = null;
1111
1112        try {
1113            myPool = DBConnectionPool.getInstance(dbName);
1114            myConnection = myPool.getConnection(getClass().getName());
1115
1116            FastStringBuffer fsb = new FastStringBuffer(128);
1117            fsb.append("UPDATE JOBQUEUE SET StatusCode ='");
1118            fsb.append(JobQueue.JOB_STATUS_RUNNING);
1119            fsb.append("', ServerID = ");
1120            fsb.append(getID());
1121            fsb.append(" WHERE JobNumber = ");
1122            fsb.append(jobNumber);
1123            fsb.append(" AND StatusCode = 'A'");
1124            fsb.append(" AND ServerID = 0");
1125
1126            // myConnection.executeUpdate("UPDATE JOBQUEUE SET " +
1127
// "StatusCode = 'R', ServerID = " +
1128
// getID() + " WHERE JobNumber = " +
1129
// jobNumber + " AND StatusCode = 'A'" +
1130
// " AND ServerID = 0");
1131
myConnection.executeUpdate(fsb.toString());
1132
1133            if (myConnection.getUpdateCount() != 1) {
1134                log.warn("Unable to process job " + jobNumber +
1135                        ", job no longer available.");
1136                myPool.release(myConnection);
1137
1138                return false;
1139            }
1140            /* if job wasn't available anymore */
1141        } catch (DBException de) {
1142            log.warn("Unable to lock job", de);
1143
1144            return false;
1145        } finally {
1146            if (myConnection != null) {
1147                myPool.release(myConnection);
1148            }
1149        }
1150
1151        return true;
1152    } /* markJob(String) */
1153
1154    /**
1155     * Writes the latest available status to the database table.
1156     */

1157    private synchronized void updateJobHandlerStatus() {
1158        try {
1159            String JavaDoc timeStamp = getTimeStamp();
1160            String JavaDoc status = getStatus();
1161            String JavaDoc id = String.valueOf(getID());
1162            String JavaDoc freeMem = String.valueOf(SystemInfo.getFreeMem());
1163            String JavaDoc totalMem = String.valueOf(SystemInfo.getTotalMem());
1164            String JavaDoc usedMem = String.valueOf(SystemInfo.getUsedMem());
1165            String JavaDoc powerF = String.valueOf(SystemInfo.getPowerFactor());
1166            String JavaDoc loadAverage = String.valueOf(SystemInfo.getLoadAverage());
1167            String JavaDoc jobNum = String.valueOf(getCurrJobNum());
1168
1169            synchronized (JobHandler.class) {
1170                m_jobHandlerRegistry.setField(JobHandlerRegistry.FLD_SERVERID,
1171                        id);
1172                m_jobHandlerRegistry.setField(JobHandlerRegistry.FLD_TIMESTAMP,
1173                        timeStamp);
1174                m_jobHandlerRegistry.setField(JobHandlerRegistry.FLD_STATUS,
1175                        status);
1176                m_jobHandlerRegistry.setField(JobHandlerRegistry.FLD_FREEMEM,
1177                        freeMem);
1178                m_jobHandlerRegistry.setField(JobHandlerRegistry.FLD_TOTALMEM,
1179                        totalMem);
1180                m_jobHandlerRegistry.setField(JobHandlerRegistry.FLD_USEDMEM,
1181                        usedMem);
1182                m_jobHandlerRegistry.setField(JobHandlerRegistry.FLD_POWERFACTOR,
1183                        powerF);
1184                m_jobHandlerRegistry.setField(JobHandlerRegistry.FLD_LOAD_AVERAGE,
1185                        loadAverage);
1186                m_jobHandlerRegistry.setField(JobHandlerRegistry.FLD_CURR_JOBNUM,
1187                        jobNum);
1188                m_jobHandlerRegistry.setCheckZeroUpdate(false);
1189                m_jobHandlerRegistry.update();
1190
1191                if (log.isDebugEnabled()) {
1192                    log.debug("JobHandler : " + SystemInfo.getHostName() + "/" +
1193                            getID() + "/" + getDataContext() + " is alive");
1194                }
1195            }
1196        } catch (DBException dbe) {
1197            log.error("Cannot update Registry for Job Handler, Reason: ", dbe);
1198        }
1199    } /* updateJobHandlerStatus */
1200
1201    /**
1202     * Display the usage message if the user gives a bad option, then bail out.
1203     */

1204    private static void usage() {
1205        System.out.println("Usage: JobHandler arg=value ... ");
1206        System.out.println("Where arguments are: configDir, db, [logDir], [logConfig], and webAppDir");
1207        System.exit(1);
1208    } /* usage() */
1209
1210    /**
1211     * Wait for all current jobs to complete before returning.
1212     *
1213     * @throws InterruptedException if Thread.interrupt() is called while
1214     * waiting.
1215     */

1216    private void waitForAllJobs() throws InterruptedException JavaDoc {
1217        while (true) {
1218            /* go thru the server list & see if there are any finished ones */
1219            int aliveCount = 0;
1220            Job oneService = null;
1221
1222            for (Enumeration JavaDoc ee = serverList.elements(); ee.hasMoreElements();) {
1223                oneService = (Job) ee.nextElement();
1224
1225                if (oneService.isAlive()) {
1226                    aliveCount++;
1227                }
1228            }
1229            /* each server in the list */
1230
1231            if (aliveCount == 0) {
1232                log.info("All jobs complete");
1233
1234                return;
1235            }
1236
1237            if (log.isInfoEnabled()) {
1238                log.info("There are still " + aliveCount +
1239                        " active jobs. Waiting for all jobs to complete");
1240            }
1241
1242            yield();
1243            sleep(interval * 1000);
1244        } /* forever */
1245    } /* waitForAllJobs() */
1246} /* JobHandler */
1247
Popular Tags