1 3 package org.jgroups.blocks; 4 5 import org.jgroups.util.TimedWriter; 6 import org.jgroups.util.Util; 7 8 import java.io.*; 9 import java.net.InetAddress ; 10 import java.net.ServerSocket ; 11 import java.net.Socket ; 12 13 14 15 16 17 31 public class Link implements Runnable { 32 String local_addr=null, remote_addr=null; 33 InetAddress local=null, remote=null; 34 int local_port=0, remote_port=0; 35 ServerSocket srv_sock=null; 36 Socket outgoing=null; Socket incoming=null; DataOutputStream outstream=null; 39 DataInputStream instream=null; 40 boolean established=false; boolean stop=false; 42 boolean trace=false; 43 Thread receiver_thread=null; 44 final long receiver_thread_join_timeout=2000; 45 Receiver receiver=null; 46 static final int HB_PACKET=-99; 47 Heartbeat hb=null; 48 long timeout=10000; long hb_interval=3000; final Object outgoing_mutex=new Object (); TimedWriter writer=null; 52 53 54 55 public interface Receiver { 56 void receive(byte[] msg); 57 void linkDown(InetAddress local, int local_port, InetAddress remote, int remote_port); 58 void linkUp(InetAddress local, int local_port, InetAddress remote, int remote_port); 59 void missedHeartbeat(InetAddress local, int local_port, InetAddress remote, int remote_port, int num_hbs); 60 void receivedHeartbeatAgain(InetAddress local, int local_port, InetAddress remote, int remote_port); 61 } 62 63 64 65 public Link(String local_addr, int local_port, String remote_addr, int remote_port) { 66 this.local_addr=local_addr; this.local_port=local_port; 67 this.remote_addr=remote_addr; this.remote_port=remote_port; 68 hb=new Heartbeat(timeout, hb_interval); 69 } 70 71 72 public Link(String local_addr, int local_port, String remote_addr, int remote_port, Receiver r) { 73 this(local_addr, local_port, remote_addr, remote_port); 74 setReceiver(r); 75 } 76 77 78 79 public Link(String local_addr, int local_port, String remote_addr, int remote_port, 80 long timeout, long hb_interval, Receiver r) { 81 this.local_addr=local_addr; this.local_port=local_port; 82 this.remote_addr=remote_addr; this.remote_port=remote_port; 83 this.timeout=timeout; this.hb_interval=hb_interval; 84 hb=new Heartbeat(timeout, hb_interval); 85 setReceiver(r); 86 } 87 88 89 public void setTrace(boolean t) {trace=t;} 90 public void setReceiver(Receiver r) {receiver=r;} 91 public boolean established() {return established;} 92 public InetAddress getLocalAddress() {return local;} 93 public InetAddress getRemoteAddress() {return remote;} 94 public int getLocalPort() {return local_port;} 95 public int getRemotePort() {return remote_port;} 96 97 98 99 100 101 public void start() throws Exception { 102 local=InetAddress.getByName(local_addr); 103 remote=InetAddress.getByName(remote_addr); 104 srv_sock=new ServerSocket (local_port, 1, local); 105 createOutgoingConnection(hb_interval); startReceiverThread(); hb.start(); } 109 110 111 112 public void stop() { 113 stopReceiverThread(); 114 hb.stop(); 115 try {srv_sock.close();} catch(Exception e) {} 116 established=false; 117 } 118 119 120 121 122 123 124 public boolean send(byte[] buf) { 125 if(buf == null || buf.length == 0) { 126 if(trace) System.err.println("Link.send(): buffer is null or does not contain any data !"); 127 return false; 128 } 129 if(!established) { if(trace) System.err.println("Link.send(): connection not established, discarding message"); 131 return false; 132 } 133 134 try { 135 outstream.writeInt(buf.length); outstream.write(buf); return true; 138 } 139 catch(Exception ex) { if(trace) System.err.println("Link.send1(): sending failed; retrying"); 141 return retry(buf); 142 } 143 } 144 145 146 boolean retry(byte[] buf) { 147 closeOutgoingConnection(); if(!createOutgoingConnection()) { closeOutgoingConnection(); return false; 151 } 152 else { 153 try { 154 outstream.writeInt(buf.length); 155 outstream.write(buf); 156 return true; 157 } 158 catch(Exception e) { 159 if(trace) System.out.println("Link.send2(): failed, closing connection"); 160 closeOutgoingConnection(); 161 return false; 162 } 163 } 164 } 165 166 167 168 169 173 public void run() { 174 int num_bytes; 175 byte[] buf; 176 InetAddress peer=null; 177 int peer_port=0; 178 179 while(!stop) { 180 try { 181 if(trace) System.out.println("-- WAITING for ACCEPT"); 182 incoming=srv_sock.accept(); 183 instream=new DataInputStream(incoming.getInputStream()); 184 peer=incoming.getInetAddress(); 185 peer_port=incoming.getPort(); 186 187 188 if(trace) System.out.println("-- ACCEPT: incoming is " + printSocket(incoming)); 189 190 191 192 if(remote.equals(incoming.getInetAddress())) { 193 if(trace) 194 System.out.println("Link.run(): accepted connection from " + peer + ':' + peer_port); 195 } 196 else { 197 if(trace) 198 System.err.println("Link.run(): rejected connection request from " + 199 peer + ':' + peer_port + ". Address not specified as peer in link !"); 200 closeIncomingConnection(); continue; 202 } 203 204 if(!established) { 206 createOutgoingConnection(); 207 } 208 209 while(!stop) { 210 try { 211 num_bytes=instream.readInt(); 212 if(num_bytes == HB_PACKET) { 213 hb.receivedHeartbeat(); 214 continue; 215 } 216 217 buf=new byte[num_bytes]; 218 instream.readFully(buf, 0, buf.length); 219 hb.receivedMessage(); if(receiver != null) 221 receiver.receive(buf); 222 } 223 catch(Exception ex) { closeIncomingConnection(); break; 226 } 227 } 228 } 229 catch(IOException io_ex) { 230 receiver_thread=null; 231 break; 232 } 233 catch(Exception e) { 234 } 235 } 236 } 237 238 239 240 public String toString() { 241 StringBuffer ret=new StringBuffer (); 242 ret.append("Link <" + local_addr + ':' + local_port + " --> " + 243 remote_addr + ':' + remote_port + '>'); 244 ret.append(established? " (established)" : " (not established)"); 245 return ret.toString(); 246 } 247 248 249 public boolean equals(Object other) { 250 Link o; 251 252 if(other == null) 253 return false; 254 if(!(other instanceof Link)) 255 return false; 256 o=(Link)other; 257 if(local_addr.equals(o.local_addr) && remote_addr.equals(o.remote_addr) && 258 local_port == o.local_port && remote_port == o.remote_port) 259 return true; 260 else 261 return false; 262 } 263 264 265 public int hashCode() { 266 return local_addr.hashCode() + remote_addr.hashCode() + local_port + remote_port; 267 } 268 269 270 void startReceiverThread() { 271 stopReceiverThread(); 272 receiver_thread=new Thread (this, "Link.ReceiverThreadThread"); 273 receiver_thread.setDaemon(true); 274 receiver_thread.start(); 275 } 276 277 void stopReceiverThread() { 278 if(receiver_thread != null && receiver_thread.isAlive()) { 279 stop=true; 280 closeIncomingConnection(); 281 try {receiver_thread.join(receiver_thread_join_timeout);} catch(Exception e) {} 282 stop=false; 283 } 284 receiver_thread=null; 285 } 286 287 288 289 290 293 boolean createOutgoingConnection() { 294 synchronized(outgoing_mutex) { if(established) { 296 return true; 297 } 298 try { 299 outgoing=new Socket (remote, remote_port, local, 0); outgoing.setSoLinger(true, 1); outstream=new DataOutputStream(outgoing.getOutputStream()); 303 if(receiver != null) receiver.linkUp(local, local_port, remote, remote_port); 304 established=true; 305 306 if(trace) System.out.println("-- CREATE: outgoing is " + printSocket(outgoing)); 307 308 return true; 309 } 310 catch(Exception e) { 311 established=false; 312 return false; 313 } 314 } 315 } 316 317 318 319 320 325 boolean createOutgoingConnection(long timeout) { 326 synchronized(outgoing_mutex) { if(established) { 328 return true; 329 } 330 try { 331 if(writer == null) writer=new TimedWriter(); 332 333 outgoing=writer.createSocket(local, remote, remote_port, timeout); 336 outgoing.setSoLinger(true, 1); outstream=new DataOutputStream(outgoing.getOutputStream()); 338 if(receiver != null) receiver.linkUp(local, local_port, remote, remote_port); 339 established=true; 340 if(trace) System.out.println("-- CREATE: outgoing is " + printSocket(outgoing)); 341 return true; 342 } 343 catch(Exception e) { 344 established=false; 345 return false; 346 } 347 } 348 } 349 350 351 352 353 354 void closeOutgoingConnection() { 355 synchronized(outgoing_mutex) { 356 if(!established) { 357 return; 358 } 359 if(outstream != null) { 360 361 if(trace) System.out.println("-- CLOSE: outgoing is " + printSocket(outgoing)); 362 363 try { 364 outstream.close(); } 366 catch(Exception e) {} 367 outstream=null; 368 } 369 if(outgoing != null) { 370 try { 371 outgoing.close(); 372 } 373 catch(Exception e) {} 374 outgoing=null; 375 } 376 established=false; 377 if(receiver != null) receiver.linkDown(local, local_port, remote, remote_port); 378 } 379 } 380 381 382 385 synchronized void closeIncomingConnection() { 386 if(instream != null) { 387 388 if(trace) System.out.println("-- CLOSE: incoming is " + printSocket(incoming)); 389 390 try {instream.close();} catch(Exception e) {} 391 instream=null; 392 } 393 if(incoming != null) { 394 try {incoming.close();} catch(Exception e) {} 395 incoming=null; 396 } 397 } 398 399 400 401 402 synchronized void closeConnections() { 403 404 closeOutgoingConnection(); 407 408 409 closeIncomingConnection(); 413 } 414 415 416 417 418 String printSocket(Socket s) { 419 if(s == null) return "<null>"; 420 StringBuffer ret=new StringBuffer (); 421 ret.append(s.getLocalAddress().getHostName()); 422 ret.append(':'); 423 ret.append(s.getLocalPort()); 424 ret.append(" --> "); 425 ret.append(s.getInetAddress().getHostName()); 426 ret.append(':'); 427 ret.append(s.getPort()); 428 return ret.toString(); 429 } 430 431 432 433 434 435 436 437 438 439 440 447 class Heartbeat implements Runnable { 448 Thread thread=null; 449 long timeout=10000; long hb_interval=3000; boolean stop_hb=false; 452 long last_hb=System.currentTimeMillis(); 453 boolean missed_hb=false; 454 final TimedWriter writer=new TimedWriter(); 455 456 457 458 public Heartbeat(long timeout, long hb_interval) { 459 this.timeout=timeout; 460 this.hb_interval=hb_interval; 461 } 462 463 464 public synchronized void start() { 465 stop(); 466 stop_hb=false; 467 missed_hb=false; 468 last_hb=System.currentTimeMillis(); 469 thread=new Thread (this, "HeartbeatThread"); 470 thread.setDaemon(true); 471 thread.start(); 472 } 473 474 475 public synchronized void interrupt() { 476 thread.interrupt(); 477 } 478 479 480 public synchronized void stop() { 481 if(thread != null && thread.isAlive()) { 482 stop_hb=true; 483 missed_hb=false; 484 thread.interrupt(); 485 try {thread.join(timeout+1000);} catch(Exception e) {} 486 thread=null; 487 } 488 } 489 490 491 492 496 public void receivedMessage() { 497 last_hb=System.currentTimeMillis(); 498 if(missed_hb) { 499 if(receiver != null) receiver.receivedHeartbeatAgain(local, local_port, remote, remote_port); 500 missed_hb=false; 501 } 502 } 503 504 505 506 public void receivedHeartbeat() { 507 last_hb=System.currentTimeMillis(); 508 if(missed_hb) { 509 if(receiver != null) receiver.receivedHeartbeatAgain(local, local_port, remote, remote_port); 510 missed_hb=false; 511 } 512 } 513 514 515 519 public void run() { 520 long diff=0, curr_time=0, num_missed_hbs=0; 521 522 if(trace) System.out.println("heartbeat to " + remote + ':' + remote_port + " started"); 523 while(!stop_hb) { 524 525 if(established) { 527 if(outstream != null) { 529 try { 530 writer.write(outstream, HB_PACKET, 1500); 531 Thread.sleep(hb_interval); 532 } 533 catch(Exception io_ex) { closeOutgoingConnection(); continue; 536 } 537 } 538 else { 539 established=false; 540 continue; 541 } 542 543 curr_time=System.currentTimeMillis(); 545 diff=curr_time - last_hb; 546 547 if(curr_time - last_hb > hb_interval) { 548 num_missed_hbs=(curr_time - last_hb) / hb_interval; 549 if(receiver != null) 550 receiver.missedHeartbeat(local, local_port, remote, remote_port, (int)num_missed_hbs); 551 missed_hb=true; 552 } 553 554 if(diff >= timeout) { 555 if(trace) System.out.println("###### Link.Heartbeat.run(): no heartbeat receveived for " + 556 diff + " msecs. Closing connections. #####"); 557 closeConnections(); } 559 } 560 else { synchronized(outgoing_mutex) { if(established) { 563 continue; 564 } 565 try { 566 outgoing=writer.createSocket(local, remote, remote_port, hb_interval); 567 outstream=new DataOutputStream(outgoing.getOutputStream()); 568 if(receiver != null) receiver.linkUp(local, local_port, remote, remote_port); 569 established=true; 570 if(trace) System.out.println("-- CREATE (CE): " + printSocket(outgoing)); 571 continue; 572 } 573 catch(InterruptedException interrupted_ex) { 574 continue; 575 } 576 catch(Exception ex) { Util.sleep(hb_interval); } 579 } 580 } 581 } 582 if(trace) System.out.println("heartbeat to " + remote + ':' + remote_port + " stopped"); 583 thread=null; 584 } 585 } 586 587 588 589 590 591 592 593 594 private static class MyReceiver implements Link.Receiver { 595 596 public void receive(byte[] msg) { 597 System.out.println("<-- " + new String (msg)); 598 } 599 600 public void linkDown(InetAddress l, int lp, InetAddress r, int rp) { 601 System.out.println("** linkDown(): " + r + ':' + rp); 602 } 603 604 public void linkUp(InetAddress l, int lp, InetAddress r, int rp) { 605 System.out.println("** linkUp(): " + r + ':' + rp); 606 } 607 608 public void missedHeartbeat(InetAddress l, int lp, InetAddress r, int rp, int num) { 609 System.out.println("** missedHeartbeat(): " + r + ':' + rp); 610 } 611 612 public void receivedHeartbeatAgain(InetAddress l, int lp, InetAddress r, int rp) { 613 System.out.println("** receivedHeartbeatAgain(): " + r + ':' + rp); 614 } 615 } 616 617 618 619 public static void main(String [] args) { 620 String local, remote; 621 int local_port, remote_port; 622 623 624 if(args.length != 4) { 625 System.err.println("\nLink <local host> <local port> <remote host> <remote port>\n"); 626 return; 627 } 628 local=args[0]; 629 remote=args[2]; 630 local_port=Integer.parseInt(args[1]); 631 remote_port=Integer.parseInt(args[3]); 632 633 Link l=new Link(local, local_port, remote, remote_port, new MyReceiver()); 634 635 try { 636 l.start(); 637 System.out.println(l); 638 639 BufferedReader in= new BufferedReader(new InputStreamReader(System.in)); 640 while(true) { 641 System.out.print("> "); System.out.flush(); 642 String line=in.readLine(); 643 l.send(line.getBytes()); 644 } 645 } 646 catch(Exception e) { 647 System.err.println(e); 648 } 649 } 650 } 651 652 653 | Popular Tags |