1 7 8 package com.sun.corba.se.impl.transport; 9 10 import java.io.IOException ; 11 import java.net.InetSocketAddress ; 12 import java.net.Socket ; 13 import java.nio.ByteBuffer ; 14 import java.nio.channels.SelectableChannel ; 15 import java.nio.channels.SelectionKey ; 16 import java.nio.channels.SocketChannel ; 17 import java.security.AccessController ; 18 import java.security.PrivilegedAction ; 19 import java.util.Collections ; 20 import java.util.Hashtable ; 21 import java.util.HashMap ; 22 import java.util.Map ; 23 24 import org.omg.CORBA.COMM_FAILURE ; 25 import org.omg.CORBA.CompletionStatus ; 26 import org.omg.CORBA.DATA_CONVERSION ; 27 import org.omg.CORBA.INTERNAL ; 28 import org.omg.CORBA.MARSHAL ; 29 import org.omg.CORBA.OBJECT_NOT_EXIST ; 30 import org.omg.CORBA.SystemException ; 31 32 import com.sun.org.omg.SendingContext.CodeBase; 33 34 import com.sun.corba.se.pept.broker.Broker; 35 import com.sun.corba.se.pept.encoding.InputObject; 36 import com.sun.corba.se.pept.encoding.OutputObject; 37 import com.sun.corba.se.pept.protocol.MessageMediator; 38 import com.sun.corba.se.pept.transport.Acceptor; 39 import com.sun.corba.se.pept.transport.Connection; 40 import com.sun.corba.se.pept.transport.ConnectionCache; 41 import com.sun.corba.se.pept.transport.ContactInfo; 42 import com.sun.corba.se.pept.transport.EventHandler; 43 import com.sun.corba.se.pept.transport.InboundConnectionCache; 44 import com.sun.corba.se.pept.transport.OutboundConnectionCache; 45 import com.sun.corba.se.pept.transport.ResponseWaitingRoom; 46 import com.sun.corba.se.pept.transport.Selector; 47 48 import com.sun.corba.se.spi.ior.IOR; 49 import com.sun.corba.se.spi.ior.iiop.GIOPVersion; 50 import com.sun.corba.se.spi.logging.CORBALogDomains; 51 import com.sun.corba.se.spi.orb.ORB ; 52 import com.sun.corba.se.spi.orbutil.threadpool.NoSuchThreadPoolException; 53 import com.sun.corba.se.spi.orbutil.threadpool.NoSuchWorkQueueException; 54 import com.sun.corba.se.spi.orbutil.threadpool.Work; 55 import com.sun.corba.se.spi.protocol.CorbaMessageMediator; 56 import com.sun.corba.se.spi.transport.CorbaContactInfo; 57 import com.sun.corba.se.spi.transport.CorbaConnection; 58 import com.sun.corba.se.spi.transport.CorbaResponseWaitingRoom; 59 import com.sun.corba.se.spi.transport.ReadTimeouts; 60 61 import com.sun.corba.se.impl.encoding.CachedCodeBase; 62 import com.sun.corba.se.impl.encoding.CDRInputStream_1_0; 63 import com.sun.corba.se.impl.encoding.CDROutputObject; 64 import com.sun.corba.se.impl.encoding.CDROutputStream_1_0; 65 import com.sun.corba.se.impl.encoding.CodeSetComponentInfo; 66 import com.sun.corba.se.impl.encoding.OSFCodeSetRegistry; 67 import com.sun.corba.se.impl.logging.ORBUtilSystemException; 68 import com.sun.corba.se.impl.orbutil.ORBConstants; 69 import com.sun.corba.se.impl.orbutil.ORBUtility; 70 import com.sun.corba.se.impl.protocol.giopmsgheaders.Message; 71 import com.sun.corba.se.impl.protocol.giopmsgheaders.MessageBase; 72 import com.sun.corba.se.impl.transport.CorbaResponseWaitingRoomImpl; 73 74 77 public class SocketOrChannelConnectionImpl 78 extends 79 EventHandlerBase 80 implements 81 CorbaConnection, 82 Work 83 { 84 public static boolean dprintWriteLocks = false; 85 86 90 protected long enqueueTime; 91 92 protected SocketChannel socketChannel; 93 public SocketChannel getSocketChannel() 94 { 95 return socketChannel; 96 } 97 98 protected CorbaContactInfo contactInfo; 101 protected Acceptor acceptor; 102 protected ConnectionCache connectionCache; 103 104 108 protected Socket socket; protected long timeStamp = 0; 110 protected boolean isServer = false; 111 112 protected int requestId = 5; 115 protected CorbaResponseWaitingRoom responseWaitingRoom; 116 protected int state; 117 protected java.lang.Object stateEvent = new java.lang.Object (); 118 protected java.lang.Object writeEvent = new java.lang.Object (); 119 protected boolean writeLocked; 120 protected int serverRequestCount = 0; 121 122 Map serverRequestMap = null; 125 126 protected boolean postInitialContexts = false; 130 131 protected IOR codeBaseServerIOR; 134 135 protected CachedCodeBase cachedCodeBase = new CachedCodeBase(this); 139 140 protected ORBUtilSystemException wrapper ; 141 142 protected ReadTimeouts readTimeouts; 144 145 protected boolean shouldReadGiopHeaderOnly; 146 147 protected CorbaMessageMediator partialMessageMediator = null; 151 152 protected SocketOrChannelConnectionImpl(ORB orb) 154 { 155 this.orb = orb; 156 wrapper = ORBUtilSystemException.get( orb, 157 CORBALogDomains.RPC_TRANSPORT ) ; 158 159 setWork(this); 160 responseWaitingRoom = new CorbaResponseWaitingRoomImpl(orb, this); 161 setReadTimeouts(orb.getORBData().getTransportTCPReadTimeouts()); 162 } 163 164 protected SocketOrChannelConnectionImpl(ORB orb, 166 boolean useSelectThreadToWait, 167 boolean useWorkerThread) 168 { 169 this(orb) ; 170 setUseSelectThreadToWait(useSelectThreadToWait); 171 setUseWorkerThreadForEvent(useWorkerThread); 172 } 173 174 public SocketOrChannelConnectionImpl(ORB orb, 176 CorbaContactInfo contactInfo, 177 boolean useSelectThreadToWait, 178 boolean useWorkerThread, 179 String socketType, 180 String hostname, 181 int port) 182 { 183 this(orb, useSelectThreadToWait, useWorkerThread); 184 185 this.contactInfo = contactInfo; 186 187 try { 188 socket = orb.getORBData().getSocketFactory() 189 .createSocket(socketType, 190 new InetSocketAddress (hostname, port)); 191 socketChannel = socket.getChannel(); 192 193 if (socketChannel != null) { 194 boolean isBlocking = !useSelectThreadToWait; 195 socketChannel.configureBlocking(isBlocking); 196 } else { 197 setUseSelectThreadToWait(false); 200 } 201 if (orb.transportDebugFlag) { 202 dprint(".initialize: connection created: " + socket); 203 } 204 } catch (Throwable t) { 205 throw wrapper.connectFailure(t, socketType, hostname, 206 Integer.toString(port)); 207 } 208 state = OPENING; 209 } 210 211 public SocketOrChannelConnectionImpl(ORB orb, 213 CorbaContactInfo contactInfo, 214 String socketType, 215 String hostname, 216 int port) 217 { 218 this(orb, contactInfo, 219 orb.getORBData().connectionSocketUseSelectThreadToWait(), 220 orb.getORBData().connectionSocketUseWorkerThreadForEvent(), 221 socketType, hostname, port); 222 } 223 224 public SocketOrChannelConnectionImpl(ORB orb, 226 Acceptor acceptor, 227 Socket socket, 228 boolean useSelectThreadToWait, 229 boolean useWorkerThread) 230 { 231 this(orb, useSelectThreadToWait, useWorkerThread); 232 233 this.socket = socket; 234 socketChannel = socket.getChannel(); 235 if (socketChannel != null) { 236 try { 238 boolean isBlocking = !useSelectThreadToWait; 239 socketChannel.configureBlocking(isBlocking); 240 } catch (IOException e) { 241 RuntimeException rte = new RuntimeException (); 242 rte.initCause(e); 243 throw rte; 244 } 245 } 246 this.acceptor = acceptor; 247 248 serverRequestMap = Collections.synchronizedMap(new HashMap ()); 249 isServer = true; 250 251 state = ESTABLISHED; 252 } 253 254 public SocketOrChannelConnectionImpl(ORB orb, 256 Acceptor acceptor, 257 Socket socket) 258 { 259 this(orb, acceptor, socket, 260 (socket.getChannel() == null 261 ? false 262 : orb.getORBData().connectionSocketUseSelectThreadToWait()), 263 (socket.getChannel() == null 264 ? false 265 : orb.getORBData().connectionSocketUseWorkerThreadForEvent())); 266 } 267 268 273 public boolean shouldRegisterReadEvent() 274 { 275 return true; 276 } 277 278 public boolean shouldRegisterServerReadEvent() 279 { 280 return true; 281 } 282 283 public boolean read() 284 { 285 try { 286 if (orb.transportDebugFlag) { 287 dprint(".read->: " + this); 288 } 289 CorbaMessageMediator messageMediator = readBits(); 290 if (messageMediator != null) { 291 return dispatch(messageMediator); 294 } 295 return true; 296 } finally { 297 if (orb.transportDebugFlag) { 298 dprint(".read<-: " + this); 299 } 300 } 301 } 302 303 protected CorbaMessageMediator readBits() 304 { 305 try { 306 307 if (orb.transportDebugFlag) { 308 dprint(".readBits->: " + this); 309 } 310 311 MessageMediator messageMediator; 312 if (contactInfo != null) { 314 messageMediator = 315 contactInfo.createMessageMediator(orb, this); 316 } else if (acceptor != null) { 317 messageMediator = acceptor.createMessageMediator(orb, this); 318 } else { 319 throw 320 new RuntimeException ("SocketOrChannelConnectionImpl.readBits"); 321 } 322 return (CorbaMessageMediator) messageMediator; 323 324 } catch (ThreadDeath td) { 325 if (orb.transportDebugFlag) { 326 dprint(".readBits: " + this + ": ThreadDeath: " + td, td); 327 } 328 try { 329 purgeCalls(wrapper.connectionAbort(td), false, false); 330 } catch (Throwable t) { 331 if (orb.transportDebugFlag) { 332 dprint(".readBits: " + this + ": purgeCalls: Throwable: " + t, t); 333 } 334 } 335 throw td; 336 } catch (Throwable ex) { 337 if (orb.transportDebugFlag) { 338 dprint(".readBits: " + this + ": Throwable: " + ex, ex); 339 } 340 341 try { 342 if (ex instanceof INTERNAL ) { 343 sendMessageError(GIOPVersion.DEFAULT_VERSION); 344 } 345 } catch (IOException e) { 346 if (orb.transportDebugFlag) { 347 dprint(".readBits: " + this + 348 ": sendMessageError: IOException: " + e, e); 349 } 350 } 351 orb.getTransportManager().getSelector(0).unregisterForEvent(this); 353 purgeCalls(wrapper.connectionAbort(ex), true, false); 355 } finally { 363 if (orb.transportDebugFlag) { 364 dprint(".readBits<-: " + this); 365 } 366 } 367 return null; 368 } 369 370 protected CorbaMessageMediator finishReadingBits(MessageMediator messageMediator) 371 { 372 try { 373 374 if (orb.transportDebugFlag) { 375 dprint(".finishReadingBits->: " + this); 376 } 377 378 if (contactInfo != null) { 380 messageMediator = 381 contactInfo.finishCreatingMessageMediator(orb, this, messageMediator); 382 } else if (acceptor != null) { 383 messageMediator = 384 acceptor.finishCreatingMessageMediator(orb, this, messageMediator); 385 } else { 386 throw 387 new RuntimeException ("SocketOrChannelConnectionImpl.finishReadingBits"); 388 } 389 return (CorbaMessageMediator) messageMediator; 390 391 } catch (ThreadDeath td) { 392 if (orb.transportDebugFlag) { 393 dprint(".finishReadingBits: " + this + ": ThreadDeath: " + td, td); 394 } 395 try { 396 purgeCalls(wrapper.connectionAbort(td), false, false); 397 } catch (Throwable t) { 398 if (orb.transportDebugFlag) { 399 dprint(".finishReadingBits: " + this + ": purgeCalls: Throwable: " + t, t); 400 } 401 } 402 throw td; 403 } catch (Throwable ex) { 404 if (orb.transportDebugFlag) { 405 dprint(".finishReadingBits: " + this + ": Throwable: " + ex, ex); 406 } 407 408 try { 409 if (ex instanceof INTERNAL ) { 410 sendMessageError(GIOPVersion.DEFAULT_VERSION); 411 } 412 } catch (IOException e) { 413 if (orb.transportDebugFlag) { 414 dprint(".finishReadingBits: " + this + 415 ": sendMessageError: IOException: " + e, e); 416 } 417 } 418 orb.getTransportManager().getSelector(0).unregisterForEvent(this); 420 purgeCalls(wrapper.connectionAbort(ex), true, false); 422 } finally { 430 if (orb.transportDebugFlag) { 431 dprint(".finishReadingBits<-: " + this); 432 } 433 } 434 return null; 435 } 436 437 protected boolean dispatch(CorbaMessageMediator messageMediator) 438 { 439 try { 440 if (orb.transportDebugFlag) { 441 dprint(".dispatch->: " + this); 442 } 443 444 451 boolean result = 452 messageMediator.getProtocolHandler() 453 .handleRequest(messageMediator); 454 455 return result; 456 457 } catch (ThreadDeath td) { 458 if (orb.transportDebugFlag) { 459 dprint(".dispatch: ThreadDeath", td ); 460 } 461 try { 462 purgeCalls(wrapper.connectionAbort(td), false, false); 463 } catch (Throwable t) { 464 if (orb.transportDebugFlag) { 465 dprint(".dispatch: purgeCalls: Throwable", t); 466 } 467 } 468 throw td; 469 } catch (Throwable ex) { 470 if (orb.transportDebugFlag) { 471 dprint(".dispatch: Throwable", ex ) ; 472 } 473 474 try { 475 if (ex instanceof INTERNAL ) { 476 sendMessageError(GIOPVersion.DEFAULT_VERSION); 477 } 478 } catch (IOException e) { 479 if (orb.transportDebugFlag) { 480 dprint(".dispatch: sendMessageError: IOException", e); 481 } 482 } 483 purgeCalls(wrapper.connectionAbort(ex), false, false); 484 } finally { 487 if (orb.transportDebugFlag) { 488 dprint(".dispatch<-: " + this); 489 } 490 } 491 492 return true; 493 } 494 495 public boolean shouldUseDirectByteBuffers() 496 { 497 return getSocketChannel() != null; 498 } 499 500 public ByteBuffer read(int size, int offset, int length, long max_wait_time) 501 throws IOException  502 { 503 if (shouldUseDirectByteBuffers()) { 504 505 ByteBuffer byteBuffer = 506 orb.getByteBufferPool().getByteBuffer(size); 507 508 if (orb.transportDebugFlag) { 509 int bbAddress = System.identityHashCode(byteBuffer); 511 StringBuffer sb = new StringBuffer (80); 512 sb.append(".read: got ByteBuffer id ("); 513 sb.append(bbAddress).append(") from ByteBufferPool."); 514 String msgStr = sb.toString(); 515 dprint(msgStr); 516 } 517 518 byteBuffer.position(offset); 519 byteBuffer.limit(size); 520 521 readFully(byteBuffer, length, max_wait_time); 522 523 return byteBuffer; 524 } 525 526 byte[] buf = new byte[size]; 527 readFully(getSocket().getInputStream(), buf, 528 offset, length, max_wait_time); 529 ByteBuffer byteBuffer = ByteBuffer.wrap(buf); 530 byteBuffer.limit(size); 531 return byteBuffer; 532 } 533 534 public ByteBuffer read(ByteBuffer byteBuffer, int offset, 535 int length, long max_wait_time) 536 throws IOException  537 { 538 int size = offset + length; 539 if (shouldUseDirectByteBuffers()) { 540 541 if (! byteBuffer.isDirect()) { 542 throw wrapper.unexpectedNonDirectByteBufferWithChannelSocket(); 543 } 544 if (size > byteBuffer.capacity()) { 545 if (orb.transportDebugFlag) { 546 int bbAddress = System.identityHashCode(byteBuffer); 548 StringBuffer bbsb = new StringBuffer (80); 549 bbsb.append(".read: releasing ByteBuffer id (") 550 .append(bbAddress).append(") to ByteBufferPool."); 551 String bbmsg = bbsb.toString(); 552 dprint(bbmsg); 553 } 554 orb.getByteBufferPool().releaseByteBuffer(byteBuffer); 555 byteBuffer = orb.getByteBufferPool().getByteBuffer(size); 556 } 557 byteBuffer.position(offset); 558 byteBuffer.limit(size); 559 readFully(byteBuffer, length, max_wait_time); 560 byteBuffer.position(0); 561 byteBuffer.limit(size); 562 return byteBuffer; 563 } 564 if (byteBuffer.isDirect()) { 565 throw wrapper.unexpectedDirectByteBufferWithNonChannelSocket(); 566 } 567 byte[] buf = new byte[size]; 568 readFully(getSocket().getInputStream(), buf, 569 offset, length, max_wait_time); 570 return ByteBuffer.wrap(buf); 571 } 572 573 public void readFully(ByteBuffer byteBuffer, int size, long max_wait_time) 574 throws IOException  575 { 576 int n = 0; 577 int bytecount = 0; 578 long time_to_wait = readTimeouts.get_initial_time_to_wait(); 579 long total_time_in_wait = 0; 580 581 593 598 do { 599 bytecount = getSocketChannel().read(byteBuffer); 600 601 if (bytecount < 0) { 602 throw new IOException ("End-of-stream"); 603 } 604 else if (bytecount == 0) { 605 try { 606 Thread.sleep(time_to_wait); 607 total_time_in_wait += time_to_wait; 608 time_to_wait = 609 (long)(time_to_wait*readTimeouts.get_backoff_factor()); 610 } 611 catch (InterruptedException ie) { 612 if (orb.transportDebugFlag) { 614 dprint("readFully(): unexpected exception " 615 + ie.toString()); 616 } 617 } 618 } 619 else { 620 n += bytecount; 621 } 622 } 623 while (n < size && total_time_in_wait < max_wait_time); 624 625 if (n < size && total_time_in_wait >= max_wait_time) 626 { 627 throw wrapper.transportReadTimeoutExceeded(new Integer (size), 629 new Integer (n), new Long (max_wait_time), 630 new Long (total_time_in_wait)); 631 } 632 633 getConnectionCache().stampTime(this); 634 } 635 636 public void readFully(java.io.InputStream is, byte[] buf, 638 int offset, int size, long max_wait_time) 639 throws IOException  640 { 641 int n = 0; 642 int bytecount = 0; 643 long time_to_wait = readTimeouts.get_initial_time_to_wait(); 644 long total_time_in_wait = 0; 645 646 658 663 do { 664 bytecount = is.read(buf, offset + n, size - n); 665 if (bytecount < 0) { 666 throw new IOException ("End-of-stream"); 667 } 668 else if (bytecount == 0) { 669 try { 670 Thread.sleep(time_to_wait); 671 total_time_in_wait += time_to_wait; 672 time_to_wait = 673 (long)(time_to_wait*readTimeouts.get_backoff_factor()); 674 } 675 catch (InterruptedException ie) { 676 if (orb.transportDebugFlag) { 678 dprint("readFully(): unexpected exception " 679 + ie.toString()); 680 } 681 } 682 } 683 else { 684 n += bytecount; 685 } 686 } 687 while (n < size && total_time_in_wait < max_wait_time); 688 689 if (n < size && total_time_in_wait >= max_wait_time) 690 { 691 throw wrapper.transportReadTimeoutExceeded(new Integer (size), 693 new Integer (n), new Long (max_wait_time), 694 new Long (total_time_in_wait)); 695 } 696 697 getConnectionCache().stampTime(this); 698 } 699 700 public void write(ByteBuffer byteBuffer) 701 throws IOException  702 { 703 if (shouldUseDirectByteBuffers()) { 704 712 do { 715 getSocketChannel().write(byteBuffer); 716 } 717 while (byteBuffer.hasRemaining()); 718 719 } else { 720 if (! byteBuffer.hasArray()) { 721 throw wrapper.unexpectedDirectByteBufferWithNonChannelSocket(); 722 } 723 byte[] tmpBuf = byteBuffer.array(); 724 getSocket().getOutputStream().write(tmpBuf, 0, byteBuffer.limit()); 725 getSocket().getOutputStream().flush(); 726 } 727 728 getConnectionCache().stampTime(this); 732 } 733 734 737 public synchronized void close() 738 { 739 try { 740 if (orb.transportDebugFlag) { 741 dprint(".close->: " + this); 742 } 743 writeLock(); 744 745 751 if (isBusy()) { writeUnlock(); 753 if (orb.transportDebugFlag) { 754 dprint(".close: isBusy so no close: " + this); 755 } 756 return; 757 } 758 759 try { 760 try { 761 sendCloseConnection(GIOPVersion.V1_0); 762 } catch (Throwable t) { 763 wrapper.exceptionWhenSendingCloseConnection(t); 764 } 765 766 synchronized ( stateEvent ){ 767 state = CLOSE_SENT; 768 stateEvent.notifyAll(); 769 } 770 771 775 purgeCalls(wrapper.connectionRebind(), false, true); 778 779 } catch (Exception ex) { 780 if (orb.transportDebugFlag) { 781 dprint(".close: exception: " + this, ex); 782 } 783 } 784 try { 785 Selector selector = orb.getTransportManager().getSelector(0); 786 selector.unregisterForEvent(this); 787 if (socketChannel != null) { 788 socketChannel.close(); 789 } 790 socket.close(); 791 } catch (IOException e) { 792 if (orb.transportDebugFlag) { 793 dprint(".close: " + this, e); 794 } 795 } 796 } finally { 797 if (orb.transportDebugFlag) { 798 dprint(".close<-: " + this); 799 } 800 } 801 } 802 803 public Acceptor getAcceptor() 804 { 805 return acceptor; 806 } 807 808 public ContactInfo getContactInfo() 809 { 810 return contactInfo; 811 } 812 813 public EventHandler getEventHandler() 814 { 815 return this; 816 } 817 818 public OutputObject createOutputObject(MessageMediator messageMediator) 819 { 820 throw new RuntimeException ("*****SocketOrChannelConnectionImpl.createOutputObject - should not be called."); 822 } 823 824 public boolean isServer() 829 { 830 return isServer; 831 } 832 833 public boolean isBusy() 834 { 835 if (serverRequestCount > 0 || 836 getResponseWaitingRoom().numberRegistered() > 0) 837 { 838 return true; 839 } else { 840 return false; 841 } 842 } 843 844 public long getTimeStamp() 845 { 846 return timeStamp; 847 } 848 849 public void setTimeStamp(long time) 850 { 851 timeStamp = time; 852 } 853 854 public void setState(String stateString) 855 { 856 synchronized (stateEvent) { 857 if (stateString.equals("ESTABLISHED")) { 858 state = ESTABLISHED; 859 stateEvent.notifyAll(); 860 } else { 861 } 863 } 864 } 865 866 873 public void writeLock() 874 { 875 try { 876 if (dprintWriteLocks && orb.transportDebugFlag) { 877 dprint(".writeLock->: " + this); 878 } 879 while ( true ) { 881 int localState = state; 882 switch ( localState ) { 883 884 case OPENING: 885 synchronized (stateEvent) { 886 if (state != OPENING) { 887 break; 889 } 890 try { 891 stateEvent.wait(); 892 } catch (InterruptedException ie) { 893 if (orb.transportDebugFlag) { 894 dprint(".writeLock: OPENING InterruptedException: " + this); 895 } 896 } 897 } 898 break; 900 901 case ESTABLISHED: 902 synchronized (writeEvent) { 903 if (!writeLocked) { 904 writeLocked = true; 905 return; 906 } 907 908 try { 909 while (state == ESTABLISHED && writeLocked) { 912 writeEvent.wait(100); 913 } 914 } catch (InterruptedException ie) { 915 if (orb.transportDebugFlag) { 916 dprint(".writeLock: ESTABLISHED InterruptedException: " + this); 917 } 918 } 919 } 920 break; 922 923 case ABORT: 929 synchronized ( stateEvent ){ 930 if (state != ABORT) { 931 break; 932 } 933 throw wrapper.writeErrorSend() ; 934 } 935 936 case CLOSE_RECVD: 937 synchronized ( stateEvent ){ 940 if (state != CLOSE_RECVD) { 941 break; 942 } 943 throw wrapper.connectionCloseRebind() ; 944 } 945 946 default: 947 if (orb.transportDebugFlag) { 948 dprint(".writeLock: default: " + this); 949 } 950 throw new RuntimeException (".writeLock: bad state"); 952 } 953 } 954 } finally { 955 if (dprintWriteLocks && orb.transportDebugFlag) { 956 dprint(".writeLock<-: " + this); 957 } 958 } 959 } 960 961 public void writeUnlock() 962 { 963 try { 964 if (dprintWriteLocks && orb.transportDebugFlag) { 965 dprint(".writeUnlock->: " + this); 966 } 967 synchronized (writeEvent) { 968 writeLocked = false; 969 writeEvent.notify(); } 971 } finally { 972 if (dprintWriteLocks && orb.transportDebugFlag) { 973 dprint(".writeUnlock<-: " + this); 974 } 975 } 976 } 977 978 public void sendWithoutLock(OutputObject outputObject) 980 { 981 984 987 try { 988 989 991 CDROutputObject cdrOutputObject = (CDROutputObject) outputObject; 992 cdrOutputObject.writeTo(this); 993 996 } catch (IOException e1) { 997 998 1002 1003 1009 1025 1026 1031 1040 SystemException exc = wrapper.writeErrorSend(e1); 1043 purgeCalls(exc, false, true); 1044 throw exc; 1045 } 1046 } 1047 1048 public void registerWaiter(MessageMediator messageMediator) 1049 { 1050 responseWaitingRoom.registerWaiter(messageMediator); 1051 } 1052 1053 public void unregisterWaiter(MessageMediator messageMediator) 1054 { 1055 responseWaitingRoom.unregisterWaiter(messageMediator); 1056 } 1057 1058 public InputObject waitForResponse(MessageMediator messageMediator) 1059 { 1060 return responseWaitingRoom.waitForResponse(messageMediator); 1061 } 1062 1063 public void setConnectionCache(ConnectionCache connectionCache) 1064 { 1065 this.connectionCache = connectionCache; 1066 } 1067 1068 public ConnectionCache getConnectionCache() 1069 { 1070 return connectionCache; 1071 } 1072 1073 1078 public void setUseSelectThreadToWait(boolean x) 1079 { 1080 useSelectThreadToWait = x; 1081 setReadGiopHeaderOnly(shouldUseSelectThreadToWait()); 1085 } 1086 1087 public void handleEvent() 1088 { 1089 if (orb.transportDebugFlag) { 1090 dprint(".handleEvent->: " + this); 1091 } 1092 getSelectionKey().interestOps(getSelectionKey().interestOps() & 1093 (~ getInterestOps())); 1094 1095 if (shouldUseWorkerThreadForEvent()) { 1096 Throwable throwable = null; 1097 try { 1098 int poolToUse = 0; 1099 if (shouldReadGiopHeaderOnly()) { 1100 partialMessageMediator = readBits(); 1101 poolToUse = 1102 partialMessageMediator.getThreadPoolToUse(); 1103 } 1104 1105 if (orb.transportDebugFlag) { 1106 dprint(".handleEvent: addWork to pool: " + poolToUse); 1107 } 1108 orb.getThreadPoolManager().getThreadPool(poolToUse) 1109 .getWorkQueue(0).addWork(getWork()); 1110 } catch (NoSuchThreadPoolException e) { 1111 throwable = e; 1112 } catch (NoSuchWorkQueueException e) { 1113 throwable = e; 1114 } 1115 if (throwable != null) { 1117 if (orb.transportDebugFlag) { 1118 dprint(".handleEvent: " + throwable); 1119 } 1120 INTERNAL i = new INTERNAL ("NoSuchThreadPoolException"); 1121 i.initCause(throwable); 1122 throw i; 1123 } 1124 } else { 1125 if (orb.transportDebugFlag) { 1126 dprint(".handleEvent: doWork"); 1127 } 1128 getWork().doWork(); 1129 } 1130 if (orb.transportDebugFlag) { 1131 dprint(".handleEvent<-: " + this); 1132 } 1133 } 1134 1135 public SelectableChannel getChannel() 1136 { 1137 return socketChannel; 1138 } 1139 1140 public int getInterestOps() 1141 { 1142 return SelectionKey.OP_READ; 1143 } 1144 1145 1147 public Connection getConnection() 1148 { 1149 return this; 1150 } 1151 1152 1157 public String getName() 1158 { 1159 return this.toString(); 1160 } 1161 1162 public void doWork() 1163 { 1164 try { 1165 if (orb.transportDebugFlag) { 1166 dprint(".doWork->: " + this); 1167 } 1168 1169 1174 if (!shouldReadGiopHeaderOnly()) { 1175 read(); 1176 } 1177 else { 1178 CorbaMessageMediator messageMediator = 1181 this.getPartialMessageMediator(); 1182 1183 messageMediator = finishReadingBits(messageMediator); 1185 1186 if (messageMediator != null) { 1187 dispatch(messageMediator); 1190 } 1191 } 1192 } catch (Throwable t) { 1193 if (orb.transportDebugFlag) { 1194 dprint(".doWork: ignoring Throwable: " 1195 + t 1196 + " " + this); 1197 } 1198 } finally { 1199 if (orb.transportDebugFlag) { 1200 dprint(".doWork<-: " + this); 1201 } 1202 } 1203 } 1204 1205 public void setEnqueueTime(long timeInMillis) 1206 { 1207 enqueueTime = timeInMillis; 1208 } 1209 1210 public long getEnqueueTime() 1211 { 1212 return enqueueTime; 1213 } 1214 1215 1220 public boolean shouldReadGiopHeaderOnly() { 1222 return shouldReadGiopHeaderOnly; 1223 } 1224 1225 protected void setReadGiopHeaderOnly(boolean shouldReadHeaderOnly) { 1226 shouldReadGiopHeaderOnly = shouldReadHeaderOnly; 1227 } 1228 1229 public ResponseWaitingRoom getResponseWaitingRoom() 1230 { 1231 return responseWaitingRoom; 1232 } 1233 1234 1237 public void serverRequestMapPut(int requestId, 1238 CorbaMessageMediator messageMediator) 1239 { 1240 serverRequestMap.put(new Integer (requestId), messageMediator); 1241 } 1242 1243 public CorbaMessageMediator serverRequestMapGet(int requestId) 1244 { 1245 return (CorbaMessageMediator) 1246 serverRequestMap.get(new Integer (requestId)); 1247 } 1248 1249 public void serverRequestMapRemove(int requestId) 1250 { 1251 serverRequestMap.remove(new Integer (requestId)); 1252 } 1253 1254 1255 public java.net.Socket getSocket() 1258 { 1259 return socket; 1260 } 1261 1262 1270 public synchronized void serverRequestProcessingBegins() 1271 { 1272 serverRequestCount++; 1273 } 1274 1275 public synchronized void serverRequestProcessingEnds() 1276 { 1277 serverRequestCount--; 1278 } 1279 1280 1284 public synchronized int getNextRequestId() 1285 { 1286 return requestId++; 1287 } 1288 1289 protected CodeSetComponentInfo.CodeSetContext codeSetContext = null; 1291 1292 public ORB getBroker() 1293 { 1294 return orb; 1295 } 1296 1297 public CodeSetComponentInfo.CodeSetContext getCodeSetContext() { 1298 if (codeSetContext == null) { 1307 synchronized(this) { 1308 return codeSetContext; 1309 } 1310 } 1311 1312 return codeSetContext; 1313 } 1314 1315 public synchronized void setCodeSetContext(CodeSetComponentInfo.CodeSetContext csc) { 1316 if (codeSetContext == null) { 1318 1319 if (OSFCodeSetRegistry.lookupEntry(csc.getCharCodeSet()) == null || 1320 OSFCodeSetRegistry.lookupEntry(csc.getWCharCodeSet()) == null) { 1321 throw wrapper.badCodesetsFromClient() ; 1325 } 1326 1327 codeSetContext = csc; 1328 } 1329 } 1330 1331 1335 1342 public MessageMediator clientRequestMapGet(int requestId) 1343 { 1344 return responseWaitingRoom.getMessageMediator(requestId); 1345 } 1346 1347 protected MessageMediator clientReply_1_1; 1348 1349 public void clientReply_1_1_Put(MessageMediator x) 1350 { 1351 clientReply_1_1 = x; 1352 } 1353 1354 public MessageMediator clientReply_1_1_Get() 1355 { 1356 return clientReply_1_1; 1357 } 1358 1359 public void clientReply_1_1_Remove() 1360 { 1361 clientReply_1_1 = null; 1362 } 1363 1364 protected MessageMediator serverRequest_1_1; 1365 1366 public void serverRequest_1_1_Put(MessageMediator x) 1367 { 1368 serverRequest_1_1 = x; 1369 } 1370 1371 public MessageMediator serverRequest_1_1_Get() 1372 { 1373 return serverRequest_1_1; 1374 } 1375 1376 public void serverRequest_1_1_Remove() 1377 { 1378 serverRequest_1_1 = null; 1379 } 1380 1381 protected String getStateString( int state ) 1382 { 1383 synchronized ( stateEvent ){ 1384 switch (state) { 1385 case OPENING : return "OPENING" ; 1386 case ESTABLISHED : return "ESTABLISHED" ; 1387 case CLOSE_SENT : return "CLOSE_SENT" ; 1388 case CLOSE_RECVD : return "CLOSE_RECVD" ; 1389 case ABORT : return "ABORT" ; 1390 default : return "???" ; 1391 } 1392 } 1393 } 1394 1395 public synchronized boolean isPostInitialContexts() { 1396 return postInitialContexts; 1397 } 1398 1399 public synchronized void setPostInitialContexts(){ 1401 postInitialContexts = true; 1402 } 1403 1404 1417 public void purgeCalls(SystemException systemException, 1418 boolean die, boolean lockHeld) 1419 { 1420 int minor_code = systemException.minor; 1421 1422 try{ 1423 if (orb.transportDebugFlag) { 1424 dprint(".purgeCalls->: " 1425 + minor_code + "/" + die + "/" + lockHeld 1426 + " " + this); 1427 } 1428 1429 1432 synchronized ( stateEvent ){ 1433 if ((state == ABORT) || (state == CLOSE_RECVD)) { 1434 if (orb.transportDebugFlag) { 1435 dprint(".purgeCalls: exiting since state is: " 1436 + getStateString(state) 1437 + " " + this); 1438 } 1439 return; 1440 } 1441 } 1442 1443 try { 1445 if (!lockHeld) { 1446 writeLock(); 1447 } 1448 } catch (SystemException ex) { 1449 if (orb.transportDebugFlag) 1450 dprint(".purgeCalls: SystemException" + ex 1451 + "; continuing " + this); 1452 } 1453 1454 org.omg.CORBA.CompletionStatus completion_status; 1457 synchronized ( stateEvent ){ 1458 if (minor_code == ORBUtilSystemException.CONNECTION_REBIND) { 1459 state = CLOSE_RECVD; 1460 systemException.completed = CompletionStatus.COMPLETED_NO; 1461 } else { 1462 state = ABORT; 1463 systemException.completed = CompletionStatus.COMPLETED_MAYBE; 1464 } 1465 stateEvent.notifyAll(); 1466 } 1467 1468 try { 1469 socket.getInputStream().close(); 1470 socket.getOutputStream().close(); 1471 socket.close(); 1472 } catch (Exception ex) { 1473 if (orb.transportDebugFlag) { 1474 dprint(".purgeCalls: Exception closing socket: " + ex 1475 + " " + this); 1476 } 1477 } 1478 1479 1482 responseWaitingRoom.signalExceptionToAllWaiters(systemException); 1483 1484 if (contactInfo != null) { 1485 ((OutboundConnectionCache)getConnectionCache()).remove(contactInfo); 1486 } else if (acceptor != null) { 1487 ((InboundConnectionCache)getConnectionCache()).remove(this); 1488 } 1489 1490 1494 1502 writeUnlock(); 1503 1504 } finally { 1505 if (orb.transportDebugFlag) { 1506 dprint(".purgeCalls<-: " 1507 + minor_code + "/" + die + "/" + lockHeld 1508 + " " + this); 1509 } 1510 } 1511 } 1512 1513 1517 1518 public void sendCloseConnection(GIOPVersion giopVersion) 1519 throws IOException 1520 { 1521 Message msg = MessageBase.createCloseConnection(giopVersion); 1522 sendHelper(giopVersion, msg); 1523 } 1524 1525 public void sendMessageError(GIOPVersion giopVersion) 1526 throws IOException 1527 { 1528 Message msg = MessageBase.createMessageError(giopVersion); 1529 sendHelper(giopVersion, msg); 1530 } 1531 1532 1537 public void sendCancelRequest(GIOPVersion giopVersion, int requestId) 1538 throws IOException 1539 { 1540 1541 Message msg = MessageBase.createCancelRequest(giopVersion, requestId); 1542 sendHelper(giopVersion, msg); 1543 } 1544 1545 protected void sendHelper(GIOPVersion giopVersion, Message msg) 1546 throws IOException  1547 { 1548 CDROutputObject outputObject = 1550 new CDROutputObject((ORB)orb, null, giopVersion, this, msg, 1551 ORBConstants.STREAM_FORMAT_VERSION_1); 1552 msg.write(outputObject); 1553 1554 outputObject.writeTo(this); 1555 } 1556 1557 public void sendCancelRequestWithLock(GIOPVersion giopVersion, 1558 int requestId) 1559 throws IOException 1560 { 1561 writeLock(); 1562 try { 1563 sendCancelRequest(giopVersion, requestId); 1564 } finally { 1565 writeUnlock(); 1566 } 1567 } 1568 1569 1579 1583 public final void setCodeBaseIOR(IOR ior) { 1584 codeBaseServerIOR = ior; 1585 } 1586 1587 public final IOR getCodeBaseIOR() { 1588 return codeBaseServerIOR; 1589 } 1590 1591 public final CodeBase getCodeBase() { 1594 return cachedCodeBase; 1595 } 1596 1597 1599 protected void setReadTimeouts(ReadTimeouts readTimeouts) { 1601 this.readTimeouts = readTimeouts; 1602 } 1603 1604 protected void setPartialMessageMediator(CorbaMessageMediator messageMediator) { 1605 partialMessageMediator = messageMediator; 1606 } 1607 1608 protected CorbaMessageMediator getPartialMessageMediator() { 1609 return partialMessageMediator; 1610 } 1611 1612 public String toString() 1613 { 1614 synchronized ( stateEvent ){ 1615 return 1616 "SocketOrChannelConnectionImpl[" + " " 1617 + (socketChannel == null ? 1618 socket.toString() : socketChannel.toString()) + " " 1619 + getStateString( state ) + " " 1620 + shouldUseSelectThreadToWait() + " " 1621 + shouldUseWorkerThreadForEvent() + " " 1622 + shouldReadGiopHeaderOnly() 1623 + "]" ; 1624 } 1625 } 1626 1627 public void dprint(String msg) 1629 { 1630 ORBUtility.dprint("SocketOrChannelConnectionImpl", msg); 1631 } 1632 1633 protected void dprint(String msg, Throwable t) 1634 { 1635 dprint(msg); 1636 t.printStackTrace(System.out); 1637 } 1638} 1639 1640 | Popular Tags |