1 24 25 package org.objectweb.cjdbc.controller.scheduler; 26 27 import java.sql.SQLException ; 28 29 import org.objectweb.cjdbc.common.exceptions.RollbackException; 30 import org.objectweb.cjdbc.common.i18n.Translate; 31 import org.objectweb.cjdbc.common.log.Trace; 32 import org.objectweb.cjdbc.common.sql.AbstractRequest; 33 import org.objectweb.cjdbc.common.sql.AbstractWriteRequest; 34 import org.objectweb.cjdbc.common.sql.SelectRequest; 35 import org.objectweb.cjdbc.common.sql.StoredProcedure; 36 import org.objectweb.cjdbc.common.sql.schema.DatabaseSchema; 37 import org.objectweb.cjdbc.common.xml.DatabasesXmlTags; 38 import org.objectweb.cjdbc.common.xml.XmlComponent; 39 import org.objectweb.cjdbc.controller.requestmanager.TransactionMarkerMetaData; 40 41 53 public abstract class AbstractScheduler implements XmlComponent 54 { 55 56 68 protected int raidbLevel; 69 protected int parsingGranularity; 70 71 private long tid; 73 private int sid; 74 private boolean suspendedTransactions = false; 75 private int pendingTransactions; 76 private Object transactionSync = new Object (); 77 private Object endOfCurrentTransactions = new Object (); 78 79 private boolean suspendedWrites = false; 81 private int pendingWrites; 82 private Object writesSync = new Object (); 83 private Object endOfCurrentWrites = new Object (); 84 85 protected static Trace logger = Trace 86 .getLogger("org.objectweb.cjdbc.controller.scheduler"); 87 88 private int numberRead = 0; 90 private int numberWrite = 0; 91 92 96 103 public AbstractScheduler(int raidbLevel, int parsingGranularity) 104 { 105 this.raidbLevel = raidbLevel; 106 this.parsingGranularity = parsingGranularity; 107 this.tid = 0; 108 this.sid = 0; 109 this.pendingTransactions = 0; 110 this.pendingWrites = 0; 111 } 112 113 117 123 public final void initializeTransactionId(long transactionId) 124 { 125 this.tid = transactionId; 126 } 127 128 133 public final int getParsingGranularity() 134 { 135 return parsingGranularity; 136 } 137 138 143 public final void setParsingGranularity(int parsingGranularity) 144 { 145 this.parsingGranularity = parsingGranularity; 146 } 147 148 153 public final int getPendingWrites() 154 { 155 return pendingWrites; 156 } 157 158 163 public final int getRAIDbLevel() 164 { 165 return raidbLevel; 166 } 167 168 173 public final void setRAIDbLevel(int raidbLevel) 174 { 175 this.raidbLevel = raidbLevel; 176 } 177 178 186 public void setDatabaseSchema(DatabaseSchema dbs) 187 { 188 if (logger.isInfoEnabled()) 189 logger.info(Translate.get("scheduler.doesnt.support.schemas")); 190 } 191 192 198 public void mergeDatabaseSchema(DatabaseSchema dbs) 199 { 200 logger.info(Translate.get("scheduler.doesnt.support.schemas")); 201 } 202 203 208 public synchronized int incrementSavepointId() 209 { 210 sid++; 211 return sid; 212 } 213 214 218 226 public abstract void scheduleReadRequest(SelectRequest request) 227 throws SQLException ; 228 229 234 public abstract void readCompletedNotify(SelectRequest request); 235 236 241 public final void readCompleted(SelectRequest request) 242 { 243 numberRead++; 244 this.readCompletedNotify(request); 245 } 246 247 259 public final void scheduleWriteRequest(AbstractWriteRequest request) 260 throws SQLException , RollbackException 261 { 262 suspendWriteIfNeeded(request); 263 scheduleNonSuspendedWriteRequest(request); 264 } 265 266 275 public abstract void scheduleNonSuspendedWriteRequest( 276 AbstractWriteRequest request) throws SQLException , RollbackException; 277 278 290 public final void writeCompleted(AbstractWriteRequest request) 291 { 292 synchronized (writesSync) 293 { 294 pendingWrites--; 295 296 if (logger.isDebugEnabled()) 297 logger.debug("Write completed, remaining pending writes: " 298 + pendingWrites); 299 300 notifyWriteCompleted(request); 301 302 if (suspendedWrites && (pendingWrites == 0)) 305 { 306 synchronized (endOfCurrentWrites) 307 { 308 endOfCurrentWrites.notifyAll(); 309 } 310 } 311 } 312 numberWrite++; 313 } 314 315 322 public abstract void notifyWriteCompleted(AbstractWriteRequest request); 323 324 336 public final void scheduleStoredProcedure(StoredProcedure proc) 337 throws SQLException , RollbackException 338 { 339 suspendWriteIfNeeded(proc); 340 scheduleNonSuspendedStoredProcedure(proc); 341 } 342 343 351 public abstract void scheduleNonSuspendedStoredProcedure(StoredProcedure proc) 352 throws SQLException , RollbackException; 353 354 366 public final void storedProcedureCompleted(StoredProcedure proc) 367 { 368 synchronized (writesSync) 369 { 370 pendingWrites--; 371 372 if (logger.isDebugEnabled()) 373 logger.debug("Stored procedure completed, remaining pending writes: " 374 + pendingWrites); 375 376 notifyStoredProcedureCompleted(proc); 377 378 if (suspendedWrites && (pendingWrites == 0)) 381 { 382 synchronized (endOfCurrentWrites) 383 { 384 endOfCurrentWrites.notifyAll(); 385 } 386 } 387 } 388 numberWrite++; 389 } 390 391 398 public abstract void notifyStoredProcedureCompleted(StoredProcedure proc); 399 400 407 private void suspendWriteIfNeeded(AbstractRequest request) 408 throws SQLException 409 { 410 synchronized (writesSync) 411 { 412 if (suspendedWrites) 413 { 414 try 415 { 416 int timeout = request.getTimeout(); 418 if (timeout > 0) 419 { 420 long start = System.currentTimeMillis(); 421 long lTimeout = timeout * 1000; 422 writesSync.wait(lTimeout); 423 long end = System.currentTimeMillis(); 424 int remaining = (int) (lTimeout - (end - start)); 425 if (remaining > 0) 426 request.setTimeout(remaining); 427 else 428 { 429 String msg = Translate.get("scheduler.request.timeout", 430 new String []{String.valueOf(request.getId()), 431 String.valueOf(request.getTimeout())}); 432 logger.warn(msg); 433 throw new SQLException (msg); 434 } 435 } 436 else 437 this.writesSync.wait(); 438 } 439 catch (InterruptedException e) 440 { 441 String msg = Translate.get("scheduler.request.timeout.failed", e); 442 logger.warn(msg); 443 throw new SQLException (msg); 444 } 445 } 446 pendingWrites++; 447 448 if (logger.isDebugEnabled()) 449 logger.debug("Schedule " + request.getSQL() 450 + " - Current pending writes: " + pendingWrites); 451 } 452 } 453 454 458 467 public final long begin(TransactionMarkerMetaData tm) throws SQLException 468 { 469 synchronized (writesSync) 471 { 472 if (suspendedWrites) 473 { 474 try 475 { 476 long timeout = tm.getTimeout(); 478 if (timeout > 0) 479 { 480 long start = System.currentTimeMillis(); 481 writesSync.wait(timeout); 482 long end = System.currentTimeMillis(); 483 long remaining = timeout - (end - start); 484 if (remaining > 0) 485 tm.setTimeout(remaining); 486 else 487 { 488 String msg = Translate.get("scheduler.begin.timeout.writeSync"); 489 logger.warn(msg); 490 throw new SQLException (msg); 491 } 492 } 493 else 494 writesSync.wait(); 495 } 496 catch (InterruptedException e) 497 { 498 String msg = Translate.get("scheduler.begin.timeout.writeSync") 499 + " (" + e + ")"; 500 logger.error(msg); 501 throw new SQLException (msg); 502 } 503 } 504 pendingWrites++; 505 506 if (logger.isDebugEnabled()) 507 logger.debug("Begin scheduled - current pending writes: " 508 + pendingWrites); 509 } 510 511 synchronized (transactionSync) 513 { 514 if (suspendedTransactions) 515 try 516 { 517 long timeout = tm.getTimeout(); 519 if (timeout > 0) 520 { 521 long start = System.currentTimeMillis(); 522 transactionSync.wait(timeout); 523 long end = System.currentTimeMillis(); 524 long remaining = timeout - (end - start); 525 if (remaining > 0) 526 tm.setTimeout(remaining); 527 else 528 { 529 String msg = Translate 530 .get("scheduler.begin.timeout.transactionSync"); 531 logger.warn(msg); 532 throw new SQLException (msg); 533 } 534 } 535 else 536 transactionSync.wait(); 537 } 538 catch (InterruptedException e) 539 { 540 String msg = Translate.get("scheduler.begin.timeout.transactionSync") 541 + " (" + e + ")"; 542 logger.error(msg); 543 throw new SQLException (msg); 544 } 545 tid++; 546 pendingTransactions++; 547 548 if (logger.isDebugEnabled()) 549 logger.debug("Begin scheduled - current pending transactions: " 550 + pendingTransactions); 551 return tid; 552 } 553 } 554 555 560 public final void beginCompleted(long transactionId) 561 { 562 synchronized (writesSync) 564 { 565 pendingWrites--; 566 567 if (logger.isDebugEnabled()) 568 logger.debug("Begin completed, remaining pending writes: " 569 + pendingWrites); 570 571 if (suspendedWrites && (pendingWrites == 0)) 574 { 575 synchronized (endOfCurrentWrites) 576 { 577 endOfCurrentWrites.notifyAll(); 578 } 579 } 580 } 581 } 582 583 592 public final void commit(TransactionMarkerMetaData tm) throws SQLException 593 { 594 synchronized (writesSync) 596 { 597 if (suspendedWrites) 598 { 599 try 600 { 601 long timeout = tm.getTimeout(); 603 if (timeout > 0) 604 { 605 long start = System.currentTimeMillis(); 606 writesSync.wait(timeout); 607 long end = System.currentTimeMillis(); 608 long remaining = timeout - (end - start); 609 if (remaining > 0) 610 tm.setTimeout(remaining); 611 else 612 { 613 String msg = Translate.get("scheduler.commit.timeout.writeSync"); 614 logger.warn(msg); 615 throw new SQLException (msg); 616 } 617 } 618 else 619 writesSync.wait(); 620 } 621 catch (InterruptedException e) 622 { 623 String msg = Translate.get("scheduler.commit.timeout.writeSync") 624 + " (" + e + ")"; 625 logger.error(msg); 626 throw new SQLException (msg); 627 } 628 } 629 pendingWrites++; 630 631 if (logger.isDebugEnabled()) 632 logger.debug("Commit scheduled - current pending writes: " 633 + pendingWrites); 634 } 635 commitTransaction(tm.getTransactionId()); 636 } 637 638 643 protected abstract void commitTransaction(long transactionId); 644 645 650 public final void commitCompleted(long transactionId) 651 { 652 synchronized (transactionSync) 654 { 655 pendingTransactions--; 656 657 if (logger.isDebugEnabled()) 658 logger.debug("Commit completed, remaining pending transactions: " 659 + pendingTransactions); 660 661 if (suspendedTransactions && (pendingTransactions == 0)) 665 { 666 synchronized (endOfCurrentTransactions) 667 { 668 endOfCurrentTransactions.notifyAll(); 669 } 670 } 671 } 672 synchronized (writesSync) 674 { 675 pendingWrites--; 676 677 if (logger.isDebugEnabled()) 678 logger.debug("Commit completed, remaining pending writes: " 679 + pendingWrites); 680 681 if (suspendedWrites && (pendingWrites == 0)) 684 { 685 synchronized (endOfCurrentWrites) 686 { 687 endOfCurrentWrites.notifyAll(); 688 } 689 } 690 } 691 } 692 693 702 public final void rollback(TransactionMarkerMetaData tm) throws SQLException 703 { 704 synchronized (writesSync) 706 { 707 if (suspendedWrites) 708 { 709 try 710 { 711 long timeout = tm.getTimeout(); 713 if (timeout > 0) 714 { 715 long start = System.currentTimeMillis(); 716 writesSync.wait(timeout); 717 long end = System.currentTimeMillis(); 718 long remaining = timeout - (end - start); 719 if (remaining > 0) 720 tm.setTimeout(remaining); 721 else 722 { 723 String msg = Translate 724 .get("scheduler.rollback.timeout.writeSync"); 725 logger.warn(msg); 726 throw new SQLException (msg); 727 } 728 } 729 else 730 writesSync.wait(); 731 } 732 catch (InterruptedException e) 733 { 734 String msg = Translate.get("scheduler.rollback.timeout.writeSync") 735 + " (" + e + ")"; 736 logger.error(msg); 737 throw new SQLException (msg); 738 } 739 } 740 pendingWrites++; 741 742 if (logger.isDebugEnabled()) 743 logger.debug("Rollback scheduled - current pending writes: " 744 + pendingWrites); 745 } 746 rollbackTransaction(tm.getTransactionId()); 747 } 748 749 758 public final void rollback(TransactionMarkerMetaData tm, String savepointName) 759 throws SQLException 760 { 761 synchronized (writesSync) 763 { 764 if (suspendedWrites) 765 try 766 { 767 long timeout = tm.getTimeout(); 769 if (timeout > 0) 770 { 771 long start = System.currentTimeMillis(); 772 writesSync.wait(timeout); 773 long end = System.currentTimeMillis(); 774 long remaining = timeout - (end - start); 775 if (remaining > 0) 776 tm.setTimeout(remaining); 777 else 778 { 779 String msg = Translate 780 .get("scheduler.rollbacksavepoint.timeout.writeSync"); 781 logger.warn(msg); 782 throw new SQLException (msg); 783 } 784 } 785 else 786 writesSync.wait(); 787 } 788 catch (InterruptedException e) 789 { 790 String msg = Translate 791 .get("scheduler.rollbacksavepoint.timeout.writeSync") 792 + " (" + e + ")"; 793 logger.error(msg); 794 throw new SQLException (msg); 795 } 796 pendingWrites++; 797 798 if (logger.isDebugEnabled()) 799 logger.debug("Rollback " + savepointName 800 + " scheduled - current pending writes: " + pendingWrites); 801 } 802 803 this.rollbackTransaction(tm.getTransactionId(), savepointName); 804 } 805 806 811 protected abstract void rollbackTransaction(long transactionId); 812 813 819 protected abstract void rollbackTransaction(long transactionId, 820 String savepointName); 821 822 827 public final void rollbackCompleted(long transactionId) 828 { 829 synchronized (transactionSync) 831 { 832 pendingTransactions--; 833 834 if (logger.isDebugEnabled()) 835 logger.debug("Rollback completed, remaining pending transactions: " 836 + pendingTransactions); 837 838 if (suspendedTransactions && (pendingTransactions == 0)) 842 { 843 synchronized (endOfCurrentTransactions) 844 { 845 endOfCurrentTransactions.notifyAll(); 846 } 847 } 848 } 849 synchronized (writesSync) 851 { 852 pendingWrites--; 853 854 if (logger.isDebugEnabled()) 855 logger.debug("Rollback completed, remaining pending writes: " 856 + pendingWrites); 857 858 if (suspendedWrites && (pendingWrites == 0)) 861 { 862 synchronized (endOfCurrentWrites) 863 { 864 endOfCurrentWrites.notifyAll(); 865 } 866 } 867 } 868 } 869 870 879 public final int setSavepoint(TransactionMarkerMetaData tm) 880 throws SQLException 881 { 882 synchronized (writesSync) 884 { 885 if (suspendedWrites) 886 try 887 { 888 long timeout = tm.getTimeout(); 890 if (timeout > 0) 891 { 892 long start = System.currentTimeMillis(); 893 writesSync.wait(timeout); 894 long end = System.currentTimeMillis(); 895 long remaining = timeout - (end - start); 896 if (remaining > 0) 897 tm.setTimeout(remaining); 898 else 899 { 900 String msg = Translate 901 .get("scheduler.setsavepoint.timeout.writeSync"); 902 logger.warn(msg); 903 throw new SQLException (msg); 904 } 905 } 906 else 907 writesSync.wait(); 908 } 909 catch (InterruptedException e) 910 { 911 String msg = Translate 912 .get("scheduler.setsavepoint.timeout.writeSync") 913 + " (" + e + ")"; 914 logger.error(msg); 915 throw new SQLException (msg); 916 } 917 pendingWrites++; 918 919 if (logger.isDebugEnabled()) 920 logger.debug("Set savepoint scheduled - current pending writes: " 921 + pendingWrites); 922 } 923 924 int savepointId = this.incrementSavepointId(); 925 this.setSavepointTransaction(tm.getTransactionId(), String 926 .valueOf(savepointId)); 927 return savepointId; 928 } 929 930 939 public final void setSavepoint(TransactionMarkerMetaData tm, String name) 940 throws SQLException 941 { 942 synchronized (writesSync) 944 { 945 if (suspendedWrites) 946 try 947 { 948 long timeout = tm.getTimeout(); 950 if (timeout > 0) 951 { 952 long start = System.currentTimeMillis(); 953 writesSync.wait(timeout); 954 long end = System.currentTimeMillis(); 955 long remaining = timeout - (end - start); 956 if (remaining > 0) 957 tm.setTimeout(remaining); 958 else 959 { 960 String msg = Translate 961 .get("scheduler.setsavepoint.timeout.writeSync"); 962 logger.warn(msg); 963 throw new SQLException (msg); 964 } 965 } 966 else 967 writesSync.wait(); 968 } 969 catch (InterruptedException e) 970 { 971 String msg = Translate 972 .get("scheduler.setsavepoint.timeout.writeSync") 973 + " (" + e + ")"; 974 logger.error(msg); 975 throw new SQLException (msg); 976 } 977 pendingWrites++; 978 979 if (logger.isDebugEnabled()) 980 logger.debug("Set savepoint " + name 981 + " scheduled - current pending writes: " + pendingWrites); 982 } 983 984 this.setSavepointTransaction(tm.getTransactionId(), name); 985 } 986 987 993 protected abstract void setSavepointTransaction(long transactionId, 994 String name); 995 996 1005 public final void releaseSavepoint(TransactionMarkerMetaData tm, String name) 1006 throws SQLException 1007 { 1008 synchronized (writesSync) 1010 { 1011 if (suspendedWrites) 1012 try 1013 { 1014 long timeout = tm.getTimeout(); 1016 if (timeout > 0) 1017 { 1018 long start = System.currentTimeMillis(); 1019 writesSync.wait(timeout); 1020 long end = System.currentTimeMillis(); 1021 long remaining = timeout - (end - start); 1022 if (remaining > 0) 1023 tm.setTimeout(remaining); 1024 else 1025 { 1026 String msg = Translate 1027 .get("scheduler.releasesavepoint.timeout.writeSync"); 1028 logger.warn(msg); 1029 throw new SQLException (msg); 1030 } 1031 } 1032 else 1033 writesSync.wait(); 1034 } 1035 catch (InterruptedException e) 1036 { 1037 String msg = Translate 1038 .get("scheduler.releasesavepoint.timeout.writeSync") 1039 + " (" + e + ")"; 1040 logger.error(msg); 1041 throw new SQLException (msg); 1042 } 1043 pendingWrites++; 1044 1045 if (logger.isDebugEnabled()) 1046 logger.debug("Release savepoint " + name 1047 + " scheduled - current pending writes: " + pendingWrites); 1048 } 1049 1050 this.releaseSavepointTransaction(tm.getTransactionId(), name); 1051 } 1052 1053 1059 protected abstract void releaseSavepointTransaction(long transactionId, 1060 String name); 1061 1062 1067 public final void savepointCompleted(long transactionId) 1068 { 1069 synchronized (writesSync) 1070 { 1071 pendingWrites--; 1072 1073 if (logger.isDebugEnabled()) 1074 logger.debug("Savepoint completed, remaining pending writes: " 1075 + pendingWrites); 1076 1077 if (suspendedWrites && (pendingWrites == 0)) 1080 { 1081 synchronized (endOfCurrentWrites) 1082 { 1083 endOfCurrentWrites.notifyAll(); 1084 } 1085 } 1086 } 1087 } 1088 1089 1093 1103 public final void suspendNewTransactionsForCheckpoint() throws SQLException 1104 { 1105 synchronized (transactionSync) 1106 { 1107 suspendedTransactions = true; 1108 if (pendingTransactions == 0) 1109 return; 1110 } 1111 1112 synchronized (endOfCurrentTransactions) 1113 { 1114 if (pendingTransactions == 0) 1120 return; 1121 1122 try 1124 { 1125 endOfCurrentTransactions.wait(); 1126 } 1127 catch (InterruptedException e) 1128 { 1129 String msg = Translate.get("scheduler.suspend.transaction.failed", e); 1130 logger.error(msg); 1131 throw new SQLException (msg); 1132 } 1133 } 1134 } 1135 1136 1142 public final void resumeNewTransactions() 1143 { 1144 synchronized (transactionSync) 1145 { 1146 suspendedTransactions = false; 1147 transactionSync.notifyAll(); 1149 } 1150 } 1151 1152 1161 public void suspendWrites() throws SQLException 1162 { 1163 synchronized (writesSync) 1164 { 1165 suspendedWrites = true; 1166 if (pendingWrites == 0) 1167 return; 1168 } 1169 1170 synchronized (endOfCurrentWrites) 1171 { 1172 if (pendingWrites == 0) 1178 return; 1179 1180 try 1182 { 1183 endOfCurrentWrites.wait(); 1184 } 1185 catch (InterruptedException e) 1186 { 1187 String msg = Translate.get("scheduler.suspend.writes.failed", e); 1188 logger.error(msg); 1189 throw new SQLException (msg); 1190 } 1191 } 1192 } 1193 1194 1200 public void resumeWrites() 1201 { 1202 synchronized (writesSync) 1203 { 1204 suspendedWrites = false; 1205 writesSync.notifyAll(); 1207 } 1208 } 1209 1210 1214 protected abstract String getXmlImpl(); 1215 1216 1221 public String getXml() 1222 { 1223 StringBuffer info = new StringBuffer (); 1224 info.append("<" + DatabasesXmlTags.ELT_RequestScheduler + ">"); 1225 info.append(this.getXmlImpl()); 1226 info.append("</" + DatabasesXmlTags.ELT_RequestScheduler + ">"); 1227 return info.toString(); 1228 } 1229 1230 1235 public String [] getSchedulerData() 1236 { 1237 String [] data = new String [7]; 1238 data[0] = "" + numberRead; 1239 data[1] = "" + numberWrite; 1240 data[2] = "" + pendingTransactions; 1241 data[3] = "" + pendingWrites; 1242 data[4] = "" + numberRead + numberWrite; 1243 data[5] = (suspendedTransactions) ? "1" : "0"; 1244 data[6] = (suspendedWrites) ? "1" : "0"; 1245 return data; 1246 } 1247 1248 1251 public int getNumberRead() 1252 { 1253 return numberRead; 1254 } 1255 1256 1259 public int getNumberWrite() 1260 { 1261 return numberWrite; 1262 } 1263 1264 1267 public int getPendingTransactions() 1268 { 1269 return pendingTransactions; 1270 } 1271 1272 1275 public boolean isSuspendedTransactions() 1276 { 1277 return suspendedTransactions; 1278 } 1279 1280 1283 public boolean isSuspendedWrites() 1284 { 1285 return suspendedWrites; 1286 } 1287} | Popular Tags |