1 3 package org.jgroups.protocols.ring; 4 5 import org.apache.commons.logging.Log; 6 import org.apache.commons.logging.LogFactory; 7 8 public class RingNodeFlowControl 9 { 10 final int initialWindow; 11 final float windowReduceFactor; 12 final int belowThresholdAdvanceAmount; 13 final float aboveThresholdAdvanceAmount; 14 private int memberCount; 15 private int previousBacklog; 16 private int backlog; 17 protected final Log log=LogFactory.getLog(this.getClass()); 18 19 public RingNodeFlowControl(int initialWindow, 20 float windowReduceFactor, 21 int belowThresholdAdvanceAmount, 22 float aboveThresholdAdvanceAmount) 23 { 24 this.initialWindow = initialWindow; 25 this.windowReduceFactor = windowReduceFactor; 26 this.belowThresholdAdvanceAmount = belowThresholdAdvanceAmount; 27 this.aboveThresholdAdvanceAmount = aboveThresholdAdvanceAmount; 28 } 29 30 public RingNodeFlowControl() 31 { 32 this(20, 0.7F, 3, 1.0F); 33 } 34 35 public void invalidate() 36 { 37 previousBacklog = backlog = 0; 38 } 39 40 public int getBacklog() 41 { 42 return backlog; 43 } 44 45 public void setBacklog(int backlog) 46 { 47 if(backlog <0) 48 throw new IllegalArgumentException ("backlog value has to be positive"); 49 this.backlog = backlog; 50 } 51 52 public int getBacklogDifference() 53 { 54 return backlog - previousBacklog; 55 } 56 57 public int getPreviousBacklog() 58 { 59 return previousBacklog; 60 } 61 62 public void setPreviousBacklog() 63 { 64 this.previousBacklog = backlog; 65 } 66 67 public void viewChanged(int memberCount) 68 { 69 this.memberCount = memberCount; 70 } 71 72 public int getAllowedToBroadcast(RingToken token) 73 { 74 int fairWindowShare = 0; 75 int windowSize = token.getWindowSize(); 76 if (memberCount == 0) memberCount = 1; 77 int maxMessages = (windowSize / memberCount); 78 if (maxMessages < 1) 79 maxMessages = 1; 80 81 int backlogAverage = token.getBacklog() + backlog - previousBacklog; 82 if (backlogAverage > 0) 83 { 84 fairWindowShare = windowSize * backlog / backlogAverage; 85 } 86 fairWindowShare = (fairWindowShare < 1)?1: fairWindowShare; 87 88 89 int maxAllowed = windowSize - token.getLastRoundBroadcastCount(); 90 if (maxAllowed < 1) 91 maxAllowed = 0; 92 93 94 if(log.isInfoEnabled()) log.info("fairWindowShare=" + fairWindowShare + " maxMessages=" 95 + maxMessages + " maxAllowed=" + maxAllowed); 96 97 return (fairWindowShare < maxAllowed)?Math.min(fairWindowShare, maxMessages):Math.min(maxAllowed, maxMessages); 98 } 99 100 public void updateWindow(RingToken token) 101 { 102 int threshold = token.getWindowThreshold(); 103 int window = token.getWindowSize(); 104 if (window < initialWindow) 105 { 106 window = initialWindow; 107 } 108 109 boolean congested = (token.getRetransmissionRequests().size() > 0)?true:false; 110 111 if (congested) 112 { 113 threshold = (int) (window * windowReduceFactor); 114 window = initialWindow; 115 } 116 else 117 { 118 if (window < threshold) 119 { 120 window += belowThresholdAdvanceAmount; 121 } 122 else 123 { 124 window += aboveThresholdAdvanceAmount; 125 } 126 } 127 token.setWindowSize(window); 128 token.setWindowThreshold(threshold); 129 } 130 131 } 132 | Popular Tags |