KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jboss > mq > threadpool > ThreadPool


/*
 * JBoss, the OpenSource J2EE webOS
 *
 * Distributable under LGPL license.
 * See terms of license at gnu.org.
 */

package org.jboss.mq.threadpool;

import java.lang.Thread;
import java.lang.ThreadGroup;

import java.util.ArrayList;
import java.util.LinkedList;

16 /**
17  * This is an implementation of a simple thread pool with
18  * an embedded work queue.
19  *
20  * @author Ole Husgaard (osh@sparre.dk)
21  * @version $Revision: 1.1 $
22  */

23 public class ThreadPool
24 {
25    /**
26     * The name of this thread pool
27     */

28    private String JavaDoc name;
29
30    /**
31     * Flags that worker threads should be created as daemon threads.
32     */

33    private boolean daemon;
34
35    /**
36     * The ThreadGroup of threads in this pool.
37     */

38    private ThreadGroup JavaDoc threadGroup;
39
40    /**
41     * The worker threads.
42     */

43    private ArrayList JavaDoc workers;
44
45    /**
46     * Maximum number of worker threads.
47     */

48    private int maxWorkers;
49
50    /**
51     * Count of idle worker threads.
52     * Synchronized on the [@link #workers} field.
53     */

54    private int idleWorkers;
55
56    /**
57     * Flags that we are shutting down the pool.
58     * Synchronized on the [@link #workers} field.
59     */

60    private volatile boolean stopping;
61
62    /**
63     * The work queue.
64     */

65    private LinkedList JavaDoc queue;
66
67
68    /**
69     * Create a new thread pool instance.
70     *
71     * @param Name The name of this thread pool. This is used for naming its
72     * worker threads.
73     * @param threadGroup The <code>ThreadGroup</code> that worker threads
74     * in this pool should belong to.
75     * @param maxWorkers The maximum number of worker threads in this pool.
76     * @param daemon If <code>true</code>, worker threads will be created as
77     * daemon threads.
78     */

79    public ThreadPool(String JavaDoc name, ThreadGroup JavaDoc threadGroup, int maxWorkers,
80                      boolean daemon)
81    {
82       if (name == null || threadGroup == null || maxWorkers <= 0)
83          throw new IllegalArgumentException JavaDoc();
84
85       this.name = name;
86       this.daemon = daemon;
87       this.threadGroup = threadGroup;
88       workers = new ArrayList JavaDoc();
89       this.maxWorkers = maxWorkers;
90       idleWorkers = 0;
91       stopping = false;
92       queue = new LinkedList JavaDoc();
93    }
94
95    /**
96     * Shutdown this thread pool.
97     * This will not return until all enqueued work has been cancelled,
98     * and all worker threads have done any work they started and have
99     * died.
100     */

101     public void shutdown()
102     {
103        stopping = true;
104
105        synchronized (queue) {
106           // Remove all queued work
107
queue.clear();
108           // Notify all waiting threads
109
queue.notifyAll();
110        }
111
112        // wait for all worker threads to die.
113
synchronized (workers) {
114           while (workers.size() > 0) {
115              try {
116                 // wait for some worker threads to die.
117
workers.wait();
118              } catch (InterruptedException JavaDoc ex) {
119                 // ignore
120
}
121           }
122        }
123     }
124
125
126    /**
127     * Enqueue a piece of work for this thread to handle.
128     * As soon as a thread becomes available, it will call
129     * {@link Work#doWork} of the argument.
130     * If the pool is shutting down, this method will not enqueue the
131     * work, but instead simply return.
132     *
133     * @param work The piece of work to be enqueued.
134     */

135    public void enqueueWork(Work work)
136    {
137 //System.err.println("ThreadPool("+name+"): enqueueWork() entered.");
138
// We may want to start a worker thread
139
synchronized (workers) {
140 //System.err.println("ThreadPool("+name+"): enqueueWork(): idleWorkers="+idleWorkers+" stopping="+stopping+".");
141
//System.err.println("ThreadPool("+name+"): enqueueWork(): workers.size()="+workers.size()+" maxWorkers="+maxWorkers+".");
142
if (idleWorkers == 0 && !stopping && workers.size() < maxWorkers)
143 {
144             new WorkerThread(name + "-" + (workers.size() + 1)).start();
145 //System.err.println("ThreadPool("+name+"): started new WorkerThread.");
146
}
147       }
148
149       synchronized (queue) {
150          if (stopping)
151             return; // we are shutting down, cannot take new work.
152

153          queue.addLast(work);
154 //System.err.println("ThreadPool("+name+"): enqueueWork(): enqueued work..");
155
queue.notify();
156       }
157    }
158
159    /**
160     * Cancel a piece of enqueued work.
161     *
162     * @param work The piece of work to be cancel.
163     */

164    public void cancelWork(Work work)
165    {
166       synchronized (queue) {
167          // It may be enqueued several times.
168
while (queue.remove(work))
169             ;
170       }
171    }
172
173
174    /**
175     * The threads that do the actual work.
176     */

177    private class WorkerThread
178       extends Thread JavaDoc
179    {
180       /**
181        * Create a new WorkerThread.
182        * This must be called when holding the workers monitor.
183        */

184       WorkerThread(String JavaDoc name)
185       {
186          super(threadGroup, name);
187          setDaemon(daemon);
188          workers.add(this);
189 //System.err.println("ThreadPool("+name+"): " + getName() + " created.");
190
}
191
192       /**
193        * Wait for work do to.
194        * This must be called when holding the queue monitor.
195        * This will temporarily increment the count of idle workers
196        * while waiting.
197        */

198       private void idle()
199       {
200          try {
201             synchronized (workers) {
202               ++idleWorkers;
203             }
204             //System.err.println("ThreadPool("+name+"): " + getName() + " starting to wait.");
205
queue.wait();
206          } catch (InterruptedException JavaDoc ex) {
207             // ignore
208
} finally {
209             //System.err.println("ThreadPool("+name+"): " + getName() + " done waiting.");
210
synchronized (workers) {
211               --idleWorkers;
212             }
213          }
214       }
215
216       public void run()
217       {
218 //System.err.println("ThreadPool("+name+"): " + getName() + " started to run.");
219
while (!stopping) {
220             Work work = null;
221
222             synchronized (queue) {
223                if (queue.size() == 0)
224                   idle();
225                if (!stopping && queue.size() > 0)
226                   work = (Work)queue.removeFirst();
227             }
228
229             if (work != null)
230                work.doWork();
231          }
232          synchronized (workers) {
233             workers.remove(this);
234             // Notify the shutdown thread.
235
workers.notify();
236          }
237       }
238    }
239 }
240
Popular Tags