1 3 package org.jgroups.protocols; 4 5 import org.jgroups.*; 6 import org.jgroups.stack.Protocol; 7 import org.jgroups.util.Promise; 8 import org.jgroups.util.TimeScheduler; 9 10 import java.io.IOException ; 11 import java.io.ObjectInput ; 12 import java.io.ObjectOutput ; 13 import java.util.HashMap ; 14 import java.util.Iterator ; 15 import java.util.Properties ; 16 import java.util.Vector ; 17 18 19 28 public class FD_SIMPLE extends Protocol { 29 Address local_addr=null; 30 TimeScheduler timer=null; 31 HeartbeatTask task=null; 32 long interval=3000; long timeout=3000; final Vector members=new Vector (); 35 final HashMap counters=new HashMap (); int max_missed_hbs=5; static final String name="FD_SIMPLE"; 38 39 40 public String getName() { 41 return "FD_SIMPLE"; 42 } 43 44 public void init() throws Exception { 45 timer=stack.timer; 46 } 47 48 public boolean setProperties(Properties props) { 49 String str; 50 51 super.setProperties(props); 52 str=props.getProperty("timeout"); 53 if(str != null) { 54 timeout=Long.parseLong(str); 55 props.remove("timeout"); 56 } 57 58 str=props.getProperty("interval"); 59 if(str != null) { 60 interval=Long.parseLong(str); 61 props.remove("interval"); 62 } 63 64 str=props.getProperty("max_missed_hbs"); 65 if(str != null) { 66 max_missed_hbs=Integer.parseInt(str); 67 props.remove("max_missed_hbs"); 68 } 69 70 if(props.size() > 0) { 71 System.err.println("FD_SIMPLE.setProperties(): the following properties are not recognized:"); 72 props.list(System.out); 73 return false; 74 } 75 return true; 76 } 77 78 79 public void stop() { 80 if(task != null) { 81 task.stop(); 82 task=null; 83 } 84 } 85 86 87 public void up(Event evt) { 88 Message msg, rsp; 89 Address sender; 90 FdHeader hdr=null; 91 Object obj; 92 boolean counter_reset=false; 93 94 switch(evt.getType()) { 95 96 case Event.SET_LOCAL_ADDRESS: 97 local_addr=(Address)evt.getArg(); 98 break; 99 100 case Event.MSG: 101 msg=(Message)evt.getArg(); 102 sender=msg.getSrc(); 103 resetCounter(sender); 104 counter_reset=true; 105 106 hdr=(FdHeader)msg.removeHeader(name); 107 if(hdr == null) 108 break; 109 110 switch(hdr.type) { 111 case FdHeader.ARE_YOU_ALIVE: rsp=new Message(sender, null, null); 113 rsp.putHeader(name, new FdHeader(FdHeader.I_AM_ALIVE)); 114 passDown(new Event(Event.MSG, rsp)); 115 return; 117 case FdHeader.I_AM_ALIVE: 118 if(log.isInfoEnabled()) log.info("received I_AM_ALIVE response from " + sender); 119 if(task != null) 120 task.receivedHeartbeatResponse(sender); 121 if(!counter_reset) 122 resetCounter(sender); 123 return; 124 125 default: 126 if(log.isWarnEnabled()) log.warn("FdHeader type " + hdr.type + " not known"); 127 return; 128 } 129 } 130 131 passUp(evt); } 133 134 135 public void down(Event evt) { 136 Message msg; 137 int num_mbrs; 138 Address mbr; 139 View new_view; 140 Address key; 141 142 switch(evt.getType()) { 143 144 case Event.VIEW_CHANGE: 146 new_view=(View)evt.getArg(); 147 members.clear(); 148 members.addAll(new_view.getMembers()); 149 if(new_view.size() > 1) { 150 if(task == null) { 151 task=new HeartbeatTask(); 152 if(log.isInfoEnabled()) log.info("starting heartbeat task"); 153 timer.add(task, true); 154 } 155 } 156 else { 157 if(task != null) { 158 if(log.isInfoEnabled()) log.info("stopping heartbeat task"); 159 task.stop(); task=null; 161 } 162 } 163 164 for(Iterator it=counters.keySet().iterator(); it.hasNext();) { 166 key=(Address)it.next(); 167 if(!members.contains(key)) { 168 169 if(log.isInfoEnabled()) log.info("removing " + key + " from counters"); 170 it.remove(); 171 } 172 } 173 } 174 175 passDown(evt); 176 } 177 178 179 180 181 182 183 184 185 186 187 Address getHeartbeatDest() { 188 Address retval=null; 189 int r, size; 190 Vector members_copy; 191 192 if(members == null || members.size() < 2 || local_addr == null) 193 return null; 194 members_copy=(Vector )members.clone(); 195 members_copy.removeElement(local_addr); size=members_copy.size(); 197 r=((int)(Math.random() * (size + 1))) % size; 198 retval=(Address)members_copy.elementAt(r); 199 return retval; 200 } 201 202 203 int incrementCounter(Address mbr) { 204 Integer cnt; 205 int ret=0; 206 207 if(mbr == null) return ret; 208 synchronized(counters) { 209 cnt=(Integer )counters.get(mbr); 210 if(cnt == null) { 211 cnt=new Integer (0); 212 counters.put(mbr, cnt); 213 } 214 else { 215 ret=cnt.intValue() + 1; 216 counters.put(mbr, new Integer (ret)); 217 } 218 return ret; 219 } 220 } 221 222 223 void resetCounter(Address mbr) { 224 if(mbr == null) return; 225 226 synchronized(counters) { 227 counters.put(mbr, new Integer (0)); 228 } 229 } 230 231 232 String printCounters() { 233 StringBuffer sb=new StringBuffer (); 234 Address key; 235 236 for(Iterator it=counters.keySet().iterator(); it.hasNext();) { 237 key=(Address)it.next(); 238 sb.append(key).append(": ").append(counters.get(key)).append('\n'); 239 } 240 return sb.toString(); 241 } 242 243 244 245 246 247 248 249 250 public static class FdHeader extends Header { 251 static final int ARE_YOU_ALIVE=1; static final int I_AM_ALIVE=2; 254 255 int type=ARE_YOU_ALIVE; 256 257 public FdHeader() { 258 } 260 FdHeader(int type) { 261 this.type=type; 262 } 263 264 265 public String toString() { 266 switch(type) { 267 case ARE_YOU_ALIVE: 268 return "[FD_SIMPLE: ARE_YOU_ALIVE]"; 269 case I_AM_ALIVE: 270 return "[FD_SIMPLE: I_AM_ALIVE]"; 271 default: 272 return "[FD_SIMPLE: unknown type (" + type + ")]"; 273 } 274 } 275 276 277 public void writeExternal(ObjectOutput out) throws IOException { 278 out.writeInt(type); 279 } 280 281 282 public void readExternal(ObjectInput in) throws IOException , ClassNotFoundException { 283 type=in.readInt(); 284 } 285 286 287 } 288 289 290 class HeartbeatTask implements TimeScheduler.Task { 291 boolean stopped=false; 292 final Promise promise=new Promise(); 293 Address dest=null; 294 295 void stop() { 296 stopped=true; 297 } 298 299 public boolean cancelled() { 300 return stopped; 301 } 302 303 public long nextInterval() { 304 return interval; 305 } 306 307 public void receivedHeartbeatResponse(Address from) { 308 if(from != null && dest != null && from.equals(dest)) 309 promise.setResult(from); 310 } 311 312 public void run() { 313 Message msg; 314 int num_missed_hbs=0; 315 316 dest=getHeartbeatDest(); 317 if(dest == null) { 318 if(log.isWarnEnabled()) log.warn("heartbeat destination was null, will not send ARE_YOU_ALIVE message"); 319 return; 320 } 321 322 if(log.isInfoEnabled()) 323 log.info("sending ARE_YOU_ALIVE message to " + dest + 324 ", counters are\n" + printCounters()); 325 326 promise.reset(); 327 msg=new Message(dest, null, null); 328 msg.putHeader(name, new FdHeader(FdHeader.ARE_YOU_ALIVE)); 329 passDown(new Event(Event.MSG, msg)); 330 331 promise.getResult(timeout); 332 num_missed_hbs=incrementCounter(dest); 333 if(num_missed_hbs >= max_missed_hbs) { 334 335 if(log.isInfoEnabled()) 336 log.info("missed " + num_missed_hbs + " from " + dest + 337 ", suspecting member"); 338 passUp(new Event(Event.SUSPECT, dest)); 339 } 340 } 341 } 342 343 344 } 345 | Popular Tags |