1 3 package org.jgroups.stack; 4 5 6 import org.apache.commons.logging.Log; 7 import org.apache.commons.logging.LogFactory; 8 import org.jgroups.Address; 9 import org.jgroups.Message; 10 import org.jgroups.util.TimeScheduler; 11 12 import java.io.PrintWriter ; 13 import java.io.StringWriter ; 14 import java.util.*; 15 16 17 18 19 38 public class AckMcastSenderWindow { 39 47 public interface RetransmitCommand { 48 55 void retransmit(long seqno, Message msg, Address dest); 56 } 57 58 59 62 private static abstract class Task implements TimeScheduler.Task { 63 private final Interval intervals; 64 private boolean cancelled; 65 66 protected Task(long[] intervals) { 67 this.intervals = new Interval(intervals); 68 this.cancelled = false; 69 } 70 public long nextInterval() { return(intervals.next()); } 71 public void cancel() { cancelled = true; } 72 public boolean cancelled() { return(cancelled); } 73 } 74 75 76 79 private class Entry extends Task { 80 81 public final long seqno; 82 83 public Message msg = null; 84 85 public final Hashtable senders = new Hashtable(); 86 87 public int num_received = 0; 88 89 public Entry(long seqno, Message msg, Vector dests, long[] intervals) { 90 super(intervals); 91 this.seqno = seqno; 92 this.msg = msg; 93 for (int i = 0; i < dests.size(); i++) 94 senders.put(dests.elementAt(i), Boolean.FALSE); 95 } 96 97 boolean allReceived() { 98 return(num_received >= senders.size()); 99 } 100 101 102 public void run() { _retransmit(this); } 103 104 public String toString() { 105 StringBuffer buff = new StringBuffer (); 106 buff.append("num_received = " + num_received + 107 ", received msgs = " + senders); 108 return(buff.toString()); 109 } 110 } 111 112 113 114 private static final long SEC = 1000; 115 116 private static final long[] RETRANSMIT_TIMEOUTS = { 117 2*SEC, 118 3*SEC, 119 5*SEC, 120 8*SEC}; 121 122 private static final long SUSPEND_TIMEOUT = 2000; 123 124 protected static final Log log=LogFactory.getLog(AckMcastSenderWindow.class); 125 126 127 129 private final Hashtable msgs = new Hashtable(); 130 131 132 private final LinkedList suspects=new LinkedList(); 133 134 135 private final int max_suspects=20; 136 137 141 private final Vector stable_msgs = new Vector(); 142 143 private boolean waiting = false; 144 145 147 private boolean retransmitter_owned; 148 149 private TimeScheduler retransmitter = null; 150 151 private long[] retransmit_intervals; 152 153 private RetransmitCommand cmd = null; 154 155 156 159 private static String _toString(Throwable ex) { 160 StringWriter sw = new StringWriter (); 161 PrintWriter pw = new PrintWriter (sw); 162 ex.printStackTrace(pw); 163 return(sw.toString()); 164 } 165 166 167 171 private void _retransmit(Entry entry) { 172 Address sender; 173 boolean received; 174 175 synchronized(entry) { 176 for(Enumeration e = entry.senders.keys(); e.hasMoreElements();) { 177 sender = (Address)e.nextElement(); 178 received = ((Boolean )entry.senders.get(sender)).booleanValue(); 179 if (!received) { 180 if(suspects.contains(sender)) { 181 182 if(log.isWarnEnabled()) log.warn("removing " + sender + 183 " from retransmit list as it is in the suspect list"); 184 remove(sender); 185 continue; 186 } 187 188 if(log.isInfoEnabled()) log.info("--> retransmitting msg #" + 189 entry.seqno + " to " + sender); 190 cmd.retransmit(entry.seqno, entry.msg.copy(), sender); 191 } 192 } 193 } 194 } 195 196 197 211 private void init(RetransmitCommand cmd, long[] retransmit_intervals, 212 TimeScheduler sched, boolean sched_owned) { 213 if (cmd == null) { 214 if(log.isErrorEnabled()) log.error("command is null. Cannot retransmit " + "messages !"); 215 throw new IllegalArgumentException ("cmd"); 216 } 217 218 retransmitter_owned = sched_owned; 219 retransmitter = sched; 220 this.retransmit_intervals = retransmit_intervals; 221 this.cmd = cmd; 222 223 start(); 224 } 225 226 227 237 public AckMcastSenderWindow(RetransmitCommand cmd, 238 long[] retransmit_intervals, TimeScheduler sched) { 239 init(cmd, retransmit_intervals, sched, false); 240 } 241 242 243 251 public AckMcastSenderWindow(RetransmitCommand cmd, TimeScheduler sched) { 252 init(cmd, RETRANSMIT_TIMEOUTS, sched, false); 253 } 254 255 256 257 266 public AckMcastSenderWindow(RetransmitCommand cmd, long[] retransmit_intervals) { 267 init(cmd, retransmit_intervals, new TimeScheduler(SUSPEND_TIMEOUT), true); 268 } 269 270 277 public AckMcastSenderWindow(RetransmitCommand cmd) { 278 this(cmd, RETRANSMIT_TIMEOUTS); 279 } 280 281 282 290 public void add(long seqno, Message msg, Vector receivers) { 291 Entry e; 292 293 if (waiting) return; 294 if (receivers.size() == 0) return; 295 296 synchronized(msgs) { 297 if (msgs.get(new Long (seqno)) != null) return; 298 e = new Entry(seqno, msg, receivers, retransmit_intervals); 299 msgs.put(new Long (seqno), e); 300 retransmitter.add(e); 301 } 302 } 303 304 305 314 public void ack(long seqno, Address sender) { 315 Entry entry; 316 Boolean received; 317 318 synchronized(msgs) { 319 entry = (Entry)msgs.get(new Long (seqno)); 320 if (entry == null) return; 321 322 synchronized(entry) { 323 received = (Boolean )entry.senders.get(sender); 324 if (received == null || received.booleanValue()) return; 325 326 entry.senders.put(sender, Boolean.TRUE); 328 entry.num_received++; 329 if (!entry.allReceived()) return; 330 } 331 332 synchronized(stable_msgs) { 333 entry.cancel(); 334 msgs.remove(new Long (seqno)); 335 stable_msgs.add(new Long (seqno)); 336 } 337 msgs.notifyAll(); 339 } 340 } 341 342 343 349 public void remove(Address obj) { 350 Long key; 351 Entry entry; 352 353 synchronized(msgs) { 354 for (Enumeration e = msgs.keys(); e.hasMoreElements();) { 355 key = (Long )e.nextElement(); 356 entry = (Entry)msgs.get(key); 357 synchronized(entry) { 358 Boolean received = (Boolean )entry.senders.remove(obj); 361 if(received == null) continue; if (received.booleanValue()) entry.num_received--; 363 if (!entry.allReceived()) continue; 364 } 365 synchronized(stable_msgs) { 366 entry.cancel(); 367 msgs.remove(key); 368 stable_msgs.add(key); 369 } 370 msgs.notifyAll(); 372 } 373 } 374 } 375 376 377 384 public void suspect(Address suspected) { 385 386 if(log.isInfoEnabled()) log.info("suspect is " + suspected); 387 remove(suspected); 388 suspects.add(suspected); 389 if(suspects.size() >= max_suspects) 390 suspects.removeFirst(); 391 } 392 393 394 398 public Vector getStableMessages() { 399 Vector retval; 400 401 synchronized(stable_msgs) { 402 retval = (stable_msgs.size() > 0)? (Vector)stable_msgs.clone():null; 403 if (stable_msgs.size() > 0) stable_msgs.clear(); 404 } 405 406 return(retval); 407 } 408 409 410 public void clearStableMessages() { 411 synchronized(stable_msgs) { 412 stable_msgs.clear(); 413 } 414 } 415 416 417 420 public long size() { 421 synchronized(msgs) { 422 return(msgs.size()); 423 } 424 } 425 426 427 428 public long getNumberOfResponsesExpected(long seqno) { 429 Entry entry=(Entry)msgs.get(new Long (seqno)); 430 if(entry != null) 431 return entry.senders.size(); 432 else 433 return -1; 434 } 435 436 437 public long getNumberOfResponsesReceived(long seqno) { 438 Entry entry=(Entry)msgs.get(new Long (seqno)); 439 if(entry != null) 440 return entry.num_received; 441 else 442 return -1; 443 } 444 445 446 public String printDetails(long seqno) { 447 Entry entry=(Entry)msgs.get(new Long (seqno)); 448 if(entry != null) 449 return entry.toString(); 450 else 451 return null; 452 } 453 454 455 463 public void waitUntilAllAcksReceived(long timeout) { 464 long time_to_wait, start_time, current_time; 465 Address suspect; 466 467 for(Iterator it=suspects.iterator(); it.hasNext();) { 469 suspect=(Address)it.next(); 470 remove(suspect); 471 } 472 473 time_to_wait = timeout; 474 waiting = true; 475 if (timeout <= 0) { 476 synchronized(msgs) { 477 while(msgs.size() > 0) 478 try { msgs.wait(); } catch(InterruptedException ex) {} 479 } 480 } else { 481 start_time = System.currentTimeMillis(); 482 synchronized(msgs) { 483 while(msgs.size() > 0) { 484 current_time = System.currentTimeMillis(); 485 time_to_wait = timeout - (current_time - start_time); 486 if (time_to_wait <= 0) break; 487 488 try { 489 msgs.wait(time_to_wait); 490 } catch(InterruptedException ex) { 491 if(log.isWarnEnabled()) log.warn(ex.toString()); 492 } 493 } 494 } 495 } 496 waiting = false; 497 } 498 499 500 501 502 506 public void start() { 507 if (retransmitter_owned) 508 retransmitter.start(); 509 } 510 511 512 521 public void stop() { 522 Entry entry; 523 524 synchronized(msgs) { 527 if (retransmitter_owned) { 528 try { 529 retransmitter.stop(); 530 } catch(InterruptedException ex) { 531 if(log.isErrorEnabled()) log.error(_toString(ex)); 532 } 533 } else { 534 for (Enumeration e = msgs.elements(); e.hasMoreElements();) { 535 entry = (Entry)e.nextElement(); 536 entry.cancel(); 537 } 538 } 539 msgs.clear(); 540 msgs.notifyAll(); 542 } 543 } 544 545 546 550 public void reset() { 551 Entry entry; 552 553 if (waiting) return; 554 555 synchronized(msgs) { 556 for (Enumeration e = msgs.elements(); e.hasMoreElements();) { 557 entry = (Entry)e.nextElement(); 558 entry.cancel(); 559 } 560 msgs.clear(); 561 msgs.notifyAll(); 562 } 563 } 564 565 566 public String toString() { 567 StringBuffer ret; 568 Entry entry; 569 Long key; 570 571 ret = new StringBuffer (); 572 synchronized(msgs) { 573 ret.append("msgs: (" + msgs.size() + ')'); 574 for (Enumeration e = msgs.keys(); e.hasMoreElements();) { 575 key = (Long )e.nextElement(); 576 entry = (Entry)msgs.get(key); 577 ret.append("key = " + key + ", value = " + entry + '\n'); 578 } 579 synchronized(stable_msgs) { 580 ret.append("\nstable_msgs: " + stable_msgs); 581 } 582 } 583 584 return(ret.toString()); 585 } 586 } 587 | Popular Tags |