1 24 25 package org.continuent.sequoia.controller.loadbalancer.raidb1; 26 27 import java.sql.Connection ; 28 import java.sql.SQLException ; 29 import java.util.ArrayList ; 30 import java.util.List ; 31 32 import javax.management.ObjectName ; 33 34 import org.continuent.sequoia.common.exceptions.BadConnectionException; 35 import org.continuent.sequoia.common.exceptions.NoMoreBackendException; 36 import org.continuent.sequoia.common.exceptions.NoTransactionStartWhenDisablingException; 37 import org.continuent.sequoia.common.exceptions.SQLExceptionFactory; 38 import org.continuent.sequoia.common.exceptions.UnreachableBackendException; 39 import org.continuent.sequoia.common.i18n.Translate; 40 import org.continuent.sequoia.common.jmx.JmxConstants; 41 import org.continuent.sequoia.common.log.Trace; 42 import org.continuent.sequoia.common.xml.DatabasesXmlTags; 43 import org.continuent.sequoia.controller.backend.DatabaseBackend; 44 import org.continuent.sequoia.controller.backend.result.ControllerResultSet; 45 import org.continuent.sequoia.controller.backend.result.ExecuteResult; 46 import org.continuent.sequoia.controller.backend.result.ExecuteUpdateResult; 47 import org.continuent.sequoia.controller.backend.result.GeneratedKeysResult; 48 import org.continuent.sequoia.controller.cache.metadata.MetadataCache; 49 import org.continuent.sequoia.controller.connection.AbstractConnectionManager; 50 import org.continuent.sequoia.controller.connection.PooledConnection; 51 import org.continuent.sequoia.controller.jmx.MBeanServerManager; 52 import org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer; 53 import org.continuent.sequoia.controller.loadbalancer.AllBackendsFailedException; 54 import org.continuent.sequoia.controller.loadbalancer.BackendTaskQueues; 55 import org.continuent.sequoia.controller.loadbalancer.BackendTaskQueuesControl; 56 import org.continuent.sequoia.controller.loadbalancer.policies.WaitForCompletionPolicy; 57 import org.continuent.sequoia.controller.loadbalancer.tasks.AbstractTask; 58 import org.continuent.sequoia.controller.loadbalancer.tasks.CallableStatementExecuteQueryTask; 59 import org.continuent.sequoia.controller.loadbalancer.tasks.CallableStatementExecuteTask; 60 import org.continuent.sequoia.controller.loadbalancer.tasks.CallableStatementExecuteUpdateTask; 61 import org.continuent.sequoia.controller.loadbalancer.tasks.ClosePersistentConnectionTask; 62 import org.continuent.sequoia.controller.loadbalancer.tasks.CommitTask; 63 import org.continuent.sequoia.controller.loadbalancer.tasks.OpenPersistentConnectionTask; 64 import org.continuent.sequoia.controller.loadbalancer.tasks.ReleaseSavepointTask; 65 import org.continuent.sequoia.controller.loadbalancer.tasks.RollbackTask; 66 import org.continuent.sequoia.controller.loadbalancer.tasks.RollbackToSavepointTask; 67 import org.continuent.sequoia.controller.loadbalancer.tasks.SavepointTask; 68 import org.continuent.sequoia.controller.loadbalancer.tasks.StatementExecuteQueryTask; 69 import org.continuent.sequoia.controller.loadbalancer.tasks.StatementExecuteTask; 70 import org.continuent.sequoia.controller.loadbalancer.tasks.StatementExecuteUpdateTask; 71 import org.continuent.sequoia.controller.loadbalancer.tasks.StatementExecuteUpdateWithKeysTask; 72 import org.continuent.sequoia.controller.requestmanager.RAIDbLevels; 73 import org.continuent.sequoia.controller.requestmanager.TransactionMetaData; 74 import org.continuent.sequoia.controller.requests.AbstractRequest; 75 import org.continuent.sequoia.controller.requests.AbstractWriteRequest; 76 import org.continuent.sequoia.controller.requests.ParsingGranularities; 77 import org.continuent.sequoia.controller.requests.SelectRequest; 78 import org.continuent.sequoia.controller.requests.StoredProcedure; 79 import org.continuent.sequoia.controller.semantic.SemanticBehavior; 80 import org.continuent.sequoia.controller.virtualdatabase.VirtualDatabase; 81 import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedClosePersistentConnection; 82 import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedCommit; 83 import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedOpenPersistentConnection; 84 import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedReleaseSavepoint; 85 import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedRollback; 86 import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedRollbackToSavepoint; 87 import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedSetSavepoint; 88 89 99 public abstract class RAIDb1 extends AbstractLoadBalancer 100 { 101 111 protected static Trace logger = Trace 112 .getLogger("org.continuent.sequoia.controller.loadbalancer.RAIDb1"); 113 114 protected static Trace endUserLogger = Trace 115 .getLogger("org.continuent.sequoia.enduser"); 116 117 120 121 130 public RAIDb1(VirtualDatabase vdb, 131 WaitForCompletionPolicy waitForCompletionPolicy) throws Exception 132 { 133 super(vdb, RAIDbLevels.RAIDb1, ParsingGranularities.TABLE); 134 this.waitForCompletionPolicy = waitForCompletionPolicy; 135 } 136 137 140 141 153 public ControllerResultSet statementExecuteQuery(SelectRequest request, 154 MetadataCache metadataCache) throws SQLException , 155 AllBackendsFailedException 156 { 157 if (request.isMustBroadcast()) 158 return execBroadcastReadRequest(request, metadataCache); 159 else 160 return execSingleBackendReadRequest(request, metadataCache); 161 } 162 163 172 public abstract ControllerResultSet execSingleBackendReadRequest( 173 SelectRequest request, MetadataCache metadataCache) throws SQLException ; 174 175 188 private ControllerResultSet execBroadcastReadRequest(SelectRequest request, 189 MetadataCache metadataCache) throws SQLException , 190 AllBackendsFailedException, NoMoreBackendException 191 { 192 handleMacros(request); 194 195 boolean removeFromTotalOrderQueue = waitForTotalOrder(request, true); 197 198 if (request.isLazyTransactionStart()) 200 this.vdb.getRequestManager().logLazyTransactionBegin( 201 request.getTransactionId()); 202 203 if (recoveryLog != null) 205 request.setLogId(recoveryLog.logRequestExecuting(request)); 206 207 int nbOfThreads = acquireLockAndCheckNbOfThreads(request, String 208 .valueOf(request.getId())); 209 210 StatementExecuteQueryTask task = new StatementExecuteQueryTask(1, 212 nbOfThreads, request, metadataCache); 213 214 atomicTaskPostInQueueAndReleaseLock(request, task, nbOfThreads, 215 removeFromTotalOrderQueue); 216 217 synchronized (task) 218 { 219 if (!task.hasCompleted()) 220 waitForTaskCompletion(request.getTimeout() * 1000L, String 221 .valueOf(request.getId()), task); 222 223 checkTaskCompletion(task); 224 } 225 226 if (recoveryLog != null) 228 recoveryLog.logRequestCompletion(request.getLogId(), true, request 229 .getExecTimeInMs()); 230 231 return task.getResult(); 232 } 233 234 245 public ExecuteUpdateResult statementExecuteUpdate(AbstractWriteRequest request) 246 throws AllBackendsFailedException, NoMoreBackendException, SQLException 247 { 248 return ((StatementExecuteUpdateTask) execWriteRequest(request, false, null)) 249 .getResult(); 250 } 251 252 264 public GeneratedKeysResult statementExecuteUpdateWithKeys( 265 AbstractWriteRequest request, MetadataCache metadataCache) 266 throws AllBackendsFailedException, NoMoreBackendException, SQLException 267 { 268 return ((StatementExecuteUpdateWithKeysTask) execWriteRequest(request, 269 true, metadataCache)).getResult(); 270 } 271 272 276 public ExecuteResult statementExecute(AbstractRequest request, 277 MetadataCache metadataCache) throws SQLException , 278 AllBackendsFailedException 279 { 280 StatementExecuteTask task = (StatementExecuteTask) callStoredProcedure( 281 request, STATEMENT_EXECUTE_TASK, metadataCache); 282 return task.getResult(); 283 } 284 285 300 private AbstractTask execWriteRequest(AbstractWriteRequest request, 301 boolean useKeys, MetadataCache metadataCache) 302 throws AllBackendsFailedException, NoMoreBackendException, SQLException 303 { 304 handleMacros(request); 306 307 boolean removeFromTotalOrderQueue = waitForTotalOrder(request, true); 309 310 if (request.isLazyTransactionStart()) 312 this.vdb.getRequestManager().logLazyTransactionBegin( 313 request.getTransactionId()); 314 315 if (recoveryLog != null) 317 recoveryLog.logRequestExecuting(request); 318 319 int nbOfThreads = acquireLockAndCheckNbOfThreads(request, String 320 .valueOf(request.getId())); 321 322 AbstractTask task; 324 if (useKeys) 325 task = new StatementExecuteUpdateWithKeysTask(getNbToWait(nbOfThreads), 326 nbOfThreads, request, metadataCache); 327 else 328 task = new StatementExecuteUpdateTask(getNbToWait(nbOfThreads), 329 nbOfThreads, request); 330 331 atomicTaskPostInQueueAndReleaseLock(request, task, nbOfThreads, 332 removeFromTotalOrderQueue); 333 334 try 335 { 336 synchronized (task) 337 { 338 if (!task.hasCompleted()) 339 waitForTaskCompletion(request.getTimeout() * 1000L, String 340 .valueOf(request.getId()), task); 341 342 checkTaskCompletion(task); 343 return task; 344 } 345 } 346 finally 347 { 348 if (!request.isAutoCommit()) 349 { try 351 { 352 this.vdb.getRequestManager().getTransactionMetaData( 353 new Long (request.getTransactionId())); 354 } 355 catch (SQLException e) 356 { logger 359 .info("Concurrent abort detected, re-enforcing abort of transaction " 360 + request.getTransactionId()); 361 abort(new TransactionMetaData(request.getTransactionId(), 0, request 362 .getLogin(), request.isPersistentConnection(), request 363 .getPersistentConnectionId())); 364 } 365 } 366 } 367 } 368 369 protected static final int STATEMENT_EXECUTE_QUERY = 0; 370 protected static final int CALLABLE_STATEMENT_EXECUTE_QUERY = 1; 371 protected static final int CALLABLE_STATEMENT_EXECUTE = 2; 372 373 382 protected ControllerResultSet executeRequestOnBackend(SelectRequest request, 383 DatabaseBackend backend, MetadataCache metadataCache) 384 throws SQLException , UnreachableBackendException 385 { 386 handleMacros(request); 388 389 AbstractConnectionManager cm = backend.getConnectionManager(request 391 .getLogin()); 392 393 if (cm == null) 395 { 396 String msg = Translate.get("loadbalancer.connectionmanager.not.found", 397 new String []{request.getLogin(), backend.getName()}); 398 logger.error(msg); 399 throw new SQLException (msg); 400 } 401 402 if (request.isAutoCommit()) 404 { 405 ControllerResultSet rs = null; 406 boolean badConnection; 407 do 408 { 409 badConnection = false; 410 PooledConnection c = null; 412 try 413 { 414 c = cm.retrieveConnectionInAutoCommit(request); 415 } 416 catch (UnreachableBackendException e1) 417 { 418 String msg = Translate.get( 419 "loadbalancer.backend.disabling.unreachable", backend.getName()); 420 logger.error(msg); 421 endUserLogger.error(msg); 422 disableBackend(backend, true); 423 throw new UnreachableBackendException(Translate.get( 424 "loadbalancer.backend.unreacheable", backend.getName())); 425 } 426 427 if (c == null) 429 throw new UnreachableBackendException(Translate.get( 430 "loadbalancer.backend.no.connection", backend.getName())); 431 432 try 434 { 435 rs = executeStatementExecuteQueryOnBackend(request, backend, null, c 436 .getConnection(), metadataCache); 437 cm.releaseConnectionInAutoCommit(request, c); 438 } 439 catch (SQLException e) 440 { 441 cm.releaseConnectionInAutoCommit(request, c); 442 throw SQLExceptionFactory.getSQLException(e, Translate.get( 443 "loadbalancer.request.failed.on.backend", new String []{ 444 request.getSqlShortForm(vdb.getSqlShortFormLength()), 445 backend.getName(), e.getMessage()})); 446 } 447 catch (BadConnectionException e) 448 { cm.deleteConnection(c); 450 if (request.isPersistentConnection()) 451 { 452 cm.deletePersistentConnection(request.getPersistentConnectionId()); 453 } 454 badConnection = true; 455 } 456 catch (UnreachableBackendException e) 457 { 458 String msg = Translate.get( 459 "loadbalancer.backend.disabling.unreachable", backend.getName()); 460 logger.error(msg); 461 endUserLogger.error(msg); 462 disableBackend(backend, true); 463 throw new UnreachableBackendException(Translate.get( 464 "loadbalancer.backend.unreacheable", backend.getName())); 465 } 466 catch (Throwable e) 467 { 468 469 logger.error("Unexpected exception:", e); 470 cm.releaseConnectionInAutoCommit(request, c); 471 throw new SQLException (Translate.get( 472 "loadbalancer.request.failed.on.backend", new String []{ 473 request.getSqlShortForm(vdb.getSqlShortFormLength()), 474 backend.getName(), e.getMessage()})); 475 } 476 } 477 while (badConnection); 478 if (logger.isDebugEnabled()) 479 logger.debug(Translate.get("loadbalancer.execute.on", new String []{ 480 String.valueOf(request.getId()), backend.getName()})); 481 return rs; 482 } 483 else 484 { Connection c; 486 long tid = request.getTransactionId(); 487 488 try 489 { 490 c = backend 491 .getConnectionForTransactionAndLazyBeginIfNeeded(request, cm); 492 } 493 catch (UnreachableBackendException e1) 494 { 495 String msg = Translate.get( 496 "loadbalancer.backend.disabling.unreachable", backend.getName()); 497 logger.error(msg); 498 endUserLogger.error(msg); 499 disableBackend(backend, true); 500 throw new UnreachableBackendException(Translate.get( 501 "loadbalancer.backend.unreacheable", backend.getName())); 502 } 503 catch (NoTransactionStartWhenDisablingException e) 504 { 505 String msg = Translate.get("loadbalancer.backend.is.disabling", 506 new String []{request.getSqlShortForm(vdb.getSqlShortFormLength()), 507 backend.getName()}); 508 logger.error(msg); 509 throw new UnreachableBackendException(msg); 510 } 511 512 if (c == null) 514 throw new SQLException (Translate.get( 515 "loadbalancer.unable.retrieve.connection", new String []{ 516 String.valueOf(tid), backend.getName()})); 517 518 ControllerResultSet rs = null; 520 try 521 { 522 rs = executeStatementExecuteQueryOnBackend(request, backend, null, c, 523 metadataCache); 524 } 525 catch (SQLException e) 526 { 527 throw SQLExceptionFactory.getSQLException(e, Translate.get( 528 "loadbalancer.request.failed.on.backend", new String []{ 529 request.getSqlShortForm(vdb.getSqlShortFormLength()), 530 backend.getName(), e.getMessage()})); 531 } 532 catch (BadConnectionException e) 533 { cm.deleteConnection(tid); 536 String msg = Translate.get( 537 "loadbalancer.backend.disabling.connection.failure", backend 538 .getName()); 539 logger.error(msg); 540 endUserLogger.error(msg); 541 disableBackend(backend, true); 542 throw new UnreachableBackendException(msg); 543 } 544 catch (UnreachableBackendException e) 545 { 546 String msg = Translate.get( 547 "loadbalancer.backend.disabling.unreachable", backend.getName()); 548 logger.error(msg); 549 endUserLogger.error(msg); 550 disableBackend(backend, true); 551 throw e; 552 } 553 catch (Throwable e) 554 { 555 logger.error("Unexpected exception:", e); 556 throw new SQLException (Translate.get( 557 "loadbalancer.request.failed.on.backend", new String []{ 558 request.getSqlShortForm(vdb.getSqlShortFormLength()), 559 backend.getName(), e.getMessage()})); 560 } 561 if (logger.isDebugEnabled()) 562 logger.debug(Translate.get("loadbalancer.execute.transaction.on", 563 new String []{String.valueOf(tid), String.valueOf(request.getId()), 564 backend.getName()})); 565 return rs; 566 } 567 } 568 569 581 protected Object executeStoredProcedureOnBackend(StoredProcedure proc, 582 boolean isExecuteQuery, DatabaseBackend backend, 583 MetadataCache metadataCache) throws SQLException , 584 UnreachableBackendException 585 { 586 AbstractConnectionManager cm = backend 588 .getConnectionManager(proc.getLogin()); 589 590 if (cm == null) 592 { 593 String msg = Translate.get("loadbalancer.connectionmanager.not.found", 594 new String []{proc.getLogin(), backend.getName()}); 595 logger.error(msg); 596 throw new SQLException (msg); 597 } 598 599 if (proc.isAutoCommit()) 601 { 602 Object result = null; 603 boolean badConnection; 604 PooledConnection c = null; 605 do 606 { 607 badConnection = false; 608 PooledConnection previousConnection = c; 609 try 611 { 612 c = cm.retrieveConnectionInAutoCommit(proc); 613 } 614 catch (UnreachableBackendException e1) 615 { 616 String msg = Translate.get( 617 "loadbalancer.backend.disabling.unreachable", backend.getName()); 618 logger.error(msg); 619 endUserLogger.error(msg); 620 disableBackend(backend, true); 621 throw new UnreachableBackendException(Translate.get( 622 "loadbalancer.backend.unreacheable", backend.getName())); 623 } 624 625 if (c == null || c == previousConnection) 627 throw new UnreachableBackendException(Translate.get( 628 "loadbalancer.backend.no.connection", backend.getName())); 629 630 try 632 { 633 if (isExecuteQuery) 634 result = AbstractLoadBalancer 635 .executeCallableStatementExecuteQueryOnBackend(proc, backend, 636 null, c.getConnection(), metadataCache); 637 else 638 result = AbstractLoadBalancer 639 .executeCallableStatementExecuteOnBackend(proc, backend, null, 640 c.getConnection(), metadataCache); 641 } 642 catch (BadConnectionException e) 643 { cm.deleteConnection(c); 645 if (proc.isPersistentConnection()) 646 cm.deletePersistentConnection(proc.getPersistentConnectionId()); 647 badConnection = true; 648 } 649 catch (Throwable e) 650 { 651 logger.error("Unexpected exception:", e); 652 throw new SQLException (Translate.get( 653 "loadbalancer.storedprocedure.failed.on.backend", new String []{ 654 proc.getSqlShortForm(vdb.getSqlShortFormLength()), 655 backend.getName(), e.getMessage()})); 656 } 657 finally 658 { 659 cm.releaseConnectionInAutoCommit(proc, c); 660 } 661 } 662 while (badConnection); 663 664 if (logger.isDebugEnabled()) 665 logger.debug(Translate.get("loadbalancer.storedprocedure.on", 666 new String []{String.valueOf(proc.getId()), backend.getName()})); 667 668 return result; 669 } 670 else 671 { Connection c; 673 long tid = proc.getTransactionId(); 674 675 try 676 { 677 c = backend.getConnectionForTransactionAndLazyBeginIfNeeded(proc, cm); 678 } 679 catch (UnreachableBackendException e) 680 { 681 endUserLogger.error(Translate.get( 684 "loadbalancer.backend.disabling.unreachable", backend.getName())); 685 disableBackend(backend, true); 686 throw e; 687 } 688 catch (NoTransactionStartWhenDisablingException e) 689 { 690 String msg = Translate.get("loadbalancer.backend.is.disabling", 691 new String []{proc.getSqlShortForm(vdb.getSqlShortFormLength()), 692 backend.getName()}); 693 logger.error(msg); 694 throw new UnreachableBackendException(msg); 695 } 696 697 if (c == null) 699 throw new SQLException (Translate.get( 700 "loadbalancer.unable.retrieve.connection", new String []{ 701 String.valueOf(tid), backend.getName()})); 702 703 try 705 { 706 if (logger.isDebugEnabled()) 707 logger.debug(Translate.get("loadbalancer.execute.transaction.on", 708 new String []{String.valueOf(tid), String.valueOf(proc.getId()), 709 backend.getName()})); 710 if (isExecuteQuery) 711 return AbstractLoadBalancer 712 .executeCallableStatementExecuteQueryOnBackend(proc, backend, 713 null, c, metadataCache); 714 else 715 return AbstractLoadBalancer.executeCallableStatementExecuteOnBackend( 716 proc, backend, null, c, metadataCache); 717 } 718 catch (BadConnectionException e) 719 { cm.deleteConnection(tid); 722 String msg = Translate.get( 723 "loadbalancer.backend.disabling.connection.failure", backend 724 .getName()); 725 logger.error(msg); 726 endUserLogger.error(msg); 727 disableBackend(backend, true); 728 throw new UnreachableBackendException(msg); 729 } 730 catch (Throwable e) 731 { 732 logger.error("Unexpected exception:", e); 733 throw new SQLException (Translate.get( 734 "loadbalancer.storedprocedure.failed.on.backend", new String []{ 735 proc.getSqlShortForm(vdb.getSqlShortFormLength()), 736 backend.getName(), e.getMessage()})); 737 } 738 } 739 } 740 741 745 public ControllerResultSet callableStatementExecuteQuery( 746 StoredProcedure proc, MetadataCache metadataCache) throws SQLException , 747 AllBackendsFailedException 748 { 749 CallableStatementExecuteQueryTask task = (CallableStatementExecuteQueryTask) callStoredProcedure( 750 proc, EXECUTE_QUERY_TASK, metadataCache); 751 return task.getResult(); 752 } 753 754 757 public ExecuteUpdateResult callableStatementExecuteUpdate(StoredProcedure proc) 758 throws SQLException , AllBackendsFailedException 759 { 760 CallableStatementExecuteUpdateTask task = (CallableStatementExecuteUpdateTask) callStoredProcedure( 761 proc, EXECUTE_UPDATE_TASK, null); 762 return task.getResult(); 763 } 764 765 769 public ExecuteResult callableStatementExecute(StoredProcedure proc, 770 MetadataCache metadataCache) throws SQLException , 771 AllBackendsFailedException 772 { 773 CallableStatementExecuteTask task = (CallableStatementExecuteTask) callStoredProcedure( 774 proc, CALLABLE_EXECUTE_TASK, metadataCache); 775 return task.getResult(); 776 } 777 778 private static final int EXECUTE_QUERY_TASK = 0; 779 private static final int EXECUTE_UPDATE_TASK = 1; 780 private static final int CALLABLE_EXECUTE_TASK = 2; 781 private static final int STATEMENT_EXECUTE_TASK = 3; 782 783 801 private AbstractTask callStoredProcedure(AbstractRequest request, 802 int taskType, MetadataCache metadataCache) throws SQLException , 803 AllBackendsFailedException, NoMoreBackendException 804 { 805 handleMacros(request); 807 808 boolean removeFromTotalOrderQueue = waitForTotalOrder(request, true); 810 811 if (request.isLazyTransactionStart()) 813 this.vdb.getRequestManager().logLazyTransactionBegin( 814 request.getTransactionId()); 815 816 if (recoveryLog != null) 818 { 819 boolean mustLog = !request.isReadOnly(); 820 if (taskType != STATEMENT_EXECUTE_TASK) 821 { SemanticBehavior semantic = ((StoredProcedure) request).getSemantic(); 823 mustLog = (semantic == null) || !semantic.isReadOnly(); 824 } 825 if (mustLog) 826 recoveryLog.logRequestExecuting(request); 827 } 828 829 int nbOfThreads = acquireLockAndCheckNbOfThreads(request, String 830 .valueOf(request.getId())); 831 832 AbstractTask task; 834 switch (taskType) 835 { 836 case EXECUTE_QUERY_TASK : 837 task = new CallableStatementExecuteQueryTask(getNbToWait(nbOfThreads), 838 nbOfThreads, (StoredProcedure) request, metadataCache); 839 break; 840 case EXECUTE_UPDATE_TASK : 841 task = new CallableStatementExecuteUpdateTask(getNbToWait(nbOfThreads), 842 nbOfThreads, (StoredProcedure) request); 843 break; 844 case CALLABLE_EXECUTE_TASK : 845 task = new CallableStatementExecuteTask(getNbToWait(nbOfThreads), 846 nbOfThreads, (StoredProcedure) request, metadataCache); 847 break; 848 case STATEMENT_EXECUTE_TASK : 849 task = new StatementExecuteTask(getNbToWait(nbOfThreads), nbOfThreads, 850 (AbstractWriteRequest) request, metadataCache); 851 break; 852 default : 853 throw new RuntimeException ("Unhandled task type " + taskType 854 + " in callStoredProcedure"); 855 } 856 857 atomicTaskPostInQueueAndReleaseLock(request, task, nbOfThreads, 858 removeFromTotalOrderQueue); 859 860 try 861 { 862 synchronized (task) 863 { 864 if (!task.hasCompleted()) 865 waitForTaskCompletion(request.getTimeout() * 1000L, String 866 .valueOf(request.getId()), task); 867 868 checkTaskCompletion(task); 869 return task; 870 } 871 } 872 finally 873 { 874 if (!request.isAutoCommit()) 875 { try 877 { 878 this.vdb.getRequestManager().getTransactionMetaData( 879 new Long (request.getTransactionId())); 880 } 881 catch (SQLException e) 882 { logger 885 .info("Concurrent abort detected, re-inforcing abort of transaction " 886 + request.getTransactionId()); 887 abort(new TransactionMetaData(request.getTransactionId(), 0, request 888 .getLogin(), request.isPersistentConnection(), request 889 .getPersistentConnectionId())); 890 } 891 } 892 } 893 } 894 895 906 private void checkTaskCompletion(AbstractTask task) 907 throws NoMoreBackendException, AllBackendsFailedException, SQLException 908 { 909 AbstractRequest request = task.getRequest(); 910 911 if (task.getSuccess() > 0) 912 return; 913 914 if (task.getFailed() == 0) 918 { 919 throw new NoMoreBackendException(Translate 920 .get("loadbalancer.backendlist.empty")); 921 } 922 923 if (task.getSuccess() == 0) 924 { 925 List exceptions = task.getExceptions(); 927 if (exceptions == null) 928 throw new AllBackendsFailedException(Translate.get( 929 "loadbalancer.request.failed.all", new Object []{request.getType(), 930 String.valueOf(request.getId())})); 931 else 932 { 933 String errorMsg = Translate.get("loadbalancer.request.failed.stack", 934 new Object []{request.getType(), String.valueOf(request.getId())}) 935 + "\n"; 936 SQLException ex = SQLExceptionFactory.getSQLException(exceptions, 937 errorMsg); 938 logger.debug(ex.getMessage()); 940 throw ex; 941 } 942 } 943 } 944 945 948 public ControllerResultSet getPreparedStatementGetMetaData( 949 AbstractRequest request) throws SQLException 950 { 951 try 953 { 954 vdb.acquireReadLockBackendLists(); 955 } 956 catch (InterruptedException e) 957 { 958 String msg = Translate.get( 959 "loadbalancer.backendlist.acquire.readlock.failed", e); 960 logger.error(msg); 961 throw new SQLException (msg); 962 } 963 964 967 DatabaseBackend backend = null; 968 969 try 972 { 973 ArrayList backends = vdb.getBackends(); 974 int size = backends.size(); 975 976 if (size == 0) 977 throw new SQLException (Translate.get( 978 "loadbalancer.execute.no.backend.available", request.getId())); 979 980 for (int i = 0; i < size; i++) 982 { 983 DatabaseBackend b = (DatabaseBackend) backends.get(i); 984 if (b.isReadEnabled()) 985 { 986 backend = b; 987 break; 988 } 989 } 990 } 991 catch (Throwable e) 992 { 993 String msg = Translate.get("loadbalancer.execute.find.backend.failed", 994 new String []{request.getSqlShortForm(vdb.getSqlShortFormLength()), 995 e.getMessage()}); 996 logger.error(msg, e); 997 throw new SQLException (msg); 998 } 999 finally 1000 { 1001 vdb.releaseReadLockBackendLists(); 1002 } 1003 1004 if (backend == null) 1005 throw new NoMoreBackendException(Translate.get( 1006 "loadbalancer.execute.no.backend.enabled", request.getId())); 1007 1008 AbstractConnectionManager cm = backend.getConnectionManager(request 1010 .getLogin()); 1011 1012 if (cm == null) 1014 { 1015 String msg = Translate.get("loadbalancer.connectionmanager.not.found", 1016 new String []{request.getLogin(), backend.getName()}); 1017 logger.error(msg); 1018 throw new SQLException (msg); 1019 } 1020 1021 if (request.isAutoCommit()) 1023 { 1024 ControllerResultSet rs = null; 1025 boolean badConnection; 1026 do 1027 { 1028 badConnection = false; 1029 PooledConnection c = null; 1031 try 1032 { 1033 c = cm.retrieveConnectionInAutoCommit(request); 1034 } 1035 catch (UnreachableBackendException e1) 1036 { 1037 String msg = Translate.get( 1038 "loadbalancer.backend.disabling.unreachable", backend.getName()); 1039 logger.error(msg); 1040 endUserLogger.error(msg); 1041 disableBackend(backend, true); 1042 return getPreparedStatementGetMetaData(request); 1044 } 1045 1046 if (c == null) 1048 throw new SQLException (Translate.get( 1049 "loadbalancer.backend.no.connection", backend.getName())); 1050 1051 try 1053 { 1054 rs = preparedStatementGetMetaDataOnBackend( 1055 request.getSqlOrTemplate(), backend, c.getConnection()); 1056 cm.releaseConnectionInAutoCommit(request, c); 1057 } 1058 catch (SQLException e) 1059 { 1060 cm.releaseConnectionInAutoCommit(request, c); 1061 throw SQLExceptionFactory.getSQLException(e, Translate.get( 1062 "loadbalancer.request.failed.on.backend", new String []{ 1063 request.getSqlShortForm(vdb.getSqlShortFormLength()), 1064 backend.getName(), e.getMessage()})); 1065 } 1066 catch (BadConnectionException e) 1067 { cm.deleteConnection(c); 1069 badConnection = true; 1070 } 1071 catch (Throwable e) 1072 { 1073 cm.releaseConnectionInAutoCommit(request, c); 1074 1075 logger.error("Unexpected exception:", e); 1076 throw new SQLException (Translate.get( 1077 "loadbalancer.request.failed.on.backend", new String []{ 1078 request.getSqlShortForm(vdb.getSqlShortFormLength()), 1079 backend.getName(), e.getMessage()})); 1080 } 1081 } 1082 while (badConnection); 1083 if (logger.isDebugEnabled()) 1084 logger.debug(Translate.get("loadbalancer.execute.on", new String []{ 1085 String.valueOf(request.getId()), backend.getName()})); 1086 return rs; 1087 } 1088 else 1089 { Connection c; 1091 long tid = request.getTransactionId(); 1092 1093 try 1094 { 1095 c = backend 1096 .getConnectionForTransactionAndLazyBeginIfNeeded(request, cm); 1097 } 1098 catch (UnreachableBackendException e1) 1099 { 1100 String msg = Translate.get( 1101 "loadbalancer.backend.disabling.unreachable", backend.getName()); 1102 logger.error(msg); 1103 endUserLogger.error(msg); 1104 disableBackend(backend, true); 1105 throw new SQLException (Translate.get( 1106 "loadbalancer.backend.unreacheable", backend.getName())); 1107 } 1108 catch (NoTransactionStartWhenDisablingException e) 1109 { 1110 String msg = Translate.get("loadbalancer.backend.is.disabling", 1111 new String []{request.getSqlShortForm(vdb.getSqlShortFormLength()), 1112 backend.getName()}); 1113 logger.error(msg); 1114 throw new SQLException (msg); 1115 } 1116 1117 if (c == null) 1119 throw new SQLException (Translate.get( 1120 "loadbalancer.unable.retrieve.connection", new String []{ 1121 String.valueOf(tid), backend.getName()})); 1122 1123 ControllerResultSet rs = null; 1125 try 1126 { 1127 rs = preparedStatementGetMetaDataOnBackend(request.getSqlOrTemplate(), 1128 backend, c); 1129 } 1130 catch (SQLException e) 1131 { 1132 throw e; 1133 } 1134 catch (BadConnectionException e) 1135 { cm.deleteConnection(tid); 1138 String msg = Translate.get( 1139 "loadbalancer.backend.disabling.connection.failure", backend 1140 .getName()); 1141 logger.error(msg); 1142 endUserLogger.error(msg); 1143 disableBackend(backend, true); 1144 throw new SQLException (msg); 1145 } 1146 catch (Throwable e) 1147 { 1148 1149 logger.error("Unexpected exception:", e); 1150 throw new SQLException (Translate.get( 1151 "loadbalancer.request.failed.on.backend", new String []{ 1152 request.getSqlShortForm(vdb.getSqlShortFormLength()), 1153 backend.getName(), e.getMessage()})); 1154 } 1155 if (logger.isDebugEnabled()) 1156 logger.debug(Translate.get("loadbalancer.execute.transaction.on", 1157 new String []{String.valueOf(tid), String.valueOf(request.getId()), 1158 backend.getName()})); 1159 return rs; 1160 } 1161 } 1162 1163 1166 1167 1170 public void abort(TransactionMetaData tm) throws SQLException 1171 { 1172 long tid = tm.getTransactionId(); 1173 boolean executeRollback = false; 1174 DistributedRollback toqObject = null; 1175 1182 if (vdb.getTotalOrderQueue() != null) 1183 { 1184 toqObject = new DistributedRollback(tm.getLogin(), tid); 1185 waitForTotalOrder(toqObject, false); 1186 } 1187 1188 try 1189 { 1190 String requestDescription = "abort " + tid; 1192 int nbOfThreads = acquireLockAndCheckNbOfThreads(toqObject, 1193 requestDescription); 1194 1195 boolean rollbackInProgress = false; 1196 synchronized (enabledBackends) 1197 { 1198 for (int i = 0; i < nbOfThreads; i++) 1200 { 1201 DatabaseBackend backend = (DatabaseBackend) enabledBackends.get(i); 1202 rollbackInProgress = rollbackInProgress 1203 || backend.getTaskQueues().abortAllQueriesForTransaction(tid); 1204 } 1205 } 1206 1207 backendListLock.releaseRead(); 1209 1210 if (rollbackInProgress) 1211 { if (vdb.getTotalOrderQueue() != null) 1213 removeObjectFromAndNotifyTotalOrderQueue(toqObject); 1214 return; 1215 } 1216 1217 executeRollback = true; 1218 rollback(tm); 1219 } 1220 catch (NoMoreBackendException ignore) 1221 { 1222 if (!executeRollback && (recoveryLog != null)) 1223 recoveryLog.logAbort(tm); } 1225 } 1226 1227 1233 public final void begin(TransactionMetaData tm) throws SQLException 1234 { 1235 } 1236 1237 1243 public void commit(TransactionMetaData tm) throws SQLException 1244 { 1245 long tid = tm.getTransactionId(); 1246 Long lTid = new Long (tid); 1247 1248 boolean canTakeReadLock = false; 1250 DistributedCommit totalOrderCommit = null; 1251 if (vdb.getTotalOrderQueue() != null) 1252 { 1253 totalOrderCommit = new DistributedCommit(tm.getLogin(), tid); 1258 canTakeReadLock = waitForTotalOrder(totalOrderCommit, false); 1259 if (!canTakeReadLock) 1260 totalOrderCommit = null; 1262 } 1263 1264 if (recoveryLog != null) 1266 recoveryLog.logCommit(tm); 1267 1268 String requestDescription = "commit " + tid; 1270 int nbOfThreads = acquireLockAndCheckNbOfThreads(totalOrderCommit, 1271 requestDescription); 1272 1273 ArrayList commitList = new ArrayList (nbOfThreads); 1275 for (int i = 0; i < nbOfThreads; i++) 1276 { 1277 DatabaseBackend backend = (DatabaseBackend) enabledBackends.get(i); 1278 if (backend.isStartedTransaction(lTid)) 1279 commitList.add(backend); 1280 } 1281 1282 int nbOfThreadsToCommit = commitList.size(); 1283 CommitTask task = null; 1284 if (nbOfThreadsToCommit != 0) 1285 task = new CommitTask(getNbToWait(nbOfThreadsToCommit), 1286 nbOfThreadsToCommit, tm); 1287 1288 synchronized (enabledBackends) 1290 { 1291 for (int i = 0; i < nbOfThreadsToCommit; i++) 1292 { 1293 DatabaseBackend backend = (DatabaseBackend) commitList.get(i); 1294 backend.getTaskQueues().addTaskToBackendTotalOrderQueue(task); 1295 } 1296 } 1297 1298 backendListLock.releaseRead(); 1300 1301 if (totalOrderCommit != null) 1303 removeObjectFromAndNotifyTotalOrderQueue(totalOrderCommit); 1304 1305 if (task == null) 1307 return; 1308 1309 synchronized (task) 1310 { 1311 if (!task.hasCompleted()) 1312 waitForTaskCompletion(tm.getTimeout(), requestDescription, task); 1313 1314 if (task.getSuccess() == 0) 1315 { List exceptions = task.getExceptions(); 1317 if (exceptions == null) 1318 throw new SQLException (Translate.get( 1319 "loadbalancer.commit.all.failed", tid)); 1320 else 1321 { 1322 String errorMsg = Translate.get("loadbalancer.commit.failed.stack", 1323 tid) 1324 + "\n"; 1325 SQLException ex = SQLExceptionFactory.getSQLException(exceptions, 1326 errorMsg); 1327 logger.error(ex.getMessage()); 1328 throw ex; 1329 } 1330 } 1331 } 1332 } 1333 1334 1340 public void rollback(TransactionMetaData tm) throws SQLException 1341 { 1342 long tid = tm.getTransactionId(); 1343 Long lTid = new Long (tid); 1344 1345 DistributedRollback totalOrderRollback = null; 1347 boolean canTakeReadLock = false; 1348 if (vdb.getTotalOrderQueue() != null) 1349 { 1350 totalOrderRollback = new DistributedRollback(tm.getLogin(), tid); 1351 canTakeReadLock = waitForTotalOrder(totalOrderRollback, false); 1356 if (!canTakeReadLock) 1357 totalOrderRollback = null; 1359 } 1360 1361 if (recoveryLog != null) 1363 recoveryLog.logRollback(tm); 1364 1365 String requestDescription = "rollback " + tid; 1367 int nbOfThreads = acquireLockAndCheckNbOfThreads(totalOrderRollback, 1368 requestDescription); 1369 1370 ArrayList rollbackList = new ArrayList (); 1372 for (int i = 0; i < nbOfThreads; i++) 1373 { 1374 DatabaseBackend backend = (DatabaseBackend) enabledBackends.get(i); 1375 if (backend.isStartedTransaction(lTid)) 1376 rollbackList.add(backend); 1377 } 1378 1379 int nbOfThreadsToRollback = rollbackList.size(); 1380 RollbackTask task = null; 1381 task = new RollbackTask(getNbToWait(nbOfThreadsToRollback), 1382 nbOfThreadsToRollback, tm); 1383 1384 synchronized (enabledBackends) 1386 { 1387 for (int i = 0; i < nbOfThreadsToRollback; i++) 1388 { 1389 DatabaseBackend backend = (DatabaseBackend) rollbackList.get(i); 1390 backend.getTaskQueues().addTaskToBackendTotalOrderQueue(task); 1391 } 1392 } 1393 1394 backendListLock.releaseRead(); 1396 1397 if (totalOrderRollback != null) 1399 removeObjectFromAndNotifyTotalOrderQueue(totalOrderRollback); 1400 1401 if (nbOfThreadsToRollback == 0) 1403 return; 1404 1405 synchronized (task) 1406 { 1407 if (!task.hasCompleted()) 1408 waitForTaskCompletion(tm.getTimeout(), requestDescription, task); 1409 1410 if (task.getSuccess() > 0) 1411 return; 1412 1413 List exceptions = task.getExceptions(); 1415 if (exceptions == null) 1416 throw new SQLException (Translate.get( 1417 "loadbalancer.rollback.all.failed", tid)); 1418 else 1419 { 1420 String errorMsg = Translate.get("loadbalancer.rollback.failed.stack", 1421 tid) 1422 + "\n"; 1423 SQLException ex = SQLExceptionFactory.getSQLException(exceptions, 1424 errorMsg); 1425 logger.error(ex.getMessage()); 1426 throw ex; 1427 } 1428 } 1429 } 1430 1431 1438 public void rollbackToSavepoint(TransactionMetaData tm, String savepointName) 1439 throws SQLException 1440 { 1441 long tid = tm.getTransactionId(); 1442 Long lTid = new Long (tid); 1443 1444 DistributedRollbackToSavepoint totalOrderRollback = null; 1446 boolean canTakeReadLock = false; 1447 if (vdb.getTotalOrderQueue() != null) 1448 { 1449 totalOrderRollback = new DistributedRollbackToSavepoint(tid, 1450 savepointName); 1451 canTakeReadLock = waitForTotalOrder(totalOrderRollback, false); 1456 if (!canTakeReadLock) 1457 totalOrderRollback = null; 1459 } 1460 1461 if (recoveryLog != null) 1463 recoveryLog.logRollbackToSavepoint(tm, savepointName); 1464 1465 String requestDescription = "rollback " + savepointName + " " + tid; 1467 int nbOfThreads = acquireLockAndCheckNbOfThreads(null, requestDescription); 1468 1469 ArrayList rollbackList = new ArrayList (); 1471 for (int i = 0; i < nbOfThreads; i++) 1472 { 1473 DatabaseBackend backend = (DatabaseBackend) enabledBackends.get(i); 1474 if (backend.isStartedTransaction(lTid)) 1475 rollbackList.add(backend); 1476 } 1477 1478 int nbOfThreadsToRollback = rollbackList.size(); 1479 RollbackToSavepointTask task = null; 1480 if (nbOfThreadsToRollback != 0) 1481 task = new RollbackToSavepointTask(getNbToWait(nbOfThreadsToRollback), 1482 nbOfThreadsToRollback, tm, savepointName); 1483 1484 synchronized (enabledBackends) 1486 { 1487 for (int i = 0; i < nbOfThreadsToRollback; i++) 1488 { 1489 DatabaseBackend backend = (DatabaseBackend) rollbackList.get(i); 1490 backend.getTaskQueues().addTaskToBackendTotalOrderQueue(task); 1491 } 1492 } 1493 1494 backendListLock.releaseRead(); 1496 1497 if (totalOrderRollback != null) 1499 removeObjectFromAndNotifyTotalOrderQueue(totalOrderRollback); 1500 1501 if (task == null) 1503 return; 1504 1505 synchronized (task) 1506 { 1507 if (!task.hasCompleted()) 1508 waitForTaskCompletion(tm.getTimeout(), requestDescription, task); 1509 1510 if (task.getSuccess() == 0) 1511 { List exceptions = task.getExceptions(); 1513 if (exceptions == null) 1514 throw new SQLException (Translate.get( 1515 "loadbalancer.rollbacksavepoint.all.failed", new String []{ 1516 savepointName, String.valueOf(tid)})); 1517 else 1518 { 1519 String errorMsg = Translate.get( 1520 "loadbalancer.rollbacksavepoint.failed.stack", new String []{ 1521 savepointName, String.valueOf(tid)}) 1522 + "\n"; 1523 SQLException ex = SQLExceptionFactory.getSQLException(exceptions, 1524 errorMsg); 1525 logger.error(ex.getMessage()); 1526 throw ex; 1527 } 1528 } 1529 } 1530 } 1531 1532 1539 public void releaseSavepoint(TransactionMetaData tm, String savepointName) 1540 throws SQLException 1541 { 1542 long tid = tm.getTransactionId(); 1543 Long lTid = new Long (tid); 1544 1545 DistributedReleaseSavepoint totalOrderRelease = null; 1547 boolean canTakeReadLock = false; 1548 if (vdb.getTotalOrderQueue() != null) 1549 { 1550 totalOrderRelease = new DistributedReleaseSavepoint(tid, savepointName); 1551 canTakeReadLock = waitForTotalOrder(totalOrderRelease, false); 1556 if (!canTakeReadLock) 1557 totalOrderRelease = null; 1559 } 1560 1561 if (recoveryLog != null) 1563 recoveryLog.logReleaseSavepoint(tm, savepointName); 1564 1565 String requestDescription = "release savepoint " + savepointName + " " 1567 + tid; 1568 int nbOfThreads = acquireLockAndCheckNbOfThreads(null, requestDescription); 1569 1570 ArrayList savepointList = new ArrayList (); 1572 for (int i = 0; i < nbOfThreads; i++) 1573 { 1574 DatabaseBackend backend = (DatabaseBackend) enabledBackends.get(i); 1575 if (backend.isStartedTransaction(lTid)) 1576 savepointList.add(backend); 1577 } 1578 1579 int nbOfSavepoints = savepointList.size(); 1580 ReleaseSavepointTask task = null; 1581 if (nbOfSavepoints != 0) 1582 task = new ReleaseSavepointTask(getNbToWait(nbOfThreads), nbOfThreads, 1583 tm, savepointName); 1584 1585 synchronized (enabledBackends) 1587 { 1588 for (int i = 0; i < nbOfSavepoints; i++) 1589 { 1590 DatabaseBackend backend = (DatabaseBackend) savepointList.get(i); 1591 backend.getTaskQueues().addTaskToBackendTotalOrderQueue(task); 1592 } 1593 } 1594 1595 backendListLock.releaseRead(); 1597 1598 if (totalOrderRelease != null) 1600 removeObjectFromAndNotifyTotalOrderQueue(totalOrderRelease); 1601 1602 if (task == null) 1604 return; 1605 1606 synchronized (task) 1607 { 1608 if (!task.hasCompleted()) 1609 waitForTaskCompletion(tm.getTimeout(), requestDescription, task); 1610 1611 if (task.getSuccess() == 0) 1612 { List exceptions = task.getExceptions(); 1614 if (exceptions == null) 1615 throw new SQLException (Translate.get( 1616 "loadbalancer.releasesavepoint.all.failed", new String []{ 1617 savepointName, String.valueOf(tid)})); 1618 else 1619 { 1620 String errorMsg = Translate.get( 1621 "loadbalancer.releasesavepoint.failed.stack", new String []{ 1622 savepointName, String.valueOf(tid)}) 1623 + "\n"; 1624 SQLException ex = SQLExceptionFactory.getSQLException(exceptions, 1625 errorMsg); 1626 logger.error(ex.getMessage()); 1627 throw ex; 1628 } 1629 } 1630 } 1631 } 1632 1633 1640 public void setSavepoint(TransactionMetaData tm, String savepointName) 1641 throws SQLException 1642 { 1643 long tid = tm.getTransactionId(); 1644 1645 DistributedSetSavepoint totalOrderSavepoint = null; 1647 boolean canTakeReadLock = false; 1648 if (vdb.getTotalOrderQueue() != null) 1649 { 1650 totalOrderSavepoint = new DistributedSetSavepoint(tm.getLogin(), tid, 1651 savepointName); 1652 canTakeReadLock = waitForTotalOrder(totalOrderSavepoint, false); 1657 if (!canTakeReadLock) 1658 totalOrderSavepoint = null; 1660 } 1661 1662 if (recoveryLog != null) 1664 recoveryLog.logSetSavepoint(tm, savepointName); 1665 1666 String requestDescription = "set savepoint " + savepointName + " " + tid; 1668 int nbOfThreads = acquireLockAndCheckNbOfThreads(null, requestDescription); 1669 1670 SavepointTask task = null; 1671 1672 synchronized (enabledBackends) 1674 { 1675 if (nbOfThreads != 0) 1676 { 1677 task = new SavepointTask(getNbToWait(nbOfThreads), nbOfThreads, tm, 1678 savepointName); 1679 for (int i = 0; i < nbOfThreads; i++) 1680 { 1681 DatabaseBackend backend = (DatabaseBackend) enabledBackends.get(i); 1682 backend.getTaskQueues().addTaskToBackendTotalOrderQueue(task); 1683 } 1684 } 1685 } 1686 1687 backendListLock.releaseRead(); 1689 1690 if (totalOrderSavepoint != null) 1692 removeObjectFromAndNotifyTotalOrderQueue(totalOrderSavepoint); 1693 1694 if (task == null) 1696 return; 1697 1698 synchronized (task) 1699 { 1700 if (!task.hasCompleted()) 1701 waitForTaskCompletion(tm.getTimeout(), requestDescription, task); 1702 1703 if (task.getSuccess() == 0) 1704 { List exceptions = task.getExceptions(); 1706 if (exceptions == null) 1707 throw new SQLException (Translate.get( 1708 "loadbalancer.setsavepoint.all.failed", new String []{ 1709 savepointName, String.valueOf(tid)})); 1710 else 1711 { 1712 String errorMsg = Translate.get( 1713 "loadbalancer.setsavepoint.failed.stack", new String []{ 1714 savepointName, String.valueOf(tid)}) 1715 + "\n"; 1716 SQLException ex = SQLExceptionFactory.getSQLException(exceptions, 1717 errorMsg); 1718 logger.error(ex.getMessage()); 1719 throw ex; 1720 } 1721 } 1722 } 1723 } 1724 1725 1729 1742 private void atomicTaskPostInQueueAndReleaseLock(AbstractRequest request, 1743 AbstractTask task, int nbOfThreads, boolean removeFromTotalOrderQueue) 1744 { 1745 synchronized (enabledBackends) 1746 { 1747 for (int i = 0; i < nbOfThreads; i++) 1748 { 1749 BackendTaskQueues queues = ((DatabaseBackend) enabledBackends.get(i)) 1750 .getTaskQueues(); 1751 queues.addTaskToBackendTotalOrderQueue(task); 1752 } 1753 } 1754 1755 backendListLock.releaseRead(); 1756 1757 if (removeFromTotalOrderQueue) 1759 { 1760 removeObjectFromAndNotifyTotalOrderQueue(request); 1761 } 1762 } 1763 1764 1768 public void closePersistentConnection(String login, 1769 long persistentConnectionId) throws SQLException 1770 { 1771 1777 1778 String requestDescription = "closing persistent connection " 1779 + persistentConnectionId; 1780 int nbOfThreads = 0; 1781 1782 DistributedClosePersistentConnection totalOrderQueueObject = null; 1783 boolean removefromTotalOrder = false; 1784 if (vdb.getTotalOrderQueue() != null) 1785 { 1786 totalOrderQueueObject = new DistributedClosePersistentConnection(login, 1787 persistentConnectionId); 1788 removefromTotalOrder = waitForTotalOrder(totalOrderQueueObject, false); 1789 } 1790 1791 ClosePersistentConnectionTask task = null; 1792 try 1793 { 1794 nbOfThreads = acquireLockAndCheckNbOfThreads(null, requestDescription); 1795 1796 task = new ClosePersistentConnectionTask(getNbToWait(nbOfThreads), 1797 nbOfThreads, login, persistentConnectionId); 1798 1799 synchronized (enabledBackends) 1801 { 1802 for (int i = 0; i < nbOfThreads; i++) 1803 { 1804 DatabaseBackend backend = (DatabaseBackend) enabledBackends.get(i); 1805 backend.getTaskQueues().addTaskToBackendTotalOrderQueue(task); 1806 } 1807 } 1808 1809 backendListLock.releaseRead(); 1811 1812 if (removefromTotalOrder) 1813 removeObjectFromAndNotifyTotalOrderQueue(totalOrderQueueObject); 1814 totalOrderQueueObject = null; 1815 1816 synchronized (task) 1817 { 1818 if (!task.hasCompleted()) 1819 try 1820 { 1821 waitForTaskCompletion(0, requestDescription, task); 1822 } 1823 catch (SQLException ignore) 1824 { 1825 } 1826 } 1827 } 1828 finally 1829 { 1830 if (totalOrderQueueObject != null) 1831 { removeObjectFromAndNotifyTotalOrderQueue(totalOrderQueueObject); 1833 } 1834 1835 if (logger.isDebugEnabled()) 1836 logger.debug(requestDescription + " completed on " + nbOfThreads 1837 + " backends."); 1838 } 1839 } 1840 1841 1845 public void openPersistentConnection(String login, long persistentConnectionId) 1846 throws SQLException 1847 { 1848 String requestDescription = "opening persistent connection " 1849 + persistentConnectionId; 1850 int nbOfThreads = 0; 1851 1852 DistributedOpenPersistentConnection totalOrderQueueObject = null; 1853 if (vdb.getTotalOrderQueue() != null) 1854 { 1855 totalOrderQueueObject = new DistributedOpenPersistentConnection(login, 1856 persistentConnectionId); 1857 waitForTotalOrder(totalOrderQueueObject, true); 1858 } 1859 1860 OpenPersistentConnectionTask task = null; 1861 try 1862 { 1863 nbOfThreads = acquireLockAndCheckNbOfThreads(null, requestDescription); 1864 1865 task = new OpenPersistentConnectionTask(getNbToWait(nbOfThreads), 1866 nbOfThreads, login, persistentConnectionId); 1867 1868 synchronized (enabledBackends) 1870 { 1871 for (int i = 0; i < nbOfThreads; i++) 1872 { 1873 DatabaseBackend backend = (DatabaseBackend) enabledBackends.get(i); 1874 backend.getTaskQueues().addTaskToBackendTotalOrderQueue(task); 1875 } 1876 } 1877 1878 backendListLock.releaseRead(); 1880 1881 removeObjectFromAndNotifyTotalOrderQueue(totalOrderQueueObject); 1882 totalOrderQueueObject = null; 1883 1884 synchronized (task) 1885 { 1886 if (!task.hasCompleted()) 1887 try 1888 { 1889 waitForTaskCompletion(0, requestDescription, task); 1890 } 1891 catch (SQLException ignore) 1892 { 1893 } 1894 } 1895 } 1896 finally 1897 { 1898 if (totalOrderQueueObject != null) 1899 { removeObjectFromAndNotifyTotalOrderQueue(totalOrderQueueObject); 1901 } 1902 1903 if (logger.isDebugEnabled()) 1904 logger.debug(requestDescription + " completed on " + nbOfThreads 1905 + " backends."); 1906 } 1907 } 1908 1909 1921 public synchronized void enableBackend(DatabaseBackend db, 1922 boolean writeEnabled) throws SQLException 1923 { 1924 if (!db.isInitialized()) 1925 db.initializeConnections(); 1926 1927 if (writeEnabled && db.isWriteCanBeEnabled()) 1928 { 1929 BackendTaskQueues taskqueues = new BackendTaskQueues(db, 1930 waitForCompletionPolicy, this.vdb.getRequestManager()); 1931 try 1933 { 1934 ObjectName taskQueuesObjectName = JmxConstants 1935 .getBackendTaskQueuesObjectName(db.getVirtualDatabaseName(), db 1936 .getName()); 1937 if (MBeanServerManager.getInstance().isRegistered(taskQueuesObjectName)) 1938 { 1939 MBeanServerManager.unregister(taskQueuesObjectName); 1940 } 1941 MBeanServerManager.registerMBean(new BackendTaskQueuesControl( 1942 taskqueues), taskQueuesObjectName); 1943 } 1944 catch (Exception e) 1945 { 1946 if (logger.isWarnEnabled()) 1947 { 1948 logger.warn("failed to register task queue mbeans for " + db, e); 1949 } 1950 } 1951 db.setTaskQueues(taskqueues); 1952 db.startWorkerThreads(this); 1953 db.startDeadlockDetectionThread(this.vdb); 1954 db.enableWrite(); 1955 } 1956 1957 db.enableRead(); 1958 try 1959 { 1960 backendListLock.acquireWrite(); 1961 } 1962 catch (InterruptedException e) 1963 { 1964 logger.error("Error while acquiring write lock in enableBackend", e); 1965 } 1966 1967 synchronized (enabledBackends) 1968 { 1969 enabledBackends.add(db); 1970 } 1971 1972 backendListLock.releaseWrite(); 1973 } 1974 1975 1987 public void disableBackend(DatabaseBackend db, boolean forceDisable) 1988 throws SQLException 1989 { 1990 if (!db.disable()) 1991 { 1992 return; 1994 } 1995 synchronized (this) 1996 { 1997 try 1998 { 1999 backendListLock.acquireWrite(); 2000 } 2001 catch (InterruptedException e) 2002 { 2003 logger.error("Error while acquiring write lock in enableBackend", e); 2004 } 2005 2006 try 2007 { 2008 synchronized (enabledBackends) 2009 { 2010 enabledBackends.remove(db); 2011 if (enabledBackends.isEmpty()) 2012 { 2013 this.vdb.getRequestManager().setDatabaseSchema(null); 2015 } 2016 } 2017 2018 if (!forceDisable) 2019 terminateThreadsAndConnections(db); 2020 } 2021 finally 2022 { 2023 backendListLock.releaseWrite(); 2024 } 2025 2026 if (forceDisable) 2027 { 2028 db.shutdownConnectionManagers(); 2029 terminateThreadsAndConnections(db); 2030 } 2031 2032 if (!db.getActiveTransactions().isEmpty()) 2034 { 2035 if (logger.isWarnEnabled()) 2036 { 2037 logger.warn("Active transactions after backend " + db.getName() 2038 + " is disabled: " + db.getActiveTransactions()); 2039 } 2040 } 2041 } 2042 } 2043 2044 private void terminateThreadsAndConnections(DatabaseBackend db) 2045 throws SQLException 2046 { 2047 db.terminateWorkerThreads(); 2048 db.terminateDeadlockDetectionThread(); 2049 2050 if (db.isInitialized()) 2051 db.finalizeConnections(); 2052 } 2053 2054 2058 2061 public String getXmlImpl() 2062 { 2063 StringBuffer info = new StringBuffer (); 2064 info.append("<" + DatabasesXmlTags.ELT_RAIDb_1 + ">"); 2065 if (waitForCompletionPolicy != null) 2066 info.append(waitForCompletionPolicy.getXml()); 2067 if (macroHandler != null) 2068 info.append(macroHandler.getXml()); 2069 info.append(getRaidb1Xml()); 2070 info.append("</" + DatabasesXmlTags.ELT_RAIDb_1 + ">"); 2071 return info.toString(); 2072 } 2073 2074 2081 public abstract String getRaidb1Xml(); 2082} 2083 | Popular Tags |