KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jboss > cache > interceptors > OptimisticReplicationInterceptor


/*
 * JBoss, Home of Professional Open Source
 *
 * Distributable under LGPL license.
 * See terms of license at gnu.org.
 */

7 package org.jboss.cache.interceptors;
8
9 import org.jboss.cache.CacheException;
10 import org.jboss.cache.Fqn;
11 import org.jboss.cache.GlobalTransaction;
12 import org.jboss.cache.InvocationContext;
13 import org.jboss.cache.OptimisticTransactionEntry;
14 import org.jboss.cache.config.Configuration;
15 import org.jboss.cache.config.Option;
16 import org.jboss.cache.marshall.MethodCall;
17 import org.jboss.cache.marshall.MethodCallFactory;
18 import org.jboss.cache.marshall.MethodDeclarations;
19 import org.jboss.cache.optimistic.DataVersion;
20 import org.jboss.cache.optimistic.DefaultDataVersion;
21 import org.jboss.cache.optimistic.TransactionWorkspace;
22 import org.jboss.cache.optimistic.WorkspaceNode;
23
24 import java.util.ArrayList JavaDoc;
25 import java.util.Iterator JavaDoc;
26 import java.util.List JavaDoc;
27 import java.util.Map JavaDoc;
28 import java.util.concurrent.ConcurrentHashMap JavaDoc;
29
30 /**
31  * Replication interceptor for the optimistically locked interceptor chain
32  *
33  * @author <a HREF="mailto:manik@jboss.org">Manik Surtani (manik@jboss.org)</a>
34  * @author <a HREF="mailto:stevew@jofti.com">Steve Woodcock (stevew@jofti.com)</a>
35  */

36 public class OptimisticReplicationInterceptor extends BaseRpcInterceptor
37 {
38
39    //record of local broacasts - so we do not broadcast rollbacks/commits that resuted from
40
// local prepare failures
41
private Map JavaDoc broadcastTxs = new ConcurrentHashMap JavaDoc();
42
43    public Object JavaDoc invoke(MethodCall m) throws Throwable JavaDoc
44    {
45       // bypass for buddy group org metod calls.
46
if (MethodDeclarations.isBuddyGroupOrganisationMethod(m.getMethodId())) return super.invoke(m);
47
48       InvocationContext ctx = cache.getInvocationContext();
49       Option optionOverride = ctx.getOptionOverrides();
50       if (optionOverride != null && optionOverride.isCacheModeLocal() && ctx.getTransaction() == null)
51       {
52          // skip replication!!
53
return super.invoke(m);
54       }
55
56       Object JavaDoc retval;
57
58       //we need a transaction to be present in order to do this
59
if (ctx.getTransaction() != null)
60       {
61
62          // get the current gtx
63
GlobalTransaction gtx = ctx.getGlobalTransaction();
64          if (gtx == null)
65          {
66             throw new CacheException("failed to get global transaction");
67          }
68          log.debug(" received method " + m);
69
70          // on a local prepare we first run the prepare -
71
//if this works broadcast it
72

73          switch (m.getMethodId())
74          {
75             case MethodDeclarations.optimisticPrepareMethod_id:
76                // pass up the chain.
77
retval = super.invoke(m);
78
79                if (!gtx.isRemote())
80                {
81                   // replicate the prepare call.
82
retval = broadcastPrepare(m, gtx);
83                   //if we have an exception then the remote methods failed
84
if (retval instanceof Throwable JavaDoc)
85                   {
86                      throw (Throwable JavaDoc) retval;
87                   }
88                }
89                break;
90             case MethodDeclarations.commitMethod_id:
91                //lets broadcast the commit first
92
Throwable JavaDoc temp = null;
93                if (!gtx.isRemote() && broadcastTxs.containsKey(gtx))
94                {
95                   //we dont do anything
96
try
97                   {
98                      broadcastCommit(gtx);
99                   }
100                   catch (Throwable JavaDoc t)
101                   {
102                      log.error(" a problem occurred with remote commit", t);
103                      temp = t;
104                   }
105                }
106
107                retval = super.invoke(m);
108                if (temp != null)
109                {
110                   throw temp;
111                }
112                break;
113             case MethodDeclarations.rollbackMethod_id:
114                // lets broadcast the rollback first
115
Throwable JavaDoc temp2 = null;
116                if (!gtx.isRemote() && broadcastTxs.containsKey(gtx))
117                {
118                   //we dont do anything
119
try
120                   {
121                      broadcastRollback(gtx);
122                   }
123                   catch (Throwable JavaDoc t)
124                   {
125                      log.error(" a problem occurred with remote rollback", t);
126                      temp2 = t;
127                   }
128
129                }
130                retval = super.invoke(m);
131                if (temp2 != null)
132                {
133                   throw temp2;
134                }
135                break;
136             default:
137                //it is something we do not care about
138
log.debug(" received method " + m + " not handling");
139                retval = super.invoke(m);
140                break;
141          }
142       }
143       else
144       {
145          throw new CacheException("transaction does not exist");
146       }
147       return retval;
148    }
149
150    protected Object JavaDoc broadcastPrepare(MethodCall methodCall, GlobalTransaction gtx) throws Throwable JavaDoc
151    {
152       boolean remoteCallSync = configuration.getCacheMode() == Configuration.CacheMode.REPL_SYNC;
153
154       Object JavaDoc[] args = methodCall.getArgs();
155       List JavaDoc modifications = (List JavaDoc) args[1];
156       int num_mods = modifications != null ? modifications.size() : 0;
157
158       // this method will return immediately if we're the only member (because
159
// exclude_self=true)
160

161       if (cache.getMembers() != null && cache.getMembers().size() > 1)
162       {
163
164          // See JBCACHE-843 and docs/design/DataVersioning.txt
165
MethodCall toBroadcast = mapDataVersionedMethodCalls(methodCall, getTransactionWorkspace(gtx));
166
167          //record the things we have possibly sent
168
broadcastTxs.put(gtx, gtx);
169          if (log.isDebugEnabled())
170          {
171             log.debug("(" + cache.getLocalAddress()
172                     + "): broadcasting prepare for " + gtx
173                     + " (" + num_mods + " modifications");
174          }
175
176          replicateCall(toBroadcast, remoteCallSync);
177       }
178       else
179       {
180          //no members, ignoring
181
if (log.isDebugEnabled())
182          {
183             log.debug("(" + cache.getLocalAddress()
184                     + "):not broadcasting prepare as members are " + cache.getMembers());
185          }
186       }
187       return null;
188    }
189
190
191    protected void broadcastCommit(GlobalTransaction gtx) throws Throwable JavaDoc
192    {
193       boolean remoteCallSync = configuration.isSyncCommitPhase();
194
195       // 1. Multicast commit() to all members (exclude myself though)
196
if (cache.getMembers() != null && cache.getMembers().size() > 1)
197       {
198          try
199          {
200             broadcastTxs.remove(gtx);
201             MethodCall commit_method = MethodCallFactory.create(MethodDeclarations.commitMethod,
202                     gtx);
203
204             log.debug("running remote commit for " + gtx
205                     + " and coord=" + cache.getLocalAddress());
206
207             replicateCall(commit_method, remoteCallSync);
208          }
209          catch (Exception JavaDoc e)
210          {
211             log.fatal("commit failed", e);
212             throw e;
213          }
214       }
215       else
216       {
217          // ignoring
218
}
219    }
220
221    protected void broadcastRollback(GlobalTransaction gtx) throws Throwable JavaDoc
222    {
223       boolean remoteCallSync = configuration.isSyncRollbackPhase();
224
225       if (cache.getMembers() != null && cache.getMembers().size() > 1)
226       {
227          // 1. Multicast rollback() to all other members (excluding myself)
228
try
229          {
230             broadcastTxs.remove(gtx);
231             MethodCall rollback_method = MethodCallFactory.create(MethodDeclarations.rollbackMethod, gtx);
232
233             log.debug("running remote rollback for " + gtx
234                     + " and coord=" + cache.getLocalAddress());
235             replicateCall(rollback_method, remoteCallSync);
236
237          }
238          catch (Exception JavaDoc e)
239          {
240             log.error("rollback failed", e);
241             throw e;
242          }
243       }
244    }
245
246    private MethodCall mapDataVersionedMethodCalls(MethodCall m, TransactionWorkspace w)
247    {
248       Object JavaDoc[] origArgs = m.getArgs();
249       return MethodCallFactory.create(m.getMethod(), origArgs[0], translate((List JavaDoc) origArgs[1], w), origArgs[2], origArgs[3], origArgs[4]);
250    }
251
252    /**
253     * Translates a list of MethodCalls from non-versioned calls to versioned calls.
254     */

255    private List JavaDoc translate(List JavaDoc l, TransactionWorkspace w)
256    {
257       List JavaDoc newList = new ArrayList JavaDoc();
258       Iterator JavaDoc origCalls = l.iterator();
259       while (origCalls.hasNext())
260       {
261          MethodCall origCall = (MethodCall) origCalls.next();
262          if (MethodDeclarations.isDataGravitationMethod(origCall.getMethodId()))
263          {
264             // no need to translate data gravitation calls.
265
newList.add(origCall);
266          }
267          else
268          {
269             Object JavaDoc[] origArgs = origCall.getArgs();
270             // get the data version associated with this orig call.
271

272             // since these are all crud methods the Fqn is at arg subscript 1.
273
Fqn fqn = (Fqn) origArgs[1];
274             // now get a hold of the data version for this specific modification
275
DataVersion versionToBroadcast = getVersionToBroadcast(w, fqn);
276
277             // build up the new arguments list for the new call. Identical to the original lis except that it has the
278
// data version tacked on to the end.
279
Object JavaDoc[] newArgs = new Object JavaDoc[origArgs.length + 1];
280             for (int i = 0; i < origArgs.length; i++) newArgs[i] = origArgs[i];
281             newArgs[origArgs.length] = versionToBroadcast;
282
283             // now create a new method call which contains this data version
284
MethodCall newCall = MethodCallFactory.create(MethodDeclarations.getVersionedMethod(origCall.getMethodId()), newArgs);
285
286             // and add it to the new list.
287
newList.add(newCall);
288          }
289       }
290       return newList;
291    }
292
293    /**
294     * Digs out the DataVersion for a given Fqn. If the versioning is explicit, it is passed as-is. If implicit, it is
295     * cloned and then incremented, and the clone is returned.
296     */

297    private DataVersion getVersionToBroadcast(TransactionWorkspace w, Fqn f)
298    {
299       WorkspaceNode n = w.getNode(f);
300       if (n == null)
301       {
302          if (log.isTraceEnabled()) log.trace("Fqn " + f + " not found in workspace; not using a data version.");
303          return null;
304       }
305       if (n.isVersioningImplicit())
306       {
307          DefaultDataVersion v = (DefaultDataVersion) n.getVersion();
308          if (log.isTraceEnabled())
309             log.trace("Fqn " + f + " has implicit versioning. Broadcasting an incremented version.");
310          return v.increment();
311       }
312       else
313       {
314          if (log.isTraceEnabled()) log.trace("Fqn " + f + " has explicit versioning. Broadcasting the version as-is.");
315          return n.getVersion();
316       }
317    }
318
319    protected TransactionWorkspace getTransactionWorkspace(GlobalTransaction gtx) throws CacheException
320    {
321       OptimisticTransactionEntry transactionEntry = (OptimisticTransactionEntry) cache.getTransactionTable().get(gtx);
322
323       if (transactionEntry == null)
324       {
325          throw new CacheException("unable to map global transaction " + gtx + " to transaction entry");
326       }
327
328       // try and get the workspace from the transaction
329
return transactionEntry.getTransactionWorkSpace();
330    }
331
332 }
Popular Tags