package org.apache.excalibur.instrument.manager.http.server;

import java.io.InterruptedIOException;
import java.io.IOException;
import java.net.BindException;
import java.net.InetAddress;
import java.net.Socket;
import java.net.SocketException;
import java.net.ServerSocket;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.excalibur.instrument.CounterInstrument;
import org.apache.excalibur.instrument.ValueInstrument;

/**
 * Base class for a simple socket server.  A {@link ServerSocket} is bound in
 * {@link #start()}, connections are accepted in {@link #runner()}, and each
 * accepted socket is passed to {@link #handleSocket(Socket)} on its own
 * worker thread.
 * <p>
 * Three instruments are registered: "connects", "open-sockets" and
 * "disconnects".
 */
abstract class AbstractSocketServer
    extends AbstractLogEnabledInstrumentableStartable
{
    /**
     * Monitor guarding {@link #m_openSockets}.  Workers notify on it when a
     * socket is removed so that {@link #stopRunner()} can wake up.
     */
    private Object m_semaphore = new Object();

    /** Port the server socket is bound to. */
    private int m_port;

    /** Backlog value passed to the ServerSocket constructor. */
    private int m_backlog = 50;

    /** Address passed to the ServerSocket constructor as the bind address. */
    private InetAddress m_bindAddr;

    /** SO_TIMEOUT, in milliseconds, applied to every accepted socket. */
    private int m_soTimeout = 30000;

    /**
     * Time in milliseconds that {@link #stopRunner()} waits for open sockets
     *  to finish on their own before closing them.  A value of 0 or less
     *  disables the forced close.
     */
    private long m_shutdownTimeout = 5000;

    /** Set to true when {@link #runner()} is entered.  Not read by this class. */
    private boolean m_started;

    /** The listening socket, created in {@link #start()}. */
    private ServerSocket m_serverSocket;

    /** Sockets currently being handled.  Access only while holding m_semaphore. */
    private List m_openSockets = new ArrayList();

    /** Incremented each time a connection is accepted and registered. */
    private CounterInstrument m_instrumentConnects;

    /** Tracks the current number of open sockets. */
    private ValueInstrument m_instrumentOpenSockets;

    /** Incremented each time a registered socket is released. */
    private CounterInstrument m_instrumentDisconnects;

    /*---------------------------------------------------------------
     * Constructors
     *-------------------------------------------------------------*/
    /**
     * Creates a new AbstractSocketServer.  Nothing is bound until
     * {@link #start()} is called.
     *
     * @param port The port on which the server will listen.
     * @param bindAddress The address handed to the ServerSocket constructor
     *                    as its bind address.
     */
    public AbstractSocketServer( int port, InetAddress bindAddress )
    {
        super();
        m_port = port;
        m_bindAddr = bindAddress;

        m_instrumentConnects = new CounterInstrument( "connects" );
        m_instrumentOpenSockets = new ValueInstrument( "open-sockets" );
        m_instrumentDisconnects = new CounterInstrument( "disconnects" );
        addInstrument( m_instrumentConnects );
        addInstrument( m_instrumentOpenSockets );
        addInstrument( m_instrumentDisconnects );
    }

    /*---------------------------------------------------------------
     * Startable Methods
     *-------------------------------------------------------------*/
    /**
     * Binds the server socket and then delegates to the superclass start.
     *
     * @throws BindException If the server socket could not be created.  The
     *                       underlying IOException is attached as the cause.
     * @throws Exception Anything thrown by the superclass start.
     */
    public void start()
        throws Exception
    {
        try
        {
            m_serverSocket = new ServerSocket( m_port, m_backlog, m_bindAddr );
        }
        catch ( IOException e )
        {
            String msg = "Unable to bind to port " + m_port + ": " + e.getMessage();
            // BindException has no cause-taking constructor, so attach the
            //  original failure explicitly to keep its stack trace.
            BindException bindException = new BindException( msg );
            bindException.initCause( e );
            throw bindException;
        }

        super.start();
    }

    /**
     * Closes the server socket and then waits until every open socket has
     * been released by its worker.  Once the shutdown timeout has elapsed
     * the remaining sockets are closed so that their workers fail out.
     *
     * @throws Exception Declared for subclasses; not thrown by this
     *                   implementation directly.
     */
    protected void stopRunner()
        throws Exception
    {
        // Closing the server socket makes a blocked accept() call fail.
        ServerSocket serverSocket = m_serverSocket;
        if ( serverSocket != null )
        {
            try
            {
                serverSocket.close();
            }
            catch ( IOException e )
            {
                getLogger().debug( "Unable to close the server socket.", e );
            }
        }

        synchronized ( m_semaphore )
        {
            long start = System.currentTimeMillis();
            boolean closed = false;
            int lastSize = 0;
            int size;
            while ( ( size = m_openSockets.size() ) > 0 )
            {
                // Only log when the count changes to avoid flooding the log.
                if ( lastSize != size )
                {
                    getLogger().debug(
                        "Waiting until " + size + " open sockets have been closed." );
                    lastSize = size;
                }

                try
                {
                    // Releases the lock so workers can deregister sockets.
                    m_semaphore.wait( 250 );
                }
                catch ( InterruptedException e )
                {
                    // Deliberately ignored: keep waiting for the sockets.
                }

                long now = System.currentTimeMillis();
                if ( ( !closed )
                    && ( ( m_shutdownTimeout > 0 ) && ( now - start >= m_shutdownTimeout ) ) )
                {
                    getLogger().debug( "Closing " + m_openSockets.size() + " open sockets that did "
                        + "not exit on their own." );

                    // Closing does not modify the list; the workers remove
                    //  their own sockets once the lock is released again.
                    for ( Iterator iter = m_openSockets.iterator(); iter.hasNext(); )
                    {
                        Socket socket = (Socket)iter.next();
                        try
                        {
                            socket.close();
                        }
                        catch ( IOException e )
                        {
                            getLogger().debug( "Problem closing socket.", e );
                        }
                    }

                    closed = true;
                }
            }
        }
    }

    /**
     * Accept loop.  Runs until {@link #isStopping()} returns true, handing
     * each accepted socket to a new worker thread.  The server socket is
     * closed when the loop exits.
     */
    protected void runner()
    {
        m_started = true;

        int workerId = 1;
        try
        {
            while ( !isStopping() )
            {
                try
                {
                    getLogger().debug( "Listen for a connection..." );
                    final Socket socket = m_serverSocket.accept();

                    // From here on the socket is owned by this method until
                    //  the worker has been started.  Any failure before then
                    //  must release it, or it leaks and may block shutdown.
                    boolean handedOff = false;
                    try
                    {
                        socket.setSoTimeout( m_soTimeout );
                        socket.setTcpNoDelay( true );

                        if ( getLogger().isDebugEnabled() )
                        {
                            String remoteIP = socket.getInetAddress().getHostAddress();
                            getLogger().debug( "Accepted a connection from " + remoteIP );
                        }

                        int openSockets;
                        synchronized ( m_semaphore )
                        {
                            m_openSockets.add( socket );
                            openSockets = m_openSockets.size();
                            getLogger().debug( "Open sockets: " + openSockets );
                        }

                        m_instrumentConnects.increment();
                        m_instrumentOpenSockets.setValue( openSockets );

                        Thread worker = new Thread( "socketWorker." + workerId++ )
                        {
                            public void run()
                            {
                                handleSocketInner( socket );
                            }
                        };
                        worker.start();
                        handedOff = true;
                    }
                    finally
                    {
                        if ( !handedOff )
                        {
                            discardSocket( socket );
                        }
                    }
                }
                catch ( Throwable t )
                {
                    if ( isStopping()
                        && ( ( t instanceof InterruptedException )
                        || ( t instanceof SocketException )
                        || ( t instanceof InterruptedIOException ) ) )
                    {
                        // Expected while shutting down: the server socket
                        //  was closed underneath accept().
                    }
                    else
                    {
                        getLogger().error( "Encountered an unexpected error, continuing.", t );

                        // Pause so a persistent failure does not spin.
                        try
                        {
                            Thread.sleep( 5000 );
                        }
                        catch ( InterruptedException e )
                        {
                            // Ignored: the loop condition decides what next.
                        }
                    }
                }
            }
        }
        finally
        {
            // Always release the listening socket when the loop ends.
            try
            {
                m_serverSocket.close();
            }
            catch ( IOException e )
            {
                getLogger().debug( "Unable to close the server socket.", e );
            }
        }
    }

    /*---------------------------------------------------------------
     * Methods
     *-------------------------------------------------------------*/
    /**
     * Handles a newly connected socket.  Called on a dedicated worker
     * thread.  The socket is closed by the caller once this method returns
     * or throws, so implementations do not need to close it.
     *
     * @param socket The accepted socket.
     */
    protected abstract void handleSocket( Socket socket );

    /**
     * Runs {@link #handleSocket(Socket)} and then deregisters and closes the
     * socket.  Any Throwable is logged rather than propagated so that a
     * failing handler cannot kill its worker thread silently.
     *
     * @param socket The accepted socket.
     */
    private void handleSocketInner( Socket socket )
    {
        try
        {
            try
            {
                handleSocket( socket );
            }
            finally
            {
                try
                {
                    int openSockets;
                    synchronized ( m_semaphore )
                    {
                        m_openSockets.remove( socket );
                        openSockets = m_openSockets.size();
                        getLogger().debug( "Open sockets: " + openSockets );

                        // Wake stopRunner() if it is waiting for sockets.
                        m_semaphore.notify();
                    }

                    m_instrumentOpenSockets.setValue( openSockets );
                    m_instrumentDisconnects.increment();
                }
                finally
                {
                    // Close even if the bookkeeping above failed.
                    socket.close();
                }
            }
        }
        catch ( Throwable t )
        {
            getLogger().error( "Encountered an error while handling socket: " + socket, t );
        }
    }

    /**
     * Releases a socket that was accepted but could not be handed to a
     * worker thread.  The socket is closed and, if it had already been
     * registered as open, removed again with the instruments updated.
     *
     * @param socket The socket to release.
     */
    private void discardSocket( Socket socket )
    {
        try
        {
            socket.close();
        }
        catch ( IOException e )
        {
            getLogger().debug( "Problem closing socket.", e );
        }

        boolean wasRegistered;
        int openSockets;
        synchronized ( m_semaphore )
        {
            wasRegistered = m_openSockets.remove( socket );
            openSockets = m_openSockets.size();
            if ( wasRegistered )
            {
                m_semaphore.notify();
            }
        }

        if ( wasRegistered )
        {
            m_instrumentOpenSockets.setValue( openSockets );
            m_instrumentDisconnects.increment();
        }
    }

    /**
     * Returns the SO_TIMEOUT applied to accepted sockets.
     *
     * @return The socket timeout in milliseconds.
     */
    protected int getSoTimeout()
    {
        return m_soTimeout;
    }
}