| 1 31 32 package org.apache.commons.httpclient; 33 34 import java.io.IOException ; 35 import java.io.InputStream ; 36 import java.io.OutputStream ; 37 import java.lang.ref.Reference ; 38 import java.lang.ref.ReferenceQueue ; 39 import java.lang.ref.WeakReference ; 40 import java.net.InetAddress ; 41 import java.net.SocketException ; 42 import java.util.ArrayList ; 43 import java.util.HashMap ; 44 import java.util.Iterator ; 45 import java.util.LinkedList ; 46 import java.util.Map ; 47 import java.util.WeakHashMap ; 48 49 import org.apache.commons.httpclient.protocol.Protocol; 50 import org.apache.commons.logging.Log; 51 import org.apache.commons.logging.LogFactory; 52 53 63 public class MultiThreadedHttpConnectionManager implements HttpConnectionManager { 64 65 67 private static final Log LOG = LogFactory.getLog(MultiThreadedHttpConnectionManager.class); 68 69 70 public static final int DEFAULT_MAX_HOST_CONNECTIONS = 2; 72 73 public static final int DEFAULT_MAX_TOTAL_CONNECTIONS = 20; 74 75 79 private static final Map REFERENCE_TO_CONNECTION_SOURCE = new HashMap (); 80 81 85 private static final ReferenceQueue REFERENCE_QUEUE = new ReferenceQueue (); 86 87 90 private static ReferenceQueueThread REFERENCE_QUEUE_THREAD; 91 92 95 private static WeakHashMap ALL_CONNECTION_MANAGERS = new WeakHashMap (); 96 97 105 public static void shutdownAll() { 106 107 synchronized (REFERENCE_TO_CONNECTION_SOURCE) { 108 synchronized (ALL_CONNECTION_MANAGERS) { 110 Iterator connIter = ALL_CONNECTION_MANAGERS.keySet().iterator(); 111 while (connIter.hasNext()) { 112 MultiThreadedHttpConnectionManager connManager = 113 (MultiThreadedHttpConnectionManager) connIter.next(); 114 connIter.remove(); 115 connManager.shutdown(); 116 } 117 } 118 119 if (REFERENCE_QUEUE_THREAD != null) { 121 REFERENCE_QUEUE_THREAD.shutdown(); 122 REFERENCE_QUEUE_THREAD = null; 123 } 124 REFERENCE_TO_CONNECTION_SOURCE.clear(); 125 } 126 } 127 128 145 private static void storeReferenceToConnection( 146 HttpConnectionWithReference connection, 147 HostConfiguration hostConfiguration, 148 ConnectionPool connectionPool 149 ) { 150 151 ConnectionSource source = new ConnectionSource(); 152 source.connectionPool = connectionPool; 153 source.hostConfiguration = hostConfiguration; 154 155 synchronized (REFERENCE_TO_CONNECTION_SOURCE) { 156 157 if (REFERENCE_QUEUE_THREAD == null) { 159 REFERENCE_QUEUE_THREAD = new ReferenceQueueThread(); 160 REFERENCE_QUEUE_THREAD.start(); 161 } 162 163 REFERENCE_TO_CONNECTION_SOURCE.put( 164 connection.reference, 165 source 166 ); 167 } 168 } 169 170 174 private static void shutdownCheckedOutConnections(ConnectionPool connectionPool) { 175 176 ArrayList connectionsToClose = new ArrayList (); 178 179 synchronized (REFERENCE_TO_CONNECTION_SOURCE) { 180 181 Iterator referenceIter = REFERENCE_TO_CONNECTION_SOURCE.keySet().iterator(); 182 while (referenceIter.hasNext()) { 183 Reference ref = (Reference ) referenceIter.next(); 184 ConnectionSource source = 185 (ConnectionSource) REFERENCE_TO_CONNECTION_SOURCE.get(ref); 186 if (source.connectionPool == connectionPool) { 187 referenceIter.remove(); 188 HttpConnection connection = (HttpConnection) ref.get(); 189 if (connection != null) { 190 connectionsToClose.add(connection); 191 } 192 } 193 } 194 } 195 196 for (Iterator i = connectionsToClose.iterator(); i.hasNext();) { 199 HttpConnection connection = (HttpConnection) i.next(); 200 connection.close(); 201 connection.setHttpConnectionManager(null); 204 connection.releaseConnection(); 205 } 206 } 207 208 216 private static void removeReferenceToConnection(HttpConnectionWithReference connection) { 217 218 synchronized (REFERENCE_TO_CONNECTION_SOURCE) { 219 REFERENCE_TO_CONNECTION_SOURCE.remove(connection.reference); 220 } 221 } 222 223 225 private int maxHostConnections = DEFAULT_MAX_HOST_CONNECTIONS; 226 227 228 private int maxTotalConnections = DEFAULT_MAX_TOTAL_CONNECTIONS; 229 230 231 private boolean connectionStaleCheckingEnabled = true; 232 233 private boolean shutdown = false; 234 235 236 private ConnectionPool connectionPool; 237 238 241 public MultiThreadedHttpConnectionManager() { 242 this.connectionPool = new ConnectionPool(); 243 synchronized(ALL_CONNECTION_MANAGERS) { 244 ALL_CONNECTION_MANAGERS.put(this, null); 245 } 246 } 247 248 256 public synchronized void shutdown() { 257 synchronized (connectionPool) { 258 if (!shutdown) { 259 shutdown = true; 260 connectionPool.shutdown(); 261 } 262 } 263 } 264 265 272 public boolean isConnectionStaleCheckingEnabled() { 273 return connectionStaleCheckingEnabled; 274 } 275 276 284 public void setConnectionStaleCheckingEnabled(boolean connectionStaleCheckingEnabled) { 285 this.connectionStaleCheckingEnabled = connectionStaleCheckingEnabled; 286 } 287 288 295 public void setMaxConnectionsPerHost(int maxHostConnections) { 296 this.maxHostConnections = maxHostConnections; 297 } 298 299 306 public int getMaxConnectionsPerHost() { 307 return maxHostConnections; 308 } 309 310 315 public void setMaxTotalConnections(int maxTotalConnections) { 316 this.maxTotalConnections = maxTotalConnections; 317 } 318 319 324 public int getMaxTotalConnections() { 325 return maxTotalConnections; 326 } 327 328 331 public HttpConnection getConnection(HostConfiguration hostConfiguration) { 332 333 while (true) { 334 try { 335 return getConnection(hostConfiguration, 0); 336 } catch (HttpException e) { 337 LOG.debug( 341 "Unexpected exception while waiting for connection", 342 e 343 ); 344 }; 345 } 346 } 347 348 351 public HttpConnection getConnection(HostConfiguration hostConfiguration, 352 long timeout) throws HttpException { 353 354 LOG.trace("enter HttpConnectionManager.getConnection(HostConfiguration, long)"); 355 356 if (hostConfiguration == null) { 357 throw new IllegalArgumentException ("hostConfiguration is null"); 358 } 359 360 if (LOG.isDebugEnabled()) { 361 LOG.debug("HttpConnectionManager.getConnection: config = " 362 + hostConfiguration + ", timeout = " + timeout); 363 } 364 365 final HttpConnection conn = doGetConnection(hostConfiguration, timeout); 366 367 return new HttpConnectionAdapter(conn); 370 } 371 372 387 private HttpConnection doGetConnection(HostConfiguration hostConfiguration, 388 long timeout) throws HttpException { 389 390 HttpConnection connection = null; 391 392 synchronized (connectionPool) { 393 394 hostConfiguration = new HostConfiguration(hostConfiguration); 397 HostConnectionPool hostPool = connectionPool.getHostPool(hostConfiguration); 398 WaitingThread waitingThread = null; 399 400 boolean useTimeout = (timeout > 0); 401 long timeToWait = timeout; 402 long startWait = 0; 403 long endWait = 0; 404 405 while (connection == null) { 406 407 if (shutdown) { 408 throw new IllegalStateException ("Connection factory has been shutdown."); 409 } 410 411 if (hostPool.freeConnections.size() > 0) { 414 connection = connectionPool.getFreeConnection(hostConfiguration); 415 416 } else if ((hostPool.numConnections < maxHostConnections) 419 && (connectionPool.numConnections < maxTotalConnections)) { 420 421 connection = connectionPool.createConnection(hostConfiguration); 422 423 } else if ((hostPool.numConnections < maxHostConnections) 427 && (connectionPool.freeConnections.size() > 0)) { 428 429 connectionPool.deleteLeastUsedConnection(); 430 connection = connectionPool.createConnection(hostConfiguration); 431 432 } else { 436 439 try { 440 441 if (useTimeout && timeToWait <= 0) { 442 throw new HttpException("Timeout waiting for connection"); 443 } 444 445 if (LOG.isDebugEnabled()) { 446 LOG.debug("Unable to get a connection, waiting..., hostConfig=" + hostConfiguration); 447 } 448 449 if (waitingThread == null) { 450 waitingThread = new WaitingThread(); 451 waitingThread.hostConnectionPool = hostPool; 452 waitingThread.thread = Thread.currentThread(); 453 } 454 455 if (useTimeout) { 456 startWait = System.currentTimeMillis(); 457 } 458 459 hostPool.waitingThreads.addLast(waitingThread); 460 connectionPool.waitingThreads.addLast(waitingThread); 461 connectionPool.wait(timeToWait); 462 463 hostPool.waitingThreads.remove(waitingThread); 466 connectionPool.waitingThreads.remove(waitingThread); 467 } catch (InterruptedException e) { 468 } finally { 470 if (useTimeout) { 471 endWait = System.currentTimeMillis(); 472 timeToWait -= (endWait - startWait); 473 } 474 } 475 } 476 } 477 } 478 return connection; 479 } 480 481 487 public int getConnectionsInUse(HostConfiguration hostConfiguration) { 488 synchronized (connectionPool) { 489 HostConnectionPool hostPool = connectionPool.getHostPool(hostConfiguration); 490 return hostPool.numConnections; 491 } 492 } 493 494 499 public int getConnectionsInUse() { 500 synchronized (connectionPool) { 501 return connectionPool.numConnections; 502 } 503 } 504 505 512 public void releaseConnection(HttpConnection conn) { 513 LOG.trace("enter HttpConnectionManager.releaseConnection(HttpConnection)"); 514 515 if (conn instanceof HttpConnectionAdapter) { 516 conn = ((HttpConnectionAdapter) conn).getWrappedConnection(); 518 } else { 519 } 522 523 SimpleHttpConnectionManager.finishLastResponse(conn); 525 526 connectionPool.freeConnection(conn); 527 } 528 529 534 private HostConfiguration configurationForConnection(HttpConnection conn) { 535 536 HostConfiguration connectionConfiguration = new HostConfiguration(); 537 538 connectionConfiguration.setHost( 539 conn.getHost(), 540 conn.getVirtualHost(), 541 conn.getPort(), 542 conn.getProtocol() 543 ); 544 if (conn.getLocalAddress() != null) { 545 connectionConfiguration.setLocalAddress(conn.getLocalAddress()); 546 } 547 if (conn.getProxyHost() != null) { 548 connectionConfiguration.setProxy(conn.getProxyHost(), conn.getProxyPort()); 549 } 550 551 return connectionConfiguration; 552 } 553 554 555 558 private class ConnectionPool { 559 560 561 private LinkedList freeConnections = new LinkedList (); 562 563 564 private LinkedList waitingThreads = new LinkedList (); 565 566 570 private final Map mapHosts = new HashMap (); 571 572 573 private int numConnections = 0; 574 575 578 public synchronized void shutdown() { 579 580 Iterator iter = freeConnections.iterator(); 582 while (iter.hasNext()) { 583 HttpConnection conn = (HttpConnection) iter.next(); 584 iter.remove(); 585 conn.close(); 586 } 587 588 shutdownCheckedOutConnections(this); 590 591 iter = waitingThreads.iterator(); 593 while (iter.hasNext()) { 594 WaitingThread waiter = (WaitingThread) iter.next(); 595 iter.remove(); 596 waiter.thread.interrupt(); 597 } 598 599 mapHosts.clear(); 601 } 602 603 609 public synchronized HttpConnection createConnection(HostConfiguration hostConfiguration) { 610 611 HttpConnectionWithReference connection = null; 612 613 HostConnectionPool hostPool = getHostPool(hostConfiguration); 614 615 if ((hostPool.numConnections < getMaxConnectionsPerHost()) 616 && (numConnections < getMaxTotalConnections())) { 617 618 if (LOG.isDebugEnabled()) { 619 LOG.debug("Allocating new connection, hostConfig=" + hostConfiguration); 620 } 621 connection = new HttpConnectionWithReference(hostConfiguration); 622 connection.setStaleCheckingEnabled(connectionStaleCheckingEnabled); 623 connection.setHttpConnectionManager(MultiThreadedHttpConnectionManager.this); 624 numConnections++; 625 hostPool.numConnections++; 626 627 storeReferenceToConnection(connection, hostConfiguration, this); 630 631 } else if (LOG.isDebugEnabled()) { 632 if (hostPool.numConnections >= getMaxConnectionsPerHost()) { 633 LOG.debug("No connection allocated, host pool has already reached " 634 + "maxConnectionsPerHost, hostConfig=" + hostConfiguration 635 + ", maxConnectionsPerhost=" + getMaxConnectionsPerHost()); 636 } else { 637 LOG.debug("No connection allocated, maxTotalConnections reached, " 638 + "maxTotalConnections=" + getMaxTotalConnections()); 639 } 640 } 641 642 return connection; 643 } 644 645 651 public synchronized void handleLostConnection(HostConfiguration config) { 652 HostConnectionPool hostPool = getHostPool(config); 653 hostPool.numConnections--; 654 655 numConnections--; 656 notifyWaitingThread(config); 657 } 658 659 665 public synchronized HostConnectionPool getHostPool(HostConfiguration hostConfiguration) { 666 LOG.trace("enter HttpConnectionManager.ConnectionPool.getHostPool(HostConfiguration)"); 667 668 HostConnectionPool listConnections = (HostConnectionPool) 670 mapHosts.get(hostConfiguration); 671 if (listConnections == null) { 672 listConnections = new HostConnectionPool(); 674 listConnections.hostConfiguration = hostConfiguration; 675 mapHosts.put(hostConfiguration, listConnections); 676 } 677 678 return listConnections; 679 } 680 681 687 public synchronized HttpConnection getFreeConnection(HostConfiguration hostConfiguration) { 688 689 HttpConnectionWithReference connection = null; 690 691 HostConnectionPool hostPool = getHostPool(hostConfiguration); 692 693 if (hostPool.freeConnections.size() > 0) { 694 connection = (HttpConnectionWithReference) hostPool.freeConnections.removeFirst(); 695 freeConnections.remove(connection); 696 storeReferenceToConnection(connection, hostConfiguration, this); 699 if (LOG.isDebugEnabled()) { 700 LOG.debug("Getting free connection, hostConfig=" + hostConfiguration); 701 } 702 } else if (LOG.isDebugEnabled()) { 703 LOG.debug("There were no free connections to get, hostConfig=" 704 + hostConfiguration); 705 } 706 return connection; 707 } 708 709 712 public synchronized void deleteLeastUsedConnection() { 713 714 HttpConnection connection = (HttpConnection) freeConnections.removeFirst(); 715 716 if (connection != null) { 717 HostConfiguration connectionConfiguration = configurationForConnection(connection); 718 719 if (LOG.isDebugEnabled()) { 720 LOG.debug("Reclaiming unused connection, hostConfig=" 721 + connectionConfiguration); 722 } 723 724 connection.close(); 725 726 HostConnectionPool hostPool = getHostPool(connectionConfiguration); 727 728 hostPool.freeConnections.remove(connection); 729 hostPool.numConnections--; 730 numConnections--; 731 } else if (LOG.isDebugEnabled()) { 732 LOG.debug("Attempted to reclaim an unused connection but there were none."); 733 } 734 } 735 736 742 public synchronized void notifyWaitingThread(HostConfiguration configuration) { 743 notifyWaitingThread(getHostPool(configuration)); 744 } 745 746 753 public synchronized void notifyWaitingThread(HostConnectionPool hostPool) { 754 755 WaitingThread waitingThread = null; 759 760 if (hostPool.waitingThreads.size() > 0) { 761 if (LOG.isDebugEnabled()) { 762 LOG.debug("Notifying thread waiting on host pool, hostConfig=" 763 + hostPool.hostConfiguration); 764 } 765 waitingThread = (WaitingThread) hostPool.waitingThreads.removeFirst(); 766 waitingThreads.remove(waitingThread); 767 } else if (waitingThreads.size() > 0) { 768 if (LOG.isDebugEnabled()) { 769 LOG.debug("No-one waiting on host pool, notifying next waiting thread."); 770 } 771 waitingThread = (WaitingThread) waitingThreads.removeFirst(); 772 waitingThread.hostConnectionPool.waitingThreads.remove(waitingThread); 773 } else if (LOG.isDebugEnabled()) { 774 LOG.debug("Notifying no-one, there are no waiting threads"); 775 } 776 777 if (waitingThread != null) { 778 waitingThread.thread.interrupt(); 779 } 780 } 781 782 786 public void freeConnection(HttpConnection conn) { 787 788 HostConfiguration connectionConfiguration = configurationForConnection(conn); 789 790 if (LOG.isDebugEnabled()) { 791 LOG.debug("Freeing connection, hostConfig=" + connectionConfiguration); 792 } 793 794 synchronized (this) { 795 796 if (shutdown) { 797 conn.close(); 800 return; 801 } 802 803 HostConnectionPool hostPool = getHostPool(connectionConfiguration); 804 805 hostPool.freeConnections.add(conn); 807 if (hostPool.numConnections == 0) { 808 LOG.error("Host connection pool not found, hostConfig=" 810 + connectionConfiguration); 811 hostPool.numConnections = 1; 812 } 813 814 freeConnections.add(conn); 815 removeReferenceToConnection((HttpConnectionWithReference) conn); 818 if (numConnections == 0) { 819 LOG.error("Host connection pool not found, hostConfig=" 821 + connectionConfiguration); 822 numConnections = 1; 823 } 824 825 notifyWaitingThread(hostPool); 826 } 827 } 828 } 829 830 834 private static class ConnectionSource { 835 836 837 &nbs
|