1 24 25 package org.continuent.sequoia.controller.scheduler; 26 27 import java.sql.SQLException ; 28 import java.util.ArrayList ; 29 import java.util.HashMap ; 30 import java.util.HashSet ; 31 import java.util.Hashtable ; 32 import java.util.LinkedList ; 33 import java.util.List ; 34 import java.util.Set ; 35 36 import org.continuent.sequoia.common.exceptions.RollbackException; 37 import org.continuent.sequoia.common.exceptions.VDBisShuttingDownException; 38 import org.continuent.sequoia.common.i18n.Translate; 39 import org.continuent.sequoia.common.log.Trace; 40 import org.continuent.sequoia.common.xml.DatabasesXmlTags; 41 import org.continuent.sequoia.common.xml.XmlComponent; 42 import org.continuent.sequoia.controller.requestmanager.TransactionMetaData; 43 import org.continuent.sequoia.controller.requestmanager.distributed.DistributedRequestManager; 44 import org.continuent.sequoia.controller.requests.AbstractRequest; 45 import org.continuent.sequoia.controller.requests.AbstractWriteRequest; 46 import org.continuent.sequoia.controller.requests.SelectRequest; 47 import org.continuent.sequoia.controller.requests.StoredProcedure; 48 import org.continuent.sequoia.controller.sql.schema.DatabaseSchema; 49 import org.continuent.sequoia.controller.virtualdatabase.VirtualDatabase; 50 import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedCommit; 51 import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedOpenPersistentConnection; 52 import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedReleaseSavepoint; 53 import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedRollback; 54 import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedRollbackToSavepoint; 55 import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedSetSavepoint; 56 57 70 public abstract class AbstractScheduler implements XmlComponent 71 { 72 73 85 protected int raidbLevel; 86 protected int parsingGranularity; 87 88 private long controllerId = 0; 90 private long tid; 91 private int sid; 92 private int suspendedTransactions = 0; 93 private int pendingTransactions = 0; 94 private final Object transactionSync = new Object (); 95 private final Object endOfCurrentTransactions = new Object (); 96 private List activeTransactions = new ArrayList (); 97 102 private HashMap activeReadRequests = new HashMap (); 103 107 private HashMap activeWriteRequests = new HashMap (); 108 109 114 protected Hashtable persistentConnections = new Hashtable (); 115 private final Object endOfCurrentPersistentConnections = new Object (); 116 private Object persistentConnectionsSync = new Object (); 117 private int suspendedNewPersistentConnections = 0; 118 private int suspendedOpenClosePersistentConnections = 0; 119 private int pendingOpenClosePersistentConnections = 0; 120 private Object suspendOpenClosePersistentConnectionSync = new Object (); 121 122 private int suspendedWrites = 0; 124 private int pendingWrites = 0; 125 private final Object writesSync = new Object (); 126 private final Object endOfCurrentWrites = new Object (); 127 128 private Set suspendedRequests = new HashSet (); 129 130 protected static Trace logger = Trace 131 .getLogger("org.continuent.sequoia.controller.scheduler"); 132 133 134 protected LinkedList totalOrderQueue; 135 136 private int numberRead = 0; 138 private int numberWrite = 0; 139 140 private VirtualDatabase vdb; 141 142 146 155 public AbstractScheduler(int raidbLevel, int parsingGranularity, 156 VirtualDatabase vdb) 157 { 158 this.raidbLevel = raidbLevel; 159 this.parsingGranularity = parsingGranularity; 160 this.tid = 0; 161 this.sid = 0; 162 this.pendingTransactions = 0; 163 this.pendingWrites = 0; 164 this.totalOrderQueue = vdb.getTotalOrderQueue(); 165 this.vdb = vdb; 166 } 167 168 177 public AbstractScheduler(int raidbLevel, int parsingGranularity) 178 { 179 this.raidbLevel = raidbLevel; 180 this.parsingGranularity = parsingGranularity; 181 this.tid = 0; 182 this.sid = 0; 183 this.pendingTransactions = 0; 184 this.pendingWrites = 0; 185 this.totalOrderQueue = null; 186 this.vdb = null; 187 } 188 189 193 198 public final HashMap getActiveReadRequests() 199 { 200 return activeReadRequests; 201 } 202 203 208 public final List getActiveTransactions() 209 { 210 return activeTransactions; 211 } 212 213 220 public final HashMap getActiveWriteRequests() 221 { 222 return activeWriteRequests; 223 } 224 225 230 public synchronized int incrementSavepointId() 231 { 232 sid++; 233 return sid; 234 } 235 236 242 public final void initializeTransactionId(long transactionId) 243 { 244 synchronized (transactionSync) 245 { 246 this.tid = transactionId; 247 } 248 } 249 250 256 public boolean isActiveRequest(long requestId) 257 { 258 Long lId = new Long (requestId); 259 synchronized (activeReadRequests) 260 { 261 if (activeReadRequests.containsKey(lId)) 262 return true; 263 } 264 synchronized (activeWriteRequests) 265 { 266 return activeWriteRequests.containsKey(lId); 267 } 268 } 269 270 275 public long getNextTransactionId() 276 { 277 synchronized (transactionSync) 278 { 279 return tid++; 280 } 281 } 282 283 288 public final int getParsingGranularity() 289 { 290 return parsingGranularity; 291 } 292 293 298 public final void setParsingGranularity(int parsingGranularity) 299 { 300 this.parsingGranularity = parsingGranularity; 301 } 302 303 309 public void setControllerId(long controllerId) 310 { 311 this.controllerId = controllerId; 312 } 313 314 319 public final int getPendingWrites() 320 { 321 return pendingWrites; 322 } 323 324 329 public final int getRAIDbLevel() 330 { 331 return raidbLevel; 332 } 333 334 339 public final void setRAIDbLevel(int raidbLevel) 340 { 341 this.raidbLevel = raidbLevel; 342 } 343 344 350 public void mergeDatabaseSchema(DatabaseSchema dbs) 351 { 352 logger.info(Translate.get("scheduler.doesnt.support.schemas")); 353 } 354 355 363 public void setDatabaseSchema(DatabaseSchema dbs) 364 { 365 if (logger.isInfoEnabled()) 366 logger.info(Translate.get("scheduler.doesnt.support.schemas")); 367 } 368 369 375 public void waitForRequestCompletion(long requestId) 376 { 377 Long lId = new Long (requestId); 378 synchronized (activeReadRequests) 379 { 380 while (activeReadRequests.containsKey(lId)) 381 { 382 try 383 { 384 activeReadRequests.wait(); 385 } 386 catch (InterruptedException ignore) 387 { 388 } 389 } 390 } 391 synchronized (activeWriteRequests) 392 { 393 while (activeWriteRequests.containsKey(lId)) 394 { 395 try 396 { 397 activeWriteRequests.wait(); 398 } 399 catch (InterruptedException ignore) 400 { 401 } 402 } 403 } 404 } 405 406 413 public void scheduleReadRequest(SelectRequest request) throws SQLException 414 { 415 Long id = new Long (request.getId()); 416 synchronized (activeReadRequests) 417 { 418 if (activeReadRequests.containsKey(id)) 419 throw new SQLException ("A query with id " + id 420 + " has already been scheduled"); 421 activeReadRequests.put(id, request); 422 } 423 424 if (request.isAutoCommit() && request.isMustBroadcast()) 426 { 427 long fakeTid = getNextTransactionId(); 428 fakeTid = fakeTid & DistributedRequestManager.TRANSACTION_ID_BIT_MASK; 429 fakeTid = fakeTid | controllerId; 430 request.setTransactionId(fakeTid); 431 } 432 433 try 434 { 435 scheduleNonSuspendedReadRequest(request); 436 } 437 catch (SQLException e) 438 { 439 synchronized (activeReadRequests) 441 { 442 activeReadRequests.remove(id); 443 } 444 throw e; 445 } 446 } 447 448 455 protected abstract void scheduleNonSuspendedReadRequest(SelectRequest request) 456 throws SQLException ; 457 458 465 public final void readCompleted(SelectRequest request) throws SQLException 466 { 467 Long id = new Long (request.getId()); 468 synchronized (activeReadRequests) 469 { 470 if (activeReadRequests.remove(id) == null) 471 throw new SQLException ("Query " + id 472 + " is not in the list of currently scheduled queries"); 473 activeReadRequests.notifyAll(); 474 } 475 numberRead++; 476 this.readCompletedNotify(request); 477 } 478 479 484 protected abstract void readCompletedNotify(SelectRequest request); 485 486 499 public final void scheduleWriteRequest(AbstractWriteRequest request) 500 throws SQLException , RollbackException 501 { 502 suspendWriteIfNeededAndAddQueryToActiveRequests(request); 503 scheduleNonSuspendedWriteRequest(request); 504 505 if (request.isAutoCommit()) 507 { 508 long fakeTid = getNextTransactionId(); 509 fakeTid = fakeTid & DistributedRequestManager.TRANSACTION_ID_BIT_MASK; 510 fakeTid = fakeTid | controllerId; 511 request.setTransactionId(fakeTid); 512 } 513 } 514 515 524 protected abstract void scheduleNonSuspendedWriteRequest( 525 AbstractWriteRequest request) throws SQLException , RollbackException; 526 527 540 public final void writeCompleted(AbstractWriteRequest request) 541 throws SQLException 542 { 543 Long id = new Long (request.getId()); 544 545 synchronized (writesSync) 546 { 547 synchronized (activeWriteRequests) 548 { 549 if (activeWriteRequests.remove(id) == null) 550 throw new SQLException ("Query " + id 551 + " is not in the list of currently scheduled queries"); 552 553 activeWriteRequests.notifyAll(); 554 } 555 pendingWrites--; 556 557 if (pendingWrites < 0) 558 { 559 logger 560 .error("Negative pending writes detected on write request completion (" 561 + request + ")"); 562 pendingWrites = 0; 563 } 564 565 if (logger.isDebugEnabled()) 566 logger.debug("Write completed, remaining pending writes: " 567 + pendingWrites); 568 569 notifyWriteCompleted(request); 570 571 checkPendingWrites(); 572 } 573 numberWrite++; 574 } 575 576 583 protected abstract void notifyWriteCompleted(AbstractWriteRequest request); 584 585 597 public final void scheduleStoredProcedure(StoredProcedure proc) 598 throws SQLException , RollbackException 599 { 600 suspendWriteIfNeededAndAddQueryToActiveRequests(proc); 601 scheduleNonSuspendedStoredProcedure(proc); 602 603 if (proc.isAutoCommit()) 605 { 606 long fakeTid = getNextTransactionId(); 607 fakeTid = fakeTid & DistributedRequestManager.TRANSACTION_ID_BIT_MASK; 608 fakeTid = fakeTid | controllerId; 609 proc.setTransactionId(fakeTid); 610 } 611 } 612 613 621 protected abstract void scheduleNonSuspendedStoredProcedure( 622 StoredProcedure proc) throws SQLException , RollbackException; 623 624 638 public final void storedProcedureCompleted(StoredProcedure proc) 639 throws SQLException 640 { 641 Long id = new Long (proc.getId()); 642 643 synchronized (writesSync) 644 { 645 synchronized (activeWriteRequests) 646 { 647 if (activeWriteRequests.remove(id) == null) 648 throw new SQLException ("Query " + id 649 + " is not in the list of currently scheduled queries"); 650 651 activeWriteRequests.notifyAll(); 652 } 653 654 pendingWrites--; 655 656 if (pendingWrites < 0) 657 { 658 logger 659 .error("Negative pending writes detected on stored procedure completion (" 660 + proc + ")"); 661 pendingWrites = 0; 662 } 663 664 if (logger.isDebugEnabled()) 665 logger.debug("Stored procedure completed, remaining pending writes: " 666 + pendingWrites); 667 668 notifyStoredProcedureCompleted(proc); 669 670 checkPendingWrites(); 671 } 672 numberWrite++; 673 } 674 675 682 protected abstract void notifyStoredProcedureCompleted(StoredProcedure proc); 683 684 693 private void suspendWriteIfNeededAndAddQueryToActiveRequests( 694 AbstractRequest request) throws SQLException 695 { 696 Long id = new Long (request.getId()); 697 698 synchronized (writesSync) 699 { 700 if (suspendedWrites > 0) 701 { 702 boolean mustBeSuspended = !request.isPersistentConnection() 705 && (request.isAutoCommit() || !activeTransactions 706 .contains(new TransactionMetaData(request.getTransactionId(), 707 0, request.getLogin(), request.isPersistentConnection(), 708 request.getPersistentConnectionId()))); 709 710 if (mustBeSuspended) 711 { 712 addSuspendedRequest(request); 713 try 714 { 715 int timeout = request.getTimeout(); 717 if (timeout > 0) 718 { 719 long start = System.currentTimeMillis(); 720 long lTimeout = timeout * 1000L; 721 writesSync.wait(lTimeout); 722 long end = System.currentTimeMillis(); 723 int remaining = (int) (lTimeout - (end - start)); 724 if (remaining > 0) 725 request.setTimeout(remaining); 726 else 727 { 728 String msg = Translate.get("scheduler.request.timeout", 729 new String []{String.valueOf(request.getId()), 730 String.valueOf(request.getTimeout()), 731 String.valueOf(pendingWrites)}); 732 logger.warn(msg); 733 throw new SQLException (msg); 734 } 735 } 736 else 737 this.writesSync.wait(); 738 } 739 catch (InterruptedException e) 740 { 741 String msg = Translate.get("scheduler.request.timeout.failed", e); 742 logger.warn(msg); 743 throw new SQLException (msg); 744 } 745 } 746 } 747 748 synchronized (activeWriteRequests) 749 { 750 if (activeWriteRequests.containsKey(id)) 751 throw new SQLException ("A query with id " + id 752 + " has already been scheduled"); 753 754 activeWriteRequests.put(id, request); 755 } 756 pendingWrites++; 757 758 if (logger.isDebugEnabled()) 759 logger.debug("Schedule " + request.getUniqueKey() 760 + " - Current pending writes: " + pendingWrites); 761 } 762 } 763 764 769 public void scheduleOpenPersistentConnection( 770 DistributedOpenPersistentConnection dmsg) 771 { 772 checkForSuspendedOpenClosePersistentConnectionsAndIncreasePendingCount(); 773 774 synchronized (persistentConnectionsSync) 778 { 779 if (suspendedNewPersistentConnections > 0) 780 { 781 addSuspendedRequest(dmsg); 782 try 783 { 784 persistentConnectionsSync.wait(); 785 } 786 catch (InterruptedException e) 787 { 788 e.printStackTrace(); 789 } 790 } 791 persistentConnections.put(new Long (dmsg.getPersistentConnectionId()), 792 dmsg.getLogin()); 793 } 794 } 795 796 804 public void scheduleOpenPersistentConnection(long persistentConnectionId, 805 String login) 806 { 807 scheduleOpenPersistentConnection(new DistributedOpenPersistentConnection( 810 login, persistentConnectionId)); 811 } 812 813 816 public void scheduleClosePersistentConnection() 817 { 818 checkForSuspendedOpenClosePersistentConnectionsAndIncreasePendingCount(); 819 } 820 821 private void checkForSuspendedOpenClosePersistentConnectionsAndIncreasePendingCount() 822 { 823 synchronized (suspendOpenClosePersistentConnectionSync) 824 { 825 while (suspendedOpenClosePersistentConnections > 0) 826 { 827 try 828 { 829 suspendOpenClosePersistentConnectionSync.wait(); 830 } 831 catch (InterruptedException e) 832 { 833 } 834 } 835 pendingOpenClosePersistentConnections++; 836 } 837 } 838 839 private void decrementOpenClosePersistentConnectionCount() 840 { 841 synchronized (suspendOpenClosePersistentConnectionSync) 842 { 843 pendingOpenClosePersistentConnections--; 844 if (pendingOpenClosePersistentConnections < 0) 845 { 846 logger 847 .error("Negative count of pending open/close persistent connections"); 848 pendingOpenClosePersistentConnections = 0; 849 } 850 if (suspendedOpenClosePersistentConnections == 0) 851 suspendOpenClosePersistentConnectionSync.notifyAll(); 852 } 853 } 854 855 863 public void openPersistentConnectionCompleted(long persistentConnectionId, 864 boolean success) 865 { 866 decrementOpenClosePersistentConnectionCount(); 867 if (!success) 868 synchronized (endOfCurrentPersistentConnections) 869 { 870 persistentConnections.remove(new Long (persistentConnectionId)); 871 endOfCurrentPersistentConnections.notifyAll(); 872 } 873 } 874 875 880 public void closePersistentConnectionCompleted(long persistentConnectionId) 881 { 882 decrementOpenClosePersistentConnectionCount(); 883 synchronized (endOfCurrentPersistentConnections) 884 { 885 persistentConnections.remove(new Long (persistentConnectionId)); 886 endOfCurrentPersistentConnections.notifyAll(); 887 } 888 } 889 890 896 public String getPersistentConnectionLogin(Long persistentConnectionId) 897 { 898 return (String ) persistentConnections.get(persistentConnectionId); 899 } 900 901 904 public boolean hasPersistentConnection(long persistentConnectionId) 905 { 906 return persistentConnections.contains(new Long (persistentConnectionId)); 907 } 908 909 915 public Hashtable getOpenPersistentConnections() 916 { 917 return persistentConnections; 918 } 919 920 924 938 public final void begin(TransactionMetaData tm, boolean isLazyStart, 939 AbstractRequest request) throws SQLException 940 { 941 boolean retry; 943 do 944 { 945 retry = false; 946 synchronized (transactionSync) 947 { 948 if ((suspendedTransactions > 0) && !isLazyStart 949 && !tm.isPersistentConnection()) 950 { 951 addSuspendedRequest(request); 952 try 953 { 954 long timeout = tm.getTimeout(); 956 if (timeout > 0) 957 { 958 long start = System.currentTimeMillis(); 959 transactionSync.wait(timeout); 960 long end = System.currentTimeMillis(); 961 long remaining = timeout - (end - start); 962 if (remaining > 0) 963 tm.setTimeout(remaining); 964 else 965 { 966 String msg = Translate.get( 967 "scheduler.begin.timeout.transactionSync", 968 pendingTransactions); 969 logger.warn(msg); 970 throw new SQLException (msg); 971 } 972 } 973 else 974 transactionSync.wait(); 975 } 976 catch (InterruptedException e) 977 { 978 String msg = Translate.get( 979 "scheduler.begin.timeout.transactionSync", pendingTransactions) 980 + " (" + e + ")"; 981 logger.error(msg); 982 throw new SQLException (msg); 983 } 984 } 985 if (vdb != null && vdb.isRejectingNewTransaction()) 986 throw new VDBisShuttingDownException( 987 "VDB is shutting down... can't start a new transaction"); 988 989 pendingTransactions++; 990 991 if (logger.isDebugEnabled()) 992 logger.debug("Begin scheduled - current pending transactions: " 993 + pendingTransactions); 994 } 995 996 synchronized (writesSync) 998 { 999 1003 synchronized (transactionSync) 1004 { 1005 if ((suspendedTransactions > 0) && !isLazyStart 1006 && !tm.isPersistentConnection()) 1007 { 1008 retry = true; 1009 pendingTransactions--; 1010 checkPendingTransactions(); 1011 continue; 1012 } 1013 } 1014 if ((suspendedWrites > 0) && !isLazyStart 1015 && !tm.isPersistentConnection()) 1016 { 1017 addSuspendedRequest(request); 1018 try 1019 { 1020 long timeout = tm.getTimeout(); 1022 if (timeout > 0) 1023 { 1024 long start = System.currentTimeMillis(); 1025 writesSync.wait(timeout); 1026 long end = System.currentTimeMillis(); 1027 long remaining = timeout - (end - start); 1028 if (remaining > 0) 1029 tm.setTimeout(remaining); 1030 else 1031 { 1032 String msg = Translate.get( 1033 "scheduler.begin.timeout.writesSync", pendingWrites); 1034 logger.warn(msg); 1035 synchronized (transactionSync) 1036 { 1037 pendingTransactions--; 1038 } 1039 checkPendingTransactions(); 1040 throw new SQLException (msg); 1041 } 1042 } 1043 else 1044 writesSync.wait(); 1045 } 1046 catch (InterruptedException e) 1047 { 1048 String msg = Translate.get("scheduler.begin.timeout.writesSync", 1049 pendingWrites) 1050 + " (" + e + ")"; 1051 logger.error(msg); 1052 synchronized (transactionSync) 1053 { 1054 pendingTransactions--; 1055 } 1056 checkPendingTransactions(); 1057 throw new SQLException (msg); 1058 } 1059 } 1060 pendingWrites++; 1061 1062 if (logger.isDebugEnabled()) 1063 logger.debug("Begin scheduled - current pending writes: " 1064 + pendingWrites); 1065 1066 if (activeTransactions.contains(tm)) 1069 { 1070 logger.error("Trying to start twice transaction " 1071 + tm.getTransactionId()); 1072 } 1073 else 1074 activeTransactions.add(tm); 1075 } 1076 } 1077 while (retry); 1078 } 1079 1080 1085 public final void beginCompleted(long transactionId) 1086 { 1087 synchronized (writesSync) 1089 { 1090 pendingWrites--; 1091 if (pendingWrites < 0) 1092 { 1093 logger 1094 .error("Negative pending writes detected on begin completion for transaction " 1095 + transactionId); 1096 pendingWrites = 0; 1097 } 1098 1099 if (logger.isDebugEnabled()) 1100 logger.debug("Begin completed, remaining pending writes: " 1101 + pendingWrites); 1102 1103 checkPendingWrites(); 1104 } 1105 } 1106 1107 1119 public final void commit(TransactionMetaData tm, boolean emptyTransaction, 1120 DistributedCommit dmsg) throws SQLException 1121 { 1122 synchronized (writesSync) 1124 { 1125 if (!activeTransactions.contains(tm)) 1126 throw new SQLException ("Transaction " + tm.getTransactionId() 1127 + " is not active, rejecting the commit."); 1128 1129 if (false) { 1132 addSuspendedRequest(dmsg); 1133 try 1134 { 1135 long timeout = tm.getTimeout(); 1137 if (timeout > 0) 1138 { 1139 long start = System.currentTimeMillis(); 1140 writesSync.wait(timeout); 1141 long end = System.currentTimeMillis(); 1142 long remaining = timeout - (end - start); 1143 if (remaining > 0) 1144 tm.setTimeout(remaining); 1145 else 1146 { 1147 String msg = Translate.get("scheduler.commit.timeout.writesSync", 1148 pendingWrites); 1149 logger.warn(msg); 1150 throw new SQLException (msg); 1151 } 1152 } 1153 else 1154 writesSync.wait(); 1155 } 1156 catch (InterruptedException e) 1157 { 1158 String msg = Translate.get("scheduler.commit.timeout.writesSync", 1159 pendingWrites) 1160 + " (" + e + ")"; 1161 logger.error(msg); 1162 throw new SQLException (msg); 1163 } 1164 } 1165 pendingWrites++; 1166 1167 if (logger.isDebugEnabled()) 1168 logger.debug("Commit scheduled - current pending writes: " 1169 + pendingWrites); 1170 } 1171 if (!emptyTransaction) 1172 commitTransaction(tm.getTransactionId()); 1173 } 1174 1175 1180 protected abstract void commitTransaction(long transactionId); 1181 1182 1188 public final void commitCompleted(TransactionMetaData tm, boolean isSuccess) 1189 { 1190 boolean transactionIsActive = false; 1191 synchronized (writesSync) 1192 { 1193 if (isSuccess) 1194 { 1195 transactionIsActive = activeTransactions.remove(tm); 1196 } 1197 } 1198 if (transactionIsActive) 1199 { 1200 synchronized (transactionSync) 1202 { 1203 pendingTransactions--; 1204 if (pendingTransactions < 0) 1205 { 1206 logger 1207 .error("Negative pending transactions detected on commit completion for transaction " 1208 + tm.getTransactionId()); 1209 pendingTransactions = 0; 1210 } 1211 1212 if (logger.isDebugEnabled()) 1213 logger.debug("Commit completed, remaining pending transactions: " 1214 + pendingTransactions); 1215 1216 checkPendingTransactions(); 1217 } 1218 } 1219 else if ((isSuccess) && (logger.isDebugEnabled())) 1220 logger.debug("Transaction " + tm.getTransactionId() 1221 + " has already completed."); 1222 1223 synchronized (writesSync) 1225 { 1226 pendingWrites--; 1227 if (pendingWrites < 0) 1228 { 1229 logger 1230 .error("Negative pending writes detected on commit completion for transaction" 1231 + tm.getTransactionId()); 1232 pendingWrites = 0; 1233 } 1234 1235 if (logger.isDebugEnabled()) 1236 logger.debug("Commit completed, remaining pending writes: " 1237 + pendingWrites); 1238 1239 checkPendingWrites(); 1240 } 1241 } 1242 1243 1253 public final void rollback(TransactionMetaData tm, DistributedRollback dmsg) 1254 throws SQLException 1255 { 1256 synchronized (writesSync) 1258 { 1259 if (!activeTransactions.contains(tm)) 1260 throw new SQLException ("Transaction " + tm.getTransactionId() 1261 + " is not active, rejecting the rollback."); 1262 1263 if (false) { 1266 addSuspendedRequest(dmsg); 1267 try 1268 { 1269 long timeout = tm.getTimeout(); 1271 if (timeout > 0) 1272 { 1273 long start = System.currentTimeMillis(); 1274 writesSync.wait(timeout); 1275 long end = System.currentTimeMillis(); 1276 long remaining = timeout - (end - start); 1277 if (remaining > 0) 1278 tm.setTimeout(remaining); 1279 else 1280 { 1281 String msg = Translate.get( 1282 "scheduler.rollback.timeout.writesSync", pendingWrites); 1283 logger.warn(msg); 1284 throw new SQLException (msg); 1285 } 1286 } 1287 else 1288 writesSync.wait(); 1289 } 1290 catch (InterruptedException e) 1291 { 1292 String msg = Translate.get("scheduler.rollback.timeout.writesSync", 1293 pendingWrites) 1294 + " (" + e + ")"; 1295 logger.error(msg); 1296 throw new SQLException (msg); 1297 } 1298 } 1299 pendingWrites++; 1300 1301 if (logger.isDebugEnabled()) 1302 logger.debug("Rollback scheduled - current pending writes: " 1303 + pendingWrites); 1304 } 1305 rollbackTransaction(tm.getTransactionId()); 1306 } 1307 1308 1318 public final void rollback(TransactionMetaData tm, String savepointName, 1319 DistributedRollbackToSavepoint dmsg) throws SQLException 1320 { 1321 synchronized (writesSync) 1323 { 1324 if (false) { 1327 addSuspendedRequest(dmsg); 1328 try 1329 { 1330 long timeout = tm.getTimeout(); 1332 if (timeout > 0) 1333 { 1334 long start = System.currentTimeMillis(); 1335 writesSync.wait(timeout); 1336 long end = System.currentTimeMillis(); 1337 long remaining = timeout - (end - start); 1338 if (remaining > 0) 1339 tm.setTimeout(remaining); 1340 else 1341 { 1342 String msg = Translate.get( 1343 "scheduler.rollbacksavepoint.timeout.writeSync", 1344 pendingWrites); 1345 logger.warn(msg); 1346 throw new SQLException (msg); 1347 } 1348 } 1349 else 1350 writesSync.wait(); 1351 } 1352 catch (InterruptedException e) 1353 { 1354 String msg = Translate.get( 1355 "scheduler.rollbacksavepoint.timeout.writeSync", pendingWrites) 1356 + " (" + e + ")"; 1357 logger.error(msg); 1358 throw new SQLException (msg); 1359 } 1360 } 1361 pendingWrites++; 1362 1363 if (logger.isDebugEnabled()) 1364 logger.debug("Rollback " + savepointName 1365 + " scheduled - current pending writes: " + pendingWrites); 1366 } 1367 1368 this.rollbackTransaction(tm.getTransactionId(), savepointName); 1369 } 1370 1371 1376 protected abstract void rollbackTransaction(long transactionId); 1377 1378 1384 protected abstract void rollbackTransaction(long transactionId, 1385 String savepointName); 1386 1387 1393 public final void rollbackCompleted(TransactionMetaData tm, boolean isSuccess) 1394 { 1395 boolean transactionIsActive = false; 1396 synchronized (writesSync) 1397 { 1398 if (isSuccess) 1399 { 1400 transactionIsActive = activeTransactions.remove(tm); 1401 } 1402 } 1403 if (transactionIsActive) 1404 { 1405 synchronized (transactionSync) 1407 { 1408 pendingTransactions--; 1409 if (pendingTransactions < 0) 1410 { 1411 logger 1412 .error("Negative pending transactions detected on rollback completion for transaction " 1413 + tm.getTransactionId()); 1414 pendingTransactions = 0; 1415 } 1416 1417 if (logger.isDebugEnabled()) 1418 logger.debug("Rollback completed, remaining pending transactions: " 1419 + pendingTransactions); 1420 1421 checkPendingTransactions(); 1422 } 1423 } 1424 else if ((isSuccess) && (logger.isDebugEnabled())) 1425 logger.debug("Transaction " + tm.getTransactionId() 1426 + " has already completed."); 1427 1428 synchronized (writesSync) 1430 { 1431 pendingWrites--; 1432 1433 if (pendingWrites < 0) 1434 { 1435 logger 1436 .error("Negative pending writes detected on rollback completion for transaction " 1437 + tm.getTransactionId()); 1438 pendingWrites = 0; 1439 } 1440 1441 if (logger.isDebugEnabled()) 1442 logger.debug("Rollback completed, remaining pending writes: " 1443 + pendingWrites); 1444 1445 checkPendingWrites(); 1446 } 1447 } 1448 1449 1458 public final int setSavepoint(TransactionMetaData tm) throws SQLException 1459 { 1460 synchronized (writesSync) 1462 { 1463 if (suspendedWrites > 0) 1464 { 1465 try 1466 { 1467 long timeout = tm.getTimeout(); 1469 if (timeout > 0) 1470 { 1471 long start = System.currentTimeMillis(); 1472 writesSync.wait(timeout); 1473 long end = System.currentTimeMillis(); 1474 long remaining = timeout - (end - start); 1475 if (remaining > 0) 1476 tm.setTimeout(remaining); 1477 else 1478 { 1479 String msg = Translate.get( 1480 "scheduler.setsavepoint.timeout.writeSync", pendingWrites); 1481 logger.warn(msg); 1482 throw new SQLException (msg); 1483 } 1484 } 1485 else 1486 writesSync.wait(); 1487 } 1488 catch (InterruptedException e) 1489 { 1490 String msg = Translate.get( 1491 "scheduler.setsavepoint.timeout.writeSync", pendingWrites) 1492 + " (" + e + ")"; 1493 logger.error(msg); 1494 throw new SQLException (msg); 1495 } 1496 } 1497 pendingWrites++; 1498 1499 if (logger.isDebugEnabled()) 1500 logger.debug("Set savepoint scheduled - current pending writes: " 1501 + pendingWrites); 1502 } 1503 1504 int savepointId = this.incrementSavepointId(); 1505 this.setSavepointTransaction(tm.getTransactionId(), String 1506 .valueOf(savepointId)); 1507 return savepointId; 1508 } 1509 1510 1520 public final void setSavepoint(TransactionMetaData tm, String name, 1521 DistributedSetSavepoint dmsg) throws SQLException 1522 { 1523 synchronized (writesSync) 1525 { 1526 if (suspendedWrites > 0) 1527 { 1528 addSuspendedRequest(dmsg); 1529 try 1530 { 1531 long timeout = tm.getTimeout(); 1533 if (timeout > 0) 1534 { 1535 long start = System.currentTimeMillis(); 1536 writesSync.wait(timeout); 1537 long end = System.currentTimeMillis(); 1538 long remaining = timeout - (end - start); 1539 if (remaining > 0) 1540 tm.setTimeout(remaining); 1541 else 1542 { 1543 String msg = Translate.get( 1544 "scheduler.setsavepoint.timeout.writeSync", pendingWrites); 1545 logger.warn(msg); 1546 throw new SQLException (msg); 1547 } 1548 } 1549 else 1550 writesSync.wait(); 1551 } 1552 catch (InterruptedException e) 1553 { 1554 String msg = Translate.get( 1555 "scheduler.setsavepoint.timeout.writeSync", pendingWrites) 1556 + " (" + e + ")"; 1557 logger.error(msg); 1558 throw new SQLException (msg); 1559 } 1560 } 1561 pendingWrites++; 1562 1563 if (logger.isDebugEnabled()) 1564 logger.debug("Set savepoint " + name 1565 + " scheduled - current pending writes: " + pendingWrites); 1566 } 1567 1568 this.setSavepointTransaction(tm.getTransactionId(), name); 1569 } 1570 1571 1577 protected abstract void setSavepointTransaction(long transactionId, 1578 String name); 1579 1580 1590 public final void releaseSavepoint(TransactionMetaData tm, String name, 1591 DistributedReleaseSavepoint dmsg) throws SQLException 1592 { 1593 synchronized (writesSync) 1595 { 1596 if (suspendedWrites > 0) 1597 { 1598 addSuspendedRequest(dmsg); 1599 try 1600 { 1601 long timeout = tm.getTimeout(); 1603 if (timeout > 0) 1604 { 1605 long start = System.currentTimeMillis(); 1606 writesSync.wait(timeout); 1607 long end = System.currentTimeMillis(); 1608 long remaining = timeout - (end - start); 1609 if (remaining > 0) 1610 tm.setTimeout(remaining); 1611 else 1612 { 1613 String msg = Translate 1614 .get("scheduler.releasesavepoint.timeout.writeSync", 1615 pendingWrites); 1616 logger.warn(msg); 1617 throw new SQLException (msg); 1618 } 1619 } 1620 else 1621 writesSync.wait(); 1622 } 1623 catch (InterruptedException e) 1624 { 1625 String msg = Translate.get( 1626 "scheduler.releasesavepoint.timeout.writeSync", pendingWrites) 1627 + " (" + e + ")"; 1628 logger.error(msg); 1629 throw new SQLException (msg); 1630 } 1631 } 1632 pendingWrites++; 1633 1634 if (logger.isDebugEnabled()) 1635 logger.debug("Release savepoint " + name 1636 + " scheduled - current pending writes: " + pendingWrites); 1637 } 1638 1639 this.releaseSavepointTransaction(tm.getTransactionId(), name); 1640 } 1641 1642 1648 protected abstract void releaseSavepointTransaction(long transactionId, 1649 String name); 1650 1651 1656 public final void savepointCompleted(long transactionId) 1657 { 1658 synchronized (writesSync) 1659 { 1660 pendingWrites--; 1661 1662 if (pendingWrites < 0) 1663 { 1664 logger 1665 .error("Negative pending writes detected on savepoint completion for transaction" 1666 + transactionId); 1667 pendingWrites = 0; 1668 } 1669 1670 if (logger.isDebugEnabled()) 1671 logger.debug("Savepoint completed, remaining pending writes: " 1672 + pendingWrites); 1673 1674 checkPendingWrites(); 1675 } 1676 } 1677 1678 1684 public final void resumeNewTransactions() 1685 { 1686 if (logger.isDebugEnabled()) 1687 logger.debug("Resuming new transactions"); 1688 1689 synchronized (transactionSync) 1690 { 1691 suspendedTransactions--; 1692 if (suspendedTransactions < 0) 1693 { 1694 suspendedTransactions = 0; 1695 logger 1696 .error("Unexpected negative suspendedTransactions in AbstractScheduler.resumeNewTransactions()"); 1697 } 1698 if (suspendedTransactions == 0) 1699 { 1700 transactionSync.notifyAll(); 1702 } 1703 } 1704 } 1705 1706 1716 public final void suspendNewTransactions() 1717 { 1718 if (logger.isDebugEnabled()) 1719 logger.debug("Suspending new transactions"); 1720 1721 synchronized (transactionSync) 1722 { 1723 suspendedTransactions++; 1724 } 1725 } 1726 1727 1740 public void suspendNewTransactionsAndWrites() 1741 { 1742 if (logger.isDebugEnabled()) 1743 logger.debug("Suspending new transactions and writes"); 1744 1745 synchronized (writesSync) 1746 { 1747 synchronized (transactionSync) 1748 { 1749 suspendedTransactions++; 1750 suspendedWrites++; 1751 } 1752 } 1753 } 1754 1755 1761 public void waitForSuspendedTransactionsToComplete() throws SQLException 1762 { 1763 synchronized (transactionSync) 1764 { 1765 if (pendingTransactions == 0) 1766 { 1767 if (logger.isDebugEnabled()) 1768 logger.debug("All transactions suspended"); 1769 return; 1770 } 1771 } 1772 long waitTime = 15000; 1773 while (true) 1774 { 1775 1776 synchronized (endOfCurrentTransactions) 1777 { 1778 if (pendingTransactions == 0) 1784 { 1785 if (logger.isDebugEnabled()) 1786 logger.debug("All new transactions suspended"); 1787 return; 1788 } 1789 1790 if (logger.isDebugEnabled()) 1791 logger.debug("Waiting for " + pendingTransactions 1792 + " transactions to complete."); 1793 1794 try 1796 { 1797 endOfCurrentTransactions.wait(waitTime); 1798 } 1799 catch (InterruptedException e) 1800 { 1801 String msg = Translate.get("scheduler.suspend.transaction.failed", e); 1802 logger.error(msg); 1803 throw new SQLException (msg); 1804 } 1805 } 1806 synchronized (transactionSync) 1807 { 1808 if (pendingTransactions == 0) 1809 break; 1810 else 1811 { 1812 logger.warn("Waiting for " + pendingTransactions 1813 + " open transactions"); 1814 waitTime *= 2; 1815 } 1816 1817 } 1818 } 1819 1820 if (logger.isDebugEnabled()) 1821 logger.debug("All new transactions suspended"); 1822 } 1823 1824 1830 public void resumeWrites() 1831 { 1832 if (logger.isDebugEnabled()) 1833 logger.debug("Resuming writes"); 1834 1835 synchronized (writesSync) 1836 { 1837 suspendedWrites--; 1838 if (suspendedWrites < 0) 1839 { 1840 suspendedWrites = 0; 1841 logger 1842 .error("Unexpected negative suspendedWrites in AbstractScheduler.resumeWrites()"); 1843 } 1844 if (suspendedWrites == 0) 1845 { 1846 writesSync.notifyAll(); 1848 } 1849 } 1850 } 1851 1852 1856 private void checkPendingWrites() 1857 { 1858 synchronized (writesSync) 1859 { 1860 if ((suspendedWrites > 0) && (pendingWrites == 0)) 1863 { 1864 synchronized (endOfCurrentWrites) 1865 { 1866 endOfCurrentWrites.notifyAll(); 1867 } 1868 } 1869 } 1870 } 1871 1872 1879 private void checkPendingTransactions() 1880 { 1881 synchronized (transactionSync) 1882 { 1883 if ((suspendedTransactions > 0) && (pendingTransactions == 0)) 1887 { 1888 synchronized (endOfCurrentTransactions) 1889 { 1890 endOfCurrentTransactions.notifyAll(); 1891 } 1892 } 1893 } 1894 } 1895 1896 1900 public void resumeWritesTransactionsAndPersistentConnections() 1901 { 1902 clearSuspendedRequests(); 1903 resumeWrites(); 1904 resumeNewTransactions(); 1905 resumeNewPersistentConnections(); 1906 } 1907 1908 1912 1921 public void suspendNewWrites() 1922 { 1923 if (logger.isDebugEnabled()) 1924 logger.debug("Suspending new writes"); 1925 1926 synchronized (writesSync) 1927 { 1928 suspendedWrites++; 1929 } 1930 } 1931 1932 1938 public void waitForSuspendedWritesToComplete() throws SQLException 1939 { 1940 synchronized (writesSync) 1941 { 1942 if (pendingWrites == 0) 1943 { 1944 if (logger.isDebugEnabled()) 1945 logger.debug("All writes suspended"); 1946 return; 1947 } 1948 } 1949 1950 long waitTime = 15000; 1951 while (true) 1952 { 1953 synchronized (endOfCurrentWrites) 1954 { 1955 if (pendingWrites == 0) 1961 { 1962 if (logger.isDebugEnabled()) 1963 logger.debug("All writes suspended"); 1964 return; 1965 } 1966 1967 if (logger.isDebugEnabled()) 1968 logger.debug("Wait for " + pendingWrites + " writes to complete."); 1969 1970 try 1972 { 1973 endOfCurrentWrites.wait(waitTime); 1974 } 1975 catch (InterruptedException e) 1976 { 1977 String msg = Translate.get("scheduler.suspend.writes.failed", e); 1978 logger.error(msg); 1979 throw new SQLException (msg); 1980 } 1981 } 1982 synchronized (writesSync) 1983 { 1984 if (pendingWrites == 0) 1985 break; 1986 else 1987 { 1988 logger.warn("Waiting for " + pendingWrites + " pending writes"); 1989 waitTime *= 2; 1990 } 1991 } 1992 } 1993 1994 if (logger.isDebugEnabled()) 1995 logger.debug("All writes suspended"); 1996 } 1997 1998 2001 public void resumeOpenClosePersistentConnection() 2002 { 2003 synchronized (suspendOpenClosePersistentConnectionSync) 2004 { 2005 suspendedOpenClosePersistentConnections--; 2006 if (suspendedOpenClosePersistentConnections == 0) 2007 suspendOpenClosePersistentConnectionSync.notifyAll(); 2008 } 2009 } 2010 2011 2017 public final void resumeNewPersistentConnections() 2018 { 2019 if (logger.isDebugEnabled()) 2020 logger.debug("Resuming new persistent connections"); 2021 2022 synchronized (persistentConnectionsSync) 2023 { 2024 suspendedNewPersistentConnections--; 2025 if (suspendedNewPersistentConnections < 0) 2026 { 2027 suspendedNewPersistentConnections = 0; 2028 logger 2029 .error("Unexpected negative suspendedPersistentConnections in AbstractScheduler.resumeNewPersistentConnections()"); 2030 } 2031 if (suspendedNewPersistentConnections == 0) 2032 { 2033 persistentConnectionsSync.notifyAll(); 2035 } 2036 } 2037 } 2038 2039 2045 public void suspendOpenClosePersistentConnection() 2046 { 2047 synchronized (suspendOpenClosePersistentConnectionSync) 2048 { 2049 suspendedOpenClosePersistentConnections++; 2050 } 2051 } 2052 2053 2061 public void suspendNewPersistentConnections() 2062 { 2063 if (logger.isDebugEnabled()) 2064 logger.debug("Suspending new persistent connections"); 2065 2066 synchronized (persistentConnectionsSync) 2067 { 2068 suspendedNewPersistentConnections++; 2069 } 2070 } 2071 2072 2078 public void waitForPersistentConnectionsToComplete() throws SQLException 2079 { 2080 synchronized (persistentConnectionsSync) 2081 { 2082 if (persistentConnections.isEmpty()) 2083 { 2084 if (logger.isDebugEnabled()) 2085 logger.debug("All persistent connections closed"); 2086 return; 2087 } 2088 } 2089 long waitTime = 15000; 2090 synchronized (endOfCurrentPersistentConnections) 2091 { 2092 if (persistentConnections.isEmpty()) 2093 { 2094 if (logger.isDebugEnabled()) 2095 logger.debug("All persistent connections closed"); 2096 return; 2097 } 2098 2099 if (logger.isDebugEnabled()) 2100 logger.debug("Waiting for " + persistentConnections.size() 2101 + " persistent connections to be closed."); 2102 2103 long startTime = System.currentTimeMillis(); 2105 while (!persistentConnections.isEmpty()) 2106 try 2107 { 2108 endOfCurrentPersistentConnections.wait(waitTime); 2109 if (!persistentConnections.isEmpty() 2110 && (System.currentTimeMillis() - startTime) > waitTime) 2111 { 2112 logger.warn("Waiting for " + persistentConnections.size() 2113 + " open persistent connections"); 2114 waitTime *= 2; 2115 startTime = System.currentTimeMillis(); 2116 } 2117 } 2118 catch (InterruptedException e) 2119 { 2120 String msg = Translate.get("scheduler.suspend.transaction.failed", e); 2121 logger.error(msg); 2122 throw new SQLException (msg); 2123 } 2124 2125 } 2126 2127 if (logger.isDebugEnabled()) 2128 logger.debug("All persistent connections closed"); 2129 } 2130 2131 2135 public void waitForPendingOpenClosePersistentConnection() 2136 { 2137 synchronized (suspendOpenClosePersistentConnectionSync) 2138 { 2139 while (pendingOpenClosePersistentConnections > 0) 2140 { 2141 try 2142 { 2143 suspendOpenClosePersistentConnectionSync.wait(); 2144 } 2145 catch (InterruptedException ignore) 2146 { 2147 } 2148 } 2149 } 2150 } 2151 2152 2157 private void addSuspendedRequest(Object obj) 2158 { 2159 synchronized (suspendedRequests) 2160 { 2161 suspendedRequests.add(obj); 2162 } 2163 if (totalOrderQueue != null) 2164 { synchronized (totalOrderQueue) 2166 { 2167 totalOrderQueue.notifyAll(); 2168 } 2169 } 2170 } 2171 2172 2178 public boolean isSuspendedRequest(Object obj) 2179 { 2180 synchronized (suspendedRequests) 2181 { 2182 return suspendedRequests.contains(obj); 2183 } 2184 } 2185 2186 2189 private void clearSuspendedRequests() 2190 { 2191 synchronized (suspendedRequests) 2192 { 2193 suspendedRequests.clear(); 2194 if (totalOrderQueue != null) 2195 { synchronized (totalOrderQueue) 2197 { 2198 totalOrderQueue.notifyAll(); 2199 } 2200 } 2201 } 2202 } 2203 2204 2208 protected abstract String getXmlImpl(); 2209 2210 2215 public String getXml() 2216 { 2217 StringBuffer info = new StringBuffer (); 2218 info.append("<" + DatabasesXmlTags.ELT_RequestScheduler + ">"); 2219 info.append(this.getXmlImpl()); 2220 info.append("</" + DatabasesXmlTags.ELT_RequestScheduler + ">"); 2221 return info.toString(); 2222 } 2223 2224 2229 public String [] getSchedulerData() 2230 { 2231 String [] data = new String [7]; 2232 data[0] = String.valueOf(numberRead); 2233 data[1] = String.valueOf(numberWrite); 2234 data[2] = String.valueOf(pendingTransactions); 2235 data[3] = String.valueOf(pendingWrites); 2236 data[4] = String.valueOf(numberRead + numberWrite); 2237 data[5] = String.valueOf(suspendedTransactions); 2238 data[6] = String.valueOf(suspendedWrites); 2239 return data; 2240 } 2241 2242 2245 public int getNumberRead() 2246 { 2247 return numberRead; 2248 } 2249 2250 2253 public int getNumberWrite() 2254 { 2255 return numberWrite; 2256 } 2257 2258 2261 public int getPendingTransactions() 2262 { 2263 return pendingTransactions; 2264 } 2265 2266 2269 public boolean isSuspendedTransactions() 2270 { 2271 return suspendedTransactions > 0; 2272 } 2273 2274 2277 public boolean isSuspendedWrites() 2278 { 2279 return suspendedWrites > 0; 2280 } 2281 2282} | Popular Tags |