1 22 package org.xsocket.stream; 23 24 import java.io.IOException ; 25 import java.lang.annotation.Inherited ; 26 import java.net.InetAddress ; 27 import java.net.InetSocketAddress ; 28 import java.nio.BufferUnderflowException ; 29 import java.nio.ByteBuffer ; 30 import java.nio.channels.WritableByteChannel ; 31 import java.util.concurrent.Executor ; 32 import java.util.logging.Level ; 33 import java.util.logging.Logger ; 34 35 import org.xsocket.ClosedConnectionException; 36 import org.xsocket.MaxReadSizeExceededException; 37 38 39 40 41 83 public final class NonBlockingConnectionPool extends AbstractConnectionPool { 84 85 private static final Logger LOG = Logger.getLogger(NonBlockingConnectionPool.class.getName()); 86 87 public static final long UNLIMITED_TIMEOUT = AbstractConnectionPool.MAX_TIMEOUT; 88 89 90 95 public NonBlockingConnectionPool(long timeToIdleMillis) { 96 this(timeToIdleMillis, Integer.MAX_VALUE, NULL); 97 } 98 99 100 public NonBlockingConnectionPool(long timeToIdleMillis, int maxActive, long maxWaitTimeMillis) { 101 super(timeToIdleMillis, MAX_TIMEOUT, maxActive, maxWaitTimeMillis, MAX_SIZE); 102 } 103 104 105 117 public INonBlockingConnection getNonBlockingConnection(String host, int port) throws IOException , WaitTimeoutException { 118 return (INonBlockingConnection) getConnection(new InetSocketAddress (host, port), null); 119 } 120 121 122 134 public INonBlockingConnection getNonBlockingConnection(InetAddress address, int port) throws IOException , WaitTimeoutException { 135 return (INonBlockingConnection) getConnection(new InetSocketAddress (address, port), null); 136 } 137 138 139 140 153 public INonBlockingConnection getNonBlockingConnection(InetAddress address, int port, IHandler appHandler) throws IOException , WaitTimeoutException { 154 return getNonBlockingConnection(address, port, appHandler, null); 155 } 156 157 158 159 173 public INonBlockingConnection getNonBlockingConnection(InetAddress address, int port, IHandler appHandler, Executor workerPool) throws IOException , WaitTimeoutException { 174 return (INonBlockingConnection) getConnection(new InetSocketAddress (address, port), appHandler, workerPool); 175 } 176 177 178 private INonBlockingConnection getConnection(InetSocketAddress address, IHandler appHandler, Executor workerPool) throws IOException , WaitTimeoutException { 179 PoolableNonBlockingConnection connection = (PoolableNonBlockingConnection) getConnection(address, workerPool); 180 if (connection != null) { 181 connection.setHandler(appHandler); 182 connection.notiyConnect(); 183 return connection; 184 } else { 185 throw new IOException ("couldn't create a connection to " + address); 186 } 187 } 188 189 190 191 192 195 @Override 196 PoolableConnection createConnection(InetSocketAddress address, Executor workerPool) throws IOException { 197 if (workerPool != null) { 198 return new PoolableNonBlockingConnection(address, workerPool); 199 } else { 200 return new PoolableNonBlockingConnection(address); 201 } 202 } 203 204 205 206 private final class PoolableNonBlockingConnection extends PoolableConnection implements INonBlockingConnection { 207 208 public PoolableNonBlockingConnection(InetSocketAddress address) throws IOException { 209 super(NonBlockingConnectionPool.this, new NonBlockingConnection(address.getAddress(), address.getPort(), new HandlerProxy()), address); 210 getHandlerProxy().init(this); 211 } 212 213 public PoolableNonBlockingConnection(InetSocketAddress address, Executor workerPool) throws IOException { 214 super(NonBlockingConnectionPool.this, new NonBlockingConnection(address.getAddress(), address.getPort(), new HandlerProxy(), workerPool), address); 215 getHandlerProxy().init(this); 216 } 217 218 public INonBlockingConnection setOption(String name, Object value) throws IOException { 219 return getUnderlyingConnection().setOption(name, value); 220 } 221 222 void setHandler(IHandler handler) { 223 getHandlerProxy().setHandler(handler); 224 } 225 226 private HandlerProxy getHandlerProxy() { 227 return (HandlerProxy) getUnderlyingConnection().getAppHandler(); 228 } 229 230 void notiyConnect() throws IOException { 231 HandlerProxy hdlProxy = (HandlerProxy) getUnderlyingConnection().getAppHandler(); 232 hdlProxy.notifyConnect(getUnderlyingConnection()); 233 } 234 235 @Override 236 void reset() throws IOException { 237 setHandler(null); 238 239 super.reset(); 240 } 241 242 public void setFlushmode(FlushMode flushMode) { 243 getUnderlyingConnection().setFlushmode(flushMode); 244 } 245 246 public FlushMode getFlushmode() { 247 return getUnderlyingConnection().getFlushmode(); 248 } 249 250 public void setWriteTransferRate(int bytesPerSecond) throws ClosedConnectionException, IOException { 251 getUnderlyingConnection().setWriteTransferRate(bytesPerSecond); 252 } 253 254 public ByteBuffer [] readAvailable() throws IOException , ClosedConnectionException { 255 return getUnderlyingConnection().readAvailable(); 256 } 257 258 public boolean readAvailableByDelimiter(String delimiter, WritableByteChannel outputChannel) throws IOException , ClosedConnectionException { 259 return getUnderlyingConnection().readAvailableByDelimiter(delimiter, outputChannel); 260 } 261 262 public boolean readAvailableByDelimiter(String delimiter, String encoding, WritableByteChannel outputChannel) throws IOException , ClosedConnectionException { 263 return getUnderlyingConnection().readAvailableByDelimiter(delimiter, encoding, outputChannel); 264 } 265 266 public int getNumberOfAvailableBytes() { 267 return getUnderlyingConnection().getNumberOfAvailableBytes(); 268 } 269 270 public int indexOf(String str) { 271 return getUnderlyingConnection().indexOf(str); 272 } 273 274 private NonBlockingConnection getUnderlyingConnection() { 275 return (NonBlockingConnection) getDelegee(); 276 } 277 } 278 279 280 private static final class HandlerProxy implements IDataHandler, IDisconnectHandler, ITimeoutHandler { 281 282 private PoolableNonBlockingConnection poolableConnection = null; 283 284 private IHandler handler = null; 285 private boolean isDataHandler = false; 286 private boolean isConnectHandler = false; 287 private boolean isDisconnectHandler = false; 288 private boolean isTimeoutHandler = false; 289 290 void init(PoolableNonBlockingConnection poolableConnection) { 291 this.poolableConnection = poolableConnection; 292 } 293 294 void setHandler(IHandler handler) { 295 this.handler = handler; 296 297 if (handler != null) { 298 isDataHandler = (handler instanceof IDataHandler); 299 isConnectHandler = (handler instanceof IConnectHandler); 300 isDisconnectHandler = (handler instanceof IDisconnectHandler); 301 isTimeoutHandler = (handler instanceof ITimeoutHandler); 302 } 303 } 304 305 306 void notifyConnect(INonBlockingConnection connection) throws IOException { 307 if (isConnectHandler) { 308 ((IConnectHandler) handler).onConnect(poolableConnection); 309 } 310 } 311 312 public boolean onData(INonBlockingConnection connection) throws IOException , BufferUnderflowException , MaxReadSizeExceededException { 313 if (isDataHandler) { 314 ((IDataHandler) handler).onData(poolableConnection); 315 } 316 return true; 317 } 318 319 320 public boolean onDisconnect(INonBlockingConnection connection) throws IOException { 321 if (isDisconnectHandler) { 322 ((IDisconnectHandler) handler).onDisconnect(poolableConnection); 323 } 324 325 poolableConnection.reallyClose(); 326 return true; 327 } 328 329 330 public boolean onConnectionTimeout(INonBlockingConnection connection) throws IOException { 331 poolableConnection.setReuseable(false); 332 333 boolean isHandled = false; 334 if(isTimeoutHandler) { 335 isHandled = ((ITimeoutHandler) handler).onConnectionTimeout(poolableConnection); 336 } 337 338 if (!isHandled) { 339 poolableConnection.reallyClose(); 340 } 341 342 return true; 343 } 344 345 346 public boolean onIdleTimeout(INonBlockingConnection connection) throws IOException { 347 poolableConnection.setReuseable(false); 348 349 if (LOG.isLoggable(Level.FINE)) { 350 LOG.fine("idle timout occured for pooled connection " + poolableConnection.getId()); 351 } 352 353 boolean isHandled = false; 354 if(isTimeoutHandler) { 355 isHandled = ((ITimeoutHandler) handler).onIdleTimeout(poolableConnection); 356 } 357 358 if (!isHandled) { 359 poolableConnection.reallyClose(); 360 } 361 362 return true; 363 } 364 } 365 } 366 | Popular Tags |