package org.ofbiz.service.job;

import java.util.*;

import org.ofbiz.service.config.ServiceConfigUtil;
import org.ofbiz.base.util.Debug;

/**
 * Polls a {@link JobManager} for jobs, places valid jobs on an in-memory run
 * queue and maintains a pool of {@link JobInvoker} instances that pull jobs
 * from that queue through {@link #next()}.
 *
 * <p>All access to the run queue and the invoker pool is guarded by this
 * object's monitor. Pool sizing and timing values are read from the
 * {@code thread-pool} element of the service engine configuration on every
 * use; the public constants below are the fall-back values used when an
 * attribute is missing or is not a valid integer.</p>
 */
public class JobPoller implements Runnable {

    public static final String module = JobPoller.class.getName();

    /** Fall-back for the {@code min-threads} attribute. */
    public static final int MIN_THREADS = 1;
    /** Fall-back for the {@code max-threads} attribute. */
    public static final int MAX_THREADS = 15;
    /** Fall-back for the {@code jobs} (queued jobs per invoker) attribute. */
    public static final int MAX_JOBS = 3;
    /** Fall-back for the {@code poll-db-millis} attribute, in milliseconds. */
    public static final int POLL_WAIT = 20000;

    /** The polling thread; stays null when polling is disabled. */
    protected Thread thread = null;
    /** Live {@link JobInvoker} instances; guarded by this object's monitor. */
    protected LinkedList pool = null;
    /** Jobs waiting to be picked up; guarded by this object's monitor. */
    protected LinkedList run = null;
    protected JobManager jm = null;

    /** Loop flag for {@link #run()}; cleared by {@link #stop()}. */
    protected volatile boolean isRunning = false;

    /**
     * Creates the run queue and the initial invoker pool, asks the manager to
     * reload crashed jobs and, if polling is enabled, starts the polling
     * thread.
     *
     * @param jm the manager that is polled for jobs
     */
    public JobPoller(JobManager jm) {
        this.jm = jm;
        // The queue must exist before the pool: invokers receive a reference
        // to this poller and may call next() as soon as they are created.
        this.run = new LinkedList();

        this.pool = createThreadPool();

        this.jm.reloadCrashedJobs();

        if (pollEnabled()) {
            thread = new Thread(this, this.toString());
            thread.setDaemon(false);

            this.isRunning = true;
            thread.start();
        }
    }

    /** For subclasses only: leaves every field at its default value. */
    protected JobPoller() {}

    /**
     * Polling loop. Waits 30 seconds, then repeatedly polls the manager,
     * queues every valid job and waits for the configured poll interval, until
     * {@link #stop()} clears the running flag.
     *
     * <p>The monitor is held while polling and queueing and released while
     * waiting.</p>
     */
    public synchronized void run() {
        if (Debug.infoOn()) Debug.logInfo("JobPoller: (" + thread.getName() + ") Thread Running...", module);
        try {
            // Start-up delay before the first poll; stop() ends it early through notifyAll().
            wait(30000);
        } catch (InterruptedException e) {
            // As before, an interrupt here only shortens the start-up delay,
            // but it is now reported instead of being silently dropped.
            Debug.logWarning("JobPoller: start-up delay interrupted.", module);
        }
        while (isRunning) {
            try {
                Iterator polled = jm.poll();

                while (polled.hasNext()) {
                    Job job = (Job) polled.next();

                    if (job.isValid()) {
                        queueNow(job);
                    }
                }
                wait(pollWaitTime());
            } catch (InterruptedException e) {
                // An interrupt while idle is treated as a shutdown request.
                // The flag is not restored: this thread ends right after the loop.
                Debug.logError(e, module);
                stop();
            }
        }
        if (Debug.infoOn()) Debug.logInfo("JobPoller: (" + thread.getName() + ") Thread ending...", module);
    }

    /**
     * Returns the manager this poller was created with.
     *
     * @return the job manager
     */
    public JobManager getManager() {
        return jm;
    }

    /**
     * Stops polling and stops every invoker in the pool.
     *
     * <p>Wakes the polling thread so it notices the cleared flag immediately
     * instead of sleeping out the rest of its poll interval. May block briefly
     * while a poll cycle that is already in progress holds the monitor.</p>
     */
    public void stop() {
        isRunning = false;
        synchronized (this) {
            notifyAll();
        }
        destroyThreadPool();
    }

    /**
     * Describes every invoker currently in the pool.
     *
     * @return a list of maps, one per invoker, with the keys
     *         {@code threadName}, {@code jobName}, {@code serviceName},
     *         {@code runTime} (Long) and {@code status} (Integer)
     */
    public List getPoolState() {
        // Copy under the lock so concurrent pool changes cannot break the iteration.
        List invokers;
        synchronized (this) {
            invokers = new ArrayList(this.pool);
        }

        List stateList = new ArrayList();
        Iterator i = invokers.iterator();
        while (i.hasNext()) {
            JobInvoker invoker = (JobInvoker) i.next();
            Map stateMap = new HashMap();
            stateMap.put("threadName", invoker.getName());
            stateMap.put("jobName", invoker.getJobName());
            stateMap.put("serviceName", invoker.getServiceName());
            stateMap.put("runTime", new Long(invoker.getCurrentRuntime()));
            stateMap.put("status", new Integer(invoker.getCurrentStatus()));
            stateList.add(stateMap);
        }
        return stateList;
    }

    /**
     * Empties the pool and stops every invoker that was in it.
     */
    private void destroyThreadPool() {
        Debug.logInfo("Destroying thread pool...", module);
        // Detach the invokers under the lock, then stop them from the
        // snapshot, so the pool list is never iterated while it can change.
        List invokers;
        synchronized (this) {
            invokers = new ArrayList(pool);
            pool.clear();
        }
        Iterator it = invokers.iterator();
        while (it.hasNext()) {
            JobInvoker ji = (JobInvoker) it.next();
            ji.stop();
        }
    }

    /**
     * Kills the invoker with the given name and removes it from the pool.
     * Does nothing when no invoker has that name.
     *
     * @param threadName name of the invoker, as reported by {@link #getPoolState()}
     */
    public synchronized void killThread(String threadName) {
        JobInvoker inv = findThread(threadName);
        if (inv != null) {
            inv.kill();
            this.pool.remove(inv);
        }
    }

    /**
     * Looks up a pooled invoker by name. Callers must hold the monitor.
     *
     * @param threadName name to look for; null never matches
     * @return the matching invoker, or null when there is none
     */
    private JobInvoker findThread(String threadName) {
        if (threadName == null) {
            return null;
        }
        Iterator i = this.pool.iterator();
        while (i.hasNext()) {
            JobInvoker inv = (JobInvoker) i.next();
            if (threadName.equals(inv.getName())) {
                return inv;
            }
        }
        return null;
    }

    /**
     * Removes and returns the job at the head of the run queue.
     *
     * @return the next job, or null when the queue is empty
     */
    public synchronized Job next() {
        if (run.isEmpty()) {
            return null;
        }
        return (Job) run.removeFirst();
    }

    /**
     * Appends a job to the run queue and grows the invoker pool when the
     * backlog warrants it.
     *
     * <p>The pool grows towards one invoker per {@code jobs} queued jobs and
     * never beyond {@code max-threads} invokers in total.</p>
     *
     * @param job the job to queue
     */
    public synchronized void queueNow(Job job) {
        run.add(job);
        if (Debug.verboseOn()) Debug.logVerbose("New run queue size: " + run.size(), module);

        int poolSize = pool.size();
        int max = maxThreads();
        if (run.size() > poolSize && poolSize < max) {
            // A configured value of zero or less would divide by zero or invert the sizing.
            int perThread = Math.max(1, jobsPerThread());
            int wanted = (run.size() / perThread) - poolSize;
            // Cap by the remaining head-room, not by the absolute maximum,
            // so the total pool size cannot exceed max-threads.
            int addSize = Math.min(wanted, max - poolSize);

            for (int i = 0; i < addSize; i++) {
                pool.add(new JobInvoker(this, invokerWaitTime()));
            }
        }
    }

    /**
     * Removes an invoker from the pool, stops it and tops the pool back up to
     * the configured minimum size.
     *
     * @param invoker the invoker to remove
     */
    public synchronized void removeThread(JobInvoker invoker) {
        pool.remove(invoker);
        invoker.stop();

        // Compute the shortfall once: re-evaluating it as the loop bound while
        // the pool grows would stop after adding only about half of it.
        int missing = minThreads() - pool.size();
        for (int i = 0; i < missing; i++) {
            pool.add(new JobInvoker(this, invokerWaitTime()));
        }
    }

    /**
     * Builds the initial pool containing {@code min-threads} invokers.
     *
     * @return the new pool
     */
    private LinkedList createThreadPool() {
        LinkedList threadPool = new LinkedList();

        while (threadPool.size() < minThreads()) {
            threadPool.add(new JobInvoker(this, invokerWaitTime()));
        }

        return threadPool;
    }

    /**
     * Reads an integer attribute of the {@code thread-pool} configuration element.
     *
     * @param attrName     attribute to read
     * @param defaultValue value returned when the attribute is missing or not an integer
     * @return the configured value, or {@code defaultValue}
     */
    private static int threadPoolAttr(String attrName, int defaultValue) {
        try {
            return Integer.parseInt(ServiceConfigUtil.getElementAttr("thread-pool", attrName));
        } catch (NumberFormatException nfe) {
            Debug.logError("Problems reading values from serviceengine.xml file [" + nfe.toString() + "]. Using defaults.", module);
            return defaultValue;
        }
    }

    /** @return the configured maximum pool size, or {@link #MAX_THREADS} */
    private int maxThreads() {
        return threadPoolAttr("max-threads", MAX_THREADS);
    }

    /** @return the configured minimum pool size, or {@link #MIN_THREADS} */
    private int minThreads() {
        return threadPoolAttr("min-threads", MIN_THREADS);
    }

    /** @return the configured number of queued jobs per invoker, or {@link #MAX_JOBS} */
    private int jobsPerThread() {
        return threadPoolAttr("jobs", MAX_JOBS);
    }

    /** @return the configured wait value handed to new invokers, or {@code JobInvoker.WAIT_TIME} */
    private int invokerWaitTime() {
        return threadPoolAttr("wait-millis", JobInvoker.WAIT_TIME);
    }

    /** @return the configured pause between polls in milliseconds, or {@link #POLL_WAIT} */
    private int pollWaitTime() {
        return threadPoolAttr("poll-db-millis", POLL_WAIT);
    }

    /**
     * Decides whether the polling thread should be started.
     *
     * @return false when {@code poll-enabled} is "false" (any case) or the
     *         manager has no delegator; true otherwise, including when the
     *         attribute is missing
     */
    private boolean pollEnabled() {
        String enabled = ServiceConfigUtil.getElementAttr("thread-pool", "poll-enabled");

        // Constant-first comparison: a missing attribute must not throw.
        if ("false".equalsIgnoreCase(enabled)) {
            return false;
        }

        if (jm.getDelegator() == null) {
            Debug.logWarning("No delegator referenced; not starting job poller.", module);
            return false;
        }

        return true;
    }
}