package org.jgroups.protocols;

import org.jgroups.stack.Protocol;
import org.jgroups.*;
import org.jgroups.annotations.GuardedBy;
import org.jgroups.util.*;

import java.util.*;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.io.*;


/**
 * Failure detection based on periodic heartbeat messages.
 * <p>
 * While the current view has more than one member, two tasks run on the stack's timer:
 * <ul>
 * <li>{@link HeartbeatSender} sends a message carrying a {@link Header#HEARTBEAT} header every
 *     <code>interval</code> ms (no destination is set on the message; presumably it is delivered
 *     to the whole group - confirm against the transport).</li>
 * <li>{@link TimeoutChecker} runs every <code>interval</code> ms and sends a {@link Header#SUSPECT}
 *     message for every member whose last recorded timestamp is older than <code>timeout</code> ms.</li>
 * </ul>
 * The timestamp of a member is refreshed when a heartbeat from it is received and, if
 * <code>msg_counts_as_heartbeat</code> is set, when any message from it passes up through this protocol.
 */
public class FD_ALL extends Protocol {

    /** Maps each tracked member to the time (ms since epoch) at which it was last heard from */
    Map<Address,Long> timestamps=new ConcurrentHashMap<Address,Long>();

    /** Delay in ms between two heartbeats, and between two runs of the timeout checker */
    long interval=3000;

    /** A member whose timestamp is older than this many ms is suspected */
    long timeout=5000;

    /** If true, every message received from a member refreshes its timestamp, not only heartbeats */
    boolean msg_counts_as_heartbeat=true;

    Address local_addr=null;

    /** Members of the current view; replaced on every view change */
    final List<Address> members=new ArrayList<Address>();

    /**
     * If true, repeated heartbeats from non-members are answered with NOT_MEMBER, and a received
     * NOT_MEMBER causes an EXIT event to be passed up
     */
    boolean shun=true;

    TimeScheduler timer=null;

    @GuardedBy("lock")
    private ScheduledFuture<?> heartbeat_sender_future=null;

    @GuardedBy("lock")
    private ScheduledFuture<?> timeout_checker_future=null;

    private boolean tasks_running=false;

    protected int num_heartbeats_sent, num_heartbeats_received=0;
    protected int num_suspect_events=0;

    final static String name="FD_ALL";

    /**
     * Bounded history (capacity 20) of suspicions raised by this member. Each element is a string of
     * the form "&lt;date of suspicion&gt;: &lt;member&gt;", created in {@link #suspect(Address)}.
     */
    BoundedList suspect_history=new BoundedList(20);

    /** Number of heartbeats received so far from each sender that is not in the current view */
    final Map<Address,Integer> invalid_pingers=new HashMap<Address,Integer>(7);

    /** Protects heartbeat_sender_future and timeout_checker_future */
    final Lock lock=new ReentrantLock();


    public String getName() {return FD_ALL.name;}
    public String getLocalAddress() {return local_addr != null? local_addr.toString() : "null";}
    public String getMembers() {return members.toString();}
    public int getHeartbeatsSent() {return num_heartbeats_sent;}
    public int getHeartbeatsReceived() {return num_heartbeats_received;}
    public int getSuspectEventsSent() {return num_suspect_events;}
    public long getTimeout() {return timeout;}
    public void setTimeout(long timeout) {this.timeout=timeout;}
    public long getInterval() {return interval;}
    public void setInterval(long interval) {this.interval=interval;}
    public boolean isShun() {return shun;}
    public void setShun(boolean flag) {this.shun=flag;}
    public boolean isRunning() {return tasks_running;}

    /**
     * Returns the suspect history, one entry per line. Each entry carries the date at which the member
     * was suspected (recorded in {@link #suspect(Address)}), not the date at which this method is called.
     */
    public String printSuspectHistory() {
        StringBuilder sb=new StringBuilder();
        for(Enumeration en=suspect_history.elements(); en.hasMoreElements();) {
            sb.append(en.nextElement()).append("\n");
        }
        return sb.toString();
    }

    /** Returns one line per tracked member, showing the age in ms of its timestamp */
    public String printTimestamps() {
        return printTimeStamps();
    }


    /**
     * Reads <code>timeout</code>, <code>interval</code>, <code>shun</code> and
     * <code>msg_counts_as_heartbeat</code> from <code>props</code> and removes them from it.
     *
     * @param props the configuration properties of this protocol
     * @return false (after logging an error) if unrecognized properties remain, true otherwise
     * @throws NumberFormatException if <code>timeout</code> or <code>interval</code> is not a valid long
     */
    public boolean setProperties(Properties props) {
        String str;

        super.setProperties(props);
        str=props.getProperty("timeout");
        if(str != null) {
            timeout=Long.parseLong(str);
            props.remove("timeout");
        }

        str=props.getProperty("interval");
        if(str != null) {
            interval=Long.parseLong(str);
            props.remove("interval");
        }

        str=props.getProperty("shun");
        if(str != null) {
            shun=Boolean.valueOf(str).booleanValue();
            props.remove("shun");
        }

        str=props.getProperty("msg_counts_as_heartbeat");
        if(str != null) {
            msg_counts_as_heartbeat=Boolean.valueOf(str).booleanValue();
            props.remove("msg_counts_as_heartbeat");
        }

        if(!props.isEmpty()) {
            log.error("the following properties are not recognized: " + props);
            return false;
        }
        return true;
    }

    /** Resets all counters to 0 and empties the suspect history */
    public void resetStats() {
        num_heartbeats_sent=num_heartbeats_received=num_suspect_events=0;
        suspect_history.removeAll();
    }


    /**
     * Fetches the timer from the protocol stack.
     *
     * @throws Exception if the stack or its timer is not available
     */
    public void init() throws Exception {
        if(stack != null && stack.timer != null)
            timer=stack.timer;
        else
            throw new Exception("timer cannot be retrieved from protocol stack");
    }


    /** Cancels the heartbeat sender and the timeout checker */
    public void stop() {
        stopTasks();
    }


    /**
     * Handles events travelling up the stack.
     * <ul>
     * <li>SET_LOCAL_ADDRESS: remembers the local address, then passes the event up.</li>
     * <li>MSG without an FD_ALL header: optionally refreshes the sender's timestamp, then passes the event up.</li>
     * <li>MSG with an FD_ALL header: handled here and consumed (null is returned).</li>
     * <li>Everything else is passed up unchanged.</li>
     * </ul>
     */
    public Object up(Event evt) {
        Message msg;
        Header hdr;
        Address sender;

        switch(evt.getType()) {

            case Event.SET_LOCAL_ADDRESS:
                local_addr=(Address)evt.getArg();
                break;

            case Event.MSG:
                msg=(Message)evt.getArg();
                hdr=(Header)msg.getHeader(name);
                if(msg_counts_as_heartbeat)
                    update(msg.getSrc());
                if(hdr == null)
                    break; // not one of our messages: pass it up

                switch(hdr.type) {
                    case Header.HEARTBEAT:
                        sender=msg.getSrc();
                        // The null check must come first: equals() was previously invoked before
                        // the sender was tested for null
                        if(sender == null || sender.equals(local_addr))
                            break;

                        // A heartbeat from a non-member is counted (and possibly answered with
                        // NOT_MEMBER) instead of refreshing a timestamp
                        if(shun && !members.contains(sender)) {
                            shunInvalidHeartbeatSender(sender);
                            break;
                        }

                        update(sender);
                        num_heartbeats_received++;
                        break;

                    case Header.SUSPECT:
                        if(log.isTraceEnabled()) log.trace("[SUSPECT] suspect hdr is " + hdr);
                        // the suspicion is reported in both directions
                        down_prot.down(new Event(Event.SUSPECT, hdr.suspected_mbr));
                        up_prot.up(new Event(Event.SUSPECT, hdr.suspected_mbr));
                        break;

                    case Header.NOT_MEMBER:
                        if(shun) {
                            if(log.isDebugEnabled()) log.debug("[NOT_MEMBER] I'm being shunned; exiting");
                            up_prot.up(new Event(Event.EXIT));
                        }
                        break;
                }
                return null; // FD_ALL messages are not passed up
        }
        return up_prot.up(evt);
    }


    /**
     * Handles events travelling down the stack. A VIEW_CHANGE is first passed down and then applied
     * locally (null is returned); every other event is passed down unchanged.
     */
    public Object down(Event evt) {
        switch(evt.getType()) {
            case Event.VIEW_CHANGE:
                down_prot.down(evt);
                View v=(View)evt.getArg();
                handleViewChange(v);
                return null;
        }
        return down_prot.down(evt);
    }


    private void startTasks() {
        startHeartbeatSender();
        startTimeoutChecker();
        tasks_running=true;
        if(log.isTraceEnabled())
            log.trace("started heartbeat sender and timeout checker tasks");
    }

    private void stopTasks() {
        stopTimeoutChecker();
        stopHeartbeatSender();
        tasks_running=false;
        if(log.isTraceEnabled())
            log.trace("stopped heartbeat sender and timeout checker tasks");
    }

    /** Schedules the timeout checker unless one is already scheduled and not yet done */
    private void startTimeoutChecker() {
        lock.lock();
        try {
            if(timeout_checker_future == null || timeout_checker_future.isDone()) {
                timeout_checker_future=timer.scheduleWithFixedDelay(new TimeoutChecker(), interval, interval, TimeUnit.MILLISECONDS);
            }
        }
        finally {
            lock.unlock();
        }
    }

    private void stopTimeoutChecker() {
        lock.lock();
        try {
            if(timeout_checker_future != null) {
                timeout_checker_future.cancel(true);
                timeout_checker_future=null;
            }
        }
        finally {
            lock.unlock();
        }
    }


    /** Schedules the heartbeat sender unless one is already scheduled and not yet done */
    private void startHeartbeatSender() {
        lock.lock();
        try {
            if(heartbeat_sender_future == null || heartbeat_sender_future.isDone()) {
                heartbeat_sender_future=timer.scheduleWithFixedDelay(new HeartbeatSender(), interval, interval, TimeUnit.MILLISECONDS);
            }
        }
        finally {
            lock.unlock();
        }
    }

    private void stopHeartbeatSender() {
        lock.lock();
        try {
            if(heartbeat_sender_future != null) {
                heartbeat_sender_future.cancel(true);
                heartbeat_sender_future=null;
            }
        }
        finally {
            lock.unlock();
        }
    }


    /** Sets the timestamp of <code>sender</code> to now; null and the local address are ignored */
    private void update(Address sender) {
        if(sender != null && !sender.equals(local_addr))
            timestamps.put(sender, Long.valueOf(System.currentTimeMillis()));
    }


    /**
     * Installs the members of <code>v</code>: drops timestamps of members that are no longer in the
     * view, gives every new member (except ourselves) a fresh timestamp, forgets all counted invalid
     * heartbeats, and starts or stops the tasks depending on whether the view has more than one member.
     */
    private void handleViewChange(View v) {
        Vector<Address> mbrs=v.getMembers();
        members.clear();
        members.addAll(mbrs);

        // removals on the key set remove the corresponding mappings from timestamps
        timestamps.keySet().retainAll(mbrs);

        for(Address mbr: mbrs) {
            if(mbr.equals(local_addr))
                continue;
            if(!timestamps.containsKey(mbr)) {
                timestamps.put(mbr, Long.valueOf(System.currentTimeMillis()));
            }
        }

        invalid_pingers.clear();

        if(!tasks_running && members.size() > 1)
            startTasks();
        else if(tasks_running && members.size() < 2)
            stopTasks();
    }


    /**
     * Counts a heartbeat received from a sender that is not in the current view. Once three
     * heartbeats from that sender have been counted, the next one is answered with an out-of-band
     * NOT_MEMBER message sent to the sender, and its counter is discarded.
     */
    private void shunInvalidHeartbeatSender(Address sender) {
        Integer counted=invalid_pingers.get(sender);
        int num_pings=counted != null? counted.intValue() : 0;

        if(num_pings >= 3) {
            if(log.isDebugEnabled())
                log.debug(sender + " is not in " + members + " ! Shunning it");
            Message shun_msg=new Message(sender, null, null);
            shun_msg.setFlag(Message.OOB);
            shun_msg.putHeader(name, new Header(Header.NOT_MEMBER));
            down_prot.down(new Event(Event.MSG, shun_msg));
            invalid_pingers.remove(sender);
        }
        else {
            invalid_pingers.put(sender, Integer.valueOf(num_pings + 1));
        }
    }


    private String printTimeStamps() {
        StringBuilder sb=new StringBuilder();
        long current_time=System.currentTimeMillis();
        for(Map.Entry<Address,Long> entry: timestamps.entrySet()) {
            sb.append(entry.getKey()).append(": ");
            sb.append(current_time - entry.getValue().longValue()).append(" ms old\n");
        }
        return sb.toString();
    }

    /**
     * Sends an out-of-band message with a SUSPECT header naming <code>mbr</code>, and records the
     * suspicion together with the current date in the suspect history.
     */
    void suspect(Address mbr) {
        Message suspect_msg=new Message();
        suspect_msg.setFlag(Message.OOB);
        Header hdr=new Header(Header.SUSPECT, mbr);
        suspect_msg.putHeader(name, hdr);
        down_prot.down(new Event(Event.MSG, suspect_msg));
        num_suspect_events++;
        // The date is captured here so that printSuspectHistory() reports when the member was suspected
        suspect_history.add(new Date() + ": " + mbr);
    }


    /** Header attached to all messages sent by FD_ALL */
    public static class Header extends org.jgroups.Header implements Streamable {
        public static final byte HEARTBEAT  = 0;
        public static final byte SUSPECT    = 1;
        public static final byte NOT_MEMBER = 2;

        byte    type=Header.HEARTBEAT;

        /** Only set by FD_ALL for headers of type SUSPECT */
        Address suspected_mbr=null;


        /** No-arg constructor; presumably required for externalization/streaming - confirm */
        public Header() {
        }

        public Header(byte type) {
            this.type=type;
        }

        public Header(byte type, Address suspect) {
            this(type);
            this.suspected_mbr=suspect;
        }


        public String toString() {
            switch(type) {
                case FD_ALL.Header.HEARTBEAT:
                    return "heartbeat";
                case FD_ALL.Header.SUSPECT:
                    return "SUSPECT (suspected_mbr=" + suspected_mbr + ")";
                case FD_ALL.Header.NOT_MEMBER:
                    return "NOT_MEMBER";
                default:
                    return "unknown type (" + type + ")";
            }
        }

        public void writeExternal(ObjectOutput out) throws IOException {
            out.writeByte(type);
            out.writeObject(suspected_mbr);
        }

        public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
            type=in.readByte();
            suspected_mbr=(Address)in.readObject();
        }

        /** Returns the size of the type byte plus the size Util reports for the suspected member */
        public int size() {
            int retval=Global.BYTE_SIZE; // type
            retval+=Util.size(suspected_mbr);
            return retval;
        }

        public void writeTo(DataOutputStream out) throws IOException {
            out.writeByte(type);
            Util.writeAddress(suspected_mbr, out);
        }

        public void readFrom(DataInputStream in) throws IOException, IllegalAccessException, InstantiationException {
            type=in.readByte();
            suspected_mbr=Util.readAddress(in);
        }

    }


    /** Sends one out-of-band heartbeat message down the stack each time it is run */
    class HeartbeatSender implements Runnable {

        public void run() {
            Message heartbeat=new Message(); // no destination set
            heartbeat.setFlag(Message.OOB);
            Header hdr=new Header(Header.HEARTBEAT);
            heartbeat.putHeader(name, hdr);
            down_prot.down(new Event(Event.MSG, heartbeat));
            num_heartbeats_sent++;
        }
    }


    /** Suspects every member whose timestamp is older than <code>timeout</code> ms each time it is run */
    class TimeoutChecker implements Runnable {

        public void run() {
            if(log.isTraceEnabled())
                log.trace("checking for expired senders, table is:\n" + printTimeStamps());

            long current_time=System.currentTimeMillis(), diff;
            for(Iterator<Map.Entry<Address,Long>> it=timestamps.entrySet().iterator(); it.hasNext();) {
                Map.Entry<Address,Long> entry=it.next();
                Address key=entry.getKey();
                Long val=entry.getValue();
                if(val == null) { // defensive: drop entries without a timestamp
                    it.remove();
                    continue;
                }
                diff=current_time - val.longValue();
                if(diff > timeout) {
                    if(log.isTraceEnabled())
                        log.trace("haven't received a heartbeat from " + key + " for " + diff + " ms, suspecting it");
                    suspect(key);
                }
            }
        }

    }

}