1 22 package org.jboss.invocation.pooled.server; 23 24 import java.net.InetAddress ; 25 import java.net.ServerSocket ; 26 import java.net.Socket ; 27 import java.net.UnknownHostException ; 28 import java.util.LinkedList ; 29 import java.security.PrivilegedExceptionAction ; 30 import java.security.AccessController ; 31 import java.security.PrivilegedActionException ; 32 import java.lang.reflect.Method ; 33 import java.rmi.NoSuchObjectException ; 34 import javax.management.ObjectName ; 35 import javax.naming.InitialContext ; 36 import javax.transaction.Transaction ; 37 import javax.transaction.TransactionManager ; 38 import javax.net.SocketFactory; 39 import javax.net.ServerSocketFactory; 40 41 import org.jboss.invocation.Invocation; 42 import org.jboss.invocation.pooled.interfaces.PooledInvokerProxy; 43 import org.jboss.invocation.pooled.interfaces.ServerAddress; 44 import org.jboss.invocation.pooled.interfaces.PooledMarshalledInvocation; 45 import org.jboss.logging.Logger; 46 import org.jboss.proxy.TransactionInterceptor; 47 import org.jboss.system.Registry; 48 import org.jboss.system.ServiceMBeanSupport; 49 import org.jboss.system.server.ServerConfigUtil; 50 import org.jboss.tm.TransactionPropagationContextFactory; 51 import org.jboss.tm.TransactionPropagationContextImporter; 52 import org.jboss.tm.TransactionPropagationContextUtil; 53 import org.jboss.security.SecurityDomain; 54 import org.jboss.net.sockets.DefaultSocketFactory; 55 56 78 public class PooledInvoker extends ServiceMBeanSupport 79 implements PooledInvokerMBean, Runnable 80 { 81 82 85 final static protected Logger log = Logger.getLogger(PooledInvoker.class); 86 87 90 protected boolean enableTcpNoDelay = false; 91 92 95 protected String serverBindAddress = null; 96 97 100 protected int serverBindPort = 0; 101 102 105 protected String clientConnectAddress = null; 106 107 110 protected int clientConnectPort = 0; 111 114 protected int clientRetryCount = 1; 115 116 protected int backlog = 200; 117 118 119 protected String clientSocketFactoryName; 120 121 122 protected String serverSocketFactoryName; 123 124 125 protected SocketFactory clientSocketFactory; 126 127 128 protected ServerSocketFactory serverSocketFactory; 129 130 protected ServerSocket serverSocket = null; 131 132 133 protected String sslDomain; 134 135 protected int timeout = 60000; 137 protected int maxPoolSize = 300; 138 139 protected int clientMaxPoolSize = 300; 140 141 protected int numAcceptThreads = 1; 142 protected Thread [] acceptThreads; 143 144 protected LRUPool clientpool; 145 protected LinkedList threadpool; 146 protected boolean running = true; 147 148 protected boolean trace = false; 149 153 protected ObjectName transactionManagerService; 154 155 protected PooledInvokerProxy optimizedInvokerProxy = null; 156 157 private MBeanServerAction serverAction = new MBeanServerAction(); 158 159 protected static TransactionPropagationContextFactory tpcFactory; 160 protected static TransactionPropagationContextImporter tpcImporter; 161 162 168 protected void jmxBind() 169 { 170 Registry.bind(getServiceName(), optimizedInvokerProxy); 171 } 172 173 178 public void startService() throws Exception 179 { 180 trace = log.isTraceEnabled(); 181 182 InitialContext ctx = new InitialContext (); 186 187 tpcFactory = TransactionPropagationContextUtil.getTPCFactory(); 189 190 tpcImporter = TransactionPropagationContextUtil.getTPCImporter(); 192 193 TransactionInterceptor.setTransactionManager((TransactionManager )ctx.lookup("java:/TransactionManager")); 195 196 200 InetAddress bindAddress = 201 (serverBindAddress == null || serverBindAddress.length() == 0) 202 ? null 203 : InetAddress.getByName(serverBindAddress); 204 205 clientConnectAddress = 206 (clientConnectAddress == null || clientConnectAddress.length() == 0) 207 ? InetAddress.getLocalHost().getHostName() 208 : clientConnectAddress; 209 213 clientConnectAddress = ServerConfigUtil.fixRemoteAddress(clientConnectAddress); 214 215 loadCustomSocketFactories(); 217 218 clientpool = new LRUPool(2, maxPoolSize); 219 clientpool.create(); 220 threadpool = new LinkedList (); 221 try 222 { 223 if( serverSocketFactory != null ) 224 serverSocket = serverSocketFactory.createServerSocket(serverBindPort, backlog, bindAddress); 225 else 226 serverSocket = new ServerSocket (serverBindPort, backlog, bindAddress); 227 } 228 catch( java.net.BindException be) 229 { 230 throw new Exception ("Port "+serverBindPort+" is already in use",be); 231 } 232 serverBindPort = serverSocket.getLocalPort(); 233 clientConnectPort = (clientConnectPort == 0) ? serverSocket.getLocalPort() : clientConnectPort; 234 235 ServerAddress sa = new ServerAddress(clientConnectAddress, clientConnectPort, 236 enableTcpNoDelay, timeout, clientSocketFactory); 237 optimizedInvokerProxy = new PooledInvokerProxy(sa, clientMaxPoolSize, clientRetryCount); 238 239 jmxBind(); 244 log.debug("Bound invoker for JMX node"); 245 ctx.close(); 246 247 acceptThreads = new Thread [numAcceptThreads]; 248 for (int i = 0; i < numAcceptThreads; i++) 249 { 250 String name = "PooledInvokerAcceptor#"+i+"-"+serverBindPort; 251 acceptThreads[i] = new Thread (this, name); 252 acceptThreads[i].start(); 253 } 254 } 255 256 public void run() 257 { 258 while (running) 259 { 260 try 261 { 262 Socket socket = serverSocket.accept(); 263 if( trace ) 264 log.trace("Accepted: "+socket); 265 ServerThread thread = null; 266 boolean newThread = false; 267 268 while (thread == null) 269 { 270 synchronized(threadpool) 271 { 272 if (threadpool.size() > 0) 273 { 274 thread = (ServerThread)threadpool.removeFirst(); 275 } 276 } 277 if (thread == null) 278 { 279 synchronized(clientpool) 280 { 281 if (clientpool.size() < maxPoolSize) 282 { 283 thread = new ServerThread(socket, this, clientpool, threadpool, timeout); 284 newThread = true; 285 } 286 if (thread == null) 287 { 288 clientpool.evict(); 289 if( trace ) 290 log.trace("Waiting for a thread..."); 291 clientpool.wait(); 292 if( trace ) 293 log.trace("Notified of available thread"); 294 } 295 } 296 } 297 } 298 synchronized(clientpool) 299 { 300 clientpool.insert(thread, thread); 301 } 302 303 if (newThread) 304 { 305 if( trace ) 306 log.trace("Created a new thread, t="+thread); 307 thread.start(); 308 } 309 else 310 { 311 if( trace ) 312 log.trace("Reusing thread t="+thread); 313 thread.wakeup(socket, timeout); 314 } 315 } 316 catch (Throwable ex) 317 { 318 if (running) 319 log.error("Failed to accept socket connection", ex); 320 } 321 } 322 } 323 324 327 public void stopService() throws Exception 328 { 329 running = false; 330 maxPoolSize = 0; for (int i = 0; i < acceptThreads.length; i++) 332 { 333 try 334 { 335 acceptThreads[i].interrupt(); 336 } 337 catch (Exception ignored){} 338 } 339 clientpool.flush(); 340 for (int i = 0; i < threadpool.size(); i++) 341 { 342 ServerThread thread = (ServerThread)threadpool.removeFirst(); 343 thread.shutdown(); 344 } 345 346 try 347 { 348 serverSocket.close(); 349 } 350 catch(Exception e) 351 { 352 } 353 } 354 355 protected void destroyService() throws Exception 356 { 357 Registry.unbind(getServiceName()); 359 } 360 361 365 public Object invoke(Invocation invocation) throws Exception 366 { 367 Thread currentThread = Thread.currentThread(); 368 ClassLoader oldCl = currentThread.getContextClassLoader(); 369 try 370 { 371 372 PooledMarshalledInvocation mi = (PooledMarshalledInvocation) invocation; 374 invocation.setTransaction(importTPC(mi.getTransactionPropagationContext())); 375 ObjectName mbean = (ObjectName ) Registry.lookup(invocation.getObjectName()); 376 if( mbean == null ) 377 { 378 System.err.println("NoSuchObjectException: "+invocation.getObjectName()); 379 throw new NoSuchObjectException ("Failed to find target for objectName: "+invocation.getObjectName()); 380 } 381 382 Object obj = serverAction.invoke(mbean, "invoke", 384 new Object [] { invocation }, Invocation.INVOKE_SIGNATURE); 385 386 return obj; 387 } 388 catch (Exception e) 389 { 390 org.jboss.mx.util.JMXExceptionDecoder.rethrow(e); 391 392 throw new org.jboss.util.UnreachableStatementException(); 394 } 395 finally 396 { 397 currentThread.setContextClassLoader(oldCl); 398 } 399 } 400 401 protected Transaction importTPC(Object tpc) 402 { 403 if (tpc != null) 404 return tpcImporter.importTransactionPropagationContext(tpc); 405 return null; 406 } 407 408 410 416 public int getNumAcceptThreads() 417 { 418 return numAcceptThreads; 419 } 420 421 427 public void setNumAcceptThreads(int size) 428 { 429 this.numAcceptThreads = size; 430 } 431 432 438 public int getMaxPoolSize() 439 { 440 return maxPoolSize; 441 } 442 443 449 public void setMaxPoolSize(int maxPoolSize) 450 { 451 this.maxPoolSize = maxPoolSize; 452 } 453 454 460 public int getClientMaxPoolSize() 461 { 462 return clientMaxPoolSize; 463 } 464 465 471 public void setClientMaxPoolSize(int clientMaxPoolSize) 472 { 473 this.clientMaxPoolSize = clientMaxPoolSize; 474 } 475 476 482 public int getSocketTimeout() 483 { 484 return timeout; 485 } 486 487 493 public void setSocketTimeout(int time) 494 { 495 this.timeout = time; 496 } 497 498 503 public int getCurrentClientPoolSize() 504 { 505 return clientpool.size(); 506 } 507 508 513 public int getCurrentThreadPoolSize() 514 { 515 return threadpool.size(); 516 } 517 518 524 public int getServerBindPort() 525 { 526 return serverBindPort; 527 } 528 529 535 public void setServerBindPort(int serverBindPort) 536 { 537 this.serverBindPort = serverBindPort; 538 } 539 540 543 public String getClientConnectAddress() 544 { 545 return clientConnectAddress; 546 } 547 548 551 public void setClientConnectAddress(String clientConnectAddress) 552 { 553 this.clientConnectAddress = clientConnectAddress; 554 } 555 556 559 public int getClientConnectPort() 560 { 561 return clientConnectPort; 562 } 563 564 567 public void setClientConnectPort(int clientConnectPort) 568 { 569 this.clientConnectPort = clientConnectPort; 570 } 571 572 575 public int getClientRetryCount() 576 { 577 return clientRetryCount; 578 } 579 580 583 public void setClientRetryCount(int clientRetryCount) 584 { 585 this.clientRetryCount = clientRetryCount; 586 } 587 588 591 public int getBacklog() 592 { 593 return backlog; 594 } 595 596 599 public void setBacklog(int backlog) 600 { 601 this.backlog = backlog; 602 } 603 604 607 public boolean isEnableTcpNoDelay() 608 { 609 return enableTcpNoDelay; 610 } 611 612 615 public void setEnableTcpNoDelay(boolean enableTcpNoDelay) 616 { 617 this.enableTcpNoDelay = enableTcpNoDelay; 618 } 619 620 623 public String getServerBindAddress() 624 { 625 return serverBindAddress; 626 } 627 628 631 public void setServerBindAddress(String serverBindAddress) 632 { 633 this.serverBindAddress = serverBindAddress; 634 } 635 636 639 public String getClientSocketFactoryName() 640 { 641 return clientSocketFactoryName; 642 } 643 644 647 public void setClientSocketFactoryName(String clientSocketFactoryName) 648 { 649 this.clientSocketFactoryName = clientSocketFactoryName; 650 } 651 652 655 public String getServerSocketFactoryName() 656 { 657 return serverSocketFactoryName; 658 } 659 660 663 public void setServerSocketFactoryName(String serverSocketFactoryName) 664 { 665 this.serverSocketFactoryName = serverSocketFactoryName; 666 } 667 668 671 public SocketFactory getClientSocketFactory() 672 { 673 return clientSocketFactory; 674 } 675 676 679 public void setClientSocketFactory(SocketFactory clientSocketFactory) 680 { 681 this.clientSocketFactory = clientSocketFactory; 682 } 683 684 687 public ServerSocket getServerSocket() 688 { 689 return serverSocket; 690 } 691 692 695 public void setServerSocket(ServerSocket serverSocket) 696 { 697 this.serverSocket = serverSocket; 698 } 699 700 703 public String getSslDomain() 704 { 705 return sslDomain; 706 } 707 708 711 public void setSslDomain(String sslDomain) 712 { 713 this.sslDomain = sslDomain; 714 } 715 716 719 public ServerSocketFactory getServerSocketFactory() 720 { 721 return serverSocketFactory; 722 } 723 724 727 public void setServerSocketFactory(ServerSocketFactory serverSocketFactory) 728 { 729 this.serverSocketFactory = serverSocketFactory; 730 } 731 732 739 public ObjectName getTransactionManagerService() 740 { 741 return transactionManagerService; 742 } 743 744 745 751 public void setTransactionManagerService(ObjectName transactionManagerService) 752 { 753 this.transactionManagerService = transactionManagerService; 754 } 755 756 759 public PooledInvokerProxy getOptimizedInvokerProxy() 760 { 761 return optimizedInvokerProxy; 762 } 763 764 768 protected void loadCustomSocketFactories() 769 { 770 ClassLoader loader = Thread.currentThread().getContextClassLoader(); 771 772 try 773 { 774 if( clientSocketFactoryName != null ) 775 { 776 Class csfClass = loader.loadClass(clientSocketFactoryName); 777 clientSocketFactory = (SocketFactory) csfClass.newInstance(); 778 } 779 } 780 catch (Exception e) 781 { 782 log.error("Failed to load client socket factory", e); 783 clientSocketFactory = null; 784 } 785 786 try 787 { 788 if( serverSocketFactory == null ) 789 { 790 if( serverSocketFactoryName != null ) 791 { 792 Class ssfClass = loader.loadClass(serverSocketFactoryName); 793 serverSocketFactory = (ServerSocketFactory) ssfClass.newInstance(); 794 if( serverBindAddress != null ) 795 { 796 try 798 { 799 Class [] parameterTypes = {String .class}; 800 Method m = ssfClass.getMethod("setBindAddress", parameterTypes); 801 Object [] args = {serverBindAddress}; 802 m.invoke(serverSocketFactory, args); 803 } 804 catch (NoSuchMethodException e) 805 { 806 log.warn("Socket factory does not support setBindAddress(String)"); 807 } 809 catch (Exception e) 810 { 811 log.warn("Failed to setBindAddress="+serverBindAddress+" on socket factory", e); 812 } 814 } 815 818 if( sslDomain != null ) 819 { 820 try 821 { 822 InitialContext ctx = new InitialContext (); 823 SecurityDomain domain = (SecurityDomain) ctx.lookup(sslDomain); 824 Class [] parameterTypes = {SecurityDomain.class}; 825 Method m = ssfClass.getMethod("setSecurityDomain", parameterTypes); 826 Object [] args = {domain}; 827 m.invoke(serverSocketFactory, args); 828 } 829 catch(NoSuchMethodException e) 830 { 831 log.error("Socket factory does not support setSecurityDomain(SecurityDomain)"); 832 } 833 catch(Exception e) 834 { 835 log.error("Failed to setSecurityDomain="+sslDomain+" on socket factory", e); 836 } 837 } 838 } 839 else if( serverBindAddress != null ) 841 { 842 DefaultSocketFactory defaultFactory = new DefaultSocketFactory(backlog); 843 serverSocketFactory = defaultFactory; 844 try 845 { 846 defaultFactory.setBindAddress(serverBindAddress); 847 } 848 catch (UnknownHostException e) 849 { 850 log.error("Failed to setBindAddress="+serverBindAddress+" on socket factory", e); 851 } 852 } 853 } 854 } 855 catch (Exception e) 856 { 857 log.error("operation failed", e); 858 serverSocketFactory = null; 859 } 860 } 861 862 865 class MBeanServerAction implements PrivilegedExceptionAction 866 { 867 private ObjectName target; 868 String method; 869 Object [] args; 870 String [] sig; 871 872 MBeanServerAction() 873 { 874 } 875 MBeanServerAction(ObjectName target, String method, Object [] args, String [] sig) 876 { 877 this.target = target; 878 this.method = method; 879 this.args = args; 880 this.sig = sig; 881 } 882 883 public Object run() throws Exception 884 { 885 Object rtnValue = server.invoke(target, method, args, sig); 886 return rtnValue; 887 } 888 Object invoke(ObjectName target, String method, Object [] args, String [] sig) 889 throws Exception 890 { 891 SecurityManager sm = System.getSecurityManager(); 892 Object rtnValue = null; 893 if( sm == null ) 894 { 895 rtnValue = server.invoke(target, method, args, sig); 897 } 898 else 899 { 900 try 901 { 902 MBeanServerAction action = new MBeanServerAction(target, method, args, sig); 904 rtnValue = AccessController.doPrivileged(action); 905 } 906 catch (PrivilegedActionException e) 907 { 908 Exception ex = e.getException(); 909 throw ex; 910 } 911 } 912 return rtnValue; 913 } 914 } 915 } 916 | Popular Tags |