1 24 25 package org.continuent.sequoia.controller.loadbalancer; 26 27 import java.sql.CallableStatement ; 28 import java.sql.Connection ; 29 import java.sql.PreparedStatement ; 30 import java.sql.ResultSet ; 31 import java.sql.SQLException ; 32 import java.sql.SQLWarning ; 33 import java.sql.Statement ; 34 import java.util.ArrayList ; 35 import java.util.Iterator ; 36 import java.util.LinkedList ; 37 import java.util.List ; 38 39 import org.continuent.sequoia.common.exceptions.BadConnectionException; 40 import org.continuent.sequoia.common.exceptions.NoMoreBackendException; 41 import org.continuent.sequoia.common.exceptions.UnreachableBackendException; 42 import org.continuent.sequoia.common.i18n.Translate; 43 import org.continuent.sequoia.common.log.Trace; 44 import org.continuent.sequoia.common.xml.DatabasesXmlTags; 45 import org.continuent.sequoia.common.xml.XmlComponent; 46 import org.continuent.sequoia.controller.backend.DatabaseBackend; 47 import org.continuent.sequoia.controller.backend.DriverCompliance; 48 import org.continuent.sequoia.controller.backend.result.ControllerResultSet; 49 import org.continuent.sequoia.controller.backend.result.ExecuteResult; 50 import org.continuent.sequoia.controller.backend.result.ExecuteUpdateResult; 51 import org.continuent.sequoia.controller.backend.result.GeneratedKeysResult; 52 import org.continuent.sequoia.controller.cache.metadata.MetadataCache; 53 import org.continuent.sequoia.controller.connection.AbstractConnectionManager; 54 import org.continuent.sequoia.controller.connection.PooledConnection; 55 import org.continuent.sequoia.controller.core.ControllerConstants; 56 import org.continuent.sequoia.controller.loadbalancer.policies.WaitForCompletionPolicy; 57 import org.continuent.sequoia.controller.loadbalancer.tasks.AbstractTask; 58 import org.continuent.sequoia.controller.locks.ReadPrioritaryFIFOWriteLock; 59 import org.continuent.sequoia.controller.protocol.PreparedStatementSerialization; 60 import org.continuent.sequoia.controller.recoverylog.RecoveryLog; 61 import org.continuent.sequoia.controller.requestmanager.TransactionMetaData; 62 import org.continuent.sequoia.controller.requests.AbstractRequest; 63 import org.continuent.sequoia.controller.requests.AbstractWriteRequest; 64 import org.continuent.sequoia.controller.requests.SelectRequest; 65 import org.continuent.sequoia.controller.requests.StoredProcedure; 66 import org.continuent.sequoia.controller.sql.macros.MacrosHandler; 67 import org.continuent.sequoia.controller.virtualdatabase.VirtualDatabase; 68 import org.continuent.sequoia.controller.virtualdatabase.protocol.SuspendWritesMessage; 69 70 84 public abstract class AbstractLoadBalancer implements XmlComponent 85 { 86 87 98 protected VirtualDatabase vdb; 100 protected RecoveryLog recoveryLog; 101 protected int raidbLevel; 102 protected int parsingGranularity; 103 104 protected LinkedList totalOrderQueue; 105 106 protected MacrosHandler macroHandler; 107 108 114 protected ArrayList enabledBackends; 115 protected ReadPrioritaryFIFOWriteLock backendListLock = new ReadPrioritaryFIFOWriteLock(); 116 117 118 public WaitForCompletionPolicy waitForCompletionPolicy; 119 120 private static int defaultTransactionIsolationLevel; 121 122 protected static Trace logger = Trace 123 .getLogger("org.continuent.sequoia.controller.loadbalancer"); 124 125 134 protected AbstractLoadBalancer(VirtualDatabase vdb, int raidbLevel, 135 int parsingGranularity) throws SQLException 136 { 137 this.raidbLevel = raidbLevel; 138 this.parsingGranularity = parsingGranularity; 139 this.vdb = vdb; 140 this.totalOrderQueue = vdb.getTotalOrderQueue(); 141 this.enabledBackends = new ArrayList (); 142 try 143 { 144 vdb.acquireReadLockBackendLists(); 145 } 146 catch (InterruptedException e) 147 { 148 String msg = Translate.get( 149 "loadbalancer.backendlist.acquire.readlock.failed", e); 150 logger.error(msg); 151 throw new SQLException (msg); 152 } 153 int size = vdb.getBackends().size(); 154 ArrayList backends = vdb.getBackends(); 155 for (int i = 0; i < size; i++) 156 { 157 DatabaseBackend backend = (DatabaseBackend) backends.get(i); 158 if (backend.isReadEnabled() || backend.isWriteEnabled()) 159 { 160 if (logger.isWarnEnabled()) 161 logger.warn(Translate.get( 162 "loadbalancer.constructor.backends.not.disabled", backend 163 .getName())); 164 try 165 { 166 disableBackend(backend, true); 167 } 168 catch (Exception e) 169 { backend.disable(); 171 } 172 } 173 } 174 vdb.releaseReadLockBackendLists(); 175 } 176 177 181 186 public static final int getDefaultTransactionIsolationLevel() 187 { 188 return defaultTransactionIsolationLevel; 189 } 190 191 197 public final void setDefaultTransactionIsolationLevel( 198 int defaultTransactionIsolationLevel) 199 { 200 AbstractLoadBalancer.defaultTransactionIsolationLevel = defaultTransactionIsolationLevel; 201 } 202 203 210 public void setMacroHandler(MacrosHandler handler) 211 { 212 this.macroHandler = handler; 213 } 214 215 220 public int getParsingGranularity() 221 { 222 return parsingGranularity; 223 } 224 225 230 public int getRAIDbLevel() 231 { 232 return raidbLevel; 233 } 234 235 240 public final RecoveryLog getRecoveryLog() 241 { 242 return recoveryLog; 243 } 244 245 250 public final void setRecoveryLog(RecoveryLog recoveryLog) 251 { 252 this.recoveryLog = recoveryLog; 253 } 254 255 262 public void setWeight(String name, int w) throws SQLException 263 { 264 throw new SQLException ("Weight is not supported by this load balancer"); 265 } 266 267 271 285 protected int acquireLockAndCheckNbOfThreads(Object request, 286 String requestDescription) throws SQLException , NoMoreBackendException 287 { 288 try 289 { 290 backendListLock.acquireRead(); 291 } 292 catch (InterruptedException e) 293 { 294 String msg = Translate.get( 295 "loadbalancer.backendlist.acquire.readlock.failed", e); 296 logger.error(msg); 297 throw new SQLException (msg); 298 } 299 300 int nbOfThreads = enabledBackends.size(); 301 if (nbOfThreads == 0) 302 { 303 releaseLockAndUnlockNextQuery(request); 304 throw new NoMoreBackendException(Translate 305 .get("loadbalancer.backendlist.empty")); 306 } 307 else 308 { 309 if (logger.isDebugEnabled()) 310 logger.debug(Translate.get("loadbalancer.execute.on.several", 311 new String []{requestDescription, String.valueOf(nbOfThreads)})); 312 } 313 return nbOfThreads; 314 } 315 316 323 protected int getNbToWait(int nbOfThreads) 324 { 325 int nbToWait; 326 switch (waitForCompletionPolicy.getPolicy()) 327 { 328 case WaitForCompletionPolicy.FIRST : 329 nbToWait = 1; 330 break; 331 case WaitForCompletionPolicy.MAJORITY : 332 nbToWait = nbOfThreads / 2 + 1; 333 break; 334 case WaitForCompletionPolicy.ALL : 335 nbToWait = nbOfThreads; 336 break; 337 default : 338 logger 339 .warn(Translate.get("loadbalancer.waitforcompletion.unsupported")); 340 nbToWait = nbOfThreads; 341 break; 342 } 343 return nbToWait; 344 } 345 346 354 public void handleMacros(AbstractRequest request) 355 { 356 if (macroHandler == null) 357 return; 358 359 if (!request.needsMacroProcessing()) 361 return; 362 363 macroHandler.processMacros(request); 364 } 365 366 372 protected void releaseLockAndUnlockNextQuery(Object currentQuery) 373 { 374 backendListLock.releaseRead(); 375 376 removeObjectFromAndNotifyTotalOrderQueue(currentQuery); 378 } 379 380 386 public void removeObjectFromAndNotifyTotalOrderQueue(Object request) 387 { 388 if ((totalOrderQueue != null) && (request != null)) 389 { 390 synchronized (totalOrderQueue) 391 { 392 try 393 { 394 if (totalOrderQueue.remove(request)) 395 { 396 if (logger.isDebugEnabled()) 397 logger.debug("Removed " + request + " from total order queue"); 398 totalOrderQueue.notifyAll(); 399 } 400 else if (logger.isDebugEnabled()) 401 { 402 logger.debug(request + " was not in the total order queue"); 403 } 404 } 405 catch (RuntimeException e) 406 { 407 logger.warn("Unable to remove request " + request 408 + " from total order queue", e); 409 } 410 } 411 } 412 } 413 414 424 public static void waitForTaskCompletion(long timeout, 425 String requestDescription, AbstractTask task) throws SQLException 426 { 427 try 429 { 430 if (timeout > 0) 432 { 433 long start = System.currentTimeMillis(); 434 task.wait(timeout); 435 long end = System.currentTimeMillis(); 436 long remaining = timeout - (end - start); 437 if (remaining <= 0) 438 { 439 if (task.setExpiredTimeout()) 440 { String msg = Translate.get("loadbalancer.request.timeout", 442 new String []{requestDescription, 443 String.valueOf(task.getSuccess()), 444 String.valueOf(task.getFailed())}); 445 446 logger.warn(msg); 447 throw new SQLException (msg); 448 } 449 } 451 } 453 else 454 task.wait(); 455 } 456 catch (InterruptedException e) 457 { 458 if (task.setExpiredTimeout()) 459 { String msg = Translate.get("loadbalancer.request.timeout", 461 new String []{requestDescription, String.valueOf(task.getSuccess()), 462 String.valueOf(task.getFailed())}); 463 464 logger.warn(msg); 465 throw new SQLException (msg); 466 } 467 } 469 } 470 471 485 public boolean waitForTotalOrder(Object request, boolean errorIfNotFound) 486 { 487 if (totalOrderQueue != null) 488 { 489 synchronized (totalOrderQueue) 490 { 491 int index = totalOrderQueue.indexOf(request); 492 while (index > 0) 493 { 494 if (logger.isDebugEnabled()) 495 logger.debug("Waiting for " + index 496 + " queries to execute (current is " + totalOrderQueue.get(0) 497 + ")"); 498 499 boolean foundNonSuspendedRequest = false; 501 for (int i = 0; i < index; i++) 502 { 503 if (!vdb.getRequestManager().getScheduler().isSuspendedRequest( 504 totalOrderQueue.get(i))) 505 { 506 foundNonSuspendedRequest = true; 507 break; 508 } 509 } 510 if (!foundNonSuspendedRequest) 511 { 512 index = 0; 513 break; 514 } 515 516 try 517 { 518 totalOrderQueue.wait(); 519 } 520 catch (InterruptedException ignore) 521 { 522 } 523 index = totalOrderQueue.indexOf(request); 524 } 525 if (index == -1) 526 { 527 if (errorIfNotFound) 528 logger 529 .error("Request was not found in total order queue, posting out of order (" 530 + request + ")"); 531 return false; 532 } 533 else 534 return true; 535 } 536 } 537 return false; 538 } 539 540 546 public void waitForSuspendWritesToComplete(AbstractRequest request) 547 { 548 if (totalOrderQueue != null) 549 { 550 synchronized (totalOrderQueue) 551 { 552 boolean hasToWait = true; 553 while (hasToWait) 554 { 555 hasToWait = false; 556 for (Iterator iter = totalOrderQueue.iterator(); iter.hasNext();) 560 { 561 Object elem = iter.next(); 562 if (elem instanceof SuspendWritesMessage) 563 { 564 hasToWait = true; 566 break; 567 } 568 else if (elem instanceof AbstractRequest) 569 { 570 AbstractRequest req = (AbstractRequest) elem; 572 if (req == request) 573 break; 574 } 575 } 576 if (hasToWait) 577 try 578 { 579 totalOrderQueue.wait(); 580 } 581 catch (InterruptedException ignore) 582 { 583 } 584 } 585 } 586 } 587 } 588 589 593 604 public abstract ControllerResultSet statementExecuteQuery( 605 SelectRequest request, MetadataCache metadataCache) throws SQLException , 606 AllBackendsFailedException; 607 608 620 public abstract ExecuteUpdateResult statementExecuteUpdate( 621 AbstractWriteRequest request) throws AllBackendsFailedException, 622 NoMoreBackendException, SQLException ; 623 624 637 public abstract GeneratedKeysResult statementExecuteUpdateWithKeys( 638 AbstractWriteRequest request, MetadataCache metadataCache) 639 throws AllBackendsFailedException, NoMoreBackendException, SQLException ; 640 641 651 public abstract ExecuteResult statementExecute(AbstractRequest request, 652 MetadataCache metadataCache) throws AllBackendsFailedException, 653 SQLException ; 654 655 664 public abstract ControllerResultSet readOnlyCallableStatementExecuteQuery( 665 StoredProcedure proc, MetadataCache metadataCache) throws SQLException ; 666 667 676 public abstract ExecuteResult readOnlyCallableStatementExecute( 677 StoredProcedure proc, MetadataCache metadataCache) throws SQLException ; 678 679 690 public abstract ControllerResultSet callableStatementExecuteQuery( 691 StoredProcedure proc, MetadataCache metadataCache) 692 throws AllBackendsFailedException, SQLException ; 693 694 703 public abstract ExecuteUpdateResult callableStatementExecuteUpdate( 704 StoredProcedure proc) throws AllBackendsFailedException, SQLException ; 705 706 716 public abstract ExecuteResult callableStatementExecute(StoredProcedure proc, 717 MetadataCache metadataCache) throws AllBackendsFailedException, 718 SQLException ; 719 720 728 public abstract ControllerResultSet getPreparedStatementGetMetaData( 729 AbstractRequest request) throws SQLException ; 730 731 735 private static Statement setupStatementOrPreparedStatement( 736 AbstractRequest request, DatabaseBackend backend, 737 BackendWorkerThread workerThread, Connection c, 738 boolean setupResultSetParameters, boolean needGeneratedKeys) 739 throws SQLException 740 { 741 Statement s; if (request.getPreparedStatementParameters() == null) 743 s = c.createStatement(); 744 else 745 { 746 String rewrittenTemplate = backend.rewriteQuery(request 747 .getSqlOrTemplate()); 748 if (needGeneratedKeys) 749 s = c.prepareStatement(rewrittenTemplate, 750 Statement.RETURN_GENERATED_KEYS); 751 else 752 s = c.prepareStatement(rewrittenTemplate); 753 PreparedStatementSerialization.setPreparedStatement(request 754 .getPreparedStatementParameters(), (PreparedStatement ) s); 755 } 756 757 if (workerThread != null) 760 workerThread.setCurrentStatement(s); 761 762 DriverCompliance driverCompliance = backend.getDriverCompliance(); 763 if (driverCompliance.supportSetQueryTimeout()) 764 s.setQueryTimeout(request.getTimeout()); 765 766 if (setupResultSetParameters) 767 { 768 if ((request.getCursorName() != null) 769 && (driverCompliance.supportSetCursorName())) 770 s.setCursorName(request.getCursorName()); 771 if ((request.getFetchSize() != 0) 772 && driverCompliance.supportSetFetchSize()) 773 s.setFetchSize(request.getFetchSize()); 774 if ((request.getMaxRows() > 0) && driverCompliance.supportSetMaxRows()) 775 s.setMaxRows(request.getMaxRows()); 776 } 777 return s; 778 } 779 780 796 public static final ControllerResultSet executeStatementExecuteQueryOnBackend( 797 SelectRequest request, DatabaseBackend backend, 798 BackendWorkerThread workerThread, Connection c, 799 MetadataCache metadataCache) throws SQLException , BadConnectionException, 800 UnreachableBackendException 801 { 802 ControllerResultSet rs = null; 803 ResultSet backendRS = null; 804 try 805 { 806 backend.addPendingReadRequest(request); 807 808 Statement s = setupStatementOrPreparedStatement(request, backend, 809 workerThread, c, true, false); 810 811 if (request.getPreparedStatementParameters() == null) 813 backendRS = s.executeQuery(backend.rewriteQuery(request 814 .getSqlOrTemplate())); 815 else 816 backendRS = ((PreparedStatement ) s).executeQuery(); 817 818 SQLWarning stWarns = null; 819 if (request.getRetrieveSQLWarnings()) 820 { 821 stWarns = s.getWarnings(); 822 } 823 rs = new ControllerResultSet(request, backendRS, metadataCache, s, false); 824 rs.setStatementWarnings(stWarns); 825 } 826 catch (SQLException e) 827 { 834 if (backend.isValidConnection(c)) 835 throw e; else if (request.isPersistentConnection()) 837 throw new UnreachableBackendException("Bad persistent connection", e); 838 else 839 throw new BadConnectionException(e); 840 } 841 finally 842 { 843 if (backendRS != null && request.getFetchSize() == 0) 845 try 846 { 847 backendRS.close(); 848 } 849 catch (SQLException ignore) 850 { 851 } 852 backend.removePendingRequest(request); 853 } 854 return rs; 855 } 856 857 871 public static final ExecuteUpdateResult executeStatementExecuteUpdateOnBackend( 872 AbstractWriteRequest request, DatabaseBackend backend, 873 BackendWorkerThread workerThread, Connection c) throws SQLException , 874 BadConnectionException 875 { 876 Statement s = null; 877 try 878 { 879 backend.addPendingWriteRequest(request); 880 881 s = setupStatementOrPreparedStatement(request, backend, workerThread, c, 882 false, false); 883 884 ExecuteUpdateResult eur; 886 if (request.getPreparedStatementParameters() == null) 887 eur = new ExecuteUpdateResult(s.executeUpdate(backend 888 .rewriteQuery(request.getSqlOrTemplate()))); 889 else 890 eur = new ExecuteUpdateResult(((PreparedStatement ) s).executeUpdate()); 891 if (request.getRetrieveSQLWarnings()) 893 eur.setStatementWarnings(s.getWarnings()); 894 895 if (request.requiresConnectionPoolFlush()) 896 backend.flagAllConnectionsForRenewal(); 897 898 return eur; 899 } 900 catch (SQLException e) 901 { if (backend.isValidConnection(c)) 903 throw e; else 905 throw new BadConnectionException(e); 906 } 907 finally 908 { 909 backend.removePendingRequest(request); 910 try 911 { 912 if (s != null) 913 s.close(); 914 } 915 catch (SQLException ignore) 916 { 917 } 918 } 919 } 920 921 936 public static final GeneratedKeysResult executeStatementExecuteUpdateWithKeysOnBackend( 937 AbstractWriteRequest request, DatabaseBackend backend, 938 BackendWorkerThread workerThread, Connection c, 939 MetadataCache metadataCache) throws SQLException , BadConnectionException 940 { 941 if (!backend.getDriverCompliance().supportGetGeneratedKeys()) 942 throw new SQLException ("Backend " + backend.getName() 943 + " does not support RETURN_GENERATED_KEYS"); 944 945 Statement s = null; 946 try 947 { 948 backend.addPendingWriteRequest(request); 949 950 s = setupStatementOrPreparedStatement(request, backend, workerThread, c, 951 false, true); 952 953 int updateCount; 955 if (request.getPreparedStatementParameters() == null) 956 updateCount = s.executeUpdate(backend.rewriteQuery(request 957 .getSqlOrTemplate()), Statement.RETURN_GENERATED_KEYS); 958 else 959 updateCount = ((PreparedStatement ) s).executeUpdate(); 960 SQLWarning stWarns = null; 962 if (request.getRetrieveSQLWarnings()) 963 { 964 stWarns = s.getWarnings(); 965 } 966 ControllerResultSet rs = new ControllerResultSet(request, s 967 .getGeneratedKeys(), metadataCache, s, false); 968 GeneratedKeysResult gkr = new GeneratedKeysResult(rs, updateCount); 969 gkr.setStatementWarnings(stWarns); 970 971 if (request.requiresConnectionPoolFlush()) 972 backend.flagAllConnectionsForRenewal(); 973 974 return gkr; 975 } 976 catch (SQLException e) 977 { if (backend.isValidConnection(c)) 979 throw e; else 981 throw new BadConnectionException(e); 982 } 983 finally 984 { 985 backend.removePendingRequest(request); 986 try 987 { 988 if (s != null) 989 s.close(); 990 } 991 catch (SQLException ignore) 992 { 993 } 994 } 995 } 996 997 1011 public static final ExecuteResult executeStatementExecuteOnBackend( 1012 AbstractRequest request, DatabaseBackend backend, 1013 BackendWorkerThread workerThread, Connection c, 1014 MetadataCache metadataCache) throws SQLException , BadConnectionException 1015 { 1016 Statement s = null; 1017 try 1018 { 1019 backend.addPendingWriteRequest(request); 1020 1021 request.setFetchSize(0); 1023 1024 s = setupStatementOrPreparedStatement(request, backend, workerThread, c, 1025 true, false); 1026 1027 boolean hasResult; 1029 if (request.getPreparedStatementParameters() == null) 1030 hasResult = s.execute(backend.rewriteQuery(request.getSqlOrTemplate())); 1031 else 1032 hasResult = ((PreparedStatement ) s).execute(); 1033 1034 int updatedRows = 0; 1035 ExecuteResult result = new ExecuteResult(); 1037 if (request.getRetrieveSQLWarnings()) 1039 result.setStatementWarnings(s.getWarnings()); 1040 do 1041 { 1042 if (hasResult) 1043 { 1044 ControllerResultSet crs = new ControllerResultSet(request, s 1045 .getResultSet(), metadataCache, null, true); 1046 result.addResult(crs); 1047 } 1048 else 1049 { 1050 updatedRows = s.getUpdateCount(); 1051 result.addResult(updatedRows); 1052 } 1053 hasResult = s.getMoreResults(); 1054 } 1055 while (hasResult || (updatedRows != -1)); 1056 1057 if (request.requiresConnectionPoolFlush()) 1058 backend.flagAllConnectionsForRenewal(); 1059 1060 return result; 1061 } 1062 catch (SQLException e) 1063 { if (backend.isValidConnection(c)) 1065 throw e; else 1067 throw new BadConnectionException(e); 1068 } 1069 finally 1070 { 1071 backend.removePendingRequest(request); 1072 try 1073 { 1074 if (s != null) 1075 s.close(); 1076 } 1077 catch (SQLException ignore) 1078 { 1079 } 1080 } 1081 } 1082 1083 1095 private static void fetchOutAndNamedParameters(CallableStatement cs, 1096 StoredProcedure proc) throws SQLException 1097 { 1098 List outParamIndexes = proc.getOutParameterIndexes(); 1100 if (outParamIndexes != null) 1101 { 1102 for (Iterator iter = outParamIndexes.iterator(); iter.hasNext();) 1103 { 1104 Object index = iter.next(); 1105 if (index instanceof Integer ) 1106 proc.setOutParameterValue(index, cs.getObject(((Integer ) index) 1107 .intValue())); 1108 else 1109 proc.setOutParameterValue(index, cs.getObject((String ) index)); 1111 } 1112 } 1113 1114 List namedParamNames = proc.getNamedParameterNames(); 1116 if (namedParamNames != null) 1117 { 1118 for (Iterator iter = namedParamNames.iterator(); iter.hasNext();) 1119 { 1120 String paramName = (String ) iter.next(); 1122 proc.setNamedParameterValue(paramName, cs.getObject(paramName)); 1123 } 1124 } 1125 } 1126 1127 1131 private static CallableStatement setupCallableStatement(StoredProcedure proc, 1132 DatabaseBackend backend, BackendWorkerThread workerThread, Connection c, 1133 boolean setupResultSetParameters) throws SQLException 1134 { 1135 CallableStatement cs; cs = c.prepareCall(backend.rewriteQuery(proc.getSqlOrTemplate())); 1137 if (proc.getPreparedStatementParameters() != null) 1138 PreparedStatementSerialization.setCallableStatement(backend 1139 .rewriteQuery(proc.getPreparedStatementParameters()), cs, proc); 1140 1141 if (workerThread != null) 1144 workerThread.setCurrentStatement(cs); 1145 1146 DriverCompliance driverCompliance = backend.getDriverCompliance(); 1147 if (driverCompliance.supportSetQueryTimeout()) 1148 cs.setQueryTimeout(proc.getTimeout()); 1149 1150 if (setupResultSetParameters) 1151 { 1152 if ((proc.getCursorName() != null) 1153 && (driverCompliance.supportSetCursorName())) 1154 cs.setCursorName(proc.getCursorName()); 1155 if ((proc.getFetchSize() != 0) && driverCompliance.supportSetFetchSize()) 1156 cs.setFetchSize(proc.getFetchSize()); 1157 if ((proc.getMaxRows() > 0) && driverCompliance.supportSetMaxRows()) 1158 cs.setMaxRows(proc.getMaxRows()); 1159 } 1160 return cs; 1161 } 1162 1163 1177 public static final ControllerResultSet executeCallableStatementExecuteQueryOnBackend( 1178 StoredProcedure proc, DatabaseBackend backend, 1179 BackendWorkerThread workerThread, Connection c, 1180 MetadataCache metadataCache) throws SQLException , BadConnectionException 1181 { 1182 CallableStatement cs = null; 1183 ResultSet backendRS = null; 1184 try 1185 { 1186 backend.addPendingReadRequest(proc); 1187 1188 cs = setupCallableStatement(proc, backend, workerThread, c, true); 1189 1190 backendRS = cs.executeQuery(); 1192 1193 SQLWarning stWarns = null; 1194 if (proc.getRetrieveSQLWarnings()) 1195 { 1196 stWarns = cs.getWarnings(); 1197 } 1198 ControllerResultSet rs = new ControllerResultSet(proc, backendRS, 1199 metadataCache, cs, false); 1200 rs.setStatementWarnings(stWarns); 1201 fetchOutAndNamedParameters(cs, proc); 1202 1203 if (proc.requiresConnectionPoolFlush()) 1204 backend.flagAllConnectionsForRenewal(); 1205 1206 return rs; 1207 } 1208 catch (SQLException e) 1209 { if (backend.isValidConnection(c)) 1211 throw e; else 1213 throw new BadConnectionException(e); 1214 } 1215 finally 1216 { 1217 if (backendRS != null && proc.getFetchSize() == 0) 1219 { 1220 try 1221 { 1222 backendRS.close(); 1223 } 1224 catch (SQLException ignore) 1225 { 1226 } 1227 } 1228 backend.removePendingRequest(proc); 1229 } 1230 } 1231 1232 1245 public static final ExecuteUpdateResult executeCallableStatementExecuteUpdateOnBackend( 1246 StoredProcedure proc, DatabaseBackend backend, 1247 BackendWorkerThread workerThread, Connection c) throws SQLException , 1248 BadConnectionException 1249 { 1250 CallableStatement cs = null; 1251 try 1252 { 1253 backend.addPendingWriteRequest(proc); 1254 1255 cs = setupCallableStatement(proc, backend, workerThread, c, false); 1256 1257 ExecuteUpdateResult eur = new ExecuteUpdateResult(cs.executeUpdate()); 1259 if (proc.getRetrieveSQLWarnings()) 1261 eur.setStatementWarnings(cs.getWarnings()); 1262 1263 fetchOutAndNamedParameters(cs, proc); 1264 1265 if (proc.requiresConnectionPoolFlush()) 1266 backend.flagAllConnectionsForRenewal(); 1267 1268 return eur; 1269 } 1270 catch (SQLException e) 1271 { if (backend.isValidConnection(c)) 1273 throw e; else 1275 throw new BadConnectionException(e); 1276 } 1277 finally 1278 { 1279 backend.removePendingRequest(proc); 1280 try 1281 { 1282 if (cs != null) 1283 cs.close(); 1284 } 1285 catch (SQLException ignore) 1286 { 1287 } 1288 } 1289 } 1290 1291 1306 public static final ExecuteResult executeCallableStatementExecuteOnBackend( 1307 StoredProcedure proc, DatabaseBackend backend, 1308 BackendWorkerThread workerThread, Connection c, 1309 MetadataCache metadataCache) throws SQLException , BadConnectionException 1310 { 1311 CallableStatement cs = null; 1312 try 1313 { 1314 backend.addPendingWriteRequest(proc); 1315 1316 proc.setFetchSize(0); 1318 1319 cs = setupCallableStatement(proc, backend, workerThread, c, true); 1320 1321 boolean hasResult = cs.execute(); 1323 int updatedRows = 0; 1324 ExecuteResult result = new ExecuteResult(); 1326 if (proc.getRetrieveSQLWarnings()) 1328 result.setStatementWarnings(cs.getWarnings()); 1329 do 1330 { 1331 if (hasResult) 1332 { 1333 ControllerResultSet crs = new ControllerResultSet(proc, cs 1334 .getResultSet(), metadataCache, null, true); 1335 result.addResult(crs); 1336 } 1337 else 1338 { 1339 updatedRows = cs.getUpdateCount(); 1340 result.addResult(updatedRows); 1341 } 1342 if (updatedRows != -1) 1343 hasResult = cs.getMoreResults(); 1344 } 1345 while (hasResult || (updatedRows != -1)); 1346 1347 fetchOutAndNamedParameters(cs, proc); 1348 1349 if (proc.requiresConnectionPoolFlush()) 1350 backend.flagAllConnectionsForRenewal(); 1351 1352 return result; 1353 } 1354 catch (SQLException e) 1355 { if (backend.isValidConnection(c)) 1357 throw e; else 1359 throw new BadConnectionException(e); 1360 } 1361 finally 1362 { 1363 backend.removePendingRequest(proc); 1364 try 1365 { 1366 if (cs != null) 1367 cs.close(); 1368 } 1369 catch (SQLException ignore) 1370 { 1371 } 1372 } 1373 } 1374 1375 1385 public static final ControllerResultSet preparedStatementGetMetaDataOnBackend( 1386 String sqlTemplate, DatabaseBackend backend, Connection c) 1387 throws SQLException , BadConnectionException 1388 { 1389 try 1390 { 1391 PreparedStatement ps = c.prepareStatement(sqlTemplate); 1392 return new ControllerResultSet(ControllerConstants.CONTROLLER_FACTORY 1393 .getResultSetMetaDataFactory().copyResultSetMetaData( 1394 ps.getMetaData(), null), new ArrayList ()); 1395 } 1396 catch (SQLException e) 1397 { if (backend.isValidConnection(c)) 1399 throw e; else 1401 throw new BadConnectionException(e); 1402 } 1403 } 1404 1405 1409 1415 public abstract void abort(TransactionMetaData tm) throws SQLException ; 1416 1417 1423 public abstract void begin(TransactionMetaData tm) throws SQLException ; 1424 1425 1433 public abstract void commit(TransactionMetaData tm) 1434 throws AllBackendsFailedException, SQLException ; 1435 1436 1444 public abstract void rollback(TransactionMetaData tm) 1445 throws AllBackendsFailedException, SQLException ; 1446 1447 1456 public abstract void rollbackToSavepoint(TransactionMetaData tm, 1457 String savepointName) throws AllBackendsFailedException, SQLException ; 1458 1459 1468 public abstract void setSavepoint(TransactionMetaData tm, String name) 1469 throws AllBackendsFailedException, SQLException ; 1470 1471 1480 public abstract void releaseSavepoint(TransactionMetaData tm, String name) 1481 throws AllBackendsFailedException, SQLException ; 1482 1483 1490 public abstract void closePersistentConnection(String login, 1491 long persistentConnectionId) throws SQLException ; 1492 1493 1500 public abstract void openPersistentConnection(String login, 1501 long persistentConnectionId) throws SQLException ; 1502 1503 1520 public static final Connection getConnectionAndBeginTransaction( 1521 DatabaseBackend backend, AbstractConnectionManager cm, 1522 AbstractRequest request) throws SQLException , UnreachableBackendException 1523 { 1524 PooledConnection pc = null; 1525 boolean isConnectionValid = false; 1526 Connection c; 1527 1528 do 1529 { 1530 if (request.isPersistentConnection()) 1531 { pc = cm.retrieveConnectionInAutoCommit(request); 1534 cm.registerConnectionForTransaction(pc, request.getTransactionId()); 1535 } 1536 else 1537 { pc = cm.getConnectionForTransaction(request.getTransactionId()); 1539 } 1540 1541 if (pc == null) 1543 { 1544 throw new UnreachableBackendException(Translate.get( 1545 "loadbalancer.unable.get.connection", new String []{ 1546 String.valueOf(request.getTransactionId()), backend.getName()})); 1547 } 1548 c = pc.getConnection(); 1549 try 1550 { 1551 if (request.getTransactionIsolation() != org.continuent.sequoia.driver.Connection.DEFAULT_TRANSACTION_ISOLATION_LEVEL) 1552 { 1553 1557 pc.setTransactionIsolation(request.getTransactionIsolation()); 1558 } 1559 else if (defaultTransactionIsolationLevel != org.continuent.sequoia.driver.Connection.DEFAULT_TRANSACTION_ISOLATION_LEVEL) 1560 { 1561 1566 pc.setTransactionIsolation(defaultTransactionIsolationLevel); 1567 } 1568 1569 c.setAutoCommit(false); 1570 isConnectionValid = true; 1571 } 1572 catch (SQLException e) 1573 { 1574 if (backend.isValidConnection(c)) 1575 throw e; else 1577 { 1578 cm.deleteConnection(request.getTransactionId()); 1579 if (request.isPersistentConnection()) 1580 { 1581 cm.deletePersistentConnection(request.getPersistentConnectionId()); 1582 } 1583 } 1584 } 1585 } 1586 while (!isConnectionValid); 1587 if (pc == null) 1588 { 1589 if (logger.isErrorEnabled()) 1590 { 1591 logger.error("Got a null connection [backend=" + backend.getName() 1592 + ", tid=" + request.getTransactionId() + "]"); 1593 } 1594 } 1595 return c; 1596 } 1597 1598 1602 1611 public abstract void enableBackend(DatabaseBackend db, boolean writeEnabled) 1612 throws SQLException ; 1613 1614 1624 public abstract void disableBackend(DatabaseBackend db, boolean forceDisable) 1625 throws SQLException ; 1626 1627 1633 public int getNumberOfEnabledBackends() 1634 { 1635 return enabledBackends.size(); 1636 } 1637 1638 1643 public abstract String getInformation(); 1644 1645 1650 public abstract String getXmlImpl(); 1651 1652 1655 public String getXml() 1656 { 1657 StringBuffer info = new StringBuffer (); 1658 info.append("<" + DatabasesXmlTags.ELT_LoadBalancer + " " 1659 + DatabasesXmlTags.ATT_transactionIsolation + "=\""); 1660 switch (defaultTransactionIsolationLevel) 1661 { 1662 case Connection.TRANSACTION_READ_UNCOMMITTED : 1663 info.append(DatabasesXmlTags.VAL_readUncommitted); 1664 break; 1665 case Connection.TRANSACTION_READ_COMMITTED : 1666 info.append(DatabasesXmlTags.VAL_readCommitted); 1667 break; 1668 case Connection.TRANSACTION_REPEATABLE_READ : 1669 info.append(DatabasesXmlTags.VAL_repeatableRead); 1670 break; 1671 case Connection.TRANSACTION_SERIALIZABLE : 1672 info.append(DatabasesXmlTags.VAL_serializable); 1673 break; 1674 default : 1675 info.append(DatabasesXmlTags.VAL_databaseDefault); 1676 break; 1677 } 1678 info.append("\">"); 1679 info.append(getXmlImpl()); 1680 info.append("</" + DatabasesXmlTags.ELT_LoadBalancer + ">"); 1681 return info.toString(); 1682 } 1683} 1684 | Popular Tags |