package com.knowgate.scheduler;

import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;

import java.io.FileInputStream;
import java.io.IOException;
import java.io.FileNotFoundException;

import java.util.Properties;
import java.util.LinkedList;
import java.util.ListIterator;

import javax.mail.MessagingException;

import com.knowgate.jdc.JDCConnection;

import com.knowgate.dataobjs.DB;
import com.knowgate.dataobjs.DBBind;
import com.knowgate.dataobjs.DBSubset;

import com.knowgate.misc.Gadgets;

/**
 * Scheduler worker that loads job atoms from the database and processes them
 * one after another on its own thread.
 *
 * <p>The executor opens a direct JDBC connection using the <code>dburl</code>,
 * <code>dbuser</code> and <code>dbpassword</code> entries of a properties file,
 * asks an {@link AtomFeeder} for work, instantiates the matching {@link Job}
 * and feeds it every atom of that job whose status is
 * <code>Atom.STATUS_PENDING</code>. It stops when the feeder returns no rows or
 * after {@link #halt()} has been called.</p>
 *
 * <p>Failures inside {@link #run()} are not rethrown: they are stored in
 * {@link #lastError()}, sent to every registered callback and written to the
 * active job's log.</p>
 */
public class SingleThreadExecutor extends Thread {

  /** Name of the properties file with any leading directory stripped; handed to the DBBind constructor. */
  private String sEnvProps;

  /** Properties loaded from the file given to the constructor. */
  private Properties oEnvProps;

  /**
   * Loop flag for {@link #run()}. Declared volatile because {@link #halt()}
   * clears it from a thread other than the one executing the loop.
   */
  private volatile boolean bContinue;

  /** Description of the last failure caught by {@link #run()}; empty string once a run has started cleanly. */
  private String sLastError;

  /** Identifier of the single job to process, or null to let the feeder choose. */
  private String sJob;

  /** Job most recently instantiated by {@link #run()}. */
  private Job oJob;

  /** Atom most recently read by {@link #run()}. */
  private Atom oAtm;

  /** Registered WorkerThreadCallback instances, in registration order. */
  private LinkedList oCallbacks;

  // ---------------------------------------------------------------------------

  /**
   * Callback that prints every notification to standard output.
   * Registered by {@link #main(String[])} when the "verbose" argument is given.
   */
  private static class SystemOutNotify extends WorkerThreadCallback {

    public SystemOutNotify() {
      super("SystemOutNotify");
    }

    /**
     * Prints the message, prefixed with the thread identifier; messages whose
     * operation code is WorkerThreadCallback.WT_EXCEPTION are tagged "ERROR".
     */
    public void call (String sThreadId, int iOpCode, String sMessage, Exception oXcpt, Object oParam) {
      if (WorkerThreadCallback.WT_EXCEPTION==iOpCode) {
        System.out.println("Thread " + sThreadId + ": ERROR " + sMessage);
      }
      else {
        System.out.println("Thread " + sThreadId + ": " + sMessage);
      }
    }
  }

  // ---------------------------------------------------------------------------

  /**
   * Creates an executor that is not bound to a particular job.
   *
   * @param sPropertiesFilePath path of the properties file to load
   * @throws FileNotFoundException if the properties file cannot be opened
   * @throws IOException if the properties file cannot be read
   */
  public SingleThreadExecutor (String sPropertiesFilePath)
    throws FileNotFoundException, IOException {
    this(sPropertiesFilePath, null);
  }

  /**
   * Creates an executor bound to one job.
   *
   * @param sPropertiesFilePath path of the properties file to load
   * @param sJobId identifier of the job whose atoms must be processed,
   *        or null to let the feeder choose
   * @throws FileNotFoundException if the properties file cannot be opened
   * @throws IOException if the properties file cannot be read
   */
  public SingleThreadExecutor (String sPropertiesFilePath, String sJobId)
    throws FileNotFoundException, IOException {

    sJob = sJobId;
    bContinue = true;

    // Keep only the part after the last platform file separator.
    final int iLastSeparator = sPropertiesFilePath.lastIndexOf(System.getProperty("file.separator"));
    if (iLastSeparator==-1) {
      sEnvProps = sPropertiesFilePath;
    }
    else {
      sEnvProps = sPropertiesFilePath.substring(iLastSeparator+1);
    }

    FileInputStream oInProps = new FileInputStream(sPropertiesFilePath);
    try {
      oEnvProps = new Properties();
      oEnvProps.load(oInProps);
    }
    finally {
      // Release the file handle even when load() fails.
      oInProps.close();
    }

    oCallbacks = new LinkedList();
  }

  // ---------------------------------------------------------------------------

  /**
   * @return the atom most recently read by {@link #run()},
   *         or null if none has been read yet
   */
  public Atom activeAtom() {
    return oAtm;
  }

  /**
   * @return the job most recently instantiated by {@link #run()},
   *         or null if none has been instantiated yet
   */
  public Job activeJob() {
    return oJob;
  }

  /**
   * @return exception type name followed by the exception message for the last
   *         failure caught by {@link #run()}; an empty string once a run has
   *         connected without error; null before the first run
   */
  public String lastError() {
    return sLastError;
  }

  // ---------------------------------------------------------------------------

  /**
   * Adds a callback to be notified by this executor.
   *
   * @param oNewCallback callback to add; its name must be unique among the
   *        callbacks already registered
   * @throws IllegalArgumentException if a callback with the same name is
   *         already registered
   */
  public void registerCallback(WorkerThreadCallback oNewCallback)
    throws IllegalArgumentException {

    ListIterator oIter = oCallbacks.listIterator();

    while (oIter.hasNext()) {
      WorkerThreadCallback oCallback = (WorkerThreadCallback) oIter.next();

      if (oCallback.name().equals(oNewCallback.name())) {
        throw new IllegalArgumentException ("Callback " + oNewCallback.name() + " is already registered");
      }
    }

    oCallbacks.addLast(oNewCallback);
  }

  /**
   * Removes a previously registered callback.
   *
   * @param sCallbackName name of the callback to remove
   * @return true if a callback with that name was found and removed,
   *         false otherwise
   */
  public boolean unregisterCallback(String sCallbackName) {
    ListIterator oIter = oCallbacks.listIterator();

    while (oIter.hasNext()) {
      WorkerThreadCallback oCallback = (WorkerThreadCallback) oIter.next();

      if (oCallback.name().equals(sCallbackName)) {
        oIter.remove();
        return true;
      }
    }

    return false;
  }

  /**
   * Invokes every registered callback, in registration order, passing this
   * thread's name as the thread identifier.
   */
  private void callBack (int iOpCode, String sMessage, Exception oXcpt, Object oParam) {
    ListIterator oIter = oCallbacks.listIterator();

    while (oIter.hasNext()) {
      WorkerThreadCallback oCallback = (WorkerThreadCallback) oIter.next();
      oCallback.call(getName(), iOpCode, sMessage, oXcpt, oParam);
    }
  }

  /**
   * Records a failure caught by {@link #run()}: stores its description,
   * notifies the callbacks and appends it to the active job's log.
   *
   * @param sType exception type name used as prefix of the description
   * @param oXcpt the exception that was caught; it is passed to the callbacks
   *        as-is so that its stack trace and cause are preserved
   */
  private void reportFailure(String sType, Exception oXcpt) {
    sLastError = sType + " " + oXcpt.getMessage();

    // NOTE(review): -1 is presumably WorkerThreadCallback.WT_EXCEPTION; confirm
    // against that class before replacing the literal with the constant.
    callBack(-1, sLastError, oXcpt, null);

    if (oJob!=null) {
      oJob.log(sLastError + "\n");
    }
  }

  // ---------------------------------------------------------------------------

  /**
   * Connects to the database and processes pending atoms until the feeder
   * returns no rows or {@link #halt()} is called.
   *
   * <p>The connection and the DBBind are always released before this method
   * returns, including when an exception interrupts the loop. Exceptions of
   * the types caught below are reported through {@link #reportFailure} and
   * not rethrown.</p>
   */
  public void run() {
    Statement oStm;
    AtomFeeder oFdr;
    DBSubset oDBS;
    String sSQL;
    String sJId;
    ResultSet oRst;
    ResultSetMetaData oMDt;

    DBBind oDBB = null;
    JDCConnection oCon = null;

    try {
      try {
        oDBB = new DBBind(sEnvProps);

        oCon = new JDCConnection (DriverManager.getConnection (oEnvProps.getProperty("dburl"),
                                                               oEnvProps.getProperty("dbuser"),
                                                               oEnvProps.getProperty("dbpassword")), null);

        // The flag is reset here, so a halt() issued before this point has no effect.
        bContinue = true;

        sLastError = "";

        while (bContinue) {

          oFdr = new AtomFeeder();

          if (sJob==null) {
            oDBS = oFdr.loadAtoms(oCon, 1);
          }
          else {
            oDBS = oFdr.loadAtoms(oCon, sJob);
          }

          if (oDBS.getRowCount()>0) {

            // First column of the first row is used as the job identifier.
            sJId = oDBS.getString(0,0);

            oJob = Job.instantiate(oCon, sJId, oEnvProps);

            oStm = oCon.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);

            // sJId is concatenated into the statement; it is read back from the
            // database by the feeder rather than taken from user input.
            sSQL = "SELECT a.*, j." + DB.tx_parameters + " FROM " + DB.k_job_atoms + " a, " + DB.k_jobs + " j WHERE a." + DB.id_status + "=" + String.valueOf(Atom.STATUS_PENDING) + " AND j." + DB.gu_job + "=a." + DB.gu_job + " AND j." + DB.gu_job + "='" + sJId + "'";

            oRst = oStm.executeQuery(sSQL);
            oMDt = oRst.getMetaData();

            while (oRst.next()) {
              oAtm = new Atom(oRst, oMDt);
              oJob.process(oAtm);
            }

            // On failure these two are released when the connection is closed below.
            oRst.close();
            oStm.close();
          }
          else {
            bContinue = false;
          }
        }

        // Normal shutdown: a failure to close is reported like any other SQLException.
        oCon.close();
        oCon = null;
      }
      finally {
        // Runs before the handlers below, so resources are released first and
        // the failure is reported afterwards.
        if (oCon!=null) {
          try {
            oCon.close();
          }
          catch (Exception ignored) {
            // Another exception is already propagating (or the close above just
            // failed and is being reported); a second close failure adds nothing.
          }
        }
        if (oDBB!=null) {
          oDBB.close();
        }
      }
    }
    catch (MessagingException e) {
      reportFailure("MessagingException", e);
    }
    catch (SQLException e) {
      reportFailure("SQLException", e);
    }
    catch (FileNotFoundException e) {
      reportFailure("FileNotFoundException", e);
    }
    catch (IOException e) {
      reportFailure("IOException", e);
    }
    catch (ClassNotFoundException e) {
      reportFailure("ClassNotFoundException", e);
    }
    catch (InstantiationException e) {
      reportFailure("InstantiationException", e);
    }
    catch (IllegalAccessException e) {
      reportFailure("IllegalAccessException", e);
    }
    catch (NullPointerException e) {
      reportFailure("NullPointerException", e);
    }
  }

  // ---------------------------------------------------------------------------

  /**
   * Asks the processing loop to stop. The flag is checked once per feeder
   * round, so the atoms of the job currently being processed are finished first.
   */
  public void halt() {
    bContinue = false;
  }

  // ---------------------------------------------------------------------------

  /** Prints the command line syntax to standard output. */
  private static void printUsage() {
    System.out.println("");
    System.out.println("Usage:");
    System.out.println("SingleThreadExecutor {run | lrun} job_type cnf_file_path {gu_job | xml_file_path} [verbose]");
    System.out.println("job_type is one of {MAIL | FAX | SAVE | FTP}");
  }

  /**
   * Checks the command line against the syntax printed by {@link #printUsage()}.
   *
   * @return true if the arguments are acceptable
   */
  private static boolean isValidCommandLine(String[] argv) {
    if (argv.length!=4 && argv.length!=5) {
      return false;
    }
    if (argv.length==5 && !argv[4].equals("verbose")) {
      return false;
    }
    if (!argv[0].equals("run") && !argv[0].equals("lrun")) {
      return false;
    }
    return argv[1].equalsIgnoreCase("MAIL") || argv[1].equalsIgnoreCase("FAX") ||
           argv[1].equalsIgnoreCase("SAVE") || argv[1].equalsIgnoreCase("FTP");
  }

  /**
   * Command line entry point.
   *
   * <p>With <code>run</code> the fourth argument is used as the identifier of
   * the job to execute. With <code>lrun</code> a new identifier is generated,
   * <code>Job.main</code> is invoked with the "create" command, the job type,
   * the configuration file path, the fourth argument and the new identifier,
   * and the executor is then bound to that identifier. A fifth argument
   * "verbose" registers a callback that prints notifications to standard
   * output. Invalid arguments print the usage text and start nothing.</p>
   *
   * @param argv {run | lrun} job_type cnf_file_path {gu_job | xml_file_path} [verbose]
   */
  public static void main(String [] argv)
    throws java.io.FileNotFoundException, java.io.IOException, SQLException,
           ClassNotFoundException, IllegalAccessException, InstantiationException,
           org.xml.sax.SAXException {

    if (!isValidCommandLine(argv)) {
      printUsage();
      return;
    }

    String sJobGUID;

    if (argv[0].equals("run")) {
      sJobGUID = argv[3];
    }
    else {
      sJobGUID = Gadgets.generateUUID();
      Job.main(new String []{"create", argv[1], argv[2], argv[3], sJobGUID });
    }

    SingleThreadExecutor oExec = new SingleThreadExecutor(argv[2], sJobGUID);

    if (argv.length==5) {
      oExec.registerCallback(new SystemOutNotify());
    }

    oExec.start();
  }
}