KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > tests > PerfTest


1 // $Id: PerfTest.java,v 1.6 2004/07/05 14:15:11 belaban Exp $
2

3 package org.jgroups.tests;
4
5 import org.jgroups.*;
6 import org.jgroups.blocks.PullPushAdapter;
7 import org.jgroups.util.Util;
8
9 import java.io.IOException JavaDoc;
10 import java.io.ObjectInput JavaDoc;
11 import java.io.ObjectOutput JavaDoc;
12 import java.util.HashMap JavaDoc;
13 import java.util.Iterator JavaDoc;
14 import java.util.Map JavaDoc;
15 import java.util.Vector JavaDoc;
16
17 /**
18  * Test which multicasts n messages to all members. Measures the time until all members have received
19  * all messages from all senders. Start a number of members (e.g. 4). Wait until all of them are up and
20  * have joined the group. Then press 's' for all senders to start multicasting messages. When you see all
21  * *--* DONE messages for all senders, press 'a' to see the total stats.
22  * @author Bela Ban
23  */

24 public class PerfTest implements MessageListener, MembershipListener{
25
26     /** HashMap<Address, Entry>. Stores received multicasts. Keyed by sender */
27     HashMap JavaDoc data=new HashMap JavaDoc();
28
29     /** Keeps track of membership */
30     Vector JavaDoc mbrs=new Vector JavaDoc();
31
32     /** Channel properties */
33     String JavaDoc props=null;
34
35     /** Sits on top of the channel */
36     PullPushAdapter adapter=null;
37
38     /** My channel for sending and receiving messages */
39     JChannel ch=null;
40
41     /** Am I a sender as well ? */
42     boolean sender=true;
43
44     /** Sleep time between bursts in milliseconds. 0 means no sleep */
45     long sleep_time=10;
46
47     /** Use busy sleeping ? (see #Util.sleep(long,boolean) for details) */
48     boolean busy_sleep=false;
49
50     /** Number of bursts. Total number of messages is <tt>num_bursts * num_msgs_per_burst</tt> */
51     int num_bursts=100;
52
53     /** Number of messages per burst. After a burst we sleep for <tt>sleep_time</tt> msecs */
54     int num_msgs_per_burst=10;
55
56     /** Size of a message in bytes */
57     int msg_size=10000;
58
59     /** The buffer to be sent (will be <tt>msg_size</tt> bytes) */
60     byte[] buf=null;
61
62     /** Number of messages sent by us */
63     long sent_msgs=0;
64
65     final static String JavaDoc HDRNAME="PerfHeaderName";
66
67
68
69
70     public PerfTest(String JavaDoc props, int num_bursts, int num_msgs_per_burst,
71                     int msg_size, long sleep_time, boolean sender) {
72         this.props=props;
73         this.num_bursts=num_bursts;
74         this.num_msgs_per_burst=num_msgs_per_burst;
75         this.msg_size=msg_size;
76         this.sleep_time=sleep_time;
77         this.buf=new byte[msg_size];
78         this.sender=sender;
79     }
80
81
82
83     public void start() throws Exception JavaDoc {
84         try {
85             ch=new JChannel(props);
86             ch.connect("PerfTest-Group");
87             adapter=new PullPushAdapter(ch, this, this);
88             mainLoop();
89         }
90         finally {
91             if(ch != null)
92                 ch.close();
93         }
94     }
95
96     void mainLoop() throws Exception JavaDoc {
97         boolean looping=true;
98         int choice;
99         while(looping) {
100             choice=choice();
101             switch(choice) {
102                 case 'q': case 'x':
103                     looping=false;
104                     break;
105                 case 's':
106                     MyHeader hdr=new MyHeader(MyHeader.START, num_bursts * num_msgs_per_burst);
107                     Message start_msg=new Message(null, null, null);
108                     start_msg.putHeader(HDRNAME, hdr);
109                     adapter.send(start_msg);
110                     break;
111                 case 'c':
112                     Message clear_msg=new Message();
113                     clear_msg.putHeader(HDRNAME, new MyHeader(MyHeader.CLEAR, 0));
114                     adapter.send(clear_msg);
115                     break;
116                 case 't':
117                     printStats();
118                     break;
119                 case 'p':
120                     printParams();
121                     break;
122                 case 'v':
123                     System.out.println("-- view: " + ch.getView());
124                     break;
125                 case 'a':
126                     printStatsForAllSenders();
127                     break;
128             }
129         }
130     }
131
132
133     private void printStatsForAllSenders() {
134         long start_time=0, stop_time=0, total_time;
135         Entry entry;
136         int num_msgs=0, num_senders=0;
137
138         for(Iterator JavaDoc it=data.values().iterator(); it.hasNext();) {
139             entry=(Entry)it.next();
140             if(entry.num_received > 0) {
141                 num_msgs+=entry.num_received;
142                 num_senders++;
143
144                 // get the earliest start time
145
if(start_time == 0)
146                     start_time=entry.start;
147                 else {
148                     start_time=Math.min(start_time, entry.start);
149                 }
150
151                 // get the latest stop time
152
if(stop_time == 0) {
153                     stop_time=entry.stop;
154                 }
155                 else {
156                     stop_time=Math.max(stop_time, entry.stop);
157                 }
158             }
159         }
160
161         total_time=stop_time - start_time;
162
163         StringBuffer JavaDoc sb=new StringBuffer JavaDoc();
164         sb.append("total number of messages sent by me: ").append(sent_msgs).append('\n');
165         sb.append("total number of messages received: ").append(num_msgs).append('\n');
166         sb.append("total number of senders: ").append(num_senders).append('\n');
167         sb.append("total time: ").append(total_time).append(" ms\n");
168         sb.append("msgs/sec: ").append((double)num_msgs / (total_time/1000.0)).append('\n');
169         sb.append("throughput (kb/sec): ").append((num_msgs * msg_size/1000.0) / (total_time / 1000.0)).append('\n');
170         System.out.println(sb.toString());
171     }
172
173
174     private void printParams() {
175         System.out.println("num_bursts: " + num_bursts + '\n' +
176                            "num_msgs_per_burst: " + num_msgs_per_burst + '\n' +
177                            "msg_size: " + msg_size + '\n' +
178                            "sleep_time: " + sleep_time + '\n' +
179                            "sender: " + sender);
180     }
181
182     private void printStats() {
183         for(Iterator JavaDoc it=data.entrySet().iterator(); it.hasNext();) {
184             Map.Entry JavaDoc entry=(Map.Entry JavaDoc)it.next();
185             System.out.println("stats for " + entry.getKey() + "");
186             System.out.println(((Entry)entry.getValue()).printStats() + '\n');
187         }
188     }
189
190
191     void sendMessages() {
192         MyHeader hdr;
193         Message msg;
194         int seqno=0;
195         long start, stop;
196
197         if(sender == false) {
198             System.out.println("-- I'm not a sender; will not send messages");
199             return;
200         }
201         else {
202             System.out.println("-- sending " + num_bursts * num_msgs_per_burst + " msgs");
203         }
204
205         sent_msgs=0;
206
207         try {
208             start=System.currentTimeMillis();
209             for(int i=0; i < num_bursts; i++) {
210                 for(int j=0; j < num_msgs_per_burst; j++) {
211                     hdr=new MyHeader(MyHeader.DATA, seqno++);
212                     msg=new Message(null, null, buf);
213                     msg.putHeader(HDRNAME, hdr);
214                     adapter.send(msg);
215                     sent_msgs++;
216                     if(sent_msgs % 100 == 0)
217                         System.out.println("++ sent " + sent_msgs);
218                 }
219                 Util.sleep(sleep_time);
220             }
221             stop=System.currentTimeMillis();
222             System.out.println("-- sent " + num_bursts * num_msgs_per_burst + " msgs (in " +
223                                (stop-start) + " ms)");
224 // System.out.flush();
225
// Util.sleep(1000);
226
// System.exit(1);
227
}
228         catch(Throwable JavaDoc t) {
229             t.printStackTrace();
230         }
231     }
232
233
234
235     int choice() throws Exception JavaDoc {
236         System.out.println("s=send, c=clear, t=print stats, p=print parameters v=view, " +
237                            "a=times for all messages, q=quit\nChoice: ");
238         System.out.flush();
239         System.in.skip(System.in.available());
240         int c=System.in.read();
241         System.out.flush();
242         return c;
243     }
244
245
246     public void receive(Message msg) {
247         Address sender=msg.getSrc();
248         MyHeader hdr=(MyHeader)msg.removeHeader(HDRNAME);
249         if(hdr == null) {
250             System.err.println("-- error: header was null");
251             return;
252         }
253         switch(hdr.type) {
254             case MyHeader.START:
255                 updateTimestamp();
256
257                 new Thread JavaDoc() {
258                     public void run() {
259                         // needs to be done in a separate thread; otherwise we cannot receive
260
// data messages until we have sent all messages (sendMessages() returned).
261
sendMessages();
262                     }
263                 }.start();
264
265
266                 break;
267             case MyHeader.DATA:
268                 Entry entry=(Entry)data.get(sender);
269                 if(entry == null) {
270                     System.err.println("-- received a message from " + sender + ", who is not in the list");
271                 }
272                 else {
273                     entry.add(hdr.seqno);
274                     if((hdr.seqno) % 100 == 0)
275                         System.out.println("-- received " + sender + ':' + hdr.seqno);
276                     if(entry.getNumReceived() >= num_bursts * num_msgs_per_burst) {
277                         if(entry.done())
278                             System.out.println("*--* " + sender + " DONE");
279                     }
280                 }
281                 break;
282             case MyHeader.DONE:
283
284                 break;
285             case MyHeader.CLEAR:
286                 clear();
287                 break;
288             default:
289                 break;
290         }
291     }
292
293     private void updateTimestamp() {
294         for(Iterator JavaDoc it=data.values().iterator(); it.hasNext();) {
295             Entry entry=(Entry)it.next();
296             entry.start=System.currentTimeMillis();
297         }
298     }
299
300     void clear() {
301         System.out.println("-- clearing the data");
302         data.clear();
303         for(int i=0; i < mbrs.size(); i++)
304             data.put(mbrs.elementAt(i), new Entry(num_bursts * num_msgs_per_burst));
305     }
306
307     public byte[] getState() {
308         return null;
309     }
310
311     public void setState(byte[] state) {
312         ;
313     }
314
315     public void viewAccepted(View new_view) {
316         System.out.println("-- new view: " + new_view.getMembers());
317         mbrs.clear();
318         mbrs.addAll(new_view.getMembers());
319         clear();
320     }
321
322     public void suspect(Address suspected_mbr) {
323         ;
324     }
325
326     public void block() {
327         ;
328     }
329
330
331
332     public static void main(String JavaDoc[] args) {
333         String JavaDoc props=null;
334         int num_bursts=100;
335         int num_msgs_per_burst=10;
336         long sleep_time=10;
337         int msg_size=10000; // in bytes
338
boolean sender=true;
339
340         PerfTest t;
341
342
343         for(int i=0; i < args.length; i++) {
344             if("-props".equals(args[i])) {
345                 props=args[++i];
346                 continue;
347             }
348             if("-num_bursts".equals(args[i])) {
349                 num_bursts=Integer.parseInt(args[++i]);
350                 continue;
351             }
352             if("-num_msgs_per_burst".equals(args[i])) {
353                 num_msgs_per_burst=Integer.parseInt(args[++i]);
354                 continue;
355             }
356             if("-sleep_time".equals(args[i])) {
357                 sleep_time=Long.parseLong(args[++i]);
358                 continue;
359             }
360             if("-msg_size".equals(args[i])) {
361                 msg_size=Integer.parseInt(args[++i]);
362                 continue;
363             }
364             if("-sender".equals(args[i])) {
365                 sender=Boolean.valueOf(args[++i]).booleanValue();
366                 continue;
367             }
368             help();
369             return;
370         }
371         try {
372             t=new PerfTest(props, num_bursts, num_msgs_per_burst, msg_size, sleep_time, sender);
373             t.start();
374         }
375         catch(Throwable JavaDoc ex) {
376             ex.printStackTrace();
377         }
378     }
379
380     static void help() {
381         System.out.println("PerfTest [-help] [-props <properties>] [-num_bursts <num>] " +
382                            "[-num_msgs_per_burst <num>] [-sleep_time <number of msecs>] " +
383                            "[-msg_size <bytes>] [-sender <true/false>]");
384     }
385
386
387
388
389     public static class MyHeader extends Header {
390         public static final int DATA = 1;
391         public static final int START = 2;
392         public static final int CLEAR = 3;
393         public static final int DONE = 4;
394
395         int type=0;
396         int seqno=-1;
397
398
399         public MyHeader() {
400
401         }
402
403         public MyHeader(int type, int seqno) {
404             this.type=type;
405             this.seqno=seqno;
406         }
407
408         public long size() {
409             return 16;
410         }
411
412         public String JavaDoc toString() {
413             StringBuffer JavaDoc sb=new StringBuffer JavaDoc();
414             switch(type) {
415                 case DATA: sb.append("DATA (seqno=").append(seqno).append(')'); break;
416                 case START: sb.append("START"); break;
417                 case CLEAR: sb.append("CLEAR"); break;
418                 default: sb.append("<n/a>"); break;
419             }
420             return sb.toString();
421         }
422
423         public void writeExternal(ObjectOutput JavaDoc out) throws IOException JavaDoc {
424             out.writeInt(type);
425             out.writeInt(seqno);
426         }
427
428         public void readExternal(ObjectInput JavaDoc in) throws IOException JavaDoc, ClassNotFoundException JavaDoc {
429             type=in.readInt();
430             seqno=in.readInt();
431         }
432
433     }
434
435
436
437     class Entry {
438         long start=0, stop=0;
439         int num_received=0;
440         int[] seqnos=null;
441
442
443         Entry(int num) {
444             seqnos=new int[num];
445             for(int i=0; i < seqnos.length; i++)
446                 seqnos[i]=-1;
447             start=System.currentTimeMillis();
448         }
449
450         void add(int seqno) {
451             if(seqnos != null)
452                 seqnos[seqno]=seqno;
453             num_received++;
454             if(num_received >= seqnos.length) {
455                 if(done())
456                     stop=System.currentTimeMillis();
457             }
458         }
459
460         boolean done() {
461             if(seqnos == null)
462                 return false;
463             for(int i=0; i < seqnos.length; i++)
464                 if(seqnos[i] < 0)
465                     return false;
466             return true;
467         }
468
469         int getNumReceived() {
470             return num_received;
471         }
472
473         int getRealReceived() {
474             int num=0;
475             if(seqnos == null) return 0;
476             for(int i=0; i < seqnos.length; i++) {
477                 if(seqnos[i] > -1)
478                     num++;
479             }
480             return num;
481         }
482
483         String JavaDoc printStats() {
484             StringBuffer JavaDoc sb=new StringBuffer JavaDoc();
485             sb.append("done=").append(done()).append('\n');
486             sb.append("number of messages received: ").append(getRealReceived()).append('\n');
487             sb.append("total time: ").append(stop-start).append(" ms\n");
488             sb.append("msgs/sec: ").append((double)getRealReceived() / ((stop-start)/1000.0)).append('\n');
489             sb.append("throughput (kb/sec): ").append((getRealReceived() * msg_size/1000.0) / ((stop-start) / 1000.0)).append('\n');
490             return sb.toString();
491         }
492     }
493
494
495 }
496
Popular Tags