KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > james > util > connection > ServerConnection


1 /***********************************************************************
2  * Copyright (c) 2000-2004 The Apache Software Foundation. *
3  * All rights reserved. *
4  * ------------------------------------------------------------------- *
5  * Licensed under the Apache License, Version 2.0 (the "License"); you *
6  * may not use this file except in compliance with the License. You *
7  * may obtain a copy of the License at: *
8  * *
9  * http://www.apache.org/licenses/LICENSE-2.0 *
10  * *
11  * Unless required by applicable law or agreed to in writing, software *
12  * distributed under the License is distributed on an "AS IS" BASIS, *
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or *
14  * implied. See the License for the specific language governing *
15  * permissions and limitations under the License. *
16  ***********************************************************************/

17
18 package org.apache.james.util.connection;
19
20 import java.io.IOException JavaDoc;
21 import java.io.InterruptedIOException JavaDoc;
22 import java.net.ServerSocket JavaDoc;
23 import java.net.Socket JavaDoc;
24 import java.net.SocketException JavaDoc;
25 import java.util.ArrayList JavaDoc;
26 import java.util.Iterator JavaDoc;
27 import java.util.List JavaDoc;
28
29 import org.apache.avalon.cornerstone.services.connection.ConnectionHandler;
30 import org.apache.avalon.cornerstone.services.connection.ConnectionHandlerFactory;
31 import org.apache.avalon.excalibur.pool.HardResourceLimitingPool;
32 import org.apache.avalon.excalibur.pool.ObjectFactory;
33 import org.apache.avalon.excalibur.pool.Pool;
34 import org.apache.avalon.excalibur.pool.Poolable;
35 import org.apache.avalon.excalibur.thread.ThreadPool;
36 import org.apache.avalon.framework.activity.Disposable;
37 import org.apache.avalon.framework.activity.Initializable;
38 import org.apache.avalon.framework.component.Component;
39 import org.apache.avalon.framework.logger.AbstractLogEnabled;
40 import org.apache.avalon.framework.logger.LogEnabled;
41
42
43 /**
44  * Represents a single server socket managed by a connection manager.
45  * The connection manager will spawn a single ServerConnection for each
46  * server socket that the connection manager is managing.
47  *
48  */

49 public class ServerConnection extends AbstractLogEnabled
50     implements Component, Initializable, Runnable JavaDoc {
51
52     /**
53      * This is a hack to deal with the fact that there appears to be
54      * no platform-independent way to break out of a ServerSocket
55      * accept() call. On some platforms closing either the ServerSocket
56      * itself, or its associated InputStream, causes the accept
57      * method to exit. Unfortunately, this behavior is not consistent
58      * across platforms. To deal with this, we introduce a polling
59      * loop of 20 seconds for the server socket. This introduces a
60      * cost across platforms, but is necessary to maintain cross-platform
61      * functionality.
62      */

63     private static int POLLING_INTERVAL = 20*1000;
64
65     /**
66      * The server socket which this connection is managing
67      */

68     private ServerSocket JavaDoc serverSocket;
69
70     /**
71      * The connection handler factory that generates connection
72      * handlers to manage client connections to this server socket
73      */

74     private ConnectionHandlerFactory handlerFactory;
75
76     /**
77      * The pool that produces ClientConnectionRunners
78      */

79     private Pool runnerPool;
80
81     /**
82      * The factory used to provide ClientConnectionRunner objects
83      */

84     private ObjectFactory theRunnerFactory = new ClientConnectionRunnerFactory();
85
86     /**
87      * The thread pool used to spawn individual threads used to manage each
88      * client connection.
89      */

90     private ThreadPool connThreadPool;
91
92     /**
93      * The timeout for client sockets spawned off this connection.
94      */

95     private int socketTimeout;
96
97     /**
98      * The maximum number of open client connections that this server
99      * connection will allow.
100      */

101     private int maxOpenConn;
102
103     /**
104      * A collection of client connection runners.
105      */

106     private final ArrayList JavaDoc clientConnectionRunners = new ArrayList JavaDoc();
107
108     /**
109      * The thread used to manage this server connection.
110      */

111     private Thread JavaDoc serverConnectionThread;
112
113     /**
114      * The sole constructor for a ServerConnection.
115      *
116      * @param serverSocket the ServerSocket associated with this ServerConnection
117      * @param handlerFactory the factory that generates ConnectionHandlers for the client
118      * connections spawned off this ServerConnection
119      * @param threadPool the ThreadPool used to obtain handler threads
120      * @param timeout the client idle timeout for this ServerConnection's client connections
121      * @param maxOpenConn the maximum number of open client connections allowed for this
122      * ServerConnection
123      */

124     public ServerConnection(ServerSocket JavaDoc serverSocket,
125                             ConnectionHandlerFactory handlerFactory,
126                             ThreadPool threadPool,
127                             int timeout,
128                             int maxOpenConn) {
129         this.serverSocket = serverSocket;
130         this.handlerFactory = handlerFactory;
131         connThreadPool = threadPool;
132         socketTimeout = timeout;
133         this.maxOpenConn = maxOpenConn;
134     }
135
136     /**
137      * @see org.apache.avalon.framework.activity.Initializable#initialize()
138      */

139     public void initialize() throws Exception JavaDoc {
140         runnerPool = new HardResourceLimitingPool(theRunnerFactory, 5, maxOpenConn);
141         if (runnerPool instanceof LogEnabled) {
142             ((LogEnabled)runnerPool).enableLogging(getLogger());
143         }
144         ((Initializable)runnerPool).initialize();
145     }
146
147     /**
148      * The dispose operation is called by the owning ConnectionManager
149      * at the end of its lifecycle. Cleans up the server connection, forcing
150      * everything to finish.
151      */

152     public void dispose() {
153         if (getLogger().isDebugEnabled()) {
154             getLogger().debug("Disposing server connection..." + this.toString());
155         }
156         synchronized( this ) {
157             if( null != serverConnectionThread ) {
158                 // Execution of this block means that the run() method
159
// hasn't finished yet. So we interrupt the thread
160
// to terminate run() and wait for the run() method
161
// to finish. The notifyAll() at the end of run() will
162
// wake this thread and allow dispose() to end.
163
Thread JavaDoc thread = serverConnectionThread;
164                 serverConnectionThread = null;
165                 thread.interrupt();
166                 try {
167                     serverSocket.close();
168                 } catch (IOException JavaDoc ie) {
169                     // Ignored - we're doing this to break out of the
170
// accept. This minimizes the time required to
171
// shutdown the server. Unfortunately, this is
172
// not guaranteed to work on all platforms. See
173
// the comments for POLLING_INTERVAL
174
}
175                 try {
176                     if (POLLING_INTERVAL > 0) {
177                         wait(2L*POLLING_INTERVAL);
178                     } else {
179                         wait();
180                     }
181                 } catch (InterruptedException JavaDoc ie) {
182                     // Expected - just complete dispose()
183
}
184             }
185             if (runnerPool instanceof Disposable) {
186                 ((Disposable)runnerPool).dispose();
187             }
188             runnerPool = null;
189         }
190
191         getLogger().debug("Closed server connection - cleaning up clients - " + this.toString());
192
193         synchronized (clientConnectionRunners) {
194             Iterator JavaDoc runnerIterator = clientConnectionRunners.iterator();
195             while( runnerIterator.hasNext() ) {
196                 ClientConnectionRunner runner = (ClientConnectionRunner)runnerIterator.next();
197                 runner.dispose();
198                 runner = null;
199             }
200             clientConnectionRunners.clear();
201         }
202
203         getLogger().debug("Cleaned up clients - " + this.toString());
204
205     }
206
207     /**
208      * Obtains a ClientConnectionRunner from the runner pool and adds it to the set managed by this ServerConnection object.
209      *
210      * @return the ClientConnectionRunner that was obtained from the pool and added
211      */

212     private ClientConnectionRunner addClientConnectionRunner()
213             throws Exception JavaDoc {
214         synchronized (clientConnectionRunners) {
215             ClientConnectionRunner clientConnectionRunner = (ClientConnectionRunner)runnerPool.get();
216             clientConnectionRunners.add(clientConnectionRunner);
217             if (getLogger().isDebugEnabled()) {
218                 getLogger().debug("Adding one connection for a total of " + clientConnectionRunners.size());
219             }
220             return clientConnectionRunner;
221         }
222     }
223
224     /**
225      * Removes a ClientConnectionRunner from the set managed by this ServerConnection object.
226      *
227      * @param clientConnectionRunner the ClientConnectionRunner to be removed
228      */

229     private void removeClientConnectionRunner(ClientConnectionRunner clientConnectionRunner) {
230         synchronized (clientConnectionRunners) {
231             if (clientConnectionRunners.remove(clientConnectionRunner)) {
232                 if (getLogger().isDebugEnabled()) {
233                     getLogger().debug("Releasing one connection, leaving a total of " + clientConnectionRunners.size());
234                 }
235                 runnerPool.put(clientConnectionRunner);
236             }
237         }
238     }
239
240     /**
241      * Provides the body for the thread of execution for a ServerConnection.
242      * Connections made to the server socket are passed to an appropriate,
243      * newly created, ClientConnectionRunner
244      */

245     public void run() {
246         serverConnectionThread = Thread.currentThread();
247
248         int ioExceptionCount = 0;
249         try {
250             serverSocket.setSoTimeout(POLLING_INTERVAL);
251         } catch (SocketException JavaDoc se) {
252             // Ignored - for the moment
253
}
254
255         if ((getLogger().isDebugEnabled()) && (serverConnectionThread != null)) {
256             StringBuffer JavaDoc debugBuffer =
257                 new StringBuffer JavaDoc(128)
258                     .append(serverConnectionThread.getName())
259                     .append(" is listening on ")
260                     .append(serverSocket.toString());
261             getLogger().debug(debugBuffer.toString());
262         }
263         while( !Thread.currentThread().interrupted() && null != serverConnectionThread ) {
264             try {
265                 Socket JavaDoc clientSocket = null;
266                 try {
267                     clientSocket = serverSocket.accept();
268                 } catch( InterruptedIOException JavaDoc iioe ) {
269                     // This exception is expected upon ServerConnection shutdown.
270
// See the POLLING_INTERVAL comment
271
continue;
272                 } catch( IOException JavaDoc se ) {
273                     if (ioExceptionCount > 0) {
274                         getLogger().error( "Fatal exception while listening on server socket. Terminating connection.", se );
275                         break;
276                     } else {
277                         continue;
278                     }
279                 } catch( SecurityException JavaDoc se ) {
280                     getLogger().error( "Fatal exception while listening on server socket. Terminating connection.", se );
281                     break;
282                 }
283                 ClientConnectionRunner runner = null;
284                 synchronized (clientConnectionRunners) {
285                     if ((maxOpenConn > 0) && (clientConnectionRunners.size() >= maxOpenConn)) {
286                         if (getLogger().isWarnEnabled()) {
287                            getLogger().warn("Maximum number of open connections exceeded - refusing connection. Current number of connections is " + clientConnectionRunners.size());
288                            if (getLogger().isWarnEnabled()) {
289                                Iterator JavaDoc runnerIterator = clientConnectionRunners.iterator();
290                                getLogger().info("Connections: ");
291                                while( runnerIterator.hasNext() ) {
292                                    getLogger().info(" " + ((ClientConnectionRunner)runnerIterator.next()).toString());
293                                }
294                            }
295                         }
296                         try {
297                             clientSocket.close();
298                         } catch (IOException JavaDoc ignored) {
299                             // We ignore this exception, as we already have an error condition.
300
}
301                         continue;
302                     } else {
303                         clientSocket.setSoTimeout(socketTimeout);
304                         runner = addClientConnectionRunner();
305                         runner.setSocket(clientSocket);
306                     }
307                 }
308                 setupLogger( runner );
309                 try {
310                     connThreadPool.execute( runner );
311                 } catch (Exception JavaDoc e) {
312                     // This error indicates that the underlying thread pool
313
// is out of threads. For robustness, we catch this and
314
// cleanup
315
getLogger().error("Internal error - insufficient threads available to service request. " +
316                                       Thread.activeCount() + " threads in service request pool.", e);
317                     try {
318                         clientSocket.close();
319                     } catch (IOException JavaDoc ignored) {
320                         // We ignore this exception, as we already have an error condition.
321
}
322                     // In this case, the thread will not remove the client connection runner,
323
// so we must.
324
removeClientConnectionRunner(runner);
325                 }
326             } catch( IOException JavaDoc ioe ) {
327                 getLogger().error( "Exception accepting connection", ioe );
328             } catch( Throwable JavaDoc e ) {
329                 getLogger().error( "Exception executing client connection runner: " + e.getMessage(), e );
330             }
331         }
332         synchronized( this ) {
333             serverConnectionThread = null;
334             Thread.currentThread().interrupted();
335             notifyAll();
336         }
337     }
338
339     /**
340      * An inner class to provide the actual body of the thread of execution
341      * that occurs upon a client connection.
342      *
343      */

344     class ClientConnectionRunner extends AbstractLogEnabled
345         implements Component, Poolable, Runnable JavaDoc {
346
347         /**
348          * The Socket that this client connection is using for transport.
349          */

350         private Socket JavaDoc clientSocket;
351
352         /**
353          * The thread of execution associated with this client connection.
354          */

355         private Thread JavaDoc clientSocketThread;
356
357         /**
358          * Returns string for diagnostic logging
359          */

360         public String JavaDoc toString() {
361             return getClass().getName() + " for " + clientSocket + " on " + clientSocketThread;
362         }
363
364         public ClientConnectionRunner() {
365         }
366
367         /**
368          * The dispose operation that terminates the runner. Should only be
369          * called by the ServerConnection that owns the ClientConnectionRunner
370          */

371         public void dispose() {
372             synchronized( this ) {
373                 if (null != clientSocketThread) {
374                     // Execution of this block means that the run() method
375
// hasn't finished yet. So we interrupt the thread
376
// to terminate run() and wait for the run() method
377
// to finish. The notifyAll() at the end of run() will
378
// wake this thread and allow dispose() to end.
379
clientSocketThread.interrupt();
380                     clientSocketThread = null;
381                     try {
382                         wait();
383                     } catch (InterruptedException JavaDoc ie) {
384                         // Expected - return from the method
385
}
386                 }
387             }
388         }
389
390         /**
391          * Sets the socket for a ClientConnectionRunner.
392          *
393          * @param socket the client socket associated with this ClientConnectionRunner
394          */

395         public void setSocket(Socket JavaDoc socket) {
396             clientSocket = socket;
397         }
398
399         /**
400          * Provides the body for the thread of execution dealing with a particular client
401          * connection. An appropriate ConnectionHandler is created, applied, executed,
402          * and released.
403          */

404         public void run() {
405             ConnectionHandler handler = null;
406             try {
407                 clientSocketThread = Thread.currentThread();
408
409                 handler = ServerConnection.this.handlerFactory.createConnectionHandler();
410                 String JavaDoc connectionString = null;
411                 if( getLogger().isDebugEnabled() ) {
412                     connectionString = getConnectionString();
413                     String JavaDoc message = "Starting " + connectionString;
414                     getLogger().debug( message );
415                 }
416
417                 handler.handleConnection(clientSocket);
418
419                 if( getLogger().isDebugEnabled() ) {
420                     String JavaDoc message = "Ending " + connectionString;
421                     getLogger().debug( message );
422                 }
423
424             } catch( Throwable JavaDoc e ) {
425                 getLogger().error( "Error handling connection", e );
426             } finally {
427
428                 // Close the underlying socket
429
try {
430                     if (clientSocket != null) {
431                         clientSocket.close();
432                     }
433                 } catch( IOException JavaDoc ioe ) {
434                     getLogger().warn( "Error shutting down connection", ioe );
435                 }
436
437                 clientSocket = null;
438
439                 // Null out the thread, notify other threads to encourage
440
// a context switch
441
synchronized( this ) {
442                     clientSocketThread = null;
443
444                     Thread.currentThread().interrupted();
445
446                     // Release the handler and kill the reference to the handler factory
447
//
448
// This needs to be done after the clientSocketThread is nulled out,
449
// otherwise we could trash a reused ClientConnectionRunner
450
if (handler != null) {
451                         ServerConnection.this.handlerFactory.releaseConnectionHandler( handler );
452                         handler = null;
453                     }
454
455                     // Remove this runner from the list of active connections.
456
ServerConnection.this.removeClientConnectionRunner(this);
457
458                     notifyAll();
459                 }
460             }
461         }
462
463         /**
464          * Helper method to return a formatted string with connection transport information.
465          *
466          * @return a formatted string
467          */

468         private String JavaDoc getConnectionString() {
469             if (clientSocket == null) {
470                 return "invalid socket";
471             }
472             StringBuffer JavaDoc connectionBuffer
473                 = new StringBuffer JavaDoc(256)
474                     .append("connection on ")
475                     .append(clientSocket.getLocalAddress().getHostAddress().toString())
476                     .append(":")
477                     .append(clientSocket.getLocalPort())
478                     .append(" from ")
479                     .append(clientSocket.getInetAddress().getHostAddress().toString())
480                     .append(":")
481                     .append(clientSocket.getPort());
482             return connectionBuffer.toString();
483         }
484     }
485
486     /**
487      * The factory for producing ClientConnectionRunner objects.
488      */

489     private class ClientConnectionRunnerFactory
490         implements ObjectFactory {
491
492         /**
493          * @see org.apache.avalon.excalibur.pool.ObjectFactory#newInstance()
494          */

495         public Object JavaDoc newInstance() throws Exception JavaDoc {
496             return new ClientConnectionRunner();
497         }
498
499         /**
500          * @see org.apache.avalon.excalibur.pool.ObjectFactory#getCreatedClass()
501          */

502         public Class JavaDoc getCreatedClass() {
503             return ClientConnectionRunner.class;
504         }
505
506         /**
507          * @see org.apache.avalon.excalibur.pool.ObjectFactory#decommision(Object)
508          */

509         public void decommission( Object JavaDoc object ) throws Exception JavaDoc {
510             return;
511         }
512     }
513 }
514
515
516
Popular Tags