1 3 package org.jgroups.tests; 4 5 import org.jgroups.*; 6 import org.jgroups.util.Util; 7 8 import java.io.*; 9 import java.util.Vector ; 10 11 12 17 public class UnicastTest implements Runnable { 18 UnicastTest test; 19 JChannel channel; 20 final String groupname="UnicastTest-Group"; 21 Thread t=null; 22 long sleep_time=0; 23 boolean exit_on_end=false, busy_sleep=false; 24 25 26 public static class Data implements Externalizable { 27 public Data() { 28 } 29 30 public void writeExternal(ObjectOutput out) throws IOException { 31 } 32 33 public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { 34 } 35 } 36 37 public static class StartData extends Data { 38 long num_values=0; 39 40 public StartData() { 41 super(); 42 } 43 44 StartData(long num_values) { 45 this.num_values=num_values; 46 } 47 48 public void writeExternal(ObjectOutput out) throws IOException { 49 out.writeLong(num_values); 50 } 51 52 public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { 53 num_values=in.readLong(); 54 } 55 } 56 57 public static class Value extends Data { 58 long value=0; 59 60 public Value() { 61 super(); 62 } 63 64 Value(long value) { 65 this.value=value; 66 } 67 68 public void writeExternal(ObjectOutput out) throws IOException { 69 out.writeLong(value); 70 } 71 72 public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { 73 value=in.readLong(); 74 } 75 } 76 77 78 public void init(String props, long sleep_time, boolean exit_on_end, boolean busy_sleep) throws Exception { 79 this.sleep_time=sleep_time; 80 this.exit_on_end=exit_on_end; 81 this.busy_sleep=busy_sleep; 82 channel=new JChannel(props); 83 channel.connect(groupname); 84 t=new Thread (this, "UnicastTest - receiver thread"); 85 t.start(); 86 } 87 88 89 public void run() { 90 Data data; 91 Message msg; 92 Object obj; 93 boolean started=false; 94 long start=0, stop=0; 95 long current_value=0, tmp=0, num_values=0; 96 long total_msgs=0, total_time=0, msgs_per_sec; 97 98 while(true) { 99 try { 100 obj=channel.receive(0); 101 if(obj instanceof View) 102 System.out.println("** view: " + obj); 103 else 104 if(obj instanceof Message) { 105 msg=(Message)obj; 106 data=(Data)msg.getObject(); 107 108 if(data instanceof StartData) { 109 if(started) { 110 System.err.println("UnicastTest.run(): received START data, but am already processing data"); 111 continue; 112 } 113 else { 114 started=true; 115 current_value=0; tmp=0; 117 num_values=((StartData)data).num_values; 118 start=System.currentTimeMillis(); 119 } 120 } 121 else 122 if(data instanceof Value) { 123 tmp=((Value)data).value; 124 if(current_value + 1 != tmp) { 125 System.err.println("-- message received (" + tmp + ") is not 1 greater than " + current_value); 126 } 127 else { 128 current_value++; 129 if(current_value % 100 == 0) 130 System.out.println("received " + current_value); 131 if(current_value >= num_values) { 132 stop=System.currentTimeMillis(); 133 total_time=stop - start; 134 msgs_per_sec=(long)(num_values / (total_time / 1000.0)); 135 System.out.println("-- received " + num_values + " messages in " + total_time + 136 " ms (" + msgs_per_sec + " messages/sec)"); 137 started=false; 138 if(exit_on_end) 139 System.exit(0); 140 continue; 141 } 142 } 143 } 144 } 145 } 146 catch(ChannelNotConnectedException not_connected) { 147 System.err.println(not_connected); 148 break; 149 } 150 catch(ChannelClosedException closed_ex) { 151 System.err.println(closed_ex); 152 break; 153 } 154 catch(TimeoutException timeout) { 155 System.err.println(timeout); 156 break; 157 } 158 catch(Throwable t) { 159 System.err.println(t); 160 started=false; 161 current_value=0; 162 tmp=0; 163 Util.sleep(1000); 164 } 165 } 166 } 168 169 170 public void eventLoop() throws Exception { 171 int c; 172 173 while(true) { 174 System.out.print("[1] Send msgs [2] Print view [q] Quit "); 175 System.out.flush(); 176 c=System.in.read(); 177 switch(c) { 178 case -1: 179 break; 180 case '1': 181 sendMessages(); 182 break; 183 case '2': 184 printView(); 185 break; 186 case '3': 187 break; 188 case '4': 189 break; 190 case '5': 191 break; 192 case '6': 193 break; 194 case 'q': 195 channel.close(); 196 return; 197 default: 198 break; 199 } 200 } 201 } 202 203 204 void sendMessages() throws Exception { 205 long num_msgs=getNumberOfMessages(); 206 Address receiver=getReceiver(); 207 Message msg; 208 Value val=new Value(1); 209 210 if(receiver == null) { 211 System.err.println("UnicastTest.sendMessages(): receiver is null, cannot send messages"); 212 return; 213 } 214 215 System.out.println("sending " + num_msgs + " messages to " + receiver); 216 msg=new Message(receiver, null, new StartData(num_msgs)); 217 channel.send(msg); 218 219 for(int i=1; i <= num_msgs; i++) { 220 val=new Value(i); 221 msg=new Message(receiver, null, val); 222 if(i % 100 == 0) 223 System.out.println("-- sent " + i); 224 channel.send(msg); 225 if(sleep_time > 0) 226 Util.sleep(sleep_time, busy_sleep); 227 } 228 System.out.println("done sending " + num_msgs + " to " + receiver); 229 } 230 231 void printView() { 232 System.out.println("\n-- view: " + channel.getView() + '\n'); 233 try { 234 System.in.skip(System.in.available()); 235 } 236 catch(Exception e) { 237 } 238 } 239 240 241 long getNumberOfMessages() { 242 BufferedReader reader=null; 243 String tmp=null; 244 245 try { 246 System.out.print("Number of messages to send: "); 247 System.out.flush(); 248 System.in.skip(System.in.available()); 249 reader=new BufferedReader(new InputStreamReader(System.in)); 250 tmp=reader.readLine().trim(); 251 return Long.parseLong(tmp); 252 } 253 catch(Exception e) { 254 System.err.println("UnicastTest.getNumberOfMessages(): " + e); 255 return 0; 256 } 257 } 258 259 Address getReceiver() { 260 Vector mbrs=null; 261 int index; 262 BufferedReader reader; 263 String tmp; 264 265 try { 266 mbrs=channel.getView().getMembers(); 267 System.out.println("pick receiver from the following members:"); 268 for(int i=0; i < mbrs.size(); i++) { 269 if(mbrs.elementAt(i).equals(channel.getLocalAddress())) 270 System.out.println("[" + i + "]: " + mbrs.elementAt(i) + " (self)"); 271 else 272 System.out.println("[" + i + "]: " + mbrs.elementAt(i)); 273 } 274 System.out.flush(); 275 System.in.skip(System.in.available()); 276 reader=new BufferedReader(new InputStreamReader(System.in)); 277 tmp=reader.readLine().trim(); 278 index=Integer.parseInt(tmp); 279 return (Address)mbrs.elementAt(index); } 281 catch(Exception e) { 282 System.err.println("UnicastTest.getReceiver(): " + e); 283 return null; 284 } 285 } 286 287 288 public static void main(String [] args) { 289 long sleep_time=0; 290 boolean exit_on_end=false; 291 boolean busy_sleep=false; 292 293 String udp_props="UDP(mcast_addr=228.8.8.8;mcast_port=45566;ip_ttl=32;" + 294 "ucast_recv_buf_size=32000;ucast_send_buf_size=64000;" + 295 "mcast_send_buf_size=32000;mcast_recv_buf_size=64000;loopback=true):"; 296 297 String regular_props="PING(timeout=1000;num_initial_members=2):" + 298 "MERGE2(min_interval=5000;max_interval=10000):" + 299 "FD_SOCK:" + 300 "VERIFY_SUSPECT(timeout=1500):" + 301 "pbcast.NAKACK(gc_lag=50;retransmit_timeout=300,600,1200,2400,4800;max_xmit_size=8192):" + 302 "UNICAST(timeout=2000,4000,6000;window_size=100;min_threshold=10;use_gms=false):" + 303 "pbcast.STABLE(desired_avg_gossip=20000):" + 304 "FRAG(frag_size=8192;down_thread=false;up_thread=false):" + 305 "pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;shun=false;print_local_addr=true)"; 306 307 String props=udp_props + regular_props; 308 String loopback_props="LOOPBACK:" + regular_props; 309 310 for(int i=0; i < args.length; i++) { 311 if("-help".equals(args[i])) { 312 help(); 313 return; 314 } 315 if("-props".equals(args[i])) { 316 props=args[++i]; 317 continue; 318 } 319 if("-sleep".equals(args[i])) { 320 sleep_time=Long.parseLong(args[++i]); 321 continue; 322 } 323 if("-loopback".equals(args[i])) { 324 props=loopback_props; 325 continue; 326 } 327 if("-exit_on_end".equals(args[i])) { 328 exit_on_end=true; 329 continue; 330 } 331 if("-busy_sleep".equals(args[i])) { 332 busy_sleep=true; 333 continue; 334 } 335 } 336 337 338 try { 339 UnicastTest test=new UnicastTest(); 340 test.init(props, sleep_time, exit_on_end, busy_sleep); 341 test.eventLoop(); 342 } 343 catch(Exception ex) { 344 System.err.println(ex); 345 } 346 } 347 348 static void help() { 349 System.out.println("UnicastTest [-help] [-props <props>] [-sleep <time in ms between msg sends] " + 350 "[-loopback] [-exit_on_end] [-busy-sleep]"); 351 } 352 } 353 | Popular Tags |