KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > mina > transport > socket > nio > SocketIoProcessor


1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  *
19  */

20 package org.apache.mina.transport.socket.nio;
21
22 import java.io.IOException JavaDoc;
23 import java.nio.channels.SelectionKey JavaDoc;
24 import java.nio.channels.Selector JavaDoc;
25 import java.nio.channels.SocketChannel JavaDoc;
26 import java.util.Queue JavaDoc;
27 import java.util.Set JavaDoc;
28 import java.util.concurrent.ConcurrentLinkedQueue JavaDoc;
29 import java.util.concurrent.Executor JavaDoc;
30
31 import org.apache.mina.common.ByteBuffer;
32 import org.apache.mina.common.ExceptionMonitor;
33 import org.apache.mina.common.IdleStatus;
34 import org.apache.mina.common.IoFilter.WriteRequest;
35 import org.apache.mina.common.WriteTimeoutException;
36 import org.apache.mina.util.NamePreservingRunnable;
37
38 /**
39  * Performs all I/O operations for sockets which is connected or bound. This class is used by MINA internally.
40  *
41  * @author The Apache Directory Project (mina-dev@directory.apache.org)
42  * @version $Rev: 556539 $, $Date: 2007-07-16 16:48:36 +0900 (월, 16 7월 2007) $,
43  */

44 class SocketIoProcessor {
45     private final Object JavaDoc lock = new Object JavaDoc();
46
47     private final String JavaDoc threadName;
48
49     private final Executor JavaDoc executor;
50
51     private Selector JavaDoc selector;
52
53     private final Queue JavaDoc<SocketSessionImpl> newSessions = new ConcurrentLinkedQueue JavaDoc<SocketSessionImpl>();
54
55     private final Queue JavaDoc<SocketSessionImpl> removingSessions = new ConcurrentLinkedQueue JavaDoc<SocketSessionImpl>();
56
57     private final Queue JavaDoc<SocketSessionImpl> flushingSessions = new ConcurrentLinkedQueue JavaDoc<SocketSessionImpl>();
58
59     private final Queue JavaDoc<SocketSessionImpl> trafficControllingSessions = new ConcurrentLinkedQueue JavaDoc<SocketSessionImpl>();
60
61     private Worker worker;
62
63     private long lastIdleCheckTime = System.currentTimeMillis();
64
65     SocketIoProcessor(String JavaDoc threadName, Executor JavaDoc executor) {
66         this.threadName = threadName;
67         this.executor = executor;
68     }
69
70     void addNew(SocketSessionImpl session) throws IOException JavaDoc {
71         newSessions.add(session);
72         startupWorker();
73     }
74
75     void remove(SocketSessionImpl session) throws IOException JavaDoc {
76         scheduleRemove(session);
77         startupWorker();
78     }
79
80     private void startupWorker() throws IOException JavaDoc {
81         synchronized (lock) {
82             if (worker == null) {
83                 selector = Selector.open();
84                 worker = new Worker();
85                 executor.execute(new NamePreservingRunnable(worker));
86             }
87             selector.wakeup();
88         }
89     }
90
91     void flush(SocketSessionImpl session) {
92         scheduleFlush(session);
93         Selector JavaDoc selector = this.selector;
94         if (selector != null) {
95             selector.wakeup();
96         }
97     }
98
99     void updateTrafficMask(SocketSessionImpl session) {
100         scheduleTrafficControl(session);
101         Selector JavaDoc selector = this.selector;
102         if (selector != null) {
103             selector.wakeup();
104         }
105     }
106
107     private void scheduleRemove(SocketSessionImpl session) {
108         removingSessions.add(session);
109     }
110
111     private void scheduleFlush(SocketSessionImpl session) {
112         flushingSessions.add(session);
113     }
114
115     private void scheduleTrafficControl(SocketSessionImpl session) {
116         trafficControllingSessions.add(session);
117     }
118
119     private void doAddNew() {
120         for (;;) {
121             SocketSessionImpl session = newSessions.poll();
122
123             if (session == null)
124                 break;
125
126             SocketChannel JavaDoc ch = session.getChannel();
127             try {
128                 ch.configureBlocking(false);
129                 session.setSelectionKey(ch.register(selector,
130                         SelectionKey.OP_READ, session));
131
132                 // AbstractIoFilterChain.CONNECT_FUTURE is cleared inside here
133
// in AbstractIoFilterChain.fireSessionOpened().
134
session.getServiceListeners().fireSessionCreated(session);
135             } catch (IOException JavaDoc e) {
136                 // Clear the AbstractIoFilterChain.CONNECT_FUTURE attribute
137
// and call ConnectFuture.setException().
138
session.getFilterChain().fireExceptionCaught(session, e);
139             }
140         }
141     }
142
143     private void doRemove() {
144         for (;;) {
145             SocketSessionImpl session = removingSessions.poll();
146
147             if (session == null)
148                 break;
149
150             SocketChannel JavaDoc ch = session.getChannel();
151             SelectionKey JavaDoc key = session.getSelectionKey();
152             // Retry later if session is not yet fully initialized.
153
// (In case that Session.close() is called before addSession() is processed)
154
if (key == null) {
155                 scheduleRemove(session);
156                 break;
157             }
158             // skip if channel is already closed
159
if (!key.isValid()) {
160                 continue;
161             }
162
163             try {
164                 key.cancel();
165                 ch.close();
166             } catch (IOException JavaDoc e) {
167                 session.getFilterChain().fireExceptionCaught(session, e);
168             } finally {
169                 releaseWriteBuffers(session);
170                 session.getServiceListeners().fireSessionDestroyed(session);
171             }
172         }
173     }
174
175     private void process(Set JavaDoc<SelectionKey JavaDoc> selectedKeys) {
176         for (SelectionKey JavaDoc key : selectedKeys) {
177             SocketSessionImpl session = (SocketSessionImpl) key.attachment();
178
179             if (key.isReadable() && session.getTrafficMask().isReadable()) {
180                 read(session);
181             }
182
183             if (key.isWritable() && session.getTrafficMask().isWritable()) {
184                 scheduleFlush(session);
185             }
186         }
187
188         selectedKeys.clear();
189     }
190
191     private void read(SocketSessionImpl session) {
192         ByteBuffer buf = ByteBuffer.allocate(session.getReadBufferSize());
193         SocketChannel JavaDoc ch = session.getChannel();
194
195         try {
196             int readBytes = 0;
197             int ret;
198
199             try {
200                 while ((ret = ch.read(buf.buf())) > 0) {
201                     readBytes += ret;
202                 }
203             } finally {
204                 buf.flip();
205             }
206
207             session.increaseReadBytes(readBytes);
208
209             if (readBytes > 0) {
210                 session.getFilterChain().fireMessageReceived(session, buf);
211                 buf = null;
212                 
213                 if (readBytes * 2 < session.getReadBufferSize()) {
214                     if (session.getReadBufferSize() > 64) {
215                         session.setReadBufferSize(session.getReadBufferSize() >>> 1);
216                     }
217                 } else if (readBytes == session.getReadBufferSize()) {
218                     session.setReadBufferSize(session.getReadBufferSize() << 1);
219                 }
220             }
221             if (ret < 0) {
222                 scheduleRemove(session);
223             }
224         } catch (Throwable JavaDoc e) {
225             if (e instanceof IOException JavaDoc)
226                 scheduleRemove(session);
227             session.getFilterChain().fireExceptionCaught(session, e);
228         } finally {
229             if (buf != null)
230                 buf.release();
231         }
232     }
233
234     private void notifyIdleness() {
235         // process idle sessions
236
long currentTime = System.currentTimeMillis();
237         if ((currentTime - lastIdleCheckTime) >= 1000) {
238             lastIdleCheckTime = currentTime;
239             Set JavaDoc<SelectionKey JavaDoc> keys = selector.keys();
240             if (keys != null) {
241                 for (SelectionKey JavaDoc key : keys) {
242                     SocketSessionImpl session = (SocketSessionImpl) key
243                             .attachment();
244                     notifyIdleness(session, currentTime);
245                 }
246             }
247         }
248     }
249
250     private void notifyIdleness(SocketSessionImpl session, long currentTime) {
251         notifyIdleness0(session, currentTime, session
252                 .getIdleTimeInMillis(IdleStatus.BOTH_IDLE),
253                 IdleStatus.BOTH_IDLE, Math.max(session.getLastIoTime(), session
254                         .getLastIdleTime(IdleStatus.BOTH_IDLE)));
255         notifyIdleness0(session, currentTime, session
256                 .getIdleTimeInMillis(IdleStatus.READER_IDLE),
257                 IdleStatus.READER_IDLE, Math.max(session.getLastReadTime(),
258                         session.getLastIdleTime(IdleStatus.READER_IDLE)));
259         notifyIdleness0(session, currentTime, session
260                 .getIdleTimeInMillis(IdleStatus.WRITER_IDLE),
261                 IdleStatus.WRITER_IDLE, Math.max(session.getLastWriteTime(),
262                         session.getLastIdleTime(IdleStatus.WRITER_IDLE)));
263
264         notifyWriteTimeout(session, currentTime, session
265                 .getWriteTimeoutInMillis(), session.getLastWriteTime());
266     }
267
268     private void notifyIdleness0(SocketSessionImpl session, long currentTime,
269             long idleTime, IdleStatus status, long lastIoTime) {
270         if (idleTime > 0 && lastIoTime != 0
271                 && (currentTime - lastIoTime) >= idleTime) {
272             session.increaseIdleCount(status);
273             session.getFilterChain().fireSessionIdle(session, status);
274         }
275     }
276
277     private void notifyWriteTimeout(SocketSessionImpl session,
278             long currentTime, long writeTimeout, long lastIoTime) {
279         SelectionKey JavaDoc key = session.getSelectionKey();
280         if (writeTimeout > 0 && (currentTime - lastIoTime) >= writeTimeout
281                 && key != null && key.isValid()
282                 && (key.interestOps() & SelectionKey.OP_WRITE) != 0) {
283             session.getFilterChain().fireExceptionCaught(session,
284                     new WriteTimeoutException());
285         }
286     }
287
288     private void doFlush() {
289         for (;;) {
290             SocketSessionImpl session = flushingSessions.poll();
291
292             if (session == null)
293                 break;
294
295             if (!session.isConnected()) {
296                 releaseWriteBuffers(session);
297                 continue;
298             }
299
300             SelectionKey JavaDoc key = session.getSelectionKey();
301             // Retry later if session is not yet fully initialized.
302
// (In case that Session.write() is called before addSession() is processed)
303
if (key == null) {
304                 scheduleFlush(session);
305                 break;
306             }
307
308             // Skip if the channel is already closed.
309
if (!key.isValid()) {
310                 continue;
311             }
312
313             try {
314                 doFlush(session);
315             } catch (IOException JavaDoc e) {
316                 scheduleRemove(session);
317                 session.getFilterChain().fireExceptionCaught(session, e);
318             }
319         }
320     }
321
322     private void releaseWriteBuffers(SocketSessionImpl session) {
323         Queue JavaDoc<WriteRequest> writeRequestQueue = session.getWriteRequestQueue();
324         WriteRequest req;
325
326         while ((req = writeRequestQueue.poll()) != null) {
327             try {
328                 ((ByteBuffer) req.getMessage()).release();
329             } catch (IllegalStateException JavaDoc e) {
330                 session.getFilterChain().fireExceptionCaught(session, e);
331             } finally {
332                 req.getFuture().setWritten(false);
333             }
334         }
335     }
336
337     private void doFlush(SocketSessionImpl session) throws IOException JavaDoc {
338         // Clear OP_WRITE
339
SelectionKey JavaDoc key = session.getSelectionKey();
340         key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
341
342         SocketChannel JavaDoc ch = session.getChannel();
343         Queue JavaDoc<WriteRequest> writeRequestQueue = session.getWriteRequestQueue();
344
345         for (;;) {
346             WriteRequest req = writeRequestQueue.peek();
347
348             if (req == null)
349                 break;
350
351             ByteBuffer buf = (ByteBuffer) req.getMessage();
352             if (buf.remaining() == 0) {
353                 writeRequestQueue.poll();
354
355                 session.increaseWrittenMessages();
356
357                 buf.reset();
358                 session.getFilterChain().fireMessageSent(session, req);
359                 continue;
360             }
361
362             if (key.isWritable()) {
363                 int writtenBytes = ch.write(buf.buf());
364                 if (writtenBytes > 0) {
365                     session.increaseWrittenBytes(writtenBytes);
366                 }
367             }
368
369             if (buf.hasRemaining()) {
370                 // Kernel buffer is full
371
key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
372                 break;
373             }
374         }
375     }
376
377     private void doUpdateTrafficMask() {
378         if (trafficControllingSessions.isEmpty())
379             return;
380
381         for (;;) {
382             SocketSessionImpl session = trafficControllingSessions.poll();
383
384             if (session == null)
385                 break;
386
387             SelectionKey JavaDoc key = session.getSelectionKey();
388             // Retry later if session is not yet fully initialized.
389
// (In case that Session.suspend??() or session.resume??() is
390
// called before addSession() is processed)
391
if (key == null) {
392                 scheduleTrafficControl(session);
393                 break;
394             }
395             // skip if channel is already closed
396
if (!key.isValid()) {
397                 continue;
398             }
399
400             // The normal is OP_READ and, if there are write requests in the
401
// session's write queue, set OP_WRITE to trigger flushing.
402
int ops = SelectionKey.OP_READ;
403             Queue JavaDoc<WriteRequest> writeRequestQueue = session
404                     .getWriteRequestQueue();
405             synchronized (writeRequestQueue) {
406                 if (!writeRequestQueue.isEmpty()) {
407                     ops |= SelectionKey.OP_WRITE;
408                 }
409             }
410
411             // Now mask the preferred ops with the mask of the current session
412
int mask = session.getTrafficMask().getInterestOps();
413             key.interestOps(ops & mask);
414         }
415     }
416
417     private class Worker implements Runnable JavaDoc {
418         public void run() {
419             Thread.currentThread().setName(SocketIoProcessor.this.threadName);
420
421             for (;;) {
422                 try {
423                     int nKeys = selector.select(1000);
424                     doAddNew();
425                     doUpdateTrafficMask();
426
427                     if (nKeys > 0) {
428                         process(selector.selectedKeys());
429                     }
430
431                     doFlush();
432                     doRemove();
433                     notifyIdleness();
434
435                     if (selector.keys().isEmpty()) {
436                         synchronized (lock) {
437                             if (selector.keys().isEmpty()
438                                     && newSessions.isEmpty()) {
439                                 worker = null;
440
441                                 try {
442                                     selector.close();
443                                 } catch (IOException JavaDoc e) {
444                                     ExceptionMonitor.getInstance()
445                                             .exceptionCaught(e);
446                                 } finally {
447                                     selector = null;
448                                 }
449
450                                 break;
451                             }
452                         }
453                     }
454                 } catch (Throwable JavaDoc t) {
455                     ExceptionMonitor.getInstance().exceptionCaught(t);
456
457                     try {
458                         Thread.sleep(1000);
459                     } catch (InterruptedException JavaDoc e1) {
460                         ExceptionMonitor.getInstance().exceptionCaught(e1);
461                     }
462                 }
463             }
464         }
465     }
466
467 }
468
Popular Tags