package org.objectweb.clif.scenario.util.isac.engine.loadprofile;

import java.util.Enumeration;
import java.util.Vector;

import org.objectweb.clif.datacollector.api.DataCollectorWrite;
import org.objectweb.clif.scenario.util.isac.engine.IsacScenarioEngine;
import org.objectweb.clif.scenario.util.isac.engine.behavior.BehaviorExecutionThread;
import org.objectweb.clif.scenario.util.isac.engine.behavior.BehaviorManager;
import org.objectweb.clif.scenario.util.isac.engine.behavior.node.ExecutableNode;
import org.objectweb.clif.scenario.util.isac.engine.sessionobject.SessionObjectManager;
import org.objectweb.clif.scenario.util.isac.exception.IsacRuntimeException;
import org.objectweb.clif.scenario.util.isac.loadprofile.GroupDescription;
import org.objectweb.clif.scenario.util.isac.util.BooleanHolder;
import org.objectweb.clif.scenario.util.isac.util.SessionObjectHashtable;
import org.objectweb.clif.scenario.util.isac.util.IntHolder;
import org.objectweb.clif.server.api.BladeInsertResponse;
import org.objectweb.util.monolog.api.BasicLevel;
import org.objectweb.util.monolog.api.Logger;

/**
 * Thread that drives the load profile of every group known to a
 * {@link GroupDescriptionManager}.
 * <p>
 * The thread loops once per {@link #LOOP_DURATION} milliseconds. On each
 * iteration it asks every group description for the number of behavior
 * threads wanted at the current second, starts the missing
 * {@link BehaviorExecutionThread}s or (when the group has its "force stop"
 * flag set) asks the surplus ones to stop, and then sleeps for whatever is
 * left of the iteration. The loop ends when the shared {@code stopped} holder
 * becomes true or when the manager has no group left.
 * <p>
 * All coordination with the behavior threads and with the engine goes through
 * the monitor objects received in the constructor ({@code wait}/{@code notifyAll}).
 */
public class GroupExecutionThread extends Thread {
    protected static Logger log = IsacScenarioEngine.logger
            .getLogger(GroupExecutionThread.class.getName());

    /** Duration, in milliseconds, of one iteration of the main loop. */
    private static final int LOOP_DURATION = 1000;

    /** Monitor this thread waits on while it is suspended. */
    private Object scenarioLock;

    /** Monitor guarding the running/waiting behavior-thread bookkeeping. */
    private Object activitiesLock;

    /** Monitor this thread waits on while sleeping the rest of an iteration. */
    private Object surplusLock;

    /** Never assigned nor read in this class. */
    private Object groupExecutionActivitiesLock;

    /** Passed through, untouched, to every created behavior thread. */
    private Object dataCollectorWrite_lock;

    /** Monitor created by this thread and shared with its behavior threads. */
    private Object timerLock;

    /** Passed through, untouched, to every created behavior thread. */
    private DataCollectorWrite dataCollectorWrite;

    /** Notified through {@code completed()} when the loop ends without a stop request; may be null. */
    private BladeInsertResponse bladeInsertResponse;

    /** Monitor held while {@link #bladeInsertResponse} is used. */
    private Object bladeInsertResponse_lock;

    /** Shared holders received from the creator and handed to every behavior thread. */
    private volatile BooleanHolder started;

    private volatile BooleanHolder stopped;

    private volatile BooleanHolder suspended;

    /** Total time, in milliseconds, spent in {@link #executeSuspendIfNeeded()} while suspended. */
    private long suspendTotalTime;

    /** Value of {@code System.currentTimeMillis()} when {@link #run()} started. */
    private long beginTime;

    /** Sum of the overruns of the iterations that lasted more than {@link #LOOP_DURATION}. */
    private long lateTotalTime;

    /** Sum of the per-iteration elapsed times measured by {@link #sleepSurplusTime()}. */
    private long activitiesTotalTime;

    /** 1-based counter of loop iterations, passed to the group descriptions. */
    private int currentSecond;

    private GroupDescriptionManager groupDescriptionManager;

    private BehaviorManager behaviorManager;

    private SessionObjectManager sessionObjectManager;

    /** Initialised to false and never modified by this class. */
    private boolean localStarted;

    /** Initialised to false and never modified by this class. */
    private boolean localStopped;

    /** True between the moment every behavior thread is waiting and the end of the resume. */
    private boolean localSuspended;

    private String bladeId;

    /**
     * Builds the thread; nothing is started until {@link #start()} is called.
     *
     * @param bladeId identifier forwarded to every created behavior thread
     * @param gdm manager giving access to the groups, their descriptions and
     *            their running behavior threads
     * @param bm manager used to look up the behavior of a group
     * @param som manager providing the session objects cloned for each new
     *            behavior thread
     * @param sl monitor waited on during a suspend (scenario lock)
     * @param al monitor guarding the behavior-thread bookkeeping (activities lock)
     * @param dcw data collector forwarded to every created behavior thread
     * @param dcwl lock forwarded together with {@code dcw}
     * @param bir response object whose {@code completed()} is called when the
     *            loop ends by itself; may be null
     * @param birl monitor held while {@code bir} is used
     * @param started shared "started" holder
     * @param stopped shared "stopped" holder; the main loop ends when it is true
     * @param suspended shared "suspended" holder
     * @param surpluslock monitor waited on while sleeping the rest of an iteration
     */
    public GroupExecutionThread(String bladeId, GroupDescriptionManager gdm,
            BehaviorManager bm, SessionObjectManager som, Object sl, Object al,
            DataCollectorWrite dcw, Object dcwl, BladeInsertResponse bir,
            Object birl, BooleanHolder started, BooleanHolder stopped,
            BooleanHolder suspended, Object surpluslock) {
        this.bladeId = bladeId;
        this.groupDescriptionManager = gdm;
        this.behaviorManager = bm;
        this.sessionObjectManager = som;
        this.scenarioLock = sl;
        this.activitiesLock = al;
        this.dataCollectorWrite = dcw;
        this.dataCollectorWrite_lock = dcwl;
        this.bladeInsertResponse = bir;
        this.bladeInsertResponse_lock = birl;
        this.started = started;
        this.stopped = stopped;
        this.suspended = suspended;
        this.lateTotalTime = 0;
        this.beginTime = 0;
        this.suspendTotalTime = 0;
        this.currentSecond = 1;
        this.activitiesTotalTime = 0;
        this.localStarted = false;
        this.localStopped = false;
        this.localSuspended = false;
        this.timerLock = new Object();
        this.surplusLock = surpluslock;
    }

    /**
     * Main loop: adjusts every group once per iteration until a stop is
     * requested or no group is left, then either waits for the remaining
     * behavior threads (stop requested) or signals completion, and finally
     * logs a summary of the execution.
     *
     * @throws IsacRuntimeException if the thread is interrupted while waiting
     */
    public void run() {
        this.beginTime = System.currentTimeMillis();
        if (isDebugOn()) {
            log.log(BasicLevel.DEBUG, " ** GET ** beginTime=" + beginTime);
        }

        while (!stopped.getBooleanValue()) {
            if (isDebugOn()) {
                log.log(BasicLevel.DEBUG, "******************** SECOND="
                        + currentSecond + "***************");
            }
            executeSuspendIfNeeded();
            Enumeration groupsIds = groupDescriptionManager.getGroupsIds();
            if (!groupsIds.hasMoreElements()) {
                log.log(BasicLevel.DEBUG, "There is no more group in the GDM");
                break;
            }
            while (groupsIds.hasMoreElements()) {
                executeSuspendIfNeeded();
                adjustGroup((Integer) groupsIds.nextElement());
            }
            sleepSurplusTime();
            currentSecond++;
        }

        if (stopped.getBooleanValue()) {
            waitForAllThreadsToStop();
        } else {
            sendCompleted();
        }
        logExecutionSummary(System.currentTimeMillis());
    }

    /** Tells whether guarded debug traces must be emitted. */
    private static boolean isDebugOn() {
        return IsacScenarioEngine.DEBUG_ON && log.isLoggable(BasicLevel.DEBUG);
    }

    /** Wakes up every thread currently waiting on the timer lock. */
    private void notifyTimerLock() {
        synchronized (timerLock) {
            timerLock.notifyAll();
        }
    }

    /**
     * Brings one group to the number of behavior threads its description asks
     * for at the current second, or finishes the group when the description
     * returns {@code GroupDescription.END}.
     */
    private void adjustGroup(Integer currentId) {
        if (isDebugOn()) {
            log.log(BasicLevel.DEBUG, " ** GET (" + currentId
                    + ")** Group analyzed=" + currentId);
        }
        GroupDescription currentGroupDescription = groupDescriptionManager
                .getGroupDescription(currentId);
        Vector behaviorsThreads = groupDescriptionManager
                .getGroupRunningBehaviors(currentId);
        boolean forceThreadStop = currentGroupDescription.isForceStop();
        long numberOfThreads = currentGroupDescription
                .getVirtualUserNumber(currentSecond);
        if (isDebugOn()) {
            log.log(BasicLevel.DEBUG,
                    " ** GET ** numberOfThreads needed=" + numberOfThreads);
        }

        if (numberOfThreads == GroupDescription.END) {
            finishGroup(currentId, behaviorsThreads, forceThreadStop);
            return;
        }

        synchronized (activitiesLock) {
            int numberOfThreadsRunning = behaviorsThreads.size();
            if (isDebugOn()) {
                log.log(BasicLevel.DEBUG, " ** GET ** Thread Running="
                        + numberOfThreadsRunning);
            }
            if (numberOfThreadsRunning == numberOfThreads) {
                log.log(BasicLevel.DEBUG,
                        " ** GET ** we have the right number thread");
            } else if (numberOfThreadsRunning < numberOfThreads) {
                log.log(BasicLevel.DEBUG,
                        " ** GET ** we need to add some threads");
                startMissingThreads(currentId, currentGroupDescription,
                        behaviorsThreads, numberOfThreadsRunning,
                        numberOfThreads);
            } else {
                log.log(BasicLevel.DEBUG,
                        " ** GET ** we need to stop some threads");
                if (forceThreadStop) {
                    stopSurplusThreads(currentId, behaviorsThreads,
                            numberOfThreadsRunning, numberOfThreads);
                } else {
                    log.log(BasicLevel.DEBUG,
                            " ** GET ** leave the threads running because");
                }
            }
        }
    }

    /**
     * Handles a group whose description reported its end. With force stop,
     * every behavior thread is asked to stop and the group is removed; without
     * it, the group is removed only once no behavior thread is left.
     */
    private void finishGroup(Integer currentId, Vector behaviorsThreads,
            boolean forceThreadStop) {
        log.log(BasicLevel.DEBUG, " ** GET ** This group is finished");
        if (forceThreadStop) {
            IntHolder numberOfThreadRequired = groupDescriptionManager
                    .getNumberThreadRunningRequiredForAGroup(currentId);
            synchronized (activitiesLock) {
                numberOfThreadRequired.setIntValue(0);
                for (int i = 0; i < behaviorsThreads.size(); i++) {
                    ((BehaviorExecutionThread) behaviorsThreads.elementAt(i))
                            .setLocalStopped(true);
                }
                if (behaviorsThreads.size() != 0) {
                    log.log(BasicLevel.DEBUG,
                            " ** GET ** AWAKE THREADS IN TIMER");
                    notifyTimerLock();
                    log.log(BasicLevel.DEBUG,
                            " ** GET ** wait for all stop of the threads");
                    try {
                        // NOTE(review): single wait without a condition loop;
                        // relies on the behavior threads notifying exactly
                        // once when the last one has stopped - confirm
                        // against BehaviorExecutionThread.
                        activitiesLock.wait();
                    } catch (InterruptedException ex) {
                        throw new IsacRuntimeException(
                                "Unable to wait the stop of threads for the group",
                                ex);
                    }
                }
            }
            groupDescriptionManager.removeGroup(currentId);
            log.log(BasicLevel.DEBUG,
                    " ** GET ** group description removed in the GDM");
        } else {
            synchronized (activitiesLock) {
                int numberOfThreadsRunning = behaviorsThreads.size();
                if (isDebugOn()) {
                    log.log(BasicLevel.DEBUG, "Thread still running : "
                            + numberOfThreadsRunning);
                }
                if (numberOfThreadsRunning == 0) {
                    groupDescriptionManager.removeGroup(currentId);
                }
            }
        }
    }

    /**
     * Creates, registers and starts the behavior threads a group is missing.
     * Must be called while holding the activities lock.
     */
    private void startMissingThreads(Integer currentId,
            GroupDescription currentGroupDescription, Vector behaviorsThreads,
            int numberOfThreadsRunning, long numberOfThreads) {
        IntHolder numberOfWaiting = groupDescriptionManager
                .getNumberThreadWaitingForAGroup(currentId);
        String behaviorId = currentGroupDescription.getBehaviorId();
        ExecutableNode executableNode = behaviorManager.getBehavior(behaviorId);
        IntHolder numberOfThreadRequired = groupDescriptionManager
                .getNumberThreadRunningRequiredForAGroup(currentId);
        long missing = numberOfThreads - numberOfThreadsRunning;
        for (int i = 0; i < missing; i++) {
            // every behavior thread works on its own copy of the session objects
            SessionObjectHashtable clonedSessionsObjects = (SessionObjectHashtable) (sessionObjectManager
                    .getSessionObjectsForABehavior(behaviorId)).clone();
            BehaviorExecutionThread thread = new BehaviorExecutionThread(
                    bladeId, numberOfThreadsRunning + i, executableNode,
                    clonedSessionsObjects, scenarioLock, activitiesLock,
                    timerLock, dataCollectorWrite, dataCollectorWrite_lock,
                    started, stopped, suspended, behaviorsThreads,
                    numberOfWaiting, numberOfThreadRequired);
            behaviorsThreads.add(thread);
            thread.start();
        }
    }

    /**
     * Asks the surplus behavior threads of a group to stop and waits for a
     * notification on the activities lock. Must be called while holding the
     * activities lock.
     */
    private void stopSurplusThreads(Integer currentId, Vector behaviorsThreads,
            int numberOfThreadsRunning, long numberOfThreads) {
        IntHolder numberOfThreadRequired = groupDescriptionManager
                .getNumberThreadRunningRequiredForAGroup(currentId);
        numberOfThreadRequired.setIntValue((int) numberOfThreads);
        long surplus = numberOfThreadsRunning - numberOfThreads;
        for (int i = 0; i < surplus; i++) {
            ((BehaviorExecutionThread) behaviorsThreads.elementAt(i))
                    .setLocalStopped(true);
        }
        if (behaviorsThreads.size() != numberOfThreadRequired.getIntValue()) {
            log.log(BasicLevel.DEBUG, " ** GET ** AWAKE THREADS IN TIMER");
            notifyTimerLock();
            log.log(BasicLevel.DEBUG,
                    " ** GET ** wait for all stop of the threads");
            try {
                // NOTE(review): single wait without a condition loop; see the
                // matching note in finishGroup.
                activitiesLock.wait();
            } catch (InterruptedException ex) {
                throw new IsacRuntimeException(
                        "Unable to wait the ending of the surplus of running threads",
                        ex);
            }
        }
    }

    /**
     * After a stop request: wakes the threads waiting on the timer lock and
     * blocks until the manager reports no running behavior thread.
     */
    private void waitForAllThreadsToStop() {
        log.log(BasicLevel.DEBUG, " ** GET ** AWAKE THREADS IN TIMER");
        notifyTimerLock();
        synchronized (activitiesLock) {
            while (groupDescriptionManager.getTotalNumberThreadRunning() != 0) {
                if (isDebugOn()) {
                    log.log(BasicLevel.DEBUG,
                            " ** GET ** wait for all stop of the threads still running="
                                    + groupDescriptionManager
                                            .getTotalNumberThreadRunning());
                }
                try {
                    activitiesLock.wait();
                } catch (InterruptedException ex) {
                    throw new IsacRuntimeException(
                            "Unable to wait for the stop of all threads", ex);
                }
            }
        }
    }

    /** Signals that the execution ended by itself, when a response object is available. */
    private void sendCompleted() {
        log.log(BasicLevel.DEBUG, " ** GET ** send completed msg");
        synchronized (bladeInsertResponse_lock) {
            if (bladeInsertResponse != null) {
                bladeInsertResponse.completed();
            }
        }
    }

    /**
     * Logs the final counters of the execution.
     *
     * @param endTime value of {@code System.currentTimeMillis()} at the end of the run
     */
    private void logExecutionSummary(long endTime) {
        log.log(BasicLevel.WARN, "** EXECUTION FINISHED : ");
        log.log(BasicLevel.WARN, "** currentSecond=" + currentSecond);
        if (stopped.getBooleanValue()) {
            log.log(BasicLevel.WARN, "** application was forced to stop");
        } else {
            log.log(BasicLevel.WARN, "** application finish by itself");
        }
        // duration is end minus begin (the operands used to be swapped, which
        // always printed a negative value)
        log.log(BasicLevel.WARN, "** REAL DURATION OF THE TEST="
                + (endTime - this.beginTime));
        log.log(BasicLevel.WARN, "** TOTAL SUSPENDED TIME=" + this.suspendTotalTime);
        log.log(BasicLevel.WARN, "** REAL ACTIVITIES TIME="
                + this.activitiesTotalTime);
        log.log(BasicLevel.WARN, "** TOTAL LATE TIME=" + this.lateTotalTime);
    }

    /**
     * Sleeps for what is left of the current iteration, or records the overrun
     * when the iteration already lasted longer than {@link #LOOP_DURATION}.
     * The sleep is a timed wait on the surplus lock, so it can be cut short by
     * a notification; it returns immediately when a stop is requested and
     * honours suspend requests between two waits.
     *
     * @throws IsacRuntimeException if the thread is interrupted while waiting
     */
    private void sleepSurplusTime() {
        long time = System.currentTimeMillis();
        // promote to long before multiplying so that very long runs cannot
        // overflow the int product
        long elapsed = time - beginTime
                - (long) (currentSecond - 1) * LOOP_DURATION
                - suspendTotalTime - lateTotalTime;
        if (log.isLoggable(BasicLevel.WARN)) {
            log.log(BasicLevel.WARN, " ** GET ** elapsed time=" + elapsed);
        }

        this.activitiesTotalTime += elapsed;
        long surplus = LOOP_DURATION - elapsed;
        if (surplus < 0) {
            this.lateTotalTime += elapsed - LOOP_DURATION;
            return;
        }

        long sleeped = 0;
        while (sleeped < surplus) {
            long beginSleep = System.currentTimeMillis();
            if (!this.suspended.getBooleanValue()) {
                synchronized (this.surplusLock) {
                    try {
                        // wait only for the remainder: after an early wake-up
                        // the time already slept must not be slept again.
                        // The loop condition guarantees a strictly positive
                        // timeout (wait(0) would block forever).
                        surplusLock.wait(surplus - sleeped);
                    } catch (InterruptedException ex) {
                        throw new IsacRuntimeException(
                                "Unable to sleep the surplus time", ex);
                    }
                }
            }
            sleeped += System.currentTimeMillis() - beginSleep;
            if (this.stopped.getBooleanValue()) {
                return;
            }
            this.executeSuspendIfNeeded();
        }
    }

    /**
     * When the shared {@code suspended} holder is true: waits until every
     * running behavior thread is counted as waiting, raises
     * {@code localSuspended} and notifies the activities lock, blocks on the
     * scenario lock, then waits until no behavior thread is counted as waiting
     * any more before clearing {@code localSuspended}. The time spent here is
     * added to the total suspended time. Does nothing but log otherwise.
     *
     * @throws IsacRuntimeException if the thread is interrupted while waiting
     */
    private void executeSuspendIfNeeded() {
        log.log(BasicLevel.DEBUG, " ** GET ** execute suspend if needed");
        if (!this.suspended.getBooleanValue()) {
            return;
        }
        log.log(BasicLevel.DEBUG, " ** GET ** suspend activated !!!!!!!");
        long beginSuspend = System.currentTimeMillis();

        synchronized (this.activitiesLock) {
            log.log(BasicLevel.DEBUG, "** GET ** total thread running="
                    + this.groupDescriptionManager.getTotalNumberThreadRunning()
                    + " threads waiting="
                    + this.groupDescriptionManager.getTotalNumberThreadWaiting());
            log.log(BasicLevel.DEBUG, " ** GET )** AWAKE THREADS IN TIMER");
            notifyTimerLock();
            while (this.groupDescriptionManager.getTotalNumberThreadRunning()
                    != this.groupDescriptionManager.getTotalNumberThreadWaiting()) {
                log.log(BasicLevel.DEBUG,
                        " ** GET ** Wait the threads suspend");
                try {
                    this.activitiesLock.wait();
                } catch (InterruptedException ex) {
                    throw new IsacRuntimeException(
                            "Unable to wait all the thread to be in waiting mode",
                            ex);
                }
            }
            log.log(BasicLevel.DEBUG, " ** GET )** set local suspend");
            this.localSuspended = true;
            log.log(BasicLevel.DEBUG, " ** GET )** notify isac engine");
            this.activitiesLock.notifyAll();
        }

        log.log(BasicLevel.DEBUG, " ** GET )** DO SUSPEND");
        synchronized (this.scenarioLock) {
            try {
                // NOTE(review): single wait without a condition loop; relies
                // on the resume being signalled after this point - confirm
                // against the engine.
                this.scenarioLock.wait();
            } catch (InterruptedException ex) {
                // keep the historical message and also chain the cause
                throw new IsacRuntimeException(
                        "Unable to wait during the suspended state in GroupExecutionThread -> "
                                + ex, ex);
            }
        }

        synchronized (this.activitiesLock) {
            while (this.groupDescriptionManager.getTotalNumberThreadWaiting() != 0) {
                try {
                    this.activitiesLock.wait();
                } catch (InterruptedException ex) {
                    throw new IsacRuntimeException(
                            "Unable to wait all the thread to be in resume mode",
                            ex);
                }
            }
            this.localSuspended = false;
            this.activitiesLock.notifyAll();
        }
        this.suspendTotalTime += System.currentTimeMillis() - beginSuspend;
    }

    /**
     * @return the local "started" flag; this class initialises it to false and
     *         never changes it
     */
    public boolean isLocalStarted() {
        return localStarted;
    }

    /**
     * @return the local "stopped" flag; this class initialises it to false and
     *         never changes it
     */
    public boolean isLocalStopped() {
        return localStopped;
    }

    /**
     * @return true while this thread is in its suspended section, i.e. after
     *         every behavior thread has been seen waiting and before the
     *         resume has completed
     */
    public boolean isLocalSuspended() {
        return localSuspended;
    }
}