1 22 package org.xsocket; 23 24 import java.util.Collection ; 25 import java.util.LinkedList ; 26 import java.util.List ; 27 import java.util.Timer ; 28 import java.util.TimerTask ; 29 import java.util.concurrent.Callable ; 30 import java.util.concurrent.Executor ; 31 import java.util.concurrent.Future ; 32 import java.util.concurrent.LinkedBlockingQueue ; 33 import java.util.concurrent.RejectedExecutionException ; 34 import java.util.concurrent.ThreadFactory ; 35 import java.util.concurrent.ThreadPoolExecutor ; 36 import java.util.concurrent.TimeUnit ; 37 import java.util.logging.Level ; 38 import java.util.logging.Logger ; 39 40 import org.xsocket.IWorkerPool; 41 42 43 48 public final class DynamicWorkerPool implements IWorkerPool { 49 50 private static final Logger LOG = Logger.getLogger(DynamicWorkerPool.class.getName()); 51 52 public static final String WORKER_PREFIX = "xWorker"; 53 54 public static final int DEFAULT_LOAD_THRESHOLD_DECREASE = 20; 55 public static final int DEFAULT_LOAD_THRESHOLD_INCREASE = 80; 56 public static final int DEFAULT_ADJUST_CHECK_PERIOD = 1000; 57 58 59 private static final int SLUGGISH_PERIOD = 30 * 1000; 60 61 private final Timer timer = new Timer ("xDynamicWorkerPoolTimer", true); 62 63 64 private ThreadPoolExecutor executor = null; 65 private final Object decLock = new Object (); 66 67 private SizeManager sizeManager = null; 68 private int minSize = 0; 69 private int maxSize = 0; 70 71 72 77 public DynamicWorkerPool(int minSize, int maxSize) { 78 this.minSize = minSize; 79 this.maxSize = maxSize; 80 81 int initial = minSize; 82 if (initial < 1) { 83 initial = 1; 84 } 85 86 executor = new ThreadPoolExecutor (initial, initial 87 , 0L, TimeUnit.MILLISECONDS 88 , new LinkedBlockingQueue <Runnable >() 89 , new WorkerThreadFactory()); 90 91 sizeManager = new SizeManager(DEFAULT_ADJUST_CHECK_PERIOD); 92 93 timer.schedule(new TimerTask () { 95 public void run() { 96 Thread.currentThread().setPriority(Thread.MIN_PRIORITY); 97 } 98 }, 0); 99 } 100 101 102 105 public void execute(Runnable command) { 106 try { 107 if (minSize < 1) { 108 synchronized (decLock) { 109 if (getPoolSize() < 1) { 110 sizeManager.timeLastChange = System.currentTimeMillis(); executor.setCorePoolSize(1); 112 } 113 } 114 } 115 116 executor.execute(command); 117 118 } catch (RejectedExecutionException e) { 119 120 if (executor.isShutdown()) { 122 Thread t = new Thread (command); 123 t.setDaemon(true); 124 t.start(); 125 126 } else { 128 LOG.warning("couldn't process command " + command + ". Reason: " + e.toString()); 129 } 130 } 131 } 132 133 134 135 138 public <T> List <Future <T>> invokeAll(Collection <Callable <T>> tasks) throws InterruptedException { 139 return executor.invokeAll(tasks); 140 } 141 142 143 146 public int getActiveCount() { 147 return executor.getActiveCount(); 148 } 149 150 151 154 public int getPoolSize() { 155 return executor.getCorePoolSize(); 156 } 157 158 159 160 165 public int getLoad() { 166 return (int) sizeManager.load.getValue(); 167 } 168 169 private int currentLoad() { 170 int currentSize = getPoolSize(); 171 if (currentSize == 0) { 172 return 0; 173 } 174 175 int activeCount = getActiveCount(); 176 if (activeCount == 0) { 177 return 0; 178 } 179 180 return (int) ((activeCount * 100) / currentSize); 181 } 182 183 184 187 public int getMaximumPoolSize() { 188 return maxSize; 189 } 190 191 192 193 196 public int getMinimumPoolSize() { 197 return minSize; 198 } 199 200 201 204 public boolean isOpen() { 205 return !executor.isShutdown(); 206 } 207 208 209 212 public void close() { 213 sizeManager.shutdown(); 214 executor.shutdownNow(); 215 216 timer.cancel(); 217 } 218 219 220 224 public void setAdjustPeriod(int adjustPeriodSec) { 225 sizeManager.setAdjustPeriod(adjustPeriodSec); 226 } 227 228 229 232 public int getAdjustPeriod() { 233 return sizeManager.getAdjustPeriod(); 234 } 235 236 237 240 public int getThresholdIncrease() { 241 return sizeManager.getIncThreshold(); 242 } 243 244 245 248 public void setThresholdIncrease(int incThreshold) { 249 sizeManager.setIncThreshold(incThreshold); 250 } 251 252 253 256 public int getThresholdDecrease() { 257 return sizeManager.getDecThreshold(); 258 } 259 260 261 264 public void setThresholdDecrease(int decThreshold) { 265 sizeManager.setDecThreshold(decThreshold); 266 } 267 268 269 270 273 int getLoadSluggish() { 274 return (int) sizeManager.sluggishLoad.getValue(); 275 } 276 277 278 281 long getDecRate() { 282 return sizeManager.getDecRate(); 283 } 284 285 286 289 @Override 290 public String toString() { 291 return "DynamicWorkerPool (size=" + getPoolSize() + ", running=" + getActiveCount()+ ", load=" + getLoad() 292 + ", minSize=" + getMinimumPoolSize() + ", maxSize=" + getMaximumPoolSize() + ", isOpen=" + isOpen() + ")"; 293 } 294 295 296 private static final class WorkerThreadFactory implements ThreadFactory { 297 private static int poolCounter = 0; 298 private int threadCounter = 0; 299 private String namePrefix = null; 300 301 WorkerThreadFactory() { 302 namePrefix = WORKER_PREFIX + "-" + (++poolCounter) + "-"; 303 } 304 305 public Thread newThread(Runnable r) { 306 Thread t = new Thread (r); 307 t.setName(namePrefix + (++threadCounter)); 308 t.setDaemon(true); 309 t.setPriority(Thread.NORM_PRIORITY); 310 return t; 311 } 312 } 313 314 315 private final class SizeManager { 316 private long timeLastChange = System.currentTimeMillis(); 317 318 private int adjustPeriodSec = 0; 319 320 private int decRate = 0; 321 private Average load = null; 322 private Average sluggishLoad = null; 323 324 private int incThreshold = DEFAULT_LOAD_THRESHOLD_INCREASE; 325 private int decThreshold = DEFAULT_LOAD_THRESHOLD_DECREASE; 326 327 private TimerTask task = null; 328 329 330 331 332 public SizeManager(int period) { 333 load = new Average(3); 334 sluggishLoad = new Average((int) (SLUGGISH_PERIOD / period)); 335 decRate = 5 * period; 336 337 setAdjustPeriod(period); 338 } 339 340 341 public int getDecThreshold() { 342 return decThreshold; 343 } 344 345 public int getIncThreshold() { 346 return incThreshold; 347 } 348 349 public long getDecRate() { 350 return decRate; 351 } 352 353 public void setDecThreshold(int decThreshold) { 354 this.decThreshold = decThreshold; 355 } 356 357 public void setIncThreshold(int incThreshold) { 358 this.incThreshold = incThreshold; 359 } 360 361 public void setAdjustPeriod(int period) { 362 this.adjustPeriodSec = period; 363 364 if (task != null) { 365 task.cancel(); 366 } 367 368 369 task = new TimerTask () { 370 @Override 371 public void run() { 372 adjustWorkerSize(); 373 } 374 }; 375 timer.schedule(task, period, period); 376 } 377 378 379 public int getAdjustPeriod() { 380 return adjustPeriodSec; 381 } 382 383 384 public void shutdown() { 385 if (task != null) { 386 task.cancel(); 387 } 388 } 389 390 391 private void adjustWorkerSize() { 392 393 try { 394 int size = getPoolSize(); 395 double load = currentLoad(); 396 397 checkForInc(size, load); 398 checkForDec(size, load); 399 } catch (Exception e) { 400 if (LOG.isLoggable(Level.FINE)) { 401 LOG.fine("error occured vy adjusting pool size. Reason: " + e.toString()); 402 } 403 } 404 } 405 406 407 private void checkForInc(int size, double currentLoad) { 408 load.add((int) currentLoad); 409 410 if ((load.getValue() >= incThreshold)) { 411 if (size < maxSize) { 412 if (LOG.isLoggable(Level.FINE)) { 413 LOG.fine("average load is " + load.getValue() + " increase pool size. new size is " 414 + (size + 1) + " (minSize=" + getMinimumPoolSize() 415 + ", maxSize=" + getMaximumPoolSize() + ")"); 416 } 417 executor.setCorePoolSize(size + 1); 418 timeLastChange = System.currentTimeMillis(); 419 } 420 } 421 } 422 423 424 private void checkForDec(int size, double currentLoad) { 425 sluggishLoad.add((int) currentLoad); 426 427 if (load.getValue() <= decThreshold) { 428 if ((sluggishLoad.getValue() <= decThreshold) && (size > minSize)) { 429 if (System.currentTimeMillis() > (timeLastChange + decRate)) { 430 if (size == 1) { 431 synchronized (decLock) { 432 if (!executor.getQueue().isEmpty()) { 433 if (LOG.isLoggable(Level.FINE)) { 434 LOG.fine("average load is " + sluggishLoad.getValue() + " decrease pool size. new size is " 435 + (size - 1) + " minSize=" + getMinimumPoolSize() 436 + ", maxSize=" + getMaximumPoolSize() + ")"); 437 } 438 executor.setCorePoolSize(size - 1); 439 timeLastChange = System.currentTimeMillis(); 440 } 441 } 442 } 443 } 444 } 445 } 446 } 447 } 448 449 450 451 452 private static final class Average { 453 private LinkedList <Double > list = new LinkedList <Double >(); 454 private int capacity = 0; 455 456 Average(int capacity) { 457 this.capacity = capacity; 458 } 459 460 public void add(double size) { 461 list.addLast(size); 462 if (list.size() > capacity) { 463 list.removeFirst(); 464 } 465 } 466 467 public void clear() { 468 list.clear(); 469 } 470 471 @SuppressWarnings ("unchecked") 472 public double getValue() { 473 if (list.size() == 0) { 474 return 0; 475 } 476 477 double i = 0; 478 LinkedList <Double > copy = (LinkedList <Double >) list.clone(); 479 480 for (double size : copy) { 481 i += size; 482 } 483 484 if (i <= 0) { 485 return 0; 486 } 487 488 i = i / copy.size(); 489 490 return i; 491 } 492 } 493 } 494 | Popular Tags |