package org.xsocket.stream.io.impl;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.rmi.server.UID;
import java.util.LinkedList;
import java.util.Map;
import java.util.Random;
import java.util.Timer;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;

import javax.net.ssl.SSLContext;

import org.xsocket.stream.io.spi.IAcceptor;
import org.xsocket.stream.io.spi.IAcceptorCallback;
import org.xsocket.stream.io.spi.IClientIoProvider;
import org.xsocket.stream.io.spi.IIoHandler;
import org.xsocket.stream.io.spi.IIoHandlerContext;
import org.xsocket.stream.io.spi.IServerIoProvider;


/**
 * I/O provider that creates acceptors and the I/O handler chains used by
 * client and server side connections.
 *
 * <p>A handler chain is built by {@link #createIoHandler}: an
 * {@code IoSocketHandler} at the bottom, optionally wrapped by an SSL handler
 * and, for multithreaded contexts, by an {@code IoMultithreadedHandler}.
 *
 * <p>The read buffer configuration is read once, at class initialization,
 * from the system properties named by the {@code *_KEY} constants.
 */
public final class IoProvider implements IClientIoProvider, IServerIoProvider {

    private static final Logger LOG = Logger.getLogger(IoProvider.class.getName());

    private static final Timer TIMER = new Timer("xIoTimer", true);

    // lazily created by getClientDispatcher(), guarded by the class lock
    private static IoSocketDispatcher globalDispatcher = null;

    public static final int DEFAULT_READ_BUFFER_PREALLOCATION_SIZE = 65536;
    public static final boolean DEFAULT_USE_DIRECT_BUFFER = true;
    public static final String USE_DIRECT_READ_BUFFER_CLIENT_KEY = "org.xsocket.stream.UseDirectReadBufferClient";
    public static final String READ_BUFFER_PREALLOCATIONSIZE_CLIENT_KEY = "org.xsocket.stream.ReadBufferPreallocationsizeClient";
    public static final String USE_DIRECT_READ_BUFFER_SERVER_KEY = "org.xsocket.stream.UseDirectReadBufferServer";
    public static final String READ_BUFFER_PREALLOCATIONSIZE_SERVER_KEY = "org.xsocket.stream.ReadBufferPreallocationsizeServer";

    private static final Boolean useDirectReadBufferClient = readUseDirectBufferProperty(USE_DIRECT_READ_BUFFER_CLIENT_KEY);
    private static final int readBufferPreallocationsizeClient = readPreallocationSizeProperty(READ_BUFFER_PREALLOCATIONSIZE_CLIENT_KEY);
    private static final Boolean useDirectReadBufferServer = readUseDirectBufferProperty(USE_DIRECT_READ_BUFFER_SERVER_KEY);
    private static final int readBufferPreallocationsizeServer = readPreallocationSizeProperty(READ_BUFFER_PREALLOCATIONSIZE_SERVER_KEY);

    // prefix shared by all connection ids created by this class
    private static final String idPrefix = createIdPrefix();

    private final IMemoryManager sslMemoryManagerServer = new SynchronizedMemoryManager(readBufferPreallocationsizeServer, useDirectReadBufferServer);
    private final IMemoryManager sslMemoryManagerClient = new SynchronizedMemoryManager(readBufferPreallocationsizeClient, useDirectReadBufferClient);

    private final AtomicInteger nextId = new AtomicInteger();


    /**
     * Reads a "use direct read buffer" system property.
     *
     * <p>Only {@code true} and {@code false} (case-insensitive) are accepted.
     * Any other value is reported with a warning and direct buffers are used.
     *
     * @param key the system property name
     * @return the configured value, or {@code Boolean.TRUE} if the value is invalid
     */
    private static Boolean readUseDirectBufferProperty(String key) {
        String value = System.getProperty(key, Boolean.toString(DEFAULT_USE_DIRECT_BUFFER));

        if ("true".equalsIgnoreCase(value)) {
            return Boolean.TRUE;
        }
        if ("false".equalsIgnoreCase(value)) {
            return Boolean.FALSE;
        }

        LOG.warning("invalid value for system property " + key + ": "
                  + value + " (valid is true|false)"
                  + " using direct buffer");
        return Boolean.TRUE;
    }


    /**
     * Reads a read buffer preallocation size system property.
     *
     * @param key the system property name
     * @return the configured size, or {@link #DEFAULT_READ_BUFFER_PREALLOCATION_SIZE}
     *         if the value is not a parsable integer
     */
    private static int readPreallocationSizeProperty(String key) {
        String value = System.getProperty(key, Integer.toString(DEFAULT_READ_BUFFER_PREALLOCATION_SIZE));

        try {
            return Integer.parseInt(value);
        } catch (NumberFormatException nfe) {
            LOG.warning("invalid value for system property " + key + ": "
                      + value
                      + " using default size " + DEFAULT_READ_BUFFER_PREALLOCATION_SIZE);
            return DEFAULT_READ_BUFFER_PREALLOCATION_SIZE;
        }
    }


    /**
     * Builds the connection id prefix from the hash of the local host name (or
     * of a fresh {@link UID} if the host name cannot be determined), the
     * current time and a non-negative random number, each rendered as hex.
     *
     * @return the id prefix
     */
    private static String createIdPrefix() {
        String base;
        try {
            base = InetAddress.getLocalHost().getCanonicalHostName();
        } catch (Exception e) {
            // best effort: any failure to resolve the local host falls back to a UID
            base = new UID().toString();
        }

        int random = new Random().nextInt(Integer.MAX_VALUE);

        return Integer.toHexString(base.hashCode())
             + "." + Long.toHexString(System.currentTimeMillis())
             + "." + Integer.toHexString(random);
    }


    /**
     * Creates an acceptor and applies the given options to it.
     *
     * @param callback       the acceptor callback
     * @param handlerContext the handler context
     * @param address        the address passed to the acceptor
     * @param backlog        the backlog passed to the acceptor
     * @param options        the options to set on the acceptor ({@code null} is treated as empty)
     * @return the acceptor
     * @throws IOException if creating or configuring the acceptor fails
     */
    public IAcceptor createAcceptor(IAcceptorCallback callback, IIoHandlerContext handlerContext, InetSocketAddress address, int backlog, Map<String, Object> options) throws IOException {
        Acceptor acceptor = new Acceptor(callback, handlerContext, address, backlog);
        applyOptions(acceptor, options);
        return acceptor;
    }


    /**
     * Creates an SSL capable acceptor and applies the given options to it.
     *
     * @param callback       the acceptor callback
     * @param handlerContext the handler context
     * @param address        the address passed to the acceptor
     * @param backlog        the backlog passed to the acceptor
     * @param options        the options to set on the acceptor ({@code null} is treated as empty)
     * @param sslContext     the SSL context passed to the acceptor
     * @param sslOn          the SSL flag passed to the acceptor
     * @return the acceptor
     * @throws IOException if creating or configuring the acceptor fails
     */
    public IAcceptor create(IAcceptorCallback callback, IIoHandlerContext handlerContext, InetSocketAddress address, int backlog, Map<String, Object> options, SSLContext sslContext, boolean sslOn) throws IOException {
        Acceptor acceptor = new Acceptor(callback, handlerContext, address, backlog, sslContext, sslOn);
        applyOptions(acceptor, options);
        return acceptor;
    }


    /**
     * Sets every entry of the option map on the acceptor.
     */
    private static void applyOptions(Acceptor acceptor, Map<String, Object> options) throws IOException {
        if (options == null) {
            return;
        }

        for (Entry<String, Object> entry : options.entrySet()) {
            acceptor.setOption(entry.getKey(), entry.getValue());
        }
    }


    /**
     * Opens a connection to the remote address and creates a plain (non SSL)
     * client side handler chain for it.
     *
     * @param ctx           the handler context
     * @param remoteAddress the address to connect to
     * @param options       the socket options ({@code null} is treated as empty)
     * @return the head of the handler chain
     * @throws IOException if the socket cannot be configured or connected
     */
    public IIoHandler createClientIoHandler(IIoHandlerContext ctx, InetSocketAddress remoteAddress, Map<String, Object> options) throws IOException {
        return createIoHandler(ctx, true, getClientDispatcher(), openSocket(remoteAddress, options), null, false);
    }


    /**
     * Opens a connection to the remote address and creates an SSL capable
     * client side handler chain for it.
     *
     * @param ctx           the handler context
     * @param remoteAddress the address to connect to
     * @param options       the socket options ({@code null} is treated as empty)
     * @param sslContext    the SSL context
     * @param sslOn         true to wrap the chain in an {@code IoSSLHandler},
     *                      false to use an {@code IoActivateableSSLHandler}
     * @return the head of the handler chain
     * @throws IOException if the socket cannot be configured or connected
     */
    public IIoHandler createSSLClientIoHandler(IIoHandlerContext ctx, InetSocketAddress remoteAddress, Map<String, Object> options, SSLContext sslContext, boolean sslOn) throws IOException {
        return createIoHandler(ctx, true, getClientDispatcher(), openSocket(remoteAddress, options), sslContext, sslOn);
    }


    /**
     * Builds the handler chain for a channel.
     *
     * <p>The connection id is the shared id prefix followed by {@code ".c."}
     * (client) or {@code ".s."} (server) and a running number.
     *
     * @param ctx        the handler context
     * @param isClient   true for a client side connection
     * @param dispatcher the dispatcher passed to the socket handler
     * @param channel    the underlying channel
     * @param sslContext the SSL context, or {@code null} for a chain without SSL handler
     * @param sslOn      selects {@code IoSSLHandler} (true) or
     *                   {@code IoActivateableSSLHandler} (false); only used if
     *                   {@code sslContext} is not {@code null}
     * @return the head of the handler chain
     * @throws IOException if a handler cannot be created
     */
    IIoHandler createIoHandler(IIoHandlerContext ctx, boolean isClient, IoSocketDispatcher dispatcher, SocketChannel channel, SSLContext sslContext, boolean sslOn) throws IOException {

        String connectionId = idPrefix + (isClient ? ".c." : ".s.") + nextId.incrementAndGet();

        ChainableIoHandler ioHandler = new IoSocketHandler(channel, dispatcher, ctx, connectionId);

        if (sslContext != null) {
            IMemoryManager mm = isClient ? sslMemoryManagerClient : sslMemoryManagerServer;

            if (sslOn) {
                ioHandler = new IoSSLHandler(ioHandler, sslContext, isClient, mm);
            } else {
                ioHandler = new IoActivateableSSLHandler(ioHandler, sslContext, isClient, mm);
            }
        }

        if (ctx.isMultithreaded()) {
            ioHandler = new IoMultithreadedHandler(ioHandler, ctx);
        }

        return ioHandler;
    }


    /**
     * Sets or removes the write throttling of a handler chain.
     *
     * <p>For {@code UNLIMITED} an existing {@code IoThrottledWriteHandler} is
     * flushed and its successor is returned; if there is none the given
     * handler is returned unchanged. For any other rate an existing throttled
     * handler is reused, otherwise a new one is put in front of the given
     * handler, and the throttled handler is returned.
     *
     * <p>NOTE(review): the throttled handler (or its successor) is returned
     * even if it was found further down the chain, which would drop the
     * handlers in front of it - confirm it is always the head of the chain.
     *
     * @param ioHandler      the head of the handler chain
     * @param bytesPerSecond the write rate or {@code UNLIMITED}
     * @return the handler to use from now on
     * @throws IOException if the handler is not a {@code ChainableIoHandler}
     *                     or flushing fails
     */
    public IIoHandler setWriteTransferRate(IIoHandler ioHandler, int bytesPerSecond) throws IOException {

        ChainableIoHandler head = asChainable(ioHandler);
        IoThrottledWriteHandler delayWriter = (IoThrottledWriteHandler) getHandler(head, IoThrottledWriteHandler.class);

        if (bytesPerSecond == UNLIMITED) {
            if (delayWriter == null) {
                return ioHandler;
            }

            delayWriter.flushOutgoing();
            ChainableIoHandler successor = delayWriter.getSuccessor();
            return successor;
        }

        if (delayWriter == null) {
            delayWriter = new IoThrottledWriteHandler(head);
        }

        delayWriter.setWriteRateSec(bytesPerSecond);
        return delayWriter;
    }


    /**
     * Delegates to the {@code IoActivateableSSLHandler} of the chain.
     *
     * @param ioHandler the head of the handler chain
     * @return the result of the activatable handler, or false (after logging a
     *         warning) if the chain contains no such handler
     * @throws IOException if the handler is not a {@code ChainableIoHandler}
     *                     or the delegate fails
     */
    public boolean preStartSecuredMode(IIoHandler ioHandler) throws IOException {
        ChainableIoHandler head = asChainable(ioHandler);

        IoActivateableSSLHandler activateableHandler = (IoActivateableSSLHandler) getHandler(head, IoActivateableSSLHandler.class);
        if (activateableHandler == null) {
            LOG.warning("connection is not SSL activatable (non IoActivateableHandler in chain");
            return false;
        }

        return activateableHandler.preStartSecuredMode();
    }


    /**
     * Flushes the chain and starts the secured mode on its
     * {@code IoActivateableSSLHandler}. If the chain contains no such handler
     * only a warning is logged.
     *
     * @param ioHandler the head of the handler chain
     * @param buffers   the buffers handed to the activatable handler
     * @throws IOException if the handler is not a {@code ChainableIoHandler},
     *                     or flushing or the delegate fails
     */
    public void startSecuredMode(IIoHandler ioHandler, LinkedList<ByteBuffer> buffers) throws IOException {
        ChainableIoHandler head = asChainable(ioHandler);
        head.flushOutgoing();

        IoActivateableSSLHandler activateableHandler = (IoActivateableSSLHandler) getHandler(head, IoActivateableSSLHandler.class);
        if (activateableHandler != null) {
            activateableHandler.startSecuredMode(buffers);
        } else {
            LOG.warning("connection is not SSL activatable (non IoActivateableHandler in chain");
        }
    }


    /**
     * Narrows a handler to {@code ChainableIoHandler}.
     *
     * @throws IOException if the handler is {@code null} or of another type
     */
    private static ChainableIoHandler asChainable(IIoHandler ioHandler) throws IOException {
        if (ioHandler instanceof ChainableIoHandler) {
            return (ChainableIoHandler) ioHandler;
        }

        throw new IOException("only ioHandler of tpye " + ChainableIoHandler.class.getName() + " are supported");
    }


    static Timer getTimer() {
        return TIMER;
    }


    static boolean isUseDirectReadBufferServer() {
        return useDirectReadBufferServer;
    }


    static int getReadBufferPreallocationsizeServer() {
        return readBufferPreallocationsizeServer;
    }


    /**
     * Opens a socket channel, applies the options and connects it.
     *
     * <p>The channel is closed again if configuring or connecting fails, so a
     * failed attempt does not leak the underlying socket.
     *
     * @param remoteAddress the address to connect to
     * @param options       the socket options ({@code null} is treated as empty)
     * @return the connected channel
     * @throws IOException if the socket cannot be configured or connected
     */
    private static SocketChannel openSocket(InetSocketAddress remoteAddress, Map<String, Object> options) throws IOException {
        SocketChannel channel = SocketChannel.open();
        boolean connected = false;

        try {
            if (options != null) {
                for (Entry<String, Object> entry : options.entrySet()) {
                    setOption(channel.socket(), entry.getKey(), entry.getValue());
                }
            }

            try {
                channel.socket().connect(remoteAddress);
            } catch (IOException ioe) {
                if (LOG.isLoggable(Level.FINE)) {
                    LOG.fine("error occured by bindung socket to remote address " + remoteAddress + " " + ioe.toString());
                }
                throw ioe;
            }

            connected = true;
            return channel;

        } finally {
            // covers checked and unchecked failures (e.g. an option value of the wrong type)
            if (!connected) {
                closeQuietly(channel);
            }
        }
    }


    /**
     * Closes the channel; a failure to close is only logged because the
     * original error is the one the caller needs to see.
     */
    private static void closeQuietly(SocketChannel channel) {
        try {
            channel.close();
        } catch (IOException ioe) {
            if (LOG.isLoggable(Level.FINE)) {
                LOG.fine("error occured by closing channel " + ioe.toString());
            }
        }
    }


    /**
     * Sets a single socket option. Unknown option names and unsupported
     * {@code SO_LINGER} values are reported with a warning and otherwise
     * ignored.
     *
     * @param socket the socket to configure
     * @param name   the option name (one of the {@code IClientIoProvider} option constants)
     * @param value  the option value; its type has to match the option
     * @throws IOException if the socket rejects the option
     */
    private static void setOption(Socket socket, String name, Object value) throws IOException {

        if (name.equals(IClientIoProvider.SO_RCVBUF)) {
            socket.setReceiveBufferSize((Integer) value);

        } else if (name.equals(IClientIoProvider.SO_REUSEADDR)) {
            socket.setReuseAddress((Boolean) value);

        } else if (name.equals(IClientIoProvider.SO_SNDBUF)) {
            socket.setSendBufferSize((Integer) value);

        } else if (name.equals(IClientIoProvider.SO_KEEPALIVE)) {
            socket.setKeepAlive((Boolean) value);

        } else if (name.equals(IClientIoProvider.TCP_NODELAY)) {
            socket.setTcpNoDelay((Boolean) value);

        } else if (name.equals(IClientIoProvider.SO_LINGER)) {
            if (value instanceof Integer) {
                // an integer enables linger with the given timeout
                socket.setSoLinger(true, (Integer) value);
            } else if (Boolean.FALSE.equals(value)) {
                // FALSE disables linger
                socket.setSoLinger(false, 0);
            } else {
                LOG.warning("option " + name + " does not support value " + value);
            }

        } else {
            LOG.warning("option " + name + " is not supported");
        }
    }


    /**
     * Walks the chain from the given head along the successors and returns
     * the first handler whose class is exactly the given one.
     *
     * @param head  the handler to start with (may be {@code null})
     * @param clazz the exact class to look for
     * @return the matching handler or {@code null} if there is none
     */
    private static ChainableIoHandler getHandler(ChainableIoHandler head, Class<?> clazz) {
        for (ChainableIoHandler handler = head; handler != null; handler = handler.getSuccessor()) {
            if (handler.getClass() == clazz) {
                return handler;
            }
        }

        return null;
    }


    /**
     * Returns the dispatcher shared by all client connections. On first use
     * the dispatcher is created and started in its own daemon thread.
     *
     * @return the client dispatcher
     */
    private static synchronized IoSocketDispatcher getClientDispatcher() {
        if (globalDispatcher == null) {
            globalDispatcher = new IoSocketDispatcher(new UnsynchronizedMemoryManager(readBufferPreallocationsizeClient, useDirectReadBufferClient));
            Thread t = new Thread(globalDispatcher);
            t.setName(IoSocketDispatcher.DISPATCHER_PREFIX + "#" + "CLIENT");
            t.setDaemon(true);
            t.start();

            if (LOG.isLoggable(Level.FINE)) {
                LOG.fine("client dispatcher created (readbuffer preallocation size=" + readBufferPreallocationsizeClient + ", useDirectBuffer=" + useDirectReadBufferClient + ")");
            }
        }
        return globalDispatcher;
    }
}