KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > exoplatform > services > threadpool > impl > ThreadPoolServiceImpl


1 /**
2  * $Id: ThreadPoolServiceImpl.java,v 1.2 2004/05/24 17:02:00 tuan08 Exp $
3  *
4  * The contents of this file are subject to the ClickBlocks Public
5  * License Version 1.0 (the "License"); you may not use this file
6  * except in compliance with the License. You may obtain a copy of
7  * the License at http://www.clickblocks.org
8  *
9  * Software distributed under the License is distributed on an "AS
10  * IS" basis, WITHOUT WARRANTY OF ANY KIND, either express or
11  * implied, including, but not limited to, the implied warranties of
12  * merchantability, fitness for a particular purpose and
13  * non-infringement. See the License for the specific language
14  * governing rights and limitations under the License.
15  *
16  * ClickBlocks, the ClickBlocks logo and combinations thereof are
17  * trademarks of ClickBlocks, LLC in the United States and other
18  * countries.
19  *
20  * The Initial Developer of the Original Code is ClickBlocks, LLC.
21  * Portions created by ClickBlocks, LLC are Copyright (C) 2000.
22  * All Rights Reserved.
23  *
24  * Contributor(s): Mark Grand
25  */

26
27 package org.exoplatform.services.threadpool.impl;
28
29
30 import java.util.*;
31
32 import org.apache.commons.logging.Log;
33 import org.exoplatform.services.log.LogService;
34 import org.exoplatform.services.threadpool.ThreadPoolService;
35
36 public class ThreadPoolServiceImpl implements ThreadPoolService {
37
38   /**
39    * The maximum pool size; used if not otherwise specified.
40    * Default value is 500
41    */

42   public static final int DEFAULT_MAXIMUMPOOLSIZE = 500;
43   /**
44    * The normal pool size; used if not otherwise specified.
45    * Default value is 1.
46    */

47   public static final int DEFAULT_NORMALPOOLSIZE = 1;
48   /**
49    * The maximum time to keep worker threads alive waiting for
50    * new tasks; used if not otherwise specified. Default
51    * value is one minute (60000 milliseconds).
52    */

53   public static final long DEFAULT_MAXIDLETIME = 60 * 1000;
54   // Bounds declared as volatile to avoid having to carefully
55
// synchronize responses to changes in value.
56
protected volatile int maximumPoolSize = DEFAULT_MAXIMUMPOOLSIZE;
57   protected volatile int normalPoolSize = DEFAULT_NORMALPOOLSIZE;
58   protected long maxIdleTime = DEFAULT_MAXIDLETIME;
59   /*
60    * The queue is used to hand off the task
61    * to a thread in the pool
62    */

63   protected Queue handOff;
64   /**
65    * Lock used for protecting poolSize and threads map *
66    */

67   protected Object JavaDoc poolLock = new Object JavaDoc();
68   /**
69    * Current pool size. Relies on poolLock for all locking.
70    * But is also volatile to allow simpler checking inside
71    * worker thread runloop.
72    */

73   protected volatile int poolSize = 0;
74   /**
75    * An object to map active worker objects to their active
76    * thread. This is used by the interruptAll method.
77    * It may also be useful in subclasses that need to perform
78    * other thread management chores.
79    * All operations on the Map should be done holding
80    * a synchronization lock on poolLock.
81    */

82   protected Map threads;
83   /**
84    * This object delegates the creation of threads to the
85    * factory object referenced by this variable.
86    */

87   private ThreadFactoryIF threadFactory = new DefaultThreadFactory();
88
89   private Log log;
90
91   /**
92    * Construct a new pool with all default settings
93    */

94   public ThreadPoolServiceImpl(LogService logService) {
95     log = logService.getLog("org.exoplatform.services.threadpool");
96     maximumPoolSize = DEFAULT_MAXIMUMPOOLSIZE;
97     handOff = new Queue();
98     runWhenBlocked();
99     threads = new HashMap();
100   } // constructor()
101

102   /**
103    * Return the maximum number of threads to simultaneously
104    * execute New requests are handled according to the
105    * current blocking policy once this limit is exceeded.
106    */

107   public int getMaximumPoolSize() {
108     return maximumPoolSize;
109   } // getMaximumPoolSize
110

111   /**
112    * Set the maximum number of threads to use. Decreasing
113    * the pool size will not immediately kill existing threads,
114    * but they may later die when idle.
115    *
116    * @throws IllegalArgumentException if less or equal to zero. (It is not
117    * considered an error to set the maximum to be
118    * less than than the normal. However, in this
119    * case there are no guarantees about behavior.)
120    */

121   public void setMaximumPoolSize(int newMaximum) {
122     if (newMaximum <= 0) throw new IllegalArgumentException JavaDoc();
123     maximumPoolSize = newMaximum;
124   } // setMaximumPoolSize(int)
125

126   /**
127    * Return the normal number of threads to simultaneously
128    * execute. (Default value is 1). If fewer than the mininum
129    * number are running upon reception of a new request, a new
130    * thread is started to handle this request.
131    */

132   public int getNormalPoolSize() {
133     return normalPoolSize;
134   } // getNormalPoolSize()
135

136   /**
137    * Set the normal number of threads to use.
138    *
139    * @throws IllegalArgumentException if less than zero.
140    * (It is not considered an error to set the
141    * normal to be greater than the maximum. However,
142    * in this case there are no guarantees about
143    * behavior.)
144    */

145   public void setNormalPoolSize(int newNormal) {
146     if (newNormal < 0) {
147       throw new IllegalArgumentException JavaDoc();
148     } // if
149
normalPoolSize = newNormal;
150   } // setNormalPoolSize(int)
151

152   /**
153    * Return the current number of active threads in the pool.
154    * This number is just a snaphot, and may change immediately.
155    */

156   public int getPoolSize() {
157     return poolSize;
158   } // getPoolSize()
159

160   /**
161    * Set the object that will be used to create threads.
162    */

163   public void setThreadFactory(ThreadFactoryIF newValue) {
164     threadFactory = newValue;
165   } // setThreadFactory(ThreadFactoryIF)
166

167   /**
168    * Return the current thread factory object.
169    */

170   protected ThreadFactoryIF getThreadFactory() {
171     return threadFactory;
172   } // getThreadFactory()
173

174   /**
175    * Create and start a thread to handle a new task.
176    * Call only when holding poolLock.
177    */

178   protected void addThread(Runnable JavaDoc task) {
179     ++poolSize;
180     Worker worker = new Worker(task);
181     Thread JavaDoc thread = getThreadFactory().createThread(worker);
182     threads.put(worker, thread);
183     thread.start();
184   } // addThread(Runnable)
185

186   /**
187    * Create and start up to numberOfThreads threads in the
188    * pool.
189    *
190    * @return The actual number of threads created. This may be
191    * less than the number of threads requested if
192    * creating more would exceed maximum pool size.
193    */

194   public int createThreads(int numberOfThreads) {
195     int ncreated = 0;
196     for (int i = 0; i < numberOfThreads; ++i) {
197       synchronized (poolLock) {
198         if (getPoolSize() < getMaximumPoolSize()) {
199           ++ncreated;
200           addThread(null);
201         } else {
202           break;
203         } // if
204
} // synchronized
205
} // for
206
return ncreated;
207   } // createThreads
208

209   /**
210    * Interrupt all threads in the pool, causing them all
211    * to terminate. Threads will terminate sooner if the
212    * executed tasks themselves respond to interrupts.
213    */

214   public void interruptAll() {
215     // Synchronized to avoid concurrentModification exceptions
216
synchronized (poolLock) {
217       for (Iterator it = threads.values().iterator();
218            it.hasNext();) {
219         Thread JavaDoc t = (Thread JavaDoc) (it.next());
220         t.interrupt();
221       } // for
222
} // synchronized
223
} // interruptAll()
224

225   /**
226    * Remove all unprocessed tasks from pool queue, and
227    * return them in a java.util.List. It should normally be
228    * used only when there are not any active clients of the
229    * pool (otherwise you face the possibility that the method
230    * will loop pulling out tasks as clients are putting them
231    * in.) This method can be useful after shutting down a pool
232    * (via interruptAll) to determine whether there are any
233    * pending tasks that were not processed. You can then, for
234    * example execute all unprocessed tasks via code along the
235    * lines of:
236    * <pre>
237    * List tasks = pool.drain();
238    * for (Iterator it = tasks.iterator(); it.hasNext();)
239    * ( (Runnable)(it.next()) ).run();
240    * </pre>
241    */

242   public List drain() {
243     boolean wasInterrupted = false;
244     Vector tasks = new Vector();
245     for (; ;) {
246       try {
247         Object JavaDoc x = handOff.get(0);
248         if (x == null)
249           break;
250         else
251           tasks.addElement(x);
252       } catch (InterruptedException JavaDoc ex) {
253         wasInterrupted = true; // postpone re-interrupt until drained
254
} // try
255
} // for
256
if (wasInterrupted) Thread.currentThread().interrupt();
257     return tasks;
258   } // drain()
259

260
261   /**
262    * Return the number of milliseconds to keep threads
263    * alive waiting for new tasks. A negative value
264    * means to wait forever. A zero value means not to wait
265    * at all.
266    */

267   public synchronized long getMaxIdleTime() {
268     return maxIdleTime;
269   } // getMaxIdleTime()
270

271   /**
272    * Set the number of milliseconds to keep threads
273    * alive waiting for new tasks. A negative value
274    * means to wait forever. A zero value means not to wait
275    * at all.
276    */

277   public synchronized void setMaxIdleTime(long msecs) {
278     maxIdleTime = msecs;
279   } // setMaxIdleTime(long)
280

281   /**
282    * Called upon termination of worker thread *
283    */

284   protected void workerDone(Worker w) {
285     synchronized (poolLock) {
286       --poolSize;
287       threads.remove(w);
288     } // synchronized
289
} // sooner
290

291   /**
292    * get a task from the handoff queue *
293    */

294   protected Runnable JavaDoc getTask() throws InterruptedException JavaDoc {
295     long waitTime = getMaxIdleTime();
296     if (waitTime >= 0) {
297       return (Runnable JavaDoc) (handOff.get(waitTime));
298     } else {
299       return (Runnable JavaDoc) (handOff.get());
300     } // if
301
} // getTask()
302

303   /**
304    * Class defining the basic run loop for pooled threads.
305    */

306   protected class Worker implements Runnable JavaDoc {
307     protected Runnable JavaDoc firstTask;
308
309     Worker(Runnable JavaDoc firstTask) {
310       this.firstTask = firstTask;
311     } // constructor(Runnable)
312

313     public void run() {
314       try {
315         Runnable JavaDoc task = firstTask;
316         firstTask = null;
317         if (task != null) {
318           task.run();
319         } // if
320
// Continue working until max lowered
321
while (getPoolSize() <= getMaximumPoolSize()) {
322           task = getTask();
323           if (task != null) {
324             task.run();
325           } else {
326             break;
327           } // if
328
} // while
329
} catch (InterruptedException JavaDoc e) {
330         // let this just fall through so the thread
331
// dies a quiet death.
332
} finally {
333         workerDone(this);
334       } // try
335
} // run()
336
} // class Worker
337

338
339   /**
340    * Class for actions to take when execute() blocks. Uses
341    * Strategy pattern to represent different actions. You can
342    * add more in subclasses, and/or create subclasses of
343    * these. If so, you will also want to add or modify the
344    * corresponding methods that set the current
345    * blockedExectionStrategy.
346    */

347   protected interface BlockedExecutionStrategy {
348     /**
349      * Return true if successfully handled so, execute
350      * should terminate; else return false if execute loop
351      * should be retried.
352      */

353     public boolean blockedAction(Runnable JavaDoc task);
354   } // interface BlockedExecutionStrategy
355

356   /**
357    * Class defining Run action *
358    */

359   protected class RunWhenBlocked implements BlockedExecutionStrategy {
360     public boolean blockedAction(Runnable JavaDoc task) {
361       task.run();
362       return true;
363     } // blockedAction(Runnable)
364
} // class RunWhenBlocked
365

366   /**
367    * Class defining Wait action *
368    */

369   protected class WaitWhenBlocked implements BlockedExecutionStrategy {
370     public boolean blockedAction(Runnable JavaDoc task) {
371       try {
372         handOff.put(task);
373       } catch (InterruptedException JavaDoc ex) {
374         Thread.currentThread().interrupt(); // propagate
375
} // try
376
return true;
377     } // blockedAction(Runnable)
378
} // class WaitWhenBlocked
379

380   /**
381    * Class defining Discard action *
382    */

383   protected class DiscardWhenBlocked implements BlockedExecutionStrategy {
384     public boolean blockedAction(Runnable JavaDoc task) {
385       return true;
386     } // blockedAction(Runnable)
387
} // class DiscardWhenBlocked
388

389   /**
390    * The current strategy *
391    */

392   protected BlockedExecutionStrategy blockedExecutionStrategy;
393
394   /**
395    * Get the strategy for blocked execution *
396    */

397   protected synchronized BlockedExecutionStrategy getBlockedExecutionStrategy() {
398     return blockedExecutionStrategy;
399   } // getBlockedExecutionStrategy()
400

401   /**
402    * Set the policy for blocked execution to be that
403    * the current thread executes the task if
404    * there are no available threads in the pool.
405    */

406   public synchronized void runWhenBlocked() {
407     blockedExecutionStrategy = new RunWhenBlocked();
408   } // runWhenBlocked()
409

410   /**
411    * Set the policy for blocked execution to be to
412    * wait until a thread is available.
413    */

414   public synchronized void WhenBlocked() {
415     blockedExecutionStrategy = new WaitWhenBlocked();
416   } // WaitWhenBlocked()
417

418   /**
419    * Set the policy for blocked execution to be to
420    * return without executing the request
421    */

422   public synchronized void discardWhenBlocked() {
423     blockedExecutionStrategy = new DiscardWhenBlocked();
424   } // discardWhenBlocked()
425

426   /**
427    * Arrange for the given task to be executed by a thread in
428    * this pool. The method normally returns when the task
429    * has been handed off for (possibly later) execution.
430    */

431   public void execute(Runnable JavaDoc task) throws InterruptedException JavaDoc {
432     log.debug("execute method called");
433     while (true) {
434       synchronized (poolLock) {
435         // Ensure normal number of threads
436
if (getPoolSize() < getNormalPoolSize()) {
437           addThread(task);
438           return;
439         } // if
440

441         // Try to give to existing thread
442
if (handOff.put(task, 0)) {
443           return;
444         } // if put
445

446         // There was no immediately available thread,
447
// so try to add a new thread to pool.
448
if (getPoolSize() < getMaximumPoolSize()) {
449           addThread(task);
450           return;
451         } // if maximumPoolSize
452
} // synchronized
453

454       // Cannot hand off and cannot create -- ask for help
455
if (getBlockedExecutionStrategy().blockedAction(task)) {
456         return;
457       } // if blockedAction
458
} // while
459
} // execute(Runnable)
460

461 }
462
Popular Tags