KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > tc > net > proxy > TCPProxy


1 package com.tc.net.proxy;
2
3 import com.tc.util.StringUtil;
4
5 import java.io.BufferedReader JavaDoc;
6 import java.io.File JavaDoc;
7 import java.io.FileOutputStream JavaDoc;
8 import java.io.IOException JavaDoc;
9 import java.io.InputStream JavaDoc;
10 import java.io.InputStreamReader JavaDoc;
11 import java.io.OutputStream JavaDoc;
12 import java.net.InetAddress JavaDoc;
13 import java.net.InetSocketAddress JavaDoc;
14 import java.net.ServerSocket JavaDoc;
15 import java.net.Socket JavaDoc;
16 import java.net.SocketTimeoutException JavaDoc;
17 import java.util.Date JavaDoc;
18 import java.util.HashSet JavaDoc;
19 import java.util.Set JavaDoc;
20
/*
 * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright notice. All rights reserved.
 */

24
25 /**
26  * A simple TCP proxy (with round robin load balancing support) to simulate network delays and help debug network
27  * streams.
28  */

29 public class TCPProxy {
30
31   private volatile boolean debug;
32   private long delay;
33   private final int listenPort;
34   private final InetSocketAddress JavaDoc[] endpoints;
35   private int roundRobinSequence;
36   private ServerSocket JavaDoc serverSocket;
37   private Thread JavaDoc acceptThread;
38   private volatile boolean stop;
39   private final Set JavaDoc connections = new HashSet JavaDoc();
40   private final File JavaDoc logDir;
41   private final boolean logData;
42
43   public TCPProxy(int listenPort, InetAddress JavaDoc destHost, int destPort, long delay, boolean logData, File JavaDoc logDir) {
44     this(listenPort, new InetSocketAddress JavaDoc[] { new InetSocketAddress JavaDoc(destHost, destPort) }, delay, logData, logDir);
45   }
46
47   /**
48    * If multiple endpoints are used, then the proxy will round robin between them.
49    */

50   public TCPProxy(int listenPort, InetSocketAddress JavaDoc[] endpoints, long delay, boolean logData, File JavaDoc logDir) {
51     roundRobinSequence = 0;
52     debug = false;
53     stop = false;
54     this.listenPort = listenPort;
55     this.endpoints = endpoints;
56     this.logData = logData;
57     this.logDir = logDir;
58     setDelay(delay);
59   }
60
61   public synchronized void start() throws IOException JavaDoc {
62     stop();
63
64     log("Starting listener on port " + listenPort + ", proxying to " + StringUtil.toString(endpoints, ", ", "[", "]")
65         + " with " + getDelay() + "ms delay");
66
67     serverSocket = new ServerSocket JavaDoc(listenPort);
68
69     stop = false;
70
71     final TCPProxy ME = this;
72     acceptThread = new Thread JavaDoc(new Runnable JavaDoc() {
73       public void run() {
74         ME.run();
75       }
76     }, "Accept thread (port " + listenPort + ")");
77     acceptThread.start();
78   }
79
80   public synchronized void stop() {
81     stop = true;
82
83     try {
84       if (serverSocket != null) {
85         serverSocket.close();
86       }
87     } catch (Exception JavaDoc e) {
88       log("Error closing serverSocket", e);
89     } finally {
90       serverSocket = null;
91     }
92
93     try {
94       if (acceptThread != null) {
95         acceptThread.interrupt();
96
97         try {
98           acceptThread.join(10000);
99         } catch (InterruptedException JavaDoc e) {
100           log("Interrupted while join()'ing acceptor thread", e);
101         }
102       }
103     } finally {
104       acceptThread = null;
105     }
106
107     closeAllConnections();
108   }
109
110   synchronized void closeAllConnections() {
111     Connection conns[];
112     synchronized (connections) {
113       conns = (Connection[]) connections.toArray(new Connection[] {});
114     }
115
116     for (int i = 0; i < conns.length; i++) {
117       try {
118         conns[i].close();
119       } catch (Exception JavaDoc e) {
120         log("Error closing connection " + conns[i].toString(), e);
121       }
122     }
123   }
124
125   public void toggleDebug() {
126     debug = !debug;
127   }
128
129   public synchronized long getDelay() {
130     return delay;
131   }
132
133   public synchronized void setDelay(long newDelay) {
134     if (newDelay < 0) { throw new IllegalArgumentException JavaDoc("Delay must be greater than or equal to zero"); }
135     delay = newDelay;
136   }
137
138   void interrupt() {
139     Connection conns[];
140     synchronized (connections) {
141       conns = (Connection[]) connections.toArray(new Connection[] {});
142     }
143
144     for (int i = 0; i < conns.length; i++) {
145       conns[i].interrupt();
146     }
147   }
148
149   private void run() {
150     while (!stop) {
151       final Socket JavaDoc socket;
152       try {
153         socket = serverSocket.accept();
154       } catch (IOException JavaDoc ioe) {
155         continue;
156       }
157
158       if (Thread.interrupted()) {
159         continue;
160       }
161
162       if (socket != null) {
163         debug("Accepted connection from " + socket.toString());
164
165         try {
166           new Connection(socket, this, logData, logDir);
167         } catch (IOException JavaDoc ioe) {
168           log("Error connecting to any of remote hosts " + StringUtil.toString(endpoints, ", ", "[", "]") + ", "
169               + ioe.getMessage());
170           try {
171             socket.close();
172           } catch (IOException JavaDoc clientIOE) {
173             log("Unable to close client socket after failing to proxy: " + clientIOE.getMessage());
174           }
175         }
176       }
177     }
178   }
179
180   private synchronized int getAndIncrementRoundRobinSequence() {
181     return roundRobinSequence++;
182   }
183
184   void deregister(Connection connection) {
185     synchronized (connections) {
186       connections.remove(connection);
187     }
188   }
189
190   void register(Connection connection) {
191     synchronized (connections) {
192       connections.add(connection);
193     }
194   }
195
196   public void status() {
197     synchronized (System.err) {
198       System.err.println();
199       System.err.println("Listening on port : " + listenPort);
200       System.err.println("Connection delay : " + getDelay() + "ms");
201       System.err.println("Proxying to : " + StringUtil.toString(endpoints, ", ", "[", "]"));
202       System.err.println("Debug Logging : " + debug);
203       System.err.println("Active connections:");
204
205       Connection conns[];
206       synchronized (connections) {
207         conns = (Connection[]) connections.toArray(new Connection[] {});
208       }
209
210       for (int i = 0; i < conns.length; i++) {
211         System.err.println("\t" + i + ": " + conns[i].toString());
212       }
213
214       if (conns.length == 0) {
215         System.err.println("\tNONE");
216       }
217     }
218   }
219
220   private static void help() {
221     synchronized (System.err) {
222       System.err.println();
223       System.err.println("h - this help message");
224       System.err.println("s - print proxy status");
225       System.err.println("d <num> - adjust the delay time to <num> milliseconds");
226       System.err.println("c - close all active connections");
227       System.err.println("l - toggle debug logging");
228       System.err.println("q - quit (shutdown proxy)");
229     }
230   }
231
232   public static void main(String JavaDoc[] args) throws IOException JavaDoc, InterruptedException JavaDoc {
233     if ((args.length < 2) || (args.length > 3)) {
234       usage();
235       System.exit(1);
236     }
237
238     final int listenPort = Integer.valueOf(args[0]).intValue();
239     final String JavaDoc[] endpointStrings = args[1].split(",");
240     final InetSocketAddress JavaDoc[] endpoints = new InetSocketAddress JavaDoc[endpointStrings.length];
241     for (int pos = 0; pos < endpointStrings.length; ++pos) {
242       final int separatorIdx = endpointStrings[pos].indexOf(":");
243       endpoints[pos] = new InetSocketAddress JavaDoc(endpointStrings[pos].substring(0, separatorIdx), Integer
244           .parseInt(endpointStrings[pos].substring(separatorIdx + 1)));
245     }
246
247     long delay = 0;
248     if (args.length == 3) {
249       delay = (Long.valueOf(args[2]).longValue());
250     }
251
252     // If this is set to true then we are in non-interactive mode and don't print a prompt
253
final boolean daemonMode = Boolean.getBoolean("daemon");
254
255     final TCPProxy theProxy = new TCPProxy(listenPort, endpoints, delay, false, null);
256     theProxy.start();
257
258     if (daemonMode) {
259       final Object JavaDoc o = new Object JavaDoc();
260       synchronized (o) {
261         o.wait();
262       }
263     } else {
264       try {
265         BufferedReader JavaDoc stdin = new BufferedReader JavaDoc(new InputStreamReader JavaDoc(System.in));
266         String JavaDoc line = "";
267         prompt();
268         while ((line = stdin.readLine()) != null) {
269           line = line.trim();
270
271           if (line.toLowerCase().startsWith("q")) {
272             break;
273           }
274
275           try {
276             if (line.toLowerCase().startsWith("h")) {
277               help();
278               continue;
279             }
280
281             if (line.toLowerCase().startsWith("s")) {
282               theProxy.status();
283               continue;
284             }
285
286             if (line.toLowerCase().startsWith("c")) {
287               theProxy.closeAllConnections();
288               out("all connections closed");
289               continue;
290             }
291
292             if (line.toLowerCase().startsWith("l")) {
293               theProxy.toggleDebug();
294               out("debug logging toggled");
295               continue;
296             }
297
298             if (line.toLowerCase().startsWith("d")) {
299               if (line.length() <= 2) {
300                 out("you must supply a delay value");
301                 continue;
302               }
303
304               try {
305                 theProxy.setDelay(Long.valueOf(line.substring(2)).longValue());
306                 theProxy.interrupt();
307               } catch (Exception JavaDoc e) {
308                 out(e);
309               }
310               continue;
311             }
312           } catch (Exception JavaDoc e) {
313             out(e);
314           } finally {
315             prompt();
316           }
317         }
318       } finally {
319         theProxy.stop();
320       }
321     }
322   }
323
324   private static class Connection {
325     private final Socket JavaDoc client;
326     private final Socket JavaDoc proxy;
327     private final TCPProxy parent;
328     private final Thread JavaDoc clientThread;
329     private final Thread JavaDoc proxyThread;
330     private final Object JavaDoc closeLock = new Object JavaDoc();
331     private volatile boolean stopConn = false;
332     private final long connectTime;
333     private long lastActivity;
334     private long clientBytesIn = 0;
335     private long proxyBytesIn = 0;
336     private final OutputStream JavaDoc clientLog;
337     private final OutputStream JavaDoc proxyLog;
338
339     Connection(Socket JavaDoc client, TCPProxy parent, boolean logData, File JavaDoc logDir) throws IOException JavaDoc {
340       this.parent = parent;
341       this.client = client;
342       this.connectTime = System.currentTimeMillis();
343       this.lastActivity = this.connectTime;
344
345       // Round robin and try connecting to the next available backend server; this is done by adding an ever increasing
346
// sequence number to the offset into the endpoint array (and then mod'ing it so you don't index past the array);
347
// this will ensure that you loop through the array in order and start over at the beginning once you reach the
348
// end
349
IOException JavaDoc lastConnectException = null;
350       Socket JavaDoc connectedSocket = null;
351       final int roundRobinSequence = parent.getAndIncrementRoundRobinSequence();
352       for (int pos = 0; connectedSocket == null && pos < parent.endpoints.length; ++pos) {
353         final int roundRobinOffset = (pos + roundRobinSequence) % parent.endpoints.length;
354         try {
355           connectedSocket = new Socket JavaDoc(parent.endpoints[roundRobinOffset].getAddress(),
356                                        parent.endpoints[roundRobinOffset].getPort());
357           break;
358         } catch (IOException JavaDoc ioe) {
359           lastConnectException = ioe;
360         }
361       }
362       if (connectedSocket == null) {
363         final IOException JavaDoc ioe = lastConnectException != null ? lastConnectException
364             : new IOException JavaDoc("Unable to establish a proxy connection to a back end server: "
365                               + StringUtil.toString(parent.endpoints, ",", "[", "]"));
366         throw ioe;
367       } else {
368         proxy = connectedSocket;
369       }
370
371       if (logData) {
372         final String JavaDoc log = client.getLocalAddress().getHostName().toString() + "." + client.getPort();
373         clientLog = new FileOutputStream JavaDoc(new File JavaDoc(logDir, log + ".in"), false);
374         proxyLog = new FileOutputStream JavaDoc(new File JavaDoc(logDir, log + ".out"), false);
375       } else {
376         clientLog = null;
377         proxyLog = null;
378       }
379
380       proxy.setSoTimeout(100);
381       client.setSoTimeout(100);
382
383       final InputStream JavaDoc clientIs = client.getInputStream();
384       final OutputStream JavaDoc clientOs = client.getOutputStream();
385       final InputStream JavaDoc proxyIs = proxy.getInputStream();
386       final OutputStream JavaDoc proxyOs = proxy.getOutputStream();
387
388       parent.register(this);
389
390       clientThread = new Thread JavaDoc(new Runnable JavaDoc() {
391         public void run() {
392           runHalf(clientIs, proxyOs, true, clientLog);
393         }
394       }, "Client thread for connection " + client + " proxy to " + proxy);
395
396       proxyThread = new Thread JavaDoc(new Runnable JavaDoc() {
397         public void run() {
398           runHalf(proxyIs, clientOs, false, proxyLog);
399         }
400       }, "Proxy thread for connection " + client + " proxy to " + proxy);
401
402       clientThread.start();
403       proxyThread.start();
404     }
405
406     private synchronized void activity() {
407       lastActivity = System.currentTimeMillis();
408     }
409
410     private synchronized long getLastActivity() {
411       return lastActivity;
412     }
413
414     private synchronized void addProxyBytesIn(long bytesIn) {
415       this.proxyBytesIn += bytesIn;
416     }
417
418     private synchronized void addClientBytesIn(long bytesIn) {
419       this.clientBytesIn += bytesIn;
420     }
421
422     private synchronized long getProxyBytesIn() {
423       return this.proxyBytesIn;
424     }
425
426     private synchronized long getClientBytesIn() {
427       return this.clientBytesIn;
428     }
429
430     public String JavaDoc toString() {
431       return "Client: " + client + ", proxy to: " + proxy + ", connect: " + new Date JavaDoc(connectTime) + ", idle: "
432              + (System.currentTimeMillis() - getLastActivity()) + ", bytes from client: " + getClientBytesIn()
433              + ", bytes from endpoint: " + getProxyBytesIn();
434     }
435
436     private void delay() {
437       final long sleep = parent.getDelay();
438
439       if (sleep > 0) {
440         try {
441           Thread.sleep(sleep);
442         } catch (InterruptedException JavaDoc e) {
443           // ignore
444
}
445       }
446     }
447
448     private void runHalf(InputStream JavaDoc src, OutputStream JavaDoc dest, boolean isClientHalf, OutputStream JavaDoc log) {
449       byte buffer[] = new byte[4096];
450
451       while (!stopConn) {
452         int bytesRead = 0;
453         try {
454           bytesRead = src.read(buffer);
455         } catch (SocketTimeoutException JavaDoc ste) {
456           bytesRead = ste.bytesTransferred;
457         } catch (IOException JavaDoc ioe) {
458           // ignore
459
} finally {
460           if (bytesRead > 0) {
461             try {
462               if (log != null) {
463                 log.write(buffer, 0, bytesRead);
464                 log.flush();
465               }
466             } catch (IOException JavaDoc e) {
467               throw new RuntimeException JavaDoc(e);
468             }
469             parent.debug("read " + bytesRead + " on " + (isClientHalf ? "client" : "proxy") + " connection");
470             if (isClientHalf) addClientBytesIn(bytesRead);
471             else addProxyBytesIn(bytesRead);
472           }
473         }
474
475         if (bytesRead < 0) {
476           // delay();
477
close();
478           return;
479         }
480
481         if (bytesRead > 0) {
482           activity();
483           delay();
484
485           try {
486             dest.write(buffer, 0, bytesRead);
487           } catch (IOException JavaDoc ioe) {
488             close();
489             return;
490           }
491         }
492       }
493     }
494
495     void interrupt() {
496       try {
497         clientThread.interrupt();
498       } finally {
499         proxyThread.interrupt();
500       }
501     }
502
503     void close() {
504       synchronized (closeLock) {
505         if (stopConn) return;
506         stopConn = true;
507       }
508
509       try {
510         try {
511           if (client != null) client.close();
512         } catch (IOException JavaDoc e) {
513           // ignore
514
}
515
516         try {
517           if (proxy != null) proxy.close();
518         } catch (IOException JavaDoc e) {
519           // ignore
520
}
521
522         clientThread.interrupt();
523         proxyThread.interrupt();
524
525         try {
526           clientThread.join(1000);
527         } catch (InterruptedException JavaDoc ie) {
528           // ignore
529
}
530         try {
531           proxyThread.join(1000);
532         } catch (InterruptedException JavaDoc ie) {
533           // ignore
534
}
535       } finally {
536         parent.deregister(this);
537
538         try {
539           if (clientLog != null) {
540             clientLog.close();
541           }
542         } catch (Exception JavaDoc e) {
543           e.printStackTrace();
544         }
545
546         try {
547           if (proxyLog != null) {
548             proxyLog.close();
549           }
550         } catch (Exception JavaDoc e) {
551           e.printStackTrace();
552         }
553       }
554     }
555   }
556
557   private static void prompt() {
558     synchronized (System.err) {
559       System.err.print("\nproxy> ");
560       System.err.flush();
561     }
562   }
563
564   private static void out(String JavaDoc message) {
565     synchronized (System.err) {
566       System.err.println(message);
567     }
568   }
569
570   private static void out(Throwable JavaDoc t) {
571     if (t == null) return;
572     synchronized (System.err) {
573       t.printStackTrace(System.err);
574     }
575   }
576
577   private static void log(String JavaDoc message) {
578     log(message, null);
579   }
580
581   private static void log(String JavaDoc message, Throwable JavaDoc t) {
582     synchronized (System.err) {
583       System.err.println(new Date JavaDoc() + ": " + message);
584       if (t != null) {
585         t.printStackTrace(System.err);
586       }
587     }
588   }
589
590   private void debug(String JavaDoc message) {
591     debug(message, null);
592   }
593
594   private void debug(String JavaDoc message, Throwable JavaDoc t) {
595     if (debug) log(message, t);
596   }
597
598   private static void usage() {
599     System.err.println("usage: TCPProxy <listen port> <endpoint[,endpoint...]> [delay]");
600     System.err.println(" <listen port> - The port the proxy should listen on");
601     System.err
602         .println(" <endpoint> - Comma separated list of 1 or more <host>:<port> pairs to round robin requests to");
603     System.err.println(" [delay] - Millisecond delay between network data (optional, default: 0)");
604   }
605
606 }
Popular Tags