KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jboss > util > threadpool > BasicThreadPool


/*
 * JBoss, Home of Professional Open Source
 * Copyright 2005, JBoss Inc., and individual contributors as indicated
 * by the @authors tag. See the copyright.txt in the distribution for a
 * full listing of individual contributors.
 *
 * This is free software; you can redistribute it and/or modify it
 * under the terms of the GNU Lesser General Public License as
 * published by the Free Software Foundation; either version 2.1 of
 * the License, or (at your option) any later version.
 *
 * This software is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this software; if not, write to the Free
 * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
 * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
 */

22 package org.jboss.util.threadpool;
23
24 import java.util.Collections JavaDoc;
25 import java.util.Map JavaDoc;
26
27 import org.jboss.util.collection.WeakValueHashMap;
28 import org.jboss.logging.Logger;
29
30 import EDU.oswego.cs.dl.util.concurrent.BoundedLinkedQueue;
31 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
32 import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt;
33 import EDU.oswego.cs.dl.util.concurrent.ThreadFactory;
34 import EDU.oswego.cs.dl.util.concurrent.Heap;
35
36 /**
37  * A basic thread pool.
38  *
39  * @author <a HREF="mailto:adrian@jboss.org">Adrian Brock</a>
40  * @author Scott.Stark@jboss.org
41  * @version $Revision: 1958 $
42  */

43 public class BasicThreadPool implements ThreadPool, BasicThreadPoolMBean
44 {
45    // Constants -----------------------------------------------------
46

47    /** The jboss thread group */
48    private static final ThreadGroup JavaDoc JBOSS_THREAD_GROUP = new ThreadGroup JavaDoc("JBoss Pooled Threads");
49
50    /** The thread groups */
51    private static final Map JavaDoc threadGroups = Collections.synchronizedMap(new WeakValueHashMap());
52
53    /** The internal pool number */
54    private static final SynchronizedInt lastPoolNumber = new SynchronizedInt(0);
55
56    private static Logger log = Logger.getLogger(BasicThreadPool.class);
57
58    // Attributes ----------------------------------------------------
59

60    /** The thread pool name */
61    private String JavaDoc name;
62
63    /** The internal pool number */
64    private int poolNumber;
65
66    /** The blocking mode */
67    private BlockingMode blockingMode = BlockingMode.ABORT;
68
69    /** The pooled executor */
70    private MinPooledExecutor executor;
71
72    /** The queue */
73    private BoundedLinkedQueue queue;
74
75    /** The thread group */
76    private ThreadGroup JavaDoc threadGroup;
77
78    /** The last thread number */
79    private SynchronizedInt lastThreadNumber = new SynchronizedInt(0);
80
81    /** Has the pool been stopped? */
82    private SynchronizedBoolean stopped = new SynchronizedBoolean(false);
83    /** The Heap<TimeoutInfo> of tasks ordered by their completion timeout */
84    private Heap tasksWithTimeouts = new Heap(13);
85    /** The task completion timeout monitor runnable */
86    private TimeoutMonitor timeoutTask;
87    /** The trace level logging flag */
88    private boolean trace;
89
90    // Static --------------------------------------------------------
91

92    // Constructors --------------------------------------------------
93

94    /**
95     * Create a new thread pool
96     */

97    public BasicThreadPool()
98    {
99       this("ThreadPool");
100    }
101
102    /**
103     * Create a new thread pool with a default queue size of 1024, max pool
104     * size of 100, min pool size of 4, and a keep alive of 60 seconds.
105     *
106     * @param name the pool name
107     */

108    public BasicThreadPool(String JavaDoc name)
109    {
110       this(name, JBOSS_THREAD_GROUP);
111    }
112
113    /**
114     * Create a new thread pool with a default queue size of 1024, max pool
115     * size of 100, min pool size of 4, and a keep alive of 60 seconds.
116     *
117     * @param name the pool name
118     * @param threadGroup threadGroup
119     */

120    public BasicThreadPool(String JavaDoc name, ThreadGroup JavaDoc threadGroup)
121    {
122       trace = log.isTraceEnabled();
123       ThreadFactory factory = new ThreadPoolThreadFactory();
124
125       queue = new BoundedLinkedQueue(1024);
126
127       executor = new MinPooledExecutor(queue, 100);
128       executor.setMinimumPoolSize(4);
129       executor.setKeepAliveTime(60 * 1000);
130       executor.setThreadFactory(factory);
131       executor.abortWhenBlocked();
132
133       poolNumber = lastPoolNumber.increment();
134       setName(name);
135       this.threadGroup = threadGroup;
136    }
137
138    // Public --------------------------------------------------------
139

140    // ThreadPool ----------------------------------------------------
141

142    public void stop(boolean immediate)
143    {
144       log.debug("stop, immediate="+immediate);
145       stopped.set(true);
146       if (immediate)
147          executor.shutdownNow();
148       else
149          executor.shutdownAfterProcessingCurrentlyQueuedTasks();
150    }
151
152    public void waitForTasks() throws InterruptedException JavaDoc
153    {
154       executor.awaitTerminationAfterShutdown();
155    }
156    public void waitForTasks(long maxWaitTime) throws InterruptedException JavaDoc
157    {
158       executor.awaitTerminationAfterShutdown(maxWaitTime);
159    }
160
161    public void runTaskWrapper(TaskWrapper wrapper)
162    {
163       if( trace )
164          log.trace("runTaskWrapper, wrapper="+wrapper);
165       if (stopped.get())
166       {
167          wrapper.rejectTask(new ThreadPoolStoppedException("Thread pool has been stopped"));
168          return;
169       }
170
171       wrapper.acceptTask();
172
173       long completionTimeout = wrapper.getTaskCompletionTimeout();
174       TimeoutInfo info = null;
175       if( completionTimeout > 0 )
176       {
177          checkTimeoutMonitor();
178          // Install the task in the
179
info = new TimeoutInfo(wrapper, completionTimeout);
180          tasksWithTimeouts.insert(info);
181       }
182       int waitType = wrapper.getTaskWaitType();
183       switch (waitType)
184       {
185          case Task.WAIT_FOR_COMPLETE:
186          {
187             executeOnThread(wrapper);
188             break;
189          }
190          default:
191          {
192             execute(wrapper);
193          }
194       }
195       waitForTask(wrapper);
196    }
197
198    public void runTask(Task task)
199    {
200       BasicTaskWrapper wrapper = new BasicTaskWrapper(task);
201       runTaskWrapper(wrapper);
202    }
203
204    public void run(Runnable JavaDoc runnable)
205    {
206       run(runnable, 0, 0);
207    }
208
209    public void run(Runnable JavaDoc runnable, long startTimeout, long completeTimeout)
210    {
211       RunnableTaskWrapper wrapper = new RunnableTaskWrapper(runnable, startTimeout, completeTimeout);
212       runTaskWrapper(wrapper);
213    }
214
215    public ThreadGroup JavaDoc getThreadGroup()
216    {
217       return threadGroup;
218    }
219
220    // ThreadPoolMBean implementation --------------------------------
221

222    public String JavaDoc getName()
223    {
224       return name;
225    }
226
227    public void setName(String JavaDoc name)
228    {
229       this.name = name;
230    }
231
232    public int getPoolNumber()
233    {
234       return poolNumber;
235    }
236
237    public String JavaDoc getThreadGroupName()
238    {
239       return threadGroup.getName();
240    }
241
242    public void setThreadGroupName(String JavaDoc threadGroupName)
243    {
244       ThreadGroup JavaDoc group;
245       synchronized(threadGroups)
246       {
247          group = (ThreadGroup JavaDoc) threadGroups.get(threadGroupName);
248          if (group == null)
249          {
250             group = new ThreadGroup JavaDoc(JBOSS_THREAD_GROUP, threadGroupName);
251             threadGroups.put(threadGroupName, group);
252          }
253       }
254       threadGroup = group;
255    }
256
257    public int getQueueSize()
258    {
259       return queue.size();
260    }
261
262    public int getMaximumQueueSize()
263    {
264       return queue.capacity();
265    }
266
267    public void setMaximumQueueSize(int size)
268    {
269       queue.setCapacity(size);
270    }
271
272    public int getPoolSize()
273    {
274       return executor.getPoolSize();
275    }
276
277    public int getMinimumPoolSize()
278    {
279       return executor.getMinimumPoolSize();
280    }
281
282    public void setMinimumPoolSize(int size)
283    {
284       synchronized (executor)
285       {
286          executor.setKeepAliveSize(size);
287          // Don't let the min size > max size
288
if (executor.getMaximumPoolSize() < size)
289          {
290             executor.setMinimumPoolSize(size);
291             executor.setMaximumPoolSize(size);
292          }
293       }
294    }
295
296    public int getMaximumPoolSize()
297    {
298       return executor.getMaximumPoolSize();
299    }
300    
301    public void setMaximumPoolSize(int size)
302    {
303       synchronized (executor)
304       {
305          executor.setMinimumPoolSize(size);
306          executor.setMaximumPoolSize(size);
307          // Don't let the min size > max size
308
if (executor.getKeepAliveSize() > size)
309             executor.setKeepAliveSize(size);
310       }
311    }
312
313    public long getKeepAliveTime()
314    {
315       return executor.getKeepAliveTime();
316    }
317
318    public void setKeepAliveTime(long time)
319    {
320       executor.setKeepAliveTime(time);
321    }
322
323    public BlockingMode getBlockingMode()
324    {
325       return blockingMode;
326    }
327
328    public void setBlockingMode(BlockingMode mode)
329    {
330       blockingMode = mode;
331       
332       if( blockingMode == BlockingMode.RUN )
333       {
334          executor.runWhenBlocked();
335       }
336       else if( blockingMode == BlockingMode.WAIT )
337       {
338          executor.waitWhenBlocked();
339       }
340       else if( blockingMode == BlockingMode.DISCARD )
341       {
342          executor.discardWhenBlocked();
343       }
344       else if( blockingMode == BlockingMode.DISCARD_OLDEST )
345       {
346          executor.discardOldestWhenBlocked();
347       }
348       else if( blockingMode == BlockingMode.ABORT )
349       {
350          executor.abortWhenBlocked();
351       }
352       else
353       {
354          throw new IllegalArgumentException JavaDoc("Failed to recognize mode: "+mode);
355       }
356    }
357
358    /**
359     * For backward compatibility with the previous string based mode
360     * @param name - the string form of the mode enum
361     */

362    public void setBlockingMode(String JavaDoc name)
363    {
364       blockingMode = BlockingMode.toBlockingMode(name);
365       if( blockingMode == null )
366          blockingMode = BlockingMode.ABORT;
367    }
368
369    /**
370     * For backward compatibility with the previous string based mode
371     * This is needed for microcontainer as it gets confused with overloaded
372     * setters.
373     * @param name - the string form of the mode enum
374     */

375    public void setBlockingModeString(String JavaDoc name)
376    {
377       blockingMode = BlockingMode.toBlockingMode(name);
378       if( blockingMode == null )
379          blockingMode = BlockingMode.ABORT;
380    }
381
382    public ThreadPool getInstance()
383    {
384       return this;
385    }
386
387    public void stop()
388    {
389       stop(false);
390    }
391
392    // Object overrides ----------------------------------------------
393

394    public String JavaDoc toString()
395    {
396       return name + '(' + poolNumber + ')';
397    }
398
399    // Package protected ---------------------------------------------
400

401    // Protected -----------------------------------------------------
402

403    /**
404     * Execute a task on the same thread
405     *
406     * @param wrapper the task wrapper
407     */

408    protected void executeOnThread(TaskWrapper wrapper)
409    {
410       if( trace )
411          log.trace("executeOnThread, wrapper="+wrapper);
412       wrapper.run();
413    }
414
415    /**
416     * Execute a task
417     *
418     * @param wrapper the task wrapper
419     */

420    protected void execute(TaskWrapper wrapper)
421    {
422       if( trace )
423          log.trace("execute, wrapper="+wrapper);
424       try
425       {
426          executor.execute(wrapper);
427       }
428       catch (Throwable JavaDoc t)
429       {
430          wrapper.rejectTask(new ThreadPoolFullException(t.toString()));
431       }
432    }
433
434    /**
435     * Wait for a task
436     *
437     * @param wrapper the task wrapper
438     */

439    protected void waitForTask(TaskWrapper wrapper)
440    {
441       wrapper.waitForTask();
442    }
443
444    /**
445     * Used to lazily create the task completion timeout thread and monitor
446     */

447    protected synchronized void checkTimeoutMonitor()
448    {
449       if( timeoutTask == null )
450          timeoutTask = new TimeoutMonitor(name, log);
451    }
452    protected TimeoutInfo getNextTimeout()
453    {
454       TimeoutInfo info = (TimeoutInfo) this.tasksWithTimeouts.extract();
455       return info;
456    }
457
458    // Private -------------------------------------------------------
459

460    // Inner classes -------------------------------------------------
461

462    /**
463     * A factory for threads
464     */

465    private class ThreadPoolThreadFactory implements ThreadFactory
466    {
467       public Thread JavaDoc newThread(Runnable JavaDoc runnable)
468       {
469          String JavaDoc threadName = BasicThreadPool.this.toString() + "-" + lastThreadNumber.increment();
470          Thread JavaDoc thread = new Thread JavaDoc(threadGroup, runnable, threadName);
471          thread.setDaemon(true);
472          return thread;
473       }
474    }
475
476    /** An encapsulation of a task and its completion timeout
477     */

478    private static class TimeoutInfo implements Comparable JavaDoc
479    {
480       long start;
481       long timeoutMS;
482       TaskWrapper wrapper;
483       boolean firstStop;
484       TimeoutInfo(TaskWrapper wrapper, long timeout)
485       {
486          this.start = System.currentTimeMillis();
487          this.timeoutMS = start + timeout;
488          this.wrapper = wrapper;
489       }
490       public void setTimeout(long timeout)
491       {
492          this.start = System.currentTimeMillis();
493          this.timeoutMS = start + timeout;
494       }
495       /** Order TimeoutInfo based on the timestamp at which the task needs to
496        * be completed by.
497        * @param o a TimeoutInfo
498        * @return the diff between this timeoutMS and the argument timeoutMS
499        */

500       public int compareTo(Object JavaDoc o)
501       {
502          TimeoutInfo ti = (TimeoutInfo) o;
503          long to0 = timeoutMS;
504          long to1 = ti.timeoutMS;
505          int diff = (int) (to0 - to1);
506          return diff;
507       }
508       TaskWrapper getTaskWrapper()
509       {
510          return wrapper;
511       }
512       public long getTaskCompletionTimeout()
513       {
514          return wrapper.getTaskCompletionTimeout();
515       }
516       /** Get the time remaining to the complete timeout timestamp in MS.
517        * @param now - the current System.currentTimeMillis value
518        * @return the time remaining to the complete timeout timestamp in MS.
519        */

520       public long getTaskCompletionTimeout(long now)
521       {
522          return timeoutMS - now;
523       }
524       /** Invoke stopTask on the wrapper and indicate whether this was the first
525        * time the task has been notified to stop.
526        * @return true if this is the first stopTask, false on the second.
527        */

528       public boolean stopTask()
529       {
530          wrapper.stopTask();
531          boolean wasFirstStop = firstStop == false;
532          firstStop = true;
533          return wasFirstStop;
534       }
535    }
536    /**
537     * The monitor runnable which validates that threads are completing within
538     * the task completion timeout limits.
539     */

540    private class TimeoutMonitor implements Runnable JavaDoc
541    {
542       final Logger log;
543       TimeoutMonitor(String JavaDoc name, Logger log)
544       {
545          this.log = log;
546          Thread JavaDoc t = new Thread JavaDoc(this, name+" TimeoutMonitor");
547          t.setDaemon(true);
548          t.start();
549       }
550       /** The monitor thread loops until the pool is shutdown. It waits for
551        * tasks with completion timeouts and sleeps until the next completion
552        * timeout and then interrupts the associated task thread, and invokes
553        * stopTask on the TaskWrapper. A new timeout check is then inserted with
554        * a 1 second timeout to validate that the TaskWrapper has exited the
555        * run method. If it has not, then the associated task thread is stopped
556        * using the deprecated Thread.stop method since this is the only way to
557        * abort a thread that is in spin loop for example.
558        *
559        * @todo this is not responsive to new tasks with timeouts smaller than
560        * the current shortest completion expiration. We probably should interrupt
561        * the thread on each insertion into the timeout heap to ensure better
562        * responsiveness.
563        */

564       public void run()
565       {
566          boolean isStopped = stopped.get();
567          while( isStopped == false )
568          {
569             boolean trace = log.isTraceEnabled();
570             try
571             {
572                TimeoutInfo info = getNextTimeout();
573                if( info != null )
574                {
575                   long now = System.currentTimeMillis();
576                   long timeToTimeout = info.getTaskCompletionTimeout(now);
577                   if( timeToTimeout > 0 )
578                   {
579                      if( trace )
580                      {
581                         log.trace("Will check wrapper="+info.getTaskWrapper()
582                            +" after "+timeToTimeout);
583                      }
584                      Thread.sleep(timeToTimeout);
585                   }
586                   // Check the status of the task
587
TaskWrapper wrapper = info.getTaskWrapper();
588                   if( wrapper.isComplete() == false )
589                   {
590                      if( trace )
591                         log.trace("Failed completion check for wrapper="+wrapper);
592                      if( info.stopTask() == true )
593                      {
594                         // Requeue the TimeoutInfo to see that the task exits run
595
info.setTimeout(1000);
596                         tasksWithTimeouts.insert(info);
597                         if( trace )
598                            log.trace("Rescheduled completion check for wrapper="+wrapper);
599                      }
600                   }
601                }
602                else
603                {
604                   Thread.sleep(1000);
605                }
606             }
607             catch(InterruptedException JavaDoc e)
608             {
609                log.debug("Timeout monitor has been interrupted", e);
610             }
611             catch(Throwable JavaDoc e)
612             {
613                log.debug("Timeout monitor saw unexpected error", e);
614             }
615             isStopped = stopped.get();
616          }
617       }
618    }
619 }
620
Popular Tags