1 3 package org.jgroups.protocols; 4 5 import org.jgroups.Address; 6 import org.jgroups.Event; 7 import org.jgroups.Message; 8 import org.jgroups.blocks.GroupRequest; 9 import org.jgroups.stack.MessageProtocol; 10 import org.jgroups.util.ReusableThread; 11 import org.jgroups.util.RspList; 12 import org.jgroups.util.Util; 13 14 import java.io.Serializable ; 15 import java.util.HashMap ; 16 import java.util.Properties ; 17 18 19 40 41 public class FLOW_CONTROL extends MessageProtocol implements Runnable { 42 private int _numMSGsSentThisPeriod=0; 43 private static final String FLOW_CONTROL="FLOW_CONTROL"; 44 private final HashMap _rcvdMSGCounter=new HashMap (); 45 46 private int _windowSize=1000; 47 private int _fwdMarginSize=200; 48 private int _estimatedRTT=100000; 49 private boolean waitingForResponse=false; 50 private final ReusableThread _reusableThread; 51 private double RTT_WEIGHT=0.125; 52 private int _msgsSentAfterFCreq=0; 53 private final double TIME_OUT_FACTOR=0.25; private final double TIME_OUT_INCR_MULT=1.25; 55 private double WINDOW_SIZE_REDUCTION=0.75; 56 private double WINDOW_SIZE_EXPANSION=1.25; 57 private boolean isBlockState=false; 58 59 private int _windowsize_cap=1000000; 61 public FLOW_CONTROL() { 62 _reusableThread=new ReusableThread(FLOW_CONTROL); 63 } 64 65 public String getName() { 66 return FLOW_CONTROL; 67 } 68 69 78 public boolean handleDownEvent(Event evt) { 79 if(evt.getType() == Event.MSG) { 80 _numMSGsSentThisPeriod++; 81 if((_numMSGsSentThisPeriod > (_windowSize - _fwdMarginSize)) && !waitingForResponse) { 82 waitingForResponse=true; 83 _reusableThread.waitUntilDone(); 85 _reusableThread.assignTask(this); 86 } 87 if(waitingForResponse) { 88 _msgsSentAfterFCreq++; 89 if((_msgsSentAfterFCreq >= _fwdMarginSize) && !isBlockState) { 90 91 if(log.isInfoEnabled()) log.info("ACTION BLOCK"); 92 System.err.println("0;" + System.currentTimeMillis() + ';' + _windowSize); 93 passUp(new Event(Event.BLOCK_SEND)); 94 isBlockState=true; 95 } 96 } 97 } 98 return true; 99 } 100 101 106 public boolean handleUpEvent(Event evt) { 107 if(evt.getType() == Event.MSG) { 108 Message msg=(Message)evt.getArg(); 109 Address SRC=msg.getSrc(); 110 FCInfo fcForSrc=(FCInfo)_rcvdMSGCounter.get(src); 111 if(fcForSrc == null) { 112 fcForSrc=new FCInfo(); 113 _rcvdMSGCounter.put(src, fcForSrc); 114 } 115 fcForSrc.increment(1); 116 117 if(log.isInfoEnabled()) log.info("message (" + fcForSrc.getRcvdMSGCount() + ") received from " + src); 118 } 119 return true; 120 } 121 122 130 public Object handle(Message req) { 131 Address SRC=req.getSrc(); 132 Long resp=new Long (((FCInfo)_rcvdMSGCounter.get(src)).getRcvdMSGCount()); 133 134 if(log.isInfoEnabled()) log.info("Reqest came from " + src + " Prepared response " + resp); 135 return resp; 136 } 137 138 144 public void run() { 145 146 if(log.isInfoEnabled()) log.info("--- hit the _fwdMargin. Remaining size " + _fwdMarginSize); 147 reqFCInfo(); 148 } 149 150 166 public boolean setProperties(Properties props) { 167 String str=null; 168 String winsizekey="window_size"; 169 String fwdmrgnkey="fwd_mrgn"; 170 String rttweightkey="rttweight"; 171 String sizereductionkey="reduction"; 172 String sizeexpansionkey="expansion"; 173 String windowsizeCapKey="window_size_cap"; 174 175 super.setProperties(props); 176 str=props.getProperty(windowsizeCapKey); 177 if(str != null) { 178 _windowsize_cap=Integer.parseInt(str); 179 props.remove(windowsizeCapKey); 180 } 181 str=props.getProperty(winsizekey); 182 if(str != null) { 183 _windowSize=Integer.parseInt(str); 184 if(_windowSize > _windowsize_cap) 185 _windowSize=_windowsize_cap; 186 props.remove(winsizekey); 187 } 188 189 str=props.getProperty(fwdmrgnkey); 190 if(str != null) { 191 _fwdMarginSize=Integer.parseInt(str); 192 props.remove(fwdmrgnkey); 193 } 194 195 str=props.getProperty(rttweightkey); 196 if(str != null) { 197 RTT_WEIGHT=Double.parseDouble(str); 198 props.remove(rttweightkey); 199 } 200 201 str=props.getProperty(sizereductionkey); 202 if(str != null) { 203 WINDOW_SIZE_REDUCTION=Double.parseDouble(str); 204 props.remove(sizereductionkey); 205 } 206 207 str=props.getProperty(sizeexpansionkey); 208 if(str != null) { 209 WINDOW_SIZE_EXPANSION=Double.parseDouble(str); 210 props.remove(sizeexpansionkey); 211 } 212 213 214 if(props.size() > 0) { 215 System.err.println("FLOW_CONTROL.setProperties(): the following properties are not recognized:"); 216 props.list(System.out); 217 return false; 218 } 219 return true; 220 221 } 222 223 224 225 private RspList reqFCInfo() { 226 RspList rspList=null; 227 long reqSentTime=0, rspRcvdTime=0; 228 try { 229 reqSentTime=System.currentTimeMillis(); 230 rspList=castMessage(null, new Message(null, null, Util.objectToByteBuffer(FLOW_CONTROL)), 233 GroupRequest.GET_ALL, 0); 234 rspRcvdTime=System.currentTimeMillis(); 235 } 236 catch(Exception ex) { 237 ex.printStackTrace(); 238 } 239 240 243 245 long currentRTT=rspRcvdTime - reqSentTime; 246 247 if(currentRTT > _estimatedRTT) { 248 _windowSize=(int)(_windowSize * WINDOW_SIZE_REDUCTION); 249 _fwdMarginSize=(int)(_fwdMarginSize * WINDOW_SIZE_REDUCTION); 250 } 251 else { 252 _windowSize=(int)(_windowSize * WINDOW_SIZE_EXPANSION); 253 if(_windowSize > _windowsize_cap) 254 _windowSize=_windowsize_cap; 255 _fwdMarginSize=(int)(_fwdMarginSize * WINDOW_SIZE_EXPANSION); 256 } 257 258 _estimatedRTT=(int)((RTT_WEIGHT * currentRTT) + (1.0 - RTT_WEIGHT) * _estimatedRTT); 259 260 _numMSGsSentThisPeriod=0; 262 waitingForResponse=false; 263 _msgsSentAfterFCreq=0; 264 265 if(isBlockState) { 266 267 if(log.isWarnEnabled()) log.warn("ACTION UNBLOCK"); 268 passUp(new Event(Event.UNBLOCK_SEND)); 269 System.err.println("1;" + System.currentTimeMillis() + ';' + _windowSize); 270 isBlockState=false; 271 } 272 273 274 { 275 if(log.isWarnEnabled()) log.warn("estimatedTimeout = " + _estimatedRTT); 276 if(log.isWarnEnabled()) log.warn("window size = " + _windowSize + " forward margin size = " + _fwdMarginSize); 277 } 278 279 return rspList; 280 } 281 282 283 284 private class FCInfo implements Serializable { 285 int _curValue; 286 287 public FCInfo() { 288 } 289 290 public void increment(int i) { 291 _curValue+=i; 292 } 293 294 public int getRcvdMSGCount() { 295 return _curValue; 296 } 297 298 public String toString() { 299 return Integer.toString(_curValue); 300 } 301 } 302 303 304 } 305 306 | Popular Tags |