package org.jgroups.tests;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

import junit.framework.Test;
import junit.framework.TestSuite;

import org.jgroups.Address;
import org.jgroups.BlockEvent;
import org.jgroups.Channel;
import org.jgroups.ChannelException;
import org.jgroups.Event;
import org.jgroups.ExtendedReceiverAdapter;
import org.jgroups.GetStateEvent;
import org.jgroups.JChannel;
import org.jgroups.JChannelFactory;
import org.jgroups.Message;
import org.jgroups.SetStateEvent;
import org.jgroups.UnblockEvent;
import org.jgroups.View;
import org.jgroups.mux.MuxChannel;
import org.jgroups.stack.Protocol;
import org.jgroups.util.Util;

/**
 * Tests the ordering of block / view / state / unblock callbacks delivered to
 * channels that run with a FLUSH protocol in their stack.
 * <p>
 * The stack configuration is taken from the system property
 * {@code channel.conf.flush} and defaults to {@code flush-udp.xml}.
 */
public class FlushTest extends ChannelTestBase {
    private JChannel c1, c2;

    public FlushTest() {
        super();
    }

    public FlushTest(String name) {
        super(name);
    }

    /** Runs the base-class set-up and then selects the flush-enabled channel configuration. */
    @Override
    public void setUp() throws Exception {
        super.setUp();
        CHANNEL_CONFIG = System.getProperty("channel.conf.flush", "flush-udp.xml");
    }

    /**
     * Closes {@code c2} and then {@code c1} (when set), verifying that each is neither open nor
     * connected afterwards.
     * <p>
     * The clean-up is chained through {@code finally} blocks so that a failing assertion on one
     * channel does not prevent the other channel from being closed or the base-class tear-down
     * from running.
     */
    @Override
    public void tearDown() throws Exception {
        try {
            try {
                closeAndVerify(c2);
            } finally {
                c2 = null;
                closeAndVerify(c1);
            }
        } finally {
            c1 = null;
            Util.sleep(500);
            super.tearDown();
        }
    }

    /** Closes the given channel, if any, and asserts that it ends up closed and disconnected. */
    private void closeAndVerify(JChannel channel) throws Exception {
        if (channel == null) {
            return;
        }
        channel.close();
        assertFalse(channel.isOpen());
        assertFalse(channel.isConnected());
    }

    public boolean useBlocking() {
        return true;
    }

    /**
     * Starts a single receiver, waits for its view, shuts it down and then checks that the
     * events it recorded follow the block / view / unblock pattern.
     */
    public void testSingleChannel() throws Exception {
        Semaphore s = new Semaphore(1);
        FlushTestReceiver[] receivers = new FlushTestReceiver[] {
            new FlushTestReceiver("c1", s, 0, false)
        };
        receivers[0].start();
        s.release(1);

        blockUntilViewsReceived(receivers, 60000);

        // Leave the receiver some time to do its work before synchronizing with it.
        Util.sleep(1000);

        // A timeout here is only reported: the event-sequence check below decides the outcome.
        if (!s.tryAcquire(1, 60, TimeUnit.SECONDS)) {
            log.warn("Timed out waiting for the semaphore permit of receiver c1");
        }
        receivers[0].cleanup();
        Util.sleep(1000);

        checkEventSequence(receivers[0], false);
    }

    /**
     * Connects two channels to the same cluster and, right after the second one has joined,
     * sends a unicast message from the second channel to the first.
     */
    public void testJoinFollowedByUnicast() throws ChannelException {
        c1 = createChannel();
        c1.setReceiver(new SimpleReplier(c1, true));
        c1.connect("test");

        Address target = c1.getLocalAddress();
        Message unicast_msg = new Message(target);

        c2 = createChannel();
        c2.setReceiver(new SimpleReplier(c2, false));
        c2.connect("test");

        c2.send(unicast_msg);
    }

    /**
     * Connects two channels to the same cluster, fetches the state on the second one and then
     * sends a unicast message from the second channel to the first.
     */
    public void testStateTransferFollowedByUnicast() throws ChannelException {
        c1 = createChannel();
        c1.setReceiver(new SimpleReplier(c1, true));
        c1.connect("test");

        Address target = c1.getLocalAddress();
        Message unicast_msg = new Message(target);

        c2 = createChannel();
        c2.setReceiver(new SimpleReplier(c2, false));
        c2.connect("test");

        log.info("\n** Getting the state **");
        c2.getState(null, 10000);
        c2.send(unicast_msg);
    }

    /**
     * Checks the callback sequence without state transfer: two mux factories with one service
     * each when mux channels are used, four plain channels otherwise.
     */
    public void testBlockingNoStateTransfer() {
        if (isMuxChannelUsed()) {
            int muxFactoryCount = 2;
            String[] names = createMuxApplicationNames(1, muxFactoryCount);
            _testChannels(names, muxFactoryCount, false, muxFactoryCount);
        } else {
            String[] names = createApplicationNames(4);
            _testChannels(names, false, 4);
        }
    }

    /**
     * Mux-only: four services created from one shared factory, each expected to see a view of
     * size one. Does nothing when mux channels are not in use.
     */
    public void testBlockingSharedMuxFactory() {
        int muxFactoryCount = 1;
        if (isMuxChannelUsed()) {
            String[] names = createMuxApplicationNames(4, muxFactoryCount);
            _testChannels(names, muxFactoryCount, false, new ChannelAssertable(1));
        }
    }

    /**
     * Mux-only: two services spread over two factories, each expected to see a view of size
     * two. Does nothing when mux channels are not in use.
     */
    public void testBlockingUnsharedMuxFactoryMultipleService() {
        int muxFactoryCount = 2;
        if (isMuxChannelUsed()) {
            String[] names = createMuxApplicationNames(2, muxFactoryCount);
            _testChannels(names, muxFactoryCount, false, new ChannelAssertable(2));
        }
    }

    /**
     * Same set-up as {@link #testBlockingNoStateTransfer()} but every receiver additionally
     * fetches the state.
     */
    public void testBlockingWithStateTransfer() {
        if (isMuxChannelUsed()) {
            int muxFactoryCount = 2;
            String[] names = createMuxApplicationNames(1, muxFactoryCount);
            _testChannels(names, muxFactoryCount, true, muxFactoryCount);
        } else {
            String[] names = createApplicationNames(4);
            _testChannels(names, true, 4);
        }
    }

    /**
     * Mux-only: state transfer with two services on each of two factories. Does nothing when
     * mux channels are not in use.
     */
    public void testBlockingWithStateTransferAndMultipleServiceMuxChannel() {
        if (isMuxChannelUsed()) {
            String[] names = createMuxApplicationNames(2, 2);
            _testChannels(names, 2, true, 2);
        }
    }

    /**
     * Core scenario shared by the blocking tests.
     * <ol>
     * <li>creates and starts one receiver per name,</li>
     * <li>waits until the views are installed,</li>
     * <li>lets the receivers run and verifies them with {@code a},</li>
     * <li>closes one randomly chosen receiver and verifies that it left the view,</li>
     * <li>checks the recorded event sequence of every remaining receiver.</li>
     * </ol>
     * Any exception is logged and turned into a test failure; the remaining receivers are
     * always cleaned up.
     *
     * @param names           one application name per receiver to create
     * @param muxFactoryCount number of mux factories the receivers are distributed over
     *                        (only used when mux channels are in use)
     * @param useTransfer     whether the receivers fetch state
     * @param a               verification run once all receivers are up
     */
    private void _testChannels(String names[],
                               int muxFactoryCount,
                               boolean useTransfer,
                               Assertable a) {
        int count = names.length;

        ArrayList<FlushTestReceiver> channels = new ArrayList<FlushTestReceiver>(count);
        try {
            // Start with every permit taken; permits are handed out explicitly below.
            Semaphore semaphore = new Semaphore(count);
            semaphore.acquire(count);

            for (int i = 0; i < count; i++) {
                FlushTestReceiver channel;
                if (isMuxChannelUsed()) {
                    channel = new FlushTestReceiver(names[i],
                                                    muxFactory[i % muxFactoryCount],
                                                    semaphore,
                                                    useTransfer);
                } else {
                    channel = new FlushTestReceiver(names[i], semaphore, 0, useTransfer);
                }
                channels.add(channel);

                channel.start();
                // Without state transfer each receiver gets its permit right away.
                if (!useTransfer) {
                    semaphore.release(1);
                }
                Util.sleep(1000);
            }

            if (isMuxChannelUsed()) {
                blockUntilViewsReceived(channels, muxFactoryCount, 10000);
            } else {
                blockUntilViewsReceived(channels, 10000);
            }

            // With state transfer the join events are discarded and all receivers are
            // released together once the views are in place.
            if (useTransfer) {
                for (FlushTestReceiver app : channels) {
                    app.clear();
                }
                semaphore.release(count);
            }

            Util.sleep(1000);

            // A timeout here is only reported: the verifications below decide the outcome.
            if (!semaphore.tryAcquire(count, 60, TimeUnit.SECONDS)) {
                log.warn("Timed out waiting for " + count + " semaphore permit(s)");
            }

            a.verify(channels);

            FlushTestReceiver randomRecv = channels.remove(RANDOM.nextInt(count));
            log.info("Closing random member " + randomRecv.getName()
                     + " at "
                     + randomRecv.getLocalAddress());
            // Must be created before cleanup(): it captures the view and address pre-close.
            ChannelCloseAssertable closeAssert = new ChannelCloseAssertable(randomRecv);
            randomRecv.cleanup();

            Util.sleep(5000);
            closeAssert.verify(channels);

            for (FlushTestReceiver receiver : channels) {
                if (useTransfer) {
                    checkEventStateTransferSequence(receiver);
                } else {
                    checkEventSequence(receiver, isMuxChannelUsed());
                }
            }
        } catch (Exception ex) {
            log.warn("Exception encountered during test", ex);
            fail("Exception encountered during test execution: " + ex);
        } finally {
            for (FlushTestReceiver app : channels) {
                app.cleanup();
                Util.sleep(500);
            }
        }
    }

    /** Runs the scenario with the default mux factory count and a view-size verification. */
    public void _testChannels(String names[], boolean useTransfer, int viewSize) {
        _testChannels(names, getMuxFactoryCount(), useTransfer, new ChannelAssertable(viewSize));
    }

    /** Runs the scenario with an explicit mux factory count and a view-size verification. */
    public void _testChannels(String names[], int muxFactoryCount, boolean useTransfer, int viewSize) {
        _testChannels(names, muxFactoryCount, useTransfer, new ChannelAssertable(viewSize));
    }

    /**
     * Verifies that a closed application's channel is closed and that the remaining channels
     * no longer list its address in their views. The view, address and (for mux channels) the
     * service id are captured at construction time, i.e. before the application is closed.
     */
    private class ChannelCloseAssertable implements Assertable {
        private final ChannelApplication app;

        private final View viewBeforeClose;

        private final Address appAddress;

        /** Service id of the closed application; {@code null} when it is not a mux channel. */
        private final String muxId;

        public ChannelCloseAssertable(ChannelApplication app) {
            this.app = app;
            this.viewBeforeClose = app.getChannel().getView();
            this.appAddress = app.getChannel().getLocalAddress();
            this.muxId = app.isUsingMuxChannel() ? ((MuxChannel) app.getChannel()).getId() : null;
        }

        /**
         * @param verifiable collection of the {@link FlushTestReceiver}s that are still running
         */
        public void verify(Object verifiable) {
            Collection<?> channels = (Collection<?>) verifiable;
            Channel ch = app.getChannel();
            assertFalse("Channel open", ch.isOpen());
            assertFalse("Chnanel connected", ch.isConnected());

            // A member that was alone in its view has nobody left to check.
            if (viewBeforeClose.getMembers().size() > 1) {
                for (Object element : channels) {
                    FlushTestReceiver receiver = (FlushTestReceiver) element;
                    Channel channel = receiver.getChannel();
                    // For mux channels only the services with the same id share a view.
                    boolean pairServiceFound = receiver.isUsingMuxChannel()
                                               && muxId != null
                                               && muxId.equals(((MuxChannel) channel).getId());
                    if (pairServiceFound || !receiver.isUsingMuxChannel()) {
                        assertTrue("Removed from view, address " + appAddress
                                   + " view is "
                                   + channel.getView(),
                                   !channel.getView().getMembers().contains(appAddress));
                    }
                }
            }
        }
    }

    /**
     * Verifies that every channel is open, connected, has the expected view size and is part of
     * its own view; for mux channels it additionally checks that services with the same id on
     * different addresses agree on their view.
     */
    private class ChannelAssertable implements Assertable {
        private final int expectedViewSize;

        public ChannelAssertable(int expectedViewSize) {
            this.expectedViewSize = expectedViewSize;
        }

        /**
         * @param verifiable collection of {@link FlushTestReceiver}s to check
         */
        public void verify(Object verifiable) {
            Collection<?> channels = (Collection<?>) verifiable;
            for (Object element : channels) {
                FlushTestReceiver receiver = (FlushTestReceiver) element;
                Channel ch = receiver.getChannel();
                assertEquals("Correct view", expectedViewSize, ch.getView().getMembers().size());
                assertTrue("Channel open", ch.isOpen());
                assertTrue("Chnanel connected", ch.isConnected());
                assertNotNull("Valid address ", ch.getLocalAddress());
                assertTrue("Address included in view ",
                           ch.getView().getMembers().contains(ch.getLocalAddress()));
                assertNotNull("Valid cluster name ", ch.getClusterName());
            }

            if (expectedViewSize > 1 && isMuxChannelUsed()) {
                for (Object element : channels) {
                    MuxChannel ch = (MuxChannel) ((FlushTestReceiver) element).getChannel();
                    // Counts this service plus every same-id service found on another address.
                    int servicePairs = 1;
                    for (Object other : channels) {
                        MuxChannel ch2 = (MuxChannel) ((FlushTestReceiver) other).getChannel();
                        boolean sameService = ch.getId().equals(ch2.getId());
                        boolean differentMember = !ch.getLocalAddress().equals(ch2.getLocalAddress());
                        if (sameService && differentMember) {
                            assertEquals("Correct view for service pair", ch.getView(), ch2.getView());
                            assertTrue("Presence in view",
                                       ch.getView().getMembers().contains(ch.getLocalAddress()));
                            assertTrue("Presence in view",
                                       ch.getView().getMembers().contains(ch2.getLocalAddress()));
                            assertTrue("Presence in view",
                                       ch2.getView().getMembers().contains(ch2.getLocalAddress()));
                            assertTrue("Presence in view",
                                       ch2.getView().getMembers().contains(ch.getLocalAddress()));
                            servicePairs++;
                        }
                    }
                    assertEquals("Correct service count", expectedViewSize, servicePairs);
                }
            }
        }
    }

    /**
     * Asserts that the events recorded by {@code receiver} repeat the pattern
     * block, view, unblock, and then clears the recorded events.
     * <p>
     * A sequence that starts with a view or an unblock fails with the corresponding
     * "Before ..." assertion message.
     *
     * @param receiver  receiver whose recorded events are checked
     * @param isMuxUsed when {@code true}, an unblock may directly follow a block
     */
    private void checkEventSequence(FlushTestReceiver receiver, boolean isMuxUsed) {
        List<Object> events = receiver.getEvents();
        String eventString = "[" + receiver.getName()
                             + "|"
                             + receiver.getLocalAddress()
                             + ",events:"
                             + events;
        log.info(eventString);
        assertNotNull(events);
        int size = events.size();
        for (int i = 0; i < size; i++) {
            Object event = events.get(i);
            boolean hasNext = i + 1 < size;
            // null stands for "no such neighbour"; instanceof is false for null.
            Object previous = i > 0 ? events.get(i - 1) : null;
            Object next = hasNext ? events.get(i + 1) : null;

            if (event instanceof BlockEvent) {
                if (hasNext) {
                    if (isMuxUsed) {
                        assertTrue("After Block should be View or Unblock" + eventString,
                                   next instanceof View || next instanceof UnblockEvent);
                    } else {
                        assertTrue("After Block should be View " + eventString,
                                   next instanceof View);
                    }
                }
                if (i != 0) {
                    assertTrue("Before Block should be Unblock " + eventString,
                               previous instanceof UnblockEvent);
                }
            }
            if (event instanceof View) {
                if (hasNext) {
                    assertTrue("After View should be Unblock " + eventString,
                               next instanceof UnblockEvent);
                }
                assertTrue("Before View should be Block " + eventString,
                           previous instanceof BlockEvent);
            }
            if (event instanceof UnblockEvent) {
                if (hasNext) {
                    assertTrue("After UnBlock should be Block " + eventString,
                               next instanceof BlockEvent);
                }
                if (isMuxUsed) {
                    assertTrue("Before UnBlock should be View or Block" + eventString,
                               previous instanceof View || previous instanceof BlockEvent);
                } else {
                    assertTrue("Before UnBlock should be View " + eventString,
                               previous instanceof View);
                }
            }
        }
        receiver.clear();
    }

    /**
     * Asserts that the events recorded by {@code receiver} during a run with state transfer
     * are properly bracketed by block and unblock, and then clears the recorded events.
     *
     * @param receiver receiver whose recorded events are checked
     */
    private void checkEventStateTransferSequence(FlushTestReceiver receiver) {
        List<Object> events = receiver.getEvents();
        String eventString = "[" + receiver.getName() + ",events:" + events;
        log.info(eventString);
        assertNotNull(events);
        int size = events.size();
        for (int i = 0; i < size; i++) {
            Object event = events.get(i);
            boolean hasNext = i + 1 < size;
            // null stands for "no such neighbour"; instanceof is false for null.
            Object previous = i > 0 ? events.get(i - 1) : null;
            Object next = hasNext ? events.get(i + 1) : null;

            if (event instanceof BlockEvent) {
                if (hasNext) {
                    assertTrue("After Block should be state|unblock|view" + eventString,
                               isStateEvent(next) || next instanceof UnblockEvent
                                                  || next instanceof View);
                } else if (i != 0) {
                    // Trailing block: there is no successor, so look at the predecessor.
                    assertTrue("Before Block should be state or Unblock " + eventString,
                               isStateEvent(previous) || previous instanceof UnblockEvent);
                }
            }
            if (isStateEvent(event)) {
                if (hasNext) {
                    assertTrue("After state should be Unblock " + eventString,
                               next instanceof UnblockEvent);
                }
                assertTrue("Before state should be Block " + eventString,
                           previous instanceof BlockEvent);
            }
            if (event instanceof UnblockEvent) {
                if (hasNext) {
                    assertTrue("After UnBlock should be Block " + eventString,
                               next instanceof BlockEvent);
                } else {
                    assertTrue("Before UnBlock should be block|state|view " + eventString,
                               isStateEvent(previous) || previous instanceof BlockEvent
                                                      || previous instanceof View);
                }
            }
        }
        receiver.clear();
    }

    /** Returns whether {@code o} is a recorded get-state or set-state event. */
    private static boolean isStateEvent(Object o) {
        return o instanceof SetStateEvent || o instanceof GetStateEvent;
    }

    /**
     * Creates a channel from {@code CHANNEL_CONFIG} with the BLOCK option enabled. When the
     * stack contains a protocol named {@code FLUSH}, its {@code timeout} property is set to
     * {@code 0} and a CONFIG event carrying {@code flush_timeout = 0} is passed to its
     * neighbouring protocols.
     */
    protected JChannel createChannel() throws ChannelException {
        JChannel ret = new JChannel(CHANNEL_CONFIG);
        ret.setOpt(Channel.BLOCK, Boolean.TRUE);
        Protocol flush = ret.getProtocolStack().findProtocol("FLUSH");
        if (flush != null) {
            Properties p = new Properties();
            p.setProperty("timeout", "0");
            flush.setProperties(p);

            Map<Object, Object> map = new HashMap<Object, Object>();
            map.put("flush_timeout", Long.valueOf(0));
            flush.getUpProtocol().up(new Event(Event.CONFIG, map));
            flush.getDownProtocol().down(new Event(Event.CONFIG, map));
        }
        return ret;
    }

    /** A verification step applied to the object handed to {@link #verify(Object)}. */
    private interface Assertable {
        public void verify(Object verifiable);
    }

    /**
     * Channel application that records every block, unblock, view and state callback it
     * receives, in arrival order, so that the tests can inspect the sequence afterwards.
     */
    private class FlushTestReceiver extends PushChannelApplicationWithSemaphore {
        /** Recorded callbacks; synchronized because callbacks may arrive on other threads. */
        private final List<Object> events;

        private final boolean shouldFetchState;

        private final int msgCount;

        /**
         * Creates the receiver and connects its channel to the cluster {@code "test"}.
         *
         * @param msgCount         number of messages {@link #useChannel()} sends
         * @param shouldFetchState whether {@link #useChannel()} fetches the state first
         */
        protected FlushTestReceiver(String name,
                                    Semaphore semaphore,
                                    int msgCount,
                                    boolean shouldFetchState) throws Exception {
            super(name, semaphore);
            this.shouldFetchState = shouldFetchState;
            this.msgCount = msgCount;
            events = Collections.synchronizedList(new LinkedList<Object>());
            channel.connect("test");
        }

        /**
         * Creates the receiver on top of the given factory and connects its channel to the
         * cluster {@code "test"}. No messages are sent by {@link #useChannel()}.
         */
        protected FlushTestReceiver(String name,
                                    JChannelFactory factory,
                                    Semaphore semaphore,
                                    boolean shouldFetchState) throws Exception {
            super(name, factory, semaphore);
            this.shouldFetchState = shouldFetchState;
            this.msgCount = 0;
            events = Collections.synchronizedList(new LinkedList<Object>());
            channel.connect("test");
        }

        /** Discards all events recorded so far. */
        public void clear() {
            events.clear();
        }

        /**
         * Returns a snapshot of the events recorded so far. The copy is array-backed because
         * the sequence checks access it by index.
         */
        public List<Object> getEvents() {
            return new ArrayList<Object>(events);
        }

        public void block() {
            events.add(new BlockEvent());
        }

        public void unblock() {
            events.add(new UnblockEvent());
        }

        public void viewAccepted(View new_view) {
            events.add(new_view);
        }

        public byte[] getState() {
            events.add(new GetStateEvent(null, null));
            return new byte[] { 'b', 'e', 'l', 'a' };
        }

        public void setState(byte[] state) {
            events.add(new SetStateEvent(null, null));
        }

        /** Records a get-state event, writes a four byte payload and closes the stream. */
        public void getState(OutputStream ostream) {
            events.add(new GetStateEvent(null, null));
            byte[] payload = new byte[] { 'b', 'e', 'l', 'a' };
            try {
                ostream.write(payload);
            } catch (IOException e) {
                log.warn("Failed to write state", e);
            } finally {
                Util.close(ostream);
            }
        }

        /** Records a set-state event, reads up to four bytes and closes the stream. */
        public void setState(InputStream istream) {
            events.add(new SetStateEvent(null, null));
            byte[] payload = new byte[4];
            try {
                // The content is irrelevant for the test; only the callback order matters.
                istream.read(payload);
            } catch (IOException e) {
                log.warn("Failed to read state", e);
            } finally {
                Util.close(istream);
            }
        }

        /** Optionally fetches the state, then sends {@code msgCount} empty messages. */
        protected void useChannel() throws Exception {
            if (shouldFetchState) {
                channel.getState(null, 25000);
            }
            for (int i = 0; i < msgCount; i++) {
                channel.send(new Message());
                Util.sleep(100);
            }
        }
    }

    /**
     * Receiver that logs every callback and, when {@code handle_requests} is set, answers each
     * received message with an empty message addressed to its sender.
     */
    private class SimpleReplier extends ExtendedReceiverAdapter {
        private final Channel channel;

        private final boolean handle_requests;

        public SimpleReplier(Channel channel, boolean handle_requests) {
            this.channel = channel;
            this.handle_requests = handle_requests;
        }

        public void receive(Message msg) {
            Message reply = new Message(msg.getSrc());
            try {
                log.info("-- MySimpleReplier[" + channel.getLocalAddress()
                         + "]: received message from "
                         + msg.getSrc());
                if (handle_requests) {
                    log.info(", sending reply");
                    channel.send(reply);
                } else {
                    System.out.println("\n");
                }
            } catch (Exception e) {
                log.warn("Failed to handle received message", e);
            }
        }

        public void viewAccepted(View new_view) {
            log.info("-- MySimpleReplier[" + channel.getLocalAddress()
                     + "]: viewAccepted("
                     + new_view
                     + ")");
        }

        public void block() {
            log.info("-- MySimpleReplier[" + channel.getLocalAddress() + "]: block()");
        }

        public void unblock() {
            log.info("-- MySimpleReplier[" + channel.getLocalAddress() + "]: unblock()");
        }
    }

    public static Test suite() {
        return new TestSuite(FlushTest.class);
    }

    public static void main(String[] args) {
        junit.textui.TestRunner.run(FlushTest.suite());
    }
}