KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > objectweb > clif > scenario > util > isac > engine > loadprofile > GroupExecutionWithPoolThread


/*
 * CLIF is a Load Injection Framework
 * Copyright (C) 2004 France Telecom R&D
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 *
 * CLIF
 *
 * Contact: clif@objectweb.org
 */

23 package org.objectweb.clif.scenario.util.isac.engine.loadprofile;
24
25 import java.util.Enumeration JavaDoc;
26 import java.util.Hashtable JavaDoc;
27
28 import org.objectweb.clif.datacollector.api.DataCollectorWrite;
29 import org.objectweb.clif.scenario.util.isac.engine.IsacScenarioEngine;
30 import org.objectweb.clif.scenario.util.isac.engine.behavior.BehaviorManager;
31 import org.objectweb.clif.scenario.util.isac.engine.behavior.BehaviorsPool;
32 import org.objectweb.clif.scenario.util.isac.engine.sessionobject.SessionObjectManager;
33 import org.objectweb.clif.scenario.util.isac.exception.IsacRuntimeException;
34 import org.objectweb.clif.scenario.util.isac.loadprofile.GroupDescription;
35 import org.objectweb.clif.scenario.util.isac.util.BooleanHolder;
36 import org.objectweb.clif.server.api.BladeInsertResponse;
37 import org.objectweb.util.monolog.api.BasicLevel;
38 import org.objectweb.util.monolog.api.Logger;
39
40 /**
41  * This class has the same functionnality than GroupExecutionThread, but it use
42  * pooled behaviors threads
43  *
44  * This class will extends the Thread class in order to be runnable, during it
45  * running this class will manage the threads and each injection group. Each
46  * second this class will analyse how many thread, for each group, must be
47  * active and it will start or stop new thread, in order to follow the group
48  * description
49  *
50  * @author JC Meillaud
51  * @author A Peyrard
52  */

53 public class GroupExecutionWithPoolThread extends Thread JavaDoc {
54     // logger
55
static protected Logger log = IsacScenarioEngine.logger
56             .getLogger(GroupExecutionWithPoolThread.class.getName());
57
58     // loop duration time
59
private static final int LOOP_DURATION=1000;
60
61     // attributes
62
// locks...
63
private Object JavaDoc suspendLock;
64
65     private Object JavaDoc groupExecutionLock;
66
67     // lock to be awake then component execute some methods when we are in
68
// surplus time wait
69
private Object JavaDoc surplusLock;
70
71     // lock used to notify component
72
private Object JavaDoc componentLock;
73
74     private Object JavaDoc dataCollectorWrite_lock;
75
76     // this lock will be used to execute timers on threads
77
private Object JavaDoc timerLock;
78
79     // client interface datacontrolwrite
80
private DataCollectorWrite dataCollectorWrite;
81
82     // client interface bladeInsertResponse
83
private BladeInsertResponse bladeInsertResponse;
84
85     private Object JavaDoc bladeInsertResponse_lock;
86
87     // boolean running states shared with threads and isac component
88
private volatile BooleanHolder stopped;
89
90     private volatile BooleanHolder suspended;
91
92     // some variables to store time informations
93
private long suspendTotalTime;
94
95     private long beginTime;
96
97     private long lateTotalTime;
98
99     private long activitiesTotalTime;
100
101     private int currentSecond;
102
103     // boolean which store if we need to stop the thread, or wait them naturals
104
// endings
105
private boolean forceThreadStop;
106
107     // references to the managers
108
private GroupDescriptionManager groupDescriptionManager;
109
110     private BehaviorManager behaviorManager;
111
112     private SessionObjectManager sessionObjectManager;
113
114     // local boolean state, which state are defined when all threads have switch
115
// to this state
116
private boolean localStarted;
117
118     private boolean localStopped;
119
120     private boolean localSuspended;
121
122     // Hashtable which will store all the pools
123
private Hashtable JavaDoc pools;
124
125     private String JavaDoc bladeId;
126
127     /**
128      * Constructor, build a new thread which will manage the launching of
129      * behaviors thread following the groups descriptions
130      *
131      * @param gdm
132      * The group description manager
133      * @param bm
134      * The behavior manager
135      * @param sl
136      * The suspendLock
137      * @param tl
138      * The timerLock
139      * @param cl
140      * The componentLock
141      * @param sul
142      * The surplusLock
143      * @param dcw
144      * The data collector write client interface
145      * @param dcwl
146      * The interface lock
147      * @param bir
148      * The interface bladeinsertresponse
149      * @param birl
150      * This interface lock
151      * @param stopped
152      * boolean which store state of the clif component
153      * @param suspended
154      * boolean which store state of the clif component
155      */

156     public GroupExecutionWithPoolThread(String JavaDoc bladeId, GroupDescriptionManager gdm,
157             BehaviorManager bm, SessionObjectManager som, Object JavaDoc sl, Object JavaDoc tl,
158             Object JavaDoc cl, Object JavaDoc sul, DataCollectorWrite dcw, Object JavaDoc dcwl,
159             BladeInsertResponse bir, Object JavaDoc birl, BooleanHolder stopped,
160             BooleanHolder suspended) {
161         // set the references of the attributes
162
this.bladeId = bladeId;
163         this.groupDescriptionManager = gdm;
164         this.behaviorManager = bm;
165         this.sessionObjectManager = som;
166         this.suspendLock = sl;
167         this.timerLock = tl;
168         this.componentLock = cl;
169         this.surplusLock = sl;
170         this.dataCollectorWrite = dcw;
171         this.dataCollectorWrite_lock = dcwl;
172         this.bladeInsertResponse = bir;
173         this.bladeInsertResponse_lock = birl;
174         this.stopped = stopped;
175         this.suspended = suspended;
176         // init the local variable
177
this.lateTotalTime = 0;
178         this.beginTime = 0;
179         this.suspendTotalTime = 0;
180         this.currentSecond = 1;
181         this.activitiesTotalTime = 0;
182         // init local boolean state, which state are defined when all threads
183
// have switch to this state
184
this.localStarted = false;
185         this.localStopped = false;
186         this.localSuspended = false;
187         // init the groupExecution lock
188
this.groupExecutionLock = new Object JavaDoc();
189
190         // initialize all pools
191
this.initializePools();
192     }
193
194     /**
195      * This method initialize all pools which will be used during execution
196      */

197     private void initializePools() {
198         // init the pools table
199
this.pools = new Hashtable JavaDoc();
200         // get an enumeration of all existing groups
201
Enumeration JavaDoc groupsId = this.groupDescriptionManager.getGroupsIds();
202         // for each group create a pool
203
while (groupsId.hasMoreElements()) {
204             // get the id
205
Integer JavaDoc id = (Integer JavaDoc) groupsId.nextElement();
206             // get the maximum behaviors running together
207
int max = this.groupDescriptionManager
208                     .getMaximumBehaviorsForAGroup(id);
209             // initialize a new pool
210
BehaviorsPool pool = new BehaviorsPool(bladeId, id,
211                     this.groupDescriptionManager, this.behaviorManager,
212                     this.sessionObjectManager, this.stopped, this.suspended,
213                     this.suspendLock, this.groupExecutionLock,
214                     this.dataCollectorWrite, this.dataCollectorWrite_lock,
215                     this.timerLock);
216             // initialize the pool
217
pool.initializeBehaviorsThreads();
218             // store the pool in the table
219
this.pools.put(id, pool);
220         }
221     }
222
223     /**
224      * @see java.lang.Runnable#run()
225      */

226     public void run() {
227         // get the starting time
228
this.beginTime = System.currentTimeMillis();
229         if (IsacScenarioEngine.DEBUG_ON && log.isLoggable(BasicLevel.DEBUG)) {
230             log.log(BasicLevel.DEBUG, " ** GET ** beginTime=" + beginTime);
231         }
232         // init a boolean which store if all groups have finish
233
boolean allGroupsFinished = false ;
234         // while the stopped mode is not set
235
while (!stopped.getBooleanValue() && !allGroupsFinished) {
236             if (IsacScenarioEngine.DEBUG_ON && log.isLoggable(BasicLevel.DEBUG)) {
237                 log.log(BasicLevel.DEBUG, "******************** SECOND="
238                         + currentSecond + "***************");
239             }
240             // if we are in suspended mode do suspend
241
this.executeSuspendIfNeeded();
242             // get all the group ids which are still running
243
Enumeration JavaDoc groupsIds = this.groupDescriptionManager.getGroupsIds();
244             // check if there is still some groups
245
if (!groupsIds.hasMoreElements()) {
246                 log.log(BasicLevel.DEBUG, "There is no more group in the GDM");
247                 // exit the while loop
248
break;
249             }
250             // put the all finished group to true
251
allGroupsFinished = true ;
252             // for each group
253
while (groupsIds.hasMoreElements()) {
254                 // if we are in suspended mode do suspend
255
this.executeSuspendIfNeeded();
256                 // get the current group id
257
Integer JavaDoc currentId = (Integer JavaDoc) groupsIds.nextElement();
258                 if (IsacScenarioEngine.DEBUG_ON
259                         && log.isLoggable(BasicLevel.DEBUG)) {
260                     log.log(BasicLevel.DEBUG, " ** GET (" + currentId
261                             + ")** Group analyzed=" + currentId);
262                 }
263                 // get the group description
264
GroupDescription currentGroupDescription = (GroupDescription) this.groupDescriptionManager
265                         .getGroupDescription(currentId);
266                 // get the number of thread which need to run, following the
267
// group description
268
long numberOfThreads = currentGroupDescription
269                         .getVirtualUserNumber(currentSecond);
270                 if (IsacScenarioEngine.DEBUG_ON
271                         && log.isLoggable(BasicLevel.DEBUG)) {
272                     log.log(BasicLevel.DEBUG,
273                             " ** GET ** numberOfThreads needed="
274                                     + numberOfThreads);
275                 }
276
277                 // set the number of thread required in the pool to the number of thread required
278
if (!((BehaviorsPool) this.pools.get(currentId))
279                         .setNumberOfThreadsRunning((int) numberOfThreads)) {
280                     allGroupsFinished = false ;
281                 }
282             } // while groups.hasMoreElements
283
this.sleepSurplusTime();
284             this.currentSecond++;
285         } // while !stopped
286
// test if we are in stopped mode
287
if (stopped.getBooleanValue()) {
288             // wait the stop of all threads
289
this.waitForAllThreadsEnding();
290             synchronized (this.componentLock) {
291                 // set the local stop value
292
this.localStopped = true;
293                 this.componentLock.notify();
294             }
295         }
296         // we finish the execution by ourself, so we could send a completed
297
// message
298
else {
299             // put the stopped state, in order to kill all threads
300
this.stopped.setBooleanValue(true);
301             // before the sending of the complete message, stop all the threads
302
this.waitForAllThreadsEnding();
303             log.log(BasicLevel.DEBUG, " ** GET ** send completed msg");
304             // send a completed message to the right CLIF component
305
synchronized (bladeInsertResponse_lock) {
306                 if (bladeInsertResponse != null) {
307                     bladeInsertResponse.completed();
308                 }
309             }
310         }
311         // XXX
312
long endTime = System.currentTimeMillis();
313         log.log(BasicLevel.WARN,"** EXECUTION FINISHED : ");
314         log.log(BasicLevel.WARN,"** currentSecond=" + currentSecond);
315         if (!allGroupsFinished) {
316             log.log(BasicLevel.WARN,"** application was forced to stop");
317         } else {
318             log.log(BasicLevel.WARN,"** application finish by itself");
319         }
320         log.log(BasicLevel.WARN,"** REAL DURATION OF THE TEST="
321                 + (this.beginTime - endTime));
322         log.log(BasicLevel.WARN,"** TOTAL SUSPENDED TIME=" + this.suspendTotalTime);
323         log.log(BasicLevel.WARN,"** REAL ACTIVITIES TIME="
324                 + this.activitiesTotalTime);
325         log.log(BasicLevel.WARN,"** TOTAL LATE TIME=" + this.lateTotalTime);
326     }
327
328     /**
329      * This method wait the end of all the threads of all pools
330      */

331     private void waitForAllThreadsEnding() {
332         log.log(BasicLevel.DEBUG, " ** GET ** WAIT ALL THREAD FINISHING");
333         synchronized (this.groupExecutionLock) {
334             log.log(BasicLevel.DEBUG, " ** GET ** AWAKE ALL POOLED THREADS");
335             this.awakeAllThreads();
336             boolean haveAllFinished = false;
337             while (!haveAllFinished) {
338                 log.log(BasicLevel.DEBUG,
339                         " ** GET ** WHILE THEY DON'T HAVE ALL FINISH");
340                 // put the all finished to true
341
haveAllFinished = true;
342                 // test if there is some alive threads in each pool
343
Enumeration JavaDoc poolsElems = this.pools.elements();
344                 while (poolsElems.hasMoreElements()) {
345                     BehaviorsPool temp = (BehaviorsPool) poolsElems
346                             .nextElement();
347                     if (temp.getNumberOfThreadAlive() != 0) {
348                         log.log(BasicLevel.DEBUG,
349                                 " ** GET ** A POOL HAVE SOME RUNNING THREADS ->"
350                                         + temp.getNumberOfThreadAlive());
351                         // there is still alive threads
352
haveAllFinished = false;
353                         // finish the while on the pools
354
break;
355                     }
356                 }
357                 // if all pool have finish
358
if (haveAllFinished) {
359                     break;
360                 }
361                 // wait a notify from a last thread of a pool
362
log.log(BasicLevel.DEBUG,
363                         " ** GET ** WAIT NOTIFICATION OF A POOL");
364                 try {
365                     this.groupExecutionLock.wait();
366                 } catch (InterruptedException JavaDoc ex) {
367                     throw new IsacRuntimeException(
368                             "Unable to wait the end of all threads",ex);
369                 }
370             }
371         }
372     }
373
374     /**
375      * This method execute the method awakeAll to all pools
376      */

377     private void awakeAllThreads() {
378         Enumeration JavaDoc e = this.pools.elements();
379         while (e.hasMoreElements()) {
380             ((BehaviorsPool) e.nextElement()).awakeAllThreads();
381         }
382     }
383
384     /**
385      * This fonction calculate the elapsed time since the last loop in the big
386      * while each loop must during LOOP_DURATIONms, so wait the rest of time
387      */

388     private void sleepSurplusTime() {
389         long time = System.currentTimeMillis();
390         long elapsed = time - beginTime - (currentSecond - 1) * LOOP_DURATION
391                 - suspendTotalTime - lateTotalTime;
392         // if (IsacScenarioEngine.DEBUG_ON &&
393
// log.isLoggable(BasicLevel.DEBUG))
394
// {
395
if (log.isLoggable(BasicLevel.WARN))
396             log.log(BasicLevel.WARN, " ** GET ** elapsed time=" + elapsed);
397         // }
398

399         this.activitiesTotalTime += elapsed;
400         // if we are in late don't do any sleep
401
if (LOOP_DURATION - elapsed < 0) {
402             this.lateTotalTime += elapsed - LOOP_DURATION;
403         } else {
404             long sleeped = 0;
405             while (sleeped < (LOOP_DURATION - elapsed)) {
406                 long beginSleep = System.currentTimeMillis();
407                 synchronized (this.componentLock) {
408                     // check if the suspended state have been put
409
if (!this.suspended.getBooleanValue()) {
410                         synchronized (this.surplusLock) {
411                             try {
412                                 surplusLock.wait(LOOP_DURATION - elapsed);
413                             } catch (InterruptedException JavaDoc ex) {
414                                 throw new IsacRuntimeException(
415                                         "Unable to sleep the surplus time",ex);
416                             }
417                         }
418                     }
419                 }
420                 sleeped += System.currentTimeMillis() - beginSleep;
421                 // test if we are in stopped state
422
if (this.stopped.getBooleanValue())
423                     return;
424                 this.executeSuspendIfNeeded();
425             }
426         }
427     }
428
429     /**
430      * This method analyse if we are in suspended state and wait during the
431      * suspended state if we are
432      */

433     private void executeSuspendIfNeeded() {
434         log.log(BasicLevel.DEBUG, " ** GET ** execute suspend if needed");
435         // synchronized (this.scenarioLock) {
436
if (this.suspended.getBooleanValue()) {
437             // we will store the begining time of the suspend
438
long beginSuspend = System.currentTimeMillis();
439             // for all group wait suspend state
440
Enumeration JavaDoc poolElements = this.pools.elements() ;
441             while (poolElements.hasMoreElements()) {
442                 BehaviorsPool pool = (BehaviorsPool)poolElements.nextElement() ;
443                 pool.waitForSuspendState() ;
444             }
445             // now put the suspended local state, because all
446
// threads have been put in this state
447
synchronized (this.componentLock) {
448                 this.localSuspended = true;
449                 // awake the isac component
450
this.componentLock.notify();
451             }
452             log.log(BasicLevel.DEBUG, " ** GET ** DO SUSPEND");
453             // now we could switch to suspend mode
454
synchronized (this.suspendLock) {
455                 try {
456                     this.suspendLock.wait();
457                 } catch (InterruptedException JavaDoc ex) {
458                     throw new IsacRuntimeException(
459                             "Unable to wait during the suspended state in GroupExecutionThread",ex);
460                 }
461             }
462             // for all group wait resume state
463
poolElements = this.pools.elements() ;
464             while (poolElements.hasMoreElements()) {
465                 BehaviorsPool pool = (BehaviorsPool)poolElements.nextElement() ;
466                 pool.waitForResumeState() ;
467             }
468             synchronized (this.componentLock) {
469                 // now put the resume state
470
this.localSuspended = false;
471                 // awake the isac component
472
this.componentLock.notify();
473             }
474             // add this suspend time to the variable which store the
475
// total suspend time
476
this.suspendTotalTime += System.currentTimeMillis() - beginSuspend;
477         }
478     }
479
480     /////////////////////////////////
481
// Local sates attributes
482
/////////////////////////////////
483

484     /**
485      * @return Returns the localStarted.
486      */

487     public boolean isLocalStarted() {
488         return localStarted;
489     }
490
491     /**
492      * @return Returns the localStopped.
493      */

494     public boolean isLocalStopped() {
495         return localStopped;
496     }
497
498     /**
499      * @return Returns the localSuspended.
500      */

501     public boolean isLocalSuspended() {
502         return localSuspended;
503     }
504 }
Popular Tags