package com.knowgate.scheduler;

import java.util.Date;

import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;

import com.knowgate.debug.DebugFile;
import com.knowgate.jdc.JDCConnection;
import com.knowgate.dataobjs.DB;
import com.knowgate.dataobjs.DBBind;
import com.knowgate.dataobjs.DBSubset;
import com.knowgate.misc.Gadgets;
import com.knowgate.hipergate.QueryByForm;

import com.knowgate.crm.DistributionList;

import java.util.Properties;

/**
 * Creates the atoms of pending jobs in the DB.k_job_atoms table and moves
 * pending atoms from that table into an in-memory AtomQueue.
 */
public class AtomFeeder {

  /** Columns read from DB.k_jobs; index 0 is gu_job and index 4 is tx_parameters. */
  private static final String JOB_COLUMNS =
    "gu_job,gu_job_group,gu_workarea,id_command,tx_parameters,id_status,dt_execution,dt_finished,dt_created,dt_modified";

  /** Maximum number of atoms pushed into the queue by a single feedQueue() call. */
  private int iMaxBatchSize;

  /**
   * Create a feeder with a maximum batch size of 10000 atoms.
   */
  public AtomFeeder() {
    iMaxBatchSize = 10000;
  }

  /**
   * Set the maximum number of atoms moved per feedQueue() call.
   * @param iMaxBatch New maximum batch size
   */
  public void setMaxBatchSize(int iMaxBatch) {
    iMaxBatchSize = iMaxBatch;
  }

  /**
   * @return Maximum number of atoms moved per feedQueue() call
   */
  public int getMaxBatchSize() {
    return iMaxBatchSize;
  }

  /**
   * Execute a single SQL update, closing the Statement even when the
   * execution fails.
   * @param oConn Database connection
   * @param sSQL Statement text
   * @return Row count reported by Statement.executeUpdate()
   * @throws SQLException
   */
  private static int executeUpdate(JDCConnection oConn, String sSQL) throws SQLException {
    Statement oStmt = oConn.createStatement();
    try {
      return oStmt.executeUpdate(sSQL);
    } finally {
      oStmt.close();
    }
  }

  /**
   * Insert one pending atom per DB.k_member_address row of the given work area
   * that matches the QueryByForm filter, skipping rows whose e-mail appears in
   * a list of type DistributionList.TYPE_BLACK whose gu_query equals sListGUID.
   * NOTE(review): the GUIDs are concatenated into the SQL text; callers in this
   * class read them from the database, confirm that they can never carry
   * user-supplied text.
   * @param oConn Database connection
   * @param sJobGUID GUID of the job that owns the new atoms
   * @param dtExec Execution date, only written to the debug trace here
   * @param sListGUID GUID of the distribution list
   * @param sQueryGUID GUID of the query handed to QueryByForm
   * @param sWorkAreaGUID GUID of the work area used to filter member addresses
   * @return Number of rows reported as inserted
   * @throws SQLException
   */
  private int loadDynamicList(JDCConnection oConn, String sJobGUID, Date dtExec, String sListGUID, String sQueryGUID, String sWorkAreaGUID) throws SQLException {
    if (DebugFile.trace) {
      DebugFile.writeln("Begin AtomFeeder.loadDynamicList([Connection] , " + sJobGUID + "," + dtExec.toString() + "," + sQueryGUID + "," + sWorkAreaGUID + " )");
      DebugFile.incIdent();
    }

    String sColumns = "gu_company,gu_contact,tx_email,tx_name,tx_surname,tx_salutation,nm_commercial,tp_street,nm_street,nu_street,tx_addr1,tx_addr2,nm_country,nm_state,mn_city,zipcode,work_phone,direct_phone,home_phone,mov_phone,fax_phone,other_phone,po_box";

    QueryByForm oQBF = new QueryByForm(oConn, DB.k_member_address, "ma", sQueryGUID);

    String sSQL = "INSERT INTO " + DB.k_job_atoms +
                  " (gu_job,id_status," + sColumns + ") " +
                  " (SELECT '" + sJobGUID + "'," + String.valueOf(Atom.STATUS_PENDING) +
                  "," + sColumns + " FROM " + DB.k_member_address + " ma WHERE ma.gu_workarea='" + sWorkAreaGUID +
                  "' AND (" + oQBF.composeSQL() + ") AND NOT EXISTS (SELECT x." + DB.tx_email +
                  " FROM " + DB.k_lists + " b, " + DB.k_x_list_members + " x WHERE b." +
                  DB.gu_list + "=x." + DB.gu_list + " AND b." + DB.gu_query + "='" + sListGUID +
                  "' AND b." + DB.tp_list + "=" + String.valueOf(DistributionList.TYPE_BLACK) +
                  " AND x." + DB.tx_email + "=ma." + DB.tx_email + "))";

    if (DebugFile.trace) DebugFile.writeln("Connection.executeUpdate(" + sSQL + ")");

    int iInserted = executeUpdate(oConn, sSQL);

    if (DebugFile.trace) {
      DebugFile.decIdent();
      DebugFile.writeln("End AtomFeeder.loadDynamicList() : " + String.valueOf(iInserted));
    }

    return iInserted;
  }

  /**
   * Insert one pending atom per active member (bo_active<>0) of the given list,
   * skipping members whose e-mail appears in a list of type
   * DistributionList.TYPE_BLACK whose gu_query equals sListGUID.
   * @param oConn Database connection
   * @param sJobGUID GUID of the job that owns the new atoms
   * @param dtExec Execution date, only written to the debug trace here
   * @param sListGUID GUID of the distribution list
   * @return Number of rows reported as inserted
   * @throws SQLException
   */
  private int loadStaticList(JDCConnection oConn, String sJobGUID, Date dtExec, String sListGUID) throws SQLException {
    if (DebugFile.trace) {
      DebugFile.writeln("Begin AtomFeeder.loadStaticList([Connection] , " + sJobGUID + "," + dtExec.toString() + "," + sListGUID + ")");
      DebugFile.incIdent();
    }

    String sColumns = "id_format,gu_company,gu_contact,tx_email,tx_name,tx_surname,tx_salutation";

    String sSQL = "INSERT INTO " + DB.k_job_atoms +
                  " (gu_job,id_status," + sColumns + ") " +
                  " (SELECT '" + sJobGUID + "'," + String.valueOf(Atom.STATUS_PENDING) +
                  "," + sColumns + " FROM " + DB.k_x_list_members + " m WHERE " +
                  DB.gu_list + "='" + sListGUID + "' AND m." + DB.bo_active + "<>0 AND " +
                  "NOT EXISTS (SELECT x." + DB.tx_email + " FROM " + DB.k_lists + " b, " +
                  DB.k_x_list_members + " x WHERE b." + DB.gu_list + "=x." + DB.gu_list +
                  " AND b." + DB.gu_query + "='" + sListGUID + "' AND b." + DB.tp_list +
                  "=" + String.valueOf(DistributionList.TYPE_BLACK) + " AND x." + DB.tx_email + "=m." + DB.tx_email + "))";

    if (DebugFile.trace) DebugFile.writeln("Connection.executeUpdate(" + sSQL + ")");

    int iInserted = executeUpdate(oConn, sSQL);

    if (DebugFile.trace) {
      DebugFile.decIdent();
      DebugFile.writeln("End AtomFeeder.loadStaticList() : " + String.valueOf(iInserted));
    }

    return iInserted;
  }

  /**
   * Load the atoms of a direct list; delegates to loadStaticList().
   * @param oConn Database connection
   * @param sJobGUID GUID of the job that owns the new atoms
   * @param dtExec Execution date, only written to the debug trace
   * @param sListGUID GUID of the distribution list
   * @return Number of rows reported as inserted
   * @throws SQLException
   */
  private int loadDirectList(JDCConnection oConn, String sJobGUID, Date dtExec, String sListGUID) throws SQLException {
    return loadStaticList(oConn, sJobGUID, dtExec, sListGUID);
  }

  /**
   * Parse a job parameters string of the form name:value,name:value,...
   * Entries that do not yield both a name and a value are skipped (and traced)
   * instead of aborting the whole load with an ArrayIndexOutOfBoundsException.
   * @param sTxParams Parameters string, may be null or empty
   * @return Properties holding one entry per well-formed pair, never null
   */
  private Properties parseParameters(String sTxParams) {
    Properties oParams = new Properties();

    if (DebugFile.trace) {
      DebugFile.writeln("Begin AtomFeeder.parseParameters(" + sTxParams + ")");
      DebugFile.incIdent();
    }

    if (sTxParams!=null && sTxParams.length()>0) {
      String[] aParams = Gadgets.split(sTxParams, ",");

      if (aParams!=null) {
        for (int p = 0; p < aParams.length; p++) {
          String[] aVariable = Gadgets.split(aParams[p], ":");

          // Properties rejects null keys and values, so those are skipped as well
          if (aVariable!=null && aVariable.length>=2 && aVariable[0]!=null && aVariable[1]!=null) {
            oParams.put(aVariable[0], aVariable[1]);
          }
          else if (DebugFile.trace) {
            DebugFile.writeln("AtomFeeder.parseParameters() ignored malformed parameter " + aParams[p]);
          }
        } // next (p)
      }
    }

    if (DebugFile.trace) {
      DebugFile.decIdent();
      DebugFile.writeln("End AtomFeeder.parseParameters() : " + String.valueOf(oParams.size()));
    }

    return oParams;
  }

  /**
   * @return Text of the parameterized UPDATE that sets a job to
   * Job.STATUS_RUNNING and stamps its dt_execution with DBBind.Functions.GETDATE
   */
  private static String composeJobRunningSQL() {
    return "UPDATE " + DB.k_jobs + " SET " + DB.id_status + "=" + String.valueOf(Job.STATUS_RUNNING) + "," + DB.dt_execution + "=" + DBBind.Functions.GETDATE + " WHERE " + DB.gu_job + "=?";
  }

  /**
   * Create the atoms for one row of a loaded jobs set and then flag that job
   * as running.
   * When the job parameters carry no gu_list entry no atoms are created, but
   * the job is still flagged as running.
   * @param oConn Database connection
   * @param oJobStmt Prepared statement built from composeJobRunningSQL()
   * @param oJobsSet Jobs set loaded with the JOB_COLUMNS column list
   * @param iRow Row of oJobsSet to process
   * @param dtNow Date used when the distribution list has no dt_execution
   * @throws SQLException
   */
  private void feedJob(JDCConnection oConn, PreparedStatement oJobStmt, DBSubset oJobsSet, int iRow, Date dtNow) throws SQLException {
    Properties oParams = parseParameters(oJobsSet.getString(4, iRow));
    String sListGUID = oParams.getProperty("gu_list");

    if (sListGUID!=null) {
      DistributionList oDistribList = new DistributionList(oConn, sListGUID);
      Date dtExec;

      if (oDistribList.isNull(DB.dt_execution))
        dtExec = dtNow;
      else
        dtExec = oDistribList.getDate(DB.dt_execution);

      // Lists of any other type create no atoms
      switch (oDistribList.getShort(DB.tp_list)) {
        case DistributionList.TYPE_DYNAMIC:
          loadDynamicList(oConn, oJobsSet.getString(0, iRow), dtExec, sListGUID, oDistribList.getString(DB.gu_query), oDistribList.getString(DB.gu_workarea));
          break;
        case DistributionList.TYPE_STATIC:
          loadStaticList(oConn, oJobsSet.getString(0, iRow), dtExec, sListGUID);
          break;
        case DistributionList.TYPE_DIRECT:
          loadDirectList(oConn, oJobsSet.getString(0, iRow), dtExec, sListGUID);
          break;
      }
    }

    if (DebugFile.trace) DebugFile.writeln("PrepareStatement.setString(1, '" + oJobsSet.getStringNull(0, iRow, "") + "')");

    oJobStmt.setString(1, oJobsSet.getString(0, iRow));

    if (DebugFile.trace) DebugFile.writeln("PrepareStatement.executeUpdate()");

    oJobStmt.executeUpdate();
  }

  /**
   * Load jobs whose id_status is Job.STATUS_PENDING (at most iWorkerThreads
   * rows, ordered by dt_execution descending), create their atoms and flag
   * each job as running.
   * @param oConn Database connection
   * @param iWorkerThreads Maximum number of jobs to load
   * @return DBSubset with the loaded jobs, columns as in JOB_COLUMNS
   * @throws SQLException
   */
  public DBSubset loadAtoms(JDCConnection oConn, int iWorkerThreads) throws SQLException {
    Date dtNow = new Date();

    if (DebugFile.trace) {
      DebugFile.writeln("Begin AtomFeeder.loadAtoms([Connection], " + String.valueOf(iWorkerThreads) + ")");
      DebugFile.incIdent();
    }

    DBSubset oJobsSet = new DBSubset(DB.k_jobs, JOB_COLUMNS,
                                     DB.id_status + "=" + String.valueOf(Job.STATUS_PENDING) + " ORDER BY " + DB.dt_execution + " DESC", iWorkerThreads);

    oJobsSet.setMaxRows(iWorkerThreads);

    int iJobCount = oJobsSet.load(oConn);

    String sSQL = composeJobRunningSQL();

    if (DebugFile.trace) DebugFile.writeln("Connection.prepareStatement(" + sSQL + ")");

    PreparedStatement oJobStmt = oConn.prepareStatement(sSQL);

    try {
      for (int j=0; j<iJobCount; j++) {
        feedJob(oConn, oJobStmt, oJobsSet, j, dtNow);
      } // next (j)
    } finally {
      if (DebugFile.trace) DebugFile.writeln("PrepareStatement.close()");
      oJobStmt.close();
    }

    if (DebugFile.trace) {
      DebugFile.decIdent();
      DebugFile.writeln("End AtomFeeder.loadAtoms() : " + String.valueOf(oJobsSet.getRowCount()));
    }

    return oJobsSet;
  }

  /**
   * Load a single job by GUID, create its atoms and flag it as running.
   * Nothing is created or updated unless exactly one job row is found.
   * NOTE(review): sJobId is concatenated into the filter text; confirm callers
   * never pass user-supplied text.
   * @param oConn Database connection
   * @param sJobId GUID of the job to load
   * @return DBSubset with the loaded job (if any), columns as in JOB_COLUMNS
   * @throws SQLException
   */
  public DBSubset loadAtoms(JDCConnection oConn, String sJobId) throws SQLException {
    Date dtNow = new Date();

    if (DebugFile.trace) {
      DebugFile.writeln("Begin AtomFeeder.loadAtoms([Connection], " + sJobId + ")");
      DebugFile.incIdent();
    }

    DBSubset oJobsSet = new DBSubset(DB.k_jobs, JOB_COLUMNS,
                                     DB.gu_job + "='" + sJobId + "'", 1);

    int iJobCount = oJobsSet.load(oConn);

    String sSQL = composeJobRunningSQL();

    if (DebugFile.trace) DebugFile.writeln("Connection.prepareStatement(" + sSQL + ")");

    PreparedStatement oJobStmt = oConn.prepareStatement(sSQL);

    try {
      if (1==iJobCount) {
        feedJob(oConn, oJobStmt, oJobsSet, 0, dtNow);
      }
    } finally {
      if (DebugFile.trace) DebugFile.writeln("PrepareStatement.close()");
      oJobStmt.close();
    }

    if (DebugFile.trace) {
      DebugFile.decIdent();
      DebugFile.writeln("End AtomFeeder.loadAtoms() : " + sJobId);
    }

    return oJobsSet;
  }

  /**
   * Read atoms whose id_status is Atom.STATUS_PENDING, ordered by the
   * dt_execution of their job, push each one into the queue and set its
   * id_status to Atom.STATUS_RUNNING. At most the maximum batch size is
   * processed per call.
   * @param oConn Database connection
   * @param oQueue Queue that receives the atoms
   * @throws SQLException
   */
  public void feedQueue(JDCConnection oConn, AtomQueue oQueue) throws SQLException {
    int iProcessed = 0;

    if (DebugFile.trace) {
      DebugFile.writeln("Begin AtomFeeder.feedQueue([Connection], [AtomQueue])");
      DebugFile.incIdent();
    }

    String sSQL = "SELECT a.*, j." + DB.tx_parameters + " FROM " + DB.k_job_atoms + " a, " + DB.k_jobs + " j WHERE a." + DB.id_status + "=" + String.valueOf(Atom.STATUS_PENDING) + " AND j." + DB.gu_job + "=a." + DB.gu_job + " ORDER BY j." + DB.dt_execution;

    Statement oStmt = oConn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);

    try {
      if (DebugFile.trace) DebugFile.writeln("Statement.executeQuery(" + sSQL + ")");

      ResultSet oRSet = oStmt.executeQuery(sSQL);

      try {
        try {
          oRSet.setFetchSize(getMaxBatchSize());
        } catch (SQLException ignored) {
          // The fetch size is only a hint to the driver, so a driver that
          // rejects it must not stop the queue from being fed.
          if (DebugFile.trace) DebugFile.writeln("ResultSet.setFetchSize() failed " + ignored.getMessage());
        }

        ResultSetMetaData oMDat = oRSet.getMetaData();
        int iJobCol = oRSet.findColumn(DB.gu_job);
        int iAtmCol = oRSet.findColumn(DB.pg_atom);

        sSQL = "UPDATE " + DB.k_job_atoms + " SET " + DB.id_status + "=" + Atom.STATUS_RUNNING + " WHERE " + DB.gu_job + "=? AND " + DB.pg_atom + "=?";

        if (DebugFile.trace) DebugFile.writeln("Connection.prepareStatement(" + sSQL + ")");

        PreparedStatement oUpdt = oConn.prepareStatement(sSQL);

        try {
          boolean bNext = oRSet.next();

          while (bNext && iProcessed<iMaxBatchSize) {
            oQueue.push(new Atom(oRSet, oMDat));

            String sJobId = oRSet.getString(iJobCol);
            int iAtomId = oRSet.getInt(iAtmCol);

            // The cursor is advanced before the update is issued, as in the
            // original statement order.
            bNext = oRSet.next();

            oUpdt.setString(1, sJobId);
            oUpdt.setInt(2, iAtomId);

            if (DebugFile.trace) DebugFile.writeln("PreparedStatement.executeUpdate(UPDATE " + DB.k_job_atoms + " SET " + DB.id_status + "=" + Atom.STATUS_RUNNING + " WHERE " + DB.gu_job + "='" + sJobId + "' AND " + DB.pg_atom + "=" + String.valueOf(iAtomId) +")");

            oUpdt.executeUpdate();

            iProcessed++;
          } // wend
        } finally {
          oUpdt.close();
        }
      } finally {
        oRSet.close();
      }
    } finally {
      oStmt.close();
    }

    if (DebugFile.trace) {
      DebugFile.decIdent();
      DebugFile.writeln("End AtomFeeder.feedQueue() : " + String.valueOf(iProcessed));
    }
  }

  /**
   * Left-pad a non-negative value with a zero to two digits.
   * @param iValue Value to format
   * @return Two digit text for values below 10, plain decimal text otherwise
   */
  private static String pad2(int iValue) {
    return (iValue<10 ? "0" + String.valueOf(iValue) : String.valueOf(iValue));
  }

  /**
   * Format a date as a JDBC timestamp escape, { ts 'yyyy-mm-dd hh:mm:ss'},
   * using the date fields as reported by java.util.Date for the default
   * time zone. Date and time are separated by a single space.
   * @param dt Date to format
   * @return Timestamp escape text
   */
  private static String escape(java.util.Date dt) {
    return "{ ts '" +
           String.valueOf(dt.getYear()+1900) + "-" + pad2(dt.getMonth()+1) + "-" + pad2(dt.getDate()) +
           " " +
           pad2(dt.getHours()) + ":" + pad2(dt.getMinutes()) + ":" + pad2(dt.getSeconds()) +
           "'}";
  }
}
| Popular Tags
|