1 package org.jgroups.tests; 2 3 import org.jgroups.Address; 4 import org.jgroups.JChannel; 5 import org.jgroups.View; 6 import org.jgroups.blocks.GroupRequest; 7 import org.jgroups.blocks.RpcDispatcher; 8 import org.jgroups.util.Util; 9 10 import java.util.Vector ; 11 import java.util.concurrent.CyclicBarrier ; 12 import java.util.concurrent.atomic.AtomicInteger ; 13 import java.util.concurrent.atomic.AtomicLong ; 14 15 20 public class UnicastStressTest { 21 int num_channels=6; 22 int num_threads=1; int num_msgs=1000; int msg_size=4096; String props=null; 26 int buddies=1; 27 28 private JChannel[] channels; 29 private RpcDispatcher[] dispatchers; 30 private Receiver[] receivers; 31 32 final AtomicInteger msgs_received=new AtomicInteger (0); 33 final AtomicLong bytes_received=new AtomicLong (0); 34 35 final CyclicBarrier start_barrier; 36 final CyclicBarrier terminate_barrier; 37 38 39 public UnicastStressTest(String props, int num_channels, int num_threads, int num_msgs, int msg_size, int buddies) { 40 this.props=props; 41 this.num_channels=num_channels; 42 this.num_threads=num_threads; 43 this.num_msgs=num_msgs; 44 this.msg_size=msg_size; 45 this.buddies=buddies; 46 start_barrier=new CyclicBarrier (num_channels * num_threads +1); 47 terminate_barrier=new CyclicBarrier (num_channels +1); 48 if(buddies > num_channels) 49 throw new IllegalArgumentException ("buddies needs to be smaller than number of channels"); 50 } 51 52 53 private void start() throws Exception { 54 channels=new JChannel[num_channels]; 55 receivers=new Receiver[num_channels]; 56 dispatchers=new RpcDispatcher[num_channels]; 57 long start, stop; 58 59 int num_expected_msgs=num_threads * num_msgs * buddies; 60 int num_total_msgs=num_channels * num_threads * num_msgs; for(int i=0; i < channels.length; i++) { 62 channels[i]=new JChannel(props); 63 receivers[i]=new Receiver(terminate_barrier, bytes_received, msgs_received, num_expected_msgs, num_total_msgs); 64 dispatchers[i]=new RpcDispatcher(channels[i], null, null, receivers[i]); 65 channels[i].connect("x"); 66 } 67 68 for(int i=0; i < channels.length; i++) { 70 JChannel channel=channels[i]; 71 View view=channel.getView(); 72 Vector <Address> members=view.getMembers(); 73 if(members.size() != num_channels) { 74 throw new Exception ("cluster has not formed correctly, expected " + num_channels + " channels, found" + 75 " only " + members.size() + " (view: " + view + ")"); 76 } 77 Vector <Address> tmp=pickBuddies(members, channel.getLocalAddress()); 78 79 for(int j=0; j < num_threads; j++) { 80 Sender sender=new Sender(start_barrier, msg_size, num_msgs, dispatchers[i], channel.getLocalAddress(), tmp); 81 sender.start(); } 83 } 84 85 System.out.println("sending " + num_total_msgs + " msgs with " + num_threads + " threads over " + num_channels + " channels"); 86 87 start_barrier.await(); start=System.currentTimeMillis(); 89 90 91 terminate_barrier.await(); stop=System.currentTimeMillis(); 93 94 for(int i=0; i < dispatchers.length; i++) { 95 dispatchers[i].stop(); 96 } 97 for(int i=channels.length -1; i >= 0; i--) { 98 channels[i].close(); 99 } 100 101 printStats(stop - start); 102 } 103 104 private void printStats(long time) { 105 for(int i=0; i < receivers.length; i++) { 106 System.out.println("receiver #" + (i+1) + ": " + receivers[i].getNumReceivedMessages()); 107 } 108 System.out.println("total received messages for " + num_channels + " channels: " + msgs_received.get()); 109 System.out.println("total bytes received by " + num_channels + " channels: " + Util.printBytes(bytes_received.get())); 110 System.out.println("time: " + time + " ms"); 111 double msgs_per_sec=msgs_received.get() / (time /1000.0); 112 double throughput=bytes_received.get() / (time / 1000.0); 113 System.out.println("Message rate: " + msgs_per_sec + " msgs/sec"); 114 System.out.println("Throughput: " + Util.printBytes(throughput) + " / sec"); 115 } 116 117 private Vector <Address> pickBuddies(Vector <Address> members, Address local_addr) { 118 Vector <Address> retval=new Vector <Address>(); 119 int index=members.indexOf(local_addr); 120 if(index < 0) 121 return null; 122 for(int i=index +1; i <= index + buddies; i++) { 123 int real_index=i % members.size(); 124 Address buddy=members.get(real_index); 125 retval.add(buddy); 126 } 127 return retval; 128 } 129 130 131 public static class Receiver { 132 final AtomicInteger msgs; 133 final AtomicLong bytes; 134 final int num_expected_msgs, num_total_msgs, print; 135 final CyclicBarrier barrier; 136 final AtomicInteger num_received_msgs=new AtomicInteger (0); 137 138 139 public Receiver(CyclicBarrier barrier, AtomicLong bytes, AtomicInteger msgs, int num_expected_msgs, int num_total_msgs) { 140 this.barrier=barrier; 141 this.bytes=bytes; 142 this.msgs=msgs; 143 this.num_expected_msgs=num_expected_msgs; 144 this.num_total_msgs=num_total_msgs; 145 print=num_total_msgs / 10; 146 } 147 148 public int getNumReceivedMessages() {return num_received_msgs.get();} 149 150 public void receive(byte[] data) { 151 msgs.incrementAndGet(); 152 bytes.addAndGet(data.length); 153 154 int count=num_received_msgs.incrementAndGet(); 155 if(count % print == 0) { 156 System.out.println("received " + count + " msgs"); 157 } 158 159 if((count=num_received_msgs.get()) >= num_expected_msgs) { 160 try { 161 barrier.await(); 162 } 163 catch(Exception e) { 164 } 165 } 166 167 } 168 } 169 170 private static class Sender extends Thread { 171 private final CyclicBarrier barrier; 172 private final int num_msgs; 173 private final int msg_size; 174 private final RpcDispatcher disp; 175 private final Vector buddies; 176 177 178 public Sender(CyclicBarrier barrier, int msg_size, int num_msgs, RpcDispatcher disp, Address local_addr, Vector buddies) { 179 this.barrier=barrier; 180 this.msg_size=msg_size; 181 this.num_msgs=num_msgs; 182 this.disp=disp; 183 this.buddies=buddies; 184 setName("Sender (" + local_addr + " --> " + buddies + ")"); 185 } 186 187 public void run() { 188 final byte[] data=new byte[msg_size]; 189 final Object [] arg=new Object []{data}; 190 final Class [] types=new Class []{byte[].class}; 191 192 try { 193 barrier.await(); 194 } 195 catch(Exception e) { 196 } 197 198 for(int i=0; i < num_msgs; i++) { 199 disp.callRemoteMethods(buddies, "receive", arg, types, GroupRequest.GET_NONE, 5000, true); 200 } 201 } 202 } 203 204 205 206 public static void main(String [] args) throws Exception { 207 int num_channels=6; 208 int num_threads=10; int num_msgs=10000; int msg_size=4096; int buddies=1; 212 String props=null; 213 214 for(int i=0; i < args.length; i++) { 215 if(args[i].equalsIgnoreCase("-props")) { 216 props=args[++i]; 217 continue; 218 } 219 if(args[i].equalsIgnoreCase("-num_channels")) { 220 num_channels=Integer.parseInt(args[++i]); 221 continue; 222 } 223 if(args[i].equalsIgnoreCase("-num_threads")) { 224 num_threads=Integer.parseInt(args[++i]); 225 continue; 226 } 227 if(args[i].equalsIgnoreCase("-num_msgs")) { 228 num_msgs=Integer.parseInt(args[++i]); 229 continue; 230 } 231 if(args[i].equalsIgnoreCase("-msg_size")) { 232 msg_size=Integer.parseInt(args[++i]); 233 continue; 234 } 235 if(args[i].equalsIgnoreCase("-buddies")) { 236 buddies=Integer.parseInt(args[++i]); 237 continue; 238 } 239 help(); 240 return; 241 } 242 243 new UnicastStressTest(props, num_channels, num_threads, num_msgs, msg_size, buddies).start(); 244 } 245 246 247 248 249 private static void help() { 250 System.out.println("UnicastStressTest [-help] [-props <props>] [-num_channels <num>] " + 251 "[-num_threads <threads per channel>] [-num_msgs <number of msgs per thread>] [-msg_size <size in bytes>] " + 252 "[-buddies <num>]"); 253 } 254 } 255 | Popular Tags |