KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > protocols > NAKACK


1 // $Id: NAKACK.java,v 1.11 2005/03/11 06:59:49 belaban Exp $
2

3 package org.jgroups.protocols;
4
5
6 import org.jgroups.*;
7 import org.jgroups.stack.*;
8 import org.jgroups.util.List;
9 import org.jgroups.util.TimeScheduler;
10 import org.jgroups.util.Util;
11
12 import java.util.Enumeration JavaDoc;
13 import java.util.Hashtable JavaDoc;
14 import java.util.Properties JavaDoc;
15 import java.util.Vector JavaDoc;
16
17
18 /**
19  * Negative AcKnowledgement layer (NAKs), paired with positive ACKs. The default is to send a message
20  * using NAKs: the sender sends messages with monotonically increasing seqnos, receiver requests
21  * retransmissions of missing messages (gaps). When a SWITCH_NAK_ACK event is received, the mode
22  * is switched to using NAK_ACKS: the sender still uses monotonically increasing seqnos, but the receiver
23  * acknowledges every message. NAK and NAK_ACK seqnos are the same, when switching the mode, the current
24  * seqno is reused. Both NAK and NAK_ACK messages use the current view ID in which the message is sent to
25  * queue messages destined for an upcoming view, or discard messages sent in a previous view. Both modes
26  * reset their seqnos to 0 when receiving a view change. The NAK_ACK scheme is used for broadcasting
27  * view changes.
28  * <p/>
29  * The third mode is for out-of-band control messages (activated by SWITCH_OUT_OF_BAND): this mode does
30  * neither employ view IDs, nor does it use the same seqnos as NAK and NAK_ACK. It uses its own seqnos,
31  * unrelated to the ones used by NAK and NAK_ACK, and never resets them. In combination with the sender's
32  * address, this makes every out-of-band message unique. Out-of-band messages are used for example for
33  * broadcasting FLUSH messages.<p>
34  * Once a mode is set, it remains in effect until exactly 1 message has been sent, afterwards the default
35  * mode NAK is used again.
36  * <p/>
37  * The following communication between 2 peers exists (left side is initiator,
38  * right side receiver): <pre>
39  * <p/>
40  * <p/>
41  * send_out_of_band
42  * --------------> synchronous (1)
43  * <-------------
44  * ack
45  * <p/>
46  * <p/>
47  * send_nak
48  * --------------> asynchronous (2)
49  * <p/>
50  * <p/>
51  * send_nak_ack
52  * --------------> synchronous (3)
53  * <--------------
54  * ack
55  * <p/>
56  * <p/>
57  * retransmit
58  * <-------------- asynchronous (4)
59  * <p/>
60  * <p/>
61  * </pre>
62  * <p/>
63  * When a message is sent, it will contain a header describing the type of the
64  * message, and containing additional data, such as sequence number etc. When a
65  * message is received, it is fed into either the OutOfBander or NAKer, depending on the
66  * header's type.<p>
67  * Note that in the synchronous modes, ACKs are sent for each request. If a reliable unicast protocol layer
68  * exists somewhere underneath this layer, then even the ACKs are transmitted reliably, thus increasing
69  * the number of messages exchanged. However, since it is envisaged that ACK/OUT_OF_BAND are not used
70  * frequently, this problem is currently not addressed.
71  *
72  * @author Bela Ban
73  */

74 public class NAKACK extends Protocol {
75     long[] retransmit_timeout={2000, 3000, 5000, 8000}; // time(s) to wait before requesting xmit
76
NAKer naker=null;
77     OutOfBander out_of_bander=null;
78     ViewId vid=null;
79     View view=null;
80     boolean is_server=false;
81     Address local_addr=null;
82     final List queued_msgs=new List(); // msgs for next view (vid > current vid)
83
Vector JavaDoc members=null; // for OutOfBander: this is the destination set to
84
// send messages to
85
boolean send_next_msg_out_of_band=false;
86     boolean send_next_msg_acking=false;
87     long rebroadcast_timeout=0; // until all outstanding ACKs recvd (rebcasting)
88
TimeScheduler timer=null;
89     static final String JavaDoc WRAPPED_MSG_KEY="NAKACK.WRAPPED_HDR";
90
91
92     /**
93      * Do some initial tasks
94      */

95     public void init() throws Exception JavaDoc {
96         timer=stack != null? stack.timer : null;
97         if(timer == null)
98             if(log.isErrorEnabled()) log.error("timer is null");
99         naker=new NAKer();
100         out_of_bander=new OutOfBander();
101     }
102
103     public void stop() {
104         out_of_bander.stop();
105         naker.stop();
106     }
107
108
109     public String JavaDoc getName() {
110         return "NAKACK";
111     }
112
113
114     public Vector JavaDoc providedUpServices() {
115         Vector JavaDoc retval=new Vector JavaDoc(3);
116         retval.addElement(new Integer JavaDoc(Event.GET_MSGS_RECEIVED));
117         retval.addElement(new Integer JavaDoc(Event.GET_MSG_DIGEST));
118         retval.addElement(new Integer JavaDoc(Event.GET_MSGS));
119         return retval;
120     }
121
122
123     public Vector JavaDoc providedDownServices() {
124         Vector JavaDoc retval=new Vector JavaDoc(1);
125         retval.addElement(new Integer JavaDoc(Event.GET_MSGS_RECEIVED));
126         return retval;
127     }
128
129
130     /**
131      * <b>Callback</b>. Called by superclass when event may be handled.<p>
132      * <b>Do not use <code>passUp()</code> in this method as the event is passed up
133      * by default by the superclass after this method returns !</b>
134      */

135     public void up(Event evt) {
136         NakAckHeader hdr;
137         Message msg, msg_copy;
138         int rc;
139
140         switch(evt.getType()) {
141
142             case Event.SUSPECT:
143
144                     if(log.isInfoEnabled()) log.info("received SUSPECT event (suspected member=" + evt.getArg() + ')');
145                 naker.suspect((Address)evt.getArg());
146                 out_of_bander.suspect((Address)evt.getArg());
147                 break;
148
149             case Event.STABLE: // generated by STABLE layer. Delete stable messages passed in arg
150
naker.stable((long[])evt.getArg());
151                 return; // don't pass up further (Bela Aug 7 2001)
152

153             case Event.SET_LOCAL_ADDRESS:
154                 local_addr=(Address)evt.getArg();
155                 break;
156
157             case Event.GET_MSGS_RECEIVED: // returns the highest seqnos delivered to the appl. (used by STABLE)
158
long[] highest=naker.getHighestSeqnosDelivered();
159                 passDown(new Event(Event.GET_MSGS_RECEIVED_OK, highest));
160                 return; // don't pass up further (bela Aug 7 2001)
161

162             case Event.MSG:
163                 synchronized(this) {
164                     msg=(Message)evt.getArg();
165
166                     // check to see if this is a wrapped msg. If yes, send an ACK
167
hdr=(NakAckHeader)msg.removeHeader(WRAPPED_MSG_KEY); // see whether it is a wrapped message
168
if(hdr != null && hdr.type == NakAckHeader.WRAPPED_MSG) { // send back an ACK to hdr.sender
169
Message ack_msg=new Message(hdr.sender, null, null);
170                         NakAckHeader h=new NakAckHeader(NakAckHeader.NAK_ACK_RSP, hdr.seqno, null);
171                         if(hdr.sender == null)
172                             if(log.isWarnEnabled()) log.warn("WRAPPED: header's 'sender' field is null; " +
173                                     "cannot send ACK !");
174                         ack_msg.putHeader(getName(), h);
175                         passDown(new Event(Event.MSG, ack_msg));
176                     }
177
178                     hdr=(NakAckHeader)msg.removeHeader(getName());
179                     if(hdr == null)
180                         break; // pass up
181

182                     switch(hdr.type) {
183
184                         case NakAckHeader.NAK_ACK_MSG:
185                         case NakAckHeader.NAK_MSG:
186                             if(hdr.type == NakAckHeader.NAK_ACK_MSG) { // first thing: send ACK back to sender
187
Message ack_msg=new Message(msg.getSrc(), null, null);
188                                 NakAckHeader h=new NakAckHeader(NakAckHeader.NAK_ACK_RSP, hdr.seqno, null);
189                                 ack_msg.putHeader(getName(), h);
190                                 passDown(new Event(Event.MSG, ack_msg));
191                             }
192
193                             // while still a client, we just pass up all messages, without checking for message
194
// view IDs or seqnos: other layers further up will discard messages not destined
195
// for us (e.g. based on view IDs).
196
// Also: store msg in queue, when view change is received, replay messages with the same
197
// vid as the new view
198
if(!is_server) {
199                                 msg_copy=msg.copy(); // msg without header
200
msg_copy.putHeader(getName(), hdr); // put header back on as we removed it above
201
queued_msgs.add(msg_copy); // need a copy since passUp() will modify msg
202
passUp(new Event(Event.MSG, msg));
203                                 return;
204                             }
205
206
207                             // check for VIDs: is the message's VID the same as ours ?
208
if(vid != null && hdr.vid != null) { // only check if our vid and message's vid available
209
Address my_addr=vid.getCoordAddress(), other_addr=hdr.vid.getCoordAddress();
210
211                                 if(my_addr == null || other_addr == null) {
212                                     if(log.isWarnEnabled()) log.warn("my vid or message's vid does not contain " +
213                                             "a coordinator; discarding message !");
214                                     return;
215                                 }
216                                 if(!my_addr.equals(other_addr)) {
217                                     if(log.isWarnEnabled()) log.warn("creator of own vid (" + my_addr + ")is different from " +
218                                             "creator of message's vid (" + other_addr + "); discarding message !");
219                                     return;
220                                 }
221
222                                 rc=hdr.vid.compareTo(vid);
223                                 if(rc > 0) { // message is sent in next view -> store !
224

225                                         if(log.isInfoEnabled()) log.info("message's vid (" + hdr.vid + '#' + hdr.seqno +
226                                                 ") is bigger than current vid: (" + vid + ") message is queued !");
227                                     msg.putHeader(getName(), hdr); // put header back on as we removed it above
228
queued_msgs.add(msg);
229                                     return;
230                                 }
231                                 if(rc < 0) { // message sent in prev. view -> discard !
232

233                                         if(log.isWarnEnabled()) log.warn("message's vid (" + hdr.vid + ") is smaller than " +
234                                                 "current vid (" + vid + "): message <" + msg.getSrc() + ":#" +
235                                                 hdr.seqno + "> is discarded ! Hdr is " + hdr);
236                                     return;
237                                 }
238                                 // If we made it down here, the vids are the same --> OK
239
}
240
241
242                             msg.putHeader(getName(), hdr); // stored in received_msgs, re-sent later that's why hdr is added !
243
naker.receive(hdr.seqno, msg, null);
244                             return; // naker passes message up for us !
245

246
247                         case NakAckHeader.RETRANSMIT_MSG:
248                             naker.retransmit(msg.getSrc(), hdr.seqno, hdr.last_seqno);
249                             return;
250
251                         case NakAckHeader.NAK_ACK_RSP:
252                             naker.receiveAck(hdr.seqno, msg.getSrc());
253                             return; // discard, no need to pass up
254

255                         case NakAckHeader.OUT_OF_BAND_MSG:
256                             out_of_bander.receive(hdr.seqno, msg, hdr.stable_msgs);
257                             return; // naker passes message up for us !
258

259                         case NakAckHeader.OUT_OF_BAND_RSP:
260                             out_of_bander.receiveAck(hdr.seqno, msg.getSrc());
261                             return;
262
263                         default:
264                             if(log.isErrorEnabled()) log.error("NakAck header type " + hdr.type + " not known !");
265                             break;
266                     }
267                 } //end synchronized
268

269         }
270
271         passUp(evt);
272     }
273
274
275     /**
276      * <b>Callback</b>. Called by superclass when event may be handled.<p>
277      * <b>Do not use <code>passDown</code> in this method as the event is passed down
278      * by default by the superclass after this method returns !</b>
279      */

280     public void down(Event evt) {
281         Message msg;
282
283         if(log.isTraceEnabled())
284             log.trace("queued_msgs has " + queued_msgs.size() + " messages " +
285                     "\n\nnaker:\n" + naker.dumpContents() + "\n\nout_of_bander: " +
286                     out_of_bander.dumpContents() + "\n-----------------------------\n");
287
288         switch(evt.getType()) {
289
290             case Event.MSG:
291                 msg=(Message)evt.getArg();
292
293             // unicast address: not null and not mcast, pass down unchanged
294
if(vid == null || (msg.getDest() != null && !msg.getDest().isMulticastAddress()))
295                 break;
296
297                 if(send_next_msg_out_of_band) {
298                     out_of_bander.send(msg);
299                     send_next_msg_out_of_band=false;
300                 }
301                 else if(send_next_msg_acking) {
302                     naker.setAcks(true); // require acks when sending a msg
303
naker.send(msg);
304                     naker.setAcks(false); // don't require acks when sending a msg
305
send_next_msg_acking=false;
306                 }
307                 else
308                     naker.send(msg);
309
310                 return; // don't pass down the stack, naker does this for us !
311

312             case Event.GET_MSG_DIGEST:
313                 long[] highest_seqnos=(long[])evt.getArg();
314                 Digest digest=naker.computeMessageDigest(highest_seqnos);
315                 passUp(new Event(Event.GET_MSG_DIGEST_OK, digest));
316                 return;
317
318             case Event.GET_MSGS:
319                 List lower_seqnos=naker.getMessagesInRange((long[][])evt.getArg());
320                 passUp(new Event(Event.GET_MSGS_OK, lower_seqnos));
321                 return;
322
323             case Event.REBROADCAST_MSGS:
324                 rebroadcastMsgs((Vector JavaDoc)evt.getArg());
325                 break;
326
327             case Event.TMP_VIEW:
328                 Vector JavaDoc mbrs=((View)evt.getArg()).getMembers();
329                 members=mbrs != null? (Vector JavaDoc)mbrs.clone() : new Vector JavaDoc(11);
330                 break;
331
332             case Event.VIEW_CHANGE:
333                 synchronized(this) {
334                     view=((View)((View)evt.getArg()).clone());
335                     vid=view.getVid();
336
337                     members=(Vector JavaDoc)view.getMembers().clone();
338
339                     naker.reset();
340                     out_of_bander.reset();
341
342                     is_server=true; // check vids from now on
343

344                     // deliver messages received previously for this view
345
if(queued_msgs.size() > 0)
346                         deliverQueuedMessages();
347                 }
348                 break;
349
350             case Event.BECOME_SERVER:
351                 is_server=true;
352                 break;
353
354             case Event.SWITCH_NAK:
355                 naker.setAcks(false); // don't require acks when sending a msg
356
return; // don't pass down any further
357

358             case Event.SWITCH_NAK_ACK:
359                 send_next_msg_acking=true;
360                 return; // don't pass down any further
361

362             case Event.SWITCH_OUT_OF_BAND:
363                 send_next_msg_out_of_band=true;
364                 return;
365
366             case Event.GET_MSGS_RECEIVED: // return the highest seqnos delivered (=consumed by the application)
367
long[] h=naker.getHighestSeqnosDelivered();
368                 passUp(new Event(Event.GET_MSGS_RECEIVED_OK, h));
369                 break;
370         }
371
372
373         passDown(evt);
374     }
375
376
377     boolean coordinator() {
378         if(members == null || members.size() < 1 || local_addr == null)
379             return false;
380         return local_addr.equals(members.elementAt(0));
381     }
382
383     /**
384      * Rebroadcasts the messages given as arguments
385      */

386     void rebroadcastMsgs(Vector JavaDoc v) {
387         Vector JavaDoc final_v;
388         Message m1, m2;
389         NakAckHeader h1, h2;
390
391         if(v == null) return;
392         final_v=new Vector JavaDoc(v.size());
393
394         // weed out duplicates
395
/** todo Check!!!!! */
396         for(int i=0; i < v.size(); i++) {
397             boolean present=false;
398             m1=(Message)v.elementAt(i);
399             h1=m1 != null? (NakAckHeader)m1.getHeader(getName()) : null;
400             if(m1 == null || h1 == null) { // +++ remove
401
if(log.isErrorEnabled()) log.error("message is null");
402                 continue;
403             }
404
405             for(int j=0; j < final_v.size(); j++) {
406                 m2=(Message)final_v.elementAt(j);
407                 h2=m2 != null? (NakAckHeader)m2.getHeader(getName()) : null;
408                 if(m2 == null || h2 == null) { // +++ remove
409
if(log.isErrorEnabled()) log.error("message m2 is null");
410                     continue;
411                 }
412                 if(h1.seqno == h2.seqno && m1.getSrc() != null && m2.getSrc() != null &&
413                         m1.getSrc().equals(m2.getSrc())) {
414                     present=true;
415                 }
416             }
417             if(!present)
418                 final_v.addElement(m1);
419         }
420
421         if(log.isWarnEnabled()) log.warn("rebroadcasting " + final_v.size() + " messages");
422
423         /* Now re-broadcast messages using original NakAckHeader (same seqnos, same sender !) */
424         for(int i=0; i < final_v.size(); i++) {
425             m1=(Message)final_v.elementAt(i);
426             naker.resend(m1);
427         }
428
429         // Wait until all members have acked reception of outstanding msgs. This will empty our
430
// retransmission table (AckMcastSenderWindow)
431
naker.waitUntilAllAcksReceived(rebroadcast_timeout);
432         passUp(new Event(Event.REBROADCAST_MSGS_OK));
433     }
434
435
436     /**
437      * Deliver all messages in the queue where <code>msg.vid == vid</code> holds. Messages were stored
438      * in the queue because their vid was greater than the current view.
439      */

440     void deliverQueuedMessages() {
441         NakAckHeader hdr;
442         Message tmpmsg;
443         int rc;
444
445         while(queued_msgs.size() > 0) {
446             tmpmsg=(Message)queued_msgs.removeFromHead();
447             hdr=(NakAckHeader)tmpmsg.getHeader(getName());
448             rc=hdr.vid.compareTo(vid);
449             if(rc == 0) { // same vid -> OK
450
up(new Event(Event.MSG, tmpmsg));
451             }
452             else if(rc > 0) {
453                 ;
454             }
455             else
456             /** todo Maybe messages from previous vids are stored while client */
457                 ; // can't be the case; only messages for future views are stored !
458
}
459     }
460
461
462     public boolean setProperties(Properties JavaDoc props) {
463         String JavaDoc str;
464         long[] tmp;
465
466         super.setProperties(props);
467         str=props.getProperty("retransmit_timeout");
468         if(str != null) {
469             tmp=Util.parseCommaDelimitedLongs(str);
470             props.remove("retransmit_timeout");
471             if(tmp != null && tmp.length > 0)
472                 retransmit_timeout=tmp;
473         }
474
475         str=props.getProperty("rebroadcast_timeout");
476         if(str != null) {
477             rebroadcast_timeout=Long.parseLong(str);
478             props.remove("rebroadcast_timeout");
479         }
480
481         if(props.size() > 0) {
482             System.err.println("NAKACK.setProperties(): these properties are not recognized:");
483             props.list(System.out);
484             return false;
485         }
486         return true;
487     }
488
489
490     class NAKer implements Retransmitter.RetransmitCommand, AckMcastSenderWindow.RetransmitCommand {
491         long seqno=0; // current message sequence number
492
final Hashtable JavaDoc received_msgs=new Hashtable JavaDoc(); // ordered by sender -> NakReceiverWindow
493
final Hashtable JavaDoc sent_msgs=new Hashtable JavaDoc(); // ordered by seqno (sent by me !) - Messages
494
final AckMcastSenderWindow sender_win=new AckMcastSenderWindow(this, timer);
495         boolean acking=false; // require acks when sending msgs
496
long deleted_up_to=0;
497
498
499         // Used to periodically retransmit the last message
500
final LastMessageRetransmitter last_msg_xmitter=new LastMessageRetransmitter();
501
502
503         private class LastMessageRetransmitter implements TimeScheduler.Task {
504             boolean stopped=false;
505             int num_times=2; // number of times a message is retransmitted
506
long last_xmitted_seqno=0;
507
508
509             public void stop() {
510                 stopped=true;
511             }
512
513             public boolean cancelled() {
514                 return stopped;
515             }
516
517             public long nextInterval() {
518                 return retransmit_timeout[0];
519             }
520
521
522             /**
523              * Periodically retransmits the last seqno to all members. If the seqno doesn't change (ie. there
524              * were no new messages sent) then the retransmitter task doesn't retransmit after 'num_times' times.
525              */

526             public void run() {
527                 synchronized(sent_msgs) {
528                     long prevSeqno=seqno - 1;
529
530                     if(prevSeqno == last_xmitted_seqno) {
531
532                             if(log.isInfoEnabled()) log.info("prevSeqno=" + prevSeqno + ", last_xmitted_seqno=" +
533                                     last_xmitted_seqno + ", num_times=" + num_times);
534                         if(--num_times <= 0)
535                             return;
536                     }
537                     else {
538                         num_times=3;
539                         last_xmitted_seqno=prevSeqno;
540                     }
541
542                     if((prevSeqno >= 0) && (prevSeqno > deleted_up_to)) {
543
544                             if(log.isInfoEnabled()) log.info("retransmitting last message " + prevSeqno);
545                         retransmit(null, prevSeqno, prevSeqno);
546                     }
547                 }
548             }
549
550         }
551
552
/**
 * Registers the last-message retransmitter with the timer. Without a timer nothing is
 * scheduled and an error is logged instead.
 */
NAKer() {
    if(timer == null) {
        if(log.isErrorEnabled())
            log.error("timer is null");
        return;
    }
    timer.add(last_msg_xmitter, true); // fixed-rate scheduling
}
559
560
561         long getNextSeqno() {
562             return seqno++;
563         }
564
565
566         long getHighestSeqnoSent() {
567             long highest_sent=-1;
568             for(Enumeration JavaDoc e=sent_msgs.keys(); e.hasMoreElements();)
569                 highest_sent=Math.max(highest_sent, ((Long JavaDoc)e.nextElement()).longValue());
570             return highest_sent;
571         }
572
573
574         /**
575          * Returns an array of the highest sequence numbers consumed by the application so far,
576          * its order corresponding with <code>mbrs</code>. Used by coordinator as argument when
577          * sending initial FLUSH request to members
578          */

579         long[] getHighestSeqnosDelivered() {
580             long[] highest_deliv=members != null? new long[members.size()] : null;
581             Address mbr;
582             NakReceiverWindow win;
583
584             if(highest_deliv == null) return null;
585
586             for(int i=0; i < highest_deliv.length; i++) highest_deliv[i]=-1;
587
588             synchronized(members) {
589                 for(int i=0; i < members.size(); i++) {
590                     mbr=(Address)members.elementAt(i);
591                     win=(NakReceiverWindow)received_msgs.get(mbr);
592                     if(win != null)
593                         highest_deliv[i]=win.getHighestDelivered();
594                 }
595             }
596             return highest_deliv;
597         }
598
599
600         /**
601          * Return all messages sent by us that are higher than <code>seqno</code>
602          */

603         List getSentMessagesHigherThan(long seqno) {
604             List retval=new List();
605             Long JavaDoc key;
606
607             for(Enumeration JavaDoc e=sent_msgs.keys(); e.hasMoreElements();) {
608                 key=(Long JavaDoc)e.nextElement();
609                 if(key.longValue() > seqno)
610                     retval.add(sent_msgs.get(key));
611             }
612             return retval;
613         }
614
615
616         /**
617          * Returns a message digest: for each member P in <code>highest_seqnos</code>, the highest seqno
618          * received from P is added to the digest's array. If P == this, then the highest seqno
619          * <em>sent</em> is added: this makes sure that messages sent but not yet received are also
620          * re-broadcast (because they are also unstable).<p>If my highest seqno for a member P is
621          * higher than the one in <code>highest_seqnos</code>, then all messages from P (received or sent)
622          * whose seqno is higher are added to the digest's messages. The coordinator will use all digests
623          * to compute a set of messages than need to be re-broadcast to the members before installing
624          * a new view.
625          */

626         Digest computeMessageDigest(long[] highest_seqnos) {
627             Digest digest=highest_seqnos != null? new Digest(highest_seqnos.length) : null;
628             Address sender;
629             NakReceiverWindow win;
630             List unstable_msgs;
631             int own_index;
632             long highest_seqno_sent=-1, highest_seqno_received=-1;
633
634             if(digest == null) {
635
636                     if(log.isWarnEnabled()) log.warn("highest_seqnos is null, cannot compute digest !");
637                 return null;
638             }
639
640             if(highest_seqnos.length != members.size()) {
641
642                     if(log.isWarnEnabled()) log.warn("the mbrship size and the size " +
643                             "of the highest_seqnos array are not equal, cannot compute digest !");
644                 return null;
645             }
646
647             System.arraycopy(highest_seqnos, 0, digest.highest_seqnos, 0, digest.highest_seqnos.length);
648
649             for(int i=0; i < highest_seqnos.length; i++) {
650                 sender=(Address)members.elementAt(i);
651                 if(sender == null) continue;
652                 win=(NakReceiverWindow)received_msgs.get(sender);
653                 if(win == null) continue;
654                 digest.highest_seqnos[i]=win.getHighestReceived();
655                 unstable_msgs=win.getMessagesHigherThan(highest_seqnos[i]);
656                 for(Enumeration JavaDoc e=unstable_msgs.elements(); e.hasMoreElements();)
657                     digest.msgs.add(e.nextElement());
658             }
659
660
661             /** If our highest seqno <em>sent</em> is higher than the one <em>received</em>, we have to
662              (a) set it in the digest and (b) add the corresponding messages **/

663
664             own_index=members.indexOf(local_addr);
665             if(own_index == -1) {
666
667                     if(log.isWarnEnabled()) log.warn("no own address in highest_seqnos");
668                 return digest;
669             }
670             highest_seqno_received=digest.highest_seqnos[own_index];
671             highest_seqno_sent=getHighestSeqnoSent();
672
673             if(highest_seqno_sent > highest_seqno_received) {
674                 // (a) Set highest seqno sent in digest
675
digest.highest_seqnos[own_index]=highest_seqno_sent;
676
677                 // (b) Add messages between highest_seqno_received and highest_seqno_sent
678
unstable_msgs=getSentMessagesHigherThan(highest_seqno_received);
679                 for(Enumeration JavaDoc e=unstable_msgs.elements(); e.hasMoreElements();)
680                     digest.msgs.add(e.nextElement());
681             }
682
683             return digest;
684         }
685
686
687         /**
688          * For each non-null member m in <code>range</code>, get messages with sequence numbers between
689          * range[m][0] and range[m][1], excluding range[m][0] and including range[m][1].
690          */

691         List getMessagesInRange(long[][] range) {
692             List retval=new List();
693             List tmp;
694             NakReceiverWindow win;
695             Address sender;
696
697             for(int i=0; i < range.length; i++) {
698                 if(range[i] != null) {
699                     sender=(Address)members.elementAt(i);
700                     if(sender == null) continue;
701                     win=(NakReceiverWindow)received_msgs.get(sender);
702                     if(win == null) continue;
703                     tmp=win.getMessagesInRange(range[i][0], range[i][1]);
704                     if(tmp == null || tmp.size() < 1) continue;
705                     for(Enumeration JavaDoc e=tmp.elements(); e.hasMoreElements();)
706                         retval.add(e.nextElement());
707                 }
708             }
709             return retval;
710         }
711
712
713         void setAcks(boolean f) {
714             acking=f;
715         }
716
717
718         /**
719          * Vector with messages (ordered by sender) that are stable and can be discarded.
720          * This applies to NAK-based sender and receivers.
721          */

722         void stable(long[] seqnos) {
723             int index;
724             long seqno;
725             NakReceiverWindow recv_win;
726             Address sender;
727
728             if(members == null || local_addr == null) {
729                  if(log.isWarnEnabled()) log.warn("members or local_addr are null !");
730                 return;
731             }
732             index=members.indexOf(local_addr);
733
734             if(index < 0) {
735
736                     if(log.isWarnEnabled()) log.warn("member " + local_addr + " not found in " + members);
737                 return;
738             }
739             seqno=seqnos[index];
740
741                 if(log.isInfoEnabled()) log.info("deleting stable messages [" +
742                         deleted_up_to + " - " + seqno + ']');
743
744             // delete sent messages that are stable (kept for retransmission requests from receivers)
745
synchronized(sent_msgs) {
746                 for(long i=deleted_up_to; i <= seqno; i++) {
747                     sent_msgs.remove(new Long JavaDoc(i));
748                 }
749                 deleted_up_to=seqno;
750             }
751             // delete received msgs that are stable
752
for(int i=0; i < members.size(); i++) {
753                 sender=(Address)members.elementAt(i);
754                 recv_win=(NakReceiverWindow)received_msgs.get(sender);
755                 if(recv_win != null)
756                     recv_win.stable(seqnos[i]); // delete all messages with seqnos <= seqnos[i]
757
}
758         }
759
760
761         void send(Message msg) {
762             long id=getNextSeqno();
763             ViewId vid_copy;
764
765             if(vid == null)
766                 return;
767             vid_copy=(ViewId)vid.clone(); /** todo No needs to copy vid */
768
769             if(acking) {
770                 msg.putHeader(getName(), new NakAckHeader(NakAckHeader.NAK_ACK_MSG, id, vid_copy));
771                 sender_win.add(id, msg.copy(), (Vector JavaDoc)members.clone()); // msg must be copied !
772
}
773             else
774                 msg.putHeader(getName(), new NakAckHeader(NakAckHeader.NAK_MSG, id, vid_copy));
775
776              if(log.isInfoEnabled()) log.info("sending msg #" + id);
777
778             sent_msgs.put(new Long JavaDoc(id), msg.copy());
779             passDown(new Event(Event.MSG, msg));
780         }
781
782
783
784
785
        /**
         * Re-broadcasting a message. The message already contains a NakAckHeader (and therefore also a
         * seqno). The message (including its header) is wrapped in a new message and broadcast using
         * ACKs. Every receiver acks the message, unwraps it to get the original message and delivers the
         * original message (if not yet delivered).
         * <ul>
         * <li>the msgs passed in are sent again (they already have a NakAckHeader !)
         * <li>the same seq numbers are used
         * <li>the destination has to be set to null (broadcast), e.g.:
         *     dst=p (me !), src=q --> dst=null, src=q
         * </ul>
         *
         * TODO:
         * -----
         * resend() has to wait until it has received all ACKs from all recipients (for all msgs), or until
         * members were suspected. Thus we can ensure that all members received outstanding msgs before
         * we switch to a new view. Otherwise, because the switch to a new view resets NAK and ACK msg
         * transmission, slow members might never receive all outstanding messages.
         */

802
803
804
805         /**
806          * 1. Set the destination address of the original msg to null
807          * 2. Add a new header WRAPPED_MSG and send msg. The receiver will ACK the msg,
808          * remove the header and deliver the msg
809          */

810         void resend(Message msg) {
811             Message copy=msg.copy();
812             NakAckHeader hdr=(NakAckHeader)copy.getHeader(getName());
813             NakAckHeader wrapped_hdr;
814             long id=hdr.seqno;
815
816             if(vid == null) return;
817             copy.setDest(null); // broadcast, e.g. dst(p), src(q) --> dst(null), src(q)
818
wrapped_hdr=new NakAckHeader(NakAckHeader.WRAPPED_MSG, hdr.seqno, hdr.vid);
819             wrapped_hdr.sender=local_addr;
820             copy.putHeader(WRAPPED_MSG_KEY, wrapped_hdr);
821             sender_win.add(id, copy.copy(), (Vector JavaDoc)members.clone());
822              if(log.isInfoEnabled()) log.info("resending " + copy.getHeader(getName()));
823             passDown(new Event(Event.MSG, copy));
824         }
825
826
827         void waitUntilAllAcksReceived(long timeout) {
828             sender_win.waitUntilAllAcksReceived(timeout);
829         }
830
831
832         void receive(long id, Message msg, Vector JavaDoc stable_msgs) { /** todo Vector stable_msgs is not used in NAKer.receive() */
833             Address sender=msg.getSrc();
834             NakReceiverWindow win=(NakReceiverWindow)received_msgs.get(sender);
835             Message msg_to_deliver;
836
837             if(win == null) {
838                 win=new NakReceiverWindow(sender, this, 0);
839                 win.setRetransmitTimeouts(retransmit_timeout);
840                 received_msgs.put(sender, win);
841             }
842
843              if(log.isInfoEnabled()) log.info("received <" + sender + '#' + id + '>');
844
845             win.add(id, msg); // add in order, then remove and pass up as many msgs as possible
846
while(true) {
847                 msg_to_deliver=win.remove();
848                 if(msg_to_deliver == null)
849                     break;
850
851                 if(msg_to_deliver.getHeader(getName()) instanceof NakAckHeader)
852                     msg_to_deliver.removeHeader(getName());
853                 passUp(new Event(Event.MSG, msg_to_deliver));
854             }
855         }
856
857
858         void receiveAck(long id, Address sender) {
859
860                 if(log.isInfoEnabled()) log.info("received ack <-- ACK <" + sender + '#' + id + '>');
861             sender_win.ack(id, sender);
862         }
863
864
865         /**
866          * Implementation of interface AckMcastSenderWindow.RetransmitCommand.<p>
867          * Called by retransmission thread of AckMcastSenderWindow. <code>msg</code> is already
868          * a copy, so does not need to be copied again.
869          */

870         public void retransmit(long seqno, Message msg, Address dest) {
871
872                 if(log.isInfoEnabled()) log.info("retransmitting message " + seqno + " to " + dest +
873                         ", header is " + msg.getHeader(getName()));
874
875             // check whether dest is member of group. If not, discard retransmission message and
876
// also remove it from sender_win (AckMcastSenderWindow)
877
if(members != null) {
878                 if(!members.contains(dest)) {
879
880                         if(log.isInfoEnabled()) log.info("retransmitting " + seqno + ") to " + dest + ": " + dest +
881                                 " is not a member; discarding retransmission and removing " +
882                                 dest + " from sender_win");
883                     sender_win.remove(dest);
884                     return;
885                 }
886             }
887
888             msg.setDest(dest);
889             passDown(new Event(Event.MSG, msg));
890         }
891
892
893         /**
894          * Implementation of Retransmitter.RetransmitCommand.<p>
895          * Called by retransmission thread when gap is detected. Sends retr. request
896          * to originator of msg
897          */

898         public void retransmit(long first_seqno, long last_seqno, Address sender) {
899
900                 if(log.isInfoEnabled()) log.info("retransmit([" + first_seqno + ", " + last_seqno +
901                         "]) to " + sender + ", vid=" + vid);
902
903             NakAckHeader hdr=new NakAckHeader(NakAckHeader.RETRANSMIT_MSG, first_seqno, (ViewId)vid.clone()); /** todo Not necessary to clone vid */
904             Message retransmit_msg=new Message(sender, null, null);
905
906             hdr.last_seqno=last_seqno;
907             retransmit_msg.putHeader(getName(), hdr);
908             passDown(new Event(Event.MSG, retransmit_msg));
909         }
910
911
912         // Retransmit from sent-table, called when RETRANSMIT message is received
913
void retransmit(Address dest, long first_seqno, long last_seqno) {
914             Message m, retr_msg;
915
916             for(long i=first_seqno; i <= last_seqno; i++) {
917                 m=(Message)sent_msgs.get(new Long JavaDoc(i));
918                 if(m == null) {
919                     if(log.isWarnEnabled()) log.warn("(to " + dest + "): message with " + "seqno=" + i + " not found !");
920                     continue;
921                 }
922
923                 retr_msg=m.copy();
924                 retr_msg.setDest(dest);
925
926                 try {
927                     passDown(new Event(Event.MSG, retr_msg));
928                 }
929                 catch(Exception JavaDoc e) {
930                     if(log.isDebugEnabled()) log.debug("exception is " + e);
931                 }
932             }
933         }
934
935
936         void stop() {
937             if(sender_win != null)
938                 sender_win.stop();
939         }
940
941
942         void reset() {
943             NakReceiverWindow win;
944
945             // Only reset if not coord: coord may have to retransmit the VIEW_CHANGE msg to slow members,
946
// since VIEW_CHANGE results in retransmitter resetting, retransmission would be killed, and the
947
// slow mbr would never receive a new view (see ./design/ViewChangeRetransmission.txt)
948
if(!coordinator())
949                 sender_win.reset();
950
951             sent_msgs.clear();
952             for(Enumeration JavaDoc e=received_msgs.elements(); e.hasMoreElements();) {
953                 win=(NakReceiverWindow)e.nextElement();
954                 win.reset();
955             }
956             received_msgs.clear();
957             seqno=0;
958             deleted_up_to=0;
959         }
960
961
962         public void suspect(Address mbr) {
963             NakReceiverWindow w;
964
965             w=(NakReceiverWindow)received_msgs.get(mbr);
966             if(w != null) {
967                 w.reset();
968                 received_msgs.remove(mbr);
969             }
970
971             sender_win.suspect(mbr); // don't keep retransmitting messages to mbr
972
}
973
974
975         String JavaDoc dumpContents() {
976             StringBuffer JavaDoc ret=new StringBuffer JavaDoc();
977
978             ret.append("\nsent_msgs: " + sent_msgs.size());
979
980             ret.append("\nreceived_msgs: ");
981             for(Enumeration JavaDoc e=received_msgs.keys(); e.hasMoreElements();) {
982                 Address key=(Address)e.nextElement();
983                 NakReceiverWindow w=(NakReceiverWindow)received_msgs.get(key);
984                 ret.append('\n' + w.toString());
985             }
986
987             ret.append("\nsender_win: " + sender_win.toString());
988
989
990             return ret.toString();
991         }
992
993
994     }
995
996
997     class OutOfBander implements AckMcastSenderWindow.RetransmitCommand {
998         final AckMcastSenderWindow sender_win=new AckMcastSenderWindow(this, timer);
999         final AckMcastReceiverWindow receiver_win=new AckMcastReceiverWindow();
1000        long seqno=0;
1001
1002
1003        void send(Message msg) {
1004            long id=seqno++;
1005            Vector JavaDoc stable_msgs=sender_win.getStableMessages();
1006            NakAckHeader hdr;
1007
1008             if(log.isInfoEnabled()) log.info("sending msg #=" + id);
1009
1010            hdr=new NakAckHeader(NakAckHeader.OUT_OF_BAND_MSG, id, null);
1011            hdr.stable_msgs=stable_msgs;
1012            msg.putHeader(getName(), hdr);
1013
1014            // msg needs to be copied, otherwise it will be modified by the code below
1015
sender_win.add(id, msg.copy(), (Vector JavaDoc)members.clone());
1016
1017            passDown(new Event(Event.MSG, msg));
1018        }
1019
1020
1021        void receive(long id, Message msg, Vector JavaDoc stable_msgs) {
1022            Address sender=msg.getSrc();
1023
1024            // first thing: send ACK back to sender
1025
Message ack_msg=new Message(msg.getSrc(), null, null);
1026            NakAckHeader hdr=new NakAckHeader(NakAckHeader.OUT_OF_BAND_RSP, id, null);
1027            ack_msg.putHeader(getName(), hdr);
1028
1029
1030             if(log.isInfoEnabled()) log.info("received <" + sender + '#' + id + ">\n");
1031
1032            if(receiver_win.add(sender, id)) // not received previously
1033
passUp(new Event(Event.MSG, msg));
1034
1035            passDown(new Event(Event.MSG, ack_msg)); // send ACK
1036
if(log.isInfoEnabled()) log.info("sending ack <" + sender + '#' + id + ">\n");
1037
1038            if(stable_msgs != null)
1039                receiver_win.remove(sender, stable_msgs);
1040        }
1041
1042
1043        void receiveAck(long id, Address sender) {
1044             if(log.isInfoEnabled()) log.info("received ack <" + sender + '#' + id + '>');
1045            sender_win.ack(id, sender);
1046        }
1047
1048
1049        /**
1050         * Called by retransmission thread of AckMcastSenderWindow. <code>msg</code> is already
1051         * a copy, so does not need to be copied again. All the necessary header are already present;
1052         * no header needs to be added ! The message is retransmitted as <em>unicast</em> !
1053         */

1054        public void retransmit(long seqno, Message msg, Address dest) {
1055             if(log.isInfoEnabled()) log.info("dest=" + dest + ", msg #" + seqno);
1056            msg.setDest(dest);
1057            passDown(new Event(Event.MSG, msg));
1058        }
1059
1060
1061        void reset() {
1062            sender_win.reset(); // +++ ?
1063
receiver_win.reset(); // +++ ?
1064
}
1065
1066
1067        void suspect(Address mbr) {
1068            sender_win.suspect(mbr);
1069            receiver_win.suspect(mbr);
1070        }
1071
1072
1073        void start() {
1074            sender_win.start();
1075        }
1076
1077        void stop() {
1078            if(sender_win != null)
1079                sender_win.stop();
1080        }
1081
1082
1083        String JavaDoc dumpContents() {
1084            StringBuffer JavaDoc ret=new StringBuffer JavaDoc();
1085            ret.append("\nsender_win:\n" + sender_win.toString() +
1086                    "\nreceiver_win:\n" + receiver_win.toString());
1087            return ret.toString();
1088        }
1089
1090
1091    }
1092
1093
1094}
1095
Popular Tags