1 3 package org.jgroups.tests; 4 5 6 import org.jgroups.Address; 7 import org.jgroups.Message; 8 import org.jgroups.stack.IpAddress; 9 import org.jgroups.stack.NakReceiverWindow; 10 import org.jgroups.stack.Retransmitter; 11 import org.jgroups.util.Util; 12 13 import java.io.IOException ; 14 15 16 22 public class NakReceiverWindowStressTest implements Retransmitter.RetransmitCommand { 23 NakReceiverWindow win=null; 24 Address sender=null; 25 int num_msgs=1000, prev_value=0; 26 double discard_prob=0.0; long start, stop; 28 boolean trace=false; 29 boolean debug=false; 30 31 32 public NakReceiverWindowStressTest(int num_msgs, double discard_prob, boolean trace) { 33 this.num_msgs=num_msgs; 34 this.discard_prob=discard_prob; 35 this.trace=trace; 36 } 37 38 39 public void retransmit(long first_seqno, long last_seqno, Address sender) { 40 for(long i=first_seqno; i <= last_seqno; i++) { 41 if(debug) 42 out("-- xmit: " + i); 43 Message m=new Message(null, sender, new Long (i)); 44 win.add(i, m); 45 } 46 } 47 48 49 public void start() throws IOException { 50 System.out.println("num_msgs=" + num_msgs + "\ndiscard_prob=" + discard_prob); 51 52 sender=new IpAddress("localhost", 5555); 53 win=new NakReceiverWindow(sender, this, 1); 54 start=System.currentTimeMillis(); 55 sendMessages(num_msgs); 56 } 57 58 59 void sendMessages(int num_msgs) { 60 Message msg; 61 62 for(long i=1; i <= num_msgs; i++) { 63 if(discard_prob > 0 && Util.tossWeightedCoin(discard_prob) && i <= num_msgs) { 64 if(debug) out("-- discarding " + i); 65 } 66 else { 67 if(debug) out("-- adding " + i); 68 win.add(i, new Message(null, null, new Long (i))); 69 if(trace && i % 1000 == 0) 70 System.out.println("-- added " + i); 71 while((msg=win.remove()) != null) 72 processMessage(msg); 73 } 74 } 75 while(true) { 76 while((msg=win.remove()) != null) 77 processMessage(msg); 78 } 79 } 80 81 82 void processMessage(Message msg) { 83 long i; 84 85 i=((Long )msg.getObject()).longValue(); 86 if(prev_value + 1 != i) { 87 System.err.println("** processMessage(): removed seqno (" + i + ") is not 1 greater than " + 88 "previous value (" + prev_value + ')'); 89 System.exit(0); 90 } 91 prev_value++; 92 if(trace && i % 1000 == 0) 93 System.out.println("Removed " + i); 94 if(i == num_msgs) { 95 stop=System.currentTimeMillis(); 96 long total=stop-start; 97 double msgs_per_sec=num_msgs / (total/1000.0); 98 double msgs_per_ms=num_msgs / (double)total; 99 System.out.println("Inserting and removing " + num_msgs + 100 " messages into NakReceiverWindow took " + total + "ms"); 101 System.out.println("Msgs/sec: " + msgs_per_sec + ", msgs/ms: " + msgs_per_ms); 102 System.out.println("<enter> to terminate"); 103 try { 104 System.in.read(); 105 } 106 catch(Exception ex) { 107 System.err.println(ex); 108 } 109 System.exit(0); 110 } 111 } 112 113 114 void out(String msg) { 115 System.out.println(msg); 116 } 117 118 119 public static void main(String [] args) { 120 NakReceiverWindowStressTest test; 121 int num_msgs=1000; 122 double discard_prob=0.0; 123 boolean trace=false; 124 125 126 for(int i=0; i < args.length; i++) { 127 if("-help".equals(args[i])) { 128 help(); 129 return; 130 } 131 if("-num_msgs".equals(args[i])) { 132 num_msgs=Integer.parseInt(args[++i]); 133 continue; 134 } 135 if("-discard".equals(args[i])) { 136 discard_prob=Double.parseDouble(args[++i]); 137 continue; 138 } 139 if("-trace".equals(args[i])) { 140 trace=true; 141 continue; 142 } 143 } 144 145 146 test=new NakReceiverWindowStressTest(num_msgs, discard_prob, trace); 147 try { 148 test.start(); 149 } 150 catch(IOException e) { 151 e.printStackTrace(); 152 } 153 } 154 155 156 static void help() { 157 System.out.println("NakReceiverWindowStressTest [-help] [-num_msgs <number>] [-discard <probability>] " + 158 "[-trace]"); 159 } 160 161 } 162 | Popular Tags |