KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > excalibur > event > command > TPSPThreadManager


1 /*
2  * Copyright 1999-2004 The Apache Software Foundation
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12  * implied.
13  *
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */

17 package org.apache.excalibur.event.command;
18
19 import java.util.*;
20
21 import org.apache.commons.collections.StaticBucketMap;
22 import org.apache.excalibur.event.EventHandler;
23 import org.apache.excalibur.event.Source;
24 import org.apache.excalibur.event.DequeueInterceptor;
25 import org.apache.excalibur.event.Queue;
26 import org.apache.excalibur.event.impl.NullDequeueInterceptor;
27 import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
28
29 /**
30  * This is a <code>ThreadManager</code> which provides a threadpool per
31  * <code>Sink</code> per <code>EventPipeline</code>. ::NOTE:: This is not
32  * tested yet!
33  *
34  * @author <a HREF="mailto:dev@avalon.apache.org">Avalon Development Team</a>
35  */

36 public final class TPSPThreadManager implements ThreadManager
37 {
38     private final StaticBucketMap m_pipelines = new StaticBucketMap();
39     private final int m_maxThreadsPerPool;
40     private final int m_threshold;
41     private final int m_margin;
42
43     /**
44      * The default constructor assumes there is a system property named
45      * "os.arch.cpus" that has a default for the number of CPUs on a system.
46      * Otherwise, the value is 1.
47      */

48     public TPSPThreadManager()
49     {
50         this( 2, 1000 );
51     }
52
53     /**
54      * Constructor provides a specified number of threads per processor. If
55      * either value is less then one, then the value is rewritten as one.
56      *
57      * @param maxThreadPerPool The number of processors in the machine
58      * @param threshold The number of events before a new thread is started
59      */

60     public TPSPThreadManager( int maxThreadPerPool, int threshold )
61     {
62         this(maxThreadPerPool, threshold, (threshold/4));
63     }
64
65     /**
66      * Constructor provides a specified number of threads per processor. If
67      * either value is less then one, then the value is rewritten as one.
68      *
69      * @param maxThreadPerPool The number of processors in the machine
70      * @param threshold The number of events before a new thread is started
71      * @param margin The number of events +/- the threshold for thread evaluation
72      */

73     public TPSPThreadManager( int maxThreadPerPool, int threshold, int margin )
74     {
75         m_maxThreadsPerPool = maxThreadPerPool;
76         m_threshold = threshold;
77         m_margin = margin;
78     }
79
80     /**
81      * Register an EventPipeline with the ThreadManager.
82      *
83      * @param pipeline The pipeline we are registering
84      */

85     public void register( EventPipeline pipeline )
86     {
87         Source[] sources = pipeline.getSources();
88         EventHandler handler = pipeline.getEventHandler();
89         List sourceList = new ArrayList(sources.length);
90
91         for (int i = 0; i < sources.length; i++)
92         {
93             PooledExecutor threadPool = new PooledExecutor();
94             threadPool.setMinimumPoolSize(1);
95             threadPool.setMaximumPoolSize(m_maxThreadsPerPool);
96             SourceRunner initRunner = new SourceRunner(sources[i], handler);
97
98             try
99             {
100                 threadPool.execute(initRunner);
101             }
102             catch ( InterruptedException JavaDoc e )
103             {
104             }
105
106             sourceList.add(new SourceDequeueInterceptor(initRunner, handler, threadPool, m_threshold, m_margin));
107         }
108         m_pipelines.put( pipeline, sourceList );
109     }
110
111     /**
112      * Deregister an EventPipeline with the ThreadManager
113      *
114      * @param pipeline The pipeline to unregister
115      */

116     public void deregister( EventPipeline pipeline )
117     {
118         List sources = (List) m_pipelines.remove( pipeline );
119         Iterator it = sources.iterator();
120         while(it.hasNext())
121         {
122             SourceDequeueInterceptor intercept = (SourceDequeueInterceptor)it.next();
123             intercept.stop();
124         }
125     }
126
127     /**
128      * Deregisters all EventPipelines from this ThreadManager
129      */

130     public void deregisterAll()
131     {
132         Iterator it = m_pipelines.keySet().iterator();
133         while(it.hasNext())
134         {
135             deregister((EventPipeline)it.next());
136         }
137     }
138
139     /**
140      * The SourceRunner is used to dequeue events one at a time.
141      */

142     protected static final class SourceRunner implements Runnable JavaDoc
143     {
144         private final Source m_source;
145         private final EventHandler m_handler;
146         private volatile boolean m_keepProcessing;
147
148         /**
149          * Create a new SourceRunner.
150          *
151          * @param source The source to pull events from.
152          * @param handler The handler to send events to.
153          */

154         protected SourceRunner( final Source source, final EventHandler handler )
155         {
156             if ( source == null ) throw new NullPointerException JavaDoc("source");
157             if(handler == null)throw new NullPointerException JavaDoc("handler");
158             m_source = source;
159             m_handler = handler;
160             m_keepProcessing = true;
161         }
162
163         /**
164          * Called by the PooledExecutor to ensure all components are working.
165          */

166         public void run()
167         {
168             while (m_keepProcessing)
169             {
170                 Object JavaDoc event = m_source.dequeue();
171
172                 if ( event != null )
173                 {
174                     m_handler.handleEvent( event );
175                 }
176
177                 yield();
178             }
179         }
180
181         /**
182          * A way to make sure we yield the processor up to the next thread.
183          */

184         private static void yield()
185         {
186             try
187             {
188                 Thread.sleep(1);
189             }
190             catch (InterruptedException JavaDoc ie)
191             {
192                 //Nothing to do.
193
}
194         }
195
196         /**
197          * Stop the runner nicely.
198          */

199         public void stop()
200         {
201             m_keepProcessing = false;
202         }
203
204         /**
205          * Get a reference to the Source.
206          *
207          * @return the <code>Source</code>
208          */

209         public Source getSource()
210         {
211             return m_source;
212         }
213     }
214
215     /**
216      * This is used to plug into Queues so that we can intercept calls to the dequeue operation.
217      */

218     protected static final class SourceDequeueInterceptor implements DequeueInterceptor
219     {
220         private final Source m_source;
221         private final PooledExecutor m_threadPool;
222         private final int m_threshold;
223         private final DequeueInterceptor m_parent;
224         private final int m_margin;
225         private final LinkedList m_runners;
226         private final EventHandler m_handler;
227         private final SourceRunner m_initRunner;
228
229         /**
230          * Create a new SourceDequeueInterceptor. The parameters are used to ensure a working
231          * environment.
232          *
233          * @param runner The initial SourceRunner.
234          * @param handler The EventHandler to send events to.
235          * @param threadPool The PooledExecutor for the set of threads.
236          * @param threshold The threshold of events before a new thread is executed.
237          * @param margin The margin of error allowed for the events.
238          */

239         public SourceDequeueInterceptor( SourceRunner runner, EventHandler handler, PooledExecutor threadPool, int threshold, int margin )
240         {
241             if (runner == null) throw new NullPointerException JavaDoc("runner");
242             if (handler == null) throw new NullPointerException JavaDoc("handler");
243             if (threadPool == null) throw new NullPointerException JavaDoc("threadPool");
244             if ( threshold < threadPool.getMinimumPoolSize())
245                 throw new IllegalArgumentException JavaDoc("threshold must be higher than the minimum number" +
246                                                    " of threads for the pool");
247             if ( margin < 0 )
248                 throw new IllegalArgumentException JavaDoc("margin must not be less then zero");
249             if ( threshold - margin <= threadPool.getMinimumPoolSize() )
250                 throw new IllegalArgumentException JavaDoc( "The margin must not exceed or equal the" +
251                                                     " differnece between threshold and the thread" +
252                                                     " pool minimum size" );
253
254             m_source = runner.getSource();
255             m_initRunner = runner;
256             m_threadPool = threadPool;
257             m_threshold = threshold;
258             m_runners = new LinkedList();
259             m_handler = handler;
260
261             if ( m_source instanceof Queue)
262             {
263                 Queue queue = (Queue) m_source;
264                 m_parent = queue.getDequeueInterceptor();
265                 queue.setDequeueInterceptor(this);
266             }
267             else
268             {
269                 m_parent = new NullDequeueInterceptor();
270             }
271
272             m_margin = margin;
273         }
274
275         /**
276          * An operation executed before dequeing events from
277          * the queue. The Source is passed in so the implementation
278          * can determine to execute based on the queue properties.
279          *
280          * <p>
281          * This method is called once at the beginning of any <code>dequeue</code>
282          * method regardless of how many queue elements are dequeued.
283          * </p>
284          *
285          * @since Feb 10, 2003
286          *
287          * @param context The source from which the dequeue is performed.
288          */

289         public void before( Source context )
290         {
291             if (m_source.size() > (m_threshold + m_margin))
292             {
293                 SourceRunner runner = new SourceRunner(m_source, m_handler);
294                 try
295                 {
296                     m_threadPool.execute(runner);
297                 }
298                 catch ( InterruptedException JavaDoc e )
299                 {
300                 }
301
302                 m_runners.add( runner );
303             }
304             m_parent.before(context);
305         }
306
307         /**
308          * An operation executed after dequeing events from
309          * the queue. The Source is passed in so the implementation
310          * can determine to execute based on the queue properties.
311          *
312          * <p>
313          * This method is called once at the end of any <code>dequeue</code>
314          * method regardless of how many queue elements are dequeued.
315          * </p>
316          *
317          * @since Feb 10, 2003
318          *
319          * @param context The source from which the dequeue is performed.
320          */

321         public void after( Source context )
322         {
323             m_parent.after(context);
324
325             if (m_source.size() < (m_threshold - m_margin))
326             {
327                 if ( m_runners.size() > 0 )
328                 {
329                     SourceRunner runner = (SourceRunner)m_runners.removeFirst();
330                     runner.stop();
331                 }
332             }
333         }
334
335         /**
336          * Ensure all event runners are stopped for this partial pipeline.
337          */

338         public void stop()
339         {
340             Iterator it = m_runners.iterator();
341             while(it.hasNext())
342             {
343                 ((SourceRunner)it.next()).stop();;
344             }
345
346             m_initRunner.stop();
347         }
348     }
349 }
350
Popular Tags