KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > tests > SpeedTest_NIO


1 package org.jgroups.tests;
2
3 // $Id: SpeedTest_NIO.java,v 1.3 2006/12/28 09:05:49 belaban Exp $
4

5
6 import org.jgroups.Channel;
7 import org.jgroups.JChannel;
8 import org.jgroups.Message;
9 import org.jgroups.util.Util;
10
11 import java.net.DatagramPacket JavaDoc;
12 import java.net.InetAddress JavaDoc;
13 import java.net.MulticastSocket JavaDoc;
14 import java.nio.ByteBuffer JavaDoc;
15
16
17 /**
18  * Same test as SpeedTest, but using NIO ByteBuffer rather than serialization. For 10000 messages, this took
19  * 25% of SpeedTest !
20  * Test time taken for multicasting n local messages (messages sent to self). Uses simple MulticastSocket.
21  * Note that packets might get dropped if Util.sleep(1) is commented out (on certain systems this has
22  * to be increased even further). If running with -jg option and Util.sleep() is commented out, there will
23  * probably be packet loss, which will be repaired (by means of retransmission) by JGroups. To see the
24  * retransmit messages, enable tracing (trace=true) in jgroups.properties and add the following lines:
25  * <pre>
26  * trace0=NAKACK.retransmit DEBUG STDOUT
27  * trace1=UNICAST.retransmit DEBUG STDOUT
28  * </pre>
29  *
30  * @author Bela Ban
31  */

32 public class SpeedTest_NIO {
33     static long start=0, stop=0;
34
35
36     public static void main(String JavaDoc[] args) {
37         MulticastSocket JavaDoc sock=null;
38         Receiver receiver=null;
39         int num_msgs=1000;
40         byte[] buf;
41         DatagramPacket JavaDoc packet;
42         InetAddress JavaDoc group_addr=null;
43         int[][] matrix;
44         boolean jg=false; // use JGroups channel instead of UDP MulticastSocket
45
JChannel channel=null;
46         String JavaDoc props=null, loopback_props;
47         String JavaDoc group_name="SpeedTest-Group";
48         Message send_msg;
49         long sleep_time=1; // sleep in msecs between msg sends
50
boolean busy_sleep=false;
51         boolean yield=false;
52         int num_yields=0;
53         boolean loopback=false;
54
55
56         props="UDP(mcast_addr=224.0.0.36;mcast_port=55566;ip_ttl=32;" +
57                 "ucast_send_buf_size=32000;ucast_recv_buf_size=64000;" +
58                 "mcast_send_buf_size=32000;mcast_recv_buf_size=64000):" +
59                 "PING(timeout=2000;num_initial_members=3):" +
60                 "MERGE2(min_interval=5000;max_interval=10000):" +
61                 "FD_SOCK:" +
62                 "VERIFY_SUSPECT(timeout=1500):" +
63                 "pbcast.NAKACK(max_xmit_size=8192;gc_lag=50;retransmit_timeout=600,800,1200,2400,4800):" +
64                 "UNICAST(timeout=1200):" +
65                 "pbcast.STABLE(desired_avg_gossip=10000):" +
66                 "FRAG(frag_size=8192;down_thread=false;up_thread=false):" +
67 // "PIGGYBACK(max_size=16000;max_wait_time=500):" +
68
"pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;" +
69                 "shun=false;print_local_addr=true):" +
70                 "pbcast.STATE_TRANSFER";
71         // "PERF(details=true)";
72

73
74         loopback_props="LOOPBACK:" +
75                 "PING(timeout=2000;num_initial_members=3):" +
76                 "MERGE2(min_interval=5000;max_interval=10000):" +
77                 "FD_SOCK:" +
78                 "VERIFY_SUSPECT(timeout=1500):" +
79                 "pbcast.NAKACK(gc_lag=50;retransmit_timeout=600,800,1200,2400,4800):" +
80                 "UNICAST(timeout=5000):" +
81                 "pbcast.STABLE(desired_avg_gossip=20000):" +
82                 "FRAG(frag_size=16000;down_thread=false;up_thread=false):" +
83                 "pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;" +
84                 "shun=false;print_local_addr=true):" +
85                 "pbcast.STATE_TRANSFER";
86
87
88         for(int i=0; i < args.length; i++) {
89             if("-help".equals(args[i])) {
90                 help();
91                 return;
92             }
93             if("-jg".equals(args[i])) {
94                 jg=true;
95                 continue;
96             }
97             if("-loopback".equals(args[i])) {
98                 loopback=true;
99                 props=loopback_props;
100                 continue;
101             }
102             if("-props".equals(args[i])) {
103                 props=args[++i];
104                 continue;
105             }
106             if("-busy_sleep".equals(args[i])) {
107                 busy_sleep=true;
108                 continue;
109             }
110             if("-yield".equals(args[i])) {
111                 yield=true;
112                 num_yields++;
113                 continue;
114             }
115             if("-sleep".equals(args[i])) {
116                 sleep_time=Long.parseLong(args[++i]);
117                 continue;
118             }
119             if("-num_msgs".equals(args[i])) {
120                 num_msgs=Integer.parseInt(args[++i]);
121                 continue;
122             }
123             help();
124             return;
125         }
126
127         System.out.println("jg = " + jg +
128                 "\nloopback = " + loopback +
129                 "\nsleep = " + sleep_time +
130                 "\nbusy_sleep=" + busy_sleep +
131                 "\nyield=" + yield +
132                 "\nnum_yields=" + num_yields +
133                 "\nnum_msgs = " + num_msgs +
134                            '\n');
135
136
137
138         try {
139             matrix=new int[num_msgs][2];
140             for(int i=0; i < num_msgs; i++) {
141                 for(int j=0; j < matrix[i].length; j++)
142                     matrix[i][j]=0;
143             }
144
145             if(jg) {
146                 channel=new JChannel(props);
147                 channel.connect(group_name);
148             }
149             else {
150                 group_addr=InetAddress.getByName("224.0.0.36");
151                 sock=new MulticastSocket JavaDoc(7777);
152                 sock.joinGroup(group_addr);
153             }
154
155             receiver=new Receiver(sock, channel, matrix, jg);
156             receiver.start();
157
158             ByteBuffer JavaDoc bb=ByteBuffer.allocate(16);
159             bb.mark();
160
161             start=System.currentTimeMillis();
162             for(int i=0; i < num_msgs; i++) {
163                 bb.reset();
164                 bb.putInt(i);
165                 buf=(byte[])(bb.array()).clone();
166
167                 if(jg) {
168                     send_msg=new Message(null, null, buf);
169                     channel.send(send_msg);
170                 }
171                 else {
172                     packet=new DatagramPacket JavaDoc(buf, buf.length, group_addr, 7777);
173                     sock.send(packet);
174                 }
175                 if(i % 1000 == 0)
176                     System.out.println("-- sent " + i);
177
178                 matrix[i][0]=1;
179                 if(yield) {
180                     for(int k=0; k < num_yields; k++) {
181                         Thread.yield();
182                     }
183                 }
184                 else {
185                     if(sleep_time > 0) {
186                         sleep(sleep_time, busy_sleep);
187                     }
188                 }
189             }
190             while(true) {
191                 System.in.read();
192                 printMatrix(matrix);
193             }
194         }
195         catch(Exception JavaDoc ex) {
196             System.err.println(ex);
197         }
198     }
199
200
201     /**
202      * On most UNIX systems, the minimum sleep time is 10-20ms. Even if we specify sleep(1), the thread will
203      * sleep for at least 10-20ms. On Windows, sleep() seems to be implemented as a busy sleep, that is the
204      * thread never relinquishes control and therefore the sleep(x) is exactly x ms long.
205      */

206     static void sleep(long msecs, boolean busy_sleep) {
207         if(!busy_sleep) {
208             Util.sleep(msecs);
209             return;
210         }
211
212         long start=System.currentTimeMillis();
213         long stop=start + msecs;
214
215         while(stop > start) {
216             start=System.currentTimeMillis();
217         }
218     }
219
220     static void printMatrix(int[][] m) {
221         int tmp=0;
222         System.out.print("not sent: ");
223         for(int i=0; i < m.length; i++) {
224             if(m[i][0] == 0) {
225                 System.out.print(i + " ");
226                 tmp++;
227             }
228         }
229         System.out.println("\ntotal not sent: " + tmp);
230
231         tmp=0;
232         System.out.print("not received: ");
233         for(int i=0; i < m.length; i++) {
234             if(m[i][1] == 0) {
235                 System.out.print(i + " ");
236                 tmp++;
237             }
238         }
239         System.out.println("\ntotal not received: " + tmp);
240         System.out.println("Press CTRL-C to kill this test");
241     }
242
243
244     static void help() {
245         System.out.println("SpeedTest [-help] [-num_msgs <num>] [-sleep <sleeptime in msecs between messages>] " +
246                 "[-busy_sleep] [-yield] [-jg] [-loopback] [-props <channel properties>]");
247         System.out.println("Options -props are only valid if -jg is used");
248     }
249
250
251     static class Receiver implements Runnable JavaDoc {
252         Thread JavaDoc t=null;
253         byte[] buf=new byte[1024];
254         MulticastSocket JavaDoc sock;
255         Channel channel;
256         int num_msgs=1000;
257         int[][] matrix=null;
258         boolean jg=false;
259
260
261         Receiver(MulticastSocket JavaDoc sock, Channel channel, int[][] matrix, boolean jg) {
262             this.sock=sock;
263             this.channel=channel;
264             this.matrix=matrix;
265             this.jg=jg;
266             num_msgs=matrix.length;
267         }
268
269         public void start() {
270             if(t == null) {
271                 t=new Thread JavaDoc(this, "receiver thread");
272                 t.start();
273             }
274         }
275
276         public void run() {
277             int num_received=0;
278             int number;
279             DatagramPacket JavaDoc packet;
280             Object JavaDoc obj;
281             Message msg;
282             byte[] msg_data=null;
283             long total_time;
284             double msgs_per_sec=0;
285             ByteBuffer JavaDoc rb=ByteBuffer.allocate(16);
286             rb.mark();
287
288             packet=new DatagramPacket JavaDoc(buf, buf.length);
289             while(num_received <= num_msgs) {
290                 try {
291                     if(jg) {
292                         obj=channel.receive(0);
293                         if(obj instanceof Message) {
294                             msg=(Message)obj;
295                             msg_data=msg.getBuffer();
296                         }
297                         else {
298                             System.out.println("received non-msg: " + obj.getClass());
299                             continue;
300                         }
301                     }
302                     else {
303                         sock.receive(packet);
304                         msg_data=packet.getData();
305                     }
306
307                     rb.rewind();
308                     rb.put(msg_data);
309                     rb.rewind();
310                     number=rb.getInt();
311
312                     matrix[number][1]=1;
313                     // System.out.println("#set " + number);
314
num_received++;
315                     if(num_received % 1000 == 0)
316                         System.out.println("received " + num_received + " packets");
317                     if(num_received >= num_msgs)
318                         break;
319                 }
320                 catch(Exception JavaDoc ex) {
321                     System.err.println("receiver: " + ex);
322                 }
323             }
324             stop=System.currentTimeMillis();
325             total_time=stop - start;
326             msgs_per_sec=(num_received / (total_time / 1000.0));
327             System.out.println("\n** Sending and receiving " + num_received + " took " +
328                     total_time + " msecs (" + msgs_per_sec + " msgs/sec) **");
329             System.exit(1);
330         }
331     }
332
333 }
334
Popular Tags