KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > tests > ReconciliationTest


1 package org.jgroups.tests;
2
3 import java.io.IOException JavaDoc;
4 import java.io.InputStream JavaDoc;
5 import java.io.ObjectInputStream JavaDoc;
6 import java.io.ObjectOutputStream JavaDoc;
7 import java.io.OutputStream JavaDoc;
8 import java.util.ArrayList JavaDoc;
9 import java.util.HashMap JavaDoc;
10 import java.util.List JavaDoc;
11 import java.util.Map JavaDoc;
12 import java.util.Properties JavaDoc;
13
14 import junit.framework.Test;
15 import junit.framework.TestSuite;
16
17 import org.jgroups.Address;
18 import org.jgroups.Channel;
19 import org.jgroups.ChannelException;
20 import org.jgroups.Event;
21 import org.jgroups.ExtendedReceiverAdapter;
22 import org.jgroups.JChannel;
23 import org.jgroups.Message;
24 import org.jgroups.View;
25 import org.jgroups.protocols.DISCARD;
26 import org.jgroups.stack.Protocol;
27 import org.jgroups.stack.ProtocolStack;
28 import org.jgroups.util.Util;
29
/**
 * Tests message reconciliation performed by the FLUSH protocol. Requires
 * flush-udp.xml to be present in ./conf and configured to use FLUSH.
 *
 * @author Bela Ban
 * @version $Id: ReconciliationTest.java,v 1.2 2007/07/06 19:24:22 vlada Exp $
 */

37 public class ReconciliationTest extends ChannelTestBase {
38
39     private JChannel c1, c2;
40     
41     private List JavaDoc<JChannel> channels;
42     private List JavaDoc<MyReceiver> receivers;
43
44     public ReconciliationTest(){
45         super();
46     }
47
48     public ReconciliationTest(String JavaDoc name){
49         super(name);
50     }
51
52     public void setUp() throws Exception JavaDoc {
53         super.setUp();
54         CHANNEL_CONFIG = System.getProperty("channel.conf.flush", "flush-udp.xml");
55     }
56
57     public void tearDown() throws Exception JavaDoc {
58         if(channels != null){
59         for(JChannel channel:channels){
60             channel.close();
61         }}
62         
63         Util.sleep(500);
64         super.tearDown();
65     }
66
67     public boolean useBlocking() {
68         return true;
69     }
70
71     /**
72      * Test scenario:
73      * <ul>
74      * <li>3 members: A,B,C
75      * <li>All members have DISCARD which does <em>not</em> discard any
76      * messages !
77      * <li>B (in DISCARD) ignores all messages from C
78      * <li>C multicasts 5 messages to the cluster, A and C receive them
79      * <li>New member D joins
80      * <li>Before installing view {A,B,C,D}, FLUSH updates B with all of C's 5
81      * messages
82      * </ul>
83      */

84     public void testReconciliationFlushTriggeredByNewMemberJoin() throws Exception JavaDoc {
85         
86         FlushTrigger t = new FlushTrigger() {
87             public void triggerFlush() {
88                 log.info("Joining D, this will trigger FLUSH and a subsequent view change to {A,B,C,D}");
89                 JChannel newChannel;
90                 try{
91                     newChannel = createChannel();
92                     newChannel.connect("x");
93                     channels.add(newChannel);
94                 }catch(ChannelException e){
95                     e.printStackTrace();
96                 }
97             };
98         };
99         String JavaDoc apps [] = createApplicationNames(3);
100         reconciliationHelper(apps,t);
101     }
102
103     /**
104      * Test scenario:
105      * <ul>
106      * <li>3 members: A,B,C
107      * <li>All members have DISCARD which does <em>not</em> discard any
108      * messages !
109      * <li>B (in DISCARD) ignores all messages from C
110      * <li>C multicasts 5 messages to the cluster, A and C receive them
111      * <li>A then runs a manual flush by calling Channel.start/stopFlush()
112      * <li>Before installing view {A,B}, FLUSH makes A sends its 5 messages
113      * received from C to B
114      * </ul>
115      */

116     public void testReconciliationFlushTriggeredByManualFlush() throws Exception JavaDoc {
117         
118         FlushTrigger t = new FlushTrigger() {
119             public void triggerFlush() {
120                 JChannel channel = channels.get(0);
121                 boolean rc = channel.startFlush(0, false);
122                 log.info("manual flush success="+rc);
123                 channel.stopFlush();
124             };
125         };
126         String JavaDoc apps [] = createApplicationNames(3);
127         reconciliationHelper(apps,t);
128     }
129
130     /**
131      * Test scenario:
132      * <ul>
133      * <li>3 members: A,B,C
134      * <li>All members have DISCARD which does <em>not</em> discard any
135      * messages !
136      * <li>B (in DISCARD) ignores all messages from C
137      * <li>C multicasts 5 messages to the cluster, A and C receive them
138      * <li>C then 'crashes' (Channel.shutdown())
139      * <li>Before installing view {A,B}, FLUSH makes A sends its 5 messages
140      * received from C to B
141      * </ul>
142      */

143     public void testReconciliationFlushTriggeredByMemberCrashing() throws Exception JavaDoc {
144         
145         FlushTrigger t = new FlushTrigger() {
146             public void triggerFlush() {
147                 JChannel channel = channels.remove(channels.size()-1);
148                 channel.shutdown();
149             };
150         };
151         String JavaDoc apps [] = createApplicationNames(3);
152         reconciliationHelper(apps,t);
153     }
154     
155     public void reconciliationHelper(String JavaDoc [] names,FlushTrigger ft) throws Exception JavaDoc {
156         
157         //create channels and setup receivers
158
int channelCount = names.length;
159         channels = new ArrayList JavaDoc<JChannel>(names.length);
160         receivers = new ArrayList JavaDoc<MyReceiver>(names.length);
161         for(int i = 0;i < channelCount;i++){
162             JChannel channel = createChannel();
163             MyReceiver r = new MyReceiver(channel,names[i]);
164             receivers.add(r);
165             channels.add(channel);
166             channel.setReceiver(r);
167             channel.connect("x");
168             Util.sleep(250);
169         }
170         JChannel last = channels.get(channels.size()-1);
171         JChannel nextToLast = channels.get(channels.size()-2);
172         
173         insertDISCARD(nextToLast, last.getLocalAddress());
174
175         String JavaDoc lastsName = names[names.length-1];
176         String JavaDoc nextToLastName = names[names.length-2];
177         printDigests(channels,"\nDigests before " + lastsName +" sends any messages:");
178         
179
180         // now last sends 5 messages:
181
log.info("\n" + lastsName + " sending 5 messages;" + nextToLastName + " will ignore them, but others will receive them");
182         for(int i = 1;i <= 5;i++){
183             last.send(null, null, new Integer JavaDoc(i));
184         }
185         Util.sleep(1000); // until al messages have been received, this is
186
// asynchronous so we need to wait a bit
187

188         printDigests(channels,"\nDigests after " + lastsName +" sent messages:");
189
190         
191         MyReceiver lastReceiver = receivers.get(receivers.size()-1);
192         MyReceiver nextToLastReceiver = receivers.get(receivers.size()-2);
193         
194         // check last (must have received its own messages)
195
Map JavaDoc<Address, List JavaDoc<Integer JavaDoc>> map = lastReceiver.getMsgs();
196         assertEquals("we should have only 1 sender, namely C at this time", 1, map.size());
197         List JavaDoc<Integer JavaDoc> list = map.get(last.getLocalAddress());
198         log.info(lastsName + ": messages received from "+ lastsName + ",list=" +list);
199         assertEquals("correct msgs: " + list, 5, list.size());
200         
201         //check nextToLast (should have received none of last messages)
202
map = nextToLastReceiver.getMsgs();
203         assertEquals("we should have no sender at this time", 0, map.size());
204         list = map.get(last.getLocalAddress());
205         log.info(nextToLastName+": messages received from "+lastsName +" : " + list);
206         assertNull(list);
207
208         List JavaDoc<MyReceiver> otherReceivers = receivers.subList(0, receivers.size()-2);
209         
210         //check other (should have received last's messages)
211
for(MyReceiver receiver:otherReceivers){
212             map = receiver.getMsgs();
213             assertEquals("we should have only 1 sender", 1, map.size());
214             list = map.get(last.getLocalAddress());
215             log.info(receiver.name +" messages received from "+lastsName +":" + list);
216             assertEquals("correct msgs" + list, 5, list.size());
217         }
218     
219
220         removeDISCARD(nextToLast);
221
222         Address address = last.getLocalAddress();
223         ft.triggerFlush();
224         
225         int cnt = 1000;
226         View v;
227         while((v = channels.get(0).getView()) != null && cnt > 0){
228             cnt--;
229             if(v.size() == channels.size())
230                 break;
231             Util.sleep(500);
232         }
233         
234         printDigests(channels,"");
235
236         // check that member with discard (should have received all missing messages
237
map = nextToLastReceiver.getMsgs();
238         assertEquals("we should have 1 sender at this time", 1, map.size());
239         list = map.get(address);
240         log.info(nextToLastName+": messages received from "+lastsName+" : " + list);
241         assertEquals(5, list.size());
242     }
243     
244     private void printDigests(List JavaDoc<JChannel> channels,String JavaDoc message) {
245         log.info(message);
246         for(JChannel channel:channels){
247             log.info(channel.downcall(Event.GET_DIGEST_EVT));
248         }
249     }
250
251     private static void insertDISCARD(JChannel ch, Address exclude) throws Exception JavaDoc {
252         Properties JavaDoc prop = new Properties JavaDoc();
253         prop.setProperty("excludeitself", "true"); // don't discard messages to
254
// self
255
DISCARD discard = new DISCARD();
256         discard.setProperties(prop);
257         discard.addIgnoreMember(exclude); // ignore messages from this member
258
ch.getProtocolStack().insertProtocol(discard, ProtocolStack.BELOW, "NAKACK");
259     }
260
261     private static void removeDISCARD(JChannel... channels) throws Exception JavaDoc {
262         for(JChannel ch:channels){
263             ch.getProtocolStack().removeProtocol("DISCARD");
264         }
265     }
266     
267     private interface FlushTrigger {
268         void triggerFlush();
269     }
270
271     private static class MyReceiver extends ExtendedReceiverAdapter {
272         Map JavaDoc<Address, List JavaDoc<Integer JavaDoc>> msgs = new HashMap JavaDoc<Address, List JavaDoc<Integer JavaDoc>>(10);
273
274         Channel channel;
275
276         String JavaDoc name;
277
278         public MyReceiver(Channel ch,String JavaDoc name){
279             this.channel = ch;
280             this.name = name;
281         }
282
283         public Map JavaDoc<Address, List JavaDoc<Integer JavaDoc>> getMsgs() {
284             return msgs;
285         }
286
287         public void reset() {
288             msgs.clear();
289         }
290
291         public void receive(Message msg) {
292             List JavaDoc<Integer JavaDoc> list = msgs.get(msg.getSrc());
293             if(list == null){
294                 list = new ArrayList JavaDoc<Integer JavaDoc>();
295                 msgs.put(msg.getSrc(), list);
296             }
297             list.add((Integer JavaDoc) msg.getObject());
298             System.out.println("[" + name
299                                 + " / "
300                                 + channel.getLocalAddress()
301                                 + "]: received message from "
302                                 + msg.getSrc()
303                                 + ": "
304                                 + msg.getObject());
305         }
306
307         public void viewAccepted(View new_view) {
308             System.out.println("[" + name + " / " + channel.getLocalAddress() + "]: " + new_view);
309         }
310     }
311
312     public void testVirtualSynchrony() throws Exception JavaDoc {
313         c1 = createChannel(CHANNEL_CONFIG);
314         Cache cache_1 = new Cache(c1, "cache-1");
315         c1.connect("bla");
316
317         c2 = createChannel(CHANNEL_CONFIG);
318         Cache cache_2 = new Cache(c2, "cache-2");
319         c2.connect("bla");
320         assertEquals("view: " + c1.getView(), 2, c2.getView().size());
321
322         // start adding messages
323
flush(c1, 5000); // flush all pending message out of the system so
324
// everyone receives them
325

326         for(int i = 1;i <= 20;i++){
327             if(i % 2 == 0){
328                 cache_1.put("key-" + i, Boolean.TRUE); // even numbers
329
}else{
330                 cache_2.put("key-" + i, Boolean.TRUE); // odd numbers
331
}
332         }
333
334         flush(c1, 5000);
335         System.out.println("cache_1 (" + cache_1.size()
336                             + " elements): "
337                             + cache_1
338                             + "\ncache_2 ("
339                             + cache_2.size()
340                             + " elements): "
341                             + cache_2);
342         assertEquals(cache_1.size(), cache_2.size());
343         assertEquals(20, cache_1.size());
344     }
345
346     private static void flush(Channel channel, long timeout) {
347         if(channel.flushSupported()){
348             boolean success = channel.startFlush(timeout, true);
349             System.out.println("startFlush(): " + success);
350             assertTrue(success);
351         }else
352             Util.sleep(timeout);
353     }
354
355     protected JChannel createChannel() throws ChannelException {
356         JChannel ret = new JChannel(CHANNEL_CONFIG);
357         ret.setOpt(Channel.BLOCK, Boolean.TRUE);
358         Protocol flush = ret.getProtocolStack().findProtocol("FLUSH");
359         if(flush != null){
360             Properties JavaDoc p = new Properties JavaDoc();
361             p.setProperty("timeout", "0");
362             flush.setProperties(p);
363
364             // send timeout up and down the stack, so other protocols can use
365
// the same value too
366
Map JavaDoc<Object JavaDoc, Object JavaDoc> map = new HashMap JavaDoc<Object JavaDoc, Object JavaDoc>();
367             map.put("flush_timeout", new Long JavaDoc(0));
368             flush.getUpProtocol().up(new Event(Event.CONFIG, map));
369             flush.getDownProtocol().down(new Event(Event.CONFIG, map));
370         }
371         return ret;
372     }
373
374     private static class Cache extends ExtendedReceiverAdapter {
375         protected final Map JavaDoc<Object JavaDoc, Object JavaDoc> data;
376
377         Channel ch;
378
379         String JavaDoc name;
380
381         public Cache(Channel ch,String JavaDoc name){
382             this.data = new HashMap JavaDoc<Object JavaDoc, Object JavaDoc>();
383             this.ch = ch;
384             this.name = name;
385             this.ch.setReceiver(this);
386         }
387
388         protected Object JavaDoc get(Object JavaDoc key) {
389             synchronized(data){
390                 return data.get(key);
391             }
392         }
393
394         protected void put(Object JavaDoc key, Object JavaDoc val) throws Exception JavaDoc {
395             Object JavaDoc[] buf = new Object JavaDoc[2];
396             buf[0] = key;
397             buf[1] = val;
398             Message msg = new Message(null, null, buf);
399             ch.send(msg);
400         }
401
402         protected int size() {
403             synchronized(data){
404                 return data.size();
405             }
406         }
407
408         public void receive(Message msg) {
409             Object JavaDoc[] modification = (Object JavaDoc[]) msg.getObject();
410             Object JavaDoc key = modification[0];
411             Object JavaDoc val = modification[1];
412             synchronized(data){
413                 // System.out.println("****** [" + name + "] received PUT(" +
414
// key + ", " + val + ") " + " from " + msg.getSrc() + "
415
// *******");
416
data.put(key, val);
417             }
418         }
419
420         public byte[] getState() {
421             byte[] state = null;
422             synchronized(data){
423                 try{
424                     state = Util.objectToByteBuffer(data);
425                 }catch(Exception JavaDoc e){
426                     e.printStackTrace();
427                     return null;
428                 }
429             }
430             return state;
431         }
432
433         public byte[] getState(String JavaDoc state_id) {
434             return getState();
435         }
436
437         public void setState(byte[] state) {
438             Map JavaDoc<Object JavaDoc, Object JavaDoc> m;
439             try{
440                 m = (Map JavaDoc<Object JavaDoc, Object JavaDoc>) Util.objectFromByteBuffer(state);
441                 synchronized(data){
442                     data.clear();
443                     data.putAll(m);
444                 }
445             }catch(Exception JavaDoc e){
446                 e.printStackTrace();
447             }
448         }
449
450         public void setState(String JavaDoc state_id, byte[] state) {
451             setState(state);
452         }
453
454         public void getState(OutputStream JavaDoc ostream) {
455             ObjectOutputStream JavaDoc oos = null;
456             try{
457                 oos = new ObjectOutputStream JavaDoc(ostream);
458                 synchronized(data){
459                     oos.writeObject(data);
460                 }
461                 oos.flush();
462             }catch(IOException JavaDoc e){
463             }finally{
464                 try{
465                     if(oos != null)
466                         oos.close();
467                 }catch(IOException JavaDoc e){
468                     System.err.println(e);
469                 }
470             }
471         }
472
473         public void getState(String JavaDoc state_id, OutputStream JavaDoc ostream) {
474             getState(ostream);
475         }
476
477         public void setState(InputStream JavaDoc istream) {
478             ObjectInputStream JavaDoc ois = null;
479             try{
480                 ois = new ObjectInputStream JavaDoc(istream);
481                 Map JavaDoc<Object JavaDoc, Object JavaDoc> m = (Map JavaDoc<Object JavaDoc, Object JavaDoc>) ois.readObject();
482                 synchronized(data){
483                     data.clear();
484                     data.putAll(m);
485                 }
486
487             }catch(Exception JavaDoc e){
488             }finally{
489                 try{
490                     if(ois != null)
491                         ois.close();
492                 }catch(IOException JavaDoc e){
493                     System.err.println(e);
494                 }
495             }
496         }
497
498         public void setState(String JavaDoc state_id, InputStream JavaDoc istream) {
499             setState(istream);
500         }
501
502         public void clear() {
503             synchronized(data){
504                 data.clear();
505             }
506         }
507
508         public void viewAccepted(View new_view) {
509             log("view is " + new_view);
510         }
511
512         public String JavaDoc toString() {
513             synchronized(data){
514                 return data.toString();
515             }
516         }
517
518         private void log(String JavaDoc msg) {
519             System.out.println("-- [" + name + "] " + msg);
520         }
521
522     }
523
524     public static Test suite() {
525         return new TestSuite(ReconciliationTest.class);
526     }
527
528     public static void main(String JavaDoc[] args) {
529         junit.textui.TestRunner.run(ReconciliationTest.suite());
530     }
531 }
532
Popular Tags