KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > ofbiz > service > job > JobPoller


1 /*
2  * $Id: JobPoller.java 5462 2005-08-05 18:35:48Z jonesde $
3  *
4  * Copyright (c) 2002 The Open For Business Project - www.ofbiz.org
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining a
7  * copy of this software and associated documentation files (the "Software"),
8  * to deal in the Software without restriction, including without limitation
9  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
10  * and/or sell copies of the Software, and to permit persons to whom the
11  * Software is furnished to do so, subject to the following conditions:
12  *
13  * The above copyright notice and this permission notice shall be included
14  * in all copies or substantial portions of the Software.
15  *
16  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
17  * OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
18  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
19  * IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
20  * CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT
21  * OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR
22  * THE USE OR OTHER DEALINGS IN THE SOFTWARE.
23  *
24  */

25 package org.ofbiz.service.job;
26
27 import java.util.*;
28
29 import org.ofbiz.service.config.ServiceConfigUtil;
30 import org.ofbiz.base.util.Debug;
31
32 /**
33  * JobPoller - Polls for persisted jobs to run.
34  *
35  * @author <a HREF="mailto:jaz@ofbiz.org">Andy Zeneski</a>
36  * @author <a HREF="mailto:">Magnus Rosenquist</a>
37  * @version $Rev: 5462 $
38  * @since 2.0
39  */

40 public class JobPoller implements Runnable JavaDoc {
41
42     public static final String JavaDoc module = JobPoller.class.getName();
43
44     public static final int MIN_THREADS = 1;
45     public static final int MAX_THREADS = 15;
46     public static final int MAX_JOBS = 3;
47     public static final int POLL_WAIT = 20000;
48     //public static final long MAX_TTL = 18000000;
49

50     protected Thread JavaDoc thread = null;
51     protected LinkedList pool = null;
52     protected LinkedList run = null;
53     protected JobManager jm = null;
54
55     protected volatile boolean isRunning = false;
56
57     /**
58      * Creates a new JobScheduler
59      * @param jm JobManager associated with this scheduler
60      */

61     public JobPoller(JobManager jm) {
62         this.jm = jm;
63         this.run = new LinkedList();
64
65         // create the thread pool
66
this.pool = createThreadPool();
67
68         // re-load crashed jobs
69
this.jm.reloadCrashedJobs();
70
71         // start the thread only if polling is enabled
72
if (pollEnabled()) {
73
74             // create the poller thread
75
thread = new Thread JavaDoc(this, this.toString());
76             thread.setDaemon(false);
77
78             // start the poller
79
this.isRunning = true;
80             thread.start();
81         }
82     }
83
84     protected JobPoller() {}
85
86     public synchronized void run() {
87         if (Debug.infoOn()) Debug.logInfo("JobPoller: (" + thread.getName() + ") Thread Running...", module);
88         try {
89             // wait 30 seconds before the first poll
90
wait(30000);
91         } catch (InterruptedException JavaDoc e) {
92         }
93         while (isRunning) {
94             try {
95                 // grab a list of jobs to run.
96
Iterator poll = jm.poll();
97
98                 while (poll.hasNext()) {
99                     Job job = (Job) poll.next();
100
101                     if (job.isValid())
102                         queueNow(job);
103                 }
104                 wait(pollWaitTime());
105             } catch (InterruptedException JavaDoc e) {
106                 Debug.logError(e, module);
107                 stop();
108             }
109         }
110         if (Debug.infoOn()) Debug.logInfo("JobPoller: (" + thread.getName() + ") Thread ending...", module);
111     }
112
113     /**
114      * Returns the JobManager
115      */

116     public JobManager getManager() {
117         return jm;
118     }
119
120     /**
121      * Stops the JobPoller
122      */

123     public void stop() {
124         isRunning = false;
125         destroyThreadPool();
126     }
127
128     public List getPoolState() {
129         List stateList = new ArrayList();
130         Iterator i = this.pool.iterator();
131         while (i.hasNext()) {
132             JobInvoker invoker = (JobInvoker) i.next();
133             Map stateMap = new HashMap();
134             stateMap.put("threadName", invoker.getName());
135             stateMap.put("jobName", invoker.getJobName());
136             stateMap.put("serviceName", invoker.getServiceName());
137             stateMap.put("runTime", new Long JavaDoc(invoker.getCurrentRuntime()));
138             stateMap.put("status", new Integer JavaDoc(invoker.getCurrentStatus()));
139             stateList.add(stateMap);
140         }
141         return stateList;
142     }
143
144     /**
145      * Stops all threads in the threadPool and clears
146      * the pool as final step.
147      */

148     private void destroyThreadPool() {
149         Debug.logInfo("Destroying thread pool...", module);
150         Iterator it = pool.iterator();
151         while (it.hasNext()) {
152             JobInvoker ji = (JobInvoker) it.next();
153             ji.stop();
154         }
155         pool.clear();
156     }
157
158     public synchronized void killThread(String JavaDoc threadName) {
159         JobInvoker inv = findThread(threadName);
160         if (inv != null) {
161             inv.kill();
162             this.pool.remove(inv);
163         }
164     }
165
166     private JobInvoker findThread(String JavaDoc threadName) {
167         Iterator i = this.pool.iterator();
168         while (i.hasNext()) {
169             JobInvoker inv = (JobInvoker) i.next();
170             if (threadName.equals(inv.getName())) {
171                 return inv;
172             }
173         }
174         return null;
175     }
176
177     /**
178      * Returns the next job to run
179      */

180     public synchronized Job next() {
181         if (run.size() > 0)
182             return (Job) run.removeFirst();
183         return null;
184     }
185
186     /**
187      * Adds a job to the RUN queue
188      */

189     public synchronized void queueNow(Job job) {
190         run.add(job);
191         if (Debug.verboseOn()) Debug.logVerbose("New run queue size: " + run.size(), module);
192         if (run.size() > pool.size() && pool.size() < maxThreads()) {
193             int calcSize = (run.size() / jobsPerThread()) - (pool.size());
194             int addSize = calcSize > maxThreads() ? maxThreads() : calcSize;
195
196             for (int i = 0; i < addSize; i++) {
197                 JobInvoker iv = new JobInvoker(this, invokerWaitTime());
198                 pool.add(iv);
199             }
200         }
201     }
202
203     /**
204      * Removes a thread from the pool.
205      * @param invoker The invoker to remove.
206      */

207     public synchronized void removeThread(JobInvoker invoker) {
208         pool.remove(invoker);
209         invoker.stop();
210         if (pool.size() < minThreads()) {
211             for (int i = 0; i < minThreads() - pool.size(); i++) {
212                 JobInvoker iv = new JobInvoker(this, invokerWaitTime());
213                 pool.add(iv);
214             }
215         }
216     }
217
218     // Creates the invoker pool
219
private LinkedList createThreadPool() {
220         LinkedList threadPool = new LinkedList();
221
222         while (threadPool.size() < minThreads()) {
223             JobInvoker iv = new JobInvoker(this, invokerWaitTime());
224             threadPool.add(iv);
225         }
226
227         return threadPool;
228     }
229
230     private int maxThreads() {
231         int max = MAX_THREADS;
232
233         try {
234             max = Integer.parseInt(ServiceConfigUtil.getElementAttr("thread-pool", "max-threads"));
235         } catch (NumberFormatException JavaDoc nfe) {
236             Debug.logError("Problems reading values from serviceengine.xml file [" + nfe.toString() + "]. Using defaults.", module);
237         }
238         return max;
239     }
240
241     private int minThreads() {
242         int min = MIN_THREADS;
243
244         try {
245             min = Integer.parseInt(ServiceConfigUtil.getElementAttr("thread-pool", "min-threads"));
246         } catch (NumberFormatException JavaDoc nfe) {
247             Debug.logError("Problems reading values from serviceengine.xml file [" + nfe.toString() + "]. Using defaults.", module);
248         }
249         return min;
250     }
251
252     private int jobsPerThread() {
253         int jobs = MAX_JOBS;
254
255         try {
256             jobs = Integer.parseInt(ServiceConfigUtil.getElementAttr("thread-pool", "jobs"));
257         } catch (NumberFormatException JavaDoc nfe) {
258             Debug.logError("Problems reading values from serviceengine.xml file [" + nfe.toString() + "]. Using defaults.", module);
259         }
260         return jobs;
261     }
262
263     private int invokerWaitTime() {
264         int wait = JobInvoker.WAIT_TIME;
265
266         try {
267             wait = Integer.parseInt(ServiceConfigUtil.getElementAttr("thread-pool", "wait-millis"));
268         } catch (NumberFormatException JavaDoc nfe) {
269             Debug.logError("Problems reading values from serviceengine.xml file [" + nfe.toString() + "]. Using defaults.", module);
270         }
271         return wait;
272     }
273
274     private int pollWaitTime() {
275         int poll = POLL_WAIT;
276
277         try {
278             poll = Integer.parseInt(ServiceConfigUtil.getElementAttr("thread-pool", "poll-db-millis"));
279         } catch (NumberFormatException JavaDoc nfe) {
280             Debug.logError("Problems reading values from serviceengine.xml file [" + nfe.toString() + "]. Using defaults.", module);
281         }
282         return poll;
283     }
284
285     private boolean pollEnabled() {
286         String JavaDoc enabled = ServiceConfigUtil.getElementAttr("thread-pool", "poll-enabled");
287
288         if (enabled.equalsIgnoreCase("false"))
289             return false;
290
291         // also make sure we have a delegator to use for polling
292
if (jm.getDelegator() == null) {
293             Debug.logWarning("No delegator referenced; not starting job poller.", module);
294             return false;
295         }
296
297         return true;
298     }
299 }
300
301
Popular Tags