package org.jgroups.protocols;

import org.jgroups.*;
import org.jgroups.blocks.GroupRequest;
import org.jgroups.protocols.pbcast.Digest;
import org.jgroups.protocols.ring.RingNodeFlowControl;
import org.jgroups.protocols.ring.RingToken;
import org.jgroups.protocols.ring.TokenLostException;
import org.jgroups.protocols.ring.UdpRingNode;
import org.jgroups.stack.IpAddress;
import org.jgroups.stack.RpcProtocol;
import org.jgroups.util.RspList;
import org.jgroups.util.Util;

import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.*;

/**
 * Total-order multicast protocol driven by a token that circulates on a ring
 * of group members.
 *
 * <p>Multicast messages travelling down the stack are queued in
 * {@link #newMessagesQueue} and are only broadcast while this member holds the
 * token; each one is stamped with the next sequence number taken from the
 * token (see {@link TotalTokenHeader}). Incoming sequenced messages are
 * delivered up the stack strictly in sequence order. Gaps are written onto the
 * token as retransmission requests, and any member that still holds a
 * requested message rebroadcasts it when the token passes by.
 *
 * <p>When the token is lost ({@link TokenLostException}) or a member is
 * suspected, the protocol switches to recovery state. The recovery leader (the
 * first element of the sorted set of live members) collects every live
 * member's "all received up to" value via RPC, installs a transitional ring
 * and injects a fresh token.
 *
 * <p>Threading: token handling runs on the internal {@code TokenTransmitter}
 * thread; message arrival runs on whatever thread delivers up events. Both
 * paths synchronize on {@link #mutex} while touching {@link #myAru},
 * {@link #highestSeenSeq} and {@link #receivedMessagesQueue}.
 * NOTE(review): the SET_DIGEST branch of {@link #handleDownEvent} writes
 * {@code myAru} without taking {@code mutex} - confirm that is acceptable.
 */
public class TOTAL_TOKEN extends RpcProtocol
{

   /**
    * Header attached to every totally ordered multicast; it carries the
    * sequence number assigned to the message by its sender.
    */
   public static class TotalTokenHeader extends Header
   {
      /** Sequence number of the message this header is attached to. */
      private long seq;

      /**
       * Creates a header with sequence 0; {@link #readExternal} overwrites the
       * value when the header is read back from a stream.
       */
      public TotalTokenHeader()
      {
      }

      /**
       * @param seq sequence number to carry
       */
      public TotalTokenHeader(long seq)
      {
         this.seq = seq;
      }

      /**
       * @param seq sequence number to carry; must not be null
       */
      public TotalTokenHeader(Long seq)
      {
         this.seq = seq.longValue();
      }

      /**
       * @return the sequence number carried by this header
       */
      public long getSeq()
      {
         return seq;
      }

      /**
       * @return a fixed size for this header.
       *         NOTE(review): how the constant 121 was derived is not visible
       *         in this file - confirm before changing the serialized form.
       */
      public long size()
      {
         return 121;
      }

      /**
       * Writes the sequence number as a single long.
       */
      public void writeExternal(ObjectOutput out) throws IOException
      {
         out.writeLong(seq);
      }

      /**
       * Reads the sequence number written by {@link #writeExternal}.
       */
      public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException
      {
         seq = in.readLong();
      }

      public String toString()
      {
         return "[TotalTokenHeader=" + seq + ']';
      }
   }

   /**
    * Marker header identifying a message whose payload is a ring token rather
    * than application data. It carries no state of its own.
    */
   public static class RingTokenHeader extends Header
   {
      public RingTokenHeader()
      {
      }

      /** Nothing to write: the header has no fields. */
      public void writeExternal(ObjectOutput out) throws IOException
      {
      }

      /** Nothing to read: the header has no fields. */
      public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException
      {
      }

      /**
       * @return a fixed size for this header.
       *         NOTE(review): origin of the constant 110 is not visible here.
       */
      public long size()
      {
         return 110;
      }
   }


   /** Normal operation: an operational token is circulating. */
   private static final int OPERATIONAL_STATE = 0;
   /** A member was suspected or the token was lost; the ring is being repaired. */
   private static final int RECOVERY_STATE = 1;

   // node and flowControl are created together when SET_LOCAL_ADDRESS arrives,
   // so all three of these are null until that event has been seen.
   UdpRingNode node;
   RingNodeFlowControl flowControl;
   Address localAddress;
   final TokenTransmitter tokenRetransmitter=new TokenTransmitter();
   /** Outgoing multicasts waiting for the token (a synchronized list, see start()). */
   List newMessagesQueue;
   SortedSet liveMembersInRecovery,suspects;

   /** Guards myAru, highestSeenSeq and receivedMessagesQueue. */
   final Object mutex = new Object();
   /** Sequenced messages keyed by Long sequence number, kept until stable. */
   TreeMap receivedMessagesQueue;
   /** Highest sequence number up to which every message has been received. */
   long myAru = 0;

   // The next three fields are not referenced anywhere else in this class.
   final Object threadCoordinationMutex = new Object();
   final boolean tokenInStack = false;
   final boolean threadDeliveringMessage = false;
   /** False until the first token has been processed by tokenReceived(). */
   boolean tokenSeen = false;


   volatile boolean isRecoveryLeader = false;
   volatile int state;
   /** Milliseconds to hold the token before passing it on; see updateView(). */
   volatile int sleepTime = 10;

   long highestSeenSeq = 0;
   long lastRoundTokensAru = 0;
   int lastRoundTransmitCount,lastRoundRebroadcastCount = 0;
   // With these defaults neither threshold can ever be crossed, so sender
   // blocking is effectively off unless configured via setProperties().
   int blockSendingBacklogThreshold = Integer.MAX_VALUE;
   int unblockSendingBacklogThreshold = Integer.MIN_VALUE;
   boolean tokenCirculating = false;
   boolean senderBlocked = false;
   public static final String prot_name = "TOTAL_TOKEN";


   /**
    * @return the protocol name, also used as the key under which this
    *         protocol's headers are stored on messages
    */
   public String getName()
   {
      return prot_name;
   }

   /**
    * @return "OPERATIONAL" or "RECOVERY" depending on the current state
    */
   private String getState()
   {
      if (state == OPERATIONAL_STATE)
      {
         return "OPERATIONAL";
      }
      return "RECOVERY";
   }


   /**
    * Creates the message queues and starts the token thread.
    * NOTE(review): the token thread is a single Thread instance, so calling
    * start() again after stop() would throw IllegalThreadStateException.
    */
   public void start() throws Exception
   {
      super.start();
      newMessagesQueue = Collections.synchronizedList(new ArrayList());
      receivedMessagesQueue = new TreeMap();
      tokenRetransmitter.start();
   }

   /**
    * Stops the superclass and asks the token thread to finish its loop.
    */
   public void stop()
   {
      super.stop();
      tokenRetransmitter.shutDown();
   }


   /**
    * Reads the optional {@code block_sending} and {@code unblock_sending}
    * backlog thresholds and removes them from {@code props}.
    *
    * @param props configuration properties; recognised keys are removed
    * @return false if any unrecognised property is left over, true otherwise
    * @throws NumberFormatException if a threshold is not a valid integer
    */
   public boolean setProperties(Properties props)
   {
      String str;

      super.setProperties(props);
      str = props.getProperty("block_sending");
      if (str != null)
      {
         blockSendingBacklogThreshold = Integer.parseInt(str);
         props.remove("block_sending");
      }

      str = props.getProperty("unblock_sending");
      if (str != null)
      {
         unblockSendingBacklogThreshold = Integer.parseInt(str);
         props.remove("unblock_sending");
      }

      if (props.size() > 0)
      {
         // Report under this protocol's own name, and keep the heading and the
         // listing on the same stream so they appear together.
         System.err.println("TOTAL_TOKEN.setProperties(): the following properties are not recognized:");
         props.list(System.err);
         return false;
      }
      return true;
   }

   /**
    * @return the ring node's token receiver address, or null if the ring node
    *         has not been created yet
    */
   public IpAddress getTokenReceiverAddress()
   {
      return node != null? node.getTokenReceiverAddress() : null;
   }

   /**
    * @return the event types this protocol answers itself: GET_DIGEST,
    *         GET_DIGEST_STATE and SET_DIGEST
    */
   public Vector providedUpServices()
   {
      Vector retval = new Vector();
      retval.addElement(new Integer(Event.GET_DIGEST));
      retval.addElement(new Integer(Event.GET_DIGEST_STATE));
      retval.addElement(new Integer(Event.SET_DIGEST));
      return retval;
   }

   /**
    * Handles events travelling up the stack.
    *
    * <p>SET_LOCAL_ADDRESS creates the ring node and flow control; SUSPECT
    * switches to recovery; a MSG carrying one of this protocol's headers is
    * consumed here (sequenced messages go to ordered delivery, token messages
    * go to the ring node).
    *
    * @return false when the event was consumed here, true otherwise
    *         (presumably the superclass forwards the event when true is
    *         returned - confirm against RpcProtocol)
    */
   public boolean handleUpEvent(Event evt)
   {
      Message msg;
      Header h;
      switch (evt.getType())
      {

         case Event.SET_LOCAL_ADDRESS:
            localAddress = (Address) evt.getArg();
            node = new UdpRingNode(this, localAddress);
            flowControl = new RingNodeFlowControl();
            break;

         case Event.SUSPECT:
            Address suspect = (Address) evt.getArg();
            onSuspectMessage(suspect);
            break;

         case Event.MSG:
            msg = (Message) evt.getArg();
            h = msg.getHeader(getName());
            if (h instanceof TotalTokenHeader)
            {
               // messageArrived() passes the message up itself, in order.
               messageArrived(msg);
               return false;
            }
            else if (h instanceof RingTokenHeader)
            {
               if(node != null) {
                  Object tmp=msg.getObject();
                  node.tokenArrived(tmp);
               }
               return false;
            }
      }
      return true;
   }

   /**
    * Handles events travelling down the stack.
    *
    * <p>Digest requests are answered locally, VIEW_CHANGE resets recovery
    * bookkeeping, and multicast messages (null or multicast destination) are
    * queued until this member holds the token.
    *
    * @return false when the event was consumed here, true otherwise
    */
   public boolean handleDownEvent(Event evt)
   {
      switch (evt.getType())
      {
         case Event.GET_DIGEST:
         case Event.GET_DIGEST_STATE:

            // Every member is reported with the same value, highestSeenSeq,
            // for both sequence arguments.
            Digest d = new Digest(members.size());
            Address sender = null;
            for (int j = 0; j < members.size(); j++)
            {
               sender = (Address) members.elementAt(j);
               d.add(sender, highestSeenSeq, highestSeenSeq);
            }
            passUp(new Event(Event.GET_DIGEST_OK, d));
            return false;
         case Event.SET_DIGEST:
            Digest receivedDigest = (Digest) evt.getArg();
            myAru = receivedDigest.highSeqnoAt(0);
            return false;

         case Event.VIEW_CHANGE:
            onViewChange();
            return true;

         case Event.MSG:
            Message msg = (Message) evt.getArg();
            if (msg == null) return false;
            if (msg.getDest() == null || msg.getDest().isMulticastAddress())
            {
               // Held back until broadcastMessages() runs with the token.
               newMessagesQueue.add(msg);
               return false;
            }
      }
      return true;
   }

   /**
    * Drops all recovery bookkeeping: this member stops being recovery leader
    * and the suspect / live-member sets are cleared and released.
    */
   private void onViewChange()
   {
      isRecoveryLeader = false;

      if (suspects != null)
      {
         suspects.clear();
         suspects = null;
      }
      if (liveMembersInRecovery != null)
      {
         liveMembersInRecovery.clear();
         liveMembersInRecovery = null;
      }
   }

   /**
    * Enters recovery state, records the suspect and recomputes which members
    * are still considered alive and whether this member leads the recovery.
    *
    * @param suspect address of the suspected member
    */
   private void onSuspectMessage(Address suspect)
   {
      state = RECOVERY_STATE;
      if (suspects == null || suspects.size() == 0)
      {
         // First suspicion of this round: start from the full membership.
         suspects = Collections.synchronizedSortedSet(new TreeSet());
         liveMembersInRecovery = Collections.synchronizedSortedSet(new TreeSet(members));
      }
      suspects.add(suspect);
      liveMembersInRecovery.removeAll(suspects);
      isRecoveryLeader = isRecoveryLeader(liveMembersInRecovery);
   }

   /**
    * Decides whether this member leads the recovery.
    *
    * @param liveMembers sorted set of members still considered alive
    * @return true if the set is non-empty and its first element equals the
    *         local address
    */
   private boolean isRecoveryLeader(SortedSet liveMembers)
   {
      boolean recoveryLeader = false;
      if (liveMembers.size() > 0)
      {
         recoveryLeader = localAddress.equals(liveMembers.first());
      }

      if(log.isInfoEnabled()) log.info("live memebers are " + liveMembers);
      if(log.isInfoEnabled()) log.info("I am recovery leader?" + recoveryLeader);
      return recoveryLeader;
   }

   /**
    * Invoked by name through callRemoteMethods() during recovery.
    *
    * @return this member's "all received up to" sequence number
    */
   public long getAllReceivedUpTo()
   {
      return myAru;
   }

   /**
    * Invoked by name through callRemoteMethods() during recovery: reconfigures
    * the ring node for the given members, if the ring node exists.
    *
    * @param members members of the transitional ring
    */
   public void installTransitionalView(Vector members)
   {
      if(node != null)
         node.reconfigure(members);
   }

   /**
    * Runs the recovery procedure; does nothing unless this member is the
    * recovery leader and the protocol is in recovery state.
    *
    * <p>Steps: (1) query every live member's "all received up to" value
    * repeatedly until two consecutive rounds return equal results, (2) install
    * the transitional ring on all live members, (3) inject a RECOVERY token
    * listing the missing sequence numbers, or an operational token if there is
    * at most one entry in the retransmission list.
    */
   private void recover()
   {
      if (!isRecoveryLeader || state != RECOVERY_STATE)
      {
         return;
      }

      // onViewChange() may null the field from another thread, so work on a
      // snapshot and bail out if recovery was cancelled in the meantime.
      SortedSet liveMembers = liveMembersInRecovery;
      if (liveMembers == null)
      {
         return;
      }

      if(log.isInfoEnabled()) log.info("I am starting recovery now");

      Vector m = new Vector(liveMembers);

      Vector myAllReceivedUpTos = queryAllReceivedUpTo(m);
      // Each confirmation round must use its OWN responses; re-reading the
      // first round's result would make the comparison below always succeed.
      Vector myAllReceivedUpTosConfirm = queryAllReceivedUpTo(m);

      while (!myAllReceivedUpTos.equals(myAllReceivedUpTosConfirm))
      {
         myAllReceivedUpTos = myAllReceivedUpTosConfirm;
         myAllReceivedUpTosConfirm = queryAllReceivedUpTo(m);

         if(log.isInfoEnabled()) log.info("myAllReceivedUpto values are" + myAllReceivedUpTos);
         if(log.isInfoEnabled()) log.info("myAllReceivedUpto confirm values are " + myAllReceivedUpTosConfirm);
      }

      if (myAllReceivedUpTos.isEmpty())
      {
         // Without any value there is nothing to base a token on; returning
         // keeps the token thread alive so the next timeout retries recovery.
         if(log.isInfoEnabled()) log.info("no getAllReceivedUpTo responses, recovery postponed");
         return;
      }

      if(log.isInfoEnabled()) log.info("myAllReceivedUpto stabilized values are" + myAllReceivedUpTos);
      if(log.isInfoEnabled()) log.info("installing transitional view to repair the ring...");

      callRemoteMethods(m, "installTransitionalView", new Object[]{m}, new String[]{Vector.class.getName()},
                        GroupRequest.GET_ALL, 0);

      Vector xmits = prepareRecoveryRetransmissionList(myAllReceivedUpTos);
      RingToken injectToken = null;
      if (xmits.size() > 1)
      {
         if(log.isInfoEnabled()) log.info("VS not satisfied, injecting recovery token...");
         long aru = ((Long) xmits.firstElement()).longValue();
         long highest = ((Long) xmits.lastElement()).longValue();

         injectToken = new RingToken(RingToken.RECOVERY);
         injectToken.setHighestSequence(highest);
         injectToken.setAllReceivedUpto(aru);

         Collection rtr = injectToken.getRetransmissionRequests();
         rtr.addAll(xmits);
      }
      else
      {
         if(log.isInfoEnabled()) log.info("VS satisfied, injecting operational token...");
         injectToken = new RingToken();
         long sequence = ((Long) xmits.firstElement()).longValue();
         injectToken.setHighestSequence(sequence);
         injectToken.setAllReceivedUpto(sequence);
      }
      if(node != null)
         node.passToken(injectToken);
      tokenRetransmitter.resetTimeout();
   }

   /**
    * Performs one blocking RPC round asking the given members for their
    * "all received up to" value.
    *
    * @param m members to query
    * @return the results of this round, as returned by RspList.getResults()
    */
   private Vector queryAllReceivedUpTo(Vector m)
   {
      RspList list = callRemoteMethods(m, "getAllReceivedUpTo", new Object[]{}, new Class[]{},
                                       GroupRequest.GET_ALL, 0);
      return list.getResults();
   }

   /**
    * Builds the list of sequence numbers to request during recovery.
    *
    * <p>If the smallest and largest reported values are equal, the list holds
    * that single value; otherwise it holds every value from smallest + 1 up
    * to and including the largest.
    * NOTE(review): a gap of exactly one also yields a single-element list,
    * which recover() treats like the no-gap case - confirm this is intended.
    *
    * @param sequences non-empty vector of Long values; sorted in place
    * @return vector of Long sequence numbers as described above
    */
   private Vector prepareRecoveryRetransmissionList(Vector sequences)
   {
      Collections.sort(sequences);
      Long first = (Long) sequences.firstElement();
      Long last = (Long) sequences.lastElement();

      Vector retransmissions = new Vector();
      if (first.equals(last))
      {
         retransmissions.add(new Long(first.longValue()));
      }
      else
      {
         for (long j = first.longValue() + 1; j <= last.longValue(); j++)
         {
            retransmissions.add(new Long(j));
         }
      }
      return retransmissions;
   }


   /**
    * Applies a new view: informs flow control and the ring node, starts the
    * token if this member is alone in the group and no token circulates yet,
    * and scales the token hold time to 20 / memberCount milliseconds.
    *
    * @param newMembers the new view
    */
   protected void updateView(View newMembers)
   {
      super.updateView(newMembers);
      Vector newViewMembers = newMembers.getMembers();
      int memberSize = newViewMembers.size();

      // node and flowControl are created together on SET_LOCAL_ADDRESS, so
      // guard both the same way.
      if(flowControl != null)
         flowControl.viewChanged(memberSize);
      if(node != null)
         node.reconfigure(newViewMembers);

      if (memberSize == 0)
      {
         // No coordinator to compare against, and 20 / 0 would throw.
         return;
      }

      boolean isCoordinator = localAddress != null
                              && localAddress.equals(newViewMembers.firstElement());

      if (memberSize == 1 && isCoordinator && !tokenCirculating)
      {
         tokenCirculating = true;
         RingToken token = new RingToken();
         if(node != null)
            node.passToken(token);
         tokenRetransmitter.resetTimeout();
      }
      sleepTime = (20/memberSize);
   }


   /**
    * Handles an incoming sequenced message.
    *
    * <p>Messages at or below {@code myAru} are ignored. Anything newer is
    * stored; if it is exactly the next expected message it is delivered at
    * once, and any directly following messages already stored are delivered
    * after it.
    *
    * @param m message carrying a TotalTokenHeader
    */
   private void messageArrived(Message m)
   {
      TotalTokenHeader h = (TotalTokenHeader) m.getHeader(getName());
      long seq = h.getSeq();

      synchronized (mutex)
      {
         if ((myAru + 1) <= seq)
         {
            if (seq > highestSeenSeq)
            {
               highestSeenSeq = seq;
            }

            receivedMessagesQueue.put(new Long(seq), m);
            if ((myAru + 1) == seq)
            {
               myAru = seq;
               passUp(new Event(Event.MSG, m));
            }
            if (isReceiveQueueHolePlugged())
            {
               myAru = deliverMissingMessages();
            }
         }
      }
   }

   /**
    * @return true if messages beyond {@code myAru} have been seen and the
    *         very next one (myAru + 1) is already stored
    */
   private boolean isReceiveQueueHolePlugged()
   {
      return ((myAru < highestSeenSeq) && receivedMessagesQueue.containsKey(new Long(myAru + 1)));
   }


   /**
    * Delivers the run of consecutively numbered stored messages that starts
    * at myAru + 1, stopping at the first gap.
    *
    * @return the sequence number of the last message delivered (the caller
    *         stores it as the new myAru)
    */
   private long deliverMissingMessages()
   {
      Map.Entry entry = null;
      boolean inOrder = true;
      long lastDelivered = myAru;
      Set deliverySet = receivedMessagesQueue.tailMap(new Long(myAru + 1)).entrySet();

      if(log.isInfoEnabled()) log.info("hole getting plugged, prior muAru " + myAru);

      for (Iterator iterator = deliverySet.iterator(); inOrder && iterator.hasNext();)
      {
         entry = (Map.Entry) iterator.next();
         long nextInQueue = ((Long) entry.getKey()).longValue();
         if (lastDelivered + 1 == nextInQueue)
         {
            Message m = (Message) entry.getValue();
            passUp(new Event(Event.MSG, m));
            lastDelivered++;
         }
         else
         {
            inOrder = false;
         }
      }

      if(log.isInfoEnabled()) log.info("hole getting plugged, post muAru " + lastDelivered);
      return lastDelivered;
   }

   /**
    * Adds every sequence number this member is missing, between myAru and the
    * token's highest sequence, to the token's retransmission requests.
    *
    * @param token the token currently held
    */
   private void updateTokenRtR(RingToken token)
   {
      long holeLowerBound = 0;
      long holeUpperBound = 0;
      Long missingSequence = null;
      Collection retransmissionList = null;

      if (myAru < token.getHighestSequence())
      {
         retransmissionList = token.getRetransmissionRequests();
         Set received = receivedMessagesQueue.tailMap(new Long(myAru + 1)).keySet();
         Iterator nonMissing = received.iterator();
         holeLowerBound = myAru;

         if(log.isDebugEnabled()) log.debug("retransmission request prior" + retransmissionList);

         // Walk the stored sequence numbers; everything strictly between two
         // neighbouring ones is a hole.
         // NOTE(review): the inner loop also adds holeUpperBound itself, i.e.
         // a sequence number that IS stored locally - confirm whether the
         // bound should be exclusive.
         while (nonMissing.hasNext())
         {
            Long seq = (Long) nonMissing.next();
            holeUpperBound = seq.longValue();

            while (holeLowerBound < holeUpperBound)
            {
               missingSequence = new Long(++holeLowerBound);
               retransmissionList.add(missingSequence);
            }
            holeLowerBound = holeUpperBound;
         }

         // Tail: from the last stored message up to the token's highest.
         holeUpperBound = token.getHighestSequence();
         while (holeLowerBound < holeUpperBound)
         {
            missingSequence = new Long(++holeLowerBound);
            retransmissionList.add(missingSequence);
         }

         if(log.isDebugEnabled()) log.debug("retransmission request after" + retransmissionList);
      }
   }


   /**
    * Broadcasts up to {@code allowedCount} queued messages, stamping each with
    * the next sequence number after the token's highest sequence, and updates
    * the token accordingly.
    *
    * @param allowedCount maximum number of messages to send this round
    * @param token        the token currently held
    * @return the number of messages actually broadcast
    */
   private int broadcastMessages(int allowedCount, RingToken token)
   {
      List sendList = null;
      synchronized (newMessagesQueue)
      {
         int queueSize = newMessagesQueue.size();

         if (queueSize <= 0)
         {
            return 0;
         }
         else if (queueSize > allowedCount)
         {
            // Copy the head of the queue, then drop exactly that range.
            // Clearing the subList view is linear and positional, whereas
            // removeAll() is quadratic and would also remove equal elements
            // queued further back.
            List head = newMessagesQueue.subList(0, allowedCount);
            sendList = new ArrayList(head);
            head.clear();
         }
         else
         {
            sendList = new ArrayList(newMessagesQueue);
            newMessagesQueue.clear();
         }
      }

      long tokenSeq = token.getHighestSequence();

      for (Iterator iterator = sendList.iterator(); iterator.hasNext();)
      {
         Message m = (Message) iterator.next();
         m.setSrc(localAddress);
         m.setDest(null);
         m.putHeader(getName(), new TotalTokenHeader(++tokenSeq));
         // Keep a copy so it can be rebroadcast on request.
         receivedMessagesQueue.put(new Long(tokenSeq), m);
         passDown(new Event(Event.MSG, m));
      }

      if (token.getHighestSequence() == token.getAllReceivedUpto())
      {
         token.setAllReceivedUpto(tokenSeq);
      }
      token.setHighestSequence(tokenSeq);
      return sendList.size();
   }


   /**
    * Processes a token that has just arrived at this member: updates flow
    * control and sender blocking, records missing messages on the token,
    * rebroadcasts requested messages, lowers the token's "all received up to"
    * value if this member is behind, broadcasts new messages (operational
    * tokens only), trims stable messages, then holds the token for
    * {@code sleepTime} ms and updates its per-round counters.
    *
    * @param token the token received from the ring
    */
   private void tokenReceived(RingToken token)
   {
      if(log.isInfoEnabled()) log.info(token.toString());
      if(log.isDebugEnabled()) log.debug(getState());

      flowControl.setBacklog(newMessagesQueue.size());
      flowControl.updateWindow(token);

      blockSenderIfRequired();
      unBlockSenderIfAcceptable();

      long tokensAru = 0;
      int broadcastCount = 0;
      int rebroadcastCount = 0;
      synchronized (mutex)
      {
         if (!tokenSeen)
         {
            // First token ever seen by this member: if the token is ahead of
            // us, start from the sequence the previous round ended at.
            long lastRoundAru = token.getHighestSequence() - token.getLastRoundBroadcastCount();
            if (myAru < token.getAllReceivedUpto())
            {
               myAru = lastRoundAru;
            }
            tokenSeen = true;
         }

         if (token.getType() == RingToken.RECOVERY)
         {
            highestSeenSeq = token.getHighestSequence();
            if (highestSeenSeq == myAru)
            {
               if(log.isInfoEnabled()) log.info("member node recovered");
               token.addRecoveredMember(localAddress);
            }
         }

         updateTokenRtR(token);

         int allowedToBroadcast = flowControl.getAllowedToBroadcast(token);
         rebroadcastCount = rebroadcastMessages(token);
         allowedToBroadcast -= rebroadcastCount;

         if(log.isInfoEnabled()) log.info("myAllReceivedUpto" + myAru);
         if(log.isInfoEnabled()) log.info("allowedToBroadcast" + allowedToBroadcast);
         if(log.isInfoEnabled()) log.info("newMessagesQueue.size()" + newMessagesQueue.size());

         tokensAru = token.getAllReceivedUpto();

         if (myAru < tokensAru ||
             localAddress.equals(token.getAruId()) ||
             token.getAruId() == null)
         {
            token.setAllReceivedUpto(myAru);
            if (token.getAllReceivedUpto() == token.getHighestSequence())
            {
               token.setAruId(null);
            }
            else
            {
               token.setAruId(localAddress);
            }
         }
         if (allowedToBroadcast > 0 && token.getType() == RingToken.OPERATIONAL)
         {
            broadcastCount = broadcastMessages(allowedToBroadcast, token);
         }

         if (tokensAru > lastRoundTokensAru)
         {
            removeStableMessages(receivedMessagesQueue, lastRoundTokensAru);
         }
      }

      Util.sleep(sleepTime);

      token.incrementTokenSequence();
      token.addLastRoundBroadcastCount(broadcastCount - lastRoundTransmitCount);
      token.addBacklog(flowControl.getBacklogDifference());
      flowControl.setPreviousBacklog();
      lastRoundTransmitCount = broadcastCount;
      lastRoundRebroadcastCount = rebroadcastCount;
      lastRoundTokensAru = tokensAru;
   }

   /**
    * Rebroadcasts every message requested on the token that this member still
    * holds; the served requests are removed from the token.
    *
    * @param token the token currently held
    * @return the number of messages rebroadcast
    */
   private int rebroadcastMessages(RingToken token)
   {
      int rebroadCastCount = 0;
      Collection rexmitRequests = token.getRetransmissionRequests();
      if (rexmitRequests.size() > 0)
      {
         Collection rbl = getRebroadcastList(rexmitRequests);
         rebroadCastCount = rbl.size();
         if (rebroadCastCount > 0)
         {
            if(log.isInfoEnabled()) log.info("rebroadcasting " + rbl);

            Long s = null;
            for (Iterator iterator = rbl.iterator(); iterator.hasNext();)
            {
               s = (Long) iterator.next();
               Message m = (Message) receivedMessagesQueue.get(s);
               passDown(new Event(Event.MSG, m));
            }
         }
      }
      return rebroadCastCount;
   }


   /**
    * Resets per-round state after the token was lost.
    */
   private void invalidateOnTokenloss()
   {
      lastRoundTransmitCount = 0;
      flowControl.invalidate();
   }

   /**
    * Passes BLOCK_SEND up once when the backlog exceeds
    * {@code blockSendingBacklogThreshold}.
    */
   private void blockSenderIfRequired()
   {
      if (!senderBlocked && flowControl.getBacklog() > blockSendingBacklogThreshold)
      {
         passUp(new Event(Event.BLOCK_SEND));
         senderBlocked = true;
      }
   }

   /**
    * Passes UNBLOCK_SEND up once when a blocked sender's backlog has fallen
    * below {@code unblockSendingBacklogThreshold}.
    */
   private void unBlockSenderIfAcceptable()
   {
      if (senderBlocked && flowControl.getBacklog() < unblockSendingBacklogThreshold)
      {
         passUp(new Event(Event.UNBLOCK_SEND));
         senderBlocked = false;
      }
   }


   /**
    * Removes from {@code m} all entries with a key strictly below
    * {@code upToSeq}. If the map's first key is already above
    * {@code upToSeq}, the cut point moves up to that key, so nothing is
    * removed.
    *
    * @param m       map keyed by Long sequence number
    * @param upToSeq exclusive upper bound of the entries to drop
    */
   private void removeStableMessages(TreeMap m, long upToSeq)
   {
      if (m.size() > 0)
      {
         long first = ((Long) m.firstKey()).longValue();
         if (first > upToSeq)
         {
            upToSeq = first;
         }

         if(log.isDebugEnabled()) log.debug("cutting queue first key " + m.firstKey()
                                            + " cut at " + upToSeq + " last key " + m.lastKey());
         SortedMap stable = m.headMap(new Long(upToSeq));
         stable.clear();
      }
   }


   /**
    * Splits the retransmission requests into those this member can serve and
    * those it cannot.
    *
    * @param rtr requested sequence numbers; the ones served here are removed
    * @return sorted list of requested sequence numbers present in
    *         {@code receivedMessagesQueue}
    */
   private Collection getRebroadcastList(Collection rtr)
   {
      ArrayList rebroadcastList = new ArrayList(rtr);
      rebroadcastList.retainAll(receivedMessagesQueue.keySet());
      rtr.removeAll(rebroadcastList);
      Collections.sort(rebroadcastList);
      return rebroadcastList;
   }


   /**
    * Thread that waits for the token, processes it, and passes it on. The
    * wait timeout is {@code timeoutFactor} times a smoothed estimate of the
    * interval between passing the token on and finishing the processing of
    * its next arrival. When the wait fails with TokenLostException the thread
    * switches to recovery state and runs recover().
    */
   private class TokenTransmitter extends Thread
   {
      /** Last measured interval in milliseconds. */
      long rtt = 0;
      /** Time (ms) at which the token was last passed on / the timer reset. */
      long timer;
      /** Smoothed interval in milliseconds; starts at one second. */
      double srtt = 1000;
      /** Weight of the newest sample in the smoothed value. */
      final double a = 0.09;
      final int timeoutFactor = 10;
      volatile boolean running = false;

      private TokenTransmitter()
      {
         super("TokenTransmitter");
         resetTimeout();
         running = true;
      }

      /** Asks the run loop to exit after its current iteration. */
      private void shutDown()
      {
         running = false;
      }

      /** Folds the time elapsed since the last reset into the smoothed value. */
      private void recalculateTimeout()
      {
         long now = System.currentTimeMillis();
         if (timer > 0)
         {
            rtt = now - timer;
            srtt = (1 - a) * srtt + a * rtt;
         }
      }

      /** @return the token wait timeout in milliseconds */
      private double getTimeout()
      {
         return srtt * timeoutFactor;
      }

      /** Marks "now" as the start of the next measured interval. */
      private void resetTimeout()
      {
         timer = System.currentTimeMillis();
      }

      /**
       * @return true if the members recorded on the token as recovered equal
       *         the current set of live members; false if no recovery is in
       *         progress any more
       */
      private boolean isRecoveryCompleted(RingToken token)
      {
         // onViewChange() can null the field concurrently; use a snapshot.
         SortedSet liveMembers = liveMembersInRecovery;
         return liveMembers != null && liveMembers.equals(token.getRecoveredMembers());
      }

      public void run()
      {
         while (running)
         {
            RingToken token = null;
            int timeout = 0;

            if(node == null) {
               // Ring node not created yet (no SET_LOCAL_ADDRESS so far).
               Util.sleep(500);
               continue;
            }

            try
            {
               timeout = (int) getTimeout();

               if(log.isInfoEnabled()) log.info("timeout(ms)=" + timeout);

               token = (RingToken) node.receiveToken(timeout);

               // An operational token means the ring is healthy again.
               if (token.getType() == RingToken.OPERATIONAL &&
                   state == RECOVERY_STATE)
               {
                  state = OPERATIONAL_STATE;
               }

               tokenReceived(token);
               recalculateTimeout();

               if (token.getType() == RingToken.RECOVERY &&
                   isRecoveryCompleted(token))
               {
                  if(log.isInfoEnabled()) log.info("all members recovered, injecting operational token");
                  token.setType(RingToken.OPERATIONAL);
               }
               node.passToken(token);
               resetTimeout();
            }
            catch (TokenLostException tle)
            {
               invalidateOnTokenloss();
               state = RECOVERY_STATE;
               recover();
            }
         }
      }
   }
}