1 46 package org.mr.core.net; 47 48 60 61 import java.io.IOException ; 62 import java.nio.channels.CancelledKeyException ; 63 import java.nio.channels.ClosedChannelException ; 64 import java.nio.channels.SelectableChannel ; 65 import java.nio.channels.SelectionKey ; 66 import java.nio.channels.Selector ; 67 import java.nio.channels.ServerSocketChannel ; 68 import java.nio.channels.SocketChannel ; 69 import java.util.Collections ; 70 import java.util.Iterator ; 71 import java.util.LinkedList ; 72 import java.util.List ; 73 74 import org.apache.commons.logging.Log; 75 import org.apache.commons.logging.LogFactory; 76 import org.mr.core.util.ActiveObject; 77 78 public class NetworkSelector implements Runnable { 79 private Selector selector; 80 private NetworkListener listener; 81 private Thread myThread; 82 private Log log; 83 private List registerQueue; 84 85 88 public NetworkSelector() { 89 this.log = LogFactory.getLog("NetworkSelector"); 90 this.selector = null; 91 this.registerQueue = Collections.synchronizedList(new LinkedList ()); 92 93 try { 94 this.selector = Selector.open(); 95 } catch (IOException e) { 96 if(log.isFatalEnabled()){ 97 log.fatal("Cannot open Selector", e); 98 } 99 } 100 } 102 107 public void addServerChannel(ServerSocketChannel channel) { 108 final SelectableChannel fchannel = channel; 109 this.registerQueue.add(new ActiveObject() { 110 public boolean call() { 111 registerAcceptChannel(fchannel); 112 return true; 113 } 114 }); 115 this.selector.wakeup(); 116 } 117 118 123 public void addTransportImpl(TransportImpl impl, Transport owner) { 124 final SelectableChannel fchannel = impl.getChannel(); 125 final SelectorReadCallback fcb = impl; 126 final Transport fowner = owner; 127 128 if (fchannel != null) { 129 if (impl.isConnected()) { 130 this.registerQueue.add(new ActiveObject() { 131 public boolean call() { 132 registerReadChannel(fchannel, fcb); 133 return true; 134 } 135 }); 136 } else { 137 this.registerQueue.add(new ActiveObject() { 138 public boolean call() { 139 registerConnectingChannel(fchannel, fowner); 140 return true; 141 } 142 }); 143 } 144 this.selector.wakeup(); 145 } 146 } 147 148 public void addImplForWrite(TransportImpl impl) { 149 final SelectableChannel fchannel = impl.getChannel(); 150 final SelectorReadCallback fcb = impl; 151 152 this.registerQueue.add(new ActiveObject() { 153 public boolean call() { 154 registerWriteChannel(fchannel, fcb); 155 return true; 156 } 157 }); 158 this.selector.wakeup(); 159 } 160 161 public void removeImplForWrite(TransportImpl impl) { 162 final SelectableChannel fchannel = impl.getChannel(); 163 164 this.registerQueue.add(new ActiveObject() { 165 public boolean call() { 166 deregisterWriteChannel(fchannel); 167 return true; 168 } 169 }); 170 this.selector.wakeup(); 171 } 172 173 private void registerReadChannel(SelectableChannel channel, 174 SelectorReadCallback cb) 175 { 176 try { 177 channel.configureBlocking(false); 178 SelectionKey readKey = channel.register(this.selector, 179 SelectionKey.OP_READ); 180 readKey.attach(cb); 181 } catch (ClosedChannelException e) { 182 if(log.isDebugEnabled()){ 183 log.debug("Cannot register read channel (" + 184 channel.toString() + ")"); 185 } 186 } catch (IOException e) { 187 if(log.isWarnEnabled()){ 188 log.warn("IO Error while registering new read channel (" 189 + channel.toString() + "): " + e); 190 } 191 } 192 } 193 194 private void registerWriteChannel(SelectableChannel channel, 195 SelectorReadCallback cb) { 196 try { 197 channel.configureBlocking(false); 198 SelectionKey key = channel.keyFor(this.selector); 199 if (key == null) { 200 key = channel.register(this.selector, 201 SelectionKey.OP_WRITE); 202 } else { 203 key.interestOps(key.interestOps() | SelectionKey.OP_WRITE); 204 } 205 key.attach(cb); 206 } catch (IOException e) { 207 if(log.isWarnEnabled()){ 208 log.warn("IO Error while registering write channel (" + 209 channel.toString() + "): " + e.toString()+"."); 210 } 211 } catch (CancelledKeyException e) {} 212 } 213 214 private void deregisterWriteChannel(SelectableChannel channel) { 215 try { 216 SelectionKey key = channel.keyFor(this.selector); 217 key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE)); 218 } catch (CancelledKeyException e) {} 219 } 220 221 private void registerConnectingChannel(SelectableChannel channel, 222 Transport owner) { 223 try { 224 channel.configureBlocking(false); 225 SelectionKey connectKey = 226 channel.register(this.selector, SelectionKey.OP_CONNECT); 227 connectKey.attach(owner); 228 } catch (ClosedChannelException e) { 229 if(log.isDebugEnabled()){ 230 log.debug("Cannot register connecting channel (" + 231 channel.toString() + ")"); 232 } 233 } catch (IOException e) { 234 if(log.isWarnEnabled()){ 235 log.warn("IO Error while registering new connecting " + 236 "channel (" + channel.toString() + "): " + 237 e.toString()+"."); 238 } 239 } 240 } 241 242 private void registerAcceptChannel(SelectableChannel channel) { 243 try { 244 channel.configureBlocking(false); 245 channel.register(this.selector, SelectionKey.OP_ACCEPT); 246 } catch (ClosedChannelException e) { 247 if(log.isErrorEnabled()){ 248 log.error("Cannot register server channel (local = " + 249 channel.toString() + 250 ") - THIS SHOULDN'T HAPPEN!!", e); 251 } 252 } catch (IOException e) { 253 if(log.isErrorEnabled()){ 254 log.error("IO Error while registering new server channel (" 255 + channel.toString() + ").", e); 256 } 257 } 258 } 259 260 264 public void setListener(NetworkListener listener) { 265 this.listener = listener; 266 } 267 268 271 public void start() { 272 this.myThread = new Thread (this, "NetworkSelector"); 273 this.myThread.start(); 274 } 275 276 279 public void run() { 280 Iterator selectedIter; 281 SelectionKey key; 282 int nErrors = 0; 283 284 while (true) { 285 try { 286 selector.select(); 287 } catch (IOException e) { 288 if(log.isErrorEnabled()){ 289 log.error("Error selecting", e); 290 } 291 nErrors++; 292 if (nErrors == 10) { 293 if(log.isFatalEnabled()){ 294 log.fatal("Selector exiting"); 295 } 296 return; 297 } 298 continue; 299 } catch (NullPointerException e) { 300 if(log.isDebugEnabled()){ 306 log.debug("Null pointer while selecting."); 307 } 308 continue; 309 } catch (CancelledKeyException cke) { 310 } 312 313 314 nErrors = 0; 315 selectedIter = selector.selectedKeys().iterator(); 316 while (selectedIter.hasNext()) { 317 key = (SelectionKey ) selectedIter.next(); 318 selectedIter.remove(); 319 320 try { 321 if (key.isAcceptable()) { 322 SocketChannel channel; 323 ServerSocketChannel server = 324 (ServerSocketChannel ) key.channel(); 325 try { 326 channel = server.accept(); 327 if (this.listener != null) { 328 listener.acceptedChannel(channel); 329 } 330 } catch (IOException e) { 331 if(log.isErrorEnabled()){ 332 log.error("Error accepting (local = " + 333 server.socket(). 334 getLocalSocketAddress(). 335 toString() + ").", e); 336 } 337 } 338 } 339 if (key.isReadable()) { 340 SelectorReadCallback srcb = 341 (SelectorReadCallback) key.attachment(); 342 if (srcb != null) { 343 srcb.read(); 344 } 345 } 346 if (key.isConnectable()) { 347 Transport t = (Transport) key.attachment(); 348 SocketChannel channel = 349 (SocketChannel ) key.channel(); 350 try { 351 channel.finishConnect(); 352 } catch (IOException e) { 353 if(log.isInfoEnabled()){ 354 log.info("Error connecting to " + 355 t.getInfo().getSocketAddress(). 356 toString() + " " + 357 channel.toString() + ": " + 358 e.toString()+"."); 359 } 360 } 361 t.finishedConnecting(channel); 362 } 363 if (key.isWritable()) { 364 SelectorReadCallback cb = 365 (SelectorReadCallback) key.attachment(); 366 if (cb != null) { 367 cb.selectWrite(); 368 } 369 } 370 } catch (CancelledKeyException cke) { 371 } 373 } 374 while (!this.registerQueue.isEmpty()) { 375 ActiveObject ao = (ActiveObject) this.registerQueue.remove(0); 376 ao.call(); 377 } 378 } 379 } 380 } | Popular Tags |