KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > tests > MessageBundlingTest


1 package org.jgroups.tests;
2
3 import org.jgroups.*;
4 import org.jgroups.protocols.TP;
5 import org.jgroups.stack.Protocol;
6 import org.jgroups.stack.ProtocolStack;
7 import org.jgroups.util.Promise;
8 import org.jgroups.util.Util;
9
10 import java.util.*;
11
12 /**
13  * Measure the latency between messages with message bundling enabled at the transport level
14  * @author Bela Ban
15  * @version $Id: MessageBundlingTest.java,v 1.5 2007/05/04 11:47:59 belaban Exp $
16  */

17 public class MessageBundlingTest extends ChannelTestBase {
18     private JChannel ch1, ch2;
19     private MyReceiver r2;
20     private final static long LATENCY=30L;
21     private final static long SLEEP=5000L;
22     private static final boolean BUNDLING=true;
23     private static final int MAX_BYTES=20000;
24
25     public void setUp() throws Exception JavaDoc {
26         super.setUp();
27         ch1=createChannel();
28         setBundling(ch1, BUNDLING, MAX_BYTES, LATENCY);
29         setLoopback(ch1, false);
30         ch1.setReceiver(new NullReceiver());
31         ch1.connect("x");
32         ch2=createChannel();
33         setBundling(ch2, BUNDLING, MAX_BYTES, LATENCY);
34         setLoopback(ch1, false);
35         r2=new MyReceiver();
36         ch2.setReceiver(r2);
37         ch2.connect("x");
38
39         View view=ch2.getView();
40         assertEquals(2, view.size());
41     }
42
43
44     public void tearDown() throws Exception JavaDoc {
45         closeChannel(ch2);
46         closeChannel(ch1);
47         super.tearDown();
48     }
49
50
51     protected boolean useBlocking() {
52        return false;
53     }
54
55
56     public void testLatencyWithoutMessageBundling() throws ChannelClosedException, ChannelNotConnectedException {
57         Message tmp=new Message();
58         setBundling(ch1, false, 20000, 30);
59         r2.setNumExpectedMesssages(1);
60         Promise promise=new Promise();
61         r2.setPromise(promise);
62         long time=System.currentTimeMillis();
63         ch1.send(tmp);
64         System.out.println(">>> sent message at " + new Date());
65         promise.getResult(SLEEP);
66         List<Long JavaDoc> list=r2.getTimes();
67         assertEquals(1, list.size());
68         Long JavaDoc time2=list.get(0);
69         long diff=time2 - time;
70         System.out.println("latency: " + diff + " ms");
71         assertTrue("latency (" + diff + "ms) should be less than " + LATENCY + " ms", diff < LATENCY);
72     }
73
74
75     public void testLatencyWithMessageBundling() throws ChannelClosedException, ChannelNotConnectedException {
76         Message tmp=new Message();
77         r2.setNumExpectedMesssages(1);
78         Promise promise=new Promise();
79         r2.setPromise(promise);
80         long time=System.currentTimeMillis();
81         ch1.send(tmp);
82         System.out.println(">>> sent message at " + new Date());
83         promise.getResult(SLEEP);
84         List<Long JavaDoc> list=r2.getTimes();
85         assertEquals(1, list.size());
86         Long JavaDoc time2=list.get(0);
87         long diff=time2 - time;
88         System.out.println("latency: " + diff + " ms");
89         assertTrue("latency (" + diff + "ms) should be more than the bundling timeout (" + LATENCY +
90                 "ms), but less than 2 times the LATENCY (" + LATENCY *2 + ")", diff > LATENCY && diff < LATENCY * 2);
91     }
92
93
94
95     public void testLatencyWithMessageBundlingAndLoopback() throws ChannelClosedException, ChannelNotConnectedException {
96         Message tmp=new Message();
97         setLoopback(ch1, true);
98         setLoopback(ch2, true);
99         r2.setNumExpectedMesssages(1);
100         Promise promise=new Promise();
101         r2.setPromise(promise);
102         long time=System.currentTimeMillis();
103         System.out.println(">>> sending message at " + new Date());
104         ch1.send(tmp);
105         promise.getResult(SLEEP);
106         List<Long JavaDoc> list=r2.getTimes();
107         assertEquals(1, list.size());
108         Long JavaDoc time2=list.get(0);
109         long diff=time2 - time;
110         System.out.println("latency: " + diff + " ms");
111         assertTrue("latency (" + diff + "ms) should be more than the bundling timeout (" + LATENCY +
112                 "ms), but less than 2 times the LATENCY (" + LATENCY *2 + ")", diff > LATENCY && diff < LATENCY * 2);
113     }
114
115
116     public void testLatencyWithMessageBundlingAndMaxBytes() throws ChannelClosedException, ChannelNotConnectedException {
117         setLoopback(ch1, true);
118         setLoopback(ch2, true);
119         r2.setNumExpectedMesssages(10);
120         Promise promise=new Promise();
121         r2.setPromise(promise);
122         Util.sleep(LATENCY *2);
123         System.out.println(">>> sending 10 messages at " + new Date());
124         for(int i=0; i < 10; i++)
125             ch1.send(new Message(null, null, new byte[2000]));
126
127         promise.getResult(SLEEP); // we should get the messages immediately because max_bundle_size has been exceeded by the 20 messages
128
List<Long JavaDoc> list=r2.getTimes();
129         assertEquals(10, list.size());
130
131         for(Iterator<Long JavaDoc> it=list.iterator(); it.hasNext();) {
132             Long JavaDoc val=it.next();
133             System.out.println(val);
134         }
135     }
136
137
138     public void testSimple() throws ChannelClosedException, ChannelNotConnectedException {
139         Message tmp=new Message();
140         ch2.setReceiver(new SimpleReceiver());
141         ch1.send(tmp);
142         System.out.println(">>> sent message at " + new Date());
143         Util.sleep(10000);
144     }
145
146     private void setLoopback(JChannel ch, boolean b) {
147         ProtocolStack stack=ch.getProtocolStack();
148         Vector<Protocol> prots=stack.getProtocols();
149         TP transport=(TP)prots.lastElement();
150         transport.setLoopback(b);
151     }
152
153
154     private void setBundling(JChannel ch, boolean enabled, int max_bytes, long timeout) {
155         ProtocolStack stack=ch.getProtocolStack();
156         Vector<Protocol> prots=stack.getProtocols();
157         TP transport=(TP)prots.lastElement();
158         transport.setEnableBundling(enabled);
159         transport.setMaxBundleSize(max_bytes);
160         transport.setMaxBundleTimeout(timeout);
161     }
162
163     private void closeChannel(Channel c) {
164         if(c != null && (c.isOpen() || c.isConnected())) {
165             c.close();
166         }
167     }
168
169     private static class NullReceiver extends ReceiverAdapter {
170
171         public void receive(Message msg) {
172             ;
173         }
174     }
175
176
177     private static class SimpleReceiver extends ReceiverAdapter {
178         long start=System.currentTimeMillis();
179
180         public void receive(Message msg) {
181             System.out.println("<<< received message from " + msg.getSrc() + " at " + new Date() +
182                     ", latency=" + (System.currentTimeMillis() - start) + " ms");
183         }
184     }
185
186     private static class MyReceiver extends ReceiverAdapter {
187         private final List<Long JavaDoc> times=new LinkedList<Long JavaDoc>();
188         private int num_expected_msgs;
189         private Promise promise;
190
191         public List<Long JavaDoc> getTimes() {
192             return times;
193         }
194
195
196         public void setNumExpectedMesssages(int num_expected_msgs) {
197             this.num_expected_msgs=num_expected_msgs;
198         }
199
200
201         public void setPromise(Promise promise) {
202             this.promise=promise;
203         }
204
205         public int size() {
206             return times.size();
207         }
208
209         public void receive(Message msg) {
210             times.add(new Long JavaDoc(System.currentTimeMillis()));
211             System.out.println("<<< received message from " + msg.getSrc() + " at " + new Date());
212             if(times.size() >= num_expected_msgs && promise != null) {
213                 promise.setResult(times.size());
214             }
215         }
216     }
217
218
219     public static void main(String JavaDoc[] args) {
220         String JavaDoc[] testCaseName={MessageBundlingTest.class.getName()};
221         junit.textui.TestRunner.main(testCaseName);
222     }
223 }
224
Popular Tags