1 25 package org.ofbiz.service.job; 26 27 import java.io.IOException ; 28 import java.util.ArrayList ; 29 import java.util.Collection ; 30 import java.util.Date ; 31 import java.util.Iterator ; 32 import java.util.List ; 33 import java.util.Map ; 34 import java.sql.Timestamp ; 35 36 import org.ofbiz.base.util.Debug; 37 import org.ofbiz.base.util.UtilDateTime; 38 import org.ofbiz.base.util.UtilMisc; 39 import org.ofbiz.base.util.UtilProperties; 40 import org.ofbiz.base.util.UtilValidate; 41 import org.ofbiz.entity.GenericDelegator; 42 import org.ofbiz.entity.GenericEntityException; 43 import org.ofbiz.entity.GenericValue; 44 import org.ofbiz.entity.condition.EntityCondition; 45 import org.ofbiz.entity.condition.EntityConditionList; 46 import org.ofbiz.entity.condition.EntityExpr; 47 import org.ofbiz.entity.condition.EntityOperator; 48 import org.ofbiz.entity.serialize.SerializeException; 49 import org.ofbiz.entity.serialize.XmlSerializer; 50 import org.ofbiz.entity.transaction.GenericTransactionException; 51 import org.ofbiz.entity.transaction.TransactionUtil; 52 import org.ofbiz.service.DispatchContext; 53 import org.ofbiz.service.GenericDispatcher; 54 import org.ofbiz.service.GenericServiceException; 55 import org.ofbiz.service.LocalDispatcher; 56 import org.ofbiz.service.calendar.RecurrenceInfo; 57 import org.ofbiz.service.calendar.RecurrenceInfoException; 58 import org.ofbiz.service.config.ServiceConfigUtil; 59 60 67 public class JobManager { 68 69 public static final String instanceId = UtilProperties.getPropertyValue("general.properties", "unique.instanceId", "ofbiz0"); 70 public static final Map updateFields = UtilMisc.toMap("runByInstanceId", instanceId, "statusId", "SERVICE_QUEUED"); 71 public static final String module = JobManager.class.getName(); 72 public static final String dispatcherName = "JobDispatcher"; 73 74 protected GenericDelegator delegator; 75 protected JobPoller jp; 76 77 78 public JobManager(GenericDelegator delegator) { 79 this.delegator = delegator; 80 jp = new JobPoller(this); 81 } 82 83 84 public void runJob(Job job) throws JobManagerException { 85 if (job.isValid()) 86 jp.queueNow(job); 87 } 88 89 90 public LocalDispatcher getDispatcher() { 91 LocalDispatcher thisDispatcher = null; 92 try { 93 thisDispatcher = GenericDispatcher.getLocalDispatcher(dispatcherName, delegator); 94 } catch (GenericServiceException e) { 95 Debug.logError(e, module); 96 } 97 return thisDispatcher; 98 } 99 100 101 public GenericDelegator getDelegator() { 102 return this.delegator; 103 } 104 105 public synchronized Iterator poll() { 106 List poll = new ArrayList (); 107 Collection jobEnt = null; 108 109 List order = UtilMisc.toList("runTime"); 111 112 List expressions = UtilMisc.toList(new EntityExpr("runTime", EntityOperator.LESS_THAN_EQUAL_TO, 114 UtilDateTime.nowTimestamp()), new EntityExpr("startDateTime", EntityOperator.EQUALS, null), 115 new EntityExpr("cancelDateTime", EntityOperator.EQUALS, null), 116 new EntityExpr("runByInstanceId", EntityOperator.EQUALS, null)); 117 118 List pools = ServiceConfigUtil.getRunPools(); 120 List poolsExpr = UtilMisc.toList(new EntityExpr("poolId", EntityOperator.EQUALS, null)); 121 if (pools != null) { 122 Iterator poolsIter = pools.iterator(); 123 while (poolsIter.hasNext()) { 124 String poolName = (String ) poolsIter.next(); 125 poolsExpr.add(new EntityExpr("poolId", EntityOperator.EQUALS, poolName)); 126 } 127 } 128 129 EntityCondition baseCondition = new EntityConditionList(expressions, EntityOperator.AND); 131 EntityCondition poolCondition = new EntityConditionList(poolsExpr, EntityOperator.OR); 132 EntityCondition mainCondition = new EntityConditionList(UtilMisc.toList(baseCondition, poolCondition), EntityOperator.AND); 133 134 boolean pollDone = false; 136 137 while (!pollDone) { 138 boolean beganTransaction; 139 try { 140 beganTransaction = TransactionUtil.begin(); 141 } catch (GenericTransactionException e) { 142 Debug.logError(e, "Unable to start transaction; not polling for jobs", module); 143 return null; 144 } 145 if (!beganTransaction) { 146 Debug.logError("Unable to poll for jobs; transaction was not started by this process", module); 147 return null; 148 } 149 150 try { 151 delegator.storeByCondition("JobSandbox", updateFields, mainCondition); 153 154 jobEnt = delegator.findByAnd("JobSandbox", updateFields, order); 156 } catch (GenericEntityException ee) { 158 Debug.logError(ee, "Cannot load jobs from datasource.", module); 159 } catch (Exception e) { 160 Debug.logError(e, "Unknown error.", module); 161 } 162 163 if (jobEnt != null && jobEnt.size() > 0) { 164 Iterator i = jobEnt.iterator(); 165 166 while (i.hasNext()) { 167 GenericValue v = (GenericValue) i.next(); 168 DispatchContext dctx = getDispatcher().getDispatchContext(); 169 170 if (dctx == null) { 171 Debug.logError("Unable to locate DispatchContext object; not running job!", module); 172 continue; 173 } 174 Job job = new PersistedServiceJob(dctx, v, null); try { 176 job.queue(); 177 poll.add(job); 178 } catch (InvalidJobException e) { 179 Debug.logError(e, module); 180 } 181 } 182 } else { 183 pollDone = true; 184 } 185 186 try { 188 TransactionUtil.commit(beganTransaction); 189 } catch (GenericTransactionException e) { 190 Debug.logError(e, module); 191 } 192 193 } 194 return poll.iterator(); 195 } 196 197 public synchronized void reloadCrashedJobs() { 198 String instanceId = UtilProperties.getPropertyValue("general.properties", "unique.instanceId", "ofbiz0"); 199 List toStore = new ArrayList (); 200 List crashed = null; 201 202 List exprs = UtilMisc.toList(new EntityExpr("finishDateTime", EntityOperator.EQUALS, null)); 203 exprs.add(new EntityExpr("cancelDateTime", EntityOperator.EQUALS, null)); 204 exprs.add(new EntityExpr("runByInstanceId", EntityOperator.EQUALS, instanceId)); 205 try { 206 crashed = delegator.findByAnd("JobSandbox", exprs, UtilMisc.toList("startDateTime")); 207 } catch (GenericEntityException e) { 208 Debug.logError(e, "Unable to load crashed jobs", module); 209 } 210 211 if (crashed != null && crashed.size() > 0) { 212 Iterator i = crashed.iterator(); 213 while (i.hasNext()) { 214 GenericValue job = (GenericValue) i.next(); 215 long runtime = job.getTimestamp("runTime").getTime(); 216 RecurrenceInfo ri = JobManager.getRecurrenceInfo(job); 217 if (ri != null) { 218 long next = ri.next(); 219 if (next <= runtime) { 220 Timestamp now = UtilDateTime.nowTimestamp(); 221 Debug.log("Scheduling Job : " + job, module); 223 224 String newJobId = job.getDelegator().getNextSeqId("JobSandbox").toString(); 225 String pJobId = job.getString("parentJobId"); 226 if (pJobId == null) { 227 pJobId = job.getString("jobId"); 228 } 229 GenericValue newJob = GenericValue.create(job); 230 newJob.set("statusId", "SERVICE_PENDING"); 231 newJob.set("runTime", now); 232 newJob.set("jobId", newJobId); 233 newJob.set("previousJobId", job.getString("jobId")); 234 newJob.set("parentJobId", pJobId); 235 newJob.set("startDateTime", null); 236 newJob.set("runByInstanceId", null); 237 toStore.add(newJob); 238 239 job.set("statusId", "SERVICE_CRASHED"); 241 job.set("cancelDateTime", now); 242 toStore.add(job); 243 } 244 } 245 } 246 247 if (toStore.size() > 0) { 248 try { 249 delegator.storeAll(toStore); 250 } catch (GenericEntityException e) { 251 Debug.logError(e, module); 252 } 253 if (Debug.infoOn()) Debug.logInfo("-- " + toStore.size() + " jobs re-scheduled", module); 254 } 255 256 } else { 257 if (Debug.infoOn()) Debug.logInfo("No crashed jobs to re-schedule", module); 258 } 259 } 260 261 270 public void schedule(String serviceName, Map context, long startTime, int frequency, int interval, int count) throws JobManagerException { 271 schedule(serviceName, context, startTime, frequency, interval, count, 0); 272 } 273 274 283 public void schedule(String serviceName, Map context, long startTime, int frequency, int interval, long endTime) throws JobManagerException { 284 schedule(serviceName, context, startTime, frequency, interval, -1, endTime); 285 } 286 287 297 public void schedule(String serviceName, Map context, long startTime, int frequency, int interval, int count, long endTime) throws JobManagerException { 298 schedule(null, serviceName, context, startTime, frequency, interval, count, endTime); 299 } 300 301 312 public void schedule(String poolName, String serviceName, Map context, long startTime, int frequency, int interval, int count, long endTime) throws JobManagerException { 313 schedule(null, serviceName, context, startTime, frequency, interval, count, endTime, -1); 314 } 315 316 328 public void schedule(String poolName, String serviceName, Map context, long startTime, int frequency, int interval, int count, long endTime, int maxRetry) throws JobManagerException { 329 if (delegator == null) { 330 Debug.logWarning("No delegator referenced; cannot schedule job.", module); 331 return; 332 } 333 334 String dataId = null; 336 try { 337 dataId = delegator.getNextSeqId("RuntimeData").toString(); 338 GenericValue runtimeData = delegator.makeValue("RuntimeData", UtilMisc.toMap("runtimeDataId", dataId)); 339 340 runtimeData.set("runtimeInfo", XmlSerializer.serialize(context)); 341 delegator.create(runtimeData); 342 } catch (GenericEntityException ee) { 343 throw new JobManagerException(ee.getMessage(), ee); 344 } catch (SerializeException se) { 345 throw new JobManagerException(se.getMessage(), se); 346 } catch (IOException ioe) { 347 throw new JobManagerException(ioe.getMessage(), ioe); 348 } 349 350 schedule(poolName, serviceName, dataId, startTime, frequency, interval, count, endTime, maxRetry); 352 } 353 354 361 public void schedule(String poolName, String serviceName, String dataId, long startTime) throws JobManagerException { 362 schedule(poolName, serviceName, dataId, startTime, -1, 0, 1, 0, -1); 363 } 364 365 377 public void schedule(String poolName, String serviceName, String dataId, long startTime, int frequency, int interval, int count, long endTime, int maxRetry) throws JobManagerException { 378 if (delegator == null) { 379 Debug.logWarning("No delegator referenced; cannot schedule job.", module); 380 return; 381 } 382 383 String infoId = null; 385 if (frequency > -1 && count != 0) { 386 try { 387 RecurrenceInfo info = RecurrenceInfo.makeInfo(delegator, startTime, frequency, interval, count); 388 infoId = info.primaryKey(); 389 } catch (RecurrenceInfoException e) { 390 throw new JobManagerException(e.getMessage(), e); 391 } 392 } 393 394 String jobName = new String (new Long ((new Date ().getTime())).toString()); 396 String jobId = delegator.getNextSeqId("JobSandbox").toString(); 397 Map jFields = UtilMisc.toMap("jobId", jobId, "jobName", jobName, "runTime", new java.sql.Timestamp (startTime), 398 "serviceName", serviceName, "recurrenceInfoId", infoId, "runtimeDataId", dataId); 399 400 if (poolName != null && poolName.length() > 0) { 402 jFields.put("poolId", poolName); 403 } else { 404 jFields.put("poolId", ServiceConfigUtil.getSendPool()); 405 } 406 407 jFields.put("loaderName", dispatcherName); 409 410 jFields.put("maxRetry", new Long (maxRetry)); 412 413 GenericValue jobV = null; 415 try { 416 jobV = delegator.makeValue("JobSandbox", jFields); 417 delegator.create(jobV); 418 } catch (GenericEntityException e) { 419 throw new JobManagerException(e.getMessage(), e); 420 } 421 } 422 423 427 public void killThread(String threadName) { 428 jp.killThread(threadName); 429 } 430 431 435 public List processList() { 436 return jp.getPoolState(); 437 } 438 439 440 public void finalize() { 441 if (jp != null) { 442 jp.stop(); 443 jp = null; 444 Debug.logInfo("JobManager: Stopped Scheduler Thread.", module); 445 } 446 } 447 448 449 public static RecurrenceInfo getRecurrenceInfo(GenericValue job) { 450 try { 451 if (job != null && !UtilValidate.isEmpty(job.getString("recurrenceInfoId"))) { 452 if (job.get("cancelDateTime") != null) { 453 return null; 455 } 456 GenericValue ri = job.getRelatedOne("RecurrenceInfo"); 457 458 if (ri != null) { 459 return new RecurrenceInfo(ri); 460 } else { 461 return null; 462 } 463 } else { 464 return null; 465 } 466 } catch (GenericEntityException e) { 467 e.printStackTrace(); 468 Debug.logError(e, "Problem getting RecurrenceInfo entity from JobSandbox", module); 469 } catch (RecurrenceInfoException re) { 470 re.printStackTrace(); 471 Debug.logError(re, "Problem creating RecurrenceInfo instance: " + re.getMessage(), module); 472 } 473 return null; 474 } 475 476 } 477 | Popular Tags |