1 32 33 package com.knowgate.scheduler; 34 35 import java.io.File ; 36 import java.sql.DriverManager ; 37 import java.sql.Connection ; 38 import java.sql.SQLException ; 39 import java.sql.Connection ; 40 import java.sql.PreparedStatement ; 41 import java.sql.ResultSet ; 42 import java.sql.Statement ; 43 import java.sql.Types ; 44 45 import java.util.Date ; 46 import java.util.Properties ; 47 import java.util.LinkedList ; 48 import java.util.ListIterator ; 49 50 import java.io.FileInputStream ; 51 import java.io.IOException ; 52 import java.io.FileNotFoundException ; 53 54 import com.knowgate.debug.DebugFile; 55 import com.knowgate.dataobjs.DBBind; 56 import com.knowgate.jdc.JDCConnection; 57 import com.knowgate.dataobjs.DB; 58 59 65 66 public class SchedulerDaemon extends Thread { 67 68 private boolean bContinue; 69 70 private String sProfile; 71 72 private WorkerThreadPool oThreadPool; 74 75 private DBBind oDbb; 76 77 private AtomQueue oQue = new AtomQueue(); 80 81 private Properties oEnvProps; 83 84 private LinkedList oCallbacks; 85 86 private Date dtCreationDate; 87 private Date dtStartDate; 88 private Date dtStopDate; 89 90 92 private static class SystemOutNotify extends WorkerThreadCallback { 93 94 public SystemOutNotify() { 95 super("SystemOutNotify"); 96 } 97 98 public void call (String sThreadId, int iOpCode, String sMessage, Exception oXcpt, Object oParam) { 99 100 if (WorkerThreadCallback.WT_EXCEPTION==iOpCode) 101 System.out.println("Thread " + sThreadId + ": ERROR " + sMessage); 102 else 103 System.out.println("Thread " + sThreadId + ": " + sMessage); 104 } 105 } 106 107 109 122 public SchedulerDaemon (String sPropertiesFilePath) 123 throws ClassNotFoundException , FileNotFoundException , IOException , SQLException { 124 125 dtStartDate = dtStopDate = null; 126 127 dtCreationDate = new Date (); 128 129 oThreadPool = null; 130 131 oDbb = null; 132 133 bContinue = true; 134 135 if (DebugFile.trace) { 136 DebugFile.writeln("new FileInputStream("+sPropertiesFilePath+")"); 137 } 138 139 FileInputStream oInProps = new FileInputStream (sPropertiesFilePath); 140 oEnvProps = new Properties (); 141 oEnvProps.load (oInProps); 142 oInProps.close (); 143 144 oCallbacks = new LinkedList (); 145 146 sProfile = sPropertiesFilePath.substring(sPropertiesFilePath.lastIndexOf(File.separator)+1,sPropertiesFilePath.lastIndexOf('.')); 147 148 } 150 152 156 public Date creationDate() { 157 return dtCreationDate; 158 } 159 160 162 166 public Date startDate() { 167 return dtStartDate; 168 } 169 170 172 176 public Date stopDate() { 177 return dtStopDate; 178 } 179 180 182 public AtomQueue atomQueue() { 183 return oQue; 184 } 185 186 188 public WorkerThreadPool threadPool() { 189 return oThreadPool; 190 } 191 192 194 197 198 public void run() { 199 Statement oStmt; 200 ResultSet oRSet; 201 int iJobCount; 202 String sSQL; 203 AtomConsumer oCsr = null; 204 JDCConnection oCon = null; 205 206 if (DebugFile.trace) DebugFile.writeln("Begin SchedulerDaemon.run()"); 207 208 try { 209 210 if (null==oDbb) oDbb = new DBBind(sProfile); 211 212 oCon = oDbb.getConnection("SchedulerDaemon"); 213 214 if (DebugFile.trace) DebugFile.writeln("JDCConnection.setAutoCommit(true)"); 215 216 oCon.setAutoCommit(true); 217 218 if (DebugFile.trace) DebugFile.writeln("new AtomQueue()"); 220 221 oQue = new AtomQueue(); 222 223 if (DebugFile.trace) DebugFile.writeln("new AtomFeeder()"); 226 227 AtomFeeder oFdr = new AtomFeeder(); 228 229 if (DebugFile.trace) DebugFile.writeln("new AtomConsumer([JDCconnection], [AtomQueue])"); 233 234 oCsr = new AtomConsumer(oCon, oQue); 235 236 238 if (DebugFile.trace) DebugFile.writeln("new WorkerThreadPool([AtomConsumer], [Properties])"); 239 240 oThreadPool = new WorkerThreadPool(oCsr, oEnvProps); 241 242 244 ListIterator oIter = oCallbacks.listIterator(); 245 while (oIter.hasNext()) 246 oThreadPool.registerCallback((WorkerThreadCallback) oIter.next()); 247 248 dtStartDate = new Date (); 249 250 do { 251 try { 252 253 while(bContinue) { 254 oStmt = oCon.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); 256 257 try { oStmt.setQueryTimeout(20); } catch (SQLException sqle) { } 258 259 sSQL = "SELECT j.gu_job FROM k_jobs j WHERE ("+ 262 "j.id_status="+String.valueOf(Job.STATUS_PENDING)+" OR "+ 263 "j.id_status="+String.valueOf(Job.STATUS_RUNNING)+") AND "+ 264 "NOT EXISTS (SELECT a.pg_atom FROM k_job_atoms a WHERE "+ 265 "j.gu_job=a.gu_job AND a.id_status IN ("+ 266 String.valueOf(Atom.STATUS_PENDING)+","+ 267 String.valueOf(Atom.STATUS_RUNNING)+","+ 268 String.valueOf(Atom.STATUS_SUSPENDED)+"))"; 269 270 if (DebugFile.trace) DebugFile.writeln("Statement.executeQuery("+sSQL+")"); 271 272 oRSet = oStmt.executeQuery(sSQL); 273 LinkedList oFinished = new LinkedList (); 274 while (oRSet.next()) { 275 oFinished.add(oRSet.getString(1)); 276 } oRSet.close(); 278 279 if (DebugFile.trace) DebugFile.writeln("Already finished jobs "+String.valueOf(oFinished.size())); 280 281 if (oFinished.size()>0) { 282 sSQL = "UPDATE k_jobs SET id_status="+String.valueOf(Job.STATUS_FINISHED)+",dt_finished="+DBBind.Functions.GETDATE+" WHERE gu_job=?"; 283 if (DebugFile.trace) DebugFile.writeln("Connection.prepareStatement("+sSQL+")"); 284 PreparedStatement oUpdt = oCon.prepareStatement(sSQL); 285 oIter = oFinished.listIterator(); 286 while (oIter.hasNext()) { 287 oUpdt.setObject(1, oIter.next(), java.sql.Types.CHAR); 288 oUpdt.executeUpdate(); 289 } oUpdt.close(); 291 } 293 296 if (DebugFile.trace) DebugFile.writeln("Statement.executeQuery(SELECT COUNT(*) FROM k_jobs WHERE id_status=" + String.valueOf(Job.STATUS_PENDING) + ")"); 297 298 oRSet = oStmt.executeQuery("SELECT COUNT(*) FROM k_jobs WHERE id_status=" + String.valueOf(Job.STATUS_PENDING)); 299 oRSet.next(); 300 iJobCount = oRSet.getInt(1); 301 oRSet.close(); 302 oStmt.close(); 303 304 if (DebugFile.trace) DebugFile.writeln(String.valueOf(iJobCount) + " pending jobs"); 305 306 if (0==iJobCount) 307 sleep (10000); 308 else 309 break; 310 } 312 if (bContinue) { 313 oFdr.loadAtoms(oCon, oThreadPool.size()); 314 315 oFdr.feedQueue(oCon, oQue); 316 317 if (oQue.size()>0) 318 oThreadPool.launchAll(); 319 320 do { 321 322 sleep(10000); 323 324 if (DebugFile.trace) DebugFile.writeln(String.valueOf(oThreadPool.livethreads()) + " live threads"); 325 326 } while(oThreadPool.livethreads()==oThreadPool.size()); 327 } } 329 catch (InterruptedException e) { 330 if (DebugFile.trace) 331 DebugFile.writeln("SchedulerDaemon InterruptedException " + e.getMessage()); 332 } 333 } while (bContinue) ; 334 335 if (DebugFile.trace) DebugFile.writeln(" exiting SchedulerDaemon"); 336 337 oThreadPool.haltAll(); 338 oThreadPool = null; 339 340 oCsr.close(); 341 oCsr = null; 342 343 oFdr = null; 344 oQue = null; 345 346 if (DebugFile.trace) DebugFile.writeln("JDConnection.close()"); 347 348 oCon.close("SchedulerDaemon"); 349 oCon = null; 350 351 oDbb.close(); 352 oDbb=null; 353 } 354 catch (SQLException e) { 355 try { oThreadPool.haltAll(); oThreadPool=null; } catch (Exception ignore) {} 356 try { oCsr.close(); oCsr=null; } catch (Exception ignore) {} 357 try { 358 if (oCon!=null) if (!oCon.isClosed()) oCon.close("SchedulerDaemon"); 359 } catch (SQLException sqle) { 360 if (DebugFile.trace) DebugFile.writeln("SchedulerDaemon SQLException on close() " + sqle.getMessage()); 361 } 362 if (null!=oDbb) { try { oDbb.close(); } catch (Exception ignore) {} } 363 oCon = null; 364 365 dtStartDate = null; 366 dtStopDate = new Date (); 367 368 if (DebugFile.trace) 369 DebugFile.writeln("SchedulerDaemon SQLException " + e.getMessage()); 370 DebugFile.writeln("SchedulerDaemon.run() abnormal termination"); 371 } 372 if (DebugFile.trace) DebugFile.writeln("End SchedulerDaemon.run()"); 373 } 375 377 public void registerCallback(WorkerThreadCallback oNewCallback) 378 throws IllegalArgumentException { 379 380 if (oThreadPool==null) 381 oCallbacks.addLast(oNewCallback); 382 else 383 oThreadPool.registerCallback(oNewCallback); 384 } 385 386 388 public void unregisterCallback(String sCallbackName) { 389 if (oThreadPool!=null) 390 oThreadPool.unregisterCallback(sCallbackName); 391 } 392 393 395 private static void interruptJobs(JDCConnection oCon, Object [] aJobs) throws SQLException { 396 int nJobs; 397 if (null==aJobs) nJobs=0; else nJobs = aJobs.length; 398 if (nJobs>0) { 399 PreparedStatement oStmt = oCon.prepareStatement("UPDATE " + DB.k_jobs + " SET " + DB.id_status + "=" + String.valueOf(Job.STATUS_INTERRUPTED) + " WHERE " + DB.gu_job + "=?"); 400 for (int j=0; j<nJobs; j++) { 401 if (null!=aJobs[j]) { 402 oStmt.setObject(1, aJobs[j], Types.CHAR); 403 oStmt.executeUpdate(); 404 } 405 } 406 oStmt.close(); 407 } 408 } 409 410 412 private static void suspendJobs(JDCConnection oCon, Object [] aJobs) throws SQLException { 413 int nJobs; 414 if (null==aJobs) nJobs=0; else nJobs = aJobs.length; 415 if (nJobs>0) { 416 PreparedStatement oStmt = oCon.prepareStatement("UPDATE " + DB.k_jobs + " SET " + DB.id_status + "=" + String.valueOf(Job.STATUS_SUSPENDED) + " WHERE " + DB.gu_job + "=?"); 417 for (int j=0; j<nJobs; j++) { 418 if (null!=aJobs[j]) { 419 oStmt.setObject(1, aJobs[j], Types.CHAR); 420 oStmt.executeUpdate(); 421 } 422 } 423 oStmt.close(); 424 } 425 } 426 427 429 434 public void haltAll() throws IllegalStateException { 435 if (null==oThreadPool) 436 throw new IllegalStateException ("SchedulerDaemon.haltAll() Thread pool not initialized, call start() method before trying to halt worker threads"); 437 String [] aInitRunningJobs = oThreadPool.runningJobs(); 438 oThreadPool.haltAll(); 439 String [] aStillRunningJobs = oThreadPool.runningJobs(); 440 if (null!=oDbb) { 441 try { 442 JDCConnection oCon = oDbb.getConnection("SchedulerDaemonHaltAll"); 443 if (null!=aInitRunningJobs) { 444 if (null!=aStillRunningJobs) { 445 int nInitRunningJobs = aInitRunningJobs.length; 446 int nStillRunningJobs= aStillRunningJobs.length; 447 for (int i=0; i<nInitRunningJobs; i++) { 448 boolean bStillRunning = false; 449 for (int j=0; j<nStillRunningJobs && !bStillRunning; j++) { 450 bStillRunning = aStillRunningJobs[j].equals(aInitRunningJobs[i]); 451 } if (bStillRunning) aInitRunningJobs[i]=null; 453 } } suspendJobs(oCon, aInitRunningJobs); 456 } oCon.close("SchedulerDaemonHaltAll"); 458 } catch (SQLException sqle) { 459 throw new IllegalStateException ("SchedulerDaemon.haltAll() SQLException "+sqle.getMessage()); 460 } 461 } 462 } 463 465 475 public synchronized void stopAll(long lDelayMilis) 476 throws IllegalStateException ,SQLException { 477 478 if (null==oThreadPool) 479 throw new IllegalStateException ("SchedulerDaemon.stopAll() Thread pool not initialized, call start() method before trying to stop worker threads"); 480 481 oThreadPool.haltAll(); 482 483 try { sleep(lDelayMilis); } catch (InterruptedException ignore) { } 484 485 bContinue = false; 486 487 if (null!=oDbb) { 488 JDCConnection oCon = oDbb.getConnection("SchedulerDaemonStopAll"); 489 oThreadPool.stopAll(oCon); 490 interruptJobs(oCon, oThreadPool.runningJobs()); 491 oCon.close("SchedulerDaemonStopAll"); 492 } else { 493 oThreadPool.stopAll(); 494 } 495 496 } 498 500 505 public void stopAll() throws IllegalStateException ,SQLException { 506 stopAll(10000l); 507 } 508 509 511 private static void printUsage() { 512 System.out.println(""); 513 System.out.println("Usage:"); 514 System.out.println("SchedulerDaemon cnf_file_path [verbose]"); 515 } 516 517 public static void main(String [] argv) 518 throws ClassNotFoundException , SQLException , IOException { 519 520 DBBind oGlobalDBBind = new DBBind(); 521 SchedulerDaemon TheDaemon; 522 523 if (argv.length<1 || argv.length>2) 524 printUsage(); 525 526 else if (argv.length==2 && !argv[1].equals("verbose")) 527 printUsage(); 528 529 else { 530 531 TheDaemon = new SchedulerDaemon(argv[0]); 532 533 if (argv.length==2) 534 TheDaemon.registerCallback(new SystemOutNotify()); 535 536 TheDaemon.start(); 537 } 538 } 539 }
| Popular Tags
|