KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > stack > AckSenderWindow


1 // $Id: AckSenderWindow.java,v 1.10 2005/01/28 12:19:52 belaban Exp $
2

3 package org.jgroups.stack;
4
5
6 import org.apache.commons.logging.Log;
7 import org.apache.commons.logging.LogFactory;
8 import org.jgroups.Address;
9 import org.jgroups.Event;
10 import org.jgroups.Message;
11 import org.jgroups.util.Queue;
12 import org.jgroups.util.Util;
13
14 import java.util.HashMap JavaDoc;
15
16
17 /**
18  * ACK-based sliding window for a sender. Messages are added to the window keyed by seqno
19  * When an ACK is received, the corresponding message is removed. The Retransmitter
20  * continously iterates over the entries in the hashmap, retransmitting messages based on their
21  * creation time and an (increasing) timeout. When there are no more messages in the retransmission
22  * table left, the thread terminates. It will be re-activated when a new entry is added to the
23  * retransmission table.
24  * @author Bela Ban
25  */

26 public class AckSenderWindow implements Retransmitter.RetransmitCommand {
27     RetransmitCommand retransmit_command = null; // called to request XMIT of msg
28
final HashMap JavaDoc msgs = new HashMap JavaDoc(); // keys: seqnos (Long), values: Messages
29
long[] interval = new long[]{400,800,1200,1600};
30     final Retransmitter retransmitter = new Retransmitter(null, this);
31     final Queue msg_queue = new Queue(); // for storing messages if msgs is full
32
int window_size = -1; // the max size of msgs, when exceeded messages will be queued
33

34     /** when queueing, after msgs size falls below this value, msgs are added again (queueing stops) */
35     int min_threshold = -1;
36     boolean use_sliding_window = false, queueing = false;
37     Protocol transport = null; // used to send messages
38
static final Log log=LogFactory.getLog(AckSenderWindow.class);
39
40
41     public interface RetransmitCommand {
42         void retransmit(long seqno, Message msg);
43     }
44
45
46     /**
47      * Creates a new instance. Thre retransmission thread has to be started separately with
48      * <code>start()</code>.
49      * @param com If not null, its method <code>retransmit()</code> will be called when a message
50      * needs to be retransmitted (called by the Retransmitter).
51      */

52     public AckSenderWindow(RetransmitCommand com) {
53         retransmit_command = com;
54         retransmitter.setRetransmitTimeouts(interval);
55     }
56
57
58     public AckSenderWindow(RetransmitCommand com, long[] interval) {
59         retransmit_command = com;
60         this.interval = interval;
61         retransmitter.setRetransmitTimeouts(interval);
62     }
63
64     /**
65      * This constructor whould be used when we want AckSenderWindow to send the message added
66      * by add(), rather then ourselves.
67      */

68     public AckSenderWindow(RetransmitCommand com, long[] interval, Protocol transport) {
69         retransmit_command = com;
70         this.interval = interval;
71         this.transport = transport;
72         retransmitter.setRetransmitTimeouts(interval);
73     }
74
75
76     public void setWindowSize(int window_size, int min_threshold) {
77         this.window_size = window_size;
78         this.min_threshold = min_threshold;
79
80         // sanity tests for the 2 values:
81
if (min_threshold > window_size) {
82             this.min_threshold = window_size;
83             this.window_size = min_threshold;
84             if(log.isWarnEnabled()) log.warn("min_threshold (" + min_threshold +
85                     ") has to be less than window_size ( " + window_size + "). Values are swapped");
86         }
87         if (this.window_size <= 0) {
88             this.window_size = this.min_threshold > 0 ? (int) (this.min_threshold * 1.5) : 1000;
89             if(log.isWarnEnabled()) log.warn("window_size is <= 0, setting it to " + this.window_size);
90         }
91         if (this.min_threshold <= 0) {
92             this.min_threshold = this.window_size > 0 ? (int) (this.window_size * 0.5) : 250;
93             if(log.isWarnEnabled()) log.warn("min_threshold is <= 0, setting it to " + this.min_threshold);
94         }
95
96         if(log.isTraceEnabled())
97             log.trace("window_size=" + this.window_size + ", min_threshold=" + this.min_threshold);
98         use_sliding_window = true;
99     }
100
101
102     public void reset() {
103         synchronized (msgs) {
104             msgs.clear();
105         }
106
107         // moved out of sync scope: Retransmitter.reset()/add()/remove() are sync'ed anyway
108
// Bela Jan 15 2003
109
retransmitter.reset();
110     }
111
112
113     /**
114      * Adds a new message to the retransmission table. If the message won't have received an ack within
115      * a certain time frame, the retransmission thread will retransmit the message to the receiver. If
116      * a sliding window protocol is used, we only add up to <code>window_size</code> messages. If the table is
117      * full, we add all new messages to a queue. Those will only be added once the table drains below a certain
118      * threshold (<code>min_threshold</code>)
119      */

120     public void add(long seqno, Message msg) {
121         Long JavaDoc tmp=new Long JavaDoc(seqno);
122
123         synchronized(msgs) {
124             if(msgs.containsKey(tmp))
125                 return;
126
127             if(!use_sliding_window) {
128                 addMessage(seqno, tmp, msg);
129             }
130             else { // we use a sliding window
131
if(queueing)
132                     addToQueue(seqno, msg);
133                 else {
134                     if(msgs.size() + 1 > window_size) {
135                         queueing=true;
136                         addToQueue(seqno, msg);
137                         if(log.isTraceEnabled())
138                             log.trace("window_size (" + window_size + ") was exceeded, " +
139                                     "starting to queue messages until window size falls under " + min_threshold);
140                     }
141                     else {
142                         addMessage(seqno, tmp, msg);
143                     }
144                 }
145             }
146         }
147     }
148
149
150     /**
151      * Removes the message from <code>msgs</code>, removing them also from retransmission. If
152      * sliding window protocol is used, and was queueing, check whether we can resume adding elements.
153      * Add all elements. If this goes above window_size, stop adding and back to queueing. Else
154      * set queueing to false.
155      */

156     public void ack(long seqno) {
157         Long JavaDoc tmp=new Long JavaDoc(seqno);
158         Entry entry;
159
160         synchronized(msgs) {
161             msgs.remove(tmp);
162             retransmitter.remove(seqno);
163
164             if(use_sliding_window && queueing) {
165                 if(msgs.size() < min_threshold) { // we fell below threshold, now we can resume adding msgs
166
if(log.isTraceEnabled())
167                         log.trace("number of messages in table fell under min_threshold (" +
168                                 min_threshold + "): adding " + msg_queue.size() + " messages on queue");
169
170                     while(msgs.size() < window_size) {
171                         if((entry=removeFromQueue()) != null)
172                             addMessage(entry.seqno, new Long JavaDoc(entry.seqno), entry.msg);
173                         else
174                             break;
175                     }
176
177                     if(msgs.size() + 1 > window_size) {
178                         if(log.isTraceEnabled())
179                             log.trace("exceeded window_size (" + window_size + ") again, will still queue");
180                         return; // still queueing
181
}
182                     else
183                         queueing=false; // allows add() to add messages again
184

185                     if(log.isTraceEnabled()) log.trace("set queueing to false (table size=" + msgs.size() + ')');
186                 }
187             }
188         }
189     }
190
191
192     public String JavaDoc toString() {
193         return msgs.keySet().toString() + " (retransmitter: " + retransmitter.toString() + ')';
194     }
195
196     /* -------------------------------- Retransmitter.RetransmitCommand interface ------------------- */
197     public void retransmit(long first_seqno, long last_seqno, Address sender) {
198         Message msg;
199
200         if(retransmit_command != null) {
201             //if(log.isTraceEnabled())
202
// log.trace("retransmitting messages " + first_seqno + " - " + last_seqno + " to " + sender);
203
for(long i = first_seqno; i <= last_seqno; i++) {
204                 if((msg = (Message) msgs.get(new Long JavaDoc(i))) != null) { // find the message to retransmit
205
retransmit_command.retransmit(i, msg);
206                 }
207             }
208         }
209     }
210     /* ----------------------------- End of Retransmitter.RetransmitCommand interface ---------------- */
211
212
213
214
215
216     /* ---------------------------------- Private methods --------------------------------------- */
217     void addMessage(long seqno, Long JavaDoc tmp, Message msg) {
218         if (transport != null)
219             transport.passDown(new Event(Event.MSG, msg));
220         msgs.put(tmp, msg);
221         retransmitter.add(seqno, seqno);
222     }
223
224     void addToQueue(long seqno, Message msg) {
225         try {
226             msg_queue.add(new Entry(seqno, msg));
227         }
228         catch(Exception JavaDoc ex) {
229             if(log.isErrorEnabled()) log.error("exception=" + ex);
230         }
231     }
232
233     Entry removeFromQueue() {
234         try {
235             return msg_queue.size() == 0 ? null : (Entry)msg_queue.remove();
236         }
237         catch(Exception JavaDoc ex) {
238             if(log.isErrorEnabled()) log.error("exception=" + ex);
239             return null;
240         }
241     }
242     /* ------------------------------ End of Private methods ------------------------------------ */
243
244
245
246
247     /** Struct used to store message alongside with its seqno in the message queue */
248     class Entry {
249         final long seqno;
250         final Message msg;
251
252         Entry(long seqno, Message msg) {
253             this.seqno = seqno;
254             this.msg = msg;
255         }
256     }
257
258
259     static class Dummy implements RetransmitCommand {
260         final long last_xmit_req = 0;
261          long curr_time;
262
263
264         public void retransmit(long seqno, Message msg) {
265
266                 if(log.isDebugEnabled()) log.debug("seqno=" + seqno);
267
268             curr_time = System.currentTimeMillis();
269         }
270     }
271
272
273     public static void main(String JavaDoc[] args) {
274         long[] xmit_timeouts = {1000, 2000, 3000, 4000};
275         AckSenderWindow win = new AckSenderWindow(new Dummy(), xmit_timeouts);
276
277
278
279         final int NUM = 1000;
280
281         for (int i = 1; i < NUM; i++)
282             win.add(i, new Message());
283
284
285         System.out.println(win);
286         Util.sleep(5000);
287
288         for (int i = 1; i < NUM; i++) {
289             if (i % 2 == 0) // ack the even seqnos
290
win.ack(i);
291         }
292
293         System.out.println(win);
294         Util.sleep(4000);
295
296         for (int i = 1; i < NUM; i++) {
297             if (i % 2 != 0) // ack the odd seqnos
298
win.ack(i);
299         }
300         System.out.println(win);
301
302         if (true) {
303             Util.sleep(4000);
304             System.out.println("--done--");
305             return;
306         }
307
308
309         win.add(3, new Message());
310         win.add(5, new Message());
311         win.add(4, new Message());
312         win.add(8, new Message());
313         win.add(9, new Message());
314         win.add(6, new Message());
315         win.add(7, new Message());
316         win.add(3, new Message());
317         System.out.println(win);
318
319
320         try {
321             Thread.sleep(5000);
322             win.ack(5);
323             System.out.println("ack(5)");
324             win.ack(4);
325             System.out.println("ack(4)");
326             win.ack(6);
327             System.out.println("ack(6)");
328             win.ack(7);
329             System.out.println("ack(7)");
330             win.ack(8);
331             System.out.println("ack(8)");
332             win.ack(6);
333             System.out.println("ack(6)");
334             win.ack(9);
335             System.out.println("ack(9)");
336             System.out.println(win);
337
338             Thread.sleep(5000);
339             win.ack(3);
340             System.out.println("ack(3)");
341             System.out.println(win);
342
343             Thread.sleep(3000);
344             win.add(10, new Message());
345             win.add(11, new Message());
346             System.out.println(win);
347             Thread.sleep(3000);
348             win.ack(10);
349             System.out.println("ack(10)");
350             win.ack(11);
351             System.out.println("ack(11)");
352             System.out.println(win);
353
354             win.add(12, new Message());
355             win.add(13, new Message());
356             win.add(14, new Message());
357             win.add(15, new Message());
358             win.add(16, new Message());
359             System.out.println(win);
360
361             Util.sleep(1000);
362             win.ack(12);
363             System.out.println("ack(12)");
364             win.ack(13);
365             System.out.println("ack(13)");
366
367             win.ack(15);
368             System.out.println("ack(15)");
369             System.out.println(win);
370
371             Util.sleep(5000);
372             win.ack(16);
373             System.out.println("ack(16)");
374             System.out.println(win);
375
376             Util.sleep(1000);
377
378             win.ack(14);
379             System.out.println("ack(14)");
380             System.out.println(win);
381         } catch (Exception JavaDoc e) {
382             System.err.println(e);
383         }
384     }
385
386 }
387
Popular Tags