1 3 package org.jgroups.protocols; 4 5 import org.jgroups.Address; 6 import org.jgroups.Event; 7 import org.jgroups.Header; 8 import org.jgroups.Message; 9 import org.jgroups.stack.Protocol; 10 import org.jgroups.util.Util; 11 import org.jgroups.util.Streamable; 12 13 import java.io.*; 14 import java.util.Enumeration ; 15 import java.util.Hashtable ; 16 import java.util.Properties ; 17 import java.util.Vector ; 18 19 20 25 public class VERIFY_SUSPECT extends Protocol implements Runnable { 26 Address local_addr=null; 27 long timeout=2000; int num_msgs=1; final Vector members=null; 30 final Hashtable suspects=new Hashtable (); Thread timer=null; 32 final String name="VERIFY_SUSPECT"; 33 34 35 public String getName() { 36 return name; 37 } 38 39 40 public boolean setProperties(Properties props) { 41 String str; 42 43 super.setProperties(props); 44 str=props.getProperty("timeout"); 45 if(str != null) { 46 timeout=Long.parseLong(str); 47 props.remove("timeout"); 48 } 49 50 str=props.getProperty("num_msgs"); 51 if(str != null) { 52 num_msgs=Integer.parseInt(str); 53 if(num_msgs <= 0) { 54 if(log.isWarnEnabled()) log.warn("num_msgs is invalid (" + 55 num_msgs + "): setting it to 1"); 56 num_msgs=1; 57 } 58 props.remove("num_msgs"); 59 } 60 61 if(props.size() > 0) { 62 System.err.println("VERIFY_SUSPECT.setProperties(): the following properties are not recognized:"); 63 props.list(System.out); 64 return false; 65 } 66 return true; 67 } 68 69 70 public void up(Event evt) { 71 Address suspected_mbr; 72 Message msg, rsp; 73 Object obj; 74 VerifyHeader hdr; 75 76 switch(evt.getType()) { 77 78 case Event.SET_LOCAL_ADDRESS: 79 local_addr=(Address)evt.getArg(); 80 break; 81 82 case Event.SUSPECT: suspected_mbr=(Address)evt.getArg(); 84 if(suspected_mbr == null) { 85 if(log.isErrorEnabled()) log.error("suspected member is null"); 86 return; 87 } 88 suspect(suspected_mbr); 89 return; 91 92 case Event.MSG: 93 msg=(Message)evt.getArg(); 94 obj=msg.getHeader(name); 95 if(obj == null || !(obj instanceof VerifyHeader)) 96 break; 97 hdr=(VerifyHeader)msg.removeHeader(name); 98 switch(hdr.type) { 99 case VerifyHeader.ARE_YOU_DEAD: 100 if(hdr.from == null) { 101 if(log.isErrorEnabled()) log.error("ARE_YOU_DEAD: hdr.from is null"); 102 } 103 else { 104 for(int i=0; i < num_msgs; i++) { 105 rsp=new Message(hdr.from, null, null); 106 rsp.putHeader(name, new VerifyHeader(VerifyHeader.I_AM_NOT_DEAD, local_addr)); 107 passDown(new Event(Event.MSG, rsp)); 108 } 109 } 110 return; 111 case VerifyHeader.I_AM_NOT_DEAD: 112 if(hdr.from == null) { 113 if(log.isErrorEnabled()) log.error("I_AM_NOT_DEAD: hdr.from is null"); 114 return; 115 } 116 unsuspect(hdr.from); 117 return; 118 } 119 return; 120 } 121 passUp(evt); 122 } 123 124 125 132 public void run() { 133 Address mbr; 134 long val, curr_time, diff; 135 136 while(timer != null && Thread.currentThread().equals(timer) && suspects.size() > 0) { 137 diff=0; 138 139 synchronized(suspects) { 140 for(Enumeration e=suspects.keys(); e.hasMoreElements();) { 141 mbr=(Address)e.nextElement(); 142 val=((Long )suspects.get(mbr)).longValue(); 143 curr_time=System.currentTimeMillis(); 144 diff=curr_time - val; 145 if(diff >= timeout) { if(log.isTraceEnabled()) 147 log.trace("diff=" + diff + ", mbr " + mbr + " is dead (passing up SUSPECT event)"); 148 passUp(new Event(Event.SUSPECT, mbr)); 149 suspects.remove(mbr); 150 continue; 151 } 152 diff=Math.max(diff, timeout - diff); 153 } 154 } 155 156 if(diff > 0) 157 Util.sleep(diff); 158 } 159 timer=null; 160 } 161 162 163 164 165 166 167 170 void suspect(Address mbr) { 171 Message msg; 172 if(mbr == null) return; 173 174 synchronized(suspects) { 175 if(suspects.containsKey(mbr)) 176 return; 177 suspects.put(mbr, new Long (System.currentTimeMillis())); 178 if(log.isTraceEnabled()) log.trace("verifying that " + mbr + " is dead"); 179 for(int i=0; i < num_msgs; i++) { 180 msg=new Message(mbr, null, null); 181 msg.putHeader(name, new VerifyHeader(VerifyHeader.ARE_YOU_DEAD, local_addr)); 182 passDown(new Event(Event.MSG, msg)); 183 } 184 } 185 if(timer == null) 186 startTimer(); 187 } 188 189 void unsuspect(Address mbr) { 190 if(mbr == null) return; 191 synchronized(suspects) { 192 if(suspects.containsKey(mbr)) { 193 if(log.isTraceEnabled()) log.trace("member " + mbr + " is not dead !"); 194 suspects.remove(mbr); 195 passDown(new Event(Event.UNSUSPECT, mbr)); 196 passUp(new Event(Event.UNSUSPECT, mbr)); 197 } 198 } 199 } 200 201 202 void startTimer() { 203 if(timer == null || !timer.isAlive()) { 204 timer=new Thread (this, "VERIFY_SUSPECT.TimerThread"); 205 timer.setDaemon(true); 206 timer.start(); 207 } 208 } 209 210 public void stop() { 211 Thread tmp; 212 if(timer != null && timer.isAlive()) { 213 tmp=timer; 214 timer=null; 215 tmp.interrupt(); 216 tmp=null; 217 } 218 timer=null; 219 } 220 221 222 223 224 225 226 public static class VerifyHeader extends Header implements Streamable { 227 static final short ARE_YOU_DEAD=1; static final short I_AM_NOT_DEAD=2; 230 short type=ARE_YOU_DEAD; 231 Address from=null; 233 234 public VerifyHeader() { 235 } 237 VerifyHeader(short type) { 238 this.type=type; 239 } 240 241 VerifyHeader(short type, Address from) { 242 this(type); 243 this.from=from; 244 } 245 246 247 public String toString() { 248 switch(type) { 249 case ARE_YOU_DEAD: 250 return "[VERIFY_SUSPECT: ARE_YOU_DEAD]"; 251 case I_AM_NOT_DEAD: 252 return "[VERIFY_SUSPECT: I_AM_NOT_DEAD]"; 253 default: 254 return "[VERIFY_SUSPECT: unknown type (" + type + ")]"; 255 } 256 } 257 258 public void writeExternal(ObjectOutput out) throws IOException { 259 out.writeShort(type); 260 out.writeObject(from); 261 } 262 263 264 public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { 265 type=in.readShort(); 266 from=(Address)in.readObject(); 267 } 268 269 public void writeTo(DataOutputStream out) throws IOException { 270 out.writeShort(type); 271 Util.writeAddress(from, out); 272 } 273 274 public void readFrom(DataInputStream in) throws IOException, IllegalAccessException , InstantiationException { 275 type=in.readShort(); 276 from=Util.readAddress(in); 277 } 278 279 } 280 281 282 } 283 284 | Popular Tags |