KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jboss > aspects > asynchronous > concurrent > ThreadManagerImpl


1 /*
2   * JBoss, Home of Professional Open Source
3   * Copyright 2005, JBoss Inc., and individual contributors as indicated
4   * by the @authors tag. See the copyright.txt in the distribution for a
5   * full listing of individual contributors.
6   *
7   * This is free software; you can redistribute it and/or modify it
8   * under the terms of the GNU Lesser General Public License as
9   * published by the Free Software Foundation; either version 2.1 of
10   * the License, or (at your option) any later version.
11   *
12   * This software is distributed in the hope that it will be useful,
13   * but WITHOUT ANY WARRANTY; without even the implied warranty of
14   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15   * Lesser General Public License for more details.
16   *
17   * You should have received a copy of the GNU Lesser General Public
18   * License along with this software; if not, write to the Free
19   * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
20   * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
21   */

22
23 package org.jboss.aspects.asynchronous.concurrent;
24
25 import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
26 import EDU.oswego.cs.dl.util.concurrent.ThreadFactoryUser;
27 import org.jboss.aspects.asynchronous.AsynchronousConstants;
28 import org.jboss.aspects.asynchronous.AsynchronousParameters;
29 import org.jboss.aspects.asynchronous.AsynchronousTask;
30 import org.jboss.aspects.asynchronous.AsynchronousUserTask;
31 import org.jboss.aspects.asynchronous.ThreadManager;
32 import org.jboss.aspects.asynchronous.ThreadManagerRequest;
33 import org.jboss.aspects.asynchronous.ThreadManagerResponse;
34 import org.jboss.aspects.asynchronous.common.AsynchronousEmptyTask;
35 import org.jboss.aspects.asynchronous.common.ThreadManagerResponseImpl;
36
37
38 /**
39  * @author <a HREF="mailto:chussenet@yahoo.com">{Claude Hussenet Independent Consultant}</a>.
40  * @version <tt>$Revision: 37406 $</tt>
41  */

42
43 public class ThreadManagerImpl
44
45 extends ThreadFactoryUser
46
47 implements AsynchronousConstants, ThreadManager
48 {
49
50    protected PooledExecutor _pooledExecutor = null;
51
52    protected boolean waitWhenPoolSizeIsFull = true;
53
54    protected boolean isPooling = true;
55
56    /**
57     * Create a new pool with all default settings
58     */

59
60    public ThreadManagerImpl()
61    {
62
63       _pooledExecutor = new PooledExecutor();
64
65       setWaitWhenPoolSizeIsFull(false);
66
67    }
68
69    /**
70     * Create a new pool with all default settings except
71     * <p/>
72     * for maximum pool size.
73     */

74
75    public ThreadManagerImpl(int maximumPoolSize)
76    {
77
78       _pooledExecutor = new PooledExecutor(maximumPoolSize);
79
80       setWaitWhenPoolSizeIsFull(false);
81
82    }
83
84    /**
85     * Set the minimum number of threads to use.
86     *
87     * @throws IllegalArgumentException if less than zero. (It is not
88     * <p/>
89     * considered an error to set the minimum to be greater than the
90     * <p/>
91     * maximum. However, in this case there are no guarantees about
92     * <p/>
93     * behavior.)
94     */

95
96    public void setMaximumPoolSize(int maximumPoolSize)
97    {
98
99       _pooledExecutor.setMaximumPoolSize(maximumPoolSize);
100
101    }
102
103    /**
104     * Set the policy for blocked execution to be to wait until a thread
105     * <p/>
106     * is available.
107     * <p/>
108     * <p/>
109     * <p/>
110     * OR
111     * <p/>
112     * <p/>
113     * <p/>
114     * Set the policy for blocked execution to be to
115     * <p/>
116     * throw a RuntimeException.
117     */

118
119    public void setWaitWhenPoolSizeIsFull(boolean value)
120    {
121
122       if (value)
123
124          _pooledExecutor.waitWhenBlocked();
125
126       else
127
128          _pooledExecutor.abortWhenBlocked();
129
130       waitWhenPoolSizeIsFull = value;
131
132    }
133
134    /**
135     * return the policy when the pool is full
136     */

137
138    public boolean getWaitWhenPoolSizeIsFull()
139    {
140
141       return waitWhenPoolSizeIsFull;
142
143    }
144
145    /**
146     * Return the maximum number of threads to simultaneously execute
147     */

148
149    public int getMaximumPoolSize()
150    {
151
152       return _pooledExecutor.getMaximumPoolSize();
153
154    }
155
156    /**
157     * Set the minimum number of threads to use.
158     *
159     * @throws IllegalArgumentException if less than zero. (It is not
160     * <p/>
161     * considered an error to set the minimum to be greater than the
162     * <p/>
163     * maximum. However, in this case there are no guarantees about
164     * <p/>
165     * behavior.)
166     */

167
168    public void setMinimumPoolSize(int minimumPoolSize)
169    {
170
171       _pooledExecutor.setMinimumPoolSize(minimumPoolSize);
172
173    }
174
175    /**
176     * Return the minimum number of threads to simultaneously execute.
177     * <p/>
178     * (Default value is 1). If fewer than the mininum number are
179     * <p/>
180     * running upon reception of a new request, a new thread is started
181     * <p/>
182     * to handle this request.
183     */

184
185    public int getMinimumPoolSize()
186    {
187
188       return _pooledExecutor.getMinimumPoolSize();
189
190    }
191
192    /**
193     * Set the number of milliseconds to keep threads alive waiting for
194     * <p/>
195     * new commands. A negative value means to wait forever. A zero
196     * <p/>
197     * value means not to wait at all.
198     */

199
200    public void setKeepAliveTime(long time)
201    {
202
203       _pooledExecutor.setKeepAliveTime(time);
204
205    }
206
207    /**
208     * Return the number of milliseconds to keep threads alive waiting
209     * <p/>
210     * for new commands. A negative value means to wait forever. A zero
211     * <p/>
212     * value means not to wait at all.
213     */

214
215    public long getKeepAliveTime()
216    {
217
218       return _pooledExecutor.getKeepAliveTime();
219
220    }
221
222    /**
223     * Return the current number of active threads in the pool. This
224     * <p/>
225     * number is just a snaphot, and may change immediately upon
226     * <p/>
227     * returning
228     */

229
230    public long getPoolSize()
231    {
232
233       return _pooledExecutor.getPoolSize();
234
235    }
236
237    /**
238     * Return the response from an asynchronous task
239     * <p/>
240     * The call returns within the timeout defined
241     * <p/>
242     * in the process method
243     */

244
245    public ThreadManagerResponse waitForResponse(AsynchronousTask input)
246    {
247
248       AsynchronousTask[] tTask = {input};
249
250       return waitForResponses(tTask)[0];
251
252    }
253
254    /**
255     * Return an array of responses from an array of asynchronous task
256     * <p/>
257     * The call returns within the maximum timeout from the array of tasks
258     */

259
260    public ThreadManagerResponse[] waitForResponses(AsynchronousTask[] inputImpl)
261    {
262
263       if (inputImpl == null)
264       {
265
266          System.err.println("PPMImpl:waitForResponses NULL PARAMETER");
267
268          return null;
269
270       }
271
272       ThreadManagerResponse[] response =
273
274       new ThreadManagerResponseImpl[inputImpl.length];
275
276       for (int i = 0; i < inputImpl.length; i++)
277       {
278
279          AsynchronousTask fr = inputImpl[i];
280
281          response[i] = fr.getResponse();
282
283       }
284
285       return response;
286
287    }
288
289    public AsynchronousTask process(ThreadManagerRequest ppmRequest)
290    {
291
292       return process(ppmRequest.getId(),
293
294       ppmRequest.getTaskClassImpl(),
295
296       ppmRequest.getInputParameters(),
297
298       ppmRequest.getTimeout());
299
300    }
301
302    /**
303     * Create, start and return a new asynchronous task from :
304     * <p/>
305     * <p/>
306     * <p/>
307     * <p><b>taskImpl</b> class instance defining the task to process
308     * <p/>
309     * <p><b>inputParametersImpl</b> class instance defining the input parameters
310     * <p/>
311     * <p><b>timeout</b> defined the given time limit to process the task
312     */

313
314    private AsynchronousTask process(String JavaDoc id,
315
316                                     AsynchronousUserTask taskImpl,
317
318                                     AsynchronousParameters inputParametersImpl,
319
320                                     long timeout)
321    {
322
323       try
324       {
325
326
327          if (this.getPoolSize() + 1 > this.getMaximumPoolSize())
328
329             System.err.println("process: The maximum pool size defined at "
330
331             + this.getMaximumPoolSize()
332
333             + " is reached before processing task["
334
335             + id
336
337             + "] !");
338
339          org.jboss.aspects.asynchronous.concurrent.AsynchronousTask ft =
340
341          new AsynchronousTaskImpl(id,
342
343          taskImpl,
344
345          inputParametersImpl,
346
347          timeout);
348
349          Runnable JavaDoc cmd = ft.add();
350
351          if (isPooling())
352
353             _pooledExecutor.execute(cmd);
354
355          else
356          {
357
358             Thread JavaDoc thread = getThreadFactory().newThread(cmd);
359
360             thread.start();
361
362          }
363
364          Thread.yield();
365
366          Thread.sleep(0);
367
368          Thread.yield();
369
370          return ft;
371
372       }
373       catch (Exception JavaDoc e)
374       {
375
376
377          return new AsynchronousEmptyTask(id,
378
379          AsynchronousConstants.CAN_NOT_PROCESS,
380
381          e,
382
383          e.getMessage(),
384
385          System.currentTimeMillis());
386
387       }
388
389    }
390
391    public boolean isPooling()
392    {
393
394       return isPooling;
395
396    }
397
398    public void setPooling(boolean isPooling)
399    {
400
401       this.isPooling = isPooling;
402
403    }
404
405 }
406
407
Popular Tags