KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jboss > cache > interceptors > DataGravitatorInterceptor


1 /*
2  * JBoss, Home of Professional Open Source
3  *
4  * Distributable under LGPL license.
5  * See terms of license at gnu.org.
6  */

7 package org.jboss.cache.interceptors;
8
9 import org.apache.commons.logging.Log;
10 import org.apache.commons.logging.LogFactory;
11 import org.jboss.cache.CacheException;
12 import org.jboss.cache.CacheSPI;
13 import org.jboss.cache.Fqn;
14 import org.jboss.cache.GlobalTransaction;
15 import org.jboss.cache.InvocationContext;
16 import org.jboss.cache.Node;
17 import org.jboss.cache.NodeSPI;
18 import org.jboss.cache.TransactionEntry;
19 import org.jboss.cache.buddyreplication.BuddyManager;
20 import org.jboss.cache.buddyreplication.GravitateResult;
21 import org.jboss.cache.config.Configuration;
22 import org.jboss.cache.loader.NodeData;
23 import org.jboss.cache.marshall.MethodCall;
24 import org.jboss.cache.marshall.MethodCallFactory;
25 import org.jboss.cache.marshall.MethodDeclarations;
26 import org.jboss.util.stream.MarshalledValueInputStream;
27 import org.jgroups.Address;
28 import org.jgroups.blocks.GroupRequest;
29
30 import java.io.ByteArrayInputStream JavaDoc;
31 import java.util.ArrayList JavaDoc;
32 import java.util.Collection JavaDoc;
33 import java.util.Collections JavaDoc;
34 import java.util.Iterator JavaDoc;
35 import java.util.List JavaDoc;
36 import java.util.Map JavaDoc;
37 import java.util.concurrent.ConcurrentHashMap JavaDoc;
38
39 /**
40  * The Data Gravitator interceptor intercepts cache misses and attempts to
41  * gravitate data from other parts of the cluster.
42  * <p/>
43  * Only used if Buddy Replication is enabled. Also, the interceptor only kicks
44  * in if an {@link org.jboss.cache.config.Option} is passed in to force Data
45  * Gravitation for a specific invocation or if <b>autoDataGravitation</b> is
46  * set to <b>true</b> when configuring Buddy Replication.
47  * <p/>
48  * See the JBoss Cache User Guide for more details on configuration options.
49  * There is a section dedicated to Buddy Replication in the Replication
50  * chapter.
51  *
52  * @author <a HREF="mailto:manik@jboss.org">Manik Surtani (manik@jboss.org)</a>
53  */

54 public class DataGravitatorInterceptor extends BaseRpcInterceptor
55 {
56    private BuddyManager buddyManager;
57    private boolean syncCommunications = false;
58    private Log log = LogFactory.getLog(DataGravitatorInterceptor.class);
59    private Map JavaDoc transactionMods = new ConcurrentHashMap JavaDoc();
60
61    public void setCache(CacheSPI cache)
62    {
63       super.setCache(cache);
64       this.buddyManager = cache.getBuddyManager();
65       syncCommunications = configuration.getCacheMode() == Configuration.CacheMode.REPL_SYNC || configuration.getCacheMode() == Configuration.CacheMode.INVALIDATION_SYNC;
66    }
67
68    public Object JavaDoc invoke(MethodCall m) throws Throwable JavaDoc
69    {
70       // if (isGravitationEnabled(cache.getInvocationContext()))
71
// {
72
// Option opt = cache.getInvocationContext().getOptionOverrides();
73
// if (opt == null || !opt.isSuppressDataGravitation())
74
// {
75
if (log.isTraceEnabled()) log.trace("Invoked with method call " + m);
76
77       // Transactional lifecycle methods should be handled regardless of whether data gravitation is enabled or not.
78
if (!MethodDeclarations.isTransactionLifecycleMethod(m.getMethodId()))
79       {
80          if (isGravitationEnabled(cache.getInvocationContext()))
81          {
82             // test that the Fqn being requested exists locally in the cache.
83
Fqn fqn = extractFqn(m.getMethodId(), m.getArgs());
84             if (log.isTraceEnabled()) log.trace("Checking local existence of fqn " + fqn);
85             if (BuddyManager.isBackupFqn(fqn))
86             {
87                log.info("Is call for a backup Fqn, not performing any gravitation. Direct calls on internal backup nodes are *not* supported.");
88             }
89             else
90             {
91                if (!cache.getRoot().hasChild(fqn))
92                {
93                   BackupData data;
94
95                   // perform a data gravitation
96
if (localBackupExists(fqn))
97                   {
98                      log.trace("Gravitating from local backup tree");
99                      data = localBackupGet(fqn);
100                   }
101                   else
102                   {
103                      log.trace("Gravitating from remote backup tree");
104                      // gravitate remotely.
105
data = remoteBackupGet(fqn);
106                   }
107
108                   if (data != null)
109                   {
110                      // create node locally so I don't gravitate again
111
// when I do the put() call to the cluster!
112
createNode(true, data.backupData);
113                      // Make sure I replicate to my buddies.
114
log.trace("Passing the put call locally to make sure state is persisted and ownership is correctly established.");
115                      createNode(false, data.backupData);
116
117                      // Clean up the other nodes
118
cleanBackupData(data);
119                   }
120                }
121             }
122          }
123          else
124          {
125             if (log.isTraceEnabled())
126             {
127                log.trace("Suppressing data gravitation for this call.");
128             }
129          }
130       }
131       else
132       {
133
134          try
135          {
136             switch (m.getMethodId())
137             {
138                case MethodDeclarations.prepareMethod_id:
139                case MethodDeclarations.optimisticPrepareMethod_id:
140                   Object JavaDoc o = super.invoke(m);
141                   doPrepare(cache.getInvocationContext().getGlobalTransaction());
142                   return o;
143                case MethodDeclarations.rollbackMethod_id:
144                   transactionMods.remove(cache.getInvocationContext().getGlobalTransaction());
145                   return super.invoke(m);
146                case MethodDeclarations.commitMethod_id:
147                   doCommit(cache.getInvocationContext().getGlobalTransaction());
148                   transactionMods.remove(cache.getInvocationContext().getGlobalTransaction());
149                   return super.invoke(m);
150             }
151          }
152          catch (Throwable JavaDoc throwable)
153          {
154             transactionMods.remove(cache.getInvocationContext().getGlobalTransaction());
155             throw throwable;
156          }
157       }
158       // }
159
// }
160
// else
161
// {
162
// if (log.isTraceEnabled())
163
// log.trace("Suppressing data gravitation for this call.");
164
// }
165
return super.invoke(m);
166    }
167
168    private boolean isGravitationEnabled(InvocationContext ctx)
169    {
170       boolean enabled = ctx.isOriginLocal();
171       if (enabled)
172       {
173          if (!buddyManager.isAutoDataGravitation())
174          {
175             enabled = ctx.getOptionOverrides().getForceDataGravitation();
176          }
177       }
178       return enabled;
179    }
180
181    private void doPrepare(GlobalTransaction gtx) throws Throwable JavaDoc
182    {
183       MethodCall cleanup = (MethodCall) transactionMods.get(gtx);
184       if (log.isTraceEnabled()) log.trace("Broadcasting prepare for cleanup ops " + cleanup);
185       if (cleanup != null)
186       {
187          MethodCall prepare;
188          List JavaDoc mods = new ArrayList JavaDoc(1);
189          mods.add(cleanup);
190          if (configuration.isNodeLockingOptimistic())
191          {
192             prepare = MethodCallFactory.create(MethodDeclarations.optimisticPrepareMethod, gtx, mods, null, cache.getLocalAddress(), false);
193          }
194          else
195          {
196             prepare = MethodCallFactory.create(MethodDeclarations.prepareMethod, gtx, mods, cache.getLocalAddress(), syncCommunications);
197          }
198
199          replicateCall(getMembersOutsideBuddyGroup(), prepare, syncCommunications);
200       }
201       else
202       {
203          if (log.isTraceEnabled()) log.trace("Nothing to broadcast in prepare phase for gtx " + gtx);
204       }
205    }
206
207    private void doCommit(GlobalTransaction gtx) throws Throwable JavaDoc
208    {
209       if (transactionMods.containsKey(gtx))
210       {
211          if (log.isTraceEnabled()) log.trace("Broadcasting commit for gtx " + gtx);
212          replicateCall(getMembersOutsideBuddyGroup(), MethodCallFactory.create(MethodDeclarations.commitMethod, gtx), syncCommunications);
213       }
214       else
215       {
216          if (log.isTraceEnabled()) log.trace("Nothing to broadcast in commit phase for gtx " + gtx);
217       }
218    }
219
220    private List JavaDoc<Address> getMembersOutsideBuddyGroup()
221    {
222       List JavaDoc<Address> members = new ArrayList JavaDoc<Address>(cache.getMembers());
223       members.remove(cache.getLocalAddress());
224       members.removeAll(buddyManager.getBuddyAddresses());
225       return members;
226    }
227
228    private BackupData remoteBackupGet(Fqn name) throws Exception JavaDoc
229    {
230
231       BackupData result = null;
232
233       Object JavaDoc[] resp = gravitateData(name);
234
235       if (resp[0] != null)
236       {
237          if (log.isTraceEnabled())
238          {
239             log.trace("Got response " + resp[0]);
240          }
241
242          List JavaDoc nodes = null;
243          if (configuration.isUseRegionBasedMarshalling())
244          {
245             ClassLoader JavaDoc cl = Thread.currentThread().getContextClassLoader();
246             try
247             {
248                cache.getRegionManager().setContextClassLoaderAsCurrent(name);
249
250                byte[] nodeData = (byte[]) resp[0];
251                ByteArrayInputStream JavaDoc bais = new ByteArrayInputStream JavaDoc(nodeData);
252                MarshalledValueInputStream mais = new MarshalledValueInputStream(bais);
253                nodes = (List JavaDoc) mais.readObject();
254                mais.close();
255             }
256             finally
257             {
258                Thread.currentThread().setContextClassLoader(cl);
259             }
260          }
261          else
262          {
263             nodes = (List JavaDoc) resp[0];
264          }
265
266          Fqn bkup = (Fqn) resp[1];
267          result = new BackupData(name, bkup, nodes);
268       }
269
270       return result;
271    }
272
273    private void cleanBackupData(BackupData backup) throws Throwable JavaDoc
274    {
275       // MethodCall primaryDataCleanup, backupDataCleanup;
276
// if (buddyManager.isDataGravitationRemoveOnFind())
277
// {
278
// primaryDataCleanup = MethodCallFactory.create(MethodDeclarations.removeNodeMethodLocal, new Object[]{null, backup.primaryFqn, Boolean.FALSE});
279
// backupDataCleanup = MethodCallFactory.create(MethodDeclarations.removeNodeMethodLocal, new Object[]{null, backup.backupFqn, Boolean.FALSE});
280
// }
281
// else
282
// {
283
// primaryDataCleanup = MethodCallFactory.create(MethodDeclarations.evictNodeMethodLocal, new Object[]{backup.primaryFqn});
284
// backupDataCleanup = MethodCallFactory.create(MethodDeclarations.evictNodeMethodLocal, new Object[]{backup.backupFqn});
285
// }
286

287       MethodCall cleanup = MethodCallFactory.create(MethodDeclarations.dataGravitationCleanupMethod, cache.getInvocationContext().getGlobalTransaction(), backup.primaryFqn, backup.backupFqn);
288
289
290       if (log.isTraceEnabled()) log.trace("Performing cleanup on [" + backup.primaryFqn + "]");
291       GlobalTransaction gtx = cache.getInvocationContext().getGlobalTransaction();
292       if (gtx == null)
293       {
294          // broadcast removes
295
// remove main Fqn
296
//replicateCall(cache.getMembers(), primaryDataCleanup, syncCommunications);
297

298          if (log.isTraceEnabled()) log.trace("Performing cleanup on [" + backup.backupFqn + "]");
299          // remove backup Fqn
300
//replicateCall(cache.getMembers(), backupDataCleanup, syncCommunications);
301
replicateCall(cache.getMembers(), cleanup, syncCommunications);
302       }
303       else
304       {
305          if (log.isTraceEnabled())
306          {
307             log.trace("Data gravitation performed under global transaction " + gtx + ". Not broadcasting cleanups until the tx commits. Adding to tx mod list instead.");
308          }
309          transactionMods.put(gtx, cleanup);
310          TransactionEntry te = getTransactionEntry(gtx);
311          te.addModification(cleanup);
312       }
313    }
314
315    private Object JavaDoc[] gravitateData(Fqn fqn) throws Exception JavaDoc
316    {
317       if (log.isTraceEnabled())
318       {
319          log.trace("cache=" + cache.getLocalAddress() + "; requesting data gravitation for Fqn " + fqn);
320       }
321       List JavaDoc<Address> mbrs = cache.getMembers();
322       Boolean JavaDoc searchSubtrees = (buddyManager.isDataGravitationSearchBackupTrees() ? Boolean.TRUE : Boolean.FALSE);
323       Boolean JavaDoc marshal = configuration.isUseRegionBasedMarshalling() ? Boolean.TRUE : Boolean.FALSE;
324       MethodCall dGrav = MethodCallFactory.create(MethodDeclarations.dataGravitationMethod, fqn, searchSubtrees, marshal);
325       List JavaDoc resps = cache.getRPCManager().callRemoteMethods(mbrs, dGrav, GroupRequest.GET_FIRST, true, buddyManager.getBuddyCommunicationTimeout());
326       if (resps == null)
327       {
328          log.error("No replies to call " + dGrav + ". Perhaps we're alone in the cluster?");
329          return new Object JavaDoc[]{null, null};
330       }
331       else
332       {
333          // test for and remove exceptions
334
Iterator JavaDoc i = resps.iterator();
335          Object JavaDoc result = null;
336          Object JavaDoc backupFqn = null;
337
338          while (i.hasNext())
339          {
340             Object JavaDoc o = i.next();
341             if (o instanceof Throwable JavaDoc)
342             {
343                if (log.isDebugEnabled())
344                {
345                   log.debug("Found remote Throwable among responses - removing from responses list", (Exception JavaDoc) o);
346                }
347             }
348             else if (o != null)
349             {
350                // keep looping till we find a FOUND answer.
351
List JavaDoc dGravResp = (List JavaDoc) o;
352                // found?
353
if ((Boolean JavaDoc) dGravResp.get(0))
354                {
355                   result = dGravResp.get(1);
356                   backupFqn = dGravResp.get(2);
357                   break;
358                }
359             }
360             else if (!configuration.isUseRegionBasedMarshalling())
361             {
362                // Null is OK if we are using region based marshalling; it
363
// is what is returned if a region is inactive. Otherwise
364
// getting a null is an error condition
365
log.error("Unexpected null response to call " + dGrav + ".");
366             }
367
368          }
369
370          if (log.isTraceEnabled()) log.trace("got responses " + resps);
371          return new Object JavaDoc[]{result, backupFqn};
372       }
373    }
374
375    private void createNode(boolean localOnly, List JavaDoc nodeData) throws CacheException
376    {
377       Iterator JavaDoc nodes = nodeData.iterator();
378
379       while (nodes.hasNext())
380       {
381          NodeData data = (NodeData) nodes.next();
382          if (localOnly)
383          {
384             if (!cache.getRoot().hasChild(data.getFqn()))
385             {
386                createNodes(data.getFqn(), data.getAttributes());
387             }
388          }
389          else
390          {
391             cache.put(data.getFqn(), data.getAttributes());
392          }
393       }
394    }
395
396    private void createNodes(Fqn fqn, Map JavaDoc data) throws CacheException
397    {
398       int treeNodeSize;
399       if ((treeNodeSize = fqn.size()) == 0) return;
400       NodeSPI n = cache.getRoot();
401       for (int i = 0; i < treeNodeSize; i++)
402       {
403          Object JavaDoc child_name = fqn.get(i);
404          NodeSPI child_node = n.addChildDirect(new Fqn(child_name));
405          if (child_node == null)
406          {
407             if (log.isTraceEnabled())
408             {
409                log.trace("failed to find or create child " + child_name + " of node " + n.getFqn());
410             }
411             return;
412          }
413          if (i == treeNodeSize - 1)
414          {
415             // set data
416
cache.getInvocationContext().getOptionOverrides().setBypassInterceptorChain(true);
417             cache.put(fqn, data);
418             cache.getInvocationContext().getOptionOverrides().setBypassInterceptorChain(false);
419          }
420          n = child_node;
421       }
422    }
423
424    private TransactionEntry getTransactionEntry(GlobalTransaction gtx)
425    {
426       return cache.getTransactionTable().get(gtx);
427    }
428
429    private Fqn extractFqn(int methodId, Object JavaDoc[] args)
430    {
431       return (Fqn) args[MethodDeclarations.isCrudMethod(methodId) ? 1 : 0];
432    }
433
434    private boolean localBackupExists(Fqn fqn)
435    {
436       boolean exists = false;
437
438       for (Node node : getBackupRootCollection())
439       {
440          Fqn newSearchFqn = new Fqn(node.getFqn(), fqn);
441          exists = cache.getRoot().hasChild(newSearchFqn);
442          if (exists) break;
443       }
444
445       return exists;
446    }
447
448    private BackupData localBackupGet(Fqn fqn) throws CacheException
449    {
450       GravitateResult result = cache.gravitateData(fqn, true, false);// a "local" gravitation
451
boolean found = result.getDataFound();
452       BackupData data = null;
453
454       if (found)
455       {
456          Fqn backupFqn = result.getBuddyBackupRegion();
457          List JavaDoc nodeData = result.getSubtree();
458          data = new BackupData(fqn, backupFqn, nodeData);
459          // now the cleanup
460
if (buddyManager.isDataGravitationRemoveOnFind())
461          {
462             // Remove locally only; the remote call will
463
// be broadcast later
464
cache.getInvocationContext().getOptionOverrides().setCacheModeLocal(true);
465             cache.removeNode(backupFqn);
466          }
467          else
468          {
469             cache.evict(backupFqn, true);
470          }
471       }
472
473       return data;
474    }
475
476    private Collection JavaDoc<Node> getBackupRootCollection()
477    {
478       NodeSPI backupRoot = cache.peek(BuddyManager.BUDDY_BACKUP_SUBTREE_FQN);
479       return backupRoot == null ? Collections.EMPTY_SET : backupRoot.getChildrenDirect();
480    }
481
482    private static class BackupData
483    {
484       Fqn primaryFqn;
485       Fqn backupFqn;
486       List JavaDoc backupData;
487
488       BackupData(Fqn primary, Fqn backup, List JavaDoc data)
489       {
490          this.primaryFqn = primary;
491          this.backupFqn = backup;
492          this.backupData = data;
493       }
494    }
495
496
497 }
498
Popular Tags