KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > stack > AckMcastSenderWindow


1 // $Id: AckMcastSenderWindow.java,v 1.8 2004/09/23 16:29:53 belaban Exp $
2

3 package org.jgroups.stack;
4
5
6 import org.apache.commons.logging.Log;
7 import org.apache.commons.logging.LogFactory;
8 import org.jgroups.Address;
9 import org.jgroups.Message;
10 import org.jgroups.util.TimeScheduler;
11
12 import java.io.PrintWriter JavaDoc;
13 import java.io.StringWriter JavaDoc;
14 import java.util.*;
15
16
17
18
19 /**
20  * Keeps track of ACKs from receivers for each message. When a new message is
21  * sent, it is tagged with a sequence number and the receiver set (set of
22  * members to which the message is sent) and added to a hashtable
23  * (key = sequence number, val = message + receiver set). Each incoming ACK
24  * is noted and when all ACKs for a specific sequence number have been
25  * received, the corresponding entry is removed from the hashtable. A
26  * retransmission thread periodically re-sends the message point-to-point to
27  * all receivers from which no ACKs have been received yet. A view change or
28  * suspect message causes the corresponding non-existing receivers to be
29  * removed from the hashtable.
30  * <p>
31  * This class may need flow control in order to avoid needless
32  * retransmissions because of timeouts.
33  *
34  * @author Bela Ban June 9 1999
35  * @author John Georgiadis May 8 2001
36  * @version $Revision: 1.8 $
37  */

38 public class AckMcastSenderWindow {
39     /**
40      * Called by retransmitter thread whenever a message needs to be re-sent
41      * to a destination. <code>dest</code> has to be set in the
42      * <code>dst</code> field of <code>msg</code>, as the latter was sent
43      * multicast, but now we are sending a unicast message. Message has to be
44      * copied before sending it (as headers will be appended and therefore
45      * the message changed!).
46      */

47     public interface RetransmitCommand {
48     /**
49      * Retranmit the given msg
50      *
51      * @param seqno the sequence number associated with the message
52      * @param msg the msg to retransmit (it should be a copy!)
53      * @param dest the msg destination
54      */

55     void retransmit(long seqno, Message msg, Address dest);
56     }
57
58
59     /**
60      * The retransmit task executed by the scheduler in regular intervals
61      */

62     private static abstract class Task implements TimeScheduler.Task {
63     private final Interval intervals;
64     private boolean cancelled;
65
66     protected Task(long[] intervals) {
67         this.intervals = new Interval(intervals);
68         this.cancelled = false;
69     }
70     public long nextInterval() { return(intervals.next()); }
71     public void cancel() { cancelled = true; }
72     public boolean cancelled() { return(cancelled); }
73     }
74
75
76     /**
77      * The entry associated with a pending msg
78      */

79     private class Entry extends Task {
80     /** The msg sequence number */
81     public final long seqno;
82     /** The msg to retransmit */
83     public Message msg = null;
84     /** destination addr -> boolean (true = received, false = not) */
85     public final Hashtable senders = new Hashtable();
86     /** How many destinations have received the msg */
87     public int num_received = 0;
88
89     public Entry(long seqno, Message msg, Vector dests, long[] intervals) {
90         super(intervals);
91         this.seqno = seqno;
92         this.msg = msg;
93         for (int i = 0; i < dests.size(); i++)
94         senders.put(dests.elementAt(i), Boolean.FALSE);
95     }
96
97     boolean allReceived() {
98         return(num_received >= senders.size());
99     }
100
101     /** Retransmit this entry */
102     public void run() { _retransmit(this); }
103
104     public String JavaDoc toString() {
105         StringBuffer JavaDoc buff = new StringBuffer JavaDoc();
106         buff.append("num_received = " + num_received +
107             ", received msgs = " + senders);
108         return(buff.toString());
109     }
110     }
111
112     
113
114     private static final long SEC = 1000;
115     /** Default retransmit intervals (ms) - exponential approx. */
116     private static final long[] RETRANSMIT_TIMEOUTS = {
117     2*SEC,
118     3*SEC,
119     5*SEC,
120     8*SEC};
121     /** Default retransmit thread suspend timeout (ms) */
122     private static final long SUSPEND_TIMEOUT = 2000;
123
124     protected static final Log log=LogFactory.getLog(AckMcastSenderWindow.class);
125
126
127     // Msg tables related
128
/** Table of pending msgs: seqno -> Entry */
129     private final Hashtable msgs = new Hashtable();
130
131     /** List of recently suspected members. Used to cease retransmission to suspected members */
132     private final LinkedList suspects=new LinkedList();
133
134     /** Max number in suspects list */
135     private final int max_suspects=20;
136
137     /**
138      * List of acknowledged msgs since the last call to
139      * <code>getStableMessages()</code>
140      */

141     private final Vector stable_msgs = new Vector();
142     /** Whether a call to <code>waitUntilAcksReceived()</code> is still active */
143     private boolean waiting = false;
144
145     // Retransmission thread related
146
/** Whether retransmitter is externally provided or owned by this object */
147     private boolean retransmitter_owned;
148     /** The retransmission scheduler */
149     private TimeScheduler retransmitter = null;
150     /** Retransmission intervals */
151     private long[] retransmit_intervals;
152     /** The callback object for retransmission */
153     private RetransmitCommand cmd = null;
154
155
156     /**
157      * Convert exception stack trace to string
158      */

159     private static String JavaDoc _toString(Throwable JavaDoc ex) {
160     StringWriter JavaDoc sw = new StringWriter JavaDoc();
161     PrintWriter JavaDoc pw = new PrintWriter JavaDoc(sw);
162     ex.printStackTrace(pw);
163     return(sw.toString());
164     }
165
166
167     /**
168      * @param entry the record associated with the msg to retransmit. It
169      * contains the list of receivers that haven't yet ack reception
170      */

171     private void _retransmit(Entry entry) {
172     Address sender;
173     boolean received;
174
175     synchronized(entry) {
176         for(Enumeration e = entry.senders.keys(); e.hasMoreElements();) {
177         sender = (Address)e.nextElement();
178         received = ((Boolean JavaDoc)entry.senders.get(sender)).booleanValue();
179         if (!received) {
180             if(suspects.contains(sender)) {
181
182                 if(log.isWarnEnabled()) log.warn("removing " + sender +
183                        " from retransmit list as it is in the suspect list");
184             remove(sender);
185             continue;
186             }
187
188             if(log.isInfoEnabled()) log.info("--> retransmitting msg #" +
189                    entry.seqno + " to " + sender);
190             cmd.retransmit(entry.seqno, entry.msg.copy(), sender);
191         }
192         }
193     }
194     }
195
196
197     /**
198      * Setup this object's state
199      *
200      * @param cmd the callback object for retranmissions
201      * @param retransmit_timeout the interval between two consecutive
202      * retransmission attempts
203      * @param sched the external scheduler to use to schedule retransmissions
204      * @param sched_owned if true, the scheduler is owned by this object and
205      * can be started/stopped/destroyed. If false, the scheduler is shared
206      * among multiple objects and start()/stop() should not be called from
207      * within this object
208      *
209      * @throws IllegalArgumentException if <code>cmd</code> is null
210      */

211     private void init(RetransmitCommand cmd, long[] retransmit_intervals,
212               TimeScheduler sched, boolean sched_owned) {
213     if (cmd == null) {
214         if(log.isErrorEnabled()) log.error("command is null. Cannot retransmit " + "messages !");
215         throw new IllegalArgumentException JavaDoc("cmd");
216     }
217
218     retransmitter_owned = sched_owned;
219     retransmitter = sched;
220     this.retransmit_intervals = retransmit_intervals;
221     this.cmd = cmd;
222
223     start();
224     }
225
226
227     /**
228      * Create and <b>start</b> the retransmitter
229      *
230      * @param cmd the callback object for retranmissions
231      * @param retransmit_timeout the interval between two consecutive
232      * retransmission attempts
233      * @param sched the external scheduler to use to schedule retransmissions
234      *
235      * @throws IllegalArgumentException if <code>cmd</code> is null
236      */

237     public AckMcastSenderWindow(RetransmitCommand cmd,
238                 long[] retransmit_intervals, TimeScheduler sched) {
239     init(cmd, retransmit_intervals, sched, false);
240     }
241
242
243     /**
244      * Create and <b>start</b> the retransmitter
245      *
246      * @param cmd the callback object for retranmissions
247      * @param sched the external scheduler to use to schedule retransmissions
248      *
249      * @throws IllegalArgumentException if <code>cmd</code> is null
250      */

251     public AckMcastSenderWindow(RetransmitCommand cmd, TimeScheduler sched) {
252     init(cmd, RETRANSMIT_TIMEOUTS, sched, false);
253     }
254
255
256
257     /**
258      * Create and <b>start</b> the retransmitter
259      *
260      * @param cmd the callback object for retranmissions
261      * @param retransmit_timeout the interval between two consecutive
262      * retransmission attempts
263      *
264      * @throws IllegalArgumentException if <code>cmd</code> is null
265      */

266     public AckMcastSenderWindow(RetransmitCommand cmd, long[] retransmit_intervals) {
267     init(cmd, retransmit_intervals, new TimeScheduler(SUSPEND_TIMEOUT), true);
268     }
269
270     /**
271      * Create and <b>start</b> the retransmitter
272      *
273      * @param cmd the callback object for retranmissions
274      *
275      * @throws IllegalArgumentException if <code>cmd</code> is null
276      */

277     public AckMcastSenderWindow(RetransmitCommand cmd) {
278     this(cmd, RETRANSMIT_TIMEOUTS);
279     }
280
281
282     /**
283      * Adds a new message to the hash table.
284      *
285      * @param seqno The sequence number associated with the message
286      * @param msg The message (should be a copy!)
287      * @param receivers The set of addresses to which the message was sent
288      * and from which consequently an ACK is expected
289      */

290     public void add(long seqno, Message msg, Vector receivers) {
291     Entry e;
292
293     if (waiting) return;
294     if (receivers.size() == 0) return;
295
296     synchronized(msgs) {
297         if (msgs.get(new Long JavaDoc(seqno)) != null) return;
298         e = new Entry(seqno, msg, receivers, retransmit_intervals);
299         msgs.put(new Long JavaDoc(seqno), e);
300         retransmitter.add(e);
301     }
302     }
303
304
305     /**
306      * An ACK has been received from <code>sender</code>. Tag the sender in
307      * the hash table as 'received'. If all ACKs have been received, remove
308      * the entry all together.
309      *
310      * @param seqno The sequence number of the message for which an ACK has
311      * been received.
312      * @param sender The sender which sent the ACK
313      */

314     public void ack(long seqno, Address sender) {
315     Entry entry;
316     Boolean JavaDoc received;
317
318     synchronized(msgs) {
319         entry = (Entry)msgs.get(new Long JavaDoc(seqno));
320         if (entry == null) return;
321             
322         synchronized(entry) {
323         received = (Boolean JavaDoc)entry.senders.get(sender);
324         if (received == null || received.booleanValue()) return;
325             
326         // If not yet received
327
entry.senders.put(sender, Boolean.TRUE);
328         entry.num_received++;
329         if (!entry.allReceived()) return;
330         }
331             
332         synchronized(stable_msgs) {
333         entry.cancel();
334         msgs.remove(new Long JavaDoc(seqno));
335         stable_msgs.add(new Long JavaDoc(seqno));
336         }
337         // wake up waitUntilAllAcksReceived() method
338
msgs.notifyAll();
339     }
340     }
341     
342
343     /**
344      * Remove <code>obj</code> from all receiver sets and wake up
345      * retransmission thread.
346      *
347      * @param obj the sender to remove
348      */

349     public void remove(Address obj) {
350     Long JavaDoc key;
351     Entry entry;
352
353     synchronized(msgs) {
354         for (Enumeration e = msgs.keys(); e.hasMoreElements();) {
355         key = (Long JavaDoc)e.nextElement();
356         entry = (Entry)msgs.get(key);
357         synchronized(entry) {
358             //if (((Boolean)entry.senders.remove(obj)).booleanValue()) entry.num_received--;
359
//if (!entry.allReceived()) continue;
360
Boolean JavaDoc received = (Boolean JavaDoc)entry.senders.remove(obj);
361             if(received == null) continue; // suspected member not in entry.senders ?
362
if (received.booleanValue()) entry.num_received--;
363             if (!entry.allReceived()) continue;
364         }
365         synchronized(stable_msgs) {
366             entry.cancel();
367             msgs.remove(key);
368             stable_msgs.add(key);
369         }
370         // wake up waitUntilAllAcksReceived() method
371
msgs.notifyAll();
372         }
373     }
374     }
375
376
377     /**
378      * Process with address <code>suspected</code> is suspected: remove it
379      * from all receiver sets. This means that no ACKs are expected from this
380      * process anymore.
381      *
382      * @param suspected The suspected process
383      */

384     public void suspect(Address suspected) {
385
386         if(log.isInfoEnabled()) log.info("suspect is " + suspected);
387     remove(suspected);
388     suspects.add(suspected);
389     if(suspects.size() >= max_suspects)
390         suspects.removeFirst();
391     }
392
393
394     /**
395      * @return a copy of stable messages, or null (if non available). Removes
396      * all stable messages afterwards
397      */

398     public Vector getStableMessages() {
399     Vector retval;
400
401     synchronized(stable_msgs) {
402         retval = (stable_msgs.size() > 0)? (Vector)stable_msgs.clone():null;
403         if (stable_msgs.size() > 0) stable_msgs.clear();
404     }
405         
406     return(retval);
407     }
408
409
410     public void clearStableMessages() {
411     synchronized(stable_msgs) {
412         stable_msgs.clear();
413     }
414     }
415
416
417     /**
418      * @return the number of currently pending msgs
419      */

420     public long size() {
421     synchronized(msgs) {
422         return(msgs.size());
423     }
424     }
425
426
427     /** Returns the number of members for a given entry for which acks have to be received */
428     public long getNumberOfResponsesExpected(long seqno) {
429     Entry entry=(Entry)msgs.get(new Long JavaDoc(seqno));
430     if(entry != null)
431         return entry.senders.size();
432     else
433         return -1;
434     }
435
436     /** Returns the number of members for a given entry for which acks have been received */
437     public long getNumberOfResponsesReceived(long seqno) {
438     Entry entry=(Entry)msgs.get(new Long JavaDoc(seqno));
439     if(entry != null)
440         return entry.num_received;
441     else
442         return -1;
443     }
444
445     /** Prints all members plus whether an ack has been received from those members for a given seqno */
446     public String JavaDoc printDetails(long seqno) {
447     Entry entry=(Entry)msgs.get(new Long JavaDoc(seqno));
448     if(entry != null)
449         return entry.toString();
450     else
451         return null;
452     }
453
454
455     /**
456      * Waits until all outstanding messages have been ACKed by all receivers.
457      * Takes into account suspicions and view changes. Returns when there are
458      * no entries left in the hashtable. <b>While waiting, no entries can be
459      * added to the hashtable (they will be discarded).</b>
460      *
461      * @param timeout Miliseconds to wait. 0 means wait indefinitely.
462      */

463     public void waitUntilAllAcksReceived(long timeout) {
464     long time_to_wait, start_time, current_time;
465     Address suspect;
466
467     // remove all suspected members from retransmission
468
for(Iterator it=suspects.iterator(); it.hasNext();) {
469         suspect=(Address)it.next();
470         remove(suspect);
471     }
472     
473     time_to_wait = timeout;
474     waiting = true;
475     if (timeout <= 0) {
476         synchronized(msgs) {
477         while(msgs.size() > 0)
478             try { msgs.wait(); } catch(InterruptedException JavaDoc ex) {}
479         }
480     } else {
481         start_time = System.currentTimeMillis();
482         synchronized(msgs) {
483         while(msgs.size() > 0) {
484             current_time = System.currentTimeMillis();
485             time_to_wait = timeout - (current_time - start_time);
486             if (time_to_wait <= 0) break;
487             
488             try {
489             msgs.wait(time_to_wait);
490             } catch(InterruptedException JavaDoc ex) {
491             if(log.isWarnEnabled()) log.warn(ex.toString());
492             }
493         }
494         }
495     }
496     waiting = false;
497     }
498
499
500
501
502     /**
503      * Start the retransmitter. This has no effect, if the retransmitter
504      * was externally provided
505      */

506     public void start() {
507     if (retransmitter_owned)
508         retransmitter.start();
509     }
510
511
512     /**
513      * Stop the rentransmition and clear all pending msgs.
514      * <p>
515      * If this retransmitter has been provided an externally managed
516      * scheduler, then just clear all msgs and the associated tasks, else
517      * stop the scheduler. In this case the method blocks until the
518      * scheduler's thread is dead. Only the owner of the scheduler should
519      * stop it.
520      */

521     public void stop() {
522     Entry entry;
523
524     // i. If retransmitter is owned, stop it else cancel all tasks
525
// ii. Clear all pending msgs and notify anyone waiting
526
synchronized(msgs) {
527         if (retransmitter_owned) {
528         try {
529             retransmitter.stop();
530         } catch(InterruptedException JavaDoc ex) {
531             if(log.isErrorEnabled()) log.error(_toString(ex));
532         }
533         } else {
534         for (Enumeration e = msgs.elements(); e.hasMoreElements();) {
535             entry = (Entry)e.nextElement();
536             entry.cancel();
537         }
538         }
539         msgs.clear();
540         // wake up waitUntilAllAcksReceived() method
541
msgs.notifyAll();
542     }
543     }
544
545
546     /**
547      * Remove all pending msgs from the hashtable. Cancel all associated
548      * tasks in the retransmission scheduler
549      */

550     public void reset() {
551     Entry entry;
552
553     if (waiting) return;
554
555     synchronized(msgs) {
556         for (Enumeration e = msgs.elements(); e.hasMoreElements();) {
557         entry = (Entry)e.nextElement();
558         entry.cancel();
559         }
560         msgs.clear();
561         msgs.notifyAll();
562     }
563     }
564
565
566     public String JavaDoc toString() {
567     StringBuffer JavaDoc ret;
568     Entry entry;
569     Long JavaDoc key;
570
571     ret = new StringBuffer JavaDoc();
572     synchronized(msgs) {
573         ret.append("msgs: (" + msgs.size() + ')');
574         for (Enumeration e = msgs.keys(); e.hasMoreElements();) {
575         key = (Long JavaDoc)e.nextElement();
576         entry = (Entry)msgs.get(key);
577         ret.append("key = " + key + ", value = " + entry + '\n');
578         }
579         synchronized(stable_msgs) {
580         ret.append("\nstable_msgs: " + stable_msgs);
581         }
582     }
583         
584     return(ret.toString());
585     }
586 }
587
Popular Tags