KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > sun > corba > se > impl > orbutil > threadpool > ThreadPoolImpl


1 /*
2  * Copyright 2005 Sun Microsystems, Inc. All rights reserved.
3  * SUN PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
4  */

5
6 package com.sun.corba.se.impl.orbutil.threadpool;
7
8 import java.security.AccessController JavaDoc;
9 import java.security.PrivilegedAction JavaDoc;
10
11 import com.sun.corba.se.spi.orbutil.threadpool.NoSuchWorkQueueException;
12 import com.sun.corba.se.spi.orbutil.threadpool.ThreadPool;
13 import com.sun.corba.se.spi.orbutil.threadpool.Work;
14 import com.sun.corba.se.spi.orbutil.threadpool.WorkQueue;
15
16 import com.sun.corba.se.impl.orbutil.ORBConstants;
17 import com.sun.corba.se.impl.orbutil.threadpool.WorkQueueImpl;
18
19 import com.sun.corba.se.spi.monitoring.MonitoringConstants;
20 import com.sun.corba.se.spi.monitoring.MonitoredObject;
21 import com.sun.corba.se.spi.monitoring.MonitoringFactories;
22 import com.sun.corba.se.spi.monitoring.LongMonitoredAttributeBase;
23
24 public class ThreadPoolImpl implements ThreadPool
25 {
26     private static int threadCounter = 0; // serial counter useful for debugging
27

28     private WorkQueue workQueue;
29     
30     // Stores the number of available worker threads
31
private int availableWorkerThreads = 0;
32     
33     // Stores the number of threads in the threadpool currently
34
private int currentThreadCount = 0;
35     
36     // Minimum number of worker threads created at instantiation of the threadpool
37
private int minWorkerThreads = 0;
38     
39     // Maximum number of worker threads in the threadpool
40
private int maxWorkerThreads = 0;
41     
42     // Inactivity timeout value for worker threads to exit and stop running
43
private long inactivityTimeout = ORBConstants.DEFAULT_INACTIVITY_TIMEOUT ;
44     
45     // Indicates if the threadpool is bounded or unbounded
46
private boolean boundedThreadPool = false;
47     
48     // Running count of the work items processed
49
// Set the value to 1 so that divide by zero is avoided in
50
// averageWorkCompletionTime()
51
private long processedCount = 1;
52     
53     // Running aggregate of the time taken in millis to execute work items
54
// processed by the threads in the threadpool
55
private long totalTimeTaken = 0;
56
57     // Lock for protecting state when required
58
private Object JavaDoc lock = new Object JavaDoc();
59
60     // Name of the ThreadPool
61
private String JavaDoc name;
62
63     // MonitoredObject for ThreadPool
64
private MonitoredObject threadpoolMonitoredObject;
65     
66     // ThreadGroup in which threads should be created
67
private final ThreadGroup JavaDoc threadGroup ;
68
69     /**
70      * This constructor is used to create an unbounded threadpool
71      */

72     public ThreadPoolImpl(ThreadGroup JavaDoc tg, String JavaDoc threadpoolName) {
73         maxWorkerThreads = Integer.MAX_VALUE;
74         workQueue = new WorkQueueImpl(this);
75     threadGroup = tg ;
76     name = threadpoolName;
77     initializeMonitoring();
78     }
79  
80     /**
81      * This constructor is used to create an unbounded threadpool
82      * in the ThreadGroup of the current thread
83      */

84     public ThreadPoolImpl(String JavaDoc threadpoolName) {
85     this( Thread.currentThread().getThreadGroup(), threadpoolName ) ;
86     }
87
88     /**
89      * This constructor is used to create bounded threadpool
90      */

91     public ThreadPoolImpl(int minSize, int maxSize, long timeout,
92                         String JavaDoc threadpoolName)
93     {
94         inactivityTimeout = timeout;
95         minWorkerThreads = minSize;
96         maxWorkerThreads = maxSize;
97         boundedThreadPool = true;
98         workQueue = new WorkQueueImpl(this);
99     threadGroup = Thread.currentThread().getThreadGroup() ;
100     name = threadpoolName;
101         for (int i = 0; i < minWorkerThreads; i++) {
102             createWorkerThread();
103         }
104     initializeMonitoring();
105     }
106
107     // Setup monitoring for this threadpool
108
private void initializeMonitoring() {
109     // Get root monitored object
110
MonitoredObject root = MonitoringFactories.getMonitoringManagerFactory().
111         createMonitoringManager(MonitoringConstants.DEFAULT_MONITORING_ROOT, null).
112         getRootMonitoredObject();
113
114     // Create the threadpool monitoring root
115
MonitoredObject threadPoolMonitoringObjectRoot = root.getChild(
116             MonitoringConstants.THREADPOOL_MONITORING_ROOT);
117     if (threadPoolMonitoringObjectRoot == null) {
118         threadPoolMonitoringObjectRoot = MonitoringFactories.
119             getMonitoredObjectFactory().createMonitoredObject(
120             MonitoringConstants.THREADPOOL_MONITORING_ROOT,
121             MonitoringConstants.THREADPOOL_MONITORING_ROOT_DESCRIPTION);
122         root.addChild(threadPoolMonitoringObjectRoot);
123     }
124     threadpoolMonitoredObject = MonitoringFactories.
125             getMonitoredObjectFactory().
126             createMonitoredObject(name,
127             MonitoringConstants.THREADPOOL_MONITORING_DESCRIPTION);
128
129     threadPoolMonitoringObjectRoot.addChild(threadpoolMonitoredObject);
130
131     LongMonitoredAttributeBase b1 = new
132         LongMonitoredAttributeBase(MonitoringConstants.THREADPOOL_CURRENT_NUMBER_OF_THREADS,
133             MonitoringConstants.THREADPOOL_CURRENT_NUMBER_OF_THREADS_DESCRIPTION) {
134         public Object JavaDoc getValue() {
135             return new Long JavaDoc(ThreadPoolImpl.this.currentNumberOfThreads());
136         }
137         };
138     threadpoolMonitoredObject.addAttribute(b1);
139     LongMonitoredAttributeBase b2 = new
140         LongMonitoredAttributeBase(MonitoringConstants.THREADPOOL_NUMBER_OF_AVAILABLE_THREADS,
141             MonitoringConstants.THREADPOOL_CURRENT_NUMBER_OF_THREADS_DESCRIPTION) {
142         public Object JavaDoc getValue() {
143             return new Long JavaDoc(ThreadPoolImpl.this.numberOfAvailableThreads());
144         }
145         };
146     threadpoolMonitoredObject.addAttribute(b2);
147     LongMonitoredAttributeBase b3 = new
148         LongMonitoredAttributeBase(MonitoringConstants.THREADPOOL_NUMBER_OF_BUSY_THREADS,
149             MonitoringConstants.THREADPOOL_NUMBER_OF_BUSY_THREADS_DESCRIPTION) {
150         public Object JavaDoc getValue() {
151             return new Long JavaDoc(ThreadPoolImpl.this.numberOfBusyThreads());
152         }
153         };
154     threadpoolMonitoredObject.addAttribute(b3);
155     LongMonitoredAttributeBase b4 = new
156         LongMonitoredAttributeBase(MonitoringConstants.THREADPOOL_AVERAGE_WORK_COMPLETION_TIME,
157             MonitoringConstants.THREADPOOL_AVERAGE_WORK_COMPLETION_TIME_DESCRIPTION) {
158         public Object JavaDoc getValue() {
159             return new Long JavaDoc(ThreadPoolImpl.this.averageWorkCompletionTime());
160         }
161         };
162     threadpoolMonitoredObject.addAttribute(b4);
163     LongMonitoredAttributeBase b5 = new
164         LongMonitoredAttributeBase(MonitoringConstants.THREADPOOL_CURRENT_PROCESSED_COUNT,
165             MonitoringConstants.THREADPOOL_CURRENT_PROCESSED_COUNT_DESCRIPTION) {
166         public Object JavaDoc getValue() {
167             return new Long JavaDoc(ThreadPoolImpl.this.currentProcessedCount());
168         }
169         };
170     threadpoolMonitoredObject.addAttribute(b5);
171
172     // Add the monitored object for the WorkQueue
173

174     threadpoolMonitoredObject.addChild(
175         ((WorkQueueImpl)workQueue).getMonitoredObject());
176     }
177
178     // Package private method to get the monitored object for this
179
// class
180
MonitoredObject getMonitoredObject() {
181     return threadpoolMonitoredObject;
182     }
183     
184     public WorkQueue getAnyWorkQueue()
185     {
186     return workQueue;
187     }
188
189     public WorkQueue getWorkQueue(int queueId)
190     throws NoSuchWorkQueueException
191     {
192     if (queueId != 0)
193         throw new NoSuchWorkQueueException();
194     return workQueue;
195     }
196
197     /**
198      * To be called from the workqueue when work is added to the
199      * workQueue. This method would create new threads if required
200      * or notify waiting threads on the queue for available work
201      */

202     void notifyForAvailableWork(WorkQueue aWorkQueue) {
203     synchronized (lock) {
204         if (availableWorkerThreads == 0) {
205         createWorkerThread();
206         } else {
207         aWorkQueue.notify();
208         }
209     }
210     }
211     
212
213     /**
214      * To be called from the workqueue to create worker threads when none
215      * available.
216      */

217     void createWorkerThread() {
218     synchronized (lock) {
219         final String JavaDoc name = getName() ;
220           
221         if (boundedThreadPool) {
222         if (currentThreadCount < maxWorkerThreads) {
223             currentThreadCount++;
224         } else {
225             // REVIST - Need to create a thread to monitor the
226
// the state for deadlock i.e. all threads waiting for
227
// something which can be got from the item in the
228
// workqueue, but there is no thread available to
229
// process that work item - DEADLOCK !!
230
return;
231         }
232         } else {
233         currentThreadCount++;
234         }
235
236         // If we get here, we need to create a thread.
237
AccessController.doPrivileged(
238         new PrivilegedAction JavaDoc() {
239             public Object JavaDoc run() {
240             // Thread creation needs to be in a doPrivileged block
241
// for two reasons:
242
// 1. The creation of a thread in a specific ThreadGroup
243
// is a privileged operation. Lack of a doPrivileged
244
// block here causes an AccessControlException
245
// (see bug 6268145).
246
// 2. We want to make sure that the permissions associated
247
// with this thread do NOT include the permissions of
248
// the current thread that is calling this method.
249
// This leads to problems in the app server where
250
// some threads in the ThreadPool randomly get
251
// bad permissions, leading to unpredictable
252
// permission errors.
253
WorkerThread thread = new WorkerThread(threadGroup, name);
254                 
255             // The thread must be set to a daemon thread so the
256
// VM can exit if the only threads left are PooledThreads
257
// or other daemons. We don't want to rely on the
258
// calling thread always being a daemon.
259
// Note that no exception is possible here since we
260
// are inside the doPrivileged block.
261
thread.setDaemon(true);
262
263             thread.start();
264             
265             return null ;
266             }
267         }
268         ) ;
269     }
270     }
271     
272     /**
273     * This method will return the minimum number of threads maintained
274     * by the threadpool.
275     */

276     public int minimumNumberOfThreads() {
277         return minWorkerThreads;
278     }
279     
280     /**
281     * This method will return the maximum number of threads in the
282     * threadpool at any point in time, for the life of the threadpool
283     */

284     public int maximumNumberOfThreads() {
285         return maxWorkerThreads;
286     }
287     
288     /**
289     * This method will return the time in milliseconds when idle
290     * threads in the threadpool are removed.
291     */

292     public long idleTimeoutForThreads() {
293         return inactivityTimeout;
294     }
295     
296     /**
297     * This method will return the total number of threads currently in the
298     * threadpool. This method returns a value which is not synchronized.
299     */

300     public int currentNumberOfThreads() {
301     synchronized (lock) {
302         return currentThreadCount;
303     }
304     }
305     
306     /**
307     * This method will return the number of available threads in the
308     * threadpool which are waiting for work. This method returns a
309     * value which is not synchronized.
310     */

311     public int numberOfAvailableThreads() {
312     synchronized (lock) {
313         return availableWorkerThreads;
314     }
315     }
316     
317     /**
318     * This method will return the number of busy threads in the threadpool
319     * This method returns a value which is not synchronized.
320     */

321     public int numberOfBusyThreads() {
322     synchronized (lock) {
323         return (currentThreadCount - availableWorkerThreads);
324     }
325     }
326     
327     /**
328      * This method returns the average elapsed time taken to complete a Work
329      * item in milliseconds.
330      */

331     public long averageWorkCompletionTime() {
332     synchronized (lock) {
333         return (totalTimeTaken / processedCount);
334     }
335     }
336     
337     /**
338      * This method returns the number of Work items processed by the threadpool
339      */

340     public long currentProcessedCount() {
341     synchronized (lock) {
342         return processedCount;
343     }
344     }
345
346     public String JavaDoc getName() {
347         return name;
348     }
349
350     /**
351     * This method will return the number of WorkQueues serviced by the threadpool.
352     */

353     public int numberOfWorkQueues() {
354         return 1;
355     }
356
357
358     private static synchronized int getUniqueThreadId() {
359         return ThreadPoolImpl.threadCounter++;
360     }
361
362
363     private class WorkerThread extends Thread JavaDoc
364     {
365         private Work currentWork;
366         private int threadId = 0; // unique id for the thread
367
// thread pool this WorkerThread belongs too
368
private String JavaDoc threadPoolName;
369     // name seen by Thread.getName()
370
private StringBuffer JavaDoc workerThreadName = new StringBuffer JavaDoc();
371
372         WorkerThread(ThreadGroup JavaDoc tg, String JavaDoc threadPoolName) {
373         super(tg, "Idle");
374         this.threadId = ThreadPoolImpl.getUniqueThreadId();
375             this.threadPoolName = threadPoolName;
376         setName(composeWorkerThreadName(threadPoolName, "Idle"));
377         }
378         
379         public void run() {
380             while (true) {
381                 try {
382
383             synchronized (lock) {
384             availableWorkerThreads++;
385             }
386                     
387                     // Get some work to do
388
currentWork = ((WorkQueueImpl)workQueue).requestWork(inactivityTimeout);
389
390             synchronized (lock) {
391             availableWorkerThreads--;
392             // It is possible in notifyForAvailableWork that the
393
// check for availableWorkerThreads = 0 may return
394
// false, because the availableWorkerThreads has not been
395
// decremented to zero before the producer thread added
396
// work to the queue. This may create a deadlock, if the
397
// executing thread needs information which is in the work
398
// item queued in the workqueue, but has no thread to work
399
// on it since none was created because availableWorkerThreads = 0
400
// returned false.
401
// The following code will ensure that a thread is always available
402
// in those situations
403
if ((availableWorkerThreads == 0) &&
404                 (workQueue.workItemsInQueue() > 0)) {
405                 createWorkerThread();
406             }
407             }
408
409                     // Set the thread name for debugging.
410
setName(composeWorkerThreadName(threadPoolName,
411                       Integer.toString(this.threadId)));
412
413                     long start = System.currentTimeMillis();
414                     
415             try {
416             // Do the work
417
currentWork.doWork();
418             } catch (Throwable JavaDoc t) {
419             // Ignore all errors.
420
;
421             }
422                     
423                     long end = System.currentTimeMillis();
424                     
425
426             synchronized (lock) {
427             totalTimeTaken += (end - start);
428             processedCount++;
429             }
430
431             // set currentWork to null so that the work item can be
432
// garbage collected
433
currentWork = null;
434
435                 setName(composeWorkerThreadName(threadPoolName, "Idle"));
436
437                 } catch (TimeoutException e) {
438                     // This thread timed out waiting for something to do.
439

440             synchronized (lock) {
441             availableWorkerThreads--;
442
443             // This should for both bounded and unbounded case
444
if (currentThreadCount > minWorkerThreads) {
445                 currentThreadCount--;
446                 // This thread can exit.
447
return;
448             } else {
449                 // Go back to waiting on workQueue
450
continue;
451             }
452             }
453                 } catch (InterruptedException JavaDoc ie) {
454                     // InterruptedExceptions are
455
// caught here. Thus, threads can be forced out of
456
// requestWork and so they have to reacquire the lock.
457
// Other options include ignoring or
458
// letting this thread die.
459
// Ignoring for now. REVISIT
460
synchronized (lock) {
461             availableWorkerThreads--;
462             }
463
464                 } catch (Throwable JavaDoc e) {
465
466                     // Ignore any exceptions that currentWork.process
467
// accidently lets through, but let Errors pass.
468
// Add debugging output? REVISIT
469
synchronized (lock) {
470             availableWorkerThreads--;
471             }
472
473                 }
474             }
475         }
476
477     private String JavaDoc composeWorkerThreadName(String JavaDoc poolName, String JavaDoc workerName) {
478             workerThreadName.setLength(0);
479         workerThreadName.append("p: ").append(poolName);
480         workerThreadName.append("; w: ").append(workerName);
481         return workerThreadName.toString();
482     }
483     } // End of WorkerThread class
484

485 }
486
487 // End of file.
488
Popular Tags