KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > tests > SpeedTest


1 // $Id: SpeedTest.java,v 1.16 2005/04/20 06:31:18 belaban Exp $
2

3
4 package org.jgroups.tests;
5
6
7 import org.jgroups.Channel;
8 import org.jgroups.JChannel;
9 import org.jgroups.Message;
10 import org.jgroups.conf.ConfiguratorFactory;
11 import org.jgroups.conf.ProtocolStackConfigurator;
12 import org.jgroups.debug.Debugger;
13 import org.jgroups.util.ExposedByteArrayOutputStream;
14 import org.jgroups.util.Util;
15
16 import java.io.ByteArrayInputStream JavaDoc;
17 import java.io.DataInputStream JavaDoc;
18 import java.io.DataOutputStream JavaDoc;
19 import java.io.IOException JavaDoc;
20 import java.net.DatagramPacket JavaDoc;
21 import java.net.DatagramSocket JavaDoc;
22 import java.net.InetAddress JavaDoc;
23 import java.net.MulticastSocket JavaDoc;
24
25
26 /**
27  * Test time taken for multicasting n local messages (messages sent to self). Uses simple MulticastSocket.
28  * Note that packets might get dropped if Util.sleep(1) is commented out (on certain systems this has
29  * to be increased even further). If running with -jg option and Util.sleep() is commented out, there will
30  * probably be packet loss, which will be repaired (by means of retransmission) by JGroups.
31  * @author Bela Ban
32  * @version $Id: SpeedTest.java,v 1.16 2005/04/20 06:31:18 belaban Exp $
33  */

34 public class SpeedTest {
35     static long start, stop;
36     private static final String JavaDoc LOOPBACK="LOOPBACK(down_thread=false;up_thread=false)";
37
38
39     public static void main(String JavaDoc[] args) {
40         DatagramSocket JavaDoc sock=null;
41         Receiver receiver;
42         int num_msgs=1000, num_sent=0, group_port=7500, num_yields=0;
43         DatagramPacket JavaDoc packet;
44         InetAddress JavaDoc group_addr=null;
45         int[][] matrix;
46         boolean jg=false; // use JGroups channel instead of UDP MulticastSocket
47
JChannel channel=null;
48         String JavaDoc group_name="SpeedTest-Group";
49         Message send_msg;
50         boolean debug=false, cummulative=false, busy_sleep=false, yield=false, loopback=false;
51         Debugger debugger=null;
52         long sleep_time=1; // sleep in msecs between msg sends
53
ExposedByteArrayOutputStream output=new ExposedByteArrayOutputStream(64);
54         String JavaDoc props;
55
56
57         props="UDP(mcast_addr=224.0.0.36;mcast_port=55566;ip_ttl=32;" +
58                 "ucast_send_buf_size=32000;ucast_recv_buf_size=64000;" +
59                 "mcast_send_buf_size=32000;mcast_recv_buf_size=64000):" +
60                 "PING(timeout=2000;num_initial_members=3):" +
61                 "MERGE2(min_interval=5000;max_interval=10000):" +
62                 "FD_SOCK:" +
63                 "VERIFY_SUSPECT(timeout=1500):" +
64                 "pbcast.NAKACK(max_xmit_size=8192;gc_lag=50;retransmit_timeout=600,800,1200,2400,4800):" +
65                 "UNICAST(timeout=1200):" +
66                 "pbcast.STABLE(desired_avg_gossip=10000):" +
67                 "FRAG(frag_size=8192;down_thread=false;up_thread=false):" +
68                 "pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;" +
69                 "shun=false;print_local_addr=true):" +
70                 "pbcast.STATE_TRANSFER";
71                 // "PERF(details=true)";
72

73
74
75         for(int i=0; i < args.length; i++) {
76             if("-help".equals(args[i])) {
77                 help();
78                 return;
79             }
80             if("-jg".equals(args[i])) {
81                 jg=true;
82                 continue;
83             }
84             if("-loopback".equals(args[i])) {
85                 loopback=true;
86                 continue;
87             }
88             if("-props".equals(args[i])) {
89                 props=args[++i];
90                 continue;
91             }
92             if("-debug".equals(args[i])) {
93                 debug=true;
94                 continue;
95             }
96             if("-cummulative".equals(args[i])) {
97                 cummulative=true;
98                 continue;
99             }
100             if("-busy_sleep".equals(args[i])) {
101                 busy_sleep=true;
102                 continue;
103             }
104             if("-yield".equals(args[i])) {
105                 yield=true;
106                 num_yields++;
107                 continue;
108             }
109             if("-sleep".equals(args[i])) {
110                 sleep_time=Long.parseLong(args[++i]);
111                 continue;
112             }
113             if("-num_msgs".equals(args[i])) {
114                 num_msgs=Integer.parseInt(args[++i]);
115                 continue;
116             }
117             help();
118             return;
119         }
120
121         System.out.println("jg = " + jg +
122                 "\nloopback = " + loopback +
123                 "\ndebug = " + debug +
124                 "\nsleep = " + sleep_time +
125                 "\nbusy_sleep=" + busy_sleep +
126                 "\nyield=" + yield +
127                 "\nnum_yields=" + num_yields +
128                 "\nnum_msgs = " + num_msgs +
129                            '\n');
130
131
132
133         try {
134             matrix=new int[num_msgs][2];
135             for(int i=0; i < num_msgs; i++) {
136                 for(int j=0; j < matrix[i].length; j++)
137                     matrix[i][j]=0;
138             }
139
140             if(jg) {
141                 if(loopback) {
142                     ProtocolStackConfigurator conf=ConfiguratorFactory.getStackConfigurator(props);
143                     String JavaDoc tmp=conf.getProtocolStackString();
144                     int index=tmp.indexOf(':');
145                     props=LOOPBACK + tmp.substring(index);
146                 }
147                 channel=new JChannel(props);
148                 // System.out.println("props:\n" + channel.getProperties());
149
channel.connect(group_name);
150                 if(debug) {
151                     debugger=new Debugger(channel, cummulative);
152                     debugger.start();
153                 }
154             }
155             else {
156                 group_addr=InetAddress.getByName("224.0.0.36");
157                 sock=new DatagramSocket JavaDoc();
158             }
159
160             if(debug) {
161                 System.out.println("Press key to start");
162                 System.in.read();
163             }
164             receiver=new Receiver(group_addr, group_port, channel, matrix, jg);
165             receiver.start();
166
167             byte[] buf;
168             DataOutputStream JavaDoc out;
169
170             start=System.currentTimeMillis();
171             for(int i=0; i < num_msgs; i++) {
172                 // buf=Util.objectToByteBuffer(new Integer(i));
173
output.reset();
174                 out=new DataOutputStream JavaDoc(output);
175                 out.writeInt(i);
176                 out.flush();
177                 buf=output.getRawBuffer();
178                 out.close();
179
180                 if(jg) {
181                     send_msg=new Message(null, null, buf, 0, buf.length);
182                     channel.send(send_msg);
183                 }
184                 else {
185                     packet=new DatagramPacket JavaDoc(buf, buf.length, group_addr, group_port);
186                     sock.send(packet);
187                 }
188                 num_sent++;
189                 if(num_sent % 1000 == 0)
190                     System.out.println("-- sent " + num_sent);
191
192                 matrix[i][0]=1;
193                 if(yield) {
194                     for(int k=0; k < num_yields; k++) {
195                         Thread.yield();
196                     }
197                 }
198                 else {
199                     if(sleep_time > 0) {
200                         sleep(sleep_time, busy_sleep);
201                     }
202                 }
203             }
204             while(true) {
205                 System.in.read();
206                 printMatrix(matrix);
207             }
208         }
209         catch(Exception JavaDoc ex) {
210             ex.printStackTrace();
211             System.exit(-1);
212         }
213     }
214
215
216
217     /**
218      * On most UNIX systems, the minimum sleep time is 10-20ms. Even if we specify sleep(1), the thread will
219      * sleep for at least 10-20ms. On Windows, sleep() seems to be implemented as a busy sleep, that is the
220      * thread never relinquishes control and therefore the sleep(x) is exactly x ms long.
221      */

222     static void sleep(long msecs, boolean busy_sleep) {
223         if(!busy_sleep) {
224             Util.sleep(msecs);
225             return;
226         }
227
228         long start=System.currentTimeMillis();
229         long stop=start + msecs;
230
231         while(stop > start) {
232             start=System.currentTimeMillis();
233         }
234     }
235
236     static void printMatrix(int[][] m) {
237         int tmp=0;
238         System.out.print("not sent: ");
239         for(int i=0; i < m.length; i++) {
240             if(m[i][0] == 0) {
241                 System.out.print(i + " ");
242                 tmp++;
243             }
244         }
245         System.out.println("\ntotal not sent: " + tmp);
246
247         tmp=0;
248         System.out.print("not received: ");
249         for(int i=0; i < m.length; i++) {
250             if(m[i][1] == 0) {
251                 System.out.print(i + " ");
252                 tmp++;
253             }
254         }
255         System.out.println("\ntotal not received: " + tmp);
256         System.out.println("Press CTRL-C to kill this test");
257     }
258
259
260     static void help() {
261         System.out.println("SpeedTest [-help] [-num_msgs <num>] [-sleep <sleeptime in msecs between messages>] " +
262                 "[-busy_sleep] [-yield] [-jg] [-loopback] [-props <channel properties>] [-debug] [-cummulative]");
263         System.out.println("Options -props -debug and -cummulative are only valid if -jg is used");
264     }
265
266
267     static class Receiver implements Runnable JavaDoc {
268         Thread JavaDoc t=null;
269         byte[] buf=new byte[1024];
270         MulticastSocket JavaDoc sock;
271         Channel channel;
272         int num_msgs=1000;
273         int[][] matrix=null;
274         boolean jg=false;
275
276         Receiver(InetAddress JavaDoc group_addr, int group_port, Channel channel, int[][] matrix, boolean jg) throws IOException JavaDoc {
277             this.channel=channel;
278             this.matrix=matrix;
279             this.jg=jg;
280             num_msgs=matrix.length;
281             if(group_addr != null) {
282                 sock=new MulticastSocket JavaDoc(group_port);
283                 sock.joinGroup(group_addr);
284             }
285         }
286
287         public void start() {
288             if(t == null) {
289                 t=new Thread JavaDoc(this, "receiver thread");
290                 t.start();
291             }
292         }
293
294         public void run() {
295             int num_received=0;
296             int number;
297             DatagramPacket JavaDoc packet;
298             Object JavaDoc obj;
299             long total_time;
300             double msgs_per_sec=0;
301             DataInputStream JavaDoc in;
302
303             packet=new DatagramPacket JavaDoc(buf, buf.length);
304             while(num_received <= num_msgs) {
305                 try {
306                     if(jg) {
307                         obj=channel.receive(0);
308                         if(obj instanceof Message) {
309                             // msg=(Message)obj;
310
}
311                         else {
312                             System.out.println("received non-msg: " + obj.getClass());
313                             continue;
314                         }
315                     }
316                     else {
317                         sock.receive(packet);
318                     }
319
320                     // number=((Integer)Util.objectFromByteBuffer(msg_data)).intValue();
321
in=new DataInputStream JavaDoc(new ByteArrayInputStream JavaDoc(buf));
322                     number=in.readInt();
323                     matrix[number][1]=1;
324                     // System.out.println("-- received " + number);
325
num_received++;
326                     if(num_received % 1000 == 0)
327                         System.out.println("received " + num_received + " packets");
328                     if(num_received >= num_msgs)
329                         break;
330                 }
331                 catch(Exception JavaDoc ex) {
332                     System.err.println(ex);
333                 }
334             }
335             stop=System.currentTimeMillis();
336             total_time=stop - start;
337             msgs_per_sec=(num_received / (total_time / 1000.0));
338             System.out.println("\n** Sending and receiving " + num_received + " took " +
339                     total_time + " msecs (" + msgs_per_sec + " msgs/sec) **");
340             System.exit(1);
341         }
342     }
343
344 }
345
Popular Tags