package org.jboss.cache.interceptors;

import org.jboss.cache.CacheSPI;
import org.jboss.cache.Fqn;
import org.jboss.cache.GlobalTransaction;
import org.jboss.cache.InvocationContext;
import org.jboss.cache.OptimisticTransactionEntry;
import org.jboss.cache.TransactionEntry;
import org.jboss.cache.TransactionTable;
import org.jboss.cache.config.Configuration;
import org.jboss.cache.config.Option;
import org.jboss.cache.marshall.MethodCall;
import org.jboss.cache.marshall.MethodCallFactory;
import org.jboss.cache.marshall.MethodDeclarations;
import org.jboss.cache.optimistic.TransactionWorkspace;

import javax.transaction.SystemException;
import javax.transaction.Transaction;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 * Interceptor that replicates evict calls for modified nodes instead of replicating the
 * modifications themselves.
 * <p>
 * For a CRUD method call made without a valid transaction, an evict call for the affected
 * {@link Fqn} is replicated once the call has passed down the interceptor chain. For calls
 * made within a valid transaction, the evict calls for all Fqns modified by that transaction
 * are replicated when a prepare (or optimistic prepare) method call is seen.
 */
public class InvalidationInterceptor extends BaseRpcInterceptor implements InvalidationInterceptorMBean
{
   /** Number of evict calls replicated while statistics were enabled. */
   private long invalidations = 0;

   /** Transaction table of the cache this interceptor is attached to; set in {@link #setCache(CacheSPI)}. */
   protected TransactionTable txTable;

   /**
    * Attaches this interceptor to a cache and caches a reference to its transaction table.
    *
    * @param cache the cache this interceptor belongs to
    */
   public void setCache(CacheSPI cache)
   {
      super.setCache(cache);
      txTable = cache.getTransactionTable();
   }

   /**
    * Passes the call down the chain and then, depending on the call and the transactional
    * context, replicates evict calls for the nodes that were modified.
    *
    * @param m the method call being intercepted
    * @return whatever the rest of the interceptor chain returned for {@code m}
    * @throws Throwable anything thrown further down the chain, or a {@link RuntimeException}
    *                   if the evict calls could not be replicated during the prepare phase
    */
   public Object invoke(MethodCall m) throws Throwable
   {
      InvocationContext ctx = cache.getInvocationContext();
      Option optionOverride = ctx.getOptionOverrides();
      boolean forcedLocal = optionOverride != null && optionOverride.isCacheModeLocal();

      // A non-transactional call that was explicitly marked cache-mode-local is never invalidated remotely.
      if (forcedLocal && ctx.getTransaction() == null)
      {
         return super.invoke(m);
      }

      // Read the transaction BEFORE passing the call on, exactly as the call arrived here.
      Transaction tx = ctx.getTransaction();
      Object retval = super.invoke(m);

      if (log.isTraceEnabled()) log.trace("(" + cache.getLocalAddress() + ") method call " + m);

      int methodId = m.getMethodId();
      if (MethodDeclarations.isCrudMethod(methodId))
      {
         if (log.isDebugEnabled()) log.debug("Is a CRUD method");
         Fqn fqn = findFqn(m.getArgs());
         // Inside a valid transaction nothing is sent now; the prepare phase handles it.
         if (fqn != null && (tx == null || !isValid(tx)))
         {
            invalidateAcrossCluster(fqn, null);
         }
      }
      else if (tx != null && isValid(tx)
            && (methodId == MethodDeclarations.prepareMethod_id
            || methodId == MethodDeclarations.optimisticPrepareMethod_id))
      {
         invalidateOnPrepare(ctx, tx);
      }
      return retval;
   }

   /**
    * Replicates evict calls for every Fqn modified by the transaction being prepared. If that
    * fails, the transaction is marked rollback-only and a {@link RuntimeException} is thrown.
    *
    * @param ctx the current invocation context, used to look up the global transaction
    * @param tx  the transaction being prepared
    */
   private void invalidateOnPrepare(InvocationContext ctx, Transaction tx)
   {
      log.debug("Entering InvalidationInterceptor's prepare phase");
      GlobalTransaction gtx = ctx.getGlobalTransaction();
      TransactionEntry entry = txTable.get(gtx);
      if (entry == null) throw new IllegalStateException("cannot find transaction entry for " + gtx);

      // Work on a copy of the modification list held by the transaction entry.
      List<MethodCall> modifications = new LinkedList<MethodCall>(entry.getModifications());

      if (modifications.size() > 0)
      {
         try
         {
            // A workspace is only looked up when optimistic node locking is configured.
            invalidateModifications(modifications, configuration.isNodeLockingOptimistic() ? getWorkspace(gtx) : null);
         }
         catch (Throwable t)
         {
            log.warn("Unable to broadcast evicts as a part of the prepare phase. Rolling back.", t);
            try
            {
               tx.setRollbackOnly();
            }
            catch (SystemException se)
            {
               // The original failure 't' has already been logged above.
               throw new RuntimeException("setting tx rollback failed ", se);
            }
            throw new RuntimeException("Unable to broadcast invalidation messages", t);
         }
      }
      log.debug("Leaving InvalidationInterceptor's prepare phase");
   }

   /**
    * @return the number of invalidations counted since creation or the last
    *         {@link #resetStatistics()}
    */
   public long getInvalidations()
   {
      return invalidations;
   }

   /** Resets the invalidation counter to zero. */
   public void resetStatistics()
   {
      invalidations = 0;
   }

   /**
    * @return a new map holding the current invalidation count under the key
    *         {@code "Invalidations"}
    */
   public Map<String, Object> dumpStatistics()
   {
      Map<String, Object> retval = new HashMap<String, Object>();
      retval.put("Invalidations", invalidations);
      return retval;
   }

   /**
    * Replicates an evict call for a single Fqn.
    * <p>
    * When a workspace is supplied and its versioning is not implicit, the versioned evict
    * method is used and the version of the workspace's node for {@code fqn} is sent along;
    * otherwise the plain evict method is used.
    *
    * @param fqn       the Fqn to evict
    * @param workspace the transaction workspace to read the node version from, or {@code null}
    * @throws Throwable if replicating the call fails
    */
   protected void invalidateAcrossCluster(Fqn fqn, TransactionWorkspace workspace) throws Throwable
   {
      // Only count invalidations when statistics are exposed and enabled.
      if (configuration.getExposeManagementStatistics() && getStatisticsEnabled())
      {
         invalidations++;
      }

      boolean explicitVersioning = workspace != null && !workspace.isVersioningImplicit();
      MethodCall call;
      if (explicitVersioning)
      {
         call = MethodCallFactory.create(MethodDeclarations.evictVersionedNodeMethodLocal, fqn, workspace.getNode(fqn).getVersion());
      }
      else
      {
         call = MethodCallFactory.create(MethodDeclarations.evictNodeMethodLocal, fqn);
      }

      if (log.isDebugEnabled()) log.debug("Cache [" + cache.getLocalAddress() + "] replicating " + call);
      // The second argument is true only when the cache mode is INVALIDATION_SYNC.
      replicateCall(call, configuration.getCacheMode() == Configuration.CacheMode.INVALIDATION_SYNC);
   }

   /**
    * Replicates one evict call per distinct Fqn touched by the given modifications.
    *
    * @param modifications the method calls recorded for a transaction
    * @param workspace     the transaction workspace, or {@code null}
    * @throws Throwable if replicating any of the evict calls fails
    */
   protected void invalidateModifications(List<MethodCall> modifications, TransactionWorkspace workspace) throws Throwable
   {
      Set<Fqn> modifiedFqns = optimisedIterator(modifications);
      for (Fqn fqn : modifiedFqns)
      {
         invalidateAcrossCluster(fqn, workspace);
      }
   }

   /**
    * Looks up the transaction workspace for a global transaction.
    * <p>
    * NOTE(review): the entry is cast to {@link OptimisticTransactionEntry} without a check and
    * is not tested for {@code null}; within this class the method is only called when optimistic
    * node locking is configured and after the entry has been found.
    *
    * @param gtx the global transaction
    * @return the workspace held by the transaction's entry
    */
   protected TransactionWorkspace getWorkspace(GlobalTransaction gtx)
   {
      OptimisticTransactionEntry entry = (OptimisticTransactionEntry) txTable.get(gtx);
      return entry.getTransactionWorkSpace();
   }

   /**
    * Extracts the Fqn from the arguments of a method call.
    * <p>
    * NOTE(review): assumes the Fqn is always the second argument (index 1) - confirm against
    * the CRUD method signatures in {@link MethodDeclarations}.
    *
    * @param objects the arguments of a method call
    * @return the element at index 1, cast to {@link Fqn}
    */
   protected Fqn findFqn(Object[] objects)
   {
      return (Fqn) objects[1];
   }

   /**
    * Collects the distinct Fqns affected by the CRUD method calls in the list, so that each
    * Fqn is only invalidated once however many times it was modified.
    *
    * @param list the method calls to inspect
    * @return the set of non-null Fqns taken from the CRUD calls in {@code list}
    */
   protected Set<Fqn> optimisedIterator(List<MethodCall> list)
   {
      Set<Fqn> fqns = new HashSet<Fqn>();
      for (MethodCall mc : list)
      {
         if (MethodDeclarations.isCrudMethod(mc.getMethodId()))
         {
            Fqn fqn = findFqn(mc.getArgs());
            // Skip calls without an Fqn, matching the null check applied in invoke().
            if (fqn != null)
            {
               fqns.add(fqn);
            }
         }
      }
      return fqns;
   }
}