KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > tests > MultiplexerConcurrentTest


1 package org.jgroups.tests;
2
3 import org.jgroups.*;
4 import org.jgroups.util.Util;
5 import org.jgroups.mux.MuxChannel;
6
7 import junit.framework.*;
8
9 import java.util.*;
10
11 /**
12  * Test the multiplexer concurrency functionality. This is described in http://jira.jboss.com/jira/browse/JGRP-426
13  * @author Bela Ban
14  * @version $Id: MultiplexerConcurrentTest.java,v 1.3 2007/03/02 08:44:38 belaban Exp $
15  */

16 public class MultiplexerConcurrentTest extends ChannelTestBase {
17     private Channel s1, s2, s11, s21;
18     JChannelFactory factory, factory2;
19
20     private static final long MIN_TIME=1000; // 1 sec between msgs
21
private static final long MAX_TIME=5000;
22
23
24     public MultiplexerConcurrentTest(String JavaDoc name) {
25         super(name);
26     }
27
28
29     public void setUp() throws Exception JavaDoc {
30         super.setUp();
31         factory=new JChannelFactory();
32         factory.setMultiplexerConfig(MUX_CHANNEL_CONFIG);
33
34         factory2=new JChannelFactory();
35         factory2.setMultiplexerConfig(MUX_CHANNEL_CONFIG);
36     }
37
38     public void tearDown() throws Exception JavaDoc {
39         if(s1 != null)
40             s1.close();
41         if(s2 != null)
42             s2.close();
43
44         if(s21 != null) {
45             s21.close();
46             s21=null;
47         }
48         if(s11 != null) {
49             s11.close();
50             s11=null;
51         }
52         if(s1 != null) {
53             assertFalse(((MuxChannel)s1).getChannel().isOpen());
54             assertFalse(((MuxChannel)s1).getChannel().isConnected());
55         }
56         if(s2 != null) {
57             assertFalse(((MuxChannel)s2).getChannel().isOpen());
58             assertFalse(((MuxChannel)s2).getChannel().isConnected());
59         }
60         s1=s2=null;
61         super.tearDown();
62     }
63
64
65     /** Use case #1 in http://jira.jboss.com/jira/browse/JGRP-426:<br/>
66      * Sender A sends M1 to S1 and M2 to S1. M2 should wait until M1 is done
67      */

68     public void testTwoMessagesFromSameSenderToSameService() throws Exception JavaDoc {
69         final MyReceiver receiver=new MyReceiver();
70         s1=factory.createMultiplexerChannel(MUX_CHANNEL_CONFIG_STACK_NAME, "s1");
71         s1.connect("bla");
72         s1.setReceiver(receiver);
73         s1.send(null, null, "slow");
74         s1.send(null, null, "fast");
75         synchronized(receiver) {
76             while(!receiver.done())
77                 receiver.wait();
78         }
79
80         // verify time diffs
81
Map<Long JavaDoc,Message> results=receiver.getMessages();
82         System.out.println("results:\n" + printMessages(results));
83         Iterator<Map.Entry<Long JavaDoc,Message>> it=results.entrySet().iterator();
84         long time;
85         Message msg;
86         Map.Entry<Long JavaDoc,Message> entry;
87         entry=it.next();
88         time=entry.getKey();
89         msg=entry.getValue();
90         String JavaDoc mode=(String JavaDoc)msg.getObject();
91         assertEquals("the slow message needs to be delivered before the fast one", "slow", mode);
92         entry=it.next();
93         long time2=entry.getKey();
94         long diff=Math.abs(time2-time);
95         System.out.println("diff=" + diff);
96         assertTrue(diff >= MAX_TIME && diff < 6000);
97     }
98
99
100     /** Use case #2 in http://jira.jboss.com/jira/browse/JGRP-426:<br/>
101       * Sender A sends M1 to S1 and M2 to S2. M2 should get processed immediately and not
102      * have to wait for M1 to complete
103       */

104      public void testTwoMessagesFromSameSenderToDifferentServices() throws Exception JavaDoc {
105         final MyReceiver receiver=new MyReceiver();
106         s1=factory.createMultiplexerChannel(MUX_CHANNEL_CONFIG_STACK_NAME, "s1");
107         s1.connect("bla");
108         s1.setReceiver(receiver);
109
110         s2=factory.createMultiplexerChannel(MUX_CHANNEL_CONFIG_STACK_NAME, "s2");
111         s2.connect("bla");
112         s2.setReceiver(receiver);
113
114         s1.send(null, null, "slow");
115         Util.sleep(200);
116         s2.send(null, null, "fast");
117         synchronized(receiver) {
118             while(!receiver.done())
119                 receiver.wait();
120         }
121
122         // verify time diffs
123
Map<Long JavaDoc,Message> results=receiver.getMessages();
124         System.out.println("results:\n" + printMessages(results));
125         Set<Long JavaDoc> times=results.keySet();
126
127
128         Iterator<Long JavaDoc> it=times.iterator();
129         long time, time2, diff;
130         time=it.next();
131         time2=it.next();
132         diff=Math.abs(time2-time);
133         System.out.println("diff=" + diff);
134         assertTrue("failing as we don't yet have concurrent delivery", diff < MIN_TIME);
135     }
136
137
138     /**
139      * Use case #3 in http://jira.jboss.com/jira/browse/JGRP-426:<br/>
140      * Sender A sends M1 to S1 and sender B sends M2 to S1. M2 should get processed concurrently to M1
141      * and should not have to wait for M1's completion
142      */

143     public void testTwoMessagesFromDifferentSendersToSameService() throws Exception JavaDoc {
144         final MyReceiver receiver=new MyReceiver();
145         s1=factory.createMultiplexerChannel(MUX_CHANNEL_CONFIG_STACK_NAME, "s1");
146         s1.connect("bla");
147         s1.setReceiver(receiver);
148
149         s2=factory2.createMultiplexerChannel(MUX_CHANNEL_CONFIG_STACK_NAME, "s1"); // same service
150
s2.connect("bla");
151
152         s1.send(null, null, "slow");
153         Util.sleep(200); // the slower message needs to be received first
154
s2.send(null, null, "fast");
155         synchronized(receiver) {
156             while(!receiver.done())
157                 receiver.wait();
158         }
159          // verify time diffs
160
Map<Long JavaDoc,Message> results=receiver.getMessages();
161         System.out.println("results:\n" + printMessages(results));
162         Set<Long JavaDoc> times=results.keySet();
163
164         Iterator<Long JavaDoc> it=times.iterator();
165         long time, time2, diff;
166         time=it.next();
167         time2=it.next();
168         diff=Math.abs(time2-time);
169         System.out.println("diff=" + diff);
170         assertTrue("failing as we don't yet have concurrent delivery", diff < MIN_TIME);
171     }
172
173     /**
174      * Use case #4 in http://jira.jboss.com/jira/browse/JGRP-426:<br/>
175      * Sender A sends M1 to S1 and sender B sends M2 to S2. M1 and M2 should get processed concurrently
176      */

177     public void testTwoMessagesFromDifferentSendersToDifferentServices() throws Exception JavaDoc {
178         final MyReceiver receiver=new MyReceiver();
179         s1=factory.createMultiplexerChannel(MUX_CHANNEL_CONFIG_STACK_NAME, "s1");
180         s1.connect("bla");
181         s1.setReceiver(receiver);
182         s11=factory.createMultiplexerChannel(MUX_CHANNEL_CONFIG_STACK_NAME, "s2");
183         s11.connect("bla");
184         s11.setReceiver(receiver);
185
186
187         s2=factory2.createMultiplexerChannel(MUX_CHANNEL_CONFIG_STACK_NAME, "s1");
188         s2.connect("bla");
189
190         s21=factory2.createMultiplexerChannel(MUX_CHANNEL_CONFIG_STACK_NAME, "s2");
191         s21.connect("bla");
192
193         s1.send(null, null, "slow");
194         Util.sleep(200); // the slower message needs to be received first
195
s21.send(null, null, "fast");
196         synchronized(receiver) {
197             while(!receiver.done())
198                 receiver.wait();
199         }
200          // verify time diffs
201
Map<Long JavaDoc,Message> results=receiver.getMessages();
202         System.out.println("results:\n" + printMessages(results));
203         Set<Long JavaDoc> times=results.keySet();
204
205         Iterator<Long JavaDoc> it=times.iterator();
206         long time, time2, diff;
207         time=it.next();
208         time2=it.next();
209         diff=Math.abs(time2-time);
210         System.out.println("diff=" + diff);
211         assertTrue("failing as we don't yet have concurrent delivery", diff < MIN_TIME);
212     }
213
214     
215
216     private static class MyReceiver extends ReceiverAdapter {
217         final Map<Long JavaDoc,Message> msgs=new HashMap<Long JavaDoc,Message>();
218
219
220         public void receive(Message msg) {
221             String JavaDoc mode=(String JavaDoc)msg.getObject();
222             System.out.println("received " + msg + " (" + mode + ")");
223             msgs.put(System.currentTimeMillis(), msg);
224             if(mode.equalsIgnoreCase("slow")) {
225                 System.out.println("sleeping for 5 secs");
226                 Util.sleep(5000);
227             }
228             synchronized(this) {
229                 if(msgs.size() == 2)
230                     this.notify();
231             }
232         }
233
234         public boolean done() {
235             synchronized(msgs) {
236                 return msgs.size() == 2;
237             }
238         }
239
240         public Map<Long JavaDoc,Message> getMessages() {
241             return new TreeMap<Long JavaDoc,Message>(msgs);
242         }
243     }
244
245
246     String JavaDoc printMessages(Map<Long JavaDoc,Message> map) {
247         StringBuilder JavaDoc sb=new StringBuilder JavaDoc();
248         for(Map.Entry<Long JavaDoc,Message> entry: map.entrySet()) {
249             sb.append(new Date(entry.getKey())).append(": ").append(entry.getValue().getObject()).append("\n");
250         }
251         return sb.toString();
252     }
253
254
255     public static junit.framework.Test suite() {
256         return new TestSuite(MultiplexerConcurrentTest.class);
257     }
258
259     public static void main(String JavaDoc[] args) {
260         junit.textui.TestRunner.run(suite());
261     }
262
263
264 }
265
Popular Tags