KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > caucho > util > ThreadPool


1 /*
2  * Copyright (c) 1998-2006 Caucho Technology -- all rights reserved
3  *
4  * This file is part of Resin(R) Open Source
5  *
6  * Each copy or derived work must preserve the copyright notice and this
7  * notice unmodified.
8  *
9  * Resin Open Source is free software; you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation; either version 2 of the License, or
12  * (at your option) any later version.
13  *
14  * Resin Open Source is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE, or any warranty
17  * of NON-INFRINGEMENT. See the GNU General Public License for more
18  * details.
19  *
20  * You should have received a copy of the GNU General Public License
21  * along with Resin Open Source; if not, write to the
22  *
23  * Free Software Foundation, Inc.
24  * 59 Temple Place, Suite 330
25  * Boston, MA 02111-1307 USA
26  *
27  * @author Scott Ferguson
28  */

29
30 package com.caucho.util;
31
32 import com.caucho.config.ConfigException;
33
34 import java.util.ArrayList JavaDoc;
35 import java.util.logging.Level JavaDoc;
36 import java.util.logging.Logger JavaDoc;
37
38 /**
39  * A generic pool of threads available for Alarms and Work tasks.
40  */

41 public class ThreadPool {
42   private static final L10N L = new L10N(ThreadPool.class);
43   private static final Logger JavaDoc log
44     = Logger.getLogger(ThreadPool.class.getName());
45   
46   private static final long MAX_EXPIRE = Long.MAX_VALUE / 2;
47
48   private static final ThreadPool _globalThreadPool = new ThreadPool();
49
50   private static int _g_id;
51     
52   private int _threadMax = 8192;
53   
54   private int _threadIdleMin = 5;
55   private int _threadIdleMax = 10;
56
57   private long _resetCount;
58
59   private final ArrayList JavaDoc<Item> _threads
60     = new ArrayList JavaDoc<Item>();
61
62   private final ArrayList JavaDoc<Runnable JavaDoc> _taskQueue
63     = new ArrayList JavaDoc<Runnable JavaDoc>();
64
65   private final ArrayList JavaDoc<ClassLoader JavaDoc> _loaderQueue
66     = new ArrayList JavaDoc<ClassLoader JavaDoc>();
67
68   private final ThreadLauncher _launcher = new ThreadLauncher();
69   private final ScheduleThread _scheduler = new ScheduleThread();
70   
71   private boolean _isQueuePriority;
72
73   private final Object JavaDoc _idleLock = new Object JavaDoc();
74   
75   private Item _idleHead;
76
77   private static int _threadCount;
78   // number of threads in the idle stack
79
private static int _idleCount;
80   // number of threads which are in the process of starting
81
private static int _startCount;
82
83   private static int _scheduleWaitCount;
84
85   private ThreadPool()
86   {
87   }
88
89   public static ThreadPool getThreadPool()
90   {
91     return _globalThreadPool;
92   }
93
94   //
95
// Configuration properties
96
//
97

98   /**
99    * Sets the maximum number of threads.
100    */

101   public void setThreadMax(int max)
102   {
103     if (max < _threadIdleMax)
104       throw new ConfigException(L.l("lt;thread-max> ({0}) must be less than &lt;thread-idle-max> ({1})", max, _threadIdleMax));
105     
106     _threadMax = max;
107
108   }
109
110   /**
111    * Gets the maximum number of threads.
112    */

113   public int getThreadMax()
114   {
115     return _threadMax;
116   }
117
118   /**
119    * Sets the minimum number of idle threads.
120    */

121   public void setThreadIdleMin(int min)
122   {
123     if (_threadIdleMax < min)
124       throw new ConfigException(L.l("lt;thread-idle-min> ({0}) must be less than &lt;thread-idle-max> ({1})", min, _threadIdleMax));
125     
126     _threadIdleMin = min;
127   }
128
129   /**
130    * Gets the minimum number of idle threads.
131    */

132   public int getThreadIdleMin()
133   {
134     return _threadIdleMin;
135   }
136
137   /**
138    * Sets the maximum number of idle threads.
139    */

140   public void setThreadIdleMax(int max)
141   {
142     if (max < _threadIdleMin)
143       throw new ConfigException(L.l("lt;thread-idle-max> ({0}) must be greater than &lt;thread-idle-min> ({1})",
144                     max, _threadIdleMin));
145     
146     if (_threadMax < max)
147       throw new ConfigException(L.l("lt;thread-idle-max> ({0}) must be less than &lt;thread-max> ({1})",
148                     max, _threadMax));
149     
150     _threadIdleMax = max;
151   }
152
153   /**
154    * Gets the maximum number of idle threads.
155    */

156   public int getThreadIdleMax()
157   {
158     return _threadIdleMax;
159   }
160
161   /**
162    * Returns the total thread count.
163    */

164   public int getThreadCount()
165   {
166     return _threadCount;
167   }
168
169   /**
170    * Returns the idle thread count.
171    */

172   public int getThreadIdleCount()
173   {
174     return _idleCount;
175   }
176
177   /**
178    * Returns the active thread count.
179    */

180   public int getThreadActiveCount()
181   {
182     return getThreadCount() - getThreadIdleCount();
183   }
184
185   /**
186    * Returns the free thread count.
187    */

188   public int getFreeThreadCount()
189   {
190     return _threadMax - _threadCount;
191   }
192
193   //
194
// Resin methods
195
//
196

197   /**
198    * Resets the thread pool, letting old threads drain.
199    */

200   public void reset()
201   {
202     // XXX: not reliable
203
_resetCount++;
204   }
205
206   /**
207    * Resets the thread pool, letting old threads drain.
208    */

209   public void closeEnvironment(ClassLoader JavaDoc env)
210   {
211     // XXX: incorrect
212
reset();
213   }
214
215   /**
216    * Schedules a new task.
217    */

218   public boolean schedule(Runnable JavaDoc task)
219   {
220     ClassLoader JavaDoc loader = Thread.currentThread().getContextClassLoader();
221     
222     return schedule(task, loader, getDefaultPriority(), MAX_EXPIRE, true);
223   }
224
225   /**
226    * Adds a new task.
227    */

228   public boolean schedule(Runnable JavaDoc task, long timeout)
229   {
230     long expire;
231     
232     if (timeout < 0 || timeout > MAX_EXPIRE)
233       expire = MAX_EXPIRE;
234     else
235       expire = Alarm.getCurrentTime() + timeout;
236     
237     ClassLoader JavaDoc loader = Thread.currentThread().getContextClassLoader();
238     
239     return schedule(task, loader, getDefaultPriority(), expire, true);
240   }
241
242   /**
243    * Adds a new task.
244    */

245   public void schedulePriority(Runnable JavaDoc task)
246   {
247     ClassLoader JavaDoc loader = Thread.currentThread().getContextClassLoader();
248     
249     schedule(task, loader, 0, MAX_EXPIRE, true);
250   }
251
252   /**
253    * Adds a new task.
254    */

255   public boolean start(Runnable JavaDoc task)
256   {
257     ClassLoader JavaDoc loader = Thread.currentThread().getContextClassLoader();
258     
259     return schedule(task, loader, getDefaultPriority(), MAX_EXPIRE, false);
260   }
261
262   /**
263    * Adds a new task.
264    */

265   public boolean start(Runnable JavaDoc task, long timeout)
266   {
267     long expire;
268     
269     if (timeout < 0 || timeout > MAX_EXPIRE)
270       expire = MAX_EXPIRE;
271     else
272       expire = Alarm.getCurrentTime() + timeout;
273     
274     ClassLoader JavaDoc loader = Thread.currentThread().getContextClassLoader();
275     
276     return schedule(task, loader, getDefaultPriority(), expire, false);
277   }
278
279   /**
280    * Adds a new task.
281    */

282   public void startPriority(Runnable JavaDoc task)
283   {
284     ClassLoader JavaDoc loader = Thread.currentThread().getContextClassLoader();
285     
286     schedule(task, loader, 0, MAX_EXPIRE, false);
287   }
288
289   /**
290    * Adds a new task.
291    */

292   public boolean startPriority(Runnable JavaDoc task, long timeout)
293   {
294     long expire;
295     
296     if (timeout < 0 || timeout > MAX_EXPIRE)
297       expire = MAX_EXPIRE;
298     else
299       expire = Alarm.getCurrentTime() + timeout;
300     
301     ClassLoader JavaDoc loader = Thread.currentThread().getContextClassLoader();
302     
303     return schedule(task, loader, 0, expire, true);
304   }
305
306   private int getDefaultPriority()
307   {
308     if (_threadIdleMin >= 10)
309       return 5;
310     else if (_threadIdleMin >= 5)
311       return 2;
312     else if (_threadIdleMin >= 1)
313       return 1;
314     else
315       return 0;
316   }
317
318   /**
319    * interrupts all the threads.
320    */

321   public void interrupt()
322   {
323     synchronized (_idleLock) {
324       for (Item item = _idleHead;
325        item != null;
326        item = item._next) {
327     Thread JavaDoc thread = item.getThread();
328
329     if (thread != null) {
330       try {
331         thread.interrupt();
332       } catch (Throwable JavaDoc e) {
333       }
334     }
335       }
336     }
337   }
338
339   /**
340    * Adds a new task.
341    */

342   private boolean schedule(Runnable JavaDoc task,
343                ClassLoader JavaDoc loader,
344                int freeThreads,
345                long expireTime,
346                boolean queueIfFull)
347   {
348     Item poolItem = null;
349
350     while (poolItem == null) {
351       try {
352     synchronized (_idleLock) {
353       int idleCount = _idleCount;
354       int freeCount = idleCount + _threadMax - _threadCount;
355       boolean startNew = false;
356
357       if (idleCount > 0 && freeThreads < freeCount) {
358         poolItem = _idleHead;
359         _idleHead = poolItem._next;
360
361         poolItem._next = null;
362         poolItem._prev = null;
363         poolItem._isIdle = false;
364         if (_idleHead != null)
365           _idleHead._prev = null;
366
367         _idleCount--;
368
369         if (idleCount < _threadIdleMin)
370           startNew = true;
371       }
372       else
373         startNew = true;
374
375       if (startNew) {
376         synchronized (_launcher) {
377           _launcher.notifyAll();
378         }
379         
380         if (poolItem == null) {
381           if (queueIfFull) {
382         synchronized (_taskQueue) {
383           _taskQueue.add(task);
384           _loaderQueue.add(loader);
385           _taskQueue.notifyAll();
386         }
387         
388         return false;
389           }
390
391           if (expireTime < Alarm.getCurrentTime())
392         return false;
393           
394           _scheduleWaitCount++;
395         
396           try {
397         // clear interrupted flag
398
Thread.interrupted();
399         
400         _idleLock.wait(5000);
401           } finally {
402         _scheduleWaitCount--;
403           }
404         }
405       }
406     }
407       } catch (OutOfMemoryError JavaDoc e) {
408     try {
409       System.err.println("Exiting due to OutOfMemoryError");
410     } finally {
411       System.exit(11);
412     }
413       } catch (Throwable JavaDoc e) {
414     e.printStackTrace();
415       }
416     }
417
418     poolItem.start(task, loader);
419
420     return true;
421   }
422
423   class Item implements Runnable JavaDoc {
424     private final int _id;
425     private final String JavaDoc _name;
426
427     private Thread JavaDoc _thread;
428     private Thread JavaDoc _queueThread;
429
430     private Item _prev;
431     private Item _next;
432     private boolean _isIdle;
433
434     private long _threadResetCount;
435   
436     private Runnable JavaDoc _task;
437     private ClassLoader JavaDoc _classLoader;
438
439     private Item()
440     {
441       synchronized (Item.class) {
442     _id = _g_id++;
443     _name = "resin-" + _id;
444       }
445     }
446
447     /**
448      * Returns the id.
449      */

450     int getId()
451     {
452       return _id;
453     }
454
455     /**
456      * Returns the name.
457      */

458     public String JavaDoc getName()
459     {
460       return _name;
461     }
462
463     /**
464      * Returns the thread id.
465      */

466     public long getThreadId()
467     {
468       return _thread.getId();
469     }
470
471     /**
472      * Returns the thread.
473      */

474     Thread JavaDoc getThread()
475     {
476       return _thread;
477     }
478
479     /**
480      * Starts the thread.
481      */

482     private boolean start(Runnable JavaDoc task, ClassLoader JavaDoc loader)
483     {
484       synchronized (this) {
485     _task = task;
486     _classLoader = loader;
487
488     notifyAll();
489       }
490
491       return true;
492     }
493
494     /**
495      * The main thread execution method.
496      */

497     public void run()
498     {
499       _thread = Thread.currentThread();
500
501       synchronized (_idleLock) {
502     _threadCount++;
503     _startCount--;
504     _threads.add(this);
505
506     if (_startCount < 0) {
507       System.out.println("ThreadPool start count is negative: " + _startCount);
508       _startCount = 0;
509     }
510       }
511       
512       try {
513     runTasks();
514       } finally {
515     synchronized (_idleLock) {
516       _threadCount--;
517
518       _threads.remove(this);
519     }
520
521     if (_threadCount < _threadIdleMin) {
522       synchronized (_launcher) {
523         _launcher.notifyAll();
524       }
525     }
526       }
527     }
528
529     private void runTasks()
530     {
531       _threadResetCount = _resetCount;
532     
533       Thread JavaDoc thread = Thread.currentThread();
534       ClassLoader JavaDoc systemClassLoader = ClassLoader.getSystemClassLoader();
535       boolean isIdle = false;
536
537       while (true) {
538     try {
539       // put the thread into the idle ring
540
if (! isIdle) {
541         _isQueuePriority = true;
542       
543         isIdle = true;
544       
545         synchronized (_idleLock) {
546           if (_threadIdleMax < _idleCount) {
547         return;
548           }
549           
550           _next = _idleHead;
551           _prev = null;
552           _isIdle = true;
553
554           if (_idleHead != null)
555         _idleHead._prev = this;
556
557           _idleHead = this;
558           _idleCount++;
559
560           if (_scheduleWaitCount > 0)
561         _idleLock.notifyAll();
562         }
563       }
564
565       Runnable JavaDoc task = null;
566       ClassLoader JavaDoc classLoader = null;
567
568       // clear interrupted flag
569
Thread.interrupted();
570     
571       // wait for the next available task
572
synchronized (this) {
573         if (_task == null) {
574           thread.setContextClassLoader(systemClassLoader);
575           wait(60000L);
576         }
577
578         task = _task;
579         _task = null;
580
581         classLoader = _classLoader;
582         _classLoader = null;
583       }
584
585       // if the task is available, run it in the proper context
586
if (task != null) {
587         isIdle = false;
588
589         thread.setContextClassLoader(classLoader);
590         try {
591           task.run();
592         } catch (Throwable JavaDoc e) {
593           log.log(Level.WARNING, e.toString(), e);
594         } finally {
595           thread.setContextClassLoader(ClassLoader.getSystemClassLoader());
596         }
597       }
598       else {
599         boolean isDead = false;
600         boolean isReset = false;
601
602         // check to see if we're over the idle thread limit
603
synchronized (_idleLock) {
604           if (_isIdle &&
605           (_threadIdleMax < _idleCount ||
606            _resetCount != _threadResetCount)) {
607         isDead = true;
608
609         isReset = _resetCount != _threadResetCount;
610           
611         Item next = _next;
612         Item prev = _prev;
613
614         _next = null;
615         _prev = null;
616         _isIdle = false;
617
618         if (next != null)
619           next._prev = prev;
620
621         if (prev != null)
622           prev._next = next;
623         else
624           _idleHead = next;
625
626         _idleCount--;
627           }
628         }
629
630         if (isReset) {
631           synchronized (_launcher) {
632         _launcher.notifyAll();
633           }
634         }
635       
636         if (isDead)
637           return;
638       }
639     } catch (Throwable JavaDoc e) {
640     }
641       }
642     }
643   }
644
645   class ThreadLauncher implements Runnable JavaDoc {
646     private ThreadLauncher()
647     {
648       Thread JavaDoc thread = new Thread JavaDoc(this);
649       thread.setName("resin-thread-launcher");
650       thread.setDaemon(true);
651
652       thread.start();
653     }
654
655     /**
656      * Starts a new connection
657      */

658     private boolean startConnection(long waitTime)
659       throws InterruptedException JavaDoc
660     {
661       boolean doStart = true;
662       
663       synchronized (_idleLock) {
664     int idleCount = _idleCount;
665
666     if (_threadMax < _threadCount + _startCount)
667       doStart = false;
668     else if (_threadIdleMin < idleCount + _startCount)
669       doStart = false;
670
671     if (doStart)
672       _startCount++;
673       }
674
675       if (doStart) {
676     try {
677       Item poolItem = new Item();
678     
679       Thread JavaDoc thread = new Thread JavaDoc(poolItem, poolItem.getName());
680       thread.setDaemon(true);
681
682       thread.start();
683     } catch (Throwable JavaDoc e) {
684       _startCount--;
685
686       e.printStackTrace();
687       if (_startCount < 0) {
688         Thread.dumpStack();
689         _startCount = 0;
690       }
691     }
692
693     // Thread.yield();
694
}
695       else {
696     Thread.interrupted();
697     synchronized (this) {
698       wait(waitTime);
699       return false;
700     }
701       }
702
703       return true;
704     }
705     
706     public void run()
707     {
708       ClassLoader JavaDoc systemLoader = ClassLoader.getSystemClassLoader();
709       
710       Thread.currentThread().setContextClassLoader(systemLoader);
711
712       try {
713     for (int i = 0; i < _threadIdleMin; i++)
714       startConnection(0);
715       } catch (Throwable JavaDoc e) {
716     e.printStackTrace();
717       }
718       
719       while (true) {
720     try {
721       startConnection(10000);
722
723       //Thread.currentThread().sleep(5);
724
Thread.currentThread().yield();
725     } catch (OutOfMemoryError JavaDoc e) {
726       System.exit(10);
727     } catch (Throwable JavaDoc e) {
728       e.printStackTrace();
729     }
730       }
731     }
732   }
733
734   class ScheduleThread implements Runnable JavaDoc {
735     private ScheduleThread()
736     {
737       Thread JavaDoc thread = new Thread JavaDoc(this);
738       thread.setName("resin-thread-scheduler");
739       thread.setDaemon(true);
740
741       thread.start();
742     }
743     
744     public void run()
745     {
746       ClassLoader JavaDoc systemLoader = ClassLoader.getSystemClassLoader();
747
748       Thread JavaDoc thread = Thread.currentThread();
749       thread.setContextClassLoader(systemLoader);
750       
751       while (true) {
752     try {
753       Runnable JavaDoc task = null;
754       ClassLoader JavaDoc loader = null;
755
756       Thread.interrupted();
757       
758       synchronized (_taskQueue) {
759         if (_taskQueue.size() > 0) {
760           task = _taskQueue.remove(0);
761           loader = _loaderQueue.remove(0);
762         }
763         else {
764           try {
765         _taskQueue.wait(60000);
766           } catch (Throwable JavaDoc e) {
767         thread.interrupted();
768         log.finer(e.toString());
769           }
770         }
771       }
772
773       if (task != null) {
774         schedule(task, loader, _threadIdleMin, MAX_EXPIRE, false);
775       }
776     } catch (OutOfMemoryError JavaDoc e) {
777       System.exit(10);
778     } catch (Throwable JavaDoc e) {
779       e.printStackTrace();
780     }
781       }
782     }
783   }
784 }
785
Popular Tags