1 3 package org.jgroups.protocols; 4 5 import org.jgroups.*; 6 import org.jgroups.stack.Protocol; 7 import org.jgroups.util.Util; 8 9 import java.io.IOException ; 10 import java.io.ObjectInput ; 11 import java.io.ObjectOutput ; 12 import java.util.Enumeration ; 13 import java.util.Hashtable ; 14 import java.util.Properties ; 15 import java.util.Vector ; 16 17 18 31 public class FD_PROB extends Protocol implements Runnable { 32 Address local_addr=null; 33 Thread hb=null; 34 long timeout=3000; long gossip_interval=1000; 36 Vector members=null; 37 final Hashtable counters=new Hashtable (); final Hashtable invalid_pingers=new Hashtable (); int max_tries=2; 41 42 public String getName() { 43 return "FD_PROB"; 44 } 45 46 47 public boolean setProperties(Properties props) { 48 String str; 49 50 super.setProperties(props); 51 str=props.getProperty("timeout"); 52 if(str != null) { 53 timeout=Long.parseLong(str); 54 props.remove("timeout"); 55 } 56 57 str=props.getProperty("gossip_interval"); 58 if(str != null) { 59 gossip_interval=Long.parseLong(str); 60 props.remove("gossip_interval"); 61 } 62 63 str=props.getProperty("max_tries"); 64 if(str != null) { 65 max_tries=Integer.parseInt(str); 66 props.remove("max_tries"); 67 } 68 69 if(props.size() > 0) { 70 System.err.println("FD_PROB.setProperties(): the following properties are not recognized:"); 71 props.list(System.out); 72 return false; 73 } 74 return true; 75 } 76 77 78 public void start() throws Exception { 79 if(hb == null) { 80 hb=new Thread (this, "FD_PROB.HeartbeatThread"); 81 hb.setDaemon(true); 82 hb.start(); 83 } 84 } 85 86 87 public void stop() { 88 Thread tmp=null; 89 if(hb != null && hb.isAlive()) { 90 tmp=hb; 91 hb=null; 92 tmp.interrupt(); 93 try { 94 tmp.join(timeout); 95 } 96 catch(Exception ex) { 97 } 98 } 99 hb=null; 100 } 101 102 103 public void up(Event evt) { 104 Message msg; 105 Address hb_sender; 106 FdHeader hdr=null; 107 Object obj; 108 109 switch(evt.getType()) { 110 111 case Event.SET_LOCAL_ADDRESS: 112 local_addr=(Address) evt.getArg(); 113 break; 114 115 case Event.MSG: 116 msg=(Message) evt.getArg(); 117 obj=msg.getHeader(getName()); 118 if(obj == null || !(obj instanceof FdHeader)) { 119 updateCounter(msg.getSrc()); break; 121 } 122 123 hdr=(FdHeader) msg.removeHeader(getName()); 124 switch(hdr.type) { 125 case FdHeader.HEARTBEAT: if(checkPingerValidity(msg.getSrc()) == false) return; 128 129 131 if(log.isInfoEnabled()) log.info("<-- HEARTBEAT from " + msg.getSrc()); 132 updateCounters(hdr); 133 return; case FdHeader.NOT_MEMBER: 135 if(log.isWarnEnabled()) log.warn("NOT_MEMBER: I'm being shunned; exiting"); 136 passUp(new Event(Event.EXIT)); 137 return; 138 default: 139 if(log.isWarnEnabled()) log.warn("FdHeader type " + hdr.type + " not known"); 140 return; 141 } 142 } 143 passUp(evt); } 145 146 147 public void down(Event evt) { 148 Message msg; 149 int num_mbrs; 150 Vector excluded_mbrs; 151 FdEntry entry; 152 Address mbr; 153 154 switch(evt.getType()) { 155 156 case Event.VIEW_CHANGE: 158 passDown(evt); 159 synchronized(this) { 160 View v=(View) evt.getArg(); 161 162 excluded_mbrs=computeExcludedMembers(members, v.getMembers()); 164 if(excluded_mbrs != null && excluded_mbrs.size() > 0) { 165 for(int i=0; i < excluded_mbrs.size(); i++) { 166 mbr=(Address) excluded_mbrs.elementAt(i); 167 entry=(FdEntry) counters.get(mbr); 168 if(entry != null) 169 entry.setExcluded(true); 170 } 171 } 172 173 members=v != null ? v.getMembers() : null; 174 if(members != null) { 175 num_mbrs=members.size(); 176 if(num_mbrs >= 2) { 177 if(hb == null) { 178 try { 179 start(); 180 } 181 catch(Exception ex) { 182 if(log.isWarnEnabled()) log.warn("exception when calling start(): " + ex); 183 } 184 } 185 } 186 else 187 stop(); 188 } 189 } 190 break; 191 192 default: 193 passDown(evt); 194 break; 195 } 196 } 197 198 199 203 public void run() { 204 Message hb_msg; 205 FdHeader hdr; 206 Address hb_dest, key; 207 FdEntry entry; 208 long curr_time, diff; 209 210 211 212 if(log.isInfoEnabled()) log.info("heartbeat thread was started"); 213 214 while(hb != null && members.size() > 1) { 215 216 hb_dest=getHeartbeatDest(); 218 if(hb_dest == null) { 219 if(log.isWarnEnabled()) log.warn("hb_dest is null"); 220 Util.sleep(gossip_interval); 221 continue; 222 } 223 224 225 entry=(FdEntry) counters.get(local_addr); 227 if(entry == null) { 228 entry=new FdEntry(); 229 counters.put(local_addr, entry); 230 } 231 entry.incrementCounter(); 232 233 234 hdr=createHeader(); 236 if(hdr == null) 237 if(log.isWarnEnabled()) log.warn("header could not be created. Heartbeat will not be sent"); 238 else { 239 hb_msg=new Message(hb_dest, null, null); 240 hb_msg.putHeader(getName(), hdr); 241 242 if(log.isInfoEnabled()) log.info("--> HEARTBEAT to " + hb_dest); 243 passDown(new Event(Event.MSG, hb_msg)); 244 } 245 246 247 if(log.isInfoEnabled()) log.info("own counters are " + printCounters()); 248 249 250 for(Enumeration e=counters.keys(); e.hasMoreElements();) { 252 curr_time=System.currentTimeMillis(); 253 key=(Address) e.nextElement(); 254 entry=(FdEntry) counters.get(key); 255 256 if(entry.getTimestamp() > 0 && (diff=curr_time - entry.getTimestamp()) >= timeout) { 257 if(entry.excluded()) { 258 if(diff >= 2 * timeout) { counters.remove(key); 260 261 if(log.isInfoEnabled()) log.info("removed " + key); 262 continue; 263 } 264 } 265 else { 266 267 if(log.isInfoEnabled()) log.info("suspecting " + key); 268 passUp(new Event(Event.SUSPECT, key)); 269 } 270 } 271 } 272 Util.sleep(gossip_interval); 273 } 275 276 if(log.isInfoEnabled()) log.info("heartbeat thread was stopped"); 277 } 278 279 280 281 282 283 284 285 286 287 Address getHeartbeatDest() { 288 Address retval=null; 289 int r, size; 290 Vector members_copy; 291 292 if(members == null || members.size() < 2 || local_addr == null) 293 return null; 294 members_copy=(Vector ) members.clone(); 295 members_copy.removeElement(local_addr); size=members_copy.size(); 297 r=((int) (Math.random() * (size + 1))) % size; 298 retval=(Address) members_copy.elementAt(r); 299 return retval; 300 } 301 302 303 304 FdHeader createHeader() { 305 int num_mbrs=counters.size(), index=0; 306 FdHeader ret=null; 307 Address key; 308 FdEntry entry; 309 310 if(num_mbrs <= 0) 311 return null; 312 ret=new FdHeader(FdHeader.HEARTBEAT, num_mbrs); 313 for(Enumeration e=counters.keys(); e.hasMoreElements();) { 314 key=(Address) e.nextElement(); 315 entry=(FdEntry) counters.get(key); 316 if(entry.excluded()) 317 continue; 318 if(index >= ret.members.length) { 319 if(log.isWarnEnabled()) log.warn("index " + index + " is out of bounds (" + 320 ret.members.length + ')'); 321 break; 322 } 323 ret.members[index]=key; 324 ret.counters[index]=entry.getCounter(); 325 index++; 326 } 327 return ret; 328 } 329 330 331 332 void updateCounters(FdHeader hdr) { 333 Address key; 334 long counter; 335 FdEntry entry; 336 337 if(hdr == null || hdr.members == null || hdr.counters == null) { 338 if(log.isWarnEnabled()) log.warn("hdr is null or contains no counters"); 339 return; 340 } 341 342 for(int i=0; i < hdr.members.length; i++) { 343 key=hdr.members[i]; 344 if(key == null) continue; 345 entry=(FdEntry) counters.get(key); 346 if(entry == null) { 347 entry=new FdEntry(hdr.counters[i]); 348 counters.put(key, entry); 349 continue; 350 } 351 352 if(entry.excluded()) 353 continue; 354 355 entry.setCounter(Math.max(entry.getCounter(), hdr.counters[i])); 357 } 358 } 359 360 361 362 void updateCounter(Address mbr) { 363 FdEntry entry; 364 365 if(mbr == null) return; 366 entry=(FdEntry) counters.get(mbr); 367 if(entry != null) 368 entry.setTimestamp(); 369 } 370 371 372 String printCounters() { 373 StringBuffer sb=new StringBuffer (); 374 Address mbr; 375 FdEntry entry; 376 377 for(Enumeration e=counters.keys(); e.hasMoreElements();) { 378 mbr=(Address) e.nextElement(); 379 entry=(FdEntry) counters.get(mbr); 380 sb.append("\n" + mbr + ": " + entry._toString()); 381 } 382 return sb.toString(); 383 } 384 385 386 Vector computeExcludedMembers(Vector old_mbrship, Vector new_mbrship) { 387 Vector ret=new Vector (); 388 if(old_mbrship == null || new_mbrship == null) return ret; 389 for(int i=0; i < old_mbrship.size(); i++) 390 if(!new_mbrship.contains(old_mbrship.elementAt(i))) 391 ret.addElement(old_mbrship.elementAt(i)); 392 return ret; 393 } 394 395 396 397 boolean checkPingerValidity(Object hb_sender) { 398 int num_pings=0; 399 Message shun_msg; 400 Header hdr; 401 402 if(hb_sender != null && members != null && !members.contains(hb_sender)) { 403 if(invalid_pingers.containsKey(hb_sender)) { 404 num_pings=((Integer ) invalid_pingers.get(hb_sender)).intValue(); 405 if(num_pings >= max_tries) { 406 if(log.isErrorEnabled()) log.error("sender " + hb_sender + 407 " is not member in " + members + " ! Telling it to leave group"); 408 shun_msg=new Message((Address) hb_sender, null, null); 409 hdr=new FdHeader(FdHeader.NOT_MEMBER); 410 shun_msg.putHeader(getName(), hdr); 411 passDown(new Event(Event.MSG, shun_msg)); 412 invalid_pingers.remove(hb_sender); 413 } 414 else { 415 num_pings++; 416 invalid_pingers.put(hb_sender, new Integer (num_pings)); 417 } 418 } 419 else { 420 num_pings++; 421 invalid_pingers.put(hb_sender, new Integer (num_pings)); 422 } 423 return false; 424 } 425 else 426 return true; 427 } 428 429 430 431 432 433 434 435 436 437 public static class FdHeader extends Header { 438 static final int HEARTBEAT=1; static final int NOT_MEMBER=2; 441 442 int type=HEARTBEAT; 443 Address[] members=null; 444 long[] counters=null; 446 447 public FdHeader() { 448 } 450 FdHeader(int type) { 451 this.type=type; 452 } 453 454 FdHeader(int type, int num_elements) { 455 this(type); 456 members=new Address[num_elements]; 457 counters=new long[num_elements]; 458 } 459 460 461 public String toString() { 462 switch(type) { 463 case HEARTBEAT: 464 return "[FD_PROB: HEARTBEAT]"; 465 case NOT_MEMBER: 466 return "[FD_PROB: NOT_MEMBER]"; 467 default: 468 return "[FD_PROB: unknown type (" + type + ")]"; 469 } 470 } 471 472 public String printDetails() { 473 StringBuffer sb=new StringBuffer (); 474 Address mbr; 475 long c; 476 477 if(members != null && counters != null) 478 for(int i=0; i < members.length; i++) { 479 mbr=members[i]; 480 if(mbr == null) 481 sb.append("\n<null>"); 482 else 483 sb.append("\n" + mbr); 484 sb.append(": " + counters[i]); 485 } 486 return sb.toString(); 487 } 488 489 490 public void writeExternal(ObjectOutput out) throws IOException { 491 out.writeInt(type); 492 493 if(members != null) { 494 out.writeInt(members.length); 495 out.writeObject(members); 496 } 497 else 498 out.writeInt(0); 499 500 if(counters != null) { 501 out.writeInt(counters.length); 502 for(int i=0; i < counters.length; i++) 503 out.writeLong(counters[i]); 504 } 505 else 506 out.writeInt(0); 507 } 508 509 510 public void readExternal(ObjectInput in) throws IOException , ClassNotFoundException { 511 int num; 512 type=in.readInt(); 513 514 num=in.readInt(); 515 if(num == 0) 516 members=null; 517 else { 518 members=(Address[]) in.readObject(); 519 } 520 521 num=in.readInt(); 522 if(num == 0) 523 counters=null; 524 else { 525 counters=new long[num]; 526 for(int i=0; i < counters.length; i++) 527 counters[i]=in.readLong(); 528 } 529 } 530 531 532 } 533 534 535 private static class FdEntry { 536 private long counter=0; private long timestamp=0; private boolean excluded=false; 540 541 FdEntry() { 542 543 } 544 545 FdEntry(long counter) { 546 this.counter=counter; 547 timestamp=System.currentTimeMillis(); 548 } 549 550 551 long getCounter() { 552 return counter; 553 } 554 555 long getTimestamp() { 556 return timestamp; 557 } 558 559 boolean excluded() { 560 return excluded; 561 } 562 563 564 synchronized void setCounter(long new_counter) { 565 if(new_counter > counter) { timestamp=System.currentTimeMillis(); 567 counter=new_counter; 568 } 569 } 570 571 synchronized void incrementCounter() { 572 counter++; 573 timestamp=System.currentTimeMillis(); 574 } 575 576 synchronized void setTimestamp() { 577 timestamp=System.currentTimeMillis(); 578 } 579 580 synchronized void setExcluded(boolean flag) { 581 excluded=flag; 582 } 583 584 585 public String toString() { 586 return "counter=" + counter + ", timestamp=" + timestamp + ", excluded=" + excluded; 587 } 588 589 public String _toString() { 590 return "counter=" + counter + ", age=" + (System.currentTimeMillis() - timestamp) + 591 ", excluded=" + excluded; 592 } 593 } 594 595 596 } 597 | Popular Tags |