1 package org.jboss.cache.interceptors; 2 3 import org.jboss.cache.CacheException; 4 import org.jboss.cache.CacheSPI; 5 import org.jboss.cache.Fqn; 6 import org.jboss.cache.GlobalTransaction; 7 import org.jboss.cache.Modification; 8 import org.jboss.cache.TransactionEntry; 9 import org.jboss.cache.TransactionTable; 10 import org.jboss.cache.config.CacheLoaderConfig; 11 import org.jboss.cache.marshall.MethodCall; 12 import org.jboss.cache.marshall.MethodDeclarations; 13 14 import javax.transaction.TransactionManager ; 15 import java.lang.reflect.Method ; 16 import java.util.ArrayList ; 17 import java.util.Collections ; 18 import java.util.HashMap ; 19 import java.util.Iterator ; 20 import java.util.List ; 21 import java.util.Map ; 22 import java.util.Set ; 23 import java.util.concurrent.ConcurrentHashMap ; 24 25 32 public class CacheStoreInterceptor extends BaseCacheLoaderInterceptor implements CacheStoreInterceptorMBean 33 { 34 35 protected CacheLoaderConfig loaderConfig = null; 36 protected TransactionManager tx_mgr = null; 37 protected TransactionTable tx_table = null; 38 private HashMap m_txStores = new HashMap (); 39 private Map preparingTxs = new ConcurrentHashMap (); 40 private long m_cacheStores = 0; 41 42 public void setCache(CacheSPI cache) 43 { 44 super.setCache(cache); 45 this.loaderConfig = cache.getCacheLoaderManager().getCacheLoaderConfig(); 46 tx_mgr = cache.getTransactionManager(); 47 tx_table = cache.getTransactionTable(); 48 } 49 50 58 public Object invoke(MethodCall m) throws Throwable 59 { 60 61 64 if (!cache.getInvocationContext().isOriginLocal() && loaderConfig.isShared()) 65 { 66 log.trace("Passing up method call and bypassing this interceptor since the cache loader is shared and this call originated remotely."); 67 return super.invoke(m); 68 } 69 70 Fqn fqn; 71 Object key, value; 72 Object [] args = m.getArgs(); 73 Object retval, tmp_retval = null; 74 boolean use_tmp_retval = false; 75 76 77 if (log.isTraceEnabled()) 78 { 79 log.trace("CacheStoreInterceptor called with meth " + m); 80 } 81 82 if (tx_mgr != null && tx_mgr.getTransaction() != null) 83 { 84 log.trace("transactional so don't put stuff in the cloader yet."); 86 GlobalTransaction gtx = cache.getInvocationContext().getGlobalTransaction(); 87 switch (m.getMethodId()) 88 { 89 case MethodDeclarations.commitMethod_id: 90 if (cache.getInvocationContext().isTxHasMods()) 91 { 92 if (log.isTraceEnabled()) log.trace("Calling loader.commit() for gtx " + gtx); 94 List fqnsModified = getFqnsFromModificationList(tx_table.get(gtx).getCacheLoaderModifications()); 96 obtainLoaderLocks(fqnsModified); 97 try 98 { 99 loader.commit(gtx); 100 } 101 finally 102 { 103 releaseLoaderLocks(fqnsModified); 104 preparingTxs.remove(gtx); 105 } 106 if (configuration.getExposeManagementStatistics() && getStatisticsEnabled()) 107 { 108 Integer puts = (Integer ) m_txStores.get(gtx); 109 if (puts != null) 110 { 111 m_cacheStores = m_cacheStores + puts; 112 } 113 m_txStores.remove(gtx); 114 } 115 } 116 else 117 { 118 log.trace("Commit called with no modifications; ignoring."); 119 } 120 break; 121 case MethodDeclarations.rollbackMethod_id: 122 if (cache.getInvocationContext().isTxHasMods()) 123 { 124 if (preparingTxs.containsKey(gtx)) 126 { 127 preparingTxs.remove(gtx); 128 loader.rollback(gtx); 129 } 130 if (configuration.getExposeManagementStatistics() && getStatisticsEnabled()) 131 { 132 m_txStores.remove(gtx); 133 } 134 } 135 else 136 { 137 log.trace("Rollback called with no modifications; ignoring."); 138 } 139 break; 140 case MethodDeclarations.optimisticPrepareMethod_id: 141 case MethodDeclarations.prepareMethod_id: 142 prepareCacheLoader(gtx, isOnePhaseCommitPrepareMehod(m)); 143 break; 144 } 145 146 return super.invoke(m); 148 } 149 150 152 switch (m.getMethodId()) 156 { 157 case MethodDeclarations.removeNodeMethodLocal_id: 158 fqn = (Fqn) args[1]; 159 obtainLoaderLock(fqn); 160 try 161 { 162 loader.remove(fqn); 163 } 164 finally 165 { 166 releaseLoaderLock(fqn); 167 } 168 break; 169 case MethodDeclarations.removeKeyMethodLocal_id: 170 fqn = (Fqn) args[1]; 171 key = args[2]; 172 obtainLoaderLock(fqn); 173 try 174 { 175 tmp_retval = loader.remove(fqn, key); 176 use_tmp_retval = true; 177 } 178 finally 179 { 180 releaseLoaderLock(fqn); 181 } 182 break; 183 case MethodDeclarations.removeDataMethodLocal_id: 184 fqn = (Fqn) args[1]; 185 obtainLoaderLock(fqn); 186 try 187 { 188 loader.removeData(fqn); 189 } 190 finally 191 { 192 releaseLoaderLock(fqn); 193 } 194 break; 195 } 196 198 retval = super.invoke(m); 199 200 switch (m.getMethodId()) 203 { 204 case MethodDeclarations.moveMethodLocal_id: 205 doMove((Fqn) args[0], (Fqn) args[1]); 206 break; 207 case MethodDeclarations.putDataMethodLocal_id: 208 case MethodDeclarations.putDataEraseMethodLocal_id: 209 Modification mod = convertMethodCallToModification(m); 210 log.debug(mod); 211 fqn = mod.getFqn(); 212 obtainLoaderLock(fqn); 213 try 214 { 215 loader.put(Collections.singletonList(mod)); 216 } 217 finally 218 { 219 releaseLoaderLock(fqn); 220 } 221 if (configuration.getExposeManagementStatistics() && getStatisticsEnabled()) 222 { 223 m_cacheStores++; 224 } 225 break; 226 case MethodDeclarations.putKeyValMethodLocal_id: 227 fqn = (Fqn) args[1]; 228 key = args[2]; 229 value = args[3]; 230 obtainLoaderLock(fqn); 231 try 232 { 233 tmp_retval = loader.put(fqn, key, value); 234 use_tmp_retval = true; 235 } 236 finally 237 { 238 releaseLoaderLock(fqn); 239 } 240 if (configuration.getExposeManagementStatistics() && getStatisticsEnabled()) 241 { 242 m_cacheStores++; 243 } 244 break; 245 } 246 248 if (use_tmp_retval) 249 { 250 return tmp_retval; 251 } 252 else 253 { 254 return retval; 255 } 256 } 257 258 private void doMove(Fqn node, Fqn parent) throws Exception 259 { 260 Fqn newNodeFqn = new Fqn(parent, node.getLastElement()); 261 265 recursiveMove(node, newNodeFqn); 266 try 267 { 268 obtainLoaderLock(node); 269 loader.remove(node); 270 } 271 finally 272 { 273 releaseLoaderLock(node); 274 } 275 } 276 277 private void recursiveMove(Fqn fqn, Fqn newFqn) throws Exception 278 { 279 List fqns = new ArrayList (); 280 fqns.add(fqn); 281 fqns.add(newFqn); 282 obtainLoaderLocks(fqns); 283 try 284 { 285 loader.put(newFqn, loader.get(fqn)); 286 Set childrenNames = loader.getChildrenNames(fqn); 288 if (childrenNames != null) 289 { 290 for (Object child : childrenNames) 291 { 292 recursiveMove(new Fqn(fqn, child), new Fqn(newFqn, child)); 293 } 294 } 295 } 296 finally 297 { 298 releaseLoaderLocks(fqns); 299 } 300 } 301 302 private List getFqnsFromModificationList(List <MethodCall> modifications) 303 { 304 List <Fqn> fqnList = new ArrayList <Fqn>(); 305 306 for (MethodCall mc : modifications) 307 { 308 Fqn fqn = findFqn(mc.getArgs()); 309 if (fqn != null && !fqnList.contains(fqn)) fqnList.add(fqn); 310 } 311 return fqnList; 312 } 313 314 private Fqn findFqn(Object [] o) 315 { 316 for (int i = 0; i < o.length; i++) 317 { 318 if (o[i] instanceof Fqn) return (Fqn) o[i]; 319 } 320 return null; 321 } 322 323 public long getCacheLoaderStores() 324 { 325 return m_cacheStores; 326 } 327 328 public void resetStatistics() 329 { 330 m_cacheStores = 0; 331 } 332 333 public Map <String , Object > dumpStatistics() 334 { 335 Map <String , Object > retval = new HashMap <String , Object >(); 336 retval.put("CacheLoaderStores", m_cacheStores); 337 return retval; 338 } 339 340 private void prepareCacheLoader(GlobalTransaction gtx, boolean onePhase) throws Exception 341 { 342 List modifications; 343 TransactionEntry entry; 344 int txPuts = 0; 345 346 entry = tx_table.get(gtx); 347 if (entry == null) 348 { 349 throw new Exception ("entry for transaction " + gtx + " not found in transaction table"); 350 } 351 modifications = entry.getCacheLoaderModifications(); 352 if (modifications.size() == 0) 353 { 354 return; 355 } 356 List cache_loader_modifications = new ArrayList (); 357 for (Iterator it = modifications.iterator(); it.hasNext();) 358 { 359 MethodCall methodCall = (MethodCall) it.next(); 360 Modification mod = convertMethodCallToModification(methodCall); 361 cache_loader_modifications.add(mod); 362 if (configuration.getExposeManagementStatistics() && getStatisticsEnabled()) 363 { 364 if ((mod.getType() == Modification.ModificationType.PUT_DATA) || 365 (mod.getType() == Modification.ModificationType.PUT_DATA_ERASE) || 366 (mod.getType() == Modification.ModificationType.PUT_KEY_VALUE)) 367 { 368 txPuts++; 369 } 370 } 371 } 372 if (log.isTraceEnabled()) 373 { 374 log.trace("Converted method calls to cache loader modifications. List size: " + cache_loader_modifications.size()); 375 } 376 if (cache_loader_modifications.size() > 0) 377 { 378 loader.prepare(gtx, cache_loader_modifications, onePhase); 379 preparingTxs.put(gtx, gtx); 380 if (configuration.getExposeManagementStatistics() && getStatisticsEnabled() && txPuts > 0) 381 { 382 m_txStores.put(gtx, txPuts); 383 } 384 } 385 } 386 387 protected Modification convertMethodCallToModification(MethodCall methodCall) throws Exception 388 { 389 Method method = methodCall.getMethod(); 390 Object [] args; 391 if (method == null) 392 { 393 throw new Exception ("method call has no method: " + methodCall); 394 } 395 396 args = methodCall.getArgs(); 397 Modification mod = null; 398 switch (methodCall.getMethodId()) 399 { 400 case MethodDeclarations.putDataMethodLocal_id: 401 mod = new Modification(Modification.ModificationType.PUT_DATA, 402 (Fqn) args[1], (Map ) args[2]); break; 405 case MethodDeclarations.putDataEraseMethodLocal_id: 406 mod = new Modification(Modification.ModificationType.PUT_DATA_ERASE, 407 (Fqn) args[1], (Map ) args[2]); break; 410 case MethodDeclarations.putKeyValMethodLocal_id: 411 mod = new Modification(Modification.ModificationType.PUT_KEY_VALUE, 412 (Fqn) args[1], args[2], args[3]); break; 416 case MethodDeclarations.removeNodeMethodLocal_id: 417 mod = new Modification(Modification.ModificationType.REMOVE_NODE, 418 (Fqn) args[1]); break; 420 case MethodDeclarations.removeKeyMethodLocal_id: 421 mod = new Modification(Modification.ModificationType.REMOVE_KEY_VALUE, 422 (Fqn) args[1], args[2]); break; 425 case MethodDeclarations.removeDataMethodLocal_id: 426 mod = new Modification(Modification.ModificationType.REMOVE_DATA, 427 (Fqn) args[1]); break; 429 case MethodDeclarations.moveMethodLocal_id: 430 mod = new Modification(Modification.ModificationType.MOVE, (Fqn) args[0], (Fqn) args[1]); 431 break; 432 default: 433 throw new CacheException("method call " + method.getName() + " cannot be converted to a modification"); 434 } 435 436 if (log.isTraceEnabled()) log.trace("Converted " + methodCall + " to Modification of type " + mod.getType()); 437 return mod; 438 } 439 } 440 | Popular Tags |