1 23 24 package org.continuent.sequoia.controller.loadbalancer.raidb2; 25 26 import java.sql.Connection ; 27 import java.sql.SQLException ; 28 import java.util.ArrayList ; 29 import java.util.List ; 30 31 import org.continuent.sequoia.common.exceptions.BadConnectionException; 32 import org.continuent.sequoia.common.exceptions.NoMoreBackendException; 33 import org.continuent.sequoia.common.exceptions.NoTransactionStartWhenDisablingException; 34 import org.continuent.sequoia.common.exceptions.NotImplementedException; 35 import org.continuent.sequoia.common.exceptions.SQLExceptionFactory; 36 import org.continuent.sequoia.common.exceptions.UnreachableBackendException; 37 import org.continuent.sequoia.common.i18n.Translate; 38 import org.continuent.sequoia.common.log.Trace; 39 import org.continuent.sequoia.common.xml.DatabasesXmlTags; 40 import org.continuent.sequoia.controller.backend.DatabaseBackend; 41 import org.continuent.sequoia.controller.backend.result.ControllerResultSet; 42 import org.continuent.sequoia.controller.backend.result.ExecuteResult; 43 import org.continuent.sequoia.controller.backend.result.ExecuteUpdateResult; 44 import org.continuent.sequoia.controller.backend.result.GeneratedKeysResult; 45 import org.continuent.sequoia.controller.cache.metadata.MetadataCache; 46 import org.continuent.sequoia.controller.connection.AbstractConnectionManager; 47 import org.continuent.sequoia.controller.connection.PooledConnection; 48 import org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer; 49 import org.continuent.sequoia.controller.loadbalancer.AllBackendsFailedException; 50 import org.continuent.sequoia.controller.loadbalancer.BackendTaskQueues; 51 import org.continuent.sequoia.controller.loadbalancer.policies.WaitForCompletionPolicy; 52 import org.continuent.sequoia.controller.loadbalancer.policies.createtable.CreateTableException; 53 import org.continuent.sequoia.controller.loadbalancer.policies.createtable.CreateTablePolicy; 54 import org.continuent.sequoia.controller.loadbalancer.policies.createtable.CreateTableRule; 55 import org.continuent.sequoia.controller.loadbalancer.tasks.AbstractTask; 56 import org.continuent.sequoia.controller.loadbalancer.tasks.CallableStatementExecuteQueryTask; 57 import org.continuent.sequoia.controller.loadbalancer.tasks.CallableStatementExecuteTask; 58 import org.continuent.sequoia.controller.loadbalancer.tasks.CallableStatementExecuteUpdateTask; 59 import org.continuent.sequoia.controller.loadbalancer.tasks.ClosePersistentConnectionTask; 60 import org.continuent.sequoia.controller.loadbalancer.tasks.CommitTask; 61 import org.continuent.sequoia.controller.loadbalancer.tasks.OpenPersistentConnectionTask; 62 import org.continuent.sequoia.controller.loadbalancer.tasks.ReleaseSavepointTask; 63 import org.continuent.sequoia.controller.loadbalancer.tasks.RollbackTask; 64 import org.continuent.sequoia.controller.loadbalancer.tasks.RollbackToSavepointTask; 65 import org.continuent.sequoia.controller.loadbalancer.tasks.SavepointTask; 66 import org.continuent.sequoia.controller.loadbalancer.tasks.StatementExecuteUpdateTask; 67 import org.continuent.sequoia.controller.loadbalancer.tasks.StatementExecuteUpdateWithKeysTask; 68 import org.continuent.sequoia.controller.requestmanager.RAIDbLevels; 69 import org.continuent.sequoia.controller.requestmanager.TransactionMetaData; 70 import org.continuent.sequoia.controller.requests.AbstractRequest; 71 import org.continuent.sequoia.controller.requests.AbstractWriteRequest; 72 import org.continuent.sequoia.controller.requests.ParsingGranularities; 73 import org.continuent.sequoia.controller.requests.SelectRequest; 74 import org.continuent.sequoia.controller.requests.StoredProcedure; 75 import org.continuent.sequoia.controller.virtualdatabase.VirtualDatabase; 76 import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedClosePersistentConnection; 77 import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedCommit; 78 import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedOpenPersistentConnection; 79 import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedReleaseSavepoint; 80 import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedRollback; 81 import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedRollbackToSavepoint; 82 import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedSetSavepoint; 83 84 97 public abstract class RAIDb2 extends AbstractLoadBalancer 98 { 99 108 protected CreateTablePolicy createTablePolicy; 109 protected static Trace logger = Trace 110 .getLogger("org.continuent.sequoia.controller.loadbalancer.raidb2"); 111 112 115 116 127 public RAIDb2(VirtualDatabase vdb, 128 WaitForCompletionPolicy waitForCompletionPolicy, 129 CreateTablePolicy createTablePolicy) throws Exception 130 { 131 super(vdb, RAIDbLevels.RAIDb2, ParsingGranularities.TABLE); 132 133 this.waitForCompletionPolicy = waitForCompletionPolicy; 134 this.createTablePolicy = createTablePolicy; 135 } 136 137 140 141 149 public abstract ControllerResultSet statementExecuteQuery( 150 SelectRequest request, MetadataCache metadataCache) throws SQLException ; 151 152 162 public ExecuteUpdateResult statementExecuteUpdate(AbstractWriteRequest request) 163 throws AllBackendsFailedException, SQLException 164 { 165 return ((StatementExecuteUpdateTask) execWriteRequest(request, false, null)) 166 .getResult(); 167 } 168 169 179 public GeneratedKeysResult statementExecuteUpdateWithKeys( 180 AbstractWriteRequest request, MetadataCache metadataCache) 181 throws AllBackendsFailedException, SQLException 182 { 183 return ((StatementExecuteUpdateWithKeysTask) execWriteRequest(request, 184 true, metadataCache)).getResult(); 185 } 186 187 191 public ExecuteResult statementExecute(AbstractRequest request, 192 MetadataCache metadataCache) throws SQLException , 193 AllBackendsFailedException 194 { 195 throw new NotImplementedException( 196 "Statement.execute() is currently not supported with RAIDb-2"); 197 } 198 199 214 private AbstractTask execWriteRequest(AbstractWriteRequest request, 215 boolean useKeys, MetadataCache metadataCache) 216 throws AllBackendsFailedException, SQLException 217 { 218 handleMacros(request); 220 221 boolean removeFromTotalOrderQueue = waitForTotalOrder(request, true); 223 224 if (request.isLazyTransactionStart()) 226 this.vdb.getRequestManager().logLazyTransactionBegin( 227 request.getTransactionId()); 228 229 if (recoveryLog != null) 231 recoveryLog.logRequestExecuting(request); 232 233 int nbOfThreads = acquireLockAndCheckNbOfThreads(request, String 234 .valueOf(request.getId())); 235 236 List writeList = new ArrayList (); 237 238 if (request.isCreate()) 239 { 240 try 241 { 242 writeList = getBackendsForCreateTableRequest(request.getTableName()); 243 } 244 catch (CreateTableException e) 245 { 246 releaseLockAndUnlockNextQuery(request); 247 throw new SQLException (Translate.get( 248 "loadbalancer.create.table.rule.failed", e.getMessage())); 249 } 250 } 251 else 252 { 253 writeList = getBackendsWithTable(request.getTableName(), nbOfThreads); 254 } 255 256 nbOfThreads = writeList.size(); 257 if (nbOfThreads == 0) 258 { 259 String msg = Translate.get("loadbalancer.execute.no.backend.found", 260 request.getSqlShortForm(vdb.getSqlShortFormLength())); 261 logger.warn(msg); 262 263 releaseLockAndUnlockNextQuery(request); 264 throw new SQLException (msg); 265 } 266 if (logger.isDebugEnabled()) 267 { 268 logger.debug(Translate.get("loadbalancer.execute.on.several", String 269 .valueOf(request.getId()), String.valueOf(nbOfThreads))); 270 } 271 272 AbstractTask task; 274 if (useKeys) 275 task = new StatementExecuteUpdateWithKeysTask(getNbToWait(nbOfThreads), 276 nbOfThreads, request, metadataCache); 277 else 278 task = new StatementExecuteUpdateTask(getNbToWait(nbOfThreads), 279 nbOfThreads, request); 280 281 atomicTaskPostInQueueAndReleaseLock(request, task, writeList, 282 removeFromTotalOrderQueue); 283 284 synchronized (task) 285 { 286 if (!task.hasCompleted()) 287 waitForTaskCompletion(request.getTimeout() * 1000L, String 288 .valueOf(request.getId()), task); 289 290 checkTaskCompletion(task); 291 return task; 292 } 293 } 294 295 304 private List getBackendsWithTable(String tableName, int nbOfThreads) 308 { 309 List backendsWithTable = new ArrayList (); 310 for (int i = 0; i < nbOfThreads; i++) 311 { 312 DatabaseBackend b = (DatabaseBackend) enabledBackends.get(i); 313 if (b.hasTable(tableName)) 314 backendsWithTable.add(b); 315 } 316 return backendsWithTable; 317 } 318 319 328 private List getBackendsWithStartedTransaction(Long tid, int nbOfThreads) 332 { 333 List backendsWithStartedTransaction = new ArrayList (nbOfThreads); 335 for (int i = 0; i < nbOfThreads; i++) 336 { 337 DatabaseBackend backend = (DatabaseBackend) enabledBackends.get(i); 338 if (backend.isStartedTransaction(tid)) 339 backendsWithStartedTransaction.add(backend); 340 } 341 return backendsWithStartedTransaction; 342 } 343 344 354 private List getBackendsWithStoredProcedure(String procedureName, 358 int nbOfParameters, int nbOfThreads) 359 { 360 List backendsWithStoredProcedure = new ArrayList (nbOfThreads); 361 for (int i = 0; i < nbOfThreads; i++) 362 { 363 DatabaseBackend b = (DatabaseBackend) enabledBackends.get(i); 364 if (b.hasStoredProcedure(procedureName, nbOfParameters)) 365 backendsWithStoredProcedure.add(b); 366 } 367 return backendsWithStoredProcedure; 368 } 369 370 380 private List getBackendsForCreateTableRequest(String tableName) 382 throws CreateTableException 383 { 384 CreateTableRule rule = createTablePolicy.getTableRule(tableName); 386 if (rule == null) 387 { 388 rule = createTablePolicy.getDefaultRule(); 389 } 390 return rule.getBackends(vdb.getBackends()); 391 } 392 393 403 private ControllerResultSet executeSelectRequestInTransaction( 404 SelectRequest request, DatabaseBackend backend, 405 MetadataCache metadataCache) throws SQLException 406 { 407 AbstractConnectionManager cm = backend.getConnectionManager(request 408 .getLogin()); 409 410 if (cm == null) 411 { 412 String msg = Translate.get("loadbalancer.connectionmanager.not.found", 413 request.getLogin(), backend.getName()); 414 logger.error(msg); 415 throw new SQLException (msg); 416 } 417 418 Connection c; 419 long tid = request.getTransactionId(); 420 421 try 422 { 423 c = backend.getConnectionForTransactionAndLazyBeginIfNeeded(request, cm); 424 } 425 catch (UnreachableBackendException e1) 426 { 427 logger.error(Translate.get("loadbalancer.backend.disabling.unreachable", 428 backend.getName())); 429 disableBackend(backend, true); 430 throw new SQLException (Translate.get("loadbalancer.backend.unreacheable", 431 backend.getName())); 432 } 433 catch (NoTransactionStartWhenDisablingException e) 434 { 435 String msg = Translate.get("loadbalancer.backend.is.disabling", request 436 .getSqlShortForm(vdb.getSqlShortFormLength()), backend.getName()); 437 logger.error(msg); 438 throw new SQLException (msg); 439 } 440 441 if (c == null) 442 throw new SQLException (Translate.get( 443 "loadbalancer.unable.retrieve.connection", String.valueOf(tid), 444 backend.getName())); 445 446 try 447 { 448 ControllerResultSet rs = executeStatementExecuteQueryOnBackend(request, 449 backend, null, c, metadataCache); 450 if (logger.isDebugEnabled()) 451 { 452 logger.debug(Translate.get("loadbalancer.execute.transaction.on", 453 new String []{String.valueOf(tid), String.valueOf(request.getId()), 454 backend.getName()})); 455 } 456 return rs; 457 } 458 catch (SQLException e) 459 { 460 throw e; 461 } 462 catch (BadConnectionException e) 463 { cm.deleteConnection(tid); 466 String msg = Translate.get( 467 "loadbalancer.backend.disabling.connection.failure", backend 468 .getName()); 469 logger.error(msg); 470 disableBackend(backend, true); 471 throw new SQLException (msg); 472 } 473 catch (Throwable e) 474 { 475 throw new SQLException (Translate.get( 476 "loadbalancer.request.failed.on.backend", new String []{ 477 request.getSqlShortForm(vdb.getSqlShortFormLength()), 478 backend.getName(), e.getMessage()})); 479 } 480 } 481 482 492 private ControllerResultSet executeSelectRequestInAutoCommit( 493 SelectRequest request, DatabaseBackend backend, 494 MetadataCache metadataCache) throws SQLException , 495 UnreachableBackendException 496 { 497 AbstractConnectionManager cm = backend.getConnectionManager(request 498 .getLogin()); 499 500 if (cm == null) 502 { 503 String msg = Translate.get("loadbalancer.connectionmanager.not.found", 504 request.getLogin(), backend.getName()); 505 logger.error(msg); 506 throw new SQLException (msg); 507 } 508 509 ControllerResultSet rs = null; 510 boolean badConnection; 511 do 512 { 513 badConnection = false; 514 PooledConnection c = null; 516 try 517 { 518 c = cm.retrieveConnectionInAutoCommit(request); 519 } 520 catch (UnreachableBackendException e1) 521 { 522 logger.error(Translate.get( 523 "loadbalancer.backend.disabling.unreachable", backend.getName())); 524 disableBackend(backend, true); 525 throw new UnreachableBackendException(Translate.get( 526 "loadbalancer.backend.unreacheable", backend.getName())); 527 } 528 529 if (c == null) 531 throw new UnreachableBackendException("No more connections on backend " 532 + backend.getName()); 533 534 try 536 { 537 rs = executeStatementExecuteQueryOnBackend(request, backend, null, c 538 .getConnection(), metadataCache); 539 cm.releaseConnectionInAutoCommit(request, c); 540 } 541 catch (SQLException e) 542 { 543 cm.releaseConnectionInAutoCommit(request, c); 544 throw new SQLException (Translate.get( 545 "loadbalancer.request.failed.on.backend", new String []{ 546 request.getSqlShortForm(vdb.getSqlShortFormLength()), 547 backend.getName(), e.getMessage()})); 548 } 549 catch (BadConnectionException e) 550 { cm.deleteConnection(c); 552 badConnection = true; 553 } 554 catch (Throwable e) 555 { 556 throw new SQLException (Translate.get( 557 "loadbalancer.request.failed.on.backend", new String []{ 558 request.getSqlShortForm(vdb.getSqlShortFormLength()), 559 backend.getName(), e.getMessage()})); 560 } 561 } 562 while (badConnection); 563 if (logger.isDebugEnabled()) 564 logger.debug(Translate.get("loadbalancer.execute.on", String 565 .valueOf(request.getId()), backend.getName())); 566 return rs; 567 } 568 569 578 protected ControllerResultSet executeReadRequestOnBackend( 579 SelectRequest request, DatabaseBackend backend, 580 MetadataCache metadataCache) throws SQLException , 581 UnreachableBackendException 582 { 583 handleMacros(request); 585 586 if (request.isAutoCommit()) 588 { 589 return executeSelectRequestInAutoCommit(request, backend, metadataCache); 590 } 591 else 592 { 593 return executeSelectRequestInTransaction(request, backend, metadataCache); 594 } 595 } 596 597 protected static final int CALLABLE_STATEMENT_EXECUTE_QUERY = 1; 598 protected static final int CALLABLE_STATEMENT_EXECUTE = 2; 599 600 612 protected Object executeStoredProcedureOnBackend(StoredProcedure proc, 613 boolean isExecuteQuery, DatabaseBackend backend, 614 MetadataCache metadataCache) throws SQLException , 615 UnreachableBackendException 616 { 617 if (proc.isAutoCommit()) 619 { 620 return executeStoredProcedureInAutoCommit(proc, isExecuteQuery, backend, 621 metadataCache); 622 } 623 else 624 { 625 return executeStoredProcedureInTransaction(proc, isExecuteQuery, backend, 626 metadataCache); 627 } 628 } 629 630 643 private Object executeStoredProcedureInTransaction(StoredProcedure proc, 644 boolean isExecuteQuery, DatabaseBackend backend, 645 MetadataCache metadataCache) throws SQLException 646 { 647 AbstractConnectionManager cm = backend 650 .getConnectionManager(proc.getLogin()); 651 652 if (cm == null) 654 { 655 String msg = Translate.get("loadbalancer.connectionmanager.not.found", 656 proc.getLogin(), backend.getName()); 657 logger.error(msg); 658 throw new SQLException (msg); 659 } 660 661 Connection c; 662 long tid = proc.getTransactionId(); 663 664 try 665 { 666 c = backend.getConnectionForTransactionAndLazyBeginIfNeeded(proc, cm); 667 } 668 catch (UnreachableBackendException e1) 669 { 670 logger.error(Translate.get("loadbalancer.backend.disabling.unreachable", 671 backend.getName())); 672 disableBackend(backend, true); 673 throw new SQLException (Translate.get("loadbalancer.backend.unreacheable", 674 backend.getName())); 675 } 676 catch (NoTransactionStartWhenDisablingException e) 677 { 678 String msg = Translate.get("loadbalancer.backend.is.disabling", proc 679 .getSqlShortForm(vdb.getSqlShortFormLength()), backend.getName()); 680 logger.error(msg); 681 throw new SQLException (msg); 682 } 683 684 if (c == null) 686 throw new SQLException (Translate.get( 687 "loadbalancer.unable.retrieve.connection", String.valueOf(tid), 688 backend.getName())); 689 690 try 692 { 693 if (isExecuteQuery) 694 return AbstractLoadBalancer 695 .executeCallableStatementExecuteQueryOnBackend(proc, backend, null, 696 c, metadataCache); 697 else 698 return AbstractLoadBalancer.executeCallableStatementExecuteOnBackend( 699 proc, backend, null, c, metadataCache); 700 } 701 catch (Exception e) 702 { 703 throw new SQLException (Translate.get( 704 "loadbalancer.storedprocedure.failed.on.backend", new String []{ 705 proc.getSqlShortForm(vdb.getSqlShortFormLength()), 706 backend.getName(), e.getMessage()})); 707 } 708 finally 709 { 710 if (logger.isDebugEnabled()) 711 logger.debug(Translate.get("loadbalancer.execute.transaction.on", 712 new String []{String.valueOf(tid), String.valueOf(proc.getId()), 713 backend.getName()})); 714 } 715 } 716 717 730 private Object executeStoredProcedureInAutoCommit(StoredProcedure proc, 731 boolean isExecuteQuery, DatabaseBackend backend, 732 MetadataCache metadataCache) throws SQLException , 733 UnreachableBackendException 734 { 735 AbstractConnectionManager cm = backend 737 .getConnectionManager(proc.getLogin()); 738 739 if (cm == null) 741 { 742 String msg = Translate.get("loadbalancer.connectionmanager.not.found", 743 proc.getLogin(), backend.getName()); 744 logger.error(msg); 745 throw new SQLException (msg); 746 } 747 748 PooledConnection c = null; 750 try 751 { 752 c = cm.retrieveConnectionInAutoCommit(proc); 753 } 754 catch (UnreachableBackendException e1) 755 { 756 logger.error(Translate.get("loadbalancer.backend.disabling.unreachable", 757 backend.getName())); 758 disableBackend(backend, true); 759 throw new UnreachableBackendException(Translate.get( 760 "loadbalancer.backend.unreacheable", backend.getName())); 761 } 762 763 if (c == null) 765 throw new SQLException (Translate.get( 766 "loadbalancer.backend.no.connection", backend.getName())); 767 768 try 770 { 771 if (isExecuteQuery) 772 return AbstractLoadBalancer 773 .executeCallableStatementExecuteQueryOnBackend(proc, backend, null, 774 c.getConnection(), metadataCache); 775 else 776 return AbstractLoadBalancer.executeCallableStatementExecuteOnBackend( 777 proc, backend, null, c.getConnection(), metadataCache); 778 } 779 catch (Exception e) 780 { 781 throw new SQLException (Translate.get( 782 "loadbalancer.storedprocedure.failed.on.backend", new String []{ 783 proc.getSqlShortForm(vdb.getSqlShortFormLength()), 784 backend.getName(), e.getMessage()})); 785 } 786 finally 787 { 788 cm.releaseConnectionInAutoCommit(proc, c); 789 if (logger.isDebugEnabled()) 790 logger.debug(Translate.get("loadbalancer.storedprocedure.on", String 791 .valueOf(proc.getId()), backend.getName())); 792 } 793 } 794 795 799 public ControllerResultSet callableStatementExecuteQuery( 800 StoredProcedure proc, MetadataCache metadataCache) throws SQLException , 801 AllBackendsFailedException 802 { 803 CallableStatementExecuteQueryTask task = (CallableStatementExecuteQueryTask) callStoredProcedure( 804 proc, EXECUTE_QUERY_TASK, metadataCache); 805 return task.getResult(); 806 } 807 808 811 public ExecuteUpdateResult callableStatementExecuteUpdate(StoredProcedure proc) 812 throws SQLException , AllBackendsFailedException 813 { 814 CallableStatementExecuteUpdateTask task = (CallableStatementExecuteUpdateTask) callStoredProcedure( 815 proc, EXECUTE_UPDATE_TASK, null); 816 return task.getResult(); 817 } 818 819 823 public ExecuteResult callableStatementExecute(StoredProcedure proc, 824 MetadataCache metadataCache) throws SQLException , 825 AllBackendsFailedException 826 { 827 CallableStatementExecuteTask task = (CallableStatementExecuteTask) callStoredProcedure( 828 proc, EXECUTE_TASK, metadataCache); 829 return task.getResult(); 830 } 831 832 private static final int EXECUTE_QUERY_TASK = 0; 833 private static final int EXECUTE_UPDATE_TASK = 1; 834 private static final int EXECUTE_TASK = 2; 835 836 853 private AbstractTask callStoredProcedure(StoredProcedure proc, int taskType, 854 MetadataCache metadataCache) throws SQLException , 855 AllBackendsFailedException, NoMoreBackendException 856 { 857 boolean removeFromTotalOrderQueue = waitForTotalOrder(proc, true); 859 860 handleMacros(proc); 862 863 int nbOfThreads = acquireLockAndCheckNbOfThreads(proc, String.valueOf(proc 864 .getId())); 865 866 AbstractTask task; 868 switch (taskType) 869 { 870 case EXECUTE_QUERY_TASK : 871 task = new CallableStatementExecuteQueryTask(getNbToWait(nbOfThreads), 872 nbOfThreads, proc, metadataCache); 873 break; 874 case EXECUTE_UPDATE_TASK : 875 task = new CallableStatementExecuteUpdateTask(getNbToWait(nbOfThreads), 876 nbOfThreads, proc); 877 break; 878 case EXECUTE_TASK : 879 task = new CallableStatementExecuteTask(getNbToWait(nbOfThreads), 880 nbOfThreads, proc, metadataCache); 881 break; 882 default : 883 throw new RuntimeException ("Unhandled task type " + taskType 884 + " in callStoredProcedure"); 885 } 886 887 List backendList = getBackendsWithStoredProcedure(proc.getProcedureKey(), 888 proc.getNbOfParameters(), nbOfThreads); 889 890 if (backendList.size() == 0) 891 { 892 releaseLockAndUnlockNextQuery(proc); 893 throw new SQLException (Translate.get( 894 "loadbalancer.backend.no.required.storedprocedure", proc 895 .getProcedureKey())); 896 } 897 898 task.setTotalNb(backendList.size()); 899 atomicTaskPostInQueueAndReleaseLock(proc, task, backendList, 900 removeFromTotalOrderQueue); 901 902 synchronized (task) 903 { 904 if (!task.hasCompleted()) 905 waitForTaskCompletion(proc.getTimeout() * 1000L, String.valueOf(proc 906 .getId()), task); 907 908 checkTaskCompletion(task); 909 return task; 910 } 911 } 912 913 924 private void checkTaskCompletion(AbstractTask task) 925 throws NoMoreBackendException, AllBackendsFailedException, SQLException 926 { 927 AbstractRequest request = task.getRequest(); 928 929 if (task.getSuccess() > 0) 930 return; 931 932 if (task.getFailed() == 0) 936 { 937 throw new NoMoreBackendException(Translate 938 .get("loadbalancer.backendlist.empty")); 939 } 940 941 if (task.getSuccess() == 0) 942 { 943 List exceptions = task.getExceptions(); 945 if (exceptions == null) 946 throw new AllBackendsFailedException(Translate.get( 947 "loadbalancer.request.failed.all", new Object []{request.getType(), 948 String.valueOf(request.getId())})); 949 else 950 { 951 String errorMsg = Translate.get("loadbalancer.request.failed.stack", 952 new Object []{request.getType(), String.valueOf(request.getId())}) 953 + "\n"; 954 SQLException ex = SQLExceptionFactory.getSQLException(exceptions, 955 errorMsg); 956 logger.error(ex.getMessage()); 957 throw ex; 958 } 959 } 960 } 961 962 965 public ControllerResultSet getPreparedStatementGetMetaData( 966 AbstractRequest request) throws SQLException 967 { 968 try 970 { 971 vdb.acquireReadLockBackendLists(); 972 } 973 catch (InterruptedException e) 974 { 975 String msg = Translate.get( 976 "loadbalancer.backendlist.acquire.readlock.failed", e); 977 logger.error(msg); 978 throw new SQLException (msg); 979 } 980 981 984 DatabaseBackend backend = null; 985 986 try 989 { 990 ArrayList backends = vdb.getBackends(); 991 int size = backends.size(); 992 993 if (size == 0) 994 throw new SQLException (Translate.get( 995 "loadbalancer.execute.no.backend.available", request.getId())); 996 997 for (int i = 0; i < size; i++) 999 { 1000 DatabaseBackend b = (DatabaseBackend) backends.get(i); 1001 if (b.isReadEnabled()) 1002 { 1003 backend = b; 1004 break; 1005 } 1006 } 1007 } 1008 catch (Throwable e) 1009 { 1010 String msg = Translate.get("loadbalancer.execute.find.backend.failed", 1011 new String []{request.getSqlShortForm(vdb.getSqlShortFormLength()), 1012 e.getMessage()}); 1013 logger.error(msg, e); 1014 throw new SQLException (msg); 1015 } 1016 finally 1017 { 1018 vdb.releaseReadLockBackendLists(); 1019 } 1020 1021 if (backend == null) 1022 throw new NoMoreBackendException(Translate.get( 1023 "loadbalancer.execute.no.backend.enabled", request.getId())); 1024 1025 AbstractConnectionManager cm = backend.getConnectionManager(request 1027 .getLogin()); 1028 1029 if (cm == null) 1031 { 1032 String msg = Translate.get("loadbalancer.connectionmanager.not.found", 1033 new String []{request.getLogin(), backend.getName()}); 1034 logger.error(msg); 1035 throw new SQLException (msg); 1036 } 1037 1038 if (request.isAutoCommit()) 1040 { 1041 ControllerResultSet rs = null; 1042 boolean badConnection; 1043 do 1044 { 1045 badConnection = false; 1046 PooledConnection c = null; 1048 try 1049 { 1050 c = cm.retrieveConnectionInAutoCommit(request); 1051 } 1052 catch (UnreachableBackendException e1) 1053 { 1054 logger.error(Translate.get( 1055 "loadbalancer.backend.disabling.unreachable", backend.getName())); 1056 disableBackend(backend, true); 1057 return getPreparedStatementGetMetaData(request); 1059 } 1060 1061 if (c == null) 1063 throw new SQLException (Translate.get( 1064 "loadbalancer.backend.no.connection", backend.getName())); 1065 1066 try 1068 { 1069 rs = preparedStatementGetMetaDataOnBackend( 1070 request.getSqlOrTemplate(), backend, c.getConnection()); 1071 cm.releaseConnectionInAutoCommit(request, c); 1072 } 1073 catch (SQLException e) 1074 { 1075 cm.releaseConnectionInAutoCommit(request, c); 1076 throw SQLExceptionFactory.getSQLException(e, Translate.get( 1077 "loadbalancer.request.failed.on.backend", new String []{ 1078 request.getSqlShortForm(vdb.getSqlShortFormLength()), 1079 backend.getName(), e.getMessage()})); 1080 } 1081 catch (BadConnectionException e) 1082 { cm.deleteConnection(c); 1084 badConnection = true; 1085 } 1086 catch (Throwable e) 1087 { 1088 cm.releaseConnectionInAutoCommit(request, c); 1089 throw new SQLException (Translate.get( 1090 "loadbalancer.request.failed.on.backend", new String []{ 1091 request.getSqlShortForm(vdb.getSqlShortFormLength()), 1092 backend.getName(), e.getMessage()})); 1093 } 1094 } 1095 while (badConnection); 1096 if (logger.isDebugEnabled()) 1097 logger.debug(Translate.get("loadbalancer.execute.on", new String []{ 1098 String.valueOf(request.getId()), backend.getName()})); 1099 return rs; 1100 } 1101 else 1102 { Connection c; 1104 long tid = request.getTransactionId(); 1105 1106 try 1107 { 1108 c = backend 1109 .getConnectionForTransactionAndLazyBeginIfNeeded(request, cm); 1110 } 1111 catch (UnreachableBackendException e1) 1112 { 1113 logger.error(Translate.get( 1114 "loadbalancer.backend.disabling.unreachable", backend.getName())); 1115 disableBackend(backend, true); 1116 throw new NoMoreBackendException(Translate.get( 1117 "loadbalancer.backend.unreacheable", backend.getName())); 1118 } 1119 catch (NoTransactionStartWhenDisablingException e) 1120 { 1121 String msg = Translate.get("loadbalancer.backend.is.disabling", 1122 new String []{request.getSqlShortForm(vdb.getSqlShortFormLength()), 1123 backend.getName()}); 1124 logger.error(msg); 1125 throw new SQLException (msg); 1126 } 1127 1128 if (c == null) 1130 throw new SQLException (Translate.get( 1131 "loadbalancer.unable.retrieve.connection", new String []{ 1132 String.valueOf(tid), backend.getName()})); 1133 1134 ControllerResultSet rs = null; 1136 try 1137 { 1138 rs = preparedStatementGetMetaDataOnBackend(request.getSqlOrTemplate(), 1139 backend, c); 1140 } 1141 catch (SQLException e) 1142 { 1143 throw e; 1144 } 1145 catch (BadConnectionException e) 1146 { cm.deleteConnection(tid); 1149 String msg = Translate.get( 1150 "loadbalancer.backend.disabling.connection.failure", backend 1151 .getName()); 1152 logger.error(msg); 1153 disableBackend(backend, true); 1154 throw new SQLException (msg); 1155 } 1156 catch (Throwable e) 1157 { 1158 throw new SQLException (Translate.get( 1159 "loadbalancer.request.failed.on.backend", new String []{ 1160 request.getSqlShortForm(vdb.getSqlShortFormLength()), 1161 backend.getName(), e.getMessage()})); 1162 } 1163 if (logger.isDebugEnabled()) 1164 logger.debug(Translate.get("loadbalancer.execute.transaction.on", 1165 new String []{String.valueOf(tid), String.valueOf(request.getId()), 1166 backend.getName()})); 1167 return rs; 1168 } 1169 } 1170 1171 1174 1177 public void abort(TransactionMetaData tm) throws SQLException 1178 { 1179 long tid = tm.getTransactionId(); 1180 1181 DistributedRollback toqObject = null; 1182 1189 if (vdb.getTotalOrderQueue() != null) 1190 { 1191 toqObject = new DistributedRollback(tm.getLogin(), tid); 1192 waitForTotalOrder(toqObject, false); 1193 } 1194 1195 String requestDescription = "abort " + tid; 1197 int nbOfThreads = acquireLockAndCheckNbOfThreads(null, requestDescription); 1198 1199 boolean rollbackInProgress = false; 1200 synchronized (enabledBackends) 1201 { 1202 for (int i = 0; i < nbOfThreads; i++) 1204 { 1205 DatabaseBackend backend = (DatabaseBackend) enabledBackends.get(i); 1206 rollbackInProgress = rollbackInProgress 1207 || backend.getTaskQueues().abortAllQueriesForTransaction(tid); 1208 } 1209 } 1210 1211 backendListLock.releaseRead(); 1213 1214 if (rollbackInProgress) 1215 { if (vdb.getTotalOrderQueue() != null) 1217 removeObjectFromAndNotifyTotalOrderQueue(toqObject); 1218 return; 1219 } 1220 1221 rollback(tm); 1222 } 1223 1224 1230 public final void begin(TransactionMetaData tm) throws SQLException 1231 { 1232 } 1233 1234 1240 public void commit(TransactionMetaData tm) throws SQLException 1241 { 1242 long tid = tm.getTransactionId(); 1243 1244 boolean canTakeReadLock = false; 1246 DistributedCommit totalOrderCommit = null; 1247 if (vdb.getTotalOrderQueue() != null) 1248 { 1249 totalOrderCommit = new DistributedCommit(tm.getLogin(), tid); 1254 canTakeReadLock = waitForTotalOrder(totalOrderCommit, false); 1255 if (!canTakeReadLock) 1256 totalOrderCommit = null; 1258 } 1259 1260 int nbOfThreads = acquireLockAndCheckNbOfThreads(totalOrderCommit, 1262 "commit " + tid); 1263 1264 List commitList = getBackendsWithStartedTransaction(new Long (tid), 1265 nbOfThreads); 1266 1267 int nbOfThreadsToCommit = commitList.size(); 1268 CommitTask task = null; 1269 if (nbOfThreadsToCommit != 0) 1270 { 1271 task = new CommitTask(getNbToWait(nbOfThreadsToCommit), 1272 nbOfThreadsToCommit, tm); 1273 } 1274 1276 synchronized (enabledBackends) 1278 { 1279 for (int i = 0; i < nbOfThreadsToCommit; i++) 1280 { 1281 DatabaseBackend backend = (DatabaseBackend) enabledBackends.get(i); 1282 backend.getTaskQueues().addTaskToBackendTotalOrderQueue(task); 1283 } 1284 } 1285 1286 backendListLock.releaseRead(); 1288 1289 if (totalOrderCommit != null) 1291 removeObjectFromAndNotifyTotalOrderQueue(totalOrderCommit); 1292 1293 waitForCommit(task, tm.getTimeout()); 1294 } 1295 1296 1302 public void rollback(TransactionMetaData tm) throws SQLException 1303 { 1304 long tid = tm.getTransactionId(); 1305 1306 DistributedRollback totalOrderRollback = null; 1308 boolean canTakeReadLock = false; 1309 if (vdb.getTotalOrderQueue() != null) 1310 { 1311 totalOrderRollback = new DistributedRollback(tm.getLogin(), tid); 1312 canTakeReadLock = waitForTotalOrder(totalOrderRollback, false); 1317 if (!canTakeReadLock) 1318 totalOrderRollback = null; 1320 } 1321 1322 int nbOfThreads = acquireLockAndCheckNbOfThreads(totalOrderRollback, 1324 "rollback " + tid); 1325 1326 List rollbackList = getBackendsWithStartedTransaction(new Long (tid), 1328 nbOfThreads); 1329 1330 int nbOfThreadsToRollback = rollbackList.size(); 1331 RollbackTask task = null; 1332 if (nbOfThreadsToRollback != 0) 1333 task = new RollbackTask(getNbToWait(nbOfThreadsToRollback), 1334 nbOfThreadsToRollback, tm); 1335 1336 synchronized (enabledBackends) 1338 { 1339 for (int i = 0; i < nbOfThreadsToRollback; i++) 1340 { 1341 DatabaseBackend backend = (DatabaseBackend) enabledBackends.get(i); 1342 backend.getTaskQueues().addTaskToBackendTotalOrderQueue(task); 1343 } 1344 } 1345 1346 backendListLock.releaseRead(); 1348 1349 if (totalOrderRollback != null) 1351 removeObjectFromAndNotifyTotalOrderQueue(totalOrderRollback); 1352 1353 waitForRollback(task, tm.getTimeout()); 1354 } 1355 1356 1363 public void rollbackToSavepoint(TransactionMetaData tm, String savepointName) 1364 throws SQLException 1365 { 1366 long tid = tm.getTransactionId(); 1367 1368 DistributedRollbackToSavepoint totalOrderRollback = null; 1370 boolean canTakeReadLock = false; 1371 if (vdb.getTotalOrderQueue() != null) 1372 { 1373 totalOrderRollback = new DistributedRollbackToSavepoint(tid, 1374 savepointName); 1375 canTakeReadLock = waitForTotalOrder(totalOrderRollback, false); 1380 if (!canTakeReadLock) 1381 totalOrderRollback = null; 1383 } 1384 1385 int nbOfThreads = acquireLockAndCheckNbOfThreads(totalOrderRollback, 1387 "rollback " + savepointName + " " + tid); 1388 1389 List rollbackList = getBackendsWithStartedTransaction(new Long (tid), 1391 nbOfThreads); 1392 1393 int nbOfThreadsToRollback = rollbackList.size(); 1394 RollbackToSavepointTask task = null; 1395 if (nbOfThreadsToRollback != 0) 1396 task = new RollbackToSavepointTask(getNbToWait(nbOfThreadsToRollback), 1397 nbOfThreadsToRollback, tm, savepointName); 1398 1399 synchronized (enabledBackends) 1401 { 1402 for (int i = 0; i < nbOfThreadsToRollback; i++) 1403 { 1404 DatabaseBackend backend = (DatabaseBackend) enabledBackends.get(i); 1405 backend.getTaskQueues().addTaskToBackendTotalOrderQueue(task); 1406 } 1407 } 1408 1409 backendListLock.releaseRead(); 1411 1412 if (totalOrderRollback != null) 1414 removeObjectFromAndNotifyTotalOrderQueue(totalOrderRollback); 1415 1416 waitForRollbackToSavepoint(task, savepointName, tm.getTimeout()); 1417 } 1418 1419 1426 public void releaseSavepoint(TransactionMetaData tm, String savepointName) 1427 throws SQLException 1428 { 1429 long tid = tm.getTransactionId(); 1430 1431 DistributedReleaseSavepoint totalOrderRelease = null; 1433 boolean canTakeReadLock = false; 1434 if (vdb.getTotalOrderQueue() != null) 1435 { 1436 totalOrderRelease = new DistributedReleaseSavepoint(tid, savepointName); 1437 canTakeReadLock = waitForTotalOrder(totalOrderRelease, false); 1442 if (!canTakeReadLock) 1443 totalOrderRelease = null; 1445 } 1446 1447 int nbOfThreads = acquireLockAndCheckNbOfThreads(totalOrderRelease, 1449 "release savepoint " + savepointName + " " + tid); 1450 1451 List savepointList = getBackendsWithStartedTransaction(new Long (tid), 1453 nbOfThreads); 1454 1455 int nbOfSavepoints = savepointList.size(); 1456 ReleaseSavepointTask task = null; 1457 if (nbOfSavepoints != 0) 1458 task = new ReleaseSavepointTask(getNbToWait(nbOfThreads), nbOfThreads, 1459 tm, savepointName); 1460 1461 synchronized (enabledBackends) 1463 { 1464 for (int i = 0; i < nbOfSavepoints; i++) 1465 { 1466 DatabaseBackend backend = (DatabaseBackend) enabledBackends.get(i); 1467 backend.getTaskQueues().addTaskToBackendTotalOrderQueue(task); 1468 } 1469 } 1470 1471 backendListLock.releaseRead(); 1473 1474 if (totalOrderRelease != null) 1476 removeObjectFromAndNotifyTotalOrderQueue(totalOrderRelease); 1477 1478 waitForReleaseSavepoint(task, savepointName, tm.getTimeout()); 1479 } 1480 1481 1488 public void setSavepoint(TransactionMetaData tm, String savepointName) 1489 throws SQLException 1490 { 1491 long tid = tm.getTransactionId(); 1492 1493 DistributedSetSavepoint totalOrderSavepoint = null; 1495 boolean canTakeReadLock = false; 1496 if (vdb.getTotalOrderQueue() != null) 1497 { 1498 totalOrderSavepoint = new DistributedSetSavepoint(tm.getLogin(), tid, 1499 savepointName); 1500 canTakeReadLock = waitForTotalOrder(totalOrderSavepoint, false); 1505 if (!canTakeReadLock) 1506 totalOrderSavepoint = null; 1508 } 1509 1510 if (recoveryLog != null) 1512 recoveryLog.logSetSavepoint(tm, savepointName); 1513 1514 String requestDescription = "set savepoint " + savepointName + " " + tid; 1516 int nbOfThreads = acquireLockAndCheckNbOfThreads(null, requestDescription); 1517 1518 SavepointTask task = null; 1519 1520 synchronized (enabledBackends) 1522 { 1523 if (nbOfThreads != 0) 1524 { 1525 task = new SavepointTask(getNbToWait(nbOfThreads), nbOfThreads, tm, 1526 savepointName); 1527 for (int i = 0; i < nbOfThreads; i++) 1528 { 1529 DatabaseBackend backend = (DatabaseBackend) enabledBackends.get(i); 1530 backend.getTaskQueues().addTaskToBackendTotalOrderQueue(task); 1531 } 1532 } 1533 } 1534 1535 backendListLock.releaseRead(); 1537 1538 if (totalOrderSavepoint != null) 1540 removeObjectFromAndNotifyTotalOrderQueue(totalOrderSavepoint); 1541 1542 waitForSavepoint(task, savepointName, tm.getTimeout()); 1543 } 1544 1545 1552 private void waitForCommit(CommitTask task, long timeout) throws SQLException 1556 { 1557 if (task == null) 1559 return; 1560 1561 long tid = task.getTransactionId(); 1562 1563 synchronized (task) 1564 { 1565 if (!task.hasCompleted()) 1566 waitForTaskCompletion(timeout, "commit " + tid, task); 1567 1568 if (task.getSuccess() == 0) 1569 { List exceptions = task.getExceptions(); 1571 if (exceptions == null) 1572 throw new SQLException (Translate.get( 1573 "loadbalancer.commit.all.failed", tid)); 1574 else 1575 { 1576 SQLException ex = SQLExceptionFactory.getSQLException(exceptions, 1577 Translate.get("loadbalancer.commit.failed.stack", tid) + "\n"); 1578 logger.error(ex.getMessage()); 1579 throw ex; 1580 } 1581 } 1582 } 1583 } 1584 1585 1592 private void waitForRollback(RollbackTask task, long timeout) 1596 throws SQLException 1597 { 1598 if (task == null) 1600 return; 1601 1602 long tid = task.getTransactionId(); 1603 1604 synchronized (task) 1605 { 1606 if (!task.hasCompleted()) 1607 waitForTaskCompletion(timeout, "rollback " + tid, task); 1608 1609 if (task.getSuccess() == 0) 1610 { List exceptions = task.getExceptions(); 1612 if (exceptions == null) 1613 throw new SQLException (Translate.get( 1614 "loadbalancer.rollback.all.failed", tid)); 1615 else 1616 { 1617 SQLException ex = SQLExceptionFactory.getSQLException(exceptions, 1618 Translate.get("loadbalancer.rollback.failed.stack", tid) + "\n"); 1619 logger.error(ex.getMessage()); 1620 throw ex; 1621 } 1622 } 1623 } 1624 } 1625 1626 1635 private void waitForRollbackToSavepoint(RollbackToSavepointTask task, 1641 String savepoint, long timeout) throws SQLException 1642 { 1643 if (task == null) 1645 return; 1646 1647 long tid = task.getTransactionId(); 1648 1649 synchronized (task) 1650 { 1651 if (!task.hasCompleted()) 1652 waitForTaskCompletion(timeout, "rollback " + savepoint + " " + tid, 1653 task); 1654 1655 if (task.getSuccess() == 0) 1656 { List exceptions = task.getExceptions(); 1658 if (exceptions == null) 1659 throw new SQLException (Translate.get( 1660 "loadbalancer.rollbacksavepoint.all.failed", savepoint, String 1661 .valueOf(tid))); 1662 else 1663 { 1664 SQLException ex = SQLExceptionFactory.getSQLException(exceptions, 1665 Translate.get("loadbalancer.rollbacksavepoint.failed.stack", 1666 savepoint, String.valueOf(tid)) 1667 + "\n"); 1668 logger.error(ex.getMessage()); 1669 throw ex; 1670 } 1671 } 1672 } 1673 } 1674 1675 1684 private void waitForReleaseSavepoint(ReleaseSavepointTask task, 1690 String savepoint, long timeout) throws SQLException 1691 { 1692 if (task == null) 1694 return; 1695 1696 long tid = task.getTransactionId(); 1697 1698 synchronized (task) 1699 { 1700 if (!task.hasCompleted()) 1701 waitForTaskCompletion(timeout, "release savepoint " + savepoint + " " 1702 + tid, task); 1703 1704 if (task.getSuccess() == 0) 1705 { List exceptions = task.getExceptions(); 1707 if (exceptions == null) 1708 throw new SQLException (Translate.get( 1709 "loadbalancer.releasesavepoint.all.failed", savepoint, String 1710 .valueOf(tid))); 1711 else 1712 { 1713 SQLException ex = SQLExceptionFactory.getSQLException(exceptions, 1714 Translate.get("loadbalancer.releasesavepoint.failed.stack", 1715 savepoint, String.valueOf(tid)) 1716 + "\n"); 1717 logger.error(ex.getMessage()); 1718 throw ex; 1719 } 1720 } 1721 } 1722 } 1723 1724 1732 private void waitForSavepoint(SavepointTask task, String savepoint, 1738 long timeout) throws SQLException 1739 { 1740 if (task == null) 1742 return; 1743 1744 long tid = task.getTransactionId(); 1745 1746 synchronized (task) 1747 { 1748 if (!task.hasCompleted()) 1749 waitForTaskCompletion(timeout, 1750 "set savepoint " + savepoint + " " + tid, task); 1751 1752 if (task.getSuccess() == 0) 1753 { List exceptions = task.getExceptions(); 1755 if (exceptions == null) 1756 throw new SQLException (Translate.get( 1757 "loadbalancer.setsavepoint.all.failed", savepoint, String 1758 .valueOf(tid))); 1759 else 1760 { 1761 SQLException ex = SQLExceptionFactory.getSQLException(exceptions, 1762 Translate.get("loadbalancer.setsavepoint.failed.stack", 1763 savepoint, String.valueOf(tid)) 1764 + "\n"); 1765 logger.error(ex.getMessage()); 1766 throw ex; 1767 } 1768 } 1769 } 1770 } 1771 1772 1784 private void atomicTaskPostInQueueAndReleaseLock(AbstractRequest request, 1785 AbstractTask task, List writeList, boolean removeFromTotalOrderQueue) 1786 { 1787 synchronized (enabledBackends) 1788 { 1789 int nbOfThreads = writeList.size(); 1790 for (int i = 0; i < nbOfThreads; i++) 1791 { 1792 BackendTaskQueues queues = ((DatabaseBackend) writeList.get(i)) 1793 .getTaskQueues(); 1794 queues.addTaskToBackendTotalOrderQueue(task); 1795 } 1796 } 1797 1798 backendListLock.releaseRead(); 1799 1800 if (removeFromTotalOrderQueue) 1802 { 1803 removeObjectFromAndNotifyTotalOrderQueue(request); 1804 } 1805 } 1806 1807 1811 public void closePersistentConnection(String login, 1812 long persistentConnectionId) throws SQLException 1813 { 1814 1820 1821 String requestDescription = "closing persistent connection " 1822 + persistentConnectionId; 1823 int nbOfThreads = 0; 1824 1825 DistributedClosePersistentConnection totalOrderQueueObject = null; 1826 1827 boolean removefromTotalOrder = false; 1828 if (vdb.getTotalOrderQueue() != null) 1829 { 1830 totalOrderQueueObject = new DistributedClosePersistentConnection(login, 1831 persistentConnectionId); 1832 removefromTotalOrder = waitForTotalOrder(totalOrderQueueObject, true); 1833 } 1834 1835 ClosePersistentConnectionTask task = null; 1836 try 1837 { 1838 nbOfThreads = acquireLockAndCheckNbOfThreads(null, requestDescription); 1839 1840 task = new ClosePersistentConnectionTask(getNbToWait(nbOfThreads), 1841 nbOfThreads, login, persistentConnectionId); 1842 1843 synchronized (enabledBackends) 1845 { 1846 for (int i = 0; i < nbOfThreads; i++) 1847 { 1848 DatabaseBackend backend = (DatabaseBackend) enabledBackends.get(i); 1849 backend.getTaskQueues().addTaskToBackendTotalOrderQueue(task); 1850 } 1851 } 1852 1853 backendListLock.releaseRead(); 1855 1856 if (removefromTotalOrder) 1857 removeObjectFromAndNotifyTotalOrderQueue(totalOrderQueueObject); 1858 totalOrderQueueObject = null; 1859 1860 synchronized (task) 1861 { 1862 if (!task.hasCompleted()) 1863 try 1864 { 1865 waitForTaskCompletion(0, requestDescription, task); 1866 } 1867 catch (SQLException ignore) 1868 { 1869 } 1870 } 1871 } 1872 finally 1873 { 1874 if (totalOrderQueueObject != null) 1875 { removeObjectFromAndNotifyTotalOrderQueue(totalOrderQueueObject); 1877 } 1878 1879 if (logger.isDebugEnabled()) 1880 logger.debug(requestDescription + " completed on " + nbOfThreads 1881 + " backends."); 1882 } 1883 } 1884 1885 1889 public void openPersistentConnection(String login, long persistentConnectionId) 1890 throws SQLException 1891 { 1892 String requestDescription = "opening persistent connection " 1893 + persistentConnectionId; 1894 int nbOfThreads = 0; 1895 1896 DistributedOpenPersistentConnection totalOrderQueueObject = null; 1897 if (vdb.getTotalOrderQueue() != null) 1898 { 1899 totalOrderQueueObject = new DistributedOpenPersistentConnection(login, 1900 persistentConnectionId); 1901 waitForTotalOrder(totalOrderQueueObject, true); 1902 } 1903 1904 OpenPersistentConnectionTask task = null; 1905 try 1906 { 1907 nbOfThreads = acquireLockAndCheckNbOfThreads(null, requestDescription); 1908 1909 task = new OpenPersistentConnectionTask(getNbToWait(nbOfThreads), 1910 nbOfThreads, login, persistentConnectionId); 1911 1912 synchronized (enabledBackends) 1914 { 1915 for (int i = 0; i < nbOfThreads; i++) 1916 { 1917 DatabaseBackend backend = (DatabaseBackend) enabledBackends.get(i); 1918 backend.getTaskQueues().addTaskToBackendTotalOrderQueue(task); 1919 } 1920 } 1921 1922 backendListLock.releaseRead(); 1924 1925 removeObjectFromAndNotifyTotalOrderQueue(totalOrderQueueObject); 1926 totalOrderQueueObject = null; 1927 1928 synchronized (task) 1929 { 1930 if (!task.hasCompleted()) 1931 try 1932 { 1933 waitForTaskCompletion(0, requestDescription, task); 1934 } 1935 catch (SQLException ignore) 1936 { 1937 } 1938 } 1939 } 1940 finally 1941 { 1942 if (totalOrderQueueObject != null) 1943 { removeObjectFromAndNotifyTotalOrderQueue(totalOrderQueueObject); 1945 } 1946 1947 if (logger.isDebugEnabled()) 1948 logger.debug(requestDescription + " completed on " + nbOfThreads 1949 + " backends."); 1950 } 1951 } 1952 1953 1965 public synchronized void enableBackend(DatabaseBackend db, 1966 boolean writeEnabled) throws SQLException 1967 { 1968 if (!db.isInitialized()) 1969 db.initializeConnections(); 1970 1971 if (writeEnabled && db.isWriteCanBeEnabled()) 1972 { 1973 db.setTaskQueues(new BackendTaskQueues(db, waitForCompletionPolicy, 1975 this.vdb.getRequestManager())); 1976 db.startWorkerThreads(this); 1977 db.enableWrite(); 1978 } 1979 1980 db.enableRead(); 1981 try 1982 { 1983 backendListLock.acquireWrite(); 1984 } 1985 catch (InterruptedException e) 1986 { 1987 logger.error("Error while acquiring write lock in enableBackend", e); 1988 } 1989 synchronized (enabledBackends) 1990 { 1991 enabledBackends.add(db); 1992 } 1993 backendListLock.releaseWrite(); 1994 } 1995 1996 2008 public void disableBackend(DatabaseBackend db, boolean forceDisable) 2009 throws SQLException 2010 { 2011 if (!db.disable()) 2012 { 2013 return; 2015 } 2016 synchronized (this) 2017 { 2018 try 2019 { 2020 backendListLock.acquireWrite(); 2021 } 2022 catch (InterruptedException e) 2023 { 2024 logger.error("Error while acquiring write lock in enableBackend", e); 2025 } 2026 2027 try 2028 { 2029 synchronized (enabledBackends) 2030 { 2031 enabledBackends.remove(db); 2032 if (enabledBackends.isEmpty()) 2033 { 2034 this.vdb.getRequestManager().setDatabaseSchema(null); 2036 } 2037 } 2038 2039 if (!forceDisable) 2040 terminateThreadsAndConnections(db); 2041 } 2042 finally 2043 { 2044 backendListLock.releaseWrite(); 2045 } 2046 2047 if (forceDisable) 2048 { 2049 db.shutdownConnectionManagers(); 2050 terminateThreadsAndConnections(db); 2051 } 2052 2053 if (!db.getActiveTransactions().isEmpty()) 2055 { 2056 if (logger.isWarnEnabled()) 2057 { 2058 logger.warn("Active transactions after backend " + db.getName() 2059 + " is disabled: " + db.getActiveTransactions()); 2060 } 2061 } 2062 2063 } 2064 } 2065 2066 private void terminateThreadsAndConnections(DatabaseBackend db) 2067 throws SQLException 2068 { 2069 db.terminateWorkerThreads(); 2070 2071 if (db.isInitialized()) 2072 db.finalizeConnections(); 2073 } 2074 2075 2078 public String getXmlImpl() 2079 { 2080 StringBuffer info = new StringBuffer (); 2081 info.append("<" + DatabasesXmlTags.ELT_RAIDb_2 + ">"); 2082 if (createTablePolicy != null) 2083 info.append(createTablePolicy.getXml()); 2084 if (waitForCompletionPolicy != null) 2085 info.append(waitForCompletionPolicy.getXml()); 2086 if (macroHandler != null) 2087 info.append(macroHandler.getXml()); 2088 this.getRaidb2Xml(); 2089 info.append("</" + DatabasesXmlTags.ELT_RAIDb_2 + ">"); 2090 return info.toString(); 2091 } 2092 2093 2098 public abstract String getRaidb2Xml(); 2099 2100} | Popular Tags |