KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > protocols > FRAG_Test


package org.jgroups.protocols;

import junit.framework.Test;
import junit.framework.TestCase;
import junit.framework.TestSuite;
import org.jgroups.Event;
import org.jgroups.Global;
import org.jgroups.Message;
import org.jgroups.View;
import org.jgroups.debug.Simulator;
import org.jgroups.stack.IpAddress;
import org.jgroups.stack.Protocol;

import java.nio.ByteBuffer;
import java.util.Properties;
import java.util.Vector;
17
18 /**
19  * Tests the fragmentation (FRAG) protocol for http://jira.jboss.com/jira/browse/JGRP-215
20  * @author Bela Ban
21  */

22 public class FRAG_Test extends TestCase {
23     IpAddress a1;
24     Vector JavaDoc members;
25     View v;
26     static Simulator s=null;
27     static int num_done=0;
28
29     static Sender[] senders=null;
30
31     public static final int SIZE=10000; // bytes
32
public static final int NUM_MSGS=10;
33     public static final int NUM_THREADS=100;
34
35
36     public FRAG_Test(String JavaDoc name) {
37         super(name);
38     }
39
40
41     public void setUp() throws Exception JavaDoc {
42         super.setUp();
43         a1=new IpAddress(1111);
44         members=new Vector JavaDoc();
45         members.add(a1);
46         v=new View(a1, 1, members);
47         s=new Simulator();
48         s.setLocalAddress(a1);
49         s.setView(v);
50         s.addMember(a1);
51         Protocol frag=createProtocol();
52         System.out.println("protocol to be tested: " + frag);
53         Properties JavaDoc props=new Properties JavaDoc();
54         props.setProperty("frag_size", "512");
55         props.setProperty("up_thread", "false");
56         props.setProperty("down_thread", "false");
57         frag.setPropertiesInternal(props);
58         Protocol[] stack=new Protocol[]{frag};
59         s.setProtocolStack(stack);
60         s.start();
61     }
62
63
64     protected Protocol createProtocol() {
65         return new FRAG();
66     }
67
68     public void tearDown() throws Exception JavaDoc {
69         super.tearDown();
70         s.stop();
71     }
72
73
74
75     public void testFragmentation() throws InterruptedException JavaDoc {
76         FRAG_Test.Receiver r=new FRAG_Test.Receiver();
77         s.setReceiver(r);
78
79         senders=new Sender[NUM_THREADS];
80         for(int i=0; i < senders.length; i++) {
81             senders[i]=new Sender(i);
82         }
83
84         for(int i=0; i < senders.length; i++) {
85             Sender sender=senders[i];
86             sender.start();
87         }
88
89         for(int i=0; i < senders.length; i++) {
90             Sender sender=senders[i];
91             sender.join(5000);
92             if(sender.isAlive()) {
93                 System.err.println("sender #" + i + " could not be joined (still alive)");
94             }
95         }
96
97         int sent=0, received=0, corrupted=0;
98         for(int i=0; i < senders.length; i++) {
99             Sender sender=senders[i];
100             received+=sender.getNumReceived();
101             sent+=sender.getNumSent();
102             corrupted+=sender.getNumCorrupted();
103         }
104
105         System.out.println("sent: " + sent + ", received: " + received + ", corrupted: " + corrupted);
106         assertEquals("sent and received should be the same", sent, received);
107         assertEquals("we should have 0 corrupted messages", 0, corrupted);
108     }
109
110
111
112
113     static class Sender extends Thread JavaDoc {
114         int id=-1;
115         int num_sent=0;
116         int num_received=0;
117         int num_corrupted=0;
118         boolean done=false;
119
120         public int getIdent() {
121             return id;
122         }
123
124         public int getNumReceived() {
125             return num_received;
126         }
127
128         public int getNumSent() {
129             return num_sent;
130         }
131
132         public int getNumCorrupted() {
133             return num_corrupted;
134         }
135
136         public Sender(int id) {
137             super("sender #" + id);
138             this.id=id;
139         }
140
141         public void run() {
142             byte[] buf=createBuffer(id);
143             Message msg;
144             Event evt;
145
146             for(int i=0; i < NUM_MSGS; i++) {
147                 msg=new Message(null, null, buf);
148                 evt=new Event(Event.MSG, msg);
149                 s.send(evt);
150                 num_sent++;
151             }
152
153             synchronized(this) {
154                 try {
155                     while(!done)
156                         this.wait(500);
157                     num_done++;
158                     System.out.println("thread #" + id + " is done (" + num_done + ")");
159                 }
160                 catch(InterruptedException JavaDoc e) {
161                 }
162             }
163         }
164
165         private byte[] createBuffer(int id) {
166             ByteBuffer JavaDoc buf=ByteBuffer.allocate(SIZE);
167             int elements=SIZE / Global.INT_SIZE;
168             for(int i=0; i < elements; i++) {
169                 buf.putInt(id);
170             }
171             return buf.array();
172         }
173
174         /** 1 int has already been read by the Receiver */
175         public void verify(ByteBuffer JavaDoc buf) {
176             boolean corrupted=false;
177
178             int num_elements=(SIZE / Global.INT_SIZE) -1;
179             int tmp;
180             for(int i=0; i < num_elements; i++) {
181                 tmp=buf.getInt();
182                 if(tmp != id) {
183                     corrupted=true;
184                     break;
185                 }
186             }
187
188             if(corrupted)
189                 num_corrupted++;
190             else
191                 num_received++;
192
193             if(num_corrupted + num_received >= NUM_MSGS) {
194                 synchronized(this) {
195                     done=true;
196                     this.notify();
197                 }
198             }
199         }
200     }
201
202     static class Receiver implements Simulator.Receiver {
203         int received=0;
204
205         public void receive(Event evt) {
206             if(evt.getType() == Event.MSG) {
207                 received++;
208                 if(received % 1000 == 0)
209                     System.out.println("<== " + received);
210
211                 Message msg=(Message)evt.getArg();
212                 byte[] data=msg.getBuffer();
213                 ByteBuffer JavaDoc buf=ByteBuffer.wrap(data);
214                 int id=buf.getInt();
215                 Sender sender=senders[id];
216                 sender.verify(buf);
217             }
218         }
219     }
220
221
222
223     public static Test suite() {
224         return new TestSuite(FRAG_Test.class);
225     }
226
227     public static void main(String JavaDoc[] args) {
228         junit.textui.TestRunner.run(FRAG_Test.suite());
229     }
230 }
231
Popular Tags