1 8 package org.apache.avalon.excalibur.command; 9 10 import org.apache.avalon.framework.parameters.Parameters; 11 import org.apache.avalon.excalibur.concurrent.Mutex; 12 import org.apache.avalon.excalibur.thread.*; 13 import org.apache.avalon.excalibur.thread.impl.ResourceLimitingThreadPool; 14 15 import org.apache.avalon.excalibur.event.Sink; 16 import org.apache.avalon.excalibur.event.EventHandler; 17 18 import java.util.HashSet ; 19 import java.util.HashMap ; 20 import java.util.Iterator ; 21 22 29 public final class TPCThreadManager implements Runnable , ThreadManager 30 { 31 private final ThreadPool m_threadPool; 32 private final Mutex m_mutex = new Mutex(); 33 private final HashMap m_pipelines = new HashMap (); 34 private ThreadControl m_threadControl; 35 private boolean m_done = false; 36 private final long m_sleepTime; 37 38 43 public TPCThreadManager() 44 { 45 this( Integer.parseInt( System.getProperty( "os.arch.cpus", "1" ) ) , 1 ); 46 } 47 48 53 public TPCThreadManager(Parameters params) 54 { 55 this( params.getParameterAsInteger( "os.arch.cpus", 1 ) , 56 params.getParameterAsInteger( "container.threadsPerCPU", 2 ) ); 57 } 58 59 62 public TPCThreadManager( int numProcessors ) 63 { 64 this( numProcessors, 1 ); 65 } 66 67 71 public TPCThreadManager( int numProcessors, int threadsPerProcessor ) 72 { 73 this( numProcessors, threadsPerProcessor, 1000 ); 74 } 75 76 80 public TPCThreadManager( int numProcessors, int threadsPerProcessor, long sleepTime ) 81 { 82 int processors = Math.max( numProcessors, 1 ); 83 int threads = Math.max( threadsPerProcessor, 1 ); 84 85 m_threadPool = new ResourceLimitingThreadPool( "TPCThreadManager", 86 ( processors * threads ) + 1, true, true, 1000L, 10L * 1000L ); 87 88 m_sleepTime = sleepTime; 89 m_threadControl = m_threadPool.execute( this ); 90 } 91 92 95 public void register( EventPipeline pipeline ) 96 { 97 try 98 { 99 m_mutex.acquire(); 100 101 m_pipelines.put( pipeline, new PipelineRunner( pipeline ) ); 102 103 if ( m_done ) 104 { 105 m_threadControl = m_threadPool.execute( this ); 106 } 107 } 108 catch ( InterruptedException ie ) 109 { 110 } 112 finally 113 { 114 m_mutex.release(); 115 } 116 } 117 118 121 public void deregister( EventPipeline pipeline ) 122 { 123 try 124 { 125 m_mutex.acquire(); 126 127 m_pipelines.remove( pipeline ); 128 129 if ( m_pipelines.isEmpty() ) 130 { 131 m_done = true; 132 m_threadControl.join( 1000 ); 133 } 134 } 135 catch ( InterruptedException ie ) 136 { 137 } 139 finally 140 { 141 m_mutex.release(); 142 } 143 } 144 145 148 public void deregisterAll() 149 { 150 try 151 { 152 m_mutex.acquire(); 153 154 m_done = true; 155 m_pipelines.clear(); 156 157 m_threadControl.join( 1000 ); 158 } 159 catch ( InterruptedException ie ) 160 { 161 } 163 finally 164 { 165 m_mutex.release(); 166 } 167 } 168 169 public void run() 170 { 171 while ( ! m_done ) 172 { 173 try 174 { 175 m_mutex.acquire(); 176 177 Iterator i = m_pipelines.values().iterator(); 178 179 while ( i.hasNext() ) 180 { 181 m_threadPool.execute( (PipelineRunner) i.next() ); 182 } 183 } 184 catch ( InterruptedException ie ) 185 { 186 } 188 finally 189 { 190 m_mutex.release(); 191 } 192 193 try 194 { 195 Thread.sleep( m_sleepTime ); 196 } 197 catch ( InterruptedException ie ) 198 { 199 } 201 } 202 } 203 204 public final static class PipelineRunner implements Runnable 205 { 206 private final EventPipeline m_pipeline; 207 208 protected PipelineRunner( EventPipeline pipeline ) 209 { 210 m_pipeline = pipeline; 211 } 212 213 public void run() 214 { 215 Sink[] sinks = m_pipeline.getSinks(); 216 EventHandler handler = m_pipeline.getEventHandler(); 217 218 for (int i = 0; i < sinks.length; i++) 219 { 220 handler.handleEvents( sinks[i].dequeueAll() ); 221 } 222 } 223 } 224 } | Popular Tags |