KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > protocols > VERIFY_SUSPECT


// $Id: VERIFY_SUSPECT.java,v 1.13 2005/04/22 15:57:25 belaban Exp $
2

3 package org.jgroups.protocols;
4
5 import org.jgroups.Address;
6 import org.jgroups.Event;
7 import org.jgroups.Header;
8 import org.jgroups.Message;
9 import org.jgroups.stack.Protocol;
10 import org.jgroups.util.Util;
11 import org.jgroups.util.Streamable;
12
13 import java.io.*;
14 import java.util.Enumeration JavaDoc;
15 import java.util.Hashtable JavaDoc;
16 import java.util.Properties JavaDoc;
17 import java.util.Vector JavaDoc;
18
19
20 /**
21  * Catches SUSPECT events traveling up the stack. Verifies that the suspected member is really dead. If yes,
22  * passes SUSPECT event up the stack, otherwise discards it. Has to be placed somewhere above the FD layer and
23  * below the GMS layer (receiver of the SUSPECT event). Note that SUSPECT events may be reordered by this protocol.
24  */

25 public class VERIFY_SUSPECT extends Protocol implements Runnable JavaDoc {
26     Address local_addr=null;
27     long timeout=2000; // number of millisecs to wait for an are-you-dead msg
28
int num_msgs=1; // number of are-you-alive msgs and i-am-not-dead responses (for redundancy)
29
final Vector JavaDoc members=null;
30     final Hashtable JavaDoc suspects=new Hashtable JavaDoc(); // keys=Addresses, vals=time in mcses since added
31
Thread JavaDoc timer=null;
32     final String JavaDoc name="VERIFY_SUSPECT";
33
34
35     public String JavaDoc getName() {
36         return name;
37     }
38
39
40     public boolean setProperties(Properties JavaDoc props) {
41         String JavaDoc str;
42
43         super.setProperties(props);
44         str=props.getProperty("timeout");
45         if(str != null) {
46             timeout=Long.parseLong(str);
47             props.remove("timeout");
48         }
49
50         str=props.getProperty("num_msgs");
51         if(str != null) {
52             num_msgs=Integer.parseInt(str);
53             if(num_msgs <= 0) {
54                 if(log.isWarnEnabled()) log.warn("num_msgs is invalid (" +
55                         num_msgs + "): setting it to 1");
56                 num_msgs=1;
57             }
58             props.remove("num_msgs");
59         }
60
61         if(props.size() > 0) {
62             System.err.println("VERIFY_SUSPECT.setProperties(): the following properties are not recognized:");
63             props.list(System.out);
64             return false;
65         }
66         return true;
67     }
68
69
70     public void up(Event evt) {
71         Address suspected_mbr;
72         Message msg, rsp;
73         Object JavaDoc obj;
74         VerifyHeader hdr;
75
76         switch(evt.getType()) {
77
78         case Event.SET_LOCAL_ADDRESS:
79             local_addr=(Address)evt.getArg();
80             break;
81
82         case Event.SUSPECT: // it all starts here ...
83
suspected_mbr=(Address)evt.getArg();
84             if(suspected_mbr == null) {
85                 if(log.isErrorEnabled()) log.error("suspected member is null");
86                 return;
87             }
88             suspect(suspected_mbr);
89             return; // don't pass up; we will decide later (after verification) whether to pass it up
90

91
92         case Event.MSG:
93             msg=(Message)evt.getArg();
94             obj=msg.getHeader(name);
95             if(obj == null || !(obj instanceof VerifyHeader))
96                 break;
97             hdr=(VerifyHeader)msg.removeHeader(name);
98             switch(hdr.type) {
99             case VerifyHeader.ARE_YOU_DEAD:
100                 if(hdr.from == null) {
101                     if(log.isErrorEnabled()) log.error("ARE_YOU_DEAD: hdr.from is null");
102                 }
103                 else {
104                     for(int i=0; i < num_msgs; i++) {
105                         rsp=new Message(hdr.from, null, null);
106                         rsp.putHeader(name, new VerifyHeader(VerifyHeader.I_AM_NOT_DEAD, local_addr));
107                         passDown(new Event(Event.MSG, rsp));
108                     }
109                 }
110                 return;
111             case VerifyHeader.I_AM_NOT_DEAD:
112                 if(hdr.from == null) {
113                     if(log.isErrorEnabled()) log.error("I_AM_NOT_DEAD: hdr.from is null");
114                     return;
115                 }
116                 unsuspect(hdr.from);
117                 return;
118             }
119             return;
120         }
121         passUp(evt);
122     }
123
124
125     /**
126      * Will be started when a suspect is added to the suspects hashtable. Continually iterates over the
127      * entries and removes entries whose time have elapsed. For each removed entry, a SUSPECT event is passed
128      * up the stack (because elapsed time means verification of member's liveness failed). Computes the shortest
129      * time to wait (min of all timeouts) and waits(time) msecs. Will be woken up when entry is removed (in case
130      * of successful verification of that member's liveness). Terminates when no entry remains in the hashtable.
131      */

132     public void run() {
133         Address mbr;
134         long val, curr_time, diff;
135
136         while(timer != null && Thread.currentThread().equals(timer) && suspects.size() > 0) {
137             diff=0;
138
139             synchronized(suspects) {
140                 for(Enumeration JavaDoc e=suspects.keys(); e.hasMoreElements();) {
141                     mbr=(Address)e.nextElement();
142                     val=((Long JavaDoc)suspects.get(mbr)).longValue();
143                     curr_time=System.currentTimeMillis();
144                     diff=curr_time - val;
145                     if(diff >= timeout) { // haven't been unsuspected, pass up SUSPECT
146
if(log.isTraceEnabled())
147                             log.trace("diff=" + diff + ", mbr " + mbr + " is dead (passing up SUSPECT event)");
148                         passUp(new Event(Event.SUSPECT, mbr));
149                         suspects.remove(mbr);
150                         continue;
151                     }
152                     diff=Math.max(diff, timeout - diff);
153                 }
154             }
155
156             if(diff > 0)
157                 Util.sleep(diff);
158         }
159         timer=null;
160     }
161
162
163
164     /* --------------------------------- Private Methods ----------------------------------- */
165
166
167     /**
168      * Sends ARE_YOU_DEAD message to suspected_mbr, wait for return or timeout
169      */

170     void suspect(Address mbr) {
171         Message msg;
172         if(mbr == null) return;
173
174         synchronized(suspects) {
175             if(suspects.containsKey(mbr))
176                 return;
177             suspects.put(mbr, new Long JavaDoc(System.currentTimeMillis()));
178             if(log.isTraceEnabled()) log.trace("verifying that " + mbr + " is dead");
179             for(int i=0; i < num_msgs; i++) {
180                 msg=new Message(mbr, null, null);
181                 msg.putHeader(name, new VerifyHeader(VerifyHeader.ARE_YOU_DEAD, local_addr));
182                 passDown(new Event(Event.MSG, msg));
183             }
184         }
185         if(timer == null)
186             startTimer();
187     }
188
189     void unsuspect(Address mbr) {
190         if(mbr == null) return;
191         synchronized(suspects) {
192             if(suspects.containsKey(mbr)) {
193                 if(log.isTraceEnabled()) log.trace("member " + mbr + " is not dead !");
194                 suspects.remove(mbr);
195                 passDown(new Event(Event.UNSUSPECT, mbr));
196                 passUp(new Event(Event.UNSUSPECT, mbr));
197             }
198         }
199     }
200
201
202     void startTimer() {
203         if(timer == null || !timer.isAlive()) {
204             timer=new Thread JavaDoc(this, "VERIFY_SUSPECT.TimerThread");
205             timer.setDaemon(true);
206             timer.start();
207         }
208     }
209
210     public void stop() {
211         Thread JavaDoc tmp;
212         if(timer != null && timer.isAlive()) {
213             tmp=timer;
214             timer=null;
215             tmp.interrupt();
216             tmp=null;
217         }
218         timer=null;
219     }
220     /* ----------------------------- End of Private Methods -------------------------------- */
221
222
223
224
225
226     public static class VerifyHeader extends Header implements Streamable {
227         static final short ARE_YOU_DEAD=1; // 'from' is sender of verify msg
228
static final short I_AM_NOT_DEAD=2; // 'from' is suspected member
229

230         short type=ARE_YOU_DEAD;
231         Address from=null; // member who wants to verify that suspected_mbr is dead
232

233
234         public VerifyHeader() {
235         } // used for externalization
236

237         VerifyHeader(short type) {
238             this.type=type;
239         }
240
241         VerifyHeader(short type, Address from) {
242             this(type);
243             this.from=from;
244         }
245
246
247         public String JavaDoc toString() {
248             switch(type) {
249                 case ARE_YOU_DEAD:
250                     return "[VERIFY_SUSPECT: ARE_YOU_DEAD]";
251                 case I_AM_NOT_DEAD:
252                     return "[VERIFY_SUSPECT: I_AM_NOT_DEAD]";
253                 default:
254                     return "[VERIFY_SUSPECT: unknown type (" + type + ")]";
255             }
256         }
257
258         public void writeExternal(ObjectOutput out) throws IOException {
259             out.writeShort(type);
260             out.writeObject(from);
261         }
262
263
264         public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException JavaDoc {
265             type=in.readShort();
266             from=(Address)in.readObject();
267         }
268
269         public void writeTo(DataOutputStream out) throws IOException {
270             out.writeShort(type);
271             Util.writeAddress(from, out);
272         }
273
274         public void readFrom(DataInputStream in) throws IOException, IllegalAccessException JavaDoc, InstantiationException JavaDoc {
275             type=in.readShort();
276             from=Util.readAddress(in);
277         }
278
279     }
280
281
282 }
283
284
Popular Tags