KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > geronimo > pool > ThreadPool


1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one or more
3  * contributor license agreements. See the NOTICE file distributed with
4  * this work for additional information regarding copyright ownership.
5  * The ASF licenses this file to You under the Apache License, Version 2.0
6  * (the "License"); you may not use this file except in compliance with
7  * the License. You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */

17
18 package org.apache.geronimo.pool;
19
20 import java.util.Collections JavaDoc;
21 import java.util.HashMap JavaDoc;
22 import java.util.Iterator JavaDoc;
23 import java.util.Map JavaDoc;
24
25 import javax.management.MalformedObjectNameException JavaDoc;
26 import javax.management.ObjectName JavaDoc;
27 import javax.management.j2ee.statistics.BoundedRangeStatistic JavaDoc;
28 import javax.management.j2ee.statistics.CountStatistic JavaDoc;
29 import javax.management.j2ee.statistics.Stats JavaDoc;
30
31 import edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor;
32 import edu.emory.mathcs.backport.java.util.concurrent.RejectedExecutionHandler;
33 import edu.emory.mathcs.backport.java.util.concurrent.RejectedExecutionException;
34 import edu.emory.mathcs.backport.java.util.concurrent.ThreadFactory;
35 import edu.emory.mathcs.backport.java.util.concurrent.SynchronousQueue;
36 import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
37
38 import org.apache.geronimo.gbean.GBeanInfo;
39 import org.apache.geronimo.gbean.GBeanInfoBuilder;
40 import org.apache.geronimo.gbean.GBeanLifecycle;
41
42 import org.apache.geronimo.management.J2EEManagedObject;
43 import org.apache.geronimo.management.StatisticsProvider;
44 import org.apache.geronimo.management.geronimo.stats.ThreadPoolStats;
45 import org.apache.geronimo.management.stats.BoundedRangeImpl;
46 import org.apache.geronimo.management.stats.CountStatisticImpl;
47 import org.apache.geronimo.management.stats.StatsImpl;
48
49 /**
50  * @version $Rev: 476049 $ $Date: 2006-11-16 23:35:17 -0500 (Thu, 16 Nov 2006) $
51  */

52 public class ThreadPool implements GeronimoExecutor, GBeanLifecycle, J2EEManagedObject, StatisticsProvider {
53     private ThreadPoolExecutor executor;
54     private ClassLoader JavaDoc classLoader;
55     private ObjectName JavaDoc objectName;
56     private boolean waitWhenBlocked;
57     
58     // Statistics-related fields follow
59
private boolean statsActive = true;
60     private PoolStatsImpl stats = new PoolStatsImpl();
61     private Map JavaDoc clients = new HashMap JavaDoc();
62
63     public ThreadPool(int poolSize, String JavaDoc poolName, long keepAliveTime, ClassLoader JavaDoc classLoader, String JavaDoc objectName) {
64         ThreadPoolExecutor p = new ThreadPoolExecutor(
65             poolSize, // core size
66
poolSize, // max size
67
keepAliveTime, TimeUnit.MILLISECONDS,
68             new SynchronousQueue());
69
70         p.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
71         p.setThreadFactory(new ThreadPoolThreadFactory(poolName, classLoader));
72         
73         try {
74             this.objectName = ObjectName.getInstance(objectName);
75         } catch (MalformedObjectNameException JavaDoc e) {
76             throw new IllegalStateException JavaDoc("Bad object name injected: " + e.getMessage());
77         }
78
79         executor = p;
80         this.classLoader = classLoader;
81     }
82
83     public String JavaDoc getName() {
84         return objectName.getKeyProperty("name");
85     }
86
87     public String JavaDoc getObjectName() {
88         return objectName.getCanonicalName();
89     }
90
91     public boolean isEventProvider() {
92         return true;
93     }
94
95     public boolean isStateManageable() {
96         return true;
97     }
98
99     public boolean isStatisticsProvider() {
100         return true;
101     }
102
103     public Stats JavaDoc getStats() {
104         stats.threadsInUse.setLowerBound(0);
105         stats.threadsInUse.setUpperBound(executor.getMaximumPoolSize());
106         int inUse = executor.getPoolSize();
107         stats.threadsInUse.setCurrent(inUse);
108         if (inUse < stats.threadsInUse.getLowWaterMark()) {
109             stats.threadsInUse.setLowWaterMark(inUse);
110         }
111         if (inUse > stats.threadsInUse.getHighWaterMark()) {
112             stats.threadsInUse.setHighWaterMark(inUse);
113         }
114         if (statsActive) {
115             synchronized (this) {
116                 stats.prepareConsumers(clients);
117             }
118         } else {
119             stats.prepareConsumers(Collections.EMPTY_MAP);
120         }
121         return stats;
122     }
123
124     public static class PoolStatsImpl extends StatsImpl implements ThreadPoolStats {
125         private BoundedRangeImpl threadsInUse = new BoundedRangeImpl("Threads In Use", "", "The number of threads in use by this thread pool");
126         private Map JavaDoc consumers = new HashMap JavaDoc();
127
128         public PoolStatsImpl() {
129             addStat(threadsInUse.getName(), threadsInUse);
130         }
131
132         public BoundedRangeStatistic JavaDoc getThreadsInUse() {
133             return threadsInUse;
134         }
135
136         public CountStatistic JavaDoc getCountForConsumer(String JavaDoc consumer) {
137             return (CountStatistic JavaDoc) consumers.get(consumer);
138         }
139
140         public String JavaDoc[] getThreadConsumers() {
141             return (String JavaDoc[]) consumers.keySet().toArray(new String JavaDoc[consumers.size()]);
142         }
143
144         public void prepareConsumers(Map JavaDoc clients) {
145             Map JavaDoc result = new HashMap JavaDoc();
146             for (Iterator JavaDoc it = clients.keySet().iterator(); it.hasNext();) {
147                 String JavaDoc client = (String JavaDoc) it.next();
148                 Integer JavaDoc count = (Integer JavaDoc) clients.get(client);
149                 CountStatisticImpl stat = (CountStatisticImpl) consumers.get(client);
150                 if (stat == null) {
151                     stat = new CountStatisticImpl("Threads for " + client, "", "The number of threads used by the client known as '" + client + "'", count.intValue());
152                     addStat(stat.getName(), stat);
153                 } else {
154                     consumers.remove(client);
155                     stat.setCount(count.intValue());
156                 }
157                 result.put(client, stat);
158             }
159             for (Iterator JavaDoc it = consumers.keySet().iterator(); it.hasNext();) {
160                 String JavaDoc client = (String JavaDoc) it.next();
161                 removeStat(((CountStatisticImpl) consumers.get(client)).getName());
162             }
163             consumers = result;
164         }
165     }
166
167
168     public int getPoolSize() {
169         return executor.getMaximumPoolSize();
170     }
171
172     public void execute(Runnable JavaDoc command) {
173         execute("Unknown", command);
174     }
175
176     public void execute(final String JavaDoc consumerName, final Runnable JavaDoc runnable) {
177         Runnable JavaDoc command;
178         if (statsActive) {
179             command = new Runnable JavaDoc() {
180                 public void run() {
181                     startWork(consumerName);
182                     try {
183                         runnable.run();
184                     } finally {
185                         finishWork(consumerName);
186                     }
187                 }
188             };
189         } else {
190             command = runnable;
191         }
192
193         ThreadPoolExecutor p;
194         synchronized (this) {
195             p = executor;
196         }
197         if (p == null) {
198             throw new IllegalStateException JavaDoc("ThreadPool has been stopped");
199         }
200         Runnable JavaDoc task = new ContextClassLoaderRunnable(command, classLoader);
201         p.execute(task);
202     }
203
204     private synchronized void startWork(String JavaDoc consumerName) {
205         Integer JavaDoc test = (Integer JavaDoc) clients.get(consumerName);
206         if (test == null) {
207             clients.put(consumerName, new Integer JavaDoc(1));
208         } else {
209             clients.put(consumerName, new Integer JavaDoc(test.intValue() + 1));
210         }
211     }
212
213     private synchronized void finishWork(String JavaDoc consumerName) {
214         Integer JavaDoc test = (Integer JavaDoc) clients.get(consumerName);
215         if (test.intValue() == 1) {
216             clients.remove(consumerName);
217         } else {
218             clients.put(consumerName, new Integer JavaDoc(test.intValue() - 1));
219         }
220     }
221     
222     private static class WaitWhenBlockedPolicy
223         implements RejectedExecutionHandler
224     {
225         public void rejectedExecution(Runnable JavaDoc r, ThreadPoolExecutor executor) throws RejectedExecutionException {
226             try {
227                 executor.getQueue().put(r);
228             }
229             catch (InterruptedException JavaDoc e) {
230                 throw new RejectedExecutionException(e);
231             }
232         }
233     }
234     
235     public void setWaitWhenBlocked(boolean wait) {
236         waitWhenBlocked = wait;
237         if(wait) {
238             executor.setRejectedExecutionHandler(new WaitWhenBlockedPolicy());
239         } else {
240             executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
241         }
242     }
243
244     public boolean isWaitWhenBlocked() {
245         return waitWhenBlocked;
246     }
247
248     public void doStart() throws Exception JavaDoc {
249     }
250
251     public void doStop() throws Exception JavaDoc {
252         ThreadPoolExecutor p;
253         synchronized (this) {
254             p = executor;
255             executor = null;
256             classLoader = null;
257         }
258         if (p != null) {
259             p.shutdownNow();
260         }
261     }
262
263     public void doFail() {
264         try {
265             doStop();
266         } catch (Exception JavaDoc e) {
267         }
268     }
269
270     private static final class ThreadPoolThreadFactory implements ThreadFactory {
271         private final String JavaDoc poolName;
272         private final ClassLoader JavaDoc classLoader;
273
274         private int nextWorkerID = 0;
275
276         public ThreadPoolThreadFactory(String JavaDoc poolName, ClassLoader JavaDoc classLoader) {
277             this.poolName = poolName;
278             this.classLoader = classLoader;
279         }
280
281         public Thread JavaDoc newThread(Runnable JavaDoc arg0) {
282             Thread JavaDoc thread = new Thread JavaDoc(arg0, poolName + " " + getNextWorkerID());
283             thread.setContextClassLoader(classLoader);
284             return thread;
285         }
286
287         private synchronized int getNextWorkerID() {
288             return nextWorkerID++;
289         }
290     }
291
292     private static final class ContextClassLoaderRunnable implements Runnable JavaDoc {
293         private Runnable JavaDoc task;
294         private ClassLoader JavaDoc classLoader;
295
296         public ContextClassLoaderRunnable(Runnable JavaDoc task, ClassLoader JavaDoc classLoader) {
297             this.task = task;
298             this.classLoader = classLoader;
299         }
300
301         public void run() {
302             Runnable JavaDoc myTask = task;
303             ClassLoader JavaDoc myClassLoader = classLoader;
304
305             // clear fields so they can be garbage collected
306
task = null;
307             classLoader = null;
308
309             if (myClassLoader != null) {
310                 // we asumme the thread classloader is already set to our final class loader
311
// because the only to access the thread is wrapped with the Runnable or via the initial thread pool
312
try {
313                     myTask.run();
314                 } finally {
315                     Thread.currentThread().setContextClassLoader(myClassLoader);
316                 }
317             }
318         }
319     }
320
321     public static final GBeanInfo GBEAN_INFO;
322
323     static {
324         GBeanInfoBuilder infoFactory = GBeanInfoBuilder.createStatic(ThreadPool.class);
325
326         infoFactory.addAttribute("poolSize", int.class, true);
327         infoFactory.addAttribute("poolName", String JavaDoc.class, true);
328         infoFactory.addAttribute("keepAliveTime", long.class, true);
329         infoFactory.addAttribute("waitWhenBlocked", boolean.class, true);
330
331         infoFactory.addAttribute("objectName", String JavaDoc.class, false);
332         infoFactory.addAttribute("classLoader", ClassLoader JavaDoc.class, false);
333
334         infoFactory.addInterface(GeronimoExecutor.class);
335
336         infoFactory.setConstructor(new String JavaDoc[]{"poolSize", "poolName", "keepAliveTime", "classLoader", "objectName"});
337
338         GBEAN_INFO = infoFactory.getBeanInfo();
339     }
340
341     public static GBeanInfo getGBeanInfo() {
342         return GBEAN_INFO;
343     }
344 }
345
Popular Tags