1 package org.jgroups.tests; 2 3 5 6 import org.jgroups.Channel; 7 import org.jgroups.JChannel; 8 import org.jgroups.Message; 9 import org.jgroups.debug.Debugger; 10 import org.jgroups.util.Util; 11 12 import java.net.DatagramPacket ; 13 import java.net.InetAddress ; 14 import java.net.MulticastSocket ; 15 import java.nio.ByteBuffer ; 16 17 18 33 public class SpeedTest1_4 { 34 static long start, stop; 35 36 37 public static void main(String [] args) { 38 MulticastSocket sock=null; 39 Receiver receiver=null; 40 int num_msgs=1000; 41 byte[] buf; 42 DatagramPacket packet; 43 InetAddress group_addr=null; 44 int[][] matrix; 45 boolean jg=false; JChannel channel=null; 47 String props=null, loopback_props; 48 String group_name="SpeedTest-Group"; 49 Message send_msg; 50 boolean debug=false, cummulative=false; 51 Debugger debugger=null; 52 long sleep_time=1; boolean busy_sleep=false; 54 boolean yield=false; 55 int num_yields=0; 56 boolean loopback=false; 57 58 59 props="UDP(mcast_addr=224.0.0.36;mcast_port=55566;ip_ttl=32;" + 60 "ucast_send_buf_size=32000;ucast_recv_buf_size=64000;" + 61 "mcast_send_buf_size=32000;mcast_recv_buf_size=64000):" + 62 "PING(timeout=2000;num_initial_members=3):" + 63 "MERGE2(min_interval=5000;max_interval=10000):" + 64 "FD_SOCK:" + 65 "VERIFY_SUSPECT(timeout=1500):" + 66 "pbcast.NAKACK(max_xmit_size=8192;gc_lag=50;retransmit_timeout=600,800,1200,2400,4800):" + 67 "UNICAST(timeout=1200):" + 68 "pbcast.STABLE(desired_avg_gossip=10000):" + 69 "FRAG(frag_size=8192;down_thread=false;up_thread=false):" + 70 "pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;" + 72 "shun=false;print_local_addr=true):" + 73 "pbcast.STATE_TRANSFER"; 74 76 77 loopback_props="LOOPBACK:" + 78 "PING(timeout=2000;num_initial_members=3):" + 79 "MERGE2(min_interval=5000;max_interval=10000):" + 80 "FD_SOCK:" + 81 "VERIFY_SUSPECT(timeout=1500):" + 82 "pbcast.NAKACK(gc_lag=50;retransmit_timeout=600,800,1200,2400,4800):" + 83 "UNICAST(timeout=5000):" + 84 
"pbcast.STABLE(desired_avg_gossip=20000):" + 85 "FRAG(frag_size=16000;down_thread=false;up_thread=false):" + 86 "pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;" + 87 "shun=false;print_local_addr=true):" + 88 "pbcast.STATE_TRANSFER"; 89 90 91 for(int i=0; i < args.length; i++) { 92 if("-help".equals(args[i])) { 93 help(); 94 return; 95 } 96 if("-jg".equals(args[i])) { 97 jg=true; 98 continue; 99 } 100 if("-loopback".equals(args[i])) { 101 loopback=true; 102 props=loopback_props; 103 continue; 104 } 105 if("-props".equals(args[i])) { 106 props=args[++i]; 107 continue; 108 } 109 if("-debug".equals(args[i])) { 110 debug=true; 111 continue; 112 } 113 if("-cummulative".equals(args[i])) { 114 cummulative=true; 115 continue; 116 } 117 if("-busy_sleep".equals(args[i])) { 118 busy_sleep=true; 119 continue; 120 } 121 if("-yield".equals(args[i])) { 122 yield=true; 123 num_yields++; 124 continue; 125 } 126 if("-sleep".equals(args[i])) { 127 sleep_time=Long.parseLong(args[++i]); 128 continue; 129 } 130 if("-num_msgs".equals(args[i])) { 131 num_msgs=Integer.parseInt(args[++i]); 132 continue; 133 } 134 help(); 135 return; 136 } 137 138 System.out.println("jg = " + jg + 139 "\nloopback = " + loopback + 140 "\ndebug = " + debug + 141 "\nsleep = " + sleep_time + 142 "\nbusy_sleep=" + busy_sleep + 143 "\nyield=" + yield + 144 "\nnum_yields=" + num_yields + 145 "\nnum_msgs = " + num_msgs + 146 '\n'); 147 148 149 150 try { 151 matrix=new int[num_msgs][2]; 152 for(int i=0; i < num_msgs; i++) { 153 for(int j=0; j < matrix[i].length; j++) 154 matrix[i][j]=0; 155 } 156 157 if(jg) { 158 channel=new JChannel(props); 159 channel.connect(group_name); 160 if(debug) { 161 debugger=new Debugger(channel, cummulative); 162 debugger.start(); 163 } 164 } 165 else { 166 group_addr=InetAddress.getByName("224.0.0.36"); 167 sock=new MulticastSocket (7777); 168 sock.joinGroup(group_addr); 169 } 170 171 if(debug) { 172 System.out.println("Press key to start"); 173 System.in.read(); 174 } 175 
receiver=new Receiver(sock, channel, matrix, jg); 176 receiver.start(); 177 178 ByteBuffer bb=ByteBuffer.allocate(16); 179 bb.mark(); 180 181 start=System.currentTimeMillis(); 182 for(int i=0; i < num_msgs; i++) { 183 bb.reset(); 184 bb.putInt(i); 185 buf=(byte[])(bb.array()).clone(); 186 187 if(jg) { 188 send_msg=new Message(null, null, buf); 189 channel.send(send_msg); 190 } 191 else { 192 packet=new DatagramPacket (buf, buf.length, group_addr, 7777); 193 sock.send(packet); 194 } 195 if(i % 1000 == 0) 196 System.out.println("-- sent " + i); 197 198 matrix[i][0]=1; 199 if(yield) { 200 for(int k=0; k < num_yields; k++) { 201 Thread.yield(); 202 } 203 } 204 else { 205 if(sleep_time > 0) { 206 sleep(sleep_time, busy_sleep); 207 } 208 } 209 } 210 while(true) { 211 System.in.read(); 212 printMatrix(matrix); 213 } 214 } 215 catch(Exception ex) { 216 System.err.println(ex); 217 } 218 } 219 220 221 226 static void sleep(long msecs, boolean busy_sleep) { 227 if(!busy_sleep) { 228 Util.sleep(msecs); 229 return; 230 } 231 232 long start=System.currentTimeMillis(); 233 long stop=start + msecs; 234 235 while(stop > start) { 236 start=System.currentTimeMillis(); 237 } 238 } 239 240 static void printMatrix(int[][] m) { 241 int tmp=0; 242 System.out.print("not sent: "); 243 for(int i=0; i < m.length; i++) { 244 if(m[i][0] == 0) { 245 System.out.print(i + " "); 246 tmp++; 247 } 248 } 249 System.out.println("\ntotal not sent: " + tmp); 250 251 tmp=0; 252 System.out.print("not received: "); 253 for(int i=0; i < m.length; i++) { 254 if(m[i][1] == 0) { 255 System.out.print(i + " "); 256 tmp++; 257 } 258 } 259 System.out.println("\ntotal not received: " + tmp); 260 System.out.println("Press CTRL-C to kill this test"); 261 } 262 263 264 static void help() { 265 System.out.println("SpeedTest [-help] [-num_msgs <num>] [-sleep <sleeptime in msecs between messages>] " + 266 "[-busy_sleep] [-yield] [-jg] [-loopback] [-props <channel properties>] [-debug] [-cummulative]"); 267 
System.out.println("Options -props -debug and -cummulative are only valid if -jg is used"); 268 } 269 270 271 static class Receiver implements Runnable { 272 Thread t=null; 273 byte[] buf=new byte[1024]; 274 MulticastSocket sock; 275 Channel channel; 276 int num_msgs=1000; 277 int[][] matrix=null; 278 boolean jg=false; 279 280 281 Receiver(MulticastSocket sock, Channel channel, int[][] matrix, boolean jg) { 282 this.sock=sock; 283 this.channel=channel; 284 this.matrix=matrix; 285 this.jg=jg; 286 num_msgs=matrix.length; 287 } 288 289 public void start() { 290 if(t == null) { 291 t=new Thread (this, "receiver thread"); 292 t.start(); 293 } 294 } 295 296 public void run() { 297 int num_received=0; 298 int number; 299 DatagramPacket packet; 300 Object obj; 301 Message msg; 302 byte[] msg_data=null; 303 long total_time; 304 double msgs_per_sec=0; 305 ByteBuffer rb=ByteBuffer.allocate(16); 306 rb.mark(); 307 308 packet=new DatagramPacket (buf, buf.length); 309 while(num_received <= num_msgs) { 310 try { 311 if(jg) { 312 obj=channel.receive(0); 313 if(obj instanceof Message) { 314 msg=(Message)obj; 315 msg_data=msg.getBuffer(); 316 } 317 else { 318 System.out.println("received non-msg: " + obj.getClass()); 319 continue; 320 } 321 } 322 else { 323 sock.receive(packet); 324 msg_data=packet.getData(); 325 } 326 327 rb.rewind(); 328 rb.put(msg_data); 329 rb.rewind(); 330 number=rb.getInt(); 331 332 matrix[number][1]=1; 333 num_received++; 335 if(num_received % 1000 == 0) 336 System.out.println("received " + num_received + " packets"); 337 if(num_received >= num_msgs) 338 break; 339 } 340 catch(Exception ex) { 341 System.err.println("receiver: " + ex); 342 } 343 } 344 stop=System.currentTimeMillis(); 345 total_time=stop - start; 346 msgs_per_sec=(num_received / (total_time / 1000.0)); 347 System.out.println("\n** Sending and receiving " + num_received + " took " + 348 total_time + " msecs (" + msgs_per_sec + " msgs/sec) **"); 349 System.exit(1); 350 } 351 } 352 353 } 354 | 
// Popular Tags |