KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > tests > VirtualSynchronyTest


1 package org.jgroups.tests;
2
3 import java.io.Serializable JavaDoc;
4 import java.util.ArrayList JavaDoc;
5 import java.util.Iterator JavaDoc;
6 import java.util.List JavaDoc;
7 import java.util.Random JavaDoc;
8
9 import junit.framework.Test;
10 import junit.framework.TestCase;
11 import junit.framework.TestSuite;
12
13 import org.jgroups.Address;
14 import org.jgroups.BlockEvent;
15 import org.jgroups.ChannelClosedException;
16 import org.jgroups.ChannelNotConnectedException;
17 import org.jgroups.JChannel;
18 import org.jgroups.Message;
19 import org.jgroups.TimeoutException;
20 import org.jgroups.View;
21 import org.jgroups.ViewId;
22 import org.jgroups.util.Util;
23
24 /**
25  * Virtual Synchrony guarantees that views in a group are observed
26  * in the same order by all group members and that views are totally ordered
27  * with respect to all regular messages in a group.
28  *
29  * Therefore, in virtually synchronous group communication model, all members
30  * that observe the same two consecutive views, receive the same set of regular
31  * multicast messages between those two views.
32  *
33  * VirtualSynchronyTest verifies virtual synchrony as follows. Each surviving member P
34  * from a view Vm that receives a new view Vn sends a message M to group coordinator Q
35  * containing number of messages received in view Vm. Each member P upon receiving
36  * a new view sends a random number of messages to everyone in the group.
37  *
38  * Group coordinator Q upon receiving each message M from a member P verifies
39  * if virtual synchrony is satisifed.
40  *
41  *
42  * @author Vladimir Blagojevic
43  * @version $Id$
44  *
45  */

46 public class VirtualSynchronyTest extends TestCase {
47
48     private final static String JavaDoc CHANNEL_PROPS="flush-udp.xml";
49     private final static int INITIAL_NUMBER_OF_MEMBERS=5;
50     private int runningTime = 1000*50; // 50 secs
51
private Random JavaDoc r = new Random JavaDoc();
52     
53     
54     public VirtualSynchronyTest(String JavaDoc arg0) {
55         super(arg0);
56     }
57     
58     public void testVSynch() throws Exception JavaDoc
59     {
60         long start = System.currentTimeMillis();
61         boolean running=true;
62         List JavaDoc members=new ArrayList JavaDoc();
63         
64         //first spawn and join
65
for(int i =0;i<INITIAL_NUMBER_OF_MEMBERS;i++)
66         {
67             GroupMemberThread member = new GroupMemberThread("Member");
68             member.start();
69             members.add(member);
70             Util.sleep(getRandomDelayInSeconds(4,6)*1000);
71         }
72         
73         
74         for (; running;) {
75             
76             //and then flip a coin
77
if(r.nextBoolean())
78             {
79                 Util.sleep(getRandomDelayInSeconds(3,8)*1000);
80                 GroupMemberThread member = new GroupMemberThread("Member");
81                 member.start();
82                 members.add(member);
83             }
84             else if(members.size()>1)
85             {
86                 Util.sleep(getRandomDelayInSeconds(3,8)*1000);
87                 GroupMemberThread unluckyBastard = (GroupMemberThread) members.get(r.nextInt(members.size()));
88                 members.remove(unluckyBastard);
89                 unluckyBastard.setRunning(false);
90             }
91             running =System.currentTimeMillis() - start <= runningTime;
92             System.out.println("Running time " + ((System.currentTimeMillis()-start)/1000) + " secs");
93         }
94         System.out.println("Done, Virtual Synchrony satisfied in all tests ");
95     }
96     
97     protected int getRandomDelayInSeconds(int from,int to)
98     {
99         return from + r.nextInt(to-from);
100     }
101
102     protected void setUp() throws Exception JavaDoc {
103         super.setUp();
104     }
105
106     protected void tearDown() throws Exception JavaDoc {
107         super.tearDown();
108     }
109     
110     public static Test suite() {
111           return new TestSuite(VirtualSynchronyTest.class);
112     }
113      
114     public static void main(String JavaDoc[] args) {
115          String JavaDoc[] testCaseName={VirtualSynchronyTest.class.getName()};
116          junit.textui.TestRunner.main(testCaseName);
117     }
118     
119     private static class GroupMemberThread extends Thread JavaDoc {
120         JChannel ch = null;
121         int numberOfMessagesInView = 0;
122         View currentView;
123         View prevView;
124         List JavaDoc payloads;
125         VSynchPayload payload;
126         volatile boolean running = true;
127         Random JavaDoc r;
128         int messagesSentPerView = 0;
129
130         public GroupMemberThread(String JavaDoc name) {
131             super(name);
132             payloads = new ArrayList JavaDoc();
133             r = new Random JavaDoc();
134             messagesSentPerView = r.nextInt(25);
135         }
136
137         public String JavaDoc getAddress() {
138             if(ch!=null && ch.isConnected())
139             {
140                 return ch.getLocalAddress().toString();
141             }
142             else
143             {
144                 return "disconnected " + getName();
145             }
146         }
147
148         public void setRunning(boolean b) {
149             running=false;
150             System.out.println("Disconnect " + getAddress());
151             if(ch!=null)ch.close();
152         }
153
154         public void run() {
155             try {
156                 ch = new JChannel(CHANNEL_PROPS);
157                 ch.connect("vsynchtest");
158             } catch (Exception JavaDoc e) {
159                 e.printStackTrace();
160             }
161             
162             while (running) {
163                 Object JavaDoc msgReceived = null;
164                 try {
165                     msgReceived = ch.receive(0);
166                     if (!running) {
167                         // I am not a group member anymore so
168
// I will discard any transient message I
169
// receive
170
} else {
171                         if (msgReceived instanceof View) {
172                             gotView(msgReceived);
173                         }
174
175                         if (msgReceived instanceof Message) {
176                             gotMessage(msgReceived);
177                         }
178                         
179                         if (msgReceived instanceof BlockEvent){
180                            ch.blockOk();
181                         }
182                     }
183
184                 } catch (TimeoutException e) {
185                 } catch (Exception JavaDoc e) {
186                     ch.close();
187                     running = false;
188                 }
189             }
190         }
191
192         private void gotMessage(Object JavaDoc msgReceived) {
193             Message msg = (Message) msgReceived;
194             Object JavaDoc m = msg.getObject();
195
196             if (m instanceof VSynchPayload) {
197                 VSynchPayload pay = (VSynchPayload) m;
198                 if (prevView != null && prevView.getVid().equals(pay.viewId)) {
199                     payloads.add(pay);
200                     boolean receivedAllPayloads = ((payloads.size() == prevView
201                             .getMembers().size()) || (payloads.size() == currentView
202                             .getMembers().size()));
203                     if (receivedAllPayloads) {
204                         VSynchPayload first=(VSynchPayload) payloads.get(0);
205                         for (Iterator JavaDoc i = payloads.listIterator(1); i.hasNext();) {
206                             VSynchPayload p = (VSynchPayload) i.next();
207                             assertEquals("Member " + p + " and " + first
208                                     + " failed VS", first.msgViewCount,
209                                     p.msgViewCount);
210                         }
211                         System.out.println("VS ok, all " + payloads.size()
212                                 + " members in " + prevView.getVid()
213                                 + " view have received " + first.msgViewCount
214                                 + " messages.\nAll messages sent in "
215                                 + prevView.getVid() + " were delivered in "
216                                 + prevView.getVid());
217                     }
218                 }
219             } else if (m instanceof String JavaDoc) {
220                 assertEquals("Member " + ch.getLocalAddress()
221                             + " received message from the wrong view. Message sender was "
222                             + msg.getSrc(), currentView.getVid().getId(), Long.parseLong((String JavaDoc) m));
223                 numberOfMessagesInView++;
224             }
225         }
226
227         private void gotView(Object JavaDoc msg) throws ChannelNotConnectedException, ChannelClosedException {
228             View tmpView = (View) msg;
229             if (currentView != null) {
230                 payload = new VSynchPayload(currentView.getVid(),
231                         numberOfMessagesInView,ch.getLocalAddress());
232                 ch.send((Address)tmpView.getMembers().get(0), null, payload);
233             }
234             numberOfMessagesInView = 0;
235             payloads.clear();
236             prevView = currentView;
237             currentView = tmpView;
238             // send our allotment of messages
239
for (int i = 0; i < messagesSentPerView; i++) {
240                 ch.send(null, null, Long.toString(currentView.getVid().getId()));
241             }
242         }
243     }
244
245     private static class VSynchPayload implements Serializable JavaDoc {
246         public ViewId viewId;
247
248         public int msgViewCount;
249
250         public Address member;
251
252         public VSynchPayload(ViewId viewId, int numbreOfMessagesInView,Address a) {
253             super();
254             this.viewId = viewId;
255             this.msgViewCount = numbreOfMessagesInView;
256             this.member=a;
257         }
258
259         public String JavaDoc toString() {
260             return "[member=" +member + ",viewId=" + viewId.getId() + ",msgCount=" + msgViewCount
261                     + "]";
262         }
263
264     }
265 }
266
Popular Tags