KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > blocks > Link


1 // $Id: Link.java,v 1.5 2004/09/23 16:29:11 belaban Exp $
2

3 package org.jgroups.blocks;
4
5 import org.jgroups.util.TimedWriter;
6 import org.jgroups.util.Util;
7
8 import java.io.*;
9 import java.net.InetAddress JavaDoc;
10 import java.net.ServerSocket JavaDoc;
11 import java.net.Socket JavaDoc;
12
13
14
15
16
17 /**
18  * Implements a physical link between 2 parties (point-to-point connection). For incoming traffic,
19  * a server socket is created (bound to a given local address and port). The receiver thread does the
20  * following: it accepts a new connection from the server socket and (on the same thread) reads messages
21  * until the connection breaks. Then it goes back to accept(). This is done in 2 nested while-loops.
22  * The outgoing connection is established when started. If this fails, the link is marked as not established.
23  * This means that there is not outgoing socket.<br>
24  * A heartbeat will be exchanged between the 2 peers periodically as long as the connection is established
25  * (outgoing socket is okay). When the connection breaks, the heartbeat will stop and a connection establisher
26  * thread will be started. It periodically tries to re-establish connection to the peer. When this happens
27  * it will stop and the heartbeat thread will resume.<br>
28  * For details see Link.txt
29  * @author Bela Ban, June 2000
30  */

31 public class Link implements Runnable JavaDoc {
32     String JavaDoc local_addr=null, remote_addr=null;
33     InetAddress JavaDoc local=null, remote=null;
34     int local_port=0, remote_port=0;
35     ServerSocket JavaDoc srv_sock=null;
36     Socket JavaDoc outgoing=null; // traffic to peer
37
Socket JavaDoc incoming=null; // traffic from peer
38
DataOutputStream outstream=null;
39     DataInputStream instream=null;
40     boolean established=false; // (incoming and outgoing) connections to peer are up and running
41
boolean stop=false;
42     boolean trace=false;
43     Thread JavaDoc receiver_thread=null;
44     final long receiver_thread_join_timeout=2000;
45     Receiver receiver=null;
46     static final int HB_PACKET=-99;
47     Heartbeat hb=null;
48     long timeout=10000; // if no heartbeat was received for timeout msecs, assume peer is dead
49
long hb_interval=3000; // send a heartbeat every n msecs
50
final Object JavaDoc outgoing_mutex=new Object JavaDoc(); // sync on creation and closing of outgoing socket
51
TimedWriter writer=null;
52
53
54
55     public interface Receiver {
56     void receive(byte[] msg);
57     void linkDown(InetAddress JavaDoc local, int local_port, InetAddress JavaDoc remote, int remote_port);
58     void linkUp(InetAddress JavaDoc local, int local_port, InetAddress JavaDoc remote, int remote_port);
59     void missedHeartbeat(InetAddress JavaDoc local, int local_port, InetAddress JavaDoc remote, int remote_port, int num_hbs);
60     void receivedHeartbeatAgain(InetAddress JavaDoc local, int local_port, InetAddress JavaDoc remote, int remote_port);
61     }
62
63
64     
65     public Link(String JavaDoc local_addr, int local_port, String JavaDoc remote_addr, int remote_port) {
66     this.local_addr=local_addr; this.local_port=local_port;
67     this.remote_addr=remote_addr; this.remote_port=remote_port;
68     hb=new Heartbeat(timeout, hb_interval);
69     }
70
71
72     public Link(String JavaDoc local_addr, int local_port, String JavaDoc remote_addr, int remote_port, Receiver r) {
73     this(local_addr, local_port, remote_addr, remote_port);
74     setReceiver(r);
75     }
76
77
78
79     public Link(String JavaDoc local_addr, int local_port, String JavaDoc remote_addr, int remote_port,
80         long timeout, long hb_interval, Receiver r) {
81     this.local_addr=local_addr; this.local_port=local_port;
82     this.remote_addr=remote_addr; this.remote_port=remote_port;
83     this.timeout=timeout; this.hb_interval=hb_interval;
84     hb=new Heartbeat(timeout, hb_interval);
85     setReceiver(r);
86     }
87
88
89     public void setTrace(boolean t) {trace=t;}
90     public void setReceiver(Receiver r) {receiver=r;}
91     public boolean established() {return established;}
92     public InetAddress JavaDoc getLocalAddress() {return local;}
93     public InetAddress JavaDoc getRemoteAddress() {return remote;}
94     public int getLocalPort() {return local_port;}
95     public int getRemotePort() {return remote_port;}
96
97
98
99
100
101     public void start() throws Exception JavaDoc {
102     local=InetAddress.getByName(local_addr);
103     remote=InetAddress.getByName(remote_addr);
104     srv_sock=new ServerSocket JavaDoc(local_port, 1, local);
105     createOutgoingConnection(hb_interval); // connection to peer established, sets established=true
106
startReceiverThread(); // start reading from incoming socket
107
hb.start(); // starts heartbeat (conn establisher is not yet started)
108
}
109
110
111
112     public void stop() {
113     stopReceiverThread();
114     hb.stop();
115     try {srv_sock.close();} catch(Exception JavaDoc e) {}
116     established=false;
117     }
118
119
120
121
122
123     /** Tries to send buffer across out socket. Tries to establish connection if not yet connected. */
124     public boolean send(byte[] buf) {
125     if(buf == null || buf.length == 0) {
126         if(trace) System.err.println("Link.send(): buffer is null or does not contain any data !");
127         return false;
128     }
129     if(!established) { // will be set by ConnectionEstablisher when connection has been set up
130
if(trace) System.err.println("Link.send(): connection not established, discarding message");
131         return false;
132     }
133
134     try {
135         outstream.writeInt(buf.length); // synchronized anyway
136
outstream.write(buf); // synchronized anyway, we don't need to sync on outstream
137
return true;
138     }
139     catch(Exception JavaDoc ex) { // either IOException or EOFException (subclass if IOException)
140
if(trace) System.err.println("Link.send1(): sending failed; retrying");
141         return retry(buf);
142     }
143     }
144
145
146     boolean retry(byte[] buf) {
147     closeOutgoingConnection(); // there something wrong, close connection
148
if(!createOutgoingConnection()) { // ... and re-open. if this fails,
149
closeOutgoingConnection(); // just abort and return failure to caller
150
return false;
151     }
152     else {
153         try {
154         outstream.writeInt(buf.length);
155         outstream.write(buf);
156         return true;
157         }
158         catch(Exception JavaDoc e) {
159         if(trace) System.out.println("Link.send2(): failed, closing connection");
160         closeOutgoingConnection();
161         return false;
162         }
163     }
164     }
165
166
167
168
169     /** Receiver thread main loop. Accept a connection and then read on it until the connection
170     breaks. Only then is the next connection handled. The reason is that there is only supposed
171     to be 1 connection to this server socket at the same time.
172     */

173     public void run() {
174     int num_bytes;
175     byte[] buf;
176     InetAddress JavaDoc peer=null;
177     int peer_port=0;
178
179     while(!stop) {
180         try {
181         if(trace) System.out.println("-- WAITING for ACCEPT");
182         incoming=srv_sock.accept();
183         instream=new DataInputStream(incoming.getInputStream());
184         peer=incoming.getInetAddress();
185         peer_port=incoming.getPort();
186
187
188         if(trace) System.out.println("-- ACCEPT: incoming is " + printSocket(incoming));
189
190         
191         /** This piece of code would only accept connections from the peer address defined above. */
192         if(remote.equals(incoming.getInetAddress())) {
193             if(trace)
194             System.out.println("Link.run(): accepted connection from " + peer + ':' + peer_port);
195         }
196         else {
197             if(trace)
198             System.err.println("Link.run(): rejected connection request from " +
199                        peer + ':' + peer_port + ". Address not specified as peer in link !");
200             closeIncomingConnection(); // only close incoming connection
201
continue;
202         }
203         
204         // now try to create outgoing connection
205
if(!established) {
206             createOutgoingConnection();
207         }
208
209         while(!stop) {
210             try {
211             num_bytes=instream.readInt();
212             if(num_bytes == HB_PACKET) {
213                 hb.receivedHeartbeat();
214                 continue;
215             }
216
217             buf=new byte[num_bytes];
218             instream.readFully(buf, 0, buf.length);
219             hb.receivedMessage(); // equivalent to heartbeat response (HB_PACKET)
220
if(receiver != null)
221                 receiver.receive(buf);
222             }
223             catch(Exception JavaDoc ex) { // IOException, EOFException, SocketException
224
closeIncomingConnection(); // close incoming when read() fails
225
break;
226             }
227         }
228         }
229         catch(IOException io_ex) {
230         receiver_thread=null;
231         break;
232         }
233         catch(Exception JavaDoc e) {
234         }
235     }
236     }
237
238
239
240     public String JavaDoc toString() {
241     StringBuffer JavaDoc ret=new StringBuffer JavaDoc();
242     ret.append("Link <" + local_addr + ':' + local_port + " --> " +
243            remote_addr + ':' + remote_port + '>');
244     ret.append(established? " (established)" : " (not established)");
245     return ret.toString();
246     }
247
248
249     public boolean equals(Object JavaDoc other) {
250     Link o;
251
252     if(other == null)
253         return false;
254     if(!(other instanceof Link))
255         return false;
256     o=(Link)other;
257     if(local_addr.equals(o.local_addr) && remote_addr.equals(o.remote_addr) &&
258        local_port == o.local_port && remote_port == o.remote_port)
259         return true;
260     else
261         return false;
262     }
263
264
265     public int hashCode() {
266     return local_addr.hashCode() + remote_addr.hashCode() + local_port + remote_port;
267     }
268
269
270     void startReceiverThread() {
271     stopReceiverThread();
272     receiver_thread=new Thread JavaDoc(this, "Link.ReceiverThreadThread");
273         receiver_thread.setDaemon(true);
274     receiver_thread.start();
275     }
276
277     void stopReceiverThread() {
278     if(receiver_thread != null && receiver_thread.isAlive()) {
279         stop=true;
280         closeIncomingConnection();
281         try {receiver_thread.join(receiver_thread_join_timeout);} catch(Exception JavaDoc e) {}
282         stop=false;
283     }
284     receiver_thread=null;
285     }
286
287
288
289
290     /** Tries to create an outgoing connection. If successful, the heartbeat is started. Does <em>not</em>
291     stop the connection establisher ! The reason is that this method is going to be called by the
292     connection establisher as well, therefore it would kill itself ! */

293     boolean createOutgoingConnection() {
294     synchronized(outgoing_mutex) { // serialize access with ConnectionEstablisher
295
if(established) {
296         return true;
297         }
298         try {
299         // create a socket to remote:remote_port, bind to local address (choose any local port);
300
outgoing=new Socket JavaDoc(remote, remote_port, local, 0); // 0 means choose any local port
301
outgoing.setSoLinger(true, 1); // 1 second // +++ ? needed ? it is off by default !
302
outstream=new DataOutputStream(outgoing.getOutputStream());
303         if(receiver != null) receiver.linkUp(local, local_port, remote, remote_port);
304         established=true;
305
306         if(trace) System.out.println("-- CREATE: outgoing is " + printSocket(outgoing));
307
308         return true;
309         }
310         catch(Exception JavaDoc e) {
311         established=false;
312         return false;
313         }
314     }
315     }
316
317
318
319     
320     /**
321     Tries to create an outgoing connection. If successful, the heartbeat is started. Does <em>not</em>
322     stop the connection establisher ! The reason is that this method is going to be called by the
323     connection establisher as well, therefore it would kill itself !
324     */

325     boolean createOutgoingConnection(long timeout) {
326     synchronized(outgoing_mutex) { // serialize access with ConnectionEstablisher
327
if(established) {
328         return true;
329         }
330         try {
331         if(writer == null) writer=new TimedWriter();
332
333         // create a socket to remote:remote_port, bind to local address (choose any local port);
334
// outgoing=new Socket(remote, remote_port, local, 0); // 0 means choose any local port
335
outgoing=writer.createSocket(local, remote, remote_port, timeout);
336         outgoing.setSoLinger(true, 1); // 1 second // +++ ? needed ? it is off by default !
337
outstream=new DataOutputStream(outgoing.getOutputStream());
338         if(receiver != null) receiver.linkUp(local, local_port, remote, remote_port);
339         established=true;
340         if(trace) System.out.println("-- CREATE: outgoing is " + printSocket(outgoing));
341         return true;
342         }
343         catch(Exception JavaDoc e) {
344         established=false;
345         return false;
346         }
347     }
348     }
349
350
351
352
353     /** Closes the outgoing connection */
354     void closeOutgoingConnection() {
355     synchronized(outgoing_mutex) {
356         if(!established) {
357         return;
358         }
359         if(outstream != null) {
360         
361         if(trace) System.out.println("-- CLOSE: outgoing is " + printSocket(outgoing));
362         
363         try {
364             outstream.close(); // flush data before socket is closed
365
}
366         catch(Exception JavaDoc e) {}
367         outstream=null;
368         }
369         if(outgoing != null) {
370         try {
371             outgoing.close();
372         }
373         catch(Exception JavaDoc e) {}
374         outgoing=null;
375         }
376         established=false;
377         if(receiver != null) receiver.linkDown(local, local_port, remote, remote_port);
378     }
379     }
380
381     
382     /** When the heartbeat thread detects that the peer 'hangs' (not detected by incoming.read()),
383         then it closes the outgoing *and* incoming socket. The latter needs to be done,
384         so that we can return to accept() and await a new client connection request. */

385     synchronized void closeIncomingConnection() {
386     if(instream != null) {
387         
388         if(trace) System.out.println("-- CLOSE: incoming is " + printSocket(incoming));
389         
390         try {instream.close();} catch(Exception JavaDoc e) {}
391         instream=null;
392     }
393     if(incoming != null) {
394         try {incoming.close();} catch(Exception JavaDoc e) {}
395         incoming=null;
396     }
397     }
398
399
400
401     /** Close outgoing and incoming sockets. */
402     synchronized void closeConnections() {
403
404     // 1. Closes the outgoing connection. Then the connection establisher is started. The heartbeat
405
// thread cannot be stopped in here, because this method is called by it !
406
closeOutgoingConnection();
407
408
409     // 2. When the heartbeat thread detects that the peer 'hangs' (not detected by incoming.read()),
410
// then it closes the outgoing *and* incoming socket. The latter needs to be done,
411
// so that we can return to accept() and await a new client connection request.
412
closeIncomingConnection();
413     }
414
415
416
417
418     String JavaDoc printSocket(Socket JavaDoc s) {
419     if(s == null) return "<null>";
420     StringBuffer JavaDoc ret=new StringBuffer JavaDoc();
421     ret.append(s.getLocalAddress().getHostName());
422     ret.append(':');
423     ret.append(s.getLocalPort());
424     ret.append(" --> ");
425     ret.append(s.getInetAddress().getHostName());
426     ret.append(':');
427     ret.append(s.getPort());
428     return ret.toString();
429     }
430     
431
432
433
434
435
436
437
438
439
440     /**
441        Sends heartbeats across the link as long as we are connected (established=true). Uses a TimedWriter
442        for both sending and responding to heartbeats. The reason is that a write() might hang if the
443        peer has not closed its end, but the connection hangs (e.g. network partition, peer was stop-a'ed,
444        ctrl-z of peer or peer's NIC was unplumbed) and the writer buffer is filled to capacity. This way,
445        we don't hang sending timeouts.
446      */

447     class Heartbeat implements Runnable JavaDoc {
448     Thread JavaDoc thread=null;
449     long timeout=10000; // time to wait for heartbeats from peer, if not received -> boom !
450
long hb_interval=3000; // {send a heartbeat | try to create connection} every 3 secs
451
boolean stop_hb=false;
452     long last_hb=System.currentTimeMillis();
453     boolean missed_hb=false;
454     final TimedWriter writer=new TimedWriter();
455
456
457
458     public Heartbeat(long timeout, long hb_interval) {
459         this.timeout=timeout;
460         this.hb_interval=hb_interval;
461     }
462
463
464     public synchronized void start() {
465         stop();
466         stop_hb=false;
467         missed_hb=false;
468         last_hb=System.currentTimeMillis();
469         thread=new Thread JavaDoc(this, "HeartbeatThread");
470             thread.setDaemon(true);
471         thread.start();
472     }
473
474
475     public synchronized void interrupt() {
476         thread.interrupt();
477     }
478
479
480     public synchronized void stop() {
481         if(thread != null && thread.isAlive()) {
482         stop_hb=true;
483         missed_hb=false;
484         thread.interrupt();
485         try {thread.join(timeout+1000);} catch(Exception JavaDoc e) {}
486         thread=null;
487         }
488     }
489
490     
491     
492     /**
493        When we receive a message from the peer, this means the peer is alive. Therefore we
494        update the time of the last heartbeat.
495      */

496     public void receivedMessage() {
497         last_hb=System.currentTimeMillis();
498         if(missed_hb) {
499         if(receiver != null) receiver.receivedHeartbeatAgain(local, local_port, remote, remote_port);
500         missed_hb=false;
501         }
502     }
503
504
505     /** Callback, called by the Link whenever it encounters a heartbeat (HB_PACKET) */
506     public void receivedHeartbeat() {
507         last_hb=System.currentTimeMillis();
508         if(missed_hb) {
509         if(receiver != null) receiver.receivedHeartbeatAgain(local, local_port, remote, remote_port);
510         missed_hb=false;
511         }
512     }
513     
514
515     /**
516        Sends heartbeats when connection is established. Tries to establish connection when not established.
517        Switches between 'established' and 'not established' roles.
518      */

519     public void run() {
520         long diff=0, curr_time=0, num_missed_hbs=0;
521         
522         if(trace) System.out.println("heartbeat to " + remote + ':' + remote_port + " started");
523         while(!stop_hb) {
524
525         if(established) { // send heartbeats
526

527             // 1. Send heartbeat (use timed write)
528
if(outstream != null) {
529             try {
530                 writer.write(outstream, HB_PACKET, 1500);
531                 Thread.sleep(hb_interval);
532             }
533             catch(Exception JavaDoc io_ex) { // IOException and TimedWriter.Timeout
534
closeOutgoingConnection(); // sets established to false
535
continue;
536             }
537             }
538             else {
539             established=false;
540             continue;
541             }
542          
543             // 2. If time of last HB received > timeout --> close connection
544
curr_time=System.currentTimeMillis();
545             diff=curr_time - last_hb;
546
547             if(curr_time - last_hb > hb_interval) {
548             num_missed_hbs=(curr_time - last_hb) / hb_interval;
549             if(receiver != null)
550                 receiver.missedHeartbeat(local, local_port, remote, remote_port, (int)num_missed_hbs);
551             missed_hb=true;
552             }
553
554             if(diff >= timeout) {
555             if(trace) System.out.println("###### Link.Heartbeat.run(): no heartbeat receveived for " +
556                              diff + " msecs. Closing connections. #####");
557             closeConnections(); // close both incoming *and* outgoing connections
558
}
559         }
560         else { // try to establish connection
561
synchronized(outgoing_mutex) { // serialize access with createOutgoingConnection()
562
if(established) {
563                 continue;
564             }
565             try {
566                 outgoing=writer.createSocket(local, remote, remote_port, hb_interval);
567                 outstream=new DataOutputStream(outgoing.getOutputStream());
568                 if(receiver != null) receiver.linkUp(local, local_port, remote, remote_port);
569                 established=true;
570                 if(trace) System.out.println("-- CREATE (CE): " + printSocket(outgoing));
571                 continue;
572             }
573             catch(InterruptedException JavaDoc interrupted_ex) {
574                 continue;
575             }
576             catch(Exception JavaDoc ex) { // IOException, TimedWriter.Timeout
577
Util.sleep(hb_interval); // returns when done or interrupted
578
}
579             }
580         }
581         }
582         if(trace) System.out.println("heartbeat to " + remote + ':' + remote_port + " stopped");
583         thread=null;
584     }
585     }
586     
587
588
589
590
591
592
593
594     private static class MyReceiver implements Link.Receiver {
595     
596     public void receive(byte[] msg) {
597         System.out.println("<-- " + new String JavaDoc(msg));
598     }
599     
600     public void linkDown(InetAddress JavaDoc l, int lp, InetAddress JavaDoc r, int rp) {
601         System.out.println("** linkDown(): " + r + ':' + rp);
602     }
603     
604     public void linkUp(InetAddress JavaDoc l, int lp, InetAddress JavaDoc r, int rp) {
605         System.out.println("** linkUp(): " + r + ':' + rp);
606     }
607     
608     public void missedHeartbeat(InetAddress JavaDoc l, int lp, InetAddress JavaDoc r, int rp, int num) {
609         System.out.println("** missedHeartbeat(): " + r + ':' + rp);
610     }
611     
612     public void receivedHeartbeatAgain(InetAddress JavaDoc l, int lp, InetAddress JavaDoc r, int rp) {
613         System.out.println("** receivedHeartbeatAgain(): " + r + ':' + rp);
614     }
615     }
616
617
618
619     public static void main(String JavaDoc[] args) {
620     String JavaDoc local, remote;
621     int local_port, remote_port;
622
623
624     if(args.length != 4) {
625         System.err.println("\nLink <local host> <local port> <remote host> <remote port>\n");
626         return;
627     }
628     local=args[0];
629     remote=args[2];
630     local_port=Integer.parseInt(args[1]);
631     remote_port=Integer.parseInt(args[3]);
632
633     Link l=new Link(local, local_port, remote, remote_port, new MyReceiver());
634
635     try {
636         l.start();
637         System.out.println(l);
638         
639         BufferedReader in= new BufferedReader(new InputStreamReader(System.in));
640         while(true) {
641         System.out.print("> "); System.out.flush();
642         String JavaDoc line=in.readLine();
643         l.send(line.getBytes());
644         }
645     }
646     catch(Exception JavaDoc e) {
647         System.err.println(e);
648     }
649     }
650 }
651
652
653
Popular Tags