KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > excalibur > instrument > manager > http > server > AbstractSocketServer


1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one or more
3  * contributor license agreements. See the NOTICE file distributed with
4  * this work for additional information regarding copyright ownership.
5  * The ASF licenses this file to You under the Apache License, Version 2.0
6  * (the "License"); you may not use this file except in compliance with
7  * the License. You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
14  * implied.
15  *
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  */

19
20 package org.apache.excalibur.instrument.manager.http.server;
21
22 import java.io.InterruptedIOException JavaDoc;
23 import java.io.IOException JavaDoc;
24 import java.net.BindException JavaDoc;
25 import java.net.InetAddress JavaDoc;
26 import java.net.Socket JavaDoc;
27 import java.net.SocketException JavaDoc;
28 import java.net.ServerSocket JavaDoc;
29 import java.util.ArrayList JavaDoc;
30 import java.util.Iterator JavaDoc;
31 import java.util.List JavaDoc;
32
33 import org.apache.excalibur.instrument.CounterInstrument;
34 import org.apache.excalibur.instrument.ValueInstrument;
35
36 /**
37  *
38  * @author <a HREF="mailto:dev@avalon.apache.org">Avalon Development Team</a>
39  * @version $Revision: 1.6 $
40  */

41 abstract class AbstractSocketServer
42     extends AbstractLogEnabledInstrumentableStartable
43 {
44     /** Semaphore used to synchronize actions contained in this class. This is to
45      * avoid unintended problems with synchronization performed by subclasses
46      * this is important here because this is a server and actions from one
47      * thread can cause a call out through a socket into another thread and
48      * cause deadlocks. */

49     private Object JavaDoc m_semaphore = new Object JavaDoc();
50     
51     /** The port to listen on for connections. */
52     private int m_port;
53     
54     /** The backlog to assign to the server socket. */
55     private int m_backlog = 50;
56     
57     /** The address to bind the port server to. Null for any address. */
58     private InetAddress JavaDoc m_bindAddr;
59     
60     /** The SO_TIMEOUT to use for client sockets. */
61     private int m_soTimeout = 30000; /* 30 seconds. */
62     
63     /** The time in ms after the component starts to shutdown that sockets
64      * will have to shutdown on their own before they are closed. */

65     private long m_shutdownTimeout = 5000; /* 5 seconds. */
66     
67     /** Flag which keeps track of when the server has been started. */
68     private boolean m_started;
69     
70     /** Reference to the ServerSocket. */
71     private ServerSocket JavaDoc m_serverSocket;
72     
73     /** Used to track the number of open sockets. */
74     private List JavaDoc m_openSockets = new ArrayList JavaDoc();
75     
76     /** Number of times that the server socket is connected to. */
77     private CounterInstrument m_instrumentConnects;
78     
79     /** Number of sockets that are connected at any given time. */
80     private ValueInstrument m_instrumentOpenSockets;
81     
82     /** Number of times that a connection socket disconnects. */
83     private CounterInstrument m_instrumentDisconnects;
84     
85     /*---------------------------------------------------------------
86      * Constructors
87      *-------------------------------------------------------------*/

88     /**
89      * Creates a new AbstractSocketServer.
90      *
91      * @param port The port on which the server will listen.
92      * @param bindAddress The address on which the server will listen for
93      * connections.
94      */

95     public AbstractSocketServer( int port, InetAddress JavaDoc bindAddress )
96     {
97         super();
98         m_port = port;
99         m_bindAddr = bindAddress;
100         
101         // Create instruments
102
m_instrumentConnects = new CounterInstrument( "connects" );
103         m_instrumentOpenSockets = new ValueInstrument( "open-sockets" );
104         m_instrumentDisconnects = new CounterInstrument( "disconnects" );
105         addInstrument( m_instrumentConnects );
106         addInstrument( m_instrumentOpenSockets );
107         addInstrument( m_instrumentDisconnects );
108     }
109     
110     /*---------------------------------------------------------------
111      * AbstractLogEnabledInstrumentableStartable Methods
112      *-------------------------------------------------------------*/

113     /**
114      * Starts the runner thread.
115      *
116      * @throws Exception If there are any problems.
117      */

118     public void start()
119         throws Exception JavaDoc
120     {
121         // Create the server socket.
122
try
123         {
124             m_serverSocket = new ServerSocket JavaDoc( m_port, m_backlog, m_bindAddr );
125         }
126         catch ( IOException JavaDoc e )
127         {
128             String JavaDoc msg = "Unable to bind to port " + m_port + ": " + e.getMessage();
129             throw new BindException JavaDoc( msg );
130         }
131         
132         super.start();
133     }
134     
135     /**
136      * Called when the component is being stopped, the isStopping method will
137      * always return true when this method is called.
138      *
139      * @throws Exception If there are any problems
140      */

141     protected void stopRunner()
142         throws Exception JavaDoc
143     {
144         // Close the server socket so it stops blocking for new connections.
145
ServerSocket JavaDoc serverSocket = m_serverSocket;
146         if ( serverSocket != null )
147         {
148             try
149             {
150                 serverSocket.close();
151             }
152             catch ( IOException JavaDoc e )
153             {
154                 getLogger().debug( "Unable to close the server socket.", e );
155             }
156         }
157         
158         // Wait for any sockets that are still open to complete.
159
synchronized ( m_semaphore )
160         {
161             long start = System.currentTimeMillis();
162             boolean closed = false;
163             int lastSize = 0;
164             int size;
165             while ( ( size = m_openSockets.size() ) > 0 )
166             {
167                 // Only display the remaining queue size when the number changes.
168
if ( lastSize != size )
169                 {
170                     getLogger().debug(
171                         "Waiting until " + size + " open sockets have been closed." );
172                     lastSize = size;
173                 }
174                 
175                 try
176                 {
177                     // Will be notified whenever a socket is removed from the openSockets list.
178
m_semaphore.wait( 250 );
179                 }
180                 catch ( InterruptedException JavaDoc e )
181                 {
182                     // Ignore.
183
}
184                 
185                 // If we have already waited too long, then try closing the connections.
186
long now = System.currentTimeMillis();
187                 if ( ( !closed )
188                     && ( ( m_shutdownTimeout > 0 ) && ( now - start >= m_shutdownTimeout ) ) )
189                 {
190                     getLogger().debug( "Closing " + m_openSockets.size() + " open sockets that did "
191                         + "not exit on their own." );
192                     
193                     for ( Iterator JavaDoc iter = m_openSockets.iterator(); iter.hasNext(); )
194                     {
195                         Socket JavaDoc socket = (Socket JavaDoc)iter.next();
196                         try
197                         {
198                             socket.close();
199                         }
200                         catch ( IOException JavaDoc e )
201                         {
202                             getLogger().debug( "Problem closing socket.", e );
203                         }
204                     }
205                     
206                     closed = true;
207                 }
208             }
209         }
210     }
211     
212     /**
213      * Runner method that will be called when the component is started.
214      * The method must monitor the isStopping() method and make sure
215      * that it returns in a timely manner when the isStopping() method
216      * returns true.
217      */

218     protected void runner()
219     {
220         // Set the started flag
221
m_started = true;
222         
223         int workerId = 1;
224         try
225         {
226             // Loop until we are asked to stop.
227
while ( !isStopping() )
228             {
229                 try
230                 {
231                     getLogger().debug( "Listen for a connection..." );
232                     final Socket JavaDoc socket = m_serverSocket.accept();
233                     
234                     // Set the SO_TIMEOUT for the socket.
235
socket.setSoTimeout( m_soTimeout );
236                     
237                     // Set the TCP_NO_DELAY flag for the socket to improve performance.
238
socket.setTcpNoDelay( true );
239                     
240                     if ( getLogger().isDebugEnabled() )
241                     {
242                         String JavaDoc remoteIP = socket.getInetAddress().getHostAddress();
243                         getLogger().debug( "Accepted a connection from " + remoteIP );
244                     }
245                     
246                     // Increment the number of open sockets. This is done here rather than in
247
// handleSocketInner so that it will be incremented before we request a
248
// worker from the thread pool. This is necessary to avoid timing problems
249
// during shutdown of the component.
250
int openSockets;
251                     synchronized ( m_semaphore )
252                     {
253                         m_openSockets.add( socket );
254                         openSockets = m_openSockets.size();
255                         getLogger().debug( "Open sockets: " + openSockets );
256                     }
257                     
258                     // Notify the instrument manager
259
m_instrumentConnects.increment();
260                     m_instrumentOpenSockets.setValue( openSockets );
261                     
262                     // Handle the socket in a new thread. May want to use pooling here later.
263
Thread JavaDoc worker = new Thread JavaDoc( "socketWorker." + workerId++ )
264                     {
265                         public void run()
266                         {
267                             handleSocketInner( socket );
268                         }
269                     };
270                     worker.start();
271                 }
272                 catch ( Throwable JavaDoc t )
273                 {
274                     // Check for throwable type this way rather than with seperate catches
275
// to work around a problem where InterruptedException can be thrown
276
// when the compiler gives an error saying that it can't.
277
if ( isStopping()
278                         && ( ( t instanceof InterruptedException JavaDoc )
279                         || ( t instanceof SocketException JavaDoc )
280                         || ( t instanceof InterruptedIOException JavaDoc ) ) )
281                     {
282                         // This is expected, the service is being stopped.
283
}
284                     else
285                     {
286                         getLogger().error( "Encountered an unexpected error, continuing.", t );
287                         
288                         // Avoid tight thrashing
289
try
290                         {
291                             Thread.sleep( 5000 );
292                         }
293                         catch ( InterruptedException JavaDoc e )
294                         {
295                             // Ignore
296
}
297                     }
298                 }
299             }
300         }
301         finally
302         {
303             // Always make sure the server socket is closed.
304
try
305             {
306                 m_serverSocket.close();
307             }
308             catch ( IOException JavaDoc e )
309             {
310                 getLogger().debug( "Unable to close the server socket.", e );
311             }
312         }
313     }
314     
315     /*---------------------------------------------------------------
316      * Methods
317      *-------------------------------------------------------------*/

318     /**
319      * Handle a newly connected socket. The implementation need not
320      * worry about closing the socket.
321      *
322      * @param socket Newly connected Socket to be handled.
323      */

324     protected abstract void handleSocket( Socket JavaDoc socket );
325     
326     /**
327      * Keeps track of instrumentation related to the life of a socket as well
328      * as handles any errors encountered while handling the socket in the
329      * user code.
330      *
331      * @param socket The socket to be handled.
332      */

333     private void handleSocketInner( Socket JavaDoc socket )
334     {
335         try
336         {
337             try
338             {
339                 handleSocket( socket );
340             }
341             finally
342             {
343                 int openSockets;
344                 synchronized ( m_semaphore )
345                 {
346                     m_openSockets.remove( socket );
347                     openSockets = m_openSockets.size();
348                     getLogger().debug( "Open sockets: " + openSockets );
349                     
350                     // Notify the stop method if it is waiting for this socket to complete.
351
m_semaphore.notify();
352                 }
353                 
354                 // Notify the instrument manager
355
m_instrumentOpenSockets.setValue( openSockets );
356                 m_instrumentDisconnects.increment();
357                 
358                 // Always close the socket.
359
socket.close();
360             }
361         }
362         catch ( Throwable JavaDoc t )
363         {
364             getLogger().error( "Encountered an error while handling socket: " + socket, t );
365         }
366     }
367     
368     protected int getSoTimeout()
369     {
370         return m_soTimeout;
371     }
372 }
373
Popular Tags