package com.tc.net.core;

import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;

import com.tc.exception.TCInternalError;
import com.tc.net.NIOWorkarounds;
import com.tc.net.core.event.TCListenerEvent;
import com.tc.util.Assert;
import com.tc.util.Util;
import com.tc.util.runtime.Os;

import java.io.IOException;
import java.net.Socket;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.Channel;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.GatheringByteChannel;
import java.nio.channels.ScatteringByteChannel;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Random;
import java.util.Set;

/**
 * NIO (java.nio.channels) based comm implementation.
 * <p>
 * One {@link Selector} is serviced by one dedicated daemon thread ({@link TCCommThread}). Every operation that touches
 * selection keys is executed on that thread: calls arriving on any other thread are wrapped in a {@link Runnable},
 * queued on {@link #selectorTasks} and the selector is woken up so the select loop can run them.
 */
class TCCommJDK14 extends AbstractTCComm {

  TCCommJDK14() {
    // nothing to initialize; the selector and its thread are created in startImpl()
  }

  /**
   * Opens the selector and starts the selector thread.
   * <p>
   * Opening is attempted up to three times, but only a {@link NullPointerException} recognized by
   * {@link NIOWorkarounds#selectorOpenRace} is retried (after a short random pause).
   *
   * @throws RuntimeException if opening fails with an {@link IOException}, or if no selector could be opened after
   *         all attempts
   * @throws NullPointerException if {@link Selector#open()} throws one that is not the known race
   */
  protected void startImpl() {
    this.selector = null;

    final int tries = 3;
    boolean isInterrupted = false;

    try {
      for (int i = 0; i < tries; i++) {
        try {
          this.selector = Selector.open();
          break;
        } catch (IOException ioe) {
          throw new RuntimeException(ioe);
        } catch (NullPointerException npe) {
          if (NIOWorkarounds.selectorOpenRace(npe)) {
            System.err.println("Attempting to work around sun bug 6427854 (attempt " + (i + 1) + " of " + tries + ")");
            try {
              Thread.sleep(new Random().nextInt(20) + 5);
            } catch (InterruptedException ie) {
              // remember the interrupt and restore it once we are done retrying
              isInterrupted = true;
            }
            continue;
          }
          throw npe;
        }
      }
    } finally {
      // do not lose an interrupt that arrived while pausing between attempts
      Util.selfInterruptIfNeeded(isInterrupted);
    }

    if (this.selector == null) { throw new RuntimeException("Could not start selector"); }

    commThread = new TCCommThread(this);
    commThread.start();
  }

  /**
   * Wakes the selector up so the select loop can observe the stopped state and terminate. Failures are logged, never
   * thrown.
   */
  protected void stopImpl() {
    try {
      if (selector != null) {
        selector.wakeup();
      }
    } catch (Exception e) {
      logger.error("Exception trying to stop TCComm", e);
    }
  }

  /**
   * Queues a task for execution on the selector thread and wakes the selector up. Must not be called from the
   * selector thread itself.
   * <p>
   * The put is retried until it succeeds; an interrupt received meanwhile is logged and re-asserted on the calling
   * thread afterwards.
   *
   * @param task the work to run on the selector thread
   */
  void addSelectorTask(final Runnable task) {
    Assert.eval(!isCommThread());
    boolean isInterrupted = false;

    try {
      while (true) {
        try {
          selectorTasks.put(task);
          break;
        } catch (InterruptedException e) {
          logger.warn(e);
          isInterrupted = true;
        }
      }
    } finally {
      selector.wakeup();
    }
    Util.selfInterruptIfNeeded(isInterrupted);
  }

  /**
   * Closes a listening channel on the selector thread and then runs the callback. When invoked from another thread
   * the call is re-queued as a selector task.
   *
   * @param ssc the server socket channel to close
   * @param callback run after the cleanup attempt, whether or not it succeeded
   */
  void stopListener(final ServerSocketChannel ssc, final Runnable callback) {
    if (!isCommThread()) {
      Runnable task = new Runnable() {
        public void run() {
          TCCommJDK14.this.stopListener(ssc, callback);
        }
      };
      addSelectorTask(task);
      return;
    }

    try {
      cleanupChannel(ssc, null);
    } catch (Exception e) {
      logger.error(e);
    } finally {
      try {
        callback.run();
      } catch (Exception e) {
        logger.error(e);
      }
    }
  }

  /**
   * Cancels the channel's key for this selector (if any) and clears its attachment. Selector thread only.
   *
   * @param channel the channel to unregister
   */
  void unregister(SelectableChannel channel) {
    Assert.assertTrue(isCommThread());
    SelectionKey key = channel.keyFor(selector);
    if (key != null) {
      key.cancel();
      key.attach(null);
    }
  }

  /**
   * Cancels the channel's selection key, shuts down and closes the underlying socket (for socket channels) and
   * closes the channel. Each step is best-effort: failures are logged and the remaining steps still run.
   * <p>
   * When invoked off the selector thread the whole operation is queued as a selector task.
   *
   * @param ch the channel to clean up; a null channel is logged and ignored (the callback is NOT run in that case)
   * @param callback optional; run after the cleanup attempt
   */
  void cleanupChannel(final Channel ch, final Runnable callback) {
    if (null == ch) {
      // the Throwable is only there to capture the caller's stack in the log
      logger.warn("null channel passed to cleanupChannel()", new Throwable());
      return;
    }

    if (!isCommThread()) {
      if (logger.isDebugEnabled()) {
        logger.debug("queue'ing channel close operation");
      }

      addSelectorTask(new Runnable() {
        public void run() {
          TCCommJDK14.this.cleanupChannel(ch, callback);
        }
      });
      return;
    }

    try {
      if (ch instanceof SelectableChannel) {
        SelectableChannel sc = (SelectableChannel) ch;

        try {
          SelectionKey sk = sc.keyFor(selector);
          if (sk != null) {
            sk.attach(null);
            sk.cancel();
          }
        } catch (Exception e) {
          logger.error("Exception trying to clear selection key", e);
        }
      }

      if (ch instanceof SocketChannel) {
        SocketChannel sc = (SocketChannel) ch;

        Socket s = sc.socket();

        if (null != s) {
          synchronized (s) {

            if (s.isConnected()) {
              try {
                if (!s.isOutputShutdown()) {
                  s.shutdownOutput();
                }
              } catch (Exception e) {
                logger.error("Exception trying to shutdown socket output: " + e.getMessage());
              }

              try {
                if (!s.isClosed()) {
                  s.close();
                }
              } catch (Exception e) {
                logger.error("Exception trying to close() socket: " + e.getMessage());
              }
            }
          }
        }
      } else if (ch instanceof ServerSocketChannel) {
        ServerSocketChannel ssc = (ServerSocketChannel) ch;

        try {
          ssc.close();
        } catch (Exception e) {
          logger.error("Exception trying to close() server socket: " + e.getMessage());
        }
      }

      try {
        ch.close();
      } catch (Exception e) {
        logger.error("Exception trying to close channel", e);
      }
    } catch (Exception e) {
      logger.error("Unhandled exception in cleanupChannel()", e);
    } finally {
      try {
        if (callback != null) {
          callback.run();
        }
      } catch (Throwable t) {
        logger.error("Unhandled exception in cleanupChannel callback.", t);
      }
    }

  }

  /** Replaces the channel's interest set with OP_CONNECT, attaching the connection. */
  void requestConnectInterest(TCConnectionJDK14 conn, SocketChannel sc) {
    handleRequest(InterestRequest.createSetInterestRequest(sc, conn, SelectionKey.OP_CONNECT));
  }

  /** Adds OP_READ to the channel's interest set, attaching the reader. */
  void requestReadInterest(TCJDK14ChannelReader reader, ScatteringByteChannel channel) {
    handleRequest(InterestRequest.createAddInterestRequest((SelectableChannel) channel, reader, SelectionKey.OP_READ));
  }

  /** Adds OP_WRITE to the channel's interest set, attaching the writer. */
  void requestWriteInterest(TCJDK14ChannelWriter writer, GatheringByteChannel channel) {
    handleRequest(InterestRequest.createAddInterestRequest((SelectableChannel) channel, writer, SelectionKey.OP_WRITE));
  }

  /** Replaces the channel's interest set with OP_ACCEPT, attaching the listener. */
  void requestAcceptInterest(TCListenerJDK14 lsnr, ServerSocketChannel ssc) {
    handleRequest(InterestRequest.createSetInterestRequest(ssc, lsnr, SelectionKey.OP_ACCEPT));
  }

  /** Removes OP_WRITE from the channel's interest set. */
  void removeWriteInterest(TCConnectionJDK14 conn, SelectableChannel channel) {
    handleRequest(InterestRequest.createRemoveInterestRequest(channel, conn, SelectionKey.OP_WRITE));
  }

  /** Removes OP_READ from the channel's interest set. */
  void removeReadInterest(TCConnectionJDK14 conn, SelectableChannel channel) {
    handleRequest(InterestRequest.createRemoveInterestRequest(channel, conn, SelectionKey.OP_READ));
  }

  /**
   * Listener close notification: drops the closed listener from the selector thread's name.
   *
   * @param event the close event; its source is the listener that was closed
   */
  public void closeEvent(TCListenerEvent event) {
    // a closed listener must be removed (the previous code re-added it, so the thread name never shrank)
    commThread.listenerRemoved(event.getSource());
  }

  /** Records a new listener in the selector thread's name. */
  void listenerAdded(TCListener listener) {
    commThread.listenerAdded(listener);
  }

  /**
   * Applies an interest request on the selector thread, re-queueing it as a selector task when called from another
   * thread. Requests are silently dropped once this comm instance is stopped.
   */
  private void handleRequest(final InterestRequest req) {
    if (isStopped()) { return; }

    if (isCommThread()) {
      modifyInterest(req);
      return;
    }

    addSelectorTask(new Runnable() {
      public void run() {
        TCCommJDK14.this.handleRequest(req);
      }
    });
  }

  /**
   * The selector thread's main loop: select, run queued selector tasks, then dispatch every selected key. Returns
   * once the comm instance is observed to be stopped after a select.
   *
   * @throws IOException if select fails with an error that is not the known Linux workaround case
   */
  void selectLoop() throws IOException {
    Assert.assertNotNull("selector", selector);
    Assert.eval("Not started", isStarted());

    while (true) {
      final int numKeys;
      try {
        numKeys = selector.select();
      } catch (IOException ioe) {
        if (NIOWorkarounds.linuxSelectWorkaround(ioe)) {
          logger.warn("working around Sun bug 4504001");
          continue;
        }
        throw ioe;
      }

      if (isStopped()) {
        if (logger.isDebugEnabled()) {
          logger.debug("Select loop terminating");
        }
        return;
      }

      drainSelectorTasks();

      final Set selectedKeys = selector.selectedKeys();
      if ((0 == numKeys) && (0 == selectedKeys.size())) {
        continue;
      }

      for (Iterator iter = selectedKeys.iterator(); iter.hasNext();) {
        SelectionKey key = (SelectionKey) iter.next();
        // selected keys are never removed by the selector itself
        iter.remove();

        if (null == key) {
          logger.error("Selection key is null");
          continue;
        }

        dispatchSelectedKey(key);
      }
    }
  }

  /**
   * Runs every task currently queued in {@link #selectorTasks}. A failing task is logged and does not stop the
   * remaining ones; an interrupt seen while polling is re-asserted once the queue is empty.
   */
  private void drainSelectorTasks() {
    boolean isInterrupted = false;

    while (true) {
      Runnable task = null;
      while (true) {
        try {
          // zero timeout: never wait for a task
          task = (Runnable) selectorTasks.poll(0);
          break;
        } catch (InterruptedException ie) {
          logger.error("Error getting task from task queue", ie);
          isInterrupted = true;
        }
      }

      if (null == task) {
        break;
      }

      try {
        task.run();
      } catch (Exception e) {
        logger.error("error running selector task", e);
      }
    }

    Util.selfInterruptIfNeeded(isInterrupted);
  }

  /**
   * Dispatches one selected key. Accept and connect are exclusive; read and write may both be handled in the same
   * pass. A key cancelled underneath us is logged and skipped.
   */
  private void dispatchSelectedKey(SelectionKey key) {
    try {
      if (key.isAcceptable()) {
        doAccept(key);
        return;
      }

      if (key.isConnectable()) {
        doConnect(key);
        return;
      }

      if (key.isReadable()) {
        ((TCJDK14ChannelReader) key.attachment()).doRead((ScatteringByteChannel) key.channel());
      }

      // the read above may have cancelled the key, hence the validity re-check
      if (key.isValid() && key.isWritable()) {
        ((TCJDK14ChannelWriter) key.attachment()).doWrite((GatheringByteChannel) key.channel());
      }
    } catch (CancelledKeyException cke) {
      logger.warn(cke.getClass().getName() + " occured");
    }
  }

  /**
   * Closes every channel still registered with the selector, closes the selector and replaces the task queue with an
   * empty one. Called by the selector thread when its loop exits.
   */
  private void dispose() {
    if (selector != null) {

      for (Iterator keys = selector.keys().iterator(); keys.hasNext();) {
        try {
          SelectionKey key = (SelectionKey) keys.next();
          cleanupChannel(key.channel(), null);
        } catch (Exception e) {
          logger.warn("Exception trying to close channel", e);
        }
      }

      try {
        selector.close();
      } catch (Exception e) {
        // constant-first equals: getMessage() may be null and must not blow up the shutdown path
        if ((Os.isMac()) && (Os.isUnix()) && ("Bad file descriptor".equals(e.getMessage()))) {
          logger.warn("Exception trying to close selector: " + e.getMessage());
        } else {
          logger.error("Exception trying to close selector", e);
        }
      }
    }

    selectorTasks = new LinkedQueue();
  }

  /** @return true when the calling thread is the selector thread */
  private boolean isCommThread() {
    return isCommThread(Thread.currentThread());
  }

  /** @return true when the given (non-null) thread is the selector thread */
  private boolean isCommThread(Thread thread) {
    if (thread == null) { return false; }
    return thread == commThread;
  }

  /**
   * Completes a pending non-blocking connect. On success the channel is re-registered for OP_READ and the connection
   * is notified; otherwise an error event is fired on the connection.
   */
  private void doConnect(SelectionKey key) {
    SocketChannel sc = (SocketChannel) key.channel();
    TCConnectionJDK14 conn = (TCConnectionJDK14) key.attachment();

    try {
      if (sc.finishConnect()) {
        sc.register(selector, SelectionKey.OP_READ, conn);
        conn.finishConnect();
      } else {
        String errMsg = "finishConnect() returned false, but no exception thrown";

        if (logger.isInfoEnabled()) {
          logger.info(errMsg);
        }

        // NOTE(review): the IOException path below uses fireErrorEvent(exception, null); confirm that the
        // single-String overload used here exists and is the intended one
        conn.fireErrorEvent(errMsg);
      }
    } catch (IOException ioe) {
      if (logger.isInfoEnabled()) {
        logger.info("IOException attempting to finish socket connection", ioe);
      }

      conn.fireErrorEvent(ioe, null);
    }
  }

  /**
   * Applies an add / set / remove interest request by (re-)registering the channel with the selector. Selector
   * thread only. A closed channel or cancelled key is logged and otherwise ignored.
   */
  private void modifyInterest(InterestRequest request) {
    Assert.eval(isCommThread());

    try {
      final int existingOps;

      SelectionKey key = request.channel.keyFor(selector);
      if (key != null) {
        existingOps = key.interestOps();
      } else {
        existingOps = 0;
      }

      if (logger.isDebugEnabled()) {
        logger.debug(request);
      }

      if (request.add) {
        request.channel.register(selector, existingOps | request.interestOps, request.attachment);
      } else if (request.set) {
        request.channel.register(selector, request.interestOps, request.attachment);
      } else if (request.remove) {
        // clear the bits; XOR would turn the interest ON whenever it was not already set
        request.channel.register(selector, existingOps & ~request.interestOps, request.attachment);
      } else {
        throw new TCInternalError();
      }
    } catch (ClosedChannelException cce) {
      logger.warn("Exception trying to process interest request: " + cce);

    } catch (CancelledKeyException cke) {
      logger.warn("Exception trying to process interest request: " + cke);
    }
  }

  /**
   * Accepts a pending connection on a listening channel, configures the new socket (non-blocking, 64KB send buffer,
   * TCP_NODELAY), lets the listener create the connection object and registers the channel for reads. Selector
   * thread only.
   */
  private void doAccept(final SelectionKey key) {
    Assert.eval(isCommThread());

    SocketChannel sc = null;

    TCListenerJDK14 lsnr = (TCListenerJDK14) key.attachment();

    try {
      final ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
      sc = ssc.accept();
      if (sc == null) {
        // a non-blocking accept() returns null when no connection is actually pending
        return;
      }

      sc.configureBlocking(false);
      final Socket s = sc.socket();

      try {
        s.setSendBufferSize(64 * 1024);
      } catch (IOException ioe) {
        logger.warn("IOException trying to setSendBufferSize()", ioe);
      }

      try {
        s.setTcpNoDelay(true);
      } catch (IOException ioe) {
        logger.warn("IOException trying to setTcpNoDelay()", ioe);
      }

      TCConnectionJDK14 conn = lsnr.createConnection(sc);
      // read interest only, same as doConnect(); write interest is added on demand via requestWriteInterest()
      sc.register(selector, SelectionKey.OP_READ, conn);
    } catch (IOException ioe) {
      if (logger.isInfoEnabled()) {
        logger.info("IO Exception accepting new connection", ioe);
      }

      cleanupChannel(sc, null);
    }
  }

  /** The selector serviced by {@link #commThread}; null until {@link #startImpl()} has opened it. */
  private Selector       selector;
  /** The thread running {@link #selectLoop()}; null until {@link #startImpl()} has run. */
  private TCCommThread   commThread    = null;
  /** Tasks handed over from other threads, executed by the select loop. */
  private LinkedQueue    selectorTasks = new LinkedQueue();

  /**
   * Immutable description of one change to a channel's interest set. Created through the static factories, which
   * pick one of the modes add / set / remove.
   */
  private static class InterestRequest {
    final SelectableChannel channel;
    final Object            attachment;
    final boolean           set;
    final boolean           add;
    final boolean           remove;
    final int               interestOps;

    /** Request that ORs the given ops into the channel's current interest set. */
    static InterestRequest createAddInterestRequest(SelectableChannel channel, Object attachment, int interestOps) {
      return new InterestRequest(channel, attachment, interestOps, false, true, false);
    }

    /** Request that replaces the channel's interest set with the given ops. */
    static InterestRequest createSetInterestRequest(SelectableChannel channel, Object attachment, int interestOps) {
      return new InterestRequest(channel, attachment, interestOps, true, false, false);
    }

    /** Request that removes the given ops from the channel's current interest set. */
    static InterestRequest createRemoveInterestRequest(SelectableChannel channel, Object attachment, int interestOps) {
      return new InterestRequest(channel, attachment, interestOps, false, false, true);
    }

    private InterestRequest(SelectableChannel channel, Object attachment, int interestOps, boolean set, boolean add,
                            boolean remove) {
      Assert.eval(remove ^ set ^ add);
      Assert.eval(channel != null);

      this.channel = channel;
      this.attachment = attachment;
      this.set = set;
      this.add = add;
      this.remove = remove;
      this.interestOps = interestOps;
    }

    /** Multi-line, human readable rendering used for debug logging. */
    public String toString() {
      StringBuffer buf = new StringBuffer();

      buf.append("Interest modify request: ").append(channel.toString()).append("\n");
      buf.append("Ops: ");

      if ((interestOps & SelectionKey.OP_ACCEPT) != 0) {
        buf.append(" ACCEPT");
      }

      if ((interestOps & SelectionKey.OP_CONNECT) != 0) {
        buf.append(" CONNECT");
      }

      if ((interestOps & SelectionKey.OP_READ) != 0) {
        buf.append(" READ");
      }

      if ((interestOps & SelectionKey.OP_WRITE) != 0) {
        buf.append(" WRITE");
      }

      buf.append("\n");

      buf.append("Set: ").append(set).append(", Remove: ").append(remove).append(", Add: ").append(add).append("\n");
      buf.append("Attachment: ");

      if (attachment != null) {
        buf.append(attachment.toString());
      } else {
        buf.append("null");
      }

      buf.append("\n");

      return buf.toString();
    }

  }

  /**
   * Daemon thread that runs the select loop of one {@link TCCommJDK14} and disposes of it when the loop exits. Its
   * thread name is the base name followed by one "(listen host:port)" entry per known listener.
   */
  private static class TCCommThread extends Thread {
    final TCCommJDK14  commInstance;
    final Set          listeners = new HashSet();
    final int          number    = getNextCounter();
    final String       baseName  = "TCComm Selector Thread " + number;

    private static int counter   = 1;

    private static synchronized int getNextCounter() {
      return counter++;
    }

    TCCommThread(TCCommJDK14 comm) {
      commInstance = comm;
      setDaemon(true);
      setName(baseName);

      if (logger.isDebugEnabled()) {
        // the Throwable is only there to record where the thread was created
        logger.debug("Creating a new selector thread (" + toString() + ")", new Throwable());
      }
    }

    /** Builds the "(listen host:port)" entry that represents a listener in the thread name. */
    String makeListenString(TCListener listener) {
      StringBuffer buf = new StringBuffer();
      buf.append("(listen ");
      buf.append(listener.getBindAddress().getHostAddress());
      buf.append(':');
      buf.append(listener.getBindPort());
      buf.append(')');
      return buf.toString();
    }

    /** Forgets a listener and refreshes the thread name. */
    synchronized void listenerRemoved(TCListener listener) {
      listeners.remove(makeListenString(listener));
      updateThreadName();
    }

    /** Remembers a listener and refreshes the thread name. */
    synchronized void listenerAdded(TCListener listener) {
      listeners.add(makeListenString(listener));
      updateThreadName();
    }

    /** Rebuilds the thread name from the base name and all known listeners. Callers hold this thread's monitor. */
    private void updateThreadName() {
      StringBuffer buf = new StringBuffer(baseName);
      for (final Iterator iter = listeners.iterator(); iter.hasNext();) {
        buf.append(' ');
        buf.append(iter.next());
      }

      setName(buf.toString());
    }

    /** Runs the select loop until it returns or fails, then always disposes of the comm instance. */
    public void run() {
      try {
        commInstance.selectLoop();
      } catch (Throwable t) {
        logger.error("Unhandled exception from selectLoop", t);
        t.printStackTrace();
      } finally {
        commInstance.dispose();
      }
    }
  }

}