1 17 18 package org.apache.geronimo.pool; 19 20 import java.util.Collections ; 21 import java.util.HashMap ; 22 import java.util.Iterator ; 23 import java.util.Map ; 24 25 import javax.management.MalformedObjectNameException ; 26 import javax.management.ObjectName ; 27 import javax.management.j2ee.statistics.BoundedRangeStatistic ; 28 import javax.management.j2ee.statistics.CountStatistic ; 29 import javax.management.j2ee.statistics.Stats ; 30 31 import edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor; 32 import edu.emory.mathcs.backport.java.util.concurrent.RejectedExecutionHandler; 33 import edu.emory.mathcs.backport.java.util.concurrent.RejectedExecutionException; 34 import edu.emory.mathcs.backport.java.util.concurrent.ThreadFactory; 35 import edu.emory.mathcs.backport.java.util.concurrent.SynchronousQueue; 36 import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit; 37 38 import org.apache.geronimo.gbean.GBeanInfo; 39 import org.apache.geronimo.gbean.GBeanInfoBuilder; 40 import org.apache.geronimo.gbean.GBeanLifecycle; 41 42 import org.apache.geronimo.management.J2EEManagedObject; 43 import org.apache.geronimo.management.StatisticsProvider; 44 import org.apache.geronimo.management.geronimo.stats.ThreadPoolStats; 45 import org.apache.geronimo.management.stats.BoundedRangeImpl; 46 import org.apache.geronimo.management.stats.CountStatisticImpl; 47 import org.apache.geronimo.management.stats.StatsImpl; 48 49 52 public class ThreadPool implements GeronimoExecutor, GBeanLifecycle, J2EEManagedObject, StatisticsProvider { 53 private ThreadPoolExecutor executor; 54 private ClassLoader classLoader; 55 private ObjectName objectName; 56 private boolean waitWhenBlocked; 57 58 private boolean statsActive = true; 60 private PoolStatsImpl stats = new PoolStatsImpl(); 61 private Map clients = new HashMap (); 62 63 public ThreadPool(int poolSize, String poolName, long keepAliveTime, ClassLoader classLoader, String objectName) { 64 ThreadPoolExecutor p = new ThreadPoolExecutor( 65 poolSize, poolSize, keepAliveTime, TimeUnit.MILLISECONDS, 68 new SynchronousQueue()); 69 70 p.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy()); 71 p.setThreadFactory(new ThreadPoolThreadFactory(poolName, classLoader)); 72 73 try { 74 this.objectName = ObjectName.getInstance(objectName); 75 } catch (MalformedObjectNameException e) { 76 throw new IllegalStateException ("Bad object name injected: " + e.getMessage()); 77 } 78 79 executor = p; 80 this.classLoader = classLoader; 81 } 82 83 public String getName() { 84 return objectName.getKeyProperty("name"); 85 } 86 87 public String getObjectName() { 88 return objectName.getCanonicalName(); 89 } 90 91 public boolean isEventProvider() { 92 return true; 93 } 94 95 public boolean isStateManageable() { 96 return true; 97 } 98 99 public boolean isStatisticsProvider() { 100 return true; 101 } 102 103 public Stats getStats() { 104 stats.threadsInUse.setLowerBound(0); 105 stats.threadsInUse.setUpperBound(executor.getMaximumPoolSize()); 106 int inUse = executor.getPoolSize(); 107 stats.threadsInUse.setCurrent(inUse); 108 if (inUse < stats.threadsInUse.getLowWaterMark()) { 109 stats.threadsInUse.setLowWaterMark(inUse); 110 } 111 if (inUse > stats.threadsInUse.getHighWaterMark()) { 112 stats.threadsInUse.setHighWaterMark(inUse); 113 } 114 if (statsActive) { 115 synchronized (this) { 116 stats.prepareConsumers(clients); 117 } 118 } else { 119 stats.prepareConsumers(Collections.EMPTY_MAP); 120 } 121 return stats; 122 } 123 124 public static class PoolStatsImpl extends StatsImpl implements ThreadPoolStats { 125 private BoundedRangeImpl threadsInUse = new BoundedRangeImpl("Threads In Use", "", "The number of threads in use by this thread pool"); 126 private Map consumers = new HashMap (); 127 128 public PoolStatsImpl() { 129 addStat(threadsInUse.getName(), threadsInUse); 130 } 131 132 public BoundedRangeStatistic getThreadsInUse() { 133 return threadsInUse; 134 } 135 136 public CountStatistic getCountForConsumer(String consumer) { 137 return (CountStatistic ) consumers.get(consumer); 138 } 139 140 public String [] getThreadConsumers() { 141 return (String []) consumers.keySet().toArray(new String [consumers.size()]); 142 } 143 144 public void prepareConsumers(Map clients) { 145 Map result = new HashMap (); 146 for (Iterator it = clients.keySet().iterator(); it.hasNext();) { 147 String client = (String ) it.next(); 148 Integer count = (Integer ) clients.get(client); 149 CountStatisticImpl stat = (CountStatisticImpl) consumers.get(client); 150 if (stat == null) { 151 stat = new CountStatisticImpl("Threads for " + client, "", "The number of threads used by the client known as '" + client + "'", count.intValue()); 152 addStat(stat.getName(), stat); 153 } else { 154 consumers.remove(client); 155 stat.setCount(count.intValue()); 156 } 157 result.put(client, stat); 158 } 159 for (Iterator it = consumers.keySet().iterator(); it.hasNext();) { 160 String client = (String ) it.next(); 161 removeStat(((CountStatisticImpl) consumers.get(client)).getName()); 162 } 163 consumers = result; 164 } 165 } 166 167 168 public int getPoolSize() { 169 return executor.getMaximumPoolSize(); 170 } 171 172 public void execute(Runnable command) { 173 execute("Unknown", command); 174 } 175 176 public void execute(final String consumerName, final Runnable runnable) { 177 Runnable command; 178 if (statsActive) { 179 command = new Runnable () { 180 public void run() { 181 startWork(consumerName); 182 try { 183 runnable.run(); 184 } finally { 185 finishWork(consumerName); 186 } 187 } 188 }; 189 } else { 190 command = runnable; 191 } 192 193 ThreadPoolExecutor p; 194 synchronized (this) { 195 p = executor; 196 } 197 if (p == null) { 198 throw new IllegalStateException ("ThreadPool has been stopped"); 199 } 200 Runnable task = new ContextClassLoaderRunnable(command, classLoader); 201 p.execute(task); 202 } 203 204 private synchronized void startWork(String consumerName) { 205 Integer test = (Integer ) clients.get(consumerName); 206 if (test == null) { 207 clients.put(consumerName, new Integer (1)); 208 } else { 209 clients.put(consumerName, new Integer (test.intValue() + 1)); 210 } 211 } 212 213 private synchronized void finishWork(String consumerName) { 214 Integer test = (Integer ) clients.get(consumerName); 215 if (test.intValue() == 1) { 216 clients.remove(consumerName); 217 } else { 218 clients.put(consumerName, new Integer (test.intValue() - 1)); 219 } 220 } 221 222 private static class WaitWhenBlockedPolicy 223 implements RejectedExecutionHandler 224 { 225 public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) throws RejectedExecutionException { 226 try { 227 executor.getQueue().put(r); 228 } 229 catch (InterruptedException e) { 230 throw new RejectedExecutionException(e); 231 } 232 } 233 } 234 235 public void setWaitWhenBlocked(boolean wait) { 236 waitWhenBlocked = wait; 237 if(wait) { 238 executor.setRejectedExecutionHandler(new WaitWhenBlockedPolicy()); 239 } else { 240 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy()); 241 } 242 } 243 244 public boolean isWaitWhenBlocked() { 245 return waitWhenBlocked; 246 } 247 248 public void doStart() throws Exception { 249 } 250 251 public void doStop() throws Exception { 252 ThreadPoolExecutor p; 253 synchronized (this) { 254 p = executor; 255 executor = null; 256 classLoader = null; 257 } 258 if (p != null) { 259 p.shutdownNow(); 260 } 261 } 262 263 public void doFail() { 264 try { 265 doStop(); 266 } catch (Exception e) { 267 } 268 } 269 270 private static final class ThreadPoolThreadFactory implements ThreadFactory { 271 private final String poolName; 272 private final ClassLoader classLoader; 273 274 private int nextWorkerID = 0; 275 276 public ThreadPoolThreadFactory(String poolName, ClassLoader classLoader) { 277 this.poolName = poolName; 278 this.classLoader = classLoader; 279 } 280 281 public Thread newThread(Runnable arg0) { 282 Thread thread = new Thread (arg0, poolName + " " + getNextWorkerID()); 283 thread.setContextClassLoader(classLoader); 284 return thread; 285 } 286 287 private synchronized int getNextWorkerID() { 288 return nextWorkerID++; 289 } 290 } 291 292 private static final class ContextClassLoaderRunnable implements Runnable { 293 private Runnable task; 294 private ClassLoader classLoader; 295 296 public ContextClassLoaderRunnable(Runnable task, ClassLoader classLoader) { 297 this.task = task; 298 this.classLoader = classLoader; 299 } 300 301 public void run() { 302 Runnable myTask = task; 303 ClassLoader myClassLoader = classLoader; 304 305 task = null; 307 classLoader = null; 308 309 if (myClassLoader != null) { 310 try { 313 myTask.run(); 314 } finally { 315 Thread.currentThread().setContextClassLoader(myClassLoader); 316 } 317 } 318 } 319 } 320 321 public static final GBeanInfo GBEAN_INFO; 322 323 static { 324 GBeanInfoBuilder infoFactory = GBeanInfoBuilder.createStatic(ThreadPool.class); 325 326 infoFactory.addAttribute("poolSize", int.class, true); 327 infoFactory.addAttribute("poolName", String .class, true); 328 infoFactory.addAttribute("keepAliveTime", long.class, true); 329 infoFactory.addAttribute("waitWhenBlocked", boolean.class, true); 330 331 infoFactory.addAttribute("objectName", String .class, false); 332 infoFactory.addAttribute("classLoader", ClassLoader .class, false); 333 334 infoFactory.addInterface(GeronimoExecutor.class); 335 336 infoFactory.setConstructor(new String []{"poolSize", "poolName", "keepAliveTime", "classLoader", "objectName"}); 337 338 GBEAN_INFO = infoFactory.getBeanInfo(); 339 } 340 341 public static GBeanInfo getGBeanInfo() { 342 return GBEAN_INFO; 343 } 344 } 345 | Popular Tags |