package org.jgroups.tests;

import org.jgroups.Global;
import org.jgroups.ReceiverAdapter;
import org.jgroups.stack.IpAddress;
import org.jgroups.util.Util;

import java.io.IOException;
import java.net.*;
import java.nio.ByteBuffer;


/**
 * Measures request/response round-trip times over IP multicast.
 * <p>
 * A client multicasts <code>num</code> requests, one at a time, and waits for a response after each
 * one. A server (started with <code>-server</code>) listens on the multicast group and answers every
 * request with a 1-byte unicast datagram sent to the address embedded in the request.
 * <p>
 * Request layout (see {@link #sendRequests()}): 1 type byte (0), a short holding the length of the
 * marshalled sender address, the marshalled sender address, then <code>msg_size</code> payload bytes.
 */
public class RoundTripMulticast extends ReceiverAdapter {
    MulticastSocket mcast_recv_sock;
    MulticastSocket mcast_send_sock;
    DatagramSocket  ucast_sock;
    InetAddress     bind_addr, mcast_addr;
    int             mcast_port=7500;
    int             num=1000;
    int             msg_size=10;
    boolean         server=false;

    /** The (constant) response sent by the server for every request */
    final byte[]    RSP_BUF=new byte[]{1};

    /** Number of unicast responses received so far; guarded by {@link #mutex} */
    int             num_responses=0;
    final Object    mutex=new Object();

    /** Address and port of {@link #ucast_sock}; marshalled into every request */
    IpAddress       local_addr;


    /** Callback invoked by a {@link ReceiverThread} for every datagram it receives */
    interface Receiver {
        void receive(byte[] buffer, int offset, int length, InetAddress sender, int sender_port);
    }


    /**
     * Creates the sockets, then either runs as server (never returns normally) or sends
     * <code>num</code> requests as client. All sockets are closed when this method exits, whether
     * normally or with an exception.
     *
     * @param server     true to run as server, false to run as client
     * @param num        number of requests to send (client only)
     * @param msg_size   number of payload bytes appended to each request
     * @param bind_addr  local interface to use for all sockets
     * @param mcast_addr multicast group address
     * @param mcast_port multicast port
     * @throws Exception if a socket cannot be created or configured, or the address cannot be marshalled
     */
    private void start(boolean server, int num, int msg_size, InetAddress bind_addr,
                       InetAddress mcast_addr, int mcast_port) throws Exception {
        this.server=server;
        this.num=num;
        this.msg_size=msg_size;
        this.bind_addr=bind_addr;
        this.mcast_addr=mcast_addr;
        this.mcast_port=mcast_port;

        try {
            SocketAddress group=new InetSocketAddress(mcast_addr, mcast_port);

            mcast_send_sock=new MulticastSocket(mcast_port);
            mcast_send_sock.setTimeToLive(2);
            mcast_send_sock.setInterface(bind_addr);
            mcast_send_sock.joinGroup(group, null);

            mcast_recv_sock=new MulticastSocket(mcast_port);
            mcast_recv_sock.setTimeToLive(2);
            mcast_recv_sock.setInterface(bind_addr);
            mcast_recv_sock.joinGroup(group, null);

            // port 0: let the system pick a free port
            ucast_sock=new DatagramSocket(0, bind_addr);
            ucast_sock.setTrafficClass(16); // 0x10
            local_addr=new IpAddress(ucast_sock.getLocalAddress(), ucast_sock.getLocalPort());

            if(server) {
                runServer();
            }
            else {
                System.out.println("sending " + num + " requests");
                sendRequests();
            }
        }
        finally {
            // Closing the sockets also terminates the receiver threads: their blocked receive()
            // fails with an IOException, which makes ReceiverThread.run() exit.
            closeSockets();
        }
    }


    /**
     * Server loop: answers every multicast request with {@link #RSP_BUF}, sent via unicast to the
     * address marshalled into the request. Blocks forever (until the process is killed).
     */
    private void runServer() {
        Receiver handler=new Receiver() {
            public void receive(byte[] buf, int offset, int length, InetAddress sender, int sender_port) {
                // Parsing happens inside the try block: a truncated or malformed datagram must not
                // kill the receiver thread with an unchecked exception.
                try {
                    ByteBuffer buffer=ByteBuffer.wrap(buf, offset, length);
                    buffer.get(); // skip the type byte (unused)
                    short len=buffer.getShort();
                    byte[] tmp=new byte[len];
                    buffer.get(tmp, 0, len);
                    IpAddress real_sender=(IpAddress)Util.streamableFromByteBuffer(IpAddress.class, tmp);
                    DatagramPacket packet=new DatagramPacket(RSP_BUF, 0, RSP_BUF.length,
                                                             real_sender.getIpAddress(), real_sender.getPort());
                    ucast_sock.send(packet);
                }
                catch(Exception e) {
                    e.printStackTrace();
                }
            }
        };
        new ReceiverThread(handler, mcast_recv_sock).start();

        System.out.println("server started (ctrl-c to kill)");
        while(true) {
            Util.sleep(60000);
        }
    }


    /** Closes all sockets which have been created so far */
    private void closeSockets() {
        if(mcast_recv_sock != null)
            mcast_recv_sock.close();
        if(mcast_send_sock != null)
            mcast_send_sock.close();
        if(ucast_sock != null)
            ucast_sock.close();
    }


    /**
     * Client side: multicasts up to <code>num</code> requests, waiting after each one until at least
     * one new unicast response has arrived, then prints timing statistics.
     * <p>
     * There is no overall timeout: if a response never arrives, this method keeps waiting.
     *
     * @throws Exception if the local address cannot be marshalled
     */
    private void sendRequests() throws Exception {
        byte[]  marshalled_addr=Util.streamableToByteBuffer(local_addr);
        int     length=Global.BYTE_SIZE                // type
                + Global.SHORT_SIZE                    // length of the marshalled address
                + marshalled_addr.length
                + msg_size;                            // payload
        long    start, stop, total;
        double  requests_per_sec;
        double  ms_per_req;
        int     print=Math.max(1, num / 10);           // never 0: used as a modulus below
        int     count=0;

        num_responses=0;

        ByteBuffer buffer=ByteBuffer.allocate(length);
        buffer.put((byte)0); // type: request
        buffer.putShort((short)marshalled_addr.length);
        buffer.put(marshalled_addr, 0, marshalled_addr.length);
        byte[] payload=new byte[msg_size];
        buffer.put(payload, 0, payload.length);
        byte[] array=buffer.array();

        // Reads and discards everything arriving on the multicast receive socket
        // (the client has joined the group too)
        ReceiverThread mcast_receiver=new ReceiverThread(
                new Receiver() {
                    public void receive(byte[] buffer, int offset, int length, InetAddress sender, int sender_port) {
                    }
                },
                mcast_recv_sock
        );
        mcast_receiver.start();

        // Counts unicast responses and wakes up the sender loop below
        ReceiverThread ucast_receiver=new ReceiverThread(
                new Receiver() {
                    public void receive(byte[] buffer, int offset, int length, InetAddress sender, int sender_port) {
                        synchronized(mutex) {
                            num_responses++;
                            mutex.notify();
                        }
                    }
                },
                ucast_sock);
        ucast_receiver.start();

        start=System.currentTimeMillis();
        for(int i=0; i < num; i++) {
            DatagramPacket packet=new DatagramPacket(array, 0, array.length, mcast_addr, mcast_port);
            try {
                mcast_send_sock.send(packet);
                synchronized(mutex) {
                    // "<=" rather than "!= count+1": if more than one response arrives before this
                    // thread gets the lock back, an equality test would never be satisfied again
                    while(num_responses <= count) {
                        mutex.wait(1000);
                    }
                    count=num_responses;
                    if(count >= num) {
                        System.out.println("received all responses (" + count + ")");
                        break;
                    }
                }
                // count is the snapshot taken under the lock
                if(count % print == 0) {
                    System.out.println("- received " + count);
                }
            }
            catch(Exception e) {
                e.printStackTrace();
            }
        }
        stop=System.currentTimeMillis();

        total=stop-start;
        requests_per_sec=num / (total / 1000.0);
        ms_per_req=total / (double)num;
        System.out.println("Took " + total + "ms for " + num + " requests: " + requests_per_sec +
                " requests/sec, " + ms_per_req + " ms/request");
    }


    /**
     * Receives datagrams from a socket in a loop on its own thread and hands each one to a
     * {@link Receiver}. The loop ends when {@link #stop()} is called or the socket is closed.
     */
    static class ReceiverThread implements Runnable {
        Receiver        receiver;

        /** Written by stop() and read by run() on different threads, hence volatile */
        volatile Thread thread;
        DatagramSocket  sock;
        byte[]          buf=new byte[65000];
        DatagramPacket  packet;

        public ReceiverThread(Receiver r, DatagramSocket sock) {
            this.receiver=r;
            this.sock=sock;
        }

        public final void start() {
            thread=new Thread(this);
            thread.start();
        }

        /** Stops the loop; closes the socket to unblock a pending receive() */
        public void stop() {
            thread=null;
            sock.close();
        }

        public void run() {
            while(thread != null && thread.equals(Thread.currentThread())) {
                packet=new DatagramPacket(buf, 0, buf.length);
                try {
                    sock.receive(packet);
                    if(receiver != null) {
                        receiver.receive(packet.getData(), packet.getOffset(), packet.getLength(),
                                         packet.getAddress(), packet.getPort());
                    }
                }
                catch(IOException e) {
                    // socket closed (or broken): terminate
                    break;
                }
            }
        }
    }


    /**
     * Parses the command line and starts the test. Prints the usage message and returns if an
     * option is unknown or is missing its value.
     */
    public static void main(String[] args) throws Exception {
        boolean     server=false;
        int         num=100;
        int         msg_size=10;
        InetAddress bind_addr=null, mcast_addr=null;
        int         mcast_port=7500;

        for(int i=0; i < args.length; i++) {
            String arg=args[i];
            if(arg.equals("-server")) {
                server=true;
                continue;
            }
            // Every remaining option takes a value; a last argument here is therefore either an
            // unknown option or an option without its value
            if(i + 1 >= args.length) {
                RoundTripMulticast.help();
                return;
            }
            if(arg.equals("-num")) {
                num=Integer.parseInt(args[++i]);
                continue;
            }
            if(arg.equals("-size")) {
                msg_size=Integer.parseInt(args[++i]);
                continue;
            }
            if(arg.equals("-bind_addr")) {
                bind_addr=InetAddress.getByName(args[++i]);
                continue;
            }
            if(arg.equals("-mcast_addr")) {
                mcast_addr=InetAddress.getByName(args[++i]);
                continue;
            }
            if(arg.equals("-mcast_port")) {
                mcast_port=Integer.parseInt(args[++i]);
                continue;
            }
            RoundTripMulticast.help();
            return;
        }

        if(bind_addr == null)
            bind_addr=InetAddress.getLocalHost();
        if(mcast_addr == null)
            mcast_addr=InetAddress.getByName("225.5.5.5");
        new RoundTripMulticast().start(server, num, msg_size, bind_addr, mcast_addr, mcast_port);
    }


    private static void help() {
        System.out.println("RoundTrip [-server] [-num <number of messages>] " +
                "[-size <size of each message (in bytes)>] [-bind_addr <bind address>] " +
                "[-mcast_addr <mcast addr>] [-mcast_port <mcast port>]");
    }
}