1 25 26 package org.continuent.sequoia.controller.requestmanager.distributed; 27 28 import java.io.Serializable ; 29 import java.sql.SQLException ; 30 import java.util.ArrayList ; 31 import java.util.HashMap ; 32 import java.util.Iterator ; 33 import java.util.LinkedList ; 34 import java.util.List ; 35 36 import javax.management.NotCompliantMBeanException ; 37 38 import org.continuent.hedera.adapters.MulticastRequestAdapter; 39 import org.continuent.hedera.adapters.MulticastResponse; 40 import org.continuent.hedera.common.Member; 41 import org.continuent.sequoia.common.exceptions.NoMoreBackendException; 42 import org.continuent.sequoia.common.exceptions.NoResultAvailableException; 43 import org.continuent.sequoia.common.exceptions.VirtualDatabaseException; 44 import org.continuent.sequoia.common.i18n.Translate; 45 import org.continuent.sequoia.common.jmx.management.BackendInfo; 46 import org.continuent.sequoia.common.log.Trace; 47 import org.continuent.sequoia.controller.backend.DatabaseBackend; 48 import org.continuent.sequoia.controller.backend.result.ControllerResultSet; 49 import org.continuent.sequoia.controller.backend.result.ExecuteResult; 50 import org.continuent.sequoia.controller.backend.result.ExecuteUpdateResult; 51 import org.continuent.sequoia.controller.backend.result.GeneratedKeysResult; 52 import org.continuent.sequoia.controller.cache.result.AbstractResultCache; 53 import org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer; 54 import org.continuent.sequoia.controller.loadbalancer.AllBackendsFailedException; 55 import org.continuent.sequoia.controller.recoverylog.RecoveryLog; 56 import org.continuent.sequoia.controller.requestmanager.RequestManager; 57 import org.continuent.sequoia.controller.requestmanager.TransactionMetaData; 58 import org.continuent.sequoia.controller.requests.AbstractRequest; 59 import org.continuent.sequoia.controller.requests.AbstractWriteRequest; 60 import org.continuent.sequoia.controller.requests.SelectRequest; 61 import org.continuent.sequoia.controller.requests.StoredProcedure; 62 import org.continuent.sequoia.controller.requests.UnknownWriteRequest; 63 import org.continuent.sequoia.controller.scheduler.AbstractScheduler; 64 import org.continuent.sequoia.controller.semantic.SemanticBehavior; 65 import org.continuent.sequoia.controller.virtualdatabase.DistributedVirtualDatabase; 66 import org.continuent.sequoia.controller.virtualdatabase.VirtualDatabase; 67 import org.continuent.sequoia.controller.virtualdatabase.protocol.DisableBackendsAndSetCheckpoint; 68 import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedClosePersistentConnection; 69 import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedCommit; 70 import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedOpenPersistentConnection; 71 import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedReleaseSavepoint; 72 import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedRollback; 73 import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedRollbackToSavepoint; 74 import org.continuent.sequoia.controller.virtualdatabase.protocol.FailoverForPersistentConnection; 75 import org.continuent.sequoia.controller.virtualdatabase.protocol.FailoverForTransaction; 76 import org.continuent.sequoia.controller.virtualdatabase.protocol.GetRequestResultFromFailoverCache; 77 import org.continuent.sequoia.controller.virtualdatabase.protocol.NotifyCompletion; 78 import org.continuent.sequoia.controller.virtualdatabase.protocol.NotifyDisableBackend; 79 import org.continuent.sequoia.controller.virtualdatabase.protocol.NotifyEnableBackend; 80 import org.continuent.sequoia.controller.virtualdatabase.protocol.NotifyInconsistency; 81 import org.continuent.sequoia.controller.virtualdatabase.protocol.ResumeActivity; 82 import org.continuent.sequoia.controller.virtualdatabase.protocol.SuspendActivity; 83 84 99 public abstract class DistributedRequestManager extends RequestManager 100 { 101 protected DistributedVirtualDatabase dvdb; 102 107 private HashMap failedOnAllBackends; 108 109 private long controllerId; 110 111 protected LinkedList distributedTransactions; 112 113 116 public static final Integer SUCCESSFUL_COMPLETION = new Integer (-1); 117 118 134 public DistributedRequestManager(DistributedVirtualDatabase vdb, 135 AbstractScheduler scheduler, AbstractResultCache cache, 136 AbstractLoadBalancer loadBalancer, RecoveryLog recoveryLog, 137 long beginTimeout, long commitTimeout, long rollbackTimeout) 138 throws SQLException , NotCompliantMBeanException 139 { 140 super(vdb, scheduler, cache, loadBalancer, recoveryLog, beginTimeout, 141 commitTimeout, rollbackTimeout); 142 dvdb = vdb; 143 failedOnAllBackends = new HashMap (); 144 distributedTransactions = new LinkedList (); 145 } 146 147 151 157 public static final long CONTROLLER_ID_BIT_MASK = 0xffff000000000000L; 158 162 public static final long TRANSACTION_ID_BIT_MASK = ~CONTROLLER_ID_BIT_MASK; 163 164 167 public static final int CONTROLLER_ID_SHIFT_BITS = 48; 168 169 172 public static final long CONTROLLER_ID_BITS = 0x000000000000ffffL; 173 174 179 public long getControllerId() 180 { 181 return controllerId; 182 } 183 184 194 public void setControllerId(long id) 195 { 196 if ((id & ~CONTROLLER_ID_BITS) != 0) 197 { 198 String msg = "Out of range controller id (" + id + ")"; 199 logger.error(msg); 200 throw new RuntimeException (msg); 201 } 202 this.controllerId = (id << CONTROLLER_ID_SHIFT_BITS) 203 & CONTROLLER_ID_BIT_MASK; 204 if (logger.isDebugEnabled()) 205 logger.debug("Setting controller identifier to " + id 206 + " (shifted value is " + controllerId + ")"); 207 208 scheduler.setControllerId(controllerId); 209 } 210 211 217 public long getNextConnectionId(long id) 218 { 219 id = id & TRANSACTION_ID_BIT_MASK; 222 id = id | controllerId; 223 return id; 224 } 225 226 229 public long getNextRequestId() 230 { 231 233 long id = super.getNextRequestId(); 234 id = id & TRANSACTION_ID_BIT_MASK; 237 id = id | controllerId; 238 return id; 239 } 240 241 246 public Trace getLogger() 247 { 248 return logger; 249 } 250 251 256 public VirtualDatabase getVirtualDatabase() 257 { 258 return dvdb; 259 } 260 261 264 public void setScheduler(AbstractScheduler scheduler) 265 { 266 super.setScheduler(scheduler); 267 if (vdb.getTotalOrderQueue() == null) 270 throw new RuntimeException ( 271 "New scheduler does not support total ordering and is not compatible with distributed virtual databases."); 272 } 273 274 278 288 public void enableBackend(DatabaseBackend db) throws SQLException 289 { 290 int size = dvdb.getAllMemberButUs().size(); 291 if (size > 0) 292 { 293 logger.debug(Translate 294 .get("virtualdatabase.distributed.enable.backend.check")); 295 296 try 297 { 298 dvdb.getMulticastRequestAdapter().multicastMessage( 301 dvdb.getAllMemberButUs(), 302 new NotifyEnableBackend(new BackendInfo(db)), 303 MulticastRequestAdapter.WAIT_NONE, 304 dvdb.getMessageTimeouts().getEnableBackendTimeout()); 305 } 306 catch (Exception e) 307 { 308 String msg = "Error while enabling backend " + db.getName(); 309 logger.error(msg, e); 310 throw new SQLException (msg + "(" + e + ")"); 311 } 312 } 313 314 super.enableBackend(db); 315 } 316 317 321 public void disableBackend(DatabaseBackend db, boolean forceDisable) 322 throws SQLException 323 { 324 int size = dvdb.getAllMemberButUs().size(); 325 if (size > 0) 326 { 327 logger.debug(Translate.get("virtualdatabase.distributed.disable.backend", 328 db.getName())); 329 330 try 331 { 332 dvdb.getMulticastRequestAdapter().multicastMessage( 335 dvdb.getAllMemberButUs(), 336 new NotifyDisableBackend(new BackendInfo(db)), 337 MulticastRequestAdapter.WAIT_NONE, 338 dvdb.getMessageTimeouts().getDisableBackendTimeout()); 339 } 340 catch (Exception e) 341 { 342 String msg = "Error while disabling backend " + db.getName(); 343 logger.error(msg, e); 344 throw new SQLException (msg + "(" + e + ")"); 345 } 346 } 347 348 super.disableBackend(db, forceDisable); 349 } 350 351 357 public void disableBackendsWithCheckpoint(ArrayList backendInfos, 358 String checkpointName) throws SQLException 359 { 360 try 363 { 364 suspendActivity(); 366 367 dvdb.getMulticastRequestAdapter().multicastMessage(dvdb.getAllMembers(), 368 new DisableBackendsAndSetCheckpoint(backendInfos, checkpointName), 369 MulticastRequestAdapter.WAIT_ALL, 370 dvdb.getMessageTimeouts().getDisableBackendTimeout()); 371 } 372 catch (Exception e) 373 { 374 String msg = "Error while disabling backends " + backendInfos; 375 logger.error(msg, e); 376 throw new SQLException (msg + "(" + e + ")"); 377 } 378 finally 379 { 380 resumeActivity(); 381 } 382 } 383 384 388 private class FailureInformation 389 { 390 private boolean needSchedulerNotification; 391 private long logId = -1; 392 private boolean success; 393 private boolean disableBackendOnSuccess; 394 private int updateCount; 395 396 404 public FailureInformation(boolean needSchedulerNotification, long logId) 405 { 406 this.needSchedulerNotification = needSchedulerNotification; 407 this.logId = logId; 408 } 409 410 423 public FailureInformation(boolean success, boolean disableBackendOnSuccess, 424 int updateCount) 425 { 426 this.success = success; 427 this.disableBackendOnSuccess = disableBackendOnSuccess; 428 this.updateCount = updateCount; 429 } 430 431 436 public final long getLogId() 437 { 438 return logId; 439 } 440 441 446 public void setLogId(long logId) 447 { 448 this.logId = logId; 449 } 450 451 456 public final boolean needSchedulerNotification() 457 { 458 return needSchedulerNotification; 459 } 460 461 467 public void setNeedSchedulerNotification(boolean needSchedulerNotification) 468 { 469 this.needSchedulerNotification = needSchedulerNotification; 470 } 471 472 477 public boolean isDisableBackendOnSuccess() 478 { 479 return disableBackendOnSuccess; 480 } 481 482 489 public boolean isSuccess() 490 { 491 return success; 492 } 493 494 500 public int getUpdateCount() 501 { 502 return updateCount; 503 } 504 505 } 506 507 private void logRequestCompletionAndNotifyScheduler(AbstractRequest request, 508 boolean success, FailureInformation failureInfo, int updateCount) 509 { 510 if (recoveryLog != null) 512 { 513 boolean mustLog = !request.isReadOnly(); 514 if (request instanceof StoredProcedure) 515 { 516 SemanticBehavior semantic = ((StoredProcedure) request).getSemantic(); 517 mustLog = (semantic == null) || !semantic.isReadOnly(); 518 } 519 if (mustLog && failureInfo.getLogId() != 0) 520 recoveryLog.logRequestCompletion(failureInfo.getLogId(), success, 521 request.getExecTimeInMs(), updateCount); 522 } 523 524 if (failureInfo.needSchedulerNotification()) 525 { 526 try 527 { 528 if (request instanceof StoredProcedure) 531 scheduler.storedProcedureCompleted((StoredProcedure) request); 532 else if (!request.isAutoCommit() 533 && (request instanceof UnknownWriteRequest)) 534 { 535 String sql = request.getSqlOrTemplate(); 536 TransactionMetaData tm = new TransactionMetaData(request 537 .getTransactionId(), 0, request.getLogin(), request 538 .isPersistentConnection(), request.getPersistentConnectionId()); 539 if ("commit".equals(sql)) 540 scheduler.commitCompleted(tm, success); 541 else if ("rollback".equals(sql) || "abort".equals(sql)) 542 scheduler.rollbackCompleted(tm, success); 543 else if (sql.startsWith("rollback")) scheduler.savepointCompleted(tm.getTransactionId()); 545 else if (sql.startsWith("release ")) 546 scheduler.savepointCompleted(tm.getTransactionId()); 547 else if (sql.startsWith("savepoint ")) 548 scheduler.savepointCompleted(tm.getTransactionId()); 549 else 550 scheduler.writeCompleted((AbstractWriteRequest) request); 552 } 553 else 554 scheduler.writeCompleted((AbstractWriteRequest) request); 556 } 557 catch (SQLException e) 558 { 559 logger.warn("Failed to notify scheduler for request " + request, e); 560 } 561 } 562 } 563 564 572 public void addFailedOnAllBackends(AbstractRequest request, 573 boolean needSchedulerNotification) 574 { 575 synchronized (failedOnAllBackends) 576 { 577 582 FailureInformation failureInfo = (FailureInformation) failedOnAllBackends 583 .get(request); 584 if (failureInfo == null) 585 failedOnAllBackends.put(request, new FailureInformation( 586 needSchedulerNotification, request.getLogId())); 587 else 588 { 589 failureInfo.setLogId(request.getLogId()); 590 failureInfo.setNeedSchedulerNotification(needSchedulerNotification); 591 completeFailedOnAllBackends(request, failureInfo.isSuccess(), 592 failureInfo.isDisableBackendOnSuccess(), failureInfo.updateCount); 593 } 594 } 595 } 596 597 607 public void cleanupAllFailedQueriesFromController(long failedControllerId) 608 { 609 synchronized (failedOnAllBackends) 610 { 611 for (Iterator iter = failedOnAllBackends.keySet().iterator(); iter 612 .hasNext();) 613 { 614 AbstractRequest request = (AbstractRequest) iter.next(); 615 if (((request.getId() & CONTROLLER_ID_BIT_MASK) == failedControllerId) 616 || ((request.getTransactionId() & CONTROLLER_ID_BIT_MASK) == failedControllerId) 617 || (request.isPersistentConnection() && (request 618 .getPersistentConnectionId() & CONTROLLER_ID_BIT_MASK) == failedControllerId)) 619 { FailureInformation failureInfo = (FailureInformation) failedOnAllBackends 621 .get(request); 622 if (failureInfo.getLogId() > 0) 626 { 627 if (logger.isInfoEnabled()) 628 logger.info("No status information received for request " 629 + request + ", considering status as failed."); 630 631 boolean isAbortOrRollback = (request instanceof UnknownWriteRequest) 635 && ("rollback".equals(request.getSqlOrTemplate()) || "abort" 636 .equals(request.getSqlOrTemplate())); 637 638 logRequestCompletionAndNotifyScheduler(request, isAbortOrRollback, 639 failureInfo, -1); 640 } 641 iter.remove(); 642 } 643 } 644 } 645 } 646 647 660 public void completeFailedOnAllBackends(AbstractRequest request, 661 boolean success, boolean disableBackendOnSuccess, int updateCount) 662 { 663 FailureInformation failureInfo; 664 synchronized (failedOnAllBackends) 665 { 666 failureInfo = (FailureInformation) failedOnAllBackends.remove(request); 667 if (failureInfo == null) 668 { 669 674 failureInfo = new FailureInformation(success, disableBackendOnSuccess, 675 updateCount); 676 failedOnAllBackends.put(request, failureInfo); 677 678 logger.info("Unable to find request " 679 + request.getSqlShortForm(dvdb.getSqlShortFormLength()) 680 + " in list of requests that failed on all backends."); 681 return; 682 } 683 } 684 logRequestCompletionAndNotifyScheduler(request, success, failureInfo, 685 updateCount); 686 687 if (disableBackendOnSuccess && success) 688 { 689 691 logger 692 .error("Request " 693 + request.getSqlShortForm(dvdb.getSqlShortFormLength()) 694 + " failed on all local backends but succeeded on other controllers. Disabling all local backends."); 695 696 try 697 { 698 dvdb.disableAllBackends(true); 699 } 700 catch (VirtualDatabaseException e) 701 { 702 logger.error("An error occured while disabling all backends", e); 703 } 704 } 705 } 706 707 711 public void distributedClosePersistentConnection(String login, 712 long persistentConnectionId) 713 { 714 List groupMembers = dvdb.getCurrentGroup().getMembers(); 715 716 if (logger.isDebugEnabled()) 717 logger.debug("Broadcasting closing persistent connection " 718 + persistentConnectionId + " for user " + login 719 + " to all controllers (" + dvdb.getChannel().getLocalMembership() 720 + "->" + groupMembers.toString() + ")"); 721 722 try 724 { 725 dvdb.getMulticastRequestAdapter().multicastMessage( 726 groupMembers, 727 new DistributedClosePersistentConnection(login, 728 persistentConnectionId), MulticastRequestAdapter.WAIT_ALL, 0); 729 } 730 catch (Exception e) 731 { 732 String msg = "An error occured while executing distributed persistent connection " 733 + persistentConnectionId + " closing"; 734 logger.warn(msg, e); 735 } 736 737 if (logger.isDebugEnabled()) 738 logger.debug("Persistent connection " + persistentConnectionId 739 + " closed."); 740 } 741 742 746 public void distributedOpenPersistentConnection(String login, 747 long persistentConnectionId) throws SQLException 748 { 749 List groupMembers = dvdb.getCurrentGroup().getMembers(); 750 751 if (logger.isDebugEnabled()) 752 logger.debug("Broadcasting opening persistent connection " 753 + persistentConnectionId + " for user " + login 754 + " to all controllers (" + dvdb.getChannel().getLocalMembership() 755 + "->" + groupMembers.toString() + ")"); 756 757 boolean success = false; 758 Exception exception = null; 759 try 760 { 761 MulticastResponse responses = dvdb.getMulticastRequestAdapter() 763 .multicastMessage( 764 groupMembers, 765 new DistributedOpenPersistentConnection(login, 766 persistentConnectionId), MulticastRequestAdapter.WAIT_ALL, 0); 767 768 groupMembers = dvdb.getAllMembers(); 770 int size = groupMembers.size(); 771 ArrayList failedControllers = null; 772 for (int i = 0; i < size; i++) 774 { 775 Member member = (Member) groupMembers.get(i); 776 if ((responses.getFailedMembers() != null) 777 && responses.getFailedMembers().contains(member)) 778 { 779 logger.warn("Controller " + member + " is suspected of failure."); 780 continue; 781 } 782 Object r = responses.getResult(member); 783 if (r instanceof Boolean ) 784 { 785 if (((Boolean ) r).booleanValue()) 786 success = true; 787 else 788 logger.error("Unexpected result for controller " + member); 789 } 790 else if (r instanceof Exception ) 791 { 792 if (failedControllers == null) 793 failedControllers = new ArrayList (); 794 failedControllers.add(member); 795 if (exception == null) 796 exception = (Exception ) r; 797 if (logger.isDebugEnabled()) 798 logger.debug("Controller " + member 799 + " failed to open persistent connection " 800 + persistentConnectionId + " (" + r + ")"); 801 } 802 } 803 804 808 if (failedControllers != null) 809 { 810 UnknownWriteRequest notifRequest = new UnknownWriteRequest("open " 811 + persistentConnectionId, false, 0, null); 812 notifRequest.setPersistentConnection(true); 813 notifRequest.setPersistentConnectionId(persistentConnectionId); 814 notifyRequestCompletion(notifRequest, success, false, failedControllers); 815 } 816 } 817 catch (Exception e) 818 { 819 String msg = "An error occured while executing distributed persistent connection " 820 + persistentConnectionId + " opening"; 821 logger.warn(msg, e); 822 } 823 824 if (success) 825 { 826 if (logger.isDebugEnabled()) 827 logger.debug("Persistent connection " + persistentConnectionId 828 + " opened."); 829 return; } 831 832 String msg = "Failed to open persistent connection " 834 + persistentConnectionId + " on all controllers (" + exception + ")"; 835 logger.warn(msg); 836 throw new SQLException (msg); 837 } 838 839 845 public void distributedFailoverForPersistentConnection( 846 long persistentConnectionId) 847 { 848 List groupMembers = dvdb.getCurrentGroup().getMembers(); 849 850 if (logger.isDebugEnabled()) 851 logger.debug("Broadcasting failover for persistent connection " 852 + persistentConnectionId + " to all controllers (" 853 + dvdb.getChannel().getLocalMembership() + "->" 854 + groupMembers.toString() + ")"); 855 856 try 858 { 859 dvdb.getMulticastRequestAdapter().multicastMessage(groupMembers, 860 new FailoverForPersistentConnection(persistentConnectionId), 861 MulticastRequestAdapter.WAIT_ALL, 0); 862 } 863 catch (Exception e) 864 { 865 String msg = "An error occured while notifying distributed persistent connection " 866 + persistentConnectionId + " failover"; 867 logger.warn(msg, e); 868 } 869 } 870 871 877 public void distributedFailoverForTransaction(long currentTid) 878 { 879 List groupMembers = dvdb.getCurrentGroup().getMembers(); 880 881 if (logger.isDebugEnabled()) 882 logger.debug("Broadcasting failover for transaction " + currentTid 883 + " to all controllers (" + dvdb.getChannel().getLocalMembership() 884 + "->" + groupMembers.toString() + ")"); 885 886 try 888 { 889 dvdb.getMulticastRequestAdapter().multicastMessage(groupMembers, 890 new FailoverForTransaction(currentTid), 891 MulticastRequestAdapter.WAIT_ALL, 0); 892 } 893 catch (Exception e) 894 { 895 String msg = "An error occured while notifying distributed persistent connection " 896 + currentTid + " failover"; 897 logger.warn(msg, e); 898 } 899 } 900 901 905 909 public void abort(long transactionId, boolean logAbort, boolean forceAbort) 910 throws SQLException 911 { 912 Long lTid = new Long (transactionId); 913 TransactionMetaData tm; 914 try 915 { 916 tm = getTransactionMetaData(lTid); 917 if (!forceAbort && tidSavepoints.get(lTid) != null) 918 { 919 if (logger.isDebugEnabled()) 920 logger.debug("Transaction " + transactionId 921 + " has savepoints, transaction will not be aborted"); 922 return; 923 } 924 } 925 catch (SQLException e1) 926 { 927 logger.warn("No transaction metadata found to abort transaction " 928 + transactionId + ". Creating a fake context for abort."); 929 tm = new TransactionMetaData(transactionId, 0, RecoveryLog.UNKNOWN_USER, 932 false, 0); 933 if (tidSavepoints.get(lTid) != null) 934 { 935 if (logger.isDebugEnabled()) 936 logger.debug("Transaction " + transactionId 937 + " has savepoints, transaction will not be aborted"); 938 return; 939 } 940 } 941 942 boolean isAWriteTransaction; 943 synchronized (distributedTransactions) 944 { 945 isAWriteTransaction = distributedTransactions.contains(lTid); 946 } 947 if (isAWriteTransaction) 948 { 949 distributedAbort(tm.getLogin(), transactionId); 950 } 951 else 952 { 953 LinkedList totalOrderQueue = dvdb.getTotalOrderQueue(); 957 synchronized (totalOrderQueue) 958 { 959 totalOrderQueue.addLast(new DistributedRollback(tm.getLogin(), 960 transactionId)); 961 } 962 super.abort(transactionId, logAbort, forceAbort); 963 } 964 } 965 966 971 public long begin(String login, boolean isPersistentConnection, 972 long persistentConnectionId) throws SQLException 973 { 974 long tid = scheduler.getNextTransactionId(); 975 tid = tid & TRANSACTION_ID_BIT_MASK; 978 tid = tid | controllerId; 979 doBegin(login, tid, isPersistentConnection, persistentConnectionId); 980 return tid; 981 } 982 983 987 public void commit(long transactionId, boolean logCommit, 988 boolean emptyTransaction) throws SQLException 989 { 990 Long lTid = new Long (transactionId); 991 TransactionMetaData tm = getTransactionMetaData(lTid); 992 boolean isAWriteTransaction; 993 synchronized (distributedTransactions) 994 { 995 isAWriteTransaction = distributedTransactions.contains(lTid); 996 } 997 if (isAWriteTransaction) 998 { 999 distributedCommit(tm.getLogin(), transactionId); 1000 } 1001 else 1002 { 1003 DistributedCommit commit = new DistributedCommit(tm.getLogin(), 1005 transactionId); 1006 if (!emptyTransaction) 1007 { 1008 LinkedList totalOrderQueue = dvdb.getTotalOrderQueue(); 1009 synchronized (totalOrderQueue) 1010 { 1011 totalOrderQueue.addLast(commit); 1012 } 1013 } 1014 try 1015 { 1016 super.commit(transactionId, logCommit, emptyTransaction); 1017 } 1018 catch (SQLException e) 1019 { 1020 if (logger.isWarnEnabled()) 1021 { 1022 logger 1023 .warn("Ignoring failure of commit for read-only transaction, exception was: " 1024 + e); 1025 } 1026 1027 scheduler.commit(tm, emptyTransaction, commit); 1029 scheduler.commitCompleted(tm, true); 1030 1031 completeTransaction(lTid); 1033 } 1034 } 1035 } 1036 1037 1040 public void completeTransaction(Long tid) 1041 { 1042 synchronized (distributedTransactions) 1043 { 1044 distributedTransactions.remove(tid); 1045 } 1046 super.completeTransaction(tid); 1047 } 1048 1049 1058 public void lazyTransactionStart(AbstractRequest request) throws SQLException 1059 { 1060 if (!request.isAutoCommit()) 1063 { 1064 long tid = request.getTransactionId(); 1065 Long lTid = new Long (tid); 1066 TransactionMetaData tm = (TransactionMetaData) transactionMetaDatas 1067 .get(lTid); 1068 1069 if ((tid & CONTROLLER_ID_BIT_MASK) == controllerId) 1070 { if (tm == null) 1072 logger.error("Unexpected non-started local transaction " + lTid); 1073 else 1074 { 1075 if (tm.isReadOnly()) 1076 { 1077 request.setIsLazyTransactionStart(true); 1078 tm.setReadOnly(false); 1079 } 1080 } 1081 } 1082 else 1083 { if (tm != null) 1085 { 1086 1090 if (tm.isReadOnly()) 1091 { 1092 request.setIsLazyTransactionStart(true); 1093 tm.setReadOnly(false); 1094 } 1095 return; } 1097 try 1099 { 1100 tm = new TransactionMetaData(tid, beginTimeout, request.getLogin(), 1101 request.isPersistentConnection(), request 1102 .getPersistentConnectionId()); 1103 tm.setReadOnly(false); 1104 1105 if (logger.isDebugEnabled()) 1106 logger.debug(Translate.get("transaction.begin.lazy", String 1107 .valueOf(tid))); 1108 1109 scheduler.begin(tm, true, request); 1110 1111 try 1112 { 1113 loadBalancer.begin(tm); 1115 1116 transactionMetaDatas.put(lTid, tm); 1119 request.setIsLazyTransactionStart(true); 1120 1121 synchronized (distributedTransactions) 1122 { 1123 if (!distributedTransactions.contains(lTid)) 1124 distributedTransactions.add(lTid); 1125 } 1126 } 1127 catch (SQLException e) 1128 { 1129 if (recoveryLog != null) 1130 transactionMetaDatas.remove(lTid); 1132 throw e; 1133 } 1134 finally 1135 { 1136 scheduler.beginCompleted(tid); 1138 } 1139 } 1140 catch (RuntimeException e) 1141 { 1142 String msg = Translate 1143 .get("fatal.runtime.exception.requestmanager.begin"); 1144 logger.fatal(msg, e); 1145 endUserLogger.fatal(msg); 1146 throw new SQLException (e.getMessage()); 1147 } 1148 } 1149 } 1150 } 1151 1152 1156 public void rollback(long transactionId, boolean logRollback) 1157 throws SQLException 1158 { 1159 Long lTid = new Long (transactionId); 1160 TransactionMetaData tm = getTransactionMetaData(lTid); 1161 boolean isAWriteTransaction; 1162 synchronized (distributedTransactions) 1163 { 1164 isAWriteTransaction = distributedTransactions.contains(lTid); 1165 } 1166 if (isAWriteTransaction) 1167 { 1168 distributedRollback(tm.getLogin(), transactionId); 1169 } 1170 else 1171 { 1172 DistributedRollback rollback = new DistributedRollback(tm.getLogin(), 1174 transactionId); 1175 LinkedList totalOrderQueue = dvdb.getTotalOrderQueue(); 1176 synchronized (totalOrderQueue) 1177 { 1178 totalOrderQueue.addLast(rollback); 1179 } 1180 try 1181 { 1182 super.rollback(transactionId, logRollback); 1183 } 1184 catch (SQLException e) 1185 { 1186 if (logger.isWarnEnabled()) 1187 { 1188 logger 1189 .warn("Ignoring failure of rollback for read-only transaction, exception was: " 1190 + e); 1191 } 1192 1193 try 1195 { 1196 scheduler.rollback(tm, rollback); 1197 } 1198 catch (SQLException ignore) 1199 { 1200 } 1201 scheduler.rollbackCompleted(tm, true); 1202 1203 completeTransaction(lTid); 1205 } 1206 } 1207 } 1208 1209 1213 public void rollback(long transactionId, String savepointName) 1214 throws SQLException 1215 { 1216 Long lTid = new Long (transactionId); 1217 boolean isAWriteTransaction; 1218 synchronized (distributedTransactions) 1219 { 1220 isAWriteTransaction = distributedTransactions.contains(lTid); 1221 } 1222 if (isAWriteTransaction) 1223 { 1224 TransactionMetaData tm = getTransactionMetaData(lTid); 1225 distributedRollback(tm.getLogin(), transactionId, savepointName); 1226 } 1227 else 1228 { LinkedList totalOrderQueue = dvdb.getTotalOrderQueue(); 1230 synchronized (totalOrderQueue) 1231 { 1232 totalOrderQueue.addLast(new DistributedRollbackToSavepoint( 1233 transactionId, savepointName)); 1234 } 1235 super.rollback(transactionId, savepointName); 1236 } 1237 } 1238 1239 1242 public int setSavepoint(long transactionId) throws SQLException 1243 { 1244 Long lTid = new Long (transactionId); 1245 int savepointId = scheduler.incrementSavepointId(); 1246 TransactionMetaData tm = getTransactionMetaData(lTid); 1247 synchronized (distributedTransactions) 1248 { 1249 if (!distributedTransactions.contains(lTid)) 1250 distributedTransactions.add(lTid); 1251 } 1252 distributedSetSavepoint(tm.getLogin(), transactionId, String 1253 .valueOf(savepointId)); 1254 return savepointId; 1255 } 1256 1257 1261 public void setSavepoint(long transactionId, String name) throws SQLException 1262 { 1263 Long lTid = new Long (transactionId); 1264 TransactionMetaData tm = getTransactionMetaData(lTid); 1265 synchronized (distributedTransactions) 1266 { 1267 if (!distributedTransactions.contains(lTid)) 1268 distributedTransactions.add(lTid); 1269 } 1270 distributedSetSavepoint(tm.getLogin(), transactionId, name); 1271 } 1272 1273 1277 public void releaseSavepoint(long transactionId, String name) 1278 throws SQLException 1279 { 1280 Long lTid = new Long (transactionId); 1281 boolean isAWriteTransaction; 1282 synchronized (distributedTransactions) 1283 { 1284 isAWriteTransaction = distributedTransactions.contains(lTid); 1285 } 1286 if (isAWriteTransaction) 1287 { 1288 TransactionMetaData tm = getTransactionMetaData(lTid); 1289 distributedReleaseSavepoint(tm.getLogin(), transactionId, name); 1290 } 1291 else 1292 { 1293 LinkedList totalOrderQueue = dvdb.getTotalOrderQueue(); 1295 synchronized (totalOrderQueue) 1296 { 1297 totalOrderQueue.addLast(new DistributedReleaseSavepoint(transactionId, 1298 name)); 1299 } 1300 super.releaseSavepoint(transactionId, name); 1301 } 1302 } 1303 1304 1309 private void addToDistributedTransactionListIfNeeded(AbstractRequest request) 1310 { 1311 if (!request.isAutoCommit()) 1313 { 1314 Long lTid = new Long (request.getTransactionId()); 1315 synchronized (distributedTransactions) 1316 { 1317 if (!distributedTransactions.contains(lTid)) 1318 distributedTransactions.add(lTid); 1319 } 1320 } 1321 } 1322 1323 1331 public void closePersistentConnection(Long connectionId) 1332 { 1333 String vLogin = scheduler.getPersistentConnectionLogin(connectionId); 1334 if (vLogin != null) 1335 super.closePersistentConnection(vLogin, connectionId.longValue()); 1336 } 1337 1338 1348 public ControllerResultSet execLocalStatementExecuteQuery( 1349 SelectRequest request) throws NoMoreBackendException, SQLException 1350 { 1351 return super.statementExecuteQuery(request); 1352 } 1353 1354 1362 public abstract ControllerResultSet execRemoteStatementExecuteQuery( 1363 SelectRequest request) throws SQLException ; 1364 1365 1368 public ControllerResultSet statementExecuteQuery(SelectRequest request) 1369 throws SQLException 1370 { 1371 if (!request.isMustBroadcast()) 1372 { 1373 try 1374 { 1375 return execLocalStatementExecuteQuery(request); 1376 } 1377 catch (SQLException e) 1378 { 1379 if (!(e instanceof NoMoreBackendException)) 1380 throw e; 1381 addToDistributedTransactionListIfNeeded(request); 1384 return execRemoteStatementExecuteQuery(request); 1385 } 1386 } 1387 addToDistributedTransactionListIfNeeded(request); 1388 return distributedStatementExecuteQuery(request); 1389 } 1390 1391 1394 public ExecuteUpdateResult statementExecuteUpdate(AbstractWriteRequest request) 1395 throws SQLException 1396 { 1397 if (!request.isAutoCommit()) 1398 { 1403 Long lTid = new Long (request.getTransactionId()); 1404 synchronized (distributedTransactions) 1405 { 1406 if (!distributedTransactions.contains(lTid)) 1407 distributedTransactions.add(lTid); 1408 } 1409 } 1410 return distributedStatementExecuteUpdate(request); 1411 } 1412 1413 1416 public GeneratedKeysResult statementExecuteUpdateWithKeys( 1417 AbstractWriteRequest request) throws SQLException 1418 { 1419 if (!request.isAutoCommit()) 1420 { 1425 Long lTid = new Long (request.getTransactionId()); 1426 synchronized (distributedTransactions) 1427 { 1428 if (!distributedTransactions.contains(lTid)) 1429 distributedTransactions.add(lTid); 1430 } 1431 } 1432 return distributedStatementExecuteUpdateWithKeys(request); 1433 } 1434 1435 1438 public ExecuteResult statementExecute(AbstractRequest request) 1439 throws SQLException 1440 { 1441 if (!request.isAutoCommit()) 1442 { 1447 Long lTid = new Long (request.getTransactionId()); 1448 synchronized (distributedTransactions) 1449 { 1450 if (!distributedTransactions.contains(lTid)) 1451 distributedTransactions.add(lTid); 1452 } 1453 } 1454 return distributedStatementExecute(request); 1455 } 1456 1457 1460 public void scheduleExecWriteRequest(AbstractWriteRequest request) 1461 throws SQLException 1462 { 1463 lazyTransactionStart(request); 1464 super.scheduleExecWriteRequest(request); 1465 } 1466 1467 1470 public ControllerResultSet callableStatementExecuteQuery(StoredProcedure proc) 1471 throws SQLException 1472 { 1473 getParsingFromCacheOrParse(proc); 1475 1476 SemanticBehavior semantic = proc.getSemantic(); 1478 if (proc.isReadOnly() || ((semantic != null) && (semantic.isReadOnly()))) 1479 { 1480 try 1481 { 1482 proc.setIsReadOnly(true); 1483 return execLocallyCallableStatementExecuteQuery(proc); 1484 } 1485 catch (AllBackendsFailedException ignore) 1486 { 1487 } 1489 catch (SQLException e) 1490 { 1491 if (!(e instanceof NoMoreBackendException)) 1492 throw e; 1493 } 1495 } 1496 1497 addToDistributedTransactionListIfNeeded(proc); 1498 return distributedCallableStatementExecuteQuery(proc); 1499 } 1500 1501 1504 public ExecuteUpdateResult callableStatementExecuteUpdate(StoredProcedure proc) 1505 throws SQLException 1506 { 1507 if (!proc.isAutoCommit()) 1508 { 1513 Long lTid = new Long (proc.getTransactionId()); 1514 synchronized (distributedTransactions) 1515 { 1516 if (!distributedTransactions.contains(lTid)) 1517 distributedTransactions.add(lTid); 1518 } 1519 } 1520 return distributedCallableStatementExecuteUpdate(proc); 1521 } 1522 1523 1526 public ExecuteResult callableStatementExecute(StoredProcedure proc) 1527 throws SQLException 1528 { 1529 getParsingFromCacheOrParse(proc); 1531 1532 SemanticBehavior semantic = proc.getSemantic(); 1534 if (proc.isReadOnly() || ((semantic != null) && (semantic.isReadOnly()))) 1535 { 1536 try 1537 { 1538 proc.setIsReadOnly(true); 1539 return execLocallyCallableStatementExecute(proc); 1540 } 1541 catch (AllBackendsFailedException ignore) 1542 { 1543 } 1545 catch (SQLException e) 1546 { 1547 if (!(e instanceof NoMoreBackendException)) 1548 throw e; 1549 } 1551 } 1552 1553 if (!proc.isAutoCommit()) 1554 { 1559 Long lTid = new Long (proc.getTransactionId()); 1560 synchronized (distributedTransactions) 1561 { 1562 if (!distributedTransactions.contains(lTid)) 1563 distributedTransactions.add(lTid); 1564 } 1565 } 1566 return distributedCallableStatementExecute(proc); 1567 } 1568 1569 1579 protected Serializable getRequestResultFromFailoverCache( 1580 List successfulControllers, long id) throws NoResultAvailableException 1581 { 1582 List groupMembers = new ArrayList (1); 1583 1584 for (Iterator iter = successfulControllers.iterator(); iter.hasNext();) 1586 { 1587 Member remoteController = (Member) iter.next(); 1588 groupMembers.clear(); 1589 groupMembers.add(remoteController); 1590 1591 if (logger.isDebugEnabled()) 1592 logger.debug("Getting result for request " + id + " from controllers " 1593 + remoteController); 1594 1595 try 1596 { MulticastResponse response = dvdb.getMulticastRequestAdapter() 1598 .multicastMessage(groupMembers, 1599 new GetRequestResultFromFailoverCache(id), 1600 MulticastRequestAdapter.WAIT_ALL, 0); 1601 Serializable result = response.getResult(remoteController); 1602 1603 if ((result instanceof Exception ) 1604 || (response.getFailedMembers() != null)) 1605 { if (logger.isInfoEnabled()) 1607 logger.info("Controller " + remoteController 1608 + " could not fetch result for request " + id, 1609 (Exception ) result); 1610 } 1611 else 1612 return result; 1613 } 1614 catch (Exception e) 1615 { 1616 String msg = "An error occured while getching result for request " + id 1617 + " from controller " + remoteController; 1618 logger.warn(msg, e); 1619 } 1620 } 1621 throw new NoResultAvailableException( 1622 "All controllers failed when trying to fetch result for request " + id); 1623 } 1624 1625 1638 public boolean storeRequestResult(AbstractRequest request, Serializable result) 1639 { 1640 if ((request.getId() & CONTROLLER_ID_BIT_MASK) != dvdb.getControllerId()) 1642 { 1643 dvdb.getRequestResultFailoverCache().store(request, result); 1644 return true; 1645 } 1646 return false; 1647 } 1648 1649 1653 1660 public abstract void distributedAbort(String login, long transactionId) 1661 throws SQLException ; 1662 1663 1670 public abstract void distributedCommit(String login, long transactionId) 1671 throws SQLException ; 1672 1673 1680 public abstract void distributedRollback(String login, long transactionId) 1681 throws SQLException ; 1682 1683 1691 public abstract void distributedRollback(String login, long transactionId, 1692 String savepointName) throws SQLException ; 1693 1694 1702 public abstract void distributedSetSavepoint(String login, 1703 long transactionId, String name) throws SQLException ; 1704 1705 1713 public abstract void distributedReleaseSavepoint(String login, 1714 long transactionId, String name) throws SQLException ; 1715 1716 1724 public abstract ControllerResultSet distributedStatementExecuteQuery( 1725 SelectRequest request) throws SQLException ; 1726 1727 1734 public abstract ExecuteUpdateResult distributedStatementExecuteUpdate( 1735 AbstractWriteRequest request) throws SQLException ; 1736 1737 1745 public abstract GeneratedKeysResult distributedStatementExecuteUpdateWithKeys( 1746 AbstractWriteRequest request) throws SQLException ; 1747 1748 1755 public abstract ExecuteResult distributedStatementExecute( 1756 AbstractRequest request) throws SQLException ; 1757 1758 1766 public abstract ControllerResultSet distributedCallableStatementExecuteQuery( 1767 StoredProcedure proc) throws SQLException ; 1768 1769 1777 public abstract ExecuteUpdateResult distributedCallableStatementExecuteUpdate( 1778 StoredProcedure proc) throws SQLException ; 1779 1780 1788 public abstract ExecuteResult distributedCallableStatementExecute( 1789 StoredProcedure proc) throws SQLException ; 1790 1791 1801 public ControllerResultSet execLocallyCallableStatementExecuteQuery( 1802 StoredProcedure proc) throws AllBackendsFailedException, SQLException 1803 { 1804 return super.callableStatementExecuteQuery(proc); 1805 } 1806 1807 1817 public ExecuteResult execLocallyCallableStatementExecute(StoredProcedure proc) 1818 throws AllBackendsFailedException, SQLException 1819 { 1820 return super.callableStatementExecute(proc); 1821 } 1822 1823 1831 public boolean isDistributedTransaction(long currentTid) 1832 { 1833 synchronized (distributedTransactions) 1834 { 1835 return distributedTransactions.contains(new Long (currentTid)); 1836 } 1837 } 1838 1839 1842 public void resumeActivity() 1843 { 1844 try 1847 { 1848 dvdb.getMulticastRequestAdapter().multicastMessage(dvdb.getAllMembers(), 1850 new ResumeActivity(), MulticastRequestAdapter.WAIT_ALL, 1851 dvdb.getMessageTimeouts().getDisableBackendTimeout()); 1852 } 1853 catch (Exception e) 1854 { 1855 String msg = "Error while resuming activity"; 1856 logger.error(msg, e); 1857 } 1858 } 1859 1860 1863 public void suspendActivity() throws SQLException 1864 { 1865 try 1868 { 1869 dvdb.getMulticastRequestAdapter().multicastMessage(dvdb.getAllMembers(), 1871 new SuspendActivity(), MulticastRequestAdapter.WAIT_ALL, 1872 dvdb.getMessageTimeouts().getDisableBackendTimeout()); 1873 logger.info("All activity is suspended for " + dvdb.getDatabaseName()); 1874 } 1875 catch (Exception e) 1876 { 1877 String msg = "Error while suspending activity"; 1878 logger.error(msg, e); 1879 throw (SQLException ) new SQLException (msg + "(" + e + ")").initCause(e); 1880 } 1881 } 1882 1883 1891 protected void notifyControllerInconsistency(AbstractRequest request, 1892 ArrayList inconsistentControllers) throws SQLException 1893 { 1894 try 1895 { 1896 dvdb.getMulticastRequestAdapter().multicastMessage( 1897 inconsistentControllers, new NotifyInconsistency(request), 1898 MulticastRequestAdapter.WAIT_ALL, 0); 1899 } 1900 catch (Exception e) 1901 { 1902 String msg = "An error occured while notifying controllers (" 1903 + inconsistentControllers 1904 + ") of inconsistency due to distributed request " + request.getId(); 1905 logger.warn(msg, e); 1906 throw new SQLException (msg + " (" + e + ")"); 1907 } 1908 } 1909 1910 1922 protected void notifyRequestCompletion(AbstractRequest request, 1923 boolean success, boolean disableBackendOnSuccess, 1924 ArrayList backendsToNotify) throws SQLException 1925 { 1926 if (backendsToNotify == null) 1927 return; 1928 try 1929 { 1930 dvdb.getMulticastRequestAdapter().multicastMessage(backendsToNotify, 1931 new NotifyCompletion(request, success, disableBackendOnSuccess), 1932 MulticastRequestAdapter.WAIT_ALL, 1933 dvdb.getMessageTimeouts().getNotifyCompletionTimeout()); 1934 } 1935 catch (Exception e) 1936 { 1937 String msg = "An error occured while notifying all controllers of failure of distributed request " 1938 + request.getId(); 1939 logger.warn(msg, e); 1940 throw new SQLException (msg + " (" + e + ")"); 1941 } 1942 } 1943 1944 1958 protected void notifyRequestCompletion(AbstractRequest request, 1959 boolean success, boolean disableBackendOnSuccess, 1960 ArrayList backendsToNotify, int requestUpdateCount) throws SQLException 1961 { 1962 if (backendsToNotify == null) 1963 return; 1964 try 1965 { 1966 dvdb.getMulticastRequestAdapter().multicastMessage( 1967 backendsToNotify, 1968 new NotifyCompletion(request, success, disableBackendOnSuccess, 1969 requestUpdateCount), MulticastRequestAdapter.WAIT_ALL, 1970 dvdb.getMessageTimeouts().getNotifyCompletionTimeout()); 1971 } 1972 catch (Exception e) 1973 { 1974 String msg = "An error occured while notifying all controllers of failure of distributed request " 1975 + request.getId(); 1976 logger.warn(msg, e); 1977 throw new SQLException (msg + " (" + e + ")"); 1978 } 1979 } 1980 1981 1991 public void cleanupRollbackFromOtherController(long tId) 1992 { 1993 long cid = this.getControllerId(); 1994 synchronized (failedOnAllBackends) 1995 { 1996 for (Iterator iter = failedOnAllBackends.keySet().iterator(); iter 1997 .hasNext();) 1998 { 1999 AbstractRequest request = (AbstractRequest) iter.next(); 2000 if (((request.getId() & CONTROLLER_ID_BIT_MASK) != cid) 2001 || ((request.getTransactionId() & CONTROLLER_ID_BIT_MASK) != cid) 2002 && request.getTransactionId() == tId) 2003 { if (logger.isInfoEnabled()) 2005 logger.info("Failover while rollbacking the transaction " + tId 2006 + " detected. No status information received for request " 2007 + request + ", considering status as failed."); 2008 FailureInformation failureInfo = (FailureInformation) failedOnAllBackends 2009 .get(request); 2010 2011 boolean isAbortOrRollback = (request instanceof UnknownWriteRequest) 2015 && ("rollback".equals(request.getSqlOrTemplate()) || "abort" 2016 .equals(request.getSqlOrTemplate())); 2017 2018 logRequestCompletionAndNotifyScheduler(request, isAbortOrRollback, 2019 failureInfo, -1); 2020 iter.remove(); 2021 } 2022 } 2023 } 2024 } 2025} | Popular Tags |