KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > mr > api > jms > TransactionContext


1 /*
2  * Copyright 2002 by
3  * <a HREF="http://www.coridan.com">Coridan</a>
4  * <a HREF="mailto: support@coridan.com ">support@coridan.com</a>
5  *
6  * The contents of this file are subject to the Mozilla Public License Version
7  * 1.1 (the "License"); you may not use this file except in compliance with the
8  * License. You may obtain a copy of the License at
9  * http://www.mozilla.org/MPL/
10  *
11  * Software distributed under the License is distributed on an "AS IS" basis,
12  * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
13  * for the specific language governing rights and limitations under the
14  * License.
15  *
16  * The Original Code is "MantaRay" (TM).
17  *
18  * The Initial Developer of the Original Code is Coridan.
19  * Portions created by the Initial Developer are Copyright (C) 2006
20  * Coridan Inc. All Rights Reserved.
21  *
22  * Contributor(s): all the names of the contributors are added in the source
23  * code where applicable.
24  *
25  * Alternatively, the contents of this file may be used under the terms of the
26  * LGPL license (the "GNU LESSER GENERAL PUBLIC LICENSE"), in which case the
27  * provisions of LGPL are applicable instead of those above. If you wish to
28  * allow use of your version of this file only under the terms of the LGPL
29  * License and not to allow others to use your version of this file under
30  * the MPL, indicate your decision by deleting the provisions above and
31  * replace them with the notice and other provisions required by the LGPL.
32  * If you do not delete the provisions above, a recipient may use your version
33  * of this file under either the MPL or the GNU LESSER GENERAL PUBLIC LICENSE.
34  
35  *
36  * This library is free software; you can redistribute it and/or modify it
37  * under the terms of the MPL as stated above or under the terms of the GNU
38  * Lesser General Public License as published by the Free Software Foundation;
39  * either version 2.1 of the License, or any later version.
40  *
41  * This library is distributed in the hope that it will be useful, but WITHOUT
42  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
43  * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
44  * License for more details.
45  */

46 package org.mr.api.jms;
47
48 import java.io.ByteArrayInputStream JavaDoc;
49 import java.io.ByteArrayOutputStream JavaDoc;
50 import java.io.IOException JavaDoc;
51 import java.io.ObjectInputStream JavaDoc;
52 import java.io.ObjectOutputStream JavaDoc;
53 import java.util.ArrayList JavaDoc;
54 import java.util.Iterator JavaDoc;
55 import java.util.Collection JavaDoc;
56
57 import javax.jms.IllegalStateException JavaDoc;
58 import javax.jms.Session JavaDoc;
59 import javax.jms.JMSException JavaDoc;
60 import javax.jms.TransactionInProgressException JavaDoc;
61 import javax.jms.TransactionRolledBackException JavaDoc;
62 import javax.transaction.xa.XAException JavaDoc;
63 import javax.transaction.xa.XAResource JavaDoc;
64 import javax.transaction.xa.Xid JavaDoc;
65
66 import org.apache.commons.logging.Log;
67 import org.apache.commons.logging.LogFactory;
68 import org.mr.MantaAgent;
69 import org.mr.core.persistent.PersistentMap;
70 import org.mr.core.protocol.MantaBusMessage;
71 import org.mr.core.util.byteable.ByteableByteArray;
72
73 /**
74  * A TransactionContext provides the means to control a JMS transaction. It provides
75  * a local transaction interface and also an XAResource interface.
76  *
77  * <p/>
78  * An application server controls the transactional assignment of an XASession
79  * by obtaining its XAResource. It uses the XAResource to assign the session
80  * to a transaction, prepare and commit work on the transaction, and so on.
81  * <p/>
82  * An XAResource provides some fairly sophisticated facilities for
83  * interleaving work on multiple transactions, recovering a list of
84  * transactions in progress, and so on. A JTA aware JMS provider must fully
85  * implement this functionality. This could be done by using the services of a
86  * database that supports XA, or a JMS provider may choose to implement this
87  * functionality from scratch.
88  * <p/>
89  *
90  * @version $Revision: 1.1.1.1 $
91  * @see javax.jms.Session
92  * @see javax.jms.QueueSession
93  * @see javax.jms.TopicSession
94  * @see javax.jms.XASession
95  */

96 public class TransactionContext implements XAResource JavaDoc {
97     
98     private static final Log log = LogFactory.getLog(TransactionContext.class);
99     
100     static final int SUSPENDED = 1;
101     static final int SUCCESS = 2;
102     static final int ROLLBACK_ONLY = 3;
103     static final int PREPARED = 4;
104       
105     private ArrayList JavaDoc sessions = new ArrayList JavaDoc(2);
106     private LocalTransactionIDGenerator idGenerator;
107     
108     protected PersistentMap transactionsTable;
109
110     // To track XA transactions.
111
private Xid JavaDoc associatedXid;
112
113     // To track local transactions.
114
private String JavaDoc localTransactionId;
115     
116     private String JavaDoc resourceName;
117
118     private LocalTransactionEventListener localTransactionEventListener;
119     
/**
 * Creates a transaction context intended for session scope.
 * <p>
 * Opens the "xacs" persistent map used as the XA transactions table, builds
 * the local transaction id generator from the agent's message id, and takes
 * the agent name as this resource's name.
 */
public TransactionContext() {
    this.transactionsTable = new PersistentMap("xacs", false, true);
    this.idGenerator = new LocalTransactionIDGenerator(MantaAgent.getInstance().getMessageId());
    this.resourceName = MantaAgent.getInstance().getAgentName();
}
126     
127     public boolean isInXATransaction() {
128         return associatedXid != null;
129     }
130     
131     public boolean isInLocalTransaction() {
132         return localTransactionId != null;
133     }
134     
135     
136     /**
137      * @return Returns the localTransactionEventListener.
138      */

139     public LocalTransactionEventListener getLocalTransactionEventListener() {
140         return localTransactionEventListener;
141     }
142
143     /**
144      * Used by the resource adapter to listen to transaction events.
145      *
146      * @param localTransactionEventListener The localTransactionEventListener to set.
147      */

148     public void setLocalTransactionEventListener(LocalTransactionEventListener localTransactionEventListener) {
149         this.localTransactionEventListener = localTransactionEventListener;
150     }
151     
152     public String JavaDoc getResourceName() {
153         return this.resourceName;
154     }
155
156     /////////////////////////////////////////////////////////////
157
//
158
// Methods that interface with the session
159
//
160
/////////////////////////////////////////////////////////////
161
public void addSession(MantaSession session) {
162         sessions.add(session);
163     }
164
165     
166     public void removeSession(MantaSession session) {
167         sessions.remove(session);
168     }
169
170     
171     public Object JavaDoc getTransactionId() {
172         if (localTransactionId != null)
173             return localTransactionId;
174         return associatedXid;
175     }
176         
177     /////////////////////////////////////////////////////////////
178
//
179
// Local transaction interface.
180
//
181
/////////////////////////////////////////////////////////////
182

183     /**
184      * Start a local transaction.
185      */

186     public void begin() throws JMSException JavaDoc {
187         Iterator JavaDoc i = sessions.iterator();
188         while (i.hasNext()) {
189             MantaSession session = (MantaSession)i.next();
190             session.checkLegalOperation();
191             if (session.sessionAcknowledgementMode != Session.SESSION_TRANSACTED)
192                 throw new IllegalStateException JavaDoc("MNJMS00175 : FAILED ON METHOD begin(). SESSION IS NOT TRANSACTED.");
193         }
194         
195         if (associatedXid != null)
196             throw new TransactionInProgressException JavaDoc("Cannot start local transction. XA transaction is already in progress.");
197         
198         if (localTransactionId == null) {
199             localTransactionId = idGenerator.nextID();
200             
201             // Notify the listener that the tx was started.
202
if (localTransactionEventListener != null) {
203                 localTransactionEventListener.beginEvent(localTransactionId);
204             }
205             if( log.isDebugEnabled() )
206                 log.debug("Started local transaction: "+localTransactionId);
207         }
208     }
209     
210     /**
211      * Rolls back any messages done in this transaction and releases any locks currently held.
212      *
213      * @throws JMSException if the JMS provider fails to roll back the transaction due to some internal error.
214      * @throws javax.jms.IllegalStateException if the method is not called by a transacted session.
215      */

216     public void rollback() throws JMSException JavaDoc {
217         Iterator JavaDoc i = sessions.iterator();
218         while (i.hasNext()) {
219             MantaSession session = (MantaSession)i.next();
220             session.checkLegalOperation();
221             if (session.sessionAcknowledgementMode != Session.SESSION_TRANSACTED)
222                 throw new IllegalStateException JavaDoc("MNJMS00075 : FAILED ON METHOD rollback(). SESSION IS NOT TRANSACTED.");
223         }
224         
225         if (associatedXid != null)
226             throw new TransactionInProgressException JavaDoc("Cannot rollback local transction. XA transaction is already in progress.");
227         
228         if (localTransactionId != null) {
229             // Notify the listener that the tx was rolled back
230
if (localTransactionEventListener != null) {
231                 localTransactionEventListener.rollbackEvent(localTransactionId);
232             }
233             if (log.isDebugEnabled())
234                 log.debug("Rolledback local transaction: "+localTransactionId);
235             localTransactionId = null;
236         }
237         Iterator JavaDoc i2 = sessions.iterator();
238         while (i2.hasNext()) {
239             MantaSession session = (MantaSession)i2.next();
240             session.rollbackSession();
241         }
242     }
243     
244         
245     /**
246      * Commits all messages done in this transaction and releases any locks currently held.
247      *
248      * @throws JMSException if the JMS provider fails to commit the transaction due to some internal error.
249      * @throws TransactionRolledBackException if the transaction is rolled back due to some internal error during
250      * commit.
251      * @throws javax.jms.IllegalStateException if the method is not called by a transacted session.
252      */

253     public void commit() throws JMSException JavaDoc {
254         Iterator JavaDoc i = sessions.iterator();
255         while (i.hasNext()) {
256             MantaSession session = (MantaSession)i.next();
257             session.checkLegalOperation();
258             if (session.sessionAcknowledgementMode != Session.SESSION_TRANSACTED)
259                 throw new IllegalStateException JavaDoc("MNJMS00074 : FAILED ON METHOD commit(). SESSION IS NOT TRANSACTED.");
260         }
261         
262         if (associatedXid != null)
263             throw new TransactionInProgressException JavaDoc("Cannot commit local transction. XA transaction is already in progress.");
264
265         // Only send commit if the transaction was started.
266
if (localTransactionId != null) {
267             // Notify the listener that the tx was commited back
268
if (localTransactionEventListener != null) {
269                 localTransactionEventListener.commitEvent(localTransactionId);
270             }
271             if( log.isDebugEnabled() )
272                 log.debug("Committed local transaction: "+localTransactionId);
273             localTransactionId = null;
274         }
275         Iterator JavaDoc i2 = sessions.iterator();
276         while (i2.hasNext()) {
277             MantaSession session = (MantaSession)i2.next();
278             session.commitSession();
279         }
280     }
281     
282
283     
284     /**
285      * from API: start or resume a transaction.
286      *
287      */

288     public void start(Xid JavaDoc xid, int flags) throws XAException JavaDoc
289     {
290         // a new transaction can't be started while in the process
291
// of another transaction - XA or local
292
if (associatedXid != null) {
293             throw new XAException JavaDoc("MNJMS000B4 : FAILED ON METHOD start(). RESOURCE "+xid+" IS CURRENTLY ACTIVE IN TRANSACTION.");
294         }
295         
296         if (localTransactionId != null) {
297             throw new XAException JavaDoc("MNJMS000B4 : FAILED ON METHOD start(). RESOURCE "+xid+" IS CURRENTLY ACTIVE IN LOCAL TRANSACTION.");
298         }
299         
300         // If the resource is not active now, and it hasn't yet been started
301
// ever - tell the session to start upon it.
302
// check flags value
303
if (flags == TMNOFLAGS)
304             this.getInvolvedIn(xid);
305         
306         // if the resource was already active once, and got suspended, revoke it.
307
// however - it might not be suspended, and if so - no start() is done.
308
else if (flags == TMRESUME) {
309             if (this.getTransactionStatus(xid) != TransactionContext.SUSPENDED)
310                 throw new XAException JavaDoc("MNJMS000B5 : FAILED ON METHOD start(). RESOURCE "+xid+" IS NOT SUSPENDED.");
311             
312             // if we believe that the resource was suspended, better check
313
// with the session that an XAContent is attached to it.
314
// if not - the session will throw an exception.
315
this.resumeTransaction(xid);
316         }
317         
318         else if (flags == TMJOIN) {
319             log.info("got TMJOIN flag!! continue!");
320             //this.resumeTransaction(xid);
321
}
322         
323         else // flags value is not Kosher
324
throw new XAException JavaDoc("MNJMS000B6 : FAILED ON METHOD start(). FLAGS VALUE CORRUPTED : "+flags);
325         
326         // the resource is eligible for (re)starting.
327
associatedXid = xid;
328         if (log.isDebugEnabled())
329             log.debug("Started XAResource with Xid "+xid);
330     }
331     
332     
333     /**
334      * from API: stop or suspend a transaction.
335      *
336      */

337     public void end(Xid JavaDoc xid, int flags) throws XAException JavaDoc
338     {
339         // Check that the end() is called on a transaction that is really active
340
// right now.
341

342         // This is a geronimo patch because from some reason the end() method
343
// is called twice. The second time when associatedXid=null. We don't
344
// want to throw an exception so we just return.
345
if (associatedXid == null) {
346             MantaXADescriptor xaMsgs = (MantaXADescriptor) transactionsTable.get(xid.toString());
347             if (xaMsgs != null && xaMsgs.getStatus() == TransactionContext.SUSPENDED) {
348                 associatedXid = xid;
349                 if (log.isDebugEnabled()) {
350                     log.debug("Method end() was called for a suspended Xid. Reassociating Xid: "+xid);
351                 }
352             }
353             else {
354                 throw new XAException JavaDoc("MNJMS000B7 : FAILED ON METHOD end(). RESOURCE "+xid+" IS NOT ACTIVE IN CURRENT TRANSACTION.");
355             }
356         }
357         
358         if (!xid.equals(associatedXid))
359             throw new XAException JavaDoc("MNJMS000B7 : FAILED ON METHOD end(). RESOURCE "+xid+" IS NOT ACTIVE IN CURRENT TRANSACTION.");
360         
361         // check flags and act accordingly
362
if (flags == TMSUSPEND) {
363             this.setTransactionStatus(xid, TransactionContext.SUSPENDED);
364         }
365         else if (flags == TMFAIL) {
366             this.setTransactionStatus(xid, TransactionContext.ROLLBACK_ONLY);
367         }
368         else if (flags == TMSUCCESS) {
369             this.setTransactionStatus(xid, TransactionContext.SUCCESS);
370         }
371         else {
372             throw new XAException JavaDoc("MNJMS000B8 : FAILED ON METHOD start(). FLAGS VALUE CORRUPTED : "+flags);
373         }
374         
375         // after changing the status for the transaction, knowing the
376
// transaction really exists - we need to add the objects that the
377
// session is keeping for the transaction to the content. this is
378
// done by saveTransaction().
379
// notice - this does not persist anything to the filesystem yet.
380
this.saveTransaction(xid);
381         
382         // now- update the XAResource's fields to reflect the end() status.
383
// the XAResource is only one, and it takes the value of the specific
384
// xid that's there. therefore, upon end() - the XAResource is freed
385
// for use by anyone.
386
associatedXid = null;
387         if (log.isDebugEnabled())
388             log.debug("end() called for Xid "+xid);
389     }
390     
391     
392     /**
393      * from API: prepare the resource for commit.
394      * this method will end in persisting the messages in the content to the
395      * disk. after this is done, the prepare() will return.
396      * Return Values: XA_RDONLY or XA_OK.
397      */

398     public int prepare(Xid JavaDoc xid) throws XAException JavaDoc
399     {
400         if (log.isDebugEnabled())
401             log.debug("prepare() called on Xid "+xid);
402         
403         if (sessions.isEmpty()) {
404             throw new XAException JavaDoc(XAException.XAER_PROTO);
405         }
406         
407         // The xid must be in the SUCCESS mode for persisting.
408
int transactionStatus = this.getTransactionStatus(xid);
409         if (transactionStatus != TransactionContext.SUCCESS &&
410             transactionStatus != TransactionContext.SUSPENDED) // geronimo patch
411
throw new XAException JavaDoc("MNJMS000B9 :FAILED ON METHOD prepare(). TRANSACTION STATE ILLEGAL FOR PREPARE ACTION. REQUIRED: SUCCESS, RETRIEVED: "+this.getTransactionStatus(xid));
412         
413         int statusReturn = XAResource.XA_OK; //the return from this method.
414

415         // If the resource was in the SUCCESS state - the session should prepare it
416
// for the commit action.
417
try {
418             // the statusReturn gets the result of XA_OK or XA_RDONLY.
419
// it might also return a TMFAILED - not sure that this is what
420
// should return on failure. need to check.
421

422             statusReturn = this.prepareTransaction(xid);
423             // the status is set when the transaction is save to the disk
424
// in the xaSession.prepareTransaction(xid) method.
425
// cancelled: xaSession.setTransactionStatus(xid, PREPARED);
426

427         }
428         catch (Exception JavaDoc e) {
429             // if an error occured while preparing -
430
this.setTransactionStatus(xid, TransactionContext.ROLLBACK_ONLY);
431             //statusReturn = ROLLBACK_ONLY;
432
throw new XAException JavaDoc("MNTA10236Z EXCEPTION DURING PREPARE: "+e.getMessage());
433         }
434         
435         if (log.isDebugEnabled())
436             log.debug("prepare() completed on Xid "+xid+" , RC="+statusReturn);
437         return statusReturn;
438     }
439     
440     
441     /**
442      * from API : commit the resource.
443      *
444      */

445     public void commit(Xid JavaDoc xid, boolean onePhase) throws XAException JavaDoc
446     {
447         // in the case of one-phase optimization, a prepare must be done
448
// on the xid.
449
if (log.isDebugEnabled())
450             log.debug("commit() requested for Xid "+xid);
451         
452         if (sessions.isEmpty()) {
453             throw new XAException JavaDoc(XAException.XAER_PROTO);
454         }
455         
456         if (onePhase) {
457             try {
458                 prepare(xid);
459             }
460             catch (Exception JavaDoc e) {
461                 throw new XAException JavaDoc("MNTA10232E EXCEPTION DURING 1P PREPARE "
462                         + e.getMessage());
463             }
464         }
465         
466         // no matter how we got here - the xid must be prepared now. if it isn't -
467
// well, an exception must be throw.
468
if (this.getTransactionStatus(xid) != TransactionContext.PREPARED) {
469             throw new XAException JavaDoc("MNTA10234E TRANSACTION STATUS DISALLOWS PREPARE. "+
470                     "EXPECTED: PREPARED, RECEIVED "+this.getTransactionStatus(xid));
471         }
472         
473         // if the resource is prepared for committing- the session is called to
474
// commit and remove it.
475

476         try {
477             this.commitTransaction(xid);
478             // removing the transaction is done in the commit method.
479
// cancelled: xaSession.removeTransaction(xid);
480
}
481         catch (Exception JavaDoc e) {
482             this.setTransactionStatus(xid, TransactionContext.ROLLBACK_ONLY);
483             throw new XAException JavaDoc("MNTA10236E EXCEPTION DURING 2P COMMIT: "+e.getMessage());
484         }
485         
486         if (log.isDebugEnabled())
487             log.debug("commit completed for Xid "+xid);
488     }
489     
490     
491     /**
492      * from API: get the array of the ids of the prepared transactions
493      * this resource is involved in.
494      *
495      */

496     public Xid JavaDoc[] recover(int flag) throws XAException JavaDoc
497     {
498         if (sessions.isEmpty()) {
499             throw new XAException JavaDoc(XAException.XAER_PROTO);
500         }
501         
502         if (flag != XAResource.TMSTARTRSCAN && flag != XAResource.TMENDRSCAN)
503             //throw new XAException("MNTA10224E INVALID FLAG VALUE: "+flag);
504
return null;
505         
506         return this.getPreparedTransactions();
507     }
508     
509     
510     /**
511      * from API: roll back the transaction.
512      *
513      */

514     public void rollback(Xid JavaDoc xid) throws XAException JavaDoc
515     {
516         if (log.isDebugEnabled())
517             log.debug("rollback() called for Xid "+xid);
518         
519         if (associatedXid != null && xid.equals(associatedXid)) {
520             //this.saveTransaction(xid);
521
associatedXid = null;
522         }
523         
524         try {
525             this.rollbackTransaction(xid);
526         }
527         catch (Exception JavaDoc e) {}
528         
529         // done in rollbackTransaction(Xid)
530
//this.removeTransaction(xid);
531
}
532     
533     
534     /** Non implemented API method. */
535     public void forget(Xid JavaDoc xid) throws XAException JavaDoc {
536         if (log.isDebugEnabled())
537             log.debug("forget() called on Xid "+xid);
538         
539         //txnif (xaSessions.isEmpty()) { // this is not an XASession
540
if (sessions.isEmpty()) {
541             throw new XAException JavaDoc(XAException.XAER_PROTO);
542         }
543         
544         //throw new XAException("MNTA10240E CAN NOT FORGET A TRANSACTION");
545
this.removeTransaction(xid);
546     }
547     
548     /** Non implemented API method. */
549     public boolean setTransactionTimeout(int seconds) throws XAException JavaDoc {
550         throw new XAException JavaDoc("MNTA10241E CAN NOT SET A TIMEOUT");
551     }
552     
553     /** Non implemented API method. */
554     public int getTransactionTimeout() throws XAException JavaDoc {
555         throw new XAException JavaDoc("MNTA10242E CAN NOT GET A TIMEOUT");
556     }
557     
558     /** from API : find if the resources are the same */
559     public boolean isSameRM(javax.transaction.xa.XAResource JavaDoc xares) throws XAException JavaDoc {
560         return this.equals(xares);
561     }
562     
563     public boolean equals(XAResource JavaDoc xares) {
564         if (xares instanceof TransactionContext) {
565             TransactionContext context = (TransactionContext)xares;
566             // currently the is only one MantaRay instance in an
567
// application server so it should be true always.
568
return context.getResourceName().equals(this.getResourceName());
569         }
570         return false;
571     }
572     
573     
574     // moved from MantaXASession:
575

576     /*
577      * This method is a callback from the XA resource to involve the session
578      * in a given transaction.
579      *
580      */

581     private void getInvolvedIn(Xid JavaDoc xid) throws XAException JavaDoc
582     {
583         //If the given Xid is already listed as involved in a transaction -
584
//Throw an exception.
585
if (transactionsTable.containsKey(xid.toString()))
586             throw new XAException JavaDoc("MNJMS0001B : RESOURCE "+xid+" ALREADY INVOLVED. FAILED ON METHOD getInvolvedIn().");
587         
588         ByteArrayOutputStream JavaDoc baos = new ByteArrayOutputStream JavaDoc();
589         ByteableByteArray bba;
590         try {
591             ObjectOutputStream JavaDoc oos = new ObjectOutputStream JavaDoc(baos);
592             MantaXid serializableXid = new MantaXid(xid);
593             oos.writeObject(serializableXid);
594             bba = new ByteableByteArray();
595             bba.setPayload(baos.toByteArray());
596             transactionsTable.put(xid.toString(), new MantaXADescriptor(bba));
597         }
598         catch (IOException JavaDoc ioe) {
599             if (log.isErrorEnabled())
600                 log.error("Failed to serialize Xid: "+xid, ioe);
601             throw new XAException JavaDoc("MNJMS0001C : FAILED TO SERIALIZE RESOURCE "+xid+" METHOD getInvolvedIn() FAILED.");
602             
603         }
604     }
605     
606     
607     /*
608      * This method is a callback from the XAResource, asking to remove the
609      * specific Xid from the repository. This should be called when a transaction
610      * has been committed or rollback-ed.
611      */

612     private void removeTransaction(Xid JavaDoc xid)
613     {
614         transactionsTable.remove(xid.toString());
615     }
616     
617     /*
618      * This method is a callback from the XAResource, used to change the
619      * status of a transaction. Available statuses are listed in the
620      * TransactionContext class.
621      *
622      */

623     private void setTransactionStatus(Xid JavaDoc xid, int status) throws XAException JavaDoc
624     {
625         MantaXADescriptor xaMsgs = (MantaXADescriptor) transactionsTable.get(xid.toString());
626         
627         if (xaMsgs == null)
628             throw new XAException JavaDoc("MNJMS0001D : RESOURCE "+ xid +
629             " STATUS WILL NOT BE CHANGED. RESOURCE DOES NOT PARTICIPATE IN A TRANSACTION. METHOD setTransactionStatus() FAILED.");
630         
631         //if the resource is valid - its status will be changed.
632
xaMsgs.status = status;
633         //transactionsTable.put(xid.toString(),xaMsgs, true);
634
// the status is saved to the disk only in prepare
635
transactionsTable.put(xid.toString(),xaMsgs);
636     }
637     
638     /*
639      * This method is a callback from the XAResource, to retrieve the status
640      * of the specified transaction.
641      * This is used, e.g. when the resource is going to prepare - the status
642      * for the Xid must be SUCCESS.
643      */

644     private int getTransactionStatus(Xid JavaDoc xid) throws XAException JavaDoc
645     {
646         //Make sure that the xid is really here.
647
//Retrieval is valid only if the resource is listed in this XASession.
648
MantaXADescriptor xaMsgs = (MantaXADescriptor) transactionsTable.get(xid.toString());
649         if (xaMsgs == null)
650             throw new XAException JavaDoc("MNJMS0001E : RESOURCE "+ xid +
651             " STATUS WILL NOT BE RETRIEVED. RESOURCE DOES NOT PARTICIPATE IN A TRANSACTION. METHOD getTransactionStatus() FAILED.");
652         
653         //If the resource is active - return its status.
654
return xaMsgs.status;
655     }
656     
657     
658     /*
659      * This method is a callback from the XAResource. It persists the objects
660      * of the session in the repository, but not yet to the file system.
661      *
662      */

663     private void saveTransaction(Xid JavaDoc xid) throws XAException JavaDoc
664     {
665         //first - make sure we get the specified trx.
666
MantaXADescriptor xaMsgs = (MantaXADescriptor) transactionsTable.get(xid.toString());
667         
668         //This is only allowed if the Xid is involved.
669
if (xaMsgs == null)
670             throw new XAException JavaDoc("MNJMS0001F : RESOURCE IS NOT TRANSACTED IN "+xid+
671             " METHOD saveTransaction() FAILED.");
672         
673         //now -add the messages held for this resource to the resource's
674
//content. later, it will be possible to retrieve them.
675
// xaMsgs.addHeldMessages(heldMessages);
676
// heldMessages.clear();
677
// synchronized(unackedMessages) {
678
// xaMsgs.addUnackedMessages(unackedMessages);
679
// unackedMessages.clear();
680
// }
681
//txnIterator i = xaSessions.iterator();
682
Iterator JavaDoc i = sessions.iterator();
683         while (i.hasNext()) {
684             MantaSession session = (MantaSession)i.next();
685             session.saveMessages(xaMsgs);
686         }
687         // i don't think we need this
688
//transactionsTable.put(xid.toString(),xaMsgs);
689
}
690     
691 // public void saveTransactionNoPersist(Collection held, Collection unacked) {
692
// log.info("saveTransactionNoPersist()");
693
// if (this.associatedXid == null) {
694
// log.info("saveTransactionNoPersist: associatedXid not set!!");
695
// return;
696
// }
697
//
698
// MantaXADescriptor xaMsgs = (MantaXADescriptor) transactionsTable.get(associatedXid.toString());
699
//
700
// //This is only allowed if the Xid is involved.
701
// if (xaMsgs == null) {
702
// log.info("saveTransactionNoPersist: descriptor doesn't exist for Xid!!");
703
// return;
704
// //throw new XAException("MNJMS0001F : RESOURCE IS NOT TRANSACTED IN "+xid+
705
// //" METHOD saveTransaction() FAILED.");
706
// }
707
//
708
// xaMsgs.addHeldMessages(held);
709
// xaMsgs.addUnackedMessages(unacked);
710
// }
711

712     /*
713      * This method is a callback from the XAresource when it is
714      * reassigned to a specific transaction. i.e. - the TM wants
715      * the Xid to become active again, after suspending it.
716      * This is only used to except() if the resource id is invalid.
717      */

718     private void resumeTransaction(Xid JavaDoc xid) throws XAException JavaDoc
719     {
720         // shai: casting is redundant?
721
//get the content of the transaction this Xid is involved in,
722
MantaXADescriptor xaMsgs = (MantaXADescriptor) transactionsTable.get(xid.toString());
723         
724         //This is only allowed if the xid is really in a transaction here.
725
if (xaMsgs == null)
726             throw new XAException JavaDoc("MNJMS00020 : RESOURCE IS NOT TRANSACTED IN "+xid+
727             " METHOD resumeTransaction() FAILED.");
728     }
729     
730     /*
731      * This method is a callback by the XAResource for
732      * preparing the session for sending the corresponding messages and
733      * acknowledgements related to it.
734      * The prepare() call actually persists the objects to file.
735      *
736      */

737     private int prepareTransaction(Xid JavaDoc xid) throws Exception JavaDoc
738     {
739         // get the content for this xid
740
MantaXADescriptor xaMsgs = (MantaXADescriptor) transactionsTable.get(xid.toString());
741         
742         //if not found - except().
743
if (xaMsgs == null)
744             throw new XAException JavaDoc("MNJMS00021 : RESOURCE IS NOT TRANSACTED IN "+xid +
745             " METHOD prepareTransaction() FAILED.");
746         
747         int returnValue;
748         
749         //ok. all is good - the content needs to be persisted to the disk.
750
//a failure will delegate to the Resource's prepare and be converted
751
//to a failure code.
752
if (xaMsgs.getHeldMessages().size() > 0 || xaMsgs.getUnackedMessages().size() > 0) {
753             returnValue = XAResource.XA_OK;
754         }
755         else {
756             returnValue = XAResource.XA_RDONLY;
757         }
758         xaMsgs.status = TransactionContext.PREPARED;
759         transactionsTable.put(xid.toString(),xaMsgs, true);
760
761         return returnValue;
762     }
763     
764     
765     /*
766      * This method is a callback from the XAResource when it needs to commit the
767      * transaction.
768      * The commit will send the messages that are stored in the xaMsgs,
769      * and will then remove the xid from the transactions list.
770      *
771      */

772     private void commitTransaction(Xid JavaDoc xid) throws JMSException JavaDoc, XAException JavaDoc
773     {
774         //get the content for this xid
775
MantaXADescriptor xaMsgs = (MantaXADescriptor) transactionsTable.get(xid.toString());
776
777         //revert if an incorrect xid was given.
778
if (xaMsgs == null)
779             throw new XAException JavaDoc("MNJMS00022 : RESOURCE IS NOT INVOLVED IN TRANSACTION "+xid+
780             " METHOD commitTransaction() FAILED.");
781         
782         //commit - i.e. - send all messages, on the session.
783
//this is done by the MantaSession's method for sending all the messages.
784
//txnIterator i = xaSessions.iterator();
785
Iterator JavaDoc i = sessions.iterator();
786         while (i.hasNext()) {
787             MantaSession session = (MantaSession)i.next();
788             session.sendAllMessages(xaMsgs.getHeldMessages());
789             session.ackAllMessages(xaMsgs.getUnackedMessages());
790         }
791         
792         this.removeTransaction(xid);
793     }
794
795     
796     /*
797      * This method is a callback by the XAResource when it needs to roll
798      * the transaction back.
799      */

800     private void rollbackTransaction(Xid JavaDoc xid) throws Exception JavaDoc
801     {
802         // get the content for this xid
803
MantaXADescriptor xaMsgs = (MantaXADescriptor) transactionsTable.get(xid.toString());
804         
805         //Check if there's something to rollback at all.
806
if (xaMsgs == null)
807             throw new XAException JavaDoc("MNJMS00023 : RESOURCE IS NOT INVOLVED IN TRANACTION "+xid+
808             " METHOD rollbackTransaction() FAILED.");
809         
810         // redeliver all unacked messages. we do this by takung the
811
// unacked messages of the transaction and insert them
812
// back to the consumer messages list of the session.
813
// this willmake the serversession to send the messages again.
814
// we also mark messages as REDELIVERED!
815
Collection JavaDoc unackedMessages = xaMsgs.getUnackedMessages();
816         if (unackedMessages != null) {
817             Iterator JavaDoc i = unackedMessages.iterator();
818             while (i.hasNext()) {
819                 MantaBusMessage message = (MantaBusMessage)i.next();
820                 message.setDeliveryCount(message.getDeliveryCount()+1);
821             }
822         }
823         
824         Iterator JavaDoc i = sessions.iterator();
825         while (i.hasNext()) {
826             MantaSession session = (MantaSession)i.next();
827             session.addConsumerMessages(unackedMessages);
828         }
829         
830         this.removeTransaction(xid);
831     }
832     
833     
834     /*
835      * Returns an array of the identifiers of the transactions the
836      * session is involved in that are prepared.
837      */

838     private Xid JavaDoc[] getPreparedTransactions()
839     {
840         Iterator JavaDoc keys = transactionsTable.keySet().iterator();
841         ArrayList JavaDoc ids = new ArrayList JavaDoc();
842         while (keys.hasNext()) {
843             String JavaDoc key = (String JavaDoc) keys.next();
844             MantaXADescriptor xaMsgs = (MantaXADescriptor) transactionsTable.get(key);
845             if (xaMsgs.status == TransactionContext.PREPARED) {
846                 try {
847                     ByteableByteArray bba = (ByteableByteArray) xaMsgs.getXidBytes();
848                     ObjectInputStream JavaDoc ois = new ObjectInputStream JavaDoc(new ByteArrayInputStream JavaDoc(bba.getPayload()));
849                     Xid JavaDoc xid = (Xid JavaDoc) ois.readObject();
850                     ids.add(xid);
851                 }
852                 catch(Exception JavaDoc ioe) {
853                     if (log.isErrorEnabled())
854                         log.error("MNTA00024 : FAILED TO CREATE XID.",ioe);
855                 }
856             }
857         }
858         Xid JavaDoc[] array = new Xid JavaDoc[ids.size()];
859         ids.toArray(array);
860         return array;
861     }
862     
863 }
Popular Tags