1 22 package org.jboss.invocation.pooled.interfaces; 23 24 import java.io.IOException ; 25 import java.io.Externalizable ; 26 import java.io.ObjectInput ; 27 import java.io.ObjectOutput ; 28 import java.io.BufferedOutputStream ; 29 import java.io.BufferedInputStream ; 30 import java.io.ObjectInputStream ; 31 import java.io.ObjectOutputStream ; 32 import java.io.EOFException ; 33 import java.io.OptionalDataException ; 34 import java.io.UnsupportedEncodingException ; 35 import java.io.InterruptedIOException ; 36 import java.net.Socket ; 37 import java.net.SocketException ; 38 import java.rmi.MarshalledObject ; 39 import java.rmi.NoSuchObjectException ; 40 import java.rmi.ServerException ; 41 import java.rmi.ConnectException ; 42 import java.util.Iterator ; 43 import java.util.Map ; 44 import java.util.List ; 45 import java.util.LinkedList ; 46 47 import javax.transaction.TransactionRolledbackException ; 48 import javax.transaction.SystemException ; 49 import javax.net.ssl.SSLSocket; 50 import javax.net.ssl.HandshakeCompletedListener; 51 import javax.net.ssl.HandshakeCompletedEvent; 52 import javax.net.ssl.SSLException; 53 54 import org.jboss.invocation.Invocation; 55 import org.jboss.invocation.Invoker; 56 import org.jboss.tm.TransactionPropagationContextFactory; 57 import org.jboss.logging.Logger; 58 import EDU.oswego.cs.dl.util.concurrent.ConcurrentReaderHashMap; 59 60 61 70 public class PooledInvokerProxy 71 implements Invoker, Externalizable 72 { 73 private static final Logger log = Logger.getLogger(PooledInvokerProxy.class); 75 76 private static final long serialVersionUID = -1456509931095566410L; 77 78 private static final int WIRE_VERSION = 1; 79 80 90 protected static TransactionPropagationContextFactory tpcFactory = null; 91 92 public static void setTPCFactory(TransactionPropagationContextFactory tpcf) { 96 tpcFactory = tpcf; 97 } 98 99 public static long getSocketTime = 0; 101 public static long readTime = 0; 102 public static long writeTime = 0; 103 public static long serializeTime = 0; 104 public static long deserializeTime = 0; 105 106 public static long usedPooled = 0; 107 108 private static int inUseCount = 0; 109 110 private static long socketConnectCount = 0; 111 112 private static long socketCloseCount = 0; 113 114 117 public static int MAX_RETRIES = 10; 118 119 120 protected static final Map connectionPools = new ConcurrentReaderHashMap(); 121 122 125 protected ServerAddress address; 126 127 132 protected LinkedList pool = null; 133 134 protected int maxPoolSize; 135 136 protected int retryCount = 1; 137 138 private transient boolean trace; 139 140 143 protected static class ClientSocket 144 implements HandshakeCompletedListener 145 { 146 public ObjectOutputStream out; 147 public ObjectInputStream in; 148 public Socket socket; 149 public int timeout; 150 public String sessionID; 151 private boolean handshakeComplete = false; 152 private boolean trace; 153 154 public ClientSocket(Socket socket, int timeout) throws Exception 155 { 156 this.socket = socket; 157 trace = log.isTraceEnabled(); 158 boolean needHandshake = false; 159 160 if( socket instanceof SSLSocket ) 161 { 162 SSLSocket ssl = (SSLSocket) socket; 163 ssl.addHandshakeCompletedListener(this); 164 if( trace ) 165 log.trace("Starting SSL handshake"); 166 needHandshake = true; 167 handshakeComplete = false; 168 ssl.startHandshake(); 169 } 170 socket.setSoTimeout(timeout); 171 this.timeout = timeout; 172 out = new OptimizedObjectOutputStream(new BufferedOutputStream (socket.getOutputStream())); 173 out.flush(); 174 in = new OptimizedObjectInputStream(new BufferedInputStream (socket.getInputStream())); 175 if( needHandshake ) 176 { 177 socket.setSoTimeout(1000); 179 for(int n = 0; handshakeComplete == false && n < 60; n ++) 180 { 181 try 182 { 183 int b = in.read(); 184 } 185 catch(SSLException e) 186 { 187 if( trace ) 188 log.trace("Error while waiting for handshake to complete", e); 189 throw e; 190 } 191 catch(IOException e) 192 { 193 if( trace ) 194 log.trace("Handshaked read()", e); 195 } 196 } 197 if( handshakeComplete == false ) 198 throw new SSLException("Handshaked failed to complete in 60 seconds"); 199 socket.setSoTimeout(timeout); 201 } 202 203 } 204 205 public void handshakeCompleted(HandshakeCompletedEvent event) 206 { 207 handshakeComplete = true; 208 byte[] id = event.getSession().getId(); 209 try 210 { 211 sessionID = new String (id, "UTF-8"); 212 } 213 catch (UnsupportedEncodingException e) 214 { 215 log.warn("Failed to create session id using UTF-8, using default", e); 216 sessionID = new String (id); 217 } 218 if( trace ) 219 { 220 log.trace("handshakeCompleted, event="+event+", sessionID="+sessionID); 221 } 222 } 223 224 public String toString() 225 { 226 StringBuffer tmp = new StringBuffer ("ClientSocket@"); 227 tmp.append(System.identityHashCode(this)); 228 tmp.append('['); 229 tmp.append("socket="); 230 tmp.append(socket.toString()); 231 tmp.append(']'); 232 return tmp.toString(); 233 } 234 235 239 protected void finalize() 240 { 241 if (socket != null) 242 { 243 if( trace ) 244 log.trace("Closing socket in finalize: "+socket); 245 try 246 { 247 socketCloseCount --; 248 socket.close(); 249 } 250 catch (Exception ignored) {} 251 finally 252 { 253 socket = null; 254 } 255 } 256 } 257 } 258 259 262 public static void clearStats() 263 { 264 getSocketTime = 0; 265 readTime = 0; 266 writeTime = 0; 267 serializeTime = 0; 268 deserializeTime = 0; 269 usedPooled = 0; 270 } 271 272 275 public static long getInUseCount() 276 { 277 return inUseCount; 278 } 279 280 283 public static long getUsedPooled() 284 { 285 return usedPooled; 286 } 287 public static long getSocketConnectCount() 288 { 289 return socketConnectCount; 290 } 291 public static long getSocketCloseCount() 292 { 293 return socketCloseCount; 294 } 295 296 299 public static int getTotalPoolCount() 300 { 301 int count = 0; 302 Iterator iter = connectionPools.values().iterator(); 303 while( iter.hasNext() ) 304 { 305 List pool = (List ) iter.next(); 306 if( pool != null ) 307 count += pool.size(); 308 } 309 return count; 310 } 311 312 315 public long getPoolCount() 316 { 317 return pool.size(); 318 } 319 320 323 public PooledInvokerProxy() 324 { 325 super(); 326 trace = log.isTraceEnabled(); 327 } 328 329 333 public PooledInvokerProxy(ServerAddress sa, int maxPoolSize) 334 { 335 this(sa, maxPoolSize, MAX_RETRIES); 336 } 337 public PooledInvokerProxy(ServerAddress sa, int maxPoolSize, int retryCount) 338 { 339 this.address = sa; 340 this.maxPoolSize = maxPoolSize; 341 this.retryCount = retryCount; 342 } 343 344 347 public static void clearPool(ServerAddress sa) 348 { 349 boolean trace = log.isTraceEnabled(); 350 if( trace ) 351 log.trace("clearPool, sa: "+sa); 352 try 353 { 354 LinkedList thepool = (LinkedList )connectionPools.get(sa); 355 if (thepool == null) return; 356 synchronized (thepool) 357 { 358 int size = thepool.size(); 359 for (int i = 0; i < size; i++) 360 { 361 ClientSocket cs = null; 362 try 363 { 364 ClientSocket socket = (ClientSocket)thepool.removeFirst(); 365 cs = socket; 366 if( trace ) 367 log.trace("Closing, ClientSocket: "+socket); 368 socketCloseCount --; 369 socket.socket.close(); 370 } 371 catch (Exception ignored) 372 { 373 } 374 finally 375 { 376 if( cs != null ) 377 cs.socket = null; 378 } 379 } 380 } 381 } 382 catch (Exception ex) 383 { 384 } 386 } 387 390 public static void clearPools() 391 { 392 synchronized (connectionPools) 393 { 394 Iterator it = connectionPools.keySet().iterator(); 395 while (it.hasNext()) 396 { 397 ServerAddress sa = (ServerAddress)it.next(); 398 clearPool(sa); 399 } 400 } 401 } 402 403 protected void initPool() 404 { 405 synchronized (connectionPools) 406 { 407 pool = (LinkedList )connectionPools.get(address); 408 if (pool == null) 409 { 410 pool = new LinkedList (); 411 connectionPools.put(address, pool); 412 } 413 } 414 } 415 416 protected ClientSocket getConnection() throws Exception 417 { 418 Socket socket = null; 419 ClientSocket cs = null; 420 421 for (int i = 0; i < retryCount; i++) 434 { 435 ClientSocket pooled = getPooledConnection(); 436 if (pooled != null) 437 { 438 usedPooled++; 439 inUseCount ++; 440 return pooled; 441 } 442 443 try 444 { 445 if( trace) 446 { 447 log.trace("Connecting to addr: "+address.address 448 +", port: "+address.port 449 +",clientSocketFactory: "+address.clientSocketFactory 450 +",enableTcpNoDelay: "+address.enableTcpNoDelay 451 +",timeout: "+address.timeout); 452 } 453 if( address.clientSocketFactory != null ) 454 socket = address.clientSocketFactory.createSocket(address.address, address.port); 455 else 456 socket = new Socket (address.address, address.port); 457 socketConnectCount ++; 458 if( trace ) 459 log.trace("Connected, socket="+socket); 460 461 socket.setTcpNoDelay(address.enableTcpNoDelay); 462 cs = new ClientSocket(socket, address.timeout); 463 inUseCount ++; 464 if( trace ) 465 { 466 log.trace("New ClientSocket: "+cs 467 +", usedPooled="+ usedPooled 468 +", inUseCount="+ inUseCount 469 +", socketConnectCount="+ socketConnectCount 470 +", socketCloseCount="+ socketCloseCount 471 ); 472 } 473 break; 474 } 475 catch (Exception ex) 476 { 477 if( ex instanceof InterruptedIOException || ex instanceof SocketException ) 478 { 479 if( trace ) 480 log.trace("Connect failed", ex); 481 if (i + 1 < retryCount) 482 { 483 Thread.sleep(1); 484 continue; 485 } 486 } 487 throw ex; 488 } 489 } 490 if( cs == null ) 492 throw new ConnectException ("Failed to obtain a socket, tries="+retryCount); 493 return cs; 494 } 495 496 protected synchronized ClientSocket getPooledConnection() 497 { 498 ClientSocket socket = null; 499 while (pool.size() > 0) 500 { 501 try 502 { 503 synchronized( pool ) 504 { 505 socket = (ClientSocket)pool.removeFirst(); 506 } 507 if( trace ) 509 log.trace("Checking pooled socket: "+socket+", address: "+socket.socket.getLocalSocketAddress()); 510 final byte ACK = 1; 511 socket.out.writeByte(ACK); 512 socket.out.flush(); 513 socket.in.readByte(); 514 if( trace ) 515 { 516 log.trace("Using pooled ClientSocket: "+socket 517 +", usedPooled="+ usedPooled 518 +", inUseCount="+ inUseCount 519 +", socketConnectCount="+ socketConnectCount 520 +", socketCloseCount="+ socketCloseCount 521 ); 522 } 523 return socket; 524 } 525 catch (Exception ex) 526 { 527 if( trace ) 528 log.trace("Failed to validate pooled socket: "+socket, ex); 529 try 530 { 531 if( socket != null ) 532 { 533 socketCloseCount --; 534 socket.socket.close(); 535 } 536 } 537 catch (Exception ignored) 538 { 539 } 540 finally 541 { 542 if( socket != null ) 543 socket.socket = null; 544 } 545 } 546 } 547 return null; 548 } 549 550 556 protected synchronized boolean returnConnection(ClientSocket socket) 557 { 558 boolean pooled = false; 559 synchronized( pool ) 560 { 561 if (pool.size() < maxPoolSize) 562 { 563 pool.add(socket); 564 inUseCount --; 565 pooled = true; 566 } 567 } 568 return pooled; 569 } 570 571 574 public String getServerHostName() throws Exception 575 { 576 return address.address; 577 } 578 579 589 public Object getTransactionPropagationContext() 590 throws SystemException 591 { 592 return (tpcFactory == null) ? null : tpcFactory.getTransactionPropagationContext(); 593 } 594 595 596 600 public Object invoke(Invocation invocation) 601 throws Exception 602 { 603 boolean trace = log.isTraceEnabled(); 604 PooledMarshalledInvocation mi = new PooledMarshalledInvocation(invocation); 606 607 mi.setTransactionPropagationContext(getTransactionPropagationContext()); 610 611 Object response = null; 612 long start = System.currentTimeMillis(); 613 ClientSocket socket = getConnection(); 614 long end = System.currentTimeMillis() - start; 615 getSocketTime += end; 616 if( socket.sessionID != null ) 618 { 619 mi.setValue("SESSION_ID", socket.sessionID); 620 if( trace ) 621 log.trace("Added SESSION_ID to invocation"); 622 } 623 624 try 625 { 626 if( trace ) 627 log.trace("Sending invocation to: "+mi.getObjectName()); 628 socket.out.writeObject(mi); 629 socket.out.reset(); 630 socket.out.writeObject(Boolean.TRUE); socket.out.flush(); 632 socket.out.reset(); 633 end = System.currentTimeMillis() - start; 634 writeTime += end; 635 start = System.currentTimeMillis(); 636 response = socket.in.readObject(); 637 socket.in.readObject(); 641 end = System.currentTimeMillis() - start; 642 readTime += end; 643 } 644 catch (Exception ex) 645 { 646 if( trace ) 647 log.trace("Failure during invoke", ex); 648 try 649 { 650 socketCloseCount --; 651 socket.socket.close(); 652 } 653 catch (Exception ignored) {} 654 finally 655 { 656 socket.socket = null; 657 } 658 throw new java.rmi.ConnectException ("Failure during invoke", ex); 659 } 660 661 if( returnConnection(socket) == false ) 663 { 664 if( trace ) 666 log.trace("Closing unpooled socket: "+socket); 667 try 668 { 669 socketCloseCount --; 670 socket.socket.close(); 671 } 672 catch (Exception ignored) {} 673 finally 674 { 675 socket.socket = null; 676 } 677 } 678 679 681 try 682 { 683 if (response instanceof Exception ) 684 { 685 throw ((Exception )response); 686 } 687 if (response instanceof MarshalledObject ) 688 { 689 return ((MarshalledObject )response).get(); 690 } 691 return response; 692 } 693 catch (ServerException ex) 694 { 695 if (ex.detail instanceof NoSuchObjectException ) 699 { 700 throw (NoSuchObjectException ) ex.detail; 701 } 702 if (ex.detail instanceof TransactionRolledbackException ) 704 { 705 throw (TransactionRolledbackException ) ex.detail; 706 } 707 throw ex; 708 } 709 } 710 711 720 public void writeExternal(final ObjectOutput out) 721 throws IOException 722 { 723 out.writeObject(address); 725 out.writeInt(maxPoolSize); 726 out.writeInt(WIRE_VERSION); 728 out.writeInt(retryCount); 729 } 730 731 public void readExternal(final ObjectInput in) 732 throws IOException , ClassNotFoundException 733 { 734 trace = log.isTraceEnabled(); 735 address = (ServerAddress)in.readObject(); 736 maxPoolSize = in.readInt(); 737 int version = 0; 738 try 739 { 740 version = in.readInt(); 741 } 742 catch(EOFException e) 743 { 744 } 746 catch(OptionalDataException e) 747 { 748 } 750 751 switch( version ) 752 { 753 case 0: 754 retryCount = MAX_RETRIES; 756 break; 757 case 1: 758 readVersion1(in); 759 break; 760 default: 761 764 break; 765 } 766 initPool(); 767 } 768 769 private void readVersion1(final ObjectInput in) 770 throws IOException 771 { 772 retryCount = in.readInt(); 773 } 774 } 775 | Popular Tags |