KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > excalibur > event > command > AbstractThreadManager


1 /*
2  * Copyright 1999-2004 The Apache Software Foundation
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12  * implied.
13  *
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */

17 package org.apache.excalibur.event.command;
18
19 import java.util.Iterator JavaDoc;
20
21 import org.apache.avalon.framework.activity.Disposable;
22 import org.apache.avalon.framework.activity.Initializable;
23 import org.apache.avalon.framework.logger.AbstractLogEnabled;
24 import org.apache.commons.collections.StaticBucketMap;
25 import org.apache.excalibur.event.EventHandler;
26 import org.apache.excalibur.event.Source;
27
28 import EDU.oswego.cs.dl.util.concurrent.Executor;
29
30 /**
31  * Abstract base class for a ThreadManager that has a single ThreadPool for
32  * all pipelines
33  *
34  * @author <a HREF="mailto:dev@avalon.apache.org">Avalon Development Team</a>
35  */

36 public abstract class AbstractThreadManager extends AbstractLogEnabled
37     implements Runnable JavaDoc, ThreadManager, Initializable, Disposable
38 {
39     /** The pipelines we are managing */
40     private final StaticBucketMap m_pipelines = new StaticBucketMap();
41
42     /** The ThreadPool we are using */
43     private Executor m_executor;
44
45     /** Whether we are done or not */
46     private volatile boolean m_done = false;
47
48     /** The number of milliseconds to sleep before runngin again: 1000 (1 sec.) */
49     private long m_sleepTime = 1000L;
50
51     /** Whether this class has been initialized or not */
52     private volatile boolean m_initialized = false;
53
54     /** Return whether the thread manager has been initialized or not */
55     protected boolean isInitialized()
56     {
57         return m_initialized;
58     }
59
60     /**
61      * Set the amount of time to sleep between checks on the queue
62      *
63      * @param sleepTime Number of milliseconds
64      */

65     protected void setSleepTime( long sleepTime )
66     {
67         m_sleepTime = sleepTime;
68     }
69
70     /**
71      * Get the current amount of sleep time.
72      */

73     protected long getSleepTime()
74     {
75         return m_sleepTime;
76     }
77
78     /**
79      * Set the executor we are using
80      *
81      * @param executor to use
82      */

83     protected void setExecutor( Executor executor )
84     {
85         if( null == m_executor )
86         {
87             m_executor = executor;
88         }
89         else
90         {
91             throw new IllegalStateException JavaDoc( "Can only set the executor once" );
92         }
93     }
94
95     /**
96      * Set up the ThreadManager. All required parameters must have already been set.
97      *
98      * @throws Exception if there is any problem setting up the ThreadManager
99      */

100     public void initialize() throws Exception JavaDoc
101     {
102         if( null == m_executor )
103         {
104             throw new IllegalStateException JavaDoc( "No thread pool set" );
105         }
106
107         m_executor.execute( this );
108         this.m_initialized = true;
109     }
110
111     /**
112      * Register an EventPipeline with the ThreadManager.
113      *
114      * @param pipeline The pipeline we are registering
115      */

116     public void register( EventPipeline pipeline )
117     {
118         if( !isInitialized() )
119         {
120             throw new IllegalStateException JavaDoc( "ThreadManager must be initialized before "
121                                              + "registering a pipeline" );
122         }
123
124         try
125         {
126             PipelineRunner runner = new PipelineRunner( pipeline );
127             runner.enableLogging( getLogger() );
128             m_pipelines.put( pipeline, runner );
129
130             if( m_done )
131             {
132                 m_executor.execute( this );
133             }
134         }
135         catch( InterruptedException JavaDoc ie )
136         {
137             getLogger().warn("Caught InterruptedException in register", ie);
138             // ignore for now
139
}
140     }
141
142     /**
143      * Deregister an EventPipeline with the ThreadManager
144      *
145      * @param pipeline The pipeline we are de-registering
146      */

147     public void deregister( EventPipeline pipeline )
148     {
149         if( !isInitialized() )
150         {
151             throw new IllegalStateException JavaDoc( "ThreadManager must be initialized before "
152                                              + "deregistering a pipeline" );
153         }
154
155         m_pipelines.remove( pipeline );
156
157         if( m_pipelines.isEmpty() )
158         {
159             m_done = true;
160         }
161     }
162
163     /**
164      * Deregisters all EventPipelines from this ThreadManager
165      */

166     public void deregisterAll()
167     {
168         if( !isInitialized() )
169         {
170             throw new IllegalStateException JavaDoc( "ThreadManager must be initialized "
171                                              + "before deregistering pipelines" );
172         }
173
174         Iterator JavaDoc it = m_pipelines.keySet().iterator();
175         while ( it.hasNext() )
176         {
177             deregister( (EventPipeline) it.next() );
178         }
179
180         m_done = true;
181
182         if ( ! m_pipelines.isEmpty() )
183         {
184             throw new IllegalStateException JavaDoc("We still have pipelines, but no runners are available!");
185         }
186     }
187
188
189     /**
190      * Get rid of the ThreadManager.
191      */

192     public void dispose()
193     {
194         m_done = true;
195         deregisterAll();
196
197         doDispose();
198     }
199
200     protected void doDispose() {} // default impl to work with released code
201

202     /**
203      * The code that is run in the background to manage the ThreadPool and the
204      * EventPipelines
205      */

206     public void run()
207     {
208         while( !m_done )
209         {
210             Iterator JavaDoc i = m_pipelines.values().iterator();
211
212             while( i.hasNext() )
213             {
214                 PipelineRunner nextRunner = ( PipelineRunner ) i.next();
215
216                 try
217                 {
218                     m_executor.execute( nextRunner );
219                 }
220                 catch( Exception JavaDoc e )
221                 {
222                     if( getLogger().isErrorEnabled() )
223                     {
224                         getLogger().error( "Caught exception in ThreadManager management thread", e );
225                     }
226                 }
227             }
228
229             if ( !m_done )
230             {
231                 try
232                 {
233                     Thread.sleep( m_sleepTime );
234                 }
235                 catch( InterruptedException JavaDoc e )
236                 {
237                     Thread.interrupted();
238                 }
239             }
240         }
241     }
242
243     /**
244      * The PipelineRunner class pulls all the events from the Source, and puts them in the EventHandler.
245      * Both of those objects are part of the EventPipeline.
246      */

247     public static final class PipelineRunner
248         extends AbstractLogEnabled
249         implements Runnable JavaDoc
250     {
251         /** The pipeline we are managing */
252         private final EventPipeline m_pipeline;
253
254         /**
255          * Create a PipelineRunner.
256          *
257          * @param pipeline The EventPipeline we are running
258          */

259         protected PipelineRunner( EventPipeline pipeline )
260         {
261             m_pipeline = pipeline;
262         }
263
264         /**
265          * The code that actually pulls the events from the Sources and sends them to the event handler
266          */

267         public void run()
268         {
269             Source[] sources = m_pipeline.getSources();
270             EventHandler handler = m_pipeline.getEventHandler();
271
272             for( int i = 0; i < sources.length; i++ )
273             {
274                 try
275                 {
276                     handler.handleEvents( sources[i].dequeueAll() );
277                 }
278                 catch( Exception JavaDoc e )
279                 {
280                     // We want to catch this, because this is the only
281
// place where exceptions happening in this thread
282
// can be logged
283

284                     if( getLogger().isErrorEnabled() )
285                     {
286                         getLogger().error( "Exception processing EventPipeline [msg: "
287                                            + e.getMessage() + "]", e );
288                     }
289                 }
290             }
291         }
292     }
293 }
294
Popular Tags