package org.objectweb.clif.scenario.util.multithread;

import org.objectweb.clif.datacollector.api.DataCollectorWrite;
import org.objectweb.clif.server.api.BladeInsertResponse;
import org.objectweb.clif.server.api.BladeControl;
import org.objectweb.clif.storage.api.ActionEvent;
import org.objectweb.clif.supervisor.api.ClifException;
import org.objectweb.clif.util.ClifClassLoader;
import org.objectweb.fractal.api.control.BindingController;
import org.objectweb.fractal.api.control.LifeCycleController;
import java.io.Serializable;
import java.util.StringTokenizer;
import java.util.NoSuchElementException;
import java.util.Random;


/**
 * Abstract multi-threaded scenario: runs a configurable number of
 * {@link Activity} threads, each of which repeatedly invokes the
 * {@code action} method of its own {@link MTScenarioSession}, until the
 * scenario is stopped explicitly or its {@link StopTimer} expires.
 * <p>
 * Concrete scenarios only have to provide {@link #newSession(int, String)}.
 * <p>
 * Synchronization overview:
 * <ul>
 * <li>{@code scenario_lock} guards the started/suspended/stopped flags and is
 *     the monitor activities block on while not started or while suspended;</li>
 * <li>{@code activities_lock} guards the {@code thr_waiting} and
 *     {@code thr_remaining} counters and is the monitor control methods block
 *     on while waiting for activities to reach a given state. Every counter
 *     change is followed by {@code notifyAll()} and every waiter re-checks its
 *     own predicate in a loop, so several control threads (e.g. {@link #join()}
 *     and {@link #stop()}) may wait at the same time.</li>
 * </ul>
 */
public abstract class MTScenario
    implements BladeControl, BindingController, LifeCycleController
{
    /** Names of the client interfaces returned by {@link #listFc()}. */
    static private final String[] interfaceNames = new String[] {
        DataCollectorWrite.DATA_COLLECTOR_WRITE,
        BladeInsertResponse.BLADE_INSERT_RESPONSE };

    /** Test identifier received by the last call to {@link #init(Serializable)}. */
    protected Serializable testId;
    /** Scenario identifier set through {@link #setId(String)}. */
    protected String scenarioId;

    private final Object scenario_lock = new Object();
    private final Object activities_lock = new Object();
    private BladeInsertResponse sr;
    private final Object sr_lock = new Object();
    private DataCollectorWrite dcw;
    private final Object dc_lock = new Object();
    private Activity[] threads;

    /** Number of activity threads, first token of the argument string. */
    private int arg_thread_nb = 0;
    /** Test duration in seconds, second token of the argument string. */
    private int arg_duration_s = 0;
    /** Ramp-up duration in milliseconds (third token, given in seconds, times 1000). */
    private int arg_rampup_duration_ms = 0;
    /** Remainder of the argument string, handed to {@link #newSession(int, String)}. */
    private String sessionArg = null;

    /** Number of activities not terminated yet; modified by activities under activities_lock. */
    private volatile int thr_remaining = 0;
    /** Number of activities currently idle (not started, ramping up or suspended). */
    private volatile int thr_waiting = 0;
    private volatile boolean started;
    private volatile boolean suspended;
    private volatile boolean stopped;
    private StopTimer timer = null;
    private String fcState = LifeCycleController.STOPPED;
    private Random random = new Random();


    public MTScenario()
    {
    }


    /**
     * Creates the session object driven by one activity thread.
     * Called once per thread by {@link #init(Serializable)}.
     *
     * @param sessionId index of the activity thread, from 0 to (number of threads - 1)
     * @param arg session argument extracted by {@link #setArgument(String)},
     *        or null when none was given
     * @return the session whose action method the thread will call repeatedly
     * @throws ClifException declared for implementations; propagated by init()
     */
    abstract public MTScenarioSession newSession(int sessionId, String arg) throws ClifException;


    ///////////////////////////////////
    // interface LifeCycleController //
    ///////////////////////////////////


    /** Marks the component as started. */
    public void startFc()
    {
        fcState = LifeCycleController.STARTED;
    }


    /**
     * Marks the component as stopped, stopping the scenario first when it has
     * been initialized (i.e. when a timer exists).
     */
    public void stopFc()
    {
        if (timer != null)
        {
            stop();
        }
        fcState = LifeCycleController.STOPPED;
    }


    /** @return the current life cycle state, as set by startFc()/stopFc() */
    public String getFcState()
    {
        return fcState;
    }


    /////////////////////////////////
    // interface BindingController //
    /////////////////////////////////


    /**
     * @param clientItfName name of a client interface
     * @return the object currently bound to this interface, or null when the
     *         interface is unbound or its name is unknown
     */
    public Object lookupFc(String clientItfName)
    {
        if (clientItfName.equals(DataCollectorWrite.DATA_COLLECTOR_WRITE))
        {
            return dcw;
        }
        else if (clientItfName.equals(BladeInsertResponse.BLADE_INSERT_RESPONSE))
        {
            return sr;
        }
        else
        {
            return null;
        }
    }


    /**
     * Binds the data collector or the blade insert response interface.
     * Unknown interface names are silently ignored.
     *
     * @param clientItfName name of the client interface to bind
     * @param serverItf the server interface to bind it to
     */
    public void bindFc(String clientItfName, Object serverItf)
    {
        if (clientItfName.equals(DataCollectorWrite.DATA_COLLECTOR_WRITE))
        {
            synchronized (dc_lock)
            {
                dcw = (DataCollectorWrite) serverItf;
            }
        }
        else if (clientItfName.equals(BladeInsertResponse.BLADE_INSERT_RESPONSE))
        {
            synchronized (sr_lock)
            {
                sr = (BladeInsertResponse) serverItf;
            }
        }
    }


    /**
     * Unbinds the data collector or the blade insert response interface.
     * Unknown interface names are silently ignored.
     *
     * @param clientItfName name of the client interface to unbind
     */
    public void unbindFc(String clientItfName)
    {
        if (clientItfName.equals(DataCollectorWrite.DATA_COLLECTOR_WRITE))
        {
            synchronized (dc_lock)
            {
                dcw = null;
            }
        }
        else if (clientItfName.equals(BladeInsertResponse.BLADE_INSERT_RESPONSE))
        {
            synchronized (sr_lock)
            {
                sr = null;
            }
        }
    }


    /** @return the names of the client interfaces of this component */
    public String[] listFc()
    {
        return interfaceNames;
    }


    ////////////////////////////
    // interface BladeControl //
    ////////////////////////////


    /**
     * Creates and starts one activity thread per configured session, waits
     * until all of them are parked waiting for {@link #start()}, then creates
     * (without starting it) the timer that will stop the test.
     *
     * @param testId identifier of the test being initialized
     * @throws ClifException when {@link #newSession(int, String)} fails
     */
    public void init(Serializable testId)
        throws ClifException
    {
        synchronized (scenario_lock)
        {
            this.testId = testId;
            started = stopped = suspended = false;
            threads = new Activity[arg_thread_nb];
            thr_waiting = 0;
            Thread.currentThread().setContextClassLoader(ClifClassLoader.getClassLoader());
            // thr_remaining doubles as the loop index: it ends up equal to
            // the number of threads actually started
            for (thr_remaining = 0 ; thr_remaining < arg_thread_nb ; ++thr_remaining)
            {
                threads[thr_remaining] = new Activity(thr_remaining, newSession(thr_remaining, sessionArg));
                threads[thr_remaining].start();
            }
            synchronized (activities_lock)
            {
                while (thr_waiting != thr_remaining)
                {
                    awaitActivityChange();
                }
            }
            timer = new StopTimer(arg_duration_s);
        }
    }


    /**
     * Releases the activities, waits until all of them have completed their
     * ramp-up delay, then starts the stop timer.
     */
    public void start()
    {
        synchronized (scenario_lock)
        {
            started = true;
            scenario_lock.notifyAll();
        }
        synchronized (activities_lock)
        {
            while (thr_waiting != 0)
            {
                awaitActivityChange();
            }
        }
        timer.start();
    }


    /**
     * Stops the scenario and blocks until every activity has terminated.
     * A suspended scenario is resumed first so that activities can observe
     * the stop request; otherwise the timer is interrupted, unless the caller
     * is the timer thread itself.
     */
    public void stop()
    {
        synchronized (scenario_lock)
        {
            stopped = true;
            if (! started)
            {
                // activities are still parked waiting for start()
                scenario_lock.notifyAll();
            }
        }
        if (suspended)
        {
            resume();
        }
        else if (Thread.currentThread() != timer)
        {
            synchronized (timer)
            {
                timer.interrupt();
            }
        }
        synchronized (activities_lock)
        {
            while (thr_remaining != 0)
            {
                awaitActivityChange();
            }
        }
    }


    /**
     * Suspends the scenario: waits until every remaining activity is idle,
     * then interrupts the timer so that it stops counting elapsed time.
     */
    public void suspend()
    {
        synchronized (scenario_lock)
        {
            suspended = true;
        }
        synchronized (activities_lock)
        {
            while (thr_waiting != thr_remaining)
            {
                awaitActivityChange();
            }
        }
        synchronized (timer)
        {
            timer.interrupt();
        }
    }


    /**
     * Resumes a suspended scenario: wakes the activities up, waits until none
     * of them is idle anymore, then wakes the timer up.
     */
    public void resume()
    {
        synchronized (scenario_lock)
        {
            suspended = false;
            scenario_lock.notifyAll();
        }
        synchronized (activities_lock)
        {
            while (thr_waiting != 0)
            {
                awaitActivityChange();
            }
        }
        synchronized (timer)
        {
            timer.notify();
        }
    }


    /** Blocks until every activity has terminated. */
    public void join()
    {
        synchronized (activities_lock)
        {
            while (thr_remaining != 0)
            {
                awaitActivityChange();
            }
        }
    }


    /**
     * Parses the scenario argument string. Expected tokens, separated by
     * white space:
     * <ol>
     * <li>number of threads;</li>
     * <li>test duration in seconds;</li>
     * <li>ramp-up duration in seconds: each thread sleeps a random time
     *     below this duration before its first action;</li>
     * <li>optionally, the rest of the string, taken verbatim as the argument
     *     passed to {@link #newSession(int, String)}.</li>
     * </ol>
     * When one of the three mandatory tokens is missing or is not an integer,
     * a message is printed on the standard error stream; values parsed before
     * the faulty token are kept.
     *
     * @param arg the argument string
     */
    public void setArgument(String arg)
    {
        StringTokenizer parser = new StringTokenizer(arg);
        try
        {
            arg_thread_nb = Integer.parseInt(parser.nextToken());
            arg_duration_s = Integer.parseInt(parser.nextToken());
            arg_rampup_duration_ms = Integer.parseInt(parser.nextToken()) * 1000;
            try
            {
                // empty delimiter set: the whole remainder is a single token
                sessionArg = parser.nextToken("");
            }
            catch (NoSuchElementException ex)
            {
                // the session argument is optional
            }
        }
        catch (Exception ex)
        {
            System.err.println("MTScenario expects at least 3 arguments: <number_of_threads> <test duration in s> <ramp-up duration in s> [<session argument>]");
        }
    }


    /**
     * Sets the scenario identifier used in the events given to the sessions.
     *
     * @param id the scenario identifier
     */
    public void setId(String id)
    {
        scenarioId = id;
    }


    /** @return the scenario identifier */
    public String getId()
    {
        return scenarioId;
    }


    /**
     * Waits once for a notification on activities_lock, which the calling
     * thread must own. Callers invoke this from a loop re-checking their own
     * predicate, which makes spurious or unrelated wake-ups harmless.
     * An interruption is reported on the standard error stream and otherwise
     * ignored: the caller's loop then simply waits again.
     */
    private void awaitActivityChange()
    {
        try
        {
            activities_lock.wait();
        }
        catch (InterruptedException ex)
        {
            ex.printStackTrace(System.err);
        }
    }


    ///////////////////
    // inner classes //
    ///////////////////


    /**
     * Thread stopping the scenario once the test duration has elapsed.
     * Time spent while the scenario is suspended is not counted.
     */
    class StopTimer extends Thread
    {
        /** Test duration in milliseconds. */
        long delay;

        /**
         * @param delay_s test duration in seconds
         */
        public StopTimer(int delay_s)
        {
            super("MTScenario stop timer " + delay_s + "s");
            // long arithmetic: an int product overflows beyond ~24.8 days
            delay = delay_s * 1000L;
        }

        /**
         * Sleeps until the test duration has elapsed or the scenario has been
         * stopped. On normal expiry, stops the scenario and reports
         * completion through the blade insert response interface, if bound.
         */
        public void run()
        {
            long elapsedTime = 0;
            while (elapsedTime < delay && ! stopped)
            {
                long startTime = System.currentTimeMillis();
                try
                {
                    sleep(delay - elapsedTime);
                    elapsedTime = delay;
                }
                catch (InterruptedException ex)
                {
                    // interrupted by suspend() or stop(): account for the time
                    // slept so far, then hold on for as long as suspended
                    elapsedTime += System.currentTimeMillis() - startTime;
                    synchronized (this)
                    {
                        while (suspended)
                        {
                            try
                            {
                                wait();
                            }
                            catch (InterruptedException exc)
                            {
                                exc.printStackTrace(System.err);
                            }
                        }
                    }
                }
            }
            if (! stopped)
            {
                MTScenario.this.stop();
                synchronized (sr_lock)
                {
                    if (sr != null)
                    {
                        sr.completed();
                    }
                }
            }
        }
    }


    /**
     * Thread repeatedly running the action of one session until the scenario
     * is stopped.
     */
    class Activity extends Thread
    {
        int sessionId;
        MTScenarioSession session;
        /** Number of actions performed so far, passed in each event. */
        long iteration = 0;


        /**
         * @param sessionId index of this activity
         * @param session the session whose action this thread runs
         */
        public Activity(int sessionId, MTScenarioSession session)
        {
            super(session + " MTScenarioSession #" + sessionId);
            this.sessionId = sessionId;
            this.session = session;
        }


        /**
         * Life of an activity: declare itself idle, wait for the scenario to
         * be started (or stopped), sleep for a random ramp-up delay, then loop
         * on action() until stopped, parking itself whenever the scenario is
         * suspended.
         */
        public void run()
        {
            synchronized (activities_lock)
            {
                ++thr_waiting;
                activities_lock.notifyAll();
            }
            synchronized (scenario_lock)
            {
                while (! started && ! stopped)
                {
                    try
                    {
                        scenario_lock.wait();
                    }
                    catch (InterruptedException ex)
                    {
                        ex.printStackTrace(System.err);
                    }
                }
            }
            // Random.nextInt(bound) rejects a non-positive bound: without this
            // guard a zero ramp-up would kill the thread and block start()
            if (arg_rampup_duration_ms > 0)
            {
                try
                {
                    sleep(random.nextInt(arg_rampup_duration_ms));
                }
                catch (InterruptedException ex)
                {
                    // an interruption just shortens the ramp-up delay
                }
            }
            synchronized (activities_lock)
            {
                --thr_waiting;
                activities_lock.notifyAll();
            }
            try
            {
                while (! stopped)
                {
                    action();
                    synchronized (scenario_lock)
                    {
                        if (suspended)
                        {
                            synchronized (activities_lock)
                            {
                                ++thr_waiting;
                                activities_lock.notifyAll();
                            }
                            while (suspended)
                            {
                                try
                                {
                                    scenario_lock.wait();
                                }
                                catch (InterruptedException ex)
                                {
                                    ex.printStackTrace(System.err);
                                }
                            }
                            synchronized (activities_lock)
                            {
                                --thr_waiting;
                                activities_lock.notifyAll();
                            }
                        }
                    }
                }
            }
            finally
            {
                // always account for termination, even when action() throws,
                // so that stop() and join() cannot wait forever on this thread
                synchronized (activities_lock)
                {
                    --thr_remaining;
                    activities_lock.notifyAll();
                }
            }
        }


        /**
         * Runs one session action on a fresh event carrying the current time,
         * the scenario identifier, the iteration number and the session
         * identifier, then adds the resulting report to the data collector,
         * if bound. The meaning of the remaining constructor arguments is
         * defined by ActionEvent, which is not visible from this file.
         */
        void action()
        {
            ActionEvent report = session.action(new ActionEvent(
                System.currentTimeMillis(),
                scenarioId,
                null,
                iteration++,
                sessionId,
                true,
                0,
                null,
                ""));
            synchronized (dc_lock)
            {
                if (dcw != null)
                {
                    dcw.add(report);
                }
            }
        }
    }
}