|                                                                                                              1
 20  package org.apache.mina.transport.socket.nio;
 21
 22  import java.io.IOException
  ; 23  import java.nio.channels.SelectionKey
  ; 24  import java.nio.channels.Selector
  ; 25  import java.nio.channels.SocketChannel
  ; 26  import java.util.Queue
  ; 27  import java.util.Set
  ; 28  import java.util.concurrent.ConcurrentLinkedQueue
  ; 29  import java.util.concurrent.Executor
  ; 30
 31  import org.apache.mina.common.ByteBuffer;
 32  import org.apache.mina.common.ExceptionMonitor;
 33  import org.apache.mina.common.IdleStatus;
 34  import org.apache.mina.common.IoFilter.WriteRequest;
 35  import org.apache.mina.common.WriteTimeoutException;
 36  import org.apache.mina.util.NamePreservingRunnable;
 37
 38
 44  class SocketIoProcessor {
 45      private final Object
  lock = new Object  (); 46
 47      private final String
  threadName; 48
 49      private final Executor
  executor; 50
 51      private Selector
  selector; 52
 53      private final Queue
  <SocketSessionImpl> newSessions = new ConcurrentLinkedQueue  <SocketSessionImpl>(); 54
 55      private final Queue
  <SocketSessionImpl> removingSessions = new ConcurrentLinkedQueue  <SocketSessionImpl>(); 56
 57      private final Queue
  <SocketSessionImpl> flushingSessions = new ConcurrentLinkedQueue  <SocketSessionImpl>(); 58
 59      private final Queue
  <SocketSessionImpl> trafficControllingSessions = new ConcurrentLinkedQueue  <SocketSessionImpl>(); 60
 61      private Worker worker;
 62
 63      private long lastIdleCheckTime = System.currentTimeMillis();
 64
 65      SocketIoProcessor(String
  threadName, Executor  executor) { 66          this.threadName = threadName;
 67          this.executor = executor;
 68      }
 69
 70      void addNew(SocketSessionImpl session) throws IOException
  { 71          newSessions.add(session);
 72          startupWorker();
 73      }
 74
 75      void remove(SocketSessionImpl session) throws IOException
  { 76          scheduleRemove(session);
 77          startupWorker();
 78      }
 79
 80      private void startupWorker() throws IOException
  { 81          synchronized (lock) {
 82              if (worker == null) {
 83                  selector = Selector.open();
 84                  worker = new Worker();
 85                  executor.execute(new NamePreservingRunnable(worker));
 86              }
 87              selector.wakeup();
 88          }
 89      }
 90
 91      void flush(SocketSessionImpl session) {
 92          scheduleFlush(session);
 93          Selector
  selector = this.selector; 94          if (selector != null) {
 95              selector.wakeup();
 96          }
 97      }
 98
 99      void updateTrafficMask(SocketSessionImpl session) {
 100         scheduleTrafficControl(session);
 101         Selector
  selector = this.selector; 102         if (selector != null) {
 103             selector.wakeup();
 104         }
 105     }
 106
 107     private void scheduleRemove(SocketSessionImpl session) {
 108         removingSessions.add(session);
 109     }
 110
 111     private void scheduleFlush(SocketSessionImpl session) {
 112         flushingSessions.add(session);
 113     }
 114
 115     private void scheduleTrafficControl(SocketSessionImpl session) {
 116         trafficControllingSessions.add(session);
 117     }
 118
 119     private void doAddNew() {
 120         for (;;) {
 121             SocketSessionImpl session = newSessions.poll();
 122
 123             if (session == null)
 124                 break;
 125
 126             SocketChannel
  ch = session.getChannel(); 127             try {
 128                 ch.configureBlocking(false);
 129                 session.setSelectionKey(ch.register(selector,
 130                         SelectionKey.OP_READ, session));
 131
 132                                                 session.getServiceListeners().fireSessionCreated(session);
 135             } catch (IOException
  e) { 136                                                 session.getFilterChain().fireExceptionCaught(session, e);
 139             }
 140         }
 141     }
 142
 143     private void doRemove() {
 144         for (;;) {
 145             SocketSessionImpl session = removingSessions.poll();
 146
 147             if (session == null)
 148                 break;
 149
 150             SocketChannel
  ch = session.getChannel(); 151             SelectionKey
  key = session.getSelectionKey(); 152                                     if (key == null) {
 155                 scheduleRemove(session);
 156                 break;
 157             }
 158                         if (!key.isValid()) {
 160                 continue;
 161             }
 162
 163             try {
 164                 key.cancel();
 165                 ch.close();
 166             } catch (IOException
  e) { 167                 session.getFilterChain().fireExceptionCaught(session, e);
 168             } finally {
 169                 releaseWriteBuffers(session);
 170                 session.getServiceListeners().fireSessionDestroyed(session);
 171             }
 172         }
 173     }
 174
 175     private void process(Set
  <SelectionKey  > selectedKeys) { 176         for (SelectionKey
  key : selectedKeys) { 177             SocketSessionImpl session = (SocketSessionImpl) key.attachment();
 178
 179             if (key.isReadable() && session.getTrafficMask().isReadable()) {
 180                 read(session);
 181             }
 182
 183             if (key.isWritable() && session.getTrafficMask().isWritable()) {
 184                 scheduleFlush(session);
 185             }
 186         }
 187
 188         selectedKeys.clear();
 189     }
 190
 191     private void read(SocketSessionImpl session) {
 192         ByteBuffer buf = ByteBuffer.allocate(session.getReadBufferSize());
 193         SocketChannel
  ch = session.getChannel(); 194
 195         try {
 196             int readBytes = 0;
 197             int ret;
 198
 199             try {
 200                 while ((ret = ch.read(buf.buf())) > 0) {
 201                     readBytes += ret;
 202                 }
 203             } finally {
 204                 buf.flip();
 205             }
 206
 207             session.increaseReadBytes(readBytes);
 208
 209             if (readBytes > 0) {
 210                 session.getFilterChain().fireMessageReceived(session, buf);
 211                 buf = null;
 212
 213                 if (readBytes * 2 < session.getReadBufferSize()) {
 214                     if (session.getReadBufferSize() > 64) {
 215                         session.setReadBufferSize(session.getReadBufferSize() >>> 1);
 216                     }
 217                 } else if (readBytes == session.getReadBufferSize()) {
 218                     session.setReadBufferSize(session.getReadBufferSize() << 1);
 219                 }
 220             }
 221             if (ret < 0) {
 222                 scheduleRemove(session);
 223             }
 224         } catch (Throwable
  e) { 225             if (e instanceof IOException
  ) 226                 scheduleRemove(session);
 227             session.getFilterChain().fireExceptionCaught(session, e);
 228         } finally {
 229             if (buf != null)
 230                 buf.release();
 231         }
 232     }
 233
 234     private void notifyIdleness() {
 235                 long currentTime = System.currentTimeMillis();
 237         if ((currentTime - lastIdleCheckTime) >= 1000) {
 238             lastIdleCheckTime = currentTime;
 239             Set
  <SelectionKey  > keys = selector.keys(); 240             if (keys != null) {
 241                 for (SelectionKey
  key : keys) { 242                     SocketSessionImpl session = (SocketSessionImpl) key
 243                             .attachment();
 244                     notifyIdleness(session, currentTime);
 245                 }
 246             }
 247         }
 248     }
 249
 250     private void notifyIdleness(SocketSessionImpl session, long currentTime) {
 251         notifyIdleness0(session, currentTime, session
 252                 .getIdleTimeInMillis(IdleStatus.BOTH_IDLE),
 253                 IdleStatus.BOTH_IDLE, Math.max(session.getLastIoTime(), session
 254                         .getLastIdleTime(IdleStatus.BOTH_IDLE)));
 255         notifyIdleness0(session, currentTime, session
 256                 .getIdleTimeInMillis(IdleStatus.READER_IDLE),
 257                 IdleStatus.READER_IDLE, Math.max(session.getLastReadTime(),
 258                         session.getLastIdleTime(IdleStatus.READER_IDLE)));
 259         notifyIdleness0(session, currentTime, session
 260                 .getIdleTimeInMillis(IdleStatus.WRITER_IDLE),
 261                 IdleStatus.WRITER_IDLE, Math.max(session.getLastWriteTime(),
 262                         session.getLastIdleTime(IdleStatus.WRITER_IDLE)));
 263
 264         notifyWriteTimeout(session, currentTime, session
 265                 .getWriteTimeoutInMillis(), session.getLastWriteTime());
 266     }
 267
 268     private void notifyIdleness0(SocketSessionImpl session, long currentTime,
 269             long idleTime, IdleStatus status, long lastIoTime) {
 270         if (idleTime > 0 && lastIoTime != 0
 271                 && (currentTime - lastIoTime) >= idleTime) {
 272             session.increaseIdleCount(status);
 273             session.getFilterChain().fireSessionIdle(session, status);
 274         }
 275     }
 276
 277     private void notifyWriteTimeout(SocketSessionImpl session,
 278             long currentTime, long writeTimeout, long lastIoTime) {
 279         SelectionKey
  key = session.getSelectionKey(); 280         if (writeTimeout > 0 && (currentTime - lastIoTime) >= writeTimeout
 281                 && key != null && key.isValid()
 282                 && (key.interestOps() & SelectionKey.OP_WRITE) != 0) {
 283             session.getFilterChain().fireExceptionCaught(session,
 284                     new WriteTimeoutException());
 285         }
 286     }
 287
 288     private void doFlush() {
 289         for (;;) {
 290             SocketSessionImpl session = flushingSessions.poll();
 291
 292             if (session == null)
 293                 break;
 294
 295             if (!session.isConnected()) {
 296                 releaseWriteBuffers(session);
 297                 continue;
 298             }
 299
 300             SelectionKey
  key = session.getSelectionKey(); 301                                     if (key == null) {
 304                 scheduleFlush(session);
 305                 break;
 306             }
 307
 308                         if (!key.isValid()) {
 310                 continue;
 311             }
 312
 313             try {
 314                 doFlush(session);
 315             } catch (IOException
  e) { 316                 scheduleRemove(session);
 317                 session.getFilterChain().fireExceptionCaught(session, e);
 318             }
 319         }
 320     }
 321
 322     private void releaseWriteBuffers(SocketSessionImpl session) {
 323         Queue
  <WriteRequest> writeRequestQueue = session.getWriteRequestQueue(); 324         WriteRequest req;
 325
 326         while ((req = writeRequestQueue.poll()) != null) {
 327             try {
 328                 ((ByteBuffer) req.getMessage()).release();
 329             } catch (IllegalStateException
  e) { 330                 session.getFilterChain().fireExceptionCaught(session, e);
 331             } finally {
 332                 req.getFuture().setWritten(false);
 333             }
 334         }
 335     }
 336
 337     private void doFlush(SocketSessionImpl session) throws IOException
  { 338                 SelectionKey
  key = session.getSelectionKey(); 340         key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
 341
 342         SocketChannel
  ch = session.getChannel(); 343         Queue
  <WriteRequest> writeRequestQueue = session.getWriteRequestQueue(); 344
 345         for (;;) {
 346             WriteRequest req = writeRequestQueue.peek();
 347
 348             if (req == null)
 349                 break;
 350
 351             ByteBuffer buf = (ByteBuffer) req.getMessage();
 352             if (buf.remaining() == 0) {
 353                 writeRequestQueue.poll();
 354
 355                 session.increaseWrittenMessages();
 356
 357                 buf.reset();
 358                 session.getFilterChain().fireMessageSent(session, req);
 359                 continue;
 360             }
 361
 362             if (key.isWritable()) {
 363                 int writtenBytes = ch.write(buf.buf());
 364                 if (writtenBytes > 0) {
 365                     session.increaseWrittenBytes(writtenBytes);
 366                 }
 367             }
 368
 369             if (buf.hasRemaining()) {
 370                                 key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
 372                 break;
 373             }
 374         }
 375     }
 376
 377     private void doUpdateTrafficMask() {
 378         if (trafficControllingSessions.isEmpty())
 379             return;
 380
 381         for (;;) {
 382             SocketSessionImpl session = trafficControllingSessions.poll();
 383
 384             if (session == null)
 385                 break;
 386
 387             SelectionKey
  key = session.getSelectionKey(); 388                                                 if (key == null) {
 392                 scheduleTrafficControl(session);
 393                 break;
 394             }
 395                         if (!key.isValid()) {
 397                 continue;
 398             }
 399
 400                                     int ops = SelectionKey.OP_READ;
 403             Queue
  <WriteRequest> writeRequestQueue = session 404                     .getWriteRequestQueue();
 405             synchronized (writeRequestQueue) {
 406                 if (!writeRequestQueue.isEmpty()) {
 407                     ops |= SelectionKey.OP_WRITE;
 408                 }
 409             }
 410
 411                         int mask = session.getTrafficMask().getInterestOps();
 413             key.interestOps(ops & mask);
 414         }
 415     }
 416
 417     private class Worker implements Runnable
  { 418         public void run() {
 419             Thread.currentThread().setName(SocketIoProcessor.this.threadName);
 420
 421             for (;;) {
 422                 try {
 423                     int nKeys = selector.select(1000);
 424                     doAddNew();
 425                     doUpdateTrafficMask();
 426
 427                     if (nKeys > 0) {
 428                         process(selector.selectedKeys());
 429                     }
 430
 431                     doFlush();
 432                     doRemove();
 433                     notifyIdleness();
 434
 435                     if (selector.keys().isEmpty()) {
 436                         synchronized (lock) {
 437                             if (selector.keys().isEmpty()
 438                                     && newSessions.isEmpty()) {
 439                                 worker = null;
 440
 441                                 try {
 442                                     selector.close();
 443                                 } catch (IOException
  e) { 444                                     ExceptionMonitor.getInstance()
 445                                             .exceptionCaught(e);
 446                                 } finally {
 447                                     selector = null;
 448                                 }
 449
 450                                 break;
 451                             }
 452                         }
 453                     }
 454                 } catch (Throwable
  t) { 455                     ExceptionMonitor.getInstance().exceptionCaught(t);
 456
 457                     try {
 458                         Thread.sleep(1000);
 459                     } catch (InterruptedException
  e1) { 460                         ExceptionMonitor.getInstance().exceptionCaught(e1);
 461                     }
 462                 }
 463             }
 464         }
 465     }
 466
 467 }
 468
                                                                                                                                                                                                             |                                                                       
 
 
 
 
 
                                                                                   Popular Tags                                                                                                                                                                                              |