package org.jgroups.protocols;

import org.jgroups.*;
import org.jgroups.stack.IpAddress;
import org.jgroups.stack.Protocol;
import org.jgroups.util.List;
import org.jgroups.util.*;
import org.jgroups.util.Queue;

import java.io.*;
import java.net.*;
import java.util.*;

/**
 * Transport protocol that sends and receives messages as UDP datagrams.
 * Messages without a destination go to the multicast group (or to each member
 * individually when ip_mcast is off); the class itself is the Runnable of the
 * multicast receiver thread.
 */
public class UDP extends Protocol implements Runnable {

    /** Unicast socket, created in createSockets(); read by the UcastReceiver and used by
     *  doSend() for non-multicast destinations and for diagnostic responses. */
    DatagramSocket sock=null;

    /** Ports already handed out by createEphemeralDatagramSocket(); static, hence shared by
     *  all UDP instances. Created lazily by getLastPortsUsed(). */
    private static BoundedList last_ports_used=null;

    /** Capacity of last_ports_used; a value <= 0 disables the "port already seen" check. */
    int num_last_ports=100;

    /** Socket bound to mcast_port and joined to the group; read by run(). */
    MulticastSocket mcast_recv_sock=null;

    /** Socket used by doSend() for multicast destinations. */
    MulticastSocket mcast_send_sock=null;

    /** Address and port of sock; assigned in createSockets(). */
    IpAddress local_addr=null;

    /** Group name taken from the CONNECT event. */
    String channel_name=null;

    /** Header carrying channel_name; added to every outgoing message once connected. */
    UdpHeader udp_hdr=null;

    /** Multicast group address (mcast_addr_name + mcast_port), resolved in createSockets(). */
    IpAddress mcast_addr=null;

    /** Interface to bind to; set from the "bind.address" system property or the "bind_addr"
     *  property, otherwise chosen in createSockets(). */
    InetAddress bind_addr=null;

    /** When true (and the Java version allows it) the receive socket joins the group on
     *  every network interface. */
    boolean bind_to_all_interfaces=false;

    /** First port to try for sock ("bind_port" / "start_port"); 0 or less selects an ephemeral port. */
    int bind_port=0;
    /** Number of additional ports after bind_port that are tried. */
    int port_range=1;

    /** Multicast group address as configured ("mcast_addr"). */
    String mcast_addr_name="228.8.8.8";

    /** Multicast port ("mcast_port"). */
    int mcast_port=7600;

    /** Thread executing run(), i.e. the multicast receive loop. */
    Thread mcast_receiver=null;

    /** Receiver for packets arriving on the unicast socket. */
    UcastReceiver ucast_receiver=null;

    /** If false, messages without destination are sent to each member individually. */
    boolean ip_mcast=true;

    /** Time-to-live set on both multicast sockets. */
    int ip_ttl=64;

    /** Current group members; replaced on VIEW_CHANGE / TMP_VIEW events. */
    final Vector members=new Vector(11);

    /** Shared marshalling buffer; send() synchronizes on it. */
    final ExposedByteArrayOutputStream out_stream=new ExposedByteArrayOutputStream(1024);

    /** Send buffer size requested for the multicast sockets. */
    int mcast_send_buf_size=32000;

    /** Receive buffer size requested for the multicast sockets. */
    int mcast_recv_buf_size=64000;

    /** Send buffer size requested for the unicast socket. */
    int ucast_send_buf_size=32000;

    /** Receive buffer size requested for the unicast socket. */
    int ucast_recv_buf_size=64000;

    /** If true, messages to self or to the group are passed up locally when sent, and
     *  own multicasts received from the network are discarded. */
    boolean loopback=true;

    /** If true, packets whose version id differs from ours are dropped instead of only warned about. */
    boolean discard_incompatible_packets=false;

    /** If true, received packets are copied into incoming_queue instead of being handled
     *  on the receiver thread. */
    boolean use_incoming_packet_handler=false;

    /** Queue of IncomingQueueEntry objects; created in init() when the incoming handler is enabled. */
    Queue incoming_queue=null;

    /** Consumer of incoming_queue; created in init(). */
    IncomingPacketHandler incoming_packet_handler=null;

    /** If true, outgoing messages are placed into outgoing_queue instead of being sent directly. */
    boolean use_outgoing_packet_handler=false;

    /** Queue of outgoing messages; created in init() when the outgoing handler is enabled. */
    Queue outgoing_queue=null;

    /** Consumer of outgoing_queue; created in init(). */
    OutgoingPacketHandler outgoing_packet_handler=null;

    /** Opaque data received via a CONFIG event and attached to local_addr. */
    byte[] additional_data=null;

    /** Maximum bundle size; the initial value doubles as upper bound for the "max_bundle_size" property. */
    int max_bundle_size=AUTOCONF.senseMaxFragSizeStatic();

    /** Bundling timeout ("max_bundle_timeout"); must be > 0. NOTE(review): presumably milliseconds - confirm. */
    long max_bundle_timeout=20;

    /** If true, bundling is used; this forces use_outgoing_packet_handler to true. */
    boolean enable_bundling=false;
172 173 TimeScheduler timer=null; 174 175 178 HashMap addr_translation_table=new HashMap(); 179 180 boolean use_addr_translation=false; 181 182 183 static final String name="UDP"; 184 185 static final String IGNORE_BIND_ADDRESS_PROPERTY="ignore.bind.address"; 186 187 188 final int VERSION_LENGTH=Version.getLength(); 189 190 191 192 193 197 public UDP() { 198 ; 199 } 200 201 204 public String toString() { 205 return "Protocol UDP(local address: " + local_addr + ')'; 206 } 207 208 209 BoundedList getLastPortsUsed() { 210 if(last_ports_used == null) 211 last_ports_used=new BoundedList(num_last_ports); 212 return last_ports_used; 213 } 214 215 216 217 public void run() { 218 DatagramPacket packet; 219 byte receive_buf[]=new byte[65535]; 220 int len, sender_port; 221 byte[] tmp, data; 222 InetAddress sender_addr; 223 224 packet=new DatagramPacket(receive_buf, receive_buf.length); 226 227 while(mcast_receiver != null && mcast_recv_sock != null) { 228 try { 229 packet.setData(receive_buf, 0, receive_buf.length); 230 mcast_recv_sock.receive(packet); 231 sender_addr=packet.getAddress(); 232 sender_port=packet.getPort(); 233 len=packet.getLength(); 234 data=packet.getData(); 235 236 if(len == 4) { if(data[0] == 'd' && data[1] == 'i' && data[2] == 'a' && data[3] == 'g') { 238 handleDiagnosticProbe(sender_addr, sender_port); 239 continue; 240 } 241 } 242 243 if(log.isTraceEnabled()){ 244 StringBuffer sb=new StringBuffer ("received (mcast) "); 245 sb.append(len).append(" bytes from ").append(sender_addr).append(':'); 246 sb.append(sender_port).append(" (size=").append(len).append(" bytes)"); 247 log.trace(sb.toString()); 248 } 249 if(len > receive_buf.length) { 250 if(log.isErrorEnabled()) log.error("size of the received packet (" + len + ") is bigger than " + 251 "allocated buffer (" + receive_buf.length + "): will not be able to handle packet. 
" + 252 "Use the FRAG protocol and make its frag_size lower than " + receive_buf.length); 253 } 254 255 if(Version.compareTo(data) == false) { 256 if(log.isWarnEnabled()) { 257 StringBuffer sb=new StringBuffer (); 258 sb.append("packet from ").append(sender_addr).append(':').append(sender_port); 259 sb.append(" has different version (").append(Version.printVersionId(data, Version.version_id.length)); 260 sb.append(") from ours (").append(Version.printVersionId(Version.version_id)).append("). "); 261 if(discard_incompatible_packets) 262 sb.append("Packet is discarded"); 263 else 264 sb.append("This may cause problems"); 265 log.warn(sb.toString()); 266 } 267 if(discard_incompatible_packets) 268 continue; 269 } 270 271 if(use_incoming_packet_handler) { 272 tmp=new byte[len]; 273 System.arraycopy(data, 0, tmp, 0, len); 274 incoming_queue.add(new IncomingQueueEntry(mcast_addr, sender_addr, sender_port, tmp)); 275 } 276 else 277 handleIncomingUdpPacket(mcast_addr, sender_addr, sender_port, data); 278 } 279 catch(SocketException sock_ex) { 280 if(log.isTraceEnabled()) log.trace("multicast socket is closed, exception=" + sock_ex); 281 break; 282 } 283 catch(InterruptedIOException io_ex) { ; } 286 catch(Throwable ex) { 287 if(log.isErrorEnabled()) 288 log.error("failure in multicast receive()", ex); 289 Util.sleep(100); } 291 } 292 if(log.isDebugEnabled()) log.debug("multicast thread terminated"); 293 } 294 295 301 void handleDiagnosticProbe(InetAddress sender, int port) { 302 try { 303 byte[] diag_rsp=getDiagResponse().getBytes(); 304 DatagramPacket rsp=new DatagramPacket(diag_rsp, 0, diag_rsp.length, sender, port); 305 if(log.isDebugEnabled()) log.debug("sending diag response to " + sender + ':' + port); 306 sock.send(rsp); 307 } 308 catch(Throwable t) { 309 if(log.isErrorEnabled()) log.error("failed sending diag rsp to " + sender + ':' + port + 310 ", exception=" + t); 311 } 312 } 313 314 String getDiagResponse() { 315 StringBuffer sb=new StringBuffer (); 316 
sb.append(local_addr).append(" (").append(channel_name).append(')'); 317 sb.append(" [").append(mcast_addr_name).append(':').append(mcast_port).append("]\n"); 318 sb.append("Version=").append(Version.version).append(", cvs=\"").append(Version.cvs).append("\"\n"); 319 sb.append("bound to ").append(bind_addr).append(':').append(bind_port).append('\n'); 320 sb.append("members: ").append(members).append('\n'); 321 322 return sb.toString(); 323 } 324 325 326 327 328 329 330 331 public String getName() { 332 return name; 333 } 334 335 336 public void init() throws Exception { 337 if(use_incoming_packet_handler) { 338 incoming_queue=new Queue(); 339 incoming_packet_handler=new IncomingPacketHandler(); 340 } 341 if(use_outgoing_packet_handler) { 342 outgoing_queue=new Queue(); 343 if(enable_bundling) { 344 timer=stack != null? stack.timer : null; 345 if(timer == null) 346 throw new Exception ("timer could not be retrieved"); 347 outgoing_packet_handler=new BundlingOutgoingPacketHandler(); 348 } 349 else 350 outgoing_packet_handler=new OutgoingPacketHandler(); 351 } 352 } 353 354 355 358 public void start() throws Exception { 359 if(log.isDebugEnabled()) log.debug("creating sockets and starting threads"); 360 createSockets(); 361 passUp(new Event(Event.SET_LOCAL_ADDRESS, local_addr)); 362 startThreads(); 363 } 364 365 366 public void stop() { 367 if(log.isDebugEnabled()) log.debug("closing sockets and stopping threads"); 368 stopThreads(); closeSockets(); } 371 372 373 374 386 public boolean setProperties(Properties props) { 387 String str; 388 String tmp = null; 389 390 super.setProperties(props); 391 392 try { 394 tmp=System.getProperty("bind.address"); 395 if(Boolean.getBoolean(IGNORE_BIND_ADDRESS_PROPERTY)) { 396 tmp=null; 397 } 398 } 399 catch (SecurityException ex){ 400 } 401 402 if(tmp != null) 403 str=tmp; 404 else 405 str=props.getProperty("bind_addr"); 406 if(str != null) { 407 try { 408 bind_addr=InetAddress.getByName(str); 409 } 410 catch(UnknownHostException 
unknown) { 411 if(log.isFatalEnabled()) log.fatal("(bind_addr): host " + str + " not known"); 412 return false; 413 } 414 props.remove("bind_addr"); 415 } 416 417 str=props.getProperty("bind_to_all_interfaces"); 418 if(str != null) { 419 bind_to_all_interfaces=new Boolean (str).booleanValue(); 420 props.remove("bind_to_all_interfaces"); 421 } 422 423 str=props.getProperty("bind_port"); 424 if(str != null) { 425 bind_port=Integer.parseInt(str); 426 props.remove("bind_port"); 427 } 428 429 str=props.getProperty("num_last_ports"); 430 if(str != null) { 431 num_last_ports=Integer.parseInt(str); 432 props.remove("num_last_ports"); 433 } 434 435 str=props.getProperty("start_port"); 436 if(str != null) { 437 bind_port=Integer.parseInt(str); 438 props.remove("start_port"); 439 } 440 441 str=props.getProperty("port_range"); 442 if(str != null) { 443 port_range=Integer.parseInt(str); 444 props.remove("port_range"); 445 } 446 447 str=props.getProperty("mcast_addr"); 448 if(str != null) { 449 mcast_addr_name=str; 450 props.remove("mcast_addr"); 451 } 452 453 str=props.getProperty("mcast_port"); 454 if(str != null) { 455 mcast_port=Integer.parseInt(str); 456 props.remove("mcast_port"); 457 } 458 459 str=props.getProperty("ip_mcast"); 460 if(str != null) { 461 ip_mcast=Boolean.valueOf(str).booleanValue(); 462 props.remove("ip_mcast"); 463 } 464 465 str=props.getProperty("ip_ttl"); 466 if(str != null) { 467 ip_ttl=Integer.parseInt(str); 468 props.remove("ip_ttl"); 469 } 470 471 str=props.getProperty("mcast_send_buf_size"); 472 if(str != null) { 473 mcast_send_buf_size=Integer.parseInt(str); 474 props.remove("mcast_send_buf_size"); 475 } 476 477 str=props.getProperty("mcast_recv_buf_size"); 478 if(str != null) { 479 mcast_recv_buf_size=Integer.parseInt(str); 480 props.remove("mcast_recv_buf_size"); 481 } 482 483 str=props.getProperty("ucast_send_buf_size"); 484 if(str != null) { 485 ucast_send_buf_size=Integer.parseInt(str); 486 props.remove("ucast_send_buf_size"); 487 } 488 489 
str=props.getProperty("ucast_recv_buf_size"); 490 if(str != null) { 491 ucast_recv_buf_size=Integer.parseInt(str); 492 props.remove("ucast_recv_buf_size"); 493 } 494 495 str=props.getProperty("loopback"); 496 if(str != null) { 497 loopback=Boolean.valueOf(str).booleanValue(); 498 props.remove("loopback"); 499 } 500 501 str=props.getProperty("discard_incompatibe_packets"); 502 if(str != null) { 503 discard_incompatible_packets=Boolean.valueOf(str).booleanValue(); 504 props.remove("discard_incompatibe_packets"); 505 } 506 507 str=props.getProperty("use_packet_handler"); 509 if(str != null) { 510 use_incoming_packet_handler=Boolean.valueOf(str).booleanValue(); 511 props.remove("use_packet_handler"); 512 if(log.isWarnEnabled()) log.warn("'use_packet_handler' is deprecated; use 'use_incoming_packet_handler' instead"); 513 } 514 515 str=props.getProperty("use_incoming_packet_handler"); 516 if(str != null) { 517 use_incoming_packet_handler=Boolean.valueOf(str).booleanValue(); 518 props.remove("use_incoming_packet_handler"); 519 } 520 521 str=props.getProperty("use_outgoing_packet_handler"); 522 if(str != null) { 523 use_outgoing_packet_handler=Boolean.valueOf(str).booleanValue(); 524 props.remove("use_outgoing_packet_handler"); 525 } 526 527 str=props.getProperty("max_bundle_size"); 528 if(str != null) { 529 int bundle_size=Integer.parseInt(str); 530 if(bundle_size > max_bundle_size) { 531 if(log.isErrorEnabled()) log.error("max_bundle_size (" + bundle_size + 532 ") is greater than largest UDP fragmentation size (" + max_bundle_size + ')'); 533 return false; 534 } 535 if(bundle_size <= 0) { 536 if(log.isErrorEnabled()) log.error("max_bundle_size (" + bundle_size + ") is <= 0"); 537 return false; 538 } 539 max_bundle_size=bundle_size; 540 props.remove("max_bundle_size"); 541 } 542 543 str=props.getProperty("max_bundle_timeout"); 544 if(str != null) { 545 max_bundle_timeout=Long.parseLong(str); 546 if(max_bundle_timeout <= 0) { 547 if(log.isErrorEnabled()) 
log.error("max_bundle_timeout of " + max_bundle_timeout + " is invalid"); 548 return false; 549 } 550 props.remove("max_bundle_timeout"); 551 } 552 553 str=props.getProperty("enable_bundling"); 554 if(str != null) { 555 enable_bundling=Boolean.valueOf(str).booleanValue(); 556 props.remove("enable_bundling"); 557 } 558 559 str=props.getProperty("use_addr_translation"); 560 if(str != null) { 561 use_addr_translation=Boolean.valueOf(str).booleanValue(); 562 props.remove("use_addr_translation"); 563 } 564 565 if(props.size() > 0) { 566 System.err.println("UDP.setProperties(): the following properties are not recognized:"); 567 props.list(System.out); 568 return false; 569 } 570 571 if(enable_bundling) { 572 if(use_outgoing_packet_handler == false) 573 if(log.isWarnEnabled()) log.warn("enable_bundling is true; setting use_outgoing_packet_handler=true"); 574 use_outgoing_packet_handler=true; 575 } 576 577 return true; 578 } 579 580 581 582 586 public void startUpHandler() { 587 ; 588 } 589 590 594 public void up(Event evt) { 595 596 switch(evt.getType()) { 597 598 case Event.CONFIG: 599 passUp(evt); 600 if(log.isDebugEnabled()) log.debug("received CONFIG event: " + evt.getArg()); 601 handleConfigEvent((HashMap)evt.getArg()); 602 return; 603 } 604 605 passUp(evt); 606 } 607 608 614 public void down(Event evt) { 615 Message msg; 616 Object dest_addr; 617 618 if(evt.getType() != Event.MSG) { handleDownEvent(evt); 620 return; 621 } 622 623 msg=(Message)evt.getArg(); 624 625 if(channel_name != null) { 626 msg.putHeader(name, udp_hdr); 629 } 630 631 dest_addr=msg.getDest(); 632 633 if(observer != null) 636 observer.passDown(evt); 637 638 if(dest_addr == null) { if(ip_mcast) { 640 if(mcast_addr == null) { 641 if(log.isErrorEnabled()) log.error("dest address of message is null, and " + 642 "sending to default address fails as mcast_addr is null, too !" 
+ 643 " Discarding message " + Util.printEvent(evt)); 644 return; 645 } 646 msg.setDest(mcast_addr); 648 } 649 else { 650 sendMultipleUdpMessages(msg, members); 652 return; 653 } 654 } 655 656 try { 657 sendUdpMessage(msg); 658 } 659 catch(Exception e) { 660 if(log.isErrorEnabled()) log.error("exception=" + e + ", msg=" + msg + ", mcast_addr=" + mcast_addr); 661 } 662 } 663 664 665 666 667 668 669 670 671 672 673 674 675 676 682 void setSourceAddress(Message msg) { 683 if(msg.getSrc() == null) 684 msg.setSrc(local_addr); 685 } 686 687 688 689 690 696 void handleIncomingUdpPacket(IpAddress dest, InetAddress sender, int port, byte[] data) { 697 ByteArrayInputStream inp_stream=null; 698 DataInputStream inp=null; 699 Message msg=null; 700 List l; 702 try { 703 inp_stream=new ByteArrayInputStream(data, VERSION_LENGTH, data.length - VERSION_LENGTH); 705 inp=new DataInputStream(inp_stream); 706 707 if(enable_bundling) { 708 l=bufferToList(inp, dest, sender, port); 709 for(Enumeration en=l.elements(); en.hasMoreElements();) { 710 msg=(Message)en.nextElement(); 711 try { 712 handleMessage(msg); 713 } 714 catch(Throwable t) { 715 if(log.isErrorEnabled()) log.error("failure: " + t.toString()); 716 } 717 } 718 } 719 else { 720 msg=bufferToMessage(inp, dest, sender, port); 721 handleMessage(msg); 722 } 723 } 724 catch(Throwable e) { 725 if(log.isErrorEnabled()) log.error("exception in processing incoming packet", e); 726 } 727 finally { 728 Util.closeInputStream(inp); 729 Util.closeInputStream(inp_stream); 730 } 731 } 732 733 734 735 void handleMessage(Message msg) { 736 Event evt; 737 UdpHeader hdr; 738 Address dst=msg.getDest(); 739 740 if(dst == null) 741 dst=mcast_addr; 742 743 if(loopback) { 745 Address SRC=msg.getSrc(); 746 if((dst == null || (dst != null && dst.isMulticastAddress())) && src != null && local_addr.equals(src)) { 747 if(log.isTraceEnabled()) 748 log.trace("discarded own loopback multicast packet"); 749 return; 750 } 751 } 752 753 evt=new Event(Event.MSG, 
msg); 754 if(log.isTraceEnabled()) { 755 StringBuffer sb=new StringBuffer ("message is "); 756 sb.append(msg).append(", headers are ").append(msg.getHeaders()); 757 log.trace(sb.toString()); 758 } 759 760 762 if(observer != null) 763 observer.up(evt, up_queue.size()); 764 765 hdr=(UdpHeader)msg.getHeader(name); if(hdr != null) { 767 768 769 String ch_name=hdr.channel_name; 770 771 if(ch_name != null && channel_name != null && !channel_name.equals(ch_name) && 774 !ch_name.equals(Util.DIAG_GROUP)) { 775 if(log.isWarnEnabled()) log.warn("discarded message from different group (" + 776 ch_name + "). Sender was " + msg.getSrc()); 777 return; 778 } 779 } 780 else { 781 if(log.isErrorEnabled()) log.error("message does not have a UDP header"); 782 } 783 passUp(evt); 784 } 785 786 787 void sendUdpMessage(Message msg) throws Exception { 788 sendUdpMessage(msg, false); 789 } 790 791 792 void sendUdpMessage(Message msg, boolean copyForOutgoingQueue) throws Exception { 793 IpAddress dest; 794 Message copy; 795 Event evt; 796 797 dest=(IpAddress)msg.getDest(); setSourceAddress(msg); 799 800 if(log.isTraceEnabled()) { 801 StringBuffer sb=new StringBuffer ("sending msg to "); 802 sb.append(msg.getDest()).append(" (src=").append(msg.getSrc()).append("), headers are ").append(msg.getHeaders()); 803 log.trace(sb.toString()); 804 } 805 806 if(loopback && (dest.equals(local_addr) || dest.isMulticastAddress())) { 810 copy=msg.copy(); 811 copy.setSrc(local_addr); 813 evt=new Event(Event.MSG, copy); 815 816 818 if(observer != null) 819 observer.up(evt, up_queue.size()); 820 if(log.isTraceEnabled()) log.trace("looped back local message " + copy); 821 passUp(evt); 822 if(dest != null && !dest.isMulticastAddress()) 823 return; 824 } 825 826 if(use_outgoing_packet_handler) { 827 if(copyForOutgoingQueue) 828 outgoing_queue.add(msg.copy()); 829 else 830 outgoing_queue.add(msg); 831 return; 832 } 833 834 send(msg); 835 } 836 837 838 839 void send(Message msg) throws Exception { 840 Buffer buf; 
IpAddress dest=(IpAddress)msg.getDest();
        // the local must be named 'src': it is the name passed to messageToBuffer() below
        IpAddress src=(IpAddress)msg.getSrc();

        // out_stream is shared, so marshalling and sending happen under its lock
        synchronized(out_stream) {
            buf=messageToBuffer(msg, dest, src);
            doSend(buf, dest.getIpAddress(), dest.getPort());
        }
    }


    /**
     * Sends a marshalled buffer as a single datagram: through the multicast send socket
     * for multicast destinations (if that socket exists), otherwise through the unicast
     * socket. Nothing is sent if the required socket is null.
     */
    void doSend(Buffer buf, InetAddress dest, int port) throws IOException {
        DatagramPacket packet;

        packet=new DatagramPacket(buf.getBuf(), buf.getOffset(), buf.getLength(), dest, port);
        if(dest.isMulticastAddress() && mcast_send_sock != null) {
            mcast_send_sock.send(packet);
        }
        else {
            if(sock != null)
                sock.send(packet);
        }
    }


    /**
     * Sends <code>msg</code> to every address in <code>dests</code>, one unicast each.
     * The destination of <code>msg</code> is overwritten for each send; a failure for one
     * destination is logged and does not stop the remaining sends.
     */
    void sendMultipleUdpMessages(Message msg, Vector dests) {
        Address dest;

        for(int i=0; i < dests.size(); i++) {
            dest=(Address)dests.elementAt(i);
            msg.setDest(dest);

            try {
                // the same message object is re-addressed on every iteration, so a queued
                // message has to be a copy
                sendUdpMessage(msg, true);
            }
            catch(Exception e) {
                if(log.isDebugEnabled()) log.debug("exception=" + e);
            }
        }
    }


    /**
     * Marshals a message into the shared out_stream: version id first, then the message
     * with its addresses stripped down (see nullAddresses()). The message's addresses
     * are restored afterwards, also when marshalling fails. The returned buffer wraps
     * out_stream's internal array, so callers must hold the out_stream lock until the
     * buffer has been consumed.
     *
     * @param msg  message to marshal
     * @param dest destination to restore on the message afterwards
     * @param src  source to restore on the message afterwards; may be null
     * @return buffer over the marshalled bytes
     * @throws IOException if writing the message fails
     */
    Buffer messageToBuffer(Message msg, IpAddress dest, IpAddress src) throws IOException {
        Buffer retval;
        DataOutputStream out=null;

        out_stream.reset();
        out_stream.write(Version.version_id, 0, Version.version_id.length);
        try {
            out=new DataOutputStream(out_stream);
            nullAddresses(msg, dest, src);
            try {
                msg.writeTo(out);
            }
            finally {
                // never leave the caller's message with stripped addresses
                revertAddresses(msg, dest, src);
            }
            out.flush();
            retval=new Buffer(out_stream.getRawBuffer(), 0, out_stream.size());
            return retval;
        }
        finally {
            Util.closeOutputStream(out);
        }
    }


    /**
     * Strips address information before marshalling: the destination is removed and the
     * source is replaced by an address carrying only the port (plus additional data).
     */
    void nullAddresses(Message msg, IpAddress dest, IpAddress src) {
        msg.setDest(null);
        if(!dest.isMulticastAddress()) {
            if(src != null) {
                msg.setSrc(new IpAddress(src.getPort(), false));
                if(src.getAdditionalData() != null)
                    ((IpAddress)msg.getSrc()).setAdditionalData(src.getAdditionalData());
            }
            else {
                msg.setSrc(null);
            }
        }
        else {
            if(src != null) {
                msg.setSrc(new IpAddress(src.getPort(), false));
if(src.getAdditionalData() != null) 931 ((IpAddress)msg.getSrc()).setAdditionalData(src.getAdditionalData()); 932 } 933 } 934 } 935 936 void revertAddresses(Message msg, IpAddress dest, IpAddress src) { 937 msg.setDest(dest); 938 msg.setSrc(src); 939 } 940 941 942 Message bufferToMessage(DataInputStream instream, IpAddress dest, InetAddress sender, int port) 943 throws IOException, IllegalAccessException , InstantiationException { 944 Message msg=new Message(); 945 msg.readFrom(instream); 946 setAddresses(msg, dest, sender, port); 947 return msg; 948 } 949 950 951 void setAddresses(Message msg, IpAddress dest, InetAddress sender, int port) { 952 if(msg.getDest() == null && dest != null) 954 msg.setDest(dest); 955 956 IpAddress src_addr=(IpAddress)msg.getSrc(); 958 if(src_addr == null) { 959 try {msg.setSrc(new IpAddress(sender, port));} catch(Throwable t) {} 960 } 961 else { 962 byte[] tmp_additional_data=src_addr.getAdditionalData(); 963 if(src_addr.getIpAddress() == null) { 964 try {msg.setSrc(new IpAddress(sender, src_addr.getPort()));} catch(Throwable t) {} 965 } 966 if(tmp_additional_data != null) 967 ((IpAddress)msg.getSrc()).setAdditionalData(tmp_additional_data); 968 } 969 } 970 971 Buffer listToBuffer(List l, IpAddress dest) throws IOException { 972 Buffer retval=null; 973 IpAddress src; 974 Message msg; 975 int len=l != null? 
l.size() : 0; 976 DataOutputStream out=null; 977 out_stream.reset(); 978 out_stream.write(Version.version_id, 0, Version.version_id.length); try { 980 out=new DataOutputStream(out_stream); 981 out.writeInt(len); 982 for(Enumeration en=l.elements(); en.hasMoreElements();) { 983 msg=(Message)en.nextElement(); 984 src=(IpAddress)msg.getSrc(); 985 nullAddresses(msg, dest, src); 986 msg.writeTo(out); 987 revertAddresses(msg, dest, src); 988 } 989 out.flush(); 990 retval=new Buffer(out_stream.getRawBuffer(), 0, out_stream.size()); 991 return retval; 992 } 993 finally { 994 Util.closeOutputStream(out); 995 } 996 } 997 998 999 1000 List bufferToList(DataInputStream instream, IpAddress dest, InetAddress sender, int port) 1001 throws IOException, IllegalAccessException , InstantiationException { 1002 List l=new List(); 1003 DataInputStream in=null; 1004 int len; 1005 Message msg; 1006 1007 try { 1008 len=instream.readInt(); 1009 for(int i=0; i < len; i++) { 1010 msg=new Message(); 1011 msg.readFrom(instream); 1012 setAddresses(msg, dest, sender, port); 1013 l.add(msg); 1014 } 1015 return l; 1016 } 1017 finally { 1018 Util.closeInputStream(in); 1019 } 1020 } 1021 1022 1023 1024 1029 void createSockets() throws Exception { 1030 InetAddress tmp_addr=null; 1031 1032 1034 1036 if(bind_addr == null) { 1040 InetAddress[] interfaces=InetAddress.getAllByName(InetAddress.getLocalHost().getHostAddress()); 1041 if(interfaces != null && interfaces.length > 0) 1042 bind_addr=interfaces[0]; 1043 } 1044 if(bind_addr == null) 1045 bind_addr=InetAddress.getLocalHost(); 1046 1047 if(bind_addr != null) 1048 if(log.isInfoEnabled()) log.info("sockets will use interface " + bind_addr.getHostAddress()); 1049 1050 1051 if(bind_port > 0) { 1054 sock=createDatagramSocketWithBindPort(); 1055 } 1056 else { 1057 sock=createEphemeralDatagramSocket(); 1058 } 1059 1060 if(sock == null) 1061 throw new Exception ("UDP.createSocket(): sock is null"); 1062 1063 local_addr=new IpAddress(sock.getLocalAddress(), 
sock.getLocalPort()); 1064 if(additional_data != null) 1065 local_addr.setAdditionalData(additional_data); 1066 1067 1068 if(ip_mcast) { 1070 mcast_recv_sock=new MulticastSocket(mcast_port); 1072 mcast_recv_sock.setTimeToLive(ip_ttl); 1073 tmp_addr=InetAddress.getByName(mcast_addr_name); 1074 mcast_addr=new IpAddress(tmp_addr, mcast_port); 1075 1076 if(bind_to_all_interfaces && Util.getJavaVersion() >= 14) { 1077 bindToAllInterfaces(mcast_recv_sock, mcast_addr.getIpAddress()); 1078 } 1079 else { 1080 if(bind_addr != null) 1081 mcast_recv_sock.setInterface(bind_addr); 1082 mcast_recv_sock.joinGroup(tmp_addr); 1083 } 1084 1085 mcast_send_sock=new MulticastSocket(); 1087 mcast_send_sock.setTimeToLive(ip_ttl); 1088 if(bind_addr != null) 1089 mcast_send_sock.setInterface(bind_addr); 1090 } 1092 1093 setBufferSizes(); 1094 if(log.isInfoEnabled()) log.info("socket information:\n" + dumpSocketInfo()); 1095 } 1096 1097 1098 private void bindToAllInterfaces(MulticastSocket s, InetAddress mcastAddr) throws IOException { 1099 SocketAddress tmp_mcast_addr=new InetSocketAddress(mcastAddr, mcast_port); 1100 Enumeration en=NetworkInterface.getNetworkInterfaces(); 1101 while(en.hasMoreElements()) { 1102 NetworkInterface i=(NetworkInterface)en.nextElement(); 1103 for(Enumeration en2=i.getInetAddresses(); en2.hasMoreElements();) { 1104 InetAddress addr=(InetAddress)en2.nextElement(); 1105 s.joinGroup(tmp_mcast_addr, i); 1108 if(log.isTraceEnabled()) 1109 log.trace("joined " + tmp_mcast_addr + " on interface " + i.getName() + " (" + addr + ")"); 1110 break; 1111 } 1112 } 1113 } 1114 1115 1116 1118 DatagramSocket createEphemeralDatagramSocket() throws SocketException { 1119 DatagramSocket tmp=null; 1120 int localPort=0; 1121 while(true) { 1122 tmp=new DatagramSocket(localPort, bind_addr); if(num_last_ports <= 0) 1124 break; 1125 localPort=tmp.getLocalPort(); 1126 if(getLastPortsUsed().contains(new Integer (localPort))) { 1127 if(log.isDebugEnabled()) 1128 log.debug("local port " + 
localPort + " already seen in this session; will try to get other port"); 1129 try {tmp.close();} catch(Throwable e) {} 1130 localPort++; 1131 continue; 1132 } 1133 else { 1134 getLastPortsUsed().add(new Integer (localPort)); 1135 break; 1136 } 1137 } 1138 return tmp; 1139 } 1140 1141 1142 1143 1144 1150 DatagramSocket createDatagramSocketWithBindPort() throws Exception { 1151 DatagramSocket tmp=null; 1152 int rcv_port=bind_port, max_port=bind_port + port_range; 1154 while(rcv_port <= max_port) { 1155 try { 1156 tmp=new DatagramSocket(rcv_port, bind_addr); 1157 break; 1158 } 1159 catch(SocketException bind_ex) { rcv_port++; 1161 } 1162 catch(SecurityException sec_ex) { rcv_port++; 1164 } 1165 1166 if(rcv_port >= max_port + 1) { throw new Exception ("UDP.createSockets(): cannot list on any port in range " + 1169 bind_port + '-' + (bind_port + port_range)); 1170 } 1171 } 1172 return tmp; 1173 } 1174 1175 1176 String dumpSocketInfo() throws Exception { 1177 StringBuffer sb=new StringBuffer (); 1178 sb.append("local_addr=").append(local_addr); 1179 sb.append(", mcast_addr=").append(mcast_addr); 1180 sb.append(", bind_addr=").append(bind_addr); 1181 sb.append(", ttl=").append(ip_ttl); 1182 1183 if(sock != null) { 1184 sb.append("\nsock: bound to "); 1185 sb.append(sock.getLocalAddress().getHostAddress()).append(':').append(sock.getLocalPort()); 1186 sb.append(", receive buffer size=").append(sock.getReceiveBufferSize()); 1187 sb.append(", send buffer size=").append(sock.getSendBufferSize()); 1188 } 1189 1190 if(mcast_recv_sock != null) { 1191 sb.append("\nmcast_recv_sock: bound to "); 1192 sb.append(mcast_recv_sock.getInterface().getHostAddress()).append(':').append(mcast_recv_sock.getLocalPort()); 1193 sb.append(", send buffer size=").append(mcast_recv_sock.getSendBufferSize()); 1194 sb.append(", receive buffer size=").append(mcast_recv_sock.getReceiveBufferSize()); 1195 } 1196 1197 if(mcast_send_sock != null) { 1198 sb.append("\nmcast_send_sock: bound to "); 1199 
sb.append(mcast_send_sock.getInterface().getHostAddress()).append(':').append(mcast_send_sock.getLocalPort());
            sb.append(", send buffer size=").append(mcast_send_sock.getSendBufferSize());
            sb.append(", receive buffer size=").append(mcast_send_sock.getReceiveBufferSize());
        }
        return sb.toString();
    }


    /**
     * Applies the configured send/receive buffer sizes to every socket that currently
     * exists. A size that cannot be set is reported as a warning only.
     */
    void setBufferSizes() {
        if(sock != null) {
            applyBufferSizes(sock,
                             ucast_send_buf_size, "failed setting ucast_send_buf_size in sock: ",
                             ucast_recv_buf_size, "failed setting ucast_recv_buf_size in sock: ");
        }
        if(mcast_recv_sock != null) {
            applyBufferSizes(mcast_recv_sock,
                             mcast_send_buf_size, "failed setting mcast_send_buf_size in mcast_recv_sock: ",
                             mcast_recv_buf_size, "failed setting mcast_recv_buf_size in mcast_recv_sock: ");
        }
        if(mcast_send_sock != null) {
            applyBufferSizes(mcast_send_sock,
                             mcast_send_buf_size, "failed setting mcast_send_buf_size in mcast_send_sock: ",
                             mcast_recv_buf_size, "failed setting mcast_recv_buf_size in mcast_send_sock: ");
        }
    }

    /** Sets both buffer sizes on one socket; each failure is logged with the given message prefix. */
    private void applyBufferSizes(DatagramSocket target, int send_size, String send_failure,
                                  int recv_size, String recv_failure) {
        try {
            target.setSendBufferSize(send_size);
        }
        catch(Throwable ex) {
            if(log.isWarnEnabled()) log.warn(send_failure + ex);
        }
        try {
            target.setReceiveBufferSize(recv_size);
        }
        catch(Throwable ex) {
            if(log.isWarnEnabled()) log.warn(recv_failure + ex);
        }
    }


    /** Closes the multicast sockets, then the unicast socket. */
    void closeSockets() {
        closeMulticastSocket();
        closeSocket();
    }


    /** Leaves the multicast group and closes both multicast sockets. */
    void closeMulticastSocket() {
        if(mcast_recv_sock != null) {
            try {
                if(mcast_addr != null) {
mcast_recv_sock.leaveGroup(mcast_addr.getIpAddress());
                }
            }
            catch(IOException ex) {
                // leaving the group is best effort; the socket is closed regardless
                if(log.isWarnEnabled()) log.warn("failed leaving multicast group " + mcast_addr + ": " + ex);
            }
            finally {
                // always release the socket, even if leaveGroup() failed
                mcast_recv_sock.close();
                mcast_recv_sock=null;
                if(log.isDebugEnabled()) log.debug("multicast receive socket closed");
            }
            mcast_addr=null;
        }

        if(mcast_send_sock != null) {
            mcast_send_sock.close();
            mcast_send_sock=null;
            if(log.isDebugEnabled()) log.debug("multicast send socket closed");
        }
    }


    /** Closes the unicast socket, if open. */
    void closeSocket() {
        if(sock != null) {
            sock.close();
            sock=null;
            if(log.isDebugEnabled()) log.debug("socket closed");
        }
    }


    /**
     * Starts the unicast receiver, the multicast receiver thread (if IP multicast is
     * enabled and no such thread is still alive) and the configured packet handlers.
     *
     * @throws Exception if a packet handler fails to start
     */
    void startThreads() throws Exception {
        if(ucast_receiver == null) {
            ucast_receiver=new UcastReceiver();
            ucast_receiver.start();
            if(log.isDebugEnabled()) log.debug("created unicast receiver thread");
        }

        if(ip_mcast) {
            if(mcast_receiver != null) {
                if(mcast_receiver.isAlive()) {
                    if(log.isDebugEnabled()) log.debug("did not create new multicastreceiver thread as existing " +
                                                       "multicast receiver thread is still running");
                }
                else
                    mcast_receiver=null; // dead thread: create a fresh one below
            }

            if(mcast_receiver == null) {
                mcast_receiver=new Thread(this, "UDP mcast receiver");
                mcast_receiver.setPriority(Thread.MAX_PRIORITY);
                mcast_receiver.setDaemon(true);
                mcast_receiver.start();
            }
        }
        if(use_outgoing_packet_handler)
            outgoing_packet_handler.start();
        if(use_incoming_packet_handler)
            incoming_packet_handler.start();
    }


    /** Stops the multicast receiver thread, the unicast receiver and the packet handlers. */
    void stopThreads() {
        Thread tmp;

        if(mcast_receiver != null) {
            if(mcast_receiver.isAlive()) {
                tmp=mcast_receiver;
                mcast_receiver=null;
                closeMulticastSocket(); // makes a blocked receive() fail so the loop can end
                tmp.interrupt();
                try {
                    tmp.join(100);
                }
                catch(Exception e) {
                }
                tmp=null;
            }
            mcast_receiver=null;
        }

        if(ucast_receiver != null) {
            ucast_receiver.stop();
ucast_receiver=null; 1401 } 1402 1403 if(incoming_packet_handler != null) 1405 incoming_packet_handler.stop(); 1406 1407 if(outgoing_packet_handler != null) 1409 outgoing_packet_handler.stop(); 1410 } 1411 1412 1413 void handleDownEvent(Event evt) { 1414 switch(evt.getType()) { 1415 1416 case Event.TMP_VIEW: 1417 case Event.VIEW_CHANGE: 1418 synchronized(members) { 1419 members.removeAllElements(); 1420 Vector tmpvec=((View)evt.getArg()).getMembers(); 1421 for(int i=0; i < tmpvec.size(); i++) 1422 members.addElement(tmpvec.elementAt(i)); 1423 } 1424 break; 1425 1426 case Event.GET_LOCAL_ADDRESS: passUp(new Event(Event.SET_LOCAL_ADDRESS, local_addr)); 1428 break; 1429 1430 case Event.CONNECT: 1431 channel_name=(String )evt.getArg(); 1432 udp_hdr=new UdpHeader(channel_name); 1433 1434 passUp(new Event(Event.CONNECT_OK)); 1438 break; 1439 1440 case Event.DISCONNECT: 1441 passUp(new Event(Event.DISCONNECT_OK)); 1442 break; 1443 1444 case Event.CONFIG: 1445 if(log.isDebugEnabled()) log.debug("received CONFIG event: " + evt.getArg()); 1446 handleConfigEvent((HashMap)evt.getArg()); 1447 break; 1448 } 1449 } 1450 1451 1452 void handleConfigEvent(HashMap map) { 1453 if(map == null) return; 1454 if(map.containsKey("additional_data")) 1455 additional_data=(byte[])map.get("additional_data"); 1456 if(map.containsKey("send_buf_size")) { 1457 mcast_send_buf_size=((Integer )map.get("send_buf_size")).intValue(); 1458 ucast_send_buf_size=mcast_send_buf_size; 1459 } 1460 if(map.containsKey("recv_buf_size")) { 1461 mcast_recv_buf_size=((Integer )map.get("recv_buf_size")).intValue(); 1462 ucast_recv_buf_size=mcast_recv_buf_size; 1463 } 1464 setBufferSizes(); 1465 } 1466 1467 1468 1469 1470 1471 1472 1473 class IncomingQueueEntry { 1474 IpAddress dest=null; 1475 InetAddress sender=null; 1476 int port=-1; 1477 byte[] buf; 1478 1479 public IncomingQueueEntry(IpAddress dest, InetAddress sender, int port, byte[] buf) { 1480 this.dest=dest; 1481 this.sender=sender; 1482 this.port=port; 1483 
this.buf=buf; 1484 } 1485 1486 public IncomingQueueEntry(byte[] buf) { 1487 this.buf=buf; 1488 } 1489 } 1490 1491 1492 1493 public class UcastReceiver implements Runnable { 1494 boolean running=true; 1495 Thread thread=null; 1496 1497 1498 public void start() { 1499 if(thread == null) { 1500 thread=new Thread (this, "UDP.UcastReceiverThread"); 1501 thread.setDaemon(true); 1502 running=true; 1503 thread.start(); 1504 } 1505 } 1506 1507 1508 public void stop() { 1509 Thread tmp; 1510 if(thread != null && thread.isAlive()) { 1511 running=false; 1512 tmp=thread; 1513 thread=null; 1514 closeSocket(); tmp.interrupt(); 1516 tmp=null; 1517 } 1518 thread=null; 1519 } 1520 1521 1522 public void run() { 1523 DatagramPacket packet; 1524 byte receive_buf[]=new byte[65535]; 1525 int len; 1526 byte[] data, tmp; 1527 InetAddress sender_addr; 1528 int sender_port; 1529 1530 packet=new DatagramPacket(receive_buf, receive_buf.length); 1532 1533 while(running && thread != null && sock != null) { 1534 try { 1535 packet.setData(receive_buf, 0, receive_buf.length); 1536 sock.receive(packet); 1537 sender_addr=packet.getAddress(); 1538 sender_port=packet.getPort(); 1539 len=packet.getLength(); 1540 data=packet.getData(); 1541 if(log.isTraceEnabled()) 1542 log.trace("received (ucast) " + len + " bytes from " + sender_addr + ':' + sender_port); 1543 if(len > receive_buf.length) { 1544 if(log.isErrorEnabled()) 1545 log.error("size of the received packet (" + len + ") is bigger than allocated buffer (" + 1546 receive_buf.length + "): will not be able to handle packet. 
" + 1547 "Use the FRAG protocol and make its frag_size lower than " + receive_buf.length); 1548 } 1549 1550 if(Version.compareTo(data) == false) { 1551 if(log.isWarnEnabled()) { 1552 StringBuffer sb=new StringBuffer (); 1553 sb.append("packet from ").append(sender_addr).append(':').append(sender_port); 1554 sb.append(" has different version (").append(Version.printVersionId(data, Version.version_id.length)); 1555 sb.append(") from ours (").append(Version.printVersionId(Version.version_id)).append("). "); 1556 if(discard_incompatible_packets) 1557 sb.append("Packet is discarded"); 1558 else 1559 sb.append("This may cause problems"); 1560 log.warn(sb.toString()); 1561 } 1562 if(discard_incompatible_packets) 1563 continue; 1564 } 1565 1566 if(use_incoming_packet_handler) { 1567 tmp=new byte[len]; 1568 System.arraycopy(data, 0, tmp, 0, len); 1569 incoming_queue.add(new IncomingQueueEntry(local_addr, sender_addr, sender_port, tmp)); 1570 } 1571 else 1572 handleIncomingUdpPacket(local_addr, sender_addr, sender_port, data); 1573 } 1574 catch(SocketException sock_ex) { 1575 if(log.isDebugEnabled()) log.debug("unicast receiver socket is closed, exception=" + sock_ex); 1576 break; 1577 } 1578 catch(InterruptedIOException io_ex) { ; } 1581 catch(Throwable ex) { 1582 if(log.isErrorEnabled()) 1583 log.error("[" + local_addr + "] failed receiving unicast packet", ex); 1584 Util.sleep(100); } 1586 } 1587 if(log.isDebugEnabled()) log.debug("unicast receiver thread terminated"); 1588 } 1589 } 1590 1591 1592 1596 class IncomingPacketHandler implements Runnable { 1597 Thread t=null; 1598 1599 public void run() { 1600 byte[] data; 1601 IncomingQueueEntry entry; 1602 1603 while(incoming_queue != null && incoming_packet_handler != null) { 1604 try { 1605 entry=(IncomingQueueEntry)incoming_queue.remove(); 1606 data=entry.buf; 1607 } 1608 catch(QueueClosedException closed_ex) { 1609 if(log.isDebugEnabled()) log.debug("packet_handler thread terminating"); 1610 break; 1611 } 1612 
handleIncomingUdpPacket(entry.dest, entry.sender, entry.port, data); 1613 } 1614 } 1615 1616 void start() { 1617 if(t == null || !t.isAlive()) { 1618 t=new Thread (this, "UDP.IncomingPacketHandler thread"); 1619 t.setDaemon(true); 1620 t.start(); 1621 } 1622 } 1623 1624 void stop() { 1625 if(incoming_queue != null) 1626 incoming_queue.close(false); t=null; 1628 incoming_queue=null; 1629 } 1630 } 1631 1632 1633 1637 class OutgoingPacketHandler implements Runnable { 1638 Thread t=null; 1639 byte[] buf; 1640 DatagramPacket packet; 1641 IpAddress dest; 1642 1643 public void run() { 1644 Message msg; 1645 1646 while(outgoing_queue != null && outgoing_packet_handler != null) { 1647 try { 1648 msg=(Message)outgoing_queue.remove(); 1649 handleMessage(msg); 1650 } 1651 catch(QueueClosedException closed_ex) { 1652 break; 1653 } 1654 catch(Throwable th) { 1655 if(log.isErrorEnabled()) log.error("exception sending packet", th); 1656 } 1657 msg=null; } 1659 if(log.isTraceEnabled()) log.trace("packet_handler thread terminating"); 1660 } 1661 1662 protected void handleMessage(Message msg) throws Exception { 1663 send(msg); 1664 } 1665 1666 1667 void start() { 1668 if(t == null || !t.isAlive()) { 1669 t=new Thread (this, "UDP.OutgoingPacketHandler thread"); 1670 t.setDaemon(true); 1671 t.start(); 1672 } 1673 } 1674 1675 void stop() { 1676 if(outgoing_queue != null) 1677 outgoing_queue.close(false); t=null; 1679 } 1681 } 1682 1683 1684 1685 1686 1692 class BundlingOutgoingPacketHandler extends OutgoingPacketHandler { 1693 long total_bytes=0; 1694 1695 final HashMap msgs=new HashMap(11); 1696 1697 1698 void start() { 1699 super.start(); 1700 t.setName("UDP.BundlingOutgoingPacketHandler thread"); 1701 } 1702 1703 1704 public void run() { 1705 Message msg=null, leftover=null; 1706 long start=0; 1707 while(outgoing_queue != null) { 1708 try { 1709 total_bytes=0; 1710 msg=leftover != null? 
leftover : (Message)outgoing_queue.remove(); start=System.currentTimeMillis(); 1712 leftover=waitForMessagesToAccumulate(msg, outgoing_queue, max_bundle_size, start, max_bundle_timeout); 1713 bundleAndSend(start); 1714 } 1715 catch(QueueClosedException closed_ex) { 1716 break; 1717 } 1718 catch(Throwable th) { 1719 if(log.isErrorEnabled()) log.error("exception sending packet", th); 1720 } 1721 } 1722 bundleAndSend(start); 1723 if(log.isTraceEnabled()) log.trace("packet_handler thread terminating"); 1724 } 1725 1726 1727 1737 Message waitForMessagesToAccumulate(Message m, Queue q, long max_size, long start_time, long max_time) { 1738 Message msg, leftover=null; 1739 boolean running=true, size_exceeded=false, time_reached=false; 1740 long len, time_to_wait=max_time, waited_time=0; 1741 1742 while(running) { 1743 try { 1744 msg=m != null? m : (Message)q.remove(time_to_wait); 1745 m=null; len=msg.size(); 1747 checkLength(len); 1748 waited_time=System.currentTimeMillis() - start_time; 1749 time_to_wait=max_time - waited_time; 1750 size_exceeded=total_bytes + len > max_size; 1751 time_reached=time_to_wait <= 0; 1752 1753 if(size_exceeded) { 1754 running=false; 1755 leftover=msg; 1756 } 1757 else { 1758 addMessage(msg); 1759 total_bytes+=len; 1760 if(time_reached) 1761 running=false; 1762 } 1763 } 1764 catch(TimeoutException timeout) { 1765 waited_time=System.currentTimeMillis() - start_time; 1766 time_reached=true; 1767 break; 1768 } 1769 catch(QueueClosedException closed) { 1770 break; 1771 } 1772 catch(Exception ex) { 1773 log.error("failure in bundling", ex); 1774 } 1775 } 1776 return leftover; 1783 } 1784 1785 1786 void checkLength(long len) throws Exception { 1787 if(len > max_bundle_size) 1788 throw new Exception ("UDP.BundlingOutgoingPacketHandler.handleMessage(): message size (" + len + 1789 ") is greater than max bundling size (" + max_bundle_size + "). 
" + 1790 "Set the fragmentation/bundle size in FRAG and UDP correctly"); 1791 } 1792 1793 1794 void addMessage(Message msg) { 1795 List tmp; 1796 Address dst=msg.getDest(); 1797 synchronized(msgs) { 1798 tmp=(List)msgs.get(dst); 1799 if(tmp == null) { 1800 tmp=new List(); 1801 msgs.put(dst, tmp); 1802 } 1803 tmp.add(msg); 1804 } 1805 } 1806 1807 1808 1809 private void bundleAndSend(long start_time) { 1810 Map.Entry entry; 1811 IpAddress dst; 1812 Buffer buffer; 1813 InetAddress addr; 1814 int port; 1815 List l; 1816 long stop_time=System.currentTimeMillis(); 1817 1818 synchronized(msgs) { 1819 if(msgs.size() == 0) 1820 return; 1821 if(start_time == 0) 1822 start_time=System.currentTimeMillis(); 1823 1824 if(log.isTraceEnabled()) { 1825 StringBuffer sb=new StringBuffer ("sending ").append(numMsgs(msgs)).append(" msgs ("); 1826 sb.append(total_bytes).append(" bytes, ").append(stop_time-start_time).append("ms)"); 1827 sb.append(" to ").append(msgs.size()).append(" destination(s)"); 1828 if(msgs.size() > 1) sb.append(" (dests=").append(msgs.keySet()).append(")"); 1829 log.trace(sb.toString()); 1830 } 1831 for(Iterator it=msgs.entrySet().iterator(); it.hasNext();) { 1832 entry=(Map.Entry)it.next(); 1833 dst=(IpAddress)entry.getKey(); 1834 addr=dst.getIpAddress(); 1835 port=dst.getPort(); 1836 l=(List)entry.getValue(); 1837 try { 1838 if(l.size() > 0) { 1839 synchronized(out_stream) { 1840 buffer=listToBuffer(l, dst); 1841 doSend(buffer, addr, port); 1842 } 1843 } 1844 } 1845 catch(IOException e) { 1846 if(log.isErrorEnabled()) log.error("exception sending msg (to dest=" + dst + "): " + e); 1847 } 1848 } 1849 msgs.clear(); 1850 } 1851 } 1852 1853 private int numMsgs(HashMap map) { 1854 Collection values=map.values(); 1855 List l; 1856 int size=0; 1857 for(Iterator it=values.iterator(); it.hasNext();) { 1858 l=(List)it.next(); 1859 size+=l.size(); 1860 } 1861 return size; 1862 } 1863 } 1864 1865 1866 String dumpMessages(HashMap map) { 1867 StringBuffer sb=new StringBuffer 
(); 1868 Map.Entry entry; 1869 List l; 1870 Object key; 1871 if(map != null) { 1872 synchronized(map) { 1873 for(Iterator it=map.entrySet().iterator(); it.hasNext();) { 1874 entry=(Map.Entry)it.next(); 1875 key=entry.getKey(); 1876 if(key == null) 1877 key="null"; 1878 l=(List)entry.getValue(); 1879 sb.append(key).append(": "); 1880 sb.append(l.size()).append(" msgs\n"); 1881 } 1882 } 1883 } 1884 return sb.toString(); 1885 } 1886 1887 1888} 1889 | Popular Tags |