1 24 25 package org.objectweb.cjdbc.controller.loadbalancer.raidb2; 26 27 import java.sql.Connection ; 28 import java.sql.SQLException ; 29 import java.util.ArrayList ; 30 import java.util.ConcurrentModificationException ; 31 import java.util.Iterator ; 32 33 import org.objectweb.cjdbc.common.exceptions.BadConnectionException; 34 import org.objectweb.cjdbc.common.exceptions.NoTransactionStartWhenDisablingException; 35 import org.objectweb.cjdbc.common.exceptions.SQLExceptionFactory; 36 import org.objectweb.cjdbc.common.exceptions.UnreachableBackendException; 37 import org.objectweb.cjdbc.common.i18n.Translate; 38 import org.objectweb.cjdbc.common.log.Trace; 39 import org.objectweb.cjdbc.common.sql.AbstractWriteRequest; 40 import org.objectweb.cjdbc.common.sql.ParsingGranularities; 41 import org.objectweb.cjdbc.common.sql.SelectRequest; 42 import org.objectweb.cjdbc.common.sql.StoredProcedure; 43 import org.objectweb.cjdbc.common.util.ReadPrioritaryFIFOWriteLock; 44 import org.objectweb.cjdbc.common.xml.DatabasesXmlTags; 45 import org.objectweb.cjdbc.controller.backend.DatabaseBackend; 46 import org.objectweb.cjdbc.controller.cache.metadata.MetadataCache; 47 import org.objectweb.cjdbc.controller.connection.AbstractConnectionManager; 48 import org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer; 49 import org.objectweb.cjdbc.controller.loadbalancer.AllBackendsFailedException; 50 import org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread; 51 import org.objectweb.cjdbc.controller.loadbalancer.policies.WaitForCompletionPolicy; 52 import org.objectweb.cjdbc.controller.loadbalancer.policies.createtable.CreateTableException; 53 import org.objectweb.cjdbc.controller.loadbalancer.policies.createtable.CreateTablePolicy; 54 import org.objectweb.cjdbc.controller.loadbalancer.policies.createtable.CreateTableRule; 55 import org.objectweb.cjdbc.controller.loadbalancer.tasks.AbstractTask; 56 import org.objectweb.cjdbc.controller.loadbalancer.tasks.CommitTask; 57 import org.objectweb.cjdbc.controller.loadbalancer.tasks.KillThreadTask; 58 import org.objectweb.cjdbc.controller.loadbalancer.tasks.ReadStoredProcedureTask; 59 import org.objectweb.cjdbc.controller.loadbalancer.tasks.ReleaseSavepointTask; 60 import org.objectweb.cjdbc.controller.loadbalancer.tasks.RollbackTask; 61 import org.objectweb.cjdbc.controller.loadbalancer.tasks.RollbackToSavepointTask; 62 import org.objectweb.cjdbc.controller.loadbalancer.tasks.SavepointTask; 63 import org.objectweb.cjdbc.controller.loadbalancer.tasks.WriteRequestTask; 64 import org.objectweb.cjdbc.controller.loadbalancer.tasks.WriteRequestWithKeysTask; 65 import org.objectweb.cjdbc.controller.loadbalancer.tasks.WriteStoredProcedureTask; 66 import org.objectweb.cjdbc.controller.requestmanager.RAIDbLevels; 67 import org.objectweb.cjdbc.controller.requestmanager.TransactionMarkerMetaData; 68 import org.objectweb.cjdbc.controller.virtualdatabase.ControllerResultSet; 69 import org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase; 70 import org.objectweb.cjdbc.controller.virtualdatabase.protocol.Commit; 71 import org.objectweb.cjdbc.controller.virtualdatabase.protocol.ReleaseSavepoint; 72 import org.objectweb.cjdbc.controller.virtualdatabase.protocol.Rollback; 73 import org.objectweb.cjdbc.controller.virtualdatabase.protocol.RollbackToSavepoint; 74 import org.objectweb.cjdbc.controller.virtualdatabase.protocol.SetSavepoint; 75 76 89 public abstract class RAIDb2 extends AbstractLoadBalancer 90 { 91 100 protected ArrayList backendBlockingThreads; 101 protected ArrayList backendNonBlockingThreads; 102 protected ReadPrioritaryFIFOWriteLock backendBlockingThreadsRWLock = new ReadPrioritaryFIFOWriteLock(); 103 protected ReadPrioritaryFIFOWriteLock backendNonBlockingThreadsRWLock = new ReadPrioritaryFIFOWriteLock(); 104 protected WaitForCompletionPolicy waitForCompletionPolicy; 106 protected CreateTablePolicy createTablePolicy; 107 108 protected static Trace logger = Trace 109 .getLogger("org.objectweb.cjdbc.controller.loadbalancer.raidb2"); 110 111 114 115 126 public RAIDb2(VirtualDatabase vdb, 127 WaitForCompletionPolicy waitForCompletionPolicy, 128 CreateTablePolicy createTablePolicy) throws Exception 129 { 130 super(vdb, RAIDbLevels.RAIDb2, ParsingGranularities.TABLE); 131 132 this.waitForCompletionPolicy = waitForCompletionPolicy; 133 backendBlockingThreads = new ArrayList (); 134 backendNonBlockingThreads = new ArrayList (); 135 this.createTablePolicy = createTablePolicy; 136 } 137 138 141 142 149 private int getNbToWait(int nbOfThreads) 150 { 151 int nbToWait; 152 switch (waitForCompletionPolicy.getPolicy()) 153 { 154 case WaitForCompletionPolicy.FIRST : 155 nbToWait = 1; 156 break; 157 case WaitForCompletionPolicy.MAJORITY : 158 nbToWait = nbOfThreads / 2 + 1; 159 break; 160 case WaitForCompletionPolicy.ALL : 161 nbToWait = nbOfThreads; 162 break; 163 default : 164 logger 165 .warn(Translate.get("loadbalancer.waitforcompletion.unsupported")); 166 nbToWait = nbOfThreads; 167 break; 168 } 169 return nbToWait; 170 } 171 172 182 public int execWriteRequest(AbstractWriteRequest request) 183 throws AllBackendsFailedException, SQLException 184 { 185 return ((WriteRequestTask) execWriteRequest(request, false, null)) 186 .getResult(); 187 } 188 189 199 public ControllerResultSet execWriteRequestWithKeys( 200 AbstractWriteRequest request, MetadataCache metadataCache) 201 throws AllBackendsFailedException, SQLException 202 { 203 return ((WriteRequestWithKeysTask) execWriteRequest(request, true, 204 metadataCache)).getResult(); 205 } 206 207 222 private AbstractTask execWriteRequest(AbstractWriteRequest request, 223 boolean useKeys, MetadataCache metadataCache) 224 throws AllBackendsFailedException, SQLException 225 { 226 ArrayList backendThreads; 227 ReadPrioritaryFIFOWriteLock lock; 228 229 boolean canTakeReadLock = waitForTotalOrder(request, true); 234 235 handleMacros(request); 237 238 if (request.mightBlock()) 240 { backendThreads = backendBlockingThreads; 242 lock = backendBlockingThreadsRWLock; 243 } 244 else 245 { backendThreads = backendNonBlockingThreads; 247 lock = backendNonBlockingThreadsRWLock; 248 if ((waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL) 249 && (request.getTransactionId() != 0)) 250 waitForAllWritesToComplete(request.getTransactionId()); 251 } 252 253 try 254 { 255 if (canTakeReadLock) 256 lock.acquireRead(); 257 else 258 lock.acquireWrite(); 259 } 260 catch (InterruptedException e) 261 { 262 String msg = Translate.get( 263 "loadbalancer.backendlist.acquire.writelock.failed", e); 264 logger.error(msg); 265 throw new SQLException (msg); 266 } 267 268 int nbOfThreads = backendThreads.size(); 269 ArrayList writeList = new ArrayList (); 270 String tableName = request.getTableName(); 271 272 if (request.isCreate()) 273 { CreateTableRule rule = createTablePolicy.getTableRule(request 275 .getTableName()); 276 if (rule == null) 277 rule = createTablePolicy.getDefaultRule(); 278 279 ArrayList chosen; 281 try 282 { 283 chosen = rule.getBackends(vdb.getBackends()); 284 } 285 catch (CreateTableException e) 286 { 287 throw new SQLException (Translate.get( 288 "loadbalancer.create.table.rule.failed", e.getMessage())); 289 } 290 291 if (chosen != null) 293 { 294 for (int i = 0; i < nbOfThreads; i++) 295 { 296 BackendWorkerThread thread = (BackendWorkerThread) backendThreads 297 .get(i); 298 if (chosen.contains(thread.getBackend())) 299 writeList.add(thread); 300 } 301 } 302 } 303 else 304 { for (int i = 0; i < nbOfThreads; i++) 306 { 307 BackendWorkerThread thread = (BackendWorkerThread) backendThreads 308 .get(i); 309 if (thread.getBackend().hasTable(tableName)) 310 writeList.add(thread); 311 } 312 } 313 314 nbOfThreads = writeList.size(); 315 if (nbOfThreads == 0) 316 { 317 if (canTakeReadLock) 318 lock.releaseRead(); 319 else 320 lock.releaseWrite(); 321 322 String msg = Translate.get("loadbalancer.execute.no.backend.found", 323 request.getSQLShortForm(vdb.getSQLShortFormLength())); 324 logger.warn(msg); 325 326 removeHeadFromAndNotifyTotalOrderQueue(); 328 throw new SQLException (msg); 329 } 330 else 331 { 332 if (logger.isDebugEnabled()) 333 logger.debug(Translate.get("loadbalancer.execute.on.several", 334 new String []{String.valueOf(request.getId()), 335 String.valueOf(nbOfThreads)})); 336 } 337 338 AbstractTask task; 340 if (useKeys) 341 task = new WriteRequestWithKeysTask(getNbToWait(nbOfThreads), 342 nbOfThreads, request, metadataCache); 343 else 344 task = new WriteRequestTask(getNbToWait(nbOfThreads), nbOfThreads, 345 request); 346 347 352 if (request.isAutoCommit()) 354 { 355 for (int i = 0; i < nbOfThreads; i++) 356 { 357 BackendWorkerThread thread = (BackendWorkerThread) writeList.get(i); 358 synchronized (thread) 359 { 360 thread.addTask(task); 361 } 362 } 363 } 364 else 365 { 366 for (int i = 0; i < nbOfThreads; i++) 367 { 368 BackendWorkerThread thread = (BackendWorkerThread) writeList.get(i); 369 synchronized (thread) 370 { 371 thread.addTask(task, request.getTransactionId()); 372 } 373 } 374 } 375 376 for (int i = 0; i < nbOfThreads; i++) 378 { 379 BackendWorkerThread thread = (BackendWorkerThread) writeList.get(i); 380 synchronized (thread) 381 { 382 thread.notify(); 383 } 384 } 385 386 if (canTakeReadLock) 388 lock.releaseRead(); 389 else 390 lock.releaseWrite(); 391 392 removeHeadFromAndNotifyTotalOrderQueue(); 394 395 synchronized (task) 396 { 397 if (!task.hasCompleted()) 398 { 399 try 401 { 402 long timeout = request.getTimeout() * 1000; 404 if (timeout > 0) 405 { 406 long start = System.currentTimeMillis(); 407 task.wait(timeout); 408 long end = System.currentTimeMillis(); 409 long remaining = timeout - (end - start); 410 if (remaining <= 0) 411 { 412 if (task.setExpiredTimeout()) 413 { String msg = Translate.get("loadbalancer.request.timeout", 415 new String []{String.valueOf(request.getId()), 416 String.valueOf(task.getSuccess()), 417 String.valueOf(task.getFailed())}); 418 logger.warn(msg); 419 throw new SQLException (msg); 420 } 421 } 423 } 425 else 426 task.wait(); 427 } 428 catch (InterruptedException e) 429 { 430 if (task.setExpiredTimeout()) 431 { String msg = Translate.get("loadbalancer.request.timeout", 433 new String []{String.valueOf(request.getId()), 434 String.valueOf(task.getSuccess()), 435 String.valueOf(task.getFailed())}); 436 logger.warn(msg); 437 throw new SQLException (msg); 438 } 439 } 441 } 442 443 if (task.getSuccess() > 0) 444 return task; 445 else 446 { ArrayList exceptions = task.getExceptions(); 448 if (exceptions == null) 449 throw new AllBackendsFailedException(Translate.get( 450 "loadbalancer.request.failed.all", request.getId())); 451 else 452 { 453 String errorMsg = Translate.get("loadbalancer.request.failed.stack", 454 request.getId()) 455 + "\n"; 456 for (int i = 0; i < exceptions.size(); i++) 457 errorMsg += ((SQLException ) exceptions.get(i)).getMessage() + "\n"; 458 logger.error(errorMsg); 459 throw new SQLException (errorMsg); 460 } 461 } 462 } 463 } 464 465 473 public abstract ControllerResultSet execReadRequest(SelectRequest request, 474 MetadataCache metadataCache) throws SQLException ; 475 476 485 protected ControllerResultSet executeRequestOnBackend(SelectRequest request, 486 DatabaseBackend backend, MetadataCache metadataCache) 487 throws SQLException , UnreachableBackendException 488 { 489 handleMacros(request); 491 492 AbstractConnectionManager cm = backend.getConnectionManager(request 494 .getLogin()); 495 496 if (cm == null) 498 { 499 String msg = Translate.get("loadbalancer.connectionmanager.not.found", 500 new String []{request.getLogin(), backend.getName()}); 501 logger.error(msg); 502 throw new SQLException (msg); 503 } 504 505 if (request.isAutoCommit()) 507 { 508 if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL) 509 waitForAllWritesToComplete(backend); 513 514 ControllerResultSet rs = null; 515 boolean badConnection; 516 do 517 { 518 badConnection = false; 519 Connection c = null; 521 try 522 { 523 c = cm.getConnection(); 524 } 525 catch (UnreachableBackendException e1) 526 { 527 logger.error(Translate.get( 528 "loadbalancer.backend.disabling.unreachable", backend.getName())); 529 disableBackend(backend); 530 throw new UnreachableBackendException(Translate.get( 531 "loadbalancer.backend.unreacheable", backend.getName())); 532 } 533 534 if (c == null) 536 throw new UnreachableBackendException( 537 "No more connections on backend " + backend.getName()); 538 539 try 541 { 542 rs = executeSelectRequestOnBackend(request, backend, c, metadataCache); 543 cm.releaseConnection(c); 544 } 545 catch (SQLException e) 546 { 547 cm.releaseConnection(c); 548 throw new SQLException (Translate.get( 549 "loadbalancer.request.failed.on.backend", new String []{ 550 request.getSQLShortForm(vdb.getSQLShortFormLength()), 551 backend.getName(), e.getMessage()})); 552 } 553 catch (BadConnectionException e) 554 { cm.deleteConnection(c); 556 badConnection = true; 557 } 558 } 559 while (badConnection); 560 if (logger.isDebugEnabled()) 561 logger.debug(Translate.get("loadbalancer.execute.on", new String []{ 562 String.valueOf(request.getId()), backend.getName()})); 563 return rs; 564 } 565 else 566 { Connection c; 568 long tid = request.getTransactionId(); 569 Long lTid = new Long (tid); 570 571 if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL) 573 waitForAllWritesToComplete(backend, request.getTransactionId()); 574 575 try 576 { 577 c = backend.getConnectionForTransactionAndLazyBeginIfNeeded(lTid, cm, 578 request.getTransactionIsolation()); 579 } 580 catch (UnreachableBackendException e1) 581 { 582 logger.error(Translate.get( 583 "loadbalancer.backend.disabling.unreachable", backend.getName())); 584 disableBackend(backend); 585 throw new SQLException (Translate.get( 586 "loadbalancer.backend.unreacheable", backend.getName())); 587 } 588 catch (NoTransactionStartWhenDisablingException e) 589 { 590 String msg = Translate.get("loadbalancer.backend.is.disabling", 591 new String []{request.getSQLShortForm(vdb.getSQLShortFormLength()), 592 backend.getName()}); 593 logger.error(msg); 594 throw new SQLException (msg); 595 } 596 597 if (c == null) 599 throw new SQLException (Translate.get( 600 "loadbalancer.unable.retrieve.connection", new String []{ 601 String.valueOf(tid), backend.getName()})); 602 603 ControllerResultSet rs = null; 605 try 606 { 607 rs = executeSelectRequestOnBackend(request, backend, c, metadataCache); 608 } 609 catch (SQLException e) 610 { 611 throw new SQLException (Translate.get( 612 "loadbalancer.request.failed.on.backend", new String []{ 613 request.getSQLShortForm(vdb.getSQLShortFormLength()), 614 backend.getName(), e.getMessage()})); 615 } 616 catch (BadConnectionException e) 617 { cm.deleteConnection(tid); 620 String msg = Translate.get( 621 "loadbalancer.backend.disabling.connection.failure", backend 622 .getName()); 623 logger.error(msg); 624 disableBackend(backend); 625 throw new SQLException (msg); 626 } 627 if (logger.isDebugEnabled()) 628 logger.debug(Translate.get("loadbalancer.execute.transaction.on", 629 new String []{String.valueOf(tid), String.valueOf(request.getId()), 630 backend.getName()})); 631 return rs; 632 } 633 } 634 635 644 protected ControllerResultSet executeStoredProcedureOnBackend( 645 StoredProcedure proc, DatabaseBackend backend, MetadataCache metadataCache) 646 throws SQLException , UnreachableBackendException 647 { 648 AbstractConnectionManager cm = backend 650 .getConnectionManager(proc.getLogin()); 651 652 if (cm == null) 654 { 655 String msg = Translate.get("loadbalancer.connectionmanager.not.found", 656 new String []{proc.getLogin(), backend.getName()}); 657 logger.error(msg); 658 throw new SQLException (msg); 659 } 660 661 if (proc.isAutoCommit()) 663 { 664 if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL) 665 waitForAllWritesToComplete(backend); 669 670 Connection c = null; 672 try 673 { 674 c = cm.getConnection(); 675 } 676 catch (UnreachableBackendException e1) 677 { 678 logger.error(Translate.get( 679 "loadbalancer.backend.disabling.unreachable", backend.getName())); 680 disableBackend(backend); 681 throw new UnreachableBackendException(Translate.get( 682 "loadbalancer.backend.unreacheable", backend.getName())); 683 } 684 685 if (c == null) 687 throw new SQLException (Translate.get( 688 "loadbalancer.backend.no.connection", backend.getName())); 689 690 ControllerResultSet rs = null; 692 try 693 { 694 rs = AbstractLoadBalancer.executeReadStoredProcedureOnBackend(proc, 695 backend, c, metadataCache); 696 } 697 catch (Exception e) 698 { 699 throw new SQLException (Translate.get( 700 "loadbalancer.storedprocedure.failed.on.backend", new String []{ 701 proc.getSQLShortForm(vdb.getSQLShortFormLength()), 702 backend.getName(), e.getMessage()})); 703 } 704 finally 705 { 706 cm.releaseConnection(c); 707 } 708 if (logger.isDebugEnabled()) 709 logger.debug(Translate.get("loadbalancer.storedprocedure.on", 710 new String []{String.valueOf(proc.getId()), backend.getName()})); 711 return rs; 712 } 713 else 714 { Connection c; 716 long tid = proc.getTransactionId(); 717 Long lTid = new Long (tid); 718 719 if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL) 721 waitForAllWritesToComplete(backend, proc.getTransactionId()); 722 723 try 724 { 725 c = backend.getConnectionForTransactionAndLazyBeginIfNeeded(lTid, cm, 726 proc.getTransactionIsolation()); 727 } 728 catch (UnreachableBackendException e1) 729 { 730 logger.error(Translate.get( 731 "loadbalancer.backend.disabling.unreachable", backend.getName())); 732 disableBackend(backend); 733 throw new SQLException (Translate.get( 734 "loadbalancer.backend.unreacheable", backend.getName())); 735 } 736 catch (NoTransactionStartWhenDisablingException e) 737 { 738 String msg = Translate.get("loadbalancer.backend.is.disabling", 739 new String []{proc.getSQLShortForm(vdb.getSQLShortFormLength()), 740 backend.getName()}); 741 logger.error(msg); 742 throw new SQLException (msg); 743 } 744 745 if (c == null) 747 throw new SQLException (Translate.get( 748 "loadbalancer.unable.retrieve.connection", new String []{ 749 String.valueOf(tid), backend.getName()})); 750 751 ControllerResultSet rs; 753 try 754 { 755 rs = AbstractLoadBalancer.executeReadStoredProcedureOnBackend(proc, 756 backend, c, metadataCache); 757 } 758 catch (Exception e) 759 { 760 throw new SQLException (Translate.get( 761 "loadbalancer.storedprocedure.failed.on.backend", new String []{ 762 proc.getSQLShortForm(vdb.getSQLShortFormLength()), 763 backend.getName(), e.getMessage()})); 764 } 765 if (logger.isDebugEnabled()) 766 logger.debug(Translate.get("loadbalancer.execute.transaction.on", 767 new String []{String.valueOf(tid), String.valueOf(proc.getId()), 768 backend.getName()})); 769 return rs; 770 } 771 } 772 773 777 public ControllerResultSet execReadStoredProcedure(StoredProcedure proc, 778 MetadataCache metadataCache) throws SQLException 779 { 780 ReadStoredProcedureTask task = (ReadStoredProcedureTask) callStoredProcedure( 781 proc, true, metadataCache); 782 return task.getResult(); 783 } 784 785 788 public int execWriteStoredProcedure(StoredProcedure proc) throws SQLException 789 { 790 WriteStoredProcedureTask task = (WriteStoredProcedureTask) callStoredProcedure( 791 proc, false, null); 792 return task.getResult(); 793 } 794 795 807 private AbstractTask callStoredProcedure(StoredProcedure proc, 808 boolean isRead, MetadataCache metadataCache) throws SQLException 809 { 810 ArrayList backendThreads = backendBlockingThreads; 811 ReadPrioritaryFIFOWriteLock lock = backendBlockingThreadsRWLock; 812 813 boolean canTakeReadLock = waitForTotalOrder(proc, true); 818 819 handleMacros(proc); 821 822 try 823 { 824 if (canTakeReadLock) 830 lock.acquireRead(); 831 else 832 lock.acquireWrite(); 833 } 834 catch (InterruptedException e) 835 { 836 String msg; 837 msg = Translate.get("loadbalancer.backendlist.acquire.writelock.failed", 838 e); 839 logger.error(msg); 840 throw new SQLException (msg); 841 } 842 843 int nbOfThreads = backendThreads.size(); 844 845 AbstractTask task; 847 if (isRead) 848 task = new ReadStoredProcedureTask(getNbToWait(nbOfThreads), nbOfThreads, 849 proc, metadataCache); 850 else 851 task = new WriteStoredProcedureTask(getNbToWait(nbOfThreads), 852 nbOfThreads, proc); 853 854 int nbOfBackends = 0; 855 856 for (int i = 0; i < nbOfThreads; i++) 858 { 859 BackendWorkerThread thread = (BackendWorkerThread) backendThreads.get(i); 860 if (thread.getBackend().hasStoredProcedure(proc.getProcedureName())) 861 { 862 nbOfBackends++; 863 synchronized (thread) 864 { 865 if (proc.isAutoCommit()) 866 thread.addTask(task); 867 else 868 thread.addTask(task, proc.getTransactionId()); 869 thread.notify(); 870 } 871 } 872 } 873 874 if (canTakeReadLock) 875 lock.releaseRead(); 876 else 877 lock.releaseWrite(); 878 879 removeHeadFromAndNotifyTotalOrderQueue(); 881 882 if (nbOfBackends == 0) 883 { 884 throw new SQLException (Translate.get( 885 "loadbalancer.backend.no.required.storedprocedure", proc 886 .getProcedureName())); 887 } 888 else 889 task.setTotalNb(nbOfBackends); 890 891 synchronized (task) 892 { 893 if (!task.hasCompleted()) 894 { 895 try 897 { 898 long timeout = proc.getTimeout() * 1000; 900 if (timeout > 0) 901 { 902 long start = System.currentTimeMillis(); 903 task.wait(timeout); 904 long end = System.currentTimeMillis(); 905 long remaining = timeout - (end - start); 906 if (remaining <= 0) 907 { 908 if (task.setExpiredTimeout()) 909 { String msg = Translate.get( 911 "loadbalancer.storedprocedure.timeout", new String []{ 912 String.valueOf(proc.getId()), 913 String.valueOf(task.getSuccess()), 914 String.valueOf(task.getFailed())}); 915 logger.warn(msg); 916 throw new SQLException (msg); 917 } 918 } 920 } 922 else 923 task.wait(); 924 } 925 catch (InterruptedException e) 926 { 927 if (task.setExpiredTimeout()) 928 { String msg = Translate.get("loadbalancer.storedprocedure.timeout", 930 new String []{String.valueOf(proc.getId()), 931 String.valueOf(task.getSuccess()), 932 String.valueOf(task.getFailed())}); 933 logger.warn(msg); 934 throw new SQLException (msg); 935 } 936 } 938 } 939 940 if (task.getSuccess() > 0) 941 return task; 942 else 943 { ArrayList exceptions = task.getExceptions(); 945 if (exceptions == null) 946 throw new SQLException (Translate.get( 947 "loadbalancer.storedprocedure.all.failed", proc.getId())); 948 else 949 { 950 String errorMsg = Translate.get( 951 "loadbalancer.storedprocedure.failed.stack", proc.getId()) 952 + "\n"; 953 for (int i = 0; i < exceptions.size(); i++) 954 errorMsg += ((SQLException ) exceptions.get(i)).getMessage() + "\n"; 955 logger.error(errorMsg); 956 throw new SQLException (errorMsg); 957 } 958 } 959 } 960 } 961 962 965 966 972 public final void begin(TransactionMarkerMetaData tm) throws SQLException 973 { 974 } 975 976 982 public void commit(TransactionMarkerMetaData tm) throws SQLException 983 { 984 long tid = tm.getTransactionId(); 985 Long lTid = new Long (tid); 986 ArrayList asynchronousBackends = null; 989 CommitTask task = null; 990 991 Commit totalOrderCommit = null; 993 boolean canTakeReadLock = false; 994 if (vdb.getTotalOrderQueue() != null) 995 { 996 totalOrderCommit = new Commit(tm.getLogin(), tid); 997 canTakeReadLock = waitForTotalOrder(totalOrderCommit, false); 1002 if (!canTakeReadLock) 1003 totalOrderCommit = null; 1005 } 1006 1007 if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL) 1008 { try 1010 { 1011 if (canTakeReadLock) 1012 backendBlockingThreadsRWLock.acquireRead(); 1013 else 1014 { 1015 backendBlockingThreadsRWLock.acquireWrite(); 1019 } 1020 } 1021 catch (InterruptedException e) 1022 { 1023 String msg = Translate.get( 1024 "loadbalancer.backendlist.acquire.writelock.failed", e); 1025 logger.error(msg); 1026 throw new SQLException (msg); 1027 } 1028 1029 int nbOfThreads = backendBlockingThreads.size(); 1030 task = new CommitTask(getNbToWait(nbOfThreads), nbOfThreads, tm 1032 .getTimeout(), tm.getLogin(), tid); 1033 1034 for (int i = 0; i < nbOfThreads; i++) 1035 { 1036 BackendWorkerThread thread = (BackendWorkerThread) backendBlockingThreads 1037 .get(i); 1038 if (thread.hasTaskForTransaction(lTid)) 1039 { 1040 if (asynchronousBackends == null) 1041 asynchronousBackends = new ArrayList (); 1042 asynchronousBackends.add(thread.getBackend()); 1043 synchronized (thread) 1044 { 1045 thread.insertTaskAfterLastWriteForTransaction(task, lTid); 1046 thread.notify(); 1047 } 1048 } 1049 } 1050 1051 if (canTakeReadLock) 1052 backendBlockingThreadsRWLock.releaseRead(); 1053 else 1054 backendBlockingThreadsRWLock.releaseWrite(); 1055 1056 if (asynchronousBackends == null) 1058 task = null; 1059 } 1060 1061 try 1062 { 1063 if (canTakeReadLock) 1064 backendNonBlockingThreadsRWLock.acquireRead(); 1065 else 1066 { 1067 backendNonBlockingThreadsRWLock.acquireWrite(); 1071 } 1072 } 1073 catch (InterruptedException e) 1074 { 1075 String msg = Translate.get( 1076 "loadbalancer.backendlist.acquire.writelock.failed", e); 1077 logger.error(msg); 1078 throw new SQLException (msg); 1079 } 1080 1081 int nbOfThreads = backendNonBlockingThreads.size(); 1082 ArrayList commitList = new ArrayList (); 1083 1084 for (int i = 0; i < nbOfThreads; i++) 1086 { 1087 BackendWorkerThread thread = (BackendWorkerThread) backendNonBlockingThreads 1088 .get(i); 1089 DatabaseBackend backend = thread.getBackend(); 1090 if (backend.isStartedTransaction(lTid) 1094 && ((asynchronousBackends == null) || (!asynchronousBackends 1095 .contains(backend)))) 1096 commitList.add(thread); 1097 } 1098 1099 int nbOfThreadsToCommit = commitList.size(); 1104 if ((task == null) && (nbOfThreadsToCommit != 0)) 1105 task = new CommitTask(getNbToWait(nbOfThreadsToCommit), 1106 nbOfThreadsToCommit, tm.getTimeout(), tm.getLogin(), tid); 1107 1108 for (int i = 0; i < nbOfThreadsToCommit; i++) 1112 { 1113 BackendWorkerThread thread = (BackendWorkerThread) commitList.get(i); 1114 synchronized (thread) 1115 { 1116 thread.addTask(task, tid); 1117 thread.notify(); 1118 } 1119 } 1120 1121 if (canTakeReadLock) 1122 backendNonBlockingThreadsRWLock.releaseRead(); 1123 else 1124 backendNonBlockingThreadsRWLock.releaseWrite(); 1125 1126 if (totalOrderCommit != null) 1128 removeHeadFromAndNotifyTotalOrderQueue(); 1129 1130 if (task == null) 1132 return; 1133 1134 synchronized (task) 1135 { 1136 if (!task.hasCompleted()) 1137 { 1138 try 1140 { 1141 long timeout = tm.getTimeout(); 1143 if (timeout > 0) 1144 { 1145 long start = System.currentTimeMillis(); 1146 task.wait(timeout); 1147 long end = System.currentTimeMillis(); 1148 long remaining = timeout - (end - start); 1149 if (remaining <= 0) 1150 { 1151 if (task.setExpiredTimeout()) 1152 { String msg = Translate.get("loadbalancer.commit.timeout", 1154 new String []{String.valueOf(tid), 1155 String.valueOf(task.getSuccess()), 1156 String.valueOf(task.getFailed())}); 1157 logger.warn(msg); 1158 throw new SQLException (msg); 1159 } 1160 } 1162 } 1163 else 1164 task.wait(); 1165 } 1166 catch (InterruptedException e) 1167 { 1168 if (task.setExpiredTimeout()) 1169 { String msg = Translate.get("loadbalancer.commit.timeout", 1171 new String []{String.valueOf(tid), 1172 String.valueOf(task.getSuccess()), 1173 String.valueOf(task.getFailed())}); 1174 logger.warn(msg); 1175 throw new SQLException (msg); 1176 } 1177 } 1179 } 1180 1181 if (task.getSuccess() > 0) 1182 return; 1183 else 1184 { ArrayList exceptions = task.getExceptions(); 1186 if (exceptions == null) 1187 throw new SQLException (Translate.get( 1188 "loadbalancer.commit.all.failed", tid)); 1189 else 1190 { 1191 String errorMsg = Translate.get("loadbalancer.commit.failed.stack", 1192 tid) 1193 + "\n"; 1194 for (int i = 0; i < exceptions.size(); i++) 1195 errorMsg += ((SQLException ) exceptions.get(i)).getMessage() + "\n"; 1196 logger.error(errorMsg); 1197 throw new SQLException (errorMsg); 1198 } 1199 } 1200 } 1201 } 1202 1203 1209 public void rollback(TransactionMarkerMetaData tm) throws SQLException 1210 { 1211 long tid = tm.getTransactionId(); 1212 Long lTid = new Long (tid); 1213 ArrayList asynchronousBackends = null; 1216 RollbackTask task = null; 1217 1218 Rollback totalOrderRollback = null; 1220 boolean canTakeReadLock = false; 1221 if (vdb.getTotalOrderQueue() != null) 1222 { 1223 totalOrderRollback = new Rollback(tm.getLogin(), tid); 1224 canTakeReadLock = waitForTotalOrder(totalOrderRollback, false); 1229 if (!canTakeReadLock) 1230 totalOrderRollback = null; 1232 } 1233 1234 if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL) 1235 { 1236 try 1237 { 1238 if (canTakeReadLock) 1239 backendBlockingThreadsRWLock.acquireRead(); 1240 else 1241 { 1242 backendBlockingThreadsRWLock.acquireWrite(); 1246 } 1247 } 1248 catch (InterruptedException e) 1249 { 1250 String msg = Translate.get( 1251 "loadbalancer.backendlist.acquire.writelock.failed", e); 1252 logger.error(msg); 1253 throw new SQLException (msg); 1254 } 1255 1256 int nbOfThreads = backendBlockingThreads.size(); 1257 task = new RollbackTask(getNbToWait(nbOfThreads), nbOfThreads, tm 1259 .getTimeout(), tm.getLogin(), tid); 1260 1261 for (int i = 0; i < nbOfThreads; i++) 1262 { 1263 BackendWorkerThread thread = (BackendWorkerThread) backendBlockingThreads 1264 .get(i); 1265 if (thread.hasTaskForTransaction(lTid)) 1266 { 1267 if (asynchronousBackends == null) 1268 asynchronousBackends = new ArrayList (); 1269 asynchronousBackends.add(thread.getBackend()); 1270 synchronized (thread) 1271 { 1272 thread.insertTaskAfterLastWriteForTransaction(task, lTid); 1273 thread.notify(); 1274 } 1275 } 1276 } 1277 1278 if (canTakeReadLock) 1279 backendBlockingThreadsRWLock.releaseRead(); 1280 else 1281 backendBlockingThreadsRWLock.releaseWrite(); 1282 1283 if (asynchronousBackends == null) 1285 task = null; 1286 } 1287 1288 try 1289 { 1290 if (canTakeReadLock) 1291 backendNonBlockingThreadsRWLock.acquireRead(); 1292 else 1293 backendNonBlockingThreadsRWLock.acquireWrite(); 1294 } 1295 catch (InterruptedException e) 1296 { 1297 String msg = Translate.get( 1298 "loadbalancer.backendlist.acquire.writelock.failed", e); 1299 logger.error(msg); 1300 throw new SQLException (msg); 1301 } 1302 1303 int nbOfThreads = backendNonBlockingThreads.size(); 1304 ArrayList rollbackList = new ArrayList (); 1305 1306 for (int i = 0; i < nbOfThreads; i++) 1308 { 1309 BackendWorkerThread thread = (BackendWorkerThread) backendNonBlockingThreads 1310 .get(i); 1311 DatabaseBackend backend = thread.getBackend(); 1312 if (backend.isStartedTransaction(lTid) 1316 && ((asynchronousBackends == null) || (!asynchronousBackends 1317 .contains(backend)))) 1318 rollbackList.add(thread); 1319 } 1320 1321 int nbOfThreadsToRollback = rollbackList.size(); 1322 if ((task == null) && (nbOfThreadsToRollback != 0)) 1326 task = new RollbackTask(getNbToWait(nbOfThreadsToRollback), 1327 nbOfThreadsToRollback, tm.getTimeout(), tm.getLogin(), tid); 1328 1329 for (int i = 0; i < nbOfThreadsToRollback; i++) 1333 { 1334 BackendWorkerThread thread = (BackendWorkerThread) rollbackList.get(i); 1335 synchronized (thread) 1336 { 1337 thread.addTask(task, tid); 1338 thread.notify(); 1339 } 1340 } 1341 1342 if (canTakeReadLock) 1344 backendNonBlockingThreadsRWLock.releaseRead(); 1345 else 1346 backendNonBlockingThreadsRWLock.releaseWrite(); 1347 1348 if (totalOrderRollback != null) 1350 removeHeadFromAndNotifyTotalOrderQueue(); 1351 1352 if (task == null) 1354 return; 1355 1356 synchronized (task) 1357 { 1358 if (!task.hasCompleted()) 1359 { 1360 try 1362 { 1363 long timeout = tm.getTimeout(); 1365 if (timeout > 0) 1366 { 1367 long start = System.currentTimeMillis(); 1368 task.wait(timeout); 1369 long end = System.currentTimeMillis(); 1370 long remaining = timeout - (end - start); 1371 if (remaining <= 0) 1372 { 1373 if (task.setExpiredTimeout()) 1374 { String msg = Translate.get("loadbalancer.rollback.timeout", 1376 new String []{String.valueOf(tid), 1377 String.valueOf(task.getSuccess()), 1378 String.valueOf(task.getFailed())}); 1379 logger.warn(msg); 1380 throw new SQLException (msg); 1381 } 1382 } 1384 } 1385 else 1386 task.wait(); 1387 } 1388 catch (InterruptedException e) 1389 { 1390 if (task.setExpiredTimeout()) 1391 { String msg = Translate.get("loadbalancer.rollback.timeout", 1393 new String []{String.valueOf(tid), 1394 String.valueOf(task.getSuccess()), 1395 String.valueOf(task.getFailed())}); 1396 logger.warn(msg); 1397 throw new SQLException (msg); 1398 } 1399 } 1401 } 1402 1403 if (task.getSuccess() > 0) 1404 return; 1405 else 1406 { ArrayList exceptions = task.getExceptions(); 1408 if (exceptions == null) 1409 throw new SQLException (Translate.get( 1410 "loadbalancer.rollback.all.failed", tid)); 1411 else 1412 { 1413 String errorMsg = Translate.get("loadbalancer.rollback.failed.stack", 1414 tid) 1415 + "\n"; 1416 for (int i = 0; i < exceptions.size(); i++) 1417 errorMsg += ((SQLException ) exceptions.get(i)).getMessage() + "\n"; 1418 logger.error(errorMsg); 1419 throw new SQLException (errorMsg); 1420 } 1421 } 1422 } 1423 } 1424 1425 1434 public void rollback(TransactionMarkerMetaData tm, String savepointName) 1435 throws AllBackendsFailedException, SQLException 1436 { 1437 long tid = tm.getTransactionId(); 1438 Long lTid = new Long (tid); 1439 ArrayList asynchronousBackends = null; 1442 RollbackToSavepointTask task = null; 1443 1444 RollbackToSavepoint totalOrderRollback = null; 1446 boolean canTakeReadLock = false; 1447 if (vdb.getTotalOrderQueue() != null) 1448 { 1449 totalOrderRollback = new RollbackToSavepoint(tid, savepointName); 1450 canTakeReadLock = waitForTotalOrder(totalOrderRollback, false); 1455 if (!canTakeReadLock) 1456 totalOrderRollback = null; 1458 } 1459 1460 if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL) 1461 { 1462 try 1463 { 1464 if (canTakeReadLock) 1465 backendBlockingThreadsRWLock.acquireRead(); 1466 else 1467 { 1468 backendBlockingThreadsRWLock.acquireWrite(); 1472 } 1473 } 1474 catch (InterruptedException e) 1475 { 1476 String msg = Translate.get( 1477 "loadbalancer.backendlist.acquire.writelock.failed", e); 1478 logger.error(msg); 1479 throw new SQLException (msg); 1480 } 1481 1482 int nbOfThreads = backendBlockingThreads.size(); 1483 task = new RollbackToSavepointTask(getNbToWait(nbOfThreads), nbOfThreads, 1485 tm.getTimeout(), tm.getLogin(), tid, savepointName); 1486 1487 for (int i = 0; i < nbOfThreads; i++) 1488 { 1489 BackendWorkerThread thread = (BackendWorkerThread) backendBlockingThreads 1490 .get(i); 1491 if (thread.hasTaskForTransaction(lTid)) 1492 { 1493 if (asynchronousBackends == null) 1494 asynchronousBackends = new ArrayList (); 1495 asynchronousBackends.add(thread.getBackend()); 1496 synchronized (thread) 1497 { 1498 thread.insertTaskAfterLastWriteForTransaction(task, lTid); 1499 thread.notify(); 1500 } 1501 } 1502 } 1503 1504 if (canTakeReadLock) 1505 backendBlockingThreadsRWLock.releaseRead(); 1506 else 1507 backendBlockingThreadsRWLock.releaseWrite(); 1508 1509 if (asynchronousBackends == null) 1511 task = null; 1512 } 1513 1514 try 1515 { 1516 if (canTakeReadLock) 1517 backendNonBlockingThreadsRWLock.acquireRead(); 1518 else 1519 backendNonBlockingThreadsRWLock.acquireWrite(); 1520 } 1521 catch (InterruptedException e) 1522 { 1523 String msg = Translate.get( 1524 "loadbalancer.backendlist.acquire.writelock.failed", e); 1525 logger.error(msg); 1526 throw new SQLException (msg); 1527 } 1528 1529 int nbOfThreads = backendNonBlockingThreads.size(); 1530 ArrayList rollbackList = new ArrayList (); 1531 1532 for (int i = 0; i < nbOfThreads; i++) 1535 { 1536 BackendWorkerThread thread = (BackendWorkerThread) backendNonBlockingThreads 1537 .get(i); 1538 DatabaseBackend backend = thread.getBackend(); 1539 if (backend.isStartedTransaction(lTid) 1543 && ((asynchronousBackends == null) || (!asynchronousBackends 1544 .contains(backend)))) 1545 rollbackList.add(thread); 1546 } 1547 1548 int nbOfThreadsToRollback = rollbackList.size(); 1549 if ((task == null) && (nbOfThreadsToRollback != 0)) 1553 task = new RollbackToSavepointTask(getNbToWait(nbOfThreadsToRollback), 1554 nbOfThreadsToRollback, tm.getTimeout(), tm.getLogin(), tid, 1555 savepointName); 1556 1557 for (int i = 0; i < nbOfThreadsToRollback; i++) 1561 { 1562 BackendWorkerThread thread = (BackendWorkerThread) rollbackList.get(i); 1563 synchronized (thread) 1564 { 1565 thread.addTask(task, tid); 1566 thread.notify(); 1567 } 1568 } 1569 1570 if (canTakeReadLock) 1572 backendNonBlockingThreadsRWLock.releaseRead(); 1573 else 1574 backendNonBlockingThreadsRWLock.releaseWrite(); 1575 1576 if (totalOrderRollback != null) 1578 removeHeadFromAndNotifyTotalOrderQueue(); 1579 1580 if (task == null) 1582 return; 1583 1584 synchronized (task) 1585 { 1586 if (!task.hasCompleted()) 1587 { 1588 try 1590 { 1591 long timeout = tm.getTimeout(); 1593 if (timeout > 0) 1594 { 1595 long start = System.currentTimeMillis(); 1596 task.wait(timeout); 1597 long end = System.currentTimeMillis(); 1598 long remaining = timeout - (end - start); 1599 if (remaining <= 0) 1600 { 1601 if (task.setExpiredTimeout()) 1602 { String msg = Translate.get( 1604 "loadbalancer.rollbacksavepoint.timeout", new String []{ 1605 savepointName, String.valueOf(tid), 1606 String.valueOf(task.getSuccess()), 1607 String.valueOf(task.getFailed())}); 1608 logger.warn(msg); 1609 throw new SQLException (msg); 1610 } 1611 } 1613 } 1614 else 1615 task.wait(); 1616 } 1617 catch (InterruptedException e) 1618 { 1619 if (task.setExpiredTimeout()) 1620 { String msg = Translate.get( 1622 "loadbalancer.rollbacksavepoint.timeout", new String []{ 1623 savepointName, String.valueOf(tid), 1624 String.valueOf(task.getSuccess()), 1625 String.valueOf(task.getFailed())}); 1626 logger.warn(msg); 1627 throw new SQLException (msg); 1628 } 1629 } 1631 } 1632 1633 if (task.getSuccess() > 0) 1634 return; 1635 else 1636 { ArrayList exceptions = task.getExceptions(); 1638 if (exceptions == null) 1639 throw new SQLException (Translate.get( 1640 "loadbalancer.rollbacksavepoint.all.failed", new String []{ 1641 savepointName, String.valueOf(tid)})); 1642 else 1643 { 1644 String errorMsg = Translate.get( 1645 "loadbalancer.rollbacksavepoint.failed.stack", new String []{ 1646 savepointName, String.valueOf(tid)}) 1647 + "\n"; 1648 SQLException ex = SQLExceptionFactory.getSQLException(exceptions, 1649 errorMsg); 1650 logger.error(ex.getMessage()); 1651 throw ex; 1652 } 1653 } 1654 } 1655 } 1656 1657 1664 public void releaseSavepoint(TransactionMarkerMetaData tm, String name) 1665 throws SQLException 1666 { 1667 long tid = tm.getTransactionId(); 1668 Long lTid = new Long (tid); 1669 1670 ArrayList asynchronousBackends = null; 1673 ReleaseSavepointTask task = null; 1674 1675 ReleaseSavepoint totalOrderRelease = null; 1677 boolean canTakeReadLock = false; 1678 if (vdb.getTotalOrderQueue() != null) 1679 { 1680 totalOrderRelease = new ReleaseSavepoint(tid, name); 1681 canTakeReadLock = waitForTotalOrder(totalOrderRelease, false); 1686 if (!canTakeReadLock) 1687 totalOrderRelease = null; 1689 } 1690 1691 if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL) 1692 { 1693 try 1694 { 1695 if (canTakeReadLock) 1696 backendBlockingThreadsRWLock.acquireRead(); 1697 else 1698 { 1699 backendBlockingThreadsRWLock.acquireWrite(); 1703 } 1704 } 1705 catch (InterruptedException e) 1706 { 1707 String msg = Translate.get( 1708 "loadbalancer.backendlist.acquire.writelock.failed", e); 1709 logger.error(msg); 1710 throw new SQLException (msg); 1711 } 1712 1713 int nbOfThreads = backendBlockingThreads.size(); 1714 1715 task = new ReleaseSavepointTask(getNbToWait(nbOfThreads), nbOfThreads, tm 1717 .getTimeout(), tm.getLogin(), tid, name); 1718 1719 for (int i = 0; i < nbOfThreads; i++) 1720 { 1721 BackendWorkerThread thread = (BackendWorkerThread) backendBlockingThreads 1722 .get(i); 1723 if (thread.hasTaskForTransaction(lTid)) 1724 { 1725 if (asynchronousBackends == null) 1726 asynchronousBackends = new ArrayList (); 1727 asynchronousBackends.add(thread.getBackend()); 1728 synchronized (thread) 1729 { 1730 thread.insertTaskAfterLastWriteForTransaction(task, lTid); 1731 thread.notify(); 1732 } 1733 } 1734 } 1735 1736 if (canTakeReadLock) 1738 backendBlockingThreadsRWLock.releaseRead(); 1739 else 1740 backendBlockingThreadsRWLock.releaseWrite(); 1741 1742 if (asynchronousBackends == null) 1744 task = null; 1745 } 1746 1747 try 1748 { 1749 if (canTakeReadLock) 1750 backendNonBlockingThreadsRWLock.acquireRead(); 1751 else 1752 backendNonBlockingThreadsRWLock.acquireWrite(); 1753 } 1754 catch (InterruptedException e) 1755 { 1756 String msg = Translate.get( 1757 "loadbalancer.backendlist.acquire.writelock.failed", e); 1758 logger.error(msg); 1759 throw new SQLException (msg); 1760 } 1761 1762 int nbOfThreads = backendNonBlockingThreads.size(); 1763 ArrayList savepointList = new ArrayList (); 1764 for (int i = 0; i < nbOfThreads; i++) 1767 { 1768 BackendWorkerThread thread = (BackendWorkerThread) backendNonBlockingThreads 1769 .get(i); 1770 DatabaseBackend backend = thread.getBackend(); 1771 if (backend.isStartedTransaction(lTid) 1775 && ((asynchronousBackends == null) || (!asynchronousBackends 1776 .contains(backend)))) 1777 savepointList.add(thread); 1778 } 1779 1780 nbOfThreads = savepointList.size(); 1781 if (nbOfThreads == 0) 1782 { 1783 if (canTakeReadLock) 1784 backendNonBlockingThreadsRWLock.releaseRead(); 1785 else 1786 backendNonBlockingThreadsRWLock.releaseWrite(); 1787 return; 1788 } 1789 1790 if (task == null) 1791 task = new ReleaseSavepointTask(getNbToWait(nbOfThreads), nbOfThreads, tm 1792 .getTimeout(), tm.getLogin(), tid, name); 1793 1794 synchronized (task) 1795 { 1796 for (int i = 0; i < nbOfThreads; i++) 1798 { 1799 BackendWorkerThread thread = (BackendWorkerThread) savepointList.get(i); 1800 synchronized (thread) 1801 { 1802 thread.addTask(task, tid); 1803 thread.notify(); 1804 } 1805 } 1806 1807 if (canTakeReadLock) 1809 backendNonBlockingThreadsRWLock.releaseRead(); 1810 else 1811 backendNonBlockingThreadsRWLock.releaseWrite(); 1812 1813 if (totalOrderRelease != null) 1815 removeHeadFromAndNotifyTotalOrderQueue(); 1816 1817 try 1819 { 1820 long timeout = tm.getTimeout(); 1822 if (timeout > 0) 1823 { 1824 long start = System.currentTimeMillis(); 1825 task.wait(timeout); 1826 long end = System.currentTimeMillis(); 1827 long remaining = timeout - (end - start); 1828 if (remaining <= 0) 1829 { 1830 if (task.setExpiredTimeout()) 1831 { String msg = Translate.get( 1833 "loadbalancer.releasesavepoint.timeout", new String []{name, 1834 String.valueOf(tid), String.valueOf(task.getSuccess()), 1835 String.valueOf(task.getFailed())}); 1836 logger.warn(msg); 1837 throw new SQLException (msg); 1838 } 1839 } 1841 } 1842 else 1843 task.wait(); 1844 } 1845 catch (InterruptedException e) 1846 { 1847 if (task.setExpiredTimeout()) 1848 { String msg = Translate.get("loadbalancer.releasesavepoint.timeout", 1850 new String []{name, String.valueOf(tid), 1851 String.valueOf(task.getSuccess()), 1852 String.valueOf(task.getFailed())}); 1853 logger.warn(msg); 1854 throw new SQLException (msg); 1855 } 1856 } 1858 1859 if (task.getSuccess() > 0) 1860 return; 1861 else 1862 { ArrayList exceptions = task.getExceptions(); 1864 if (exceptions == null) 1865 throw new SQLException (Translate.get( 1866 "loadbalancer.releasesavepoint.all.failed", new String []{name, 1867 String.valueOf(tid)})); 1868 else 1869 { 1870 String errorMsg = Translate.get( 1871 "loadbalancer.releasesavepoint.failed.stack", new String []{name, 1872 String.valueOf(tid)}) 1873 + "\n"; 1874 SQLException ex = SQLExceptionFactory.getSQLException(exceptions, 1875 errorMsg); 1876 logger.error(ex.getMessage()); 1877 throw ex; 1878 } 1879 } 1880 } 1881 } 1882 1883 1890 public void setSavepoint(TransactionMarkerMetaData tm, String name) 1891 throws SQLException 1892 { 1893 long tid = tm.getTransactionId(); 1894 Long lTid = new Long (tid); 1895 1896 ArrayList asynchronousBackends = null; 1899 SavepointTask task = null; 1900 1901 SetSavepoint totalOrderSavepoint = null; 1903 boolean canTakeReadLock = false; 1904 if (vdb.getTotalOrderQueue() != null) 1905 { 1906 totalOrderSavepoint = new SetSavepoint(tid, name); 1907 canTakeReadLock = waitForTotalOrder(totalOrderSavepoint, false); 1912 if (!canTakeReadLock) 1913 totalOrderSavepoint = null; 1915 } 1916 1917 if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL) 1918 { 1919 try 1920 { 1921 if (canTakeReadLock) 1922 backendBlockingThreadsRWLock.acquireRead(); 1923 else 1924 { 1925 backendBlockingThreadsRWLock.acquireWrite(); 1929 } 1930 } 1931 catch (InterruptedException e) 1932 { 1933 String msg = Translate.get( 1934 "loadbalancer.backendlist.acquire.writelock.failed", e); 1935 logger.error(msg); 1936 throw new SQLException (msg); 1937 } 1938 1939 int nbOfThreads = backendBlockingThreads.size(); 1940 1941 task = new SavepointTask(getNbToWait(nbOfThreads), nbOfThreads, tm 1943 .getTimeout(), tm.getLogin(), tid, name); 1944 1945 for (int i = 0; i < nbOfThreads; i++) 1946 { 1947 BackendWorkerThread thread = (BackendWorkerThread) backendBlockingThreads 1948 .get(i); 1949 if (thread.hasTaskForTransaction(lTid)) 1950 { 1951 if (asynchronousBackends == null) 1952 asynchronousBackends = new ArrayList (); 1953 asynchronousBackends.add(thread.getBackend()); 1954 synchronized (thread) 1955 { 1956 thread.insertTaskAfterLastWriteForTransaction(task, lTid); 1957 thread.notify(); 1958 } 1959 } 1960 } 1961 1962 if (canTakeReadLock) 1964 backendBlockingThreadsRWLock.releaseRead(); 1965 else 1966 backendBlockingThreadsRWLock.releaseWrite(); 1967 1968 if (asynchronousBackends == null) 1970 task = null; 1971 } 1972 1973 try 1974 { 1975 if (canTakeReadLock) 1976 backendNonBlockingThreadsRWLock.acquireRead(); 1977 else 1978 backendNonBlockingThreadsRWLock.acquireWrite(); 1979 } 1980 catch (InterruptedException e) 1981 { 1982 String msg = Translate.get( 1983 "loadbalancer.backendlist.acquire.writelock.failed", e); 1984 logger.error(msg); 1985 throw new SQLException (msg); 1986 } 1987 1988 int nbOfThreads = backendNonBlockingThreads.size(); 1989 ArrayList savepointList = new ArrayList (); 1990 for (int i = 0; i < nbOfThreads; i++) 1993 { 1994 BackendWorkerThread thread = (BackendWorkerThread) backendNonBlockingThreads 1995 .get(i); 1996 DatabaseBackend backend = thread.getBackend(); 1997 if (backend.isStartedTransaction(lTid) 2001 && ((asynchronousBackends == null) || (!asynchronousBackends 2002 .contains(backend)))) 2003 savepointList.add(thread); 2004 } 2005 2006 nbOfThreads = savepointList.size(); 2007 if (nbOfThreads == 0) 2008 { 2009 if (canTakeReadLock) 2010 backendNonBlockingThreadsRWLock.releaseRead(); 2011 else 2012 backendNonBlockingThreadsRWLock.releaseWrite(); 2013 return; 2014 } 2015 2016 if (task == null) 2017 task = new SavepointTask(getNbToWait(nbOfThreads), nbOfThreads, tm 2018 .getTimeout(), tm.getLogin(), tid, name); 2019 2020 synchronized (task) 2021 { 2022 for (int i = 0; i < nbOfThreads; i++) 2024 { 2025 BackendWorkerThread thread = (BackendWorkerThread) savepointList.get(i); 2026 synchronized (thread) 2027 { 2028 thread.addTask(task, tid); 2029 thread.notify(); 2030 } 2031 } 2032 2033 if (canTakeReadLock) 2035 backendNonBlockingThreadsRWLock.releaseRead(); 2036 else 2037 backendNonBlockingThreadsRWLock.releaseWrite(); 2038 2039 if (totalOrderSavepoint != null) 2041 removeHeadFromAndNotifyTotalOrderQueue(); 2042 2043 try 2045 { 2046 long timeout = tm.getTimeout(); 2048 if (timeout > 0) 2049 { 2050 long start = System.currentTimeMillis(); 2051 task.wait(timeout); 2052 long end = System.currentTimeMillis(); 2053 long remaining = timeout - (end - start); 2054 if (remaining <= 0) 2055 { 2056 if (task.setExpiredTimeout()) 2057 { String msg = Translate.get("loadbalancer.setsavepoint.timeout", 2059 new String []{name, String.valueOf(tid), 2060 String.valueOf(task.getSuccess()), 2061 String.valueOf(task.getFailed())}); 2062 logger.warn(msg); 2063 throw new SQLException (msg); 2064 } 2065 } 2067 } 2068 else 2069 task.wait(); 2070 } 2071 catch (InterruptedException e) 2072 { 2073 if (task.setExpiredTimeout()) 2074 { String msg = Translate.get("loadbalancer.setsavepoint.timeout", 2076 new String []{name, String.valueOf(tid), 2077 String.valueOf(task.getSuccess()), 2078 String.valueOf(task.getFailed())}); 2079 logger.warn(msg); 2080 throw new SQLException (msg); 2081 } 2082 } 2084 2085 if (task.getSuccess() > 0) 2086 return; 2087 else 2088 { ArrayList exceptions = task.getExceptions(); 2090 if (exceptions == null) 2091 throw new SQLException (Translate.get( 2092 "loadbalancer.setsavepoint.all.failed", new String []{name, 2093 String.valueOf(tid)})); 2094 else 2095 { 2096 String errorMsg = Translate.get( 2097 "loadbalancer.setsavepoint.failed.stack", new String []{name, 2098 String.valueOf(tid)}) 2099 + "\n"; 2100 SQLException ex = SQLExceptionFactory.getSQLException(exceptions, 2101 errorMsg); 2102 logger.error(ex.getMessage()); 2103 throw ex; 2104 } 2105 } 2106 } 2107 } 2108 2109 2115 private void waitForAllWritesToBePostedOnBackendBlockingThreads() 2116 throws SQLException 2117 { 2118 try 2122 { 2123 backendBlockingThreadsRWLock.acquireWrite(); 2124 } 2125 catch (InterruptedException e) 2126 { 2127 String msg = Translate.get( 2128 "loadbalancer.backendlist.acquire.writelock.failed", e); 2129 logger.error(msg); 2130 throw new SQLException (msg); 2131 } 2132 backendBlockingThreadsRWLock.releaseWrite(); 2133 } 2134 2135 2141 protected void waitForAllWritesToComplete(long transactionId) 2142 throws SQLException 2143 { 2144 waitForAllWritesToBePostedOnBackendBlockingThreads(); 2145 2146 boolean success = false; 2147 while (!success) 2148 { 2149 try 2150 { for (Iterator iter = backendBlockingThreads.iterator(); iter.hasNext();) 2153 { 2154 BackendWorkerThread thread = (BackendWorkerThread) iter.next(); 2155 thread.waitForAllTasksToComplete(transactionId); 2156 } 2157 success = true; 2158 } 2159 catch (ConcurrentModificationException e) 2160 { } 2163 } 2164 } 2165 2166 2174 protected void waitForAllWritesToComplete(DatabaseBackend backend, 2175 long transactionId) throws SQLException 2176 { 2177 waitForAllWritesToBePostedOnBackendBlockingThreads(); 2178 2179 boolean success = false; 2180 while (!success) 2181 { 2182 try 2183 { for (Iterator iter = backendBlockingThreads.iterator(); iter.hasNext();) 2186 { 2187 BackendWorkerThread thread = (BackendWorkerThread) iter.next(); 2188 if (thread.getBackend() == backend) 2189 { 2190 thread.waitForAllTasksToComplete(transactionId); 2191 break; 2192 } 2193 } 2194 success = true; 2195 } 2196 catch (ConcurrentModificationException e) 2197 { } 2200 } 2201 } 2202 2203 2210 protected void waitForAllWritesToComplete(DatabaseBackend backend) 2211 throws SQLException 2212 { 2213 waitForAllWritesToBePostedOnBackendBlockingThreads(); 2214 2215 boolean success = false; 2216 while (!success) 2217 { 2218 try 2219 { for (Iterator iter = backendBlockingThreads.iterator(); iter.hasNext();) 2222 { 2223 BackendWorkerThread thread = (BackendWorkerThread) iter.next(); 2224 if (thread.getBackend() == backend) 2225 { 2226 thread.waitForAllTasksToComplete(); 2227 break; 2228 } 2229 } 2230 success = true; 2231 } 2232 catch (ConcurrentModificationException e) 2233 { } 2236 } 2237 } 2238 2239 2242 2243 2255 public void enableBackend(DatabaseBackend db, boolean writeEnabled) 2256 throws SQLException 2257 { 2258 if (writeEnabled) 2259 { 2260 BackendWorkerThread blockingThread = new BackendWorkerThread(db, this); 2262 BackendWorkerThread nonBlockingThread = new BackendWorkerThread(db, this); 2263 2264 try 2266 { 2267 backendBlockingThreadsRWLock.acquireWrite(); 2268 } 2269 catch (InterruptedException e) 2270 { 2271 String msg = Translate.get( 2272 "loadbalancer.backendlist.acquire.writelock.failed", e); 2273 logger.error(msg); 2274 throw new SQLException (msg); 2275 } 2276 backendBlockingThreads.add(blockingThread); 2277 backendBlockingThreadsRWLock.releaseWrite(); 2278 blockingThread.start(); 2279 logger.info(Translate.get( 2280 "loadbalancer.backend.workerthread.blocking.add", db.getName())); 2281 2282 try 2284 { 2285 backendNonBlockingThreadsRWLock.acquireWrite(); 2286 } 2287 catch (InterruptedException e) 2288 { 2289 String msg = Translate.get( 2290 "loadbalancer.backendlist.acquire.writelock.failed", e); 2291 logger.error(msg); 2292 throw new SQLException (msg); 2293 } 2294 backendNonBlockingThreads.add(nonBlockingThread); 2295 backendNonBlockingThreadsRWLock.releaseWrite(); 2296 nonBlockingThread.start(); 2297 logger.info(Translate.get( 2298 "loadbalancer.backend.workerthread.non.blocking.add", db.getName())); 2299 db.enableWrite(); 2300 } 2301 2302 if (!db.isInitialized()) 2303 db.initializeConnections(); 2304 db.enableRead(); 2305 } 2306 2307 2318 public synchronized void disableBackend(DatabaseBackend db) 2319 throws SQLException 2320 { 2321 if (db.isWriteEnabled()) 2322 { 2323 KillThreadTask killBlockingThreadTask = new KillThreadTask(1, 1); 2324 2325 try 2327 { 2328 backendBlockingThreadsRWLock.acquireWrite(); 2329 } 2330 catch (InterruptedException e) 2331 { 2332 String msg = Translate.get( 2333 "loadbalancer.backendlist.acquire.writelock.failed", e); 2334 logger.error(msg); 2335 throw new SQLException (msg); 2336 } 2337 2338 int nbOfThreads = backendBlockingThreads.size(); 2339 2340 for (int i = 0; i < nbOfThreads; i++) 2342 { 2343 BackendWorkerThread thread = (BackendWorkerThread) backendBlockingThreads 2344 .get(i); 2345 if (thread.getBackend().equals(db)) 2346 { 2347 logger.info(Translate 2348 .get("loadbalancer.backend.workerthread.blocking.remove", db 2349 .getName())); 2350 2351 backendBlockingThreads.remove(i); 2353 2354 synchronized (thread) 2355 { 2356 thread.addPriorityTask(killBlockingThreadTask); 2358 thread.notify(); 2359 } 2360 break; 2361 } 2362 } 2363 2364 backendBlockingThreadsRWLock.releaseWrite(); 2365 2366 synchronized (killBlockingThreadTask) 2368 { 2369 if (!killBlockingThreadTask.hasFullyCompleted()) 2370 try 2371 { 2372 killBlockingThreadTask.wait(); 2373 } 2374 catch (InterruptedException ignore) 2375 { 2376 } 2377 } 2378 2379 KillThreadTask killNonBlockingThreadTask = new KillThreadTask(1, 1); 2381 2382 try 2383 { 2384 backendNonBlockingThreadsRWLock.acquireWrite(); 2385 } 2386 catch (InterruptedException e) 2387 { 2388 String msg = Translate.get( 2389 "loadbalancer.backendlist.acquire.writelock.failed", e); 2390 logger.error(msg); 2391 throw new SQLException (msg); 2392 } 2393 2394 nbOfThreads = backendNonBlockingThreads.size(); 2396 for (int i = 0; i < nbOfThreads; i++) 2397 { 2398 BackendWorkerThread thread = (BackendWorkerThread) backendNonBlockingThreads 2399 .get(i); 2400 if (thread.getBackend().equals(db)) 2401 { 2402 logger.info(Translate.get( 2403 "loadbalancer.backend.workerthread.non.blocking.remove", db 2404 .getName())); 2405 2406 backendNonBlockingThreads.remove(i); 2408 2409 synchronized (thread) 2410 { 2411 thread.addPriorityTask(killNonBlockingThreadTask); 2413 thread.notify(); 2414 } 2415 break; 2416 } 2417 } 2418 2419 backendNonBlockingThreadsRWLock.releaseWrite(); 2420 2421 synchronized (killNonBlockingThreadTask) 2423 { 2424 if (!killNonBlockingThreadTask.hasFullyCompleted()) 2425 try 2426 { 2427 killNonBlockingThreadTask.wait(); 2428 } 2429 catch (InterruptedException ignore) 2430 { 2431 } 2432 } 2433 } 2434 2435 db.disable(); 2436 if (db.isInitialized()) 2437 db.finalizeConnections(); 2438 } 2439 2440 2443 public int getNumberOfEnabledBackends() 2444 { 2445 return backendBlockingThreads.size(); 2446 } 2447 2448 2451 public String getXmlImpl() 2452 { 2453 StringBuffer info = new StringBuffer (); 2454 info.append("<" + DatabasesXmlTags.ELT_RAIDb_2 + ">"); 2455 if (createTablePolicy != null) 2456 info.append(createTablePolicy.getXml()); 2457 if (waitForCompletionPolicy != null) 2458 info.append(waitForCompletionPolicy.getXml()); 2459 if (macroHandler != null) 2460 info.append(macroHandler.getXml()); 2461 this.getRaidb2Xml(); 2462 info.append("</" + DatabasesXmlTags.ELT_RAIDb_2 + ">"); 2463 return info.toString(); 2464 } 2465 2466 2471 public abstract String getRaidb2Xml(); 2472 2473} | Popular Tags |