1 23 24 package org.apache.commons.transaction.file; 25 26 import java.io.BufferedReader ; 27 import java.io.BufferedWriter ; 28 import java.io.File ; 29 import java.io.FileInputStream ; 30 import java.io.FileNotFoundException ; 31 import java.io.FileOutputStream ; 32 import java.io.IOException ; 33 import java.io.InputStream ; 34 import java.io.InputStreamReader ; 35 import java.io.OutputStream ; 36 import java.io.OutputStreamWriter ; 37 import java.util.ArrayList ; 38 import java.util.Collection ; 39 import java.util.HashMap ; 40 import java.util.List ; 41 import java.util.Map ; 42 import java.util.Iterator ; 43 import java.util.Collections ; 44 45 import org.apache.commons.transaction.locking.GenericLock; 46 import org.apache.commons.transaction.locking.GenericLockManager; 47 import org.apache.commons.transaction.locking.LockException; 48 import org.apache.commons.transaction.locking.LockManager2; 49 import org.apache.commons.transaction.util.FileHelper; 50 import org.apache.commons.transaction.util.LoggerFacade; 51 52 120 public class FileResourceManager implements ResourceManager, ResourceManagerErrorCodes { 121 122 protected static final int NATIVE_ISOLATION_LEVEL = ISOLATION_LEVEL_REPEATABLE_READ; 124 protected static final int DEFAULT_ISOLATION_LEVEL = NATIVE_ISOLATION_LEVEL; 125 126 protected static final int NO_LOCK = 0; 127 protected static final int LOCK_ACCESS = NO_LOCK + 1; 128 protected static final int LOCK_SHARED = NO_LOCK + 2; 129 protected static final int LOCK_EXCLUSIVE = NO_LOCK + 3; 130 protected static final int LOCK_COMMIT = NO_LOCK + 4; 131 132 protected static final int OPERATION_MODE_STOPPED = 0; 133 protected static final int OPERATION_MODE_STOPPING = 1; 134 protected static final int OPERATION_MODE_STARTED = 2; 135 protected static final int OPERATION_MODE_STARTING = 3; 136 protected static final int OPERATION_MODE_RECOVERING = 4; 137 138 protected static final String DEFAULT_PARAMETER_ENCODING = "ISO-8859-15"; 139 140 protected static final int DEFAULT_TIMEOUT_MSECS = 5000; 141 protected static final int DEFAULT_COMMIT_TIMEOUT_FACTOR = 2; 142 143 protected static final String WORK_CHANGE_DIR = "change"; 144 protected static final String WORK_DELETE_DIR = "delete"; 145 146 protected static final String CONTEXT_FILE = "transaction.log"; 147 148 153 154 protected static void applyDeletes(File removeDir, File targetDir, File rootDir) throws IOException { 155 if (removeDir.isDirectory() && targetDir.isDirectory()) { 156 File [] files = removeDir.listFiles(); 157 for (int i = 0; i < files.length; i++) { 158 File removeFile = files[i]; 159 File targetFile = new File (targetDir, removeFile.getName()); 160 if (removeFile.isFile()) { 161 if (targetFile.exists()) { 162 targetFile.delete(); 163 } 164 removeFile.delete(); 166 } else { 167 applyDeletes(removeFile, targetFile, rootDir); 168 } 169 if (!targetDir.equals(rootDir) && targetDir.list().length == 0) { 171 targetDir.delete(); 172 } 173 } 174 } 175 } 176 177 182 183 protected String workDir; 184 protected String storeDir; 185 protected boolean cleanUp = true; 186 protected boolean dirty = false; 187 protected int operationMode = OPERATION_MODE_STOPPED; 188 protected long defaultTimeout = DEFAULT_TIMEOUT_MSECS; 189 protected boolean debug; 190 191 protected LoggerFacade logger; 192 193 protected Map globalTransactions; 194 protected List globalOpenResources; 195 protected LockManager2 lockManager; 196 197 protected ResourceIdToPathMapper idMapper = null; 198 199 204 205 213 public FileResourceManager(String storeDir, String workDir, boolean urlEncodePath, LoggerFacade logger) { 214 this(storeDir, workDir, urlEncodePath, logger, false); 215 } 216 217 226 public FileResourceManager( 227 String storeDir, 228 String workDir, 229 boolean urlEncodePath, 230 LoggerFacade logger, 231 boolean debug) { 232 this(storeDir, workDir, urlEncodePath ? new URLEncodeIdMapper() : null, logger, false); 233 } 234 235 244 public FileResourceManager( 245 String storeDir, 246 String workDir, 247 ResourceIdToPathMapper idMapper, 248 LoggerFacade logger, 249 boolean debug) { 250 this.workDir = workDir; 251 this.storeDir = storeDir; 252 this.logger = logger; 253 this.debug = debug; 254 this.idMapper = idMapper; 255 } 256 257 264 public String getStoreDir() { 265 return storeDir; 266 } 267 268 275 public String getWorkDir() { 276 return workDir; 277 } 278 279 284 public LoggerFacade getLogger() { 285 return logger; 286 } 287 288 293 294 public boolean lockResource(Object resourceId, Object txId) throws ResourceManagerException { 295 lockResource(resourceId, txId, false); 296 return true; 298 } 299 300 public boolean lockResource(Object resourceId, Object txId, boolean shared) throws ResourceManagerException { 301 lockResource(resourceId, txId, shared, true, Long.MAX_VALUE, true); 302 return true; 304 } 305 306 public boolean lockResource( 307 Object resourceId, 308 Object txId, 309 boolean shared, 310 boolean wait, 311 long timeoutMSecs, 312 boolean reentrant) 313 throws ResourceManagerException { 314 315 TransactionContext context = (shared ? txInitialSaneCheck(txId) : txInitialSaneCheckForWriting(txId)); 316 assureNotMarkedForRollback(context); 317 fileInitialSaneCheck(txId, resourceId); 318 319 int level = (shared ? getSharedLockLevel(context) : LOCK_EXCLUSIVE); 321 try { 322 lockManager.lock(txId, resourceId, level, reentrant, Math.min(timeoutMSecs, 323 context.timeoutMSecs)); 324 return true; 326 } catch (LockException e) { 327 switch (e.getCode()) { 328 case LockException.CODE_INTERRUPTED: 329 throw new ResourceManagerException("Could not get lock for resource at '" 330 + resourceId + "'", ERR_NO_LOCK, txId); 331 case LockException.CODE_TIMED_OUT: 332 throw new ResourceManagerException("Lock timed out for resource at '" + resourceId 333 + "'", ERR_NO_LOCK, txId); 334 case LockException.CODE_DEADLOCK_VICTIM: 335 throw new ResourceManagerException("Deadlock victim resource at '" + resourceId 336 + "'", ERR_DEAD_LOCK, txId); 337 default : 338 throw new ResourceManagerException("Locking exception for resource at '" + resourceId 339 + "'", ERR_DEAD_LOCK, txId); 340 } 341 } 342 } 343 344 public int getDefaultIsolationLevel() { 345 return DEFAULT_ISOLATION_LEVEL; 346 } 347 348 public int[] getSupportedIsolationLevels() throws ResourceManagerException { 349 return new int[] { ISOLATION_LEVEL_READ_COMMITTED, ISOLATION_LEVEL_REPEATABLE_READ }; 350 } 351 352 public boolean isIsolationLevelSupported(int level) throws ResourceManagerException { 353 return (level == ISOLATION_LEVEL_READ_COMMITTED || level == ISOLATION_LEVEL_REPEATABLE_READ); 354 } 355 356 359 public long getDefaultTransactionTimeout() { 360 return defaultTimeout; 361 } 362 363 368 public void setDefaultTransactionTimeout(long timeout) { 369 defaultTimeout = timeout; 370 } 371 372 public long getTransactionTimeout(Object txId) throws ResourceManagerException { 373 assureRMReady(); 374 long msecs = 0; 375 TransactionContext context = getContext(txId); 376 if (context == null) { 377 msecs = getDefaultTransactionTimeout(); 378 } else { 379 msecs = context.timeoutMSecs; 380 } 381 return msecs; 382 } 383 384 public void setTransactionTimeout(Object txId, long mSecs) throws ResourceManagerException { 385 assureRMReady(); 386 TransactionContext context = getContext(txId); 387 if (context != null) { 388 context.timeoutMSecs = mSecs; 389 } else { 390 throw new ResourceManagerException(ERR_NO_TX, txId); 391 } 392 } 393 394 public int getIsolationLevel(Object txId) throws ResourceManagerException { 395 assureRMReady(); 396 TransactionContext context = getContext(txId); 397 if (context == null) { 398 return DEFAULT_ISOLATION_LEVEL; 399 } else { 400 return context.isolationLevel; 401 } 402 } 403 404 public void setIsolationLevel(Object txId, int level) throws ResourceManagerException { 405 assureRMReady(); 406 TransactionContext context = getContext(txId); 407 if (context != null) { 408 if (level != ISOLATION_LEVEL_READ_COMMITTED || level != ISOLATION_LEVEL_REPEATABLE_READ) { 409 context.isolationLevel = level; 410 } else { 411 throw new ResourceManagerException(ERR_ISOLATION_LEVEL_UNSUPPORTED, txId); 412 } 413 } else { 414 throw new ResourceManagerException(ERR_NO_TX, txId); 415 } 416 } 417 418 public synchronized void start() throws ResourceManagerSystemException { 419 420 logger.logInfo("Starting RM at '" + storeDir + "' / '" + workDir + "'"); 421 422 operationMode = OPERATION_MODE_STARTING; 423 424 globalTransactions = Collections.synchronizedMap(new HashMap ()); 425 lockManager = new GenericLockManager(LOCK_COMMIT, logger); 426 globalOpenResources = Collections.synchronizedList(new ArrayList ()); 427 428 recover(); 429 sync(); 430 431 operationMode = OPERATION_MODE_STARTED; 432 433 if (dirty) { 434 logger.logWarning("Started RM, but in dirty mode only (Recovery of pending transactions failed)"); 435 } else { 436 logger.logInfo("Started RM"); 437 } 438 439 } 440 441 public synchronized boolean stop(int mode) throws ResourceManagerSystemException { 442 return stop(mode, getDefaultTransactionTimeout() * DEFAULT_COMMIT_TIMEOUT_FACTOR); 443 } 444 445 public synchronized boolean stop(int mode, long timeOut) throws ResourceManagerSystemException { 446 447 logger.logInfo("Stopping RM at '" + storeDir + "' / '" + workDir + "'"); 448 449 operationMode = OPERATION_MODE_STOPPING; 450 451 sync(); 452 boolean success = shutdown(mode, timeOut); 453 454 releaseGlobalOpenResources(); 455 456 if (success) { 457 operationMode = OPERATION_MODE_STOPPED; 458 logger.logInfo("Stopped RM"); 459 } else { 460 logger.logWarning("Failed to stop RM"); 461 } 462 463 return success; 464 } 465 466 public synchronized boolean recover() throws ResourceManagerSystemException { 467 if (operationMode != OPERATION_MODE_STARTED && operationMode != OPERATION_MODE_STARTING) { 468 throw new ResourceManagerSystemException( 469 ERR_SYSTEM, 470 "Recovery is possible in started or starting resource manager only"); 471 } 472 int oldMode = operationMode; 473 operationMode = OPERATION_MODE_RECOVERING; 474 475 recoverContexts(); 476 if (globalTransactions.size() > 0) { 477 logger.logInfo("Recovering pending transactions"); 478 } 479 480 dirty = !rollBackOrForward(); 481 482 operationMode = oldMode; 483 return dirty; 484 } 485 486 public int getTransactionState(Object txId) throws ResourceManagerException { 487 TransactionContext context = getContext(txId); 488 489 if (context == null) { 490 return STATUS_NO_TRANSACTION; 491 } else { 492 return context.status; 493 } 494 495 } 496 497 public void startTransaction(Object txId) throws ResourceManagerException { 498 499 if (logger.isFineEnabled()) logger.logFine("Starting Tx " + txId); 500 501 assureStarted(); if (txId == null || txId.toString().length() == 0) { 503 throw new ResourceManagerException(ERR_TXID_INVALID, txId); 504 } 505 506 synchronized (globalTransactions) { 508 TransactionContext context = getContext(txId); 509 510 if (context != null) { 511 throw new ResourceManagerException(ERR_DUP_TX, txId); 512 } 513 514 context = new TransactionContext(txId); 515 context.init(); 516 globalTransactions.put(txId, context); 517 518 } 519 } 520 521 public void markTransactionForRollback(Object txId) throws ResourceManagerException { 522 assureRMReady(); 523 TransactionContext context = txInitialSaneCheckForWriting(txId); 524 try { 525 context.status = STATUS_MARKED_ROLLBACK; 526 context.saveState(); 527 } finally { 528 context.finalCleanUp(); 530 } 531 } 532 533 public int prepareTransaction(Object txId) throws ResourceManagerException { 534 assureRMReady(); 535 if (dirty) { 537 throw new ResourceManagerSystemException( 538 "Database is set to dirty, this *may* mean it is corrupt. No modifications are allowed until a recovery run has been performed!", 539 ERR_SYSTEM, 540 txId); 541 } 542 543 if (txId == null) { 544 throw new ResourceManagerException(ERR_TXID_INVALID, txId); 545 } 546 547 TransactionContext context = getContext(txId); 548 549 if (context == null) { 550 return PREPARE_FAILURE; 551 } 552 553 synchronized (context) { 554 555 sync(); 556 557 if (context.status != STATUS_ACTIVE) { 558 context.status = STATUS_MARKED_ROLLBACK; 559 context.saveState(); 560 return PREPARE_FAILURE; 561 } 562 563 if (logger.isFineEnabled()) logger.logFine("Preparing Tx " + txId); 564 565 int prepareStatus = PREPARE_FAILURE; 566 567 context.status = STATUS_PREPARING; 568 context.saveState(); 569 context.closeResources(); 571 if (context.readOnly) { 572 prepareStatus = PREPARE_SUCCESS_READONLY; 573 } else { 574 try { 576 context.upgradeLockToCommit(); 577 } catch (ResourceManagerException rme) { 578 markTransactionForRollback(txId); 580 throw rme; 581 } 582 prepareStatus = PREPARE_SUCCESS; 583 } 584 context.status = STATUS_PREPARED; 585 context.saveState(); 586 if (logger.isFineEnabled()) logger.logFine("Prepared Tx " + txId); 587 588 return prepareStatus; 589 } 590 } 591 592 public void rollbackTransaction(Object txId) throws ResourceManagerException { 593 assureRMReady(); 594 TransactionContext context = txInitialSaneCheckForWriting(txId); 595 synchronized (context) { 597 try { 598 599 if (logger.isFineEnabled()) logger.logFine("Rolling back Tx " + txId); 600 601 context.status = STATUS_ROLLING_BACK; 602 context.saveState(); 603 context.rollback(); 604 context.status = STATUS_ROLLEDBACK; 605 context.saveState(); 606 globalTransactions.remove(txId); 607 context.cleanUp(); 608 609 if (logger.isFineEnabled()) logger.logFine("Rolled back Tx " + txId); 610 611 } catch (Error e) { 613 setDirty(txId, e); 614 throw e; 615 } catch (RuntimeException e) { 616 setDirty(txId, e); 617 throw e; 618 } catch (ResourceManagerSystemException e) { 619 setDirty(txId, e); 620 throw e; 621 } finally { 622 context.finalCleanUp(); 623 context.notifyFinish(); 625 } 626 } 627 } 628 629 public void commitTransaction(Object txId) throws ResourceManagerException { 630 assureRMReady(); 631 TransactionContext context = txInitialSaneCheckForWriting(txId); 632 assureNotMarkedForRollback(context); 633 634 synchronized (context) { 636 try { 637 638 if (logger.isFineEnabled()) logger.logFine("Committing Tx " + txId); 639 640 context.status = STATUS_COMMITTING; 641 context.saveState(); 642 context.commit(); 643 context.status = STATUS_COMMITTED; 644 context.saveState(); 645 globalTransactions.remove(txId); 646 context.cleanUp(); 647 648 if (logger.isFineEnabled()) logger.logFine("Committed Tx " + txId); 649 650 } catch (Error e) { 652 setDirty(txId, e); 653 throw e; 654 } catch (RuntimeException e) { 655 setDirty(txId, e); 656 throw e; 657 } catch (ResourceManagerSystemException e) { 658 setDirty(txId, e); 659 throw e; 660 } catch (ResourceManagerException e) { 662 logger.logWarning("Could not commit tx " + txId + ", rolling back instead", e); 663 rollbackTransaction(txId); 664 } finally { 665 context.finalCleanUp(); 666 context.notifyFinish(); 668 } 669 } 670 } 671 672 public boolean resourceExists(Object resourceId) throws ResourceManagerException { 673 Object txId; 675 TransactionContext context; 676 synchronized (globalTransactions) { 677 txId = generatedUniqueTxId(); 678 if (logger.isFinerEnabled()) 679 logger.logFiner("Creating temporary light weight tx " + txId + " to check for exists"); 680 context = new TransactionContext(txId); 681 context.isLightWeight = true; 682 context.isolationLevel = ISOLATION_LEVEL_READ_COMMITTED; 684 globalTransactions.put(txId, context); 686 } 687 688 boolean exists = resourceExists(txId, resourceId); 689 690 context.freeLocks(); 691 globalTransactions.remove(txId); 692 if (logger.isFinerEnabled()) 693 logger.logFiner("Removing temporary light weight tx " + txId); 694 695 return exists; 696 } 697 698 public boolean resourceExists(Object txId, Object resourceId) throws ResourceManagerException { 699 lockResource(resourceId, txId, true); 700 return (getPathForRead(txId, resourceId) != null); 701 } 702 703 public void deleteResource(Object txId, Object resourceId) throws ResourceManagerException { 704 deleteResource(txId, resourceId, true); 705 } 706 707 public void deleteResource(Object txId, Object resourceId, boolean assureOnly) throws ResourceManagerException { 708 709 if (logger.isFineEnabled()) logger.logFine(txId + " deleting " + resourceId); 710 711 lockResource(resourceId, txId, false); 712 713 if (getPathForRead(txId, resourceId) == null) { 714 if (assureOnly) { 715 return; 716 } 717 throw new ResourceManagerException("No such resource at '" + resourceId + "'", ERR_NO_SUCH_RESOURCE, txId); 718 } 719 String txDeletePath = getDeletePath(txId, resourceId); 720 String mainPath = getMainPath(resourceId); 721 try { 722 723 undoScheduledChangeOrCreate(txId, resourceId); 725 726 if (FileHelper.fileExists(mainPath)) { 729 FileHelper.createFile(txDeletePath); 730 } 731 } catch (IOException e) { 732 throw new ResourceManagerSystemException( 733 "Can not delete resource at '" + resourceId + "'", 734 ERR_SYSTEM, 735 txId, 736 e); 737 } 738 } 739 740 public void createResource(Object txId, Object resourceId) throws ResourceManagerException { 741 createResource(txId, resourceId, true); 742 } 743 744 public void createResource(Object txId, Object resourceId, boolean assureOnly) throws ResourceManagerException { 745 746 if (logger.isFineEnabled()) logger.logFine(txId + " creating " + resourceId); 747 748 lockResource(resourceId, txId, false); 749 750 if (getPathForRead(txId, resourceId) != null) { 751 if (assureOnly) { 752 return; 753 } 754 throw new ResourceManagerException( 755 "Resource at '" + resourceId + "', already exists", 756 ERR_RESOURCE_EXISTS, 757 txId); 758 } 759 760 String txChangePath = getChangePath(txId, resourceId); 761 try { 762 763 if (!undoScheduledDelete(txId, resourceId)) { 765 FileHelper.createFile(txChangePath); 766 } 767 768 } catch (IOException e) { 769 throw new ResourceManagerSystemException( 770 "Can not create resource at '" + resourceId + "'", 771 ERR_SYSTEM, 772 txId, 773 e); 774 } 775 } 776 777 782 783 public InputStream readResource(Object resourceId) throws ResourceManagerException { 784 Object txId; 786 synchronized (globalTransactions) { 787 txId = generatedUniqueTxId(); 788 if (logger.isFinerEnabled()) 789 logger.logFiner("Creating temporary light weight tx " + txId + " for reading"); 790 TransactionContext context = new TransactionContext(txId); 791 context.isLightWeight = true; 792 context.isolationLevel = ISOLATION_LEVEL_READ_COMMITTED; 794 globalTransactions.put(txId, context); 796 } 797 798 InputStream is = readResource(txId, resourceId); 799 return is; 800 } 801 802 public InputStream readResource(Object txId, Object resourceId) throws ResourceManagerException { 803 804 if (logger.isFineEnabled()) logger.logFine(txId + " reading " + resourceId); 805 806 lockResource(resourceId, txId, true); 807 808 String resourcePath = getPathForRead(txId, resourceId); 809 if (resourcePath == null) { 810 throw new ResourceManagerException("No such resource at '" + resourceId + "'", ERR_NO_SUCH_RESOURCE, txId); 811 } 812 813 File file = new File (resourcePath); 814 try { 815 FileInputStream stream = new FileInputStream (file); 816 getContext(txId).registerResource(stream); 817 return new InputStreamWrapper(stream, txId, resourceId); 818 } catch (FileNotFoundException e) { 819 throw new ResourceManagerSystemException("File '" + resourcePath + "' does not exist", ERR_SYSTEM, txId); 820 } 821 } 822 823 public OutputStream writeResource(Object txId, Object resourceId) throws ResourceManagerException { 824 825 if (logger.isFineEnabled()) logger.logFine(txId + " writing " + resourceId); 826 827 lockResource(resourceId, txId, false); 828 829 String resourcePath = getPathForWrite(txId, resourceId); 830 if (resourcePath == null) { 831 throw new ResourceManagerException("No such resource at '" + resourceId + "'", ERR_NO_SUCH_RESOURCE, txId); 832 } 833 834 File file = new File (resourcePath); 835 try { 836 FileOutputStream stream = new FileOutputStream (file); 837 TransactionContext context = getContext(txId); 838 context.registerResource(stream); 839 context.readOnly = false; 840 return stream; 841 } catch (FileNotFoundException e) { 842 throw new ResourceManagerSystemException("File '" + resourcePath + "' does not exist", ERR_SYSTEM, txId); 843 } 844 } 845 846 851 852 855 public synchronized void reset() { 856 FileHelper.removeRec(new File (storeDir)); 857 FileHelper.removeRec(new File (workDir)); 858 new File (storeDir).mkdirs(); 859 new File (workDir).mkdirs(); 860 } 861 862 869 public synchronized void sync() throws ResourceManagerSystemException { 870 } 871 872 879 public String generatedUniqueTxId() throws ResourceManagerSystemException { 880 assureRMReady(); 881 String txId; 882 synchronized (globalTransactions) { 883 do { 884 txId = Long.toHexString(System.currentTimeMillis()); 885 } while (getContext(txId) != null); 887 } 888 return txId; 889 } 890 891 896 897 protected void fileInitialSaneCheck(Object txId, Object path) throws ResourceManagerException { 898 if (path == null || path.toString().length() == 0) { 899 throw new ResourceManagerException(ERR_RESOURCEID_INVALID, txId); 900 } 901 } 902 903 protected void assureStarted() throws ResourceManagerSystemException { 904 if (operationMode != OPERATION_MODE_STARTED) { 905 throw new ResourceManagerSystemException("Resource Manager Service not started", ERR_SYSTEM, null); 906 } 907 } 908 909 protected void assureRMReady() throws ResourceManagerSystemException { 910 if (operationMode != OPERATION_MODE_STARTED && operationMode != OPERATION_MODE_STOPPING) { 911 throw new ResourceManagerSystemException("Resource Manager Service not ready", ERR_SYSTEM, null); 912 } 913 } 914 915 protected void assureNotMarkedForRollback(TransactionContext context) throws ResourceManagerException { 916 if (context.status == STATUS_MARKED_ROLLBACK) { 917 throw new ResourceManagerException(ERR_MARKED_FOR_ROLLBACK, context.txId); 918 } 919 } 920 921 protected TransactionContext txInitialSaneCheckForWriting(Object txId) throws ResourceManagerException { 922 assureRMReady(); 923 if (dirty) { 925 throw new ResourceManagerSystemException( 926 "Database is set to dirty, this *may* mean it is corrupt. No modifications are allowed until a recovery run has been performed!", 927 ERR_SYSTEM, 928 txId); 929 } 930 return txInitialSaneCheck(txId); 931 } 932 933 protected TransactionContext txInitialSaneCheck(Object txId) throws ResourceManagerException { 934 assureRMReady(); 935 if (txId == null) { 936 throw new ResourceManagerException(ERR_TXID_INVALID, txId); 937 } 938 939 TransactionContext context = getContext(txId); 940 941 if (context == null) { 942 throw new ResourceManagerException(ERR_NO_TX, txId); 943 } 944 945 return context; 946 } 947 948 953 954 protected TransactionContext getContext(Object txId) { 955 return (TransactionContext) globalTransactions.get(txId); 956 } 957 958 protected String assureLeadingSlash(Object pathObject) { 959 String path = ""; 960 if (pathObject != null) { 961 if (idMapper != null) { 962 path = idMapper.getPathForId(pathObject); 963 } else { 964 path = pathObject.toString(); 965 } 966 if (path.length() > 0 && path.charAt(0) != '/' && path.charAt(0) != '\\') { 967 path = "/" + path; 968 } 969 } 970 return path; 971 } 972 973 protected String getMainPath(Object path) { 974 StringBuffer buf = new StringBuffer (storeDir.length() + path.toString().length() + 5); 975 buf.append(storeDir).append(assureLeadingSlash(path)); 976 return buf.toString(); 977 } 978 979 protected String getChangePath(Object txId, Object path) { 980 StringBuffer buf = new StringBuffer (txId.toString().length() + path.toString().length() 981 + WORK_CHANGE_DIR.length() + workDir.length() + 5); 982 buf.append(workDir).append('/').append(txId.toString()).append('/').append(WORK_CHANGE_DIR).append( 983 assureLeadingSlash(path)); 984 return buf.toString(); 985 } 986 987 protected String getDeletePath(Object txId, Object path) { 988 StringBuffer buf = new StringBuffer (txId.toString().length() + path.toString().length() 989 + WORK_DELETE_DIR.length() + workDir.length() + 5); 990 buf.append(workDir).append('/').append(txId.toString()).append('/').append(WORK_DELETE_DIR).append( 991 assureLeadingSlash(path)); 992 return buf.toString(); 993 } 994 995 protected boolean undoScheduledDelete(Object txId, Object resourceId) throws ResourceManagerException { 996 String txDeletePath = getDeletePath(txId, resourceId); 997 File deleteFile = new File (txDeletePath); 998 if (deleteFile.exists()) { 999 if (!deleteFile.delete()) { 1000 throw new ResourceManagerSystemException( 1001 "Failed to undo delete of '" + resourceId + "'", 1002 ERR_SYSTEM, 1003 txId); 1004 } 1005 return true; 1006 } 1007 return false; 1008 } 1009 1010 protected boolean undoScheduledChangeOrCreate(Object txId, Object resourceId) throws ResourceManagerException { 1011 String txChangePath = getChangePath(txId, resourceId); 1012 File changeFile = new File (txChangePath); 1013 if (changeFile.exists()) { 1014 if (!changeFile.delete()) { 1015 throw new ResourceManagerSystemException( 1016 "Failed to undo change / create of '" + resourceId + "'", 1017 ERR_SYSTEM, 1018 txId); 1019 } 1020 return true; 1021 } 1022 return false; 1023 } 1024 1025 protected String getPathForWrite(Object txId, Object resourceId) throws ResourceManagerException { 1026 try { 1027 String txChangePath = getChangePath(txId, resourceId); 1029 if (!FileHelper.fileExists(txChangePath)) { 1030 FileHelper.createFile(txChangePath); 1031 } 1032 return txChangePath; 1033 } catch (IOException e) { 1034 throw new ResourceManagerSystemException( 1035 "Can not write to resource at '" + resourceId + "'", 1036 ERR_SYSTEM, 1037 txId, 1038 e); 1039 } 1040 } 1041 1042 protected String getPathForRead(Object txId, Object resourceId) throws ResourceManagerException { 1043 1044 String mainPath = getMainPath(resourceId); 1045 String txChangePath = getChangePath(txId, resourceId); 1046 String txDeletePath = getDeletePath(txId, resourceId); 1047 1048 1050 boolean changeExists = FileHelper.fileExists(txChangePath); 1051 boolean deleteExists = FileHelper.fileExists(txDeletePath); 1052 boolean mainExists = FileHelper.fileExists(mainPath); 1053 boolean resourceIsDir = 1054 ((mainExists && new File (mainPath).isDirectory()) 1055 || (changeExists && new File (txChangePath).isDirectory())); 1056 if (resourceIsDir) { 1057 logger.logWarning("Resource at '" + resourceId + "' maps to directory"); 1058 } 1059 1060 1062 if (!resourceIsDir && changeExists && deleteExists) { 1066 throw new ResourceManagerSystemException( 1067 "Inconsistent delete and change combination for resource at '" + resourceId + "'", 1068 ERR_TX_INCONSISTENT, 1069 txId); 1070 } 1071 1072 if (deleteExists && !mainExists) { 1074 throw new ResourceManagerSystemException( 1075 "Inconsistent delete for resource at '" + resourceId + "'", 1076 ERR_TX_INCONSISTENT, 1077 txId); 1078 } 1079 1080 if (changeExists) { 1081 return txChangePath; 1082 } else if ((mainExists && !deleteExists)) { 1083 return mainPath; 1084 } else { 1085 return null; 1086 } 1087 } 1088 1089 1094 1095 protected int getSharedLockLevel(TransactionContext context) throws ResourceManagerException { 1096 if (context.isolationLevel == ISOLATION_LEVEL_READ_COMMITTED 1097 || context.isolationLevel == ISOLATION_LEVEL_READ_UNCOMMITTED) { 1098 return LOCK_ACCESS; 1099 } else if ( 1100 context.isolationLevel == ISOLATION_LEVEL_REPEATABLE_READ 1101 || context.isolationLevel == ISOLATION_LEVEL_SERIALIZABLE) { 1102 return LOCK_SHARED; 1103 } else { 1104 return LOCK_ACCESS; 1105 } 1106 } 1107 1108 1113 1114 protected void registerOpenResource(Object openResource) { 1115 if (logger.isFinerEnabled()) 1116 logger.logFiner("Registering open resource " + openResource); 1117 globalOpenResources.add(openResource); 1118 } 1119 1120 protected void releaseGlobalOpenResources() { 1121 ArrayList copy; 1122 synchronized (globalOpenResources) { 1123 copy = new ArrayList (globalOpenResources); 1125 for (Iterator it = copy.iterator(); it.hasNext();) { 1126 Object stream = it.next(); 1127 closeOpenResource(stream); 1128 } 1129 } 1130 } 1131 1132 protected void closeOpenResource(Object openResource) { 1133 if (logger.isFinerEnabled()) logger.logFiner("Releasing resource " + openResource); 1134 globalOpenResources.remove(openResource); 1135 if (openResource instanceof InputStream ) { 1136 InputStream is = (InputStream ) openResource; 1137 try { 1138 is.close(); 1139 } catch (IOException e) { 1140 } 1142 } else if (openResource instanceof OutputStream ) { 1143 OutputStream os = (OutputStream ) openResource; 1144 try { 1145 os.close(); 1146 } catch (IOException e) { 1147 } 1149 } 1150 } 1151 1152 1157 1158 protected boolean rollBackOrForward() { 1159 boolean allCool = true; 1160 1161 synchronized (globalTransactions) { 1162 ArrayList contexts = new ArrayList (globalTransactions.values()); 1163 for (Iterator it = contexts.iterator(); it.hasNext();) { 1164 TransactionContext context = (TransactionContext) it.next(); 1165 if (context.status == STATUS_COMMITTING) { 1166 logger.logInfo("Rolling forward " + context.txId); 1168 1169 try { 1170 context.commit(); 1171 context.status = STATUS_COMMITTED; 1172 context.saveState(); 1173 globalTransactions.remove(context.txId); 1174 context.cleanUp(); 1175 } catch (ResourceManagerException e) { 1176 allCool = false; 1178 logger.logSevere("Rolling forward of " + context.txId + " failed", e); 1179 } 1180 } else if (context.status == STATUS_COMMITTED) { 1181 logger.logInfo("Cleaning already commited " + context.txId); 1182 globalTransactions.remove(context.txId); 1183 try { 1184 context.cleanUp(); 1185 } catch (ResourceManagerException e) { 1186 allCool = false; 1188 logger.logWarning("Cleaning of " + context.txId + " failed", e); 1189 } 1190 } else { 1191 if (context.status != STATUS_ROLLING_BACK 1193 && context.status != STATUS_ROLLEDBACK 1194 && context.status != STATUS_MARKED_ROLLBACK) { 1195 logger.logWarning("Irregularly rolling back " + context.txId); 1196 } else { 1197 logger.logInfo("Rolling back " + context.txId); 1198 } 1199 try { 1200 context.rollback(); 1201 context.status = STATUS_ROLLEDBACK; 1202 context.saveState(); 1203 globalTransactions.remove(context.txId); 1204 context.cleanUp(); 1205 } catch (ResourceManagerException e) { 1206 logger.logWarning("Rolling back of " + context.txId + " failed", e); 1207 } 1208 } 1210 } 1211 1212 } 1213 return allCool; 1214 } 1215 1216 protected void recoverContexts() { 1217 File dir = new File (workDir); 1218 File [] files = dir.listFiles(); 1219 if (files == null) 1220 return; 1221 for (int i = 0; i < files.length; i++) { 1222 File file = files[i]; 1223 String txId = file.getName(); 1224 if (!globalTransactions.containsKey(txId)) { 1226 1227 logger.logInfo("Recovering " + txId); 1228 TransactionContext context; 1229 try { 1230 context = new TransactionContext(txId); 1231 context.recoverState(); 1232 globalTransactions.put(txId, context); 1233 } catch (ResourceManagerException e) { 1234 logger.logWarning("Recovering of " + txId + " failed"); 1236 } 1237 } 1238 } 1239 } 1240 1241 protected boolean waitForAllTxToStop(long timeoutMSecs) { 1242 long startTime = System.currentTimeMillis(); 1243 1244 1249 Collection transactionsToStop; 1250 synchronized (globalTransactions) { 1251 transactionsToStop = new ArrayList (globalTransactions.values()); 1252 } 1253 for (Iterator it = transactionsToStop.iterator(); it.hasNext();) { 1254 long remainingTimeout = startTime - System.currentTimeMillis() + timeoutMSecs; 1255 1256 if (remainingTimeout <= 0) { 1257 return false; 1258 } 1259 1260 TransactionContext context = (TransactionContext) it.next(); 1261 synchronized (context) { 1262 if (!context.finished) { 1263 logger.logInfo( 1264 "Waiting for tx " + context.txId + " to finish for " + remainingTimeout + " milli seconds"); 1265 } 1266 while (!context.finished && remainingTimeout > 0) { 1267 try { 1268 context.wait(remainingTimeout); 1269 } catch (InterruptedException e) { 1270 return false; 1271 } 1272 remainingTimeout = startTime - System.currentTimeMillis() + timeoutMSecs; 1273 } 1274 if (context.finished) { 1275 logger.logInfo("Tx " + context.txId + " finished"); 1276 } else { 1277 logger.logWarning("Tx " + context.txId + " failed to finish in given time"); 1278 } 1279 } 1280 } 1281 1282 return (globalTransactions.size() == 0); 1283 } 1284 1285 protected boolean shutdown(int mode, long timeoutMSecs) { 1286 switch (mode) { 1287 case SHUTDOWN_MODE_NORMAL : 1288 return waitForAllTxToStop(timeoutMSecs); 1289 case SHUTDOWN_MODE_ROLLBACK : 1290 return rollBackOrForward(); 1291 case SHUTDOWN_MODE_KILL : 1292 return true; 1293 default : 1294 return false; 1295 } 1296 } 1297 1298 protected void setDirty(Object txId, Throwable t) { 1299 logger.logSevere( 1300 "Fatal error during critical commit/rollback of transaction " + txId + ", setting database to dirty.", 1301 t); 1302 dirty = true; 1303 } 1304 1305 1309 protected class TransactionContext { 1310 1311 protected Object txId; 1312 protected int status = STATUS_ACTIVE; 1313 protected int isolationLevel = DEFAULT_ISOLATION_LEVEL; 1314 protected long timeoutMSecs = getDefaultTransactionTimeout(); 1315 protected long startTime; 1316 protected long commitTime = -1L; 1317 protected boolean isLightWeight = false; 1318 protected boolean readOnly = true; 1319 protected boolean finished = false; 1320 1321 private List openResourcs = new ArrayList (); 1323 1324 public TransactionContext(Object txId) throws ResourceManagerException { 1325 this.txId = txId; 1326 startTime = System.currentTimeMillis(); 1327 } 1328 1329 public long getRemainingTimeout() { 1330 long now = System.currentTimeMillis(); 1331 return (startTime - now + timeoutMSecs); 1332 } 1333 1334 public synchronized void init() throws ResourceManagerException { 1335 String baseDir = workDir + "/" + txId; 1336 String changeDir = baseDir + "/" + WORK_CHANGE_DIR; 1337 String deleteDir = baseDir + "/" + WORK_DELETE_DIR; 1338 1339 new File (changeDir).mkdirs(); 1340 new File (deleteDir).mkdirs(); 1341 1342 saveState(); 1343 } 1344 1345 public synchronized void rollback() throws ResourceManagerException { 1346 closeResources(); 1347 freeLocks(); 1348 } 1349 1350 public synchronized void commit() throws ResourceManagerException { 1351 String baseDir = workDir + "/" + txId; 1352 String changeDir = baseDir + "/" + WORK_CHANGE_DIR; 1353 String deleteDir = baseDir + "/" + WORK_DELETE_DIR; 1354 1355 closeResources(); 1356 upgradeLockToCommit(); 1357 try { 1358 applyDeletes(new File (deleteDir), new File (storeDir), new File (storeDir)); 1359 FileHelper.moveRec(new File (changeDir), new File (storeDir)); 1360 } catch (IOException e) { 1361 throw new ResourceManagerSystemException("Commit failed", ERR_SYSTEM, txId, e); 1362 } 1363 freeLocks(); 1364 commitTime = System.currentTimeMillis(); 1365 } 1366 1367 public synchronized void notifyFinish() { 1368 finished = true; 1369 notifyAll(); 1370 } 1371 1372 public synchronized void cleanUp() throws ResourceManagerException { 1373 if (!cleanUp) 1374 return; boolean clean = true; 1376 Exception cleanException = null; 1377 String baseDir = workDir + "/" + txId; 1378 FileHelper.removeRec(new File (baseDir)); 1379 if (!clean) { 1380 throw new ResourceManagerSystemException( 1381 "Clean up failed due to unreleasable lock", 1382 ERR_SYSTEM, 1383 txId, 1384 cleanException); 1385 } 1386 } 1387 1388 public synchronized void finalCleanUp() throws ResourceManagerException { 1389 closeResources(); 1390 freeLocks(); 1391 } 1392 1393 public synchronized void upgradeLockToCommit() throws ResourceManagerException { 1394 for (Iterator it = lockManager.getAll(txId).iterator(); it.hasNext();) { 1395 GenericLock lock = (GenericLock) it.next(); 1396 if (lock.getLockLevel(txId) == LOCK_EXCLUSIVE) { 1398 try { 1399 if (!lock 1401 .acquire( 1402 txId, 1403 LOCK_COMMIT, 1404 true, 1405 true, 1406 getDefaultTransactionTimeout() * DEFAULT_COMMIT_TIMEOUT_FACTOR)) { 1407 throw new ResourceManagerException( 1408 "Could not upgrade to commit lock for resource at '" 1409 + lock.getResourceId().toString() 1410 + "'", 1411 ERR_NO_LOCK, 1412 txId); 1413 } 1414 } catch (InterruptedException e) { 1415 throw new ResourceManagerSystemException(ERR_SYSTEM, txId, e); 1416 } 1417 } 1418 1419 } 1420 } 1421 1422 public synchronized void freeLocks() { 1423 lockManager.releaseAll(txId); 1424 } 1425 1426 public synchronized void closeResources() { 1427 synchronized (globalOpenResources) { 1428 for (Iterator it = openResourcs.iterator(); it.hasNext();) { 1429 Object stream = it.next(); 1430 closeOpenResource(stream); 1431 } 1432 } 1433 } 1434 1435 public synchronized void registerResource(Object openResource) { 1436 synchronized (globalOpenResources) { 1437 registerOpenResource(openResource); 1438 openResourcs.add(openResource); 1439 } 1440 } 1441 1442 public synchronized void saveState() throws ResourceManagerException { 1443 String statePath = workDir + "/" + txId + "/" + CONTEXT_FILE; 1444 File file = new File (statePath); 1445 BufferedWriter writer = null; 1446 try { 1447 OutputStream os = new FileOutputStream (file); 1448 writer = new BufferedWriter (new OutputStreamWriter (os, DEFAULT_PARAMETER_ENCODING)); 1449 writer.write(toString()); 1450 } catch (FileNotFoundException e) { 1451 String msg = "Saving status information to '" + statePath + "' failed! Could not create file"; 1452 logger.logSevere(msg, e); 1453 throw new ResourceManagerSystemException(msg, ERR_SYSTEM, txId, e); 1454 } catch (IOException e) { 1455 String msg = "Saving status information to '" + statePath + "' failed"; 1456 logger.logSevere(msg, e); 1457 throw new ResourceManagerSystemException(msg, ERR_SYSTEM, txId, e); 1458 } finally { 1459 if (writer != null) { 1460 try { 1461 writer.close(); 1462 } catch (IOException e) { 1463 } 1464 1465 } 1466 } 1467 } 1468 1469 public synchronized void recoverState() throws ResourceManagerException { 1470 String statePath = workDir + "/" + txId + "/" + CONTEXT_FILE; 1471 File file = new File (statePath); 1472 BufferedReader reader = null; 1473 try { 1474 InputStream is = new FileInputStream (file); 1475 1476 reader = new BufferedReader (new InputStreamReader (is, DEFAULT_PARAMETER_ENCODING)); 1477 txId = reader.readLine(); 1478 status = Integer.parseInt(reader.readLine()); 1479 isolationLevel = Integer.parseInt(reader.readLine()); 1480 timeoutMSecs = Long.parseLong(reader.readLine()); 1481 startTime = Long.parseLong(reader.readLine()); 1482 } catch (FileNotFoundException e) { 1483 String msg = "Recovering status information from '" + statePath + "' failed! Could not find file"; 1484 logger.logSevere(msg, e); 1485 throw new ResourceManagerSystemException(msg, ERR_SYSTEM, txId); 1486 } catch (IOException e) { 1487 String msg = "Recovering status information from '" + statePath + "' failed"; 1488 logger.logSevere(msg, e); 1489 throw new ResourceManagerSystemException(msg, ERR_SYSTEM, txId, e); 1490 } catch (Throwable t) { 1491 String msg = "Recovering status information from '" + statePath + "' failed"; 1492 logger.logSevere(msg, t); 1493 throw new ResourceManagerSystemException(msg, ERR_SYSTEM, txId, t); 1494 } finally { 1495 if (reader != null) { 1496 try { 1497 reader.close(); 1498 } catch (IOException e) { 1499 } 1500 1501 } 1502 } 1503 } 1504 1505 public synchronized String toString() { 1506 StringBuffer buf = new StringBuffer (); 1507 buf.append(txId).append('\n'); 1508 buf.append(Integer.toString(status)).append('\n'); 1509 buf.append(Integer.toString(isolationLevel)).append('\n'); 1510 buf.append(Long.toString(timeoutMSecs)).append('\n'); 1511 buf.append(Long.toString(startTime)).append('\n'); 1512 if (debug) { 1513 buf.append("----- Lock Debug Info -----\n"); 1514 1515 for (Iterator it = lockManager.getAll(txId).iterator(); it.hasNext();) { 1516 GenericLock lock = (GenericLock) it.next(); 1517 buf.append(lock.toString()+"\n"); 1518 } 1519 1520 } 1521 return buf.toString(); 1522 } 1523 1524 } 1525 1526 private class InputStreamWrapper extends InputStream { 1527 private InputStream is; 1528 private Object txId; 1529 private Object resourceId; 1530 1531 public InputStreamWrapper(InputStream is, Object txId, Object resourceId) { 1532 this.is = is; 1533 this.txId = txId; 1534 this.resourceId = resourceId; 1535 } 1536 1537 public int read() throws IOException { 1538 return is.read(); 1539 } 1540 1541 public int read(byte b[]) throws IOException { 1542 return is.read(b); 1543 } 1544 1545 public int read(byte b[], int off, int len) throws IOException { 1546 return is.read(b, off, len); 1547 } 1548 1549 public int available() throws IOException { 1550 return is.available(); 1551 } 1552 1553 public void close() throws IOException { 1554 try { 1555 is.close(); 1556 } finally { 1557 TransactionContext context; 1558 synchronized (globalTransactions) { 1559 context = getContext(txId); 1560 if (context == null) { 1561 return; 1562 } 1563 } 1564 synchronized (context) { 1565 if (context.isLightWeight) { 1566 if (logger.isFinerEnabled()) 1567 logger.logFiner("Upon close of resource removing temporary light weight tx " + txId); 1568 context.freeLocks(); 1569 globalTransactions.remove(txId); 1570 } else { 1571 if (lockManager.getLevel(txId, resourceId) == LOCK_ACCESS) { 1573 if (logger.isFinerEnabled()) { 1574 logger.logFiner( 1575 "Upon close of resource releasing access lock for tx " 1576 + txId 1577 + " on resource at " 1578 + resourceId); 1579 } 1580 lockManager.release(txId, resourceId); 1581 } 1582 } 1583 } 1584 } 1585 } 1586 1587 public void mark(int readlimit) { 1588 is.mark(readlimit); 1589 } 1590 1591 public void reset() throws IOException { 1592 is.reset(); 1593 } 1594 1595 public boolean markSupported() { 1596 return is.markSupported(); 1597 1598 } 1599 } 1600} 1601 | Popular Tags |