1 18 package org.apache.activemq; 19 20 import java.util.ArrayList ; 21 import java.util.Arrays ; 22 import java.util.Iterator ; 23 24 import javax.jms.JMSException ; 25 import javax.jms.TransactionInProgressException ; 26 import javax.jms.TransactionRolledBackException ; 27 import javax.transaction.xa.XAException ; 28 import javax.transaction.xa.XAResource ; 29 import javax.transaction.xa.Xid ; 30 31 import org.apache.activemq.command.ConnectionId; 32 import org.apache.activemq.command.DataArrayResponse; 33 import org.apache.activemq.command.IntegerResponse; 34 import org.apache.activemq.command.LocalTransactionId; 35 import org.apache.activemq.command.TransactionId; 36 import org.apache.activemq.command.TransactionInfo; 37 import org.apache.activemq.command.XATransactionId; 38 import org.apache.activemq.command.DataStructure; 39 import org.apache.activemq.transaction.Synchronization; 40 import org.apache.activemq.util.JMSExceptionSupport; 41 import org.apache.activemq.util.LongSequenceGenerator; 42 import org.apache.commons.logging.Log; 43 import org.apache.commons.logging.LogFactory; 44 45 import java.util.concurrent.ConcurrentHashMap ; 46 47 67 public class TransactionContext implements XAResource { 68 69 static final private Log log = LogFactory.getLog(TransactionContext.class); 70 71 private static final ConcurrentHashMap endedXATransactionContexts = new ConcurrentHashMap (); 73 74 private final ActiveMQConnection connection; 75 private final LongSequenceGenerator localTransactionIdGenerator; 76 private final ConnectionId connectionId; 77 private ArrayList synchornizations; 78 79 private Xid associatedXid; 81 private TransactionId transactionId; 82 private LocalTransactionEventListener localTransactionEventListener; 83 84 85 public TransactionContext(ActiveMQConnection connection) { 86 this.connection = connection; 87 this.localTransactionIdGenerator = connection.getLocalTransactionIdGenerator(); 88 this.connectionId = connection.getConnectionInfo().getConnectionId(); 89 } 90 91 public boolean isInXATransaction() { 92 return transactionId != null && transactionId.isXATransaction(); 93 } 94 95 public boolean isInLocalTransaction() { 96 return transactionId != null && transactionId.isLocalTransaction(); 97 } 98 99 102 public LocalTransactionEventListener getLocalTransactionEventListener() { 103 return localTransactionEventListener; 104 } 105 106 112 public void setLocalTransactionEventListener(LocalTransactionEventListener localTransactionEventListener) { 113 this.localTransactionEventListener = localTransactionEventListener; 114 } 115 116 123 public void addSynchronization(Synchronization s) { 124 if( synchornizations == null ) 125 synchornizations = new ArrayList (10); 126 synchornizations.add(s); 127 } 128 129 private void afterRollback() throws JMSException { 130 if( synchornizations == null ) 131 return; 132 133 int size = synchornizations.size(); 134 try { 135 for (int i = 0; i < size; i++) { 136 ((Synchronization) synchornizations.get(i)).afterRollback(); 137 } 138 } catch (JMSException e) { 139 throw e; 140 } catch (Throwable e) { 141 throw JMSExceptionSupport.create(e); 142 } 143 } 144 145 private void afterCommit() throws JMSException { 146 if( synchornizations == null ) 147 return; 148 149 int size = synchornizations.size(); 150 try { 151 for (int i = 0; i < size; i++) { 152 ((Synchronization) synchornizations.get(i)).afterCommit(); 153 } 154 } catch (JMSException e) { 155 throw e; 156 } catch (Throwable e) { 157 throw JMSExceptionSupport.create(e); 158 } 159 } 160 161 private void beforeEnd() throws JMSException { 162 if( synchornizations == null ) 163 return; 164 165 int size = synchornizations.size(); 166 try { 167 for (int i = 0; i < size; i++) { 168 ((Synchronization) synchornizations.get(i)).beforeEnd(); 169 } 170 } catch (JMSException e) { 171 throw e; 172 } catch (Throwable e) { 173 throw JMSExceptionSupport.create(e); 174 } 175 } 176 177 public TransactionId getTransactionId() { 178 return transactionId; 179 } 180 181 187 190 public void begin() throws JMSException { 191 192 if (isInXATransaction()) 193 throw new TransactionInProgressException ( 194 "Cannot start local transaction. XA transaction is already in progress."); 195 196 if (transactionId==null) { 197 synchornizations = null; 198 this.transactionId = new LocalTransactionId(connectionId, localTransactionIdGenerator.getNextSequenceId()); 199 TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.BEGIN); 200 this.connection.ensureConnectionInfoSent(); 201 this.connection.asyncSendPacket(info); 202 203 if (localTransactionEventListener != null) { 205 localTransactionEventListener.beginEvent(); 206 } 207 } 208 } 209 210 220 public void rollback() throws JMSException { 221 if (isInXATransaction()) 222 throw new TransactionInProgressException ("Cannot rollback() if an XA transaction is already in progress "); 223 224 if (transactionId!=null) { 225 TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.ROLLBACK); 226 this.transactionId = null; 227 this.connection.asyncSendPacket(info); 228 if (localTransactionEventListener != null) { 230 localTransactionEventListener.rollbackEvent(); 231 } 232 } 233 234 afterRollback(); 235 } 236 237 250 public void commit() throws JMSException { 251 if (isInXATransaction()) 252 throw new TransactionInProgressException ("Cannot commit() if an XA transaction is already in progress "); 253 254 beforeEnd(); 255 256 if (transactionId!=null) { 258 TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.COMMIT_ONE_PHASE); 259 this.transactionId = null; 260 this.connection.syncSendPacket(info); 262 if (localTransactionEventListener != null) { 263 localTransactionEventListener.commitEvent(); 264 } 265 afterCommit(); 266 } 267 } 268 269 277 public void start(Xid xid, int flags) throws XAException { 278 279 if( log.isDebugEnabled() ) 280 log.debug("Start: "+xid); 281 282 if (isInLocalTransaction()) 283 throw new XAException (XAException.XAER_PROTO); 284 285 if (associatedXid != null) { 287 throw new XAException (XAException.XAER_PROTO); 288 } 289 290 297 synchornizations = null; 299 setXid(xid); 300 } 301 302 305 private ConnectionId getConnectionId() { 306 return connection.getConnectionInfo().getConnectionId(); 307 } 308 309 public void end(Xid xid, int flags) throws XAException { 310 311 if( log.isDebugEnabled() ) 312 log.debug("End: "+xid); 313 314 if (isInLocalTransaction()) 315 throw new XAException (XAException.XAER_PROTO); 316 317 if ((flags & (TMSUSPEND|TMFAIL)) !=0 ) { 318 if (!equals(associatedXid, xid)) { 320 throw new XAException (XAException.XAER_PROTO); 321 } 322 323 try { 325 beforeEnd(); 326 } catch (JMSException e) { 327 throw toXAException(e); 328 } 329 setXid(null); 330 } else if ((flags & TMSUCCESS) == TMSUCCESS) { 331 if (equals(associatedXid, xid)) { 334 try { 335 beforeEnd(); 336 } catch (JMSException e) { 337 throw toXAException(e); 338 } 339 setXid(null); 340 } 341 } else { 342 throw new XAException (XAException.XAER_INVAL); 343 } 344 } 345 346 private boolean equals(Xid xid1, Xid xid2) { 347 if( xid1 == xid2 ) 348 return true; 349 if( xid1==null ^ xid2==null ) 350 return false; 351 return xid1.getFormatId()==xid2.getFormatId() && 352 Arrays.equals(xid1.getBranchQualifier(), xid2.getBranchQualifier()) && 353 Arrays.equals(xid1.getGlobalTransactionId(), xid2.getGlobalTransactionId()); 354 } 355 356 public int prepare(Xid xid) throws XAException { 357 if( log.isDebugEnabled() ) 358 log.debug("Prepare: "+xid); 359 360 XATransactionId x; 363 if (xid==null || (equals(associatedXid, xid)) ) { 366 throw new XAException (XAException.XAER_PROTO); 367 } else { 368 x = new XATransactionId(xid); 370 } 371 372 try { 373 TransactionInfo info = new TransactionInfo(getConnectionId(), x, TransactionInfo.PREPARE); 374 375 IntegerResponse response = (IntegerResponse) this.connection.syncSendPacket(info); 377 return response.getResult(); 378 379 } catch (JMSException e) { 380 throw toXAException(e); 381 } 382 } 383 384 public void rollback(Xid xid) throws XAException { 385 386 if( log.isDebugEnabled() ) 387 log.debug("Rollback: "+xid); 388 389 XATransactionId x; 392 if (xid==null) { 393 throw new XAException (XAException.XAER_PROTO); 394 } 395 if (equals(associatedXid, xid)) { 396 x = (XATransactionId) transactionId; 399 } else { 400 x = new XATransactionId(xid); 401 } 402 403 try { 404 this.connection.checkClosedOrFailed(); 405 this.connection.ensureConnectionInfoSent(); 406 407 TransactionInfo info = new TransactionInfo(getConnectionId(), x, TransactionInfo.ROLLBACK); 409 this.connection.syncSendPacket(info); 410 411 ArrayList l = (ArrayList ) endedXATransactionContexts.remove(x); 412 if( l!=null && !l.isEmpty()) { 413 for (Iterator iter = l.iterator(); iter.hasNext();) { 414 TransactionContext ctx = (TransactionContext) iter.next(); 415 ctx.afterRollback(); 416 } 417 } 418 419 } catch (JMSException e) { 420 throw toXAException(e); 421 } 422 } 423 424 public void commit(Xid xid, boolean onePhase) throws XAException { 426 427 if( log.isDebugEnabled() ) 428 log.debug("Commit: "+xid); 429 430 XATransactionId x; 433 if (xid==null || (equals(associatedXid, xid)) ) { 434 throw new XAException (XAException.XAER_PROTO); 437 } else { 438 x = new XATransactionId(xid); 439 } 440 441 442 try { 443 this.connection.checkClosedOrFailed(); 444 this.connection.ensureConnectionInfoSent(); 445 446 TransactionInfo info = new TransactionInfo(getConnectionId(), x, 448 onePhase ? TransactionInfo.COMMIT_ONE_PHASE : TransactionInfo.COMMIT_TWO_PHASE); 449 450 this.connection.syncSendPacket(info); 451 452 ArrayList l = (ArrayList ) endedXATransactionContexts.remove(x); 453 if( l!=null && !l.isEmpty()) { 454 for (Iterator iter = l.iterator(); iter.hasNext();) { 455 TransactionContext ctx = (TransactionContext) iter.next(); 456 ctx.afterCommit(); 457 } 458 } 459 460 } catch (JMSException e) { 461 throw toXAException(e); 462 } 463 464 } 465 466 public void forget(Xid xid) throws XAException { 467 if( log.isDebugEnabled() ) 468 log.debug("Forget: "+xid); 469 470 XATransactionId x; 473 if (xid==null) { 474 throw new XAException (XAException.XAER_PROTO); 475 } 476 if (equals(associatedXid, xid)) { 477 x = (XATransactionId) transactionId; 479 } else { 480 x = new XATransactionId(xid); 481 } 482 483 TransactionInfo info = new TransactionInfo(getConnectionId(), x, TransactionInfo.FORGET); 484 485 try { 486 this.connection.syncSendPacket(info); 488 } catch (JMSException e) { 489 throw toXAException(e); 490 } 491 } 492 493 public boolean isSameRM(XAResource xaResource) throws XAException { 494 if (xaResource == null) { 495 return false; 496 } 497 if (!(xaResource instanceof TransactionContext)) { 498 return false; 499 } 500 TransactionContext xar = (TransactionContext) xaResource; 501 try { 502 return getResourceManagerId().equals(xar.getResourceManagerId()); 503 } catch (Throwable e) { 504 throw (XAException ) new XAException ("Could not get resource manager id.").initCause(e); 505 } 506 } 507 508 public Xid [] recover(int flag) throws XAException { 509 if( log.isDebugEnabled() ) 510 log.debug("Recover: "+flag); 511 512 TransactionInfo info = new TransactionInfo(getConnectionId(), null, TransactionInfo.RECOVER); 513 try { 514 this.connection.checkClosedOrFailed(); 515 this.connection.ensureConnectionInfoSent(); 516 517 DataArrayResponse receipt = (DataArrayResponse) this.connection.syncSendPacket(info); 518 DataStructure[] data = receipt.getData(); 519 XATransactionId[] answer = null; 520 if (data instanceof XATransactionId[]) { 521 answer = (XATransactionId[]) data; 522 } 523 else { 524 answer = new XATransactionId[data.length]; 525 System.arraycopy(data, 0, answer, 0, data.length); 526 } 527 return answer; 528 } catch (JMSException e) { 529 throw toXAException(e); 530 } 531 } 532 533 public int getTransactionTimeout() throws XAException { 534 return 0; 535 } 536 537 public boolean setTransactionTimeout(int seconds) throws XAException { 538 return false; 539 } 540 541 private String getResourceManagerId() throws JMSException { 547 return this.connection.getResourceManagerId(); 548 } 549 550 private void setXid(Xid xid) throws XAException { 551 552 try { 553 this.connection.checkClosedOrFailed(); 554 this.connection.ensureConnectionInfoSent(); 555 } catch (JMSException e) { 556 throw toXAException(e); 557 } 558 559 if (xid != null) { 560 associatedXid = xid; 562 transactionId = new XATransactionId(xid); 563 564 TransactionInfo info = new TransactionInfo(connectionId,transactionId,TransactionInfo.BEGIN); 565 try { 566 this.connection.asyncSendPacket(info); 567 if( log.isDebugEnabled() ) 568 log.debug("Started XA transaction: "+transactionId); 569 } catch (JMSException e) { 570 throw toXAException(e); 571 } 572 573 } else { 574 575 if( transactionId!=null ) { 576 TransactionInfo info = new TransactionInfo(connectionId,transactionId,TransactionInfo.END); 577 try { 578 this.connection.syncSendPacket(info); 579 if( log.isDebugEnabled() ) 580 log.debug("Ended XA transaction: "+transactionId); 581 } catch (JMSException e) { 582 throw toXAException(e); 583 } 584 585 ArrayList l = (ArrayList ) endedXATransactionContexts.get(transactionId); 588 if( l==null ) { 589 l = new ArrayList (3); 590 endedXATransactionContexts.put(transactionId, l); 591 l.add(this); 592 } else if (!l.contains(this)) { 593 l.add(this); 594 } 595 } 596 597 associatedXid = null; 599 transactionId = null; 600 } 601 } 602 603 610 private XAException toXAException(JMSException e) { 611 if (e.getCause() != null && e.getCause() instanceof XAException ) { 612 XAException original = (XAException ) e.getCause(); 613 XAException xae = new XAException (original.getMessage()); 614 xae.errorCode = original.errorCode; 615 xae.initCause(original); 616 return xae; 617 } 618 619 XAException xae = new XAException (e.getMessage()); 620 xae.errorCode = XAException.XAER_RMFAIL; 621 xae.initCause(e); 622 return xae; 623 } 624 625 public ActiveMQConnection getConnection() { 626 return connection; 627 } 628 629 public void cleanup() { 630 associatedXid=null; 631 transactionId=null; 632 } 633 } 634 | Popular Tags |