KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > tests > SpeedTest1_4


1 package org.jgroups.tests;
2
3 // $Id: SpeedTest1_4.java,v 1.8 2005/04/18 16:20:25 belaban Exp $
4

5
6 import org.jgroups.Channel;
7 import org.jgroups.JChannel;
8 import org.jgroups.Message;
9 import org.jgroups.debug.Debugger;
10 import org.jgroups.util.Util;
11
12 import java.net.DatagramPacket JavaDoc;
13 import java.net.InetAddress JavaDoc;
14 import java.net.MulticastSocket JavaDoc;
15 import java.nio.ByteBuffer JavaDoc;
16
17
18 /**
19  * Same test as SpeedTest, but using NIO ByteBuffer rather than serialization. For 10000 messages, this took
20  * 25% of SpeedTest !
21  * Test time taken for multicasting n local messages (messages sent to self). Uses simple MulticastSocket.
22  * Note that packets might get dropped if Util.sleep(1) is commented out (on certain systems this has
23  * to be increased even further). If running with -jg option and Util.sleep() is commented out, there will
24  * probably be packet loss, which will be repaired (by means of retransmission) by JGroups. To see the
25  * retransmit messages, enable tracing (trace=true) in jgroups.properties and add the following lines:
26  * <pre>
27  * trace0=NAKACK.retransmit DEBUG STDOUT
28  * trace1=UNICAST.retransmit DEBUG STDOUT
29  * </pre>
30  *
31  * @author Bela Ban
32  */

33 public class SpeedTest1_4 {
34     static long start, stop;
35
36
37     public static void main(String JavaDoc[] args) {
38         MulticastSocket JavaDoc sock=null;
39         Receiver receiver=null;
40         int num_msgs=1000;
41         byte[] buf;
42         DatagramPacket JavaDoc packet;
43         InetAddress JavaDoc group_addr=null;
44         int[][] matrix;
45         boolean jg=false; // use JGroups channel instead of UDP MulticastSocket
46
JChannel channel=null;
47         String JavaDoc props=null, loopback_props;
48         String JavaDoc group_name="SpeedTest-Group";
49         Message send_msg;
50         boolean debug=false, cummulative=false;
51         Debugger debugger=null;
52         long sleep_time=1; // sleep in msecs between msg sends
53
boolean busy_sleep=false;
54         boolean yield=false;
55         int num_yields=0;
56         boolean loopback=false;
57
58
59         props="UDP(mcast_addr=224.0.0.36;mcast_port=55566;ip_ttl=32;" +
60                 "ucast_send_buf_size=32000;ucast_recv_buf_size=64000;" +
61                 "mcast_send_buf_size=32000;mcast_recv_buf_size=64000):" +
62                 "PING(timeout=2000;num_initial_members=3):" +
63                 "MERGE2(min_interval=5000;max_interval=10000):" +
64                 "FD_SOCK:" +
65                 "VERIFY_SUSPECT(timeout=1500):" +
66                 "pbcast.NAKACK(max_xmit_size=8192;gc_lag=50;retransmit_timeout=600,800,1200,2400,4800):" +
67                 "UNICAST(timeout=1200):" +
68                 "pbcast.STABLE(desired_avg_gossip=10000):" +
69                 "FRAG(frag_size=8192;down_thread=false;up_thread=false):" +
70 // "PIGGYBACK(max_size=16000;max_wait_time=500):" +
71
"pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;" +
72                 "shun=false;print_local_addr=true):" +
73                 "pbcast.STATE_TRANSFER";
74         // "PERF(details=true)";
75

76
77         loopback_props="LOOPBACK:" +
78                 "PING(timeout=2000;num_initial_members=3):" +
79                 "MERGE2(min_interval=5000;max_interval=10000):" +
80                 "FD_SOCK:" +
81                 "VERIFY_SUSPECT(timeout=1500):" +
82                 "pbcast.NAKACK(gc_lag=50;retransmit_timeout=600,800,1200,2400,4800):" +
83                 "UNICAST(timeout=5000):" +
84                 "pbcast.STABLE(desired_avg_gossip=20000):" +
85                 "FRAG(frag_size=16000;down_thread=false;up_thread=false):" +
86                 "pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;" +
87                 "shun=false;print_local_addr=true):" +
88                 "pbcast.STATE_TRANSFER";
89
90
91         for(int i=0; i < args.length; i++) {
92             if("-help".equals(args[i])) {
93                 help();
94                 return;
95             }
96             if("-jg".equals(args[i])) {
97                 jg=true;
98                 continue;
99             }
100             if("-loopback".equals(args[i])) {
101                 loopback=true;
102                 props=loopback_props;
103                 continue;
104             }
105             if("-props".equals(args[i])) {
106                 props=args[++i];
107                 continue;
108             }
109             if("-debug".equals(args[i])) {
110                 debug=true;
111                 continue;
112             }
113             if("-cummulative".equals(args[i])) {
114                 cummulative=true;
115                 continue;
116             }
117             if("-busy_sleep".equals(args[i])) {
118                 busy_sleep=true;
119                 continue;
120             }
121             if("-yield".equals(args[i])) {
122                 yield=true;
123                 num_yields++;
124                 continue;
125             }
126             if("-sleep".equals(args[i])) {
127                 sleep_time=Long.parseLong(args[++i]);
128                 continue;
129             }
130             if("-num_msgs".equals(args[i])) {
131                 num_msgs=Integer.parseInt(args[++i]);
132                 continue;
133             }
134             help();
135             return;
136         }
137
138         System.out.println("jg = " + jg +
139                 "\nloopback = " + loopback +
140                 "\ndebug = " + debug +
141                 "\nsleep = " + sleep_time +
142                 "\nbusy_sleep=" + busy_sleep +
143                 "\nyield=" + yield +
144                 "\nnum_yields=" + num_yields +
145                 "\nnum_msgs = " + num_msgs +
146                            '\n');
147
148
149
150         try {
151             matrix=new int[num_msgs][2];
152             for(int i=0; i < num_msgs; i++) {
153                 for(int j=0; j < matrix[i].length; j++)
154                     matrix[i][j]=0;
155             }
156
157             if(jg) {
158                 channel=new JChannel(props);
159                 channel.connect(group_name);
160                 if(debug) {
161                     debugger=new Debugger(channel, cummulative);
162                     debugger.start();
163                 }
164             }
165             else {
166                 group_addr=InetAddress.getByName("224.0.0.36");
167                 sock=new MulticastSocket JavaDoc(7777);
168                 sock.joinGroup(group_addr);
169             }
170
171             if(debug) {
172                 System.out.println("Press key to start");
173                 System.in.read();
174             }
175             receiver=new Receiver(sock, channel, matrix, jg);
176             receiver.start();
177
178             ByteBuffer JavaDoc bb=ByteBuffer.allocate(16);
179             bb.mark();
180
181             start=System.currentTimeMillis();
182             for(int i=0; i < num_msgs; i++) {
183                 bb.reset();
184                 bb.putInt(i);
185                 buf=(byte[])(bb.array()).clone();
186
187                 if(jg) {
188                     send_msg=new Message(null, null, buf);
189                     channel.send(send_msg);
190                 }
191                 else {
192                     packet=new DatagramPacket JavaDoc(buf, buf.length, group_addr, 7777);
193                     sock.send(packet);
194                 }
195                 if(i % 1000 == 0)
196                     System.out.println("-- sent " + i);
197
198                 matrix[i][0]=1;
199                 if(yield) {
200                     for(int k=0; k < num_yields; k++) {
201                         Thread.yield();
202                     }
203                 }
204                 else {
205                     if(sleep_time > 0) {
206                         sleep(sleep_time, busy_sleep);
207                     }
208                 }
209             }
210             while(true) {
211                 System.in.read();
212                 printMatrix(matrix);
213             }
214         }
215         catch(Exception JavaDoc ex) {
216             System.err.println(ex);
217         }
218     }
219
220
221     /**
222      * On most UNIX systems, the minimum sleep time is 10-20ms. Even if we specify sleep(1), the thread will
223      * sleep for at least 10-20ms. On Windows, sleep() seems to be implemented as a busy sleep, that is the
224      * thread never relinquishes control and therefore the sleep(x) is exactly x ms long.
225      */

226     static void sleep(long msecs, boolean busy_sleep) {
227         if(!busy_sleep) {
228             Util.sleep(msecs);
229             return;
230         }
231
232         long start=System.currentTimeMillis();
233         long stop=start + msecs;
234
235         while(stop > start) {
236             start=System.currentTimeMillis();
237         }
238     }
239
240     static void printMatrix(int[][] m) {
241         int tmp=0;
242         System.out.print("not sent: ");
243         for(int i=0; i < m.length; i++) {
244             if(m[i][0] == 0) {
245                 System.out.print(i + " ");
246                 tmp++;
247             }
248         }
249         System.out.println("\ntotal not sent: " + tmp);
250
251         tmp=0;
252         System.out.print("not received: ");
253         for(int i=0; i < m.length; i++) {
254             if(m[i][1] == 0) {
255                 System.out.print(i + " ");
256                 tmp++;
257             }
258         }
259         System.out.println("\ntotal not received: " + tmp);
260         System.out.println("Press CTRL-C to kill this test");
261     }
262
263
264     static void help() {
265         System.out.println("SpeedTest [-help] [-num_msgs <num>] [-sleep <sleeptime in msecs between messages>] " +
266                 "[-busy_sleep] [-yield] [-jg] [-loopback] [-props <channel properties>] [-debug] [-cummulative]");
267         System.out.println("Options -props -debug and -cummulative are only valid if -jg is used");
268     }
269
270
271     static class Receiver implements Runnable JavaDoc {
272         Thread JavaDoc t=null;
273         byte[] buf=new byte[1024];
274         MulticastSocket JavaDoc sock;
275         Channel channel;
276         int num_msgs=1000;
277         int[][] matrix=null;
278         boolean jg=false;
279
280
281         Receiver(MulticastSocket JavaDoc sock, Channel channel, int[][] matrix, boolean jg) {
282             this.sock=sock;
283             this.channel=channel;
284             this.matrix=matrix;
285             this.jg=jg;
286             num_msgs=matrix.length;
287         }
288
289         public void start() {
290             if(t == null) {
291                 t=new Thread JavaDoc(this, "receiver thread");
292                 t.start();
293             }
294         }
295
296         public void run() {
297             int num_received=0;
298             int number;
299             DatagramPacket JavaDoc packet;
300             Object JavaDoc obj;
301             Message msg;
302             byte[] msg_data=null;
303             long total_time;
304             double msgs_per_sec=0;
305             ByteBuffer JavaDoc rb=ByteBuffer.allocate(16);
306             rb.mark();
307
308             packet=new DatagramPacket JavaDoc(buf, buf.length);
309             while(num_received <= num_msgs) {
310                 try {
311                     if(jg) {
312                         obj=channel.receive(0);
313                         if(obj instanceof Message) {
314                             msg=(Message)obj;
315                             msg_data=msg.getBuffer();
316                         }
317                         else {
318                             System.out.println("received non-msg: " + obj.getClass());
319                             continue;
320                         }
321                     }
322                     else {
323                         sock.receive(packet);
324                         msg_data=packet.getData();
325                     }
326
327                     rb.rewind();
328                     rb.put(msg_data);
329                     rb.rewind();
330                     number=rb.getInt();
331
332                     matrix[number][1]=1;
333                     // System.out.println("#set " + number);
334
num_received++;
335                     if(num_received % 1000 == 0)
336                         System.out.println("received " + num_received + " packets");
337                     if(num_received >= num_msgs)
338                         break;
339                 }
340                 catch(Exception JavaDoc ex) {
341                     System.err.println("receiver: " + ex);
342                 }
343             }
344             stop=System.currentTimeMillis();
345             total_time=stop - start;
346             msgs_per_sec=(num_received / (total_time / 1000.0));
347             System.out.println("\n** Sending and receiving " + num_received + " took " +
348                     total_time + " msecs (" + msgs_per_sec + " msgs/sec) **");
349             System.exit(1);
350         }
351     }
352
353 }
354
Popular Tags