1 3 package org.jgroups.protocols; 4 5 import org.jgroups.*; 6 import org.jgroups.stack.Protocol; 7 8 import java.io.IOException ; 9 import java.io.ObjectInput ; 10 import java.io.ObjectOutput ; 11 import java.util.Vector ; 12 13 14 15 16 17 22 class SavedMessages { 23 24 29 class Entry { 30 private final Message msg; 31 private final long seq; 32 33 public Entry( Message msg, long seq ) { 34 this.msg = msg; 35 this.seq = seq; 36 } 37 38 public Message getMsg() { 39 return msg; 40 } 41 42 public long getSeq() { 43 return seq; 44 } 45 } 47 48 private final Vector messages; 50 51 54 public SavedMessages() { 55 messages = new Vector (); 56 } 57 58 62 public void insertMessage( Message msg, long seq ) { 63 synchronized( messages ) { 64 int size = messages.size(); 65 int index = 0; 66 long this_seq = -1; 68 try { 70 while( (index < size) && 71 ((this_seq = ((Entry) (messages.elementAt(index))).getSeq()) < seq) ) { 72 index++; 73 } 74 } catch ( java.lang.ClassCastException e ) { 75 System.err.println( "Error: (TOTAL_OLD) SavedMessages.insertMessage() - ClassCastException: could not cast element of \"messages\" to an Entry (index " + index + ')' ); 76 return; 77 } 78 79 if ( this_seq == seq ) { 81 System.err.println( "SavedMessages.insertMessage() - sequence " + seq + " already exists in saved messages. Message NOT saved." ); 82 return; 83 } 84 85 messages.insertElementAt( new Entry( msg, seq ), index ); 86 } } 88 89 96 private Message getMessage( long seq, boolean delete_msg ) { 97 synchronized( messages ) { 98 int size = messages.size(); 99 int index = 0; 100 long this_seq = -1; 101 try { 102 while( (index < size) && 103 ((this_seq = (((Entry) (messages.elementAt(index))).getSeq())) < seq) ) { 104 index++; 105 } 106 } catch ( java.lang.ClassCastException e ) { 107 System.err.println( "Error: (TOTAL_OLD) SavedMessages.getMessage() - ClassCastException: could not cast element of \"messages\" to an Entry (index " + index + ')' ); 108 return null; 109 } 110 if ( this_seq == seq ) { 112 Object temp_obj = messages.elementAt(index); 114 if ( temp_obj instanceof Entry ) { 115 Message ret_val = ((Entry) temp_obj).getMsg().copy(); 116 117 if ( delete_msg ) { 119 messages.removeElementAt(index); 120 } 121 122 return ret_val; 123 } else { 124 System.err.println( "Error: (TOTAL_OLD) SavedMessages.getMessage() - could not cast element of \"messages\" to an Entry (index " + index + ')' ); 125 return null; 126 } } else { 128 return null; 130 } 131 } } 133 134 139 public Message getMessage( long seq ) { 140 return getMessage( seq, true ); 141 } 142 143 147 public Message peekMessage( long seq ) { 148 return getMessage( seq, false ); 149 } 150 151 158 private Message getFirstMessage( boolean delete_msg ) { 159 synchronized( messages ) { 160 if ( isEmpty() ) { 161 return null; 162 } else { 163 Object temp_obj = messages.firstElement(); 164 if ( temp_obj instanceof Entry ) { 165 Message ret_val = ((Entry) temp_obj).getMsg().copy(); 166 messages.removeElementAt(0); 167 return ret_val; 168 } else { 169 System.err.println( "Error: (TOTAL_OLD) SavedMessages.getFirstMessage() - could not cast element of \"messages\" to an Entry" ); 170 return null; 171 } } 173 } } 175 176 181 public synchronized Message getFirstMessage() { 182 return getFirstMessage( true ); 183 } 184 185 189 public Message peekFirstMessage() { 190 return getFirstMessage( false ); 191 } 192 193 197 public long getFirstSeq() { 198 synchronized( messages ) { 199 if ( isEmpty() ) { 200 return -1; 201 } else { 202 Object temp_obj = messages.firstElement(); 203 if ( temp_obj instanceof Entry ) { 204 return ((Entry) temp_obj).getSeq(); 205 } else { 206 System.err.println( "Error: (TOTAL_OLD) SavedMessages.getFirstSeq() - could not cast element of \"messages\" to an Entry " ); 207 return -1; 208 } 209 } 210 } } 212 213 217 public boolean isEmpty() { 218 return messages.isEmpty(); 219 } 220 221 224 public int getSize() { 225 return messages.size(); 226 } 227 228 231 public void clearMessages() { 232 synchronized( messages ) { 233 messages.removeAllElements(); 234 } 235 } 236 } 238 239 245 class MessageAcks { 246 247 254 class Entry { 255 public final Address addr; 256 public long seq; 257 258 public Entry( Address addr, long seq ) { 259 this.addr = addr; 260 this.seq = seq; 261 } 262 263 public Entry ( Address addr ) { 264 this.addr = addr; 265 this.seq = -1; } 267 } 269 private final Vector acks; 271 272 private final SavedMessages message_history; 274 275 278 public MessageAcks( Vector members ) { 279 acks = new Vector (); 280 281 message_history = new SavedMessages(); 283 284 reset( members ); 286 } 287 288 292 public synchronized void reset( Vector members ) { 293 clear(); 294 295 int num_members = members.size(); 297 for( int i=0; i<num_members; i++ ) { 298 Object temp_obj = members.elementAt(i); 299 if ( temp_obj instanceof Address ) { 300 acks.addElement( new Entry( (Address) temp_obj ) ); 301 } else { 302 System.err.println( "Error: (TOTAL_OLD) MessageAcks.reset() - could not cast element of \"members\" to an Address object" ); 303 return; 304 } 305 } 306 } 307 308 311 private void clear() { 312 acks.removeAllElements(); 313 message_history.clearMessages(); 314 } 315 316 320 private Entry getEntry( Address addr ) { 321 synchronized( acks ) { 322 int size = acks.size(); 324 for( int i=0; i<size; i++ ) { 325 Object temp_obj = acks.elementAt(i); 326 if ( temp_obj instanceof Entry ) { 327 Entry this_entry = (Entry) temp_obj; 328 if ( (this_entry.addr).equals(addr) ) { 329 return this_entry; 331 } 332 } else { 333 System.err.println( "Error: (TOTAL_OLD) MessageAcks.getEntry() - could not cast element of \"acks\" to an Entry" ); 334 } } 336 337 return null; 339 } 340 } 341 342 350 public void setSeq( Address addr, long seq ) { 351 Entry this_entry = getEntry( addr ); 352 if ( (this_entry != null) && (this_entry.seq < seq) ) { 353 this_entry.seq = seq; 354 355 truncateHistory(); 357 } 358 } 359 360 368 public long getSeq( Address addr ) { 369 Entry this_entry = getEntry( addr ); 370 if ( this_entry == null ) { 371 return -2; } else { 373 return this_entry.seq; 374 } 375 } 376 377 381 public Message getMessage( long seq ) { 382 return message_history.peekMessage( seq ); 383 } 384 385 391 public void addMessage( Message msg, long seq ) { 392 message_history.insertMessage( msg, seq ); 393 } 394 395 399 private long getLowestSeqAck() { 400 synchronized( acks ) { 401 long ret_val = -10; 403 int size = acks.size(); 404 for( int i=0; i<size; i++ ) { 405 Object temp_obj = acks.elementAt(i); 406 if ( temp_obj instanceof Entry ) { 407 long this_seq = ((Entry) temp_obj).seq; 408 if ( this_seq < ret_val ) { 409 ret_val = this_seq; 410 } 411 } else { 412 System.err.println( "Error: (TOTAL_OLD) MessageAcks.getLowestSeqAck() - could not cast element of \"acks\" to an Entry (index=" + i + ')' ); 413 return -1; 414 } 415 } 416 417 return ret_val; 418 } 419 } 420 421 425 private synchronized void truncateHistory() { 426 long lowest_ack_seq = getLowestSeqAck(); 427 if ( lowest_ack_seq < 0 ) { 428 return; 431 } 432 433 synchronized( message_history ) { 435 long lowest_stored_seq; 436 while( ((lowest_stored_seq = message_history.getFirstSeq()) >=0) && 438 (lowest_stored_seq > lowest_ack_seq) ) { 439 message_history.getFirstMessage(); 441 } 442 } } 444 } 446 447 457 public class TOTAL_OLD extends Protocol { 458 private final static String PROTOCOL_NAME = "TOTAL_OLD"; 460 461 private Address local_addr = null; 462 private Vector members = new Vector (); 465 470 private long next_seq_id = -1; 471 472 478 private long next_seq_id_to_assign = -1; 479 480 private final static long INIT_SEQ_ID = 10; 482 487 private final SavedMessages queued_messages = new SavedMessages(); 488 489 496 private MessageAcks ack_history = null; 497 498 503 private final TotalRetransmissionThread retrans_thread = new TotalRetransmissionThread( this ); 504 505 506 509 public String getName() { 510 return PROTOCOL_NAME; 511 } 512 513 514 515 516 public void start() throws Exception { 517 retrans_thread.start(); 519 } 520 521 public void stop() { 522 retrans_thread.stopResendRequests(); 524 } 525 526 527 528 public void reset() { 529 531 next_seq_id = -1; 533 queued_messages.clearMessages(); 535 536 retrans_thread.reset(); 538 } 539 540 541 544 protected long getNextSeqID() { 545 return next_seq_id; 546 } 547 548 549 554 protected long getFirstQueuedSeqID() { 555 return queued_messages.getFirstSeq(); 556 } 557 558 559 562 public void up(Event evt) { 563 Message msg; 564 565 567 Object temp_obj; switch( evt.getType() ) { 569 570 case Event.SET_LOCAL_ADDRESS: 571 temp_obj = evt.getArg(); 572 if ( temp_obj instanceof Address ) { 573 local_addr = (Address) temp_obj; 574 } else { 575 System.err.println( "Error: Total.up() - could not cast local address to an Address object" ); 576 } 577 break; 578 579 case Event.MSG: 580 temp_obj = evt.getArg(); 582 if ( temp_obj instanceof Message ) { 583 msg = (Message) temp_obj; 584 temp_obj = msg.removeHeader(getName()); 585 if ( temp_obj instanceof TotalHeader ) { 586 TotalHeader hdr = (TotalHeader) temp_obj; 587 588 switch( hdr.total_header_type ) { 590 591 case TotalHeader.TOTAL_UNICAST: 592 passUp(evt); 594 return; 595 596 case TotalHeader.TOTAL_BCAST: 597 handleBCastMessage( msg, hdr.seq_id ); 598 break; 599 600 case TotalHeader.TOTAL_REQUEST: 601 if ( isSequencer() ) { 603 handleRequestMessage( msg ); 604 } 605 break; 606 607 case TotalHeader.TOTAL_NEW_VIEW: 608 next_seq_id = hdr.seq_id; 610 611 break; 613 614 case TotalHeader.TOTAL_CUM_SEQ_ACK: 615 if ( isSequencer() ) { 617 temp_obj = msg.getSrc(); 618 if ( temp_obj instanceof Address ) { 619 ack_history.setSeq( (Address) temp_obj, hdr.seq_id ); 620 } else { 621 System.err.println( "Error: TOTAL_OLD.Up() - could not cast source of message to an Address object (case TotalHeader.TOTAL_CUM_SEQ_ACK)" ); 622 } 623 } 624 break; 625 626 case TotalHeader.TOTAL_RESEND: 627 if ( isSequencer() ) { 629 handleResendRequest( msg, hdr.seq_id ); 630 } 631 break; 632 633 default: 634 System.err.println( "Error: TOTAL_OLD.up() - unrecognized TotalHeader in message - " + hdr.toString() ); 636 return; } } else { 639 System.err.println( "Error: TOTAL_OLD.up() - could not cast message header to TotalHeader (case Event.MSG)" ); 640 } } else { 642 System.err.println( "Error: TOTAL_OLD.up() - could not cast argument of Event to a Message (case Event.MSG)" ); 643 } 645 return; 648 654 655 case Event.TMP_VIEW: case Event.VIEW_CHANGE: 657 System.out.println( "View Change event passed up to TOTAL_OLD (debug - mms21)" ); 658 View new_view = (View) evt.getArg(); 659 members = new_view.getMembers(); 660 { 661 System.out.println( "New view members (printed in TOTAL_OLD):" ); 663 int view_size = members.size(); 664 for( int i=0; i<view_size; i++ ) { 665 System.out.println( " " + members.elementAt(i).toString() ); 666 } 667 } 668 669 reset(); 671 672 if ( isSequencer() ) { 675 System.err.println( "TOTAL_OLD.up() - I am the sequencer of this new view" ); 677 678 ack_history = new MessageAcks( members ); 680 681 next_seq_id_to_assign = INIT_SEQ_ID; 683 684 Message new_view_msg = new Message( null, local_addr, null ); 686 new_view_msg.putHeader(getName(), new TotalHeader( TotalHeader.TOTAL_NEW_VIEW, next_seq_id_to_assign ) ); 687 passDown( new Event( Event.MSG, new_view_msg ) ); 688 } 689 690 break; 691 693 default: 694 break; 695 } 697 passUp(evt); } 699 700 701 705 private synchronized int passUpMessages() { 706 if ( next_seq_id < 0 ) { 707 return 0; 709 } 710 711 long lowest_seq_stored = queued_messages.getFirstSeq(); 712 if ( lowest_seq_stored < 0 ) { 713 return 0; 715 } 716 if ( lowest_seq_stored < next_seq_id ) { 717 System.err.println( "Error: TOTAL_OLD.passUpMessages() - next expected sequence id (" + next_seq_id + ") is greater than the sequence id of a stored message (" + lowest_seq_stored + ')' ); 720 return 0; 721 } else if ( next_seq_id == lowest_seq_stored ) { 722 Message msg = queued_messages.getFirstMessage(); 724 if ( msg == null ) { 725 System.err.println( "Error: TOTAL_OLD.passUpMessages() - unexpected null Message retrieved from stored messages" ); 726 return 0; 727 } 728 passUp( new Event( Event.MSG, msg ) ); 729 730 next_seq_id++; 732 733 return (1 + passUpMessages()); 734 } else { 735 746 return 0; 747 } 748 } 749 750 751 private final long last_request_time = -1; 752 758 private synchronized void handleBCastMessage( Message msg, long seq ) { 759 766 767 if ( seq < next_seq_id ) { 768 return; 771 } 772 773 queued_messages.insertMessage( msg, seq ); 775 776 int num_passed = passUpMessages(); 778 if ( num_passed > 1 ) 780 System.err.println( "TOTAL_OLD.handleBCastMessage() - " + num_passed + " message(s) passed up the Protocol Stack" ); 781 782 818 } 819 820 821 826 private synchronized void handleRequestMessage( Message msg ) { 827 if ( next_seq_id_to_assign < 0 ) { 828 System.err.println( "Error: TOTAL_OLD.handleRequestMessage() - cannot handle request... do not know what sequence id to assign" ); 830 return; 831 } 832 833 msg.setDest( null ); 835 836 msg.setSrc( local_addr ); 838 839 msg.putHeader(getName(), new TotalHeader( TotalHeader.TOTAL_BCAST, next_seq_id_to_assign ) ); 841 842 Message msg_copy = msg.copy(); 844 ack_history.addMessage( msg_copy, next_seq_id_to_assign ); 845 846 { 848 Object header = msg_copy.getHeader(getName()); 849 if ( !(header instanceof TotalHeader) ) { 850 System.err.println( "Error: TOTAL_OLD.handleRequestMessage() - BAD: stored message that did not contain a TotalHeader - " + next_seq_id_to_assign ); 851 } 852 } 855 next_seq_id_to_assign++; 857 858 passDown( new Event( Event.MSG, msg ) ); 860 } 861 862 863 866 private synchronized void handleResendRequest( Message msg, long seq ) { 867 System.err.println( "TOTAL_OLD.handleRequestMessage() - received resend request for message " + seq ); 868 869 886 Address requester = null; 887 Message resend_msg = ack_history.getMessage( seq ); 889 if ( resend_msg == null ) { 891 System.err.println( "TOTAL_OLD.handleResendRequest() - could not find the message " + seq + " in the history to resend" ); 893 return; 894 } 895 resend_msg.setDest( requester ); 896 897 { 901 Object header = resend_msg.getHeader(getName()); 902 if ( header instanceof TotalHeader ) { 903 } else { 905 System.err.println( "TOTAL_OLD: resend msg BAD (header is NOT a TotalHeader) - " + seq ); 906 } 907 } 910 passDown( new Event( Event.MSG, resend_msg ) ); 911 System.err.println( "TOTAL_OLD.handleResendRequest() - responded to resend request for message " + seq ); 912 } 913 914 915 918 public void down(Event evt) { 919 Message msg; 920 921 923 switch( evt.getType() ) { 924 925 case Event.VIEW_CHANGE: 926 System.err.println( "NOTE: VIEW_CHANGE Event going down through " + PROTOCOL_NAME ); 928 929 Vector new_members=((View)evt.getArg()).getMembers(); 930 synchronized(members) { 931 members.removeAllElements(); 932 if(new_members != null && new_members.size() > 0) 933 for(int i=0; i < new_members.size(); i++) 934 members.addElement(new_members.elementAt(i)); 935 } 936 break; 937 938 case Event.MSG: 939 Object temp_obj = evt.getArg(); 940 if ( temp_obj instanceof Message ) { 941 msg = (Message) temp_obj; 942 943 946 if ( msg.getDest() == null ) { 948 950 Address sequencer = getSequencer(); 953 if ( sequencer != null ) { 954 msg.setDest( sequencer ); 956 } else { 957 } 961 962 msg.putHeader(getName(), new TotalHeader(TotalHeader.TOTAL_REQUEST, -1)); 964 965 966 } else { 967 msg.putHeader(getName(), new TotalHeader( TotalHeader.TOTAL_UNICAST, -1 ) ); } 970 } else { 971 System.err.println( "Error: TOTAL_OLD.down() - could not cast argument of Event to a Message (case Event.MSG)" ); 972 } break; 974 975 default: 976 break; 977 } 979 passDown(evt); 981 } 982 983 984 990 private boolean isSequencer() { 991 if ( local_addr == null ) { 992 System.err.println( "TOTAL_OLD.isSequencer() - local address unknown!" ); 994 return false; 995 } 996 997 synchronized( members ) { 998 if ( members.size() == 0 ) { 999 System.err.println( "TOTAL_OLD.isSequencer() - no members!" ); 1001 return false; 1002 } 1003 1004 Object temp_obj = members.elementAt(0); 1005 if ( temp_obj instanceof Address ) { 1006 Address seq_addr = (Address) temp_obj; 1007 return local_addr.equals(seq_addr); 1008 } else { 1009 System.err.println( "Error: TOTAL_OLD.isSequencer() - could not cast element of \"members\" to an Address" ); 1010 return false; 1011 } } 1013 } 1014 1015 1016 1020 protected Address getLocalAddr() { 1021 return local_addr; 1022 } 1023 1024 1025 1029 protected Address getSequencer() { 1030 synchronized( members ) { 1031 if ( members.size() == 0 ) { 1032 System.err.println( "TOTAL_OLD.getSequencer() - no members" ); 1033 return null; 1034 } else { 1035 Object temp_obj = members.elementAt(0); 1036 if ( temp_obj instanceof Address ) { 1037 return (Address) temp_obj; 1038 } else { 1039 System.err.println( "Error: TOTAL_OLD.getSequencer() - could not cast first element of \"members\" to an Address" ); 1040 return null; 1041 } 1042 } 1043 } 1044 } 1045 1046 1047 1048 1049 1050 1056 public static class TotalHeader extends Header { 1057 public final static int TOTAL_UNICAST = 0; public final static int TOTAL_BCAST = 1; public final static int TOTAL_REQUEST = 2; public final static int TOTAL_NEW_VIEW = 3; public final static int TOTAL_NEW_VIEW_ACK = 4; public final static int TOTAL_CUM_SEQ_ACK = 5; public final static int TOTAL_SEQ_ACK = 6; public final static int TOTAL_RESEND = 7; 1067 public int total_header_type; 1068 1069 1087 public long seq_id; 1089 1090 public TotalHeader() {} 1092 public TotalHeader( int type, long seq ) { 1093 switch( type ) { 1094 case TOTAL_UNICAST: 1095 case TOTAL_BCAST: 1096 case TOTAL_REQUEST: 1097 case TOTAL_NEW_VIEW: 1098 case TOTAL_NEW_VIEW_ACK: 1099 case TOTAL_CUM_SEQ_ACK: 1100 case TOTAL_SEQ_ACK: 1101 case TOTAL_RESEND: 1102 total_header_type = type; 1104 break; 1105 1106 default: 1107 System.err.println( "Error: TotalHeader.TotalHeader() - unknown TotalHeader type given: " + type ); 1109 total_header_type = -1; 1110 break; 1111 } 1112 1113 seq_id = seq; 1114 } 1115 1116 1120 public String toString() { 1121 String type = ""; 1122 switch( total_header_type ) { 1123 case TOTAL_UNICAST: 1124 type = "TOTAL_UNICAST"; 1125 break; 1126 1127 case TOTAL_BCAST: 1128 type = "TOTAL_BCAST"; 1129 break; 1130 1131 case TOTAL_REQUEST: 1132 type = "TOTAL_REQUEST"; 1133 break; 1134 1135 case TOTAL_NEW_VIEW: 1136 type = "NEW_VIEW"; 1137 break; 1138 1139 case TOTAL_NEW_VIEW_ACK: 1140 type = "NEW_VIEW_ACK"; 1141 break; 1142 1143 case TOTAL_CUM_SEQ_ACK: 1144 type = "TOTAL_CUM_SEQ_ACK"; 1145 break; 1146 1147 case TOTAL_SEQ_ACK: 1148 type = "TOTAL_SEQ_ACK"; 1149 break; 1150 1151 case TOTAL_RESEND: 1152 type = "TOTAL_RESEND"; 1153 break; 1154 1155 default: 1156 type = "UNKNOWN TYPE (" + total_header_type + ')'; 1157 break; 1158 } 1159 1160 return "[ TOTAL_OLD: type=" + type + ", seq=" + seq_id + " ]"; 1161 } 1162 1163 1164 1165 public void writeExternal(ObjectOutput out) throws IOException { 1166 out.writeInt(total_header_type); 1167 out.writeLong(seq_id); 1168 } 1169 1170 1171 1172 public void readExternal(ObjectInput in) throws IOException , ClassNotFoundException { 1173 total_header_type=in.readInt(); 1174 seq_id=in.readLong(); 1175 } 1176 1177 1178 1179 } 1181 1182 1183} 1185 1186 1191class TotalRetransmissionThread extends Thread { 1192 private long last_retrans_request_time; private long last_requested_seq; 1196 final private static long polling_delay = 1000; final private static long resend_timeout = 2000; final private static int max_request = 10; 1201 private TOTAL_OLD prot_ptr; 1203 1204 private boolean is_running; 1206 1207 1208 1214 public TotalRetransmissionThread( TOTAL_OLD parent_prot ) { 1215 if ( parent_prot != null ) { 1216 prot_ptr = parent_prot; 1217 } else { 1218 System.err.println( "Error: TotalRetransmissionThread.TotalRetransmissionThread() - given parent protocol reference is null\n (FATAL ERROR - TOTAL_OLD protocol will not function properly)" ); 1220 1221 is_running = false; 1223 } 1224 1225 reset(); 1227 1228 is_running = true; 1230 } 1231 1232 1236 public void reset() { 1237 last_retrans_request_time = -1; 1239 last_requested_seq = -1; 1240 } 1241 1242 1243 1247 private void sendResendRequest( Address sequencer, Address local_addr, long seq_id ) { 1248 Message resend_msg = new Message( sequencer, local_addr, null ); 1249 resend_msg.putHeader(getName(), new TOTAL_OLD.TotalHeader( TOTAL_OLD.TotalHeader.TOTAL_RESEND, seq_id ) ); 1250 prot_ptr.passDown( new Event( Event.MSG, resend_msg ) ); 1251 1252 System.err.println( "TotalRetransmissionThread.resend() - resend requested for message " + seq_id ); 1254 } 1255 1256 1257 1262 private void checkForResend() { 1263 long first_seq_id = prot_ptr.getFirstQueuedSeqID(); 1269 if ( first_seq_id >= 0 ) { 1270 1272 long next_seq_id = prot_ptr.getNextSeqID(); if ( (next_seq_id < first_seq_id) ) { 1277 long time_now = System.currentTimeMillis(); 1279 if ( (next_seq_id > last_requested_seq) || 1280 (time_now > (last_retrans_request_time + resend_timeout)) || 1281 (last_retrans_request_time < 0) ) { 1282 Address sequencer = prot_ptr.getSequencer(); 1285 if ( sequencer == null ) { 1286 System.out.println( "Error: (TOTAL_OLD) TotalRetransmissionThread.checkForResend() - could not determine sequencer to send a TOTAL_RESEND request" ); 1287 1288 return; 1289 } 1290 1291 Address local_addr = prot_ptr.getLocalAddr(); 1292 if ( local_addr == null ) { 1293 System.out.println( "Warning: (TOTAL_OLD) TotalRetransmissionThread.checkForResend() - local address not specified in TOTAL_RESEND request... attempting to send requests anyway" ); 1294 } 1295 1296 long temp_long = (next_seq_id + max_request); long last_resend_seq_id = (temp_long > first_seq_id) ? first_seq_id : temp_long; 1298 for( long resend_seq=next_seq_id; resend_seq<last_resend_seq_id ; resend_seq++ ) { 1299 sendResendRequest( sequencer, local_addr, resend_seq ); 1300 } 1301 last_retrans_request_time = time_now; 1303 last_requested_seq = last_resend_seq_id; 1304 } 1305 } } } 1309 1310 1311 1315 public void run() { 1316 while( is_running ) { 1317 checkForResend(); 1320 1321 try { 1323 sleep( polling_delay ); 1324 } catch( InterruptedException e ) {} } 1326 } 1327 1328 1332 public void stopResendRequests() { 1333 is_running = false; 1334 } 1335} | Popular Tags |