1 32 33 package com.knowgate.scheduler; 34 35 36 import java.lang.Thread ; 37 import java.util.Date ; 38 import java.util.Properties ; 39 import java.util.LinkedList ; 40 import java.util.ListIterator ; 41 42 import java.sql.SQLException ; 43 import java.sql.Connection ; 44 import java.io.IOException ; 45 import java.io.FileNotFoundException ; 46 import java.io.File ; 47 import java.io.FileInputStream ; 48 import java.io.FileOutputStream ; 49 50 import javax.mail.MessagingException ; 51 52 import com.knowgate.jdc.JDCConnection; 53 import com.knowgate.dataobjs.DB; 54 import com.knowgate.dataxslt.*; 55 import com.knowgate.dataxslt.db.PageSetDB; 56 import com.knowgate.debug.DebugFile; 57 import com.knowgate.scheduler.*; 58 import com.knowgate.crm.DistributionList; 59 60 65 66 public class WorkerThread extends Thread { 67 68 private String sLastError; 69 private Job oJob; private Atom oAtm; private long lRunningTime; 72 private int delay = 1; private AtomConsumer oConsumer; 74 private WorkerThreadPool oPool; 75 private LinkedList oCallbacks; 76 private int iCallbacks; 77 private boolean bContinue; 78 79 81 86 87 public WorkerThread(WorkerThreadPool oThreadPool, AtomConsumer oAtomConsumer) { 88 oConsumer = oAtomConsumer; 89 oPool = oThreadPool; 90 oCallbacks = new LinkedList (); 91 iCallbacks = 0; 92 oJob = null; 93 sLastError = ""; 94 lRunningTime = 0; 95 } 96 97 99 public int getDelayMS() { 100 return delay; 101 } 102 103 105 public void getDelayMS(int iMiliseconds) { 106 delay=iMiliseconds; 107 } 108 109 111 public long getRunningTimeMS() { 112 return lRunningTime; 113 } 114 115 117 public void setConsumer (AtomConsumer oAtomConsumer) { 118 oConsumer = oAtomConsumer; 119 } 120 121 123 128 public String getProperty(String sKey) { 129 return oPool.getProperty(sKey); 130 } 131 132 134 public Atom activeAtom() { 135 return oAtm; 136 } 137 138 140 public Job activeJob() { 141 return oJob; 142 } 143 144 146 public String lastError() { 147 return sLastError; 148 } 149 150 152 157 public void registerCallback(WorkerThreadCallback oNewCallback) 158 throws IllegalArgumentException { 159 160 WorkerThreadCallback oCallback; 161 ListIterator oIter = oCallbacks.listIterator(); 162 163 while (oIter.hasNext()) { 164 oCallback = (WorkerThreadCallback) oIter.next(); 165 166 if (oCallback.name().equals(oNewCallback.name())) { 167 throw new IllegalArgumentException ("Callback " + oNewCallback.name() + " is already registered"); 168 } } 171 oCallbacks.addLast(oNewCallback); 172 iCallbacks++; 173 } 175 177 183 public boolean unregisterCallback(String sCallbackName) { 184 WorkerThreadCallback oCallback; 185 ListIterator oIter = oCallbacks.listIterator(); 186 187 while (oIter.hasNext()) { 188 oCallback = (WorkerThreadCallback) oIter.next(); 189 190 if (oCallback.name().equals(sCallbackName)) { 191 oIter.remove(); 192 iCallbacks--; 193 return true; 194 } } 197 return false; 198 } 200 202 private void callBack(int iOpCode, String sMessage, Exception oXcpt, Object oParam) { 203 WorkerThreadCallback oCallback; 204 ListIterator oIter = oCallbacks.listIterator(); 205 206 while (oIter.hasNext()) { 207 oCallback = (WorkerThreadCallback) oIter.next(); 208 oCallback.call(getName(), iOpCode, sMessage, oXcpt, oParam); 209 } 211 } 212 213 215 222 public void run() { 223 String sJob = ""; JDCConnection oConsumerConnection = null; 225 226 if (DebugFile.trace) { 227 DebugFile.writeln("Begin WorkerThread.run()"); 228 DebugFile.incIdent(); 229 DebugFile.writeln("thread=" + getName()); 230 } 231 232 bContinue = true; 233 234 sLastError = ""; 235 236 while (bContinue) { 237 238 try { 239 if (delay>0) sleep(delay); 240 241 long lStartRun = new Date ().getTime(); 242 243 if (DebugFile.trace) DebugFile.writeln(getName() + " getting next atom..."); 244 245 oAtm = oConsumer.next(); 246 247 if (oAtm==null) { 248 if (DebugFile.trace) DebugFile.writeln(getName() + " no more atoms."); 250 251 if (iCallbacks>0) callBack (WorkerThreadCallback.WT_ATOMCONSUMER_NOMORE, "Thread " + getName() + " no more Atoms", null, oConsumer); 252 253 break; 254 } 255 256 if (iCallbacks>0) callBack (WorkerThreadCallback.WT_ATOM_GET, "Thread " + getName() + " got Atom " + String.valueOf(oAtm.getInt(DB.pg_atom)), null, oAtm); 257 258 oConsumerConnection = oConsumer.getConnection(); 259 260 if (DebugFile.trace) DebugFile.writeln(getName() + " AtomConsumer.getConnection() : " + (oConsumerConnection!=null ? "[Conenction]" : "null")); 261 262 265 if (!sJob.equals(oAtm.getString(DB.gu_job))) { 266 267 270 sJob = oAtm.getString(DB.gu_job); 271 272 try { 273 oJob = Job.instantiate(oConsumerConnection, sJob, oPool.getProperties()); 275 276 if (iCallbacks>0) callBack(WorkerThreadCallback.WT_JOB_INSTANTIATE, "instantiate job " + sJob + " command " + oJob.getString(DB.id_command), null, oJob); 277 } 278 catch (ClassNotFoundException e) { 279 sJob = ""; 280 oJob = null; 281 sLastError = "Job.instantiate(" + sJob + ") ClassNotFoundException " + e.getMessage(); 282 283 if (DebugFile.trace) DebugFile.writeln(getName() + " " + sLastError); 284 285 if (iCallbacks>0) callBack(-1, sLastError, e, null); 286 287 bContinue = false; 288 } 289 catch (IllegalAccessException e) { 290 sJob = ""; 291 oJob = null; 292 sLastError = "Job.instantiate(" + sJob + ") IllegalAccessException " + e.getMessage(); 293 294 if (DebugFile.trace) DebugFile.writeln(getName() + " " + sLastError); 295 296 if (iCallbacks>0) callBack(-1, sLastError, e, null); 297 298 bContinue = false; 299 } 300 catch (InstantiationException e) { 301 sJob = ""; 302 oJob = null; 303 sLastError = "Job.instantiate(" + sJob + ") InstantiationException " + e.getMessage(); 304 305 if (DebugFile.trace) DebugFile.writeln(getName() + " " + sLastError); 306 307 if (iCallbacks>0) callBack(-1, sLastError, e, null); 308 309 bContinue = false; 310 } 311 catch (SQLException e) { 312 sJob = ""; 313 oJob = null; 314 sLastError = " Job.instantiate(" + sJob + ") SQLException " + e.getMessage(); 315 316 if (DebugFile.trace) DebugFile.writeln(getName() + " " + sLastError); 317 318 if (iCallbacks>0) callBack(-1, sLastError, e, null); 319 320 bContinue = false; 321 } 322 } 324 326 if (null!=oJob) { 327 328 331 oJob.process(oAtm); 332 333 if (DebugFile.trace) 334 DebugFile.writeln("Thread " + getName() + " consumed Atom " + String.valueOf(oAtm.getInt(DB.pg_atom))); 335 336 oAtm.archive(oConsumerConnection); 338 339 if (iCallbacks>0) callBack(WorkerThreadCallback.WT_ATOM_CONSUME, "Thread " + getName() + " consumed Atom " + String.valueOf(oAtm.getInt(DB.pg_atom)), null, oAtm); 340 341 oAtm = null; 342 343 if (DebugFile.trace) DebugFile.writeln("job " + oJob.getString(DB.gu_job) + " pending " + String.valueOf(oJob.pending())); 344 345 if (oJob.pending()==0) { 346 oJob.setStatus(oConsumerConnection, Job.STATUS_FINISHED); 347 348 if (iCallbacks>0) callBack(WorkerThreadCallback.WT_JOB_FINISH, "finish", null, oJob); 349 } 350 351 353 } else { 355 oAtm = null; 356 sLastError = "Job.instantiate(" + sJob + ") returned null"; 357 if (DebugFile.trace) DebugFile.writeln("ERROR: " + sLastError); 358 359 if (iCallbacks>0) callBack(-1, sLastError, new NullPointerException ("Job.instantiate(" + sJob + ")"), null); 360 361 bContinue = false; 362 } 363 oConsumerConnection = null; 364 lRunningTime += new Date ().getTime()-lStartRun; 365 } 366 catch (Exception e) { 367 368 if (DebugFile.trace) 369 DebugFile.writeln(getName() + " " + e.getClass().getName() + " " + e.getMessage()); 370 371 if (null!=oJob) { 372 sLastError = e.getClass().getName() + ", job " + oJob.getString(DB.gu_job) + " "; 373 if (null!=oAtm) { 374 sLastError = "atom " + String.valueOf(oAtm.getInt(DB.pg_atom)) + " "; 375 if (null!=oConsumerConnection) { 376 try { 377 oAtm.setStatus(oConsumerConnection, Atom.STATUS_INTERRUPTED, e.getClass().getName() + " " + e.getMessage()); 378 } catch (SQLException sqle) { 379 if (DebugFile.trace) DebugFile.writeln("Atom.setStatus() SQLException " + sqle.getMessage()); 380 } 381 } 382 } 383 sLastError += e.getMessage(); 384 385 oJob.log(getName() + " " + e.getClass().getName() + ", job " + oJob.getString(DB.gu_job) + " "); 386 if (null!=oAtm) oJob.log("atom " + String.valueOf(oAtm.getInt(DB.pg_atom)) + " "); 387 oJob.log(e.getMessage() + "\n"); 388 } else 390 sLastError = e.getClass().getName() + " " + e.getMessage(); 391 392 if (iCallbacks>0) callBack(-1, sLastError, e, oJob); 393 394 bContinue = false; 395 } 396 finally { 397 sJob = ""; 398 oAtm = null; 399 } 400 } 402 if (oJob!=null) { oJob.free(); oJob=null; } 403 404 if (DebugFile.trace) { 405 DebugFile.decIdent(); 406 DebugFile.writeln("End WorkerThread.run()"); 407 } 408 } 410 412 418 public void halt() { 419 bContinue = false; 420 } 421 422 424 } | Popular Tags |