1 24 25 package org.objectweb.cjdbc.controller.loadbalancer.raidb0; 26 27 import java.sql.Connection ; 28 import java.sql.SQLException ; 29 import java.util.ArrayList ; 30 31 import org.objectweb.cjdbc.common.exceptions.BadConnectionException; 32 import org.objectweb.cjdbc.common.exceptions.NoMoreBackendException; 33 import org.objectweb.cjdbc.common.exceptions.NoTransactionStartWhenDisablingException; 34 import org.objectweb.cjdbc.common.exceptions.UnreachableBackendException; 35 import org.objectweb.cjdbc.common.i18n.Translate; 36 import org.objectweb.cjdbc.common.log.Trace; 37 import org.objectweb.cjdbc.common.sql.AbstractWriteRequest; 38 import org.objectweb.cjdbc.common.sql.ParsingGranularities; 39 import org.objectweb.cjdbc.common.sql.SelectRequest; 40 import org.objectweb.cjdbc.common.sql.StoredProcedure; 41 import org.objectweb.cjdbc.common.util.ReadPrioritaryFIFOWriteLock; 42 import org.objectweb.cjdbc.common.xml.DatabasesXmlTags; 43 import org.objectweb.cjdbc.controller.backend.DatabaseBackend; 44 import org.objectweb.cjdbc.controller.cache.metadata.MetadataCache; 45 import org.objectweb.cjdbc.controller.connection.AbstractConnectionManager; 46 import org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer; 47 import org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread; 48 import org.objectweb.cjdbc.controller.loadbalancer.policies.createtable.CreateTableException; 49 import org.objectweb.cjdbc.controller.loadbalancer.policies.createtable.CreateTablePolicy; 50 import org.objectweb.cjdbc.controller.loadbalancer.policies.createtable.CreateTableRule; 51 import org.objectweb.cjdbc.controller.loadbalancer.tasks.CommitTask; 52 import org.objectweb.cjdbc.controller.loadbalancer.tasks.KillThreadTask; 53 import org.objectweb.cjdbc.controller.loadbalancer.tasks.ReleaseSavepointTask; 54 import org.objectweb.cjdbc.controller.loadbalancer.tasks.RollbackTask; 55 import org.objectweb.cjdbc.controller.loadbalancer.tasks.RollbackToSavepointTask; 56 import org.objectweb.cjdbc.controller.loadbalancer.tasks.SavepointTask; 57 import org.objectweb.cjdbc.controller.requestmanager.RAIDbLevels; 58 import org.objectweb.cjdbc.controller.requestmanager.TransactionMarkerMetaData; 59 import org.objectweb.cjdbc.controller.virtualdatabase.ControllerResultSet; 60 import org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase; 61 62 75 public class RAIDb0 extends AbstractLoadBalancer 76 { 77 87 private ArrayList backendThreads; 88 private ReadPrioritaryFIFOWriteLock backendThreadsRWLock = new ReadPrioritaryFIFOWriteLock(); 89 private CreateTablePolicy createTablePolicy; 90 91 protected static Trace logger = Trace 92 .getLogger("org.objectweb.cjdbc.controller.loadbalancer.RAIDb0"); 93 94 97 98 106 public RAIDb0(VirtualDatabase vdb, CreateTablePolicy createTablePolicy) 107 throws Exception 108 { 109 super(vdb, RAIDbLevels.RAIDb0, ParsingGranularities.TABLE); 110 backendThreads = new ArrayList (); 111 this.createTablePolicy = createTablePolicy; 112 } 113 114 117 118 128 public ControllerResultSet execReadRequest(SelectRequest request, 129 MetadataCache metadataCache) throws SQLException 130 { 131 try 132 { 133 vdb.acquireReadLockBackendLists(); } 137 catch (InterruptedException e) 138 { 139 String msg = Translate.get( 140 "loadbalancer.backendlist.acquire.readlock.failed", e); 141 logger.error(msg); 142 throw new SQLException (msg); 143 } 144 145 try 146 { 147 ControllerResultSet rs = null; 148 ArrayList fromTables = request.getFrom(); 149 150 if (fromTables == null) 151 throw new SQLException (Translate.get("loadbalancer.from.not.found", 152 request.getSQLShortForm(vdb.getSQLShortFormLength()))); 153 154 ArrayList backends = vdb.getBackends(); 156 int size = backends.size(); 157 int enabledBackends = 0; 158 159 DatabaseBackend backend = null; 160 for (int i = 0; i < size; i++) 162 { 163 backend = (DatabaseBackend) backends.get(i); 164 if (backend.isReadEnabled()) 165 enabledBackends++; 166 if (backend.isReadEnabled() && backend.hasTables(fromTables)) 167 break; 168 else 169 backend = null; 170 } 171 172 if (backend == null) 173 { 174 if (enabledBackends == 0) 175 throw new NoMoreBackendException(Translate.get( 176 "loadbalancer.execute.no.backend.enabled", request.getId())); 177 else 178 throw new SQLException (Translate.get( 179 "loadbalancer.backend.no.required.tables", fromTables.toString())); 180 } 181 182 if (logger.isDebugEnabled()) 183 { 184 logger.debug("Backend " + backend.getName() 185 + " has all tables which are:"); 186 for (int i = 0; i < fromTables.size(); i++) 187 { 188 logger.debug(fromTables.get(i)); 189 } 190 } 191 192 try 194 { 195 rs = executeRequestOnBackend(request, backend, metadataCache); 196 } 197 catch (SQLException se) 198 { 199 String msg = Translate.get("loadbalancer.request.failed", new String []{ 200 String.valueOf(request.getId()), se.getMessage()}); 201 if (logger.isInfoEnabled()) 202 logger.info(msg); 203 throw new SQLException (msg); 204 } 205 206 return rs; 207 } 208 catch (RuntimeException e) 209 { 210 String msg = Translate 211 .get("loadbalancer.request.failed", new String []{ 212 request.getSQLShortForm(vdb.getSQLShortFormLength()), 213 e.getMessage()}); 214 logger.fatal(msg, e); 215 throw new SQLException (msg); 216 } 217 finally 218 { 219 vdb.releaseReadLockBackendLists(); } 223 } 224 225 233 public int execWriteRequest(AbstractWriteRequest request) throws SQLException 234 { 235 handleMacros(request); 237 238 try 239 { 240 vdb.acquireReadLockBackendLists(); } 244 catch (InterruptedException e) 245 { 246 String msg = Translate.get( 247 "loadbalancer.backendlist.acquire.readlock.failed", e); 248 logger.error(msg); 249 throw new SQLException (msg); 250 } 251 252 try 253 { 254 String table = request.getTableName(); 255 AbstractConnectionManager cm = null; 256 257 if (table == null) 258 throw new SQLException (Translate.get( 259 "loadbalancer.request.target.table.not.found", request 260 .getSQLShortForm(vdb.getSQLShortFormLength()))); 261 262 ArrayList backends = vdb.getBackends(); 264 int size = backends.size(); 265 266 DatabaseBackend backend = null; 267 if (request.isCreate()) 269 { CreateTableRule rule = createTablePolicy.getTableRule(request 271 .getTableName()); 272 if (rule == null) 273 rule = createTablePolicy.getDefaultRule(); 274 275 ArrayList choosen; 277 try 278 { 279 choosen = rule.getBackends(backends); 280 } 281 catch (CreateTableException e) 282 { 283 throw new SQLException (Translate.get( 284 "loadbalancer.create.table.rule.failed", e.getMessage())); 285 } 286 287 if (choosen != null) 289 backend = (DatabaseBackend) choosen.get(0); 290 if (backend != null) 291 cm = backend.getConnectionManager(request.getLogin()); 292 } 293 else 294 { for (int i = 0; i < size; i++) 296 { 297 backend = (DatabaseBackend) backends.get(i); 298 if ((backend.isWriteEnabled() || backend.isDisabling()) && backend.hasTable(table)) 299 { 300 cm = backend.getConnectionManager(request.getLogin()); 301 break; 302 } 303 } 304 } 305 306 if (cm == null) 308 throw new SQLException (Translate.get( 309 "loadbalancer.backend.no.required.table", table)); 310 311 313 if (request.isAutoCommit()) 314 { 315 if (backend.isDisabling()) 318 throw new SQLException (Translate.get( 319 "loadbalancer.backend.is.disabling", new String []{ 320 request.getSQLShortForm(vdb.getSQLShortFormLength()), 321 backend.getName()})); 322 323 Connection c = null; 325 try 326 { 327 c = cm.getConnection(); 328 } 329 catch (UnreachableBackendException e1) 330 { 331 logger.error(Translate.get( 332 "loadbalancer.backend.disabling.unreachable", backend.getName())); 333 disableBackend(backend); 334 throw new SQLException (Translate.get( 335 "loadbalancer.backend.unreacheable", backend.getName())); 336 } 337 338 if (c == null) 340 throw new SQLException (Translate.get( 341 "loadbalancer.backend.no.connection", backend.getName())); 342 343 int result; 344 try 345 { 346 result = executeUpdateRequestOnBackend(request, backend, c); 347 } 348 catch (Exception e) 349 { 350 throw new SQLException (Translate.get("loadbalancer.request.failed", 351 new String []{ 352 request.getSQLShortForm(vdb.getSQLShortFormLength()), 353 e.getMessage()})); 354 } 355 finally 356 { 357 cm.releaseConnection(c); 358 } 359 if (logger.isDebugEnabled()) 360 logger.debug(Translate.get("loadbalancer.execute.on", new String []{ 361 String.valueOf(request.getId()), backend.getName()})); 362 return result; 363 } 364 else 365 { Connection c; 367 long tid = request.getTransactionId(); 368 Long lTid = new Long (tid); 369 370 try 371 { 372 c = backend.getConnectionForTransactionAndLazyBeginIfNeeded(lTid, cm, 373 request.getTransactionIsolation()); 374 } 375 catch (UnreachableBackendException e1) 376 { 377 logger.error(Translate.get( 378 "loadbalancer.backend.disabling.unreachable", backend.getName())); 379 disableBackend(backend); 380 throw new SQLException (Translate.get( 381 "loadbalancer.backend.unreacheable", backend.getName())); 382 } 383 catch (NoTransactionStartWhenDisablingException e) 384 { 385 String msg = Translate.get("loadbalancer.backend.is.disabling", 386 new String []{ 387 request.getSQLShortForm(vdb.getSQLShortFormLength()), 388 backend.getName()}); 389 logger.error(msg); 390 throw new SQLException (msg); 391 } 392 393 if (c == null) 395 throw new SQLException (Translate.get( 396 "loadbalancer.unable.retrieve.connection", new String []{ 397 String.valueOf(tid), backend.getName()})); 398 399 int result; 401 try 402 { 403 result = executeUpdateRequestOnBackend(request, backend, c); 404 } 405 catch (Exception e) 406 { 407 throw new SQLException (Translate.get("loadbalancer.request.failed", 408 new String []{ 409 request.getSQLShortForm(vdb.getSQLShortFormLength()), 410 e.getMessage()})); 411 } 412 if (logger.isDebugEnabled()) 413 logger.debug(Translate.get("loadbalancer.execute.on", new String []{ 414 String.valueOf(request.getId()), backend.getName()})); 415 return result; 416 } 417 } 418 catch (RuntimeException e) 419 { 420 String msg = Translate 421 .get("loadbalancer.request.failed", new String []{ 422 request.getSQLShortForm(vdb.getSQLShortFormLength()), 423 e.getMessage()}); 424 logger.fatal(msg, e); 425 throw new SQLException (msg); 426 } 427 finally 428 { 429 vdb.releaseReadLockBackendLists(); } 433 } 434 435 439 public ControllerResultSet execWriteRequestWithKeys( 440 AbstractWriteRequest request, MetadataCache metadataCache) 441 throws SQLException 442 { 443 handleMacros(request); 445 446 try 447 { 448 vdb.acquireReadLockBackendLists(); } 452 catch (InterruptedException e) 453 { 454 String msg = Translate.get( 455 "loadbalancer.backendlist.acquire.readlock.failed", e); 456 logger.error(msg); 457 throw new SQLException (msg); 458 } 459 460 try 461 { 462 String table = request.getTableName(); 463 AbstractConnectionManager cm = null; 464 465 if (table == null) 466 throw new SQLException (Translate.get( 467 "loadbalancer.request.target.table.not.found", request 468 .getSQLShortForm(vdb.getSQLShortFormLength()))); 469 470 ArrayList backends = vdb.getBackends(); 472 int size = backends.size(); 473 474 DatabaseBackend backend = null; 475 if (request.isCreate()) 477 { CreateTableRule rule = createTablePolicy.getTableRule(request 479 .getTableName()); 480 if (rule == null) 481 rule = createTablePolicy.getDefaultRule(); 482 483 ArrayList choosen; 485 try 486 { 487 choosen = rule.getBackends(backends); 488 } 489 catch (CreateTableException e) 490 { 491 throw new SQLException (Translate.get( 492 "loadbalancer.create.table.rule.failed", e.getMessage())); 493 } 494 495 if (choosen != null) 497 backend = (DatabaseBackend) choosen.get(0); 498 if (backend != null) 499 cm = backend.getConnectionManager(request.getLogin()); 500 } 501 else 502 { for (int i = 0; i < size; i++) 504 { 505 backend = (DatabaseBackend) backends.get(i); 506 if ((backend.isWriteEnabled() || backend.isDisabling()) && backend.hasTable(table)) 507 { 508 cm = backend.getConnectionManager(request.getLogin()); 509 break; 510 } 511 } 512 } 513 514 if (cm == null) 516 throw new SQLException (Translate.get( 517 "loadbalancer.backend.no.required.table", table)); 518 519 if (!backend.getDriverCompliance().supportGetGeneratedKeys()) 520 throw new SQLException (Translate.get( 521 "loadbalancer.backend.autogeneratedkeys.unsupported", backend 522 .getName())); 523 524 526 if (request.isAutoCommit()) 527 { 528 if (backend.isDisabling()) 531 throw new SQLException (Translate.get( 532 "loadbalancer.backend.is.disabling", new String []{ 533 request.getSQLShortForm(vdb.getSQLShortFormLength()), 534 backend.getName()})); 535 536 Connection c = null; 538 try 539 { 540 c = cm.getConnection(); 541 } 542 catch (UnreachableBackendException e1) 543 { 544 logger.error(Translate.get( 545 "loadbalancer.backend.disabling.unreachable", backend.getName())); 546 disableBackend(backend); 547 throw new SQLException (Translate.get( 548 "loadbalancer.backend.unreacheable", backend.getName())); 549 } 550 551 if (c == null) 553 throw new SQLException (Translate.get( 554 "loadbalancer.backend.no.connection", backend.getName())); 555 556 ControllerResultSet result; 558 try 559 { 560 result = executeUpdateRequestOnBackendWithKeys(request, backend, c, 561 metadataCache); 562 } 563 catch (Exception e) 564 { 565 throw new SQLException (Translate.get("loadbalancer.request.failed", 566 new String []{ 567 request.getSQLShortForm(vdb.getSQLShortFormLength()), 568 e.getMessage()})); 569 } 570 finally 571 { 572 backend.removePendingRequest(request); 573 cm.releaseConnection(c); 574 } 575 if (logger.isDebugEnabled()) 576 logger.debug(Translate.get("loadbalancer.execute.on", new String []{ 577 String.valueOf(request.getId()), backend.getName()})); 578 return result; 579 } 580 else 581 { Connection c; 583 long tid = request.getTransactionId(); 584 Long lTid = new Long (tid); 585 586 try 587 { 588 c = backend.getConnectionForTransactionAndLazyBeginIfNeeded(lTid, cm, 589 request.getTransactionIsolation()); 590 } 591 catch (UnreachableBackendException e1) 592 { 593 logger.error(Translate.get( 594 "loadbalancer.backend.disabling.unreachable", backend.getName())); 595 disableBackend(backend); 596 throw new SQLException (Translate.get( 597 "loadbalancer.backend.unreacheable", backend.getName())); 598 } 599 catch (NoTransactionStartWhenDisablingException e) 600 { 601 String msg = Translate.get("loadbalancer.backend.is.disabling", 602 new String []{ 603 request.getSQLShortForm(vdb.getSQLShortFormLength()), 604 backend.getName()}); 605 logger.error(msg); 606 throw new SQLException (msg); 607 } 608 609 if (c == null) 611 throw new SQLException (Translate.get( 612 "loadbalancer.unable.retrieve.connection", new String []{ 613 String.valueOf(tid), backend.getName()})); 614 615 ControllerResultSet result; 617 try 618 { 619 result = executeUpdateRequestOnBackendWithKeys(request, backend, c, 620 metadataCache); 621 } 622 catch (Exception e) 623 { 624 throw new SQLException (Translate.get("loadbalancer.request.failed", 625 new String []{ 626 request.getSQLShortForm(vdb.getSQLShortFormLength()), 627 e.getMessage()})); 628 } 629 if (logger.isDebugEnabled()) 630 logger.debug(Translate.get("loadbalancer.execute.on", new String []{ 631 String.valueOf(request.getId()), backend.getName()})); 632 return result; 633 } 634 } 635 catch (RuntimeException e) 636 { 637 String msg = Translate 638 .get("loadbalancer.request.failed", new String []{ 639 request.getSQLShortForm(vdb.getSQLShortFormLength()), 640 e.getMessage()}); 641 logger.fatal(msg, e); 642 throw new SQLException (msg); 643 } 644 finally 645 { 646 vdb.releaseReadLockBackendLists(); } 650 } 651 652 661 protected ControllerResultSet executeRequestOnBackend(SelectRequest request, 662 DatabaseBackend backend, MetadataCache metadataCache) throws SQLException 663 { 664 handleMacros(request); 666 667 AbstractConnectionManager cm = backend.getConnectionManager(request 669 .getLogin()); 670 671 if (cm == null) 673 { 674 String msg = Translate.get("loadbalancer.connectionmanager.not.found", 675 new String []{request.getLogin(), backend.getName()}); 676 logger.error(msg); 677 throw new SQLException (msg); 678 } 679 680 if (request.isAutoCommit()) 682 { 683 ControllerResultSet rs = null; 684 boolean badConnection; 685 do 686 { 687 badConnection = false; 688 Connection c = null; 690 try 691 { 692 c = cm.getConnection(); 693 } 694 catch (UnreachableBackendException e1) 695 { 696 logger.error(Translate.get( 697 "loadbalancer.backend.disabling.unreachable", backend.getName())); 698 disableBackend(backend); 699 throw new SQLException (Translate.get( 700 "loadbalancer.backend.unreacheable", backend.getName())); 701 } 702 703 if (c == null) 705 throw new SQLException (Translate.get( 706 "loadbalancer.backend.no.connection", backend.getName())); 707 708 try 710 { 711 rs = executeSelectRequestOnBackend(request, backend, c, metadataCache); 712 cm.releaseConnection(c); 713 } 714 catch (SQLException e) 715 { 716 cm.releaseConnection(c); 717 throw new SQLException (Translate.get( 718 "loadbalancer.request.failed.on.backend", new String []{ 719 request.getSQLShortForm(vdb.getSQLShortFormLength()), 720 backend.getName(), e.getMessage()})); 721 } 722 catch (BadConnectionException e) 723 { cm.deleteConnection(c); 725 badConnection = true; 726 } 727 } 728 while (badConnection); 729 if (logger.isDebugEnabled()) 730 logger.debug(Translate.get("loadbalancer.execute.on", new String []{ 731 String.valueOf(request.getId()), backend.getName()})); 732 return rs; 733 } 734 else 735 { Connection c; 737 long tid = request.getTransactionId(); 738 Long lTid = new Long (tid); 739 740 try 741 { 742 c = backend.getConnectionForTransactionAndLazyBeginIfNeeded(lTid, cm, 743 request.getTransactionIsolation()); 744 } 745 catch (UnreachableBackendException e1) 746 { 747 logger.error(Translate.get( 748 "loadbalancer.backend.disabling.unreachable", backend.getName())); 749 disableBackend(backend); 750 throw new SQLException (Translate.get( 751 "loadbalancer.backend.unreacheable", backend.getName())); 752 } 753 catch (NoTransactionStartWhenDisablingException e) 754 { 755 String msg = Translate.get("loadbalancer.backend.is.disabling", 756 new String []{request.getSQLShortForm(vdb.getSQLShortFormLength()), 757 backend.getName()}); 758 logger.error(msg); 759 throw new SQLException (msg); 760 } 761 762 if (c == null) 764 throw new SQLException (Translate.get( 765 "loadbalancer.unable.retrieve.connection", new String []{ 766 String.valueOf(tid), backend.getName()})); 767 768 ControllerResultSet rs = null; 770 try 771 { 772 rs = executeSelectRequestOnBackend(request, backend, c, metadataCache); 773 } 774 catch (SQLException e) 775 { 776 throw new SQLException (Translate.get( 777 "loadbalancer.request.failed.on.backend", new String []{ 778 request.getSQLShortForm(vdb.getSQLShortFormLength()), 779 backend.getName(), e.getMessage()})); 780 } 781 catch (BadConnectionException e) 782 { cm.deleteConnection(tid); 784 throw new SQLException (Translate 785 .get("loadbalancer.connection.failed", new String []{ 786 String.valueOf(tid), backend.getName(), e.getMessage()})); 787 } 788 if (logger.isDebugEnabled()) 789 logger.debug(Translate.get("loadbalancer.execute.transaction.on", 790 new String []{String.valueOf(tid), String.valueOf(request.getId()), 791 backend.getName()})); 792 return rs; 793 } 794 } 795 796 800 public ControllerResultSet execReadOnlyReadStoredProcedure( 801 StoredProcedure proc, MetadataCache metadataCache) throws SQLException 802 { 803 throw new SQLException ( 804 "Stored procedure calls are not supported with RAIDb-0 load balancers."); 805 } 806 807 811 public ControllerResultSet execReadStoredProcedure(StoredProcedure proc, 812 MetadataCache metadataCache) throws SQLException 813 { 814 throw new SQLException ( 815 "Stored procedure calls are not supported with RAIDb-0 load balancers."); 816 } 817 818 821 public int execWriteStoredProcedure(StoredProcedure proc) throws SQLException 822 { 823 throw new SQLException ( 824 "Stored procedure calls are not supported with RAIDb-0 load balancers."); 825 } 826 827 830 831 837 public final void begin(TransactionMarkerMetaData tm) throws SQLException 838 { 839 } 840 841 847 public void commit(TransactionMarkerMetaData tm) throws SQLException 848 { 849 try 850 { 851 backendThreadsRWLock.acquireRead(); 852 } 853 catch (InterruptedException e) 854 { 855 String msg = Translate.get( 856 "loadbalancer.backendlist.acquire.readlock.failed", e); 857 logger.error(msg); 858 throw new SQLException (msg); 859 } 860 861 int nbOfThreads = backendThreads.size(); 862 ArrayList commitList = new ArrayList (); 863 long tid = tm.getTransactionId(); 864 Long lTid = new Long (tid); 865 866 for (int i = 0; i < nbOfThreads; i++) 868 { 869 BackendWorkerThread thread = (BackendWorkerThread) backendThreads.get(i); 870 if (thread.getBackend().isStartedTransaction(lTid)) 871 commitList.add(thread); 872 } 873 874 nbOfThreads = commitList.size(); 875 if (nbOfThreads == 0) 876 { backendThreadsRWLock.releaseRead(); 880 return; 881 } 882 883 CommitTask task = new CommitTask(nbOfThreads, nbOfThreads, tm.getTimeout(), tm.getLogin(), tid); 886 887 synchronized (task) 888 { 889 for (int i = 0; i < nbOfThreads; i++) 891 { 892 BackendWorkerThread thread = (BackendWorkerThread) commitList.get(i); 893 synchronized (thread) 894 { 895 thread.addTask(task, tid); 896 thread.notify(); 897 } 898 } 899 900 backendThreadsRWLock.releaseRead(); 901 902 try 904 { 905 long timeout = tm.getTimeout(); 907 if (timeout > 0) 908 { 909 long start = System.currentTimeMillis(); 910 task.wait(timeout); 911 long end = System.currentTimeMillis(); 912 long remaining = timeout - (end - start); 913 if (remaining <= 0) 914 { 915 if (task.setExpiredTimeout()) 916 { String msg = Translate.get("loadbalancer.commit.timeout", 918 new String []{String.valueOf(tid), 919 String.valueOf(task.getSuccess()), 920 String.valueOf(task.getFailed())}); 921 logger.warn(msg); 922 throw new SQLException (msg); 923 } 924 } 926 } 927 else 928 task.wait(); 929 } 930 catch (InterruptedException e) 931 { 932 if (task.setExpiredTimeout()) 933 { String msg = Translate.get("loadbalancer.commit.timeout", 935 new String []{String.valueOf(tid), 936 String.valueOf(task.getSuccess()), 937 String.valueOf(task.getFailed())}); 938 logger.warn(msg); 939 throw new SQLException (msg); 940 } 941 } 943 944 if (task.getSuccess() > 0) 945 return; 946 else 947 { ArrayList exceptions = task.getExceptions(); 949 if (exceptions == null) 950 throw new SQLException (Translate.get( 951 "loadbalancer.commit.all.failed", tid)); 952 else 953 { 954 String errorMsg = Translate.get("loadbalancer.commit.failed.stack", 955 tid) 956 + "\n"; 957 for (int i = 0; i < exceptions.size(); i++) 958 errorMsg += ((SQLException ) exceptions.get(i)).getMessage() + "\n"; 959 logger.error(errorMsg); 960 throw new SQLException (errorMsg); 961 } 962 } 963 } 964 } 965 966 972 public void rollback(TransactionMarkerMetaData tm) throws SQLException 973 { 974 try 975 { 976 backendThreadsRWLock.acquireRead(); 977 } 978 catch (InterruptedException e) 979 { 980 String msg = Translate.get( 981 "loadbalancer.backendlist.acquire.readlock.failed", e); 982 logger.error(msg); 983 throw new SQLException (msg); 984 } 985 int nbOfThreads = backendThreads.size(); 986 ArrayList rollbackList = new ArrayList (); 987 long tid = tm.getTransactionId(); 988 Long lTid = new Long (tid); 989 990 for (int i = 0; i < nbOfThreads; i++) 992 { 993 BackendWorkerThread thread = (BackendWorkerThread) backendThreads.get(i); 994 if (thread.getBackend().isStartedTransaction(lTid)) 995 rollbackList.add(thread); 996 } 997 998 nbOfThreads = rollbackList.size(); 999 if (nbOfThreads == 0) 1000 { backendThreadsRWLock.releaseRead(); 1004 return; 1005 } 1006 1007 RollbackTask task = new RollbackTask(nbOfThreads, nbOfThreads, tm.getTimeout(), tm.getLogin(), tid); 1011 1012 synchronized (task) 1013 { 1014 for (int i = 0; i < nbOfThreads; i++) 1016 { 1017 BackendWorkerThread thread = (BackendWorkerThread) rollbackList.get(i); 1018 synchronized (thread) 1019 { 1020 thread.addTask(task, tid); 1021 thread.notify(); 1022 } 1023 } 1024 1025 backendThreadsRWLock.releaseRead(); 1026 1027 try 1029 { 1030 long timeout = tm.getTimeout(); 1032 if (timeout > 0) 1033 { 1034 long start = System.currentTimeMillis(); 1035 task.wait(timeout); 1036 long end = System.currentTimeMillis(); 1037 long remaining = timeout - (end - start); 1038 if (remaining <= 0) 1039 { 1040 if (task.setExpiredTimeout()) 1041 { String msg = Translate.get("loadbalancer.rollback.timeout", 1043 new String []{String.valueOf(tid), 1044 String.valueOf(task.getSuccess()), 1045 String.valueOf(task.getFailed())}); 1046 logger.warn(msg); 1047 throw new SQLException (msg); 1048 } 1049 } 1051 } 1052 else 1053 task.wait(); 1054 } 1055 catch (InterruptedException e) 1056 { 1057 if (task.setExpiredTimeout()) 1058 { String msg = Translate.get("loadbalancer.rollback.timeout", 1060 new String []{String.valueOf(tid), 1061 String.valueOf(task.getSuccess()), 1062 String.valueOf(task.getFailed())}); 1063 logger.warn(msg); 1064 throw new SQLException (msg); 1065 } 1066 } 1068 1069 if (task.getSuccess() > 0) 1070 return; 1071 else 1072 { ArrayList exceptions = task.getExceptions(); 1074 if (exceptions == null) 1075 throw new SQLException (Translate.get( 1076 "loadbalancer.rollback.all.failed", tid)); 1077 else 1078 { 1079 String errorMsg = Translate.get("loadbalancer.rollback.failed.stack", 1080 tid) 1081 + "\n"; 1082 for (int i = 0; i < exceptions.size(); i++) 1083 errorMsg += ((SQLException ) exceptions.get(i)).getMessage() + "\n"; 1084 logger.error(errorMsg); 1085 throw new SQLException (errorMsg); 1086 } 1087 } 1088 } 1089 } 1090 1091 1098 public void rollback(TransactionMarkerMetaData tm, String savepointName) 1099 throws SQLException 1100 { 1101 try 1102 { 1103 backendThreadsRWLock.acquireRead(); 1104 } 1105 catch (InterruptedException e) 1106 { 1107 String msg = Translate.get( 1108 "loadbalancer.backendlist.acquire.readlock.failed", e); 1109 logger.error(msg); 1110 throw new SQLException (msg); 1111 } 1112 int nbOfThreads = backendThreads.size(); 1113 ArrayList rollbackList = new ArrayList (); 1114 long tid = tm.getTransactionId(); 1115 Long lTid = new Long (tid); 1116 1117 for (int i = 0; i < nbOfThreads; i++) 1119 { 1120 BackendWorkerThread thread = (BackendWorkerThread) backendThreads.get(i); 1121 if (thread.getBackend().isStartedTransaction(lTid)) 1122 rollbackList.add(thread); 1123 } 1124 1125 nbOfThreads = rollbackList.size(); 1126 if (nbOfThreads == 0) 1127 { backendThreadsRWLock.releaseRead(); 1131 return; 1132 } 1133 1134 RollbackToSavepointTask task = new RollbackToSavepointTask(nbOfThreads, 1136 nbOfThreads, tm.getTimeout(), tm.getLogin(), tid, savepointName); 1137 1138 synchronized (task) 1139 { 1140 for (int i = 0; i < nbOfThreads; i++) 1142 { 1143 BackendWorkerThread thread = (BackendWorkerThread) rollbackList.get(i); 1144 synchronized (thread) 1145 { 1146 thread.addTask(task, tid); 1147 thread.notify(); 1148 } 1149 } 1150 1151 backendThreadsRWLock.releaseRead(); 1152 try 1154 { 1155 long timeout = tm.getTimeout(); 1157 if (timeout > 0) 1158 { 1159 long start = System.currentTimeMillis(); 1160 task.wait(timeout); 1161 long end = System.currentTimeMillis(); 1162 long remaining = timeout - (end - start); 1163 if (remaining <= 0) 1164 { 1165 if (task.setExpiredTimeout()) 1166 { String msg = Translate.get( 1168 "loadbalancer.rollbacksavepoint.timeout", 1169 new String []{savepointName, String.valueOf(tid), 1170 String.valueOf(task.getSuccess()), 1171 String.valueOf(task.getFailed())}); 1172 logger.warn(msg); 1173 throw new SQLException (msg); 1174 } 1175 } 1177 } 1178 else 1179 task.wait(); 1180 } 1181 catch (InterruptedException e) 1182 { 1183 if (task.setExpiredTimeout()) 1184 { String msg = Translate.get( 1186 "loadbalancer.rollbacksavepoint.timeout", 1187 new String []{savepointName, String.valueOf(tid), 1188 String.valueOf(task.getSuccess()), 1189 String.valueOf(task.getFailed())}); 1190 logger.warn(msg); 1191 throw new SQLException (msg); 1192 } 1193 } 1195 1196 if (task.getSuccess() > 0) 1197 return; 1198 else 1199 { ArrayList exceptions = task.getExceptions(); 1201 if (exceptions == null) 1202 throw new SQLException (Translate.get( 1203 "loadbalancer.rollbacksavepoint.all.failed", 1204 new String []{savepointName, String.valueOf(tid)})); 1205 else 1206 { 1207 String errorMsg = Translate.get( 1208 "loadbalancer.rollbacksavepoint.failed.stack", 1209 new String []{savepointName, String.valueOf(tid)}) 1210 + "\n"; 1211 for (int i = 0; i < exceptions.size(); i++) 1212 errorMsg += ((SQLException ) exceptions.get(i)).getMessage() + "\n"; 1213 logger.error(errorMsg); 1214 throw new SQLException (errorMsg); 1215 } 1216 } 1217 } 1218 } 1219 1220 1227 public void releaseSavepoint(TransactionMarkerMetaData tm, String name) 1228 throws SQLException 1229 { 1230 long tid = tm.getTransactionId(); 1231 Long lTid = new Long (tid); 1232 1233 try 1235 { 1236 backendThreadsRWLock.acquireRead(); 1237 } 1238 catch (InterruptedException e) 1239 { 1240 String msg = Translate.get( 1241 "loadbalancer.backendlist.acquire.readlock.failed", e); 1242 logger.error(msg); 1243 throw new SQLException (msg); 1244 } 1245 1246 ArrayList savepointList = new ArrayList (); 1249 int nbOfThreads = backendThreads.size(); 1250 for (int i = 0; i < nbOfThreads; i++) 1251 { 1252 BackendWorkerThread thread = (BackendWorkerThread) backendThreads.get(i); 1253 if (thread.getBackend().isStartedTransaction(lTid)) 1254 savepointList.add(thread); 1255 } 1256 1257 nbOfThreads = savepointList.size(); 1258 if (nbOfThreads == 0) 1259 { backendThreadsRWLock.releaseRead(); 1262 return; 1263 } 1264 1265 ReleaseSavepointTask task = new ReleaseSavepointTask(nbOfThreads, 1267 nbOfThreads, tm.getTimeout(), tm.getLogin(), tid, name); 1268 1269 synchronized (task) 1270 { 1271 for (int i = 0; i < nbOfThreads; i++) 1273 { 1274 BackendWorkerThread thread = (BackendWorkerThread) savepointList.get(i); 1275 synchronized (thread) 1276 { 1277 thread.addTask(task, tid); 1278 thread.notify(); 1279 } 1280 } 1281 1282 backendThreadsRWLock.releaseRead(); 1283 1284 try 1286 { 1287 long timeout = tm.getTimeout(); 1289 if (timeout > 0) 1290 { 1291 long start = System.currentTimeMillis(); 1292 task.wait(timeout); 1293 long end = System.currentTimeMillis(); 1294 long remaining = timeout - (end - start); 1295 if (remaining <= 0) 1296 { 1297 if (task.setExpiredTimeout()) 1298 { String msg = Translate.get( 1300 "loadbalancer.releasesavepoint.timeout", 1301 new String []{name, String.valueOf(tid), 1302 String.valueOf(task.getSuccess()), 1303 String.valueOf(task.getFailed())}); 1304 logger.warn(msg); 1305 throw new SQLException (msg); 1306 } 1307 } 1309 } 1310 else 1311 task.wait(); 1312 } 1313 catch (InterruptedException e) 1314 { 1315 if (task.setExpiredTimeout()) 1316 { String msg = Translate.get("loadbalancer.releasesavepoint.timeout", 1318 new String []{name, String.valueOf(tid), 1319 String.valueOf(task.getSuccess()), 1320 String.valueOf(task.getFailed())}); 1321 logger.warn(msg); 1322 throw new SQLException (msg); 1323 } 1324 } 1326 1327 if (task.getSuccess() > 0) 1328 return; 1329 else 1330 { ArrayList exceptions = task.getExceptions(); 1332 if (exceptions == null) 1333 throw new SQLException (Translate.get( 1334 "loadbalancer.releasesavepoint.all.failed", 1335 new String []{name, String.valueOf(tid)})); 1336 else 1337 { 1338 String errorMsg = Translate.get( 1339 "loadbalancer.releasesavepoint.failed.stack", 1340 new String []{name, String.valueOf(tid)}) 1341 + "\n"; 1342 for (int i = 0; i < exceptions.size(); i++) 1343 errorMsg += ((SQLException ) exceptions.get(i)).getMessage() + "\n"; 1344 logger.error(errorMsg); 1345 throw new SQLException (errorMsg); 1346 } 1347 } 1348 } 1349 } 1350 1351 1358 public void setSavepoint(TransactionMarkerMetaData tm, String name) 1359 throws SQLException 1360 { 1361 long tid = tm.getTransactionId(); 1362 Long lTid = new Long (tid); 1363 1364 try 1366 { 1367 backendThreadsRWLock.acquireRead(); 1368 } 1369 catch (InterruptedException e) 1370 { 1371 String msg = Translate.get( 1372 "loadbalancer.backendlist.acquire.readlock.failed", e); 1373 logger.error(msg); 1374 throw new SQLException (msg); 1375 } 1376 1377 ArrayList savepointList = new ArrayList (); 1380 int nbOfThreads = backendThreads.size(); 1381 for (int i = 0; i < nbOfThreads; i++) 1382 { 1383 BackendWorkerThread thread = (BackendWorkerThread) backendThreads.get(i); 1384 if (thread.getBackend().isStartedTransaction(lTid)) 1385 savepointList.add(thread); 1386 } 1387 1388 nbOfThreads = savepointList.size(); 1389 if (nbOfThreads == 0) 1390 { backendThreadsRWLock.releaseRead(); 1393 return; 1394 } 1395 1396 SavepointTask task = new SavepointTask(nbOfThreads, nbOfThreads, 1398 tm.getTimeout(), tm.getLogin(), tid, name); 1399 1400 synchronized (task) 1401 { 1402 for (int i = 0; i < nbOfThreads; i++) 1404 { 1405 BackendWorkerThread thread = (BackendWorkerThread) savepointList.get(i); 1406 synchronized (thread) 1407 { 1408 thread.addTask(task, tid); 1409 thread.notify(); 1410 } 1411 } 1412 1413 backendThreadsRWLock.releaseRead(); 1414 1415 try 1417 { 1418 long timeout = tm.getTimeout(); 1420 if (timeout > 0) 1421 { 1422 long start = System.currentTimeMillis(); 1423 task.wait(timeout); 1424 long end = System.currentTimeMillis(); 1425 long remaining = timeout - (end - start); 1426 if (remaining <= 0) 1427 { 1428 if (task.setExpiredTimeout()) 1429 { String msg = Translate.get("loadbalancer.setsavepoint.timeout", 1431 new String []{name, String.valueOf(tid), 1432 String.valueOf(task.getSuccess()), 1433 String.valueOf(task.getFailed())}); 1434 logger.warn(msg); 1435 throw new SQLException (msg); 1436 } 1437 } 1439 } 1440 else 1441 task.wait(); 1442 } 1443 catch (InterruptedException e) 1444 { 1445 if (task.setExpiredTimeout()) 1446 { String msg = Translate.get("loadbalancer.setsavepoint.timeout", 1448 new String []{name, String.valueOf(tid), 1449 String.valueOf(task.getSuccess()), 1450 String.valueOf(task.getFailed())}); 1451 logger.warn(msg); 1452 throw new SQLException (msg); 1453 } 1454 } 1456 1457 if (task.getSuccess() > 0) 1458 return; 1459 else 1460 { ArrayList exceptions = task.getExceptions(); 1462 if (exceptions == null) 1463 throw new SQLException (Translate.get( 1464 "loadbalancer.setsavepoint.all.failed", 1465 new String []{name, String.valueOf(tid)})); 1466 else 1467 { 1468 String errorMsg = Translate.get( 1469 "loadbalancer.setsavepoint.failed.stack", 1470 new String []{name, String.valueOf(tid)}) 1471 + "\n"; 1472 for (int i = 0; i < exceptions.size(); i++) 1473 errorMsg += ((SQLException ) exceptions.get(i)).getMessage() + "\n"; 1474 logger.error(errorMsg); 1475 throw new SQLException (errorMsg); 1476 } 1477 } 1478 } 1479 } 1480 1481 1484 1485 1497 public void enableBackend(DatabaseBackend db, boolean writeEnabled) 1498 throws SQLException 1499 { 1500 BackendWorkerThread thread = new BackendWorkerThread(db, this); 1502 try 1503 { 1504 backendThreadsRWLock.acquireWrite(); 1505 } 1506 catch (InterruptedException e) 1507 { 1508 String msg = Translate.get( 1509 "loadbalancer.backendlist.acquire.writelock.failed", e); 1510 logger.error(msg); 1511 throw new SQLException (msg); 1512 } 1513 backendThreads.add(thread); 1514 backendThreadsRWLock.releaseWrite(); 1515 thread.start(); 1516 logger.info(Translate.get("loadbalancer.backend.workerthread.add", db 1517 .getName())); 1518 1519 if (!db.isInitialized()) 1520 db.initializeConnections(); 1521 db.enableRead(); 1522 if (writeEnabled) 1523 db.enableWrite(); 1524 } 1525 1526 1537 public synchronized void disableBackend(DatabaseBackend db) 1538 throws SQLException 1539 { 1540 int nbOfThreads = backendThreads.size(); 1541 1542 for (int i = 0; i < nbOfThreads; i++) 1544 { 1545 BackendWorkerThread thread = (BackendWorkerThread) backendThreads.get(i); 1546 if (thread.getBackend().equals(db)) 1547 { 1548 logger.info(Translate.get("loadbalancer.backend.workerthread.remove", 1549 db.getName())); 1550 1551 try 1553 { 1554 backendThreadsRWLock.acquireWrite(); 1555 } 1556 catch (InterruptedException e) 1557 { 1558 String msg = Translate.get( 1559 "loadbalancer.backendlist.acquire.writelock.failed", e); 1560 logger.error(msg); 1561 throw new SQLException (msg); 1562 } 1563 backendThreads.remove(thread); 1564 backendThreadsRWLock.releaseWrite(); 1565 1566 synchronized (thread) 1567 { 1568 thread.addPriorityTask(new KillThreadTask(1, 1)); 1570 thread.notify(); 1571 } 1572 break; 1573 } 1574 } 1575 1576 db.disable(); 1577 if (db.isInitialized()) 1578 db.finalizeConnections(); 1579 } 1580 1581 1585 public void setWeight(String name, int w) throws SQLException 1586 { 1587 throw new SQLException ("Weight is not supported with this load balancer"); 1588 } 1589 1590 1593 public int getNumberOfEnabledBackends() 1594 { 1595 return backendThreads.size(); 1596 } 1597 1598 1601 1602 1607 public String getInformation() 1608 { 1609 return "RAIDb-0 Request load balancer\n"; 1610 } 1611 1612 1615 public String getXmlImpl() 1616 { 1617 StringBuffer info = new StringBuffer (); 1618 info.append("<" + DatabasesXmlTags.ELT_RAIDb_0 + ">"); 1619 createTablePolicy.getXml(); 1620 info.append("</" + DatabasesXmlTags.ELT_RAIDb_0 + ">"); 1621 return info.toString(); 1622 } 1623 1624} | Popular Tags |