1 package org.jgroups.tests; 2 3 import java.io.ByteArrayOutputStream ; 4 import java.io.InputStream ; 5 import java.io.IOException ; 6 import java.io.OutputStream ; 7 8 import junit.framework.TestCase; 9 import org.jgroups.Channel; 10 import org.jgroups.ExtendedMessageListener; 11 import org.jgroups.JChannelFactory; 12 import org.jgroups.Message; 13 import org.jgroups.View; 14 import org.jgroups.blocks.RpcDispatcher; 15 import org.jgroups.stack.GossipRouter; 16 import org.jgroups.util.Util; 17 18 23 public class MultiplexerMergeTest extends TestCase { 24 private static final String STACK_FILE = "stacks.xml"; 26 private static final String STACK_NAME = "tunnel"; 27 private static final int ROUTER_PORT = 12001; 29 private static final String BIND_ADDR = "127.0.0.1"; 30 31 private JChannelFactory factory; 32 private JChannelFactory factory2; 33 private Channel ch1; 34 private Channel ch2; 35 private GossipRouter router; 36 private RpcDispatcher dispatcher1; 37 private RpcDispatcher dispatcher2; 38 39 public MultiplexerMergeTest(String name) { 40 super(name); 41 } 42 43 protected void setUp() throws Exception { 44 super.setUp(); 45 46 factory = new JChannelFactory(); 47 factory.setMultiplexerConfig(STACK_FILE); 48 49 factory2 = new JChannelFactory(); 50 factory2.setMultiplexerConfig(STACK_FILE); 51 52 startRouter(); 53 54 ch1 = factory.createMultiplexerChannel(STACK_NAME, "foo"); 55 dispatcher1 = new RpcDispatcher(ch1, null, null, new Object (), false); 56 dispatcher1.setMessageListener(new MessageListenerAdaptor("listener1", "client1 initial state")); 57 ch1.connect("bla"); 58 ch1.getState(null, 10000); 59 60 ch2 = factory2.createMultiplexerChannel(STACK_NAME, "foo"); 61 dispatcher2 = new RpcDispatcher(ch2, null, null, new Object (), false); 62 dispatcher2.setMessageListener(new MessageListenerAdaptor("listener2", "client2 initial state")); 63 ch2.connect("bla"); 64 boolean rc = ch2.getState(null, 10000); 65 67 System.out.println("sleeping for 5 seconds"); 68 Util.sleep(5000); 69 } 70 71 public void tearDown() throws Exception { 72 super.tearDown(); 73 ch2.close(); 74 ch1.close(); 75 stopRouter(); 76 } 77 78 public void testPartitionAndSubsequentMerge() throws Exception { 79 partitionAndMerge(); 80 } 81 82 private void partitionAndMerge() throws Exception { 83 View v = ch2.getView(); 84 System.out.println("ch2 view is " + v); 85 assertEquals("channel2 should have 2 members", 2, ch2.getView().size()); 86 87 System.out.println("++ simulating network partition by stopping the GossipRouter"); 88 stopRouter(); 89 90 System.out.println("sleeping for 20 seconds"); 91 Util.sleep(20000); 92 93 v = ch1.getView(); 94 System.out.println("-- ch1.view: " + v); 95 v = ch2.getView(); 96 System.out.println("-- ch2.view: " + v); 97 98 assertEquals("channel2 should have 1 member (channels should have excluded each other)", 1, v.size()); 99 100 System.out.println("++ simulating merge by starting the GossipRouter again"); 101 router.start(); 102 103 System.out.println("sleeping for 30 seconds"); 104 Util.sleep(30000); 105 106 v = ch1.getView(); 107 System.out.println("-- ch1.view: " + v); 108 v = ch2.getView(); 109 System.out.println("-- ch2.view: " + v); 110 111 assertEquals("channel2 is supposed to have 2 members again after merge", 2, ch2.getView().size()); 112 } 113 114 private void startRouter() throws Exception { 115 router = new GossipRouter(ROUTER_PORT, BIND_ADDR); 116 router.start(); 117 } 118 119 private void stopRouter() { 120 router.stop(); 121 } 122 123 private static final class MessageListenerAdaptor implements ExtendedMessageListener { 124 private String m_name; 125 private byte[] m_state = null; 126 127 MessageListenerAdaptor(String name, String state) { 128 m_name = name; 129 if (state != null) 130 m_state = state.getBytes(); 131 } 132 133 public void receive(Message msg) { 134 System.out.println(m_name + " MultiplexerMergeTest.receive() - not implemented"); 135 } 136 137 public byte[] getState() { 138 System.out.println(m_name + " MultiplexerMergeTest.getState() - returning byte[] state = " + new String (m_state)); 139 return m_state; 140 } 141 142 public void setState(byte[] state) { 143 System.out.println(m_name + " MultiplexerMergeTest.setState(byte[]) - setting state = " + new String (state)); 144 m_state = state; 145 } 146 147 public void setState(InputStream is) { 148 m_state = getInputStreamBytes(is); 149 try 150 { 151 is.close(); 152 } 153 catch(Exception e){ 154 System.out.println(m_name + " MultiplexerMergeTest.setState(InputStream): " + e.toString()); 155 } 156 System.out.println(m_name + " MultiplexerMergeTest.setState(InputStream) - setting stream state = " + new String (m_state)); 157 } 158 159 public void getState(OutputStream os) { 160 System.out.println(m_name + " MultiplexerMergeTest.getState(OutputStream) returning stream state = " + new String (m_state)); 161 try { 162 os.write(m_state); 163 os.flush(); 164 os.close(); 165 } catch (IOException e) { 166 System.out.println(m_name + " MultiplexerMergeTest.getState(OutputStream) failed: " + e.toString()); 167 } 168 } 169 170 public byte[] getState(String state_id) { 171 System.out.println(m_name + " MultiplexerMergeTest.getState(String) - not implemented"); 172 return null; 173 } 174 175 public void getState(String state_id, OutputStream os) { 176 System.out.println(m_name + " MultiplexerMergeTest.getState(String, InputStream) - not implemented"); 177 } 178 179 public void setState(String state_id, byte[] state) { 180 System.out.println(m_name + " MultiplexerMergeTest.setState(String, byte[]) - not implemented"); 181 } 182 183 public void setState(String state_id, InputStream is) { 184 System.out.println(m_name + " MultiplexerMergeTest.setState(String, InputStream) - not implemented"); 185 } 186 } 187 188 private static byte[] getInputStreamBytes(InputStream is) { 189 byte[] b = null; 190 if ( is != null ) { 191 b = new byte[1024]; 192 ByteArrayOutputStream baos = new ByteArrayOutputStream (); 193 try { 194 while ( true ) { 195 int bytes = is.read( b ); 196 if ( bytes == -1 ) { 197 break; 198 } 199 baos.write(b, 0, bytes); 200 } 201 } 202 catch ( Exception e ) { 203 e.printStackTrace(); 204 } 205 finally { 206 try { 207 b = baos.toByteArray(); 208 baos.close(); 209 } 210 catch ( Exception e ) { 211 e.printStackTrace(); 212 } 213 } 214 } 215 return b; 216 } 217 218 public static void main(String [] args) { 219 String [] testCaseName={MultiplexerMergeTest.class.getName()}; 220 junit.textui.TestRunner.main(testCaseName); 221 } 222 223 } 224 225 | Popular Tags |