1 2 package org.jgroups.tests; 3 4 5 import junit.framework.Test; 6 import junit.framework.TestCase; 7 import junit.framework.TestSuite; 8 import org.jgroups.*; 9 import org.jgroups.util.Promise; 10 import org.jgroups.util.Util; 11 12 13 19 public class DiscardTest extends TestCase { 20 JChannel ch1, ch2; 21 22 static final String discard_props="discard.xml"; static final String fast_props="udp.xml"; static final long NUM_MSGS=10000; 25 static final int MSG_SIZE=1000; 26 private static final String GROUP="DiscardTestGroup"; 27 final Promise ch1_all_received=new Promise(); 28 final Promise ch2_all_received=new Promise(); 29 30 31 public DiscardTest(String name) { 32 super(name); 33 } 34 35 protected void setUp() throws Exception { 36 super.setUp(); 37 ch1_all_received.reset(); 38 ch2_all_received.reset(); 39 } 40 41 public void testDiscardProperties() throws Exception { 42 _testLosslessReception(discard_props); 43 } 44 45 public void testFastProperties() throws Exception { 46 _testLosslessReception(fast_props); 47 } 48 49 public void _testLosslessReception(String props) throws Exception { 50 Address ch1_addr, ch2_addr; 51 long start, stop; 52 53 System.setProperty("bind.address", "127.0.0.1"); 54 55 ch1=new JChannel(props); 56 ch1.setReceiver(new MyReceiver(ch1_all_received, NUM_MSGS, "ch1")); 57 ch2=new JChannel(props); 58 ch2.setReceiver(new MyReceiver(ch2_all_received, NUM_MSGS, "ch2")); 59 60 ch1.connect(GROUP); 61 ch1_addr=ch1.getLocalAddress(); 62 ch2.connect(GROUP); 63 ch2_addr=ch2.getLocalAddress(); 64 65 Util.sleep(2000); 66 View v=ch2.getView(); 67 System.out.println("**** ch2's view: " + v); 68 assertEquals(2, v.size()); 69 assertTrue(v.getMembers().contains(ch1_addr)); 70 assertTrue(v.getMembers().contains(ch2_addr)); 71 72 System.out.println("sending " + NUM_MSGS + " 1K messages to all members (including myself)"); 73 start=System.currentTimeMillis(); 74 for(int i=0; i < NUM_MSGS; i++) { 75 Message msg=createMessage(MSG_SIZE); 76 ch1.send(msg); 77 if(i % 1000 == 0) 78 System.out.println("-- sent " + i + " messages"); 79 } 80 81 System.out.println("-- waiting for ch1 and ch2 to receive " + NUM_MSGS + " messages"); 82 Long num_msgs; 83 num_msgs=(Long )ch1_all_received.getResult(); 84 System.out.println("-- received " + num_msgs + " messages on ch1"); 85 86 num_msgs=(Long )ch2_all_received.getResult(); 87 stop=System.currentTimeMillis(); 88 System.out.println("-- received " + num_msgs + " messages on ch2"); 89 90 long diff=stop-start; 91 double msgs_sec=NUM_MSGS / (diff / 1000.0); 92 System.out.println("== Sent and received " + NUM_MSGS + " in " + diff + "ms, " + 93 msgs_sec + " msgs/sec"); 94 95 ch2.close(); 96 ch1.close(); 97 } 98 99 100 static class MyReceiver extends ReceiverAdapter { 101 final Promise p; 102 final long num_msgs_expected; 103 long num_msgs=0; 104 String channel_name; 105 boolean operational=true; 106 107 public MyReceiver(final Promise p, final long num_msgs_expected, String channel_name) { 108 this.p=p; 109 this.num_msgs_expected=num_msgs_expected; 110 this.channel_name=channel_name; 111 } 112 113 public void receive(Message msg) { 114 if(!operational) 115 return; 116 num_msgs++; 117 118 if(num_msgs > 0 && num_msgs % 1000 == 0) 119 System.out.println("-- received " + num_msgs + " on " + channel_name); 120 121 if(num_msgs >= num_msgs_expected) { 122 System.out.println("SUCCESS: received all " + num_msgs_expected + " messages on " + channel_name); 123 operational=false; 124 p.setResult(new Long (num_msgs)); 125 } 126 } 127 128 public void viewAccepted(View new_view) { 129 System.out.println("-- view (" + channel_name + "): " + new_view); 130 } 131 } 132 133 134 private Message createMessage(int size) { 135 byte[] buf=new byte[size]; 136 for(int i=0; i < buf.length; i++) buf[i]=(byte)'x'; 137 return new Message(null, null, buf); 138 } 139 140 141 142 143 public static Test suite() { 144 return new TestSuite(DiscardTest.class); 145 } 146 147 public static void main(String [] args) { 148 junit.textui.TestRunner.run(suite()); 149 } 150 151 152 153 154 } 155 156 157 | Popular Tags |