KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > tests > perf > Test


1 package org.jgroups.tests.perf;
2
3 import org.jgroups.util.Util;
4 import org.apache.commons.logging.Log;
5 import org.apache.commons.logging.LogFactory;
6
7 import java.io.BufferedReader JavaDoc;
8 import java.io.FileReader JavaDoc;
9 import java.util.*;
10
11 /** You start the test by running this class.
12  * Use parameters -Xbatch -Xconcurrentio (Solaris specific)
13  * @author Bela Ban (belaban@yahoo.com)
14
15  */

16 public class Test implements Receiver {
17     String JavaDoc props=null;
18     Properties config;
19     boolean sender=false;
20     Transport transport=null;
21     Object JavaDoc local_addr=null;
22
23     /** HashMap<Object,MemberInfo> members. Keys=member addresses, value=MemberInfo */
24     HashMap senders=new HashMap();
25
26     /** Keeps track of members */
27     ArrayList members=new ArrayList();
28
29     /** Set when first message is received */
30     long start=0;
31
32     /** Set when last message is received */
33     long stop=0;
34
35     int num_members=0;
36
37     Log log=LogFactory.getLog(getClass());
38
39     boolean all_received=false;
40
41     /** HashMap<Object, HashMap>. A hashmap of senders, each value is the 'senders' hashmap */
42     HashMap results=new HashMap();
43
44     /** Dump info in gnuplot format */
45     boolean gnuplot_output=false;
46
47     /** Log every n msgs received (if gnuplot_output == true) */
48     long log_interval=1000;
49
50     /** Last time we dumped info */
51     long last_dump=0;
52
53     long counter=1;
54     long msg_size=1000;
55
56
57
58     public void start(Properties c, boolean verbose) throws Exception JavaDoc {
59         String JavaDoc config_file="config.txt";
60         BufferedReader JavaDoc fileReader;
61         String JavaDoc line;
62         String JavaDoc key, val;
63         StringTokenizer st;
64         Properties tmp=new Properties();
65
66         config_file=c.getProperty("config");
67         fileReader=new BufferedReader JavaDoc(new FileReader JavaDoc(config_file));
68         while((line=fileReader.readLine()) != null) {
69             if(line.startsWith("#"))
70                 continue;
71             line=line.trim();
72             if(line.length() == 0)
73                 continue;
74             st=new StringTokenizer(line, "=", false);
75             key=st.nextToken().toLowerCase();
76             val=st.nextToken();
77             tmp.put(key, val);
78         }
79         fileReader.close();
80
81         // 'tmp' now contains all properties from the file, now we need to override the ones
82
// passed to us by 'c'
83
tmp.putAll(c);
84         this.config=tmp;
85
86         StringBuffer JavaDoc sb=new StringBuffer JavaDoc();
87         sb.append("\n\n----------------------- TEST -----------------------\n");
88         sb.append("Date: ").append(new Date()).append('\n');
89         sb.append("Run by: ").append(System.getProperty("user.name")).append("\n\n");
90         if(verbose)
91             sb.append("Properties: ").append(printProperties()).append("\n-------------------------\n\n");
92
93         for(Iterator it=this.config.entrySet().iterator(); it.hasNext();) {
94             Map.Entry entry=(Map.Entry)it.next();
95             sb.append(entry.getKey()).append(":\t").append(entry.getValue()).append('\n');
96         }
97         sb.append('\n');
98         System.out.println("Configuration is: " + sb);
99
100         log.info(sb.toString());
101
102         props=this.config.getProperty("props");
103         num_members=Integer.parseInt(this.config.getProperty("num_members"));
104         sender=Boolean.valueOf(this.config.getProperty("sender")).booleanValue();
105         msg_size=Long.parseLong(this.config.getProperty("msg_size"));
106         String JavaDoc tmp2=this.config.getProperty("gnuplot_output", "false");
107         if(Boolean.valueOf(tmp2).booleanValue())
108             this.gnuplot_output=true;
109         tmp2=this.config.getProperty("log_interval");
110         if(tmp2 != null)
111             log_interval=Long.parseLong(tmp2);
112
113         if(gnuplot_output) {
114             sb=new StringBuffer JavaDoc();
115             sb.append("\n##### msgs_received");
116             sb.append(", free_mem [KB] ");
117             sb.append(", total_mem [KB] ");
118             sb.append(", total_msgs_sec [msgs/sec] ");
119             sb.append(", total_throughput [KB/sec] ");
120             sb.append(", rolling_msgs_sec (last ").append(log_interval).append(" msgs) ");
121             sb.append(" [msgs/sec] ");
122             sb.append(", rolling_throughput (last ").append(log_interval).append(" msgs) ");
123             sb.append(" [KB/sec]\n");
124             if(log.isInfoEnabled()) log.info(sb.toString());
125         }
126
127         String JavaDoc transport_name=this.config.getProperty("transport");
128         transport=(Transport)Thread.currentThread().getContextClassLoader().loadClass(transport_name).newInstance();
129         transport.create(this.config);
130         transport.setReceiver(this);
131         transport.start();
132         local_addr=transport.getLocalAddress();
133     }
134
135     private String JavaDoc printProperties() {
136         StringBuffer JavaDoc sb=new StringBuffer JavaDoc();
137         Properties p=System.getProperties();
138         for(Iterator it=p.entrySet().iterator(); it.hasNext();) {
139             Map.Entry entry=(Map.Entry)it.next();
140             sb.append(entry.getKey()).append(": ").append(entry.getValue()).append('\n');
141         }
142         return sb.toString();
143     }
144
145     public void stop() {
146         if(transport != null) {
147             transport.stop();
148             transport.destroy();
149         }
150     }
151
152     public void receive(Object JavaDoc sender, byte[] payload) {
153         Data d;
154
155         try {
156             int type=payload[0];
157             if(type == 1) { // DATA
158
int len=payload.length -1;
159                 handleData(sender, len);
160                 return;
161             }
162
163             byte[] tmp=new byte[payload.length-1];
164             System.arraycopy(payload, 1, tmp, 0, tmp.length);
165             d=(Data)Util.objectFromByteBuffer(tmp);
166
167             switch(d.getType()) {
168                 case Data.DISCOVERY_REQ:
169                     sendDiscoveryResponse();
170                     break;
171                 case Data.DISCOVERY_RSP:
172                     synchronized(this.members) {
173                         if(!this.members.contains(sender)) {
174                             this.members.add(sender);
175                             System.out.println("-- " + sender + " joined");
176                             if(d.sender) {
177                                 synchronized(this.members) {
178                                     if(!this.senders.containsKey(sender)) {
179                                         this.senders.put(sender, new MemberInfo(d.num_msgs));
180                                     }
181                                 }
182                             }
183                             this.members.notify();
184                         }
185                     }
186                     break;
187
188                 case Data.DONE:
189                     if(all_received)
190                         return;
191                     MemberInfo mi=(MemberInfo)this.senders.get(sender);
192                     if(mi != null) {
193                         mi.done=true;
194                         if(mi.stop == 0)
195                             mi.stop=System.currentTimeMillis();
196                         if(allReceived()) {
197                             all_received=true;
198                             if(stop == 0)
199                                 stop=System.currentTimeMillis();
200                             sendResults();
201                             if(!this.sender)
202                                 dumpSenders();
203                             synchronized(this) {
204                                 this.notify();
205                             }
206                         }
207                     }
208                     else {
209                         System.err.println("-- sender " + sender + " not found in senders hashmap");
210                     }
211                     break;
212
213                 case Data.RESULTS:
214                     synchronized(results) {
215                         if(!results.containsKey(sender)) {
216                             results.put(sender, d.results);
217                             results.notify();
218                         }
219                     }
220                     break;
221
222                 default:
223                     System.err.println("received invalid data type: " + payload[0]);
224                     break;
225             }
226         }
227         catch(Exception JavaDoc e) {
228             e.printStackTrace();
229         }
230     }
231
232     private void handleData(Object JavaDoc sender, int num_bytes) {
233         if(all_received)
234             return;
235         if(start == 0) {
236             start=System.currentTimeMillis();
237             last_dump=start;
238         }
239         MemberInfo info=(MemberInfo)this.senders.get(sender);
240         if(info != null) {
241             if(info.start == 0)
242                 info.start=System.currentTimeMillis();
243             info.num_msgs_received++;
244             counter++;
245             info.total_bytes_received+=num_bytes;
246             if(info.num_msgs_received % 1000 == 0)
247                 System.out.println("-- received " + info.num_msgs_received +
248                                    " messages from " + sender);
249
250             if(counter % log_interval == 0) {
251                 if(log.isInfoEnabled()) log.info(dumpStats(counter));
252             }
253
254             if(info.num_msgs_received >= info.num_msgs_expected) {
255                 info.done=true;
256                 if(info.stop == 0)
257                     info.stop=System.currentTimeMillis();
258                 if(allReceived()) {
259                     all_received=true;
260                     if(stop == 0)
261                         stop=System.currentTimeMillis();
262                     try {
263                         sendResults();
264                     }
265                     catch(Exception JavaDoc e) {
266                         e.printStackTrace();
267                     }
268                     if(!this.sender)
269                         dumpSenders();
270                     synchronized(this) {
271                         this.notify();
272                     }
273                 }
274             }
275         }
276         else {
277             System.err.println("-- sender " + sender + " not found in senders hashmap");
278         }
279     }
280
281     void sendResults() throws Exception JavaDoc {
282         Data d=new Data(Data.RESULTS);
283         byte[] buf;
284         d.results=(HashMap)this.senders.clone();
285         buf=generatePayload(d, null);
286         transport.send(null, buf);
287     }
288
289     boolean allReceived() {
290         MemberInfo mi;
291
292         for(Iterator it=this.senders.values().iterator(); it.hasNext();) {
293             mi=(MemberInfo)it.next();
294             if(mi.done == false)
295                 return false;
296         }
297         return true;
298     }
299
300
301     void sendMessages() throws Exception JavaDoc {
302         long total_msgs=0;
303         int msgSize=Integer.parseInt(config.getProperty("msg_size"));
304         int num_msgs=Integer.parseInt(config.getProperty("num_msgs"));
305         int logInterval=Integer.parseInt(config.getProperty("log_interval"));
306         // boolean gnuplot_output=Boolean.getBoolean(config.getProperty("gnuplot_output", "false"));
307
byte[] buf=new byte[msgSize];
308         for(int k=0; k < msgSize; k++)
309             buf[k]='.';
310         Data d=new Data(Data.DATA);
311         byte[] payload=generatePayload(d, buf);
312         for(int i=0; i < num_msgs; i++) {
313             transport.send(null, payload);
314             total_msgs++;
315             if(total_msgs % 1000 == 0) {
316                 System.out.println("++ sent " + total_msgs);
317             }
318             if(total_msgs % logInterval == 0) {
319                 //if(gnuplot_output == false)
320
// if(log.isInfoEnabled()) log.info(dumpStats(total_msgs));
321
}
322         }
323     }
324
325
326     byte[] generatePayload(Data d, byte[] buf) throws Exception JavaDoc {
327         byte[] tmp=buf != null? buf : Util.objectToByteBuffer(d);
328         byte[] payload=new byte[tmp.length +1];
329         payload[0]=intToByte(d.getType());
330         System.arraycopy(tmp, 0, payload, 1, tmp.length);
331         return payload;
332     }
333
334     private byte intToByte(int type) {
335         switch(type) {
336             case Data.DATA: return 1;
337             case Data.DISCOVERY_REQ: return 2;
338             case Data.DISCOVERY_RSP: return 3;
339             case Data.DONE: return 4;
340             case Data.RESULTS: return 5;
341             default: return 0;
342         }
343     }
344
345
346     void fetchResults() throws Exception JavaDoc {
347         System.out.println("-- sent all messages. Asking receivers if they received all messages\n");
348
349         int expected_responses=this.members.size();
350
351         // now send DONE message (periodically re-send to make up for message loss over unreliable transport)
352
// when all results have been received, dump stats and exit
353
Data d2=new Data(Data.DONE);
354         byte[] tmp=generatePayload(d2, null);
355         System.out.println("-- fetching results (from " + expected_responses + " members)");
356         synchronized(this.results) {
357             while((results.size()) < expected_responses) {
358                 transport.send(null, tmp);
359                 this.results.wait(1000);
360             }
361         }
362         System.out.println("-- received all responses");
363     }
364
365
366     void dumpResults() {
367         Object JavaDoc member;
368         Map.Entry entry;
369         HashMap map;
370         StringBuffer JavaDoc sb=new StringBuffer JavaDoc();
371         sb.append("\n-- results:\n\n");
372
373         for(Iterator it=results.entrySet().iterator(); it.hasNext();) {
374             entry=(Map.Entry)it.next();
375             member=entry.getKey();
376             map=(HashMap)entry.getValue();
377             sb.append("-- results from ").append(member).append(":\n");
378             dump(map, sb);
379             sb.append('\n');
380         }
381         System.out.println(sb.toString());
382         if(log.isInfoEnabled()) log.info(sb.toString());
383     }
384
385
386     void dumpSenders() {
387         StringBuffer JavaDoc sb=new StringBuffer JavaDoc();
388         dump(this.senders, sb);
389         System.out.println(sb.toString());
390     }
391
392     void dump(HashMap map, StringBuffer JavaDoc sb) {
393         Map.Entry entry;
394         Object JavaDoc mySender;
395         MemberInfo mi;
396         MemberInfo combined=new MemberInfo(0);
397         combined.start = Long.MAX_VALUE;
398         combined.stop = Long.MIN_VALUE;
399
400         for(Iterator it2=map.entrySet().iterator(); it2.hasNext();) {
401             entry=(Map.Entry)it2.next();
402             mySender=entry.getKey();
403             mi=(MemberInfo)entry.getValue();
404             combined.start=Math.min(combined.start, mi.start);
405             combined.stop=Math.max(combined.stop, mi.stop);
406             combined.num_msgs_expected+=mi.num_msgs_expected;
407             combined.num_msgs_received+=mi.num_msgs_received;
408             combined.total_bytes_received+=mi.total_bytes_received;
409             sb.append("sender: ").append(mySender).append(": ").append(mi).append('\n');
410         }
411         sb.append("\ncombined: ").append(combined).append('\n');
412     }
413
414
415     String JavaDoc dumpStats(long received_msgs) {
416         StringBuffer JavaDoc sb=new StringBuffer JavaDoc();
417         if(gnuplot_output)
418             sb.append(received_msgs).append(' ');
419         else
420             sb.append("\nmsgs_received=").append(received_msgs);
421
422         if(gnuplot_output)
423             sb.append(Runtime.getRuntime().freeMemory() / 1000.0).append(' ');
424         else
425             sb.append(", free_mem=").append(Runtime.getRuntime().freeMemory() / 1000.0);
426
427         if(gnuplot_output)
428             sb.append(Runtime.getRuntime().totalMemory() / 1000.0).append(' ');
429         else
430             sb.append(", total_mem=").append(Runtime.getRuntime().totalMemory() / 1000.0).append('\n');
431
432         dumpThroughput(sb, received_msgs);
433         return sb.toString();
434     }
435
436     void dumpThroughput(StringBuffer JavaDoc sb, long received_msgs) {
437         double tmp;
438         long current=System.currentTimeMillis();
439
440         if(current - start == 0 || current - last_dump == 0)
441             return;
442
443         tmp=(1000 * received_msgs) / (current - start);
444         if(gnuplot_output)
445             sb.append(tmp).append(' ');
446         else
447             sb.append("total_msgs_sec=").append(tmp).append(" [msgs/sec]");
448
449         tmp=(received_msgs * msg_size) / (current - start);
450         if(gnuplot_output)
451             sb.append(tmp).append(' ');
452         else
453             sb.append("\ntotal_throughput=").append(tmp).append(" [KB/sec]");
454
455         tmp=(1000 * log_interval) / (current - last_dump);
456         if(gnuplot_output)
457             sb.append(tmp).append(' ');
458         else {
459             sb.append("\nrolling_msgs_sec (last ").append(log_interval).append(" msgs)=");
460             sb.append(tmp).append(" [msgs/sec]");
461         }
462
463         tmp=(log_interval * msg_size) / (current - last_dump);
464         if(gnuplot_output)
465             sb.append(tmp).append(' ');
466         else {
467             sb.append("\nrolling_throughput (last ").append(log_interval).append(" msgs)=");
468             sb.append(tmp).append(" [KB/sec]\n");
469         }
470         last_dump=current;
471     }
472
473
474     void runDiscoveryPhase() throws Exception JavaDoc {
475         Data d=new Data(Data.DISCOVERY_REQ);
476         transport.send(null, generatePayload(d, null));
477         sendDiscoveryResponse();
478
479         synchronized(this.members) {
480             System.out.println("-- waiting for " + num_members + " members to join");
481             while(this.members.size() < num_members) {
482                 this.members.wait();
483                 System.out.println("-- members: " + this.members.size());
484             }
485         }
486     }
487
488     void sendDiscoveryResponse() throws Exception JavaDoc {
489         Data d2=new Data(Data.DISCOVERY_RSP);
490         if(sender) {
491             d2.sender=true;
492             d2.num_msgs=Long.parseLong(config.getProperty("num_msgs"));
493         }
494         transport.send(null, generatePayload(d2, null));
495     }
496
497
498     public static void main(String JavaDoc[] args) {
499         Properties config=new Properties();
500         boolean sender=false, verbose=false;
501         Test t=null;
502
503         for(int i=0; i < args.length; i++) {
504             if("-sender".equals(args[i])) {
505                 config.put("sender", "true");
506                 sender=true;
507                 continue;
508             }
509             if("-receiver".equals(args[i])) {
510                 config.put("sender", "false");
511                 sender=false;
512                 continue;
513             }
514             if("-config".equals(args[i])) {
515                 String JavaDoc config_file=args[++i];
516                 config.put("config", config_file);
517                 continue;
518             }
519             if("-props".equals(args[i])) {
520                 String JavaDoc props=args[++i];
521                 config.put("props", props);
522                 continue;
523             }
524             if("-verbose".equals(args[i])) {
525                 verbose=true;
526                 continue;
527             }
528             help();
529             return;
530         }
531
532         try {
533             t=new Test();
534             t.start(config, verbose);
535             t.runDiscoveryPhase();
536             if(sender) {
537                 t.sendMessages();
538                 t.fetchResults();
539                 t.dumpResults();
540             }
541             else {
542                 synchronized(t) {
543                     t.wait();
544                 }
545                 Util.sleep(2000);
546             }
547         }
548         catch(Exception JavaDoc e) {
549             e.printStackTrace();
550         }
551         finally {
552             if(t != null)
553                 t.stop();
554         }
555     }
556
557
558     static void help() {
559         System.out.println("Test [-help] ([-sender] | [-receiver]) [-config <config file>] [-props <stack config>] [-verbose]");
560     }
561
562
563 }
564
Popular Tags