1 package org.jgroups.tests; 3 4 5 6 import junit.framework.Test; 7 import junit.framework.TestCase; 8 import junit.framework.TestSuite; 9 import org.jgroups.Address; 10 import org.jgroups.Message; 11 import org.jgroups.stack.AckMcastSenderWindow; 12 import org.jgroups.stack.IpAddress; 13 14 import java.util.ArrayList ; 15 import java.util.Hashtable ; 16 import java.util.List ; 17 import java.util.Vector ; 18 import java.net.UnknownHostException ; 19 20 21 35 public class AckMcastSenderWindowTest extends TestCase { 36 private class Cmd 37 implements AckMcastSenderWindow.RetransmitCommand { 38 public void retransmit(long seqno, Message msg, Address addr) { 39 _retransmit(seqno, msg, addr); } 40 } 41 42 private class Acker extends Thread { public void run() { _ackerRun(); } } 43 44 45 46 private static Address[] _RECVS = { 47 new IpAddress(5000), 48 new IpAddress(5001), 49 new IpAddress(5002) 50 }; 51 52 53 54 private AckMcastSenderWindow.RetransmitCommand _cmd; 55 56 private AckMcastSenderWindow _win; 57 61 private Hashtable _tbl; 62 63 64 72 private void _put(long seqno, Address addr) { 73 List list; 74 75 synchronized(_tbl) { 76 if ((list = (List )_tbl.get(new Long (seqno))) == null) { 77 list = new ArrayList (); 78 _tbl.put(new Long (seqno), list); 79 } 80 if (!list.contains(addr)) list.add(addr); 81 else { if (list.isEmpty()) _tbl.remove(new Long (seqno)); } 82 } } 84 85 92 private void _remove(long seqno, Address addr) { 93 List list; 94 95 synchronized(_tbl) { 96 if ((list = (List )_tbl.get(new Long (seqno))) == null) return; 97 list.remove(addr); 98 if (list.isEmpty()) _tbl.remove(new Long (seqno)); 99 } } 101 102 105 private boolean _contains(long seqno, Address addr) { 106 List list; 107 108 synchronized(_tbl) { 109 if ((list = (List )_tbl.get(new Long (seqno))) == null) return(false); 110 return(list.contains(addr)); 111 } } 113 114 115 118 private void _ackerRun() { 119 _win.ack(2, _RECVS[2]); _remove(2, _RECVS[2]); 121 try { Thread.sleep(1000); 122 } catch(InterruptedException ex) { ex.printStackTrace(); } 123 124 _win.ack(1, _RECVS[1]); _remove(1, _RECVS[1]); 126 try { Thread.sleep(500); 127 } catch(InterruptedException ex) { ex.printStackTrace(); } 128 129 _win.ack(1, _RECVS[0]); _remove(1, _RECVS[0]); 133 _win.ack(2, _RECVS[0]); _remove(2, _RECVS[0]); 134 _win.ack(2, _RECVS[1]); _remove(2, _RECVS[1]); 135 try { Thread.sleep(500); 136 } catch(InterruptedException ex) { ex.printStackTrace(); } 137 138 _win.ack(1, _RECVS[2]); _remove(1, _RECVS[2]); 140 } 141 142 143 146 private void _retransmit(long seqno, Message msg, Address addr) { 147 if (!_contains(seqno, addr)) 148 fail("Acknowledging a non-existent msg, great!"); 149 else 150 System.out.println("retransmitting " + seqno); 151 } 152 153 154 160 public void test1() { 161 Vector dests = new Vector (); 162 Message msg = new Message(); 163 Acker acker = new Acker(); 164 long seqno; 165 166 for (int i = 0; i < _RECVS.length; ++i) dests.add(_RECVS[i]); 167 168 seqno = 1; 170 for (int i = 0; i < _RECVS.length; ++i) _put(seqno, _RECVS[i]); 171 _win.add(seqno, msg, dests); 172 173 seqno = 2; 175 for (int i = 0; i < _RECVS.length; ++i) _put(seqno, _RECVS[i]); 176 _win.add(seqno, msg, dests); 177 178 acker.start(); 180 try { acker.join(); 181 } catch(InterruptedException ex) { ex.printStackTrace(); } 182 183 _win.stop(); 184 } 186 187 public void testRemove() throws UnknownHostException { 188 AckMcastSenderWindow mywin=new AckMcastSenderWindow(new MyCommand(), new long[]{1000,2000,3000}); 189 Address sender1=new IpAddress("127.0.0.1", 10000); 190 Address sender2=new IpAddress("127.0.0.1", 10001); 191 Address sender3=new IpAddress("127.0.0.1", 10002); 192 Vector senders=new Vector (); 193 Message msg=new Message(); 194 long seqno=322649; 195 196 senders.addElement(sender1); 197 senders.addElement(sender2); 198 senders.addElement(sender3); 199 200 mywin.add(seqno, msg, (Vector )senders.clone()); 202 mywin.ack(seqno, sender1); 203 mywin.ack(seqno, sender2); 204 205 System.out.println("entry is " + mywin.printDetails(seqno)); 206 assertTrue(mywin.getNumberOfResponsesExpected(seqno) == 3); 207 assertTrue(mywin.getNumberOfResponsesReceived(seqno) == 2); 208 mywin.waitUntilAllAcksReceived(4000); 209 mywin.suspect(sender3); 210 assertTrue(mywin.size() == 0); } 212 213 214 public AckMcastSenderWindowTest(String name) { super(name); } 215 216 217 public void setUp() { 218 _cmd = new Cmd(); 219 _win = new AckMcastSenderWindow(_cmd); 220 _tbl = new Hashtable (); 221 } 222 223 public void tearDown() { 224 _win.stop(); 225 } 226 227 228 class MyCommand implements AckMcastSenderWindow.RetransmitCommand { 229 230 public void retransmit(long seqno, Message msg, Address dest) { 231 System.out.println("-- retransmitting " + seqno); 232 } 233 } 234 235 236 public static Test suite() { 237 TestSuite suite; 238 suite = new TestSuite(AckMcastSenderWindowTest.class); 239 return(suite); 240 } 241 public static void main(String [] args) { 242 String [] name = {AckMcastSenderWindowTest.class.getName()}; 243 junit.textui.TestRunner.main(name); 244 } 245 } 246 | Popular Tags |