| 1 3 package org.jgroups.blocks; 4 5 import EDU.oswego.cs.dl.util.concurrent.LinkedQueue; 6 import org.apache.commons.logging.Log; 7 import org.apache.commons.logging.LogFactory; 8 import org.jgroups.Address; 9 import org.jgroups.Message; 10 import org.jgroups.Version; 11 import org.jgroups.stack.IpAddress; 12 import org.jgroups.util.Util; 13 14 import java.io.DataInputStream ; 15 import java.io.DataOutputStream ; 16 import java.io.EOFException ; 17 import java.io.IOException ; 18 import java.net.*; 19 import java.util.HashMap ; 20 import java.util.Iterator ; 21 import java.util.Map ; 22 import java.util.Vector ; 23 24 25 35 public class ConnectionTable implements Runnable { 36 final HashMap conns=new HashMap (); Receiver receiver=null; 38 ServerSocket srv_sock=null; 39 boolean reuse_addr=false; 40 InetAddress bind_addr=null; 41 42 46 InetAddress external_addr=null; 47 Address local_addr=null; int srv_port=7800; 49 int max_port=0; Thread acceptor=null; static final int backlog=20; int recv_buf_size=120000; 53 int send_buf_size=60000; 54 final Vector conn_listeners=new Vector (); final Object recv_mutex=new Object (); Reaper reaper=null; long reaper_interval=60000; long conn_expire_time=300000; boolean use_reaper=false; int sock_conn_timeout=1000; ThreadGroup thread_group=null; 62 protected final Log log=LogFactory.getLog(getClass()); 63 static int javaVersion=0; 64 final byte[] cookie={'b', 'e', 'l', 'a'}; 65 66 67 static { 68 javaVersion=Util.getJavaVersion(); 69 } 70 71 72 73 public interface Receiver { 74 void receive(Message msg); 75 } 76 77 78 79 80 public interface ConnectionListener { 81 void connectionOpened(Address peer_addr); 82 void connectionClosed(Address peer_addr); 83 } 84 85 86 91 public ConnectionTable(int srv_port) throws Exception { 92 this.srv_port=srv_port; 93 start(); 94 } 95 96 97 105 public ConnectionTable(int srv_port, long reaper_interval, long conn_expire_time) throws Exception { 106 this.srv_port=srv_port; 107 this.reaper_interval=reaper_interval; 108 this.conn_expire_time=conn_expire_time; 109 use_reaper=true; 110 start(); 111 } 112 113 114 130 public ConnectionTable(Receiver r, InetAddress bind_addr, InetAddress external_addr, int srv_port, int max_port) throws Exception { 131 setReceiver(r); 132 this.bind_addr=bind_addr; 133 this.external_addr=external_addr; 134 this.srv_port=srv_port; 135 this.max_port=max_port; 136 start(); 137 } 138 139 140 161 public ConnectionTable(Receiver r, InetAddress bind_addr, InetAddress external_addr, int srv_port, int max_port, 162 long reaper_interval, long conn_expire_time) throws Exception { 163 setReceiver(r); 164 this.bind_addr=bind_addr; 165 this.external_addr=external_addr; 166 this.srv_port=srv_port; 167 this.max_port=max_port; 168 this.reaper_interval=reaper_interval; 169 this.conn_expire_time=conn_expire_time; 170 use_reaper=true; 171 start(); 172 } 173 174 175 public void setReceiver(Receiver r) { 176 receiver=r; 177 } 178 179 180 public void addConnectionListener(ConnectionListener l) { 181 if(l != null && !conn_listeners.contains(l)) 182 conn_listeners.addElement(l); 183 } 184 185 186 public void removeConnectionListener(ConnectionListener l) { 187 if(l != null) conn_listeners.removeElement(l); 188 } 189 190 191 public Address getLocalAddress() { 192 if(local_addr == null) 193 local_addr=bind_addr != null ? new IpAddress(bind_addr, srv_port) : null; 194 return local_addr; 195 } 196 197 198 public int getSendBufferSize() { 199 return send_buf_size; 200 } 201 202 public void setSendBufferSize(int send_buf_size) { 203 this.send_buf_size=send_buf_size; 204 } 205 206 public int getReceiveBufferSize() { 207 return recv_buf_size; 208 } 209 210 public void setReceiveBufferSize(int recv_buf_size) { 211 this.recv_buf_size=recv_buf_size; 212 } 213 214 public int getSocketConnectionTimeout() { 215 return sock_conn_timeout; 216 } 217 218 public void setSocketConnectionTimeout(int sock_conn_timeout) { 219 this.sock_conn_timeout=sock_conn_timeout; 220 } 221 222 226 public void send(Message msg) throws SocketException { 227 Address dest=msg != null ? msg.getDest() : null; 228 Connection conn; 229 230 if(dest == null) { 231 if(log.isErrorEnabled()) 232 log.error("msg is null or message's destination is null"); 233 return; 234 } 235 236 try { 238 conn=getConnection(dest); 239 if(conn == null) return; 240 } 241 catch(SocketException sock_ex) { 242 throw sock_ex; 244 } 245 catch(Throwable ex) { 246 if(log.isInfoEnabled()) log.info("connection to " + dest + " could not be established: " + ex); 247 throw new SocketException(ex.toString()); 248 } 249 250 try { 252 conn.send(msg); 253 } 254 catch(Throwable ex) { 255 if(log.isTraceEnabled()) 256 log.trace("sending msg to " + dest + " failed (" + ex.getClass().getName() + "); removing from connection table"); 257 remove(dest); 258 } 259 } 260 261 262 263 Connection getConnection(Address dest) throws Exception { 264 Connection conn=null; 265 Socket sock; 266 267 synchronized(conns) { 268 conn=(Connection)conns.get(dest); 269 if(conn == null) { 270 272 if(javaVersion >= 14) { 273 SocketAddress tmpBindAddr=new InetSocketAddress(bind_addr, 0); 274 InetAddress tmpDest=((IpAddress)dest).getIpAddress(); 275 SocketAddress destAddr=new InetSocketAddress(tmpDest, ((IpAddress)dest).getPort()); 276 sock=new Socket(); 277 sock.bind(tmpBindAddr); 278 sock.connect(destAddr, sock_conn_timeout); 279 } 280 else { 281 sock=new Socket(((IpAddress)dest).getIpAddress(), ((IpAddress)dest).getPort(), bind_addr, 0); 282 } 283 284 try { 285 sock.setSendBufferSize(send_buf_size); 286 } 287 catch(IllegalArgumentException ex) { 288 if(log.isErrorEnabled()) log.error("exception setting send buffer size to " + 289 send_buf_size + " bytes: " + ex); 290 } 291 try { 292 sock.setReceiveBufferSize(recv_buf_size); 293 } 294 catch(IllegalArgumentException ex) { 295 if(log.isErrorEnabled()) log.error("exception setting receive buffer size to " + 296 send_buf_size + " bytes: " + ex); 297 } 298 conn=new Connection(sock, dest); 299 conn.sendLocalAddress(local_addr); 300 notifyConnectionOpened(dest); 301 addConnection(dest, conn); 303 conn.init(); 304 if(log.isInfoEnabled()) log.info("created socket to " + dest); 305 } 306 return conn; 307 } 308 } 309 310 311 public void start() throws Exception { 312 srv_sock=createServerSocket(srv_port, max_port); 313 314 if (external_addr!=null) 315 local_addr=new IpAddress(external_addr, srv_sock.getLocalPort()); 316 else if (bind_addr != null) 317 local_addr=new IpAddress(bind_addr, srv_sock.getLocalPort()); 318 else 319 local_addr=new IpAddress(srv_sock.getLocalPort()); 320 321 if(log.isInfoEnabled()) log.info("server socket created on " + local_addr); 322 323 thread_group = new ThreadGroup (Thread.currentThread().getThreadGroup(), "ConnectionTableGroup"); 325 acceptor=new Thread (thread_group, this, "ConnectionTable.AcceptorThread"); 327 acceptor.setDaemon(true); 328 acceptor.start(); 329 330 if(use_reaper && reaper == null) { 332 reaper=new Reaper(); 333 reaper.start(); 334 } 335 } 336 337 338 339 public void stop() { 340 Iterator it=null; 341 Connection conn; 342 ServerSocket tmp; 343 344 if(srv_sock != null) { 346 try { 347 tmp=srv_sock; 348 srv_sock=null; 349 tmp.close(); 350 } 351 catch(Exception e) { 352 } 353 } 354 355 356 synchronized(conns) { 358 it=conns.values().iterator(); 359 while(it.hasNext()) { 360 conn=(Connection)it.next(); 361 conn.destroy(); 362 } 363 conns.clear(); 364 } 365 local_addr=null; 366 } 367 368 369 372 public void remove(Address addr) { 373 Connection conn; 374 375 synchronized(conns) { 376 conn=(Connection)conns.remove(addr); 377 } 378 379 if(conn != null) { 380 try { 381 conn.destroy(); } 383 catch(Exception e) { 384 } 385 } 386 if(log.isTraceEnabled()) log.trace("removed " + addr + ", connections are " + toString()); 387 } 388 389 390 395 public void run() { 396 Socket client_sock; 397 Connection conn=null; 398 Address peer_addr; 399 400 while(srv_sock != null) { 401 try { 402 client_sock=srv_sock.accept(); 403 if(log.isTraceEnabled()) 404 log.trace("accepted connection from " + client_sock.getInetAddress() + ":" + client_sock.getPort()); 405 406 conn=new Connection(client_sock, null); peer_addr=conn.readPeerAddress(client_sock); 410 411 conn.setPeerAddress(peer_addr); 413 414 synchronized(conns) { 415 if(conns.containsKey(peer_addr)) { 416 if(log.isTraceEnabled()) 417 log.trace(peer_addr + " is already there, will reuse connection"); 418 } 421 else { 422 addConnection(peer_addr, conn); 424 notifyConnectionOpened(peer_addr); 425 } 426 } 427 428 conn.init(); } 430 catch(SocketException sock_ex) { 431 if(log.isInfoEnabled()) log.info("exception is " + sock_ex); 432 if(conn != null) 433 conn.destroy(); 434 if(srv_sock == null) 435 break; } 437 catch(Throwable ex) { 438 if(log.isWarnEnabled()) log.warn("exception is " + ex); 439 if(srv_sock == null) 440 break; } 442 } 443 if(log.isTraceEnabled()) 444 log.trace(Thread.currentThread().getName() + " terminated"); 445 } 446 447 448 452 public void receive(Message msg) { 453 if(receiver != null) { 454 synchronized(recv_mutex) { 455 receiver.receive(msg); 456 } 457 } 458 else 459 if(log.isErrorEnabled()) log.error("receiver is null (not set) !"); 460 } 461 462 463 public String toString() { 464 StringBuffer ret=new StringBuffer (); 465 Address key; 466 Connection val; 467 Map.Entry entry; 468 HashMap copy; 469 470 synchronized(conns) { 471 copy=new HashMap (conns); 472 } 473 ret.append("connections (" + copy.size() + "):\n"); 474 for(Iterator it=copy.entrySet().iterator(); it.hasNext();) { 475 entry=(Map.Entry )it.next(); 476 key=(Address)entry.getKey(); 477 val=(Connection)entry.getValue(); 478 ret.append("key: " + key + ": " + val + '\n'); 479 } 480 ret.append('\n'); 481 return ret.toString(); 482 } 483 484 485 487 protected ServerSocket createServerSocket(int start_port, int end_port) throws Exception { 488 ServerSocket ret=null; 489 490 while(true) { 491 try { 492 if(bind_addr == null) 493 ret=new ServerSocket(start_port); 494 else { 495 496 ret=new ServerSocket(start_port, backlog, bind_addr); 497 } 498 } 499 catch(BindException bind_ex) { 500 if (start_port==end_port) throw new BindException("No available port to bind to"); 501 if(bind_addr != null && javaVersion >= 14) { 502 NetworkInterface nic=NetworkInterface.getByInetAddress(bind_addr); 503 if(nic == null) 504 throw new BindException("bind_addr " + bind_addr + " is not a valid interface"); 505 } 506 start_port++; 507 continue; 508 } 509 catch(IOException io_ex) { 510 if(log.isErrorEnabled()) log.error("exception is " + io_ex); 511 } 512 srv_port=start_port; 513 break; 514 } 515 return ret; 516 } 517 518 519 void notifyConnectionOpened(Address peer) { 520 if(peer == null) return; 521 for(int i=0; i < conn_listeners.size(); i++) 522 ((ConnectionListener)conn_listeners.elementAt(i)).connectionOpened(peer); 523 } 524 525 void notifyConnectionClosed(Address peer) { 526 if(peer == null) return; 527 for(int i=0; i < conn_listeners.size(); i++) 528 ((ConnectionListener)conn_listeners.elementAt(i)).connectionClosed(peer); 529 } 530 531 532 void addConnection(Address peer, Connection c) { 533 conns.put(peer, c); 534 if(reaper != null && !reaper.isRunning()) 535 reaper.start(); 536 } 537 538 539 540 541 class Connection implements Runnable { 542 Socket sock=null; String sock_addr=null; DataOutputStream out=null; DataInputStream in=null; Thread receiverThread=null; Address peer_addr=null; final Object send_mutex=new Object (); long last_access=System.currentTimeMillis(); LinkedQueue send_queue=new LinkedQueue(); 551 Sender sender=new Sender(); 552 final long POLL_TIMEOUT=30000; 553 554 555 String getSockAddress() { 556 if(sock_addr != null) 557 return sock_addr; 558 if(sock != null) { 559 StringBuffer sb=new StringBuffer (); 560 sb.append(sock.getLocalAddress().getHostAddress()).append(':').append(sock.getLocalPort()); 561 sb.append(" - ").append(sock.getInetAddress().getHostAddress()).append(':').append(sock.getPort()); 562 sock_addr=sb.toString(); 563 } 564 return sock_addr; 565 } 566 567 class Sender implements Runnable { 568 Thread senderThread; 569 private boolean running=false; 570 571 void start() { 572 if(senderThread == null || !senderThread.isAlive()) { 573 senderThread=new Thread (thread_group, this, "ConnectionTable.Connection.Sender [" + getSockAddress() + "]"); 574 senderThread.setDaemon(true); 575 senderThread.start(); 576 running=true; 577 if(log.isTraceEnabled()) 578 log.trace("ConnectionTable.Connection.Sender thread started"); 579 } 580 } 581 582 void stop() { 583 if(senderThread != null) { 584 senderThread.interrupt(); 585 senderThread=null; 586 running=false; 587 } 588 } 589 590 boolean isRunning() { 591 return running && senderThread != null; 592 } 593 594 public void run() { 595 Message msg; 596 while(senderThread != null && senderThread.equals(Thread.currentThread())) { 597 try { 598 msg=(Message)send_queue.poll(POLL_TIMEOUT); 599 if(msg == null) 600 break; 601 _send(msg); 602 } 603 catch(InterruptedException e) { 604 break; 605 } 606 } 607 running=false; 608 if(log.isTraceEnabled()) 609 log.trace("ConnectionTable.Connection.Sender thread terminated"); 610 } 611 } 612 613 614 Connection(Socket s, Address peer_addr) { 615 sock=s; 616 this.peer_addr=peer_addr; 617 try { 618 out=new DataOutputStream (sock.getOutputStream()); 619 in=new DataInputStream (sock.getInputStream()); 620 } 621 catch(Exception ex) { 622 if(log.isErrorEnabled()) log.error("exception is " + ex); 623 } 624 } 625 626 627 boolean established() { 628 return receiverThread != null; 629 } 630 631 632 void setPeerAddress(Address peer_addr) { 633 this.peer_addr=peer_addr; 634 } 635 636 void updateLastAccessed() { 637 last_access=System.currentTimeMillis(); 638 } 639 640 void init() { 641 if(receiverThread == null || !receiverThread.isAlive()) { 643 receiverThread=new Thread (thread_group, this, "ConnectionTable.Connection.Receiver [" + getSockAddress() + "]"); 645 receiverThread.setDaemon(true); 646 receiverThread.start(); 647 if(log.isTraceEnabled()) 648 log.trace("ConnectionTable.Connection.Receiver started"); 649 } 650 } 651 652 653 void destroy() { 654 closeSocket(); sender.stop(); 656 receiverThread=null; 657 } 658 659 660 void send(Message msg) { 661 try { 662 send_queue.put(msg); 663 if(!sender.isRunning()) 664 sender.start(); 665 } 666 catch(InterruptedException e) { 667 log.error("failed adding message to send_queue", e); 668 } 669 } 670 671 private void _send(Message msg) { 672 synchronized(send_mutex) { 673 try { 674 doSend(msg); 675 updateLastAccessed(); 676 } 677 catch(IOException io_ex) { 678 if(log.isWarnEnabled()) 679 log.warn("peer closed connection, trying to re-establish connection and re-send msg"); 680 try { 681 doSend(msg); 682 updateLastAccessed(); 683 } 684 catch(IOException io_ex2) { 685 if(log.isErrorEnabled()) log.error("2nd attempt to send data failed too"); 686 } 687 catch(Exception ex2) { 688 if(log.isErrorEnabled()) log.error("exception is " + ex2); 689 } 690 } 691 catch(Exception ex) { 692 if(log.isErrorEnabled()) log.error("exception is " + ex); 693 } 694 } 695 } 696 697 698 void doSend(Message msg) throws Exception { 699 IpAddress dst_addr=(IpAddress)msg.getDest(); 700 byte[] buffie=null; 701 702 if(dst_addr == null || dst_addr.getIpAddress() == null) { 703 if(log.isErrorEnabled()) log.error("the destination address is null; aborting send"); 704 return; 705 } 706 707 try { 708 if(msg.getSrc() == null) 710 msg.setSrc(local_addr); 711 712 buffie=Util.objectToByteBuffer(msg); 713 if(buffie.length <= 0) { 714 if(log.isErrorEnabled()) log.error("buffer.length is 0. Will not send message"); 715 return; 716 } 717 718 if(out != null) { 722 out.writeInt(buffie.length); Util.doubleWrite(buffie, out); 724 out.flush(); } 726 } 727 catch(Exception ex) { 728 if(log.isErrorEnabled()) 729 log.error("failure sending to " + dst_addr, ex); 730 remove(dst_addr); 731 throw ex; 732 } 733 } 734 735 736 740 Address readPeerAddress(Socket client_sock) throws Exception { 741 Address client_peer_addr=null; 742 byte[] version, buf, input_cookie=new byte[cookie.length]; 743 int len=0, client_port=client_sock != null? client_sock.getPort() : 0; 744 InetAddress client_addr=client_sock != null? client_sock.getInetAddress() : null; 745 746 if(in != null) { 747 initCookie(input_cookie); 748 749 in.read(input_cookie, 0, input_cookie.length); 751 if(!matchCookie(input_cookie)) 752 throw new SocketException("ConnectionTable.Connection.readPeerAddress(): cookie sent by " + 753 client_peer_addr + " does not match own cookie; terminating connection"); 754 version=new byte[Version.version_id.length]; 756 in.read(version, 0, version.length); 757 758 if(Version.compareTo(version) == false) { 759 if(log.isWarnEnabled()) log.warn("packet from " + client_addr + ':' + client_port + 760 " has different version (" + 761 Version.printVersionId(version, Version.version_id.length) + 762 ") from ours (" + Version.printVersionId(Version.version_id) + 763 "). This may cause problems"); 764 } 765 766 len=in.readInt(); 768 769 buf=new byte[len]; 771 in.readFully(buf, 0, len); 772 client_peer_addr=(Address)Util.objectFromByteBuffer(buf); 773 updateLastAccessed(); 774 }
|