package org.jgroups.blocks;

import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jgroups.Address;
import org.jgroups.Message;
import org.jgroups.Version;
import org.jgroups.stack.IpAddress;
import org.jgroups.util.Util;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.net.*;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Vector;


/**
 * Manages TCP connections to peers, keyed by peer address. A server socket accepts incoming
 * connections on an acceptor thread (this class's {@link #run()}), outgoing connections are
 * created on demand by {@link #send(Message)}. Each {@link Connection} has its own receiver
 * thread and a queue-driven sender thread. Optionally, a {@link Reaper} destroys connections
 * that have not been used for longer than <code>conn_expire_time</code>.
 */
public class ConnectionTable implements Runnable {
    /** Maps peer addresses (Address) to their Connection. Guarded by synchronized(conns) */
    final HashMap conns=new HashMap();
    /** Callback that received messages are handed to; may be null */
    Receiver receiver=null;
    /** Server socket accepting incoming connections; null once stop() has been called */
    ServerSocket srv_sock=null;
    boolean reuse_addr=false;
    /** Local address the server socket and outgoing sockets are bound to; null means any */
    InetAddress bind_addr=null;

    /**
     * If set, this address (instead of bind_addr) is used to build local_addr, i.e. the address
     * that is sent to peers when a connection is established
     */
    InetAddress external_addr=null;
    /** Address of this endpoint; set in start(), cleared in stop() */
    Address local_addr=null;
    /** First port tried for the server socket; updated to the port actually bound */
    int srv_port=7800;
    /** Port at which createServerSocket() gives up with a BindException */
    int max_port=0;
    /** Thread running the accept loop */
    Thread acceptor=null;
    /** Backlog passed to the ServerSocket constructor when bind_addr is set */
    static final int backlog=20;
    /** Receive buffer size (bytes) set on outgoing sockets */
    int recv_buf_size=120000;
    /** Send buffer size (bytes) set on outgoing sockets */
    int send_buf_size=60000;
    /** Registered ConnectionListeners */
    final Vector conn_listeners=new Vector();
    /** Serializes delivery of received messages to the receiver */
    final Object recv_mutex=new Object();
    /** Removes idle connections; only created when use_reaper is true */
    Reaper reaper=null;
    /** Time the reaper sleeps between two scans of the connection table */
    long reaper_interval=60000;
    /** Idle time (ms) after which the reaper destroys a connection */
    long conn_expire_time=300000;
    boolean use_reaper=false;
    /** Timeout passed to Socket.connect() when creating outgoing connections */
    int sock_conn_timeout=1000;
    /** Thread group of all threads created by this instance */
    ThreadGroup thread_group=null;
    protected final Log log=LogFactory.getLog(getClass());
    /** Value of Util.getJavaVersion(); compared against 14 to select the socket API used */
    static int javaVersion=0;
    /** Marker bytes exchanged at the start of every connection */
    final byte[] cookie={'b', 'e', 'l', 'a'};


    static {
        javaVersion=Util.getJavaVersion();
    }


    /** Used for message reception */
    public interface Receiver {
        void receive(Message msg);
    }


    /** Used to be notified about connection establishment and teardown */
    public interface ConnectionListener {
        void connectionOpened(Address peer_addr);
        void connectionClosed(Address peer_addr);
    }


    /**
     * Creates a connection table without a reaper and starts it.
     * @param srv_port First port tried for the server socket
     * @throws Exception If the server socket cannot be created
     */
    public ConnectionTable(int srv_port) throws Exception {
        this.srv_port=srv_port;
        start();
    }


    /**
     * Creates a connection table with a reaper and starts it.
     * @param srv_port First port tried for the server socket
     * @param reaper_interval Time the reaper sleeps between scans
     * @param conn_expire_time Idle time (ms) after which a connection is destroyed by the reaper
     * @throws Exception If the server socket cannot be created
     */
    public ConnectionTable(int srv_port, long reaper_interval, long conn_expire_time) throws Exception {
        this.srv_port=srv_port;
        this.reaper_interval=reaper_interval;
        this.conn_expire_time=conn_expire_time;
        use_reaper=true;
        start();
    }


    /**
     * Creates a connection table without a reaper and starts it.
     * @param r Receiver that received messages are passed to
     * @param bind_addr Address to bind the server socket and outgoing sockets to (null: any)
     * @param external_addr If not null, used instead of bind_addr to build the local address
     * @param srv_port First port tried for the server socket
     * @param max_port Port at which the search for a free port is given up
     * @throws Exception If the server socket cannot be created
     */
    public ConnectionTable(Receiver r, InetAddress bind_addr, InetAddress external_addr, int srv_port, int max_port) throws Exception {
        setReceiver(r);
        this.bind_addr=bind_addr;
        this.external_addr=external_addr;
        this.srv_port=srv_port;
        this.max_port=max_port;
        start();
    }


    /**
     * Creates a connection table with a reaper and starts it.
     * @param r Receiver that received messages are passed to
     * @param bind_addr Address to bind the server socket and outgoing sockets to (null: any)
     * @param external_addr If not null, used instead of bind_addr to build the local address
     * @param srv_port First port tried for the server socket
     * @param max_port Port at which the search for a free port is given up
     * @param reaper_interval Time the reaper sleeps between scans
     * @param conn_expire_time Idle time (ms) after which a connection is destroyed by the reaper
     * @throws Exception If the server socket cannot be created
     */
    public ConnectionTable(Receiver r, InetAddress bind_addr, InetAddress external_addr, int srv_port, int max_port,
                           long reaper_interval, long conn_expire_time) throws Exception {
        setReceiver(r);
        this.bind_addr=bind_addr;
        this.external_addr=external_addr;
        this.srv_port=srv_port;
        this.max_port=max_port;
        this.reaper_interval=reaper_interval;
        this.conn_expire_time=conn_expire_time;
        use_reaper=true;
        start();
    }


    public void setReceiver(Receiver r) {
        receiver=r;
    }


    /** Registers a listener; null and already registered listeners are ignored */
    public void addConnectionListener(ConnectionListener l) {
        if(l != null && !conn_listeners.contains(l))
            conn_listeners.addElement(l);
    }


    public void removeConnectionListener(ConnectionListener l) {
        if(l != null) conn_listeners.removeElement(l);
    }


    /**
     * Returns the local address. If it is not set (e.g. after stop()), it is rebuilt from
     * bind_addr and srv_port; returns null if bind_addr is null as well.
     */
    public Address getLocalAddress() {
        if(local_addr == null)
            local_addr=bind_addr != null ? new IpAddress(bind_addr, srv_port) : null;
        return local_addr;
    }


    public int getSendBufferSize() {
        return send_buf_size;
    }

    public void setSendBufferSize(int send_buf_size) {
        this.send_buf_size=send_buf_size;
    }

    public int getReceiveBufferSize() {
        return recv_buf_size;
    }

    public void setReceiveBufferSize(int recv_buf_size) {
        this.recv_buf_size=recv_buf_size;
    }

    public int getSocketConnectionTimeout() {
        return sock_conn_timeout;
    }

    public void setSocketConnectionTimeout(int sock_conn_timeout) {
        this.sock_conn_timeout=sock_conn_timeout;
    }


    /**
     * Sends a message to its destination, creating the connection first if necessary. The
     * message is only queued here; the actual write is done by the connection's sender thread.
     * @param msg The message; if it is null or has no destination, an error is logged and
     *            nothing is sent
     * @throws SocketException If the connection to the destination could not be established
     */
    public void send(Message msg) throws SocketException {
        Address dest=msg != null ? msg.getDest() : null;
        Connection conn;

        if(dest == null) {
            if(log.isErrorEnabled())
                log.error("msg is null or message's destination is null");
            return;
        }

        // 1. Get the connection for dest (created if it doesn't exist yet)
        try {
            conn=getConnection(dest);
            if(conn == null) return;
        }
        catch(SocketException sock_ex) {
            throw sock_ex;
        }
        catch(Throwable ex) {
            if(log.isInfoEnabled()) log.info("connection to " + dest + " could not be established: " + ex);
            throw new SocketException(ex.toString());
        }

        // 2. Hand the message to the connection
        try {
            conn.send(msg);
        }
        catch(Throwable ex) {
            if(log.isTraceEnabled())
                log.trace("sending msg to " + dest + " failed (" + ex.getClass().getName() + "); removing from connection table");
            remove(dest);
        }
    }


    /**
     * Returns the connection for dest. If none exists, a socket is connected to dest, the local
     * address is sent to the peer, listeners are notified, the connection is added to the table
     * and its receiver thread is started. The whole operation runs under the conns lock.
     * @param dest Destination; cast to IpAddress when a new socket has to be created
     * @throws Exception If the socket could not be created or connected
     */
    Connection getConnection(Address dest) throws Exception {
        Connection conn=null;
        Socket sock;

        synchronized(conns) {
            conn=(Connection)conns.get(dest);
            if(conn == null) {
                if(javaVersion >= 14) {
                    SocketAddress tmpBindAddr=new InetSocketAddress(bind_addr, 0);
                    InetAddress tmpDest=((IpAddress)dest).getIpAddress();
                    SocketAddress destAddr=new InetSocketAddress(tmpDest, ((IpAddress)dest).getPort());
                    sock=new Socket();
                    try {
                        sock.bind(tmpBindAddr);
                        sock.connect(destAddr, sock_conn_timeout);
                    }
                    catch(Exception ex) {
                        // don't leak the socket when bind or connect fails
                        try {
                            sock.close();
                        }
                        catch(IOException ignored) {
                            // best effort: the original exception is the one worth reporting
                        }
                        throw ex;
                    }
                }
                else {
                    sock=new Socket(((IpAddress)dest).getIpAddress(), ((IpAddress)dest).getPort(), bind_addr, 0);
                }

                try {
                    sock.setSendBufferSize(send_buf_size);
                }
                catch(IllegalArgumentException ex) {
                    if(log.isErrorEnabled()) log.error("exception setting send buffer size to " +
                            send_buf_size + " bytes: " + ex);
                }
                try {
                    sock.setReceiveBufferSize(recv_buf_size);
                }
                catch(IllegalArgumentException ex) {
                    if(log.isErrorEnabled()) log.error("exception setting receive buffer size to " +
                            recv_buf_size + " bytes: " + ex);
                }
                conn=new Connection(sock, dest);
                conn.sendLocalAddress(local_addr);
                notifyConnectionOpened(dest);
                addConnection(dest, conn);
                conn.init();
                if(log.isInfoEnabled()) log.info("created socket to " + dest);
            }
            return conn;
        }
    }


    /**
     * Creates the server socket, computes the local address (external_addr takes precedence over
     * bind_addr), starts the acceptor thread and, if use_reaper is set, creates the reaper.
     * @throws Exception If the server socket cannot be created
     */
    public void start() throws Exception {
        srv_sock=createServerSocket(srv_port, max_port);

        if(external_addr != null)
            local_addr=new IpAddress(external_addr, srv_sock.getLocalPort());
        else if(bind_addr != null)
            local_addr=new IpAddress(bind_addr, srv_sock.getLocalPort());
        else
            local_addr=new IpAddress(srv_sock.getLocalPort());

        if(log.isInfoEnabled()) log.info("server socket created on " + local_addr);

        thread_group=new ThreadGroup(Thread.currentThread().getThreadGroup(), "ConnectionTableGroup");
        acceptor=new Thread(thread_group, this, "ConnectionTable.AcceptorThread");
        acceptor.setDaemon(true);
        acceptor.start();

        // Reaper.start() returns immediately while there are no connections; addConnection()
        // starts it again once the first connection has been added
        if(use_reaper && reaper == null) {
            reaper=new Reaper();
            reaper.start();
        }
    }


    /** Closes the server socket, destroys all connections and clears the local address */
    public void stop() {
        Iterator it=null;
        Connection conn;
        ServerSocket tmp;

        if(srv_sock != null) {
            try {
                tmp=srv_sock;
                srv_sock=null; // tells the acceptor loop to terminate
                tmp.close();
            }
            catch(Exception e) {
                // ignored: we are shutting down anyway
            }
        }

        synchronized(conns) {
            it=conns.values().iterator();
            while(it.hasNext()) {
                conn=(Connection)it.next();
                conn.destroy();
            }
            conns.clear();
        }
        local_addr=null;
    }


    /**
     * Removes the connection for addr from the table (if present) and destroys it.
     */
    public void remove(Address addr) {
        Connection conn;

        synchronized(conns) {
            conn=(Connection)conns.remove(addr);
        }

        if(conn != null) {
            try {
                conn.destroy();
            }
            catch(Exception e) {
                // ignored: the connection is gone from the table either way
            }
        }
        if(log.isTraceEnabled()) log.trace("removed " + addr + ", connections are " + toString());
    }


    /**
     * Acceptor loop: accepts client connections until the server socket is set to null. For each
     * accepted socket the peer's address is read; if no connection to that peer is in the table
     * yet, the new connection is added and listeners are notified. The connection's receiver
     * thread is started in both cases.
     */
    public void run() {
        Socket client_sock;
        Connection conn;
        Address peer_addr;

        while(srv_sock != null) {
            // reset per iteration: a failure in accept() must not destroy the connection
            // that was accepted in a previous iteration
            conn=null;
            try {
                client_sock=srv_sock.accept();
                if(log.isTraceEnabled())
                    log.trace("accepted connection from " + client_sock.getInetAddress() + ":" + client_sock.getPort());

                conn=new Connection(client_sock, null); // peer address is not yet known
                peer_addr=conn.readPeerAddress(client_sock);

                conn.setPeerAddress(peer_addr);

                synchronized(conns) {
                    if(conns.containsKey(peer_addr)) {
                        if(log.isTraceEnabled())
                            log.trace(peer_addr + " is already there, will reuse connection");
                    }
                    else {
                        addConnection(peer_addr, conn);
                        notifyConnectionOpened(peer_addr);
                    }
                }

                conn.init(); // starts the receiver thread on this socket
            }
            catch(SocketException sock_ex) {
                if(log.isInfoEnabled()) log.info("exception is " + sock_ex);
                if(conn != null)
                    conn.destroy();
                if(srv_sock == null)
                    break; // socket was closed, therefore stop
            }
            catch(Throwable ex) {
                if(log.isWarnEnabled()) log.warn("exception is " + ex);
                // e.g. the peer closed the socket during the handshake: release the socket
                if(conn != null)
                    conn.destroy();
                if(srv_sock == null)
                    break; // socket was closed, therefore stop
            }
        }
        if(log.isTraceEnabled())
            log.trace(Thread.currentThread().getName() + " terminated");
    }


    /**
     * Passes a received message to the receiver; calls are serialized on recv_mutex. Logs an
     * error if no receiver is set.
     */
    public void receive(Message msg) {
        if(receiver != null) {
            synchronized(recv_mutex) {
                receiver.receive(msg);
            }
        }
        else
            if(log.isErrorEnabled()) log.error("receiver is null (not set) !");
    }


    /** Lists all connections; works on a copy of the table taken under the conns lock */
    public String toString() {
        StringBuffer ret=new StringBuffer();
        Address key;
        Connection val;
        Map.Entry entry;
        HashMap copy;

        synchronized(conns) {
            copy=new HashMap(conns);
        }
        ret.append("connections (" + copy.size() + "):\n");
        for(Iterator it=copy.entrySet().iterator(); it.hasNext();) {
            entry=(Map.Entry)it.next();
            key=(Address)entry.getKey();
            val=(Connection)entry.getValue();
            ret.append("key: " + key + ": " + val + '\n');
        }
        ret.append('\n');
        return ret.toString();
    }


    /**
     * Creates a server socket on the first port in the range that can be bound, starting at
     * start_port and incrementing by one on each BindException. On success srv_port is set to
     * the port used.
     * @param start_port First port to try
     * @param end_port Port at which the search is given up
     * @return The bound server socket, never null
     * @throws BindException If start_port reaches end_port without success, or if bind_addr is
     *                       not the address of a local network interface
     * @throws IOException If creating the socket fails for a reason other than the port being
     *                     in use
     */
    protected ServerSocket createServerSocket(int start_port, int end_port) throws Exception {
        ServerSocket ret=null;

        while(true) {
            try {
                if(bind_addr == null)
                    ret=new ServerSocket(start_port);
                else {
                    ret=new ServerSocket(start_port, backlog, bind_addr);
                }
            }
            catch(BindException bind_ex) {
                if(start_port == end_port) throw new BindException("No available port to bind to");
                if(bind_addr != null && javaVersion >= 14) {
                    // trying further ports is pointless if the address itself is invalid
                    NetworkInterface nic=NetworkInterface.getByInetAddress(bind_addr);
                    if(nic == null)
                        throw new BindException("bind_addr " + bind_addr + " is not a valid interface");
                }
                start_port++;
                continue;
            }
            catch(IOException io_ex) {
                if(log.isErrorEnabled()) log.error("exception is " + io_ex);
                // returning null here would only surface later as a NullPointerException in start()
                throw io_ex;
            }
            srv_port=start_port;
            break;
        }
        return ret;
    }


    /** Calls connectionOpened(peer) on all listeners; does nothing if peer is null */
    void notifyConnectionOpened(Address peer) {
        if(peer == null) return;
        for(int i=0; i < conn_listeners.size(); i++)
            ((ConnectionListener)conn_listeners.elementAt(i)).connectionOpened(peer);
    }

    /** Calls connectionClosed(peer) on all listeners; does nothing if peer is null */
    void notifyConnectionClosed(Address peer) {
        if(peer == null) return;
        for(int i=0; i < conn_listeners.size(); i++)
            ((ConnectionListener)conn_listeners.elementAt(i)).connectionClosed(peer);
    }


    /** Adds the connection to the table and starts the reaper if it exists but is not running */
    void addConnection(Address peer, Connection c) {
        conns.put(peer, c);
        if(reaper != null && !reaper.isRunning())
            reaper.start();
    }




    /**
     * A connection to a single peer. Messages to be sent are queued and written by a Sender
     * thread; incoming messages are read by the receiver thread (this class's run()).
     */
    class Connection implements Runnable {
        Socket sock=null;
        /** Cached "local:port - remote:port" string, see getSockAddress() */
        String sock_addr=null;
        DataOutputStream out=null;
        DataInputStream in=null;
        Thread receiverThread=null;
        /** Address of the peer; null until known for accepted connections */
        Address peer_addr=null;
        /** Serializes writes to the output stream */
        final Object send_mutex=new Object();
        /** Time (ms) of the last send or receive; read by the reaper */
        long last_access=System.currentTimeMillis();
        /** Messages waiting to be written by the sender thread */
        LinkedQueue send_queue=new LinkedQueue();
        Sender sender=new Sender();
        /** Time the sender waits for a message before it terminates */
        final long POLL_TIMEOUT=30000;


        /** Returns (and caches) a string with the local and remote address/port of the socket */
        String getSockAddress() {
            if(sock_addr != null)
                return sock_addr;
            if(sock != null) {
                StringBuffer sb=new StringBuffer();
                sb.append(sock.getLocalAddress().getHostAddress()).append(':').append(sock.getLocalPort());
                sb.append(" - ").append(sock.getInetAddress().getHostAddress()).append(':').append(sock.getPort());
                sock_addr=sb.toString();
            }
            return sock_addr;
        }

        /**
         * Takes messages from send_queue and writes them to the socket. The thread terminates
         * when no message arrives within POLL_TIMEOUT; Connection.send() starts it again.
         */
        class Sender implements Runnable {
            Thread senderThread;
            private boolean running=false;

            void start() {
                if(senderThread == null || !senderThread.isAlive()) {
                    senderThread=new Thread(thread_group, this, "ConnectionTable.Connection.Sender [" + getSockAddress() + "]");
                    senderThread.setDaemon(true);
                    senderThread.start();
                    running=true;
                    if(log.isTraceEnabled())
                        log.trace("ConnectionTable.Connection.Sender thread started");
                }
            }

            void stop() {
                if(senderThread != null) {
                    senderThread.interrupt();
                    senderThread=null;
                    running=false;
                }
            }

            boolean isRunning() {
                return running && senderThread != null;
            }

            public void run() {
                Message msg;
                while(senderThread != null && senderThread.equals(Thread.currentThread())) {
                    try {
                        msg=(Message)send_queue.poll(POLL_TIMEOUT);
                        if(msg == null)
                            break; // idle for POLL_TIMEOUT: terminate
                        _send(msg);
                    }
                    catch(InterruptedException e) {
                        break; // interrupted by stop()
                    }
                }
                running=false;
                if(log.isTraceEnabled())
                    log.trace("ConnectionTable.Connection.Sender thread terminated");
            }
        }


        /**
         * Wraps the socket's streams. If the streams cannot be obtained, the error is logged and
         * in/out remain null.
         */
        Connection(Socket s, Address peer_addr) {
            sock=s;
            this.peer_addr=peer_addr;
            try {
                out=new DataOutputStream(sock.getOutputStream());
                in=new DataInputStream(sock.getInputStream());
            }
            catch(Exception ex) {
                if(log.isErrorEnabled()) log.error("exception is " + ex);
            }
        }


        boolean established() {
            return receiverThread != null;
        }


        void setPeerAddress(Address peer_addr) {
            this.peer_addr=peer_addr;
        }

        void updateLastAccessed() {
            last_access=System.currentTimeMillis();
        }

        /** Starts the receiver thread unless it is already alive */
        void init() {
            if(receiverThread == null || !receiverThread.isAlive()) {
                receiverThread=new Thread(thread_group, this, "ConnectionTable.Connection.Receiver [" + getSockAddress() + "]");
                receiverThread.setDaemon(true);
                receiverThread.start();
                if(log.isTraceEnabled())
                    log.trace("ConnectionTable.Connection.Receiver started");
            }
        }


        /** Closes the socket and stops the sender; the receiver loop ends as a consequence */
        void destroy() {
            closeSocket(); // should terminate handler as well
            sender.stop();
            receiverThread=null;
        }


        /** Queues the message for sending and starts the sender thread if it is not running */
        void send(Message msg) {
            try {
                send_queue.put(msg);
                if(!sender.isRunning())
                    sender.start();
            }
            catch(InterruptedException e) {
                log.error("failed adding message to send_queue", e);
                // keep the interrupt visible to the calling thread
                Thread.currentThread().interrupt();
            }
        }

        /**
         * Writes the message under send_mutex. If the first attempt fails with an IOException,
         * doSend() is called a second time; all failures are logged, none is propagated.
         */
        private void _send(Message msg) {
            synchronized(send_mutex) {
                try {
                    doSend(msg);
                    updateLastAccessed();
                }
                catch(IOException io_ex) {
                    if(log.isWarnEnabled())
                        log.warn("peer closed connection, trying to re-establish connection and re-send msg");
                    try {
                        doSend(msg);
                        updateLastAccessed();
                    }
                    catch(IOException io_ex2) {
                        if(log.isErrorEnabled()) log.error("2nd attempt to send data failed too");
                    }
                    catch(Exception ex2) {
                        if(log.isErrorEnabled()) log.error("exception is " + ex2);
                    }
                }
                catch(Exception ex) {
                    if(log.isErrorEnabled()) log.error("exception is " + ex);
                }
            }
        }


        /**
         * Converts the message to a byte buffer and writes its length followed by the buffer.
         * The message's source is set to local_addr if it is null. On failure the connection to
         * the destination is removed from the table and the exception is rethrown.
         */
        void doSend(Message msg) throws Exception {
            IpAddress dst_addr=(IpAddress)msg.getDest();
            byte[] buffie=null;

            if(dst_addr == null || dst_addr.getIpAddress() == null) {
                if(log.isErrorEnabled()) log.error("the destination address is null; aborting send");
                return;
            }

            try {
                if(msg.getSrc() == null)
                    msg.setSrc(local_addr);

                buffie=Util.objectToByteBuffer(msg);
                if(buffie.length <= 0) {
                    if(log.isErrorEnabled()) log.error("buffer.length is 0. Will not send message");
                    return;
                }

                if(out != null) {
                    out.writeInt(buffie.length); // length prefix, read by the peer's receiver
                    Util.doubleWrite(buffie, out);
                    out.flush();
                }
            }
            catch(Exception ex) {
                if(log.isErrorEnabled())
                    log.error("failure sending to " + dst_addr, ex);
                remove(dst_addr);
                throw ex;
            }
        }


        /**
         * Reads the handshake sent by the peer's sendLocalAddress(): cookie, version, length of
         * the address buffer and the address buffer itself. A version mismatch is only logged.
         * @param client_sock The accepted socket, used for log and error messages only
         * @return The peer's address, or null if the input stream is null
         * @throws SocketException If the cookie does not match ours
         * @throws Exception If reading from the stream or converting the address fails
         */
        Address readPeerAddress(Socket client_sock) throws Exception {
            Address client_peer_addr=null;
            byte[] version, buf, input_cookie=new byte[cookie.length];
            int len=0, client_port=client_sock != null? client_sock.getPort() : 0;
            InetAddress client_addr=client_sock != null? client_sock.getInetAddress() : null;

            if(in != null) {
                initCookie(input_cookie);

                // readFully: a plain read() may return fewer bytes than requested, which would
                // fail the cookie check and leave the stream out of sync
                in.readFully(input_cookie, 0, input_cookie.length);
                if(!matchCookie(input_cookie))
                    throw new SocketException("ConnectionTable.Connection.readPeerAddress(): cookie sent by " +
                            client_addr + ':' + client_port + " does not match own cookie; terminating connection");

                version=new byte[Version.version_id.length];
                in.readFully(version, 0, version.length);

                if(!Version.compareTo(version)) {
                    if(log.isWarnEnabled()) log.warn("packet from " + client_addr + ':' + client_port +
                            " has different version (" +
                            Version.printVersionId(version, Version.version_id.length) +
                            ") from ours (" + Version.printVersionId(Version.version_id) +
                            "). This may cause problems");
                }

                len=in.readInt(); // length of the address buffer

                buf=new byte[len];
                in.readFully(buf, 0, len);
                client_peer_addr=(Address)Util.objectFromByteBuffer(buf);
                updateLastAccessed();
            }
            return client_peer_addr;
        }


        /**
         * Sends the handshake read by the peer's readPeerAddress(): cookie, version, length of
         * the address buffer and the address buffer itself. Failures are logged, not propagated.
         */
        void sendLocalAddress(Address local_addr) {
            byte[] buf;

            if(local_addr == null) {
                if(log.isWarnEnabled()) log.warn("local_addr is null");
                return;
            }
            if(out != null) {
                try {
                    buf=Util.objectToByteBuffer(local_addr);

                    out.write(cookie, 0, cookie.length);

                    out.write(Version.version_id, 0, Version.version_id.length);

                    out.writeInt(buf.length);

                    out.write(buf, 0, buf.length);
                    out.flush();
                    updateLastAccessed();
                }
                catch(Throwable t) {
                    if(log.isErrorEnabled()) log.error("exception is " + t);
                }
            }
        }


        /** Zeroes the given array */
        void initCookie(byte[] c) {
            if(c != null)
                for(int i=0; i < c.length; i++)
                    c[i]=0;
        }

        /** Returns true if the first cookie.length bytes of input equal our cookie */
        boolean matchCookie(byte[] input) {
            if(input == null || input.length < cookie.length) return false;
            if(log.isInfoEnabled()) log.info("input_cookie is " + printCookie(input));
            for(int i=0; i < cookie.length; i++)
                if(cookie[i] != input[i]) return false;
            return true;
        }


        String printCookie(byte[] c) {
            if(c == null) return "";
            return new String(c);
        }


        /**
         * Receiver loop: reads length-prefixed buffers, converts them to messages and passes
         * them to receive(). On EOF or any other IOException listeners are notified and the
         * loop ends; on termination the socket is closed and the connection for peer_addr is
         * removed from the table.
         */
        public void run() {
            Message msg;
            byte[] buf=new byte[256];
            int len=0;

            while(receiverThread != null && receiverThread.equals(Thread.currentThread())) {
                try {
                    if(in == null) {
                        if(log.isErrorEnabled()) log.error("input stream is null !");
                        break;
                    }
                    len=in.readInt();
                    if(len > buf.length)
                        buf=new byte[len]; // grow the buffer, it is never shrunk
                    in.readFully(buf, 0, len);
                    updateLastAccessed();
                    msg=(Message)Util.objectFromByteBuffer(buf);
                    receive(msg);
                }
                catch(OutOfMemoryError mem_ex) {
                    if(log.isWarnEnabled()) log.warn("dropped invalid message, closing connection");
                    break;
                }
                catch(EOFException eof_ex) {
                    if(log.isInfoEnabled()) log.info("exception is " + eof_ex);
                    notifyConnectionClosed(peer_addr);
                    break;
                }
                catch(IOException io_ex) {
                    if(log.isInfoEnabled()) log.info("exception is " + io_ex);
                    notifyConnectionClosed(peer_addr);
                    break;
                }
                catch(Throwable e) {
                    if(log.isWarnEnabled()) log.warn("exception is " + e);
                }
            }
            if(log.isTraceEnabled())
                log.trace("ConnectionTable.Connection.Receiver terminated");
            receiverThread=null;
            closeSocket();
            remove(peer_addr);
        }


        public String toString() {
            StringBuffer ret=new StringBuffer();
            InetAddress local=null, remote=null;
            String local_str, remote_str;
            // copy first: closeSocket() may set sock to null concurrently
            Socket tmp_sock=sock;

            if(tmp_sock == null)
                ret.append("<null socket>");
            else {
                local=tmp_sock.getLocalAddress();
                remote=tmp_sock.getInetAddress();
                local_str=local != null ? Util.shortName(local) : "<null>";
                remote_str=remote != null ? Util.shortName(remote) : "<null>";
                ret.append('<' + local_str + ':' + tmp_sock.getLocalPort() +
                        " --> " + remote_str + ':' + tmp_sock.getPort() + "> (" +
                        ((System.currentTimeMillis() - last_access) / 1000) + " secs old)");
            }

            return ret.toString();
        }


        /**
         * Closes the socket and both streams, ignoring errors. sock and in are set to null; the
         * out reference is left in place after being closed.
         */
        void closeSocket() {
            if(sock != null) {
                try {
                    sock.close(); // should actually close in/out (so we don't need to close them explicitly)
                }
                catch(Exception e) {
                    // ignored: nothing useful can be done if close() fails
                }
                sock=null;
            }
            if(out != null) {
                try {
                    out.close(); // flushes data
                }
                catch(Exception e) {
                    // ignored: the socket is already closed
                }
            }
            if(in != null) {
                try {
                    in.close();
                }
                catch(Exception ex) {
                    // ignored: the socket is already closed
                }
                in=null;
            }
        }
    }


    /**
     * Periodically scans the connection table and destroys connections whose last access is
     * more than conn_expire_time ms ago. The thread terminates when the table is empty.
     */
    class Reaper implements Runnable {
        Thread t=null;

        Reaper() {
            ;
        }

        /** Starts the reaper thread unless it is alive already or there are no connections */
        public void start() {
            if(conns.size() == 0)
                return;
            if(t != null && !t.isAlive())
                t=null;
            if(t == null) {
                t=new Thread(thread_group, this, "ConnectionTable.ReaperThread");
                t.setDaemon(true); // will allow us to terminate if all remaining threads are daemons
                t.start();
            }
        }

        /** Makes the loop in run() terminate after its current iteration */
        public void stop() {
            t=null;
        }


        public boolean isRunning() {
            return t != null;
        }

        public void run() {
            Connection value;
            Map.Entry entry;
            long curr_time;

            if(log.isInfoEnabled()) log.info("connection reaper thread was started. Number of connections=" +
                    conns.size() + ", reaper_interval=" + reaper_interval + ", conn_expire_time=" +
                    conn_expire_time);

            while(conns.size() > 0 && t != null && t.equals(Thread.currentThread())) {
                Util.sleep(reaper_interval);
                synchronized(conns) {
                    curr_time=System.currentTimeMillis();
                    for(Iterator it=conns.entrySet().iterator(); it.hasNext();) {
                        entry=(Map.Entry)it.next();
                        value=(Connection)entry.getValue();
                        if(log.isInfoEnabled()) log.info("connection is " +
                                ((curr_time - value.last_access) / 1000) + " seconds old (curr-time=" +
                                curr_time + ", last_access=" + value.last_access + ')');
                        if(value.last_access + conn_expire_time < curr_time) {
                            if(log.isInfoEnabled()) log.info("connection " + value +
                                    " has been idle for too long (conn_expire_time=" + conn_expire_time +
                                    "), will be removed");
                            value.destroy();
                            it.remove(); // removal via the iterator is safe during iteration
                        }
                    }
                }
            }
            if(log.isInfoEnabled()) log.info("reaper terminated");
            t=null;
        }
    }


}