KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > sun > enterprise > web > connector > grizzly > LinkedListPipeline


1 /*
2  * The contents of this file are subject to the terms
3  * of the Common Development and Distribution License
4  * (the License). You may not use this file except in
5  * compliance with the License.
6  *
7  * You can obtain a copy of the license at
8  * https://glassfish.dev.java.net/public/CDDLv1.0.html or
9  * glassfish/bootstrap/legal/CDDLv1.0.txt.
10  * See the License for the specific language governing
11  * permissions and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL
14  * Header Notice in each file and include the License file
15  * at glassfish/bootstrap/legal/CDDLv1.0.txt.
16  * If applicable, add the following below the CDDL Header,
17  * with the fields enclosed by brackets [] replaced by
18  * you own identifying information:
19  * "Portions Copyrighted [year] [name of copyright owner]"
20  *
21  * Copyright 2006 Sun Microsystems, Inc. All rights reserved.
22  */

23 package com.sun.enterprise.web.connector.grizzly;
24
25 import java.util.LinkedList JavaDoc;
26 import java.util.NoSuchElementException JavaDoc;
27
28 /**
29  * Internal FIFO used by the Worker Threads to pass information
30  * between <code>Task</code> objects.
31  *
32  * @author Jean-Francois Arcand
33  */

34 public class LinkedListPipeline extends LinkedList JavaDoc implements Pipeline{
35
36
37     /**
38      * The number of thread waiting for a <code>Task</code>
39      */

40     protected int waitingThreads = 0;
41     
42     
43     /**
44      * The maximum number of Thread
45      */

46     protected int maxThreads = 20;
47     
48
49     /**
50      * The minimum numbers of <code>WorkerThread</code>
51      */

52     protected int minThreads = 5;
53                                                                                 
54     /**
55      * The minimum numbers of spare <code>WorkerThread</code>
56      */

57     protected int minSpareThreads = 2;
58
59
60     /**
61      * The port used.
62      */

63     protected int port = 8080;
64     
65
66     /**
67      * The number of <code>??WorkerThread</code>
68      */

69     protected int threadCount = 0;
70     
71
72     /**
73      * The name of this Pipeline
74      */

75     protected String JavaDoc name;
76     
77     
78     /**
79      * The Thread Priority
80      */

81     protected int priority = Thread.NORM_PRIORITY;
82     
83     
84     /**
85      * Has the pipeline already started
86      */

87     protected boolean isStarted = false;
88     
89
90     /**
91      * <code>WorkerThread</code> amanged by this pipeline.
92      */

93     protected WorkerThread[] workerThreads;
94     
95     
96     /**
97      * Maximum pending connection before refusing requests.
98      */

99     protected int maxQueueSizeInBytes = -1;
100     
101     
102     /**
103      * The increment number used when adding new thread.
104      */

105     protected int threadsIncrement = 1;
106     
107     
108     /**
109      * The request times out during transaction.
110      */

111     protected int threadsTimeout = Constants.DEFAULT_TIMEOUT;
112         
113     
114     /**
115      * The <code>PipelineStatistic</code> objects used when gathering statistics.
116      */

117     protected PipelineStatistic pipelineStat;
118     
119     // ------------------------------------------------------- Constructor -----/
120

121     public LinkedListPipeline(){
122         super();
123     }
124     
125     public LinkedListPipeline(int maxThreads, int minThreads, String JavaDoc name,
126                               int port, int priority){
127                         
128         this.maxThreads = maxThreads;
129         this.port = port;
130         this.name = name;
131         this.minThreads = minThreads;
132         this.priority = priority;
133         
134         if ( minThreads < minSpareThreads )
135             minSpareThreads = minThreads;
136         
137     }
138
139     
140     public LinkedListPipeline(int maxThreads, int minThreads, String JavaDoc name,
141                               int port){
142                         
143         this(maxThreads,minThreads,name,port,Thread.NORM_PRIORITY);
144     }
145
146    
147     // ------------------------------------------------ Lifecycle ------------/
148

149     /**
150      * Init the <code>Pipeline</code> by initializing the required
151      * <code>WorkerThread</code>. Default value is 10
152      */

153     public void initPipeline(){
154         
155         if (minThreads > maxThreads) {
156             minThreads = maxThreads;
157         }
158         
159         workerThreads = new WorkerThread[maxThreads];
160         increaseWorkerThread(minThreads, false);
161         
162    }
163
164     
165     /**
166      * Start the <code>Pipeline</code> and all associated
167      * <code>WorkerThread</code>
168      */

169     public void startPipeline(){
170         if (!isStarted) {
171             for (int i=0; i < minThreads; i++){
172                 workerThreads[i].start();
173             }
174             isStarted = true;
175         }
176     }
177     
178
179     /**
180      * Stop the <code>Pipeline</code> and all associated
181      * <code>WorkerThread</code>
182      */

183     public void stopPipeline(){
184         if (isStarted) {
185             for (int i=0; i < threadCount; i++){
186                 workerThreads[i].terminate();
187             }
188             isStarted = false;
189         }
190     }
191
192
193     /**
194      * Create new <code>WorkerThread</code>
195      */

196     protected void increaseWorkerThread(int increment, boolean startThread){
197         WorkerThread workerThread;
198         int currentCount = threadCount;
199         int increaseCount = threadCount + increment;
200         for (int i=currentCount; i < increaseCount; i++){
201             workerThread = new WorkerThread(this,
202                     name + "WorkerThread-" + port + "-" + i);
203             workerThread.setPriority(priority);
204             
205             if (startThread)
206                 workerThread.start();
207             
208             workerThreads[i] = workerThread;
209             threadCount++;
210         }
211     }
212     
213     
214     /**
215      * Interrupt the <code>Thread</code> using it thread id
216      */

217     public boolean interruptThread(long threadID){
218         ThreadGroup JavaDoc threadGroup = workerThreads[0].getThreadGroup();
219         Thread JavaDoc[] threads = new Thread JavaDoc[threadGroup.activeCount()];
220         threadGroup.enumerate(threads);
221         
222         for (Thread JavaDoc thread: threads){
223             if ( thread != null && thread.getId() == threadID ){
224                 if ( Thread.State.RUNNABLE != thread.getState()){
225                     try{
226                         thread.interrupt();
227                         return true;
228                     } catch (Throwable JavaDoc t){
229                         ; // Swallow any exceptions.
230
}
231                 }
232             }
233         }
234         return false;
235     }
236     
237     
238     // ---------------------------------------------------- Queue ------------//
239

240     
241     /**
242      * Add an object to this pipeline
243      */

244     public synchronized void addTask(Task task) {
245         
246         boolean rejected = false;
247         int queueSize = size();
248         if ( maxQueueSizeInBytes != -1 && maxQueueSizeInBytes < queueSize){
249             task.cancelTask("Maximum Connections Reached: "
250                 + maxQueueSizeInBytes + " -- Retry later",
251                     HtmlHelper.OK);
252             task.getSelectorThread().returnTask(task);
253             return;
254         }
255         
256         addLast(task);
257         notify();
258
259         // Create worker threads if we know we will run out of them
260
if (threadCount < maxThreads && waitingThreads < queueSize ){
261             int left = maxThreads - threadCount;
262             if (threadsIncrement > left){
263                 threadsIncrement = left;
264             }
265             increaseWorkerThread(threadsIncrement,true);
266         }
267     }
268
269
270     /**
271      * Return a <code>Task</code> object available in the pipeline.
272      * All Threads will synchronize on that method
273      */

274     public synchronized Task getTask() {
275         if ( isEmpty() ) {
276             
277             try {
278                 waitingThreads++;
279                 wait();
280
281             } catch (InterruptedException JavaDoc e) {
282                 Thread.interrupted();
283             }
284             waitingThreads--;
285        
286         }
287         try {
288             return (Task)removeFirst();
289         } catch(NoSuchElementException JavaDoc e) {
290             return null;
291         } finally {
292             if ( pipelineStat != null) {
293                 pipelineStat.gather(size());
294             }
295         }
296     }
297
298     
299     /**
300      * Return <code>true</code> if the size of this <code>ArrayList</code>
301      * minus the current waiting threads is lower than zero.
302      */

303     public boolean isEmpty() {
304         return (size() - waitingThreads <= 0);
305     }
306     
307     // --------------------------------------------------Properties ----------//
308

309     /**
310      * Return the number of waiting threads.
311      */

312     public int getWaitingThread(){
313         return waitingThreads;
314     }
315     
316     
317     /**
318      * Set the number of threads used by this pipeline.
319      */

320     public void setMaxThreads(int maxThreads){
321         this.maxThreads = maxThreads;
322     }
323     
324     
325     /**
326      * Return the number of threads used by this pipeline.
327      */

328     public int getMaxThreads(){
329         return maxThreads;
330     }
331     
332     
333     public int getCurrentThreadCount() {
334         return threadCount;
335     }
336       
337       
338     /**
339      * Return the curent number of threads that are currently processing
340      * a task.
341      */

342     public int getCurrentThreadsBusy(){
343         return (threadCount - waitingThreads);
344     }
345         
346
347     /**
348      * Return the maximum spare thread.
349      */

350     public int getMaxSpareThreads() {
351         return maxThreads;
352     }
353
354
355     /**
356      * Return the minimum spare thread.
357      */

358     public int getMinSpareThreads() {
359         return minSpareThreads;
360     }
361
362
363     /**
364      * Set the minimum space thread this <code>Pipeline</code> can handle.
365      */

366     public void setMinSpareThreads(int minSpareThreads) {
367         this.minSpareThreads = minSpareThreads;
368     }
369
370     
371     /**
372      * Set the thread priority of the <code>Pipeline</code>
373      */

374     public void setPriority(int priority){
375         this.priority = priority;
376     }
377     
378     
379     /**
380      * Set the name of this <code>Pipeline</code>
381      */

382     public void setName(String JavaDoc name){
383         this.name = name;
384     }
385     
386     
387     /**
388      * Return the name of this <code>Pipeline</code>
389      * @return the name of this <code>Pipeline</code>
390      */

391     public String JavaDoc getName(){
392         return name+port;
393     }
394
395     
396     /**
397      * Set the port used by this <code>Pipeline</code>
398      * @param port the port used by this <code>Pipeline</code>
399      */

400     public void setPort(int port){
401         this.port = port;
402     }
403     
404     
405     /**
406      * Set the minimum thread this <code>Pipeline</code> will creates
407      * when initializing.
408      * @param minThreads the minimum number of threads.
409      */

410     public void setMinThreads(int minThreads){
411         this.minThreads = minThreads;
412     }
413     
414     
415     public String JavaDoc toString(){
416        return "name: " + name + " maxThreads: " + maxThreads
417                 + " minThreads:" + minThreads;
418     }
419     
420     
421     /**
422      * Set the number the <code>Pipeline</code> will use when increasing the
423      * thread pool
424      */

425     public void setThreadsIncrement(int threadsIncrement){
426         this.threadsIncrement = threadsIncrement;
427     }
428     
429     
430     /**
431      * Set the timeout value a thread will use to times out the request.
432      */

433     public void setThreadsTimeout(int threadsTimeout){
434         this.threadsTimeout = threadsTimeout;
435     }
436     
437     
438     /**
439      * The number of <code>Task</code> currently queued
440      * @return number of queued connections
441      */

442     public int getTaskQueuedCount(){
443        return size();
444     }
445
446     
447     /**
448      * Set the maximum pending connection this <code>Pipeline</code>
449      * can handle.
450      */

451     public void setQueueSizeInBytes(int maxQueueSizeInBytesCount){
452         this.maxQueueSizeInBytes = maxQueueSizeInBytesCount;
453     }
454     
455     
456     /**
457      * Get the maximum pending connection this <code>Pipeline</code>
458      * can handle.
459      */

460     public int getQueueSizeInBytes(){
461         return maxQueueSizeInBytes;
462     }
463   
464     
465     /**
466      * Set the <code>PipelineStatistic</code> object used
467      * to gather statistic;
468      */

469     public void setPipelineStatistic(PipelineStatistic pipelineStatistic){
470         this.pipelineStat = pipelineStatistic;
471     }
472     
473     
474     /**
475      * Return the <code>PipelineStatistic</code> object used
476      * to gather statistic;
477      */

478     public PipelineStatistic getPipelineStatistic(){
479         return pipelineStat;
480     }
481     
482 }
483
Popular Tags