package org.jgroups.protocols.pbcast;


import org.jgroups.*;
import org.jgroups.stack.Protocol;
import org.jgroups.util.Promise;
import org.jgroups.util.TimeScheduler;
import org.jgroups.util.Util;
import org.jgroups.util.Streamable;

import java.io.*;
import java.util.Properties;
import java.util.Vector;


/**
 * Protocol that gossips message digests between group members in order to
 * determine which messages have been seen by everybody.
 * <p>
 * Outline of what this class does:
 * <ol>
 * <li>A {@link StableTask} (or, when <code>max_bytes</code> is set, a byte
 *     counter in {@link #up(Event)}) fetches the local digest from the layer
 *     below (<code>GET_DIGEST_STABLE</code>) and sends it in a
 *     <code>STABLE_GOSSIP</code> message.</li>
 * <li>Every received <code>STABLE_GOSSIP</code> digest is merged into
 *     {@link #digest}: per member, the minimum of the highest seqnos and the
 *     maximum of the highest seen seqnos are kept.</li>
 * <li>Once gossip has been received from every member in {@link #mbrs}, a
 *     <code>STABILITY</code> message carrying the merged digest is scheduled
 *     after a random delay; the send is cancelled if somebody else's
 *     <code>STABILITY</code> message arrives first.</li>
 * <li>A received <code>STABILITY</code> digest is passed down as
 *     <code>Event.STABLE</code>.</li>
 * </ol>
 * Processing can be switched off and on with <code>SUSPEND_STABLE</code> and
 * <code>RESUME_STABLE</code> events sent down the stack.
 */
public class STABLE extends Protocol {
    Address local_addr=null;

    /** Current group members, replaced on every VIEW_CHANGE seen in {@link #down(Event)} */
    final Vector mbrs=new Vector();

    /** Digest being accumulated from received gossip; guarded by itself in {@link #initialize()} */
    final Digest digest=new Digest();

    /** Receives the answer (GET_DIGEST_STABLE_OK) to a GET_DIGEST_STABLE request */
    final Promise digest_promise=new Promise();

    /**
     * Members from which gossip is <em>still outstanding</em> in the current round.
     * Filled with all members by {@link #initialize()}; a member is removed
     * when its gossip has been processed.
     */
    final Vector heard_from=new Vector();

    /** Max time (msecs) to wait for the digest to be returned from below */
    long digest_timeout=60000;

    /**
     * Scales the random interval between runs of the stable task (see
     * {@link StableTask#computeSleepTime()}). A value &lt;= 0 disables the
     * periodic task.
     */
    long desired_avg_gossip=20000;

    /** Upper bound handed to <code>Util.random()</code> for the delay before a STABILITY message is sent */
    long stability_delay=6000;
    StabilitySendTask stability_task=null;
    final Object stability_mutex=new Object();

    StableTask stable_task=null;
    final Object stable_task_mutex=new Object();

    /** Scheduler taken from the protocol stack in {@link #start()} */
    TimeScheduler timer=null;

    /** Number of times the stable task runs before it stops itself */
    int max_gossip_runs=3;

    /** Runs left for the stable task; reset to max_gossip_runs by {@link #startStableTask()} */
    int num_gossip_runs=max_gossip_runs;

    static final String name="STABLE";

    /**
     * If &gt; 0, a STABLE message is sent as soon as the bytes counted in
     * {@link #up(Event)} reach this value. 0 disables byte-based gossip.
     */
    long max_bytes=0;

    /** Bytes counted since the last byte-triggered STABLE message */
    long num_bytes_received=0;

    /**
     * When true, no STABLE/STABILITY messages are sent or handled.
     * Volatile because the flag is written by the thread delivering
     * SUSPEND_STABLE/RESUME_STABLE and read by the timer tasks and by the
     * thread spawned in {@link #up(Event)}.
     */
    volatile boolean suspended=false;

    /** Safety net that resumes processing if RESUME_STABLE never arrives */
    ResumeTask resume_task=null;
    final Object resume_task_mutex=new Object();


    public String getName() {
        return name;
    }


    /**
     * @return a vector with the single required down service,
     *         <code>Event.GET_DIGEST_STABLE</code> (used by {@link #getDigest()})
     */
    public Vector requiredDownServices() {
        Vector retval=new Vector();
        retval.addElement(new Integer(Event.GET_DIGEST_STABLE));
        return retval;
    }


    /**
     * Reads and removes the properties understood by this protocol:
     * <code>digest_timeout</code>, <code>desired_avg_gossip</code>,
     * <code>stability_delay</code>, <code>max_gossip_runs</code> and
     * <code>max_bytes</code>. <code>max_suspend_time</code> is accepted but
     * ignored with a warning.
     *
     * @param props the configuration properties; recognized keys are removed
     * @return false if unrecognized properties are left over, true otherwise
     * @throws NumberFormatException if a numeric property cannot be parsed
     */
    public boolean setProperties(Properties props) {
        String str;

        super.setProperties(props);
        str=props.getProperty("digest_timeout");
        if(str != null) {
            digest_timeout=Long.parseLong(str);
            props.remove("digest_timeout");
        }

        str=props.getProperty("desired_avg_gossip");
        if(str != null) {
            desired_avg_gossip=Long.parseLong(str);
            props.remove("desired_avg_gossip");
        }

        str=props.getProperty("stability_delay");
        if(str != null) {
            stability_delay=Long.parseLong(str);
            props.remove("stability_delay");
        }

        str=props.getProperty("max_gossip_runs");
        if(str != null) {
            max_gossip_runs=Integer.parseInt(str);
            num_gossip_runs=max_gossip_runs;
            props.remove("max_gossip_runs");
        }

        str=props.getProperty("max_bytes");
        if(str != null) {
            max_bytes=Long.parseLong(str);
            props.remove("max_bytes");
        }

        str=props.getProperty("max_suspend_time");
        if(str != null) {
            System.err.println("max_suspend_time is not supported any longer; please remove it (ignoring it)");
            props.remove("max_suspend_time");
        }

        if(props.size() > 0) {
            System.err.println("STABLE.setProperties(): these properties are not recognized:");
            // Same stream as the header line above, so the diagnostic stays in one piece
            props.list(System.err);
            return false;
        }
        return true;
    }


    /**
     * Stops handling and sending of STABLE/STABILITY messages and schedules a
     * {@link ResumeTask} as a fallback.
     *
     * @param timeout passed to {@link #startResumeTask(long)}
     */
    void suspend(long timeout) {
        if(!suspended) {
            suspended=true;
            if(log.isDebugEnabled())
                log.debug("suspending message garbage collection");
        }
        startResumeTask(timeout);
    }


    /** Re-enables STABLE/STABILITY processing and cancels the pending resume task */
    void resume() {
        suspended=false;
        if(log.isDebugEnabled())
            log.debug("resuming message garbage collection");
        stopResumeTask();
    }


    /**
     * Fetches the timer from the protocol stack.
     *
     * @throws Exception if the stack or its timer is not available
     */
    public void start() throws Exception {
        if(stack != null && stack.timer != null)
            timer=stack.timer;
        else
            throw new Exception("STABLE.start(): timer cannot be retrieved from protocol stack");
    }


    public void stop() {
        stopStableTask();
    }


    /**
     * Handles events travelling up the stack.
     * <ul>
     * <li>MSG: counts bytes when <code>max_bytes</code> is set and triggers a
     *     STABLE message once the limit is reached. Messages carrying a
     *     {@link StableHeader} are consumed here and not passed up.</li>
     * <li>SET_LOCAL_ADDRESS: remembers the local address.</li>
     * </ul>
     * Every event that is passed up and is a MSG or VIEW_CHANGE (re)starts the
     * stable task, provided <code>desired_avg_gossip</code> is &gt; 0.
     */
    public void up(Event evt) {
        Message msg;
        StableHeader hdr;
        Header obj;
        int type=evt.getType();

        switch(type) {

        case Event.MSG:
            msg=(Message)evt.getArg();

            if(max_bytes > 0) {
                // Every message is counted with at least 24 bytes, even if its payload is smaller
                long size=Math.max(msg.getLength(), 24);
                num_bytes_received+=size;
                if(num_bytes_received >= max_bytes) {
                    if(log.isTraceEnabled()) {
                        StringBuffer sb=new StringBuffer("max_bytes has been exceeded (max_bytes=");
                        sb.append(max_bytes).append(", number of bytes received=");
                        sb.append(num_bytes_received).append("): sending STABLE message");
                        log.trace(sb.toString());
                    }

                    // Done on a separate thread: sendStableMessage() blocks on
                    // digest_promise (up to digest_timeout) in getDigest()
                    new Thread() {
                        public void run() {
                            initialize();
                            sendStableMessage();
                        }
                    }.start();
                    num_bytes_received=0;
                }
            }

            obj=msg.getHeader(name);
            if(!(obj instanceof StableHeader)) // also covers obj == null
                break;
            hdr=(StableHeader)msg.removeHeader(name);
            switch(hdr.type) {
            case StableHeader.STABLE_GOSSIP:
                handleStableGossip(msg.getSrc(), hdr.stableDigest);
                break;
            case StableHeader.STABILITY:
                handleStabilityMessage(hdr.stableDigest);
                break;
            default:
                if(log.isErrorEnabled()) log.error("StableHeader type " + hdr.type + " not known");
            }
            return; // STABLE's own messages are not passed up

        case Event.SET_LOCAL_ADDRESS:
            local_addr=(Address)evt.getArg();
            break;
        }

        passUp(evt);
        if(desired_avg_gossip > 0) {
            if(type == Event.VIEW_CHANGE || type == Event.MSG)
                startStableTask();
        }
    }


    /**
     * Intercepts GET_DIGEST_STABLE_OK and hands its argument to
     * {@link #digest_promise}, on which {@link #getDigest()} waits. All other
     * events are delegated to the superclass.
     */
    protected void receiveUpEvent(Event evt) {
        if(evt.getType() == Event.GET_DIGEST_STABLE_OK) {
            digest_promise.setResult(evt.getArg());
            return;
        }
        super.receiveUpEvent(evt);
    }


    /**
     * Handles events travelling down the stack.
     * <ul>
     * <li>VIEW_CHANGE: replaces the membership, drops departed members from
     *     {@link #heard_from} and stops the stable task (it is restarted
     *     below when <code>desired_avg_gossip</code> is &gt; 0).</li>
     * <li>SUSPEND_STABLE: stops the stable task and suspends processing; a
     *     <code>Long</code> argument is used as the timeout, otherwise 0.</li>
     * <li>RESUME_STABLE: resumes processing.</li>
     * </ul>
     * The event is always passed down afterwards.
     */
    public void down(Event evt) {
        int type=evt.getType();

        switch(evt.getType()) {
        case Event.VIEW_CHANGE:
            View v=(View)evt.getArg();
            Vector tmp=v.getMembers();
            mbrs.removeAllElements();
            mbrs.addAll(tmp);
            heard_from.retainAll(tmp);
            stopStableTask();
            break;

        case Event.SUSPEND_STABLE:
            // NOTE(review): without a Long argument the resume task is scheduled
            // with an interval of 0 - confirm how TimeScheduler treats that
            long timeout=0;
            Object t=evt.getArg();
            if(t instanceof Long) // also covers t == null
                timeout=((Long)t).longValue();
            stopStableTask();
            suspend(timeout);
            break;

        case Event.RESUME_STABLE:
            resume();
            break;
        }

        if(desired_avg_gossip > 0) {
            if(type == Event.VIEW_CHANGE || type == Event.MSG)
                startStableTask();
        }

        passDown(evt);
    }


    /**
     * Starts a new gossip round: resets {@link #digest} to one entry per
     * current member with both seqnos set to -1 ("not yet set", see
     * {@link #handleStableGossip}) and marks gossip from all members as
     * outstanding.
     */
    void initialize() {
        synchronized(digest) {
            digest.reset(mbrs.size());
            for(int i=0; i < mbrs.size(); i++)
                digest.add((Address)mbrs.elementAt(i), -1, -1);
            heard_from.removeAllElements();
            heard_from.addAll(mbrs);
        }
    }


    /**
     * Resets the remaining gossip runs and starts the stable task unless one
     * exists already.
     */
    void startStableTask() {
        num_gossip_runs=max_gossip_runs;

        // Cheap unsynchronized check first; this method is called for every
        // message and view. If the task is nulled right after the check we
        // simply return and the next message or view starts it again.
        if(stable_task != null)
            return;
        synchronized(stable_task_mutex) {
            if(stable_task != null && stable_task.running()) {
                return;
            }
            stable_task=new StableTask();
            timer.add(stable_task, true);
        }
        if(log.isTraceEnabled())
            log.trace("stable task started; num_gossip_runs=" + num_gossip_runs + ", max_gossip_runs=" + max_gossip_runs);
    }


    void stopStableTask() {
        synchronized(stable_task_mutex) {
            if(stable_task != null) {
                stable_task.stop();
                stable_task=null;
            }
        }
    }


    /**
     * Schedules a {@link ResumeTask} unless one is already running.
     *
     * @param max_suspend_time base interval; 10% is added before scheduling
     */
    void startResumeTask(long max_suspend_time) {
        max_suspend_time=(long)(max_suspend_time * 1.1);
        synchronized(resume_task_mutex) {
            if(resume_task != null && resume_task.running()) {
                return;
            }
            else {
                resume_task=new ResumeTask(max_suspend_time);
                timer.add(resume_task, true);
            }
        }
        if(log.isDebugEnabled())
            log.debug("resume task started, max_suspend_time=" + max_suspend_time);
    }


    void stopResumeTask() {
        synchronized(resume_task_mutex) {
            if(resume_task != null) {
                resume_task.stop();
                resume_task=null;
            }
        }
    }


    /**
     * Schedules the sending of a STABILITY message unless a send is already
     * pending.
     *
     * @param d     the digest to send
     * @param delay the interval reported by the task to the scheduler
     */
    void startStabilityTask(Digest d, long delay) {
        synchronized(stability_mutex) {
            if(stability_task != null && stability_task.running()) {
                return;
            }
            else {
                stability_task=new StabilitySendTask(d, delay);
                timer.add(stability_task, true);
            }
        }
    }


    void stopStabilityTask() {
        synchronized(stability_mutex) {
            if(stability_task != null) {
                stability_task.stop();
                stability_task=null;
            }
        }
    }


    /**
     * Merges a received gossip digest into {@link #digest}.
     * <p>
     * The digest is ignored if processing is suspended, if gossip from
     * <code>sender</code> was already processed in this round, or if its
     * senders differ from those of the local digest (in which case the local
     * digest is re-initialized). Otherwise, for each member, the minimum of
     * the highest seqnos and the maximum of the highest seen seqnos is kept;
     * a local value &lt; 0 counts as "not yet set" and is replaced by any
     * received value &gt;= 0. When gossip from all members has been received,
     * a STABILITY message is scheduled and a new round is started.
     *
     * @param sender the member that sent the gossip
     * @param d      the digest carried by the gossip message
     */
    void handleStableGossip(Address sender, Digest d) {
        Address mbr;
        long highest_seqno, my_highest_seqno;
        long highest_seen_seqno, my_highest_seen_seqno;

        if(d == null || sender == null) {
            if(log.isErrorEnabled()) log.error("digest or sender is null");
            return;
        }

        if(suspended) {
            if(log.isDebugEnabled()) {
                log.debug("STABLE message will not be handled as suspended=" + suspended);
            }
            return;
        }

        if(log.isDebugEnabled()) log.debug("received digest " + printStabilityDigest(d) + " from " + sender);
        // heard_from holds the members whose gossip is still outstanding
        if(!heard_from.contains(sender)) {
            if(log.isDebugEnabled()) log.debug("already received gossip from " + sender);
            return;
        }

        if(!this.digest.sameSenders(d)) {
            if(log.isDebugEnabled()) {
                log.debug("received digest from " + sender + " (digest=" + d + ") which does not match my own digest ("+
                        this.digest + "): ignoring digest and re-initializing own digest");
            }
            initialize();
            return;
        }

        for(int i=0; i < d.size(); i++) {
            mbr=d.senderAt(i);
            highest_seqno=d.highSeqnoAt(i);
            highest_seen_seqno=d.highSeqnoSeenAt(i);
            if(digest.getIndex(mbr) == -1) {
                if(log.isDebugEnabled()) log.debug("sender " + mbr + " not found in stability vector");
                continue;
            }

            // Highest seqno: keep the minimum over all members
            my_highest_seqno=digest.highSeqnoAt(mbr);
            if(my_highest_seqno < 0) {
                if(highest_seqno >= 0)
                    digest.setHighSeqnoAt(mbr, highest_seqno);
            }
            else {
                digest.setHighSeqnoAt(mbr, Math.min(my_highest_seqno, highest_seqno));
            }

            // Highest seen seqno: keep the maximum over all members
            my_highest_seen_seqno=digest.highSeqnoSeenAt(mbr);
            if(my_highest_seen_seqno < 0) {
                if(highest_seen_seqno >= 0)
                    digest.setHighSeqnoSeenAt(mbr, highest_seen_seqno);
            }
            else {
                digest.setHighSeqnoSeenAt(mbr, Math.max(my_highest_seen_seqno, highest_seen_seqno));
            }
        }

        heard_from.removeElement(sender);
        if(heard_from.size() == 0) {
            if(log.isDebugEnabled()) log.debug("sending stability msg " + printStabilityDigest(digest));
            sendStabilityMessage(digest.copy());
            initialize();
        }
    }


    /**
     * Fetches the current digest from below and sends it in a STABLE_GOSSIP
     * message (created without an explicit destination). Nothing is sent
     * while suspended or when the digest is null or empty.
     */
    void sendStableMessage() {
        Digest d=null;
        Message msg=new Message();
        StableHeader hdr;

        if(suspended) {
            if(log.isTraceEnabled())
                log.trace("will not send STABLE message as suspended=" + suspended);
            return;
        }

        d=getDigest();
        if(d != null && d.size() > 0) {
            if(log.isTraceEnabled())
                log.trace("mcasting STABLE msg, digest=" + d +
                          " (num_gossip_runs=" + num_gossip_runs + ", max_gossip_runs=" + max_gossip_runs + ')');
            hdr=new StableHeader(StableHeader.STABLE_GOSSIP, d);
            msg.putHeader(name, hdr);
            passDown(new Event(Event.MSG, msg));
        }
    }


    /**
     * Requests the digest from the layer below with GET_DIGEST_STABLE and
     * waits up to <code>digest_timeout</code> for the answer delivered via
     * {@link #receiveUpEvent(Event)}.
     *
     * @return the digest, or null if none was obtained (an error is logged)
     */
    Digest getDigest() {
        Digest ret=null;
        passDown(new Event(Event.GET_DIGEST_STABLE));
        ret=(Digest)digest_promise.getResult(digest_timeout);
        if(ret == null) {
            if(log.isErrorEnabled())
                log.error("digest could not be fetched from below " + "(timeout was " + digest_timeout + " msecs)");
        }
        return ret;
    }


    /**
     * Schedules a STABILITY message carrying <code>tmp</code> after a random
     * delay derived from <code>stability_delay</code>. The pending send is
     * cancelled by {@link #handleStabilityMessage(Digest)} when a STABILITY
     * message is received in the meantime.
     * <p>
     * If the timer is not set, nothing is scheduled for this call; the timer
     * is re-read from the stack so that a later call may succeed.
     *
     * @param tmp the digest to send
     */
    void sendStabilityMessage(Digest tmp) {
        long delay;

        if(timer == null) {
            if(log.isErrorEnabled())
                log.error("timer is null, cannot schedule stability message to be sent");
            timer=stack != null ? stack.timer : null;
            return;
        }

        delay=Util.random(stability_delay);
        startStabilityTask(tmp, delay);
    }


    /**
     * Handles a received STABILITY message: cancels our own pending
     * STABILITY send and passes the digest down as <code>Event.STABLE</code>.
     * The digest is ignored while suspended; if its senders do not match the
     * local digest, the local digest is re-initialized instead.
     *
     * @param d the digest carried by the STABILITY message
     */
    void handleStabilityMessage(Digest d) {
        if(d == null) {
            if(log.isErrorEnabled()) log.error("stability vector is null");
            return;
        }

        if(suspended) {
            if(log.isDebugEnabled()) {
                log.debug("STABILITY message will not be handled as suspended=" + suspended);
            }
            return;
        }

        if(log.isDebugEnabled()) log.debug("stability vector is " + d.printHighSeqnos());
        stopStabilityTask();

        if(!this.digest.sameSenders(d)) {
            if(log.isDebugEnabled()) {
                log.debug("received digest (digest=" + d + ") which does not match my own digest ("+
                        this.digest + "): ignoring digest and re-initializing own digest");
            }
            initialize();
            return;
        }

        passDown(new Event(Event.STABLE, d));
    }


    /**
     * Formats a digest as a comma-separated list of
     * <code>sender#highSeqno (highSeqnoSeen)</code> entries.
     *
     * @param d the digest to print, may be null
     * @return the formatted string; empty if <code>d</code> is null or empty
     */
    String printStabilityDigest(Digest d) {
        StringBuffer sb=new StringBuffer();

        if(d != null) {
            for(int i=0; i < d.size(); i++) {
                if(i > 0)
                    sb.append(", ");
                sb.append(d.senderAt(i)).append('#').append(d.highSeqnoAt(i));
                sb.append(" (").append(d.highSeqnoSeenAt(i)).append(')');
            }
        }
        return sb.toString();
    }


    /**
     * Header attached to STABLE's own messages. It carries the message type
     * (STABLE_GOSSIP or STABILITY) and a digest, which may be null.
     */
    public static class StableHeader extends Header implements Streamable {
        static final int STABLE_GOSSIP=1;
        static final int STABILITY=2;

        int type=0;
        Digest stableDigest=null;

        /** Public no-arg constructor (NOTE(review): presumably required for externalization - confirm) */
        public StableHeader() {
        }

        StableHeader(int type, Digest digest) {
            this.type=type;
            this.stableDigest=digest;
        }


        static String type2String(int t) {
            switch(t) {
            case STABLE_GOSSIP:
                return "STABLE_GOSSIP";
            case STABILITY:
                return "STABILITY";
            default:
                return "<unknown>";
            }
        }

        public String toString() {
            StringBuffer sb=new StringBuffer();
            sb.append('[');
            sb.append(type2String(type));
            sb.append("]: digest is ");
            sb.append(stableDigest);
            return sb.toString();
        }


        /** Writes the type, a presence flag and, if present, the digest */
        public void writeExternal(ObjectOutput out) throws IOException {
            out.writeInt(type);
            if(stableDigest == null) {
                out.writeBoolean(false);
                return;
            }
            out.writeBoolean(true);
            stableDigest.writeExternal(out);
        }


        /** Counterpart of {@link #writeExternal(ObjectOutput)} */
        public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
            type=in.readInt();
            boolean digest_not_null=in.readBoolean();
            if(digest_not_null) {
                stableDigest=new Digest();
                stableDigest.readExternal(in);
            }
        }

        public void writeTo(DataOutputStream out) throws IOException {
            out.writeInt(type);
            Util.writeStreamable(stableDigest, out);
        }

        public void readFrom(DataInputStream in) throws IOException, IllegalAccessException, InstantiationException {
            type=in.readInt();
            stableDigest=(Digest)Util.readStreamable(Digest.class, in);
        }


    }


    /**
     * Timer task that starts a gossip round and sends a STABLE message on
     * each run. It stops itself after <code>num_gossip_runs</code> runs, or
     * immediately when processing is suspended.
     */
    private class StableTask implements TimeScheduler.Task {
        /** Volatile: set through stop() by protocol threads, read by the scheduler */
        volatile boolean stopped=false;

        public void stop() {
            stopped=true;
        }

        public boolean running() {
            return !stopped;
        }

        public boolean cancelled() {
            return stopped;
        }

        /** @return a random interval; 10000 is used when the computed value is not positive */
        public long nextInterval() {
            long interval=computeSleepTime();
            if(interval <= 0)
                return 10000;
            else
                return interval;
        }


        public void run() {
            if(suspended) {
                if(log.isTraceEnabled())
                    log.trace("stable task will not run as suspended=" + suspended);
                stopStableTask();
                return;
            }
            initialize();
            sendStableMessage();
            num_gossip_runs--;
            if(num_gossip_runs <= 0) {
                if(log.isTraceEnabled())
                    log.trace("stable task terminating (num_gossip_runs=" +
                              num_gossip_runs + ", max_gossip_runs=" + max_gossip_runs + ')');
                stopStableTask();
            }
        }

        /** @return a random value below <code>mbrs.size() * desired_avg_gossip * 2</code> */
        long computeSleepTime() {
            return getRandom((mbrs.size() * desired_avg_gossip * 2));
        }

        long getRandom(long range) {
            return (long)((Math.random() * range) % range);
        }
    }


    /**
     * One-shot timer task that sends a STABILITY message with the given
     * digest, unless it was stopped or processing was suspended beforehand.
     */
    private class StabilitySendTask implements TimeScheduler.Task {
        Digest d=null;
        /** Volatile: set through stop() by protocol threads, read by the scheduler */
        volatile boolean stopped=false;
        long delay=2000;


        public StabilitySendTask(Digest d, long delay) {
            this.d=d;
            this.delay=delay;
        }

        public boolean running() {
            return !stopped;
        }

        public void stop() {
            stopped=true;
        }

        public boolean cancelled() {
            return stopped;
        }


        public long nextInterval() {
            return delay;
        }


        public void run() {
            Message msg;
            StableHeader hdr;

            if(suspended) {
                if(log.isDebugEnabled()) {
                    log.debug("STABILITY message will not be sent as suspended=" + suspended);
                }
                stopped=true;
                return;
            }

            if(d != null && !stopped) {
                msg=new Message();
                hdr=new StableHeader(StableHeader.STABILITY, d);
                msg.putHeader(STABLE.name, hdr);
                passDown(new Event(Event.MSG, msg));
                d=null;
            }
            stopped=true; // run only once
        }
    }


    /**
     * Fallback task that resumes processing after <code>max_suspend_time</code>
     * in case no RESUME_STABLE event was received.
     */
    private class ResumeTask implements TimeScheduler.Task {
        /** Volatile: cleared through stop() by protocol threads, read by the scheduler */
        volatile boolean running=true;
        long max_suspend_time=0;

        ResumeTask(long max_suspend_time) {
            this.max_suspend_time=max_suspend_time;
        }

        void stop() {
            running=false;
        }

        public boolean running() {
            return running;
        }

        public boolean cancelled() {
            return running == false;
        }

        public long nextInterval() {
            return max_suspend_time;
        }

        public void run() {
            if(suspended)
                log.warn("ResumeTask resumed message garbage collection - this should be done by a RESUME_STABLE event; " +
                         "check why this event was not received (or increase max_suspend_time for large state transfers)");
            resume();
        }
    }


}