1 45 package org.exolab.jms.net.socket; 46 47 import java.io.IOException ; 48 import java.net.InetAddress ; 49 import java.net.ServerSocket ; 50 import java.net.Socket ; 51 52 import org.apache.commons.logging.Log; 53 import org.apache.commons.logging.LogFactory; 54 import EDU.oswego.cs.dl.util.concurrent.PooledExecutor; 55 56 import org.exolab.jms.net.connector.Authenticator; 57 import org.exolab.jms.net.connector.ManagedConnection; 58 import org.exolab.jms.net.connector.ManagedConnectionAcceptor; 59 import org.exolab.jms.net.connector.ManagedConnectionAcceptorListener; 60 import org.exolab.jms.net.connector.ResourceException; 61 import org.exolab.jms.net.connector.URIRequestInfo; 62 import org.exolab.jms.net.uri.URI; 63 import org.exolab.jms.net.util.ThreadPool; 64 65 66 72 public abstract class SocketManagedConnectionAcceptor 73 implements ManagedConnectionAcceptor { 74 75 78 private Authenticator _authenticator; 79 80 83 private ServerSocket _socket; 84 85 88 private final URI _uri; 89 90 93 private PooledExecutor _pool; 94 95 98 private final ThreadGroup _group; 99 100 103 private Dispatcher _dispatcher; 104 105 108 private final SocketRequestInfo _info; 109 110 111 114 private static final Log _log = 115 LogFactory.getLog(SocketManagedConnectionAcceptor.class); 116 117 118 136 public SocketManagedConnectionAcceptor(Authenticator authenticator, 137 SocketRequestInfo info) 138 throws ResourceException { 139 140 if (authenticator == null) { 141 throw new IllegalArgumentException ( 142 "Argument 'authenticator' is null"); 143 } 144 if (info == null) { 145 throw new IllegalArgumentException ("Argument 'info' is null"); 146 } 147 148 _authenticator = authenticator; 149 _uri = info.getURI(); 150 _info = info; 151 int port = info.getPort(); 152 try { 153 InetAddress host = null; 154 if (!info.getBindAll()) { 155 host = info.getHostAddress(); 156 } 157 int backlog = info.getConnectionRequestQueueSize(); 158 _socket = createServerSocket(port, backlog, host); 159 } catch (IOException exception) { 160 throw new ResourceException( 161 "Failed to create server socket for URI=" + info.getURI(), 162 exception); 163 } 164 165 _group = new ThreadGroup (_uri.toString()); 166 StringBuffer name = new StringBuffer (); 167 name.append(_uri.toString()); 168 name.append("[server]"); 169 _pool = new ThreadPool(_group, name.toString(), info.getMaxThreads()); 170 } 171 172 178 public synchronized void accept(ManagedConnectionAcceptorListener listener) 179 throws ResourceException { 180 if (_dispatcher != null) { 181 throw new ResourceException( 182 "Acceptor is already accepting connections on URI=" + _uri); 183 } 184 185 _dispatcher = new Dispatcher(listener); 186 _dispatcher.start(); 187 if (_log.isDebugEnabled()) { 188 _log.debug("Acceptor accepting requests at URI=" + _uri); 189 } 190 } 191 192 197 public SocketRequestInfo getRequestInfo() { 198 return _info; 199 } 200 201 206 public URI getURI() { 207 return _uri; 208 } 209 210 216 public synchronized void close() throws ResourceException { 217 if (_log.isDebugEnabled()) { 218 _log.debug("Acceptor shutting down at URI=" + _uri); 219 } 220 _pool.shutdownNow(); 221 if (_dispatcher != null) { 222 _dispatcher.close(); 224 if (Thread.currentThread() != _dispatcher) { 225 try { 226 _dispatcher.join(); } catch (InterruptedException ignore) { 228 } 230 } 231 _dispatcher = null; 232 _socket = null; 233 } else if (_socket != null) { 234 try { 235 _socket.close(); 236 _socket = null; 237 } catch (IOException exception) { 238 throw new ResourceException("Failed to close socket", 239 exception); 240 } 241 } 242 } 243 244 255 protected ServerSocket createServerSocket(int port, int backlog, 256 InetAddress host) 257 throws IOException { 258 return new ServerSocket (port, backlog, host); 259 } 260 261 272 protected abstract ManagedConnection createManagedConnection( 273 URI uri, Socket socket, Authenticator authenticator, 274 PooledExecutor pool) 275 throws ResourceException; 276 277 280 private class Dispatcher extends Thread { 281 282 285 private final ManagedConnectionAcceptorListener _listener; 286 287 290 private volatile boolean _closed = false; 291 292 297 public Dispatcher(ManagedConnectionAcceptorListener listener) { 298 super(_group, getURI() + "[acceptor]"); 299 _listener = listener; 300 } 301 302 305 public void close() { 306 _closed = true; 307 try { 308 _socket.close(); 309 } catch (IOException exception) { 310 _log.debug(exception); 311 } 312 } 313 314 317 public void run() { 318 while (!_closed) { 319 try { 320 Socket socket = _socket.accept(); 321 ManagedConnection connection = createManagedConnection( 322 _uri, socket, _authenticator, _pool); 323 _listener.accepted(SocketManagedConnectionAcceptor.this, 324 connection); 325 } catch (Exception exception) { 326 if (!_closed) { 327 _listener.error(SocketManagedConnectionAcceptor.this, 328 exception); 329 } 330 break; 331 } 332 } 333 } 334 } 335 } 336 | Popular Tags |