KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > protocols > FD_PID


1 // $Id: FD_PID.java,v 1.6 2004/09/23 16:29:41 belaban Exp $
2

3 package org.jgroups.protocols;
4
5 import org.jgroups.*;
6 import org.jgroups.stack.IpAddress;
7 import org.jgroups.stack.Protocol;
8 import org.jgroups.util.Promise;
9 import org.jgroups.util.TimeScheduler;
10 import org.jgroups.util.Util;
11
12 import java.io.IOException JavaDoc;
13 import java.io.ObjectInput JavaDoc;
14 import java.io.ObjectOutput JavaDoc;
15 import java.net.InetAddress JavaDoc;
16 import java.util.Enumeration JavaDoc;
17 import java.util.Hashtable JavaDoc;
18 import java.util.Properties JavaDoc;
19 import java.util.Vector JavaDoc;
20
21
/**
 * Process-ID based FD protocol. The existence of a process will be tested
 * via the process ID instead of message based pinging. In order to probe a process' existence, the application (or
 * some other protocol layer) has to send down a SET_PID event for the member. The addresses of all members will
 * be associated with their respective PIDs. The PID will be used to probe for the existence of that process.<p>
 * A cache of Addresses and PIDs is maintained in each member, which is adjusted upon reception of view changes.
 * The population of the addr:pid cache is as follows:<br>
 * When a new member joins, it requests the PID cache from the coordinator. Then it broadcasts its own addr:pid
 * association, so all members can update their cache. When a member P is to be pinged by Q, and Q doesn't have
 * P's PID, Q will broadcast a WHO_HAS_PID message, to which all members who have that entry will respond. The
 * latter case should actually never happen because all members should always have consistent caches. However,
 * it is left in the code as a second line of defense.<p>
 * Note that
 * <em>1. The SET_PID has to be sent down after connecting to a channel !</em><p>
 * <em>2. If a process is shunned and subsequently reconnects, the SET_PID event has to be resent !</em><p>
 * <em>3. This protocol only works for groups whose members are on the same host.</em> 'Host' actually means the
 * same IP address (e.g. for multi-homed systems).
 */

40 public class FD_PID extends Protocol {
41     Address ping_dest=null; // address of the member we monitor
42
int ping_pid=0; // PID of the member we monitor
43
Address local_addr=null; // our own address
44
int local_pid=0; // PID of this process
45
long timeout=3000; // msecs to wait for an are-you-alive msg
46
long get_pids_timeout=3000; // msecs to wait for the PID cache from the coordinator
47
final long get_pids_retry_timeout=500; // msecs to wait until we retry fetching the cache from the coord
48
int num_tries=3; // attempts the coord is solicited for PID cache until we give up
49
final Vector JavaDoc members=new Vector JavaDoc(); // list of group members (updated on VIEW_CHANGE)
50
final Hashtable JavaDoc pids=new Hashtable JavaDoc(); // keys=Addresses, vals=Integer (PIDs)
51
boolean own_pid_sent=false; // has own PID been broadcast yet ?
52
final Vector JavaDoc pingable_mbrs=new Vector JavaDoc(); // mbrs from which we select ping_dest. possible subset of 'members'
53
final Promise get_pids_promise=new Promise(); // used for rendezvous on GET_PIDS and GET_PIDS_RSP
54
boolean got_cache_from_coord=false; // was cache already fetched ?
55
TimeScheduler timer=null; // timer for recurring task of liveness pinging
56
Monitor monitor=null; // object that performs the actual monitoring
57

58
59     public String JavaDoc getName() {
60         return "FD_PID";
61     }
62
63
64     public boolean setProperties(Properties JavaDoc props) {
65         String JavaDoc str;
66
67         super.setProperties(props);
68         str=props.getProperty("timeout");
69         if(str != null) {
70             timeout=Long.parseLong(str);
71             props.remove("timeout");
72         }
73
74         str=props.getProperty("get_pids_timeout");
75         if(str != null) {
76             get_pids_timeout=Long.parseLong(str);
77             props.remove("get_pids_timeout");
78         }
79
80         str=props.getProperty("num_tries");
81         if(str != null) {
82             num_tries=Integer.parseInt(str);
83             props.remove("num_tries");
84         }
85
86         if(props.size() > 0) {
87             System.err.println("FD_PID.setProperties(): the following properties are not recognized:");
88             props.list(System.out);
89             return false;
90         }
91         return true;
92     }
93
94
95     public void start() throws Exception JavaDoc {
96         if(stack != null && stack.timer != null)
97             timer=stack.timer;
98         else {
99             if(log.isWarnEnabled()) log.warn("TimeScheduler in protocol stack is null (or protocol stack is null)");
100             return;
101         }
102
103         if(monitor != null && monitor.started == false) {
104             monitor=null;
105         }
106         if(monitor == null) {
107             monitor=new Monitor();
108             timer.add(monitor, true); // fixed-rate scheduling
109
}
110     }
111
112     public void stop() {
113         if(monitor != null) {
114             monitor.stop();
115             monitor=null;
116         }
117     }
118
119
120     public void up(Event evt) {
121         Message msg;
122         FdHeader hdr=null;
123         Object JavaDoc tmphdr;
124
125         switch(evt.getType()) {
126
127             case Event.SET_LOCAL_ADDRESS:
128                 local_addr=(Address)evt.getArg();
129                 break;
130
131             case Event.MSG:
132                 msg=(Message)evt.getArg();
133                 tmphdr=msg.getHeader(getName());
134                 if(tmphdr == null || !(tmphdr instanceof FdHeader))
135                     break; // message did not originate from FD_PID layer, just pass up
136

137                 hdr=(FdHeader)msg.removeHeader(getName());
138
139                 switch(hdr.type) {
140
141                     case FdHeader.SUSPECT:
142                         if(hdr.mbr != null) {
143
144                             if(log.isInfoEnabled()) log.info("[SUSPECT] hdr: " + hdr);
145                             passUp(new Event(Event.SUSPECT, hdr.mbr));
146                             passDown(new Event(Event.SUSPECT, hdr.mbr));
147                         }
148                         break;
149
150                         // If I have the PID for the address 'hdr.mbr', return it. Otherwise look it up in my cache and return it
151
case FdHeader.WHO_HAS_PID:
152                         if(local_addr != null && local_addr.equals(msg.getSrc()))
153                             return; // don't reply to WHO_HAS bcasts sent by me !
154

155                         if(hdr.mbr == null) {
156                             if(log.isErrorEnabled()) log.error("[WHO_HAS_PID] hdr.mbr is null");
157                             return;
158                         }
159
160                         // 1. Try my own address, maybe it's me whose PID is wanted
161
if(local_addr != null && local_addr.equals(hdr.mbr) && local_pid > 0) {
162                             sendIHavePidMessage(msg.getSrc(), hdr.mbr, local_pid); // unicast message to msg.getSrc()
163
return;
164                         }
165
166                         // 2. If I don't have it, maybe it is in the cache
167
if(pids.containsKey(hdr.mbr))
168                             sendIHavePidMessage(msg.getSrc(), hdr.mbr, ((Integer JavaDoc)pids.get(hdr.mbr)).intValue()); // ucast msg
169
break;
170
171
172                         // Update the cache with the add:pid entry (if on the same host)
173
case FdHeader.I_HAVE_PID:
174
175                         if(log.isInfoEnabled()) log.info("i-have pid: " + hdr.mbr + " --> " + hdr.pid);
176
177                         if(hdr.mbr == null || hdr.pid <= 0) {
178                             if(log.isErrorEnabled()) log.error("[I_HAVE_PID] hdr.mbr is null or hdr.pid == 0");
179                             return;
180                         }
181
182                         if(!sameHost(local_addr, hdr.mbr)) {
183                             if(log.isErrorEnabled())
184                                 log.error(hdr.mbr + " is not on the same host as I (" +
185                                         local_addr + ", discarding I_HAVE_PID event");
186                             return;
187                         }
188
189                         // if(!pids.containsKey(hdr.mbr))
190
pids.put(hdr.mbr, new Integer JavaDoc(hdr.pid)); // update the cache
191

192                         if(log.isInfoEnabled()) log.info("[" + local_addr + "]: the cache is " + pids);
193
194                         if(ping_pid <= 0 && ping_dest != null && pids.containsKey(ping_dest)) {
195                             ping_pid=((Integer JavaDoc)pids.get(ping_dest)).intValue();
196                             try {
197                                 start();
198                             }
199                             catch(Exception JavaDoc ex) {
200                                 if(log.isWarnEnabled()) log.warn("exception when calling start(): " + ex);
201                             }
202                         }
203                         break;
204
205                         // Return the cache to the sender of this message
206
case FdHeader.GET_PIDS:
207                         if(hdr.mbr == null) {
208
209                             if(log.isErrorEnabled()) log.error("[GET_PIDS]: hdr.mbr == null");
210                             return;
211                         }
212                         hdr=new FdHeader(FdHeader.GET_PIDS_RSP);
213                         hdr.pids=(Hashtable JavaDoc)pids.clone();
214                         msg=new Message(hdr.mbr, null, null);
215                         msg.putHeader(getName(), hdr);
216                         passDown(new Event(Event.MSG, msg));
217                         break;
218
219                     case FdHeader.GET_PIDS_RSP:
220                         if(hdr.pids == null) {
221
222                             if(log.isErrorEnabled()) log.error("[GET_PIDS_RSP]: cache is null");
223                             return;
224                         }
225                         get_pids_promise.setResult(hdr.pids);
226                         break;
227                 }
228                 return;
229         }
230
231         passUp(evt); // pass up to the layer above us
232
}
233
234
235     public void down(Event evt) {
236         Integer JavaDoc pid;
237         Address mbr, tmp_ping_dest;
238         View v;
239
240
241         switch(evt.getType()) {
242
243             case Event.SET_PID:
244                 // 1. Set the PID for local_addr
245
pid=(Integer JavaDoc)evt.getArg();
246                 if(pid == null) {
247                     if(log.isErrorEnabled()) log.error("SET_PID did not contain a pid !");
248                     return;
249                 }
250                 local_pid=pid.intValue();
251
252                 if(log.isInfoEnabled()) log.info("local_pid=" + local_pid);
253                 break;
254
255             case Event.VIEW_CHANGE:
256                 synchronized(this) {
257                     v=(View)evt.getArg();
258                     members.removeAllElements();
259                     members.addAll(v.getMembers());
260                     pingable_mbrs.removeAllElements();
261                     pingable_mbrs.addAll(members);
262                     passDown(evt);
263
264
265                     // 1. Get the addr:pid cache from the coordinator (only if not already fetched)
266
if(!got_cache_from_coord) {
267                         getPidsFromCoordinator();
268                         got_cache_from_coord=true;
269                     }
270
271
272                     // 2. Broadcast my own addr:pid to all members so they can update their cache
273
if(!own_pid_sent) {
274                         if(local_pid > 0) {
275                             sendIHavePidMessage(null, // send to all members
276
local_addr,
277                                     local_pid);
278                             own_pid_sent=true;
279                         }
280                         else
281                             if(log.isWarnEnabled()) log.warn("[VIEW_CHANGE]: local_pid == 0");
282                     }
283
284                     // 3. Remove all entries in 'pids' which are not in the new membership
285
if(members != null) {
286                         for(Enumeration JavaDoc e=pids.keys(); e.hasMoreElements();) {
287                             mbr=(Address)e.nextElement();
288                             if(!members.contains(mbr))
289                                 pids.remove(mbr);
290                         }
291                     }
292                     tmp_ping_dest=determinePingDest();
293                     ping_pid=0;
294                     if(tmp_ping_dest == null) {
295                         stop();
296                         ping_dest=null;
297                     }
298                     else {
299                         ping_dest=tmp_ping_dest;
300                         try {
301                             start();
302                         }
303                         catch(Exception JavaDoc ex) {
304                             if(log.isWarnEnabled()) log.warn("exception when calling start(): " + ex);
305                         }
306                     }
307                 }
308                 break;
309
310             default:
311                 passDown(evt);
312                 break;
313         }
314     }
315
316
317
318
319
320
321
322     /* ----------------------------------- Private Methods -------------------------------------- */
323
324
325
326     /**
327      * Determines coordinator C. If C is null and we are the first member, return. Else loop: send GET_PIDS message
328      * to coordinator and wait for GET_PIDS_RSP response. Loop until valid response has been received.
329      */

330     void getPidsFromCoordinator() {
331         Address coord;
332         int attempts=num_tries;
333         Message msg;
334         FdHeader hdr;
335         Hashtable JavaDoc result;
336
337         get_pids_promise.reset();
338         while(attempts > 0) {
339             if((coord=determineCoordinator()) != null) {
340                 if(coord.equals(local_addr)) { // we are the first member --> empty cache
341

342                     if(log.isInfoEnabled()) log.info("first member; cache is empty");
343                     return;
344                 }
345                 hdr=new FdHeader(FdHeader.GET_PIDS);
346                 hdr.mbr=local_addr;
347                 msg=new Message(coord, null, null);
348                 msg.putHeader(getName(), hdr);
349                 passDown(new Event(Event.MSG, msg));
350                 result=(Hashtable JavaDoc)get_pids_promise.getResult(get_pids_timeout);
351                 if(result != null) {
352                     pids.putAll(result); // replace all entries (there should be none !) in pids with the new values
353

354                     if(log.isInfoEnabled())
355                         log.info("got cache from " +
356                                 coord + ": cache is " + pids);
357                     return;
358                 }
359                 else {
360
361                     if(log.isErrorEnabled()) log.error("received null cache; retrying");
362                 }
363             }
364
365             Util.sleep(get_pids_retry_timeout);
366             --attempts;
367         }
368     }
369
370
371     void broadcastSuspectMessage(Address suspected_mbr) {
372         Message suspect_msg;
373         FdHeader hdr;
374
375
376         if(log.isInfoEnabled())
377             log.info("suspecting " + suspected_mbr +
378                     " (own address is " + local_addr + ')');
379
380         hdr=new FdHeader(FdHeader.SUSPECT);
381         hdr.mbr=suspected_mbr;
382         suspect_msg=new Message(); // mcast SUSPECT to all members
383
suspect_msg.putHeader(getName(), hdr);
384         passDown(new Event(Event.MSG, suspect_msg));
385     }
386
387
388     void broadcastWhoHasPidMessage(Address mbr) {
389         Message msg;
390         FdHeader hdr;
391
392         if(local_addr != null && mbr != null)
393             if(log.isInfoEnabled()) log.info("[" + local_addr + "]: who-has " + mbr);
394
395         msg=new Message(); // bcast msg
396
hdr=new FdHeader(FdHeader.WHO_HAS_PID);
397         hdr.mbr=mbr;
398         msg.putHeader(getName(), hdr);
399         passDown(new Event(Event.MSG, msg));
400     }
401
402
403     /**
404      * Sends or broadcasts a I_HAVE_PID response. If 'dst' is null, the reponse will be broadcast, otherwise
405      * it will be unicast back to the requester
406      */

407     void sendIHavePidMessage(Address dst, Address mbr, int pid) {
408         Message msg=new Message(dst, null, null);
409         FdHeader hdr=new FdHeader(FdHeader.I_HAVE_PID);
410         hdr.mbr=mbr;
411         hdr.pid=pid;
412         msg.putHeader(getName(), hdr);
413         passDown(new Event(Event.MSG, msg));
414     }
415
416
417     /**
418      * Set ping_dest and ping_pid. If ping_pid is not known, broadcast a WHO_HAS_PID message.
419      */

420     Address determinePingDest() {
421         Address tmp;
422
423         if(pingable_mbrs == null || pingable_mbrs.size() < 2 || local_addr == null)
424             return null;
425         for(int i=0; i < pingable_mbrs.size(); i++) {
426             tmp=(Address)pingable_mbrs.elementAt(i);
427             if(local_addr.equals(tmp)) {
428                 if(i + 1 >= pingable_mbrs.size())
429                     return (Address)pingable_mbrs.elementAt(0);
430                 else
431                     return (Address)pingable_mbrs.elementAt(i + 1);
432             }
433         }
434         return null;
435     }
436
437
438     Address determineCoordinator() {
439         return members.size() > 0 ? (Address)members.elementAt(0) : null;
440     }
441
442
443     /**
444      * Checks whether 2 Addresses are on the same host
445      */

446     boolean sameHost(Address one, Address two) {
447         InetAddress JavaDoc a, b;
448         String JavaDoc host_a, host_b;
449
450         if(one == null || two == null) return false;
451         if(!(one instanceof IpAddress) || !(two instanceof IpAddress)) {
452             if(log.isErrorEnabled()) log.error("addresses have to be of type IpAddress to be compared");
453             return false;
454         }
455
456         a=((IpAddress)one).getIpAddress();
457         b=((IpAddress)two).getIpAddress();
458         if(a == null || b == null) return false;
459         host_a=a.getHostAddress();
460         host_b=b.getHostAddress();
461         return host_a.equals(host_b);
462     }
463
464
465
466     /* ------------------------------- End of Private Methods ------------------------------------ */
467
468
469     public static class FdHeader extends Header {
470         static final int SUSPECT=10;
471         static final int WHO_HAS_PID=11;
472         static final int I_HAVE_PID=12;
473         static final int GET_PIDS=13; // sent by joining member to coordinator
474
static final int GET_PIDS_RSP=14; // sent by coordinator to joining member in response to GET_PIDS
475

476
477         int type=SUSPECT;
478         Address mbr=null; // set on SUSPECT (suspected mbr), WHO_HAS_PID (requested mbr), I_HAVE_PID
479
int pid=0; // set on I_HAVE_PID
480
Hashtable JavaDoc pids=null; // set on GET_PIDS_RSP
481

482
483         public FdHeader() {
484         } // used for externalization
485

486         FdHeader(int type) {
487             this.type=type;
488         }
489
490
491         public String JavaDoc toString() {
492             StringBuffer JavaDoc sb=new StringBuffer JavaDoc();
493             sb.append(type2String(type));
494             if(mbr != null)
495                 sb.append(", mbr=" + mbr);
496             if(pid > 0)
497                 sb.append(", pid=" + pid);
498             if(pids != null)
499                 sb.append(", pids=" + pids);
500             return sb.toString();
501         }
502
503
504         public static String JavaDoc type2String(int type) {
505             switch(type) {
506                 case SUSPECT:
507                     return "SUSPECT";
508                 case WHO_HAS_PID:
509                     return "WHO_HAS_PID";
510                 case I_HAVE_PID:
511                     return "I_HAVE_PID";
512                 case GET_PIDS:
513                     return "GET_PIDS";
514                 case GET_PIDS_RSP:
515                     return "GET_PIDS_RSP";
516                 default:
517                     return "unknown type (" + type + ')';
518             }
519         }
520
521         public void writeExternal(ObjectOutput JavaDoc out) throws IOException JavaDoc {
522             out.writeInt(type);
523             out.writeObject(mbr);
524             out.writeInt(pid);
525             out.writeObject(pids);
526         }
527
528
529         public void readExternal(ObjectInput JavaDoc in) throws IOException JavaDoc, ClassNotFoundException JavaDoc {
530             type=in.readInt();
531             mbr=(Address)in.readObject();
532             pid=in.readInt();
533             pids=(Hashtable JavaDoc)in.readObject();
534         }
535
536     }
537
538
539     /**
540      * An instance of this class will be added to the TimeScheduler to be scheduled to be run every timeout
541      * msecs. When there is no ping_dest (e.g. only 1 member in the group), this task will be cancelled in
542      * TimeScheduler (and re-scheduled if ping_dest becomes available later).
543      */

544     private class Monitor implements TimeScheduler.Task {
545         boolean started=true;
546
547
548         void stop() {
549             started=false;
550         }
551
552
553         /* -------------------------------------- TimeScheduler.Task Interface -------------------------------- */
554
555         public boolean cancelled() {
556             return !started;
557         }
558
559
560         public long nextInterval() {
561             return timeout;
562         }
563
564
565         /**
566          * Periodically probe for the destination process identified by ping_dest/ping_pid. Suspect the ping_dest
567          * member if /prop/<ping_pid> process does not exist.
568          */

569         public void run() {
570             if(ping_dest == null) {
571                 if(log.isWarnEnabled()) log.warn("ping_dest is null, skipping ping");
572                 return;
573             }
574
575
576             if(log.isInfoEnabled())
577                 log.info("ping_dest=" + ping_dest + ", ping_pid=" + ping_pid +
578                         ", cache=" + pids);
579
580             // If the PID for ping_dest is not known, send a broadcast to solicit it
581
if(ping_pid <= 0) {
582                 if(ping_dest != null && pids.containsKey(ping_dest)) {
583                     ping_pid=((Integer JavaDoc)pids.get(ping_dest)).intValue();
584
585                     if(log.isInfoEnabled())
586                         log.info("found PID for " +
587                                 ping_dest + " in cache (pid=" + ping_pid + ')');
588                 }
589                 else {
590
591                     if(log.isErrorEnabled())
592                         log.error("PID for " + ping_dest + " not known" +
593                                 ", cache is " + pids);
594                     broadcastWhoHasPidMessage(ping_dest);
595                     return;
596                 }
597             }
598
599             if(!Util.fileExists("/proc/" + ping_pid)) {
600
601                 if(log.isInfoEnabled()) log.info("process " + ping_pid + " does not exist");
602                 broadcastSuspectMessage(ping_dest);
603                 pingable_mbrs.removeElement(ping_dest);
604                 ping_dest=determinePingDest();
605                 if(ping_dest == null)
606                     stop();
607                 ping_pid=0;
608             }
609             else {
610
611                 if(log.isInfoEnabled()) log.info(ping_dest + " is alive");
612             }
613         }
614
615         /* ---------------------------------- End of TimeScheduler.Task Interface ---------------------------- */
616
617     }
618
619 }
620
Popular Tags