KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > ofbiz > service > job > JobManager


1 /*
2  * $Id: JobManager.java 5462 2005-08-05 18:35:48Z jonesde $
3  *
4  * Copyright (c) 2001, 2002 The Open For Business Project - www.ofbiz.org
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining a
7  * copy of this software and associated documentation files (the "Software"),
8  * to deal in the Software without restriction, including without limitation
9  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
10  * and/or sell copies of the Software, and to permit persons to whom the
11  * Software is furnished to do so, subject to the following conditions:
12  *
13  * The above copyright notice and this permission notice shall be included
14  * in all copies or substantial portions of the Software.
15  *
16  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
17  * OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
18  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
19  * IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
20  * CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT
21  * OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR
22  * THE USE OR OTHER DEALINGS IN THE SOFTWARE.
23  *
24  */

25 package org.ofbiz.service.job;
26
27 import java.io.IOException JavaDoc;
28 import java.util.ArrayList JavaDoc;
29 import java.util.Collection JavaDoc;
30 import java.util.Date JavaDoc;
31 import java.util.Iterator JavaDoc;
32 import java.util.List JavaDoc;
33 import java.util.Map JavaDoc;
34 import java.sql.Timestamp JavaDoc;
35
36 import org.ofbiz.base.util.Debug;
37 import org.ofbiz.base.util.UtilDateTime;
38 import org.ofbiz.base.util.UtilMisc;
39 import org.ofbiz.base.util.UtilProperties;
40 import org.ofbiz.base.util.UtilValidate;
41 import org.ofbiz.entity.GenericDelegator;
42 import org.ofbiz.entity.GenericEntityException;
43 import org.ofbiz.entity.GenericValue;
44 import org.ofbiz.entity.condition.EntityCondition;
45 import org.ofbiz.entity.condition.EntityConditionList;
46 import org.ofbiz.entity.condition.EntityExpr;
47 import org.ofbiz.entity.condition.EntityOperator;
48 import org.ofbiz.entity.serialize.SerializeException;
49 import org.ofbiz.entity.serialize.XmlSerializer;
50 import org.ofbiz.entity.transaction.GenericTransactionException;
51 import org.ofbiz.entity.transaction.TransactionUtil;
52 import org.ofbiz.service.DispatchContext;
53 import org.ofbiz.service.GenericDispatcher;
54 import org.ofbiz.service.GenericServiceException;
55 import org.ofbiz.service.LocalDispatcher;
56 import org.ofbiz.service.calendar.RecurrenceInfo;
57 import org.ofbiz.service.calendar.RecurrenceInfoException;
58 import org.ofbiz.service.config.ServiceConfigUtil;
59
60 /**
61  * JobManager
62  *
63  * @author <a HREF="mailto:jaz@ofbiz.org">Andy Zeneski</a>
64  * @version $Rev: 5462 $
65  * @since 2.0
66  */

67 public class JobManager {
68
69     public static final String JavaDoc instanceId = UtilProperties.getPropertyValue("general.properties", "unique.instanceId", "ofbiz0");
70     public static final Map JavaDoc updateFields = UtilMisc.toMap("runByInstanceId", instanceId, "statusId", "SERVICE_QUEUED");
71     public static final String JavaDoc module = JobManager.class.getName();
72     public static final String JavaDoc dispatcherName = "JobDispatcher";
73
74     protected GenericDelegator delegator;
75     protected JobPoller jp;
76
77     /** Creates a new JobManager object. */
78     public JobManager(GenericDelegator delegator) {
79         this.delegator = delegator;
80         jp = new JobPoller(this);
81     }
82
83     /** Queues a Job to run now. */
84     public void runJob(Job job) throws JobManagerException {
85         if (job.isValid())
86             jp.queueNow(job);
87     }
88
89     /** Returns the ServiceDispatcher. */
90     public LocalDispatcher getDispatcher() {
91         LocalDispatcher thisDispatcher = null;
92         try {
93             thisDispatcher = GenericDispatcher.getLocalDispatcher(dispatcherName, delegator);
94         } catch (GenericServiceException e) {
95             Debug.logError(e, module);
96         }
97         return thisDispatcher;
98     }
99
100     /** Returns the GenericDelegator. */
101     public GenericDelegator getDelegator() {
102         return this.delegator;
103     }
104
105     public synchronized Iterator JavaDoc poll() {
106         List JavaDoc poll = new ArrayList JavaDoc();
107         Collection JavaDoc jobEnt = null;
108
109         // sort the results by time
110
List JavaDoc order = UtilMisc.toList("runTime");
111
112         // basic query
113
List JavaDoc expressions = UtilMisc.toList(new EntityExpr("runTime", EntityOperator.LESS_THAN_EQUAL_TO,
114                 UtilDateTime.nowTimestamp()), new EntityExpr("startDateTime", EntityOperator.EQUALS, null),
115                 new EntityExpr("cancelDateTime", EntityOperator.EQUALS, null),
116                 new EntityExpr("runByInstanceId", EntityOperator.EQUALS, null));
117
118         // limit to just defined pools
119
List JavaDoc pools = ServiceConfigUtil.getRunPools();
120         List JavaDoc poolsExpr = UtilMisc.toList(new EntityExpr("poolId", EntityOperator.EQUALS, null));
121         if (pools != null) {
122             Iterator JavaDoc poolsIter = pools.iterator();
123             while (poolsIter.hasNext()) {
124                 String JavaDoc poolName = (String JavaDoc) poolsIter.next();
125                 poolsExpr.add(new EntityExpr("poolId", EntityOperator.EQUALS, poolName));
126             }
127         }
128
129         // make the conditions
130
EntityCondition baseCondition = new EntityConditionList(expressions, EntityOperator.AND);
131         EntityCondition poolCondition = new EntityConditionList(poolsExpr, EntityOperator.OR);
132         EntityCondition mainCondition = new EntityConditionList(UtilMisc.toList(baseCondition, poolCondition), EntityOperator.AND);
133
134         // we will loop until we have no more to do
135
boolean pollDone = false;
136
137         while (!pollDone) {
138             boolean beganTransaction;
139             try {
140                 beganTransaction = TransactionUtil.begin();
141             } catch (GenericTransactionException e) {
142                 Debug.logError(e, "Unable to start transaction; not polling for jobs", module);
143                 return null;
144             }
145             if (!beganTransaction) {
146                 Debug.logError("Unable to poll for jobs; transaction was not started by this process", module);
147                 return null;
148             }
149
150             try {
151                 // first update the jobs w/ this instance running information
152
delegator.storeByCondition("JobSandbox", updateFields, mainCondition);
153
154                 // now query all the 'queued' jobs for this instance
155
jobEnt = delegator.findByAnd("JobSandbox", updateFields, order);
156                 //jobEnt = delegator.findByCondition("JobSandbox", mainCondition, null, order);
157
} catch (GenericEntityException ee) {
158                 Debug.logError(ee, "Cannot load jobs from datasource.", module);
159             } catch (Exception JavaDoc e) {
160                 Debug.logError(e, "Unknown error.", module);
161             }
162
163             if (jobEnt != null && jobEnt.size() > 0) {
164                 Iterator JavaDoc i = jobEnt.iterator();
165
166                 while (i.hasNext()) {
167                     GenericValue v = (GenericValue) i.next();
168                     DispatchContext dctx = getDispatcher().getDispatchContext();
169
170                     if (dctx == null) {
171                         Debug.logError("Unable to locate DispatchContext object; not running job!", module);
172                         continue;
173                     }
174                     Job job = new PersistedServiceJob(dctx, v, null); // todo fix the requester
175
try {
176                         job.queue();
177                         poll.add(job);
178                     } catch (InvalidJobException e) {
179                         Debug.logError(e, module);
180                     }
181                 }
182             } else {
183                 pollDone = true;
184             }
185
186             // finished this run; commit the transaction
187
try {
188                 TransactionUtil.commit(beganTransaction);
189             } catch (GenericTransactionException e) {
190                 Debug.logError(e, module);
191             }
192
193         }
194         return poll.iterator();
195     }
196
197     public synchronized void reloadCrashedJobs() {
198         String JavaDoc instanceId = UtilProperties.getPropertyValue("general.properties", "unique.instanceId", "ofbiz0");
199         List JavaDoc toStore = new ArrayList JavaDoc();
200         List JavaDoc crashed = null;
201
202         List JavaDoc exprs = UtilMisc.toList(new EntityExpr("finishDateTime", EntityOperator.EQUALS, null));
203         exprs.add(new EntityExpr("cancelDateTime", EntityOperator.EQUALS, null));
204         exprs.add(new EntityExpr("runByInstanceId", EntityOperator.EQUALS, instanceId));
205         try {
206             crashed = delegator.findByAnd("JobSandbox", exprs, UtilMisc.toList("startDateTime"));
207         } catch (GenericEntityException e) {
208             Debug.logError(e, "Unable to load crashed jobs", module);
209         }
210
211         if (crashed != null && crashed.size() > 0) {
212             Iterator JavaDoc i = crashed.iterator();
213             while (i.hasNext()) {
214                 GenericValue job = (GenericValue) i.next();
215                 long runtime = job.getTimestamp("runTime").getTime();
216                 RecurrenceInfo ri = JobManager.getRecurrenceInfo(job);
217                 if (ri != null) {
218                     long next = ri.next();
219                     if (next <= runtime) {
220                         Timestamp JavaDoc now = UtilDateTime.nowTimestamp();
221                         // only re-schedule if there is no new recurrences since last run
222
Debug.log("Scheduling Job : " + job, module);
223
224                         String JavaDoc newJobId = job.getDelegator().getNextSeqId("JobSandbox").toString();
225                         String JavaDoc pJobId = job.getString("parentJobId");
226                         if (pJobId == null) {
227                             pJobId = job.getString("jobId");
228                         }
229                         GenericValue newJob = GenericValue.create(job);
230                         newJob.set("statusId", "SERVICE_PENDING");
231                         newJob.set("runTime", now);
232                         newJob.set("jobId", newJobId);
233                         newJob.set("previousJobId", job.getString("jobId"));
234                         newJob.set("parentJobId", pJobId);
235                         newJob.set("startDateTime", null);
236                         newJob.set("runByInstanceId", null);
237                         toStore.add(newJob);
238
239                         // set the cancel time on the old job to the same as the re-schedule time
240
job.set("statusId", "SERVICE_CRASHED");
241                         job.set("cancelDateTime", now);
242                         toStore.add(job);
243                     }
244                 }
245             }
246
247             if (toStore.size() > 0) {
248                 try {
249                     delegator.storeAll(toStore);
250                 } catch (GenericEntityException e) {
251                     Debug.logError(e, module);
252                 }
253                 if (Debug.infoOn()) Debug.logInfo("-- " + toStore.size() + " jobs re-scheduled", module);
254             }
255
256         } else {
257             if (Debug.infoOn()) Debug.logInfo("No crashed jobs to re-schedule", module);
258         }
259     }
260
261     /**
262      * Schedule a job to start at a specific time with specific recurrence info
263      *@param serviceName The name of the service to invoke
264      *@param context The context for the service
265      *@param startTime The time in milliseconds the service should run
266      *@param frequency The frequency of the recurrence (HOURLY,DAILY,MONTHLY,etc)
267      *@param interval The interval of the frequency recurrence
268      *@param count The number of times to repeat
269      */

270     public void schedule(String JavaDoc serviceName, Map JavaDoc context, long startTime, int frequency, int interval, int count) throws JobManagerException {
271         schedule(serviceName, context, startTime, frequency, interval, count, 0);
272     }
273
274     /**
275      * Schedule a job to start at a specific time with specific recurrence info
276      *@param serviceName The name of the service to invoke
277      *@param context The context for the service
278      *@param startTime The time in milliseconds the service should run
279      *@param frequency The frequency of the recurrence (HOURLY,DAILY,MONTHLY,etc)
280      *@param interval The interval of the frequency recurrence
281      *@param endTime The time in milliseconds the service should expire
282      */

283     public void schedule(String JavaDoc serviceName, Map JavaDoc context, long startTime, int frequency, int interval, long endTime) throws JobManagerException {
284         schedule(serviceName, context, startTime, frequency, interval, -1, endTime);
285     }
286
287     /**
288      * Schedule a job to start at a specific time with specific recurrence info
289      *@param serviceName The name of the service to invoke
290      *@param context The context for the service
291      *@param startTime The time in milliseconds the service should run
292      *@param frequency The frequency of the recurrence (HOURLY,DAILY,MONTHLY,etc)
293      *@param interval The interval of the frequency recurrence
294      *@param count The number of times to repeat
295      *@param endTime The time in milliseconds the service should expire
296      */

297     public void schedule(String JavaDoc serviceName, Map JavaDoc context, long startTime, int frequency, int interval, int count, long endTime) throws JobManagerException {
298         schedule(null, serviceName, context, startTime, frequency, interval, count, endTime);
299     }
300
301     /**
302      * Schedule a job to start at a specific time with specific recurrence info
303      *@param poolName The name of the pool to run the service from
304      *@param serviceName The name of the service to invoke
305      *@param context The context for the service
306      *@param startTime The time in milliseconds the service should run
307      *@param frequency The frequency of the recurrence (HOURLY,DAILY,MONTHLY,etc)
308      *@param interval The interval of the frequency recurrence
309      *@param count The number of times to repeat
310      *@param endTime The time in milliseconds the service should expire
311      */

312     public void schedule(String JavaDoc poolName, String JavaDoc serviceName, Map JavaDoc context, long startTime, int frequency, int interval, int count, long endTime) throws JobManagerException {
313         schedule(null, serviceName, context, startTime, frequency, interval, count, endTime, -1);
314     }
315
316     /**
317      * Schedule a job to start at a specific time with specific recurrence info
318      *@param poolName The name of the pool to run the service from
319      *@param serviceName The name of the service to invoke
320      *@param context The context for the service
321      *@param startTime The time in milliseconds the service should run
322      *@param frequency The frequency of the recurrence (HOURLY,DAILY,MONTHLY,etc)
323      *@param interval The interval of the frequency recurrence
324      *@param count The number of times to repeat
325      *@param endTime The time in milliseconds the service should expire
326      *@param maxRetry The max number of retries on failure (-1 for no max)
327      */

328     public void schedule(String JavaDoc poolName, String JavaDoc serviceName, Map JavaDoc context, long startTime, int frequency, int interval, int count, long endTime, int maxRetry) throws JobManagerException {
329         if (delegator == null) {
330             Debug.logWarning("No delegator referenced; cannot schedule job.", module);
331             return;
332         }
333
334         // persist the context
335
String JavaDoc dataId = null;
336         try {
337             dataId = delegator.getNextSeqId("RuntimeData").toString();
338             GenericValue runtimeData = delegator.makeValue("RuntimeData", UtilMisc.toMap("runtimeDataId", dataId));
339
340             runtimeData.set("runtimeInfo", XmlSerializer.serialize(context));
341             delegator.create(runtimeData);
342         } catch (GenericEntityException ee) {
343             throw new JobManagerException(ee.getMessage(), ee);
344         } catch (SerializeException se) {
345             throw new JobManagerException(se.getMessage(), se);
346         } catch (IOException JavaDoc ioe) {
347             throw new JobManagerException(ioe.getMessage(), ioe);
348         }
349
350         // schedule the job
351
schedule(poolName, serviceName, dataId, startTime, frequency, interval, count, endTime, maxRetry);
352     }
353
354     /**
355      * Schedule a job to start at a specific time with specific recurrence info
356      *@param poolName The name of the pool to run the service from
357      *@param serviceName The name of the service to invoke
358      *@param dataId The persisted context (RuntimeData.runtimeDataId)
359      *@param startTime The time in milliseconds the service should run
360      */

361     public void schedule(String JavaDoc poolName, String JavaDoc serviceName, String JavaDoc dataId, long startTime) throws JobManagerException {
362         schedule(poolName, serviceName, dataId, startTime, -1, 0, 1, 0, -1);
363     }
364
365     /**
366      * Schedule a job to start at a specific time with specific recurrence info
367      *@param poolName The name of the pool to run the service from
368      *@param serviceName The name of the service to invoke
369      *@param dataId The persisted context (RuntimeData.runtimeDataId)
370      *@param startTime The time in milliseconds the service should run
371      *@param frequency The frequency of the recurrence (HOURLY,DAILY,MONTHLY,etc)
372      *@param interval The interval of the frequency recurrence
373      *@param count The number of times to repeat
374      *@param endTime The time in milliseconds the service should expire
375      *@param maxRetry The max number of retries on failure (-1 for no max)
376      */

377     public void schedule(String JavaDoc poolName, String JavaDoc serviceName, String JavaDoc dataId, long startTime, int frequency, int interval, int count, long endTime, int maxRetry) throws JobManagerException {
378         if (delegator == null) {
379             Debug.logWarning("No delegator referenced; cannot schedule job.", module);
380             return;
381         }
382
383         // create the recurrence
384
String JavaDoc infoId = null;
385         if (frequency > -1 && count != 0) {
386             try {
387                 RecurrenceInfo info = RecurrenceInfo.makeInfo(delegator, startTime, frequency, interval, count);
388                 infoId = info.primaryKey();
389             } catch (RecurrenceInfoException e) {
390                 throw new JobManagerException(e.getMessage(), e);
391             }
392         }
393
394         // set the persisted fields
395
String JavaDoc jobName = new String JavaDoc(new Long JavaDoc((new Date JavaDoc().getTime())).toString());
396         String JavaDoc jobId = delegator.getNextSeqId("JobSandbox").toString();
397         Map JavaDoc jFields = UtilMisc.toMap("jobId", jobId, "jobName", jobName, "runTime", new java.sql.Timestamp JavaDoc(startTime),
398                 "serviceName", serviceName, "recurrenceInfoId", infoId, "runtimeDataId", dataId);
399
400         // set the pool ID
401
if (poolName != null && poolName.length() > 0) {
402             jFields.put("poolId", poolName);
403         } else {
404             jFields.put("poolId", ServiceConfigUtil.getSendPool());
405         }
406
407         // set the loader name
408
jFields.put("loaderName", dispatcherName);
409
410         // set the max retry
411
jFields.put("maxRetry", new Long JavaDoc(maxRetry));
412
413         // create the value and store
414
GenericValue jobV = null;
415         try {
416             jobV = delegator.makeValue("JobSandbox", jFields);
417             delegator.create(jobV);
418         } catch (GenericEntityException e) {
419             throw new JobManagerException(e.getMessage(), e);
420         }
421     }
422
423     /**
424      * Kill a JobInvoker Thread.
425      * @param threadName Name of the JobInvoker Thread to kill.
426      */

427     public void killThread(String JavaDoc threadName) {
428         jp.killThread(threadName);
429     }
430
431     /**
432      * Get a List of each threads current state.
433      * @return List containing a Map of each thread's state.
434      */

435     public List JavaDoc processList() {
436         return jp.getPoolState();
437     }
438
439     /** Close out the scheduler thread. */
440     public void finalize() {
441         if (jp != null) {
442             jp.stop();
443             jp = null;
444             Debug.logInfo("JobManager: Stopped Scheduler Thread.", module);
445         }
446     }
447
448     /** gets the recurrence info object for a job. */
449     public static RecurrenceInfo getRecurrenceInfo(GenericValue job) {
450         try {
451             if (job != null && !UtilValidate.isEmpty(job.getString("recurrenceInfoId"))) {
452                 if (job.get("cancelDateTime") != null) {
453                     // cancel has been flagged, no more recurrence
454
return null;
455                 }
456                 GenericValue ri = job.getRelatedOne("RecurrenceInfo");
457
458                 if (ri != null) {
459                     return new RecurrenceInfo(ri);
460                 } else {
461                     return null;
462                 }
463             } else {
464                 return null;
465             }
466         } catch (GenericEntityException e) {
467             e.printStackTrace();
468             Debug.logError(e, "Problem getting RecurrenceInfo entity from JobSandbox", module);
469         } catch (RecurrenceInfoException re) {
470             re.printStackTrace();
471             Debug.logError(re, "Problem creating RecurrenceInfo instance: " + re.getMessage(), module);
472         }
473         return null;
474     }
475
476 }
477
Popular Tags