1 20 package org.apache.mina.transport.socket.nio; 21 22 import java.io.IOException ; 23 import java.net.ConnectException ; 24 import java.net.InetSocketAddress ; 25 import java.net.SocketAddress ; 26 import java.nio.channels.SelectionKey ; 27 import java.nio.channels.Selector ; 28 import java.nio.channels.SocketChannel ; 29 import java.util.Queue ; 30 import java.util.Set ; 31 import java.util.concurrent.ConcurrentLinkedQueue ; 32 import java.util.concurrent.Executor ; 33 import java.util.concurrent.atomic.AtomicInteger ; 34 35 import org.apache.mina.common.ConnectFuture; 36 import org.apache.mina.common.ExceptionMonitor; 37 import org.apache.mina.common.IoConnector; 38 import org.apache.mina.common.IoConnectorConfig; 39 import org.apache.mina.common.IoHandler; 40 import org.apache.mina.common.IoServiceConfig; 41 import org.apache.mina.common.support.AbstractIoFilterChain; 42 import org.apache.mina.common.support.BaseIoConnector; 43 import org.apache.mina.common.support.DefaultConnectFuture; 44 import org.apache.mina.util.NamePreservingRunnable; 45 import org.apache.mina.util.NewThreadExecutor; 46 47 53 public class SocketConnector extends BaseIoConnector { 54 private static final AtomicInteger nextId = new AtomicInteger (); 55 56 private final Object lock = new Object (); 57 58 private final int id = nextId.getAndIncrement(); 59 60 private final String threadName = "SocketConnector-" + id; 61 62 private SocketConnectorConfig defaultConfig = new SocketConnectorConfig(); 63 64 private final Queue <ConnectionRequest> connectQueue = new ConcurrentLinkedQueue <ConnectionRequest>(); 65 66 private final SocketIoProcessor[] ioProcessors; 67 68 private final int processorCount; 69 70 private final Executor executor; 71 72 private Selector selector; 73 74 private Worker worker; 75 76 private int processorDistributor = 0; 77 78 private int workerTimeout = 60; 80 83 public SocketConnector() { 84 this(1, new NewThreadExecutor()); 85 } 86 87 93 public SocketConnector(int processorCount, Executor executor) { 94 if (processorCount < 1) { 95 throw new IllegalArgumentException ( 96 "Must have at least one processor"); 97 } 98 99 this.executor = executor; 100 this.processorCount = processorCount; 101 ioProcessors = new SocketIoProcessor[processorCount]; 102 103 for (int i = 0; i < processorCount; i++) { 104 ioProcessors[i] = new SocketIoProcessor( 105 "SocketConnectorIoProcessor-" + id + "." + i, executor); 106 } 107 } 108 109 114 public int getWorkerTimeout() { 115 return workerTimeout; 116 } 117 118 123 public void setWorkerTimeout(int workerTimeout) { 124 if (workerTimeout < 0) { 125 throw new IllegalArgumentException ("Must be >= 0"); 126 } 127 this.workerTimeout = workerTimeout; 128 } 129 130 public ConnectFuture connect(SocketAddress address, IoHandler handler, 131 IoServiceConfig config) { 132 return connect(address, null, handler, config); 133 } 134 135 public ConnectFuture connect(SocketAddress address, 136 SocketAddress localAddress, IoHandler handler, 137 IoServiceConfig config) { 138 if (address == null) 139 throw new NullPointerException ("address"); 140 if (handler == null) 141 throw new NullPointerException ("handler"); 142 143 if (!(address instanceof InetSocketAddress )) 144 throw new IllegalArgumentException ("Unexpected address type: " 145 + address.getClass()); 146 147 if (localAddress != null 148 && !(localAddress instanceof InetSocketAddress )) 149 throw new IllegalArgumentException ( 150 "Unexpected local address type: " + localAddress.getClass()); 151 152 if (config == null) { 153 config = getDefaultConfig(); 154 } 155 156 SocketChannel ch = null; 157 boolean success = false; 158 try { 159 ch = SocketChannel.open(); 160 ch.socket().setReuseAddress(true); 161 if (localAddress != null) { 162 ch.socket().bind(localAddress); 163 } 164 165 ch.configureBlocking(false); 166 167 if (ch.connect(address)) { 168 DefaultConnectFuture future = new DefaultConnectFuture(); 169 newSession(ch, handler, config, future); 170 success = true; 171 return future; 172 } 173 174 success = true; 175 } catch (IOException e) { 176 return DefaultConnectFuture.newFailedFuture(e); 177 } finally { 178 if (!success && ch != null) { 179 try { 180 ch.close(); 181 } catch (IOException e) { 182 ExceptionMonitor.getInstance().exceptionCaught(e); 183 } 184 } 185 } 186 187 ConnectionRequest request = new ConnectionRequest(ch, handler, config); 188 synchronized (lock) { 189 try { 190 startupWorker(); 191 } catch (IOException e) { 192 try { 193 ch.close(); 194 } catch (IOException e2) { 195 ExceptionMonitor.getInstance().exceptionCaught(e2); 196 } 197 198 return DefaultConnectFuture.newFailedFuture(e); 199 } 200 } 201 202 connectQueue.add(request); 203 selector.wakeup(); 204 205 return request; 206 } 207 208 public SocketConnectorConfig getDefaultConfig() { 209 return defaultConfig; 210 } 211 212 218 public void setDefaultConfig(SocketConnectorConfig defaultConfig) { 219 if (defaultConfig == null) { 220 throw new NullPointerException ("defaultConfig"); 221 } 222 this.defaultConfig = defaultConfig; 223 } 224 225 private synchronized void startupWorker() throws IOException { 226 if (worker == null) { 227 selector = Selector.open(); 228 worker = new Worker(); 229 executor.execute(new NamePreservingRunnable(worker)); 230 } 231 } 232 233 private void registerNew() { 234 if (connectQueue.isEmpty()) 235 return; 236 237 for (;;) { 238 ConnectionRequest req = connectQueue.poll(); 239 240 if (req == null) 241 break; 242 243 SocketChannel ch = req.channel; 244 try { 245 ch.register(selector, SelectionKey.OP_CONNECT, req); 246 } catch (IOException e) { 247 req.setException(e); 248 } 249 } 250 } 251 252 private void processSessions(Set <SelectionKey > keys) { 253 for (SelectionKey key : keys) { 254 if (!key.isConnectable()) 255 continue; 256 257 SocketChannel ch = (SocketChannel ) key.channel(); 258 ConnectionRequest entry = (ConnectionRequest) key.attachment(); 259 260 boolean success = false; 261 try { 262 ch.finishConnect(); 263 newSession(ch, entry.handler, entry.config, entry); 264 success = true; 265 } catch (Throwable e) { 266 entry.setException(e); 267 } finally { 268 key.cancel(); 269 if (!success) { 270 try { 271 ch.close(); 272 } catch (IOException e) { 273 ExceptionMonitor.getInstance().exceptionCaught(e); 274 } 275 } 276 } 277 } 278 279 keys.clear(); 280 } 281 282 private void processTimedOutSessions(Set <SelectionKey > keys) { 283 long currentTime = System.currentTimeMillis(); 284 285 for (SelectionKey key : keys) { 286 if (!key.isValid()) 287 continue; 288 289 ConnectionRequest entry = (ConnectionRequest) key.attachment(); 290 291 if (currentTime >= entry.deadline) { 292 entry.setException(new ConnectException ()); 293 try { 294 key.channel().close(); 295 } catch (IOException e) { 296 ExceptionMonitor.getInstance().exceptionCaught(e); 297 } finally { 298 key.cancel(); 299 } 300 } 301 } 302 } 303 304 private void newSession(SocketChannel ch, IoHandler handler, 305 IoServiceConfig config, ConnectFuture connectFuture) 306 throws IOException { 307 SocketSessionImpl session = new SocketSessionImpl(this, 308 nextProcessor(), getListeners(), config, ch, handler, ch 309 .socket().getRemoteSocketAddress()); 310 try { 311 getFilterChainBuilder().buildFilterChain(session.getFilterChain()); 312 config.getFilterChainBuilder().buildFilterChain( 313 session.getFilterChain()); 314 config.getThreadModel().buildFilterChain(session.getFilterChain()); 315 } catch (Throwable e) { 316 throw (IOException ) new IOException ("Failed to create a session.") 317 .initCause(e); 318 } 319 320 session.setAttribute(AbstractIoFilterChain.CONNECT_FUTURE, 323 connectFuture); 324 325 session.getIoProcessor().addNew(session); 327 } 328 329 private SocketIoProcessor nextProcessor() { 330 if (this.processorDistributor == Integer.MAX_VALUE) { 331 this.processorDistributor = Integer.MAX_VALUE % this.processorCount; 332 } 333 334 return ioProcessors[processorDistributor++ % processorCount]; 335 } 336 337 private class Worker implements Runnable { 338 private long lastActive = System.currentTimeMillis(); 339 340 public void run() { 341 Thread.currentThread().setName(SocketConnector.this.threadName); 342 343 for (;;) { 344 try { 345 int nKeys = selector.select(1000); 346 347 registerNew(); 348 349 if (nKeys > 0) { 350 processSessions(selector.selectedKeys()); 351 } 352 353 processTimedOutSessions(selector.keys()); 354 355 if (selector.keys().isEmpty()) { 356 if (System.currentTimeMillis() - lastActive > workerTimeout * 1000L) { 357 synchronized (lock) { 358 if (selector.keys().isEmpty() 359 && connectQueue.isEmpty()) { 360 worker = null; 361 try { 362 selector.close(); 363 } catch (IOException e) { 364 ExceptionMonitor.getInstance() 365 .exceptionCaught(e); 366 } finally { 367 selector = null; 368 } 369 break; 370 } 371 } 372 } 373 } else { 374 lastActive = System.currentTimeMillis(); 375 } 376 } catch (IOException e) { 377 ExceptionMonitor.getInstance().exceptionCaught(e); 378 379 try { 380 Thread.sleep(1000); 381 } catch (InterruptedException e1) { 382 ExceptionMonitor.getInstance().exceptionCaught(e1); 383 } 384 } 385 } 386 } 387 } 388 389 private class ConnectionRequest extends DefaultConnectFuture { 390 private final SocketChannel channel; 391 392 private final long deadline; 393 394 private final IoHandler handler; 395 396 private final IoServiceConfig config; 397 398 private ConnectionRequest(SocketChannel channel, IoHandler handler, 399 IoServiceConfig config) { 400 this.channel = channel; 401 long timeout; 402 if (config instanceof IoConnectorConfig) { 403 timeout = ((IoConnectorConfig) config) 404 .getConnectTimeoutMillis(); 405 } else { 406 timeout = ((IoConnectorConfig) getDefaultConfig()) 407 .getConnectTimeoutMillis(); 408 } 409 this.deadline = System.currentTimeMillis() + timeout; 410 this.handler = handler; 411 this.config = config; 412 } 413 } 414 } | Popular Tags |