|                                                                                                              1
 19
 20  package org.apache.excalibur.instrument.manager.http.server;
 21
 22  import java.io.InterruptedIOException
  ; 23  import java.io.IOException
  ; 24  import java.net.BindException
  ; 25  import java.net.InetAddress
  ; 26  import java.net.Socket
  ; 27  import java.net.SocketException
  ; 28  import java.net.ServerSocket
  ; 29  import java.util.ArrayList
  ; 30  import java.util.Iterator
  ; 31  import java.util.List
  ; 32
 33  import org.apache.excalibur.instrument.CounterInstrument;
 34  import org.apache.excalibur.instrument.ValueInstrument;
 35
 36
 41  abstract class AbstractSocketServer
 42      extends AbstractLogEnabledInstrumentableStartable
 43  {
 44
 49      private Object
  m_semaphore = new Object  (); 50
 51
 52      private int m_port;
 53
 54
 55      private int m_backlog = 50;
 56
 57
 58      private InetAddress
  m_bindAddr; 59
 60
 61      private int m_soTimeout = 30000;
 62
 63
 65      private long m_shutdownTimeout = 5000;
 66
 67
 68      private boolean m_started;
 69
 70
 71      private ServerSocket
  m_serverSocket; 72
 73
 74      private List
  m_openSockets = new ArrayList  (); 75
 76
 77      private CounterInstrument m_instrumentConnects;
 78
 79
 80      private ValueInstrument m_instrumentOpenSockets;
 81
 82
 83      private CounterInstrument m_instrumentDisconnects;
 84
 85
 88
 95      public AbstractSocketServer( int port, InetAddress
  bindAddress ) 96      {
 97          super();
 98          m_port = port;
 99          m_bindAddr = bindAddress;
 100
 101                 m_instrumentConnects = new CounterInstrument( "connects" );
 103         m_instrumentOpenSockets = new ValueInstrument( "open-sockets" );
 104         m_instrumentDisconnects = new CounterInstrument( "disconnects" );
 105         addInstrument( m_instrumentConnects );
 106         addInstrument( m_instrumentOpenSockets );
 107         addInstrument( m_instrumentDisconnects );
 108     }
 109
 110
 113
 118     public void start()
 119         throws Exception
  120     {
 121                 try
 123         {
 124             m_serverSocket = new ServerSocket
  ( m_port, m_backlog, m_bindAddr ); 125         }
 126         catch ( IOException
  e ) 127         {
 128             String
  msg = "Unable to bind to port " + m_port + ": " + e.getMessage(); 129             throw new BindException
  ( msg ); 130         }
 131
 132         super.start();
 133     }
 134
 135
 141     protected void stopRunner()
 142         throws Exception
  143     {
 144                 ServerSocket
  serverSocket = m_serverSocket; 146         if ( serverSocket != null )
 147         {
 148             try
 149             {
 150                 serverSocket.close();
 151             }
 152             catch ( IOException
  e ) 153             {
 154                 getLogger().debug( "Unable to close the server socket.", e );
 155             }
 156         }
 157
 158                 synchronized ( m_semaphore )
 160         {
 161             long start = System.currentTimeMillis();
 162             boolean closed = false;
 163             int lastSize = 0;
 164             int size;
 165             while ( ( size = m_openSockets.size() ) > 0 )
 166             {
 167                                 if ( lastSize != size )
 169                 {
 170                     getLogger().debug(
 171                         "Waiting until " + size + " open sockets have been closed." );
 172                     lastSize = size;
 173                 }
 174
 175                 try
 176                 {
 177                                         m_semaphore.wait( 250 );
 179                 }
 180                 catch ( InterruptedException
  e ) 181                 {
 182                                     }
 184
 185                                 long now = System.currentTimeMillis();
 187                 if ( ( !closed )
 188                     && ( ( m_shutdownTimeout > 0 ) && ( now - start >= m_shutdownTimeout ) ) )
 189                 {
 190                     getLogger().debug( "Closing " + m_openSockets.size() + " open sockets that did "
 191                         + "not exit on their own." );
 192
 193                     for ( Iterator
  iter = m_openSockets.iterator(); iter.hasNext(); ) 194                     {
 195                         Socket
  socket = (Socket  )iter.next(); 196                         try
 197                         {
 198                             socket.close();
 199                         }
 200                         catch ( IOException
  e ) 201                         {
 202                             getLogger().debug( "Problem closing socket.", e );
 203                         }
 204                     }
 205
 206                     closed = true;
 207                 }
 208             }
 209         }
 210     }
 211
 212
 218     protected void runner()
 219     {
 220                 m_started = true;
 222
 223         int workerId = 1;
 224         try
 225         {
 226                         while ( !isStopping() )
 228             {
 229                 try
 230                 {
 231                     getLogger().debug( "Listen for a connection..." );
 232                     final Socket
  socket = m_serverSocket.accept(); 233
 234                                         socket.setSoTimeout( m_soTimeout );
 236
 237                                         socket.setTcpNoDelay( true );
 239
 240                     if ( getLogger().isDebugEnabled() )
 241                     {
 242                         String
  remoteIP = socket.getInetAddress().getHostAddress(); 243                         getLogger().debug( "Accepted a connection from " + remoteIP );
 244                     }
 245
 246                                                                                                     int openSockets;
 251                     synchronized ( m_semaphore )
 252                     {
 253                         m_openSockets.add( socket );
 254                         openSockets = m_openSockets.size();
 255                         getLogger().debug( "Open sockets: " + openSockets );
 256                     }
 257
 258                                         m_instrumentConnects.increment();
 260                     m_instrumentOpenSockets.setValue( openSockets );
 261
 262                                         Thread
  worker = new Thread  ( "socketWorker." + workerId++ ) 264                     {
 265                         public void run()
 266                         {
 267                             handleSocketInner( socket );
 268                         }
 269                     };
 270                     worker.start();
 271                 }
 272                 catch ( Throwable
  t ) 273                 {
 274                                                                                 if ( isStopping()
 278                         && ( ( t instanceof InterruptedException
  ) 279                         || ( t instanceof SocketException
  ) 280                         || ( t instanceof InterruptedIOException
  ) ) ) 281                     {
 282                                             }
 284                     else
 285                     {
 286                         getLogger().error( "Encountered an unexpected error, continuing.", t );
 287
 288                                                 try
 290                         {
 291                             Thread.sleep( 5000 );
 292                         }
 293                         catch ( InterruptedException
  e ) 294                         {
 295                                                     }
 297                     }
 298                 }
 299             }
 300         }
 301         finally
 302         {
 303                         try
 305             {
 306                 m_serverSocket.close();
 307             }
 308             catch ( IOException
  e ) 309             {
 310                 getLogger().debug( "Unable to close the server socket.", e );
 311             }
 312         }
 313     }
 314
 315
 318
 324     protected abstract void handleSocket( Socket
  socket ); 325
 326
 333     private void handleSocketInner( Socket
  socket ) 334     {
 335         try
 336         {
 337             try
 338             {
 339                 handleSocket( socket );
 340             }
 341             finally
 342             {
 343                 int openSockets;
 344                 synchronized ( m_semaphore )
 345                 {
 346                     m_openSockets.remove( socket );
 347                     openSockets = m_openSockets.size();
 348                     getLogger().debug( "Open sockets: " + openSockets );
 349
 350                                         m_semaphore.notify();
 352                 }
 353
 354                                 m_instrumentOpenSockets.setValue( openSockets );
 356                 m_instrumentDisconnects.increment();
 357
 358                                 socket.close();
 360             }
 361         }
 362         catch ( Throwable
  t ) 363         {
 364             getLogger().error( "Encountered an error while handling socket: " + socket, t );
 365         }
 366     }
 367
 368     protected int getSoTimeout()
 369     {
 370         return m_soTimeout;
 371     }
 372 }
 373
                                                                                                                                                                                                             |                                                                       
 
 
 
 
 
                                                                                   Popular Tags                                                                                                                                                                                              |