package org.apache.tomcat.util.net;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.BindException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.security.AccessControlException;
import java.util.Stack;
import java.util.Vector;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.tomcat.util.res.StringManager;
import org.apache.tomcat.util.threads.ThreadPool;
import org.apache.tomcat.util.threads.ThreadPoolRunnable;

/**
 * TCP endpoint that accepts connections on a {@link ServerSocket} and passes
 * each accepted socket to the configured {@link TcpConnectionHandler}.
 *
 * <p>Two strategies are supported, selected with {@link #setStrategy(String)}:
 * <ul>
 *   <li><code>"lf"</code> (default): the {@link ThreadPool} is started and a
 *       <code>LeaderFollowerWorkerThread</code> is run on it;</li>
 *   <li><code>"ms"</code>: a dedicated daemon thread executes {@link #run()},
 *       accepting sockets and assigning them to
 *       <code>MasterSlaveWorkerThread</code> instances.</li>
 * </ul>
 */
public class PoolTcpEndpoint implements Runnable {

    static Log log = LogFactory.getLog(PoolTcpEndpoint.class);

    private StringManager sm =
        StringManager.getManager("org.apache.tomcat.util.net.res");

    /** Default accept backlog. */
    private static final int BACKLOG = 100;
    /** Default server socket timeout, in milliseconds. */
    private static final int TIMEOUT = 1000;

    /** Monitor guarding server socket re-initialisation; notified when {@link #run()} exits. */
    private final Object threadSync = new Object();

    private int backlog = BACKLOG;
    private int serverTimeout = TIMEOUT;

    private InetAddress inet;
    private int port;

    private ServerSocketFactory factory;
    private ServerSocket serverSocket;

    private volatile boolean running = false;
    private volatile boolean paused = false;
    private boolean initialized = false;
    private boolean reinitializing = false;
    static final int debug = 0;

    protected boolean tcpNoDelay = false;
    protected int linger = 100;
    protected int socketTimeout = -1;
    /** <code>true</code> for the "lf" strategy, <code>false</code> for "ms". */
    private boolean lf = true;

    // ------------------------------------------------ Leader/follower state

    TcpConnectionHandler handler;
    ThreadPoolRunnable listener;
    ThreadPool tp;

    // --------------------------------------------------- Master/slave state

    /** The acceptor thread used by the "ms" strategy. */
    private Thread thread = null;

    /** Idle workers available for re-use. */
    private Stack workerThreads = new Stack();
    /** Number of workers created so far. */
    private int curThreads = 0;
    /** Upper bound on workers; a negative value means unbounded. */
    private int maxThreads = 20;

    /** Every worker ever created by {@link #newWorkerThread()}. */
    private Vector created = new Vector();

    // --------------------------------------------------------- Constructors

    /** Creates an endpoint backed by a new {@link ThreadPool}. */
    public PoolTcpEndpoint() {
        tp = new ThreadPool();
    }

    /**
     * Creates an endpoint backed by the supplied thread pool.
     *
     * @param tp the pool to use
     */
    public PoolTcpEndpoint(ThreadPool tp) {
        this.tp = tp;
    }

    // ----------------------------------------------------------- Properties

    /** Sets the pool's maximum thread count; non-positive values are ignored. */
    public void setMaxThreads(int maxThreads) {
        if (maxThreads > 0) {
            tp.setMaxThreads(maxThreads);
        }
    }

    public int getMaxThreads() {
        return tp.getMaxThreads();
    }

    /** Sets the pool's maximum spare thread count; non-positive values are ignored. */
    public void setMaxSpareThreads(int maxThreads) {
        if (maxThreads > 0) {
            tp.setMaxSpareThreads(maxThreads);
        }
    }

    public int getMaxSpareThreads() {
        return tp.getMaxSpareThreads();
    }

    /** Sets the pool's minimum spare thread count; non-positive values are ignored. */
    public void setMinSpareThreads(int minThreads) {
        if (minThreads > 0) {
            tp.setMinSpareThreads(minThreads);
        }
    }

    public int getMinSpareThreads() {
        return tp.getMinSpareThreads();
    }

    public void setThreadPriority(int threadPriority) {
        tp.setThreadPriority(threadPriority);
    }

    public int getThreadPriority() {
        return tp.getThreadPriority();
    }

    public int getPort() {
        return port;
    }

    public void setPort(int port) {
        this.port = port;
    }

    public InetAddress getAddress() {
        return inet;
    }

    public void setAddress(InetAddress inet) {
        this.inet = inet;
    }

    /**
     * Supplies a pre-built server socket. When set before
     * {@link #initEndpoint()}, no socket is created by the factory.
     */
    public void setServerSocket(ServerSocket ss) {
        serverSocket = ss;
    }

    public void setServerSocketFactory(ServerSocketFactory factory) {
        this.factory = factory;
    }

    ServerSocketFactory getServerSocketFactory() {
        return factory;
    }

    public void setConnectionHandler(TcpConnectionHandler handler) {
        this.handler = handler;
    }

    public TcpConnectionHandler getConnectionHandler() {
        return handler;
    }

    public boolean isRunning() {
        return running;
    }

    public boolean isPaused() {
        return paused;
    }

    /** Sets the accept backlog; non-positive values are ignored. */
    public void setBacklog(int backlog) {
        if (backlog > 0) {
            this.backlog = backlog;
        }
    }

    public int getBacklog() {
        return backlog;
    }

    /**
     * Sets the timeout applied to the server socket by
     * {@link #initEndpoint()}. A negative value means no timeout is applied.
     * Equivalent to {@link #setServerSoTimeout(int)}.
     */
    public void setServerTimeout(int timeout) {
        this.serverTimeout = timeout;
    }

    public boolean getTcpNoDelay() {
        return tcpNoDelay;
    }

    public void setTcpNoDelay(boolean b) {
        tcpNoDelay = b;
    }

    public int getSoLinger() {
        return linger;
    }

    /** Sets SO_LINGER for accepted sockets; a negative value leaves it untouched. */
    public void setSoLinger(int i) {
        linger = i;
    }

    public int getSoTimeout() {
        return socketTimeout;
    }

    /** Sets SO_TIMEOUT for accepted sockets; only values greater than zero are applied. */
    public void setSoTimeout(int i) {
        socketTimeout = i;
    }

    public int getServerSoTimeout() {
        return serverTimeout;
    }

    public void setServerSoTimeout(int i) {
        serverTimeout = i;
    }

    /** @return <code>"lf"</code> or <code>"ms"</code> */
    public String getStrategy() {
        return lf ? "lf" : "ms";
    }

    /**
     * Selects the threading strategy. <code>"ms"</code> selects master/slave;
     * any other value (including <code>null</code>) selects leader/follower.
     */
    public void setStrategy(String strategy) {
        lf = !"ms".equals(strategy);
    }

    public int getCurrentThreadCount() {
        return curThreads;
    }

    /** @return workers created minus workers currently idle on the stack */
    public int getCurrentThreadsBusy() {
        return curThreads - workerThreads.size();
    }

    // ------------------------------------------------------------ Lifecycle

    /**
     * Creates the server socket (unless one was supplied) and applies the
     * server timeout.
     *
     * @throws IOException if the socket cannot be created or configured; a
     *         {@link BindException} is rethrown with the port appended to
     *         its message and the original exception attached as its cause
     * @throws InstantiationException propagated from the socket factory
     */
    public void initEndpoint() throws IOException, InstantiationException {
        if (factory == null) {
            factory = ServerSocketFactory.getDefault();
        }
        if (serverSocket == null) {
            try {
                if (inet == null) {
                    serverSocket = factory.createSocket(port, backlog);
                } else {
                    serverSocket = factory.createSocket(port, backlog, inet);
                }
            } catch (BindException be) {
                // Add the port to the message, but keep the original
                // exception (and its stack trace) reachable as the cause.
                BindException withPort =
                    new BindException(be.getMessage() + ":" + port);
                withPort.initCause(be);
                throw withPort;
            }
        }
        if (serverTimeout >= 0) {
            serverSocket.setSoTimeout(serverTimeout);
        }
        initialized = true;
    }

    /**
     * Starts accepting connections, initialising the endpoint first if
     * necessary.
     *
     * @throws IOException propagated from {@link #initEndpoint()}
     * @throws InstantiationException propagated from {@link #initEndpoint()}
     */
    public void startEndpoint() throws IOException, InstantiationException {
        if (!initialized) {
            initEndpoint();
        }
        if (lf) {
            tp.start();
        }
        running = true;
        paused = false;
        if (lf) {
            listener = new LeaderFollowerWorkerThread(this);
            tp.runIt(listener);
        } else {
            maxThreads = getMaxThreads();
            threadStart();
        }
    }

    /** Pauses a running endpoint and pokes the listening port. */
    public void pauseEndpoint() {
        if (running && !paused) {
            paused = true;
            unlockAccept();
        }
    }

    /** Clears the paused flag of a running endpoint. */
    public void resumeEndpoint() {
        if (running) {
            paused = false;
        }
    }

    /**
     * Stops a running endpoint: shuts the pool down (leader/follower only),
     * closes the server socket and marks the endpoint uninitialised.
     */
    public void stopEndpoint() {
        if (running) {
            if (lf) {
                tp.shutdown();
            }
            running = false;
            if (serverSocket != null) {
                closeServerSocket();
            }
            if (!lf) {
                threadStop();
            }
            initialized = false;
        }
    }

    /** Closes and forgets the server socket; close failures are logged. */
    protected void closeServerSocket() {
        if (!paused) {
            unlockAccept();
        }
        try {
            if (serverSocket != null) {
                serverSocket.close();
            }
        } catch (Exception e) {
            log.error(sm.getString("endpoint.err.close"), e);
        }
        serverSocket = null;
    }

    /**
     * Opens a client connection to this endpoint's own port and closes it
     * again straight away. Judging by the method name, the purpose is to
     * wake a thread blocked in <code>accept()</code>. Connection failures are
     * logged at debug level only.
     */
    protected void unlockAccept() {
        Socket s = null;
        try {
            if (inet == null) {
                s = new Socket("127.0.0.1", port);
            } else {
                s = new Socket(inet, port);
                s.setSoLinger(true, 0);
            }
        } catch (Exception e) {
            if (log.isDebugEnabled()) {
                log.debug(sm.getString("endpoint.debug.unlock", "" + port), e);
            }
        } finally {
            if (s != null) {
                try {
                    s.close();
                } catch (Exception e) {
                    // Best effort: nothing useful can be done here.
                }
            }
        }
    }

    // ------------------------------------------------------ Socket handling

    /**
     * Accepts the next connection.
     *
     * <p>On an {@link IOException} while the endpoint is running, the server
     * socket is closed and re-initialised. If that fails, a full
     * stop/init/start cycle is attempted and {@link ThreadDeath} is thrown
     * to terminate the calling thread.
     *
     * @return the accepted socket, or <code>null</code> if the endpoint is
     *         not running, the accept timed out, or an error occurred
     */
    Socket acceptSocket() {
        if (!running || serverSocket == null) {
            return null;
        }

        Socket accepted = null;

        try {
            if (factory == null) {
                accepted = serverSocket.accept();
            } else {
                accepted = factory.acceptSocket(serverSocket);
            }
            if (null == accepted) {
                log.warn(sm.getString("endpoint.warn.nullSocket"));
            } else {
                if (!running) {
                    // Endpoint was stopped while we were blocked in accept.
                    accepted.close();
                    accepted = null;
                } else if (factory != null) {
                    factory.initSocket(accepted);
                }
            }
        } catch (InterruptedIOException iioe) {
            // Accept timed out; the caller simply tries again.
        } catch (AccessControlException ace) {
            String msg = sm.getString("endpoint.warn.security",
                                      serverSocket, ace);
            log.warn(msg);
        } catch (IOException e) {

            String msg = null;

            if (running) {
                msg = sm.getString("endpoint.err.nonfatal",
                                   serverSocket, e);
                log.error(msg, e);
            }

            if (accepted != null) {
                try {
                    accepted.close();
                } catch (Throwable ex) {
                    msg = sm.getString("endpoint.err.nonfatal",
                                       accepted, ex);
                    log.warn(msg, ex);
                }
                accepted = null;
            }

            if (!running) {
                return null;
            }
            reinitializing = true;
            // Only the first thread through the monitor performs the
            // re-initialisation; it clears the flag for the others.
            synchronized (threadSync) {
                if (reinitializing) {
                    reinitializing = false;
                    closeServerSocket();
                    initialized = false;
                    // 1) Try to re-create the server socket in place.
                    try {
                        msg = sm.getString("endpoint.warn.reinit");
                        log.warn(msg);
                        initEndpoint();
                    } catch (Throwable t) {
                        msg = sm.getString("endpoint.err.nonfatal",
                                           serverSocket, t);
                        log.error(msg, t);
                    }
                    // 2) If that failed, restart the whole endpoint and
                    //    terminate the current thread.
                    if (!initialized) {
                        msg = sm.getString("endpoint.warn.restart");
                        log.warn(msg);
                        try {
                            stopEndpoint();
                            initEndpoint();
                            startEndpoint();
                        } catch (Throwable t) {
                            msg = sm.getString("endpoint.err.fatal",
                                               serverSocket, t);
                            log.error(msg, t);
                        }
                        throw new ThreadDeath();
                    }
                }
            }

        }

        return accepted;
    }

    /**
     * Applies the configured SO_LINGER, TCP_NODELAY and SO_TIMEOUT options
     * to an accepted socket.
     *
     * @throws SocketException if an option cannot be set
     */
    void setSocketOptions(Socket socket)
        throws SocketException {
        if (linger >= 0) {
            socket.setSoLinger(true, linger);
        }
        if (tcpNoDelay) {
            socket.setTcpNoDelay(tcpNoDelay);
        }
        if (socketTimeout > 0) {
            socket.setSoTimeout(socketTimeout);
        }
    }

    /**
     * Configures an accepted socket, performs the factory handshake (if a
     * factory is set) and hands the connection to the handler. The socket is
     * closed on any failure and the connection object is always recycled.
     *
     * @param s the accepted socket
     * @param con the connection object to bind to the socket
     * @param threadData per-thread data passed through to the handler
     */
    void processSocket(Socket s, TcpConnection con, Object[] threadData) {
        // Tracks progress so that failures can be logged appropriately:
        // 1 = socket options, 2 = handshake, 3 = request processing.
        int step = 1;
        try {

            setSocketOptions(s);

            step = 2;
            if (getServerSocketFactory() != null) {
                getServerSocketFactory().handshake(s);
            }

            step = 3;
            con.setEndpoint(this);
            con.setSocket(s);
            getConnectionHandler().processConnection(con, threadData);

        } catch (SocketException se) {
            log.debug(sm.getString("endpoint.err.socket", s.getInetAddress()),
                      se);
            try {
                s.close();
            } catch (IOException e) {
                // Already failing; ignore.
            }
        } catch (Throwable t) {
            if (step == 2) {
                // Handshake failures are only reported at debug level.
                if (log.isDebugEnabled()) {
                    log.debug(sm.getString("endpoint.err.handshake"), t);
                }
            } else {
                log.error(sm.getString("endpoint.err.unexpected"), t);
            }
            try {
                s.close();
            } catch (IOException e) {
                // Already failing; ignore.
            }
        } finally {
            if (con != null) {
                con.recycle();
            }
        }
    }

    // ------------------------------------------------- Master/slave support

    /**
     * Returns an idle worker if one is available, otherwise creates one
     * provided the thread limit allows it.
     *
     * @return a worker, or <code>null</code> if the limit has been reached
     */
    private MasterSlaveWorkerThread createWorkerThread() {

        synchronized (workerThreads) {
            if (workerThreads.size() > 0) {
                return (MasterSlaveWorkerThread) workerThreads.pop();
            }
            boolean belowLimit = (maxThreads > 0) && (curThreads < maxThreads);
            boolean unlimited = maxThreads < 0;
            if (belowLimit || unlimited) {
                return newWorkerThread();
            }
            return null;
        }

    }

    /** Creates, starts and registers a new worker thread. */
    private MasterSlaveWorkerThread newWorkerThread() {

        MasterSlaveWorkerThread workerThread =
            new MasterSlaveWorkerThread(this, tp.getName() + "-" + (++curThreads));
        workerThread.start();
        created.addElement(workerThread);
        return workerThread;

    }

    /**
     * Returns a worker to the idle stack.
     *
     * @param workerThread the worker to make available again
     */
    void recycleWorkerThread(MasterSlaveWorkerThread workerThread) {
        workerThreads.push(workerThread);
    }

    /**
     * Acceptor loop for the master/slave strategy: waits while paused,
     * obtains a worker, accepts a socket and assigns it to the worker.
     * Threads waiting on the internal monitor are notified on exit.
     */
    public void run() {

        while (running) {

            // Also test 'running' here: stopEndpoint() does not clear
            // 'paused', so without it an endpoint stopped while paused
            // would leave this thread sleeping forever.
            while (paused && running) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // Ignore; the loop condition is re-evaluated.
                }
            }
            if (!running) {
                break;
            }

            MasterSlaveWorkerThread workerThread = createWorkerThread();
            if (workerThread == null) {
                // Thread limit reached: back off briefly before retrying,
                // so no accept happens until a worker is free.
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    // Ignore
                }
                continue;
            }

            // NOTE(review): acceptSocket() may return null (timeout/error);
            // the worker is still handed the value. Confirm that
            // MasterSlaveWorkerThread copes with a null assignment.
            Socket socket = acceptSocket();

            workerThread.assign(socket);

        }

        synchronized (threadSync) {
            threadSync.notifyAll();
        }

    }

    /** Starts the daemon acceptor thread that executes {@link #run()}. */
    private void threadStart() {
        thread = new Thread(this, tp.getName());
        thread.setPriority(getThreadPriority());
        thread.setDaemon(true);
        thread.start();
    }

    /**
     * Drops the reference to the acceptor thread. The thread itself exits
     * once it observes that <code>running</code> is false.
     */
    private void threadStop() {
        thread = null;
    }

}