1 package org.jgroups.tests; 2 3 import org.jgroups.*; 4 import org.jgroups.stack.IpAddress; 5 import org.jgroups.stack.Protocol; 6 import org.jgroups.stack.ProtocolObserver; 7 import org.jgroups.util.Util; 8 9 import java.io.BufferedReader ; 10 import java.io.File ; 11 import java.io.FileWriter ; 12 import java.io.InputStreamReader ; 13 import java.util.Date ; 14 15 16 33 34 public class ContinousThroughputTest { 35 String props="UDP:" + 36 "PING(up_thread=false;down_thread=false):" + 37 "FD(timeout=1000;shun=false):" + 38 "STABLE(up_thread=false;down_thread=false):" + 39 "MERGE(up_thread=false;down_thread=false):" + 40 "NAKACK:" + 41 "FLUSH:" + 42 "GMS:" + 43 "VIEW_ENFORCER(up_thread=false;down_thread=false):" + 44 "QUEUE(up_thread=false;down_thread=false)"; 46 JChannel channel=null; 49 Thread sendThread, receiveThread; 50 boolean coordinator=false; 51 IpAddress my_addr=null; 52 View view; 53 BufferedReader reader; 54 float troughputSum=0, meanTroughput=0, minTroughput=10000, maxTroughput=0; 55 int numTests=0; 56 FileWriter logWriter; 57 Protocol prot=null; 58 59 62 63 public ContinousThroughputTest() { 64 sendThread=new Thread ("sendThread") { 65 public void run() { 66 parser(); 67 } 68 }; 69 receiveThread=new Thread ("receiveThread") { 70 public void run() { 71 checkChannel(); 72 } 73 }; 74 reader=new BufferedReader (new InputStreamReader (System.in)); 75 try { 76 channel=new JChannel(props); 77 channel.setOpt(Channel.BLOCK, Boolean.FALSE); 80 channel.setOpt(Channel.SUSPECT, Boolean.FALSE); 81 channel.connect("Janus"); 82 } 83 catch(Exception ex) { 84 System.out.println("Connection Failed!" + ex); 85 System.exit(1); 86 } 87 my_addr=(IpAddress)channel.getLocalAddress(); 88 89 try { 90 File log=new File ("ContinousThroughputTest" + my_addr.getIpAddress().getHostName() 91 + (System.currentTimeMillis() / 10000) + ".log"); 92 if(!log.exists()) { 93 log.createNewFile(); 94 } 95 logWriter=new FileWriter (log); 96 logWriter.write("ContinousThroughputTest.java log\r\n"); 97 logWriter.write("Date:" + new Date (System.currentTimeMillis()) + "\r\n"); 98 log("Protocol Stack is " + props); 99 System.out.println("Protocol Stack is " + props); 100 } 101 catch(Exception ex) { 102 System.out.println("File problems " + ex); 103 System.exit(5); 104 } 105 } 106 107 static void main(String [] args) { 108 ContinousThroughputTest perfTest=new ContinousThroughputTest(); 109 perfTest.go(); 110 } 111 112 void go() { 113 receiveThread.start(); 115 sendThread.start(); 117 } 118 119 123 124 public void checkChannel() { 125 String payload=null; 126 Object received=null; 127 Message msg=null; 128 boolean done=false; 129 long n; 130 int i=1; 131 132 System.out.println("Started receiving"); 133 try { 134 while(!done) { 135 received=channel.receive(0); 136 if(received instanceof Message) { 137 msg=(Message)received; 138 payload=(String )msg.getObject(); 139 System.out.println(payload); 140 if("stop".equalsIgnoreCase(payload)) { 141 done=true; 142 } 143 if("pingpong".equalsIgnoreCase(payload)) { 144 n=((Long )((Message)channel.receive(0)).getObject()).longValue(); 145 i=((Integer )((Message)channel.receive(0)).getObject()).intValue(); 146 log("Starting pingpong test. Rounds: " + n + " Bursts: " + i); 147 pingpongTest(n, i, false); 148 } 149 if("cping".equalsIgnoreCase(payload)) { 150 log("Starting cping test. Bursts: " + 1); 152 cpingTest(1, true); 153 } 154 if("sweep".equalsIgnoreCase(payload)) { 155 n=((Long )((Message)channel.receive(0)).getObject()).longValue(); 156 i=((Integer )((Message)channel.receive(0)).getObject()).intValue(); 157 log("Starting sweep test. Rounds: " + n + " initial burst: " + i); 158 sweep(n, i); 159 } 160 } 161 if(received instanceof View) { 162 view=(View)received; 163 System.out.println(view); 164 if(view.getMembers().elementAt(0).equals(my_addr)) { 165 System.out.println("I'm the new Coordinator"); 166 coordinator=true; 167 } 168 resetData(); 169 } 170 } 171 } 172 catch(Exception ex) { 173 System.out.println("checkChannel() :" + ex); 174 try { 175 logWriter.write("Stopped cause " + ex + "\r\n"); 176 } 177 catch(Exception e) { 178 } 179 System.exit(2); 180 } 181 System.out.println("Stopped Receiving"); 182 183 channel.disconnect(); 184 System.out.println("Disconnected from \"Janus\""); 185 channel.close(); 186 System.out.println("Channel Closed"); 187 System.exit(0); 188 } 189 190 193 public void parser() { 194 boolean done=false; 195 String input; 196 int number=0; 197 int burstlength=1; 198 199 System.out.println("Ready."); 200 try { 201 while(!done) { 202 input=reader.readLine(); 203 if("stop".equalsIgnoreCase(input)) { 204 done=true; 205 } 206 if("pingpong".equalsIgnoreCase(input)) { 207 number=askNumber(reader, "How many rounds?"); 208 burstlength=askNumber(reader, "Length of bursts?"); 209 channel.send(new Message(null, null, input)); 210 channel.send(new Message(null, null, new Long (number))); 211 channel.send(new Message(null, null, new Integer (burstlength))); 212 continue; 213 214 } 215 if("cping".equalsIgnoreCase(input)) { 216 channel.send(new Message(null, null, input)); 218 continue; 220 } 221 if("sweep".equalsIgnoreCase(input)) { 222 number=askNumber(reader, "Number of tests"); 223 burstlength=askNumber(reader, "Initial length of bursts?"); 224 channel.send(new Message(null, null, input)); 225 channel.send(new Message(null, null, new Long (number))); 226 channel.send(new Message(null, null, new Integer (burstlength))); 227 continue; 228 } 229 channel.send(new Message(null, null, input)); 230 } 231 } 232 catch(Exception ex) { 233 System.out.println(ex); 234 } 235 } 236 237 240 241 void sendBurst(long n) { 242 try { 243 byte[] buf=Util.objectToByteBuffer("Standard Mex"); 244 for(int i=0; i < n; i++) { 245 channel.send(new Message(null, null, buf)); 246 } 247 } 248 catch(Exception ex) { 249 System.out.println("sendBurst: " + ex); 250 } 251 } 252 253 254 257 258 void showStats(long start, long stop, long messages, int burstlength) { 259 String result; 260 long elapsedTime=(stop - start); 261 long troughPut=(messages * 1000) / elapsedTime; 262 maxTroughput=(maxTroughput > troughPut) ? maxTroughput : troughPut; 264 minTroughput=(minTroughput < troughPut) ? minTroughput : troughPut; 265 result="Elapsed Time: " + (stop - start) + 267 "| messages:" + messages + 268 "| burst length:" + burstlength + 269 "| Troughput:" + troughPut + 270 "| max: " + maxTroughput + 271 "| min: " + minTroughput + 272 "\r\n"; 273 System.out.println(result); 274 try { 275 logWriter.write(result); 276 logWriter.flush(); 277 } 278 catch(Exception ex) { 279 System.out.println("showStats():" + ex); 280 } 281 282 } 283 284 int askNumber(BufferedReader reader, String text) { 285 int number=0; 286 String input="10"; 287 System.out.println(text); 288 try { 289 input=reader.readLine(); 290 } 291 catch(Exception ex) { 292 System.out.println("AskNumber :" + ex); 293 } 294 295 number=Integer.parseInt(input); 296 return number; 297 } 298 299 302 303 void resetData() { 304 maxTroughput=0; 305 minTroughput=10000; 306 meanTroughput=0; 307 numTests=0; 308 troughputSum=0; 309 } 310 311 316 void pingpongTest(long n, int burst_length, boolean partialResultsPrint) { 317 long i=0; 318 long start=System.currentTimeMillis(); 319 long tempstart=System.currentTimeMillis(); 320 long stop, throughput; 321 try { 322 for(i=0; i < n; i++) { 323 for(int k=0; k < burst_length; k++) 324 channel.send(new Message(null, null, new Long (i))); 325 for(int j=0; j < (view.size() * burst_length); j++) { 326 channel.receive(20000); 327 } 328 if(partialResultsPrint && ((i % 1000) == 0)) { 329 if(i == 0) continue; 330 stop=System.currentTimeMillis(); 331 throughput=(1000000 / (stop - tempstart)) * view.size() * burst_length; 332 try { 333 System.out.println(new Date (stop).toString() + " : " + throughput); 334 logWriter.write(new Date (stop).toString() + " : " + throughput); 335 logWriter.write("\r\n"); 336 logWriter.flush(); 337 tempstart=System.currentTimeMillis(); 338 } 339 catch(Exception ex) { 340 ex.printStackTrace(); 341 } 342 } 343 } 344 } 345 catch(TimeoutException ex) { 346 System.out.println("Timeout Receiving, round: " + i); 347 System.exit(5); 348 } 349 catch(Exception ex) { 350 ex.printStackTrace(); 351 System.exit(4); 352 } 353 stop=System.currentTimeMillis(); 354 showStats(start, stop, n * view.size() * burst_length, burst_length); 355 } 356 357 void sweep(long tests, int burstlenght) { 358 long messagespertest=10000; 359 for(int i=0; i < tests; i++) { 360 burstlenght+=i; 361 pingpongTest(messagespertest / burstlenght, burstlenght, false); 362 } 363 } 364 365 368 void cpingTest(int burst_lenght, boolean printoutput) { 369 Object recvd=null; 370 long start=System.currentTimeMillis(); 371 for(long i=1; i < Long.MAX_VALUE; i++) { 372 try { 374 channel.send(null, null, "cping"); 375 for(int j=0; j < burst_lenght * view.size();) { 376 recvd=channel.receive(10000); 377 if(recvd instanceof View) { 378 view=(View)recvd; 379 System.out.println(view); 380 log(view.toString()); 381 } 382 else { 383 j++; 384 } 385 } 386 } 387 catch(TimeoutException tex) { 388 try { 389 channel.send(new Message(null, null, "cping")); 390 System.out.println("Resent a message for timeout"); 391 log("Resent a message for timeout"); 392 } 393 catch(Exception ex) { 394 System.exit(9); 395 } 396 } 397 catch(Exception ex) { 398 System.exit(9); 399 } 400 if((i % 1000) == 0) { 401 long stop=System.currentTimeMillis(); 402 long throughput=i * 1000 * view.size() / (stop - start); 403 System.out.println("Througputh = " + throughput); 404 log("Througputh = " + throughput); 405 start=System.currentTimeMillis(); 406 i=0; 407 } 408 } 409 } 410 411 414 415 public static class MessageLenghtObserver implements ProtocolObserver { 416 417 public void setProtocol(Protocol prot) { 418 419 throw new java.lang.UnsupportedOperationException ("Method setProtocol() not yet implemented."); 420 } 421 422 public boolean up(Event evt, int num_evts) { 423 424 throw new java.lang.UnsupportedOperationException ("Method up() not yet implemented."); 425 } 426 427 public boolean passUp(Event evt) { 428 return true; 429 } 430 431 public boolean down(Event evt, int num_evts) { 432 return true; 433 } 434 435 public boolean passDown(Event evt) { 436 byte[] buf=null; 437 if(evt.getType() == Event.MSG) 438 try { 439 buf=Util.objectToByteBuffer(evt.getArg()); 440 System.out.println("UDP: sending a message of " + 441 buf.length + 442 "bytes"); 443 System.out.println("Message was :"); 444 System.out.println(new String (buf)); 445 } 446 catch(Exception ex) { 447 448 } 449 return true; 450 } 451 } 452 453 void log(String str) { 454 try { 455 logWriter.write(str + "\r\n"); 456 logWriter.flush(); 457 } 458 catch(Exception ex) { 459 460 } 461 } 462 } 463 | Popular Tags |