1 3 4 package org.jgroups.tests; 5 6 7 import org.jgroups.Channel; 8 import org.jgroups.JChannel; 9 import org.jgroups.Message; 10 import org.jgroups.conf.ConfiguratorFactory; 11 import org.jgroups.conf.ProtocolStackConfigurator; 12 import org.jgroups.debug.Debugger; 13 import org.jgroups.util.ExposedByteArrayOutputStream; 14 import org.jgroups.util.Util; 15 16 import java.io.ByteArrayInputStream ; 17 import java.io.DataInputStream ; 18 import java.io.DataOutputStream ; 19 import java.io.IOException ; 20 import java.net.DatagramPacket ; 21 import java.net.DatagramSocket ; 22 import java.net.InetAddress ; 23 import java.net.MulticastSocket ; 24 25 26 34 public class SpeedTest { 35 static long start, stop; 36 private static final String LOOPBACK="LOOPBACK(down_thread=false;up_thread=false)"; 37 38 39 public static void main(String [] args) { 40 DatagramSocket sock=null; 41 Receiver receiver; 42 int num_msgs=1000, num_sent=0, group_port=7500, num_yields=0; 43 DatagramPacket packet; 44 InetAddress group_addr=null; 45 int[][] matrix; 46 boolean jg=false; JChannel channel=null; 48 String group_name="SpeedTest-Group"; 49 Message send_msg; 50 boolean debug=false, cummulative=false, busy_sleep=false, yield=false, loopback=false; 51 Debugger debugger=null; 52 long sleep_time=1; ExposedByteArrayOutputStream output=new ExposedByteArrayOutputStream(64); 54 String props; 55 56 57 props="UDP(mcast_addr=224.0.0.36;mcast_port=55566;ip_ttl=32;" + 58 "ucast_send_buf_size=32000;ucast_recv_buf_size=64000;" + 59 "mcast_send_buf_size=32000;mcast_recv_buf_size=64000):" + 60 "PING(timeout=2000;num_initial_members=3):" + 61 "MERGE2(min_interval=5000;max_interval=10000):" + 62 "FD_SOCK:" + 63 "VERIFY_SUSPECT(timeout=1500):" + 64 "pbcast.NAKACK(max_xmit_size=8192;gc_lag=50;retransmit_timeout=600,800,1200,2400,4800):" + 65 "UNICAST(timeout=1200):" + 66 "pbcast.STABLE(desired_avg_gossip=10000):" + 67 "FRAG(frag_size=8192;down_thread=false;up_thread=false):" + 68 "pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;" + 69 "shun=false;print_local_addr=true):" + 70 "pbcast.STATE_TRANSFER"; 71 73 74 75 for(int i=0; i < args.length; i++) { 76 if("-help".equals(args[i])) { 77 help(); 78 return; 79 } 80 if("-jg".equals(args[i])) { 81 jg=true; 82 continue; 83 } 84 if("-loopback".equals(args[i])) { 85 loopback=true; 86 continue; 87 } 88 if("-props".equals(args[i])) { 89 props=args[++i]; 90 continue; 91 } 92 if("-debug".equals(args[i])) { 93 debug=true; 94 continue; 95 } 96 if("-cummulative".equals(args[i])) { 97 cummulative=true; 98 continue; 99 } 100 if("-busy_sleep".equals(args[i])) { 101 busy_sleep=true; 102 continue; 103 } 104 if("-yield".equals(args[i])) { 105 yield=true; 106 num_yields++; 107 continue; 108 } 109 if("-sleep".equals(args[i])) { 110 sleep_time=Long.parseLong(args[++i]); 111 continue; 112 } 113 if("-num_msgs".equals(args[i])) { 114 num_msgs=Integer.parseInt(args[++i]); 115 continue; 116 } 117 help(); 118 return; 119 } 120 121 System.out.println("jg = " + jg + 122 "\nloopback = " + loopback + 123 "\ndebug = " + debug + 124 "\nsleep = " + sleep_time + 125 "\nbusy_sleep=" + busy_sleep + 126 "\nyield=" + yield + 127 "\nnum_yields=" + num_yields + 128 "\nnum_msgs = " + num_msgs + 129 '\n'); 130 131 132 133 try { 134 matrix=new int[num_msgs][2]; 135 for(int i=0; i < num_msgs; i++) { 136 for(int j=0; j < matrix[i].length; j++) 137 matrix[i][j]=0; 138 } 139 140 if(jg) { 141 if(loopback) { 142 ProtocolStackConfigurator conf=ConfiguratorFactory.getStackConfigurator(props); 143 String tmp=conf.getProtocolStackString(); 144 int index=tmp.indexOf(':'); 145 props=LOOPBACK + tmp.substring(index); 146 } 147 channel=new JChannel(props); 148 channel.connect(group_name); 150 if(debug) { 151 debugger=new Debugger(channel, cummulative); 152 debugger.start(); 153 } 154 } 155 else { 156 group_addr=InetAddress.getByName("224.0.0.36"); 157 sock=new DatagramSocket (); 158 } 159 160 if(debug) { 161 System.out.println("Press key to start"); 162 System.in.read(); 163 } 164 receiver=new Receiver(group_addr, group_port, channel, matrix, jg); 165 receiver.start(); 166 167 byte[] buf; 168 DataOutputStream out; 169 170 start=System.currentTimeMillis(); 171 for(int i=0; i < num_msgs; i++) { 172 output.reset(); 174 out=new DataOutputStream (output); 175 out.writeInt(i); 176 out.flush(); 177 buf=output.getRawBuffer(); 178 out.close(); 179 180 if(jg) { 181 send_msg=new Message(null, null, buf, 0, buf.length); 182 channel.send(send_msg); 183 } 184 else { 185 packet=new DatagramPacket (buf, buf.length, group_addr, group_port); 186 sock.send(packet); 187 } 188 num_sent++; 189 if(num_sent % 1000 == 0) 190 System.out.println("-- sent " + num_sent); 191 192 matrix[i][0]=1; 193 if(yield) { 194 for(int k=0; k < num_yields; k++) { 195 Thread.yield(); 196 } 197 } 198 else { 199 if(sleep_time > 0) { 200 sleep(sleep_time, busy_sleep); 201 } 202 } 203 } 204 while(true) { 205 System.in.read(); 206 printMatrix(matrix); 207 } 208 } 209 catch(Exception ex) { 210 ex.printStackTrace(); 211 System.exit(-1); 212 } 213 } 214 215 216 217 222 static void sleep(long msecs, boolean busy_sleep) { 223 if(!busy_sleep) { 224 Util.sleep(msecs); 225 return; 226 } 227 228 long start=System.currentTimeMillis(); 229 long stop=start + msecs; 230 231 while(stop > start) { 232 start=System.currentTimeMillis(); 233 } 234 } 235 236 static void printMatrix(int[][] m) { 237 int tmp=0; 238 System.out.print("not sent: "); 239 for(int i=0; i < m.length; i++) { 240 if(m[i][0] == 0) { 241 System.out.print(i + " "); 242 tmp++; 243 } 244 } 245 System.out.println("\ntotal not sent: " + tmp); 246 247 tmp=0; 248 System.out.print("not received: "); 249 for(int i=0; i < m.length; i++) { 250 if(m[i][1] == 0) { 251 System.out.print(i + " "); 252 tmp++; 253 } 254 } 255 System.out.println("\ntotal not received: " + tmp); 256 System.out.println("Press CTRL-C to kill this test"); 257 } 258 259 260 static void help() { 261 System.out.println("SpeedTest [-help] [-num_msgs <num>] [-sleep <sleeptime in msecs between messages>] " + 262 "[-busy_sleep] [-yield] [-jg] [-loopback] [-props <channel properties>] [-debug] [-cummulative]"); 263 System.out.println("Options -props -debug and -cummulative are only valid if -jg is used"); 264 } 265 266 267 static class Receiver implements Runnable { 268 Thread t=null; 269 byte[] buf=new byte[1024]; 270 MulticastSocket sock; 271 Channel channel; 272 int num_msgs=1000; 273 int[][] matrix=null; 274 boolean jg=false; 275 276 Receiver(InetAddress group_addr, int group_port, Channel channel, int[][] matrix, boolean jg) throws IOException { 277 this.channel=channel; 278 this.matrix=matrix; 279 this.jg=jg; 280 num_msgs=matrix.length; 281 if(group_addr != null) { 282 sock=new MulticastSocket (group_port); 283 sock.joinGroup(group_addr); 284 } 285 } 286 287 public void start() { 288 if(t == null) { 289 t=new Thread (this, "receiver thread"); 290 t.start(); 291 } 292 } 293 294 public void run() { 295 int num_received=0; 296 int number; 297 DatagramPacket packet; 298 Object obj; 299 long total_time; 300 double msgs_per_sec=0; 301 DataInputStream in; 302 303 packet=new DatagramPacket (buf, buf.length); 304 while(num_received <= num_msgs) { 305 try { 306 if(jg) { 307 obj=channel.receive(0); 308 if(obj instanceof Message) { 309 } 311 else { 312 System.out.println("received non-msg: " + obj.getClass()); 313 continue; 314 } 315 } 316 else { 317 sock.receive(packet); 318 } 319 320 in=new DataInputStream (new ByteArrayInputStream (buf)); 322 number=in.readInt(); 323 matrix[number][1]=1; 324 num_received++; 326 if(num_received % 1000 == 0) 327 System.out.println("received " + num_received + " packets"); 328 if(num_received >= num_msgs) 329 break; 330 } 331 catch(Exception ex) { 332 System.err.println(ex); 333 } 334 } 335 stop=System.currentTimeMillis(); 336 total_time=stop - start; 337 msgs_per_sec=(num_received / (total_time / 1000.0)); 338 System.out.println("\n** Sending and receiving " + num_received + " took " + 339 total_time + " msecs (" + msgs_per_sec + " msgs/sec) **"); 340 System.exit(1); 341 } 342 } 343 344 } 345 | Popular Tags |