1 package org.jgroups.tests; 2 3 import java.io.Serializable ; 4 import java.util.ArrayList ; 5 import java.util.Iterator ; 6 import java.util.List ; 7 import java.util.Random ; 8 9 import junit.framework.Test; 10 import junit.framework.TestCase; 11 import junit.framework.TestSuite; 12 13 import org.jgroups.Address; 14 import org.jgroups.BlockEvent; 15 import org.jgroups.ChannelClosedException; 16 import org.jgroups.ChannelNotConnectedException; 17 import org.jgroups.JChannel; 18 import org.jgroups.Message; 19 import org.jgroups.TimeoutException; 20 import org.jgroups.View; 21 import org.jgroups.ViewId; 22 import org.jgroups.util.Util; 23 24 46 public class VirtualSynchronyTest extends TestCase { 47 48 private final static String CHANNEL_PROPS="flush-udp.xml"; 49 private final static int INITIAL_NUMBER_OF_MEMBERS=5; 50 private int runningTime = 1000*50; private Random r = new Random (); 52 53 54 public VirtualSynchronyTest(String arg0) { 55 super(arg0); 56 } 57 58 public void testVSynch() throws Exception 59 { 60 long start = System.currentTimeMillis(); 61 boolean running=true; 62 List members=new ArrayList (); 63 64 for(int i =0;i<INITIAL_NUMBER_OF_MEMBERS;i++) 66 { 67 GroupMemberThread member = new GroupMemberThread("Member"); 68 member.start(); 69 members.add(member); 70 Util.sleep(getRandomDelayInSeconds(4,6)*1000); 71 } 72 73 74 for (; running;) { 75 76 if(r.nextBoolean()) 78 { 79 Util.sleep(getRandomDelayInSeconds(3,8)*1000); 80 GroupMemberThread member = new GroupMemberThread("Member"); 81 member.start(); 82 members.add(member); 83 } 84 else if(members.size()>1) 85 { 86 Util.sleep(getRandomDelayInSeconds(3,8)*1000); 87 GroupMemberThread unluckyBastard = (GroupMemberThread) members.get(r.nextInt(members.size())); 88 members.remove(unluckyBastard); 89 unluckyBastard.setRunning(false); 90 } 91 running =System.currentTimeMillis() - start <= runningTime; 92 System.out.println("Running time " + ((System.currentTimeMillis()-start)/1000) + " secs"); 93 } 94 System.out.println("Done, Virtual Synchrony satisfied in all tests "); 95 } 96 97 protected int getRandomDelayInSeconds(int from,int to) 98 { 99 return from + r.nextInt(to-from); 100 } 101 102 protected void setUp() throws Exception { 103 super.setUp(); 104 } 105 106 protected void tearDown() throws Exception { 107 super.tearDown(); 108 } 109 110 public static Test suite() { 111 return new TestSuite(VirtualSynchronyTest.class); 112 } 113 114 public static void main(String [] args) { 115 String [] testCaseName={VirtualSynchronyTest.class.getName()}; 116 junit.textui.TestRunner.main(testCaseName); 117 } 118 119 private static class GroupMemberThread extends Thread { 120 JChannel ch = null; 121 int numberOfMessagesInView = 0; 122 View currentView; 123 View prevView; 124 List payloads; 125 VSynchPayload payload; 126 volatile boolean running = true; 127 Random r; 128 int messagesSentPerView = 0; 129 130 public GroupMemberThread(String name) { 131 super(name); 132 payloads = new ArrayList (); 133 r = new Random (); 134 messagesSentPerView = r.nextInt(25); 135 } 136 137 public String getAddress() { 138 if(ch!=null && ch.isConnected()) 139 { 140 return ch.getLocalAddress().toString(); 141 } 142 else 143 { 144 return "disconnected " + getName(); 145 } 146 } 147 148 public void setRunning(boolean b) { 149 running=false; 150 System.out.println("Disconnect " + getAddress()); 151 if(ch!=null)ch.close(); 152 } 153 154 public void run() { 155 try { 156 ch = new JChannel(CHANNEL_PROPS); 157 ch.connect("vsynchtest"); 158 } catch (Exception e) { 159 e.printStackTrace(); 160 } 161 162 while (running) { 163 Object msgReceived = null; 164 try { 165 msgReceived = ch.receive(0); 166 if (!running) { 167 } else { 171 if (msgReceived instanceof View) { 172 gotView(msgReceived); 173 } 174 175 if (msgReceived instanceof Message) { 176 gotMessage(msgReceived); 177 } 178 179 if (msgReceived instanceof BlockEvent){ 180 ch.blockOk(); 181 } 182 } 183 184 } catch (TimeoutException e) { 185 } catch (Exception e) { 186 ch.close(); 187 running = false; 188 } 189 } 190 } 191 192 private void gotMessage(Object msgReceived) { 193 Message msg = (Message) msgReceived; 194 Object m = msg.getObject(); 195 196 if (m instanceof VSynchPayload) { 197 VSynchPayload pay = (VSynchPayload) m; 198 if (prevView != null && prevView.getVid().equals(pay.viewId)) { 199 payloads.add(pay); 200 boolean receivedAllPayloads = ((payloads.size() == prevView 201 .getMembers().size()) || (payloads.size() == currentView 202 .getMembers().size())); 203 if (receivedAllPayloads) { 204 VSynchPayload first=(VSynchPayload) payloads.get(0); 205 for (Iterator i = payloads.listIterator(1); i.hasNext();) { 206 VSynchPayload p = (VSynchPayload) i.next(); 207 assertEquals("Member " + p + " and " + first 208 + " failed VS", first.msgViewCount, 209 p.msgViewCount); 210 } 211 System.out.println("VS ok, all " + payloads.size() 212 + " members in " + prevView.getVid() 213 + " view have received " + first.msgViewCount 214 + " messages.\nAll messages sent in " 215 + prevView.getVid() + " were delivered in " 216 + prevView.getVid()); 217 } 218 } 219 } else if (m instanceof String ) { 220 assertEquals("Member " + ch.getLocalAddress() 221 + " received message from the wrong view. Message sender was " 222 + msg.getSrc(), currentView.getVid().getId(), Long.parseLong((String ) m)); 223 numberOfMessagesInView++; 224 } 225 } 226 227 private void gotView(Object msg) throws ChannelNotConnectedException, ChannelClosedException { 228 View tmpView = (View) msg; 229 if (currentView != null) { 230 payload = new VSynchPayload(currentView.getVid(), 231 numberOfMessagesInView,ch.getLocalAddress()); 232 ch.send((Address)tmpView.getMembers().get(0), null, payload); 233 } 234 numberOfMessagesInView = 0; 235 payloads.clear(); 236 prevView = currentView; 237 currentView = tmpView; 238 for (int i = 0; i < messagesSentPerView; i++) { 240 ch.send(null, null, Long.toString(currentView.getVid().getId())); 241 } 242 } 243 } 244 245 private static class VSynchPayload implements Serializable { 246 public ViewId viewId; 247 248 public int msgViewCount; 249 250 public Address member; 251 252 public VSynchPayload(ViewId viewId, int numbreOfMessagesInView,Address a) { 253 super(); 254 this.viewId = viewId; 255 this.msgViewCount = numbreOfMessagesInView; 256 this.member=a; 257 } 258 259 public String toString() { 260 return "[member=" +member + ",viewId=" + viewId.getId() + ",msgCount=" + msgViewCount 261 + "]"; 262 } 263 264 } 265 } 266 | Popular Tags |