KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > avalon > excalibur > command > TPSPThreadManager


1 /*
2  * Copyright (C) The Apache Software Foundation. All rights reserved.
3  *
4  * This software is published under the terms of the Apache Software License
5  * version 1.1, a copy of which has been included with this distribution in
6  * the LICENSE.txt file.
7  */

8 package org.apache.avalon.excalibur.command;
9
10 import org.apache.avalon.framework.parameters.Parameters;
11 import org.apache.avalon.excalibur.concurrent.Mutex;
12 import org.apache.avalon.excalibur.thread.*;
13 import org.apache.avalon.excalibur.thread.impl.ResourceLimitingThreadPool;
14
15 import org.apache.avalon.excalibur.event.Sink;
16 import org.apache.avalon.excalibur.event.EventHandler;
17
18 import java.util.HashSet JavaDoc;
19 import java.util.HashMap JavaDoc;
20 import java.util.Iterator JavaDoc;
21
22 /**
23  * This is a ThreadManager which provides a threadpool per Sink per EventPipeline.
24  *
25  * ::NOTE:: This is not implemented yet!
26  *
27  * @author <a HREF="mailto:bloritsch@apache.org">Berin Loritsch</a>
28  */

29 public final class TPSPThreadManager implements Runnable JavaDoc, ThreadManager
30 {
31     private final ThreadPool m_threadPool;
32     private final Mutex m_mutex = new Mutex();
33     private final HashMap JavaDoc m_pipelines = new HashMap JavaDoc();
34     private ThreadControl m_threadControl;
35     private boolean m_done = false;
36     private final long m_sleepTime;
37
38     /**
39      * The default constructor assumes there is a system property named "os.arch.cpus"
40      * that has a default for the number of CPUs on a system. Otherwise, the value
41      * is 1.
42      */

43     public TPSPThreadManager()
44     {
45         this ( 1, 1, 1000 );
46     }
47
48     /**
49      * Constructor provides a specified number of threads per processor. If
50      * either value is less then one, then the value is rewritten as one.
51      */

52     public TPSPThreadManager( int numProcessors, int threadsPerProcessor, long sleepTime )
53     {
54         int processors = Math.max( numProcessors, 1 );
55         int threads = Math.max( threadsPerProcessor, 1 );
56
57         m_threadPool = new ResourceLimitingThreadPool( "TPCThreadManager",
58                 ( processors * threads ) + 1, true, true, 1000L, 10L * 1000L );
59
60         m_sleepTime = sleepTime;
61         m_threadControl = m_threadPool.execute( this );
62     }
63
64     /**
65      * Register an EventPipeline with the ThreadManager.
66      */

67     public void register( EventPipeline pipeline )
68     {
69         try
70         {
71             m_mutex.acquire();
72
73             m_pipelines.put( pipeline, new PipelineRunner( pipeline ) );
74
75             if ( m_done )
76             {
77                 m_threadControl = m_threadPool.execute( this );
78             }
79         }
80         catch ( InterruptedException JavaDoc ie )
81         {
82             // ignore for now
83
}
84         finally
85         {
86             m_mutex.release();
87         }
88     }
89
90     /**
91      * Deregister an EventPipeline with the ThreadManager
92      */

93     public void deregister( EventPipeline pipeline )
94     {
95         try
96         {
97             m_mutex.acquire();
98
99             m_pipelines.remove( pipeline );
100
101             if ( m_pipelines.isEmpty() )
102             {
103                 m_done = true;
104                 m_threadControl.join( 1000 );
105             }
106         }
107         catch ( InterruptedException JavaDoc ie )
108         {
109             // ignore for now
110
}
111         finally
112         {
113             m_mutex.release();
114         }
115     }
116
117     /**
118      * Deregisters all EventPipelines from this ThreadManager
119      */

120     public void deregisterAll()
121     {
122         try
123         {
124             m_mutex.acquire();
125
126             m_done = true;
127             m_pipelines.clear();
128
129             m_threadControl.join( 1000 );
130         }
131         catch ( InterruptedException JavaDoc ie )
132         {
133             // ignore for now
134
}
135         finally
136         {
137             m_mutex.release();
138         }
139     }
140
141     public void run()
142     {
143         while ( ! m_done )
144         {
145             try
146             {
147                 m_mutex.acquire();
148
149                 Iterator JavaDoc i = m_pipelines.values().iterator();
150
151                 while ( i.hasNext() )
152                 {
153                     m_threadPool.execute( (PipelineRunner) i.next() );
154                 }
155             }
156             catch ( InterruptedException JavaDoc ie )
157             {
158                 // ignore for now
159
}
160             finally
161             {
162                 m_mutex.release();
163             }
164
165             try
166             {
167                 Thread.sleep( m_sleepTime );
168             }
169             catch ( InterruptedException JavaDoc ie )
170             {
171                // ignore and continue processing
172
}
173         }
174     }
175
176     public final static class PipelineRunner implements Runnable JavaDoc
177     {
178         private final EventPipeline m_pipeline;
179
180         protected PipelineRunner( EventPipeline pipeline )
181         {
182             m_pipeline = pipeline;
183         }
184
185         public void run()
186         {
187             Sink[] sinks = m_pipeline.getSinks();
188             EventHandler handler = m_pipeline.getEventHandler();
189
190             for (int i = 0; i < sinks.length; i++)
191             {
192                 handler.handleEvents( sinks[i].dequeueAll() );
193             }
194         }
195     }
196 }
Popular Tags