| 1 24 25 package org.objectweb.cjdbc.controller.loadbalancer.raidb2; 26 27 import java.sql.Connection ; 28 import java.sql.SQLException ; 29 import java.util.ArrayList ; 30 import java.util.ConcurrentModificationException ; 31 import java.util.Iterator ; 32 33 import org.objectweb.cjdbc.common.exceptions.BadConnectionException; 34 import org.objectweb.cjdbc.common.exceptions.NoTransactionStartWhenDisablingException; 35 import org.objectweb.cjdbc.common.exceptions.SQLExceptionFactory; 36 import org.objectweb.cjdbc.common.exceptions.UnreachableBackendException; 37 import org.objectweb.cjdbc.common.i18n.Translate; 38 import org.objectweb.cjdbc.common.log.Trace; 39 import org.objectweb.cjdbc.common.sql.AbstractWriteRequest; 40 import org.objectweb.cjdbc.common.sql.ParsingGranularities; 41 import org.objectweb.cjdbc.common.sql.SelectRequest; 42 import org.objectweb.cjdbc.common.sql.StoredProcedure; 43 import org.objectweb.cjdbc.common.util.ReadPrioritaryFIFOWriteLock; 44 import org.objectweb.cjdbc.common.xml.DatabasesXmlTags; 45 import org.objectweb.cjdbc.controller.backend.DatabaseBackend; 46 import org.objectweb.cjdbc.controller.cache.metadata.MetadataCache; 47 import org.objectweb.cjdbc.controller.connection.AbstractConnectionManager; 48 import org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer; 49 import org.objectweb.cjdbc.controller.loadbalancer.AllBackendsFailedException; 50 import org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread; 51 import org.objectweb.cjdbc.controller.loadbalancer.policies.WaitForCompletionPolicy; 52 import org.objectweb.cjdbc.controller.loadbalancer.policies.createtable.CreateTableException; 53 import org.objectweb.cjdbc.controller.loadbalancer.policies.createtable.CreateTablePolicy; 54 import org.objectweb.cjdbc.controller.loadbalancer.policies.createtable.CreateTableRule; 55 import org.objectweb.cjdbc.controller.loadbalancer.tasks.AbstractTask; 56 import org.objectweb.cjdbc.controller.loadbalancer.tasks.CommitTask; 57 import org.objectweb.cjdbc.controller.loadbalancer.tasks.KillThreadTask; 58 import org.objectweb.cjdbc.controller.loadbalancer.tasks.ReadStoredProcedureTask; 59 import org.objectweb.cjdbc.controller.loadbalancer.tasks.ReleaseSavepointTask; 60 import org.objectweb.cjdbc.controller.loadbalancer.tasks.RollbackTask; 61 import org.objectweb.cjdbc.controller.loadbalancer.tasks.RollbackToSavepointTask; 62 import org.objectweb.cjdbc.controller.loadbalancer.tasks.SavepointTask; 63 import org.objectweb.cjdbc.controller.loadbalancer.tasks.WriteRequestTask; 64 import org.objectweb.cjdbc.controller.loadbalancer.tasks.WriteRequestWithKeysTask; 65 import org.objectweb.cjdbc.controller.loadbalancer.tasks.WriteStoredProcedureTask; 66 import org.objectweb.cjdbc.controller.requestmanager.RAIDbLevels; 67 import org.objectweb.cjdbc.controller.requestmanager.TransactionMarkerMetaData; 68 import org.objectweb.cjdbc.controller.virtualdatabase.ControllerResultSet; 69 import org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase; 70 import org.objectweb.cjdbc.controller.virtualdatabase.protocol.Commit; 71 import org.objectweb.cjdbc.controller.virtualdatabase.protocol.ReleaseSavepoint; 72 import org.objectweb.cjdbc.controller.virtualdatabase.protocol.Rollback; 73 import org.objectweb.cjdbc.controller.virtualdatabase.protocol.RollbackToSavepoint; 74 import org.objectweb.cjdbc.controller.virtualdatabase.protocol.SetSavepoint; 75 76 89 public abstract class RAIDb2 extends AbstractLoadBalancer 90 { 91 100 protected ArrayList backendBlockingThreads; 101 protected ArrayList backendNonBlockingThreads; 102 protected ReadPrioritaryFIFOWriteLock backendBlockingThreadsRWLock = new ReadPrioritaryFIFOWriteLock(); 103 protected ReadPrioritaryFIFOWriteLock backendNonBlockingThreadsRWLock = new ReadPrioritaryFIFOWriteLock(); 104 protected WaitForCompletionPolicy waitForCompletionPolicy; 106 protected CreateTablePolicy createTablePolicy; 107 108 protected static Trace logger = Trace 109 .getLogger("org.objectweb.cjdbc.controller.loadbalancer.raidb2"); 110 111 114 115 126 public RAIDb2(VirtualDatabase vdb, 127 WaitForCompletionPolicy waitForCompletionPolicy, 128 CreateTablePolicy createTablePolicy) throws Exception  129 { 130 super(vdb, RAIDbLevels.RAIDb2, ParsingGranularities.TABLE); 131 132 this.waitForCompletionPolicy = waitForCompletionPolicy; 133 backendBlockingThreads = new ArrayList (); 134 backendNonBlockingThreads = new ArrayList (); 135 this.createTablePolicy = createTablePolicy; 136 } 137 138 141 142 149 private int getNbToWait(int nbOfThreads) 150 { 151 int nbToWait; 152 switch (waitForCompletionPolicy.getPolicy()) 153 { 154 case WaitForCompletionPolicy.FIRST : 155 nbToWait = 1; 156 break; 157 case WaitForCompletionPolicy.MAJORITY : 158 nbToWait = nbOfThreads / 2 + 1; 159 break; 160 case WaitForCompletionPolicy.ALL : 161 nbToWait = nbOfThreads; 162 break; 163 default : 164 logger 165 .warn(Translate.get("loadbalancer.waitforcompletion.unsupported")); 166 nbToWait = nbOfThreads; 167 break; 168 } 169 return nbToWait; 170 } 171 172 182 public int execWriteRequest(AbstractWriteRequest request) 183 throws AllBackendsFailedException, SQLException  184 { 185 return ((WriteRequestTask) execWriteRequest(request, false, null)) 186 .getResult(); 187 } 188 189 199 public ControllerResultSet execWriteRequestWithKeys( 200 AbstractWriteRequest request, MetadataCache metadataCache) 201 throws AllBackendsFailedException, SQLException  202 { 203 return ((WriteRequestWithKeysTask) execWriteRequest(request, true, 204 metadataCache)).getResult(); 205 } 206 207 222 private AbstractTask execWriteRequest(AbstractWriteRequest request, 223 boolean useKeys, MetadataCache metadataCache) 224 throws AllBackendsFailedException, SQLException  225 { 226 ArrayList backendThreads; 227 ReadPrioritaryFIFOWriteLock lock; 228 229 boolean canTakeReadLock = waitForTotalOrder(request, true); 234 235 handleMacros(request); 237 238 if (request.mightBlock()) 240 { backendThreads = backendBlockingThreads; 242 lock = backendBlockingThreadsRWLock; 243 } 244 else 245 { backendThreads = backendNonBlockingThreads; 247 lock = backendNonBlockingThreadsRWLock; 248 if ((waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL) 249 && (request.getTransactionId() != 0)) 250 waitForAllWritesToComplete(request.getTransactionId()); 251 } 252 253 try 254 { 255 if (canTakeReadLock) 256 lock.acquireRead(); 257 else 258 lock.acquireWrite(); 259 } 260 catch (InterruptedException e) 261 { 262 String msg = Translate.get( 263 "loadbalancer.backendlist.acquire.writelock.failed", e); 264 logger.error(msg); 265 throw new SQLException (msg); 266 } 267 268 int nbOfThreads = backendThreads.size(); 269 ArrayList writeList = new ArrayList (); 270 String tableName = request.getTableName(); 271 272 if (request.isCreate()) 273 { CreateTableRule rule = createTablePolicy.getTableRule(request 275 .getTableName()); 276 if (rule == null) 277 rule = createTablePolicy.getDefaultRule(); 278 279 ArrayList chosen; 281 try 282 { 283 chosen = rule.getBackends(vdb.getBackends()); 284 } 285 catch (CreateTableException e) 286 { 287 throw new SQLException (Translate.get( 288 "loadbalancer.create.table.rule.failed", e.getMessage())); 289 } 290 291 if (chosen != null) 293 { 294 for (int i = 0; i < nbOfThreads; i++) 295 { 296 BackendWorkerThread thread = (BackendWorkerThread) backendThreads 297 .get(i); 298 if (chosen.contains(thread.getBackend())) 299 writeList.add(thread); 300 } 301 } 302 } 303 else 304 { for (int i = 0; i < nbOfThreads; i++) 306 { 307 BackendWorkerThread thread = (BackendWorkerThread) backendThreads 308 .get(i); 309 if (thread.getBackend().hasTable(tableName)) 310 writeList.add(thread); 311 } 312 } 313 314 nbOfThreads = writeList.size(); 315 if (nbOfThreads == 0) 316 { 317 if (canTakeReadLock) 318 lock.releaseRead(); 319 else 320 lock.releaseWrite(); 321 322 String msg = Translate.get("loadbalancer.execute.no.backend.found", 323 request.getSQLShortForm(vdb.getSQLShortFormLength())); 324 logger.warn(msg); 325 326 removeHeadFromAndNotifyTotalOrderQueue(); 328 throw new SQLException (msg); 329 } 330 else 331 { 332 if (logger.isDebugEnabled()) 333 logger.debug(Translate.get("loadbalancer.execute.on.several", 334 new String []{String.valueOf(request.getId()), 335 String.valueOf(nbOfThreads)})); 336 } 337 338 AbstractTask task; 340 if (useKeys) 341 task = new WriteRequestWithKeysTask(getNbToWait(nbOfThreads), 342 nbOfThreads, request, metadataCache); 343 else 344 task = new WriteRequestTask(getNbToWait(nbOfThreads), nbOfThreads, 345 request); 346 347 352 if (request.isAutoCommit()) 354 { 355 for (int i = 0; i < nbOfThreads; i++) 356 { 357 BackendWorkerThread thread = (BackendWorkerThread) writeList.get(i); 358 synchronized (thread) 359 { 360 thread.addTask(task); 361 } 362 } 363 } 364 else 365 { 366 for (int i = 0; i < nbOfThreads; i++) 367 { 368 BackendWorkerThread thread = (BackendWorkerThread) writeList.get(i); 369 synchronized (thread) 370 { 371 thread.addTask(task, request.getTransactionId()); 372 } 373 } 374 } 375 376 for (int i = 0; i < nbOfThreads; i++) 378 { 379 BackendWorkerThread thread = (BackendWorkerThread) writeList.get(i); 380 synchronized (thread) 381 { 382 thread.notify(); 383 } 384 } 385 386 if (canTakeReadLock) 388 lock.releaseRead(); 389 else 390 lock.releaseWrite(); 391 392 removeHeadFromAndNotifyTotalOrderQueue(); 394 395 synchronized (task) 396 { 397 if (!task.hasCompleted()) 398 { 399 try 401 { 402 long timeout = request.getTimeout() * 1000; 404 if (timeout > 0) 405 { 406 long start = System.currentTimeMillis(); 407 task.wait(timeout); 408 long end = System.currentTimeMillis(); 409 long remaining = timeout - (end - start); 410 if (remaining <= 0) 411 { 412 if (task.setExpiredTimeout()) 413 { String msg = Translate.get("loadbalancer.request.timeout", 415 new String []{String.valueOf(request.getId()), 416 String.valueOf(task.getSuccess()), 417 String.valueOf(task.getFailed())}); 418 logger.warn(msg); 419 throw new SQLException (msg); 420 } 421 } 423 } 425 else 426 task.wait(); 427 } 428 catch (InterruptedException e) 429 { 430 if (task.setExpiredTimeout()) 431 { String msg = Translate.get("loadbalancer.request.timeout", 433 new String []{String.valueOf(request.getId()), 434 String.valueOf(task.getSuccess()), 435 String.valueOf(task.getFailed())}); 436 logger.warn(msg); 437 throw new SQLException (msg); 438 } 439 } 441 } 442 443 if (task.getSuccess() > 0) 444 return task; 445 else 446 { ArrayList exceptions = task.getExceptions(); 448 if (exceptions == null) 449 throw new AllBackendsFailedException(Translate.get( 450 "loadbalancer.request.failed.all", request.getId())); 451 else 452 { 453 String errorMsg = Translate.get("loadbalancer.request.failed.stack", 454 request.getId()) 455 + "\n"; 456 for (int i = 0; i < exceptions.size(); i++) 457 errorMsg += ((SQLException ) exceptions.get(i)).getMessage() + "\n"; 458 logger.error(errorMsg); 459 throw new SQLException (errorMsg); 460 } 461 } 462 } 463 } 464 465 473 public abstract ControllerResultSet execReadRequest(SelectRequest request, 474 MetadataCache metadataCache) throws SQLException ; 475 476 485 protected ControllerResultSet executeRequestOnBackend(SelectRequest request, 486 DatabaseBackend backend, MetadataCache metadataCache) 487 throws SQLException , UnreachableBackendException 488 { 489 handleMacros(request); 491 492 AbstractConnectionManager cm = backend.getConnectionManager(request 494 .getLogin()); 495 496 if (cm == null) 498 { 499 String msg = Translate.get("loadbalancer.connectionmanager.not.found", 500 new String []{request.getLogin(), backend.getName()}); 501 logger.error(msg); 502 throw new SQLException (msg); 503 } 504 505 if (request.isAutoCommit()) 507 { 508 if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL) 509 waitForAllWritesToComplete(backend); 513 514 ControllerResultSet rs = null; 515 boolean badConnection; 516 do 517 { 518 badConnection = false; 519 Connection c = null; 521 try 522 { 523 c = cm.getConnection(); 524 } 525 catch (UnreachableBackendException e1) 526 { 527 logger.error(Translate.get( 528 "loadbalancer.backend.disabling.unreachable", backend.getName())); 529 disableBackend(backend); 530 throw new UnreachableBackendException(Translate.get( 531 "loadbalancer.backend.unreacheable", backend.getName())); 532 } 533 534 if (c == null) 536 throw new UnreachableBackendException( 537 "No more connections on backend " + backend.getName()); 538 539 try 541 { 542 rs = executeSelectRequestOnBackend(request, backend, c, metadataCache); 543 cm.releaseConnection(c); 544 } 545 catch (SQLException e) 546 { 547 cm.releaseConnection(c); 548 throw new SQLException (Translate.get( 549 "loadbalancer.request.failed.on.backend", new String []{ 550 request.getSQLShortForm(vdb.getSQLShortFormLength()), 551 backend.getName(), e.getMessage()})); 552 } 553 catch (BadConnectionException e) 554 { cm.deleteConnection(c); 556 badConnection = true; 557 } 558 } 559 while (badConnection); 560 if (logger.isDebugEnabled()) 561 logger.debug(Translate.get("loadbalancer.execute.on", new String []{ 562 String.valueOf(request.getId()), backend.getName()})); 563 return rs; 564 } 565 else 566 { Connection c; 568 long tid = request.getTransactionId(); 569 Long lTid = new Long (tid); 570 571 if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL) 573 waitForAllWritesToComplete(backend, request.getTransactionId()); 574 575 try 576 { 577 c = backend.getConnectionForTransactionAndLazyBeginIfNeeded(lTid, cm, 578 request.getTransactionIsolation()); 579 } 580 catch (UnreachableBackendException e1) 581 { 582 logger.error(Translate.get( 583 "loadbalancer.backend.disabling.unreachable", backend.getName())); 584 disableBackend(backend); 585 throw new SQLException (Translate.get( 586 "loadbalancer.backend.unreacheable", backend.getName())); 587 } 588 catch (NoTransactionStartWhenDisablingException e) 589 { 590 String msg = Translate.get("loadbalancer.backend.is.disabling", 591 new String []{request.getSQLShortForm(vdb.getSQLShortFormLength()), 592 backend.getName()}); 593 logger.error(msg); 594 throw new SQLException (msg); 595 } 596 597 if (c == null) 599 throw new SQLException (Translate.get( 600 "loadbalancer.unable.retrieve.connection", new String []{ 601 String.valueOf(tid), backend.getName()})); 602 603 ControllerResultSet rs = null; 605 try 606 { 607 rs = executeSelectRequestOnBackend(request, backend, c, metadataCache); 608 } 609 catch (SQLException e) 610 { 611 throw new SQLException (Translate.get( 612 "loadbalancer.request.failed.on.backend", new String []{ 613 request.getSQLShortForm(vdb.getSQLShortFormLength()), 614 backend.getName(), e.getMessage()})); 615 } 616 catch (BadConnectionException e) 617 { cm.deleteConnection(tid); 620 String msg = Translate.get( 621 "loadbalancer.backend.disabling.connection.failure", backend 622 .getName()); 623 logger.error(msg); 624 disableBackend(backend); 625 throw new SQLException (msg); 626 } 627 if (logger.isDebugEnabled()) 628 logger.debug(Translate.get("loadbalancer.execute.transaction.on", 629 new String []{String.valueOf(tid), String.valueOf(request.getId()), 630 backend.getName()})); 631 return rs; 632 } 633 } 634 635 644 protected ControllerResultSet executeStoredProcedureOnBackend( 645 StoredProcedure proc, DatabaseBackend backend, MetadataCache metadataCache) 646 throws SQLException , UnreachableBackendException 647 { 648 AbstractConnectionManager cm = backend 650 .getConnectionManager(proc.getLogin()); 651 652 if (cm == null) 654 { 655 String msg = Translate.get("loadbalancer.connectionmanager.not.found", 656 new String []{proc.getLogin(), backend.getName()}); 657 logger.error(msg); 658 throw new SQLException (msg); 659 } 660 661 if (proc.isAutoCommit()) 663 { 664 if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL) 665 waitForAllWritesToComplete(backend); 669 670 Connection c = null; 672 try 673 { 674 c = cm.getConnection(); 675 } 676 catch (UnreachableBackendException e1) 677 { 678 logger.error(Translate.get( 679 "loadbalancer.backend.disabling.unreachable", backend.getName())); 680 disableBackend(backend); 681 throw new UnreachableBackendException(Translate.get( 682 "loadbalancer.backend.unreacheable", backend.getName())); 683 } 684 685 if (c == null) 687 throw new SQLException (Translate.get( 688 "loadbalancer.backend.no.connection", backend.getName())); 689 690 ControllerResultSet rs = null; 692 try 693 { 694 rs = AbstractLoadBalancer.executeReadStoredProcedureOnBackend(proc, 695 backend, c, metadataCache); 696 } 697 catch (Exception e) 698 { 699 throw new SQLException (Translate.get( 700 "loadbalancer.storedprocedure.failed.on.backend", new String []{ 701 proc.getSQLShortForm(vdb.getSQLShortFormLength()), 702 backend.getName(), e.getMessage()})); 703 } 704 finally 705 { 706 cm.releaseConnection(c); 707 } 708 if (logger.isDebugEnabled()) 709 logger.debug(Translate.get("loadbalancer.storedprocedure.on", 710 new String []{String.valueOf(proc.getId()), backend.getName()})); 711 return rs; 712 } 713 else 714 { Connection c; 716 long tid = proc.getTransactionId(); 717 Long lTid = new Long (tid); 718 719 if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL) 721 waitForAllWritesToComplete(backend, proc.getTransactionId()); 722 723 try 724 { 725 c = backend.getConnectionForTransactionAndLazyBeginIfNeeded(lTid, cm, 726 proc.getTransactionIsolation()); 727 } 728 catch (UnreachableBackendException e1) 729 { 730 logger.error(Translate.get( 731 "loadbalancer.backend.disabling.unreachable", backend.getName())); 732 disableBackend(backend); 733 throw new SQLException (Translate.get( 734 "loadbalancer.backend.unreacheable", backend.getName())); 735 } 736 catch (NoTransactionStartWhenDisablingException e) 737 { 738 String msg = Translate.get("loadbalancer.backend.is.disabling", 739 new String []{proc.getSQLShortForm(vdb.getSQLShortFormLength()), 740 backend.getName()}); 741 logger.error(msg); 742 throw new SQLException (msg); 743 } 744 745 if (c == null) 747 throw new SQLException (Translate.get( 748 "loadbalancer.unable.retrieve.connection", new String []{ 749 String.valueOf(tid), backend.getName()})); 750 751 ControllerResultSet rs; 753 try 754 { 755 rs = AbstractLoadBalancer.executeReadStoredProcedureOnBackend(proc, 756 backend, c, metadataCache); 757 } 758 catch (Exception e) 759 { 760 throw new SQLException (Translate.get( 761 "loadbalancer.storedprocedure.failed.on.backend", new String []{ 762 proc.getSQLShortForm(vdb.getSQLShortFormLength()), 763 backend.getName(), e.getMessage()})); 764 } 765 if (logger.isDebugEnabled()) 766 logger.debug(Translate.get("loadbalancer.execute.transaction.on", 767 new String []{String.valueOf(tid), String.valueOf(proc.getId()), 768 backend.getName()})); 769 return rs; 770 } 771 } 772 773 777 public ControllerResultSet execReadStoredProcedure(StoredProcedure proc, 778 MetadataCache metadataCache) throws SQLException  779 { 780 ReadStoredProcedureTask task = (ReadStoredProcedureTask) callStoredProcedure( 781 proc, true, metadataCache); 782 return task.getResult(); 783 } 784 785 788 public int execWriteStoredProcedure(StoredProcedure proc) throws SQLException  789 { 790 WriteStoredProcedureTask task = (WriteStoredProcedureTask) callStoredProcedure( 791 proc, false, null); 792 return task.getResult(); 793 } 794 795 807 private AbstractTask callStoredProcedure(StoredProcedure proc, 808 boolean isRead, MetadataCache metadataCache) throws SQLException  809 { 810 ArrayList backendThreads = backendBlockingThreads; 811 ReadPrioritaryFIFOWriteLock lock = backendBlockingThreadsRWLock; 812 813 |