KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > tests > stack > RouterTest


1 // $Id: RouterTest.java,v 1.8 2005/04/25 08:25:49 belaban Exp $
2

3 package org.jgroups.tests.stack;
4
5 import junit.framework.Test;
6 import junit.framework.TestCase;
7 import junit.framework.TestSuite;
8 import org.jgroups.Address;
9 import org.jgroups.Message;
10 import org.jgroups.stack.GossipRouter;
11 import org.jgroups.stack.IpAddress;
12 import org.jgroups.util.List;
13 import org.jgroups.util.Promise;
14 import org.jgroups.util.Util;
15 import org.apache.commons.logging.Log;
16 import org.apache.commons.logging.LogFactory;
17
18 import java.io.DataInputStream JavaDoc;
19 import java.io.DataOutputStream JavaDoc;
20 import java.net.Socket JavaDoc;
21 import java.util.Random JavaDoc;
22
23 /**
24  * Tests routing protocol primitives with the new GossipRouter. Since 2.2.1,
25  * the GossipRouter is supposed to answer Gossip requests too.
26  * <p/>
27  * Note: Disable DEBUG logging before this test, otherwise the stress tests
28  * may timeout.
29  *
30  * @author Ovidiu Feodorov <ovidiuf@users.sourceforge.net>
31  * @version $Revision: 1.8 $
32  * @since 2.2.1
33  */

34 public class RouterTest extends TestCase {
35
36     private static final Log log = LogFactory.getLog(RouterTest.class);
37
38     private int routerPort=-1;
39     private Random JavaDoc random=new Random JavaDoc();
40
41     public RouterTest(String JavaDoc name) {
42         super(name);
43     }
44
45     public void setUp() throws Exception JavaDoc {
46         super.setUp();
47         routerPort=Utilities.startGossipRouter();
48     }
49
50     public void tearDown() throws Exception JavaDoc {
51         super.tearDown();
52         Utilities.stopGossipRouter();
53     }
54
55     /**
56      * Sends a GossipRouter.GET request to a router with an empty routing table.
57      */

58     public void testEmptyGET() throws Exception JavaDoc {
59         int len;
60         byte[] buffer;
61
62         log.warn("running testEmptyGET");
63
64         Socket JavaDoc s=new Socket JavaDoc("localhost", routerPort);
65         DataInputStream JavaDoc dis=new DataInputStream JavaDoc(s.getInputStream());
66         DataOutputStream JavaDoc dos=new DataOutputStream JavaDoc(s.getOutputStream());
67
68         // read the IpAddress sent by GossipRouter
69
len=dis.readInt();
70         buffer=new byte[len];
71         dis.readFully(buffer, 0, len);
72         IpAddress localAddr=(IpAddress)Util.objectFromByteBuffer(buffer);
73         assertEquals(localAddr.getIpAddress(), s.getLocalAddress());
74         assertEquals(localAddr.getPort(), s.getLocalPort());
75
76         // send GET request
77
dos.writeInt(GossipRouter.GET);
78         dos.writeUTF("nosuchgroup");
79
80         // read the answer
81
len=dis.readInt();
82         assertEquals(0, len);
83
84         // check for end of stream
85
assertEquals(-1, dis.read());
86
87         dis.close();
88         dos.close();
89         s.close();
90     }
91
92
93     /**
94      * Sends a GossipRouter.REGISTER request followed by a GossipRouter.GET for the
95      * group just registered.
96      */

97     public void test_REGISTER_GET() throws Exception JavaDoc {
98
99         log.warn("running test_REGISTER_GET");
100
101
102         int len;
103         byte[] buffer;
104         String JavaDoc groupName="TESTGROUP";
105
106         Socket JavaDoc s=new Socket JavaDoc("localhost", routerPort);
107         DataInputStream JavaDoc dis=new DataInputStream JavaDoc(s.getInputStream());
108         DataOutputStream JavaDoc dos=new DataOutputStream JavaDoc(s.getOutputStream());
109
110         // read the IpAddress sent by GossipRouter
111
len=dis.readInt();
112         buffer=new byte[len];
113         dis.readFully(buffer, 0, len);
114         IpAddress localAddr=(IpAddress)Util.objectFromByteBuffer(buffer);
115         assertEquals(localAddr.getIpAddress(), s.getLocalAddress());
116         assertEquals(localAddr.getPort(), s.getLocalPort());
117
118         // send REGISTER request
119
dos.writeInt(GossipRouter.REGISTER);
120         dos.writeUTF(groupName);
121
122         // send the Address back to the router
123
buffer=Util.objectToByteBuffer(localAddr);
124         dos.writeInt(buffer.length);
125         dos.write(buffer, 0, buffer.length);
126         dos.flush();
127
128         // registration is complete, send a GET request
129
Socket JavaDoc s2=new Socket JavaDoc("localhost", routerPort);
130         DataInputStream JavaDoc dis2=new DataInputStream JavaDoc(s2.getInputStream());
131         DataOutputStream JavaDoc dos2=new DataOutputStream JavaDoc(s2.getOutputStream());
132
133         // read the IpAddress sent by GossipRouter
134
len=dis2.readInt();
135         buffer=new byte[len];
136         dis2.readFully(buffer, 0, len);
137         IpAddress localAddr2=(IpAddress)Util.objectFromByteBuffer(buffer);
138         assertEquals(localAddr2.getIpAddress(), s2.getLocalAddress());
139         assertEquals(localAddr2.getPort(), s2.getLocalPort());
140
141         // send GET request
142
dos2.writeInt(GossipRouter.GET);
143         dos2.writeUTF(groupName);
144
145         // read the answer
146
len=dis2.readInt();
147         buffer=new byte[len];
148         dis2.readFully(buffer, 0, len);
149
150         List groupList=(List)Util.objectFromByteBuffer(buffer);
151         assertEquals(1, groupList.size());
152         assertEquals(localAddr, groupList.removeFromHead());
153
154         // check for end of stream
155
assertEquals(-1, dis2.read());
156
157         // close the GET connection
158
dis2.close();
159         dos2.close();
160         s2.close();
161
162         // close the routing connection
163
dis.close();
164         dos.close();
165         s.close();
166     }
167
168     /**
169      * Sends a GossipRouter.REGISTER request followed by a series of simple routing requests (to all
170      * members of the group, to itself, to an inexistent member).
171      */

172
173     public void test_REGISTER_Route_To_Self() throws Exception JavaDoc {
174
175         log.warn("running test_REGISTER_Route_To_Self");
176
177
178         int len;
179         byte[] buffer, destAddrBuffer;
180         String JavaDoc groupName="TESTGROUP";
181         Message msg;
182
183         Socket JavaDoc s=new Socket JavaDoc("localhost", routerPort);
184         DataInputStream JavaDoc dis=new DataInputStream JavaDoc(s.getInputStream());
185         DataOutputStream JavaDoc dos=new DataOutputStream JavaDoc(s.getOutputStream());
186
187         // read the IpAddress sent by GossipRouter
188
len=dis.readInt();
189         buffer=new byte[len];
190         dis.readFully(buffer, 0, len);
191         IpAddress localAddr=(IpAddress)Util.objectFromByteBuffer(buffer);
192         assertEquals(localAddr.getIpAddress(), s.getLocalAddress());
193         assertEquals(localAddr.getPort(), s.getLocalPort());
194
195         // send REGISTER request
196
dos.writeInt(GossipRouter.REGISTER);
197         dos.writeUTF(groupName);
198
199         // send the Address back to the router
200
buffer=Util.objectToByteBuffer(localAddr);
201         dos.writeInt(buffer.length);
202         dos.write(buffer, 0, buffer.length);
203         dos.flush();
204
205         // registration is complete
206

207         String JavaDoc payload="THIS IS A MESSAGE PAYLOAD " + random.nextLong();
208
209         // send a simple routing request to all members (null dest address)
210
msg=new Message(null, localAddr, payload);
211         buffer=Util.objectToByteBuffer(msg);
212         dos.writeUTF(groupName);
213         dos.write(0); // a 0 byte means a null address
214
dos.writeInt(buffer.length);
215         dos.write(buffer, 0, buffer.length);
216
217         // due to Bela's optimizations, the router won't loopback local messages, the RouterStub
218
// is expected to loop them back, so the following section is useless. The router will
219
// just discard the message.
220

221         // send a simple routing request to itself
222
msg=new Message(localAddr, localAddr, payload);
223         buffer=Util.objectToByteBuffer(msg);
224         dos.writeUTF(groupName);
225         destAddrBuffer=Util.objectToByteBuffer(localAddr);
226         dos.writeInt(destAddrBuffer.length);
227         dos.write(destAddrBuffer, 0, destAddrBuffer.length);
228         dos.writeInt(buffer.length);
229         dos.write(buffer, 0, buffer.length);
230
231         // due to Bela's optimizations, the router won't loopback local messages, the RouterStub
232
// is expected to loop them back, so the following section is useless. The router will
233
// just discard the message.
234

235         // send a simple routing request to an inexistent member, the message
236
// should be discarded by router
237
Address inexistentAddress=
238                 new IpAddress("localhost", Utilities.getFreePort());
239
240         msg=new Message(inexistentAddress, localAddr, payload);
241         buffer=Util.objectToByteBuffer(msg);
242         dos.writeUTF(groupName);
243         destAddrBuffer=Util.objectToByteBuffer(inexistentAddress);
244         dos.writeInt(destAddrBuffer.length);
245         dos.write(destAddrBuffer, 0, destAddrBuffer.length);
246         dos.writeInt(buffer.length);
247         dos.write(buffer, 0, buffer.length);
248
249         // the message should be discarded by router
250

251         // close the routing connection
252
dis.close();
253         dos.close();
254         s.close();
255     }
256
257
258     public void test_REGISTER_Route_To_All() throws Exception JavaDoc {
259
260         log.warn("running test_REGISTER_Route_To_All");
261
262         int len;
263         byte[] buffer;
264         String JavaDoc groupName="TESTGROUP";
265         Message msg, msgCopy;
266
267         // Register the first member
268

269         Socket JavaDoc sOne = new Socket JavaDoc("localhost", routerPort);
270         DataInputStream JavaDoc disOne = new DataInputStream JavaDoc(sOne.getInputStream());
271         DataOutputStream JavaDoc dosOne = new DataOutputStream JavaDoc(sOne.getOutputStream());
272
273         // read the IpAddress sent by GossipRouter
274
len=disOne.readInt();
275         buffer=new byte[len];
276         disOne.readFully(buffer, 0, len);
277         IpAddress localAddrOne=(IpAddress)Util.objectFromByteBuffer(buffer);
278         assertEquals(localAddrOne.getIpAddress(), sOne.getLocalAddress());
279         assertEquals(localAddrOne.getPort(), sOne.getLocalPort());
280
281         // send REGISTER request
282
dosOne.writeInt(GossipRouter.REGISTER);
283         dosOne.writeUTF(groupName);
284
285         // send the Address back to the router
286
buffer=Util.objectToByteBuffer(localAddrOne);
287         dosOne.writeInt(buffer.length);
288         dosOne.write(buffer, 0, buffer.length);
289         dosOne.flush();
290
291         // registration of the first member is complete
292

293         // Register the second member
294

295         Socket JavaDoc sTwo = new Socket JavaDoc("localhost", routerPort);
296         DataInputStream JavaDoc disTwo = new DataInputStream JavaDoc(sTwo.getInputStream());
297         DataOutputStream JavaDoc dosTwo = new DataOutputStream JavaDoc(sTwo.getOutputStream());
298
299         // read the IpAddress sent by GossipRouter
300
len=disTwo.readInt();
301         buffer=new byte[len];
302         disTwo.readFully(buffer, 0, len);
303         IpAddress localAddrTwo=(IpAddress)Util.objectFromByteBuffer(buffer);
304         assertEquals(localAddrTwo.getIpAddress(), sTwo.getLocalAddress());
305         assertEquals(localAddrTwo.getPort(), sTwo.getLocalPort());
306
307         // send REGISTER request
308
dosTwo.writeInt(GossipRouter.REGISTER);
309         dosTwo.writeUTF(groupName);
310
311         // send the Address back to the router
312
buffer=Util.objectToByteBuffer(localAddrTwo);
313         dosTwo.writeInt(buffer.length);
314         dosTwo.write(buffer, 0, buffer.length);
315         dosTwo.flush();
316
317         // registration of the second member is complete
318

319         // make sure both clients registered
320
Thread.sleep(1000);
321
322         String JavaDoc payload="THIS IS A MESSAGE PAYLOAD " + random.nextLong();
323
324         // the first member sends a simple routing request to all members (null dest address)
325
msg=new Message(null, localAddrOne, payload);
326         buffer=Util.objectToByteBuffer(msg);
327         dosOne.writeUTF(groupName);
328         dosOne.write(0); // a 0 byte means a null address
329
dosOne.writeInt(buffer.length);
330         dosOne.write(buffer, 0, buffer.length);
331
332         dosOne.flush();
333
334
335         // only the second member should receive the routing request, the router won't send a
336
// message to the originator
337

338         // the second member reads the message
339
len=disTwo.readInt();
340         buffer=new byte[len];
341         disTwo.readFully(buffer, 0, len);
342         msgCopy=(Message)Util.objectFromByteBuffer(buffer);
343         assertEquals(msg.getSrc(), msgCopy.getSrc());
344         assertNull(msgCopy.getDest());
345         assertEquals(msg.getObject(), msgCopy.getObject());
346
347
348         // close the routing connection
349
disOne.close();
350         dosOne.close();
351         sOne.close();
352         disTwo.close();
353         dosTwo.close();
354         sTwo.close();
355
356     }
357
358     public void test_REGISTER_Route_To_Other() throws Exception JavaDoc {
359
360         log.warn("running test_REGISTER_Route_To_Other");
361
362
363         int len;
364         byte[] buffer;
365         String JavaDoc groupName="TESTGROUP";
366         Message msg, msgCopy;
367
368         // Register the first member
369

370         Socket JavaDoc sOne = new Socket JavaDoc("localhost", routerPort);
371         DataInputStream JavaDoc disOne = new DataInputStream JavaDoc(sOne.getInputStream());
372         DataOutputStream JavaDoc dosOne = new DataOutputStream JavaDoc(sOne.getOutputStream());
373
374         // read the IpAddress sent by GossipRouter
375
len=disOne.readInt();
376         buffer=new byte[len];
377         disOne.readFully(buffer, 0, len);
378         IpAddress localAddrOne=(IpAddress)Util.objectFromByteBuffer(buffer);
379         assertEquals(localAddrOne.getIpAddress(), sOne.getLocalAddress());
380         assertEquals(localAddrOne.getPort(), sOne.getLocalPort());
381
382         // send REGISTER request
383
dosOne.writeInt(GossipRouter.REGISTER);
384         dosOne.writeUTF(groupName);
385
386         // send the Address back to the router
387
buffer=Util.objectToByteBuffer(localAddrOne);
388         dosOne.writeInt(buffer.length);
389         dosOne.write(buffer, 0, buffer.length);
390         dosOne.flush();
391
392         // registration of the first member is complete
393

394         // Register the second member
395

396         Socket JavaDoc sTwo = new Socket JavaDoc("localhost", routerPort);
397         DataInputStream JavaDoc disTwo = new DataInputStream JavaDoc(sTwo.getInputStream());
398         DataOutputStream JavaDoc dosTwo = new DataOutputStream JavaDoc(sTwo.getOutputStream());
399
400         // read the IpAddress sent by GossipRouter
401
len=disTwo.readInt();
402         buffer=new byte[len];
403         disTwo.readFully(buffer, 0, len);
404         IpAddress localAddrTwo=(IpAddress)Util.objectFromByteBuffer(buffer);
405         assertEquals(localAddrTwo.getIpAddress(), sTwo.getLocalAddress());
406         assertEquals(localAddrTwo.getPort(), sTwo.getLocalPort());
407
408         // send REGISTER request
409
dosTwo.writeInt(GossipRouter.REGISTER);
410         dosTwo.writeUTF(groupName);
411
412         // send the Address back to the router
413
buffer=Util.objectToByteBuffer(localAddrTwo);
414         dosTwo.writeInt(buffer.length);
415         dosTwo.write(buffer, 0, buffer.length);
416         dosTwo.flush();
417
418         // registration of the second member is complete
419

420         // make sure both clients registered
421
Thread.sleep(1000);
422
423         String JavaDoc payload="THIS IS A MESSAGE PAYLOAD " + random.nextLong();
424
425         // first member send a simple routing request to the second member
426
msg=new Message(localAddrTwo, localAddrOne, payload);
427         buffer=Util.objectToByteBuffer(msg);
428         dosOne.writeUTF(groupName);
429         dosOne.write(1);
430         dosOne.write(1); // regular IPAddress
431
localAddrTwo.writeTo(dosOne);
432         dosOne.writeInt(buffer.length);
433         dosOne.write(buffer, 0, buffer.length);
434
435         dosOne.flush();
436
437         // the second member reads the message
438
len=disTwo.readInt();
439         buffer=new byte[len];
440         disTwo.readFully(buffer, 0, len);
441         msgCopy=(Message)Util.objectFromByteBuffer(buffer);
442         assertEquals(msg.getSrc(), msgCopy.getSrc());
443         assertEquals(msg.getDest(), msgCopy.getDest());
444         assertEquals(msg.getObject(), msgCopy.getObject());
445
446         // close the routing connection
447
disOne.close();
448         dosOne.close();
449         sOne.close();
450         disTwo.close();
451         dosTwo.close();
452         sTwo.close();
453     }
454
455
456
457
458     /**
459      * Sends a GossipRouter.REGISTER request followed by a series of stress routing
460      * requests to all members of the group.
461      */

462     public void test_REGISTER_RouteStressAll() throws Exception JavaDoc {
463
464         log.warn("running test_REGISTER_RouteStressAll, this may take a while .... ");
465
466
467         int len;
468         byte[] buffer;
469         final String JavaDoc groupName="TESTGROUP";
470
471         // Register the first member
472

473         Socket JavaDoc sOne = new Socket JavaDoc("localhost", routerPort);
474         DataInputStream JavaDoc disOne = new DataInputStream JavaDoc(sOne.getInputStream());
475         final DataOutputStream JavaDoc dosOne = new DataOutputStream JavaDoc(sOne.getOutputStream());
476
477         // read the IpAddress sent by GossipRouter
478
len=disOne.readInt();
479         buffer=new byte[len];
480         disOne.readFully(buffer, 0, len);
481         final IpAddress localAddrOne=(IpAddress)Util.objectFromByteBuffer(buffer);
482         assertEquals(localAddrOne.getIpAddress(), sOne.getLocalAddress());
483         assertEquals(localAddrOne.getPort(), sOne.getLocalPort());
484
485         // send REGISTER request
486
dosOne.writeInt(GossipRouter.REGISTER);
487         dosOne.writeUTF(groupName);
488
489         // send the Address back to the router
490
buffer=Util.objectToByteBuffer(localAddrOne);
491         dosOne.writeInt(buffer.length);
492         dosOne.write(buffer, 0, buffer.length);
493         dosOne.flush();
494
495         // registration of the first member is complete
496

497         // Register the second member
498

499         Socket JavaDoc sTwo = new Socket JavaDoc("localhost", routerPort);
500         final DataInputStream JavaDoc disTwo = new DataInputStream JavaDoc(sTwo.getInputStream());
501         DataOutputStream JavaDoc dosTwo = new DataOutputStream JavaDoc(sTwo.getOutputStream());
502
503         // read the IpAddress sent by GossipRouter
504
len=disTwo.readInt();
505         buffer=new byte[len];
506         disTwo.readFully(buffer, 0, len);
507         IpAddress localAddrTwo=(IpAddress)Util.objectFromByteBuffer(buffer);
508         assertEquals(localAddrTwo.getIpAddress(), sTwo.getLocalAddress());
509         assertEquals(localAddrTwo.getPort(), sTwo.getLocalPort());
510
511         // send REGISTER request
512
dosTwo.writeInt(GossipRouter.REGISTER);
513         dosTwo.writeUTF(groupName);
514
515         // send the Address back to the router
516
buffer=Util.objectToByteBuffer(localAddrTwo);
517         dosTwo.writeInt(buffer.length);
518         dosTwo.write(buffer, 0, buffer.length);
519         dosTwo.flush();
520
521         // registration of the second member is complete
522

523         // make sure both clients registered
524
Thread.sleep(1000);
525
526         // send a series of stress routing requests to all members
527
final int count=100000; // total number of messages to be sent
528
int timeout=120; // nr of secs to wait for all messages to arrrive
529

530         final boolean[] received=new boolean[count];
531         for(int i=0; i < count; i++) {
532             received[i]=false;
533         }
534         final Promise waitingArea=new Promise();
535         long start=System.currentTimeMillis();
536
537         new Thread JavaDoc(new Runnable JavaDoc() {
538             public void run() {
539                 for(int i=0; i < count; i++) {
540                     Message msg=new Message(null, localAddrOne, new Integer JavaDoc(i));
541                     try {
542                         byte[] buffer=Util.objectToByteBuffer(msg);
543                         dosOne.writeUTF(groupName);
544                         dosOne.write(0);
545                         dosOne.writeInt(buffer.length);
546                         dosOne.write(buffer, 0, buffer.length);
547                         dosOne.flush();
548                         if(i % 10000 == 0)
549                             System.out.println("--sent " + i);
550                     }
551                     catch(Exception JavaDoc e) {
552                         // this fails the test
553
waitingArea.setResult(e);
554                     }
555                 }
556             }
557         }, "Sending Thread").start();
558
559
560         new Thread JavaDoc(new Runnable JavaDoc() {
561             public void run() {
562                 int cnt=0;
563                 while(cnt < count) {
564                     try {
565                         int len=disTwo.readInt();
566                         byte[] buffer=new byte[len];
567                         disTwo.readFully(buffer, 0, len);
568                         Message msg= (Message)Util.objectFromByteBuffer(buffer);
569                         int index=((Integer JavaDoc)msg.getObject()).intValue();
570                         received[index]=true;
571                         cnt++;
572                         if(cnt % 10000 == 0)
573                             System.out.println("-- received " + cnt);
574                     }
575                     catch(Exception JavaDoc e) {
576                         // this fails the test
577
waitingArea.setResult(e);
578                     }
579                 }
580                 waitingArea.setResult(Boolean.TRUE);
581             }
582         }, "Receiving Thread").start();
583
584
585         // wait here the stress threads to finish
586
Object JavaDoc result=waitingArea.getResult((long)timeout * 1000);
587         long stop=System.currentTimeMillis();
588
589         // close the routing connection
590
disOne.close();
591         dosOne.close();
592         sOne.close();
593         disTwo.close();
594         dosTwo.close();
595         sTwo.close();
596
597
598         int messok=0;
599         for(int i=0; i < count; i++) {
600             if(received[i]) {
601                 messok++;
602             }
603         }
604
605         if(result == null) {
606             fail("Timeout while waiting for all messages to be received. " +
607                     messok + " messages out of " + count + " received so far.");
608         }
609         if(result instanceof Exception JavaDoc) {
610             throw (Exception JavaDoc)result;
611         }
612
613         // make sure all messages have been received
614
for(int i=0; i < count; i++) {
615             if(!received[i]) {
616                 fail("At least message " + i + " NOT RECEIVED");
617             }
618         }
619         System.out.println("STRESS TEST OK, " + count + " messages, " +
620                 1000 * count / (stop - start) + " messages/sec");
621     }
622
623
624     public static Test suite() {
625         TestSuite s=new TestSuite(RouterTest.class);
626         return s;
627     }
628
629     public static void main(String JavaDoc[] args) {
630         junit.textui.TestRunner.run(suite());
631         System.exit(0);
632     }
633
634     static void log(String JavaDoc msg) {
635         
636     }
637
638 }
639
Popular Tags