KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > stack > Retransmitter


1 // $Id: Retransmitter.java,v 1.6 2004/09/23 16:29:53 belaban Exp $
2

3 package org.jgroups.stack;
4
5 import org.apache.commons.logging.Log;
6 import org.apache.commons.logging.LogFactory;
7 import org.jgroups.Address;
8 import org.jgroups.util.TimeScheduler;
9 import org.jgroups.util.Util;
10
11 import java.util.ArrayList JavaDoc;
12 import java.util.LinkedList JavaDoc;
13 import java.util.ListIterator JavaDoc;
14
15
16 /**
17  * Maintains a pool of sequence numbers of messages that need to be retransmitted. Messages
18  * are aged and retransmission requests sent according to age (linear backoff used). If a
19  * TimeScheduler instance is given to the constructor, it will be used, otherwise Reransmitter
20  * will create its own. The retransmit timeouts have to be set first thing after creating an instance.
21  * The <code>add()</code> method adds a range of sequence numbers of messages to be retransmitted. The
22  * <code>remove()</code> method removes a sequence number again, cancelling retransmission requests for it.
23  * Whenever a message needs to be retransmitted, the <code>RetransmitCommand.retransmit()</code> method is called.
24  * It can be used e.g. by an ack-based scheme (e.g. AckSenderWindow) to retransmit a message to the receiver, or
25  * by a nak-based scheme to send a retransmission request to the sender of the missing message.
26  *
27  * @author John Giorgiadis
28  * @author Bela Ban
29  * @version $Revision: 1.6 $
30  */

31 public class Retransmitter {
32
33     private static final long SEC=1000;
34     /** Default retransmit intervals (ms) - exponential approx. */
35     private static long[] RETRANSMIT_TIMEOUTS={2 * SEC, 3 * SEC, 5 * SEC, 8 * SEC};
36     /** Default retransmit thread suspend timeout (ms) */
37     private static final long SUSPEND_TIMEOUT=2000;
38
39     private Address sender=null;
40     private final LinkedList JavaDoc msgs=new LinkedList JavaDoc();
41     private RetransmitCommand cmd=null;
42     private boolean retransmitter_owned;
43     private TimeScheduler retransmitter=null;
44     protected static final Log log=LogFactory.getLog(Retransmitter.class);
45
46
47     /** Retransmit command (see Gamma et al.) used to retrieve missing messages */
48     public interface RetransmitCommand {
49         /**
50          * Get the missing messages between sequence numbers
51          * <code>first_seqno</code> and <code>last_seqno</code>. This can either be done by sending a
52          * retransmit message to destination <code>sender</code> (nak-based scheme), or by
53          * retransmitting the missing message(s) to <code>sender</code> (ack-based scheme).
54          * @param first_seqno The sequence number of the first missing message
55          * @param last_seqno The sequence number of the last missing message
56          * @param sender The destination of the member to which the retransmit request will be sent
57          * (nak-based scheme), or to which the message will be retransmitted (ack-based scheme).
58          */

59         void retransmit(long first_seqno, long last_seqno, Address sender);
60     }
61
62
63     /**
64      * Create a new Retransmitter associated with the given sender address
65      * @param sender the address from which retransmissions are expected or to which retransmissions are sent
66      * @param cmd the retransmission callback reference
67      * @param sched retransmissions scheduler
68      */

69     public Retransmitter(Address sender, RetransmitCommand cmd, TimeScheduler sched) {
70         init(sender, cmd, sched, false);
71     }
72
73
74     /**
75      * Create a new Retransmitter associated with the given sender address
76      * @param sender the address from which retransmissions are expected or to which retransmissions are sent
77      * @param cmd the retransmission callback reference
78      */

79     public Retransmitter(Address sender, RetransmitCommand cmd) {
80         init(sender, cmd, new TimeScheduler(SUSPEND_TIMEOUT), true);
81     }
82
83
84     public void setRetransmitTimeouts(long[] timeouts) {
85         if(timeouts != null)
86             RETRANSMIT_TIMEOUTS=timeouts;
87     }
88
89
90     /**
91      * Add the given range [first_seqno, last_seqno] in the list of
92      * entries eligible for retransmission. If first_seqno > last_seqno,
93      * then the range [last_seqno, first_seqno] is added instead
94      * <p>
95      * If retransmitter thread is suspended, wake it up
96      * TODO:
97      * Does not check for duplicates !
98      */

99     public void add(long first_seqno, long last_seqno) {
100         Entry e;
101
102         if(first_seqno > last_seqno) {
103             long tmp=first_seqno;
104             first_seqno=last_seqno;
105             last_seqno=tmp;
106         }
107         synchronized(msgs) {
108             e=new Entry(first_seqno, last_seqno, RETRANSMIT_TIMEOUTS);
109             msgs.add(e);
110             retransmitter.add(e);
111         }
112     }
113
114     /**
115      * Remove the given sequence number from the list of seqnos eligible
116      * for retransmission. If there are no more seqno intervals in the
117      * respective entry, cancel the entry from the retransmission
118      * scheduler and remove it from the pending entries
119      */

120     public void remove(long seqno) {
121         Entry e;
122
123         synchronized(msgs) {
124             for(ListIterator JavaDoc it=msgs.listIterator(); it.hasNext();) {
125                 e=(Entry)it.next();
126                 synchronized(e) {
127                     if(seqno < e.low || seqno > e.high) continue;
128                     e.remove(seqno);
129                     if(e.low > e.high) {
130                         e.cancel();
131                         it.remove();
132                     }
133                 }
134                 break;
135             }
136         }
137     }
138
139     /**
140      * Reset the retransmitter: clear all msgs and cancel all the
141      * respective tasks
142      */

143     public void reset() {
144         Entry entry;
145
146         synchronized(msgs) {
147             for(ListIterator JavaDoc it=msgs.listIterator(); it.hasNext();) {
148                 entry=(Entry)it.next();
149                 entry.cancel();
150             }
151             msgs.clear();
152         }
153     }
154
155     /**
156      * Stop the rentransmition and clear all pending msgs.
157      * <p>
158      * If this retransmitter has been provided an externally managed
159      * scheduler, then just clear all msgs and the associated tasks, else
160      * stop the scheduler. In this case the method blocks until the
161      * scheduler's thread is dead. Only the owner of the scheduler should
162      * stop it.
163      */

164     public void stop() {
165         Entry entry;
166
167         // i. If retransmitter is owned, stop it else cancel all tasks
168
// ii. Clear all pending msgs
169
synchronized(msgs) {
170             if(retransmitter_owned) {
171                 try {
172                     retransmitter.stop();
173                 }
174                 catch(InterruptedException JavaDoc ex) {
175                     if(log.isErrorEnabled()) log.error(Util.printStackTrace(ex));
176                 }
177             }
178             else {
179                 for(ListIterator JavaDoc it=msgs.listIterator(); it.hasNext();) {
180                     entry=(Entry)it.next();
181                     entry.cancel();
182             }
183             }
184             msgs.clear();
185         }
186     }
187
188
189     public String JavaDoc toString() {
190         return (msgs.size() + " messages to retransmit: (" + msgs.toString() + ')');
191     }
192
193
194
195
196
197     /* ------------------------------- Private Methods -------------------------------------- */
198
199     /**
200      * Init this object
201      *
202      * @param sender the address from which retransmissions are expected
203      * @param cmd the retransmission callback reference
204      * @param sched retransmissions scheduler
205      * @param sched_owned whether the scheduler parameter is owned by this
206      * object or is externally provided
207      */

208     private void init(Address sender, RetransmitCommand cmd, TimeScheduler sched, boolean sched_owned) {
209         this.sender=sender;
210         this.cmd=cmd;
211         retransmitter_owned=sched_owned;
212         retransmitter=sched;
213     }
214
215
216     /* ---------------------------- End of Private Methods ------------------------------------ */
217
218
219
220     /**
221      * The retransmit task executed by the scheduler in regular intervals
222      */

223     private static abstract class Task implements TimeScheduler.Task {
224         private final Interval intervals;
225         private boolean cancelled;
226
227         protected Task(long[] intervals) {
228             this.intervals=new Interval(intervals);
229             this.cancelled=false;
230         }
231
232         public long nextInterval() {
233             return (intervals.next());
234         }
235
236         public boolean cancelled() {
237             return (cancelled);
238         }
239
240         public void cancel() {
241             cancelled=true;
242         }
243     }
244
245
246     /**
247      * The entry associated with an initial group of missing messages
248      * with contiguous sequence numbers and with all its subgroups.<br>
249      * E.g.
250      * - initial group: [5-34]
251      * - msg 12 is acknowledged, now the groups are: [5-11], [13-34]
252      * <p>
253      * Groups are stored in a list as long[2] arrays of the each group's
254      * bounds. For speed and convenience, the lowest & highest bounds of
255      * all the groups in this entry are also stored separately
256      */

257     private class Entry extends Task {
258         public long low;
259         public long high;
260         public final java.util.List JavaDoc list;
261
262         public Entry(long low, long high, long[] intervals) {
263             super(intervals);
264             this.low=low;
265             this.high=high;
266             list=new ArrayList JavaDoc();
267             list.add(new long[]{low, high});
268         }
269
270         /**
271          * Remove the given seqno and resize or partition groups as
272          * necessary. The algorithm is as follows:<br>
273          * i. Find the group with low <= seqno <= high
274          * ii. If seqno == low,
275          * a. if low == high, then remove the group
276          * Adjust global low. If global low was pointing to the group
277          * deleted in the previous step, set it to point to the next group.
278          * If there is no next group, set global low to be higher than
279          * global high. This way the entry is invalidated and will be removed
280          * all together from the pending msgs and the task scheduler
281          * iii. If seqno == high, adjust high, adjust global high if this is
282          * the group at the tail of the list
283          * iv. Else low < seqno < high, break [low,high] into [low,seqno-1]
284          * and [seqno+1,high]
285          *
286          * @param seqno the sequence number to remove
287          */

288         public void remove(long seqno) {
289             int i;
290             long[] bounds, newBounds;
291
292             bounds=null;
293             synchronized(this) {
294                 for(i=0; i < list.size(); ++i) {
295                     bounds=(long[])list.get(i);
296                     if(seqno < bounds[0] || seqno > bounds[1]) continue;
297                     break;
298                 }
299                 if(i == list.size()) return;
300
301                 if(seqno == bounds[0]) {
302                     if(bounds[0] == bounds[1])
303                         list.remove(i);
304                     else
305                         bounds[0]++;
306                     if(i == 0)
307                         low=list.size() == 0 ? high + 1 : ((long[])list.get(i))[0];
308                 }
309                 else if(seqno == bounds[1]) {
310                     bounds[1]--;
311                     if(i == list.size() - 1) high=((long[])list.get(i))[1];
312                 }
313                 else {
314                     newBounds=new long[2];
315                     newBounds[0]=seqno + 1;
316                     newBounds[1]=bounds[1];
317                     bounds[1]=seqno - 1;
318                     list.add(i + 1, newBounds);
319                 }
320             }
321         }
322
323         /**
324          * Retransmission task:<br>
325          * For each interval, call the retransmission callback command
326          */

327         public void run() {
328             long[] bounds;
329
330             synchronized(this) {
331                 for(int i=0; i < list.size(); ++i) {
332                     bounds=(long[])list.get(i);
333                     cmd.retransmit(bounds[0], bounds[1], sender);
334                 }
335             }
336         }
337
338         public String JavaDoc toString() {
339             StringBuffer JavaDoc sb=new StringBuffer JavaDoc();
340             if(low == high)
341                 sb.append(low);
342             else
343                 sb.append(low).append(':').append(high);
344             return sb.toString();
345         }
346
347     } // end class Entry
348

349
350     public static void main(String JavaDoc[] args) {
351         Retransmitter xmitter;
352         Address sender;
353
354         try {
355             sender=new org.jgroups.stack.IpAddress("localhost", 5555);
356             xmitter=new Retransmitter(sender, new MyXmitter());
357             xmitter.setRetransmitTimeouts(new long[]{1000, 2000, 4000, 8000});
358
359             xmitter.add(1, 10);
360             xmitter.remove(1);
361             xmitter.remove(2);
362             xmitter.remove(4);
363
364             Util.sleep(3000);
365             xmitter.remove(3);
366
367             Util.sleep(1000);
368             xmitter.remove(10);
369             xmitter.remove(8);
370             xmitter.remove(6);
371             xmitter.remove(7);
372             xmitter.remove(9);
373             xmitter.remove(5);
374         }
375         catch(Exception JavaDoc e) {
376             System.err.println(e);
377         }
378     }
379
380
381     static class MyXmitter implements Retransmitter.RetransmitCommand {
382
383         public void retransmit(long first_seqno, long last_seqno, Address sender) {
384             System.out.println("-- " + new java.util.Date JavaDoc() + ": retransmit(" + first_seqno + ", " +
385                                last_seqno + ", " + sender + ')');
386         }
387     }
388
389     static void sleep(long timeout) {
390         Util.sleep(timeout);
391     }
392
393 }
394
395
Popular Tags