1 3 package org.jgroups.protocols; 4 5 6 import org.jgroups.*; 7 import org.jgroups.stack.Protocol; 8 import org.jgroups.util.Marshaller; 9 import org.jgroups.util.TimeScheduler; 10 import org.jgroups.util.Streamable; 11 import org.jgroups.util.Util; 12 13 import java.io.*; 14 import java.util.Hashtable ; 15 import java.util.Iterator ; 16 import java.util.Properties ; 17 import java.util.Vector ; 18 19 20 21 22 40 public class FD extends Protocol { 41 Address ping_dest=null; 42 Address local_addr=null; 43 long timeout=3000; long last_ack=System.currentTimeMillis(); 45 int num_tries=0; 46 int max_tries=2; final Vector members=new Vector (11); 48 final Hashtable invalid_pingers=new Hashtable (7); 50 51 final Vector pingable_mbrs=new Vector (11); 52 53 boolean shun=true; 54 TimeScheduler timer=null; 55 Monitor monitor=null; private final Object monitor_mutex=new Object (); 57 58 59 final Broadcaster bcast_task=new Broadcaster(); 60 final static String name="FD"; 61 62 63 64 65 66 67 public String getName() { 68 return name; 69 } 70 71 72 public boolean setProperties(Properties props) { 73 String str; 74 75 super.setProperties(props); 76 str=props.getProperty("timeout"); 77 if(str != null) { 78 timeout=Long.parseLong(str); 79 props.remove("timeout"); 80 } 81 82 str=props.getProperty("max_tries"); if(str != null) { 84 max_tries=Integer.parseInt(str); 85 props.remove("max_tries"); 86 } 87 88 str=props.getProperty("shun"); 89 if(str != null) { 90 shun=Boolean.valueOf(str).booleanValue(); 91 props.remove("shun"); 92 } 93 94 if(props.size() > 0) { 95 System.err.println("FD.setProperties(): the following properties are not recognized:"); 96 props.list(System.out); 97 return false; 98 } 99 return true; 100 } 101 102 103 public void init() throws Exception { 104 if(stack != null && stack.timer != null) 105 timer=stack.timer; 106 else 107 throw new Exception ("FD.init(): timer cannot be retrieved from protocol stack"); 108 } 109 110 111 public void stop() { 112 stopMonitor(); 113 } 114 115 116 Object getPingDest(Vector mbrs) { 117 Object tmp, retval=null; 118 119 if(mbrs == null || mbrs.size() < 2 || local_addr == null) 120 return null; 121 for(int i=0; i < mbrs.size(); i++) { 122 tmp=mbrs.elementAt(i); 123 if(local_addr.equals(tmp)) { 124 if(i + 1 >= mbrs.size()) 125 retval=mbrs.elementAt(0); 126 else 127 retval=mbrs.elementAt(i + 1); 128 break; 129 } 130 } 131 return retval; 132 } 133 134 135 private void startMonitor() { 136 synchronized(monitor_mutex) { 137 if(monitor != null && monitor.started == false) { 138 monitor=null; 139 } 140 if(monitor == null) { 141 monitor=new Monitor(); 142 last_ack=System.currentTimeMillis(); timer.add(monitor, true); num_tries=0; 145 } 146 } 147 } 148 149 private void stopMonitor() { 150 synchronized(monitor_mutex) { 151 if(monitor != null) { 152 monitor.stop(); 153 monitor=null; 154 } 155 } 156 } 157 158 159 public void up(Event evt) { 160 Message msg; 161 FdHeader hdr=null; 162 Object sender, tmphdr; 163 164 switch(evt.getType()) { 165 166 case Event.SET_LOCAL_ADDRESS: 167 local_addr=(Address)evt.getArg(); 168 break; 169 170 case Event.MSG: 171 msg=(Message)evt.getArg(); 172 tmphdr=msg.getHeader(name); 173 if(tmphdr == null || !(tmphdr instanceof FdHeader)) { 174 if(ping_dest != null && (sender=msg.getSrc()) != null) { 175 if(ping_dest.equals(sender)) { 176 last_ack=System.currentTimeMillis(); 177 if(log.isTraceEnabled()) 178 log.trace("received msg from " + sender + " (counts as ack)"); 179 num_tries=0; 180 } 181 } 182 break; } 184 185 hdr=(FdHeader)msg.removeHeader(name); 186 switch(hdr.type) { 187 case FdHeader.HEARTBEAT: Address hb_sender=msg.getSrc(); 189 Message hb_ack=new Message(hb_sender, null, null); 190 FdHeader tmp_hdr=new FdHeader(FdHeader.HEARTBEAT_ACK); 191 192 tmp_hdr.from=local_addr; 194 hb_ack.putHeader(name, tmp_hdr); 195 if(log.isTraceEnabled()) 196 log.trace("received are-you-alive from " + hb_sender + ", sending response"); 197 passDown(new Event(Event.MSG, hb_ack)); 198 199 if(shun) 202 shunInvalidHeartbeatSender(hb_sender); 203 break; 205 case FdHeader.HEARTBEAT_ACK: if(ping_dest != null && ping_dest.equals(hdr.from)) { 207 last_ack=System.currentTimeMillis(); 208 num_tries=0; 209 if(log.isDebugEnabled()) log.debug("received ack from " + hdr.from); 210 } 211 else { 212 stop(); 213 ping_dest=(Address)getPingDest(members); 214 if(ping_dest != null) { 215 try { 216 startMonitor(); 217 } 218 catch(Exception ex) { 219 if(log.isWarnEnabled()) log.warn("exception when calling startMonitor(): " + ex); 220 } 221 } 222 } 223 break; 224 225 case FdHeader.SUSPECT: 226 if(hdr.mbrs != null) { 227 if(log.isTraceEnabled()) log.trace("[SUSPECT] suspect hdr is " + hdr); 228 for(int i=0; i < hdr.mbrs.size(); i++) { 229 Address m=(Address)hdr.mbrs.elementAt(i); 230 if(local_addr != null && m.equals(local_addr)) { 231 if(log.isWarnEnabled()) 232 log.warn("I was suspected, but will not remove myself from membership " + 233 "(waiting for EXIT message)"); 234 } 235 else { 236 pingable_mbrs.remove(m); 237 ping_dest=(Address)getPingDest(pingable_mbrs); 238 } 239 passUp(new Event(Event.SUSPECT, m)); 240 passDown(new Event(Event.SUSPECT, m)); 241 } 242 } 243 break; 244 245 case FdHeader.NOT_MEMBER: 246 if(shun) { 247 if(log.isDebugEnabled()) log.debug("[NOT_MEMBER] I'm being shunned; exiting"); 248 passUp(new Event(Event.EXIT)); 249 } 250 break; 251 } 252 return; 253 } 254 passUp(evt); } 256 257 258 public void down(Event evt) { 259 View v; 260 261 switch(evt.getType()) { 262 case Event.VIEW_CHANGE: 263 synchronized(this) { 264 stop(); 265 v=(View)evt.getArg(); 266 members.clear(); 267 members.addAll(v.getMembers()); 268 bcast_task.adjustSuspectedMembers(members); 269 pingable_mbrs.clear(); 270 pingable_mbrs.addAll(members); 271 passDown(evt); 272 ping_dest=(Address)getPingDest(pingable_mbrs); 273 if(ping_dest != null) { 274 try { 275 startMonitor(); 276 } 277 catch(Exception ex) { 278 if(log.isWarnEnabled()) log.warn("exception when calling startMonitor(): " + ex); 279 } 280 } 281 } 282 break; 283 284 case Event.UNSUSPECT: 285 unsuspect((Address)evt.getArg()); 286 passDown(evt); 287 break; 288 289 default: 290 passDown(evt); 291 break; 292 } 293 } 294 295 296 void unsuspect(Address mbr) { 297 bcast_task.removeSuspectedMember(mbr); 298 pingable_mbrs.removeAllElements(); 299 pingable_mbrs.addAll(members); 300 pingable_mbrs.removeAll(bcast_task.getSuspectedMembers()); 301 ping_dest=(Address)getPingDest(pingable_mbrs); 302 } 303 304 305 308 void shunInvalidHeartbeatSender(Address hb_sender) { 309 int num_pings=0; 310 Message shun_msg; 311 312 if(hb_sender != null && members != null && !members.contains(hb_sender)) { 313 if(invalid_pingers.containsKey(hb_sender)) { 314 num_pings=((Integer )invalid_pingers.get(hb_sender)).intValue(); 315 if(num_pings >= max_tries) { 316 if(log.isDebugEnabled()) 317 log.debug(hb_sender + " is not in " + members + " ! Shunning it"); 318 shun_msg=new Message(hb_sender, null, null); 319 shun_msg.putHeader(name, new FdHeader(FdHeader.NOT_MEMBER)); 320 passDown(new Event(Event.MSG, shun_msg)); 321 invalid_pingers.remove(hb_sender); 322 } 323 else { 324 num_pings++; 325 invalid_pingers.put(hb_sender, new Integer (num_pings)); 326 } 327 } 328 else { 329 num_pings++; 330 invalid_pingers.put(hb_sender, new Integer (num_pings)); 331 } 332 } 333 } 334 335 336 public static class FdHeader extends Header implements Streamable { 337 static final byte HEARTBEAT=0; 338 static final byte HEARTBEAT_ACK=1; 339 static final byte SUSPECT=2; 340 static final byte NOT_MEMBER=3; 342 343 byte type=HEARTBEAT; 344 Vector mbrs=null; 345 Address from=null; 347 348 349 public FdHeader() { 350 } 352 FdHeader(byte type) { 353 this.type=type; 354 } 355 356 357 public String toString() { 358 switch(type) { 359 case HEARTBEAT: 360 return "[FD: heartbeat]"; 361 case HEARTBEAT_ACK: 362 return "[FD: heartbeat ack]"; 363 case SUSPECT: 364 return "[FD: SUSPECT (suspected_mbrs=" + mbrs + ", from=" + from + ")]"; 365 case NOT_MEMBER: 366 return "[FD: NOT_MEMBER]"; 367 default: 368 return "[FD: unknown type (" + type + ")]"; 369 } 370 } 371 372 public void writeExternal(ObjectOutput out) throws IOException { 373 out.writeByte(type); 374 if(mbrs == null) 375 out.writeBoolean(false); 376 else { 377 out.writeBoolean(true); 378 out.writeInt(mbrs.size()); 379 for(Iterator it=mbrs.iterator(); it.hasNext();) { 380 Address addr=(Address)it.next(); 381 Marshaller.write(addr, out); 382 } 383 } 384 Marshaller.write(from, out); 385 } 386 387 public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { 388 type=in.readByte(); 389 boolean mbrs_not_null=in.readBoolean(); 390 if(mbrs_not_null) { 391 int len=in.readInt(); 392 mbrs=new Vector (11); 393 for(int i=0; i < len; i++) { 394 Address addr=(Address)Marshaller.read(in); 395 mbrs.add(addr); 396 } 397 } 398 from=(Address)Marshaller.read(in); 399 } 400 401 public void writeTo(DataOutputStream out) throws IOException { 402 out.writeByte(type); 403 out.writeInt(mbrs != null? mbrs.size() : 0); 404 if(mbrs != null) { 405 for(Iterator it=mbrs.iterator(); it.hasNext();) { 406 Address address=(Address)it.next(); 407 Util.writeAddress(address, out); 408 } 409 } 410 Util.writeAddress(from, out); 411 } 412 413 414 public long size() { 415 int retval=Global.BYTE_SIZE; 416 Address addr; 417 if(mbrs != null) { 418 retval+=Global.INT_SIZE; for(Iterator it=mbrs.iterator(); it.hasNext();) { 420 addr=(Address)it.next(); 421 if(addr != null) 422 retval+=addr.size() + Global.BYTE_SIZE; } 424 } 425 retval+=Global.BYTE_SIZE; if(from != null) 427 retval+=from.size(); 428 return retval; 429 } 430 431 public void readFrom(DataInputStream in) throws IOException, IllegalAccessException , InstantiationException { 432 type=in.readByte(); 433 int size=in.readInt(); 434 if(size > 0) { 435 if(mbrs == null) 436 mbrs=new Vector (); 437 for(int i=0; i < size; i++) { 438 Address addr=Util.readAddress(in); 439 mbrs.add(addr); 440 } 441 } 442 from=Util.readAddress(in); 443 } 444 445 } 446 447 448 private class Monitor implements TimeScheduler.Task { 449 boolean started=true; 450 451 public void stop() { 452 started=false; 453 } 454 455 456 public boolean cancelled() { 457 return !started; 458 } 459 460 461 public long nextInterval() { 462 return timeout; 463 } 464 465 466 public void run() { 467 Message hb_req; 468 long not_heard_from=0; 470 if(ping_dest == null) { 471 if(log.isWarnEnabled()) 472 log.warn("ping_dest is null: members=" + members + ", pingable_mbrs=" + 473 pingable_mbrs + ", local_addr=" + local_addr); 474 return; 475 } 476 477 478 hb_req=new Message(ping_dest, null, null); 480 hb_req.putHeader(name, new FdHeader(FdHeader.HEARTBEAT)); if(log.isDebugEnabled()) 482 log.debug("sending are-you-alive msg to " + ping_dest + " (own address=" + local_addr + ')'); 483 passDown(new Event(Event.MSG, hb_req)); 484 485 not_heard_from=System.currentTimeMillis() - last_ack; 489 if(not_heard_from > timeout + 500) { if(num_tries >= max_tries) { 492 if(log.isDebugEnabled()) 493 log.debug("[" + local_addr + "]: received no heartbeat ack from " + ping_dest + 494 " for " + (num_tries +1) + " times (" + ((num_tries+1) * timeout) + 495 " milliseconds), suspecting it"); 496 bcast_task.addSuspectedMember(ping_dest); 499 num_tries=0; 500 } 501 else { 502 if(log.isDebugEnabled()) 503 log.debug("heartbeat missing from " + ping_dest + " (number=" + num_tries + ')'); 504 num_tries++; 505 } 506 } 507 } 508 509 510 public String toString() { 511 return "" + started; 512 } 513 514 } 515 516 517 523 private class Broadcaster { 524 final Vector suspected_mbrs=new Vector (7); 525 BroadcastTask task=null; 526 private final Object bcast_mutex=new Object (); 527 528 529 Vector getSuspectedMembers() { 530 return suspected_mbrs; 531 } 532 533 537 private void startBroadcastTask(Address suspect) { 538 synchronized(bcast_mutex) { 539 if(task == null || task.cancelled()) { 540 task=new BroadcastTask((Vector )suspected_mbrs.clone()); 541 task.addSuspectedMember(suspect); 542 task.run(); timer.add(task); if(log.isTraceEnabled()) 545 log.trace("BroadcastTask started"); 546 } 547 else { 548 task.addSuspectedMember(suspect); 549 } 550 } 551 } 552 553 private void stopBroadcastTask() { 554 synchronized(bcast_mutex) { 555 if(task != null) { 556 task.stop(); 557 task=null; 558 } 559 } 560 } 561 562 563 void addSuspectedMember(Address mbr) { 564 if(mbr == null) return; 565 if(!members.contains(mbr)) return; 566 synchronized(suspected_mbrs) { 567 if(!suspected_mbrs.contains(mbr)) { 568 suspected_mbrs.addElement(mbr); 569 startBroadcastTask(mbr); 570 } 571 } 572 } 573 574 void removeSuspectedMember(Address suspected_mbr) { 575 if(suspected_mbr == null) return; 576 if(log.isDebugEnabled()) log.debug("member is " + suspected_mbr); 577 synchronized(suspected_mbrs) { 578 suspected_mbrs.removeElement(suspected_mbr); 579 if(suspected_mbrs.size() == 0) 580 stopBroadcastTask(); 581 } 582 } 583 584 void removeAll() { 585 synchronized(suspected_mbrs) { 586 suspected_mbrs.removeAllElements(); 587 stopBroadcastTask(); 588 } 589 } 590 591 592 void adjustSuspectedMembers(Vector new_mbrship) { 593 if(new_mbrship == null || new_mbrship.size() == 0) return; 594 StringBuffer sb=new StringBuffer (); 595 synchronized(suspected_mbrs) { 596 sb.append("suspected_mbrs: ").append(suspected_mbrs); 597 suspected_mbrs.retainAll(new_mbrship); 598 if(suspected_mbrs.size() == 0) 599 stopBroadcastTask(); 600 sb.append(", after adjustment: ").append(suspected_mbrs); 601 log.debug(sb.toString()); 602 } 603 } 604 } 605 606 607 private class BroadcastTask implements TimeScheduler.Task { 608 boolean cancelled=false; 609 private Vector suspected_members=null; 610 611 612 public BroadcastTask(Vector suspected_members) { 613 this.suspected_members=suspected_members; 614 } 615 616 public void stop() { 617 cancelled=true; 618 suspected_members.clear(); 619 if(log.isTraceEnabled()) 620 log.trace("BroadcastTask stopped"); 621 } 622 623 public boolean cancelled() { 624 return cancelled; 625 } 626 627 public long nextInterval() { 628 return FD.this.timeout; 629 } 630 631 public void run() { 632 Message suspect_msg; 633 FD.FdHeader hdr; 634 635 synchronized(suspected_members) { 636 if(suspected_members.size() == 0) { 637 stop(); 638 if(log.isDebugEnabled()) log.debug("task done (no suspected members)"); 639 return; 640 } 641 642 hdr=new FdHeader(FdHeader.SUSPECT); 643 hdr.mbrs=(Vector )suspected_members.clone(); 644 hdr.from=local_addr; 645 } 646 suspect_msg=new Message(); suspect_msg.putHeader(name, hdr); 648 if(log.isDebugEnabled()) 649 log.debug("broadcasting SUSPECT message [suspected_mbrs=" + suspected_members + "] to group"); 650 passDown(new Event(Event.MSG, suspect_msg)); 651 if(log.isDebugEnabled()) log.debug("task done"); 652 } 653 654 public void addSuspectedMember(Address suspect) { 655 if(suspect != null && !suspected_members.contains(suspect)) { 656 suspected_members.add(suspect); 657 } 658 } 659 } 660 661 } 662 | Popular Tags |