1 package com.tc.net.proxy; 2 3 import com.tc.util.StringUtil; 4 5 import java.io.BufferedReader ; 6 import java.io.File ; 7 import java.io.FileOutputStream ; 8 import java.io.IOException ; 9 import java.io.InputStream ; 10 import java.io.InputStreamReader ; 11 import java.io.OutputStream ; 12 import java.net.InetAddress ; 13 import java.net.InetSocketAddress ; 14 import java.net.ServerSocket ; 15 import java.net.Socket ; 16 import java.net.SocketTimeoutException ; 17 import java.util.Date ; 18 import java.util.HashSet ; 19 import java.util.Set ; 20 21 24 25 29 public class TCPProxy { 30 31 private volatile boolean debug; 32 private long delay; 33 private final int listenPort; 34 private final InetSocketAddress [] endpoints; 35 private int roundRobinSequence; 36 private ServerSocket serverSocket; 37 private Thread acceptThread; 38 private volatile boolean stop; 39 private final Set connections = new HashSet (); 40 private final File logDir; 41 private final boolean logData; 42 43 public TCPProxy(int listenPort, InetAddress destHost, int destPort, long delay, boolean logData, File logDir) { 44 this(listenPort, new InetSocketAddress [] { new InetSocketAddress (destHost, destPort) }, delay, logData, logDir); 45 } 46 47 50 public TCPProxy(int listenPort, InetSocketAddress [] endpoints, long delay, boolean logData, File logDir) { 51 roundRobinSequence = 0; 52 debug = false; 53 stop = false; 54 this.listenPort = listenPort; 55 this.endpoints = endpoints; 56 this.logData = logData; 57 this.logDir = logDir; 58 setDelay(delay); 59 } 60 61 public synchronized void start() throws IOException { 62 stop(); 63 64 log("Starting listener on port " + listenPort + ", proxying to " + StringUtil.toString(endpoints, ", ", "[", "]") 65 + " with " + getDelay() + "ms delay"); 66 67 serverSocket = new ServerSocket (listenPort); 68 69 stop = false; 70 71 final TCPProxy ME = this; 72 acceptThread = new Thread (new Runnable () { 73 public void run() { 74 ME.run(); 75 } 76 }, "Accept thread (port " + listenPort + ")"); 77 acceptThread.start(); 78 } 79 80 public synchronized void stop() { 81 stop = true; 82 83 try { 84 if (serverSocket != null) { 85 serverSocket.close(); 86 } 87 } catch (Exception e) { 88 log("Error closing serverSocket", e); 89 } finally { 90 serverSocket = null; 91 } 92 93 try { 94 if (acceptThread != null) { 95 acceptThread.interrupt(); 96 97 try { 98 acceptThread.join(10000); 99 } catch (InterruptedException e) { 100 log("Interrupted while join()'ing acceptor thread", e); 101 } 102 } 103 } finally { 104 acceptThread = null; 105 } 106 107 closeAllConnections(); 108 } 109 110 synchronized void closeAllConnections() { 111 Connection conns[]; 112 synchronized (connections) { 113 conns = (Connection[]) connections.toArray(new Connection[] {}); 114 } 115 116 for (int i = 0; i < conns.length; i++) { 117 try { 118 conns[i].close(); 119 } catch (Exception e) { 120 log("Error closing connection " + conns[i].toString(), e); 121 } 122 } 123 } 124 125 public void toggleDebug() { 126 debug = !debug; 127 } 128 129 public synchronized long getDelay() { 130 return delay; 131 } 132 133 public synchronized void setDelay(long newDelay) { 134 if (newDelay < 0) { throw new IllegalArgumentException ("Delay must be greater than or equal to zero"); } 135 delay = newDelay; 136 } 137 138 void interrupt() { 139 Connection conns[]; 140 synchronized (connections) { 141 conns = (Connection[]) connections.toArray(new Connection[] {}); 142 } 143 144 for (int i = 0; i < conns.length; i++) { 145 conns[i].interrupt(); 146 } 147 } 148 149 private void run() { 150 while (!stop) { 151 final Socket socket; 152 try { 153 socket = serverSocket.accept(); 154 } catch (IOException ioe) { 155 continue; 156 } 157 158 if (Thread.interrupted()) { 159 continue; 160 } 161 162 if (socket != null) { 163 debug("Accepted connection from " + socket.toString()); 164 165 try { 166 new Connection(socket, this, logData, logDir); 167 } catch (IOException ioe) { 168 log("Error connecting to any of remote hosts " + StringUtil.toString(endpoints, ", ", "[", "]") + ", " 169 + ioe.getMessage()); 170 try { 171 socket.close(); 172 } catch (IOException clientIOE) { 173 log("Unable to close client socket after failing to proxy: " + clientIOE.getMessage()); 174 } 175 } 176 } 177 } 178 } 179 180 private synchronized int getAndIncrementRoundRobinSequence() { 181 return roundRobinSequence++; 182 } 183 184 void deregister(Connection connection) { 185 synchronized (connections) { 186 connections.remove(connection); 187 } 188 } 189 190 void register(Connection connection) { 191 synchronized (connections) { 192 connections.add(connection); 193 } 194 } 195 196 public void status() { 197 synchronized (System.err) { 198 System.err.println(); 199 System.err.println("Listening on port : " + listenPort); 200 System.err.println("Connection delay : " + getDelay() + "ms"); 201 System.err.println("Proxying to : " + StringUtil.toString(endpoints, ", ", "[", "]")); 202 System.err.println("Debug Logging : " + debug); 203 System.err.println("Active connections:"); 204 205 Connection conns[]; 206 synchronized (connections) { 207 conns = (Connection[]) connections.toArray(new Connection[] {}); 208 } 209 210 for (int i = 0; i < conns.length; i++) { 211 System.err.println("\t" + i + ": " + conns[i].toString()); 212 } 213 214 if (conns.length == 0) { 215 System.err.println("\tNONE"); 216 } 217 } 218 } 219 220 private static void help() { 221 synchronized (System.err) { 222 System.err.println(); 223 System.err.println("h - this help message"); 224 System.err.println("s - print proxy status"); 225 System.err.println("d <num> - adjust the delay time to <num> milliseconds"); 226 System.err.println("c - close all active connections"); 227 System.err.println("l - toggle debug logging"); 228 System.err.println("q - quit (shutdown proxy)"); 229 } 230 } 231 232 public static void main(String [] args) throws IOException , InterruptedException { 233 if ((args.length < 2) || (args.length > 3)) { 234 usage(); 235 System.exit(1); 236 } 237 238 final int listenPort = Integer.valueOf(args[0]).intValue(); 239 final String [] endpointStrings = args[1].split(","); 240 final InetSocketAddress [] endpoints = new InetSocketAddress [endpointStrings.length]; 241 for (int pos = 0; pos < endpointStrings.length; ++pos) { 242 final int separatorIdx = endpointStrings[pos].indexOf(":"); 243 endpoints[pos] = new InetSocketAddress (endpointStrings[pos].substring(0, separatorIdx), Integer 244 .parseInt(endpointStrings[pos].substring(separatorIdx + 1))); 245 } 246 247 long delay = 0; 248 if (args.length == 3) { 249 delay = (Long.valueOf(args[2]).longValue()); 250 } 251 252 final boolean daemonMode = Boolean.getBoolean("daemon"); 254 255 final TCPProxy theProxy = new TCPProxy(listenPort, endpoints, delay, false, null); 256 theProxy.start(); 257 258 if (daemonMode) { 259 final Object o = new Object (); 260 synchronized (o) { 261 o.wait(); 262 } 263 } else { 264 try { 265 BufferedReader stdin = new BufferedReader (new InputStreamReader (System.in)); 266 String line = ""; 267 prompt(); 268 while ((line = stdin.readLine()) != null) { 269 line = line.trim(); 270 271 if (line.toLowerCase().startsWith("q")) { 272 break; 273 } 274 275 try { 276 if (line.toLowerCase().startsWith("h")) { 277 help(); 278 continue; 279 } 280 281 if (line.toLowerCase().startsWith("s")) { 282 theProxy.status(); 283 continue; 284 } 285 286 if (line.toLowerCase().startsWith("c")) { 287 theProxy.closeAllConnections(); 288 out("all connections closed"); 289 continue; 290 } 291 292 if (line.toLowerCase().startsWith("l")) { 293 theProxy.toggleDebug(); 294 out("debug logging toggled"); 295 continue; 296 } 297 298 if (line.toLowerCase().startsWith("d")) { 299 if (line.length() <= 2) { 300 out("you must supply a delay value"); 301 continue; 302 } 303 304 try { 305 theProxy.setDelay(Long.valueOf(line.substring(2)).longValue()); 306 theProxy.interrupt(); 307 } catch (Exception e) { 308 out(e); 309 } 310 continue; 311 } 312 } catch (Exception e) { 313 out(e); 314 } finally { 315 prompt(); 316 } 317 } 318 } finally { 319 theProxy.stop(); 320 } 321 } 322 } 323 324 private static class Connection { 325 private final Socket client; 326 private final Socket proxy; 327 private final TCPProxy parent; 328 private final Thread clientThread; 329 private final Thread proxyThread; 330 private final Object closeLock = new Object (); 331 private volatile boolean stopConn = false; 332 private final long connectTime; 333 private long lastActivity; 334 private long clientBytesIn = 0; 335 private long proxyBytesIn = 0; 336 private final OutputStream clientLog; 337 private final OutputStream proxyLog; 338 339 Connection(Socket client, TCPProxy parent, boolean logData, File logDir) throws IOException { 340 this.parent = parent; 341 this.client = client; 342 this.connectTime = System.currentTimeMillis(); 343 this.lastActivity = this.connectTime; 344 345 IOException lastConnectException = null; 350 Socket connectedSocket = null; 351 final int roundRobinSequence = parent.getAndIncrementRoundRobinSequence(); 352 for (int pos = 0; connectedSocket == null && pos < parent.endpoints.length; ++pos) { 353 final int roundRobinOffset = (pos + roundRobinSequence) % parent.endpoints.length; 354 try { 355 connectedSocket = new Socket (parent.endpoints[roundRobinOffset].getAddress(), 356 parent.endpoints[roundRobinOffset].getPort()); 357 break; 358 } catch (IOException ioe) { 359 lastConnectException = ioe; 360 } 361 } 362 if (connectedSocket == null) { 363 final IOException ioe = lastConnectException != null ? lastConnectException 364 : new IOException ("Unable to establish a proxy connection to a back end server: " 365 + StringUtil.toString(parent.endpoints, ",", "[", "]")); 366 throw ioe; 367 } else { 368 proxy = connectedSocket; 369 } 370 371 if (logData) { 372 final String log = client.getLocalAddress().getHostName().toString() + "." + client.getPort(); 373 clientLog = new FileOutputStream (new File (logDir, log + ".in"), false); 374 proxyLog = new FileOutputStream (new File (logDir, log + ".out"), false); 375 } else { 376 clientLog = null; 377 proxyLog = null; 378 } 379 380 proxy.setSoTimeout(100); 381 client.setSoTimeout(100); 382 383 final InputStream clientIs = client.getInputStream(); 384 final OutputStream clientOs = client.getOutputStream(); 385 final InputStream proxyIs = proxy.getInputStream(); 386 final OutputStream proxyOs = proxy.getOutputStream(); 387 388 parent.register(this); 389 390 clientThread = new Thread (new Runnable () { 391 public void run() { 392 runHalf(clientIs, proxyOs, true, clientLog); 393 } 394 }, "Client thread for connection " + client + " proxy to " + proxy); 395 396 proxyThread = new Thread (new Runnable () { 397 public void run() { 398 runHalf(proxyIs, clientOs, false, proxyLog); 399 } 400 }, "Proxy thread for connection " + client + " proxy to " + proxy); 401 402 clientThread.start(); 403 proxyThread.start(); 404 } 405 406 private synchronized void activity() { 407 lastActivity = System.currentTimeMillis(); 408 } 409 410 private synchronized long getLastActivity() { 411 return lastActivity; 412 } 413 414 private synchronized void addProxyBytesIn(long bytesIn) { 415 this.proxyBytesIn += bytesIn; 416 } 417 418 private synchronized void addClientBytesIn(long bytesIn) { 419 this.clientBytesIn += bytesIn; 420 } 421 422 private synchronized long getProxyBytesIn() { 423 return this.proxyBytesIn; 424 } 425 426 private synchronized long getClientBytesIn() { 427 return this.clientBytesIn; 428 } 429 430 public String toString() { 431 return "Client: " + client + ", proxy to: " + proxy + ", connect: " + new Date (connectTime) + ", idle: " 432 + (System.currentTimeMillis() - getLastActivity()) + ", bytes from client: " + getClientBytesIn() 433 + ", bytes from endpoint: " + getProxyBytesIn(); 434 } 435 436 private void delay() { 437 final long sleep = parent.getDelay(); 438 439 if (sleep > 0) { 440 try { 441 Thread.sleep(sleep); 442 } catch (InterruptedException e) { 443 } 445 } 446 } 447 448 private void runHalf(InputStream src, OutputStream dest, boolean isClientHalf, OutputStream log) { 449 byte buffer[] = new byte[4096]; 450 451 while (!stopConn) { 452 int bytesRead = 0; 453 try { 454 bytesRead = src.read(buffer); 455 } catch (SocketTimeoutException ste) { 456 bytesRead = ste.bytesTransferred; 457 } catch (IOException ioe) { 458 } finally { 460 if (bytesRead > 0) { 461 try { 462 if (log != null) { 463 log.write(buffer, 0, bytesRead); 464 log.flush(); 465 } 466 } catch (IOException e) { 467 throw new RuntimeException (e); 468 } 469 parent.debug("read " + bytesRead + " on " + (isClientHalf ? "client" : "proxy") + " connection"); 470 if (isClientHalf) addClientBytesIn(bytesRead); 471 else addProxyBytesIn(bytesRead); 472 } 473 } 474 475 if (bytesRead < 0) { 476 close(); 478 return; 479 } 480 481 if (bytesRead > 0) { 482 activity(); 483 delay(); 484 485 try { 486 dest.write(buffer, 0, bytesRead); 487 } catch (IOException ioe) { 488 close(); 489 return; 490 } 491 } 492 } 493 } 494 495 void interrupt() { 496 try { 497 clientThread.interrupt(); 498 } finally { 499 proxyThread.interrupt(); 500 } 501 } 502 503 void close() { 504 synchronized (closeLock) { 505 if (stopConn) return; 506 stopConn = true; 507 } 508 509 try { 510 try { 511 if (client != null) client.close(); 512 } catch (IOException e) { 513 } 515 516 try { 517 if (proxy != null) proxy.close(); 518 } catch (IOException e) { 519 } 521 522 clientThread.interrupt(); 523 proxyThread.interrupt(); 524 525 try { 526 clientThread.join(1000); 527 } catch (InterruptedException ie) { 528 } 530 try { 531 proxyThread.join(1000); 532 } catch (InterruptedException ie) { 533 } 535 } finally { 536 parent.deregister(this); 537 538 try { 539 if (clientLog != null) { 540 clientLog.close(); 541 } 542 } catch (Exception e) { 543 e.printStackTrace(); 544 } 545 546 try { 547 if (proxyLog != null) { 548 proxyLog.close(); 549 } 550 } catch (Exception e) { 551 e.printStackTrace(); 552 } 553 } 554 } 555 } 556 557 private static void prompt() { 558 synchronized (System.err) { 559 System.err.print("\nproxy> "); 560 System.err.flush(); 561 } 562 } 563 564 private static void out(String message) { 565 synchronized (System.err) { 566 System.err.println(message); 567 } 568 } 569 570 private static void out(Throwable t) { 571 if (t == null) return; 572 synchronized (System.err) { 573 t.printStackTrace(System.err); 574 } 575 } 576 577 private static void log(String message) { 578 log(message, null); 579 } 580 581 private static void log(String message, Throwable t) { 582 synchronized (System.err) { 583 System.err.println(new Date () + ": " + message); 584 if (t != null) { 585 t.printStackTrace(System.err); 586 } 587 } 588 } 589 590 private void debug(String message) { 591 debug(message, null); 592 } 593 594 private void debug(String message, Throwable t) { 595 if (debug) log(message, t); 596 } 597 598 private static void usage() { 599 System.err.println("usage: TCPProxy <listen port> <endpoint[,endpoint...]> [delay]"); 600 System.err.println(" <listen port> - The port the proxy should listen on"); 601 System.err 602 .println(" <endpoint> - Comma separated list of 1 or more <host>:<port> pairs to round robin requests to"); 603 System.err.println(" [delay] - Millisecond delay between network data (optional, default: 0)"); 604 } 605 606 } | Popular Tags |