KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > knowgate > scheduler > SchedulerDaemon


1 /*
2   Copyright (C) 2003 Know Gate S.L. All rights reserved.
3                       C/Oņa, 107 1š2 28050 Madrid (Spain)
4
5   Redistribution and use in source and binary forms, with or without
6   modification, are permitted provided that the following conditions
7   are met:
8
9   1. Redistributions of source code must retain the above copyright
10      notice, this list of conditions and the following disclaimer.
11
12   2. The end-user documentation included with the redistribution,
13      if any, must include the following acknowledgment:
14      "This product includes software parts from hipergate
15      (http://www.hipergate.org/)."
16      Alternately, this acknowledgment may appear in the software itself,
17      if and wherever such third-party acknowledgments normally appear.
18
19   3. The name hipergate must not be used to endorse or promote products
20      derived from this software without prior written permission.
21      Products derived from this software may not be called hipergate,
22      nor may hipergate appear in their name, without prior written
23      permission.
24
25   This library is distributed in the hope that it will be useful,
26   but WITHOUT ANY WARRANTY; without even the implied warranty of
27   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
28
29   You should have received a copy of hipergate License with this code;
30   if not, visit http://www.hipergate.org or mail to info@hipergate.org
31 */

32
33 package com.knowgate.scheduler;
34
35 import java.io.File JavaDoc;
36 import java.sql.DriverManager JavaDoc;
37 import java.sql.Connection JavaDoc;
38 import java.sql.SQLException JavaDoc;
39 import java.sql.Connection JavaDoc;
40 import java.sql.PreparedStatement JavaDoc;
41 import java.sql.ResultSet JavaDoc;
42 import java.sql.Statement JavaDoc;
43 import java.sql.Types JavaDoc;
44
45 import java.util.Date JavaDoc;
46 import java.util.Properties JavaDoc;
47 import java.util.LinkedList JavaDoc;
48 import java.util.ListIterator JavaDoc;
49
50 import java.io.FileInputStream JavaDoc;
51 import java.io.IOException JavaDoc;
52 import java.io.FileNotFoundException JavaDoc;
53
54 import com.knowgate.debug.DebugFile;
55 import com.knowgate.dataobjs.DBBind;
56 import com.knowgate.jdc.JDCConnection;
57 import com.knowgate.dataobjs.DB;
58
59 /**
60  * <p>Scheduler daemon</p>
61  * <p>Keeps a thread pool and an atom queue for feeding the pool.</p>
62  * @author Sergio Montoro Ten
63  * @version 3.0
64  */

65
66 public class SchedulerDaemon extends Thread JavaDoc {
67
68   private boolean bContinue;
69
70   private String JavaDoc sProfile;
71
72   // Worker threads pool
73
private WorkerThreadPool oThreadPool;
74
75   private DBBind oDbb;
76
77   // This queue will be an in-memory list
78
// of pending atoms (messages) to send
79
private AtomQueue oQue = new AtomQueue();
80
81   // Environment properties (typically readed from hipergate.cnf)
82
private Properties JavaDoc oEnvProps;
83
84   private LinkedList JavaDoc oCallbacks;
85
86   private Date JavaDoc dtCreationDate;
87   private Date JavaDoc dtStartDate;
88   private Date JavaDoc dtStopDate;
89
90   // ---------------------------------------------------------------------------
91

92   private static class SystemOutNotify extends WorkerThreadCallback {
93
94     public SystemOutNotify() {
95       super("SystemOutNotify");
96     }
97
98     public void call (String JavaDoc sThreadId, int iOpCode, String JavaDoc sMessage, Exception JavaDoc oXcpt, Object JavaDoc oParam) {
99
100       if (WorkerThreadCallback.WT_EXCEPTION==iOpCode)
101         System.out.println("Thread " + sThreadId + ": ERROR " + sMessage);
102       else
103         System.out.println("Thread " + sThreadId + ": " + sMessage);
104     }
105   }
106
107   // ---------------------------------------------------------------------------
108

109   /**
110    * <p>Create new SchedulerDaemon</p>
111    * @param sPropertiesFilePath Full path to hipergate.cnf file.<br>
112    * Constructor will read the following properties from hipergate.cnf:<br>
113    * <b>driver</b> JDBC driver class<br>
114    * <b>dburl</b> URL for database connection<br>
115    * <b>dbuser</b> Database User<br>
116    * <b>dbpassword</b> Database User Password<br>
117    * @throws ClassNotFoundException
118    * @throws FileNotFoundException
119    * @throws IOException
120    * @throws SQLException
121    */

122   public SchedulerDaemon (String JavaDoc sPropertiesFilePath)
123     throws ClassNotFoundException JavaDoc, FileNotFoundException JavaDoc, IOException JavaDoc, SQLException JavaDoc {
124
125     dtStartDate = dtStopDate = null;
126
127     dtCreationDate = new Date JavaDoc();
128
129     oThreadPool = null;
130
131     oDbb = null;
132
133     bContinue = true;
134
135     if (DebugFile.trace) {
136       DebugFile.writeln("new FileInputStream("+sPropertiesFilePath+")");
137     }
138
139     FileInputStream JavaDoc oInProps = new FileInputStream JavaDoc (sPropertiesFilePath);
140     oEnvProps = new Properties JavaDoc();
141     oEnvProps.load (oInProps);
142     oInProps.close ();
143
144     oCallbacks = new LinkedList JavaDoc();
145
146     sProfile = sPropertiesFilePath.substring(sPropertiesFilePath.lastIndexOf(File.separator)+1,sPropertiesFilePath.lastIndexOf('.'));
147
148   } // SchedulerDaemon
149

150   // ---------------------------------------------------------------------------
151

152   /**
153    * Get date when this SchedulerDaemon was created
154    * @return Date
155    */

156   public Date JavaDoc creationDate() {
157     return dtCreationDate;
158   }
159
160   // ---------------------------------------------------------------------------
161

162   /**
163    * Get date when this SchedulerDaemon was started for the last time
164    * @return Date
165    */

166   public Date JavaDoc startDate() {
167     return dtStartDate;
168   }
169
170   // ---------------------------------------------------------------------------
171

172   /**
173    * Get date when this SchedulerDaemon was stopped for the last time
174    * @return Date
175    */

176   public Date JavaDoc stopDate() {
177     return dtStopDate;
178   }
179
180   // ---------------------------------------------------------------------------
181

182   public AtomQueue atomQueue() {
183     return oQue;
184   }
185
186   // ---------------------------------------------------------------------------
187

188   public WorkerThreadPool threadPool() {
189     return oThreadPool;
190   }
191
192   // ---------------------------------------------------------------------------
193

194   /**
195    * <p>Create AtomQueue and start WorkerThreadPool</p>
196    */

197
198   public void run() {
199     Statement JavaDoc oStmt;
200     ResultSet JavaDoc oRSet;
201     int iJobCount;
202     String JavaDoc sSQL;
203     AtomConsumer oCsr = null;
204     JDCConnection oCon = null;
205
206     if (DebugFile.trace) DebugFile.writeln("Begin SchedulerDaemon.run()");
207
208     try {
209
210     if (null==oDbb) oDbb = new DBBind(sProfile);
211
212     oCon = oDbb.getConnection("SchedulerDaemon");
213
214     if (DebugFile.trace) DebugFile.writeln("JDCConnection.setAutoCommit(true)");
215
216     oCon.setAutoCommit(true);
217
218     // Create Atom queue.
219
if (DebugFile.trace) DebugFile.writeln("new AtomQueue()");
220
221     oQue = new AtomQueue();
222
223     // This object feeds the queue with new atoms
224
// extracted from the database.
225
if (DebugFile.trace) DebugFile.writeln("new AtomFeeder()");
226
227     AtomFeeder oFdr = new AtomFeeder();
228
229     // This is the queue consumer object
230
// it grants that only one atom is
231
// poped from the queue at a time.
232
if (DebugFile.trace) DebugFile.writeln("new AtomConsumer([JDCconnection], [AtomQueue])");
233
234     oCsr = new AtomConsumer(oCon, oQue);
235
236     // Create WorkerThreadPool
237

238     if (DebugFile.trace) DebugFile.writeln("new WorkerThreadPool([AtomConsumer], [Properties])");
239
240     oThreadPool = new WorkerThreadPool(oCsr, oEnvProps);
241
242     // Register callbacks on each worker thread
243

244     ListIterator JavaDoc oIter = oCallbacks.listIterator();
245     while (oIter.hasNext())
246       oThreadPool.registerCallback((WorkerThreadCallback) oIter.next());
247
248     dtStartDate = new Date JavaDoc();
249
250     do {
251       try {
252
253         while(bContinue) {
254           // Count how many atoms are pending of processing at the database
255
oStmt = oCon.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
256
257           try { oStmt.setQueryTimeout(20); } catch (SQLException JavaDoc sqle) { }
258
259           // ***************************************************
260
// Finish all the jobs that have no more pending atoms
261
sSQL = "SELECT j.gu_job FROM k_jobs j WHERE ("+
262                  "j.id_status="+String.valueOf(Job.STATUS_PENDING)+" OR "+
263                  "j.id_status="+String.valueOf(Job.STATUS_RUNNING)+") AND "+
264                  "NOT EXISTS (SELECT a.pg_atom FROM k_job_atoms a WHERE "+
265                  "j.gu_job=a.gu_job AND a.id_status IN ("+
266                  String.valueOf(Atom.STATUS_PENDING)+","+
267                  String.valueOf(Atom.STATUS_RUNNING)+","+
268                  String.valueOf(Atom.STATUS_SUSPENDED)+"))";
269
270           if (DebugFile.trace) DebugFile.writeln("Statement.executeQuery("+sSQL+")");
271
272           oRSet = oStmt.executeQuery(sSQL);
273           LinkedList JavaDoc oFinished = new LinkedList JavaDoc();
274           while (oRSet.next()) {
275             oFinished.add(oRSet.getString(1));
276           } // wend
277
oRSet.close();
278
279           if (DebugFile.trace) DebugFile.writeln("Already finished jobs "+String.valueOf(oFinished.size()));
280
281           if (oFinished.size()>0) {
282             sSQL = "UPDATE k_jobs SET id_status="+String.valueOf(Job.STATUS_FINISHED)+",dt_finished="+DBBind.Functions.GETDATE+" WHERE gu_job=?";
283             if (DebugFile.trace) DebugFile.writeln("Connection.prepareStatement("+sSQL+")");
284             PreparedStatement JavaDoc oUpdt = oCon.prepareStatement(sSQL);
285             oIter = oFinished.listIterator();
286             while (oIter.hasNext()) {
287               oUpdt.setObject(1, oIter.next(), java.sql.Types.CHAR);
288               oUpdt.executeUpdate();
289             } // wend
290
oUpdt.close();
291           } // fi
292

293           // ****************************************
294
// Count jobs pending of begining execution
295

296           if (DebugFile.trace) DebugFile.writeln("Statement.executeQuery(SELECT COUNT(*) FROM k_jobs WHERE id_status=" + String.valueOf(Job.STATUS_PENDING) + ")");
297
298           oRSet = oStmt.executeQuery("SELECT COUNT(*) FROM k_jobs WHERE id_status=" + String.valueOf(Job.STATUS_PENDING));
299           oRSet.next();
300           iJobCount = oRSet.getInt(1);
301           oRSet.close();
302           oStmt.close();
303
304           if (DebugFile.trace) DebugFile.writeln(String.valueOf(iJobCount) + " pending jobs");
305
306           if (0==iJobCount)
307             sleep (10000);
308           else
309             break;
310         } // wend
311

312         if (bContinue) {
313           oFdr.loadAtoms(oCon, oThreadPool.size());
314
315           oFdr.feedQueue(oCon, oQue);
316
317           if (oQue.size()>0)
318             oThreadPool.launchAll();
319
320           do {
321
322             sleep(10000);
323
324             if (DebugFile.trace) DebugFile.writeln(String.valueOf(oThreadPool.livethreads()) + " live threads");
325
326           } while(oThreadPool.livethreads()==oThreadPool.size());
327         } // fi (bContinue)
328
}
329       catch (InterruptedException JavaDoc e) {
330         if (DebugFile.trace)
331           DebugFile.writeln("SchedulerDaemon InterruptedException " + e.getMessage());
332       }
333     } while (bContinue) ;
334
335     if (DebugFile.trace) DebugFile.writeln(" exiting SchedulerDaemon");
336
337     oThreadPool.haltAll();
338     oThreadPool = null;
339
340     oCsr.close();
341     oCsr = null;
342
343     oFdr = null;
344     oQue = null;
345
346     if (DebugFile.trace) DebugFile.writeln("JDConnection.close()");
347
348     oCon.close("SchedulerDaemon");
349     oCon = null;
350
351     oDbb.close();
352     oDbb=null;
353     }
354     catch (SQLException JavaDoc e) {
355       try { oThreadPool.haltAll(); oThreadPool=null; } catch (Exception JavaDoc ignore) {}
356       try { oCsr.close(); oCsr=null; } catch (Exception JavaDoc ignore) {}
357       try {
358         if (oCon!=null) if (!oCon.isClosed()) oCon.close("SchedulerDaemon");
359       } catch (SQLException JavaDoc sqle) {
360         if (DebugFile.trace) DebugFile.writeln("SchedulerDaemon SQLException on close() " + sqle.getMessage());
361       }
362       if (null!=oDbb) { try { oDbb.close(); } catch (Exception JavaDoc ignore) {} }
363       oCon = null;
364
365       dtStartDate = null;
366       dtStopDate = new Date JavaDoc();
367
368       if (DebugFile.trace)
369         DebugFile.writeln("SchedulerDaemon SQLException " + e.getMessage());
370         DebugFile.writeln("SchedulerDaemon.run() abnormal termination");
371     }
372     if (DebugFile.trace) DebugFile.writeln("End SchedulerDaemon.run()");
373   } // run
374

375   // ---------------------------------------------------------------------------
376

377   public void registerCallback(WorkerThreadCallback oNewCallback)
378     throws IllegalArgumentException JavaDoc {
379
380     if (oThreadPool==null)
381       oCallbacks.addLast(oNewCallback);
382     else
383       oThreadPool.registerCallback(oNewCallback);
384   }
385
386   // ---------------------------------------------------------------------------
387

388   public void unregisterCallback(String JavaDoc sCallbackName) {
389     if (oThreadPool!=null)
390       oThreadPool.unregisterCallback(sCallbackName);
391   }
392
393   // ---------------------------------------------------------------------------
394

395   private static void interruptJobs(JDCConnection oCon, Object JavaDoc[] aJobs) throws SQLException JavaDoc {
396     int nJobs;
397     if (null==aJobs) nJobs=0; else nJobs = aJobs.length;
398     if (nJobs>0) {
399       PreparedStatement JavaDoc oStmt = oCon.prepareStatement("UPDATE " + DB.k_jobs + " SET " + DB.id_status + "=" + String.valueOf(Job.STATUS_INTERRUPTED) + " WHERE " + DB.gu_job + "=?");
400       for (int j=0; j<nJobs; j++) {
401         if (null!=aJobs[j]) {
402           oStmt.setObject(1, aJobs[j], Types.CHAR);
403           oStmt.executeUpdate();
404         }
405       }
406       oStmt.close();
407     }
408   }
409
410   // ---------------------------------------------------------------------------
411

412   private static void suspendJobs(JDCConnection oCon, Object JavaDoc[] aJobs) throws SQLException JavaDoc {
413     int nJobs;
414     if (null==aJobs) nJobs=0; else nJobs = aJobs.length;
415     if (nJobs>0) {
416       PreparedStatement JavaDoc oStmt = oCon.prepareStatement("UPDATE " + DB.k_jobs + " SET " + DB.id_status + "=" + String.valueOf(Job.STATUS_SUSPENDED) + " WHERE " + DB.gu_job + "=?");
417       for (int j=0; j<nJobs; j++) {
418         if (null!=aJobs[j]) {
419           oStmt.setObject(1, aJobs[j], Types.CHAR);
420           oStmt.executeUpdate();
421         }
422       }
423       oStmt.close();
424     }
425   }
426
427   // ---------------------------------------------------------------------------
428

429   /**
430    * <p>Halt worker threads and set running jobs status to suspended</p>
431    * Wait until all running atoms are finished and then stop all worker threads
432    * @throws IllegalStateException If worker threads are not running
433    */

434   public void haltAll() throws IllegalStateException JavaDoc {
435     if (null==oThreadPool)
436       throw new IllegalStateException JavaDoc("SchedulerDaemon.haltAll() Thread pool not initialized, call start() method before trying to halt worker threads");
437     String JavaDoc[] aInitRunningJobs = oThreadPool.runningJobs();
438     oThreadPool.haltAll();
439     String JavaDoc[] aStillRunningJobs = oThreadPool.runningJobs();
440     if (null!=oDbb) {
441       try {
442         JDCConnection oCon = oDbb.getConnection("SchedulerDaemonHaltAll");
443         if (null!=aInitRunningJobs) {
444           if (null!=aStillRunningJobs) {
445             int nInitRunningJobs = aInitRunningJobs.length;
446             int nStillRunningJobs= aStillRunningJobs.length;
447             for (int i=0; i<nInitRunningJobs; i++) {
448               boolean bStillRunning = false;
449               for (int j=0; j<nStillRunningJobs && !bStillRunning; j++) {
450                 bStillRunning = aStillRunningJobs[j].equals(aInitRunningJobs[i]);
451               } // next
452
if (bStillRunning) aInitRunningJobs[i]=null;
453             } // next
454
} // fi
455
suspendJobs(oCon, aInitRunningJobs);
456         } // fi
457
oCon.close("SchedulerDaemonHaltAll");
458       } catch (SQLException JavaDoc sqle) {
459         throw new IllegalStateException JavaDoc("SchedulerDaemon.haltAll() SQLException "+sqle.getMessage());
460       }
461     }
462   }
463   // ---------------------------------------------------------------------------
464

465   /**
466    * <p>Stop worker threads and set running jobs status to interrupted</p>
467    * Call haltAll() Wait until the specified amount of miliseconds
468    * and force all worker threads still alive to stop.
469    * This method must only be used when stalled worker threads cannot be stopped
470    * by calling haltAll().
471    * @param lDelayMilis long Delay (in miliseconds) to wait before executing
472    * threads are forced to stop
473    * @throws IllegalStateException If worker threads are not running
474    */

475   public synchronized void stopAll(long lDelayMilis)
476     throws IllegalStateException JavaDoc,SQLException JavaDoc {
477
478     if (null==oThreadPool)
479       throw new IllegalStateException JavaDoc("SchedulerDaemon.stopAll() Thread pool not initialized, call start() method before trying to stop worker threads");
480
481     oThreadPool.haltAll();
482
483     try { sleep(lDelayMilis); } catch (InterruptedException JavaDoc ignore) { }
484
485     bContinue = false;
486
487     if (null!=oDbb) {
488       JDCConnection oCon = oDbb.getConnection("SchedulerDaemonStopAll");
489       oThreadPool.stopAll(oCon);
490       interruptJobs(oCon, oThreadPool.runningJobs());
491       oCon.close("SchedulerDaemonStopAll");
492     } else {
493       oThreadPool.stopAll();
494     }
495
496   } // stopAll
497

498   // ---------------------------------------------------------------------------
499

500   /**
501    * <p>Stop worker threads and set running jobs status to interrupted</p>
502    * Default delay for forcing threads to stop is 10 seconds
503    * @throws IllegalStateException If worker threads are not running
504    */

505   public void stopAll() throws IllegalStateException JavaDoc,SQLException JavaDoc {
506     stopAll(10000l);
507   }
508
509   // ---------------------------------------------------------------------------
510

511   private static void printUsage() {
512     System.out.println("");
513     System.out.println("Usage:");
514     System.out.println("SchedulerDaemon cnf_file_path [verbose]");
515   }
516
517   public static void main(String JavaDoc[] argv)
518     throws ClassNotFoundException JavaDoc, SQLException JavaDoc, IOException JavaDoc {
519
520     DBBind oGlobalDBBind = new DBBind();
521     SchedulerDaemon TheDaemon;
522
523     if (argv.length<1 || argv.length>2)
524       printUsage();
525
526     else if (argv.length==2 && !argv[1].equals("verbose"))
527       printUsage();
528
529     else {
530
531       TheDaemon = new SchedulerDaemon(argv[0]);
532
533       if (argv.length==2)
534         TheDaemon.registerCallback(new SystemOutNotify());
535
536       TheDaemon.start();
537     }
538   }
539 } // SchedulerDaemon
540
Popular Tags