1 package org.jgroups.protocols; 3 4 5 import EDU.oswego.cs.dl.util.concurrent.ReadWriteLock; 6 import EDU.oswego.cs.dl.util.concurrent.WriterPreferenceReadWriteLock; 7 import org.jgroups.Address; 8 import org.jgroups.Event; 9 import org.jgroups.Message; 10 import org.jgroups.View; 11 import org.jgroups.stack.AckSenderWindow; 12 import org.jgroups.stack.Protocol; 13 import org.jgroups.util.TimeScheduler; 14 15 import java.io.IOException ; 16 import java.io.ObjectInput ; 17 import java.io.ObjectOutput ; 18 import java.util.*; 19 20 21 65 public class TOTAL extends Protocol { 66 70 public static class Header extends org.jgroups.Header { 71 75 public static final int NULL_TYPE=-1; 76 79 public static final int REQ=0; 80 83 public static final int REP=1; 84 87 public static final int UCAST=2; 88 91 public static final int BCAST=3; 92 93 96 public int type; 97 101 public long localSequenceID; 102 105 public long sequenceID; 106 107 110 public Header() { 111 } 112 113 123 public Header(int type, long localSeqID, long seqID) { 124 super(); 125 switch(type) { 126 case REQ: 127 case REP: 128 case UCAST: 129 case BCAST: 130 this.type=type; 131 break; 132 default: 133 this.type=NULL_TYPE; 134 throw new IllegalArgumentException ("type"); 135 } 136 this.localSequenceID=localSeqID; 137 this.sequenceID=seqID; 138 } 139 140 143 public String toString() { 144 StringBuffer buffer=new StringBuffer (); 145 String typeName; 146 buffer.append("[TOTAL.Header"); 147 switch(type) { 148 case REQ: 149 typeName="REQ"; 150 break; 151 case REP: 152 typeName="REP"; 153 break; 154 case UCAST: 155 typeName="UCAST"; 156 break; 157 case BCAST: 158 typeName="BCAST"; 159 break; 160 case NULL_TYPE: 161 typeName="NULL_TYPE"; 162 break; 163 default: 164 typeName=""; 165 break; 166 } 167 buffer.append(", type=" + typeName); 168 buffer.append(", " + "localID=" + localSequenceID); 169 buffer.append(", " + "seqID=" + sequenceID); 170 buffer.append(']'); 171 172 return (buffer.toString()); 173 } 174 175 178 public void writeExternal(ObjectOutput out) throws IOException { 179 out.writeInt(type); 180 out.writeLong(localSequenceID); 181 out.writeLong(sequenceID); 182 } 183 184 187 public void readExternal(ObjectInput in) throws IOException , 188 ClassNotFoundException { 189 type=in.readInt(); 190 localSequenceID=in.readLong(); 191 sequenceID=in.readLong(); 192 } 193 } 194 195 196 200 private class Command implements AckSenderWindow.RetransmitCommand { 201 public Command() { 202 } 203 204 public void retransmit(long seqNo, Message msg) { 205 _retransmitBcastRequest(seqNo); 206 } 207 } 208 209 210 213 private static final String PROT_NAME="TOTAL"; 214 217 private static final String TRACE_PROP="trace"; 218 219 222 private final long[] AVG_RETRANSMIT_INTERVAL=new long[]{1000, 2000, 3000, 4000}; 223 224 227 private static final long NULL_ID=-1; 228 232 private static final int NULL_STATE=-1; 233 236 private static final int RUN=0; 237 241 private static final int FLUSH=1; 242 245 private static final int BLOCK=2; 246 247 248 251 private final ReadWriteLock stateLock=new WriterPreferenceReadWriteLock(); 252 255 private int state=NULL_STATE; 256 259 private Address addr=null; 260 263 private Address sequencerAddr=null; 264 268 private long sequencerSeqID=NULL_ID; 269 275 private long localSeqID=NULL_ID; 276 281 private long seqID=NULL_ID; 282 288 private SortedMap reqTbl; 289 296 private SortedMap upTbl; 297 300 private AckSenderWindow retransmitter; 301 302 303 306 private String _addrToString(Object addr) { 307 return ( 308 addr == null ? "<null>" : 309 ((addr instanceof org.jgroups.stack.IpAddress) ? 310 (((org.jgroups.stack.IpAddress)addr).getIpAddress( 311 ).getHostAddress() + ':' + 312 ((org.jgroups.stack.IpAddress)addr).getPort()) : 313 addr.toString()) 314 ); 315 } 316 317 318 321 private String _getName() { 322 return (PROT_NAME); 323 } 324 325 332 private boolean _setProperties(Properties properties) { 333 String value; 334 335 value=properties.getProperty(TRACE_PROP); 338 if(value != null) properties.remove(TRACE_PROP); 339 if(properties.size() > 0) { 340 if(log.isErrorEnabled()) 341 log.error("The following properties are not " + 342 "recognized: " + properties.toString()); 343 return (false); 344 } 345 return (true); 346 } 347 348 354 Vector _requiredDownServices() { 355 Vector services=new Vector(); 356 357 return (services); 358 } 359 360 366 Vector _requiredUpServices() { 367 Vector services=new Vector(); 368 369 return (services); 370 } 371 372 373 377 private void _deliverBcast() { 378 Message msg; 379 Header header; 380 381 synchronized(upTbl) { 382 while((msg=(Message)upTbl.remove(new Long (seqID + 1))) != null) { 383 header=(Header)msg.removeHeader(getName()); 384 if(header.localSequenceID != NULL_ID) passUp(new Event(Event.MSG, msg)); 385 ++seqID; 386 } 387 } } 389 390 391 395 private void _replayBcast() { 396 Iterator it; 397 Message msg; 398 Header header; 399 400 403 synchronized(upTbl) { 404 if(upTbl.size() > 0) 405 if(log.isInfoEnabled()) log.info("Replaying undelivered bcasts"); 406 407 it=upTbl.entrySet().iterator(); 408 while(it.hasNext()) { 409 msg=(Message)((Map.Entry)it.next()).getValue(); 410 it.remove(); 411 if(!msg.getSrc().equals(addr)) { 412 if(log.isInfoEnabled()) 413 log.info("During replay: " + 414 "discarding BCAST[" + 415 ((TOTAL.Header)msg.getHeader(getName())).sequenceID + 416 "] from " + _addrToString(msg.getSrc())); 417 continue; 418 } 419 header=(Header)msg.removeHeader(getName()); 420 if(header.localSequenceID == NULL_ID) continue; 421 _sendBcastRequest(msg, header.localSequenceID); 422 } 423 } } 425 426 427 433 private Message _sendUcast(Message msg) { 434 msg.putHeader(getName(), new Header(Header.UCAST, NULL_ID, NULL_ID)); 435 return (msg); 436 } 437 438 439 447 private void _sendBcastRequest(Message msg) { 448 _sendBcastRequest(msg, ++localSeqID); 449 } 450 451 452 460 private void _sendBcastRequest(Message msg, long id) { 461 462 synchronized(reqTbl) { 466 reqTbl.put(new Long (id), msg); 467 } 468 _transmitBcastRequest(id); 469 retransmitter.add(id, msg); 470 } 471 472 473 478 private void _transmitBcastRequest(long seqID) { 479 Message reqMsg; 480 481 487 if(state == NULL_STATE) { 488 if(log.isInfoEnabled()) log.info("Transmit BCAST_REQ[" + seqID + "] in NULL_STATE"); 489 return; 490 } 491 if(state == BLOCK) return; 492 493 synchronized(reqTbl) { 494 if(!reqTbl.containsKey(new Long (seqID))) { 495 retransmitter.ack(seqID); 496 return; 497 } 498 } 499 reqMsg=new Message(sequencerAddr, addr, new byte[0]); 500 reqMsg.putHeader(getName(), new Header(Header.REQ, seqID, NULL_ID)); 501 502 passDown(new Event(Event.MSG, reqMsg)); 503 } 504 505 506 511 private void _recvUcast(Message msg) { 512 msg.removeHeader(getName()); 513 } 514 515 521 private void _recvBcast(Message msg) { 522 Header header=(Header)msg.getHeader(getName()); 523 524 531 synchronized(upTbl) { 532 if(header.sequenceID <= seqID) 533 return; 534 upTbl.put(new Long (header.sequenceID), msg); 535 } 536 537 _deliverBcast(); 538 } 539 540 541 547 private void _recvBcastRequest(Message msg) { 548 Header header; 549 Message repMsg; 550 551 554 if(!addr.equals(sequencerAddr)) { 555 if(log.isErrorEnabled()) 556 log.error("Received bcast request " + 557 "but not a sequencer"); 558 return; 559 } 560 if(state == BLOCK) { 561 if(log.isInfoEnabled()) log.info("Blocked, discard bcast req"); 562 return; 563 } 564 header=(Header)msg.getHeader(getName()); 565 ++sequencerSeqID; 566 repMsg=new Message(msg.getSrc(), addr, new byte[0]); 567 repMsg.putHeader(getName(), new Header(Header.REP, header.localSequenceID, 568 sequencerSeqID)); 569 570 passDown(new Event(Event.MSG, repMsg)); 571 } 572 573 574 580 private void _recvBcastReply(Header header) { 581 Message msg; 582 long id; 583 584 594 if(state == BLOCK) { 595 if(log.isInfoEnabled()) log.info("Blocked, discard bcast rep"); 596 return; 597 } 598 599 synchronized(reqTbl) { 600 msg=(Message)reqTbl.remove(new Long (header.localSequenceID)); 601 } 602 603 if(msg != null) { 604 retransmitter.ack(header.localSequenceID); 605 id=header.localSequenceID; 606 } 607 else { 608 if(log.isInfoEnabled()) 609 log.info("Bcast reply to " + 610 "non-existent BCAST_REQ[" + header.localSequenceID + 611 "], Sending NULL bcast"); 612 id=NULL_ID; 613 msg=new Message(null, addr, new byte[0]); 614 } 615 msg.putHeader(getName(), new Header(Header.BCAST, id, header.sequenceID)); 616 617 passDown(new Event(Event.MSG, msg)); 618 } 619 620 621 626 private void _retransmitBcastRequest(long seqID) { 627 try { 629 stateLock.readLock().acquire(); 630 try { 631 if(log.isInfoEnabled()) log.info("Retransmit BCAST_REQ[" + seqID + ']'); 632 _transmitBcastRequest(seqID); 633 } 634 finally { 635 stateLock.readLock().release(); 636 } 637 } 638 catch(InterruptedException e) { 639 log.error("failed acquiring a read lock", e); 640 } 641 } 642 643 644 648 649 654 private boolean _upBlock() { 655 try { 657 stateLock.writeLock().acquire(); 658 try { 659 state=FLUSH; 660 } 662 finally { 663 stateLock.writeLock().release(); 664 } 665 } 666 catch(InterruptedException e) { 667 log.error("failed acquiring the write lock", e); 668 } 669 670 return (true); 671 } 672 673 674 680 private boolean _upMsg(Event event) { 681 Message msg; 682 Object obj; 683 Header header; 684 685 try { 687 stateLock.readLock().acquire(); 688 try { 689 690 if(state == NULL_STATE) { 692 if(log.isErrorEnabled()) log.error("Up msg in NULL_STATE"); 693 return (false); 694 } 695 696 msg=(Message)event.getArg(); 703 if(!((obj=msg.getHeader(getName())) instanceof TOTAL.Header)) { 704 if(log.isErrorEnabled()) log.error("No TOTAL.Header found"); 705 return (false); 706 } 707 header=(Header)obj; 708 709 switch(header.type) { 710 case Header.UCAST: 711 _recvUcast(msg); 712 return (true); 713 case Header.BCAST: 714 _recvBcast(msg); 715 return (false); 716 case Header.REQ: 717 _recvBcastRequest(msg); 718 return (false); 719 case Header.REP: 720 _recvBcastReply(header); 721 return (false); 722 default: 723 if(log.isErrorEnabled()) log.error("Unknown header type"); 724 return (false); 725 } 726 727 } 729 finally { 730 stateLock.readLock().release(); 731 } 732 } 733 catch(InterruptedException e) { 734 if(log.isErrorEnabled()) log.error(e.getMessage()); 735 } 736 737 return (true); 738 } 739 740 741 747 private boolean _upSetLocalAddress(Event event) { 748 try { 750 stateLock.writeLock().acquire(); 751 try { 752 addr=(Address)event.getArg(); 753 } 754 finally { 755 stateLock.writeLock().release(); 756 } 757 } 758 catch(InterruptedException e) { 759 log.error(e.getMessage()); 760 } 761 return (true); 762 } 763 764 765 772 private boolean _upViewChange(Event event) { 773 Object oldSequencerAddr; 774 775 try { 777 stateLock.writeLock().acquire(); 778 try { 779 780 state=RUN; 781 782 oldSequencerAddr=sequencerAddr; 789 sequencerAddr= 790 (Address)((View)event.getArg()).getMembers().elementAt(0); 791 if(addr.equals(sequencerAddr)) { 792 sequencerSeqID=NULL_ID; 793 if((oldSequencerAddr == null) || 794 (!addr.equals(oldSequencerAddr))) 795 if(log.isInfoEnabled()) log.info("I'm the new sequencer"); 796 } 797 seqID=NULL_ID; 798 _replayBcast(); 799 800 } 802 finally { 803 stateLock.writeLock().release(); 804 } 805 } 806 catch(InterruptedException e) { 807 log.error(e.getMessage()); 808 } 809 810 return (true); 811 } 812 813 814 819 820 821 827 private boolean _downBlockOk() { 828 try { 830 stateLock.writeLock().acquire(); 831 try { 832 state=BLOCK; 833 } 834 finally { 835 stateLock.writeLock().release(); 836 } 837 } 838 catch(InterruptedException e) { 839 log.error(e.getMessage()); 840 } 841 842 return (true); 843 } 844 845 846 859 private boolean _downMsg(Event event) { 860 Message msg; 861 862 try { 864 stateLock.readLock().acquire(); 865 try { 866 867 if(state == NULL_STATE) { 870 if(log.isErrorEnabled()) log.error("Discard msg in NULL_STATE"); 871 return (false); 872 } 873 if(state == BLOCK) { 874 if(log.isErrorEnabled()) log.error("Blocked, discard msg"); 875 return (false); 876 } 877 878 msg=(Message)event.getArg(); 879 if(msg.getDest() == null) { 880 _sendBcastRequest(msg); 881 return (false); 882 } 883 else { 884 msg=_sendUcast(msg); 885 event.setArg(msg); 886 } 887 888 } 890 finally { 891 stateLock.readLock().release(); 892 } 893 } 894 catch(InterruptedException e) { 895 log.error(e.getMessage()); 896 } 897 898 return (true); 899 } 900 901 902 905 public void start() throws Exception { 906 TimeScheduler timer; 907 908 timer=stack != null ? stack.timer : null; 909 if(timer == null) 910 throw new Exception ("TOTAL.start(): timer is null"); 911 912 reqTbl=new TreeMap(); 913 upTbl=new TreeMap(); 914 retransmitter=new AckSenderWindow(new Command(), AVG_RETRANSMIT_INTERVAL); 915 } 916 917 918 924 public void stop() { 925 try { 926 stateLock.writeLock().acquire(); 927 try { 928 state=NULL_STATE; 929 retransmitter.reset(); 930 reqTbl.clear(); 931 upTbl.clear(); 932 addr=null; 933 } 934 finally { 935 stateLock.writeLock().release(); 936 } 937 } 938 catch(InterruptedException e) { 939 log.error(e.getMessage()); 940 } 941 } 942 943 944 949 private void _up(Event event) { 950 switch(event.getType()) { 951 case Event.BLOCK: 952 if(!_upBlock()) return; 953 break; 954 case Event.MSG: 955 if(!_upMsg(event)) return; 956 break; 957 case Event.SET_LOCAL_ADDRESS: 958 if(!_upSetLocalAddress(event)) return; 959 break; 960 case Event.VIEW_CHANGE: 961 if(!_upViewChange(event)) return; 962 break; 963 default: 964 break; 965 } 966 967 passUp(event); 968 } 969 970 971 976 private void _down(Event event) { 977 switch(event.getType()) { 978 case Event.BLOCK_OK: 979 if(!_downBlockOk()) return; 980 break; 981 case Event.MSG: 982 if(!_downMsg(event)) return; 983 break; 984 default: 985 break; 986 } 987 988 passDown(event); 989 } 990 991 992 995 public TOTAL() { 996 } 997 998 999 public String getName() { 1002 return (_getName()); 1003 } 1004 1005 public boolean setProperties(Properties properties) { 1007 return (_setProperties(properties)); 1008 } 1009 1010 public Vector requiredDownServices() { 1012 return (_requiredDownServices()); 1013 } 1014 1015 public Vector requiredUpServices() { 1017 return (_requiredUpServices()); 1018 } 1019 1020 public void up(Event event) { 1022 _up(event); 1023 } 1024 1025 public void down(Event event) { 1027 _down(event); 1028 } 1029} 1030 | Popular Tags |