package org.jboss.resource.connectionmanager;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;

import javax.resource.ResourceException;
import javax.resource.spi.ConnectionRequestInfo;
import javax.resource.spi.ManagedConnection;
import javax.resource.spi.ManagedConnectionFactory;
import javax.resource.spi.ValidatingManagedConnectionFactory;
import javax.security.auth.Subject;

import org.jboss.logging.Logger;
import org.jboss.resource.JBossResourceException;
import org.jboss.util.UnreachableStatementException;

import EDU.oswego.cs.dl.util.concurrent.FIFOSemaphore;
import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;

/**
 * A pool of {@link ConnectionListener}s created from a single
 * {@link ManagedConnectionFactory}.
 *
 * <p>Access to the pool is throttled by a {@link FIFOSemaphore} holding
 * <code>maxSize</code> permits: a permit is acquired before a connection is
 * handed out and released when the connection is returned. The list of idle
 * listeners and the set of checked out listeners are both guarded by the
 * monitor of the idle list (<code>cls</code>).
 */
public class InternalManagedConnectionPool implements IdleConnectionRemovalSupport
{
   /** The managed connection factory used to create and match connections */
   private final ManagedConnectionFactory mcf;

   /** The factory used to wrap managed connections in connection listeners */
   private final ConnectionListenerFactory clf;

   /** The subject used when the caller passes none, and when filling the pool */
   private final Subject defaultSubject;

   /** The connection request info used when the caller passes none, and when filling the pool */
   private final ConnectionRequestInfo defaultCri;

   /** The pool parameters */
   private final PoolParams poolParams;

   /** Copy of <code>poolParams.maxSize</code> taken at construction time */
   private int maxSize;

   /**
    * The idle connection listeners. Connections are taken from and returned
    * to the tail; idle removal inspects the head. Also serves as the lock for
    * {@link #checkedOut}.
    */
   private ArrayList cls;

   /** The permits used to limit the number of concurrent users of the pool */
   private final FIFOSemaphore permits;

   /** The log */
   private final Logger log;

   /** Whether trace logging was enabled when the pool was constructed */
   private final boolean trace;

   /** Statistics about created/destroyed connections and blocking time */
   private final Counter connectionCounter = new Counter();

   /** The connection listeners currently handed out; guarded by <code>cls</code> */
   private final HashSet checkedOut = new HashSet();

   /** Whether the first connection has been created (which triggers a fill to the minimum size) */
   private boolean started = false;

   /** Whether the pool has been shut down */
   private SynchronizedBoolean shutdown = new SynchronizedBoolean(false);

   /** High-water mark of <code>maxSize - permits.permits()</code> observed when handing out a connection */
   private volatile int maxUsedConnections = 0;

   /**
    * Create a new internal pool.
    *
    * @param mcf the managed connection factory
    * @param clf the connection listener factory
    * @param subject the default subject
    * @param cri the default connection request info
    * @param poolParams the pool parameters
    * @param log the log
    */
   protected InternalManagedConnectionPool(ManagedConnectionFactory mcf, ConnectionListenerFactory clf, Subject subject,
         ConnectionRequestInfo cri, PoolParams poolParams, Logger log)
   {
      this.mcf = mcf;
      this.clf = clf;
      defaultSubject = subject;
      defaultCri = cri;
      this.poolParams = poolParams;
      this.maxSize = poolParams.maxSize;
      this.log = log;
      this.trace = log.isTraceEnabled();
      cls = new ArrayList(this.maxSize);
      permits = new FIFOSemaphore(this.maxSize);

      if (poolParams.prefill)
      {
         PoolFiller.fillPool(this);
      }
   }

   /**
    * Register the pool with the idle remover (when an idle timeout is
    * configured) and with the connection validator (when background
    * validation is enabled).
    */
   protected void initialize()
   {
      if (poolParams.idleTimeout != 0)
         IdleRemover.registerPool(this, poolParams.idleTimeout);

      if (poolParams.backgroundValidation)
      {
         log.debug("Registering for background validation at interval " + poolParams.backgroundInterval);
         ConnectionValidator.registerPool(this, poolParams.backgroundInterval);
      }
   }

   /**
    * @return the number of permits currently available in the semaphore
    */
   public long getAvailableConnections()
   {
      return permits.permits();
   }

   /**
    * @return the highest number of permits seen in use when a connection was handed out
    */
   public int getMaxConnectionsInUseCount()
   {
      return maxUsedConnections;
   }

   /**
    * @return the number of connection listeners currently checked out
    */
   public int getConnectionInUseCount()
   {
      return checkedOut.size();
   }

   /**
    * Get a connection listener, either by reusing an idle one that the
    * managed connection factory matches to the request, or by creating a new
    * one.
    *
    * @param subject the subject, or null to use the default subject
    * @param cri the connection request info, or null to use the default
    * @return a connection listener that has been granted a permit
    * @throws ResourceException if no permit could be obtained within the
    *         blocking timeout, the wait was interrupted, the pool has been
    *         shut down, or a new connection could not be created
    */
   public ConnectionListener getConnection(Subject subject, ConnectionRequestInfo cri) throws ResourceException
   {
      subject = (subject == null) ? defaultSubject : subject;
      cri = (cri == null) ? defaultCri : cri;
      long startWait = System.currentTimeMillis();
      try
      {
         if (permits.attempt(poolParams.blockingTimeout))
         {
            // Record the wait only after the permit has actually been acquired;
            // measuring before attempt() would always record (close to) zero.
            connectionCounter.updateBlockTime(System.currentTimeMillis() - startWait);

            // We have a permit to get a connection. Is there one in the pool already?
            ConnectionListener cl = null;
            do
            {
               synchronized (cls)
               {
                  if (shutdown.get())
                  {
                     permits.release();
                     throw new ResourceException("The pool has been shutdown");
                  }

                  if (cls.size() > 0)
                  {
                     cl = (ConnectionListener) cls.remove(cls.size() - 1);
                     checkedOut.add(cl);
                     int size = (int) (maxSize - permits.permits());
                     if (size > maxUsedConnections)
                        maxUsedConnections = size;
                  }
               }
               if (cl != null)
               {
                  // Yes, we retrieved a ManagedConnection from the pool. Does it match?
                  try
                  {
                     Object matchedMC = mcf.matchManagedConnections(Collections.singleton(cl.getManagedConnection()),
                           subject, cri);
                     if (matchedMC != null)
                     {
                        if (trace)
                           log.trace("supplying ManagedConnection from pool: " + cl);
                        cl.grantPermit(true);
                        return cl;
                     }

                     // Match did not succeed: discard this connection and try the next one.
                     log.warn("Destroying connection that could not be successfully matched: " + cl);
                     synchronized (cls)
                     {
                        checkedOut.remove(cl);
                     }
                     doDestroy(cl);
                     cl = null;
                  }
                  catch (Throwable t)
                  {
                     log.warn("Throwable while trying to match ManagedConnection, destroying connection: " + cl, t);
                     synchronized (cls)
                     {
                        checkedOut.remove(cl);
                     }
                     doDestroy(cl);
                     cl = null;
                  }
               }
            }
            while (cls.size() > 0);

            // OK, we couldn't find a working connection from the pool. Make a new one.
            try
            {
               cl = createConnectionEventListener(subject, cri);
               synchronized (cls)
               {
                  checkedOut.add(cl);
                  int size = (int) (maxSize - permits.permits());
                  if (size > maxUsedConnections)
                     maxUsedConnections = size;
               }

               // The first successful creation triggers filling the pool to its minimum size.
               if (started == false)
               {
                  started = true;
                  if (poolParams.minSize > 0)
                     PoolFiller.fillPool(this);
               }
               if (trace)
                  log.trace("supplying new ManagedConnection: " + cl);
               cl.grantPermit(true);
               return cl;
            }
            catch (Throwable t)
            {
               log.warn("Throwable while attempting to get a new connection: " + cl, t);
               // Return the permit and rethrow
               synchronized (cls)
               {
                  checkedOut.remove(cl);
               }
               permits.release();
               JBossResourceException.rethrowAsResourceException("Unexpected throwable while trying to create a connection: " + cl, t);
               throw new UnreachableStatementException();
            }
         }
         else
         {
            // We timed out
            throw new ResourceException("No ManagedConnections available within configured blocking timeout ( "
                  + poolParams.blockingTimeout + " [ms] )");
         }
      }
      catch (InterruptedException ie)
      {
         long end = System.currentTimeMillis() - startWait;
         throw new ResourceException("Interrupted while requesting permit! Waited " + end + " ms");
      }
   }

   /**
    * Return a connection listener to the pool.
    *
    * @param cl the connection listener being returned
    * @param kill true to destroy the connection instead of pooling it
    */
   public void returnConnection(ConnectionListener cl, boolean kill)
   {
      if (cl.getState() == ConnectionListener.DESTROYED)
      {
         log.trace("ManagedConnection is being returned after it was destroyed" + cl);
         if (cl.hasPermit())
         {
            // Release the semaphore
            cl.grantPermit(false);
            permits.release();
         }

         return;
      }

      if (trace)
         log.trace("putting ManagedConnection back into pool kill=" + kill + " cl=" + cl);
      try
      {
         cl.getManagedConnection().cleanup();
      }
      catch (ResourceException re)
      {
         log.warn("ResourceException cleaning up ManagedConnection: " + cl, re);
         kill = true;
      }

      // We need to destroy this one (e.g. it was marked by flush() while checked out)
      if (cl.getState() == ConnectionListener.DESTROY)
         kill = true;

      synchronized (cls)
      {
         checkedOut.remove(cl);

         // This is really an error
         if (kill == false && cls.size() >= poolParams.maxSize)
         {
            log.warn("Destroying returned connection, maximum pool size exceeded " + cl);
            kill = true;
         }

         if (kill)
         {
            // Make sure the listener is not left in the idle list
            cls.remove(cl);
         }
         else
         {
            // Return to the pool
            cl.used();
            cls.add(cl);
         }

         if (cl.hasPermit())
         {
            // Release the semaphore
            cl.grantPermit(false);
            permits.release();
         }
      }

      if (kill)
      {
         if (trace)
            log.trace("Destroying returned connection " + cl);
         doDestroy(cl);
      }
   }

   /**
    * Destroy every idle connection and mark every checked out connection for
    * destruction on return. If anything was destroyed, the pool has not been
    * shut down and a minimum size is configured, a refill is requested.
    */
   public void flush()
   {
      ArrayList destroy = null;
      synchronized (cls)
      {
         if (trace)
            log.trace("Flushing pool checkedOut=" + checkedOut + " inPool=" + cls);

         // Mark checked out connections as requiring destruction
         for (Iterator i = checkedOut.iterator(); i.hasNext();)
         {
            ConnectionListener cl = (ConnectionListener) i.next();
            if (trace)
               log.trace("Flush marking checked out connection for destruction " + cl);
            cl.setState(ConnectionListener.DESTROY);
         }
         // Collect the idle connections; they are destroyed outside the lock
         while (cls.size() > 0)
         {
            ConnectionListener cl = (ConnectionListener) cls.remove(0);
            if (destroy == null)
               destroy = new ArrayList();
            destroy.add(cl);
         }
      }

      // We need to destroy some connections
      if (destroy != null)
      {
         for (int i = 0; i < destroy.size(); ++i)
         {
            ConnectionListener cl = (ConnectionListener) destroy.get(i);
            if (trace)
               log.trace("Destroying flushed connection " + cl);
            doDestroy(cl);
         }

         // We destroyed something, check the minimum.
         if (shutdown.get() == false && poolParams.minSize > 0)
            PoolFiller.fillPool(this);
      }
   }

   /**
    * Destroy idle connections, starting at the head of the idle list and
    * stopping at the first connection that has not timed out or that
    * {@link #shouldRemove()} vetoes.
    */
   public void removeIdleConnections()
   {
      ArrayList destroy = null;
      long timeout = System.currentTimeMillis() - poolParams.idleTimeout;
      while (true)
      {
         synchronized (cls)
         {
            // Nothing left to destroy
            if (cls.size() == 0)
               break;

            // Check the first in the list
            ConnectionListener cl = (ConnectionListener) cls.get(0);
            if (cl.isTimedOut(timeout) && shouldRemove())
            {
               connectionCounter.incTimedOut();
               // We need to destroy this one
               cls.remove(0);
               if (destroy == null)
                  destroy = new ArrayList();
               destroy.add(cl);
            }
            else
            {
               // The head is still usable (or must be kept), so stop here
               break;
            }
         }
      }

      // We found some connections to destroy
      if (destroy != null)
      {
         for (int i = 0; i < destroy.size(); ++i)
         {
            ConnectionListener cl = (ConnectionListener) destroy.get(i);
            if (trace)
               log.trace("Destroying timedout connection " + cl);
            doDestroy(cl);
         }

         // We destroyed something, check the minimum.
         if (shutdown.get() == false && poolParams.minSize > 0)
            PoolFiller.fillPool(this);
      }
   }

   /**
    * Unregister from the background threads, wait for them, fill the pool to
    * its minimum size and then mark the pool as shut down without destroying
    * the pooled connections.
    */
   public void shutdownWithoutClear()
   {
      IdleRemover.unregisterPool(this);
      IdleRemover.waitForBackgroundThread();
      ConnectionValidator.unRegisterPool(this);
      ConnectionValidator.waitForBackgroundThread();

      fillToMin();
      shutdown.set(true);
   }

   /**
    * Mark the pool as shut down, unregister it from the background threads
    * and flush all connections.
    */
   public void shutdown()
   {
      shutdown.set(true);
      IdleRemover.unregisterPool(this);
      ConnectionValidator.unRegisterPool(this);
      flush();
   }

   /**
    * Create connections with the default subject and request info until the
    * number of live connections reaches the minimum size, the pool is shut
    * down, or a connection cannot be created.
    */
   public void fillToMin()
   {
      while (true)
      {
         // Get a permit - avoids a race when the pool is nearly full
         // Also avoids unnecessary fill checking when all connections are checked out
         try
         {
            if (permits.attempt(poolParams.blockingTimeout))
            {
               try
               {
                  if (shutdown.get())
                     return;

                  // We already have enough connections
                  if (getMinSize() - connectionCounter.getGuaranteedCount() <= 0)
                     return;

                  // Create a connection to fill the pool
                  try
                  {
                     ConnectionListener cl = createConnectionEventListener(defaultSubject, defaultCri);
                     synchronized (cls)
                     {
                        if (trace)
                           log.trace("Filling pool cl=" + cl);
                        cls.add(cl);
                     }
                  }
                  catch (ResourceException re)
                  {
                     log.warn("Unable to fill pool ", re);
                     return;
                  }
               }
               finally
               {
                  permits.release();
               }
            }
         }
         catch (InterruptedException ignored)
         {
            // Deliberately not propagated: the loop simply retries the permit request.
            log.trace("Interrupted while requesting permit in fillToMin");
         }
      }
   }

   /**
    * @return the number of connections created minus the number destroyed
    */
   public int getConnectionCount()
   {
      return connectionCounter.getCount();
   }

   /**
    * @return the accumulated time spent waiting for a permit
    */
   public long getTotalBlockTime()
   {
      return connectionCounter.getTotalBlockTime();
   }

   /**
    * @return the number of connections removed because they were idle too long
    */
   public int getTimedOut()
   {
      return connectionCounter.getTimedOut();
   }

   /**
    * @return the total block time divided by the number of connections
    *         created, or 0 when no connection has been created yet
    */
   public long getAverageBlockTime()
   {
      int createdCount = getConnectionCreatedCount();
      // Avoid an ArithmeticException before the first connection is created
      if (createdCount == 0)
         return 0;
      return connectionCounter.getTotalBlockTime() / createdCount;
   }

   /**
    * @return the number of connections created so far
    */
   public int getConnectionCreatedCount()
   {
      return connectionCounter.getCreatedCount();
   }

   /**
    * @return the number of connections destroyed so far
    */
   public int getConnectionDestroyedCount()
   {
      return connectionCounter.getDestroyedCount();
   }

   /**
    * Create a managed connection and wrap it in a connection listener.
    *
    * @param subject the subject
    * @param cri the connection request info
    * @return the new connection listener
    * @throws ResourceException if the managed connection or its listener
    *         cannot be created; in the latter case the managed connection is
    *         destroyed and the created count is rolled back
    */
   private ConnectionListener createConnectionEventListener(Subject subject, ConnectionRequestInfo cri)
         throws ResourceException
   {
      ManagedConnection mc = mcf.createManagedConnection(subject, cri);
      connectionCounter.inc();
      try
      {
         return clf.createConnectionListener(mc, this);
      }
      catch (ResourceException re)
      {
         connectionCounter.dec();
         mc.destroy();
         throw re;
      }
   }

   /**
    * Destroy a connection, unless it is already in the DESTROYED state.
    * Failures while destroying the managed connection are logged, not thrown.
    *
    * @param cl the connection to destroy
    */
   private void doDestroy(ConnectionListener cl)
   {
      if (cl.getState() == ConnectionListener.DESTROYED)
      {
         log.trace("ManagedConnection is already destroyed " + cl);
         return;
      }

      connectionCounter.dec();
      cl.setState(ConnectionListener.DESTROYED);
      try
      {
         cl.getManagedConnection().destroy();
      }
      catch (Throwable t)
      {
         log.warn("Exception destroying ManagedConnection " + cl, t);
      }
   }

   /**
    * Decide whether an idle connection may be removed. With strict minimum
    * enabled, removal is only allowed while the idle list holds more than
    * <code>minSize</code> connections. Callers invoke this while holding the
    * <code>cls</code> lock.
    *
    * @return true if the connection may be removed
    */
   private boolean shouldRemove()
   {
      boolean remove = true;

      if (poolParams.stictMin)
      {
         remove = cls.size() > poolParams.minSize;

         log.trace("StrictMin is active. Current connection will be removed is " + remove);
      }

      return remove;
   }

   /**
    * Validate idle connections that have not been checked within the
    * background validation interval. Connections reported invalid by a
    * {@link ValidatingManagedConnectionFactory} are destroyed and are not
    * put back into the pool; all others are returned with a fresh validation
    * timestamp.
    *
    * @throws Exception for any error, including interruption while waiting
    *         for a permit
    */
   public void validateConnections() throws Exception
   {
      if (trace)
         log.trace("Attempting to validate connections for pool " + this);

      if (permits.attempt(poolParams.blockingTimeout))
      {
         // Whether any connection was destroyed during this run (triggers a refill)
         boolean destroyed = false;

         try
         {
            while (true)
            {
               ConnectionListener cl = null;

               synchronized (cls)
               {
                  if (cls.size() == 0)
                  {
                     break;
                  }

                  cl = removeForFrequencyCheck();
               }

               if (cl == null)
               {
                  break;
               }

               // Whether this particular connection was destroyed
               boolean destroyedNow = false;

               try
               {
                  Set candidateSet = Collections.singleton(cl.getManagedConnection());

                  if (mcf instanceof ValidatingManagedConnectionFactory)
                  {
                     ValidatingManagedConnectionFactory vcf = (ValidatingManagedConnectionFactory) mcf;
                     candidateSet = vcf.getInvalidConnections(candidateSet);

                     if (candidateSet != null && candidateSet.size() > 0)
                     {
                        if (cl.getState() != ConnectionListener.DESTROY)
                        {
                           log.warn("Destroying connection that failed background validation: " + cl);
                           doDestroy(cl);
                           destroyedNow = true;
                           destroyed = true;
                        }
                     }
                  }
                  else
                  {
                     log.warn("warning: background validation was specified with a non compliant ManagedConnectionFactory interface.");
                  }
               }
               finally
               {
                  // A destroyed connection must not go back into the pool,
                  // otherwise it could be handed out by getConnection().
                  if (destroyedNow == false)
                  {
                     synchronized (cls)
                     {
                        returnForFrequencyCheck(cl);
                     }
                  }
               }
            }
         }
         finally
         {
            permits.release();

            if (destroyed && shutdown.get() == false && poolParams.minSize > 0)
            {
               PoolFiller.fillPool(this);
            }
         }
      }
   }

   /**
    * Remove from the idle list the first connection whose last validation is
    * at least <code>backgroundInterval</code> old. Callers invoke this while
    * holding the <code>cls</code> lock.
    *
    * @return the removed connection, or null if none is due for validation
    */
   private ConnectionListener removeForFrequencyCheck()
   {
      log.debug("Checking for connection within frequency");

      ConnectionListener cl = null;

      for (Iterator iter = cls.iterator(); iter.hasNext();)
      {
         cl = (ConnectionListener) iter.next();
         long lastCheck = cl.getLastValidatedTime();

         if ((System.currentTimeMillis() - lastCheck) >= poolParams.backgroundInterval)
         {
            // Safe despite the live iterator: we leave the loop immediately
            cls.remove(cl);
            break;
         }
         else
         {
            cl = null;
         }
      }

      return cl;
   }

   /**
    * Stamp the connection with the current time as its last validation time
    * and append it to the idle list. Callers invoke this while holding the
    * <code>cls</code> lock.
    *
    * @param cl the connection to return
    */
   private void returnForFrequencyCheck(ConnectionListener cl)
   {
      log.debug("Returning for connection within frequency");

      cl.setLastValidatedTime(System.currentTimeMillis());
      cls.add(cl);
   }

   /**
    * Guard against a configuration where the minimum exceeds the maximum.
    *
    * @return <code>poolParams.minSize</code>, capped at <code>maxSize</code>
    */
   private int getMinSize()
   {
      if (poolParams.minSize > maxSize)
         return maxSize;

      return poolParams.minSize;
   }

   /**
    * Pool configuration.
    */
   public static class PoolParams
   {
      /** The minimum number of connections the pool tries to keep */
      public int minSize = 0;

      /** The maximum number of connections (also the number of permits) */
      public int maxSize = 10;

      /** How long to wait for a permit, in milliseconds */
      public int blockingTimeout = 30000;

      /** Idle time after which a connection may be removed, in milliseconds; 0 disables idle removal */
      public long idleTimeout = 1000 * 60 * 30;

      /** Whether to register the pool for background validation */
      public boolean backgroundValidation;

      /** Minimum age of the last validation before a connection is validated again, in milliseconds */
      public long backgroundInterval = 1000 * 60 * 10;

      /** Whether to request a pool fill at construction time */
      public boolean prefill;

      /** Whether idle removal must leave more than <code>minSize</code> idle connections in place */
      public boolean stictMin;
   }

   /**
    * Connection statistics. Mutators are synchronized; most readers are not
    * and may therefore observe slightly stale values.
    */
   private static class Counter
   {
      private int created = 0;

      private int destroyed = 0;

      private long totalBlockTime;

      private int timedOut;

      /** @return created minus destroyed, read under the counter's lock */
      synchronized int getGuaranteedCount()
      {
         return created - destroyed;
      }

      /** @return created minus destroyed, read without locking */
      int getCount()
      {
         return created - destroyed;
      }

      int getCreatedCount()
      {
         return created;
      }

      int getDestroyedCount()
      {
         return destroyed;
      }

      /** Record a created connection */
      synchronized void inc()
      {
         ++created;
      }

      /** Record a destroyed connection (increments the destroyed count) */
      synchronized void dec()
      {
         ++destroyed;
      }

      /** Add the latest wait to the accumulated block time */
      synchronized void updateBlockTime(long latest)
      {
         totalBlockTime += latest;
      }

      long getTotalBlockTime()
      {
         return totalBlockTime;
      }

      int getTimedOut()
      {
         return timedOut;
      }

      /** Record a connection removed by idle timeout */
      synchronized void incTimedOut()
      {
         ++timedOut;
      }
   }
}