KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > tests > MultiplexerTest


1 package org.jgroups.tests;
2
3 import junit.framework.Test;
4 import junit.framework.TestSuite;
5 import org.jgroups.*;
6 import org.jgroups.mux.MuxChannel;
7 import org.jgroups.stack.IpAddress;
8 import org.jgroups.stack.ProtocolStack;
9 import org.jgroups.stack.Protocol;
10 import org.jgroups.util.Util;
11
12 import java.util.*;
13 import java.io.*;
14
15 /**
16  * Test the multiplexer functionality provided by JChannelFactory
17  * @author Bela Ban
18  * @version $Id: MultiplexerTest.java,v 1.44 2007/06/29 11:02:36 belaban Exp $
19  */

20 public class MultiplexerTest extends ChannelTestBase {
21     private Cache c1, c2, c1_repl, c2_repl;
22     private Channel ch1, ch2, ch1_repl, ch2_repl;
23     JChannelFactory factory, factory2;
24
25     public MultiplexerTest(String JavaDoc name) {
26         super(name);
27     }
28
29
30     public void setUp() throws Exception JavaDoc {
31         super.setUp();
32         factory=new JChannelFactory();
33         factory.setMultiplexerConfig(MUX_CHANNEL_CONFIG);
34
35         factory2=new JChannelFactory();
36         factory2.setMultiplexerConfig(MUX_CHANNEL_CONFIG);
37     }
38
39     public void tearDown() throws Exception JavaDoc {
40         if(ch1_repl != null)
41             ch1_repl.close();
42         if(ch2_repl != null)
43             ch2_repl.close();
44         if(ch1 != null)
45             ch1.close();
46         if(ch2 != null)
47             ch2.close();
48         if(ch1 != null) {
49             assertFalse(((MuxChannel)ch1).getChannel().isOpen());
50             assertFalse(((MuxChannel)ch1).getChannel().isConnected());
51         }
52         if(ch2 != null) {
53             assertFalse(((MuxChannel)ch2).getChannel().isOpen());
54             assertFalse(((MuxChannel)ch2).getChannel().isConnected());
55         }
56         if(ch1_repl != null) {
57             assertFalse(((MuxChannel)ch1_repl).getChannel().isOpen());
58             assertFalse(((MuxChannel)ch1_repl).getChannel().isConnected());
59         }
60         if(ch2_repl != null) {
61             assertFalse(((MuxChannel)ch2_repl).getChannel().isOpen());
62             assertFalse(((MuxChannel)ch2_repl).getChannel().isConnected());
63         }
64
65         if(c1 != null) c1.clear();
66         if(c2 != null) c2.clear();
67         if(c1_repl != null) c1_repl.clear();
68         if(c2_repl != null) c2_repl.clear();
69
70         ch1_repl=ch2_repl=ch1=ch2=null;
71         c1=c2=c1_repl=c2_repl=null;
72         
73         super.tearDown();
74     }
75
76
77     public void testReplicationWithOneChannel() throws Exception JavaDoc {
78         ch1=factory.createMultiplexerChannel(MUX_CHANNEL_CONFIG_STACK_NAME, "c1");
79         ch1.connect("bla");
80         c1=new Cache(ch1, "cache-1");
81         assertEquals("cache has to be empty initially", 0, c1.size());
82         c1.put("name", "Bela");
83         Util.sleep(300); // we need to wait because replication is asynchronous here
84
assertEquals(1, c1.size());
85         assertEquals("Bela", c1.get("name"));
86     }
87
88
89     public void testLifecycle() throws Exception JavaDoc {
90         ch1=factory.createMultiplexerChannel(MUX_CHANNEL_CONFIG_STACK_NAME, "c1");
91         assertTrue(ch1.isOpen());
92         assertFalse(ch1.isConnected());
93
94         ch1.connect("bla");
95         assertTrue(ch1.isOpen());
96         assertTrue(ch1.isConnected());
97
98         ch2=factory.createMultiplexerChannel(MUX_CHANNEL_CONFIG_STACK_NAME, "c2");
99         assertTrue(ch2.isOpen());
100         assertFalse(ch2.isConnected());
101
102         ch2.connect("bla");
103         assertTrue(ch2.isOpen());
104         assertTrue(ch2.isConnected());
105
106         ch2.disconnect();
107         assertTrue(ch2.isOpen());
108         assertFalse(ch2.isConnected());
109
110         ch2.connect("bla");
111         assertTrue(ch2.isOpen());
112         assertTrue(ch2.isConnected());
113
114         ch2.disconnect();
115         assertTrue(ch2.isOpen());
116         assertFalse(ch2.isConnected());
117
118         ch2.close();
119         assertFalse(ch2.isOpen());
120         assertFalse(ch2.isConnected());
121
122         ch2=factory.createMultiplexerChannel(MUX_CHANNEL_CONFIG_STACK_NAME, "c2");
123         ch2.connect("bla");
124         assertTrue(ch2.isOpen());
125         assertTrue(ch2.isConnected());
126
127         ch2.close();
128         assertFalse(ch2.isOpen());
129         assertFalse(ch2.isConnected());
130     }
131
132
133     public void testDisconnect() throws Exception JavaDoc {
134         ch1=factory.createMultiplexerChannel(MUX_CHANNEL_CONFIG_STACK_NAME, "c1");
135         assertTrue(ch1.isOpen());
136         assertFalse(ch1.isConnected());
137         assertTrue(((MuxChannel)ch1).getChannel().isOpen());
138         assertFalse(((MuxChannel)ch1).getChannel().isConnected());
139
140         ch1.connect("bla");
141         assertTrue(ch1.isOpen());
142         assertTrue(ch1.isConnected());
143         assertTrue(((MuxChannel)ch1).getChannel().isOpen());
144         assertTrue(((MuxChannel)ch1).getChannel().isConnected());
145
146         ch2=factory.createMultiplexerChannel(MUX_CHANNEL_CONFIG_STACK_NAME, "c2");
147         assertTrue(ch2.isOpen());
148         assertFalse(ch2.isConnected());
149
150         ch1.disconnect();
151         assertTrue(ch1.isOpen());
152         assertFalse(ch1.isConnected());
153
154         ch1.connect("bla");
155         assertTrue(ch1.isOpen());
156         assertTrue(ch1.isConnected());
157
158         ch1.close();
159         assertFalse(ch1.isOpen());
160         assertFalse(ch1.isConnected());
161         assertTrue(((MuxChannel)ch1).getChannel().isOpen());
162         assertTrue(((MuxChannel)ch1).getChannel().isConnected());
163
164         ch2.close();
165         assertFalse(ch2.isOpen());
166         assertFalse(ch2.isConnected());
167     }
168
169     public void testDisconnect2() throws Exception JavaDoc {
170         ch1=factory.createMultiplexerChannel(MUX_CHANNEL_CONFIG_STACK_NAME, "c1");
171         assertTrue(ch1.isOpen());
172         assertFalse(ch1.isConnected());
173
174         ch1.connect("bla");
175         assertTrue(ch1.isOpen());
176         assertTrue(ch1.isConnected());
177
178         ch2=factory.createMultiplexerChannel(MUX_CHANNEL_CONFIG_STACK_NAME, "c2");
179         assertTrue(ch2.isOpen());
180         assertFalse(ch2.isConnected());
181
182         ch1.disconnect();
183         assertTrue(ch1.isOpen());
184         assertFalse(ch1.isConnected());
185
186         assertTrue(ch2.isOpen());
187         assertFalse(ch2.isConnected());
188
189         ch1.connect("bla");
190         assertTrue(ch1.isOpen());
191         assertTrue(ch1.isConnected());
192
193         assertTrue(ch2.isOpen());
194         assertFalse(ch2.isConnected());
195     }
196
197
198     public void testClose() throws Exception JavaDoc {
199         ch1=factory.createMultiplexerChannel(MUX_CHANNEL_CONFIG_STACK_NAME, "c1");
200         ch1.connect("bla");
201         ch2=factory.createMultiplexerChannel(MUX_CHANNEL_CONFIG_STACK_NAME, "c2");
202         ch2.connect("bla");
203         ch1.close();
204         ch2.close();
205     }
206
207
208     public void testReplicationWithTwoChannels() throws Exception JavaDoc {
209         ch1=factory.createMultiplexerChannel(MUX_CHANNEL_CONFIG_STACK_NAME, "c1");
210         c1=new Cache(ch1, "cache-1");
211         assertEquals("cache has to be empty initially", 0, c1.size());
212         ch1.connect("bla");
213
214         ch1_repl=factory2.createMultiplexerChannel(MUX_CHANNEL_CONFIG_STACK_NAME, "c1");
215         c1_repl=new Cache(ch1_repl, "cache-1-repl");
216         assertEquals("cache has to be empty initially", 0, c1_repl.size());
217         ch1_repl.connect("bla");
218         Util.sleep(200);
219
220         View v=ch1_repl.getView();
221         assertNotNull(v);
222         assertEquals("view is " + v, 2, v.size());
223         v=ch1.getView();
224         assertNotNull(v);
225         assertEquals(2, v.size());
226
227         // System.out.println("****** [c1] PUT(name, Bela) *******");
228
c1.put("name", "Bela");
229         if(ch1.flushSupported()) {
230             boolean success=ch1.startFlush(5000, true);
231             System.out.println("startFlush(): " + success);
232             assertTrue(success);
233         }
234         else
235             Util.sleep(10000);
236
237         System.out.println("c1: " + c1 + ", c1_repl: " + c1_repl);
238
239         assertEquals(1, c1.size());
240         assertEquals("Bela", c1.get("name"));
241
242         Util.sleep(500); // async repl - wait until replicated to other member
243
assertEquals(1, c1_repl.size());
244         assertEquals("Bela", c1_repl.get("name"));
245
246         c1.put("id", new Long JavaDoc(322649));
247         c1_repl.put("hobbies", "biking");
248         c1_repl.put("bike", "Centurion");
249          if(ch1.flushSupported()) {
250              boolean success=ch1.startFlush(5000, true);
251              System.out.println("startFlush(): " + success);
252              assertTrue(success);
253          }
254         else
255             Util.sleep(10000);
256
257         System.out.println("c1: " + c1 + ", c1_repl: " + c1_repl);
258
259         assertEquals("c1: " + c1, 4, c1.size());
260         assertEquals("c1_repl: " + c1_repl, 4, c1_repl.size());
261
262         assertEquals(new Long JavaDoc(322649), c1.get("id"));
263         assertEquals(new Long JavaDoc(322649), c1_repl.get("id"));
264
265         assertEquals("biking", c1.get("hobbies"));
266         assertEquals("biking", c1_repl.get("hobbies"));
267
268         assertEquals("Centurion", c1.get("bike"));
269         assertEquals("Centurion", c1_repl.get("bike"));
270     }
271
272
273     public void testVirtualSynchrony() throws Exception JavaDoc {
274         ch1=factory.createMultiplexerChannel(MUX_CHANNEL_CONFIG_STACK_NAME, "c1");
275         c1=new Cache(ch1, "cache-1");
276         ch1.connect("bla");
277
278         ch1_repl=factory2.createMultiplexerChannel(MUX_CHANNEL_CONFIG_STACK_NAME, "c1");
279         c1_repl=new Cache(ch1_repl, "cache-1-repl");
280         ch1_repl.connect("bla");
281         assertEquals("view: " + ch1.getView(), 2, ch1.getView().size());
282
283         // start adding messages
284
flush(ch1, 5000); // flush all pending message out of the system so everyone receives them
285

286         for(int i=1; i <= 20; i++) {
287             if(i % 2 == 0) {
288                 c1.put(i, Boolean.TRUE); // even numbers
289
}
290             else {
291                 c1_repl.put(i, Boolean.TRUE); // odd numbers
292
}
293         }
294
295         flush(ch1, 5000);
296         System.out.println("c1 (" + c1.size() + " elements):\n" + c1.printKeys() +
297                 "\nc1_repl (" + c1_repl.size() + " elements):\n" + c1_repl.printKeys());
298         assertEquals(c1.size(), c1_repl.size());
299         assertEquals(20, c1.size());
300     }
301
302
303     private static void flush(Channel channel, long timeout) {
304         if(channel.flushSupported()) {
305             boolean success=channel.startFlush(timeout, true);
306             System.out.println("startFlush(): " + success);
307             assertTrue(success);
308         }
309         else
310             Util.sleep(timeout);
311     }
312
313     public void testReplicationWithReconnect() throws Exception JavaDoc {
314         ch1=factory.createMultiplexerChannel(MUX_CHANNEL_CONFIG_STACK_NAME, "c1");
315         ch1.connect("bla");
316         c1=new Cache(ch1, "cache-1");
317         assertEquals("cache has to be empty initially", 0, c1.size());
318         c1.put("name", "Bela");
319         Util.sleep(300); // we need to wait because replication is asynchronous here
320
assertEquals(1, c1.size());
321         assertEquals("Bela", c1.get("name"));
322
323         ch1.disconnect();
324
325         ch1.connect("bla");
326
327         c2=new Cache(ch1, "cache-1");
328         assertEquals("cache has to be empty initially", 0, c2.size());
329         c2.put("name", "Bela");
330         Util.sleep(300); // we need to wait because replication is asynchronous here
331
assertEquals(1, c2.size());
332         assertEquals("Bela", c2.get("name"));
333
334     }
335
336
337     public void testStateTransfer() throws Exception JavaDoc {
338         ch1=factory.createMultiplexerChannel(MUX_CHANNEL_CONFIG_STACK_NAME, "c1");
339         ch1.connect("bla");
340         c1=new Cache(ch1, "cache-1");
341         assertEquals("cache has to be empty initially", 0, c1.size());
342
343         ch1_repl=factory2.createMultiplexerChannel(MUX_CHANNEL_CONFIG_STACK_NAME, "c1");
344
345         c1.put("name", "Bela");
346         c1.put("id", new Long JavaDoc(322649));
347         c1.put("hobbies", "biking");
348         c1.put("bike", "Centurion");
349
350
351         ch1_repl.connect("bla");
352         c1_repl=new Cache(ch1_repl, "cache-1-repl");
353         boolean rc=ch1_repl.getState(null, 5000);
354         System.out.println("state transfer: " + rc);
355         Util.sleep(500);
356
357         System.out.println("c1_repl: " + c1_repl);
358         assertEquals("initial state should have been transferred", 4, c1_repl.size());
359
360         assertEquals(new Long JavaDoc(322649), c1.get("id"));
361         assertEquals(new Long JavaDoc(322649), c1_repl.get("id"));
362
363         assertEquals("biking", c1.get("hobbies"));
364         assertEquals("biking", c1_repl.get("hobbies"));
365
366         assertEquals("Centurion", c1.get("bike"));
367         assertEquals("Centurion", c1_repl.get("bike"));
368     }
369
370
371     public void testStateTransferWithTwoApplications() throws Exception JavaDoc {
372         ch1=factory.createMultiplexerChannel(MUX_CHANNEL_CONFIG_STACK_NAME, "c1");
373         ch1.connect("bla");
374         c1=new Cache(ch1, "cache-1");
375         assertEquals("cache has to be empty initially", 0, c1.size());
376
377         ch2=factory.createMultiplexerChannel(MUX_CHANNEL_CONFIG_STACK_NAME, "c2");
378         ch2.connect("bla");
379         c2=new Cache(ch2, "cache-2");
380         assertEquals("cache has to be empty initially", 0, c2.size());
381
382         ch1_repl=factory2.createMultiplexerChannel(MUX_CHANNEL_CONFIG_STACK_NAME, "c1");
383
384         ch2_repl=factory2.createMultiplexerChannel(MUX_CHANNEL_CONFIG_STACK_NAME, "c2");
385
386
387         c1.put("name", "cache-1");
388         c2.put("name", "cache-2");
389
390         ch1_repl.connect("bla");
391         c1_repl=new Cache(ch1_repl, "cache-1-repl");
392         boolean rc=ch1_repl.getState(null, 5000);
393         System.out.println("state transfer: " + rc);
394
395         ch2_repl.connect("bla");
396         c2_repl=new Cache(ch2_repl, "cache-2-repl");
397         rc=ch2_repl.getState(null, 5000);
398         System.out.println("state transfer: " + rc);
399         Util.sleep(500);
400
401         System.out.println("Caches after state transfers:");
402         System.out.println("c1: " + c1);
403         System.out.println("c1_repl: " + c1_repl);
404         System.out.println("c2: " + c2);
405         System.out.println("c2_repl: " + c2_repl);
406
407         assertEquals(1, c1.size());
408         assertEquals(1, c1_repl.size());
409
410         assertEquals(1, c2.size());
411         assertEquals(1, c2_repl.size());
412
413         assertEquals("cache-1", c1.get("name"));
414         assertEquals("cache-1", c1_repl.get("name"));
415
416         assertEquals("cache-2", c2.get("name"));
417         assertEquals("cache-2", c2_repl.get("name"));
418     }
419
420
421     public void testStateTransferWithRegistration() throws Exception JavaDoc {
422         ch1=factory.createMultiplexerChannel(MUX_CHANNEL_CONFIG_STACK_NAME, "c1");
423         ch1.connect("bla");
424         c1=new Cache(ch1, "cache-1");
425         assertEquals("cache has to be empty initially", 0, c1.size());
426
427         ch2=factory.createMultiplexerChannel(MUX_CHANNEL_CONFIG_STACK_NAME, "c2");
428         ch2.connect("bla");
429         c2=new Cache(ch2, "cache-2");
430         assertEquals("cache has to be empty initially", 0, c2.size());
431         c1.put("name", "cache-1");
432         c2.put("name", "cache-2");
433
434         ch1_repl=factory2.createMultiplexerChannel(MUX_CHANNEL_CONFIG_STACK_NAME, "c1", true, null); // register for state transfer
435
ch2_repl=factory2.createMultiplexerChannel(MUX_CHANNEL_CONFIG_STACK_NAME, "c2", true, null); // register for state transfer
436

437         ch1_repl.connect("bla");
438         c1_repl=new Cache(ch1_repl, "cache-1-repl");
439         boolean rc=ch1_repl.getState(null, 5000); // this will *not* trigger the state transfer protocol
440
System.out.println("state transfer: " + rc);
441
442         ch2_repl.connect("bla");
443         c2_repl=new Cache(ch2_repl, "cache-2-repl");
444         rc=ch2_repl.getState(null, 5000); // only *this* will trigger the state transfer
445
System.out.println("state transfer: " + rc);
446         Util.sleep(500);
447
448         System.out.println("Caches after state transfers:");
449         System.out.println("c1: " + c1);
450         System.out.println("c1_repl: " + c1_repl);
451         System.out.println("c2: " + c2);
452         System.out.println("c2_repl: " + c2_repl);
453
454         assertEquals(1, c1.size());
455         assertEquals(1, c1_repl.size());
456
457         assertEquals(1, c2.size());
458         assertEquals(1, c2_repl.size());
459
460         assertEquals("cache-1", c1.get("name"));
461         assertEquals("cache-1", c1_repl.get("name"));
462
463         assertEquals("cache-2", c2.get("name"));
464         assertEquals("cache-2", c2_repl.get("name"));
465         c1.clear();
466         c1_repl.clear();
467         c2.clear();
468         c2_repl.clear();
469     }
470
471
472     private void setCorrectPortRange(Channel ch) {
473         ProtocolStack stack=((MuxChannel)ch).getProtocolStack();
474         Protocol tcpping=stack.findProtocol("TCPPING");
475         if(tcpping == null)
476             return;
477
478         Properties props=tcpping.getProperties();
479         String JavaDoc port_range=props.getProperty("port_range");
480         if(port_range != null) {
481             System.out.println("port_range in TCPPING: " + port_range + ", setting it to 2");
482             port_range="2";
483             Properties p=new Properties();
484             // p.putAll(props);
485
p.setProperty("port_range", port_range);
486             tcpping.setProperties(p);
487         }
488     }
489
490
491     public void testStateTransferWithReconnect() throws Exception JavaDoc {
492         ch1=factory.createMultiplexerChannel(MUX_CHANNEL_CONFIG_STACK_NAME, "c1");
493         setCorrectPortRange(ch1);
494
495         assertTrue(ch1.isOpen());
496         assertFalse(ch1.isConnected());
497         ch1.connect("bla");
498         assertTrue(ch1.isOpen());
499         assertTrue(ch1.isConnected());
500         assertServiceAndClusterView(ch1, 1, 1);
501
502         c1=new Cache(ch1, "cache-1");
503         assertEquals("cache has to be empty initially", 0, c1.size());
504
505         ch1_repl=factory2.createMultiplexerChannel(MUX_CHANNEL_CONFIG_STACK_NAME, "c1");
506         setCorrectPortRange(ch1_repl);
507         assertTrue(ch1_repl.isOpen());
508         assertFalse(ch1_repl.isConnected());
509
510         c1.put("name", "Bela");
511         c1.put("id", new Long JavaDoc(322649));
512         c1.put("hobbies", "biking");
513         c1.put("bike", "Centurion");
514
515         ch1_repl.connect("bla");
516         assertTrue(ch1_repl.isOpen());
517         assertTrue(ch1_repl.isConnected());
518         assertServiceAndClusterView(ch1_repl, 2, 2);
519         Util.sleep(500);
520         assertServiceAndClusterView(ch1, 2, 2);
521
522         c1_repl=new Cache(ch1_repl, "cache-1-repl");
523         boolean rc=ch1_repl.getState(null, 5000);
524         System.out.println("state transfer: " + rc);
525         Util.sleep(500);
526
527         System.out.println("c1_repl: " + c1_repl);
528         assertEquals("initial state should have been transferred", 4, c1_repl.size());
529         assertEquals(new Long JavaDoc(322649), c1.get("id"));
530         assertEquals(new Long JavaDoc(322649), c1_repl.get("id"));
531
532         assertEquals("biking", c1.get("hobbies"));
533         assertEquals("biking", c1_repl.get("hobbies"));
534
535         assertEquals("Centurion", c1.get("bike"));
536         assertEquals("Centurion", c1_repl.get("bike"));
537
538         ch1_repl.disconnect();
539         assertTrue(ch1_repl.isOpen());
540         assertFalse(ch1_repl.isConnected());
541         Util.sleep(1000);
542         assertServiceAndClusterView(ch1, 1, 1);
543
544         c1_repl.clear();
545
546         ch1_repl.connect("bla");
547         assertTrue(ch1_repl.isOpen());
548         assertTrue(ch1_repl.isConnected());
549         assertServiceAndClusterView(ch1_repl, 2, 2);
550         Util.sleep(300);
551         assertServiceAndClusterView(ch1, 2, 2);
552
553         assertEquals("cache has to be empty initially", 0, c1_repl.size());
554
555         rc=ch1_repl.getState(null, 5000);
556         System.out.println("state transfer: " + rc);
557         Util.sleep(500);
558
559         System.out.println("c1_repl: " + c1_repl);
560         assertEquals("initial state should have been transferred", 4, c1_repl.size());
561
562         assertEquals(new Long JavaDoc(322649), c1.get("id"));
563         assertEquals(new Long JavaDoc(322649), c1_repl.get("id"));
564
565         assertEquals("biking", c1.get("hobbies"));
566         assertEquals("biking", c1_repl.get("hobbies"));
567
568         assertEquals("Centurion", c1.get("bike"));
569         assertEquals("Centurion", c1_repl.get("bike"));
570
571         // Now see what happens if we reconnect the first channel
572
// But first, add another MuxChannel on that JChannel
573
// just so it remains coordinator (test that it doesn't
574
// ask for state from itself)
575
ch2=factory.createMultiplexerChannel(MUX_CHANNEL_CONFIG_STACK_NAME, "c2");
576         setCorrectPortRange(ch2);
577         assertTrue(ch2.isOpen());
578         assertFalse(ch2.isConnected());
579         assertServiceAndClusterView(ch1, 2, 2);
580         assertServiceAndClusterView(ch1_repl, 2, 2);
581
582
583         ch1.disconnect();
584         //sleep a bit and thus let asynch VIEW to propagate to other channel
585
Util.sleep(500);
586         assertTrue(ch1.isOpen());
587         assertFalse(ch1.isConnected());
588         assertServiceAndClusterView(ch1_repl, 1, 1);
589         assertTrue(ch2.isOpen());
590         assertFalse(ch2.isConnected());
591
592         c1.clear();
593
594         ch1.connect("bla");
595         assertTrue(ch1.isOpen());
596         assertTrue(ch1.isConnected());
597         assertServiceAndClusterView(ch1, 2, 2);
598         Util.sleep(500);
599         assertServiceAndClusterView(ch1_repl, 2, 2);
600         assertTrue(ch2.isOpen());
601         assertFalse(ch2.isConnected());
602
603         assertEquals("cache has to be empty initially", 0, c1.size());
604
605         rc=ch1.getState(null, 5000);
606         System.out.println("state transfer: " + rc);
607         Util.sleep(500);
608
609         System.out.println("c1: " + c1);
610         assertEquals("initial state should have been transferred", 4, c1.size());
611
612         assertEquals(new Long JavaDoc(322649), c1.get("id"));
613         assertEquals(new Long JavaDoc(322649), c1_repl.get("id"));
614
615         assertEquals("biking", c1.get("hobbies"));
616         assertEquals("biking", c1_repl.get("hobbies"));
617
618         assertEquals("Centurion", c1.get("bike"));
619         assertEquals("Centurion", c1_repl.get("bike"));
620     }
621
622
623     private void assertServiceAndClusterView(Channel ch, int num_service_view_mbrs, int num_cluster_view_mbrs) {
624         View service_view, cluster_view;
625         service_view=ch.getView();
626         cluster_view=((MuxChannel)ch).getClusterView();
627
628         String JavaDoc msg="cluster view=" + cluster_view + ", service view=" + service_view;
629
630         assertNotNull(service_view);
631         assertNotNull(cluster_view);
632
633         assertEquals(msg, num_service_view_mbrs, service_view.size());
634         assertEquals(msg, num_cluster_view_mbrs, cluster_view.size());
635     }
636
637
638     public void testStateTransferFromSelfWithRegularChannel() throws Exception JavaDoc {
639         JChannel ch=new JChannel();
640         ch.connect("X");
641         try {
642             boolean rc=ch.getState(null, 2000);
643             assertFalse("getState() on singleton should return false", rc);
644         }
645         finally {
646             ch.close();
647         }
648     }
649
650     public void testStateTransferFromSelf() throws Exception JavaDoc {
651         ch1=factory.createMultiplexerChannel(MUX_CHANNEL_CONFIG_STACK_NAME, "c1");
652         ch1.connect("bla");
653         boolean rc=ch1.getState(null, 2000);
654         assertFalse("getState() on singleton should return false", rc);
655         ch2=factory.createMultiplexerChannel(MUX_CHANNEL_CONFIG_STACK_NAME, "c2");
656         ch2.connect("foo");
657         rc=ch2.getState(null, 2000);
658         assertFalse("getState() on singleton should return false", rc);
659     }
660
661
662     public void testAdditionalData() throws Exception JavaDoc {
663         byte[] additional_data=new byte[]{'b', 'e', 'l', 'a'};
664         ch1=factory.createMultiplexerChannel(MUX_CHANNEL_CONFIG_STACK_NAME, "c1");
665         Map m=new HashMap(1);
666         m.put("additional_data", additional_data);
667         ch1.down(new Event(Event.CONFIG, m));
668         ch1.connect("bla");
669         IpAddress local_addr=(IpAddress)ch1.getLocalAddress();
670         assertNotNull(local_addr);
671         byte[] tmp=local_addr.getAdditionalData();
672         assertNotNull(tmp);
673         assertEquals(tmp, additional_data);
674
675         ch2=factory.createMultiplexerChannel(MUX_CHANNEL_CONFIG_STACK_NAME, "c2");
676         ch2.connect("foo");
677         local_addr=(IpAddress)ch2.getLocalAddress();
678         assertNotNull(local_addr);
679         tmp=local_addr.getAdditionalData();
680         assertNotNull(tmp);
681         assertEquals(tmp, additional_data);
682     }
683
684     public void testAdditionalData2() throws Exception JavaDoc {
685         byte[] additional_data=new byte[]{'b', 'e', 'l', 'a'};
686         byte[] additional_data2=new byte[]{'m', 'i', 'c', 'h', 'i'};
687         ch1=factory.createMultiplexerChannel(MUX_CHANNEL_CONFIG_STACK_NAME, "c1");
688         ch1.connect("bla");
689         IpAddress local_addr=(IpAddress)ch1.getLocalAddress();
690         assertNotNull(local_addr);
691         byte[] tmp=local_addr.getAdditionalData();
692         assertNull(tmp);
693
694         ch2=factory.createMultiplexerChannel(MUX_CHANNEL_CONFIG_STACK_NAME, "c2");
695         Map m=new HashMap(1);
696         m.put("additional_data", additional_data);
697         ch2.down(new Event(Event.CONFIG, m));
698         ch2.connect("foo");
699         local_addr=(IpAddress)ch2.getLocalAddress();
700         assertNotNull(local_addr);
701         tmp=local_addr.getAdditionalData();
702         assertNotNull(tmp);
703         assertEquals(tmp, additional_data);
704
705         local_addr=(IpAddress)ch1.getLocalAddress();
706         assertNotNull(local_addr);
707         tmp=local_addr.getAdditionalData();
708         assertNotNull(tmp);
709         assertEquals(tmp, additional_data);
710
711         m.clear();
712         m.put("additional_data", additional_data2);
713         ch2.down(new Event(Event.CONFIG, m));
714         local_addr=(IpAddress)ch2.getLocalAddress();
715         assertNotNull(local_addr);
716         tmp=local_addr.getAdditionalData();
717         assertNotNull(tmp);
718         assertEquals(tmp, additional_data2);
719         assertFalse(Arrays.equals(tmp, additional_data));
720     }
721
722     public void testGetSubstates() throws Exception JavaDoc {
723         ch1=factory.createMultiplexerChannel(MUX_CHANNEL_CONFIG_STACK_NAME, "c1");
724         ch1.connect("bla");
725         c1=new ExtendedCache(ch1, "cache-1");
726         assertEquals("cache has to be empty initially", 0, c1.size());
727
728         ch2=factory.createMultiplexerChannel(MUX_CHANNEL_CONFIG_STACK_NAME, "c2");
729         ch2.connect("bla");
730         c2=new ExtendedCache(ch2, "cache-2");
731         assertEquals("cache has to be empty initially", 0, c2.size());
732
733         for(int i=0; i < 10; i++) {
734             c1.put(new Integer JavaDoc(i), new Integer JavaDoc(i));
735             c2.put(new Integer JavaDoc(i), new Integer JavaDoc(i));
736         }
737
738         ch1_repl=factory2.createMultiplexerChannel(MUX_CHANNEL_CONFIG_STACK_NAME, "c1");
739         ch2_repl=factory2.createMultiplexerChannel(MUX_CHANNEL_CONFIG_STACK_NAME, "c2");
740         ch1_repl.connect("bla");
741         c1_repl=new ExtendedCache(ch1_repl, "cache-1-repl");
742         boolean rc=ch1_repl.getState(null, "odd", 5000);
743         System.out.println("state transfer: " + rc);
744
745         ch2_repl.connect("bla");
746         c2_repl=new ExtendedCache(ch2_repl, "cache-2-repl");
747         rc=ch2_repl.getState(null, "even", 5000);
748         System.out.println("state transfer: " + rc);
749         Util.sleep(500);
750
751         System.out.println("Caches after state transfers:");
752         System.out.println("c1: " + c1);
753         System.out.println("c2: " + c2);
754
755         System.out.println("c1_repl (removed odd substate): " + c1_repl);
756         System.out.println("c2_repl (removed even substate): " + c2_repl);
757
758         assertEquals(5, c1_repl.size());
759         assertEquals(5, c2_repl.size());
760
761         _testEvenNumbersPresent(c1_repl);
762         _testOddNumbersPresent(c2_repl);
763     }
764
765     private void _testEvenNumbersPresent(Cache c) {
766         Integer JavaDoc[] evens=new Integer JavaDoc[]{new Integer JavaDoc(0), new Integer JavaDoc(2), new Integer JavaDoc(4), new Integer JavaDoc(6), new Integer JavaDoc(8)};
767         _testNumbersPresent(c, evens);
768
769     }
770
771     private void _testOddNumbersPresent(Cache c) {
772         Integer JavaDoc[] odds=new Integer JavaDoc[]{new Integer JavaDoc(1), new Integer JavaDoc(3), new Integer JavaDoc(5), new Integer JavaDoc(7), new Integer JavaDoc(9)};
773         _testNumbersPresent(c, odds);
774     }
775
776     private void _testNumbersPresent(Cache c, Integer JavaDoc[] numbers) {
777         int len=numbers.length;
778         assertEquals(len, c.size());
779         for(int i=0; i < numbers.length; i++) {
780             Integer JavaDoc number=numbers[i];
781             assertEquals(number, c.get(number));
782         }
783     }
784
785
786
787     public void testGetSubstatesMultipleTimes() throws Exception JavaDoc {
788         ch1=factory.createMultiplexerChannel(MUX_CHANNEL_CONFIG_STACK_NAME, "c1");
789         ch1.connect("bla");
790         c1=new ExtendedCache(ch1, "cache-1");
791         assertEquals("cache has to be empty initially", 0, c1.size());
792
793         ch2=factory.createMultiplexerChannel(MUX_CHANNEL_CONFIG_STACK_NAME, "c2");
794         ch2.connect("bla");
795         c2=new ExtendedCache(ch2, "cache-2");
796         assertEquals("cache has to be empty initially", 0, c2.size());
797
798         for(int i=0; i < 10; i++) {
799             c1.put(new Integer JavaDoc(i), new Integer JavaDoc(i));
800             c2.put(new Integer JavaDoc(i), new Integer JavaDoc(i));
801         }
802
803         ch1_repl=factory2.createMultiplexerChannel(MUX_CHANNEL_CONFIG_STACK_NAME, "c1");
804         ch2_repl=factory2.createMultiplexerChannel(MUX_CHANNEL_CONFIG_STACK_NAME, "c2");
805         ch1_repl.connect("bla");
806         c1_repl=new ExtendedCache(ch1_repl, "cache-1-repl");
807         boolean rc=ch1_repl.getState(null, "odd", 5000);
808         System.out.println("state transfer: " + rc);
809
810         ch2_repl.connect("bla");
811         c2_repl=new ExtendedCache(ch2_repl, "cache-2-repl");
812         rc=ch2_repl.getState(null, "even", 5000);
813         System.out.println("state transfer: " + rc);
814         Util.sleep(500);
815         _testOddNumbersPresent(c2_repl);
816
817         System.out.println("Caches after state transfers:");
818         System.out.println("c1: " + c1);
819         System.out.println("c2: " + c2);
820         System.out.println("c1_repl (removed odd substate): " + c1_repl);
821         System.out.println("c2_repl (removed even substate): " + c2_repl);
822
823         assertEquals(5, c2_repl.size());
824         rc=ch2_repl.getState(null, "odd", 5000);
825         Util.sleep(500);
826         System.out.println("c2_repl (removed odd substate): " + c2_repl);
827         _testEvenNumbersPresent(c2_repl);
828
829         assertEquals(5, c2_repl.size());
830         rc=ch2_repl.getState(null, "even", 5000);
831         Util.sleep(500);
832         System.out.println("c2_repl (removed even substate): " + c2_repl);
833         _testOddNumbersPresent(c2_repl);
834
835         assertEquals(5, c2_repl.size());
836         rc=ch2_repl.getState(null, "odd", 5000);
837         Util.sleep(500);
838         System.out.println("c2_repl (removed odd substate): " + c2_repl);
839         _testEvenNumbersPresent(c2_repl);
840     }
841
842
843     public void testOrdering() throws Exception JavaDoc {
844         final int NUM=100;
845         ch1=factory.createMultiplexerChannel(MUX_CHANNEL_CONFIG_STACK_NAME, "c1");
846         ch1.connect("bla");
847         MyReceiver receiver=new MyReceiver(NUM);
848         ch1.setReceiver(receiver);
849         for(int i=1; i <= NUM; i++) {
850             ch1.send(new Message(null, null, new Integer JavaDoc(i)));
851             System.out.println("-- sent " + i);
852         }
853
854         receiver.waitForCompletion();
855
856         List<Integer JavaDoc> nums=receiver.getNums();
857         checkMonotonicallyIncreasingNumbers(nums);
858         System.out.println(NUM + " messages were received in the correct order");
859     }
860
861     private static void checkMonotonicallyIncreasingNumbers(List<Integer JavaDoc> nums) {
862         int current=-1;
863         for(int num: nums) {
864             if(current < 0) {
865                 current=num;
866             }
867             else {
868                 assertEquals("list is " + nums, ++current, num);
869             }
870         }
871     }
872
873
874     private static class MyReceiver extends ReceiverAdapter {
875         final List<Integer JavaDoc> nums=new LinkedList<Integer JavaDoc>();
876         final int expected;
877
878         public MyReceiver(int expected) {
879             this.expected=expected;
880         }
881
882         public List<Integer JavaDoc> getNums() {
883             return nums;
884         }
885
886         public void waitForCompletion() throws InterruptedException JavaDoc {
887             synchronized(nums) {
888                 while(nums.size() < expected) {
889                     nums.wait();
890                 }
891             }
892         }
893
894         public void receive(Message msg) {
895             Util.sleepRandom(100);
896             Integer JavaDoc num=(Integer JavaDoc)msg.getObject();
897             synchronized(nums) {
898                 System.out.println("-- received " + num);
899                 nums.add(num);
900                 if(nums.size() >= expected) {
901                     nums.notifyAll();
902                 }
903             }
904             Util.sleepRandom(100);
905         }
906     }
907
908     
909     public static Test suite() {
910         return new TestSuite(MultiplexerTest.class);
911     }
912
913     public static void main(String JavaDoc[] args) {
914         junit.textui.TestRunner.run(MultiplexerTest.suite());
915     }
916
917     private static class Cache extends ExtendedReceiverAdapter {
918         protected final Map data ;
919         Channel ch;
920         String JavaDoc name;
921
922         public Cache(Channel ch, String JavaDoc name) {
923             this.data=new TreeMap();
924             this.ch=ch;
925             this.name=name;
926             this.ch.setReceiver(this);
927         }
928
929         protected Object JavaDoc get(Object JavaDoc key) {
930             synchronized(data) {
931                 return data.get(key);
932             }
933         }
934
935         protected void put(Object JavaDoc key, Object JavaDoc val) throws Exception JavaDoc {
936             Object JavaDoc[] buf=new Object JavaDoc[2];
937             buf[0]=key; buf[1]=val;
938             synchronized(data) {
939                 data.put(key, val);
940             }
941             Message msg=new Message(null, null, buf);
942             ch.send(msg);
943         }
944
945         protected int size() {
946             synchronized(data) {
947                 return data.size();
948             }
949         }
950
951
952         public void receive(Message msg) {
953             if(ch.getLocalAddress().equals(msg.getSrc()))
954                 return;
955             Object JavaDoc[] modification=(Object JavaDoc[])msg.getObject();
956             Object JavaDoc key=modification[0];
957             Object JavaDoc val=modification[1];
958             synchronized(data) {
959                 // System.out.println("****** [" + name + "] received PUT(" + key + ", " + val + ") " + " from " + msg.getSrc() + " *******");
960
data.put(key,val);
961             }
962         }
963
964         public byte[] getState() {
965             byte[] state=null;
966             synchronized(data) {
967                 try {
968                     state=Util.objectToByteBuffer(data);
969                 }
970                 catch(Exception JavaDoc e) {
971                     e.printStackTrace();
972                     return null;
973                 }
974             }
975             return state;
976         }
977
978         public byte[] getState(String JavaDoc state_id) {
979             return getState();
980         }
981
982
983         public void setState(byte[] state) {
984             Map m;
985             try {
986                 m=(Map)Util.objectFromByteBuffer(state);
987                 synchronized(data) {
988                     data.clear();
989                     data.putAll(m);
990                 }
991             }
992             catch(Exception JavaDoc e) {
993                 e.printStackTrace();
994             }
995         }
996
997         public void setState(String JavaDoc state_id, byte[] state) {
998             setState(state);
999         }
1000
1001        public void getState(OutputStream ostream){
1002            ObjectOutputStream oos = null;
1003            try{
1004               oos = new ObjectOutputStream(ostream);
1005               synchronized(data){
1006                  oos.writeObject(data);
1007               }
1008               oos.flush();
1009            }
1010            catch (IOException e){}
1011            finally{
1012               try{
1013                  if(oos != null)
1014                     oos.close();
1015               }
1016               catch (IOException e){
1017                  System.err.println(e);
1018               }
1019            }
1020        }
1021
1022        public void getState(String JavaDoc state_id, OutputStream ostream) {
1023           getState(ostream);
1024        }
1025
1026        public void setState(InputStream istream) {
1027           ObjectInputStream ois = null;
1028           try {
1029               ois = new ObjectInputStream(istream);
1030               Map m = (Map)ois.readObject();
1031               synchronized (data)
1032               {
1033                  data.clear();
1034                  data.putAll(m);
1035               }
1036
1037           } catch (Exception JavaDoc e) {}
1038           finally{
1039               try {
1040                   if(ois != null)
1041                      ois.close();
1042               } catch (IOException e) {
1043                   System.err.println(e);
1044               }
1045           }
1046        }
1047
1048        public void setState(String JavaDoc state_id, InputStream istream) {
1049           setState(istream);
1050        }
1051
1052        public void clear() {
1053            synchronized (data){
1054               data.clear();
1055            }
1056        }
1057
1058
1059        public void viewAccepted(View new_view) {
1060            log("view is " + new_view);
1061        }
1062
1063        public String JavaDoc toString() {
1064            synchronized(data){
1065                return data.toString();
1066            }
1067        }
1068
1069
1070        public String JavaDoc printKeys() {
1071            return data.keySet().toString();
1072        }
1073
1074        private void log(String JavaDoc msg) {
1075            System.out.println("-- [" + name + "] " + msg);
1076        }
1077
1078    }
1079
1080
1081    static class ExtendedCache extends Cache {
1082
1083        public ExtendedCache(Channel ch, String JavaDoc name) {
1084            super(ch, name);
1085        }
1086
1087
1088        public byte[] getState(String JavaDoc state_id) {
1089            Map copy=null;
1090            synchronized (data){
1091               copy=new HashMap(data);
1092            }
1093            for(Iterator it=copy.keySet().iterator(); it.hasNext();) {
1094                Integer JavaDoc key=(Integer JavaDoc)it.next();
1095                if(state_id.equals("odd") && key.intValue() % 2 != 0)
1096                    it.remove();
1097                else if(state_id.equals("even") && key.intValue() % 2 == 0)
1098                    it.remove();
1099            }
1100            try {
1101                return Util.objectToByteBuffer(copy);
1102            }
1103            catch(Exception JavaDoc e) {
1104                e.printStackTrace();
1105                return null;
1106            }
1107        }
1108
1109        public void getState(String JavaDoc state_id,OutputStream os) {
1110           Map copy=null;
1111           synchronized (data){
1112              copy=new HashMap(data);
1113           }
1114           for(Iterator it=copy.keySet().iterator(); it.hasNext();) {
1115               Integer JavaDoc key=(Integer JavaDoc)it.next();
1116               if(state_id.equals("odd") && key.intValue() % 2 != 0)
1117                   it.remove();
1118               else if(state_id.equals("even") && key.intValue() % 2 == 0)
1119                   it.remove();
1120           }
1121           ObjectOutputStream oos = null;
1122           try {
1123               oos=new ObjectOutputStream(os);
1124               oos.writeObject(copy);
1125               oos.flush();
1126           }
1127           catch (IOException e){}
1128           finally{
1129              try{
1130                 if(oos != null)
1131                    oos.close();
1132              }
1133              catch (IOException e){
1134                 System.err.println(e);
1135              }
1136           }
1137        }
1138
1139        public void setState(String JavaDoc state_id, InputStream is){
1140           setState(is);
1141        }
1142
1143        public void setState(String JavaDoc state_id, byte[] state) {
1144            setState(state);
1145        }
1146
1147        public String JavaDoc toString() {
1148            synchronized(data) {
1149                Set keys=new TreeSet(data.keySet());
1150                StringBuilder JavaDoc sb=new StringBuilder JavaDoc();
1151                for(Iterator it=keys.iterator(); it.hasNext();) {
1152                    Object JavaDoc o=it.next();
1153                    sb.append(o).append("=").append(data.get(o)).append(" ");
1154                }
1155                return sb.toString();
1156            }
1157        }
1158    }
1159
1160}
1161
Popular Tags