1 package com.ubermq.kernel; 2 3 import EDU.oswego.cs.dl.util.concurrent.*; 4 import com.ubermq.kernel.*; 5 import com.ubermq.kernel.event.*; 6 import java.io.*; 7 import java.nio.channels.*; 8 import java.util.*; 9 import org.apache.log4j.*; 10 11 15 public final class ReadWriteTransformThread 16 extends Thread 17 { 18 private static final Logger log = Logger.getLogger(ReadWriteTransformThread.class); 19 20 private Selector selector; 21 private List toRegister, toUnregister, toEnable, toDisable; 22 private Object serviceNotifier; 23 private int operation; 24 private int spinDetector; 25 private static final int THRESHOLD = 1000000; 26 27 34 public ReadWriteTransformThread(int operation) 35 throws IOException 36 { 37 super("Channel-Based NIO " + (operation == SelectionKey.OP_READ ? "reader" : "writer")); 38 setDaemon(true); 39 40 this.selector = Selector.open(); 41 this.toRegister = Collections.synchronizedList(new LinkedList()); 42 this.toUnregister = Collections.synchronizedList(new LinkedList()); 43 this.toEnable = Collections.synchronizedList(new LinkedList()); 44 this.toDisable = Collections.synchronizedList(new LinkedList()); 45 this.serviceNotifier = new Object (); 46 this.operation = operation; 47 this.spinDetector = 0; 48 } 49 50 56 public void run() 57 { 58 spinDetector = 0; 59 while(!isInterrupted()) 60 { 61 try 62 { 63 registerNewChannels(); 64 int n = selector.select(); 65 66 if (n > 0) 67 { 68 spinDetector = 0; 69 acceptPendingRequests(); 70 } 71 else 72 { 73 ++spinDetector; 74 if (spinDetector > THRESHOLD) 75 { 76 log.debug("Network loss detected! Selector observed " + spinDetector + " consecutive unblocked select() calls without results"); 77 spinDetector = 0; 78 79 Selector newSelector = Selector.open(); 81 Iterator iter = selector.keys().iterator(); 82 while (iter.hasNext()) 83 { 84 SelectionKey sk = (SelectionKey)iter.next(); 85 ConnectionInfo ci = (ConnectionInfo)sk.attachment(); 86 87 int ops = sk.interestOps(); 89 sk.cancel(); 90 91 SelectableChannel channel = getChannelForConnection(ci); 93 channel.register(newSelector, ops, ci); 94 } 95 this.selector.close(); 96 this.selector = newSelector; 97 log.debug("Successfully created new selector."); 98 } 99 } 100 } 101 catch(Exception ex) 102 { 103 log.debug("Error in select() block", ex); 104 } 105 } 106 107 Iterator iter = selector.keys().iterator(); 109 while (iter.hasNext()) 110 { 111 SelectionKey key = (SelectionKey)iter.next(); 112 key.cancel(); 113 114 ConnectionInfo conn = (ConnectionInfo)key.attachment(); 115 conn.close(); 116 } 117 } 118 119 127 public void register(ConnectionInfo ci, boolean sync) 128 { 129 synchronized(serviceNotifier) 130 { 131 toRegister.add(ci); 132 selector.wakeup(); 133 134 if (sync) 135 { 136 try 137 { 138 serviceNotifier.wait(); 139 } 140 catch (InterruptedException e) 141 { 142 log.debug("interrupted waiting for registration", e); 143 } 144 } 145 } 146 } 147 148 public void unregister(ConnectionInfo ci) 149 { 150 toUnregister.add(ci); 151 selector.wakeup(); 152 } 153 154 160 public void requestService(ConnectionInfo ci) 161 { 162 toEnable.add(ci); 163 toDisable.remove(ci); 164 selector.wakeup(); 165 } 166 167 179 public void cancelServiceRequest(ConnectionInfo ci) 180 { 181 if (!toEnable.contains(ci)) 182 { 183 toDisable.add(ci); 184 selector.wakeup(); 185 } 186 } 187 188 private void registerNewChannels() throws IOException 189 { 190 synchronized(toRegister) 191 { 192 Iterator iter = toRegister.iterator(); 194 while (iter.hasNext()) 195 { 196 ConnectionInfo conn = (ConnectionInfo)iter.next(); 197 if (conn.in() instanceof SelectableChannel) 198 { 199 doRegister(conn); 200 } 201 else 202 { 203 } 205 } 206 toRegister.clear(); 207 } 208 209 synchronized(toUnregister) 210 { 211 Iterator iter = toUnregister.iterator(); 212 while (iter.hasNext()) 213 { 214 ConnectionInfo conn = (ConnectionInfo)iter.next(); 215 SelectableChannel channel = getChannelForConnection(conn); 216 SelectionKey sk = channel.keyFor(selector); 217 if (sk != null) 218 { 219 sk.cancel(); 220 } 221 else 222 log.debug("can't unregister - nothing known about " + conn); 223 } 224 toUnregister.clear(); 225 } 226 227 synchronized(toEnable) 228 { 229 Iterator iter = toEnable.iterator(); 231 while(iter.hasNext()) 232 { 233 ConnectionInfo conn = (ConnectionInfo)iter.next(); 234 SelectableChannel channel = getChannelForConnection(conn); 235 SelectionKey sk = channel.keyFor(selector); 236 if (sk != null) 237 { 238 try 239 { 240 sk.interestOps(operation); 241 log.debug("enabled " + conn + " for op " + getFriendlyOpName(operation)); 242 } 243 catch (CancelledKeyException e) 244 { 245 log.debug("can't enable cancelled key " + sk + " for " + conn, e); 246 } 247 } 248 else 249 log.debug("can't enable - nothing known about " + conn); 250 } 251 toEnable.clear(); 252 } 253 254 synchronized(toDisable) 255 { 256 Iterator iter = toDisable.iterator(); 258 while(iter.hasNext()) 259 { 260 ConnectionInfo conn = (ConnectionInfo)iter.next(); 261 SelectableChannel channel = getChannelForConnection(conn); 262 SelectionKey sk = channel.keyFor(selector); 263 if (sk != null) 264 { 265 try 266 { 267 sk.interestOps(0); 268 log.debug("disabled " + conn + " for op " + getFriendlyOpName(operation)); 269 } 270 catch(CancelledKeyException cke) 271 { 272 log.debug("can't disable - already cancelled " + conn); 274 } 275 } 276 else 277 log.debug("can't disable - nothing known about " + conn); 278 } 279 toDisable.clear(); 280 } 281 282 synchronized(serviceNotifier) 284 { 285 serviceNotifier.notifyAll(); 286 } 287 } 288 289 private String getFriendlyOpName(int op) 290 { 291 if (op == SelectionKey.OP_READ) 292 return "OP_READ"; 293 else if (op == SelectionKey.OP_WRITE) 294 return "OP_WRITE"; 295 else if (op == SelectionKey.OP_ACCEPT) 296 return "OP_ACCEPT"; 297 else 298 return String.valueOf(op); 299 } 300 301 305 private SelectableChannel getChannelForConnection(ConnectionInfo conn) 306 { 307 return operation == SelectionKey.OP_WRITE ? 308 (SelectableChannel)conn.out() : 309 (SelectableChannel)conn.in(); 310 } 311 312 private void doRegister(final ConnectionInfo conn) 313 throws IOException 314 { 315 SelectableChannel channel = getChannelForConnection(conn); 316 log.debug("registering " + getFriendlyOpName(operation) + ": conn " + conn + ", new channel is " + channel); 317 channel.configureBlocking(false); 318 319 channel.register(selector, 0, conn); 321 322 conn.setIOHandler(this, operation); 324 325 conn.addEventListener(new ConnectionEventListener() 329 { 330 public void connectionEvent(ConnectionEvent e) 331 { 332 if (e.getEventCode() == ConnectionEvent.CONNECTION_CLOSED || 333 e.getEventCode() == ConnectionEvent.CONNECTION_IO_EXCEPTION) 334 { 335 unregister(conn); 336 } 337 } 338 }); 339 } 340 341 private synchronized void acceptPendingRequests() 342 throws IOException 343 { 344 Set readyKeys = selector.selectedKeys(); 345 346 for(Iterator i = readyKeys.iterator(); i.hasNext(); ) 347 { 348 SelectionKey key = (SelectionKey)i.next(); 349 i.remove(); 350 351 ConnectionInfo conn = (ConnectionInfo)key.attachment(); 352 if (!key.isValid()) 353 { 354 log.debug("key no longer valid, skipping"); 355 continue; 356 } 357 else 358 { 359 if (operation == SelectionKey.OP_WRITE && 360 key.isWritable()) 361 { 362 conn.flush(); 363 } 364 else if (operation == SelectionKey.OP_READ && 365 key.isReadable()) 366 { 367 ReadableByteChannel incomingChannel = (ReadableByteChannel)key.channel(); 368 369 conn.readFrom(incomingChannel, key); 371 } 372 } 373 } 374 } 375 376 } 377 378 | Popular Tags |