1 3 package org.jgroups.blocks; 4 5 import junit.framework.Test; 6 import junit.framework.TestCase; 7 import junit.framework.TestSuite; 8 import org.apache.log4j.Logger; 9 import org.jgroups.JChannel; 10 import org.jgroups.util.RspList; 11 import org.jgroups.util.Util; 12 13 import java.util.Vector ; 14 15 26 public class RpcDispatcherTotalTokenTest extends TestCase 27 { 28 String props = null; 29 30 final int NUM_ITEMS = 10; 31 static Logger logger = Logger.getLogger(RpcDispatcherTotalTokenTest.class.getName()); 32 33 public RpcDispatcherTotalTokenTest(String testName) 34 { 35 super(testName); 36 } 37 38 public class DispatcherTask implements Runnable 39 { 40 protected String taskName; 41 protected boolean finished; 42 JChannel channel; 43 RpcDispatcher disp; 44 RspList rsp_list; 45 int base; 46 boolean bTest; 47 48 public DispatcherTask(String taskName, String jgProps) throws Exception 49 { 50 this.taskName = taskName; 51 this.base = 0; 52 finished = false; 53 bTest = true; 54 channel = new JChannel(jgProps); 55 disp = new RpcDispatcher(channel, null, null, this); 56 channel.connect("RpcDispatcherTestGroup"); 57 } 58 59 public synchronized int print(String tag) throws Exception 60 { 61 return base++; 62 } 63 64 public JChannel getChannel() 65 { 66 return channel; 67 } 68 69 public boolean finished() 70 { 71 return finished; 72 } 73 74 public boolean result() 75 { 76 return bTest; 77 } 78 79 protected boolean checkResult(RspList rsp) 80 { 81 boolean result = true; 82 Vector results = rsp.getResults(); 83 Object retval = null; 84 if (results.size() > 0 && rsp.numSuspectedMembers() == 0) 85 { 86 retval = results.elementAt(0); 87 for (int i = 1; i < results.size(); i++) 88 { 89 Object data = results.elementAt(i); 90 boolean test = data.equals(retval); 91 if (!test) 92 { 93 logger.error( 94 "Task " 95 + taskName 96 + ":Reference value differs from returned value " 97 + retval 98 + " != " 99 + data); 100 result = false; 101 } 102 } 103 } 104 return result; 105 } 106 107 public void run() 108 { 109 logger.debug("Task " + taskName + " View is :" + channel.getView()); 110 111 for (int i = 0; i < NUM_ITEMS && bTest; i++) 112 { 113 Util.sleep(100); 114 115 try 116 { 117 MethodCall call = 118 new MethodCall( 119 "print", 120 new Object [] { taskName + '_' + i}, 121 new String [] { String .class.getName()}); 122 rsp_list = disp.callRemoteMethods(null, call, GroupRequest.GET_ALL, 0); 123 logger.debug("Task " + taskName + " Responses: " + rsp_list); 124 bTest = checkResult(rsp_list); 125 } 126 catch (Exception ex) 127 { 128 logger.error("Unexpected error", ex); 129 } 130 131 } 132 133 finished = true; 134 } 135 136 public void close() 137 { 138 try 139 { 140 logger.debug("Stopping dispatcher"); 141 disp.stop(); 142 logger.debug("Stopping dispatcher: -- done"); 143 144 Util.sleep(200); 145 logger.debug("Closing channel"); 146 channel.close(); 147 logger.debug("Closing channel: -- done"); 148 } 149 catch (Exception e) 150 { 151 logger.error("Unexpected error", e); 152 } 153 } 154 } 155 156 public void setUp() throws Exception 157 { 158 props = 159 "UDP(mcast_recv_buf_size=80000;mcast_send_buf_size=150000;mcast_port=45566;" 160 + "mcast_addr=228.8.8.8;ip_ttl=32):" 161 + "PING(timeout=2000;num_initial_members=3):" 162 + "FD_SOCK:" 163 + "VERIFY_SUSPECT(timeout=1500):" 164 + "UNICAST(timeout=600,1200,2000,2500):" 165 + "FRAG(frag_size=8096;down_thread=false;up_thread=false):" 166 + "TOTAL_TOKEN(unblock_sending=10;block_sending=50):" 167 + "pbcast.GMS(print_local_addr=true;join_timeout=3000;join_retry_timeout=2000;shun=true):" 168 + "STATE_TRANSFER:" 169 + "QUEUE"; 170 } 171 172 public void tearDown() throws Exception 173 { 174 } 175 176 public void test() throws Exception 177 { 178 DispatcherTask t1 = new DispatcherTask("Task1", props); 179 DispatcherTask t2 = new DispatcherTask("Task2", props); 180 DispatcherTask t3 = new DispatcherTask("Task3", props); 181 182 Thread rTask1 = new Thread (t1); 183 Thread rTask2 = new Thread (t2); 184 Thread rTask3 = new Thread (t3); 185 186 TotalTokenProtocolObserver po1 = new TotalTokenProtocolObserver(t1.getChannel()); 187 TotalTokenProtocolObserver po2 = new TotalTokenProtocolObserver(t2.getChannel()); 188 TotalTokenProtocolObserver po3 = new TotalTokenProtocolObserver(t3.getChannel()); 189 190 Util.sleep(1000); 191 192 rTask1.start(); 193 rTask2.start(); 194 rTask3.start(); 195 196 while (!t1.finished() || !t2.finished() || !t3.finished()) 197 { 198 Util.sleep(1000); 199 } 200 201 t1.close(); 202 t2.close(); 203 t3.close(); 204 205 assertTrue(t1.result()); 206 assertTrue(t2.result()); 207 assertTrue(t3.result()); 208 } 209 210 public static Test suite() 211 { 212 return new TestSuite(RpcDispatcherTotalTokenTest.class); 213 } 214 215 public static void main(String [] args) 216 { 217 junit.textui.TestRunner.run(suite()); 218 } 219 220 } 221 | Popular Tags |