KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > objectweb > dream > control > activity > task > thread > ThreadPoolTask


1 /**
2  * Dream Copyright (C) 2003-2004 INRIA Rhone-Alpes
3  *
4  * This library is free software; you can redistribute it and/or modify it under
5  * the terms of the GNU Lesser General Public License as published by the Free
6  * Software Foundation; either version 2 of the License, or (at your option) any
7  * later version.
8  *
9  * This library is distributed in the hope that it will be useful, but WITHOUT
10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11  * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
12  * details.
13  *
14  * You should have received a copy of the GNU Lesser General Public License
15  * along with this library; if not, write to the Free Software Foundation, Inc.,
16  * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17  *
18  * Contact: dream@objectweb.org
19  *
20  * Initial developer(s): Matthieu Leclercq
21  * Contributor(s):
22  */

23
24 package org.objectweb.dream.control.activity.task.thread;
25
26 import java.util.ArrayList JavaDoc;
27 import java.util.HashSet JavaDoc;
28 import java.util.Iterator JavaDoc;
29 import java.util.LinkedList JavaDoc;
30 import java.util.List JavaDoc;
31 import java.util.Set JavaDoc;
32
33 import org.objectweb.dream.control.activity.task.Task;
34 import org.objectweb.dream.control.activity.task.TaskLifeCycleController;
35 import org.objectweb.dream.control.activity.task.TaskStoppedListener;
36 import org.objectweb.fractal.api.NoSuchInterfaceException;
37 import org.objectweb.fractal.api.control.IllegalLifeCycleException;
38 import org.objectweb.fractal.julia.control.lifecycle.ChainedIllegalLifeCycleException;
39 import org.objectweb.util.monolog.api.BasicLevel;
40
41 /**
42  * Thread Pool implementation. In a thread pool, many threads call concurrently
43  * the scheduler. The number of threads can be controlled by the
44  * {@link ThreadPoolController} control interface.
45  */

46 public class ThreadPoolTask extends AbstractThreadTask
47     implements
48       TaskLifeCycleController,
49       ThreadPoolController
50 {
51
52   Object JavaDoc threadTaskLock = new Object JavaDoc();
53   long waitTimeout = DEFAULT_WAIT_TIMEOUT;
54   int capacity = 0;
55   LinkedList JavaDoc activeThreads = new LinkedList JavaDoc();
56   Set JavaDoc waitingThreads = new HashSet JavaDoc();
57
58   List JavaDoc taskStoppedListeners = new ArrayList JavaDoc();
59   boolean stopping;
60
61   /**
62    * @see AbstractThreadTask#isExecuting()
63    */

64   protected boolean isExecuting()
65   {
66     return ((PoolThread) Thread.currentThread()).executing;
67   }
68
69   /**
70    * @see AbstractThreadTask#setExecuting(boolean)
71    */

72   protected void setExecuting(boolean b)
73   {
74     ((PoolThread) Thread.currentThread()).executing = b;
75   }
76
77   // ---------------------------------------------------------------------------
78
// Implementation of the ThreadPoolController interface
79
// ---------------------------------------------------------------------------
80

81   /**
82    * @see ThreadPoolController#getNbActiveThreads()
83    */

84   public int getNbActiveThreads()
85   {
86     synchronized (threadTaskLock)
87     {
88       return activeThreads.size();
89     }
90   }
91
92   /**
93    * @see ThreadPoolController#addThreads(int)
94    */

95   public void addThreads(int i) throws ThreadPoolOverflowException,
96       IllegalLifeCycleException
97   {
98     synchronized (threadTaskLock)
99     {
100       if (getFcState() == STOPPED)
101       {
102         throw new IllegalLifeCycleException(
103             "Can't add thread in the stopped state.");
104       }
105       if (getNbActiveThreads() + i > getCapacity())
106       {
107         throw new ThreadPoolOverflowException(
108             "Can't add Thread in pool due to capacity limitation");
109       }
110       for (int j = 0; j < i; j++)
111       {
112         PoolThread t = null;
113         synchronized (waitingThreads)
114         {
115           if (!waitingThreads.isEmpty())
116           {
117             // notify a waiting thread
118
Iterator JavaDoc iterator = waitingThreads.iterator();
119             t = (PoolThread) iterator.next();
120             iterator.remove();
121             synchronized (activeThreads)
122             {
123               activeThreads.add(t);
124             }
125             synchronized (t)
126             {
127               logger.log(BasicLevel.DEBUG, "Notify a waiting thread task");
128               t.revive = true;
129               t.notify();
130             }
131             continue;
132           }
133         }
134         logger.log(BasicLevel.DEBUG, "Creates a new thread task");
135         t = new PoolThread();
136         synchronized (activeThreads)
137         {
138           activeThreads.add(t);
139         }
140         t.start();
141       }
142     }
143   }
144
145   /**
146    * @see ThreadPoolController#removeThreads(int)
147    */

148   public void removeThreads(int i) throws IllegalLifeCycleException
149   {
150     synchronized (threadTaskLock)
151     {
152       if (getFcState() == STOPPED)
153       {
154         throw new IllegalLifeCycleException(
155             "Can't remove thread in the stopped state.");
156       }
157       for (int j = 0; j < i; j++)
158       {
159         PoolThread t;
160         synchronized (activeThreads)
161         {
162           t = (PoolThread) activeThreads.removeFirst();
163         }
164         logger.log(BasicLevel.DEBUG, "Stop a thread task");
165         t.executing = false;
166         t.interrupt();
167       }
168     }
169   }
170
171   /**
172    * @see ThreadPoolController#setCapacity(int)
173    */

174   public void setCapacity(int i)
175   {
176     capacity = i;
177   }
178
179   /**
180    * @see ThreadPoolController#getCapacity()
181    */

182   public int getCapacity()
183   {
184     return capacity;
185   }
186
187   /**
188    * @see ThreadPoolController#getWaitTimeout()
189    */

190   public long getWaitTimeout()
191   {
192     return waitTimeout;
193   }
194
195   /**
196    * @see ThreadPoolController#setWaitTimeout(long)
197    */

198   public void setWaitTimeout(long millis)
199   {
200     waitTimeout = millis;
201   }
202
203   // ---------------------------------------------------------------------------
204
// Implementation of the LifeCycleController interface
205
// ---------------------------------------------------------------------------
206

207   /**
208    * @see org.objectweb.fractal.api.control.LifeCycleController#startFc()
209    */

210   public void startFc() throws IllegalLifeCycleException
211   {
212     if (getFcState() == STARTED)
213     {
214       return;
215     }
216     synchronized (threadTaskLock)
217     {
218       super.startFc();
219       stopping = false;
220     }
221   }
222
223   /**
224    * @see org.objectweb.fractal.api.control.LifeCycleController#stopFc()
225    */

226   public void stopFc() throws IllegalLifeCycleException
227   {
228     if (getFcState() == STOPPED)
229     {
230       return;
231     }
232     synchronized (threadTaskLock)
233     {
234       logger.log(BasicLevel.DEBUG, "Stopping thread pool task");
235       stopping = true;
236       interruptPool();
237       boolean noActiveThread;
238       synchronized (activeThreads)
239       {
240         noActiveThread = activeThreads.isEmpty();
241       }
242       while (!noActiveThread)
243       {
244         try
245         {
246           logger.log(BasicLevel.DEBUG, "Join threads of the task");
247           threadTaskLock.wait();
248         }
249         catch (InterruptedException JavaDoc e)
250         {
251           throw new ChainedIllegalLifeCycleException(e, weaveableC,
252               "Interrupted while waiting for the end of the thread pool.");
253         }
254         synchronized (activeThreads)
255         {
256           noActiveThread = activeThreads.isEmpty();
257         }
258       }
259     }
260   }
261
262   /**
263    * @see TaskLifeCycleController#asyncStop(TaskStoppedListener)
264    */

265   public void asyncStop(TaskStoppedListener listener)
266   {
267     if (getFcState() == STOPPED)
268     {
269       // thread is already stopped
270
Task taskItf = null;
271       try
272       {
273         taskItf = (Task) weaveableC.getFcInterface("task");
274       }
275       catch (NoSuchInterfaceException ignored)
276       {
277         // can't happend
278
}
279       listener.taskStopped(taskItf);
280       return;
281     }
282     synchronized (threadTaskLock)
283     {
284       logger.log(BasicLevel.DEBUG, "Stopping asynchronously thread pool task");
285       stopping = true;
286       interruptPool();
287       taskStoppedListeners.add(listener);
288     }
289   }
290
291   // ---------------------------------------------------------------------------
292
// Utility method
293
// ---------------------------------------------------------------------------
294

295   protected void interruptPool()
296   {
297     logger.log(BasicLevel.DEBUG, "Interrupting threads of the task");
298     synchronized (activeThreads)
299     {
300       Iterator JavaDoc iter = activeThreads.iterator();
301       while (iter.hasNext())
302       {
303         PoolThread thread = (PoolThread) iter.next();
304         thread.executing = false;
305         thread.interrupt();
306       }
307     }
308     synchronized (waitingThreads)
309     {
310       Iterator JavaDoc iter = waitingThreads.iterator();
311       while (iter.hasNext())
312       {
313         PoolThread thread = (PoolThread) iter.next();
314         thread.executing = false;
315         thread.interrupt();
316       }
317     }
318   }
319
320   // ---------------------------------------------------------------------------
321
// Inner class
322
// ---------------------------------------------------------------------------
323

324   private class PoolThread extends Thread JavaDoc
325   {
326
327     boolean executing = true;
328     boolean revive;
329
330     // ---------------------------------------------------------------------------
331
// Implementation of the Runnable interface
332
// ---------------------------------------------------------------------------
333

334     /**
335      * @see Runnable#run()
336      */

337     public void run()
338     {
339       logger.log(BasicLevel.DEBUG, "Begin of the run method");
340       boolean runThread = true;
341       do
342       {
343         revive = false;
344         try
345         {
346           execute(null);
347         }
348         catch (InterruptedException JavaDoc e)
349         {
350           logger.log(BasicLevel.DEBUG, "Interrupted");
351         }
352         finally
353         {
354           boolean noActiveThread;
355           synchronized (activeThreads)
356           {
357             // remove myself from activeThreads.
358
activeThreads.remove(this);
359             if (activeThreads.size() >= getCapacity())
360             {
361               logger.log(BasicLevel.DEBUG, "More active threads than capacity");
362               runThread = false;
363             }
364             noActiveThread = activeThreads.isEmpty();
365           }
366           List JavaDoc listeners = null;
367           synchronized (threadTaskLock)
368           {
369             if (stopping && noActiveThread)
370             {
371               logger.log(BasicLevel.DEBUG,
372                   "No more active thread component stopped");
373               try
374               {
375                 // set state to stop.
376
ThreadPoolTask.super.stopFc();
377                 stopping = false;
378               }
379               catch (IllegalLifeCycleException ignored)
380               {
381                 // ignored
382
}
383               // notify threads that may be waiting in stopFc.
384
threadTaskLock.notifyAll();
385               listeners = taskStoppedListeners;
386               taskStoppedListeners = new ArrayList JavaDoc();
387             }
388           }
389           if (listeners != null)
390           {
391             // notify listener.
392
Task taskItf = null;
393             try
394             {
395               taskItf = (Task) weaveableC.getFcInterface("task");
396             }
397             catch (NoSuchInterfaceException ignored)
398             {
399               // can't happend
400
}
401             Iterator JavaDoc iter = listeners.iterator();
402             while (iter.hasNext())
403             {
404               TaskStoppedListener listener = (TaskStoppedListener) iter.next();
405               listener.taskStopped(taskItf);
406             }
407             // end run method.
408
break;
409           }
410           synchronized (waitingThreads)
411           {
412             // add myself in waitingThreads.
413
waitingThreads.add(this);
414           }
415           try
416           {
417             synchronized (this)
418             {
419               if (!revive && runThread)
420               {
421                 try
422                 {
423                   logger.log(BasicLevel.DEBUG, "Waits for revive");
424                   this.wait(waitTimeout);
425                   executing = true;
426                 }
427                 catch (InterruptedException JavaDoc e1)
428                 {
429                   logger.log(BasicLevel.DEBUG,
430                       "Interrupted while waiting for timeout");
431                 }
432               }
433             }
434           }
435           finally
436           {
437             synchronized (waitingThreads)
438             {
439               // remove myself in waitingThreads.
440
waitingThreads.remove(this);
441             }
442           }
443         }
444       }
445       while (revive && runThread);
446       logger.log(BasicLevel.DEBUG, "End of the run method");
447     }
448   }
449 }
Popular Tags