1 17 18 package org.apache.geronimo.transaction.manager; 19 20 import java.util.ArrayList ; 21 import java.util.HashMap ; 22 import java.util.IdentityHashMap ; 23 import java.util.Iterator ; 24 import java.util.LinkedList ; 25 import java.util.List ; 26 import java.util.Map ; 27 import java.util.Set ; 28 29 import javax.transaction.HeuristicMixedException ; 30 import javax.transaction.HeuristicRollbackException ; 31 import javax.transaction.RollbackException ; 32 import javax.transaction.Status ; 33 import javax.transaction.Synchronization ; 34 import javax.transaction.SystemException ; 35 import javax.transaction.Transaction ; 36 import javax.transaction.xa.XAException ; 37 import javax.transaction.xa.XAResource ; 38 import javax.transaction.xa.Xid ; 39 import javax.ejb.EJBException ; 40 41 import org.apache.commons.logging.Log; 42 import org.apache.commons.logging.LogFactory; 43 44 49 public class TransactionImpl implements Transaction { 50 private static final Log log = LogFactory.getLog("Transaction"); 51 52 private final XidFactory xidFactory; 53 private final Xid xid; 54 private final TransactionLog txnLog; 55 private final long timeout; 56 private final List syncList = new ArrayList (5); 57 private final LinkedList resourceManagers = new LinkedList (); 58 private final IdentityHashMap activeXaResources = new IdentityHashMap (3); 59 private final IdentityHashMap suspendedXaResources = new IdentityHashMap (3); 60 private int status = Status.STATUS_NO_TRANSACTION; 61 private Object logMark; 62 63 private final Map resources = new HashMap (); 64 private Synchronization interposedSynchronization; 65 private final Map entityManagers = new HashMap (); 66 67 TransactionImpl(XidFactory xidFactory, TransactionLog txnLog, long transactionTimeoutMilliseconds) throws SystemException { 68 this(xidFactory.createXid(), xidFactory, txnLog, transactionTimeoutMilliseconds); 69 } 70 71 TransactionImpl(Xid xid, XidFactory xidFactory, TransactionLog txnLog, long transactionTimeoutMilliseconds) throws SystemException { 72 this.xidFactory = xidFactory; 73 this.txnLog = txnLog; 74 this.xid = xid; 75 this.timeout = transactionTimeoutMilliseconds + TransactionTimer.getCurrentTime(); 76 try { 77 txnLog.begin(xid); 78 } catch (LogException e) { 79 status = Status.STATUS_MARKED_ROLLBACK; 80 SystemException ex = new SystemException ("Error logging begin; transaction marked for roll back)"); 81 ex.initCause(e); 82 throw ex; 83 } 84 status = Status.STATUS_ACTIVE; 85 } 86 87 public TransactionImpl(Xid xid, TransactionLog txLog) { 89 this.xidFactory = null; 90 this.txnLog = txLog; 91 this.xid = xid; 92 status = Status.STATUS_PREPARED; 93 this.timeout = Long.MAX_VALUE; 95 } 96 97 public synchronized int getStatus() { 98 return status; 99 } 100 101 public Object getResource(Object key) { 102 return resources.get(key); 103 } 104 105 public boolean getRollbackOnly() { 106 return status == Status.STATUS_MARKED_ROLLBACK; 107 } 108 109 public Object getTransactionKey() { 110 return xid; 111 } 112 113 public int getTransactionStatus() { 114 return status; 115 } 116 117 public void putResource(Object key, Object value) { 118 if (key == null) { 119 throw new NullPointerException ("You must supply a non-null key for putResource"); 120 } 121 resources.put(key, value); 122 } 123 124 public void registerInterposedSynchronization(Synchronization synchronization) { 125 interposedSynchronization = synchronization; 126 } 127 128 public synchronized void setRollbackOnly() throws IllegalStateException { 129 switch (status) { 130 case Status.STATUS_ACTIVE: 131 case Status.STATUS_PREPARING: 132 status = Status.STATUS_MARKED_ROLLBACK; 133 break; 134 case Status.STATUS_MARKED_ROLLBACK: 135 case Status.STATUS_ROLLING_BACK: 136 break; 138 default: 139 throw new IllegalStateException ("Cannot set rollback only, status is " + getStateString(status)); 140 } 141 } 142 143 public synchronized void registerSynchronization(Synchronization synch) throws IllegalStateException , RollbackException , SystemException { 144 if (synch == null) { 145 throw new IllegalArgumentException ("Synchronization is null"); 146 } 147 switch (status) { 148 case Status.STATUS_ACTIVE: 149 case Status.STATUS_PREPARING: 150 break; 151 case Status.STATUS_MARKED_ROLLBACK: 152 throw new RollbackException ("Transaction is marked for rollback"); 153 default: 154 throw new IllegalStateException ("Status is " + getStateString(status)); 155 } 156 syncList.add(synch); 157 } 158 159 public synchronized boolean enlistResource(XAResource xaRes) throws IllegalStateException , RollbackException , SystemException { 160 if (xaRes == null) { 161 throw new IllegalArgumentException ("XAResource is null"); 162 } 163 switch (status) { 164 case Status.STATUS_ACTIVE: 165 break; 166 case Status.STATUS_MARKED_ROLLBACK: 167 throw new RollbackException ("Transaction is marked for rollback"); 168 default: 169 throw new IllegalStateException ("Status is " + getStateString(status)); 170 } 171 172 if (activeXaResources.containsKey(xaRes)) { 173 throw new IllegalStateException ("xaresource: " + xaRes + " is already enlisted!"); 174 } 175 176 try { 177 TransactionBranch manager = (TransactionBranch) suspendedXaResources.remove(xaRes); 178 if (manager != null) { 179 xaRes.start(manager.getBranchId(), XAResource.TMRESUME); 181 activeXaResources.put(xaRes, manager); 182 return true; 183 } 184 for (Iterator i = resourceManagers.iterator(); i.hasNext();) { 186 manager = (TransactionBranch) i.next(); 187 boolean sameRM; 188 if (xaRes == manager.getCommitter()) { 190 throw new IllegalStateException ("xaRes " + xaRes + " is a committer but is not active or suspended"); 191 } 192 try { 194 sameRM = xaRes.isSameRM(manager.getCommitter()); 195 } catch (XAException e) { 196 log.warn("Unexpected error checking for same RM", e); 197 continue; 198 } 199 if (sameRM) { 200 xaRes.start(manager.getBranchId(), XAResource.TMJOIN); 201 activeXaResources.put(xaRes, manager); 202 return true; 203 } 204 } 205 Xid branchId = xidFactory.createBranch(xid, resourceManagers.size() + 1); 207 xaRes.start(branchId, XAResource.TMNOFLAGS); 208 activeXaResources.put(xaRes, addBranchXid(xaRes, branchId)); 209 return true; 210 } catch (XAException e) { 211 log.warn("Unable to enlist XAResource " + xaRes + ", errorCode: " + e.errorCode, e); 212 return false; 213 } 214 } 215 216 public synchronized boolean delistResource(XAResource xaRes, int flag) throws IllegalStateException , SystemException { 217 if (!(flag == XAResource.TMFAIL || flag == XAResource.TMSUCCESS || flag == XAResource.TMSUSPEND)) { 218 throw new IllegalStateException ("invalid flag for delistResource: " + flag); 219 } 220 if (xaRes == null) { 221 throw new IllegalArgumentException ("XAResource is null"); 222 } 223 switch (status) { 224 case Status.STATUS_ACTIVE: 225 case Status.STATUS_MARKED_ROLLBACK: 226 break; 227 default: 228 throw new IllegalStateException ("Status is " + getStateString(status)); 229 } 230 TransactionBranch manager = (TransactionBranch) activeXaResources.remove(xaRes); 231 if (manager == null) { 232 if (flag == XAResource.TMSUSPEND) { 233 throw new IllegalStateException ("trying to suspend an inactive xaresource: " + xaRes); 234 } 235 manager = (TransactionBranch) suspendedXaResources.remove(xaRes); 237 if (manager == null) { 238 throw new IllegalStateException ("Resource not known to transaction: " + xaRes); 239 } 240 } 241 242 try { 243 xaRes.end(manager.getBranchId(), flag); 244 if (flag == XAResource.TMSUSPEND) { 245 suspendedXaResources.put(xaRes, manager); 246 } 247 return true; 248 } catch (XAException e) { 249 log.warn("Unable to delist XAResource " + xaRes + ", error code: " + e.errorCode, e); 250 return false; 251 } 252 } 253 254 public void commit() throws HeuristicMixedException , HeuristicRollbackException , RollbackException , SecurityException , SystemException { 256 beforePrepare(); 257 258 try { 259 boolean timedout = false; 260 if (TransactionTimer.getCurrentTime() > timeout) { 261 status = Status.STATUS_MARKED_ROLLBACK; 262 timedout = true; 263 } 264 265 if (status == Status.STATUS_MARKED_ROLLBACK) { 266 rollbackResources(resourceManagers); 267 if (timedout) { 268 throw new RollbackException ("Transaction timout"); 269 } else { 270 throw new RollbackException ("Unable to commit: transaction marked for rollback"); 271 } 272 } 273 synchronized (this) { 274 if (status == Status.STATUS_ACTIVE) { 275 if (this.resourceManagers.size() == 0) { 276 status = Status.STATUS_COMMITTED; 278 } else if (this.resourceManagers.size() == 1) { 279 status = Status.STATUS_COMMITTING; 281 } else { 282 status = Status.STATUS_PREPARING; 284 } 285 } 286 } 288 289 if (resourceManagers.size() == 0) { 291 synchronized (this) { 292 status = Status.STATUS_COMMITTED; 293 } 294 return; 295 } 296 297 if (resourceManagers.size() == 1) { 299 TransactionBranch manager = (TransactionBranch) resourceManagers.getFirst(); 300 try { 301 manager.getCommitter().commit(manager.getBranchId(), true); 302 synchronized (this) { 303 status = Status.STATUS_COMMITTED; 304 } 305 return; 306 } catch (XAException e) { 307 synchronized (this) { 308 status = Status.STATUS_ROLLEDBACK; 309 } 310 throw (RollbackException ) new RollbackException ("Error during one-phase commit").initCause(e); 311 } 312 } 313 314 boolean willCommit = internalPrepare(); 316 317 if (willCommit) { 319 commitResources(resourceManagers); 320 } else { 321 rollbackResources(resourceManagers); 322 throw new RollbackException ("Unable to commit"); 323 } 324 } finally { 325 afterCompletion(); 326 synchronized (this) { 327 status = Status.STATUS_NO_TRANSACTION; 328 } 329 } 330 } 331 332 int prepare() throws SystemException , RollbackException { 334 beforePrepare(); 335 int result = XAResource.XA_RDONLY; 336 try { 337 LinkedList rms; 338 synchronized (this) { 339 if (status == Status.STATUS_ACTIVE) { 340 if (resourceManagers.size() == 0) { 341 status = Status.STATUS_COMMITTED; 343 return result; 344 } else { 345 status = Status.STATUS_PREPARING; 347 } 348 } 349 rms = resourceManagers; 351 } 352 353 boolean willCommit = internalPrepare(); 354 355 if (willCommit) { 357 if (!rms.isEmpty()) { 358 result = XAResource.XA_OK; 359 } 360 } else { 361 rollbackResources(rms); 362 throw new RollbackException ("Unable to commit"); 363 } 364 } finally { 365 if (result == XAResource.XA_RDONLY) { 366 afterCompletion(); 367 synchronized (this) { 368 status = Status.STATUS_NO_TRANSACTION; 369 } 370 } 371 } 372 return result; 373 } 374 375 void preparedCommit() throws SystemException { 377 try { 378 commitResources(resourceManagers); 379 } finally { 380 afterCompletion(); 381 synchronized (this) { 382 status = Status.STATUS_NO_TRANSACTION; 383 } 384 } 385 } 386 387 private void beforePrepare() { 389 synchronized (this) { 390 switch (status) { 391 case Status.STATUS_ACTIVE: 392 case Status.STATUS_MARKED_ROLLBACK: 393 break; 394 default: 395 throw new IllegalStateException ("Status is " + getStateString(status)); 396 } 397 } 398 399 beforeCompletion(); 400 endResources(); 401 } 402 403 404 private boolean internalPrepare() throws SystemException { 406 407 for (Iterator rms = resourceManagers.iterator(); rms.hasNext();) { 408 synchronized (this) { 409 if (status != Status.STATUS_PREPARING) { 410 break; 412 } 413 } 414 TransactionBranch manager = (TransactionBranch) rms.next(); 415 try { 416 int vote = manager.getCommitter().prepare(manager.getBranchId()); 417 if (vote == XAResource.XA_RDONLY) { 418 rms.remove(); 420 } 421 } catch (XAException e) { 422 synchronized (this) { 423 status = Status.STATUS_MARKED_ROLLBACK; 424 rms.remove(); 427 break; 428 } 429 } 430 } 431 432 boolean willCommit; 434 synchronized (this) { 435 willCommit = (status != Status.STATUS_MARKED_ROLLBACK); 436 if (willCommit) { 437 status = Status.STATUS_PREPARED; 438 } 439 } 440 if (willCommit && !resourceManagers.isEmpty()) { 442 try { 443 logMark = txnLog.prepare(xid, resourceManagers); 444 } catch (LogException e) { 445 try { 446 rollbackResources(resourceManagers); 447 } catch (Exception se) { 448 log.error("Unable to rollback after failure to log prepare", se.getCause()); 449 } 450 throw (SystemException ) new SystemException ("Error logging prepare; transaction was rolled back)").initCause(e); 451 } 452 } 453 return willCommit; 454 } 455 456 public void rollback() throws IllegalStateException , SystemException { 457 List rms; 458 synchronized (this) { 459 switch (status) { 460 case Status.STATUS_ACTIVE: 461 status = Status.STATUS_MARKED_ROLLBACK; 462 break; 463 case Status.STATUS_MARKED_ROLLBACK: 464 break; 465 default: 466 throw new IllegalStateException ("Status is " + getStateString(status)); 467 } 468 rms = resourceManagers; 469 } 470 471 beforeCompletion(); 472 endResources(); 473 try { 474 rollbackResources(rms); 475 if (logMark != null) { 477 try { 478 txnLog.rollback(xid, logMark); 479 } catch (LogException e) { 480 try { 481 rollbackResources(rms); 482 } catch (Exception se) { 483 log.error("Unable to rollback after failure to log decision", se.getCause()); 484 } 485 throw (SystemException ) new SystemException ("Error logging rollback").initCause(e); 486 } 487 } 488 } finally { 489 afterCompletion(); 490 synchronized (this) { 491 status = Status.STATUS_NO_TRANSACTION; 492 } 493 } 494 } 495 496 private void beforeCompletion() { 497 int i = 0; 498 while (true) { 499 Synchronization synch; 500 synchronized (this) { 501 if (i == syncList.size()) { 502 if (interposedSynchronization != null) { 503 synch = interposedSynchronization; 504 i++; 505 } else { 506 return; 507 } 508 } else if (i == syncList.size() + 1) { 509 return; 510 } else { 511 synch = (Synchronization ) syncList.get(i++); 512 } 513 } 514 try { 515 synch.beforeCompletion(); 516 } catch (Exception e) { 517 log.warn("Unexpected exception from beforeCompletion; transaction will roll back", e); 518 synchronized (this) { 519 status = Status.STATUS_MARKED_ROLLBACK; 520 } 521 } 522 } 523 } 524 525 private void afterCompletion() { 526 if (interposedSynchronization != null) { 528 try { 529 interposedSynchronization.afterCompletion(status); 530 } catch (Exception e) { 531 log.warn("Unexpected exception from afterCompletion; continuing", e); 532 } 533 } 534 for (Iterator i = syncList.iterator(); i.hasNext();) { 535 Synchronization synch = (Synchronization ) i.next(); 536 try { 537 synch.afterCompletion(status); 538 } catch (Exception e) { 539 log.warn("Unexpected exception from afterCompletion; continuing", e); 540 continue; 541 } 542 } 543 for (Iterator i = entityManagers.values().iterator(); i.hasNext();) { 544 Closeable entityManager = (Closeable) i.next(); 545 entityManager.close(); 546 } 547 } 548 549 private void endResources() { 550 endResources(activeXaResources); 551 endResources(suspendedXaResources); 552 } 553 554 private void endResources(IdentityHashMap resourceMap) { 555 while (true) { 556 XAResource xaRes; 557 TransactionBranch manager; 558 int flags; 559 synchronized (this) { 560 Set entrySet = resourceMap.entrySet(); 561 if (entrySet.isEmpty()) { 562 return; 563 } 564 Map.Entry entry = (Map.Entry ) entrySet.iterator().next(); 565 xaRes = (XAResource ) entry.getKey(); 566 manager = (TransactionBranch) entry.getValue(); 567 flags = (status == Status.STATUS_MARKED_ROLLBACK) ? XAResource.TMFAIL : XAResource.TMSUCCESS; 568 resourceMap.remove(xaRes); 569 } 570 try { 571 xaRes.end(manager.getBranchId(), flags); 572 } catch (XAException e) { 573 log.warn("Error ending association for XAResource " + xaRes + "; transaction will roll back. XA error code: " + e.errorCode, e); 574 synchronized (this) { 575 status = Status.STATUS_MARKED_ROLLBACK; 576 } 577 } 578 } 579 } 580 581 private void rollbackResources(List rms) throws SystemException { 582 SystemException cause = null; 583 synchronized (this) { 584 status = Status.STATUS_ROLLING_BACK; 585 } 586 for (Iterator i = rms.iterator(); i.hasNext();) { 587 TransactionBranch manager = (TransactionBranch) i.next(); 588 try { 589 manager.getCommitter().rollback(manager.getBranchId()); 590 } catch (XAException e) { 591 log.error("Unexpected exception rolling back " + manager.getCommitter() + "; continuing with rollback", e); 592 if (cause == null) { 593 cause = new SystemException (e.errorCode); 594 } 595 continue; 596 } 597 } 598 synchronized (this) { 599 status = Status.STATUS_ROLLEDBACK; 600 } 601 if (cause != null) { 602 throw cause; 603 } 604 } 605 606 private void commitResources(List rms) throws SystemException { 607 SystemException cause = null; 608 synchronized (this) { 609 status = Status.STATUS_COMMITTING; 610 } 611 for (Iterator i = rms.iterator(); i.hasNext();) { 612 TransactionBranch manager = (TransactionBranch) i.next(); 613 try { 614 manager.getCommitter().commit(manager.getBranchId(), false); 615 } catch (XAException e) { 616 log.error("Unexpected exception committing" + manager.getCommitter() + "; continuing to commit other RMs", e); 617 if (cause == null) { 618 cause = new SystemException (e.errorCode); 619 } 620 continue; 621 } 622 } 623 if (!rms.isEmpty()) { 625 try { 626 txnLog.commit(xid, logMark); 627 } catch (LogException e) { 628 log.error("Unexpected exception logging commit completion for xid " + xid, e); 629 throw (SystemException ) new SystemException ("Unexpected error logging commit completion for xid " + xid).initCause(e); 630 } 631 } 632 synchronized (this) { 633 status = Status.STATUS_COMMITTED; 634 } 635 if (cause != null) { 636 throw cause; 637 } 638 } 639 640 private static String getStateString(int status) { 641 switch (status) { 642 case Status.STATUS_ACTIVE: 643 return "STATUS_ACTIVE"; 644 case Status.STATUS_PREPARING: 645 return "STATUS_PREPARING"; 646 case Status.STATUS_PREPARED: 647 return "STATUS_PREPARED"; 648 case Status.STATUS_MARKED_ROLLBACK: 649 return "STATUS_MARKED_ROLLBACK"; 650 case Status.STATUS_ROLLING_BACK: 651 return "STATUS_ROLLING_BACK"; 652 case Status.STATUS_COMMITTING: 653 return "STATUS_COMMITTING"; 654 case Status.STATUS_COMMITTED: 655 return "STATUS_COMMITTED"; 656 case Status.STATUS_ROLLEDBACK: 657 return "STATUS_ROLLEDBACK"; 658 case Status.STATUS_NO_TRANSACTION: 659 return "STATUS_NO_TRANSACTION"; 660 case Status.STATUS_UNKNOWN: 661 return "STATUS_UNKNOWN"; 662 default: 663 throw new AssertionError (); 664 } 665 } 666 667 public boolean equals(Object obj) { 668 if (obj instanceof TransactionImpl) { 669 TransactionImpl other = (TransactionImpl) obj; 670 return xid.equals(other.xid); 671 } else { 672 return false; 673 } 674 } 675 676 public TransactionBranch addBranchXid(XAResource xaRes, Xid branchId) { 679 TransactionBranch manager = new TransactionBranch(xaRes, branchId); 680 resourceManagers.add(manager); 681 return manager; 682 } 683 684 public Object getEntityManager(String persistenceUnit) { 685 return entityManagers.get(persistenceUnit); 686 } 687 688 public void setEntityManager(String persistenceUnit, Object entityManager) { 689 Object oldEntityManager = entityManagers.put(persistenceUnit, entityManager); 690 if (oldEntityManager != null) { 691 throw new EJBException ("EntityManager " + oldEntityManager + " for persistenceUnit " + persistenceUnit + " already associated with this transaction " + xid); 692 } 693 } 694 695 private static class TransactionBranch implements TransactionBranchInfo { 696 private final XAResource committer; 697 private final Xid branchId; 698 699 public TransactionBranch(XAResource xaRes, Xid branchId) { 700 committer = xaRes; 701 this.branchId = branchId; 702 } 703 704 public XAResource getCommitter() { 705 return committer; 706 } 707 708 public Xid getBranchId() { 709 return branchId; 710 } 711 712 public String getResourceName() { 713 if (committer instanceof NamedXAResource) { 714 return ((NamedXAResource) committer).getName(); 715 } else { 716 throw new IllegalStateException ("Cannot log transactions unles XAResources are named! " + committer); 717 } 718 } 719 720 public Xid getBranchXid() { 721 return branchId; 722 } 723 } 724 725 726 } 727 | Popular Tags |