1 package org.jgroups.tests; 2 3 import junit.framework.TestCase; 4 import org.apache.commons.logging.Log; 5 import org.apache.commons.logging.LogFactory; 6 import org.jgroups.*; 7 import org.jgroups.blocks.RpcDispatcher; 8 import org.jgroups.mux.MuxChannel; 9 import org.jgroups.stack.GossipRouter; 10 import org.jgroups.util.Util; 11 12 import java.io.InputStream ; 13 import java.io.OutputStream ; 14 import java.util.*; 15 import java.util.concurrent.Semaphore ; 16 import java.util.concurrent.TimeUnit ; 17 18 19 26 public class ChannelTestBase extends TestCase 27 { 28 29 protected final static Random RANDOM = new Random(); 30 31 private static final int LETTER_A = 64; 32 33 protected final static String DEFAULT_MUX_FACTORY_COUNT = "4"; 34 35 protected static String CHANNEL_CONFIG = "udp.xml"; 36 37 protected static String MUX_CHANNEL_CONFIG = "stacks.xml"; 38 39 protected static String MUX_CHANNEL_CONFIG_STACK_NAME ="udp"; 40 41 protected int active_threads = 0; 42 43 protected JChannelFactory muxFactory[] = null; 44 45 protected String thread_dump = null; 46 47 protected int currentChannelGeneratedName = LETTER_A; 48 49 private static final int ROUTER_PORT = 12001; 50 51 private static final String BIND_ADDR = "127.0.0.1"; 52 53 GossipRouter router = null; 54 55 protected final Log log = LogFactory.getLog(this.getClass()); 56 57 58 public ChannelTestBase() 59 { 60 super(); 61 } 62 63 public ChannelTestBase(String name) 64 { 65 super(name); 66 } 67 68 protected void setUp() throws Exception 69 { 70 super.setUp(); 71 MUX_CHANNEL_CONFIG = System.getProperty("mux.conf", MUX_CHANNEL_CONFIG); 72 MUX_CHANNEL_CONFIG_STACK_NAME = System.getProperty("mux.conf.stack", MUX_CHANNEL_CONFIG_STACK_NAME); 73 CHANNEL_CONFIG = System.getProperty("channel.conf", CHANNEL_CONFIG); 74 75 currentChannelGeneratedName = LETTER_A; 76 77 if (isTunnelUsed()){ 78 router = new GossipRouter(ROUTER_PORT, BIND_ADDR); 79 router.start(); 80 } 81 82 if (isMuxChannelUsed()) 83 { 84 muxFactory = new JChannelFactory[getMuxFactoryCount()]; 85 86 for (int i = 0; i < muxFactory.length; i++) 87 { 88 muxFactory[i] = new JChannelFactory(); 89 muxFactory[i].setMultiplexerConfig(MUX_CHANNEL_CONFIG); 90 } 91 } 92 93 if (shouldCompareThreadCount()) 94 { 95 active_threads = Thread.activeCount(); 96 thread_dump = "active threads before (" + active_threads + "):\n" + Util.activeThreads(); 97 } 98 } 99 100 protected static boolean isTunnelUsed() { 101 102 return CHANNEL_CONFIG.contains("tunnel"); 104 } 105 106 protected void tearDown() throws Exception 107 { 108 super.tearDown(); 109 110 if (isMuxChannelUsed()) 111 { 112 for (int i = 0; i < muxFactory.length; i++) 113 { 114 muxFactory[i].destroy(); 115 } 116 } 117 118 if(router != null) { 119 router.stop(); 120 Util.sleep(100); 122 } 123 124 125 if (shouldCompareThreadCount()) 126 { 127 131 133 Util.sleep(20); 134 int current_active_threads = Thread.activeCount(); 135 136 String msg = ""; 137 if (active_threads != current_active_threads) 138 { 139 System.out.println(thread_dump); 140 System.out.println("active threads after (" + current_active_threads + "):\n" + Util.activeThreads()); 141 msg = "active threads:\n" + Util.dumpThreads(); 142 } 143 assertEquals(msg, active_threads, current_active_threads); 144 } 145 } 146 147 159 protected String [] createMuxApplicationNames(int muxApplicationstPerChannelCount) 160 { 161 return createMuxApplicationNames(muxApplicationstPerChannelCount,getMuxFactoryCount()); 162 } 163 164 177 protected String [] createMuxApplicationNames(int muxApplicationstPerChannelCount, int muxFactoryCount) 178 { 179 if(muxFactoryCount>getMuxFactoryCount()) 180 { 181 throw new IllegalArgumentException ("Parameter muxFactoryCount hs to be less than or equal to getMuxFactoryCount()"); 182 } 183 184 int startLetter = LETTER_A; 185 String names [] = null; 186 int totalMuxAppCount = muxFactoryCount * muxApplicationstPerChannelCount; 187 names = new String [totalMuxAppCount]; 188 189 boolean pickNextLetter = false; 190 for (int i = 0; i < totalMuxAppCount; i++) 191 { 192 pickNextLetter =(i % muxFactoryCount == 0); 193 if(pickNextLetter) 194 { 195 startLetter++; 196 } 197 names[i] = Character.toString((char)startLetter); 198 } 199 return names; 200 } 201 202 208 protected String getNextChannelName() 209 { 210 return Character.toString((char)++currentChannelGeneratedName); 211 } 212 213 protected String [] createApplicationNames(int applicationCount) 214 { 215 String names [] = new String [applicationCount]; 216 for(int i = 0;i<applicationCount;i++) 217 { 218 names [i] = getNextChannelName(); 219 } 220 return names; 221 } 222 223 protected JChannel createChannel(Object id) throws Exception 224 { 225 JChannel c = null; 226 if (isMuxChannelUsed()) 227 { 228 for (int i = 0; i < muxFactory.length; i++) 229 { 230 if (!muxFactory[i].hasMuxChannel(MUX_CHANNEL_CONFIG_STACK_NAME, id.toString())) 231 { 232 c = new DefaultMuxChannelTestFactory(muxFactory[i]).createChannel(id); 233 return c; 234 } 235 } 236 237 throw new Exception ("Cannot create mux channel with id " + id 238 + " since all currently used channels have already registered service with that id"); 239 } 240 else 241 { 242 c = new DefaultChannelTestFactory().createChannel(id); 243 } 244 return c; 245 } 246 247 protected JChannel createChannel() throws Exception 248 { 249 return createChannel("A"); 250 } 251 252 255 protected class DefaultChannelTestFactory implements ChannelTestFactory 256 { 257 public JChannel createChannel(Object id) throws Exception 258 { 259 return createChannel(CHANNEL_CONFIG, useBlocking()); 260 } 261 262 protected JChannel createChannel(String configFile, boolean useBlocking) throws Exception 263 { 264 HashMap channelOptions = new HashMap(); 265 channelOptions.put(new Integer (Channel.BLOCK), Boolean.valueOf(useBlocking)); 266 return createChannel(configFile, channelOptions); 267 } 268 269 protected JChannel createChannel(String configFile, Map channelOptions) throws Exception 270 { 271 JChannel ch = null; 272 log.info("Using configuration file " + configFile); 273 ch = new JChannel(configFile); 274 for (Iterator iter = channelOptions.keySet().iterator(); iter.hasNext();) 275 { 276 Integer key = (Integer ) iter.next(); 277 Object value = channelOptions.get(key); 278 ch.setOpt(key.intValue(), value); 279 } 280 return ch; 281 } 282 } 283 284 288 public class DefaultMuxChannelTestFactory implements ChannelTestFactory 289 { 290 JChannelFactory f = null; 291 292 public DefaultMuxChannelTestFactory(JChannelFactory f) 293 { 294 this.f = f; 295 } 296 297 public JChannel createChannel(Object id) throws Exception 298 { 299 JChannel c =(JChannel)f.createMultiplexerChannel(MUX_CHANNEL_CONFIG_STACK_NAME, id.toString()); 300 if(useBlocking()) 301 { 302 c.setOpt(Channel.BLOCK, Boolean.TRUE); 303 } 304 Address address = c.getLocalAddress(); 305 String append = "[" + id + "]" + " using " + MUX_CHANNEL_CONFIG + ",stack " + MUX_CHANNEL_CONFIG_STACK_NAME; 306 if (address == null) 307 { 308 log.info("Created unconnected mux channel " + append); 309 } 310 else 311 { 312 log.info("Created mux channel "+ address + append); 313 } 314 return c; 315 } 316 } 317 318 public class NextAvailableMuxChannelTestFactory implements ChannelTestFactory 319 { 320 public Channel createChannel(Object id) throws Exception 321 { 322 return ChannelTestBase.this.createChannel(id); 323 } 324 } 325 328 protected interface ChannelTestFactory 329 { 330 public Channel createChannel(Object id) throws Exception ; 331 } 332 333 336 protected abstract class ChannelApplication implements Runnable , MemberRetrievable 337 { 338 protected Channel channel; 339 340 protected Thread thread; 341 342 protected Throwable exception; 343 344 protected String name; 345 346 public ChannelApplication(String name) throws Exception 347 { 348 ChannelTestBase.this.createChannel(name); 349 } 350 351 public ChannelApplication(String name,JChannelFactory f) throws Exception 352 { 353 if(f==null) 354 { 355 createChannel(name, new DefaultChannelTestFactory()); 356 } 357 else 358 { 359 createChannel(name, new DefaultMuxChannelTestFactory(f)); 360 } 361 } 362 363 370 public ChannelApplication(String name, ChannelTestFactory factory) throws Exception 371 { 372 createChannel(name, factory); 373 } 374 375 private void createChannel(String name, ChannelTestFactory factory) throws Exception 376 { 377 this.name = name; 378 channel = factory.createChannel(name); 379 } 380 381 385 protected abstract void useChannel() throws Exception ; 386 387 public void run() 388 { 389 try 390 { 391 useChannel(); 392 } 393 catch (Exception e) 394 { 395 log.error(name + ": " + e.getLocalizedMessage(), e); 396 exception = e; } 398 } 399 400 public List getMembers() 401 { 402 List result = null; 403 View v = channel.getView(); 404 if (v != null) 405 { 406 result = v.getMembers(); 407 } 408 return result; 409 } 410 411 public boolean isUsingMuxChannel() 412 { 413 return channel instanceof MuxChannel; 414 } 415 416 public Address getLocalAddress() 417 { 418 return channel.getLocalAddress(); 419 } 420 421 public void start() 422 { 423 thread = new Thread (this, getName()); 424 thread.start(); 425 Address a = getLocalAddress(); 426 boolean connected =a != null; 427 if (connected) 428 { 429 log.info("Thread for channel " + a + "[" + getName() + "] started"); 430 } 431 else 432 { 433 log.info("Thread for channel [" + getName() + "] started"); 434 } 435 } 436 437 public void setChannel(Channel ch) 438 { 439 this.channel = ch; 440 } 441 442 public Channel getChannel() 443 { 444 return channel; 445 } 446 447 public String getName() 448 { 449 return name; 450 } 451 452 public void cleanup() 453 { 454 if (thread != null && thread.isAlive()) 455 { 456 thread.interrupt(); 457 } 458 Address a = getLocalAddress(); 459 boolean connected =a != null; 460 if (connected) 461 { 462 log.info("Closing channel " + a + "[" + getName() + "]"); 463 } 464 else 465 { 466 log.info("Closing channel [" + getName() + "]"); 467 } 468 channel.close(); 469 log.info("Closed channel " + a + "[" + getName() + "]"); 470 } 471 } 472 473 protected abstract class PushChannelApplication extends ChannelApplication implements ExtendedReceiver 474 { 475 RpcDispatcher dispatcher; 476 477 public PushChannelApplication(String name) throws Exception 478 { 479 this(name, new DefaultChannelTestFactory(), false); 480 } 481 482 public PushChannelApplication(String name, JChannelFactory f) throws Exception 483 { 484 this(name, new DefaultMuxChannelTestFactory(f), false); 485 } 486 487 public PushChannelApplication(String name, boolean useDispatcher) throws Exception 488 { 489 this(name, new DefaultChannelTestFactory(), useDispatcher); 490 } 491 492 public PushChannelApplication(String name, ChannelTestFactory factory, boolean useDispatcher) 493 throws Exception 494 { 495 super(name, factory); 496 if (useDispatcher) 497 { 498 dispatcher = new RpcDispatcher(channel, this, this, this); 499 } 500 else 501 { 502 channel.setReceiver(this); 503 } 504 } 505 506 public RpcDispatcher getDispatcher() 507 { 508 return dispatcher; 509 } 510 511 public boolean hasDispatcher() 512 { 513 return dispatcher != null; 514 } 515 516 public void block() 517 { 518 log.debug("Channel " + getLocalAddress() + "[" + getName() + "] in blocking"); 519 } 520 521 public byte[] getState() 522 { 523 log.debug("Channel " + getLocalAddress() + "[" + getName() + "] "); 524 return null; 525 } 526 527 public void getState(OutputStream ostream) 528 { 529 log.debug("Channel " + getLocalAddress() + "[" + getName() + "]"); 530 } 531 532 public byte[] getState(String state_id) 533 { 534 log.debug("Channel " + getLocalAddress() + "[" + getName() + " state id =" + state_id); 535 return null; 536 } 537 538 public void getState(String state_id, OutputStream ostream) 539 { 540 log.debug("Channel " + getLocalAddress() + "[" + getName() + "] state id =" + state_id); 541 } 542 543 public void receive(Message msg) 544 { 545 } 546 547 public void setState(byte[] state) 548 { 549 log.debug("Channel " + getLocalAddress() + "[" + getName() + "] "); 550 } 551 552 public void setState(InputStream istream) 553 { 554 log.debug("Channel " + getLocalAddress() + "[" + getName() + "]"); 555 } 556 557 public void setState(String state_id, byte[] state) 558 { 559 log.debug("Channel " + getLocalAddress() + "[" + getName() + "] state id =" + state_id + ", state size is " 560 + state.length); 561 } 562 563 public void setState(String state_id, InputStream istream) 564 { 565 log.debug("Channel " + getLocalAddress() + "[" + getName() + "] state id " + state_id); 566 } 567 568 public void suspect(Address suspected_mbr) 569 { 570 log.debug("Channel " + getLocalAddress() + "[" + getName() + "] suspecting " + suspected_mbr); 571 } 572 573 public void unblock() 574 { 575 log.debug("Channel " + getLocalAddress() + "[" + getName() + "] unblocking"); 576 } 577 578 public void viewAccepted(View new_view) 579 { 580 log.debug("Channel " + getLocalAddress() + "[" + getName() + "] accepted view " + new_view); 581 } 582 } 583 584 593 protected abstract class PushChannelApplicationWithSemaphore extends PushChannelApplication 594 { 595 protected Semaphore semaphore; 596 597 public PushChannelApplicationWithSemaphore(String name, ChannelTestFactory factory, Semaphore semaphore, 598 boolean useDispatcher) throws Exception 599 { 600 super(name, factory, useDispatcher); 601 this.semaphore = semaphore; 602 } 603 604 protected PushChannelApplicationWithSemaphore(String name, Semaphore semaphore) throws Exception 605 { 606 super(name); 607 this.semaphore = semaphore; 608 } 609 610 protected PushChannelApplicationWithSemaphore(String name, JChannelFactory f,Semaphore semaphore) throws Exception 611 { 612 this(name, new DefaultMuxChannelTestFactory(f), semaphore, false); 613 } 614 615 protected PushChannelApplicationWithSemaphore(String name, Semaphore semaphore, boolean useDispatcher) 616 throws Exception 617 { 618 this(name, new DefaultChannelTestFactory(), semaphore, useDispatcher); 619 } 620 621 public void run() 622 { 623 boolean acquired = false; 624 try 625 { 626 acquired = semaphore.tryAcquire(60000L, TimeUnit.MILLISECONDS); 627 if (!acquired) 628 { 629 throw new Exception (name + " cannot acquire semaphore"); 630 } 631 632 useChannel(); 633 } 634 catch (Exception e) 635 { 636 log.error(name + ": " + e.getLocalizedMessage(), e); 637 exception = e; 639 } 640 finally 641 { 642 if (acquired) 643 { 644 semaphore.release(); 645 } 646 } 647 } 648 } 649 650 protected interface MemberRetrievable 651 { 652 public List getMembers(); 653 654 public Address getLocalAddress(); 655 } 656 657 663 protected static boolean isMuxChannelUsed() 664 { 665 return Boolean.valueOf(System.getProperty("mux.on", "false")).booleanValue(); 666 } 667 668 674 protected static boolean shouldCompareThreadCount() 675 { 676 return Boolean.valueOf(System.getProperty("threadcount", "false")).booleanValue(); 677 } 678 679 685 protected int getMuxFactoryCount() 686 { 687 return Integer.parseInt(System.getProperty("mux.factorycount", DEFAULT_MUX_FACTORY_COUNT)); 688 } 689 690 696 protected boolean useBlocking() 697 { 698 return Boolean.valueOf(System.getProperty("useBlocking", "false")).booleanValue(); 699 } 700 701 705 public static boolean areViewsComplete(MemberRetrievable[] channels,int memberCount) 706 { 707 for (int i = 0; i < memberCount; i++) 708 { 709 if (!isViewComplete(channels[i], memberCount)) 710 { 711 return false; 712 } 713 } 714 715 return true; 716 } 717 718 727 public static void blockUntilViewsReceived(MemberRetrievable[] channels,long timeout) 728 { 729 blockUntilViewsReceived(channels,channels.length,timeout); 730 } 731 732 public static void blockUntilViewsReceived(Collection channels,long timeout) 733 { 734 blockUntilViewsReceived(channels,channels.size(),timeout); 735 } 736 737 746 public static void blockUntilViewsReceived(MemberRetrievable[] channels, int count, long timeout) 747 { 748 long failTime = System.currentTimeMillis() + timeout; 749 750 while (System.currentTimeMillis() < failTime) 751 { 752 Util.sleep(100); 753 if (areViewsComplete(channels,count)) 754 { 755 return; 756 } 757 } 758 759 throw new RuntimeException ("timed out before caches had complete views"); 760 } 761 762 public static void blockUntilViewsReceived(Collection channels, int count, long timeout) 763 { 764 long failTime = System.currentTimeMillis() + timeout; 765 766 767 while (System.currentTimeMillis() < failTime) 768 { 769 Util.sleep(100); 770 if (areViewsComplete((MemberRetrievable[])channels.toArray(new MemberRetrievable[channels.size()]),count)) 771 { 772 return; 773 } 774 } 775 776 throw new RuntimeException ("timed out before caches had complete views"); 777 } 778 779 public static boolean isViewComplete(MemberRetrievable channel, int memberCount) 780 { 781 782 List members = channel.getMembers(); 783 if (members == null || memberCount > members.size()) 784 { 785 return false; 786 } 787 else if (memberCount < members.size()) 788 { 789 StringBuilder sb=new StringBuilder ("Channel at address "); 791 sb.append(channel.getLocalAddress()); 792 sb.append(" had "); 793 sb.append(members.size()); 794 sb.append(" members; expecting "); 795 sb.append(memberCount); 796 sb.append(". Members were ("); 797 for (int j = 0; j < members.size(); j++) 798 { 799 if (j > 0) 800 { 801 sb.append(", "); 802 } 803 sb.append(members.get(j)); 804 } 805 sb.append(')'); 806 807 throw new IllegalStateException (sb.toString()); 808 } 809 810 return true; 811 } 812 813 public static void takeAllPermits(Semaphore semaphore, int count) 814 { 815 for (int i = 0; i < count; i++) 816 { 817 try 818 { 819 semaphore.acquire(); 820 } 821 catch (InterruptedException e) 822 { 823 e.printStackTrace(); 825 } 826 } 827 } 828 829 public static void acquireSemaphore(Semaphore semaphore, long timeout, int count) throws Exception 830 { 831 for (int i = 0; i < count; i++) 832 { 833 boolean acquired = false; 834 try 835 { 836 acquired = semaphore.tryAcquire(timeout, TimeUnit.MILLISECONDS); 837 } 838 catch (InterruptedException e) 839 { 840 e.printStackTrace(); 842 } 843 if (!acquired) 844 throw new Exception ("Failed to acquire semaphore"); 845 } 846 } 847 848 public static void sleepRandom(int maxTime) 849 { 850 Util.sleep(RANDOM.nextInt(maxTime)); 851 } 852 853 854 855 } 856 | Popular Tags |