1 17 18 package org.apache.james.util.connection; 19 20 import java.io.IOException ; 21 import java.io.InterruptedIOException ; 22 import java.net.ServerSocket ; 23 import java.net.Socket ; 24 import java.net.SocketException ; 25 import java.util.ArrayList ; 26 import java.util.Iterator ; 27 import java.util.List ; 28 29 import org.apache.avalon.cornerstone.services.connection.ConnectionHandler; 30 import org.apache.avalon.cornerstone.services.connection.ConnectionHandlerFactory; 31 import org.apache.avalon.excalibur.pool.HardResourceLimitingPool; 32 import org.apache.avalon.excalibur.pool.ObjectFactory; 33 import org.apache.avalon.excalibur.pool.Pool; 34 import org.apache.avalon.excalibur.pool.Poolable; 35 import org.apache.avalon.excalibur.thread.ThreadPool; 36 import org.apache.avalon.framework.activity.Disposable; 37 import org.apache.avalon.framework.activity.Initializable; 38 import org.apache.avalon.framework.component.Component; 39 import org.apache.avalon.framework.logger.AbstractLogEnabled; 40 import org.apache.avalon.framework.logger.LogEnabled; 41 42 43 49 public class ServerConnection extends AbstractLogEnabled 50 implements Component, Initializable, Runnable { 51 52 63 private static int POLLING_INTERVAL = 20*1000; 64 65 68 private ServerSocket serverSocket; 69 70 74 private ConnectionHandlerFactory handlerFactory; 75 76 79 private Pool runnerPool; 80 81 84 private ObjectFactory theRunnerFactory = new ClientConnectionRunnerFactory(); 85 86 90 private ThreadPool connThreadPool; 91 92 95 private int socketTimeout; 96 97 101 private int maxOpenConn; 102 103 106 private final ArrayList clientConnectionRunners = new ArrayList (); 107 108 111 private Thread serverConnectionThread; 112 113 124 public ServerConnection(ServerSocket serverSocket, 125 ConnectionHandlerFactory handlerFactory, 126 ThreadPool threadPool, 127 int timeout, 128 int maxOpenConn) { 129 this.serverSocket = serverSocket; 130 this.handlerFactory = handlerFactory; 131 connThreadPool = threadPool; 132 socketTimeout = timeout; 133 this.maxOpenConn = maxOpenConn; 134 } 135 136 139 public void initialize() throws Exception { 140 runnerPool = new HardResourceLimitingPool(theRunnerFactory, 5, maxOpenConn); 141 if (runnerPool instanceof LogEnabled) { 142 ((LogEnabled)runnerPool).enableLogging(getLogger()); 143 } 144 ((Initializable)runnerPool).initialize(); 145 } 146 147 152 public void dispose() { 153 if (getLogger().isDebugEnabled()) { 154 getLogger().debug("Disposing server connection..." + this.toString()); 155 } 156 synchronized( this ) { 157 if( null != serverConnectionThread ) { 158 Thread thread = serverConnectionThread; 164 serverConnectionThread = null; 165 thread.interrupt(); 166 try { 167 serverSocket.close(); 168 } catch (IOException ie) { 169 } 175 try { 176 if (POLLING_INTERVAL > 0) { 177 wait(2L*POLLING_INTERVAL); 178 } else { 179 wait(); 180 } 181 } catch (InterruptedException ie) { 182 } 184 } 185 if (runnerPool instanceof Disposable) { 186 ((Disposable)runnerPool).dispose(); 187 } 188 runnerPool = null; 189 } 190 191 getLogger().debug("Closed server connection - cleaning up clients - " + this.toString()); 192 193 synchronized (clientConnectionRunners) { 194 Iterator runnerIterator = clientConnectionRunners.iterator(); 195 while( runnerIterator.hasNext() ) { 196 ClientConnectionRunner runner = (ClientConnectionRunner)runnerIterator.next(); 197 runner.dispose(); 198 runner = null; 199 } 200 clientConnectionRunners.clear(); 201 } 202 203 getLogger().debug("Cleaned up clients - " + this.toString()); 204 205 } 206 207 212 private ClientConnectionRunner addClientConnectionRunner() 213 throws Exception { 214 synchronized (clientConnectionRunners) { 215 ClientConnectionRunner clientConnectionRunner = (ClientConnectionRunner)runnerPool.get(); 216 clientConnectionRunners.add(clientConnectionRunner); 217 if (getLogger().isDebugEnabled()) { 218 getLogger().debug("Adding one connection for a total of " + clientConnectionRunners.size()); 219 } 220 return clientConnectionRunner; 221 } 222 } 223 224 229 private void removeClientConnectionRunner(ClientConnectionRunner clientConnectionRunner) { 230 synchronized (clientConnectionRunners) { 231 if (clientConnectionRunners.remove(clientConnectionRunner)) { 232 if (getLogger().isDebugEnabled()) { 233 getLogger().debug("Releasing one connection, leaving a total of " + clientConnectionRunners.size()); 234 } 235 runnerPool.put(clientConnectionRunner); 236 } 237 } 238 } 239 240 245 public void run() { 246 serverConnectionThread = Thread.currentThread(); 247 248 int ioExceptionCount = 0; 249 try { 250 serverSocket.setSoTimeout(POLLING_INTERVAL); 251 } catch (SocketException se) { 252 } 254 255 if ((getLogger().isDebugEnabled()) && (serverConnectionThread != null)) { 256 StringBuffer debugBuffer = 257 new StringBuffer (128) 258 .append(serverConnectionThread.getName()) 259 .append(" is listening on ") 260 .append(serverSocket.toString()); 261 getLogger().debug(debugBuffer.toString()); 262 } 263 while( !Thread.currentThread().interrupted() && null != serverConnectionThread ) { 264 try { 265 Socket clientSocket = null; 266 try { 267 clientSocket = serverSocket.accept(); 268 } catch( InterruptedIOException iioe ) { 269 continue; 272 } catch( IOException se ) { 273 if (ioExceptionCount > 0) { 274 getLogger().error( "Fatal exception while listening on server socket. Terminating connection.", se ); 275 break; 276 } else { 277 continue; 278 } 279 } catch( SecurityException se ) { 280 getLogger().error( "Fatal exception while listening on server socket. Terminating connection.", se ); 281 break; 282 } 283 ClientConnectionRunner runner = null; 284 synchronized (clientConnectionRunners) { 285 if ((maxOpenConn > 0) && (clientConnectionRunners.size() >= maxOpenConn)) { 286 if (getLogger().isWarnEnabled()) { 287 getLogger().warn("Maximum number of open connections exceeded - refusing connection. Current number of connections is " + clientConnectionRunners.size()); 288 if (getLogger().isWarnEnabled()) { 289 Iterator runnerIterator = clientConnectionRunners.iterator(); 290 getLogger().info("Connections: "); 291 while( runnerIterator.hasNext() ) { 292 getLogger().info(" " + ((ClientConnectionRunner)runnerIterator.next()).toString()); 293 } 294 } 295 } 296 try { 297 clientSocket.close(); 298 } catch (IOException ignored) { 299 } 301 continue; 302 } else { 303 clientSocket.setSoTimeout(socketTimeout); 304 runner = addClientConnectionRunner(); 305 runner.setSocket(clientSocket); 306 } 307 } 308 setupLogger( runner ); 309 try { 310 connThreadPool.execute( runner ); 311 } catch (Exception e) { 312 getLogger().error("Internal error - insufficient threads available to service request. " + 316 Thread.activeCount() + " threads in service request pool.", e); 317 try { 318 clientSocket.close(); 319 } catch (IOException ignored) { 320 } 322 removeClientConnectionRunner(runner); 325 } 326 } catch( IOException ioe ) { 327 getLogger().error( "Exception accepting connection", ioe ); 328 } catch( Throwable e ) { 329 getLogger().error( "Exception executing client connection runner: " + e.getMessage(), e ); 330 } 331 } 332 synchronized( this ) { 333 serverConnectionThread = null; 334 Thread.currentThread().interrupted(); 335 notifyAll(); 336 } 337 } 338 339 344 class ClientConnectionRunner extends AbstractLogEnabled 345 implements Component, Poolable, Runnable { 346 347 350 private Socket clientSocket; 351 352 355 private Thread clientSocketThread; 356 357 360 public String toString() { 361 return getClass().getName() + " for " + clientSocket + " on " + clientSocketThread; 362 } 363 364 public ClientConnectionRunner() { 365 } 366 367 371 public void dispose() { 372 synchronized( this ) { 373 if (null != clientSocketThread) { 374 clientSocketThread.interrupt(); 380 clientSocketThread = null; 381 try { 382 wait(); 383 } catch (InterruptedException ie) { 384 } 386 } 387 } 388 } 389 390 395 public void setSocket(Socket socket) { 396 clientSocket = socket; 397 } 398 399 404 public void run() { 405 ConnectionHandler handler = null; 406 try { 407 clientSocketThread = Thread.currentThread(); 408 409 handler = ServerConnection.this.handlerFactory.createConnectionHandler(); 410 String connectionString = null; 411 if( getLogger().isDebugEnabled() ) { 412 connectionString = getConnectionString(); 413 String message = "Starting " + connectionString; 414 getLogger().debug( message ); 415 } 416 417 handler.handleConnection(clientSocket); 418 419 if( getLogger().isDebugEnabled() ) { 420 String message = "Ending " + connectionString; 421 getLogger().debug( message ); 422 } 423 424 } catch( Throwable e ) { 425 getLogger().error( "Error handling connection", e ); 426 } finally { 427 428 try { 430 if (clientSocket != null) { 431 clientSocket.close(); 432 } 433 } catch( IOException ioe ) { 434 getLogger().warn( "Error shutting down connection", ioe ); 435 } 436 437 clientSocket = null; 438 439 synchronized( this ) { 442 clientSocketThread = null; 443 444 Thread.currentThread().interrupted(); 445 446 if (handler != null) { 451 ServerConnection.this.handlerFactory.releaseConnectionHandler( handler ); 452 handler = null; 453 } 454 455 ServerConnection.this.removeClientConnectionRunner(this); 457 458 notifyAll(); 459 } 460 } 461 } 462 463 468 private String getConnectionString() { 469 if (clientSocket == null) { 470 return "invalid socket"; 471 } 472 StringBuffer connectionBuffer 473 = new StringBuffer (256) 474 .append("connection on ") 475 .append(clientSocket.getLocalAddress().getHostAddress().toString()) 476 .append(":") 477 .append(clientSocket.getLocalPort()) 478 .append(" from ") 479 .append(clientSocket.getInetAddress().getHostAddress().toString()) 480 .append(":") 481 .append(clientSocket.getPort()); 482 return connectionBuffer.toString(); 483 } 484 } 485 486 489 private class ClientConnectionRunnerFactory 490 implements ObjectFactory { 491 492 495 public Object newInstance() throws Exception { 496 return new ClientConnectionRunner(); 497 } 498 499 502 public Class getCreatedClass() { 503 return ClientConnectionRunner.class; 504 } 505 506 509 public void decommission( Object object ) throws Exception { 510 return; 511 } 512 } 513 } 514 515 516 | Popular Tags |