| 1 3 package org.jgroups.protocols; 4 5 6 import org.jgroups.*; 7 import org.jgroups.stack.IpAddress; 8 import org.jgroups.stack.Protocol; 9 import org.jgroups.util.List; 10 import org.jgroups.util.*; 11 import org.jgroups.util.Queue; 12 13 import java.io.*; 14 import java.net.*; 15 import java.util.*; 16 17 18 19 20 39 public class UDP extends Protocol implements Runnable { 40 41 47 DatagramSocket sock=null; 48 49 52 private static BoundedList last_ports_used=null; 53 54 56 int num_last_ports=100; 57 58 59 MulticastSocket mcast_recv_sock=null; 60 61 62 MulticastSocket mcast_send_sock=null; 63 64 65 IpAddress local_addr=null; 66 67 68 String channel_name=null; 69 70 UdpHeader udp_hdr=null; 71 72 73 IpAddress mcast_addr=null; 74 75 76 InetAddress bind_addr=null; 77 78 79 boolean bind_to_all_interfaces=false; 80 81 83 int bind_port=0; 84 int port_range=1; 86 87 String mcast_addr_name="228.8.8.8"; 88 89 90 int mcast_port=7600; 91 92 93 Thread mcast_receiver=null; 94 95 96 UcastReceiver ucast_receiver=null; 97 98 100 boolean ip_mcast=true; 101 102 103 int ip_ttl=64; 104 105 106 final Vector members=new Vector(11); 107 108 109 final ExposedByteArrayOutputStream out_stream=new ExposedByteArrayOutputStream(1024); 110 111 112 int mcast_send_buf_size=32000; 113 114 115 int mcast_recv_buf_size=64000; 116 117 118 int ucast_send_buf_size=32000; 119 120 121 int ucast_recv_buf_size=64000; 122 123 127 boolean loopback=true; 128 129 130 132 boolean discard_incompatible_packets=false; 133 134 138 boolean use_incoming_packet_handler=false; 139 140 141 Queue incoming_queue=null; 142 143 145 IncomingPacketHandler incoming_packet_handler=null; 146 147 149 boolean use_outgoing_packet_handler=false; 150 151 152 Queue outgoing_queue=null; 153 154 OutgoingPacketHandler outgoing_packet_handler=null; 155 156 158 byte[] additional_data=null; 159 160 162 int max_bundle_size=AUTOCONF.senseMaxFragSizeStatic(); 163 164 167 long max_bundle_timeout=20; 168 169 170 boolean enable_bundling=false; 171 172 173 TimeScheduler timer=null; 174 175 178 HashMap addr_translation_table=new HashMap(); 179 180 boolean use_addr_translation=false; 181 182 183 static final String name="UDP"; 184 185 static final String IGNORE_BIND_ADDRESS_PROPERTY="ignore.bind.address"; 186 187 188 final int VERSION_LENGTH=Version.getLength(); 189 190 191 192 193 197 public UDP() { 198 ; 199 } 200 201 204 public String toString() { 205 return "Protocol UDP(local address: " + local_addr + ')'; 206 } 207 208 209 BoundedList getLastPortsUsed() { 210 if(last_ports_used == null) 211 last_ports_used=new BoundedList(num_last_ports); 212 return last_ports_used; 213 } 214 215 216 217 public void run() { 218 DatagramPacket packet; 219 byte receive_buf[]=new byte[65535]; 220 int len, sender_port; 221 byte[] tmp, data; 222 InetAddress sender_addr; 223 224 packet=new DatagramPacket(receive_buf, receive_buf.length); 226 227 while(mcast_receiver != null && mcast_recv_sock != null) { 228 try { 229 packet.setData(receive_buf, 0, receive_buf.length); 230 mcast_recv_sock.receive(packet); 231 sender_addr=packet.getAddress(); 232 sender_port=packet.getPort(); 233 len=packet.getLength(); 234 data=packet.getData(); 235 236 if(len == 4) { if(data[0] == 'd' && data[1] == 'i' && data[2] == 'a' && data[3] == 'g') { 238 handleDiagnosticProbe(sender_addr, sender_port); 239 continue; 240 } 241 } 242 243 if(log.isTraceEnabled()){ 244 StringBuffer sb=new StringBuffer ("received (mcast) "); 245 sb.append(len).append(" bytes from ").append(sender_addr).append(':'); 246 sb.append(sender_port).append(" (size=").append(len).append(" bytes)"); 247 log.trace(sb.toString()); 248 } 249 if(len > receive_buf.length) { 250 if(log.isErrorEnabled()) log.error("size of the received packet (" + len + ") is bigger than " + 251 "allocated buffer (" + receive_buf.length + "): will not be able to handle packet. " + 252 "Use the FRAG protocol and make its frag_size lower than " + receive_buf.length); 253 } 254 255 if(Version.compareTo(data) == false) { 256 if(log.isWarnEnabled()) { 257 StringBuffer sb=new StringBuffer (); 258 sb.append("packet from ").append(sender_addr).append(':').append(sender_port); 259 sb.append(" has different version (").append(Version.printVersionId(data, Version.version_id.length)); 260 sb.append(") from ours (").append(Version.printVersionId(Version.version_id)).append("). "); 261 if(discard_incompatible_packets) 262 sb.append("Packet is discarded"); 263 else 264 sb.append("This may cause problems"); 265 log.warn(sb.toString()); 266 } 267 if(discard_incompatible_packets) 268 continue; 269 } 270 271 if(use_incoming_packet_handler) { 272 tmp=new byte[len]; 273 System.arraycopy(data, 0, tmp, 0, len); 274 incoming_queue.add(new IncomingQueueEntry(mcast_addr, sender_addr, sender_port, tmp)); 275 } 276 else 277 handleIncomingUdpPacket(mcast_addr, sender_addr, sender_port, data); 278 } 279 catch(SocketException sock_ex) { 280 if(log.isTraceEnabled()) log.trace("multicast socket is closed, exception=" + sock_ex); 281 break; 282 } 283 catch(InterruptedIOException io_ex) { ; } 286 catch(Throwable ex) { 287 if(log.isErrorEnabled()) 288 log.error("failure in multicast receive()", ex); 289 Util.sleep(100); } 291 } 292 if(log.isDebugEnabled()) log.debug("multicast thread terminated"); 293 } 294 295 301 void handleDiagnosticProbe(InetAddress sender, int port) { 302 try { 303 byte[] diag_rsp=getDiagResponse().getBytes(); 304 DatagramPacket rsp=new DatagramPacket(diag_rsp, 0, diag_rsp.length, sender, port); 305 if(log.isDebugEnabled()) log.debug("sending diag response to " + sender + ':' + port); 306 sock.send(rsp); 307 } 308 catch(Throwable t) { 309 if(log.isErrorEnabled()) log.error("failed sending diag rsp to " + sender + ':' + port + 310 ", exception=" + t); 311 } 312 } 313 314 String getDiagResponse() { 315 StringBuffer sb=new StringBuffer (); 316 sb.append(local_addr).append(" (").append(channel_name).append(')'); 317 sb.append(" [").append(mcast_addr_name).append(':').append(mcast_port).append("]\n"); 318 sb.append("Version=").append(Version.version).append(", cvs=\"").append(Version.cvs).append("\"\n"); 319 sb.append("bound to ").append(bind_addr).append(':').append(bind_port).append('\n'); 320 sb.append("members: ").append(members).append('\n'); 321 322 return sb.toString(); 323 } 324 325 326 327 328 329 330 331 public String getName() { 332 return name; 333 } 334 335 336 public void init() throws Exception { 337 if(use_incoming_packet_handler) { 338 incoming_queue=new Queue(); 339 incoming_packet_handler=new IncomingPacketHandler(); 340 } 341 if(use_outgoing_packet_handler) { 342 outgoing_queue=new Queue(); 343 if(enable_bundling) { 344 timer=stack != null? stack.timer : null; 345 if(timer == null) 346 throw new Exception ("timer could not be retrieved"); 347 outgoing_packet_handler=new BundlingOutgoingPacketHandler(); 348 } 349 else 350 outgoing_packet_handler=new OutgoingPacketHandler(); 351 } 352 } 353 354 355 358 public void start() throws Exception { 359 if(log.isDebugEnabled()) log.debug("creating sockets and starting threads"); 360 createSockets(); 361 passUp(new Event(Event.SET_LOCAL_ADDRESS, local_addr)); 362 startThreads(); 363 } 364 365 366 public void stop() { 367 if(log.isDebugEnabled()) log.debug("closing sockets and stopping threads"); 368 stopThreads(); closeSockets(); } 371 372 373 374 386 public boolean setProperties(Properties props) { 387 String str; 388 String tmp = null; 389 390 super.setProperties(props); 391 392 try { 394 tmp=System.getProperty("bind.address"); 395 if(Boolean.getBoolean(IGNORE_BIND_ADDRESS_PROPERTY)) { 396 tmp=null; 397 } 398 } 399 catch (SecurityException ex){ 400 } 401 402 if(tmp != null) 403 str=tmp; 404 else 405 str=props.getProperty("bind_addr"); 406 if(str != null) { 407 try { 408 bind_addr=InetAddress.getByName(str); 409 } 410 catch(UnknownHostException unknown) { 411 if(log.isFatalEnabled()) log.fatal("(bind_addr): host " + str + " not known"); 412 return false; 413 } 414 props.remove("bind_addr"); 415 } 416 417 str=props.getProperty("bind_to_all_interfaces"); 418 if(str != null) { 419 bind_to_all_interfaces=new Boolean (str).booleanValue(); 420 props.remove("bind_to_all_interfaces"); 421 } 422 423 str=props.getProperty("bind_port"); 424 if(str != null) { 425 bind_port=Integer.parseInt(str); 426 props.remove("bind_port"); 427 } 428 429 str=props.getProperty("num_last_ports"); 430 if(str != null) { 431 num_last_ports=Integer.parseInt(str); 432 props.remove("num_last_ports"); 433 } 434 435 str=props.getProperty("start_port"); 436 if(str != null) { 437 bind_port=Integer.parseInt(str); 438 props.remove("start_port"); 439 } 440 441 str=props.getProperty("port_range"); 442 if(str != null) { 443 port_range=Integer.parseInt(str); 444 props.remove("port_range"); 445 } 446 447 str=props.getProperty("mcast_addr"); 448 if(str != null) { 449 mcast_addr_name=str; 450 props.remove("mcast_addr"); 451 } 452 453 str=props.getProperty("mcast_port"); 454 if(str != null) { 455 mcast_port=Integer.parseInt(str); 456 props.remove("mcast_port"); 457 } 458 459 str=props.getProperty("ip_mcast"); 460 if(str != null) { 461 ip_mcast=Boolean.valueOf(str).booleanValue(); 462 props.remove("ip_mcast"); 463 } 464 465 str=props.getProperty("ip_ttl"); 466 if(str != null) { 467 ip_ttl=Integer.parseInt(str); 468 props.remove("ip_ttl"); 469 } 470 471 str=props.getProperty("mcast_send_buf_size"); 472 if(str != null) { 473 mcast_send_buf_size=Integer.parseInt(str); 474 props.remove("mcast_send_buf_size"); 475 } 476 477 str=props.getProperty("mcast_recv_buf_size"); 478 if(str != null) { 479 mcast_recv_buf_size=Integer.parseInt(str); 480 props.remove("mcast_recv_buf_size"); 481 } 482 483 str=props.getProperty("ucast_send_buf_size"); 484 if(str != null) { 485 ucast_send_buf_size=Integer.parseInt(str); 486 props.remove("ucast_send_buf_size"); 487 } 488 489 str=props.getProperty("ucast_recv_buf_size"); 490 if(str != null) { 491 ucast_recv_buf_size=Integer.parseInt(str); 492 props.remove("ucast_recv_buf_size"); 493 } 494 495 str=props.getProperty("loopback"); 496 if(str != null) { 497 loopback=Boolean.valueOf(str).booleanValue(); 498 props.remove("loopback"); 499 } 500 501 str=props.getProperty("discard_incompatibe_packets"); 502 if(str != null) { 503 discard_incompatible_packets=Boolean.valueOf(str).booleanValue(); 504 props.remove("discard_incompatibe_packets"); 505 } 506 507 str=props.getProperty("use_packet_handler"); 509 if(str != null) { 510 use_incoming_packet_handler=Boolean.valueOf(str).booleanValue(); 511 props.remove("use_packet_handler"); 512 if(log.isWarnEnabled()) log.warn("'use_packet_handler' is deprecated; use 'use_incoming_packet_handler' instead"); 513 } 514 515 str=props.getProperty("use_incoming_packet_handler"); 516 if(str != null) { 517 use_incoming_packet_handler=Boolean.valueOf(str).booleanValue(); 518 props.remove("use_incoming_packet_handler"); 519 } 520 521 str=props.getProperty("use_outgoing_packet_handler"); 522 if(str != null) { 523 use_outgoing_packet_handler=Boolean.valueOf(str).booleanValue(); 524 props.remove("use_outgoing_packet_handler"); 525 } 526 527 str=props.getProperty("max_bundle_size"); 528 if(str != null) { 529 int bundle_size=Integer.parseInt(str); 530 if(bundle_size > max_bundle_size) { 531 if(log.isErrorEnabled()) log.error("max_bundle_size (" + bundle_size + 532 ") is greater than largest UDP fragmentation size (" + max_bundle_size + ')'); 533 return false; 534 } 535 if(bundle_size <= 0) { 536 if(log.isErrorEnabled()) log.error("max_bundle_size (" + bundle_size + ") is <= 0"); 537 return false; 538 } 539 max_bundle_size=bundle_size; 540 props.remove("max_bundle_size"); 541 } 542 543 str=props.getProperty("max_bundle_timeout"); 544 if(str != null) { 545 max_bundle_timeout=Long.parseLong(str); 546 if(max_bundle_timeout <= 0) { 547 if(log.isErrorEnabled()) log.error("max_bundle_timeout of " + max_bundle_timeout + " is invalid"); 548 return false; 549 } 550 props.remove("max_bundle_timeout"); 551 } 552 553 str=props.getProperty("enable_bundling"); 554 if(str != null) { 555 enable_bundling=Boolean.valueOf(str).booleanValue(); 556 props.remove("enable_bundling"); 557 } 558 559 str=props.getProperty("use_addr_translation"); 560 if(str != null) { 561 use_addr_translation=Boolean.valueOf(str).booleanValue(); 562 props.remove("use_addr_translation"); 563 } 564 565 if(props.size() > 0) { 566 System.err.println("UDP.setProperties(): the following properties are not recognized:"); 567 props.list(System.out); 568 return false; 569 } 570 571 if(enable_bundling) { 572 if(use_outgoing_packet_handler == false) 573 if(log.isWarnEnabled()) log.warn("enable_bundling is true; setting use_outgoing_packet_handler=true"); 574 use_outgoing_packet_handler=true; 575 } 576 577 return true; 578 } 579 580 581 582 586 public void startUpHandler() { 587 ; 588 } 589 590 594 public void up(Event evt) { 595 596 switch(evt.getType()) { 597 598 case Event.CONFIG: 599 passUp(evt); 600 if(log.isDebugEnabled()) log.debug("received CONFIG event: " + evt.getArg()); 601 handleConfigEvent((HashMap)evt.getArg()); 602 return; 603 } 604 605 passUp(evt); 606 } 607 608 614 public void down(Event evt) { 615 Message msg; 616 Object dest_addr; 617 618 if(evt.getType() != Event.MSG) { handleDownEvent(evt); 620 return; 621 } 622 623 msg=(Message)evt.getArg(); 624 625 if(channel_name != null) { 626 msg.putHeader(name, udp_hdr); 629 } 630 631 dest_addr=msg.getDest(); 632 633 if(observer != null) 636 observer.passDown(evt); 637 638 if(dest_addr == null) { if(ip_mcast) { 640 if(mcast_addr == null) { 641 if(log.isErrorEnabled()) log.error("dest address of message is null, and " + 642 "sending to default address fails as mcast_addr is null, too !" + 643 " Discarding message " + Util.printEvent(evt)); 644 return; 645 } 646 msg.setDest(mcast_addr); 648 } 649 else { 650 sendMultipleUdpMessages(msg, members); 652 return; 653 } 654 } 655 656 try { 657 sendUdpMessage(msg); 658 } 659 catch(Exception e) { 660 if(log.isErrorEnabled()) log.error("exception=" + e + ", msg=" + msg + ", mcast_addr=" + mcast_addr); 661 } 662 } 663 664 665 666 667 668 669 670 671 672 673 674 675 676 682 void setSourceAddress(Message msg) { 683 if(msg.getSrc() == null) 684 msg.setSrc(local_addr); 685 } 686 687 688 689 690 696 void handleIncomingUdpPacket(IpAddress dest, InetAddress sender, int port, byte[] data) { 697 ByteArrayInputStream inp_stream=null; 698 DataInputStream inp=null; 699 Message msg=null; 700 List l; 702 try { 703 inp_stream=new ByteArrayInputStream(data, VERSION_LENGTH, data.length - VERSION_LENGTH); 705 inp=new DataInputStream(inp_stream); 706 707 if(enable_bundling) { 708 l=bufferToList(inp, dest, sender, port); 709 for(Enumeration en=l.elements(); en.hasMoreElements();) { 710 msg=(Message)en.nextElement(); 711 try { 712 handleMessage(msg); 713 } 714 catch(Throwable t) { 715 if(log.isErrorEnabled()) log.error("failure: " + t.toString()); 716 } 717 } 718 } 719 else { 720 msg=bufferToMessage(inp, dest, sender, port); 721 handleMessage(msg); 722 } 723 } 724 catch(Throwable e) { 725 if(log.isErrorEnabled()) log.error("exception in processing incoming packet", e); 726 } 727 finally { 728 Util.closeInputStream(inp); 729 Util.closeInputStream(inp_stream); 730 } 731 } 732 733 734 735 void handleMessage(Message msg) { 736 Event evt; 737 UdpHeader hdr; 738 Address dst=msg.getDest(); 739 740 if(dst == null) 741 dst=mcast_addr; 742 743 if(loopback) { 745 Address SRC=msg.getSrc(); 746 if((dst == null || (dst != null && dst.isMulticastAddress())) && src != null && local_addr.equals(src)) { 747 if(log.isTraceEnabled()) 748 log.trace("discarded own loopback multicast packet"); 749 return; 750 } 751 } 752 753 evt=new Event(Event.MSG, msg); 754 if(log.isTraceEnabled()) { 755 StringBuffer sb=new StringBuffer ("message is "); 756 sb.append(msg).append(", headers are ").append(msg.getHeaders()); 757 log.trace(sb.toString()); 758 } 759 760 762 if(observer != null) 763 observer.up(evt, up_queue.size()); 764 765 hdr=(UdpHeader)msg.getHeader(name); if(hdr != null) { 767 768 769 String ch_name=hdr.channel_name; 770 771 if(ch_name != null && channel_name != null && !channel_name.equals(ch_name) && 774 !ch_name.equals(Util.DIAG_GROUP)) { 775 if(log.isWarnEnabled()) log.warn("discarded message from different group (" + 776 ch_name + "). Sender was " + msg.getSrc()); 777 return; 778 } 779 } 780 else { 781 if(log.isErrorEnabled()) log.error("message does not have a UDP header"); 782 } 783 passUp(evt); 784 } 785 786 787 void sendUdpMessage(Message msg) throws Exception { 788 sendUdpMessage(msg, false); 789 } 790 791 792 void sendUdpMessage(Message msg, boolean copyForOutgoingQueue) throws Exception { 793 IpAddress dest; 794 Message copy; 795 Event evt; 796 797 dest=(IpAddress)msg.getDest(); setSourceAddress(msg); 799 800 if(log.isTraceEnabled()) { 801 StringBuffer sb=new StringBuffer ("sending msg to "); 802 sb.append(msg.getDest()).append(" (src=").append(msg.getSrc()).append("), headers are ").append(msg.getHeaders()); 803 log.trace(sb.toString()); 804 } 805 806 if(loopback && (dest.equals(local_addr) || dest.isMulticastAddress())) { 810 copy=msg.copy(); 811 copy.setSrc(local_addr); 813 evt=new Event(Event.MSG, copy); 815 816 818 if(observer != null) 819 observer.up(evt, up_queue.size()); 820 if(log.isTraceEnabled()) log.trace("looped back local message " + copy); 821 passUp(evt); 822 if(dest != null && !dest.isMulticastAddress()) 823 return; 824 } 825 826 if(use_outgoing_packet_handler) { 827 if(copyForOutgoingQueue) 828 outgoing_queue.add(msg.copy()); 829 else 830 outgoing_queue.add(msg); 831 return; 832 } 833 834 send(msg); 835 } 836 837 838 839 void send(Message msg) throws Exception { 840 Buffer buf; 841 IpAddress dest=(IpAddress)msg.getDest(); IpAddress SRC=(IpAddress)msg.getSrc(); 843 844 synchronized(out_stream) { 845 buf=messageToBuffer(msg, dest, src); 846 doSend(buf, dest.getIpAddress(), dest.getPort()); 847 } 848 } 849 850 851 852 void doSend(Buffer buf, InetAddress dest, int port) throws IOException { 853 DatagramPacket packet; 854 855 packet=new DatagramPacket(buf.getBuf(), buf.getOffset(), buf.getLength(), dest, port); 857 if(dest.isMulticastAddress() && mcast_send_sock != null) { mcast_send_sock.send(packet); 859 } 860 else { 861 if(sock != null) 862 sock.send(packet); 863 } 864 } 865 866 867 868 void sendMultipleUdpMessages(Message msg, Vector dests) { 869 Address dest; 870 871 for(int i=0
|