1 25 26 package org.objectweb.jonas.dbm; 27 28 import java.sql.SQLException ; 29 import java.util.HashMap ; 30 import java.util.Iterator ; 31 import java.util.LinkedList ; 32 import java.util.Map ; 33 34 import javax.sql.XAConnection ; 35 import javax.transaction.SystemException ; 36 import javax.transaction.Transaction ; 37 38 import org.objectweb.jonas.common.Log; 39 import org.objectweb.jonas.jdbc_xa.ConnectionImpl; 40 import org.objectweb.jonas.jdbc_xa.XADataSourceImpl; 41 import org.objectweb.jonas.jtm.TransactionService; 42 import org.objectweb.jonas.service.ServiceException; 43 import org.objectweb.jonas.service.ServiceManager; 44 import org.objectweb.transaction.jta.TransactionManager; 45 import org.objectweb.util.monolog.api.BasicLevel; 46 import org.objectweb.util.monolog.api.Logger; 47 48 64 public class Pool { 65 66 70 73 private Logger logger = null; 74 75 78 private TransactionManager tm = null; 79 80 84 private PoolKeeper poolKeeper; 85 86 90 private Map xac2item = new HashMap (); 91 92 97 private LinkedList freeList = new LinkedList (); 98 99 103 private Map tx2item = new HashMap (); 104 105 108 private ConnectionManager cmgr; 109 110 113 private XADataSourceImpl xads; 114 115 118 private int waiterCount = 0; 119 120 123 private long waitingTime = 0; 124 125 128 private int busyMax = 0; 129 130 133 private int busyMin = 0; 134 135 140 143 private static final int NO_LIMIT = 99999; 144 145 148 private static final long ONE_DAY = 1440L * 60L * 1000L; 149 150 154 private static final int MAX_REMOVE_FREELIST = 10; 155 156 159 private int poolMin = 0; 160 161 164 public int getPoolMin() { 165 return poolMin; 166 } 167 168 171 public synchronized void setPoolMin(int min) { 172 if (poolMin != min) { 173 poolMin = min; 174 adjust(); 175 } 176 } 177 178 182 private int poolMax = NO_LIMIT; 183 184 187 public int getPoolMax() { 188 return poolMax; 189 } 190 191 194 public synchronized void setPoolMax(int max) { 195 if (poolMax != max) { 196 if (max < 0 || max > NO_LIMIT) { 197 if (currentWaiters > 0) { 198 notify(); 199 } 200 poolMax = NO_LIMIT; 201 } else { 202 if (currentWaiters > 0 && poolMax < max) { 203 notify(); 204 } 205 poolMax = max; 206 adjust(); 207 } 208 } 209 } 210 211 216 private long maxAge = ONE_DAY; 217 218 221 private int maxAgeMn; 222 223 226 public int getMaxAge() { 227 return maxAgeMn; 228 } 229 230 233 public long getMaxAgeMilli() { 234 return maxAge; 235 } 236 237 240 public void setMaxAge(int mn) { 241 maxAgeMn = mn; 242 maxAge = mn * 60L * 1000L; 244 } 245 246 249 private long maxOpenTime = ONE_DAY; 250 251 254 private int maxOpenTimeMn; 255 256 259 public int getMaxOpenTime() { 260 return maxOpenTimeMn; 261 } 262 263 266 public long getMaxOpenTimeMilli() { 267 return maxOpenTime; 268 } 269 270 273 public void setMaxOpenTime(int mn) { 274 maxOpenTimeMn = mn; 275 maxOpenTime = mn * 60L * 1000L; 277 } 278 279 282 private long waiterTimeout = 10000; 283 284 287 public int getMaxWaitTime() { 288 return (int) (waiterTimeout / 1000L); 289 } 290 291 294 public void setMaxWaitTime(int sec) { 295 waiterTimeout = sec * 1000L; 296 } 297 298 301 private int maxWaiters = 1000; 302 303 306 public int getMaxWaiters() { 307 return maxWaiters; 308 } 309 310 313 public void setMaxWaiters(int nb) { 314 maxWaiters = nb; 315 } 316 317 320 private int samplingPeriod = 60; 322 325 public int getSamplingPeriod() { 326 return samplingPeriod; 327 } 328 329 332 public void setSamplingPeriod(int sec) { 333 if (sec > 0) { 334 samplingPeriod = sec; 335 poolKeeper.setSamplingPeriod(sec); 336 } 337 } 338 339 347 private int checkLevel = 1; 349 352 public int getCheckLevel() { 353 return checkLevel; 354 } 355 356 359 public void setCheckLevel(int level) { 360 checkLevel = level; 361 } 362 363 366 private String testStatement; 367 368 371 public String getTestStatement() { 372 return testStatement; 373 } 374 375 378 public void setTestStatement(String s) { 379 testStatement = s; 380 } 381 382 387 390 private int busyMaxRecent = 0; 391 392 395 public int getBusyMaxRecent() { 396 return busyMaxRecent; 397 } 398 399 402 private int busyMinRecent = 0; 403 404 407 public int getBusyMinRecent() { 408 return busyMinRecent; 409 } 410 411 414 private int currentWaiters = 0; 415 416 419 public int getCurrentWaiters() { 420 return currentWaiters; 421 } 422 423 426 private int openedCount = 0; 427 428 431 public int getOpenedCount() { 432 return openedCount; 433 } 434 435 438 private int connectionFailures = 0; 439 440 443 public int getConnectionFailures() { 444 return connectionFailures; 445 } 446 447 452 private int connectionLeaks = 0; 453 454 457 public int getConnectionLeaks() { 458 return connectionLeaks; 459 } 460 461 464 private int servedOpen = 0; 465 466 469 public int getServedOpen() { 470 return servedOpen; 471 } 472 473 476 private int rejectedFull = 0; 477 478 481 public int getRejectedFull() { 482 return rejectedFull; 483 } 484 485 488 private int rejectedTimeout = 0; 489 490 493 public int getRejectedTimeout() { 494 return rejectedTimeout; 495 } 496 497 500 private int rejectedOther = 0; 501 502 505 public int getRejectedOther() { 506 return rejectedOther; 507 } 508 509 512 public int getRejectedOpen() { 513 return rejectedFull + rejectedTimeout + rejectedOther; 514 } 515 516 519 private int waitersHigh = 0; 520 521 524 public int getWaitersHigh() { 525 return waitersHigh; 526 } 527 528 531 private int waitersHighRecent = 0; 532 533 536 public int getWaitersHighRecent() { 537 return waitersHighRecent; 538 } 539 540 543 private int totalWaiterCount = 0; 544 545 548 public int getWaiterCount() { 549 return totalWaiterCount; 550 } 551 552 555 private long totalWaitingTime = 0; 556 557 560 public long getWaitingTime() { 561 return totalWaitingTime; 562 } 563 564 567 private long waitingHigh = 0; 568 569 572 public long getWaitingHigh() { 573 return waitingHigh; 574 } 575 576 579 private long waitingHighRecent = 0; 580 581 584 public long getWaitingHighRecent() { 585 return waitingHighRecent; 586 } 587 588 592 600 public Pool(ConnectionManager cmgr, XADataSourceImpl xads, Logger log) throws Exception { 601 this.cmgr = cmgr; 602 this.xads = xads; 603 this.logger = log; 604 ServiceManager sm = ServiceManager.getInstance(); 605 TransactionService ts = (TransactionService) sm.getTransactionService(); 606 tm = ts.getTransactionManager(); 607 poolKeeper = new PoolKeeper(this, logger); 608 poolKeeper.start(); 609 } 610 611 615 616 619 public int getCurrentOpened() { 620 return xac2item.size(); 621 } 622 623 626 public int getCurrentBusy() { 627 return xac2item.size() - freeList.size(); 628 } 629 630 633 public void recomputeBusy() { 634 int busy = getCurrentBusy(); 635 if (logger.isLoggable(BasicLevel.DEBUG)) { 636 logger.log(BasicLevel.DEBUG, "Busy Connections = " + busy); 637 } 638 if (busyMax < busy) { 639 busyMax = busy; 640 } 641 if (busyMin > busy) { 642 busyMin = busy; 643 } 644 } 645 646 649 public int getCurrentInTx() { 650 return tx2item.size(); 651 } 652 653 660 public synchronized PoolItem openConnection(String user, Transaction tx) throws SQLException { 661 PoolItem item = null; 662 663 if (tx != null) { 666 item = (PoolItem) tx2item.get(tx); 667 if (item != null) { 668 if (logger.isLoggable(BasicLevel.DEBUG)) { 669 logger.log(BasicLevel.DEBUG, "Reuse a Connection for same tx"); 670 } 671 item.open(); 672 servedOpen++; 673 return item; 674 } 675 } 676 677 long timetowait = waiterTimeout; 679 long starttime = 0; 680 while (item == null) { 681 if (freeList.isEmpty()) { 683 if (xac2item.size() >= poolMax) { 686 boolean stoplooping = true; 687 if (timetowait > 0) { 689 if (currentWaiters < maxWaiters) { 690 currentWaiters++; 691 if (waiterCount < currentWaiters) { 693 waiterCount = currentWaiters; 694 } 695 if (starttime == 0) { 696 starttime = System.currentTimeMillis(); 697 if (logger.isLoggable(BasicLevel.DEBUG)) { 698 logger.log(BasicLevel.DEBUG, "Wait for a free Connection" + xac2item.size()); 699 } 700 } 701 try { 702 wait(timetowait); 703 } catch (InterruptedException ign) { 704 logger.log(BasicLevel.WARN, "Interrupted"); 705 } finally { 706 currentWaiters--; 707 } 708 long stoptime = System.currentTimeMillis(); 709 long stillwaited = stoptime - starttime; 710 timetowait = waiterTimeout - stillwaited; 711 stoplooping = (timetowait <= 0); 712 if (stoplooping) { 713 totalWaiterCount++; 715 totalWaitingTime += stillwaited; 716 if (waitingTime < stillwaited) { 717 waitingTime = stillwaited; 718 } 719 } else { 720 if (!freeList.isEmpty() || xac2item.size() < poolMax) { 721 if (logger.isLoggable(BasicLevel.DEBUG)) { 723 logger.log(BasicLevel.DEBUG, "Notified after " + stillwaited); 724 } 725 totalWaiterCount++; 726 totalWaitingTime += stillwaited; 727 if (waitingTime < stillwaited) { 728 waitingTime = stillwaited; 729 } 730 } 731 continue; 732 } 733 } 734 } 735 if (stoplooping && freeList.isEmpty() && xac2item.size() >= poolMax) { 736 if (starttime > 0) { 737 rejectedTimeout++; 738 logger.log(BasicLevel.WARN, "Cannot create a Connection - timeout"); 739 } else { 740 rejectedFull++; 741 logger.log(BasicLevel.WARN, "Cannot create a Connection"); 742 } 743 throw new SQLException ("No more connections in " + cmgr.getDatasourceName()); 744 } 745 continue; 746 } 747 if (logger.isLoggable(BasicLevel.DEBUG)) { 748 logger.log(BasicLevel.DEBUG, "empty free list: Create a new Connection"); 749 } 750 XAConnection xac = null; 751 try { 752 xac = xads.getXAConnection(); 754 openedCount++; 755 } catch (SQLException e) { 756 connectionFailures++; 757 rejectedOther++; 758 logger.log(BasicLevel.WARN, "Cannot create new Connection for tx"); 759 throw e; 760 } 761 item = new PoolItem(this, xac, user, logger); 762 xac.addConnectionEventListener(cmgr); 764 xac2item.put(xac, item); 765 } else { 766 item = (PoolItem) freeList.removeFirst(); 767 768 if (checkLevel > 0) { 770 try { 771 ConnectionImpl conn = (ConnectionImpl) item.getXACon().getConnection(); 772 if (conn.isPhysicallyClosed()) { 773 logger.log(BasicLevel.WARN, "The JDBC connection has been closed!"); 774 destroyItem(item); 775 starttime = 0; 776 item = null; 777 continue; 778 } 779 if (checkLevel > 1) { 780 java.sql.Statement stmt = conn.createStatement(); 781 stmt.execute(testStatement); 782 stmt.close(); 783 } 784 } catch (Exception e) { 785 logger.log(BasicLevel.ERROR, "DataSource " + cmgr.getDatasourceName() 786 + " error: removing invalid item", e); 787 destroyItem(item); 788 starttime = 0; 789 item = null; 790 continue; 791 } 792 } 793 } 794 } 795 recomputeBusy(); 796 item.setTx(tx); 797 if (tx == null) { 798 if (logger.isLoggable(BasicLevel.DEBUG)) { 799 logger.log(BasicLevel.DEBUG, "Got a Connection - no TX: "); 800 } 801 } else { 802 if (logger.isLoggable(BasicLevel.DEBUG)) { 803 logger.log(BasicLevel.DEBUG, "Got a Connection for TX: "); 804 } 805 try { 807 tx.registerSynchronization(item); 808 tx2item.put(tx, item); } catch (javax.transaction.RollbackException e) { 810 if (logger.isLoggable(BasicLevel.WARN)) { 812 logger.log(BasicLevel.WARN, "DataSource " + cmgr.getDatasourceName() 813 + " error: Pool item registered, but tx is rollback only", e); 814 } 815 } catch (javax.transaction.SystemException e) { 816 if (logger.isLoggable(BasicLevel.ERROR)) { 817 logger.log(BasicLevel.ERROR, "DataSource " + cmgr.getDatasourceName() 818 + " error in pool: system exception from transaction manager ", e); 819 } 820 } catch (IllegalStateException e) { 821 if (logger.isLoggable(BasicLevel.WARN)) { 823 logger.log(BasicLevel.WARN, "Got a Connection - committed TX: ", e); 824 } 825 item.setTx(null); 826 } 827 } 828 item.open(); 829 servedOpen++; 830 return item; 831 } 832 833 838 public synchronized void freeConnections(Transaction tx) { 839 if (logger.isLoggable(BasicLevel.DEBUG)) { 840 logger.log(BasicLevel.DEBUG, "free connection for Tx = " + tx); 841 } 842 PoolItem item = (PoolItem) tx2item.remove(tx); 843 if (item == null) { 844 logger.log(BasicLevel.ERROR, "pool: no connection found to free for Tx = " + tx); 845 return; 846 } 847 item.setTx(null); 848 if (item.isOpen()) { 849 logger.log(BasicLevel.WARN, "WARNING: Connection not closed by caller"); 852 return; 853 } 854 freeItem(item); 855 } 856 857 865 public synchronized PoolItem closeConnection(XAConnection xac, int flag) { 866 PoolItem item = (PoolItem) xac2item.get(xac); 867 if (item == null) { 868 logger.log(BasicLevel.ERROR, "Pool: no item for this xac!"); 869 return null; 870 } 871 if (!item.close()) { 875 return null; 876 } 877 if (item.getTx() != null) { 878 if (logger.isLoggable(BasicLevel.DEBUG)) { 879 logger.log(BasicLevel.DEBUG, "keep the Connection for this tx"); 880 } 881 } else { 882 freeItem(item); 883 } 884 885 Transaction tx = null; 887 try { 888 tx = tm.getTransaction(); 889 } catch (NullPointerException n) { 890 logger.log(BasicLevel.ERROR, "Pool: should not be used outside a JOnAS Server", n); 892 } catch (SystemException e) { 893 logger.log(BasicLevel.ERROR, "Pool: getTransaction failed:", e); 894 } 895 if (tx != null && item.isClosed()) { 896 try { 897 tx.delistResource(xac.getXAResource(), flag); 898 } catch (Exception e) { 899 logger.log(BasicLevel.ERROR, "Pool: Exception while delisting resource:", e); 900 } 901 } 902 903 return item; 904 } 905 906 909 public synchronized void sampling() { 910 waitingHighRecent = waitingTime; 911 if (waitingHigh < waitingTime) { 912 waitingHigh = waitingTime; 913 } 914 waitingTime = 0; 915 916 waitersHighRecent = waiterCount; 917 if (waitersHigh < waiterCount) { 918 waitersHigh = waiterCount; 919 } 920 waiterCount = 0; 921 922 busyMaxRecent = busyMax; 923 busyMax = getCurrentBusy(); 924 busyMinRecent = busyMin; 925 busyMin = getCurrentBusy(); 926 } 927 928 932 public synchronized void adjust() { 933 if (logger.isLoggable(BasicLevel.DEBUG)) { 934 logger.log(BasicLevel.DEBUG, ""); 935 } 936 937 int count = xac2item.size() - poolMin; 941 if (count >= 0) { 944 if (count > MAX_REMOVE_FREELIST) { 945 count = MAX_REMOVE_FREELIST; 946 } 947 for (Iterator i = freeList.iterator(); i.hasNext();) { 948 PoolItem item = (PoolItem) i.next(); 949 if (item.isAged()) { 950 if (logger.isLoggable(BasicLevel.DEBUG)) { 951 logger.log(BasicLevel.DEBUG, "remove a timed out connection"); 952 } 953 i.remove(); 954 destroyItem(item); 955 count--; 956 if (count <= 0) { 957 break; 958 } 959 } 960 } 961 } 962 recomputeBusy(); 963 964 for (Iterator i = xac2item.values().iterator(); i.hasNext();) { 966 PoolItem item = (PoolItem) i.next(); 967 if (item.inactive()) { 968 logger.log(BasicLevel.WARN, "close a timed out open connection"); 969 i.remove(); 970 item.remove(); 972 connectionLeaks++; 973 if (currentWaiters > 0) { 975 notify(); 976 } 977 } 978 } 979 980 if (poolMax != NO_LIMIT) { 983 while (freeList.size() > poolMin && xac2item.size() > poolMax) { 984 PoolItem item = (PoolItem) freeList.removeFirst(); 985 destroyItem(item); 986 } 987 } 988 recomputeBusy(); 989 990 while (xac2item.size() < poolMin) { 992 XAConnection xac = null; 993 try { 994 xac = xads.getXAConnection(); 995 openedCount++; 996 } catch (SQLException e) { 997 throw new ServiceException("Could not create " + poolMin + " items in the pool : ", e); 998 } 999 if (logger.isLoggable(BasicLevel.DEBUG)) { 1001 logger.log(BasicLevel.DEBUG, "Create a new available Connection"); 1002 } 1003 PoolItem item = new PoolItem(this, xac, null, logger); 1004 freeList.addLast(item); 1005 xac2item.put(xac, item); 1006 xac.addConnectionEventListener(cmgr); 1007 } 1008 } 1009 1010 1013 public synchronized void closeAllConnections() { 1014 if (logger.isLoggable(BasicLevel.DEBUG)) { 1015 logger.log(BasicLevel.DEBUG, ""); 1016 } 1017 1018 poolKeeper.stop(); 1020 1021 Iterator it = xac2item.keySet().iterator(); 1023 try { 1024 while (it.hasNext()) { 1025 XAConnection xac = (XAConnection ) it.next(); 1026 xac.close(); 1027 } 1028 } catch (java.sql.SQLException e) { 1029 logger.log(BasicLevel.ERROR, "Error while closing a Connection:", e); 1030 } 1031 } 1032 1033 1037 1041 private void freeItem(PoolItem item) { 1042 freeList.addLast(item); 1046 if (logger.isLoggable(BasicLevel.DEBUG)) { 1047 logger.log(BasicLevel.DEBUG, "item added to freeList: " + freeList.size()); 1048 } 1049 if (currentWaiters > 0) { 1051 notify(); 1052 } 1053 recomputeBusy(); 1054 } 1055 1056 1060 private void destroyItem(PoolItem item) { 1061 if (logger.isLoggable(BasicLevel.DEBUG)) { 1062 logger.log(BasicLevel.DEBUG, "remove an item in xac2item"); 1063 } 1064 xac2item.remove(item.getXACon()); 1065 item.remove(); 1066 if (currentWaiters > 0) { 1068 notify(); 1069 } 1070 recomputeBusy(); 1071 } 1072 1073} 1074 | Popular Tags |