1 package org.jgroups.tests; 2 3 import org.jgroups.*; 4 import org.jgroups.protocols.TP; 5 import org.jgroups.stack.Protocol; 6 import org.jgroups.stack.ProtocolStack; 7 import org.jgroups.util.Promise; 8 import org.jgroups.util.Util; 9 10 import java.util.*; 11 12 17 public class MessageBundlingTest extends ChannelTestBase { 18 private JChannel ch1, ch2; 19 private MyReceiver r2; 20 private final static long LATENCY=30L; 21 private final static long SLEEP=5000L; 22 private static final boolean BUNDLING=true; 23 private static final int MAX_BYTES=20000; 24 25 public void setUp() throws Exception { 26 super.setUp(); 27 ch1=createChannel(); 28 setBundling(ch1, BUNDLING, MAX_BYTES, LATENCY); 29 setLoopback(ch1, false); 30 ch1.setReceiver(new NullReceiver()); 31 ch1.connect("x"); 32 ch2=createChannel(); 33 setBundling(ch2, BUNDLING, MAX_BYTES, LATENCY); 34 setLoopback(ch1, false); 35 r2=new MyReceiver(); 36 ch2.setReceiver(r2); 37 ch2.connect("x"); 38 39 View view=ch2.getView(); 40 assertEquals(2, view.size()); 41 } 42 43 44 public void tearDown() throws Exception { 45 closeChannel(ch2); 46 closeChannel(ch1); 47 super.tearDown(); 48 } 49 50 51 protected boolean useBlocking() { 52 return false; 53 } 54 55 56 public void testLatencyWithoutMessageBundling() throws ChannelClosedException, ChannelNotConnectedException { 57 Message tmp=new Message(); 58 setBundling(ch1, false, 20000, 30); 59 r2.setNumExpectedMesssages(1); 60 Promise promise=new Promise(); 61 r2.setPromise(promise); 62 long time=System.currentTimeMillis(); 63 ch1.send(tmp); 64 System.out.println(">>> sent message at " + new Date()); 65 promise.getResult(SLEEP); 66 List<Long > list=r2.getTimes(); 67 assertEquals(1, list.size()); 68 Long time2=list.get(0); 69 long diff=time2 - time; 70 System.out.println("latency: " + diff + " ms"); 71 assertTrue("latency (" + diff + "ms) should be less than " + LATENCY + " ms", diff < LATENCY); 72 } 73 74 75 public void testLatencyWithMessageBundling() throws ChannelClosedException, ChannelNotConnectedException { 76 Message tmp=new Message(); 77 r2.setNumExpectedMesssages(1); 78 Promise promise=new Promise(); 79 r2.setPromise(promise); 80 long time=System.currentTimeMillis(); 81 ch1.send(tmp); 82 System.out.println(">>> sent message at " + new Date()); 83 promise.getResult(SLEEP); 84 List<Long > list=r2.getTimes(); 85 assertEquals(1, list.size()); 86 Long time2=list.get(0); 87 long diff=time2 - time; 88 System.out.println("latency: " + diff + " ms"); 89 assertTrue("latency (" + diff + "ms) should be more than the bundling timeout (" + LATENCY + 90 "ms), but less than 2 times the LATENCY (" + LATENCY *2 + ")", diff > LATENCY && diff < LATENCY * 2); 91 } 92 93 94 95 public void testLatencyWithMessageBundlingAndLoopback() throws ChannelClosedException, ChannelNotConnectedException { 96 Message tmp=new Message(); 97 setLoopback(ch1, true); 98 setLoopback(ch2, true); 99 r2.setNumExpectedMesssages(1); 100 Promise promise=new Promise(); 101 r2.setPromise(promise); 102 long time=System.currentTimeMillis(); 103 System.out.println(">>> sending message at " + new Date()); 104 ch1.send(tmp); 105 promise.getResult(SLEEP); 106 List<Long > list=r2.getTimes(); 107 assertEquals(1, list.size()); 108 Long time2=list.get(0); 109 long diff=time2 - time; 110 System.out.println("latency: " + diff + " ms"); 111 assertTrue("latency (" + diff + "ms) should be more than the bundling timeout (" + LATENCY + 112 "ms), but less than 2 times the LATENCY (" + LATENCY *2 + ")", diff > LATENCY && diff < LATENCY * 2); 113 } 114 115 116 public void testLatencyWithMessageBundlingAndMaxBytes() throws ChannelClosedException, ChannelNotConnectedException { 117 setLoopback(ch1, true); 118 setLoopback(ch2, true); 119 r2.setNumExpectedMesssages(10); 120 Promise promise=new Promise(); 121 r2.setPromise(promise); 122 Util.sleep(LATENCY *2); 123 System.out.println(">>> sending 10 messages at " + new Date()); 124 for(int i=0; i < 10; i++) 125 ch1.send(new Message(null, null, new byte[2000])); 126 127 promise.getResult(SLEEP); List<Long > list=r2.getTimes(); 129 assertEquals(10, list.size()); 130 131 for(Iterator<Long > it=list.iterator(); it.hasNext();) { 132 Long val=it.next(); 133 System.out.println(val); 134 } 135 } 136 137 138 public void testSimple() throws ChannelClosedException, ChannelNotConnectedException { 139 Message tmp=new Message(); 140 ch2.setReceiver(new SimpleReceiver()); 141 ch1.send(tmp); 142 System.out.println(">>> sent message at " + new Date()); 143 Util.sleep(10000); 144 } 145 146 private void setLoopback(JChannel ch, boolean b) { 147 ProtocolStack stack=ch.getProtocolStack(); 148 Vector<Protocol> prots=stack.getProtocols(); 149 TP transport=(TP)prots.lastElement(); 150 transport.setLoopback(b); 151 } 152 153 154 private void setBundling(JChannel ch, boolean enabled, int max_bytes, long timeout) { 155 ProtocolStack stack=ch.getProtocolStack(); 156 Vector<Protocol> prots=stack.getProtocols(); 157 TP transport=(TP)prots.lastElement(); 158 transport.setEnableBundling(enabled); 159 transport.setMaxBundleSize(max_bytes); 160 transport.setMaxBundleTimeout(timeout); 161 } 162 163 private void closeChannel(Channel c) { 164 if(c != null && (c.isOpen() || c.isConnected())) { 165 c.close(); 166 } 167 } 168 169 private static class NullReceiver extends ReceiverAdapter { 170 171 public void receive(Message msg) { 172 ; 173 } 174 } 175 176 177 private static class SimpleReceiver extends ReceiverAdapter { 178 long start=System.currentTimeMillis(); 179 180 public void receive(Message msg) { 181 System.out.println("<<< received message from " + msg.getSrc() + " at " + new Date() + 182 ", latency=" + (System.currentTimeMillis() - start) + " ms"); 183 } 184 } 185 186 private static class MyReceiver extends ReceiverAdapter { 187 private final List<Long > times=new LinkedList<Long >(); 188 private int num_expected_msgs; 189 private Promise promise; 190 191 public List<Long > getTimes() { 192 return times; 193 } 194 195 196 public void setNumExpectedMesssages(int num_expected_msgs) { 197 this.num_expected_msgs=num_expected_msgs; 198 } 199 200 201 public void setPromise(Promise promise) { 202 this.promise=promise; 203 } 204 205 public int size() { 206 return times.size(); 207 } 208 209 public void receive(Message msg) { 210 times.add(new Long (System.currentTimeMillis())); 211 System.out.println("<<< received message from " + msg.getSrc() + " at " + new Date()); 212 if(times.size() >= num_expected_msgs && promise != null) { 213 promise.setResult(times.size()); 214 } 215 } 216 } 217 218 219 public static void main(String [] args) { 220 String [] testCaseName={MessageBundlingTest.class.getName()}; 221 junit.textui.TestRunner.main(testCaseName); 222 } 223 } 224 | Popular Tags |