package org.jgroups.blocks;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jgroups.Address;
import org.jgroups.Global;
import org.jgroups.Version;
import org.jgroups.stack.IpAddress;
import org.jgroups.util.Util;

import java.io.*;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Base class for socket based connection tables. It keeps a map from peer {@link Address} to
 * {@link Connection} (guarded by {@code synchronized(conns)}), dispatches received data to a
 * {@link Receiver}, informs {@link ConnectionListener}s and can evict idle connections through a
 * {@link Reaper}. How connections are actually created is left to subclasses via
 * {@link #getConnection(Address)}.
 */
public abstract class BasicConnectionTable {
    /** Peer address -> connection. All access must be done while holding the monitor of this map. */
    final Map<Address,Connection> conns=new HashMap<Address,Connection>();
    /** Callback which gets all received data; see {@link #receive(Address, byte[], int, int)}. */
    Receiver receiver=null;
    /** When true, newly created connections get a bounded send queue plus a dedicated sender thread. */
    boolean use_send_queues=false;
    /** Capacity of the per-connection send queue. */
    int send_queue_size=10000;
    /** Together with srv_port used to lazily compute the local address in {@link #getLocalAddress()}. */
    InetAddress bind_addr=null;
    Address local_addr=null;
    int srv_port=7800;
    // NOTE(review): the two buffer sizes are only exposed through accessors here; presumably applied
    // to sockets by subclasses - confirm.
    int recv_buf_size=120000;
    int send_buf_size=60000;
    final Vector<ConnectionListener> conn_listeners=new Vector<ConnectionListener>();
    Reaper reaper=null;
    /** Time (ms) the reaper sleeps between two passes over the connections. */
    long reaper_interval=60000;
    /** Connections whose last access is older than this (ms) are destroyed by the reaper. */
    long conn_expire_time=300000;
    // NOTE(review): not used in this class, only exposed through accessors; presumably the connect
    // timeout used by subclasses - confirm.
    int sock_conn_timeout=1000;
    /** Thread group in which receiver, sender and reaper threads are created. */
    ThreadGroup thread_group=null;
    protected final Log log= LogFactory.getLog(getClass());
    /** Magic bytes written by sendLocalAddress() and verified by readPeerAddress(). */
    final byte[] cookie={'b', 'e', 'l', 'a'};
    // NOTE(review): use_reaper, backlog, srv_sock, external_addr, max_port and acceptor are not
    // referenced in this class; presumably they are managed by subclasses - confirm.
    boolean use_reaper=false;
    static final int backlog=20;
    volatile ServerSocket srv_sock=null;
    boolean tcp_nodelay=false;
    int linger=-1;

    InetAddress external_addr=null;
    int max_port=0;
    Thread acceptor=null;
    /** Set by start()/stop(); send() discards messages while this is false. */
    boolean running=false;

    final static long MAX_JOIN_TIMEOUT=Global.THREAD_SHUTDOWN_WAIT_TIME;


    /** Sets the callback which is handed all received data. */
    public final void setReceiver(Receiver r) {
        receiver=r;
    }

    /** Registers a listener; null and already registered listeners are ignored. */
    public void addConnectionListener(ConnectionListener l) {
        if(l == null)
            return;
        // Vector synchronizes on itself, so this makes the check-then-add atomic
        synchronized(conn_listeners) {
            if(!conn_listeners.contains(l))
                conn_listeners.addElement(l);
        }
    }

    /** Unregisters a listener; null is ignored. */
    public void removeConnectionListener(ConnectionListener l) {
        if(l != null) conn_listeners.removeElement(l);
    }

    /**
     * Returns the local address, creating it from bind_addr and srv_port on first use.
     * @return the local address, or null if no bind address has been set yet
     */
    public Address getLocalAddress() {
        if(local_addr == null)
            local_addr=bind_addr != null ? new IpAddress(bind_addr, srv_port) : null;
        return local_addr;
    }

    public int getSendBufferSize() {
        return send_buf_size;
    }

    public void setSendBufferSize(int send_buf_size) {
        this.send_buf_size=send_buf_size;
    }

    public int getReceiveBufferSize() {
        return recv_buf_size;
    }

    public void setReceiveBufferSize(int recv_buf_size) {
        this.recv_buf_size=recv_buf_size;
    }

    public int getSocketConnectionTimeout() {
        return sock_conn_timeout;
    }

    public void setSocketConnectionTimeout(int sock_conn_timeout) {
        this.sock_conn_timeout=sock_conn_timeout;
    }

    /** Returns the number of connections currently in the table. */
    public int getNumConnections() {
        synchronized(conns) {
            return conns.size();
        }
    }

    public boolean getTcpNodelay() {
        return tcp_nodelay;
    }

    public void setTcpNodelay(boolean tcp_nodelay) {
        this.tcp_nodelay=tcp_nodelay;
    }

    public int getLinger() {
        return linger;
    }

    public void setLinger(int linger) {
        this.linger=linger;
    }

    public boolean getUseSendQueues() {return use_send_queues;}

    /** Only affects connections created after the call; existing connections keep their mode. */
    public void setUseSendQueues(boolean flag) {this.use_send_queues=flag;}

    public int getSendQueueSize() {
        return send_queue_size;
    }

    public void setSendQueueSize(int send_queue_size) {
        this.send_queue_size=send_queue_size;
    }

    /** Marks the table as running so that {@link #send(Address, byte[], int, int)} accepts messages. */
    public void start() throws Exception {
        running=true;
    }

    /** Marks the table as stopped; subsequent sends are discarded. */
    public void stop() {
        running=false;
    }

    /**
     * Removes the connection for addr from the table (if present) and destroys it.
     * @param addr the peer whose connection is to be removed
     */
    public void remove(Address addr) {
        Connection conn;

        synchronized(conns) {
            conn=conns.remove(addr);
        }

        if(conn != null) {
            try {
                conn.destroy(); // done outside of the lock, destroy() may wait for threads
            }
            catch(Exception e) {
                // best effort: the connection is already gone from the table
                if(log.isTraceEnabled()) log.trace("exception destroying connection to " + addr, e);
            }
        }
        if(log.isTraceEnabled()) log.trace("removed " + addr + ", connections are " + toString());
    }

    /**
     * Passes received data on to the registered receiver; logs an error if none is set.
     * @param sender address of the peer the data came from
     * @param data buffer holding the payload
     * @param offset start of the payload in data
     * @param length number of payload bytes
     */
    public void receive(Address sender, byte[] data, int offset, int length) {
        if(receiver != null) {
            receiver.receive(sender, data, offset, length);
        }
        else
            if(log.isErrorEnabled()) log.error("receiver is null (not set) !");
    }

    /** Prints the local address and a snapshot of all connections. */
    @Override
    public String toString() {
        StringBuilder ret=new StringBuilder();
        Map<Address,Connection> copy;

        synchronized(conns) {
            copy=new HashMap<Address,Connection>(conns);
        }
        ret.append("local_addr=").append(local_addr).append("\n");
        ret.append("connections (").append(copy.size()).append("):\n");
        for(Map.Entry<Address,Connection> entry: copy.entrySet())
            ret.append("key: ").append(entry.getKey()).append(": ").append(entry.getValue()).append('\n');
        ret.append('\n');
        return ret.toString();
    }

    /** Invokes connectionOpened(peer) on a snapshot of the listeners; a null peer is ignored. */
    void notifyConnectionOpened(Address peer) {
        if(peer == null) return;
        // snapshot: an index loop over the live Vector can fail if a listener is removed concurrently
        ConnectionListener[] listeners=conn_listeners.toArray(new ConnectionListener[0]);
        for(ConnectionListener listener: listeners)
            listener.connectionOpened(peer);
    }

    /** Invokes connectionClosed(peer) on a snapshot of the listeners; a null peer is ignored. */
    void notifyConnectionClosed(Address peer) {
        if(peer == null) return;
        ConnectionListener[] listeners=conn_listeners.toArray(new ConnectionListener[0]);
        for(ConnectionListener listener: listeners)
            listener.connectionClosed(peer);
    }

    /**
     * Adds (or replaces) the connection for peer and starts the reaper if there is one and it is
     * not running. Safe to call with or without the conns monitor held (monitors are reentrant).
     */
    void addConnection(Address peer, Connection c) {
        synchronized(conns) {
            conns.put(peer, c);
        }
        if(reaper != null && !reaper.isRunning())
            reaper.start();
    }

    /**
     * Sends data to dest. Messages are discarded (with a log message) if dest or data is null or
     * the table is not running. A message to the local address is delivered directly to
     * {@link #receive(Address, byte[], int, int)}. If sending fails, the connection is removed.
     * @param dest the destination
     * @param data buffer holding the payload
     * @param offset start of the payload in data
     * @param length number of payload bytes
     * @throws Exception if the connection to dest could not be established
     */
    public void send(Address dest, byte[] data, int offset, int length) throws Exception {
        Connection conn;
        if(dest == null) {
            if(log.isErrorEnabled())
                log.error("destination is null");
            return;
        }

        if(data == null) {
            log.warn("data is null; discarding packet");
            return;
        }

        if(!running) {
            if(log.isWarnEnabled())
                log.warn("connection table is not running, discarding message to " + dest);
            return;
        }

        if(dest.equals(local_addr)) {
            receive(local_addr, data, offset, length);
            return;
        }

        try {
            conn=getConnection(dest);
            if(conn == null) return;
        }
        catch(Throwable ex) {
            throw new Exception("connection to " + dest + " could not be established", ex);
        }

        try {
            conn.send(data, offset, length);
        }
        catch(Throwable ex) {
            if(log.isTraceEnabled())
                log.trace("sending msg to " + dest + " failed (" + ex.getClass().getName() + "); removing from connection table", ex);
            remove(dest);
        }
    }

    /**
     * Returns the connection to dest. Implemented by subclasses.
     * NOTE(review): presumably creates the connection if it does not exist yet; send() silently
     * drops the message when null is returned.
     */
    abstract Connection getConnection(Address dest) throws Exception;

    /**
     * Drops all connections whose peer is not contained in current_mbrs: they are removed from
     * the table and then destroyed.
     * @param current_mbrs the peers to keep; if null, nothing is done
     */
    public void retainAll(Collection<?> current_mbrs) {
        if(current_mbrs == null) return;
        Map<Address,Connection> copy;
        synchronized(conns) {
            copy=new HashMap<Address,Connection>(conns);
            conns.keySet().retainAll(current_mbrs);
        }

        // destroy the orphans outside of the lock, using the snapshot taken before the removal
        for(Map.Entry<Address,Connection> entry: copy.entrySet()) {
            if(!current_mbrs.contains(entry.getKey())) {
                Connection conn=entry.getValue();
                if(conn != null) {
                    if(log.isTraceEnabled())
                        log.trace("Destroy this orphaned connection: " + conn);
                    conn.destroy();
                }
            }
        }
        copy.clear();
    }


    /** Callback for received data. */
    public interface Receiver {
        void receive(Address sender, byte[] data, int offset, int length);
    }

    /** Callback for connection life cycle events. */
    public interface ConnectionListener {
        void connectionOpened(Address peer_addr);
        void connectionClosed(Address peer_addr);
    }

    /**
     * A connection to one peer over a socket. Messages on the wire are prefixed with their length
     * (an int). Incoming messages are read by a receiver thread (see {@link #run()}); outgoing
     * messages are either written by the caller under send_lock or, if the connection was created
     * with send queues enabled, queued and written by a {@link Sender} thread.
     */
    class Connection implements Runnable {
        Socket sock=null;
        /** Cached "local - remote" description of the socket, see getSockAddress(). */
        String sock_addr=null;
        DataOutputStream out=null;
        DataInputStream in=null;
        Thread receiverThread=null;
        Address peer_addr=null;
        /** Serializes direct (non-queued) writes to out. */
        final Lock send_lock=new ReentrantLock();
        /** Time of the last successful read or write; used by the reaper to find idle connections. */
        long last_access=System.currentTimeMillis();

        /** Non-null only if use_send_queues was true when this connection was created. */
        BlockingQueue<byte[]> send_queue=null;
        Sender sender=null;
        boolean is_running=false;


        /** Returns (and caches) "localhost:port - remotehost:port", or null if there is no socket. */
        private String getSockAddress() {
            if(sock_addr != null)
                return sock_addr;
            Socket tmp=sock; // read once, closeSocket() may null the field concurrently
            if(tmp != null) {
                StringBuilder sb=new StringBuilder();
                sb.append(tmp.getLocalAddress().getHostAddress()).append(':').append(tmp.getLocalPort());
                sb.append(" - ").append(tmp.getInetAddress().getHostAddress()).append(':').append(tmp.getPort());
                sock_addr=sb.toString();
            }
            return sock_addr;
        }


        /**
         * Wraps the socket's streams and, if send queues are enabled, creates the queue and starts
         * the sender thread. Failures to obtain the streams are logged; in and out then stay null.
         */
        Connection(Socket s, Address peer_addr) {
            sock=s;
            this.peer_addr=peer_addr;

            if(use_send_queues) {
                send_queue=new LinkedBlockingQueue<byte[]>(send_queue_size);
                sender=new Sender();
            }

            try {
                out=new DataOutputStream(new BufferedOutputStream(sock.getOutputStream()));
                in=new DataInputStream(new BufferedInputStream(sock.getInputStream()));
                if(sender != null)
                    sender.start();
            }
            catch(Exception ex) {
                if(log.isErrorEnabled()) log.error("exception is " + ex);
            }
        }


        /** Returns true if a receiver thread has been set up for this connection. */
        boolean established() {
            return receiverThread != null;
        }


        void setPeerAddress(Address peer_addr) {
            this.peer_addr=peer_addr;
        }

        Address getPeerAddress() {return peer_addr;}

        void updateLastAccessed() {
            last_access=System.currentTimeMillis();
        }

        /** Marks the connection as running and starts the (daemon) receiver thread if needed. */
        void init() {
            is_running=true;
            if(receiverThread == null || !receiverThread.isAlive()) {
                receiverThread=new Thread(thread_group, this, "ConnectionTable.Connection.Receiver [" + getSockAddress() + "]");
                receiverThread.setDaemon(true);
                receiverThread.start();
                if(log.isTraceEnabled())
                    log.trace("receiver started: " + receiverThread);
            }
        }


        /** Stops the connection: closes socket and streams, then stops sender and receiver threads. */
        void destroy() {
            is_running=false;
            closeSocket(); // closed first so that a receiver blocked in a read gets an exception
            if(sender != null)
                sender.stop();
            Thread tmp=receiverThread;
            receiverThread=null;
            if(tmp != null) {
                Util.interruptAndWaitToDie(tmp);
            }
        }


        /**
         * Sends a message to the peer. If this connection has a send queue, a copy of the payload
         * is queued (blocking while the queue is full), otherwise it is written directly.
         * Discarded with a warning if the connection is not running.
         */
        void send(byte[] data, int offset, int length) {
            if(!is_running) {
                if(log.isWarnEnabled())
                    log.warn("Connection is not running, discarding message");
                return;
            }
            // The mode is fixed at construction: checking the table wide use_send_queues flag here
            // would fail with a NullPointerException if the flag was switched on afterwards.
            if(send_queue != null) {
                try {
                    // copy, as the caller may reuse the buffer after this method returns
                    byte[] tmp=new byte[length];
                    System.arraycopy(data, offset, tmp, 0, length);
                    send_queue.put(tmp);
                }
                catch(InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
            else
                _send(data, offset, length, true);
        }


        /**
         * Writes one message, optionally under send_lock. Exceptions are logged, not propagated
         * (doSend() has already removed the connection from the table at that point).
         * @param acquire_lock false when called from the sender thread
         */
        private void _send(byte[] data, int offset, int length, boolean acquire_lock) {
            if(acquire_lock)
                send_lock.lock();

            try {
                doSend(data, offset, length);
                updateLastAccessed();
            }
            catch(InterruptedException iex) {
                Thread.currentThread().interrupt(); // preserve the interrupt status
            }
            catch(Throwable ex) {
                if(log.isErrorEnabled()) log.error("exception is " + ex);
            }
            finally {
                if(acquire_lock)
                    send_lock.unlock();
            }
        }


        /**
         * Writes the length followed by the payload and flushes. On failure the connection is
         * removed from the table and the exception is rethrown. No-op if there is no output stream.
         */
        void doSend(byte[] data, int offset, int length) throws Exception {
            try {
                if(out != null) {
                    out.writeInt(length); // length prefix, read back in run()
                    Util.doubleWrite(data, offset, length, out);
                    out.flush();
                }
            }
            catch(Exception ex) {
                remove(peer_addr);
                throw ex;
            }
        }


        /**
         * Reads the handshake sent by the peer's sendLocalAddress(): cookie, version and address.
         * A version which is not binary compatible only results in a warning.
         * @param client_sock the socket of the peer, only used for log and error messages
         * @return the address sent by the peer, or null if there is no input stream
         * @throws SocketException if the cookie does not match
         * @throws Exception if reading fails, e.g. EOFException if the peer closes the connection
         *                   in the middle of the handshake
         */
        Address readPeerAddress(Socket client_sock) throws Exception {
            Address client_peer_addr=null;
            byte[] input_cookie=new byte[cookie.length];
            int client_port=client_sock != null? client_sock.getPort() : 0;
            short version;
            InetAddress client_addr=client_sock != null? client_sock.getInetAddress() : null;

            if(in != null) {
                initCookie(input_cookie);

                // readFully: a plain read() may return fewer bytes than the cookie is long, which
                // would cause a bogus mismatch and leave the stream out of sync
                in.readFully(input_cookie, 0, input_cookie.length);
                if(!matchCookie(input_cookie))
                    throw new SocketException("ConnectionTable.Connection.readPeerAddress(): cookie sent by " +
                            client_addr + ':' + client_port + " does not match own cookie; terminating connection");
                version=in.readShort();

                if(Version.isBinaryCompatible(version) == false) {
                    if(log.isWarnEnabled())
                        log.warn(new StringBuilder("packet from ").append(client_addr).append(':').append(client_port).
                                append(" has different version (").append(Version.print(version)).append(") from ours (").
                                append(Version.printVersion()).append("). This may cause problems"));
                }
                client_peer_addr=new IpAddress();
                client_peer_addr.readFrom(in);

                updateLastAccessed();
            }
            return client_peer_addr;
        }


        /**
         * Sends the handshake read by the peer's readPeerAddress(): cookie, version and
         * local_addr. Failures are logged. No-op if local_addr is null or there is no output stream.
         */
        void sendLocalAddress(Address local_addr) {
            if(local_addr == null) {
                if(log.isWarnEnabled()) log.warn("local_addr is null");
                return;
            }
            if(out != null) {
                try {
                    out.write(cookie, 0, cookie.length);
                    out.writeShort(Version.version);
                    local_addr.writeTo(out);
                    out.flush();
                    updateLastAccessed();
                }
                catch(Throwable t) {
                    if(log.isErrorEnabled()) log.error("exception is " + t);
                }
            }
        }


        /** Zeroes the given buffer; null is ignored. */
        void initCookie(byte[] c) {
            if(c != null)
                Arrays.fill(c, (byte)0);
        }

        /** Returns true if input starts with the bytes of the cookie. */
        boolean matchCookie(byte[] input) {
            if(input == null || input.length < cookie.length) return false;
            for(int i=0; i < cookie.length; i++)
                if(cookie[i] != input[i]) return false;
            return true;
        }


        /** Returns the cookie bytes as a string (platform charset), "" for null. */
        String printCookie(byte[] c) {
            if(c == null) return "";
            return new String(c);
        }


        /**
         * Receiver loop: reads length prefixed messages and passes them to the table's receive()
         * until the connection is stopped, the stream fails or an invalid message arrives. On
         * EOF or I/O errors the connection listeners are notified. Closes the socket on exit.
         */
        public void run() {
            byte[] buf=new byte[256]; // grown on demand and reused between messages
            int len=0;

            while(receiverThread != null && receiverThread.equals(Thread.currentThread()) && is_running) {
                try {
                    if(in == null) {
                        if(log.isErrorEnabled()) log.error("input stream is null !");
                        break;
                    }
                    len=in.readInt();
                    if(len < 0) {
                        // Corrupt frame: continuing would interpret arbitrary payload bytes as the
                        // next length, so give up on this connection (same as the OOM case below)
                        if(log.isWarnEnabled()) log.warn("dropped invalid message (length=" + len + "), closing connection");
                        break;
                    }
                    if(len > buf.length)
                        buf=new byte[len];
                    in.readFully(buf, 0, len);
                    updateLastAccessed();
                    receive(peer_addr, buf, 0, len);
                }
                catch(OutOfMemoryError mem_ex) {
                    // a huge length prefix made the buffer allocation fail
                    if(log.isWarnEnabled()) log.warn("dropped invalid message, closing connection");
                    break;
                }
                catch(EOFException eof_ex) { // peer closed the connection
                    if(log.isTraceEnabled()) log.trace("exception is " + eof_ex);
                    notifyConnectionClosed(peer_addr);
                    break;
                }
                catch(IOException io_ex) {
                    if(log.isTraceEnabled()) log.trace("exception is " + io_ex);
                    notifyConnectionClosed(peer_addr);
                    break;
                }
                catch(Throwable e) {
                    // e.g. thrown by the receiver callback: the message was read completely, go on
                    if(log.isWarnEnabled()) log.warn("exception is " + e);
                }
            }
            if(log.isTraceEnabled())
                log.trace("ConnectionTable.Connection.Receiver terminated");
            receiverThread=null;
            closeSocket();
        }


        /** Prints local and remote endpoint plus the time since the last access. */
        @Override
        public String toString() {
            StringBuilder ret=new StringBuilder();
            Socket tmp_sock=sock; // read once, closeSocket() may null the field concurrently

            if(tmp_sock == null)
                ret.append("<null socket>");
            else {
                InetAddress local=tmp_sock.getLocalAddress();
                InetAddress remote=tmp_sock.getInetAddress();
                String local_str=local != null ? Util.shortName(local) : "<null>";
                String remote_str=remote != null ? Util.shortName(remote) : "<null>";
                ret.append('<').append(local_str).append(':').append(tmp_sock.getLocalPort())
                        .append(" --> ").append(remote_str).append(':').append(tmp_sock.getPort()).append("> (")
                        .append((System.currentTimeMillis() - last_access) / 1000).append(" secs old)");
            }

            return ret.toString();
        }


        /** Closes socket, output and input stream (in that order) and drops the socket reference. */
        void closeSocket() {
            Util.close(sock);
            sock=null;
            Util.close(out);
            Util.close(in);
        }


        /** Thread which takes messages from send_queue and writes them to the peer. */
        class Sender implements Runnable {
            Thread senderThread;
            private boolean is_it_running=false;

            /** Starts the (daemon) sender thread unless it is already alive. */
            void start() {
                if(senderThread == null || !senderThread.isAlive()) {
                    senderThread=new Thread(thread_group, this, "ConnectionTable.Connection.Sender local_addr=" + local_addr + " [" + getSockAddress() + "]");
                    senderThread.setDaemon(true);
                    is_it_running=true;
                    senderThread.start();
                    if(log.isTraceEnabled())
                        log.trace("sender thread started: " + senderThread);
                }
            }

            /** Discards all queued messages and stops the sender thread. */
            void stop() {
                is_it_running=false;
                if(send_queue != null)
                    send_queue.clear();
                if(senderThread != null) {
                    Thread tmp=senderThread;
                    senderThread=null; // makes the loop in run() terminate once interrupted
                    Util.interruptAndWaitToDie(tmp);
                }
            }

            boolean isRunning() {
                return is_it_running && senderThread != null;
            }

            public void run() {
                byte[] data;
                while(senderThread != null && senderThread.equals(Thread.currentThread()) && is_it_running) {
                    try {
                        data=send_queue.take();
                        if(data == null)
                            continue;
                        // no lock needed: this thread is the only writer of a queued connection
                        _send(data, 0, data.length, false);
                    }
                    catch(InterruptedException e) {
                        // intentionally ignored: stop() resets senderThread and is_it_running
                        // before interrupting, so the loop condition terminates this thread
                    }
                }
                is_it_running=false;
                if(log.isTraceEnabled())
                    log.trace("ConnectionTable.Connection.Sender thread terminated");
            }
        }


    }

    /**
     * Periodically destroys and removes connections which have not been accessed for more than
     * conn_expire_time milliseconds. The thread terminates by itself when the table is empty.
     */
    class Reaper implements Runnable {
        Thread t=null;

        Reaper() {
        }

        private boolean haveZeroConnections() {
            synchronized(conns) {
                return conns.isEmpty();
            }
        }

        /** Starts the (daemon) reaper thread unless it is running or there are no connections. */
        public void start() {
            if(haveZeroConnections())
                return;
            if(t != null && !t.isAlive())
                t=null;
            if(t == null) {
                t=new Thread(thread_group, this, "ConnectionTable.ReaperThread");
                t.setDaemon(true);
                t.start();
            }
        }

        /** Stops the reaper thread, if any. */
        public void stop() {
            Thread tmp=t;
            if(t != null)
                t=null;
            if(tmp != null) {
                Util.interruptAndWaitToDie(tmp);
            }
        }


        public boolean isRunning() {
            return t != null;
        }

        public void run() {
            long curr_time;

            if(log.isDebugEnabled()) log.debug("connection reaper thread was started. Number of connections=" +
                    getNumConnections() + ", reaper_interval=" + reaper_interval + ", conn_expire_time=" +
                    conn_expire_time);

            while(!haveZeroConnections() && t != null && t.equals(Thread.currentThread())) {
                Util.sleep(reaper_interval);
                // stop() may have been called (or another reaper started) while we were sleeping
                if(t == null || !Thread.currentThread().equals(t))
                    break;
                synchronized(conns) {
                    curr_time=System.currentTimeMillis();
                    for(Iterator<Map.Entry<Address,Connection>> it=conns.entrySet().iterator(); it.hasNext();) {
                        Connection value=it.next().getValue();
                        if(log.isTraceEnabled()) log.trace("connection is " +
                                ((curr_time - value.last_access) / 1000) + " seconds old (curr-time=" +
                                curr_time + ", last_access=" + value.last_access + ')');
                        if(value.last_access + conn_expire_time < curr_time) {
                            if(log.isTraceEnabled()) log.trace("connection " + value +
                                    " has been idle for too long (conn_expire_time=" + conn_expire_time +
                                    "), will be removed");
                            value.destroy();
                            it.remove(); // via the iterator, the map is being iterated
                        }
                    }
                }
            }
            if(log.isDebugEnabled()) log.debug("reaper terminated");
            t=null;
        }
    }
}