1 3 package org.jgroups.protocols.pbcast; 4 5 import org.jgroups.*; 6 import org.jgroups.stack.NakReceiverWindow; 7 import org.jgroups.stack.Protocol; 8 import org.jgroups.stack.Retransmitter; 9 import org.jgroups.util.*; 10 11 import java.util.*; 12 import java.io.*; 13 14 15 31 public class NAKACK extends Protocol implements Retransmitter.RetransmitCommand { 32 private long[] retransmit_timeout={600, 1200, 2400, 4800}; private boolean is_server=false; 34 private Address local_addr=null; 35 private final Vector members=new Vector(11); 36 private long seqno=0; private long max_xmit_size=8192; private int gc_lag=20; 40 44 private boolean use_mcast_xmit=false; 45 46 47 54 private boolean discard_delivered_msgs=false; 55 56 59 private int max_xmit_buf_size=0; 60 61 62 66 private final HashMap received_msgs=new HashMap(11); 67 68 69 private final TreeMap sent_msgs=new TreeMap(); 70 71 private boolean leaving=false; 72 private TimeScheduler timer=null; 73 static final String name="NAKACK"; 74 75 76 165 166 167 168 169 public NAKACK() { 170 } 171 172 173 public String getName() { 174 return name; 175 } 176 177 178 public Vector providedUpServices() { 179 Vector retval=new Vector(5); 180 retval.addElement(new Integer (Event.GET_DIGEST)); 181 retval.addElement(new Integer (Event.GET_DIGEST_STABLE)); 182 retval.addElement(new Integer (Event.GET_DIGEST_STATE)); 183 retval.addElement(new Integer (Event.SET_DIGEST)); 184 retval.addElement(new Integer (Event.MERGE_DIGEST)); 185 return retval; 186 } 187 188 189 public Vector providedDownServices() { 190 Vector retval=new Vector(2); 191 retval.addElement(new Integer (Event.GET_DIGEST)); 192 retval.addElement(new Integer (Event.GET_DIGEST_STABLE)); 193 return retval; 194 } 195 196 197 public void start() throws Exception { 198 timer=stack != null ? stack.timer : null; 199 if(timer == null) { 200 throw new Exception ("NAKACK.up(): timer is null"); 201 } 202 } 203 204 public void stop() { 205 removeAll(); } 207 208 209 213 public void down(Event evt) { 214 Digest digest; 215 Vector mbrs; 216 217 switch(evt.getType()) { 218 219 case Event.MSG: 220 Message msg=(Message)evt.getArg(); 221 Address dest=msg.getDest(); 222 if(dest != null && !dest.isMulticastAddress()) { 223 break; } 225 send(evt, msg); 226 return; 228 case Event.STABLE: stable((Digest)evt.getArg()); 230 return; 232 case Event.GET_DIGEST: 233 digest=getDigest(); 234 passUp(new Event(Event.GET_DIGEST_OK, digest != null ? digest.copy() : null)); 235 return; 236 237 case Event.GET_DIGEST_STABLE: 238 digest=getDigestHighestDeliveredMsgs(); 239 passUp(new Event(Event.GET_DIGEST_STABLE_OK, digest != null ? digest.copy() : null)); 240 return; 241 242 case Event.GET_DIGEST_STATE: 243 digest=getDigest(); 244 passUp(new Event(Event.GET_DIGEST_STATE_OK, digest != null ? digest.copy() : null)); 245 return; 246 247 case Event.SET_DIGEST: 248 setDigest((Digest)evt.getArg()); 249 return; 250 251 case Event.MERGE_DIGEST: 252 mergeDigest((Digest)evt.getArg()); 253 return; 254 255 case Event.CONFIG: 256 passDown(evt); 257 if(log.isDebugEnabled()) { 258 log.debug("received CONFIG event: " + evt.getArg()); 259 } 260 handleConfigEvent((HashMap)evt.getArg()); 261 return; 262 263 case Event.TMP_VIEW: 264 mbrs=((View)evt.getArg()).getMembers(); 265 members.removeAllElements(); 266 members.addAll(mbrs); 267 adjustReceivers(); 268 break; 269 270 case Event.VIEW_CHANGE: 271 mbrs=((View)evt.getArg()).getMembers(); 272 members.removeAllElements(); 273 members.addAll(mbrs); 274 adjustReceivers(); 275 is_server=true; break; 277 278 case Event.BECOME_SERVER: 279 is_server=true; 280 break; 281 282 case Event.DISCONNECT: 283 leaving=true; 284 removeAll(); 285 seqno=0; 286 break; 287 } 288 289 passDown(evt); 290 } 291 292 293 294 298 public void up(Event evt) { 299 NakAckHeader hdr; 300 Message msg; 301 Digest digest; 302 303 switch(evt.getType()) { 304 305 case Event.STABLE: stable((Digest)evt.getArg()); 307 return; 309 case Event.GET_DIGEST: 310 digest=getDigestHighestDeliveredMsgs(); 311 passDown(new Event(Event.GET_DIGEST_OK, digest)); 312 return; 313 314 case Event.GET_DIGEST_STABLE: 315 digest=getDigestHighestDeliveredMsgs(); 316 passDown(new Event(Event.GET_DIGEST_STABLE_OK, digest)); 317 return; 318 319 case Event.SET_LOCAL_ADDRESS: 320 local_addr=(Address)evt.getArg(); 321 break; 322 323 case Event.CONFIG: 324 passUp(evt); 325 if(log.isDebugEnabled()) { 326 log.debug("received CONFIG event: " + evt.getArg()); 327 } 328 handleConfigEvent((HashMap)evt.getArg()); 329 return; 330 331 case Event.MSG: 332 msg=(Message)evt.getArg(); 333 hdr=(NakAckHeader)msg.getHeader(name); 334 if(hdr == null) 335 break; 337 if(!is_server) { 339 if(log.isTraceEnabled()) 340 log.trace("message was discarded (not yet server)"); 341 return; 342 } 343 344 348 switch(hdr.type) { 349 350 case NakAckHeader.MSG: 351 handleMessage(msg, hdr); 352 return; 354 case NakAckHeader.XMIT_REQ: 355 if(hdr.range == null) { 356 if(log.isErrorEnabled()) { 357 log.error("XMIT_REQ: range of xmit msg is null; discarding request from " + msg.getSrc()); 358 } 359 return; 360 } 361 handleXmitReq(msg.getSrc(), hdr.range.low, hdr.range.high); 362 return; 363 364 case NakAckHeader.XMIT_RSP: 365 if(log.isTraceEnabled()) 366 log.trace("received missing messages " + hdr.range); 367 handleXmitRsp(msg); 368 return; 369 370 default: 371 if(log.isErrorEnabled()) { 372 log.error("NakAck header type " + hdr.type + " not known !"); 373 } 374 return; 375 } 376 } 377 passUp(evt); 378 } 379 380 381 public boolean setProperties(Properties props) { 382 String str; 383 long[] tmp; 384 385 super.setProperties(props); 386 str=props.getProperty("retransmit_timeout"); 387 if(str != null) { 388 tmp=Util.parseCommaDelimitedLongs(str); 389 props.remove("retransmit_timeout"); 390 if(tmp != null && tmp.length > 0) { 391 retransmit_timeout=tmp; 392 } 393 } 394 395 str=props.getProperty("gc_lag"); 396 if(str != null) { 397 gc_lag=Integer.parseInt(str); 398 if(gc_lag < 1) { 399 System.err.println("NAKACK.setProperties(): gc_lag has to be at least 1"); 400 return false; 401 } 402 props.remove("gc_lag"); 403 } 404 405 str=props.getProperty("max_xmit_size"); 406 if(str != null) { 407 max_xmit_size=Long.parseLong(str); 408 props.remove("max_xmit_size"); 409 } 410 411 str=props.getProperty("use_mcast_xmit"); 412 if(str != null) { 413 use_mcast_xmit=Boolean.valueOf(str).booleanValue(); 414 props.remove("use_mcast_xmit"); 415 } 416 417 str=props.getProperty("discard_delivered_msgs"); 418 if(str != null) { 419 discard_delivered_msgs=Boolean.valueOf(str).booleanValue(); 420 props.remove("discard_delivered_msgs"); 421 } 422 423 str=props.getProperty("max_xmit_buf_size"); 424 if(str != null) { 425 max_xmit_buf_size=Integer.parseInt(str); 426 props.remove("max_xmit_buf_size"); 427 } 428 429 if(props.size() > 0) { 430 System.err.println("NAKACK.setProperties(): these properties are not recognized:"); 431 props.list(System.out); 432 return false; 433 } 434 return true; 435 } 436 437 438 439 440 441 442 long getNextSeqno() { 443 return seqno++; } 445 446 447 452 private final void send(Event evt, Message msg) { 453 long msg_id=getNextSeqno(); 454 if(log.isTraceEnabled()) 455 log.trace("sending msg #" + msg_id); 456 457 msg.putHeader(name, new NakAckHeader(NakAckHeader.MSG, msg_id)); 458 synchronized(sent_msgs) { 459 if(Global.copy) { 460 sent_msgs.put(new Long (msg_id), msg.copy()); 461 } 462 else { 463 sent_msgs.put(new Long (msg_id), msg); 464 } 465 } 466 passDown(evt); 467 } 468 469 470 474 void handleMessage(Message msg, NakAckHeader hdr) { 475 NakReceiverWindow win; 476 Message msg_to_deliver; 477 Address sender=msg.getSrc(); 478 479 if(sender == null) { 480 if(log.isErrorEnabled()) 481 log.error("sender of message is null"); 482 return; 483 } 484 485 if(log.isTraceEnabled()) { 486 StringBuffer sb=new StringBuffer ('['); 487 sb.append(local_addr).append("] received ").append(sender).append('#').append(hdr.seqno); 488 log.trace(sb.toString()); 489 } 490 491 493 496 synchronized(received_msgs) { 497 win=(NakReceiverWindow)received_msgs.get(sender); 498 } 499 if(win == null) { if(leaving) 501 return; 502 if(log.isWarnEnabled()) { 503 StringBuffer sb=new StringBuffer ('['); 504 sb.append(local_addr).append("] discarded message from non-member ").append(sender); 505 if(log.isWarnEnabled()) 506 log.warn(sb.toString()); 507 } 508 return; 509 } 510 win.add(hdr.seqno, msg); 512 while((msg_to_deliver=win.remove()) != null) { 513 514 passUp(new Event(Event.MSG, msg_to_deliver)); 517 } 518 } 519 520 521 532 void handleXmitReq(Address dest, long first_seqno, long last_seqno) { 533 Message m, tmp; 534 LinkedList list; 535 long size=0, marker=first_seqno, len; 536 537 if(log.isTraceEnabled()) 538 log.trace(local_addr + ": received xmit request for " + dest + " [" + first_seqno + " - " + last_seqno + ']'); 539 540 if(first_seqno > last_seqno) { 541 if(log.isErrorEnabled()) 542 log.error("first_seqno (" + first_seqno + ") > last_seqno (" + last_seqno + "): not able to retransmit"); 543 return; 544 } 545 list=new LinkedList(); 546 for(long i=first_seqno; i <= last_seqno; i++) { 547 m=(Message)sent_msgs.get(new Long (i)); if(m == null) { 549 if(log.isErrorEnabled()) { 550 log.error("(requester=" + dest + ", local_addr=" + this.local_addr + ") message with " + 551 "seqno=" + i + " not found in sent_msgs ! sent_msgs=" + printSentMsgs()); 552 } 553 continue; 554 } 555 len=m.size(); 556 size+=len; 557 if(size > max_xmit_size && list.size() > 0) { 560 if(log.isTraceEnabled()) 562 log.trace("xmitting msgs [" + marker + '-' + (i - 1) + "] to " + dest); 563 sendXmitRsp(dest, (LinkedList)list.clone(), marker, i - 1); 564 marker=i; 565 list.clear(); 566 size=len; 569 } 570 if(Global.copy) { 571 tmp=m.copy(); 572 } 573 else { 574 tmp=m; 575 } 576 tmp.setDest(dest); 577 tmp.setSrc(local_addr); 578 list.add(tmp); 579 } 580 581 if(list.size() > 0) { 582 if(log.isTraceEnabled()) 583 log.trace("xmitting msgs [" + marker + '-' + last_seqno + "] to " + dest); 584 sendXmitRsp(dest, (LinkedList)list.clone(), marker, last_seqno); 585 list.clear(); 586 } 587 } 588 589 590 void sendXmitRsp(Address dest, LinkedList xmit_list, long first_seqno, long last_seqno) { 591 Buffer buf; 592 if(xmit_list == null || xmit_list.size() == 0) { 593 if(log.isErrorEnabled()) 594 log.error("xmit_list is empty"); 595 return; 596 } 597 if(use_mcast_xmit) 598 dest=null; 599 600 try { 601 buf=Util.msgListToByteBuffer(xmit_list); 602 Message msg=new Message(dest, null, buf.getBuf(), buf.getOffset(), buf.getLength()); 603 msg.putHeader(name, new NakAckHeader(NakAckHeader.XMIT_RSP, first_seqno, last_seqno)); 604 passDown(new Event(Event.MSG, msg)); 605 } 606 catch(IOException ex) { 607 log.error("failed marshalling xmit list", ex); 608 } 609 } 610 611 612 613 614 void handleXmitRsp(Message msg) { 615 LinkedList list; 616 Message m; 617 618 if(msg == null) { 619 if(log.isWarnEnabled()) 620 log.warn("message is null"); 621 return; 622 } 623 try { 624 list=Util.byteBufferToMessageList(msg.getRawBuffer(), msg.getOffset(), msg.getLength()); 625 if(list != null) { 626 for(Iterator it=list.iterator(); it.hasNext();) { 627 m=(Message)it.next(); 628 up(new Event(Event.MSG, m)); 629 } 630 list.clear(); 631 } 632 } 633 catch(Exception ex) { 634 if(log.isErrorEnabled()) { 635 log.error("message did not contain a list (LinkedList) of retransmitted messages: " + ex); 636 } 637 } 638 } 639 640 641 642 643 647 void adjustReceivers() { 648 Address sender; 649 NakReceiverWindow win; 650 651 synchronized(received_msgs) { 652 653 for(Iterator it=received_msgs.keySet().iterator(); it.hasNext();) { 655 sender=(Address)it.next(); 656 if(!members.contains(sender)) { 657 win=(NakReceiverWindow)received_msgs.get(sender); 658 win.reset(); 659 if(log.isDebugEnabled()) { 660 log.debug("removing " + sender + " from received_msgs (not member anymore)"); 661 } 662 it.remove(); 663 } 664 } 665 666 for(int i=0; i < members.size(); i++) { 668 sender=(Address)members.elementAt(i); 669 if(!received_msgs.containsKey(sender)) { 670 win=new NakReceiverWindow(sender, this, 0, timer); 671 win.setRetransmitTimeouts(retransmit_timeout); 672 win.setDiscardDeliveredMessages(discard_delivered_msgs); 673 win.setMaxXmitBufSize(this.max_xmit_buf_size); 674 received_msgs.put(sender, win); 675 } 676 } 677 } 678 } 679 680 681 684 Digest getDigest() { 685 Digest digest; 686 Address sender; 687 Range range; 688 689 digest=new Digest(members.size()); 690 for(int i=0; i < members.size(); i++) { 691 sender=(Address)members.elementAt(i); 692 range=getLowestAndHighestSeqno(sender, false); if(range == null) { 694 if(log.isErrorEnabled()) { 695 log.error("range is null"); 696 } 697 continue; 698 } 699 digest.add(sender, range.low, range.high); } 701 return digest; 702 } 703 704 705 711 Digest getDigestHighestDeliveredMsgs() { 712 Digest digest; 713 Address sender; 714 Range range; 715 long high_seqno_seen=0; 716 717 digest=new Digest(members.size()); 718 for(int i=0; i < members.size(); i++) { 719 sender=(Address)members.elementAt(i); 720 range=getLowestAndHighestSeqno(sender, true); if(range == null) { 722 if(log.isErrorEnabled()) { 723 log.error("range is null"); 724 } 725 continue; 726 } 727 high_seqno_seen=getHighSeqnoSeen(sender); 728 digest.add(sender, range.low, range.high, high_seqno_seen); } 730 return digest; 731 } 732 733 734 738 void setDigest(Digest d) { 739 Address sender; 740 NakReceiverWindow win; 741 long initial_seqno; 742 743 clear(); 744 if(d == null || d.senders == null) { 745 if(log.isErrorEnabled()) { 746 log.error("digest or digest.senders is null"); 747 } 748 return; 749 } 750 for(int i=0; i < d.size(); i++) { 751 sender=d.senderAt(i); 752 if(sender == null) { 753 if(log.isErrorEnabled()) { 754 log.error("sender at index " + i + " in digest is null"); 755 } 756 continue; 757 } 758 initial_seqno=d.highSeqnoAt(i); 759 win=new NakReceiverWindow(sender, this, initial_seqno, timer); 760 win.setRetransmitTimeouts(retransmit_timeout); 761 win.setDiscardDeliveredMessages(discard_delivered_msgs); 762 win.setMaxXmitBufSize(this.max_xmit_buf_size); 763 synchronized(received_msgs) { 764 received_msgs.put(sender, win); 765 } 766 } 767 } 768 769 770 775 void mergeDigest(Digest d) { 776 Address sender; 777 NakReceiverWindow win; 778 long initial_seqno; 779 780 if(d == null || d.senders == null) { 781 if(log.isErrorEnabled()) { 782 log.error("digest or digest.senders is null"); 783 } 784 return; 785 } 786 for(int i=0; i < d.size(); i++) { 787 sender=d.senderAt(i); 788 if(sender == null) { 789 if(log.isErrorEnabled()) { 790 log.error("sender at index " + i + " in digest is null"); 791 } 792 continue; 793 } 794 initial_seqno=d.highSeqnoAt(i); 795 synchronized(received_msgs) { 796 win=(NakReceiverWindow)received_msgs.get(sender); 797 if(win == null) { 798 win=new NakReceiverWindow(sender, this, initial_seqno, timer); 799 win.setRetransmitTimeouts(retransmit_timeout); 800 win.setDiscardDeliveredMessages(discard_delivered_msgs); 801 win.setMaxXmitBufSize(this.max_xmit_buf_size); 802 received_msgs.put(sender, win); 803 } 804 else { 805 if(win.getHighestReceived() < initial_seqno) { 806 win.reset(); 807 received_msgs.remove(sender); 808 win=new NakReceiverWindow(sender, this, initial_seqno, timer); 809 win.setRetransmitTimeouts(retransmit_timeout); 810 win.setDiscardDeliveredMessages(discard_delivered_msgs); 811 win.setMaxXmitBufSize(this.max_xmit_buf_size); 812 received_msgs.put(sender, win); 813 } 814 } 815 } 816 } 817 } 818 819 820 828 Range getLowestAndHighestSeqno(Address sender, boolean stop_at_gaps) { 829 Range r=null; 830 NakReceiverWindow win; 831 832 if(sender == null) { 833 if(log.isErrorEnabled()) { 834 log.error("sender is null"); 835 } 836 return r; 837 } 838 synchronized(received_msgs) { 839 win=(NakReceiverWindow)received_msgs.get(sender); 840 } 841 if(win == null) { 842 if(log.isErrorEnabled()) { 843 log.error("sender " + sender + " not found in received_msgs"); 844 } 845 return r; 846 } 847 if(stop_at_gaps) { 848 r=new Range(win.getLowestSeen(), win.getHighestSeen()); } 850 else { 851 r=new Range(win.getLowestSeen(), win.getHighestReceived() + 1); } 853 return r; 854 } 855 856 857 862 long getHighSeqnoSeen(Address sender) { 863 NakReceiverWindow win; 864 long ret=0; 865 866 if(sender == null) { 867 if(log.isErrorEnabled()) { 868 log.error("sender is null"); 869 } 870 return ret; 871 } 872 if(sender.equals(local_addr)) { 873 return seqno - 1; 874 } 875 876 synchronized(received_msgs) { 877 win=(NakReceiverWindow)received_msgs.get(sender); 878 } 879 if(win == null) { 880 if(log.isErrorEnabled()) { 881 log.error("sender " + sender + " not found in received_msgs"); 882 } 883 return ret; 884 } 885 ret=win.getHighestReceived(); 886 return ret; 887 } 888 889 890 896 void stable(Digest d) { 897 long tmp_seqno; 898 NakReceiverWindow recv_win; 899 Address sender; 900 long my_highest_rcvd; long stability_highest_rcvd; 903 if(members == null || local_addr == null || d == null) { 904 if(log.isWarnEnabled()) 905 log.warn("members, local_addr or digest are null !"); 906 return; 907 } 908 909 if(log.isDebugEnabled()) { 910 log.debug("received digest " + d); 911 } 912 913 for(int i=0; i < d.size(); i++) { 914 sender=d.senderAt(i); 915 tmp_seqno=d.highSeqnoAt(i); 916 if(sender == null) 917 continue; 918 919 synchronized(received_msgs) { 923 recv_win=(NakReceiverWindow)received_msgs.get(sender); 924 } 925 if(recv_win != null) { 926 my_highest_rcvd=recv_win.getHighestReceived(); 927 stability_highest_rcvd=d.highSeqnoSeenAt(i); 928 929 if(stability_highest_rcvd >= 0 && stability_highest_rcvd > my_highest_rcvd) { 930 if(log.isTraceEnabled()) { 931 log.trace("my_highest_rcvd (" + my_highest_rcvd + ") < stability_highest_rcvd (" + 932 stability_highest_rcvd + "): requesting retransmission of " + 933 sender + '#' + stability_highest_rcvd); 934 } 935 retransmit(stability_highest_rcvd, stability_highest_rcvd, sender); 936 } 937 } 938 939 tmp_seqno-=gc_lag; 940 if(tmp_seqno < 0) { 941 continue; 942 } 943 944 if(log.isTraceEnabled()) 945 log.trace("deleting msgs <= " + tmp_seqno + " from " + sender); 946 947 if(sender.equals(local_addr)) { 949 synchronized(sent_msgs) { 950 SortedMap stable_keys=sent_msgs.headMap(new Long (tmp_seqno)); 952 if(stable_keys != null) { 953 stable_keys.clear(); } 955 } 956 } 957 958 if(recv_win != null) 961 recv_win.stable(tmp_seqno); } 963 } 964 965 966 967 968 969 970 974 public void retransmit(long first_seqno, long last_seqno, Address sender) { 975 NakAckHeader hdr; 976 Message retransmit_msg=new Message(sender, null, null); 977 978 if(log.isTraceEnabled()) 979 log.trace(local_addr + ": sending XMIT_REQ ([" + first_seqno + ", " + last_seqno + "]) to " + sender); 980 983 987 hdr=new NakAckHeader(NakAckHeader.XMIT_REQ, first_seqno, last_seqno); 988 retransmit_msg.putHeader(name, hdr); 989 passDown(new Event(Event.MSG, retransmit_msg)); 990 } 991 992 993 994 995 996 997 void clear() { 998 NakReceiverWindow win; 999 1000 1004 1006 synchronized(received_msgs) { 1007 for(Iterator it=received_msgs.values().iterator(); it.hasNext();) { 1008 win=(NakReceiverWindow)it.next(); 1009 win.reset(); 1010 } 1011 received_msgs.clear(); 1012 } 1013 } 1014 1015 1016 void removeAll() { 1017 NakReceiverWindow win; 1018 1019 1026 synchronized(sent_msgs) { 1027 sent_msgs.clear(); 1028 } 1029 1030 synchronized(received_msgs) { 1031 for(Iterator it=received_msgs.values().iterator(); it.hasNext();) { 1032 win=(NakReceiverWindow)it.next(); 1033 win.destroy(); 1034 } 1035 received_msgs.clear(); 1036 } 1037 } 1038 1039 1040 1059 1060 1061 String printSentMsgs() { 1062 StringBuffer sb=new StringBuffer (); 1063 Long min_seqno, max_seqno; 1064 synchronized(sent_msgs) { 1065 min_seqno=sent_msgs.size() > 0 ? (Long )sent_msgs.firstKey() : new Long (0); 1066 max_seqno=sent_msgs.size() > 0 ? (Long )sent_msgs.lastKey() : new Long (0); 1067 } 1068 sb.append('[').append(min_seqno).append(" - ").append(max_seqno).append(']'); 1069 return sb.toString(); 1070 } 1071 1072 1073 void handleConfigEvent(HashMap map) { 1074 if(map == null) { 1075 return; 1076 } 1077 if(map.containsKey("frag_size")) { 1078 max_xmit_size=((Integer )map.get("frag_size")).intValue(); 1079 if(log.isInfoEnabled()) { 1080 log.info("max_xmit_size=" + max_xmit_size); 1081 } 1082 } 1083 } 1084 1085 1086 1087 1088} 1089 | Popular Tags |