package com.sun.corba.se.impl.transport;

import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;

import com.sun.corba.se.pept.broker.Broker;
import com.sun.corba.se.pept.transport.Acceptor;
import com.sun.corba.se.pept.transport.Connection;
import com.sun.corba.se.pept.transport.EventHandler;
import com.sun.corba.se.pept.transport.ListenerThread;
import com.sun.corba.se.pept.transport.ReaderThread;

import com.sun.corba.se.spi.logging.CORBALogDomains;
import com.sun.corba.se.spi.orb.ORB;
import com.sun.corba.se.spi.orbutil.threadpool.Work;
import com.sun.corba.se.spi.orbutil.threadpool.NoSuchThreadPoolException;
import com.sun.corba.se.spi.orbutil.threadpool.NoSuchWorkQueueException;

import com.sun.corba.se.impl.logging.ORBUtilSystemException;
import com.sun.corba.se.impl.orbutil.ORBUtility;

/**
 * Transport selector that dispatches I/O readiness to {@link EventHandler}s.
 *
 * <p>Handlers that answer <code>true</code> from
 * <code>shouldUseSelectThreadToWait()</code> are multiplexed on a single
 * <code>java.nio</code> selector serviced by this object's own thread (this
 * class extends {@link Thread}; the thread is started lazily on the first such
 * registration). All other handlers get a dedicated listener or reader work
 * item queued on work queue 0 of the ORB's thread pool 0.
 *
 * <p>Channel registration and interest-op changes requested from other threads
 * are queued in {@link #deferredRegistrations} and {@link #interestOpsList}
 * and applied by the selector thread at the top of each selection cycle.
 */
public class SelectorImpl
    extends
        Thread
    implements
        com.sun.corba.se.pept.transport.Selector
{
    private ORB orb;
    // Written once in startSelector() before the selector thread is started.
    private Selector selector;
    // Passed to Selector.select(long): milliseconds, 0 meaning block indefinitely.
    private long timeout;
    // EventHandlers waiting to be registered with the selector; guarded by itself.
    private List deferredRegistrations;
    // SelectionKeyAndOp entries waiting to be applied; guarded by itself.
    private List interestOpsList;
    // EventHandler -> ListenerThread for handlers not using the select thread.
    private HashMap listenerThreads;
    // EventHandler -> ReaderThread for handlers not using the select thread.
    private HashMap readerThreads;
    // volatile: read outside any lock in registerForEvent().
    private volatile boolean selectorStarted;
    // volatile: read outside any lock by the selector thread in run().
    private volatile boolean closed;
    private ORBUtilSystemException wrapper;


    /**
     * Creates a selector for the given ORB with a default select timeout of
     * 60000. Neither the nio selector nor the selector thread is created here;
     * see {@link #registerForEvent(EventHandler)}.
     *
     * @param orb the ORB supplying the debug flag, thread pools and logging
     */
    public SelectorImpl(ORB orb)
    {
        this.orb = orb;
        selector = null;
        selectorStarted = false;
        timeout = 60000;
        deferredRegistrations = new ArrayList();
        interestOpsList = new ArrayList();
        listenerThreads = new HashMap();
        readerThreads = new HashMap();
        closed = false;
        wrapper = ORBUtilSystemException.get(orb, CORBALogDomains.RPC_TRANSPORT);
    }

    /**
     * Sets the timeout handed to <code>Selector.select(long)</code> on each
     * selection cycle.
     *
     * @param timeout the new select timeout
     */
    public void setTimeout(long timeout)
    {
        this.timeout = timeout;
    }

    /**
     * @return the timeout handed to <code>Selector.select(long)</code>
     */
    public long getTimeout()
    {
        return timeout;
    }

    /**
     * Queues a request to OR the handler's interest ops into its selection key
     * and wakes the selector thread so the request is applied promptly.
     *
     * <p>If the handler has no selection key, or the key is no longer valid,
     * nothing is queued and the condition is reported through the log wrapper.
     *
     * @param eventHandler handler whose interest ops should be re-enabled
     */
    public void registerInterestOps(EventHandler eventHandler)
    {
        if (orb.transportDebugFlag) {
            dprint(".registerInterestOps:-> " + eventHandler);
        }

        SelectionKey selectionKey = eventHandler.getSelectionKey();
        // The key is null when the handler was never registered with the
        // selector, or when that registration failed (see
        // handleDeferredRegistrations); treat that the same as an invalid key
        // rather than failing with a NullPointerException.
        if (selectionKey != null && selectionKey.isValid()) {
            int ehOps = eventHandler.getInterestOps();
            SelectionKeyAndOp keyAndOp = new SelectionKeyAndOp(selectionKey, ehOps);
            synchronized (interestOpsList) {
                interestOpsList.add(keyAndOp);
            }
            // Break the selector thread out of select() so it applies the change.
            selector.wakeup();
        }
        else {
            wrapper.selectionKeyInvalid(eventHandler.toString());
            if (orb.transportDebugFlag) {
                dprint(".registerInterestOps: EventHandler SelectionKey not valid " + eventHandler);
            }
        }

        if (orb.transportDebugFlag) {
            dprint(".registerInterestOps:<- ");
        }
    }

    /**
     * Starts delivering events to the given handler.
     *
     * <p>Does nothing once this selector is closed. A handler that uses the
     * select thread is queued for registration (starting the selector thread
     * on first use); otherwise a listener thread is created for
     * <code>OP_ACCEPT</code> or a reader thread for <code>OP_READ</code>.
     *
     * @param eventHandler the handler to register
     * @throws RuntimeException if the handler does not use the select thread
     *         and its interest ops are neither <code>OP_ACCEPT</code> nor
     *         <code>OP_READ</code>, or if the selector or worker could not be
     *         started
     */
    public void registerForEvent(EventHandler eventHandler)
    {
        if (orb.transportDebugFlag) {
            dprint(".registerForEvent: " + eventHandler);
        }

        if (isClosed()) {
            if (orb.transportDebugFlag) {
                dprint(".registerForEvent: closed: " + eventHandler);
            }
            return;
        }

        if (eventHandler.shouldUseSelectThreadToWait()) {
            synchronized (deferredRegistrations) {
                deferredRegistrations.add(eventHandler);
            }
            if (! selectorStarted) {
                // startSelector() re-checks under its own lock, so concurrent
                // first registrations start the thread only once.
                startSelector();
            }
            selector.wakeup();
            return;
        }

        switch (eventHandler.getInterestOps()) {
        case SelectionKey.OP_ACCEPT :
            createListenerThread(eventHandler);
            break;
        case SelectionKey.OP_READ :
            createReaderThread(eventHandler);
            break;
        default:
            if (orb.transportDebugFlag) {
                dprint(".registerForEvent: default: " + eventHandler);
            }
            throw new RuntimeException(
                "SelectorImpl.registerForEvent: unknown interest ops");
        }
    }

    /**
     * Stops delivering events to the given handler.
     *
     * <p>Does nothing once this selector is closed. For a handler that uses
     * the select thread, its selection key (if it has one) is cancelled and
     * the selector is woken; otherwise the handler's listener or reader thread
     * is closed.
     *
     * @param eventHandler the handler to unregister
     * @throws RuntimeException if the handler does not use the select thread
     *         and its interest ops are neither <code>OP_ACCEPT</code> nor
     *         <code>OP_READ</code>
     */
    public void unregisterForEvent(EventHandler eventHandler)
    {
        if (orb.transportDebugFlag) {
            dprint(".unregisterForEvent: " + eventHandler);
        }

        if (isClosed()) {
            if (orb.transportDebugFlag) {
                dprint(".unregisterForEvent: closed: " + eventHandler);
            }
            return;
        }

        if (eventHandler.shouldUseSelectThreadToWait()) {
            SelectionKey selectionKey;
            // The selector thread assigns the key while holding this lock
            // (handleDeferredRegistrations), so read it under the same lock.
            synchronized (deferredRegistrations) {
                selectionKey = eventHandler.getSelectionKey();
            }
            // The key is null if the deferred registration has not run yet or
            // failed with ClosedChannelException; there is nothing to cancel.
            // NOTE(review): a handler still sitting in deferredRegistrations is
            // not removed here and will be registered on the next cycle -
            // confirm whether callers can unregister that early.
            if (selectionKey != null) {
                selectionKey.cancel();
            }
            if (selector != null) {
                selector.wakeup();
            }
            return;
        }

        switch (eventHandler.getInterestOps()) {
        case SelectionKey.OP_ACCEPT :
            destroyListenerThread(eventHandler);
            break;
        case SelectionKey.OP_READ :
            destroyReaderThread(eventHandler);
            break;
        default:
            if (orb.transportDebugFlag) {
                dprint(".unregisterForEvent: default: " + eventHandler);
            }
            throw new RuntimeException(
                "SelectorImpl.uregisterForEvent: unknown interest ops");
        }
    }

    /**
     * Closes this selector: marks it closed, closes every listener and reader
     * thread, and wakes the selector thread, which closes the nio selector and
     * returns from {@link #run()}. Calling it again is a no-op.
     */
    public void close()
    {
        if (orb.transportDebugFlag) {
            dprint(".close");
        }

        if (isClosed()) {
            if (orb.transportDebugFlag) {
                dprint(".close: already closed");
            }
            return;
        }

        // Set first so that unregisterForEvent() calls made while the worker
        // threads shut down return early instead of touching the maps
        // being iterated below.
        setClosed(true);

        Iterator i;

        // Listener threads.
        i = listenerThreads.values().iterator();
        while (i.hasNext()) {
            ListenerThread listenerThread = (ListenerThread) i.next();
            listenerThread.close();
        }

        // Reader threads.
        i = readerThreads.values().iterator();
        while (i.hasNext()) {
            ReaderThread readerThread = (ReaderThread) i.next();
            readerThread.close();
        }

        // Selector: only wake it here; the selector thread itself performs
        // the actual selector.close() when it observes the closed flag.
        try {
            if (selector != null) {
                selector.wakeup();
            }
        } catch (Throwable t) {
            if (orb.transportDebugFlag) {
                dprint(".close: selector.close: " + t);
            }
        }
    }

    /**
     * Selector thread body. Until closed, each cycle applies queued
     * registrations and interest-op changes, blocks in
     * <code>select(timeout)</code>, then calls <code>handleEvent()</code> on
     * the handler attached to every selected key. Failures from a handler, or
     * from the cycle itself, are reported via debug output only and do not
     * terminate the loop.
     */
    public void run()
    {
        setName("SelectorThread");
        while (!closed) {
            try {
                int n = 0;
                if (timeout == 0 && orb.transportDebugFlag) {
                    dprint(".run: Beginning of selection cycle");
                }
                handleDeferredRegistrations();
                enableInterestOps();
                try {
                    n = selector.select(timeout);
                } catch (IOException e) {
                    if (orb.transportDebugFlag) {
                        dprint(".run: selector.select: " + e);
                    }
                }
                if (closed) {
                    selector.close();
                    if (orb.transportDebugFlag) {
                        dprint(".run: closed - .run return");
                    }
                    return;
                }
                Iterator iterator = selector.selectedKeys().iterator();
                if (orb.transportDebugFlag) {
                    if (iterator.hasNext()) {
                        dprint(".run: n = " + n);
                    }
                }
                while (iterator.hasNext()) {
                    SelectionKey selectionKey = (SelectionKey) iterator.next();
                    // Selected keys are not cleared by the selector itself.
                    iterator.remove();
                    EventHandler eventHandler = (EventHandler)
                        selectionKey.attachment();
                    try {
                        eventHandler.handleEvent();
                    } catch (Throwable t) {
                        if (orb.transportDebugFlag) {
                            dprint(".run: eventHandler.handleEvent", t);
                        }
                    }
                }
                if (timeout == 0 && orb.transportDebugFlag) {
                    dprint(".run: End of selection cycle");
                }
            } catch (Throwable t) {
                // Keep the selector thread alive whatever went wrong.
                if (orb.transportDebugFlag) {
                    dprint(".run: ignoring", t);
                }
            }
        }
    }

    /** @return whether {@link #close()} has been called */
    private synchronized boolean isClosed ()
    {
        return closed;
    }

    private synchronized void setClosed(boolean closed)
    {
        this.closed = closed;
    }

    /**
     * Opens the nio selector and starts this object as a daemon thread, at
     * most once. Synchronized with a re-check of <code>selectorStarted</code>
     * so that concurrent callers cannot open a second selector or call
     * <code>start()</code> twice.
     *
     * @throws RuntimeException wrapping the IOException if the selector
     *         cannot be opened
     */
    private synchronized void startSelector()
    {
        if (selectorStarted) {
            return;
        }
        try {
            selector = Selector.open();
        } catch (IOException e) {
            if (orb.transportDebugFlag) {
                dprint(".startSelector: Selector.open: IOException: " + e);
            }
            RuntimeException rte =
                new RuntimeException(".startSelector: Selector.open exception");
            rte.initCause(e);
            throw rte;
        }
        setDaemon(true);
        start();
        selectorStarted = true;
        if (orb.transportDebugFlag) {
            dprint(".startSelector: selector.start completed.");
        }
    }

    /**
     * Registers every queued handler's channel with the selector, attaching
     * the handler to the key and storing the key on the handler, then empties
     * the queue. If the channel is already closed the handler's key is set to
     * <code>null</code>.
     */
    private void handleDeferredRegistrations()
    {
        synchronized (deferredRegistrations) {
            int deferredListSize = deferredRegistrations.size();
            for (int i = 0; i < deferredListSize; i++) {
                EventHandler eventHandler =
                    (EventHandler) deferredRegistrations.get(i);
                if (orb.transportDebugFlag) {
                    dprint(".handleDeferredRegistrations: " + eventHandler);
                }
                SelectableChannel channel = eventHandler.getChannel();
                SelectionKey selectionKey = null;
                try {
                    selectionKey =
                        channel.register(selector,
                                         eventHandler.getInterestOps(),
                                         (Object) eventHandler);
                } catch (ClosedChannelException e) {
                    if (orb.transportDebugFlag) {
                        dprint(".handleDeferredRegistrations: " + e);
                    }
                }
                eventHandler.setSelectionKey(selectionKey);
            }
            deferredRegistrations.clear();
        }
    }

    /**
     * Applies every queued interest-op request by OR-ing the requested bits
     * into the key's current interest set, skipping keys that are no longer
     * valid, then empties the queue.
     */
    private void enableInterestOps()
    {
        synchronized (interestOpsList) {
            int listSize = interestOpsList.size();
            if (listSize > 0) {
                if (orb.transportDebugFlag) {
                    dprint(".enableInterestOps:->");
                }
                for (int i = 0; i < listSize; i++) {
                    SelectionKeyAndOp keyAndOp =
                        (SelectionKeyAndOp) interestOpsList.get(i);
                    SelectionKey selectionKey = keyAndOp.selectionKey;

                    // The key may have been cancelled between the request
                    // being queued and this cycle; interestOps() would throw
                    // CancelledKeyException on such a key.
                    if (selectionKey.isValid()) {
                        if (orb.transportDebugFlag) {
                            dprint(".enableInterestOps: " + keyAndOp);
                        }
                        int keyOp = keyAndOp.keyOp;
                        int selectionKeyOps = selectionKey.interestOps();
                        selectionKey.interestOps(selectionKeyOps | keyOp);
                    }
                }
                interestOpsList.clear();
                if (orb.transportDebugFlag) {
                    dprint(".enableInterestOps:<-");
                }
            }
        }
    }

    /**
     * Creates a listener thread for the handler's acceptor, records it, and
     * queues it as work on work queue 0 of thread pool 0.
     *
     * @throws RuntimeException wrapping the cause if that pool or queue does
     *         not exist
     */
    private void createListenerThread(EventHandler eventHandler)
    {
        if (orb.transportDebugFlag) {
            dprint(".createListenerThread: " + eventHandler);
        }
        Acceptor acceptor = eventHandler.getAcceptor();
        ListenerThread listenerThread =
            new ListenerThreadImpl(orb, acceptor, this);
        listenerThreads.put(eventHandler, listenerThread);
        Throwable throwable = null;
        try {
            orb.getThreadPoolManager().getThreadPool(0)
                .getWorkQueue(0).addWork((Work) listenerThread);
        } catch (NoSuchThreadPoolException e) {
            throwable = e;
        } catch (NoSuchWorkQueueException e) {
            throwable = e;
        }
        if (throwable != null) {
            RuntimeException rte = new RuntimeException(throwable.toString());
            rte.initCause(throwable);
            throw rte;
        }
    }

    /**
     * Removes and closes the listener thread recorded for the handler, if any.
     */
    private void destroyListenerThread(EventHandler eventHandler)
    {
        if (orb.transportDebugFlag) {
            dprint(".destroyListenerThread: " + eventHandler);
        }
        ListenerThread listenerThread = (ListenerThread)
            listenerThreads.get(eventHandler);
        if (listenerThread == null) {
            if (orb.transportDebugFlag) {
                dprint(".destroyListenerThread: cannot find ListenerThread - ignoring.");
            }
            return;
        }
        listenerThreads.remove(eventHandler);
        listenerThread.close();
    }

    /**
     * Creates a reader thread for the handler's connection, records it, and
     * queues it as work on work queue 0 of thread pool 0.
     *
     * @throws RuntimeException wrapping the cause if that pool or queue does
     *         not exist
     */
    private void createReaderThread(EventHandler eventHandler)
    {
        if (orb.transportDebugFlag) {
            dprint(".createReaderThread: " + eventHandler);
        }
        Connection connection = eventHandler.getConnection();
        ReaderThread readerThread =
            new ReaderThreadImpl(orb, connection, this);
        readerThreads.put(eventHandler, readerThread);
        Throwable throwable = null;
        try {
            orb.getThreadPoolManager().getThreadPool(0)
                .getWorkQueue(0).addWork((Work) readerThread);
        } catch (NoSuchThreadPoolException e) {
            throwable = e;
        } catch (NoSuchWorkQueueException e) {
            throwable = e;
        }
        if (throwable != null) {
            RuntimeException rte = new RuntimeException(throwable.toString());
            rte.initCause(throwable);
            throw rte;
        }
    }

    /**
     * Removes and closes the reader thread recorded for the handler, if any.
     */
    private void destroyReaderThread(EventHandler eventHandler)
    {
        if (orb.transportDebugFlag) {
            dprint(".destroyReaderThread: " + eventHandler);
        }
        ReaderThread readerThread = (ReaderThread)
            readerThreads.get(eventHandler);
        if (readerThread == null) {
            if (orb.transportDebugFlag) {
                dprint(".destroyReaderThread: cannot find ReaderThread - ignoring.");
            }
            return;
        }
        readerThreads.remove(eventHandler);
        readerThread.close();
    }

    /** Emits a debug message tagged with this class's name. */
    private void dprint(String msg)
    {
        ORBUtility.dprint("SelectorImpl", msg);
    }

    /** Emits a debug message, then the throwable's stack trace on System.out. */
    protected void dprint(String msg, Throwable t)
    {
        dprint(msg);
        t.printStackTrace(System.out);
    }

    /**
     * A queued request to OR <code>keyOp</code> into the interest set of
     * <code>selectionKey</code>; consumed by enableInterestOps(). Static
     * because it never touches the enclosing selector.
     */
    private static class SelectionKeyAndOp
    {
        // Interest-op bits to add to the key.
        public final int keyOp;
        public final SelectionKey selectionKey;

        public SelectionKeyAndOp(SelectionKey selectionKey, int keyOp) {
            this.selectionKey = selectionKey;
            this.keyOp = keyOp;
        }
    }

}