1 21 package org.jsmtpd.generic.threadpool; 22 23 import java.util.HashSet ; 24 import java.util.Iterator ; 25 import java.util.LinkedList ; 26 import java.util.List ; 27 import java.util.Set ; 28 29 import org.apache.commons.logging.Log; 30 import org.apache.commons.logging.LogFactory; 31 32 50 public class GrowingThreadPool extends Thread implements ThreadPool { 51 private Log log = LogFactory.getLog(GrowingThreadPool.class); 52 private List <ThreadWorker> threads = new LinkedList <ThreadWorker>(); 53 private int maxSize; 54 private int minimumThreads; 55 private int growth; 56 private String displayThreadName; 57 private String threadClassName; 58 private boolean running=true; 59 private long lastUp; 60 private float growthRatio=0.2f; 61 private float minRatio=0.1f; 62 private int cycleTime=5000; 63 64 72 public GrowingThreadPool(int numThreads, String threadClassName,String displayThreadName) throws InstantiationException , IllegalAccessException , ClassNotFoundException { 73 ThreadWorker tmp; 74 IThreadedClass cls; 75 76 this.displayThreadName=displayThreadName; 77 this.threadClassName=threadClassName; 78 maxSize=numThreads; 79 minimumThreads=(int)Math.round(numThreads*minRatio); 81 if (minimumThreads==0) 82 minimumThreads=1; 83 84 growth = (int) Math.round(numThreads*growthRatio); 86 if (growth==0) 87 growth=1; 88 89 log.info("Starting initial pool of "+minimumThreads+" threads"); 90 for (int i = 0; i < minimumThreads; i++) { 91 tmp = new ThreadWorker(); 92 cls = (IThreadedClass) Class.forName(threadClassName).newInstance(); 93 tmp.setWorker(cls); 94 tmp.setName(displayThreadName+"-"+tmp.getId()); 95 tmp.start(); 96 while (!tmp.isFree()) { 97 Thread.yield(); 99 } 100 log.debug("Thread "+tmp.getName()+" started"); 101 threads.add(tmp); 102 } 103 lastUp=System.currentTimeMillis(); 104 start(); 105 106 } 107 108 112 public synchronized void gracefullShutdown() { 113 running=false; 114 this.interrupt(); 115 log.debug("Gracefull shutdown ..."); 116 ThreadWorker tmp; 117 for (int i = 0; i < threads.size(); i++) { 118 tmp = (ThreadWorker) threads.get(i); 119 tmp.gracefullShutdown(); 120 } 121 } 122 123 127 public synchronized void forceShutdown() { 128 running=false; 129 this.interrupt(); 130 log.debug("Forcing shutdown ..."); 131 ThreadWorker tmp; 132 for (int i = 0; i < threads.size(); i++) { 133 tmp = (ThreadWorker) threads.get(i); 134 tmp.forceShutdown(); 135 } 136 } 137 138 142 public synchronized boolean hasFreeThread() { 143 for (Iterator iter = threads.iterator(); iter.hasNext();) { 144 ThreadWorker element = (ThreadWorker) iter.next(); 145 if (element.isFree()) 146 return true; 147 } 148 if ((threads.size()+growth)<=maxSize) 149 return true; 150 return false; 151 } 152 153 156 public synchronized int countFreeThread() { 157 int count = 0; 158 for (Iterator iter = threads.iterator(); iter.hasNext();) { 159 ThreadWorker element = (ThreadWorker) iter.next(); 160 if (element.isFree()) 161 count++; 162 } 163 return count; 164 } 165 166 171 public synchronized void assignFreeThread(Object obj) throws BusyThreadPoolException { 172 if (countFreeThread()==0) { 173 growPool (); 174 } 175 int i = 0; 176 for (ThreadWorker worker : threads) { 177 if (worker.isFree()) { 178 log.debug("Worker "+worker.getName()+" is free, assigning job"); 179 worker.setParam(obj); 180 worker.wake(); 181 return; 182 } 183 i++; 184 } 185 log.warn("Thread pool exhausted !"); 186 throw new BusyThreadPoolException(); 187 } 188 189 public synchronized void growPool () { 190 log.debug("Trying to grow thread pool"); 191 ThreadWorker tmp; 192 IThreadedClass cls; 193 194 if ((threads.size()+growth)<=maxSize) { 195 try { 196 lastUp=System.currentTimeMillis(); 197 log.info("Increasing number of threads by "+growth+", starting from "+threads.size()); 198 for (int i=0;i<growth;i++) { 199 tmp = new ThreadWorker(); 200 cls = (IThreadedClass) Class.forName(threadClassName).newInstance(); 201 tmp.setWorker(cls); 202 tmp.setName(displayThreadName+"#"+tmp.getId()); 203 tmp.start(); 204 while (!tmp.isFree()) { 205 Thread.yield(); 207 } 208 threads.add(tmp); 209 log.debug("Thread "+tmp.getName()+" started"); 210 } 211 } catch (Exception e) { 212 log.error("Could not increase pool",e); 213 } 214 } else { 215 log.error("Thread pool too low to assign a thread"); 216 } 217 } 218 219 public synchronized void lowerPool() { 220 long diff = System.currentTimeMillis()-lastUp; 221 if ((diff>(cycleTime*4)) && (threads.size()>minimumThreads) && (countFreeThread()>growth)) { 222 Set <ThreadWorker> toRemove=new HashSet <ThreadWorker>(); 224 for (int i=threads.size()-1;i>0;i--) { ThreadWorker worker = threads.get(i); 226 if (worker.isFree()) 227 toRemove.add(worker); 228 if (toRemove.size()==growth) 229 break; 230 } 231 log.info("Reducing number of threads by "+growth+", starting from "+threads.size()); 232 for (ThreadWorker worker : toRemove) { 233 log.debug("Shutting down thread "+worker.getName()); 234 worker.gracefullShutdown(); 235 threads.remove(worker); 236 } 237 } 238 } 239 240 public void run () { 241 setName("C-"+getId()); 242 log.info("Control thread started"); 243 while (running) { 244 try { 245 Thread.sleep(cycleTime); 246 } catch (InterruptedException e) { 247 } 248 log.debug("Current size "+threads.size()+"/"+maxSize+", "+countFreeThread()+" free threads"); 249 lowerPool(); 250 } 251 log.info("Control thread ended"); 252 } 253 254 257 public void setCycleTime(int cycleTime) { 258 this.cycleTime = cycleTime; 259 } 260 261 264 public void setGrowthRatio(float growthRatio) { 265 this.growthRatio = growthRatio; 266 } 267 268 271 public void setMinRatio(float minRatio) { 272 this.minRatio = minRatio; 273 } 274 275 } | Popular Tags |