KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > knowgate > scheduler > WorkerThreadPool


1 /*
2   Copyright (C) 2003 Know Gate S.L. All rights reserved.
3                       C/Oņa, 107 1š2 28050 Madrid (Spain)
4
5   Redistribution and use in source and binary forms, with or without
6   modification, are permitted provided that the following conditions
7   are met:
8
9   1. Redistributions of source code must retain the above copyright
10      notice, this list of conditions and the following disclaimer.
11
12   2. The end-user documentation included with the redistribution,
13      if any, must include the following acknowledgment:
14      "This product includes software parts from hipergate
15      (http://www.hipergate.org/)."
16      Alternately, this acknowledgment may appear in the software itself,
17      if and wherever such third-party acknowledgments normally appear.
18
19   3. The name hipergate must not be used to endorse or promote products
20      derived from this software without prior written permission.
21      Products derived from this software may not be called hipergate,
22      nor may hipergate appear in their name, without prior written
23      permission.
24
25   This library is distributed in the hope that it will be useful,
26   but WITHOUT ANY WARRANTY; without even the implied warranty of
27   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
28
29   You should have received a copy of hipergate License with this code;
30   if not, visit http://www.hipergate.org or mail to info@hipergate.org
31 */

32
33 package com.knowgate.scheduler;
34
35 import java.sql.SQLException JavaDoc;
36
37 import java.util.Properties JavaDoc;
38 import java.util.LinkedList JavaDoc;
39 import java.util.ListIterator JavaDoc;
40 import java.util.HashMap JavaDoc;
41 import java.util.Iterator JavaDoc;
42
43 import com.knowgate.dataobjs.DB;
44 import com.knowgate.debug.DebugFile;
45 import com.knowgate.jdc.JDCConnection;
46
47 /**
48  * WorkerThread Pool
49  * @author Sergio Montoro Ten
50  * @version 3.0
51  */

52
53 public class WorkerThreadPool {
54
55   private WorkerThread aThreads[];
56   private long aStartTime[];
57   private Properties JavaDoc oEnvProps;
58
59   // ---------------------------------------------------------------------------
60

61   /**
62    * <p>Create WorkerThreadPool</p>
63    * thread Pool size is readed from maxschedulerthreads property of oEnvironmentProps,
64    * the default value is 1.
65    * Each thread is given the name WorkerThread_<i>n</i>
66    * @param oAtomConsumer Atom Consumer Object to be used
67    * @param oEnvironmentProps Environment Properties collection
68    * (usually readed from hipergate.cnf)
69    */

70   public WorkerThreadPool(AtomConsumer oAtomConsumer, Properties JavaDoc oEnvironmentProps) {
71     int nThreads = Integer.parseInt(oEnvironmentProps.getProperty("maxschedulerthreads", "1"));
72
73     if (DebugFile.trace) DebugFile.writeln("maxschedulerthreads=" + String.valueOf(nThreads));
74
75     oEnvProps = oEnvironmentProps;
76     aThreads = new WorkerThread[nThreads];
77     aStartTime = new long[nThreads];
78
79     for (int t=0; t<nThreads; t++) {
80       if (DebugFile.trace) DebugFile.writeln("new WorkerThread(" + String.valueOf(t) + ")");
81
82       aThreads[t] = new WorkerThread(this, oAtomConsumer);
83
84       aThreads[t].setName("WorkerThread_" + String.valueOf(t));
85     } // next(t)
86
}
87
88   // ---------------------------------------------------------------------------
89

90   /**
91    * Get Pool Size
92    */

93   public int size() {
94     return aThreads.length;
95   }
96
97   // ---------------------------------------------------------------------------
98

99   /**
100    * Get Environment properties collection from hipergate.cnf
101    */

102   public Properties JavaDoc getProperties() {
103     return oEnvProps;
104   }
105
106   // ---------------------------------------------------------------------------
107

108   /**
109    * Get Environment property
110    * @return
111    */

112   public String JavaDoc getProperty(String JavaDoc sKey) {
113     return oEnvProps.getProperty(sKey);
114   }
115
116   // ---------------------------------------------------------------------------
117

118   public long getRunningTimeMS() {
119     long lRunningTime = 0l;
120     for (int t=0; t<aThreads.length; t++)
121       lRunningTime += aThreads[t].getRunningTimeMS();
122     return lRunningTime;
123   }
124
125   // ---------------------------------------------------------------------------
126

127   /**
128    * Launch all WorkerThreads and start consuming atoms from queue.
129    */

130   public void launchAll() {
131     for (int t=0; t<aThreads.length; t++) {
132       if (!aThreads[t].isAlive()) {
133         aStartTime[t] = new java.util.Date JavaDoc().getTime();
134         aThreads[t].start();
135       }
136     } // next
137
} // launchAll
138

139   // ---------------------------------------------------------------------------
140

141   /**
142    * Count of currently active WorkerThreads
143    */

144   public int livethreads() {
145     int iLive = 0;
146
147     for (int t=0; t<aThreads.length; t++) {
148       if (aThreads[t].isAlive()) {
149         iLive++;
150       }
151     } // next
152
return iLive;
153   } // livethreads
154

155   // ---------------------------------------------------------------------------
156

157   public WorkerThread[] threads() {
158     return aThreads;
159   }
160
161   // ---------------------------------------------------------------------------
162

163   /**
164    * Get array of atoms currently running at live WorkerThreads
165    * @return Atom[]
166    * @since 3.0
167    */

168   public Atom[] runningAtoms() {
169     if (livethreads()==0) return null;
170     Atom[] aAtoms = new Atom[livethreads()];
171     int a = 0;
172     final int iThreads = aThreads.length;
173     for (int t=0; t<iThreads; t++) {
174       if (aThreads[t].isAlive()) {
175         aAtoms[a++]=aThreads[t].activeAtom();
176       } // fi
177
} // next (t)
178
return aAtoms;
179   } // runningAtoms
180

181   // ---------------------------------------------------------------------------
182

183   /**
184    * Get array with GUIDs of Jobs currently run by live WorkerThreads
185    * @return String[] Job GUID array
186    * @since 3.0
187    */

188   public String JavaDoc[] runningJobs() {
189     Atom[] aAtoms = runningAtoms();
190     if (aAtoms==null) return null;
191     LinkedList JavaDoc oJobs = new LinkedList JavaDoc();
192     String JavaDoc sJob;
193     int nAtoms = aAtoms.length;
194     for (int a=0; a<nAtoms; a++) {
195       sJob = aAtoms[a].getString(DB.gu_job);
196       if (oJobs.contains(sJob)) oJobs.add(sJob);
197     }
198     if (oJobs.size()==0) return null;
199     String JavaDoc[] aJobs = new String JavaDoc[oJobs.size()];
200     ListIterator JavaDoc oIter = oJobs.listIterator();
201     int j = 0;
202     while (oIter.hasNext()) {
203       aJobs[j] = (String JavaDoc) oIter.next();
204     } // wend
205
return aJobs;
206   } // runningJobs
207

208   // ---------------------------------------------------------------------------
209

210   /**
211    * Register a thread callback object for each thread in this pool
212    * @param oNewCallback WorkerThreadCallback subclass instance
213    * @throws IllegalArgumentException If a callback with same name has oNewCallback was already registered
214    */

215   public void registerCallback(WorkerThreadCallback oNewCallback)
216     throws IllegalArgumentException JavaDoc {
217     final int iThreads = aThreads.length;
218     for (int t=0; t<iThreads; t++)
219       aThreads[t].registerCallback(oNewCallback);
220   } // registerCallback
221

222   // ---------------------------------------------------------------------------
223

224   /**
225     * Unregister a thread callback object for each thread in this pool
226     * @param sCallbackName Name of callback to be unregistered
227     */

228    public void unregisterCallback(String JavaDoc sCallbackName) {
229      final int iThreads = aThreads.length;
230
231      for (int t=0; t<iThreads; t++)
232        aThreads[t].unregisterCallback(sCallbackName);
233
234    } // unregisterCallback
235

236    // --------------------------------------------------------------------------
237

238    /**
239     * <p>Halt all pooled threads commiting any pending operations before stoping</p>
240     * If a thread is dead-locked by any reason halting it will not cause any effect.<br>
241     * halt() method only sends a signals to the each WokerThread telling it that must
242     * finish pending operations and stop.
243     */

244    public void haltAll() {
245      final int iThreads = aThreads.length;
246      for (int t=0; t<iThreads; t++)
247        aThreads[t].halt();
248    }
249
250    // --------------------------------------------------------------------------
251

252    /**
253     * <p>Call stop() on every thread of the pool which is alive</p>
254     * This method should only be used when threads cannot be stopped by calling
255     * haltAll()
256     * @deprecated Use stopAll(JDCConnection) instead
257     */

258    public void stopAll() {
259      final int iThreads = aThreads.length;
260      for (int t=0; t<iThreads; t++) {
261        if (aThreads[t].isAlive()) aThreads[t].stop();
262      }
263    }
264
265    // ---------------------------------------------------------------------------
266

267    /**
268     * <p>Call stop() on every thread of the pool which is alive</p>
269     * All running atoms are set to STATUS_INTERRUPTED
270     * @since 3.0
271     */

272    public void stopAll(JDCConnection oConn) throws SQLException JavaDoc {
273      final int iThreads = aThreads.length;
274      Atom oActiveAtom;
275      for (int t=0; t<iThreads; t++) {
276        oActiveAtom = aThreads[t].activeAtom();
277        if (null!=oActiveAtom)
278          oActiveAtom.setStatus(oConn, Atom.STATUS_INTERRUPTED, "Interrupted by user");
279        if (aThreads[t].isAlive()) aThreads[t].stop();
280      }
281    }
282
283    // ---------------------------------------------------------------------------
284

285 } // WorkerThreadPool
286
Popular Tags