1 5 6 package com.sun.corba.se.impl.orbutil.threadpool; 7 8 import java.security.AccessController ; 9 import java.security.PrivilegedAction ; 10 11 import com.sun.corba.se.spi.orbutil.threadpool.NoSuchWorkQueueException; 12 import com.sun.corba.se.spi.orbutil.threadpool.ThreadPool; 13 import com.sun.corba.se.spi.orbutil.threadpool.Work; 14 import com.sun.corba.se.spi.orbutil.threadpool.WorkQueue; 15 16 import com.sun.corba.se.impl.orbutil.ORBConstants; 17 import com.sun.corba.se.impl.orbutil.threadpool.WorkQueueImpl; 18 19 import com.sun.corba.se.spi.monitoring.MonitoringConstants; 20 import com.sun.corba.se.spi.monitoring.MonitoredObject; 21 import com.sun.corba.se.spi.monitoring.MonitoringFactories; 22 import com.sun.corba.se.spi.monitoring.LongMonitoredAttributeBase; 23 24 public class ThreadPoolImpl implements ThreadPool 25 { 26 private static int threadCounter = 0; 28 private WorkQueue workQueue; 29 30 private int availableWorkerThreads = 0; 32 33 private int currentThreadCount = 0; 35 36 private int minWorkerThreads = 0; 38 39 private int maxWorkerThreads = 0; 41 42 private long inactivityTimeout = ORBConstants.DEFAULT_INACTIVITY_TIMEOUT ; 44 45 private boolean boundedThreadPool = false; 47 48 private long processedCount = 1; 52 53 private long totalTimeTaken = 0; 56 57 private Object lock = new Object (); 59 60 private String name; 62 63 private MonitoredObject threadpoolMonitoredObject; 65 66 private final ThreadGroup threadGroup ; 68 69 72 public ThreadPoolImpl(ThreadGroup tg, String threadpoolName) { 73 maxWorkerThreads = Integer.MAX_VALUE; 74 workQueue = new WorkQueueImpl(this); 75 threadGroup = tg ; 76 name = threadpoolName; 77 initializeMonitoring(); 78 } 79 80 84 public ThreadPoolImpl(String threadpoolName) { 85 this( Thread.currentThread().getThreadGroup(), threadpoolName ) ; 86 } 87 88 91 public ThreadPoolImpl(int minSize, int maxSize, long timeout, 92 String threadpoolName) 93 { 94 inactivityTimeout = timeout; 95 minWorkerThreads = minSize; 96 maxWorkerThreads = maxSize; 97 boundedThreadPool = true; 98 workQueue = new WorkQueueImpl(this); 99 threadGroup = Thread.currentThread().getThreadGroup() ; 100 name = threadpoolName; 101 for (int i = 0; i < minWorkerThreads; i++) { 102 createWorkerThread(); 103 } 104 initializeMonitoring(); 105 } 106 107 private void initializeMonitoring() { 109 MonitoredObject root = MonitoringFactories.getMonitoringManagerFactory(). 111 createMonitoringManager(MonitoringConstants.DEFAULT_MONITORING_ROOT, null). 112 getRootMonitoredObject(); 113 114 MonitoredObject threadPoolMonitoringObjectRoot = root.getChild( 116 MonitoringConstants.THREADPOOL_MONITORING_ROOT); 117 if (threadPoolMonitoringObjectRoot == null) { 118 threadPoolMonitoringObjectRoot = MonitoringFactories. 119 getMonitoredObjectFactory().createMonitoredObject( 120 MonitoringConstants.THREADPOOL_MONITORING_ROOT, 121 MonitoringConstants.THREADPOOL_MONITORING_ROOT_DESCRIPTION); 122 root.addChild(threadPoolMonitoringObjectRoot); 123 } 124 threadpoolMonitoredObject = MonitoringFactories. 125 getMonitoredObjectFactory(). 126 createMonitoredObject(name, 127 MonitoringConstants.THREADPOOL_MONITORING_DESCRIPTION); 128 129 threadPoolMonitoringObjectRoot.addChild(threadpoolMonitoredObject); 130 131 LongMonitoredAttributeBase b1 = new 132 LongMonitoredAttributeBase(MonitoringConstants.THREADPOOL_CURRENT_NUMBER_OF_THREADS, 133 MonitoringConstants.THREADPOOL_CURRENT_NUMBER_OF_THREADS_DESCRIPTION) { 134 public Object getValue() { 135 return new Long (ThreadPoolImpl.this.currentNumberOfThreads()); 136 } 137 }; 138 threadpoolMonitoredObject.addAttribute(b1); 139 LongMonitoredAttributeBase b2 = new 140 LongMonitoredAttributeBase(MonitoringConstants.THREADPOOL_NUMBER_OF_AVAILABLE_THREADS, 141 MonitoringConstants.THREADPOOL_CURRENT_NUMBER_OF_THREADS_DESCRIPTION) { 142 public Object getValue() { 143 return new Long (ThreadPoolImpl.this.numberOfAvailableThreads()); 144 } 145 }; 146 threadpoolMonitoredObject.addAttribute(b2); 147 LongMonitoredAttributeBase b3 = new 148 LongMonitoredAttributeBase(MonitoringConstants.THREADPOOL_NUMBER_OF_BUSY_THREADS, 149 MonitoringConstants.THREADPOOL_NUMBER_OF_BUSY_THREADS_DESCRIPTION) { 150 public Object getValue() { 151 return new Long (ThreadPoolImpl.this.numberOfBusyThreads()); 152 } 153 }; 154 threadpoolMonitoredObject.addAttribute(b3); 155 LongMonitoredAttributeBase b4 = new 156 LongMonitoredAttributeBase(MonitoringConstants.THREADPOOL_AVERAGE_WORK_COMPLETION_TIME, 157 MonitoringConstants.THREADPOOL_AVERAGE_WORK_COMPLETION_TIME_DESCRIPTION) { 158 public Object getValue() { 159 return new Long (ThreadPoolImpl.this.averageWorkCompletionTime()); 160 } 161 }; 162 threadpoolMonitoredObject.addAttribute(b4); 163 LongMonitoredAttributeBase b5 = new 164 LongMonitoredAttributeBase(MonitoringConstants.THREADPOOL_CURRENT_PROCESSED_COUNT, 165 MonitoringConstants.THREADPOOL_CURRENT_PROCESSED_COUNT_DESCRIPTION) { 166 public Object getValue() { 167 return new Long (ThreadPoolImpl.this.currentProcessedCount()); 168 } 169 }; 170 threadpoolMonitoredObject.addAttribute(b5); 171 172 174 threadpoolMonitoredObject.addChild( 175 ((WorkQueueImpl)workQueue).getMonitoredObject()); 176 } 177 178 MonitoredObject getMonitoredObject() { 181 return threadpoolMonitoredObject; 182 } 183 184 public WorkQueue getAnyWorkQueue() 185 { 186 return workQueue; 187 } 188 189 public WorkQueue getWorkQueue(int queueId) 190 throws NoSuchWorkQueueException 191 { 192 if (queueId != 0) 193 throw new NoSuchWorkQueueException(); 194 return workQueue; 195 } 196 197 202 void notifyForAvailableWork(WorkQueue aWorkQueue) { 203 synchronized (lock) { 204 if (availableWorkerThreads == 0) { 205 createWorkerThread(); 206 } else { 207 aWorkQueue.notify(); 208 } 209 } 210 } 211 212 213 217 void createWorkerThread() { 218 synchronized (lock) { 219 final String name = getName() ; 220 221 if (boundedThreadPool) { 222 if (currentThreadCount < maxWorkerThreads) { 223 currentThreadCount++; 224 } else { 225 return; 231 } 232 } else { 233 currentThreadCount++; 234 } 235 236 AccessController.doPrivileged( 238 new PrivilegedAction () { 239 public Object run() { 240 WorkerThread thread = new WorkerThread(threadGroup, name); 254 255 thread.setDaemon(true); 262 263 thread.start(); 264 265 return null ; 266 } 267 } 268 ) ; 269 } 270 } 271 272 276 public int minimumNumberOfThreads() { 277 return minWorkerThreads; 278 } 279 280 284 public int maximumNumberOfThreads() { 285 return maxWorkerThreads; 286 } 287 288 292 public long idleTimeoutForThreads() { 293 return inactivityTimeout; 294 } 295 296 300 public int currentNumberOfThreads() { 301 synchronized (lock) { 302 return currentThreadCount; 303 } 304 } 305 306 311 public int numberOfAvailableThreads() { 312 synchronized (lock) { 313 return availableWorkerThreads; 314 } 315 } 316 317 321 public int numberOfBusyThreads() { 322 synchronized (lock) { 323 return (currentThreadCount - availableWorkerThreads); 324 } 325 } 326 327 331 public long averageWorkCompletionTime() { 332 synchronized (lock) { 333 return (totalTimeTaken / processedCount); 334 } 335 } 336 337 340 public long currentProcessedCount() { 341 synchronized (lock) { 342 return processedCount; 343 } 344 } 345 346 public String getName() { 347 return name; 348 } 349 350 353 public int numberOfWorkQueues() { 354 return 1; 355 } 356 357 358 private static synchronized int getUniqueThreadId() { 359 return ThreadPoolImpl.threadCounter++; 360 } 361 362 363 private class WorkerThread extends Thread 364 { 365 private Work currentWork; 366 private int threadId = 0; private String threadPoolName; 369 private StringBuffer workerThreadName = new StringBuffer (); 371 372 WorkerThread(ThreadGroup tg, String threadPoolName) { 373 super(tg, "Idle"); 374 this.threadId = ThreadPoolImpl.getUniqueThreadId(); 375 this.threadPoolName = threadPoolName; 376 setName(composeWorkerThreadName(threadPoolName, "Idle")); 377 } 378 379 public void run() { 380 while (true) { 381 try { 382 383 synchronized (lock) { 384 availableWorkerThreads++; 385 } 386 387 currentWork = ((WorkQueueImpl)workQueue).requestWork(inactivityTimeout); 389 390 synchronized (lock) { 391 availableWorkerThreads--; 392 if ((availableWorkerThreads == 0) && 404 (workQueue.workItemsInQueue() > 0)) { 405 createWorkerThread(); 406 } 407 } 408 409 setName(composeWorkerThreadName(threadPoolName, 411 Integer.toString(this.threadId))); 412 413 long start = System.currentTimeMillis(); 414 415 try { 416 currentWork.doWork(); 418 } catch (Throwable t) { 419 ; 421 } 422 423 long end = System.currentTimeMillis(); 424 425 426 synchronized (lock) { 427 totalTimeTaken += (end - start); 428 processedCount++; 429 } 430 431 currentWork = null; 434 435 setName(composeWorkerThreadName(threadPoolName, "Idle")); 436 437 } catch (TimeoutException e) { 438 440 synchronized (lock) { 441 availableWorkerThreads--; 442 443 if (currentThreadCount > minWorkerThreads) { 445 currentThreadCount--; 446 return; 448 } else { 449 continue; 451 } 452 } 453 } catch (InterruptedException ie) { 454 synchronized (lock) { 461 availableWorkerThreads--; 462 } 463 464 } catch (Throwable e) { 465 466 synchronized (lock) { 470 availableWorkerThreads--; 471 } 472 473 } 474 } 475 } 476 477 private String composeWorkerThreadName(String poolName, String workerName) { 478 workerThreadName.setLength(0); 479 workerThreadName.append("p: ").append(poolName); 480 workerThreadName.append("; w: ").append(workerName); 481 return workerThreadName.toString(); 482 } 483 } 485 } 486 487 | Popular Tags |