|                                                                                                              1
 24
 25  package org.continuent.sequoia.controller.loadbalancer;
 26
 27  import java.sql.CallableStatement
  ; 28  import java.sql.Connection
  ; 29  import java.sql.PreparedStatement
  ; 30  import java.sql.ResultSet
  ; 31  import java.sql.SQLException
  ; 32  import java.sql.SQLWarning
  ; 33  import java.sql.Statement
  ; 34  import java.util.ArrayList
  ; 35  import java.util.Iterator
  ; 36  import java.util.LinkedList
  ; 37  import java.util.List
  ; 38
 39  import org.continuent.sequoia.common.exceptions.BadConnectionException;
 40  import org.continuent.sequoia.common.exceptions.NoMoreBackendException;
 41  import org.continuent.sequoia.common.exceptions.UnreachableBackendException;
 42  import org.continuent.sequoia.common.i18n.Translate;
 43  import org.continuent.sequoia.common.log.Trace;
 44  import org.continuent.sequoia.common.xml.DatabasesXmlTags;
 45  import org.continuent.sequoia.common.xml.XmlComponent;
 46  import org.continuent.sequoia.controller.backend.DatabaseBackend;
 47  import org.continuent.sequoia.controller.backend.DriverCompliance;
 48  import org.continuent.sequoia.controller.backend.result.ControllerResultSet;
 49  import org.continuent.sequoia.controller.backend.result.ExecuteResult;
 50  import org.continuent.sequoia.controller.backend.result.ExecuteUpdateResult;
 51  import org.continuent.sequoia.controller.backend.result.GeneratedKeysResult;
 52  import org.continuent.sequoia.controller.cache.metadata.MetadataCache;
 53  import org.continuent.sequoia.controller.connection.AbstractConnectionManager;
 54  import org.continuent.sequoia.controller.connection.PooledConnection;
 55  import org.continuent.sequoia.controller.core.ControllerConstants;
 56  import org.continuent.sequoia.controller.loadbalancer.policies.WaitForCompletionPolicy;
 57  import org.continuent.sequoia.controller.loadbalancer.tasks.AbstractTask;
 58  import org.continuent.sequoia.controller.locks.ReadPrioritaryFIFOWriteLock;
 59  import org.continuent.sequoia.controller.protocol.PreparedStatementSerialization;
 60  import org.continuent.sequoia.controller.recoverylog.RecoveryLog;
 61  import org.continuent.sequoia.controller.requestmanager.TransactionMetaData;
 62  import org.continuent.sequoia.controller.requests.AbstractRequest;
 63  import org.continuent.sequoia.controller.requests.AbstractWriteRequest;
 64  import org.continuent.sequoia.controller.requests.SelectRequest;
 65  import org.continuent.sequoia.controller.requests.StoredProcedure;
 66  import org.continuent.sequoia.controller.sql.macros.MacrosHandler;
 67  import org.continuent.sequoia.controller.virtualdatabase.VirtualDatabase;
 68  import org.continuent.sequoia.controller.virtualdatabase.protocol.SuspendWritesMessage;
 69
 70
 84  public abstract class AbstractLoadBalancer implements XmlComponent
 85  {
 86
 87
 98      protected VirtualDatabase             vdb;
 100   protected RecoveryLog                 recoveryLog;
 101   protected int                         raidbLevel;
 102   protected int                         parsingGranularity;
 103
 104   protected LinkedList
  totalOrderQueue; 105
 106   protected MacrosHandler               macroHandler;
 107
 108
 114   protected ArrayList
  enabledBackends; 115   protected ReadPrioritaryFIFOWriteLock backendListLock = new ReadPrioritaryFIFOWriteLock();
 116
 117
 118   public WaitForCompletionPolicy        waitForCompletionPolicy;
 119
 120   private static int                    defaultTransactionIsolationLevel;
 121
 122   protected static Trace                logger          = Trace
 123                                                             .getLogger("org.continuent.sequoia.controller.loadbalancer");
 124
 125
 134   protected AbstractLoadBalancer(VirtualDatabase vdb, int raidbLevel,
 135       int parsingGranularity) throws SQLException
  136   {
 137     this.raidbLevel = raidbLevel;
 138     this.parsingGranularity = parsingGranularity;
 139     this.vdb = vdb;
 140     this.totalOrderQueue = vdb.getTotalOrderQueue();
 141     this.enabledBackends = new ArrayList
  (); 142     try
 143     {
 144       vdb.acquireReadLockBackendLists();
 145     }
 146     catch (InterruptedException
  e) 147     {
 148       String
  msg = Translate.get( 149           "loadbalancer.backendlist.acquire.readlock.failed", e);
 150       logger.error(msg);
 151       throw new SQLException
  (msg); 152     }
 153     int size = vdb.getBackends().size();
 154     ArrayList
  backends = vdb.getBackends(); 155     for (int i = 0; i < size; i++)
 156     {
 157       DatabaseBackend backend = (DatabaseBackend) backends.get(i);
 158       if (backend.isReadEnabled() || backend.isWriteEnabled())
 159       {
 160         if (logger.isWarnEnabled())
 161           logger.warn(Translate.get(
 162               "loadbalancer.constructor.backends.not.disabled", backend
 163                   .getName()));
 164         try
 165         {
 166           disableBackend(backend, true);
 167         }
 168         catch (Exception
  e) 169         {           backend.disable();
 171         }
 172       }
 173     }
 174     vdb.releaseReadLockBackendLists();
 175   }
 176
 177
 181
 186   public static final int getDefaultTransactionIsolationLevel()
 187   {
 188     return defaultTransactionIsolationLevel;
 189   }
 190
 191
 197   public final void setDefaultTransactionIsolationLevel(
 198       int defaultTransactionIsolationLevel)
 199   {
 200     AbstractLoadBalancer.defaultTransactionIsolationLevel = defaultTransactionIsolationLevel;
 201   }
 202
 203
 210   public void setMacroHandler(MacrosHandler handler)
 211   {
 212     this.macroHandler = handler;
 213   }
 214
 215
 220   public int getParsingGranularity()
 221   {
 222     return parsingGranularity;
 223   }
 224
 225
 230   public int getRAIDbLevel()
 231   {
 232     return raidbLevel;
 233   }
 234
 235
 240   public final RecoveryLog getRecoveryLog()
 241   {
 242     return recoveryLog;
 243   }
 244
 245
 250   public final void setRecoveryLog(RecoveryLog recoveryLog)
 251   {
 252     this.recoveryLog = recoveryLog;
 253   }
 254
 255
 262   public void setWeight(String
  name, int w) throws SQLException  263   {
 264     throw new SQLException
  ("Weight is not supported by this load balancer"); 265   }
 266
 267
 271
 285   protected int acquireLockAndCheckNbOfThreads(Object
  request, 286       String
  requestDescription) throws SQLException  , NoMoreBackendException 287   {
 288     try
 289     {
 290       backendListLock.acquireRead();
 291     }
 292     catch (InterruptedException
  e) 293     {
 294       String
  msg = Translate.get( 295           "loadbalancer.backendlist.acquire.readlock.failed", e);
 296       logger.error(msg);
 297       throw new SQLException
  (msg); 298     }
 299
 300     int nbOfThreads = enabledBackends.size();
 301     if (nbOfThreads == 0)
 302     {
 303       releaseLockAndUnlockNextQuery(request);
 304       throw new NoMoreBackendException(Translate
 305           .get("loadbalancer.backendlist.empty"));
 306     }
 307     else
 308     {
 309       if (logger.isDebugEnabled())
 310         logger.debug(Translate.get("loadbalancer.execute.on.several",
 311             new String
  []{requestDescription, String.valueOf(nbOfThreads)})); 312     }
 313     return nbOfThreads;
 314   }
 315
 316
 323   protected int getNbToWait(int nbOfThreads)
 324   {
 325     int nbToWait;
 326     switch (waitForCompletionPolicy.getPolicy())
 327     {
 328       case WaitForCompletionPolicy.FIRST :
 329         nbToWait = 1;
 330         break;
 331       case WaitForCompletionPolicy.MAJORITY :
 332         nbToWait = nbOfThreads / 2 + 1;
 333         break;
 334       case WaitForCompletionPolicy.ALL :
 335         nbToWait = nbOfThreads;
 336         break;
 337       default :
 338         logger
 339             .warn(Translate.get("loadbalancer.waitforcompletion.unsupported"));
 340         nbToWait = nbOfThreads;
 341         break;
 342     }
 343     return nbToWait;
 344   }
 345
 346
 354   public void handleMacros(AbstractRequest request)
 355   {
 356     if (macroHandler == null)
 357       return;
 358
 359         if (!request.needsMacroProcessing())
 361       return;
 362
 363     macroHandler.processMacros(request);
 364   }
 365
 366
 372   protected void releaseLockAndUnlockNextQuery(Object
  currentQuery) 373   {
 374     backendListLock.releaseRead();
 375
 376         removeObjectFromAndNotifyTotalOrderQueue(currentQuery);
 378   }
 379
 380
 386   public void removeObjectFromAndNotifyTotalOrderQueue(Object
  request) 387   {
 388     if ((totalOrderQueue != null) && (request != null))
 389     {
 390       synchronized (totalOrderQueue)
 391       {
 392         try
 393         {
 394           if (totalOrderQueue.remove(request))
 395           {
 396             if (logger.isDebugEnabled())
 397               logger.debug("Removed " + request + " from total order queue");
 398             totalOrderQueue.notifyAll();
 399           }
 400           else if (logger.isDebugEnabled())
 401           {
 402             logger.debug(request + " was not in the total order queue");
 403           }
 404         }
 405         catch (RuntimeException
  e) 406         {
 407           logger.warn("Unable to remove request " + request
 408               + " from total order queue", e);
 409         }
 410       }
 411     }
 412   }
 413
 414
 424   public static void waitForTaskCompletion(long timeout,
 425       String
  requestDescription, AbstractTask task) throws SQLException  426   {
 427         try
 429     {
 430             if (timeout > 0)
 432       {
 433         long start = System.currentTimeMillis();
 434         task.wait(timeout);
 435         long end = System.currentTimeMillis();
 436         long remaining = timeout - (end - start);
 437         if (remaining <= 0)
 438         {
 439           if (task.setExpiredTimeout())
 440           {             String
  msg = Translate.get("loadbalancer.request.timeout", 442                 new String
  []{requestDescription, 443                     String.valueOf(task.getSuccess()),
 444                     String.valueOf(task.getFailed())});
 445
 446             logger.warn(msg);
 447             throw new SQLException
  (msg); 448           }
 449                   }
 451               }
 453       else
 454         task.wait();
 455     }
 456     catch (InterruptedException
  e) 457     {
 458       if (task.setExpiredTimeout())
 459       {         String
  msg = Translate.get("loadbalancer.request.timeout", 461             new String
  []{requestDescription, String.valueOf(task.getSuccess()), 462                 String.valueOf(task.getFailed())});
 463
 464         logger.warn(msg);
 465         throw new SQLException
  (msg); 466       }
 467           }
 469   }
 470
 471
 485   public boolean waitForTotalOrder(Object
  request, boolean errorIfNotFound) 486   {
 487     if (totalOrderQueue != null)
 488     {
 489       synchronized (totalOrderQueue)
 490       {
 491         int index = totalOrderQueue.indexOf(request);
 492         while (index > 0)
 493         {
 494           if (logger.isDebugEnabled())
 495             logger.debug("Waiting for " + index
 496                 + " queries to execute (current is " + totalOrderQueue.get(0)
 497                 + ")");
 498
 499                     boolean foundNonSuspendedRequest = false;
 501           for (int i = 0; i < index; i++)
 502           {
 503             if (!vdb.getRequestManager().getScheduler().isSuspendedRequest(
 504                 totalOrderQueue.get(i)))
 505             {
 506               foundNonSuspendedRequest = true;
 507               break;
 508             }
 509           }
 510           if (!foundNonSuspendedRequest)
 511           {
 512             index = 0;
 513             break;
 514           }
 515
 516           try
 517           {
 518             totalOrderQueue.wait();
 519           }
 520           catch (InterruptedException
  ignore) 521           {
 522           }
 523           index = totalOrderQueue.indexOf(request);
 524         }
 525         if (index == -1)
 526         {
 527           if (errorIfNotFound)
 528             logger
 529                 .error("Request was not found in total order queue, posting out of order ("
 530                     + request + ")");
 531           return false;
 532         }
 533         else
 534           return true;
 535       }
 536     }
 537     return false;
 538   }
 539
 540
 546   public void waitForSuspendWritesToComplete(AbstractRequest request)
 547   {
 548     if (totalOrderQueue != null)
 549     {
 550       synchronized (totalOrderQueue)
 551       {
 552         boolean hasToWait = true;
 553         while (hasToWait)
 554         {
 555           hasToWait = false;
 556                                         for (Iterator
  iter = totalOrderQueue.iterator(); iter.hasNext();) 560           {
 561             Object
  elem = iter.next(); 562             if (elem instanceof SuspendWritesMessage)
 563             {
 564                             hasToWait = true;
 566               break;
 567             }
 568             else if (elem instanceof AbstractRequest)
 569             {
 570                             AbstractRequest req = (AbstractRequest) elem;
 572               if (req == request)
 573                 break;
 574             }
 575           }
 576           if (hasToWait)
 577             try
 578             {
 579               totalOrderQueue.wait();
 580             }
 581             catch (InterruptedException
  ignore) 582             {
 583             }
 584         }
 585       }
 586     }
 587   }
 588
 589
 593
 604   public abstract ControllerResultSet statementExecuteQuery(
 605       SelectRequest request, MetadataCache metadataCache) throws SQLException
  , 606       AllBackendsFailedException;
 607
 608
 620   public abstract ExecuteUpdateResult statementExecuteUpdate(
 621       AbstractWriteRequest request) throws AllBackendsFailedException,
 622       NoMoreBackendException, SQLException
  ; 623
 624
 637   public abstract GeneratedKeysResult statementExecuteUpdateWithKeys(
 638       AbstractWriteRequest request, MetadataCache metadataCache)
 639       throws AllBackendsFailedException, NoMoreBackendException, SQLException
  ; 640
 641
 651   public abstract ExecuteResult statementExecute(AbstractRequest request,
 652       MetadataCache metadataCache) throws AllBackendsFailedException,
 653       SQLException
  ; 654
 655
 664   public abstract ControllerResultSet readOnlyCallableStatementExecuteQuery(
 665       StoredProcedure proc, MetadataCache metadataCache) throws SQLException
  ; 666
 667
 676   public abstract ExecuteResult readOnlyCallableStatementExecute(
 677       StoredProcedure proc, MetadataCache metadataCache) throws SQLException
  ; 678
 679
 690   public abstract ControllerResultSet callableStatementExecuteQuery(
 691       StoredProcedure proc, MetadataCache metadataCache)
 692       throws AllBackendsFailedException, SQLException
  ; 693
 694
 703   public abstract ExecuteUpdateResult callableStatementExecuteUpdate(
 704       StoredProcedure proc) throws AllBackendsFailedException, SQLException
  ; 705
 706
 716   public abstract ExecuteResult callableStatementExecute(StoredProcedure proc,
 717       MetadataCache metadataCache) throws AllBackendsFailedException,
 718       SQLException
  ; 719
 720
 728   public abstract ControllerResultSet getPreparedStatementGetMetaData(
 729       AbstractRequest request) throws SQLException
  ; 730
 731
 735   private static Statement
  setupStatementOrPreparedStatement( 736       AbstractRequest request, DatabaseBackend backend,
 737       BackendWorkerThread workerThread, Connection
  c, 738       boolean setupResultSetParameters, boolean needGeneratedKeys)
 739       throws SQLException
  740   {
 741     Statement
  s;     if (request.getPreparedStatementParameters() == null) 743       s = c.createStatement();
 744     else
 745     {
 746       String
  rewrittenTemplate = backend.rewriteQuery(request 747           .getSqlOrTemplate());
 748       if (needGeneratedKeys)
 749         s = c.prepareStatement(rewrittenTemplate,
 750             Statement.RETURN_GENERATED_KEYS);
 751       else
 752         s = c.prepareStatement(rewrittenTemplate);
 753       PreparedStatementSerialization.setPreparedStatement(request
 754           .getPreparedStatementParameters(), (PreparedStatement
  ) s); 755     }
 756
 757             if (workerThread != null)
 760       workerThread.setCurrentStatement(s);
 761
 762     DriverCompliance driverCompliance = backend.getDriverCompliance();
 763     if (driverCompliance.supportSetQueryTimeout())
 764       s.setQueryTimeout(request.getTimeout());
 765
 766     if (setupResultSetParameters)
 767     {
 768       if ((request.getCursorName() != null)
 769           && (driverCompliance.supportSetCursorName()))
 770         s.setCursorName(request.getCursorName());
 771       if ((request.getFetchSize() != 0)
 772           && driverCompliance.supportSetFetchSize())
 773         s.setFetchSize(request.getFetchSize());
 774       if ((request.getMaxRows() > 0) && driverCompliance.supportSetMaxRows())
 775         s.setMaxRows(request.getMaxRows());
 776     }
 777     return s;
 778   }
 779
 780
 796   public static final ControllerResultSet executeStatementExecuteQueryOnBackend(
 797       SelectRequest request, DatabaseBackend backend,
 798       BackendWorkerThread workerThread, Connection
  c, 799       MetadataCache metadataCache) throws SQLException
  , BadConnectionException, 800       UnreachableBackendException
 801   {
 802     ControllerResultSet rs = null;
 803     ResultSet
  backendRS = null; 804     try
 805     {
 806       backend.addPendingReadRequest(request);
 807
 808       Statement
  s = setupStatementOrPreparedStatement(request, backend, 809           workerThread, c, true, false);
 810
 811             if (request.getPreparedStatementParameters() == null)
 813         backendRS = s.executeQuery(backend.rewriteQuery(request
 814             .getSqlOrTemplate()));
 815       else
 816         backendRS = ((PreparedStatement
  ) s).executeQuery(); 817
 818       SQLWarning
  stWarns = null; 819       if (request.getRetrieveSQLWarnings())
 820       {
 821         stWarns = s.getWarnings();
 822       }
 823       rs = new ControllerResultSet(request, backendRS, metadataCache, s, false);
 824       rs.setStatementWarnings(stWarns);
 825     }
 826     catch (SQLException
  e) 827     {
 834       if (backend.isValidConnection(c))
 835         throw e;       else if (request.isPersistentConnection())
 837         throw new UnreachableBackendException("Bad persistent connection", e);
 838       else
 839         throw new BadConnectionException(e);
 840     }
 841     finally
 842     {
 843             if (backendRS != null && request.getFetchSize() == 0)
 845         try
 846         {
 847           backendRS.close();
 848         }
 849         catch (SQLException
  ignore) 850         {
 851         }
 852       backend.removePendingRequest(request);
 853     }
 854     return rs;
 855   }
 856
 857
 871   public static final ExecuteUpdateResult executeStatementExecuteUpdateOnBackend(
 872       AbstractWriteRequest request, DatabaseBackend backend,
 873       BackendWorkerThread workerThread, Connection
  c) throws SQLException  , 874       BadConnectionException
 875   {
 876     Statement
  s = null; 877     try
 878     {
 879       backend.addPendingWriteRequest(request);
 880
 881       s = setupStatementOrPreparedStatement(request, backend, workerThread, c,
 882           false, false);
 883
 884             ExecuteUpdateResult eur;
 886       if (request.getPreparedStatementParameters() == null)
 887         eur = new ExecuteUpdateResult(s.executeUpdate(backend
 888             .rewriteQuery(request.getSqlOrTemplate())));
 889       else
 890         eur = new ExecuteUpdateResult(((PreparedStatement
  ) s).executeUpdate()); 891             if (request.getRetrieveSQLWarnings())
 893         eur.setStatementWarnings(s.getWarnings());
 894
 895       if (request.requiresConnectionPoolFlush())
 896         backend.flagAllConnectionsForRenewal();
 897
 898       return eur;
 899     }
 900     catch (SQLException
  e) 901     {       if (backend.isValidConnection(c))
 903         throw e;       else
 905         throw new BadConnectionException(e);
 906     }
 907     finally
 908     {
 909       backend.removePendingRequest(request);
 910       try
 911       {
 912         if (s != null)
 913           s.close();
 914       }
 915       catch (SQLException
  ignore) 916       {
 917       }
 918     }
 919   }
 920
 921
 936   public static final GeneratedKeysResult executeStatementExecuteUpdateWithKeysOnBackend(
 937       AbstractWriteRequest request, DatabaseBackend backend,
 938       BackendWorkerThread workerThread, Connection
  c, 939       MetadataCache metadataCache) throws SQLException
  , BadConnectionException 940   {
 941     if (!backend.getDriverCompliance().supportGetGeneratedKeys())
 942       throw new SQLException
  ("Backend " + backend.getName() 943           + " does not support RETURN_GENERATED_KEYS");
 944
 945     Statement
  s = null; 946     try
 947     {
 948       backend.addPendingWriteRequest(request);
 949
 950       s = setupStatementOrPreparedStatement(request, backend, workerThread, c,
 951           false, true);
 952
 953             int updateCount;
 955       if (request.getPreparedStatementParameters() == null)
 956         updateCount = s.executeUpdate(backend.rewriteQuery(request
 957             .getSqlOrTemplate()), Statement.RETURN_GENERATED_KEYS);
 958       else
 959         updateCount = ((PreparedStatement
  ) s).executeUpdate(); 960             SQLWarning
  stWarns = null; 962       if (request.getRetrieveSQLWarnings())
 963       {
 964         stWarns = s.getWarnings();
 965       }
 966       ControllerResultSet rs = new ControllerResultSet(request, s
 967           .getGeneratedKeys(), metadataCache, s, false);
 968       GeneratedKeysResult gkr = new GeneratedKeysResult(rs, updateCount);
 969       gkr.setStatementWarnings(stWarns);
 970
 971       if (request.requiresConnectionPoolFlush())
 972         backend.flagAllConnectionsForRenewal();
 973
 974       return gkr;
 975     }
 976     catch (SQLException
  e) 977     {       if (backend.isValidConnection(c))
 979         throw e;       else
 981         throw new BadConnectionException(e);
 982     }
 983     finally
 984     {
 985       backend.removePendingRequest(request);
 986       try
 987       {
 988         if (s != null)
 989           s.close();
 990       }
 991       catch (SQLException
  ignore) 992       {
 993       }
 994     }
 995   }
 996
 997
 1011  public static final ExecuteResult executeStatementExecuteOnBackend(
 1012      AbstractRequest request, DatabaseBackend backend,
 1013      BackendWorkerThread workerThread, Connection
  c, 1014      MetadataCache metadataCache) throws SQLException
  , BadConnectionException 1015  {
 1016    Statement
  s = null; 1017    try
 1018    {
 1019      backend.addPendingWriteRequest(request);
 1020
 1021            request.setFetchSize(0);
 1023
 1024      s = setupStatementOrPreparedStatement(request, backend, workerThread, c,
 1025          true, false);
 1026
 1027            boolean hasResult;
 1029      if (request.getPreparedStatementParameters() == null)
 1030        hasResult = s.execute(backend.rewriteQuery(request.getSqlOrTemplate()));
 1031      else
 1032        hasResult = ((PreparedStatement
  ) s).execute(); 1033
 1034      int updatedRows = 0;
 1035            ExecuteResult result = new ExecuteResult();
 1037            if (request.getRetrieveSQLWarnings())
 1039        result.setStatementWarnings(s.getWarnings());
 1040      do
 1041      {
 1042        if (hasResult)
 1043        {
 1044          ControllerResultSet crs = new ControllerResultSet(request, s
 1045              .getResultSet(), metadataCache, null, true);
 1046          result.addResult(crs);
 1047        }
 1048        else
 1049        {
 1050          updatedRows = s.getUpdateCount();
 1051          result.addResult(updatedRows);
 1052        }
 1053        hasResult = s.getMoreResults();
 1054      }
 1055      while (hasResult || (updatedRows != -1));
 1056
 1057      if (request.requiresConnectionPoolFlush())
 1058        backend.flagAllConnectionsForRenewal();
 1059
 1060      return result;
 1061    }
 1062    catch (SQLException
  e) 1063    {       if (backend.isValidConnection(c))
 1065        throw e;       else
 1067        throw new BadConnectionException(e);
 1068    }
 1069    finally
 1070    {
 1071      backend.removePendingRequest(request);
 1072      try
 1073      {
 1074        if (s != null)
 1075          s.close();
 1076      }
 1077      catch (SQLException
  ignore) 1078      {
 1079      }
 1080    }
 1081  }
 1082
 1083
 1095  private static void fetchOutAndNamedParameters(CallableStatement
  cs, 1096      StoredProcedure proc) throws SQLException
  1097  {
 1098        List
  outParamIndexes = proc.getOutParameterIndexes(); 1100    if (outParamIndexes != null)
 1101    {
 1102      for (Iterator
  iter = outParamIndexes.iterator(); iter.hasNext();) 1103      {
 1104        Object
  index = iter.next(); 1105        if (index instanceof Integer
  ) 1106          proc.setOutParameterValue(index, cs.getObject(((Integer
  ) index) 1107              .intValue()));
 1108        else
 1109                    proc.setOutParameterValue(index, cs.getObject((String
  ) index)); 1111      }
 1112    }
 1113
 1114        List
  namedParamNames = proc.getNamedParameterNames(); 1116    if (namedParamNames != null)
 1117    {
 1118      for (Iterator
  iter = namedParamNames.iterator(); iter.hasNext();) 1119      {
 1120                String
  paramName = (String  ) iter.next(); 1122        proc.setNamedParameterValue(paramName, cs.getObject(paramName));
 1123      }
 1124    }
 1125  }
 1126
 1127
 1131  private static CallableStatement
  setupCallableStatement(StoredProcedure proc, 1132      DatabaseBackend backend, BackendWorkerThread workerThread, Connection
  c, 1133      boolean setupResultSetParameters) throws SQLException
  1134  {
 1135    CallableStatement
  cs;     cs = c.prepareCall(backend.rewriteQuery(proc.getSqlOrTemplate())); 1137    if (proc.getPreparedStatementParameters() != null)
 1138      PreparedStatementSerialization.setCallableStatement(backend
 1139          .rewriteQuery(proc.getPreparedStatementParameters()), cs, proc);
 1140
 1141            if (workerThread != null)
 1144      workerThread.setCurrentStatement(cs);
 1145
 1146    DriverCompliance driverCompliance = backend.getDriverCompliance();
 1147    if (driverCompliance.supportSetQueryTimeout())
 1148      cs.setQueryTimeout(proc.getTimeout());
 1149
 1150    if (setupResultSetParameters)
 1151    {
 1152      if ((proc.getCursorName() != null)
 1153          && (driverCompliance.supportSetCursorName()))
 1154        cs.setCursorName(proc.getCursorName());
 1155      if ((proc.getFetchSize() != 0) && driverCompliance.supportSetFetchSize())
 1156        cs.setFetchSize(proc.getFetchSize());
 1157      if ((proc.getMaxRows() > 0) && driverCompliance.supportSetMaxRows())
 1158        cs.setMaxRows(proc.getMaxRows());
 1159    }
 1160    return cs;
 1161  }
 1162
 1163
 1177  public static final ControllerResultSet executeCallableStatementExecuteQueryOnBackend(
 1178      StoredProcedure proc, DatabaseBackend backend,
 1179      BackendWorkerThread workerThread, Connection
  c, 1180      MetadataCache metadataCache) throws SQLException
  , BadConnectionException 1181  {
 1182    CallableStatement
  cs = null; 1183    ResultSet
  backendRS = null; 1184    try
 1185    {
 1186      backend.addPendingReadRequest(proc);
 1187
 1188      cs = setupCallableStatement(proc, backend, workerThread, c, true);
 1189
 1190            backendRS = cs.executeQuery();
 1192
 1193      SQLWarning
  stWarns = null; 1194      if (proc.getRetrieveSQLWarnings())
 1195      {
 1196        stWarns = cs.getWarnings();
 1197      }
 1198      ControllerResultSet rs = new ControllerResultSet(proc, backendRS,
 1199          metadataCache, cs, false);
 1200      rs.setStatementWarnings(stWarns);
 1201      fetchOutAndNamedParameters(cs, proc);
 1202
 1203      if (proc.requiresConnectionPoolFlush())
 1204        backend.flagAllConnectionsForRenewal();
 1205
 1206      return rs;
 1207    }
 1208    catch (SQLException
  e) 1209    {       if (backend.isValidConnection(c))
 1211        throw e;       else
 1213        throw new BadConnectionException(e);
 1214    }
 1215    finally
 1216    {
 1217            if (backendRS != null && proc.getFetchSize() == 0)
 1219      {
 1220        try
 1221        {
 1222          backendRS.close();
 1223        }
 1224        catch (SQLException
  ignore) 1225        {
 1226        }
 1227      }
 1228      backend.removePendingRequest(proc);
 1229    }
 1230  }
 1231
 1232
 1245  public static final ExecuteUpdateResult executeCallableStatementExecuteUpdateOnBackend(
 1246      StoredProcedure proc, DatabaseBackend backend,
 1247      BackendWorkerThread workerThread, Connection
  c) throws SQLException  , 1248      BadConnectionException
 1249  {
 1250    CallableStatement
  cs = null; 1251    try
 1252    {
 1253      backend.addPendingWriteRequest(proc);
 1254
 1255      cs = setupCallableStatement(proc, backend, workerThread, c, false);
 1256
 1257            ExecuteUpdateResult eur = new ExecuteUpdateResult(cs.executeUpdate());
 1259            if (proc.getRetrieveSQLWarnings())
 1261        eur.setStatementWarnings(cs.getWarnings());
 1262
 1263      fetchOutAndNamedParameters(cs, proc);
 1264
 1265      if (proc.requiresConnectionPoolFlush())
 1266        backend.flagAllConnectionsForRenewal();
 1267
 1268      return eur;
 1269    }
 1270    catch (SQLException
  e) 1271    {       if (backend.isValidConnection(c))
 1273        throw e;       else
 1275        throw new BadConnectionException(e);
 1276    }
 1277    finally
 1278    {
 1279      backend.removePendingRequest(proc);
 1280      try
 1281      {
 1282        if (cs != null)
 1283          cs.close();
 1284      }
 1285      catch (SQLException
  ignore) 1286      {
 1287      }
 1288    }
 1289  }
 1290
 1291
 1306  public static final ExecuteResult executeCallableStatementExecuteOnBackend(
 1307      StoredProcedure proc, DatabaseBackend backend,
 1308      BackendWorkerThread workerThread, Connection
  c, 1309      MetadataCache metadataCache) throws SQLException
  , BadConnectionException 1310  {
 1311    CallableStatement
  cs = null; 1312    try
 1313    {
 1314      backend.addPendingWriteRequest(proc);
 1315
 1316            proc.setFetchSize(0);
 1318
 1319      cs = setupCallableStatement(proc, backend, workerThread, c, true);
 1320
 1321            boolean hasResult = cs.execute();
 1323      int updatedRows = 0;
 1324            ExecuteResult result = new ExecuteResult();
 1326            if (proc.getRetrieveSQLWarnings())
 1328        result.setStatementWarnings(cs.getWarnings());
 1329      do
 1330      {
 1331        if (hasResult)
 1332        {
 1333          ControllerResultSet crs = new ControllerResultSet(proc, cs
 1334              .getResultSet(), metadataCache, null, true);
 1335          result.addResult(crs);
 1336        }
 1337        else
 1338        {
 1339          updatedRows = cs.getUpdateCount();
 1340          result.addResult(updatedRows);
 1341        }
 1342        if (updatedRows != -1)
 1343          hasResult = cs.getMoreResults();
 1344      }
 1345      while (hasResult || (updatedRows != -1));
 1346
 1347      fetchOutAndNamedParameters(cs, proc);
 1348
 1349      if (proc.requiresConnectionPoolFlush())
 1350        backend.flagAllConnectionsForRenewal();
 1351
 1352      return result;
 1353    }
 1354    catch (SQLException
  e) 1355    {       if (backend.isValidConnection(c))
 1357        throw e;       else
 1359        throw new BadConnectionException(e);
 1360    }
 1361    finally
 1362    {
 1363      backend.removePendingRequest(proc);
 1364      try
 1365      {
 1366        if (cs != null)
 1367          cs.close();
 1368      }
 1369      catch (SQLException
  ignore) 1370      {
 1371      }
 1372    }
 1373  }
 1374
 1375
 1385  public static final ControllerResultSet preparedStatementGetMetaDataOnBackend(
 1386      String
  sqlTemplate, DatabaseBackend backend, Connection  c) 1387      throws SQLException
  , BadConnectionException 1388  {
 1389    try
 1390    {
 1391      PreparedStatement
  ps = c.prepareStatement(sqlTemplate); 1392      return new ControllerResultSet(ControllerConstants.CONTROLLER_FACTORY
 1393          .getResultSetMetaDataFactory().copyResultSetMetaData(
 1394              ps.getMetaData(), null), new ArrayList
  ()); 1395    }
 1396    catch (SQLException
  e) 1397    {       if (backend.isValidConnection(c))
 1399        throw e;       else
 1401        throw new BadConnectionException(e);
 1402    }
 1403  }
 1404
 1405
 1409
 1415  public abstract void abort(TransactionMetaData tm) throws SQLException
  ; 1416
 1417
 1423  public abstract void begin(TransactionMetaData tm) throws SQLException
  ; 1424
 1425
 1433  public abstract void commit(TransactionMetaData tm)
 1434      throws AllBackendsFailedException, SQLException
  ; 1435
 1436
 1444  public abstract void rollback(TransactionMetaData tm)
 1445      throws AllBackendsFailedException, SQLException
  ; 1446
 1447
 1456  public abstract void rollbackToSavepoint(TransactionMetaData tm,
 1457      String
  savepointName) throws AllBackendsFailedException, SQLException  ; 1458
 1459
 1468  public abstract void setSavepoint(TransactionMetaData tm, String
  name) 1469      throws AllBackendsFailedException, SQLException
  ; 1470
 1471
 1480  public abstract void releaseSavepoint(TransactionMetaData tm, String
  name) 1481      throws AllBackendsFailedException, SQLException
  ; 1482
 1483
 1490  public abstract void closePersistentConnection(String
  login, 1491      long persistentConnectionId) throws SQLException
  ; 1492
 1493
 1500  public abstract void openPersistentConnection(String
  login, 1501      long persistentConnectionId) throws SQLException
  ; 1502
 1503
 1520  public static final Connection
  getConnectionAndBeginTransaction( 1521      DatabaseBackend backend, AbstractConnectionManager cm,
 1522      AbstractRequest request) throws SQLException
  , UnreachableBackendException 1523  {
 1524    PooledConnection pc = null;
 1525    boolean isConnectionValid = false;
 1526    Connection
  c; 1527
 1528    do
 1529    {
 1530      if (request.isPersistentConnection())
 1531      {                 pc = cm.retrieveConnectionInAutoCommit(request);
 1534        cm.registerConnectionForTransaction(pc, request.getTransactionId());
 1535      }
 1536      else
 1537      {         pc = cm.getConnectionForTransaction(request.getTransactionId());
 1539      }
 1540
 1541            if (pc == null)
 1543      {
 1544        throw new UnreachableBackendException(Translate.get(
 1545            "loadbalancer.unable.get.connection", new String
  []{ 1546                String.valueOf(request.getTransactionId()), backend.getName()}));
 1547      }
 1548      c = pc.getConnection();
 1549      try
 1550      {
 1551        if (request.getTransactionIsolation() != org.continuent.sequoia.driver.Connection.DEFAULT_TRANSACTION_ISOLATION_LEVEL)
 1552        {
 1553
 1557          pc.setTransactionIsolation(request.getTransactionIsolation());
 1558        }
 1559        else if (defaultTransactionIsolationLevel != org.continuent.sequoia.driver.Connection.DEFAULT_TRANSACTION_ISOLATION_LEVEL)
 1560        {
 1561
 1566          pc.setTransactionIsolation(defaultTransactionIsolationLevel);
 1567        }
 1568
 1569        c.setAutoCommit(false);
 1570        isConnectionValid = true;
 1571      }
 1572      catch (SQLException
  e) 1573      {
 1574        if (backend.isValidConnection(c))
 1575          throw e;         else
 1577        {
 1578          cm.deleteConnection(request.getTransactionId());
 1579          if (request.isPersistentConnection())
 1580          {
 1581            cm.deletePersistentConnection(request.getPersistentConnectionId());
 1582          }
 1583        }
 1584      }
 1585    }
 1586    while (!isConnectionValid);
 1587    if (pc == null)
 1588    {
 1589      if (logger.isErrorEnabled())
 1590      {
 1591        logger.error("Got a null connection [backend=" + backend.getName()
 1592            + ", tid=" + request.getTransactionId() + "]");
 1593      }
 1594    }
 1595    return c;
 1596  }
 1597
 1598
 1602
 1611  public abstract void enableBackend(DatabaseBackend db, boolean writeEnabled)
 1612      throws SQLException
  ; 1613
 1614
 1624  public abstract void disableBackend(DatabaseBackend db, boolean forceDisable)
 1625      throws SQLException
  ; 1626
 1627
 1633  public int getNumberOfEnabledBackends()
 1634  {
 1635    return enabledBackends.size();
 1636  }
 1637
 1638
 1643  public abstract String
  getInformation(); 1644
 1645
 1650  public abstract String
  getXmlImpl(); 1651
 1652
 1655  public String
  getXml() 1656  {
 1657    StringBuffer
  info = new StringBuffer  (); 1658    info.append("<" + DatabasesXmlTags.ELT_LoadBalancer + " "
 1659        + DatabasesXmlTags.ATT_transactionIsolation + "=\"");
 1660    switch (defaultTransactionIsolationLevel)
 1661    {
 1662      case Connection.TRANSACTION_READ_UNCOMMITTED :
 1663        info.append(DatabasesXmlTags.VAL_readUncommitted);
 1664        break;
 1665      case Connection.TRANSACTION_READ_COMMITTED :
 1666        info.append(DatabasesXmlTags.VAL_readCommitted);
 1667        break;
 1668      case Connection.TRANSACTION_REPEATABLE_READ :
 1669        info.append(DatabasesXmlTags.VAL_repeatableRead);
 1670        break;
 1671      case Connection.TRANSACTION_SERIALIZABLE :
 1672        info.append(DatabasesXmlTags.VAL_serializable);
 1673        break;
 1674      default :
 1675        info.append(DatabasesXmlTags.VAL_databaseDefault);
 1676        break;
 1677    }
 1678    info.append("\">");
 1679    info.append(getXmlImpl());
 1680    info.append("</" + DatabasesXmlTags.ELT_LoadBalancer + ">");
 1681    return info.toString();
 1682  }
 1683}
 1684
                                                                                                                                                                                                             |                                                                       
 
 
 
 
 
                                                                                   Popular Tags                                                                                                                                                                                              |