KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > excalibur > event > command > TPCThreadManager


1 /*
2  * Copyright 1999-2004 The Apache Software Foundation
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12  * implied.
13  *
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */

17 package org.apache.excalibur.event.command;
18
19 import org.apache.avalon.framework.logger.NullLogger;
20 import org.apache.avalon.framework.parameters.ParameterException;
21 import org.apache.avalon.framework.parameters.Parameterizable;
22 import org.apache.avalon.framework.parameters.Parameters;
23 import org.apache.excalibur.util.SystemUtil;
24
25 import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
26
27 /**
28  * This is a ThreadManager that uses a certain number of threads per
29  * processor. The number of threads in the pool is a direct proportion to
30  * the number of processors. The size of the thread pool is (processors *
31  * threads-per-processor) + 1.
32  *
33  * @author <a HREF="mailto:dev@avalon.apache.org">Avalon Development Team</a>
34  */

35 public final class TPCThreadManager extends AbstractThreadManager implements Parameterizable
36 {
37     private PooledExecutor m_threadPool;
38     private int m_processors = -1;
39     private int m_threadsPerProcessor = 1;
40     private int m_keepAliveTime = 300000;
41     private boolean m_hardShutdown = false;
42
43     /**
44      * The following parameters can be set for this class:
45      *
46      * <table>
47      * <tr>
48      * <th>Name</th> <th>Description</th> <th>Default Value</th>
49      * </tr>
50      * <tr>
51      * <td>processors</td>
52      * <td>Number of processors (autodetected if less than one)</td>
53      * <td>Results from SystemUtil.numProcessors()</td>
54      * </tr>
55      * <tr>
56      * <td>threads-per-processor</td>
57      * <td>Threads per processor to use (Rewritten to 1 if less than one)</td>
58      * <td>1</td>
59      * </tr>
60      * <tr>
61      * <td>sleep-time</td>
62      * <td>Time (in milliseconds) to wait between queue pipeline processing runs</td>
63      * <td>1000</td>
64      * </tr>
65      * <tr>
66      * <td>keep-alive-time</td>
67      * <td>Time (in milliseconds) that idle threads should remain in the threadpool</td>
68      * <td>300000</td>
69      * </tr>
70      * <tr>
71      * <td>force-shutdown</td>
72      * <td>At shutdown time, allow currently queued tasks to finish, or immediately quit</td>
73      * <td>false</td>
74      * </tr>
75      * </table>
76      *
77      * @param parameters The Parameters object
78      *
79      * @throws ParameterException if there is a problem with the parameters.
80      */

81     public void parameterize( Parameters parameters ) throws ParameterException
82     {
83         m_processors = Math.max(1, parameters.getParameterAsInteger( "processors", 1 ) );
84
85         m_threadsPerProcessor =
86             Math.max( parameters.getParameterAsInteger( "threads-per-processor", 1 ), 1 );
87
88         m_keepAliveTime = parameters.getParameterAsInteger("keep-alive-time", 300000);
89
90         setSleepTime( parameters.getParameterAsLong( "sleep-time", 1000L ) );
91
92         m_hardShutdown = ( parameters.getParameterAsBoolean( "force-shutdown", false ) );
93     }
94
95     public void initialize() throws Exception JavaDoc
96     {
97         if( m_processors < 1 )
98         {
99             m_processors = Math.max( 1, SystemUtil.numProcessors() );
100         }
101
102         if( isInitialized() )
103         {
104             throw new IllegalStateException JavaDoc( "ThreadManager is already initailized" );
105         }
106
107         final int maxPoolSize = ( m_processors * m_threadsPerProcessor ) + 1;
108         m_threadPool = new PooledExecutor( m_processors + 1 );
109         m_threadPool.setMinimumPoolSize( 2 ); // at least two threads
110
m_threadPool.setMaximumPoolSize( maxPoolSize );
111         m_threadPool.waitWhenBlocked();
112         if( maxPoolSize == 2 )
113         {
114             // The PooledExecutor has an inherent race condition between releasing threads
115
// and adding new tasks (when using the waitWhenBlocked policy):
116
// it could be that a thread is being released while a new
117
// task is being added. That task would then remain waiting to be picked up by
118
// the next thread that becomes available, but meanwhile the threadpool is below its maximum capacity.
119
// If the threadpool has a maximum size of 1, then this could leave the task waiting forever.
120
// Here we check if maxPoolSize == 2 because one of the threads used by the threadpool will
121
// be used continuously by the ThreadManager itself.
122
// As a solution to this problem, the one available work-thread we have in this case
123
// is set to never expire.
124
m_threadPool.setKeepAliveTime( -1 );
125         }
126         else
127         {
128             m_threadPool.setKeepAliveTime( m_keepAliveTime );
129         }
130
131         if( null == getLogger() )
132         {
133             this.enableLogging( new NullLogger() );
134         }
135
136         setExecutor( m_threadPool );
137
138         super.initialize();
139     }
140
141     protected final void doDispose()
142     {
143         if ( m_hardShutdown )
144         {
145             m_threadPool.shutdownNow();
146         }
147         else
148         {
149             m_threadPool.shutdownAfterProcessingCurrentlyQueuedTasks();
150         }
151
152         m_threadPool.interruptAll();
153
154         try
155         {
156             if ( !m_threadPool.awaitTerminationAfterShutdown( getSleepTime() ) )
157             {
158                 getLogger().warn("Thread pool took longer than " + getSleepTime() +
159                      " ms to shut down");
160             }
161         }
162         catch (InterruptedException JavaDoc ie)
163         {
164             getLogger().warn("Thread pool was interrupted while waiting for shutdown to complete.", ie);
165         }
166     }
167 }
168
Popular Tags