KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > tests > McastDiscovery


1 // $Id: McastDiscovery.java,v 1.1 2005/06/23 13:31:10 belaban Exp $
2

3 package org.jgroups.tests;
4
import org.jgroups.util.Util;

import java.io.Serializable;
import java.net.*;
import java.util.*;
10
11
12 /**
13  * Discovers all neighbors in an IP multicast environment by using expanding ring multicasts (increasing TTL).
14  * The sender multicasts a discovery packet on all available network interfaces, while also listening on
15  * all interfaces. The discovery packet contains the sender's address, which is the address and port of the
16  * interface on which the packet was sent. A receiver replies with an ACK back to the sender's address and port.
17  * After n responses or m milliseconds, the sender terminates and computes the network interfaces which should be used.
18  * The network interface is the intersection of the interface variable of all ACKs received.
19  * @author Bela Ban July 26 2002
20  * @version $Revision: 1.1 $
21  */

22 public class McastDiscovery {
23     int ttl = 32;
24     List handlers = new ArrayList();
25     InetAddress mcast_addr = null;
26     int mcast_port = 5000;
27     long interval = 2000; // time between sends
28
McastSender mcast_sender = null;
29     boolean running = true;
30     HashMap map = new HashMap(); // keys=interface (InetAddress), values=List of receivers (InetAddress)
31

32
33     class McastSender extends Thread JavaDoc {
34
35         McastSender() {
36             super();
37             setName("McastSender");
38             setDaemon(true);
39         }
40
41         public void run() {
42             MessageHandler handler;
43             while (running) {
44                 for (Iterator it = handlers.iterator(); it.hasNext();) {
45                     handler = (MessageHandler) it.next();
46                     handler.sendDiscoveryRequest(ttl);
47                 }
48                 try {
49                     sleep(interval);
50                 } catch (Exception JavaDoc ex) {
51                 }
52             }
53         }
54     }
55
56
57     public McastDiscovery(InetAddress mcast_addr, int mcast_port, long interval, int ttl) {
58         this.mcast_addr = mcast_addr;
59         this.mcast_port = mcast_port;
60         this.interval = interval;
61         this.ttl = ttl;
62     }
63
64
65     public void start() throws Exception JavaDoc {
66         NetworkInterface intf;
67         InetAddress bind_addr;
68         MessageHandler handler;
69
70         for (Enumeration en = NetworkInterface.getNetworkInterfaces(); en.hasMoreElements();) {
71             intf = (NetworkInterface) en.nextElement();
72             for (Enumeration en2 = intf.getInetAddresses(); en2.hasMoreElements();) {
73                 bind_addr = (InetAddress) en2.nextElement();
74                 map.put(bind_addr, new ArrayList());
75                 System.out.println("-- creating receiver for " + bind_addr);
76                 handler = new MessageHandler(bind_addr);
77                 handlers.add(handler);
78                 handler.start();
79             }
80         }
81
82         // Now start sending mcast discovery messages:
83
mcast_sender = new McastSender();
84         mcast_sender.start();
85
86         System.out.println("press key to stop");
87         System.out.flush();
88         System.in.read();
89
90         printValidInterfaces();
91     }
92
93
94     void printValidInterfaces() {
95         InetAddress bind_interface;
96         Map.Entry entry;
97         List all_mbrs = new ArrayList();
98         List l;
99         InetSocketAddress tmp_addr;
100         HashMap map_copy = (HashMap) map.clone();
101         SortedSet s;
102         Stack st;
103         Result r;
104
105
106         System.out.println("\n========================================================");
107         System.out.println("Responses received ordered by interface:\n");
108         for (Iterator it = map.entrySet().iterator(); it.hasNext();) {
109             entry = (Map.Entry) it.next();
110             bind_interface = (InetAddress) entry.getKey();
111             System.out.println(bind_interface.getHostAddress() + ":\t " + entry.getValue());
112         }
113
114         for (Iterator it = map.values().iterator(); it.hasNext();) {
115             l = (List) it.next();
116             for (Iterator it2 = l.iterator(); it2.hasNext();) {
117                 tmp_addr = (InetSocketAddress) it2.next();
118                 if (!all_mbrs.contains(tmp_addr))
119                     all_mbrs.add(tmp_addr);
120             }
121         }
122
123         for (Iterator it = all_mbrs.iterator(); it.hasNext();) {
124             tmp_addr = (InetSocketAddress) it.next();
125
126             // tmp_mbr has to be in all values (Lists) of map, remove entry from map if not
127
for (Iterator it2 = map.entrySet().iterator(); it2.hasNext();) {
128                 entry = (Map.Entry) it2.next();
129                 l = (List) entry.getValue();
130
131                 if (!l.contains(tmp_addr)) {
132                     //System.out.println("Member " + tmp_addr + " did not respond to interface " + entry.getKey() +
133
// ", removing interface");
134
it2.remove(); // remove the entry (key plus value) from map
135
}
136             }
137         }
138
139         if (map.size() > 0)
140             System.out.println("\n-- Valid interfaces are " + map.keySet() + '\n');
141         else {
142             System.out.println("\nNo valid interfaces found, listing interfaces by number of responses/interface:\n" +
143                     "(it is best to use the interface with the most responses)");
144
145             s = new TreeSet();
146             for (Iterator it = map_copy.entrySet().iterator(); it.hasNext();) {
147                 entry = (Map.Entry) it.next();
148                 r = new Result((InetAddress) entry.getKey(), ((List) entry.getValue()).size());
149                 s.add(r);
150             }
151
152             st = new Stack();
153             for (Iterator it = s.iterator(); it.hasNext();) {
154                 st.push(it.next());
155             }
156
157             while (!st.empty())
158                 System.out.println("-- " + st.pop());
159
160         }
161
162         System.out.println("\nUse of any of the above interfaces in \"UDP(bind_addr=<interface>)\" will " +
163                 "guarantee that the members will find each other");
164         System.out.println("========================================================\n\n");
165     }
166
167
168     class MessageHandler {
169         MulticastSocket mcast_sock = null; // for receiving mcast discovery messages and sending back unicast discovery responses
170
DatagramSocket sock = null;
171         McastReceiver mcast_receiver = null;
172         UcastReceiver ucast_receiver = null;
173         InetAddress local_addr = null;
174         int local_port = 0;
175
176
177         class McastReceiver extends Thread JavaDoc {
178             byte[] buf;
179             DatagramPacket mcast_packet, rsp_packet;
180             DiscoveryRequest req;
181             DiscoveryResponse rsp;
182
183             McastReceiver() {
184                 super();
185                 setName("McastReceiver");
186                 setDaemon(true);
187             }
188
189             public void run() {
190                 while (running) {
191                     buf = new byte[16000];
192                     mcast_packet = new DatagramPacket(buf, buf.length);
193                     try {
194                         mcast_sock.receive(mcast_packet);
195                         req = (DiscoveryRequest) Util.objectFromByteBuffer(mcast_packet.getData());
196                         System.out.println("<-- " + req);
197
198                         // send response back to req.sender_addr
199
rsp = new DiscoveryResponse(new InetSocketAddress(local_addr, local_port), req.sender_addr.getAddress());
200                         buf = Util.objectToByteBuffer(rsp);
201                         rsp_packet = new DatagramPacket(buf, buf.length, req.sender_addr);
202                         sock.send(rsp_packet);
203                     } catch (Exception JavaDoc ex) {
204                         System.err.println("McastReceiver.run(): " + ex + ", rsp_packet=" +
205                                 rsp_packet.getSocketAddress() + ", length=" + rsp_packet.getLength() + " bytes");
206                         ex.printStackTrace();
207                     }
208                 }
209             }
210         }
211
212
213         class UcastReceiver extends Thread JavaDoc {
214
215             UcastReceiver() {
216                 super();
217                 setName("UcastReceiver");
218                 setDaemon(true);
219             }
220
221             public void run() {
222                 DatagramPacket packet;
223                 byte[] buf;
224                 DiscoveryResponse rsp;
225                 List l;
226                 InetAddress bind_interface;
227                 InetSocketAddress responder_addr;
228
229                 while (running) {
230                     try {
231                         buf = new byte[16000];
232                         packet = new DatagramPacket(buf, buf.length);
233                         sock.receive(packet);
234                         rsp = (DiscoveryResponse) Util.objectFromByteBuffer(packet.getData());
235                         System.out.println("<== " + rsp);
236                         bind_interface = rsp.interface_used;
237                         l = (List) map.get(bind_interface);
238                         if (l == null)
239                             map.put(bind_interface, (l = new ArrayList()));
240                         responder_addr = rsp.discovery_responder;
241                         if (!l.contains(responder_addr))
242                             l.add(responder_addr);
243                     } catch (Exception JavaDoc ex) {
244                         System.err.println("UcastReceiver.run(): " + ex);
245                     }
246                 }
247             }
248         }
249
250
251         MessageHandler(InetAddress bind_interface) throws Exception JavaDoc {
252             mcast_sock = new MulticastSocket(mcast_port);
253             mcast_sock.setInterface(bind_interface);
254             mcast_sock.setTimeToLive(ttl);
255             mcast_sock.joinGroup(mcast_addr);
256             sock = new DatagramSocket(0, bind_interface);
257             local_addr = sock.getLocalAddress();
258             local_port = sock.getLocalPort();
259         }
260
261         void start() throws Exception JavaDoc {
262             running = true;
263
264             // 1. start listening on unicast socket. when discovery response received --> ad to map hashmap
265
ucast_receiver = new UcastReceiver();
266             ucast_receiver.start();
267
268             // 2. start listening on mcast socket. when discovery request received --> send ack
269
mcast_receiver = new McastReceiver();
270             mcast_receiver.start();
271         }
272
273         void stop() {
274             running = false;
275
276             if (mcast_sock != null) {
277                 mcast_sock.close();
278                 mcast_sock = null;
279             }
280             if (sock != null) {
281                 sock.close();
282                 sock = null;
283             }
284         }
285
286
287         void sendDiscoveryRequest(int ttl) {
288             DiscoveryRequest req;
289             byte[] buf;
290             DatagramPacket packet;
291
292             req = new DiscoveryRequest(local_addr, local_port);
293             System.out.println("--> " + req);
294
295             try {
296                 buf = Util.objectToByteBuffer(req);
297                 packet = new DatagramPacket(buf, buf.length, mcast_addr, mcast_port);
298                 mcast_sock.send(packet);
299             } catch (Exception JavaDoc ex) {
300                 System.err.println("McastDiscovery.sendDiscoveryRequest(): " + ex);
301             }
302         }
303
304
305     }
306
307
308     public static void main(String JavaDoc[] args) {
309         int ttl = 32; // ttl to use for IP mcast packets
310
String JavaDoc mcast_addr = "228.8.8.8"; // multicast address to use
311
int mcast_port = 5000; // port to use for mcast socket
312
long interval = 2000; // time between mcast requests
313

314
315         for (int i = 0; i < args.length; i++) {
316             if ("-help".equals(args[i])) {
317                 help();
318                 return;
319             }
320             if ("-mcast_addr".equals(args[i])) {
321                 mcast_addr = args[++i];
322                 continue;
323             }
324             if ("-mcast_port".equals(args[i])) {
325                 mcast_port = Integer.parseInt(args[++i]);
326                 continue;
327             }
328             if ("-interval".equals(args[i])) {
329                 interval = Long.parseLong(args[++i]);
330                 continue;
331             }
332             if ("-ttl".equals(args[i])) {
333                 ttl = Integer.parseInt(args[++i]);
334                 continue;
335             }
336             help();
337             return;
338         }
339
340         try {
341             new McastDiscovery(InetAddress.getByName(mcast_addr), mcast_port, interval, ttl).start();
342         } catch (Exception JavaDoc ex) {
343             ex.printStackTrace();
344         }
345     }
346
347
348     static void help() {
349         System.out.println("McastDiscovery [-mcast_addr <multicast address>] [-mcast_port <port>]" +
350                 " [-interval <time between mcasts (msecs)>] [-ttl <ttl>]");
351     }
352
353
354 }
355
356
/**
 * Common serializable base type of the packets exchanged during discovery
 * ({@code DiscoveryRequest} and {@code DiscoveryResponse}). Declares no state of its own.
 */
abstract class DiscoveryPacket implements Serializable {

}
360
361 class DiscoveryRequest extends DiscoveryPacket {
362     InetSocketAddress sender_addr = null;
363
364     DiscoveryRequest(InetAddress addr, int port) {
365         sender_addr = new InetSocketAddress(addr, port);
366     }
367
368
369     public String JavaDoc toString() {
370         return "DiscoveryRequest [sender_addr=" + sender_addr + ']';
371     }
372
373 }
374
375
376 class DiscoveryResponse extends DiscoveryPacket {
377     InetSocketAddress discovery_responder = null; // address of member who responds to discovery request
378
InetAddress interface_used = null;
379
380     DiscoveryResponse(InetSocketAddress discovery_responder, InetAddress interface_used) {
381         this.discovery_responder = discovery_responder;
382         this.interface_used = interface_used;
383     }
384
385
386     public String JavaDoc toString() {
387         return "DiscoveryResponse [discovery_responder=" + discovery_responder +
388                 ", interface_used=" + interface_used + ']';
389     }
390 }
391
392
393 class Result implements Comparable JavaDoc {
394     InetAddress bind_interface = null;
395     int num_responses = 0;
396
397     Result(InetAddress bind_interface, int num_responses) {
398         this.bind_interface = bind_interface;
399         this.num_responses = num_responses;
400     }
401
402     public int compareTo(Object JavaDoc other) {
403         Result oth = (Result) other;
404         return num_responses == oth.num_responses ? 0 : (num_responses < oth.num_responses ? -1 : 1);
405     }
406
407     public String JavaDoc toString() {
408         return bind_interface.getHostAddress() + ":\t " + num_responses;
409     }
410 }
411
412
413
Popular Tags