package org.apache.avalon.excalibur.command;

import org.apache.avalon.framework.parameters.Parameters;
import org.apache.avalon.excalibur.concurrent.Mutex;
import org.apache.avalon.excalibur.thread.*;
import org.apache.avalon.excalibur.thread.impl.ResourceLimitingThreadPool;

import org.apache.avalon.excalibur.event.Sink;
import org.apache.avalon.excalibur.event.EventHandler;

import java.util.HashSet;
import java.util.HashMap;
import java.util.Iterator;

/**
 * A {@link ThreadManager} that keeps one {@link PipelineRunner} per registered
 * {@link EventPipeline} and periodically submits every runner to a shared
 * thread pool.
 *
 * <p>The manager itself is a {@link Runnable}: a "dispatcher" loop executes on
 * a pooled thread, and on each pass schedules all registered runners before
 * sleeping for the configured interval.  The dispatcher stops once the last
 * pipeline is deregistered and is started again by the next call to
 * {@link #register(EventPipeline)}.</p>
 *
 * <p>All mutable state (<code>m_pipelines</code>, <code>m_done</code>,
 * <code>m_running</code>, <code>m_threadControl</code>) is read and written
 * only while <code>m_mutex</code> is held.</p>
 */
public final class TPSPThreadManager implements Runnable, ThreadManager
{
    /** How long deregistration waits for the dispatcher loop to finish, in ms. */
    private static final long JOIN_TIMEOUT = 1000L;

    private final ThreadPool m_threadPool;
    private final Mutex m_mutex = new Mutex();

    /** Maps each registered EventPipeline to its PipelineRunner. */
    private final HashMap m_pipelines = new HashMap();

    /** Handle on the most recently started dispatcher loop. */
    private ThreadControl m_threadControl;

    /** Set to request that the dispatcher loop stop. */
    private boolean m_done = false;

    /** True from the moment a dispatcher is submitted until it observes m_done. */
    private boolean m_running = false;

    /** Pause between two scheduling passes, in milliseconds. */
    private final long m_sleepTime;

    /**
     * Creates a manager for one processor, one thread per processor, and a
     * one second pause between scheduling passes.
     */
    public TPSPThreadManager()
    {
        this( 1, 1, 1000 );
    }

    /**
     * Creates a manager and immediately starts its dispatcher loop.
     *
     * @param numProcessors        number of processors; values below 1 are treated as 1
     * @param threadsPerProcessor  threads per processor; values below 1 are treated as 1
     * @param sleepTime            pause between scheduling passes, in milliseconds
     */
    public TPSPThreadManager( int numProcessors, int threadsPerProcessor, long sleepTime )
    {
        int processors = Math.max( numProcessors, 1 );
        int threads = Math.max( threadsPerProcessor, 1 );

        // NOTE(review): the pool is named "TPCThreadManager" although this class
        // is TPSPThreadManager -- looks like a copy/paste leftover; left as-is
        // because the name is visible at runtime.  The "+ 1" presumably reserves
        // a thread for the dispatcher loop itself -- confirm.
        m_threadPool = new ResourceLimitingThreadPool( "TPCThreadManager",
            ( processors * threads ) + 1, true, true, 1000L, 10L * 1000L );

        m_sleepTime = sleepTime;

        // Mark the dispatcher as running before it is submitted so that the
        // flag is already correct when run() first inspects the state.
        m_running = true;
        m_threadControl = m_threadPool.execute( this );
    }

    /**
     * Registers a pipeline so that its sinks are drained on every scheduling
     * pass.  If the dispatcher loop had been stopped it is started again.
     *
     * <p>If the calling thread is interrupted while waiting for the internal
     * lock, the pipeline is not registered and the thread's interrupt status
     * is restored.</p>
     *
     * @param pipeline  the pipeline to service
     */
    public void register( EventPipeline pipeline )
    {
        if ( !lock() )
        {
            Thread.currentThread().interrupt();
            return;
        }

        try
        {
            m_pipelines.put( pipeline, new PipelineRunner( pipeline ) );

            // Cancel any pending stop request; without this a restarted
            // dispatcher would see m_done == true and exit immediately.
            m_done = false;

            // A dispatcher that has not yet observed the stop request simply
            // keeps going; only start a new one when none is alive.
            if ( !m_running )
            {
                m_running = true;
                m_threadControl = m_threadPool.execute( this );
            }
        }
        finally
        {
            m_mutex.release();
        }
    }

    /**
     * Removes a pipeline.  When no pipelines remain the dispatcher loop is
     * asked to stop and this method waits up to one second for it to finish.
     *
     * <p>If the calling thread is interrupted while waiting for the internal
     * lock, nothing is removed and the thread's interrupt status is
     * restored.</p>
     *
     * @param pipeline  the pipeline to stop servicing
     */
    public void deregister( EventPipeline pipeline )
    {
        if ( !lock() )
        {
            Thread.currentThread().interrupt();
            return;
        }

        ThreadControl dispatcher = null;

        try
        {
            m_pipelines.remove( pipeline );

            if ( m_pipelines.isEmpty() )
            {
                m_done = true;
                dispatcher = m_threadControl;
            }
        }
        finally
        {
            m_mutex.release();
        }

        awaitDispatcher( dispatcher );
    }

    /**
     * Removes every pipeline, asks the dispatcher loop to stop, and waits up
     * to one second for it to finish.
     *
     * <p>If the calling thread is interrupted while waiting for the internal
     * lock, nothing is changed and the thread's interrupt status is
     * restored.</p>
     */
    public void deregisterAll()
    {
        if ( !lock() )
        {
            Thread.currentThread().interrupt();
            return;
        }

        ThreadControl dispatcher;

        try
        {
            m_done = true;
            m_pipelines.clear();
            dispatcher = m_threadControl;
        }
        finally
        {
            m_mutex.release();
        }

        awaitDispatcher( dispatcher );
    }

    /**
     * Dispatcher loop: on every pass submits each registered runner to the
     * thread pool, then sleeps for the configured interval.  The loop ends
     * when it observes a stop request while holding the lock.
     */
    public void run()
    {
        while ( true )
        {
            // If the lock could not be taken (interrupted), skip this pass and
            // try again after the usual pause.
            if ( lock() )
            {
                try
                {
                    if ( m_done )
                    {
                        // Cleared under the lock so that register() can
                        // reliably tell whether a new dispatcher is needed.
                        m_running = false;
                        return;
                    }

                    Iterator i = m_pipelines.values().iterator();

                    while ( i.hasNext() )
                    {
                        m_threadPool.execute( (PipelineRunner) i.next() );
                    }
                }
                finally
                {
                    m_mutex.release();
                }
            }

            try
            {
                Thread.sleep( m_sleepTime );
            }
            catch ( InterruptedException ignored )
            {
                // Deliberately ignored: the loop is stopped only through
                // m_done.  The interrupt status is not restored here because
                // that would make every following acquire/sleep fail at once
                // and turn the loop into a busy spin.
            }
        }
    }

    /**
     * Acquires the internal mutex.
     *
     * @return <code>true</code> if the mutex is now held by the caller (who
     *         must release it), <code>false</code> if the wait was interrupted
     *         and the mutex is NOT held
     */
    private boolean lock()
    {
        try
        {
            m_mutex.acquire();
            return true;
        }
        catch ( InterruptedException ie )
        {
            return false;
        }
    }

    /**
     * Waits, without holding the mutex, for a dispatcher loop to finish.
     * Waiting outside the lock lets the dispatcher take the lock, observe the
     * stop request and exit.
     *
     * @param dispatcher  control of the loop to wait for; <code>null</code> means
     *                    there is nothing to wait for
     */
    private void awaitDispatcher( ThreadControl dispatcher )
    {
        if ( dispatcher == null )
        {
            return;
        }

        try
        {
            dispatcher.join( JOIN_TIMEOUT );
        }
        catch ( InterruptedException ie )
        {
            // Preserve the interrupt for the caller instead of swallowing it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Drains every sink of one pipeline and passes the dequeued elements to
     * the pipeline's event handler.
     */
    public final static class PipelineRunner implements Runnable
    {
        private final EventPipeline m_pipeline;

        /**
         * @param pipeline  the pipeline whose sinks this runner drains
         */
        protected PipelineRunner( EventPipeline pipeline )
        {
            m_pipeline = pipeline;
        }

        /**
         * Calls <code>dequeueAll()</code> on each sink in turn and hands each
         * result to the handler.
         */
        public void run()
        {
            Sink[] sinks = m_pipeline.getSinks();
            EventHandler handler = m_pipeline.getEventHandler();

            for ( int i = 0; i < sinks.length; i++ )
            {
                handler.handleEvents( sinks[i].dequeueAll() );
            }
        }
    }
}