1 package org.jgroups.tests.perf; 2 3 import org.jgroups.util.Util; 4 import org.apache.commons.logging.Log; 5 import org.apache.commons.logging.LogFactory; 6 7 import java.io.BufferedReader ; 8 import java.io.FileReader ; 9 import java.util.*; 10 11 16 public class Test implements Receiver { 17 String props=null; 18 Properties config; 19 boolean sender=false; 20 Transport transport=null; 21 Object local_addr=null; 22 23 24 HashMap senders=new HashMap(); 25 26 27 ArrayList members=new ArrayList(); 28 29 30 long start=0; 31 32 33 long stop=0; 34 35 int num_members=0; 36 37 Log log=LogFactory.getLog(getClass()); 38 39 boolean all_received=false; 40 41 42 HashMap results=new HashMap(); 43 44 45 boolean gnuplot_output=false; 46 47 48 long log_interval=1000; 49 50 51 long last_dump=0; 52 53 long counter=1; 54 long msg_size=1000; 55 56 57 58 public void start(Properties c, boolean verbose) throws Exception { 59 String config_file="config.txt"; 60 BufferedReader fileReader; 61 String line; 62 String key, val; 63 StringTokenizer st; 64 Properties tmp=new Properties(); 65 66 config_file=c.getProperty("config"); 67 fileReader=new BufferedReader (new FileReader (config_file)); 68 while((line=fileReader.readLine()) != null) { 69 if(line.startsWith("#")) 70 continue; 71 line=line.trim(); 72 if(line.length() == 0) 73 continue; 74 st=new StringTokenizer(line, "=", false); 75 key=st.nextToken().toLowerCase(); 76 val=st.nextToken(); 77 tmp.put(key, val); 78 } 79 fileReader.close(); 80 81 tmp.putAll(c); 84 this.config=tmp; 85 86 StringBuffer sb=new StringBuffer (); 87 sb.append("\n\n----------------------- TEST -----------------------\n"); 88 sb.append("Date: ").append(new Date()).append('\n'); 89 sb.append("Run by: ").append(System.getProperty("user.name")).append("\n\n"); 90 if(verbose) 91 sb.append("Properties: ").append(printProperties()).append("\n-------------------------\n\n"); 92 93 for(Iterator it=this.config.entrySet().iterator(); it.hasNext();) { 94 Map.Entry entry=(Map.Entry)it.next(); 95 sb.append(entry.getKey()).append(":\t").append(entry.getValue()).append('\n'); 96 } 97 sb.append('\n'); 98 System.out.println("Configuration is: " + sb); 99 100 log.info(sb.toString()); 101 102 props=this.config.getProperty("props"); 103 num_members=Integer.parseInt(this.config.getProperty("num_members")); 104 sender=Boolean.valueOf(this.config.getProperty("sender")).booleanValue(); 105 msg_size=Long.parseLong(this.config.getProperty("msg_size")); 106 String tmp2=this.config.getProperty("gnuplot_output", "false"); 107 if(Boolean.valueOf(tmp2).booleanValue()) 108 this.gnuplot_output=true; 109 tmp2=this.config.getProperty("log_interval"); 110 if(tmp2 != null) 111 log_interval=Long.parseLong(tmp2); 112 113 if(gnuplot_output) { 114 sb=new StringBuffer (); 115 sb.append("\n##### msgs_received"); 116 sb.append(", free_mem [KB] "); 117 sb.append(", total_mem [KB] "); 118 sb.append(", total_msgs_sec [msgs/sec] "); 119 sb.append(", total_throughput [KB/sec] "); 120 sb.append(", rolling_msgs_sec (last ").append(log_interval).append(" msgs) "); 121 sb.append(" [msgs/sec] "); 122 sb.append(", rolling_throughput (last ").append(log_interval).append(" msgs) "); 123 sb.append(" [KB/sec]\n"); 124 if(log.isInfoEnabled()) log.info(sb.toString()); 125 } 126 127 String transport_name=this.config.getProperty("transport"); 128 transport=(Transport)Thread.currentThread().getContextClassLoader().loadClass(transport_name).newInstance(); 129 transport.create(this.config); 130 transport.setReceiver(this); 131 transport.start(); 132 local_addr=transport.getLocalAddress(); 133 } 134 135 private String printProperties() { 136 StringBuffer sb=new StringBuffer (); 137 Properties p=System.getProperties(); 138 for(Iterator it=p.entrySet().iterator(); it.hasNext();) { 139 Map.Entry entry=(Map.Entry)it.next(); 140 sb.append(entry.getKey()).append(": ").append(entry.getValue()).append('\n'); 141 } 142 return sb.toString(); 143 } 144 145 public void stop() { 146 if(transport != null) { 147 transport.stop(); 148 transport.destroy(); 149 } 150 } 151 152 public void receive(Object sender, byte[] payload) { 153 Data d; 154 155 try { 156 int type=payload[0]; 157 if(type == 1) { int len=payload.length -1; 159 handleData(sender, len); 160 return; 161 } 162 163 byte[] tmp=new byte[payload.length-1]; 164 System.arraycopy(payload, 1, tmp, 0, tmp.length); 165 d=(Data)Util.objectFromByteBuffer(tmp); 166 167 switch(d.getType()) { 168 case Data.DISCOVERY_REQ: 169 sendDiscoveryResponse(); 170 break; 171 case Data.DISCOVERY_RSP: 172 synchronized(this.members) { 173 if(!this.members.contains(sender)) { 174 this.members.add(sender); 175 System.out.println("-- " + sender + " joined"); 176 if(d.sender) { 177 synchronized(this.members) { 178 if(!this.senders.containsKey(sender)) { 179 this.senders.put(sender, new MemberInfo(d.num_msgs)); 180 } 181 } 182 } 183 this.members.notify(); 184 } 185 } 186 break; 187 188 case Data.DONE: 189 if(all_received) 190 return; 191 MemberInfo mi=(MemberInfo)this.senders.get(sender); 192 if(mi != null) { 193 mi.done=true; 194 if(mi.stop == 0) 195 mi.stop=System.currentTimeMillis(); 196 if(allReceived()) { 197 all_received=true; 198 if(stop == 0) 199 stop=System.currentTimeMillis(); 200 sendResults(); 201 if(!this.sender) 202 dumpSenders(); 203 synchronized(this) { 204 this.notify(); 205 } 206 } 207 } 208 else { 209 System.err.println("-- sender " + sender + " not found in senders hashmap"); 210 } 211 break; 212 213 case Data.RESULTS: 214 synchronized(results) { 215 if(!results.containsKey(sender)) { 216 results.put(sender, d.results); 217 results.notify(); 218 } 219 } 220 break; 221 222 default: 223 System.err.println("received invalid data type: " + payload[0]); 224 break; 225 } 226 } 227 catch(Exception e) { 228 e.printStackTrace(); 229 } 230 } 231 232 private void handleData(Object sender, int num_bytes) { 233 if(all_received) 234 return; 235 if(start == 0) { 236 start=System.currentTimeMillis(); 237 last_dump=start; 238 } 239 MemberInfo info=(MemberInfo)this.senders.get(sender); 240 if(info != null) { 241 if(info.start == 0) 242 info.start=System.currentTimeMillis(); 243 info.num_msgs_received++; 244 counter++; 245 info.total_bytes_received+=num_bytes; 246 if(info.num_msgs_received % 1000 == 0) 247 System.out.println("-- received " + info.num_msgs_received + 248 " messages from " + sender); 249 250 if(counter % log_interval == 0) { 251 if(log.isInfoEnabled()) log.info(dumpStats(counter)); 252 } 253 254 if(info.num_msgs_received >= info.num_msgs_expected) { 255 info.done=true; 256 if(info.stop == 0) 257 info.stop=System.currentTimeMillis(); 258 if(allReceived()) { 259 all_received=true; 260 if(stop == 0) 261 stop=System.currentTimeMillis(); 262 try { 263 sendResults(); 264 } 265 catch(Exception e) { 266 e.printStackTrace(); 267 } 268 if(!this.sender) 269 dumpSenders(); 270 synchronized(this) { 271 this.notify(); 272 } 273 } 274 } 275 } 276 else { 277 System.err.println("-- sender " + sender + " not found in senders hashmap"); 278 } 279 } 280 281 void sendResults() throws Exception { 282 Data d=new Data(Data.RESULTS); 283 byte[] buf; 284 d.results=(HashMap)this.senders.clone(); 285 buf=generatePayload(d, null); 286 transport.send(null, buf); 287 } 288 289 boolean allReceived() { 290 MemberInfo mi; 291 292 for(Iterator it=this.senders.values().iterator(); it.hasNext();) { 293 mi=(MemberInfo)it.next(); 294 if(mi.done == false) 295 return false; 296 } 297 return true; 298 } 299 300 301 void sendMessages() throws Exception { 302 long total_msgs=0; 303 int msgSize=Integer.parseInt(config.getProperty("msg_size")); 304 int num_msgs=Integer.parseInt(config.getProperty("num_msgs")); 305 int logInterval=Integer.parseInt(config.getProperty("log_interval")); 306 byte[] buf=new byte[msgSize]; 308 for(int k=0; k < msgSize; k++) 309 buf[k]='.'; 310 Data d=new Data(Data.DATA); 311 byte[] payload=generatePayload(d, buf); 312 for(int i=0; i < num_msgs; i++) { 313 transport.send(null, payload); 314 total_msgs++; 315 if(total_msgs % 1000 == 0) { 316 System.out.println("++ sent " + total_msgs); 317 } 318 if(total_msgs % logInterval == 0) { 319 } 322 } 323 } 324 325 326 byte[] generatePayload(Data d, byte[] buf) throws Exception { 327 byte[] tmp=buf != null? buf : Util.objectToByteBuffer(d); 328 byte[] payload=new byte[tmp.length +1]; 329 payload[0]=intToByte(d.getType()); 330 System.arraycopy(tmp, 0, payload, 1, tmp.length); 331 return payload; 332 } 333 334 private byte intToByte(int type) { 335 switch(type) { 336 case Data.DATA: return 1; 337 case Data.DISCOVERY_REQ: return 2; 338 case Data.DISCOVERY_RSP: return 3; 339 case Data.DONE: return 4; 340 case Data.RESULTS: return 5; 341 default: return 0; 342 } 343 } 344 345 346 void fetchResults() throws Exception { 347 System.out.println("-- sent all messages. Asking receivers if they received all messages\n"); 348 349 int expected_responses=this.members.size(); 350 351 Data d2=new Data(Data.DONE); 354 byte[] tmp=generatePayload(d2, null); 355 System.out.println("-- fetching results (from " + expected_responses + " members)"); 356 synchronized(this.results) { 357 while((results.size()) < expected_responses) { 358 transport.send(null, tmp); 359 this.results.wait(1000); 360 } 361 } 362 System.out.println("-- received all responses"); 363 } 364 365 366 void dumpResults() { 367 Object member; 368 Map.Entry entry; 369 HashMap map; 370 StringBuffer sb=new StringBuffer (); 371 sb.append("\n-- results:\n\n"); 372 373 for(Iterator it=results.entrySet().iterator(); it.hasNext();) { 374 entry=(Map.Entry)it.next(); 375 member=entry.getKey(); 376 map=(HashMap)entry.getValue(); 377 sb.append("-- results from ").append(member).append(":\n"); 378 dump(map, sb); 379 sb.append('\n'); 380 } 381 System.out.println(sb.toString()); 382 if(log.isInfoEnabled()) log.info(sb.toString()); 383 } 384 385 386 void dumpSenders() { 387 StringBuffer sb=new StringBuffer (); 388 dump(this.senders, sb); 389 System.out.println(sb.toString()); 390 } 391 392 void dump(HashMap map, StringBuffer sb) { 393 Map.Entry entry; 394 Object mySender; 395 MemberInfo mi; 396 MemberInfo combined=new MemberInfo(0); 397 combined.start = Long.MAX_VALUE; 398 combined.stop = Long.MIN_VALUE; 399 400 for(Iterator it2=map.entrySet().iterator(); it2.hasNext();) { 401 entry=(Map.Entry)it2.next(); 402 mySender=entry.getKey(); 403 mi=(MemberInfo)entry.getValue(); 404 combined.start=Math.min(combined.start, mi.start); 405 combined.stop=Math.max(combined.stop, mi.stop); 406 combined.num_msgs_expected+=mi.num_msgs_expected; 407 combined.num_msgs_received+=mi.num_msgs_received; 408 combined.total_bytes_received+=mi.total_bytes_received; 409 sb.append("sender: ").append(mySender).append(": ").append(mi).append('\n'); 410 } 411 sb.append("\ncombined: ").append(combined).append('\n'); 412 } 413 414 415 String dumpStats(long received_msgs) { 416 StringBuffer sb=new StringBuffer (); 417 if(gnuplot_output) 418 sb.append(received_msgs).append(' '); 419 else 420 sb.append("\nmsgs_received=").append(received_msgs); 421 422 if(gnuplot_output) 423 sb.append(Runtime.getRuntime().freeMemory() / 1000.0).append(' '); 424 else 425 sb.append(", free_mem=").append(Runtime.getRuntime().freeMemory() / 1000.0); 426 427 if(gnuplot_output) 428 sb.append(Runtime.getRuntime().totalMemory() / 1000.0).append(' '); 429 else 430 sb.append(", total_mem=").append(Runtime.getRuntime().totalMemory() / 1000.0).append('\n'); 431 432 dumpThroughput(sb, received_msgs); 433 return sb.toString(); 434 } 435 436 void dumpThroughput(StringBuffer sb, long received_msgs) { 437 double tmp; 438 long current=System.currentTimeMillis(); 439 440 if(current - start == 0 || current - last_dump == 0) 441 return; 442 443 tmp=(1000 * received_msgs) / (current - start); 444 if(gnuplot_output) 445 sb.append(tmp).append(' '); 446 else 447 sb.append("total_msgs_sec=").append(tmp).append(" [msgs/sec]"); 448 449 tmp=(received_msgs * msg_size) / (current - start); 450 if(gnuplot_output) 451 sb.append(tmp).append(' '); 452 else 453 sb.append("\ntotal_throughput=").append(tmp).append(" [KB/sec]"); 454 455 tmp=(1000 * log_interval) / (current - last_dump); 456 if(gnuplot_output) 457 sb.append(tmp).append(' '); 458 else { 459 sb.append("\nrolling_msgs_sec (last ").append(log_interval).append(" msgs)="); 460 sb.append(tmp).append(" [msgs/sec]"); 461 } 462 463 tmp=(log_interval * msg_size) / (current - last_dump); 464 if(gnuplot_output) 465 sb.append(tmp).append(' '); 466 else { 467 sb.append("\nrolling_throughput (last ").append(log_interval).append(" msgs)="); 468 sb.append(tmp).append(" [KB/sec]\n"); 469 } 470 last_dump=current; 471 } 472 473 474 void runDiscoveryPhase() throws Exception { 475 Data d=new Data(Data.DISCOVERY_REQ); 476 transport.send(null, generatePayload(d, null)); 477 sendDiscoveryResponse(); 478 479 synchronized(this.members) { 480 System.out.println("-- waiting for " + num_members + " members to join"); 481 while(this.members.size() < num_members) { 482 this.members.wait(); 483 System.out.println("-- members: " + this.members.size()); 484 } 485 } 486 } 487 488 void sendDiscoveryResponse() throws Exception { 489 Data d2=new Data(Data.DISCOVERY_RSP); 490 if(sender) { 491 d2.sender=true; 492 d2.num_msgs=Long.parseLong(config.getProperty("num_msgs")); 493 } 494 transport.send(null, generatePayload(d2, null)); 495 } 496 497 498 public static void main(String [] args) { 499 Properties config=new Properties(); 500 boolean sender=false, verbose=false; 501 Test t=null; 502 503 for(int i=0; i < args.length; i++) { 504 if("-sender".equals(args[i])) { 505 config.put("sender", "true"); 506 sender=true; 507 continue; 508 } 509 if("-receiver".equals(args[i])) { 510 config.put("sender", "false"); 511 sender=false; 512 continue; 513 } 514 if("-config".equals(args[i])) { 515 String config_file=args[++i]; 516 config.put("config", config_file); 517 continue; 518 } 519 if("-props".equals(args[i])) { 520 String props=args[++i]; 521 config.put("props", props); 522 continue; 523 } 524 if("-verbose".equals(args[i])) { 525 verbose=true; 526 continue; 527 } 528 help(); 529 return; 530 } 531 532 try { 533 t=new Test(); 534 t.start(config, verbose); 535 t.runDiscoveryPhase(); 536 if(sender) { 537 t.sendMessages(); 538 t.fetchResults(); 539 t.dumpResults(); 540 } 541 else { 542 synchronized(t) { 543 t.wait(); 544 } 545 Util.sleep(2000); 546 } 547 } 548 catch(Exception e) { 549 e.printStackTrace(); 550 } 551 finally { 552 if(t != null) 553 t.stop(); 554 } 555 } 556 557 558 static void help() { 559 System.out.println("Test [-help] ([-sender] | [-receiver]) [-config <config file>] [-props <stack config>] [-verbose]"); 560 } 561 562 563 } 564 | Popular Tags |