package org.jgroups.protocols;


import org.jgroups.*;
import org.jgroups.stack.*;
import org.jgroups.util.List;
import org.jgroups.util.TimeScheduler;
import org.jgroups.util.Util;

import java.util.Enumeration;
import java.util.Hashtable;
import java.util.Properties;
import java.util.Vector;


/**
 * Protocol layer that tags outgoing multicast messages with a sequence number and the current
 * view id, and routes incoming tagged messages through per-sender receiver windows before
 * passing them up.
 * <p>
 * Three sending modes are visible in this class:
 * <ul>
 * <li>NAK mode (default): messages carry a {@code NAK_MSG} header; a copy of every sent message
 * is kept in {@code sent_msgs} so it can be resent on a {@code RETRANSMIT_MSG} request.</li>
 * <li>NAK_ACK mode (one message, enabled by {@code Event.SWITCH_NAK_ACK}): the message carries a
 * {@code NAK_ACK_MSG} header and is additionally tracked in an {@link AckMcastSenderWindow}
 * until the members acknowledge it.</li>
 * <li>Out-of-band mode (one message, enabled by {@code Event.SWITCH_OUT_OF_BAND}): the message is
 * handled by {@link OutOfBander} and bypasses the per-sender receiver windows.</li>
 * </ul>
 * Messages that arrive before this member is a server, or that carry a view id greater than the
 * current one, are queued and re-examined when a {@code VIEW_CHANGE} is installed.
 */
public class NAKACK extends Protocol {
    /** Retransmission timeouts handed to each receiver window; element 0 also paces the last-message retransmitter. */
    long[] retransmit_timeout={2000, 3000, 5000, 8000};
    NAKer naker=null;
    OutOfBander out_of_bander=null;
    /** Id of the currently installed view; null until the first VIEW_CHANGE. */
    ViewId vid=null;
    View view=null;
    /** Set by BECOME_SERVER or VIEW_CHANGE; until then incoming messages are queued as well as passed up. */
    boolean is_server=false;
    Address local_addr=null;
    /** Messages received too early (not yet server, or vid greater than ours). */
    final List queued_msgs=new List();
    Vector members=null;
    boolean send_next_msg_out_of_band=false;
    boolean send_next_msg_acking=false;
    /** Timeout passed to {@code waitUntilAllAcksReceived()} after a rebroadcast. */
    long rebroadcast_timeout=0;
    TimeScheduler timer=null;
    /** Header key under which {@link NAKer#resend} stores the WRAPPED_MSG header. */
    static final String WRAPPED_MSG_KEY="NAKACK.WRAPPED_HDR";


    /**
     * Fetches the stack's timer and creates the two helper objects. The helpers capture
     * {@code timer} in their field initializers, so they must be created after it is assigned.
     */
    public void init() throws Exception {
        timer=stack != null? stack.timer : null;
        if(timer == null) {
            if(log.isErrorEnabled()) log.error("timer is null");
        }
        naker=new NAKer();
        out_of_bander=new OutOfBander();
    }

    /** Stops the sender windows of both helpers; a no-op for helpers that were never created. */
    public void stop() {
        if(out_of_bander != null)
            out_of_bander.stop();
        if(naker != null)
            naker.stop();
    }


    public String getName() {
        return "NAKACK";
    }


    public Vector providedUpServices() {
        Vector retval=new Vector(3);
        retval.addElement(new Integer(Event.GET_MSGS_RECEIVED));
        retval.addElement(new Integer(Event.GET_MSG_DIGEST));
        retval.addElement(new Integer(Event.GET_MSGS));
        return retval;
    }


    public Vector providedDownServices() {
        Vector retval=new Vector(1);
        retval.addElement(new Integer(Event.GET_MSGS_RECEIVED));
        return retval;
    }


    /**
     * Handles events travelling up the stack. Events that are fully consumed here return early;
     * all others fall through to {@code passUp(evt)}.
     */
    public void up(Event evt) {
        NakAckHeader hdr;
        Message msg, msg_copy;
        int rc;

        switch(evt.getType()) {

            case Event.SUSPECT:
                if(log.isInfoEnabled()) log.info("received SUSPECT event (suspected member=" + evt.getArg() + ')');
                naker.suspect((Address)evt.getArg());
                out_of_bander.suspect((Address)evt.getArg());
                break;

            case Event.STABLE:
                naker.stable((long[])evt.getArg());
                return;

            case Event.SET_LOCAL_ADDRESS:
                local_addr=(Address)evt.getArg();
                break;

            case Event.GET_MSGS_RECEIVED:
                long[] highest=naker.getHighestSeqnosDelivered();
                passDown(new Event(Event.GET_MSGS_RECEIVED_OK, highest));
                return;

            case Event.MSG:
                synchronized(this) {
                    msg=(Message)evt.getArg();

                    // A rebroadcast message carries an extra WRAPPED_MSG header naming the
                    // rebroadcaster; acknowledge it to that member.
                    hdr=(NakAckHeader)msg.removeHeader(WRAPPED_MSG_KEY);
                    if(hdr != null && hdr.type == NakAckHeader.WRAPPED_MSG) {
                        if(hdr.sender == null) {
                            // Without a sender the ACK would be built with a null destination,
                            // so it is not sent at all.
                            if(log.isWarnEnabled()) log.warn("WRAPPED: header's 'sender' field is null; " +
                                    "cannot send ACK !");
                        }
                        else {
                            Message ack_msg=new Message(hdr.sender, null, null);
                            NakAckHeader h=new NakAckHeader(NakAckHeader.NAK_ACK_RSP, hdr.seqno, null);
                            ack_msg.putHeader(getName(), h);
                            passDown(new Event(Event.MSG, ack_msg));
                        }
                    }

                    hdr=(NakAckHeader)msg.removeHeader(getName());
                    if(hdr == null)
                        break;  // not one of ours: pass up unchanged

                    switch(hdr.type) {

                        case NakAckHeader.NAK_ACK_MSG:
                        case NakAckHeader.NAK_MSG:
                            if(hdr.type == NakAckHeader.NAK_ACK_MSG) {  // sender asked for an ACK
                                Message ack_msg=new Message(msg.getSrc(), null, null);
                                NakAckHeader h=new NakAckHeader(NakAckHeader.NAK_ACK_RSP, hdr.seqno, null);
                                ack_msg.putHeader(getName(), h);
                                passDown(new Event(Event.MSG, ack_msg));
                            }

                            if(!is_server) {
                                // Not yet a server: keep a copy (with header) for later
                                // processing, and pass the original up right away.
                                msg_copy=msg.copy();
                                msg_copy.putHeader(getName(), hdr);
                                queued_msgs.add(msg_copy);
                                passUp(new Event(Event.MSG, msg));
                                return;
                            }

                            if(vid != null && hdr.vid != null) {
                                Address my_addr=vid.getCoordAddress(), other_addr=hdr.vid.getCoordAddress();

                                if(my_addr == null || other_addr == null) {
                                    if(log.isWarnEnabled()) log.warn("my vid or message's vid does not contain " +
                                            "a coordinator; discarding message !");
                                    return;
                                }
                                if(!my_addr.equals(other_addr)) {
                                    if(log.isWarnEnabled()) log.warn("creator of own vid (" + my_addr + ")is different from " +
                                            "creator of message's vid (" + other_addr + "); discarding message !");
                                    return;
                                }

                                rc=hdr.vid.compareTo(vid);
                                if(rc > 0) {  // message from a view we have not installed yet
                                    if(log.isInfoEnabled()) log.info("message's vid (" + hdr.vid + '#' + hdr.seqno +
                                            ") is bigger than current vid: (" + vid + ") message is queued !");
                                    msg.putHeader(getName(), hdr);  // restore header for later processing
                                    queued_msgs.add(msg);
                                    return;
                                }
                                if(rc < 0) {  // message from an older view
                                    if(log.isWarnEnabled()) log.warn("message's vid (" + hdr.vid + ") is smaller than " +
                                            "current vid (" + vid + "): message <" + msg.getSrc() + ":#" +
                                            hdr.seqno + "> is discarded ! Hdr is " + hdr);
                                    return;
                                }
                            }

                            msg.putHeader(getName(), hdr);  // the stored message keeps its header
                            naker.receive(hdr.seqno, msg, null);
                            return;

                        case NakAckHeader.RETRANSMIT_MSG:
                            naker.retransmit(msg.getSrc(), hdr.seqno, hdr.last_seqno);
                            return;

                        case NakAckHeader.NAK_ACK_RSP:
                            naker.receiveAck(hdr.seqno, msg.getSrc());
                            return;

                        case NakAckHeader.OUT_OF_BAND_MSG:
                            out_of_bander.receive(hdr.seqno, msg, hdr.stable_msgs);
                            return;

                        case NakAckHeader.OUT_OF_BAND_RSP:
                            out_of_bander.receiveAck(hdr.seqno, msg.getSrc());
                            return;

                        default:
                            if(log.isErrorEnabled()) log.error("NakAck header type " + hdr.type + " not known !");
                            break;
                    }
                }
        }

        passUp(evt);
    }


    /**
     * Handles events travelling down the stack. Multicast messages are sent via the
     * {@link NAKer} or {@link OutOfBander}; events that are fully consumed here return early;
     * all others fall through to {@code passDown(evt)}.
     */
    public void down(Event evt) {
        Message msg;

        if(log.isTraceEnabled())
            log.trace("queued_msgs has " + queued_msgs.size() + " messages " +
                    "\n\nnaker:\n" + naker.dumpContents() + "\n\nout_of_bander: " +
                    out_of_bander.dumpContents() + "\n-----------------------------\n");

        switch(evt.getType()) {

            case Event.MSG:
                msg=(Message)evt.getArg();

                // No view yet, or a unicast message: pass down untouched.
                if(vid == null || (msg.getDest() != null && !msg.getDest().isMulticastAddress()))
                    break;

                if(send_next_msg_out_of_band) {
                    out_of_bander.send(msg);
                    send_next_msg_out_of_band=false;
                }
                else if(send_next_msg_acking) {
                    naker.setAcks(true);
                    naker.send(msg);
                    naker.setAcks(false);  // acking applies to this one message only
                    send_next_msg_acking=false;
                }
                else
                    naker.send(msg);

                return;  // the helper has already passed the message down

            case Event.GET_MSG_DIGEST:
                long[] highest_seqnos=(long[])evt.getArg();
                Digest digest=naker.computeMessageDigest(highest_seqnos);
                passUp(new Event(Event.GET_MSG_DIGEST_OK, digest));
                return;

            case Event.GET_MSGS:
                List lower_seqnos=naker.getMessagesInRange((long[][])evt.getArg());
                passUp(new Event(Event.GET_MSGS_OK, lower_seqnos));
                return;

            case Event.REBROADCAST_MSGS:
                rebroadcastMsgs((Vector)evt.getArg());
                break;

            case Event.TMP_VIEW:
                Vector mbrs=((View)evt.getArg()).getMembers();
                members=mbrs != null? (Vector)mbrs.clone() : new Vector(11);
                break;

            case Event.VIEW_CHANGE:
                synchronized(this) {
                    view=((View)((View)evt.getArg()).clone());
                    vid=view.getVid();

                    members=(Vector)view.getMembers().clone();

                    naker.reset();
                    out_of_bander.reset();

                    is_server=true;  // installing a view makes this member a server

                    if(queued_msgs.size() > 0)
                        deliverQueuedMessages();
                }
                break;

            case Event.BECOME_SERVER:
                is_server=true;
                break;

            case Event.SWITCH_NAK:
                naker.setAcks(false);
                return;

            case Event.SWITCH_NAK_ACK:
                send_next_msg_acking=true;
                return;

            case Event.SWITCH_OUT_OF_BAND:
                send_next_msg_out_of_band=true;
                return;

            case Event.GET_MSGS_RECEIVED:
                long[] h=naker.getHighestSeqnosDelivered();
                passUp(new Event(Event.GET_MSGS_RECEIVED_OK, h));
                break;
        }

        passDown(evt);
    }


    /** Returns true if the local address is the first element of the membership. */
    boolean coordinator() {
        if(members == null || members.size() < 1 || local_addr == null)
            return false;
        return local_addr.equals(members.elementAt(0));
    }


    /**
     * Resends the given messages, skipping entries without a NAKACK header and entries that
     * duplicate an earlier one (same seqno and same non-null source). Then waits up to
     * {@code rebroadcast_timeout} for acknowledgments and passes up REBROADCAST_MSGS_OK.
     *
     * @param v messages to rebroadcast; nothing happens when null
     */
    void rebroadcastMsgs(Vector v) {
        Vector final_v;
        Message m1, m2;
        NakAckHeader h1, h2;

        if(v == null) return;
        final_v=new Vector(v.size());

        for(int i=0; i < v.size(); i++) {
            boolean present=false;
            m1=(Message)v.elementAt(i);
            h1=m1 != null? (NakAckHeader)m1.getHeader(getName()) : null;
            if(m1 == null || h1 == null) {
                if(log.isErrorEnabled()) log.error("message is null");
                continue;
            }

            for(int j=0; j < final_v.size(); j++) {
                m2=(Message)final_v.elementAt(j);
                h2=m2 != null? (NakAckHeader)m2.getHeader(getName()) : null;
                if(m2 == null || h2 == null) {
                    if(log.isErrorEnabled()) log.error("message m2 is null");
                    continue;
                }
                if(h1.seqno == h2.seqno && m1.getSrc() != null && m2.getSrc() != null &&
                        m1.getSrc().equals(m2.getSrc())) {
                    present=true;
                    break;  // one match is enough
                }
            }
            if(!present)
                final_v.addElement(m1);
        }

        if(log.isWarnEnabled()) log.warn("rebroadcasting " + final_v.size() + " messages");

        for(int i=0; i < final_v.size(); i++) {
            m1=(Message)final_v.elementAt(i);
            naker.resend(m1);
        }

        naker.waitUntilAllAcksReceived(rebroadcast_timeout);
        passUp(new Event(Event.REBROADCAST_MSGS_OK));
    }


    /**
     * Drains {@code queued_msgs}. A message whose vid equals the current vid is fed back
     * through {@link #up}; every other message (older vid, newer vid, or no header/vid at all)
     * is dropped.
     */
    void deliverQueuedMessages() {
        NakAckHeader hdr;
        Message tmpmsg;

        while(queued_msgs.size() > 0) {
            tmpmsg=(Message)queued_msgs.removeFromHead();
            hdr=tmpmsg != null? (NakAckHeader)tmpmsg.getHeader(getName()) : null;
            if(hdr == null || hdr.vid == null) {
                if(log.isWarnEnabled()) log.warn("queued message has no header or vid; discarding message !");
                continue;
            }
            if(hdr.vid.compareTo(vid) == 0)
                up(new Event(Event.MSG, tmpmsg));
            // any other vid: the message is discarded
        }
    }


    /**
     * Reads {@code retransmit_timeout} (comma-delimited longs) and {@code rebroadcast_timeout}
     * (a long) and removes them from {@code props}.
     *
     * @return false if {@code rebroadcast_timeout} is not a valid long or if unrecognized
     *         properties remain, true otherwise
     */
    public boolean setProperties(Properties props) {
        String str;
        long[] tmp;

        super.setProperties(props);
        str=props.getProperty("retransmit_timeout");
        if(str != null) {
            tmp=Util.parseCommaDelimitedLongs(str);
            props.remove("retransmit_timeout");
            if(tmp != null && tmp.length > 0)
                retransmit_timeout=tmp;
        }

        str=props.getProperty("rebroadcast_timeout");
        if(str != null) {
            try {
                rebroadcast_timeout=Long.parseLong(str.trim());
            }
            catch(NumberFormatException e) {
                if(log.isErrorEnabled()) log.error("rebroadcast_timeout (" + str + ") is not a number");
                return false;
            }
            props.remove("rebroadcast_timeout");
        }

        if(props.size() > 0) {
            System.err.println("NAKACK.setProperties(): these properties are not recognized:");
            props.list(System.err);  // same stream as the heading above
            return false;
        }
        return true;
    }


    /**
     * Sends, stores, retransmits and receives sequence-numbered multicast messages.
     * Sent messages are kept in {@code sent_msgs} (keyed by seqno); received messages go through
     * one {@link NakReceiverWindow} per sender, kept in {@code received_msgs}.
     */
    class NAKer implements Retransmitter.RetransmitCommand, AckMcastSenderWindow.RetransmitCommand {
        /** Next seqno to hand out. */
        long seqno=0;
        /** Address -> NakReceiverWindow. */
        final Hashtable received_msgs=new Hashtable();
        /** Long (seqno) -> copy of the sent Message. */
        final Hashtable sent_msgs=new Hashtable();
        final AckMcastSenderWindow sender_win=new AckMcastSenderWindow(this, timer);
        /** When true, {@link #send} requests ACKs (NAK_ACK_MSG) instead of plain NAK_MSG. */
        boolean acking=false;
        /** Highest seqno passed to the last {@link #stable} call for the local member. */
        long deleted_up_to=0;

        final LastMessageRetransmitter last_msg_xmitter=new LastMessageRetransmitter();


        /**
         * Timer task that resends the most recently sent message a limited number of times,
         * as long as no newer message has been sent in between.
         */
        private class LastMessageRetransmitter implements TimeScheduler.Task {
            boolean stopped=false;
            /** Remaining attempts for the current last message. */
            int num_times=2;
            long last_xmitted_seqno=0;


            public void stop() {
                stopped=true;
            }

            public boolean cancelled() {
                return stopped;
            }

            public long nextInterval() {
                return retransmit_timeout[0];
            }


            /**
             * Resends message {@code seqno - 1} unless it has already been resent the allowed
             * number of times or has been deleted by {@code stable()}.
             */
            public void run() {
                synchronized(sent_msgs) {
                    long prevSeqno=seqno - 1;

                    if(prevSeqno == last_xmitted_seqno) {
                        if(log.isInfoEnabled()) log.info("prevSeqno=" + prevSeqno + ", last_xmitted_seqno=" +
                                last_xmitted_seqno + ", num_times=" + num_times);
                        if(--num_times <= 0)
                            return;
                    }
                    else {
                        // a newer message was sent since the last run: restart the countdown
                        num_times=3;
                        last_xmitted_seqno=prevSeqno;
                    }

                    if((prevSeqno >= 0) && (prevSeqno > deleted_up_to)) {
                        if(log.isInfoEnabled()) log.info("retransmitting last message " + prevSeqno);
                        retransmit(null, prevSeqno, prevSeqno);
                    }
                }
            }

        }


        NAKer() {
            if(timer != null)
                timer.add(last_msg_xmitter, true);
            else {
                if(log.isErrorEnabled()) log.error("timer is null");
            }
        }


        long getNextSeqno() {
            return seqno++;
        }


        /** Returns the highest key in {@code sent_msgs}, or -1 if it is empty. */
        long getHighestSeqnoSent() {
            long highest_sent=-1;
            for(Enumeration e=sent_msgs.keys(); e.hasMoreElements();)
                highest_sent=Math.max(highest_sent, ((Long)e.nextElement()).longValue());
            return highest_sent;
        }


        /**
         * Returns, for each member (in membership order), the receiver window's highest
         * delivered seqno, or -1 for members without a receiver window.
         *
         * @return the array, or null if the membership is not set
         */
        long[] getHighestSeqnosDelivered() {
            // Work on one snapshot of the membership so the array size and the iteration
            // bound cannot disagree when 'members' is replaced or modified concurrently.
            Vector mbrs=members;
            long[] highest_deliv;
            Address mbr;
            NakReceiverWindow win;

            if(mbrs == null) return null;

            synchronized(mbrs) {
                highest_deliv=new long[mbrs.size()];
                for(int i=0; i < highest_deliv.length; i++) {
                    mbr=(Address)mbrs.elementAt(i);
                    // Hashtable rejects null keys
                    win=mbr != null? (NakReceiverWindow)received_msgs.get(mbr) : null;
                    highest_deliv[i]=win != null? win.getHighestDelivered() : -1;
                }
            }
            return highest_deliv;
        }


        /** Returns the sent messages whose seqno is strictly greater than {@code seqno}. */
        List getSentMessagesHigherThan(long seqno) {
            List retval=new List();
            Long key;

            for(Enumeration e=sent_msgs.keys(); e.hasMoreElements();) {
                key=(Long)e.nextElement();
                if(key.longValue() > seqno)
                    retval.add(sent_msgs.get(key));
            }
            return retval;
        }


        /**
         * Builds a digest from the given per-member seqnos. For every member with a receiver
         * window, the digest entry is set to the window's highest received seqno and the
         * window's messages above {@code highest_seqnos[i]} are added. For the local member,
         * the entry is raised to the highest seqno sent (if greater) and the corresponding
         * sent messages are added.
         *
         * @param highest_seqnos one seqno per member, in membership order
         * @return the digest, or null if the input is null, the membership is not set, or the
         *         sizes differ
         */
        Digest computeMessageDigest(long[] highest_seqnos) {
            Digest digest=highest_seqnos != null? new Digest(highest_seqnos.length) : null;
            Address sender;
            NakReceiverWindow win;
            List unstable_msgs;
            int own_index;
            long highest_seqno_sent=-1, highest_seqno_received=-1;

            if(digest == null) {
                if(log.isWarnEnabled()) log.warn("highest_seqnos is null, cannot compute digest !");
                return null;
            }

            if(members == null || highest_seqnos.length != members.size()) {
                if(log.isWarnEnabled()) log.warn("the mbrship size and the size " +
                        "of the highest_seqnos array are not equal, cannot compute digest !");
                return null;
            }

            System.arraycopy(highest_seqnos, 0, digest.highest_seqnos, 0, digest.highest_seqnos.length);

            for(int i=0; i < highest_seqnos.length; i++) {
                sender=(Address)members.elementAt(i);
                if(sender == null) continue;
                win=(NakReceiverWindow)received_msgs.get(sender);
                if(win == null) continue;
                digest.highest_seqnos[i]=win.getHighestReceived();
                unstable_msgs=win.getMessagesHigherThan(highest_seqnos[i]);
                for(Enumeration e=unstable_msgs.elements(); e.hasMoreElements();)
                    digest.msgs.add(e.nextElement());
            }

            own_index=members.indexOf(local_addr);
            if(own_index == -1) {
                if(log.isWarnEnabled()) log.warn("no own address in highest_seqnos");
                return digest;
            }
            highest_seqno_received=digest.highest_seqnos[own_index];
            highest_seqno_sent=getHighestSeqnoSent();

            if(highest_seqno_sent > highest_seqno_received) {
                // we sent messages we have not (yet) received ourselves
                digest.highest_seqnos[own_index]=highest_seqno_sent;

                unstable_msgs=getSentMessagesHigherThan(highest_seqno_received);
                for(Enumeration e=unstable_msgs.elements(); e.hasMoreElements();)
                    digest.msgs.add(e.nextElement());
            }

            return digest;
        }


        /**
         * Collects, for every member i with a non-null {@code range[i]}, the messages the
         * member's receiver window returns for {@code [range[i][0], range[i][1]]}.
         * Entries beyond the current membership, or with fewer than two bounds, are skipped.
         *
         * @return the collected messages; empty if {@code range} or the membership is null
         */
        List getMessagesInRange(long[][] range) {
            List retval=new List();
            List tmp;
            NakReceiverWindow win;
            Address sender;

            if(range == null || members == null)
                return retval;

            for(int i=0; i < range.length && i < members.size(); i++) {
                if(range[i] != null && range[i].length >= 2) {
                    sender=(Address)members.elementAt(i);
                    if(sender == null) continue;
                    win=(NakReceiverWindow)received_msgs.get(sender);
                    if(win == null) continue;
                    tmp=win.getMessagesInRange(range[i][0], range[i][1]);
                    if(tmp == null || tmp.size() < 1) continue;
                    for(Enumeration e=tmp.elements(); e.hasMoreElements();)
                        retval.add(e.nextElement());
                }
            }
            return retval;
        }


        void setAcks(boolean f) {
            acking=f;
        }


        /**
         * Removes sent messages up to and including the local member's entry in
         * {@code seqnos}, and forwards each member's entry to its receiver window.
         *
         * @param seqnos one seqno per member, in membership order
         */
        void stable(long[] seqnos) {
            Vector mbrs=members;  // one snapshot for index lookup and iteration
            int index;
            long seqno;
            NakReceiverWindow recv_win;
            Address sender;

            if(mbrs == null || local_addr == null) {
                if(log.isWarnEnabled()) log.warn("members or local_addr are null !");
                return;
            }
            index=mbrs.indexOf(local_addr);

            if(index < 0) {
                if(log.isWarnEnabled()) log.warn("member " + local_addr + " not found in " + mbrs);
                return;
            }
            if(seqnos == null || index >= seqnos.length) {
                if(log.isWarnEnabled()) log.warn("seqnos is null or has no entry for " + local_addr);
                return;
            }
            seqno=seqnos[index];

            if(log.isInfoEnabled()) log.info("deleting stable messages [" +
                    deleted_up_to + " - " + seqno + ']');

            synchronized(sent_msgs) {
                for(long i=deleted_up_to; i <= seqno; i++) {
                    sent_msgs.remove(new Long(i));
                }
                deleted_up_to=seqno;
            }

            for(int i=0; i < mbrs.size() && i < seqnos.length; i++) {
                sender=(Address)mbrs.elementAt(i);
                if(sender == null) continue;  // Hashtable rejects null keys
                recv_win=(NakReceiverWindow)received_msgs.get(sender);
                if(recv_win != null)
                    recv_win.stable(seqnos[i]);
            }
        }


        /**
         * Tags {@code msg} with the next seqno and a copy of the current vid, stores a copy in
         * {@code sent_msgs} (and in {@code sender_win} when acking) and passes it down.
         * Does nothing when no view is installed.
         */
        void send(Message msg) {
            long id;
            ViewId vid_copy;

            // Check before consuming a seqno, so an unsent message cannot leave a gap.
            if(vid == null)
                return;
            id=getNextSeqno();
            vid_copy=(ViewId)vid.clone();

            if(acking) {
                msg.putHeader(getName(), new NakAckHeader(NakAckHeader.NAK_ACK_MSG, id, vid_copy));
                sender_win.add(id, msg.copy(), (Vector)members.clone());
            }
            else
                msg.putHeader(getName(), new NakAckHeader(NakAckHeader.NAK_MSG, id, vid_copy));

            if(log.isInfoEnabled()) log.info("sending msg #" + id);

            sent_msgs.put(new Long(id), msg.copy());
            passDown(new Event(Event.MSG, msg));
        }


        /**
         * Multicasts a copy of {@code msg} (which must carry a NAKACK header) with an added
         * WRAPPED_MSG header naming the local member as sender, and tracks it in
         * {@code sender_win} until acknowledged. Does nothing when no view is installed.
         */
        void resend(Message msg) {
            Message copy=msg.copy();
            NakAckHeader hdr=(NakAckHeader)copy.getHeader(getName());
            NakAckHeader wrapped_hdr;
            long id=hdr.seqno;

            if(vid == null) return;
            copy.setDest(null);  // null destination, same as the ACK-less multicast path
            wrapped_hdr=new NakAckHeader(NakAckHeader.WRAPPED_MSG, hdr.seqno, hdr.vid);
            wrapped_hdr.sender=local_addr;
            copy.putHeader(WRAPPED_MSG_KEY, wrapped_hdr);
            sender_win.add(id, copy.copy(), (Vector)members.clone());
            if(log.isInfoEnabled()) log.info("resending " + copy.getHeader(getName()));
            passDown(new Event(Event.MSG, copy));
        }


        void waitUntilAllAcksReceived(long timeout) {
            sender_win.waitUntilAllAcksReceived(timeout);
        }


        /**
         * Adds {@code msg} to the sender's receiver window (creating the window on first use)
         * and passes up every message the window hands back, with the NAKACK header removed.
         */
        void receive(long id, Message msg, Vector stable_msgs) {
            Address sender=msg.getSrc();
            NakReceiverWindow win=(NakReceiverWindow)received_msgs.get(sender);
            Message msg_to_deliver;

            if(win == null) {
                win=new NakReceiverWindow(sender, this, 0);
                win.setRetransmitTimeouts(retransmit_timeout);
                received_msgs.put(sender, win);
            }

            if(log.isInfoEnabled()) log.info("received <" + sender + '#' + id + '>');

            win.add(id, msg);
            while(true) {
                msg_to_deliver=win.remove();
                if(msg_to_deliver == null)
                    break;

                if(msg_to_deliver.getHeader(getName()) instanceof NakAckHeader)
                    msg_to_deliver.removeHeader(getName());
                passUp(new Event(Event.MSG, msg_to_deliver));
            }
        }


        void receiveAck(long id, Address sender) {
            if(log.isInfoEnabled()) log.info("received ack <-- ACK <" + sender + '#' + id + '>');
            sender_win.ack(id, sender);
        }


        /**
         * Implementation of {@code AckMcastSenderWindow.RetransmitCommand}: resends
         * {@code msg} to {@code dest}, unless {@code dest} is no longer a member, in which case
         * it is removed from {@code sender_win} instead.
         */
        public void retransmit(long seqno, Message msg, Address dest) {
            if(log.isInfoEnabled()) log.info("retransmitting message " + seqno + " to " + dest +
                    ", header is " + msg.getHeader(getName()));

            if(members != null) {
                if(!members.contains(dest)) {
                    if(log.isInfoEnabled()) log.info("retransmitting " + seqno + ") to " + dest + ": " + dest +
                            " is not a member; discarding retransmission and removing " +
                            dest + " from sender_win");
                    sender_win.remove(dest);
                    return;
                }
            }

            msg.setDest(dest);
            passDown(new Event(Event.MSG, msg));
        }


        /**
         * Implementation of {@code Retransmitter.RetransmitCommand}: sends a RETRANSMIT_MSG
         * request for {@code [first_seqno, last_seqno]} to {@code sender}. Does nothing when
         * no view is installed.
         */
        public void retransmit(long first_seqno, long last_seqno, Address sender) {
            ViewId current_vid=vid;

            if(log.isInfoEnabled()) log.info("retransmit([" + first_seqno + ", " + last_seqno +
                    "]) to " + sender + ", vid=" + current_vid);

            if(current_vid == null) {
                if(log.isWarnEnabled()) log.warn("vid is null, cannot send retransmit request");
                return;
            }

            NakAckHeader hdr=new NakAckHeader(NakAckHeader.RETRANSMIT_MSG, first_seqno, (ViewId)current_vid.clone());
            Message retransmit_msg=new Message(sender, null, null);

            hdr.last_seqno=last_seqno;
            retransmit_msg.putHeader(getName(), hdr);
            passDown(new Event(Event.MSG, retransmit_msg));
        }


        /**
         * Resends copies of the stored messages {@code [first_seqno, last_seqno]} to
         * {@code dest}; seqnos that are no longer stored are logged and skipped.
         */
        void retransmit(Address dest, long first_seqno, long last_seqno) {
            Message m, retr_msg;

            for(long i=first_seqno; i <= last_seqno; i++) {
                m=(Message)sent_msgs.get(new Long(i));
                if(m == null) {
                    if(log.isWarnEnabled()) log.warn("(to " + dest + "): message with " + "seqno=" + i + " not found !");
                    continue;
                }

                retr_msg=m.copy();
                retr_msg.setDest(dest);

                try {
                    passDown(new Event(Event.MSG, retr_msg));
                }
                catch(Exception e) {
                    // best effort: keep going with the remaining seqnos
                    if(log.isDebugEnabled()) log.debug("exception is " + e);
                }
            }
        }


        void stop() {
            if(sender_win != null)
                sender_win.stop();
        }


        /**
         * Clears all sent and received state and restarts seqnos at 0. The sender window is
         * only reset on non-coordinators.
         */
        void reset() {
            NakReceiverWindow win;

            if(!coordinator())
                sender_win.reset();

            sent_msgs.clear();
            for(Enumeration e=received_msgs.elements(); e.hasMoreElements();) {
                win=(NakReceiverWindow)e.nextElement();
                win.reset();
            }
            received_msgs.clear();
            seqno=0;
            deleted_up_to=0;
        }


        /** Drops the suspected member's receiver window and tells the sender window about it. */
        public void suspect(Address mbr) {
            NakReceiverWindow w;

            w=(NakReceiverWindow)received_msgs.get(mbr);
            if(w != null) {
                w.reset();
                received_msgs.remove(mbr);
            }

            sender_win.suspect(mbr);
        }


        String dumpContents() {
            StringBuffer ret=new StringBuffer();

            ret.append("\nsent_msgs: " + sent_msgs.size());

            ret.append("\nreceived_msgs: ");
            for(Enumeration e=received_msgs.keys(); e.hasMoreElements();) {
                Address key=(Address)e.nextElement();
                NakReceiverWindow w=(NakReceiverWindow)received_msgs.get(key);
                ret.append('\n' + w.toString());
            }

            ret.append("\nsender_win: " + sender_win.toString());

            return ret.toString();
        }

    }


    /**
     * Sends and receives out-of-band messages: every message is tracked in an
     * {@link AckMcastSenderWindow} until acknowledged, and every received message is
     * acknowledged and passed up only if the receiver window accepts it.
     */
    class OutOfBander implements AckMcastSenderWindow.RetransmitCommand {
        final AckMcastSenderWindow sender_win=new AckMcastSenderWindow(this, timer);
        final AckMcastReceiverWindow receiver_win=new AckMcastReceiverWindow();
        long seqno=0;


        /** Tags {@code msg} with an OUT_OF_BAND_MSG header, tracks a copy and passes it down. */
        void send(Message msg) {
            long id=seqno++;
            Vector stable_msgs=sender_win.getStableMessages();
            NakAckHeader hdr;

            if(log.isInfoEnabled()) log.info("sending msg #=" + id);

            hdr=new NakAckHeader(NakAckHeader.OUT_OF_BAND_MSG, id, null);
            hdr.stable_msgs=stable_msgs;  // piggyback the sender window's stable messages
            msg.putHeader(getName(), hdr);

            sender_win.add(id, msg.copy(), (Vector)members.clone());

            passDown(new Event(Event.MSG, msg));
        }


        /**
         * Passes {@code msg} up if the receiver window accepts {@code (sender, id)},
         * always acknowledges it, and removes the piggybacked stable messages from the
         * receiver window.
         */
        void receive(long id, Message msg, Vector stable_msgs) {
            Address sender=msg.getSrc();

            Message ack_msg=new Message(msg.getSrc(), null, null);
            NakAckHeader hdr=new NakAckHeader(NakAckHeader.OUT_OF_BAND_RSP, id, null);
            ack_msg.putHeader(getName(), hdr);

            if(log.isInfoEnabled()) log.info("received <" + sender + '#' + id + ">\n");

            if(receiver_win.add(sender, id))
                passUp(new Event(Event.MSG, msg));

            passDown(new Event(Event.MSG, ack_msg));
            if(log.isInfoEnabled()) log.info("sending ack <" + sender + '#' + id + ">\n");

            if(stable_msgs != null)
                receiver_win.remove(sender, stable_msgs);
        }


        void receiveAck(long id, Address sender) {
            if(log.isInfoEnabled()) log.info("received ack <" + sender + '#' + id + '>');
            sender_win.ack(id, sender);
        }


        /** Implementation of {@code AckMcastSenderWindow.RetransmitCommand}: resends {@code msg} to {@code dest}. */
        public void retransmit(long seqno, Message msg, Address dest) {
            if(log.isInfoEnabled()) log.info("dest=" + dest + ", msg #" + seqno);
            msg.setDest(dest);
            passDown(new Event(Event.MSG, msg));
        }


        void reset() {
            sender_win.reset();
            receiver_win.reset();
        }


        void suspect(Address mbr) {
            sender_win.suspect(mbr);
            receiver_win.suspect(mbr);
        }


        void start() {
            sender_win.start();
        }

        void stop() {
            if(sender_win != null)
                sender_win.stop();
        }


        String dumpContents() {
            StringBuffer ret=new StringBuffer();
            ret.append("\nsender_win:\n" + sender_win.toString() +
                    "\nreceiver_win:\n" + receiver_win.toString());
            return ret.toString();
        }

    }

}