1 17 package org.apache.excalibur.event.command; 18 19 import java.util.Iterator ; 20 21 import org.apache.avalon.framework.activity.Disposable; 22 import org.apache.avalon.framework.activity.Initializable; 23 import org.apache.avalon.framework.logger.AbstractLogEnabled; 24 import org.apache.commons.collections.StaticBucketMap; 25 import org.apache.excalibur.event.EventHandler; 26 import org.apache.excalibur.event.Source; 27 28 import EDU.oswego.cs.dl.util.concurrent.Executor; 29 30 36 public abstract class AbstractThreadManager extends AbstractLogEnabled 37 implements Runnable , ThreadManager, Initializable, Disposable 38 { 39 40 private final StaticBucketMap m_pipelines = new StaticBucketMap(); 41 42 43 private Executor m_executor; 44 45 46 private volatile boolean m_done = false; 47 48 49 private long m_sleepTime = 1000L; 50 51 52 private volatile boolean m_initialized = false; 53 54 55 protected boolean isInitialized() 56 { 57 return m_initialized; 58 } 59 60 65 protected void setSleepTime( long sleepTime ) 66 { 67 m_sleepTime = sleepTime; 68 } 69 70 73 protected long getSleepTime() 74 { 75 return m_sleepTime; 76 } 77 78 83 protected void setExecutor( Executor executor ) 84 { 85 if( null == m_executor ) 86 { 87 m_executor = executor; 88 } 89 else 90 { 91 throw new IllegalStateException ( "Can only set the executor once" ); 92 } 93 } 94 95 100 public void initialize() throws Exception 101 { 102 if( null == m_executor ) 103 { 104 throw new IllegalStateException ( "No thread pool set" ); 105 } 106 107 m_executor.execute( this ); 108 this.m_initialized = true; 109 } 110 111 116 public void register( EventPipeline pipeline ) 117 { 118 if( !isInitialized() ) 119 { 120 throw new IllegalStateException ( "ThreadManager must be initialized before " 121 + "registering a pipeline" ); 122 } 123 124 try 125 { 126 PipelineRunner runner = new PipelineRunner( pipeline ); 127 runner.enableLogging( getLogger() ); 128 m_pipelines.put( pipeline, runner ); 129 130 if( m_done ) 131 { 132 m_executor.execute( this ); 133 } 134 } 135 catch( InterruptedException ie ) 136 { 137 getLogger().warn("Caught InterruptedException in register", ie); 138 } 140 } 141 142 147 public void deregister( EventPipeline pipeline ) 148 { 149 if( !isInitialized() ) 150 { 151 throw new IllegalStateException ( "ThreadManager must be initialized before " 152 + "deregistering a pipeline" ); 153 } 154 155 m_pipelines.remove( pipeline ); 156 157 if( m_pipelines.isEmpty() ) 158 { 159 m_done = true; 160 } 161 } 162 163 166 public void deregisterAll() 167 { 168 if( !isInitialized() ) 169 { 170 throw new IllegalStateException ( "ThreadManager must be initialized " 171 + "before deregistering pipelines" ); 172 } 173 174 Iterator it = m_pipelines.keySet().iterator(); 175 while ( it.hasNext() ) 176 { 177 deregister( (EventPipeline) it.next() ); 178 } 179 180 m_done = true; 181 182 if ( ! m_pipelines.isEmpty() ) 183 { 184 throw new IllegalStateException ("We still have pipelines, but no runners are available!"); 185 } 186 } 187 188 189 192 public void dispose() 193 { 194 m_done = true; 195 deregisterAll(); 196 197 doDispose(); 198 } 199 200 protected void doDispose() {} 202 206 public void run() 207 { 208 while( !m_done ) 209 { 210 Iterator i = m_pipelines.values().iterator(); 211 212 while( i.hasNext() ) 213 { 214 PipelineRunner nextRunner = ( PipelineRunner ) i.next(); 215 216 try 217 { 218 m_executor.execute( nextRunner ); 219 } 220 catch( Exception e ) 221 { 222 if( getLogger().isErrorEnabled() ) 223 { 224 getLogger().error( "Caught exception in ThreadManager management thread", e ); 225 } 226 } 227 } 228 229 if ( !m_done ) 230 { 231 try 232 { 233 Thread.sleep( m_sleepTime ); 234 } 235 catch( InterruptedException e ) 236 { 237 Thread.interrupted(); 238 } 239 } 240 } 241 } 242 243 247 public static final class PipelineRunner 248 extends AbstractLogEnabled 249 implements Runnable 250 { 251 252 private final EventPipeline m_pipeline; 253 254 259 protected PipelineRunner( EventPipeline pipeline ) 260 { 261 m_pipeline = pipeline; 262 } 263 264 267 public void run() 268 { 269 Source[] sources = m_pipeline.getSources(); 270 EventHandler handler = m_pipeline.getEventHandler(); 271 272 for( int i = 0; i < sources.length; i++ ) 273 { 274 try 275 { 276 handler.handleEvents( sources[i].dequeueAll() ); 277 } 278 catch( Exception e ) 279 { 280 284 if( getLogger().isErrorEnabled() ) 285 { 286 getLogger().error( "Exception processing EventPipeline [msg: " 287 + e.getMessage() + "]", e ); 288 } 289 } 290 } 291 } 292 } 293 } 294 | Popular Tags |