KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > protocols > FD_PROB


1 // $Id: FD_PROB.java,v 1.6 2004/09/23 16:29:41 belaban Exp $
2

3 package org.jgroups.protocols;
4
5 import org.jgroups.*;
6 import org.jgroups.stack.Protocol;
7 import org.jgroups.util.Util;
8
9 import java.io.IOException JavaDoc;
10 import java.io.ObjectInput JavaDoc;
11 import java.io.ObjectOutput JavaDoc;
12 import java.util.Enumeration JavaDoc;
13 import java.util.Hashtable JavaDoc;
14 import java.util.Properties JavaDoc;
15 import java.util.Vector JavaDoc;
16
17
/**
 * Probabilistic failure detection protocol based on "A Gossip-Style Failure Detection Service"
 * by van Renesse, Minsky and Hayden.<p>
 * Each member maintains a list of all other members: for each member P, two pieces of data are maintained, a
 * heartbeat counter and the time of the last increment of the counter. Each member periodically sends its own
 * heartbeat counter list to a randomly chosen member Q. Q updates its own heartbeat counter list and the associated
 * time (if the counter was incremented). Each member periodically increments its own counter. If, when sending
 * its heartbeat counter list, a member P detects that another member Q's heartbeat counter was not incremented
 * for timeout milliseconds, Q will be suspected.<p>
 * This protocol can be used both with a PBCAST *and* regular stacks.
 * @author Bela Ban 1999
 * @version $Revision: 1.6 $
 */

31 public class FD_PROB extends Protocol implements Runnable JavaDoc {
32     Address local_addr=null;
33     Thread JavaDoc hb=null;
34     long timeout=3000; // before a member with a non updated timestamp is suspected
35
long gossip_interval=1000;
36     Vector JavaDoc members=null;
37     final Hashtable JavaDoc counters=new Hashtable JavaDoc(); // keys=Addresses, vals=FdEntries
38
final Hashtable JavaDoc invalid_pingers=new Hashtable JavaDoc(); // keys=Address, vals=Integer (number of pings from suspected mbrs)
39
int max_tries=2; // number of times to send a are-you-alive msg (tot time= max_tries*timeout)
40

41
42     public String JavaDoc getName() {
43         return "FD_PROB";
44     }
45
46
47     public boolean setProperties(Properties JavaDoc props) {
48         String JavaDoc str;
49
50         super.setProperties(props);
51         str=props.getProperty("timeout");
52         if(str != null) {
53             timeout=Long.parseLong(str);
54             props.remove("timeout");
55         }
56
57         str=props.getProperty("gossip_interval");
58         if(str != null) {
59             gossip_interval=Long.parseLong(str);
60             props.remove("gossip_interval");
61         }
62
63         str=props.getProperty("max_tries");
64         if(str != null) {
65             max_tries=Integer.parseInt(str);
66             props.remove("max_tries");
67         }
68
69         if(props.size() > 0) {
70             System.err.println("FD_PROB.setProperties(): the following properties are not recognized:");
71             props.list(System.out);
72             return false;
73         }
74         return true;
75     }
76
77
78     public void start() throws Exception JavaDoc {
79         if(hb == null) {
80             hb=new Thread JavaDoc(this, "FD_PROB.HeartbeatThread");
81             hb.setDaemon(true);
82             hb.start();
83         }
84     }
85
86
87     public void stop() {
88         Thread JavaDoc tmp=null;
89         if(hb != null && hb.isAlive()) {
90             tmp=hb;
91             hb=null;
92             tmp.interrupt();
93             try {
94                 tmp.join(timeout);
95             }
96             catch(Exception JavaDoc ex) {
97             }
98         }
99         hb=null;
100     }
101
102
103     public void up(Event evt) {
104         Message msg;
105         Address hb_sender;
106         FdHeader hdr=null;
107         Object JavaDoc obj;
108
109         switch(evt.getType()) {
110
111             case Event.SET_LOCAL_ADDRESS:
112                 local_addr=(Address) evt.getArg();
113                 break;
114
115             case Event.MSG:
116                 msg=(Message) evt.getArg();
117                 obj=msg.getHeader(getName());
118                 if(obj == null || !(obj instanceof FdHeader)) {
119                     updateCounter(msg.getSrc()); // got a msg from this guy, reset its time (we heard from it now)
120
break;
121                 }
122
123                 hdr=(FdHeader) msg.removeHeader(getName());
124                 switch(hdr.type) {
125                     case FdHeader.HEARTBEAT: // heartbeat request; send heartbeat ack
126
if(checkPingerValidity(msg.getSrc()) == false) // false == sender of heartbeat is not a member
127
return;
128
129                         // 2. Update my own array of counters
130

131                             if(log.isInfoEnabled()) log.info("<-- HEARTBEAT from " + msg.getSrc());
132                         updateCounters(hdr);
133                         return; // don't pass up !
134
case FdHeader.NOT_MEMBER:
135                         if(log.isWarnEnabled()) log.warn("NOT_MEMBER: I'm being shunned; exiting");
136                         passUp(new Event(Event.EXIT));
137                         return;
138                     default:
139                         if(log.isWarnEnabled()) log.warn("FdHeader type " + hdr.type + " not known");
140                         return;
141                 }
142         }
143         passUp(evt); // pass up to the layer above us
144
}
145
146
147     public void down(Event evt) {
148         Message msg;
149         int num_mbrs;
150         Vector JavaDoc excluded_mbrs;
151         FdEntry entry;
152         Address mbr;
153
154         switch(evt.getType()) {
155
156             // Start heartbeat thread when we have more than 1 member; stop it when membership drops below 2
157
case Event.VIEW_CHANGE:
158                 passDown(evt);
159                 synchronized(this) {
160                     View v=(View) evt.getArg();
161
162                     // mark excluded members
163
excluded_mbrs=computeExcludedMembers(members, v.getMembers());
164                     if(excluded_mbrs != null && excluded_mbrs.size() > 0) {
165                         for(int i=0; i < excluded_mbrs.size(); i++) {
166                             mbr=(Address) excluded_mbrs.elementAt(i);
167                             entry=(FdEntry) counters.get(mbr);
168                             if(entry != null)
169                                 entry.setExcluded(true);
170                         }
171                     }
172
173                     members=v != null ? v.getMembers() : null;
174                     if(members != null) {
175                         num_mbrs=members.size();
176                         if(num_mbrs >= 2) {
177                             if(hb == null) {
178                                 try {
179                                     start();
180                                 }
181                                 catch(Exception JavaDoc ex) {
182                                     if(log.isWarnEnabled()) log.warn("exception when calling start(): " + ex);
183                                 }
184                             }
185                         }
186                         else
187                             stop();
188                     }
189                 }
190                 break;
191
192             default:
193                 passDown(evt);
194                 break;
195         }
196     }
197
198
199     /**
200      Loop while more than 1 member available. Choose a member randomly (not myself !) and send a
201      heartbeat. Wait for ack. If ack not received withing timeout, mcast SUSPECT message.
202      */

203     public void run() {
204         Message hb_msg;
205         FdHeader hdr;
206         Address hb_dest, key;
207         FdEntry entry;
208         long curr_time, diff;
209
210
211
212             if(log.isInfoEnabled()) log.info("heartbeat thread was started");
213
214         while(hb != null && members.size() > 1) {
215
216             // 1. Get a random member P (excluding ourself)
217
hb_dest=getHeartbeatDest();
218             if(hb_dest == null) {
219                 if(log.isWarnEnabled()) log.warn("hb_dest is null");
220                 Util.sleep(gossip_interval);
221                 continue;
222             }
223
224
225             // 2. Increment own counter
226
entry=(FdEntry) counters.get(local_addr);
227             if(entry == null) {
228                 entry=new FdEntry();
229                 counters.put(local_addr, entry);
230             }
231             entry.incrementCounter();
232
233
234             // 3. Send heartbeat to P
235
hdr=createHeader();
236             if(hdr == null)
237                 if(log.isWarnEnabled()) log.warn("header could not be created. Heartbeat will not be sent");
238             else {
239                 hb_msg=new Message(hb_dest, null, null);
240                 hb_msg.putHeader(getName(), hdr);
241
242                     if(log.isInfoEnabled()) log.info("--> HEARTBEAT to " + hb_dest);
243                 passDown(new Event(Event.MSG, hb_msg));
244             }
245
246
247                 if(log.isInfoEnabled()) log.info("own counters are " + printCounters());
248
249
250             // 4. Suspect members from which we haven't heard for timeout msecs
251
for(Enumeration JavaDoc e=counters.keys(); e.hasMoreElements();) {
252                 curr_time=System.currentTimeMillis();
253                 key=(Address) e.nextElement();
254                 entry=(FdEntry) counters.get(key);
255
256                 if(entry.getTimestamp() > 0 && (diff=curr_time - entry.getTimestamp()) >= timeout) {
257                     if(entry.excluded()) {
258                         if(diff >= 2 * timeout) { // remove members marked as 'excluded' after 2*timeout msecs
259
counters.remove(key);
260
261                                 if(log.isInfoEnabled()) log.info("removed " + key);
262                             continue;
263                         }
264                     }
265                     else {
266
267                             if(log.isInfoEnabled()) log.info("suspecting " + key);
268                         passUp(new Event(Event.SUSPECT, key));
269                     }
270                 }
271             }
272             Util.sleep(gossip_interval);
273         } // end while
274

275
276             if(log.isInfoEnabled()) log.info("heartbeat thread was stopped");
277     }
278
279
280
281
282
283
284
285     /* -------------------------------- Private Methods ------------------------------- */
286
287     Address getHeartbeatDest() {
288         Address retval=null;
289         int r, size;
290         Vector JavaDoc members_copy;
291
292         if(members == null || members.size() < 2 || local_addr == null)
293             return null;
294         members_copy=(Vector JavaDoc) members.clone();
295         members_copy.removeElement(local_addr); // don't select myself as heartbeat destination
296
size=members_copy.size();
297         r=((int) (Math.random() * (size + 1))) % size;
298         retval=(Address) members_copy.elementAt(r);
299         return retval;
300     }
301
302
303     /** Create a header containing the counters for all members */
304     FdHeader createHeader() {
305         int num_mbrs=counters.size(), index=0;
306         FdHeader ret=null;
307         Address key;
308         FdEntry entry;
309
310         if(num_mbrs <= 0)
311             return null;
312         ret=new FdHeader(FdHeader.HEARTBEAT, num_mbrs);
313         for(Enumeration JavaDoc e=counters.keys(); e.hasMoreElements();) {
314             key=(Address) e.nextElement();
315             entry=(FdEntry) counters.get(key);
316             if(entry.excluded())
317                 continue;
318             if(index >= ret.members.length) {
319                 if(log.isWarnEnabled()) log.warn("index " + index + " is out of bounds (" +
320                                                      ret.members.length + ')');
321                 break;
322             }
323             ret.members[index]=key;
324             ret.counters[index]=entry.getCounter();
325             index++;
326         }
327         return ret;
328     }
329
330
331     /** Set my own counters values to max(own-counter, counter) */
332     void updateCounters(FdHeader hdr) {
333         Address key;
334         long counter;
335         FdEntry entry;
336
337         if(hdr == null || hdr.members == null || hdr.counters == null) {
338             if(log.isWarnEnabled()) log.warn("hdr is null or contains no counters");
339             return;
340         }
341
342         for(int i=0; i < hdr.members.length; i++) {
343             key=hdr.members[i];
344             if(key == null) continue;
345             entry=(FdEntry) counters.get(key);
346             if(entry == null) {
347                 entry=new FdEntry(hdr.counters[i]);
348                 counters.put(key, entry);
349                 continue;
350             }
351
352             if(entry.excluded())
353                 continue;
354
355             // only update counter (and adjust timestamp) if new counter is greater then old one
356
entry.setCounter(Math.max(entry.getCounter(), hdr.counters[i]));
357         }
358     }
359
360
361     /** Resets the counter for mbr */
362     void updateCounter(Address mbr) {
363         FdEntry entry;
364
365         if(mbr == null) return;
366         entry=(FdEntry) counters.get(mbr);
367         if(entry != null)
368             entry.setTimestamp();
369     }
370
371
372     String JavaDoc printCounters() {
373         StringBuffer JavaDoc sb=new StringBuffer JavaDoc();
374         Address mbr;
375         FdEntry entry;
376
377         for(Enumeration JavaDoc e=counters.keys(); e.hasMoreElements();) {
378             mbr=(Address) e.nextElement();
379             entry=(FdEntry) counters.get(mbr);
380             sb.append("\n" + mbr + ": " + entry._toString());
381         }
382         return sb.toString();
383     }
384
385
386     Vector JavaDoc computeExcludedMembers(Vector JavaDoc old_mbrship, Vector JavaDoc new_mbrship) {
387         Vector JavaDoc ret=new Vector JavaDoc();
388         if(old_mbrship == null || new_mbrship == null) return ret;
389         for(int i=0; i < old_mbrship.size(); i++)
390             if(!new_mbrship.contains(old_mbrship.elementAt(i)))
391                 ret.addElement(old_mbrship.elementAt(i));
392         return ret;
393     }
394
395
396     /** If hb_sender is not a member, send a SUSPECT to sender (after n pings received) */
397     boolean checkPingerValidity(Object JavaDoc hb_sender) {
398         int num_pings=0;
399         Message shun_msg;
400         Header hdr;
401
402         if(hb_sender != null && members != null && !members.contains(hb_sender)) {
403             if(invalid_pingers.containsKey(hb_sender)) {
404                 num_pings=((Integer JavaDoc) invalid_pingers.get(hb_sender)).intValue();
405                 if(num_pings >= max_tries) {
406                     if(log.isErrorEnabled()) log.error("sender " + hb_sender +
407                                                                   " is not member in " + members + " ! Telling it to leave group");
408                     shun_msg=new Message((Address) hb_sender, null, null);
409                     hdr=new FdHeader(FdHeader.NOT_MEMBER);
410                     shun_msg.putHeader(getName(), hdr);
411                     passDown(new Event(Event.MSG, shun_msg));
412                     invalid_pingers.remove(hb_sender);
413                 }
414                 else {
415                     num_pings++;
416                     invalid_pingers.put(hb_sender, new Integer JavaDoc(num_pings));
417                 }
418             }
419             else {
420                 num_pings++;
421                 invalid_pingers.put(hb_sender, new Integer JavaDoc(num_pings));
422             }
423             return false;
424         }
425         else
426             return true;
427     }
428
429
430     /* ----------------------------- End of Private Methods --------------------------- */
431
432
433
434
435
436
437     public static class FdHeader extends Header {
438         static final int HEARTBEAT=1; // sent periodically to a random member
439
static final int NOT_MEMBER=2; // sent to the sender, when it is not a member anymore (shunned)
440

441
442         int type=HEARTBEAT;
443         Address[] members=null;
444         long[] counters=null; // correlates with 'members' (same indexes)
445

446
447         public FdHeader() {
448         } // used for externalization
449

450         FdHeader(int type) {
451             this.type=type;
452         }
453
454         FdHeader(int type, int num_elements) {
455             this(type);
456             members=new Address[num_elements];
457             counters=new long[num_elements];
458         }
459
460
461         public String JavaDoc toString() {
462             switch(type) {
463                 case HEARTBEAT:
464                     return "[FD_PROB: HEARTBEAT]";
465                 case NOT_MEMBER:
466                     return "[FD_PROB: NOT_MEMBER]";
467                 default:
468                     return "[FD_PROB: unknown type (" + type + ")]";
469             }
470         }
471
472         public String JavaDoc printDetails() {
473             StringBuffer JavaDoc sb=new StringBuffer JavaDoc();
474             Address mbr;
475             long c;
476
477             if(members != null && counters != null)
478                 for(int i=0; i < members.length; i++) {
479                     mbr=members[i];
480                     if(mbr == null)
481                         sb.append("\n<null>");
482                     else
483                         sb.append("\n" + mbr);
484                     sb.append(": " + counters[i]);
485                 }
486             return sb.toString();
487         }
488
489
490         public void writeExternal(ObjectOutput JavaDoc out) throws IOException JavaDoc {
491             out.writeInt(type);
492
493             if(members != null) {
494                 out.writeInt(members.length);
495                 out.writeObject(members);
496             }
497             else
498                 out.writeInt(0);
499
500             if(counters != null) {
501                 out.writeInt(counters.length);
502                 for(int i=0; i < counters.length; i++)
503                     out.writeLong(counters[i]);
504             }
505             else
506                 out.writeInt(0);
507         }
508
509
510         public void readExternal(ObjectInput JavaDoc in) throws IOException JavaDoc, ClassNotFoundException JavaDoc {
511             int num;
512             type=in.readInt();
513
514             num=in.readInt();
515             if(num == 0)
516                 members=null;
517             else {
518                 members=(Address[]) in.readObject();
519             }
520
521             num=in.readInt();
522             if(num == 0)
523                 counters=null;
524             else {
525                 counters=new long[num];
526                 for(int i=0; i < counters.length; i++)
527                     counters[i]=in.readLong();
528             }
529         }
530
531
532     }
533
534
535     private static class FdEntry {
536         private long counter=0; // heartbeat counter
537
private long timestamp=0; // last time the counter was incremented
538
private boolean excluded=false; // set to true if member was excluded from group
539

540
541         FdEntry() {
542
543         }
544
545         FdEntry(long counter) {
546             this.counter=counter;
547             timestamp=System.currentTimeMillis();
548         }
549
550
551         long getCounter() {
552             return counter;
553         }
554
555         long getTimestamp() {
556             return timestamp;
557         }
558
559         boolean excluded() {
560             return excluded;
561         }
562
563
564         synchronized void setCounter(long new_counter) {
565             if(new_counter > counter) { // only set time if counter was incremented
566
timestamp=System.currentTimeMillis();
567                 counter=new_counter;
568             }
569         }
570
571         synchronized void incrementCounter() {
572             counter++;
573             timestamp=System.currentTimeMillis();
574         }
575
576         synchronized void setTimestamp() {
577             timestamp=System.currentTimeMillis();
578         }
579
580         synchronized void setExcluded(boolean flag) {
581             excluded=flag;
582         }
583
584
585         public String JavaDoc toString() {
586             return "counter=" + counter + ", timestamp=" + timestamp + ", excluded=" + excluded;
587         }
588
589         public String JavaDoc _toString() {
590             return "counter=" + counter + ", age=" + (System.currentTimeMillis() - timestamp) +
591                     ", excluded=" + excluded;
592         }
593     }
594
595
596 }
597
Popular Tags