KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > net > sourceforge > cruisecontrol > util > threadpool > ThreadQueue


1 /********************************************************************************
2  * CruiseControl, a Continuous Integration Toolkit
3  * Copyright (c) 2001, ThoughtWorks, Inc.
4  * 651 W Washington Ave. Suite 600
5  * Chicago, IL 60661 USA
6  * All rights reserved.
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions
10  * are met:
11  *
12  * + Redistributions of source code must retain the above copyright
13  * notice, this list of conditions and the following disclaimer.
14  *
15  * + Redistributions in binary form must reproduce the above
16  * copyright notice, this list of conditions and the following
17  * disclaimer in the documentation and/or other materials provided
18  * with the distribution.
19  *
20  * + Neither the name of ThoughtWorks, Inc., CruiseControl, nor the
21  * names of its contributors may be used to endorse or promote
22  * products derived from this software without specific prior
23  * written permission.
24  *
25  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
26  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
27  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
28  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR
29  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
30  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
31  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
32  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
33  * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
34  * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
35  * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
36  ********************************************************************************/

37
38 package net.sourceforge.cruisecontrol.util.threadpool;
39
40 import java.util.Collections JavaDoc;
41 import java.util.HashMap JavaDoc;
42 import java.util.Iterator JavaDoc;
43 import java.util.LinkedList JavaDoc;
44 import java.util.List JavaDoc;
45 import java.util.Map JavaDoc;
46
47 import net.sourceforge.cruisecontrol.util.TdTimer;
48
49 import org.apache.log4j.Logger;
50
51 /**
52  * Used to encapsulate the concept of a Thread Pool
53  * <P>
54  * The queue accepts tasks that implement the WorkerThread interface.
55  * Each task may be named, but do not have to be. You may then waitOn
56  * for ever task to complete or just the named tasks you care about...
57  * or not wait at all.
58  *
59  * @author Jared Richardson
60  * @version $Id: ThreadQueue.java,v 1.7 2006/01/24 20:55:22 hack Exp $
61  */

62
63 public class ThreadQueue extends Thread JavaDoc {
64     private static final Logger LOG = Logger.getLogger(ThreadQueue.class);
65
66     /**
67      * The list of WorkerThreads that are waiting to run (currently idle)
68      */

69     private final List JavaDoc idleTasks = Collections.synchronizedList(new LinkedList JavaDoc());
70     /**
71      * The list of WorkerThreads that are running now (currently busy)
72      */

73     private final List JavaDoc busyTasks = Collections.synchronizedList(new LinkedList JavaDoc());
74
75     /**
76      * the resultList from each WorkerThread's run
77      */

78     private final Map JavaDoc resultList = Collections.synchronizedMap(new HashMap JavaDoc());
79
80     /**
81      * Retains a handle to all the running Threads
82      * to handle all sorts of interesting situations
83      */

84
85     private final Map JavaDoc runningThreads = Collections.synchronizedMap(new HashMap JavaDoc());
86
87     /**
88      * The number of java.lang.Threads to be launched by the pool at one time
89      */

90     private final int threadCount = ThreadQueueProperties.getMaxThreadCount();
91
92     /**
93      * The amount to time to sleep between loops
94      */

95     private static final int SLEEP_TIME = 100;
96
97     /**
98      * A handle to the Thread Pool
99      */

100     private static ThreadQueue threadPool;
101
102     /**
103      * this variable is used to generate a unique name
104      * for tasks that are not named when they arrive in
105      * the queue
106      */

107
108     private static long nameCounter = Long.MIN_VALUE;
109
110     /**
111      * this variable is simple used to synchronize
112      * access to the nameCounter above
113      */

114
115     private static final Long JavaDoc NAME_COUNTER_SYNCH = new Long JavaDoc("0");
116
117     /**
118      * tells the main process when to exit
119      */

120     private static boolean terminate = false;
121
122     /*
123        fetch tasks to be executed from the idle list,
124        put them on the busy list, and
125        execute them
126     */

127     public void run() {
128         while (true) {
129             if (ThreadQueue.terminate) {
130                 LOG.info("terminating ThreadQueue.run()");
131                 return;
132             }
133
134             final boolean nothingWaiting = idleTasks.size() == 0;
135             final boolean maxedOut = busyTasks.size() >= threadCount;
136
137             if (nothingWaiting || maxedOut) {
138                 sleep(SLEEP_TIME);
139             } else {
140                 LOG.debug("handling waiting task");
141                 handleWaitingTask();
142             }
143
144             cleanCompletedTasks();
145         }
146     }
147
148     private void handleWaitingTask() {
149         synchronized (busyTasks) {
150             WorkerThread worker = (WorkerThread) idleTasks.remove(0);
151             Thread JavaDoc thisThread = new Thread JavaDoc(worker);
152             busyTasks.add(worker);
153             runningThreads.put(worker, thisThread);
154             if (!ThreadQueue.terminate) {
155                 thisThread.start();
156             }
157         }
158     }
159
160     private void cleanCompletedTasks() {
161         synchronized (busyTasks) {
162             Iterator JavaDoc tasks = busyTasks.iterator();
163             while (tasks.hasNext()) {
164                 WorkerThread task = (WorkerThread) tasks.next();
165                 Object JavaDoc result = task.getResult();
166                 final boolean taskDone = result != null;
167                 if (taskDone) {
168                     LOG.debug("Found a finished task");
169                     LOG.debug("tempTask.getName() = " + task.getName());
170                     LOG.debug("tempTask.getResult() = " + task.getResult());
171
172                     resultList.put(task.getName(), result);
173                     tasks.remove();
174                     runningThreads.remove(task);
175                 }
176             }
177         }
178     }
179
180     /**
181      * An internal wrapper around the creation of the
182      * Thread Pool singleton
183      */

184
185     private static ThreadQueue getThreadQueue() {
186         if (threadPool == null) {
187             threadPool = new ThreadQueue();
188             threadPool.start();
189         }
190         return threadPool;
191     }
192
193     /**
194      * Adds a task to the idleList to be executed
195      */

196     public static void addTask(WorkerThread task) {
197         LOG.debug("Preparing to add worker task " + task.getName());
198         if (task.getName().equals(WorkerThread.BLANK_NAME)) {
199             task.setName(nextName());
200         }
201         //System.out.println("adding worker task "+task.getName());
202
if (isActive(task.getName())) {
203             throw new RuntimeException JavaDoc("Duplicate task name!");
204         }
205         synchronized (getThreadQueue().busyTasks) {
206             getThreadQueue().idleTasks.add(task);
207         }
208     }
209
210     /**
211      * This may not *always* work -- a task may slip by us between queue checks.
212      * That's OK. We'd rather have transient results than block the busy queue
213      * until we're done just to get a position report on a task.
214      */

215     public static String JavaDoc findPosition(String JavaDoc taskName) {
216         WorkerThread task = getIdleTask(taskName);
217         if (task != null) {
218             return getTaskPosition(task, getThreadQueue().idleTasks, "IDLE");
219         }
220         task = getBusyTask(taskName);
221         if (task != null) {
222             return getTaskPosition(task, getThreadQueue().busyTasks, "BUSY");
223         }
224         Object JavaDoc result = getResult(taskName);
225         if (result != null) {
226             return "[ COMPLETE ]";
227         }
228         return "[ not found in queues ]";
229     }
230
231     private static String JavaDoc getTaskPosition(WorkerThread task, List JavaDoc queue, String JavaDoc queueName) {
232         int position;
233         int length;
234         synchronized (getThreadQueue().busyTasks) {
235             position = queue.indexOf(task);
236             length = queue.size();
237         }
238         return formatPosition(position, length, queueName);
239     }
240
241     private static String JavaDoc formatPosition(int position, int length, String JavaDoc queueName) {
242         if (position < 0) {
243             return "[ NONE ]";
244         }
245         // position is 0-based, make it 1-based for human reporting
246
return queueName + "[ " + (position + 1) + " / " + length + " ]";
247     }
248
249     /**
250      * Checks to see if all tasks are done
251      */

252     public static boolean isQueueIdle() {
253         synchronized (getThreadQueue().busyTasks) {
254             return ((getThreadQueue().busyTasks.size() == 0) && (getThreadQueue().idleTasks.size() == 0));
255         }
256     }
257
258     /**
259      * Checks to see if a specific task is done
260      */

261     public static boolean isDone(String JavaDoc taskName) {
262         return getThreadQueue().resultList.containsKey(taskName);
263     }
264
265     /**
266      * Waits until all tasks are done
267      * same as Thread t.wait()
268      */

269     public static void waitForAll() {
270         while (!ThreadQueue.isQueueIdle()) {
271             sleep(SLEEP_TIME);
272         }
273     }
274
275     /**
276      * Waits until all tasks are done
277      * same as Thread t.wait()
278      *
279      * @return TRUE is all tasks finished, FALSE if timeout occurred
280      */

281     public static boolean waitForAll(int timeout) {
282         TdTimer myTimer = new TdTimer();
283         while (!ThreadQueue.isQueueIdle()) {
284             sleep(SLEEP_TIME);
285             if (myTimer.time() > timeout) {
286                 return false;
287             }
288         }
289         return true;
290     }
291
292     /**
293      * Waits for a specific task to finish
294      * same as Thread t.wait()
295      */

296     public static void waitFor(String JavaDoc taskName) {
297         if (!taskExists(taskName)) {
298             LOG.debug("taskName " + taskName + " doesn't exist");
299             return;
300         }
301         while (!getThreadQueue().resultList.containsKey(taskName)) {
302             sleep(SLEEP_TIME);
303         }
304     }
305
306     /**
307      * Waits for a specific task to finish, but with a timeout
308      * same as Thread t.wait(), but with a timeout
309      *
310      * @return TRUE if task finished, FALSE if timeout occurred
311      */

312     public static boolean waitFor(String JavaDoc taskName, int timeout) {
313         if (!taskExists(taskName)) {
314             return false;
315         }
316         TdTimer myTimer = new TdTimer();
317         while (!getThreadQueue().resultList.containsKey(taskName)) {
318             sleep(SLEEP_TIME);
319             if (myTimer.split() > timeout) {
320                 return false;
321             }
322         }
323         return true;
324     }
325
326     /**
327      * Checks to see if a specific task is in our system
328      *
329      * @return TRUE if task is found, FALSE if not
330      */

331     public static boolean taskExists(String JavaDoc taskName) {
332         synchronized (getThreadQueue().busyTasks) {
333             // it's either done, busy or idle
334
return !((getResult(taskName) == null)
335                     && (getBusyTask(taskName) == null)
336                     && (getIdleTask(taskName) == null));
337         }
338     }
339
340     /**
341      * Checks to see if a specific task is either running or waiting in our system
342      *
343      * @return TRUE if task is waiting or running, FALSE if it is finished
344      */

345     public static boolean isActive(String JavaDoc taskName) {
346         synchronized (getThreadQueue().busyTasks) {
347             // it's either busy or idle
348
return !((getBusyTask(taskName) == null)
349                     && (getIdleTask(taskName) == null));
350         }
351     }
352
353     /**
354      * fetch a result from a completed WorkerThread
355      * a null result means it's not done yet
356      */

357
358     public static Object JavaDoc getResult(String JavaDoc workerName) {
359         return getThreadQueue().resultList.get(workerName);
360     }
361
362     /**
363      * tells you how many tasks are running now
364      */

365     public static int numRunningTasks() {
366         return getThreadQueue().busyTasks.size();
367     }
368
369     /**
370      * tells you how many tasks are waiting now
371      */

372     public static int numWaitingTasks() {
373         return getThreadQueue().idleTasks.size();
374     }
375
376     /**
377      * tells you how many tasks have completed
378      */

379     public static int numCompletedTasks() {
380         return getThreadQueue().resultList.size();
381     }
382
383     /**
384      * tells you if a task is waiting now
385      */

386     public static boolean isIdle(String JavaDoc taskName) {
387         return getIdleTask(taskName) != null;
388     }
389
390     /**
391      * retrieves an active task from the busy list
392      *
393      * @return the active task (if present) or null if it cannot be found
394      */

395     private static WorkerThread getBusyTask(String JavaDoc taskName) {
396         synchronized (getThreadQueue().busyTasks) {
397             return getTask(taskName, getThreadQueue().busyTasks.iterator());
398         }
399     }
400
401     /**
402      * retrieves an idle task from the idle list
403      *
404      * @return the idle task (if present) or null if it cannot be found
405      */

406     private static WorkerThread getIdleTask(String JavaDoc taskName) {
407         synchronized (getThreadQueue().idleTasks) {
408             return getTask(taskName, getThreadQueue().idleTasks.iterator());
409         }
410     }
411
412     /**
413      * retrieves a task from the list
414      *
415      * @return the task (if present) or null if it cannot be found
416      */

417     private static WorkerThread getTask(String JavaDoc taskName, Iterator JavaDoc myIt) {
418         while (myIt.hasNext()) {
419             WorkerThread thisWorker = (WorkerThread) myIt.next();
420             String JavaDoc tempString = thisWorker.getName();
421             if (tempString.equalsIgnoreCase(taskName)) {
422                 return thisWorker;
423             }
424         }
425         return null;
426     }
427
428     /**
429      * @return the names of the tasks in the busy list; may be empty
430      */

431     public static List JavaDoc getBusyTaskNames() {
432         List JavaDoc names;
433         synchronized (getThreadQueue().busyTasks) {
434             names = getTaskNames(getThreadQueue().busyTasks.iterator());
435         }
436         return names;
437     }
438
439     /**
440      * @return the names of the tasks in the idle list; may be empty
441      */

442     public static List JavaDoc getIdleTaskNames() {
443         List JavaDoc names;
444         synchronized (getThreadQueue().busyTasks) {
445             names = getTaskNames(getThreadQueue().idleTasks.iterator());
446         }
447         return names;
448     }
449
450     /**
451      * @return the names of the tasks in the list; may be empty
452      */

453     private static List JavaDoc getTaskNames(Iterator JavaDoc taskIter) {
454         List JavaDoc names = new LinkedList JavaDoc();
455         while (taskIter.hasNext()) {
456             WorkerThread thisWorker = (WorkerThread) taskIter.next();
457             names.add(thisWorker.getName());
458         }
459         return names;
460     }
461
462     /**
463      * returns a string telling you number of idle
464      * and busy worker threads
465      */

466     public static String JavaDoc stats() {
467         String JavaDoc stats = numRunningTasks() + " tasks running \n";
468         stats += numWaitingTasks() + " tasks waiting \n";
469
470         return stats;
471     }
472
473     /**
474      * returns the number of idle, busy and finished
475      * worker threads
476      */

477     public static int numTotalTasks() {
478         return numRunningTasks() + numWaitingTasks() + numCompletedTasks();
479     }
480
481     /**
482      * Terminate the queue's operation
483      */

484     public static void terminate() {
485         ThreadQueue.terminate = true;
486         // give everyone up to 10 seconds to acknowledge the terminate
487
ThreadQueue.waitForAll(10000);
488         // empty the various
489
getThreadQueue().idleTasks.clear();
490         getThreadQueue().busyTasks.clear();
491         getThreadQueue().resultList.clear();
492         threadPool = null;
493         getThreadQueue();
494         ThreadQueue.terminate = false;
495     }
496
497     public static void interruptAllRunningTasks() {
498         synchronized (getThreadQueue().busyTasks) {
499             Map JavaDoc currentRunningThreads = getThreadQueue().runningThreads;
500             
501             terminateRunningTasks(currentRunningThreads);
502             interruptRunningThreads(currentRunningThreads);
503         }
504     }
505
506     private static void interruptRunningThreads(Map JavaDoc currentRunningThreads) {
507         for (Iterator JavaDoc iter = currentRunningThreads.values().iterator(); iter.hasNext();) {
508             Thread JavaDoc currentThread = (Thread JavaDoc) iter.next();
509             currentThread.interrupt();
510         }
511     }
512
513     private static void terminateRunningTasks(Map JavaDoc currentRunningThreads) {
514         for (Iterator JavaDoc iter = currentRunningThreads.keySet().iterator(); iter.hasNext();) {
515             WorkerThread currentTask = (WorkerThread) iter.next();
516             currentTask.terminate();
517             
518             LOG.info("Preparing to stop " + currentTask.getName());
519         }
520     }
521
522     /**
523      * Waits for a specific task to finish
524      * same as Thread t.wait()
525      */

526     public static void interrupt(String JavaDoc taskName) {
527         synchronized (getThreadQueue().busyTasks) {
528
529             // check for it in the idleList
530
// *remove it (before it starts running)
531
// *return
532

533             if (ThreadQueue.isIdle(taskName)) {
534                 if (getThreadQueue().idleTasks.remove(getIdleTask(taskName))) {
535                     LOG.debug("removed idle project " + taskName);
536                 } else {
537                     LOG.warn("could not remove idle project " + taskName);
538                 }
539                 return;
540             } // end of if ( getThreadQueue().isIdle(taksName()) {
541

542             // At this point, it must be busy if it is in our system
543
// *interrupt it
544
// *cleanup
545
// *return
546
WorkerThread thisWorker = getBusyTask(taskName);
547             if (thisWorker != null) {
548                 LOG.debug("Attempting to stop a project building at the moment: " + taskName);
549                 Thread JavaDoc thisThread =
550                         (Thread JavaDoc) getThreadQueue().runningThreads.get(thisWorker);
551                 thisThread.interrupt();
552                 getThreadQueue().busyTasks.remove(thisWorker);
553                 getThreadQueue().runningThreads.remove(thisThread);
554                 LOG.debug("Stopped " + taskName + " succesfully");
555                 return;
556             }
557             
558             LOG.warn("Project is neither idle nor busy: " + taskName + "; taking no action");
559         }
560     }
561
562     /**
563      * This call wraps the next unique name for a task
564      * that is inerted with no name
565      */

566
567     private static String JavaDoc nextName() {
568         synchronized (NAME_COUNTER_SYNCH) {
569             if (nameCounter == Long.MAX_VALUE) {
570                 nameCounter = Long.MIN_VALUE;
571             }
572         }
573         nameCounter++;
574         return nameCounter + "";
575     }
576
577     /**
578      * Tells the caller how many worker threads are in
579      * use to service the worker tasks
580      */

581
582     static int getMaxNumWorkerThreads() {
583         return getThreadQueue().threadCount;
584     }
585
586     /**
587      * Utility call for sleeps
588      */

589     private static void sleep(int ms) {
590         try {
591             Thread.sleep(ms);
592         } catch (Exception JavaDoc ignored) {
593         }
594     }
595 }
596
Popular Tags