1 package org.jgroups.tests; 2 3 import junit.framework.TestCase; 4 import org.jgroups.*; 5 import org.jgroups.stack.Protocol; 6 import org.jgroups.util.Util; 7 8 import java.util.*; 9 import java.util.concurrent.*; 10 import java.util.concurrent.atomic.AtomicInteger ; 11 12 17 public class ConcurrentStackTest extends TestCase { 18 String props="udp.xml"; 19 JChannel ch1, ch2, ch3; 20 final static int NUM=10, EXPECTED=NUM * 3; 21 final static long SLEEPTIME=100; 22 CyclicBarrier barrier; 23 24 public ConcurrentStackTest(String name) { 25 super(name); 26 } 27 28 public void setUp() throws Exception { 29 super.setUp(); 30 barrier=new CyclicBarrier(4); 31 ch1=new JChannel(props); 32 ch2=new JChannel(props); 33 ch3=new JChannel(props); 34 } 35 36 protected void tearDown() throws Exception { 37 if(ch3 != null) ch3.close(); 38 if(ch2 != null) ch2.close(); 39 if(ch1 != null) ch1.close(); 40 barrier.reset(); 41 super.tearDown(); 42 } 43 44 45 46 public void testSequentialDelivery() throws Exception { 47 doIt(false); 48 } 49 50 public void testConcurrentDelivery() throws Exception { 51 doIt(true); 52 } 53 54 55 private void doIt(boolean threadless) throws Exception { 56 long start, stop, diff; 57 setThreadless(ch1, threadless); 58 setThreadless(ch2, threadless); 59 setThreadless(ch3, threadless); 60 61 MyReceiver r1=new MyReceiver("R1"), r2=new MyReceiver("R2"), r3=new MyReceiver("R3"); 62 ch1.setReceiver(r1); ch2.setReceiver(r2); ch3.setReceiver(r3); 63 64 ch1.connect("test"); 65 ch2.connect("test"); 66 ch3.connect("test"); 67 View v=ch3.getView(); 68 assertEquals(3, v.size()); 69 70 new Thread (new Sender(ch1)) {}.start(); 71 new Thread (new Sender(ch2)) {}.start(); 72 new Thread (new Sender(ch3)) {}.start(); 73 barrier.await(); start=System.currentTimeMillis(); 75 76 Exception ex=null; 77 78 try { 79 barrier.await((long)(EXPECTED * SLEEPTIME * 1.3), TimeUnit.MILLISECONDS); } 81 catch(java.util.concurrent.TimeoutException e) { 82 ex=e; 83 } 84 85 86 stop=System.currentTimeMillis(); 87 diff=stop - start; 88 89 System.out.println("Total time: " + diff + " ms\n"); 90 91 checkFIFO(r1); 92 checkFIFO(r2); 93 checkFIFO(r3); 94 checkTime(diff, threadless); 95 96 if(ex != null) 97 throw ex; 98 } 99 100 private void checkFIFO(MyReceiver r) { 101 List<Pair<Address,Integer >> msgs=r.getMessages(); 102 Map<Address,List<Integer >> map=new HashMap(); 103 for(Pair<Address,Integer > p: msgs) { 104 Address sender=p.key; 105 List<Integer > list=map.get(sender); 106 if(list == null) { 107 list=new LinkedList(); 108 map.put(sender, list); 109 } 110 list.add(p.val); 111 } 112 113 boolean fifo=true; 114 List<Address> incorrect_receivers=new LinkedList(); 115 System.out.println("Checking FIFO for " + r.getName() + ":"); 116 for(Address addr: map.keySet()) { 117 List<Integer > list=map.get(addr); 118 print(addr, list); 119 if(!verifyFIFO(list)) { 120 fifo=false; 121 incorrect_receivers.add(addr); 122 } 123 } 124 System.out.print("\n"); 125 126 if(!fifo) 127 fail("The following receivers didn't receive all messages in FIFO order: " + incorrect_receivers); 128 } 129 130 131 private boolean verifyFIFO(List<Integer > list) { 132 List<Integer > list2=new LinkedList(list); 133 Collections.sort(list2); 134 return list.equals(list2); 135 } 136 137 private void print(Address addr, List<Integer > list) { 138 StringBuilder sb=new StringBuilder (); 139 sb.append(addr).append(": "); 140 for(Integer i: list) 141 sb.append(i).append(" "); 142 System.out.println(sb); 143 } 144 145 146 private void checkTime(long time, boolean threadless) { 147 long min_time, max_time; 148 149 if(threadless) { 150 min_time=NUM * SLEEPTIME; 151 } 152 else { 153 min_time=EXPECTED * SLEEPTIME; 154 } 155 max_time=(long)(min_time * 1.3); 157 assertTrue("time (" + time + "ms) is out of bounds (min=" + min_time + "ms, max=" + max_time + "ms)", 158 min_time <= time && time <= max_time); 159 } 160 161 162 private void setThreadless(JChannel ch1, boolean threadless) { 163 Protocol tp=ch1.getProtocolStack().findProtocol("UDP"); 164 if(tp == null) 165 throw new IllegalStateException ("Protocol UDP not found in properties"); 166 Properties p=new Properties(); 167 p.setProperty("use_concurrent_stack", String.valueOf(threadless)); 168 p.setProperty("thread_pool.min_threads", "1"); 169 p.setProperty("thread_pool.max_threads", "100"); 170 p.setProperty("thread_pool.queue_enabled", "false"); 171 tp.setProperties(p); 173 } 174 175 176 private class Sender implements Runnable { 177 Channel ch; 178 Address local_addr; 179 180 public Sender(Channel ch) { 181 this.ch=ch; 182 local_addr=ch.getLocalAddress(); 183 } 184 185 public void run() { 186 Message msg; 187 try { 188 barrier.await(); 189 } 190 catch(Throwable t) { 191 return; 192 } 193 194 for(int i=1; i <= NUM; i++) { 195 msg=new Message(null, null, new Integer (i)); 196 try { 197 ch.send(msg); 199 } 200 catch(Exception e) { 201 e.printStackTrace(); 202 } 203 } 204 } 205 } 206 207 208 private class Pair<K,V> { 209 K key; 210 V val; 211 212 public Pair(K key, V val) { 213 this.key=key; 214 this.val=val; 215 } 216 217 public String toString() { 218 return key + "::" + val; 219 } 220 } 221 222 private class MyReceiver extends ReceiverAdapter { 223 String name; 224 final List<Pair<Address,Integer >> msgs=new LinkedList(); 225 AtomicInteger count=new AtomicInteger (0); 226 227 public MyReceiver(String name) { 228 this.name=name; 229 } 230 231 public void receive(Message msg) { 232 Util.sleep(SLEEPTIME); 233 Pair pair=new Pair<Address,Integer >(msg.getSrc(), (Integer )msg.getObject()); 234 synchronized(msgs) { 236 msgs.add(pair); 237 } 238 if(count.incrementAndGet() >= EXPECTED) { 239 try { 240 barrier.await(); 241 } 242 catch(Exception e) { 243 e.printStackTrace(); 244 } 245 } 246 } 247 248 public List getMessages() {return msgs;} 249 250 public String getName() { 251 return name; 252 } 253 } 254 255 256 public static void main(String [] args) { 257 String [] testCaseName={ConcurrentStackTest.class.getName()}; 258 junit.textui.TestRunner.main(testCaseName); 259 } 260 261 } 262 | Popular Tags |