KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > protocols > FD


1 // $Id: FD.java,v 1.21 2005/04/23 20:41:39 belaban Exp $
2

3 package org.jgroups.protocols;
4
5
6 import org.jgroups.*;
7 import org.jgroups.stack.Protocol;
8 import org.jgroups.util.Marshaller;
9 import org.jgroups.util.TimeScheduler;
10 import org.jgroups.util.Streamable;
11 import org.jgroups.util.Util;
12
13 import java.io.*;
14 import java.util.Hashtable JavaDoc;
15 import java.util.Iterator JavaDoc;
16 import java.util.Properties JavaDoc;
17 import java.util.Vector JavaDoc;
18
19
20
21
22 /**
23  * Failure detection based on simple heartbeat protocol. Regularly polls members for
24  * liveness. Multicasts SUSPECT messages when a member is not reachable. The simple
25  * algorithm works as follows: the membership is known and ordered. Each HB protocol
26  * periodically sends an 'are-you-alive' message to its *neighbor*. A neighbor is the next in
27  * rank in the membership list, which is recomputed upon a view change. When a response hasn't
28  * been received for n milliseconds and m tries, the corresponding member is suspected (and
29  * eventually excluded if faulty).<p>
30  * FD starts when it detects (in a view change notification) that there are at least
31  * 2 members in the group. It stops running when the membership drops below 2.<p>
32  * When a message is received from the monitored neighbor member, it causes the pinger thread to
33  * 'skip' sending the next are-you-alive message. Thus, traffic is reduced.<p>
34  * When we receive a ping from a member that's not in the membership list, we shun it by sending it a
35  * NOT_MEMBER message. That member will then leave the group (and possibly rejoin). This is only done if
36  * <code>shun</code> is true.
37  * @author Bela Ban
38  * @version $Revision: 1.21 $
39  */

40 public class FD extends Protocol {
41     Address ping_dest=null;
42     Address local_addr=null;
43     long timeout=3000; // number of millisecs to wait for an are-you-alive msg
44
long last_ack=System.currentTimeMillis();
45     int num_tries=0;
46     int max_tries=2; // number of times to send a are-you-alive msg (tot time= max_tries*timeout)
47
final Vector JavaDoc members=new Vector JavaDoc(11);
48     final Hashtable JavaDoc invalid_pingers=new Hashtable JavaDoc(7); // keys=Address, val=Integer (number of pings from suspected mbrs)
49

50     /** Members from which we select ping_dest. may be subset of {@link #members} */
51     final Vector JavaDoc pingable_mbrs=new Vector JavaDoc(11);
52
53     boolean shun=true;
54     TimeScheduler timer=null;
55     Monitor monitor=null; // task that performs the actual monitoring for failure detection
56
private final Object JavaDoc monitor_mutex=new Object JavaDoc();
57
58     /** Transmits SUSPECT message until view change or UNSUSPECT is received */
59     final Broadcaster bcast_task=new Broadcaster();
60     final static String JavaDoc name="FD";
61
62
63
64
65
66
67     public String JavaDoc getName() {
68         return name;
69     }
70
71
72     public boolean setProperties(Properties JavaDoc props) {
73         String JavaDoc str;
74
75         super.setProperties(props);
76         str=props.getProperty("timeout");
77         if(str != null) {
78             timeout=Long.parseLong(str);
79             props.remove("timeout");
80         }
81
82         str=props.getProperty("max_tries"); // before suspecting a member
83
if(str != null) {
84             max_tries=Integer.parseInt(str);
85             props.remove("max_tries");
86         }
87
88         str=props.getProperty("shun");
89         if(str != null) {
90             shun=Boolean.valueOf(str).booleanValue();
91             props.remove("shun");
92         }
93
94         if(props.size() > 0) {
95             System.err.println("FD.setProperties(): the following properties are not recognized:");
96             props.list(System.out);
97             return false;
98         }
99         return true;
100     }
101
102
103     public void init() throws Exception JavaDoc {
104         if(stack != null && stack.timer != null)
105             timer=stack.timer;
106         else
107             throw new Exception JavaDoc("FD.init(): timer cannot be retrieved from protocol stack");
108     }
109
110
111     public void stop() {
112         stopMonitor();
113     }
114
115
116     Object JavaDoc getPingDest(Vector JavaDoc mbrs) {
117         Object JavaDoc tmp, retval=null;
118
119         if(mbrs == null || mbrs.size() < 2 || local_addr == null)
120             return null;
121         for(int i=0; i < mbrs.size(); i++) {
122             tmp=mbrs.elementAt(i);
123             if(local_addr.equals(tmp)) {
124                 if(i + 1 >= mbrs.size())
125                     retval=mbrs.elementAt(0);
126                 else
127                     retval=mbrs.elementAt(i + 1);
128                 break;
129             }
130         }
131         return retval;
132     }
133
134
135     private void startMonitor() {
136         synchronized(monitor_mutex) {
137             if(monitor != null && monitor.started == false) {
138                 monitor=null;
139             }
140             if(monitor == null) {
141                 monitor=new Monitor();
142                 last_ack=System.currentTimeMillis(); // start from scratch
143
timer.add(monitor, true); // fixed-rate scheduling
144
num_tries=0;
145             }
146         }
147     }
148
149     private void stopMonitor() {
150         synchronized(monitor_mutex) {
151             if(monitor != null) {
152                 monitor.stop();
153                 monitor=null;
154             }
155         }
156     }
157
158
159     public void up(Event evt) {
160         Message msg;
161         FdHeader hdr=null;
162         Object JavaDoc sender, tmphdr;
163
164         switch(evt.getType()) {
165
166         case Event.SET_LOCAL_ADDRESS:
167             local_addr=(Address)evt.getArg();
168             break;
169
170         case Event.MSG:
171             msg=(Message)evt.getArg();
172             tmphdr=msg.getHeader(name);
173             if(tmphdr == null || !(tmphdr instanceof FdHeader)) {
174                 if(ping_dest != null && (sender=msg.getSrc()) != null) {
175                     if(ping_dest.equals(sender)) {
176                         last_ack=System.currentTimeMillis();
177                         if(log.isTraceEnabled())
178                             log.trace("received msg from " + sender + " (counts as ack)");
179                         num_tries=0;
180                     }
181                 }
182                 break; // message did not originate from FD layer, just pass up
183
}
184
185             hdr=(FdHeader)msg.removeHeader(name);
186             switch(hdr.type) {
187             case FdHeader.HEARTBEAT: // heartbeat request; send heartbeat ack
188
Address hb_sender=msg.getSrc();
189                 Message hb_ack=new Message(hb_sender, null, null);
190                 FdHeader tmp_hdr=new FdHeader(FdHeader.HEARTBEAT_ACK);
191
192                 // 1. Send an ack
193
tmp_hdr.from=local_addr;
194                 hb_ack.putHeader(name, tmp_hdr);
195                 if(log.isTraceEnabled())
196                     log.trace("received are-you-alive from " + hb_sender + ", sending response");
197                 passDown(new Event(Event.MSG, hb_ack));
198
199                 // 2. Shun the sender of a HEARTBEAT message if that sender is not a member. This will cause
200
// the sender to leave the group (and possibly rejoin it later)
201
if(shun)
202                     shunInvalidHeartbeatSender(hb_sender);
203                 break; // don't pass up !
204

205             case FdHeader.HEARTBEAT_ACK: // heartbeat ack
206
if(ping_dest != null && ping_dest.equals(hdr.from)) {
207                     last_ack=System.currentTimeMillis();
208                     num_tries=0;
209                     if(log.isDebugEnabled()) log.debug("received ack from " + hdr.from);
210                 }
211                 else {
212                     stop();
213                     ping_dest=(Address)getPingDest(members);
214                     if(ping_dest != null) {
215                         try {
216                             startMonitor();
217                         }
218                         catch(Exception JavaDoc ex) {
219                             if(log.isWarnEnabled()) log.warn("exception when calling startMonitor(): " + ex);
220                         }
221                     }
222                 }
223                 break;
224
225             case FdHeader.SUSPECT:
226                 if(hdr.mbrs != null) {
227                     if(log.isTraceEnabled()) log.trace("[SUSPECT] suspect hdr is " + hdr);
228                     for(int i=0; i < hdr.mbrs.size(); i++) {
229                         Address m=(Address)hdr.mbrs.elementAt(i);
230                         if(local_addr != null && m.equals(local_addr)) {
231                             if(log.isWarnEnabled())
232                                 log.warn("I was suspected, but will not remove myself from membership " +
233                                          "(waiting for EXIT message)");
234                         }
235                         else {
236                             pingable_mbrs.remove(m);
237                             ping_dest=(Address)getPingDest(pingable_mbrs);
238                         }
239                         passUp(new Event(Event.SUSPECT, m));
240                         passDown(new Event(Event.SUSPECT, m));
241                     }
242                 }
243                 break;
244
245             case FdHeader.NOT_MEMBER:
246                 if(shun) {
247                     if(log.isDebugEnabled()) log.debug("[NOT_MEMBER] I'm being shunned; exiting");
248                     passUp(new Event(Event.EXIT));
249                 }
250                 break;
251             }
252             return;
253         }
254         passUp(evt); // pass up to the layer above us
255
}
256
257
258     public void down(Event evt) {
259         View v;
260
261         switch(evt.getType()) {
262         case Event.VIEW_CHANGE:
263             synchronized(this) {
264                 stop();
265                 v=(View)evt.getArg();
266                 members.clear();
267                 members.addAll(v.getMembers());
268                 bcast_task.adjustSuspectedMembers(members);
269                 pingable_mbrs.clear();
270                 pingable_mbrs.addAll(members);
271                 passDown(evt);
272                 ping_dest=(Address)getPingDest(pingable_mbrs);
273                 if(ping_dest != null) {
274                     try {
275                         startMonitor();
276                     }
277                     catch(Exception JavaDoc ex) {
278                         if(log.isWarnEnabled()) log.warn("exception when calling startMonitor(): " + ex);
279                     }
280                 }
281             }
282             break;
283
284         case Event.UNSUSPECT:
285             unsuspect((Address)evt.getArg());
286             passDown(evt);
287             break;
288
289         default:
290             passDown(evt);
291             break;
292         }
293     }
294
295
296     void unsuspect(Address mbr) {
297         bcast_task.removeSuspectedMember(mbr);
298         pingable_mbrs.removeAllElements();
299         pingable_mbrs.addAll(members);
300         pingable_mbrs.removeAll(bcast_task.getSuspectedMembers());
301         ping_dest=(Address)getPingDest(pingable_mbrs);
302     }
303
304
305     /**
306      * If sender is not a member, send a NOT_MEMBER to sender (after n pings received)
307      */

308     void shunInvalidHeartbeatSender(Address hb_sender) {
309         int num_pings=0;
310         Message shun_msg;
311
312         if(hb_sender != null && members != null && !members.contains(hb_sender)) {
313             if(invalid_pingers.containsKey(hb_sender)) {
314                 num_pings=((Integer JavaDoc)invalid_pingers.get(hb_sender)).intValue();
315                 if(num_pings >= max_tries) {
316                     if(log.isDebugEnabled())
317                         log.debug(hb_sender + " is not in " + members + " ! Shunning it");
318                     shun_msg=new Message(hb_sender, null, null);
319                     shun_msg.putHeader(name, new FdHeader(FdHeader.NOT_MEMBER));
320                     passDown(new Event(Event.MSG, shun_msg));
321                     invalid_pingers.remove(hb_sender);
322                 }
323                 else {
324                     num_pings++;
325                     invalid_pingers.put(hb_sender, new Integer JavaDoc(num_pings));
326                 }
327             }
328             else {
329                 num_pings++;
330                 invalid_pingers.put(hb_sender, new Integer JavaDoc(num_pings));
331             }
332         }
333     }
334
335
336     public static class FdHeader extends Header implements Streamable {
337         static final byte HEARTBEAT=0;
338         static final byte HEARTBEAT_ACK=1;
339         static final byte SUSPECT=2;
340         static final byte NOT_MEMBER=3; // received as response by pinged mbr when we are not a member
341

342
343         byte type=HEARTBEAT;
344         Vector JavaDoc mbrs=null;
345         Address from=null; // member who detected that suspected_mbr has failed
346

347
348
349         public FdHeader() {
350         } // used for externalization
351

352         FdHeader(byte type) {
353             this.type=type;
354         }
355
356
357         public String JavaDoc toString() {
358             switch(type) {
359                 case HEARTBEAT:
360                     return "[FD: heartbeat]";
361                 case HEARTBEAT_ACK:
362                     return "[FD: heartbeat ack]";
363                 case SUSPECT:
364                     return "[FD: SUSPECT (suspected_mbrs=" + mbrs + ", from=" + from + ")]";
365                 case NOT_MEMBER:
366                     return "[FD: NOT_MEMBER]";
367                 default:
368                     return "[FD: unknown type (" + type + ")]";
369             }
370         }
371
372         public void writeExternal(ObjectOutput out) throws IOException {
373             out.writeByte(type);
374             if(mbrs == null)
375                 out.writeBoolean(false);
376             else {
377                 out.writeBoolean(true);
378                 out.writeInt(mbrs.size());
379                 for(Iterator JavaDoc it=mbrs.iterator(); it.hasNext();) {
380                     Address addr=(Address)it.next();
381                     Marshaller.write(addr, out);
382                 }
383             }
384             Marshaller.write(from, out);
385         }
386
387         public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException JavaDoc {
388             type=in.readByte();
389             boolean mbrs_not_null=in.readBoolean();
390             if(mbrs_not_null) {
391                 int len=in.readInt();
392                 mbrs=new Vector JavaDoc(11);
393                 for(int i=0; i < len; i++) {
394                     Address addr=(Address)Marshaller.read(in);
395                     mbrs.add(addr);
396                 }
397             }
398             from=(Address)Marshaller.read(in);
399         }
400
401         public void writeTo(DataOutputStream out) throws IOException {
402             out.writeByte(type);
403             out.writeInt(mbrs != null? mbrs.size() : 0);
404             if(mbrs != null) {
405                 for(Iterator JavaDoc it=mbrs.iterator(); it.hasNext();) {
406                     Address address=(Address)it.next();
407                     Util.writeAddress(address, out);
408                 }
409             }
410             Util.writeAddress(from, out);
411         }
412
413
414         public long size() {
415             int retval=Global.BYTE_SIZE;
416             Address addr;
417             if(mbrs != null) {
418                 retval+=Global.INT_SIZE; // size()
419
for(Iterator JavaDoc it=mbrs.iterator(); it.hasNext();) {
420                     addr=(Address)it.next();
421                     if(addr != null)
422                         retval+=addr.size() + Global.BYTE_SIZE; // presence
423
}
424             }
425             retval+=Global.BYTE_SIZE; // presence byte for 'from'
426
if(from != null)
427                 retval+=from.size();
428             return retval;
429         }
430
431         public void readFrom(DataInputStream in) throws IOException, IllegalAccessException JavaDoc, InstantiationException JavaDoc {
432             type=in.readByte();
433             int size=in.readInt();
434             if(size > 0) {
435                 if(mbrs == null)
436                     mbrs=new Vector JavaDoc();
437                 for(int i=0; i < size; i++) {
438                     Address addr=Util.readAddress(in);
439                     mbrs.add(addr);
440                 }
441             }
442             from=Util.readAddress(in);
443         }
444
445     }
446
447
448     private class Monitor implements TimeScheduler.Task {
449         boolean started=true;
450
451         public void stop() {
452             started=false;
453         }
454
455
456         public boolean cancelled() {
457             return !started;
458         }
459
460
461         public long nextInterval() {
462             return timeout;
463         }
464
465
466         public void run() {
467             Message hb_req;
468             long not_heard_from=0; // time in msecs we haven't heard from ping_dest
469

470             if(ping_dest == null) {
471                 if(log.isWarnEnabled())
472                     log.warn("ping_dest is null: members=" + members + ", pingable_mbrs=" +
473                             pingable_mbrs + ", local_addr=" + local_addr);
474                 return;
475             }
476
477
478             // 1. send heartbeat request
479
hb_req=new Message(ping_dest, null, null);
480             hb_req.putHeader(name, new FdHeader(FdHeader.HEARTBEAT)); // send heartbeat request
481
if(log.isDebugEnabled())
482                 log.debug("sending are-you-alive msg to " + ping_dest + " (own address=" + local_addr + ')');
483             passDown(new Event(Event.MSG, hb_req));
484
485             // 2. If the time of the last heartbeat is > timeout and max_tries heartbeat messages have not been
486
// received, then broadcast a SUSPECT message. Will be handled by coordinator, which may install
487
// a new view
488
not_heard_from=System.currentTimeMillis() - last_ack;
489             // quick & dirty fix: increase timeout by 500msecs to allow for latency (bela June 27 2003)
490
if(not_heard_from > timeout + 500) { // no heartbeat ack for more than timeout msecs
491
if(num_tries >= max_tries) {
492                     if(log.isDebugEnabled())
493                         log.debug("[" + local_addr + "]: received no heartbeat ack from " + ping_dest +
494                                 " for " + (num_tries +1) + " times (" + ((num_tries+1) * timeout) +
495                                 " milliseconds), suspecting it");
496                     // broadcast a SUSPECT message to all members - loop until
497
// unsuspect or view change is received
498
bcast_task.addSuspectedMember(ping_dest);
499                     num_tries=0;
500                 }
501                 else {
502                     if(log.isDebugEnabled())
503                         log.debug("heartbeat missing from " + ping_dest + " (number=" + num_tries + ')');
504                     num_tries++;
505                 }
506             }
507         }
508
509
510         public String JavaDoc toString() {
511             return "" + started;
512         }
513
514     }
515
516
517     /**
518      * Task that periodically broadcasts a list of suspected members to the group. Goal is not to lose
519      * a SUSPECT message: since these are bcast unreliably, they might get dropped. The BroadcastTask makes
520      * sure they are retransmitted until a view has been received which doesn't contain the suspected members
521      * any longer. Then the task terminates.
522      */

523     private class Broadcaster {
524         final Vector JavaDoc suspected_mbrs=new Vector JavaDoc(7);
525         BroadcastTask task=null;
526         private final Object JavaDoc bcast_mutex=new Object JavaDoc();
527
528
529         Vector JavaDoc getSuspectedMembers() {
530             return suspected_mbrs;
531         }
532
533         /**
534          * Starts a new task, or - if already running - adds the argument to the running task.
535          * @param suspect
536          */

537         private void startBroadcastTask(Address suspect) {
538             synchronized(bcast_mutex) {
539                 if(task == null || task.cancelled()) {
540                     task=new BroadcastTask((Vector JavaDoc)suspected_mbrs.clone());
541                     task.addSuspectedMember(suspect);
542                     task.run(); // run immediately the first time
543
timer.add(task); // then every timeout milliseconds, until cancelled
544
if(log.isTraceEnabled())
545                         log.trace("BroadcastTask started");
546                 }
547                 else {
548                     task.addSuspectedMember(suspect);
549                 }
550             }
551         }
552
553         private void stopBroadcastTask() {
554             synchronized(bcast_mutex) {
555                 if(task != null) {
556                     task.stop();
557                     task=null;
558                 }
559             }
560         }
561
562         /** Adds a suspected member. Starts the task if not yet running */
563         void addSuspectedMember(Address mbr) {
564             if(mbr == null) return;
565             if(!members.contains(mbr)) return;
566             synchronized(suspected_mbrs) {
567                 if(!suspected_mbrs.contains(mbr)) {
568                     suspected_mbrs.addElement(mbr);
569                     startBroadcastTask(mbr);
570                 }
571             }
572         }
573
574         void removeSuspectedMember(Address suspected_mbr) {
575             if(suspected_mbr == null) return;
576             if(log.isDebugEnabled()) log.debug("member is " + suspected_mbr);
577             synchronized(suspected_mbrs) {
578                 suspected_mbrs.removeElement(suspected_mbr);
579                 if(suspected_mbrs.size() == 0)
580                     stopBroadcastTask();
581             }
582         }
583
584         void removeAll() {
585             synchronized(suspected_mbrs) {
586                 suspected_mbrs.removeAllElements();
587                 stopBroadcastTask();
588             }
589         }
590
591         /** Removes all elements from suspected_mbrs that are <em>not</em> in the new membership */
592         void adjustSuspectedMembers(Vector JavaDoc new_mbrship) {
593             if(new_mbrship == null || new_mbrship.size() == 0) return;
594             StringBuffer JavaDoc sb=new StringBuffer JavaDoc();
595             synchronized(suspected_mbrs) {
596                 sb.append("suspected_mbrs: ").append(suspected_mbrs);
597                 suspected_mbrs.retainAll(new_mbrship);
598                 if(suspected_mbrs.size() == 0)
599                     stopBroadcastTask();
600                 sb.append(", after adjustment: ").append(suspected_mbrs);
601                 log.debug(sb.toString());
602             }
603         }
604     }
605
606
607     private class BroadcastTask implements TimeScheduler.Task {
608         boolean cancelled=false;
609         private Vector JavaDoc suspected_members=null;
610
611
612         public BroadcastTask(Vector JavaDoc suspected_members) {
613             this.suspected_members=suspected_members;
614         }
615
616         public void stop() {
617             cancelled=true;
618             suspected_members.clear();
619             if(log.isTraceEnabled())
620                 log.trace("BroadcastTask stopped");
621         }
622
623         public boolean cancelled() {
624             return cancelled;
625         }
626
627         public long nextInterval() {
628             return FD.this.timeout;
629         }
630
631         public void run() {
632             Message suspect_msg;
633             FD.FdHeader hdr;
634
635             synchronized(suspected_members) {
636                 if(suspected_members.size() == 0) {
637                     stop();
638                     if(log.isDebugEnabled()) log.debug("task done (no suspected members)");
639                     return;
640                 }
641
642                 hdr=new FdHeader(FdHeader.SUSPECT);
643                 hdr.mbrs=(Vector JavaDoc)suspected_members.clone();
644                 hdr.from=local_addr;
645             }
646             suspect_msg=new Message(); // mcast SUSPECT to all members
647
suspect_msg.putHeader(name, hdr);
648             if(log.isDebugEnabled())
649                 log.debug("broadcasting SUSPECT message [suspected_mbrs=" + suspected_members + "] to group");
650             passDown(new Event(Event.MSG, suspect_msg));
651             if(log.isDebugEnabled()) log.debug("task done");
652         }
653
654         public void addSuspectedMember(Address suspect) {
655             if(suspect != null && !suspected_members.contains(suspect)) {
656                 suspected_members.add(suspect);
657             }
658         }
659     }
660
661 }
662
Popular Tags