1 3 package org.jgroups.protocols; 4 5 import org.jgroups.*; 6 import org.jgroups.stack.IpAddress; 7 import org.jgroups.stack.Protocol; 8 import org.jgroups.util.Promise; 9 import org.jgroups.util.TimeScheduler; 10 import org.jgroups.util.Util; 11 import org.jgroups.util.Streamable; 12 13 import java.io.*; 14 import java.net.ServerSocket ; 15 import java.net.Socket ; 16 import java.net.InetAddress ; 17 import java.net.UnknownHostException ; 18 import java.util.*; 19 20 21 38 public class FD_SOCK extends Protocol implements Runnable { 39 long get_cache_timeout=3000; final long get_cache_retry_timeout=500; long suspect_msg_interval=5000; int num_tries=3; final Vector members=new Vector(11); boolean srv_sock_sent=false; final Vector pingable_mbrs=new Vector(11); final Promise get_cache_promise=new Promise(); boolean got_cache_from_coord=false; Address local_addr=null; ServerSocket srv_sock=null; InetAddress srv_sock_bind_addr=null; ServerSocketHandler srv_sock_handler=null; IpAddress srv_sock_addr=null; Address ping_dest=null; Socket ping_sock=null; InputStream ping_input=null; Thread pinger_thread=null; final Hashtable cache=new Hashtable(11); 59 61 int start_port=0; 62 final Promise ping_addr_promise=new Promise(); final Object sock_mutex=new Object (); TimeScheduler timer=null; 65 final BroadcastTask bcast_task=new BroadcastTask(); boolean regular_sock_close=false; private static final int NORMAL_TEMINATION=9; 68 private static final int ABNORMAL_TEMINATION=-1; 69 private static final String name="FD_SOCK"; 70 71 72 public String getName() { 73 return name; 74 } 75 76 77 public boolean setProperties(Properties props) { 78 String str, tmp=null; 79 80 super.setProperties(props); 81 str=props.getProperty("get_cache_timeout"); 82 if(str != null) { 83 get_cache_timeout=Long.parseLong(str); 84 props.remove("get_cache_timeout"); 85 } 86 87 str=props.getProperty("suspect_msg_interval"); 88 if(str != null) { 89 suspect_msg_interval=Long.parseLong(str); 90 props.remove("suspect_msg_interval"); 91 } 92 93 str=props.getProperty("num_tries"); 94 if(str != null) { 95 num_tries=Integer.parseInt(str); 96 props.remove("num_tries"); 97 } 98 99 str=props.getProperty("start_port"); 100 if(str != null) { 101 start_port=Integer.parseInt(str); 102 props.remove("start_port"); 103 } 104 105 106 try {tmp=System.getProperty("bind.address");} catch (SecurityException ex){} 108 if(tmp != null) 109 str=tmp; 110 else 111 str=props.getProperty("srv_sock_bind_addr"); 112 if(str != null) { 113 try { 114 srv_sock_bind_addr=InetAddress.getByName(str); 115 } 116 catch(UnknownHostException e) { 117 log.error("srv_sock_bind_addr " + str + " is invalid", e); 118 return false; 119 } 120 props.remove("srv_sock_bind_addr"); 121 } 122 123 if(props.size() > 0) { 124 System.err.println("FD_SOCK.setProperties(): the following properties are not recognized:"); 125 props.list(System.out); 126 return false; 127 } 128 return true; 129 } 130 131 132 public void init() throws Exception { 133 srv_sock_handler=new ServerSocketHandler(); 134 timer=stack != null ? stack.timer : null; 135 if(timer == null) 136 throw new Exception ("FD_SOCK.init(): timer == null"); 137 } 138 139 140 public void stop() { 141 bcast_task.removeAll(); 142 stopPingerThread(); 143 stopServerSocket(); 144 } 145 146 147 public void up(Event evt) { 148 Message msg; 149 FdHeader hdr; 150 151 switch(evt.getType()) { 152 153 case Event.SET_LOCAL_ADDRESS: 154 local_addr=(Address) evt.getArg(); 155 break; 156 157 case Event.MSG: 158 msg=(Message) evt.getArg(); 159 hdr=(FdHeader) msg.removeHeader(name); 160 if(hdr == null || !(hdr instanceof FdHeader)) 161 break; 163 switch(hdr.type) { 164 165 case FdHeader.SUSPECT: 166 if(hdr.mbrs != null) { 167 if(log.isDebugEnabled()) log.debug("[SUSPECT] hdr=" + hdr); 168 for(int i=0; i < hdr.mbrs.size(); i++) { 169 passUp(new Event(Event.SUSPECT, hdr.mbrs.elementAt(i))); 170 passDown(new Event(Event.SUSPECT, hdr.mbrs.elementAt(i))); 171 } 172 } 173 else 174 if(log.isWarnEnabled()) log.warn("[SUSPECT]: hdr.mbrs == null"); 175 break; 176 177 case FdHeader.WHO_HAS_SOCK: 179 if(local_addr != null && local_addr.equals(msg.getSrc())) 180 return; 182 if(hdr.mbr == null) { 183 if(log.isErrorEnabled()) log.error("hdr.mbr is null"); 184 return; 185 } 186 187 if(log.isTraceEnabled()) log.trace("who-has-sock " + hdr.mbr); 188 189 if(local_addr != null && local_addr.equals(hdr.mbr) && srv_sock_addr != null) { 191 sendIHaveSockMessage(msg.getSrc(), local_addr, srv_sock_addr); return; 193 } 194 195 if(cache.containsKey(hdr.mbr)) 197 sendIHaveSockMessage(msg.getSrc(), hdr.mbr, (IpAddress) cache.get(hdr.mbr)); break; 199 200 201 case FdHeader.I_HAVE_SOCK: 203 if(hdr.mbr == null || hdr.sock_addr == null) { 204 if(log.isErrorEnabled()) log.error("[I_HAVE_SOCK]: hdr.mbr is null or hdr.sock_addr == null"); 205 return; 206 } 207 208 cache.put(hdr.mbr, hdr.sock_addr); if(log.isTraceEnabled()) log.trace("i-have-sock: " + hdr.mbr + " --> " + 211 hdr.sock_addr + " (cache is " + cache + ')'); 212 213 if(ping_dest != null && hdr.mbr.equals(ping_dest)) 214 ping_addr_promise.setResult(hdr.sock_addr); 215 break; 216 217 case FdHeader.GET_CACHE: 219 if(hdr.mbr == null) { 220 if(log.isErrorEnabled()) log.error("(GET_CACHE): hdr.mbr == null"); 221 return; 222 } 223 hdr=new FdHeader(FdHeader.GET_CACHE_RSP); 224 hdr.cachedAddrs=(Hashtable) cache.clone(); 225 msg=new Message(hdr.mbr, null, null); 226 msg.putHeader(name, hdr); 227 passDown(new Event(Event.MSG, msg)); 228 break; 229 230 case FdHeader.GET_CACHE_RSP: 231 if(hdr.cachedAddrs == null) { 232 if(log.isErrorEnabled()) log.error("(GET_CACHE_RSP): cache is null"); 233 return; 234 } 235 get_cache_promise.setResult(hdr.cachedAddrs); 236 break; 237 } 238 return; 239 } 240 241 passUp(evt); } 243 244 245 public void down(Event evt) { 246 Address mbr, tmp_ping_dest; 247 View v; 248 249 switch(evt.getType()) { 250 251 case Event.UNSUSPECT: 252 bcast_task.removeSuspectedMember((Address)evt.getArg()); 253 break; 254 255 case Event.CONNECT: 256 passDown(evt); 257 srv_sock=Util.createServerSocket(srv_sock_bind_addr, start_port); srv_sock_addr=new IpAddress(srv_sock_bind_addr, srv_sock.getLocalPort()); 259 startServerSocket(); 260 break; 263 264 case Event.VIEW_CHANGE: 265 synchronized(this) { 266 v=(View) evt.getArg(); 267 members.removeAllElements(); 268 members.addAll(v.getMembers()); 269 bcast_task.adjustSuspectedMembers(members); 270 pingable_mbrs.removeAllElements(); 271 pingable_mbrs.addAll(members); 272 passDown(evt); 273 274 if(log.isDebugEnabled()) log.debug("VIEW_CHANGE received: " + members); 275 276 if(!got_cache_from_coord) { 278 getCacheFromCoordinator(); 279 got_cache_from_coord=true; 280 } 281 282 283 if(!srv_sock_sent) { 285 if(srv_sock_addr != null) { 286 sendIHaveSockMessage(null, local_addr, 288 srv_sock_addr); 289 srv_sock_sent=true; 290 } 291 else 292 if(log.isWarnEnabled()) log.warn("(VIEW_CHANGE): srv_sock_addr == null"); 293 } 294 295 for(Enumeration e=cache.keys(); e.hasMoreElements();) { 297 mbr=(Address) e.nextElement(); 298 if(!members.contains(mbr)) 299 cache.remove(mbr); 300 } 301 302 if(members.size() > 1) { 303 if(pinger_thread != null && pinger_thread.isAlive()) { 304 tmp_ping_dest=determinePingDest(); 305 if(ping_dest != null && tmp_ping_dest != null && !ping_dest.equals(tmp_ping_dest)) { 306 interruptPingerThread(); } 308 } 309 else 310 startPingerThread(); } 312 else { 313 ping_dest=null; 314 stopPingerThread(); 315 } 316 } 317 break; 318 319 default: 320 passDown(evt); 321 break; 322 } 323 } 324 325 326 333 public void run() { 334 Address tmp_ping_dest; 335 IpAddress ping_addr; 336 int max_fetch_tries=10; 338 if(log.isTraceEnabled()) log.trace("pinger_thread started"); 340 while(pinger_thread != null) { 341 tmp_ping_dest=determinePingDest(); if(log.isDebugEnabled()) 343 log.debug("determinePingDest()=" + tmp_ping_dest + ", pingable_mbrs=" + pingable_mbrs); 344 if(tmp_ping_dest == null) { 345 ping_dest=null; 346 pinger_thread=null; 347 break; 348 } 349 ping_dest=tmp_ping_dest; 350 ping_addr=fetchPingAddress(ping_dest); 351 if(ping_addr == null) { 352 if(log.isErrorEnabled()) log.error("socket address for " + ping_dest + " could not be fetched, retrying"); 353 if(--max_fetch_tries <= 0) 354 break; 355 Util.sleep(2000); 356 continue; 357 } 358 359 if(!setupPingSocket(ping_addr)) { 360 if(log.isDebugEnabled()) log.debug("could not create socket to " + ping_dest + "; suspecting " + ping_dest); 362 broadcastSuspectMessage(ping_dest); 363 pingable_mbrs.removeElement(ping_dest); 364 continue; 365 } 366 367 if(log.isDebugEnabled()) log.debug("ping_dest=" + ping_dest + ", ping_sock=" + ping_sock + ", cache=" + cache); 368 369 try { 371 if(ping_input != null) { 372 int c=ping_input.read(); 373 switch(c) { 374 case NORMAL_TEMINATION: 375 if(log.isDebugEnabled()) 376 log.debug("peer closed socket normally"); 377 pinger_thread=null; 378 break; 379 case ABNORMAL_TEMINATION: 380 handleSocketClose(null); 381 break; 382 default: 383 break; 384 } 385 } 386 } 387 catch(IOException ex) { handleSocketClose(ex); 389 } 390 catch(Throwable catch_all_the_rest) { 391 log.error("exception", catch_all_the_rest); 392 } 393 } 394 if(log.isDebugEnabled()) log.debug("pinger thread terminated"); 395 pinger_thread=null; 396 } 397 398 399 400 401 402 403 404 void handleSocketClose(Exception ex) { 405 teardownPingSocket(); if(!regular_sock_close) { if(log.isDebugEnabled()) 408 log.debug("peer " + ping_dest + " closed socket (" + (ex != null ? ex.getClass().getName() : "eof") + ')'); 409 broadcastSuspectMessage(ping_dest); 410 pingable_mbrs.removeElement(ping_dest); 411 } 412 else { 413 if(log.isDebugEnabled()) log.debug("socket to " + ping_dest + " was reset"); 414 regular_sock_close=false; 415 } 416 } 417 418 419 void startPingerThread() { 420 if(pinger_thread == null) { 421 pinger_thread=new Thread (this, "FD_SOCK Ping thread"); 422 pinger_thread.setDaemon(true); 423 pinger_thread.start(); 424 } 425 } 426 427 428 void stopPingerThread() { 429 if(pinger_thread != null && pinger_thread.isAlive()) { 430 regular_sock_close=true; 431 teardownPingSocket(); 432 } 433 pinger_thread=null; 434 } 435 436 437 445 void interruptPingerThread() { 446 if(pinger_thread != null && pinger_thread.isAlive()) { 447 regular_sock_close=true; 448 teardownPingSocket(); } 450 } 451 452 void startServerSocket() { 453 if(srv_sock_handler != null) 454 srv_sock_handler.start(); } 456 457 void stopServerSocket() { 458 if(srv_sock_handler != null) 459 srv_sock_handler.stop(); 460 } 461 462 463 466 boolean setupPingSocket(IpAddress dest) { 467 synchronized(sock_mutex) { 468 if(dest == null) { 469 if(log.isErrorEnabled()) log.error("destination address is null"); 470 return false; 471 } 472 try { 473 ping_sock=new Socket (dest.getIpAddress(), dest.getPort()); 474 ping_sock.setSoLinger(true, 1); 475 ping_input=ping_sock.getInputStream(); 476 return true; 477 } 478 catch(Throwable ex) { 479 return false; 480 } 481 } 482 } 483 484 485 void teardownPingSocket() { 486 synchronized(sock_mutex) { 487 if(ping_sock != null) { 488 try { 489 ping_sock.shutdownInput(); 490 ping_sock.close(); 491 } 492 catch(Exception ex) { 493 } 494 ping_sock=null; 495 } 496 if(ping_input != null) { 497 try { 498 ping_input.close(); 499 } 500 catch(Exception ex) { 501 } 502 ping_input=null; 503 } 504 } 505 } 506 507 508 512 void getCacheFromCoordinator() { 513 Address coord; 514 int attempts=num_tries; 515 Message msg; 516 FdHeader hdr; 517 Hashtable result; 518 519 get_cache_promise.reset(); 520 while(attempts > 0) { 521 if((coord=determineCoordinator()) != null) { 522 if(coord.equals(local_addr)) { if(log.isDebugEnabled()) log.debug("first member; cache is empty"); 524 return; 525 } 526 hdr=new FdHeader(FdHeader.GET_CACHE); 527 hdr.mbr=local_addr; 528 msg=new Message(coord, null, null); 529 msg.putHeader(name, hdr); 530 passDown(new Event(Event.MSG, msg)); 531 result=(Hashtable) get_cache_promise.getResult(get_cache_timeout); 532 if(result != null) { 533 cache.putAll(result); if(log.isTraceEnabled()) log.trace("got cache from " + coord + ": cache is " + cache); 535 return; 536 } 537 else { 538 if(log.isErrorEnabled()) log.error("received null cache; retrying"); 539 } 540 } 541 542 Util.sleep(get_cache_retry_timeout); 543 --attempts; 544 } 545 } 546 547 548 557 void broadcastSuspectMessage(Address suspected_mbr) { 558 Message suspect_msg; 559 FdHeader hdr; 560 561 if(suspected_mbr == null) return; 562 563 if(log.isDebugEnabled()) log.debug("suspecting " + suspected_mbr + 564 " (own address is " + local_addr + ')'); 565 566 hdr=new FdHeader(FdHeader.SUSPECT); 568 hdr.mbrs=new Vector(1); 569 hdr.mbrs.addElement(suspected_mbr); 570 suspect_msg=new Message(); 571 suspect_msg.putHeader(name, hdr); 572 passDown(new Event(Event.MSG, suspect_msg)); 573 574 bcast_task.addSuspectedMember(suspected_mbr); 577 } 578 579 580 void broadcastWhoHasSockMessage(Address mbr) { 581 Message msg; 582 FdHeader hdr; 583 584 if(local_addr != null && mbr != null) 585 if(log.isDebugEnabled()) log.debug("[" + local_addr + "]: who-has " + mbr); 586 587 msg=new Message(); hdr=new FdHeader(FdHeader.WHO_HAS_SOCK); 589 hdr.mbr=mbr; 590 msg.putHeader(name, hdr); 591 passDown(new Event(Event.MSG, msg)); 592 } 593 594 595 599 void sendIHaveSockMessage(Address dst, Address mbr, IpAddress addr) { 600 Message msg=new Message(dst, null, null); 601 FdHeader hdr=new FdHeader(FdHeader.I_HAVE_SOCK); 602 hdr.mbr=mbr; 603 hdr.sock_addr=addr; 604 msg.putHeader(name, hdr); 605 606 if(log.isTraceEnabled()) log.trace("hdr=" + hdr); 608 609 passDown(new Event(Event.MSG, msg)); 610 } 611 612 613 617 IpAddress fetchPingAddress(Address mbr) { 618 IpAddress ret=null; 619 Message ping_addr_req; 620 FdHeader hdr; 621 622 if(mbr == null) { 623 if(log.isErrorEnabled()) log.error("mbr == null"); 624 return null; 625 } 626 ret=(IpAddress)cache.get(mbr); 629 if(ret != null) { 630 return ret; 631 } 632 633 Util.sleep(300); 634 if((ret=(IpAddress)cache.get(mbr)) != null) 635 return ret; 636 637 638 ping_addr_promise.reset(); 640 ping_addr_req=new Message(mbr, null, null); hdr=new FdHeader(FdHeader.WHO_HAS_SOCK); 642 hdr.mbr=mbr; 643 ping_addr_req.putHeader(name, hdr); 644 passDown(new Event(Event.MSG, ping_addr_req)); 645 ret=(IpAddress) ping_addr_promise.getResult(3000); 646 if(ret != null) { 647 return ret; 648 } 649 650 651 ping_addr_req=new Message(null, null, null); hdr=new FdHeader(FdHeader.WHO_HAS_SOCK); 654 hdr.mbr=mbr; 655 ping_addr_req.putHeader(name, hdr); 656 passDown(new Event(Event.MSG, ping_addr_req)); 657 ret=(IpAddress) ping_addr_promise.getResult(3000); 658 return ret; 659 } 660 661 662 Address determinePingDest() { 663 Address tmp; 664 665 if(pingable_mbrs == null || pingable_mbrs.size() < 2 || local_addr == null) 666 return null; 667 for(int i=0; i < pingable_mbrs.size(); i++) { 668 tmp=(Address) pingable_mbrs.elementAt(i); 669 if(local_addr.equals(tmp)) { 670 if(i + 1 >= pingable_mbrs.size()) 671 return (Address) pingable_mbrs.elementAt(0); 672 else 673 return (Address) pingable_mbrs.elementAt(i + 1); 674 } 675 } 676 return null; 677 } 678 679 680 Address determineCoordinator() { 681 return members.size() > 0 ? (Address) members.elementAt(0) : null; 682 } 683 684 685 686 687 688 689 690 691 public static class FdHeader extends Header implements Streamable { 692 static final byte SUSPECT=10; 693 static final byte WHO_HAS_SOCK=11; 694 static final byte I_HAVE_SOCK=12; 695 static final byte GET_CACHE=13; static final byte GET_CACHE_RSP=14; 698 699 byte type=SUSPECT; 700 Address mbr=null; IpAddress sock_addr; 703 Hashtable cachedAddrs=null; Vector mbrs=null; 707 708 public FdHeader() { 709 } 711 public FdHeader(byte type) { 712 this.type=type; 713 } 714 715 716 public String toString() { 717 StringBuffer sb=new StringBuffer (); 718 sb.append(type2String(type)); 719 if(mbr != null) 720 sb.append(", mbr=" + mbr); 721 if(sock_addr != null) 722 sb.append(", sock_addr=" + sock_addr); 723 if(cachedAddrs != null) 724 sb.append(", cache=" + cachedAddrs); 725 if(mbrs != null) 726 sb.append(", mbrs=" + mbrs); 727 return sb.toString(); 728 } 729 730 731 public static String type2String(byte type) { 732 switch(type) { 733 case SUSPECT: 734 return "SUSPECT"; 735 case WHO_HAS_SOCK: 736 return "WHO_HAS_SOCK"; 737 case I_HAVE_SOCK: 738 return "I_HAVE_SOCK"; 739 case GET_CACHE: 740 return "GET_CACHE"; 741 case GET_CACHE_RSP: 742 return "GET_CACHE_RSP"; 743 default: 744 return "unknown type (" + type + ')'; 745 } 746 } 747 748 public void writeExternal(ObjectOutput out) throws IOException { 749 out.writeByte(type); 750 out.writeObject(mbr); 751 out.writeObject(sock_addr); 752 out.writeObject(cachedAddrs); 753 out.writeObject(mbrs); 754 } 755 756 757 public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { 758 type=in.readByte(); 759 mbr=(Address) in.readObject(); 760 sock_addr=(IpAddress) in.readObject(); 761 cachedAddrs=(Hashtable) in.readObject(); 762 mbrs=(Vector) in.readObject(); 763 } 764 765 public void writeTo(DataOutputStream out) throws IOException { 766 int size; 767 out.writeByte(type); 768 Util.writeAddress(mbr, out); 769 Util.writeStreamable(sock_addr, out); 770 size=cachedAddrs != null? cachedAddrs.size() : 0; 771 out.writeInt(size); 772 if(size > 0) { 773 for(Iterator it=cachedAddrs.entrySet().iterator(); it.hasNext();) { 774 Map.Entry entry=(Map.Entry)it.next(); 775 Address key=(Address)entry.getKey(); 776 IpAddress val=(IpAddress)entry.getValue(); 777 Util.writeAddress(key, out); 778 Util.writeStreamable(val, out); 779 } 780 } 781 size=mbrs != null? mbrs.size() : 0; 782 out.writeInt(size); 783 if(size > 0) { 784 for(Iterator it=mbrs.iterator(); it.hasNext();) { 785 Address address=(Address)it.next(); 786 Util.writeAddress(address, out); 787 } 788 } 789 } 790 791 public void readFrom(DataInputStream in) throws IOException, IllegalAccessException , InstantiationException { 792 int size; 793 type=in.readByte(); 794 mbr=Util.readAddress(in); 795 sock_addr=(IpAddress)Util.readStreamable(IpAddress.class, in); 796 size=in.readInt(); 797 if(size > 0) { 798 if(cachedAddrs == null) 799 cachedAddrs=new Hashtable(); 800 for(int i=0; i < size; i++) { 801 Address key=Util.readAddress(in); 802 IpAddress val=(IpAddress)Util.readStreamable(IpAddress.class, in); 803 cachedAddrs.put(key, val); 804 } 805 } 806 size=in.readInt(); 807 if(size > 0) { 808 if(mbrs == null) 809 mbrs=new Vector(); 810 for(int i=0; i < size; i++) { 811 Address addr=Util.readAddress(in); 812 mbrs.add(addr); 813 } 814 } 815 } 816 817 } 818 819 820 827 private class ServerSocketHandler implements Runnable { 828 Thread acceptor=null; 829 830 final List clients=new ArrayList(); 831 832 833 834 ServerSocketHandler() { 835 start(); 836 } 837 838 void start() { 839 if(acceptor == null) { 840 acceptor=new Thread (this, "ServerSocket acceptor thread"); 841 acceptor.setDaemon(true); 842 acceptor.start(); 843 } 844 } 845 846 847 void stop() { 848 if(acceptor != null && acceptor.isAlive()) { 849 try { 850 srv_sock.close(); } 852 catch(Exception ex) { 853 } 854 } 855 synchronized(clients) { 856 for(Iterator it=clients.iterator(); it.hasNext();) { 857 ClientConnectionHandler handler=(ClientConnectionHandler)it.next(); 858 handler.stopThread(); 859 } 860 clients.clear(); 861 } 862 acceptor=null; 863 } 864 865 866 867 public void run() { 868 Socket client_sock=null; 869 while(acceptor != null && srv_sock != null) { 870 try { 871 if(log.isTraceEnabled()) log.trace("waiting for client connections on " + srv_sock.getInetAddress() + ":" + 873 srv_sock.getLocalPort()); 874 client_sock=srv_sock.accept(); 875 if(log.isTraceEnabled()) log.trace("accepted connection from " + client_sock.getInetAddress() + ':' + client_sock.getPort()); 877 ClientConnectionHandler client_conn_handler=new ClientConnectionHandler(client_sock, clients); 878 synchronized(clients) { 879 clients.add(client_conn_handler); 880 } 881 client_conn_handler.start(); 882 } 883 catch(IOException io_ex2) { 884 break; 885 } 886 } 887 acceptor=null; 888 } 889 } 890 891 892 893 894 private static class ClientConnectionHandler extends Thread { 895 Socket client_sock=null; 896 InputStream in; 897 final Object mutex=new Object (); 898 List clients=null; 899 900 ClientConnectionHandler(Socket client_sock, List clients) { 901 setName("ClientConnectionHandler"); 902 setDaemon(true); 903 this.client_sock=client_sock; 904 this.clients=clients; 905 } 906 907 void stopThread() { 908 synchronized(mutex) { 909 if(client_sock != null) { 910 try { 911 OutputStream out=client_sock.getOutputStream(); 912 out.write(NORMAL_TEMINATION); 913 } 914 catch(Throwable t) { 915 } 916 } 917 } 918 closeClientSocket(); 919 } 920 921 void closeClientSocket() { 922 synchronized(mutex) { 923 if(client_sock != null) { 924 try { 925 client_sock.close(); 926 } 927 catch(Exception ex) { 928 } 929 client_sock=null; 930 } 931 } 932 } 933 934 public void run() { 935 try { 936 synchronized(mutex) { 937 if(client_sock == null) 938 return; 939 in=client_sock.getInputStream(); 940 } 941 while((in.read()) != -1) { 942 } 943 } 944 catch(IOException io_ex1) { 945 } 946 finally { 947 closeClientSocket(); 948 synchronized(clients) { 949 clients.remove(this); 950 } 951 } 952 } 953 } 954 955 956 962 private class BroadcastTask implements TimeScheduler.Task { 963 final Vector suspected_mbrs=new Vector(7); 964 boolean stopped=false; 965 966 967 968 public void addSuspectedMember(Address mbr) { 969 if(mbr == null) return; 970 if(!members.contains(mbr)) return; 971 synchronized(suspected_mbrs) { 972 if(!suspected_mbrs.contains(mbr)) { 973 suspected_mbrs.addElement(mbr); 974 if(log.isDebugEnabled()) log.debug("mbr=" + mbr + " (size=" + suspected_mbrs.size() + ')'); 975 } 976 if(stopped && suspected_mbrs.size() > 0) { 977 stopped=false; 978 timer.add(this, true); 979 } 980 } 981 } 982 983 984 public void removeSuspectedMember(Address suspected_mbr) { 985 if(suspected_mbr == null) return; 986 if(log.isDebugEnabled()) log.debug("member is " + suspected_mbr); 987 synchronized(suspected_mbrs) { 988 suspected_mbrs.removeElement(suspected_mbr); 989 if(suspected_mbrs.size() == 0) 990 stopped=true; 991 } 992 } 993 994 995 public void removeAll() { 996 synchronized(suspected_mbrs) { 997 suspected_mbrs.removeAllElements(); 998 stopped=true; 999 } 1000 } 1001 1002 1003 1006 public void adjustSuspectedMembers(Vector new_mbrship) { 1007 Address suspected_mbr; 1008 1009 if(new_mbrship == null || new_mbrship.size() == 0) return; 1010 synchronized(suspected_mbrs) { 1011 for(Iterator it=suspected_mbrs.iterator(); it.hasNext();) { 1012 suspected_mbr=(Address) it.next(); 1013 if(!new_mbrship.contains(suspected_mbr)) { 1014 it.remove(); 1015 if(log.isDebugEnabled()) 1016 log.debug("removed " + suspected_mbr + " (size=" + suspected_mbrs.size() + ')'); 1017 } 1018 } 1019 if(suspected_mbrs.size() == 0) 1020 stopped=true; 1021 } 1022 } 1023 1024 1025 public boolean cancelled() { 1026 return stopped; 1027 } 1028 1029 1030 public long nextInterval() { 1031 return suspect_msg_interval; 1032 } 1033 1034 1035 public void run() { 1036 Message suspect_msg; 1037 FdHeader hdr; 1038 1039 if(log.isDebugEnabled()) 1040 log.debug("broadcasting SUSPECT message (suspected_mbrs=" + suspected_mbrs + ") to group"); 1041 1042 synchronized(suspected_mbrs) { 1043 if(suspected_mbrs.size() == 0) { 1044 stopped=true; 1045 if(log.isDebugEnabled()) log.debug("task done (no suspected members)"); 1046 return; 1047 } 1048 1049 hdr=new FdHeader(FdHeader.SUSPECT); 1050 hdr.mbrs=(Vector) suspected_mbrs.clone(); 1051 } 1052 suspect_msg=new Message(); suspect_msg.putHeader(name, hdr); 1054 passDown(new Event(Event.MSG, suspect_msg)); 1055 if(log.isDebugEnabled()) log.debug("task done"); 1056 } 1057 } 1058 1059 1060} 1061 | Popular Tags |