1 3 package org.jgroups.stack; 4 5 import org.apache.commons.logging.Log; 6 import org.apache.commons.logging.LogFactory; 7 import org.jgroups.Address; 8 import org.jgroups.util.TimeScheduler; 9 import org.jgroups.util.Util; 10 11 import java.util.ArrayList ; 12 import java.util.LinkedList ; 13 import java.util.ListIterator ; 14 15 16 31 public class Retransmitter { 32 33 private static final long SEC=1000; 34 35 private static long[] RETRANSMIT_TIMEOUTS={2 * SEC, 3 * SEC, 5 * SEC, 8 * SEC}; 36 37 private static final long SUSPEND_TIMEOUT=2000; 38 39 private Address sender=null; 40 private final LinkedList msgs=new LinkedList (); 41 private RetransmitCommand cmd=null; 42 private boolean retransmitter_owned; 43 private TimeScheduler retransmitter=null; 44 protected static final Log log=LogFactory.getLog(Retransmitter.class); 45 46 47 48 public interface RetransmitCommand { 49 59 void retransmit(long first_seqno, long last_seqno, Address sender); 60 } 61 62 63 69 public Retransmitter(Address sender, RetransmitCommand cmd, TimeScheduler sched) { 70 init(sender, cmd, sched, false); 71 } 72 73 74 79 public Retransmitter(Address sender, RetransmitCommand cmd) { 80 init(sender, cmd, new TimeScheduler(SUSPEND_TIMEOUT), true); 81 } 82 83 84 public void setRetransmitTimeouts(long[] timeouts) { 85 if(timeouts != null) 86 RETRANSMIT_TIMEOUTS=timeouts; 87 } 88 89 90 99 public void add(long first_seqno, long last_seqno) { 100 Entry e; 101 102 if(first_seqno > last_seqno) { 103 long tmp=first_seqno; 104 first_seqno=last_seqno; 105 last_seqno=tmp; 106 } 107 synchronized(msgs) { 108 e=new Entry(first_seqno, last_seqno, RETRANSMIT_TIMEOUTS); 109 msgs.add(e); 110 retransmitter.add(e); 111 } 112 } 113 114 120 public void remove(long seqno) { 121 Entry e; 122 123 synchronized(msgs) { 124 for(ListIterator it=msgs.listIterator(); it.hasNext();) { 125 e=(Entry)it.next(); 126 synchronized(e) { 127 if(seqno < e.low || seqno > e.high) continue; 128 e.remove(seqno); 129 if(e.low > e.high) { 130 e.cancel(); 131 it.remove(); 132 } 133 } 134 break; 135 } 136 } 137 } 138 139 143 public void reset() { 144 Entry entry; 145 146 synchronized(msgs) { 147 for(ListIterator it=msgs.listIterator(); it.hasNext();) { 148 entry=(Entry)it.next(); 149 entry.cancel(); 150 } 151 msgs.clear(); 152 } 153 } 154 155 164 public void stop() { 165 Entry entry; 166 167 synchronized(msgs) { 170 if(retransmitter_owned) { 171 try { 172 retransmitter.stop(); 173 } 174 catch(InterruptedException ex) { 175 if(log.isErrorEnabled()) log.error(Util.printStackTrace(ex)); 176 } 177 } 178 else { 179 for(ListIterator it=msgs.listIterator(); it.hasNext();) { 180 entry=(Entry)it.next(); 181 entry.cancel(); 182 } 183 } 184 msgs.clear(); 185 } 186 } 187 188 189 public String toString() { 190 return (msgs.size() + " messages to retransmit: (" + msgs.toString() + ')'); 191 } 192 193 194 195 196 197 198 199 208 private void init(Address sender, RetransmitCommand cmd, TimeScheduler sched, boolean sched_owned) { 209 this.sender=sender; 210 this.cmd=cmd; 211 retransmitter_owned=sched_owned; 212 retransmitter=sched; 213 } 214 215 216 217 218 219 220 223 private static abstract class Task implements TimeScheduler.Task { 224 private final Interval intervals; 225 private boolean cancelled; 226 227 protected Task(long[] intervals) { 228 this.intervals=new Interval(intervals); 229 this.cancelled=false; 230 } 231 232 public long nextInterval() { 233 return (intervals.next()); 234 } 235 236 public boolean cancelled() { 237 return (cancelled); 238 } 239 240 public void cancel() { 241 cancelled=true; 242 } 243 } 244 245 246 257 private class Entry extends Task { 258 public long low; 259 public long high; 260 public final java.util.List list; 261 262 public Entry(long low, long high, long[] intervals) { 263 super(intervals); 264 this.low=low; 265 this.high=high; 266 list=new ArrayList (); 267 list.add(new long[]{low, high}); 268 } 269 270 288 public void remove(long seqno) { 289 int i; 290 long[] bounds, newBounds; 291 292 bounds=null; 293 synchronized(this) { 294 for(i=0; i < list.size(); ++i) { 295 bounds=(long[])list.get(i); 296 if(seqno < bounds[0] || seqno > bounds[1]) continue; 297 break; 298 } 299 if(i == list.size()) return; 300 301 if(seqno == bounds[0]) { 302 if(bounds[0] == bounds[1]) 303 list.remove(i); 304 else 305 bounds[0]++; 306 if(i == 0) 307 low=list.size() == 0 ? high + 1 : ((long[])list.get(i))[0]; 308 } 309 else if(seqno == bounds[1]) { 310 bounds[1]--; 311 if(i == list.size() - 1) high=((long[])list.get(i))[1]; 312 } 313 else { 314 newBounds=new long[2]; 315 newBounds[0]=seqno + 1; 316 newBounds[1]=bounds[1]; 317 bounds[1]=seqno - 1; 318 list.add(i + 1, newBounds); 319 } 320 } 321 } 322 323 327 public void run() { 328 long[] bounds; 329 330 synchronized(this) { 331 for(int i=0; i < list.size(); ++i) { 332 bounds=(long[])list.get(i); 333 cmd.retransmit(bounds[0], bounds[1], sender); 334 } 335 } 336 } 337 338 public String toString() { 339 StringBuffer sb=new StringBuffer (); 340 if(low == high) 341 sb.append(low); 342 else 343 sb.append(low).append(':').append(high); 344 return sb.toString(); 345 } 346 347 } 349 350 public static void main(String [] args) { 351 Retransmitter xmitter; 352 Address sender; 353 354 try { 355 sender=new org.jgroups.stack.IpAddress("localhost", 5555); 356 xmitter=new Retransmitter(sender, new MyXmitter()); 357 xmitter.setRetransmitTimeouts(new long[]{1000, 2000, 4000, 8000}); 358 359 xmitter.add(1, 10); 360 xmitter.remove(1); 361 xmitter.remove(2); 362 xmitter.remove(4); 363 364 Util.sleep(3000); 365 xmitter.remove(3); 366 367 Util.sleep(1000); 368 xmitter.remove(10); 369 xmitter.remove(8); 370 xmitter.remove(6); 371 xmitter.remove(7); 372 xmitter.remove(9); 373 xmitter.remove(5); 374 } 375 catch(Exception e) { 376 System.err.println(e); 377 } 378 } 379 380 381 static class MyXmitter implements Retransmitter.RetransmitCommand { 382 383 public void retransmit(long first_seqno, long last_seqno, Address sender) { 384 System.out.println("-- " + new java.util.Date () + ": retransmit(" + first_seqno + ", " + 385 last_seqno + ", " + sender + ')'); 386 } 387 } 388 389 static void sleep(long timeout) { 390 Util.sleep(timeout); 391 } 392 393 } 394 395 | Popular Tags |