1 3 package org.jgroups.stack; 4 5 6 import org.apache.commons.logging.Log; 7 import org.apache.commons.logging.LogFactory; 8 import org.jgroups.Address; 9 import org.jgroups.Event; 10 import org.jgroups.Message; 11 import org.jgroups.util.Queue; 12 import org.jgroups.util.Util; 13 14 import java.util.HashMap ; 15 16 17 26 public class AckSenderWindow implements Retransmitter.RetransmitCommand { 27 RetransmitCommand retransmit_command = null; final HashMap msgs = new HashMap (); long[] interval = new long[]{400,800,1200,1600}; 30 final Retransmitter retransmitter = new Retransmitter(null, this); 31 final Queue msg_queue = new Queue(); int window_size = -1; 34 35 int min_threshold = -1; 36 boolean use_sliding_window = false, queueing = false; 37 Protocol transport = null; static final Log log=LogFactory.getLog(AckSenderWindow.class); 39 40 41 public interface RetransmitCommand { 42 void retransmit(long seqno, Message msg); 43 } 44 45 46 52 public AckSenderWindow(RetransmitCommand com) { 53 retransmit_command = com; 54 retransmitter.setRetransmitTimeouts(interval); 55 } 56 57 58 public AckSenderWindow(RetransmitCommand com, long[] interval) { 59 retransmit_command = com; 60 this.interval = interval; 61 retransmitter.setRetransmitTimeouts(interval); 62 } 63 64 68 public AckSenderWindow(RetransmitCommand com, long[] interval, Protocol transport) { 69 retransmit_command = com; 70 this.interval = interval; 71 this.transport = transport; 72 retransmitter.setRetransmitTimeouts(interval); 73 } 74 75 76 public void setWindowSize(int window_size, int min_threshold) { 77 this.window_size = window_size; 78 this.min_threshold = min_threshold; 79 80 if (min_threshold > window_size) { 82 this.min_threshold = window_size; 83 this.window_size = min_threshold; 84 if(log.isWarnEnabled()) log.warn("min_threshold (" + min_threshold + 85 ") has to be less than window_size ( " + window_size + "). Values are swapped"); 86 } 87 if (this.window_size <= 0) { 88 this.window_size = this.min_threshold > 0 ? (int) (this.min_threshold * 1.5) : 1000; 89 if(log.isWarnEnabled()) log.warn("window_size is <= 0, setting it to " + this.window_size); 90 } 91 if (this.min_threshold <= 0) { 92 this.min_threshold = this.window_size > 0 ? (int) (this.window_size * 0.5) : 250; 93 if(log.isWarnEnabled()) log.warn("min_threshold is <= 0, setting it to " + this.min_threshold); 94 } 95 96 if(log.isTraceEnabled()) 97 log.trace("window_size=" + this.window_size + ", min_threshold=" + this.min_threshold); 98 use_sliding_window = true; 99 } 100 101 102 public void reset() { 103 synchronized (msgs) { 104 msgs.clear(); 105 } 106 107 retransmitter.reset(); 110 } 111 112 113 120 public void add(long seqno, Message msg) { 121 Long tmp=new Long (seqno); 122 123 synchronized(msgs) { 124 if(msgs.containsKey(tmp)) 125 return; 126 127 if(!use_sliding_window) { 128 addMessage(seqno, tmp, msg); 129 } 130 else { if(queueing) 132 addToQueue(seqno, msg); 133 else { 134 if(msgs.size() + 1 > window_size) { 135 queueing=true; 136 addToQueue(seqno, msg); 137 if(log.isTraceEnabled()) 138 log.trace("window_size (" + window_size + ") was exceeded, " + 139 "starting to queue messages until window size falls under " + min_threshold); 140 } 141 else { 142 addMessage(seqno, tmp, msg); 143 } 144 } 145 } 146 } 147 } 148 149 150 156 public void ack(long seqno) { 157 Long tmp=new Long (seqno); 158 Entry entry; 159 160 synchronized(msgs) { 161 msgs.remove(tmp); 162 retransmitter.remove(seqno); 163 164 if(use_sliding_window && queueing) { 165 if(msgs.size() < min_threshold) { if(log.isTraceEnabled()) 167 log.trace("number of messages in table fell under min_threshold (" + 168 min_threshold + "): adding " + msg_queue.size() + " messages on queue"); 169 170 while(msgs.size() < window_size) { 171 if((entry=removeFromQueue()) != null) 172 addMessage(entry.seqno, new Long (entry.seqno), entry.msg); 173 else 174 break; 175 } 176 177 if(msgs.size() + 1 > window_size) { 178 if(log.isTraceEnabled()) 179 log.trace("exceeded window_size (" + window_size + ") again, will still queue"); 180 return; } 182 else 183 queueing=false; 185 if(log.isTraceEnabled()) log.trace("set queueing to false (table size=" + msgs.size() + ')'); 186 } 187 } 188 } 189 } 190 191 192 public String toString() { 193 return msgs.keySet().toString() + " (retransmitter: " + retransmitter.toString() + ')'; 194 } 195 196 197 public void retransmit(long first_seqno, long last_seqno, Address sender) { 198 Message msg; 199 200 if(retransmit_command != null) { 201 for(long i = first_seqno; i <= last_seqno; i++) { 204 if((msg = (Message) msgs.get(new Long (i))) != null) { retransmit_command.retransmit(i, msg); 206 } 207 } 208 } 209 } 210 211 212 213 214 215 216 217 void addMessage(long seqno, Long tmp, Message msg) { 218 if (transport != null) 219 transport.passDown(new Event(Event.MSG, msg)); 220 msgs.put(tmp, msg); 221 retransmitter.add(seqno, seqno); 222 } 223 224 void addToQueue(long seqno, Message msg) { 225 try { 226 msg_queue.add(new Entry(seqno, msg)); 227 } 228 catch(Exception ex) { 229 if(log.isErrorEnabled()) log.error("exception=" + ex); 230 } 231 } 232 233 Entry removeFromQueue() { 234 try { 235 return msg_queue.size() == 0 ? null : (Entry)msg_queue.remove(); 236 } 237 catch(Exception ex) { 238 if(log.isErrorEnabled()) log.error("exception=" + ex); 239 return null; 240 } 241 } 242 243 244 245 246 247 248 class Entry { 249 final long seqno; 250 final Message msg; 251 252 Entry(long seqno, Message msg) { 253 this.seqno = seqno; 254 this.msg = msg; 255 } 256 } 257 258 259 static class Dummy implements RetransmitCommand { 260 final long last_xmit_req = 0; 261 long curr_time; 262 263 264 public void retransmit(long seqno, Message msg) { 265 266 if(log.isDebugEnabled()) log.debug("seqno=" + seqno); 267 268 curr_time = System.currentTimeMillis(); 269 } 270 } 271 272 273 public static void main(String [] args) { 274 long[] xmit_timeouts = {1000, 2000, 3000, 4000}; 275 AckSenderWindow win = new AckSenderWindow(new Dummy(), xmit_timeouts); 276 277 278 279 final int NUM = 1000; 280 281 for (int i = 1; i < NUM; i++) 282 win.add(i, new Message()); 283 284 285 System.out.println(win); 286 Util.sleep(5000); 287 288 for (int i = 1; i < NUM; i++) { 289 if (i % 2 == 0) win.ack(i); 291 } 292 293 System.out.println(win); 294 Util.sleep(4000); 295 296 for (int i = 1; i < NUM; i++) { 297 if (i % 2 != 0) win.ack(i); 299 } 300 System.out.println(win); 301 302 if (true) { 303 Util.sleep(4000); 304 System.out.println("--done--"); 305 return; 306 } 307 308 309 win.add(3, new Message()); 310 win.add(5, new Message()); 311 win.add(4, new Message()); 312 win.add(8, new Message()); 313 win.add(9, new Message()); 314 win.add(6, new Message()); 315 win.add(7, new Message()); 316 win.add(3, new Message()); 317 System.out.println(win); 318 319 320 try { 321 Thread.sleep(5000); 322 win.ack(5); 323 System.out.println("ack(5)"); 324 win.ack(4); 325 System.out.println("ack(4)"); 326 win.ack(6); 327 System.out.println("ack(6)"); 328 win.ack(7); 329 System.out.println("ack(7)"); 330 win.ack(8); 331 System.out.println("ack(8)"); 332 win.ack(6); 333 System.out.println("ack(6)"); 334 win.ack(9); 335 System.out.println("ack(9)"); 336 System.out.println(win); 337 338 Thread.sleep(5000); 339 win.ack(3); 340 System.out.println("ack(3)"); 341 System.out.println(win); 342 343 Thread.sleep(3000); 344 win.add(10, new Message()); 345 win.add(11, new Message()); 346 System.out.println(win); 347 Thread.sleep(3000); 348 win.ack(10); 349 System.out.println("ack(10)"); 350 win.ack(11); 351 System.out.println("ack(11)"); 352 System.out.println(win); 353 354 win.add(12, new Message()); 355 win.add(13, new Message()); 356 win.add(14, new Message()); 357 win.add(15, new Message()); 358 win.add(16, new Message()); 359 System.out.println(win); 360 361 Util.sleep(1000); 362 win.ack(12); 363 System.out.println("ack(12)"); 364 win.ack(13); 365 System.out.println("ack(13)"); 366 367 win.ack(15); 368 System.out.println("ack(15)"); 369 System.out.println(win); 370 371 Util.sleep(5000); 372 win.ack(16); 373 System.out.println("ack(16)"); 374 System.out.println(win); 375 376 Util.sleep(1000); 377 378 win.ack(14); 379 System.out.println("ack(14)"); 380 System.out.println(win); 381 } catch (Exception e) { 382 System.err.println(e); 383 } 384 } 385 386 } 387 | Popular Tags |