1 3 package org.jgroups.stack; 4 5 import org.apache.commons.logging.Log; 6 import org.apache.commons.logging.LogFactory; 7 import org.jgroups.Address; 8 import org.jgroups.util.Promise; 9 import org.jgroups.util.Util; 10 11 import java.io.*; 12 import java.net.InetAddress ; 13 import java.net.ServerSocket ; 14 import java.net.Socket ; 15 import java.util.*; 16 17 44 public class GossipRouter { 45 46 public static final int GET = -10; 47 public static final int REGISTER = -11; 48 public static final int DUMP = -21; 49 public static final int SHUTDOWN = -1; 50 public static final int SHUTDOWN_OK = -2; 51 52 public static final int PORT = 8980; 53 public static final long EXPIRY_TIME = 30000; 54 public static final long GOSSIP_REQUEST_TIMEOUT = 1000; 55 public static final long ROUTING_CLIENT_REPLY_TIMEOUT = 120000; 56 57 private final int MARK_BUFFER_SIZE = 2048; 59 60 private static final Object GOSSIP_REQUEST = new Object (); 61 private static final Object GOSSIP_FAILURE = new Object (); 62 63 private int port; 64 private String bindAddressString; 65 66 private long expiryTime; 68 69 private long gossipRequestTimeout; 74 75 private long routingClientReplyTimeout; 79 80 private final Hashtable routingTable=new Hashtable(); 82 83 private final Map gossipTable = new HashMap(); 85 86 private ServerSocket srvSock = null; 87 private InetAddress bindAddress = null; 88 89 Timer timer = null; 91 92 protected final Log log=LogFactory.getLog(this.getClass()); 93 94 98 public GossipRouter() { 99 this(PORT); 100 } 101 102 public GossipRouter(int port) { 103 this(port, null); 104 } 105 106 public GossipRouter(int port, String bindAddressString) { 107 this(port, bindAddressString, EXPIRY_TIME); 108 } 109 110 public GossipRouter(int port, String bindAddressString, 111 long expiryTime) { 112 this(port, bindAddressString, expiryTime, 113 GOSSIP_REQUEST_TIMEOUT, 114 ROUTING_CLIENT_REPLY_TIMEOUT); 115 } 116 117 public GossipRouter(int port, String bindAddressString, 118 long expiryTime, long gossipRequestTimeout, 119 long routingClientReplyTimeout) { 120 this.port=port; 121 this.bindAddressString=bindAddressString; 122 this.expiryTime = expiryTime; 123 this.gossipRequestTimeout = gossipRequestTimeout; 124 this.routingClientReplyTimeout = routingClientReplyTimeout; 125 } 126 127 128 132 public void setPort(int port) { 133 this.port = port; 134 } 135 136 public int getPort() { 137 return port; 138 } 139 140 public void setBindAddress(String bindAddress) { 141 bindAddressString = bindAddress; 142 } 143 144 public String getBindAddress() { 145 return bindAddressString; 146 } 147 148 public void setExpiryTime(long expiryTime) { 149 this.expiryTime = expiryTime; 150 } 151 152 public long getExpiryTime() { 153 return expiryTime; 154 } 155 156 public void setGossipRequestTimeout(long gossipRequestTimeout) { 157 this.gossipRequestTimeout = gossipRequestTimeout; 158 } 159 160 public long getGossipRequestTimeout() { 161 return gossipRequestTimeout; 162 } 163 164 public void setRoutingClientReplyTimeout(long routingClientReplyTimeout) { 165 this.routingClientReplyTimeout = routingClientReplyTimeout; 166 } 167 168 public long getRoutingClientReplyTimeout() { 169 return routingClientReplyTimeout; 170 } 171 172 public boolean isStarted() { 173 return srvSock!=null; 174 } 175 176 180 181 184 public void create() throws Exception { 185 } 187 188 193 public void start() throws Exception { 194 195 if (srvSock!=null) { 196 throw new Exception ("Router already started."); 197 } 198 199 if (bindAddressString!=null) { 200 bindAddress = InetAddress.getByName(bindAddressString); 201 srvSock = new ServerSocket (port, 50, bindAddress); 202 } 203 else { 204 srvSock = new ServerSocket (port, 50); 205 } 206 207 new Thread (new Runnable () { 209 public void run() { 210 mainLoop(); 211 cleanup(); 212 } 213 }, "JGroups Router Main Thread").start(); 214 215 timer = new Timer(true); 218 timer.schedule(new TimerTask() { 219 public void run() { 220 sweep(); 221 } 222 }, expiryTime, expiryTime); 223 } 224 225 229 public void stop() { 230 231 if (srvSock==null) { 232 if(log.isWarnEnabled()) log.warn("Router already stopped"); 233 return; 234 } 235 236 timer.cancel(); 237 shutdown(); 238 try { 239 srvSock.close(); 240 } 241 catch(Exception e) { 242 if(log.isErrorEnabled()) log.error("Failed to close server socket: "+e); 243 } 244 srvSock = null; 246 if(log.isInfoEnabled()) log.info("Router stopped"); 247 } 248 249 252 public void destroy() { 253 } 255 256 257 261 public String dumpRoutingTable() { 262 return dumpTable(routingTable); 263 } 264 265 public String dumpGossipTable() { 266 return dumpTable(gossipTable); 267 } 268 269 270 271 275 public static String requestTypeToString(int type) { 276 return 277 type == GET ? "GET" : 278 (type == REGISTER ? "REGISTER" : 279 (type == DUMP ? "DUMP" : 280 (type == SHUTDOWN ? "SHUTDOWN" : "UNKNOWN REQUEST: "+type))); 281 } 282 283 284 287 private void mainLoop() { 288 Socket sock = null; 289 DataInputStream input = null; 290 DataOutputStream output = null; 291 Address peer_addr = null; 292 byte[] buf; 293 int len, type = -1; 294 String gname = null; 295 Date d; 296 boolean up = true; 297 298 if(bindAddress == null) { 299 bindAddress=srvSock.getInetAddress(); 300 } 301 d=new Date(); 302 System.out.println("GossipRouter started at " + d + 303 "\nListening on port " + port + " bound on address " + bindAddress + '\n'); 304 d=null; 305 306 while(up) { 307 308 try { 309 sock=srvSock.accept(); 310 sock.setSoLinger(true, 500); 311 312 if(log.isTraceEnabled()) { 313 log.trace("router accepted connection from "+sock); 314 } 315 316 final BufferedInputStream bis = new BufferedInputStream(sock.getInputStream()); 317 final Promise waitArea = new Promise(); 318 final Socket s = sock; 319 320 326 Thread t = new Thread (new Runnable () { 327 public void run() { 328 ObjectInputStream ois = null; 329 try { 330 bis.mark(MARK_BUFFER_SIZE); 331 ois = new ObjectInputStream(bis); 333 GossipData gossip_req = (GossipData)ois.readObject(); 334 335 waitArea.setResult(GOSSIP_REQUEST); 337 338 GossipData gresp = processGossip(gossip_req); 339 if (gresp != null) { 340 ObjectOutputStream oos = new ObjectOutputStream(s.getOutputStream()); 341 oos.writeObject(gresp); 342 oos.close(); 343 } 344 bis.close(); 345 s.close(); 346 } 347 catch(Exception e) { 348 if(log.isDebugEnabled()) log.debug("gossip thread exception :"+e); 349 waitArea.setResult(GOSSIP_FAILURE); 350 } 351 finally { 352 try { 353 ois.close(); 354 } 355 catch(Exception e) { 356 } 358 } 359 } 360 }, "Gossip Request Thread"); 361 362 t.start(); 363 364 Object waitResult = waitArea.getResult(gossipRequestTimeout); 365 waitArea.reset(); 366 367 if (waitResult != null) { 368 continue; 370 } 371 372 374 peer_addr = new IpAddress(sock.getInetAddress(), sock.getPort()); 375 output = new DataOutputStream(sock.getOutputStream()); 376 377 buf = Util.objectToByteBuffer(peer_addr); 379 output.writeInt(buf.length); 380 output.write(buf, 0, buf.length); 381 382 waitResult = waitArea.getResult(routingClientReplyTimeout); 386 387 if (waitResult == null) { 388 throw new Exception ("Timeout waiting for router client answer"); 390 } 391 else if (waitResult == GOSSIP_REQUEST) { 392 output.close(); 395 continue; 396 } 397 398 bis.reset(); 399 input=new DataInputStream(bis); 400 401 type=input.readInt(); 402 if(log.isTraceEnabled()) { 403 log.trace("request of type "+requestTypeToString(type)); 404 } 405 406 gname=input.readUTF(); 407 408 409 414 switch(type) { 415 case GossipRouter.GET: 416 processGetRequest(sock, output, gname); break; 418 case GossipRouter.DUMP: 419 processDumpRequest(sock, output); break; 421 case GossipRouter.REGISTER: 422 Address addr; 423 len=input.readInt(); 424 buf=new byte[len]; 425 input.readFully(buf, 0, buf.length); addr=(Address)Util.objectFromByteBuffer(buf); 427 SocketThread st = new SocketThread(sock, input, addr); 428 addEntry(gname, new AddressEntry(addr, sock, st, output)); 429 st.start(); 430 break; 431 case GossipRouter.SHUTDOWN: 432 if(log.isInfoEnabled()) log.info("router shutting down"); 433 output.writeInt(SHUTDOWN_OK); 434 output.flush(); 435 try { 436 sock.close(); 437 } 438 catch(Exception e) { 439 } 441 up = false; 442 continue; 443 default: 444 if(log.isErrorEnabled()) log.error("request of type " + type + " not recognized"); 445 continue; 446 } 447 } 448 catch(Exception e) { 449 if(log.isErrorEnabled()) log.error("failure handling a client connection: " + e.getMessage(), e); 450 try { 451 sock.close(); 452 } 453 catch(IOException e2) { 454 if(log.isWarnEnabled()) log.warn("failed to close socket "+sock); 455 } 456 continue; 457 } 458 } 459 } 460 461 462 465 private void cleanup() { 466 467 synchronized(routingTable) { 469 for(Iterator i=routingTable.keySet().iterator(); i.hasNext();) { 470 String gname=(String )i.next(); 471 List l=(List)routingTable.get(gname); 472 if (l!=null) { 473 for(Iterator j=l.iterator(); j.hasNext(); ) { 474 AddressEntry e = (AddressEntry)j.next(); 475 e.destroy(); 476 } 477 } 478 } 479 routingTable.clear(); 480 if(log.isInfoEnabled()) log.info("routing table cleared"); 481 } 482 synchronized(gossipTable) { 483 gossipTable.clear(); 484 if(log.isInfoEnabled()) log.info("gossip table cleared"); 485 } 486 487 } 488 489 492 private void shutdown() { 493 try { 494 Socket s = new Socket (srvSock.getInetAddress(), 495 srvSock.getLocalPort()); 496 DataInputStream dis = new DataInputStream(s.getInputStream()); 497 int len = dis.readInt(); 498 byte[] buf = new byte[len]; 499 dis.readFully(buf, 0, buf.length); 500 DataOutputStream dos = new DataOutputStream(s.getOutputStream()); 501 dos.writeInt(SHUTDOWN); 502 dos.writeUTF(""); 503 dis.readInt(); 505 dos.flush(); 506 dos.close(); 507 s.close(); 508 } 509 catch(Exception e) { 510 if(log.isErrorEnabled()) log.error("shutdown failed: "+e); 511 } 512 513 } 514 515 519 522 private GossipData processGossip(GossipData gossip) { 523 524 if(log.isTraceEnabled()) log.trace("gossip is "+gossip); 525 526 if (gossip==null) { 527 if(log.isWarnEnabled()) log.warn("null gossip request"); 528 return null; 529 } 530 531 String group = gossip.getGroup(); 532 Address mbr = null; 533 534 synchronized(gossipTable) { 535 536 switch(gossip.getType()) { 537 538 case GossipData.REGISTER_REQ: 539 mbr=gossip.getMbr(); 540 if(group == null || mbr == null) { 541 if(log.isErrorEnabled()) log.error("group or member is null, cannot register member"); 542 return null; 543 } 544 addGossipEntry(group, new AddressEntry(mbr)); 545 return null; 546 547 case GossipData.GET_REQ: 548 if(group == null) { 549 if(log.isErrorEnabled()) log.error("group is null, cannot get membership"); 550 return null; 551 } 552 Vector mbrs = null; 553 List l = (List)gossipTable.get(group); 554 if (l != null) { 555 mbrs = new Vector(); 556 for(Iterator i = l.iterator(); i.hasNext(); ) { 557 AddressEntry e = (AddressEntry)i.next(); 558 mbrs.add(e.addr); 559 } 560 } 561 return new GossipData(GossipData.GET_RSP, group, null, mbrs); 562 563 case GossipData.GET_RSP: 564 if(log.isWarnEnabled()) log.warn("received a GET_RSP. Should not be received by server"); 565 return null; 566 567 default: 568 if(log.isWarnEnabled()) log.warn("received unkown gossip request (gossip=" + gossip + ')'); 569 return null; 570 } 571 } 572 } 573 574 575 581 private void addGossipEntry(String groupname, AddressEntry e) { 582 583 List val; 584 585 if(groupname == null) { 586 if(log.isErrorEnabled()) log.error("groupname was null, not added !"); 587 return; 588 } 589 590 synchronized(gossipTable) { 591 592 val=(List)gossipTable.get(groupname); 593 if(val == null) { 594 val=Collections.synchronizedList(new ArrayList()); 595 gossipTable.put(groupname, val); 596 } 597 int index = val.indexOf(e); 598 if (index==-1) { 599 val.add(e); 600 return; 601 } 602 ((AddressEntry)val.get(index)).update(); 603 } 604 } 605 606 607 611 private void sweep() { 612 613 long diff, currentTime=System.currentTimeMillis(); 614 int num_entries_removed=0; 615 String key=null; 616 List val; 617 618 if(log.isTraceEnabled()) log.trace("running sweep"); 619 620 synchronized(gossipTable) { 621 for(Iterator i=gossipTable.keySet().iterator(); i.hasNext();) { 622 key=(String )i.next(); 623 val=(List)gossipTable.get(key); 624 if(val != null) { 625 for(Iterator j=val.iterator(); j.hasNext();) { 626 AddressEntry ae = (AddressEntry)j.next(); 627 diff=currentTime - ae.timestamp; 628 if(diff > expiryTime) { 629 j.remove(); 630 if(log.isTraceEnabled()) 631 log.trace("Removed member " + ae + " from group " + key + '(' + diff + " msecs old)"); 632 num_entries_removed++; 633 } 634 } 635 } 636 } 637 } 638 639 if(num_entries_removed > 0) { 640 if(log.isTraceEnabled()) log.trace("done (removed " + num_entries_removed + " entries)"); 641 } 642 } 643 644 648 651 private void processGetRequest(Socket sock, DataOutputStream output, String groupname) { 652 653 List grpmbrs=(List)routingTable.get(groupname); 654 org.jgroups.util.List ret=null; 655 AddressEntry entry; 656 byte[] buf; 657 658 if(log.isTraceEnabled()) { 659 log.trace("groupname=" + groupname + ", result=" + grpmbrs); 660 } 661 662 if(grpmbrs != null && grpmbrs.size() > 0) { 663 ret=new org.jgroups.util.List(); 664 for(Iterator i=grpmbrs.iterator(); i.hasNext(); ) { 665 entry=(AddressEntry)i.next(); 666 ret.add(entry.addr); 667 } 668 } 669 try { 670 if(ret == null || ret.size() == 0) { 671 output.writeInt(0); 672 } 673 else { 674 buf=Util.objectToByteBuffer(ret); 675 output.writeInt(buf.length); 676 output.write(buf, 0, buf.length); 677 } 678 } 679 catch(Exception e) { 680 if(log.isErrorEnabled()) log.error("exception=" + e); 681 } 682 finally { 683 try { 684 if(output != null) 685 output.close(); 686 sock.close(); 687 } 688 catch(Exception e) { 689 } 690 } 691 } 692 693 694 697 private void processDumpRequest(Socket sock, DataOutputStream output) { 698 699 try { 700 output.writeUTF(dumpRoutingTable()); 701 } 702 catch(Exception e) { 703 if(log.isErrorEnabled()) log.error("error sending the answer back to the client: " + e); 704 } 705 finally { 706 try { 707 if(output != null) { 708 output.close(); 709 } 710 } 711 catch(Exception e) { 712 if(log.isErrorEnabled()) log.error("error closing the output stream: " + e); 713 } 714 try { 715 sock.close(); 716 } 717 catch(Exception e) { 718 if(log.isErrorEnabled()) log.error("error closing the socket: " + e); 719 } 720 } 721 } 722 723 724 private String dumpTable(Map map) { 725 726 String label = (map instanceof Hashtable)?"routing":"gossip"; 727 StringBuffer sb=new StringBuffer (); 728 synchronized(map) { 729 if(map.size() == 0) { 730 sb.append("empty "); 731 sb.append(label); 732 sb.append(" table"); 733 } 734 else { 735 for(Iterator i=map.keySet().iterator(); i.hasNext();) { 736 String gname=(String )i.next(); 737 sb.append("GROUP: '" + gname + "'\n"); 738 List l=(List)map.get(gname); 739 if(l == null) { 740 sb.append("\tnull list of addresses\n"); 741 } 742 else 743 if(l.size() == 0) { 744 sb.append("\tempty list of addresses\n"); 745 } 746 else { 747 for(Iterator j=l.iterator(); j.hasNext();) { 748 AddressEntry ae=(AddressEntry)j.next(); 749 sb.append('\t'); 750 sb.append(ae.toString()); 751 sb.append('\n'); 752 } 753 } 754 } 755 } 756 } 757 return sb.toString(); 758 } 759 760 761 762 private void route(Address dest, String dest_group, byte[] msg, Address sender) { 763 if(log.isTraceEnabled()) { 764 int len=msg != null? msg.length : 0; 765 log.trace("routing request from " + sender + " for "+dest_group+" to " + 766 (dest==null?"ALL":dest.toString())+", " + len + " bytes"); 767 } 768 769 if(dest == null) { 770 if(dest_group == null) { 772 if(log.isErrorEnabled()) log.error("both dest address and group are null"); 773 return; 774 } 775 else { 776 sendToAllMembersInGroup(dest_group, msg, sender); 777 } 778 } 779 else { 780 AddressEntry ae = findAddressEntry(dest); 782 if (ae == null) { 783 if(log.isErrorEnabled()) log.error("cannot find address "+dest+" in the routing table"); 784 return; 785 } 786 if (ae.output==null) { 787 if(log.isErrorEnabled()) log.error("address "+dest+" is associated with a null output stream"); 788 return; 789 } 790 try { 791 sendToMember(ae.output, msg); 792 } 793 catch(Exception e) { 794 if(log.isErrorEnabled()) log.error("failed sending message to "+dest+": "+e.getMessage()); 795 removeEntry(ae.sock); } 797 } 798 } 799 800 801 804 private void addEntry(String groupname, AddressEntry e) { 805 List val; 806 807 if(groupname == null) { 808 if(log.isErrorEnabled()) log.error("groupname was null, not added !"); 809 return; 810 } 811 812 synchronized(routingTable) { 813 val=(List)routingTable.get(groupname); 814 if(val == null) { 815 val=Collections.synchronizedList(new ArrayList()); 816 routingTable.put(groupname, val); 817 } 818 int index = val.indexOf(e); 819 if (index==-1) { 820 val.add(e); 821 return; 822 } 823 ((AddressEntry)val.remove(index)).destroy(); 825 val.add(e); 826 } 827 } 828 829 830 private void removeEntry(Socket sock) { 831 832 List val; 833 AddressEntry entry; 834 synchronized(routingTable) { 835 for(Enumeration e=routingTable.keys(); e.hasMoreElements();) { 836 val=(List)routingTable.get(e.nextElement()); 837 for(Iterator i=val.iterator(); i.hasNext();) { 838 entry=(AddressEntry)i.next(); 839 if(entry.sock == sock) { 840 entry.destroy(); 841 i.remove(); 843 return; 844 } 845 } 846 } 847 } 848 } 849 850 853 private AddressEntry findAddressEntry(Address addr) { 854 855 List val; 856 AddressEntry entry; 857 synchronized(routingTable) { 858 for(Enumeration e=routingTable.keys(); e.hasMoreElements();) { 859 val=(List)routingTable.get(e.nextElement()); 860 for(Iterator i=val.iterator(); i.hasNext();) { 861 entry=(AddressEntry)i.next(); 862 if(addr.equals(entry.addr)) { 863 return entry; 864 } 865 } 866 } 867 return null; 868 } 869 } 870 871 872 873 874 private void sendToAllMembersInGroup(String groupname, byte[] msg, Address sender) { 875 List val; 876 val=(List)routingTable.get(groupname); 877 if(val == null || val.size() == 0) { 878 return; 879 } 880 881 synchronized(val) { 882 for(Iterator i=val.iterator(); i.hasNext();) { 883 AddressEntry ae = (AddressEntry)i.next(); 884 if(ae.addr != null && ae.addr.equals(sender)) { 885 continue; 888 } 889 DataOutputStream dos = ae.output; 890 891 if (dos!=null) { 892 try { 894 sendToMember(dos, msg); 895 } 896 catch(Exception e) { 897 if(log.isWarnEnabled()) log.warn("cannot send to "+ae.addr+": "+e.getMessage()); 898 ae.destroy(); i.remove(); 900 } 901 } 902 } 903 } 904 } 905 906 907 910 private void sendToMember(DataOutputStream out, byte[] msg) throws IOException { 911 if (out==null) { 912 return; 913 } 914 915 synchronized(out) { 916 out.writeInt(msg.length); 917 out.write(msg, 0, msg.length); 918 } 919 } 920 921 922 923 928 class AddressEntry { 929 Address addr=null; 930 Socket sock=null; 931 DataOutputStream output=null; 932 long timestamp=0; 933 final SocketThread thread; 934 935 938 public AddressEntry(Address addr) { 939 this(addr, null, null, null); 940 } 941 942 public AddressEntry(Address addr, Socket sock, SocketThread thread, DataOutputStream output) { 943 this.addr=addr; 944 this.sock=sock; 945 this.thread = thread; 946 this.output=output; 947 this.timestamp = System.currentTimeMillis(); 948 } 949 950 void destroy() { 951 if (thread != null) { 952 thread.finish(); 953 } 954 if(output != null) { 955 try { 956 output.close(); 957 } 958 catch(Exception e) { 959 } 960 output=null; 961 } 962 if(sock != null) { 963 try { 964 sock.close(); 965 } 966 catch(Exception e) { 967 } 968 sock=null; 969 } 970 timestamp = 0; 971 } 972 973 public void update() { 974 timestamp = System.currentTimeMillis(); 975 } 976 977 public boolean equals(Object other) { 978 return addr.equals(((AddressEntry)other).addr); 979 } 980 981 public String toString() { 982 StringBuffer sb = new StringBuffer ("addr="); 983 sb.append(addr); 984 if (sock==null) { 985 sb.append(", timestamp="); 986 sb.append(timestamp); 987 } 988 else { 989 sb.append(", sock="); 990 sb.append(sock); 991 } 992 return sb.toString(); 993 } 994 } 995 996 997 private static int threadCounter = 0; 998 999 1000 1001 class SocketThread extends Thread { 1002 private volatile boolean active = true; 1003 Socket sock=null; 1004 DataInputStream input=null; 1005 Address addr=null; 1006 1007 public SocketThread(Socket sock, DataInputStream ois, Address addr) { 1008 super("SocketThread "+(threadCounter++)); 1009 this.sock=sock; 1010 input=ois; 1011 this.addr=addr; 1012 } 1013 1014 void closeSocket() { 1015 try { 1016 if(input != null) 1017 input.close(); 1018 if(sock != null) 1019 sock.close(); 1020 } 1021 catch(Exception e) { 1022 } 1023 } 1024 1025 void finish() { 1026 if(log.isTraceEnabled()) log.trace("terminating the SocketThread for "+sock); 1027 active = false; 1028 } 1029 1030 1031 public void run() { 1032 byte[] buf; 1033 int len; 1034 Address dst_addr=null; 1035 String gname; 1036 1037 while(active) { 1038 try { 1039 gname=input.readUTF(); 1041 1042 dst_addr=Util.readAddress(input); 1044 1045 if (log.isTraceEnabled()) { 1046 log.trace("group " + gname + ", routing request to " + (dst_addr == null ? "all" : dst_addr.toString())); 1047 } 1048 1049 len=input.readInt(); 1051 if(len == 0) { 1052 if(log.isWarnEnabled()) log.warn("received null message"); 1053 continue; 1054 } 1055 1056 buf=new byte[len]; 1058 input.readFully(buf, 0, buf.length); 1060 route(dst_addr, gname, buf, addr); 1062 } 1063 catch(EOFException io_ex) { 1064 if(log.isTraceEnabled()) 1065 log.trace("client " +sock.getInetAddress().getHostName() + ':' + sock.getPort() + 1066 " closed connection; removing it from routing table"); 1067 removeEntry(sock); return; 1069 } 1070 catch(Exception e) { 1071 if(log.isErrorEnabled()) log.error("exception=" + e); 1072 break; 1073 } 1074 } 1075 closeSocket(); 1076 } 1077 1078 } 1079 1080 1081 public static void main(String [] args) throws Exception { 1082 String arg; 1083 int port=8080; 1084 long expiry = GossipRouter.EXPIRY_TIME; 1085 long timeout = GossipRouter.GOSSIP_REQUEST_TIMEOUT; 1086 long routingTimeout = GossipRouter.ROUTING_CLIENT_REPLY_TIMEOUT; 1087 GossipRouter router=null; 1088 String address=null; 1089 1090 for(int i=0; i < args.length; i++) { 1091 arg=args[i]; 1092 if("-help".equals(arg)) { 1093 System.out.println(); 1094 System.out.println("GossipRouter [-port <port>] [-bindaddress <address>] [options]"); 1095 System.out.println("Options: "); 1096 System.out.println(" -expiry <msecs> - Time until a gossip cache entry expires."); 1097 System.out.println(" -timeout <msecs> - Number of millisecs the router waits to receive"); 1098 System.out.println(" a gossip request after connection was established;"); 1099 System.out.println(" upon expiration, the router initiates the routing"); 1100 System.out.println(" protocol on the connection."); 1101 return; 1102 } 1103 else if("-port".equals(arg)) { 1104 port=Integer.parseInt(args[++i]); 1105 } 1106 else if("-bindaddress".equals(arg)) { 1107 address=args[++i]; 1108 } 1109 else if("-expiry".equals(arg)) { 1110 expiry=Long.parseLong(args[++i]); 1111 } 1112 else if("-timeout".equals(arg)) { 1113 timeout=Long.parseLong(args[++i]); 1114 } 1115 else if("-rtimeout".equals(arg)) { 1116 routingTimeout=Long.parseLong(args[++i]); 1117 } 1118 } 1119 System.out.println("GossipRouter is starting..."); 1120 1121 try { 1122 router= new GossipRouter(port, address, expiry, timeout, routingTimeout); 1123 router.start(); 1124 } 1125 catch(Exception e) { 1126 System.err.println(e); 1127 } 1128 } 1129 1130 1131} 1132 | Popular Tags |