KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > objectweb > clif > scenario > util > multithread > MTScenario


/*
 * CLIF is a Load Injection Framework
 * Copyright (C) 2004 France Telecom R&D
 * Copyright (C) 2004 INRIA
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 *
 * CLIF $Name: $
 *
 * Contact: clif@objectweb.org
 */

24
25 package org.objectweb.clif.scenario.util.multithread;
26
27 import org.objectweb.clif.datacollector.api.DataCollectorWrite;
28 import org.objectweb.clif.server.api.BladeInsertResponse;
29 import org.objectweb.clif.server.api.BladeControl;
30 import org.objectweb.clif.storage.api.ActionEvent;
31 import org.objectweb.clif.supervisor.api.ClifException;
32 import org.objectweb.clif.util.ClifClassLoader;
33 import org.objectweb.fractal.api.control.BindingController;
34 import org.objectweb.fractal.api.control.LifeCycleController;
35 import java.io.Serializable JavaDoc;
36 import java.util.StringTokenizer JavaDoc;
37 import java.util.NoSuchElementException JavaDoc;
38 import java.util.Random JavaDoc;
39
40 /**
41  * Abstract implementation of a multi-thread based scenario component.
42  * Method newSession() should be implemented by a derived class in order to provide
43  * actual action to be performed. Each session loops on calling its action() method, animated
44  * by its own thread.
45  * MTscenario must be given an argument line (as a single String) beginning with 3 integer parameters:
46  * <UL>
47  * <LI>the number of threads
48  * <LI>the loop time-out in seconds
49  * <LI>the ramp-up time in seconds (added to the loop time-out)
50  * </UL>
51  * The trailing characters of the argument String is passed as argument to the newSession() method.
52  * @see #newSession(int, String)
53  * @see #setArgument(String)
54  * @author Bruno Dillenseger
55  */

56 public abstract class MTScenario
57     implements BladeControl, BindingController, LifeCycleController
58 {
59     static private final String JavaDoc[] interfaceNames = new String JavaDoc[] {
60         DataCollectorWrite.DATA_COLLECTOR_WRITE,
61         BladeInsertResponse.BLADE_INSERT_RESPONSE };
62     protected Serializable JavaDoc testId;
63     protected String JavaDoc scenarioId;
64     private Object JavaDoc scenario_lock = new Object JavaDoc();
65     private Object JavaDoc activities_lock = new Object JavaDoc();
66     private BladeInsertResponse sr;
67     private Object JavaDoc sr_lock = new Object JavaDoc();
68     private DataCollectorWrite dcw;
69     private Object JavaDoc dc_lock = new Object JavaDoc();
70     private Activity[] threads;
71     private int arg_thread_nb = 0;
72     private int arg_duration_s = 0;
73     private int arg_rampup_duration_ms = 0;
74     private String JavaDoc sessionArg = null;
75     private volatile int thr_remaining = 0;
76     private volatile int thr_waiting = 0;
77     private volatile boolean started;
78     private volatile boolean suspended;
79     private volatile boolean stopped;
80     private StopTimer timer = null;
81     private String JavaDoc fcState = LifeCycleController.STOPPED;
82     private Random JavaDoc random = new Random JavaDoc();
83
84
85     public MTScenario()
86     {
87     }
88
89
90     abstract public MTScenarioSession newSession(int sessionId, String JavaDoc arg) throws ClifException;
91
92
93     ///////////////////////////////////
94
// interface LifeCycleController //
95
///////////////////////////////////
96

97
98     public void startFc()
99     {
100         fcState = LifeCycleController.STARTED;
101     }
102
103
104     public void stopFc()
105     {
106         if (timer != null) // scenario has been initialized
107
{
108             stop();
109         }
110         fcState = LifeCycleController.STOPPED;
111     }
112
113
114     public String JavaDoc getFcState()
115     {
116         return fcState;
117     }
118
119
120     ///////////////////////////////////////////////////
121
// implementation of interface BindingController //
122
///////////////////////////////////////////////////
123

124
125     public Object JavaDoc lookupFc(String JavaDoc clientItfName)
126     {
127         if (clientItfName.equals(DataCollectorWrite.DATA_COLLECTOR_WRITE))
128         {
129             return dcw;
130         }
131         else if (clientItfName.equals(BladeInsertResponse.BLADE_INSERT_RESPONSE))
132         {
133             return sr;
134         }
135         else
136         {
137             return null;
138         }
139     }
140
141
142     public void bindFc(String JavaDoc clientItfName, Object JavaDoc serverItf)
143     {
144         if (clientItfName.equals(DataCollectorWrite.DATA_COLLECTOR_WRITE))
145         {
146             synchronized (dc_lock)
147             {
148                 dcw = (DataCollectorWrite) serverItf;
149             }
150         }
151         else if (clientItfName.equals(BladeInsertResponse.BLADE_INSERT_RESPONSE))
152         {
153             synchronized (sr_lock)
154             {
155                 sr = (BladeInsertResponse) serverItf;
156             }
157         }
158     }
159
160
161     public void unbindFc(String JavaDoc clientItfName)
162     {
163         if (clientItfName.equals(DataCollectorWrite.DATA_COLLECTOR_WRITE))
164         {
165             synchronized (dc_lock)
166             {
167                 dcw = null;
168             }
169         }
170         else if (clientItfName.equals(BladeInsertResponse.BLADE_INSERT_RESPONSE))
171         {
172             synchronized (sr_lock)
173             {
174                 sr = null;
175             }
176         }
177     }
178
179
180     public String JavaDoc[] listFc()
181     {
182         return interfaceNames;
183     }
184
185
186     //////////////////////////////////////////////
187
// implementation of interface BladeControl //
188
//////////////////////////////////////////////
189

190
191     /**
192      * initializes a new test, creating and starting the given number of activity threads,
193      * and returns as soon as every thread has been actually started
194      * @param testId unique identifier of the new test
195      */

196     public void init(Serializable JavaDoc testId)
197         throws ClifException
198     {
199         synchronized(scenario_lock)
200         {
201             this.testId = testId;
202             started = stopped = suspended = false;
203             threads = new Activity[arg_thread_nb];
204             thr_waiting = 0;
205             Thread.currentThread().setContextClassLoader(ClifClassLoader.getClassLoader());
206             for (thr_remaining = 0 ; thr_remaining < arg_thread_nb ; ++thr_remaining)
207             {
208                 threads[thr_remaining] = new Activity(thr_remaining, newSession(thr_remaining, sessionArg));
209                 threads[thr_remaining].start();
210             }
211             synchronized (activities_lock)
212             {
213                 if (thr_waiting != thr_remaining)
214                 {
215                     try
216                     {
217                         activities_lock.wait();
218                     }
219                     catch (InterruptedException JavaDoc ex)
220                     {
221                         ex.printStackTrace(System.err);
222                     }
223                 }
224             }
225             timer = new StopTimer(arg_duration_s);
226         }
227     }
228
229
230     /**
231      * releases every activity thread
232      */

233     public void start()
234     {
235         synchronized (scenario_lock)
236         {
237             started = true;
238             scenario_lock.notifyAll();
239         }
240         synchronized (activities_lock)
241         {
242             if (thr_waiting != 0)
243             {
244                 try
245                 {
246                     activities_lock.wait();
247                 }
248                 catch (InterruptedException JavaDoc ex)
249                 {
250                     ex.printStackTrace(System.err);
251                 }
252             }
253         }
254         timer.start();
255     }
256
257
258     public void stop()
259     {
260         synchronized (scenario_lock)
261         {
262             stopped = true;
263             if (! started)
264             {
265                 scenario_lock.notifyAll();
266             }
267         }
268         if (suspended)
269         {
270             resume();
271         }
272         else if (Thread.currentThread() != timer)
273         {
274             synchronized (timer)
275             {
276                 timer.interrupt();
277             }
278         }
279         synchronized (activities_lock)
280         {
281             while (thr_remaining != 0)
282             {
283                 try
284                 {
285                     activities_lock.wait();
286                 }
287                 catch (InterruptedException JavaDoc ex)
288                 {
289                     ex.printStackTrace(System.err);
290                 }
291             }
292         }
293     }
294
295
296     public void suspend()
297     {
298         synchronized (scenario_lock)
299         {
300             suspended = true;
301         }
302         synchronized (activities_lock)
303         {
304             if (thr_waiting != thr_remaining)
305             {
306                 try
307                 {
308                     activities_lock.wait();
309                 }
310                 catch (InterruptedException JavaDoc ex)
311                 {
312                     ex.printStackTrace(System.err);
313                 }
314             }
315         }
316         synchronized (timer)
317         {
318             timer.interrupt();
319         }
320     }
321
322
323     public void resume()
324     {
325         synchronized (scenario_lock)
326         {
327             suspended = false;
328             scenario_lock.notifyAll();
329         }
330         synchronized (activities_lock)
331         {
332             if (thr_waiting != 0)
333             {
334                 try
335                 {
336                     activities_lock.wait();
337                 }
338                 catch (InterruptedException JavaDoc ex)
339                 {
340                     ex.printStackTrace(System.err);
341                 }
342             }
343         }
344         synchronized(timer)
345         {
346             timer.notify();
347         }
348     }
349
350
351     public void join()
352     {
353         synchronized (activities_lock)
354         {
355             if (thr_remaining != 0)
356             {
357                 try
358                 {
359                     activities_lock.wait();
360                 }
361                 catch (InterruptedException JavaDoc ex)
362                 {
363                     ex.printStackTrace(System.err);
364                 }
365             }
366         }
367     }
368
369
370     /**
371      * Sets number of threads and test duration parameters
372      * @param arg should begin with 2 integer parameters (separated with usual separators) setting
373      * (1) the number of threads and (2) the test duration (in seconds). The trailing String will
374      * be used as an argument when creating sessions.
375      * @see #newSession(int, String)
376      */

377     public void setArgument(String JavaDoc arg)
378     {
379         StringTokenizer JavaDoc parser = new StringTokenizer JavaDoc(arg);
380         try
381         {
382             arg_thread_nb = Integer.parseInt(parser.nextToken());
383             arg_duration_s = Integer.parseInt(parser.nextToken());
384             arg_rampup_duration_ms = Integer.parseInt(parser.nextToken()) * 1000;
385             try
386             {
387                 sessionArg = parser.nextToken("");
388             }
389             catch (NoSuchElementException JavaDoc ex)
390             {
391             }
392         }
393         catch (Exception JavaDoc ex)
394         {
395             System.err.println("MTScenario expects 2 arguments: <number_of_threads> <test duration in s>");
396         }
397     }
398
399
400     /**
401      * Sets this scenario's unique identifier
402      */

403     public void setId(String JavaDoc id)
404     {
405         scenarioId = id;
406     }
407
408
409     /**
410      * @return the scenario/blade identifier
411      */

412      public String JavaDoc getId()
413     {
414         return scenarioId;
415     }
416
417
418     //////////////////////////////////////////
419
// inner timer class for scheduled stop //
420
//////////////////////////////////////////
421

422
423     class StopTimer extends Thread JavaDoc
424     {
425         long delay;
426
427         public StopTimer(int delay_s)
428         {
429             super("MTScenario stop timer " + delay_s + "s");
430             delay = delay_s*1000;
431         }
432
433         public void run()
434         {
435             long ellapsed_time = 0;
436             while (ellapsed_time < delay && ! stopped)
437             {
438                 long start_time = System.currentTimeMillis();
439                 try
440                 {
441                     sleep(delay - ellapsed_time);
442                     ellapsed_time = delay;
443                 }
444                 catch (InterruptedException JavaDoc ex)
445                 {
446                     ellapsed_time += System.currentTimeMillis() - start_time;
447                     synchronized(this)
448                     {
449                         if (suspended)
450                         {
451                             try
452                             {
453                                 wait();
454                             }
455                             catch (InterruptedException JavaDoc exc)
456                             {
457                                 exc.printStackTrace(System.err);
458                             }
459                         }
460                     }
461                 }
462             }
463             if (! stopped)
464             {
465                 MTScenario.this.stop();
466                 synchronized (sr_lock)
467                 {
468                     if (sr != null)
469                     {
470                         sr.completed();
471                     }
472                 }
473             }
474         }
475     }
476
477
478     //////////////////////////////////////
479
// inner class for activity threads //
480
//////////////////////////////////////
481

482
483     class Activity extends Thread JavaDoc
484     {
485         int sessionId;
486         MTScenarioSession session;
487         long iteration = 0;
488
489
490         public Activity(int sessionId, MTScenarioSession session)
491         {
492             super(session + " MTScenarioSession #" + sessionId);
493             this.sessionId = sessionId;
494             this.session = session;
495         }
496
497
498         public void run()
499         {
500             synchronized (activities_lock)
501             {
502                 if (++thr_waiting == thr_remaining)
503                 {
504                     activities_lock.notify();
505                 }
506             }
507             synchronized (scenario_lock)
508             {
509                 if (! started && ! stopped)
510                 {
511                     try
512                     {
513                         scenario_lock.wait();
514                     }
515                     catch (InterruptedException JavaDoc ex)
516                     {
517                         ex.printStackTrace(System.err);
518                     }
519                 }
520             }
521             try
522             {
523                 sleep(random.nextInt(arg_rampup_duration_ms));
524             }
525             catch (InterruptedException JavaDoc ex)
526             {
527             }
528             synchronized (activities_lock)
529             {
530                 if (--thr_waiting == 0)
531                 {
532                     activities_lock.notify();
533                 }
534             }
535             while (! stopped)
536             {
537                 action();
538                 synchronized (scenario_lock)
539                 {
540                     if (suspended)
541                     {
542                         synchronized (activities_lock)
543                         {
544                             if (++thr_waiting == thr_remaining)
545                             {
546                                 activities_lock.notify();
547                             }
548                         }
549                         try
550                         {
551                             scenario_lock.wait();
552                         }
553                         catch (InterruptedException JavaDoc ex)
554                         {
555                             ex.printStackTrace(System.err);
556                         }
557                         synchronized (activities_lock)
558                         {
559                             if (--thr_waiting == 0)
560                             {
561                                 activities_lock.notify();
562                             }
563                         }
564                     }
565                 }
566             }
567             synchronized (activities_lock)
568             {
569                 if (--thr_remaining == 0)
570                 {
571                     activities_lock.notify();
572                 }
573             }
574         }
575
576
577         void action()
578         {
579             ActionEvent report = session.action(new ActionEvent(
580                 System.currentTimeMillis(),
581                 scenarioId,
582                 null,
583                 iteration++,
584                 sessionId,
585                 true,
586                 0,
587                 null,
588                 ""));
589             synchronized (dc_lock)
590             {
591                 if (dcw != null)
592                 {
593                     dcw.add(report);
594                 }
595             }
596         }
597     }
598 }
599
Popular Tags