KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > mr > core > net > NetworkSelector


1 /*
2  * Copyright 2002 by
3  * <a HREF="http://www.coridan.com">Coridan</a>
4  * <a HREF="mailto: support@coridan.com ">support@coridan.com</a>
5  *
6  * The contents of this file are subject to the Mozilla Public License Version
7  * 1.1 (the "License"); you may not use this file except in compliance with the
8  * License. You may obtain a copy of the License at
9  * http://www.mozilla.org/MPL/
10  *
11  * Software distributed under the License is distributed on an "AS IS" basis,
12  * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
13  * for the specific language governing rights and limitations under the
14  * License.
15  *
16  * The Original Code is "MantaRay" (TM).
17  *
18  * The Initial Developer of the Original Code is Uri Schneider.
19  * Portions created by the Initial Developer are Copyright (C) 2006
20  * Coridan Inc. All Rights Reserved.
21  *
22  * Contributor(s): all the names of the contributors are added in the source
23  * code where applicable.
24  *
25  * Alternatively, the contents of this file may be used under the terms of the
26  * LGPL license (the "GNU LESSER GENERAL PUBLIC LICENSE"), in which case the
27  * provisions of LGPL are applicable instead of those above. If you wish to
28  * allow use of your version of this file only under the terms of the LGPL
29  * License and not to allow others to use your version of this file under
30  * the MPL, indicate your decision by deleting the provisions above and
31  * replace them with the notice and other provisions required by the LGPL.
32  * If you do not delete the provisions above, a recipient may use your version
33  * of this file under either the MPL or the GNU LESSER GENERAL PUBLIC LICENSE.
34  
35  *
36  * This library is free software; you can redistribute it and/or modify it
37  * under the terms of the MPL as stated above or under the terms of the GNU
38  * Lesser General Public License as published by the Free Software Foundation;
39  * either version 2.1 of the License, or any later version.
40  *
41  * This library is distributed in the hope that it will be useful, but WITHOUT
42  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
43  * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
44  * License for more details.
45  */

46 package org.mr.core.net;
47
48 /**
49  * NetworkSelector.java
50  *
51  * A thread running select() in an inifinite loop. It provides an
52  * interface to register server channels (for accept), connecting
53  * channels, connected channels (for read & write operations).
54  *
55  * Created: Mon Jan 19 15:12:58 2004
56  *
57  * @author Uri Schneider
58  * @version 1.0
59  */

60
61 import java.io.IOException JavaDoc;
62 import java.nio.channels.CancelledKeyException JavaDoc;
63 import java.nio.channels.ClosedChannelException JavaDoc;
64 import java.nio.channels.SelectableChannel JavaDoc;
65 import java.nio.channels.SelectionKey JavaDoc;
66 import java.nio.channels.Selector JavaDoc;
67 import java.nio.channels.ServerSocketChannel JavaDoc;
68 import java.nio.channels.SocketChannel JavaDoc;
69 import java.util.Collections JavaDoc;
70 import java.util.Iterator JavaDoc;
71 import java.util.LinkedList JavaDoc;
72 import java.util.List JavaDoc;
73
74 import org.apache.commons.logging.Log;
75 import org.apache.commons.logging.LogFactory;
76 import org.mr.core.util.ActiveObject;
77
78 public class NetworkSelector implements Runnable JavaDoc {
79     private Selector JavaDoc selector;
80     private NetworkListener listener;
81     private Thread JavaDoc myThread;
82     private Log log;
83     private List JavaDoc registerQueue;
84
85     /**
86      * Default contructor
87      */

88     public NetworkSelector() {
89         this.log = LogFactory.getLog("NetworkSelector");
90         this.selector = null;
91         this.registerQueue = Collections.synchronizedList(new LinkedList JavaDoc());
92
93         try {
94             this.selector = Selector.open();
95         } catch (IOException JavaDoc e) {
96             if(log.isFatalEnabled()){
97                 log.fatal("Cannot open Selector", e);
98             }
99         }
100     } // NetworkSelector constructor
101

102     /**
103      * Register a new server channel for selection. New accepted channels
104      * will be reported through this selector's NetworkListener.
105      * @param channel
106      */

107     public void addServerChannel(ServerSocketChannel JavaDoc channel) {
108         final SelectableChannel JavaDoc fchannel = channel;
109         this.registerQueue.add(new ActiveObject() {
110                 public boolean call() {
111                     registerAcceptChannel(fchannel);
112                     return true;
113                 }
114             });
115         this.selector.wakeup();
116     }
117
118     /**
119      * Register a TransportImpl object for selection. The impl will be called
120      * whenever new data is available on its channel.
121      * @param impl
122      */

123     public void addTransportImpl(TransportImpl impl, Transport owner) {
124         final SelectableChannel JavaDoc fchannel = impl.getChannel();
125         final SelectorReadCallback fcb = impl;
126         final Transport fowner = owner;
127
128         if (fchannel != null) {
129             if (impl.isConnected()) {
130                 this.registerQueue.add(new ActiveObject() {
131                         public boolean call() {
132                             registerReadChannel(fchannel, fcb);
133                             return true;
134                         }
135                     });
136             } else {
137                 this.registerQueue.add(new ActiveObject() {
138                         public boolean call() {
139                             registerConnectingChannel(fchannel, fowner);
140                             return true;
141                         }
142                     });
143             }
144             this.selector.wakeup();
145         }
146     }
147
148     public void addImplForWrite(TransportImpl impl) {
149         final SelectableChannel JavaDoc fchannel = impl.getChannel();
150         final SelectorReadCallback fcb = impl;
151
152         this.registerQueue.add(new ActiveObject() {
153                 public boolean call() {
154                     registerWriteChannel(fchannel, fcb);
155                     return true;
156                 }
157             });
158         this.selector.wakeup();
159     }
160
161     public void removeImplForWrite(TransportImpl impl) {
162         final SelectableChannel JavaDoc fchannel = impl.getChannel();
163
164         this.registerQueue.add(new ActiveObject() {
165                 public boolean call() {
166                     deregisterWriteChannel(fchannel);
167                     return true;
168                 }
169             });
170         this.selector.wakeup();
171     }
172                         
173     private void registerReadChannel(SelectableChannel JavaDoc channel,
174                                      SelectorReadCallback cb)
175     {
176         try {
177             channel.configureBlocking(false);
178             SelectionKey JavaDoc readKey = channel.register(this.selector,
179                                                     SelectionKey.OP_READ);
180             readKey.attach(cb);
181         } catch (ClosedChannelException JavaDoc e) {
182             if(log.isDebugEnabled()){
183                 log.debug("Cannot register read channel (" +
184                           channel.toString() + ")");
185             }
186         } catch (IOException JavaDoc e) {
187             if(log.isWarnEnabled()){
188                 log.warn("IO Error while registering new read channel ("
189                                + channel.toString() + "): " + e);
190             }
191         }
192     }
193
194     private void registerWriteChannel(SelectableChannel JavaDoc channel,
195                                       SelectorReadCallback cb) {
196         try {
197             channel.configureBlocking(false);
198             SelectionKey JavaDoc key = channel.keyFor(this.selector);
199             if (key == null) {
200                 key = channel.register(this.selector,
201                                        SelectionKey.OP_WRITE);
202             } else {
203                 key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
204             }
205             key.attach(cb);
206         } catch (IOException JavaDoc e) {
207             if(log.isWarnEnabled()){
208                 log.warn("IO Error while registering write channel (" +
209                             channel.toString() + "): " + e.toString()+".");
210             }
211         } catch (CancelledKeyException JavaDoc e) {}
212     }
213
214     private void deregisterWriteChannel(SelectableChannel JavaDoc channel) {
215         try {
216             SelectionKey JavaDoc key = channel.keyFor(this.selector);
217             key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
218         } catch (CancelledKeyException JavaDoc e) {}
219     }
220
221     private void registerConnectingChannel(SelectableChannel JavaDoc channel,
222                                            Transport owner) {
223         try {
224             channel.configureBlocking(false);
225             SelectionKey JavaDoc connectKey =
226                 channel.register(this.selector, SelectionKey.OP_CONNECT);
227             connectKey.attach(owner);
228         } catch (ClosedChannelException JavaDoc e) {
229             if(log.isDebugEnabled()){
230                 log.debug("Cannot register connecting channel (" +
231                           channel.toString() + ")");
232             }
233         } catch (IOException JavaDoc e) {
234             if(log.isWarnEnabled()){
235                 log.warn("IO Error while registering new connecting " +
236                             "channel (" + channel.toString() + "): " +
237                             e.toString()+".");
238             }
239         }
240     }
241
242     private void registerAcceptChannel(SelectableChannel JavaDoc channel) {
243         try {
244             channel.configureBlocking(false);
245             channel.register(this.selector, SelectionKey.OP_ACCEPT);
246         } catch (ClosedChannelException JavaDoc e) {
247             if(log.isErrorEnabled()){
248                 log.error("Cannot register server channel (local = " +
249                              channel.toString() +
250                              ") - THIS SHOULDN'T HAPPEN!!", e);
251             }
252         } catch (IOException JavaDoc e) {
253             if(log.isErrorEnabled()){
254                 log.error("IO Error while registering new server channel ("
255                              + channel.toString() + ").", e);
256             }
257         }
258     }
259
260     /**
261      * Set this selector's listener
262      * @param listener
263      */

264     public void setListener(NetworkListener listener) {
265         this.listener = listener;
266     }
267
268     /**
269      * Start this selector thread.
270      */

271     public void start() {
272         this.myThread = new Thread JavaDoc(this, "NetworkSelector");
273         this.myThread.start();
274     }
275
276     /* (non-Javadoc)
277      * @see java.lang.Runnable#run()
278      */

279      public void run() {
280         Iterator JavaDoc selectedIter;
281         SelectionKey JavaDoc key;
282         int nErrors = 0;
283
284         while (true) {
285             try {
286                 selector.select();
287             } catch (IOException JavaDoc e) {
288                 if(log.isErrorEnabled()){
289                     log.error("Error selecting", e);
290                 }
291                 nErrors++;
292                 if (nErrors == 10) {
293                     if(log.isFatalEnabled()){
294                         log.fatal("Selector exiting");
295                     }
296                     return;
297                 }
298                 continue;
299             } catch (NullPointerException JavaDoc e) {
300                 // this happens somtimes when a channel is closed
301
// while a select operation is in progress, although
302
// it shouldn't happen. so? continue, of course.
303
// (but do not print a log message! it scares the
304
// shit out of the QA people)
305
if(log.isDebugEnabled()){
306                     log.debug("Null pointer while selecting.");
307                 }
308                 continue;
309             } catch (CancelledKeyException JavaDoc cke) {
310                 // do nothing. what can one do?
311
}
312
313
314             nErrors = 0;
315             selectedIter = selector.selectedKeys().iterator();
316             while (selectedIter.hasNext()) {
317                 key = (SelectionKey JavaDoc) selectedIter.next();
318                 selectedIter.remove();
319
320                 try {
321                     if (key.isAcceptable()) {
322                         SocketChannel JavaDoc channel;
323                         ServerSocketChannel JavaDoc server =
324                             (ServerSocketChannel JavaDoc) key.channel();
325                         try {
326                             channel = server.accept();
327                             if (this.listener != null) {
328                                 listener.acceptedChannel(channel);
329                             }
330                         } catch (IOException JavaDoc e) {
331                             if(log.isErrorEnabled()){
332                                 log.error("Error accepting (local = " +
333                                              server.socket().
334                                              getLocalSocketAddress().
335                                              toString() + ").", e);
336                             }
337                         }
338                     }
339                     if (key.isReadable()) {
340                         SelectorReadCallback srcb =
341                             (SelectorReadCallback) key.attachment();
342                         if (srcb != null) {
343                             srcb.read();
344                         }
345                     }
346                     if (key.isConnectable()) {
347                         Transport t = (Transport) key.attachment();
348                         SocketChannel JavaDoc channel =
349                             (SocketChannel JavaDoc) key.channel();
350                         try {
351                             channel.finishConnect();
352                         } catch (IOException JavaDoc e) {
353                             if(log.isInfoEnabled()){
354                                 log.info("Error connecting to " +
355                                             t.getInfo().getSocketAddress().
356                                             toString() + " " +
357                                             channel.toString() + ": " +
358                                             e.toString()+".");
359                             }
360                         }
361                         t.finishedConnecting(channel);
362                     }
363                     if (key.isWritable()) {
364                         SelectorReadCallback cb =
365                             (SelectorReadCallback) key.attachment();
366                         if (cb != null) {
367                             cb.selectWrite();
368                         }
369                     }
370                 } catch (CancelledKeyException JavaDoc cke) {
371                     // do nothing. what can one do?
372
}
373             }
374             while (!this.registerQueue.isEmpty()) {
375                 ActiveObject ao = (ActiveObject) this.registerQueue.remove(0);
376                 ao.call();
377             }
378         }
379     }
380 } // NetworkSelector
381
Popular Tags