1 22 package org.xsocket; 23 24 25 import java.io.IOException ; 26 import java.nio.channels.SelectionKey ; 27 import java.nio.channels.Selector ; 28 import java.util.HashSet ; 29 import java.util.Iterator ; 30 import java.util.Set ; 31 import java.util.logging.Level ; 32 import java.util.logging.Logger ; 33 34 35 36 43 public class Dispatcher<T extends IHandle> implements IDispatcher<T> { 44 45 private static final Logger LOG = Logger.getLogger(Dispatcher.class.getName()); 46 47 private static final long TIMEOUT_SHUTDOWN_MILLIS = 5L * 1000L; 48 49 private boolean isOpen = true; 51 52 private Object dispatcherThreadGuard = new Object (); 54 55 private Selector selector = null; 57 58 private IEventHandler<T> eventHandler = null; 60 61 62 63 private long handledRegistractions = 0; 65 private long handledReads = 0; 66 private long handledWrites = 0; 67 68 73 public Dispatcher(IEventHandler<T> eventHandler) { 74 assert (eventHandler != null) : "null is not allowed for event handler "; 75 76 this.eventHandler = eventHandler; 77 78 79 if (LOG.isLoggable(Level.FINE)) { 80 LOG.fine("dispatcher " + this.hashCode() + " has been created (eventHandler=" + eventHandler + ")"); 81 } 82 83 try { 84 selector = Selector.open(); 85 } catch (IOException ioe) { 86 String text = "exception occured while opening selector. Reason: " + ioe.toString(); 87 LOG.severe(text); 88 throw new RuntimeException (text, ioe); 89 } 90 } 91 92 93 96 public final IEventHandler<T> getEventHandler() { 97 return eventHandler; 98 } 99 100 101 104 public void register(T handle, int ops) throws IOException { 105 assert (!handle.getChannel().isBlocking()); 106 107 if (LOG.isLoggable(Level.FINE)) { 108 LOG.fine("register handle " + handle); 109 } 110 111 synchronized (dispatcherThreadGuard) { 112 selector.wakeup(); 113 114 handle.getChannel().register(selector, ops, handle); 115 eventHandler.onHandleRegisterEvent(handle); 116 } 117 118 119 handledRegistractions++; 120 } 121 122 123 126 public void deregister(final T handle) throws IOException { 127 128 synchronized (dispatcherThreadGuard) { 129 selector.wakeup(); 130 131 SelectionKey key = handle.getChannel().keyFor(selector); 132 if (key.isValid()) { 133 key.cancel(); 134 } 135 } 136 } 137 138 139 142 @SuppressWarnings ("unchecked") 143 public final Set <T> getRegistered() { 144 145 Set <T> registered = new HashSet <T>(); 146 147 if (selector != null) { 148 SelectionKey [] selKeys = null; 149 synchronized (dispatcherThreadGuard) { 150 selector.wakeup(); 151 152 Set <SelectionKey > keySet = selector.keys(); 153 selKeys = keySet.toArray(new SelectionKey [keySet.size()]); 154 } 155 156 try { 157 for (SelectionKey key : selKeys) { 158 T handle = (T) key.attachment(); 159 registered.add(handle); 160 } 161 } catch (Exception ignore) { } 162 } 163 164 return registered; 165 } 166 167 168 169 172 public final void updateInterestSet(T handle, int ops) throws IOException { 173 SelectionKey key = handle.getChannel().keyFor(selector); 174 175 if (key != null) { 176 synchronized (dispatcherThreadGuard) { 177 if (key.isValid()) { 178 key.selector().wakeup(); 179 key.interestOps(ops); 180 181 if (LOG.isLoggable(Level.FINER)) { 182 LOG.finer("interest ops for " + handle + " updated to " + ops); 183 } 184 } else { 185 throw new IOException ("handle " + handle + " is invalid "); 186 } 187 } 188 } 189 } 190 191 192 193 196 @SuppressWarnings ("unchecked") 197 public final void run() { 198 199 if (LOG.isLoggable(Level.FINE)) { 200 LOG.fine("selector listening ..."); 201 } 202 203 204 while(isOpen) { 205 try { 206 207 synchronized (dispatcherThreadGuard) { 209 210 } 211 212 int eventCount = selector.select(1000); 214 215 if (eventCount > 0) { 217 Set selectedEventKeys = selector.selectedKeys(); 218 Iterator it = selectedEventKeys.iterator(); 219 220 while (it.hasNext()) { 222 SelectionKey eventKey = (SelectionKey ) it.next(); 223 it.remove(); 224 225 T handle = (T) eventKey.attachment(); 226 227 if (eventKey.isValid() && eventKey.isReadable()) { 229 230 eventHandler.onHandleReadableEvent(handle); 232 handledReads++; 233 } 234 235 if (eventKey.isValid() && eventKey.isWritable()) { 237 handledWrites++; 238 239 eventHandler.onHandleWriteableEvent(handle); 241 } 242 } 243 } 244 245 } catch (Throwable e) { 246 LOG.warning("exception occured while processing. Reason " + e.toString()); 247 } 248 } 249 250 251 closeDispatcher(); 252 } 253 254 255 @SuppressWarnings ("unchecked") 256 private void closeDispatcher() { 257 LOG.fine("closing connections"); 258 259 260 261 262 if (selector != null) { 263 try { 264 selector.close(); 265 } catch (Exception e) { 266 if (LOG.isLoggable(Level.FINE)) { 267 LOG.fine("error occured by close selector within tearDown " + e.toString()); 268 } 269 } 270 } 271 } 272 273 274 275 278 public void close() { 279 if (isOpen) { 280 if (selector != null) { 281 282 Set <T> openHandles = getRegistered(); 284 final int openConnections = openHandles.size(); 285 for (T handle : openHandles) { 286 eventHandler.onDispatcherCloseEvent(handle); 287 } 288 289 Thread closer = new Thread () { 291 @Override 292 public void run() { 293 long start = System.currentTimeMillis(); 294 295 int terminatedConnections = 0; 296 do { 297 try { 298 Thread.sleep(100); 299 } catch (InterruptedException ignore) { } 300 301 if (System.currentTimeMillis() > (start + TIMEOUT_SHUTDOWN_MILLIS)) { 302 LOG.warning("shutdown timeout reached (" + DataConverter.toFormatedDuration(TIMEOUT_SHUTDOWN_MILLIS) + "). kill pending connections"); 303 for (SelectionKey sk : selector.keys()) { 304 try { 305 terminatedConnections++; 306 sk.channel().close(); 307 } catch (Exception ignore) { } 308 } 309 310 break; 311 } 312 } while (getRegistered().size() > 0); 313 314 isOpen = false; 315 selector.wakeup(); 317 318 if ((openConnections > 0) || (terminatedConnections > 0)) { 319 if ((openConnections > 0) && (terminatedConnections > 0)) { 320 LOG.info((openConnections - terminatedConnections) + " connections has been closed properly, " 321 + terminatedConnections + " connections has been terminate unclean"); 322 } 323 } 324 325 326 if (LOG.isLoggable(Level.FINE)) { 327 LOG.fine("dispatcher " + this.hashCode() + " has been closed (shutdown time = " + DataConverter.toFormatedDuration(System.currentTimeMillis() - start) + ")"); 328 } 329 } 330 }; 331 closer.setName("xDispatcherCloser"); 332 closer.start(); 333 } 334 } 335 } 336 337 338 342 public final boolean isOpen() { 343 return isOpen; 344 } 345 346 347 348 349 350 353 public final long getNumberOfHandledRegistrations() { 354 return handledRegistractions; 355 } 356 357 358 361 public final long getNumberOfHandledReads() { 362 return handledReads; 363 } 364 365 366 369 public final long getNumberOfHandledWrites() { 370 return handledWrites; 371 } 372 } 373 | Popular Tags |