1 23 package org.objectweb.clif.scenario.util.isac.engine.loadprofile; 24 25 import java.util.Enumeration ; 26 import java.util.Hashtable ; 27 28 import org.objectweb.clif.datacollector.api.DataCollectorWrite; 29 import org.objectweb.clif.scenario.util.isac.engine.IsacScenarioEngine; 30 import org.objectweb.clif.scenario.util.isac.engine.behavior.BehaviorManager; 31 import org.objectweb.clif.scenario.util.isac.engine.behavior.BehaviorsPool; 32 import org.objectweb.clif.scenario.util.isac.engine.sessionobject.SessionObjectManager; 33 import org.objectweb.clif.scenario.util.isac.exception.IsacRuntimeException; 34 import org.objectweb.clif.scenario.util.isac.loadprofile.GroupDescription; 35 import org.objectweb.clif.scenario.util.isac.util.BooleanHolder; 36 import org.objectweb.clif.server.api.BladeInsertResponse; 37 import org.objectweb.util.monolog.api.BasicLevel; 38 import org.objectweb.util.monolog.api.Logger; 39 40 53 public class GroupExecutionWithPoolThread extends Thread { 54 static protected Logger log = IsacScenarioEngine.logger 56 .getLogger(GroupExecutionWithPoolThread.class.getName()); 57 58 private static final int LOOP_DURATION=1000; 60 61 private Object suspendLock; 64 65 private Object groupExecutionLock; 66 67 private Object surplusLock; 70 71 private Object componentLock; 73 74 private Object dataCollectorWrite_lock; 75 76 private Object timerLock; 78 79 private DataCollectorWrite dataCollectorWrite; 81 82 private BladeInsertResponse bladeInsertResponse; 84 85 private Object bladeInsertResponse_lock; 86 87 private volatile BooleanHolder stopped; 89 90 private volatile BooleanHolder suspended; 91 92 private long suspendTotalTime; 94 95 private long beginTime; 96 97 private long lateTotalTime; 98 99 private long activitiesTotalTime; 100 101 private int currentSecond; 102 103 private boolean forceThreadStop; 106 107 private GroupDescriptionManager groupDescriptionManager; 109 110 private BehaviorManager behaviorManager; 111 112 private SessionObjectManager sessionObjectManager; 113 114 private boolean localStarted; 117 118 private boolean localStopped; 119 120 private boolean localSuspended; 121 122 private Hashtable pools; 124 125 private String bladeId; 126 127 156 public GroupExecutionWithPoolThread(String bladeId, GroupDescriptionManager gdm, 157 BehaviorManager bm, SessionObjectManager som, Object sl, Object tl, 158 Object cl, Object sul, DataCollectorWrite dcw, Object dcwl, 159 BladeInsertResponse bir, Object birl, BooleanHolder stopped, 160 BooleanHolder suspended) { 161 this.bladeId = bladeId; 163 this.groupDescriptionManager = gdm; 164 this.behaviorManager = bm; 165 this.sessionObjectManager = som; 166 this.suspendLock = sl; 167 this.timerLock = tl; 168 this.componentLock = cl; 169 this.surplusLock = sl; 170 this.dataCollectorWrite = dcw; 171 this.dataCollectorWrite_lock = dcwl; 172 this.bladeInsertResponse = bir; 173 this.bladeInsertResponse_lock = birl; 174 this.stopped = stopped; 175 this.suspended = suspended; 176 this.lateTotalTime = 0; 178 this.beginTime = 0; 179 this.suspendTotalTime = 0; 180 this.currentSecond = 1; 181 this.activitiesTotalTime = 0; 182 this.localStarted = false; 185 this.localStopped = false; 186 this.localSuspended = false; 187 this.groupExecutionLock = new Object (); 189 190 this.initializePools(); 192 } 193 194 197 private void initializePools() { 198 this.pools = new Hashtable (); 200 Enumeration groupsId = this.groupDescriptionManager.getGroupsIds(); 202 while (groupsId.hasMoreElements()) { 204 Integer id = (Integer ) groupsId.nextElement(); 206 int max = this.groupDescriptionManager 208 .getMaximumBehaviorsForAGroup(id); 209 BehaviorsPool pool = new BehaviorsPool(bladeId, id, 211 this.groupDescriptionManager, this.behaviorManager, 212 this.sessionObjectManager, this.stopped, this.suspended, 213 this.suspendLock, this.groupExecutionLock, 214 this.dataCollectorWrite, this.dataCollectorWrite_lock, 215 this.timerLock); 216 pool.initializeBehaviorsThreads(); 218 this.pools.put(id, pool); 220 } 221 } 222 223 226 public void run() { 227 this.beginTime = System.currentTimeMillis(); 229 if (IsacScenarioEngine.DEBUG_ON && log.isLoggable(BasicLevel.DEBUG)) { 230 log.log(BasicLevel.DEBUG, " ** GET ** beginTime=" + beginTime); 231 } 232 boolean allGroupsFinished = false ; 234 while (!stopped.getBooleanValue() && !allGroupsFinished) { 236 if (IsacScenarioEngine.DEBUG_ON && log.isLoggable(BasicLevel.DEBUG)) { 237 log.log(BasicLevel.DEBUG, "******************** SECOND=" 238 + currentSecond + "***************"); 239 } 240 this.executeSuspendIfNeeded(); 242 Enumeration groupsIds = this.groupDescriptionManager.getGroupsIds(); 244 if (!groupsIds.hasMoreElements()) { 246 log.log(BasicLevel.DEBUG, "There is no more group in the GDM"); 247 break; 249 } 250 allGroupsFinished = true ; 252 while (groupsIds.hasMoreElements()) { 254 this.executeSuspendIfNeeded(); 256 Integer currentId = (Integer ) groupsIds.nextElement(); 258 if (IsacScenarioEngine.DEBUG_ON 259 && log.isLoggable(BasicLevel.DEBUG)) { 260 log.log(BasicLevel.DEBUG, " ** GET (" + currentId 261 + ")** Group analyzed=" + currentId); 262 } 263 GroupDescription currentGroupDescription = (GroupDescription) this.groupDescriptionManager 265 .getGroupDescription(currentId); 266 long numberOfThreads = currentGroupDescription 269 .getVirtualUserNumber(currentSecond); 270 if (IsacScenarioEngine.DEBUG_ON 271 && log.isLoggable(BasicLevel.DEBUG)) { 272 log.log(BasicLevel.DEBUG, 273 " ** GET ** numberOfThreads needed=" 274 + numberOfThreads); 275 } 276 277 if (!((BehaviorsPool) this.pools.get(currentId)) 279 .setNumberOfThreadsRunning((int) numberOfThreads)) { 280 allGroupsFinished = false ; 281 } 282 } this.sleepSurplusTime(); 284 this.currentSecond++; 285 } if (stopped.getBooleanValue()) { 288 this.waitForAllThreadsEnding(); 290 synchronized (this.componentLock) { 291 this.localStopped = true; 293 this.componentLock.notify(); 294 } 295 } 296 else { 299 this.stopped.setBooleanValue(true); 301 this.waitForAllThreadsEnding(); 303 log.log(BasicLevel.DEBUG, " ** GET ** send completed msg"); 304 synchronized (bladeInsertResponse_lock) { 306 if (bladeInsertResponse != null) { 307 bladeInsertResponse.completed(); 308 } 309 } 310 } 311 long endTime = System.currentTimeMillis(); 313 log.log(BasicLevel.WARN,"** EXECUTION FINISHED : "); 314 log.log(BasicLevel.WARN,"** currentSecond=" + currentSecond); 315 if (!allGroupsFinished) { 316 log.log(BasicLevel.WARN,"** application was forced to stop"); 317 } else { 318 log.log(BasicLevel.WARN,"** application finish by itself"); 319 } 320 log.log(BasicLevel.WARN,"** REAL DURATION OF THE TEST=" 321 + (this.beginTime - endTime)); 322 log.log(BasicLevel.WARN,"** TOTAL SUSPENDED TIME=" + this.suspendTotalTime); 323 log.log(BasicLevel.WARN,"** REAL ACTIVITIES TIME=" 324 + this.activitiesTotalTime); 325 log.log(BasicLevel.WARN,"** TOTAL LATE TIME=" + this.lateTotalTime); 326 } 327 328 331 private void waitForAllThreadsEnding() { 332 log.log(BasicLevel.DEBUG, " ** GET ** WAIT ALL THREAD FINISHING"); 333 synchronized (this.groupExecutionLock) { 334 log.log(BasicLevel.DEBUG, " ** GET ** AWAKE ALL POOLED THREADS"); 335 this.awakeAllThreads(); 336 boolean haveAllFinished = false; 337 while (!haveAllFinished) { 338 log.log(BasicLevel.DEBUG, 339 " ** GET ** WHILE THEY DON'T HAVE ALL FINISH"); 340 haveAllFinished = true; 342 Enumeration poolsElems = this.pools.elements(); 344 while (poolsElems.hasMoreElements()) { 345 BehaviorsPool temp = (BehaviorsPool) poolsElems 346 .nextElement(); 347 if (temp.getNumberOfThreadAlive() != 0) { 348 log.log(BasicLevel.DEBUG, 349 " ** GET ** A POOL HAVE SOME RUNNING THREADS ->" 350 + temp.getNumberOfThreadAlive()); 351 haveAllFinished = false; 353 break; 355 } 356 } 357 if (haveAllFinished) { 359 break; 360 } 361 log.log(BasicLevel.DEBUG, 363 " ** GET ** WAIT NOTIFICATION OF A POOL"); 364 try { 365 this.groupExecutionLock.wait(); 366 } catch (InterruptedException ex) { 367 throw new IsacRuntimeException( 368 "Unable to wait the end of all threads",ex); 369 } 370 } 371 } 372 } 373 374 377 private void awakeAllThreads() { 378 Enumeration e = this.pools.elements(); 379 while (e.hasMoreElements()) { 380 ((BehaviorsPool) e.nextElement()).awakeAllThreads(); 381 } 382 } 383 384 388 private void sleepSurplusTime() { 389 long time = System.currentTimeMillis(); 390 long elapsed = time - beginTime - (currentSecond - 1) * LOOP_DURATION 391 - suspendTotalTime - lateTotalTime; 392 if (log.isLoggable(BasicLevel.WARN)) 396 log.log(BasicLevel.WARN, " ** GET ** elapsed time=" + elapsed); 397 399 this.activitiesTotalTime += elapsed; 400 if (LOOP_DURATION - elapsed < 0) { 402 this.lateTotalTime += elapsed - LOOP_DURATION; 403 } else { 404 long sleeped = 0; 405 while (sleeped < (LOOP_DURATION - elapsed)) { 406 long beginSleep = System.currentTimeMillis(); 407 synchronized (this.componentLock) { 408 if (!this.suspended.getBooleanValue()) { 410 synchronized (this.surplusLock) { 411 try { 412 surplusLock.wait(LOOP_DURATION - elapsed); 413 } catch (InterruptedException ex) { 414 throw new IsacRuntimeException( 415 "Unable to sleep the surplus time",ex); 416 } 417 } 418 } 419 } 420 sleeped += System.currentTimeMillis() - beginSleep; 421 if (this.stopped.getBooleanValue()) 423 return; 424 this.executeSuspendIfNeeded(); 425 } 426 } 427 } 428 429 433 private void executeSuspendIfNeeded() { 434 log.log(BasicLevel.DEBUG, " ** GET ** execute suspend if needed"); 435 if (this.suspended.getBooleanValue()) { 437 long beginSuspend = System.currentTimeMillis(); 439 Enumeration poolElements = this.pools.elements() ; 441 while (poolElements.hasMoreElements()) { 442 BehaviorsPool pool = (BehaviorsPool)poolElements.nextElement() ; 443 pool.waitForSuspendState() ; 444 } 445 synchronized (this.componentLock) { 448 this.localSuspended = true; 449 this.componentLock.notify(); 451 } 452 log.log(BasicLevel.DEBUG, " ** GET ** DO SUSPEND"); 453 synchronized (this.suspendLock) { 455 try { 456 this.suspendLock.wait(); 457 } catch (InterruptedException ex) { 458 throw new IsacRuntimeException( 459 "Unable to wait during the suspended state in GroupExecutionThread",ex); 460 } 461 } 462 poolElements = this.pools.elements() ; 464 while (poolElements.hasMoreElements()) { 465 BehaviorsPool pool = (BehaviorsPool)poolElements.nextElement() ; 466 pool.waitForResumeState() ; 467 } 468 synchronized (this.componentLock) { 469 this.localSuspended = false; 471 this.componentLock.notify(); 473 } 474 this.suspendTotalTime += System.currentTimeMillis() - beginSuspend; 477 } 478 } 479 480 484 487 public boolean isLocalStarted() { 488 return localStarted; 489 } 490 491 494 public boolean isLocalStopped() { 495 return localStopped; 496 } 497 498 501 public boolean isLocalSuspended() { 502 return localSuspended; 503 } 504 } | Popular Tags |