KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > net > sourceforge > groboutils > util > thread > v1 > ThreadPool


/*
 * @(#)ThreadPool.java 0.9.0 06/04/2000 - 15:13:54
 *
 * Copyright (C) 2000,,2003 2002 Matt Albrecht
 * groboclown@users.sourceforge.net
 * http://groboutils.sourceforge.net
 *
 * Permission is hereby granted, free of charge, to any person obtaining a
 * copy of this software and associated documentation files (the "Software"),
 * to deal in the Software without restriction, including without limitation
 * the rights to use, copy, modify, merge, publish, distribute, sublicense,
 * and/or sell copies of the Software, and to permit persons to whom the
 * Software is furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
 * DEALINGS IN THE SOFTWARE.
 */

26
27 package net.sourceforge.groboutils.util.thread.v1;
28
29
30 import net.sourceforge.groboutils.util.datastruct.v1.SynchQueue;
31
32 import java.util.Vector JavaDoc;
33
34
35 /**
36  * A pool of QueueThread instances, each containing an instance of
37  * an ObjectListener implemented class. The Class to be the listener
38  * is passed into the constructor. Requirements for the Class are:
39  * 1. it implements QueueThread.ObjectListener, 2. it has a public
40  * constructor without any parameters.
41  * <P>
42  * The pool handles menial tasks such as:
43  * <ol>
44  * <li>Growing the thread pool if the number of waiting objects
45  * is above a threshold number, up to a maximum number of
46  * threads.
47  * <li>Finding the thread with the fewest number of waiting objects.
48  * <li>Optimization of determining which thread to pass events to.
49  * </ol>
50  * <P>
51  * The pool gets much of its functionality by sharing a single SynchQueue
52  * between all of its threads.
53  *
54  * @author Matt Albrecht <a HREF="mailto:groboclown@users.sourceforge.net">groboclown@users.sourceforge.net</a>
55  * @since June 4, 2000
56  * @version $Date: 2003/02/10 22:52:49 $
57  */

58 public class ThreadPool
59 {
60     //----------------------------
61
// Public data
62

63     
64     //----------------------------
65
// Private data
66

67     private Class JavaDoc m_objListenerClass = null;
68     private Object JavaDoc m_objListenerInitData = null;
69     
70     private QueueThread[] m_pool = null;
71     private SynchQueue m_sharedQueue = new SynchQueue();
72     
73     private int m_maxThreads = 10;
74     private int m_numThreads = 0;
75     private int m_depthThreshold = 5;
76     
77     //----------------------------
78
// constructors
79

80     /**
81      * Default constructor
82      */

83     public ThreadPool( Class JavaDoc objectListenerClass )
84     {
85         this( objectListenerClass, null, 1, 10 );
86     }
87     
88     
89     /**
90      *
91      */

92     public ThreadPool( Class JavaDoc objectListenerClass, int maxThreads )
93     {
94         this( objectListenerClass, null, 1, maxThreads );
95     }
96     
97     
98     /**
99      *
100      * @param initData if the given objectListenerClass is an instance
101      * of ThreadObjectListener, then the initData will be passed
102      * into the initialize( Object ) method.
103      */

104     public ThreadPool( Class JavaDoc objectListenerClass, Object JavaDoc initData )
105     {
106         this( objectListenerClass, initData, 1, 10 );
107     }
108     
109     
110     /**
111      *
112      */

113     public ThreadPool( Class JavaDoc objectListenerClass, Object JavaDoc initData,
114             int maxThreads )
115     {
116         this( objectListenerClass, initData, 1, maxThreads );
117     }
118     
119     
120     /**
121      *
122      */

123     public ThreadPool( Class JavaDoc objectListenerClass, Object JavaDoc initData,
124             int startingThreadCount, int maxThreads )
125     {
126         this.m_objListenerClass = objectListenerClass;
127         this.m_objListenerInitData = initData;
128         try
129         {
130             createObjectListenerInstance();
131         }
132         catch (Exception JavaDoc ex)
133         {
134 ex.printStackTrace();
135             throw new IllegalArgumentException JavaDoc( "Class "+objectListenerClass+
136                 " does not create ObjectListener instances");
137         }
138         
139         setMaximumThreadCount( maxThreads );
140         this.m_pool = new QueueThread[ maxThreads ];
141         while (this.m_numThreads < startingThreadCount)
142         {
143             addNewThread();
144         }
145     }
146     
147     
148     //----------------------------
149
// Public methods
150

151     /**
152      *
153      */

154     public void setDepthThreshold( int threshold )
155     {
156         if (threshold < 1)
157         {
158             throw new IllegalArgumentException JavaDoc("threshold "+threshold+
159                 " is too low");
160         }
161         this.m_depthThreshold = threshold;
162     }
163     
164     /**
165      *
166      */

167     public int getObjectDepth()
168     {
169         return this.m_sharedQueue.size();
170     }
171     
172     /**
173      * Adds the given object into the shared queue, so that the next
174      * available thread will process it.
175      */

176     public void addObject( Object JavaDoc o )
177     {
178         checkThreshold();
179         this.m_sharedQueue.enqueue( o );
180     }
181     
182     /**
183      *
184      */

185     public int getThreadCount()
186     {
187         return this.m_numThreads;
188     }
189     
190     /**
191      *
192      */

193     public int getMaximumThreadCount()
194     {
195         return this.m_maxThreads;
196     }
197     
198     /**
199      *
200      */

201     public void setMaximumThreadCount( int max )
202     {
203         if (max < 1)
204         {
205             throw new IllegalArgumentException JavaDoc("maximum count "+max+
206                 " is out of bounds" );
207         }
208         this.m_maxThreads = max;
209     }
210     
211     
212     /**
213      * Waits for all expecting objects in the queue to be processed,
214      * and for each thread to finish processing an object.
215      */

216     public void waitForThreadsToFinish()
217     {
218         // wait for the SynchQueue to empty
219
while (getObjectDepth() > 0)
220         {
221             Thread.yield();
222         }
223         
224         Vector JavaDoc v = new Vector JavaDoc();
225         synchronized (v)
226         {
227             // find all threads which are still processing objects
228
for (int i = this.m_numThreads; --i >= 0;)
229             {
230                 if (this.m_pool[i].isProcessingObjects())
231                 {
232                     v.addElement( this.m_pool[i] );
233                 }
234             }
235             
236             // wait for all threads to finish processing their objects
237
QueueThread qt;
238             while (v.size() > 0)
239             {
240                 Thread.yield();
241                 for (int i = v.size(); --i >= 0;)
242                 {
243                     qt = (QueueThread)v.elementAt(i);
244                     if (!qt.isProcessingObjects())
245                     {
246                         v.removeElementAt(i);
247                         // don't need to adjust i because
248
// we're procressing backwards.
249
}
250                 }
251             }
252         }
253     }
254     
255     /**
256      * Stops all threads.
257      */

258     public synchronized void stopThreads()
259     {
260         for (int i = this.m_numThreads; --i >= 0;)
261         {
262             if (this.m_pool[i] != null)
263                 this.m_pool[i].stop();
264         }
265     }
266     
267     /**
268      * Suspends all threads.
269      */

270     public synchronized void suspendThreads()
271     {
272         for (int i = this.m_numThreads; --i >= 0;)
273         {
274             if (this.m_pool[i] != null)
275                 this.m_pool[i].suspend();
276         }
277     }
278     
279     /**
280      * Resumes all threads.
281      */

282     public synchronized void resumeThreads()
283     {
284         for (int i = this.m_numThreads; --i >= 0;)
285         {
286             if (this.m_pool[i] != null)
287                 this.m_pool[i].resume();
288         }
289     }
290     
291     
292     
293     
294     //----------------------------
295
// Protected methods
296

297     /**
298      * If there are not enough threads, then add one into the
299      * internal array, start the thread, and return the created
300      * thread.
301      *
302      * @return the new thread, or <tt>null</tt> if the pool has
303      * exceeded its maximum thread count.
304      */

305     protected synchronized QueueThread addNewThread()
306     {
307         QueueThread qt = null;
308         if (this.m_numThreads < this.m_maxThreads)
309         {
310             qt = this.m_pool[ this.m_numThreads++ ] =
311                 new QueueThread( createObjectListenerInstance(),
312                 this.m_sharedQueue );
313             qt.start();
314         }
315         return qt;
316     }
317     
318     
319     /**
320      * Checks if the depth on the shared queue is too deep (beyond the
321      * threshold), and if so, creates a new thread to help deal with the
322      * situation.
323      */

324     protected void checkThreshold()
325     {
326         if (this.m_sharedQueue.size() > this.m_depthThreshold)
327         {
328             addNewThread();
329         }
330     }
331     
332     /**
333      * Create an instance of the basic object listener class, as given
334      * in the constructor.
335      *
336      * @exception IllegalStateException thrown if there is an error
337      * creating a new instance of the class.
338      */

339     protected IObjectListener createObjectListenerInstance()
340     {
341         try
342         {
343 //System.out.println("Creating an instance of class "+this.m_objListenerClass+
344
//", modifiers = "+(java.lang.reflect.Modifier.toString(
345
//this.m_objListenerClass.getModifiers())));
346
IObjectListener ol = (IObjectListener)
347                 this.m_objListenerClass.newInstance();
348             if (ol instanceof IThreadObjectListener)
349             {
350                 ((IThreadObjectListener)ol).initialize(
351                     this.m_objListenerInitData );
352             }
353             return ol;
354         }
355         catch (InstantiationException JavaDoc ie)
356         {
357             throw new IllegalStateException JavaDoc("could not instantiate from class "+
358                 this.m_objListenerClass.getName()+
359                 ": general instantiation exception "+ie.getMessage());
360         }
361         catch (IllegalAccessException JavaDoc iae)
362         {
363             throw new IllegalStateException JavaDoc("could not instantiate from class "+
364                 this.m_objListenerClass.getName()+
365                 ": could not access constructor "+iae.getMessage());
366         }
367         catch (ClassCastException JavaDoc cce)
368         {
369             throw new IllegalStateException JavaDoc("could not instantiate from class "+
370                 this.m_objListenerClass.getName()+": instance of wrong type "+
371                 cce.getMessage());
372         }
373     }
374     
375     //----------------------------
376
// Private methods
377
}
378     
379
380
Popular Tags