1 32 33 package com.knowgate.scheduler; 34 35 import java.sql.SQLException ; 36 37 import java.util.Properties ; 38 import java.util.LinkedList ; 39 import java.util.ListIterator ; 40 import java.util.HashMap ; 41 import java.util.Iterator ; 42 43 import com.knowgate.dataobjs.DB; 44 import com.knowgate.debug.DebugFile; 45 import com.knowgate.jdc.JDCConnection; 46 47 52 53 public class WorkerThreadPool { 54 55 private WorkerThread aThreads[]; 56 private long aStartTime[]; 57 private Properties oEnvProps; 58 59 61 70 public WorkerThreadPool(AtomConsumer oAtomConsumer, Properties oEnvironmentProps) { 71 int nThreads = Integer.parseInt(oEnvironmentProps.getProperty("maxschedulerthreads", "1")); 72 73 if (DebugFile.trace) DebugFile.writeln("maxschedulerthreads=" + String.valueOf(nThreads)); 74 75 oEnvProps = oEnvironmentProps; 76 aThreads = new WorkerThread[nThreads]; 77 aStartTime = new long[nThreads]; 78 79 for (int t=0; t<nThreads; t++) { 80 if (DebugFile.trace) DebugFile.writeln("new WorkerThread(" + String.valueOf(t) + ")"); 81 82 aThreads[t] = new WorkerThread(this, oAtomConsumer); 83 84 aThreads[t].setName("WorkerThread_" + String.valueOf(t)); 85 } } 87 88 90 93 public int size() { 94 return aThreads.length; 95 } 96 97 99 102 public Properties getProperties() { 103 return oEnvProps; 104 } 105 106 108 112 public String getProperty(String sKey) { 113 return oEnvProps.getProperty(sKey); 114 } 115 116 118 public long getRunningTimeMS() { 119 long lRunningTime = 0l; 120 for (int t=0; t<aThreads.length; t++) 121 lRunningTime += aThreads[t].getRunningTimeMS(); 122 return lRunningTime; 123 } 124 125 127 130 public void launchAll() { 131 for (int t=0; t<aThreads.length; t++) { 132 if (!aThreads[t].isAlive()) { 133 aStartTime[t] = new java.util.Date ().getTime(); 134 aThreads[t].start(); 135 } 136 } } 139 141 144 public int livethreads() { 145 int iLive = 0; 146 147 for (int t=0; t<aThreads.length; t++) { 148 if (aThreads[t].isAlive()) { 149 iLive++; 150 } 151 } return iLive; 153 } 155 157 public WorkerThread[] threads() { 158 return aThreads; 159 } 160 161 163 168 public Atom[] runningAtoms() { 169 if (livethreads()==0) return null; 170 Atom[] aAtoms = new Atom[livethreads()]; 171 int a = 0; 172 final int iThreads = aThreads.length; 173 for (int t=0; t<iThreads; t++) { 174 if (aThreads[t].isAlive()) { 175 aAtoms[a++]=aThreads[t].activeAtom(); 176 } } return aAtoms; 179 } 181 183 188 public String [] runningJobs() { 189 Atom[] aAtoms = runningAtoms(); 190 if (aAtoms==null) return null; 191 LinkedList oJobs = new LinkedList (); 192 String sJob; 193 int nAtoms = aAtoms.length; 194 for (int a=0; a<nAtoms; a++) { 195 sJob = aAtoms[a].getString(DB.gu_job); 196 if (oJobs.contains(sJob)) oJobs.add(sJob); 197 } 198 if (oJobs.size()==0) return null; 199 String [] aJobs = new String [oJobs.size()]; 200 ListIterator oIter = oJobs.listIterator(); 201 int j = 0; 202 while (oIter.hasNext()) { 203 aJobs[j] = (String ) oIter.next(); 204 } return aJobs; 206 } 208 210 215 public void registerCallback(WorkerThreadCallback oNewCallback) 216 throws IllegalArgumentException { 217 final int iThreads = aThreads.length; 218 for (int t=0; t<iThreads; t++) 219 aThreads[t].registerCallback(oNewCallback); 220 } 222 224 228 public void unregisterCallback(String sCallbackName) { 229 final int iThreads = aThreads.length; 230 231 for (int t=0; t<iThreads; t++) 232 aThreads[t].unregisterCallback(sCallbackName); 233 234 } 236 238 244 public void haltAll() { 245 final int iThreads = aThreads.length; 246 for (int t=0; t<iThreads; t++) 247 aThreads[t].halt(); 248 } 249 250 252 258 public void stopAll() { 259 final int iThreads = aThreads.length; 260 for (int t=0; t<iThreads; t++) { 261 if (aThreads[t].isAlive()) aThreads[t].stop(); 262 } 263 } 264 265 267 272 public void stopAll(JDCConnection oConn) throws SQLException { 273 final int iThreads = aThreads.length; 274 Atom oActiveAtom; 275 for (int t=0; t<iThreads; t++) { 276 oActiveAtom = aThreads[t].activeAtom(); 277 if (null!=oActiveAtom) 278 oActiveAtom.setStatus(oConn, Atom.STATUS_INTERRUPTED, "Interrupted by user"); 279 if (aThreads[t].isAlive()) aThreads[t].stop(); 280 } 281 } 282 283 285 } | Popular Tags |