1 37 38 package net.sourceforge.cruisecontrol.util.threadpool; 39 40 import java.util.Collections ; 41 import java.util.HashMap ; 42 import java.util.Iterator ; 43 import java.util.LinkedList ; 44 import java.util.List ; 45 import java.util.Map ; 46 47 import net.sourceforge.cruisecontrol.util.TdTimer; 48 49 import org.apache.log4j.Logger; 50 51 62 63 public class ThreadQueue extends Thread { 64 private static final Logger LOG = Logger.getLogger(ThreadQueue.class); 65 66 69 private final List idleTasks = Collections.synchronizedList(new LinkedList ()); 70 73 private final List busyTasks = Collections.synchronizedList(new LinkedList ()); 74 75 78 private final Map resultList = Collections.synchronizedMap(new HashMap ()); 79 80 84 85 private final Map runningThreads = Collections.synchronizedMap(new HashMap ()); 86 87 90 private final int threadCount = ThreadQueueProperties.getMaxThreadCount(); 91 92 95 private static final int SLEEP_TIME = 100; 96 97 100 private static ThreadQueue threadPool; 101 102 107 108 private static long nameCounter = Long.MIN_VALUE; 109 110 114 115 private static final Long NAME_COUNTER_SYNCH = new Long ("0"); 116 117 120 private static boolean terminate = false; 121 122 127 public void run() { 128 while (true) { 129 if (ThreadQueue.terminate) { 130 LOG.info("terminating ThreadQueue.run()"); 131 return; 132 } 133 134 final boolean nothingWaiting = idleTasks.size() == 0; 135 final boolean maxedOut = busyTasks.size() >= threadCount; 136 137 if (nothingWaiting || maxedOut) { 138 sleep(SLEEP_TIME); 139 } else { 140 LOG.debug("handling waiting task"); 141 handleWaitingTask(); 142 } 143 144 cleanCompletedTasks(); 145 } 146 } 147 148 private void handleWaitingTask() { 149 synchronized (busyTasks) { 150 WorkerThread worker = (WorkerThread) idleTasks.remove(0); 151 Thread thisThread = new Thread (worker); 152 busyTasks.add(worker); 153 runningThreads.put(worker, thisThread); 154 if (!ThreadQueue.terminate) { 155 thisThread.start(); 156 } 157 } 158 } 159 160 private void cleanCompletedTasks() { 161 synchronized (busyTasks) { 162 Iterator tasks = busyTasks.iterator(); 163 while (tasks.hasNext()) { 164 WorkerThread task = (WorkerThread) tasks.next(); 165 Object result = task.getResult(); 166 final boolean taskDone = result != null; 167 if (taskDone) { 168 LOG.debug("Found a finished task"); 169 LOG.debug("tempTask.getName() = " + task.getName()); 170 LOG.debug("tempTask.getResult() = " + task.getResult()); 171 172 resultList.put(task.getName(), result); 173 tasks.remove(); 174 runningThreads.remove(task); 175 } 176 } 177 } 178 } 179 180 184 185 private static ThreadQueue getThreadQueue() { 186 if (threadPool == null) { 187 threadPool = new ThreadQueue(); 188 threadPool.start(); 189 } 190 return threadPool; 191 } 192 193 196 public static void addTask(WorkerThread task) { 197 LOG.debug("Preparing to add worker task " + task.getName()); 198 if (task.getName().equals(WorkerThread.BLANK_NAME)) { 199 task.setName(nextName()); 200 } 201 if (isActive(task.getName())) { 203 throw new RuntimeException ("Duplicate task name!"); 204 } 205 synchronized (getThreadQueue().busyTasks) { 206 getThreadQueue().idleTasks.add(task); 207 } 208 } 209 210 215 public static String findPosition(String taskName) { 216 WorkerThread task = getIdleTask(taskName); 217 if (task != null) { 218 return getTaskPosition(task, getThreadQueue().idleTasks, "IDLE"); 219 } 220 task = getBusyTask(taskName); 221 if (task != null) { 222 return getTaskPosition(task, getThreadQueue().busyTasks, "BUSY"); 223 } 224 Object result = getResult(taskName); 225 if (result != null) { 226 return "[ COMPLETE ]"; 227 } 228 return "[ not found in queues ]"; 229 } 230 231 private static String getTaskPosition(WorkerThread task, List queue, String queueName) { 232 int position; 233 int length; 234 synchronized (getThreadQueue().busyTasks) { 235 position = queue.indexOf(task); 236 length = queue.size(); 237 } 238 return formatPosition(position, length, queueName); 239 } 240 241 private static String formatPosition(int position, int length, String queueName) { 242 if (position < 0) { 243 return "[ NONE ]"; 244 } 245 return queueName + "[ " + (position + 1) + " / " + length + " ]"; 247 } 248 249 252 public static boolean isQueueIdle() { 253 synchronized (getThreadQueue().busyTasks) { 254 return ((getThreadQueue().busyTasks.size() == 0) && (getThreadQueue().idleTasks.size() == 0)); 255 } 256 } 257 258 261 public static boolean isDone(String taskName) { 262 return getThreadQueue().resultList.containsKey(taskName); 263 } 264 265 269 public static void waitForAll() { 270 while (!ThreadQueue.isQueueIdle()) { 271 sleep(SLEEP_TIME); 272 } 273 } 274 275 281 public static boolean waitForAll(int timeout) { 282 TdTimer myTimer = new TdTimer(); 283 while (!ThreadQueue.isQueueIdle()) { 284 sleep(SLEEP_TIME); 285 if (myTimer.time() > timeout) { 286 return false; 287 } 288 } 289 return true; 290 } 291 292 296 public static void waitFor(String taskName) { 297 if (!taskExists(taskName)) { 298 LOG.debug("taskName " + taskName + " doesn't exist"); 299 return; 300 } 301 while (!getThreadQueue().resultList.containsKey(taskName)) { 302 sleep(SLEEP_TIME); 303 } 304 } 305 306 312 public static boolean waitFor(String taskName, int timeout) { 313 if (!taskExists(taskName)) { 314 return false; 315 } 316 TdTimer myTimer = new TdTimer(); 317 while (!getThreadQueue().resultList.containsKey(taskName)) { 318 sleep(SLEEP_TIME); 319 if (myTimer.split() > timeout) { 320 return false; 321 } 322 } 323 return true; 324 } 325 326 331 public static boolean taskExists(String taskName) { 332 synchronized (getThreadQueue().busyTasks) { 333 return !((getResult(taskName) == null) 335 && (getBusyTask(taskName) == null) 336 && (getIdleTask(taskName) == null)); 337 } 338 } 339 340 345 public static boolean isActive(String taskName) { 346 synchronized (getThreadQueue().busyTasks) { 347 return !((getBusyTask(taskName) == null) 349 && (getIdleTask(taskName) == null)); 350 } 351 } 352 353 357 358 public static Object getResult(String workerName) { 359 return getThreadQueue().resultList.get(workerName); 360 } 361 362 365 public static int numRunningTasks() { 366 return getThreadQueue().busyTasks.size(); 367 } 368 369 372 public static int numWaitingTasks() { 373 return getThreadQueue().idleTasks.size(); 374 } 375 376 379 public static int numCompletedTasks() { 380 return getThreadQueue().resultList.size(); 381 } 382 383 386 public static boolean isIdle(String taskName) { 387 return getIdleTask(taskName) != null; 388 } 389 390 395 private static WorkerThread getBusyTask(String taskName) { 396 synchronized (getThreadQueue().busyTasks) { 397 return getTask(taskName, getThreadQueue().busyTasks.iterator()); 398 } 399 } 400 401 406 private static WorkerThread getIdleTask(String taskName) { 407 synchronized (getThreadQueue().idleTasks) { 408 return getTask(taskName, getThreadQueue().idleTasks.iterator()); 409 } 410 } 411 412 417 private static WorkerThread getTask(String taskName, Iterator myIt) { 418 while (myIt.hasNext()) { 419 WorkerThread thisWorker = (WorkerThread) myIt.next(); 420 String tempString = thisWorker.getName(); 421 if (tempString.equalsIgnoreCase(taskName)) { 422 return thisWorker; 423 } 424 } 425 return null; 426 } 427 428 431 public static List getBusyTaskNames() { 432 List names; 433 synchronized (getThreadQueue().busyTasks) { 434 names = getTaskNames(getThreadQueue().busyTasks.iterator()); 435 } 436 return names; 437 } 438 439 442 public static List getIdleTaskNames() { 443 List names; 444 synchronized (getThreadQueue().busyTasks) { 445 names = getTaskNames(getThreadQueue().idleTasks.iterator()); 446 } 447 return names; 448 } 449 450 453 private static List getTaskNames(Iterator taskIter) { 454 List names = new LinkedList (); 455 while (taskIter.hasNext()) { 456 WorkerThread thisWorker = (WorkerThread) taskIter.next(); 457 names.add(thisWorker.getName()); 458 } 459 return names; 460 } 461 462 466 public static String stats() { 467 String stats = numRunningTasks() + " tasks running \n"; 468 stats += numWaitingTasks() + " tasks waiting \n"; 469 470 return stats; 471 } 472 473 477 public static int numTotalTasks() { 478 return numRunningTasks() + numWaitingTasks() + numCompletedTasks(); 479 } 480 481 484 public static void terminate() { 485 ThreadQueue.terminate = true; 486 ThreadQueue.waitForAll(10000); 488 getThreadQueue().idleTasks.clear(); 490 getThreadQueue().busyTasks.clear(); 491 getThreadQueue().resultList.clear(); 492 threadPool = null; 493 getThreadQueue(); 494 ThreadQueue.terminate = false; 495 } 496 497 public static void interruptAllRunningTasks() { 498 synchronized (getThreadQueue().busyTasks) { 499 Map currentRunningThreads = getThreadQueue().runningThreads; 500 501 terminateRunningTasks(currentRunningThreads); 502 interruptRunningThreads(currentRunningThreads); 503 } 504 } 505 506 private static void interruptRunningThreads(Map currentRunningThreads) { 507 for (Iterator iter = currentRunningThreads.values().iterator(); iter.hasNext();) { 508 Thread currentThread = (Thread ) iter.next(); 509 currentThread.interrupt(); 510 } 511 } 512 513 private static void terminateRunningTasks(Map currentRunningThreads) { 514 for (Iterator iter = currentRunningThreads.keySet().iterator(); iter.hasNext();) { 515 WorkerThread currentTask = (WorkerThread) iter.next(); 516 currentTask.terminate(); 517 518 LOG.info("Preparing to stop " + currentTask.getName()); 519 } 520 } 521 522 526 public static void interrupt(String taskName) { 527 synchronized (getThreadQueue().busyTasks) { 528 529 533 if (ThreadQueue.isIdle(taskName)) { 534 if (getThreadQueue().idleTasks.remove(getIdleTask(taskName))) { 535 LOG.debug("removed idle project " + taskName); 536 } else { 537 LOG.warn("could not remove idle project " + taskName); 538 } 539 return; 540 } 542 WorkerThread thisWorker = getBusyTask(taskName); 547 if (thisWorker != null) { 548 LOG.debug("Attempting to stop a project building at the moment: " + taskName); 549 Thread thisThread = 550 (Thread ) getThreadQueue().runningThreads.get(thisWorker); 551 thisThread.interrupt(); 552 getThreadQueue().busyTasks.remove(thisWorker); 553 getThreadQueue().runningThreads.remove(thisThread); 554 LOG.debug("Stopped " + taskName + " succesfully"); 555 return; 556 } 557 558 LOG.warn("Project is neither idle nor busy: " + taskName + "; taking no action"); 559 } 560 } 561 562 566 567 private static String nextName() { 568 synchronized (NAME_COUNTER_SYNCH) { 569 if (nameCounter == Long.MAX_VALUE) { 570 nameCounter = Long.MIN_VALUE; 571 } 572 } 573 nameCounter++; 574 return nameCounter + ""; 575 } 576 577 581 582 static int getMaxNumWorkerThreads() { 583 return getThreadQueue().threadCount; 584 } 585 586 589 private static void sleep(int ms) { 590 try { 591 Thread.sleep(ms); 592 } catch (Exception ignored) { 593 } 594 } 595 } 596 | Popular Tags |