1 7 package org.jboss.cache.interceptors; 8 9 import org.jboss.cache.CacheException; 10 import org.jboss.cache.Fqn; 11 import org.jboss.cache.GlobalTransaction; 12 import org.jboss.cache.InvocationContext; 13 import org.jboss.cache.OptimisticTransactionEntry; 14 import org.jboss.cache.config.Configuration; 15 import org.jboss.cache.config.Option; 16 import org.jboss.cache.marshall.MethodCall; 17 import org.jboss.cache.marshall.MethodCallFactory; 18 import org.jboss.cache.marshall.MethodDeclarations; 19 import org.jboss.cache.optimistic.DataVersion; 20 import org.jboss.cache.optimistic.DefaultDataVersion; 21 import org.jboss.cache.optimistic.TransactionWorkspace; 22 import org.jboss.cache.optimistic.WorkspaceNode; 23 24 import java.util.ArrayList ; 25 import java.util.Iterator ; 26 import java.util.List ; 27 import java.util.Map ; 28 import java.util.concurrent.ConcurrentHashMap ; 29 30 36 public class OptimisticReplicationInterceptor extends BaseRpcInterceptor 37 { 38 39 private Map broadcastTxs = new ConcurrentHashMap (); 42 43 public Object invoke(MethodCall m) throws Throwable 44 { 45 if (MethodDeclarations.isBuddyGroupOrganisationMethod(m.getMethodId())) return super.invoke(m); 47 48 InvocationContext ctx = cache.getInvocationContext(); 49 Option optionOverride = ctx.getOptionOverrides(); 50 if (optionOverride != null && optionOverride.isCacheModeLocal() && ctx.getTransaction() == null) 51 { 52 return super.invoke(m); 54 } 55 56 Object retval; 57 58 if (ctx.getTransaction() != null) 60 { 61 62 GlobalTransaction gtx = ctx.getGlobalTransaction(); 64 if (gtx == null) 65 { 66 throw new CacheException("failed to get global transaction"); 67 } 68 log.debug(" received method " + m); 69 70 73 switch (m.getMethodId()) 74 { 75 case MethodDeclarations.optimisticPrepareMethod_id: 76 retval = super.invoke(m); 78 79 if (!gtx.isRemote()) 80 { 81 retval = broadcastPrepare(m, gtx); 83 if (retval instanceof Throwable ) 85 { 86 throw (Throwable ) retval; 87 } 88 } 89 break; 90 case MethodDeclarations.commitMethod_id: 91 Throwable temp = null; 93 if (!gtx.isRemote() && broadcastTxs.containsKey(gtx)) 94 { 95 try 97 { 98 broadcastCommit(gtx); 99 } 100 catch (Throwable t) 101 { 102 log.error(" a problem occurred with remote commit", t); 103 temp = t; 104 } 105 } 106 107 retval = super.invoke(m); 108 if (temp != null) 109 { 110 throw temp; 111 } 112 break; 113 case MethodDeclarations.rollbackMethod_id: 114 Throwable temp2 = null; 116 if (!gtx.isRemote() && broadcastTxs.containsKey(gtx)) 117 { 118 try 120 { 121 broadcastRollback(gtx); 122 } 123 catch (Throwable t) 124 { 125 log.error(" a problem occurred with remote rollback", t); 126 temp2 = t; 127 } 128 129 } 130 retval = super.invoke(m); 131 if (temp2 != null) 132 { 133 throw temp2; 134 } 135 break; 136 default: 137 log.debug(" received method " + m + " not handling"); 139 retval = super.invoke(m); 140 break; 141 } 142 } 143 else 144 { 145 throw new CacheException("transaction does not exist"); 146 } 147 return retval; 148 } 149 150 protected Object broadcastPrepare(MethodCall methodCall, GlobalTransaction gtx) throws Throwable 151 { 152 boolean remoteCallSync = configuration.getCacheMode() == Configuration.CacheMode.REPL_SYNC; 153 154 Object [] args = methodCall.getArgs(); 155 List modifications = (List ) args[1]; 156 int num_mods = modifications != null ? modifications.size() : 0; 157 158 161 if (cache.getMembers() != null && cache.getMembers().size() > 1) 162 { 163 164 MethodCall toBroadcast = mapDataVersionedMethodCalls(methodCall, getTransactionWorkspace(gtx)); 166 167 broadcastTxs.put(gtx, gtx); 169 if (log.isDebugEnabled()) 170 { 171 log.debug("(" + cache.getLocalAddress() 172 + "): broadcasting prepare for " + gtx 173 + " (" + num_mods + " modifications"); 174 } 175 176 replicateCall(toBroadcast, remoteCallSync); 177 } 178 else 179 { 180 if (log.isDebugEnabled()) 182 { 183 log.debug("(" + cache.getLocalAddress() 184 + "):not broadcasting prepare as members are " + cache.getMembers()); 185 } 186 } 187 return null; 188 } 189 190 191 protected void broadcastCommit(GlobalTransaction gtx) throws Throwable 192 { 193 boolean remoteCallSync = configuration.isSyncCommitPhase(); 194 195 if (cache.getMembers() != null && cache.getMembers().size() > 1) 197 { 198 try 199 { 200 broadcastTxs.remove(gtx); 201 MethodCall commit_method = MethodCallFactory.create(MethodDeclarations.commitMethod, 202 gtx); 203 204 log.debug("running remote commit for " + gtx 205 + " and coord=" + cache.getLocalAddress()); 206 207 replicateCall(commit_method, remoteCallSync); 208 } 209 catch (Exception e) 210 { 211 log.fatal("commit failed", e); 212 throw e; 213 } 214 } 215 else 216 { 217 } 219 } 220 221 protected void broadcastRollback(GlobalTransaction gtx) throws Throwable 222 { 223 boolean remoteCallSync = configuration.isSyncRollbackPhase(); 224 225 if (cache.getMembers() != null && cache.getMembers().size() > 1) 226 { 227 try 229 { 230 broadcastTxs.remove(gtx); 231 MethodCall rollback_method = MethodCallFactory.create(MethodDeclarations.rollbackMethod, gtx); 232 233 log.debug("running remote rollback for " + gtx 234 + " and coord=" + cache.getLocalAddress()); 235 replicateCall(rollback_method, remoteCallSync); 236 237 } 238 catch (Exception e) 239 { 240 log.error("rollback failed", e); 241 throw e; 242 } 243 } 244 } 245 246 private MethodCall mapDataVersionedMethodCalls(MethodCall m, TransactionWorkspace w) 247 { 248 Object [] origArgs = m.getArgs(); 249 return MethodCallFactory.create(m.getMethod(), origArgs[0], translate((List ) origArgs[1], w), origArgs[2], origArgs[3], origArgs[4]); 250 } 251 252 255 private List translate(List l, TransactionWorkspace w) 256 { 257 List newList = new ArrayList (); 258 Iterator origCalls = l.iterator(); 259 while (origCalls.hasNext()) 260 { 261 MethodCall origCall = (MethodCall) origCalls.next(); 262 if (MethodDeclarations.isDataGravitationMethod(origCall.getMethodId())) 263 { 264 newList.add(origCall); 266 } 267 else 268 { 269 Object [] origArgs = origCall.getArgs(); 270 272 Fqn fqn = (Fqn) origArgs[1]; 274 DataVersion versionToBroadcast = getVersionToBroadcast(w, fqn); 276 277 Object [] newArgs = new Object [origArgs.length + 1]; 280 for (int i = 0; i < origArgs.length; i++) newArgs[i] = origArgs[i]; 281 newArgs[origArgs.length] = versionToBroadcast; 282 283 MethodCall newCall = MethodCallFactory.create(MethodDeclarations.getVersionedMethod(origCall.getMethodId()), newArgs); 285 286 newList.add(newCall); 288 } 289 } 290 return newList; 291 } 292 293 297 private DataVersion getVersionToBroadcast(TransactionWorkspace w, Fqn f) 298 { 299 WorkspaceNode n = w.getNode(f); 300 if (n == null) 301 { 302 if (log.isTraceEnabled()) log.trace("Fqn " + f + " not found in workspace; not using a data version."); 303 return null; 304 } 305 if (n.isVersioningImplicit()) 306 { 307 DefaultDataVersion v = (DefaultDataVersion) n.getVersion(); 308 if (log.isTraceEnabled()) 309 log.trace("Fqn " + f + " has implicit versioning. Broadcasting an incremented version."); 310 return v.increment(); 311 } 312 else 313 { 314 if (log.isTraceEnabled()) log.trace("Fqn " + f + " has explicit versioning. Broadcasting the version as-is."); 315 return n.getVersion(); 316 } 317 } 318 319 protected TransactionWorkspace getTransactionWorkspace(GlobalTransaction gtx) throws CacheException 320 { 321 OptimisticTransactionEntry transactionEntry = (OptimisticTransactionEntry) cache.getTransactionTable().get(gtx); 322 323 if (transactionEntry == null) 324 { 325 throw new CacheException("unable to map global transaction " + gtx + " to transaction entry"); 326 } 327 328 return transactionEntry.getTransactionWorkSpace(); 330 } 331 332 } | Popular Tags |