KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > protocols > FD_ALL


1 package org.jgroups.protocols;
2
3 import org.jgroups.stack.Protocol;
4 import org.jgroups.*;
5 import org.jgroups.annotations.GuardedBy;
6 import org.jgroups.util.*;
7
8 import java.util.*;
9 import java.util.List JavaDoc;
10 import java.util.concurrent.ConcurrentHashMap JavaDoc;
11 import java.util.concurrent.ScheduledFuture JavaDoc;
12 import java.util.concurrent.TimeUnit JavaDoc;
13 import java.util.concurrent.locks.Lock JavaDoc;
14 import java.util.concurrent.locks.ReentrantLock JavaDoc;
15 import java.io.*;
16
17 /**
18  * Failure detection based on simple heartbeat protocol. Every member periodically multicasts a heartbeat. Every member
19  * also maintains a table of all members (minus itself). When data or a heartbeat from P are received, we reset the
20  * timestamp for P to the current time. Periodically, we check for expired members, and suspect those.
21  * @author Bela Ban
22  * @version $Id: FD_ALL.java,v 1.11 2007/05/01 10:55:10 belaban Exp $
23  */

24 public class FD_ALL extends Protocol {
25     /** Map of addresses and timestamps of last updates */
26     Map JavaDoc<Address,Long JavaDoc> timestamps=new ConcurrentHashMap JavaDoc<Address,Long JavaDoc>();
27
28     /** Number of milliseconds after which a HEARTBEAT is sent to the cluster */
29     long interval=3000;
30
31     /** Number of milliseconds after which a node P is suspected if neither a heartbeat nor data were received from P */
32     long timeout=5000;
33
34     /** when a message is received from P, this is treated as if P sent a heartbeat */
35     boolean msg_counts_as_heartbeat=true;
36
37     Address local_addr=null;
38     final List JavaDoc members=new ArrayList();
39
40     boolean shun=true;
41     TimeScheduler timer=null;
42
43     // task which multicasts HEARTBEAT message after 'interval' ms
44
@GuardedBy("lock")
45     private ScheduledFuture JavaDoc heartbeat_sender_future=null;
46
47     // task which checks for members exceeding timeout and suspects them
48
@GuardedBy("lock")
49     private ScheduledFuture JavaDoc timeout_checker_future=null;
50
51     private boolean tasks_running=false;
52
53     protected int num_heartbeats_sent, num_heartbeats_received=0;
54     protected int num_suspect_events=0;
55
56     final static String JavaDoc name="FD_ALL";
57
58     BoundedList suspect_history=new BoundedList(20);
59     final Map JavaDoc<Address,Integer JavaDoc> invalid_pingers=new HashMap JavaDoc(7); // keys=Address, val=Integer (number of pings from suspected mbrs)
60

61     final Lock JavaDoc lock=new ReentrantLock JavaDoc();
62
63
64
65
66
67     public String JavaDoc getName() {return FD_ALL.name;}
68     public String JavaDoc getLocalAddress() {return local_addr != null? local_addr.toString() : "null";}
69     public String JavaDoc getMembers() {return members != null? members.toString() : "null";}
70     public int getHeartbeatsSent() {return num_heartbeats_sent;}
71     public int getHeartbeatsReceived() {return num_heartbeats_received;}
72     public int getSuspectEventsSent() {return num_suspect_events;}
73     public long getTimeout() {return timeout;}
74     public void setTimeout(long timeout) {this.timeout=timeout;}
75     public long getInterval() {return interval;}
76     public void setInterval(long interval) {this.interval=interval;}
77     public boolean isShun() {return shun;}
78     public void setShun(boolean flag) {this.shun=flag;}
79     public boolean isRunning() {return tasks_running;}
80
81     public String JavaDoc printSuspectHistory() {
82         StringBuilder JavaDoc sb=new StringBuilder JavaDoc();
83         for(Enumeration en=suspect_history.elements(); en.hasMoreElements();) {
84             sb.append(new Date()).append(": ").append(en.nextElement()).append("\n");
85         }
86         return sb.toString();
87     }
88
89     public String JavaDoc printTimestamps() {
90         return printTimeStamps();
91     }
92
93
94     public boolean setProperties(Properties props) {
95         String JavaDoc str;
96
97         super.setProperties(props);
98         str=props.getProperty("timeout");
99         if(str != null) {
100             timeout=Long.parseLong(str);
101             props.remove("timeout");
102         }
103
104         str=props.getProperty("interval");
105         if(str != null) {
106             interval=Long.parseLong(str);
107             props.remove("interval");
108         }
109
110         str=props.getProperty("shun");
111         if(str != null) {
112             shun=Boolean.valueOf(str).booleanValue();
113             props.remove("shun");
114         }
115
116         str=props.getProperty("msg_counts_as_heartbeat");
117         if(str != null) {
118             msg_counts_as_heartbeat=Boolean.valueOf(str).booleanValue();
119             props.remove("msg_counts_as_heartbeat");
120         }
121
122         if(!props.isEmpty()) {
123             log.error("the following properties are not recognized: " + props);
124             return false;
125         }
126         return true;
127     }
128
129     public void resetStats() {
130         num_heartbeats_sent=num_heartbeats_received=num_suspect_events=0;
131         suspect_history.removeAll();
132     }
133
134
135     public void init() throws Exception JavaDoc {
136         if(stack != null && stack.timer != null)
137             timer=stack.timer;
138         else
139             throw new Exception JavaDoc("timer cannot be retrieved from protocol stack");
140     }
141
142
143     public void stop() {
144         stopTasks();
145     }
146
147
148     public Object JavaDoc up(Event evt) {
149         Message msg;
150         Header hdr;
151         Address sender;
152
153         switch(evt.getType()) {
154
155             case Event.SET_LOCAL_ADDRESS:
156                 local_addr=(Address)evt.getArg();
157                 break;
158
159             case Event.MSG:
160                 msg=(Message)evt.getArg();
161                 hdr=(Header)msg.getHeader(name);
162                 if(msg_counts_as_heartbeat)
163                     update(msg.getSrc()); // update when data is received too ? maybe a bit costly
164
if(hdr == null)
165                     break; // message did not originate from FD_ALL layer, just pass up
166

167                 switch(hdr.type) {
168                     case Header.HEARTBEAT: // heartbeat request; send heartbeat ack
169
sender=msg.getSrc();
170                         if(sender.equals(local_addr))
171                             break;
172                         //if(log.isTraceEnabled())
173
// log.trace(local_addr + ": received a heartbeat from " + sender);
174

175                         // 2. Shun the sender of a HEARTBEAT message if that sender is not a member. This will cause
176
// the sender to leave the group (and possibly rejoin it later)
177
if(shun && sender != null && members != null && !members.contains(sender)) {
178                             shunInvalidHeartbeatSender(sender);
179                             break;
180                         }
181
182                         update(sender); // updates the heartbeat entry for 'sender'
183
num_heartbeats_received++;
184                         break; // don't pass up !
185

186                     case Header.SUSPECT:
187                         if(log.isTraceEnabled()) log.trace("[SUSPECT] suspect hdr is " + hdr);
188                         down_prot.down(new Event(Event.SUSPECT, hdr.suspected_mbr));
189                         up_prot.up(new Event(Event.SUSPECT, hdr.suspected_mbr));
190                         break;
191
192                     case Header.NOT_MEMBER:
193                         if(shun) {
194                             if(log.isDebugEnabled()) log.debug("[NOT_MEMBER] I'm being shunned; exiting");
195                             up_prot.up(new Event(Event.EXIT));
196                         }
197                         break;
198                 }
199                 return null;
200         }
201         return up_prot.up(evt); // pass up to the layer above us
202
}
203
204
205
206
207
208
209     public Object JavaDoc down(Event evt) {
210         switch(evt.getType()) {
211             case Event.VIEW_CHANGE:
212                 down_prot.down(evt);
213                 View v=(View)evt.getArg();
214                 handleViewChange(v);
215                 return null;
216         }
217         return down_prot.down(evt);
218     }
219
220
221     private void startTasks() {
222         startHeartbeatSender();
223         startTimeoutChecker();
224         tasks_running=true;
225         if(log.isTraceEnabled())
226             log.trace("started heartbeat sender and timeout checker tasks");
227     }
228
229     private void stopTasks() {
230         stopTimeoutChecker();
231         stopHeartbeatSender();
232         tasks_running=false;
233         if(log.isTraceEnabled())
234             log.trace("stopped heartbeat sender and timeout checker tasks");
235     }
236
237     private void startTimeoutChecker() {
238         lock.lock();
239         try {
240             if(timeout_checker_future == null || timeout_checker_future.isDone()) {
241                 timeout_checker_future=timer.scheduleWithFixedDelay(new TimeoutChecker(), interval, interval, TimeUnit.MILLISECONDS);
242             }
243         }
244         finally {
245             lock.unlock();
246         }
247     }
248
249     private void stopTimeoutChecker() {
250          lock.lock();
251          try {
252              if(timeout_checker_future != null) {
253                  timeout_checker_future.cancel(true);
254                  timeout_checker_future=null;
255              }
256          }
257          finally {
258              lock.unlock();
259          }
260      }
261
262
263     private void startHeartbeatSender() {
264         lock.lock();
265         try {
266             if(heartbeat_sender_future == null || heartbeat_sender_future.isDone()) {
267                 heartbeat_sender_future=timer.scheduleWithFixedDelay(new HeartbeatSender(), interval, interval, TimeUnit.MILLISECONDS);
268             }
269         }
270         finally {
271             lock.unlock();
272         }
273     }
274
275      private void stopHeartbeatSender() {
276         lock.lock();
277         try {
278             if(heartbeat_sender_future != null) {
279                 heartbeat_sender_future.cancel(true);
280                 heartbeat_sender_future=null;
281             }
282         }
283         finally {
284             lock.unlock();
285         }
286     }
287
288
289
290
291
292
293
294
295     private void update(Address sender) {
296         if(sender != null && !sender.equals(local_addr))
297             timestamps.put(sender, Long.valueOf(System.currentTimeMillis()));
298     }
299
300
301     private void handleViewChange(View v) {
302         Vector mbrs=v.getMembers();
303         members.clear();
304         members.addAll(mbrs);
305
306         Set keys=timestamps.keySet();
307         keys.retainAll(mbrs); // remove all nodes which have left the cluster
308
for(Iterator it=mbrs.iterator(); it.hasNext();) { // and add new members
309
Address mbr=(Address)it.next();
310             if(mbr.equals(local_addr))
311                 continue;
312             if(!timestamps.containsKey(mbr)) {
313                 timestamps.put(mbr, Long.valueOf(System.currentTimeMillis()));
314             }
315         }
316
317         invalid_pingers.clear();
318
319         if(!tasks_running && members.size() > 1)
320             startTasks();
321         else if(tasks_running && members.size() < 2)
322             stopTasks();
323     }
324
325
326     /**
327      * If sender is not a member, send a NOT_MEMBER to sender (after n pings received)
328      */

329     private void shunInvalidHeartbeatSender(Address sender) {
330         int num_pings=0;
331         Message shun_msg;
332
333         if(invalid_pingers.containsKey(sender)) {
334             num_pings=invalid_pingers.get(sender).intValue();
335             if(num_pings >= 3) {
336                 if(log.isDebugEnabled())
337                     log.debug(sender + " is not in " + members + " ! Shunning it");
338                 shun_msg=new Message(sender, null, null);
339                 shun_msg.setFlag(Message.OOB);
340                 shun_msg.putHeader(name, new Header(Header.NOT_MEMBER));
341                 down_prot.down(new Event(Event.MSG, shun_msg));
342                 invalid_pingers.remove(sender);
343             }
344             else {
345                 num_pings++;
346                 invalid_pingers.put(sender, new Integer JavaDoc(num_pings));
347             }
348         }
349         else {
350             num_pings++;
351             invalid_pingers.put(sender, Integer.valueOf(num_pings));
352         }
353     }
354
355
356     private String JavaDoc printTimeStamps() {
357         StringBuilder JavaDoc sb=new StringBuilder JavaDoc();
358         Map.Entry JavaDoc<Address,Long JavaDoc> entry;
359         long current_time=System.currentTimeMillis();
360         for(Iterator it=timestamps.entrySet().iterator(); it.hasNext();) {
361             entry=(Map.Entry JavaDoc)it.next();
362             sb.append(entry.getKey()).append(": ");
363             sb.append(current_time - entry.getValue().longValue()).append(" ms old\n");
364         }
365         return sb.toString();
366     }
367
368     void suspect(Address mbr) {
369         Message suspect_msg=new Message();
370         suspect_msg.setFlag(Message.OOB);
371         Header hdr=new Header(Header.SUSPECT, mbr);
372         suspect_msg.putHeader(name, hdr);
373         down_prot.down(new Event(Event.MSG, suspect_msg));
374         num_suspect_events++;
375         suspect_history.add(mbr);
376     }
377
378
379     public static class Header extends org.jgroups.Header implements Streamable {
380         public static final byte HEARTBEAT = 0;
381         public static final byte SUSPECT = 1;
382         public static final byte NOT_MEMBER = 2; // received as response by pinged mbr when we are not a member
383

384
385         byte type=Header.HEARTBEAT;
386         Address suspected_mbr=null;
387
388
389        /** used for externalization */
390         public Header() {
391         }
392
393         public Header(byte type) {
394             this.type=type;
395         }
396
397         public Header(byte type, Address suspect) {
398             this(type);
399             this.suspected_mbr=suspect;
400         }
401
402
403         public String JavaDoc toString() {
404             switch(type) {
405                 case FD_ALL.Header.HEARTBEAT:
406                     return "heartbeat";
407                 case FD_ALL.Header.SUSPECT:
408                     return "SUSPECT (suspected_mbr=" + suspected_mbr + ")";
409                 case FD_ALL.Header.NOT_MEMBER:
410                     return "NOT_MEMBER";
411                 default:
412                     return "unknown type (" + type + ")";
413             }
414         }
415
416         public void writeExternal(ObjectOutput out) throws IOException {
417             out.writeByte(type);
418             out.writeObject(suspected_mbr);
419         }
420
421         public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException JavaDoc {
422             type=in.readByte();
423             suspected_mbr=(Address)in.readObject();
424         }
425
426         public int size() {
427             int retval=Global.BYTE_SIZE; // type
428
retval+=Util.size(suspected_mbr);
429             return retval;
430         }
431
432         public void writeTo(DataOutputStream out) throws IOException {
433             out.writeByte(type);
434             Util.writeAddress(suspected_mbr, out);
435         }
436
437         public void readFrom(DataInputStream in) throws IOException, IllegalAccessException JavaDoc, InstantiationException JavaDoc {
438             type=in.readByte();
439             suspected_mbr=Util.readAddress(in);
440         }
441
442     }
443
444
445     /**
446      * Class which periodically multicasts a HEARTBEAT message to the cluster
447      */

448     class HeartbeatSender implements Runnable JavaDoc {
449
450         public void run() {
451             Message heartbeat=new Message(); // send to all
452
heartbeat.setFlag(Message.OOB);
453             Header hdr=new Header(Header.HEARTBEAT);
454             heartbeat.putHeader(name, hdr);
455             down_prot.down(new Event(Event.MSG, heartbeat));
456             //if(log.isTraceEnabled())
457
// log.trace(local_addr + ": sent heartbeat to cluster");
458
num_heartbeats_sent++;
459         }
460     }
461
462
463     class TimeoutChecker extends HeartbeatSender {
464
465         public void run() {
466             Map.Entry JavaDoc entry;
467             Object JavaDoc key;
468             Long JavaDoc val;
469
470
471             if(log.isTraceEnabled())
472                 log.trace("checking for expired senders, table is:\n" + printTimeStamps());
473
474             long current_time=System.currentTimeMillis(), diff;
475             for(Iterator it=timestamps.entrySet().iterator(); it.hasNext();) {
476                 entry=(Map.Entry JavaDoc)it.next();
477                 key=entry.getKey();
478                 val=(Long JavaDoc)entry.getValue();
479                 if(val == null) {
480                     it.remove();
481                     continue;
482                 }
483                 diff=current_time - val.longValue();
484                 if(diff > timeout) {
485                     if(log.isTraceEnabled())
486                         log.trace("haven't received a heartbeat from " + key + " for " + diff + " ms, suspecting it");
487                     suspect((Address)key);
488                 }
489             }
490         }
491
492
493     }
494
495
496
497
498 }
499
Popular Tags