1 package org.jgroups.protocols; 2 3 import org.apache.commons.logging.Log; 4 import org.apache.commons.logging.LogFactory; 5 import org.jgroups.*; 6 import org.jgroups.stack.LogicalAddress1_4; 7 import org.jgroups.stack.Protocol; 8 import org.jgroups.util.Queue; 9 import org.jgroups.util.QueueClosedException; 10 import org.jgroups.util.Util; 11 12 import java.io.*; 13 import java.net.*; 14 import java.util.*; 15 16 38 public class UDP1_4 extends Protocol implements Receiver { 39 40 static final String name="UDP1_4"; 41 42 43 ConnectorTable ct=null; 44 45 46 List bind_addrs=null; 47 48 49 String group_name=null; 50 51 52 InetSocketAddress mcast_addr=null; 53 54 55 LogicalAddress1_4 local_addr=new LogicalAddress1_4(null, null); 56 57 58 LogicalAddress1_4 local_addr_canonical=local_addr.copy(); 59 60 61 ByteArrayOutputStream out_stream=new ByteArrayOutputStream(65535); 62 63 67 int local_bind_port=0; 68 int port_range=1; 70 71 75 boolean ip_mcast=true; 76 77 78 int ip_ttl=32; 79 80 81 Vector members=new Vector(); 82 83 87 UdpHeader udp_hdr=null; 88 89 90 int mcast_send_buf_size=300000; 91 92 93 int mcast_recv_buf_size=300000; 94 95 96 int ucast_send_buf_size=300000; 97 98 99 int ucast_recv_buf_size=300000; 100 101 108 boolean loopback=true; 110 116 boolean use_packet_handler=false; 117 118 119 Queue packet_queue=null; 120 121 125 byte[] additional_data=null; 126 127 131 PacketHandler packet_handler=null; 132 133 protected static Log mylog=LogFactory.getLog(UDP1_4.class); 134 135 136 final int VERSION_LENGTH=Version.getLength(); 137 138 141 static final int DEFAULT_RECEIVE_BUFFER_SIZE=120000; 143 144 145 146 150 public UDP1_4() { 151 } 152 153 156 public String toString() { 157 return "Protocol UDP(local address: " + local_addr + ')'; 158 } 159 160 161 public void receive(DatagramPacket packet) { 162 int len=packet.getLength(); 163 byte[] data=packet.getData(); 164 SocketAddress sender=packet.getSocketAddress(); 165 166 if(len == 4) { if(data[0] == 'd' && data[1] == 'i' && data[2] == 'a' && data[3] == 'g') { 168 handleDiagnosticProbe(sender); 169 return; 170 } 171 } 172 173 if(mylog.isTraceEnabled()) 174 mylog.trace("received " + len + " bytes from " + sender); 175 176 if(Version.compareTo(packet.getData()) == false) { 177 if(mylog.isWarnEnabled()) mylog.warn("packet from " + sender + " has different version (" + 178 Version.printVersionId(data, Version.version_id.length) + 179 ") from ours (" + Version.printVersionId(Version.version_id) + 180 "). This may cause problems"); 181 } 182 183 if(use_packet_handler && packet_queue != null) { 184 byte[] tmp=new byte[len]; 185 System.arraycopy(data, 0, tmp, 0, len); 186 try { 187 Object [] arr=new Object []{tmp, sender}; 188 packet_queue.add(arr); 189 return; 190 } 191 catch(QueueClosedException e) { 192 if(mylog.isWarnEnabled()) mylog.warn("packet queue for packet handler thread is closed"); 193 } 195 } 196 197 handleIncomingUdpPacket(data, sender); 198 } 199 200 201 202 203 268 void handleDiagnosticProbe(SocketAddress sender) { 269 try { 270 byte[] diag_rsp=getDiagResponse().getBytes(); 271 DatagramPacket rsp=new DatagramPacket(diag_rsp, 0, diag_rsp.length, sender); 272 273 if(mylog.isInfoEnabled()) mylog.info("sending diag response to " + sender); 274 ct.send(rsp); 275 } catch(Throwable t) { 276 if(mylog.isErrorEnabled()) mylog.error("failed sending diag rsp to " + sender + ", exception=" + t); 277 } 278 } 279 280 String getDiagResponse() { 281 StringBuffer sb=new StringBuffer (); 282 sb.append(local_addr).append(" (").append(group_name).append(')'); 283 sb.append(" [").append(mcast_addr).append("]\n"); 284 sb.append("Version=").append(Version.version).append(", cvs=\"").append(Version.cvs).append("\"\n"); 285 sb.append("physical addresses: ").append(local_addr.getPhysicalAddresses()).append('\n'); 286 sb.append("members: ").append(members).append('\n'); 287 288 return sb.toString(); 289 } 290 291 292 293 294 295 296 297 public String getName() { 298 return name; 299 } 300 301 302 public void init() throws Exception { 303 if(use_packet_handler) { 304 packet_queue=new Queue(); 305 packet_handler=new PacketHandler(); 306 } 307 } 308 309 310 313 public void start() throws Exception { 314 if(mylog.isInfoEnabled()) mylog.info("creating sockets and starting threads"); 315 if(ct == null) { 316 ct=new ConnectorTable(mcast_addr, DEFAULT_RECEIVE_BUFFER_SIZE, mcast_recv_buf_size, ip_mcast, this); 317 318 for(Iterator it=bind_addrs.iterator(); it.hasNext();) { 319 String bind_addr=(String )it.next(); 320 ct.listenOn(bind_addr, local_bind_port, port_range, DEFAULT_RECEIVE_BUFFER_SIZE, ucast_recv_buf_size, 321 ucast_send_buf_size, ip_ttl, this); 322 } 323 324 List physical_addrs=ct.getConnectorAddresses(); for(Iterator it=physical_addrs.iterator(); it.hasNext();) { 327 SocketAddress address=(SocketAddress)it.next(); 328 local_addr.addPhysicalAddress(address); 329 } 330 331 if(additional_data != null) 332 local_addr.setAdditionalData(additional_data); 333 334 ct.start(); 335 336 passUp(new Event(Event.SET_LOCAL_ADDRESS, local_addr)); 337 if(use_packet_handler) 338 packet_handler.start(); 339 } 340 } 341 342 343 public void stop() { 344 if(mylog.isInfoEnabled()) mylog.info("closing sockets and stopping threads"); 345 if(packet_handler != null) 346 packet_handler.stop(); 347 if(ct != null) { 348 ct.stop(); 349 ct=null; 350 } 351 local_addr.removeAllPhysicalAddresses(); 352 } 353 354 355 367 public boolean setProperties(Properties props) { 368 String str; 369 List exclude_list=null; 370 String mcast_addr_name="230.8.8.8"; 371 int mcast_port=7500; 372 373 super.setProperties(props); 374 str=props.getProperty("bind_addrs"); 375 if(str != null) { 376 str=str.trim(); 377 if("all".equals(str.toLowerCase())) { 378 try { 379 bind_addrs=determineAllBindInterfaces(); 380 } 381 catch(SocketException e) { 382 e.printStackTrace(); 383 bind_addrs=null; 384 } 385 } 386 else { 387 bind_addrs=Util.parseCommaDelimitedStrings(str); 388 } 389 props.remove("bind_addrs"); 390 } 391 392 str=props.getProperty("bind_addrs_exclude"); 393 if(str != null) { 394 str=str.trim(); 395 exclude_list=Util.parseCommaDelimitedStrings(str); 396 props.remove("bind_addrs_exclude"); 397 } 398 399 str=props.getProperty("bind_port"); 400 if(str != null) { 401 local_bind_port=Integer.parseInt(str); 402 props.remove("bind_port"); 403 } 404 405 str=props.getProperty("start_port"); 406 if(str != null) { 407 local_bind_port=Integer.parseInt(str); 408 props.remove("start_port"); 409 } 410 411 str=props.getProperty("port_range"); 412 if(str != null) { 413 port_range=Integer.parseInt(str); 414 props.remove("port_range"); 415 } 416 417 str=props.getProperty("mcast_addr"); 418 if(str != null) { 419 mcast_addr_name=str; 420 props.remove("mcast_addr"); 421 } 422 423 str=props.getProperty("mcast_port"); 424 if(str != null) { 425 mcast_port=Integer.parseInt(str); 426 props.remove("mcast_port"); 427 } 428 429 str=props.getProperty("ip_mcast"); 430 if(str != null) { 431 ip_mcast=Boolean.valueOf(str).booleanValue(); 432 props.remove("ip_mcast"); 433 } 434 435 str=props.getProperty("ip_ttl"); 436 if(str != null) { 437 ip_ttl=Integer.parseInt(str); 438 props.remove("ip_ttl"); 439 } 440 441 str=props.getProperty("mcast_send_buf_size"); 442 if(str != null) { 443 mcast_send_buf_size=Integer.parseInt(str); 444 props.remove("mcast_send_buf_size"); 445 } 446 447 str=props.getProperty("mcast_recv_buf_size"); 448 if(str != null) { 449 mcast_recv_buf_size=Integer.parseInt(str); 450 props.remove("mcast_recv_buf_size"); 451 } 452 453 str=props.getProperty("ucast_send_buf_size"); 454 if(str != null) { 455 ucast_send_buf_size=Integer.parseInt(str); 456 props.remove("ucast_send_buf_size"); 457 } 458 459 str=props.getProperty("ucast_recv_buf_size"); 460 if(str != null) { 461 ucast_recv_buf_size=Integer.parseInt(str); 462 props.remove("ucast_recv_buf_size"); 463 } 464 465 str=props.getProperty("use_packet_handler"); 466 if(str != null) { 467 use_packet_handler=Boolean.valueOf(str).booleanValue(); 468 props.remove("use_packet_handler"); 469 } 470 471 472 mcast_addr=new InetSocketAddress(mcast_addr_name, mcast_port); 474 475 if(bind_addrs == null) 477 bind_addrs=new ArrayList(); 478 if(bind_addrs.size() == 0) { 479 try { 480 String default_bind_addr=determineDefaultBindInterface(); 481 bind_addrs.add(default_bind_addr); 482 } 483 catch(SocketException ex) { 484 if(mylog.isErrorEnabled()) mylog.error("failed determining the default bind interface: " + ex); 485 } 486 } 487 if(exclude_list != null) { 488 bind_addrs.removeAll(exclude_list); 489 } 490 if(bind_addrs.size() == 0) { 491 if(mylog.isErrorEnabled()) mylog.error("no valid bind interface found, unable to listen for network traffic"); 492 return false; 493 } 494 else { 495 496 if(mylog.isInfoEnabled()) mylog.info("bind interfaces are " + bind_addrs); 497 } 498 499 if(props.size() > 0) { 500 System.err.println("UDP1_4.setProperties(): the following properties are not recognized:"); 501 props.list(System.out); 502 return false; 503 } 504 return true; 505 } 506 507 508 512 public void startUpHandler() { 513 ; 514 } 515 516 521 public void up(Event evt) { 522 passUp(evt); 523 524 switch(evt.getType()) { 525 526 case Event.CONFIG: 527 passUp(evt); 528 if(mylog.isInfoEnabled()) mylog.info("received CONFIG event: " + evt.getArg()); 529 handleConfigEvent((HashMap)evt.getArg()); 530 return; 531 } 532 533 passUp(evt); 534 } 535 536 542 public void down(Event evt) { 543 Message msg; 544 Object dest_addr; 545 546 if(evt.getType() != Event.MSG) { handleDownEvent(evt); 548 return; 549 } 550 551 msg=(Message)evt.getArg(); 552 553 if(udp_hdr != null && udp_hdr.channel_name != null) { 554 msg.putHeader(name, udp_hdr); 556 } 557 558 dest_addr=msg.getDest(); 559 560 if(observer != null) 563 observer.passDown(evt); 564 565 if(dest_addr == null) { if(ip_mcast == false) { 567 sendMultipleUdpMessages(msg, members); 569 return; 570 } 571 } 572 573 try { 574 sendUdpMessage(msg); } 576 catch(Exception e) { 577 if(mylog.isErrorEnabled()) mylog.error("exception=" + e + ", msg=" + msg + ", mcast_addr=" + mcast_addr); 578 } 579 } 580 581 582 583 584 585 586 587 588 589 590 591 592 void handleMessage(Message msg) { 593 594 } 595 596 597 601 void handleIncomingUdpPacket(byte[] data, SocketAddress sender) { 602 ByteArrayInputStream inp_stream; 603 ObjectInputStream inp; 604 Message msg=null; 605 UdpHeader hdr=null; 606 Event evt; 607 Address dst, src; 608 609 try { 610 inp_stream=new ByteArrayInputStream(data, VERSION_LENGTH, data.length - VERSION_LENGTH); 612 inp=new ObjectInputStream(inp_stream); 613 msg=new Message(); 614 msg.readExternal(inp); 615 dst=msg.getDest(); 616 src=msg.getSrc(); 617 if(src == null) { 618 if(mylog.isErrorEnabled()) mylog.error("sender's address is null"); 619 } 620 else { 621 ((LogicalAddress1_4)src).setPrimaryPhysicalAddress(sender); 622 } 623 624 if((dst == null || dst.isMulticastAddress()) && src != null && local_addr.equals(src)) { 626 if(mylog.isTraceEnabled()) 627 mylog.trace("discarded own loopback multicast packet"); 628 629 631 return; 632 } 633 634 evt=new Event(Event.MSG, msg); 635 if(mylog.isTraceEnabled()) 636 mylog.trace("Message is " + msg + ", headers are " + msg.getHeaders()); 637 638 640 if(observer != null) 641 observer.up(evt, up_queue.size()); 642 643 hdr=(UdpHeader)msg.removeHeader(name); 644 } catch(Throwable e) { 645 if(mylog.isErrorEnabled()) mylog.error("exception=" + Util.getStackTrace(e)); 646 return; 647 } 648 649 if(hdr != null) { 650 651 652 String ch_name=null; 653 654 if(hdr.channel_name != null) 655 ch_name=hdr.channel_name; 656 657 if(ch_name != null && group_name != null && !group_name.equals(ch_name) && 660 !ch_name.equals(Util.DIAG_GROUP)) { 661 662 if(mylog.isWarnEnabled()) mylog.warn("discarded message from different group (" + 663 ch_name + "). Sender was " + msg.getSrc()); 664 return; 665 } 666 } 667 668 passUp(evt); 669 } 670 671 672 675 void sendUdpMessage(Message msg) throws Exception { 676 Address dest, src; 677 ObjectOutputStream out; 678 byte buf[]; 679 DatagramPacket packet; 680 Message copy; 681 Event evt; 683 dest=msg.getDest(); SRC=msg.getSrc(); 685 if(src == null) { 686 src=local_addr_canonical; msg.setSrc(src); 688 } 689 690 if(mylog.isTraceEnabled()) 691 mylog.trace("sending message to " + msg.getDest() + 692 " (src=" + msg.getSrc() + "), headers are " + msg.getHeaders()); 693 694 if(dest == null || dest.isMulticastAddress() || dest.equals(local_addr)) { 698 copy=msg.copy(); 699 copy.removeHeader(name); 700 evt=new Event(Event.MSG, copy); 701 702 704 if(observer != null) 705 observer.up(evt, up_queue.size()); 706 if(mylog.isTraceEnabled()) mylog.trace("looped back local message " + copy); 707 708 passUp(evt); 710 712 if(dest != null && !dest.isMulticastAddress()) 713 return; } 715 716 out_stream.reset(); 717 out_stream.write(Version.version_id, 0, Version.version_id.length); out=new ObjectOutputStream(out_stream); 719 msg.writeExternal(out); 720 out.flush(); buf=out_stream.toByteArray(); 722 packet=new DatagramPacket(buf, buf.length, mcast_addr); 723 724 727 728 ct.send(packet); 730 } 732 733 734 void sendMultipleUdpMessages(Message msg, Vector dests) { 735 Address dest; 736 737 for(int i=0; i < dests.size(); i++) { 738 dest=(Address)dests.elementAt(i); 739 msg.setDest(dest); 740 741 try { 742 sendUdpMessage(msg); 743 } 744 catch(Exception e) { 745 if(mylog.isDebugEnabled()) mylog.debug("exception=" + e); 746 } 747 } 748 } 749 750 751 752 753 754 790 791 792 793 794 void handleDownEvent(Event evt) { 795 switch(evt.getType()) { 796 797 case Event.TMP_VIEW: 798 case Event.VIEW_CHANGE: 799 synchronized(members) { 800 members.removeAllElements(); 801 Vector tmpvec=((View)evt.getArg()).getMembers(); 802 for(int i=0; i < tmpvec.size(); i++) 803 members.addElement(tmpvec.elementAt(i)); 804 } 805 break; 806 807 case Event.GET_LOCAL_ADDRESS: passUp(new Event(Event.SET_LOCAL_ADDRESS, local_addr)); 809 break; 810 811 case Event.CONNECT: 812 group_name=(String )evt.getArg(); 813 udp_hdr=new UdpHeader(group_name); 814 815 passUp(new Event(Event.CONNECT_OK)); 819 break; 820 821 case Event.DISCONNECT: 822 passUp(new Event(Event.DISCONNECT_OK)); 823 break; 824 825 case Event.CONFIG: 826 if(mylog.isInfoEnabled()) mylog.info("received CONFIG event: " + evt.getArg()); 827 handleConfigEvent((HashMap)evt.getArg()); 828 break; 829 } 830 } 831 832 833 void handleConfigEvent(HashMap map) { 834 if(map == null) return; 835 if(map.containsKey("additional_data")) 836 additional_data=(byte[])map.get("additional_data"); 837 if(map.containsKey("send_buf_size")) { 838 mcast_send_buf_size=((Integer )map.get("send_buf_size")).intValue(); 839 ucast_send_buf_size=mcast_send_buf_size; 840 } 841 if(map.containsKey("recv_buf_size")) { 842 mcast_recv_buf_size=((Integer )map.get("recv_buf_size")).intValue(); 843 ucast_recv_buf_size=mcast_recv_buf_size; 844 } 845 } 846 847 848 849 public String determineDefaultBindInterface() throws SocketException { 850 for(Enumeration en=NetworkInterface.getNetworkInterfaces(); en.hasMoreElements();) { 851 NetworkInterface ni=(NetworkInterface)en.nextElement(); 852 for(Enumeration en2=ni.getInetAddresses(); en2.hasMoreElements();) { 853 InetAddress bind_addr=(InetAddress)en2.nextElement(); 854 if(!bind_addr.isLoopbackAddress()) { 855 return bind_addr.getHostAddress(); 856 } 857 } 858 } 859 return null; 860 } 861 862 public List determineAllBindInterfaces() throws SocketException { 863 List ret=new ArrayList(); 864 for(Enumeration en=NetworkInterface.getNetworkInterfaces(); en.hasMoreElements();) { 865 NetworkInterface ni=(NetworkInterface)en.nextElement(); 866 for(Enumeration en2=ni.getInetAddresses(); en2.hasMoreElements();) { 867 InetAddress bind_addr=(InetAddress)en2.nextElement(); 868 ret.add(bind_addr.getHostAddress()); 869 } 870 } 871 872 return ret; 873 } 874 875 876 877 878 879 880 881 882 886 class PacketHandler implements Runnable { 887 Thread t=null; 888 889 public void run() { 890 byte[] data; 891 SocketAddress sender; 892 893 while(packet_queue != null && packet_handler != null) { 894 try { 895 Object [] arr=(Object [])packet_queue.remove(); 896 data=(byte[])arr[0]; 897 sender=(SocketAddress)arr[1]; 898 } catch(QueueClosedException closed_ex) { 899 if(mylog.isInfoEnabled()) mylog.info("packet_handler thread terminating"); 900 break; 901 } 902 handleIncomingUdpPacket(data, sender); 903 data=null; } 905 } 906 907 void start() { 908 if(t == null) { 909 t=new Thread (this, "UDP1_4.PacketHandler thread"); 910 t.setDaemon(true); 911 t.start(); 912 } 913 } 914 915 void stop() { 916 if(packet_queue != null) 917 packet_queue.close(false); t=null; 919 packet_queue=null; 920 } 921 } 922 923 924 925 926 931 public static class Connector implements Runnable { 932 933 protected Thread t=null; 934 935 protected SenderThread sender_thread=null; 936 937 938 NetworkInterface bind_interface; 939 940 941 945 MulticastSocket mcast_sock=null; 946 947 948 SocketAddress local_addr=null; 949 950 951 Receiver receiver=null; 952 953 954 protected byte[] receive_buffer=null; 955 956 957 Queue send_queue=new Queue(); 958 959 960 class SenderThread extends Thread { 961 962 963 public void run() { 964 Object [] arr; 965 byte[] buf; 966 SocketAddress dest; 967 968 while(send_queue != null) { 969 try { 970 arr=(Object [])send_queue.remove(); 971 buf=(byte[])arr[0]; 972 dest=(SocketAddress)arr[1]; 973 mcast_sock.send(new DatagramPacket(buf, buf.length, dest)); 974 } 975 catch(QueueClosedException e) { 976 break; 977 } 978 catch(SocketException e) { 979 e.printStackTrace(); 980 } 981 catch(IOException e) { 982 e.printStackTrace(); 983 } 984 985 } 986 } 987 } 988 989 990 991 public Connector(NetworkInterface bind_interface, int local_bind_port, 992 int port_range, int receive_buffer_size, 993 int receive_sock_buf_size, int send_sock_buf_size, 994 int ip_ttl, Receiver receiver) throws IOException { 995 this.bind_interface=bind_interface; 996 this.receiver=receiver; 997 this.receive_buffer=new byte[receive_buffer_size]; 998 999 mcast_sock=createMulticastSocket(local_bind_port, port_range); 1000 1001 mcast_sock.setReceiveBufferSize(receive_sock_buf_size); 1005 mcast_sock.setSendBufferSize(send_sock_buf_size); 1006 mcast_sock.setTimeToLive(ip_ttl); 1007 System.out.println("ttl=" + mcast_sock.getTimeToLive()); 1008 mcast_sock.setNetworkInterface(this.bind_interface); local_addr=mcast_sock.getLocalSocketAddress(); 1010 System.out.println("-- local_addr=" + local_addr); 1011 System.out.println("-- mcast_sock: send_bufsize=" + mcast_sock.getSendBufferSize() + 1012 ", recv_bufsize=" + mcast_sock.getReceiveBufferSize()); 1013 } 1014 1015 1016 public SocketAddress getLocalAddress() { 1017 return local_addr; 1018 } 1019 1020 public NetworkInterface getBindInterface() { 1021 return bind_interface; 1022 } 1023 1024 public void start() throws Exception { 1025 if(mcast_sock == null) 1026 throw new Exception ("UDP1_4.Connector.start(): connector has been stopped (start() cannot be called)"); 1027 1028 if(t != null && t.isAlive()) { 1029 if(mylog.isWarnEnabled()) mylog.warn("connector thread is already running"); 1030 return; 1031 } 1032 t=new Thread (this, "ConnectorThread for " + local_addr); 1033 t.setDaemon(true); 1034 t.start(); 1035 1036 sender_thread=new SenderThread(); 1037 sender_thread.start(); 1038 } 1039 1040 1043 public void stop() { 1044 if(mcast_sock != null) 1045 mcast_sock.close(); t=null; 1047 mcast_sock=null; 1048 } 1049 1050 1051 1052 1053 public void send(DatagramPacket packet) throws Exception { 1054 1056 byte[] buf=(byte[])packet.getData().clone(); 1057 Object [] arr=new Object []{buf, packet.getSocketAddress()}; 1058 send_queue.add(arr); 1059 } 1060 1061 public void run() { 1062 DatagramPacket packet=new DatagramPacket(receive_buffer, receive_buffer.length); 1063 while(t != null) { 1064 try { 1065 packet.setData(receive_buffer, 0, receive_buffer.length); 1066 ConnectorTable.receivePacket(packet, mcast_sock, receiver); 1067 } 1068 catch(Throwable t) { 1069 if(t == null || mcast_sock == null || mcast_sock.isClosed()) 1070 break; 1071 if(mylog.isErrorEnabled()) mylog.error("[" + local_addr + "] exception=" + t); 1072 Util.sleep(300); } 1074 } 1075 t=null; 1076 } 1077 1078 1079 1080 1081 public String toString() { 1082 StringBuffer sb=new StringBuffer (); 1083 sb.append("local_addr=").append(local_addr).append(", mcast_group="); 1084 return sb.toString(); 1085 } 1086 1087 1088 1089 1090 1091 private MulticastSocket createMulticastSocket(int local_bind_port, int port_range) throws IOException { 1093 MulticastSocket sock=null; 1094 int tmp_port=local_bind_port; 1095 1096 int max_port=tmp_port + port_range; 1097 while(tmp_port <= max_port) { 1098 try { 1099 sock=new MulticastSocket(tmp_port); 1100 break; 1101 } 1102 catch(Exception bind_ex) { 1103 tmp_port++; 1104 } 1105 } 1106 if(sock == null) 1107 throw new IOException("could not create a MulticastSocket (port range: " + local_bind_port + 1108 " - " + (local_bind_port+port_range)); 1109 return sock; 1110 } 1111 } 1112 1113 1114 1115 1116 1117 1118 public static class ConnectorTable implements Receiver, Runnable { 1119 1120 Thread t=null; 1121 1122 1123 MulticastSocket mcast_sock=null; 1124 1125 1126 InetSocketAddress mcast_addr=null; 1127 1128 Receiver receiver=null; 1129 1130 1131 byte[] receive_buffer=null; 1132 1133 1134 Vector connectors=new Vector(); 1135 1136 boolean running=false; 1137 1138 1139 1140 1141 1142 public ConnectorTable(InetSocketAddress mcast_addr, 1143 int receive_buffer_size, int receive_sock_buf_size, 1144 boolean ip_mcast, Receiver receiver) throws IOException { 1145 this.receiver=receiver; 1146 this.mcast_addr=mcast_addr; 1147 this.receive_buffer=new byte[receive_buffer_size]; 1148 1149 if(ip_mcast) { 1150 mcast_sock=new MulticastSocket(mcast_addr.getPort()); 1151 mcast_sock.setReceiveBufferSize(receive_sock_buf_size); 1155 } 1156 } 1157 1158 1159 public Receiver getReceiver() { 1160 return receiver; 1161 } 1162 1163 public void setReceiver(Receiver receiver) { 1164 this.receiver=receiver; 1165 } 1166 1167 1168 1169 public void start() throws Exception { 1170 Connector tmp; 1171 if(running) 1172 return; 1173 1174 if(mcast_sock != null) { 1175 t=new Thread (this, "ConnectorTable thread"); 1177 t.setDaemon(true); 1178 t.start(); 1179 } 1180 1181 1182 for(Iterator it=connectors.iterator(); it.hasNext();) { 1184 tmp=(Connector)it.next(); 1185 tmp.start(); 1186 } 1187 1188 running=true; 1189 } 1190 1191 1192 public void stop() { 1193 Connector tmp; 1194 for(Iterator it=connectors.iterator(); it.hasNext();) { 1195 tmp=(Connector)it.next(); 1196 tmp.stop(); 1197 } 1198 connectors.clear(); 1199 t=null; 1200 if(mcast_sock != null) { 1201 mcast_sock.close(); 1202 mcast_sock=null; 1203 } 1204 running=false; 1205 } 1206 1207 1208 public void run() { 1209 DatagramPacket p=new DatagramPacket(receive_buffer, receive_buffer.length); 1211 while(t != null && mcast_sock != null && !mcast_sock.isClosed()) { 1212 p.setData(receive_buffer, 0, receive_buffer.length); 1213 try { 1214 receivePacket(p, mcast_sock, this); 1215 } 1216 catch(Throwable t) { 1217 if(t == null || mcast_sock == null || mcast_sock.isClosed()) 1218 break; 1219 if(mylog.isErrorEnabled()) mylog.error("exception=" + t); 1220 Util.sleep(300); } 1222 } 1223 t=null; 1224 } 1225 1226 1227 1231 public List getConnectorAddresses() { 1232 Connector c; 1233 ArrayList ret=new ArrayList(); 1234 for(Iterator it=connectors.iterator(); it.hasNext();) { 1235 c=(Connector)it.next(); 1236 ret.add(c.getLocalAddress()); 1237 } 1238 return ret; 1239 } 1240 1241 1250 public void send(DatagramPacket msg) throws Exception { 1251 InetAddress dest; 1252 1253 if(msg == null) 1254 return; 1255 dest=msg.getAddress(); 1256 if(dest == null) 1257 throw new IOException("UDP1_4.ConnectorTable.send(): destination address is null"); 1258 1259 if(dest.isMulticastAddress()) { 1260 for(int i=0; i < connectors.size(); i++) { 1262 ((Connector)connectors.get(i)).send(msg); 1263 } 1264 } 1265 else { 1266 Connector c=pickRandomConnector(connectors); 1268 c.send(msg); 1269 } 1270 } 1271 1272 private Connector pickRandomConnector(Vector conns) { 1273 int size=conns.size(); 1274 int index=((int)(Util.random(size))) -1; 1275 return (Connector)conns.get(index); 1276 } 1277 1278 1288 public void listenOn(String bind_interface, int local_port, int port_range, 1289 int receive_buffer_size, int receiver_sock_buf_size, int send_sock_buf_size, 1290 int ip_ttl, Receiver receiver) throws IOException { 1291 if(bind_interface == null) 1292 return; 1293 1294 NetworkInterface ni=NetworkInterface.getByInetAddress(InetAddress.getByName(bind_interface)); 1295 if(ni == null) 1296 throw new IOException("UDP1_4.ConnectorTable.listenOn(): bind interface for " + 1297 bind_interface + " not found"); 1298 1299 Connector tmp=findConnector(ni); 1300 if(tmp != null) { 1301 if(mylog.isWarnEnabled()) mylog.warn("connector for interface " + bind_interface + 1302 " is already present (will be skipped): " + tmp); 1303 return; 1304 } 1305 1306 if(mcast_sock != null) { 1308 mcast_sock.joinGroup(mcast_addr, ni); 1309 1310 if(mylog.isInfoEnabled()) mylog.info("joining " + mcast_addr + " on interface " + ni); 1311 } 1312 1313 tmp=new Connector(ni, local_port, port_range, receive_buffer_size, receiver_sock_buf_size, 1315 send_sock_buf_size, ip_ttl, receiver); 1316 connectors.add(tmp); 1317 } 1318 1319 private Connector findConnector(NetworkInterface ni) { 1320 for(int i=0; i < connectors.size(); i++) { 1321 Connector c=(Connector)connectors.elementAt(i); 1322 if(c.getBindInterface().equals(ni)) 1323 return c; 1324 } 1325 return null; 1326 } 1327 1328 1329 public void receive(DatagramPacket packet) { 1330 if(receiver != null) { 1331 receiver.receive(packet); 1332 } 1333 } 1334 1335 1336 public String toString() { 1337 StringBuffer sb=new StringBuffer (); 1338 sb.append("*** todo: implement ***"); 1339 return sb.toString(); 1340 } 1341 1342 1343 public static void receivePacket(DatagramPacket packet, DatagramSocket sock, Receiver receiver) throws IOException { 1344 int len; 1345 1346 sock.receive(packet); 1347 len=packet.getLength(); 1348 if(len == 1 && packet.getData()[0] == 0) { 1349 if(mylog.isTraceEnabled()) mylog.trace("received dummy packet"); 1350 return; 1351 } 1352 if(receiver != null) 1353 receiver.receive(packet); 1354 } 1355 } 1356 1357 1358 1359 1360 1361 1362 public static class MyReceiver implements Receiver { 1363 ConnectorTable t=null; 1364 1365 public MyReceiver() { 1366 1367 } 1368 1369 public void setConnectorTable(ConnectorTable t) { 1370 this.t=t; 1371 } 1372 1373 public void receive(DatagramPacket packet) { 1374 System.out.println("-- received " + packet.getLength() + " bytes from " + packet.getSocketAddress()); 1375 InetAddress sender=packet.getAddress(); 1376 byte[] buf=packet.getData(); 1377 int len=packet.getLength(); 1378 String tmp=new String (buf, 0, len); 1379 if(len > 4) { 1380 if(tmp.startsWith("rsp:")) { 1381 System.out.println("-- received respose: \"" + tmp + '\"'); 1382 return; 1383 } 1384 } 1385 1386 byte[] rsp_buf=("rsp: this is a response to " + tmp).getBytes(); 1387 DatagramPacket response=new DatagramPacket(rsp_buf, rsp_buf.length, sender, packet.getPort()); 1388 1389 try { 1390 t.send(response); 1391 } 1392 catch(Exception e) { 1393 e.printStackTrace(); 1394 System.err.println("MyReceiver: problem sending response to " + sender); 1395 } 1396 } 1397 } 1398 1399 1400 1401 public static class MulticastReceiver implements Runnable { 1402 Unmarshaller m=null; 1403 DatagramSocket sock=null; 1405 public void run() { 1406 } 1409 1410 } 1411 1412 public static class Unmarshaller { 1413 Queue q=null; 1414 1415 void receive(byte[] data, SocketAddress sender) { 1416 } 1419 } 1420 1421 1422 1423 public static class Mailman { 1424 1425 } 1426 1427 1428 static void help() { 1429 System.out.println("UDP1_4 [-help] [-bind_addrs <list of interfaces>]"); 1430 } 1431 1432 1433 1434 public static void main(String [] args) { 1435 MyReceiver r=new MyReceiver(); 1436 ConnectorTable ct; 1437 String line; 1438 InetSocketAddress mcast_addr; 1439 BufferedReader in=null; 1440 DatagramPacket packet; 1441 byte[] send_buf; 1442 int receive_buffer_size=65000; 1443 boolean ip_mcast=true; 1444 1445 try { 1446 mcast_addr=new InetSocketAddress("230.1.2.3", 7500); 1447 ct=new ConnectorTable(mcast_addr, receive_buffer_size, 120000, ip_mcast, r); 1448 r.setConnectorTable(ct); 1449 } 1450 catch(Throwable t) { 1451 t.printStackTrace(); 1452 return; 1453 } 1454 1455 for(int i=0; i < args.length; i++) { 1456 if("-help".equals(args[i])) { 1457 help(); 1458 continue; 1459 } 1460 if("-bind_addrs".equals(args[i])) { 1461 while(++i < args.length && !args[i].trim().startsWith("-")) { 1462 try { 1463 ct.listenOn(args[i], 0, 1, receive_buffer_size, 120000, 12000, 32, r); 1464 } 1465 catch(IOException e) { 1466 e.printStackTrace(); 1467 return; 1468 } 1469 } 1470 } 1471 } 1472 1473 1474 try { 1475 ct.start(); in=new BufferedReader(new InputStreamReader(System.in)); 1477 while(true) { 1478 System.out.print("> "); System.out.flush(); 1479 line=in.readLine(); 1480 if(line.startsWith("quit") || line.startsWith("exit")) 1481 break; 1482 send_buf=line.getBytes(); 1483 packet=new DatagramPacket(send_buf, send_buf.length, mcast_addr); 1484 ct.send(packet); 1485 } 1486 } 1487 catch(Exception e) { 1488 e.printStackTrace(); 1489 } 1490 finally { 1491 if(ct != null) 1492 ct.stop(); 1493 } 1494 } 1495 1496 1497 1498 1499} 1500 1501 1502interface Receiver { 1503 1504 1509 void receive(DatagramPacket packet); 1510} 1511 | Popular Tags |