KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > tests > perf > transports > TcpTransport


1 package org.jgroups.tests.perf.transports;
2
3 import org.jgroups.stack.IpAddress;
4 import org.jgroups.tests.perf.Receiver;
5 import org.jgroups.tests.perf.Transport;
6
7 import java.io.*;
8 import java.net.ConnectException JavaDoc;
9 import java.net.InetAddress JavaDoc;
10 import java.net.ServerSocket JavaDoc;
11 import java.net.Socket JavaDoc;
12 import java.util.*;
13
14 /**
15  * @author Bela Ban Jan 22
16  * @author 2004
17  * @version $Id: TcpTransport.java,v 1.5 2004/10/04 20:43:39 belaban Exp $
18  */

19 public class TcpTransport implements Transport {
20     Receiver receiver=null;
21     Properties config=null;
22     int max_receiver_buffer_size=500000;
23     int max_send_buffer_size=500000;
24     List nodes=new ArrayList();
25     ConnectionTable ct;
26     int srv_port=7777;
27     ServerSocket JavaDoc srv_sock=null;
28     InetAddress JavaDoc bind_addr=null;
29     IpAddress local_addr=null;
30     List receivers=new ArrayList();
31
32
33     public TcpTransport() {
34     }
35
36     public Object JavaDoc getLocalAddress() {
37         return local_addr;
38     }
39
40     public void create(Properties properties) throws Exception JavaDoc {
41         this.config=properties;
42 // System.out.println("-- local_addr is " + local_addr);
43

44         String JavaDoc tmp;
45         if((tmp=config.getProperty("srv_port")) != null)
46             srv_port=Integer.parseInt(tmp);
47
48         String JavaDoc bind_addr_str=System.getProperty("udp.bind_addr", config.getProperty("bind_addr"));
49         if(bind_addr_str != null) {
50             bind_addr=InetAddress.getByName(bind_addr_str);
51         }
52         else
53             bind_addr=InetAddress.getLocalHost();
54
55         String JavaDoc cluster_def=config.getProperty("cluster");
56         if(cluster_def == null)
57             throw new Exception JavaDoc("TcpTransport.create(): property 'cluster' is not defined");
58         nodes=parseCommaDelimitedList(cluster_def);
59         ct=new ConnectionTable(nodes);
60     }
61
62
63     public void start() throws Exception JavaDoc {
64         srv_sock=new ServerSocket JavaDoc(srv_port, 50, bind_addr);
65         local_addr=new IpAddress(srv_sock.getInetAddress(), srv_sock.getLocalPort());
66         ct.init();
67
68         // accept connections and start 1 Receiver per connection
69
Thread JavaDoc acceptor=new Thread JavaDoc() {
70             public void run() {
71                 while(true) {
72                     try {
73                         Socket JavaDoc s=srv_sock.accept();
74                         ReceiverThread r=new ReceiverThread(s);
75                         r.setDaemon(true);
76                         receivers.add(r);
77                         r.start();
78                     }
79                     catch(Exception JavaDoc ex) {
80                         ex.printStackTrace();
81                         break;
82                     }
83                 }
84             }
85         };
86         acceptor.setDaemon(true);
87         acceptor.start();
88     }
89
90     public void stop() {
91         ct.close();
92         for(Iterator it=receivers.iterator(); it.hasNext();) {
93             ReceiverThread thread=(ReceiverThread)it.next();
94             thread.stopThread();
95         }
96     }
97
98     public void destroy() {
99         ;
100     }
101
102     public void setReceiver(Receiver r) {
103         this.receiver=r;
104     }
105
106     public void send(Object JavaDoc destination, byte[] payload) throws Exception JavaDoc {
107         if(destination != null)
108             throw new Exception JavaDoc("TcpTransport.send(): unicasts not supported");
109         ct.writeMessage(payload);
110     }
111
112
113     class ConnectionTable {
114          List nodes;
115          Connection[] connections;
116
117          ConnectionTable(List nodes) throws Exception JavaDoc {
118              this.nodes=nodes;
119              connections=new Connection[nodes.size()];
120          }
121
122
123          void init() throws Exception JavaDoc {
124              int i=0;
125
126              for(Iterator it=nodes.iterator(); it.hasNext();) {
127                  InetAddress JavaDoc addr=(InetAddress JavaDoc)it.next();
128                  if(connections[i] == null) {
129                      try {
130                          connections[i]=new Connection(addr);
131                      }
132                      catch(ConnectException JavaDoc connect_ex) {
133                          System.err.println("Failed to connect to " + addr + ':' + srv_port);
134                          throw connect_ex;
135                      }
136                      catch(Exception JavaDoc all_others) {
137                          throw all_others;
138                      }
139                      System.out.println("-- connected to " +addr);
140                      System.out.flush();
141                  }
142                  i++;
143              }
144          }
145
146          // todo: parallelize
147
void writeMessage(byte[] msg) throws Exception JavaDoc {
148              for(int i=0; i < connections.length; i++) {
149                  Connection c=connections[i];
150                  if(c != null)
151                      c.writeMessage(msg);
152              }
153          }
154
155          void close() {
156              for(int i=0; i < connections.length; i++) {
157                  Connection c=connections[i];
158                  if(c != null)
159                      c.close();
160              }
161          }
162
163          public String JavaDoc toString() {
164              StringBuffer JavaDoc sb=new StringBuffer JavaDoc();
165              for(Iterator it=nodes.iterator(); it.hasNext();) {
166                  InetAddress JavaDoc inetAddress=(InetAddress JavaDoc)it.next();
167                  sb.append(inetAddress).append(' ');
168              }
169              return sb.toString();
170          }
171      }
172
173      class Connection {
174          Socket JavaDoc sock;
175          DataOutputStream out;
176
177          Connection(InetAddress JavaDoc addr) throws Exception JavaDoc {
178              sock=new Socket JavaDoc(addr, srv_port);
179              out=new DataOutputStream(new BufferedOutputStream(sock.getOutputStream()));
180          }
181
182          void writeMessage(byte[] msg) throws Exception JavaDoc {
183              out.writeInt(msg.length);
184              out.write(msg, 0, msg.length);
185              out.flush();
186          }
187
188          void close() {
189              try {
190                  sock.close();
191              }
192              catch(Exception JavaDoc ex) {
193
194              }
195          }
196      }
197
198
199
200     class ReceiverThread extends Thread JavaDoc {
201         Socket JavaDoc sock;
202         DataInputStream in;
203
204         ReceiverThread(Socket JavaDoc sock) throws Exception JavaDoc {
205             this.sock=sock;
206             sock.setSoTimeout(5000);
207             in=new DataInputStream(new BufferedInputStream(sock.getInputStream()));
208         }
209
210         public void run() {
211             while(sock != null) {
212                 try {
213                     int len=in.readInt();
214                     byte[] buf=new byte[len];
215                     in.readFully(buf, 0, len);
216                     if(receiver != null)
217                         receiver.receive(sock.getInetAddress(), buf);
218                 }
219                 catch(EOFException eof) {
220                     break;
221                 }
222                 catch(Exception JavaDoc ex) {
223                     if(sock == null) return;
224                     ex.printStackTrace();
225                 }
226             }
227         }
228
229         void stopThread() {
230             try {
231                 sock.close();
232                 sock=null;
233             }
234             catch(Exception JavaDoc ex) {
235
236             }
237         }
238
239     }
240
241
242
243     public List parseCommaDelimitedList(String JavaDoc s) throws Exception JavaDoc {
244         List retval=new ArrayList();
245         StringTokenizer tok;
246         InetAddress JavaDoc host;
247
248         if(s == null) return null;
249         tok=new StringTokenizer(s, ",");
250         while(tok.hasMoreTokens()) {
251             host=InetAddress.getByName(tok.nextToken());
252             retval.add(host);
253         }
254         return retval;
255     }
256
257
258 }
259
Popular Tags