1 3 4 package org.jgroups.stack; 5 6 import EDU.oswego.cs.dl.util.concurrent.ReadWriteLock; 7 import EDU.oswego.cs.dl.util.concurrent.WriterPreferenceReadWriteLock; 8 import org.apache.commons.logging.Log; 9 import org.apache.commons.logging.LogFactory; 10 import org.jgroups.Address; 11 import org.jgroups.Message; 12 import org.jgroups.util.List; 13 import org.jgroups.util.TimeScheduler; 14 15 import java.util.*; 16 17 18 19 50 public class NakReceiverWindow { 51 52 53 54 55 62 63 64 private final ReadWriteLock lock=new WriterPreferenceReadWriteLock(); 65 67 68 private long head=0; 69 private long tail=0; 70 71 72 private long lowest_seen=0; 73 74 75 private long highest_seen=0; 76 77 78 private final TreeMap received_msgs=new TreeMap(); 79 80 82 private final TreeMap delivered_msgs=new TreeMap(); 83 84 91 private boolean discard_delivered_msgs=false; 92 93 94 97 private int max_xmit_buf_size=0; 98 99 101 private Retransmitter retransmitter=null; 102 103 protected static final Log log=LogFactory.getLog(NakReceiverWindow.class); 104 105 106 118 public NakReceiverWindow(Address sender, Retransmitter.RetransmitCommand cmd, 119 long start_seqno, TimeScheduler sched) { 120 head=start_seqno; 121 tail=head; 122 123 if(cmd != null) 124 retransmitter=sched == null ? 125 new Retransmitter(sender, cmd) : 126 new Retransmitter(sender, cmd, sched); 127 } 128 129 138 public NakReceiverWindow(Address sender, Retransmitter.RetransmitCommand cmd, long start_seqno) { 139 this(sender, cmd, start_seqno, null); 140 } 141 142 148 public NakReceiverWindow(Address sender, long start_seqno) { 149 this(sender, null, start_seqno); 150 } 151 152 153 public void setRetransmitTimeouts(long[] timeouts) { 154 if(retransmitter != null) 155 retransmitter.setRetransmitTimeouts(timeouts); 156 } 157 158 159 public void setDiscardDeliveredMessages(boolean flag) { 160 this.discard_delivered_msgs=flag; 161 } 162 163 public int getMaxXmitBufSize() { 164 return max_xmit_buf_size; 165 } 166 167 public void setMaxXmitBufSize(int max_xmit_buf_size) { 168 this.max_xmit_buf_size=max_xmit_buf_size; 169 } 170 171 172 185 public void add(long seqno, Message msg) { 186 long old_tail; 187 188 try { 189 lock.writeLock().acquire(); 190 try { 191 old_tail=tail; 192 if(seqno < head) { 193 if(log.isTraceEnabled()) { 194 StringBuffer sb=new StringBuffer ("seqno "); 195 sb.append(seqno).append(" is smaller than ").append(head).append("); discarding message"); 196 log.trace(sb.toString()); 197 } 198 return; 199 } 200 201 if(seqno == tail) { 203 received_msgs.put(new Long (seqno), msg); 204 tail++; 205 highest_seen=seqno; 206 } 207 else if(seqno > tail) { 212 for(long i=tail; i < seqno; i++) { 213 received_msgs.put(new Long (i), null); 214 tail++; 217 } 218 received_msgs.put(new Long (seqno), msg); 219 tail=seqno + 1; 220 if(retransmitter != null) { 221 retransmitter.add(old_tail, seqno - 1); 222 } 223 } 225 else if(seqno < tail) { 226 if(log.isTraceEnabled()) { 227 StringBuffer sb=new StringBuffer ("added missing msg "); 228 sb.append(msg.getSrc()).append('#').append(seqno); 229 log.trace(sb.toString()); 230 } 231 232 Object val=received_msgs.get(new Long (seqno)); 233 if(val == null) { 234 received_msgs.put(new Long (seqno), msg); 236 237 if(highest_seen +1 == seqno || seqno == head) 238 updateHighestSeen(); 239 240 if(retransmitter != null) retransmitter.remove(seqno); 246 } 247 } 248 updateLowestSeen(); 249 } 250 finally { 251 lock.writeLock().release(); 252 } 253 } 254 catch(InterruptedException e) { 255 log.error("failed acquiring write lock", e); 256 } 257 } 258 259 260 261 void updateHighestSeen() { 262 SortedMap map=received_msgs.tailMap(new Long (highest_seen)); 263 Map.Entry entry; 264 for(Iterator it=map.entrySet().iterator(); it.hasNext();) { 265 entry=(Map.Entry)it.next(); 266 if(entry.getValue() != null) 267 highest_seen=((Long )entry.getKey()).longValue(); 268 else 269 break; 270 } 271 } 272 273 public Message remove() { 274 Message retval=null; 275 Long key; 276 boolean bounded_buffer_enabled=max_xmit_buf_size > 0; 277 278 try { 279 lock.writeLock().acquire(); 280 try { 281 while(received_msgs.size() > 0) { 282 if(log.isTraceEnabled()) { 283 StringBuffer sb=new StringBuffer ("received msgs="); 284 sb.append(received_msgs.size()).append(", max_xmit_buf_size=").append(max_xmit_buf_size); 285 log.trace(sb.toString()); 286 } 287 288 key=(Long )received_msgs.firstKey(); 289 retval=(Message)received_msgs.get(key); 290 if(retval != null) { received_msgs.remove(key); if(discard_delivered_msgs == false) { 293 delivered_msgs.put(key, retval); } 295 head++; return retval; 297 } 298 else { if(bounded_buffer_enabled && received_msgs.size() > max_xmit_buf_size) { 300 received_msgs.remove(key); head++; 302 retransmitter.remove(key.longValue()); 303 } 304 else { 305 break; 306 } 307 } 308 } 309 return retval; 310 } 311 finally { 312 lock.writeLock().release(); 313 } 314 } 315 catch(InterruptedException e) { 316 log.error("failed acquiring write lock", e); 317 return null; 318 } 319 } 320 321 322 323 328 public void stable(long seqno) { 329 try { 330 lock.writeLock().acquire(); 331 try { 332 SortedMap m=delivered_msgs.headMap(new Long (seqno +1)); 335 if(m.size() > 0) 336 lowest_seen=Math.max(lowest_seen, ((Long )m.lastKey()).longValue()); 337 m.clear(); } 339 finally { 340 lock.writeLock().release(); 341 } 342 } 343 catch(InterruptedException e) { 344 log.error("failed acquiring write lock", e); 345 } 346 } 347 348 349 352 public void reset() { 353 try { 354 lock.writeLock().acquire(); 355 try { 356 if(retransmitter != null) 357 retransmitter.reset(); 358 _reset(); 359 } 360 finally { 361 lock.writeLock().release(); 362 } 363 } 364 catch(InterruptedException e) { 365 log.error("failed acquiring write lock", e); 366 } 367 } 368 369 370 373 public void destroy() { 374 try { 375 lock.writeLock().acquire(); 376 try { 377 if(retransmitter != null) 378 retransmitter.stop(); 379 _reset(); 380 } 381 finally { 382 lock.writeLock().release(); 383 } 384 } 385 catch(InterruptedException e) { 386 log.error("failed acquiring write lock", e); 387 } 388 } 389 390 391 395 public long getHighestDelivered() { 396 try { 397 lock.readLock().acquire(); 398 try { 399 return (Math.max(head - 1, -1)); 400 } 401 finally { 402 lock.readLock().release(); 403 } 404 } 405 catch(InterruptedException e) { 406 log.error("failed acquiring read lock", e); 407 return -1; 408 } 409 } 410 411 412 417 public long getLowestSeen() { 418 try { 419 lock.readLock().acquire(); 420 try { 421 return (lowest_seen); 422 } 423 finally { 424 lock.readLock().release(); 425 } 426 } 427 catch(InterruptedException e) { 428 log.error("failed acquiring read lock", e); 429 return -1; 430 } 431 } 432 433 434 440 public long getHighestSeen() { 441 try { 442 lock.readLock().acquire(); 443 try { 444 return (highest_seen); 445 } 446 finally { 447 lock.readLock().release(); 448 } 449 } 450 catch(InterruptedException e) { 451 log.error("failed acquiring read lock", e); 452 return -1; 453 } 454 } 455 456 457 465 public List getMissingMessages(long low, long high) { 466 List retval=new List(); 467 469 if(low > high) { 470 if(log.isErrorEnabled()) log.error("invalid range: low (" + low + 471 ") is higher than high (" + high + ')'); 472 return null; 473 } 474 475 try { 476 lock.readLock().acquire(); 477 try { 478 479 SortedMap m=received_msgs.subMap(new Long (low), new Long (high+1)); 482 for(Iterator it=m.keySet().iterator(); it.hasNext();) { 483 retval.add(it.next()); 484 } 485 486 493 return retval; 494 } 495 finally { 496 lock.readLock().release(); 497 } 498 } 499 catch(InterruptedException e) { 500 log.error("failed acquiring read lock", e); 501 return null; 502 } 503 } 504 505 506 513 public long getHighestReceived() { 514 try { 515 lock.readLock().acquire(); 516 try { 517 return Math.max(tail - 1, -1); 518 } 519 finally { 520 lock.readLock().release(); 521 } 522 } 523 catch(InterruptedException e) { 524 log.error("failed acquiring read lock", e); 525 return -1; 526 } 527 } 528 529 530 536 public List getMessagesHigherThan(long seqno) { 537 List retval=new List(); 538 539 try { 540 lock.readLock().acquire(); 541 try { 542 SortedMap m=received_msgs.tailMap(new Long (seqno+1)); 544 for(Iterator it=m.values().iterator(); it.hasNext();) { 545 retval.add((it.next())); 546 } 547 548 m=delivered_msgs.tailMap(new Long (seqno +1)); 551 for(Iterator it=m.values().iterator(); it.hasNext();) { 552 retval.add(((Message)it.next()).copy()); 553 } 554 return (retval); 555 556 } 557 finally { 558 lock.readLock().release(); 559 } 560 } 561 catch(InterruptedException e) { 562 log.error("failed acquiring read lock", e); 563 return null; 564 } 565 } 566 567 568 573 public List getMessagesInRange(long lower, long upper) { 574 List retval=new List(); 575 576 try { 577 lock.readLock().acquire(); 578 try { 579 SortedMap m=received_msgs.subMap(new Long (lower +1), new Long (upper +1)); 581 for(Iterator it=m.values().iterator(); it.hasNext();) { 582 retval.add(it.next()); 583 } 584 585 m=delivered_msgs.subMap(new Long (lower +1), new Long (upper +1)); 586 for(Iterator it=m.values().iterator(); it.hasNext();) { 587 retval.add(((Message)it.next()).copy()); 588 } 589 return retval; 590 591 } 592 finally { 593 lock.readLock().release(); 594 } 595 } 596 catch(InterruptedException e) { 597 log.error("failed acquiring read lock", e); 598 return null; 599 } 600 } 601 602 603 610 public List getMessagesInList(List missing_msgs) { 611 List ret=new List(); 612 613 if(missing_msgs == null) { 614 if(log.isErrorEnabled()) log.error("argument list is null"); 615 return ret; 616 } 617 618 try { 619 lock.readLock().acquire(); 620 try { 621 Long seqno; 622 Message msg; 623 for(Enumeration en=missing_msgs.elements(); en.hasMoreElements();) { 624 seqno=(Long )en.nextElement(); 625 msg=(Message)delivered_msgs.get(seqno); 626 if(msg != null) 627 ret.add(msg.copy()); 628 msg=(Message)received_msgs.get(seqno); 629 if(msg != null) 630 ret.add(msg.copy()); 631 } 632 return ret; 633 } 634 finally { 635 lock.readLock().release(); 636 } 637 } 638 catch(InterruptedException e) { 639 log.error("failed acquiring read lock", e); 640 return null; 641 } 642 } 643 644 645 public int size() { 646 boolean acquired=false; 647 try { 648 lock.readLock().acquire(); 649 acquired=true; 650 } 651 catch(InterruptedException e) {} 652 try { 653 return received_msgs.size(); 654 } 655 finally { 656 if(acquired) 657 lock.readLock().release(); 658 } 659 } 660 661 662 public String toString() { 663 StringBuffer sb=new StringBuffer (); 664 try { 665 lock.readLock().acquire(); 666 try { 667 sb.append("received_msgs: " + printReceivedMessages()); 668 sb.append(", delivered_msgs: " + printDeliveredMessages()); 669 } 670 finally { 671 lock.readLock().release(); 672 } 673 } 674 catch(InterruptedException e) { 675 log.error("failed acquiring read lock", e); 676 return ""; 677 } 678 679 return sb.toString(); 680 } 681 682 683 687 String printDeliveredMessages() { 688 StringBuffer sb=new StringBuffer (); 689 Long min=null, max=null; 690 691 if(delivered_msgs.size() > 0) { 692 try {min=(Long )delivered_msgs.firstKey();} catch(NoSuchElementException ex) {} 693 try {max=(Long )delivered_msgs.lastKey();} catch(NoSuchElementException ex) {} 694 } 695 sb.append('[').append(min).append(" - ").append(max).append(']'); 696 return sb.toString(); 697 } 698 699 700 704 String printReceivedMessages() { 705 StringBuffer sb=new StringBuffer (); 706 sb.append('['); 707 if(received_msgs.size() > 0) { 708 Long first=null, last=null; 709 try {first=(Long )received_msgs.firstKey();} catch(NoSuchElementException ex) {} 710 try {last=(Long )received_msgs.lastKey();} catch(NoSuchElementException ex) {} 711 sb.append(first).append(" - ").append(last); 712 int non_received=0; 713 Map.Entry entry; 714 715 for(Iterator it=received_msgs.entrySet().iterator(); it.hasNext();) { 716 entry=(Map.Entry)it.next(); 717 if(entry.getValue() == null) 718 non_received++; 719 } 720 sb.append(" (size=").append(received_msgs.size()).append(", missing=").append(non_received).append(')'); 721 } 722 sb.append(']'); 723 return sb.toString(); 724 } 725 726 727 728 729 733 private void updateLowestSeen() { 734 Long lowest_seqno=null; 735 736 740 749 750 if(delivered_msgs.size() > 0) { 752 try { 753 lowest_seqno=(Long )delivered_msgs.firstKey(); 754 if(lowest_seqno != null) 755 lowest_seen=lowest_seqno.longValue(); 756 } 757 catch(NoSuchElementException ex) { 758 } 759 } 760 else { 762 if(received_msgs.size() > 0) { 763 try { 764 lowest_seqno=(Long )received_msgs.firstKey(); 765 if(received_msgs.get(lowest_seqno) != null) { lowest_seen=lowest_seqno.longValue(); 767 } 768 } 769 catch(NoSuchElementException ex) {} 770 } 771 } 772 } 773 774 775 780 824 825 832 private void _reset() { 833 received_msgs.clear(); 834 delivered_msgs.clear(); 835 head=0; 836 tail=0; 837 lowest_seen=0; 838 highest_seen=0; 839 } 840 841 842 843 } 844 | Popular Tags |