KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > protocols > NAKACK


1 // $Id: NAKACK.java,v 1.11 2005/03/11 06:59:49 belaban Exp $
2

3 package org.jgroups.protocols;
4
5
6 import org.jgroups.*;
7 import org.jgroups.stack.*;
8 import org.jgroups.util.List;
9 import org.jgroups.util.TimeScheduler;
10 import org.jgroups.util.Util;
11
12 import java.util.Enumeration JavaDoc;
13 import java.util.Hashtable JavaDoc;
14 import java.util.Properties JavaDoc;
15 import java.util.Vector JavaDoc;
16
17
18 /**
19  * Negative AcKnowledgement layer (NAKs), paired with positive ACKs. The default is to send a message
20  * using NAKs: the sender sends messages with monotonically increasing seqnos, receiver requests
21  * retransmissions of missing messages (gaps). When a SWITCH_NAK_ACK event is received, the mode
22  * is switched to using NAK_ACKS: the sender still uses monotonically increasing seqnos, but the receiver
23  * acknowledges every message. NAK and NAK_ACK seqnos are the same, when switching the mode, the current
24  * seqno is reused. Both NAK and NAK_ACK messages use the current view ID in which the message is sent to
25  * queue messages destined for an upcoming view, or discard messages sent in a previous view. Both modes
26  * reset their seqnos to 0 when receiving a view change. The NAK_ACK scheme is used for broadcasting
27  * view changes.
28  * <p/>
29  * The third mode is for out-of-band control messages (activated by SWITCH_OUT_OF_BAND): this mode does
30  * neither employ view IDs, nor does it use the same seqnos as NAK and NAK_ACK. It uses its own seqnos,
31  * unrelated to the ones used by NAK and NAK_ACK, and never resets them. In combination with the sender's
32  * address, this makes every out-of-band message unique. Out-of-band messages are used for example for
33  * broadcasting FLUSH messages.<p>
34  * Once a mode is set, it remains in effect until exactly 1 message has been sent, afterwards the default
35  * mode NAK is used again.
36  * <p/>
37  * The following communication between 2 peers exists (left side is initiator,
38  * right side receiver): <pre>
39  * <p/>
40  * <p/>
41  * send_out_of_band
42  * --------------> synchronous (1)
43  * <-------------
44  * ack
45  * <p/>
46  * <p/>
47  * send_nak
48  * --------------> asynchronous (2)
49  * <p/>
50  * <p/>
51  * send_nak_ack
52  * --------------> synchronous (3)
53  * <--------------
54  * ack
55  * <p/>
56  * <p/>
57  * retransmit
58  * <-------------- asynchronous (4)
59  * <p/>
60  * <p/>
61  * </pre>
62  * <p/>
63  * When a message is sent, it will contain a header describing the type of the
64  * message, and containing additional data, such as sequence number etc. When a
65  * message is received, it is fed into either the OutOfBander or NAKer, depending on the
66  * header's type.<p>
67  * Note that in the synchronous modes, ACKs are sent for each request. If a reliable unicast protocol layer
68  * exists somewhere underneath this layer, then even the ACKs are transmitted reliably, thus increasing
69  * the number of messages exchanged. However, since it is envisaged that ACK/OUT_OF_BAND are not used
70  * frequently, this problem is currently not addressed.
71  *
72  * @author Bela Ban
73  */

74 public class NAKACK extends Protocol {
75     long[] retransmit_timeout={2000, 3000, 5000, 8000}; // time(s) to wait before requesting xmit
76
NAKer naker=null;
77     OutOfBander out_of_bander=null;
78     ViewId vid=null;
79     View view=null;
80     boolean is_server=false;
81     Address local_addr=null;
82     final List queued_msgs=new List(); // msgs for next view (vid > current vid)
83
Vector JavaDoc members=null; // for OutOfBander: this is the destination set to
84
// send messages to
85
boolean send_next_msg_out_of_band=false;
86     boolean send_next_msg_acking=false;
87     long rebroadcast_timeout=0; // until all outstanding ACKs recvd (rebcasting)
88
TimeScheduler timer=null;
89     static final String JavaDoc WRAPPED_MSG_KEY="NAKACK.WRAPPED_HDR";
90
91
92     /**
93      * Do some initial tasks
94      */

95     public void init() throws Exception JavaDoc {
96         timer=stack != null? stack.timer : null;
97         if(timer == null)
98             if(log.isErrorEnabled()) log.error("timer is null");
99         naker=new NAKer();
100         out_of_bander=new OutOfBander();
101     }
102
103     public void stop() {
104         out_of_bander.stop();
105         naker.stop();
106     }
107
108
109     public String JavaDoc getName() {
110         return "NAKACK";
111     }
112
113
114     public Vector JavaDoc providedUpServices() {
115         Vector JavaDoc retval=new Vector JavaDoc(3);
116         retval.addElement(new Integer JavaDoc(Event.GET_MSGS_RECEIVED));
117         retval.addElement(new Integer JavaDoc(Event.GET_MSG_DIGEST));
118         retval.addElement(new Integer JavaDoc(Event.GET_MSGS));
119         return retval;
120     }
121
122
123     public Vector JavaDoc providedDownServices() {
124         Vector JavaDoc retval=new Vector JavaDoc(1);
125         retval.addElement(new Integer JavaDoc(Event.GET_MSGS_RECEIVED));
126         return retval;
127     }
128
129
130     /**
131      * <b>Callback</b>. Called by superclass when event may be handled.<p>
132      * <b>Do not use <code>passUp()</code> in this method as the event is passed up
133      * by default by the superclass after this method returns !</b>
134      */

135     public void up(Event evt) {
136         NakAckHeader hdr;
137         Message msg, msg_copy;
138         int rc;
139
140         switch(evt.getType()) {
141
142             case Event.SUSPECT:
143
144                     if(log.isInfoEnabled()) log.info("received SUSPECT event (suspected member=" + evt.getArg() + ')');
145                 naker.suspect((Address)evt.getArg());
146                 out_of_bander.suspect((Address)evt.getArg());
147                 break;
148
149             case Event.STABLE: // generated by STABLE layer. Delete stable messages passed in arg
150
naker.stable((long[])evt.getArg());
151                 return; // don't pass up further (Bela Aug 7 2001)
152

153             case Event.SET_LOCAL_ADDRESS:
154                 local_addr=(Address)evt.getArg();
155                 break;
156
157             case Event.GET_MSGS_RECEIVED: // returns the highest seqnos delivered to the appl. (used by STABLE)
158
long[] highest=naker.getHighestSeqnosDelivered();
159                 passDown(new Event(Event.GET_MSGS_RECEIVED_OK, highest));
160                 return; // don't pass up further (bela Aug 7 2001)
161

162             case Event.MSG:
163                 synchronized(this) {
164                     msg=(Message)evt.getArg();
165
166                     // check to see if this is a wrapped msg. If yes, send an ACK
167
hdr=(NakAckHeader)msg.removeHeader(WRAPPED_MSG_KEY); // see whether it is a wrapped message
168
if(hdr != null && hdr.type == NakAckHeader.WRAPPED_MSG) { // send back an ACK to hdr.sender
169
Message ack_msg=new Message(hdr.sender, null, null);
170                         NakAckHeader h=new NakAckHeader(NakAckHeader.NAK_ACK_RSP, hdr.seqno, null);
171                         if(hdr.sender == null)
172                             if(log.isWarnEnabled()) log.warn("WRAPPED: header's 'sender' field is null; " +
173                                     "cannot send ACK !");
174                         ack_msg.putHeader(getName(), h);
175                         passDown(new Event(Event.MSG, ack_msg));
176                     }
177
178                     hdr=(NakAckHeader)msg.removeHeader(getName());
179                     if(hdr == null)
180                         break; // pass up
181

182                     switch(hdr.type) {
183
184                         case NakAckHeader.NAK_ACK_MSG:
185                         case NakAckHeader.NAK_MSG:
186                             if(hdr.type == NakAckHeader.NAK_ACK_MSG) { // first thing: send ACK back to sender
187
Message ack_msg=new Message(msg.getSrc(), null, null);
188                                 NakAckHeader h=new NakAckHeader(NakAckHeader.NAK_ACK_RSP, hdr.seqno, null);
189                                 ack_msg.putHeader(getName(), h);
190                                 passDown(new Event(Event.MSG, ack_msg));
191                             }
192
193                             // while still a client, we just pass up all messages, without checking for message
194
// view IDs or seqnos: other layers further up will discard messages not destined
195
// for us (e.g. based on view IDs).
196
// Also: store msg in queue, when view change is received, replay messages with the same
197
// vid as the new view
198
if(!is_server) {
199                                 msg_copy=msg.copy(); // msg without header
200
msg_copy.putHeader(getName(), hdr); // put header back on as we removed it above
201
queued_msgs.add(msg_copy); // need a copy since passUp() will modify msg
202
passUp(new Event(Event.MSG, msg));
203                                 return;
204                             }
205
206
207                             // check for VIDs: is the message's VID the same as ours ?
208
if(vid != null && hdr.vid != null) { // only check if our vid and message's vid available
209
Address my_addr=vid.getCoordAddress(), other_addr=hdr.vid.getCoordAddress();
210
211                                 if(my_addr == null || other_addr == null) {
212                                     if(log.isWarnEnabled()) log.warn("my vid or message's vid does not contain " +
213                                             "a coordinator; discarding message !");
214                                     return;
215                                 }
216                                 if(!my_addr.equals(other_addr)) {
217                                     if(log.isWarnEnabled()) log.warn("creator of own vid (" + my_addr + ")is different from " +
218                                             "creator of message's vid (" + other_addr + "); discarding message !");
219                                     return;
220                                 }
221
222                                 rc=hdr.vid.compareTo(vid);
223                                 if(rc > 0) { // message is sent in next view -> store !
224

225                                         if(log.isInfoEnabled()) log.info("message's vid (" + hdr.vid + '#' + hdr.seqno +
226                                                 ") is bigger than current vid: (" + vid + ") message is queued !");
227                                     msg.putHeader(getName(), hdr); // put header back on as we removed it above
228
queued_msgs.add(msg);
229                                     return;
230                                 }
231                                 if(rc < 0) { // message sent in prev. view -> discard !
232

233                                         if(log.isWarnEnabled()) log.warn("message's vid (" + hdr.vid + ") is smaller than " +
234                                                 "current vid (" + vid + "): message <" + msg.getSrc() + ":#" +
235                                                 hdr.seqno + "> is discarded ! Hdr is " + hdr);
236                                     return;
237                                 }
238                                 // If we made it down here, the vids are the same --> OK
239
}
240
241
242                             msg.putHeader(getName(), hdr); // stored in received_msgs, re-sent later that's why hdr is added !
243
naker.receive(hdr.seqno, msg, null);
244                             return; // naker passes message up for us !
245

246
247                         case NakAckHeader.RETRANSMIT_MSG:
248                             naker.retransmit(msg.getSrc(), hdr.seqno, hdr.last_seqno);
249                             return;
250
251                         case NakAckHeader.NAK_ACK_RSP:
252                             naker.receiveAck(hdr.seqno, msg.getSrc());
253                             return; // discard, no need to pass up
254

255                         case NakAckHeader.OUT_OF_BAND_MSG:
256                             out_of_bander.receive(hdr.seqno, msg, hdr.stable_msgs);
257                             return; // naker passes message up for us !
258

259                         case NakAckHeader.OUT_OF_BAND_RSP:
260                             out_of_bander.receiveAck(hdr.seqno, msg.getSrc());
261                             return;
262
263                         default:
264                             if(log.isErrorEnabled()) log.error("NakAck header type " + hdr.type + " not known !");
265                             break;
266                     }
267                 } //end synchronized
268

269         }
270
271         passUp(evt);
272     }
273
274
275     /**
276      * <b>Callback</b>. Called by superclass when event may be handled.<p>
277      * <b>Do not use <code>passDown</code> in this method as the event is passed down
278      * by default by the superclass after this method returns !</b>
279      */

280     public void down(Event evt) {
281         Message msg;
282
283         if(log.isTraceEnabled())
284             log.trace("queued_msgs has " + queued_msgs.size() + " messages " +
285                     "\n\nnaker:\n" + naker.dumpContents() + "\n\nout_of_bander: " +
286                     out_of_bander.dumpContents() + "\n-----------------------------\n");
287
288         switch(evt.getType()) {
289
290             case Event.MSG:
291                 msg=(Message)evt.getArg();
292
293             // unicast address: not null and not mcast, pass down unchanged
294
if(vid == null || (msg.getDest() != null && !msg.getDest().isMulticastAddress()))
295                 break;
296
297                 if(send_next_msg_out_of_band) {
298                     out_of_bander.send(msg);
299                     send_next_msg_out_of_band=false;
300                 }
301                 else if(send_next_msg_acking) {
302                     naker.setAcks(true); // require acks when sending a msg
303
naker.send(msg);
304                     naker.setAcks(false); // don't require acks when sending a msg
305
send_next_msg_acking=false;
306                 }
307                 else
308                     naker.send(msg);
309
310                 return; // don't pass down the stack, naker does this for us !
311

312             case Event.GET_MSG_DIGEST:
313                 long[] highest_seqnos=(long[])evt.getArg();
314                 Digest digest=naker.computeMessageDigest(highest_seqnos);
315                 passUp(new Event(Event.GET_MSG_DIGEST_OK, digest));
316                 return;
317
318             case Event.GET_MSGS:
319                 List lower_seqnos=naker.getMessagesInRange((long[][])evt.getArg());
320                 passUp(new Event(Event.GET_MSGS_OK, lower_seqnos));
321                 return;
322
323             case Event.REBROADCAST_MSGS:
324                 rebroadcastMsgs((Vector JavaDoc)evt.getArg());
325                 break;
326
327             case Event.TMP_VIEW:
328                 Vector JavaDoc mbrs=((View)evt.getArg()).getMembers();
329                 members=mbrs != null? (Vector JavaDoc)mbrs.clone() : new Vector JavaDoc(11);
330                 break;
331
332             case Event.VIEW_CHANGE:
333                 synchronized(this) {
334                     view=((View)((View)evt.getArg()).clone());
335                     vid=view.getVid();
336
337                     members=(Vector JavaDoc)view.getMembers().clone();
338
339                     naker.reset();
340                     out_of_bander.reset();
341
342                     is_server=true; // check vids from now on
343

344                     // deliver messages received previously for this view
345
if(queued_msgs.size() > 0)
346                         deliverQueuedMessages();
347                 }
348                 break;
349
350             case Event.BECOME_SERVER:
351                 is_server=true;
352                 break;
353
354             case Event.SWITCH_NAK:
355                 naker.setAcks(false); // don't require acks when sending a msg
356
return; // don't pass down any further
357

358             case Event.SWITCH_NAK_ACK:
359                 send_next_msg_acking=true;
360                 return; // don't pass down any further
361

362             case Event.SWITCH_OUT_OF_BAND:
363                 send_next_msg_out_of_band=true;
364                 return;
365
366             case Event.GET_MSGS_RECEIVED: // return the highest seqnos delivered (=consumed by the application)
367
long[] h=naker.getHighestSeqnosDelivered();
368                 passUp(new Event(Event.GET_MSGS_RECEIVED_OK, h));
369                 break;
370         }
371
372
373         passDown(evt);
374     }
375
376
377     boolean coordinator() {
378         if(members == null || members.size() < 1 || local_addr == null)
379             return false;
380         return local_addr.equals(members.elementAt(0));
381     }
382
383     /**
384      * Rebroadcasts the messages given as arguments
385      */

386     void rebroadcastMsgs(Vector JavaDoc v) {
387         Vector JavaDoc final_v;
388         Message m1, m2;
389         NakAckHeader h1, h2;
390
391         if(v == null) return;
392         final_v=new Vector JavaDoc(v.size());
393
394         // weed out duplicates
395
/** todo Check!!!!! */
396         for(int i=0; i < v.size(); i++) {
397             boolean present=false;
398             m1=(Message)v.elementAt(i);
399             h1=m1 != null? (NakAckHeader)m1.getHeader(getName()) : null;
400             if(m1 == null || h1 == null) { // +++ remove
401
if(log.isErrorEnabled()) log.error("message is null");
402                 continue;
403             }
404
405             for(int j=0; j < final_v.size(); j++) {
406                 m2=(Message)final_v.elementAt(j);
407                 h2=m2 != null? (NakAckHeader)m2.getHeader(getName()) : null;
408                 if(m2 == null || h2 == null) { // +++ remove
409
if(log.isErrorEnabled()) log.error("message m2 is null");
410                     continue;
411                 }
412                 if(h1.seqno == h2.seqno && m1.getSrc() != null && m2.getSrc() != null &&
413                         m1.getSrc().equals(m2.getSrc())) {
414                     present=true;
415                 }
416             }
417             if(!present)
418                 final_v.addElement(m1);
419         }
420
421         if(log.isWarnEnabled()) log.warn("rebroadcasting " + final_v.size() + " messages");
422
423         /* Now re-broadcast messages using original NakAckHeader (same seqnos, same sender !) */
424         for(int i=0; i < final_v.size(); i++) {
425             m1=(Message)final_v.elementAt(i);
426             naker.resend(m1);
427         }
428
429         // Wait until all members have acked reception of outstanding msgs. This will empty our
430
// retransmission table (AckMcastSenderWindow)
431
naker.waitUntilAllAcksReceived(rebroadcast_timeout);
432         passUp(new Event(Event.REBROADCAST_MSGS_OK));
433     }
434
435
436     /**
437      * Deliver all messages in the queue where <code>msg.vid == vid</code> holds. Messages were stored
438      * in the queue because their vid was greater than the current view.
439      */

440     void deliverQueuedMessages() {
441         NakAckHeader hdr;
442         Message tmpmsg;
443         int rc;
444
445         while(queued_msgs.size() > 0) {
446             tmpmsg=(Message)queued_msgs.removeFromHead();
447             hdr=(NakAckHeader)tmpmsg.getHeader(getName());
448             rc=hdr.vid.compareTo(vid);
449             if(rc == 0) { // same vid -> OK
450
up(new Event(Event.MSG, tmpmsg));
451             }
452             else if(rc > 0) {
453                 ;
454             }
455             else
456             /** todo Maybe messages from previous vids are stored while client */
457                 ; // can't be the case; only messages for future views are stored !
458
}
459     }
460
461
462     public boolean setProperties(Properties JavaDoc props) {
463         String JavaDoc str;
464         long[] tmp;
465
466         super.setProperties(props);
467         str=props.getProperty("retransmit_timeout");
468         if(str != null) {
469             tmp=Util.parseCommaDelimitedLongs(str);
470             props.remove("retransmit_timeout");
471             if(tmp != null && tmp.length > 0)
472                 retransmit_timeout=tmp;
473         }
474
475         str=props.getProperty("rebroadcast_timeout");
476         if(str != null) {
477             rebroadcast_timeout=Long.parseLong(str);
478             props.remove("rebroadcast_timeout");
479         }
480
481         if(props.size() > 0) {
482             System.err.println("NAKACK.setProperties(): these properties are not recognized:");
483             props.list(System.out);
484             return false;
485         }
486         return true;
487     }
488
489
490     class NAKer implements Retransmitter.RetransmitCommand, AckMcastSenderWindow.RetransmitCommand {
491         long seqno=0; // current message sequence number
492
final Hashtable JavaDoc received_msgs=new Hashtable JavaDoc(); // ordered by sender -> NakReceiverWindow
493
final Hashtable JavaDoc sent_msgs=new Hashtable JavaDoc(); // ordered by seqno (sent by me !) - Messages
494
final AckMcastSenderWindow sender_win=new AckMcastSenderWindow(this, timer);
495         boolean acking=false; // require acks when sending msgs
496
long deleted_up_to=0;
497
498
499         // Used to periodically retransmit the last message
500
final LastMessageRetransmitter last_msg_xmitter=new LastMessageRetransmitter();
501
502
503         private class LastMessageRetransmitter implements TimeScheduler.Task {
504             boolean stopped=false;
505             int num_times=2; // number of times a message is retransmitted
506
long last_xmitted_seqno=0;
507
508
509             public void stop() {
510                 stopped=true;
511             }
512
513             public boolean cancelled() {
514                 return stopped;
515             }
516
517             public long nextInterval() {
518                 return retransmit_timeout[0];
519             }
520
521
522             /**
523              * Periodically retransmits the last seqno to all members. If the seqno doesn't change (ie. there
524              * were no new messages sent) then the retransmitter task doesn't retransmit after 'num_times' times.
525              */

526             public void run() {
527                 synchronized(sent_msgs) {
528                     long prevSeqno=seqno - 1;
529
530                     if(prevSeqno == last_xmitted_seqno) {
531
532                             if(log.isInfoEnabled()) log.info("prevSeqno=" + prevSeqno + ", last_xmitted_seqno=" +
533                                     last_xmitted_seqno + ", num_times=" + num_times);
534                         if(--num_times <= 0)
535                             return;
536                     }
537                     else {
538                         num_times=3;
539                         last_xmitted_seqno=prevSeqno;
540                     }
541
542                     if((prevSeqno >= 0) && (prevSeqno > deleted_up_to)) {
543
544                             if(log.isInfoEnabled()) log.info("retransmitting last message " + prevSeqno);
545                         retransmit(null, prevSeqno, prevSeqno);
546                     }
547                 }
548             }
549
550         }
551
552
553         NAKer() {
554             if(timer != null)
555                 timer.add(last_msg_xmitter, true); // fixed-rate scheduling
556
else
557                 if(log.isErrorEnabled()) log.error("timer is null");
558         }
559
560
561         long getNextSeqno() {
562             return seqno++;
563         }
564
565
566         long getHighestSeqnoSent() {
567             long highest_sent=-1;
568             for(Enumeration JavaDoc e=sent_msgs.keys(); e.hasMoreElements();)
569                 highest_sent=Math.max(highest_sent, ((Long JavaDoc)e.nextElement()).longValue());
570             return highest_sent;
571         }
572
573
574         /**
575          * Returns an array of the highest sequence numbers consumed by the application so far,
576          * its order corresponding with <code>mbrs</code>. Used by coordinator as argument when
577          * sending initial FLUSH request to members
578          */

579         long[] getHighestSeqnosDelivered() {
580             long[] highest_deliv=members != null? new long[members.size()] : null;
581             Address mbr;
582             NakReceiverWindow win;
583
584             if(highest_deliv == null) return null;
585
586             for(int i=0; i < highest_deliv.length; i++) highest_deliv[i]=-1;
587
588             synchronized(members) {
589                 for(int i=0; i < members.size(); i++) {
590                     mbr=(Address)members.elementAt(i);
591                     win=(NakReceiverWindow)received_msgs.get(mbr);
592                     if(win != null)
593                         highest_deliv[i]=win.getHighestDelivered();
594                 }
595             }
596             return highest_deliv;
597         }
598
599
600         /**
601          * Return all messages sent by us that are higher than <code>seqno</code>
602          */

603         List getSentMessagesHigherThan(long seqno) {
604             List retval=new List();
605             Long JavaDoc key;
606
607             for(Enumeration JavaDoc e=sent_msgs.keys(); e.hasMoreElements();) {
608                 key=(Long JavaDoc)e.nextElement();
609                 if(key.longValue() > seqno)
610                     retval.add(sent_msgs.get(key));
611             }
612             return retval;
613         }
614
615
616         /**
617          * Returns a message digest: for each member P in <code>highest_seqnos</code>, the highest seqno
618          * received from P is added to the digest's array. If P == this, then the highest seqno
619          * <em>sent</em> is added: this makes sure that messages sent but not yet received are also
620          * re-broadcast (because they are also unstable).<p>If my highest seqno for a member P is
621          * higher than the one in <code>highest_seqnos</code>, then all messages from P (received or sent)
622          * whose seqno is higher are added to the digest's messages. The coordinator will use all digests
623          * to compute a set of messages than need to be re-broadcast to the members before installing
624          * a new view.
625          */

626         Digest computeMessageDigest(long[] highest_seqnos) {
627             Digest digest=highest_seqnos != null? new Digest(highest_seqnos.length) : null;
628             Address sender;
629             NakReceiverWindow win;
630             List unstable_msgs;
631             int own_index;
632             long highest_seqno_sent=-1, highest_seqno_received=-1;
633
634             if(digest == null) {
635
636                     if(log.isWarnEnabled()) log.warn("highest_seqnos is null, cannot compute digest !");
637                 return null;
638             }
639
640             if(highest_seqnos.length != members.size()) {
641
642                     if(log.isWarnEnabled()) log.warn("the mbrship size and the size " +
643                             "of the highest_seqnos array are not equal, cannot compute digest !");
644                 return null;
645             }
646
647             System.arraycopy(highest_seqnos, 0, digest.highest_seqnos, 0, digest.highest_seqnos.length);
648
649             for(int i=0; i < highest_seqnos.length; i++) {
650                 sender=(Address)members.elementAt(i);
651                 if(sender == null) continue;
652                 win=(NakReceiverWindow)received_msgs.get(sender);
653                 if(win == null) continue;
654                 digest.highest_seqnos[i]=win.getHighestReceived();
655                 unstable_msgs=win.getMessagesHigherThan(highest_seqnos[i]);
656                 for(Enumeration JavaDoc e=unstable_msgs.elements(); e.hasMoreElements();)
657                     digest.msgs.add(e.nextElement());
658             }
659
660
661             /** If our highest seqno <em>sent</em> is higher than the one <em>received</em>, we have to
662              (a) set it in the digest and (b) add the corresponding messages **/

663
664             own_index=members.indexOf(local_addr);
665             if(own_index == -1) {
666
667                     if(log.isWarnEnabled()) log.warn("no own address in highest_seqnos");
668                 return digest;
669             }
670             highest_seqno_received=digest.highest_seqnos[own_index];
671             highest_seqno_sent=getHighestSeqnoSent();
672
673             if(highest_seqno_sent > highest_seqno_received) {
674                 // (a) Set highest seqno sent in digest
675
digest.highest_seqnos[own_index]=highest_seqno_sent;
676
677                 // (b) Add messages between highest_seqno_received and highest_seqno_sent
678
unstable_msgs=getSentMessagesHigherThan(highest_seqno_received);
679                 for(Enumeration JavaDoc e=unstable_msgs.elements(); e.hasMoreElements();)
680                     digest.msgs.add(e.nextElement());
681             }
682
683             return digest;
684         }
685
686
687         /**
688          * For each non-null member m in <code>range</code>, get messages with sequence numbers between
689          * range[m][0] and range[m][1], excluding range[m][0] and including range[m][1].
690          */

691         List getMessagesInRange(long[][] range) {
692             List retval=new List();
693             List tmp;
694             NakReceiverWindow win;
695             Address sender;
696
697             for(int i=0; i < range.length; i++) {
698                 if(range[i] != null) {
699                     sender=(Address)members.elementAt(i);
700                     if(sender == null) continue;
701                     win=(NakReceiverWindow)received_msgs.get(sender);
702                     if(win == null) continue;
703                     tmp=win.getMessagesInRange(range[i][0], range[i][1]);
704                     if(tmp == null || tmp.size() < 1) continue;
705                     for(Enumeration JavaDoc e=tmp.elements(); e.hasMoreElements();)
706                         retval.add(e.nextElement());
707                 }
708             }
709             return retval;
710         }
711
712
713         void setAcks(boolean f) {
714             acking=f;
715         }
716
717
718         /**
719          * Vector with messages (ordered by sender) that are stable and can be discarded.
720          * This applies to NAK-based sender and receivers.
721          */

722         void stable(long[] seqnos) {
723             int index;
724             long seqno;
725             NakReceiverWindow recv_win;
726             Address sender;
727
728             if(members == null || local_addr == null) {
729                  if(log.isWarnEnabled()) log.warn("members or local_addr are null !");
730                 return;
731             }
732             index=members.indexOf(local_addr);
733
734             if(index < 0) {
735
736                     if(log.isWarnEnabled()) log.warn("member " + local_addr + " not found in " + members);
737                 return;
738             }
739             seqno=seqnos[index];
740
741                 if(log.isInfoEnabled()) log.info("deleting stable messages [" +
742                         deleted_up_to + " - " + seqno + ']');
743
744             // delete sent messages that are stable (kept for retransmission requests from receivers)
745
synchronized(sent_msgs) {
746                 for(long i=deleted_up_to; i <= seqno; i++) {
747                     sent_msgs.remove(new Long JavaDoc(i));
748                 }
749                 deleted_up_to=seqno;
750             }
751             // delete received msgs that are stable
752
for(int i=0; i < members.size(); i++) {
753                 sender=(Address)members.elementAt(i);
754                 recv_win=(NakReceiverWindow)received_msgs.get(sender);
755                 if(recv_win != null)
756                     recv_win.stable(seqnos[i]); // delete all messages with seqnos <= seqnos[i]
757
}
758         }
759
760
761         void send(Message msg) {
762             long id=getNextSeqno();
763             ViewId vid_copy;
764
765             if(vid == null)
766                 return;
767             vid_copy=(ViewId)vid.clone(); /** todo No needs to copy vid */
768
769    &n