KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > objectweb > clif > scenario > util > isac > engine > loadprofile > GroupExecutionThread


1 /*
2  * CLIF is a Load Injection Framework
3  * Copyright (C) 2004 France Telecom R&D
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General Public
16  * License along with this library; if not, write to the Free Software
17  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18  *
19  * CLIF
20  *
21  * Contact: clif@objectweb.org
22  */

23 package org.objectweb.clif.scenario.util.isac.engine.loadprofile;
24
25 import java.util.Enumeration JavaDoc;
26 import java.util.Vector JavaDoc;
27
28 import org.objectweb.clif.datacollector.api.DataCollectorWrite;
29 import org.objectweb.clif.scenario.util.isac.engine.IsacScenarioEngine;
30 import org.objectweb.clif.scenario.util.isac.engine.behavior.BehaviorExecutionThread;
31 import org.objectweb.clif.scenario.util.isac.engine.behavior.BehaviorManager;
32 import org.objectweb.clif.scenario.util.isac.engine.behavior.node.ExecutableNode;
33 import org.objectweb.clif.scenario.util.isac.engine.sessionobject.SessionObjectManager;
34 import org.objectweb.clif.scenario.util.isac.exception.IsacRuntimeException;
35 import org.objectweb.clif.scenario.util.isac.loadprofile.GroupDescription;
36 import org.objectweb.clif.scenario.util.isac.util.BooleanHolder;
37 import org.objectweb.clif.scenario.util.isac.util.SessionObjectHashtable;
38 import org.objectweb.clif.scenario.util.isac.util.IntHolder;
39 import org.objectweb.clif.server.api.BladeInsertResponse;
40 import org.objectweb.util.monolog.api.BasicLevel;
41 import org.objectweb.util.monolog.api.Logger;
42
43 /**
44  * This class will extends the Thread class in order to be runnable, during it
45  * running this class will manage the threads and each injection group. Each
46  * second this class will analyse how many thread, for each group, must be
47  * active and it will start or stop new thread, in order to follow the group
48  * description
49  *
50  * @author JC Meillaud
51  * @author A Peyrard
52  */

53 public class GroupExecutionThread extends Thread JavaDoc {
54     // logger
55
static protected Logger log = IsacScenarioEngine.logger
56             .getLogger(GroupExecutionThread.class.getName());
57     // time definition of a loop
58
private static final int LOOP_DURATION = 1000;
59
60     // attributes
61
// locks...
62
private Object JavaDoc scenarioLock;
63
64     private Object JavaDoc activitiesLock;
65
66     // lock to wait the surplus of time between each loop
67
private Object JavaDoc surplusLock ;
68
69     private Object JavaDoc groupExecutionActivitiesLock;
70
71     private Object JavaDoc dataCollectorWrite_lock;
72
73     // this lock will be used to execute timers on threads
74
private Object JavaDoc timerLock;
75
76     // client interface datacontrolwrite
77
private DataCollectorWrite dataCollectorWrite;
78
79     // client interface bladeInsertResponse
80
private BladeInsertResponse bladeInsertResponse;
81
82     private Object JavaDoc bladeInsertResponse_lock;
83
84     // boolean running states shared with threads and isac component
85
private volatile BooleanHolder started;
86
87     private volatile BooleanHolder stopped;
88
89     private volatile BooleanHolder suspended;
90
91     // some variables to store time informations
92
private long suspendTotalTime;
93
94     private long beginTime;
95
96     private long lateTotalTime;
97
98     private long activitiesTotalTime;
99
100     private int currentSecond;
101
102     // references to the managers
103
private GroupDescriptionManager groupDescriptionManager;
104
105     private BehaviorManager behaviorManager;
106
107     private SessionObjectManager sessionObjectManager;
108
109     // local boolean state, which state are defined when all threads have switch
110
// to this state
111
private boolean localStarted;
112
113     private boolean localStopped;
114
115     private boolean localSuspended;
116
117     private String JavaDoc bladeId;
118
119     /**
120      * Constructor, build a new thread which will manage the launching of
121      * behaviors thread following the groups descriptions
122      *
123      * @param gdm
124      * The group description manager
125      * @param bm
126      * The behavior manager
127      * @param sl
128      * The scenarioLock
129      * @param al
130      * The activitiesLock
131      * @param dcw
132      * The data collector write client interface
133      * @param dcwl
134      * The interface lock
135      * @param bir
136      * The interface bladeinsertresponse
137      * @param birl
138      * This interface lock
139      * @param started
140      * boolean which store state of the clif component
141      * @param stopped
142      * boolean which store state of the clif component
143      * @param suspended
144      * boolean which store state of the clif component
145      */

146     public GroupExecutionThread(String JavaDoc bladeId, GroupDescriptionManager gdm,
147             BehaviorManager bm, SessionObjectManager som, Object JavaDoc sl, Object JavaDoc al,
148             DataCollectorWrite dcw, Object JavaDoc dcwl, BladeInsertResponse bir,
149             Object JavaDoc birl, BooleanHolder started, BooleanHolder stopped,
150             BooleanHolder suspended, Object JavaDoc surpluslock)
151     {
152         this.bladeId = bladeId;
153         this.groupDescriptionManager = gdm;
154         this.behaviorManager = bm;
155         this.sessionObjectManager = som;
156         this.scenarioLock = sl;
157         this.activitiesLock = al;
158         this.dataCollectorWrite = dcw;
159         this.dataCollectorWrite_lock = dcwl;
160         this.bladeInsertResponse = bir;
161         this.bladeInsertResponse_lock = birl;
162         this.started = started;
163         this.stopped = stopped;
164         this.suspended = suspended;
165         // init the local variable
166
this.lateTotalTime = 0;
167         this.beginTime = 0;
168         this.suspendTotalTime = 0;
169         this.currentSecond = 1;
170         this.activitiesTotalTime = 0;
171         // init local boolean state, which state are defined when all threads
172
// have switch to this state
173
this.localStarted = false;
174         this.localStopped = false;
175         this.localSuspended = false;
176         // init the timer lock which will be shared with the threads
177
this.timerLock = new Object JavaDoc();
178         // init the surplus lock
179
this.surplusLock = surpluslock ;
180     }
181
182     /**
183      * @see java.lang.Runnable#run()
184      */

185     public void run() {
186         // get the starting time
187
this.beginTime = System.currentTimeMillis();
188         if (IsacScenarioEngine.DEBUG_ON && log.isLoggable(BasicLevel.DEBUG)) {
189             log.log(BasicLevel.DEBUG, " ** GET ** beginTime=" + beginTime);
190         }
191
192         // while the stopped mode is not set
193
while (!stopped.getBooleanValue()) {
194             if (IsacScenarioEngine.DEBUG_ON && log.isLoggable(BasicLevel.DEBUG)) {
195                 log.log(BasicLevel.DEBUG, "******************** SECOND="
196                         + currentSecond + "***************");
197             }
198             // if we are in suspended mode do suspend
199
this.executeSuspendIfNeeded();
200             // get all the group ids which are still running
201
Enumeration JavaDoc groupsIds = this.groupDescriptionManager.getGroupsIds();
202             // check if there is still some groups
203
if (!groupsIds.hasMoreElements()) {
204                 log.log(BasicLevel.DEBUG, "There is no more group in the GDM");
205                 // exit the while loop
206
break;
207             }
208             // for each group
209
while (groupsIds.hasMoreElements()) {
210                 // if we are in suspended mode do suspend
211
this.executeSuspendIfNeeded();
212                 // get the current group id
213
Integer JavaDoc currentId = (Integer JavaDoc) groupsIds.nextElement();
214                 if (IsacScenarioEngine.DEBUG_ON
215                         && log.isLoggable(BasicLevel.DEBUG)) {
216                     log.log(BasicLevel.DEBUG, " ** GET (" + currentId
217                             + ")** Group analyzed=" + currentId);
218                 }
219                 // get the group description
220
GroupDescription currentGroupDescription = this.groupDescriptionManager
221                         .getGroupDescription(currentId);
222                 // get the behaviors which are runnings
223
Vector JavaDoc behaviorsThreads = this.groupDescriptionManager
224                         .getGroupRunningBehaviors(currentId);
225                 // get the stop mode
226
boolean forceThreadStop = currentGroupDescription.isForceStop();
227                 // get the number of thread which need to run, following the
228
// group description
229
long numberOfThreads = currentGroupDescription
230                         .getVirtualUserNumber(currentSecond);
231                 if (IsacScenarioEngine.DEBUG_ON
232                         && log.isLoggable(BasicLevel.DEBUG)) {
233                     log.log(BasicLevel.DEBUG,
234                             " ** GET ** numberOfThreads needed="
235                                     + numberOfThreads);
236                 }
237                 /////////////////////////////////////////////////
238
// if the group has no more ramp description
239
if (numberOfThreads == GroupDescription.END) {
240                     log.log(BasicLevel.DEBUG,
241                             " ** GET ** This group is finished");
242                     // if we need to force the stop of threads
243
if (forceThreadStop) {
244                         // send the local stop to all threads
245

246                         // get the reference of the int, which store the
247
// number of thread needed
248
IntHolder numberOfThreadRequired = this.groupDescriptionManager
249                                 .getNumberThreadRunningRequiredForAGroup(currentId);
250                         synchronized (activitiesLock) {
251                             // set the number of thread required
252
numberOfThreadRequired.setIntValue(0);
253                             // send stop to all thread in this group
254
for (int i = 0; i < behaviorsThreads.size(); i++)
255                                 ((BehaviorExecutionThread) behaviorsThreads
256                                         .elementAt(i)).setLocalStopped(true);
257                             if (behaviorsThreads.size() != 0) {
258                                 // awake the threads which are in timer
259
log.log(BasicLevel.DEBUG,
260                                         " ** GET ** AWAKE THREADS IN TIMER");
261                                 synchronized (this.timerLock) {
262                                     this.timerLock.notifyAll();
263                                 }
264                                 // wait for all stop of the threads
265
log
266                                         .log(BasicLevel.DEBUG,
267                                                 " ** GET ** wait for all stop of the threads");
268                                 try {
269                                     this.activitiesLock.wait();
270                                 } catch (InterruptedException JavaDoc ex) {
271                                     throw new IsacRuntimeException(
272                                             "Unable to wait the stop of threads for the group",ex);
273                                 }
274                             }
275                         }
276                         // remove the group description from the manager,
277
// because there is no more running threads,
278
// and no more ramps
279
this.groupDescriptionManager.removeGroup(currentId);
280                         log
281                                 .log(BasicLevel.DEBUG,
282                                         " ** GET ** group description removed in the GDM");
283                     } else {
284                         // analyse if there is still running threads, else
285
// remove the group
286
synchronized (activitiesLock) {
287                             int numberOfThreadsRunning = behaviorsThreads
288                                     .size();
289                             if (IsacScenarioEngine.DEBUG_ON
290                                     && log.isLoggable(BasicLevel.DEBUG)) {
291                                 log.log(BasicLevel.DEBUG,
292                                         "Thread still running : "
293                                                 + numberOfThreadsRunning);
294                             }
295                             if (numberOfThreadsRunning == 0) {
296                                 // remove the group description from the
297
// manager, because there is no more running
298
// threads,
299
// and no more ramps
300
this.groupDescriptionManager
301                                         .removeGroup(currentId);
302                             }
303                         }
304                     }
305                     // all the processings for this group have been done, so
306
// switch to the next one
307
continue;
308                 }
309                 ///////////////////////////////////////////////////////////
310
// If we reach this line, we have still running threads and
311
// ramps descriptions
312
// compare the number of thread running and ajust the threads
313
// running
314
synchronized (activitiesLock) {
315                     // get number of threads running
316
int numberOfThreadsRunning = behaviorsThreads.size();
317                     if (IsacScenarioEngine.DEBUG_ON
318                             && log.isLoggable(BasicLevel.DEBUG)) {
319                         log.log(BasicLevel.DEBUG, " ** GET ** Thread Running="
320                                 + numberOfThreadsRunning);
321                     }
322                     // first case the number are equals
323
if (numberOfThreadsRunning == numberOfThreads) {
324                         // we have nothing to do, so switch to the next group
325
log.log(BasicLevel.DEBUG,
326                                 " ** GET ** we have the right number thread");
327                         continue;
328                     }
329                     // in this case there is less threads than it is define in
330
// group description
331
else if (numberOfThreadsRunning < numberOfThreads) {
332                         log.log(BasicLevel.DEBUG,
333                                 " ** GET ** we need to add some threads");
334                         // get the number of thread waiting of this group,
335
// refrence shared between all threads
336
IntHolder numberOfWaiting = this.groupDescriptionManager
337                                 .getNumberThreadWaitingForAGroup(currentId);
338
339                         // get the behaviorId for this group
340
String JavaDoc behaviorId = currentGroupDescription
341                                 .getBehaviorId();
342                         // get the executable node representing the behavior
343
ExecutableNode executableNode = this.behaviorManager
344                                 .getBehavior(behaviorId);
345                         // get references which will be shared between group and
346
// threads
347
IntHolder numberOfThreadRequired = this.groupDescriptionManager
348                                 .getNumberThreadRunningRequiredForAGroup(currentId);
349                         // launch the missing threads
350
for (int i = 0; i < numberOfThreads
351                                 - numberOfThreadsRunning; i++) {
352                             // get a newly created table containing the sessions
353
SessionObjectHashtable clonedSessionsObjects = (SessionObjectHashtable) (this.sessionObjectManager
354                                     .getSessionObjectsForABehavior(behaviorId))
355                                     .clone();
356                             // init a new thread
357
BehaviorExecutionThread thread = new BehaviorExecutionThread(
358                                     bladeId,
359                                     numberOfThreadsRunning + i, executableNode,
360                                     clonedSessionsObjects, scenarioLock,
361                                     activitiesLock, timerLock,
362                                     this.dataCollectorWrite,
363                                     this.dataCollectorWrite_lock, started,
364                                     stopped, suspended, behaviorsThreads,
365                                     numberOfWaiting, numberOfThreadRequired);
366                             // store the thread in behaviorsThreads vector
367
behaviorsThreads.add(thread);
368                             // launch the thread
369
thread.start();
370                         }
371                     }
372                     // in this case we need to stop threads or leave thread
373
// ending
374
else {
375                         log.log(BasicLevel.DEBUG,
376                                 " ** GET ** we need to stop some threads");
377                         // analyze the force thread stop boolean
378
if (forceThreadStop) {
379                             // get the reference of the int, which store the
380
// number of thread needed
381
IntHolder numberOfThreadRequired = this.groupDescriptionManager
382                                     .getNumberThreadRunningRequiredForAGroup(currentId);
383                             // set the number of thread required
384
numberOfThreadRequired
385                                     .setIntValue((int) numberOfThreads);
386                             // in this case we will stop as much thread as the
387
// difference
388
for (int i = 0; i < numberOfThreadsRunning
389                                     - numberOfThreads; i++) {
390                                 // stop the thread which are in the begining of
391
// the vector
392
((BehaviorExecutionThread) behaviorsThreads
393                                         .elementAt(i)).setLocalStopped(true);
394                             }
395                             if (behaviorsThreads.size() != numberOfThreadRequired
396                                     .getIntValue()) {
397                                 // awake the threads which are in timer
398
log.log(BasicLevel.DEBUG,
399                                         " ** GET ** AWAKE THREADS IN TIMER");
400                                 synchronized (this.timerLock) {
401                                     this.timerLock.notifyAll();
402                                 }
403                                 // wait the stop of all stopping threads
404
log
405                                         .log(BasicLevel.DEBUG,
406                                                 " ** GET ** wait for all stop of the threads");
407                                 try {
408                                     this.activitiesLock.wait();
409                                 } catch (InterruptedException JavaDoc ex) {
410                                     throw new IsacRuntimeException(
411                                             "Unable to wait the ending of the surplus of running threads",ex);
412                                 }
413                             }
414                         } else {
415                             log
416                                     .log(BasicLevel.DEBUG,
417                                             " ** GET ** leave the threads running because");
418                             // in this case we will leave the threads running
419
continue;
420                         }
421                     }
422                 } // synchronized
423
} // while groups.hasMoreElements
424
// TODO sleep during the rest of time of the second
425
this.sleepSurplusTime();
426             this.currentSecond++;
427         } // while !stopped
428
// test if we are in stopped mode
429
if (stopped.getBooleanValue()) {
430             // awake the threads which are in timer
431
log.log(BasicLevel.DEBUG, " ** GET ** AWAKE THREADS IN TIMER");
432             synchronized (this.timerLock) {
433                 this.timerLock.notifyAll();
434             }
435             synchronized (activitiesLock) {
436                 while (this.groupDescriptionManager
437                         .getTotalNumberThreadRunning() != 0) {
438                     if (IsacScenarioEngine.DEBUG_ON
439                             && log.isLoggable(BasicLevel.DEBUG)) {
440                         log.log(BasicLevel.DEBUG,
441                                 " ** GET ** wait for all stop of the threads still running="
442                                         + this.groupDescriptionManager
443                                                 .getTotalNumberThreadRunning());
444                     }
445                     try {
446                         this.activitiesLock.wait();
447                     } catch (InterruptedException JavaDoc ex) {
448                         throw new IsacRuntimeException(
449                                 "Unable to wait for the stop of all threads",ex);
450                     }
451                 }
452             }
453         }
454         // we finish the execution by ourself, so we could send a completed
455
// message
456
else {
457             log.log(BasicLevel.DEBUG, " ** GET ** send completed msg");
458             // send a completed message to the right CLIF component
459
synchronized (bladeInsertResponse_lock) {
460                 if (bladeInsertResponse != null) {
461                     bladeInsertResponse.completed();
462                 }
463             }
464         }
465         // XXX
466
long endTime = System.currentTimeMillis();
467         log.log(BasicLevel.WARN,"** EXECUTION FINISHED : ");
468         log.log(BasicLevel.WARN,"** currentSecond=" + currentSecond);
469         if (stopped.getBooleanValue()) {
470             log.log(BasicLevel.WARN,"** application was forced to stop");
471         } else {
472             log.log(BasicLevel.WARN,"** application finish by itself");
473         }
474         log.log(BasicLevel.WARN,"** REAL DURATION OF THE TEST="
475                 + (this.beginTime - endTime));
476         log.log(BasicLevel.WARN,"** TOTAL SUSPENDED TIME=" + this.suspendTotalTime);
477         log.log(BasicLevel.WARN,"** REAL ACTIVITIES TIME="
478                 + this.activitiesTotalTime);
479         log.log(BasicLevel.WARN,"** TOTAL LATE TIME=" + this.lateTotalTime);
480     }
481
482     /**
483      * This fonction calculate the elapsed time since the last loop in the big
484      * while each loop must during 1000ms, so wait the rest of time
485      */

486     private void sleepSurplusTime() {
487         long time = System.currentTimeMillis();
488         long elapsed = time - beginTime - (currentSecond - 1) * LOOP_DURATION
489                 - suspendTotalTime - lateTotalTime;
490         // if (IsacScenarioEngine.DEBUG_ON &&
491
// log.isLoggable(BasicLevel.DEBUG))
492
// {
493
if (log.isLoggable(BasicLevel.WARN))
494             log.log(BasicLevel.WARN, " ** GET ** elapsed time=" + elapsed);
495         // }
496

497         this.activitiesTotalTime += elapsed;
498         // if we are in late don't do any sleep
499
if (LOOP_DURATION - elapsed < 0) {
500             this.lateTotalTime += elapsed - LOOP_DURATION;
501         } else {
502             long sleeped = 0;
503             while (sleeped < (LOOP_DURATION - elapsed)) {
504                 long beginSleep = System.currentTimeMillis();
505                 // check if the suspended state have been put
506
if (!this.suspended.getBooleanValue()) {
507                     synchronized (this.surplusLock) {
508                         try {
509                             surplusLock.wait(LOOP_DURATION - elapsed);
510                         } catch (InterruptedException JavaDoc ex) {
511                             throw new IsacRuntimeException(
512                                     "Unable to sleep the surplus time",ex);
513                         }
514                     }
515                 }
516                 sleeped += System.currentTimeMillis() - beginSleep;
517                 // test if we are in stopped state
518
if (this.stopped.getBooleanValue())
519                     return;
520                 this.executeSuspendIfNeeded();
521             }
522         }
523     }
524
525     /**
526      * This method analyse if we are in suspended state and wait during the
527      * suspended state if we are
528      */

529     private void executeSuspendIfNeeded() {
530         log.log(BasicLevel.DEBUG, " ** GET ** execute suspend if needed");
531         // synchronized (this.scenarioLock) {
532
if (this.suspended.getBooleanValue()) {
533             log.log(BasicLevel.DEBUG, " ** GET ** suspend activated !!!!!!!");
534             // we will store the begining time of the suspend
535
long beginSuspend = System.currentTimeMillis();
536             synchronized (this.activitiesLock) {
537                 log.log(BasicLevel.DEBUG, "** GET ** total thread running="
538                         + this.groupDescriptionManager
539                                 .getTotalNumberThreadRunning()
540                         + " threads waiting="
541                         + this.groupDescriptionManager
542                                 .getTotalNumberThreadWaiting());
543                 // awake the threads which are in timer
544
log.log(BasicLevel.DEBUG, " ** GET )** AWAKE THREADS IN TIMER");
545                 synchronized (this.timerLock) {
546                     this.timerLock.notifyAll();
547                 }
548                 // wait the all the thread have put in suspended mode
549
while (this.groupDescriptionManager
550                         .getTotalNumberThreadRunning() != this.groupDescriptionManager
551                         .getTotalNumberThreadWaiting()) {
552                     log.log(BasicLevel.DEBUG,
553                             " ** GET ** Wait the threads suspend");
554                     try {
555                         this.activitiesLock.wait();
556                     } catch (InterruptedException JavaDoc ex) {
557                         throw new IsacRuntimeException(
558                                 "Unable to wait all the thread to be in waiting mode",ex);
559                     }
560                 }
561                 log.log(BasicLevel.DEBUG, " ** GET )** set local suspend");
562                 // now put the suspended local state, because all
563
// threads have been put in this state
564
this.localSuspended = true;
565                 log.log(BasicLevel.DEBUG, " ** GET )** notify isac engine");
566                 // awake the isac component
567
this.activitiesLock.notifyAll();
568             }
569             log.log(BasicLevel.DEBUG, " ** GET )** DO SUSPEND");
570             // now we could switch to suspend mode
571
synchronized (this.scenarioLock) {
572                 try {
573                     this.scenarioLock.wait();
574                 } catch (InterruptedException JavaDoc ex) {
575                     throw new IsacRuntimeException(
576                             "Unable to wait during the suspended state in GroupExecutionThread -> "
577                                     + ex);
578                 }
579             }
580             // now wait the resume of all threads
581
synchronized (this.activitiesLock) {
582                 while (this.groupDescriptionManager
583                         .getTotalNumberThreadWaiting() != 0) {
584                     try {
585                         this.activitiesLock.wait();
586                     } catch (InterruptedException JavaDoc ex) {
587                         throw new IsacRuntimeException(
588                                 "Unable to wait all the thread to be in resume mode");
589                     }
590                 }
591                 // now put the resume state
592
this.localSuspended = false;
593                 // awake the isac component
594
this.activitiesLock.notifyAll();
595             }
596             // add this suspend time to the variable which store the
597
// total suspend time
598
this.suspendTotalTime += System.currentTimeMillis() - beginSuspend;
599         }
600         // }
601
}
602
603     /////////////////////////////////
604
// Local sates attributes
605
/////////////////////////////////
606

607     /**
608      * @return Returns the localStarted.
609      */

610     public boolean isLocalStarted() {
611         return localStarted;
612     }
613
614     /**
615      * @return Returns the localStopped.
616      */

617     public boolean isLocalStopped() {
618         return localStopped;
619     }
620
621     /**
622      * @return Returns the localSuspended.
623      */

624     public boolean isLocalSuspended() {
625         return localSuspended;
626     }
627 }
Popular Tags