package org.apache.mina.transport.socket.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.mina.common.ExceptionMonitor;
import org.apache.mina.common.IoAcceptor;
import org.apache.mina.common.IoHandler;
import org.apache.mina.common.IoServiceConfig;
import org.apache.mina.common.support.BaseIoAcceptor;
import org.apache.mina.util.NamePreservingRunnable;
import org.apache.mina.util.NewThreadExecutor;

/**
 * {@link IoAcceptor} that accepts incoming connections on NIO
 * {@link ServerSocketChannel}s.
 * <p>
 * Bind and unbind requests are queued and carried out by a single worker
 * task that owns the {@link Selector}. The worker is started on demand and
 * terminates (closing the selector) once no channel is registered and both
 * request queues are empty. Accepted connections are handed to one of the
 * {@link SocketIoProcessor}s in round-robin order.
 */
public class SocketAcceptor extends BaseIoAcceptor {
    /** Source of the unique id used in thread and processor names. */
    private static final AtomicInteger nextId = new AtomicInteger();

    private final Executor executor;

    /**
     * Guards the life cycle of {@link #worker} and {@link #selector}:
     * creation in {@link #startupWorker()}, request submission in
     * {@code bind}/{@code unbind}, and the worker's shutdown decision.
     */
    private final Object lock = new Object();

    private final int id = nextId.getAndIncrement();

    private final String threadName = "SocketAcceptor-" + id;

    private SocketAcceptorConfig defaultConfig = new SocketAcceptorConfig();

    /** Currently bound channels, keyed by their local address. */
    private final Map<SocketAddress, ServerSocketChannel> channels = new ConcurrentHashMap<SocketAddress, ServerSocketChannel>();

    /** Pending bind requests, consumed by the worker. */
    private final Queue<RegistrationRequest> registerQueue = new ConcurrentLinkedQueue<RegistrationRequest>();

    /** Pending unbind requests, consumed by the worker. */
    private final Queue<CancellationRequest> cancelQueue = new ConcurrentLinkedQueue<CancellationRequest>();

    private final SocketIoProcessor[] ioProcessors;

    private final int processorCount;

    /** Selector of the running worker; {@code null} while no worker runs. */
    private Selector selector;

    /** The running worker; {@code null} while no worker runs. */
    private Worker worker;

    /** Round-robin counter used by {@link #nextProcessor()}. */
    private int processorDistributor = 0;

    /**
     * Creates an acceptor with a single I/O processor that uses a
     * {@link NewThreadExecutor}.
     */
    public SocketAcceptor() {
        this(1, new NewThreadExecutor());
    }

    /**
     * Creates an acceptor.
     *
     * @param processorCount number of {@link SocketIoProcessor}s to create;
     *                       must be at least 1
     * @param executor       executor that runs the acceptor worker and is
     *                       passed to every I/O processor
     * @throws IllegalArgumentException if {@code processorCount} is less
     *                                  than 1
     */
    public SocketAcceptor(int processorCount, Executor executor) {
        if (processorCount < 1) {
            throw new IllegalArgumentException(
                    "Must have at least one processor");
        }

        // Sessions created with the default configuration reuse addresses.
        defaultConfig.getSessionConfig().setReuseAddress(true);

        this.executor = executor;
        this.processorCount = processorCount;
        ioProcessors = new SocketIoProcessor[processorCount];

        for (int i = 0; i < processorCount; i++) {
            ioProcessors[i] = new SocketIoProcessor(
                    "SocketAcceptorIoProcessor-" + id + "." + i, executor);
        }
    }

    /**
     * Binds to the given address and dispatches connections accepted on it
     * to {@code handler}. The call waits until the worker has processed the
     * request; if the waiting thread is interrupted, the interruption is
     * reported to the {@link ExceptionMonitor} and waiting stops.
     *
     * @param address local address to bind; must be an
     *                {@link InetSocketAddress} or {@code null}. When it is
     *                {@code null} or its port is 0, the address actually
     *                assigned to the server socket is used for bookkeeping.
     * @param handler handler for sessions accepted on this address
     * @param config  service configuration; {@code null} selects
     *                {@link #getDefaultConfig()}
     * @throws NullPointerException     if {@code handler} is {@code null}
     * @throws IllegalArgumentException if {@code address} is not an
     *                                  {@link InetSocketAddress}
     * @throws IOException              if the selector cannot be opened or
     *                                  the worker failed to bind
     */
    public void bind(SocketAddress address, IoHandler handler,
            IoServiceConfig config) throws IOException {
        if (handler == null) {
            throw new NullPointerException("handler");
        }

        if (address != null && !(address instanceof InetSocketAddress)) {
            throw new IllegalArgumentException("Unexpected address type: "
                    + address.getClass());
        }

        if (config == null) {
            config = getDefaultConfig();
        }

        RegistrationRequest request = new RegistrationRequest(address, handler,
                config);

        // Start-up, enqueue and wakeup happen atomically with respect to the
        // worker's shutdown check (which also takes 'lock'). Otherwise the
        // worker could finish the request, find nothing left to do and null
        // out 'selector' before wakeup() is invoked here. Enqueueing only
        // after a successful start-up also avoids leaving a request behind
        // when opening the selector fails.
        synchronized (lock) {
            startupWorker();

            registerQueue.add(request);

            selector.wakeup();
        }

        try {
            request.done.await();
        } catch (InterruptedException e) {
            ExceptionMonitor.getInstance().exceptionCaught(e);
        }

        if (request.exception != null) {
            throw request.exception;
        }
    }

    /**
     * Opens the selector and launches the worker if none is running.
     * <p>
     * Only {@code lock} is taken here (it is reentrant, so callers may
     * already hold it); the acceptor's own monitor is deliberately not
     * involved so that a single lock orders all worker life-cycle changes.
     *
     * @throws IOException if the selector cannot be opened
     */
    private void startupWorker() throws IOException {
        synchronized (lock) {
            if (worker == null) {
                selector = Selector.open();
                worker = new Worker();

                executor.execute(new NamePreservingRunnable(worker));
            }
        }
    }

    /**
     * Unbinds the given address. The call waits until the worker has
     * processed the request; if the waiting thread is interrupted, the
     * interruption is reported to the {@link ExceptionMonitor} and waiting
     * stops.
     *
     * @param address the local address to unbind
     * @throws NullPointerException     if {@code address} is {@code null}
     * @throws IllegalArgumentException if nothing is bound to
     *                                  {@code address}, or if no worker is
     *                                  running and its selector could not be
     *                                  opened
     */
    public void unbind(SocketAddress address) {
        if (address == null) {
            throw new NullPointerException("address");
        }

        CancellationRequest request = new CancellationRequest(address);

        // See bind(): keep start-up, enqueue and wakeup atomic with respect
        // to the worker's shutdown check.
        synchronized (lock) {
            try {
                startupWorker();
            } catch (IOException e) {
                // The selector could only fail to open because no worker was
                // running, so nothing can be bound. Keep the cause for
                // diagnosis.
                throw new IllegalArgumentException("Address not bound: "
                        + address, e);
            }

            cancelQueue.add(request);

            selector.wakeup();
        }

        try {
            request.done.await();
        } catch (InterruptedException e) {
            ExceptionMonitor.getInstance().exceptionCaught(e);
        }

        if (request.exception != null) {
            // The exception was created on the worker thread; refresh the
            // stack trace so that it points at the caller.
            request.exception.fillInStackTrace();

            throw request.exception;
        }
    }

    /**
     * Unbinds every address that is bound at the time of the call.
     */
    public void unbindAll() {
        // Iterate over a snapshot; unbind() removes entries from 'channels'.
        List<SocketAddress> addresses = new ArrayList<SocketAddress>(channels
                .keySet());

        for (SocketAddress address : addresses) {
            unbind(address);
        }
    }

    /**
     * Selector loop: processes bind requests, accepts connections and
     * processes unbind requests until nothing is left to serve.
     */
    private class Worker implements Runnable {
        public void run() {
            Thread.currentThread().setName(SocketAcceptor.this.threadName);

            for (;;) {
                try {
                    int nKeys = selector.select();

                    registerNew();

                    if (nKeys > 0) {
                        processSessions(selector.selectedKeys());
                    }

                    cancelKeys();

                    if (selector.keys().isEmpty()) {
                        // Re-check under the lock so that a concurrent
                        // bind()/unbind() either sees this worker alive or
                        // starts a fresh one.
                        synchronized (lock) {
                            if (selector.keys().isEmpty()
                                    && registerQueue.isEmpty()
                                    && cancelQueue.isEmpty()) {
                                worker = null;
                                try {
                                    selector.close();
                                } catch (IOException e) {
                                    ExceptionMonitor.getInstance()
                                            .exceptionCaught(e);
                                } finally {
                                    selector = null;
                                }
                                break;
                            }
                        }
                    }
                } catch (IOException e) {
                    ExceptionMonitor.getInstance().exceptionCaught(e);

                    // Back off before retrying the loop.
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e1) {
                        ExceptionMonitor.getInstance().exceptionCaught(e1);
                    }
                }
            }
        }

        /**
         * Accepts one connection on every acceptable key and registers the
         * resulting session with an I/O processor.
         *
         * @param keys the selector's selected-key set; processed keys are
         *             removed from it
         * @throws IOException if accepting a connection, or closing a
         *                     connection whose session set-up failed, fails
         */
        private void processSessions(Set<SelectionKey> keys) throws IOException {
            Iterator<SelectionKey> it = keys.iterator();
            while (it.hasNext()) {
                SelectionKey key = it.next();

                it.remove();

                if (!key.isAcceptable()) {
                    continue;
                }

                ServerSocketChannel ssc = (ServerSocketChannel) key.channel();

                SocketChannel ch = ssc.accept();

                if (ch == null) {
                    continue;
                }

                boolean success = false;
                try {
                    // The bind request was attached to the key on
                    // registration.
                    RegistrationRequest req = (RegistrationRequest) key
                            .attachment();
                    SocketSessionImpl session = new SocketSessionImpl(
                            SocketAcceptor.this, nextProcessor(),
                            getListeners(), req.config, ch, req.handler,
                            req.address);
                    getFilterChainBuilder().buildFilterChain(
                            session.getFilterChain());
                    req.config.getFilterChainBuilder().buildFilterChain(
                            session.getFilterChain());
                    req.config.getThreadModel().buildFilterChain(
                            session.getFilterChain());
                    session.getIoProcessor().addNew(session);
                    success = true;
                } catch (Throwable t) {
                    ExceptionMonitor.getInstance().exceptionCaught(t);
                } finally {
                    if (!success) {
                        ch.close();
                    }
                }
            }
        }
    }

    /**
     * Returns the next I/O processor in round-robin order.
     */
    private SocketIoProcessor nextProcessor() {
        // Fold the counter back before it overflows, keeping the rotation
        // position.
        if (this.processorDistributor == Integer.MAX_VALUE) {
            this.processorDistributor = Integer.MAX_VALUE % this.processorCount;
        }

        return ioProcessors[processorDistributor++ % processorCount];
    }

    /**
     * Returns the configuration used when {@code bind} is called without
     * one.
     */
    public SocketAcceptorConfig getDefaultConfig() {
        return defaultConfig;
    }

    /**
     * Sets the configuration used when {@code bind} is called without one.
     *
     * @param defaultConfig the new default configuration
     * @throws NullPointerException if {@code defaultConfig} is {@code null}
     */
    public void setDefaultConfig(SocketAcceptorConfig defaultConfig) {
        if (defaultConfig == null) {
            throw new NullPointerException("defaultConfig");
        }
        this.defaultConfig = defaultConfig;
    }

    /**
     * Drains the bind queue: opens, configures, binds and registers a server
     * channel for each request, then releases the waiting caller.
     */
    private void registerNew() {
        if (registerQueue.isEmpty()) {
            return;
        }

        for (;;) {
            RegistrationRequest req = registerQueue.poll();

            if (req == null) {
                break;
            }

            ServerSocketChannel ssc = null;
            boolean registered = false;

            try {
                ssc = ServerSocketChannel.open();
                ssc.configureBlocking(false);

                // Configure the server socket,
                SocketAcceptorConfig cfg;
                if (req.config instanceof SocketAcceptorConfig) {
                    cfg = (SocketAcceptorConfig) req.config;
                } else {
                    cfg = getDefaultConfig();
                }

                ssc.socket().setReuseAddress(cfg.isReuseAddress());
                ssc.socket().setReceiveBufferSize(
                        cfg.getSessionConfig().getReceiveBufferSize());

                // and bind.
                ssc.socket().bind(req.address, cfg.getBacklog());
                if (req.address == null || req.address.getPort() == 0) {
                    // Record the address that was actually assigned.
                    req.address = (InetSocketAddress) ssc.socket()
                            .getLocalSocketAddress();
                }
                ssc.register(selector, SelectionKey.OP_ACCEPT, req);

                channels.put(req.address, ssc);
                registered = true;

                getListeners().fireServiceActivated(this, req.address,
                        req.handler, req.config);
            } catch (IOException e) {
                req.exception = e;
            } finally {
                req.done.countDown();

                // Close the channel on a reported I/O failure and also when
                // an unchecked exception aborted set-up before registration
                // completed, so the channel is never leaked.
                if (ssc != null && (req.exception != null || !registered)) {
                    try {
                        ssc.close();
                    } catch (IOException e) {
                        ExceptionMonitor.getInstance().exceptionCaught(e);
                    }
                }
            }
        }
    }

    /**
     * Drains the unbind queue: cancels the selection key and closes the
     * channel for each request, then releases the waiting caller and
     * notifies the listeners.
     */
    private void cancelKeys() {
        if (cancelQueue.isEmpty()) {
            return;
        }

        for (;;) {
            CancellationRequest request = cancelQueue.poll();

            if (request == null) {
                break;
            }

            ServerSocketChannel ssc = channels.remove(request.address);

            // close the channel
            try {
                if (ssc == null) {
                    request.exception = new IllegalArgumentException(
                            "Address not bound: " + request.address);
                } else {
                    SelectionKey key = ssc.keyFor(selector);
                    request.registrationRequest = (RegistrationRequest) key
                            .attachment();
                    key.cancel();

                    // Wake the selector so that the next select() returns
                    // promptly and the worker re-evaluates whether any key
                    // is left.
                    selector.wakeup();

                    ssc.close();
                }
            } catch (IOException e) {
                ExceptionMonitor.getInstance().exceptionCaught(e);
            } finally {
                request.done.countDown();

                if (request.exception == null) {
                    getListeners().fireServiceDeactivated(this,
                            request.address,
                            request.registrationRequest.handler,
                            request.registrationRequest.config);
                }
            }
        }
    }

    /**
     * A bind request handed from {@code bind} to the worker; also attached
     * to the selection key of the bound channel.
     */
    private static class RegistrationRequest {
        /** Requested address; replaced by the assigned one when needed. */
        private InetSocketAddress address;

        private final IoHandler handler;

        private final IoServiceConfig config;

        /** Released by the worker once the request has been processed. */
        private final CountDownLatch done = new CountDownLatch(1);

        /** Failure recorded by the worker, if any. */
        private volatile IOException exception;

        private RegistrationRequest(SocketAddress address, IoHandler handler,
                IoServiceConfig config) {
            this.address = (InetSocketAddress) address;
            this.handler = handler;
            this.config = config;
        }
    }

    /**
     * An unbind request handed from {@code unbind} to the worker.
     */
    private static class CancellationRequest {
        private final SocketAddress address;

        /** Released by the worker once the request has been processed. */
        private final CountDownLatch done = new CountDownLatch(1);

        /** The bind request of the channel being unbound, set by the worker. */
        private RegistrationRequest registrationRequest;

        /** Failure recorded by the worker, if any. */
        private volatile RuntimeException exception;

        private CancellationRequest(SocketAddress address) {
            this.address = address;
        }
    }
}