KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jboss > cache > interceptors > TxInterceptor


1 /*
2  * JBoss, Home of Professional Open Source
3  *
4  * Distributable under LGPL license.
5  * See terms of license at gnu.org.
6  */

7 package org.jboss.cache.interceptors;
8
9 import org.jboss.cache.CacheException;
10 import org.jboss.cache.CacheSPI;
11 import org.jboss.cache.GlobalTransaction;
12 import org.jboss.cache.InvocationContext;
13 import org.jboss.cache.OptimisticTransactionEntry;
14 import org.jboss.cache.ReplicationException;
15 import org.jboss.cache.TransactionEntry;
16 import org.jboss.cache.config.Configuration;
17 import org.jboss.cache.config.Option;
18 import org.jboss.cache.marshall.MethodCall;
19 import org.jboss.cache.marshall.MethodCallFactory;
20 import org.jboss.cache.marshall.MethodDeclarations;
21 import org.jboss.cache.optimistic.DataVersion;
22
23 import javax.transaction.Status JavaDoc;
24 import javax.transaction.Synchronization JavaDoc;
25 import javax.transaction.SystemException JavaDoc;
26 import javax.transaction.Transaction JavaDoc;
27 import java.util.HashMap JavaDoc;
28 import java.util.List JavaDoc;
29 import java.util.Map JavaDoc;
30 import java.util.concurrent.ConcurrentHashMap JavaDoc;
31
32 /**
33  * This interceptor is the new default at the head of all interceptor chains,
34  * and makes transactional attributes available to all interceptors in the chain.
35  * This interceptor is also responsible for registering for synchronisation on
36  * transaction completion.
37  *
38  * @author <a HREF="mailto:manik@jboss.org">Manik Surtani (manik@jboss.org)</a>
39  * @author <a HREF="mailto:stevew@jofti.com">Steve Woodcock (stevew@jofti.com)</a>
40  */

41 public class TxInterceptor extends BaseTransactionalContextInterceptor implements TxInterceptorMBean
42 {
43    /**
44     * List <Transaction>that we have registered for
45     */

46    private Map JavaDoc transactions = new ConcurrentHashMap JavaDoc(16);
47    private Map JavaDoc rollbackTransactions = new ConcurrentHashMap JavaDoc(16);
48    private long m_prepares = 0;
49    private long m_commits = 0;
50    private long m_rollbacks = 0;
51    final static Object JavaDoc NULL = new Object JavaDoc();
52
53    /**
54     * Set<GlobalTransaction> of GlobalTransactions that originated somewhere else (we didn't create them).
55     * This is a result of a PREPARE phase. GlobalTransactions in this list should be ignored by this
56     * interceptor when registering for TX completion
57     */

58    private Map JavaDoc remoteTransactions = new ConcurrentHashMap JavaDoc();
59
60    public Object JavaDoc invoke(MethodCall m) throws Throwable JavaDoc
61    {
62       if (log.isTraceEnabled())
63       {
64          log.trace("(" + cache.getLocalAddress() + ") call on method [" + m + "]");
65       }
66       // bypass for buddy group org metod calls.
67
if (MethodDeclarations.isBuddyGroupOrganisationMethod(m.getMethodId())) return super.invoke(m);
68
69       InvocationContext ctx = cache.getInvocationContext();
70
71       boolean scrubTxsOnExit = false;
72       Option optionOverride = ctx.getOptionOverrides();
73
74       Object JavaDoc result = null;
75
76       try
77       {
78          // first of all deal with tx methods - these are only going to be
79
// prepare/commit/rollback called by a remote cache, since calling
80
// such methods on CacheImpl directly would fail.
81

82          if (MethodDeclarations.isTransactionLifecycleMethod(m.getMethodId()))
83          {
84             // this is a prepare, commit, or rollback.
85
if (log.isDebugEnabled()) log.debug("Got gtx from invocation context " + ctx.getGlobalTransaction());
86
87             if (ctx.getGlobalTransaction().isRemote()) remoteTransactions.put(ctx.getGlobalTransaction(), NULL);
88
89             switch (m.getMethodId())
90             {
91                case MethodDeclarations.optimisticPrepareMethod_id:
92                case MethodDeclarations.prepareMethod_id:
93                   if (ctx.getGlobalTransaction().isRemote())
94                   {
95                      result = handleRemotePrepare(m, ctx.getGlobalTransaction());
96                      scrubTxsOnExit = true;
97                      if (configuration.getExposeManagementStatistics() && getStatisticsEnabled())
98                      {
99                         m_prepares++;
100                      }
101                   }
102                   else
103                   {
104                      if (log.isTraceEnabled()) log.trace("received my own message (discarding it)");
105                      result = null;
106                   }
107                   break;
108                case MethodDeclarations.commitMethod_id:
109                case MethodDeclarations.rollbackMethod_id:
110                   if (ctx.getGlobalTransaction().isRemote())
111                   {
112                      result = handleRemoteCommitRollback(m, ctx.getGlobalTransaction());
113                      scrubTxsOnExit = true;
114                   }
115                   else
116                   {
117                      if (log.isTraceEnabled()) log.trace("received my own message (discarding it)");
118                      result = null;
119                   }
120                   break;
121             }
122          }
123          else
124          {
125             // non-transaction lifecycle method.
126
result = handleNonTxMethod(m);
127          }
128       }
129       catch (Exception JavaDoc e)
130       {
131          log.info("There was a problem handling this request", e);
132          if (optionOverride == null || !optionOverride.isFailSilently()) throw e;
133       }
134       finally
135       {
136          // we should scrub txs after every call to prevent race conditions
137
// basically any other call coming in on the same thread and hijacking any running tx's
138
// was highlighted in JBCACHE-606
139

140          if (scrubTxsOnExit)
141          {
142             setTransactionalContext(null, null);
143          }
144       }
145       return result;
146    }
147
148    public long getPrepares()
149    {
150       return m_prepares;
151    }
152
153    public long getCommits()
154    {
155       return m_commits;
156    }
157
158    public long getRollbacks()
159    {
160       return m_rollbacks;
161    }
162
163    public void resetStatistics()
164    {
165       m_prepares = 0;
166       m_commits = 0;
167       m_rollbacks = 0;
168    }
169
170    public Map JavaDoc<String JavaDoc, Object JavaDoc> dumpStatistics()
171    {
172       Map JavaDoc<String JavaDoc, Object JavaDoc> retval = new HashMap JavaDoc<String JavaDoc, Object JavaDoc>(3);
173       retval.put("Prepares", m_prepares);
174       retval.put("Commits", m_commits);
175       retval.put("Rollbacks", m_rollbacks);
176       return retval;
177    }
178
179    private Object JavaDoc handleRemotePrepare(MethodCall m, GlobalTransaction gtx) throws Throwable JavaDoc
180    {
181       List JavaDoc<MethodCall> modifications = (List JavaDoc<MethodCall>) m.getArgs()[1];
182       boolean onePhase = (Boolean JavaDoc) m.getArgs()[configuration.isNodeLockingOptimistic() ? 4 : 3];
183
184       // Is there a local transaction associated with GTX ?
185
Transaction ltx = txTable.getLocalTransaction(gtx);
186
187       Transaction currentTx = txManager.getTransaction();
188       Object JavaDoc retval = null;
189
190       try
191       {
192          if (ltx == null)
193          {
194             if (currentTx != null) txManager.suspend();
195             ltx = createLocalTxForGlobalTx(gtx);// creates new LTX and associates it with a GTX
196
if (log.isDebugEnabled())
197             {
198                log.debug("(" + cache.getLocalAddress() + "): started new local TX as result of remote PREPARE: local TX=" + ltx + ", global TX=" + gtx);
199             }
200          }
201          else
202          {
203             //this should be valid
204
if (!isValid(ltx)) throw new CacheException("Transaction " + ltx + " not in correct state to be prepared");
205
206             //associate this thread with this ltx if this ltx is NOT the current tx.
207
if (currentTx == null || !ltx.equals(currentTx))
208             {
209                txManager.suspend();
210                txManager.resume(ltx);
211             }
212          }
213
214
215          if (log.isTraceEnabled()) {log.trace("Resuming existing transaction " + ltx + ", global TX=" + gtx);}
216
217          // at this point we have a non-null ltx
218

219          // Asssociate the local TX with the global TX. Create new
220
// entry for TX in txTable, the modifications
221
// below will need this entry to add their modifications
222
// under the GlobalTx key
223
if (txTable.get(gtx) == null)
224          {
225             // create a new transaction entry
226

227             TransactionEntry entry = configuration.isNodeLockingOptimistic() ? new OptimisticTransactionEntry() : new TransactionEntry();
228             entry.setTransaction(ltx);
229             log.debug("creating new tx entry");
230             txTable.put(gtx, entry);
231             if (log.isTraceEnabled()) log.trace("TxTable contents: " + txTable);
232          }
233
234          // register a sync handler for this tx.
235
registerHandler(ltx, new RemoteSynchronizationHandler(gtx, ltx, cache));
236
237          if (configuration.isNodeLockingOptimistic())
238          {
239             retval = handleOptimisticPrepare(m, gtx, modifications, onePhase, ltx);
240          }
241          else
242          {
243             retval = handlePessimisticPrepare(m, gtx, modifications, onePhase, ltx);
244          }
245       }
246       finally
247       {
248          txManager.suspend();// suspends ltx - could be null
249
// resume whatever else we had going.
250
if (currentTx != null) txManager.resume(currentTx);
251          if (log.isDebugEnabled()) log.debug("Finished remote prepare " + gtx);
252       }
253
254       return retval;
255    }
256
257    // --------------------------------------------------------------
258
// handler methods.
259
// --------------------------------------------------------------
260

261    /**
262     * Tests if we already have a tx running. If so, register a sync handler for this method invocation.
263     * if not, create a local tx if we're using opt locking.
264     *
265     * @param m
266     * @return
267     * @throws Throwable
268     */

269    private Object JavaDoc handleNonTxMethod(MethodCall m) throws Throwable JavaDoc
270    {
271       InvocationContext ctx = cache.getInvocationContext();
272       Transaction tx = ctx.getTransaction();
273       Object JavaDoc result;
274       // if there is no current tx and we're using opt locking, we need to use an implicit tx.
275
boolean implicitTransaction = configuration.isNodeLockingOptimistic() && tx == null;
276       if (implicitTransaction)
277       {
278          tx = createLocalTx();
279          // we need to attach this tx to the InvocationContext.
280
ctx.setTransaction(tx);
281       }
282       if (tx != null) m = attachGlobalTransaction(tx, m);
283
284       GlobalTransaction gtx = ctx.getGlobalTransaction();
285
286       try
287       {
288          result = super.invoke(m);
289          if (implicitTransaction)
290          {
291             copyInvocationScopeOptionsToTxScope(ctx);
292             txManager.commit();
293          }
294       }
295       catch (Throwable JavaDoc t)
296       {
297          if (implicitTransaction)
298          {
299             log.warn("Rolling back, exception encountered", t);
300             result = t;
301             try
302             {
303                setTransactionalContext(tx, gtx);
304                txManager.rollback();
305             }
306             catch (Throwable JavaDoc th)
307             {
308                log.warn("Roll back failed encountered", th);
309             }
310          }
311          else
312          {
313             throw t;
314          }
315       }
316       return result;
317    }
318
319    private MethodCall attachGlobalTransaction(Transaction tx, MethodCall m) throws Exception JavaDoc
320    {
321       if (log.isDebugEnabled())
322       {
323          log.debug(" local transaction exists - registering global tx if not present for " + Thread.currentThread());
324       }
325       if (log.isTraceEnabled())
326       {
327          GlobalTransaction tempGtx = txTable.get(tx);
328          log.trace("Associated gtx in txTable is " + tempGtx);
329       }
330
331       // register a sync handler for this tx - only if the gtx is not remotely initiated.
332
GlobalTransaction gtx = registerTransaction(tx);
333       if (gtx != null)
334       {
335          m = replaceGtx(m, gtx);
336       }
337       else
338       {
339          // get the current gtx from the txTable.
340
gtx = txTable.get(tx);
341       }
342
343       // make sure we attach this gtx to the invocation context.
344
cache.getInvocationContext().setGlobalTransaction(gtx);
345
346       return m;
347    }
348
349    /**
350     * This is called by invoke() if we are in a remote gtx's prepare() phase.
351     * Finds the appropriate tx, suspends any existing txs, registers a sync handler
352     * and passes up the chain.
353     * <p/>
354     * Resumes any existing txs before returning.
355     *
356     * @param m
357     * @return
358     * @throws Throwable
359     */

360    private Object JavaDoc handleOptimisticPrepare(MethodCall m, GlobalTransaction gtx, List JavaDoc<MethodCall> modifications, boolean onePhase, Transaction ltx) throws Throwable JavaDoc
361    {
362       Object JavaDoc retval;
363       if (log.isDebugEnabled()) log.debug("Handling optimistic remote prepare " + gtx);
364       replayModifications(modifications, ltx, true);
365       retval = super.invoke(m);
366       // JBCACHE-361 Confirm that the transaction is ACTIVE
367
if (!isActive(ltx))
368       {
369          throw new ReplicationException("prepare() failed -- " +
370                  "local transaction status is not STATUS_ACTIVE;" +
371                  " is " + ltx.getStatus());
372       }
373       return retval;
374    }
375
376    private Object JavaDoc handlePessimisticPrepare(MethodCall m, GlobalTransaction gtx, List JavaDoc<MethodCall> modifications, boolean commit, Transaction ltx) throws Exception JavaDoc
377    {
378       boolean success = true;
379       Object JavaDoc retval;
380       try
381       {
382          // now pass up the prepare method itself.
383
try
384          {
385             replayModifications(modifications, ltx, false);
386             if (isOnePhaseCommitPrepareMehod(m))
387             {
388                log.trace("Using one-phase prepare. Not propagating the prepare call up the stack until called to do so by the sync handler.");
389             }
390             else
391             {
392                super.invoke(m);
393             }
394
395             // JBCACHE-361 Confirm that the transaction is ACTIVE
396
if (!isActive(ltx))
397             {
398                throw new ReplicationException("prepare() failed -- " +
399                        "local transaction status is not STATUS_ACTIVE;" +
400                        " is " + ltx.getStatus());
401             }
402          }
403          catch (Throwable JavaDoc th)
404          {
405             log.error("prepare method invocation failed", th);
406             retval = th;
407             success = false;
408             if (retval instanceof Exception JavaDoc)
409             {
410                throw (Exception JavaDoc) retval;
411             }
412          }
413       }
414       finally
415       {
416
417          if (log.isTraceEnabled()) {log.trace("Are we running a 1-phase commit? " + commit);}
418          // 4. If commit == true (one-phase-commit): commit (or rollback) the TX; this will cause
419
// {before/after}Completion() to be called in all registered interceptors: the TransactionInterceptor
420
// will then commit/rollback against the cache
421

422          if (commit)
423          {
424             try
425             {
426                // invokeOnePhaseCommitMethod(gtx, modifications.size() > 0, success);
427
if (success)
428                {
429                   ltx.commit();
430                }
431                else
432                {
433                   ltx.rollback();
434                }
435             }
436             catch (Throwable JavaDoc t)
437             {
438                log.error("Commit/rollback failed.", t);
439                if (success)
440                {
441                   // try another rollback...
442
try
443                   {
444                      log.info("Attempting anotehr rollback");
445                      //invokeOnePhaseCommitMethod(gtx, modifications.size() > 0, false);
446
ltx.rollback();
447                   }
448                   catch (Throwable JavaDoc t2)
449                   {
450                      log.error("Unable to rollback", t2);
451                   }
452                }
453             }
454             finally
455             {
456                transactions.remove(ltx);// JBAS-298
457
remoteTransactions.remove(gtx);// JBAS-308
458
}
459          }
460       }
461       return null;
462    }
463
464    private Object JavaDoc replayModifications(List JavaDoc<MethodCall> modifications, Transaction tx, boolean injectDataVersions)
465    {
466       Object JavaDoc retval = null;
467
468       if (modifications != null)
469       {
470          for (MethodCall modification : modifications)
471          {
472             try
473             {
474                if (injectDataVersions && !MethodDeclarations.isDataGravitationMethod(modification.getMethodId()))
475                {
476                   Object JavaDoc[] origArgs = modification.getArgs();
477                   // there may be instances (e.g., data gravitation calls) where a data version is not passed in or not even relevant.
478
// make sure we are aware of this.
479
injectDataVersion(origArgs[origArgs.length - 1]);
480                   // modify the call to the non-dataversioned counterpart since we've popped out the data version
481
Object JavaDoc[] args = new Object JavaDoc[origArgs.length - 1];
482                   System.arraycopy(origArgs, 0, args, 0, args.length);
483
484                   retval = super.invoke(MethodCallFactory.create(MethodDeclarations.getUnversionedMethod(modification.getMethodId()), args));
485                }
486                else
487                {
488                   retval = super.invoke(modification);
489                }
490                if (!isActive(tx))
491                {
492                   throw new ReplicationException("prepare() failed -- " + "local transaction status is not STATUS_ACTIVE; is " + tx.getStatus());
493                }
494             }
495             catch (Throwable JavaDoc t)
496             {
497                log.error("method invocation failed", t);
498                retval = t;
499             }
500             finally
501             {
502                // reset any options
503
if (injectDataVersions) cache.getInvocationContext().setOptionOverrides(null);
504             }
505             if (retval != null && retval instanceof Exception JavaDoc)
506             {
507                throw new RuntimeException JavaDoc((Exception JavaDoc) retval);
508             }
509          }
510       }
511       // need to pass up the prepare as well and return value from that
512
return retval;
513    }
514
515    public void injectDataVersion(Object JavaDoc obj)
516    {
517       if (obj instanceof DataVersion)
518       {
519          Option o = new Option();
520          o.setDataVersion((DataVersion) obj);
521          cache.getInvocationContext().setOptionOverrides(o);
522       }
523       else
524       {
525          log.debug("Object " + obj + " is not a DataVersion, not applying to this mod.");
526       }
527    }
528
529    /**
530     * Handles a commit or a rollback for a remote gtx. Called by invoke().
531     *
532     * @param m
533     * @return
534     * @throws Throwable
535     */

536    private Object JavaDoc handleRemoteCommitRollback(MethodCall m, GlobalTransaction gtx) throws Throwable JavaDoc
537    {
538       Transaction ltx;
539       try
540       {
541          ltx = getLocalTxForGlobalTx(gtx);
542       }
543       catch (IllegalStateException JavaDoc e)
544       {
545          if (m.getMethodId() == MethodDeclarations.rollbackMethod_id)
546          {
547             log.warn("No local transaction for this remotely originating rollback. Possibly rolling back before a prepare call was broadcast?");
548             return null;
549          }
550          else
551          {
552             throw e;
553          }
554       }
555
556       // disconnect if we have a current tx associated
557
Transaction currentTx = txManager.getTransaction();
558       boolean resumeCurrentTxOnCompletion = false;
559       try
560       {
561          if (!ltx.equals(currentTx))
562          {
563             currentTx = txManager.suspend();
564             resumeCurrentTxOnCompletion = true;
565             txManager.resume(ltx);
566             // make sure we set this in the ctx
567
cache.getInvocationContext().setTransaction(ltx);
568          }
569          if (log.isDebugEnabled()) log.debug(" executing " + m + "() with local TX " + ltx + " under global tx " + gtx);
570
571          // pass commit up the chain
572
// super.invoke(m);
573
// commit or rollback the tx.
574
if (m.getMethodId() == MethodDeclarations.commitMethod_id)
575          {
576             txManager.commit();
577             if (configuration.getExposeManagementStatistics() && getStatisticsEnabled())
578             {
579                m_commits++;
580             }
581          }
582          else
583          {
584             txManager.rollback();
585             if (configuration.getExposeManagementStatistics() && getStatisticsEnabled())
586             {
587                m_rollbacks++;
588             }
589          }
590       }
591       finally
592       {
593          //resume the old transaction if we suspended it
594
if (resumeCurrentTxOnCompletion)
595          {
596             if (log.isTraceEnabled()) log.trace("Resuming suspended transaction " + currentTx);
597             txManager.suspend();
598             if (currentTx != null)
599             {
600                txManager.resume(currentTx);
601                cache.getInvocationContext().setTransaction(currentTx);
602             }
603          }
604
605          // remove from local lists.
606
remoteTransactions.remove(gtx);
607          transactions.remove(ltx);
608
609          // this tx has completed. Clean up in the tx table.
610
txTable.remove(gtx);
611          txTable.remove(ltx);
612       }
613
614       if (log.isDebugEnabled()) log.debug("Finished remote commit/rollback method for " + gtx);
615
616       return null;
617    }
618
619    private Transaction getLocalTxForGlobalTx(GlobalTransaction gtx) throws IllegalStateException JavaDoc
620    {
621       Transaction ltx = txTable.getLocalTransaction(gtx);
622       if (ltx != null)
623       {
624          if (log.isDebugEnabled()) log.debug("Found local TX=" + ltx + ", global TX=" + gtx);
625       }
626       else
627       {
628          throw new IllegalStateException JavaDoc(" found no local TX for global TX " + gtx);
629       }
630       return ltx;
631    }
632
633    /**
634     * Handles a commit or a rollback. Called by the synch handler. Simply tests that we are in the correct tx and
635     * passes the meth call up the interceptor chain.
636     *
637     * @param m
638     * @return
639     * @throws Throwable
640     */

641    private Object JavaDoc handleCommitRollback(MethodCall m) throws Throwable JavaDoc
642    {
643       //GlobalTransaction gtx = findGlobalTransaction(m.getArgs());
644
GlobalTransaction gtx = cache.getInvocationContext().getGlobalTransaction();
645       Object JavaDoc result;
646
647       // this must have a local transaction associated if a prepare has been
648
// callled before
649
//Transaction ltx = getLocalTxForGlobalTx(gtx);
650

651       // Transaction currentTx = txManager.getTransaction();
652

653       //if (!ltx.equals(currentTx)) throw new IllegalStateException(" local transaction " + ltx + " transaction does not match running tx " + currentTx);
654

655       result = super.invoke(m);
656
657       if (log.isDebugEnabled()) log.debug("Finished local commit/rollback method for " + gtx);
658       return result;
659    }
660
661    // --------------------------------------------------------------
662
// Transaction phase runners
663
// --------------------------------------------------------------
664

665    /**
666     * creates a commit() MethodCall and feeds it to handleCommitRollback();
667     *
668     * @param gtx
669     */

670    protected void runCommitPhase(GlobalTransaction gtx, Transaction tx, List JavaDoc modifications, boolean onePhaseCommit)
671    {
672       // set the hasMods flag in the invocation ctx. This should not be replicated, just used locally by the interceptors.
673
cache.getInvocationContext().setTxHasMods(modifications != null && modifications.size() > 0);
674       try
675       {
676          MethodCall commitMethod;
677          if (onePhaseCommit)
678          {
679             // running a 1-phase commit.
680
if (configuration.isNodeLockingOptimistic())
681             {
682                commitMethod = MethodCallFactory.create(MethodDeclarations.optimisticPrepareMethod,
683                        gtx, modifications, null, cache.getLocalAddress(), true);
684             }
685             else
686             {
687                commitMethod = MethodCallFactory.create(MethodDeclarations.prepareMethod,
688                        gtx, modifications, cache.getLocalAddress(),
689                        true);
690             }
691          }
692          else
693          {
694             commitMethod = MethodCallFactory.create(MethodDeclarations.commitMethod, gtx);
695          }
696
697          if (log.isTraceEnabled()) {log.trace(" running commit for " + gtx);}
698          handleCommitRollback(commitMethod);
699       }
700       catch (Throwable JavaDoc e)
701       {
702          log.warn("Commit failed. Clearing stale locks.");
703          try
704          {
705             cleanupStaleLocks(gtx);
706          }
707          catch (Throwable JavaDoc e2)
708          {
709             log.error("Unable to clear stale locks", e2);
710             throw new RuntimeException JavaDoc(e2);
711          }
712          throw new RuntimeException JavaDoc("Commit failed.", e);
713       }
714    }
715
716
717    private void cleanupStaleLocks(GlobalTransaction gtx) throws Throwable JavaDoc
718    {
719       TransactionEntry entry = txTable.get(gtx);
720       if (entry != null)
721       {
722          entry.releaseAllLocksLIFO(gtx);
723       }
724    }
725
726    /**
727     * creates a rollback() MethodCall and feeds it to handleCommitRollback();
728     *
729     * @param gtx
730     */

731    protected void runRollbackPhase(GlobalTransaction gtx, Transaction tx, List JavaDoc modifications)
732    {
733       //Transaction ltx = null;
734
try
735       {
736          cache.getInvocationContext().setTxHasMods(modifications != null && modifications.size() > 0);
737          // JBCACHE-457
738
// MethodCall rollbackMethod = MethodCall(CacheImpl.rollbackMethod, new Object[]{gtx, hasMods ? true : false});
739
MethodCall rollbackMethod = MethodCallFactory.create(MethodDeclarations.rollbackMethod, gtx);
740          if (log.isTraceEnabled()) {log.trace(" running rollback for " + gtx);}
741
742          //JBCACHE-359 Store a lookup for the gtx so a listener
743
// callback can find it
744
//ltx = getLocalTxForGlobalTx(gtx);
745
rollbackTransactions.put(tx, gtx);
746
747          handleCommitRollback(rollbackMethod);
748       }
749       catch (Throwable JavaDoc e)
750       {
751          log.warn("Rollback had a problem", e);
752       }
753       finally
754       {
755          if (tx != null) rollbackTransactions.remove(tx);
756       }
757    }
758
759    /**
760     * Handles a local prepare - invoked by the sync handler. Tests if the current tx matches the gtx passed in to the
761     * method call and passes the prepare() call up the chain.
762     *
763     * @return
764     * @throws Throwable
765     */

766    protected Object JavaDoc runPreparePhase(GlobalTransaction gtx, List JavaDoc modifications) throws Throwable JavaDoc
767    {
768       // build the method call
769
MethodCall prepareMethod;
770       // if (cache.getCacheModeInternal() != CacheImpl.REPL_ASYNC)
771
// {
772
// running a 2-phase commit.
773
if (configuration.isNodeLockingOptimistic())
774       {
775          prepareMethod = MethodCallFactory.create(MethodDeclarations.optimisticPrepareMethod, gtx, modifications, null, cache.getLocalAddress(), false);
776       }
777       else if (configuration.getCacheMode() != Configuration.CacheMode.REPL_ASYNC)
778       {
779          prepareMethod = MethodCallFactory.create(MethodDeclarations.prepareMethod,
780                  gtx, modifications, cache.getLocalAddress(),
781                  false);// don't commit or rollback - wait for call
782
}
783       //}
784
else
785       {
786          // this is a REPL_ASYNC call - do 1-phase commit. break!
787
log.trace("This is a REPL_ASYNC call (1 phase commit) - do nothing for beforeCompletion()");
788          return null;
789       }
790
791       // passes a prepare call up the local interceptor chain. The replication interceptor
792
// will do the broadcasting if needed. This is so all requests (local/remote) are
793
// treated the same
794
Object JavaDoc result;
795
796       // Is there a local transaction associated with GTX ?
797
Transaction ltx = txTable.getLocalTransaction(gtx);
798
799       //if ltx is not null and it is already running
800
if (txManager.getTransaction() != null && ltx != null && txManager.getTransaction().equals(ltx))
801       {
802          result = super.invoke(prepareMethod);
803       }
804       else
805       {
806          log.warn("Local transaction does not exist or does not match expected transaction " + gtx);
807          throw new CacheException(" local transaction " + ltx + " does not exist or does not match expected transaction " + gtx);
808       }
809       return result;
810    }
811
812    // --------------------------------------------------------------
813
// Private helper methods
814
// --------------------------------------------------------------
815

816
817    /**
818     * Creates a gtx (if one doesnt exist), a sync handler, and registers the tx.
819     *
820     * @param tx
821     * @return
822     * @throws Exception
823     */

824    private GlobalTransaction registerTransaction(Transaction tx) throws Exception JavaDoc
825    {
826       GlobalTransaction gtx;
827       if (isValid(tx) && transactions.put(tx, NULL) == null)
828       {
829          gtx = cache.getCurrentTransaction(tx, true);
830          if (gtx.isRemote())
831          {
832             // should be no need to register a handler since this a remotely initiated gtx
833
if (log.isTraceEnabled()) {log.trace("is a remotely initiated gtx so no need to register a tx for it");}
834          }
835          else
836          {
837             if (log.isTraceEnabled()) {log.trace("Registering sync handler for tx " + tx + ", gtx " + gtx);}
838             LocalSynchronizationHandler myHandler = new LocalSynchronizationHandler(gtx, tx, cache);
839             registerHandler(tx, myHandler);
840          }
841       }
842       else if ((gtx = (GlobalTransaction) rollbackTransactions.get(tx)) != null)
843       {
844          if (log.isDebugEnabled()) log.debug("Transaction " + tx + " is already registered and is rolling back.");
845       }
846       else
847       {
848          if (log.isDebugEnabled()) log.debug("Transaction " + tx + " is already registered.");
849
850       }
851       return gtx;
852    }
853
854    /**
855     * Registers a sync hander against a tx.
856     *
857     * @param tx
858     * @param handler
859     * @throws Exception
860     */

861    private void registerHandler(Transaction tx, RemoteSynchronizationHandler handler) throws Exception JavaDoc
862    {
863       OrderedSynchronizationHandler orderedHandler = OrderedSynchronizationHandler.getInstance(tx);
864
865       if (log.isTraceEnabled()) log.trace("registering for TX completion: SynchronizationHandler(" + handler + ")");
866
867       orderedHandler.registerAtHead(handler);// needs to be invoked first on TX commit
868
}
869
870    /**
871     * Replaces the global transaction in a method call with a new global transaction passed in.
872     */

873    private MethodCall replaceGtx(MethodCall m, GlobalTransaction gtx)
874    {
875       Class JavaDoc[] argClasses = m.getMethod().getParameterTypes();
876       Object JavaDoc[] args = m.getArgs();
877
878       for (int i = 0; i < argClasses.length; i++)
879       {
880          if (argClasses[i].equals(GlobalTransaction.class))
881          {
882             if (!gtx.equals(args[i]))
883             {
884                args[i] = gtx;
885                m.setArgs(args);
886             }
887             break;
888          }
889       }
890       return m;
891    }
892
893    /**
894     * Creates and starts a local tx
895     *
896     * @return
897     * @throws Exception
898     */

899    private Transaction createLocalTx() throws Exception JavaDoc
900    {
901       if (log.isTraceEnabled()) {log.trace("Creating transaction for thread " + Thread.currentThread());}
902       Transaction localTx;
903       if (txManager == null) throw new Exception JavaDoc("Failed to create local transaction; TransactionManager is null");
904       txManager.begin();
905       localTx = txManager.getTransaction();
906       return localTx;
907    }
908
909    /**
910     * Creates a new local transaction for a given global transaction.
911     *
912     * @param gtx
913     * @return
914     * @throws Exception
915     */

916    private Transaction createLocalTxForGlobalTx(GlobalTransaction gtx) throws Exception JavaDoc
917    {
918       Transaction localTx = createLocalTx();
919       txTable.put(localTx, gtx);
920       // attach this to the context
921
cache.getInvocationContext().setTransaction(localTx);
922       if (log.isTraceEnabled()) log.trace("Created new tx for gtx " + gtx);
923       return localTx;
924    }
925
926    // ------------------------------------------------------------------------
927
// Synchronization classes
928
// ------------------------------------------------------------------------
929

930    // this controls the whole transaction
931

932    class RemoteSynchronizationHandler implements Synchronization JavaDoc
933    {
934       Transaction tx = null;
935       GlobalTransaction gtx = null;
936       CacheSPI cache = null;
937       List JavaDoc modifications = null;
938       TransactionEntry entry = null;
939
940
941       RemoteSynchronizationHandler(GlobalTransaction gtx, Transaction tx, CacheSPI cache)
942       {
943          this.gtx = gtx;
944          this.tx = tx;
945          this.cache = cache;
946       }
947
948       public void beforeCompletion()
949       {
950          if (log.isTraceEnabled()) log.trace("Running beforeCompletion on gtx " + gtx);
951          entry = txTable.get(gtx);
952          if (entry == null)
953          {
954             log.error("Transaction has a null transaction entry - beforeCompletion() will fail.");
955             log.error("TxTable contents: " + txTable);
956             throw new IllegalStateException JavaDoc("cannot find transaction entry for " + gtx);
957          }
958
959          modifications = entry.getModifications();
960       }
961
962       // this should really not be done here -
963
// it is supposed to be post commit not actually run the commit
964
public void afterCompletion(int status)
965       {
966          try
967          {
968             setTransactionalContext(tx, gtx);
969
970             try
971             {
972                if (txManager.getTransaction() != null && !txManager.getTransaction().equals(tx)) txManager.resume(tx);
973             }
974             catch (Exception JavaDoc e)
975             {
976                e.printStackTrace();
977             }
978
979
980             if (log.isTraceEnabled()) log.trace("calling aftercompletion for " + gtx);
981             // set any transaction wide options as current for this thread.
982
if ((entry = txTable.get(gtx)) != null)
983             {
984                modifications = entry.getModifications();
985                cache.getInvocationContext().setOptionOverrides(entry.getOption());
986             }
987             transactions.remove(tx);
988
989             switch (status)
990             {
991                case Status.STATUS_COMMITTED:
992
993                   // if this is optimistic or sync repl
994
boolean onePhaseCommit = !configuration.isNodeLockingOptimistic() && configuration.getCacheMode() == Configuration.CacheMode.REPL_ASYNC;
995                   if (log.isDebugEnabled()) log.debug("Running commit phase. One phase? " + onePhaseCommit);
996                   runCommitPhase(gtx, tx, modifications, onePhaseCommit);
997                   log.debug("Finished commit phase");
998                   break;
999
1000               case Status.STATUS_MARKED_ROLLBACK:
1001               case Status.STATUS_ROLLEDBACK:
1002                  log.debug("Running rollback phase");
1003                  runRollbackPhase(gtx, tx, modifications);
1004                  log.debug("Finished rollback phase");
1005                  break;
1006
1007               default:
1008                  throw new IllegalStateException JavaDoc("illegal status: " + status);
1009            }
1010         }
1011         finally
1012         {
1013            // clean up the tx table
1014
txTable.remove(gtx);
1015            txTable.remove(tx);
1016            setTransactionalContext(null, null);
1017         }
1018      }
1019
1020      public String JavaDoc toString()
1021      {
1022         return "TxInterceptor.RemoteSynchronizationHandler(gtx=" + gtx + ", tx=" + tx + ")";
1023      }
1024   }
1025
1026   class LocalSynchronizationHandler extends RemoteSynchronizationHandler
1027   {
1028      private boolean localRollbackOnly = true;
1029
1030      LocalSynchronizationHandler(GlobalTransaction gtx, Transaction tx, CacheSPI cache)
1031      {
1032         super(gtx, tx, cache);
1033      }
1034
1035      public void beforeCompletion()
1036      {
1037         super.beforeCompletion();
1038         // fetch the modifications before the transaction is committed
1039
// (and thus removed from the txTable)
1040
setTransactionalContext(tx, gtx);
1041         if (modifications.size() == 0)
1042         {
1043            if (log.isTraceEnabled()) log.trace("No modifications in this tx. Skipping beforeCompletion()");
1044            return;
1045         }
1046
1047         // set any transaction wide options as current for this thread.
1048
cache.getInvocationContext().setOptionOverrides(entry.getOption());
1049
1050         try
1051         {
1052            switch (tx.getStatus())
1053            {
1054               // if we are active or preparing then we can go ahead
1055
case Status.STATUS_ACTIVE:
1056               case Status.STATUS_PREPARING:
1057                  // run a prepare call.
1058
Object JavaDoc result = runPreparePhase(gtx, modifications);
1059
1060                  if (result instanceof Throwable JavaDoc)
1061                  {
1062                     tx.setRollbackOnly();
1063                     throw (Throwable JavaDoc) result;
1064                  }
1065                  break;
1066               default:
1067                  throw new CacheException("transaction " + tx + " in status " + tx.getStatus() + " unbale to start transaction");
1068            }
1069         }
1070         catch (Throwable JavaDoc t)
1071         {
1072            try
1073            {
1074               tx.setRollbackOnly();
1075            }
1076            catch (SystemException JavaDoc se)
1077            {
1078               throw new RuntimeException JavaDoc("setting tx rollback failed ", se);
1079            }
1080            throw new RuntimeException JavaDoc("", t);
1081         }
1082         finally
1083         {
1084            localRollbackOnly = false;
1085            setTransactionalContext(null, null);
1086         }
1087      }
1088
1089      public void afterCompletion(int status)
1090      {
1091         cache.getInvocationContext().setLocalRollbackOnly(localRollbackOnly);
1092         super.afterCompletion(status);
1093      }
1094
1095      public String JavaDoc toString()
1096      {
1097         return "TxInterceptor.LocalSynchronizationHandler(gtx=" + gtx + ", tx=" + tx + ")";
1098      }
1099   }
1100}
Popular Tags