1 20 21 package org.jacorb.orb.giop; 22 23 import org.apache.avalon.framework.logger.Logger; 24 import org.apache.avalon.framework.configuration.*; 25 26 import java.io.*; 27 import java.util.*; 28 29 import org.omg.GIOP.*; 30 import org.omg.CORBA.NO_IMPLEMENT ; 31 import org.omg.CORBA.CompletionStatus ; 32 import org.omg.ETF.*; 33 34 import org.jacorb.orb.SystemExceptionHelper; 35 import org.jacorb.orb.BufferManager; 36 import org.jacorb.orb.iiop.*; 37 import org.jacorb.util.*; 38 39 40 48 49 public abstract class GIOPConnection 50 extends java.io.OutputStream 51 { 52 protected org.omg.ETF.Profile profile = null; 53 protected org.omg.ETF.Connection transport = null; 54 55 private RequestListener request_listener = null; 56 private ReplyListener reply_listener = null; 57 protected ConnectionListener connection_listener = null; 58 59 protected Object connect_sync = new Object (); 60 61 private boolean writer_active = false; 62 private Object write_sync = new Object (); 63 64 private org.jacorb.config.Configuration configuration; 65 private Logger logger; 66 67 70 private int TCS = CodeSet.getTCSDefault(); 71 private int TCSW = CodeSet.getTCSWDefault(); 72 73 private boolean tcs_negotiated = false; 74 75 private Hashtable fragments = null; 77 private BufferManager buf_mg = null; 78 79 private boolean dump_incoming = false; 80 private long timeout = 0; 81 82 private BufferHolder msg_header 83 = new BufferHolder (new byte[Messages.MSG_HEADER_SIZE]); 84 85 private BufferHolder inbuf = new BufferHolder(); 86 87 88 91 private static int cubby_count = 0; 93 private Object [] cubbyholes = null; 94 95 private int pending_messages = 0; 97 98 protected boolean discard_messages = false; 99 100 protected Object pendingUndecidedSync = new Object (); 104 105 protected boolean do_close = false; 107 108 protected StatisticsProvider statistics_provider = null; 109 110 public GIOPConnection( org.omg.ETF.Profile profile, 111 org.omg.ETF.Connection transport, 112 RequestListener request_listener, 113 ReplyListener reply_listener, 114 StatisticsProvider statistics_provider ) 115 { 116 this.profile = profile; 117 this.transport = transport; 118 this.request_listener = request_listener; 119 this.reply_listener = reply_listener; 120 this.statistics_provider = statistics_provider; 121 122 fragments = new Hashtable(); 123 buf_mg = BufferManager.getInstance(); 124 126 cubbyholes = new Object [cubby_count]; 127 } 128 129 public void configure(Configuration configuration) 130 throws ConfigurationException 131 { 132 this.configuration = (org.jacorb.config.Configuration)configuration; 133 logger = this.configuration.getNamedLogger("jacorb.giop.conn"); 134 dump_incoming = 135 configuration.getAttribute("jacorb.debug.dump_incoming_messages","off").equals("on"); 136 timeout = 137 configuration.getAttributeAsInteger("jacorb.connection.client.connect_timeout", 0); 138 139 } 140 141 142 public final void setCodeSets( int TCS, int TCSW ) 143 { 144 this.TCS = TCS; 145 this.TCSW = TCSW; 146 } 147 148 public final int getTCS() 149 { 150 return TCS; 151 } 152 153 public final int getTCSW() 154 { 155 return TCSW; 156 } 157 158 public final void markTCSNegotiated() 159 { 160 tcs_negotiated = true; 161 } 162 163 public final boolean isTCSNegotiated() 164 { 165 return tcs_negotiated; 166 } 167 168 172 protected final synchronized RequestListener getRequestListener() 173 { 174 return request_listener; 175 } 176 177 181 public final synchronized void setRequestListener( RequestListener v ) 182 { 183 this.request_listener = v; 184 } 185 186 190 private final synchronized ReplyListener getReplyListener() 191 { 192 return reply_listener; 193 } 194 195 199 public final synchronized void setReplyListener( ReplyListener v ) 200 { 201 this.reply_listener = v; 202 } 203 204 public final void setConnectionListener( ConnectionListener connection_listener ) 205 { 206 this.connection_listener = connection_listener; 207 } 208 209 public final org.omg.ETF.Connection getTransport() 210 { 211 synchronized (connect_sync) 212 { 213 return transport; 214 } 215 } 216 217 private boolean waitUntilConnected() 218 { 219 synchronized (connect_sync) 220 { 221 while (!transport.is_connected() && 222 !do_close) 223 { 224 try 225 { 226 connect_sync.wait(); 227 } 228 catch( InterruptedException ie ) 229 { 230 } 231 } 232 return !do_close; 233 } 234 } 235 236 237 protected abstract void readTimedOut(); 238 protected abstract void streamClosed(); 239 240 241 252 private byte[] getMessage() 253 throws IOException 254 { 255 if( ! waitUntilConnected() ) 260 { 261 return null; 262 } 263 264 try 265 { 266 transport.read (msg_header, 0, 267 Messages.MSG_HEADER_SIZE, 268 Messages.MSG_HEADER_SIZE, 269 0); 270 } 271 catch (org.omg.CORBA.TRANSIENT ex) 272 { 273 return null; 274 } 275 catch (org.omg.CORBA.COMM_FAILURE ex) 276 { 277 this.streamClosed(); 278 return null; 279 } 280 catch (org.omg.CORBA.TIMEOUT ex) 281 { 282 this.readTimedOut(); 283 return null; 284 } 285 286 byte[] header = msg_header.value; 287 288 291 if( (char) header[0] == 'G' && (char) header[1] == 'I' && 292 (char) header[2] == 'O' && (char) header[3] == 'P') 293 { 294 int msg_size = Messages.getMsgSize( header ); 296 297 if( msg_size < 0 ) 298 { 299 if (logger.isErrorEnabled()) 300 { 301 logger.error( "Negative GIOP message size: " + msg_size ); 302 } 303 304 if (logger.isDebugEnabled()) 305 { 306 logger.debug("TCP_IP_GIOPTransport.getMessage() with header: \n" + 307 header + "\nsize : " + Messages.MSG_HEADER_SIZE ); 308 } 309 310 return null; 311 } 312 313 inbuf.value = buf_mg.getBuffer( msg_size + 315 Messages.MSG_HEADER_SIZE ); 316 317 System.arraycopy( header, 0, inbuf.value, 0, Messages.MSG_HEADER_SIZE ); 319 320 try 321 { 322 transport.read (inbuf, Messages.MSG_HEADER_SIZE, 323 msg_size, msg_size, 0); 324 } 325 catch (org.omg.CORBA.COMM_FAILURE ex) 326 { 327 if (logger.isErrorEnabled()) 328 { 329 logger.error( "Failed to read GIOP message" ); 330 } 331 return null; 332 } 333 334 if( dump_incoming ) 335 { 336 if (logger.isInfoEnabled()) 337 { 338 logger.info("BufferDump:\n" + 339 ObjectUtil.bufToString( inbuf.value, 340 0, 341 msg_size + Messages.MSG_HEADER_SIZE )); 342 } 343 } 344 345 if( statistics_provider != null ) 346 { 347 statistics_provider.messageReceived( msg_size + 348 Messages.MSG_HEADER_SIZE ); 349 } 350 351 return inbuf.value; 353 } 354 else 355 { 356 if (logger.isErrorEnabled()) 357 { 358 logger.error( "Failed to read GIOP message, incorrect magic number" ); 359 } 360 361 if (logger.isDebugEnabled()) 362 { 363 logger.debug("GIOPConnection.getMessage()" + msg_header.value ); 364 } 365 366 return null; 367 } 368 } 369 370 public final void receiveMessages() 371 throws IOException 372 { 373 while( true ) 374 { 375 byte[] message = getMessage(); 376 377 if( message == null ) 378 { 379 if( do_close ) 380 { 381 return; 382 } 383 else 384 { 385 continue; 386 } 387 } 388 389 synchronized( pendingUndecidedSync ) 390 { 391 if( discard_messages ) 392 { 393 buf_mg.returnBuffer( message ); 394 continue; 395 } 396 397 if( Messages.getGIOPMajor( message ) != 1 ) 399 { 400 if (logger.isErrorEnabled()) 401 { 402 logger.error( "Invalid GIOP major version encountered: " + 403 Messages.getGIOPMajor( message ) ); 404 } 405 406 buf_mg.returnBuffer( message ); 407 continue; 408 } 409 410 int msg_type = Messages.getMsgType( message ); 411 412 if( msg_type == MsgType_1_1._Fragment ) 413 { 414 if( Messages.getGIOPMinor( message ) == 0 ) 416 { 417 if (logger.isWarnEnabled()) 418 { 419 logger.warn( "Received a GIOP 1.0 message of type Fragment" ); 420 } 421 422 MessageOutputStream out = 423 new MessageOutputStream(); 424 out.writeGIOPMsgHeader( MsgType_1_1._MessageError, 425 0 ); 426 out.insertMsgSize(); 427 sendMessage( out ); 428 buf_mg.returnBuffer( message ); 429 430 continue; 431 } 432 433 if( Messages.getGIOPMinor( message ) == 1 ) 435 { 436 if (logger.isWarnEnabled()) 437 { 438 logger.warn( "Received a GIOP 1.1 Fragment message" ); 439 } 440 441 buf_mg.returnBuffer( message ); 445 446 continue; 447 } 448 449 451 Integer request_id = 452 new Integer ( Messages.getRequestId( message )); 453 454 if( ! fragments.containsKey( request_id )) 456 { 457 if (logger.isErrorEnabled()) 458 { 459 logger.error( "No previous Fragment to this one" ); 460 } 461 462 buf_mg.returnBuffer( message ); 464 465 continue; 466 } 467 468 ByteArrayOutputStream b_out = (ByteArrayOutputStream) 469 fragments.get( request_id ); 470 471 b_out.write( message, 475 Messages.MSG_HEADER_SIZE + 4 , 476 Messages.getMsgSize(message) - 4 ); 477 478 if( Messages.moreFragmentsFollow( message )) 479 { 480 buf_mg.returnBuffer( message ); 482 continue; 483 } 484 else 485 { 486 buf_mg.returnBuffer( message ); 487 488 message = b_out.toByteArray(); 490 msg_type = Messages.getMsgType( message ); 491 492 fragments.remove( request_id ); 493 } 494 } 495 else if( Messages.moreFragmentsFollow( message ) ) 496 { 497 if( Messages.getGIOPMinor( message ) == 0 ) 499 { 500 if (logger.isWarnEnabled()) 501 { 502 logger.warn( "Received a GIOP 1.0 message with the \"more fragments follow\" bits set" ); 503 } 504 505 MessageOutputStream out = 506 new MessageOutputStream(); 507 out.writeGIOPMsgHeader( MsgType_1_1._MessageError, 508 0 ); 509 out.insertMsgSize(); 510 sendMessage( out ); 511 buf_mg.returnBuffer( message ); 512 513 continue; 514 } 515 516 if( Messages.getGIOPMinor( message ) == 1 ) 518 { 519 if( msg_type != MsgType_1_1._Request && 520 msg_type != MsgType_1_1._Reply ) 521 { 522 if (logger.isWarnEnabled()) 523 { 524 logger.warn( "Received a GIOP 1.1 message of type " + 525 msg_type + " with the \"more fragments follow\" bits set" ); 526 } 527 528 MessageOutputStream out = 529 new MessageOutputStream(); 530 out.writeGIOPMsgHeader( MsgType_1_1._MessageError, 531 1 ); 532 out.insertMsgSize(); 533 sendMessage( out ); 534 buf_mg.returnBuffer( message ); 535 536 continue; 537 } 538 else { 540 if (logger.isWarnEnabled()) 541 { 542 logger.warn( "Received a fragmented GIOP 1.1 message" ); 543 } 544 545 int giop_minor = Messages.getGIOPMinor( message ); 546 547 ReplyOutputStream out = 548 new ReplyOutputStream( Messages.getRequestId( message ), 549 ReplyStatusType_1_2.SYSTEM_EXCEPTION, 550 giop_minor, 551 false, 552 logger); 554 SystemExceptionHelper.write( out, 555 new NO_IMPLEMENT ( 0, CompletionStatus.COMPLETED_NO )); 556 557 sendMessage( out ); 558 buf_mg.returnBuffer( message ); 559 560 continue; 561 } 562 } 563 564 if( msg_type == MsgType_1_1._CancelRequest || 566 msg_type == MsgType_1_1._CloseConnection || 567 msg_type == MsgType_1_1._CancelRequest ) 568 { 569 if (logger.isWarnEnabled()) 570 { 571 logger.warn( "Received a GIOP message of type " + msg_type + 572 " with the \"more fragments follow\" bits set, but this " + 573 "message type isn't allowed to be fragmented" ); 574 } 575 576 MessageOutputStream out = 577 new MessageOutputStream(); 578 out.writeGIOPMsgHeader( MsgType_1_1._MessageError, 579 1 ); 580 out.insertMsgSize(); 581 sendMessage( out ); 582 buf_mg.returnBuffer( message ); 583 584 continue; 585 } 586 587 Integer request_id = 589 new Integer ( Messages.getRequestId( message )); 590 591 if( fragments.containsKey( request_id )) 593 { 594 if (logger.isErrorEnabled()) 595 { 596 logger.error( "Received a message of type " + 597 msg_type + 598 " with the more fragments follow bit set, but there is already an fragmented, incomplete message with the same request id " + 599 request_id + "!" ); 600 } 601 602 buf_mg.returnBuffer( message ); 604 605 continue; 606 } 607 608 ByteArrayOutputStream b_out = new ByteArrayOutputStream(); 610 fragments.put( request_id, b_out ); 611 612 b_out.write( message, 614 0, 615 Messages.MSG_HEADER_SIZE + 616 Messages.getMsgSize(message) ); 617 618 619 buf_mg.returnBuffer( message ); 620 621 continue; 623 } 624 625 switch( msg_type ) 626 { 627 case MsgType_1_1._Request: 628 { 629 getRequestListener().requestReceived( message, this ); 630 631 break; 632 } 633 case MsgType_1_1._Reply: 634 { 635 getReplyListener().replyReceived( message, this ); 636 637 break; 638 } 639 case MsgType_1_1._CancelRequest: 640 { 641 getRequestListener().cancelRequestReceived( message, this ); 642 643 break; 644 } 645 case MsgType_1_1._LocateRequest: 646 { 647 getRequestListener().locateRequestReceived( message, this ); 648 649 break; 650 } 651 case MsgType_1_1._LocateReply: 652 { 653 getReplyListener().locateReplyReceived( message, this ); 654 655 break; 656 } 657 case MsgType_1_1._CloseConnection: 658 { 659 getReplyListener().closeConnectionReceived( message, this ); 660 661 break; 662 } 663 case MsgType_1_1._MessageError: 664 { 665 break; 666 } 667 case MsgType_1_1._Fragment: 668 { 669 break; 671 } 672 default: 673 { 674 if (logger.isErrorEnabled()) 675 { 676 logger.error("received message with unknown message type " + msg_type); 677 } 678 buf_mg.returnBuffer( message ); 679 } 680 } 681 } } 683 } 684 685 protected final void getWriteLock() 686 { 687 synchronized( write_sync ) 688 { 689 while( writer_active ) 690 { 691 try 692 { 693 write_sync.wait(); 694 } 695 catch( InterruptedException e ) 696 { 697 } 698 } 699 700 writer_active = true; 701 } 702 } 703 704 protected final void releaseWriteLock() 705 { 706 synchronized( write_sync ) 707 { 708 writer_active = false; 709 710 write_sync.notifyAll(); 711 } 712 } 713 714 public final synchronized void incPendingMessages() 715 { 716 ++pending_messages; 717 } 718 719 public final synchronized void decPendingMessages() 720 { 721 --pending_messages; 722 } 723 724 public final synchronized boolean hasPendingMessages() 725 { 726 return pending_messages != 0; 727 } 728 729 732 733 public final void write( byte[] fragment, int start, int size ) 734 { 735 if (!transport.is_connected()) 736 { 737 synchronized (connect_sync) 738 { 739 transport.connect (profile, timeout); 740 connect_sync.notifyAll(); 741 } 742 } 743 744 transport.write( false, false, fragment, start, size, 0 ); 745 746 if (getStatisticsProvider() != null) 747 { 748 getStatisticsProvider().messageChunkSent (size); 749 } 750 } 751 752 753 754 public final void write(int i) 755 throws java.io.IOException 756 { 757 throw new org.omg.CORBA.NO_IMPLEMENT (); 758 } 759 760 public final void write(byte[] b) throws java.io.IOException 761 { 762 throw new org.omg.CORBA.NO_IMPLEMENT (); 763 } 764 765 766 public final void flush() throws java.io.IOException 767 { 768 throw new org.omg.CORBA.NO_IMPLEMENT (); 769 } 770 771 public final void sendRequest( MessageOutputStream out, 772 boolean expect_reply ) 773 throws IOException 774 { 775 if( expect_reply ) 776 { 777 incPendingMessages(); 778 } 779 780 sendMessage( out ); 781 } 782 783 public final void sendReply( MessageOutputStream out ) 784 throws IOException 785 { 786 decPendingMessages(); 787 788 sendMessage( out ); 789 } 790 791 private final void sendMessage( MessageOutputStream out ) 792 throws IOException 793 { 794 try 795 { 796 getWriteLock(); 797 out.write_to( this ); 798 799 transport.flush(); 800 801 if (getStatisticsProvider() != null) 802 { 803 getStatisticsProvider().flushed(); 804 } 805 } 806 finally 807 { 808 releaseWriteLock(); 809 } 810 } 811 812 public final boolean isSSL() 813 { 814 if (transport instanceof IIOPConnection) 815 return ((IIOPConnection)transport).isSSL(); 816 else 817 return false; 818 } 819 820 public void close() 821 { 822 synchronized (connect_sync) 823 { 824 if( connection_listener != null ) 825 { 826 connection_listener.connectionClosed(); 827 } 828 829 transport.close(); 830 do_close = true; 831 connect_sync.notifyAll(); 832 } 833 834 if (logger.isDebugEnabled()) 835 { 836 logger.debug("GIOPConnection closed (terminated)." ); 837 } 838 } 839 840 841 844 public final StatisticsProvider getStatisticsProvider() 845 { 846 return statistics_provider; 847 } 848 849 850 898 899 901 public static int allocate_cubby_id() 902 { 903 return cubby_count++; 904 } 905 906 public Object get_cubby(int id) 907 { 908 if (id < 0 || id >= cubby_count) 909 { 910 if (logger.isErrorEnabled()) 911 logger.error( "Get bad cubby id "+id+" (max="+cubby_count+")"); 912 return null; 913 } 914 return cubbyholes[id]; 915 } 916 917 public void set_cubby(int id, Object obj) 918 { 919 if (id < 0 || id >= cubby_count) 920 { 921 if (logger.isErrorEnabled()) 922 logger.error( "Set bad cubby id "+id+" (max="+cubby_count+")"); 923 return; 924 } 925 cubbyholes[id] = obj; 926 } 927 928 } | Popular Tags |