1 package org.jgroups.protocols; 2 3 import junit.framework.Test; 4 import junit.framework.TestCase; 5 import junit.framework.TestSuite; 6 import org.jgroups.Event; 7 import org.jgroups.Global; 8 import org.jgroups.Message; 9 import org.jgroups.View; 10 import org.jgroups.debug.Simulator; 11 import org.jgroups.stack.IpAddress; 12 import org.jgroups.stack.Protocol; 13 14 import java.nio.ByteBuffer ; 15 import java.util.Properties ; 16 import java.util.Vector ; 17 18 22 public class FRAG_Test extends TestCase { 23 IpAddress a1; 24 Vector members; 25 View v; 26 static Simulator s=null; 27 static int num_done=0; 28 29 static Sender[] senders=null; 30 31 public static final int SIZE=10000; public static final int NUM_MSGS=10; 33 public static final int NUM_THREADS=100; 34 35 36 public FRAG_Test(String name) { 37 super(name); 38 } 39 40 41 public void setUp() throws Exception { 42 super.setUp(); 43 a1=new IpAddress(1111); 44 members=new Vector (); 45 members.add(a1); 46 v=new View(a1, 1, members); 47 s=new Simulator(); 48 s.setLocalAddress(a1); 49 s.setView(v); 50 s.addMember(a1); 51 Protocol frag=createProtocol(); 52 System.out.println("protocol to be tested: " + frag); 53 Properties props=new Properties (); 54 props.setProperty("frag_size", "512"); 55 props.setProperty("up_thread", "false"); 56 props.setProperty("down_thread", "false"); 57 frag.setPropertiesInternal(props); 58 Protocol[] stack=new Protocol[]{frag}; 59 s.setProtocolStack(stack); 60 s.start(); 61 } 62 63 64 protected Protocol createProtocol() { 65 return new FRAG(); 66 } 67 68 public void tearDown() throws Exception { 69 super.tearDown(); 70 s.stop(); 71 } 72 73 74 75 public void testFragmentation() throws InterruptedException { 76 FRAG_Test.Receiver r=new FRAG_Test.Receiver(); 77 s.setReceiver(r); 78 79 senders=new Sender[NUM_THREADS]; 80 for(int i=0; i < senders.length; i++) { 81 senders[i]=new Sender(i); 82 } 83 84 for(int i=0; i < senders.length; i++) { 85 Sender sender=senders[i]; 86 sender.start(); 87 } 88 89 for(int i=0; i < senders.length; i++) { 90 Sender sender=senders[i]; 91 sender.join(5000); 92 if(sender.isAlive()) { 93 System.err.println("sender #" + i + " could not be joined (still alive)"); 94 } 95 } 96 97 int sent=0, received=0, corrupted=0; 98 for(int i=0; i < senders.length; i++) { 99 Sender sender=senders[i]; 100 received+=sender.getNumReceived(); 101 sent+=sender.getNumSent(); 102 corrupted+=sender.getNumCorrupted(); 103 } 104 105 System.out.println("sent: " + sent + ", received: " + received + ", corrupted: " + corrupted); 106 assertEquals("sent and received should be the same", sent, received); 107 assertEquals("we should have 0 corrupted messages", 0, corrupted); 108 } 109 110 111 112 113 static class Sender extends Thread { 114 int id=-1; 115 int num_sent=0; 116 int num_received=0; 117 int num_corrupted=0; 118 boolean done=false; 119 120 public int getIdent() { 121 return id; 122 } 123 124 public int getNumReceived() { 125 return num_received; 126 } 127 128 public int getNumSent() { 129 return num_sent; 130 } 131 132 public int getNumCorrupted() { 133 return num_corrupted; 134 } 135 136 public Sender(int id) { 137 super("sender #" + id); 138 this.id=id; 139 } 140 141 public void run() { 142 byte[] buf=createBuffer(id); 143 Message msg; 144 Event evt; 145 146 for(int i=0; i < NUM_MSGS; i++) { 147 msg=new Message(null, null, buf); 148 evt=new Event(Event.MSG, msg); 149 s.send(evt); 150 num_sent++; 151 } 152 153 synchronized(this) { 154 try { 155 while(!done) 156 this.wait(500); 157 num_done++; 158 System.out.println("thread #" + id + " is done (" + num_done + ")"); 159 } 160 catch(InterruptedException e) { 161 } 162 } 163 } 164 165 private byte[] createBuffer(int id) { 166 ByteBuffer buf=ByteBuffer.allocate(SIZE); 167 int elements=SIZE / Global.INT_SIZE; 168 for(int i=0; i < elements; i++) { 169 buf.putInt(id); 170 } 171 return buf.array(); 172 } 173 174 175 public void verify(ByteBuffer buf) { 176 boolean corrupted=false; 177 178 int num_elements=(SIZE / Global.INT_SIZE) -1; 179 int tmp; 180 for(int i=0; i < num_elements; i++) { 181 tmp=buf.getInt(); 182 if(tmp != id) { 183 corrupted=true; 184 break; 185 } 186 } 187 188 if(corrupted) 189 num_corrupted++; 190 else 191 num_received++; 192 193 if(num_corrupted + num_received >= NUM_MSGS) { 194 synchronized(this) { 195 done=true; 196 this.notify(); 197 } 198 } 199 } 200 } 201 202 static class Receiver implements Simulator.Receiver { 203 int received=0; 204 205 public void receive(Event evt) { 206 if(evt.getType() == Event.MSG) { 207 received++; 208 if(received % 1000 == 0) 209 System.out.println("<== " + received); 210 211 Message msg=(Message)evt.getArg(); 212 byte[] data=msg.getBuffer(); 213 ByteBuffer buf=ByteBuffer.wrap(data); 214 int id=buf.getInt(); 215 Sender sender=senders[id]; 216 sender.verify(buf); 217 } 218 } 219 } 220 221 222 223 public static Test suite() { 224 return new TestSuite(FRAG_Test.class); 225 } 226 227 public static void main(String [] args) { 228 junit.textui.TestRunner.run(FRAG_Test.suite()); 229 } 230 } 231 | Popular Tags |