KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > mina > transport > socket > nio > support > DatagramAcceptorDelegate


1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  *
19  */

20 package org.apache.mina.transport.socket.nio.support;
21
22 import java.io.IOException JavaDoc;
23 import java.net.InetSocketAddress JavaDoc;
24 import java.net.SocketAddress JavaDoc;
25 import java.nio.channels.DatagramChannel JavaDoc;
26 import java.nio.channels.SelectionKey JavaDoc;
27 import java.nio.channels.Selector JavaDoc;
28 import java.util.ArrayList JavaDoc;
29 import java.util.Iterator JavaDoc;
30 import java.util.List JavaDoc;
31 import java.util.Map JavaDoc;
32 import java.util.Queue JavaDoc;
33 import java.util.Set JavaDoc;
34 import java.util.concurrent.ConcurrentHashMap JavaDoc;
35 import java.util.concurrent.ConcurrentLinkedQueue JavaDoc;
36 import java.util.concurrent.Executor JavaDoc;
37 import java.util.concurrent.atomic.AtomicInteger JavaDoc;
38
39 import org.apache.mina.common.ByteBuffer;
40 import org.apache.mina.common.ExceptionMonitor;
41 import org.apache.mina.common.IoAcceptor;
42 import org.apache.mina.common.IoFilter.WriteRequest;
43 import org.apache.mina.common.IoHandler;
44 import org.apache.mina.common.IoServiceConfig;
45 import org.apache.mina.common.IoSession;
46 import org.apache.mina.common.IoSessionRecycler;
47 import org.apache.mina.common.RuntimeIOException;
48 import org.apache.mina.common.support.BaseIoAcceptor;
49 import org.apache.mina.common.support.IoServiceListenerSupport;
50 import org.apache.mina.transport.socket.nio.DatagramAcceptorConfig;
51 import org.apache.mina.transport.socket.nio.DatagramServiceConfig;
52 import org.apache.mina.transport.socket.nio.DatagramSessionConfig;
53 import org.apache.mina.util.NamePreservingRunnable;
54
55 /**
56  * {@link IoAcceptor} for datagram transport (UDP/IP).
57  *
58  * @author The Apache Directory Project (mina-dev@directory.apache.org)
59  * @version $Rev: 555855 $, $Date: 2007-07-13 12:19:00 +0900 (금, 13 7월 2007) $
60  */

61 public class DatagramAcceptorDelegate extends BaseIoAcceptor implements
62         IoAcceptor, DatagramService {
63     private static final AtomicInteger JavaDoc nextId = new AtomicInteger JavaDoc();
64
65     private final Object JavaDoc lock = new Object JavaDoc();
66
67     private final IoAcceptor wrapper;
68
69     private final Executor JavaDoc executor;
70
71     private final int id = nextId.getAndIncrement();
72
73     private Selector JavaDoc selector;
74
75     private DatagramAcceptorConfig defaultConfig = new DatagramAcceptorConfig();
76
77     private final Map JavaDoc<SocketAddress JavaDoc, DatagramChannel JavaDoc> channels = new ConcurrentHashMap JavaDoc<SocketAddress JavaDoc, DatagramChannel JavaDoc>();
78
79     private final Queue JavaDoc<RegistrationRequest> registerQueue = new ConcurrentLinkedQueue JavaDoc<RegistrationRequest>();
80
81     private final Queue JavaDoc<CancellationRequest> cancelQueue = new ConcurrentLinkedQueue JavaDoc<CancellationRequest>();
82
83     private final Queue JavaDoc<DatagramSessionImpl> flushingSessions = new ConcurrentLinkedQueue JavaDoc<DatagramSessionImpl>();
84
85     private Worker worker;
86
87     /**
88      * Creates a new instance.
89      */

90     public DatagramAcceptorDelegate(IoAcceptor wrapper, Executor JavaDoc executor) {
91         this.wrapper = wrapper;
92         this.executor = executor;
93
94         // The default reuseAddress of an accepted socket should be 'true'.
95
defaultConfig.getSessionConfig().setReuseAddress(true);
96     }
97
98     public void bind(SocketAddress JavaDoc address, IoHandler handler,
99             IoServiceConfig config) throws IOException JavaDoc {
100         if (handler == null)
101             throw new NullPointerException JavaDoc("handler");
102         if (config == null) {
103             config = getDefaultConfig();
104         }
105
106         if (address != null && !(address instanceof InetSocketAddress JavaDoc))
107             throw new IllegalArgumentException JavaDoc("Unexpected address type: "
108                     + address.getClass());
109
110         RegistrationRequest request = new RegistrationRequest(address, handler,
111                 config);
112         registerQueue.add(request);
113         startupWorker();
114
115         selector.wakeup();
116
117         synchronized (request) {
118             while (!request.done) {
119                 try {
120                     request.wait();
121                 } catch (InterruptedException JavaDoc e) {
122                     throw new RuntimeIOException(e);
123                 }
124             }
125         }
126
127         if (request.exception != null) {
128             throw (IOException JavaDoc) new IOException JavaDoc("Failed to bind")
129                     .initCause(request.exception);
130         }
131     }
132
133     public void unbind(SocketAddress JavaDoc address) {
134         if (address == null)
135             throw new NullPointerException JavaDoc("address");
136
137         CancellationRequest request = new CancellationRequest(address);
138         try {
139             startupWorker();
140         } catch (IOException JavaDoc e) {
141             // IOException is thrown only when Worker thread is not
142
// running and failed to open a selector. We simply throw
143
// IllegalArgumentException here because we can simply
144
// conclude that nothing is bound to the selector.
145
throw new IllegalArgumentException JavaDoc("Address not bound: " + address);
146         }
147
148         cancelQueue.add(request);
149         selector.wakeup();
150
151         synchronized (request) {
152             while (!request.done) {
153                 try {
154                     request.wait();
155                 } catch (InterruptedException JavaDoc e) {
156                     throw new RuntimeIOException(e);
157                 }
158             }
159         }
160
161         if (request.exception != null) {
162             throw new RuntimeException JavaDoc("Failed to unbind", request.exception);
163         }
164     }
165
166     public void unbindAll() {
167         List JavaDoc<SocketAddress JavaDoc> addresses = new ArrayList JavaDoc<SocketAddress JavaDoc>(channels
168                 .keySet());
169
170         for (SocketAddress JavaDoc address : addresses) {
171             unbind(address);
172         }
173     }
174
175     @Override JavaDoc
176     public IoSession newSession(SocketAddress JavaDoc remoteAddress,
177             SocketAddress JavaDoc localAddress) {
178         if (remoteAddress == null) {
179             throw new NullPointerException JavaDoc("remoteAddress");
180         }
181         if (localAddress == null) {
182             throw new NullPointerException JavaDoc("localAddress");
183         }
184
185         Selector JavaDoc selector = this.selector;
186         DatagramChannel JavaDoc ch = channels.get(localAddress);
187         if (selector == null || ch == null) {
188             throw new IllegalArgumentException JavaDoc("Unknown localAddress: "
189                     + localAddress);
190         }
191
192         SelectionKey JavaDoc key = ch.keyFor(selector);
193         if (key == null) {
194             throw new IllegalArgumentException JavaDoc("Unknown localAddress: "
195                     + localAddress);
196         }
197
198         RegistrationRequest req = (RegistrationRequest) key.attachment();
199         IoSession session;
200         IoSessionRecycler sessionRecycler = getSessionRecycler(req);
201         synchronized (sessionRecycler) {
202             session = sessionRecycler.recycle(localAddress, remoteAddress);
203             if (session != null) {
204                 return session;
205             }
206
207             // If a new session needs to be created.
208
// Note that the local address is the service address in the
209
// acceptor side, and I didn't call getLocalSocketAddress().
210
// This avoids strange cases where getLocalSocketAddress() on the
211
// underlying socket would return an IPv6 address while the
212
// specified service address is an IPv4 address.
213
DatagramSessionImpl datagramSession = new DatagramSessionImpl(
214                     wrapper, this, req.config, ch, req.handler, req.address,
215                     req.address);
216             datagramSession.setRemoteAddress(remoteAddress);
217             datagramSession.setSelectionKey(key);
218
219             getSessionRecycler(req).put(datagramSession);
220             session = datagramSession;
221         }
222
223         try {
224             buildFilterChain(req, session);
225             getListeners().fireSessionCreated(session);
226         } catch (Throwable JavaDoc t) {
227             ExceptionMonitor.getInstance().exceptionCaught(t);
228         }
229
230         return session;
231     }
232
233     private IoSessionRecycler getSessionRecycler(RegistrationRequest req) {
234         IoSessionRecycler sessionRecycler;
235         if (req.config instanceof DatagramServiceConfig) {
236             sessionRecycler = ((DatagramServiceConfig) req.config)
237                     .getSessionRecycler();
238         } else {
239             sessionRecycler = defaultConfig.getSessionRecycler();
240         }
241         return sessionRecycler;
242     }
243
244     @Override JavaDoc
245     public IoServiceListenerSupport getListeners() {
246         return super.getListeners();
247     }
248
249     private void buildFilterChain(RegistrationRequest req, IoSession session)
250             throws Exception JavaDoc {
251         this.getFilterChainBuilder().buildFilterChain(session.getFilterChain());
252         req.config.getFilterChainBuilder().buildFilterChain(
253                 session.getFilterChain());
254         req.config.getThreadModel().buildFilterChain(session.getFilterChain());
255     }
256
257     public DatagramAcceptorConfig getDefaultConfig() {
258         return defaultConfig;
259     }
260
261     /**
262      * Sets the config this acceptor will use by default.
263      *
264      * @param defaultConfig the default config.
265      * @throws NullPointerException if the specified value is <code>null</code>.
266      */

267     public void setDefaultConfig(DatagramAcceptorConfig defaultConfig) {
268         if (defaultConfig == null) {
269             throw new NullPointerException JavaDoc("defaultConfig");
270         }
271         this.defaultConfig = defaultConfig;
272     }
273
274     private void startupWorker() throws IOException JavaDoc {
275         synchronized (lock) {
276             if (worker == null) {
277                 selector = Selector.open();
278                 worker = new Worker();
279                 executor.execute(new NamePreservingRunnable(worker));
280             }
281         }
282     }
283
284     public void flushSession(DatagramSessionImpl session) {
285         scheduleFlush(session);
286         Selector JavaDoc selector = this.selector;
287         if (selector != null) {
288             selector.wakeup();
289         }
290     }
291
292     public void closeSession(DatagramSessionImpl session) {
293     }
294
295     private void scheduleFlush(DatagramSessionImpl session) {
296         flushingSessions.add(session);
297     }
298
299     private class Worker implements Runnable JavaDoc {
300         public void run() {
301             Thread.currentThread().setName("DatagramAcceptor-" + id);
302
303             for (;;) {
304                 try {
305                     int nKeys = selector.select();
306
307                     registerNew();
308
309                     if (nKeys > 0) {
310                         processReadySessions(selector.selectedKeys());
311                     }
312
313                     flushSessions();
314                     cancelKeys();
315
316                     if (selector.keys().isEmpty()) {
317                         synchronized (lock) {
318                             if (selector.keys().isEmpty()
319                                     && registerQueue.isEmpty()
320                                     && cancelQueue.isEmpty()) {
321                                 worker = null;
322                                 try {
323                                     selector.close();
324                                 } catch (IOException JavaDoc e) {
325                                     ExceptionMonitor.getInstance()
326                                             .exceptionCaught(e);
327                                 } finally {
328                                     selector = null;
329                                 }
330                                 break;
331                             }
332                         }
333                     }
334                 } catch (IOException JavaDoc e) {
335                     ExceptionMonitor.getInstance().exceptionCaught(e);
336
337                     try {
338                         Thread.sleep(1000);
339                     } catch (InterruptedException JavaDoc e1) {
340                         ExceptionMonitor.getInstance().exceptionCaught(e1);
341                     }
342                 }
343             }
344         }
345     }
346
347     private void processReadySessions(Set JavaDoc<SelectionKey JavaDoc> keys) {
348         Iterator JavaDoc<SelectionKey JavaDoc> it = keys.iterator();
349         while (it.hasNext()) {
350             SelectionKey JavaDoc key = it.next();
351             it.remove();
352
353             DatagramChannel JavaDoc ch = (DatagramChannel JavaDoc) key.channel();
354
355             RegistrationRequest req = (RegistrationRequest) key.attachment();
356             try {
357                 if (key.isReadable()) {
358                     readSession(ch, req);
359                 }
360
361                 if (key.isWritable()) {
362                     for (Object JavaDoc o : getManagedSessions(req.address)) {
363                         scheduleFlush((DatagramSessionImpl) o);
364                     }
365                 }
366             } catch (Throwable JavaDoc t) {
367                 ExceptionMonitor.getInstance().exceptionCaught(t);
368             }
369         }
370     }
371
372     private void readSession(DatagramChannel JavaDoc channel, RegistrationRequest req)
373             throws Exception JavaDoc {
374         ByteBuffer readBuf = ByteBuffer
375                 .allocate(((DatagramSessionConfig) req.config
376                         .getSessionConfig()).getReceiveBufferSize());
377         try {
378             SocketAddress JavaDoc remoteAddress = channel.receive(readBuf.buf());
379             if (remoteAddress != null) {
380                 DatagramSessionImpl session = (DatagramSessionImpl) newSession(
381                         remoteAddress, req.address);
382
383                 readBuf.flip();
384
385                 ByteBuffer newBuf = ByteBuffer.allocate(readBuf.limit());
386                 newBuf.put(readBuf);
387                 newBuf.flip();
388
389                 session.increaseReadBytes(newBuf.remaining());
390                 session.getFilterChain().fireMessageReceived(session, newBuf);
391             }
392         } finally {
393             readBuf.release();
394         }
395     }
396
397     private void flushSessions() {
398         if (flushingSessions.size() == 0)
399             return;
400
401         for (;;) {
402             DatagramSessionImpl session = flushingSessions.poll();
403
404             if (session == null)
405                 break;
406
407             try {
408                 flush(session);
409             } catch (IOException JavaDoc e) {
410                 session.getFilterChain().fireExceptionCaught(session, e);
411             }
412         }
413     }
414
415     private void flush(DatagramSessionImpl session) throws IOException JavaDoc {
416         DatagramChannel JavaDoc ch = session.getChannel();
417
418         Queue JavaDoc<WriteRequest> writeRequestQueue = session.getWriteRequestQueue();
419
420         for (;;) {
421             WriteRequest req = writeRequestQueue.peek();
422
423             if (req == null)
424                 break;
425
426             ByteBuffer buf = (ByteBuffer) req.getMessage();
427             if (buf.remaining() == 0) {
428                 // pop and fire event
429
writeRequestQueue.poll();
430
431                 session.increaseWrittenMessages();
432                 buf.reset();
433                 session.getFilterChain().fireMessageSent(session, req);
434                 continue;
435             }
436
437             SelectionKey JavaDoc key = session.getSelectionKey();
438             if (key == null) {
439                 scheduleFlush(session);
440                 break;
441             }
442             if (!key.isValid()) {
443                 continue;
444             }
445
446             SocketAddress JavaDoc destination = req.getDestination();
447             if (destination == null) {
448                 destination = session.getRemoteAddress();
449             }
450
451             int writtenBytes = ch.send(buf.buf(), destination);
452
453             if (writtenBytes == 0) {
454                 // Kernel buffer is full
455
key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
456             } else if (writtenBytes > 0) {
457                 key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
458
459                 // pop and fire event
460
writeRequestQueue.poll();
461
462                 session.increaseWrittenBytes(writtenBytes);
463                 session.increaseWrittenMessages();
464                 buf.reset();
465                 session.getFilterChain().fireMessageSent(session, req);
466             }
467         }
468     }
469
470     private void registerNew() {
471         if (registerQueue.isEmpty())
472             return;
473
474         for (;;) {
475             RegistrationRequest req = registerQueue.poll();
476
477             if (req == null)
478                 break;
479
480             DatagramChannel JavaDoc ch = null;
481             try {
482                 ch = DatagramChannel.open();
483                 DatagramSessionConfig cfg;
484                 if (req.config.getSessionConfig() instanceof DatagramSessionConfig) {
485                     cfg = (DatagramSessionConfig) req.config.getSessionConfig();
486                 } else {
487                     cfg = getDefaultConfig().getSessionConfig();
488                 }
489
490                 ch.socket().setReuseAddress(cfg.isReuseAddress());
491                 ch.socket().setBroadcast(cfg.isBroadcast());
492                 ch.socket().setReceiveBufferSize(cfg.getReceiveBufferSize());
493                 ch.socket().setSendBufferSize(cfg.getSendBufferSize());
494
495                 if (ch.socket().getTrafficClass() != cfg.getTrafficClass()) {
496                     ch.socket().setTrafficClass(cfg.getTrafficClass());
497                 }
498
499                 ch.configureBlocking(false);
500                 ch.socket().bind(req.address);
501                 if (req.address == null || req.address.getPort() == 0) {
502                     req.address = (InetSocketAddress JavaDoc) ch.socket()
503                             .getLocalSocketAddress();
504                 }
505                 ch.register(selector, SelectionKey.OP_READ, req);
506                 channels.put(req.address, ch);
507
508                 getListeners().fireServiceActivated(this, req.address,
509                         req.handler, req.config);
510             } catch (Throwable JavaDoc t) {
511                 req.exception = t;
512             } finally {
513                 synchronized (req) {
514                     req.done = true;
515                     req.notify();
516                 }
517
518                 if (ch != null && req.exception != null) {
519                     try {
520                         ch.disconnect();
521                         ch.close();
522                     } catch (Throwable JavaDoc e) {
523                         ExceptionMonitor.getInstance().exceptionCaught(e);
524                     }
525                 }
526             }
527         }
528     }
529
530     private void cancelKeys() {
531         if (cancelQueue.isEmpty())
532             return;
533
534         for (;;) {
535             CancellationRequest request = cancelQueue.poll();
536
537             if (request == null) {
538                 break;
539             }
540
541             DatagramChannel JavaDoc ch = channels.remove(request.address);
542
543             // close the channel
544
try {
545                 if (ch == null) {
546                     request.exception = new IllegalArgumentException JavaDoc(
547                             "Address not bound: " + request.address);
548                 } else {
549                     SelectionKey JavaDoc key = ch.keyFor(selector);
550                     request.registrationRequest = (RegistrationRequest) key
551                             .attachment();
552                     key.cancel();
553                     selector.wakeup(); // wake up again to trigger thread death
554
ch.disconnect();
555                     ch.close();
556                 }
557             } catch (Throwable JavaDoc t) {
558                 ExceptionMonitor.getInstance().exceptionCaught(t);
559             } finally {
560                 synchronized (request) {
561                     request.done = true;
562                     request.notify();
563                 }
564
565                 if (request.exception == null) {
566                     getListeners().fireServiceDeactivated(this,
567                             request.address,
568                             request.registrationRequest.handler,
569                             request.registrationRequest.config);
570                 }
571             }
572         }
573     }
574
575     public void updateTrafficMask(DatagramSessionImpl session) {
576         // There's no point in changing the traffic mask for sessions originating
577
// from this acceptor since new sessions are created every time data is
578
// received.
579
}
580
581     private static class RegistrationRequest {
582         private InetSocketAddress JavaDoc address;
583
584         private final IoHandler handler;
585
586         private final IoServiceConfig config;
587
588         private Throwable JavaDoc exception;
589
590         private boolean done;
591
592         private RegistrationRequest(SocketAddress JavaDoc address, IoHandler handler,
593                 IoServiceConfig config) {
594             this.address = (InetSocketAddress JavaDoc) address;
595             this.handler = handler;
596             this.config = config;
597         }
598     }
599
600     private static class CancellationRequest {
601         private final SocketAddress JavaDoc address;
602
603         private boolean done;
604
605         private RegistrationRequest registrationRequest;
606
607         private RuntimeException JavaDoc exception;
608
609         private CancellationRequest(SocketAddress JavaDoc address) {
610             this.address = address;
611         }
612     }
613 }
614
Popular Tags