KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > mina > transport > socket > nio > support > DatagramConnectorDelegate


1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  *
19  */

20 package org.apache.mina.transport.socket.nio.support;
21
22 import java.io.IOException JavaDoc;
23 import java.net.InetSocketAddress JavaDoc;
24 import java.net.SocketAddress JavaDoc;
25 import java.nio.channels.DatagramChannel JavaDoc;
26 import java.nio.channels.SelectionKey JavaDoc;
27 import java.nio.channels.Selector JavaDoc;
28 import java.util.Iterator JavaDoc;
29 import java.util.Queue JavaDoc;
30 import java.util.Set JavaDoc;
31 import java.util.concurrent.ConcurrentLinkedQueue JavaDoc;
32 import java.util.concurrent.Executor JavaDoc;
33 import java.util.concurrent.atomic.AtomicInteger JavaDoc;
34
35 import org.apache.mina.common.ByteBuffer;
36 import org.apache.mina.common.ConnectFuture;
37 import org.apache.mina.common.ExceptionMonitor;
38 import org.apache.mina.common.IoConnector;
39 import org.apache.mina.common.IoHandler;
40 import org.apache.mina.common.IoServiceConfig;
41 import org.apache.mina.common.IoSession;
42 import org.apache.mina.common.IoSessionRecycler;
43 import org.apache.mina.common.IoFilter.WriteRequest;
44 import org.apache.mina.common.support.AbstractIoFilterChain;
45 import org.apache.mina.common.support.BaseIoConnector;
46 import org.apache.mina.common.support.DefaultConnectFuture;
47 import org.apache.mina.transport.socket.nio.DatagramConnectorConfig;
48 import org.apache.mina.transport.socket.nio.DatagramServiceConfig;
49 import org.apache.mina.transport.socket.nio.DatagramSessionConfig;
50 import org.apache.mina.util.NamePreservingRunnable;
51
52 /**
53  * {@link IoConnector} for datagram transport (UDP/IP).
54  *
55  * @author The Apache Directory Project (mina-dev@directory.apache.org)
56  * @version $Rev: 555855 $, $Date: 2007-07-13 12:19:00 +0900 (금, 13 7월 2007) $
57  */

58 public class DatagramConnectorDelegate extends BaseIoConnector implements
59         DatagramService {
60     private static final AtomicInteger JavaDoc nextId = new AtomicInteger JavaDoc();
61
62     private final Object JavaDoc lock = new Object JavaDoc();
63
64     private final IoConnector wrapper;
65
66     private final Executor JavaDoc executor;
67
68     private final int id = nextId.getAndIncrement();
69
70     private Selector JavaDoc selector;
71
72     private DatagramConnectorConfig defaultConfig = new DatagramConnectorConfig();
73
74     private final Queue JavaDoc<RegistrationRequest> registerQueue = new ConcurrentLinkedQueue JavaDoc<RegistrationRequest>();
75
76     private final Queue JavaDoc<DatagramSessionImpl> cancelQueue = new ConcurrentLinkedQueue JavaDoc<DatagramSessionImpl>();
77
78     private final Queue JavaDoc<DatagramSessionImpl> flushingSessions = new ConcurrentLinkedQueue JavaDoc<DatagramSessionImpl>();
79
80     private final Queue JavaDoc<DatagramSessionImpl> trafficControllingSessions = new ConcurrentLinkedQueue JavaDoc<DatagramSessionImpl>();
81
82     private Worker worker;
83
/**
 * Creates a new delegate.
 *
 * @param wrapper  the connector passed to every session this delegate creates
 * @param executor the executor used to run the worker loop
 */
public DatagramConnectorDelegate(IoConnector wrapper, Executor executor) {
    this.executor = executor;
    this.wrapper = wrapper;
}
91
92     public ConnectFuture connect(SocketAddress JavaDoc address, IoHandler handler,
93             IoServiceConfig config) {
94         return connect(address, null, handler, config);
95     }
96
97     public ConnectFuture connect(SocketAddress JavaDoc address,
98             SocketAddress JavaDoc localAddress, IoHandler handler,
99             IoServiceConfig config) {
100         if (address == null)
101             throw new NullPointerException JavaDoc("address");
102         if (handler == null)
103             throw new NullPointerException JavaDoc("handler");
104
105         if (!(address instanceof InetSocketAddress JavaDoc))
106             throw new IllegalArgumentException JavaDoc("Unexpected address type: "
107                     + address.getClass());
108
109         if (localAddress != null
110                 && !(localAddress instanceof InetSocketAddress JavaDoc)) {
111             throw new IllegalArgumentException JavaDoc(
112                     "Unexpected local address type: " + localAddress.getClass());
113         }
114
115         if (config == null) {
116             config = getDefaultConfig();
117         }
118
119         DatagramChannel JavaDoc ch = null;
120         boolean initialized = false;
121         try {
122             ch = DatagramChannel.open();
123             DatagramSessionConfig cfg;
124             if (config.getSessionConfig() instanceof DatagramSessionConfig) {
125                 cfg = (DatagramSessionConfig) config.getSessionConfig();
126             } else {
127                 cfg = getDefaultConfig().getSessionConfig();
128             }
129
130             ch.socket().setReuseAddress(cfg.isReuseAddress());
131             ch.socket().setBroadcast(cfg.isBroadcast());
132             ch.socket().setReceiveBufferSize(cfg.getReceiveBufferSize());
133             ch.socket().setSendBufferSize(cfg.getSendBufferSize());
134
135             if (ch.socket().getTrafficClass() != cfg.getTrafficClass()) {
136                 ch.socket().setTrafficClass(cfg.getTrafficClass());
137             }
138
139             if (localAddress != null) {
140                 ch.socket().bind(localAddress);
141             }
142             ch.connect(address);
143             ch.configureBlocking(false);
144             initialized = true;
145         } catch (IOException JavaDoc e) {
146             return DefaultConnectFuture.newFailedFuture(e);
147         } finally {
148             if (!initialized && ch != null) {
149                 try {
150                     ch.disconnect();
151                     ch.close();
152                 } catch (IOException JavaDoc e) {
153                     ExceptionMonitor.getInstance().exceptionCaught(e);
154                 }
155             }
156         }
157
158         RegistrationRequest request = new RegistrationRequest(ch, handler,
159                 config);
160         try {
161             startupWorker();
162         } catch (IOException JavaDoc e) {
163             try {
164                 ch.disconnect();
165                 ch.close();
166             } catch (IOException JavaDoc e2) {
167                 ExceptionMonitor.getInstance().exceptionCaught(e2);
168             }
169
170             return DefaultConnectFuture.newFailedFuture(e);
171         }
172
173         registerQueue.add(request);
174
175         selector.wakeup();
176         return request;
177     }
178
179     public DatagramConnectorConfig getDefaultConfig() {
180         return defaultConfig;
181     }
182
183     /**
184      * Sets the config this connector will use by default.
185      *
186      * @param defaultConfig the default config.
187      * @throws NullPointerException if the specified value is <code>null</code>.
188      */

189     public void setDefaultConfig(DatagramConnectorConfig defaultConfig) {
190         if (defaultConfig == null) {
191             throw new NullPointerException JavaDoc("defaultConfig");
192         }
193         this.defaultConfig = defaultConfig;
194     }
195
196     private void startupWorker() throws IOException JavaDoc {
197         synchronized (lock) {
198             if (worker == null) {
199                 selector = Selector.open();
200                 worker = new Worker();
201                 executor.execute(new NamePreservingRunnable(worker));
202             }
203         }
204     }
205
206     public void closeSession(DatagramSessionImpl session) {
207         try {
208             startupWorker();
209         } catch (IOException JavaDoc e) {
210             // IOException is thrown only when Worker thread is not
211
// running and failed to open a selector. We simply return
212
// silently here because it we can simply conclude that
213
// this session is not managed by this connector or
214
// already closed.
215
return;
216         }
217
218         cancelQueue.add(session);
219
220         selector.wakeup();
221     }
222
223     public void flushSession(DatagramSessionImpl session) {
224         scheduleFlush(session);
225         Selector JavaDoc selector = this.selector;
226         if (selector != null) {
227             selector.wakeup();
228         }
229     }
230
231     private void scheduleFlush(DatagramSessionImpl session) {
232         flushingSessions.add(session);
233     }
234
235     public void updateTrafficMask(DatagramSessionImpl session) {
236         scheduleTrafficControl(session);
237         Selector JavaDoc selector = this.selector;
238         if (selector != null) {
239             selector.wakeup();
240         }
241     }
242
243     private void scheduleTrafficControl(DatagramSessionImpl session) {
244         trafficControllingSessions.add(session);
245     }
246
247     private void doUpdateTrafficMask() {
248         if (trafficControllingSessions.isEmpty())
249             return;
250
251         for (;;) {
252             DatagramSessionImpl session = trafficControllingSessions.poll();
253
254             if (session == null)
255                 break;
256
257             SelectionKey JavaDoc key = session.getSelectionKey();
258             // Retry later if session is not yet fully initialized.
259
// (In case that Session.suspend??() or session.resume??() is
260
// called before addSession() is processed)
261
if (key == null) {
262                 scheduleTrafficControl(session);
263                 break;
264             }
265             // skip if channel is already closed
266
if (!key.isValid()) {
267                 continue;
268             }
269
270             // The normal is OP_READ and, if there are write requests in the
271
// session's write queue, set OP_WRITE to trigger flushing.
272
int ops = SelectionKey.OP_READ;
273             if (!session.getWriteRequestQueue().isEmpty()) {
274                 ops |= SelectionKey.OP_WRITE;
275             }
276
277             // Now mask the preferred ops with the mask of the current session
278
int mask = session.getTrafficMask().getInterestOps();
279             key.interestOps(ops & mask);
280         }
281     }
282
283     private class Worker implements Runnable JavaDoc {
284         public void run() {
285             Thread.currentThread().setName("DatagramConnector-" + id);
286
287             for (;;) {
288                 try {
289                     int nKeys = selector.select();
290
291                     registerNew();
292                     doUpdateTrafficMask();
293
294                     if (nKeys > 0) {
295                         processReadySessions(selector.selectedKeys());
296                     }
297
298                     flushSessions();
299                     cancelKeys();
300
301                     if (selector.keys().isEmpty()) {
302                         synchronized (lock) {
303                             if (selector.keys().isEmpty()
304                                     && registerQueue.isEmpty()
305                                     && cancelQueue.isEmpty()) {
306                                 worker = null;
307                                 try {
308                                     selector.close();
309                                 } catch (IOException JavaDoc e) {
310                                     ExceptionMonitor.getInstance()
311                                             .exceptionCaught(e);
312                                 } finally {
313                                     selector = null;
314                                 }
315                                 break;
316                             }
317                         }
318                     }
319                 } catch (IOException JavaDoc e) {
320                     ExceptionMonitor.getInstance().exceptionCaught(e);
321
322                     try {
323                         Thread.sleep(1000);
324                     } catch (InterruptedException JavaDoc e1) {
325                         ExceptionMonitor.getInstance().exceptionCaught(e1);
326                     }
327                 }
328             }
329         }
330     }
331
332     private void processReadySessions(Set JavaDoc<SelectionKey JavaDoc> keys) {
333         Iterator JavaDoc<SelectionKey JavaDoc> it = keys.iterator();
334         while (it.hasNext()) {
335             SelectionKey JavaDoc key = it.next();
336             it.remove();
337
338             DatagramSessionImpl session = (DatagramSessionImpl) key
339                     .attachment();
340
341             // Let the recycler know that the session is still active.
342
getSessionRecycler(session).recycle(session.getLocalAddress(),
343                     session.getRemoteAddress());
344
345             if (key.isReadable() && session.getTrafficMask().isReadable()) {
346                 readSession(session);
347             }
348
349             if (key.isWritable() && session.getTrafficMask().isWritable()) {
350                 scheduleFlush(session);
351             }
352         }
353     }
354
355     private IoSessionRecycler getSessionRecycler(IoSession session) {
356         IoServiceConfig config = session.getServiceConfig();
357         IoSessionRecycler sessionRecycler;
358         if (config instanceof DatagramServiceConfig) {
359             sessionRecycler = ((DatagramServiceConfig) config)
360                     .getSessionRecycler();
361         } else {
362             sessionRecycler = defaultConfig.getSessionRecycler();
363         }
364         return sessionRecycler;
365     }
366
367     private void readSession(DatagramSessionImpl session) {
368
369         ByteBuffer readBuf = ByteBuffer.allocate(session.getReadBufferSize());
370         try {
371             int readBytes = session.getChannel().read(readBuf.buf());
372             if (readBytes > 0) {
373                 readBuf.flip();
374                 ByteBuffer newBuf = ByteBuffer.allocate(readBuf.limit());
375                 newBuf.put(readBuf);
376                 newBuf.flip();
377
378                 session.increaseReadBytes(readBytes);
379                 session.getFilterChain().fireMessageReceived(session, newBuf);
380             }
381         } catch (IOException JavaDoc e) {
382             session.getFilterChain().fireExceptionCaught(session, e);
383         } finally {
384             readBuf.release();
385         }
386     }
387
388     private void flushSessions() {
389         if (flushingSessions.size() == 0)
390             return;
391
392         for (;;) {
393             DatagramSessionImpl session = flushingSessions.poll();
394
395             if (session == null)
396                 break;
397
398             try {
399                 flush(session);
400             } catch (IOException JavaDoc e) {
401                 session.getFilterChain().fireExceptionCaught(session, e);
402             }
403         }
404     }
405
406     private void flush(DatagramSessionImpl session) throws IOException JavaDoc {
407         DatagramChannel JavaDoc ch = session.getChannel();
408
409         Queue JavaDoc<WriteRequest> writeRequestQueue = session.getWriteRequestQueue();
410
411         for (;;) {
412             WriteRequest req = writeRequestQueue.peek();
413
414             if (req == null)
415                 break;
416
417             ByteBuffer buf = (ByteBuffer) req.getMessage();
418             if (buf.remaining() == 0) {
419                 // pop and fire event
420
writeRequestQueue.poll();
421
422                 session.increaseWrittenMessages();
423                 buf.reset();
424                 session.getFilterChain().fireMessageSent(session, req);
425                 continue;
426             }
427
428             SelectionKey JavaDoc key = session.getSelectionKey();
429             if (key == null) {
430                 scheduleFlush(session);
431                 break;
432             }
433             if (!key.isValid()) {
434                 continue;
435             }
436
437             int writtenBytes = ch.write(buf.buf());
438
439             if (writtenBytes == 0) {
440                 // Kernel buffer is full
441
key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
442             } else if (writtenBytes > 0) {
443                 key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
444
445                 // pop and fire event
446
writeRequestQueue.poll();
447
448                 session.increaseWrittenBytes(writtenBytes);
449                 session.increaseWrittenMessages();
450                 buf.reset();
451                 session.getFilterChain().fireMessageSent(session, req);
452             }
453         }
454     }
455
456     private void registerNew() {
457         if (registerQueue.isEmpty())
458             return;
459
460         for (;;) {
461             RegistrationRequest req = registerQueue.poll();
462
463             if (req == null)
464                 break;
465
466             DatagramSessionImpl session = new DatagramSessionImpl(wrapper,
467                     this, req.config, req.channel, req.handler, req.channel
468                             .socket().getRemoteSocketAddress(), req.channel
469                             .socket().getLocalSocketAddress());
470
471             // AbstractIoFilterChain will notify the connect future.
472
session.setAttribute(AbstractIoFilterChain.CONNECT_FUTURE, req);
473
474             boolean success = false;
475             try {
476                 SelectionKey JavaDoc key = req.channel.register(selector,
477                         SelectionKey.OP_READ, session);
478
479                 session.setSelectionKey(key);
480                 buildFilterChain(req, session);
481                 getSessionRecycler(session).put(session);
482
483                 // The CONNECT_FUTURE attribute is cleared and notified here.
484
getListeners().fireSessionCreated(session);
485                 success = true;
486             } catch (Throwable JavaDoc t) {
487                 // The CONNECT_FUTURE attribute is cleared and notified here.
488
session.getFilterChain().fireExceptionCaught(session, t);
489             } finally {
490                 if (!success) {
491                     try {
492                         req.channel.disconnect();
493                         req.channel.close();
494                     } catch (IOException JavaDoc e) {
495                         ExceptionMonitor.getInstance().exceptionCaught(e);
496                     }
497                 }
498             }
499         }
500     }
501
502     private void buildFilterChain(RegistrationRequest req, IoSession session)
503             throws Exception JavaDoc {
504         getFilterChainBuilder().buildFilterChain(session.getFilterChain());
505         req.config.getFilterChainBuilder().buildFilterChain(
506                 session.getFilterChain());
507         req.config.getThreadModel().buildFilterChain(session.getFilterChain());
508     }
509
510     private void cancelKeys() {
511         if (cancelQueue.isEmpty())
512             return;
513
514         for (;;) {
515             DatagramSessionImpl session = cancelQueue.poll();
516
517             if (session == null)
518                 break;
519             else {
520                 SelectionKey JavaDoc key = session.getSelectionKey();
521                 DatagramChannel JavaDoc ch = (DatagramChannel JavaDoc) key.channel();
522                 try {
523                     ch.disconnect();
524                     ch.close();
525                 } catch (IOException JavaDoc e) {
526                     ExceptionMonitor.getInstance().exceptionCaught(e);
527                 }
528
529                 getListeners().fireSessionDestroyed(session);
530                 session.getCloseFuture().setClosed();
531                 key.cancel();
532                 selector.wakeup(); // wake up again to trigger thread death
533
}
534         }
535     }
536
537     private static class RegistrationRequest extends DefaultConnectFuture {
538         private final DatagramChannel JavaDoc channel;
539
540         private final IoHandler handler;
541
542         private final IoServiceConfig config;
543
544         private RegistrationRequest(DatagramChannel JavaDoc channel, IoHandler handler,
545                 IoServiceConfig config) {
546             this.channel = channel;
547             this.handler = handler;
548             this.config = config;
549         }
550     }
551 }
552
Popular Tags