KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > sun > enterprise > web > connector > grizzly > ThreadPoolExecutorPipeline


1 /*
2  * The contents of this file are subject to the terms
3  * of the Common Development and Distribution License
4  * (the License). You may not use this file except in
5  * compliance with the License.
6  *
7  * You can obtain a copy of the license at
8  * https://glassfish.dev.java.net/public/CDDLv1.0.html or
9  * glassfish/bootstrap/legal/CDDLv1.0.txt.
10  * See the License for the specific language governing
11  * permissions and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL
14  * Header Notice in each file and include the License file
15  * at glassfish/bootstrap/legal/CDDLv1.0.txt.
16  * If applicable, add the following below the CDDL Header,
17  * with the fields enclosed by brackets [] replaced by
18  * you own identifying information:
19  * "Portions Copyrighted [year] [name of copyright owner]"
20  *
21  * Copyright 2006 Sun Microsystems, Inc. All rights reserved.
22  */

23 package com.sun.enterprise.web.connector.grizzly;
24
25 import java.util.concurrent.ArrayBlockingQueue JavaDoc;
26 import java.util.concurrent.ThreadPoolExecutor JavaDoc;
27 import java.util.concurrent.RejectedExecutionHandler JavaDoc;
28 import java.util.concurrent.TimeUnit JavaDoc;
29
30
31 /**
32  * A wrapper around an <code>ThreadPoolExecutor</code>. This thread pool
33  * is bounded by an <code>ArrayBlockingQueue</code>
34  *
35  * @author Jean-Francois Arcand
36  */

37 public class ThreadPoolExecutorPipeline implements Pipeline,
38                                                    RejectedExecutionHandler JavaDoc{
39     
40
41     /**
42      * The number of thread waiting for a <code>Task</code>
43      */

44     private int waitingThreads = 0;
45     
46     
47     /**
48      * The maximum number of Thread
49      */

50     private int maxThreads = 20;
51     
52
53     /**
54      * The minimum numbers of <code>WorkerThread</code>
55      */

56     private int minThreads = 10;
57
58     
59     /**
60      * The port used.
61      */

62     private int port = 8080;
63     
64
65     /**
66      * The number of <code>WorkerThread</code>
67      */

68     private int threadCount =0;
69     
70
71     /**
72      * The name of this Pipeline
73      */

74     private String JavaDoc name;
75     
76     
77     /**
78      * The Thread Priority
79      */

80     private int priority = Thread.NORM_PRIORITY;
81     
82     
83     /**
84      * Has the pipeline already started
85      */

86     private boolean isStarted = false;
87     
88     
89     /**
90      * <code>ExecutorService</code> wrapped by this pipeline.
91      */

92     private ThreadPoolExecutor JavaDoc workerThreads;
93     
94    
95     /**
96      * Connection queue
97      */

98     private ArrayBlockingQueue JavaDoc<Runnable JavaDoc> arrayBlockingQueue;
99     
100     
101     /**
102      * Maximum pending connection before refusing requests.
103      */

104     private int maxQueueSizeInBytes = -1;
105     
106     
107     /**
108      * maximum size of the connection queue, in bytes.
109      */

110     private int queueSizeInBytes = 4096;
111     
112     
113     /**
114      * The <code>PipelineStatistic</code> objects used when gathering statistics.
115      */

116     protected PipelineStatistic pipelineStat;
117     // ------------------------------------------------ Lifecycle ------------/
118

119     /**
120      * Init the <code>Pipeline</code> by initializing the required
121      * <code>ThreadPoolExecutor</code>.
122      */

123     public void initPipeline(){
124         
125         if (isStarted){
126             return;
127         }
128         isStarted = true;
129         arrayBlockingQueue =
130                         new ArrayBlockingQueue JavaDoc<Runnable JavaDoc>(maxQueueSizeInBytes, true);
131         
132         workerThreads = new ThreadPoolExecutor JavaDoc(
133                                maxThreads,
134                                maxThreads,
135                                0L,
136                                TimeUnit.MILLISECONDS,
137                                arrayBlockingQueue,
138                                new GrizzlyThreadFactory(name,port,priority),
139                                this);
140     }
141
142     /**
143      * Start the <code>Pipeline</code>
144      */

145     public void startPipeline(){
146         if (isStarted){
147             return;
148         }
149         ; // Do nothing
150
}
151     
152
153     /**
154      * Stop the <code>Pipeline</code>
155      */

156     public void stopPipeline(){
157         if (!isStarted){
158             return;
159         }
160         isStarted = false;
161         workerThreads.shutdown();
162     }
163     
164     // ---------------------------------------------------- Queue ------------//
165

166     
167     /**
168      * Add an object to this pipeline
169      */

170     public void addTask(Task task){
171         if (workerThreads.getQueue().size() > maxQueueSizeInBytes ){
172             task.cancelTask("Maximum Connections Reached: "
173                             + pipelineStat.getQueueSizeInBytes()
174                             + " -- Retry later", HtmlHelper.OK);
175             task.getSelectorThread().returnTask(task);
176             return;
177         }
178         workerThreads.execute((Runnable JavaDoc)task);
179                 
180         if ( pipelineStat != null) {
181             pipelineStat.gather(size());
182         }
183     }
184
185
186     /**
187      * Return a <code>Task</code> object available in the pipeline.
188      *
189      */

190     public Task getTask() {
191         return null;
192     }
193
194     
195     /**
196      * Returns the number of tasks in this <code>Pipeline</code>.
197      *
198      * @return Number of tasks in this <code>Pipeline</code>.
199      */

200     public int size() {
201         return workerThreads.getQueue().size();
202     }
203
204     
205     /**
206      * Interrupt the <code>Thread</code> using it thread id
207      */

208     public boolean interruptThread(long threadID){
209         return ((GrizzlyThreadFactory)workerThreads.getThreadFactory())
210             .interruptThread(threadID);
211     }
212     // --------------------------------------------------Properties ----------//
213

214     /**
215      * Return the number of waiting threads.
216      */

217     public int getWaitingThread(){
218         return workerThreads.getPoolSize() - workerThreads.getActiveCount();
219     }
220     
221     
222     /**
223      * Set the number of threads used by this pipeline.
224      */

225     public void setMaxThreads(int maxThreads){
226         this.maxThreads = maxThreads;
227     }
228     
229     
230     /**
231      * Return the number of threads used by this pipeline.
232      */

233     public int getMaxThreads(){
234         return maxThreads;
235     }
236     
237     
238     /**
239      * Return the current number of threads used.
240      */

241     public int getCurrentThreadCount() {
242         return workerThreads.getPoolSize() ;
243     }
244       
245       
246     /**
247      * Return the curent number of threads that are currently processing
248      * a task.
249      */

250     public int getCurrentThreadsBusy(){
251         return workerThreads.getActiveCount();
252     }
253     
254     
255     /**
256      * Return the maximum spare thread.
257      */

258     public int getMaxSpareThreads() {
259         return getWaitingThread();
260     }
261     
262     
263     /**
264      * Set the thread priority of the <code>Pipeline</code>
265      */

266     public void setPriority(int priority){
267         this.priority = priority;
268     }
269     
270     
271     /**
272      * Set the name of this <code>Pipeline</code>
273      */

274     public void setName(String JavaDoc name){
275         this.name = name;
276     }
277     
278     
279     /**
280      * Return the name of this <code>Pipeline</code>
281      * @return the name of this <code>Pipeline</code>
282      */

283     public String JavaDoc getName(){
284         return name+port;
285     }
286
287     
288     /**
289      * Set the port used by this <code>Pipeline</code>
290      * @param port the port used by this <code>Pipeline</code>
291      */

292     public void setPort(int port){
293         this.port = port;
294     }
295     
296     
297     /**
298      * Set the minimum thread this <code>Pipeline</code> will creates
299      * when initializing.
300      * @param minThreads the minimum number of threads.
301      */

302     public void setMinThreads(int minThreads){
303         this.minThreads = minThreads;
304     }
305     
306     
307      /**
308      * Set the maximum pending connection this <code>Pipeline</code>
309      * can handle.
310      */

311     public void setQueueSizeInBytes(int maxQueueSizeInBytes){
312         this.maxQueueSizeInBytes = maxQueueSizeInBytes;
313         if ( pipelineStat != null )
314             pipelineStat.setQueueSizeInBytes(maxQueueSizeInBytes);
315     }
316     
317     
318     /**
319      * Get the maximum pending connection this <code>Pipeline</code>
320      * can handle.
321      */

322     public int getQueueSizeInBytes(){
323         return maxQueueSizeInBytes;
324     }
325     
326     
327     public String JavaDoc toString(){
328        return "name: " + name + " maxThreads: " + maxThreads
329                 + " minThreads:" + minThreads;
330     }
331
332
333     /**
334      * When the <code>maxQueueSizeInBytesConnection</code> is reached,
335      * terminate <code>Task</code>
336      */

337     public void rejectedExecution(Runnable JavaDoc r, ThreadPoolExecutor JavaDoc executor){
338         Task task = (Task)r;
339         task.cancelTask("Maximum Connections Reached -- Retry later",
340                         HtmlHelper.OK);
341         task.getSelectorThread().returnTask(task);
342     }
343     
344     
345     public void setThreadsIncrement(int threadsIncrement){
346         ; // Not Supported
347
}
348     
349     
350     public void setThreadsTimeout(int threadsTimeout){
351         ; // Not Supported
352
}
353
354
355      /**
356      * Return the minimum spare thread.
357      */

358     public int getMinSpareThreads() {
359         return 0;
360     }
361
362
363     /**
364      * Set the minimum space thread this <code>Pipeline</code> can handle.
365      */

366     public void setMinSpareThreads(int minSpareThreads) {
367     }
368     
369     
370     /**
371      * Set the <code>PipelineStatistic</code> object used
372      * to gather statistic;
373      */

374     public void setPipelineStatistic(PipelineStatistic pipelineStatistic){
375         this.pipelineStat = pipelineStatistic;
376     }
377     
378     
379     /**
380      * Return the <code>PipelineStatistic</code> object used
381      * to gather statistic;
382      */

383     public PipelineStatistic getPipelineStatistic(){
384         return pipelineStat;
385     }
386 }
387
Popular Tags