1 20 package org.apache.mina.transport.socket.nio.support; 21 22 import java.io.IOException ; 23 import java.net.InetSocketAddress ; 24 import java.net.SocketAddress ; 25 import java.nio.channels.DatagramChannel ; 26 import java.nio.channels.SelectionKey ; 27 import java.nio.channels.Selector ; 28 import java.util.Iterator ; 29 import java.util.Queue ; 30 import java.util.Set ; 31 import java.util.concurrent.ConcurrentLinkedQueue ; 32 import java.util.concurrent.Executor ; 33 import java.util.concurrent.atomic.AtomicInteger ; 34 35 import org.apache.mina.common.ByteBuffer; 36 import org.apache.mina.common.ConnectFuture; 37 import org.apache.mina.common.ExceptionMonitor; 38 import org.apache.mina.common.IoConnector; 39 import org.apache.mina.common.IoHandler; 40 import org.apache.mina.common.IoServiceConfig; 41 import org.apache.mina.common.IoSession; 42 import org.apache.mina.common.IoSessionRecycler; 43 import org.apache.mina.common.IoFilter.WriteRequest; 44 import org.apache.mina.common.support.AbstractIoFilterChain; 45 import org.apache.mina.common.support.BaseIoConnector; 46 import org.apache.mina.common.support.DefaultConnectFuture; 47 import org.apache.mina.transport.socket.nio.DatagramConnectorConfig; 48 import org.apache.mina.transport.socket.nio.DatagramServiceConfig; 49 import org.apache.mina.transport.socket.nio.DatagramSessionConfig; 50 import org.apache.mina.util.NamePreservingRunnable; 51 52 58 public class DatagramConnectorDelegate extends BaseIoConnector implements 59 DatagramService { 60 private static final AtomicInteger nextId = new AtomicInteger (); 61 62 private final Object lock = new Object (); 63 64 private final IoConnector wrapper; 65 66 private final Executor executor; 67 68 private final int id = nextId.getAndIncrement(); 69 70 private Selector selector; 71 72 private DatagramConnectorConfig defaultConfig = new DatagramConnectorConfig(); 73 74 private final Queue <RegistrationRequest> registerQueue = new ConcurrentLinkedQueue <RegistrationRequest>(); 75 76 private final Queue <DatagramSessionImpl> cancelQueue = new ConcurrentLinkedQueue <DatagramSessionImpl>(); 77 78 private final Queue <DatagramSessionImpl> flushingSessions = new ConcurrentLinkedQueue <DatagramSessionImpl>(); 79 80 private final Queue <DatagramSessionImpl> trafficControllingSessions = new ConcurrentLinkedQueue <DatagramSessionImpl>(); 81 82 private Worker worker; 83 84 87 public DatagramConnectorDelegate(IoConnector wrapper, Executor executor) { 88 this.wrapper = wrapper; 89 this.executor = executor; 90 } 91 92 public ConnectFuture connect(SocketAddress address, IoHandler handler, 93 IoServiceConfig config) { 94 return connect(address, null, handler, config); 95 } 96 97 public ConnectFuture connect(SocketAddress address, 98 SocketAddress localAddress, IoHandler handler, 99 IoServiceConfig config) { 100 if (address == null) 101 throw new NullPointerException ("address"); 102 if (handler == null) 103 throw new NullPointerException ("handler"); 104 105 if (!(address instanceof InetSocketAddress )) 106 throw new IllegalArgumentException ("Unexpected address type: " 107 + address.getClass()); 108 109 if (localAddress != null 110 && !(localAddress instanceof InetSocketAddress )) { 111 throw new IllegalArgumentException ( 112 "Unexpected local address type: " + localAddress.getClass()); 113 } 114 115 if (config == null) { 116 config = getDefaultConfig(); 117 } 118 119 DatagramChannel ch = null; 120 boolean initialized = false; 121 try { 122 ch = DatagramChannel.open(); 123 DatagramSessionConfig cfg; 124 if (config.getSessionConfig() instanceof DatagramSessionConfig) { 125 cfg = (DatagramSessionConfig) config.getSessionConfig(); 126 } else { 127 cfg = getDefaultConfig().getSessionConfig(); 128 } 129 130 ch.socket().setReuseAddress(cfg.isReuseAddress()); 131 ch.socket().setBroadcast(cfg.isBroadcast()); 132 ch.socket().setReceiveBufferSize(cfg.getReceiveBufferSize()); 133 ch.socket().setSendBufferSize(cfg.getSendBufferSize()); 134 135 if (ch.socket().getTrafficClass() != cfg.getTrafficClass()) { 136 ch.socket().setTrafficClass(cfg.getTrafficClass()); 137 } 138 139 if (localAddress != null) { 140 ch.socket().bind(localAddress); 141 } 142 ch.connect(address); 143 ch.configureBlocking(false); 144 initialized = true; 145 } catch (IOException e) { 146 return DefaultConnectFuture.newFailedFuture(e); 147 } finally { 148 if (!initialized && ch != null) { 149 try { 150 ch.disconnect(); 151 ch.close(); 152 } catch (IOException e) { 153 ExceptionMonitor.getInstance().exceptionCaught(e); 154 } 155 } 156 } 157 158 RegistrationRequest request = new RegistrationRequest(ch, handler, 159 config); 160 try { 161 startupWorker(); 162 } catch (IOException e) { 163 try { 164 ch.disconnect(); 165 ch.close(); 166 } catch (IOException e2) { 167 ExceptionMonitor.getInstance().exceptionCaught(e2); 168 } 169 170 return DefaultConnectFuture.newFailedFuture(e); 171 } 172 173 registerQueue.add(request); 174 175 selector.wakeup(); 176 return request; 177 } 178 179 public DatagramConnectorConfig getDefaultConfig() { 180 return defaultConfig; 181 } 182 183 189 public void setDefaultConfig(DatagramConnectorConfig defaultConfig) { 190 if (defaultConfig == null) { 191 throw new NullPointerException ("defaultConfig"); 192 } 193 this.defaultConfig = defaultConfig; 194 } 195 196 private void startupWorker() throws IOException { 197 synchronized (lock) { 198 if (worker == null) { 199 selector = Selector.open(); 200 worker = new Worker(); 201 executor.execute(new NamePreservingRunnable(worker)); 202 } 203 } 204 } 205 206 public void closeSession(DatagramSessionImpl session) { 207 try { 208 startupWorker(); 209 } catch (IOException e) { 210 return; 216 } 217 218 cancelQueue.add(session); 219 220 selector.wakeup(); 221 } 222 223 public void flushSession(DatagramSessionImpl session) { 224 scheduleFlush(session); 225 Selector selector = this.selector; 226 if (selector != null) { 227 selector.wakeup(); 228 } 229 } 230 231 private void scheduleFlush(DatagramSessionImpl session) { 232 flushingSessions.add(session); 233 } 234 235 public void updateTrafficMask(DatagramSessionImpl session) { 236 scheduleTrafficControl(session); 237 Selector selector = this.selector; 238 if (selector != null) { 239 selector.wakeup(); 240 } 241 } 242 243 private void scheduleTrafficControl(DatagramSessionImpl session) { 244 trafficControllingSessions.add(session); 245 } 246 247 private void doUpdateTrafficMask() { 248 if (trafficControllingSessions.isEmpty()) 249 return; 250 251 for (;;) { 252 DatagramSessionImpl session = trafficControllingSessions.poll(); 253 254 if (session == null) 255 break; 256 257 SelectionKey key = session.getSelectionKey(); 258 if (key == null) { 262 scheduleTrafficControl(session); 263 break; 264 } 265 if (!key.isValid()) { 267 continue; 268 } 269 270 int ops = SelectionKey.OP_READ; 273 if (!session.getWriteRequestQueue().isEmpty()) { 274 ops |= SelectionKey.OP_WRITE; 275 } 276 277 int mask = session.getTrafficMask().getInterestOps(); 279 key.interestOps(ops & mask); 280 } 281 } 282 283 private class Worker implements Runnable { 284 public void run() { 285 Thread.currentThread().setName("DatagramConnector-" + id); 286 287 for (;;) { 288 try { 289 int nKeys = selector.select(); 290 291 registerNew(); 292 doUpdateTrafficMask(); 293 294 if (nKeys > 0) { 295 processReadySessions(selector.selectedKeys()); 296 } 297 298 flushSessions(); 299 cancelKeys(); 300 301 if (selector.keys().isEmpty()) { 302 synchronized (lock) { 303 if (selector.keys().isEmpty() 304 && registerQueue.isEmpty() 305 && cancelQueue.isEmpty()) { 306 worker = null; 307 try { 308 selector.close(); 309 } catch (IOException e) { 310 ExceptionMonitor.getInstance() 311 .exceptionCaught(e); 312 } finally { 313 selector = null; 314 } 315 break; 316 } 317 } 318 } 319 } catch (IOException e) { 320 ExceptionMonitor.getInstance().exceptionCaught(e); 321 322 try { 323 Thread.sleep(1000); 324 } catch (InterruptedException e1) { 325 ExceptionMonitor.getInstance().exceptionCaught(e1); 326 } 327 } 328 } 329 } 330 } 331 332 private void processReadySessions(Set <SelectionKey > keys) { 333 Iterator <SelectionKey > it = keys.iterator(); 334 while (it.hasNext()) { 335 SelectionKey key = it.next(); 336 it.remove(); 337 338 DatagramSessionImpl session = (DatagramSessionImpl) key 339 .attachment(); 340 341 getSessionRecycler(session).recycle(session.getLocalAddress(), 343 session.getRemoteAddress()); 344 345 if (key.isReadable() && session.getTrafficMask().isReadable()) { 346 readSession(session); 347 } 348 349 if (key.isWritable() && session.getTrafficMask().isWritable()) { 350 scheduleFlush(session); 351 } 352 } 353 } 354 355 private IoSessionRecycler getSessionRecycler(IoSession session) { 356 IoServiceConfig config = session.getServiceConfig(); 357 IoSessionRecycler sessionRecycler; 358 if (config instanceof DatagramServiceConfig) { 359 sessionRecycler = ((DatagramServiceConfig) config) 360 .getSessionRecycler(); 361 } else { 362 sessionRecycler = defaultConfig.getSessionRecycler(); 363 } 364 return sessionRecycler; 365 } 366 367 private void readSession(DatagramSessionImpl session) { 368 369 ByteBuffer readBuf = ByteBuffer.allocate(session.getReadBufferSize()); 370 try { 371 int readBytes = session.getChannel().read(readBuf.buf()); 372 if (readBytes > 0) { 373 readBuf.flip(); 374 ByteBuffer newBuf = ByteBuffer.allocate(readBuf.limit()); 375 newBuf.put(readBuf); 376 newBuf.flip(); 377 378 session.increaseReadBytes(readBytes); 379 session.getFilterChain().fireMessageReceived(session, newBuf); 380 } 381 } catch (IOException e) { 382 session.getFilterChain().fireExceptionCaught(session, e); 383 } finally { 384 readBuf.release(); 385 } 386 } 387 388 private void flushSessions() { 389 if (flushingSessions.size() == 0) 390 return; 391 392 for (;;) { 393 DatagramSessionImpl session = flushingSessions.poll(); 394 395 if (session == null) 396 break; 397 398 try { 399 flush(session); 400 } catch (IOException e) { 401 session.getFilterChain().fireExceptionCaught(session, e); 402 } 403 } 404 } 405 406 private void flush(DatagramSessionImpl session) throws IOException { 407 DatagramChannel ch = session.getChannel(); 408 409 Queue <WriteRequest> writeRequestQueue = session.getWriteRequestQueue(); 410 411 for (;;) { 412 WriteRequest req = writeRequestQueue.peek(); 413 414 if (req == null) 415 break; 416 417 ByteBuffer buf = (ByteBuffer) req.getMessage(); 418 if (buf.remaining() == 0) { 419 writeRequestQueue.poll(); 421 422 session.increaseWrittenMessages(); 423 buf.reset(); 424 session.getFilterChain().fireMessageSent(session, req); 425 continue; 426 } 427 428 SelectionKey key = session.getSelectionKey(); 429 if (key == null) { 430 scheduleFlush(session); 431 break; 432 } 433 if (!key.isValid()) { 434 continue; 435 } 436 437 int writtenBytes = ch.write(buf.buf()); 438 439 if (writtenBytes == 0) { 440 key.interestOps(key.interestOps() | SelectionKey.OP_WRITE); 442 } else if (writtenBytes > 0) { 443 key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE)); 444 445 writeRequestQueue.poll(); 447 448 session.increaseWrittenBytes(writtenBytes); 449 session.increaseWrittenMessages(); 450 buf.reset(); 451 session.getFilterChain().fireMessageSent(session, req); 452 } 453 } 454 } 455 456 private void registerNew() { 457 if (registerQueue.isEmpty()) 458 return; 459 460 for (;;) { 461 RegistrationRequest req = registerQueue.poll(); 462 463 if (req == null) 464 break; 465 466 DatagramSessionImpl session = new DatagramSessionImpl(wrapper, 467 this, req.config, req.channel, req.handler, req.channel 468 .socket().getRemoteSocketAddress(), req.channel 469 .socket().getLocalSocketAddress()); 470 471 session.setAttribute(AbstractIoFilterChain.CONNECT_FUTURE, req); 473 474 boolean success = false; 475 try { 476 SelectionKey key = req.channel.register(selector, 477 SelectionKey.OP_READ, session); 478 479 session.setSelectionKey(key); 480 buildFilterChain(req, session); 481 getSessionRecycler(session).put(session); 482 483 getListeners().fireSessionCreated(session); 485 success = true; 486 } catch (Throwable t) { 487 session.getFilterChain().fireExceptionCaught(session, t); 489 } finally { 490 if (!success) { 491 try { 492 req.channel.disconnect(); 493 req.channel.close(); 494 } catch (IOException e) { 495 ExceptionMonitor.getInstance().exceptionCaught(e); 496 } 497 } 498 } 499 } 500 } 501 502 private void buildFilterChain(RegistrationRequest req, IoSession session) 503 throws Exception { 504 getFilterChainBuilder().buildFilterChain(session.getFilterChain()); 505 req.config.getFilterChainBuilder().buildFilterChain( 506 session.getFilterChain()); 507 req.config.getThreadModel().buildFilterChain(session.getFilterChain()); 508 } 509 510 private void cancelKeys() { 511 if (cancelQueue.isEmpty()) 512 return; 513 514 for (;;) { 515 DatagramSessionImpl session = cancelQueue.poll(); 516 517 if (session == null) 518 break; 519 else { 520 SelectionKey key = session.getSelectionKey(); 521 DatagramChannel ch = (DatagramChannel ) key.channel(); 522 try { 523 ch.disconnect(); 524 ch.close(); 525 } catch (IOException e) { 526 ExceptionMonitor.getInstance().exceptionCaught(e); 527 } 528 529 getListeners().fireSessionDestroyed(session); 530 session.getCloseFuture().setClosed(); 531 key.cancel(); 532 selector.wakeup(); } 534 } 535 } 536 537 private static class RegistrationRequest extends DefaultConnectFuture { 538 private final DatagramChannel channel; 539 540 private final IoHandler handler; 541 542 private final IoServiceConfig config; 543 544 private RegistrationRequest(DatagramChannel channel, IoHandler handler, 545 IoServiceConfig config) { 546 this.channel = channel; 547 this.handler = handler; 548 this.config = config; 549 } 550 } 551 } 552 | Popular Tags |