1 3 package org.jgroups.tests; 4 5 import org.jgroups.*; 6 import org.jgroups.blocks.PullPushAdapter; 7 import org.jgroups.util.Util; 8 9 import java.io.IOException ; 10 import java.io.ObjectInput ; 11 import java.io.ObjectOutput ; 12 import java.util.HashMap ; 13 import java.util.Iterator ; 14 import java.util.Map ; 15 import java.util.Vector ; 16 17 24 public class PerfTest implements MessageListener, MembershipListener{ 25 26 27 HashMap data=new HashMap (); 28 29 30 Vector mbrs=new Vector (); 31 32 33 String props=null; 34 35 36 PullPushAdapter adapter=null; 37 38 39 JChannel ch=null; 40 41 42 boolean sender=true; 43 44 45 long sleep_time=10; 46 47 48 boolean busy_sleep=false; 49 50 51 int num_bursts=100; 52 53 54 int num_msgs_per_burst=10; 55 56 57 int msg_size=10000; 58 59 60 byte[] buf=null; 61 62 63 long sent_msgs=0; 64 65 final static String HDRNAME="PerfHeaderName"; 66 67 68 69 70 public PerfTest(String props, int num_bursts, int num_msgs_per_burst, 71 int msg_size, long sleep_time, boolean sender) { 72 this.props=props; 73 this.num_bursts=num_bursts; 74 this.num_msgs_per_burst=num_msgs_per_burst; 75 this.msg_size=msg_size; 76 this.sleep_time=sleep_time; 77 this.buf=new byte[msg_size]; 78 this.sender=sender; 79 } 80 81 82 83 public void start() throws Exception { 84 try { 85 ch=new JChannel(props); 86 ch.connect("PerfTest-Group"); 87 adapter=new PullPushAdapter(ch, this, this); 88 mainLoop(); 89 } 90 finally { 91 if(ch != null) 92 ch.close(); 93 } 94 } 95 96 void mainLoop() throws Exception { 97 boolean looping=true; 98 int choice; 99 while(looping) { 100 choice=choice(); 101 switch(choice) { 102 case 'q': case 'x': 103 looping=false; 104 break; 105 case 's': 106 MyHeader hdr=new MyHeader(MyHeader.START, num_bursts * num_msgs_per_burst); 107 Message start_msg=new Message(null, null, null); 108 start_msg.putHeader(HDRNAME, hdr); 109 adapter.send(start_msg); 110 break; 111 case 'c': 112 Message clear_msg=new Message(); 113 clear_msg.putHeader(HDRNAME, new MyHeader(MyHeader.CLEAR, 0)); 114 adapter.send(clear_msg); 115 break; 116 case 't': 117 printStats(); 118 break; 119 case 'p': 120 printParams(); 121 break; 122 case 'v': 123 System.out.println("-- view: " + ch.getView()); 124 break; 125 case 'a': 126 printStatsForAllSenders(); 127 break; 128 } 129 } 130 } 131 132 133 private void printStatsForAllSenders() { 134 long start_time=0, stop_time=0, total_time; 135 Entry entry; 136 int num_msgs=0, num_senders=0; 137 138 for(Iterator it=data.values().iterator(); it.hasNext();) { 139 entry=(Entry)it.next(); 140 if(entry.num_received > 0) { 141 num_msgs+=entry.num_received; 142 num_senders++; 143 144 if(start_time == 0) 146 start_time=entry.start; 147 else { 148 start_time=Math.min(start_time, entry.start); 149 } 150 151 if(stop_time == 0) { 153 stop_time=entry.stop; 154 } 155 else { 156 stop_time=Math.max(stop_time, entry.stop); 157 } 158 } 159 } 160 161 total_time=stop_time - start_time; 162 163 StringBuffer sb=new StringBuffer (); 164 sb.append("total number of messages sent by me: ").append(sent_msgs).append('\n'); 165 sb.append("total number of messages received: ").append(num_msgs).append('\n'); 166 sb.append("total number of senders: ").append(num_senders).append('\n'); 167 sb.append("total time: ").append(total_time).append(" ms\n"); 168 sb.append("msgs/sec: ").append((double)num_msgs / (total_time/1000.0)).append('\n'); 169 sb.append("throughput (kb/sec): ").append((num_msgs * msg_size/1000.0) / (total_time / 1000.0)).append('\n'); 170 System.out.println(sb.toString()); 171 } 172 173 174 private void printParams() { 175 System.out.println("num_bursts: " + num_bursts + '\n' + 176 "num_msgs_per_burst: " + num_msgs_per_burst + '\n' + 177 "msg_size: " + msg_size + '\n' + 178 "sleep_time: " + sleep_time + '\n' + 179 "sender: " + sender); 180 } 181 182 private void printStats() { 183 for(Iterator it=data.entrySet().iterator(); it.hasNext();) { 184 Map.Entry entry=(Map.Entry )it.next(); 185 System.out.println("stats for " + entry.getKey() + ""); 186 System.out.println(((Entry)entry.getValue()).printStats() + '\n'); 187 } 188 } 189 190 191 void sendMessages() { 192 MyHeader hdr; 193 Message msg; 194 int seqno=0; 195 long start, stop; 196 197 if(sender == false) { 198 System.out.println("-- I'm not a sender; will not send messages"); 199 return; 200 } 201 else { 202 System.out.println("-- sending " + num_bursts * num_msgs_per_burst + " msgs"); 203 } 204 205 sent_msgs=0; 206 207 try { 208 start=System.currentTimeMillis(); 209 for(int i=0; i < num_bursts; i++) { 210 for(int j=0; j < num_msgs_per_burst; j++) { 211 hdr=new MyHeader(MyHeader.DATA, seqno++); 212 msg=new Message(null, null, buf); 213 msg.putHeader(HDRNAME, hdr); 214 adapter.send(msg); 215 sent_msgs++; 216 if(sent_msgs % 100 == 0) 217 System.out.println("++ sent " + sent_msgs); 218 } 219 Util.sleep(sleep_time); 220 } 221 stop=System.currentTimeMillis(); 222 System.out.println("-- sent " + num_bursts * num_msgs_per_burst + " msgs (in " + 223 (stop-start) + " ms)"); 224 } 228 catch(Throwable t) { 229 t.printStackTrace(); 230 } 231 } 232 233 234 235 int choice() throws Exception { 236 System.out.println("s=send, c=clear, t=print stats, p=print parameters v=view, " + 237 "a=times for all messages, q=quit\nChoice: "); 238 System.out.flush(); 239 System.in.skip(System.in.available()); 240 int c=System.in.read(); 241 System.out.flush(); 242 return c; 243 } 244 245 246 public void receive(Message msg) { 247 Address sender=msg.getSrc(); 248 MyHeader hdr=(MyHeader)msg.removeHeader(HDRNAME); 249 if(hdr == null) { 250 System.err.println("-- error: header was null"); 251 return; 252 } 253 switch(hdr.type) { 254 case MyHeader.START: 255 updateTimestamp(); 256 257 new Thread () { 258 public void run() { 259 sendMessages(); 262 } 263 }.start(); 264 265 266 break; 267 case MyHeader.DATA: 268 Entry entry=(Entry)data.get(sender); 269 if(entry == null) { 270 System.err.println("-- received a message from " + sender + ", who is not in the list"); 271 } 272 else { 273 entry.add(hdr.seqno); 274 if((hdr.seqno) % 100 == 0) 275 System.out.println("-- received " + sender + ':' + hdr.seqno); 276 if(entry.getNumReceived() >= num_bursts * num_msgs_per_burst) { 277 if(entry.done()) 278 System.out.println("*--* " + sender + " DONE"); 279 } 280 } 281 break; 282 case MyHeader.DONE: 283 284 break; 285 case MyHeader.CLEAR: 286 clear(); 287 break; 288 default: 289 break; 290 } 291 } 292 293 private void updateTimestamp() { 294 for(Iterator it=data.values().iterator(); it.hasNext();) { 295 Entry entry=(Entry)it.next(); 296 entry.start=System.currentTimeMillis(); 297 } 298 } 299 300 void clear() { 301 System.out.println("-- clearing the data"); 302 data.clear(); 303 for(int i=0; i < mbrs.size(); i++) 304 data.put(mbrs.elementAt(i), new Entry(num_bursts * num_msgs_per_burst)); 305 } 306 307 public byte[] getState() { 308 return null; 309 } 310 311 public void setState(byte[] state) { 312 ; 313 } 314 315 public void viewAccepted(View new_view) { 316 System.out.println("-- new view: " + new_view.getMembers()); 317 mbrs.clear(); 318 mbrs.addAll(new_view.getMembers()); 319 clear(); 320 } 321 322 public void suspect(Address suspected_mbr) { 323 ; 324 } 325 326 public void block() { 327 ; 328 } 329 330 331 332 public static void main(String [] args) { 333 String props=null; 334 int num_bursts=100; 335 int num_msgs_per_burst=10; 336 long sleep_time=10; 337 int msg_size=10000; boolean sender=true; 339 340 PerfTest t; 341 342 343 for(int i=0; i < args.length; i++) { 344 if("-props".equals(args[i])) { 345 props=args[++i]; 346 continue; 347 } 348 if("-num_bursts".equals(args[i])) { 349 num_bursts=Integer.parseInt(args[++i]); 350 continue; 351 } 352 if("-num_msgs_per_burst".equals(args[i])) { 353 num_msgs_per_burst=Integer.parseInt(args[++i]); 354 continue; 355 } 356 if("-sleep_time".equals(args[i])) { 357 sleep_time=Long.parseLong(args[++i]); 358 continue; 359 } 360 if("-msg_size".equals(args[i])) { 361 msg_size=Integer.parseInt(args[++i]); 362 continue; 363 } 364 if("-sender".equals(args[i])) { 365 sender=Boolean.valueOf(args[++i]).booleanValue(); 366 continue; 367 } 368 help(); 369 return; 370 } 371 try { 372 t=new PerfTest(props, num_bursts, num_msgs_per_burst, msg_size, sleep_time, sender); 373 t.start(); 374 } 375 catch(Throwable ex) { 376 ex.printStackTrace(); 377 } 378 } 379 380 static void help() { 381 System.out.println("PerfTest [-help] [-props <properties>] [-num_bursts <num>] " + 382 "[-num_msgs_per_burst <num>] [-sleep_time <number of msecs>] " + 383 "[-msg_size <bytes>] [-sender <true/false>]"); 384 } 385 386 387 388 389 public static class MyHeader extends Header { 390 public static final int DATA = 1; 391 public static final int START = 2; 392 public static final int CLEAR = 3; 393 public static final int DONE = 4; 394 395 int type=0; 396 int seqno=-1; 397 398 399 public MyHeader() { 400 401 } 402 403 public MyHeader(int type, int seqno) { 404 this.type=type; 405 this.seqno=seqno; 406 } 407 408 public long size() { 409 return 16; 410 } 411 412 public String toString() { 413 StringBuffer sb=new StringBuffer (); 414 switch(type) { 415 case DATA: sb.append("DATA (seqno=").append(seqno).append(')'); break; 416 case START: sb.append("START"); break; 417 case CLEAR: sb.append("CLEAR"); break; 418 default: sb.append("<n/a>"); break; 419 } 420 return sb.toString(); 421 } 422 423 public void writeExternal(ObjectOutput out) throws IOException { 424 out.writeInt(type); 425 out.writeInt(seqno); 426 } 427 428 public void readExternal(ObjectInput in) throws IOException , ClassNotFoundException { 429 type=in.readInt(); 430 seqno=in.readInt(); 431 } 432 433 } 434 435 436 437 class Entry { 438 long start=0, stop=0; 439 int num_received=0; 440 int[] seqnos=null; 441 442 443 Entry(int num) { 444 seqnos=new int[num]; 445 for(int i=0; i < seqnos.length; i++) 446 seqnos[i]=-1; 447 start=System.currentTimeMillis(); 448 } 449 450 void add(int seqno) { 451 if(seqnos != null) 452 seqnos[seqno]=seqno; 453 num_received++; 454 if(num_received >= seqnos.length) { 455 if(done()) 456 stop=System.currentTimeMillis(); 457 } 458 } 459 460 boolean done() { 461 if(seqnos == null) 462 return false; 463 for(int i=0; i < seqnos.length; i++) 464 if(seqnos[i] < 0) 465 return false; 466 return true; 467 } 468 469 int getNumReceived() { 470 return num_received; 471 } 472 473 int getRealReceived() { 474 int num=0; 475 if(seqnos == null) return 0; 476 for(int i=0; i < seqnos.length; i++) { 477 if(seqnos[i] > -1) 478 num++; 479 } 480 return num; 481 } 482 483 String printStats() { 484 StringBuffer sb=new StringBuffer (); 485 sb.append("done=").append(done()).append('\n'); 486 sb.append("number of messages received: ").append(getRealReceived()).append('\n'); 487 sb.append("total time: ").append(stop-start).append(" ms\n"); 488 sb.append("msgs/sec: ").append((double)getRealReceived() / ((stop-start)/1000.0)).append('\n'); 489 sb.append("throughput (kb/sec): ").append((getRealReceived() * msg_size/1000.0) / ((stop-start) / 1000.0)).append('\n'); 490 return sb.toString(); 491 } 492 } 493 494 495 } 496 | Popular Tags |