KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > sapia > ubik > net > mplex > MultiplexServerSocket


1 package org.sapia.ubik.net.mplex;
2
3 import org.sapia.ubik.rmi.server.Log;
4
5 import java.io.IOException JavaDoc;
6
7 import java.net.InetAddress JavaDoc;
8 import java.net.ServerSocket JavaDoc;
9 import java.net.Socket JavaDoc;
10 import java.net.SocketException JavaDoc;
11
12 import java.util.ArrayList JavaDoc;
13 import java.util.Date JavaDoc;
14 import java.util.Iterator JavaDoc;
15 import java.util.List JavaDoc;
16 import org.sapia.ubik.util.Localhost;
17
18
19 /**
20  * This class is the main server socket that multiplexes the traditionnal
21  * <code>java.net.ServerSocket</code>. That means that it listens on the port of the
22  * server socket and it provides a mechanism to register different handlers for
23  * these new incoming socket connections. These handlers are called socket connectors
24  * and they can be used to process multiple types of data streams over the same socket.<p>
25  *
26  * For instance, using this <code>MultiplexServerSocket</code> you could handle serialized
27  * Java objects and HTTP requests using two different connectors (two distinct handling
28  * logic), same host/port. This is achieved through a read-ahead on the incoming stream of data
29  * from a new client socket connection. When a new incoming connection is requested by a client,
30  * all the registered <code>StreamSelector</code>s are used to determine which socket connector
31  * will handle the new connection.<p>
32  *
33  * The internals of this <code>MultiplexServerSocket</code> are simple. An instance of this class
34  * works on an asynchronous process were new socket connections are first accepted, and then selected.
35  * These two steps are described as follows:
36  * <ol>
37  * <li><b>acceptor</b>: This first step involves the low-level logic of virtual machine that resides
38  * under the Java I/O package. When a client connects to the server, a new <code>MultiplexSocket</code>
39  * is created on the server-side; the acceptor adds that new connection/socket to the internal
40  * pending queue.
41  * </li>
42  * <li><b>selector</b>: The selector listens on the queue of accepted connections and performs
43  * the logic of going through all the candidate socket connectors to determine which one of them
44  * handles the new socket connection; if none of the registered connectors is.
45  * </li>
46  * </ol>
47  * To perform this process, the <code>MultiplexServerSocket</code> contains two distinct pools of
48  * threads that are configurable using the <code>setAcceptorDaemonThread(int)</code> and
49  * <code>setSelectorDaemonThread(int)</code> methods.<p>
50  *
51  * The primary goal of the <code>MultiplexServerSocket</code> was to keep the "standard" (non-NIO)
52  * programming model of the JDK regarding socket handling. Whether you use this server socket
53  * directly or you use a socket connector, the logic is still the same for the client that
54  * receives new incoming connections: it calls an <code>accept()</code> method that blocks
55  * until a new connection is made. For integration with actual running systems, an adapter
56  * is available to make a socket connector look like a server socket. We used that strategy
57  * with the open-source Simple HTTP server and it worked transparently and fluidly.
58  *
59  * @see MultiplexSocket
60  * @see MultiplexSocketConnector
61  * @see ServerSocketAdapter
62  * @see StreamSelector
63  * @author <a HREF="mailto:jc@sapia-oss.org">Jean-Cedric Desrochers</a>
64  * <dl>
65  * <dt><b>Copyright:</b><dd>Copyright &#169; 2002-2004 <a HREF="http://www.sapia-oss.org">
66  * Sapia Open Source Software</a>. All Rights Reserved.</dd></dt>
67  * <dt><b>License:</b><dd>Read the license.txt file of the jar or visit the
68  * <a HREF="http://www.sapia-oss.org/license.html" target="sapia-license">license page</a>
69  * at the Sapia OSS web site</dd></dt>
70  * </dl>
71  */

72 public class MultiplexServerSocket extends ServerSocket JavaDoc implements Runnable JavaDoc {
73   /**
74    * The default number of bytes read ahead fom incoming socket connections.
75    */

76   public static final short DEFAULT_READ_AHEAD_BUFFER_SIZE = 64;
77
78   /**
79    * The default number of acceptor daemon thread used to accept connections.
80    */

81   public static final short DEFAULT_ACCEPTOR_DAEMON_THREAD = 1;
82
83   /**
84    * The default number of selector daemon thread used to accept connections.
85    */

86   public static final short DEFAULT_SELECTOR_DAEMON_THREAD = 1;
87
88   /** The list of created connectors that can handle new connections on this server. */
89   private List JavaDoc _theConnectors = new ArrayList JavaDoc();
90
91   /** The default connector that handles incoming connections. */
92   private SocketConnectorImpl _theDefaultConnector;
93
94   /** The list of running acceptor daemon threads. */
95   private List JavaDoc _theAcceptorDaemons = new ArrayList JavaDoc();
96
97   /** The list of running selector daemon threads. */
98   private List JavaDoc _theSelectorDaemons = new ArrayList JavaDoc();
99
100   /** The socket queue of accepted connection. */
101   private SocketQueue _theAcceptedQueue = new SocketQueue();
102
103   /** The number of acceptor daemon threads of this server. */
104   private int _theAcceptorDaemonThread = DEFAULT_ACCEPTOR_DAEMON_THREAD;
105
106   /** The number of selector daemon threads of this server. */
107   private int _theSelectorDaemonThread = DEFAULT_SELECTOR_DAEMON_THREAD;
108
109   /** The number of bytes read ahead when accepting a new connection. */
110   private int _theReadAheadBufferSize = DEFAULT_READ_AHEAD_BUFFER_SIZE;
111   
112   /**
113    * Creates a new MultiplexServerSocket instance. The server
114    * socket is unbound
115    *
116    * @throws IOException If an error occurs opening the socket.
117    */

118   public MultiplexServerSocket() throws IOException JavaDoc {
119     super();
120   }
121
122   /**
123    * Creates a new MultiplexServerSocket instance. The new server will
124    * be bound on the speficied port, or to a free port if the value passed
125    * in is 0 (zero).
126    *
127    * The maximum queue length for incoming connection indications (a
128    * request to connect) is set to <code>50</code>. If a connection
129    * indication arrives when the queue is full, the connection is refused.
130    *
131    * @param port The port number to bind the server or 0 to use any port.
132    * @throws IOException If an error occurs when opening the socket.
133    */

134   public MultiplexServerSocket(int port) throws IOException JavaDoc {
135     super(port, 50, Localhost.getLocalAddress());
136   }
137
138   /**
139    * Creates a new MultiplexServerSocket instance. The new server will
140    * be bound on the speficied port, or to a free port if the value passed
141    * in is 0 (zero).
142    *
143    * The maximum queue length for incoming connection indications (a
144    * request to connect) is set to the <code>backlog</code> parameter. If
145    * a connection indication arrives when the queue is full, the
146    * connection is refused.
147    *
148    * @param port The port number to bind the server or 0 oi use any port.
149    * @param backlog The maximum length of the queue.
150    * @throws IOException If an error occurs when opening the socket.
151    */

152   public MultiplexServerSocket(int port, int backlog) throws IOException JavaDoc {
153     super(port, backlog, Localhost.getLocalAddress());
154   }
155
156   /**
157    * Creates a new MultiplexServerSocket instance. The new server will
158    * be bound on the speficied port, or to a free port if the value passed
159    * in is 0 (zero). The server will use the local IP address represented
160    * by the <code>bindAddr</code> passed in.
161    *
162    * The maximum queue length for incoming connection indications (a
163    * request to connect) is set to the <code>backlog</code> parameter. If
164    * a connection indication arrives when the queue is full, the
165    * connection is refused.
166    *
167    * @param port The port number to bind the server or 0 oi use any port.
168    * @param backlog The maximum length of the queue.
169    * @param bindAddr The local TCP address the server will bind to. If <code>null</code>
170    * the server will accept connections on any/all local addresses.
171    * @throws IOException If an error occurs when opening the socket.
172    */

173   public MultiplexServerSocket(int port, int backlog, InetAddress JavaDoc bindAddr)
174     throws IOException JavaDoc {
175     super(port, backlog, bindAddr);
176   }
177
178   /**
179    * Returns the size of the buffer used to pre-read the incoming bytes of the
180    * accepted connection.
181    *
182    * @return The size of the read ahead buffer size.
183    */

184   public int getReadAheadBufferSize() {
185     return _theReadAheadBufferSize;
186   }
187
188   /**
189    * Changes the size of the read ahead buffer size. This method can only be called before starting
190    * the server (ie. the initial call to the method <code>accept()</code>.
191    *
192    * @param aSize The new size.
193    * @exception IllegalStateException If the server is already running.
194    */

195   public void setReadAheadBufferSize(int aSize) {
196     if (aSize <= 0) {
197       throw new IllegalArgumentException JavaDoc("The size is less than zero");
198     } else if (_theAcceptorDaemons.size() > 0) {
199       throw new IllegalStateException JavaDoc(
200         "Cannot change the read ahead buffer size on a running server socket");
201     }
202
203     _theReadAheadBufferSize = aSize;
204   }
205
206   /**
207    * Returns the number of daemon threads used to accept incoming connections.
208    *
209    * @return The number of daemon threads used to accept incoming connections.
210    */

211   public int getAcceptorDaemonThread() {
212     return _theAcceptorDaemonThread;
213   }
214
215   /**
216    * Returns the number of daemon threads used to select an connector for incoming connections.
217    *
218    * @return The number of daemon threads used to select an connector for incoming connections.
219    */

220   public int getSelectorDaemonThread() {
221     return _theSelectorDaemonThread;
222   }
223
224   /**
225    * Changes the number of daemon threads used to accept incoming connections.
226    *
227    * @param maxThread The new numbe of running daemon.
228    * @exception IllegalStateException If the server is already running.
229    */

230   public void setAcceptorDaemonThread(int maxThread) {
231     if (maxThread <= 0) {
232       throw new IllegalArgumentException JavaDoc("The size is less than zero");
233     } else if (_theAcceptorDaemons.size() > 0) {
234       throw new IllegalStateException JavaDoc(
235         "Cannot change the number of acceptor daemons on a running server socket");
236     }
237
238     _theAcceptorDaemonThread = maxThread;
239   }
240
241   /**
242    * Changes the number of daemon threads used to select connectors for incoming connections.
243    *
244    * @param maxThread The new numbe of running daemon.
245    * @exception IllegalStateException If the server is already running.
246    */

247   public void setSelectorDaemonThread(int maxThread) {
248     if (maxThread <= 0) {
249       throw new IllegalArgumentException JavaDoc("The size is less than zero");
250     } else if (_theSelectorDaemons.size() > 0) {
251       throw new IllegalStateException JavaDoc(
252         "Cannot change the number of selector daemons on a running server socket");
253     }
254
255     _theSelectorDaemonThread = maxThread;
256   }
257
258   /**
259    * This factory method creates a socket connector through which a client will be able
260    * to receive incoming socket connections. The stream selector passed in will be used
261    * y this multiplex server socket to determine if an incoming socket connection must
262    * be handled by the created socket connector.
263    *
264    * @param aSelector The stream selector to assign to the created socket connector.
265    * @return The created socket connector.
266    * @throws IllegalStateException If this server socket is closed.
267    */

268   public synchronized MultiplexSocketConnector createSocketConnector(
269     StreamSelector aSelector) {
270     if (aSelector == null) {
271       throw new IllegalArgumentException JavaDoc("The selector passed in is null");
272     } else if (isClosed()) {
273       throw new IllegalStateException JavaDoc(
274         "Could not create a socket connector, the server socket is closed");
275     }
276
277     MultiplexSocketConnector aConnector = new SocketConnectorImpl(this,
278         aSelector, new SocketQueue());
279     _theConnectors.add(aConnector);
280
281     return aConnector;
282   }
283
284   /**
285    * Internal method that initializes the default connector that gets all the incoming socket
286    * connections. It also creates all the acceptor and selector thread pools according to the
287    * configuration.
288    */

289   private synchronized void initializeDefaultConnector() {
290     if (_theDefaultConnector == null) {
291       // Create the default handler
292
_theDefaultConnector = new SocketConnectorImpl(this,
293           new PositiveStreamSelector(), new SocketQueue());
294
295       // Create the selector daemons
296
for (int i = 1; i <= _theAcceptorDaemonThread; i++) {
297         Thread JavaDoc aDaemon = new Thread JavaDoc(new SelectorTask(),
298             "MultiplexServerSocket-Selector" + i);
299         aDaemon.setDaemon(true);
300         _theSelectorDaemons.add(aDaemon);
301         aDaemon.start();
302       }
303
304       // Create the accceptor daemons
305
for (int i = 1; i <= _theAcceptorDaemonThread; i++) {
306         Thread JavaDoc aDaemon = new Thread JavaDoc(this, "MultiplexServerSocket-Acceptor" + i);
307         aDaemon.setDaemon(true);
308         _theAcceptorDaemons.add(aDaemon);
309         aDaemon.start();
310       }
311     }
312   }
313
314   /**
315    * Removes the socket connector passed in from the list of available connectors
316    * associated with this server socket. After this method the multiplex server socket
317    * will no longer send new incoming socket connection to the socket connector.
318    *
319    * @param aConnector The socket connector to remove from this server socket.
320    */

321   public synchronized void removeSocketConnector(
322     MultiplexSocketConnector aConnector) {
323     if (aConnector == null) {
324       throw new IllegalArgumentException JavaDoc("The connector passed in is null");
325     }
326
327     _theConnectors.remove(aConnector);
328   }
329
330   /**
331    * Extract the first bytes of the multiplex socket passed.
332    *
333    * @param aSocket The socket from which to get the data.
334    * @param aMaxLength The maximum number of bytes to extract.
335    * @return The array of bytes representingthe header.
336    * @throws IOException If an error occurs extracting the header.
337    */

338   private byte[] extractHeader(MultiplexSocket aSocket, int aMaxLength)
339     throws IOException JavaDoc {
340     // Extract the first bytes of the input stream
341
byte[] preview = new byte[aMaxLength];
342     int length = aSocket.getPushbackInputStream().read(preview, 0,
343         preview.length);
344
345     // Put back the headers in the stream
346
aSocket.getPushbackInputStream().unread(preview, 0, length);
347
348     // Trim the array of bytes
349
byte[] header;
350
351     if (length < preview.length) {
352       header = new byte[length];
353       System.arraycopy(preview, 0, header, 0, length);
354     } else {
355       header = preview;
356     }
357
358     return header;
359   }
360
361   /**
362    * Listens for a connection to be made to this socket and accepts it. The default
363    * connector of this server socket will be used to act as the main port of entry
364    * for clients directly using the server socket instead of the default socket
365    * connector. The method blocks until a connection is made.
366    *
367    * @return The new Socket
368    * @exception IOException If an I/O error occurs when waiting for a connection.
369    * @exception SocketTimeoutException if a timeout was previously set with setSoTimeout and
370    * the timeout has been reached.
371    */

372   public Socket JavaDoc accept() throws IOException JavaDoc {
373     if (isClosed()) {
374       throw new SocketException JavaDoc("Socket is closed");
375     } else if (!isBound()) {
376       throw new SocketException JavaDoc("Socket is not bound yet");
377     } else if (_theDefaultConnector == null) {
378       initializeDefaultConnector();
379     }
380
381     return _theDefaultConnector.getQueue().getSocket();
382   }
383
384   /**
385    * Closes this multiplex server socket. If socket connectors are associated to
386    * this server scket then they are closed as well.
387    *
388    * @exception IOException If an I/O error occurs when closing the socket.
389    */

390   public synchronized void close() throws IOException JavaDoc {
391     try {
392       super.close();
393     } finally {
394       if (_theDefaultConnector != null) {
395         _theDefaultConnector.close();
396       }
397
398       // May close the acceptor and selector threads here!!!
399
if (_theConnectors != null) {
400         // To avoid concurrent modifs when removing
401
ArrayList JavaDoc someConnectors = new ArrayList JavaDoc(_theConnectors);
402
403         for (Iterator JavaDoc it = someConnectors.iterator(); it.hasNext();) {
404           MultiplexSocketConnector aConnector = (MultiplexSocketConnector) it.next();
405           aConnector.close();
406         }
407       }
408     }
409   }
410
411   /**
412    * Returns the implementation address and implementation port of
413    * this socket as a <code>String</code>.
414    *
415    * @return A string representation of this socket.
416    */

417   public String JavaDoc toString() {
418     StringBuffer JavaDoc aBuffer = new StringBuffer JavaDoc();
419     aBuffer.append("MultiplexServerSocket[").append(super.toString()).append("]");
420
421     return aBuffer.toString();
422   }
423
424   /**
425    * Implements the Runnable interface and it performs the asynchronous acceptor logic
426    * of the multiplex. Used by the acceptor threads, it listens on the underlying
427    * server socket and dispacthes all the incoming socket connections to the queue
428    * of accepted socket connection. The client socket connections put in that queue are
429    * then processed asynchronously by the selector threads.
430    */

431   public void run() {
432     try {
433       Log.warning(getClass(),
434         new Date JavaDoc() + " [" + Thread.currentThread().getName() +
435         "] MultiplexServerSocket * REPORT * Starting this acceptor thread");
436
437       // Loop for accepting incoming client socket connection
438
while (!isClosed() && !Thread.interrupted()) {
439         try {
440           // Wait for a connection
441
MultiplexSocket aClient = new MultiplexSocket(null,
442               _theReadAheadBufferSize);
443           implAccept(aClient);
444
445           _theAcceptedQueue.add(aClient);
446         } catch (IOException JavaDoc ioe) {
447           _theDefaultConnector.getQueue().setException(ioe);
448         }
449       }
450     } catch (Exception JavaDoc e) {
451       Log.error(getClass(),
452         new Date JavaDoc() + " [" + Thread.currentThread().getName() +
453         "] MultiplexServerSocket * ERROR * An unhandled exception occured in this acceptor thread",
454         e);
455     } finally {
456       Log.warning(getClass(),
457         new Date JavaDoc() + " [" + Thread.currentThread().getName() +
458         "] MultiplexServerSocket * REPORT * Stopping this acceptor thread");
459     }
460   }
461
462   /**
463    * This utility method is used to select the socket connector that requests the new
464    * client socket connection passed in. Note that this method will only use as candidate
465    * selectors the one that are registered with this server socket (and not the default one).
466    *
467    * @param aClient The client socket connection for which to select a connector.
468    * @return The first connector that selects the socket connection, or <code>null</code>
469    * if no connector selects the client socket r if there is no registered connector.
470    * @exception IOException If an error occurs selecting a connector for the socket.
471    */

472   private SocketConnectorImpl selectConnector(MultiplexSocket aClient)
473     throws IOException JavaDoc {
474     SocketConnectorImpl aConnector = null;
475
476     if (_theConnectors.size() > 0) {
477       // Extract the first bytes of the socket input stream
478
byte[] header = extractHeader(aClient, _theReadAheadBufferSize);
479
480       // Select the right connector
481
synchronized (this) {
482         for (Iterator JavaDoc it = _theConnectors.iterator(); it.hasNext();) {
483           SocketConnectorImpl aCandidate = (SocketConnectorImpl) it.next();
484
485           if (aCandidate.getSelector().selectStream(header)) {
486             aConnector = aCandidate;
487
488             break;
489           }
490         }
491       }
492     }
493
494     return aConnector;
495   }
496
497   /**
498    * This class is responsible of the selection logic of the multiplex.
499    */

500   public class SelectorTask implements Runnable JavaDoc {
501     /**
502      * This method waits on the queue of accepted client socket connection
503      * and will processes asynchonously, using the selector threads, to the election
504      * of the appropriate socket selector for the new accepted socket connection.
505      */

506     public void run() {
507       try {
508         Log.warning(getClass(),
509           new Date JavaDoc() + " [" + Thread.currentThread().getName() +
510           "] MultiplexServerSocket * REPORT * Starting this selector thread");
511
512         // Loop for selecting new client socket connections
513
while (!isClosed() && !Thread.interrupted()) {
514           MultiplexSocket aSocket;
515           SocketConnectorImpl aConnector;
516
517           try {
518             // Get the next accepted client socket
519
aSocket = (MultiplexSocket) _theAcceptedQueue.getSocket();
520
521             // Selects a registered connector
522
aConnector = selectConnector(aSocket);
523
524             // If no connector selected, use the default one
525
if (aConnector == null) {
526               _theDefaultConnector.getQueue().add(aSocket);
527             } else {
528               aConnector.getQueue().add(aSocket);
529             }
530           } catch (IOException JavaDoc ioe) {
531             _theDefaultConnector.getQueue().setException(ioe);
532           }
533         }
534       } catch (Exception JavaDoc e) {
535         Log.error(new Date JavaDoc() + " [" + Thread.currentThread().getName() +
536           "] MultiplexServerSocket * ERROR * An unhandled exception occured in this selector thread",
537           e);
538       } finally {
539         Log.warning(getClass(),
540           new Date JavaDoc() + " [" + Thread.currentThread().getName() +
541           "] MultiplexServerSocket * REPORT * Stopping this selector thread");
542       }
543     }
544   }
545 }
546
Popular Tags