1 46 package org.mr.api.jms; 47 48 import java.io.ByteArrayInputStream ; 49 import java.io.ByteArrayOutputStream ; 50 import java.io.IOException ; 51 import java.io.ObjectInputStream ; 52 import java.io.ObjectOutputStream ; 53 import java.util.ArrayList ; 54 import java.util.Iterator ; 55 import java.util.Collection ; 56 57 import javax.jms.IllegalStateException ; 58 import javax.jms.Session ; 59 import javax.jms.JMSException ; 60 import javax.jms.TransactionInProgressException ; 61 import javax.jms.TransactionRolledBackException ; 62 import javax.transaction.xa.XAException ; 63 import javax.transaction.xa.XAResource ; 64 import javax.transaction.xa.Xid ; 65 66 import org.apache.commons.logging.Log; 67 import org.apache.commons.logging.LogFactory; 68 import org.mr.MantaAgent; 69 import org.mr.core.persistent.PersistentMap; 70 import org.mr.core.protocol.MantaBusMessage; 71 import org.mr.core.util.byteable.ByteableByteArray; 72 73 96 public class TransactionContext implements XAResource { 97 98 private static final Log log = LogFactory.getLog(TransactionContext.class); 99 100 static final int SUSPENDED = 1; 101 static final int SUCCESS = 2; 102 static final int ROLLBACK_ONLY = 3; 103 static final int PREPARED = 4; 104 105 private ArrayList sessions = new ArrayList (2); 106 private LocalTransactionIDGenerator idGenerator; 107 108 protected PersistentMap transactionsTable; 109 110 private Xid associatedXid; 112 113 private String localTransactionId; 115 116 private String resourceName; 117 118 private LocalTransactionEventListener localTransactionEventListener; 119 120 public TransactionContext() { 122 transactionsTable = new PersistentMap("xacs", false, true); 123 idGenerator = new LocalTransactionIDGenerator(MantaAgent.getInstance().getMessageId()); 124 resourceName = MantaAgent.getInstance().getAgentName(); 125 } 126 127 public boolean isInXATransaction() { 128 return associatedXid != null; 129 } 130 131 public boolean isInLocalTransaction() { 132 return localTransactionId != null; 133 } 134 135 136 139 public LocalTransactionEventListener getLocalTransactionEventListener() { 140 return localTransactionEventListener; 141 } 142 143 148 public void setLocalTransactionEventListener(LocalTransactionEventListener localTransactionEventListener) { 149 this.localTransactionEventListener = localTransactionEventListener; 150 } 151 152 public String getResourceName() { 153 return this.resourceName; 154 } 155 156 public void addSession(MantaSession session) { 162 sessions.add(session); 163 } 164 165 166 public void removeSession(MantaSession session) { 167 sessions.remove(session); 168 } 169 170 171 public Object getTransactionId() { 172 if (localTransactionId != null) 173 return localTransactionId; 174 return associatedXid; 175 } 176 177 183 186 public void begin() throws JMSException { 187 Iterator i = sessions.iterator(); 188 while (i.hasNext()) { 189 MantaSession session = (MantaSession)i.next(); 190 session.checkLegalOperation(); 191 if (session.sessionAcknowledgementMode != Session.SESSION_TRANSACTED) 192 throw new IllegalStateException ("MNJMS00175 : FAILED ON METHOD begin(). SESSION IS NOT TRANSACTED."); 193 } 194 195 if (associatedXid != null) 196 throw new TransactionInProgressException ("Cannot start local transction. XA transaction is already in progress."); 197 198 if (localTransactionId == null) { 199 localTransactionId = idGenerator.nextID(); 200 201 if (localTransactionEventListener != null) { 203 localTransactionEventListener.beginEvent(localTransactionId); 204 } 205 if( log.isDebugEnabled() ) 206 log.debug("Started local transaction: "+localTransactionId); 207 } 208 } 209 210 216 public void rollback() throws JMSException { 217 Iterator i = sessions.iterator(); 218 while (i.hasNext()) { 219 MantaSession session = (MantaSession)i.next(); 220 session.checkLegalOperation(); 221 if (session.sessionAcknowledgementMode != Session.SESSION_TRANSACTED) 222 throw new IllegalStateException ("MNJMS00075 : FAILED ON METHOD rollback(). SESSION IS NOT TRANSACTED."); 223 } 224 225 if (associatedXid != null) 226 throw new TransactionInProgressException ("Cannot rollback local transction. XA transaction is already in progress."); 227 228 if (localTransactionId != null) { 229 if (localTransactionEventListener != null) { 231 localTransactionEventListener.rollbackEvent(localTransactionId); 232 } 233 if (log.isDebugEnabled()) 234 log.debug("Rolledback local transaction: "+localTransactionId); 235 localTransactionId = null; 236 } 237 Iterator i2 = sessions.iterator(); 238 while (i2.hasNext()) { 239 MantaSession session = (MantaSession)i2.next(); 240 session.rollbackSession(); 241 } 242 } 243 244 245 253 public void commit() throws JMSException { 254 Iterator i = sessions.iterator(); 255 while (i.hasNext()) { 256 MantaSession session = (MantaSession)i.next(); 257 session.checkLegalOperation(); 258 if (session.sessionAcknowledgementMode != Session.SESSION_TRANSACTED) 259 throw new IllegalStateException ("MNJMS00074 : FAILED ON METHOD commit(). SESSION IS NOT TRANSACTED."); 260 } 261 262 if (associatedXid != null) 263 throw new TransactionInProgressException ("Cannot commit local transction. XA transaction is already in progress."); 264 265 if (localTransactionId != null) { 267 if (localTransactionEventListener != null) { 269 localTransactionEventListener.commitEvent(localTransactionId); 270 } 271 if( log.isDebugEnabled() ) 272 log.debug("Committed local transaction: "+localTransactionId); 273 localTransactionId = null; 274 } 275 Iterator i2 = sessions.iterator(); 276 while (i2.hasNext()) { 277 MantaSession session = (MantaSession)i2.next(); 278 session.commitSession(); 279 } 280 } 281 282 283 284 288 public void start(Xid xid, int flags) throws XAException  289 { 290 if (associatedXid != null) { 293 throw new XAException ("MNJMS000B4 : FAILED ON METHOD start(). RESOURCE "+xid+" IS CURRENTLY ACTIVE IN TRANSACTION."); 294 } 295 296 if (localTransactionId != null) { 297 throw new XAException ("MNJMS000B4 : FAILED ON METHOD start(). RESOURCE "+xid+" IS CURRENTLY ACTIVE IN LOCAL TRANSACTION."); 298 } 299 300 if (flags == TMNOFLAGS) 304 this.getInvolvedIn(xid); 305 306 else if (flags == TMRESUME) { 309 if (this.getTransactionStatus(xid) != TransactionContext.SUSPENDED) 310 throw new XAException ("MNJMS000B5 : FAILED ON METHOD start(). RESOURCE "+xid+" IS NOT SUSPENDED."); 311 312 this.resumeTransaction(xid); 316 } 317 318 else if (flags == TMJOIN) { 319 log.info("got TMJOIN flag!! continue!"); 320 } 322 323 else throw new XAException ("MNJMS000B6 : FAILED ON METHOD start(). FLAGS VALUE CORRUPTED : "+flags); 325 326 associatedXid = xid; 328 if (log.isDebugEnabled()) 329 log.debug("Started XAResource with Xid "+xid); 330 } 331 332 333 337 public void end(Xid xid, int flags) throws XAException  338 { 339 342 if (associatedXid == null) { 346 MantaXADescriptor xaMsgs = (MantaXADescriptor) transactionsTable.get(xid.toString()); 347 if (xaMsgs != null && xaMsgs.getStatus() == TransactionContext.SUSPENDED) { 348 associatedXid = xid; 349 if (log.isDebugEnabled()) { 350 log.debug("Method end() was called for a suspended Xid. Reassociating Xid: "+xid); 351 } 352 } 353 else { 354 throw new XAException ("MNJMS000B7 : FAILED ON METHOD end(). RESOURCE "+xid+" IS NOT ACTIVE IN CURRENT TRANSACTION."); 355 } 356 } 357 358 if (!xid.equals(associatedXid)) 359 throw new XAException ("MNJMS000B7 : FAILED ON METHOD end(). RESOURCE "+xid+" IS NOT ACTIVE IN CURRENT TRANSACTION."); 360 361 if (flags == TMSUSPEND) { 363 this.setTransactionStatus(xid, TransactionContext.SUSPENDED); 364 } 365 else if (flags == TMFAIL) { 366 this.setTransactionStatus(xid, TransactionContext.ROLLBACK_ONLY); 367 } 368 else if (flags == TMSUCCESS) { 369 this.setTransactionStatus(xid, TransactionContext.SUCCESS); 370 } 371 else { 372 throw new XAException ("MNJMS000B8 : FAILED ON METHOD start(). FLAGS VALUE CORRUPTED : "+flags); 373 } 374 375 this.saveTransaction(xid); 381 382 associatedXid = null; 387 if (log.isDebugEnabled()) 388 log.debug("end() called for Xid "+xid); 389 } 390 391 392 398 public int prepare(Xid xid) throws XAException  399 { 400 if (log.isDebugEnabled()) 401 log.debug("prepare() called on Xid "+xid); 402 403 if (sessions.isEmpty()) { 404 throw new XAException (XAException.XAER_PROTO); 405 } 406 407 int transactionStatus = this.getTransactionStatus(xid); 409 if (transactionStatus != TransactionContext.SUCCESS && 410 transactionStatus != TransactionContext.SUSPENDED) throw new XAException ("MNJMS000B9 :FAILED ON METHOD prepare(). TRANSACTION STATE ILLEGAL FOR PREPARE ACTION. REQUIRED: SUCCESS, RETRIEVED: "+this.getTransactionStatus(xid)); 412 413 int statusReturn = XAResource.XA_OK; 415 try { 418 422 statusReturn = this.prepareTransaction(xid); 423 427 } 428 catch (Exception e) { 429 this.setTransactionStatus(xid, TransactionContext.ROLLBACK_ONLY); 431 throw new XAException ("MNTA10236Z EXCEPTION DURING PREPARE: "+e.getMessage()); 433 } 434 435 if (log.isDebugEnabled()) 436 log.debug("prepare() completed on Xid "+xid+" , RC="+statusReturn); 437 return statusReturn; 438 } 439 440 441 445 public void commit(Xid xid, boolean onePhase) throws XAException  446 { 447 if (log.isDebugEnabled()) 450 log.debug("commit() requested for Xid "+xid); 451 452 if (sessions.isEmpty()) { 453 throw new XAException (XAException.XAER_PROTO); 454 } 455 456 if (onePhase) { 457 try { 458 prepare(xid); 459 } 460 catch (Exception e) { 461 throw new XAException ("MNTA10232E EXCEPTION DURING 1P PREPARE " 462 + e.getMessage()); 463 } 464 } 465 466 if (this.getTransactionStatus(xid) != TransactionContext.PREPARED) { 469 throw new XAException ("MNTA10234E TRANSACTION STATUS DISALLOWS PREPARE. "+ 470 "EXPECTED: PREPARED, RECEIVED "+this.getTransactionStatus(xid)); 471 } 472 473 476 try { 477 this.commitTransaction(xid); 478 } 481 catch (Exception e) { 482 this.setTransactionStatus(xid, TransactionContext.ROLLBACK_ONLY); 483 throw new XAException ("MNTA10236E EXCEPTION DURING 2P COMMIT: "+e.getMessage()); 484 } 485 486 if (log.isDebugEnabled()) 487 log.debug("commit completed for Xid "+xid); 488 } 489 490 491 496 public Xid [] recover(int flag) throws XAException  497 { 498 if (sessions.isEmpty()) { 499 throw new XAException (XAException.XAER_PROTO); 500 } 501 502 if (flag != XAResource.TMSTARTRSCAN && flag != XAResource.TMENDRSCAN) 503 return null; 505 506 return this.getPreparedTransactions(); 507 } 508 509 510 514 public void rollback(Xid xid) throws XAException  515 { 516 if (log.isDebugEnabled()) 517 log.debug("rollback() called for Xid "+xid); 518 519 if (associatedXid != null && xid.equals(associatedXid)) { 520 associatedXid = null; 522 } 523 524 try { 525 this.rollbackTransaction(xid); 526 } 527 catch (Exception e) {} 528 529 } 532 533 534 535 public void forget(Xid xid) throws XAException { 536 if (log.isDebugEnabled()) 537 log.debug("forget() called on Xid "+xid); 538 539 if (sessions.isEmpty()) { 541 throw new XAException (XAException.XAER_PROTO); 542 } 543 544 this.removeTransaction(xid); 546 } 547 548 549 public boolean setTransactionTimeout(int seconds) throws XAException { 550 throw new XAException ("MNTA10241E CAN NOT SET A TIMEOUT"); 551 } 552 553 554 public int getTransactionTimeout() throws XAException { 555 throw new XAException ("MNTA10242E CAN NOT GET A TIMEOUT"); 556 } 557 558 559 public boolean isSameRM(javax.transaction.xa.XAResource xares) throws XAException { 560 return this.equals(xares); 561 } 562 563 public boolean equals(XAResource xares) { 564 if (xares instanceof TransactionContext) { 565 TransactionContext context = (TransactionContext)xares; 566 return context.getResourceName().equals(this.getResourceName()); 569 } 570 return false; 571 } 572 573 574 576 581 private void getInvolvedIn(Xid xid) throws XAException  582 { 583 if (transactionsTable.containsKey(xid.toString())) 586 throw new XAException ("MNJMS0001B : RESOURCE "+xid+" ALREADY INVOLVED. FAILED ON METHOD getInvolvedIn()."); 587 588 ByteArrayOutputStream baos = new ByteArrayOutputStream (); 589 ByteableByteArray bba; 590 try { 591 ObjectOutputStream oos = new ObjectOutputStream (baos); 592 MantaXid serializableXid = new MantaXid(xid); 593 oos.writeObject(serializableXid); 594 bba = new ByteableByteArray(); 595 bba.setPayload(baos.toByteArray()); 596 transactionsTable.put(xid.toString(), new MantaXADescriptor(bba)); 597 } 598 catch (IOException ioe) { 599 if (log.isErrorEnabled()) 600 log.error("Failed to serialize Xid: "+xid, ioe); 601 throw new XAException ("MNJMS0001C : FAILED TO SERIALIZE RESOURCE "+xid+" METHOD getInvolvedIn() FAILED."); 602 603 } 604 } 605 606 607 612 private void removeTransaction(Xid xid) 613 { 614 transactionsTable.remove(xid.toString()); 615 } 616 617 623 private void setTransactionStatus(Xid xid, int status) throws XAException  624 { 625 MantaXADescriptor xaMsgs = (MantaXADescriptor) transactionsTable.get(xid.toString()); 626 627 if (xaMsgs == null) 628 throw new XAException ("MNJMS0001D : RESOURCE "+ xid + 629 " STATUS WILL NOT BE CHANGED. RESOURCE DOES NOT PARTICIPATE IN A TRANSACTION. METHOD setTransactionStatus() FAILED."); 630 631 xaMsgs.status = status; 633 transactionsTable.put(xid.toString(),xaMsgs); 636 } 637 638 644 private int getTransactionStatus(Xid xid) throws XAException  645 { 646 MantaXADescriptor xaMsgs = (MantaXADescriptor) transactionsTable.get(xid.toString()); 649 if (xaMsgs == null) 650 throw new XAException ("MNJMS0001E : RESOURCE "+ xid + 651 " STATUS WILL NOT BE RETRIEVED. RESOURCE DOES NOT PARTICIPATE IN A TRANSACTION. METHOD getTransactionStatus() FAILED."); 652 653 return xaMsgs.status; 655 } 656 657 658 663 private void saveTransaction(Xid xid) throws XAException  664 { 665 MantaXADescriptor xaMsgs = (MantaXADescriptor) transactionsTable.get(xid.toString()); 667 668 if (xaMsgs == null) 670 throw new XAException ("MNJMS0001F : RESOURCE IS NOT TRANSACTED IN "+xid+ 671 " METHOD saveTransaction() FAILED."); 672 673 Iterator i = sessions.iterator(); 683 while (i.hasNext()) { 684 MantaSession session = (MantaSession)i.next(); 685 session.saveMessages(xaMsgs); 686 } 687 } 690 691 712 718 private void resumeTransaction(Xid xid) throws XAException  719 { 720 MantaXADescriptor xaMsgs = (MantaXADescriptor) transactionsTable.get(xid.toString()); 723 724 if (xaMsgs == null) 726 throw new XAException ("MNJMS00020 : RESOURCE IS NOT TRANSACTED IN "+xid+ 727 " METHOD resumeTransaction() FAILED."); 728 } 729 730 737 private int prepareTransaction(Xid xid) throws Exception  738 { 739 MantaXADescriptor xaMsgs = (MantaXADescriptor) transactionsTable.get(xid.toString()); 741 742 if (xaMsgs == null) 744 throw new XAException ("MNJMS00021 : RESOURCE IS NOT TRANSACTED IN "+xid + 745 " METHOD prepareTransaction() FAILED."); 746 747 int returnValue; 748 749 if (xaMsgs.getHeldMessages().size() > 0 || xaMsgs.getUnackedMessages().size() > 0) { 753 returnValue = XAResource.XA_OK; 754 } 755 else { 756 returnValue = XAResource.XA_RDONLY; 757 } 758 xaMsgs.status = TransactionContext.PREPARED; 759 transactionsTable.put(xid.toString(),xaMsgs, true); 760 761 return returnValue; 762 } 763 764 765 772 private void commitTransaction(Xid xid) throws JMSException , XAException  773 { 774 MantaXADescriptor xaMsgs = (MantaXADescriptor) transactionsTable.get(xid.toString()); 776 777 if (xaMsgs == null) 779 throw new XAException ("MNJMS00022 : RESOURCE IS NOT INVOLVED IN TRANSACTION "+xid+ 780 " METHOD commitTransaction() FAILED."); 781 782 Iterator i = sessions.iterator(); 786 while (i.hasNext()) { 787 MantaSession session = (MantaSession)i.next(); 788 session.sendAllMessages(xaMsgs.getHeldMessages()); 789 session.ackAllMessages(xaMsgs.getUnackedMessages()); 790 } 791 792 this.removeTransaction(xid); 793 } 794 795 796 800 private void rollbackTransaction(Xid xid) throws Exception  801 { 802 MantaXADescriptor xaMsgs = (MantaXADescriptor) transactionsTable.get(xid.toString()); 804 805 if (xaMsgs == null) 807 throw new XAException ("MNJMS00023 : RESOURCE IS NOT INVOLVED IN TRANACTION "+xid+ 808 " METHOD rollbackTransaction() FAILED."); 809 810 Collection unackedMessages = xaMsgs.getUnackedMessages(); 816 if (unackedMessages != null) { 817 Iterator i = unackedMessages.iterator(); 818 while (i.hasNext()) { 819 MantaBusMessage message = (MantaBusMessage)i.next(); 820 message.setDeliveryCount(message.getDeliveryCount()+1); 821 } 822 } 823 824 Iterator i = sessions.iterator(); 825 while (i.hasNext()) { 826 MantaSession session = (MantaSession)i.next(); 827 session.addConsumerMessages(unackedMessages); 828 } 829 830 this.removeTransaction(xid); 831 } 832 833 834 838 private Xid [] getPreparedTransactions() 839 { 840 Iterator keys = transactionsTable.keySet().iterator(); 841 ArrayList ids = new ArrayList (); 842 while (keys.hasNext()) { 843 String key = (String ) keys.next(); 844 MantaXADescriptor xaMsgs = (MantaXADescriptor) transactionsTable.get(key); 845 if (xaMsgs.status == TransactionContext.PREPARED) { 846 try { 847 ByteableByteArray bba = (ByteableByteArray) xaMsgs.getXidBytes(); 848 ObjectInputStream ois = new ObjectInputStream (new ByteArrayInputStream (bba.getPayload())); 849 Xid xid = (Xid ) ois.readObject(); 850 ids.add(xid); 851 } 852 catch(Exception ioe) { 853 if (log.isErrorEnabled()) 854 log.error("MNTA00024 : FAILED TO CREATE XID.",ioe); 855 } 856 } 857 } 858 Xid [] array = new Xid [ids.size()]; 859 ids.toArray(array); 860 return array; 861 } 862 863 } | Popular Tags |