KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > protocols > FD_SOCK


1 // $Id: FD_SOCK.java,v 1.22 2005/04/18 13:55:15 belaban Exp $
2

3 package org.jgroups.protocols;
4
5 import org.jgroups.*;
6 import org.jgroups.stack.IpAddress;
7 import org.jgroups.stack.Protocol;
8 import org.jgroups.util.Promise;
9 import org.jgroups.util.TimeScheduler;
10 import org.jgroups.util.Util;
11 import org.jgroups.util.Streamable;
12
13 import java.io.*;
14 import java.net.ServerSocket JavaDoc;
15 import java.net.Socket JavaDoc;
16 import java.net.InetAddress JavaDoc;
17 import java.net.UnknownHostException JavaDoc;
18 import java.util.*;
19
20
/**
 * Failure detection protocol based on sockets. Failure detection is ring-based. Each member creates a
 * server socket and announces its address together with the server socket's address in a multicast. A
 * pinger thread will be started when the membership goes above 1 and will be stopped when it drops below
 * 2. The pinger thread connects to its neighbor on the right and waits until the socket is closed. When
 * the socket is closed by the monitored peer in an abnormal fashion (IOException or end-of-stream), the
 * neighbor will be suspected.<p> The main feature of this protocol is that no ping messages need to be
 * exchanged between any 2 peers, and failure detection relies entirely on TCP sockets. The advantage is
 * that no activity will take place between 2 peers as long as they are alive (i.e. have their server
 * sockets open). The disadvantage is that hung servers or crashed routers will not cause sockets to be
 * closed; therefore they won't be detected.
 * The FD_SOCK protocol will work for groups where members are on different hosts, but its main usage is when
 * all group members are on the same host.<p> The costs involved are 2 additional threads: one that
 * monitors the client side of the socket connection (to monitor a peer) and another one that manages the
 * server socket. However, those threads will be idle as long as both peers are running.
 * @author Bela Ban May 29 2001
 */

38 public class FD_SOCK extends Protocol implements Runnable JavaDoc {
39     long get_cache_timeout=3000; // msecs to wait for the socket cache from the coordinator
40
final long get_cache_retry_timeout=500; // msecs to wait until we retry getting the cache from coord
41
long suspect_msg_interval=5000; // (BroadcastTask): mcast SUSPECT every 5000 msecs
42
int num_tries=3; // attempts coord is solicited for socket cache until we give up
43
final Vector members=new Vector(11); // list of group members (updated on VIEW_CHANGE)
44
boolean srv_sock_sent=false; // has own socket been broadcast yet ?
45
final Vector pingable_mbrs=new Vector(11); // mbrs from which we select ping_dest. may be subset of 'members'
46
final Promise get_cache_promise=new Promise(); // used for rendezvous on GET_CACHE and GET_CACHE_RSP
47
boolean got_cache_from_coord=false; // was cache already fetched ?
48
Address local_addr=null; // our own address
49
ServerSocket JavaDoc srv_sock=null; // server socket to which another member connects to monitor me
50
InetAddress JavaDoc srv_sock_bind_addr=null; // the NIC on which the ServerSocket should listen
51
ServerSocketHandler srv_sock_handler=null; // accepts new connections on srv_sock
52
IpAddress srv_sock_addr=null; // pair of server_socket:port
53
Address ping_dest=null; // address of the member we monitor
54
Socket JavaDoc ping_sock=null; // socket to the member we monitor
55
InputStream ping_input=null; // input stream of the socket to the member we monitor
56
Thread JavaDoc pinger_thread=null; // listens on ping_sock, suspects member if socket is closed
57
final Hashtable cache=new Hashtable(11); // keys=Addresses, vals=IpAddresses (socket:port)
58

59     /** Start port for server socket (uses first available port starting at start_port). A value of 0 (default)
60      * picks a random port */

61     int start_port=0;
62     final Promise ping_addr_promise=new Promise(); // to fetch the ping_addr for ping_dest
63
final Object JavaDoc sock_mutex=new Object JavaDoc(); // for access to ping_sock, ping_input
64
TimeScheduler timer=null;
65     final BroadcastTask bcast_task=new BroadcastTask(); // to transmit SUSPECT message (until view change)
66
boolean regular_sock_close=false; // used by interruptPingerThread() when new ping_dest is computed
67
private static final int NORMAL_TEMINATION=9;
68     private static final int ABNORMAL_TEMINATION=-1;
69     private static final String JavaDoc name="FD_SOCK";
70
71
72     public String JavaDoc getName() {
73         return name;
74     }
75
76
77     public boolean setProperties(Properties props) {
78         String JavaDoc str, tmp=null;
79
80         super.setProperties(props);
81         str=props.getProperty("get_cache_timeout");
82         if(str != null) {
83             get_cache_timeout=Long.parseLong(str);
84             props.remove("get_cache_timeout");
85         }
86
87         str=props.getProperty("suspect_msg_interval");
88         if(str != null) {
89             suspect_msg_interval=Long.parseLong(str);
90             props.remove("suspect_msg_interval");
91         }
92
93         str=props.getProperty("num_tries");
94         if(str != null) {
95             num_tries=Integer.parseInt(str);
96             props.remove("num_tries");
97         }
98
99         str=props.getProperty("start_port");
100         if(str != null) {
101             start_port=Integer.parseInt(str);
102             props.remove("start_port");
103         }
104
105
106         // PropertyPermission not granted if running in an untrusted environment with JNLP.
107
try {tmp=System.getProperty("bind.address");} catch (SecurityException JavaDoc ex){}
108         if(tmp != null)
109             str=tmp;
110         else
111             str=props.getProperty("srv_sock_bind_addr");
112         if(str != null) {
113             try {
114                 srv_sock_bind_addr=InetAddress.getByName(str);
115             }
116             catch(UnknownHostException JavaDoc e) {
117                 log.error("srv_sock_bind_addr " + str + " is invalid", e);
118                 return false;
119             }
120             props.remove("srv_sock_bind_addr");
121         }
122
123         if(props.size() > 0) {
124             System.err.println("FD_SOCK.setProperties(): the following properties are not recognized:");
125             props.list(System.out);
126             return false;
127         }
128         return true;
129     }
130
131
132     public void init() throws Exception JavaDoc {
133         srv_sock_handler=new ServerSocketHandler();
134         timer=stack != null ? stack.timer : null;
135         if(timer == null)
136             throw new Exception JavaDoc("FD_SOCK.init(): timer == null");
137     }
138
139
140     public void stop() {
141         bcast_task.removeAll();
142         stopPingerThread();
143         stopServerSocket();
144     }
145
146
147     public void up(Event evt) {
148         Message msg;
149         FdHeader hdr;
150
151         switch(evt.getType()) {
152
153         case Event.SET_LOCAL_ADDRESS:
154             local_addr=(Address) evt.getArg();
155             break;
156
157         case Event.MSG:
158             msg=(Message) evt.getArg();
159             hdr=(FdHeader) msg.removeHeader(name);
160             if(hdr == null || !(hdr instanceof FdHeader))
161                 break; // message did not originate from FD_SOCK layer, just pass up
162

163             switch(hdr.type) {
164
165             case FdHeader.SUSPECT:
166                 if(hdr.mbrs != null) {
167                     if(log.isDebugEnabled()) log.debug("[SUSPECT] hdr=" + hdr);
168                     for(int i=0; i < hdr.mbrs.size(); i++) {
169                         passUp(new Event(Event.SUSPECT, hdr.mbrs.elementAt(i)));
170                         passDown(new Event(Event.SUSPECT, hdr.mbrs.elementAt(i)));
171                     }
172                 }
173                 else
174                     if(log.isWarnEnabled()) log.warn("[SUSPECT]: hdr.mbrs == null");
175                 break;
176
177                 // If I have the sock for 'hdr.mbr', return it. Otherwise look it up in my cache and return it
178
case FdHeader.WHO_HAS_SOCK:
179                 if(local_addr != null && local_addr.equals(msg.getSrc()))
180                     return; // don't reply to WHO_HAS bcasts sent by me !
181

182                 if(hdr.mbr == null) {
183                     if(log.isErrorEnabled()) log.error("hdr.mbr is null");
184                     return;
185                 }
186
187                 if(log.isTraceEnabled()) log.trace("who-has-sock " + hdr.mbr);
188
189                 // 1. Try my own address, maybe it's me whose socket is wanted
190
if(local_addr != null && local_addr.equals(hdr.mbr) && srv_sock_addr != null) {
191                     sendIHaveSockMessage(msg.getSrc(), local_addr, srv_sock_addr); // unicast message to msg.getSrc()
192
return;
193                 }
194
195                 // 2. If I don't have it, maybe it is in the cache
196
if(cache.containsKey(hdr.mbr))
197                     sendIHaveSockMessage(msg.getSrc(), hdr.mbr, (IpAddress) cache.get(hdr.mbr)); // ucast msg
198
break;
199
200
201                 // Update the cache with the addr:sock_addr entry (if on the same host)
202
case FdHeader.I_HAVE_SOCK:
203                 if(hdr.mbr == null || hdr.sock_addr == null) {
204                     if(log.isErrorEnabled()) log.error("[I_HAVE_SOCK]: hdr.mbr is null or hdr.sock_addr == null");
205                     return;
206                 }
207
208                 // if(!cache.containsKey(hdr.mbr))
209
cache.put(hdr.mbr, hdr.sock_addr); // update the cache
210
if(log.isTraceEnabled()) log.trace("i-have-sock: " + hdr.mbr + " --> " +
211                                                    hdr.sock_addr + " (cache is " + cache + ')');
212
213                 if(ping_dest != null && hdr.mbr.equals(ping_dest))
214                     ping_addr_promise.setResult(hdr.sock_addr);
215                 break;
216
217                 // Return the cache to the sender of this message
218
case FdHeader.GET_CACHE:
219                 if(hdr.mbr == null) {
220                     if(log.isErrorEnabled()) log.error("(GET_CACHE): hdr.mbr == null");
221                     return;
222                 }
223                 hdr=new FdHeader(FdHeader.GET_CACHE_RSP);
224                 hdr.cachedAddrs=(Hashtable) cache.clone();
225                 msg=new Message(hdr.mbr, null, null);
226                 msg.putHeader(name, hdr);
227                 passDown(new Event(Event.MSG, msg));
228                 break;
229
230             case FdHeader.GET_CACHE_RSP:
231                 if(hdr.cachedAddrs == null) {
232                     if(log.isErrorEnabled()) log.error("(GET_CACHE_RSP): cache is null");
233                     return;
234                 }
235                 get_cache_promise.setResult(hdr.cachedAddrs);
236                 break;
237             }
238             return;
239         }
240
241         passUp(evt); // pass up to the layer above us
242
}
243
244
245     public void down(Event evt) {
246         Address mbr, tmp_ping_dest;
247         View v;
248
249         switch(evt.getType()) {
250
251             case Event.UNSUSPECT:
252                 bcast_task.removeSuspectedMember((Address)evt.getArg());
253                 break;
254
255             case Event.CONNECT:
256                 passDown(evt);
257                 srv_sock=Util.createServerSocket(srv_sock_bind_addr, start_port); // grab a random unused port above 10000
258
srv_sock_addr=new IpAddress(srv_sock_bind_addr, srv_sock.getLocalPort());
259                 startServerSocket();
260                 //if(pinger_thread == null)
261
// startPingerThread();
262
break;
263
264             case Event.VIEW_CHANGE:
265                 synchronized(this) {
266                     v=(View) evt.getArg();
267                     members.removeAllElements();
268                     members.addAll(v.getMembers());
269                     bcast_task.adjustSuspectedMembers(members);
270                     pingable_mbrs.removeAllElements();
271                     pingable_mbrs.addAll(members);
272                     passDown(evt);
273
274                     if(log.isDebugEnabled()) log.debug("VIEW_CHANGE received: " + members);
275
276                     // 1. Get the addr:pid cache from the coordinator (only if not already fetched)
277
if(!got_cache_from_coord) {
278                         getCacheFromCoordinator();
279                         got_cache_from_coord=true;
280                     }
281
282
283                     // 2. Broadcast my own addr:sock to all members so they can update their cache
284
if(!srv_sock_sent) {
285                         if(srv_sock_addr != null) {
286                             sendIHaveSockMessage(null, // send to all members
287
local_addr,
288                                     srv_sock_addr);
289                             srv_sock_sent=true;
290                         }
291                         else
292                             if(log.isWarnEnabled()) log.warn("(VIEW_CHANGE): srv_sock_addr == null");
293                     }
294
295                     // 3. Remove all entries in 'cache' which are not in the new membership
296
for(Enumeration e=cache.keys(); e.hasMoreElements();) {
297                         mbr=(Address) e.nextElement();
298                         if(!members.contains(mbr))
299                             cache.remove(mbr);
300                     }
301
302                     if(members.size() > 1) {
303                         if(pinger_thread != null && pinger_thread.isAlive()) {
304                             tmp_ping_dest=determinePingDest();
305                             if(ping_dest != null && tmp_ping_dest != null && !ping_dest.equals(tmp_ping_dest)) {
306                                 interruptPingerThread(); // allows the thread to use the new socket
307
}
308                         }
309                         else
310                             startPingerThread(); // only starts if not yet running
311
}
312                     else {
313                         ping_dest=null;
314                         stopPingerThread();
315                     }
316                 }
317                 break;
318
319             default:
320                 passDown(evt);
321                 break;
322         }
323     }
324
325
326     /**
327      * Runs as long as there are 2 members and more. Determines the member to be monitored and fetches its
328      * server socket address (if n/a, sends a message to obtain it). The creates a client socket and listens on
329      * it until the connection breaks. If it breaks, emits a SUSPECT message. It the connection is closed regularly,
330      * nothing happens. In both cases, a new member to be monitored will be chosen and monitoring continues (unless
331      * there are fewer than 2 members).
332      */

333     public void run() {
334         Address tmp_ping_dest;
335         IpAddress ping_addr;
336         int max_fetch_tries=10; // number of times a socket address is to be requested before giving up
337

338         if(log.isTraceEnabled()) log.trace("pinger_thread started"); // +++ remove
339

340         while(pinger_thread != null) {
341             tmp_ping_dest=determinePingDest(); // gets the neighbor to our right
342
if(log.isDebugEnabled())
343                 log.debug("determinePingDest()=" + tmp_ping_dest + ", pingable_mbrs=" + pingable_mbrs);
344             if(tmp_ping_dest == null) {
345                 ping_dest=null;
346                 pinger_thread=null;
347                 break;
348             }
349             ping_dest=tmp_ping_dest;
350             ping_addr=fetchPingAddress(ping_dest);
351             if(ping_addr == null) {
352                 if(log.isErrorEnabled()) log.error("socket address for " + ping_dest + " could not be fetched, retrying");
353                 if(--max_fetch_tries <= 0)
354                     break;
355                 Util.sleep(2000);
356                 continue;
357             }
358
359             if(!setupPingSocket(ping_addr)) {
360                 // covers use cases #7 and #8 in GmsTests.txt
361
if(log.isDebugEnabled()) log.debug("could not create socket to " + ping_dest + "; suspecting " + ping_dest);
362                 broadcastSuspectMessage(ping_dest);
363                 pingable_mbrs.removeElement(ping_dest);
364                 continue;
365             }
366
367             if(log.isDebugEnabled()) log.debug("ping_dest=" + ping_dest + ", ping_sock=" + ping_sock + ", cache=" + cache);
368
369             // at this point ping_input must be non-null, otherwise setupPingSocket() would have thrown an exception
370
try {
371                 if(ping_input != null) {
372                     int c=ping_input.read();
373                     switch(c) {
374                         case NORMAL_TEMINATION:
375                             if(log.isDebugEnabled())
376                                 log.debug("peer closed socket normally");
377                             pinger_thread=null;
378                             break;
379                         case ABNORMAL_TEMINATION:
380                             handleSocketClose(null);
381                             break;
382                         default:
383                             break;
384                     }
385                 }
386             }
387             catch(IOException ex) { // we got here when the peer closed the socket --> suspect peer and then continue
388
handleSocketClose(ex);
389             }
390             catch(Throwable JavaDoc catch_all_the_rest) {
391                 log.error("exception", catch_all_the_rest);
392             }
393         }
394         if(log.isDebugEnabled()) log.debug("pinger thread terminated");
395         pinger_thread=null;
396     }
397
398
399
400
401     /* ----------------------------------- Private Methods -------------------------------------- */
402
403
404     void handleSocketClose(Exception JavaDoc ex) {
405         teardownPingSocket(); // make sure we have no leftovers
406
if(!regular_sock_close) { // only suspect if socket was not closed regularly (by interruptPingerThread())
407
if(log.isDebugEnabled())
408                 log.debug("peer " + ping_dest + " closed socket (" + (ex != null ? ex.getClass().getName() : "eof") + ')');
409             broadcastSuspectMessage(ping_dest);
410             pingable_mbrs.removeElement(ping_dest);
411         }
412         else {
413             if(log.isDebugEnabled()) log.debug("socket to " + ping_dest + " was reset");
414             regular_sock_close=false;
415         }
416     }
417
418
419     void startPingerThread() {
420         if(pinger_thread == null) {
421             pinger_thread=new Thread JavaDoc(this, "FD_SOCK Ping thread");
422             pinger_thread.setDaemon(true);
423             pinger_thread.start();
424         }
425     }
426
427
428     void stopPingerThread() {
429         if(pinger_thread != null && pinger_thread.isAlive()) {
430             regular_sock_close=true;
431             teardownPingSocket();
432         }
433         pinger_thread=null;
434     }
435
436
437     /**
438      * Interrupts the pinger thread. The Thread.interrupt() method doesn't seem to work under Linux with JDK 1.3.1
439      * (JDK 1.2.2 had no problems here), therefore we close the socket (setSoLinger has to be set !) if we are
440      * running under Linux. This should be tested under Windows. (Solaris 8 and JDK 1.3.1 definitely works).<p>
441      * Oct 29 2001 (bela): completely removed Thread.interrupt(), but used socket close on all OSs. This makes this
442      * code portable and we don't have to check for OSs.
443      * @see org.jgroups.tests.InterruptTest to determine whether Thread.interrupt() works for InputStream.read().
444      */

445     void interruptPingerThread() {
446         if(pinger_thread != null && pinger_thread.isAlive()) {
447             regular_sock_close=true;
448             teardownPingSocket(); // will wake up the pinger thread. less elegant than Thread.interrupt(), but does the job
449
}
450     }
451
452     void startServerSocket() {
453         if(srv_sock_handler != null)
454             srv_sock_handler.start(); // won't start if already running
455
}
456
457     void stopServerSocket() {
458         if(srv_sock_handler != null)
459             srv_sock_handler.stop();
460     }
461
462
463     /**
464      * Creates a socket to <code>dest</code>, and assigns it to ping_sock. Also assigns ping_input
465      */

466     boolean setupPingSocket(IpAddress dest) {
467         synchronized(sock_mutex) {
468             if(dest == null) {
469                 if(log.isErrorEnabled()) log.error("destination address is null");
470                 return false;
471             }
472             try {
473                 ping_sock=new Socket JavaDoc(dest.getIpAddress(), dest.getPort());
474                 ping_sock.setSoLinger(true, 1);
475                 ping_input=ping_sock.getInputStream();
476                 return true;
477             }
478             catch(Throwable JavaDoc ex) {
479                 return false;
480             }
481         }
482     }
483
484
485     void teardownPingSocket() {
486         synchronized(sock_mutex) {
487             if(ping_sock != null) {
488                 try {
489                     ping_sock.shutdownInput();
490                     ping_sock.close();
491                 }
492                 catch(Exception JavaDoc ex) {
493                 }
494                 ping_sock=null;
495             }
496             if(ping_input != null) {
497                 try {
498                     ping_input.close();
499                 }
500                 catch(Exception JavaDoc ex) {
501                 }
502                 ping_input=null;
503             }
504         }
505     }
506
507
508     /**
509      * Determines coordinator C. If C is null and we are the first member, return. Else loop: send GET_CACHE message
510      * to coordinator and wait for GET_CACHE_RSP response. Loop until valid response has been received.
511      */

512     void getCacheFromCoordinator() {
513         Address coord;
514         int attempts=num_tries;
515         Message msg;
516         FdHeader hdr;
517         Hashtable result;
518
519         get_cache_promise.reset();
520         while(attempts > 0) {
521             if((coord=determineCoordinator()) != null) {
522                 if(coord.equals(local_addr)) { // we are the first member --> empty cache
523
if(log.isDebugEnabled()) log.debug("first member; cache is empty");
524                     return;
525                 }
526                 hdr=new FdHeader(FdHeader.GET_CACHE);
527                 hdr.mbr=local_addr;
528                 msg=new Message(coord, null, null);
529                 msg.putHeader(name, hdr);
530                 passDown(new Event(Event.MSG, msg));
531                 result=(Hashtable) get_cache_promise.getResult(get_cache_timeout);
532                 if(result != null) {
533                     cache.putAll(result); // replace all entries (there should be none !) in cache with the new values
534
if(log.isTraceEnabled()) log.trace("got cache from " + coord + ": cache is " + cache);
535                     return;
536                 }
537                 else {
538                     if(log.isErrorEnabled()) log.error("received null cache; retrying");
539                 }
540             }
541
542             Util.sleep(get_cache_retry_timeout);
543             --attempts;
544         }
545     }
546
547
548     /**
549      * Sends a SUSPECT message to all group members. Only the coordinator (or the next member in line if the coord
550      * itself is suspected) will react to this message by installing a new view. To overcome the unreliability
551      * of the SUSPECT message (it may be lost because we are not above any retransmission layer), the following scheme
552      * is used: after sending the SUSPECT message, it is also added to the broadcast task, which will periodically
553      * re-send the SUSPECT until a view is received in which the suspected process is not a member anymore. The reason is
554      * that - at one point - either the coordinator or another participant taking over for a crashed coordinator, will
555      * react to the SUSPECT message and issue a new view, at which point the broadcast task stops.
556      */

557     void broadcastSuspectMessage(Address suspected_mbr) {
558         Message suspect_msg;
559         FdHeader hdr;
560
561         if(suspected_mbr == null) return;
562
563         if(log.isDebugEnabled()) log.debug("suspecting " + suspected_mbr +
564                 " (own address is " + local_addr + ')');
565
566         // 1. Send a SUSPECT message right away; the broadcast task will take some time to send it (sleeps first)
567
hdr=new FdHeader(FdHeader.SUSPECT);
568         hdr.mbrs=new Vector(1);
569         hdr.mbrs.addElement(suspected_mbr);
570         suspect_msg=new Message();
571         suspect_msg.putHeader(name, hdr);
572         passDown(new Event(Event.MSG, suspect_msg));
573
574         // 2. Add to broadcast task and start latter (if not yet running). The task will end when
575
// suspected members are removed from the membership
576
bcast_task.addSuspectedMember(suspected_mbr);
577     }
578
579
580     void broadcastWhoHasSockMessage(Address mbr) {
581         Message msg;
582         FdHeader hdr;
583
584         if(local_addr != null && mbr != null)
585             if(log.isDebugEnabled()) log.debug("[" + local_addr + "]: who-has " + mbr);
586
587         msg=new Message(); // bcast msg
588
hdr=new FdHeader(FdHeader.WHO_HAS_SOCK);
589         hdr.mbr=mbr;
590         msg.putHeader(name, hdr);
591         passDown(new Event(Event.MSG, msg));
592     }
593
594
595     /**
596      Sends or broadcasts a I_HAVE_SOCK response. If 'dst' is null, the reponse will be broadcast, otherwise
597      it will be unicast back to the requester
598      */

599     void sendIHaveSockMessage(Address dst, Address mbr, IpAddress addr) {
600         Message msg=new Message(dst, null, null);
601         FdHeader hdr=new FdHeader(FdHeader.I_HAVE_SOCK);
602         hdr.mbr=mbr;
603         hdr.sock_addr=addr;
604         msg.putHeader(name, hdr);
605
606         if(log.isTraceEnabled()) // +++ remove
607
log.trace("hdr=" + hdr);
608
609         passDown(new Event(Event.MSG, msg));
610     }
611
612
613     /**
614      Attempts to obtain the ping_addr first from the cache, then by unicasting q request to <code>mbr</code>,
615      then by multicasting a request to all members.
616      */

617     IpAddress fetchPingAddress(Address mbr) {
618         IpAddress ret=null;
619         Message ping_addr_req;
620         FdHeader hdr;
621
622         if(mbr == null) {
623             if(log.isErrorEnabled()) log.error("mbr == null");
624             return null;
625         }
626         // 1. Try to get from cache. Add a little delay so that joining mbrs can send their socket address before
627
// we ask them to do so
628
ret=(IpAddress)cache.get(mbr);
629         if(ret != null) {
630             return ret;
631         }
632
633         Util.sleep(300);
634         if((ret=(IpAddress)cache.get(mbr)) != null)
635             return ret;
636
637
638         // 2. Try to get from mbr
639
ping_addr_promise.reset();
640         ping_addr_req=new Message(mbr, null, null); // unicast
641
hdr=new FdHeader(FdHeader.WHO_HAS_SOCK);
642         hdr.mbr=mbr;
643         ping_addr_req.putHeader(name, hdr);
644         passDown(new Event(Event.MSG, ping_addr_req));
645         ret=(IpAddress) ping_addr_promise.getResult(3000);
646         if(ret != null) {
647             return ret;
648         }
649
650
651         // 3. Try to get from all members
652
ping_addr_req=new Message(null, null, null); // multicast
653
hdr=new FdHeader(FdHeader.WHO_HAS_SOCK);
654         hdr.mbr=mbr;
655         ping_addr_req.putHeader(name, hdr);
656         passDown(new Event(Event.MSG, ping_addr_req));
657         ret=(IpAddress) ping_addr_promise.getResult(3000);
658         return ret;
659     }
660
661
662     Address determinePingDest() {
663         Address tmp;
664
665         if(pingable_mbrs == null || pingable_mbrs.size() < 2 || local_addr == null)
666             return null;
667         for(int i=0; i < pingable_mbrs.size(); i++) {
668             tmp=(Address) pingable_mbrs.elementAt(i);
669             if(local_addr.equals(tmp)) {
670                 if(i + 1 >= pingable_mbrs.size())
671                     return (Address) pingable_mbrs.elementAt(0);
672                 else
673                     return (Address) pingable_mbrs.elementAt(i + 1);
674             }
675         }
676         return null;
677     }
678
679
680     Address determineCoordinator() {
681         return members.size() > 0 ? (Address) members.elementAt(0) : null;
682     }
683
684
685
686
687
688     /* ------------------------------- End of Private Methods ------------------------------------ */
689
690
691     public static class FdHeader extends Header implements Streamable {
692         static final byte SUSPECT=10;
693         static final byte WHO_HAS_SOCK=11;
694         static final byte I_HAVE_SOCK=12;
695         static final byte GET_CACHE=13; // sent by joining member to coordinator
696
static final byte GET_CACHE_RSP=14; // sent by coordinator to joining member in response to GET_CACHE
697

698
699         byte type=SUSPECT;
700         Address mbr=null; // set on WHO_HAS_SOCK (requested mbr), I_HAVE_SOCK
701
IpAddress sock_addr; // set on I_HAVE_SOCK
702

703         // Hashtable<Address,IpAddress>
704
Hashtable cachedAddrs=null; // set on GET_CACHE_RSP
705
Vector mbrs=null; // set on SUSPECT (list of suspected members)
706

707
708         public FdHeader() {
709         } // used for externalization
710

711         public FdHeader(byte type) {
712             this.type=type;
713         }
714
715
716         public String JavaDoc toString() {
717             StringBuffer JavaDoc sb=new StringBuffer JavaDoc();
718             sb.append(type2String(type));
719             if(mbr != null)
720                 sb.append(", mbr=" + mbr);
721             if(sock_addr != null)
722                 sb.append(", sock_addr=" + sock_addr);
723             if(cachedAddrs != null)
724                 sb.append(", cache=" + cachedAddrs);
725             if(mbrs != null)
726                 sb.append(", mbrs=" + mbrs);
727             return sb.toString();
728         }
729
730
731         public static String JavaDoc type2String(byte type) {
732             switch(type) {
733                 case SUSPECT:
734                     return "SUSPECT";
735                 case WHO_HAS_SOCK:
736                     return "WHO_HAS_SOCK";
737                 case I_HAVE_SOCK:
738                     return "I_HAVE_SOCK";
739                 case GET_CACHE:
740                     return "GET_CACHE";
741                 case GET_CACHE_RSP:
742                     return "GET_CACHE_RSP";
743                 default:
744                     return "unknown type (" + type + ')';
745             }
746         }
747
748         public void writeExternal(ObjectOutput out) throws IOException {
749             out.writeByte(type);
750             out.writeObject(mbr);
751             out.writeObject(sock_addr);
752             out.writeObject(cachedAddrs);
753             out.writeObject(mbrs);
754         }
755
756
757         public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException JavaDoc {
758             type=in.readByte();
759             mbr=(Address) in.readObject();
760             sock_addr=(IpAddress) in.readObject();
761             cachedAddrs=(Hashtable) in.readObject();
762             mbrs=(Vector) in.readObject();
763         }
764
765         public void writeTo(DataOutputStream out) throws IOException {
766             int size;
767             out.writeByte(type);
768             Util.writeAddress(mbr, out);
769             Util.writeStreamable(sock_addr, out);
770             size=cachedAddrs != null? cachedAddrs.size() : 0;
771             out.writeInt(size);
772             if(size > 0) {
773                 for(Iterator it=cachedAddrs.entrySet().iterator(); it.hasNext();) {
774                     Map.Entry entry=(Map.Entry)it.next();
775                     Address key=(Address)entry.getKey();
776                     IpAddress val=(IpAddress)entry.getValue();
777                     Util.writeAddress(key, out);
778                     Util.writeStreamable(val, out);
779                 }
780             }
781             size=mbrs != null? mbrs.size() : 0;
782             out.writeInt(size);
783             if(size > 0) {
784                 for(Iterator it=mbrs.iterator(); it.hasNext();) {
785                     Address address=(Address)it.next();
786                     Util.writeAddress(address, out);
787                 }
788             }
789         }
790
791         public void readFrom(DataInputStream in) throws IOException, IllegalAccessException JavaDoc, InstantiationException JavaDoc {
792             int size;
793             type=in.readByte();
794             mbr=Util.readAddress(in);
795             sock_addr=(IpAddress)Util.readStreamable(IpAddress.class, in);
796             size=in.readInt();
797             if(size > 0) {
798                 if(cachedAddrs == null)
799                     cachedAddrs=new Hashtable();
800                 for(int i=0; i < size; i++) {
801                     Address key=Util.readAddress(in);
802                     IpAddress val=(IpAddress)Util.readStreamable(IpAddress.class, in);
803                     cachedAddrs.put(key, val);
804                 }
805             }
806             size=in.readInt();
807             if(size > 0) {
808                 if(mbrs == null)
809                     mbrs=new Vector();
810                 for(int i=0; i < size; i++) {
811                     Address addr=Util.readAddress(in);
812                     mbrs.add(addr);
813                 }
814             }
815         }
816
817     }
818
819
820     /**
821      * Handles the server-side of a client-server socket connection. Waits until a client connects, and then loops
822      * until that client closes the connection. Note that there is no new thread spawned for the listening on the
823      * client socket, therefore there can only be 1 client connection at the same time. Subsequent clients attempting
824      * to create a connection will be blocked until the first client closes its connection. This should not be a problem
825      * as the ring nature of the FD_SOCK protocol always has only 1 client connect to its right-hand-side neighbor.
826      */

827     private class ServerSocketHandler implements Runnable JavaDoc {
828         Thread JavaDoc acceptor=null;
829         /** List<ClientConnectionHandler> */
830         final List clients=new ArrayList();
831
832
833
834         ServerSocketHandler() {
835             start();
836         }
837
838         void start() {
839             if(acceptor == null) {
840                 acceptor=new Thread JavaDoc(this, "ServerSocket acceptor thread");
841                 acceptor.setDaemon(true);
842                 acceptor.start();
843             }
844         }
845
846
847         void stop() {
848             if(acceptor != null && acceptor.isAlive()) {
849                 try {
850                     srv_sock.close(); // this will terminate thread, peer will receive SocketException (socket close)
851
}
852                 catch(Exception JavaDoc ex) {
853                 }
854             }
855             synchronized(clients) {
856                 for(Iterator it=clients.iterator(); it.hasNext();) {
857                     ClientConnectionHandler handler=(ClientConnectionHandler)it.next();
858                     handler.stopThread();
859                 }
860                 clients.clear();
861             }
862             acceptor=null;
863         }
864
865
866         /** Only accepts 1 client connection at a time (saving threads) */
867         public void run() {
868             Socket JavaDoc client_sock=null;
869             while(acceptor != null && srv_sock != null) {
870                 try {
871                     if(log.isTraceEnabled()) // +++ remove
872
log.trace("waiting for client connections on " + srv_sock.getInetAddress() + ":" +
873                                   srv_sock.getLocalPort());
874                     client_sock=srv_sock.accept();
875                     if(log.isTraceEnabled()) // +++ remove
876
log.trace("accepted connection from " + client_sock.getInetAddress() + ':' + client_sock.getPort());
877                     ClientConnectionHandler client_conn_handler=new ClientConnectionHandler(client_sock, clients);
878                     synchronized(clients) {
879                         clients.add(client_conn_handler);
880                     }
881                     client_conn_handler.start();
882                 }
883                 catch(IOException io_ex2) {
884                     break;
885                 }
886             }
887             acceptor=null;
888         }
889     }
890
891
892
893     /** Handles a client connection; multiple client can connect at the same time */
894     private static class ClientConnectionHandler extends Thread JavaDoc {
895         Socket JavaDoc client_sock=null;
896         InputStream in;
897         final Object JavaDoc mutex=new Object JavaDoc();
898         List clients=null;
899
900         ClientConnectionHandler(Socket JavaDoc client_sock, List clients) {
901             setName("ClientConnectionHandler");
902             setDaemon(true);
903             this.client_sock=client_sock;
904             this.clients=clients;
905         }
906
907         void stopThread() {
908             synchronized(mutex) {
909                 if(client_sock != null) {
910                     try {
911                         OutputStream out=client_sock.getOutputStream();
912                         out.write(NORMAL_TEMINATION);
913                     }
914                     catch(Throwable JavaDoc t) {
915                     }
916                 }
917             }
918             closeClientSocket();
919         }
920
921         void closeClientSocket() {
922             synchronized(mutex) {
923                 if(client_sock != null) {
924                     try {
925                         client_sock.close();
926                     }
927                     catch(Exception JavaDoc ex) {
928                     }
929                     client_sock=null;
930                 }
931             }
932         }
933
934         public void run() {
935             try {
936                 synchronized(mutex) {
937                     if(client_sock == null)
938                         return;
939                     in=client_sock.getInputStream();
940                 }
941                 while((in.read()) != -1) {
942                 }
943             }
944             catch(IOException io_ex1) {
945             }
946             finally {
947                 closeClientSocket();
948                 synchronized(clients) {
949                     clients.remove(this);
950                 }
951             }
952         }
953     }
954
955
956     /**
957      * Task that periodically broadcasts a list of suspected members to the group. Goal is not to lose
958      * a SUSPECT message: since these are bcast unreliably, they might get dropped. The BroadcastTask makes
959      * sure they are retransmitted until a view has been received which doesn't contain the suspected members
960      * any longer. Then the task terminates.
961      */

962     private class BroadcastTask implements TimeScheduler.Task {
963         final Vector suspected_mbrs=new Vector(7);
964         boolean stopped=false;
965
966
967         /** Adds a suspected member. Starts the task if not yet running */
968         public void addSuspectedMember(Address mbr) {
969             if(mbr == null) return;
970             if(!members.contains(mbr)) return;
971             synchronized(suspected_mbrs) {
972                 if(!suspected_mbrs.contains(mbr)) {
973                     suspected_mbrs.addElement(mbr);
974                     if(log.isDebugEnabled()) log.debug("mbr=" + mbr + " (size=" + suspected_mbrs.size() + ')');
975                 }
976                 if(stopped && suspected_mbrs.size() > 0) {
977                     stopped=false;
978                     timer.add(this, true);
979                 }
980             }
981         }
982
983
984         public void removeSuspectedMember(Address suspected_mbr) {
985             if(suspected_mbr == null) return;
986             if(log.isDebugEnabled()) log.debug("member is " + suspected_mbr);
987             synchronized(suspected_mbrs) {
988                 suspected_mbrs.removeElement(suspected_mbr);
989                 if(suspected_mbrs.size() == 0)
990                     stopped=true;
991             }
992         }
993
994
995         public void removeAll() {
996             synchronized(suspected_mbrs) {
997                 suspected_mbrs.removeAllElements();
998                 stopped=true;
999             }
1000        }
1001
1002
1003        /**
1004         * Removes all elements from suspected_mbrs that are <em>not</em> in the new membership
1005         */

1006        public void adjustSuspectedMembers(Vector new_mbrship) {
1007            Address suspected_mbr;
1008
1009            if(new_mbrship == null || new_mbrship.size() == 0) return;
1010            synchronized(suspected_mbrs) {
1011                for(Iterator it=suspected_mbrs.iterator(); it.hasNext();) {
1012                    suspected_mbr=(Address) it.next();
1013                    if(!new_mbrship.contains(suspected_mbr)) {
1014                        it.remove();
1015                        if(log.isDebugEnabled())
1016                            log.debug("removed " + suspected_mbr + " (size=" + suspected_mbrs.size() + ')');
1017                    }
1018                }
1019                if(suspected_mbrs.size() == 0)
1020                    stopped=true;
1021            }
1022        }
1023
1024
1025        public boolean cancelled() {
1026            return stopped;
1027        }
1028
1029
1030        public long nextInterval() {
1031            return suspect_msg_interval;
1032        }
1033
1034
1035        public void run() {
1036            Message suspect_msg;
1037            FdHeader hdr;
1038
1039            if(log.isDebugEnabled())
1040                log.debug("broadcasting SUSPECT message (suspected_mbrs=" + suspected_mbrs + ") to group");
1041
1042            synchronized(suspected_mbrs) {
1043                if(suspected_mbrs.size() == 0) {
1044                    stopped=true;
1045                    if(log.isDebugEnabled()) log.debug("task done (no suspected members)");
1046                    return;
1047                }
1048
1049                hdr=new FdHeader(FdHeader.SUSPECT);
1050                hdr.mbrs=(Vector) suspected_mbrs.clone();
1051            }
1052            suspect_msg=new Message(); // mcast SUSPECT to all members
1053
suspect_msg.putHeader(name, hdr);
1054            passDown(new Event(Event.MSG, suspect_msg));
1055            if(log.isDebugEnabled()) log.debug("task done");
1056        }
1057    }
1058
1059
1060}
1061
Popular Tags