KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > protocols > ring > RingNodeFlowControl


1 //$Id: RingNodeFlowControl.java,v 1.3 2004/09/23 16:29:40 belaban Exp $
2

3 package org.jgroups.protocols.ring;
4
5 import org.apache.commons.logging.Log;
6 import org.apache.commons.logging.LogFactory;
7
8 public class RingNodeFlowControl
9 {
10    final int initialWindow;
11    final float windowReduceFactor;
12    final int belowThresholdAdvanceAmount;
13    final float aboveThresholdAdvanceAmount;
14    private int memberCount;
15    private int previousBacklog;
16    private int backlog;
17     protected final Log log=LogFactory.getLog(this.getClass());
18
19    public RingNodeFlowControl(int initialWindow,
20                               float windowReduceFactor,
21                               int belowThresholdAdvanceAmount,
22                               float aboveThresholdAdvanceAmount)
23    {
24       this.initialWindow = initialWindow;
25       this.windowReduceFactor = windowReduceFactor;
26       this.belowThresholdAdvanceAmount = belowThresholdAdvanceAmount;
27       this.aboveThresholdAdvanceAmount = aboveThresholdAdvanceAmount;
28    }
29
30    public RingNodeFlowControl()
31    {
32       this(20, 0.7F, 3, 1.0F);
33    }
34
35    public void invalidate()
36    {
37       previousBacklog = backlog = 0;
38    }
39
40    public int getBacklog()
41    {
42       return backlog;
43    }
44
45    public void setBacklog(int backlog)
46    {
47       if(backlog <0)
48       throw new IllegalArgumentException JavaDoc("backlog value has to be positive");
49       this.backlog = backlog;
50    }
51
52    public int getBacklogDifference()
53    {
54       return backlog - previousBacklog;
55    }
56
57    public int getPreviousBacklog()
58    {
59       return previousBacklog;
60    }
61
62    public void setPreviousBacklog()
63    {
64       this.previousBacklog = backlog;
65    }
66
67    public void viewChanged(int memberCount)
68    {
69       this.memberCount = memberCount;
70    }
71
72    public int getAllowedToBroadcast(RingToken token)
73    {
74       int fairWindowShare = 0;
75       int windowSize = token.getWindowSize();
76       if (memberCount == 0) memberCount = 1;
77       int maxMessages = (windowSize / memberCount);
78       if (maxMessages < 1)
79          maxMessages = 1;
80
81       int backlogAverage = token.getBacklog() + backlog - previousBacklog;
82       if (backlogAverage > 0)
83       {
84          fairWindowShare = windowSize * backlog / backlogAverage;
85       }
86       fairWindowShare = (fairWindowShare < 1)?1: fairWindowShare;
87
88
89       int maxAllowed = windowSize - token.getLastRoundBroadcastCount();
90       if (maxAllowed < 1)
91          maxAllowed = 0;
92
93
94          if(log.isInfoEnabled()) log.info("fairWindowShare=" + fairWindowShare + " maxMessages="
95                     + maxMessages + " maxAllowed=" + maxAllowed);
96
97       return (fairWindowShare < maxAllowed)?Math.min(fairWindowShare, maxMessages):Math.min(maxAllowed, maxMessages);
98    }
99
100    public void updateWindow(RingToken token)
101    {
102       int threshold = token.getWindowThreshold();
103       int window = token.getWindowSize();
104       if (window < initialWindow)
105       {
106          window = initialWindow;
107       }
108
109       boolean congested = (token.getRetransmissionRequests().size() > 0)?true:false;
110
111       if (congested)
112       {
113          threshold = (int) (window * windowReduceFactor);
114          window = initialWindow;
115       }
116       else
117       {
118          if (window < threshold)
119          {
120             window += belowThresholdAdvanceAmount;
121          }
122          else
123          {
124             window += aboveThresholdAdvanceAmount;
125          }
126       }
127       token.setWindowSize(window);
128       token.setWindowThreshold(threshold);
129    }
130
131 }
132
Popular Tags