KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > mina > transport > socket > nio > SocketAcceptor


/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */

20 package org.apache.mina.transport.socket.nio;
21
22 import java.io.IOException JavaDoc;
23 import java.net.InetSocketAddress JavaDoc;
24 import java.net.SocketAddress JavaDoc;
25 import java.nio.channels.SelectionKey JavaDoc;
26 import java.nio.channels.Selector JavaDoc;
27 import java.nio.channels.ServerSocketChannel JavaDoc;
28 import java.nio.channels.SocketChannel JavaDoc;
29 import java.util.ArrayList JavaDoc;
30 import java.util.Iterator JavaDoc;
31 import java.util.List JavaDoc;
32 import java.util.Map JavaDoc;
33 import java.util.Queue JavaDoc;
34 import java.util.Set JavaDoc;
35 import java.util.concurrent.ConcurrentHashMap JavaDoc;
36 import java.util.concurrent.ConcurrentLinkedQueue JavaDoc;
37 import java.util.concurrent.CountDownLatch JavaDoc;
38 import java.util.concurrent.Executor JavaDoc;
39 import java.util.concurrent.atomic.AtomicInteger JavaDoc;
40
41 import org.apache.mina.common.ExceptionMonitor;
42 import org.apache.mina.common.IoAcceptor;
43 import org.apache.mina.common.IoHandler;
44 import org.apache.mina.common.IoServiceConfig;
45 import org.apache.mina.common.support.BaseIoAcceptor;
46 import org.apache.mina.util.NamePreservingRunnable;
47 import org.apache.mina.util.NewThreadExecutor;
48
49 /**
50  * {@link IoAcceptor} for socket transport (TCP/IP).
51  *
52  * @author The Apache Directory Project (mina-dev@directory.apache.org)
53  * @version $Rev: 389042 $, $Date: 2006-03-27 07:49:41Z $
54  */

55 public class SocketAcceptor extends BaseIoAcceptor {
56     private static final AtomicInteger JavaDoc nextId = new AtomicInteger JavaDoc();
57
58     private final Executor JavaDoc executor;
59
60     private final Object JavaDoc lock = new Object JavaDoc();
61
62     private final int id = nextId.getAndIncrement();
63
64     private final String JavaDoc threadName = "SocketAcceptor-" + id;
65
66     private SocketAcceptorConfig defaultConfig = new SocketAcceptorConfig();
67
68     private final Map JavaDoc<SocketAddress JavaDoc, ServerSocketChannel JavaDoc> channels = new ConcurrentHashMap JavaDoc<SocketAddress JavaDoc, ServerSocketChannel JavaDoc>();
69
70     private final Queue JavaDoc<RegistrationRequest> registerQueue = new ConcurrentLinkedQueue JavaDoc<RegistrationRequest>();
71
72     private final Queue JavaDoc<CancellationRequest> cancelQueue = new ConcurrentLinkedQueue JavaDoc<CancellationRequest>();
73
74     private final SocketIoProcessor[] ioProcessors;
75
76     private final int processorCount;
77
78     private Selector JavaDoc selector;
79
80     private Worker worker;
81
82     private int processorDistributor = 0;
83
84     /**
85      * Create an acceptor with a single processing thread using a NewThreadExecutor
86      */

87     public SocketAcceptor() {
88         this(1, new NewThreadExecutor());
89     }
90
91     /**
92      * Create an acceptor with the desired number of processing threads
93      *
94      * @param processorCount Number of processing threads
95      * @param executor Executor to use for launching threads
96      */

97     public SocketAcceptor(int processorCount, Executor JavaDoc executor) {
98         if (processorCount < 1) {
99             throw new IllegalArgumentException JavaDoc(
100                     "Must have at least one processor");
101         }
102
103         // The default reuseAddress of an accepted socket should be 'true'.
104
defaultConfig.getSessionConfig().setReuseAddress(true);
105
106         this.executor = executor;
107         this.processorCount = processorCount;
108         ioProcessors = new SocketIoProcessor[processorCount];
109
110         for (int i = 0; i < processorCount; i++) {
111             ioProcessors[i] = new SocketIoProcessor(
112                     "SocketAcceptorIoProcessor-" + id + "." + i, executor);
113         }
114     }
115
116     /**
117      * Binds to the specified <code>address</code> and handles incoming connections with the specified
118      * <code>handler</code>. Backlog value is configured to the value of <code>backlog</code> property.
119      *
120      * @throws IOException if failed to bind
121      */

122     public void bind(SocketAddress JavaDoc address, IoHandler handler,
123             IoServiceConfig config) throws IOException JavaDoc {
124         if (handler == null) {
125             throw new NullPointerException JavaDoc("handler");
126         }
127
128         if (address != null && !(address instanceof InetSocketAddress JavaDoc)) {
129             throw new IllegalArgumentException JavaDoc("Unexpected address type: "
130                     + address.getClass());
131         }
132
133         if (config == null) {
134             config = getDefaultConfig();
135         }
136
137         RegistrationRequest request = new RegistrationRequest(address, handler,
138                 config);
139
140         registerQueue.add(request);
141
142         startupWorker();
143
144         selector.wakeup();
145
146         try {
147             request.done.await();
148         } catch (InterruptedException JavaDoc e) {
149             ExceptionMonitor.getInstance().exceptionCaught(e);
150         }
151
152         if (request.exception != null) {
153             throw request.exception;
154         }
155     }
156
157     private synchronized void startupWorker() throws IOException JavaDoc {
158         synchronized (lock) {
159             if (worker == null) {
160                 selector = Selector.open();
161                 worker = new Worker();
162
163                 executor.execute(new NamePreservingRunnable(worker));
164             }
165         }
166     }
167
168     public void unbind(SocketAddress JavaDoc address) {
169         if (address == null) {
170             throw new NullPointerException JavaDoc("address");
171         }
172
173         CancellationRequest request = new CancellationRequest(address);
174
175         try {
176             startupWorker();
177         } catch (IOException JavaDoc e) {
178             // IOException is thrown only when Worker thread is not
179
// running and failed to open a selector. We simply throw
180
// IllegalArgumentException here because we can simply
181
// conclude that nothing is bound to the selector.
182
throw new IllegalArgumentException JavaDoc("Address not bound: " + address);
183         }
184
185         cancelQueue.add(request);
186
187         selector.wakeup();
188
189         try {
190             request.done.await();
191         } catch (InterruptedException JavaDoc e) {
192             ExceptionMonitor.getInstance().exceptionCaught(e);
193         }
194
195         if (request.exception != null) {
196             request.exception.fillInStackTrace();
197
198             throw request.exception;
199         }
200     }
201
202     public void unbindAll() {
203         List JavaDoc<SocketAddress JavaDoc> addresses = new ArrayList JavaDoc<SocketAddress JavaDoc>(channels
204                 .keySet());
205
206         for (SocketAddress JavaDoc address : addresses) {
207             unbind(address);
208         }
209     }
210
211     private class Worker implements Runnable JavaDoc {
212         public void run() {
213             Thread.currentThread().setName(SocketAcceptor.this.threadName);
214
215             for (;;) {
216                 try {
217                     int nKeys = selector.select();
218
219                     registerNew();
220
221                     if (nKeys > 0) {
222                         processSessions(selector.selectedKeys());
223                     }
224
225                     cancelKeys();
226
227                     if (selector.keys().isEmpty()) {
228                         synchronized (lock) {
229                             if (selector.keys().isEmpty()
230                                     && registerQueue.isEmpty()
231                                     && cancelQueue.isEmpty()) {
232                                 worker = null;
233                                 try {
234                                     selector.close();
235                                 } catch (IOException JavaDoc e) {
236                                     ExceptionMonitor.getInstance()
237                                             .exceptionCaught(e);
238                                 } finally {
239                                     selector = null;
240                                 }
241                                 break;
242                             }
243                         }
244                     }
245                 } catch (IOException JavaDoc e) {
246                     ExceptionMonitor.getInstance().exceptionCaught(e);
247
248                     try {
249                         Thread.sleep(1000);
250                     } catch (InterruptedException JavaDoc e1) {
251                         ExceptionMonitor.getInstance().exceptionCaught(e1);
252                     }
253                 }
254             }
255         }
256
257         private void processSessions(Set JavaDoc<SelectionKey JavaDoc> keys) throws IOException JavaDoc {
258             Iterator JavaDoc<SelectionKey JavaDoc> it = keys.iterator();
259             while (it.hasNext()) {
260                 SelectionKey JavaDoc key = it.next();
261
262                 it.remove();
263
264                 if (!key.isAcceptable()) {
265                     continue;
266                 }
267
268                 ServerSocketChannel JavaDoc ssc = (ServerSocketChannel JavaDoc) key.channel();
269
270                 SocketChannel JavaDoc ch = ssc.accept();
271
272                 if (ch == null) {
273                     continue;
274                 }
275
276                 boolean success = false;
277                 try {
278                     RegistrationRequest req = (RegistrationRequest) key
279                             .attachment();
280                     SocketSessionImpl session = new SocketSessionImpl(
281                             SocketAcceptor.this, nextProcessor(),
282                             getListeners(), req.config, ch, req.handler,
283                             req.address);
284                     getFilterChainBuilder().buildFilterChain(
285                             session.getFilterChain());
286                     req.config.getFilterChainBuilder().buildFilterChain(
287                             session.getFilterChain());
288                     req.config.getThreadModel().buildFilterChain(
289                             session.getFilterChain());
290                     session.getIoProcessor().addNew(session);
291                     success = true;
292                 } catch (Throwable JavaDoc t) {
293                     ExceptionMonitor.getInstance().exceptionCaught(t);
294                 } finally {
295                     if (!success) {
296                         ch.close();
297                     }
298                 }
299             }
300         }
301     }
302
303     private SocketIoProcessor nextProcessor() {
304         if (this.processorDistributor == Integer.MAX_VALUE) {
305             this.processorDistributor = Integer.MAX_VALUE % this.processorCount;
306         }
307
308         return ioProcessors[processorDistributor++ % processorCount];
309     }
310
311     public SocketAcceptorConfig getDefaultConfig() {
312         return defaultConfig;
313     }
314
315     /**
316      * Sets the config this acceptor will use by default.
317      *
318      * @param defaultConfig the default config.
319      * @throws NullPointerException if the specified value is <code>null</code>.
320      */

321     public void setDefaultConfig(SocketAcceptorConfig defaultConfig) {
322         if (defaultConfig == null) {
323             throw new NullPointerException JavaDoc("defaultConfig");
324         }
325         this.defaultConfig = defaultConfig;
326     }
327
328     private void registerNew() {
329         if (registerQueue.isEmpty()) {
330             return;
331         }
332
333         for (;;) {
334             RegistrationRequest req = registerQueue.poll();
335
336             if (req == null) {
337                 break;
338             }
339
340             ServerSocketChannel JavaDoc ssc = null;
341
342             try {
343                 ssc = ServerSocketChannel.open();
344                 ssc.configureBlocking(false);
345
346                 // Configure the server socket,
347
SocketAcceptorConfig cfg;
348                 if (req.config instanceof SocketAcceptorConfig) {
349                     cfg = (SocketAcceptorConfig) req.config;
350                 } else {
351                     cfg = getDefaultConfig();
352                 }
353
354                 ssc.socket().setReuseAddress(cfg.isReuseAddress());
355                 ssc.socket().setReceiveBufferSize(
356                         cfg.getSessionConfig().getReceiveBufferSize());
357
358                 // and bind.
359
ssc.socket().bind(req.address, cfg.getBacklog());
360                 if (req.address == null || req.address.getPort() == 0) {
361                     req.address = (InetSocketAddress JavaDoc) ssc.socket()
362                             .getLocalSocketAddress();
363                 }
364                 ssc.register(selector, SelectionKey.OP_ACCEPT, req);
365
366                 channels.put(req.address, ssc);
367
368                 getListeners().fireServiceActivated(this, req.address,
369                         req.handler, req.config);
370             } catch (IOException JavaDoc e) {
371                 req.exception = e;
372             } finally {
373                 req.done.countDown();
374
375                 if (ssc != null && req.exception != null) {
376                     try {
377                         ssc.close();
378                     } catch (IOException JavaDoc e) {
379                         ExceptionMonitor.getInstance().exceptionCaught(e);
380                     }
381                 }
382             }
383         }
384     }
385
386     private void cancelKeys() {
387         if (cancelQueue.isEmpty()) {
388             return;
389         }
390
391         for (;;) {
392             CancellationRequest request = cancelQueue.poll();
393
394             if (request == null) {
395                 break;
396             }
397
398             ServerSocketChannel JavaDoc ssc = channels.remove(request.address);
399
400             // close the channel
401
try {
402                 if (ssc == null) {
403                     request.exception = new IllegalArgumentException JavaDoc(
404                             "Address not bound: " + request.address);
405                 } else {
406                     SelectionKey JavaDoc key = ssc.keyFor(selector);
407                     request.registrationRequest = (RegistrationRequest) key
408                             .attachment();
409                     key.cancel();
410
411                     selector.wakeup(); // wake up again to trigger thread death
412

413                     ssc.close();
414                 }
415             } catch (IOException JavaDoc e) {
416                 ExceptionMonitor.getInstance().exceptionCaught(e);
417             } finally {
418                 request.done.countDown();
419
420                 if (request.exception == null) {
421                     getListeners().fireServiceDeactivated(this,
422                             request.address,
423                             request.registrationRequest.handler,
424                             request.registrationRequest.config);
425                 }
426             }
427         }
428     }
429
430     private static class RegistrationRequest {
431         private InetSocketAddress JavaDoc address;
432
433         private final IoHandler handler;
434
435         private final IoServiceConfig config;
436
437         private final CountDownLatch JavaDoc done = new CountDownLatch JavaDoc(1);
438
439         private volatile IOException JavaDoc exception;
440
441         private RegistrationRequest(SocketAddress JavaDoc address, IoHandler handler,
442                 IoServiceConfig config) {
443             this.address = (InetSocketAddress JavaDoc) address;
444             this.handler = handler;
445             this.config = config;
446         }
447     }
448
449     private static class CancellationRequest {
450         private final SocketAddress JavaDoc address;
451
452         private final CountDownLatch JavaDoc done = new CountDownLatch JavaDoc(1);
453
454         private RegistrationRequest registrationRequest;
455
456         private volatile RuntimeException JavaDoc exception;
457
458         private CancellationRequest(SocketAddress JavaDoc address) {
459             this.address = address;
460         }
461     }
462 }
463
Popular Tags