package org.objectweb.cjdbc.controller.requestmanager.distributed;

import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Vector;

import javax.management.NotCompliantMBeanException;

import org.objectweb.cjdbc.common.exceptions.NoMoreBackendException;
import org.objectweb.cjdbc.common.exceptions.VirtualDatabaseException;
import org.objectweb.cjdbc.common.i18n.Translate;
import org.objectweb.cjdbc.common.log.Trace;
import org.objectweb.cjdbc.common.shared.BackendInfo;
import org.objectweb.cjdbc.common.shared.BackendState;
import org.objectweb.cjdbc.common.sql.AbstractRequest;
import org.objectweb.cjdbc.common.sql.AbstractWriteRequest;
import org.objectweb.cjdbc.common.sql.SelectRequest;
import org.objectweb.cjdbc.common.sql.StoredProcedure;
import org.objectweb.cjdbc.controller.backend.DatabaseBackend;
import org.objectweb.cjdbc.controller.cache.result.AbstractResultCache;
import org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer;
import org.objectweb.cjdbc.controller.loadbalancer.AllBackendsFailedException;
import org.objectweb.cjdbc.controller.recoverylog.RecoveryLog;
import org.objectweb.cjdbc.controller.requestmanager.RequestManager;
import org.objectweb.cjdbc.controller.requestmanager.TransactionMarkerMetaData;
import org.objectweb.cjdbc.controller.scheduler.AbstractScheduler;
import org.objectweb.cjdbc.controller.virtualdatabase.ControllerResultSet;
import org.objectweb.cjdbc.controller.virtualdatabase.DistributedVirtualDatabase;
import org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase;
import org.objectweb.cjdbc.controller.virtualdatabase.protocol.CJDBCGroupMessage;
import org.objectweb.cjdbc.controller.virtualdatabase.protocol.DisableBackend;
import org.objectweb.cjdbc.controller.virtualdatabase.protocol.EnableBackend;
import org.objectweb.tribe.adapters.MulticastRequestAdapter;

/**
 * Request manager for a controller that is part of a distributed virtual
 * database.
 * <p>
 * On top of the behavior inherited from <code>RequestManager</code>, this
 * class:
 * <ul>
 * <li>multicasts an <code>EnableBackend</code>/<code>DisableBackend</code>
 * message to the other group members before enabling/disabling a backend
 * locally,</li>
 * <li>stamps the transaction identifiers it creates with the controller
 * identifier (upper 16 bits of the <code>long</code>),</li>
 * <li>lazily starts, on this controller, transactions whose identifier
 * carries another controller identifier,</li>
 * <li>remembers which transactions have executed a write or a stored
 * procedure and routes their commit/rollback/savepoint operations to the
 * abstract <code>distributed*</code> methods implemented by subclasses.</li>
 * </ul>
 */
public abstract class DistributedRequestManager extends RequestManager
{
  /** Mask selecting the controller identifier (upper 16 bits) of an id. */
  public static final long CONTROLLER_ID_BIT_MASK   = 0xffff000000000000L;

  /** Mask selecting the transaction part (lower 48 bits) of an id. */
  public static final long TRANSACTION_ID_BIT_MASK  = ~CONTROLLER_ID_BIT_MASK;

  /** Number of bits a raw controller id is shifted left by. */
  public static final int  CONTROLLER_ID_SHIFT_BITS = 48;

  /** Bits a raw (unshifted) controller id is allowed to use. */
  public static final long CONTROLLER_ID_BITS       = 0x000000000000ffffL;

  /**
   * Marker value made available to subclasses. NOTE(review): it is not used
   * in this class; presumably it flags "no result yet" in subclasses -
   * confirm against the implementations.
   */
  protected static final int NO_RESULT = -5;

  /** The distributed virtual database this request manager belongs to. */
  protected DistributedVirtualDatabase dvdb;

  /** Requests registered through {@link #addFailedOnAllBackends}. */
  private final Vector    failedOnAllBackends;

  /** Controller identifier, already shifted into the upper 16 bits. */
  private long            controllerId;

  /**
   * Identifiers (<code>Long</code>) of the transactions that executed a write
   * or a stored procedure through this manager. Every access is synchronized
   * on the list itself.
   */
  private final ArrayList distributedTransactions;

  /**
   * Creates a new <code>DistributedRequestManager</code>.
   *
   * @param vdb the distributed virtual database this manager belongs to
   * @param scheduler scheduler handed to the parent request manager
   * @param cache result cache handed to the parent request manager
   * @param loadBalancer load balancer handed to the parent request manager
   * @param recoveryLog recovery log handed to the parent request manager
   * @param beginTimeout begin timeout handed to the parent request manager
   * @param commitTimeout commit timeout handed to the parent request manager
   * @param rollbackTimeout rollback timeout handed to the parent request
   *          manager
   * @throws SQLException if the parent constructor fails
   * @throws NotCompliantMBeanException if the parent constructor fails
   */
  public DistributedRequestManager(DistributedVirtualDatabase vdb,
      AbstractScheduler scheduler, AbstractResultCache cache,
      AbstractLoadBalancer loadBalancer, RecoveryLog recoveryLog,
      long beginTimeout, long commitTimeout, long rollbackTimeout)
      throws SQLException, NotCompliantMBeanException
  {
    super(vdb, scheduler, cache, loadBalancer, recoveryLog, beginTimeout,
        commitTimeout, rollbackTimeout);
    dvdb = vdb;
    failedOnAllBackends = new Vector();
    distributedTransactions = new ArrayList();
  }

  //
  // Private helpers
  //

  /**
   * Builds an <code>SQLException</code> that keeps track of its cause.
   * <code>initCause</code> is used instead of a constructor taking a cause so
   * that the code stays compatible with the JDK level of the rest of the file.
   *
   * @param message exception message
   * @param cause the exception that triggered the failure
   * @return a new <code>SQLException</code> chained to <code>cause</code>
   */
  private static SQLException sqlExceptionWithCause(String message,
      Throwable cause)
  {
    SQLException sqle = new SQLException(message);
    sqle.initCause(cause);
    return sqle;
  }

  /**
   * Records that the given transaction must be handled by the distributed
   * code path. Does nothing if the transaction is already recorded.
   *
   * @param transactionId transaction identifier
   */
  private void markTransactionAsDistributed(long transactionId)
  {
    Long lTid = new Long(transactionId);
    synchronized (distributedTransactions)
    {
      if (!distributedTransactions.contains(lTid))
        distributedTransactions.add(lTid);
    }
  }

  /**
   * Tells whether the transaction was recorded as distributed.
   *
   * @param lTid transaction identifier
   * @return <code>true</code> if the transaction is recorded
   */
  private boolean isDistributedTransaction(Long lTid)
  {
    synchronized (distributedTransactions)
    {
      return distributedTransactions.contains(lTid);
    }
  }

  /**
   * Removes the transaction from the list of distributed transactions.
   *
   * @param lTid transaction identifier
   * @return <code>true</code> if the transaction was recorded (and has now
   *         been removed)
   */
  private boolean unmarkDistributedTransaction(Long lTid)
  {
    synchronized (distributedTransactions)
    {
      // lTid is a Long, so this resolves to remove(Object), not remove(int)
      return distributedTransactions.remove(lTid);
    }
  }

  //
  // Controller identifier, accessors
  //

  /**
   * Returns the controller identifier as stored by
   * {@link #setControllerId(long)}, i.e. already shifted into the upper 16
   * bits.
   *
   * @return the shifted controller identifier
   */
  public long getControllerId()
  {
    return controllerId;
  }

  /**
   * Sets the controller identifier. The value is stored shifted left by
   * {@link #CONTROLLER_ID_SHIFT_BITS} bits.
   *
   * @param id raw controller identifier, must fit in
   *          {@link #CONTROLLER_ID_BITS}
   * @throws RuntimeException if <code>id</code> uses bits outside of
   *           {@link #CONTROLLER_ID_BITS}
   */
  public void setControllerId(long id)
  {
    if ((id & ~CONTROLLER_ID_BITS) != 0)
    {
      String msg = "Out of range controller id (" + id + ")";
      logger.error(msg);
      throw new RuntimeException(msg);
    }
    this.controllerId = (id << CONTROLLER_ID_SHIFT_BITS)
        & CONTROLLER_ID_BIT_MASK;
    if (logger.isDebugEnabled())
      logger.debug("Setting controller identifier to " + id
          + " (shifted value is " + controllerId + ")");
  }

  /**
   * Returns the logger used by this request manager.
   *
   * @return the logger
   */
  public Trace getLogger()
  {
    return logger;
  }

  /**
   * Returns the distributed virtual database this manager belongs to.
   *
   * @return the virtual database
   */
  public VirtualDatabase getVirtualDatabase()
  {
    return dvdb;
  }

  /**
   * Sets the scheduler, then checks that the virtual database has a total
   * order queue.
   * <p>
   * Note that the scheduler has already been handed to the parent class when
   * the check fails.
   *
   * @param scheduler the new scheduler
   * @throws RuntimeException if the virtual database has no total order queue
   *           once the scheduler has been set
   */
  public void setScheduler(AbstractScheduler scheduler)
  {
    super.setScheduler(scheduler);
    // NOTE(review): presumably super.setScheduler() is what sets up the total
    // order queue, which is why the check comes second - confirm.
    if (vdb.getTotalOrderQueue() == null)
      throw new RuntimeException(
          "New scheduler does not support total ordering and is not compatible with distributed virtual databases.");
  }

  //
  // Backend management
  //

  /**
   * Enables a backend. If there are other members in the group, an
   * <code>EnableBackend</code> message is first multicast to them (without
   * waiting for any answer), then the backend is enabled locally.
   *
   * @param db the backend to enable
   * @throws SQLException if the multicast fails or if the backend cannot be
   *           enabled locally
   */
  public void enableBackend(DatabaseBackend db) throws SQLException
  {
    int size = dvdb.getAllMemberButUs().size();
    if (size > 0)
    {
      if (logger.isDebugEnabled())
        logger.debug(Translate
            .get("virtualdatabase.distributed.enable.backend.check"));

      try
      {
        dvdb.getMulticastRequestAdapter().multicastMessage(
            dvdb.getAllMemberButUs(), new EnableBackend(new BackendInfo(db)),
            MulticastRequestAdapter.WAIT_NONE,
            CJDBCGroupMessage.defaultCastTimeOut);
      }
      catch (Exception e)
      {
        String msg = "Error while enabling backend " + db.getName();
        logger.error(msg, e);
        throw sqlExceptionWithCause(msg + "(" + e + ")", e);
      }
    }

    super.enableBackend(db);
  }

  /**
   * Disables a backend. If there are other members in the group, a
   * <code>DisableBackend</code> message is first multicast to them (without
   * waiting for any answer), then the backend is disabled locally.
   *
   * @param db the backend to disable
   * @throws SQLException if the multicast fails or if the backend cannot be
   *           disabled locally
   */
  public void disableBackend(DatabaseBackend db) throws SQLException
  {
    int size = dvdb.getAllMemberButUs().size();
    if (size > 0)
    {
      if (logger.isDebugEnabled())
        logger.debug(Translate.get(
            "virtualdatabase.distributed.disable.backend", db.getName()));

      try
      {
        dvdb.getMulticastRequestAdapter().multicastMessage(
            dvdb.getAllMemberButUs(), new DisableBackend(new BackendInfo(db)),
            MulticastRequestAdapter.WAIT_NONE,
            CJDBCGroupMessage.defaultCastTimeOut);
      }
      catch (Exception e)
      {
        String msg = "Error while disabling backend " + db.getName();
        logger.error(msg, e);
        throw sqlExceptionWithCause(msg + "(" + e + ")", e);
      }
    }

    super.disableBackend(db);
  }

  /**
   * Disables a backend after setting a group checkpoint.
   * <p>
   * The steps are: set the checkpoint on all group members, switch the
   * backend to the <code>DISABLING</code> state, wait for its transactions to
   * complete, record the checkpoint name on the backend and finally disable
   * it in the load balancer.
   *
   * @param db the backend to disable
   * @param checkpointName name of the checkpoint to set
   * @throws SQLException if there is no recovery log, if the group checkpoint
   *           cannot be set or if the load balancer fails to disable the
   *           backend
   */
  public void disableBackendForCheckpoint(DatabaseBackend db,
      String checkpointName) throws SQLException
  {
    // A checkpoint is meaningless without a recovery log
    if (recoveryLog == null)
    {
      String msg = Translate.get("recovery.store.checkpoint.failed.cause.null",
          checkpointName);
      logger.error(msg);
      throw new SQLException(msg);
    }

    try
    {
      dvdb.setGroupCheckpoint(checkpointName, dvdb.getAllMembers());
    }
    catch (VirtualDatabaseException e)
    {
      String msg = "set group checkpoint failed";
      logger.error(msg, e);
      throw sqlExceptionWithCause(msg, e);
    }

    db.setState(BackendState.DISABLING);
    logger.info(Translate.get("backend.state.disabling", db.getName()));

    db.waitForAllTransactionsToComplete();

    db.setLastKnownCheckpoint(checkpointName);
    loadBalancer.disableBackend(db);
    logger.info(Translate.get("backend.state.disabled", db.getName()));
  }

  //
  // Failure handling
  //

  /**
   * Registers a request in the list of requests that failed on all backends.
   * The entry is expected to be removed later by
   * {@link #completeFailedOnAllBackends(AbstractRequest, boolean)}.
   *
   * @param request the request that failed
   */
  public void addFailedOnAllBackends(AbstractRequest request)
  {
    failedOnAllBackends.add(request);
  }

  /**
   * Completes a request previously registered with
   * {@link #addFailedOnAllBackends(AbstractRequest)}.
   * <p>
   * If the request is not registered, a warning is logged and nothing else
   * happens. Otherwise, when <code>success</code> is <code>true</code> all
   * backends of the virtual database are disabled; when it is
   * <code>false</code> the scheduler is notified of the write completion.
   *
   * @param request the request to complete; it is cast to
   *          <code>AbstractWriteRequest</code> when <code>success</code> is
   *          <code>false</code>
   * @param success <code>true</code> if the request succeeded elsewhere
   */
  public void completeFailedOnAllBackends(AbstractRequest request,
      boolean success)
  {
    if (!failedOnAllBackends.remove(request))
    {
      logger.warn("Unable to find request "
          + request.getSQLShortForm(dvdb.getSQLShortFormLength())
          + " in list of requests that failed on all backends.");
      return;
    }

    if (success)
    {
      logger
          .error("Request "
              + request.getSQLShortForm(dvdb.getSQLShortFormLength())
              + " failed on all local backends but succeeded on other controllers. Disabling all local backends.");
      try
      {
        dvdb.disableAllBackends();
      }
      catch (VirtualDatabaseException e)
      {
        logger.error("An error occured while disabling all backends", e);
      }
    }
    else
      scheduler.notifyWriteCompleted((AbstractWriteRequest) request);
  }

  /**
   * Removes a failed write request from the recovery log. The request
   * identifier is overwritten with <code>recoveryLogId</code> before the
   * request is unlogged.
   *
   * @param request the request to unlog
   * @param recoveryLogId identifier to set on the request before unlogging
   */
  public void removeFailedRequestFromRecoveryLog(AbstractWriteRequest request,
      long recoveryLogId)
  {
    if (logger.isDebugEnabled())
      logger.debug("Request "
          + request.getSQLShortForm(dvdb.getSQLShortFormLength())
          + " failed at all controllers, removing it from recovery log.");

    request.setId(recoveryLogId);
    recoveryLog.unlogRequest(request);
  }

  /**
   * Removes a failed stored procedure from the recovery log.
   *
   * @param proc the stored procedure to unlog
   */
  public void removeFailedStoredProcedureFromRecoveryLog(StoredProcedure proc)
  {
    if (logger.isDebugEnabled())
      logger.debug("Request "
          + proc.getSQLShortForm(dvdb.getSQLShortFormLength())
          + " failed at all controllers, removing it from recovery log.");

    recoveryLog.unlogRequest(proc);
  }

  /**
   * Removes a failed commit from the recovery log.
   *
   * @param tm marker of the transaction whose commit must be unlogged
   */
  public void removeFailedCommitFromRecoveryLog(TransactionMarkerMetaData tm)
  {
    if (logger.isDebugEnabled())
      logger
          .debug("Transaction "
              + tm.getTransactionId()
              + " commit failed at all controllers, removing it from recovery log.");

    recoveryLog.unlogCommit(tm);
  }

  /**
   * Removes a failed rollback from the recovery log.
   *
   * @param tm marker of the transaction whose rollback must be unlogged
   */
  public void removeFailedRollbackFromRecoveryLog(TransactionMarkerMetaData tm)
  {
    if (logger.isDebugEnabled())
      logger
          .debug("Transaction "
              + tm.getTransactionId()
              + " rollback failed at all controllers, removing it from recovery log.");

    recoveryLog.unlogRollback(tm);
  }

  //
  // Transaction management
  //

  /**
   * Begins a new transaction. The identifier returned by the scheduler is
   * truncated to its lower 48 bits and combined with the controller
   * identifier.
   *
   * @param login login of the user starting the transaction
   * @return the transaction identifier, including the controller identifier
   * @throws SQLException if the scheduler or the load balancer fails, or if a
   *           runtime exception occurs (it is then logged as fatal and
   *           wrapped)
   */
  public long begin(String login) throws SQLException
  {
    try
    {
      TransactionMarkerMetaData tm = new TransactionMarkerMetaData(0,
          beginTimeout, login);

      // Keep the scheduler id in the lower bits, tag it with our id
      long tid = scheduler.begin(tm);
      tid = tid & TRANSACTION_ID_BIT_MASK;
      tid = tid | controllerId;
      tm.setTransactionId(tid);

      if (logger.isDebugEnabled())
        logger.debug(Translate.get("transaction.begin", String.valueOf(tid)));

      try
      {
        loadBalancer.begin(tm);
      }
      finally
      {
        // The scheduler is notified whether or not the load balancer failed
        scheduler.beginCompleted(tid);
      }

      tidLoginTable.put(new Long(tid), tm);
      return tid;
    }
    catch (RuntimeException e)
    {
      logger.fatal(Translate.get(
          "fatal.runtime.exception.requestmanager.begin", e));
      throw sqlExceptionWithCause(e.getMessage(), e);
    }
  }

  /**
   * Starts, if needed, the transaction a request belongs to.
   * <p>
   * Nothing is done when the request is in autocommit mode, when the
   * controller bits of its transaction identifier match this controller, or
   * when the transaction is already present in <code>tidLoginTable</code>.
   * Otherwise the transaction is begun on the scheduler and the load balancer
   * using the identifier carried by the request and, if there is a recovery
   * log, the lazy begin is logged.
   *
   * @param request the request about to be executed
   * @throws SQLException if the transaction cannot be started, or if a
   *           runtime exception occurs (it is then logged as fatal and
   *           wrapped)
   */
  public void lazyTransactionStart(AbstractRequest request) throws SQLException
  {
    if (request.isAutoCommit())
      return;

    long tid = request.getTransactionId();
    if ((tid & CONTROLLER_ID_BIT_MASK) == controllerId)
      return; // Carries our controller id, nothing to start lazily

    Long lTid = new Long(tid);
    if (tidLoginTable.containsKey(lTid))
      return; // Transaction already known on this controller

    try
    {
      TransactionMarkerMetaData tm = new TransactionMarkerMetaData(0,
          beginTimeout, request.getLogin());
      tm.setTransactionId(tid);

      if (logger.isDebugEnabled())
        logger.debug(Translate.get("transaction.begin.lazy", String
            .valueOf(tid)));

      try
      {
        scheduler.begin(tm);

        loadBalancer.begin(tm);

        tidLoginTable.put(lTid, tm);
        if (recoveryLog != null)
          logLazyTransactionBegin(tid);
      }
      catch (SQLException e)
      {
        // The entry can only have been added before a failure when logging
        // to the recovery log failed, hence the condition.
        if (recoveryLog != null)
          tidLoginTable.remove(lTid);
        throw e;
      }
      finally
      {
        scheduler.beginCompleted(tid);
      }
    }
    catch (RuntimeException e)
    {
      logger.fatal(Translate.get(
          "fatal.runtime.exception.requestmanager.begin", e));
      throw sqlExceptionWithCause(e.getMessage(), e);
    }
  }

  /**
   * Commits a transaction. A transaction recorded as distributed is removed
   * from the list and committed with
   * {@link #distributedCommit(String, long)}; any other transaction is
   * committed by the parent class.
   *
   * @param transactionId transaction identifier
   * @param logCommit only used for the non-distributed case, passed as is to
   *          the parent class
   * @throws SQLException if the commit fails
   */
  public void commit(long transactionId, boolean logCommit) throws SQLException
  {
    Long lTid = new Long(transactionId);
    if (unmarkDistributedTransaction(lTid))
    {
      TransactionMarkerMetaData tm = getTransactionMarker(lTid);
      distributedCommit(tm.getLogin(), transactionId);
    }
    else
      super.commit(transactionId, logCommit);
  }

  /**
   * Rolls back a transaction. A transaction recorded as distributed is
   * removed from the list and rolled back with
   * {@link #distributedRollback(String, long)}; any other transaction is
   * rolled back by the parent class.
   *
   * @param transactionId transaction identifier
   * @param logRollback only used for the non-distributed case, passed as is
   *          to the parent class
   * @throws SQLException if the rollback fails
   */
  public void rollback(long transactionId, boolean logRollback)
      throws SQLException
  {
    Long lTid = new Long(transactionId);
    if (unmarkDistributedTransaction(lTid))
    {
      TransactionMarkerMetaData tm = getTransactionMarker(lTid);
      distributedRollback(tm.getLogin(), transactionId);
    }
    else
      super.rollback(transactionId, logRollback);
  }

  /**
   * Rolls back a transaction to a savepoint. The transaction stays recorded
   * as distributed, if it was.
   *
   * @param transactionId transaction identifier
   * @param savepointName name of the savepoint to roll back to
   * @throws SQLException if the rollback fails
   */
  public void rollback(long transactionId, String savepointName)
      throws SQLException
  {
    if (isDistributedTransaction(new Long(transactionId)))
      distributedRollback(transactionId, savepointName);
    else
      super.rollback(transactionId, savepointName);
  }

  /**
   * Sets an unnamed savepoint. For a distributed transaction, the savepoint
   * identifier is obtained from the scheduler and its string form is used as
   * the savepoint name.
   *
   * @param transactionId transaction identifier
   * @return the savepoint identifier
   * @throws SQLException if the savepoint cannot be set
   */
  public int setSavepoint(long transactionId) throws SQLException
  {
    if (!isDistributedTransaction(new Long(transactionId)))
      return super.setSavepoint(transactionId);

    int savepointId = scheduler.incrementSavepointId();
    distributedSetSavepoint(transactionId, String.valueOf(savepointId));
    return savepointId;
  }

  /**
   * Sets a named savepoint.
   *
   * @param transactionId transaction identifier
   * @param name savepoint name
   * @throws SQLException if the savepoint cannot be set
   */
  public void setSavepoint(long transactionId, String name) throws SQLException
  {
    if (isDistributedTransaction(new Long(transactionId)))
      distributedSetSavepoint(transactionId, name);
    else
      super.setSavepoint(transactionId, name);
  }

  /**
   * Releases a savepoint.
   *
   * @param transactionId transaction identifier
   * @param name savepoint name
   * @throws SQLException if the savepoint cannot be released
   */
  public void releaseSavepoint(long transactionId, String name)
      throws SQLException
  {
    if (isDistributedTransaction(new Long(transactionId)))
      distributedReleaseSavepoint(transactionId, name);
    else
      super.releaseSavepoint(transactionId, name);
  }

  //
  // Request execution
  //

  /**
   * Lazily starts the transaction of the request if needed, then delegates to
   * the parent class.
   *
   * @param request the write request
   * @throws SQLException if the transaction cannot be started or if the
   *           parent implementation fails
   */
  public void scheduleExecWriteRequest(AbstractWriteRequest request)
      throws SQLException
  {
    lazyTransactionStart(request);
    super.scheduleExecWriteRequest(request);
  }

  /**
   * Executes a read request locally and falls back to
   * {@link #execRemoteReadRequest(SelectRequest)} when the local execution
   * fails with a <code>NoMoreBackendException</code>.
   *
   * @param request the read request
   * @return the result set
   * @throws SQLException if the request fails
   */
  public ControllerResultSet execReadRequest(SelectRequest request)
      throws SQLException
  {
    try
    {
      return execLocalReadRequest(request);
    }
    catch (NoMoreBackendException ignored)
    {
      // No local backend could serve the read, try a remote controller
      return execRemoteReadRequest(request);
    }
  }

  /**
   * Executes a read request on a remote controller.
   *
   * @param request the read request
   * @return the result set
   * @throws SQLException if the request fails
   */
  public abstract ControllerResultSet execRemoteReadRequest(
      SelectRequest request) throws SQLException;

  /**
   * Executes a write request through
   * {@link #execDistributedWriteRequest(AbstractWriteRequest)}. If the request
   * is not in autocommit mode, its transaction is recorded as distributed.
   *
   * @param request the write request
   * @return the value returned by the distributed execution
   * @throws SQLException if the request fails
   */
  public int execWriteRequest(AbstractWriteRequest request) throws SQLException
  {
    if (!request.isAutoCommit())
      markTransactionAsDistributed(request.getTransactionId());
    return execDistributedWriteRequest(request);
  }

  /**
   * Executes a write request through
   * {@link #execDistributedWriteRequestWithKeys(AbstractWriteRequest)}. If the
   * request is not in autocommit mode, its transaction is recorded as
   * distributed.
   *
   * @param request the write request
   * @return the result set returned by the distributed execution
   * @throws SQLException if the request fails
   */
  public ControllerResultSet execWriteRequestWithKeys(
      AbstractWriteRequest request) throws SQLException
  {
    if (!request.isAutoCommit())
      markTransactionAsDistributed(request.getTransactionId());
    return execDistributedWriteRequestWithKeys(request);
  }

  /**
   * Executes a stored procedure returning a result set.
   * <p>
   * A read-only procedure is first tried locally. If it is not read-only, or
   * if the local execution fails with an
   * <code>AllBackendsFailedException</code>, the procedure is executed through
   * {@link #execDistributedReadStoredProcedure(StoredProcedure)} and, when not
   * in autocommit mode, its transaction is recorded as distributed.
   *
   * @param proc the stored procedure
   * @return the result set
   * @throws SQLException if the execution fails
   */
  public ControllerResultSet execReadStoredProcedure(StoredProcedure proc)
      throws SQLException
  {
    if (proc.isReadOnly())
    {
      try
      {
        return execReadStoredProcedureLocally(proc);
      }
      catch (AllBackendsFailedException ignored)
      {
        // Deliberately not propagated: fall through to the distributed
        // execution below.
        if (logger.isDebugEnabled())
          logger.debug("Read-only stored procedure "
              + proc.getSQLShortForm(dvdb.getSQLShortFormLength())
              + " failed on all local backends, executing it as a distributed request.");
      }
    }

    if (!proc.isAutoCommit())
      markTransactionAsDistributed(proc.getTransactionId());
    return execDistributedReadStoredProcedure(proc);
  }

  /**
   * Executes a stored procedure returning an <code>int</code> through
   * {@link #execDistributedWriteStoredProcedure(StoredProcedure)}. If the
   * procedure is not in autocommit mode, its transaction is recorded as
   * distributed.
   *
   * @param proc the stored procedure
   * @return the value returned by the distributed execution
   * @throws SQLException if the execution fails
   */
  public int execWriteStoredProcedure(StoredProcedure proc) throws SQLException
  {
    if (!proc.isAutoCommit())
      markTransactionAsDistributed(proc.getTransactionId());
    return execDistributedWriteStoredProcedure(proc);
  }

  //
  // Distributed operations, implemented by subclasses
  //

  /**
   * Commits a distributed transaction.
   *
   * @param login login taken from the transaction marker
   * @param transactionId transaction identifier
   * @throws SQLException if the commit fails
   */
  public abstract void distributedCommit(String login, long transactionId)
      throws SQLException;

  /**
   * Rolls back a distributed transaction.
   *
   * @param login login taken from the transaction marker
   * @param transactionId transaction identifier
   * @throws SQLException if the rollback fails
   */
  public abstract void distributedRollback(String login, long transactionId)
      throws SQLException;

  /**
   * Rolls back a distributed transaction to a savepoint.
   *
   * @param transactionId transaction identifier
   * @param savepointName name of the savepoint to roll back to
   * @throws SQLException if the rollback fails
   */
  public abstract void distributedRollback(long transactionId,
      String savepointName) throws SQLException;

  /**
   * Sets a savepoint in a distributed transaction.
   *
   * @param transactionId transaction identifier
   * @param name savepoint name
   * @throws SQLException if the savepoint cannot be set
   */
  public abstract void distributedSetSavepoint(long transactionId, String name)
      throws SQLException;

  /**
   * Releases a savepoint of a distributed transaction.
   *
   * @param transactionId transaction identifier
   * @param name savepoint name
   * @throws SQLException if the savepoint cannot be released
   */
  public abstract void distributedReleaseSavepoint(long transactionId,
      String name) throws SQLException;

  /**
   * Executes a write request in a distributed way.
   *
   * @param request the write request
   * @return an <code>int</code> result; NOTE(review): presumably the number
   *         of updated rows - confirm in the implementations
   * @throws SQLException if the request fails
   */
  public abstract int execDistributedWriteRequest(AbstractWriteRequest request)
      throws SQLException;

  /**
   * Executes, in a distributed way, a write request returning a result set.
   *
   * @param request the write request
   * @return the result set; NOTE(review): presumably the generated keys, as
   *         the method name suggests - confirm in the implementations
   * @throws SQLException if the request fails
   */
  public abstract ControllerResultSet execDistributedWriteRequestWithKeys(
      AbstractWriteRequest request) throws SQLException;

  /**
   * Executes, in a distributed way, a stored procedure returning a result
   * set.
   *
   * @param proc the stored procedure
   * @return the result set
   * @throws SQLException if the execution fails
   */
  public abstract ControllerResultSet execDistributedReadStoredProcedure(
      StoredProcedure proc) throws SQLException;

  /**
   * Executes, in a distributed way, a stored procedure returning an
   * <code>int</code>.
   *
   * @param proc the stored procedure
   * @return the <code>int</code> result of the execution
   * @throws SQLException if the execution fails
   */
  public abstract int execDistributedWriteStoredProcedure(StoredProcedure proc)
      throws SQLException;

  //
  // Local execution, straight delegation to RequestManager
  //

  /**
   * Executes a stored procedure returning a result set using the parent
   * (non-distributed) implementation.
   *
   * @param proc the stored procedure
   * @return the result set
   * @throws AllBackendsFailedException declared for callers that handle a
   *           failure of all backends
   * @throws SQLException if the execution fails
   */
  public ControllerResultSet execReadStoredProcedureLocally(StoredProcedure proc)
      throws AllBackendsFailedException, SQLException
  {
    return super.execReadStoredProcedure(proc);
  }

  /**
   * Executes a stored procedure returning an <code>int</code> using the
   * parent (non-distributed) implementation.
   *
   * @param proc the stored procedure
   * @return the <code>int</code> result of the execution
   * @throws AllBackendsFailedException declared for callers that handle a
   *           failure of all backends
   * @throws SQLException if the execution fails
   */
  public int execDistributedWriteStoredProcedureLocally(StoredProcedure proc)
      throws AllBackendsFailedException, SQLException
  {
    return super.execWriteStoredProcedure(proc);
  }

  /**
   * Executes a read request using the parent (non-distributed)
   * implementation.
   *
   * @param request the read request
   * @return the result set
   * @throws NoMoreBackendException declared for callers that fall back to a
   *           remote execution
   * @throws SQLException if the request fails
   */
  public ControllerResultSet execLocalReadRequest(SelectRequest request)
      throws NoMoreBackendException, SQLException
  {
    return super.execReadRequest(request);
  }

}