1 22 23 package org.continuent.sequoia.controller.loadbalancer; 24 25 import java.sql.SQLException ; 26 import java.sql.Statement ; 27 import java.util.ArrayList ; 28 import java.util.ConcurrentModificationException ; 29 import java.util.Iterator ; 30 import java.util.LinkedList ; 31 import java.util.List ; 32 import java.util.SortedSet ; 33 34 import org.continuent.sequoia.common.log.Trace; 35 import org.continuent.sequoia.controller.backend.DatabaseBackend; 36 import org.continuent.sequoia.controller.loadbalancer.policies.WaitForCompletionPolicy; 37 import org.continuent.sequoia.controller.loadbalancer.tasks.AbstractTask; 38 import org.continuent.sequoia.controller.loadbalancer.tasks.BeginTask; 39 import org.continuent.sequoia.controller.loadbalancer.tasks.KillThreadTask; 40 import org.continuent.sequoia.controller.loadbalancer.tasks.RollbackTask; 41 import org.continuent.sequoia.controller.locks.DeadlockDetectionThread; 42 import org.continuent.sequoia.controller.locks.TransactionLogicalLock; 43 import org.continuent.sequoia.controller.requestmanager.RequestManager; 44 import org.continuent.sequoia.controller.requestmanager.TransactionMetaData; 45 import org.continuent.sequoia.controller.requests.AbstractRequest; 46 import org.continuent.sequoia.controller.requests.AbstractWriteRequest; 47 import org.continuent.sequoia.controller.requests.ParsingGranularities; 48 import org.continuent.sequoia.controller.requests.SelectRequest; 49 import org.continuent.sequoia.controller.requests.StoredProcedure; 50 import org.continuent.sequoia.controller.semantic.SemanticBehavior; 51 import org.continuent.sequoia.controller.semantic.SemanticManager; 52 import org.continuent.sequoia.controller.sql.schema.DatabaseSchema; 53 import org.continuent.sequoia.controller.sql.schema.DatabaseTable; 54 import org.continuent.sequoia.controller.virtualdatabase.VirtualDatabase; 55 56 63 public class BackendTaskQueues 64 { 65 66 private LinkedList totalOrderQueue; 67 71 private LinkedList storedProcedureQueue; 72 76 private LinkedList conflictingRequestsQueue; 77 81 private LinkedList nonConflictingRequestsQueue; 82 83 private DatabaseBackend backend; 84 private WaitForCompletionPolicy waitForCompletionPolicy; 85 private RequestManager requestManager; 86 private boolean allowTasksToBePosted; 87 private static final Object ALLOW_TASKS_SYNC = new Object (); 88 89 private DeadlockDetectionThread deadlockDetectionThread; 90 91 private int storedProcedureInQueue = 0; 94 95 private int writesWithMultipleLocks = 0; 96 private Trace logger; 97 98 105 public BackendTaskQueues(DatabaseBackend backend, 106 WaitForCompletionPolicy waitForCompletionPolicy, 107 RequestManager requestManager) 108 { 109 this.backend = backend; 110 this.logger = backend.getLogger(); 111 this.waitForCompletionPolicy = waitForCompletionPolicy; 112 this.requestManager = requestManager; 113 totalOrderQueue = new LinkedList (); 114 storedProcedureQueue = new LinkedList (); 115 conflictingRequestsQueue = new LinkedList (); 116 nonConflictingRequestsQueue = new LinkedList (); 117 allowTasksToBePosted = false; 118 } 119 120 126 public boolean abortAllQueriesForTransaction(long tid) 127 { 128 synchronized (this) 129 { 130 boolean rollbackInProgress = abortAllQueriesEvenRunningInTransaction(tid, 131 storedProcedureQueue); 132 if (abortAllQueriesEvenRunningInTransaction(tid, conflictingRequestsQueue)) 133 rollbackInProgress = true; 134 if (abortAllQueriesEvenRunningInTransaction(tid, 135 nonConflictingRequestsQueue)) 136 rollbackInProgress = true; 137 return rollbackInProgress; 138 } 139 } 140 141 149 private boolean abortAllQueriesEvenRunningInTransaction(long tid, 150 LinkedList queue) 151 { 152 boolean rollbackInProgress = false; 153 synchronized (queue) 154 { 155 Long lTid = new Long (tid); 156 for (Iterator iter = queue.iterator(); iter.hasNext();) 157 { 158 BackendTaskQueueEntry entry = (BackendTaskQueueEntry) iter.next(); 159 boolean isProcessing = false; 160 AbstractTask task = entry.getTask(); 161 if (task.getTransactionId() == tid) 162 { 163 if (task instanceof RollbackTask) 164 rollbackInProgress = true; 165 else 166 { 172 if (!task.isAutoCommit() && !backend.isStartedTransaction(lTid)) 173 backend.startTransaction(lTid); 174 175 if (logger.isDebugEnabled()) 176 logger.debug("Aborting request " + task.getRequest() 177 + " on backend " + backend.getName()); 178 179 BackendWorkerThread processingThread = entry.getProcessingThread(); 180 if (processingThread != null) 181 { isProcessing = true; 183 Statement s = processingThread.getCurrentStatement(); 184 if (s != null) 185 { 186 try 187 { 188 s.cancel(); 189 } 190 catch (SQLException e) 191 { 192 logger.warn("Unable to cancel execution of request", e); 193 } 194 catch (NullPointerException e) 195 { 196 if (logger.isWarnEnabled()) 197 logger 198 .warn( 199 "Ignoring NullPointerException caused by Connector/J 5.0.4 bug #24721", 200 e); 201 } 202 } 203 } 204 if (!task.hasCompleted()) 205 { 206 try 207 { 208 if (processingThread == null) 209 { processingThread = backend 212 .getBackendWorkerThreadForNotification(); 213 if (processingThread == null) 214 { logger 217 .warn("No worker thread found for request abort notification, creating fake worker thread"); 218 processingThread = new BackendWorkerThread(backend, 219 requestManager.getLoadBalancer()); 220 } 221 } 222 task.notifyFailure(processingThread, -1L, new SQLException ( 223 "Transaction aborted due to deadlock")); 224 } 225 catch (SQLException ignore) 226 { 227 } 228 } 229 if (!isProcessing) 230 { 231 235 completedEntryExecution(entry, iter); 236 } 237 } 238 } 239 } 240 } 241 return rollbackInProgress; 242 } 243 244 249 public void abortRemainingRequests() 250 { 251 setAllowTasksToBePosted(false); 252 abortRemainingRequests(storedProcedureQueue); 253 abortRemainingRequests(conflictingRequestsQueue); 254 abortRemainingRequests(nonConflictingRequestsQueue); 255 } 256 257 262 private void abortRemainingRequests(LinkedList queue) 263 { 264 synchronized (queue) 265 { 266 for (Iterator iter = queue.iterator(); iter.hasNext();) 267 { 268 BackendTaskQueueEntry entry = (BackendTaskQueueEntry) iter.next(); 269 AbstractTask task = entry.getTask(); 270 271 if (task instanceof KillThreadTask) 273 continue; 274 275 if (entry.getProcessingThread() != null) 276 { logger.warn("A worker thread was still processing task " + task 278 + ", aborting the request execution."); 279 Statement s = entry.getProcessingThread().getCurrentStatement(); 280 if (s != null) 281 { 282 try 283 { 284 s.cancel(); 285 } 286 catch (SQLException e) 287 { 288 logger.warn("Unable to cancel execution of request", e); 289 } 290 } 291 } 292 if (!task.hasCompleted()) 293 { 294 if (logger.isDebugEnabled()) 295 logger.debug("Cancelling task " + task); 296 task.notifyCompletion(entry.getProcessingThread()); 297 } 298 completedEntryExecution(entry, iter); 299 } 300 } 301 } 302 303 308 public final void addTaskToBackendTotalOrderQueue(AbstractTask task) 309 { 310 synchronized (this) 311 { 312 synchronized (totalOrderQueue) 313 { 314 totalOrderQueue.addLast(task); 315 } 316 317 322 this.notifyAll(); 323 } 324 } 325 326 333 public final void addTaskToBackendTotalOrderQueue(AbstractTask task, 334 int queueSize) 335 { 336 synchronized (this) 337 { 338 boolean mustNotify = false; 339 do 340 { 341 synchronized (totalOrderQueue) 342 { 343 if (totalOrderQueue.size() < queueSize) 344 { 345 totalOrderQueue.addLast(task); 346 mustNotify = true; 347 } 348 } 349 350 if (mustNotify) 351 { 352 357 this.notifyAll(); 358 return; } 360 else 361 { 362 try 363 { this.wait(); 365 } 366 catch (InterruptedException e) 367 { 368 } 369 } 370 } 371 while (!mustNotify); 372 } 373 } 374 375 380 private void addTaskInConflictingRequestsQueue(AbstractTask task) 381 { 382 addTaskToQueue(conflictingRequestsQueue, task, false); 383 } 384 385 391 private void addTaskInNonConflictingRequestsQueue(AbstractTask task, 392 boolean isACommitOrRollback) 393 { 394 addTaskToQueue(nonConflictingRequestsQueue, task, isACommitOrRollback); 395 } 396 397 402 private void addTaskInStoredProcedureQueue(AbstractTask task) 403 { 404 addTaskToQueue(storedProcedureQueue, task, false); 405 } 406 407 415 private void addTaskToQueue(LinkedList queue, AbstractTask task, 416 boolean isACommitOrRollback) 417 { 418 if (!allowTasksToBePosted()) 419 { 420 if (logger.isDebugEnabled()) 421 logger.debug("Cancelling task " + task); 422 task.notifyCompletion(null); 423 return; 424 } 425 426 backend.addPendingTask(task); 428 if (logger.isDebugEnabled()) 429 logger.debug("Adding task " + task + " to pending request queue"); 430 431 synchronized (this) 432 { 433 synchronized (queue) 435 { 436 queue.addLast(new BackendTaskQueueEntry(task, queue, 437 isACommitOrRollback)); 438 } 439 440 445 this.notifyAll(); 446 } 447 } 448 449 454 public final void checkForPriorityInversion() 455 { 456 DatabaseSchema schema = backend.getDatabaseSchema(); 457 458 synchronized (conflictingRequestsQueue) 460 { 461 for (Iterator iter = conflictingRequestsQueue.iterator(); iter.hasNext();) 462 { 463 BackendTaskQueueEntry entry = (BackendTaskQueueEntry) iter.next(); 464 465 if (entry.processingThread != null) 468 continue; 469 470 AbstractTask task = entry.getTask(); 471 if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL) 472 { 473 if (task.getSuccess() + task.getFailed() > 0) 474 { if (logger.isDebugEnabled()) 477 logger.debug("Priority inversion for already started request " 478 + task.getRequest()); 479 moveToNonConflictingQueue(iter, entry); 480 continue; 481 } 482 } 483 484 AbstractRequest request = task.getRequest(); 485 SortedSet lockedTables = request.getWriteLockedDatabaseTables(); 486 if (lockedTables != null) 487 { 488 boolean queryIsConflicting = false; 489 for (Iterator iterator = lockedTables.iterator(); iterator.hasNext() 490 && !queryIsConflicting;) 491 { 492 String tableName = (String ) iterator.next(); 493 DatabaseTable table = schema.getTable(tableName, false); 494 if (table == null) 495 { logger 497 .warn("Unable to find table " 498 + tableName 499 + " in database schema, when checking priority inversion for query " 500 + request.toStringShortForm(requestManager 501 .getVirtualDatabase().getSqlShortFormLength())); 502 } 503 else 504 { 505 511 TransactionLogicalLock lock = table.getLock(); 512 if (!lock.isLocked()) 513 logger.warn("Unexpected free lock on table " + table); 514 else 515 { queryIsConflicting = lock.getLocker() != task 517 .getTransactionId(); 518 } 519 } 520 } 521 if (!queryIsConflicting) 522 { if (logger.isDebugEnabled()) 525 logger.debug("Priority inversion for request " 526 + task.getRequest()); 527 moveToNonConflictingQueue(iter, entry); 528 } 529 } 530 else 531 { logger.warn("Non-locking task " + task 534 + " was posted in conflicting queue"); 535 if (logger.isDebugEnabled()) 536 logger.debug("Priority inversion for request " + task.getRequest()); 537 moveToNonConflictingQueue(iter, entry); 538 } 539 } 540 } 541 542 synchronized (storedProcedureQueue) 544 { 545 for (Iterator iter = storedProcedureQueue.iterator(); iter.hasNext();) 546 { 547 BackendTaskQueueEntry entry = (BackendTaskQueueEntry) iter.next(); 548 549 TransactionLogicalLock globalLock = schema.getLock(); 550 AbstractTask task = entry.getTask(); 551 AbstractRequest request = task.getRequest(); 552 if (globalLock.isLocked()) 553 { if (task.getTransactionId() == globalLock.getLocker()) 555 { 556 if (!schema.allTablesAreUnlockedOrLockedByTransaction(request)) 559 return; 560 561 566 moveToNonConflictingQueue(iter, entry); 567 if (task.isAutoCommit()) 570 return; 571 continue; 572 } 573 else 574 { 577 boolean currentStoredProcedureInQueue = false; 578 for (Iterator iter2 = storedProcedureQueue.iterator(); iter2 579 .hasNext();) 580 { 581 BackendTaskQueueEntry entry2 = (BackendTaskQueueEntry) iter2 582 .next(); 583 AbstractTask task2 = entry2.getTask(); 584 if ((task2 != null) 585 && (task2.getTransactionId() == globalLock.getLocker())) 586 currentStoredProcedureInQueue = true; 587 } 588 589 if (!currentStoredProcedureInQueue) 592 return; 593 } 594 } 595 596 TransactionMetaData tm = getTransactionMetaData(request); 598 599 if ((request instanceof SelectRequest) 600 || (request instanceof AbstractWriteRequest)) 601 { 602 SortedSet writeLockedTables = request.getWriteLockedDatabaseTables(); 603 604 if (writeLockedTables == null || writeLockedTables.isEmpty()) 605 { moveToNonConflictingQueue(iter, entry); 607 continue; 608 } 609 610 moveMultipleWriteLocksQuery(schema, iter, entry, task, request, tm); 611 } 612 else 613 { 614 if (request instanceof StoredProcedure) 615 { 616 StoredProcedure sp = (StoredProcedure) request; 617 SemanticBehavior semantic = sp.getSemantic(); 618 if (semantic != null) 619 { 620 if (semantic.canExecuteOutOfOrder() || semantic.isReadOnly() 623 || (request.getWriteLockedDatabaseTables() == null)) 624 moveToNonConflictingQueue(iter, entry); 625 else 626 moveMultipleWriteLocksQuery(schema, iter, entry, task, request, 627 tm); 628 continue; 629 } 630 } 631 632 635 globalLock.acquire(request); 636 if (tm != null) 637 { 638 List acquiredLocks = tm.getAcquiredLocks(backend); 639 if ((acquiredLocks == null) || !acquiredLocks.contains(globalLock)) 640 tm.addAcquiredLock(backend, globalLock); 641 } 642 else 643 { 644 ArrayList globalLockList = new ArrayList (); 645 globalLockList.add(globalLock); 646 task.setLocks(backend, globalLockList); 647 } 648 649 if (!schema.allTablesAreUnlockedOrLockedByTransaction(request)) 652 return; 653 654 moveToNonConflictingQueue(iter, entry); 657 continue; 658 } 659 } 660 } 661 } 662 663 private void moveMultipleWriteLocksQuery(DatabaseSchema schema, 664 Iterator iter, BackendTaskQueueEntry entry, AbstractTask task, 665 AbstractRequest request, TransactionMetaData tm) 666 { 667 672 boolean allLocksAcquired = true; 673 for (Iterator lockIter = request.getWriteLockedDatabaseTables().iterator(); lockIter 674 .hasNext();) 675 { 676 String tableName = (String ) lockIter.next(); 677 DatabaseTable table = schema.getTable(tableName, false); 678 if (table == null) 679 { logger.warn("Unable to find table " 681 + tableName 682 + " in database schema, scheduling query " 683 + request.toStringShortForm(requestManager.getVirtualDatabase() 684 .getSqlShortFormLength()) + " in conflicting queue."); 685 allLocksAcquired = false; 686 } 687 else 688 { 692 TransactionLogicalLock tableLock = table.getLock(); 693 if (!tableLock.acquire(request)) 694 allLocksAcquired = false; 695 701 if (tm != null) 702 { 703 List acquiredLocks = tm.getAcquiredLocks(backend); 704 if ((acquiredLocks == null) || !acquiredLocks.contains(tableLock)) 705 tm.addAcquiredLock(backend, tableLock); 706 } 707 else 708 { 709 List tableLockList = task.getLocks(backend); 710 if (tableLockList == null) 711 tableLockList = new ArrayList (); 712 if (!tableLockList.contains(tableLock)) 717 { 718 tableLockList.add(tableLock); 719 task.setLocks(backend, tableLockList); 720 } 721 } 722 } 723 } 724 if (allLocksAcquired) 726 moveToNonConflictingQueue(iter, entry); 727 else 728 moveToConflictingQueue(iter, entry); 729 } 730 731 private void moveToConflictingQueue(Iterator iter, BackendTaskQueueEntry entry) 732 { 733 iter.remove(); 734 if (logger.isDebugEnabled()) 735 logger.debug("Moving " + entry.getTask() + " to conflicting queue"); 736 synchronized (conflictingRequestsQueue) 737 { 738 entry.setQueue(conflictingRequestsQueue); 739 conflictingRequestsQueue.addLast(entry); 740 } 741 } 742 743 private void moveToNonConflictingQueue(Iterator iter, 744 BackendTaskQueueEntry entry) 745 { 746 iter.remove(); 747 if (logger.isDebugEnabled()) 748 logger.debug("Moving " + entry.getTask() + " to non conflicting queue"); 749 synchronized (nonConflictingRequestsQueue) 750 { 751 entry.setQueue(nonConflictingRequestsQueue); 752 nonConflictingRequestsQueue.addLast(entry); 753 } 754 } 755 756 private static final int UNASSIGNED_QUEUE = -1; 757 private static final int CONFLICTING_QUEUE = 0; 758 private static final int NON_CONFLICTING_QUEUE = 1; 759 private static final int STORED_PROCEDURE_QUEUE = 2; 760 private final Object atomicPostSyncObject = new Object (); 761 762 767 private ArrayList lockList = null; 768 769 778 private boolean fetchNextQueryFromBackendTotalOrderQueue() 779 { 780 DatabaseSchema schema = backend.getDatabaseSchema(); 781 TransactionMetaData tm = null; 782 int queueToUse = UNASSIGNED_QUEUE; 783 784 AbstractTask task; 785 AbstractRequest request; 786 787 synchronized (totalOrderQueue) 789 { 790 if (totalOrderQueue.isEmpty()) 791 return false; 792 task = (AbstractTask) totalOrderQueue.removeFirst(); 793 794 if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL) 795 { 801 802 808 synchronized (atomicPostSyncObject) 809 { 810 while (mustWaitForLateTask(task)) 811 { 812 totalOrderQueue.addFirst(task); return false; 816 } 817 } 818 } 819 820 request = task.getRequest(); 822 if (request == null || task instanceof BeginTask) 823 { 824 addTaskInNonConflictingRequestsQueue(task, !task.isAutoCommit()); 825 return true; 826 } 827 else 828 { try 830 { 831 if (!request.isParsed()) 832 { 833 SemanticManager semanticManager = requestManager 834 .getVirtualDatabase().getSemanticManager(); 835 request.setSemanticManager(semanticManager); 837 SemanticBehavior semantic = semanticManager 838 .getRequestSemantic(request); 839 if (semantic != null) 840 request.setSemantic(semantic); 841 else 842 { request.parse(backend.getDatabaseSchema(), 844 ParsingGranularities.TABLE, false); 845 } 846 } 847 } 848 catch (SQLException e) 849 { 850 logger.warn("Parsing of request " + request 851 + " failed in recovery process", e); 852 } 853 } 854 if (backend.isReplaying()) 855 { 856 861 if (request instanceof StoredProcedure) 862 { 863 StoredProcedure sp = (StoredProcedure) request; 864 SemanticBehavior semantic = sp.getSemantic(); 865 if (semantic != null && semantic.isReadOnly()) 866 { 867 task.notifySuccess(null); 868 synchronized (this) 869 { notifyAll(); 871 } 872 return true; 873 } 874 } 875 } 876 if (!request.isAutoCommit()) 877 { try 879 { 880 tm = requestManager.getTransactionMetaData(new Long (request 881 .getTransactionId())); 882 } 883 catch (SQLException e) 884 { 885 if (logger.isDebugEnabled()) 887 logger.debug("No transaction medatada found for transaction " 888 + request.getTransactionId()); 889 } 890 } 891 892 if (schema == null) 893 { 894 try 895 { 896 task.notifyFailure((BackendWorkerThread) Thread.currentThread(), 0, 897 new SQLException ( 898 "No schema available to perform request locking on backend " 899 + backend.getName())); 900 } 901 catch (SQLException ignore) 902 { 903 } 905 return true; 906 } 907 908 synchronized (atomicPostSyncObject) 909 { 910 lockList = null; 911 912 boolean requestIsAStoredProcedure = request instanceof StoredProcedure; 913 if (requestIsAStoredProcedure) 914 storedProcedureInQueue++; 915 916 SortedSet writeLockedTables = request.getWriteLockedDatabaseTables(); 917 if ((writeLockedTables != null) && (writeLockedTables.size() > 1)) 918 writesWithMultipleLocks++; 919 920 TransactionLogicalLock globalLock = schema.getLock(); 922 if (globalLock.isLocked()) 923 { 924 if (request.isAutoCommit()) 925 { 926 queueToUse = STORED_PROCEDURE_QUEUE; 927 } 928 else 929 { 930 934 if (globalLock.getLocker() == request.getTransactionId()) 935 queueToUse = NON_CONFLICTING_QUEUE; 936 else 937 { 938 943 if ((tm == null) || (tm.getAcquiredLocks(backend) == null)) 944 { 945 queueToUse = STORED_PROCEDURE_QUEUE; 948 } 949 } 950 } 951 } 952 953 if (queueToUse == UNASSIGNED_QUEUE) 954 { if (request instanceof AbstractWriteRequest 957 && !((AbstractWriteRequest) request).requiresGlobalLock()) 958 { 959 queueToUse = getQueueAndWriteLockTables(request, schema, tm); 960 } 961 else if (request instanceof SelectRequest) 962 { 963 975 queueToUse = getQueueAndWriteLockTables(request, schema, tm); 976 } 977 else 978 { 979 if (requestIsAStoredProcedure) 980 { 981 StoredProcedure sp = (StoredProcedure) request; 982 SemanticBehavior semantic = sp.getSemantic(); 983 if (semantic != null) 984 { 985 if (semantic.isReadOnly() 988 || (request.getWriteLockedDatabaseTables() == null)) 989 queueToUse = NON_CONFLICTING_QUEUE; 990 else 991 { 992 queueToUse = getQueueAndWriteLockTables(request, schema, tm); 993 if (semantic.canExecuteOutOfOrder()) 994 queueToUse = NON_CONFLICTING_QUEUE; 995 } 996 } 997 } 998 999 if (queueToUse == UNASSIGNED_QUEUE) 1000 { 1001 1006 if (!globalLock.isLocked()) 1007 { globalLock.acquire(request); 1010 if (tm != null) 1011 tm.addAcquiredLock(backend, globalLock); 1012 else 1013 { 1014 if (lockList == null) 1015 lockList = new ArrayList (); 1016 lockList.add(globalLock); 1017 } 1018 if (schema.allTablesAreUnlockedOrLockedByTransaction(request)) 1019 queueToUse = NON_CONFLICTING_QUEUE; 1021 else 1022 queueToUse = STORED_PROCEDURE_QUEUE; 1024 } 1025 else 1026 { 1031 if (schema.allTablesAreUnlockedOrLockedByTransaction(request)) 1032 { 1033 queueToUse = NON_CONFLICTING_QUEUE; 1034 List locks = schema.lockAllTables(request); 1035 if (tm != null) 1036 tm.addAcquiredLocks(backend, locks); 1037 else 1038 { 1039 if (lockList == null) 1040 lockList = new ArrayList (); 1041 lockList.add(locks); 1042 } 1043 } 1044 else 1045 { 1049 queueToUse = STORED_PROCEDURE_QUEUE; 1050 } 1051 } 1052 } 1053 } 1054 } 1055 1056 if (queueToUse == NON_CONFLICTING_QUEUE) 1057 { 1058 if (logger.isDebugEnabled()) 1059 logger.debug("Scheduling request " + request 1060 + " in non conflicting queue"); 1061 addTaskInNonConflictingRequestsQueue(task, false); 1062 } 1063 else if (queueToUse == CONFLICTING_QUEUE) 1064 { 1065 if (logger.isDebugEnabled()) 1066 logger.debug("Scheduling request " + request 1067 + " in conflicting queue"); 1068 addTaskInConflictingRequestsQueue(task); 1069 } 1070 else if (queueToUse == STORED_PROCEDURE_QUEUE) 1071 { 1072 if (logger.isDebugEnabled()) 1073 logger.debug("Scheduling request " + request 1074 + " in stored procedure queue"); 1075 addTaskInStoredProcedureQueue(task); 1076 } 1077 1078 task.setLocks(backend, lockList); 1079 } } 1082 return true; 1083 } 1084 1085 1097 private int getQueueAndWriteLockTables(AbstractRequest request, 1098 DatabaseSchema schema, TransactionMetaData tm) 1099 { 1100 SortedSet writeLockedTables = request.getWriteLockedDatabaseTables(); 1101 1102 if (writeLockedTables == null || writeLockedTables.isEmpty()) 1103 { return NON_CONFLICTING_QUEUE; 1105 } 1106 else if (request.isCreate() && writeLockedTables.size() == 1) 1107 { return NON_CONFLICTING_QUEUE; 1112 } 1113 1114 1119 int queueToUse = NON_CONFLICTING_QUEUE; 1120 for (Iterator iter = writeLockedTables.iterator(); iter.hasNext();) 1121 { 1122 String tableName = (String ) iter.next(); 1123 DatabaseTable table = schema.getTable(tableName, false); 1124 if (table == null) 1125 { logger.warn("Unable to find table " 1127 + tableName 1128 + " in database schema, scheduling query " 1129 + request.toStringShortForm(requestManager.getVirtualDatabase() 1130 .getSqlShortFormLength()) + " in conflicting queue."); 1131 queueToUse = CONFLICTING_QUEUE; 1132 } 1133 else 1134 { 1138 if (!table.getLock().acquire(request)) 1139 { 1140 queueToUse = CONFLICTING_QUEUE; 1141 if (logger.isDebugEnabled()) 1142 logger.debug("Request " + request + " waits for lock on table " 1143 + table); 1144 } 1145 if (tm != null) 1146 tm.addAcquiredLock(backend, table.getLock()); 1147 else 1148 { 1149 if (lockList == null) 1150 lockList = new ArrayList (); 1151 lockList.add(table.getLock()); 1152 } 1153 } 1154 } 1155 return queueToUse; 1156 } 1157 1158 1165 private void releaseLocksForAutoCommitRequest(List locks, long transactionId) 1166 { 1167 if (locks == null) 1168 return; for (Iterator iter = locks.iterator(); iter.hasNext();) 1170 { 1171 TransactionLogicalLock lock = (TransactionLogicalLock) iter.next(); 1172 if (lock == null) 1173 logger.warn("Unexpected null lock for transaction " + transactionId 1174 + " when releasing " + locks.toArray()); 1175 else 1176 lock.release(transactionId); 1177 } 1178 } 1179 1180 1185 private void releaseLocksForTransaction(long transactionId) 1186 { 1187 try 1188 { 1189 TransactionMetaData tm = requestManager.getTransactionMetaData(new Long ( 1190 transactionId)); 1191 releaseLocksForAutoCommitRequest(tm.removeBackendLocks(backend), 1192 transactionId); 1193 } 1194 catch (SQLException e) 1195 { 1196 1201 if (!backend.isReplaying()) 1202 if (logger.isWarnEnabled()) 1203 logger.warn("No transaction medatada found for transaction " 1204 + transactionId + " releasing locks manually"); 1205 if (backend.getDatabaseSchema() != null) 1206 backend.getDatabaseSchema().releaseLocksOnAllTables(transactionId); 1207 else 1208 { 1209 1212 if (logger.isWarnEnabled()) 1213 logger 1214 .warn("Cannot release locks, as no schema is available on this backend. " 1215 + "This backend is problably not available anymore."); 1216 } 1217 } 1218 } 1219 1220 private TransactionMetaData getTransactionMetaData(AbstractRequest request) 1221 { 1222 TransactionMetaData tm = null; 1223 if ((request != null) && !request.isAutoCommit()) 1224 { try 1226 { 1227 tm = requestManager.getTransactionMetaData(new Long (request 1228 .getTransactionId())); 1229 } 1230 catch (SQLException e) 1231 { 1232 if (logger.isDebugEnabled()) 1234 logger.debug("No transaction medatada found for transaction " 1235 + request.getTransactionId()); 1236 } 1237 } 1238 return tm; 1239 } 1240 1241 1247 public final void completedEntryExecution(BackendTaskQueueEntry entry) 1248 { 1249 completedEntryExecution(entry, null); 1250 } 1251 1252 1258 public void completeStoredProcedureExecution(AbstractTask task) 1259 { 1260 AbstractRequest request = task.getRequest(); 1261 long transactionId = request.getTransactionId(); 1262 synchronized (atomicPostSyncObject) 1263 { 1264 if (request.isAutoCommit()) 1265 { 1266 releaseLocksForAutoCommitRequest(task.getLocks(backend), transactionId); 1267 checkForPriorityInversion(); 1268 SortedSet writeLockedTables = request.getWriteLockedDatabaseTables(); 1269 if ((writeLockedTables != null) && (writeLockedTables.size() > 1)) 1270 writesWithMultipleLocks--; 1271 } 1272 storedProcedureInQueue--; 1273 } 1274 } 1275 1276 1282 public void completeWriteRequestExecution(AbstractTask task) 1283 { 1284 AbstractRequest request = task.getRequest(); 1285 SortedSet writeLockedTables = request.getWriteLockedDatabaseTables(); 1286 if ((writeLockedTables != null) && (writeLockedTables.size() > 1)) 1287 synchronized (atomicPostSyncObject) 1288 { 1289 writesWithMultipleLocks--; 1290 } 1291 1292 long transactionId = request.getTransactionId(); 1293 if (request.isAutoCommit()) 1294 { 1295 synchronized (atomicPostSyncObject) 1296 { 1297 releaseLocksForAutoCommitRequest(task.getLocks(backend), transactionId); 1298 if (writesWithMultipleLocks > 0 1301 || waitForCompletionPolicy.isEnforceTableLocking()) 1302 checkForPriorityInversion(); 1303 else if (storedProcedureInQueue > 0) 1304 checkForPriorityInversion(); 1305 } 1306 } 1307 } 1308 1309 1315 public void releaseLocksAndCheckForPriorityInversion(TransactionMetaData tm) 1316 { 1317 synchronized (atomicPostSyncObject) 1318 { 1319 releaseLocksForTransaction(tm.getTransactionId()); 1320 checkForPriorityInversion(); 1321 } 1322 } 1323 1324 1333 private void completedEntryExecution(BackendTaskQueueEntry entry, 1334 Iterator iter) 1335 { 1336 if (entry == null) 1337 return; 1338 1339 AbstractTask task = entry.getTask(); 1341 if (!backend.removePendingTask(task)) 1342 logger.warn("Unable to remove task " + task 1343 + " from pending request queue"); 1344 1345 synchronized (this) 1346 { 1347 LinkedList queue = entry.getQueue(); 1349 synchronized (queue) 1350 { 1351 if (iter != null) 1352 iter.remove(); 1353 else 1354 { 1355 if (!queue.remove(entry)) 1356 logger.error("Failed to remove task " + task + " from " + queue); 1357 } 1358 } 1359 1360 this.notifyAll(); 1363 } 1364 } 1365 1366 1372 public final BackendTaskQueueEntry getFirstConflictingRequestQueueOrStoredProcedureQueueEntry() 1373 { 1374 synchronized (conflictingRequestsQueue) 1375 { 1376 if (conflictingRequestsQueue.isEmpty()) 1377 { 1378 synchronized (storedProcedureQueue) 1379 { 1380 if (storedProcedureQueue.isEmpty()) 1381 return null; 1382 return (BackendTaskQueueEntry) storedProcedureQueue.getFirst(); 1383 } 1384 } 1385 return (BackendTaskQueueEntry) conflictingRequestsQueue.getFirst(); 1386 } 1387 } 1388 1389 1396 public List getStoredProcedureQueue() 1397 { 1398 return storedProcedureQueue; 1399 } 1400 1401 1412 public final BackendTaskQueueEntry getNextEntryToExecute( 1413 BackendWorkerThread thread) 1414 { 1415 BackendTaskQueueEntry entry = null; 1416 1417 1425 1426 while (true) 1427 { 1428 Object firstNonConflictingTask = null; 1429 Object lastNonConflictingTask = null; 1430 synchronized (nonConflictingRequestsQueue) 1432 { 1433 if (!nonConflictingRequestsQueue.isEmpty()) 1434 { 1435 firstNonConflictingTask = nonConflictingRequestsQueue.getFirst(); 1436 lastNonConflictingTask = nonConflictingRequestsQueue.getLast(); 1437 for (Iterator iter = nonConflictingRequestsQueue.iterator(); iter 1438 .hasNext();) 1439 { 1440 entry = (BackendTaskQueueEntry) iter.next(); 1441 if (entry.getProcessingThread() == null) 1442 { entry.setProcessingThread(thread); 1444 return entry; 1445 } 1446 } 1447 } 1448 } 1449 1450 Object firstConflictingTask = null; 1453 Object lastConflictingTask = null; 1454 synchronized (conflictingRequestsQueue) 1455 { 1456 if (!conflictingRequestsQueue.isEmpty()) 1457 { 1458 firstConflictingTask = conflictingRequestsQueue.getFirst(); 1459 lastConflictingTask = conflictingRequestsQueue.getLast(); 1460 entry = (BackendTaskQueueEntry) conflictingRequestsQueue.getFirst(); 1463 if (entry.getProcessingThread() == null) 1464 { AbstractRequest request = entry.getTask().getRequest(); 1466 SortedSet lockedTables = request.getWriteLockedDatabaseTables(); 1467 if ((lockedTables != null) && (lockedTables.size() > 0)) 1468 { 1469 1478 boolean conflictingQueryDetected = false; 1479 synchronized (nonConflictingRequestsQueue) 1480 { 1481 if (!nonConflictingRequestsQueue.isEmpty() 1482 || waitForCompletionPolicy.isEnforceTableLocking()) 1483 { int locksNotOwnedByMe = 0; 1485 long transactionId = entry.getTask().getTransactionId(); 1486 DatabaseSchema schema = backend.getDatabaseSchema(); 1487 for (Iterator iterator = lockedTables.iterator(); iterator 1488 .hasNext() 1489 && !conflictingQueryDetected;) 1490 { 1491 String tableName = (String ) iterator.next(); 1492 DatabaseTable table = schema.getTable(tableName, false); 1493 if (table == null) 1494 { logger 1496 .warn("Unable to find table " 1497 + tableName 1498 + " in database schema, when getting next entry to execute : " 1499 + request 1500 .toStringShortForm(requestManager 1501 .getVirtualDatabase() 1502 .getSqlShortFormLength())); 1503 1504 conflictingQueryDetected = true; 1507 } 1508 else 1509 { 1510 TransactionLogicalLock lock = table.getLock(); 1511 if (lock.isLocked()) 1512 { 1513 if (lock.getLocker() != transactionId) 1514 locksNotOwnedByMe++; 1515 1516 1520 for (Iterator iter = nonConflictingRequestsQueue 1521 .iterator(); iter.hasNext();) 1522 { 1523 BackendTaskQueueEntry nonConflictingEntry = (BackendTaskQueueEntry) iter 1524 .next(); 1525 long nonConflictingRequestTransactionId = nonConflictingEntry 1526 .getTask().getTransactionId(); 1527 if ((lock.getLocker() == nonConflictingRequestTransactionId) 1528 || lock 1529 .isWaiting(nonConflictingRequestTransactionId)) 1530 { 1531 conflictingQueryDetected = true; 1532 break; 1533 } 1534 } 1535 } 1536 } 1537 } 1538 1539 1543 if (waitForCompletionPolicy.isEnforceTableLocking()) 1544 conflictingQueryDetected = locksNotOwnedByMe > 0; 1545 1546 1552 conflictingQueryDetected = conflictingQueryDetected 1553 || ((locksNotOwnedByMe > 1) && (locksNotOwnedByMe == lockedTables 1554 .size())); 1555 } 1556 } 1557 1558 if (!conflictingQueryDetected) 1561 { 1562 entry.setProcessingThread(thread); 1563 return entry; 1564 } 1565 } 1566 else 1567 { 1568 if (logger.isWarnEnabled()) 1569 logger.warn("Detected non-locking task " + entry.getTask() 1570 + " in conflicting queue"); 1571 1572 1578 synchronized (nonConflictingRequestsQueue) 1579 { 1580 if (nonConflictingRequestsQueue.isEmpty()) 1581 { 1582 entry.setProcessingThread(thread); 1583 return entry; 1584 } 1585 } 1586 } 1587 } 1588 } 1589 } 1590 1591 synchronized (this) 1592 { 1593 if (fetchNextQueryFromBackendTotalOrderQueue()) 1596 continue; 1597 1598 synchronized (nonConflictingRequestsQueue) 1602 { 1603 if (!nonConflictingRequestsQueue.isEmpty()) 1604 { 1605 if (firstNonConflictingTask != nonConflictingRequestsQueue 1606 .getFirst()) 1607 continue; 1608 if (lastNonConflictingTask != nonConflictingRequestsQueue.getLast()) 1609 continue; 1610 } 1611 else if (firstNonConflictingTask != null) 1612 continue; } 1614 synchronized (conflictingRequestsQueue) 1615 { 1616 if (!conflictingRequestsQueue.isEmpty()) 1617 { 1618 if (firstConflictingTask != conflictingRequestsQueue.getFirst()) 1619 continue; 1620 if (lastConflictingTask != conflictingRequestsQueue.getLast()) 1621 continue; 1622 } 1623 else if (firstConflictingTask != null) 1624 continue; } 1626 1627 try 1629 { 1630 this.wait(); 1631 } 1632 catch (InterruptedException ignore) 1633 { 1634 } 1635 } 1636 1637 } 1638 } 1639 1640 1651 public BackendTaskQueueEntry getNextCommitRollbackToExecute( 1652 BackendWorkerThread thread) 1653 { 1654 boolean found = false; 1655 BackendTaskQueueEntry entry = null; 1656 while (!found) 1657 { 1658 Object firstNonConflictingTask = null; 1659 Object lastNonConflictingTask = null; 1660 synchronized (nonConflictingRequestsQueue) 1662 { 1663 if (!nonConflictingRequestsQueue.isEmpty()) 1664 { 1665 firstNonConflictingTask = nonConflictingRequestsQueue.getFirst(); 1666 lastNonConflictingTask = nonConflictingRequestsQueue.getLast(); 1667 for (Iterator iter = nonConflictingRequestsQueue.iterator(); iter 1668 .hasNext();) 1669 { 1670 entry = (BackendTaskQueueEntry) iter.next(); 1671 if ((entry.isACommitOrRollback() || (entry.getTask() instanceof KillThreadTask)) 1672 && (entry.getProcessingThread() == null)) 1673 { entry.setProcessingThread(thread); 1675 return entry; 1676 } 1677 } 1678 } 1679 } 1680 1681 synchronized (this) 1682 { 1683 if (fetchNextQueryFromBackendTotalOrderQueue()) 1686 continue; 1687 1688 synchronized (nonConflictingRequestsQueue) 1691 { 1692 if (!nonConflictingRequestsQueue.isEmpty()) 1693 { 1694 if (firstNonConflictingTask != nonConflictingRequestsQueue 1695 .getFirst()) 1696 continue; 1697 if (lastNonConflictingTask != nonConflictingRequestsQueue.getLast()) 1698 continue; 1699 } 1700 } 1701 1702 try 1703 { 1704 this.wait(); 1705 } 1706 catch (InterruptedException ignore) 1707 { 1708 } 1709 } 1710 1711 } 1712 return null; 1714 } 1715 1716 1725 private boolean mustWaitForLateTask(AbstractTask currentTask) 1726 { 1727 if (currentTask.isPersistentConnection()) 1728 { 1729 long currentCid = currentTask.getPersistentConnectionId(); 1730 if (hasTaskForPersistentConnectionInQueue(nonConflictingRequestsQueue, 1733 currentCid) 1734 || hasTaskForPersistentConnectionInQueue(conflictingRequestsQueue, 1735 currentCid) 1736 || hasTaskForPersistentConnectionInQueue(storedProcedureQueue, 1737 currentCid)) 1738 return true; 1740 } 1741 1742 if (!currentTask.isAutoCommit()) 1743 { 1744 long currentTid = currentTask.getTransactionId(); 1745 if (hasTaskForTransactionInQueue(nonConflictingRequestsQueue, currentTid) 1748 || hasTaskForTransactionInQueue(conflictingRequestsQueue, currentTid) 1749 || hasTaskForTransactionInQueue(storedProcedureQueue, currentTid)) 1750 return true; 1752 } 1753 1754 return hasDDLTaskInQueue(nonConflictingRequestsQueue) 1755 || hasDDLTaskInQueue(conflictingRequestsQueue) 1756 || hasDDLTaskInQueue(storedProcedureQueue); 1757 } 1758 1759 private boolean hasDDLTaskInQueue(List queue) 1760 { 1761 boolean retry; 1762 do 1763 { 1764 retry = false; 1765 try 1766 { 1767 for (Iterator iter = queue.iterator(); iter.hasNext();) 1768 { 1769 BackendTaskQueueEntry otherEntry = (BackendTaskQueueEntry) iter 1770 .next(); 1771 AbstractTask otherTask = otherEntry.getTask(); 1772 AbstractRequest request = otherTask.getRequest(); 1773 1781 if ((request != null) 1782 && (request.isCreate() || request.isAlter() || request.isDrop())) 1783 { 1784 return true; 1785 } 1786 } 1787 } 1788 catch (ConcurrentModificationException e) 1789 { 1790 retry = true; 1791 } 1792 } 1793 while (retry); 1794 return false; 1795 } 1796 1797 private boolean hasTaskForPersistentConnectionInQueue(List queue, long cid) 1798 { 1799 boolean retry; 1800 do 1801 { 1802 retry = false; 1803 try 1804 { 1805 for (Iterator iter = queue.iterator(); iter.hasNext();) 1806 { 1807 BackendTaskQueueEntry otherEntry = (BackendTaskQueueEntry) iter 1808 .next(); 1809 1810 AbstractTask otherTask = otherEntry.getTask(); 1811 1812 if (otherTask.isPersistentConnection() 1814 && (otherTask.getPersistentConnectionId() == cid)) 1815 { 1816 return true; 1817 } 1818 } 1819 } 1820 catch (ConcurrentModificationException e) 1821 { 1822 retry = true; 1823 } 1824 } 1825 while (retry); 1826 return false; 1827 } 1828 1829 private boolean hasTaskForTransactionInQueue(List queue, long tid) 1830 { 1831 boolean retry; 1832 do 1833 { 1834 retry = false; 1835 try 1836 { 1837 for (Iterator iter = queue.iterator(); iter.hasNext();) 1838 { 1839 BackendTaskQueueEntry otherEntry = (BackendTaskQueueEntry) iter 1840 .next(); 1841 1842 AbstractTask otherTask = otherEntry.getTask(); 1843 1844 if (!otherTask.isAutoCommit() 1846 && (otherTask.getTransactionId() == tid)) 1847 { 1848 return true; 1849 } 1850 } 1851 } 1852 catch (ConcurrentModificationException e) 1853 { 1854 retry = true; 1855 } 1856 } 1857 while (retry); 1858 return false; 1859 } 1860 1861 1868 public boolean allowTasksToBePosted() 1869 { 1870 synchronized (ALLOW_TASKS_SYNC) 1871 { 1872 return allowTasksToBePosted; 1873 } 1874 } 1875 1876 1883 public void setAllowTasksToBePosted(boolean allowTasksToBePosted) 1884 { 1885 synchronized (ALLOW_TASKS_SYNC) 1886 { 1887 this.allowTasksToBePosted = allowTasksToBePosted; 1888 } 1889 } 1890 1891 1897 public void startDeadlockDetectionThread(VirtualDatabase vdb) 1898 { 1899 if (deadlockDetectionThread != null) 1900 throw new RuntimeException ( 1901 "Trying to start multiple times a deadlock detection thread on the same backend " 1902 + backend.getName()); 1903 1904 deadlockDetectionThread = new DeadlockDetectionThread(backend, vdb, 1905 atomicPostSyncObject, waitForCompletionPolicy.getDeadlockTimeoutInMs()); 1906 deadlockDetectionThread.start(); 1907 } 1908 1909 1913 public void terminateDeadlockDetectionThread() 1914 { 1915 if (deadlockDetectionThread == null) 1916 throw new RuntimeException ( 1917 "No deadlock detection thread to stop on backend " 1918 + backend.getName()); 1919 1920 deadlockDetectionThread.kill(); 1921 deadlockDetectionThread = null; 1922 } 1923 1924 1933 protected synchronized String dump() 1934 { 1935 StringBuffer buff = new StringBuffer (); 1936 buff.append("Non Conflicting Requests Queue (" 1937 + nonConflictingRequestsQueue.size() + ")\n"); 1938 for (Iterator iter = nonConflictingRequestsQueue.iterator(); iter.hasNext();) 1939 { 1940 BackendTaskQueueEntry entry = (BackendTaskQueueEntry) iter.next(); 1941 buff.append("\t" + entry + "\n"); 1942 } 1943 buff.append("Conflicting Requests Queue (" 1944 + conflictingRequestsQueue.size() + ")\n"); 1945 for (Iterator iter = conflictingRequestsQueue.iterator(); iter.hasNext();) 1946 { 1947 BackendTaskQueueEntry entry = (BackendTaskQueueEntry) iter.next(); 1948 buff.append("\t" + entry + "\n"); 1949 } 1950 buff.append("Stored Procedures Queue (" + storedProcedureQueue.size() 1951 + ")\n"); 1952 for (Iterator iter = storedProcedureQueue.iterator(); iter.hasNext();) 1953 { 1954 BackendTaskQueueEntry entry = (BackendTaskQueueEntry) iter.next(); 1955 buff.append("\t" + entry + "\n"); 1956 } 1957 return buff.toString(); 1958 } 1959} 1960 | Popular Tags |