KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > protocols > FLOW_CONTROL


1 // $Id: FLOW_CONTROL.java,v 1.6 2004/09/23 16:29:41 belaban Exp $
2

3 package org.jgroups.protocols;
4
5 import org.jgroups.Address;
6 import org.jgroups.Event;
7 import org.jgroups.Message;
8 import org.jgroups.blocks.GroupRequest;
9 import org.jgroups.stack.MessageProtocol;
10 import org.jgroups.util.ReusableThread;
11 import org.jgroups.util.RspList;
12 import org.jgroups.util.Util;
13
14 import java.io.Serializable JavaDoc;
15 import java.util.HashMap JavaDoc;
16 import java.util.Properties JavaDoc;
17
18
19 /**
20  * FLOW_CONTROL provides end-end congestion control and flow control.
21  * Attempts to maximize through put, by minimizing the
22  * possible block times(Forward flow control). Initially, sender starts with a smaller
23  * window size <code> W</code> and large expected RTT <code>grpRTT</code>. Sender also
24  * keeps a margin in the window size. When the margin is hit, insted of waiting for the
25  * window size to be exhausted, sender multicasts a FLOW_CONTROL info request message.
26  * If the window size is exhausted before the responses are received, send will be blocked.
27  * FCInfo(flow control info) from all the receivers is gathered at the sender, and current RTT
28  * is computed. If the current RTT is greater than estimated RTT window size and margin are reduced,
29  * otherwise they are increased.
30  * <br>
31  * Horizontal interaction is initiated by the sender with the other group members.
32  * <br>
33  * <em>Note: A reliable transport layer is required for this protocol to function properly.</em>
34  * With little effort this can be made completely independent.<br/>
35  * todo: handle view changes (e.g. members {A,B,C}, blocked on C, and C crashes --> unblock
36  * todo: block on down() instead of sending BLOCK_SEND
37  *
38  * @author Ananda Bollu
39  */

40
41 public class FLOW_CONTROL extends MessageProtocol implements Runnable JavaDoc {
42     private int _numMSGsSentThisPeriod=0;
43     private static final String JavaDoc FLOW_CONTROL="FLOW_CONTROL";
44     private final HashMap JavaDoc _rcvdMSGCounter=new HashMap JavaDoc();
45
46     private int _windowSize=1000;
47     private int _fwdMarginSize=200;
48     private int _estimatedRTT=100000;
49     private boolean waitingForResponse=false;
50     private final ReusableThread _reusableThread;
51     private double RTT_WEIGHT=0.125;
52     private int _msgsSentAfterFCreq=0;
53     private final double TIME_OUT_FACTOR=0.25;//if resp not received from more than n*TIME_OUT_INCREMENT_FACTOR
54
private final double TIME_OUT_INCR_MULT=1.25;
55     private double WINDOW_SIZE_REDUCTION=0.75;
56     private double WINDOW_SIZE_EXPANSION=1.25;
57     private boolean isBlockState=false;
58
59     private int _windowsize_cap=1000000; //initial window size can not be more than 10^6 messages.
60

61     public FLOW_CONTROL() {
62         _reusableThread=new ReusableThread(FLOW_CONTROL);
63     }
64
65     public String JavaDoc getName() {
66         return FLOW_CONTROL;
67     }
68
69     /**
70      * If Event.MSG type is received count is incremented by one,
71      * and message is passed to the down_prot. At some point,
72      * based on the algorithm(FLOW_CONTROL protocol definition)
73      * data collection sequence is started. This is done by each
74      * member in SENDER role when _numMSGsSentThisPeriod hits the margin.
75      * Before rsp arrives only _fwdMarginSize number of messages can be sent,
76      * and then sender will be blocked.
77      */

78     public boolean handleDownEvent(Event evt) {
79         if(evt.getType() == Event.MSG) {
80             _numMSGsSentThisPeriod++;
81             if((_numMSGsSentThisPeriod > (_windowSize - _fwdMarginSize)) && !waitingForResponse) {
82                 waitingForResponse=true;
83                 //wait for the previous request to return.before assigning a new task.
84
_reusableThread.waitUntilDone();
85                 _reusableThread.assignTask(this);
86             }
87             if(waitingForResponse) {
88                 _msgsSentAfterFCreq++;
89                 if((_msgsSentAfterFCreq >= _fwdMarginSize) && !isBlockState) {
90
91                     if(log.isInfoEnabled()) log.info("ACTION BLOCK");
92                     System.err.println("0;" + System.currentTimeMillis() + ';' + _windowSize);
93                     passUp(new Event(Event.BLOCK_SEND));
94                     isBlockState=true;
95                 }
96             }
97         }
98         return true;
99     }
100
101     /**
102      * If Event.MSG type is received message, number of received
103      * messages from the sender is incremented. And the message is
104      * passed up the stack.
105      */

106     public boolean handleUpEvent(Event evt) {
107         if(evt.getType() == Event.MSG) {
108             Message msg=(Message)evt.getArg();
109             Address SRC=msg.getSrc();
110             FCInfo fcForSrc=(FCInfo)_rcvdMSGCounter.get(src);
111             if(fcForSrc == null) {
112                 fcForSrc=new FCInfo();
113                 _rcvdMSGCounter.put(src, fcForSrc);
114             }
115             fcForSrc.increment(1);
116
117             if(log.isInfoEnabled()) log.info("message (" + fcForSrc.getRcvdMSGCount() + ") received from " + src);
118         }
119         return true;
120     }
121
122     /**
123      * Called when a request for this protocol layer is received.
124      * Processes and return value is sent back in the reply.
125      * FLOW_CONTROL protocol of all members gets this message(including sender?)
126      *
127      * @return Object containing FC information for sender with senderID.
128      * <b>Callback</b>. Called when a request for this protocol layer is received.
129      */

130     public Object JavaDoc handle(Message req) {
131         Address SRC=req.getSrc();
132         Long JavaDoc resp=new Long JavaDoc(((FCInfo)_rcvdMSGCounter.get(src)).getRcvdMSGCount());
133
134         if(log.isInfoEnabled()) log.info("Reqest came from " + src + " Prepared response " + resp);
135         return resp;
136     }
137
138     /**
139      * FCInfo request must be submitted in a different thread.
140      * handleDownEvent() can still be called to send messages
141      * while waiting for FCInfo from receivers. usually takes
142      * RTT.
143      */

144     public void run() {
145
146         if(log.isInfoEnabled()) log.info("--- hit the _fwdMargin. Remaining size " + _fwdMarginSize);
147         reqFCInfo();
148     }
149
150     /**
151      * Following parameters can be optionally supplied:
152      * <ul>
153      * <li>window size cap - <code>int</code> Limits the window size to a reasonable value.
154      * <li>window size - <code>int</code> these many number of messages are sent before a block could happen
155      * <li>forward margin -<code>int</code> a request for flow control information is sent when remaining window size hits this margin
156      * <li>RTT weight -<code>double</code> Max RTT in the group is calculated during each Flow control request. lower number assigns
157      * higher weight to current RTT in estimating RTT.
158      * <li>window size reduction factor -<code>double</code> When current RTT is greater than estimated RTT current window size
159      * is reduced by this multiple.
160      * <li>window size expansion factor -<code>double</code> When current RTT is less than estimated RTT window is incremented
161      * by this multiple.
162      * </ul>
163      *
164      * @see org.jgroups.stack.Protocol#setProperties(Properties)
165      */

166     public boolean setProperties(Properties JavaDoc props) {
167         String JavaDoc str=null;
168         String JavaDoc winsizekey="window_size";
169         String JavaDoc fwdmrgnkey="fwd_mrgn";
170         String JavaDoc rttweightkey="rttweight";
171         String JavaDoc sizereductionkey="reduction";
172         String JavaDoc sizeexpansionkey="expansion";
173         String JavaDoc windowsizeCapKey="window_size_cap";
174
175         super.setProperties(props);
176         str=props.getProperty(windowsizeCapKey);
177         if(str != null) {
178             _windowsize_cap=Integer.parseInt(str);
179             props.remove(windowsizeCapKey);
180         }
181         str=props.getProperty(winsizekey);
182         if(str != null) {
183             _windowSize=Integer.parseInt(str);
184             if(_windowSize > _windowsize_cap)
185                 _windowSize=_windowsize_cap;
186             props.remove(winsizekey);
187         }
188
189         str=props.getProperty(fwdmrgnkey);
190         if(str != null) {
191             _fwdMarginSize=Integer.parseInt(str);
192             props.remove(fwdmrgnkey);
193         }
194
195         str=props.getProperty(rttweightkey);
196         if(str != null) {
197             RTT_WEIGHT=Double.parseDouble(str);
198             props.remove(rttweightkey);
199         }
200
201         str=props.getProperty(sizereductionkey);
202         if(str != null) {
203             WINDOW_SIZE_REDUCTION=Double.parseDouble(str);
204             props.remove(sizereductionkey);
205         }
206
207         str=props.getProperty(sizeexpansionkey);
208         if(str != null) {
209             WINDOW_SIZE_EXPANSION=Double.parseDouble(str);
210             props.remove(sizeexpansionkey);
211         }
212
213
214         if(props.size() > 0) {
215             System.err.println("FLOW_CONTROL.setProperties(): the following properties are not recognized:");
216             props.list(System.out);
217             return false;
218         }
219         return true;
220
221     }
222
223     /*-----------private stuff ------*/
224
225     private RspList reqFCInfo() {
226         RspList rspList=null;
227         long reqSentTime=0, rspRcvdTime=0;
228         try {
229             reqSentTime=System.currentTimeMillis();
230             //alternatively use _estimatedRTT for timeout.(timeout is the right way, but need to
231
//check the use cases.
232
rspList=castMessage(null, new Message(null, null, Util.objectToByteBuffer(FLOW_CONTROL)),
233                     GroupRequest.GET_ALL, 0);
234             rspRcvdTime=System.currentTimeMillis();
235         }
236         catch(Exception JavaDoc ex) {
237             ex.printStackTrace();
238         }
239
240         /*If NAKACK layer is present, if n+1 th message is FLOW_CONTROL Request, if responses are received
241           that means all n messages sent earlier are received(?), ignore NAK_ACK.
242         */

243         //ANALYSE RESPONSES
244

245         long currentRTT=rspRcvdTime - reqSentTime;
246
247         if(currentRTT > _estimatedRTT) {
248             _windowSize=(int)(_windowSize * WINDOW_SIZE_REDUCTION);
249             _fwdMarginSize=(int)(_fwdMarginSize * WINDOW_SIZE_REDUCTION);
250         }
251         else {
252             _windowSize=(int)(_windowSize * WINDOW_SIZE_EXPANSION);
253             if(_windowSize > _windowsize_cap)
254                 _windowSize=_windowsize_cap;
255             _fwdMarginSize=(int)(_fwdMarginSize * WINDOW_SIZE_EXPANSION);
256         }
257
258         _estimatedRTT=(int)((RTT_WEIGHT * currentRTT) + (1.0 - RTT_WEIGHT) * _estimatedRTT);
259
260         //reset for new FLOW_CONTROL request period.
261
_numMSGsSentThisPeriod=0;
262         waitingForResponse=false;
263         _msgsSentAfterFCreq=0;
264
265         if(isBlockState) {
266
267             if(log.isWarnEnabled()) log.warn("ACTION UNBLOCK");
268             passUp(new Event(Event.UNBLOCK_SEND));
269             System.err.println("1;" + System.currentTimeMillis() + ';' + _windowSize);
270             isBlockState=false;
271         }
272
273
274         {
275             if(log.isWarnEnabled()) log.warn("estimatedTimeout = " + _estimatedRTT);
276             if(log.isWarnEnabled()) log.warn("window size = " + _windowSize + " forward margin size = " + _fwdMarginSize);
277         }
278
279         return rspList;
280     }
281
282
283     /* use this instead of Integer. */
284     private class FCInfo implements Serializable JavaDoc {
285         int _curValue;
286
287         public FCInfo() {
288         }
289
290         public void increment(int i) {
291             _curValue+=i;
292         }
293
294         public int getRcvdMSGCount() {
295             return _curValue;
296         }
297
298         public String JavaDoc toString() {
299             return Integer.toString(_curValue);
300         }
301     }
302
303
304 }
305
306
Popular Tags