1 23 package org.objectweb.clif.scenario.util.isac.engine.behavior; 24 25 import java.util.Vector ; 26 27 import org.objectweb.clif.datacollector.api.DataCollectorWrite; 28 import org.objectweb.clif.scenario.util.isac.engine.IsacScenarioEngine; 29 import org.objectweb.clif.scenario.util.isac.engine.behavior.node.ExecutableNode; 30 import org.objectweb.clif.scenario.util.isac.engine.loadprofile.GroupDescriptionManager; 31 import org.objectweb.clif.scenario.util.isac.engine.sessionobject.SessionObjectManager; 32 import org.objectweb.clif.scenario.util.isac.exception.IsacRuntimeException; 33 import org.objectweb.clif.scenario.util.isac.loadprofile.GroupDescription; 34 import org.objectweb.clif.scenario.util.isac.util.BooleanHolder; 35 import org.objectweb.clif.scenario.util.isac.util.SessionObjectHashtable; 36 import org.objectweb.clif.scenario.util.isac.util.IntHolder; 37 import org.objectweb.util.monolog.api.BasicLevel; 38 import org.objectweb.util.monolog.api.Logger; 39 40 47 public class BehaviorsPool { 48 static Logger log = IsacScenarioEngine.logger.getLogger(BehaviorsPool.class 50 .getName()); 51 52 56 58 private Integer groupId; 60 61 private GroupDescriptionManager groupDescriptionManager; 63 64 private BehaviorManager behaviorManager; 65 66 private SessionObjectManager sessionObjectManager; 67 68 private boolean forceStopThread; 69 70 private int size; 73 74 private Thread [] poolThread; 76 77 79 private volatile BooleanHolder stopped; 81 82 private volatile BooleanHolder suspended; 83 84 private Object suspendLock; 86 87 private Object groupExecutionLock; 89 90 private DataCollectorWrite dataCollectorWrite; 92 93 private Object dataCollectorWrite_lock; 95 96 98 private Object activitiesLock; 100 101 private IntHolder totalSuspendedBehaviors; 104 105 private IntHolder behaviorsRequiredNumber; 108 109 private Vector behaviorsRunning; 111 112 private Object poolLock; 116 117 private Object timerLock; 119 120 private IntHolder behaviorsThreadsAlive; 122 123 private String bladeId; 124 125 151 public BehaviorsPool(String bladeId, Integer groupId, 152 GroupDescriptionManager groupDescriptionManager, 153 BehaviorManager behaviorManager, 154 SessionObjectManager sessionObjectManager, BooleanHolder stopped, 155 BooleanHolder suspended, Object suspendLock, 156 Object groupExecutionLock, DataCollectorWrite dcw, Object dcwl, 157 Object tl) 158 { 159 this.bladeId = bladeId; 160 this.groupId = groupId; 161 this.groupDescriptionManager = groupDescriptionManager; 162 this.behaviorManager = behaviorManager; 163 this.sessionObjectManager = sessionObjectManager; 164 this.stopped = stopped; 165 this.suspended = suspended; 166 this.suspendLock = suspendLock; 167 this.groupExecutionLock = groupExecutionLock; 168 this.dataCollectorWrite = dcw; 169 this.dataCollectorWrite_lock = dcwl; 170 this.timerLock = tl; 171 } 172 173 177 public void initializeBehaviorsThreads() { 178 this.poolLock = new Object (); 180 this.activitiesLock = new Object (); 182 this.totalSuspendedBehaviors = this.groupDescriptionManager 184 .getNumberThreadWaitingForAGroup(groupId); 185 this.behaviorsRunning = this.groupDescriptionManager 186 .getGroupRunningBehaviors(groupId); 187 this.behaviorsRequiredNumber = this.groupDescriptionManager 188 .getNumberThreadRunningRequiredForAGroup(groupId); 189 this.size = this.groupDescriptionManager 191 .getMaximumBehaviorsForAGroup(this.groupId); 192 this.poolThread = new Thread [this.size]; 194 String behaviorId = this.groupDescriptionManager.getGroupDescription( 197 this.groupId).getBehaviorId(); 198 this.forceStopThread = this.groupDescriptionManager 200 .getGroupDescription(this.groupId).isForceStop(); 201 ExecutableNode executableNode = this.behaviorManager 203 .getBehavior(behaviorId); 204 this.behaviorsThreadsAlive = new IntHolder(0); 205 for (int i = 0; i < this.size; i++) { 207 SessionObjectHashtable clonedSessionsObjects = (SessionObjectHashtable) (this.sessionObjectManager 209 .getSessionObjectsForABehavior(behaviorId)).clone(); 210 poolThread[i] = new BehaviorExecutionPooledThread(bladeId, i, 212 executableNode, clonedSessionsObjects, this.suspendLock, 213 this.poolLock, this.timerLock, this.activitiesLock, 214 this.groupExecutionLock, this.dataCollectorWrite, 215 this.dataCollectorWrite_lock, this.stopped, this.suspended, 216 this.behaviorsRunning, this.totalSuspendedBehaviors, 217 this.behaviorsRequiredNumber, this.behaviorsThreadsAlive); 218 poolThread[i].setDaemon(true); 219 poolThread[i].start(); 220 } 221 } 222 223 230 public boolean setNumberOfThreadsRunning(int number) { 231 int behaviorsRunningSize = 0; 233 boolean haveFinished = false; 235 synchronized (this.poolLock) { 236 behaviorsRunningSize = this.behaviorsRunning.size(); 238 log.log(BasicLevel.INFO,"nb-THREADS="+behaviorsRunningSize) ; 239 if (number == GroupDescription.END) { 241 number = 0 ; 243 haveFinished = true; 245 } 246 if (behaviorsRunningSize < number) { 248 this.behaviorsRequiredNumber.setIntValue(number 250 - behaviorsRunningSize); 251 if (IsacScenarioEngine.DEBUG_ON) { 252 log.log(BasicLevel.DEBUG, " *POOL* Add=" 253 + (number - behaviorsRunningSize)); 254 } 255 } else if (behaviorsRunningSize > number) { 256 if (this.forceStopThread) { 257 int numberOfThreadToStop = behaviorsRunningSize - number; 261 this.behaviorsRequiredNumber 263 .setIntValue(numberOfThreadToStop); 264 if (IsacScenarioEngine.DEBUG_ON) { 265 log.log(BasicLevel.DEBUG, " *POOL* Stop=" 266 + numberOfThreadToStop); 267 } 268 for (int i = 0; i < numberOfThreadToStop; i++) { 270 ((BehaviorExecutionPooledThread) this.behaviorsRunning 271 .elementAt(i)).setLocalStopped(true); 272 } 273 synchronized (timerLock) { 277 this.timerLock.notifyAll(); 278 } 279 log.log(BasicLevel.DEBUG, 280 " *POOL* Wait for stopped thread"); 281 if (this.behaviorsRequiredNumber.getIntValue() != 0) { 283 try { 284 this.poolLock.wait(); 285 } catch (InterruptedException ex) { 286 throw new IsacRuntimeException( 287 "Unable to wait the stop of surplus threads",ex); 288 } 289 } 290 log.log(BasicLevel.DEBUG, 291 " *POOL* All thread have been stopped !!!!"); 292 } else { 293 log 294 .log(BasicLevel.WARN, " * POOL * Still " 295 + this.behaviorsRunning.size() 296 + " threads running"); 297 haveFinished = false ; 299 } 300 } 301 } 302 303 if (behaviorsRunningSize == number) { 306 log.log(BasicLevel.DEBUG, 308 " *POOL* We have the right number of thread"); 309 } else 310 if (behaviorsRunningSize < number) { 312 for (int i = behaviorsRunningSize; i < number; i++) { 314 synchronized (this.activitiesLock) { 315 this.activitiesLock.notify(); 316 } 317 } 318 synchronized (this.poolLock) { 319 if (this.behaviorsRequiredNumber.getIntValue() != 0) { 320 log.log(BasicLevel.DEBUG, " *POOL* Wait thread added"); 322 try { 323 this.poolLock.wait(); 324 } catch (InterruptedException ex) { 325 throw new IsacRuntimeException( 326 "Unable to wait the launch of requied threads"); 327 } 328 } 329 } 330 log.log(BasicLevel.DEBUG, 331 " *POOL* All thread have been added !!!!"); 332 } 333 335 return haveFinished ; 337 } 338 339 342 public void waitForSuspendState() { 343 synchronized (this.poolLock) { 344 if (this.behaviorsRunning.size() != this.totalSuspendedBehaviors 345 .getIntValue()) { 346 try { 347 this.poolLock.wait(); 348 } catch (InterruptedException iex) { 349 throw new IsacRuntimeException( 350 "Unable to wait the suspend state",iex); 351 } 352 } 353 } 354 } 355 356 359 public void waitForResumeState() { 360 synchronized (this.poolLock) { 361 if (this.totalSuspendedBehaviors.getIntValue() != 0) { 362 try { 363 this.poolLock.wait(); 364 } catch (InterruptedException iex) { 365 throw new IsacRuntimeException( 366 "Unable to wait the resume state",iex); 367 } 368 } 369 } 370 } 371 372 377 public int getNumberOfThreadAlive() { 378 return this.behaviorsThreadsAlive.getIntValue(); 379 } 380 381 384 public void awakeAllThreads() { 385 synchronized (this.activitiesLock) { 386 this.activitiesLock.notifyAll(); 387 } 388 } 389 } | Popular Tags |