KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > knowgate > scheduler > SingleThreadExecutor


1 package com.knowgate.scheduler;
2
3 import java.sql.DriverManager JavaDoc;
4 import java.sql.SQLException JavaDoc ;
5 import java.sql.Statement JavaDoc;
6 import java.sql.ResultSet JavaDoc;
7 import java.sql.ResultSetMetaData JavaDoc;
8
9 import java.io.FileInputStream JavaDoc;
10 import java.io.IOException JavaDoc;
11 import java.io.FileNotFoundException JavaDoc;
12
13 import java.util.Properties JavaDoc;
14 import java.util.LinkedList JavaDoc;
15 import java.util.ListIterator JavaDoc;
16
17 import javax.mail.MessagingException JavaDoc;
18
19 import com.knowgate.jdc.JDCConnection;
20
21 import com.knowgate.dataobjs.DB;
22 import com.knowgate.dataobjs.DBBind;
23 import com.knowgate.dataobjs.DBSubset;
24
25 import com.knowgate.misc.Gadgets;
26
27 /**
28  * <p>Single Thread Scheduler Executor</p>
29  * <p>SingleThreadExecutor is a class that processes jobs and atoms in a simple way,
30  * unlike SchedulerDaemon witch is based on an AtomQueue and a WorkerThreadPool,
31  * SingleThreadExecutor uses directly the database for tracking execution progress
32  * for a single thread.</p>
33  * @author Sergio Montoro Ten
34  * @version 1.0
35  */

36
37 public class SingleThreadExecutor extends Thread JavaDoc {
38
39   private String JavaDoc sEnvProps;
40
41   private Properties JavaDoc oEnvProps;
42
43   private boolean bContinue;
44
45   private String JavaDoc sLastError;
46
47   private String JavaDoc sJob;
48
49   private Job oJob;
50
51   private Atom oAtm;
52
53   private LinkedList JavaDoc oCallbacks;
54
55   private int iCallbacks;
56
57   // ---------------------------------------------------------------------------
58

59   private static class SystemOutNotify extends WorkerThreadCallback {
60
61     public SystemOutNotify() {
62       super("SystemOutNotify");
63     }
64
65     public void call (String JavaDoc sThreadId, int iOpCode, String JavaDoc sMessage, Exception JavaDoc oXcpt, Object JavaDoc oParam) {
66
67       if (WorkerThreadCallback.WT_EXCEPTION==iOpCode)
68         System.out.println("Thread " + sThreadId + ": ERROR " + sMessage);
69       else
70         System.out.println("Thread " + sThreadId + ": " + sMessage);
71     }
72   }
73
74   // ---------------------------------------------------------------------------
75

76   /**
77    * <p>Create new SingleThreadExecutor</p>
78    * @param sPropertiesFilePath Absolute path to hipergate.cnf properties file
79    * @throws FileNotFoundException
80    * @throws IOException
81    */

82   public SingleThreadExecutor (String JavaDoc sPropertiesFilePath)
83     throws FileNotFoundException JavaDoc, IOException JavaDoc {
84
85     sJob = null;
86
87     bContinue = true;
88
89     if (sPropertiesFilePath.lastIndexOf(System.getProperty("file.separator"))==-1)
90       sEnvProps = sPropertiesFilePath;
91     else
92       sEnvProps = sPropertiesFilePath.substring(sPropertiesFilePath.lastIndexOf(System.getProperty("file.separator"))+1);
93
94     FileInputStream JavaDoc oInProps = new FileInputStream JavaDoc (sPropertiesFilePath);
95     oEnvProps = new Properties JavaDoc();
96     oEnvProps.load (oInProps);
97     oInProps.close ();
98
99     oCallbacks = new LinkedList JavaDoc();
100   }
101
102   /**
103    * <p>Create new SingleThreadExecutor for a single Job</p>
104    * @param sPropertiesFilePath Absolute path to hipergate.cnf properties file
105    * @throws FileNotFoundException
106    * @throws IOException
107    */

108   public SingleThreadExecutor (String JavaDoc sPropertiesFilePath, String JavaDoc sJobId)
109     throws FileNotFoundException JavaDoc, IOException JavaDoc {
110
111     sJob = sJobId;
112
113     bContinue = true;
114
115     if (sPropertiesFilePath.lastIndexOf(System.getProperty("file.separator"))==-1)
116       sEnvProps = sPropertiesFilePath;
117     else
118       sEnvProps = sPropertiesFilePath.substring(sPropertiesFilePath.lastIndexOf(System.getProperty("file.separator"))+1);
119
120     FileInputStream JavaDoc oInProps = new FileInputStream JavaDoc (sPropertiesFilePath);
121     oEnvProps = new Properties JavaDoc();
122     oEnvProps.load (oInProps);
123     oInProps.close ();
124
125     oCallbacks = new LinkedList JavaDoc();
126   }
127
128   // ---------------------------------------------------------------------------
129

130   public Atom activeAtom() {
131     return oAtm;
132   }
133
134   // ---------------------------------------------------------------------------
135

136   public Job activeJob() {
137     return oJob;
138   }
139
140   // ---------------------------------------------------------------------------
141

142   public String JavaDoc lastError() {
143     return sLastError;
144   }
145
146   // ---------------------------------------------------------------------------
147

148   /**
149    * Register a thread callback object
150    * @param oNewCallback WorkerThreadCallback subclass instance
151    * @throws IllegalArgumentException If a callback with same name has oNewCallback was already registered
152    */

153   public void registerCallback(WorkerThreadCallback oNewCallback)
154     throws IllegalArgumentException JavaDoc {
155
156     WorkerThreadCallback oCallback;
157     ListIterator JavaDoc oIter = oCallbacks.listIterator();
158
159     while (oIter.hasNext()) {
160       oCallback = (WorkerThreadCallback) oIter.next();
161
162       if (oCallback.name().equals(oNewCallback.name())) {
163         throw new IllegalArgumentException JavaDoc("Callback " + oNewCallback.name() + " is already registered");
164       } // fi
165
} // wend
166

167     oCallbacks.addLast(oNewCallback);
168     iCallbacks++;
169   } // registerCallback
170

171   // ---------------------------------------------------------------------------
172

173   /**
174    * Unregister a thread callback object
175    * @param sCallbackName Name of callback to be unregistered
176    * @return <b>true</b> if a callback with such name was found and unregistered,
177    * <b>false</b> otherwise
178    */

179   public boolean unregisterCallback(String JavaDoc sCallbackName) {
180     WorkerThreadCallback oCallback;
181     ListIterator JavaDoc oIter = oCallbacks.listIterator();
182
183     while (oIter.hasNext()) {
184       oCallback = (WorkerThreadCallback) oIter.next();
185
186       if (oCallback.name().equals(sCallbackName)) {
187         oIter.remove();
188         iCallbacks--;
189         return true;
190       } // fi
191
} // wend
192

193     return false;
194   } // unregisterCallback
195

196   // ---------------------------------------------------------------------------
197

198   private void callBack (int iOpCode, String JavaDoc sMessage, Exception JavaDoc oXcpt, Object JavaDoc oParam) {
199
200     WorkerThreadCallback oCallback;
201     ListIterator JavaDoc oIter = oCallbacks.listIterator();
202
203     while (oIter.hasNext()) {
204       oCallback = (WorkerThreadCallback) oIter.next();
205       oCallback.call(getName(), iOpCode, sMessage, oXcpt, oParam);
206     } // wend
207

208   } // callBack
209

210   // ---------------------------------------------------------------------------
211

212   public void run() {
213     Statement JavaDoc oStm;
214     JDCConnection oCon;
215     AtomFeeder oFdr;
216     DBSubset oDBS;
217     String JavaDoc sSQL;
218     String JavaDoc sJId;
219     ResultSet JavaDoc oRst;
220     ResultSetMetaData JavaDoc oMDt;
221
222     DBBind oDBB = null;
223
224     try {
225       oDBB = new DBBind(sEnvProps);
226
227       oCon = new JDCConnection (DriverManager.getConnection (oEnvProps.getProperty("dburl"),
228                                 oEnvProps.getProperty ("dbuser"),
229                                 oEnvProps.getProperty ("dbpassword")), null);
230       bContinue = true;
231
232       sLastError = "";
233
234       while (bContinue) {
235
236         oFdr = new AtomFeeder();
237
238         if (sJob==null)
239           oDBS = oFdr.loadAtoms(oCon,1);
240         else
241           oDBS = oFdr.loadAtoms(oCon, sJob);
242
243         if (oDBS.getRowCount()>0) {
244
245           sJId = oDBS.getString(0,0);
246
247           oJob = Job.instantiate(oCon, sJId, oEnvProps);
248
249           oStm = oCon.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
250
251           sSQL = "SELECT a.*, j." + DB.tx_parameters + " FROM " + DB.k_job_atoms + " a, " + DB.k_jobs + " j WHERE a." + DB.id_status + "=" + String.valueOf(Atom.STATUS_PENDING) + " AND j." + DB.gu_job + "=a." + DB.gu_job + " AND j." + DB.gu_job + "='" + sJId + "'";
252
253           oRst = oStm.executeQuery(sSQL);
254           oMDt = oRst.getMetaData();
255
256           while (oRst.next()) {
257
258             oAtm = new Atom(oRst, oMDt);
259
260             oJob.process(oAtm);
261
262           } // wend
263
oRst.close();
264           oStm.close();
265         }
266         else
267           bContinue = false;
268       } // wend
269

270       oCon.close();
271
272       oDBB.close();
273     }
274     catch (MessagingException JavaDoc e) {
275
276       oDBB.close();
277
278       sLastError = "MessagingException " + e.getMessage();
279
280       if (iCallbacks>0) callBack(-1, sLastError, new MessagingException JavaDoc(e.getMessage(), e.getNextException()), null);
281
282       if (oJob!=null) oJob.log(sLastError + "\n");
283
284     }
285     catch (SQLException JavaDoc e) {
286
287       if (null!=oDBB) oDBB.close();
288
289       sLastError = "SQLException " + e.getMessage();
290
291       if (iCallbacks>0) callBack(-1, sLastError, new SQLException JavaDoc(e.getMessage(), e.getSQLState(), e.getErrorCode()), null);
292
293       if (oJob!=null) oJob.log(sLastError + "\n");
294
295     }
296     catch (FileNotFoundException JavaDoc e) {
297       if (null!=oDBB) oDBB.close();
298
299       sLastError = "FileNotFoundException " + e.getMessage();
300
301       if (iCallbacks>0) callBack(-1, sLastError, new FileNotFoundException JavaDoc(e.getMessage()), null);
302
303       if (oJob!=null) oJob.log(sLastError + "\n");
304     }
305     catch (IOException JavaDoc e) {
306       if (null!=oDBB) oDBB.close();
307
308       sLastError = "IOException " + e.getMessage();
309
310       if (iCallbacks>0) callBack(-1, sLastError, new IOException JavaDoc(e.getMessage()), null);
311
312       if (oJob!=null) oJob.log(sLastError + "\n");
313     }
314     catch (ClassNotFoundException JavaDoc e) {
315
316       if (null!=oDBB) oDBB.close();
317
318       sLastError = "ClassNotFoundException " + e.getMessage();
319
320       if (iCallbacks>0) callBack(-1, sLastError, new ClassNotFoundException JavaDoc(e.getMessage()), null);
321
322       if (oJob!=null) oJob.log(sLastError + "\n");
323     }
324     catch (InstantiationException JavaDoc e) {
325
326       if (null!=oDBB) oDBB.close();
327
328       sLastError = "InstantiationException " + e.getMessage();
329
330       if (iCallbacks>0) callBack(-1, sLastError, new InstantiationException JavaDoc(e.getMessage()), null);
331
332       if (oJob!=null) oJob.log(sLastError + "\n");
333     }
334     catch (IllegalAccessException JavaDoc e) {
335
336       if (null!=oDBB) oDBB.close();
337
338       sLastError = "IllegalAccessException " + e.getMessage();
339
340       if (iCallbacks>0) callBack(-1, sLastError, new IllegalAccessException JavaDoc(e.getMessage()), null);
341
342       if (oJob!=null) oJob.log(sLastError + "\n");
343     }
344     catch (NullPointerException JavaDoc e) {
345
346       if (null!=oDBB) oDBB.close();
347
348       sLastError = "NullPointerException " + e.getMessage();
349
350       if (iCallbacks>0) callBack(-1, sLastError, new NullPointerException JavaDoc(e.getMessage()), null);
351
352       if (oJob!=null) oJob.log(sLastError + "\n");
353     }
354   } // run
355

356   // ---------------------------------------------------------------------------
357

358   /**
359    * <p>Halt thread execution commiting all operations in course before stopping</p>
360    * If a thread is dead-locked by any reason halting it will not cause any effect.<br>
361    * halt() method only sends a signals to the each WokerThread telling it that must
362    * finish pending operations and stop.
363    */

364   public void halt() {
365     bContinue = false;
366   }
367
368   // ***************************************************************************
369
// Static Methods
370

371    private static void printUsage() {
372      System.out.println("");
373      System.out.println("Usage:");
374      System.out.println("SingleThreadExecutor {run | lrun} job_type cnf_file_path {gu_job | xml_file_path} [verbose]");
375      System.out.println("job_type is one of {MAIL | FAX | SAVE | FTP}");
376    }
377
378   public static void main(String JavaDoc[] argv)
379     throws java.io.FileNotFoundException JavaDoc, java.io.IOException JavaDoc, SQLException JavaDoc,
380     ClassNotFoundException JavaDoc, IllegalAccessException JavaDoc, InstantiationException JavaDoc,
381     org.xml.sax.SAXException JavaDoc {
382
383     SingleThreadExecutor oExec;
384
385     if (argv.length!=4 && argv.length!=5)
386       printUsage();
387
388     else if (argv.length==5 && !argv[4].equals("verbose"))
389       printUsage();
390
391     else if (!argv[0].equals("run") && !argv[0].equals("lrun"))
392       printUsage();
393
394     else if (!argv[1].equalsIgnoreCase("MAIL") && !argv[1].equalsIgnoreCase("FAX") &&
395              !argv[1].equalsIgnoreCase("SAVE") && !argv[1].equalsIgnoreCase("FTP") )
396       printUsage();
397
398     else {
399
400       if (argv[0].equals("run"))
401         oExec = new SingleThreadExecutor(argv[2], argv[3]);
402
403       else {
404         String JavaDoc sJobGUID = Gadgets.generateUUID();
405
406         Job.main(new String JavaDoc[]{"create", argv[1], argv[2], argv[3], sJobGUID });
407
408         oExec = new SingleThreadExecutor(argv[2], sJobGUID);
409       }
410
411       if (argv.length==5)
412         oExec.registerCallback(new SystemOutNotify());
413
414       oExec.start();
415     } // fi
416
} // main
417

418 } // SingleThreadExecutor
419
Popular Tags