1 7 package org.jboss.cache.interceptors; 8 9 import org.jboss.cache.CacheException; 10 import org.jboss.cache.CacheSPI; 11 import org.jboss.cache.GlobalTransaction; 12 import org.jboss.cache.InvocationContext; 13 import org.jboss.cache.OptimisticTransactionEntry; 14 import org.jboss.cache.ReplicationException; 15 import org.jboss.cache.TransactionEntry; 16 import org.jboss.cache.config.Configuration; 17 import org.jboss.cache.config.Option; 18 import org.jboss.cache.marshall.MethodCall; 19 import org.jboss.cache.marshall.MethodCallFactory; 20 import org.jboss.cache.marshall.MethodDeclarations; 21 import org.jboss.cache.optimistic.DataVersion; 22 23 import javax.transaction.Status ; 24 import javax.transaction.Synchronization ; 25 import javax.transaction.SystemException ; 26 import javax.transaction.Transaction ; 27 import java.util.HashMap ; 28 import java.util.List ; 29 import java.util.Map ; 30 import java.util.concurrent.ConcurrentHashMap ; 31 32 41 public class TxInterceptor extends BaseTransactionalContextInterceptor implements TxInterceptorMBean 42 { 43 46 private Map transactions = new ConcurrentHashMap (16); 47 private Map rollbackTransactions = new ConcurrentHashMap (16); 48 private long m_prepares = 0; 49 private long m_commits = 0; 50 private long m_rollbacks = 0; 51 final static Object NULL = new Object (); 52 53 58 private Map remoteTransactions = new ConcurrentHashMap (); 59 60 public Object invoke(MethodCall m) throws Throwable 61 { 62 if (log.isTraceEnabled()) 63 { 64 log.trace("(" + cache.getLocalAddress() + ") call on method [" + m + "]"); 65 } 66 if (MethodDeclarations.isBuddyGroupOrganisationMethod(m.getMethodId())) return super.invoke(m); 68 69 InvocationContext ctx = cache.getInvocationContext(); 70 71 boolean scrubTxsOnExit = false; 72 Option optionOverride = ctx.getOptionOverrides(); 73 74 Object result = null; 75 76 try 77 { 78 82 if (MethodDeclarations.isTransactionLifecycleMethod(m.getMethodId())) 83 { 84 if (log.isDebugEnabled()) log.debug("Got gtx from invocation context " + ctx.getGlobalTransaction()); 86 87 if (ctx.getGlobalTransaction().isRemote()) remoteTransactions.put(ctx.getGlobalTransaction(), NULL); 88 89 switch (m.getMethodId()) 90 { 91 case MethodDeclarations.optimisticPrepareMethod_id: 92 case MethodDeclarations.prepareMethod_id: 93 if (ctx.getGlobalTransaction().isRemote()) 94 { 95 result = handleRemotePrepare(m, ctx.getGlobalTransaction()); 96 scrubTxsOnExit = true; 97 if (configuration.getExposeManagementStatistics() && getStatisticsEnabled()) 98 { 99 m_prepares++; 100 } 101 } 102 else 103 { 104 if (log.isTraceEnabled()) log.trace("received my own message (discarding it)"); 105 result = null; 106 } 107 break; 108 case MethodDeclarations.commitMethod_id: 109 case MethodDeclarations.rollbackMethod_id: 110 if (ctx.getGlobalTransaction().isRemote()) 111 { 112 result = handleRemoteCommitRollback(m, ctx.getGlobalTransaction()); 113 scrubTxsOnExit = true; 114 } 115 else 116 { 117 if (log.isTraceEnabled()) log.trace("received my own message (discarding it)"); 118 result = null; 119 } 120 break; 121 } 122 } 123 else 124 { 125 result = handleNonTxMethod(m); 127 } 128 } 129 catch (Exception e) 130 { 131 log.info("There was a problem handling this request", e); 132 if (optionOverride == null || !optionOverride.isFailSilently()) throw e; 133 } 134 finally 135 { 136 140 if (scrubTxsOnExit) 141 { 142 setTransactionalContext(null, null); 143 } 144 } 145 return result; 146 } 147 148 public long getPrepares() 149 { 150 return m_prepares; 151 } 152 153 public long getCommits() 154 { 155 return m_commits; 156 } 157 158 public long getRollbacks() 159 { 160 return m_rollbacks; 161 } 162 163 public void resetStatistics() 164 { 165 m_prepares = 0; 166 m_commits = 0; 167 m_rollbacks = 0; 168 } 169 170 public Map <String , Object > dumpStatistics() 171 { 172 Map <String , Object > retval = new HashMap <String , Object >(3); 173 retval.put("Prepares", m_prepares); 174 retval.put("Commits", m_commits); 175 retval.put("Rollbacks", m_rollbacks); 176 return retval; 177 } 178 179 private Object handleRemotePrepare(MethodCall m, GlobalTransaction gtx) throws Throwable 180 { 181 List <MethodCall> modifications = (List <MethodCall>) m.getArgs()[1]; 182 boolean onePhase = (Boolean ) m.getArgs()[configuration.isNodeLockingOptimistic() ? 4 : 3]; 183 184 Transaction ltx = txTable.getLocalTransaction(gtx); 186 187 Transaction currentTx = txManager.getTransaction(); 188 Object retval = null; 189 190 try 191 { 192 if (ltx == null) 193 { 194 if (currentTx != null) txManager.suspend(); 195 ltx = createLocalTxForGlobalTx(gtx); if (log.isDebugEnabled()) 197 { 198 log.debug("(" + cache.getLocalAddress() + "): started new local TX as result of remote PREPARE: local TX=" + ltx + ", global TX=" + gtx); 199 } 200 } 201 else 202 { 203 if (!isValid(ltx)) throw new CacheException("Transaction " + ltx + " not in correct state to be prepared"); 205 206 if (currentTx == null || !ltx.equals(currentTx)) 208 { 209 txManager.suspend(); 210 txManager.resume(ltx); 211 } 212 } 213 214 215 if (log.isTraceEnabled()) {log.trace("Resuming existing transaction " + ltx + ", global TX=" + gtx);} 216 217 219 if (txTable.get(gtx) == null) 224 { 225 227 TransactionEntry entry = configuration.isNodeLockingOptimistic() ? new OptimisticTransactionEntry() : new TransactionEntry(); 228 entry.setTransaction(ltx); 229 log.debug("creating new tx entry"); 230 txTable.put(gtx, entry); 231 if (log.isTraceEnabled()) log.trace("TxTable contents: " + txTable); 232 } 233 234 registerHandler(ltx, new RemoteSynchronizationHandler(gtx, ltx, cache)); 236 237 if (configuration.isNodeLockingOptimistic()) 238 { 239 retval = handleOptimisticPrepare(m, gtx, modifications, onePhase, ltx); 240 } 241 else 242 { 243 retval = handlePessimisticPrepare(m, gtx, modifications, onePhase, ltx); 244 } 245 } 246 finally 247 { 248 txManager.suspend(); if (currentTx != null) txManager.resume(currentTx); 251 if (log.isDebugEnabled()) log.debug("Finished remote prepare " + gtx); 252 } 253 254 return retval; 255 } 256 257 261 269 private Object handleNonTxMethod(MethodCall m) throws Throwable 270 { 271 InvocationContext ctx = cache.getInvocationContext(); 272 Transaction tx = ctx.getTransaction(); 273 Object result; 274 boolean implicitTransaction = configuration.isNodeLockingOptimistic() && tx == null; 276 if (implicitTransaction) 277 { 278 tx = createLocalTx(); 279 ctx.setTransaction(tx); 281 } 282 if (tx != null) m = attachGlobalTransaction(tx, m); 283 284 GlobalTransaction gtx = ctx.getGlobalTransaction(); 285 286 try 287 { 288 result = super.invoke(m); 289 if (implicitTransaction) 290 { 291 copyInvocationScopeOptionsToTxScope(ctx); 292 txManager.commit(); 293 } 294 } 295 catch (Throwable t) 296 { 297 if (implicitTransaction) 298 { 299 log.warn("Rolling back, exception encountered", t); 300 result = t; 301 try 302 { 303 setTransactionalContext(tx, gtx); 304 txManager.rollback(); 305 } 306 catch (Throwable th) 307 { 308 log.warn("Roll back failed encountered", th); 309 } 310 } 311 else 312 { 313 throw t; 314 } 315 } 316 return result; 317 } 318 319 private MethodCall attachGlobalTransaction(Transaction tx, MethodCall m) throws Exception 320 { 321 if (log.isDebugEnabled()) 322 { 323 log.debug(" local transaction exists - registering global tx if not present for " + Thread.currentThread()); 324 } 325 if (log.isTraceEnabled()) 326 { 327 GlobalTransaction tempGtx = txTable.get(tx); 328 log.trace("Associated gtx in txTable is " + tempGtx); 329 } 330 331 GlobalTransaction gtx = registerTransaction(tx); 333 if (gtx != null) 334 { 335 m = replaceGtx(m, gtx); 336 } 337 else 338 { 339 gtx = txTable.get(tx); 341 } 342 343 cache.getInvocationContext().setGlobalTransaction(gtx); 345 346 return m; 347 } 348 349 360 private Object handleOptimisticPrepare(MethodCall m, GlobalTransaction gtx, List <MethodCall> modifications, boolean onePhase, Transaction ltx) throws Throwable 361 { 362 Object retval; 363 if (log.isDebugEnabled()) log.debug("Handling optimistic remote prepare " + gtx); 364 replayModifications(modifications, ltx, true); 365 retval = super.invoke(m); 366 if (!isActive(ltx)) 368 { 369 throw new ReplicationException("prepare() failed -- " + 370 "local transaction status is not STATUS_ACTIVE;" + 371 " is " + ltx.getStatus()); 372 } 373 return retval; 374 } 375 376 private Object handlePessimisticPrepare(MethodCall m, GlobalTransaction gtx, List <MethodCall> modifications, boolean commit, Transaction ltx) throws Exception 377 { 378 boolean success = true; 379 Object retval; 380 try 381 { 382 try 384 { 385 replayModifications(modifications, ltx, false); 386 if (isOnePhaseCommitPrepareMehod(m)) 387 { 388 log.trace("Using one-phase prepare. Not propagating the prepare call up the stack until called to do so by the sync handler."); 389 } 390 else 391 { 392 super.invoke(m); 393 } 394 395 if (!isActive(ltx)) 397 { 398 throw new ReplicationException("prepare() failed -- " + 399 "local transaction status is not STATUS_ACTIVE;" + 400 " is " + ltx.getStatus()); 401 } 402 } 403 catch (Throwable th) 404 { 405 log.error("prepare method invocation failed", th); 406 retval = th; 407 success = false; 408 if (retval instanceof Exception ) 409 { 410 throw (Exception ) retval; 411 } 412 } 413 } 414 finally 415 { 416 417 if (log.isTraceEnabled()) {log.trace("Are we running a 1-phase commit? " + commit);} 418 422 if (commit) 423 { 424 try 425 { 426 if (success) 428 { 429 ltx.commit(); 430 } 431 else 432 { 433 ltx.rollback(); 434 } 435 } 436 catch (Throwable t) 437 { 438 log.error("Commit/rollback failed.", t); 439 if (success) 440 { 441 try 443 { 444 log.info("Attempting anotehr rollback"); 445 ltx.rollback(); 447 } 448 catch (Throwable t2) 449 { 450 log.error("Unable to rollback", t2); 451 } 452 } 453 } 454 finally 455 { 456 transactions.remove(ltx); remoteTransactions.remove(gtx); } 459 } 460 } 461 return null; 462 } 463 464 private Object replayModifications(List <MethodCall> modifications, Transaction tx, boolean injectDataVersions) 465 { 466 Object retval = null; 467 468 if (modifications != null) 469 { 470 for (MethodCall modification : modifications) 471 { 472 try 473 { 474 if (injectDataVersions && !MethodDeclarations.isDataGravitationMethod(modification.getMethodId())) 475 { 476 Object [] origArgs = modification.getArgs(); 477 injectDataVersion(origArgs[origArgs.length - 1]); 480 Object [] args = new Object [origArgs.length - 1]; 482 System.arraycopy(origArgs, 0, args, 0, args.length); 483 484 retval = super.invoke(MethodCallFactory.create(MethodDeclarations.getUnversionedMethod(modification.getMethodId()), args)); 485 } 486 else 487 { 488 retval = super.invoke(modification); 489 } 490 if (!isActive(tx)) 491 { 492 throw new ReplicationException("prepare() failed -- " + "local transaction status is not STATUS_ACTIVE; is " + tx.getStatus()); 493 } 494 } 495 catch (Throwable t) 496 { 497 log.error("method invocation failed", t); 498 retval = t; 499 } 500 finally 501 { 502 if (injectDataVersions) cache.getInvocationContext().setOptionOverrides(null); 504 } 505 if (retval != null && retval instanceof Exception ) 506 { 507 throw new RuntimeException ((Exception ) retval); 508 } 509 } 510 } 511 return retval; 513 } 514 515 public void injectDataVersion(Object obj) 516 { 517 if (obj instanceof DataVersion) 518 { 519 Option o = new Option(); 520 o.setDataVersion((DataVersion) obj); 521 cache.getInvocationContext().setOptionOverrides(o); 522 } 523 else 524 { 525 log.debug("Object " + obj + " is not a DataVersion, not applying to this mod."); 526 } 527 } 528 529 536 private Object handleRemoteCommitRollback(MethodCall m, GlobalTransaction gtx) throws Throwable 537 { 538 Transaction ltx; 539 try 540 { 541 ltx = getLocalTxForGlobalTx(gtx); 542 } 543 catch (IllegalStateException e) 544 { 545 if (m.getMethodId() == MethodDeclarations.rollbackMethod_id) 546 { 547 log.warn("No local transaction for this remotely originating rollback. Possibly rolling back before a prepare call was broadcast?"); 548 return null; 549 } 550 else 551 { 552 throw e; 553 } 554 } 555 556 Transaction currentTx = txManager.getTransaction(); 558 boolean resumeCurrentTxOnCompletion = false; 559 try 560 { 561 if (!ltx.equals(currentTx)) 562 { 563 currentTx = txManager.suspend(); 564 resumeCurrentTxOnCompletion = true; 565 txManager.resume(ltx); 566 cache.getInvocationContext().setTransaction(ltx); 568 } 569 if (log.isDebugEnabled()) log.debug(" executing " + m + "() with local TX " + ltx + " under global tx " + gtx); 570 571 if (m.getMethodId() == MethodDeclarations.commitMethod_id) 575 { 576 txManager.commit(); 577 if (configuration.getExposeManagementStatistics() && getStatisticsEnabled()) 578 { 579 m_commits++; 580 } 581 } 582 else 583 { 584 txManager.rollback(); 585 if (configuration.getExposeManagementStatistics() && getStatisticsEnabled()) 586 { 587 m_rollbacks++; 588 } 589 } 590 } 591 finally 592 { 593 if (resumeCurrentTxOnCompletion) 595 { 596 if (log.isTraceEnabled()) log.trace("Resuming suspended transaction " + currentTx); 597 txManager.suspend(); 598 if (currentTx != null) 599 { 600 txManager.resume(currentTx); 601 cache.getInvocationContext().setTransaction(currentTx); 602 } 603 } 604 605 remoteTransactions.remove(gtx); 607 transactions.remove(ltx); 608 609 txTable.remove(gtx); 611 txTable.remove(ltx); 612 } 613 614 if (log.isDebugEnabled()) log.debug("Finished remote commit/rollback method for " + gtx); 615 616 return null; 617 } 618 619 private Transaction getLocalTxForGlobalTx(GlobalTransaction gtx) throws IllegalStateException 620 { 621 Transaction ltx = txTable.getLocalTransaction(gtx); 622 if (ltx != null) 623 { 624 if (log.isDebugEnabled()) log.debug("Found local TX=" + ltx + ", global TX=" + gtx); 625 } 626 else 627 { 628 throw new IllegalStateException (" found no local TX for global TX " + gtx); 629 } 630 return ltx; 631 } 632 633 641 private Object handleCommitRollback(MethodCall m) throws Throwable 642 { 643 GlobalTransaction gtx = cache.getInvocationContext().getGlobalTransaction(); 645 Object result; 646 647 651 653 655 result = super.invoke(m); 656 657 if (log.isDebugEnabled()) log.debug("Finished local commit/rollback method for " + gtx); 658 return result; 659 } 660 661 665 670 protected void runCommitPhase(GlobalTransaction gtx, Transaction tx, List modifications, boolean onePhaseCommit) 671 { 672 cache.getInvocationContext().setTxHasMods(modifications != null && modifications.size() > 0); 674 try 675 { 676 MethodCall commitMethod; 677 if (onePhaseCommit) 678 { 679 if (configuration.isNodeLockingOptimistic()) 681 { 682 commitMethod = MethodCallFactory.create(MethodDeclarations.optimisticPrepareMethod, 683 gtx, modifications, null, cache.getLocalAddress(), true); 684 } 685 else 686 { 687 commitMethod = MethodCallFactory.create(MethodDeclarations.prepareMethod, 688 gtx, modifications, cache.getLocalAddress(), 689 true); 690 } 691 } 692 else 693 { 694 commitMethod = MethodCallFactory.create(MethodDeclarations.commitMethod, gtx); 695 } 696 697 if (log.isTraceEnabled()) {log.trace(" running commit for " + gtx);} 698 handleCommitRollback(commitMethod); 699 } 700 catch (Throwable e) 701 { 702 log.warn("Commit failed. Clearing stale locks."); 703 try 704 { 705 cleanupStaleLocks(gtx); 706 } 707 catch (Throwable e2) 708 { 709 log.error("Unable to clear stale locks", e2); 710 throw new RuntimeException (e2); 711 } 712 throw new RuntimeException ("Commit failed.", e); 713 } 714 } 715 716 717 private void cleanupStaleLocks(GlobalTransaction gtx) throws Throwable 718 { 719 TransactionEntry entry = txTable.get(gtx); 720 if (entry != null) 721 { 722 entry.releaseAllLocksLIFO(gtx); 723 } 724 } 725 726 731 protected void runRollbackPhase(GlobalTransaction gtx, Transaction tx, List modifications) 732 { 733 try 735 { 736 cache.getInvocationContext().setTxHasMods(modifications != null && modifications.size() > 0); 737 MethodCall rollbackMethod = MethodCallFactory.create(MethodDeclarations.rollbackMethod, gtx); 740 if (log.isTraceEnabled()) {log.trace(" running rollback for " + gtx);} 741 742 rollbackTransactions.put(tx, gtx); 746 747 handleCommitRollback(rollbackMethod); 748 } 749 catch (Throwable e) 750 { 751 log.warn("Rollback had a problem", e); 752 } 753 finally 754 { 755 if (tx != null) rollbackTransactions.remove(tx); 756 } 757 } 758 759 766 protected Object runPreparePhase(GlobalTransaction gtx, List modifications) throws Throwable 767 { 768 MethodCall prepareMethod; 770 if (configuration.isNodeLockingOptimistic()) 774 { 775 prepareMethod = MethodCallFactory.create(MethodDeclarations.optimisticPrepareMethod, gtx, modifications, null, cache.getLocalAddress(), false); 776 } 777 else if (configuration.getCacheMode() != Configuration.CacheMode.REPL_ASYNC) 778 { 779 prepareMethod = MethodCallFactory.create(MethodDeclarations.prepareMethod, 780 gtx, modifications, cache.getLocalAddress(), 781 false); } 783 else 785 { 786 log.trace("This is a REPL_ASYNC call (1 phase commit) - do nothing for beforeCompletion()"); 788 return null; 789 } 790 791 Object result; 795 796 Transaction ltx = txTable.getLocalTransaction(gtx); 798 799 if (txManager.getTransaction() != null && ltx != null && txManager.getTransaction().equals(ltx)) 801 { 802 result = super.invoke(prepareMethod); 803 } 804 else 805 { 806 log.warn("Local transaction does not exist or does not match expected transaction " + gtx); 807 throw new CacheException(" local transaction " + ltx + " does not exist or does not match expected transaction " + gtx); 808 } 809 return result; 810 } 811 812 816 817 824 private GlobalTransaction registerTransaction(Transaction tx) throws Exception 825 { 826 GlobalTransaction gtx; 827 if (isValid(tx) && transactions.put(tx, NULL) == null) 828 { 829 gtx = cache.getCurrentTransaction(tx, true); 830 if (gtx.isRemote()) 831 { 832 if (log.isTraceEnabled()) {log.trace("is a remotely initiated gtx so no need to register a tx for it");} 834 } 835 else 836 { 837 if (log.isTraceEnabled()) {log.trace("Registering sync handler for tx " + tx + ", gtx " + gtx);} 838 LocalSynchronizationHandler myHandler = new LocalSynchronizationHandler(gtx, tx, cache); 839 registerHandler(tx, myHandler); 840 } 841 } 842 else if ((gtx = (GlobalTransaction) rollbackTransactions.get(tx)) != null) 843 { 844 if (log.isDebugEnabled()) log.debug("Transaction " + tx + " is already registered and is rolling back."); 845 } 846 else 847 { 848 if (log.isDebugEnabled()) log.debug("Transaction " + tx + " is already registered."); 849 850 } 851 return gtx; 852 } 853 854 861 private void registerHandler(Transaction tx, RemoteSynchronizationHandler handler) throws Exception 862 { 863 OrderedSynchronizationHandler orderedHandler = OrderedSynchronizationHandler.getInstance(tx); 864 865 if (log.isTraceEnabled()) log.trace("registering for TX completion: SynchronizationHandler(" + handler + ")"); 866 867 orderedHandler.registerAtHead(handler); } 869 870 873 private MethodCall replaceGtx(MethodCall m, GlobalTransaction gtx) 874 { 875 Class [] argClasses = m.getMethod().getParameterTypes(); 876 Object [] args = m.getArgs(); 877 878 for (int i = 0; i < argClasses.length; i++) 879 { 880 if (argClasses[i].equals(GlobalTransaction.class)) 881 { 882 if (!gtx.equals(args[i])) 883 { 884 args[i] = gtx; 885 m.setArgs(args); 886 } 887 break; 888 } 889 } 890 return m; 891 } 892 893 899 private Transaction createLocalTx() throws Exception 900 { 901 if (log.isTraceEnabled()) {log.trace("Creating transaction for thread " + Thread.currentThread());} 902 Transaction localTx; 903 if (txManager == null) throw new Exception ("Failed to create local transaction; TransactionManager is null"); 904 txManager.begin(); 905 localTx = txManager.getTransaction(); 906 return localTx; 907 } 908 909 916 private Transaction createLocalTxForGlobalTx(GlobalTransaction gtx) throws Exception 917 { 918 Transaction localTx = createLocalTx(); 919 txTable.put(localTx, gtx); 920 cache.getInvocationContext().setTransaction(localTx); 922 if (log.isTraceEnabled()) log.trace("Created new tx for gtx " + gtx); 923 return localTx; 924 } 925 926 930 932 class RemoteSynchronizationHandler implements Synchronization 933 { 934 Transaction tx = null; 935 GlobalTransaction gtx = null; 936 CacheSPI cache = null; 937 List modifications = null; 938 TransactionEntry entry = null; 939 940 941 RemoteSynchronizationHandler(GlobalTransaction gtx, Transaction tx, CacheSPI cache) 942 { 943 this.gtx = gtx; 944 this.tx = tx; 945 this.cache = cache; 946 } 947 948 public void beforeCompletion() 949 { 950 if (log.isTraceEnabled()) log.trace("Running beforeCompletion on gtx " + gtx); 951 entry = txTable.get(gtx); 952 if (entry == null) 953 { 954 log.error("Transaction has a null transaction entry - beforeCompletion() will fail."); 955 log.error("TxTable contents: " + txTable); 956 throw new IllegalStateException ("cannot find transaction entry for " + gtx); 957 } 958 959 modifications = entry.getModifications(); 960 } 961 962 public void afterCompletion(int status) 965 { 966 try 967 { 968 setTransactionalContext(tx, gtx); 969 970 try 971 { 972 if (txManager.getTransaction() != null && !txManager.getTransaction().equals(tx)) txManager.resume(tx); 973 } 974 catch (Exception e) 975 { 976 e.printStackTrace(); 977 } 978 979 980 if (log.isTraceEnabled()) log.trace("calling aftercompletion for " + gtx); 981 if ((entry = txTable.get(gtx)) != null) 983 { 984 modifications = entry.getModifications(); 985 cache.getInvocationContext().setOptionOverrides(entry.getOption()); 986 } 987 transactions.remove(tx); 988 989 switch (status) 990 { 991 case Status.STATUS_COMMITTED: 992 993 boolean onePhaseCommit = !configuration.isNodeLockingOptimistic() && configuration.getCacheMode() == Configuration.CacheMode.REPL_ASYNC; 995 if (log.isDebugEnabled()) log.debug("Running commit phase. One phase? " + onePhaseCommit); 996 runCommitPhase(gtx, tx, modifications, onePhaseCommit); 997 log.debug("Finished commit phase"); 998 break; 999 1000 case Status.STATUS_MARKED_ROLLBACK: 1001 case Status.STATUS_ROLLEDBACK: 1002 log.debug("Running rollback phase"); 1003 runRollbackPhase(gtx, tx, modifications); 1004 log.debug("Finished rollback phase"); 1005 break; 1006 1007 default: 1008 throw new IllegalStateException ("illegal status: " + status); 1009 } 1010 } 1011 finally 1012 { 1013 txTable.remove(gtx); 1015 txTable.remove(tx); 1016 setTransactionalContext(null, null); 1017 } 1018 } 1019 1020 public String toString() 1021 { 1022 return "TxInterceptor.RemoteSynchronizationHandler(gtx=" + gtx + ", tx=" + tx + ")"; 1023 } 1024 } 1025 1026 class LocalSynchronizationHandler extends RemoteSynchronizationHandler 1027 { 1028 private boolean localRollbackOnly = true; 1029 1030 LocalSynchronizationHandler(GlobalTransaction gtx, Transaction tx, CacheSPI cache) 1031 { 1032 super(gtx, tx, cache); 1033 } 1034 1035 public void beforeCompletion() 1036 { 1037 super.beforeCompletion(); 1038 setTransactionalContext(tx, gtx); 1041 if (modifications.size() == 0) 1042 { 1043 if (log.isTraceEnabled()) log.trace("No modifications in this tx. Skipping beforeCompletion()"); 1044 return; 1045 } 1046 1047 cache.getInvocationContext().setOptionOverrides(entry.getOption()); 1049 1050 try 1051 { 1052 switch (tx.getStatus()) 1053 { 1054 case Status.STATUS_ACTIVE: 1056 case Status.STATUS_PREPARING: 1057 Object result = runPreparePhase(gtx, modifications); 1059 1060 if (result instanceof Throwable ) 1061 { 1062 tx.setRollbackOnly(); 1063 throw (Throwable ) result; 1064 } 1065 break; 1066 default: 1067 throw new CacheException("transaction " + tx + " in status " + tx.getStatus() + " unbale to start transaction"); 1068 } 1069 } 1070 catch (Throwable t) 1071 { 1072 try 1073 { 1074 tx.setRollbackOnly(); 1075 } 1076 catch (SystemException se) 1077 { 1078 throw new RuntimeException ("setting tx rollback failed ", se); 1079 } 1080 throw new RuntimeException ("", t); 1081 } 1082 finally 1083 { 1084 localRollbackOnly = false; 1085 setTransactionalContext(null, null); 1086 } 1087 } 1088 1089 public void afterCompletion(int status) 1090 { 1091 cache.getInvocationContext().setLocalRollbackOnly(localRollbackOnly); 1092 super.afterCompletion(status); 1093 } 1094 1095 public String toString() 1096 { 1097 return "TxInterceptor.LocalSynchronizationHandler(gtx=" + gtx + ", tx=" + tx + ")"; 1098 } 1099 } 1100} | Popular Tags |