KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jboss > cache > interceptors > ReplicationInterceptor


1 package org.jboss.cache.interceptors;
2
3 import org.jboss.cache.GlobalTransaction;
4 import org.jboss.cache.InvocationContext;
5 import org.jboss.cache.config.Configuration;
6 import org.jboss.cache.config.Option;
7 import org.jboss.cache.marshall.MethodCall;
8 import org.jboss.cache.marshall.MethodDeclarations;
9
10 /**
11  * Takes care of replicating modifications to other nodes in a cluster. Also
12  * listens for prepare(), commit() and rollback() messages which are received
13  * 'side-ways' (see docs/design/Refactoring.txt).
14  *
15  * @author Bela Ban
16  * @version $Id: ReplicationInterceptor.java,v 1.41 2006/11/10 20:32:51 msurtani Exp $
17  */

18 public class ReplicationInterceptor extends BaseRpcInterceptor
19 {
20
21    public Object JavaDoc invoke(MethodCall m) throws Throwable JavaDoc
22    {
23       InvocationContext ctx = cache.getInvocationContext();
24       GlobalTransaction gtx = ctx.getGlobalTransaction();
25
26       // bypass for buddy group org metod calls.
27
if (MethodDeclarations.isBuddyGroupOrganisationMethod(m.getMethodId())) return super.invoke(m);
28
29       boolean isLocalCommitOrRollback = gtx != null && !gtx.isRemote() && (m.getMethodId() == MethodDeclarations.commitMethod_id || m.getMethodId() == MethodDeclarations.rollbackMethod_id);
30
31       if (log.isTraceEnabled()) log.trace("isLocalCommitOrRollback? " + isLocalCommitOrRollback + "; gtx = " + gtx);
32
33       // pass up the chain if not a local commit or rollback (in which case replicate first)
34
Object JavaDoc o = isLocalCommitOrRollback ? null : super.invoke(m);
35 // ctx = cache.getInvocationContext();
36

37       Option optionOverride = ctx.getOptionOverrides();
38
39       if (optionOverride != null && optionOverride.isCacheModeLocal() && ctx.getTransaction() == null)
40       {
41          log.trace("skip replication");
42          return isLocalCommitOrRollback ? super.invoke(m) : o;
43       }
44
45       // could be potentially TRANSACTIONAL. If so, we register for transaction completion callbacks (if we
46
// have not yet done so
47
if (ctx.getTransaction() != null)
48       {
49          if (gtx != null && !gtx.isRemote())
50          {
51             // lets see what sort of method we've got.
52
switch (m.getMethodId())
53             {
54                case MethodDeclarations.commitMethod_id:
55                   // REPL_ASYNC will result in only a prepare() method - 1 phase commit.
56
if (containsModifications(m)) replicateCall(m, configuration.isSyncCommitPhase());
57                   // now pass up the chain
58
o = super.invoke(m);
59                   break;
60                case MethodDeclarations.prepareMethod_id:
61                   if (containsModifications(m))
62                   {
63                      // this is a prepare method
64
runPreparePhase(m, gtx);
65                   }
66                   break;
67                case MethodDeclarations.rollbackMethod_id:
68                   // REPL_ASYNC will result in only a prepare() method - 1 phase commit.
69
if (containsModifications(m) && !ctx.isLocalRollbackOnly())
70                   {
71                      replicateCall(m, configuration.isSyncRollbackPhase());
72                   }
73                   // now pass up the chain
74
o = super.invoke(m);
75                   break;
76             }
77          }
78       }
79       else if (MethodDeclarations.isCrudMethod(m.getMethodId()))
80       {
81          // NON-TRANSACTIONAL and CRUD method
82
if (log.isTraceEnabled()) log.trace("Non-tx crud meth");
83          if (ctx.isOriginLocal())
84          {
85             // don't re-broadcast if we've received this from anotehr cache in the cluster.
86
handleReplicatedMethod(m, configuration.getCacheMode());
87          }
88       }
89       else
90       {
91          if (log.isTraceEnabled()) log.trace("Non-tx and non crud meth");
92       }
93
94       return o;
95    }
96
97    void handleReplicatedMethod(MethodCall m, Configuration.CacheMode mode) throws Throwable JavaDoc
98    {
99       if (log.isTraceEnabled())
100       {
101          log.trace("invoking method " + m + ", members=" + cache.getMembers() + ", mode=" +
102                  configuration.getCacheMode() + ", exclude_self=" + true + ", timeout=" +
103                  configuration.getSyncReplTimeout());
104       }
105       switch (mode)
106       {
107          case REPL_ASYNC:
108             // 2. Replicate change to all *other* members (exclude self !)
109
replicateCall(m, false);
110             break;
111          case REPL_SYNC:
112             // REVISIT Needs to exclude itself and apply the local change manually.
113
// This is needed such that transient field is modified properly in-VM.
114
replicateCall(m, true);
115             break;
116       }
117    }
118
119    /**
120     * Calls prepare(GlobalTransaction,List,org.jgroups.Address,boolean)) in all members except self.
121     * Waits for all responses. If one of the members failed to prepare, its return value
122     * will be an exception. If there is one exception we rethrow it. This will mark the
123     * current transaction as rolled back, which will cause the
124     * afterCompletion(int) callback to have a status
125     * of <tt>MARKED_ROLLBACK</tt>. When we get that call, we simply roll back the
126     * transaction.<br/>
127     * If everything runs okay, the afterCompletion(int)
128     * callback will trigger the @link #runCommitPhase(GlobalTransaction)).
129     * <br/>
130     *
131     * @throws Exception
132     */

133    protected void runPreparePhase(MethodCall prepareMethod, GlobalTransaction gtx) throws Throwable JavaDoc
134    {
135       boolean async = configuration.getCacheMode() == Configuration.CacheMode.REPL_ASYNC;
136       if (log.isTraceEnabled())
137       {
138          log.trace("(" + cache.getLocalAddress() + "): running remote prepare for global tx " + gtx + " with async mode=" + async);
139       }
140
141       // this method will return immediately if we're the only member (because exclude_self=true)
142
replicateCall(prepareMethod, !async);
143    }
144 }
145
Popular Tags