1 2 package org.jgroups.tests; 3 4 5 import junit.framework.TestCase; 6 import org.jgroups.*; 7 import org.jgroups.util.Util; 8 import org.jgroups.stack.GossipRouter; 9 10 import java.util.Vector ; 11 import java.util.List ; 12 import java.util.LinkedList ; 13 14 15 20 public class MergeTest extends TestCase { 21 JChannel channel; 22 static final int TIMES=10; 23 static final int router_port=12001; 24 static final String bind_addr="127.0.0.1"; 25 GossipRouter router; 26 JChannel ch1, ch2; 27 private ViewChecker checker1, checker2; 28 private static final int NUM_MCASTS=5; 29 private static final int NUM_UCASTS=10; 30 private static final long WAIT_TIME=2000L; 31 32 String props="tunnel.xml"; 33 34 35 36 public MergeTest(String name) { 37 super(name); 38 } 39 40 protected void setUp() throws Exception { 41 super.setUp(); 42 startRouter(); 43 ch1=new JChannel(props); 44 checker1=new ViewChecker(ch1); 45 ch1.setReceiver(checker1); 46 ch1.connect("demo"); 47 ch2=new JChannel(props); 48 checker2=new ViewChecker(ch2); 49 ch2.setReceiver(checker2); 50 ch2.connect("demo"); 51 Util.sleep(1000); 52 } 53 54 public void tearDown() throws Exception { 55 super.tearDown(); 56 ch2.close(); 57 ch1.close(); 58 stopRouter(); 59 } 60 61 public void testPartitionAndSubsequentMerge() throws Exception { 62 partitionAndMerge(); 63 } 64 65 66 public void testTwoMerges() throws Exception { 67 partitionAndMerge(); 68 partitionAndMerge(); 69 } 70 71 72 73 private void partitionAndMerge() throws Exception { 74 View v=ch2.getView(); 75 System.out.println("view is " + v); 76 assertEquals("channel is supposed to have 2 members", 2, ch2.getView().size()); 77 78 System.out.println("sending " + NUM_MCASTS + " multicast messages"); 79 for(int i=0; i < NUM_MCASTS; i++) { 80 Message msg=new Message(); 81 ch1.send(msg); 82 } 83 System.out.println("sending " + NUM_UCASTS + " unicast messages to " + v.size() + " members"); 84 Vector <Address> mbrs=v.getMembers(); 85 for(Address mbr: mbrs) { 86 for(int i=0; i < NUM_UCASTS; i++) { 87 Channel ch=i % 2 == 0? ch1 : ch2; 88 ch.send(new Message(mbr)); 89 } 90 } 91 System.out.println("done, sleeping for " + WAIT_TIME + " time"); 92 Util.sleep(WAIT_TIME); 93 94 System.out.println("++ simulating network partition by stopping the GossipRouter"); 95 stopRouter(); 96 97 System.out.println("sleeping for 10 secs"); 98 checker1.waitForNViews(1, 10000); 99 checker2.waitForNViews(1, 10000); 100 v=ch1.getView(); 101 System.out.println("-- ch1.view: " + v); 102 103 v=ch2.getView(); 104 System.out.println("-- ch2.view: " + v); 105 assertEquals("view should be 1 (channels should have excluded each other): " + v, 1, v.size()); 106 107 System.out.println("++ simulating merge by starting the GossipRouter again"); 108 startRouter(); 109 110 System.out.println("sleeping for 30 secs"); 111 checker1.waitForNViews(1, 30000); 112 checker2.waitForNViews(1, 30000); 113 114 v=ch1.getView(); 115 System.out.println("-- ch1.view: " + v); 116 117 v=ch2.getView(); 118 System.out.println("-- ch2.view: " + v); 119 120 assertEquals("channel is supposed to have 2 members again after merge", 2, ch2.getView().size()); 121 } 122 123 124 125 126 127 128 private void startRouter() throws Exception { 129 router=new GossipRouter(router_port, bind_addr); 130 router.start(); 131 } 132 133 private void stopRouter() { 134 router.stop(); 135 } 136 137 private static class ViewChecker extends ReceiverAdapter { 138 final Object mutex=new Object (); 139 int count=0; 140 final Channel channel; 141 final List <View> views=new LinkedList <View>(); 142 143 144 public ViewChecker(Channel channel) { 145 this.channel=channel; 146 } 147 148 public void viewAccepted(View new_view) { 149 synchronized(mutex) { 150 count++; 151 View view=channel != null? channel.getView() : null; 152 views.add(view); 153 mutex.notifyAll(); 155 } 156 } 157 158 159 public void waitForNViews(int n, long timeout) { 160 long sleep_time=timeout, curr, start; 161 synchronized(mutex) { 162 views.clear(); 163 count=0; 164 start=System.currentTimeMillis(); 165 while(count < n) { 166 try {mutex.wait(sleep_time);} catch(InterruptedException e) {} 167 curr=System.currentTimeMillis(); 168 sleep_time-=(curr - start); 169 if(sleep_time <= 0) 170 break; 171 } 172 } 173 174 } 177 178 179 public void receive(Message msg) { 180 Address sender=msg.getSrc(), receiver=msg.getDest(); 181 boolean multicast=receiver == null || receiver.isMulticastAddress(); 182 System.out.println("[" + receiver + "]: received " + (multicast? " multicast " : " unicast ") + " message from " + sender); 183 } 184 } 185 186 187 public static void main(String [] args) { 188 String [] testCaseName={MergeTest.class.getName()}; 189 junit.textui.TestRunner.main(testCaseName); 190 } 191 192 193 } 194 | Popular Tags |