package com.caucho.server.port;

import com.caucho.loader.Environment;
import com.caucho.management.server.AbstractManagedObject;
import com.caucho.management.server.TcpConnectionMXBean;
import com.caucho.server.connection.BroadcastTask;
import com.caucho.util.Alarm;
import com.caucho.util.ThreadPool;
import com.caucho.util.ThreadTask;
import com.caucho.vfs.ClientDisconnectException;
import com.caucho.vfs.QSocket;
import com.caucho.vfs.ReadStream;

import java.io.IOException;
import java.net.InetAddress;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * A connection owned by a {@link Port} and backed by a single
 * {@link QSocket}.
 *
 * <p>The connection is a {@link ThreadTask}: {@link #run()} accepts a
 * socket from the port, hands requests to the connection's
 * {@code ServerRequest}, and, when the request asks for keepalive,
 * either re-schedules itself on the thread pool or registers with the
 * port's select manager.
 */
public class TcpConnection extends PortConnection implements ThreadTask
{
  private static final Logger log
    = Logger.getLogger(TcpConnection.class.getName());

  private final QSocket _socket;

  private boolean _isInUse;
  private boolean _isActive;
  private boolean _isClosed;

  private boolean _isKeepalive;
  private boolean _isDead;

  // Serializes request handling in run() against sendBroadcast().
  private final Object _requestLock = new Object();

  // Used as the worker thread's name while run() is executing.
  private final String _id;
  // Name reported by getName() and by the Admin MXBean.
  private final String _name;

  private final Admin _admin = new Admin();

  private String _state = "unknown";
  private long _startTime;
  private Thread _thread;

  /**
   * Creates a new connection bound to the given port and socket.
   *
   * @param port the owning port; its address and port number are used
   *        to build the connection's id and name
   * @param socket the socket object this connection reads and writes
   */
  TcpConnection(Port port, QSocket socket)
  {
    setPort(port);

    int id = getId();

    if (port.getAddress() == null) {
      _id = "resin-tcp-connection-*:" + port.getPort() + "-" + id;
      _name = "INADDR_ANY-" + port.getPort() + "-" + id;
    }
    else {
      _id = ("resin-tcp-connection-" + port.getAddress() + ":" +
             port.getPort() + "-" + id);
      _name = port.getAddress() + "-" + port.getPort() + "-" + id;
    }

    _socket = socket;
  }

  /**
   * Returns the connection's name, built from the port's address (or
   * {@code INADDR_ANY}), the port number, and the connection id.
   */
  public String getName()
  {
    return _name;
  }

  /**
   * Marks the connection as open and in use, clears the keepalive flag,
   * and attaches the read and write streams to the socket's stream.
   *
   * @throws IOException declared for failures while initializing the
   *         streams
   */
  public void initSocket()
    throws IOException
  {
    _isClosed = false;
    _isInUse = true;
    _isKeepalive = false;

    getWriteStream().init(_socket.getStream());
    getReadStream().init(_socket.getStream(), getWriteStream());

    if (log.isLoggable(Level.FINE)) {
      Port port = getPort();

      if (port != null)
        log.fine("starting connection " + this + ", total=" + port.getConnectionCount());
      else
        log.fine("starting connection " + this);
    }
  }

  /**
   * Returns the connection's socket.
   */
  public QSocket getSocket()
  {
    return _socket;
  }

  /**
   * Clears the closed flag and returns the connection's socket.
   */
  public QSocket startSocket()
  {
    _isClosed = false;

    return _socket;
  }

  /**
   * Decides whether the current thread should keep serving this
   * connection after a keepalive request.
   *
   * @return {@code true} if the request loop in {@link #run()} should
   *         continue on this thread; {@code false} if the port is closed,
   *         the timed fill reports no data, or the select manager is
   *         enabled and no timed wait applies
   * @throws IOException declared for failures of the timed read
   */
  private boolean waitForKeepalive()
    throws IOException
  {
    Port port = getPort();

    if (port.isClosed())
      return false;

    ReadStream is = getReadStream();

    // Data is already buffered: no need to wait.
    if (is.getBufferAvailable() > 0)
      return true;

    long timeout = port.getKeepaliveTimeout();

    boolean isSelectManager = port.getServer().isEnableSelectManager();

    // With the select manager enabled the thread uses the port's
    // select-thread timeout instead of the keepalive timeout.
    if (isSelectManager) {
      timeout = port.getKeepaliveSelectThreadTimeout();
    }

    if (timeout > 0 && timeout < port.getSocketTimeout())
      return is.fillWithTimeout(timeout);
    else if (isSelectManager)
      return false;
    else
      return true;
  }

  /**
   * Returns {@code false} when the connection is closed; otherwise
   * returns whether the socket reports itself as secure.
   */
  public boolean isSecure()
  {
    return ! _isClosed && _socket.isSecure();
  }

  /**
   * Returns true if the connection is marked closed.
   */
  public boolean isClosed()
  {
    return _isClosed;
  }

  /**
   * Sets the active flag.
   */
  public void setActive(boolean isActive)
  {
    _isActive = isActive;
  }

  /**
   * Returns the active flag.
   */
  public boolean isActive()
  {
    return _isActive;
  }

  /**
   * Sets the keepalive flag, logging a warning if it was already set.
   */
  public void setKeepalive()
  {
    if (_isKeepalive)
      log.warning("illegal state: setting keepalive with active keepalive: " + this);

    _isKeepalive = true;
  }

  /**
   * Clears the keepalive flag, logging a warning if it was not set.
   */
  public void clearKeepalive()
  {
    if (! _isKeepalive)
      log.warning("illegal state: clearing keepalive with inactive keepalive: " + this);

    _isKeepalive = false;
  }

  /**
   * Returns the socket's local address. If that lookup throws, falls
   * back to {@link InetAddress#getLocalHost()}, then to the address for
   * {@code 127.0.0.1}.
   *
   * @return the local address, or {@code null} if every lookup fails
   */
  public InetAddress getLocalAddress()
  {
    try {
      return _socket.getLocalAddress();
    } catch (Exception e) {
      // Best-effort fallbacks; each failure moves on to the next option.
      try {
        return InetAddress.getLocalHost();
      } catch (Exception e1) {
        try {
          return InetAddress.getByName("127.0.0.1");
        } catch (Exception e2) {
          return null;
        }
      }
    }
  }

  /**
   * Returns the socket's local port.
   */
  public int getLocalPort()
  {
    return _socket.getLocalPort();
  }

  /**
   * Returns the socket's remote address.
   */
  public InetAddress getRemoteAddress()
  {
    return _socket.getRemoteAddress();
  }

  /**
   * Returns the socket's remote host.
   */
  public String getRemoteHost()
  {
    return _socket.getRemoteHost();
  }

  /**
   * Delegates to the socket's
   * {@code getRemoteAddress(buffer, offset, length)}.
   *
   * @param buffer the buffer passed to the socket
   * @param offset the offset passed to the socket
   * @param length the length passed to the socket
   * @return the value returned by the socket
   */
  public int getRemoteAddress(byte []buffer, int offset, int length)
  {
    return _socket.getRemoteAddress(buffer, offset, length);
  }

  /**
   * Returns the socket's remote port.
   */
  public int getRemotePort()
  {
    return _socket.getRemotePort();
  }

  /**
   * Returns the virtual host configured on the owning port.
   */
  public String getVirtualHost()
  {
    return getPort().getVirtualHost();
  }

  /**
   * Returns the current state string. Values set by this class are
   * {@code "unknown"}, {@code "active"}, {@code "idle"} and
   * {@code "free"}.
   */
  public final String getState()
  {
    return _state;
  }

  /**
   * Sets the state string.
   */
  public final void setState(String state)
  {
    _state = state;
  }

  /**
   * Sets the state to {@code "active"} and records the current
   * {@link Alarm} time as the request start time.
   */
  public final void beginActive()
  {
    _state = "active";
    _startTime = Alarm.getCurrentTime();
  }

  /**
   * Sets the state to {@code "idle"} and clears the request start time.
   */
  public final void endActive()
  {
    _state = "idle";
    _startTime = 0;
  }

  /**
   * Returns the id of the thread currently executing {@link #run()},
   * or -1 if no thread is recorded.
   */
  public final long getThreadId()
  {
    // Copy to a local so a concurrent reset to null cannot cause an NPE.
    Thread thread = _thread;

    if (thread != null)
      return thread.getId();
    else
      return -1;
  }

  /**
   * Returns the difference between the current {@link Alarm} time and
   * the start time recorded by {@link #beginActive()}, or -1 if no
   * start time is recorded.
   */
  public final long getRequestActiveTime()
  {
    if (_startTime > 0)
      return Alarm.getCurrentTime() - _startTime;
    else
      return -1;
  }

  /**
   * Puts the connection into keepalive after {@link #run()} finishes
   * with the keepalive flag set.
   *
   * <p>If the port refuses the keepalive the connection is freed. If
   * the port has a select manager the connection is handed to it and
   * freed on failure. Otherwise the keepalive flag is set and this task
   * is re-scheduled on the thread pool.
   */
  private void keepalive()
  {
    Port port = getPort();

    if (! port.keepaliveBegin(this)) {
      if (log.isLoggable(Level.FINE))
        log.fine("[" + getId() + "] failed keepalive");

      free();
    }
    else if (port.getSelectManager() != null) {
      if (! port.getSelectManager().keepalive(this)) {
        if (log.isLoggable(Level.FINE))
          log.fine("[" + getId() + "] FAILED keepalive (select)");

        // Undo the keepaliveBegin() above before releasing.
        port.keepaliveEnd(this);
        free();
      }
      else {
        if (log.isLoggable(Level.FINE))
          log.fine("[" + getId() + "] keepalive (select)");
      }
    }
    else {
      if (log.isLoggable(Level.FINE))
        log.fine("[" + getId() + "] keepalive (thread)");

      setKeepalive();
      ThreadPool.getThreadPool().schedule(this);
    }
  }

  /**
   * No-op in this implementation.
   */
  public void start()
  {
  }

  /**
   * Serves the connection on the current thread.
   *
   * <p>The thread is renamed to the connection id for the duration of
   * the call and its context class loader is set to the system class
   * loader. Unless the task was started from keepalive, each outer
   * iteration first accepts a socket through
   * {@code port.accept(this, isFirst)}. Requests are then handled,
   * under the request lock, for as long as the request returns
   * keepalive and {@link #waitForKeepalive()} allows this thread to
   * continue.
   *
   * <p>On exit the port is notified with {@code threadEnd}, and the
   * connection either enters keepalive or is freed.
   */
  public void run()
  {
    Port port = getPort();

    boolean isKeepalive = _isKeepalive;
    _isKeepalive = false;

    // A task resumed from keepalive already has its socket.
    boolean isFirst = ! isKeepalive;

    ServerRequest request = getRequest();
    boolean isWaitForRead = request.isWaitForRead();

    Thread thread = Thread.currentThread();
    String oldThreadName = thread.getName();

    thread.setName(_id);

    if (isKeepalive)
      port.keepaliveEnd(this);

    port.threadBegin(this);

    ClassLoader systemLoader = ClassLoader.getSystemClassLoader();

    thread.setContextClassLoader(systemLoader);

    try {
      _thread = thread;

      while (! _isDead) {
        if (! isKeepalive && ! port.accept(this, isFirst)) {
          return;
        }

        isFirst = false;

        try {
          // Clear any pending interrupt status on the current thread.
          Thread.interrupted();

          do {
            thread.setContextClassLoader(systemLoader);

            isKeepalive = false;

            if (! port.isClosed() &&
                (! isWaitForRead || getReadStream().waitForRead())) {

              synchronized (_requestLock) {
                isKeepalive = request.handleRequest();
              }
            }
          } while (isKeepalive && waitForKeepalive() && ! port.isClosed());

          if (isKeepalive) {
            // Leave the socket open; the outer finally enters keepalive.
            return;
          }
          else {
            getRequest().protocolCloseEvent();
          }
        }
        catch (ClientDisconnectException e) {
          isKeepalive = false;

          if (log.isLoggable(Level.FINER))
            log.finer("[" + getId() + "] " + e);
        }
        catch (IOException e) {
          isKeepalive = false;

          if (log.isLoggable(Level.FINE))
            log.log(Level.FINE, "[" + getId() + "] " + e, e);
        }
        finally {
          thread.setContextClassLoader(systemLoader);

          if (! isKeepalive)
            closeImpl();
        }
      }
    } catch (Throwable e) {
      log.log(Level.WARNING, e.toString(), e);
      isKeepalive = false;
    } finally {
      thread.setContextClassLoader(systemLoader);

      port.threadEnd(this);

      if (isKeepalive)
        keepalive();
      else
        free();

      _thread = null;
      thread.setName(oldThreadName);
    }
  }

  /**
   * Executes the broadcast task against this connection while holding
   * the request lock, so it does not interleave with request handling
   * in {@link #run()}.
   *
   * @param task the task to execute
   */
  public void sendBroadcast(BroadcastTask task)
  {
    synchronized (_requestLock) {
      task.execute(this);
    }
  }

  /**
   * Closes the socket directly, logging any failure at FINE, then
   * yields the current thread.
   */
  public void closeOnShutdown()
  {
    QSocket socket = _socket;

    if (socket != null) {
      try {
        socket.close();
      } catch (Throwable e) {
        log.log(Level.FINE, e.toString(), e);
      }

      Thread.yield();
    }
  }

  /**
   * Closes the connection's streams and socket.
   *
   * <p>The closed flag is tested and set under this object's monitor,
   * so the close work runs only for the caller that flips the flag.
   * Failures while closing are logged at FINE and otherwise ignored.
   */
  private void closeImpl()
  {
    QSocket socket = _socket;

    boolean isClosed;

    synchronized (this) {
      isClosed = _isClosed;
      _isClosed = true;
    }

    if (! isClosed) {
      _isActive = false;
      boolean isKeepalive = _isKeepalive;
      _isKeepalive = false;

      Port port = getPort();

      // Guard against a null port (as the logging below already does) so
      // the streams and socket are still closed in that case.
      if (isKeepalive && port != null)
        port.keepaliveEnd(this);

      if (log.isLoggable(Level.FINE) && _isInUse) {
        Object serverId = Environment.getAttribute("caucho.server-id");
        String prefix = "";

        if (serverId != null)
          prefix = "[" + serverId + "] ";

        if (port != null)
          log.fine(prefix + "closing connection " + this + ", total=" + port.getConnectionCount());
        else
          log.fine(prefix + "closing connection " + this);
      }

      _isInUse = false;

      try {
        getWriteStream().close();
      } catch (Throwable e) {
        log.log(Level.FINE, e.toString(), e);
      }

      try {
        getReadStream().close();
      } catch (Throwable e) {
        log.log(Level.FINE, e.toString(), e);
      }

      if (socket != null) {
        try {
          socket.close();
        } catch (Throwable e) {
          log.log(Level.FINE, e.toString(), e);
        }
      }
    }
  }

  /**
   * Marks the connection dead, which ends the accept loop in
   * {@link #run()}, and closes it.
   */
  public final void destroy()
  {
    _isDead = true;

    closeImpl();
  }

  /**
   * Closes the connection, sets the state to {@code "free"}, and hands
   * the connection to {@code Port.free} or, if it has been destroyed,
   * to {@code Port.kill}.
   */
  final void free()
  {
    closeImpl();

    setState("free");

    if (! _isDead)
      getPort().free(this);
    else
      getPort().kill(this);
  }

  /**
   * Returns a description containing the id and socket, plus either an
   * {@code active} marker or the owning port.
   */
  public String toString()
  {
    if (_isActive)
      return "TcpConnection[id=" + _id + ",socket=" + _socket + ",active]";
    else
      return "TcpConnection[id=" + _id + ",socket=" + _socket + ",port=" + getPort() + "]";
  }

  /**
   * Management view of the enclosing connection. Every accessor
   * delegates to the enclosing {@link TcpConnection}.
   */
  class Admin extends AbstractManagedObject implements TcpConnectionMXBean {
    Admin()
    {
      super(ClassLoader.getSystemClassLoader());
    }

    public String getName()
    {
      return _name;
    }

    public long getThreadId()
    {
      return TcpConnection.this.getThreadId();
    }

    public long getRequestActiveTime()
    {
      return TcpConnection.this.getRequestActiveTime();
    }

    public String getState()
    {
      return TcpConnection.this.getState();
    }

    void register()
    {
      registerSelf();
    }

    void unregister()
    {
      unregisterSelf();
    }
  }
}