KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > protocols > FD_SIMPLE


1 // $Id: FD_SIMPLE.java,v 1.7 2004/09/23 16:29:41 belaban Exp $
2

3 package org.jgroups.protocols;
4
5 import org.jgroups.*;
6 import org.jgroups.stack.Protocol;
7 import org.jgroups.util.Promise;
8 import org.jgroups.util.TimeScheduler;
9
10 import java.io.IOException JavaDoc;
11 import java.io.ObjectInput JavaDoc;
12 import java.io.ObjectOutput JavaDoc;
13 import java.util.HashMap JavaDoc;
14 import java.util.Iterator JavaDoc;
15 import java.util.Properties JavaDoc;
16 import java.util.Vector JavaDoc;
17
18
19 /**
20  * Simple failure detection protocol. Periodically sends a are-you-alive message to a randomly chosen member
21  * (excluding itself) and waits for a response. If a response has not been received within timeout msecs, a counter
22  * associated with that member will be incremented. If the counter exceeds max_missed_hbs, that member will be
23  * suspected. When a message or a heartbeat are received, the counter is reset to 0.
24  *
25  * @author Bela Ban Aug 2002
26  * @version $Revision: 1.7 $
27  */

28 public class FD_SIMPLE extends Protocol {
29     Address local_addr=null;
30     TimeScheduler timer=null;
31     HeartbeatTask task=null;
32     long interval=3000; // interval in msecs between are-you-alive messages
33
long timeout=3000; // time (in msecs) to wait for a response to are-you-alive
34
final Vector JavaDoc members=new Vector JavaDoc();
35     final HashMap JavaDoc counters=new HashMap JavaDoc(); // keys=Addresses, vals=Integer (count)
36
int max_missed_hbs=5; // max number of missed responses until a member is suspected
37
static final String JavaDoc name="FD_SIMPLE";
38
39
40     public String JavaDoc getName() {
41         return "FD_SIMPLE";
42     }
43
44     public void init() throws Exception JavaDoc {
45         timer=stack.timer;
46     }
47
48     public boolean setProperties(Properties JavaDoc props) {
49         String JavaDoc str;
50
51         super.setProperties(props);
52         str=props.getProperty("timeout");
53         if(str != null) {
54             timeout=Long.parseLong(str);
55             props.remove("timeout");
56         }
57
58         str=props.getProperty("interval");
59         if(str != null) {
60             interval=Long.parseLong(str);
61             props.remove("interval");
62         }
63
64         str=props.getProperty("max_missed_hbs");
65         if(str != null) {
66             max_missed_hbs=Integer.parseInt(str);
67             props.remove("max_missed_hbs");
68         }
69
70         if(props.size() > 0) {
71             System.err.println("FD_SIMPLE.setProperties(): the following properties are not recognized:");
72             props.list(System.out);
73             return false;
74         }
75         return true;
76     }
77
78
79     public void stop() {
80         if(task != null) {
81             task.stop();
82             task=null;
83         }
84     }
85
86
87     public void up(Event evt) {
88         Message msg, rsp;
89         Address sender;
90         FdHeader hdr=null;
91         Object JavaDoc obj;
92         boolean counter_reset=false;
93
94         switch(evt.getType()) {
95
96             case Event.SET_LOCAL_ADDRESS:
97                 local_addr=(Address)evt.getArg();
98                 break;
99
100             case Event.MSG:
101                 msg=(Message)evt.getArg();
102                 sender=msg.getSrc();
103                 resetCounter(sender);
104                 counter_reset=true;
105
106                 hdr=(FdHeader)msg.removeHeader(name);
107                 if(hdr == null)
108                     break;
109
110                 switch(hdr.type) {
111                     case FdHeader.ARE_YOU_ALIVE: // are-you-alive request, send i-am-alive response
112
rsp=new Message(sender, null, null);
113                         rsp.putHeader(name, new FdHeader(FdHeader.I_AM_ALIVE));
114                         passDown(new Event(Event.MSG, rsp));
115                         return; // don't pass up further
116

117                     case FdHeader.I_AM_ALIVE:
118                         if(log.isInfoEnabled()) log.info("received I_AM_ALIVE response from " + sender);
119                         if(task != null)
120                             task.receivedHeartbeatResponse(sender);
121                         if(!counter_reset)
122                             resetCounter(sender);
123                         return;
124
125                     default:
126                         if(log.isWarnEnabled()) log.warn("FdHeader type " + hdr.type + " not known");
127                         return;
128                 }
129         }
130
131         passUp(evt); // pass up to the layer above us
132
}
133
134
135     public void down(Event evt) {
136         Message msg;
137         int num_mbrs;
138         Address mbr;
139         View new_view;
140         Address key;
141
142         switch(evt.getType()) {
143
144             // Start heartbeat thread when we have more than 1 member; stop it when membership drops below 2
145
case Event.VIEW_CHANGE:
146                 new_view=(View)evt.getArg();
147                 members.clear();
148                 members.addAll(new_view.getMembers());
149                 if(new_view.size() > 1) {
150                     if(task == null) {
151                         task=new HeartbeatTask();
152                         if(log.isInfoEnabled()) log.info("starting heartbeat task");
153                         timer.add(task, true);
154                     }
155                 }
156                 else {
157                     if(task != null) {
158                         if(log.isInfoEnabled()) log.info("stopping heartbeat task");
159                         task.stop(); // will be removed from TimeScheduler
160
task=null;
161                     }
162                 }
163
164                 // remove all keys from 'counters' which are not in this new view
165
for(Iterator JavaDoc it=counters.keySet().iterator(); it.hasNext();) {
166                     key=(Address)it.next();
167                     if(!members.contains(key)) {
168
169                         if(log.isInfoEnabled()) log.info("removing " + key + " from counters");
170                         it.remove();
171                     }
172                 }
173         }
174
175         passDown(evt);
176     }
177     
178
179
180
181
182
183
184
185     /* -------------------------------- Private Methods ------------------------------- */
186     
187     Address getHeartbeatDest() {
188         Address retval=null;
189         int r, size;
190         Vector JavaDoc members_copy;
191
192         if(members == null || members.size() < 2 || local_addr == null)
193             return null;
194         members_copy=(Vector JavaDoc)members.clone();
195         members_copy.removeElement(local_addr); // don't select myself as heartbeat destination
196
size=members_copy.size();
197         r=((int)(Math.random() * (size + 1))) % size;
198         retval=(Address)members_copy.elementAt(r);
199         return retval;
200     }
201
202
203     int incrementCounter(Address mbr) {
204         Integer JavaDoc cnt;
205         int ret=0;
206
207         if(mbr == null) return ret;
208         synchronized(counters) {
209             cnt=(Integer JavaDoc)counters.get(mbr);
210             if(cnt == null) {
211                 cnt=new Integer JavaDoc(0);
212                 counters.put(mbr, cnt);
213             }
214             else {
215                 ret=cnt.intValue() + 1;
216                 counters.put(mbr, new Integer JavaDoc(ret));
217             }
218             return ret;
219         }
220     }
221
222
223     void resetCounter(Address mbr) {
224         if(mbr == null) return;
225
226         synchronized(counters) {
227             counters.put(mbr, new Integer JavaDoc(0));
228         }
229     }
230
231
232     String JavaDoc printCounters() {
233         StringBuffer JavaDoc sb=new StringBuffer JavaDoc();
234         Address key;
235
236         for(Iterator JavaDoc it=counters.keySet().iterator(); it.hasNext();) {
237             key=(Address)it.next();
238             sb.append(key).append(": ").append(counters.get(key)).append('\n');
239         }
240         return sb.toString();
241     }
242
243     /* ----------------------------- End of Private Methods --------------------------- */
244
245
246
247
248
249
250     public static class FdHeader extends Header {
251         static final int ARE_YOU_ALIVE=1; // sent periodically to a random member
252
static final int I_AM_ALIVE=2; // response to above message
253

254
255         int type=ARE_YOU_ALIVE;
256
257         public FdHeader() {
258         } // used for externalization
259

260         FdHeader(int type) {
261             this.type=type;
262         }
263
264
265         public String JavaDoc toString() {
266             switch(type) {
267                 case ARE_YOU_ALIVE:
268                     return "[FD_SIMPLE: ARE_YOU_ALIVE]";
269                 case I_AM_ALIVE:
270                     return "[FD_SIMPLE: I_AM_ALIVE]";
271                 default:
272                     return "[FD_SIMPLE: unknown type (" + type + ")]";
273             }
274         }
275
276
277         public void writeExternal(ObjectOutput JavaDoc out) throws IOException JavaDoc {
278             out.writeInt(type);
279         }
280
281
282         public void readExternal(ObjectInput JavaDoc in) throws IOException JavaDoc, ClassNotFoundException JavaDoc {
283             type=in.readInt();
284         }
285
286
287     }
288
289
290     class HeartbeatTask implements TimeScheduler.Task {
291         boolean stopped=false;
292         final Promise promise=new Promise();
293         Address dest=null;
294
295         void stop() {
296             stopped=true;
297         }
298
299         public boolean cancelled() {
300             return stopped;
301         }
302
303         public long nextInterval() {
304             return interval;
305         }
306
307         public void receivedHeartbeatResponse(Address from) {
308             if(from != null && dest != null && from.equals(dest))
309                 promise.setResult(from);
310         }
311
312         public void run() {
313             Message msg;
314             int num_missed_hbs=0;
315
316             dest=getHeartbeatDest();
317             if(dest == null) {
318                 if(log.isWarnEnabled()) log.warn("heartbeat destination was null, will not send ARE_YOU_ALIVE message");
319                 return;
320             }
321
322             if(log.isInfoEnabled())
323                 log.info("sending ARE_YOU_ALIVE message to " + dest +
324                         ", counters are\n" + printCounters());
325
326             promise.reset();
327             msg=new Message(dest, null, null);
328             msg.putHeader(name, new FdHeader(FdHeader.ARE_YOU_ALIVE));
329             passDown(new Event(Event.MSG, msg));
330
331             promise.getResult(timeout);
332             num_missed_hbs=incrementCounter(dest);
333             if(num_missed_hbs >= max_missed_hbs) {
334
335                 if(log.isInfoEnabled())
336                     log.info("missed " + num_missed_hbs + " from " + dest +
337                             ", suspecting member");
338                 passUp(new Event(Event.SUSPECT, dest));
339             }
340         }
341     }
342
343
344 }
345
Popular Tags