KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > knowgate > scheduler > WorkerThread


1 /*
2   Copyright (C) 2003 Know Gate S.L. All rights reserved.
3                       C/Oña, 107 1º2 28050 Madrid (Spain)
4
5   Redistribution and use in source and binary forms, with or without
6   modification, are permitted provided that the following conditions
7   are met:
8
9   1. Redistributions of source code must retain the above copyright
10      notice, this list of conditions and the following disclaimer.
11
12   2. The end-user documentation included with the redistribution,
13      if any, must include the following acknowledgment:
14      "This product includes software parts from hipergate
15      (http://www.hipergate.org/)."
16      Alternately, this acknowledgment may appear in the software itself,
17      if and wherever such third-party acknowledgments normally appear.
18
19   3. The name hipergate must not be used to endorse or promote products
20      derived from this software without prior written permission.
21      Products derived from this software may not be called hipergate,
22      nor may hipergate appear in their name, without prior written
23      permission.
24
25   This library is distributed in the hope that it will be useful,
26   but WITHOUT ANY WARRANTY; without even the implied warranty of
27   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
28
29   You should have received a copy of hipergate License with this code;
30   if not, visit http://www.hipergate.org or mail to info@hipergate.org
31 */

32
33 package com.knowgate.scheduler;
34
35
36 import java.lang.Thread JavaDoc;
37 import java.util.Date JavaDoc;
38 import java.util.Properties JavaDoc;
39 import java.util.LinkedList JavaDoc;
40 import java.util.ListIterator JavaDoc;
41
42 import java.sql.SQLException JavaDoc;
43 import java.sql.Connection JavaDoc;
44 import java.io.IOException JavaDoc;
45 import java.io.FileNotFoundException JavaDoc;
46 import java.io.File JavaDoc;
47 import java.io.FileInputStream JavaDoc;
48 import java.io.FileOutputStream JavaDoc;
49
50 import javax.mail.MessagingException JavaDoc;
51
52 import com.knowgate.jdc.JDCConnection;
53 import com.knowgate.dataobjs.DB;
54 import com.knowgate.dataxslt.*;
55 import com.knowgate.dataxslt.db.PageSetDB;
56 import com.knowgate.debug.DebugFile;
57 import com.knowgate.scheduler.*;
58 import com.knowgate.crm.DistributionList;
59
60 /**
61  * <p>Scheduled Job Worker Thread</p>
62  * @author Sergio Montoro Ten
63  * @version 1.0
64  */

65
66 public class WorkerThread extends Thread JavaDoc {
67
68   private String JavaDoc sLastError;
69   private Job oJob; // Current Job
70
private Atom oAtm; // Atom being processed
71
private long lRunningTime;
72   private int delay = 1; // Thread sleeps n miliseconds on each loop
73
private AtomConsumer oConsumer;
74   private WorkerThreadPool oPool;
75   private LinkedList JavaDoc oCallbacks;
76   private int iCallbacks;
77   private boolean bContinue;
78
79   // ----------------------------------------------------------
80

81   /**
82    * Create WorkerThread
83    * @param oThreadPool
84    * @param oAtomConsumer
85    */

86
87   public WorkerThread(WorkerThreadPool oThreadPool, AtomConsumer oAtomConsumer) {
88     oConsumer = oAtomConsumer;
89     oPool = oThreadPool;
90     oCallbacks = new LinkedList JavaDoc();
91     iCallbacks = 0;
92     oJob = null;
93     sLastError = "";
94     lRunningTime = 0;
95   }
96
97   // ----------------------------------------------------------
98

99   public int getDelayMS() {
100     return delay;
101   }
102
103   // ----------------------------------------------------------
104

105   public void getDelayMS(int iMiliseconds) {
106     delay=iMiliseconds;
107   }
108
109   // ----------------------------------------------------------
110

111   public long getRunningTimeMS() {
112     return lRunningTime;
113   }
114
115   // ----------------------------------------------------------
116

117   public void setConsumer (AtomConsumer oAtomConsumer) {
118     oConsumer = oAtomConsumer;
119   }
120
121   // ----------------------------------------------------------
122

123   /**
124    * Get Environment property from hipergate.cnf
125    * @param sKey Property Name
126    * @return Property Value or <b>null</b> if not found
127    */

128   public String JavaDoc getProperty(String JavaDoc sKey) {
129     return oPool.getProperty(sKey);
130   }
131
132   // ---------------------------------------------------------------------------
133

134   public Atom activeAtom() {
135     return oAtm;
136   }
137
138   // ---------------------------------------------------------------------------
139

140   public Job activeJob() {
141     return oJob;
142   }
143
144   // ---------------------------------------------------------------------------
145

146   public String JavaDoc lastError() {
147     return sLastError;
148   }
149
150   // ---------------------------------------------------------------------------
151

152   /**
153    * Register a thread callback object
154    * @param oNewCallback WorkerThreadCallback subclass instance
155    * @throws IllegalArgumentException If a callback with same name has oNewCallback was already registered
156    */

157   public void registerCallback(WorkerThreadCallback oNewCallback)
158     throws IllegalArgumentException JavaDoc {
159
160     WorkerThreadCallback oCallback;
161     ListIterator JavaDoc oIter = oCallbacks.listIterator();
162
163     while (oIter.hasNext()) {
164       oCallback = (WorkerThreadCallback) oIter.next();
165
166       if (oCallback.name().equals(oNewCallback.name())) {
167         throw new IllegalArgumentException JavaDoc("Callback " + oNewCallback.name() + " is already registered");
168       } // fi
169
} // wend
170

171     oCallbacks.addLast(oNewCallback);
172     iCallbacks++;
173   } // registerCallback
174

175   // ---------------------------------------------------------------------------
176

177   /**
178    * Unregister a thread callback object
179    * @param sCallbackName Name of callback to be unregistered
180    * @return <b>true</b> if a callback with such name was found and unregistered,
181    * <b>false</b> otherwise
182    */

183   public boolean unregisterCallback(String JavaDoc sCallbackName) {
184     WorkerThreadCallback oCallback;
185     ListIterator JavaDoc oIter = oCallbacks.listIterator();
186
187     while (oIter.hasNext()) {
188       oCallback = (WorkerThreadCallback) oIter.next();
189
190       if (oCallback.name().equals(sCallbackName)) {
191         oIter.remove();
192         iCallbacks--;
193         return true;
194       } // fi
195
} // wend
196

197     return false;
198   } // unregisterCallback
199

200   // ---------------------------------------------------------------------------
201

202   private void callBack(int iOpCode, String JavaDoc sMessage, Exception JavaDoc oXcpt, Object JavaDoc oParam) {
203     WorkerThreadCallback oCallback;
204     ListIterator JavaDoc oIter = oCallbacks.listIterator();
205
206     while (oIter.hasNext()) {
207       oCallback = (WorkerThreadCallback) oIter.next();
208       oCallback.call(getName(), iOpCode, sMessage, oXcpt, oParam);
209     } // wend
210

211   }
212
213   // ---------------------------------------------------------------------------
214

215   /**
216    * <p>Process atoms obtained throught AtomConsumer</p>
217    * Each worker WorkerThread will enter an endless loop until the queue is empty
218    * or an interrupt signal is received.<br>
219    * If an exception is thrown while creating of processing atoms the workerthread
220    * will be aborted.
221    */

222   public void run() {
223     String JavaDoc sJob = ""; // Current Job Unique Id.
224
JDCConnection oConsumerConnection = null;
225
226     if (DebugFile.trace) {
227        DebugFile.writeln("Begin WorkerThread.run()");
228        DebugFile.incIdent();
229        DebugFile.writeln("thread=" + getName());
230      }
231
232     bContinue = true;
233
234     sLastError = "";
235
236     while (bContinue) {
237
238       try {
239         if (delay>0) sleep(delay);
240
241         long lStartRun = new Date JavaDoc().getTime();
242
243         if (DebugFile.trace) DebugFile.writeln(getName() + " getting next atom...");
244
245         oAtm = oConsumer.next();
246
247         if (oAtm==null) {
248           // No more atoms to consume
249
if (DebugFile.trace) DebugFile.writeln(getName() + " no more atoms.");
250
251           if (iCallbacks>0) callBack (WorkerThreadCallback.WT_ATOMCONSUMER_NOMORE, "Thread " + getName() + " no more Atoms", null, oConsumer);
252
253           break;
254         }
255
256         if (iCallbacks>0) callBack (WorkerThreadCallback.WT_ATOM_GET, "Thread " + getName() + " got Atom " + String.valueOf(oAtm.getInt(DB.pg_atom)), null, oAtm);
257
258         oConsumerConnection = oConsumer.getConnection();
259
260         if (DebugFile.trace) DebugFile.writeln(getName() + " AtomConsumer.getConnection() : " + (oConsumerConnection!=null ? "[Conenction]" : "null"));
261
262         // ***********************************
263
// Instantiate the proper Job subclass
264

265         if (!sJob.equals(oAtm.getString(DB.gu_job))) {
266
267           // The Job is only re-loaded if it is different from the previous job at this thread
268
// this is a Job instance reuse policy for better performance.
269

270           sJob = oAtm.getString(DB.gu_job);
271
272           try {
273             // Dynamically instantiate the job subclass specified at k_lu_job_commands table
274
oJob = Job.instantiate(oConsumerConnection, sJob, oPool.getProperties());
275
276             if (iCallbacks>0) callBack(WorkerThreadCallback.WT_JOB_INSTANTIATE, "instantiate job " + sJob + " command " + oJob.getString(DB.id_command), null, oJob);
277           }
278           catch (ClassNotFoundException JavaDoc e) {
279             sJob = "";
280             oJob = null;
281             sLastError = "Job.instantiate(" + sJob + ") ClassNotFoundException " + e.getMessage();
282
283             if (DebugFile.trace) DebugFile.writeln(getName() + " " + sLastError);
284
285             if (iCallbacks>0) callBack(-1, sLastError, e, null);
286
287             bContinue = false;
288           }
289           catch (IllegalAccessException JavaDoc e) {
290             sJob = "";
291             oJob = null;
292             sLastError = "Job.instantiate(" + sJob + ") IllegalAccessException " + e.getMessage();
293
294             if (DebugFile.trace) DebugFile.writeln(getName() + " " + sLastError);
295
296             if (iCallbacks>0) callBack(-1, sLastError, e, null);
297
298             bContinue = false;
299           }
300           catch (InstantiationException JavaDoc e) {
301             sJob = "";
302             oJob = null;
303             sLastError = "Job.instantiate(" + sJob + ") InstantiationException " + e.getMessage();
304
305             if (DebugFile.trace) DebugFile.writeln(getName() + " " + sLastError);
306
307             if (iCallbacks>0) callBack(-1, sLastError, e, null);
308
309             bContinue = false;
310           }
311           catch (SQLException JavaDoc e) {
312             sJob = "";
313             oJob = null;
314             sLastError = " Job.instantiate(" + sJob + ") SQLException " + e.getMessage();
315
316             if (DebugFile.trace) DebugFile.writeln(getName() + " " + sLastError);
317
318             if (iCallbacks>0) callBack(-1, sLastError, e, null);
319
320             bContinue = false;
321           }
322         } // fi(Previous_Job == CurrentAtom->Job)
323

324         // ---------------------------------------------------------------------
325

326         if (null!=oJob) {
327
328           // -------------------------------------------------------------------
329
// Actual Atom processing call here!
330

331           oJob.process(oAtm);
332
333           if (DebugFile.trace)
334             DebugFile.writeln("Thread " + getName() + " consumed Atom " + String.valueOf(oAtm.getInt(DB.pg_atom)));
335
336           // Move Atom register from k_job_atoms to k_job_atoms_archived
337
oAtm.archive(oConsumerConnection);
338
339           if (iCallbacks>0) callBack(WorkerThreadCallback.WT_ATOM_CONSUME, "Thread " + getName() + " consumed Atom " + String.valueOf(oAtm.getInt(DB.pg_atom)), null, oAtm);
340
341           oAtm = null;
342
343           if (DebugFile.trace) DebugFile.writeln("job " + oJob.getString(DB.gu_job) + " pending " + String.valueOf(oJob.pending()));
344
345           if (oJob.pending()==0) {
346             oJob.setStatus(oConsumerConnection, Job.STATUS_FINISHED);
347
348             if (iCallbacks>0) callBack(WorkerThreadCallback.WT_JOB_FINISH, "finish", null, oJob);
349           }
350
351           // -------------------------------------------------------------------
352

353         } // fi (oJob)
354
else {
355           oAtm = null;
356           sLastError = "Job.instantiate(" + sJob + ") returned null";
357           if (DebugFile.trace) DebugFile.writeln("ERROR: " + sLastError);
358
359           if (iCallbacks>0) callBack(-1, sLastError, new NullPointerException JavaDoc("Job.instantiate(" + sJob + ")"), null);
360
361           bContinue = false;
362         }
363         oConsumerConnection = null;
364         lRunningTime += new Date JavaDoc().getTime()-lStartRun;
365       }
366       catch (Exception JavaDoc e) {
367
368         if (DebugFile.trace)
369           DebugFile.writeln(getName() + " " + e.getClass().getName() + " " + e.getMessage());
370
371         if (null!=oJob) {
372           sLastError = e.getClass().getName() + ", job " + oJob.getString(DB.gu_job) + " ";
373           if (null!=oAtm) {
374             sLastError = "atom " + String.valueOf(oAtm.getInt(DB.pg_atom)) + " ";
375             if (null!=oConsumerConnection) {
376               try {
377                 oAtm.setStatus(oConsumerConnection, Atom.STATUS_INTERRUPTED, e.getClass().getName() + " " + e.getMessage());
378               } catch (SQLException JavaDoc sqle) {
379                 if (DebugFile.trace) DebugFile.writeln("Atom.setStatus() SQLException " + sqle.getMessage());
380               }
381             }
382           }
383           sLastError += e.getMessage();
384
385           oJob.log(getName() + " " + e.getClass().getName() + ", job " + oJob.getString(DB.gu_job) + " ");
386           if (null!=oAtm) oJob.log("atom " + String.valueOf(oAtm.getInt(DB.pg_atom)) + " ");
387           oJob.log(e.getMessage() + "\n");
388         } // fi (oJob)
389
else
390           sLastError = e.getClass().getName() + " " + e.getMessage();
391
392         if (iCallbacks>0) callBack(-1, sLastError, e, oJob);
393
394         bContinue = false;
395       }
396       finally {
397         sJob = "";
398         oAtm = null;
399       }
400     } // wend
401

402     if (oJob!=null) { oJob.free(); oJob=null; }
403
404     if (DebugFile.trace) {
405       DebugFile.decIdent();
406       DebugFile.writeln("End WorkerThread.run()");
407     }
408   } // run
409

410   // ---------------------------------------------------------------------------
411

412   /**
413    * <p>Halt thread execution commiting all operations in course before stopping</p>
414    * If a thread is dead-locked by any reason halting it will not cause any effect.<br>
415    * halt() method only sends a signals to the each WokerThread telling it that must
416    * finish pending operations and stop.
417    */

418   public void halt() {
419     bContinue = false;
420   }
421
422   // ---------------------------------------------------------------------------
423

424 } // WorkerThread
425
Popular Tags