1 23 24 package com.sun.enterprise.connectors.work; 25 26 import com.sun.logging.LogDomains; 27 import com.sun.corba.ee.spi.orbutil.threadpool.*; 28 import com.sun.enterprise.connectors.ConnectorRuntimeException; 29 import com.sun.enterprise.connectors.ConnectorRuntime; 30 import com.sun.enterprise.connectors.work.monitor.*; 31 import com.sun.enterprise.util.S1ASThreadPoolManager; 32 import java.util.logging.*; 33 import javax.resource.spi.work.ExecutionContext ; 34 import javax.resource.spi.work.Work ; 35 import javax.resource.spi.work.WorkException ; 36 import javax.resource.spi.work.WorkListener ; 37 import javax.resource.spi.work.WorkManager ; 38 import com.sun.enterprise.util.i18n.StringManager; 39 40 45 46 public class CommonWorkManager implements MonitorableWorkManager { 47 48 private static WorkManager wm = null; 49 50 private ThreadPoolManager tpm; 51 private ThreadPool tp; 52 53 private static Logger logger = 54 LogDomains.getLogger(LogDomains.RSR_LOGGER); 55 56 private boolean isMonitoringEnabled = false; 58 private WorkStats workStats = null; 59 60 private StringManager localStrings = StringManager.getManager( 61 CommonWorkManager.class); 62 63 69 public CommonWorkManager (String threadpoolId) 70 throws ConnectorRuntimeException { 71 int env = ConnectorRuntime.getRuntime().getEnviron(); 72 73 if (env == ConnectorRuntime.SERVER) { 74 tpm = S1ASThreadPoolManager.getThreadPoolManager(); 75 76 if (threadpoolId == null) { 77 tp = tpm.getDefaultThreadPool(); 78 } else { 79 try { 80 tp = tpm.getThreadPool(threadpoolId); 81 logger.info("Got the thread pool for :" + threadpoolId); 82 } catch (NoSuchThreadPoolException e) { 83 String msg = localStrings.getString("workmanager.threadpool_not_found"); 84 85 logger.log(Level.SEVERE,msg, threadpoolId); 86 throw new ConnectorRuntimeException(e.getMessage()); 87 } 88 } 89 } 90 } 91 92 97 public CommonWorkManager() throws ConnectorRuntimeException { 98 this(null); 99 } 100 101 107 public void doWork (Work work) 108 throws WorkException { 109 doWork(work, -1, null, null); 110 } 111 112 122 public void doWork(Work work, long startTimeout, 123 ExecutionContext execContext, WorkListener workListener) 124 throws WorkException { 125 126 if (logger.isLoggable(Level.FINEST)) { 127 String msg = "doWork for [" + work.toString() + "] START"; 128 logger.log(Level.FINEST, debugMsg(msg)); 129 } 130 131 WorkCoordinator wc = new WorkCoordinator 132 (work, startTimeout, execContext, tp.getAnyWorkQueue(), workListener, 133 this.workStats); 134 wc.submitWork(WorkCoordinator.WAIT_UNTIL_FINISH); 135 wc.lock(); 136 137 WorkException we = wc.getException(); 138 if (we != null) { 139 throw we; 140 } 141 142 if (logger.isLoggable(Level.FINEST)) { 143 String msg = "doWork for [" + work.toString() + "] END"; 144 msg = "doWork for [" + work.toString() + "] END"; 145 logger.log(Level.FINEST, debugMsg(msg)); 146 } 147 } 148 149 156 public long startWork(Work work) throws WorkException { 158 162 return startWork(work, -1, null, null); 163 } 164 165 175 public long startWork(Work work, long startTimeout, 176 ExecutionContext execContext, WorkListener workListener) 177 throws WorkException { 178 179 if (logger.isLoggable(Level.FINEST)) { 180 String msg = "startWork for [" + work.toString() + "] START"; 181 logger.log(Level.FINEST, debugMsg(msg)); 182 } 183 184 long acceptanceTime = System.currentTimeMillis(); 185 186 WorkCoordinator wc = new WorkCoordinator 187 (work, startTimeout, execContext, tp.getAnyWorkQueue(), workListener, 188 this.workStats); 189 wc.submitWork(WorkCoordinator.WAIT_UNTIL_START); 190 wc.lock(); 191 192 WorkException we = wc.getException(); 193 if (we != null) { 194 throw we; 195 } 196 197 if (logger.isLoggable(Level.FINEST)) { 198 String msg = "startWork for [" + work.toString() + "] END"; 199 logger.log(Level.FINEST, debugMsg(msg)); 200 } 201 long startTime = System.currentTimeMillis(); 202 203 return (startTime - acceptanceTime); 204 } 205 206 213 public void scheduleWork(Work work) throws WorkException { 215 scheduleWork(work, -1, null, null); 216 return; 217 } 218 219 229 public void scheduleWork(Work work, long startTimeout, 230 ExecutionContext execContext, WorkListener workListener) 231 throws WorkException { 232 233 if (logger.isLoggable(Level.FINEST)) { 234 String msg = "scheduleWork for [" + work.toString() + "] START"; 235 logger.log(Level.FINEST, debugMsg(msg)); 236 } 237 238 WorkCoordinator wc = new WorkCoordinator 239 (work, startTimeout, execContext, tp.getAnyWorkQueue(), workListener, 240 this.workStats); 241 wc.submitWork(WorkCoordinator.NO_WAIT); 242 wc.lock(); 243 244 WorkException we = wc.getException(); 245 if (we != null) { 246 throw we; 247 } 248 249 if (logger.isLoggable(Level.FINEST)) { 250 String msg = "scheduleWork for [" + work.toString() + "] END"; 251 logger.log(Level.FINEST, debugMsg(msg)); 252 } 253 return; 254 } 255 256 private String debugMsg (String message) { 257 String msg = "[Thread " + Thread.currentThread().getName() 258 + "] -- " + message; 259 return msg; 260 } 261 262 public boolean isMonitoringEnabled() { 264 return this.isMonitoringEnabled; 265 } 266 267 public void setMonitoringEnabled(boolean isEnabled) { 268 this.isMonitoringEnabled = isEnabled; 269 if ( this.workStats == null ) { 270 this.workStats = new WorkStats(); 271 } 272 if (!isEnabled){ 274 this.workStats.reset(); 275 } 276 } 277 278 public long getWaitQueueLength(){ 279 return (long)this.tp.getAnyWorkQueue().workItemsInQueue(); 280 } 281 282 public long getMaxWaitQueueLength() { 283 return this.workStats.maxWaitQueueLength; 284 } 285 286 public long getMinWaitQueueLength() { 287 if (this.workStats.minWaitQueueLength != Long.MAX_VALUE){ 288 return this.workStats.minWaitQueueLength; 289 } else { 290 return 0; 291 } 292 } 293 294 public long getMaxWorkRequestWaitTime(){ 295 return this.workStats.maxWorkRequestWaitTime; 296 297 } 298 public long getMinWorkRequestWaitTime(){ 299 return this.workStats.minWorkRequestWaitTime; 300 } 301 302 public long getSubmittedWorkCount() { 303 return this.workStats.submittedWorkCount; 304 } 305 306 public long getRejectedWorkCount() { 307 return this.workStats.rejectedWorkCount; 308 } 309 310 public long getCompletedWorkCount() { 311 return this.workStats.completedWorkCount; 312 } 313 314 public long getCurrentActiveWorkCount() { 315 return this.workStats.currentActiveWorkCount; 316 } 317 public long getMaxActiveWorkCount() { 318 return this.workStats.maxActiveWorkCount; 319 } 320 321 public long getMinActiveWorkCount() { 322 if (this.workStats.minActiveWorkCount != Long.MAX_VALUE){ 323 return this.workStats.minActiveWorkCount; 324 } else { 325 return 0; 326 } 327 } 328 330 } 331 332 341 class WorkStats { 342 long submittedWorkCount; 343 long completedWorkCount; 344 long rejectedWorkCount; 345 long maxWaitQueueLength; 346 long minWaitQueueLength; 347 348 long currentActiveWorkCount; 349 long minActiveWorkCount; 350 long maxActiveWorkCount; 351 352 long maxWorkRequestWaitTime; 353 long minWorkRequestWaitTime; 354 long currWaitQueueLength; 355 356 public void reset(){ 357 this.submittedWorkCount = 0L; 358 this.rejectedWorkCount = 0L; 359 this.completedWorkCount = 0L; 360 361 this.currWaitQueueLength = 0L; 362 this.maxWaitQueueLength = 0L; 363 this.minWaitQueueLength = Long.MAX_VALUE; 364 365 this.currentActiveWorkCount = 0L; 366 this.minActiveWorkCount = Long.MAX_VALUE; 367 this.maxActiveWorkCount= 0L; 368 369 this.maxWorkRequestWaitTime = 0L; 370 this.minWorkRequestWaitTime = 0L; 371 } 372 373 public synchronized void setWorkWaitTime(long waitTime){ 374 if (waitTime > maxWorkRequestWaitTime) { 376 this.maxWorkRequestWaitTime = waitTime; 377 } 378 379 if (waitTime < minWorkRequestWaitTime) { 381 this.minWorkRequestWaitTime = waitTime; 382 } 383 } 384 385 public synchronized void incrementWaitQueueLength(){ 386 setWaitQueueLength(this.currWaitQueueLength++); 387 } 388 389 public synchronized void decrementWaitQueueLength(){ 390 setWaitQueueLength(this.currWaitQueueLength--); 391 } 392 393 private void setWaitQueueLength(long waitQueueLength){ 394 if (waitQueueLength > maxWaitQueueLength) { 396 maxWaitQueueLength = waitQueueLength; 397 } 398 if (waitQueueLength < minWaitQueueLength) { 400 minWaitQueueLength = waitQueueLength; 401 } 402 } 403 404 public synchronized void setActiveWorkCount(long currentActiveWorkCount){ 405 this.currentActiveWorkCount = currentActiveWorkCount; 406 if (currentActiveWorkCount > maxActiveWorkCount) { 408 maxActiveWorkCount = currentActiveWorkCount; 409 } 410 if (currentActiveWorkCount < minActiveWorkCount) { 412 minActiveWorkCount = currentActiveWorkCount; 413 } 414 } 415 } 416 | Popular Tags |