| 1 package org.jgroups.protocols; 2 3 import org.apache.commons.logging.Log; 4 import org.apache.commons.logging.LogFactory; 5 import org.jgroups.*; 6 import org.jgroups.stack.LogicalAddress1_4; 7 import org.jgroups.stack.Protocol; 8 import org.jgroups.util.Queue; 9 import org.jgroups.util.QueueClosedException; 10 import org.jgroups.util.Util; 11 12 import java.io.*; 13 import java.net.*; 14 import java.util.*; 15 16 38 public class UDP1_4 extends Protocol implements Receiver { 39 40 static final String name="UDP1_4"; 41 42 43 ConnectorTable ct=null; 44 45 46 List bind_addrs=null; 47 48 49 String group_name=null; 50 51 52 InetSocketAddress mcast_addr=null; 53 54 55 LogicalAddress1_4 local_addr=new LogicalAddress1_4(null, null); 56 57 58 LogicalAddress1_4 local_addr_canonical=local_addr.copy(); 59 60 61 ByteArrayOutputStream out_stream=new ByteArrayOutputStream(65535); 62 63 67 int local_bind_port=0; 68 int port_range=1; 70 71 75 boolean ip_mcast=true; 76 77 78 int ip_ttl=32; 79 80 81 Vector members=new Vector(); 82 83 87 UdpHeader udp_hdr=null; 88 89 90 int mcast_send_buf_size=300000; 91 92 93 int mcast_recv_buf_size=300000; 94 95 96 int ucast_send_buf_size=300000; 97 98 99 int ucast_recv_buf_size=300000; 100 101 108 boolean loopback=true; 110 116 boolean use_packet_handler=false; 117 118 119 Queue packet_queue=null; 120 121 125 byte[] additional_data=null; 126 127 131 PacketHandler packet_handler=null; 132 133 protected static Log mylog=LogFactory.getLog(UDP1_4.class); 134 135 136 final int VERSION_LENGTH=Version.getLength(); 137 138 141 static final int DEFAULT_RECEIVE_BUFFER_SIZE=120000; 143 144 145 146 150 public UDP1_4() { 151 } 152 153 156 public String toString() { 157 return "Protocol UDP(local address: " + local_addr + ')'; 158 } 159 160 161 public void receive(DatagramPacket packet) { 162 int len=packet.getLength(); 163 byte[] data=packet.getData(); 164 SocketAddress sender=packet.getSocketAddress(); 165 166 if(len == 4) { if(data[0] == 'd' && data[1] == 'i' && data[2] == 'a' && data[3] == 'g') { 168 handleDiagnosticProbe(sender); 169 return; 170 } 171 } 172 173 if(mylog.isTraceEnabled()) 174 mylog.trace("received " + len + " bytes from " + sender); 175 176 if(Version.compareTo(packet.getData()) == false) { 177 if(mylog.isWarnEnabled()) mylog.warn("packet from " + sender + " has different version (" + 178 Version.printVersionId(data, Version.version_id.length) + 179 ") from ours (" + Version.printVersionId(Version.version_id) + 180 "). This may cause problems"); 181 } 182 183 if(use_packet_handler && packet_queue != null) { 184 byte[] tmp=new byte[len]; 185 System.arraycopy(data, 0, tmp, 0, len); 186 try { 187 Object [] arr=new Object []{tmp, sender}; 188 packet_queue.add(arr); 189 return; 190 } 191 catch(QueueClosedException e) { 192 if(mylog.isWarnEnabled()) mylog.warn("packet queue for packet handler thread is closed"); 193 } 195 } 196 197 handleIncomingUdpPacket(data, sender); 198 } 199 200 201 202 203 268 void handleDiagnosticProbe(SocketAddress sender) { 269 try { 270 byte[] diag_rsp=getDiagResponse().getBytes(); 271 DatagramPacket rsp=new DatagramPacket(diag_rsp, 0, diag_rsp.length, sender); 272 273 if(mylog.isInfoEnabled()) mylog.info("sending diag response to " + sender); 274 ct.send(rsp); 275 } catch(Throwable t) { 276 if(mylog.isErrorEnabled()) mylog.error("failed sending diag rsp to " + sender + ", exception=" + t); 277 } 278 } 279 280 String getDiagResponse() { 281 StringBuffer sb=new StringBuffer (); 282 sb.append(local_addr).append(" (").append(group_name).append(')'); 283 sb.append(" [").append(mcast_addr).append("]\n"); 284 sb.append("Version=").append(Version.version).append(", cvs=\"").append(Version.cvs).append("\"\n"); 285 sb.append("physical addresses: ").append(local_addr.getPhysicalAddresses()).append('\n'); 286 sb.append("members: ").append(members).append('\n'); 287 288 return sb.toString(); 289 } 290 291 292 293 294 295 296 297 public String getName() { 298 return name; 299 } 300 301 302 public void init() throws Exception { 303 if(use_packet_handler) { 304 packet_queue=new Queue(); 305 packet_handler=new PacketHandler(); 306 } 307 } 308 309 310 313 public void start() throws Exception { 314 if(mylog.isInfoEnabled()) mylog.info("creating sockets and starting threads"); 315 if(ct == null) { 316 ct=new ConnectorTable(mcast_addr, DEFAULT_RECEIVE_BUFFER_SIZE, mcast_recv_buf_size, ip_mcast, this); 317 318 for(Iterator it=bind_addrs.iterator(); it.hasNext();) { 319 String bind_addr=(String )it.next(); 320 ct.listenOn(bind_addr, local_bind_port, port_range, DEFAULT_RECEIVE_BUFFER_SIZE, ucast_recv_buf_size, 321 ucast_send_buf_size, ip_ttl, this); 322 } 323 324 List physical_addrs=ct.getConnectorAddresses(); for(Iterator it=physical_addrs.iterator(); it.hasNext();) { 327 SocketAddress address=(SocketAddress)it.next(); 328 local_addr.addPhysicalAddress(address); 329 } 330 331 if(additional_data != null) 332 local_addr.setAdditionalData(additional_data); 333 334 ct.start(); 335 336 passUp(new Event(Event.SET_LOCAL_ADDRESS, local_addr)); 337 if(use_packet_handler) 338 packet_handler.start(); 339 } 340 } 341 342 343 public void stop() { 344 if(mylog.isInfoEnabled()) mylog.info("closing sockets and stopping threads"); 345 if(packet_handler != null) 346 packet_handler.stop(); 347 if(ct != null) { 348 ct.stop(); 349 ct=null; 350 } 351 local_addr.removeAllPhysicalAddresses(); 352 } 353 354 355 367 public boolean setProperties(Properties props) { 368 String str; 369 List exclude_list=null; 370 String mcast_addr_name="230.8.8.8"; 371 int mcast_port=7500; 372 373 super.setProperties(props); 374 str=props.getProperty("bind_addrs"); 375 if(str != null) { 376 str=str.trim(); 377 if("all".equals(str.toLowerCase())) { 378 try { 379 bind_addrs=determineAllBindInterfaces(); 380 } 381 catch(SocketException e) { 382 e.printStackTrace(); 383 bind_addrs=null; 384 } 385 } 386 else { 387 bind_addrs=Util.parseCommaDelimitedStrings(str); 388 } 389 props.remove("bind_addrs"); 390 } 391 392 str=props.getProperty("bind_addrs_exclude"); 393 if(str != null) { 394 str=str.trim(); 395 exclude_list=Util.parseCommaDelimitedStrings(str); 396 props.remove("bind_addrs_exclude"); 397 } 398 399 str=props.getProperty("bind_port"); 400 if(str != null) { 401 local_bind_port=Integer.parseInt(str); 402 props.remove("bind_port"); 403 } 404 405 str=props.getProperty("start_port"); 406 if(str != null) { 407 local_bind_port=Integer.parseInt(str); 408 props.remove("start_port"); 409 } 410 411 str=props.getProperty("port_range"); 412 if(str != null) { 413 port_range=Integer.parseInt(str); 414 props.remove("port_range"); 415 } 416 417 str=props.getProperty("mcast_addr"); 418 if(str != null) { 419 mcast_addr_name=str; 420 props.remove("mcast_addr"); 421 } 422 423 str=props.getProperty("mcast_port"); 424 if(str != null) { 425 mcast_port=Integer.parseInt(str); 426 props.remove("mcast_port"); 427 } 428 429 str=props.getProperty("ip_mcast"); 430 if(str != null) { 431 ip_mcast=Boolean.valueOf(str).booleanValue(); 432 props.remove("ip_mcast"); 433 } 434 435 str=props.getProperty("ip_ttl"); 436 if(str != null) { 437 ip_ttl=Integer.parseInt(str); 438 props.remove("ip_ttl"); 439 } 440 441 str=props.getProperty("mcast_send_buf_size"); 442 if(str != null) { 443 mcast_send_buf_size=Integer.parseInt(str); 444 props.remove("mcast_send_buf_size"); 445 } 446 447 str=props.getProperty("mcast_recv_buf_size"); 448 if(str != null) { 449 mcast_recv_buf_size=Integer.parseInt(str); 450 props.remove("mcast_recv_buf_size"); 451 } 452 453 str=props.getProperty("ucast_send_buf_size"); 454 if(str != null) { 455 ucast_send_buf_size=Integer.parseInt(str); 456 props.remove("ucast_send_buf_size"); 457 } 458 459 str=props.getProperty("ucast_recv_buf_size"); 460 if(str != null) { 461 ucast_recv_buf_size=Integer.parseInt(str); 462 props.remove("ucast_recv_buf_size"); 463 } 464 465 str=props.getProperty("use_packet_handler"); 466 if(str != null) { 467 use_packet_handler=Boolean.valueOf(str).booleanValue(); 468 props.remove("use_packet_handler"); 469 } 470 471 472 mcast_addr=new InetSocketAddress(mcast_addr_name, mcast_port); 474 475 if(bind_addrs == null) 477 bind_addrs=new ArrayList(); 478 if(bind_addrs.size() == 0) { 479 try { 480 String default_bind_addr=determineDefaultBindInterface(); 481 bind_addrs.add(default_bind_addr); 482 } 483 catch(SocketException ex) { 484 if(mylog.isErrorEnabled()) mylog.error("failed determining the default bind interface: " + ex); 485 } 486 } 487 if(exclude_list != null) { 488 bind_addrs.removeAll(exclude_list); 489 } 490 if(bind_addrs.size() == 0) { 491 if(mylog.isErrorEnabled()) mylog.error("no valid bind interface found, unable to listen for network traffic"); 492 return false; 493 } 494 else { 495 496 if(mylog.isInfoEnabled()) mylog.info("bind interfaces are " + bind_addrs); 497 } 498 499 if(props.size() > 0) { 500 System.err.println("UDP1_4.setProperties(): the following properties are not recognized:"); 501 props.list(System.out); 502 return false; 503 } 504 return true; 505 } 506 507 508 512 public void startUpHandler() { 513 ; 514 } 515 516 521 public void up(Event evt) { 522 passUp(evt); 523 524 switch(evt.getType()) { 525 526 case Event.CONFIG: 527 passUp(evt); 528 if(mylog.isInfoEnabled()) mylog.info("received CONFIG event: " + evt.getArg()); 529 handleConfigEvent((HashMap)evt.getArg()); 530 return; 531 } 532 533 passUp(evt); 534 } 535 536 542 public void down(Event evt) { 543 Message msg; 544 Object dest_addr; 545 546 if(evt.getType() != Event.MSG) { handleDownEvent(evt); 548 return; 549 } 550 551 msg=(Message)evt.getArg(); 552 553 if(udp_hdr != null && udp_hdr.channel_name != null) { 554 msg.putHeader(name, udp_hdr); 556 } 557 558 dest_addr=msg.getDest(); 559 560 if(observer != null) 563 observer.passDown(evt); 564 565 if(dest_addr == null) { if(ip_mcast == false) { 567 sendMultipleUdpMessages(msg, members); 569 return; 570 } 571 } 572 573 try { 574 sendUdpMessage(msg); } 576 catch(Exception e) { 577 if(mylog.isErrorEnabled()) mylog.error("exception=" + e + ", msg=" + msg + ", mcast_addr=" + mcast_addr); 578 } 579 } 580 581 582 583 584 585 586 587 588 589 590 591 592 void handleMessage(Message msg) { 593 594 } 595 596 597 601 void handleIncomingUdpPacket(byte[] data, SocketAddress sender) { 602 ByteArrayInputStream inp_stream; 603 ObjectInputStream inp; 604 Message msg=null; 605 UdpHeader hdr=null; 606 Event evt; 607 Address dst, src; 608 609 try { 610 inp_stream=new ByteArrayInputStream(data, VERSION_LENGTH, data.length - VERSION_LENGTH); 612 inp=new ObjectInputStream(inp_stream); 613 msg=new Message(); 614 msg.readExternal(inp); 615 dst=msg.getDest(); 616 src=msg.getSrc(); 617 if(src == null) { 618 if(mylog.isErrorEnabled()) mylog.error("sender's address is null"); 619 } 620 else { 621 ((LogicalAddress1_4)src).setPrimaryPhysicalAddress(sender); 622 } 623 624 if((dst == null || dst.isMulticastAddress()) && src != null && local_addr.equals(src)) { 626 if(mylog.isTraceEnabled()) 627 mylog.trace("discarded own loopback multicast packet"); 628 629 631 return; 632 } 633 634 evt=new Event(Event.MSG, msg); 635 if(mylog.isTraceEnabled()) 636 mylog.trace("Message is " + msg + ", headers are " + msg.getHeaders()); 637 638 640 if(observer != null) 641 observer.up(evt, up_queue.size()); 642 643 hdr=(UdpHeader)msg.removeHeader(name); 644 } catch(Throwable e) { 645 if(mylog.isErrorEnabled()) mylog.error("exception=" + Util.getStackTrace(e)); 646 return; 647 } 648 649 if(hdr != null) { 650 651 652 String ch_name=null; 653 654 if(hdr.channel_name != null) 655 ch_name=hdr.channel_name; 656 657 if(ch_name != null && group_name != null && !group_name.equals(ch_name) && 660 !ch_name.equals(Util.DIAG_GROUP)) { 661 662 if(mylog.isWarnEnabled()) mylog.warn("discarded message from different group (" + 663 ch_name + "). Sender was " + msg.getSrc()); 664 return; 665 } 666 } 667 668 passUp(evt); 669 } 670 671 672 675 void sendUdpMessage(Message msg) throws Exception { 676 Address dest, src; 677 ObjectOutputStream out; 678 byte buf[]; 679 DatagramPacket packet; 680 Message copy; 681 Event evt; 683 dest=msg.getDest(); SRC=msg.getSrc(); 685 if(src == null) { 686 src=local_addr_canonical; msg.setSrc(src); 688 } 689 690 if(mylog.isTraceEnabled()) 691 mylog.trace("sending message to " + msg.getDest() + 692 " (src=" + msg.getSrc() + "), headers are " + msg.getHeaders()); 693 694 if(dest == null || dest.isMulticastAddress() || dest.equals(local_addr)) { 698 copy=msg.copy(); 699 copy.removeHeader(name); 700 evt=new Event(Event.MSG, copy); 701 702 704 if(observer != null) 705 observer.up(evt, up_queue.size()); 706 if(mylog.isTraceEnabled()) mylog.trace("looped back local message " + copy); 707 708 passUp(evt); 710 712 if(dest != null && !dest.isMulticastAddress()) 713 return; } 715 716 out_stream.reset(); 717 out_stream.write(Version.version_id, 0, Version.version_id.length); out=new ObjectOutputStream(out_stream); 719 msg.writeExternal(out); 720 out.flush(); buf=out_stream.toByteArray(); 722 packet=new DatagramPacket(buf, buf.length, mcast_addr); 723 724 727 728 ct.send(packet); 730 } 732 733 734 void sendMultipleUdpMessages(Message msg, Vector dests) { 735 Address dest; 736 737 for(int i=0; i < dests.size(); i++) { 738 dest=(Address)dests.elementAt(i); 739 msg.setDest(dest); 740 741 try { 742 sendUdpMessage(msg); 743 } 744 catch(Exception e) { 745 if(mylog.isDebugEnabled()) mylog.debug("exception=" + e); 746 } 747 } 748 } 749 750 751 752 753 754 790 791 792 793 794 void handleDownEvent(Event evt) { 795 switch(evt.getType()) { 796 797 case Event.TMP_VIEW: 798 case Event.VIEW_CHANGE: 799 synchronized(members) { 800 members.removeAllElements(); 801 Vector tmpvec=((View)evt.getArg()).getMembers(); 802 for(int i=0; i < tmpvec.size(); i++) 803 members.addElement(tmpvec.elementAt(i)); 804 } 805 break; 806 807 case Event.GET_LOCAL_ADDRESS: passUp(new Event(Event.SET_LOCAL_ADDRESS, local_addr)); 809 break; 810 811 case Event.CONNECT: 812 group_name=(String )evt.getArg(); 813 udp_hdr=new UdpHeader(group_name); 814 815 passUp(new Event(Event.CONNECT_OK)); 819 break; 820 821 case Event.DISCONNECT: 822 passUp(new Event(Event.DISCONNECT_OK)); 823 break; 824 825 case Event.CONFIG: 826 if(mylog.isInfoEnabled()) mylog.info("received CONFIG event: " + evt.getArg()); 827 handleConfigEvent((HashMap)evt.getArg()); 828 break; 829 } 830 } 831 832 833 void handleConfigEvent(HashMap map) { 834 if(map == null) return; 835 if(map.containsKey("additional_data")) 836 additional_data=(byte[])map.get("additional_data"); 837 if(map.containsKey("send_buf_size")) { 838 mcast_send_buf_size=((Integer )map.get("send_buf_size")).intValue(); 839 ucast_send_buf_size=mcast_send_buf_size; 840 } 841 if(map.containsKey("recv_buf_size")) { 842 mcast_recv_buf_size=((Integer )map.get("recv_buf_size")).intValue(); 843 ucast_recv_buf_size=mcast_recv_buf_size; 844 } 845 } 846 847 848 849 public String determineDefaultBindInterface() throws SocketException { 850 for(Enumeration en=NetworkInterface.getNetworkInterfaces(); en.hasMoreElements();) { 851 NetworkInterface ni=(NetworkInterface)en.nextElement(); 852 for(Enumeration en2=ni.getInetAddresses(); en2.hasMoreElements();) { 853 InetAddress bind_addr=(InetAddress)en2.nextElement(); 854 if(!bind_addr.isLoopbackAddress()) { 855 return bind_addr.getHostAddress(); 856 } 857 } 858 } 859 return null; 860 } 861 862 public List determineAllBindInterfaces() throws SocketException { 863 List ret=new ArrayList(); 864 for(Enumeration en=NetworkInterface.getNetworkInterfaces(); en.hasMoreElements();) { 865 NetworkInterface ni=(NetworkInterface)en.nextElement(); 866 for(Enumeration en2=ni.getInetAddresses(); en2.hasMoreElements();) { 867 InetAddress bind_addr=(InetAddress)en2.nextElement(); 868 ret.add(bind_addr.getHostAddress()); 869 } 870 } 871 872 return ret; 873 } 874 875 876 877 878 879 880 881 882 886 class PacketHandler implements Runnable { 887 Thread t=null; 888 889 public void run() { 890 byte[] data; 891 SocketAddress sender; 892 893 while(packet_queue != null && packet_handler != null) { 894 try { 895 Object [] arr=(Object [])packet_queue.remove(); 896 data=(byte[])arr[0]; 897 sender=(SocketAddress)arr[1]; 898 } catch(QueueClosedException closed_ex) { 899 if(mylog.isInfoEnabled()) mylog.info("packet_handler thread terminating"); 900 break; 901 } 902 handleIncomingUdpPacket(data, sender); 903 data=null; } 905 } 906 907 void start() { 908 if(t == null) { 909 t=new Thread (this, "UDP1_4.PacketHandler thread"); 910 t.setDaemon(true); 911 t.start(); 912 |