package com.caucho.server.hmux;

import com.caucho.log.Log;
import com.caucho.util.Alarm;
import com.caucho.vfs.*;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ConnectException;
import java.net.Socket;
import java.net.SocketException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Locale;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Stream that sends one request over a socket using the packet codes
 * declared in {@link HmuxRequest} and exposes the response body for reading.
 *
 * <p>The request is sent lazily: nothing is written until the first call
 * that needs the response (read, attribute lookup, available, or close).
 *
 * <p>A single closed stream is cached in a static slot so the next
 * {@code createStream} call for the same host and port can reuse its socket.
 */
class HmuxStream extends StreamImpl {
  private static final Logger log = Log.open(HmuxStream.class);

  /**
   * Lower-cased attribute names that are never forwarded as request headers.
   */
  private static final HashMap<String,String> _reserved
    = new HashMap<String,String>();

  /** Guards {@link #_savedStream} and {@link #_saveTime}. */
  private static final Object LOCK = new Object();

  /**
   * A cached stream is reused only if it was saved less than this long ago,
   * measured with {@code Alarm.getCurrentTime()} (presumably milliseconds).
   */
  private static final long KEEPALIVE_MAX_IDLE = 5000L;

  /** Read timeout applied to every newly opened socket. */
  private static final int DEFAULT_SOCKET_TIMEOUT = 300 * 1000;

  /** The single cached keepalive stream, or null. */
  private static HmuxStream _savedStream;
  /** Value of {@code Alarm.getCurrentTime()} when the stream was cached. */
  private static long _saveTime;

  // NOTE(review): not read anywhere in this class; kept for compatibility.
  private long _socketTimeout = 30000L;

  private boolean _isSSL;

  private Socket _s;
  private InputStream _is;
  private OutputStream _os;
  private ReadStream _rs;
  private WriteStream _ws;

  /** Host the socket is connected to. */
  private String _host;
  /** Port the socket is connected to. */
  private int _port;

  /** Server name sent with the request; taken from the HmuxPath if any. */
  private String _virtualHost;

  /** Explicit request method; overrides the POST/HEAD/GET defaults. */
  private String _method;
  /** True if the request should be sent as HEAD. */
  private boolean _isHead;
  /** True if the stream was opened for read/write (request sent as POST). */
  private boolean _isPost;

  /** Buffers the request body until the request is actually sent. */
  private MemoryStream _tempStream;

  /** True while the connection may still be cached for reuse on close. */
  private boolean _isKeepalive = true;
  /** True once the request has been sent. */
  private boolean _didGet;
  /**
   * Bytes left in the current data packet; 0 means "read the next packet",
   * negative means the response has ended.
   */
  private int _chunkLength;
  /** True when no response body should be read (set for HEAD requests). */
  private boolean _isRequestDone;

  /** Request headers before sending; response attributes afterwards. */
  private HashMap<String,Object> _attributes;

  /** Scratch buffer used by {@link #close()} to drain an unread response. */
  private byte []_tempBuffer;

  static {
    _reserved.put("user-agent", "");
    _reserved.put("content-length", "");
    _reserved.put("content-encoding", "");
    _reserved.put("connection", "");
    _reserved.put("host", "");
  }

  /**
   * Wraps an already connected socket.
   *
   * @param path the path being requested
   * @param host host the socket is connected to
   * @param port port the socket is connected to
   * @param s the connected socket
   * @throws IOException if the socket's streams cannot be obtained
   */
  private HmuxStream(Path path, String host, int port, Socket s)
    throws IOException
  {
    _s = s;

    _host = host;
    _port = port;

    _is = _s.getInputStream();
    _os = _s.getOutputStream();

    _ws = VfsStream.openWrite(_os);
    _rs = VfsStream.openRead(_is, _ws);

    _attributes = new HashMap<String,Object>();

    init(path);
  }

  /**
   * Opens a stream for reading; written data is ignored.
   *
   * @param path the path to request
   * @return a wrapper around the (possibly reused) stream
   * @throws IOException if the connection cannot be established
   */
  static HmuxStreamWrapper openRead(HmuxPath path) throws IOException
  {
    HmuxStream stream = createStream(path);
    stream._isPost = false;

    return new HmuxStreamWrapper(stream);
  }

  /**
   * Opens a stream whose written data is sent as the request body.
   *
   * @param path the path to request
   * @return a wrapper around the (possibly reused) stream
   * @throws IOException if the connection cannot be established
   */
  static HmuxStreamWrapper openReadWrite(HmuxPath path) throws IOException
  {
    HmuxStream stream = createStream(path);
    stream._isPost = true;

    return new HmuxStreamWrapper(stream);
  }

  /**
   * Returns a stream connected to the path's host and port, reusing the
   * cached keepalive stream when it matches and is recent enough.
   *
   * @param path the path to request
   * @return an initialized stream
   * @throws IOException (a ConnectException) if the socket cannot be opened,
   *   or any IOException raised while wrapping the socket
   */
  private static HmuxStream createStream(HmuxPath path) throws IOException
  {
    String host = path.getHost();
    int port = path.getPort();

    HmuxStream stream = null;
    long streamTime = 0;

    synchronized (LOCK) {
      if (_savedStream != null
          && host.equals(_savedStream.getHost())
          && port == _savedStream.getPort()) {
        stream = _savedStream;
        streamTime = _saveTime;
        _savedStream = null;
      }
    }

    if (stream != null) {
      if (Alarm.getCurrentTime() < streamTime + KEEPALIVE_MAX_IDLE) {
        stream.init(path);
        return stream;
      }

      // The cached stream is too old: really close it instead of reusing.
      try {
        stream._isKeepalive = false;
        stream.close();
      } catch (IOException e) {
        log.log(Level.FINE, e.toString(), e);
      }
    }

    Socket s;

    try {
      s = new Socket(host, port);
    } catch (ConnectException e) {
      throw connectFailure(path.getURL() + ": " + e.getMessage(), e);
    } catch (Exception e) {
      throw connectFailure(path.getURL() + ": " + e.toString(), e);
    }

    try {
      s.setSoTimeout(DEFAULT_SOCKET_TIMEOUT);
    } catch (Exception e) {
      // Best effort: the connection is still usable without the timeout.
      log.log(Level.FINER, e.toString(), e);
    }

    try {
      return new HmuxStream(path, host, port, s);
    } catch (IOException e) {
      // Do not leak the socket if it cannot be wrapped.
      closeSocketQuietly(s);
      throw e;
    } catch (RuntimeException e) {
      closeSocketQuietly(s);
      throw e;
    }
  }

  /**
   * Builds a ConnectException that keeps the original failure as its cause.
   */
  private static ConnectException connectFailure(String message,
                                                 Throwable cause)
  {
    ConnectException failure = new ConnectException(message);
    failure.initCause(cause);

    return failure;
  }

  /**
   * Closes a socket, logging instead of propagating any failure.
   */
  private static void closeSocketQuietly(Socket s)
  {
    try {
      s.close();
    } catch (IOException e) {
      log.log(Level.FINER, e.toString(), e);
    }
  }

  /**
   * Resets the per-request state so the stream can serve a new request.
   *
   * @param path the path of the new request
   */
  private void init(Path path)
  {
    _isRequestDone = false;
    _didGet = false;
    _isPost = false;
    _isHead = false;
    _method = null;
    // Forget any chunk state left over from a previous request.
    _chunkLength = 0;
    _attributes.clear();

    setPath(path);

    if (path instanceof HmuxPath)
      _virtualHost = ((HmuxPath) path).getVirtualHost();
  }

  /**
   * Sets the SSL flag. The flag is only stored by this class.
   */
  public void setSSL(boolean isSSL)
  {
    _isSSL = isSSL;
  }

  /**
   * Returns the SSL flag.
   */
  public boolean isSSL()
  {
    return _isSSL;
  }

  /**
   * Sets the request method, overriding the POST/HEAD/GET default.
   */
  public void setMethod(String method)
  {
    _method = method;
  }

  /**
   * Marks the request as HEAD; no response body is read for HEAD requests.
   */
  public void setHead(boolean isHead)
  {
    _isHead = isHead;
  }

  /**
   * Returns the host the socket is connected to.
   */
  public String getHost()
  {
    return _host;
  }

  /**
   * Returns the port the socket is connected to.
   */
  public int getPort()
  {
    return _port;
  }

  /**
   * Returns a response attribute, sending the request first if needed.
   *
   * @param name attribute name; matched case-insensitively
   * @return the attribute value, or null if absent
   * @throws IOException if sending the request fails
   */
  public Object getAttribute(String name)
    throws IOException
  {
    if (! _didGet)
      getConnInput();

    return _attributes.get(lower(name));
  }

  /**
   * Returns the attribute names, sending the request first if needed.
   *
   * @return an iterator over the (lower-cased) attribute names
   * @throws IOException if sending the request fails
   */
  public Iterator getAttributeNames()
    throws IOException
  {
    if (! _didGet)
      getConnInput();

    return _attributes.keySet().iterator();
  }

  /**
   * Sets a request attribute.
   *
   * <p>"method" sets the request method and "socket-timeout" (a positive
   * Integer) sets the socket read timeout; every other name is stored,
   * lower-cased, as an attribute.
   *
   * @param name the attribute name
   * @param value the attribute value
   */
  public void setAttribute(String name, Object value)
  {
    if (name.equals("method")) {
      setMethod((String) value);
    }
    else if (name.equals("socket-timeout")) {
      if (value instanceof Integer) {
        int socketTimeout = ((Integer) value).intValue();

        if (socketTimeout > 0 && _s != null) {
          try {
            _s.setSoTimeout(socketTimeout);
          } catch (Exception e) {
            // Best effort: keep the previous timeout.
            log.log(Level.FINER, e.toString(), e);
          }
        }
      }
    }
    else {
      _attributes.put(lower(name), value);
    }
  }

  /**
   * Removes an attribute; the name is matched case-insensitively.
   */
  public void removeAttribute(String name)
  {
    _attributes.remove(lower(name));
  }

  /**
   * Sets the socket read timeout.
   *
   * @param timeout the timeout; values outside the int range are clamped
   *   instead of being silently wrapped by the narrowing cast
   * @throws SocketException if the socket rejects the timeout
   */
  public void setSocketTimeout(long timeout)
    throws SocketException
  {
    if (_s == null)
      return;

    long clamped = Math.max(Integer.MIN_VALUE,
                            Math.min(Integer.MAX_VALUE, timeout));

    _s.setSoTimeout((int) clamped);
  }

  /**
   * The stream always reports itself as writable.
   */
  public boolean canWrite()
  {
    return true;
  }

  /**
   * Buffers request body data in memory until the request is sent.
   * The data is discarded unless the stream was opened for read/write.
   *
   * @param buf source buffer
   * @param offset start offset in the buffer
   * @param length number of bytes to write
   * @param isEnd passed through to the in-memory stream
   * @throws IOException if the in-memory stream fails
   */
  public void write(byte []buf, int offset, int length, boolean isEnd)
    throws IOException
  {
    if (! _isPost)
      return;

    if (_tempStream == null)
      _tempStream = new MemoryStream();

    _tempStream.write(buf, offset, length, isEnd);
  }

  /**
   * The stream always reports itself as readable.
   */
  public boolean canRead()
  {
    return true;
  }

  /**
   * Reads response body bytes. Any failure disables keepalive so the
   * connection is not cached for reuse.
   *
   * @return the number of bytes read, or -1 at the end of the response
   */
  public int read(byte []buf, int offset, int length) throws IOException
  {
    try {
      return readInt(buf, offset, length);
    } catch (IOException e) {
      _isKeepalive = false;
      throw e;
    } catch (RuntimeException e) {
      _isKeepalive = false;
      throw e;
    }
  }

  /**
   * Reads from the current data packet, fetching the next packet when the
   * current one is exhausted. Sends the request first if needed.
   *
   * @return the number of bytes read, or -1 at the end of the response
   */
  public int readInt(byte []buf, int offset, int length) throws IOException
  {
    if (! _didGet)
      getConnInput();

    if (_isRequestDone)
      return -1;

    try {
      if (_chunkLength == 0 && ! readData())
        _chunkLength = -1;

      if (_chunkLength < 0)
        return -1;

      int len = _rs.read(buf, offset, Math.min(length, _chunkLength));

      if (len >= 0)
        _chunkLength -= len;
      else if (_chunkLength > 0) {
        // Input ended inside a data packet: the connection is not reusable.
        _isKeepalive = false;
      }

      return len;
    } catch (IOException e) {
      _isKeepalive = false;
      throw e;
    } catch (RuntimeException e) {
      _isKeepalive = false;
      throw e;
    }
  }

  /**
   * Sends the request if it has not been sent yet. Any failure disables
   * keepalive before the exception is rethrown.
   */
  private void getConnInput() throws IOException
  {
    if (_didGet)
      return;

    try {
      getConnInputImpl();
    } catch (IOException e) {
      _isKeepalive = false;
      throw e;
    } catch (RuntimeException e) {
      _isKeepalive = false;
      throw e;
    }
  }

  /**
   * Writes the request (channel, method, server name, URI, query, headers,
   * body, terminator) and reads the response up to the first data packet.
   */
  private void getConnInputImpl() throws IOException
  {
    if (_didGet)
      return;

    _didGet = true;

    _ws.write('C');
    _ws.write(0);
    _ws.write(0);

    writeString(HmuxRequest.HMUX_METHOD, requestMethod());

    if (_virtualHost != null)
      writeString(HmuxRequest.HMUX_SERVER_NAME, _virtualHost);
    else {
      // The host is sent once, inside its length-prefixed packet. An extra
      // raw print of the host here would be parsed as further packets.
      writeString(HmuxRequest.HMUX_SERVER_NAME, _path.getHost());

      if (_path.getPort() != 80) {
        writeString(HmuxRequest.CSE_SERVER_PORT,
                    String.valueOf(_path.getPort()));
      }
    }

    writeString(HmuxRequest.HMUX_URI, _path.getPath());

    if (_path.getQuery() != null)
      writeString(HmuxRequest.CSE_QUERY_STRING, _path.getQuery());

    // Attribute keys are already lower-cased when they are stored.
    for (Map.Entry<String,Object> header : _attributes.entrySet()) {
      String name = header.getKey();

      if (! _reserved.containsKey(name)) {
        writeString(HmuxRequest.HMUX_HEADER, name);
        writeString(HmuxRequest.HMUX_STRING, header.getValue());
      }
    }

    if (_isPost) {
      MemoryStream tempStream = _tempStream;
      _tempStream = null;

      if (tempStream != null)
        writePostBody(tempStream);
    }

    // From here on the map holds response attributes.
    _attributes.clear();

    _ws.write('Q');

    // No data packet before the end of the response: mark the body as
    // finished so the next read does not try to parse another packet.
    if (! readData())
      _chunkLength = -1;

    if (_isHead)
      _isRequestDone = true;
  }

  /**
   * Returns the method to send: the explicit method if set, otherwise
   * POST for read/write streams, HEAD if requested, and GET by default.
   */
  private String requestMethod()
  {
    if (_method != null)
      return _method;
    else if (_isPost)
      return "POST";
    else if (_isHead)
      return "HEAD";
    else
      return "GET";
  }

  /**
   * Copies the buffered request body to the connection as 'D' packets and
   * releases the buffers even if the copy fails.
   */
  private void writePostBody(MemoryStream tempStream) throws IOException
  {
    TempBuffer tb = TempBuffer.allocate();

    try {
      byte []buffer = tb.getBuffer();
      ReadStream postIn = tempStream.openRead();
      int sublen;

      while ((sublen = postIn.read(buffer, 0, buffer.length)) > 0) {
        _ws.write('D');
        _ws.write(sublen >> 8);
        _ws.write(sublen);
        _ws.write(buffer, 0, sublen);
      }
    } finally {
      try {
        tempStream.destroy();
      } finally {
        TempBuffer.free(tb);
      }
    }
  }

  /**
   * Writes a packet: the code byte, a two-byte length, then the string.
   */
  private void writeString(int code, String string)
    throws IOException
  {
    WriteStream ws = _ws;

    ws.write((byte) code);
    int len = string.length();
    ws.write(len >> 8);
    ws.write(len);
    ws.print(string);
  }

  /**
   * Writes a packet whose payload is {@code String.valueOf(obj)}.
   */
  private void writeString(int code, Object obj)
    throws IOException
  {
    writeString(code, String.valueOf(obj));
  }

  /**
   * Reads response packets until a data packet starts.
   *
   * @return true if a data packet was found ({@link #_chunkLength} then
   *   holds its length); false on quit, exit, or when no further packet
   *   code could be read
   */
  private boolean readData()
    throws IOException
  {
    boolean isDebug = log.isLoggable(Level.FINE);

    int code;

    ReadStream is = _rs;

    while ((code = is.read()) > 0) {
      switch (code) {
      case HmuxRequest.HMUX_CHANNEL:
        is.read();
        is.read();
        break;

      case HmuxRequest.HMUX_QUIT:
      case HmuxRequest.HMUX_EXIT:
        is.close();

        if (isDebug)
          log.fine("HMUX: " + (char) code);

        return false;

      case HmuxRequest.HMUX_YIELD:
        break;

      case HmuxRequest.HMUX_STATUS: {
        String value = readString(is);
        // Keep at most the first three characters; a shorter status is
        // stored as-is instead of failing in substring().
        String status = value.length() > 3 ? value.substring(0, 3) : value;
        _attributes.put("status", status);

        if (isDebug)
          log.fine("HMUX: " + (char) code + " " + value);
        break;
      }

      case HmuxRequest.HMUX_DATA:
        _chunkLength = readLength(is);

        if (isDebug)
          log.fine("HMUX: " + (char) code + " " + _chunkLength);

        return true;

      default: {
        // Unhandled packet: skip its payload.
        int len = readLength(is);

        if (isDebug)
          log.fine("HMUX: " + (char) code + " " + len);

        is.skip(len);
        break;
      }
      }
    }

    // A negative code means the read stream reported no more input, so the
    // connection must not be cached for reuse.
    if (code < 0)
      _isKeepalive = false;

    return false;
  }

  /**
   * Reads a two-byte, high-byte-first packet length. Both bytes are masked
   * so the result is never negative.
   */
  private static int readLength(ReadStream is)
    throws IOException
  {
    int high = is.read() & 0xff;
    int low = is.read() & 0xff;

    return 256 * high + low;
  }

  /**
   * Reads a length-prefixed string.
   */
  private String readString(ReadStream is)
    throws IOException
  {
    int len = readLength(is);

    char []buf = new char[len];

    is.readAll(buf, 0, len);

    return new String(buf);
  }

  /**
   * Returns the read stream's available count, sending the request first
   * if needed.
   */
  public int getAvailable() throws IOException
  {
    if (! _didGet)
      getConnInput();

    return _rs.getAvailable();
  }

  /**
   * Closes the stream.
   *
   * <p>If keepalive is still possible the request is completed, the rest
   * of the response is drained, and the stream is cached for reuse, closing
   * any previously cached stream. Otherwise the streams and the socket are
   * closed.
   *
   * @throws IOException if completing the request or closing the socket
   *   fails; the connection is released before the exception propagates
   */
  public void close() throws IOException
  {
    try {
      finishRequest();
    } catch (IOException e) {
      // Do not leak the socket when the request cannot be completed.
      closeConnectionQuietly();
      throw e;
    } catch (RuntimeException e) {
      closeConnectionQuietly();
      throw e;
    }

    if (com.caucho.server.util.CauchoSystem.isTesting())
      _isKeepalive = false;

    if (_isKeepalive) {
      HmuxStream oldSaved;
      long now = Alarm.getCurrentTime();

      synchronized (LOCK) {
        oldSaved = _savedStream;
        _savedStream = this;
        _saveTime = now;
      }

      if (oldSaved != null && oldSaved != this) {
        oldSaved._isKeepalive = false;
        oldSaved.close();
      }

      return;
    }

    closeConnection();
  }

  /**
   * For a keepalive connection, sends the request if it was never sent and
   * drains the unread part of the response. A failure while draining only
   * disables keepalive.
   */
  private void finishRequest() throws IOException
  {
    if (! _isKeepalive)
      return;

    if (! _didGet)
      getConnInput();

    if (_isRequestDone)
      return;

    if (_tempBuffer == null)
      _tempBuffer = new byte[256];

    try {
      while (read(_tempBuffer, 0, _tempBuffer.length) > 0) {
      }
    } catch (IOException e) {
      _isKeepalive = false;
    }
  }

  /**
   * Closes the streams on a best-effort basis and then the socket. All
   * references are cleared, so calling this again is a no-op.
   *
   * @throws IOException if closing the socket fails
   */
  private void closeConnection() throws IOException
  {
    try {
      try {
        if (_ws != null)
          _ws.close();
      } catch (Throwable e) {
        log.log(Level.FINER, e.toString(), e);
      }
      _ws = null;

      try {
        if (_rs != null)
          _rs.close();
      } catch (Throwable e) {
        log.log(Level.FINER, e.toString(), e);
      }
      _rs = null;

      try {
        if (_os != null)
          _os.close();
      } catch (Throwable e) {
        log.log(Level.FINER, e.toString(), e);
      }
      _os = null;

      try {
        if (_is != null)
          _is.close();
      } catch (Throwable e) {
        log.log(Level.FINER, e.toString(), e);
      }
      _is = null;
    } finally {
      // Clear the reference first so it is dropped even if close() throws.
      Socket s = _s;
      _s = null;

      if (s != null)
        s.close();
    }
  }

  /**
   * Releases the connection while another exception is propagating, so a
   * secondary close failure cannot mask the original one.
   */
  private void closeConnectionQuietly()
  {
    try {
      closeConnection();
    } catch (IOException e) {
      log.log(Level.FINER, e.toString(), e);
    }
  }

  /**
   * Lower-cases an attribute name independently of the default locale.
   */
  private static String lower(String name)
  {
    return name.toLowerCase(Locale.ENGLISH);
  }
}