1 6 7 package org.jfox.tm; 8 9 import java.util.ArrayList ; 10 import java.util.Iterator ; 11 import java.util.List ; 12 import java.util.TimerTask ; 13 import javax.transaction.HeuristicMixedException ; 14 import javax.transaction.HeuristicRollbackException ; 15 import javax.transaction.RollbackException ; 16 import javax.transaction.Status ; 17 import javax.transaction.Synchronization ; 18 import javax.transaction.SystemException ; 19 import javax.transaction.Transaction ; 20 import javax.transaction.xa.XAException ; 21 import javax.transaction.xa.XAResource ; 22 import javax.transaction.xa.Xid ; 23 24 import org.jfox.ioc.logger.Logger; 25 26 27 28 29 36 37 class TransactionImpl extends TimerTask implements Transaction { 38 private static Logger logger = Logger.getLogger(TransactionImpl.class.getName()); 39 private int status = Status.STATUS_NO_TRANSACTION; 40 41 private Xid xid = null; 43 private static int branchCount = 0; 45 46 private List syncs = new ArrayList (); 48 private List enlistedXARes = new ArrayList (); 50 private List suspendedXARes = new ArrayList (); 52 53 public TransactionImpl() { 54 status = Status.STATUS_ACTIVE; 55 xid = new XidImpl(); 56 logger.debug(new StringBuffer () 57 .append("begin a new Transaction, tx= ") 58 .append(this.toString()) 59 .append(" status=") 60 .append(StatusHelper.toString(status)) 61 .toString()); 62 } 63 64 71 public synchronized boolean delistResource(XAResource xaResource, int flag) throws IllegalStateException , SystemException { 72 logger.debug(new StringBuffer () 73 .append("Transaction.delistResource(),flag = " + flag + " tx= ") 74 .append(this.toString()) 75 .append(" status=") 76 .append(StatusHelper.toString(status)) 77 .toString()); 78 79 if(xaResource == null) throw new IllegalArgumentException ("null xaRes"); 80 if(flag != XAResource.TMSUCCESS && 81 flag != XAResource.TMSUSPEND && 82 flag != XAResource.TMFAIL) 83 throw new IllegalArgumentException ("wrong flag, flag must be one of XAResource.TMSUCCESS, XAResource.TMSUSPEND, XAResource.TMFAIL"); 84 85 KeyXAResource keyXAres = new KeyXAResource(xaResource); 86 int index = enlistedXARes.indexOf(keyXAres); 87 if(index < 0) { 88 throw new IllegalArgumentException ("xaResource not enlisted"); 89 } 90 switch(status) { 91 case Status.STATUS_ACTIVE: 92 case Status.STATUS_MARKED_ROLLBACK: 93 break; 94 case Status.STATUS_PREPARING: 95 throw new IllegalStateException ("Already started preparing."); 96 case Status.STATUS_ROLLING_BACK: 97 throw new IllegalStateException ("Already started rolling back."); 98 case Status.STATUS_PREPARED: 99 throw new IllegalStateException ("Already prepared."); 100 case Status.STATUS_COMMITTING: 101 throw new IllegalStateException ("Already started committing."); 102 case Status.STATUS_COMMITTED: 103 throw new IllegalStateException ("Already committed."); 104 case Status.STATUS_ROLLEDBACK: 105 throw new IllegalStateException ("Already rolled back."); 106 case Status.STATUS_NO_TRANSACTION: 107 throw new IllegalStateException ("No Transaction."); 108 case Status.STATUS_UNKNOWN: 109 throw new IllegalStateException ("Unknown state"); 110 default: 111 throw new IllegalStateException ("Illegal status: " + StatusHelper.toString(status)); 112 } 113 114 keyXAres = (KeyXAResource) enlistedXARes.get(index); 115 try { 116 logger.debug("delist resource " + keyXAres.getXid() + " with flag " + flag); 117 xaResource.end(keyXAres.getXid(), flag); 118 enlistedXARes.remove(index); 119 return true; 120 } 121 catch(XAException e) { 122 logger.warn(e); 123 status = Status.STATUS_MARKED_ROLLBACK; 124 return false; 125 } 126 } 127 128 public synchronized boolean enlistResource(XAResource xaResource) throws RollbackException , IllegalStateException , SystemException { 129 logger.debug(new StringBuffer () 130 .append("Transaction.enlistResource(), tx= ") 131 .append(this.toString()) 132 .append(" status=") 133 .append(StatusHelper.toString(status)) 134 .toString()); 135 if(xaResource == null) throw new IllegalArgumentException ("null xaRes"); 136 switch(status) { 137 case Status.STATUS_ACTIVE: 138 case Status.STATUS_PREPARING: 139 break; 140 case Status.STATUS_PREPARED: 141 throw new IllegalStateException ("Transaction already prepared."); 142 case Status.STATUS_COMMITTING: 143 throw new IllegalStateException ("Transaction already started committing."); 144 case Status.STATUS_COMMITTED: 145 throw new IllegalStateException ("Transaction already committed."); 146 case Status.STATUS_MARKED_ROLLBACK: 147 throw new RollbackException ("Transaction already marked for rollback"); 148 case Status.STATUS_ROLLING_BACK: 149 throw new RollbackException ("Transaction already started rolling back."); 150 case Status.STATUS_ROLLEDBACK: 151 throw new RollbackException ("Transaction already rolled back."); 152 case Status.STATUS_NO_TRANSACTION: 153 throw new IllegalStateException ("No current Transaction."); 154 case Status.STATUS_UNKNOWN: 155 throw new IllegalStateException ("Unknown Transaction status"); 156 default : 157 throw new IllegalStateException ("Illegal Transaction status : " + StatusHelper.toString(status)); 158 } 159 160 int flag = XAResource.TMNOFLAGS; 161 162 KeyXAResource keyXAres = new KeyXAResource(xaResource); 163 int index = enlistedXARes.indexOf(keyXAres); 164 if(index >= 0) { 165 keyXAres = (KeyXAResource) enlistedXARes.get(index); 166 169 if(keyXAres.getXaResource() == xaResource) { 170 return false; 171 } 172 flag = XAResource.TMJOIN; 173 } 174 else if((index = suspendedXARes.indexOf(keyXAres)) >= 0) { flag = XAResource.TMRESUME; 176 keyXAres = (KeyXAResource) suspendedXARes.get(index); 177 suspendedXARes.remove(index); 178 enlistedXARes.add(keyXAres); 179 } 180 else { 181 enlistedXARes.add(keyXAres); 182 } 183 try { 184 logger.debug("resource start new branch Transaction " + keyXAres.getXid() + " with flag " + flag); 185 xaResource.start(keyXAres.getXid(), flag); 186 return true; 187 } 188 catch(XAException e) { 189 logger.warn(e); 190 return false; 191 } 192 } 193 194 public int getStatus() throws SystemException { 195 return status; 196 } 197 198 public synchronized void registerSynchronization(Synchronization synchronization) throws RollbackException , IllegalStateException , SystemException { 199 logger.debug(new StringBuffer () 200 .append("Transaction.registerSynchronization(), tx= ") 201 .append(this.toString()) 202 .append(" status=") 203 .append(StatusHelper.toString(status)) 204 .toString()); 205 if(synchronization == null) throw new IllegalArgumentException ("Null synchronization"); 206 switch(status) { 207 case Status.STATUS_ACTIVE: 208 case Status.STATUS_PREPARING: 209 case Status.STATUS_MARKED_ROLLBACK: 210 break; 211 case Status.STATUS_PREPARED: 212 throw new IllegalStateException ("Already prepared."); 213 case Status.STATUS_COMMITTING: 214 throw new IllegalStateException ("Already started committing."); 215 case Status.STATUS_COMMITTED: 216 throw new IllegalStateException ("Already committed."); 217 case Status.STATUS_ROLLING_BACK: 218 throw new RollbackException ("Already started rolling back."); 219 case Status.STATUS_ROLLEDBACK: 220 throw new RollbackException ("Already rolled back."); 221 case Status.STATUS_NO_TRANSACTION: 222 throw new IllegalStateException ("No Transaction."); 223 case Status.STATUS_UNKNOWN: 224 throw new IllegalStateException ("Unknown state"); 225 default: 226 throw new IllegalStateException ("Illegal status: " + StatusHelper.toString(status)); 227 } 228 syncs.add(synchronization); 229 } 230 231 public synchronized void commit() throws RollbackException , HeuristicMixedException , HeuristicRollbackException , SecurityException , SystemException { 232 logger.debug(new StringBuffer () 233 .append("Transaction.commit(), tx= ") 234 .append(this.toString()) 235 .append(" status=") 236 .append(StatusHelper.toString(status)) 237 .toString()); 238 239 switch(status) { 240 case Status.STATUS_PREPARING: 241 throw new IllegalStateException ("Already started preparing."); 242 case Status.STATUS_PREPARED: 243 throw new IllegalStateException ("Already prepared."); 244 case Status.STATUS_ROLLING_BACK: 245 throw new IllegalStateException ("Already started rolling back."); 246 case Status.STATUS_ROLLEDBACK: 247 throw new IllegalStateException ("Already rolled back."); 248 case Status.STATUS_COMMITTING: 249 throw new IllegalStateException ("Already started committing."); 250 case Status.STATUS_COMMITTED: 251 throw new IllegalStateException ("Already committed."); 252 case Status.STATUS_NO_TRANSACTION: 253 throw new IllegalStateException ("No Transaction."); 254 case Status.STATUS_UNKNOWN: 255 throw new IllegalStateException ("Unknown state"); 256 case Status.STATUS_MARKED_ROLLBACK: 257 this.rollback(); 258 throw new RollbackException ("Already marked for rollback"); 259 case Status.STATUS_ACTIVE: 260 break; 261 default: 262 throw new IllegalStateException ("Illegal status: " + StatusHelper.toString(status)); 263 } 264 265 doBeforeCompletion(); 267 try { 268 if(status == Status.STATUS_ACTIVE) { 269 if(enlistedXARes.size() == 0) { 270 status = Status.STATUS_COMMITTED; 272 } 273 else if(enlistedXARes.size() == 1) { 274 doOnePhaseCommit(); 275 } 276 else { doTwoPhaseCommint(); 278 } 279 } 280 else if(status == Status.STATUS_MARKED_ROLLBACK) { 281 this.rollback(); 282 } 283 } 284 finally { 285 doAfterCompletion(); 286 enlistedXARes.clear(); 287 } 288 } 289 290 private void doOnePhaseCommit() throws RollbackException { 291 status = Status.STATUS_COMMITTING; 292 KeyXAResource keyXARes = (KeyXAResource) enlistedXARes.get(0); 293 try { 294 endResource(keyXARes, XAResource.TMSUCCESS); 295 } 296 catch(XAException e) { 297 logger.warn("end resource error, to be roll back", e); 298 status = Status.STATUS_MARKED_ROLLBACK; 299 } 300 301 if(status == Status.STATUS_MARKED_ROLLBACK) { 302 try { 303 status = Status.STATUS_ROLLING_BACK; 304 keyXARes.getXaResource().rollback(keyXARes.getXid()); 305 status = Status.STATUS_ROLLEDBACK; 306 } 307 catch(XAException e) { 308 logger.warn(e); 309 } 310 throw new RollbackException ("commit failed, already rolled back"); 311 } 312 else { 313 try { 314 keyXARes.getXaResource().commit(keyXARes.getXid(), true); 315 } 316 catch(XAException e) { 317 logger.warn(e); 318 } 319 } 320 status = Status.STATUS_COMMITTED; 321 } 322 323 327 private void doTwoPhaseCommint() throws 328 HeuristicMixedException , 329 HeuristicRollbackException , 330 RollbackException { 331 logger.debug("start two phase commit"); 332 status = Status.STATUS_PREPARING; 333 int heuristicCode = XAException.XA_RETRY; 334 int prepareStatus = XAResource.XA_RDONLY; 337 for(Iterator it = enlistedXARes.iterator(); it.hasNext();) { 338 KeyXAResource keyXARes = (KeyXAResource) it.next(); 339 try { 340 logger.debug("prepare " + keyXARes.getXid()); 341 endResource(keyXARes, XAResource.TMSUCCESS); 342 int xaStatus = keyXARes.getXaResource().prepare(keyXARes.getXid()); 343 if(xaStatus == XAResource.XA_OK) { 344 prepareStatus = XAResource.XA_OK; 345 } 346 else if(xaStatus == XAResource.XA_RDONLY) { 347 } 348 else { status = Status.STATUS_MARKED_ROLLBACK; 350 break; 351 } 352 } 353 catch(XAException e) { 354 logger.warn(e); 356 prepareStatus = XAResource.XA_OK; 357 switch(e.errorCode) { 358 case XAException.XA_HEURCOM: 359 heuristicCode = decideHeuristic(heuristicCode, e.errorCode); 361 break; 362 case XAException.XA_HEURRB: 363 case XAException.XA_HEURMIX: 364 case XAException.XA_HEURHAZ: 365 heuristicCode = decideHeuristic(heuristicCode, e.errorCode); 366 if(status == Status.STATUS_PREPARING) 367 status = Status.STATUS_MARKED_ROLLBACK; 368 break; 369 default: 370 if(status == Status.STATUS_PREPARING) 371 status = Status.STATUS_MARKED_ROLLBACK; 372 break; 373 } 374 try { 375 keyXARes.getXaResource().forget(keyXARes.getXid()); 376 } 377 catch(XAException ex) { 378 logger.warn(e); 379 } 380 } 381 catch(Throwable e) { 382 logger.warn(e); 383 status = Status.STATUS_MARKED_ROLLBACK; 384 } 385 } 386 status = Status.STATUS_PREPARED; 388 389 if(prepareStatus == XAResource.XA_RDONLY) { status = Status.STATUS_COMMITTED; 391 } 392 else { 394 if(heuristicCode == XAException.XA_RETRY || heuristicCode == XAException.XA_HEURCOM) { 396 status = Status.STATUS_COMMITTING; 397 List temp = new ArrayList (enlistedXARes); 398 for(Iterator it = temp.iterator(); it.hasNext();) { 399 KeyXAResource keyXARes = (KeyXAResource) it.next(); 400 try { 401 keyXARes.getXaResource().commit(keyXARes.getXid(), false); 402 } 403 catch(XAException e) { 404 logger.warn(e); 405 switch(e.errorCode) { 406 case XAException.XA_HEURRB: 407 case XAException.XA_HEURCOM: 408 case XAException.XA_HEURMIX: 409 case XAException.XA_HEURHAZ: 410 heuristicCode = decideHeuristic(heuristicCode, e.errorCode); 411 break; 412 default: 413 break; 414 } 415 try { 416 keyXARes.getXaResource().forget(keyXARes.getXid()); 417 } 418 catch(XAException ex) { 419 ex.printStackTrace(); 420 } 421 } 422 catch(Throwable t) { 423 t.printStackTrace(); 424 } 425 } 426 status = Status.STATUS_COMMITTED; 427 checkHeuristics(heuristicCode); 428 } 429 else { 431 status = Status.STATUS_ROLLING_BACK; 432 List temp = new ArrayList (enlistedXARes); 433 for(Iterator it = temp.iterator(); it.hasNext();) { 434 KeyXAResource keyXARes = (KeyXAResource) it.next(); 435 try { 436 keyXARes.getXaResource().rollback(keyXARes.getXid()); 437 } 438 catch(XAException e) { 439 logger.warn(e); 440 switch(e.errorCode) { 442 case XAException.XA_HEURRB: 443 case XAException.XA_HEURCOM: 444 case XAException.XA_HEURMIX: 445 case XAException.XA_HEURHAZ: 446 decideHeuristic(heuristicCode, e.errorCode); 447 break; 448 default: 449 break; 450 } 451 try { 452 keyXARes.getXaResource().forget(keyXARes.getXid()); 453 } 454 catch(XAException ex) { 455 ex.printStackTrace(); 456 } 457 } 458 catch(Throwable t) { 459 t.printStackTrace(); 460 } 461 } 462 status = Status.STATUS_ROLLEDBACK; 463 checkHeuristics(heuristicCode); 464 throw new RollbackException ("Unable to commit because of heuristic code, tx=" + toString()); 465 } 466 } 467 } 468 469 476 private int decideHeuristic(int heuristicCode, int xaExceptionCode) { 477 switch(xaExceptionCode) { 478 case XAException.XA_HEURMIX: 479 heuristicCode = XAException.XA_HEURMIX; 480 break; 481 case XAException.XA_HEURRB: 482 if(heuristicCode == XAException.XA_RETRY) 483 heuristicCode = XAException.XA_HEURRB; 484 else if(heuristicCode == XAException.XA_HEURCOM || 485 heuristicCode == XAException.XA_HEURHAZ) 486 heuristicCode = XAException.XA_HEURMIX; 487 break; 488 case XAException.XA_HEURCOM: 489 if(heuristicCode == XAException.XA_RETRY) 490 heuristicCode = XAException.XA_HEURCOM; 491 else if(heuristicCode == XAException.XA_HEURRB || 492 heuristicCode == XAException.XA_HEURHAZ) 493 heuristicCode = XAException.XA_HEURMIX; 494 break; 495 case XAException.XA_HEURHAZ: 496 if(heuristicCode == XAException.XA_RETRY) 497 heuristicCode = XAException.XA_HEURHAZ; 498 else if(heuristicCode == XAException.XA_HEURCOM || 499 heuristicCode == XAException.XA_HEURRB) 500 heuristicCode = XAException.XA_HEURMIX; 501 break; 502 default: 503 throw new IllegalArgumentException (); 504 } 505 return heuristicCode; 506 } 507 508 515 private void checkHeuristics(int heuristicCode) throws HeuristicMixedException , 516 HeuristicRollbackException { 517 switch(heuristicCode) { 518 case XAException.XA_HEURHAZ: 519 case XAException.XA_HEURMIX: 520 throw new HeuristicMixedException (); 521 case XAException.XA_HEURRB: 522 throw new HeuristicRollbackException (); 523 case XAException.XA_HEURCOM: 524 return; 525 } 526 } 527 528 529 public synchronized void rollback() throws IllegalStateException , SystemException { 530 logger.debug(new StringBuffer () 531 .append("Transaction.rollback(), tx= ") 532 .append(this.toString()) 533 .append(" status=") 534 .append(StatusHelper.toString(status)) 535 .toString()); 536 537 switch(status) { 538 case Status.STATUS_ACTIVE: 539 case Status.STATUS_MARKED_ROLLBACK: 540 break; 541 case Status.STATUS_ROLLEDBACK: 542 logger.warn("Transaction.rollback(): already rolled back"); 543 return; 544 default : 545 logger.error("Transaction.rollback(): bad status"); 546 throw new IllegalStateException ("Cannot rollback(), " + 547 "tx=" + toString() + 548 " status=" + 549 StatusHelper.toString(status)); 550 } 551 552 doBeforeCompletion(); 553 status = Status.STATUS_ROLLING_BACK; 554 List temp = new ArrayList (enlistedXARes); 555 int size = temp.size(); 556 for(int i = 0; i < size; i++) { 557 KeyXAResource keyXARes = ((KeyXAResource) temp.get(i)); 558 try { 559 endResource(keyXARes, XAResource.TMSUCCESS); 560 keyXARes.getXaResource().rollback(keyXARes.getXid()); 561 } 562 catch(XAException e) { 563 logger.error("rollback XAResource " + keyXARes.getXaResource() + " failed.", e); 564 } 565 } 566 status = Status.STATUS_ROLLEDBACK; 567 doAfterCompletion(); 568 } 569 570 public synchronized void setRollbackOnly() throws IllegalStateException , SystemException { 571 logger.debug(new StringBuffer () 572 .append("Transaction.setRollbackOnly(), tx= ") 573 .append(this.toString()) 574 .append(" status=") 575 .append(StatusHelper.toString(status)) 576 .toString()); 577 switch(status) { 578 case Status.STATUS_ACTIVE: 579 case Status.STATUS_PREPARING: 580 case Status.STATUS_PREPARED: 581 status = Status.STATUS_MARKED_ROLLBACK; 582 return; 583 case Status.STATUS_MARKED_ROLLBACK: 584 case Status.STATUS_ROLLING_BACK: 585 return; 586 case Status.STATUS_COMMITTING: 587 throw new IllegalStateException ("started committing."); 588 case Status.STATUS_COMMITTED: 589 throw new IllegalStateException ("already committed."); 590 case Status.STATUS_ROLLEDBACK: 591 throw new IllegalStateException ("already rolled back."); 592 case Status.STATUS_NO_TRANSACTION: 593 throw new IllegalStateException ("no Transaction."); 594 case Status.STATUS_UNKNOWN: 595 throw new IllegalStateException ("unknown state"); 596 default: 597 throw new IllegalStateException ("Illegal status: " + StatusHelper.toString(status)); 598 } 599 } 600 601 public String toString() { 602 return "Transaction [" + xid.toString() + "]"; 603 } 604 605 608 public synchronized void run() { 609 timeout(); 610 } 611 612 void timeout() { 613 try { 614 logger.debug(new StringBuffer () 615 .append("Transaction timeout, tx= ") 616 .append(this.toString()) 617 .append(" status=") 618 .append(StatusHelper.toString(status)) 619 .toString()); 620 621 this.setRollbackOnly(); 622 } 623 catch(Exception e) { 624 logger.warn(e.getMessage(), e); 625 } 626 } 627 628 private void doBeforeCompletion() { 629 try { 630 for(int i = 0; i < syncs.size(); i++) { 631 ((Synchronization ) syncs.get(i)).beforeCompletion(); 632 } 633 } 634 catch(Throwable t) { 635 logger.warn("Synchronization.beforeCompletion()", t); 636 status = Status.STATUS_MARKED_ROLLBACK; 637 } 638 } 639 640 private void doAfterCompletion() { 641 try { 642 for(int i = 0; i < syncs.size(); i++) { 643 ((Synchronization ) syncs.get(i)).afterCompletion(status); 644 } 645 } 646 catch(Throwable t) { 647 logger.warn("Synchronization.afterCompletion()", t); 648 } 649 } 650 651 656 synchronized void suspend() throws SystemException { 657 logger.debug(new StringBuffer () 658 .append("Transaction.suspend(), tx= ") 659 .append(this.toString()) 660 .append(" status=") 661 .append(StatusHelper.toString(status)) 662 .toString()); 663 664 suspendedXARes.addAll(enlistedXARes); 665 for(int i = 0; i < suspendedXARes.size(); i++) { 666 delistResource(((KeyXAResource) suspendedXARes.get(i)).getXaResource(), XAResource.TMSUSPEND); 667 } 668 enlistedXARes.clear(); 669 } 670 671 677 synchronized void resume() throws SystemException , RollbackException { 678 logger.debug(new StringBuffer () 679 .append("Transaction.resume(), tx= ") 680 .append(this.toString()) 681 .append(" status=") 682 .append(StatusHelper.toString(status)) 683 .toString()); 684 List temp = new ArrayList (suspendedXARes); 685 for(int i = 0; i < temp.size(); i++) { 686 enlistResource(((KeyXAResource) temp.get(i)).getXaResource()); 687 } 688 suspendedXARes.clear(); 689 } 690 691 692 private void endResource(KeyXAResource keyXAres, int flag) throws XAException { 693 logger.debug(new StringBuffer () 694 .append("XAResource.end(), tx= ") 695 .append(this.toString()) 696 .append(" status=") 697 .append(StatusHelper.toString(status)) 698 .append(" flag = " + flag) 699 .toString()); 700 701 keyXAres.getXaResource().end(keyXAres.getXid(), flag); 702 if(flag == XAResource.TMFAIL) status = Status.STATUS_MARKED_ROLLBACK; 703 enlistedXARes.remove(keyXAres); 704 } 705 706 public static void main(String [] args) { 707 708 } 709 710 715 private class KeyXAResource { 716 private XAResource xaRes = null; 717 private Xid branchXid = null; 718 719 public KeyXAResource(XAResource xaRes) { 720 this.xaRes = xaRes; 721 } 722 723 public synchronized Xid getXid() { 724 if(branchXid == null) { 725 branchXid = XidImpl.createBranchXid(xid, branchCount++); 726 } 727 return branchXid; 728 } 729 730 public XAResource getXaResource() { 731 return xaRes; 732 } 733 734 public int hashCode() { 735 return 0; 736 } 737 738 public boolean equals(Object obj) { 739 if(!(obj instanceof KeyXAResource)) { 740 return false; 741 } 742 else { 743 try { 744 return ((KeyXAResource) obj).getXaResource().isSameRM(xaRes); 745 } 746 catch(XAException e) { 747 logger.warn(e); 748 return false; 749 } 750 } 751 } 752 } 753 } 754 755 | Popular Tags |