package org.jgroups.tests;

import org.jgroups.*;
import org.jgroups.util.Util;
import org.jgroups.mux.MuxChannel;

import junit.framework.*;

import java.util.*;

/**
 * Exercises message delivery over multiplexed channels: a "slow" message (whose receiver
 * sleeps for {@link #MAX_TIME} ms) and a "fast" message are sent, and each test checks how
 * far apart in time the two messages were handed to the receiver.
 */
public class MultiplexerConcurrentTest extends ChannelTestBase {
    private Channel s1, s2, s11, s21;
    JChannelFactory factory, factory2;

    /** Upper bound (ms) on the delivery gap for two messages to count as delivered concurrently. */
    private static final long MIN_TIME=1000;

    /** Time (ms) the receiver sleeps while processing a "slow" message. */
    private static final long MAX_TIME=5000;

    /** Maximum time (ms) a test waits for both messages before failing instead of hanging. */
    private static final long RECEIVE_TIMEOUT=60000;


    public MultiplexerConcurrentTest(String name) {
        super(name);
    }


    /** Creates two channel factories, both configured with the multiplexer configuration. */
    public void setUp() throws Exception {
        super.setUp();
        factory=new JChannelFactory();
        factory.setMultiplexerConfig(MUX_CHANNEL_CONFIG);

        factory2=new JChannelFactory();
        factory2.setMultiplexerConfig(MUX_CHANNEL_CONFIG);
    }

    /**
     * Closes all channels opened by a test and verifies that the channels underlying
     * s1 and s2 are neither open nor connected afterwards. The channel references are
     * cleared and the superclass tear-down is run even if closing or verification fails.
     */
    public void tearDown() throws Exception {
        try {
            if(s1 != null)
                s1.close();
            if(s2 != null)
                s2.close();
            if(s21 != null)
                s21.close();
            if(s11 != null)
                s11.close();

            if(s1 != null) {
                assertFalse(((MuxChannel)s1).getChannel().isOpen());
                assertFalse(((MuxChannel)s1).getChannel().isConnected());
            }
            if(s2 != null) {
                assertFalse(((MuxChannel)s2).getChannel().isOpen());
                assertFalse(((MuxChannel)s2).getChannel().isConnected());
            }
        }
        finally {
            // always reset state so a failure here cannot leak channels into the next test
            s1=s2=s11=s21=null;
            super.tearDown();
        }
    }


    /**
     * Sends "slow" and then "fast" from the same channel to the same service. Asserts that
     * "slow" is delivered first and that the two delivery times are at least
     * {@link #MAX_TIME} ms, but less than {@code MAX_TIME + MIN_TIME} ms, apart.
     */
    public void testTwoMessagesFromSameSenderToSameService() throws Exception {
        final MyReceiver receiver=new MyReceiver();
        s1=factory.createMultiplexerChannel(MUX_CHANNEL_CONFIG_STACK_NAME, "s1");
        s1.connect("bla");
        s1.setReceiver(receiver);
        s1.send(null, null, "slow");
        s1.send(null, null, "fast");
        awaitBothMessages(receiver);

        Map<Long,Message> results=receiver.getMessages();
        System.out.println("results:\n" + printMessages(results));

        // results are sorted by delivery time, so the first entry is the earliest delivery
        Message first=results.values().iterator().next();
        String mode=(String)first.getObject();
        assertEquals("the slow message needs to be delivered before the fast one", "slow", mode);

        long diff=deliveryGap(results);
        System.out.println("diff=" + diff);
        assertTrue("expected " + MAX_TIME + " <= diff < " + (MAX_TIME + MIN_TIME) + ", but diff=" + diff,
                   diff >= MAX_TIME && diff < MAX_TIME + MIN_TIME);
    }


    /**
     * Sends "slow" on service s1 and, 200 ms later, "fast" on service s2, both created from
     * the same factory and sharing one receiver. Asserts that the two deliveries are less
     * than {@link #MIN_TIME} ms apart.
     */
    public void testTwoMessagesFromSameSenderToDifferentServices() throws Exception {
        final MyReceiver receiver=new MyReceiver();
        s1=factory.createMultiplexerChannel(MUX_CHANNEL_CONFIG_STACK_NAME, "s1");
        s1.connect("bla");
        s1.setReceiver(receiver);

        s2=factory.createMultiplexerChannel(MUX_CHANNEL_CONFIG_STACK_NAME, "s2");
        s2.connect("bla");
        s2.setReceiver(receiver);

        s1.send(null, null, "slow");
        Util.sleep(200);
        s2.send(null, null, "fast");

        assertDeliveredConcurrently(receiver);
    }


    /**
     * Sends "slow" from a channel of the first factory and, 200 ms later, "fast" from a
     * channel of the second factory, both on service s1; only the first channel has the
     * receiver. Asserts that the two deliveries are less than {@link #MIN_TIME} ms apart.
     */
    public void testTwoMessagesFromDifferentSendersToSameService() throws Exception {
        final MyReceiver receiver=new MyReceiver();
        s1=factory.createMultiplexerChannel(MUX_CHANNEL_CONFIG_STACK_NAME, "s1");
        s1.connect("bla");
        s1.setReceiver(receiver);

        s2=factory2.createMultiplexerChannel(MUX_CHANNEL_CONFIG_STACK_NAME, "s1");
        s2.connect("bla");

        s1.send(null, null, "slow");
        Util.sleep(200);
        s2.send(null, null, "fast");

        assertDeliveredConcurrently(receiver);
    }


    /**
     * Sends "slow" on service s1 from the first factory and, 200 ms later, "fast" on service
     * s2 from the second factory; the receiver is registered on both services of the first
     * factory. Asserts that the two deliveries are less than {@link #MIN_TIME} ms apart.
     */
    public void testTwoMessagesFromDifferentSendersToDifferentServices() throws Exception {
        final MyReceiver receiver=new MyReceiver();
        s1=factory.createMultiplexerChannel(MUX_CHANNEL_CONFIG_STACK_NAME, "s1");
        s1.connect("bla");
        s1.setReceiver(receiver);
        s11=factory.createMultiplexerChannel(MUX_CHANNEL_CONFIG_STACK_NAME, "s2");
        s11.connect("bla");
        s11.setReceiver(receiver);

        s2=factory2.createMultiplexerChannel(MUX_CHANNEL_CONFIG_STACK_NAME, "s1");
        s2.connect("bla");

        s21=factory2.createMultiplexerChannel(MUX_CHANNEL_CONFIG_STACK_NAME, "s2");
        s21.connect("bla");

        s1.send(null, null, "slow");
        Util.sleep(200);
        s21.send(null, null, "fast");

        assertDeliveredConcurrently(receiver);
    }


    /** Blocks until the receiver has both messages; fails the test after RECEIVE_TIMEOUT ms. */
    private static void awaitBothMessages(MyReceiver receiver) throws InterruptedException {
        assertTrue("timed out after " + RECEIVE_TIMEOUT + " ms waiting for both messages",
                   receiver.waitUntilDone(RECEIVE_TIMEOUT));
    }

    /** Returns the absolute difference (ms) between the first two delivery times in the map. */
    private static long deliveryGap(Map<Long,Message> results) {
        Iterator<Long> it=results.keySet().iterator();
        long first=it.next();
        long second=it.next();
        return Math.abs(second - first);
    }

    /** Waits for both messages, prints them and asserts they were delivered less than MIN_TIME ms apart. */
    private void assertDeliveredConcurrently(MyReceiver receiver) throws InterruptedException {
        awaitBothMessages(receiver);
        Map<Long,Message> results=receiver.getMessages();
        System.out.println("results:\n" + printMessages(results));
        long diff=deliveryGap(results);
        System.out.println("diff=" + diff);
        assertTrue("failing as we don't yet have concurrent delivery", diff < MIN_TIME);
    }


    /**
     * Receiver that records each message under the time (ms) at which it was delivered and
     * sleeps for {@link #MAX_TIME} ms when the payload is "slow". All access to the recorded
     * messages is guarded by this receiver's monitor, which is also the monitor used for
     * signalling completion.
     */
    private static class MyReceiver extends ReceiverAdapter {
        private static final int EXPECTED_MSGS=2;

        private final Map<Long,Message> msgs=new HashMap<Long,Message>();


        public void receive(Message msg) {
            String mode=(String)msg.getObject();
            System.out.println("received " + msg + " (" + mode + ")");
            synchronized(this) {
                long timestamp=System.currentTimeMillis();
                // two deliveries within the same millisecond must not overwrite each other
                while(msgs.containsKey(timestamp))
                    timestamp++;
                msgs.put(timestamp, msg);
            }
            // sleep outside the lock so another delivery thread can record its message meanwhile
            if("slow".equalsIgnoreCase(mode)) {
                System.out.println("sleeping for 5 secs");
                Util.sleep(MAX_TIME);
            }
            synchronized(this) {
                if(msgs.size() == EXPECTED_MSGS)
                    this.notifyAll();
            }
        }

        /** Returns true once both expected messages have been recorded. */
        public synchronized boolean done() {
            return msgs.size() == EXPECTED_MSGS;
        }

        /**
         * Waits until both messages have been recorded and the recording thread has signalled.
         * @param timeout maximum time to wait, in ms
         * @return true if both messages were recorded, false if the timeout elapsed first
         */
        public synchronized boolean waitUntilDone(long timeout) throws InterruptedException {
            long deadline=System.currentTimeMillis() + timeout;
            while(!done()) {
                long remaining=deadline - System.currentTimeMillis();
                if(remaining <= 0)
                    return false;
                wait(remaining); // loop re-checks done() to cope with spurious wake-ups
            }
            return true;
        }

        /** Returns a copy of the recorded messages, sorted by delivery time. */
        public synchronized Map<Long,Message> getMessages() {
            return new TreeMap<Long,Message>(msgs);
        }
    }


    /** Formats each entry as "&lt;delivery date&gt;: &lt;payload&gt;", one entry per line. */
    String printMessages(Map<Long,Message> map) {
        StringBuilder sb=new StringBuilder();
        for(Map.Entry<Long,Message> entry: map.entrySet()) {
            sb.append(new Date(entry.getKey())).append(": ").append(entry.getValue().getObject()).append("\n");
        }
        return sb.toString();
    }


    public static junit.framework.Test suite() {
        return new TestSuite(MultiplexerConcurrentTest.class);
    }

    public static void main(String[] args) {
        junit.textui.TestRunner.run(suite());
    }


}