1 24 25 package org.objectweb.cjdbc.controller.loadbalancer.raidb1; 26 27 import java.sql.Connection ; 28 import java.sql.SQLException ; 29 import java.util.ArrayList ; 30 import java.util.ConcurrentModificationException ; 31 import java.util.Iterator ; 32 33 import org.objectweb.cjdbc.common.exceptions.BadConnectionException; 34 import org.objectweb.cjdbc.common.exceptions.NoMoreBackendException; 35 import org.objectweb.cjdbc.common.exceptions.NoTransactionStartWhenDisablingException; 36 import org.objectweb.cjdbc.common.exceptions.SQLExceptionFactory; 37 import org.objectweb.cjdbc.common.exceptions.UnreachableBackendException; 38 import org.objectweb.cjdbc.common.i18n.Translate; 39 import org.objectweb.cjdbc.common.log.Trace; 40 import org.objectweb.cjdbc.common.sql.AbstractWriteRequest; 41 import org.objectweb.cjdbc.common.sql.ParsingGranularities; 42 import org.objectweb.cjdbc.common.sql.SelectRequest; 43 import org.objectweb.cjdbc.common.sql.StoredProcedure; 44 import org.objectweb.cjdbc.common.util.ReadPrioritaryFIFOWriteLock; 45 import org.objectweb.cjdbc.common.xml.DatabasesXmlTags; 46 import org.objectweb.cjdbc.controller.backend.DatabaseBackend; 47 import org.objectweb.cjdbc.controller.cache.metadata.MetadataCache; 48 import org.objectweb.cjdbc.controller.connection.AbstractConnectionManager; 49 import org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer; 50 import org.objectweb.cjdbc.controller.loadbalancer.AllBackendsFailedException; 51 import org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread; 52 import org.objectweb.cjdbc.controller.loadbalancer.policies.WaitForCompletionPolicy; 53 import org.objectweb.cjdbc.controller.loadbalancer.tasks.AbstractTask; 54 import org.objectweb.cjdbc.controller.loadbalancer.tasks.CommitTask; 55 import org.objectweb.cjdbc.controller.loadbalancer.tasks.KillThreadTask; 56 import org.objectweb.cjdbc.controller.loadbalancer.tasks.ReadStoredProcedureTask; 57 import org.objectweb.cjdbc.controller.loadbalancer.tasks.ReleaseSavepointTask; 58 import org.objectweb.cjdbc.controller.loadbalancer.tasks.RollbackTask; 59 import org.objectweb.cjdbc.controller.loadbalancer.tasks.RollbackToSavepointTask; 60 import org.objectweb.cjdbc.controller.loadbalancer.tasks.SavepointTask; 61 import org.objectweb.cjdbc.controller.loadbalancer.tasks.WriteRequestTask; 62 import org.objectweb.cjdbc.controller.loadbalancer.tasks.WriteRequestWithKeysTask; 63 import org.objectweb.cjdbc.controller.loadbalancer.tasks.WriteStoredProcedureTask; 64 import org.objectweb.cjdbc.controller.requestmanager.RAIDbLevels; 65 import org.objectweb.cjdbc.controller.requestmanager.TransactionMarkerMetaData; 66 import org.objectweb.cjdbc.controller.virtualdatabase.ControllerResultSet; 67 import org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase; 68 import org.objectweb.cjdbc.controller.virtualdatabase.protocol.Commit; 69 import org.objectweb.cjdbc.controller.virtualdatabase.protocol.ReleaseSavepoint; 70 import org.objectweb.cjdbc.controller.virtualdatabase.protocol.Rollback; 71 import org.objectweb.cjdbc.controller.virtualdatabase.protocol.RollbackToSavepoint; 72 import org.objectweb.cjdbc.controller.virtualdatabase.protocol.SetSavepoint; 73 74 84 public abstract class RAIDb1 extends AbstractLoadBalancer 85 { 86 96 100 protected ArrayList backendBlockingThreads; 101 105 protected ArrayList backendNonBlockingThreads; 106 107 protected ReadPrioritaryFIFOWriteLock backendBlockingThreadsRWLock = new ReadPrioritaryFIFOWriteLock(); 108 109 protected ReadPrioritaryFIFOWriteLock backendNonBlockingThreadsRWLock = new ReadPrioritaryFIFOWriteLock(); 110 111 protected WaitForCompletionPolicy waitForCompletionPolicy; 112 protected static Trace logger = Trace 113 .getLogger("org.objectweb.cjdbc.controller.loadbalancer.RAIDb1"); 114 115 118 119 128 public RAIDb1(VirtualDatabase vdb, 129 WaitForCompletionPolicy waitForCompletionPolicy) throws Exception 130 { 131 super(vdb, RAIDbLevels.RAIDb1, ParsingGranularities.NO_PARSING); 132 this.waitForCompletionPolicy = waitForCompletionPolicy; 133 backendBlockingThreads = new ArrayList (); 134 backendNonBlockingThreads = new ArrayList (); 135 } 136 137 140 141 148 private int getNbToWait(int nbOfThreads) 149 { 150 int nbToWait; 151 switch (waitForCompletionPolicy.getPolicy()) 152 { 153 case WaitForCompletionPolicy.FIRST : 154 nbToWait = 1; 155 break; 156 case WaitForCompletionPolicy.MAJORITY : 157 nbToWait = nbOfThreads / 2 + 1; 158 break; 159 case WaitForCompletionPolicy.ALL : 160 nbToWait = nbOfThreads; 161 break; 162 default : 163 logger 164 .warn(Translate.get("loadbalancer.waitforcompletion.unsupported")); 165 nbToWait = nbOfThreads; 166 break; 167 } 168 return nbToWait; 169 } 170 171 174 public abstract ControllerResultSet execReadRequest(SelectRequest request, 175 MetadataCache metadataCache) throws SQLException ; 176 177 186 protected ControllerResultSet executeRequestOnBackend(SelectRequest request, 187 DatabaseBackend backend, MetadataCache metadataCache) 188 throws SQLException , UnreachableBackendException 189 { 190 handleMacros(request); 192 193 AbstractConnectionManager cm = backend.getConnectionManager(request 195 .getLogin()); 196 197 if (cm == null) 199 { 200 String msg = Translate.get("loadbalancer.connectionmanager.not.found", 201 new String []{request.getLogin(), backend.getName()}); 202 logger.error(msg); 203 throw new SQLException (msg); 204 } 205 206 if (request.isAutoCommit()) 208 { 209 if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL) 210 waitForAllWritesToComplete(backend); 214 215 ControllerResultSet rs = null; 216 boolean badConnection; 217 do 218 { 219 badConnection = false; 220 Connection c = null; 222 try 223 { 224 c = cm.getConnection(); 225 } 226 catch (UnreachableBackendException e1) 227 { 228 logger.error(Translate.get( 229 "loadbalancer.backend.disabling.unreachable", backend.getName())); 230 disableBackend(backend); 231 throw new UnreachableBackendException(Translate.get( 232 "loadbalancer.backend.unreacheable", backend.getName())); 233 } 234 235 if (c == null) 237 throw new SQLException (Translate.get( 238 "loadbalancer.backend.no.connection", backend.getName())); 239 240 try 242 { 243 rs = executeSelectRequestOnBackend(request, backend, c, metadataCache); 244 cm.releaseConnection(c); 245 } 246 catch (SQLException e) 247 { 248 cm.releaseConnection(c); 249 throw SQLExceptionFactory.getSQLException(e, Translate.get( 250 "loadbalancer.request.failed.on.backend", new String []{ 251 request.getSQLShortForm(vdb.getSQLShortFormLength()), 252 backend.getName(), e.getMessage()})); 253 } 254 catch (BadConnectionException e) 255 { cm.deleteConnection(c); 257 badConnection = true; 258 } 259 } 260 while (badConnection); 261 if (logger.isDebugEnabled()) 262 logger.debug(Translate.get("loadbalancer.execute.on", new String []{ 263 String.valueOf(request.getId()), backend.getName()})); 264 return rs; 265 } 266 else 267 { Connection c; 269 long tid = request.getTransactionId(); 270 Long lTid = new Long (tid); 271 272 if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL) 274 waitForAllWritesToComplete(backend, request.getTransactionId()); 275 276 try 277 { 278 c = backend.getConnectionForTransactionAndLazyBeginIfNeeded(lTid, cm, 279 request.getTransactionIsolation()); 280 } 281 catch (UnreachableBackendException e1) 282 { 283 logger.error(Translate.get( 284 "loadbalancer.backend.disabling.unreachable", backend.getName())); 285 disableBackend(backend); 286 throw new SQLException (Translate.get( 287 "loadbalancer.backend.unreacheable", backend.getName())); 288 } 289 catch (NoTransactionStartWhenDisablingException e) 290 { 291 String msg = Translate.get("loadbalancer.backend.is.disabling", 292 new String []{request.getSQLShortForm(vdb.getSQLShortFormLength()), 293 backend.getName()}); 294 logger.error(msg); 295 throw new SQLException (msg); 296 } 297 298 if (c == null) 300 throw new SQLException (Translate.get( 301 "loadbalancer.unable.retrieve.connection", new String []{ 302 String.valueOf(tid), backend.getName()})); 303 304 ControllerResultSet rs = null; 306 try 307 { 308 rs = executeSelectRequestOnBackend(request, backend, c, metadataCache); 309 } 310 catch (SQLException e) 311 { 312 throw SQLExceptionFactory.getSQLException(e, Translate.get( 313 "loadbalancer.request.failed.on.backend", new String []{ 314 request.getSQLShortForm(vdb.getSQLShortFormLength()), 315 backend.getName(), e.getMessage()})); 316 } 317 catch (BadConnectionException e) 318 { cm.deleteConnection(tid); 321 String msg = Translate.get( 322 "loadbalancer.backend.disabling.connection.failure", backend 323 .getName()); 324 logger.error(msg); 325 disableBackend(backend); 326 throw new SQLException (msg); 327 } 328 if (logger.isDebugEnabled()) 329 logger.debug(Translate.get("loadbalancer.execute.transaction.on", 330 new String []{String.valueOf(tid), String.valueOf(request.getId()), 331 backend.getName()})); 332 return rs; 333 } 334 } 335 336 347 public int execWriteRequest(AbstractWriteRequest request) 348 throws AllBackendsFailedException, NoMoreBackendException, SQLException 349 { 350 return ((WriteRequestTask) execWriteRequest(request, false, null)) 351 .getResult(); 352 } 353 354 364 public ControllerResultSet execWriteRequestWithKeys( 365 AbstractWriteRequest request, MetadataCache metadataCache) 366 throws AllBackendsFailedException, SQLException 367 { 368 return ((WriteRequestWithKeysTask) execWriteRequest(request, true, 369 metadataCache)).getResult(); 370 } 371 372 387 private AbstractTask execWriteRequest(AbstractWriteRequest request, 388 boolean useKeys, MetadataCache metadataCache) 389 throws AllBackendsFailedException, NoMoreBackendException, SQLException 390 { 391 ArrayList backendThreads; 392 ReadPrioritaryFIFOWriteLock lock; 393 394 boolean canTakeReadLock = waitForTotalOrder(request, true); 399 400 handleMacros(request); 402 403 if (request.mightBlock()) 405 { backendThreads = backendBlockingThreads; 407 lock = backendBlockingThreadsRWLock; 408 } 409 else 410 { backendThreads = backendNonBlockingThreads; 412 lock = backendNonBlockingThreadsRWLock; 413 if ((waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL) 414 && (request.getTransactionId() != 0)) 415 waitForAllWritesToComplete(request.getTransactionId()); 416 } 420 421 try 422 { 423 if (canTakeReadLock) 424 lock.acquireRead(); 425 else 426 lock.acquireWrite(); 427 } 428 catch (InterruptedException e) 429 { 430 String msg = Translate.get( 431 "loadbalancer.backendlist.acquire.writelock.failed", e); 432 logger.error(msg); 433 throw new SQLException (msg); 434 } 435 436 int nbOfThreads = backendThreads.size(); 440 if (nbOfThreads == 0) 441 { 442 if (canTakeReadLock) 443 lock.releaseRead(); 444 else 445 lock.releaseWrite(); 446 447 removeHeadFromAndNotifyTotalOrderQueue(); 449 throw new NoMoreBackendException(Translate 450 .get("loadbalancer.backendlist.empty")); 451 } 452 else 453 { 454 if (logger.isDebugEnabled()) 455 logger.debug(Translate.get("loadbalancer.execute.on.several", 456 new String []{String.valueOf(request.getId()), 457 String.valueOf(nbOfThreads)})); 458 } 459 460 AbstractTask task; 462 if (useKeys) 463 task = new WriteRequestWithKeysTask(getNbToWait(nbOfThreads), 464 nbOfThreads, request, metadataCache); 465 else 466 task = new WriteRequestTask(getNbToWait(nbOfThreads), nbOfThreads, 467 request); 468 469 474 if (request.isAutoCommit()) 476 { 477 for (int i = 0; i < nbOfThreads; i++) 478 { 479 BackendWorkerThread thread = (BackendWorkerThread) backendThreads 480 .get(i); 481 synchronized (thread) 482 { 483 thread.addTask(task); 484 } 485 } 486 } 487 else 488 { 489 for (int i = 0; i < nbOfThreads; i++) 490 { 491 BackendWorkerThread thread = (BackendWorkerThread) backendThreads 492 .get(i); 493 synchronized (thread) 494 { 495 thread.addTask(task, request.getTransactionId()); 496 } 497 } 498 } 499 500 for (int i = 0; i < nbOfThreads; i++) 502 { 503 BackendWorkerThread thread = (BackendWorkerThread) backendThreads.get(i); 504 synchronized (thread) 505 { 506 thread.notify(); 507 } 508 } 509 510 if (canTakeReadLock) 512 lock.releaseRead(); 513 else 514 lock.releaseWrite(); 515 516 removeHeadFromAndNotifyTotalOrderQueue(); 518 519 synchronized (task) 520 { 521 if (!task.hasCompleted()) 522 { 523 try 525 { 526 long timeout = request.getTimeout() * 1000; 528 if (timeout > 0) 529 { 530 long start = System.currentTimeMillis(); 531 task.wait(timeout); 532 long end = System.currentTimeMillis(); 533 long remaining = timeout - (end - start); 534 if (remaining <= 0) 535 { 536 if (task.setExpiredTimeout()) 537 { String msg = Translate.get("loadbalancer.request.timeout", 539 new String []{String.valueOf(request.getId()), 540 String.valueOf(task.getSuccess()), 541 String.valueOf(task.getFailed())}); 542 543 logger.warn(msg); 544 throw new SQLException (msg); 545 } 546 } 548 } 550 else 551 task.wait(); 552 } 553 catch (InterruptedException e) 554 { 555 if (task.setExpiredTimeout()) 556 { String msg = Translate.get("loadbalancer.request.timeout", 558 new String []{String.valueOf(request.getId()), 559 String.valueOf(task.getSuccess()), 560 String.valueOf(task.getFailed())}); 561 562 logger.warn(msg); 563 throw new SQLException (msg); 564 } 565 } 567 } 568 569 if (task.getSuccess() > 0) 570 return task; 571 else 572 { ArrayList exceptions = task.getExceptions(); 574 if (exceptions == null) 575 throw new AllBackendsFailedException(Translate.get( 576 "loadbalancer.request.failed.all", request.getId())); 577 else 578 { 579 String errorMsg = Translate.get("loadbalancer.request.failed.stack", 580 request.getId()) 581 + "\n"; 582 SQLException ex = SQLExceptionFactory.getSQLException(exceptions, 583 errorMsg); 584 logger.error(ex.getMessage()); 585 throw ex; 586 } 587 } 588 } 589 } 590 591 600 protected ControllerResultSet executeStoredProcedureOnBackend( 601 StoredProcedure proc, DatabaseBackend backend, MetadataCache metadataCache) 602 throws SQLException , UnreachableBackendException 603 { 604 AbstractConnectionManager cm = backend 606 .getConnectionManager(proc.getLogin()); 607 608 if (cm == null) 610 { 611 String msg = Translate.get("loadbalancer.connectionmanager.not.found", 612 new String []{proc.getLogin(), backend.getName()}); 613 logger.error(msg); 614 throw new SQLException (msg); 615 } 616 617 if (proc.isAutoCommit()) 619 { 620 if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL) 621 waitForAllWritesToComplete(backend); 625 626 Connection c = null; 628 try 629 { 630 c = cm.getConnection(); 631 } 632 catch (UnreachableBackendException e1) 633 { 634 logger.error(Translate.get( 635 "loadbalancer.backend.disabling.unreachable", backend.getName())); 636 disableBackend(backend); 637 throw new UnreachableBackendException(Translate.get( 638 "loadbalancer.backend.unreacheable", backend.getName())); 639 } 640 641 if (c == null) 643 throw new UnreachableBackendException(Translate.get( 644 "loadbalancer.backend.no.connection", backend.getName())); 645 646 ControllerResultSet rs = null; 648 try 649 { 650 rs = AbstractLoadBalancer.executeReadStoredProcedureOnBackend(proc, 651 backend, c, metadataCache); 652 } 653 catch (Exception e) 654 { 655 throw new SQLException (Translate.get( 656 "loadbalancer.storedprocedure.failed.on.backend", new String []{ 657 proc.getSQLShortForm(vdb.getSQLShortFormLength()), 658 backend.getName(), e.getMessage()})); 659 } 660 finally 661 { 662 cm.releaseConnection(c); 663 } 664 if (logger.isDebugEnabled()) 665 logger.debug(Translate.get("loadbalancer.storedprocedure.on", 666 new String []{String.valueOf(proc.getId()), backend.getName()})); 667 return rs; 668 } 669 else 670 { Connection c; 672 long tid = proc.getTransactionId(); 673 Long lTid = new Long (tid); 674 675 if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL) 677 waitForAllWritesToComplete(backend, proc.getTransactionId()); 678 679 try 680 { 681 c = backend.getConnectionForTransactionAndLazyBeginIfNeeded(lTid, cm, 682 proc.getTransactionIsolation()); 683 } 684 catch (UnreachableBackendException e1) 685 { 686 logger.error(Translate.get( 687 "loadbalancer.backend.disabling.unreachable", backend.getName())); 688 disableBackend(backend); 689 throw new SQLException (Translate.get( 690 "loadbalancer.backend.unreacheable", backend.getName())); 691 } 692 catch (NoTransactionStartWhenDisablingException e) 693 { 694 String msg = Translate.get("loadbalancer.backend.is.disabling", 695 new String []{proc.getSQLShortForm(vdb.getSQLShortFormLength()), 696 backend.getName()}); 697 logger.error(msg); 698 throw new SQLException (msg); 699 } 700 701 if (c == null) 703 throw new SQLException (Translate.get( 704 "loadbalancer.unable.retrieve.connection", new String []{ 705 String.valueOf(tid), backend.getName()})); 706 707 ControllerResultSet rs; 709 try 710 { 711 rs = AbstractLoadBalancer.executeReadStoredProcedureOnBackend(proc, 712 backend, c, metadataCache); 713 } 714 catch (Exception e) 715 { 716 throw new SQLException (Translate.get( 717 "loadbalancer.storedprocedure.failed.on.backend", new String []{ 718 proc.getSQLShortForm(vdb.getSQLShortFormLength()), 719 backend.getName(), e.getMessage()})); 720 } 721 if (logger.isDebugEnabled()) 722 logger.debug(Translate.get("loadbalancer.execute.transaction.on", 723 new String []{String.valueOf(tid), String.valueOf(proc.getId()), 724 backend.getName()})); 725 return rs; 726 } 727 } 728 729 733 public ControllerResultSet execReadStoredProcedure(StoredProcedure proc, 734 MetadataCache metadataCache) throws SQLException 735 { 736 ReadStoredProcedureTask task = (ReadStoredProcedureTask) callStoredProcedure( 737 proc, true, metadataCache); 738 return task.getResult(); 739 } 740 741 744 public int execWriteStoredProcedure(StoredProcedure proc) throws SQLException 745 { 746 WriteStoredProcedureTask task = (WriteStoredProcedureTask) callStoredProcedure( 747 proc, false, null); 748 return task.getResult(); 749 } 750 751 763 private AbstractTask callStoredProcedure(StoredProcedure proc, 764 boolean isRead, MetadataCache metadataCache) throws SQLException 765 { 766 ArrayList backendThreads = backendBlockingThreads; 767 ReadPrioritaryFIFOWriteLock lock = backendBlockingThreadsRWLock; 768 769 boolean canTakeReadLock = waitForTotalOrder(proc, true); 774 775 handleMacros(proc); 777 778 try 779 { 780 if (canTakeReadLock) 786 lock.acquireRead(); 787 else 788 lock.acquireWrite(); 789 } 790 catch (InterruptedException e) 791 { 792 String msg; 793 msg = Translate.get("loadbalancer.backendlist.acquire.writelock.failed", 794 e); 795 logger.error(msg); 796 throw new SQLException (msg); 797 } 798 799 int nbOfThreads = backendThreads.size(); 800 if (nbOfThreads == 0) 801 { 802 if (canTakeReadLock) 803 lock.releaseRead(); 804 else 805 lock.releaseWrite(); 806 807 removeHeadFromAndNotifyTotalOrderQueue(); 809 throw new NoMoreBackendException(Translate 810 .get("loadbalancer.backendlist.empty")); 811 } 812 else 813 { 814 if (logger.isDebugEnabled()) 815 logger.debug(Translate.get("loadbalancer.execute.on.several", 816 new String []{String.valueOf(proc.getId()), 817 String.valueOf(nbOfThreads)})); 818 } 819 820 AbstractTask task; 822 if (isRead) 823 task = new ReadStoredProcedureTask(getNbToWait(nbOfThreads), nbOfThreads, 824 proc, metadataCache); 825 else 826 task = new WriteStoredProcedureTask(getNbToWait(nbOfThreads), 827 nbOfThreads, proc); 828 829 for (int i = 0; i < nbOfThreads; i++) 831 { 832 BackendWorkerThread thread = (BackendWorkerThread) backendThreads.get(i); 833 synchronized (thread) 834 { 835 if (proc.isAutoCommit()) 836 thread.addTask(task); 837 else 838 thread.addTask(task, proc.getTransactionId()); 839 thread.notify(); 840 } 841 } 842 843 if (canTakeReadLock) 844 lock.releaseRead(); 845 else 846 lock.releaseWrite(); 847 848 removeHeadFromAndNotifyTotalOrderQueue(); 850 851 synchronized (task) 852 { 853 if (!task.hasCompleted()) 854 { 855 try 857 { 858 long timeout = proc.getTimeout() * 1000; 860 if (timeout > 0) 861 { 862 long start = System.currentTimeMillis(); 863 task.wait(timeout); 864 long end = System.currentTimeMillis(); 865 long remaining = timeout - (end - start); 866 if (remaining <= 0) 867 { 868 if (task.setExpiredTimeout()) 869 { String msg = Translate.get( 871 "loadbalancer.storedprocedure.timeout", new String []{ 872 String.valueOf(proc.getId()), 873 String.valueOf(task.getSuccess()), 874 String.valueOf(task.getFailed())}); 875 logger.warn(msg); 876 throw new SQLException (msg); 877 } 878 } 880 } 882 else 883 task.wait(); 884 } 885 catch (InterruptedException e) 886 { 887 if (task.setExpiredTimeout()) 888 { String msg = Translate.get("loadbalancer.storedprocedure.timeout", 890 new String []{String.valueOf(proc.getId()), 891 String.valueOf(task.getSuccess()), 892 String.valueOf(task.getFailed())}); 893 logger.warn(msg); 894 throw new SQLException (msg); 895 } 896 } 898 } 899 900 if (task.getSuccess() > 0) 901 return task; 902 else 903 { ArrayList exceptions = task.getExceptions(); 905 if (exceptions == null) 906 throw new SQLException (Translate.get( 907 "loadbalancer.storedprocedure.all.failed", proc.getId())); 908 else 909 { 910 String errorMsg = Translate.get( 911 "loadbalancer.storedprocedure.failed.stack", proc.getId()) 912 + "\n"; 913 SQLException ex = SQLExceptionFactory.getSQLException(exceptions, 914 errorMsg); 915 logger.error(ex.getMessage()); 916 throw ex; 917 } 918 } 919 } 920 } 921 922 925 926 932 public final void begin(TransactionMarkerMetaData tm) throws SQLException 933 { 934 } 935 936 942 public void commit(TransactionMarkerMetaData tm) throws SQLException 943 { 944 long tid = tm.getTransactionId(); 945 Long lTid = new Long (tid); 946 ArrayList asynchronousBackends = null; 949 CommitTask task = null; 950 951 Commit totalOrderCommit = null; 953 boolean canTakeReadLock = false; 954 if (vdb.getTotalOrderQueue() != null) 955 { 956 totalOrderCommit = new Commit(tm.getLogin(), tid); 957 canTakeReadLock = waitForTotalOrder(totalOrderCommit, false); 962 if (!canTakeReadLock) 963 totalOrderCommit = null; 965 } 966 967 if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL) 968 { try 970 { 971 if (canTakeReadLock) 972 backendBlockingThreadsRWLock.acquireRead(); 973 else 974 { 975 backendBlockingThreadsRWLock.acquireWrite(); 979 } 980 } 981 catch (InterruptedException e) 982 { 983 String msg = Translate.get( 984 "loadbalancer.backendlist.acquire.writelock.failed", e); 985 logger.error(msg); 986 throw new SQLException (msg); 987 } 988 989 int nbOfThreads = backendBlockingThreads.size(); 990 task = new CommitTask(getNbToWait(nbOfThreads), nbOfThreads, tm 992 .getTimeout(), tm.getLogin(), tid); 993 994 for (int i = 0; i < nbOfThreads; i++) 995 { 996 BackendWorkerThread thread = (BackendWorkerThread) backendBlockingThreads 997 .get(i); 998 if (thread.hasTaskForTransaction(lTid)) 999 { 1000 if (asynchronousBackends == null) 1001 asynchronousBackends = new ArrayList (); 1002 asynchronousBackends.add(thread.getBackend()); 1003 synchronized (thread) 1004 { 1005 thread.insertTaskAfterLastWriteForTransaction(task, lTid); 1006 thread.notify(); 1007 } 1008 } 1009 } 1010 1011 if (canTakeReadLock) 1012 backendBlockingThreadsRWLock.releaseRead(); 1013 else 1014 backendBlockingThreadsRWLock.releaseWrite(); 1015 1016 if (asynchronousBackends == null) 1018 task = null; 1019 } 1020 1021 try 1022 { 1023 if (canTakeReadLock) 1024 backendNonBlockingThreadsRWLock.acquireRead(); 1025 else 1026 backendNonBlockingThreadsRWLock.acquireWrite(); 1027 } 1028 catch (InterruptedException e) 1029 { 1030 String msg = Translate.get( 1031 "loadbalancer.backendlist.acquire.writelock.failed", e); 1032 logger.error(msg); 1033 throw new SQLException (msg); 1034 } 1035 1036 int nbOfThreads = backendNonBlockingThreads.size(); 1037 ArrayList commitList = new ArrayList (); 1038 1039 for (int i = 0; i < nbOfThreads; i++) 1041 { 1042 BackendWorkerThread thread = (BackendWorkerThread) backendNonBlockingThreads 1043 .get(i); 1044 DatabaseBackend backend = thread.getBackend(); 1045 if (backend.isStartedTransaction(lTid) 1049 && ((asynchronousBackends == null) || (!asynchronousBackends 1050 .contains(backend)))) 1051 commitList.add(thread); 1052 } 1053 1054 int nbOfThreadsToCommit = commitList.size(); 1059 if ((task == null) && (nbOfThreadsToCommit != 0)) 1060 task = new CommitTask(getNbToWait(nbOfThreadsToCommit), 1061 nbOfThreadsToCommit, tm.getTimeout(), tm.getLogin(), tid); 1062 1063 for (int i = 0; i < nbOfThreadsToCommit; i++) 1067 { 1068 BackendWorkerThread thread = (BackendWorkerThread) commitList.get(i); 1069 synchronized (thread) 1070 { 1071 thread.addTask(task, tid); 1072 thread.notify(); 1073 } 1074 } 1075 1076 if (canTakeReadLock) 1077 backendNonBlockingThreadsRWLock.releaseRead(); 1078 else 1079 backendNonBlockingThreadsRWLock.releaseWrite(); 1080 1081 if (totalOrderCommit != null) 1083 removeHeadFromAndNotifyTotalOrderQueue(); 1084 1085 if (task == null) 1087 return; 1088 1089 synchronized (task) 1090 { 1091 if (!task.hasCompleted()) 1092 { 1093 try 1095 { 1096 long timeout = tm.getTimeout(); 1098 if (timeout > 0) 1099 { 1100 long start = System.currentTimeMillis(); 1101 task.wait(timeout); 1102 long end = System.currentTimeMillis(); 1103 long remaining = timeout - (end - start); 1104 if (remaining <= 0) 1105 { 1106 if (task.setExpiredTimeout()) 1107 { String msg = Translate.get("loadbalancer.commit.timeout", 1109 new String []{String.valueOf(tid), 1110 String.valueOf(task.getSuccess()), 1111 String.valueOf(task.getFailed())}); 1112 logger.warn(msg); 1113 throw new SQLException (msg); 1114 } 1115 } 1117 } 1118 else 1119 task.wait(); 1120 } 1121 catch (InterruptedException e) 1122 { 1123 if (task.setExpiredTimeout()) 1124 { String msg = Translate.get("loadbalancer.commit.timeout", 1126 new String []{String.valueOf(tid), 1127 String.valueOf(task.getSuccess()), 1128 String.valueOf(task.getFailed())}); 1129 logger.warn(msg); 1130 throw new SQLException (msg); 1131 } 1132 } 1134 } 1135 1136 if (task.getSuccess() > 0) 1137 return; 1138 else 1139 { ArrayList exceptions = task.getExceptions(); 1141 if (exceptions == null) 1142 throw new SQLException (Translate.get( 1143 "loadbalancer.commit.all.failed", tid)); 1144 else 1145 { 1146 String errorMsg = Translate.get("loadbalancer.commit.failed.stack", 1147 tid) 1148 + "\n"; 1149 SQLException ex = SQLExceptionFactory.getSQLException(exceptions, 1150 errorMsg); 1151 logger.error(ex.getMessage()); 1152 throw ex; 1153 } 1154 } 1155 } 1156 } 1157 1158 1164 public void rollback(TransactionMarkerMetaData tm) throws SQLException 1165 { 1166 long tid = tm.getTransactionId(); 1167 Long lTid = new Long (tid); 1168 ArrayList asynchronousBackends = null; 1171 RollbackTask task = null; 1172 1173 Rollback totalOrderRollback = null; 1175 boolean canTakeReadLock = false; 1176 if (vdb.getTotalOrderQueue() != null) 1177 { 1178 totalOrderRollback = new Rollback(tm.getLogin(), tid); 1179 canTakeReadLock = waitForTotalOrder(totalOrderRollback, false); 1184 if (!canTakeReadLock) 1185 totalOrderRollback = null; 1187 } 1188 1189 if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL) 1190 { 1191 try 1192 { 1193 if (canTakeReadLock) 1194 backendBlockingThreadsRWLock.acquireRead(); 1195 else 1196 { 1197 backendBlockingThreadsRWLock.acquireWrite(); 1201 } 1202 } 1203 catch (InterruptedException e) 1204 { 1205 String msg = Translate.get( 1206 "loadbalancer.backendlist.acquire.writelock.failed", e); 1207 logger.error(msg); 1208 throw new SQLException (msg); 1209 } 1210 1211 int nbOfThreads = backendBlockingThreads.size(); 1212 task = new RollbackTask(getNbToWait(nbOfThreads), nbOfThreads, tm 1214 .getTimeout(), tm.getLogin(), tid); 1215 1216 for (int i = 0; i < nbOfThreads; i++) 1217 { 1218 BackendWorkerThread thread = (BackendWorkerThread) backendBlockingThreads 1219 .get(i); 1220 if (thread.hasTaskForTransaction(lTid)) 1221 { 1222 if (asynchronousBackends == null) 1223 asynchronousBackends = new ArrayList (); 1224 asynchronousBackends.add(thread.getBackend()); 1225 synchronized (thread) 1226 { 1227 thread.insertTaskAfterLastWriteForTransaction(task, lTid); 1228 thread.notify(); 1229 } 1230 } 1231 } 1232 1233 if (canTakeReadLock) 1234 backendBlockingThreadsRWLock.releaseRead(); 1235 else 1236 backendBlockingThreadsRWLock.releaseWrite(); 1237 1238 if (asynchronousBackends == null) 1240 task = null; 1241 } 1242 1243 try 1244 { 1245 if (canTakeReadLock) 1246 backendNonBlockingThreadsRWLock.acquireRead(); 1247 else 1248 backendNonBlockingThreadsRWLock.acquireWrite(); 1249 } 1250 catch (InterruptedException e) 1251 { 1252 String msg = Translate.get( 1253 "loadbalancer.backendlist.acquire.writelock.failed", e); 1254 logger.error(msg); 1255 throw new SQLException (msg); 1256 } 1257 1258 int nbOfThreads = backendNonBlockingThreads.size(); 1259 ArrayList rollbackList = new ArrayList (); 1260 1261 for (int i = 0; i < nbOfThreads; i++) 1263 { 1264 BackendWorkerThread thread = (BackendWorkerThread) backendNonBlockingThreads 1265 .get(i); 1266 DatabaseBackend backend = thread.getBackend(); 1267 if (backend.isStartedTransaction(lTid) 1271 && ((asynchronousBackends == null) || (!asynchronousBackends 1272 .contains(backend)))) 1273 rollbackList.add(thread); 1274 } 1275 1276 int nbOfThreadsToRollback = rollbackList.size(); 1277 if ((task == null) && (nbOfThreadsToRollback != 0)) 1281 task = new RollbackTask(getNbToWait(nbOfThreadsToRollback), 1282 nbOfThreadsToRollback, tm.getTimeout(), tm.getLogin(), tid); 1283 1284 for (int i = 0; i < nbOfThreadsToRollback; i++) 1288 { 1289 BackendWorkerThread thread = (BackendWorkerThread) rollbackList.get(i); 1290 synchronized (thread) 1291 { 1292 thread.addTask(task, tid); 1293 thread.notify(); 1294 } 1295 } 1296 1297 if (canTakeReadLock) 1299 backendNonBlockingThreadsRWLock.releaseRead(); 1300 else 1301 backendNonBlockingThreadsRWLock.releaseWrite(); 1302 1303 if (totalOrderRollback != null) 1305 removeHeadFromAndNotifyTotalOrderQueue(); 1306 1307 if (task == null) 1309 return; 1310 1311 synchronized (task) 1312 { 1313 if (!task.hasCompleted()) 1314 { 1315 try 1317 { 1318 long timeout = tm.getTimeout(); 1320 if (timeout > 0) 1321 { 1322 long start = System.currentTimeMillis(); 1323 task.wait(timeout); 1324 long end = System.currentTimeMillis(); 1325 long remaining = timeout - (end - start); 1326 if (remaining <= 0) 1327 { 1328 if (task.setExpiredTimeout()) 1329 { String msg = Translate.get("loadbalancer.rollback.timeout", 1331 new String []{String.valueOf(tid), 1332 String.valueOf(task.getSuccess()), 1333 String.valueOf(task.getFailed())}); 1334 logger.warn(msg); 1335 throw new SQLException (msg); 1336 } 1337 } 1339 } 1340 else 1341 task.wait(); 1342 } 1343 catch (InterruptedException e) 1344 { 1345 if (task.setExpiredTimeout()) 1346 { String msg = Translate.get("loadbalancer.rollback.timeout", 1348 new String []{String.valueOf(tid), 1349 String.valueOf(task.getSuccess()), 1350 String.valueOf(task.getFailed())}); 1351 logger.warn(msg); 1352 throw new SQLException (msg); 1353 } 1354 } 1356 } 1357 1358 if (task.getSuccess() > 0) 1359 return; 1360 else 1361 { ArrayList exceptions = task.getExceptions(); 1363 if (exceptions == null) 1364 throw new SQLException (Translate.get( 1365 "loadbalancer.rollback.all.failed", tid)); 1366 else 1367 { 1368 String errorMsg = Translate.get("loadbalancer.rollback.failed.stack", 1369 tid) 1370 + "\n"; 1371 SQLException ex = SQLExceptionFactory.getSQLException(exceptions, 1372 errorMsg); 1373 logger.error(ex.getMessage()); 1374 throw ex; 1375 } 1376 } 1377 } 1378 } 1379 1380 1387 public void rollback(TransactionMarkerMetaData tm, String savepointName) 1388 throws SQLException 1389 { 1390 long tid = tm.getTransactionId(); 1391 Long lTid = new Long (tid); 1392 ArrayList asynchronousBackends = null; 1395 RollbackToSavepointTask task = null; 1396 1397 RollbackToSavepoint totalOrderRollback = null; 1399 boolean canTakeReadLock = false; 1400 if (vdb.getTotalOrderQueue() != null) 1401 { 1402 totalOrderRollback = new RollbackToSavepoint(tid, savepointName); 1403 canTakeReadLock = waitForTotalOrder(totalOrderRollback, false); 1408 if (!canTakeReadLock) 1409 totalOrderRollback = null; 1411 } 1412 1413 if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL) 1414 { 1415 try 1416 { 1417 if (canTakeReadLock) 1418 backendBlockingThreadsRWLock.acquireRead(); 1419 else 1420 { 1421 backendBlockingThreadsRWLock.acquireWrite(); 1425 } 1426 } 1427 catch (InterruptedException e) 1428 { 1429 String msg = Translate.get( 1430 "loadbalancer.backendlist.acquire.writelock.failed", e); 1431 logger.error(msg); 1432 throw new SQLException (msg); 1433 } 1434 1435 int nbOfThreads = backendBlockingThreads.size(); 1436 task = new RollbackToSavepointTask(getNbToWait(nbOfThreads), nbOfThreads, 1438 tm.getTimeout(), tm.getLogin(), tid, savepointName); 1439 1440 for (int i = 0; i < nbOfThreads; i++) 1441 { 1442 BackendWorkerThread thread = (BackendWorkerThread) backendBlockingThreads 1443 .get(i); 1444 if (thread.hasTaskForTransaction(lTid)) 1445 { 1446 if (asynchronousBackends == null) 1447 asynchronousBackends = new ArrayList (); 1448 asynchronousBackends.add(thread.getBackend()); 1449 synchronized (thread) 1450 { 1451 thread.insertTaskAfterLastWriteForTransaction(task, lTid); 1452 thread.notify(); 1453 } 1454 } 1455 } 1456 1457 if (canTakeReadLock) 1458 backendBlockingThreadsRWLock.releaseRead(); 1459 else 1460 backendBlockingThreadsRWLock.releaseWrite(); 1461 1462 if (asynchronousBackends == null) 1464 task = null; 1465 } 1466 1467 try 1468 { 1469 if (canTakeReadLock) 1470 backendNonBlockingThreadsRWLock.acquireRead(); 1471 else 1472 backendNonBlockingThreadsRWLock.acquireWrite(); 1473 } 1474 catch (InterruptedException e) 1475 { 1476 String msg = Translate.get( 1477 "loadbalancer.backendlist.acquire.writelock.failed", e); 1478 logger.error(msg); 1479 throw new SQLException (msg); 1480 } 1481 1482 int nbOfThreads = backendNonBlockingThreads.size(); 1483 ArrayList rollbackList = new ArrayList (); 1484 1485 for (int i = 0; i < nbOfThreads; i++) 1488 { 1489 BackendWorkerThread thread = (BackendWorkerThread) backendNonBlockingThreads 1490 .get(i); 1491 DatabaseBackend backend = thread.getBackend(); 1492 if (backend.isStartedTransaction(lTid) 1496 && ((asynchronousBackends == null) || (!asynchronousBackends 1497 .contains(backend)))) 1498 rollbackList.add(thread); 1499 } 1500 1501 int nbOfThreadsToRollback = rollbackList.size(); 1502 if ((task == null) && (nbOfThreadsToRollback != 0)) 1506 task = new RollbackToSavepointTask(getNbToWait(nbOfThreadsToRollback), 1507 nbOfThreadsToRollback, tm.getTimeout(), tm.getLogin(), tid, 1508 savepointName); 1509 1510 for (int i = 0; i < nbOfThreadsToRollback; i++) 1514 { 1515 BackendWorkerThread thread = (BackendWorkerThread) rollbackList.get(i); 1516 synchronized (thread) 1517 { 1518 thread.addTask(task, tid); 1519 thread.notify(); 1520 } 1521 } 1522 1523 if (canTakeReadLock) 1525 backendNonBlockingThreadsRWLock.releaseRead(); 1526 else 1527 backendNonBlockingThreadsRWLock.releaseWrite(); 1528 1529 if (totalOrderRollback != null) 1531 removeHeadFromAndNotifyTotalOrderQueue(); 1532 1533 if (task == null) 1535 return; 1536 1537 synchronized (task) 1538 { 1539 if (!task.hasCompleted()) 1540 { 1541 try 1543 { 1544 long timeout = tm.getTimeout(); 1546 if (timeout > 0) 1547 { 1548 long start = System.currentTimeMillis(); 1549 task.wait(timeout); 1550 long end = System.currentTimeMillis(); 1551 long remaining = timeout - (end - start); 1552 if (remaining <= 0) 1553 { 1554 if (task.setExpiredTimeout()) 1555 { String msg = Translate.get( 1557 "loadbalancer.rollbacksavepoint.timeout", new String []{ 1558 savepointName, String.valueOf(tid), 1559 String.valueOf(task.getSuccess()), 1560 String.valueOf(task.getFailed())}); 1561 logger.warn(msg); 1562 throw new SQLException (msg); 1563 } 1564 } 1566 } 1567 else 1568 task.wait(); 1569 } 1570 catch (InterruptedException e) 1571 { 1572 if (task.setExpiredTimeout()) 1573 { String msg = Translate.get( 1575 "loadbalancer.rollbacksavepoint.timeout", new String []{ 1576 savepointName, String.valueOf(tid), 1577 String.valueOf(task.getSuccess()), 1578 String.valueOf(task.getFailed())}); 1579 logger.warn(msg); 1580 throw new SQLException (msg); 1581 } 1582 } 1584 } 1585 1586 if (task.getSuccess() > 0) 1587 return; 1588 else 1589 { ArrayList exceptions = task.getExceptions(); 1591 if (exceptions == null) 1592 throw new SQLException (Translate.get( 1593 "loadbalancer.rollbacksavepoint.all.failed", new String []{ 1594 savepointName, String.valueOf(tid)})); 1595 else 1596 { 1597 String errorMsg = Translate.get( 1598 "loadbalancer.rollbacksavepoint.failed.stack", new String []{ 1599 savepointName, String.valueOf(tid)}) 1600 + "\n"; 1601 SQLException ex = SQLExceptionFactory.getSQLException(exceptions, 1602 errorMsg); 1603 logger.error(ex.getMessage()); 1604 throw ex; 1605 } 1606 } 1607 } 1608 } 1609 1610 1617 public void releaseSavepoint(TransactionMarkerMetaData tm, String name) 1618 throws SQLException 1619 { 1620 long tid = tm.getTransactionId(); 1621 Long lTid = new Long (tid); 1622 1623 ArrayList asynchronousBackends = null; 1626 ReleaseSavepointTask task = null; 1627 1628 ReleaseSavepoint totalOrderRelease = null; 1630 boolean canTakeReadLock = false; 1631 if (vdb.getTotalOrderQueue() != null) 1632 { 1633 totalOrderRelease = new ReleaseSavepoint(tid, name); 1634 canTakeReadLock = waitForTotalOrder(totalOrderRelease, false); 1639 if (!canTakeReadLock) 1640 totalOrderRelease = null; 1642 } 1643 1644 if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL) 1645 { 1646 try 1647 { 1648 if (canTakeReadLock) 1649 backendBlockingThreadsRWLock.acquireRead(); 1650 else 1651 { 1652 backendBlockingThreadsRWLock.acquireWrite(); 1656 } 1657 } 1658 catch (InterruptedException e) 1659 { 1660 String msg = Translate.get( 1661 "loadbalancer.backendlist.acquire.writelock.failed", e); 1662 logger.error(msg); 1663 throw new SQLException (msg); 1664 } 1665 1666 int nbOfThreads = backendBlockingThreads.size(); 1667 1668 task = new ReleaseSavepointTask(getNbToWait(nbOfThreads), nbOfThreads, tm 1670 .getTimeout(), tm.getLogin(), tid, name); 1671 1672 for (int i = 0; i < nbOfThreads; i++) 1673 { 1674 BackendWorkerThread thread = (BackendWorkerThread) backendBlockingThreads 1675 .get(i); 1676 if (thread.hasTaskForTransaction(lTid)) 1677 { 1678 if (asynchronousBackends == null) 1679 asynchronousBackends = new ArrayList (); 1680 asynchronousBackends.add(thread.getBackend()); 1681 synchronized (thread) 1682 { 1683 thread.insertTaskAfterLastWriteForTransaction(task, lTid); 1684 thread.notify(); 1685 } 1686 } 1687 } 1688 1689 if (canTakeReadLock) 1691 backendBlockingThreadsRWLock.releaseRead(); 1692 else 1693 backendBlockingThreadsRWLock.releaseWrite(); 1694 1695 if (asynchronousBackends == null) 1697 task = null; 1698 } 1699 1700 try 1701 { 1702 if (canTakeReadLock) 1703 backendNonBlockingThreadsRWLock.acquireRead(); 1704 else 1705 backendNonBlockingThreadsRWLock.acquireWrite(); 1706 } 1707 catch (InterruptedException e) 1708 { 1709 String msg = Translate.get( 1710 "loadbalancer.backendlist.acquire.writelock.failed", e); 1711 logger.error(msg); 1712 throw new SQLException (msg); 1713 } 1714 1715 int nbOfThreads = backendNonBlockingThreads.size(); 1716 ArrayList savepointList = new ArrayList (); 1717 for (int i = 0; i < nbOfThreads; i++) 1720 { 1721 BackendWorkerThread thread = (BackendWorkerThread) backendNonBlockingThreads 1722 .get(i); 1723 DatabaseBackend backend = thread.getBackend(); 1724 if (backend.isStartedTransaction(lTid) 1728 && ((asynchronousBackends == null) || (!asynchronousBackends 1729 .contains(backend)))) 1730 savepointList.add(thread); 1731 } 1732 1733 nbOfThreads = savepointList.size(); 1734 if (nbOfThreads == 0) 1735 { 1736 if (canTakeReadLock) 1737 backendNonBlockingThreadsRWLock.releaseRead(); 1738 else 1739 backendNonBlockingThreadsRWLock.releaseWrite(); 1740 return; 1741 } 1742 1743 if (task == null) 1744 task = new ReleaseSavepointTask(getNbToWait(nbOfThreads), nbOfThreads, tm 1745 .getTimeout(), tm.getLogin(), tid, name); 1746 1747 synchronized (task) 1748 { 1749 for (int i = 0; i < nbOfThreads; i++) 1751 { 1752 BackendWorkerThread thread = (BackendWorkerThread) savepointList.get(i); 1753 synchronized (thread) 1754 { 1755 thread.addTask(task, tid); 1756 thread.notify(); 1757 } 1758 } 1759 1760 if (canTakeReadLock) 1762 backendNonBlockingThreadsRWLock.releaseRead(); 1763 else 1764 backendNonBlockingThreadsRWLock.releaseWrite(); 1765 1766 if (totalOrderRelease != null) 1768 removeHeadFromAndNotifyTotalOrderQueue(); 1769 1770 try 1772 { 1773 long timeout = tm.getTimeout(); 1775 if (timeout > 0) 1776 { 1777 long start = System.currentTimeMillis(); 1778 task.wait(timeout); 1779 long end = System.currentTimeMillis(); 1780 long remaining = timeout - (end - start); 1781 if (remaining <= 0) 1782 { 1783 if (task.setExpiredTimeout()) 1784 { String msg = Translate.get( 1786 "loadbalancer.releasesavepoint.timeout", new String []{name, 1787 String.valueOf(tid), String.valueOf(task.getSuccess()), 1788 String.valueOf(task.getFailed())}); 1789 logger.warn(msg); 1790 throw new SQLException (msg); 1791 } 1792 } 1794 } 1795 else 1796 task.wait(); 1797 } 1798 catch (InterruptedException e) 1799 { 1800 if (task.setExpiredTimeout()) 1801 { String msg = Translate.get("loadbalancer.releasesavepoint.timeout", 1803 new String []{name, String.valueOf(tid), 1804 String.valueOf(task.getSuccess()), 1805 String.valueOf(task.getFailed())}); 1806 logger.warn(msg); 1807 throw new SQLException (msg); 1808 } 1809 } 1811 1812 if (task.getSuccess() > 0) 1813 return; 1814 else 1815 { ArrayList exceptions = task.getExceptions(); 1817 if (exceptions == null) 1818 throw new SQLException (Translate.get( 1819 "loadbalancer.releasesavepoint.all.failed", new String []{name, 1820 String.valueOf(tid)})); 1821 else 1822 { 1823 String errorMsg = Translate.get( 1824 "loadbalancer.releasesavepoint.failed.stack", new String []{name, 1825 String.valueOf(tid)}) 1826 + "\n"; 1827 SQLException ex = SQLExceptionFactory.getSQLException(exceptions, 1828 errorMsg); 1829 logger.error(ex.getMessage()); 1830 throw ex; 1831 } 1832 } 1833 } 1834 } 1835 1836 1843 public void setSavepoint(TransactionMarkerMetaData tm, String name) 1844 throws SQLException 1845 { 1846 long tid = tm.getTransactionId(); 1847 Long lTid = new Long (tid); 1848 1849 ArrayList asynchronousBackends = null; 1852 SavepointTask task = null; 1853 1854 SetSavepoint totalOrderSavepoint = null; 1856 boolean canTakeReadLock = false; 1857 if (vdb.getTotalOrderQueue() != null) 1858 { 1859 totalOrderSavepoint = new SetSavepoint(tid, name); 1860 canTakeReadLock = waitForTotalOrder(totalOrderSavepoint, false); 1865 if (!canTakeReadLock) 1866 totalOrderSavepoint = null; 1868 } 1869 1870 if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL) 1871 { 1872 try 1873 { 1874 if (canTakeReadLock) 1875 backendBlockingThreadsRWLock.acquireRead(); 1876 else 1877 { 1878 backendBlockingThreadsRWLock.acquireWrite(); 1882 } 1883 } 1884 catch (InterruptedException e) 1885 { 1886 String msg = Translate.get( 1887 "loadbalancer.backendlist.acquire.writelock.failed", e); 1888 logger.error(msg); 1889 throw new SQLException (msg); 1890 } 1891 1892 int nbOfThreads = backendBlockingThreads.size(); 1893 1894 task = new SavepointTask(getNbToWait(nbOfThreads), nbOfThreads, tm 1896 .getTimeout(), tm.getLogin(), tid, name); 1897 1898 for (int i = 0; i < nbOfThreads; i++) 1899 { 1900 BackendWorkerThread thread = (BackendWorkerThread) backendBlockingThreads 1901 .get(i); 1902 if (thread.hasTaskForTransaction(lTid)) 1903 { 1904 if (asynchronousBackends == null) 1905 asynchronousBackends = new ArrayList (); 1906 asynchronousBackends.add(thread.getBackend()); 1907 synchronized (thread) 1908 { 1909 thread.insertTaskAfterLastWriteForTransaction(task, lTid); 1910 thread.notify(); 1911 } 1912 } 1913 } 1914 1915 if (canTakeReadLock) 1917 backendBlockingThreadsRWLock.releaseRead(); 1918 else 1919 backendBlockingThreadsRWLock.releaseWrite(); 1920 1921 if (asynchronousBackends == null) 1923 task = null; 1924 } 1925 1926 try 1927 { 1928 if (canTakeReadLock) 1929 backendNonBlockingThreadsRWLock.acquireRead(); 1930 else 1931 backendNonBlockingThreadsRWLock.acquireWrite(); 1932 } 1933 catch (InterruptedException e) 1934 { 1935 String msg = Translate.get( 1936 "loadbalancer.backendlist.acquire.writelock.failed", e); 1937 logger.error(msg); 1938 throw new SQLException (msg); 1939 } 1940 1941 int nbOfThreads = backendNonBlockingThreads.size(); 1942 ArrayList savepointList = new ArrayList (); 1943 for (int i = 0; i < nbOfThreads; i++) 1946 { 1947 BackendWorkerThread thread = (BackendWorkerThread) backendNonBlockingThreads 1948 .get(i); 1949 DatabaseBackend backend = thread.getBackend(); 1950 if (backend.isStartedTransaction(lTid) 1954 && ((asynchronousBackends == null) || (!asynchronousBackends 1955 .contains(backend)))) 1956 savepointList.add(thread); 1957 } 1958 1959 nbOfThreads = savepointList.size(); 1960 if (nbOfThreads == 0) 1961 { 1962 if (canTakeReadLock) 1963 backendNonBlockingThreadsRWLock.releaseRead(); 1964 else 1965 backendNonBlockingThreadsRWLock.releaseWrite(); 1966 return; 1967 } 1968 1969 if (task == null) 1970 task = new SavepointTask(getNbToWait(nbOfThreads), nbOfThreads, tm 1971 .getTimeout(), tm.getLogin(), tid, name); 1972 1973 synchronized (task) 1974 { 1975 for (int i = 0; i < nbOfThreads; i++) 1977 { 1978 BackendWorkerThread thread = (BackendWorkerThread) savepointList.get(i); 1979 synchronized (thread) 1980 { 1981 thread.addTask(task, tid); 1982 thread.notify(); 1983 } 1984 } 1985 1986 if (canTakeReadLock) 1988 backendNonBlockingThreadsRWLock.releaseRead(); 1989 else 1990 backendNonBlockingThreadsRWLock.releaseWrite(); 1991 1992 if (totalOrderSavepoint != null) 1994 removeHeadFromAndNotifyTotalOrderQueue(); 1995 1996 try 1998 { 1999 long timeout = tm.getTimeout(); 2001 if (timeout > 0) 2002 { 2003 long start = System.currentTimeMillis(); 2004 task.wait(timeout); 2005 long end = System.currentTimeMillis(); 2006 long remaining = timeout - (end - start); 2007 if (remaining <= 0) 2008 { 2009 if (task.setExpiredTimeout()) 2010 { String msg = Translate.get("loadbalancer.setsavepoint.timeout", 2012 new String []{name, String.valueOf(tid), 2013 String.valueOf(task.getSuccess()), 2014 String.valueOf(task.getFailed())}); 2015 logger.warn(msg); 2016 throw new SQLException (msg); 2017 } 2018 } 2020 } 2021 else 2022 task.wait(); 2023 } 2024 catch (InterruptedException e) 2025 { 2026 if (task.setExpiredTimeout()) 2027 { String msg = Translate.get("loadbalancer.setsavepoint.timeout", 2029 new String []{name, String.valueOf(tid), 2030 String.valueOf(task.getSuccess()), 2031 String.valueOf(task.getFailed())}); 2032 logger.warn(msg); 2033 throw new SQLException (msg); 2034 } 2035 } 2037 2038 if (task.getSuccess() > 0) 2039 return; 2040 else 2041 { ArrayList exceptions = task.getExceptions(); 2043 if (exceptions == null) 2044 throw new SQLException (Translate.get( 2045 "loadbalancer.setsavepoint.all.failed", new String []{name, 2046 String.valueOf(tid)})); 2047 else 2048 { 2049 String errorMsg = Translate.get( 2050 "loadbalancer.setsavepoint.failed.stack", new String []{name, 2051 String.valueOf(tid)}) 2052 + "\n"; 2053 SQLException ex = SQLExceptionFactory.getSQLException(exceptions, 2054 errorMsg); 2055 logger.error(ex.getMessage()); 2056 throw ex; 2057 } 2058 } 2059 } 2060 } 2061 2062 2068 private void waitForAllWritesToBePostedOnBackendBlockingThreads() 2069 throws SQLException 2070 { 2071 try 2075 { 2076 backendBlockingThreadsRWLock.acquireWrite(); 2077 } 2078 catch (InterruptedException e) 2079 { 2080 String msg = Translate.get( 2081 "loadbalancer.backendlist.acquire.writelock.failed", e); 2082 logger.error(msg); 2083 throw new SQLException (msg); 2084 } 2085 backendBlockingThreadsRWLock.releaseWrite(); 2086 } 2087 2088 2094 protected void waitForAllWritesToComplete(long transactionId) 2095 throws SQLException 2096 { 2097 waitForAllWritesToBePostedOnBackendBlockingThreads(); 2098 2099 boolean success = false; 2100 while (!success) 2101 { 2102 try 2103 { for (Iterator iter = backendBlockingThreads.iterator(); iter.hasNext();) 2106 { 2107 BackendWorkerThread thread = (BackendWorkerThread) iter.next(); 2108 thread.waitForAllTasksToComplete(transactionId); 2109 } 2110 success = true; 2111 } 2112 catch (ConcurrentModificationException e) 2113 { } 2116 } 2117 } 2118 2119 2127 protected void waitForAllWritesToComplete(DatabaseBackend backend, 2128 long transactionId) throws SQLException 2129 { 2130 waitForAllWritesToBePostedOnBackendBlockingThreads(); 2131 2132 boolean success = false; 2133 while (!success) 2134 { 2135 try 2136 { for (Iterator iter = backendBlockingThreads.iterator(); iter.hasNext();) 2139 { 2140 BackendWorkerThread thread = (BackendWorkerThread) iter.next(); 2141 if (thread.getBackend() == backend) 2142 { 2143 thread.waitForAllTasksToComplete(transactionId); 2144 break; 2145 } 2146 } 2147 success = true; 2148 } 2149 catch (ConcurrentModificationException e) 2150 { } 2153 } 2154 } 2155 2156 2163 protected void waitForAllWritesToComplete(DatabaseBackend backend) 2164 throws SQLException 2165 { 2166 waitForAllWritesToBePostedOnBackendBlockingThreads(); 2167 2168 boolean success = false; 2169 while (!success) 2170 { 2171 try 2172 { for (Iterator iter = backendBlockingThreads.iterator(); iter.hasNext();) 2175 { 2176 BackendWorkerThread thread = (BackendWorkerThread) iter.next(); 2177 if (thread.getBackend() == backend) 2178 { 2179 thread.waitForAllTasksToComplete(); 2180 break; 2181 } 2182 } 2183 success = true; 2184 } 2185 catch (ConcurrentModificationException e) 2186 { } 2189 } 2190 } 2191 2192 2195 2196 2208 public void enableBackend(DatabaseBackend db, boolean writeEnabled) 2209 throws SQLException 2210 { 2211 if (writeEnabled && db.isWriteCanBeEnabled()) 2212 { 2213 BackendWorkerThread blockingThread = new BackendWorkerThread(db, this); 2215 BackendWorkerThread nonBlockingThread = new BackendWorkerThread(db, this); 2216 2217 try 2219 { 2220 backendBlockingThreadsRWLock.acquireWrite(); 2221 } 2222 catch (InterruptedException e) 2223 { 2224 String msg = Translate.get( 2225 "loadbalancer.backendlist.acquire.writelock.failed", e); 2226 logger.error(msg); 2227 throw new SQLException (msg); 2228 } 2229 backendBlockingThreads.add(blockingThread); 2230 backendBlockingThreadsRWLock.releaseWrite(); 2231 blockingThread.start(); 2232 logger.info(Translate.get( 2233 "loadbalancer.backend.workerthread.blocking.add", db.getName())); 2234 2235 try 2237 { 2238 backendNonBlockingThreadsRWLock.acquireWrite(); 2239 } 2240 catch (InterruptedException e) 2241 { 2242 String msg = Translate.get( 2243 "loadbalancer.backendlist.acquire.writelock.failed", e); 2244 logger.error(msg); 2245 throw new SQLException (msg); 2246 } 2247 backendNonBlockingThreads.add(nonBlockingThread); 2248 backendNonBlockingThreadsRWLock.releaseWrite(); 2249 nonBlockingThread.start(); 2250 logger.info(Translate.get( 2251 "loadbalancer.backend.workerthread.non.blocking.add", db.getName())); 2252 db.enableWrite(); 2253 } 2254 2255 if (!db.isInitialized()) 2256 db.initializeConnections(); 2257 db.enableRead(); 2258 } 2259 2260 2271 public synchronized void disableBackend(DatabaseBackend db) 2272 throws SQLException 2273 { 2274 if (db.isWriteEnabled()) 2275 { 2276 KillThreadTask killBlockingThreadTask = new KillThreadTask(1, 1); 2277 2278 try 2280 { 2281 backendBlockingThreadsRWLock.acquireWrite(); 2282 } 2283 catch (InterruptedException e) 2284 { 2285 String msg = Translate.get( 2286 "loadbalancer.backendlist.acquire.writelock.failed", e); 2287 logger.error(msg); 2288 throw new SQLException (msg); 2289 } 2290 2291 int nbOfThreads = backendBlockingThreads.size(); 2292 2293 for (int i = 0; i < nbOfThreads; i++) 2295 { 2296 BackendWorkerThread thread = (BackendWorkerThread) backendBlockingThreads 2297 .get(i); 2298 if (thread.getBackend().equals(db)) 2299 { 2300 logger.info(Translate 2301 .get("loadbalancer.backend.workerthread.blocking.remove", db 2302 .getName())); 2303 2304 backendBlockingThreads.remove(i); 2306 2307 synchronized (thread) 2308 { 2309 thread.addPriorityTask(killBlockingThreadTask); 2311 thread.notify(); 2312 } 2313 break; 2314 } 2315 } 2316 2317 backendBlockingThreadsRWLock.releaseWrite(); 2318 2319 synchronized (killBlockingThreadTask) 2321 { 2322 if (!killBlockingThreadTask.hasFullyCompleted()) 2323 try 2324 { 2325 killBlockingThreadTask.wait(); 2326 } 2327 catch (InterruptedException ignore) 2328 { 2329 } 2330 } 2331 2332 KillThreadTask killNonBlockingThreadTask = new KillThreadTask(1, 1); 2334 2335 try 2336 { 2337 backendNonBlockingThreadsRWLock.acquireWrite(); 2338 } 2339 catch (InterruptedException e) 2340 { 2341 String msg = Translate.get( 2342 "loadbalancer.backendlist.acquire.writelock.failed", e); 2343 logger.error(msg); 2344 throw new SQLException (msg); 2345 } 2346 2347 nbOfThreads = backendNonBlockingThreads.size(); 2349 for (int i = 0; i < nbOfThreads; i++) 2350 { 2351 BackendWorkerThread thread = (BackendWorkerThread) backendNonBlockingThreads 2352 .get(i); 2353 if (thread.getBackend().equals(db)) 2354 { 2355 logger.info(Translate.get( 2356 "loadbalancer.backend.workerthread.non.blocking.remove", db 2357 .getName())); 2358 2359 backendNonBlockingThreads.remove(i); 2361 2362 synchronized (thread) 2363 { 2364 thread.addPriorityTask(killNonBlockingThreadTask); 2366 thread.notify(); 2367 } 2368 break; 2369 } 2370 } 2371 2372 backendNonBlockingThreadsRWLock.releaseWrite(); 2373 2374 synchronized (killNonBlockingThreadTask) 2376 { 2377 if (!killNonBlockingThreadTask.hasFullyCompleted()) 2378 try 2379 { 2380 killNonBlockingThreadTask.wait(); 2381 } 2382 catch (InterruptedException ignore) 2383 { 2384 } 2385 } 2386 } 2387 2388 db.disable(); 2389 if (db.isInitialized()) 2390 db.finalizeConnections(); 2391 } 2392 2393 2396 public int getNumberOfEnabledBackends() 2397 { 2398 return backendBlockingThreads.size(); 2399 } 2400 2401 2405 2408 public String getXmlImpl() 2409 { 2410 StringBuffer info = new StringBuffer (); 2411 info.append("<" + DatabasesXmlTags.ELT_RAIDb_1 + ">"); 2412 if (waitForCompletionPolicy != null) 2413 info.append(waitForCompletionPolicy.getXml()); 2414 if (macroHandler != null) 2415 info.append(macroHandler.getXml()); 2416 info.append(getRaidb1Xml()); 2417 info.append("</" + DatabasesXmlTags.ELT_RAIDb_1 + ">"); 2418 return info.toString(); 2419 } 2420 2421 2428 public abstract String getRaidb1Xml(); 2429} | Popular Tags |