1 2 package ch.ethz.ssh2.transport; 3 4 import java.io.IOException ; 5 import java.io.InputStream ; 6 import java.io.OutputStream ; 7 import java.net.InetAddress ; 8 import java.net.InetSocketAddress ; 9 import java.net.Socket ; 10 import java.net.UnknownHostException ; 11 import java.security.SecureRandom ; 12 import java.util.Vector ; 13 14 import ch.ethz.ssh2.ConnectionInfo; 15 import ch.ethz.ssh2.ConnectionMonitor; 16 import ch.ethz.ssh2.DHGexParameters; 17 import ch.ethz.ssh2.HTTPProxyData; 18 import ch.ethz.ssh2.HTTPProxyException; 19 import ch.ethz.ssh2.ProxyData; 20 import ch.ethz.ssh2.ServerHostKeyVerifier; 21 import ch.ethz.ssh2.crypto.Base64; 22 import ch.ethz.ssh2.crypto.CryptoWishList; 23 import ch.ethz.ssh2.crypto.cipher.BlockCipher; 24 import ch.ethz.ssh2.crypto.digest.MAC; 25 import ch.ethz.ssh2.log.Logger; 26 import ch.ethz.ssh2.packets.PacketDisconnect; 27 import ch.ethz.ssh2.packets.Packets; 28 import ch.ethz.ssh2.packets.TypesReader; 29 import ch.ethz.ssh2.util.Tokenizer; 30 31 46 47 53 public class TransportManager 54 { 55 private static final Logger log = Logger.getLogger(TransportManager.class); 56 57 class HandlerEntry 58 { 59 MessageHandler mh; 60 int low; 61 int high; 62 } 63 64 private final Vector asynchronousQueue = new Vector (); 65 private Thread asynchronousThread = null; 66 67 class AsynchronousWorker extends Thread 68 { 69 public void run() 70 { 71 while (true) 72 { 73 byte[] msg = null; 74 75 synchronized (asynchronousQueue) 76 { 77 if (asynchronousQueue.size() == 0) 78 { 79 80 81 try 82 { 83 asynchronousQueue.wait(2000); 84 } 85 catch (InterruptedException e) 86 { 87 88 } 89 90 if (asynchronousQueue.size() == 0) 91 { 92 asynchronousThread = null; 93 return; 94 } 95 } 96 97 msg = (byte[]) asynchronousQueue.remove(0); 98 } 99 100 111 112 try 113 { 114 sendMessage(msg); 115 } 116 catch (IOException e) 117 { 118 return; 119 } 120 } 121 } 122 } 123 124 String hostname; 125 int port; 126 final Socket sock = new Socket (); 127 128 Object connectionSemaphore = new Object (); 129 130 boolean flagKexOngoing = false; 131 boolean connectionClosed = false; 132 133 Throwable reasonClosedCause = null; 134 135 TransportConnection tc; 136 KexManager km; 137 138 Vector messageHandlers = new Vector (); 139 140 Thread receiveThread; 141 142 Vector connectionMonitors = new Vector (); 143 boolean monitorsWereInformed = false; 144 145 155 private InetAddress createInetAddress(String host) throws UnknownHostException 156 { 157 158 159 InetAddress addr = parseIPv4Address(host); 160 161 if (addr != null) 162 return addr; 163 164 return InetAddress.getByName(host); 165 } 166 167 private InetAddress parseIPv4Address(String host) throws UnknownHostException 168 { 169 if (host == null) 170 return null; 171 172 String [] quad = Tokenizer.parseTokens(host, '.'); 173 174 if ((quad == null) || (quad.length != 4)) 175 return null; 176 177 byte[] addr = new byte[4]; 178 179 for (int i = 0; i < 4; i++) 180 { 181 int part = 0; 182 183 if ((quad[i].length() == 0) || (quad[i].length() > 3)) 184 return null; 185 186 for (int k = 0; k < quad[i].length(); k++) 187 { 188 char c = quad[i].charAt(k); 189 190 191 if ((c < '0') || (c > '9')) 192 return null; 193 194 part = part * 10 + (c - '0'); 195 } 196 197 if (part > 255) 198 return null; 199 200 addr[i] = (byte) part; 201 } 202 203 return InetAddress.getByAddress(host, addr); 204 } 205 206 public TransportManager(String host, int port) throws IOException 207 { 208 this.hostname = host; 209 this.port = port; 210 } 211 212 public int getPacketOverheadEstimate() 213 { 214 return tc.getPacketOverheadEstimate(); 215 } 216 217 public void setTcpNoDelay(boolean state) throws IOException 218 { 219 sock.setTcpNoDelay(state); 220 } 221 222 public void setSoTimeout(int timeout) throws IOException 223 { 224 sock.setSoTimeout(timeout); 225 } 226 227 public ConnectionInfo getConnectionInfo(int kexNumber) throws IOException 228 { 229 return km.getOrWaitForConnectionInfo(kexNumber); 230 } 231 232 public Throwable getReasonClosedCause() 233 { 234 synchronized (connectionSemaphore) 235 { 236 return reasonClosedCause; 237 } 238 } 239 240 public byte[] getSessionIdentifier() 241 { 242 return km.sessionId; 243 } 244 245 public void close(Throwable cause, boolean useDisconnectPacket) 246 { 247 if (useDisconnectPacket == false) 248 { 249 252 253 try 254 { 255 sock.close(); 256 } 257 catch (IOException ignore) 258 { 259 } 260 261 265 } 266 267 synchronized (connectionSemaphore) 268 { 269 if (connectionClosed == false) 270 { 271 if (useDisconnectPacket == true) 272 { 273 try 274 { 275 byte[] msg = new PacketDisconnect(Packets.SSH_DISCONNECT_BY_APPLICATION, cause.getMessage(), "") 276 .getPayload(); 277 if (tc != null) 278 tc.sendMessage(msg); 279 } 280 catch (IOException ignore) 281 { 282 } 283 284 try 285 { 286 sock.close(); 287 } 288 catch (IOException ignore) 289 { 290 } 291 } 292 293 connectionClosed = true; 294 reasonClosedCause = cause; 295 } 296 connectionSemaphore.notifyAll(); 297 } 298 299 300 301 Vector monitors = null; 302 303 synchronized (this) 304 { 305 309 310 if (monitorsWereInformed == false) 311 { 312 monitorsWereInformed = true; 313 monitors = (Vector ) connectionMonitors.clone(); 314 } 315 } 316 317 if (monitors != null) 318 { 319 for (int i = 0; i < monitors.size(); i++) 320 { 321 try 322 { 323 ConnectionMonitor cmon = (ConnectionMonitor) monitors.elementAt(i); 324 cmon.connectionLost(reasonClosedCause); 325 } 326 catch (Exception ignore) 327 { 328 } 329 } 330 } 331 } 332 333 private void establishConnection(ProxyData proxyData, int connectTimeout) throws IOException 334 { 335 336 337 if (proxyData == null) 338 { 339 InetAddress addr = createInetAddress(hostname); 340 sock.connect(new InetSocketAddress (addr, port), connectTimeout); 341 sock.setSoTimeout(0); 342 return; 343 } 344 345 if (proxyData instanceof HTTPProxyData) 346 { 347 HTTPProxyData pd = (HTTPProxyData) proxyData; 348 349 350 351 InetAddress addr = createInetAddress(pd.proxyHost); 352 sock.connect(new InetSocketAddress (addr, pd.proxyPort), connectTimeout); 353 sock.setSoTimeout(0); 354 355 356 357 StringBuffer sb = new StringBuffer (); 358 359 sb.append("CONNECT "); 360 sb.append(hostname); 361 sb.append(':'); 362 sb.append(port); 363 sb.append(" HTTP/1.0\r\n"); 364 365 if ((pd.proxyUser != null) && (pd.proxyPass != null)) 366 { 367 String credentials = pd.proxyUser + ":" + pd.proxyPass; 368 char[] encoded = Base64.encode(credentials.getBytes()); 369 sb.append("Proxy-Authorization: Basic "); 370 sb.append(encoded); 371 sb.append("\r\n"); 372 } 373 374 if (pd.requestHeaderLines != null) 375 { 376 for (int i = 0; i < pd.requestHeaderLines.length; i++) 377 { 378 if (pd.requestHeaderLines[i] != null) 379 { 380 sb.append(pd.requestHeaderLines[i]); 381 sb.append("\r\n"); 382 } 383 } 384 } 385 386 sb.append("\r\n"); 387 388 OutputStream out = sock.getOutputStream(); 389 390 out.write(sb.toString().getBytes()); 391 out.flush(); 392 393 394 395 byte[] buffer = new byte[1024]; 396 InputStream in = sock.getInputStream(); 397 398 int len = ClientServerHello.readLineRN(in, buffer); 399 400 String httpReponse = new String (buffer, 0, len); 401 402 if (httpReponse.startsWith("HTTP/") == false) 403 throw new IOException ("The proxy did not send back a valid HTTP response."); 404 405 406 407 if ((httpReponse.length() < 14) || (httpReponse.charAt(8) != ' ') || (httpReponse.charAt(12) != ' ')) 408 throw new IOException ("The proxy did not send back a valid HTTP response."); 409 410 int errorCode = 0; 411 412 try 413 { 414 errorCode = Integer.parseInt(httpReponse.substring(9, 12)); 415 } 416 catch (NumberFormatException ignore) 417 { 418 throw new IOException ("The proxy did not send back a valid HTTP response."); 419 } 420 421 if ((errorCode < 0) || (errorCode > 999)) 422 throw new IOException ("The proxy did not send back a valid HTTP response."); 423 424 if (errorCode != 200) 425 { 426 throw new HTTPProxyException(httpReponse.substring(13), errorCode); 427 } 428 429 430 431 while (true) 432 { 433 len = ClientServerHello.readLineRN(in, buffer); 434 if (len == 0) 435 break; 436 } 437 return; 438 } 439 440 throw new IOException ("Unsupported ProxyData"); 441 } 442 443 public void initialize(CryptoWishList cwl, ServerHostKeyVerifier verifier, DHGexParameters dhgex, 444 int connectTimeout, SecureRandom rnd, ProxyData proxyData) throws IOException 445 { 446 447 448 establishConnection(proxyData, connectTimeout); 449 450 454 455 ClientServerHello csh = new ClientServerHello(sock.getInputStream(), sock.getOutputStream()); 456 457 tc = new TransportConnection(sock.getInputStream(), sock.getOutputStream(), rnd); 458 459 km = new KexManager(this, csh, cwl, hostname, port, verifier, rnd); 460 km.initiateKEX(cwl, dhgex); 461 462 receiveThread = new Thread (new Runnable () 463 { 464 public void run() 465 { 466 try 467 { 468 receiveLoop(); 469 } 470 catch (IOException e) 471 { 472 close(e, false); 473 474 if (log.isEnabled()) 475 log.log(10, "Receive thread: error in receiveLoop: " + e.getMessage()); 476 } 477 478 if (log.isEnabled()) 479 log.log(50, "Receive thread: back from receiveLoop"); 480 481 482 483 if (km != null) 484 { 485 try 486 { 487 km.handleMessage(null, 0); 488 } 489 catch (IOException e) 490 { 491 } 492 } 493 494 for (int i = 0; i < messageHandlers.size(); i++) 495 { 496 HandlerEntry he = (HandlerEntry) messageHandlers.elementAt(i); 497 try 498 { 499 he.mh.handleMessage(null, 0); 500 } 501 catch (Exception ignore) 502 { 503 } 504 } 505 } 506 }); 507 508 receiveThread.setDaemon(true); 509 receiveThread.start(); 510 } 511 512 public void registerMessageHandler(MessageHandler mh, int low, int high) 513 { 514 HandlerEntry he = new HandlerEntry(); 515 he.mh = mh; 516 he.low = low; 517 he.high = high; 518 519 synchronized (messageHandlers) 520 { 521 messageHandlers.addElement(he); 522 } 523 } 524 525 public void removeMessageHandler(MessageHandler mh, int low, int high) 526 { 527 synchronized (messageHandlers) 528 { 529 for (int i = 0; i < messageHandlers.size(); i++) 530 { 531 HandlerEntry he = (HandlerEntry) messageHandlers.elementAt(i); 532 if ((he.mh == mh) && (he.low == low) && (he.high == high)) 533 { 534 messageHandlers.removeElementAt(i); 535 break; 536 } 537 } 538 } 539 } 540 541 public void sendKexMessage(byte[] msg) throws IOException 542 { 543 synchronized (connectionSemaphore) 544 { 545 if (connectionClosed) 546 { 547 throw (IOException ) new IOException ("Sorry, this connection is closed.").initCause(reasonClosedCause); 548 } 549 550 flagKexOngoing = true; 551 552 try 553 { 554 tc.sendMessage(msg); 555 } 556 catch (IOException e) 557 { 558 close(e, false); 559 throw e; 560 } 561 } 562 } 563 564 public void kexFinished() throws IOException 565 { 566 synchronized (connectionSemaphore) 567 { 568 flagKexOngoing = false; 569 connectionSemaphore.notifyAll(); 570 } 571 } 572 573 public void forceKeyExchange(CryptoWishList cwl, DHGexParameters dhgex) throws IOException 574 { 575 km.initiateKEX(cwl, dhgex); 576 } 577 578 public void changeRecvCipher(BlockCipher bc, MAC mac) 579 { 580 tc.changeRecvCipher(bc, mac); 581 } 582 583 public void changeSendCipher(BlockCipher bc, MAC mac) 584 { 585 tc.changeSendCipher(bc, mac); 586 } 587 588 public void sendAsynchronousMessage(byte[] msg) throws IOException 589 { 590 synchronized (asynchronousQueue) 591 { 592 asynchronousQueue.addElement(msg); 593 594 599 600 if (asynchronousQueue.size() > 100) 601 throw new IOException ("Error: the peer is not consuming our asynchronous replies."); 602 603 604 605 if (asynchronousThread == null) 606 { 607 asynchronousThread = new AsynchronousWorker(); 608 asynchronousThread.setDaemon(true); 609 asynchronousThread.start(); 610 611 612 } 613 } 614 } 615 616 public void setConnectionMonitors(Vector monitors) 617 { 618 synchronized (this) 619 { 620 connectionMonitors = (Vector ) monitors.clone(); 621 } 622 } 623 624 public void sendMessage(byte[] msg) throws IOException 625 { 626 if (Thread.currentThread() == receiveThread) 627 throw new IOException ("Assertion error: sendMessage may never be invoked by the receiver thread!"); 628 629 synchronized (connectionSemaphore) 630 { 631 while (true) 632 { 633 if (connectionClosed) 634 { 635 throw (IOException ) new IOException ("Sorry, this connection is closed.") 636 .initCause(reasonClosedCause); 637 } 638 639 if (flagKexOngoing == false) 640 break; 641 642 try 643 { 644 connectionSemaphore.wait(); 645 } 646 catch (InterruptedException e) 647 { 648 } 649 } 650 651 try 652 { 653 tc.sendMessage(msg); 654 } 655 catch (IOException e) 656 { 657 close(e, false); 658 throw e; 659 } 660 } 661 } 662 663 public void receiveLoop() throws IOException 664 { 665 byte[] msg = new byte[35000]; 666 667 while (true) 668 { 669 int msglen = tc.receiveMessage(msg, 0, msg.length); 670 671 int type = msg[0] & 0xff; 672 673 if (type == Packets.SSH_MSG_IGNORE) 674 continue; 675 676 if (type == Packets.SSH_MSG_DEBUG) 677 { 678 if (log.isEnabled()) 679 { 680 TypesReader tr = new TypesReader(msg, 0, msglen); 681 tr.readByte(); 682 tr.readBoolean(); 683 StringBuffer debugMessageBuffer = new StringBuffer (); 684 debugMessageBuffer.append(tr.readString("UTF-8")); 685 686 for (int i = 0; i < debugMessageBuffer.length(); i++) 687 { 688 char c = debugMessageBuffer.charAt(i); 689 690 if ((c >= 32) && (c <= 126)) 691 continue; 692 debugMessageBuffer.setCharAt(i, '\uFFFD'); 693 } 694 695 log.log(50, "DEBUG Message from remote: '" + debugMessageBuffer.toString() + "'"); 696 } 697 continue; 698 } 699 700 if (type == Packets.SSH_MSG_UNIMPLEMENTED) 701 { 702 throw new IOException ("Peer sent UNIMPLEMENTED message, that should not happen."); 703 } 704 705 if (type == Packets.SSH_MSG_DISCONNECT) 706 { 707 TypesReader tr = new TypesReader(msg, 0, msglen); 708 tr.readByte(); 709 int reason_code = tr.readUINT32(); 710 StringBuffer reasonBuffer = new StringBuffer (); 711 reasonBuffer.append(tr.readString("UTF-8")); 712 713 717 718 if (reasonBuffer.length() > 255) 719 { 720 reasonBuffer.setLength(255); 721 reasonBuffer.setCharAt(254, '.'); 722 reasonBuffer.setCharAt(253, '.'); 723 reasonBuffer.setCharAt(252, '.'); 724 } 725 726 732 733 for (int i = 0; i < reasonBuffer.length(); i++) 734 { 735 char c = reasonBuffer.charAt(i); 736 737 if ((c >= 32) && (c <= 126)) 738 continue; 739 reasonBuffer.setCharAt(i, '\uFFFD'); 740 } 741 742 throw new IOException ("Peer sent DISCONNECT message (reason code " + reason_code + "): " 743 + reasonBuffer.toString()); 744 } 745 746 749 750 if ((type == Packets.SSH_MSG_KEXINIT) || (type == Packets.SSH_MSG_NEWKEYS) 751 || ((type >= 30) && (type <= 49))) 752 { 753 km.handleMessage(msg, msglen); 754 continue; 755 } 756 757 MessageHandler mh = null; 758 759 for (int i = 0; i < messageHandlers.size(); i++) 760 { 761 HandlerEntry he = (HandlerEntry) messageHandlers.elementAt(i); 762 if ((he.low <= type) && (type <= he.high)) 763 { 764 mh = he.mh; 765 break; 766 } 767 } 768 769 if (mh == null) 770 throw new IOException ("Unexpected SSH message (type " + type + ")"); 771 772 mh.handleMessage(msg, msglen); 773 } 774 } 775 } 776 | Popular Tags |