1 package org.jgroups.tests.perf.transports; 2 3 import org.jgroups.stack.IpAddress; 4 import org.jgroups.tests.perf.Receiver; 5 import org.jgroups.tests.perf.Transport; 6 7 import java.io.*; 8 import java.net.ConnectException ; 9 import java.net.InetAddress ; 10 import java.net.ServerSocket ; 11 import java.net.Socket ; 12 import java.util.*; 13 14 19 public class TcpTransport implements Transport { 20 Receiver receiver=null; 21 Properties config=null; 22 int max_receiver_buffer_size=500000; 23 int max_send_buffer_size=500000; 24 List nodes=new ArrayList(); 25 ConnectionTable ct; 26 int srv_port=7777; 27 ServerSocket srv_sock=null; 28 InetAddress bind_addr=null; 29 IpAddress local_addr=null; 30 List receivers=new ArrayList(); 31 32 33 public TcpTransport() { 34 } 35 36 public Object getLocalAddress() { 37 return local_addr; 38 } 39 40 public void create(Properties properties) throws Exception { 41 this.config=properties; 42 44 String tmp; 45 if((tmp=config.getProperty("srv_port")) != null) 46 srv_port=Integer.parseInt(tmp); 47 48 String bind_addr_str=System.getProperty("udp.bind_addr", config.getProperty("bind_addr")); 49 if(bind_addr_str != null) { 50 bind_addr=InetAddress.getByName(bind_addr_str); 51 } 52 else 53 bind_addr=InetAddress.getLocalHost(); 54 55 String cluster_def=config.getProperty("cluster"); 56 if(cluster_def == null) 57 throw new Exception ("TcpTransport.create(): property 'cluster' is not defined"); 58 nodes=parseCommaDelimitedList(cluster_def); 59 ct=new ConnectionTable(nodes); 60 } 61 62 63 public void start() throws Exception { 64 srv_sock=new ServerSocket (srv_port, 50, bind_addr); 65 local_addr=new IpAddress(srv_sock.getInetAddress(), srv_sock.getLocalPort()); 66 ct.init(); 67 68 Thread acceptor=new Thread () { 70 public void run() { 71 while(true) { 72 try { 73 Socket s=srv_sock.accept(); 74 ReceiverThread r=new ReceiverThread(s); 75 r.setDaemon(true); 76 receivers.add(r); 77 r.start(); 78 } 79 catch(Exception ex) { 80 ex.printStackTrace(); 81 break; 82 } 83 } 84 } 85 }; 86 acceptor.setDaemon(true); 87 acceptor.start(); 88 } 89 90 public void stop() { 91 ct.close(); 92 for(Iterator it=receivers.iterator(); it.hasNext();) { 93 ReceiverThread thread=(ReceiverThread)it.next(); 94 thread.stopThread(); 95 } 96 } 97 98 public void destroy() { 99 ; 100 } 101 102 public void setReceiver(Receiver r) { 103 this.receiver=r; 104 } 105 106 public void send(Object destination, byte[] payload) throws Exception { 107 if(destination != null) 108 throw new Exception ("TcpTransport.send(): unicasts not supported"); 109 ct.writeMessage(payload); 110 } 111 112 113 class ConnectionTable { 114 List nodes; 115 Connection[] connections; 116 117 ConnectionTable(List nodes) throws Exception { 118 this.nodes=nodes; 119 connections=new Connection[nodes.size()]; 120 } 121 122 123 void init() throws Exception { 124 int i=0; 125 126 for(Iterator it=nodes.iterator(); it.hasNext();) { 127 InetAddress addr=(InetAddress )it.next(); 128 if(connections[i] == null) { 129 try { 130 connections[i]=new Connection(addr); 131 } 132 catch(ConnectException connect_ex) { 133 System.err.println("Failed to connect to " + addr + ':' + srv_port); 134 throw connect_ex; 135 } 136 catch(Exception all_others) { 137 throw all_others; 138 } 139 System.out.println("-- connected to " +addr); 140 System.out.flush(); 141 } 142 i++; 143 } 144 } 145 146 void writeMessage(byte[] msg) throws Exception { 148 for(int i=0; i < connections.length; i++) { 149 Connection c=connections[i]; 150 if(c != null) 151 c.writeMessage(msg); 152 } 153 } 154 155 void close() { 156 for(int i=0; i < connections.length; i++) { 157 Connection c=connections[i]; 158 if(c != null) 159 c.close(); 160 } 161 } 162 163 public String toString() { 164 StringBuffer sb=new StringBuffer (); 165 for(Iterator it=nodes.iterator(); it.hasNext();) { 166 InetAddress inetAddress=(InetAddress )it.next(); 167 sb.append(inetAddress).append(' '); 168 } 169 return sb.toString(); 170 } 171 } 172 173 class Connection { 174 Socket sock; 175 DataOutputStream out; 176 177 Connection(InetAddress addr) throws Exception { 178 sock=new Socket (addr, srv_port); 179 out=new DataOutputStream(new BufferedOutputStream(sock.getOutputStream())); 180 } 181 182 void writeMessage(byte[] msg) throws Exception { 183 out.writeInt(msg.length); 184 out.write(msg, 0, msg.length); 185 out.flush(); 186 } 187 188 void close() { 189 try { 190 sock.close(); 191 } 192 catch(Exception ex) { 193 194 } 195 } 196 } 197 198 199 200 class ReceiverThread extends Thread { 201 Socket sock; 202 DataInputStream in; 203 204 ReceiverThread(Socket sock) throws Exception { 205 this.sock=sock; 206 sock.setSoTimeout(5000); 207 in=new DataInputStream(new BufferedInputStream(sock.getInputStream())); 208 } 209 210 public void run() { 211 while(sock != null) { 212 try { 213 int len=in.readInt(); 214 byte[] buf=new byte[len]; 215 in.readFully(buf, 0, len); 216 if(receiver != null) 217 receiver.receive(sock.getInetAddress(), buf); 218 } 219 catch(EOFException eof) { 220 break; 221 } 222 catch(Exception ex) { 223 if(sock == null) return; 224 ex.printStackTrace(); 225 } 226 } 227 } 228 229 void stopThread() { 230 try { 231 sock.close(); 232 sock=null; 233 } 234 catch(Exception ex) { 235 236 } 237 } 238 239 } 240 241 242 243 public List parseCommaDelimitedList(String s) throws Exception { 244 List retval=new ArrayList(); 245 StringTokenizer tok; 246 InetAddress host; 247 248 if(s == null) return null; 249 tok=new StringTokenizer(s, ","); 250 while(tok.hasMoreTokens()) { 251 host=InetAddress.getByName(tok.nextToken()); 252 retval.add(host); 253 } 254 return retval; 255 } 256 257 258 } 259 | Popular Tags |