KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jboss > cache > interceptors > CacheStoreInterceptor


1 package org.jboss.cache.interceptors;
2
3 import org.jboss.cache.CacheException;
4 import org.jboss.cache.CacheSPI;
5 import org.jboss.cache.Fqn;
6 import org.jboss.cache.GlobalTransaction;
7 import org.jboss.cache.Modification;
8 import org.jboss.cache.TransactionEntry;
9 import org.jboss.cache.TransactionTable;
10 import org.jboss.cache.config.CacheLoaderConfig;
11 import org.jboss.cache.marshall.MethodCall;
12 import org.jboss.cache.marshall.MethodDeclarations;
13
14 import javax.transaction.TransactionManager JavaDoc;
15 import java.lang.reflect.Method JavaDoc;
16 import java.util.ArrayList JavaDoc;
17 import java.util.Collections JavaDoc;
18 import java.util.HashMap JavaDoc;
19 import java.util.Iterator JavaDoc;
20 import java.util.List JavaDoc;
21 import java.util.Map JavaDoc;
22 import java.util.Set JavaDoc;
23 import java.util.concurrent.ConcurrentHashMap JavaDoc;
24
25 /**
26  * Writes modifications back to the store on the way out: stores modifications back
27  * through the CacheLoader, either after each method call (no TXs), or at TX commit.
28  *
29  * @author Bela Ban
30  * @version $Id: CacheStoreInterceptor.java,v 1.43 2006/12/31 03:01:20 msurtani Exp $
31  */

32 public class CacheStoreInterceptor extends BaseCacheLoaderInterceptor implements CacheStoreInterceptorMBean
33 {
34
35    protected CacheLoaderConfig loaderConfig = null;
36    protected TransactionManager JavaDoc tx_mgr = null;
37    protected TransactionTable tx_table = null;
38    private HashMap JavaDoc m_txStores = new HashMap JavaDoc();
39    private Map JavaDoc preparingTxs = new ConcurrentHashMap JavaDoc();
40    private long m_cacheStores = 0;
41
42    public void setCache(CacheSPI cache)
43    {
44       super.setCache(cache);
45       this.loaderConfig = cache.getCacheLoaderManager().getCacheLoaderConfig();
46       tx_mgr = cache.getTransactionManager();
47       tx_table = cache.getTransactionTable();
48    }
49
50    /**
51     * Pass the method on. When it returns, store the modification back to the store using the CacheLoader.
52     * In case of a transaction, register for TX completion (2PC) and at TX commit, write modifications made
53     * under the given TX to the CacheLoader
54     *
55     * @return
56     * @throws Throwable
57     */

58    public Object JavaDoc invoke(MethodCall m) throws Throwable JavaDoc
59    {
60
61       // if this is a shared cache loader and the call is of remote origin, pass up the chain. - Manik
62
// see http://www.jboss.com/index.html?module=bb&op=viewtopic&t=76090
63

64       if (!cache.getInvocationContext().isOriginLocal() && loaderConfig.isShared())
65       {
66          log.trace("Passing up method call and bypassing this interceptor since the cache loader is shared and this call originated remotely.");
67          return super.invoke(m);
68       }
69
70       Fqn fqn;
71       Object JavaDoc key, value;
72       Object JavaDoc[] args = m.getArgs();
73       Object JavaDoc retval, tmp_retval = null;
74       boolean use_tmp_retval = false;
75
76
77       if (log.isTraceEnabled())
78       {
79          log.trace("CacheStoreInterceptor called with meth " + m);
80       }
81
82       if (tx_mgr != null && tx_mgr.getTransaction() != null)
83       {
84          // we have a tx running.
85
log.trace("transactional so don't put stuff in the cloader yet.");
86          GlobalTransaction gtx = cache.getInvocationContext().getGlobalTransaction();
87          switch (m.getMethodId())
88          {
89             case MethodDeclarations.commitMethod_id:
90                if (cache.getInvocationContext().isTxHasMods())
91                {
92                   // this is a commit call.
93
if (log.isTraceEnabled()) log.trace("Calling loader.commit() for gtx " + gtx);
94                   // sync call (a write) on the loader
95
List JavaDoc fqnsModified = getFqnsFromModificationList(tx_table.get(gtx).getCacheLoaderModifications());
96                   obtainLoaderLocks(fqnsModified);
97                   try
98                   {
99                      loader.commit(gtx);
100                   }
101                   finally
102                   {
103                      releaseLoaderLocks(fqnsModified);
104                      preparingTxs.remove(gtx);
105                   }
106                   if (configuration.getExposeManagementStatistics() && getStatisticsEnabled())
107                   {
108                      Integer JavaDoc puts = (Integer JavaDoc) m_txStores.get(gtx);
109                      if (puts != null)
110                      {
111                         m_cacheStores = m_cacheStores + puts;
112                      }
113                      m_txStores.remove(gtx);
114                   }
115                }
116                else
117                {
118                   log.trace("Commit called with no modifications; ignoring.");
119                }
120                break;
121             case MethodDeclarations.rollbackMethod_id:
122                if (cache.getInvocationContext().isTxHasMods())
123                {
124                   // this is a rollback method
125
if (preparingTxs.containsKey(gtx))
126                   {
127                      preparingTxs.remove(gtx);
128                      loader.rollback(gtx);
129                   }
130                   if (configuration.getExposeManagementStatistics() && getStatisticsEnabled())
131                   {
132                      m_txStores.remove(gtx);
133                   }
134                }
135                else
136                {
137                   log.trace("Rollback called with no modifications; ignoring.");
138                }
139                break;
140             case MethodDeclarations.optimisticPrepareMethod_id:
141             case MethodDeclarations.prepareMethod_id:
142                prepareCacheLoader(gtx, isOnePhaseCommitPrepareMehod(m));
143                break;
144          }
145
146          // pass up the chain
147
return super.invoke(m);
148       }
149
150       // if we're here we don't run in a transaction
151

152       // remove() methods need to be applied to the CacheLoader before passing up the call: a listener might
153
// access an element just removed, causing the CacheLoader to *load* the element before *removing* it.
154
// synchronized(this) {
155
switch (m.getMethodId())
156       {
157          case MethodDeclarations.removeNodeMethodLocal_id:
158             fqn = (Fqn) args[1];
159             obtainLoaderLock(fqn);
160             try
161             {
162                loader.remove(fqn);
163             }
164             finally
165             {
166                releaseLoaderLock(fqn);
167             }
168             break;
169          case MethodDeclarations.removeKeyMethodLocal_id:
170             fqn = (Fqn) args[1];
171             key = args[2];
172             obtainLoaderLock(fqn);
173             try
174             {
175                tmp_retval = loader.remove(fqn, key);
176                use_tmp_retval = true;
177             }
178             finally
179             {
180                releaseLoaderLock(fqn);
181             }
182             break;
183          case MethodDeclarations.removeDataMethodLocal_id:
184             fqn = (Fqn) args[1];
185             obtainLoaderLock(fqn);
186             try
187             {
188                loader.removeData(fqn);
189             }
190             finally
191             {
192                releaseLoaderLock(fqn);
193             }
194             break;
195       }
196       // }
197

198       retval = super.invoke(m);
199
200       // put() methods need to be applied *after* the call
201
// synchronized(this) {
202
switch (m.getMethodId())
203       {
204          case MethodDeclarations.moveMethodLocal_id:
205             doMove((Fqn) args[0], (Fqn) args[1]);
206             break;
207          case MethodDeclarations.putDataMethodLocal_id:
208          case MethodDeclarations.putDataEraseMethodLocal_id:
209             Modification mod = convertMethodCallToModification(m);
210             log.debug(mod);
211             fqn = mod.getFqn();
212             obtainLoaderLock(fqn);
213             try
214             {
215                loader.put(Collections.singletonList(mod));
216             }
217             finally
218             {
219                releaseLoaderLock(fqn);
220             }
221             if (configuration.getExposeManagementStatistics() && getStatisticsEnabled())
222             {
223                m_cacheStores++;
224             }
225             break;
226          case MethodDeclarations.putKeyValMethodLocal_id:
227             fqn = (Fqn) args[1];
228             key = args[2];
229             value = args[3];
230             obtainLoaderLock(fqn);
231             try
232             {
233                tmp_retval = loader.put(fqn, key, value);
234                use_tmp_retval = true;
235             }
236             finally
237             {
238                releaseLoaderLock(fqn);
239             }
240             if (configuration.getExposeManagementStatistics() && getStatisticsEnabled())
241             {
242                m_cacheStores++;
243             }
244             break;
245       }
246       // }
247

248       if (use_tmp_retval)
249       {
250          return tmp_retval;
251       }
252       else
253       {
254          return retval;
255       }
256    }
257
258    private void doMove(Fqn node, Fqn parent) throws Exception JavaDoc
259    {
260       Fqn newNodeFqn = new Fqn(parent, node.getLastElement());
261       //UnversionedNode n = (UnversionedNode) ((TreeCacheProxyImpl) cache).peek(newNodeFqn);
262
//recursiveMove(n);
263
// throw new RuntimeException("Implement me!");
264

265       recursiveMove(node, newNodeFqn);
266       try
267       {
268          obtainLoaderLock(node);
269          loader.remove(node);
270       }
271       finally
272       {
273          releaseLoaderLock(node);
274       }
275    }
276
277    private void recursiveMove(Fqn fqn, Fqn newFqn) throws Exception JavaDoc
278    {
279       List JavaDoc fqns = new ArrayList JavaDoc();
280       fqns.add(fqn);
281       fqns.add(newFqn);
282       obtainLoaderLocks(fqns);
283       try
284       {
285          loader.put(newFqn, loader.get(fqn));
286          //recurse
287
Set JavaDoc childrenNames = loader.getChildrenNames(fqn);
288          if (childrenNames != null)
289          {
290             for (Object JavaDoc child : childrenNames)
291             {
292                recursiveMove(new Fqn(fqn, child), new Fqn(newFqn, child));
293             }
294          }
295       }
296       finally
297       {
298          releaseLoaderLocks(fqns);
299       }
300    }
301
302    private List JavaDoc getFqnsFromModificationList(List JavaDoc<MethodCall> modifications)
303    {
304       List JavaDoc<Fqn> fqnList = new ArrayList JavaDoc<Fqn>();
305
306       for (MethodCall mc : modifications)
307       {
308          Fqn fqn = findFqn(mc.getArgs());
309          if (fqn != null && !fqnList.contains(fqn)) fqnList.add(fqn);
310       }
311       return fqnList;
312    }
313
314    private Fqn findFqn(Object JavaDoc[] o)
315    {
316       for (int i = 0; i < o.length; i++)
317       {
318          if (o[i] instanceof Fqn) return (Fqn) o[i];
319       }
320       return null;
321    }
322
323    public long getCacheLoaderStores()
324    {
325       return m_cacheStores;
326    }
327
328    public void resetStatistics()
329    {
330       m_cacheStores = 0;
331    }
332
333    public Map JavaDoc<String JavaDoc, Object JavaDoc> dumpStatistics()
334    {
335       Map JavaDoc<String JavaDoc, Object JavaDoc> retval = new HashMap JavaDoc<String JavaDoc, Object JavaDoc>();
336       retval.put("CacheLoaderStores", m_cacheStores);
337       return retval;
338    }
339
340    private void prepareCacheLoader(GlobalTransaction gtx, boolean onePhase) throws Exception JavaDoc
341    {
342       List JavaDoc modifications;
343       TransactionEntry entry;
344       int txPuts = 0;
345
346       entry = tx_table.get(gtx);
347       if (entry == null)
348       {
349          throw new Exception JavaDoc("entry for transaction " + gtx + " not found in transaction table");
350       }
351       modifications = entry.getCacheLoaderModifications();
352       if (modifications.size() == 0)
353       {
354          return;
355       }
356       List JavaDoc cache_loader_modifications = new ArrayList JavaDoc();
357       for (Iterator JavaDoc it = modifications.iterator(); it.hasNext();)
358       {
359          MethodCall methodCall = (MethodCall) it.next();
360          Modification mod = convertMethodCallToModification(methodCall);
361          cache_loader_modifications.add(mod);
362          if (configuration.getExposeManagementStatistics() && getStatisticsEnabled())
363          {
364             if ((mod.getType() == Modification.ModificationType.PUT_DATA) ||
365                     (mod.getType() == Modification.ModificationType.PUT_DATA_ERASE) ||
366                     (mod.getType() == Modification.ModificationType.PUT_KEY_VALUE))
367             {
368                txPuts++;
369             }
370          }
371       }
372       if (log.isTraceEnabled())
373       {
374          log.trace("Converted method calls to cache loader modifications. List size: " + cache_loader_modifications.size());
375       }
376       if (cache_loader_modifications.size() > 0)
377       {
378          loader.prepare(gtx, cache_loader_modifications, onePhase);
379          preparingTxs.put(gtx, gtx);
380          if (configuration.getExposeManagementStatistics() && getStatisticsEnabled() && txPuts > 0)
381          {
382             m_txStores.put(gtx, txPuts);
383          }
384       }
385    }
386
387    protected Modification convertMethodCallToModification(MethodCall methodCall) throws Exception JavaDoc
388    {
389       Method JavaDoc method = methodCall.getMethod();
390       Object JavaDoc[] args;
391       if (method == null)
392       {
393          throw new Exception JavaDoc("method call has no method: " + methodCall);
394       }
395
396       args = methodCall.getArgs();
397       Modification mod = null;
398       switch (methodCall.getMethodId())
399       {
400          case MethodDeclarations.putDataMethodLocal_id:
401             mod = new Modification(Modification.ModificationType.PUT_DATA,
402                     (Fqn) args[1], // fqn
403
(Map JavaDoc) args[2]);// data
404
break;
405          case MethodDeclarations.putDataEraseMethodLocal_id:
406             mod = new Modification(Modification.ModificationType.PUT_DATA_ERASE,
407                     (Fqn) args[1], // fqn
408
(Map JavaDoc) args[2]);// data
409
break;
410          case MethodDeclarations.putKeyValMethodLocal_id:
411             mod = new Modification(Modification.ModificationType.PUT_KEY_VALUE,
412                     (Fqn) args[1], // fqn
413
args[2], // key
414
args[3]);// value
415
break;
416          case MethodDeclarations.removeNodeMethodLocal_id:
417             mod = new Modification(Modification.ModificationType.REMOVE_NODE,
418                     (Fqn) args[1]);// fqn
419
break;
420          case MethodDeclarations.removeKeyMethodLocal_id:
421             mod = new Modification(Modification.ModificationType.REMOVE_KEY_VALUE,
422                     (Fqn) args[1], // fqn
423
args[2]);// key
424
break;
425          case MethodDeclarations.removeDataMethodLocal_id:
426             mod = new Modification(Modification.ModificationType.REMOVE_DATA,
427                     (Fqn) args[1]);// fqn
428
break;
429          case MethodDeclarations.moveMethodLocal_id:
430             mod = new Modification(Modification.ModificationType.MOVE, (Fqn) args[0], (Fqn) args[1]);
431             break;
432          default:
433             throw new CacheException("method call " + method.getName() + " cannot be converted to a modification");
434       }
435
436       if (log.isTraceEnabled()) log.trace("Converted " + methodCall + " to Modification of type " + mod.getType());
437       return mod;
438    }
439 }
440
Popular Tags