KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > protocols > pbcast > NAKACK


1 // $Id: NAKACK.java,v 1.40 2005/04/20 11:18:33 belaban Exp $
2

3 package org.jgroups.protocols.pbcast;
4
5 import org.jgroups.*;
6 import org.jgroups.stack.NakReceiverWindow;
7 import org.jgroups.stack.Protocol;
8 import org.jgroups.stack.Retransmitter;
9 import org.jgroups.util.*;
10
11 import java.util.*;
12 import java.io.*;
13
14
/**
 * Negative AcKnowledgement layer (NAKs). Messages are assigned a monotonically increasing sequence number (seqno).
 * Receivers deliver messages ordered according to seqno and request retransmission of missing messages. Retransmitted
 * messages are bundled into bigger ones, e.g. when getting an xmit request for messages 1-10, instead of sending 10
 * unicast messages, we bundle all 10 messages into 1 and send it. However, since this protocol typically sits below
 * FRAG, we cannot count on FRAG to fragment/defragment the (possibly) large message into smaller ones. Therefore we
 * only bundle messages up to max_xmit_size bytes to prevent too large messages. For example, if the bundled message
 * size was a total of 34000 bytes, and max_xmit_size=16000, we'd send 3 messages: two 16K messages and one 2K message.
 * <em>Note that max_xmit_size should be the same value as FRAG.frag_size (or smaller).</em><br/> Retransmit requests
 * are always sent to the sender. If the sender dies, and not everyone has received its messages, they will be lost.
 * In the future, this may be changed to have receivers store all messages, so that retransmit requests can be
 * answered by any member. Trivial to implement, but not done yet. For most apps, the default retransmit properties
 * are sufficient; if not, use vsync.
 *
 * @author Bela Ban
 */

31 public class NAKACK extends Protocol implements Retransmitter.RetransmitCommand {
32     private long[] retransmit_timeout={600, 1200, 2400, 4800}; // time(s) to wait before requesting retransmission
33
private boolean is_server=false;
34     private Address local_addr=null;
35     private final Vector members=new Vector(11);
36     private long seqno=0; // current message sequence number (starts with 0)
37
private long max_xmit_size=8192; // max size of a retransmit message (otherwise send multiple)
38
private int gc_lag=20; // number of msgs garbage collection lags behind
39

40     /**
41      * Retransmit messages using multicast rather than unicast. This has the advantage that, if many receivers lost a
42      * message, the sender only retransmits once.
43      */

44     private boolean use_mcast_xmit=false;
45
46
47     /**
48      * Messages that have been received in order are sent up the stack (= delivered to the application). Delivered
49      * messages are removed from NakReceiverWindow.received_msgs and moved to NakReceiverWindow.delivered_msgs, where
50      * they are later garbage collected (by STABLE). Since we do retransmits only from sent messages, never
51      * received or delivered messages, we can turn the moving to delivered_msgs off, so we don't keep the message
52      * around, and don't need to wait for garbage collection to remove them.
53      */

54     private boolean discard_delivered_msgs=false;
55
56     /** If value is > 0, the retransmit buffer is bounded: only the max_xmit_buf_size latest messages are kept,
57      * older ones are discarded when the buffer size is exceeded. A value <= 0 means unbounded buffers
58      */

59     private int max_xmit_buf_size=0;
60
61
62     /**
63      * Hashtable<Address,NakReceiverWindow>. Stores received messages (keyed by sender). Note that this is no long term
64      * storage; messages are just stored until they can be delivered (ie., until the correct FIFO order is established)
65      */

66     private final HashMap received_msgs=new HashMap(11);
67
68     /** TreeMap<Long,Message>. Map of messages sent by me (keyed and sorted on sequence number) */
69     private final TreeMap sent_msgs=new TreeMap();
70
71     private boolean leaving=false;
72     private TimeScheduler timer=null;
73     static final String JavaDoc name="NAKACK";
74
75
76 // public static final HashMap xmit_stats=new HashMap(); // sender - HashMap(seqno - XmitStat)
77
//
78
// public static class XmitStat {
79
// int num_xmits_requests=0;
80
// long xmit_received;
81
// long[] xmit_reqs=new long[10];
82
//
83
// public XmitStat() {
84
// for(int i=0; i < xmit_reqs.length; i++)
85
// xmit_reqs[i]=0;
86
// xmitRequest();
87
// }
88
//
89
// public void xmitRequest() {
90
// xmit_reqs[num_xmits_requests++]=System.currentTimeMillis();
91
// }
92
//
93
// public void xmitReceived() {
94
// xmit_received=System.currentTimeMillis();
95
// }
96
//
97
// public String toString() {
98
// StringBuffer sb=new StringBuffer();
99
// sb.append("total time: ");
100
// if(xmit_received > 0)
101
// sb.append(xmit_received - xmit_reqs[0]).append("\n");
102
// else
103
// sb.append("n/a\n");
104
// sb.append(num_xmits_requests).append(" XMIT requests:\n");
105
// for(int i=0; i < num_xmits_requests; i++) {
106
// sb.append("#").append(i+1).append(": ").append(xmit_reqs[i]);
107
// if(i-1 >= 0) {
108
// sb.append(" (diff to prev=").append(xmit_reqs[i] - xmit_reqs[i-1]);
109
// }
110
// sb.append("\nreceived at " ).append(xmit_received).append("\n");
111
// }
112
// return sb.toString();
113
// }
114
// }
115
//
116
// public static String dumpXmitStats() {
117
// StringBuffer sb=new StringBuffer();
118
// HashMap tmp;
119
// Map.Entry entry, entry2;
120
// Long seqno;
121
// XmitStat stat;
122
// Address sender;
123
// for(Iterator it=xmit_stats.entrySet().iterator(); it.hasNext();) {
124
// entry=(Map.Entry)it.next();
125
// sender=(Address)entry.getKey();
126
// sb.append("\nsender=" + sender + ":\n");
127
//
128
// tmp=(HashMap)entry.getValue();
129
// for(Iterator it2=tmp.entrySet().iterator(); it2.hasNext();) {
130
// entry2=(Map.Entry)it2.next();
131
// seqno=(Long)entry2.getKey();
132
// stat=(XmitStat)entry2.getValue();
133
// sb.append(seqno).append(": ").append(stat).append("\n");
134
// }
135
// }
136
// return sb.toString();
137
// }
138
//
139
//
140
// public static void addXmitRequest(Address sender, long seqno) {
141
// HashMap tmp=(HashMap)xmit_stats.get(sender);
142
// if(tmp == null) {
143
// tmp=new HashMap();
144
// xmit_stats.put(sender, tmp);
145
// }
146
// XmitStat stat=(XmitStat)tmp.get(new Long(seqno));
147
// if(stat == null) {
148
// stat=new XmitStat();
149
// tmp.put(new Long(seqno), stat);
150
// }
151
// else {
152
// stat.xmitRequest();
153
// }
154
// }
155
//
156
// public static void addXmitResponse(Address sender, long seqno) {
157
// HashMap tmp=(HashMap)xmit_stats.get(sender);
158
// if(tmp != null) {
159
// XmitStat stat=(XmitStat)tmp.get(new Long(seqno));
160
// if(stat != null)
161
// stat.xmitReceived();
162
// }
163
// }
164

165
166
167
168
169     public NAKACK() {
170     }
171
172
173     public String JavaDoc getName() {
174         return name;
175     }
176
177
178     public Vector providedUpServices() {
179         Vector retval=new Vector(5);
180         retval.addElement(new Integer JavaDoc(Event.GET_DIGEST));
181         retval.addElement(new Integer JavaDoc(Event.GET_DIGEST_STABLE));
182         retval.addElement(new Integer JavaDoc(Event.GET_DIGEST_STATE));
183         retval.addElement(new Integer JavaDoc(Event.SET_DIGEST));
184         retval.addElement(new Integer JavaDoc(Event.MERGE_DIGEST));
185         return retval;
186     }
187
188
189     public Vector providedDownServices() {
190         Vector retval=new Vector(2);
191         retval.addElement(new Integer JavaDoc(Event.GET_DIGEST));
192         retval.addElement(new Integer JavaDoc(Event.GET_DIGEST_STABLE));
193         return retval;
194     }
195
196
197     public void start() throws Exception JavaDoc {
198         timer=stack != null ? stack.timer : null;
199         if(timer == null) {
200             throw new Exception JavaDoc("NAKACK.up(): timer is null");
201         }
202     }
203
204     public void stop() {
205         removeAll(); // clears sent_msgs and destroys all NakReceiverWindows
206
}
207
208
209     /**
210      * <b>Callback</b>. Called by superclass when event may be handled.<p> <b>Do not use <code>passDown()</code> in this
211      * method as the event is passed down by default by the superclass after this method returns !</b>
212      */

213     public void down(Event evt) {
214         Digest digest;
215         Vector mbrs;
216
217         switch(evt.getType()) {
218
219         case Event.MSG:
220             Message msg=(Message)evt.getArg();
221             Address dest=msg.getDest();
222             if(dest != null && !dest.isMulticastAddress()) {
223                 break; // unicast address: not null and not mcast, pass down unchanged
224
}
225             send(evt, msg);
226             return; // don't pass down the stack
227

228         case Event.STABLE: // generated by STABLE layer. Delete stable messages passed in arg
229
stable((Digest)evt.getArg());
230             return; // do not pass down further (Bela Aug 7 2001)
231

232         case Event.GET_DIGEST:
233             digest=getDigest();
234             passUp(new Event(Event.GET_DIGEST_OK, digest != null ? digest.copy() : null));
235             return;
236
237         case Event.GET_DIGEST_STABLE:
238             digest=getDigestHighestDeliveredMsgs();
239             passUp(new Event(Event.GET_DIGEST_STABLE_OK, digest != null ? digest.copy() : null));
240             return;
241
242         case Event.GET_DIGEST_STATE:
243             digest=getDigest();
244             passUp(new Event(Event.GET_DIGEST_STATE_OK, digest != null ? digest.copy() : null));
245             return;
246
247         case Event.SET_DIGEST:
248             setDigest((Digest)evt.getArg());
249             return;
250
251         case Event.MERGE_DIGEST:
252             mergeDigest((Digest)evt.getArg());
253             return;
254
255         case Event.CONFIG:
256             passDown(evt);
257             if(log.isDebugEnabled()) {
258                 log.debug("received CONFIG event: " + evt.getArg());
259             }
260             handleConfigEvent((HashMap)evt.getArg());
261             return;
262
263         case Event.TMP_VIEW:
264             mbrs=((View)evt.getArg()).getMembers();
265             members.removeAllElements();
266             members.addAll(mbrs);
267             adjustReceivers();
268             break;
269
270         case Event.VIEW_CHANGE:
271             mbrs=((View)evt.getArg()).getMembers();
272             members.removeAllElements();
273             members.addAll(mbrs);
274             adjustReceivers();
275             is_server=true; // check vids from now on
276
break;
277
278         case Event.BECOME_SERVER:
279             is_server=true;
280             break;
281
282         case Event.DISCONNECT:
283             leaving=true;
284             removeAll();
285             seqno=0;
286             break;
287         }
288
289         passDown(evt);
290     }
291
292
293
294     /**
295      * <b>Callback</b>. Called by superclass when event may be handled.<p> <b>Do not use <code>PassUp</code> in this
296      * method as the event is passed up by default by the superclass after this method returns !</b>
297      */

298     public void up(Event evt) {
299         NakAckHeader hdr;
300         Message msg;
301         Digest digest;
302
303         switch(evt.getType()) {
304
305         case Event.STABLE: // generated by STABLE layer. Delete stable messages passed in arg
306
stable((Digest)evt.getArg());
307             return; // do not pass up further (Bela Aug 7 2001)
308

309         case Event.GET_DIGEST:
310             digest=getDigestHighestDeliveredMsgs();
311             passDown(new Event(Event.GET_DIGEST_OK, digest));
312             return;
313
314         case Event.GET_DIGEST_STABLE:
315             digest=getDigestHighestDeliveredMsgs();
316             passDown(new Event(Event.GET_DIGEST_STABLE_OK, digest));
317             return;
318
319         case Event.SET_LOCAL_ADDRESS:
320             local_addr=(Address)evt.getArg();
321             break;
322
323         case Event.CONFIG:
324             passUp(evt);
325             if(log.isDebugEnabled()) {
326                 log.debug("received CONFIG event: " + evt.getArg());
327             }
328             handleConfigEvent((HashMap)evt.getArg());
329             return;
330
331         case Event.MSG:
332             msg=(Message)evt.getArg();
333             hdr=(NakAckHeader)msg.getHeader(name);
334             if(hdr == null)
335                 break; // pass up (e.g. unicast msg)
336

337             // discard messages while not yet server (i.e., until JOIN has returned)
338
if(!is_server) {
339                 if(log.isTraceEnabled())
340                     log.trace("message was discarded (not yet server)");
341                 return;
342             }
343
344             // Changed by bela Jan 29 2003: we must not remove the header, otherwise
345
// further xmit requests will fail !
346
//hdr=(NakAckHeader)msg.removeHeader(getName());
347

348             switch(hdr.type) {
349
350             case NakAckHeader.MSG:
351                 handleMessage(msg, hdr);
352                 return; // transmitter passes message up for us !
353

354             case NakAckHeader.XMIT_REQ:
355                 if(hdr.range == null) {
356                     if(log.isErrorEnabled()) {
357                         log.error("XMIT_REQ: range of xmit msg is null; discarding request from " + msg.getSrc());
358                     }
359                     return;
360                 }
361                 handleXmitReq(msg.getSrc(), hdr.range.low, hdr.range.high);
362                 return;
363
364             case NakAckHeader.XMIT_RSP:
365                 if(log.isTraceEnabled())
366                     log.trace("received missing messages " + hdr.range);
367                 handleXmitRsp(msg);
368                 return;
369
370             default:
371                 if(log.isErrorEnabled()) {
372                     log.error("NakAck header type " + hdr.type + " not known !");
373                 }
374                 return;
375             }
376         }
377         passUp(evt);
378     }
379
380
381     public boolean setProperties(Properties props) {
382         String JavaDoc str;
383         long[] tmp;
384
385         super.setProperties(props);
386         str=props.getProperty("retransmit_timeout");
387         if(str != null) {
388             tmp=Util.parseCommaDelimitedLongs(str);
389             props.remove("retransmit_timeout");
390             if(tmp != null && tmp.length > 0) {
391                 retransmit_timeout=tmp;
392             }
393         }
394
395         str=props.getProperty("gc_lag");
396         if(str != null) {
397             gc_lag=Integer.parseInt(str);
398             if(gc_lag < 1) {
399                 System.err.println("NAKACK.setProperties(): gc_lag has to be at least 1");
400                 return false;
401             }
402             props.remove("gc_lag");
403         }
404
405         str=props.getProperty("max_xmit_size");
406         if(str != null) {
407             max_xmit_size=Long.parseLong(str);
408             props.remove("max_xmit_size");
409         }
410
411         str=props.getProperty("use_mcast_xmit");
412         if(str != null) {
413             use_mcast_xmit=Boolean.valueOf(str).booleanValue();
414             props.remove("use_mcast_xmit");
415         }
416
417         str=props.getProperty("discard_delivered_msgs");
418         if(str != null) {
419             discard_delivered_msgs=Boolean.valueOf(str).booleanValue();
420             props.remove("discard_delivered_msgs");
421         }
422
423         str=props.getProperty("max_xmit_buf_size");
424         if(str != null) {
425             max_xmit_buf_size=Integer.parseInt(str);
426             props.remove("max_xmit_buf_size");
427         }
428
429         if(props.size() > 0) {
430             System.err.println("NAKACK.setProperties(): these properties are not recognized:");
431             props.list(System.out);
432             return false;
433         }
434         return true;
435     }
436
437
438
439
440     /* --------------------------------- Private Methods --------------------------------------- */
441
442     long getNextSeqno() {
443         return seqno++; // no need for synchronization; access to seqno is serialized anyway
444
}
445
446
447     /**
448      * Adds the message to the sent_msgs table and then passes it down the stack. Change Bela Ban May 26 2002: we don't
449      * store a copy of the message, but a reference ! This saves us a lot of memory. However, this also means that a
450      * message should not be changed after storing it in the sent-table ! See protocols/DESIGN for details.
451      */

452     private final void send(Event evt, Message msg) {
453         long msg_id=getNextSeqno();
454         if(log.isTraceEnabled())
455             log.trace("sending msg #" + msg_id);
456
457         msg.putHeader(name, new NakAckHeader(NakAckHeader.MSG, msg_id));
458         synchronized(sent_msgs) {
459             if(Global.copy) {
460                 sent_msgs.put(new Long JavaDoc(msg_id), msg.copy());
461             }
462             else {
463                 sent_msgs.put(new Long JavaDoc(msg_id), msg);
464             }
465         }
466         passDown(evt);
467     }
468
469
470     /**
471      * Finds the corresponding NakReceiverWindow and adds the message to it (according to seqno). Then removes as many
472      * messages as possible from the NRW and passes them up the stack. Discards messages from non-members.
473      */

474     void handleMessage(Message msg, NakAckHeader hdr) {
475         NakReceiverWindow win;
476         Message msg_to_deliver;
477         Address sender=msg.getSrc();
478
479         if(sender == null) {
480             if(log.isErrorEnabled())
481                 log.error("sender of message is null");
482             return;
483         }
484
485         if(log.isTraceEnabled()) {
486             StringBuffer JavaDoc sb=new StringBuffer JavaDoc('[');
487             sb.append(local_addr).append("] received ").append(sender).append('#').append(hdr.seqno);
488             log.trace(sb.toString());
489         }
490
491         // msg is potentially re-sent later as result of XMIT_REQ reception; that's why hdr is added !
492

493         // Changed by bela Jan 29 2003: we currently don't resend from received msgs, just from sent_msgs !
494
// msg.putHeader(getName(), hdr);
495

496         synchronized(received_msgs) {
497             win=(NakReceiverWindow)received_msgs.get(sender);
498         }
499         if(win == null) { // discard message if there is no entry for sender
500
if(leaving)
501                 return;
502             if(log.isWarnEnabled()) {
503                 StringBuffer JavaDoc sb=new StringBuffer JavaDoc('[');
504                 sb.append(local_addr).append("] discarded message from non-member ").append(sender);
505                 if(log.isWarnEnabled())
506                     log.warn(sb.toString());
507             }
508             return;
509         }
510         win.add(hdr.seqno, msg); // add in order, then remove and pass up as many msgs as possible
511

512         while((msg_to_deliver=win.remove()) != null) {
513
514             // Changed by bela Jan 29 2003: not needed (see above)
515
//msg_to_deliver.removeHeader(getName());
516
passUp(new Event(Event.MSG, msg_to_deliver));
517         }
518     }
519
520
521     /**
522      * Retransmit from sent-table, called when XMIT_REQ is received. Bundles all messages to be xmitted into one large
523      * message and sends them back with an XMIT_RSP header. Note that since we cannot count on a fragmentation layer
524      * below us, we have to make sure the message doesn't exceed max_xmit_size bytes. If this is the case, we split the
525      * message into multiple, smaller-chunked messages. But in most cases this still yields fewer messages than if each
526      * requested message was retransmitted separately.
527      *
528      * @param dest The sender of the XMIT_REQ, we have to send the requested copy of the message to this address
529      * @param first_seqno The first sequence number to be retransmitted (<= last_seqno)
530      * @param last_seqno The last sequence number to be retransmitted (>= first_seqno)
531      */

532     void handleXmitReq(Address dest, long first_seqno, long last_seqno) {
533         Message m, tmp;
534         LinkedList list;
535         long size=0, marker=first_seqno, len;
536
537         if(log.isTraceEnabled())
538             log.trace(local_addr + ": received xmit request for " + dest + " [" + first_seqno + " - " + last_seqno + ']');
539
540         if(first_seqno > last_seqno) {
541             if(log.isErrorEnabled())
542                 log.error("first_seqno (" + first_seqno + ") > last_seqno (" + last_seqno + "): not able to retransmit");
543             return;
544         }
545         list=new LinkedList();
546         for(long i=first_seqno; i <= last_seqno; i++) {
547             m=(Message)sent_msgs.get(new Long JavaDoc(i)); // no need to synchronize
548
if(m == null) {
549                 if(log.isErrorEnabled()) {
550                     log.error("(requester=" + dest + ", local_addr=" + this.local_addr + ") message with " +
551                             "seqno=" + i + " not found in sent_msgs ! sent_msgs=" + printSentMsgs());
552                 }
553                 continue;
554             }
555             len=m.size();
556             size+=len;
557             if(size > max_xmit_size && list.size() > 0) { // changed from >= to > (yaron-r, bug #943709)
558
// yaronr: added &&listSize()>0 since protocols between FRAG and NAKACK add headers, and message exceeds size.
559

560                 // size has reached max_xmit_size. go ahead and send message (excluding the current message)
561
if(log.isTraceEnabled())
562                     log.trace("xmitting msgs [" + marker + '-' + (i - 1) + "] to " + dest);
563                 sendXmitRsp(dest, (LinkedList)list.clone(), marker, i - 1);
564                 marker=i;
565                 list.clear();
566                 // fixed Dec 15 2003 (bela, patch from Joel Dice (dicej)), see explanantion under
567
// bug report #854887
568
size=len;
569             }
570             if(Global.copy) {
571                 tmp=m.copy();
572             }
573             else {
574                 tmp=m;
575             }
576             tmp.setDest(dest);
577             tmp.setSrc(local_addr);
578             list.add(tmp);
579         }
580
581         if(list.size() > 0) {
582             if(log.isTraceEnabled())
583                 log.trace("xmitting msgs [" + marker + '-' + last_seqno + "] to " + dest);
584             sendXmitRsp(dest, (LinkedList)list.clone(), marker, last_seqno);
585             list.clear();
586         }
587     }
588
589
590     void sendXmitRsp(Address dest, LinkedList xmit_list, long first_seqno, long last_seqno) {
591         Buffer buf;
592         if(xmit_list == null || xmit_list.size() == 0) {
593             if(log.isErrorEnabled())
594                 log.error("xmit_list is empty");
595             return;
596         }
597         if(use_mcast_xmit)
598             dest=null;
599
600         try {
601             buf=Util.msgListToByteBuffer(xmit_list);
602             Message msg=new Message(dest, null, buf.getBuf(), buf.getOffset(), buf.getLength());
603             msg.putHeader(name, new NakAckHeader(NakAckHeader.XMIT_RSP, first_seqno, last_seqno));
604             passDown(new Event(Event.MSG, msg));
605         }
606         catch(IOException ex) {
607             log.error("failed marshalling xmit list", ex);
608         }
609     }
610
611
612
613
614     void handleXmitRsp(Message msg) {
615         LinkedList list;
616         Message m;
617
618         if(msg == null) {
619             if(log.isWarnEnabled())
620                 log.warn("message is null");
621             return;
622         }
623         try {
624             list=Util.byteBufferToMessageList(msg.getRawBuffer(), msg.getOffset(), msg.getLength());
625             if(list != null) {
626                 for(Iterator it=list.iterator(); it.hasNext();) {
627                     m=(Message)it.next();
628                     up(new Event(Event.MSG, m));
629                 }
630                 list.clear();
631             }
632         }
633         catch(Exception JavaDoc ex) {
634             if(log.isErrorEnabled()) {
635                 log.error("message did not contain a list (LinkedList) of retransmitted messages: " + ex);
636             }
637         }
638     }
639
640
641
642
643     /**
644      * Remove old members from NakReceiverWindows and add new members (starting seqno=0). Essentially removes all
645      * entries from received_msgs that are not in <code>members</code>
646      */

647     void adjustReceivers() {
648         Address sender;
649         NakReceiverWindow win;
650
651         synchronized(received_msgs) {
652
653             // 1. Remove all senders in received_msgs that are not members anymore
654
for(Iterator it=received_msgs.keySet().iterator(); it.hasNext();) {
655                 sender=(Address)it.next();
656                 if(!members.contains(sender)) {
657                     win=(NakReceiverWindow)received_msgs.get(sender);
658                     win.reset();
659                     if(log.isDebugEnabled()) {
660                         log.debug("removing " + sender + " from received_msgs (not member anymore)");
661                     }
662                     it.remove();
663                 }
664             }
665
666             // 2. Add newly joined members to received_msgs (starting seqno=0)
667
for(int i=0; i < members.size(); i++) {
668                 sender=(Address)members.elementAt(i);
669                 if(!received_msgs.containsKey(sender)) {
670                     win=new NakReceiverWindow(sender, this, 0, timer);
671                     win.setRetransmitTimeouts(retransmit_timeout);
672                     win.setDiscardDeliveredMessages(discard_delivered_msgs);
673                     win.setMaxXmitBufSize(this.max_xmit_buf_size);
674                     received_msgs.put(sender, win);
675                 }
676             }
677         }
678     }
679
680
681     /**
682      * Returns a message digest: for each member P the highest seqno received from P is added to the digest.
683      */

684     Digest getDigest() {
685         Digest digest;
686         Address sender;
687         Range range;
688
689         digest=new Digest(members.size());
690         for(int i=0; i < members.size(); i++) {
691             sender=(Address)members.elementAt(i);
692             range=getLowestAndHighestSeqno(sender, false); // get the highest received seqno
693
if(range == null) {
694                 if(log.isErrorEnabled()) {
695                     log.error("range is null");
696                 }
697                 continue;
698             }
699             digest.add(sender, range.low, range.high); // add another entry to the digest
700
}
701         return digest;
702     }
703
704
705     /**
706      * Returns a message digest: for each member P the highest seqno received from P <em>without a gap</em> is added to
707      * the digest. E.g. if the seqnos received from P are [+3 +4 +5 -6 +7 +8], then 5 will be returned. Also, the
708      * highest seqno <em>seen</em> is added. The max of all highest seqnos seen will be used (in STABLE) to determine
709      * whether the last seqno from a sender was received (see "Last Message Dropped" topic in DESIGN).
710      */

711     Digest getDigestHighestDeliveredMsgs() {
712         Digest digest;
713         Address sender;
714         Range range;
715         long high_seqno_seen=0;
716
717         digest=new Digest(members.size());
718         for(int i=0; i < members.size(); i++) {
719             sender=(Address)members.elementAt(i);
720             range=getLowestAndHighestSeqno(sender, true); // get the highest deliverable seqno
721
if(range == null) {
722                 if(log.isErrorEnabled()) {
723                     log.error("range is null");
724                 }
725                 continue;
726             }
727             high_seqno_seen=getHighSeqnoSeen(sender);
728             digest.add(sender, range.low, range.high, high_seqno_seen); // add another entry to the digest
729
}
730         return digest;
731     }
732
733
734     /**
735      * Creates a NakReceiverWindow for each sender in the digest according to the sender's seqno. If NRW already exists,
736      * reset it.
737      */

738     void setDigest(Digest d) {
739         Address sender;
740         NakReceiverWindow win;
741         long initial_seqno;
742
743         clear();
744         if(d == null || d.senders == null) {
745             if(log.isErrorEnabled()) {
746                 log.error("digest or digest.senders is null");
747             }
748             return;
749         }
750         for(int i=0; i < d.size(); i++) {
751             sender=d.senderAt(i);
752             if(sender == null) {
753                 if(log.isErrorEnabled()) {
754                     log.error("sender at index " + i + " in digest is null");
755                 }
756                 continue;
757             }
758             initial_seqno=d.highSeqnoAt(i);
759             win=new NakReceiverWindow(sender, this, initial_seqno, timer);
760             win.setRetransmitTimeouts(retransmit_timeout);
761             win.setDiscardDeliveredMessages(discard_delivered_msgs);
762             win.setMaxXmitBufSize(this.max_xmit_buf_size);
763             synchronized(received_msgs) {
764                 received_msgs.put(sender, win);
765             }
766         }
767     }
768
769
770     /**
771      * For all members of the digest, adjust the NakReceiverWindows in the received_msgs hashtable. If the member
772      * already exists, sets its seqno to be the max of the seqno and the seqno of the member in the digest. If no entry
773      * exists, create one with the initial seqno set to the seqno of the member in the digest.
774      */

775     void mergeDigest(Digest d) {
776         Address sender;
777         NakReceiverWindow win;
778         long initial_seqno;
779
780         if(d == null || d.senders == null) {
781             if(log.isErrorEnabled()) {
782                 log.error("digest or digest.senders is null");
783             }
784             return;
785         }
786         for(int i=0; i < d.size(); i++) {
787             sender=d.senderAt(i);
788             if(sender == null) {
789                 if(log.isErrorEnabled()) {
790                     log.error("sender at index " + i + " in digest is null");
791                 }
792                 continue;
793             }
794             initial_seqno=d.highSeqnoAt(i);
795             synchronized(received_msgs) {
796                 win=(NakReceiverWindow)received_msgs.get(sender);
797                 if(win == null) {
798                     win=new NakReceiverWindow(sender, this, initial_seqno, timer);
799                     win.setRetransmitTimeouts(retransmit_timeout);
800                     win.setDiscardDeliveredMessages(discard_delivered_msgs);
801                     win.setMaxXmitBufSize(this.max_xmit_buf_size);
802                     received_msgs.put(sender, win);
803                 }
804                 else {
805                     if(win.getHighestReceived() < initial_seqno) {
806                         win.reset();
807                         received_msgs.remove(sender);
808                         win=new NakReceiverWindow(sender, this, initial_seqno, timer);
809                         win.setRetransmitTimeouts(retransmit_timeout);
810                         win.setDiscardDeliveredMessages(discard_delivered_msgs);
811                         win.setMaxXmitBufSize(this.max_xmit_buf_size);
812                         received_msgs.put(sender, win);
813                     }
814                 }
815             }
816         }
817     }
818
819
820     /**
821      * Returns the lowest seqno still in cache (so it can be retransmitted) and the highest seqno received so far.
822      *
823      * @param sender The address for which the highest and lowest seqnos are to be retrieved
824      * @param stop_at_gaps If true, the highest seqno *deliverable* will be returned. If false, the highest seqno
825      * *received* will be returned. E.g. for [+3 +4 +5 -6 +7 +8], the highest_seqno_received is 8,
826      * whereas the higheset_seqno_seen (deliverable) is 5.
827      */

828     Range getLowestAndHighestSeqno(Address sender, boolean stop_at_gaps) {
829         Range r=null;
830         NakReceiverWindow win;
831
832         if(sender == null) {
833             if(log.isErrorEnabled()) {
834                 log.error("sender is null");
835             }
836             return r;
837         }
838         synchronized(received_msgs) {
839             win=(NakReceiverWindow)received_msgs.get(sender);
840         }
841         if(win == null) {
842             if(log.isErrorEnabled()) {
843                 log.error("sender " + sender + " not found in received_msgs");
844             }
845             return r;
846         }
847         if(stop_at_gaps) {
848             r=new Range(win.getLowestSeen(), win.getHighestSeen()); // deliverable messages (no gaps)
849
}
850         else {
851             r=new Range(win.getLowestSeen(), win.getHighestReceived() + 1); // received messages
852
}
853         return r;
854     }
855
856
857     /**
858      * Returns the highest seqno seen from sender. E.g. if we received 1, 2, 4, 5 from P, then 5 will be returned
859      * (doesn't take gaps into account). If we are the sender, we will return the highest seqno <em>sent</em> rather
860      * then <em>received</em>
861      */

862     long getHighSeqnoSeen(Address sender) {
863         NakReceiverWindow win;
864         long ret=0;
865
866         if(sender == null) {
867             if(log.isErrorEnabled()) {
868                 log.error("sender is null");
869             }
870             return ret;
871         }
872         if(sender.equals(local_addr)) {
873             return seqno - 1;
874         }
875
876         synchronized(received_msgs) {
877             win=(NakReceiverWindow)received_msgs.get(sender);
878         }
879         if(win == null) {
880             if(log.isErrorEnabled()) {
881                 log.error("sender " + sender + " not found in received_msgs");
882             }
883             return ret;
884         }
885         ret=win.getHighestReceived();
886         return ret;
887     }
888
889
890     /**
891      * Garbage collect messages that have been seen by all members. Update sent_msgs: for the sender P in the digest
892      * which is equal to the local address, garbage collect all messages <= seqno at digest[P]. Update received_msgs:
893      * for each sender P in the digest and its highest seqno seen SEQ, garbage collect all delivered_msgs in the
894      * NakReceiverWindow corresponding to P which are <= seqno at digest[P].
895      */

896     void stable(Digest d) {
897         long tmp_seqno;
898         NakReceiverWindow recv_win;
899         Address sender;
900         long my_highest_rcvd; // highest seqno received in my digest for a sender P
901
long stability_highest_rcvd; // highest seqno received in the stability vector for a sender P
902

903         if(members == null || local_addr == null || d == null) {
904             if(log.isWarnEnabled())
905                 log.warn("members, local_addr or digest are null !");
906             return;
907         }
908
909         if(log.isDebugEnabled()) {
910             log.debug("received digest " + d);
911         }
912
913         for(int i=0; i < d.size(); i++) {
914             sender=d.senderAt(i);
915             tmp_seqno=d.highSeqnoAt(i);
916             if(sender == null)
917                 continue;
918
919             // check whether the last seqno received for a sender P in the stability vector is > last seqno
920
// received for P in my digest. if yes, request retransmission (see "Last Message Dropped" topic
921
// in DESIGN)
922
synchronized(received_msgs) {
923                 recv_win=(NakReceiverWindow)received_msgs.get(sender);
924             }
925             if(recv_win != null) {
926                 my_highest_rcvd=recv_win.getHighestReceived();
927                 stability_highest_rcvd=d.highSeqnoSeenAt(i);
928
929                 if(stability_highest_rcvd >= 0 && stability_highest_rcvd > my_highest_rcvd) {
930                     if(log.isTraceEnabled()) {
931                         log.trace("my_highest_rcvd (" + my_highest_rcvd + ") < stability_highest_rcvd (" +
932                                 stability_highest_rcvd + "): requesting retransmission of " +
933                                 sender + '#' + stability_highest_rcvd);
934                     }
935                     retransmit(stability_highest_rcvd, stability_highest_rcvd, sender);
936                 }
937             }
938
939             tmp_seqno-=gc_lag;
940             if(tmp_seqno < 0) {
941                 continue;
942             }
943
944             if(log.isTraceEnabled())
945                 log.trace("deleting msgs <= " + tmp_seqno + " from " + sender);
946
947             // garbage collect from sent_msgs if sender was myself
948
if(sender.equals(local_addr)) {
949                 synchronized(sent_msgs) {
950                     // gets us a subset from [lowest seqno - seqno]
951
SortedMap stable_keys=sent_msgs.headMap(new Long JavaDoc(tmp_seqno));
952                     if(stable_keys != null) {
953                         stable_keys.clear(); // this will modify sent_msgs directly
954
}
955                 }
956             }
957
958             // delete *delivered* msgs that are stable
959
// recv_win=(NakReceiverWindow)received_msgs.get(sender);
960
if(recv_win != null)
961                 recv_win.stable(tmp_seqno); // delete all messages with seqnos <= seqno
962
}
963     }
964
965
966
967     /* ---------------------- Interface Retransmitter.RetransmitCommand ---------------------- */
968
969
970     /**
971      * Implementation of Retransmitter.RetransmitCommand. Called by retransmission thread when gap is detected. Sends
972      * XMIT_REQ to originator of msg
973      */

974     public void retransmit(long first_seqno, long last_seqno, Address sender) {
975         NakAckHeader hdr;
976         Message retransmit_msg=new Message(sender, null, null);
977
978         if(log.isTraceEnabled())
979             log.trace(local_addr + ": sending XMIT_REQ ([" + first_seqno + ", " + last_seqno + "]) to " + sender);
980         //
981
// if(log.isDebugEnabled()) log.debug("TRACE.special()", "XMIT: " + first_seqno + " - " + last_seqno + ", sender=" + sender);
982

983         //for(long i=first_seqno; i <= last_seqno; i++) {
984
// addXmitRequest(sender, i);
985
//}
986

987         hdr=new NakAckHeader(NakAckHeader.XMIT_REQ, first_seqno, last_seqno);
988         retransmit_msg.putHeader(name, hdr);
989         passDown(new Event(Event.MSG, retransmit_msg));
990     }
991
992
993     /* ------------------- End of Interface Retransmitter.RetransmitCommand -------------------- */
994
995
996
997     void clear() {
998         NakReceiverWindow win;
999
1000        // changed April 21 2004 (bela): SourceForge bug# 938584. We cannot delete our own messages sent between
1001
// a join() and a getState(). Otherwise retransmission requests from members who missed those msgs might
1002
// fail. Not to worry though: those msgs will be cleared by STABLE (message garbage collection)
1003

1004        // sent_msgs.clear();
1005

1006        synchronized(received_msgs) {
1007            for(Iterator it=received_msgs.values().iterator(); it.hasNext();) {
1008                win=(NakReceiverWindow)it.next();
1009                win.reset();
1010            }
1011            received_msgs.clear();
1012        }
1013    }
1014
1015
1016    void removeAll() {
1017        NakReceiverWindow win;
1018
1019// if(log.isTraceEnabled()) {
1020
// if(sent_msgs.size() > 0 && received_msgs.size() > 0) {
1021
// String contents=dumpContents();
1022
// log.trace("contents for " + local_addr + ":\n" + contents);
1023
// }
1024
// }
1025

1026        synchronized(sent_msgs) {
1027            sent_msgs.clear();
1028        }
1029
1030        synchronized(received_msgs) {
1031            for(Iterator it=received_msgs.values().iterator(); it.hasNext();) {
1032                win=(NakReceiverWindow)it.next();
1033                win.destroy();
1034            }
1035            received_msgs.clear();
1036        }
1037    }
1038
1039
1040/* private String dumpContents() {
1041        StringBuffer ret=new StringBuffer();
1042        Map.Entry entry;
1043        Address addr;
1044        Object w;
1045
1046
1047        ret.append("\nsent_msgs: " + printSentMsgs());
1048        ret.append("\nreceived_msgs:\n");
1049        synchronized(received_msgs) {
1050            for(Iterator it=received_msgs.entrySet().iterator(); it.hasNext();) {
1051                entry=(Map.Entry)it.next();
1052                addr=(Address)entry.getKey();
1053                w=entry.getValue();
1054                ret.append(addr).append(": ").append(w.toString()).append('\n');
1055            }
1056        }
1057        return ret.toString();
1058    }*/

1059
1060
1061    String JavaDoc printSentMsgs() {
1062        StringBuffer JavaDoc sb=new StringBuffer JavaDoc();
1063        Long JavaDoc min_seqno, max_seqno;
1064        synchronized(sent_msgs) {
1065            min_seqno=sent_msgs.size() > 0 ? (Long JavaDoc)sent_msgs.firstKey() : new Long JavaDoc(0);
1066            max_seqno=sent_msgs.size() > 0 ? (Long JavaDoc)sent_msgs.lastKey() : new Long JavaDoc(0);
1067        }
1068        sb.append('[').append(min_seqno).append(" - ").append(max_seqno).append(']');
1069        return sb.toString();
1070    }
1071
1072
1073    void handleConfigEvent(HashMap map) {
1074        if(map == null) {
1075            return;
1076        }
1077        if(map.containsKey("frag_size")) {
1078            max_xmit_size=((Integer JavaDoc)map.get("frag_size")).intValue();
1079            if(log.isInfoEnabled()) {
1080                log.info("max_xmit_size=" + max_xmit_size);
1081            }
1082        }
1083    }
1084
1085    /* ----------------------------- End of Private Methods ------------------------------------ */
1086
1087
1088}
1089
Popular Tags