KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > TransactionContext


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 package org.apache.activemq;
19
20 import java.util.ArrayList JavaDoc;
21 import java.util.Arrays JavaDoc;
22 import java.util.Iterator JavaDoc;
23
24 import javax.jms.JMSException JavaDoc;
25 import javax.jms.TransactionInProgressException JavaDoc;
26 import javax.jms.TransactionRolledBackException JavaDoc;
27 import javax.transaction.xa.XAException JavaDoc;
28 import javax.transaction.xa.XAResource JavaDoc;
29 import javax.transaction.xa.Xid JavaDoc;
30
31 import org.apache.activemq.command.ConnectionId;
32 import org.apache.activemq.command.DataArrayResponse;
33 import org.apache.activemq.command.IntegerResponse;
34 import org.apache.activemq.command.LocalTransactionId;
35 import org.apache.activemq.command.TransactionId;
36 import org.apache.activemq.command.TransactionInfo;
37 import org.apache.activemq.command.XATransactionId;
38 import org.apache.activemq.command.DataStructure;
39 import org.apache.activemq.transaction.Synchronization;
40 import org.apache.activemq.util.JMSExceptionSupport;
41 import org.apache.activemq.util.LongSequenceGenerator;
42 import org.apache.commons.logging.Log;
43 import org.apache.commons.logging.LogFactory;
44
45 import java.util.concurrent.ConcurrentHashMap JavaDoc;
46
47 /**
48  * A TransactionContext provides the means to control a JMS transaction. It
49  * provides a local transaction interface and also an XAResource interface.
50  *
51  * <p/> An application server controls the transactional assignment of an
52  * XASession by obtaining its XAResource. It uses the XAResource to assign the
53  * session to a transaction, prepare and commit work on the transaction, and so
54  * on. <p/> An XAResource provides some fairly sophisticated facilities for
55  * interleaving work on multiple transactions, recovering a list of transactions
56  * in progress, and so on. A JTA aware JMS provider must fully implement this
57  * functionality. This could be done by using the services of a database that
58  * supports XA, or a JMS provider may choose to implement this functionality
59  * from scratch. <p/>
60  *
61  * @version $Revision: 1.10 $
62  * @see javax.jms.Session
63  * @see javax.jms.QueueSession
64  * @see javax.jms.TopicSession
65  * @see javax.jms.XASession
66  */

67 public class TransactionContext implements XAResource JavaDoc {
68
69     static final private Log log = LogFactory.getLog(TransactionContext.class);
70     
71     // XATransactionId -> ArrayList of TransactionContext objects
72
private static final ConcurrentHashMap JavaDoc endedXATransactionContexts = new ConcurrentHashMap JavaDoc();
73
74     private final ActiveMQConnection connection;
75     private final LongSequenceGenerator localTransactionIdGenerator;
76     private final ConnectionId connectionId;
77     private ArrayList JavaDoc synchornizations;
78
79     // To track XA transactions.
80
private Xid JavaDoc associatedXid;
81     private TransactionId transactionId;
82     private LocalTransactionEventListener localTransactionEventListener;
83     
84
/**
 * Creates a context bound to the given connection.
 *
 * @param connection the connection used to send transaction commands; its
 *        local transaction id generator and its current connection id are
 *        captured here
 */
public TransactionContext(ActiveMQConnection connection) {
    this.connection = connection;
    this.localTransactionIdGenerator = connection.getLocalTransactionIdGenerator();
    this.connectionId = connection.getConnectionInfo().getConnectionId();
}
90
91     public boolean isInXATransaction() {
92         return transactionId != null && transactionId.isXATransaction();
93     }
94
95     public boolean isInLocalTransaction() {
96         return transactionId != null && transactionId.isLocalTransaction();
97     }
98
99     /**
100      * @return Returns the localTransactionEventListener.
101      */

102     public LocalTransactionEventListener getLocalTransactionEventListener() {
103         return localTransactionEventListener;
104     }
105
106     /**
107      * Used by the resource adapter to listen to transaction events.
108      *
109      * @param localTransactionEventListener
110      * The localTransactionEventListener to set.
111      */

112     public void setLocalTransactionEventListener(LocalTransactionEventListener localTransactionEventListener) {
113         this.localTransactionEventListener = localTransactionEventListener;
114     }
115
    // ///////////////////////////////////////////////////////////
    //
    // Methods that work with the Synchronization objects registered with
    // the transaction.
    //
    // ///////////////////////////////////////////////////////////
122

123     public void addSynchronization(Synchronization s) {
124         if( synchornizations == null )
125             synchornizations = new ArrayList JavaDoc(10);
126         synchornizations.add(s);
127     }
128     
129     private void afterRollback() throws JMSException JavaDoc {
130         if( synchornizations == null )
131             return;
132         
133         int size = synchornizations.size();
134         try {
135             for (int i = 0; i < size; i++) {
136                 ((Synchronization) synchornizations.get(i)).afterRollback();
137             }
138         } catch (JMSException JavaDoc e) {
139             throw e;
140         } catch (Throwable JavaDoc e) {
141             throw JMSExceptionSupport.create(e);
142         }
143     }
144
145     private void afterCommit() throws JMSException JavaDoc {
146         if( synchornizations == null )
147             return;
148
149         int size = synchornizations.size();
150         try {
151             for (int i = 0; i < size; i++) {
152                 ((Synchronization) synchornizations.get(i)).afterCommit();
153             }
154         } catch (JMSException JavaDoc e) {
155             throw e;
156         } catch (Throwable JavaDoc e) {
157             throw JMSExceptionSupport.create(e);
158         }
159     }
160
161     private void beforeEnd() throws JMSException JavaDoc {
162         if( synchornizations == null )
163             return;
164
165         int size = synchornizations.size();
166         try {
167             for (int i = 0; i < size; i++) {
168                 ((Synchronization) synchornizations.get(i)).beforeEnd();
169             }
170         } catch (JMSException JavaDoc e) {
171             throw e;
172         } catch (Throwable JavaDoc e) {
173             throw JMSExceptionSupport.create(e);
174         }
175     }
176
177     public TransactionId getTransactionId() {
178         return transactionId;
179     }
180
    // ///////////////////////////////////////////////////////////
    //
    // Local transaction interface.
    //
    // ///////////////////////////////////////////////////////////
186

187     /**
188      * Start a local transaction.
189      */

190     public void begin() throws JMSException JavaDoc {
191
192         if (isInXATransaction())
193             throw new TransactionInProgressException JavaDoc(
194                     "Cannot start local transaction. XA transaction is already in progress.");
195
196         if (transactionId==null) {
197             synchornizations = null;
198             this.transactionId = new LocalTransactionId(connectionId, localTransactionIdGenerator.getNextSequenceId());
199             TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.BEGIN);
200             this.connection.ensureConnectionInfoSent();
201             this.connection.asyncSendPacket(info);
202
203             // Notify the listener that the tx was started.
204
if (localTransactionEventListener != null) {
205                 localTransactionEventListener.beginEvent();
206             }
207         }
208     }
209
210     /**
211      * Rolls back any work done in this transaction and releases any locks
212      * currently held.
213      *
214      * @throws JMSException
215      * if the JMS provider fails to roll back the transaction due to
216      * some internal error.
217      * @throws javax.jms.IllegalStateException
218      * if the method is not called by a transacted session.
219      */

220     public void rollback() throws JMSException JavaDoc {
221         if (isInXATransaction())
222             throw new TransactionInProgressException JavaDoc("Cannot rollback() if an XA transaction is already in progress ");
223
224         if (transactionId!=null) {
225             TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.ROLLBACK);
226             this.transactionId = null;
227             this.connection.asyncSendPacket(info);
228             // Notify the listener that the tx was rolled back
229
if (localTransactionEventListener != null) {
230                 localTransactionEventListener.rollbackEvent();
231             }
232         }
233         
234         afterRollback();
235     }
236
237     /**
238      * Commits all work done in this transaction and releases any locks
239      * currently held.
240      *
241      * @throws JMSException
242      * if the JMS provider fails to commit the transaction due to
243      * some internal error.
244      * @throws TransactionRolledBackException
245      * if the transaction is rolled back due to some internal error
246      * during commit.
247      * @throws javax.jms.IllegalStateException
248      * if the method is not called by a transacted session.
249      */

250     public void commit() throws JMSException JavaDoc {
251         if (isInXATransaction())
252             throw new TransactionInProgressException JavaDoc("Cannot commit() if an XA transaction is already in progress ");
253
254         beforeEnd();
255         
256         // Only send commit if the transaction was started.
257
if (transactionId!=null) {
258             TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.COMMIT_ONE_PHASE);
259             this.transactionId = null;
260             // Notify the listener that the tx was committed back
261
this.connection.syncSendPacket(info);
262             if (localTransactionEventListener != null) {
263                 localTransactionEventListener.commitEvent();
264             }
265             afterCommit();
266         }
267     }
268
    // ///////////////////////////////////////////////////////////
    //
    // XAResource Implementation
    //
    // ///////////////////////////////////////////////////////////
274
/**
275      * Associates a transaction with the resource.
276      */

277     public void start(Xid JavaDoc xid, int flags) throws XAException JavaDoc {
278         
279         if( log.isDebugEnabled() )
280             log.debug("Start: "+xid);
281         
282         if (isInLocalTransaction())
283             throw new XAException JavaDoc(XAException.XAER_PROTO);
284
285         // Are we already associated?
286
if (associatedXid != null) {
287             throw new XAException JavaDoc(XAException.XAER_PROTO);
288         }
289
290 // if ((flags & TMJOIN) == TMJOIN) {
291
// // TODO: verify that the server has seen the xid
292
// }
293
// if ((flags & TMJOIN) == TMRESUME) {
294
// // TODO: verify that the xid was suspended.
295
// }
296

297         // associate
298
synchornizations = null;
299         setXid(xid);
300     }
301
302     /**
303      * @return
304      */

305     private ConnectionId getConnectionId() {
306         return connection.getConnectionInfo().getConnectionId();
307     }
308
309     public void end(Xid JavaDoc xid, int flags) throws XAException JavaDoc {
310         
311         if( log.isDebugEnabled() )
312             log.debug("End: "+xid);
313
314         if (isInLocalTransaction())
315             throw new XAException JavaDoc(XAException.XAER_PROTO);
316
317         if ((flags & (TMSUSPEND|TMFAIL)) !=0 ) {
318             // You can only suspend the associated xid.
319
if (!equals(associatedXid, xid)) {
320                 throw new XAException JavaDoc(XAException.XAER_PROTO);
321             }
322
323             // TODO: we may want to put the xid in a suspended list.
324
try {
325                 beforeEnd();
326             } catch (JMSException JavaDoc e) {
327                 throw toXAException(e);
328             }
329             setXid(null);
330         } else if ((flags & TMSUCCESS) == TMSUCCESS) {
331             // set to null if this is the current xid.
332
// otherwise this could be an asynchronous success call
333
if (equals(associatedXid, xid)) {
334                 try {
335                     beforeEnd();
336                 } catch (JMSException JavaDoc e) {
337                     throw toXAException(e);
338                 }
339                 setXid(null);
340             }
341         } else {
342             throw new XAException JavaDoc(XAException.XAER_INVAL);
343         }
344     }
345
346     private boolean equals(Xid JavaDoc xid1, Xid JavaDoc xid2) {
347         if( xid1 == xid2 )
348             return true;
349         if( xid1==null ^ xid2==null )
350             return false;
351         return xid1.getFormatId()==xid2.getFormatId() &&
352                 Arrays.equals(xid1.getBranchQualifier(), xid2.getBranchQualifier()) &&
353                 Arrays.equals(xid1.getGlobalTransactionId(), xid2.getGlobalTransactionId());
354     }
355
356     public int prepare(Xid JavaDoc xid) throws XAException JavaDoc {
357         if( log.isDebugEnabled() )
358             log.debug("Prepare: "+xid);
359
360         // We allow interleaving multiple transactions, so
361
// we don't limit prepare to the associated xid.
362
XATransactionId x;
363         // THIS SHOULD NEVER HAPPEN because end(xid, TMSUCCESS) should have been
364
// called first
365
if (xid==null || (equals(associatedXid, xid)) ) {
366             throw new XAException JavaDoc(XAException.XAER_PROTO);
367         } else {
368             // TODO: cache the known xids so we don't keep recreating this one??
369
x = new XATransactionId(xid);
370         }
371
372         try {
373             TransactionInfo info = new TransactionInfo(getConnectionId(), x, TransactionInfo.PREPARE);
374             
375             // Find out if the server wants to commit or rollback.
376
IntegerResponse response = (IntegerResponse) this.connection.syncSendPacket(info);
377             return response.getResult();
378             
379         } catch (JMSException JavaDoc e) {
380             throw toXAException(e);
381         }
382     }
383
384     public void rollback(Xid JavaDoc xid) throws XAException JavaDoc {
385         
386         if( log.isDebugEnabled() )
387             log.debug("Rollback: "+xid);
388
389         // We allow interleaving multiple transactions, so
390
// we don't limit rollback to the associated xid.
391
XATransactionId x;
392         if (xid==null) {
393             throw new XAException JavaDoc(XAException.XAER_PROTO);
394         }
395         if (equals(associatedXid, xid)) {
396             // I think this can happen even without an end(xid) call. Need to
397
// check spec.
398
x = (XATransactionId) transactionId;
399         } else {
400             x = new XATransactionId(xid);
401         }
402
403         try {
404             this.connection.checkClosedOrFailed();
405             this.connection.ensureConnectionInfoSent();
406
407             // Let the server know that the tx is rollback.
408
TransactionInfo info = new TransactionInfo(getConnectionId(), x, TransactionInfo.ROLLBACK);
409             this.connection.syncSendPacket(info);
410
411             ArrayList JavaDoc l = (ArrayList JavaDoc) endedXATransactionContexts.remove(x);
412             if( l!=null && !l.isEmpty()) {
413                 for (Iterator JavaDoc iter = l.iterator(); iter.hasNext();) {
414                     TransactionContext ctx = (TransactionContext) iter.next();
415                     ctx.afterRollback();
416                 }
417             }
418             
419         } catch (JMSException JavaDoc e) {
420             throw toXAException(e);
421         }
422     }
423
424     // XAResource interface
425
public void commit(Xid JavaDoc xid, boolean onePhase) throws XAException JavaDoc {
426         
427         if( log.isDebugEnabled() )
428             log.debug("Commit: "+xid);
429
430         // We allow interleaving multiple transactions, so
431
// we don't limit commit to the associated xid.
432
XATransactionId x;
433         if (xid==null || (equals(associatedXid, xid)) ) {
434             // should never happen, end(xid,TMSUCCESS) must have been previously
435
// called
436
throw new XAException JavaDoc(XAException.XAER_PROTO);
437         } else {
438             x = new XATransactionId(xid);
439         }
440
441
442         try {
443             this.connection.checkClosedOrFailed();
444             this.connection.ensureConnectionInfoSent();
445             
446             // Notify the server that the tx was committed back
447
TransactionInfo info = new TransactionInfo(getConnectionId(), x,
448                     onePhase ? TransactionInfo.COMMIT_ONE_PHASE : TransactionInfo.COMMIT_TWO_PHASE);
449             
450             this.connection.syncSendPacket(info);
451             
452             ArrayList JavaDoc l = (ArrayList JavaDoc) endedXATransactionContexts.remove(x);
453             if( l!=null && !l.isEmpty()) {
454                 for (Iterator JavaDoc iter = l.iterator(); iter.hasNext();) {
455                     TransactionContext ctx = (TransactionContext) iter.next();
456                     ctx.afterCommit();
457                 }
458             }
459             
460         } catch (JMSException JavaDoc e) {
461             throw toXAException(e);
462         }
463
464     }
465
466     public void forget(Xid JavaDoc xid) throws XAException JavaDoc {
467         if( log.isDebugEnabled() )
468             log.debug("Forget: "+xid);
469
470         // We allow interleaving multiple transactions, so
471
// we don't limit forget to the associated xid.
472
XATransactionId x;
473         if (xid==null) {
474             throw new XAException JavaDoc(XAException.XAER_PROTO);
475         }
476         if (equals(associatedXid, xid)) {
477             // TODO determine if this can happen... I think not.
478
x = (XATransactionId) transactionId;
479         } else {
480             x = new XATransactionId(xid);
481         }
482
483         TransactionInfo info = new TransactionInfo(getConnectionId(), x, TransactionInfo.FORGET);
484         
485         try {
486             // Tell the server to forget the transaction.
487
this.connection.syncSendPacket(info);
488         } catch (JMSException JavaDoc e) {
489             throw toXAException(e);
490         }
491     }
492
493     public boolean isSameRM(XAResource JavaDoc xaResource) throws XAException JavaDoc {
494         if (xaResource == null) {
495             return false;
496         }
497         if (!(xaResource instanceof TransactionContext)) {
498             return false;
499         }
500         TransactionContext xar = (TransactionContext) xaResource;
501         try {
502             return getResourceManagerId().equals(xar.getResourceManagerId());
503         } catch (Throwable JavaDoc e) {
504             throw (XAException JavaDoc) new XAException JavaDoc("Could not get resource manager id.").initCause(e);
505         }
506     }
507
508     public Xid JavaDoc[] recover(int flag) throws XAException JavaDoc {
509         if( log.isDebugEnabled() )
510             log.debug("Recover: "+flag);
511         
512         TransactionInfo info = new TransactionInfo(getConnectionId(), null, TransactionInfo.RECOVER);
513         try {
514             this.connection.checkClosedOrFailed();
515             this.connection.ensureConnectionInfoSent();
516
517             DataArrayResponse receipt = (DataArrayResponse) this.connection.syncSendPacket(info);
518             DataStructure[] data = receipt.getData();
519             XATransactionId[] answer = null;
520             if (data instanceof XATransactionId[]) {
521                 answer = (XATransactionId[]) data;
522             }
523             else {
524                 answer = new XATransactionId[data.length];
525                 System.arraycopy(data, 0, answer, 0, data.length);
526             }
527             return answer;
528         } catch (JMSException JavaDoc e) {
529             throw toXAException(e);
530         }
531     }
532
533     public int getTransactionTimeout() throws XAException JavaDoc {
534         return 0;
535     }
536
537     public boolean setTransactionTimeout(int seconds) throws XAException JavaDoc {
538         return false;
539     }
540
    // ///////////////////////////////////////////////////////////
    //
    // Helper methods.
    //
    // ///////////////////////////////////////////////////////////
546
private String JavaDoc getResourceManagerId() throws JMSException JavaDoc {
547         return this.connection.getResourceManagerId();
548     }
549
550     private void setXid(Xid JavaDoc xid) throws XAException JavaDoc {
551         
552         try {
553             this.connection.checkClosedOrFailed();
554             this.connection.ensureConnectionInfoSent();
555         } catch (JMSException JavaDoc e) {
556             throw toXAException(e);
557         }
558
559         if (xid != null) {
560             // associate
561
associatedXid = xid;
562             transactionId = new XATransactionId(xid);
563             
564             TransactionInfo info = new TransactionInfo(connectionId,transactionId,TransactionInfo.BEGIN);
565             try {
566                 this.connection.asyncSendPacket(info);
567                 if( log.isDebugEnabled() )
568                     log.debug("Started XA transaction: "+transactionId);
569             } catch (JMSException JavaDoc e) {
570                 throw toXAException(e);
571             }
572
573         } else {
574             
575             if( transactionId!=null ) {
576                 TransactionInfo info = new TransactionInfo(connectionId,transactionId,TransactionInfo.END);
577                 try {
578                     this.connection.syncSendPacket(info);
579                     if( log.isDebugEnabled() )
580                         log.debug("Ended XA transaction: "+transactionId);
581                 } catch (JMSException JavaDoc e) {
582                     throw toXAException(e);
583                 }
584                 
585                 // Add our self to the list of contexts that are interested in
586
// post commit/rollback events.
587
ArrayList JavaDoc l = (ArrayList JavaDoc) endedXATransactionContexts.get(transactionId);
588                 if( l==null ) {
589                     l = new ArrayList JavaDoc(3);
590                     endedXATransactionContexts.put(transactionId, l);
591                 l.add(this);
592                 } else if (!l.contains(this)) {
593                     l.add(this);
594                 }
595             }
596
597             // dis-associate
598
associatedXid = null;
599             transactionId = null;
600         }
601     }
602
603     /**
604      * Converts a JMSException from the server to an XAException. if the
605      * JMSException contained a linked XAException that is returned instead.
606      *
607      * @param e
608      * @return
609      */

610     private XAException JavaDoc toXAException(JMSException JavaDoc e) {
611         if (e.getCause() != null && e.getCause() instanceof XAException JavaDoc) {
612             XAException JavaDoc original = (XAException JavaDoc) e.getCause();
613             XAException JavaDoc xae = new XAException JavaDoc(original.getMessage());
614             xae.errorCode = original.errorCode;
615             xae.initCause(original);
616             return xae;
617         }
618
619         XAException JavaDoc xae = new XAException JavaDoc(e.getMessage());
620         xae.errorCode = XAException.XAER_RMFAIL;
621         xae.initCause(e);
622         return xae;
623     }
624
625     public ActiveMQConnection getConnection() {
626         return connection;
627     }
628
629     public void cleanup() {
630         associatedXid=null;
631         transactionId=null;
632     }
633 }
634
Popular Tags