KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > tests > RoundTripMulticast


1 package org.jgroups.tests;
2
3 import org.jgroups.Global;
4 import org.jgroups.ReceiverAdapter;
5 import org.jgroups.stack.IpAddress;
6 import org.jgroups.util.Util;
7
8 import java.io.IOException JavaDoc;
9 import java.net.*;
10 import java.nio.ByteBuffer JavaDoc;
11
12 /**
13  * Class that measure RTT between a client and server using multicast sockets
14  * @author Bela Ban
15  * @version $Id: RoundTripMulticast.java,v 1.2 2006/08/08 06:28:19 belaban Exp $
16  */

17 public class RoundTripMulticast extends ReceiverAdapter {
18     MulticastSocket mcast_recv_sock; // to receive mcast traffic
19
MulticastSocket mcast_send_sock; // to send mcast traffic
20
DatagramSocket ucast_sock; // to receive and send unicast traffic
21
InetAddress bind_addr, mcast_addr;
22     int mcast_port=7500;
23     int num=1000;
24     int msg_size=10;
25     boolean server=false;
26     final byte[] RSP_BUF=new byte[]{1}; // 1=response
27
int num_responses=0;
28     final Object JavaDoc mutex=new Object JavaDoc();
29     IpAddress local_addr;
30
31
32     interface Receiver {
33         void receive(byte[] buffer, int offset, int length, InetAddress sender, int sender_port);
34     }
35
36
37     private void start(boolean server, int num, int msg_size, InetAddress bind_addr,
38                        InetAddress mcast_addr, int mcast_port) throws Exception JavaDoc {
39         this.server=server;
40         this.num=num;
41         this.msg_size=msg_size;
42         this.bind_addr=bind_addr;
43         this.mcast_addr=mcast_addr;
44         this.mcast_port=mcast_port;
45
46         // SocketAddress saddr=new InetSocketAddress(bind_addr, mcast_port);
47
mcast_send_sock=new MulticastSocket(mcast_port);
48         mcast_send_sock.setTimeToLive(2);
49         mcast_send_sock.setInterface(bind_addr);
50         SocketAddress group=new InetSocketAddress(mcast_addr, mcast_port);
51         mcast_send_sock.joinGroup(group, null);
52
53         mcast_recv_sock=new MulticastSocket(mcast_port);
54         mcast_recv_sock.setTimeToLive(2);
55         mcast_recv_sock.setInterface(bind_addr);
56         mcast_recv_sock.joinGroup(group, null);
57
58         ucast_sock=new DatagramSocket(0, bind_addr);
59         ucast_sock.setTrafficClass(16); // 0x10
60
local_addr=new IpAddress(ucast_sock.getLocalAddress(), ucast_sock.getLocalPort());
61
62
63         if(server) {
64             Receiver r=new Receiver() {
65                 public void receive(byte[] buf, int offset, int length, InetAddress sender, int sender_port) {
66                     ByteBuffer JavaDoc buffer=ByteBuffer.wrap(buf, offset, length);
67                     byte r=buffer.get();
68                     // System.out.println("received " + (r == 0? "request" : "response"));
69
short len=buffer.getShort();
70                     byte[] tmp=new byte[len];
71                     buffer.get(tmp, 0, len);
72                     try {
73                         IpAddress real_sender=(IpAddress)Util.streamableFromByteBuffer(IpAddress.class, tmp);
74                         DatagramPacket packet=new DatagramPacket(RSP_BUF, 0, RSP_BUF.length, real_sender.getIpAddress(), real_sender.getPort());
75                         ucast_sock.send(packet); // send the response via DatagramSocket
76
}
77                     catch(Exception JavaDoc e) {
78                         e.printStackTrace();
79                     }
80                 }
81             };
82             ReceiverThread rt=new ReceiverThread(r, mcast_recv_sock);
83             rt.start();
84
85             System.out.println("server started (ctrl-c to kill)");
86             while(true) {
87                 Util.sleep(60000);
88             }
89         }
90         else {
91             System.out.println("sending " + num + " requests");
92             sendRequests();
93         }
94
95         mcast_recv_sock.close();
96         mcast_send_sock.close();
97         ucast_sock.close();
98     }
99
100
101
102     private void sendRequests() throws Exception JavaDoc {
103         byte[] marshalled_addr=Util.streamableToByteBuffer(local_addr);
104         int length=Global.BYTE_SIZE + // request or response byte
105
Global.SHORT_SIZE + // length of marshalled IpAddress
106
marshalled_addr.length +
107                 msg_size;
108         long start, stop, total;
109         double requests_per_sec;
110         double ms_per_req;
111         int print=num / 10;
112         int count=0;
113
114         num_responses=0;
115
116         ByteBuffer JavaDoc buffer=ByteBuffer.allocate(length);
117         buffer.put((byte)0); // request
118
buffer.putShort((short)marshalled_addr.length);
119         buffer.put(marshalled_addr, 0, marshalled_addr.length);
120         byte[] payload=new byte[msg_size];
121         buffer.put(payload, 0, payload.length);
122         byte[] array=buffer.array();
123
124         ReceiverThread mcast_receiver=new ReceiverThread(
125                 new Receiver() {
126                     public void receive(byte[] buffer, int offset, int length, InetAddress sender, int sender_port) {
127                         // System.out.println("mcast from " + sender + ":" + sender_port + " was discarded");
128
}
129                 },
130                 mcast_recv_sock
131         );
132         mcast_receiver.start();
133
134         ReceiverThread ucast_receiver=new ReceiverThread(
135                 new Receiver() {
136                     public void receive(byte[] buffer, int offset, int length, InetAddress sender, int sender_port) {
137                         synchronized(mutex) {
138                             num_responses++;
139                             mutex.notify();
140                         }
141                     }
142                 },
143                 ucast_sock);
144         ucast_receiver.start();
145
146         start=System.currentTimeMillis();
147         for(int i=0; i < num; i++) {
148             DatagramPacket packet=new DatagramPacket(array, 0, array.length, mcast_addr, mcast_port);
149             try {
150                 mcast_send_sock.send(packet);
151                 synchronized(mutex) {
152                     while(num_responses != count +1) {
153                         mutex.wait(1000);
154                     }
155                     count=num_responses;
156                     if(num_responses >= num) {
157                         System.out.println("received all responses (" + num_responses + ")");
158                         break;
159                     }
160                 }
161                 if(num_responses % print == 0) {
162                     System.out.println("- received " + num_responses);
163                 }
164             }
165             catch(Exception JavaDoc e) {
166                 e.printStackTrace();
167             }
168         }
169         stop=System.currentTimeMillis();
170
171
172         /*start=System.currentTimeMillis();
173         for(int i=0; i < num; i++) {
174             DatagramPacket packet=new DatagramPacket(array, 0, array.length, mcast_addr, mcast_port);
175             try {
176                 mcast_send_sock.send(packet);
177
178                 if(num_responses % print == 0) {
179                     System.out.println("- received " + num_responses);
180                 }
181                 synchronized(mutex) {
182                     if(num_responses >= num) {
183                         System.out.println("received all responses (" + num_responses + ")");
184                         break;
185                     }
186                     else
187                         mutex.wait();
188                 }
189             }
190             catch(Exception e) {
191                 e.printStackTrace();
192             }
193         }
194         stop=System.currentTimeMillis();*/

195         total=stop-start;
196         requests_per_sec=num / (total / 1000.0);
197         ms_per_req=total / (double)num;
198         System.out.println("Took " + total + "ms for " + num + " requests: " + requests_per_sec +
199                 " requests/sec, " + ms_per_req + " ms/request");
200     }
201
202
203     static class ReceiverThread implements Runnable JavaDoc {
204         Receiver receiver;
205         Thread JavaDoc thread;
206         DatagramSocket sock;
207         byte[] buf=new byte[65000];
208         DatagramPacket packet;
209
210         public ReceiverThread(Receiver r, DatagramSocket sock) {
211             this.receiver=r;
212             this.sock=sock;
213         }
214
215         public final void start() {
216             thread=new Thread JavaDoc(this);
217             thread.start();
218         }
219
220         public void stop() {
221             thread=null;
222             sock.close();
223         }
224
225         public void run() {
226             while(thread != null && thread.equals(Thread.currentThread())) {
227                 packet=new DatagramPacket(buf, 0, buf.length);
228                 try {
229                     sock.receive(packet);
230                     if(receiver != null) {
231                         receiver.receive(packet.getData(), packet.getOffset(), packet.getLength(), packet.getAddress(), packet.getPort());
232                     }
233                 }
234                 catch(IOException JavaDoc e) {
235                     break;
236                 }
237             }
238         }
239     }
240
241
242     public static void main(String JavaDoc[] args) throws Exception JavaDoc {
243         boolean server=false;
244         int num=100;
245         int msg_size=10; // 10 bytes
246
InetAddress bind_addr=null, mcast_addr=null;
247         int mcast_port=7500;
248
249         for(int i=0; i < args.length; i++) {
250             if(args[i].equals("-num")) {
251                 num=Integer.parseInt(args[++i]);
252                 continue;
253             }
254             if(args[i].equals("-server")) {
255                 server=true;
256                 continue;
257             }
258             if(args[i].equals("-size")) {
259                 msg_size=Integer.parseInt(args[++i]);
260                 continue;
261             }
262             if(args[i].equals("-bind_addr")) {
263                 bind_addr=InetAddress.getByName(args[++i]);
264                 continue;
265             }
266             if(args[i].equals("-mcast_addr")) {
267                 mcast_addr=InetAddress.getByName(args[++i]);
268                 continue;
269             }
270             if(args[i].equals("-mcast_port")) {
271                 mcast_port=Integer.parseInt(args[++i]);
272                 continue;
273             }
274             RoundTripMulticast.help();
275             return;
276         }
277
278         if(bind_addr == null)
279             bind_addr=InetAddress.getLocalHost();
280         if(mcast_addr == null)
281             mcast_addr=InetAddress.getByName("225.5.5.5");
282         new RoundTripMulticast().start(server, num, msg_size, bind_addr, mcast_addr, mcast_port);
283     }
284
285
286
287     private static void help() {
288         System.out.println("RoundTrip [-server] [-num <number of messages>] " +
289                 "[-size <size of each message (in bytes)>] [-bind_addr <bind address>] " +
290                 "[-mcast_addr <mcast addr>] [-mcast_port <mcast port>]");
291     }
292 }
293
Popular Tags