package org.jgroups.tests.stack;

import junit.framework.Test;
import junit.framework.TestCase;
import junit.framework.TestSuite;
import org.jgroups.Address;
import org.jgroups.Message;
import org.jgroups.stack.GossipRouter;
import org.jgroups.stack.IpAddress;
import org.jgroups.util.List;
import org.jgroups.util.Promise;
import org.jgroups.util.Util;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.Socket;
import java.util.Random;


/**
 * Exercises a GossipRouter over raw sockets: every test opens one or two
 * connections to the router started in {@link #setUp()}, checks the address
 * the router sends right after the connection is accepted, and then writes
 * GET / REGISTER requests or routed messages by hand.
 * <p>
 * All connections are closed in <code>finally</code> blocks so that a failed
 * assertion does not leave sockets open.
 */
public class RouterTest extends TestCase {

    private static final Log log=LogFactory.getLog(RouterTest.class);

    /** Port returned by {@link Utilities#startGossipRouter()}; -1 until setUp() ran. */
    private int routerPort=-1;

    /** Only used to make each message payload unique. */
    private final Random random=new Random();

    public RouterTest(String name) {
        super(name);
    }

    /** Starts a fresh router for every test and remembers its port. */
    public void setUp() throws Exception {
        super.setUp();
        routerPort=Utilities.startGossipRouter();
    }

    /** Stops the router, even if the superclass tear-down throws. */
    public void tearDown() throws Exception {
        try {
            super.tearDown();
        }
        finally {
            Utilities.stopGossipRouter();
        }
    }


    /**
     * Sends a GET for a group nobody registered with and expects a zero
     * length reply followed by end-of-stream.
     */
    public void testEmptyGET() throws Exception {
        log.warn("running testEmptyGET");

        Connection conn=new Connection(routerPort);
        try {
            conn.out.writeInt(GossipRouter.GET);
            conn.out.writeUTF("nosuchgroup");

            int len=conn.in.readInt();
            assertEquals(0, len);

            // nothing else may follow the empty reply
            assertEquals(-1, conn.in.read());
        }
        finally {
            conn.close();
        }
    }


    /**
     * Registers one connection with a group, then issues a GET for that group
     * from a second connection and expects a list holding exactly the first
     * connection's address, followed by end-of-stream.
     */
    public void test_REGISTER_GET() throws Exception {
        log.warn("running test_REGISTER_GET");

        String groupName="TESTGROUP";

        Connection registrant=new Connection(routerPort);
        try {
            registrant.register(groupName);

            Connection client=new Connection(routerPort);
            try {
                client.out.writeInt(GossipRouter.GET);
                client.out.writeUTF(groupName);

                List groupList=(List)Util.objectFromByteBuffer(client.readFrame());
                assertEquals(1, groupList.size());
                assertEquals(registrant.localAddr, groupList.removeFromHead());

                assertEquals(-1, client.in.read());
            }
            finally {
                client.close();
            }
        }
        finally {
            registrant.close();
        }
    }


    /**
     * Registers a single connection and writes three messages through it: one
     * without destination, one addressed to the sender itself and one addressed
     * to an address obtained from {@link Utilities#getFreePort()}. The test
     * only checks that the writes succeed; it does not read anything back.
     */
    public void test_REGISTER_Route_To_Self() throws Exception {
        log.warn("running test_REGISTER_Route_To_Self");

        String groupName="TESTGROUP";
        Message msg;
        byte[] buffer;

        Connection conn=new Connection(routerPort);
        try {
            conn.register(groupName);

            String payload="THIS IS A MESSAGE PAYLOAD " + random.nextLong();

            // 1. message without destination: a single 0 byte replaces the address
            msg=new Message(null, conn.localAddr, payload);
            buffer=Util.objectToByteBuffer(msg);
            conn.out.writeUTF(groupName);
            conn.out.write(0);
            conn.writeFrame(buffer);

            // 2. message addressed to the sender itself.
            // NOTE(review): the destination is written here as a length-prefixed
            // serialized address, whereas test_REGISTER_Route_To_Other writes two
            // marker bytes followed by IpAddress.writeTo() - confirm which format
            // the router expects.
            msg=new Message(conn.localAddr, conn.localAddr, payload);
            buffer=Util.objectToByteBuffer(msg);
            conn.out.writeUTF(groupName);
            conn.writeFrame(Util.objectToByteBuffer(conn.localAddr));
            conn.writeFrame(buffer);

            // 3. message addressed to a member that never registered
            Address inexistentAddress=
                    new IpAddress("localhost", Utilities.getFreePort());

            msg=new Message(inexistentAddress, conn.localAddr, payload);
            buffer=Util.objectToByteBuffer(msg);
            conn.out.writeUTF(groupName);
            conn.writeFrame(Util.objectToByteBuffer(inexistentAddress));
            conn.writeFrame(buffer);
        }
        finally {
            conn.close();
        }
    }


    /**
     * Registers two connections with the same group, sends a message without
     * destination from the first one and expects the second one to receive a
     * copy with the same source and payload and a null destination.
     */
    public void test_REGISTER_Route_To_All() throws Exception {
        log.warn("running test_REGISTER_Route_To_All");

        String groupName="TESTGROUP";
        Message msg, msgCopy;

        Connection one=new Connection(routerPort);
        try {
            one.register(groupName);

            Connection two=new Connection(routerPort);
            try {
                two.register(groupName);

                // pause between registering and sending, as the test always did
                Thread.sleep(1000);

                String payload="THIS IS A MESSAGE PAYLOAD " + random.nextLong();

                msg=new Message(null, one.localAddr, payload);
                byte[] buffer=Util.objectToByteBuffer(msg);
                one.out.writeUTF(groupName);
                one.out.write(0);
                one.writeFrame(buffer);
                one.out.flush();

                msgCopy=(Message)Util.objectFromByteBuffer(two.readFrame());
                assertEquals(msg.getSrc(), msgCopy.getSrc());
                assertNull(msgCopy.getDest());
                assertEquals(msg.getObject(), msgCopy.getObject());
            }
            finally {
                two.close();
            }
        }
        finally {
            one.close();
        }
    }


    /**
     * Registers two connections with the same group, sends a message from the
     * first one that is addressed to the second one and expects the second one
     * to receive a copy with identical source, destination and payload.
     */
    public void test_REGISTER_Route_To_Other() throws Exception {
        log.warn("running test_REGISTER_Route_To_Other");

        String groupName="TESTGROUP";
        Message msg, msgCopy;

        Connection one=new Connection(routerPort);
        try {
            one.register(groupName);

            Connection two=new Connection(routerPort);
            try {
                two.register(groupName);

                // pause between registering and sending, as the test always did
                Thread.sleep(1000);

                String payload="THIS IS A MESSAGE PAYLOAD " + random.nextLong();

                msg=new Message(two.localAddr, one.localAddr, payload);
                byte[] buffer=Util.objectToByteBuffer(msg);
                one.out.writeUTF(groupName);
                // two marker bytes precede the streamed destination address
                one.out.write(1);
                one.out.write(1);
                two.localAddr.writeTo(one.out);
                one.writeFrame(buffer);
                one.out.flush();

                msgCopy=(Message)Util.objectFromByteBuffer(two.readFrame());
                assertEquals(msg.getSrc(), msgCopy.getSrc());
                assertEquals(msg.getDest(), msgCopy.getDest());
                assertEquals(msg.getObject(), msgCopy.getObject());
            }
            finally {
                two.close();
            }
        }
        finally {
            one.close();
        }
    }


    /**
     * Registers two connections with the same group and pushes
     * <code>count</code> destination-less messages from the first to the
     * second, using one sending and one receiving thread. The test fails on a
     * timeout, on any exception reported by either thread, or when a message
     * index was never seen by the receiver.
     */
    public void test_REGISTER_RouteStressAll() throws Exception {
        log.warn("running test_REGISTER_RouteStressAll, this may take a while .... ");

        final String groupName="TESTGROUP";
        final int count=100000; // number of messages to route
        int timeout=120;        // seconds to wait for the receiver to finish

        final Connection one=new Connection(routerPort);
        try {
            one.register(groupName);

            final Connection two=new Connection(routerPort);
            try {
                two.register(groupName);

                // pause between registering and sending, as the test always did
                Thread.sleep(1000);

                // one flag per message index; a new boolean[] is all false
                final boolean[] received=new boolean[count];
                final Promise waitingArea=new Promise();
                long start=System.currentTimeMillis();

                new Thread(new Runnable() {
                    public void run() {
                        try {
                            for(int i=0; i < count; i++) {
                                Message msg=new Message(null, one.localAddr, new Integer(i));
                                byte[] buffer=Util.objectToByteBuffer(msg);
                                one.out.writeUTF(groupName);
                                one.out.write(0);
                                one.writeFrame(buffer);
                                one.out.flush();
                                if(i % 10000 == 0) {
                                    System.out.println("--sent " + i);
                                }
                            }
                        }
                        catch(Exception e) {
                            // report the failure and stop: once a write failed there
                            // is no point in attempting the remaining messages
                            waitingArea.setResult(e);
                        }
                    }
                }, "Sending Thread").start();


                new Thread(new Runnable() {
                    public void run() {
                        try {
                            int cnt=0;
                            while(cnt < count) {
                                Message msg=(Message)Util.objectFromByteBuffer(two.readFrame());
                                int index=((Integer)msg.getObject()).intValue();
                                received[index]=true;
                                cnt++;
                                if(cnt % 10000 == 0) {
                                    System.out.println("-- received " + cnt);
                                }
                            }
                            waitingArea.setResult(Boolean.TRUE);
                        }
                        catch(Exception e) {
                            // report the failure and stop: cnt no longer advances after
                            // a read error, so staying in the loop would spin forever
                            waitingArea.setResult(e);
                        }
                    }
                }, "Receiving Thread").start();


                Object result=waitingArea.getResult((long)timeout * 1000);
                long stop=System.currentTimeMillis();

                // closing here (before evaluating the result) makes the worker
                // threads fail and stop if they are still running after a timeout
                one.close();
                two.close();

                int messok=0;
                for(int i=0; i < count; i++) {
                    if(received[i]) {
                        messok++;
                    }
                }

                if(result == null) {
                    fail("Timeout while waiting for all messages to be received. " +
                         messok + " messages out of " + count + " received so far.");
                }
                if(result instanceof Exception) {
                    throw (Exception)result;
                }

                for(int i=0; i < count; i++) {
                    if(!received[i]) {
                        fail("At least message " + i + " NOT RECEIVED");
                    }
                }

                // guard against a zero elapsed time; compute in long
                long elapsed=Math.max(1L, stop - start);
                System.out.println("STRESS TEST OK, " + count + " messages, " +
                                   1000L * count / elapsed + " messages/sec");
            }
            finally {
                two.close();
            }
        }
        finally {
            one.close();
        }
    }


    public static Test suite() {
        TestSuite s=new TestSuite(RouterTest.class);
        return s;
    }

    public static void main(String[] args) {
        junit.textui.TestRunner.run(suite());
        System.exit(0);
    }

    /** Intentionally a no-op. */
    static void log(String msg) {

    }


    /**
     * One client connection to the router under test. The constructor connects
     * to localhost, reads the length-prefixed serialized address the router
     * sends first and asserts that it matches the local end of the socket.
     */
    private static final class Connection {
        final Socket socket;
        final DataInputStream in;
        final DataOutputStream out;
        /** Address received from the router during the initial exchange. */
        final IpAddress localAddr;
        private boolean closed=false;

        Connection(int port) throws Exception {
            socket=new Socket("localhost", port);
            boolean ok=false;
            try {
                in=new DataInputStream(socket.getInputStream());
                out=new DataOutputStream(socket.getOutputStream());
                localAddr=(IpAddress)Util.objectFromByteBuffer(readFrame(in));
                assertEquals(localAddr.getIpAddress(), socket.getLocalAddress());
                assertEquals(localAddr.getPort(), socket.getLocalPort());
                ok=true;
            }
            finally {
                if(!ok) {
                    // do not leak the socket when the initial exchange fails
                    try {
                        socket.close();
                    }
                    catch(IOException e) {
                        log.warn("failed closing socket after failed connection setup", e);
                    }
                }
            }
        }

        /** Reads an int length followed by that many bytes from the given stream. */
        private static byte[] readFrame(DataInputStream dis) throws IOException {
            int len=dis.readInt();
            byte[] data=new byte[len];
            dis.readFully(data, 0, len);
            return data;
        }

        /** Reads an int length followed by that many bytes from this connection. */
        byte[] readFrame() throws IOException {
            return readFrame(in);
        }

        /** Writes the length of <code>data</code> as an int followed by the bytes; does not flush. */
        void writeFrame(byte[] data) throws IOException {
            out.writeInt(data.length);
            out.write(data, 0, data.length);
        }

        /** Sends a REGISTER request for the given group carrying {@link #localAddr}, then flushes. */
        void register(String groupName) throws Exception {
            out.writeInt(GossipRouter.REGISTER);
            out.writeUTF(groupName);
            writeFrame(Util.objectToByteBuffer(localAddr));
            out.flush();
        }

        /**
         * Closes both streams and the socket. Safe to call more than once;
         * failures are logged instead of thrown so that calling this from a
         * <code>finally</code> block cannot hide the original test failure.
         */
        void close() {
            if(closed) {
                return;
            }
            closed=true;
            try {
                in.close();
            }
            catch(IOException e) {
                log.warn("failed closing input stream", e);
            }
            try {
                out.close();
            }
            catch(IOException e) {
                log.warn("failed closing output stream", e);
            }
            try {
                socket.close();
            }
            catch(IOException e) {
                log.warn("failed closing socket", e);
            }
        }
    }

}