package org.jgroups.tests;

import org.jgroups.util.Util;

import java.io.Serializable;
import java.net.*;
import java.util.*;


/**
 * Diagnostic tool that finds out which local network interfaces can be used for IP multicast
 * discovery.
 * <p>
 * For every address of every local network interface a {@link MessageHandler} is created. Each
 * handler joins the multicast group, answers incoming discovery requests with a unicast
 * response, and collects the unicast responses sent to it. A sender thread periodically
 * multicasts a discovery request through every handler. When the user presses a key, the
 * handlers are stopped and a summary of the responders seen per interface is printed.
 * <p>
 * The code deliberately sticks to JDK 1.4 language features (no generics, no for-each).
 */
public class McastDiscovery1_4 {
    /** Time-to-live applied to every multicast socket. */
    int ttl = 32;
    /** All {@link MessageHandler}s, one per local address. Filled by {@link #start()}. */
    List handlers = new ArrayList();
    InetAddress mcast_addr = null;
    int mcast_port = 5000;
    /** Pause in milliseconds between two rounds of discovery requests. */
    long interval = 2000;
    McastSender mcast_sender = null;
    /** Shared run flag; volatile because it is read and written by several threads. */
    volatile boolean running = true;
    /**
     * Maps an interface address (InetAddress) to the List of responders (InetSocketAddress)
     * seen for it. Every access must hold the monitor of this map, because one UcastReceiver
     * thread per interface updates it.
     */
    HashMap map = new HashMap();

    /** Daemon thread that periodically sends a discovery request through every handler. */
    class McastSender extends Thread {

        McastSender() {
            super();
            setName("McastSender");
            setDaemon(true);
        }

        public void run() {
            MessageHandler handler;
            // Thread.sleep() rejects negative values; treat them as "no pause".
            long pause = interval > 0 ? interval : 0;

            while (running) {
                for (Iterator it = handlers.iterator(); it.hasNext();) {
                    handler = (MessageHandler) it.next();
                    handler.sendDiscoveryRequest(ttl);
                }
                try {
                    sleep(pause);
                } catch (InterruptedException ex) {
                    // interrupted by stopHandlers(): terminate instead of sending another round
                    break;
                }
            }
        }
    }


    /**
     * @param mcast_addr multicast group to join and send discovery requests to
     * @param mcast_port port of the multicast group
     * @param interval   pause in milliseconds between two rounds of discovery requests
     * @param ttl        time-to-live set on the multicast sockets
     */
    public McastDiscovery1_4(InetAddress mcast_addr, int mcast_port, long interval, int ttl) {
        this.mcast_addr = mcast_addr;
        this.mcast_port = mcast_port;
        this.interval = interval;
        this.ttl = ttl;
    }


    /**
     * Creates and starts one handler per local address, starts the sender thread, blocks until
     * a key is read from standard input, then stops everything and prints the summary.
     *
     * @throws Exception if a socket cannot be set up for one of the local addresses or reading
     *                   from standard input fails; all handlers created so far are stopped
     *                   before the exception is propagated
     */
    public void start() throws Exception {
        NetworkInterface intf;
        InetAddress bind_addr;
        MessageHandler handler;

        try {
            // getNetworkInterfaces() returns null when the machine has no interfaces
            Enumeration en = NetworkInterface.getNetworkInterfaces();
            while (en != null && en.hasMoreElements()) {
                intf = (NetworkInterface) en.nextElement();
                for (Enumeration en2 = intf.getInetAddresses(); en2.hasMoreElements();) {
                    bind_addr = (InetAddress) en2.nextElement();
                    synchronized (map) {
                        map.put(bind_addr, new ArrayList());
                    }
                    System.out.println("-- creating receiver for " + bind_addr);
                    handler = new MessageHandler(bind_addr);
                    handlers.add(handler);
                    handler.start();
                }
            }
        } catch (Exception ex) {
            // do not leave sockets and receiver threads of earlier interfaces behind
            stopHandlers();
            throw ex;
        }

        mcast_sender = new McastSender();
        mcast_sender.start();

        try {
            System.out.println("press key to stop");
            System.out.flush();
            System.in.read();
        } finally {
            // stop receiving before the results are evaluated, and release the sockets
            stopHandlers();
        }

        printValidInterfaces();
    }


    /** Clears the run flag, wakes up the sender thread and closes the sockets of all handlers. */
    private void stopHandlers() {
        running = false;
        if (mcast_sender != null)
            mcast_sender.interrupt();
        for (Iterator it = handlers.iterator(); it.hasNext();)
            ((MessageHandler) it.next()).stop();
    }


    /**
     * Prints the responders seen per interface, followed by the "valid" interfaces, i.e. those
     * that received a response from every responder seen on any interface. If there is no such
     * interface (or no response was received at all), the interfaces are listed by descending
     * number of responses instead.
     * <p>
     * Works on a private snapshot, so the shared {@link #map} is left untouched.
     */
    void printValidInterfaces() {
        Map.Entry entry;
        List l;
        Object responder;

        // Deep copy under the lock: receiver threads may still be adding responders.
        // LinkedHashMap keeps the iteration order of the shared map.
        Map snapshot = new LinkedHashMap();
        synchronized (map) {
            for (Iterator it = map.entrySet().iterator(); it.hasNext();) {
                entry = (Map.Entry) it.next();
                snapshot.put(entry.getKey(), new ArrayList((List) entry.getValue()));
            }
        }

        System.out.println("\n========================================================");
        System.out.println("Responses received ordered by interface:\n");
        for (Iterator it = snapshot.entrySet().iterator(); it.hasNext();) {
            entry = (Map.Entry) it.next();
            System.out.println(((InetAddress) entry.getKey()).getHostAddress() + ":\t " + entry.getValue());
        }

        // union of all responders seen on any interface, without duplicates
        List all_mbrs = new ArrayList();
        for (Iterator it = snapshot.values().iterator(); it.hasNext();) {
            l = (List) it.next();
            for (Iterator it2 = l.iterator(); it2.hasNext();) {
                responder = it2.next();
                if (!all_mbrs.contains(responder))
                    all_mbrs.add(responder);
            }
        }

        // An interface is valid if it saw every responder. With no responder at all the
        // condition would hold vacuously, so that case is treated as "nothing valid".
        List valid = new ArrayList();
        if (!all_mbrs.isEmpty()) {
            for (Iterator it = snapshot.entrySet().iterator(); it.hasNext();) {
                entry = (Map.Entry) it.next();
                if (((List) entry.getValue()).containsAll(all_mbrs))
                    valid.add(entry.getKey());
            }
        }

        if (!valid.isEmpty())
            System.out.println("\n-- Valid interfaces are " + valid + '\n');
        else {
            System.out.println("\nNo valid interfaces found, listing interfaces by number of responses/interface:\n" +
                    "(it is best to use the interface with the most responses)");

            // A list is sorted instead of filling a TreeSet: Result.compareTo() reports
            // interfaces with equal counts as equal, and a set would drop all but one of them.
            List results = new ArrayList();
            for (Iterator it = snapshot.entrySet().iterator(); it.hasNext();) {
                entry = (Map.Entry) it.next();
                results.add(new Result((InetAddress) entry.getKey(), ((List) entry.getValue()).size()));
            }
            Collections.sort(results);

            // sorted ascending, printed from the end: most responses first
            for (int i = results.size() - 1; i >= 0; i--)
                System.out.println("-- " + results.get(i));
        }

        System.out.println("\nUse of any of the above interfaces in \"UDP(bind_addr=<interface>)\" will " +
                "guarantee that the members will find each other");
        System.out.println("========================================================\n\n");
    }


    /**
     * Owns the multicast and unicast sockets for one local address, together with the two
     * receiver threads that serve them.
     */
    class MessageHandler {
        /** Socket joined to the multicast group; receives and sends discovery requests. */
        MulticastSocket mcast_sock = null;
        /** Unicast socket bound to the interface; sends and receives discovery responses. */
        DatagramSocket sock = null;
        McastReceiver mcast_receiver = null;
        UcastReceiver ucast_receiver = null;
        InetAddress local_addr = null;
        int local_port = 0;


        /** Receives discovery requests from the multicast group and answers them via unicast. */
        class McastReceiver extends Thread {

            McastReceiver() {
                super();
                setName("McastReceiver");
                setDaemon(true);
            }

            public void run() {
                byte[] buf;
                DatagramPacket mcast_packet, rsp_packet;
                DiscoveryRequest req;
                DiscoveryResponse rsp;

                while (running) {
                    rsp_packet = null; // never report the response of an earlier iteration
                    buf = new byte[16000];
                    mcast_packet = new DatagramPacket(buf, buf.length);
                    try {
                        mcast_sock.receive(mcast_packet);
                        // NOTE(review): Util.objectFromByteBuffer() presumably uses Java
                        // serialization (the packet classes are Serializable). Deserializing
                        // data received from the network is only acceptable on a trusted LAN.
                        req = (DiscoveryRequest) Util.objectFromByteBuffer(mcast_packet.getData());
                        System.out.println("<-- " + req);

                        // tell the requester which of its addresses this request came from
                        rsp = new DiscoveryResponse(new InetSocketAddress(local_addr, local_port),
                                req.sender_addr.getAddress());
                        buf = Util.objectToByteBuffer(rsp);
                        rsp_packet = new DatagramPacket(buf, buf.length, req.sender_addr);
                        sock.send(rsp_packet);
                    } catch (Exception ex) {
                        if (!running)
                            break; // sockets were closed by stop(); not an error
                        if (rsp_packet != null)
                            System.err.println("McastReceiver.run(): " + ex + ", rsp_packet=" +
                                    rsp_packet.getSocketAddress() + ", length=" + rsp_packet.getLength() + " bytes");
                        else
                            System.err.println("McastReceiver.run(): " + ex);
                        ex.printStackTrace();
                    }
                }
            }
        }


        /** Receives unicast discovery responses and records the responder per interface. */
        class UcastReceiver extends Thread {

            UcastReceiver() {
                super();
                setName("UcastReceiver");
                setDaemon(true);
            }

            public void run() {
                DatagramPacket packet;
                byte[] buf;
                DiscoveryResponse rsp;
                List l;
                InetAddress bind_interface;
                InetSocketAddress responder_addr;

                while (running) {
                    try {
                        buf = new byte[16000];
                        packet = new DatagramPacket(buf, buf.length);
                        sock.receive(packet);
                        // NOTE(review): see McastReceiver.run() regarding deserialization of
                        // network data.
                        rsp = (DiscoveryResponse) Util.objectFromByteBuffer(packet.getData());
                        System.out.println("<== " + rsp);
                        bind_interface = rsp.interface_used;
                        responder_addr = rsp.discovery_responder;
                        // the map and its lists are shared by the receivers of all interfaces
                        synchronized (map) {
                            l = (List) map.get(bind_interface);
                            if (l == null)
                                map.put(bind_interface, (l = new ArrayList()));
                            if (!l.contains(responder_addr))
                                l.add(responder_addr);
                        }
                    } catch (Exception ex) {
                        if (!running)
                            break; // socket was closed by stop(); not an error
                        System.err.println("UcastReceiver.run(): " + ex);
                    }
                }
            }
        }


        /**
         * Creates the multicast socket (joined to the group via the given interface) and a
         * unicast socket bound to an ephemeral port on the same interface.
         *
         * @param bind_interface local address to bind to
         * @throws Exception if one of the sockets cannot be created or configured; sockets
         *                   opened so far are closed again
         */
        MessageHandler(InetAddress bind_interface) throws Exception {
            try {
                mcast_sock = new MulticastSocket(mcast_port);
                mcast_sock.setInterface(bind_interface);
                mcast_sock.setTimeToLive(ttl);
                mcast_sock.joinGroup(mcast_addr);
                sock = new DatagramSocket(0, bind_interface);
                local_addr = sock.getLocalAddress();
                local_port = sock.getLocalPort();
            } catch (Exception ex) {
                closeSockets(); // do not leak a half-initialized handler's sockets
                throw ex;
            }
        }

        /** Sets the shared run flag and starts both receiver threads. */
        void start() throws Exception {
            running = true;

            ucast_receiver = new UcastReceiver();
            ucast_receiver.start();

            mcast_receiver = new McastReceiver();
            mcast_receiver.start();
        }

        /**
         * Clears the shared run flag (which affects all handlers) and closes both sockets,
         * which unblocks the receiver threads.
         */
        void stop() {
            running = false;
            closeSockets();
        }

        /**
         * Closes both sockets. The fields are intentionally not set to null: the receiver and
         * sender threads may still be using them and should see a closed socket rather than
         * run into a NullPointerException.
         */
        private void closeSockets() {
            if (mcast_sock != null)
                mcast_sock.close();
            if (sock != null)
                sock.close();
        }


        /**
         * Multicasts a discovery request carrying the address of this handler's unicast socket.
         *
         * @param ttl unused; the time-to-live is configured on the socket by the constructor
         */
        void sendDiscoveryRequest(int ttl) {
            DiscoveryRequest req;
            byte[] buf;
            DatagramPacket packet;

            req = new DiscoveryRequest(local_addr, local_port);
            System.out.println("--> " + req);

            try {
                buf = Util.objectToByteBuffer(req);
                packet = new DatagramPacket(buf, buf.length, mcast_addr, mcast_port);
                mcast_sock.send(packet);
            } catch (Exception ex) {
                // a failure after stop() only means the socket has been closed
                if (running)
                    System.err.println("McastDiscovery1_4.sendDiscoveryRequest(): " + ex);
            }
        }


    }


    /**
     * Command line entry point; see {@link #help()} for the supported options. Unknown
     * options, missing option values and malformed numbers print the usage and exit.
     */
    public static void main(String[] args) {
        int ttl = 32;
        String mcast_addr = "228.8.8.8";
        int mcast_port = 5000;
        long interval = 2000;

        try {
            for (int i = 0; i < args.length; i++) {
                String opt = args[i];
                // every option except -help takes exactly one value
                if ("-help".equals(opt) || i + 1 >= args.length) {
                    help();
                    return;
                }
                if ("-mcast_addr".equals(opt)) {
                    mcast_addr = args[++i];
                    continue;
                }
                if ("-mcast_port".equals(opt)) {
                    mcast_port = Integer.parseInt(args[++i]);
                    continue;
                }
                if ("-interval".equals(opt)) {
                    interval = Long.parseLong(args[++i]);
                    continue;
                }
                if ("-ttl".equals(opt)) {
                    ttl = Integer.parseInt(args[++i]);
                    continue;
                }
                help();
                return;
            }
        } catch (NumberFormatException ex) {
            System.err.println("invalid numeric argument: " + ex.getMessage());
            help();
            return;
        }

        try {
            new McastDiscovery1_4(InetAddress.getByName(mcast_addr), mcast_port, interval, ttl).start();
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }


    /** Prints the command line usage. */
    static void help() {
        System.out.println("McastDiscovery1_4 [-mcast_addr <multicast address>] [-mcast_port <port>]" +
                " [-interval <time between mcasts (msecs)>] [-ttl <ttl>]");
    }


}


/** Common base type of the packets exchanged by the discovery tool. */
abstract class DiscoveryPacket implements Serializable {

}

/** Multicast request; carries the unicast address the response has to be sent to. */
class DiscoveryRequest extends DiscoveryPacket {
    InetSocketAddress sender_addr = null;

    DiscoveryRequest(InetAddress addr, int port) {
        sender_addr = new InetSocketAddress(addr, port);
    }


    public String toString() {
        return "DiscoveryRequest [sender_addr=" + sender_addr + ']';
    }

}


/** Unicast answer to a {@link DiscoveryRequest}. */
class DiscoveryResponse extends DiscoveryPacket {
    /** Unicast address of the handler that answered. */
    InetSocketAddress discovery_responder = null;
    /** Address of the requester's interface, taken from the request's sender address. */
    InetAddress interface_used = null;

    DiscoveryResponse(InetSocketAddress discovery_responder, InetAddress interface_used) {
        this.discovery_responder = discovery_responder;
        this.interface_used = interface_used;
    }


    public String toString() {
        return "DiscoveryResponse [discovery_responder=" + discovery_responder +
                ", interface_used=" + interface_used + ']';
    }
}


/**
 * Number of responses received on one interface. The natural ordering compares the response
 * count only, so it is not consistent with equals(): do not use instances as elements of a
 * sorted set or keys of a sorted map, as entries with equal counts would collapse into one.
 */
class Result implements Comparable {
    InetAddress bind_interface = null;
    int num_responses = 0;

    Result(InetAddress bind_interface, int num_responses) {
        this.bind_interface = bind_interface;
        this.num_responses = num_responses;
    }

    public int compareTo(Object other) {
        Result oth = (Result) other;
        return num_responses == oth.num_responses ? 0 : (num_responses < oth.num_responses ? -1 : 1);
    }

    public String toString() {
        return bind_interface.getHostAddress() + ":\t " + num_responses;
    }
}