1 20 package org.apache.mina.transport.socket.nio; 21 22 import java.io.IOException ; 23 import java.nio.channels.SelectionKey ; 24 import java.nio.channels.Selector ; 25 import java.nio.channels.SocketChannel ; 26 import java.util.Queue ; 27 import java.util.Set ; 28 import java.util.concurrent.ConcurrentLinkedQueue ; 29 import java.util.concurrent.Executor ; 30 31 import org.apache.mina.common.ByteBuffer; 32 import org.apache.mina.common.ExceptionMonitor; 33 import org.apache.mina.common.IdleStatus; 34 import org.apache.mina.common.IoFilter.WriteRequest; 35 import org.apache.mina.common.WriteTimeoutException; 36 import org.apache.mina.util.NamePreservingRunnable; 37 38 44 class SocketIoProcessor { 45 private final Object lock = new Object (); 46 47 private final String threadName; 48 49 private final Executor executor; 50 51 private Selector selector; 52 53 private final Queue <SocketSessionImpl> newSessions = new ConcurrentLinkedQueue <SocketSessionImpl>(); 54 55 private final Queue <SocketSessionImpl> removingSessions = new ConcurrentLinkedQueue <SocketSessionImpl>(); 56 57 private final Queue <SocketSessionImpl> flushingSessions = new ConcurrentLinkedQueue <SocketSessionImpl>(); 58 59 private final Queue <SocketSessionImpl> trafficControllingSessions = new ConcurrentLinkedQueue <SocketSessionImpl>(); 60 61 private Worker worker; 62 63 private long lastIdleCheckTime = System.currentTimeMillis(); 64 65 SocketIoProcessor(String threadName, Executor executor) { 66 this.threadName = threadName; 67 this.executor = executor; 68 } 69 70 void addNew(SocketSessionImpl session) throws IOException { 71 newSessions.add(session); 72 startupWorker(); 73 } 74 75 void remove(SocketSessionImpl session) throws IOException { 76 scheduleRemove(session); 77 startupWorker(); 78 } 79 80 private void startupWorker() throws IOException { 81 synchronized (lock) { 82 if (worker == null) { 83 selector = Selector.open(); 84 worker = new Worker(); 85 executor.execute(new NamePreservingRunnable(worker)); 86 } 87 selector.wakeup(); 88 } 89 } 90 91 void flush(SocketSessionImpl session) { 92 scheduleFlush(session); 93 Selector selector = this.selector; 94 if (selector != null) { 95 selector.wakeup(); 96 } 97 } 98 99 void updateTrafficMask(SocketSessionImpl session) { 100 scheduleTrafficControl(session); 101 Selector selector = this.selector; 102 if (selector != null) { 103 selector.wakeup(); 104 } 105 } 106 107 private void scheduleRemove(SocketSessionImpl session) { 108 removingSessions.add(session); 109 } 110 111 private void scheduleFlush(SocketSessionImpl session) { 112 flushingSessions.add(session); 113 } 114 115 private void scheduleTrafficControl(SocketSessionImpl session) { 116 trafficControllingSessions.add(session); 117 } 118 119 private void doAddNew() { 120 for (;;) { 121 SocketSessionImpl session = newSessions.poll(); 122 123 if (session == null) 124 break; 125 126 SocketChannel ch = session.getChannel(); 127 try { 128 ch.configureBlocking(false); 129 session.setSelectionKey(ch.register(selector, 130 SelectionKey.OP_READ, session)); 131 132 session.getServiceListeners().fireSessionCreated(session); 135 } catch (IOException e) { 136 session.getFilterChain().fireExceptionCaught(session, e); 139 } 140 } 141 } 142 143 private void doRemove() { 144 for (;;) { 145 SocketSessionImpl session = removingSessions.poll(); 146 147 if (session == null) 148 break; 149 150 SocketChannel ch = session.getChannel(); 151 SelectionKey key = session.getSelectionKey(); 152 if (key == null) { 155 scheduleRemove(session); 156 break; 157 } 158 if (!key.isValid()) { 160 continue; 161 } 162 163 try { 164 key.cancel(); 165 ch.close(); 166 } catch (IOException e) { 167 session.getFilterChain().fireExceptionCaught(session, e); 168 } finally { 169 releaseWriteBuffers(session); 170 session.getServiceListeners().fireSessionDestroyed(session); 171 } 172 } 173 } 174 175 private void process(Set <SelectionKey > selectedKeys) { 176 for (SelectionKey key : selectedKeys) { 177 SocketSessionImpl session = (SocketSessionImpl) key.attachment(); 178 179 if (key.isReadable() && session.getTrafficMask().isReadable()) { 180 read(session); 181 } 182 183 if (key.isWritable() && session.getTrafficMask().isWritable()) { 184 scheduleFlush(session); 185 } 186 } 187 188 selectedKeys.clear(); 189 } 190 191 private void read(SocketSessionImpl session) { 192 ByteBuffer buf = ByteBuffer.allocate(session.getReadBufferSize()); 193 SocketChannel ch = session.getChannel(); 194 195 try { 196 int readBytes = 0; 197 int ret; 198 199 try { 200 while ((ret = ch.read(buf.buf())) > 0) { 201 readBytes += ret; 202 } 203 } finally { 204 buf.flip(); 205 } 206 207 session.increaseReadBytes(readBytes); 208 209 if (readBytes > 0) { 210 session.getFilterChain().fireMessageReceived(session, buf); 211 buf = null; 212 213 if (readBytes * 2 < session.getReadBufferSize()) { 214 if (session.getReadBufferSize() > 64) { 215 session.setReadBufferSize(session.getReadBufferSize() >>> 1); 216 } 217 } else if (readBytes == session.getReadBufferSize()) { 218 session.setReadBufferSize(session.getReadBufferSize() << 1); 219 } 220 } 221 if (ret < 0) { 222 scheduleRemove(session); 223 } 224 } catch (Throwable e) { 225 if (e instanceof IOException ) 226 scheduleRemove(session); 227 session.getFilterChain().fireExceptionCaught(session, e); 228 } finally { 229 if (buf != null) 230 buf.release(); 231 } 232 } 233 234 private void notifyIdleness() { 235 long currentTime = System.currentTimeMillis(); 237 if ((currentTime - lastIdleCheckTime) >= 1000) { 238 lastIdleCheckTime = currentTime; 239 Set <SelectionKey > keys = selector.keys(); 240 if (keys != null) { 241 for (SelectionKey key : keys) { 242 SocketSessionImpl session = (SocketSessionImpl) key 243 .attachment(); 244 notifyIdleness(session, currentTime); 245 } 246 } 247 } 248 } 249 250 private void notifyIdleness(SocketSessionImpl session, long currentTime) { 251 notifyIdleness0(session, currentTime, session 252 .getIdleTimeInMillis(IdleStatus.BOTH_IDLE), 253 IdleStatus.BOTH_IDLE, Math.max(session.getLastIoTime(), session 254 .getLastIdleTime(IdleStatus.BOTH_IDLE))); 255 notifyIdleness0(session, currentTime, session 256 .getIdleTimeInMillis(IdleStatus.READER_IDLE), 257 IdleStatus.READER_IDLE, Math.max(session.getLastReadTime(), 258 session.getLastIdleTime(IdleStatus.READER_IDLE))); 259 notifyIdleness0(session, currentTime, session 260 .getIdleTimeInMillis(IdleStatus.WRITER_IDLE), 261 IdleStatus.WRITER_IDLE, Math.max(session.getLastWriteTime(), 262 session.getLastIdleTime(IdleStatus.WRITER_IDLE))); 263 264 notifyWriteTimeout(session, currentTime, session 265 .getWriteTimeoutInMillis(), session.getLastWriteTime()); 266 } 267 268 private void notifyIdleness0(SocketSessionImpl session, long currentTime, 269 long idleTime, IdleStatus status, long lastIoTime) { 270 if (idleTime > 0 && lastIoTime != 0 271 && (currentTime - lastIoTime) >= idleTime) { 272 session.increaseIdleCount(status); 273 session.getFilterChain().fireSessionIdle(session, status); 274 } 275 } 276 277 private void notifyWriteTimeout(SocketSessionImpl session, 278 long currentTime, long writeTimeout, long lastIoTime) { 279 SelectionKey key = session.getSelectionKey(); 280 if (writeTimeout > 0 && (currentTime - lastIoTime) >= writeTimeout 281 && key != null && key.isValid() 282 && (key.interestOps() & SelectionKey.OP_WRITE) != 0) { 283 session.getFilterChain().fireExceptionCaught(session, 284 new WriteTimeoutException()); 285 } 286 } 287 288 private void doFlush() { 289 for (;;) { 290 SocketSessionImpl session = flushingSessions.poll(); 291 292 if (session == null) 293 break; 294 295 if (!session.isConnected()) { 296 releaseWriteBuffers(session); 297 continue; 298 } 299 300 SelectionKey key = session.getSelectionKey(); 301 if (key == null) { 304 scheduleFlush(session); 305 break; 306 } 307 308 if (!key.isValid()) { 310 continue; 311 } 312 313 try { 314 doFlush(session); 315 } catch (IOException e) { 316 scheduleRemove(session); 317 session.getFilterChain().fireExceptionCaught(session, e); 318 } 319 } 320 } 321 322 private void releaseWriteBuffers(SocketSessionImpl session) { 323 Queue <WriteRequest> writeRequestQueue = session.getWriteRequestQueue(); 324 WriteRequest req; 325 326 while ((req = writeRequestQueue.poll()) != null) { 327 try { 328 ((ByteBuffer) req.getMessage()).release(); 329 } catch (IllegalStateException e) { 330 session.getFilterChain().fireExceptionCaught(session, e); 331 } finally { 332 req.getFuture().setWritten(false); 333 } 334 } 335 } 336 337 private void doFlush(SocketSessionImpl session) throws IOException { 338 SelectionKey key = session.getSelectionKey(); 340 key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE)); 341 342 SocketChannel ch = session.getChannel(); 343 Queue <WriteRequest> writeRequestQueue = session.getWriteRequestQueue(); 344 345 for (;;) { 346 WriteRequest req = writeRequestQueue.peek(); 347 348 if (req == null) 349 break; 350 351 ByteBuffer buf = (ByteBuffer) req.getMessage(); 352 if (buf.remaining() == 0) { 353 writeRequestQueue.poll(); 354 355 session.increaseWrittenMessages(); 356 357 buf.reset(); 358 session.getFilterChain().fireMessageSent(session, req); 359 continue; 360 } 361 362 if (key.isWritable()) { 363 int writtenBytes = ch.write(buf.buf()); 364 if (writtenBytes > 0) { 365 session.increaseWrittenBytes(writtenBytes); 366 } 367 } 368 369 if (buf.hasRemaining()) { 370 key.interestOps(key.interestOps() | SelectionKey.OP_WRITE); 372 break; 373 } 374 } 375 } 376 377 private void doUpdateTrafficMask() { 378 if (trafficControllingSessions.isEmpty()) 379 return; 380 381 for (;;) { 382 SocketSessionImpl session = trafficControllingSessions.poll(); 383 384 if (session == null) 385 break; 386 387 SelectionKey key = session.getSelectionKey(); 388 if (key == null) { 392 scheduleTrafficControl(session); 393 break; 394 } 395 if (!key.isValid()) { 397 continue; 398 } 399 400 int ops = SelectionKey.OP_READ; 403 Queue <WriteRequest> writeRequestQueue = session 404 .getWriteRequestQueue(); 405 synchronized (writeRequestQueue) { 406 if (!writeRequestQueue.isEmpty()) { 407 ops |= SelectionKey.OP_WRITE; 408 } 409 } 410 411 int mask = session.getTrafficMask().getInterestOps(); 413 key.interestOps(ops & mask); 414 } 415 } 416 417 private class Worker implements Runnable { 418 public void run() { 419 Thread.currentThread().setName(SocketIoProcessor.this.threadName); 420 421 for (;;) { 422 try { 423 int nKeys = selector.select(1000); 424 doAddNew(); 425 doUpdateTrafficMask(); 426 427 if (nKeys > 0) { 428 process(selector.selectedKeys()); 429 } 430 431 doFlush(); 432 doRemove(); 433 notifyIdleness(); 434 435 if (selector.keys().isEmpty()) { 436 synchronized (lock) { 437 if (selector.keys().isEmpty() 438 && newSessions.isEmpty()) { 439 worker = null; 440 441 try { 442 selector.close(); 443 } catch (IOException e) { 444 ExceptionMonitor.getInstance() 445 .exceptionCaught(e); 446 } finally { 447 selector = null; 448 } 449 450 break; 451 } 452 } 453 } 454 } catch (Throwable t) { 455 ExceptionMonitor.getInstance().exceptionCaught(t); 456 457 try { 458 Thread.sleep(1000); 459 } catch (InterruptedException e1) { 460 ExceptionMonitor.getInstance().exceptionCaught(e1); 461 } 462 } 463 } 464 } 465 } 466 467 } 468 | Popular Tags |