package org.jgroups.tests;

import org.jgroups.Channel;
import org.jgroups.JChannel;
import org.jgroups.Message;
import org.jgroups.util.Util;

import java.net.DatagramPacket;
import java.net.InetAddress;
import java.net.MulticastSocket;
import java.nio.ByteBuffer;


/**
 * Simple throughput test. The main thread sends <code>num_msgs</code> messages, each carrying
 * its sequence number as a 4-byte int at the start of a 16-byte payload, either directly over a
 * {@link MulticastSocket} (default) or through a {@link JChannel} (<code>-jg</code>). A receiver
 * thread reads the messages back, marks each received sequence number in a shared matrix and,
 * once <code>num_msgs</code> messages have been received, prints the elapsed time and the
 * message rate and terminates the JVM.
 * <p>
 * After sending, the main thread prints the "not sent"/"not received" statistics every time a
 * byte is read from standard input.
 */
public class SpeedTest_NIO {
    /** Multicast group used when sending over the plain socket. */
    private static final String MCAST_GROUP="224.0.0.36";

    /** Port used when sending over the plain socket. */
    private static final int MCAST_PORT=7777;

    /** Size in bytes of every payload sent by this test. */
    private static final int PAYLOAD_SIZE=16;

    /** Number of bytes occupied by the sequence number at the start of the payload. */
    private static final int SEQNO_SIZE=4;

    /*
     * Written by the sender thread (start) and the receiver thread (stop) and read by the
     * receiver thread, hence volatile.
     */
    static volatile long start=0, stop=0;


    /**
     * Parses the command line, sets up the transport, starts the receiver and sends the messages.
     *
     * @param args command line options, see {@link #help()}
     */
    public static void main(String[] args) {
        MulticastSocket sock=null;
        Receiver receiver;
        int num_msgs=1000;
        InetAddress group_addr=null;
        int[][] matrix;
        boolean jg=false;
        JChannel channel=null;
        String props, loopback_props;
        String group_name="SpeedTest-Group";
        long sleep_time=1;
        boolean busy_sleep=false;
        boolean yield=false;
        int num_yields=0;
        boolean loopback=false;

        props="UDP(mcast_addr=224.0.0.36;mcast_port=55566;ip_ttl=32;" +
                "ucast_send_buf_size=32000;ucast_recv_buf_size=64000;" +
                "mcast_send_buf_size=32000;mcast_recv_buf_size=64000):" +
                "PING(timeout=2000;num_initial_members=3):" +
                "MERGE2(min_interval=5000;max_interval=10000):" +
                "FD_SOCK:" +
                "VERIFY_SUSPECT(timeout=1500):" +
                "pbcast.NAKACK(max_xmit_size=8192;gc_lag=50;retransmit_timeout=600,800,1200,2400,4800):" +
                "UNICAST(timeout=1200):" +
                "pbcast.STABLE(desired_avg_gossip=10000):" +
                "FRAG(frag_size=8192;down_thread=false;up_thread=false):" +
                "pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;" +
                "shun=false;print_local_addr=true):" +
                "pbcast.STATE_TRANSFER";

        loopback_props="LOOPBACK:" +
                "PING(timeout=2000;num_initial_members=3):" +
                "MERGE2(min_interval=5000;max_interval=10000):" +
                "FD_SOCK:" +
                "VERIFY_SUSPECT(timeout=1500):" +
                "pbcast.NAKACK(gc_lag=50;retransmit_timeout=600,800,1200,2400,4800):" +
                "UNICAST(timeout=5000):" +
                "pbcast.STABLE(desired_avg_gossip=20000):" +
                "FRAG(frag_size=16000;down_thread=false;up_thread=false):" +
                "pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;" +
                "shun=false;print_local_addr=true):" +
                "pbcast.STATE_TRANSFER";

        try {
            for(int i=0; i < args.length; i++) {
                String arg=args[i];
                if("-help".equals(arg)) {
                    help();
                    return;
                }
                if("-jg".equals(arg)) {
                    jg=true;
                    continue;
                }
                if("-loopback".equals(arg)) {
                    loopback=true;
                    props=loopback_props;
                    continue;
                }
                if("-busy_sleep".equals(arg)) {
                    busy_sleep=true;
                    continue;
                }
                if("-yield".equals(arg)) {
                    // every occurrence of -yield adds one Thread.yield() per sent message
                    yield=true;
                    num_yields++;
                    continue;
                }
                if("-props".equals(arg) || "-sleep".equals(arg) || "-num_msgs".equals(arg)) {
                    // these options need a value; a missing one is a usage error
                    if(++i >= args.length) {
                        help();
                        return;
                    }
                    if("-props".equals(arg))
                        props=args[i];
                    else if("-sleep".equals(arg))
                        sleep_time=Long.parseLong(args[i]);
                    else
                        num_msgs=Integer.parseInt(args[i]);
                    continue;
                }
                help();
                return;
            }
        }
        catch(NumberFormatException bad_number) {
            System.err.println("invalid numeric argument: " + bad_number.getMessage());
            help();
            return;
        }

        System.out.println("jg       = " + jg +
                "\nloopback = " + loopback +
                "\nsleep    = " + sleep_time +
                "\nbusy_sleep=" + busy_sleep +
                "\nyield=" + yield +
                "\nnum_yields=" + num_yields +
                "\nnum_msgs = " + num_msgs +
                '\n');

        try {
            // column 0: message was sent, column 1: message was received (all zero initially)
            matrix=new int[num_msgs][2];

            if(jg) {
                channel=new JChannel(props);
                channel.connect(group_name);
            }
            else {
                group_addr=InetAddress.getByName(MCAST_GROUP);
                sock=new MulticastSocket(MCAST_PORT);
                sock.joinGroup(group_addr);
            }

            receiver=new Receiver(sock, channel, matrix, jg);
            receiver.start();

            ByteBuffer bb=ByteBuffer.allocate(PAYLOAD_SIZE);

            start=System.currentTimeMillis();
            for(int i=0; i < num_msgs; i++) {
                // overwrite the sequence number at the start of the payload
                bb.clear();
                bb.putInt(i);
                byte[] buf=(byte[])(bb.array()).clone();

                if(jg) {
                    Message send_msg=new Message(null, null, buf);
                    channel.send(send_msg);
                }
                else {
                    DatagramPacket packet=new DatagramPacket(buf, buf.length, group_addr, MCAST_PORT);
                    sock.send(packet);
                }
                if(i % 1000 == 0)
                    System.out.println("-- sent " + i);

                matrix[i][0]=1;
                if(yield) {
                    for(int k=0; k < num_yields; k++) {
                        Thread.yield();
                    }
                }
                else {
                    if(sleep_time > 0) {
                        sleep(sleep_time, busy_sleep);
                    }
                }
            }

            // Print the statistics whenever a byte arrives on stdin. Stop at end of stream
            // instead of spinning; the receiver thread keeps running on its own.
            while(System.in.read() != -1) {
                printMatrix(matrix);
            }
        }
        catch(Exception ex) {
            System.err.println(ex);
        }
    }


    /**
     * Pauses the calling thread for roughly <code>msecs</code> milliseconds.
     *
     * @param msecs      time to wait in milliseconds
     * @param busy_sleep if true, spins on {@link System#currentTimeMillis()} until the deadline
     *                   has passed; otherwise delegates to {@link Util#sleep(long)}
     */
    static void sleep(long msecs, boolean busy_sleep) {
        if(!busy_sleep) {
            Util.sleep(msecs);
            return;
        }

        long now=System.currentTimeMillis();
        long deadline=now + msecs;

        while(deadline > now) {
            now=System.currentTimeMillis();
        }
    }

    /**
     * Prints the indices (and totals) of all messages that were not sent and not received.
     *
     * @param m matrix with one row per message; column 0 is non-zero once the message was sent,
     *          column 1 is non-zero once it was received
     */
    static void printMatrix(int[][] m) {
        int tmp=0;
        System.out.print("not sent: ");
        for(int i=0; i < m.length; i++) {
            if(m[i][0] == 0) {
                System.out.print(i + " ");
                tmp++;
            }
        }
        System.out.println("\ntotal not sent: " + tmp);

        tmp=0;
        System.out.print("not received: ");
        for(int i=0; i < m.length; i++) {
            if(m[i][1] == 0) {
                System.out.print(i + " ");
                tmp++;
            }
        }
        System.out.println("\ntotal not received: " + tmp);
        System.out.println("Press CTRL-C to kill this test");
    }


    /** Prints the command line usage. */
    static void help() {
        System.out.println("SpeedTest [-help] [-num_msgs <num>] [-sleep <sleeptime in msecs between messages>] " +
                "[-busy_sleep] [-yield] [-jg] [-loopback] [-props <channel properties>]");
        System.out.println("Options -props are only valid if -jg is used");
    }


    /**
     * Reads messages from either the multicast socket or the channel on its own thread and
     * records every received sequence number in column 1 of the shared matrix.
     */
    static class Receiver implements Runnable {
        Thread t=null;
        byte[] buf=new byte[1024];
        MulticastSocket sock;
        Channel channel;
        int num_msgs=1000;
        int[][] matrix=null;
        boolean jg=false;


        /**
         * @param sock    socket to read from when <code>jg</code> is false
         * @param channel channel to read from when <code>jg</code> is true
         * @param matrix  shared sent/received matrix; its length is the number of messages to wait for
         * @param jg      true to receive from the channel, false to receive from the socket
         */
        Receiver(MulticastSocket sock, Channel channel, int[][] matrix, boolean jg) {
            this.sock=sock;
            this.channel=channel;
            this.matrix=matrix;
            this.jg=jg;
            num_msgs=matrix.length;
        }

        /** Starts the receiver thread; calling it again has no effect. */
        public void start() {
            if(t == null) {
                t=new Thread(this, "receiver thread");
                t.start();
            }
        }

        /**
         * Receives until <code>num_msgs</code> messages have been counted, then prints the
         * elapsed time since the sender started and terminates the JVM.
         */
        public void run() {
            int num_received=0;
            DatagramPacket packet=new DatagramPacket(buf, buf.length);

            while(num_received < num_msgs) {
                try {
                    byte[] msg_data;
                    int offset, length;

                    if(jg) {
                        Object obj=channel.receive(0);
                        if(!(obj instanceof Message)) {
                            System.out.println("received non-msg: " + obj.getClass());
                            continue;
                        }
                        msg_data=((Message)obj).getBuffer();
                        offset=0;
                        length=msg_data != null? msg_data.length : 0;
                    }
                    else {
                        sock.receive(packet);
                        // getData() is the whole receive buffer; only [offset, offset+length)
                        // belongs to the datagram that was just received
                        msg_data=packet.getData();
                        offset=packet.getOffset();
                        length=packet.getLength();
                    }

                    if(length < SEQNO_SIZE) {
                        System.err.println("receiver: payload of " + length + " bytes is too short, ignored");
                        continue;
                    }

                    // Decode in place: copying the full receive buffer into a small ByteBuffer
                    // would overflow it.
                    int number=ByteBuffer.wrap(msg_data, offset, length).getInt();
                    if(number < 0 || number >= matrix.length) {
                        System.err.println("receiver: sequence number " + number + " is out of range, ignored");
                        continue;
                    }

                    matrix[number][1]=1;
                    num_received++;
                    if(num_received % 1000 == 0)
                        System.out.println("received " + num_received + " packets");
                }
                catch(Exception ex) {
                    System.err.println("receiver: " + ex);
                }
            }

            stop=System.currentTimeMillis();
            long total_time=stop - start;
            double msgs_per_sec=(num_received / (total_time / 1000.0));
            System.out.println("\n** Sending and receiving " + num_received + " took " +
                    total_time + " msecs (" + msgs_per_sec + " msgs/sec) **");
            // NOTE(review): exits with status 1 even though the run completed; kept unchanged
            // in case callers rely on it.
            System.exit(1);
        }
    }

}