1 17 package org.apache.excalibur.event.command; 18 19 import java.util.*; 20 21 import org.apache.commons.collections.StaticBucketMap; 22 import org.apache.excalibur.event.EventHandler; 23 import org.apache.excalibur.event.Source; 24 import org.apache.excalibur.event.DequeueInterceptor; 25 import org.apache.excalibur.event.Queue; 26 import org.apache.excalibur.event.impl.NullDequeueInterceptor; 27 import EDU.oswego.cs.dl.util.concurrent.PooledExecutor; 28 29 36 public final class TPSPThreadManager implements ThreadManager 37 { 38 private final StaticBucketMap m_pipelines = new StaticBucketMap(); 39 private final int m_maxThreadsPerPool; 40 private final int m_threshold; 41 private final int m_margin; 42 43 48 public TPSPThreadManager() 49 { 50 this( 2, 1000 ); 51 } 52 53 60 public TPSPThreadManager( int maxThreadPerPool, int threshold ) 61 { 62 this(maxThreadPerPool, threshold, (threshold/4)); 63 } 64 65 73 public TPSPThreadManager( int maxThreadPerPool, int threshold, int margin ) 74 { 75 m_maxThreadsPerPool = maxThreadPerPool; 76 m_threshold = threshold; 77 m_margin = margin; 78 } 79 80 85 public void register( EventPipeline pipeline ) 86 { 87 Source[] sources = pipeline.getSources(); 88 EventHandler handler = pipeline.getEventHandler(); 89 List sourceList = new ArrayList(sources.length); 90 91 for (int i = 0; i < sources.length; i++) 92 { 93 PooledExecutor threadPool = new PooledExecutor(); 94 threadPool.setMinimumPoolSize(1); 95 threadPool.setMaximumPoolSize(m_maxThreadsPerPool); 96 SourceRunner initRunner = new SourceRunner(sources[i], handler); 97 98 try 99 { 100 threadPool.execute(initRunner); 101 } 102 catch ( InterruptedException e ) 103 { 104 } 105 106 sourceList.add(new SourceDequeueInterceptor(initRunner, handler, threadPool, m_threshold, m_margin)); 107 } 108 m_pipelines.put( pipeline, sourceList ); 109 } 110 111 116 public void deregister( EventPipeline pipeline ) 117 { 118 List sources = (List) m_pipelines.remove( pipeline ); 119 Iterator it = sources.iterator(); 120 while(it.hasNext()) 121 { 122 SourceDequeueInterceptor intercept = (SourceDequeueInterceptor)it.next(); 123 intercept.stop(); 124 } 125 } 126 127 130 public void deregisterAll() 131 { 132 Iterator it = m_pipelines.keySet().iterator(); 133 while(it.hasNext()) 134 { 135 deregister((EventPipeline)it.next()); 136 } 137 } 138 139 142 protected static final class SourceRunner implements Runnable 143 { 144 private final Source m_source; 145 private final EventHandler m_handler; 146 private volatile boolean m_keepProcessing; 147 148 154 protected SourceRunner( final Source source, final EventHandler handler ) 155 { 156 if ( source == null ) throw new NullPointerException ("source"); 157 if(handler == null)throw new NullPointerException ("handler"); 158 m_source = source; 159 m_handler = handler; 160 m_keepProcessing = true; 161 } 162 163 166 public void run() 167 { 168 while (m_keepProcessing) 169 { 170 Object event = m_source.dequeue(); 171 172 if ( event != null ) 173 { 174 m_handler.handleEvent( event ); 175 } 176 177 yield(); 178 } 179 } 180 181 184 private static void yield() 185 { 186 try 187 { 188 Thread.sleep(1); 189 } 190 catch (InterruptedException ie) 191 { 192 } 194 } 195 196 199 public void stop() 200 { 201 m_keepProcessing = false; 202 } 203 204 209 public Source getSource() 210 { 211 return m_source; 212 } 213 } 214 215 218 protected static final class SourceDequeueInterceptor implements DequeueInterceptor 219 { 220 private final Source m_source; 221 private final PooledExecutor m_threadPool; 222 private final int m_threshold; 223 private final DequeueInterceptor m_parent; 224 private final int m_margin; 225 private final LinkedList m_runners; 226 private final EventHandler m_handler; 227 private final SourceRunner m_initRunner; 228 229 239 public SourceDequeueInterceptor( SourceRunner runner, EventHandler handler, PooledExecutor threadPool, int threshold, int margin ) 240 { 241 if (runner == null) throw new NullPointerException ("runner"); 242 if (handler == null) throw new NullPointerException ("handler"); 243 if (threadPool == null) throw new NullPointerException ("threadPool"); 244 if ( threshold < threadPool.getMinimumPoolSize()) 245 throw new IllegalArgumentException ("threshold must be higher than the minimum number" + 246 " of threads for the pool"); 247 if ( margin < 0 ) 248 throw new IllegalArgumentException ("margin must not be less then zero"); 249 if ( threshold - margin <= threadPool.getMinimumPoolSize() ) 250 throw new IllegalArgumentException ( "The margin must not exceed or equal the" + 251 " differnece between threshold and the thread" + 252 " pool minimum size" ); 253 254 m_source = runner.getSource(); 255 m_initRunner = runner; 256 m_threadPool = threadPool; 257 m_threshold = threshold; 258 m_runners = new LinkedList(); 259 m_handler = handler; 260 261 if ( m_source instanceof Queue) 262 { 263 Queue queue = (Queue) m_source; 264 m_parent = queue.getDequeueInterceptor(); 265 queue.setDequeueInterceptor(this); 266 } 267 else 268 { 269 m_parent = new NullDequeueInterceptor(); 270 } 271 272 m_margin = margin; 273 } 274 275 289 public void before( Source context ) 290 { 291 if (m_source.size() > (m_threshold + m_margin)) 292 { 293 SourceRunner runner = new SourceRunner(m_source, m_handler); 294 try 295 { 296 m_threadPool.execute(runner); 297 } 298 catch ( InterruptedException e ) 299 { 300 } 301 302 m_runners.add( runner ); 303 } 304 m_parent.before(context); 305 } 306 307 321 public void after( Source context ) 322 { 323 m_parent.after(context); 324 325 if (m_source.size() < (m_threshold - m_margin)) 326 { 327 if ( m_runners.size() > 0 ) 328 { 329 SourceRunner runner = (SourceRunner)m_runners.removeFirst(); 330 runner.stop(); 331 } 332 } 333 } 334 335 338 public void stop() 339 { 340 Iterator it = m_runners.iterator(); 341 while(it.hasNext()) 342 { 343 ((SourceRunner)it.next()).stop();; 344 } 345 346 m_initRunner.stop(); 347 } 348 } 349 } 350 | Popular Tags |