1 20 21 package org.jacorb.imr; 22 23 import org.apache.avalon.framework.logger.Logger; 24 import org.apache.avalon.framework.configuration.*; 25 26 import org.omg.GIOP.*; 27 28 import org.jacorb.imr.RegistrationPackage.*; 29 import org.jacorb.imr.AdminPackage.*; 30 31 import org.jacorb.orb.*; 32 import org.jacorb.orb.giop.*; 33 import org.jacorb.orb.iiop.*; 34 35 import org.jacorb.poa.util.POAUtil; 36 37 import org.jacorb.util.ObjectUtil; 38 39 import org.omg.PortableServer.*; 40 41 import java.io.*; 42 import java.net.ServerSocket ; 43 import java.net.Socket ; 44 import java.net.InetAddress ; 45 import java.lang.reflect.Method ; 46 47 57 58 public class ImplementationRepositoryImpl 59 extends ImplementationRepositoryPOA 60 { 61 64 private org.omg.CORBA.ORB orb; 65 66 private org.jacorb.config.Configuration configuration = null; 67 68 69 private Logger logger = null; 70 71 private String iorFile = null; 72 73 private File table_file; 74 private ServerTable server_table; 75 private File table_file_backup; 76 private SocketListener listener; 77 private Thread listenerThread; 78 79 private int object_activation_retries = 5; 80 private int object_activation_sleep = 50; 81 82 private boolean allow_auto_register = false; 83 private boolean check_object_liveness = false; 84 85 private int connection_timeout = 2000; 86 private long poaActivationTimeout = 120000; 88 private WriteThread wt; 89 private boolean updatePending; 90 private Shutdown shutdownThread; 91 92 93 103 public ImplementationRepositoryImpl(org.omg.CORBA.ORB orb) 104 { 105 this.orb = orb; 106 107 shutdownThread = new Shutdown (); 108 shutdownThread.setDaemon (true); 109 shutdownThread.setName ("Shutdown Thread"); 110 addShutdownHook (shutdownThread); 111 } 112 113 public void configure(Configuration myConfiguration) 114 throws ConfigurationException 115 { 116 configuration = (org.jacorb.config.Configuration)myConfiguration; 117 118 logger = configuration.getNamedLogger("jacorb.imr"); 119 120 String defaultTableFile = "table.dat"; 121 String tableFileStr = configuration.getAttribute("jacorb.imr.table_file", 122 defaultTableFile); 123 124 if (tableFileStr == defaultTableFile) 128 { 129 if (this.logger.isWarnEnabled()) 130 { 131 this.logger.warn("No file for the server table specified! Please configure the property jacorb.imr.table_file!"); 132 this.logger.warn("Will create \"table.dat\" in current directory, if necessary"); 133 } 134 } 135 136 table_file = new File(tableFileStr); 137 boolean _new_table = false; 138 139 if (! table_file.exists ()) 141 { 142 _new_table = true; 143 if (this.logger.isInfoEnabled()) 144 { 145 this.logger.info("Table file " + tableFileStr + 146 " does not exist - autocreating it."); 147 } 148 149 try 150 { 151 table_file.createNewFile (); 152 } 153 catch (IOException ex) 154 { 155 throw new ConfigurationException("Failed to create table file", ex); 156 } 157 } 158 else 159 { 160 if (table_file.isDirectory ()) 161 { 162 throw new ConfigurationException("The table file is a directory! Please check " + table_file.getAbsolutePath()); 163 } 164 165 if (! table_file.canRead()) 166 { 167 throw new ConfigurationException("The table file is not readable! Please check " + table_file.getAbsolutePath()); 168 } 169 170 if (! table_file.canWrite()) 171 { 172 throw new ConfigurationException("The table file is not writable! Please check " + table_file.getAbsolutePath()); 173 } 174 } 175 176 try 177 { 178 if (_new_table) 179 { 180 this.server_table = new ServerTable(); 181 save_server_table(table_file); 182 } 183 else 184 { 185 try 186 { 187 ObjectInputStream _in = 188 new ObjectInputStream(new FileInputStream(table_file)); 189 server_table = (ServerTable)_in.readObject(); 190 _in.close(); 191 } 192 catch (Exception ex) 193 { 194 this.logger.debug("Failed to read ServerTable", ex); 195 196 server_table = new ServerTable(); 197 save_server_table(table_file); 198 } 199 } 200 } 201 catch (FileOpFailed ex) 202 { 203 this.logger.debug("Failed to read ServerTable", ex); 204 } 205 206 207 this.iorFile = configuration.getAttribute("jacorb.imr.ior_file"); 209 210 String _backup_file_str = 211 configuration.getAttribute("jacorb.imr.backup_file", ""); 212 213 if (_backup_file_str.length() == 0) 215 { 216 this.logger.warn("No backup file specified!. No backup file will be created"); 217 } 218 219 if (_backup_file_str.length() > 0) 220 { 221 table_file_backup = new File(_backup_file_str); 222 223 if ( ! table_file_backup.exists ()) 225 { 226 _new_table = true; 227 228 if (this.logger.isInfoEnabled()) 229 { 230 this.logger.info("Backup file " + _backup_file_str + 231 " does not exist - autocreating it."); 232 } 233 234 try 235 { 236 table_file_backup.createNewFile(); 237 } 238 catch (IOException ex) 239 { 240 throw new ConfigurationException("Failed to create backup file", 241 ex); 242 } 243 } 244 else 245 { 246 if (table_file_backup.isDirectory ()) 247 { 248 throw new ConfigurationException("The backup file is a directory! Please check " + table_file_backup.getAbsolutePath()); 249 } 250 251 if (! table_file_backup.canRead()) 252 { 253 throw new ConfigurationException("The backup file is not readable! Please check " + table_file_backup.getAbsolutePath()); 254 } 255 256 if (! table_file_backup.canWrite()) 257 { 258 throw new ConfigurationException("The backup file is not writable! Please check " + table_file_backup.getAbsolutePath()); 259 } 260 } 261 } 262 263 this.object_activation_retries = 264 configuration.getAttributeAsInteger("jacorb.imr.object_activation_retries", 265 5); 266 267 this.object_activation_sleep = 268 configuration.getAttributeAsInteger("jacorb.imr.object_activation_sleep", 269 50); 270 271 this.allow_auto_register = 272 configuration.getAttributeAsBoolean("jacorb.imr.allow_auto_register", 273 false); 274 this.check_object_liveness = 275 configuration.getAttributeAsBoolean("jacorb.imr.check_object_liveness", 276 false); 277 278 this.connection_timeout = 279 configuration.getAttributeAsInteger("jacorb.imr.connection_timeout", 280 2000 ); 281 282 this.poaActivationTimeout = 283 configuration.getAttributeAsInteger( "jacorb.imr.timeout", 120000); 284 285 this.listener = new SocketListener(); 286 this.listener.configure(configuration); 287 288 this.listenerThread = new Thread (listener); 289 this.listenerThread.setPriority(Thread.MAX_PRIORITY); 290 this.listenerThread.start(); 291 292 this.wt = new WriteThread (); 293 this.wt.setName ("IMR Write Thread"); 294 this.wt.setDaemon (true); 295 this.wt.start (); 296 } 297 298 public String getIORFile() 299 { 300 return this.iorFile; 301 } 302 303 305 313 public void set_server_down(String server) 314 throws UnknownServerName 315 { 316 if (this.logger.isDebugEnabled()) 317 { 318 this.logger.debug("ImR: server " + server + " is going down... "); 319 } 320 321 ImRServerInfo _server = server_table.getServer(server); 322 _server.setDown(); 323 } 324 325 326 346 public void register_poa(String name, String server, String host, int port) 347 throws IllegalPOAName, DuplicatePOAName, UnknownServerName 348 { 349 ImRServerInfo _server = null; 350 ImRPOAInfo _poa = null; 351 boolean remap = false; 352 353 updatePending = true; 354 355 if (this.logger.isDebugEnabled()) 356 { 357 this.logger.debug("ImR: registering poa " + name + " for server: " + 358 server + " on " + host); 359 } 360 361 if( allow_auto_register && 362 ! server_table.hasServer( server )) 363 { 364 try 365 { 366 register_server( server, "", "" ); 367 } 368 catch( IllegalServerName isn ) 369 { 370 } 372 catch( DuplicateServerName dsn ) 373 { 374 } 376 } 377 378 _server = server_table.getServer(server); 379 _poa = server_table.getPOA(name); 380 381 if (_poa == null) 382 { 383 _poa = new ImRPOAInfo(name, host, port, _server, 385 poaActivationTimeout); 386 _server.addPOA(_poa); 387 server_table.putPOA(name, _poa); 388 389 this.logger.debug("ImR: new poa registered"); 390 } 391 else 392 { 393 395 if ((_poa.active) || (! server.equals(_poa.server.name))) 399 { 400 byte[] first = _poa.name.getBytes (); 401 byte[] id = new byte [ first.length + 1]; 402 System.arraycopy (first, 0, id, 0, first.length); 403 id[first.length] = org.jacorb.poa.POAConstants.OBJECT_KEY_SEP_BYTE; 404 405 if (_poa.host.equals (host) && _poa.port == port) 409 { 410 remap = true; 411 } 412 else 413 { 414 remap = ! (checkServerActive (_poa.host, _poa.port, id)); 416 } 417 418 if (remap == false) 419 { 420 throw new DuplicatePOAName 421 ( 422 "POA " + name + 423 " has already been registered " + 424 "for server " + _poa.server.name 425 ); 426 } 427 else 428 { 429 this.logger.debug("ImR: Remapping server/port"); 430 } 431 } 432 433 _poa.reactivate(host, port); 434 this.logger.debug("ImR: register_poa, reactivated"); 435 } 436 try 437 { 438 synchronized (wt) 439 { 440 wt.notify (); 441 } 442 } 443 catch (IllegalMonitorStateException e) 444 { 445 this.logger.debug("Caught Exception", e); 446 } 447 } 448 449 450 459 public void register_host(HostInfo host) 460 throws IllegalHostName, InvalidSSDRef 461 { 462 463 if (host.name == null || host.name.length() == 0) 464 throw new IllegalHostName(host.name); 465 466 try 467 { 468 host.ssd_ref.get_system_load(); 469 } 470 catch (Exception e) 471 { 472 this.logger.debug("Caught Exception", e); 473 throw new InvalidSSDRef(); 474 } 475 updatePending = true; 476 477 server_table.putHost(host.name, new ImRHostInfo(host)); 478 479 try 480 { 481 synchronized (wt) 482 { 483 wt.notify (); 484 } 485 } 486 catch (IllegalMonitorStateException e) 487 { 488 this.logger.debug("Caught Exception", e); 489 } 490 } 491 492 493 497 498 public ImRInfo get_imr_info() 499 { 500 return new ImRInfo(listener.getAddress(), listener.getPort()); 501 } 502 503 504 506 513 public HostInfo[] list_hosts() 514 { 515 return server_table.getHosts(); 516 } 517 518 524 public ServerInfo[] list_servers() 525 { 526 ServerInfo [] servers; 527 528 if (check_object_liveness) 529 { 530 this.logger.debug("ImR: Checking servers"); 531 532 servers = server_table.getServers(); 533 534 for (int k=0; k<servers.length; k++) 535 { 536 if (servers[k].active && servers[k].poas.length > 0) 537 { 538 byte[] first = servers[k].poas[0].name.getBytes (); 539 byte[] id = new byte [ first.length + 1]; 540 System.arraycopy (first, 0, id, 0, first.length); 541 id[first.length] = org.jacorb.poa.POAConstants.OBJECT_KEY_SEP_BYTE; 542 543 if ( ! checkServerActive 544 (servers[k].poas[0].host, servers[k].poas[0].port, id)) 545 { 546 try 547 { 548 if (this.logger.isDebugEnabled()) 549 { 550 this.logger.debug("ImR: Setting server " + 551 servers[k].name + " down"); 552 } 553 554 server_table.getServer(servers[k].name).setDown(); 556 557 servers[k].active = false; 559 } 560 catch (UnknownServerName e) 561 { 562 if (this.logger.isDebugEnabled()) 563 { 564 this.logger.debug("ImR: Internal error - unknown server " + servers[k].name, e); 565 } 566 } 567 } 568 } 569 } 570 } 571 else 572 { 573 servers = server_table.getServers(); 574 } 575 576 return servers; 577 } 578 579 586 public ServerInfo get_server_info(String server) 587 throws UnknownServerName 588 { 589 return server_table.getServer(server).toServerInfo(); 590 } 591 592 606 public void register_server(String name, String command, String host) 607 throws IllegalServerName, DuplicateServerName 608 { 609 updatePending = true; 610 611 ImRServerInfo _server = new ImRServerInfo(name, host, command); 612 server_table.putServer(name, _server); 613 614 if (this.logger.isDebugEnabled()) 615 { 616 this.logger.debug("ImR: server " + name + " on " + 617 host + " registered"); 618 } 619 620 try 621 { 622 synchronized (wt) 623 { 624 wt.notify (); 625 } 626 } 627 catch (IllegalMonitorStateException e) 628 { 629 this.logger.debug("Caught Exception", e); 630 } 631 } 632 633 634 641 public void unregister_server(String name) throws UnknownServerName 642 { 643 updatePending = true; 644 645 ImRServerInfo _server = server_table.getServer(name); 646 String [] _poas = _server.getPOANames(); 647 648 for (int _i = 0; _i < _poas.length; _i++) 650 server_table.removePOA(_poas[_i]); 651 652 server_table.removeServer(name); 653 654 if (this.logger.isDebugEnabled()) 655 { 656 this.logger.debug("ImR: server " + name + " unregistered"); 657 } 658 659 try 660 { 661 synchronized (wt) 662 { 663 wt.notify (); 664 } 665 } 666 catch (IllegalMonitorStateException e) 667 { 668 this.logger.debug("Caught Exception", e); 669 } 670 } 671 672 681 public void edit_server(String name, String command, String host) 682 throws UnknownServerName 683 { 684 updatePending = true; 685 686 ImRServerInfo _server = server_table.getServer(name); 687 688 _server.command = command; 689 _server.host = host; 690 691 if (this.logger.isDebugEnabled()) 692 { 693 this.logger.debug("ImR: server " + name + " edited"); 694 } 695 696 try 697 { 698 synchronized (wt) 699 { 700 wt.notify (); 701 } 702 } 703 catch (IllegalMonitorStateException e) 704 { 705 this.logger.debug("Caught Exception", e); 706 } 707 } 708 709 710 719 public void hold_server(String name) 720 throws UnknownServerName 721 { 722 ImRServerInfo _server = server_table.getServer(name); 723 _server.holding = true; 724 } 725 726 732 public void release_server(String name) 733 throws UnknownServerName 734 { 735 ImRServerInfo _server = server_table.getServer(name); 736 _server.release(); 737 } 738 739 746 public void start_server(String name) 747 throws UnknownServerName, ServerStartupFailed 748 { 749 restartServer(server_table.getServer(name)); 750 } 751 752 756 public void save_server_table() 757 throws FileOpFailed 758 { 759 if (table_file_backup != null) 760 { 761 save_server_table(table_file_backup); 762 } 763 } 764 765 774 public void shutdown(boolean wait) 775 { 776 synchronized (wt) 777 { 778 wt.shutdown (); 779 wt.notify (); 780 } 781 if (listener != null) 782 { 783 listener.stopListening (wait); 784 try 785 { 786 synchronized (listener) 787 { 788 listenerThread.join(5000); 790 } 791 } 792 catch (InterruptedException e) 793 { 794 this.logger.debug("Caught Exception", e); 795 } 796 } 797 try 798 { 799 save_server_table (); 800 } 801 catch (FileOpFailed f) 802 { 803 this.logger.debug("ImR: Failed to save backup table.", f); 804 } 805 this.logger.debug("ImR: Finished shutting down"); 806 } 807 808 815 public void unregister_host(String name) 816 throws UnknownHostName{ 817 if (server_table.removeHost(name) == null) 818 throw new UnknownHostName(name); 819 } 820 821 827 private void save_server_table(File save_to) throws FileOpFailed { 828 try{ 829 ObjectOutputStream _out = new ObjectOutputStream(new FileOutputStream(save_to)); 830 831 server_table.table_lock.gainExclusiveLock(); 832 _out.writeObject(server_table); 833 server_table.table_lock.releaseExclusiveLock(); 834 835 _out.flush(); 836 _out.close(); 837 } 838 catch (Exception e) 839 { 840 this.logger.debug("Caught Exception", e); 841 throw new FileOpFailed(); 842 } 843 updatePending = false; 844 } 845 846 847 850 public static void usage () 851 { 852 System.out.println("Usage: The following properties are useful in conjunction with the \nImplementationRepository:"); 853 System.out.println("\t \"jacorb.imr.endpoint_host\" Address to listen on for requests"); 854 System.out.println("\t \"jacorb.imr.endpoint_port\" Port to listen on for requests"); 855 System.out.println("\t \"jacorb.imr.table_file\" The file to store the server table into"); 856 System.out.println("\t \"jacorb.imr.backup_file\" The file to store the server table backup into"); 857 System.out.println("\t \"jacorb.imr.ior_file\" The file to store the ImRs IOR into"); 858 System.out.println("\t \"jacorb.imr.allow_auto_register\" if set to \"on\", servers that don't \n\talready have an entry on their first call to the imr, will get \n\tautomatically registered. Otherwise, an UnknownServer exception \n\tis thrown."); 859 System.exit(0); 860 } 861 862 867 public static void main(String [] args) 868 { 869 java.util.Properties argProps = ObjectUtil.argsToProps( args ); 872 argProps.setProperty("jacorb.implname", "the_ImR"); 873 argProps.setProperty("jacorb.use_imr", "off"); 874 875 try 877 { 878 org.omg.CORBA.ORB orb = org.omg.CORBA.ORB.init( args, argProps ); 879 880 ImplementationRepositoryImpl _imr = 881 new ImplementationRepositoryImpl(orb); 882 _imr.configure(((org.jacorb.orb.ORB) orb).getConfiguration()); 883 884 POA root_poa = POAHelper.narrow(orb.resolve_initial_references("RootPOA")); 885 root_poa.the_POAManager().activate(); 886 887 org.omg.CORBA.Policy [] policies = new org.omg.CORBA.Policy [2]; 888 889 policies[0] = 890 root_poa.create_lifespan_policy(LifespanPolicyValue.PERSISTENT); 891 policies[1] = 892 root_poa.create_id_assignment_policy(IdAssignmentPolicyValue.USER_ID); 893 894 POA imr_poa = root_poa.create_POA( "ImRPOA", 895 root_poa.the_POAManager(), 896 policies ); 897 898 for (int i=0; i<policies.length; i++) 899 { 900 policies[i].destroy(); 901 } 902 903 byte[] id = "ImR".getBytes(); 904 905 906 imr_poa.activate_object_with_id( id, _imr ); 907 908 PrintWriter _out = new PrintWriter 909 (new FileOutputStream(new File(_imr.getIORFile()))); 910 911 _out.println(orb.object_to_string(imr_poa.servant_to_reference(_imr))); 912 _out.flush(); 913 _out.close(); 914 915 orb.run(); 916 } 917 catch (Exception _e) 918 { 919 _e.printStackTrace(); 920 usage(); 921 System.exit(1); 922 } 923 } 924 925 private void restartServer(ImRServerInfo server) 926 throws ServerStartupFailed 927 { 928 server.awaitRelease(); 930 931 if(! server.active ) 932 { 933 if (this.logger.isDebugEnabled()) 934 { 935 this.logger.debug("ImR: server " + server.name + " is down"); 936 } 937 938 if (server.command.length() == 0){ 939 throw new ServerStartupFailed("Server " + server.name + 941 " can't be restarted because" + 942 " of missing startup command"); 943 } 944 else{ 945 if (server.shouldBeRestarted()){ 953 try{ 954 ImRHostInfo _host = server_table.getHost(server.host); 959 960 if( _host == null ) 961 { 962 throw new ServerStartupFailed( "Unknown host: >>" + 963 server.host + "<<" ); 964 } 965 966 if (this.logger.isDebugEnabled()) 967 { 968 this.logger.debug("ImR: will restart " + server.name); 969 } 970 971 _host.startServer(server.command, orb); 972 } 973 catch (ServerStartupFailed ssf) 974 { 975 server.setNotRestarting(); 976 977 throw ssf; 978 } 979 catch (Exception e) 980 { 981 server.setNotRestarting(); 982 983 this.logger.debug("Caught Exception", e); 984 985 server_table.removeHost(server.host); 987 988 throw new ServerStartupFailed("Failed to connect to host!"); 989 } 990 } 991 else 992 { 993 if (this.logger.isDebugEnabled()) 994 { 995 this.logger.debug("ImR: somebody else is restarting " + 996 server.name); 997 } 998 } 999 } 1000 } 1001 else 1002 { 1003 if (this.logger.isDebugEnabled()) 1004 { 1005 this.logger.debug("ImR: server " + server.name + " is active"); 1006 } 1007 } 1008 } 1009 1010 1011 1014 private void addShutdownHook (Thread thread) 1015 { 1016 Method method = getHookMethod ("addShutdownHook"); 1017 1018 if (method != null) 1019 { 1020 invokeHookMethod (method, thread); 1021 } 1022 } 1023 1024 private Method getHookMethod (String name) 1025 { 1026 Method method = null; 1027 Class [] params = new Class [1]; 1028 1029 params[0] = Thread .class; 1030 try 1031 { 1032 method = Runtime .class.getMethod (name, params); 1033 } 1034 catch (Throwable ex) {} 1035 1036 return method; 1037 } 1038 1039 private void invokeHookMethod (Method method, Thread thread) 1040 { 1041 Object [] args = new Object [1]; 1042 1043 args[0] = thread; 1044 try 1045 { 1046 method.invoke (Runtime.getRuntime (), args); 1047 } 1048 catch (Throwable ex) 1049 { 1050 if (this.logger.isDebugEnabled()) 1051 { 1052 this.logger.debug("Failed to invoke Runtime." + method.getName (), 1053 ex); 1054 } 1055 } 1056 } 1057 1058 1059 1066 private class SocketListener 1067 implements Runnable 1068 { 1069 private ServerSocket server_socket; 1070 private int port = 0; 1071 private String address; 1072 private int timeout = 0; 1073 private boolean run = true; 1074 private boolean wait = false; 1075 1076 private MessageReceptorPool receptor_pool = null; 1077 private RequestListener request_listener = null; 1078 private ReplyListener reply_listener = null; 1079 1080 private TransportManager transport_manager = null; 1081 1082 1085 public SocketListener() 1086 { 1087 request_listener = new ImRRequestListener(); 1088 reply_listener = new NoBiDirServerReplyListener(); 1089 } 1090 1091 public void configure(Configuration myConfiguration) 1092 throws ConfigurationException 1093 { 1094 receptor_pool = MessageReceptorPool.getInstance(myConfiguration); 1096 1097 try 1098 { 1099 int endpoint_port = 1100 configuration.getAttributeAsInteger( 1101 "jacorb.imr.endpoint_port_number",0); 1102 1103 String endpoint_host = 1104 configuration.getAttribute("jacorb.imr.endpoint_host", ""); 1105 1106 if (endpoint_host.length() > 0) 1107 { 1108 server_socket = 1109 new ServerSocket ( endpoint_port, 1110 50, InetAddress.getByName(endpoint_host)); 1112 } 1113 else 1114 { 1115 server_socket = 1117 new ServerSocket (endpoint_port); 1118 } 1119 1120 org.jacorb.orb.dns.DNSLookup lookup = 1121 new org.jacorb.orb.dns.DNSLookup(); 1122 lookup.configure(configuration); 1123 1124 if( endpoint_host.length() > 0 ) 1126 { 1127 address = lookup.inverseLookup( 1128 InetAddress.getByName( endpoint_host ) ); 1129 } 1130 else 1131 { 1132 address = lookup.inverseLookup( 1133 InetAddress.getLocalHost() ); 1134 } 1135 1136 if( address == null ) 1137 { 1138 if( endpoint_host.length() > 0 ) 1139 { 1140 address = endpoint_host; 1141 } 1142 else 1143 { 1144 address = InetAddress.getLocalHost().toString(); 1145 } 1146 } 1147 1148 if( address.indexOf("/") >= 0 ) 1149 address = address.substring(address.indexOf("/") + 1); 1150 1151 port = server_socket.getLocalPort(); 1152 1153 if (logger.isDebugEnabled()) 1154 { 1155 logger.debug("ImR Listener at " + port + ", " + address); 1156 } 1157 } 1158 catch (Exception e) 1159 { 1160 throw new ConfigurationException("Listener: Couldn't init", e); 1161 } 1162 1163 this.transport_manager = 1164 new TransportManager( (org.jacorb.orb.ORB) orb ); 1165 this.transport_manager.configure(configuration); 1166 } 1167 1168 1169 1174 public int getPort() 1175 { 1176 return port; 1177 } 1178 1179 1184 public String getAddress() 1185 { 1186 return address; 1187 } 1188 1189 1194 public void setTimeout( int timeout ) 1195 { 1196 this.timeout = timeout; 1197 } 1198 1199 1205 public void run() 1206 { 1207 while( run ) 1208 { 1209 try 1210 { 1211 Socket socket = server_socket.accept(); 1212 socket.setSoTimeout( timeout ); 1213 1214 org.jacorb.orb.iiop.ServerIIOPConnection transport = 1215 new ServerIIOPConnection (socket, 1216 false); transport.configure(configuration); 1218 1219 GIOPConnection connection = 1220 new ServerGIOPConnection( transport.get_server_profile(), 1221 transport, 1222 request_listener, 1223 reply_listener, 1224 null, 1225 null); 1226 connection.configure(configuration); 1227 receptor_pool.connectionCreated( connection ); 1228 } 1229 catch (Exception _e) 1230 { 1231 if (run) 1237 { 1238 logger.debug("Caught Exception", _e); 1239 } 1240 } 1241 } 1242 1243 orb.shutdown(wait); 1246 } 1247 1248 1253 public void stopListening(boolean wait) 1254 { 1255 run = false; 1256 this.wait = wait; 1257 1258 try 1259 { 1260 server_socket.close(); 1261 } 1262 catch (Exception _e) 1263 { 1264 logger.debug("Caught Exception", _e); 1265 } 1266 } 1267 } 1268 1269 1270 private boolean checkServerActive(String host, int port, byte []object_key) 1271 { 1272 ClientConnectionManager cm = null; 1273 IIOPAddress address = null; 1274 ClientConnection connection = null; 1275 LocateRequestOutputStream lros = null; 1276 LocateReplyReceiver receiver = null; 1277 LocateReplyInputStream lris = null; 1278 boolean result = false; 1279 1280 cm = ((org.jacorb.orb.ORB)orb).getClientConnectionManager (); 1281 try 1282 { 1283 address = new IIOPAddress (host, port); 1284 address.configure(configuration); 1285 1286 IIOPProfile iiopProfile = new IIOPProfile(address, object_key); 1287 iiopProfile.configure(configuration); 1288 1289 connection = cm.getConnection(iiopProfile); 1290 } 1291 catch(ConfigurationException e) 1292 { 1293 logger.debug("Failed to configure", e); 1294 } 1295 1296 if (this.logger.isDebugEnabled()) 1297 { 1298 this.logger.debug("Pinging " + host + " / " + port); 1299 } 1300 1301 try 1302 { 1303 lros = new LocateRequestOutputStream (object_key, connection.getId(), 2); 1304 receiver = new LocateReplyReceiver((org.jacorb.orb.ORB)orb); 1305 1306 connection.sendRequest( 1307 lros, 1308 receiver, 1309 lros.getRequestId (), 1310 true ); 1312 lris = receiver.getReply(); 1313 1314 switch (lris.rep_hdr.locate_status.value ()) 1315 { 1316 case LocateStatusType_1_2._UNKNOWN_OBJECT: 1317 case LocateStatusType_1_2._OBJECT_HERE: 1318 case LocateStatusType_1_2._OBJECT_FORWARD: 1319 case LocateStatusType_1_2._OBJECT_FORWARD_PERM: 1320 case LocateStatusType_1_2._LOC_SYSTEM_EXCEPTION: 1321 case LocateStatusType_1_2._LOC_NEEDS_ADDRESSING_MODE: 1322 default: 1323 { 1324 result = true; 1325 break; 1326 } 1327 } 1328 } 1329 catch (Throwable ex) 1330 { 1331 this.logger.debug("Caught Exception", ex); 1332 1333 result = false; 1334 } 1335 finally 1336 { 1337 cm.releaseConnection (connection); 1338 } 1339 return result; 1340 } 1341 1342 1343 1346 private class ImRRequestListener 1347 implements RequestListener 1348 { 1349 public ImRRequestListener() 1350 { 1351 } 1352 1353 1359 public void requestReceived( byte[] request, 1360 GIOPConnection connection ) 1361 { 1362 logger.debug("requestReceived"); 1363 connection.incPendingMessages(); 1364 1365 RequestInputStream in = new RequestInputStream( orb, request ); 1366 1367 replyNewLocation( ((org.jacorb.orb.ORB)orb).mapObjectKey( 1368 ParsedIOR.extractObjectKey(in.req_hdr.target, (org.jacorb.orb.ORB)orb)), 1369 in.req_hdr.request_id, 1370 in.getGIOPMinor(), 1371 connection ); 1372 } 1373 1374 public void locateRequestReceived( byte[] request, 1375 GIOPConnection connection ) 1376 { 1377 connection.incPendingMessages(); 1378 1379 LocateRequestInputStream in = 1380 new LocateRequestInputStream( orb, request ); 1381 1382 replyNewLocation( ParsedIOR.extractObjectKey(in.req_hdr.target, (org.jacorb.orb.ORB) orb), 1383 in.req_hdr.request_id, 1384 in.getGIOPMinor(), 1385 connection ); 1386 } 1387 1388 public void cancelRequestReceived( byte[] request, 1389 GIOPConnection connection ) 1390 { 1391 } 1393 1394 public void fragmentReceived( byte[] fragment, 1395 GIOPConnection connection ) 1396 { 1397 } 1399 1400 public void connectionClosed() 1401 { 1402 } 1403 1404 1409 private void replyNewLocation( byte[] object_key, 1410 int request_id, 1411 int giop_minor, 1412 GIOPConnection connection ) 1413 { 1414 String _poa_name = 1415 POAUtil.extractImplName( object_key ) + '/' + 1416 POAUtil.extractPOAName( object_key ); 1417 1418 ImRPOAInfo _poa = server_table.getPOA( _poa_name ); 1420 if (_poa == null) 1421 { 1422 sendSysException( 1423 new org.omg.CORBA.OBJECT_NOT_EXIST ( "POA " + 1424 _poa_name + 1425 " unknown" ), 1426 connection, 1427 request_id, 1428 giop_minor ); 1429 return; 1430 } 1431 1432 ImRServerInfo _server = _poa.server; 1434 1435 if (logger.isDebugEnabled()) 1436 { 1437 logger.debug("ImR: Looking up: " + _server.name); 1438 } 1439 1440 boolean ssd_valid = 1447 ( 1448 (_server.command.length() != 0) && 1449 (server_table.getHost(_server.host) != null) 1450 ); 1451 1452 if (_server.active && (check_object_liveness || ssd_valid)) 1453 { 1454 if (! checkServerActive (_poa.host, _poa.port, object_key)) 1457 { 1458 _server.setDown (); 1460 } 1461 } 1462 1463 try 1464 { 1465 restartServer( _server ); 1466 } 1467 catch( ServerStartupFailed ssf ) 1468 { 1469 if (logger.isDebugEnabled()) 1470 { 1471 logger.debug("Object (" + _server.name + ") on " 1472 + _poa.host + '/' + _poa.port + 1473 " not reachable"); 1474 } 1475 1476 sendSysException( new org.omg.CORBA.TRANSIENT (ssf.reason), 1477 connection, 1478 request_id, 1479 giop_minor ); 1480 return; 1481 } 1482 1483 boolean _old_poa_state = _poa.active; 1485 1486 if( ! _poa.awaitActivation() ) 1488 { 1489 sendSysException( new org.omg.CORBA.TRANSIENT ("Timeout exceeded"), 1491 connection, 1492 request_id, 1493 giop_minor ); 1494 return; 1495 } 1496 1497 ReplyOutputStream out = 1498 new ReplyOutputStream( request_id, 1499 org.omg.GIOP.ReplyStatusType_1_2.LOCATION_FORWARD, 1500 giop_minor, 1501 false, 1502 logger); 1503 1504 IIOPAddress addr = new IIOPAddress (_poa.host,(short)_poa.port); 1507 org.omg.IOP.IOR _ior = null; 1508 try 1509 { 1510 addr.configure(configuration); 1511 IIOPProfile p = new IIOPProfile (addr,object_key,giop_minor); 1512 p.configure(configuration); 1513 _ior = ParsedIOR.createObjectIOR(p); 1514 } 1515 catch(ConfigurationException e) 1516 { 1517 logger.debug("Caught exception", e); 1518 } 1519 1520 if( !_old_poa_state ) 1521 { 1522 1528 org.omg.CORBA.Object _object = 1529 orb.string_to_object( 1530 (new ParsedIOR( _ior, (org.jacorb.orb.ORB) orb, logger)).getIORString()); 1531 1532 for( int _i = 0; _i < object_activation_retries; _i++ ) 1534 { 1535 try 1536 { 1537 Thread.sleep( object_activation_sleep ); 1538 1539 if( ! _object._non_existent() ) { 1542 break; 1543 } 1544 } 1545 catch(Exception _e) 1546 { 1547 logger.debug("Caught Exception", _e); 1548 } 1549 } 1550 } 1551 1552 try 1553 { 1554 out.write_IOR(_ior); 1556 1557 if (logger.isDebugEnabled()) 1558 { 1559 logger.debug("ImR: Sending location forward for " + 1560 _server.name); 1561 } 1562 1563 connection.sendReply( out ); 1564 } 1565 catch( IOException _e ) 1566 { 1567 logger.debug("Caught Exception", _e); 1568 1569 sendSysException( new org.omg.CORBA.UNKNOWN (_e.toString()), 1570 connection, 1571 request_id, 1572 giop_minor ); 1573 } 1574 } 1575 1576 1582 private void sendSysException( org.omg.CORBA.SystemException sys_ex, 1583 GIOPConnection connection, 1584 int request_id, 1585 int giop_minor ) 1586 { 1587 ReplyOutputStream out = 1588 new ReplyOutputStream( request_id, 1589 org.omg.GIOP.ReplyStatusType_1_2.SYSTEM_EXCEPTION, 1590 giop_minor, 1591 false, 1592 logger); 1593 1594 SystemExceptionHelper.write( out, sys_ex ); 1595 1596 try 1597 { 1598 connection.sendReply( out ); 1599 } 1600 catch( IOException _e ) 1601 { 1602 logger.debug("Caught Exception", _e); 1603 } 1604 } 1605 } 1606 1607 1608 1612 private class WriteThread extends Thread 1613 { 1614 boolean done; 1615 1616 public WriteThread () 1617 { 1618 } 1619 1620 1623 public void run () 1624 { 1625 while (true) 1626 { 1627 try 1628 { 1629 save_server_table (table_file); 1630 } 1631 catch (FileOpFailed ex) 1632 { 1633 logger.debug("Caught Exception", ex); 1634 } 1635 1636 if (done) 1637 { 1638 break; 1639 } 1640 1641 if ( ! updatePending) 1644 { 1645 try 1646 { 1647 synchronized (this) 1648 { 1649 this.wait (); 1650 } 1651 } 1652 catch (InterruptedException ex) {} 1653 1654 logger.debug("ImR: IMR write thread waking up to save server table... "); 1655 } 1656 } 1657 } 1658 1659 1662 public void shutdown () 1663 { 1664 done = true; 1665 } 1666 } 1667 1668 1669 1673 private class Shutdown extends Thread 1674 { 1675 public synchronized void run () 1676 { 1677 logger.debug("ImR: Shutting down"); 1678 shutdown(true); 1679 } 1680 } 1681 1682} | Popular Tags |