package org.jgroups.protocols.pbcast;

import org.jgroups.*;
import org.jgroups.annotations.GuardedBy;
import org.jgroups.stack.Protocol;
import org.jgroups.util.Digest;
import org.jgroups.util.Promise;
import org.jgroups.util.Streamable;
import org.jgroups.util.Util;

import java.io.*;
import java.util.*;

/**
 * Protocol layer that coordinates a group-wide "flush".
 * <p>
 * As implemented below, a flush runs in two phases that are driven by
 * {@link FlushHeader} messages:
 * <ol>
 * <li>{@code SUSPEND} makes the local member multicast {@code START_FLUSH}.
 * Every participant answers with {@code FLUSH_OK}; once a participant has
 * seen {@code FLUSH_OK} from all flush members it blocks {@link #down(Event)}
 * and sends {@code FLUSH_COMPLETED} (carrying its digest) to the flush
 * coordinator. If reconciliation is enabled and the collected digests differ,
 * the coordinator multicasts {@code FLUSH_RECONCILE} and waits for
 * {@code FLUSH_RECONCILE_OK} replies before the flush is reported as
 * successful.</li>
 * <li>{@code RESUME} makes the local member multicast {@code STOP_FLUSH}.
 * Members answer with {@code STOP_FLUSH_OK}; when all non-suspected members of
 * the current view have answered, {@link #down(Event)} is unblocked and
 * {@code UNBLOCK} is passed up.</li>
 * </ol>
 * Competing {@code START_FLUSH} requests are resolved in
 * {@link #handleStartFlush(Message, FlushHeader)} by comparing requester
 * addresses; the loser receives {@code ABORT_FLUSH}.
 * <p>
 * Locking: protocol state is guarded by {@code sharedLock}; the flag that
 * blocks down-going traffic is guarded by {@code blockMutex}.
 */
public class FLUSH extends Protocol {
    public static final String NAME = "FLUSH";

    /** Most recently installed view (replaced by an empty view in {@link #stop()}). */
    @GuardedBy ("sharedLock")
    private View currentView;

    /** Address of this member, set on {@code SET_LOCAL_ADDRESS}. */
    private Address localAddress;

    /** Member that started the flush currently in progress, or {@code null} if none. */
    @GuardedBy ("sharedLock")
    private Address flushCoordinator;

    /** Participants of the flush in progress, minus suspected members. */
    @GuardedBy ("sharedLock")
    private final List<Address> flushMembers;

    /** Members from which a {@code FLUSH_OK} has been received. */
    @GuardedBy ("sharedLock")
    private final Set<Address> flushOkSet;

    /** Digests received with {@code FLUSH_COMPLETED}, keyed by sender. */
    @GuardedBy ("sharedLock")
    private final Map<Address, Digest> flushCompletedMap;

    /** Members from which a {@code STOP_FLUSH_OK} has been received. */
    @GuardedBy ("sharedLock")
    private final Set<Address> stopFlushOkSet;

    /** Members reported via {@code SUSPECT} and still present in the current view. */
    @GuardedBy ("sharedLock")
    private final Set<Address> suspected;

    private final Object sharedLock = new Object ();

    private final Object blockMutex = new Object ();

    /**
     * While {@code true}, {@link #blockMessageDuringFlush()} makes callers of
     * {@link #down(Event)} wait. Starts out {@code true} and is reset to
     * {@code true} in {@link #start()}.
     */
    @GuardedBy ("blockMutex")
    private boolean isBlockingFlushDown = true;

    /**
     * Maximum time in milliseconds a down-going thread waits in
     * {@link #blockMessageDuringFlush()}; a value {@code <= 0} waits without a
     * time limit. Configurable through the {@code timeout} property.
     */
    private long timeout = 8000;

    /** Whether the reconciliation round is run when digests differ ({@code enable_reconciliation} property). */
    private boolean enable_reconciliation = true;

    @GuardedBy ("sharedLock")
    private boolean receivedFirstView = false;

    @GuardedBy ("sharedLock")
    private boolean receivedMoreThanOneView = false;

    // Statistics; only updated when the inherited 'stats' flag is set.
    private long startFlushTime;

    private long totalTimeInFlush;

    private int numberOfFlushes;

    private double averageFlushDuration;

    /** Completed with TRUE/FALSE when the flush started by this member succeeds or is aborted. */
    private final Promise flush_promise = new Promise();

    @GuardedBy ("sharedLock")
    private final FlushPhase flushPhase = new FlushPhase();

    /** Senders of {@code FLUSH_RECONCILE_OK} for the reconciliation round in progress. */
    @GuardedBy ("sharedLock")
    private final List<Address> reconcileOks = new ArrayList<Address>();

    /** Creates the protocol with an empty view and empty bookkeeping collections. */
    public FLUSH() {
        super();
        currentView = new View(new ViewId(), new Vector<Address>());
        flushOkSet = new TreeSet<Address>();
        flushCompletedMap = new HashMap<Address, Digest>();
        stopFlushOkSet = new TreeSet<Address>();
        flushMembers = new ArrayList<Address>();
        suspected = new TreeSet<Address>();
    }

    public String getName() {
        return NAME;
    }

    /**
     * Reads {@code timeout} and {@code enable_reconciliation}; the deprecated
     * {@code auto_flush_conf} is accepted but ignored (with a warning).
     *
     * @param props protocol properties
     * @return {@code false} if any property is left over after the known ones
     *         have been consumed, {@code true} otherwise
     */
    public boolean setProperties(Properties props) {
        super.setProperties(props);

        timeout = Util.parseLong(props, "timeout", timeout);
        enable_reconciliation = Util.parseBoolean(props, "enable_reconciliation", enable_reconciliation);
        String str = props.getProperty("auto_flush_conf");
        if (str != null) {
            log.warn("auto_flush_conf has been deprecated and its value will be ignored");
            props.remove("auto_flush_conf");
        }

        if (!props.isEmpty()) {
            log.error("the following properties are not recognized: " + props);
            return false;
        }
        return true;
    }

    /**
     * Announces {@code flush_supported} to the protocols above and below via a
     * {@code CONFIG} event and resets the view counters and the down-block flag.
     */
    public void start() throws Exception {
        Map<String, Boolean> map = new HashMap<String, Boolean>();
        map.put("flush_supported", Boolean.TRUE);
        up_prot.up(new Event(Event.CONFIG, map));
        down_prot.down(new Event(Event.CONFIG, map));

        synchronized (sharedLock) {
            receivedFirstView = false;
            receivedMoreThanOneView = false;
        }
        synchronized (blockMutex) {
            isBlockingFlushDown = true;
        }
    }

    /** Clears all flush bookkeeping and replaces the current view with an empty one. */
    public void stop() {
        synchronized (sharedLock) {
            currentView = new View(new ViewId(), new Vector<Address>());
            flushCompletedMap.clear();
            flushOkSet.clear();
            stopFlushOkSet.clear();
            flushMembers.clear();
            suspected.clear();
            flushCoordinator = null;
        }
    }

    /* ------------------------------ statistics ------------------------------ */

    public double getAverageFlushDuration() {
        return averageFlushDuration;
    }

    public long getTotalTimeInFlush() {
        return totalTimeInFlush;
    }

    public int getNumberOfFlushes() {
        return numberOfFlushes;
    }

    /**
     * Starts a flush from this member, retrying up to three times.
     *
     * @param timeout time in milliseconds to wait for the flush to complete on
     *                each attempt
     * @return {@code true} if the flush completed successfully
     */
    public boolean startFlush(long timeout) {
        Map<String, Object> atts = new HashMap<String, Object>();
        atts.put("timeout", Long.valueOf(timeout));
        return startFlush(new Event(Event.SUSPEND, atts), 3, false);
    }

    /**
     * Runs one flush attempt and, if it fails, backs off and retries.
     *
     * @param evt              {@code SUSPEND} event whose argument is a map
     *                         holding a {@code Long} under {@code "timeout"}
     *                         and optionally a {@code View} under {@code "view"}
     * @param numberOfAttempts retries still allowed after this attempt
     * @param isRetry          {@code true} to run even though a flush is
     *                         already marked as in progress
     * @return {@code true} if the flush succeeded
     */
    private boolean startFlush(Event evt, int numberOfAttempts, boolean isRetry) {
        boolean successfulFlush = false;
        if (!flushPhase.isFlushInProgress() || isRetry) {
            flush_promise.reset();
            Map<?, ?> atts = (Map<?, ?>) evt.getArg();
            long flushTimeout = ((Long) atts.get("timeout")).longValue();
            if (log.isDebugEnabled()) {
                if (isRetry)
                    log.debug("Retrying FLUSH at " + localAddress + ", " + evt + ". Attempts left " + numberOfAttempts);
                else
                    log.debug("Received " + evt + " at " + localAddress + ". Running FLUSH...");
            }

            onSuspend((View) atts.get("view"));
            try {
                Boolean r = (Boolean) flush_promise.getResultWithTimeout(flushTimeout);
                // A null result is treated as failure instead of throwing a NullPointerException.
                successfulFlush = r != null && r.booleanValue();
            } catch (TimeoutException e) {
                if (log.isTraceEnabled())
                    log.trace("At " + localAddress
                            + " timed out waiting for flush responses after "
                            + flushTimeout + " msec");
            }
        }

        if (!successfulFlush && numberOfAttempts > 0) {
            // Sleep a random number of seconds (raised to at least 2) before retrying.
            long backOffSleepTime = Util.random(5);
            backOffSleepTime = backOffSleepTime < 2 ? backOffSleepTime + 2 : backOffSleepTime;
            if (log.isTraceEnabled())
                log.trace("At " + localAddress + ". Backing off for "
                        + backOffSleepTime + " sec. Attempts left "
                        + numberOfAttempts);

            Util.sleep(backOffSleepTime * 1000);
            // The promise may have been completed while this thread was sleeping.
            Boolean succeededWhileWeSlept = (Boolean) flush_promise.getResult(1);
            boolean shouldRetry = !(succeededWhileWeSlept != null && succeededWhileWeSlept.booleanValue());
            if (shouldRetry)
                successfulFlush = startFlush(evt, --numberOfAttempts, true);
        }
        return successfulFlush;
    }

    /** Ends the flush by sending a {@code RESUME} event down through this protocol. */
    public void stopFlush() {
        down(new Event(Event.RESUME));
    }

    /* --------------------------- Protocol interface --------------------------- */

    /**
     * Handles events travelling down the stack. Messages without a
     * {@code FLUSH_BYPASS} header and {@code GET_STATE} events wait while
     * down-going traffic is blocked; {@code SUSPEND} and {@code RESUME} are
     * consumed here and start or stop a flush.
     */
    public Object down(Event evt) {
        switch (evt.getType()) {
            case Event.MSG:
                Message msg = (Message) evt.getArg();
                FlushHeader fh = (FlushHeader) msg.getHeader(getName());
                if (fh != null && fh.type == FlushHeader.FLUSH_BYPASS) {
                    return down_prot.down(evt);
                } else {
                    blockMessageDuringFlush();
                }
                break;

            case Event.GET_STATE:
                blockMessageDuringFlush();
                break;

            case Event.CONNECT:
                sendBlockUpToChannel();
                break;

            case Event.SUSPEND:
                return startFlush(evt, 3, false);

            case Event.RESUME:
                onResume();
                return null;
        }
        return down_prot.down(evt);
    }

    /**
     * Makes the calling thread wait while {@code isBlockingFlushDown} is set.
     * If the wait ends (timeout or interrupt) and the flag is still set, the
     * thread clears the flag itself, wakes the other waiters, logs a warning
     * and completes {@code flush_promise} with {@code TRUE}.
     */
    private void blockMessageDuringFlush() {
        boolean shouldSuspendByItself = false;
        long start = 0, stop = 0;
        synchronized (blockMutex) {
            while (isBlockingFlushDown) {
                if (log.isDebugEnabled())
                    log.debug("FLUSH block at " + localAddress + " for "
                            + (timeout <= 0 ? "ever" : timeout + "ms"));
                try {
                    start = System.currentTimeMillis();
                    if (timeout <= 0)
                        blockMutex.wait();
                    else
                        blockMutex.wait(timeout);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    // Recorded on every exit path so the warning below reports the real
                    // elapsed time even when the wait was cut short by an interrupt.
                    stop = System.currentTimeMillis();
                }
                if (isBlockingFlushDown) {
                    isBlockingFlushDown = false;
                    shouldSuspendByItself = true;
                    blockMutex.notifyAll();
                }
            }
        }
        if (shouldSuspendByItself) {
            log.warn("unblocking FLUSH.down() at " + localAddress
                    + " after timeout of " + (stop - start) + "ms");
            flush_promise.setResult(Boolean.TRUE);
        }
    }

    /**
     * Handles events travelling up the stack. Messages carrying a
     * {@link FlushHeader} other than {@code FLUSH_BYPASS} are consumed here and
     * dispatched by header type; view changes, suspicions and
     * {@code SUSPEND}/{@code RESUME} update the flush state.
     */
    public Object up(Event evt) {

        switch (evt.getType()) {
            case Event.MSG:
                Message msg = (Message) evt.getArg();
                FlushHeader fh = (FlushHeader) msg.getHeader(getName());
                if (fh != null) {
                    if (fh.type == FlushHeader.FLUSH_BYPASS) {
                        return up_prot.up(evt);
                    } else if (fh.type == FlushHeader.START_FLUSH) {
                        handleStartFlush(msg, fh);
                    } else if (fh.type == FlushHeader.FLUSH_RECONCILE) {
                        handleFlushReconcile(msg, fh);
                    } else if (fh.type == FlushHeader.FLUSH_RECONCILE_OK) {
                        onFlushReconcileOK(msg);
                    } else if (fh.type == FlushHeader.STOP_FLUSH) {
                        onStopFlush();
                    } else if (fh.type == FlushHeader.ABORT_FLUSH) {
                        flush_promise.setResult(Boolean.FALSE);
                    } else if (isCurrentFlushMessage(fh)) {
                        if (fh.type == FlushHeader.FLUSH_OK) {
                            onFlushOk(msg.getSrc(), fh.viewID);
                        } else if (fh.type == FlushHeader.STOP_FLUSH_OK) {
                            onStopFlushOk(msg.getSrc());
                        } else if (fh.type == FlushHeader.FLUSH_COMPLETED) {
                            onFlushCompleted(msg.getSrc(), fh.digest);
                        }
                    } else {
                        if (log.isDebugEnabled())
                            log.debug(localAddress
                                    + " received outdated FLUSH message " + fh
                                    + ",ignoring it.");
                    }
                    // FLUSH control messages are not passed further up.
                    return null;
                }
                break;

            case Event.VIEW_CHANGE:
                View newView = (View) evt.getArg();
                boolean firstView = onViewChange(newView);
                boolean singletonMember = newView.size() == 1
                        && newView.containsMember(localAddress);
                if (firstView && singletonMember) {
                    // First view with only this member in it: deliver the view, then
                    // unblock immediately (no STOP_FLUSH round is run here).
                    up_prot.up(evt);
                    synchronized (blockMutex) {
                        isBlockingFlushDown = false;
                        blockMutex.notifyAll();
                    }
                    if (log.isDebugEnabled())
                        log.debug("At "
                                + localAddress
                                + " unblocking FLUSH.down() and sending UNBLOCK up");

                    up_prot.up(new Event(Event.UNBLOCK));
                    return null;
                }
                break;

            case Event.TMP_VIEW:
                View tmpView = (View) evt.getArg();
                // Only a temporary view that does not contain this member is installed.
                if (!tmpView.containsMember(localAddress)) {
                    onViewChange(tmpView);
                }
                break;

            case Event.SET_LOCAL_ADDRESS:
                localAddress = (Address) evt.getArg();
                break;

            case Event.SUSPECT:
                onSuspect((Address) evt.getArg());
                break;

            case Event.SUSPEND:
                return startFlush(evt, 3, false);

            case Event.RESUME:
                onResume();
                return null;

        }

        return up_prot.up(evt);
    }

    /**
     * Records a {@code FLUSH_RECONCILE_OK} and completes {@code flush_promise}
     * with {@code TRUE} once at least as many replies as flush members have
     * arrived.
     */
    private void onFlushReconcileOK(Message msg) {
        if (log.isDebugEnabled())
            log.debug(localAddress + " received reconcile ok from "
                    + msg.getSrc());

        synchronized (sharedLock) {
            // NOTE(review): completion is decided by reply count, not by sender
            // identity, so a duplicate reply would be counted twice - confirm
            // this is intended.
            reconcileOks.add(msg.getSrc());
            if (reconcileOks.size() >= flushMembers.size()) {
                flush_promise.setResult(Boolean.TRUE);
                if (log.isDebugEnabled())
                    log.debug("All FLUSH_RECONCILE_OK received at "
                            + localAddress);
            }
        }
    }

    /**
     * Passes the digest of a {@code FLUSH_RECONCILE} request down as a
     * {@code REBROADCAST} event and, once that call returns, replies to the
     * requester with an out-of-band {@code FLUSH_RECONCILE_OK}.
     */
    private void handleFlushReconcile(Message msg, FlushHeader fh) {
        Address requester = msg.getSrc();
        Digest reconcileDigest = fh.digest;

        if (log.isDebugEnabled())
            log.debug("Received FLUSH_RECONCILE at " + localAddress
                    + " passing digest to NAKACK " + reconcileDigest);

        down_prot.down(new Event(Event.REBROADCAST, reconcileDigest));

        if (log.isDebugEnabled())
            log.debug("Returned from FLUSH_RECONCILE at " + localAddress
                    + " Sending RECONCILE_OK to " + requester + ", thread "
                    + Thread.currentThread());

        Message reconcileOk = new Message(requester);
        reconcileOk.setFlag(Message.OOB);
        reconcileOk.putHeader(getName(), new FlushHeader(FlushHeader.FLUSH_RECONCILE_OK));
        down_prot.down(new Event(Event.MSG, reconcileOk));
    }

    /**
     * Handles {@code START_FLUSH}. When no flush is in progress the request is
     * accepted. During the first phase, concurrent requests are arbitrated by
     * address order: the requester with the smaller address wins and the other
     * party is sent {@code ABORT_FLUSH}. During the second phase every new
     * request is rejected.
     */
    private void handleStartFlush(Message msg, FlushHeader fh) {
        byte oldPhase = flushPhase.transitionToFirstPhase();
        if (oldPhase == FlushPhase.START_PHASE) {
            sendBlockUpToChannel();
            onStartFlush(msg.getSrc(), fh);
        } else if (oldPhase == FlushPhase.FIRST_PHASE) {
            Address flushRequester = msg.getSrc();
            Address coordinator = null;
            synchronized (sharedLock) {
                if (flushCoordinator != null)
                    coordinator = flushCoordinator;
                else
                    coordinator = flushRequester;
            }

            if (flushRequester.compareTo(coordinator) < 0) {
                // The new requester wins: abort the current coordinator and switch over.
                rejectFlush(fh.viewID, coordinator);
                if (log.isDebugEnabled()) {
                    log.debug("Rejecting flush at " + localAddress
                            + " to current flush coordinator " + coordinator
                            + " and switching flush coordinator to "
                            + flushRequester);
                }
                synchronized (sharedLock) {
                    flushCoordinator = flushRequester;
                }
            } else if (flushRequester.compareTo(coordinator) > 0) {
                // The current coordinator wins: abort the new requester.
                rejectFlush(fh.viewID, flushRequester);
                if (log.isDebugEnabled()) {
                    log.debug("Rejecting flush at " + localAddress
                            + " to flush requester " + flushRequester + " coordinator is " + coordinator);
                }
            }
            else if (flushRequester.equals(coordinator)) {
                if (log.isDebugEnabled()) {
                    log.debug("Accepting flush at " + localAddress
                            + ", proceeding with flush");
                }
                onStartFlush(msg.getSrc(), fh);
            }
        } else if (oldPhase == FlushPhase.SECOND_PHASE) {
            Address flushRequester = msg.getSrc();
            rejectFlush(fh.viewID, flushRequester);
            if (log.isDebugEnabled()) {
                log.debug("Rejecting flush in second phase at " + localAddress
                        + " to flush requester " + flushRequester);
            }
        }
    }

    /** @return the down events this protocol handles itself: {@code SUSPEND} and {@code RESUME} */
    public Vector<Integer> providedDownServices() {
        Vector<Integer> retval = new Vector<Integer>(2);
        retval.addElement(Integer.valueOf(Event.SUSPEND));
        retval.addElement(Integer.valueOf(Event.RESUME));
        return retval;
    }

    /** Sends {@code ABORT_FLUSH} for the given view id to {@code flushRequester}. */
    private void rejectFlush(long viewId, Address flushRequester) {
        Message reject = new Message(flushRequester, localAddress, null);
        reject.putHeader(getName(), new FlushHeader(FlushHeader.ABORT_FLUSH, viewId));
        down_prot.down(new Event(Event.MSG, reject));
    }

    private void sendBlockUpToChannel() {
        up_prot.up(new Event(Event.BLOCK));
    }

    /** @return {@code true} if the header carries the id of the current view */
    private boolean isCurrentFlushMessage(FlushHeader fh) {
        return fh.viewID == currentViewId();
    }

    /** @return the id of the current view, or {@code -1} if the view has no {@code ViewId} */
    private long currentViewId() {
        long viewId = -1;
        synchronized (sharedLock) {
            ViewId view = currentView.getVid();
            if (view != null) {
                viewId = view.getId();
            }
        }
        return viewId;
    }

    /**
     * Installs {@code view} as the current view and drops suspected members
     * that are no longer in it. If the flush coordinator is not part of the
     * new view, the first member of the view becomes coordinator; when that is
     * this member, it sends {@code STOP_FLUSH} to complete the flush.
     *
     * @return {@code true} if this is the first view seen since {@link #start()}
     */
    private boolean onViewChange(View view) {
        boolean amINewCoordinator = false;
        boolean isThisOurFirstView = false;
        synchronized (sharedLock) {
            if (receivedFirstView) {
                receivedMoreThanOneView = true;
            } else {
                receivedFirstView = true;
            }
            isThisOurFirstView = receivedFirstView && !receivedMoreThanOneView;
            suspected.retainAll(view.getMembers());
            currentView = view;
            boolean coordinatorLeft = flushCoordinator != null && !view.containsMember(flushCoordinator);

            if (coordinatorLeft) {
                flushCoordinator = view.getMembers().get(0);
                amINewCoordinator = localAddress.equals(flushCoordinator);
            }
        }

        if (amINewCoordinator) {
            if (log.isDebugEnabled())
                log.debug("Coordinator left, " + localAddress
                        + " will complete flush");
            onResume();
        }

        if (log.isDebugEnabled())
            log.debug("Installing view at " + localAddress + " view is "
                    + view);

        return isThisOurFirstView;
    }

    /**
     * Handles {@code STOP_FLUSH}: enters the second phase, updates the flush
     * statistics and, if this member is in the current view, multicasts
     * {@code STOP_FLUSH_OK}.
     */
    private void onStopFlush() {
        flushPhase.transitionToSecondPhase();
        if (stats) {
            long stopFlushTime = System.currentTimeMillis();
            totalTimeInFlush += (stopFlushTime - startFlushTime);
            if (numberOfFlushes > 0) {
                averageFlushDuration = totalTimeInFlush / (double) numberOfFlushes;
            }
        }

        boolean amISurvivingMember = false;
        synchronized (sharedLock) {
            amISurvivingMember = currentView.containsMember(localAddress);
        }
        if (amISurvivingMember) {
            Message msg = new Message(null, localAddress, null);
            msg.putHeader(getName(), new FlushHeader(FlushHeader.STOP_FLUSH_OK, currentViewId()));
            down_prot.down(new Event(Event.MSG, msg));
            if (log.isDebugEnabled())
                log.debug("Received STOP_FLUSH and sent STOP_FLUSH_OK from "
                        + localAddress);
        }
    }

    /**
     * Multicasts {@code START_FLUSH}. The participants are the members of
     * {@code view} that are also in the current view, or all members of the
     * current view when {@code view} is {@code null}. With no participants the
     * flush is reported as successful right away.
     */
    private void onSuspend(View view) {
        Message msg = null;
        Collection<Address> participantsInFlush = null;
        synchronized (sharedLock) {
            if (view != null) {
                participantsInFlush = new ArrayList<Address>(view.getMembers());
                participantsInFlush.retainAll(currentView.getMembers());
            } else {
                participantsInFlush = new ArrayList<Address>(currentView.getMembers());
            }
            msg = new Message(null, localAddress, null);
            msg.putHeader(getName(), new FlushHeader(FlushHeader.START_FLUSH,
                    currentViewId(), participantsInFlush));
        }
        if (participantsInFlush.isEmpty()) {
            flush_promise.setResult(Boolean.TRUE);
        } else {
            down_prot.down(new Event(Event.MSG, msg));
            if (log.isDebugEnabled())
                log.debug("Flush coordinator " + localAddress
                        + " is starting FLUSH with participants " + participantsInFlush);
        }
    }

    /** Multicasts {@code STOP_FLUSH} for the current view. */
    private void onResume() {
        long viewID = currentViewId();
        Message msg = new Message(null, localAddress, null);
        msg.putHeader(getName(), new FlushHeader(FlushHeader.STOP_FLUSH, viewID));
        down_prot.down(new Event(Event.MSG, msg));
        if (log.isDebugEnabled())
            log.debug("Received RESUME at " + localAddress
                    + ", sent STOP_FLUSH to all");
    }

    /**
     * Accepts a flush started by {@code flushStarter}: records the coordinator
     * and the (non-suspected) participants and, if this member is one of them,
     * multicasts {@code FLUSH_OK}.
     */
    private void onStartFlush(Address flushStarter, FlushHeader fh) {
        if (stats) {
            startFlushTime = System.currentTimeMillis();
            numberOfFlushes += 1;
        }
        boolean amIParticipant = false;
        synchronized (sharedLock) {
            flushCoordinator = flushStarter;
            flushMembers.clear();
            if (fh.flushParticipants != null) {
                flushMembers.addAll(fh.flushParticipants);
            }
            flushMembers.removeAll(suspected);
            amIParticipant = flushMembers.contains(localAddress);
        }
        if (amIParticipant) {
            Message msg = new Message(null);
            msg.putHeader(getName(), new FlushHeader(FlushHeader.FLUSH_OK, fh.viewID));
            down_prot.down(new Event(Event.MSG, msg));
            if (log.isDebugEnabled())
                log.debug("Received START_FLUSH at " + localAddress
                        + " responded with FLUSH_OK");
        }
    }

    /**
     * Records a {@code FLUSH_OK}. Once every flush member has answered (and the
     * sender is a flush member), blocks {@link #down(Event)} and sends
     * {@code FLUSH_COMPLETED} with the local digest to the flush coordinator.
     */
    private void onFlushOk(Address address, long viewID) {

        boolean flushOkCompleted = false;
        boolean amIParticipant = false;
        Message m = null;
        synchronized (sharedLock) {
            amIParticipant = flushMembers.contains(address);
            flushOkSet.add(address);
            flushOkCompleted = flushOkSet.containsAll(flushMembers);
            if (flushOkCompleted) {
                m = new Message(flushCoordinator);
            }
            if (log.isDebugEnabled())
                log.debug("At " + localAddress + " FLUSH_OK from " + address
                        + ",completed " + flushOkCompleted + ", flushOkSet "
                        + flushOkSet);
        }

        if (flushOkCompleted && amIParticipant) {
            synchronized (blockMutex) {
                isBlockingFlushDown = true;
            }
            Digest digest = (Digest) down_prot.down(new Event(Event.GET_DIGEST));
            FlushHeader fh = new FlushHeader(FlushHeader.FLUSH_COMPLETED, viewID);
            fh.addDigest(digest);
            m.putHeader(getName(), fh);
            if (log.isDebugEnabled())
                log.debug(localAddress
                        + " is blocking FLUSH.down(). Sending FLUSH_COMPLETED message to "
                        + flushCoordinator);
            down_prot.down(new Event(Event.MSG, m));

        }
    }

    /**
     * Records a {@code STOP_FLUSH_OK}. Once all non-suspected members of the
     * current view have answered, resets the flush state, unblocks
     * {@link #down(Event)} and passes {@code UNBLOCK} up.
     */
    private void onStopFlushOk(Address address) {

        boolean stopFlushOkCompleted = false;
        synchronized (sharedLock) {
            stopFlushOkSet.add(address);
            TreeSet<Address> membersCopy = new TreeSet<Address>(currentView.getMembers());
            membersCopy.removeAll(suspected);
            stopFlushOkCompleted = stopFlushOkSet.containsAll(membersCopy);

            if (log.isDebugEnabled())
                log.debug("At " + localAddress + " STOP_FLUSH_OK from " + address
                        + ",completed " + stopFlushOkCompleted
                        + ", stopFlushOkSet " + stopFlushOkSet);
        }

        if (stopFlushOkCompleted) {
            synchronized (sharedLock) {
                flushPhase.transitionToStart();
                flushCompletedMap.clear();
                flushOkSet.clear();
                stopFlushOkSet.clear();
                flushMembers.clear();
                suspected.clear();
                flushCoordinator = null;
            }

            if (log.isDebugEnabled())
                log.debug("At " + localAddress
                        + " unblocking FLUSH.down() and sending UNBLOCK up");

            synchronized (blockMutex) {
                isBlockingFlushDown = false;
                blockMutex.notifyAll();
            }
            up_prot.up(new Event(Event.UNBLOCK));
        }
    }

    /**
     * Records a {@code FLUSH_COMPLETED} and its digest. When all flush members
     * have reported, either starts a reconciliation round (if enabled and the
     * digests differ) or completes {@code flush_promise} with {@code TRUE}.
     */
    private void onFlushCompleted(Address address, Digest digest) {
        boolean flushCompleted = false;
        Message msg = null;
        boolean needsReconciliationPhase = false;
        synchronized (sharedLock) {
            flushCompletedMap.put(address, digest);
            if (flushCompletedMap.size() >= flushMembers.size()) {
                flushCompleted = flushCompletedMap.keySet().containsAll(flushMembers);
            }

            if (log.isDebugEnabled())
                log.debug("At " + localAddress + " FLUSH_COMPLETED from " + address
                        + ",completed " + flushCompleted + ",flushCompleted "
                        + flushCompletedMap.keySet());

            needsReconciliationPhase = enable_reconciliation && flushCompleted && hasVirtualSynchronyGaps();
            if (needsReconciliationPhase) {

                Digest d = findHighestSequences();
                msg = new Message();
                msg.setFlag(Message.OOB);
                FlushHeader fh = new FlushHeader(FlushHeader.FLUSH_RECONCILE, currentViewId(), flushMembers);
                reconcileOks.clear();
                fh.addDigest(d);
                msg.putHeader(getName(), fh);

                if (log.isTraceEnabled())
                    log.trace("Reconciling flush mebers due to virtual synchrony gap, digest is "
                            + d + " flush members are " + flushMembers);

                flushCompletedMap.clear();
            }
        }
        if (needsReconciliationPhase) {
            down_prot.down(new Event(Event.MSG, msg));
        } else if (flushCompleted) {
            flush_promise.setResult(Boolean.TRUE);
            if (log.isDebugEnabled())
                log.debug("All FLUSH_COMPLETED received at "
                        + localAddress);
        }
    }

    /**
     * Compares the first collected digest against every other one.
     * Called with {@code sharedLock} held and a non-empty
     * {@code flushCompletedMap} (the caller has just added an entry).
     *
     * @return {@code true} if any digest differs from the first
     */
    private boolean hasVirtualSynchronyGaps() {
        List<Digest> digests = new ArrayList<Digest>(flushCompletedMap.values());
        Digest firstDigest = digests.get(0);
        List<Digest> remainingDigests = digests.subList(1, digests.size());
        for (Digest digest : remainingDigests) {
            Digest diff = firstDigest.difference(digest);
            // NOTE(review): identity comparison - presumably difference() returns the
            // EMPTY_DIGEST singleton for equal digests; verify against Digest.
            if (diff != Digest.EMPTY_DIGEST) {
                return true;
            }
        }
        return false;
    }

    /**
     * Folds all collected digests with {@code Digest.highestSequence}.
     * Called with {@code sharedLock} held and a non-empty
     * {@code flushCompletedMap}.
     */
    private Digest findHighestSequences() {
        List<Digest> digests = new ArrayList<Digest>(flushCompletedMap.values());

        Digest result = digests.get(0);
        List<Digest> remainingDigests = digests.subList(1, digests.size());

        for (Digest digestG : remainingDigests) {
            result = result.highestSequence(digestG);
        }
        return result;
    }

    /**
     * Marks {@code address} as suspected and removes all suspected members from
     * the flush. If the remaining flush members have all sent {@code FLUSH_OK},
     * sends {@code FLUSH_COMPLETED} with the local digest to the coordinator.
     */
    private void onSuspect(Address address) {
        boolean flushOkCompleted = false;
        Message m = null;
        long viewID = 0;
        synchronized (sharedLock) {
            suspected.add(address);
            flushMembers.removeAll(suspected);
            viewID = currentViewId();
            flushOkCompleted = !flushOkSet.isEmpty() && flushOkSet.containsAll(flushMembers);
            if (flushOkCompleted) {
                m = new Message(flushCoordinator, localAddress, null);
            }
            if (log.isDebugEnabled())
                log.debug("Suspect is " + address + ",completed "
                        + flushOkCompleted + ", flushOkSet " + flushOkSet
                        + " flushMembers " + flushMembers);
        }
        if (flushOkCompleted) {
            Digest digest = (Digest) down_prot.down(new Event(Event.GET_DIGEST));
            FlushHeader fh = new FlushHeader(FlushHeader.FLUSH_COMPLETED, viewID);
            fh.addDigest(digest);
            m.putHeader(getName(), fh);
            down_prot.down(new Event(Event.MSG, m));
            if (log.isDebugEnabled())
                log.debug(localAddress + " sent FLUSH_COMPLETED message to "
                        + flushCoordinator);
        }
    }

    /**
     * Phase of the local flush state machine: start (no flush), first phase
     * (after {@code START_FLUSH}) or second phase (after {@code STOP_FLUSH}).
     * All transitions synchronize on the enclosing instance's
     * {@code sharedLock}, which is why this class is not static.
     */
    private class FlushPhase {
        private byte phase = 0;

        public static final byte START_PHASE = 0;

        public static final byte FIRST_PHASE = 1;

        public static final byte SECOND_PHASE = 2;

        FlushPhase() {
        }

        /**
         * Moves to the first phase, but only from the start phase.
         *
         * @return the phase that was current before the call
         */
        public byte transitionToFirstPhase() {
            byte oldPhase = -1;
            synchronized (sharedLock) {
                oldPhase = phase;
                if (oldPhase == START_PHASE) {
                    phase = FIRST_PHASE;
                }
            }
            return oldPhase;
        }

        public void transitionToStart() {
            synchronized (sharedLock) {
                phase = START_PHASE;
            }
        }

        public void transitionToSecondPhase() {
            synchronized (sharedLock) {
                phase = SECOND_PHASE;
            }
        }

        /** @return {@code true} unless the phase is the start phase */
        public boolean isFlushInProgress() {
            synchronized (sharedLock) {
                return phase != START_PHASE;
            }
        }
    }

    /**
     * Header attached to every FLUSH control message. Carries the message
     * type, the view id the message belongs to and, depending on the type, the
     * flush participants and/or a digest.
     */
    public static class FlushHeader extends Header implements Streamable {
        public static final byte START_FLUSH = 0;

        public static final byte FLUSH_OK = 1;

        public static final byte STOP_FLUSH = 2;

        public static final byte FLUSH_COMPLETED = 3;

        public static final byte STOP_FLUSH_OK = 4;

        public static final byte ABORT_FLUSH = 5;

        public static final byte FLUSH_BYPASS = 6;

        public static final byte FLUSH_RECONCILE = 7;

        public static final byte FLUSH_RECONCILE_OK = 8;

        byte type;

        long viewID;

        Collection<Address> flushParticipants;

        Digest digest = null;

        /** Creates a {@code START_FLUSH} header with view id 0. */
        public FlushHeader() {
            this(START_FLUSH, 0);
        }

        public FlushHeader(byte type) {
            this(type, 0);
        }

        public FlushHeader(byte type, long viewID) {
            this(type, viewID, null);
        }

        /**
         * @param type      one of the type constants of this class
         * @param viewID    id of the view the message belongs to
         * @param flushView flush participants; may be {@code null}. The
         *                  collection is stored as is, not copied.
         */
        public FlushHeader(byte type, long viewID, Collection<Address> flushView) {
            this.type = type;
            this.viewID = viewID;
            this.flushParticipants = flushView;
        }

        /** Sets (replaces) the digest carried by this header. */
        public void addDigest(Digest digest) {
            this.digest = digest;
        }

        public String toString() {
            switch (type) {
                case START_FLUSH:
                    return "FLUSH[type=START_FLUSH,viewId=" + viewID + ",members="
                            + flushParticipants + "]";
                case FLUSH_OK:
                    return "FLUSH[type=FLUSH_OK,viewId=" + viewID + "]";
                case STOP_FLUSH:
                    return "FLUSH[type=STOP_FLUSH,viewId=" + viewID + "]";
                case STOP_FLUSH_OK:
                    return "FLUSH[type=STOP_FLUSH_OK,viewId=" + viewID + "]";
                case ABORT_FLUSH:
                    return "FLUSH[type=ABORT_FLUSH,viewId=" + viewID + "]";
                case FLUSH_COMPLETED:
                    return "FLUSH[type=FLUSH_COMPLETED,viewId=" + viewID + "]";
                case FLUSH_BYPASS:
                    return "FLUSH[type=FLUSH_BYPASS,viewId=" + viewID + "]";
                case FLUSH_RECONCILE:
                    return "FLUSH[type=FLUSH_RECONCILE,viewId=" + viewID
                            + ",digest=" + digest + "]";
                case FLUSH_RECONCILE_OK:
                    return "FLUSH[type=FLUSH_RECONCILE_OK,viewId=" + viewID + "]";
                default:
                    return "[FLUSH: unknown type (" + type + ")]";
            }
        }

        public void writeExternal(ObjectOutput out) throws IOException {
            out.writeByte(type);
            out.writeLong(viewID);
            out.writeObject(flushParticipants);
            out.writeObject(digest);
        }

        public void readExternal(ObjectInput in) throws IOException,
                ClassNotFoundException {
            type = in.readByte();
            viewID = in.readLong();
            // writeExternal() always writes the participants as a Collection<Address>.
            @SuppressWarnings("unchecked")
            Collection<Address> participants = (Collection<Address>) in.readObject();
            flushParticipants = participants;
            digest = (Digest) in.readObject();
        }

        /**
         * Writes type, view id, participant count (as a short, 0 for none)
         * followed by the participants, then a presence flag and the digest.
         */
        public void writeTo(DataOutputStream out) throws IOException {
            out.writeByte(type);
            out.writeLong(viewID);
            if (flushParticipants != null && !flushParticipants.isEmpty()) {
                out.writeShort(flushParticipants.size());
                for (Address address : flushParticipants) {
                    Util.writeAddress(address, out);
                }
            } else {
                out.writeShort(0);
            }
            if (digest != null) {
                out.writeBoolean(true);
                Util.writeStreamable(digest, out);
            } else {
                out.writeBoolean(false);
            }
        }

        /**
         * Reads the format produced by {@link #writeTo(DataOutputStream)}.
         * A participant count of 0 leaves {@code flushParticipants} untouched.
         */
        public void readFrom(DataInputStream in) throws IOException,
                IllegalAccessException, InstantiationException {
            type = in.readByte();
            viewID = in.readLong();
            int flushParticipantsSize = in.readShort();
            if (flushParticipantsSize > 0) {
                flushParticipants = new ArrayList<Address>(flushParticipantsSize);
                for (int i = 0; i < flushParticipantsSize; i++) {
                    flushParticipants.add(Util.readAddress(in));
                }
            }
            boolean hasDigest = in.readBoolean();
            if (hasDigest) {
                digest = (Digest) Util.readStreamable(Digest.class, in);
            }
        }
    }
}