KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > quartz > simpl > SimpleThreadPool


1 /*
2  * Copyright 2004-2005 OpenSymphony
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License"); you may not
5  * use this file except in compliance with the License. You may obtain a copy
6  * of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12  * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13  * License for the specific language governing permissions and limitations
14  * under the License.
15  *
16  */

17
18 /*
19  * Previously Copyright (c) 2001-2004 James House
20  */

21 package org.quartz.simpl;
22
23 import org.apache.commons.logging.Log;
24 import org.apache.commons.logging.LogFactory;
25 import org.quartz.SchedulerConfigException;
26 import org.quartz.spi.ThreadPool;
27
28 import java.util.Iterator JavaDoc;
29 import java.util.LinkedList JavaDoc;
30 import java.util.List JavaDoc;
31
32 /**
33  * <p>
34  * This is class is a simple implementation of a thread pool, based on the
35  * <code>{@link org.quartz.spi.ThreadPool}</code> interface.
36  * </p>
37  *
38  * <p>
39  * <CODE>Runnable</CODE> objects are sent to the pool with the <code>{@link #runInThread(Runnable)}</code>
40  * method, which blocks until a <code>Thread</code> becomes available.
41  * </p>
42  *
43  * <p>
44  * The pool has a fixed number of <code>Thread</code>s, and does not grow or
45  * shrink based on demand.
46  * </p>
47  *
48  * @author James House
49  * @author Juergen Donnerstag
50  */

51 public class SimpleThreadPool implements ThreadPool {
52
53     /*
54      * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
55      *
56      * Data members.
57      *
58      * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
59      */

60
61     private int count = -1;
62
63     private int prio = Thread.NORM_PRIORITY;
64
65     private boolean isShutdown = false;
66     private boolean handoffPending = false;
67
68     private boolean inheritLoader = false;
69
70     private boolean inheritGroup = true;
71
72     private boolean makeThreadsDaemons = false;
73
74     private ThreadGroup JavaDoc threadGroup;
75
76     private final Object JavaDoc nextRunnableLock = new Object JavaDoc();
77
78     private List JavaDoc workers;
79     private LinkedList JavaDoc availWorkers = new LinkedList JavaDoc();
80     private LinkedList JavaDoc busyWorkers = new LinkedList JavaDoc();
81
82     private String JavaDoc threadNamePrefix = "SimpleThreadPoolWorker";
83
84     private final Log log = LogFactory.getLog(getClass());
85
86     /*
87      * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
88      *
89      * Constructors.
90      *
91      * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
92      */

93
94     /**
95      * <p>
96      * Create a new (unconfigured) <code>SimpleThreadPool</code>.
97      * </p>
98      *
99      * @see #setThreadCount(int)
100      * @see #setThreadPriority(int)
101      */

102     public SimpleThreadPool() {
103     }
104
105     /**
106      * <p>
107      * Create a new <code>SimpleThreadPool</code> with the specified number
108      * of <code>Thread</code> s that have the given priority.
109      * </p>
110      *
111      * @param threadCount
112      * the number of worker <code>Threads</code> in the pool, must
113      * be > 0.
114      * @param threadPriority
115      * the thread priority for the worker threads.
116      *
117      * @see java.lang.Thread
118      */

119     public SimpleThreadPool(int threadCount, int threadPriority) {
120         setThreadCount(threadCount);
121         setThreadPriority(threadPriority);
122     }
123
124     /*
125      * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
126      *
127      * Interface.
128      *
129      * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
130      */

131
132     public Log getLog() {
133         return log;
134     }
135
136     public int getPoolSize() {
137         return getThreadCount();
138     }
139
140     /**
141      * <p>
142      * Set the number of worker threads in the pool - has no effect after
143      * <code>initialize()</code> has been called.
144      * </p>
145      */

146     public void setThreadCount(int count) {
147         this.count = count;
148     }
149
150     /**
151      * <p>
152      * Get the number of worker threads in the pool.
153      * </p>
154      */

155     public int getThreadCount() {
156         return count;
157     }
158
159     /**
160      * <p>
161      * Set the thread priority of worker threads in the pool - has no effect
162      * after <code>initialize()</code> has been called.
163      * </p>
164      */

165     public void setThreadPriority(int prio) {
166         this.prio = prio;
167     }
168
169     /**
170      * <p>
171      * Get the thread priority of worker threads in the pool.
172      * </p>
173      */

174     public int getThreadPriority() {
175         return prio;
176     }
177
178     public void setThreadNamePrefix(String JavaDoc prfx) {
179         this.threadNamePrefix = prfx;
180     }
181
182     public String JavaDoc getThreadNamePrefix() {
183         return threadNamePrefix;
184     }
185
186     /**
187      * @return Returns the
188      * threadsInheritContextClassLoaderOfInitializingThread.
189      */

190     public boolean isThreadsInheritContextClassLoaderOfInitializingThread() {
191         return inheritLoader;
192     }
193
194     /**
195      * @param inheritLoader
196      * The threadsInheritContextClassLoaderOfInitializingThread to
197      * set.
198      */

199     public void setThreadsInheritContextClassLoaderOfInitializingThread(
200             boolean inheritLoader) {
201         this.inheritLoader = inheritLoader;
202     }
203
204     public boolean isThreadsInheritGroupOfInitializingThread() {
205         return inheritGroup;
206     }
207
208     public void setThreadsInheritGroupOfInitializingThread(
209             boolean inheritGroup) {
210         this.inheritGroup = inheritGroup;
211     }
212
213
214     /**
215      * @return Returns the value of makeThreadsDaemons.
216      */

217     public boolean isMakeThreadsDaemons() {
218         return makeThreadsDaemons;
219     }
220
221     /**
222      * @param makeThreadsDaemons
223      * The value of makeThreadsDaemons to set.
224      */

225     public void setMakeThreadsDaemons(boolean makeThreadsDaemons) {
226         this.makeThreadsDaemons = makeThreadsDaemons;
227     }
228
229     public void initialize() throws SchedulerConfigException {
230
231         if (count <= 0) {
232             throw new SchedulerConfigException(
233                     "Thread count must be > 0");
234         }
235         if (prio <= 0 || prio > 9) {
236             throw new SchedulerConfigException(
237                     "Thread priority must be > 0 and <= 9");
238         }
239
240         if(isThreadsInheritGroupOfInitializingThread()) {
241             threadGroup = Thread.currentThread().getThreadGroup();
242         } else {
243             // follow the threadGroup tree to the root thread group.
244
threadGroup = Thread.currentThread().getThreadGroup();
245             ThreadGroup JavaDoc parent = threadGroup;
246             while ( !parent.getName().equals("main") ) {
247                 threadGroup = parent;
248                 parent = threadGroup.getParent();
249             }
250             threadGroup = new ThreadGroup JavaDoc(parent, "SimpleThreadPool");
251             if (isMakeThreadsDaemons()) {
252                 threadGroup.setDaemon(true);
253             }
254         }
255
256
257         if (isThreadsInheritContextClassLoaderOfInitializingThread()) {
258             getLog().info(
259                     "Job execution threads will use class loader of thread: "
260                             + Thread.currentThread().getName());
261         }
262
263         // create the worker threads and start them
264
Iterator JavaDoc workerThreads = createWorkerThreads(count).iterator();
265         while(workerThreads.hasNext()) {
266             WorkerThread wt = (WorkerThread) workerThreads.next();
267             wt.start();
268             availWorkers.add(wt);
269         }
270     }
271
272     protected List JavaDoc createWorkerThreads(int count) {
273         workers = new LinkedList JavaDoc();
274         for (int i = 1; i<= count; ++i) {
275             WorkerThread wt = new WorkerThread(this, threadGroup,
276                 getThreadNamePrefix() + "-" + i,
277                 getThreadPriority(),
278                 isMakeThreadsDaemons());
279             if (isThreadsInheritContextClassLoaderOfInitializingThread()) {
280                 wt.setContextClassLoader(Thread.currentThread()
281                         .getContextClassLoader());
282             }
283             workers.add(wt);
284         }
285
286         return workers;
287     }
288
289     /**
290      * <p>
291      * Terminate any worker threads in this thread group.
292      * </p>
293      *
294      * <p>
295      * Jobs currently in progress will complete.
296      * </p>
297      */

298     public void shutdown() {
299         shutdown(true);
300     }
301
302     /**
303      * <p>
304      * Terminate any worker threads in this thread group.
305      * </p>
306      *
307      * <p>
308      * Jobs currently in progress will complete.
309      * </p>
310      */

311     public void shutdown(boolean waitForJobsToComplete) {
312
313         synchronized (nextRunnableLock) {
314             isShutdown = true;
315
316             // signal each worker thread to shut down
317
Iterator JavaDoc workerThreads = workers.iterator();
318             while(workerThreads.hasNext()) {
319                 WorkerThread wt = (WorkerThread) workerThreads.next();
320                 wt.shutdown();
321                 availWorkers.remove(wt);
322             }
323
324             // Give waiting (wait(1000)) worker threads a chance to shut down.
325
// Active worker threads will shut down after finishing their
326
// current job.
327
nextRunnableLock.notifyAll();
328
329             if (waitForJobsToComplete == true) {
330
331                 // wait for hand-off in runInThread to complete...
332
while(handoffPending)
333                         try { nextRunnableLock.wait(100); } catch(Throwable JavaDoc t) {}
334
335                 // Wait until all worker threads are shut down
336
while (busyWorkers.size() > 0) {
337                     WorkerThread wt = (WorkerThread) busyWorkers.getFirst();
338                     try {
339                         getLog().debug(
340                                 "Waiting for thread " + wt.getName()
341                                         + " to shut down");
342
343                         // note: with waiting infinite time the
344
// application may appear to 'hang'.
345
nextRunnableLock.wait(2000);
346                     } catch (InterruptedException JavaDoc ex) {
347                     }
348                 }
349
350                 int activeCount = threadGroup.activeCount();
351                 if (activeCount > 0) {
352                     getLog().info(
353                         "There are still " + activeCount + " worker threads active."
354                         + " See javadoc runInThread(Runnable) for a possible explanation");
355                 }
356
357                 getLog().debug("shutdown complete");
358             }
359         }
360     }
361
362     /**
363      * <p>
364      * Run the given <code>Runnable</code> object in the next available
365      * <code>Thread</code>. If while waiting the thread pool is asked to
366      * shut down, the Runnable is executed immediately within a new additional
367      * thread.
368      * </p>
369      *
370      * @param runnable
371      * the <code>Runnable</code> to be added.
372      */

373     public boolean runInThread(Runnable JavaDoc runnable) {
374         if (runnable == null) {
375             return false;
376         }
377
378         synchronized (nextRunnableLock) {
379
380             handoffPending = true;
381
382             // Wait until a worker thread is available
383
while ((availWorkers.size() < 1) && !isShutdown) {
384                 try {
385                     nextRunnableLock.wait(500);
386                 } catch (InterruptedException JavaDoc ignore) {
387                 }
388             }
389
390             if (!isShutdown) {
391                 WorkerThread wt = (WorkerThread)availWorkers.removeFirst();
392                 busyWorkers.add(wt);
393                 wt.run(runnable);
394             }
395             else {
396                 // If the thread pool is going down, execute the Runnable
397
// within a new additional worker thread (no thread from the pool).
398
WorkerThread wt = new WorkerThread(this, threadGroup,
399                         "WorkerThread-LastJob", prio, isMakeThreadsDaemons(), runnable);
400                 busyWorkers.add(wt);
401                 workers.add(wt);
402                 wt.start();
403             }
404             nextRunnableLock.notifyAll();
405             handoffPending = false;
406         }
407
408         return true;
409     }
410
411     public int blockForAvailableThreads() {
412         synchronized(nextRunnableLock) {
413
414             while((availWorkers.size() < 1 || handoffPending) && !isShutdown) {
415                 try {
416                     nextRunnableLock.wait(500);
417                 } catch (InterruptedException JavaDoc ignore) {
418                 }
419             }
420
421             return availWorkers.size();
422         }
423     }
424
425     protected void makeAvailable(WorkerThread wt) {
426         synchronized(nextRunnableLock) {
427             if(!isShutdown)
428                 availWorkers.add(wt);
429             busyWorkers.remove(wt);
430             nextRunnableLock.notifyAll();
431         }
432     }
433
434     /*
435      * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
436      *
437      * WorkerThread Class.
438      *
439      * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
440      */

441
442     /**
443      * <p>
444      * A Worker loops, waiting to execute tasks.
445      * </p>
446      */

447     class WorkerThread extends Thread JavaDoc {
448
449         // A flag that signals the WorkerThread to terminate.
450
private boolean run = true;
451
452         private SimpleThreadPool tp;
453
454         private Runnable JavaDoc runnable = null;
455
456         /**
457          * <p>
458          * Create a worker thread and start it. Waiting for the next Runnable,
459          * executing it, and waiting for the next Runnable, until the shutdown
460          * flag is set.
461          * </p>
462          */

463         WorkerThread(SimpleThreadPool tp, ThreadGroup JavaDoc threadGroup, String JavaDoc name,
464                      int prio, boolean isDaemon) {
465
466             this(tp, threadGroup, name, prio, isDaemon, null);
467         }
468
469         /**
470          * <p>
471          * Create a worker thread, start it, execute the runnable and terminate
472          * the thread (one time execution).
473          * </p>
474          */

475         WorkerThread(SimpleThreadPool tp, ThreadGroup JavaDoc threadGroup, String JavaDoc name,
476                      int prio, boolean isDaemon, Runnable JavaDoc runnable) {
477
478             super(threadGroup, name);
479             this.tp = tp;
480             this.runnable = runnable;
481             setPriority(prio);
482             setDaemon(isDaemon);
483         }
484
485         /**
486          * <p>
487          * Signal the thread that it should terminate.
488          * </p>
489          */

490         void shutdown() {
491             run = false;
492
493             // Javadoc mentions that it interrupts blocked I/O operations as
494
// well. Hence the job will most likely fail. I think we should
495
// shut the work thread gracefully, by letting the job finish
496
// uninterrupted. See SimpleThreadPool.shutdown()
497
//interrupt();
498
}
499
500         public void run(Runnable JavaDoc newRunnable) {
501             synchronized(this) {
502                 if(runnable != null)
503                     throw new IllegalStateException JavaDoc("Already running a Runnable!");
504
505                 runnable = newRunnable;
506                 this.notifyAll();
507             }
508         }
509
510         /**
511          * <p>
512          * Loop, executing targets as they are received.
513          * </p>
514          */

515         public void run() {
516             boolean runOnce = (runnable != null);
517
518             boolean ran = false;
519             while (run) {
520                 try {
521                     synchronized(this) {
522                         while (runnable == null && run) {
523                             this.wait(500);
524                         }
525                     }
526
527                     if (runnable != null) {
528                         ran = true;
529                         runnable.run();
530                     }
531                 } catch (InterruptedException JavaDoc unblock) {
532                     // do nothing (loop will terminate if shutdown() was called
533
try {
534                         getLog().error("worker threat got 'interrupt'ed.", unblock);
535                     } catch(Exception JavaDoc e) {
536                         // ignore to help with a tomcat glitch
537
}
538                 } catch (Exception JavaDoc exceptionInRunnable) {
539                     try {
540                         getLog().error("Error while executing the Runnable: ",
541                             exceptionInRunnable);
542                     } catch(Exception JavaDoc e) {
543                         // ignore to help with a tomcat glitch
544
}
545                 } finally {
546                     runnable = null;
547                     // repair the thread in case the runnable mucked it up...
548
if(getPriority() != tp.getThreadPriority())
549                         setPriority(tp.getThreadPriority());
550
551                     if (runOnce) {
552                         run = false;
553                     }
554                     else if(ran) {
555                         ran = false;
556                         makeAvailable(this);
557                     }
558
559                 }
560             }
561
562             //if (log.isDebugEnabled())
563
try {
564                 getLog().debug("WorkerThread is shutting down");
565             } catch(Exception JavaDoc e) {
566                 // ignore to help with a tomcat glitch
567
}
568         }
569     }
570 }
571
Popular Tags