KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > jetspeed > services > threadpool > JetspeedThreadPoolService


1 /*
2  * Copyright 2000-2001,2004 The Apache Software Foundation.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */

16
17 package org.apache.jetspeed.services.threadpool;
18
19 // Java Stuff
20
import java.util.*;
21 import javax.servlet.ServletConfig JavaDoc;
22
23 // Turbine Stuff
24
import org.apache.turbine.services.TurbineBaseService;
25
26 // Jetspeed classes
27
import org.apache.jetspeed.services.logging.JetspeedLogFactoryService;
28 import org.apache.jetspeed.services.logging.JetspeedLogger;
29
30 /**
31  * This is a Service that provides a simple threadpool usable by all
32  * thread intensive classes in order to optimize resources utilization
33  * screen:<br>
34  *
35  * <p>It uses 3 parameters for contolling resource usage:
36  * <dl>
37  * <dt>init.count</dt>
38  * <dd>The number of threads to start at initizaliation</dd>
39  * <dt>max.count</dt>
40  * <dd>The maximum number of threads started by this service</dd>
41  * <dt>minspare.count</dt>
42  * <dd>The pool tries to keep lways this minimum number if threads
43  * available</dd>
44  * </dl>
45  * </p>
46  *
47  * @author <a HREF="mailto:burton@apache.org">Kevin A. Burton</a>
48  * @author <a HREF="mailto:raphael@apache.org">Raphaël Luta</a>
49  * @author <a HREF="mailto:sgala@hisitech.com">Santiago Gala</a>
50  * @version $Id: JetspeedThreadPoolService.java,v 1.10 2004/02/23 03:51:31 jford Exp $
51  */

52 public class JetspeedThreadPoolService
53     extends TurbineBaseService
54     implements ThreadPoolService
55 {
56     /**
57      * Static initialization of the logger for this class
58      */

59     protected static final JetspeedLogger logger = JetspeedLogFactoryService.getLogger(JetspeedThreadPoolService.class.getName());
60     
61     /**
62      * The number of threads to create on initialization
63      */

64     private int initThreads = 50;
65
66     /**
67      * The maximum number of threads that should ever be created.
68      */

69     private int maxThreads = 100;
70
71     /**
72      * The minimum amount of threads that should always be available
73      */

74     private int minSpareThreads = 15;
75
76     /**
77      * The default priority to use when creating new threads.
78      */

79     public static final int DEFAULT_THREAD_PRIORITY = Thread.MIN_PRIORITY;
80
81     /**
82      * Stores threads that are available within the pool.
83      */

84     private Vector availableThreads = new Vector();
85
86     
87     /**
88      * The thread group used for all created threads.
89      */

90     private ThreadGroup JavaDoc tg = new ThreadGroup JavaDoc("JetspeedThreadPoolService");
91     
92     /**
93      * Create a new queue for adding Runnable objects to.
94      */

95     private Queue queue = new Queue();
96
97     /**
98      * Holds the total number of threads that have ever been processed.
99      */

100     private int count = 0;
101
102
103     /**
104      * Constructor.
105      *
106      * @exception Exception, a generic exception.
107      */

108     public JetspeedThreadPoolService()
109         throws Exception JavaDoc
110     {
111     }
112
113
114     /**
115      * Late init. Don't return control until early init says we're done.
116      */

117     public void init( )
118     {
119         while( !getInit() ) {
120             try {
121                 Thread.sleep(500);
122             } catch (InterruptedException JavaDoc ie ) {
123                 logger.info("ThreadPool service: Waiting for init()..." );
124             }
125         }
126     }
127
128     /**
129      * Called during Turbine.init()
130      *
131      * @param config A ServletConfig.
132      */

133     public synchronized void init( ServletConfig JavaDoc config )
134     {
135         if( getInit() ) {
136             //Already inited
137
return;
138         }
139
140         try
141         {
142             logger.info ( "JetspeedThreadPoolService early init()....starting!");
143             initThreadpool(config);
144             setInit(true);
145             logger.info ( "JetspeedThreadPoolService early init()....finished!");
146         }
147         catch (Exception JavaDoc e)
148         {
149             logger.error ( "Cannot initialize JetspeedThreadPoolService!", e );
150         }
151
152         // we don't call setInit(true) yet, because we want init() to be called also
153
}
154
155     /**
156      * Processes the Runnable object with an available thread at default priority
157      *
158      * @see #process( Runnable, int )
159      * @param runnable the runnable code to process
160      */

161     public void process( Runnable JavaDoc runnable ) {
162         
163         process( runnable, Thread.MIN_PRIORITY );
164         
165     }
166     
167     /**
168      * Process a Runnable object by allocating a Thread for it
169      * at the given priority
170      *
171      * @param runnable the runnable code to process
172      * @param priority the priority used be the thread that will run this runnable
173      */

174     public void process( Runnable JavaDoc runnable, int priority ) {
175         
176         RunnableThread thread = this.getAvailableThread();
177         
178         if ( thread == null ) {
179
180             this.getQueue().add( runnable );
181         
182         } else {
183
184             try {
185                 synchronized ( thread ) {
186                     //get the default priority of this Thread
187
int defaultPriority = thread.getPriority();
188                     if( defaultPriority != priority ) {
189                         //setting priority triggers security checks,
190
//so we do it only if needed.
191
thread.setPriority( priority );
192                     }
193                     thread.setRunnable( runnable );
194                     thread.notify();
195                 }
196             } catch ( Throwable JavaDoc t ) {
197                 logger.error("Throwable", t);
198             }
199             
200         }
201         
202         
203     }
204     
205     /**
206      * Get the number of threads that have been created
207      *
208      * @return the number of threads currently created by the pool
209      */

210     public int getThreadCount() {
211         return this.tg.activeCount();
212     }
213     
214     /**
215      * Get the number of threads that are available.
216      *
217      * @return the number of threads available in the pool
218      */

219     public int getAvailableThreadCount() {
220         return this.availableThreads.size();
221     }
222     
223     /**
224      * Get the current length of the Runnable queue, waiting for processing
225      *
226      * @return the length of the queue of waiting processes
227      */

228     public int getQueueLength() {
229         return this.getQueue().size();
230     }
231    
232     /**
233      * Get the number of threads that have successfully been processed
234      * for logging and debugging purposes.
235      *
236      * @return the number of processes executed since initialization
237      */

238     public int getThreadProcessedCount() {
239         return this.count;
240     }
241    
242     /**
243      * Get the queue used by the JetspeedThreadPoolService
244      *
245      * @return the queue holding the waiting processes
246      */

247     Queue getQueue() {
248         return this.queue;
249     }
250
251     /**
252      * Place this thread back into the pool so that it can be used again
253      *
254      * @param thread the thread to release back to the pool
255      */

256     void release( RunnableThread thread ) {
257         
258         synchronized ( this.availableThreads ) {
259
260             this.availableThreads.addElement( thread );
261         
262             ++this.count;
263
264             /*
265             It is important to synchronize here because it is possible that
266             between the time we check the queue and we get this another
267             thread might return and fetch the queue to the end.
268             */

269             synchronized( this.getQueue() ) {
270             
271                 //now if there are any objects in the queue add one for processing to
272
//the thread that you just freed up.
273
if ( this.getQueue().size() > 0 ) {
274
275                     Runnable JavaDoc r = this.getQueue().get();
276
277                     if ( r != null ) {
278                         this.process( r );
279                     } else {
280                         logger.info( "JetspeedThreadPoolService: no Runnable found." );
281                     }
282             
283                 }
284             
285             }
286             
287         }
288         
289     }
290     
291     /**
292      * This method initialized the ThreadPool
293      *
294      * @param config A ServletConfig.
295      */

296     private void initThreadpool( ServletConfig JavaDoc config )
297     {
298         Properties props = getProperties();
299
300         try {
301
302             this.initThreads = Integer.parseInt( props.getProperty( "init.count" ) );
303             this.maxThreads = Integer.parseInt( props.getProperty( "max.count" ) );
304             this.minSpareThreads = Integer.parseInt( props.getProperty( "minspare.count" ) );
305
306         } catch ( NumberFormatException JavaDoc e ) {
307             logger.error("Invalid number format in properties", e);
308         }
309                                           
310         //create the number of threads needed for initialization
311
createThreads( this.initThreads );
312
313     }
314
315     /**
316      * Create "count" number of threads and make them available.
317      *
318      * @param count the number of threads to create
319      */

320     private synchronized void createThreads( int count ) {
321         
322         //if the amount of threads you are about to create would end up being
323
//greater than maxThreads then just cap this off to the end point so that
324
//you end up with exactly maxThreads
325
if ( this.getThreadCount() < this.maxThreads &&
326              this.getThreadCount() + count > this.maxThreads ) {
327             
328             count = this.maxThreads - this.getThreadCount();
329             
330         } else if ( this.getThreadCount() >= this.maxThreads ) {
331
332             return;
333         }
334         
335         logger.info( "JetspeedThreadPoolService: creating " +
336                    count +
337                    " more thread(s) for a total of: " +
338                    ( this.getThreadCount() + count ) );
339         
340         for (int i = 0; i < count; ++i ) {
341             
342
343             //RunnableThread has a static numbering counter
344
RunnableThread thread = new RunnableThread( this.tg);
345             thread.setPriority( DEFAULT_THREAD_PRIORITY );
346             
347             thread.start(); //The thread calls release to add...
348
//SGP this.availableThreads.addElement( thread );
349

350         }
351         
352     }
353
354     /**
355      * Get a thread that is available from the pool or null if there are no more
356      * threads left.
357      *
358      * @return a thread from the pool or null if non available
359      */

360     private RunnableThread getAvailableThread() {
361         
362        
363         synchronized( this.availableThreads ) {
364
365             //if the current number of available threads is less than minSpareThreads
366
//then we need to create more
367

368             if ( this.getAvailableThreadCount() < this.minSpareThreads ) {
369                 this.createThreads( this.minSpareThreads );
370             }
371
372             //now if there aren't any threads available then just return null.
373
if ( this.getAvailableThreadCount() == 0 ) {
374                 return null;
375             }
376         
377             RunnableThread thread = null;
378             
379             
380             
381             //get the element to use
382
int id = this.availableThreads.size() - 1;
383
384             
385             
386             thread = (RunnableThread)this.availableThreads.elementAt( id );
387             this.availableThreads.removeElementAt( id );
388
389             return thread;
390         }
391         
392     
393     }
394
395 }
396
397 /**
398  * Handles holding Runnables until they are ready to be processed. This is an impl
399  * of a FIFO (First In First Out) Queue. This makes it possible to add Runnable
400  * objects so that they get processed and they pass through the queue in a predictable
401  * fashion.
402  *
403  * @author <a HREF="mailto:burton@apache.org">Kevin A. Burton</a>
404  * @version $Id: JetspeedThreadPoolService.java,v 1.10 2004/02/23 03:51:31 jford Exp $
405  */

406 class Queue {
407     
408     /**
409      * Holds Runnables that have been requested to process but there are no
410      * threads available.
411      */

412     private Vector queue = new Vector();
413     
414     /**
415      * Add a Runnable object into the queue.
416      *
417      * @param runnable the process to add to the queue
418      */

419     public synchronized void add( Runnable JavaDoc runnable ) {
420         queue.insertElementAt( runnable, 0 );
421     }
422     
423     /**
424      * Get a Runnable object from the queue, and then remove it. Return null
425      * if no more Runnable objects exist.
426      *
427      * @return the first Runnable stored in the queue or null if empty
428      */

429     public synchronized Runnable JavaDoc get() {
430         
431         if ( this.queue.size() == 0 ) {
432             JetspeedThreadPoolService.logger.info( "JetspeedThreadPoolService->Queue: No more Runnables left in queue. Returning null" );
433             return null;
434         }
435         
436         int id = queue.size() - 1;
437         Runnable JavaDoc runnable = (Runnable JavaDoc)queue.elementAt( id );
438         this.queue.removeElementAt( id );
439
440         return runnable;
441     }
442     
443     /**
444      * Return the size of the queue.
445      *
446      * @return the size of the queue
447      */

448     public int size() {
449         return this.queue.size();
450     }
451     
452 }
453
Popular Tags