1 package org.jgroups.protocols; 3 4 import junit.framework.Test; 5 import junit.framework.TestCase; 6 import junit.framework.TestSuite; 7 import org.jgroups.Address; 8 import org.jgroups.Channel; 9 import org.jgroups.ChannelClosedException; 10 import org.jgroups.ChannelException; 11 import org.jgroups.ChannelListener; 12 import org.jgroups.ChannelNotConnectedException; 13 import org.jgroups.ExitEvent; 14 import org.jgroups.GetStateEvent; 15 import org.jgroups.JChannel; 16 import org.jgroups.SetStateEvent; 17 import org.jgroups.Message; 18 import org.jgroups.util.Util; 19 20 30 public class STATE_TRANSFER_Test extends TestCase { 31 32 public final static String CHANNEL_PROPS ="udp.xml"; 33 34 public static final String GROUP_NAME = "jgroups.TEST_GROUP"; 35 36 private Coordinator coord; 37 38 public STATE_TRANSFER_Test(String testName) { 39 super(testName); 40 } 41 42 protected void setUp() throws Exception { 43 super.setUp(); 44 45 System.setProperty("org.apache.commons.logging.Log", "org.apache.commons.logging.impl.SimpleLog"); 46 System.setProperty("org.apache.commons.logging.simplelog.defaultlog", "error"); 47 48 coord = new Coordinator(); 49 coord.recvLoop(); 50 coord.sendLoop(); 51 } 52 53 protected void tearDown() throws Exception { 54 super.tearDown(); 55 56 coord.stop(); 57 coord = null; 58 } 59 60 static class Coordinator implements ChannelListener { 61 62 private JChannel channel = null; 63 private int cnt = 0; private volatile boolean closed = false; 65 66 protected Coordinator() throws ChannelException { 67 68 channel = new JChannel(CHANNEL_PROPS); 69 channel.setOpt(Channel.LOCAL, Boolean.FALSE); 70 channel.setOpt(Channel.AUTO_RECONNECT, Boolean.TRUE); 71 channel.addChannelListener(this); 72 channel.connect(GROUP_NAME); 73 } 74 75 public void channelConnected(Channel channel) { 76 } 77 78 public void channelDisconnected(Channel channel) { 79 } 80 81 public void channelClosed(Channel channel) { 82 } 83 84 public void channelShunned() { 85 } 86 87 public void channelReconnected(Address addr) { } 89 90 public void recvLoop() throws Exception { 91 Thread task = new Thread (new Runnable () { 92 public void run() { 93 Object tmp; 94 while (! closed) { 95 try { 96 tmp = channel.receive(0); 97 if (tmp instanceof ExitEvent) { 98 System.err.println("-- received EXIT, waiting for ChannelReconnected callback"); 99 break; 100 } 101 if (tmp instanceof GetStateEvent) { 102 synchronized (Coordinator.this) { 103 System.err.println("-- GetStateEvent, cnt=" + cnt); 104 channel.returnState(Util.objectToByteBuffer(new Integer (cnt))); 105 } 106 } 107 } catch (ChannelNotConnectedException not) { 108 break; 109 } catch (ChannelClosedException closed) { 110 break; 111 } catch (Exception e) { 112 System.err.println(e); 113 } 114 } 115 } 116 }); 117 task.start(); 118 } 119 120 public void sendLoop() throws Exception { 121 Thread task = new Thread (new Runnable () { 122 123 public void run() { 124 while (! closed) { 125 try { 126 synchronized (Coordinator.this) { 127 channel.send(null, null, new Integer (++cnt)); 128 System.err.println("send cnt=" + cnt); 129 } 130 Thread.sleep(1000); 131 } catch (ChannelNotConnectedException not) { 132 break; 133 } catch (ChannelClosedException closed) { 134 break; 135 } catch (Exception e) { 136 System.err.println(e); 137 } 138 } 139 } 140 }); 141 task.start(); 142 } 143 144 public void stop() { 145 closed = true; 146 channel.close(); 147 } 148 } 149 150 public void testBasicStateSync() throws Exception { 151 152 Channel channel = new JChannel(CHANNEL_PROPS); 153 channel.setOpt(Channel.LOCAL, Boolean.FALSE); 154 155 channel.connect(GROUP_NAME); 156 157 Thread.sleep(1000); 158 159 boolean join = false; 160 join = channel.getState(null, 100000l); 161 assertTrue(join); 162 163 Object tmp; 164 int cnt = -1; 165 while (true) { 166 try { 167 tmp = channel.receive(0); 168 if (tmp instanceof ExitEvent) { 169 break; 170 } 171 if (tmp instanceof SetStateEvent) { 172 cnt = ((Integer ) Util.objectFromByteBuffer(((SetStateEvent) tmp).getArg())).intValue(); 173 System.err.println("-- SetStateEvent, cnt=" + cnt); 174 continue; 175 } 176 if ( tmp instanceof Message ) { 177 if (cnt != -1) { 178 int msg = ((Integer ) ((Message) tmp).getObject()).intValue(); 179 assertEquals(cnt, msg - 1); 180 break; } 182 } 183 } catch (ChannelNotConnectedException not) { 184 break; 185 } catch (ChannelClosedException closed) { 186 break; 187 } catch (Exception e) { 188 System.err.println(e); 189 } 190 } 191 192 channel.close(); 193 } 194 195 196 public static Test suite() { 197 return new TestSuite(STATE_TRANSFER_Test.class); 198 } 199 200 public static void main(String [] args) { 201 junit.textui.TestRunner.run(suite()); 202 } 203 204 } 205 | Popular Tags |