1 7 package org.jboss.cache.interceptors; 8 9 import org.apache.commons.logging.Log; 10 import org.apache.commons.logging.LogFactory; 11 import org.jboss.cache.CacheException; 12 import org.jboss.cache.CacheSPI; 13 import org.jboss.cache.Fqn; 14 import org.jboss.cache.GlobalTransaction; 15 import org.jboss.cache.InvocationContext; 16 import org.jboss.cache.Node; 17 import org.jboss.cache.NodeSPI; 18 import org.jboss.cache.TransactionEntry; 19 import org.jboss.cache.buddyreplication.BuddyManager; 20 import org.jboss.cache.buddyreplication.GravitateResult; 21 import org.jboss.cache.config.Configuration; 22 import org.jboss.cache.loader.NodeData; 23 import org.jboss.cache.marshall.MethodCall; 24 import org.jboss.cache.marshall.MethodCallFactory; 25 import org.jboss.cache.marshall.MethodDeclarations; 26 import org.jboss.util.stream.MarshalledValueInputStream; 27 import org.jgroups.Address; 28 import org.jgroups.blocks.GroupRequest; 29 30 import java.io.ByteArrayInputStream ; 31 import java.util.ArrayList ; 32 import java.util.Collection ; 33 import java.util.Collections ; 34 import java.util.Iterator ; 35 import java.util.List ; 36 import java.util.Map ; 37 import java.util.concurrent.ConcurrentHashMap ; 38 39 54 public class DataGravitatorInterceptor extends BaseRpcInterceptor 55 { 56 private BuddyManager buddyManager; 57 private boolean syncCommunications = false; 58 private Log log = LogFactory.getLog(DataGravitatorInterceptor.class); 59 private Map transactionMods = new ConcurrentHashMap (); 60 61 public void setCache(CacheSPI cache) 62 { 63 super.setCache(cache); 64 this.buddyManager = cache.getBuddyManager(); 65 syncCommunications = configuration.getCacheMode() == Configuration.CacheMode.REPL_SYNC || configuration.getCacheMode() == Configuration.CacheMode.INVALIDATION_SYNC; 66 } 67 68 public Object invoke(MethodCall m) throws Throwable 69 { 70 if (log.isTraceEnabled()) log.trace("Invoked with method call " + m); 76 77 if (!MethodDeclarations.isTransactionLifecycleMethod(m.getMethodId())) 79 { 80 if (isGravitationEnabled(cache.getInvocationContext())) 81 { 82 Fqn fqn = extractFqn(m.getMethodId(), m.getArgs()); 84 if (log.isTraceEnabled()) log.trace("Checking local existence of fqn " + fqn); 85 if (BuddyManager.isBackupFqn(fqn)) 86 { 87 log.info("Is call for a backup Fqn, not performing any gravitation. Direct calls on internal backup nodes are *not* supported."); 88 } 89 else 90 { 91 if (!cache.getRoot().hasChild(fqn)) 92 { 93 BackupData data; 94 95 if (localBackupExists(fqn)) 97 { 98 log.trace("Gravitating from local backup tree"); 99 data = localBackupGet(fqn); 100 } 101 else 102 { 103 log.trace("Gravitating from remote backup tree"); 104 data = remoteBackupGet(fqn); 106 } 107 108 if (data != null) 109 { 110 createNode(true, data.backupData); 113 log.trace("Passing the put call locally to make sure state is persisted and ownership is correctly established."); 115 createNode(false, data.backupData); 116 117 cleanBackupData(data); 119 } 120 } 121 } 122 } 123 else 124 { 125 if (log.isTraceEnabled()) 126 { 127 log.trace("Suppressing data gravitation for this call."); 128 } 129 } 130 } 131 else 132 { 133 134 try 135 { 136 switch (m.getMethodId()) 137 { 138 case MethodDeclarations.prepareMethod_id: 139 case MethodDeclarations.optimisticPrepareMethod_id: 140 Object o = super.invoke(m); 141 doPrepare(cache.getInvocationContext().getGlobalTransaction()); 142 return o; 143 case MethodDeclarations.rollbackMethod_id: 144 transactionMods.remove(cache.getInvocationContext().getGlobalTransaction()); 145 return super.invoke(m); 146 case MethodDeclarations.commitMethod_id: 147 doCommit(cache.getInvocationContext().getGlobalTransaction()); 148 transactionMods.remove(cache.getInvocationContext().getGlobalTransaction()); 149 return super.invoke(m); 150 } 151 } 152 catch (Throwable throwable) 153 { 154 transactionMods.remove(cache.getInvocationContext().getGlobalTransaction()); 155 throw throwable; 156 } 157 } 158 return super.invoke(m); 166 } 167 168 private boolean isGravitationEnabled(InvocationContext ctx) 169 { 170 boolean enabled = ctx.isOriginLocal(); 171 if (enabled) 172 { 173 if (!buddyManager.isAutoDataGravitation()) 174 { 175 enabled = ctx.getOptionOverrides().getForceDataGravitation(); 176 } 177 } 178 return enabled; 179 } 180 181 private void doPrepare(GlobalTransaction gtx) throws Throwable 182 { 183 MethodCall cleanup = (MethodCall) transactionMods.get(gtx); 184 if (log.isTraceEnabled()) log.trace("Broadcasting prepare for cleanup ops " + cleanup); 185 if (cleanup != null) 186 { 187 MethodCall prepare; 188 List mods = new ArrayList (1); 189 mods.add(cleanup); 190 if (configuration.isNodeLockingOptimistic()) 191 { 192 prepare = MethodCallFactory.create(MethodDeclarations.optimisticPrepareMethod, gtx, mods, null, cache.getLocalAddress(), false); 193 } 194 else 195 { 196 prepare = MethodCallFactory.create(MethodDeclarations.prepareMethod, gtx, mods, cache.getLocalAddress(), syncCommunications); 197 } 198 199 replicateCall(getMembersOutsideBuddyGroup(), prepare, syncCommunications); 200 } 201 else 202 { 203 if (log.isTraceEnabled()) log.trace("Nothing to broadcast in prepare phase for gtx " + gtx); 204 } 205 } 206 207 private void doCommit(GlobalTransaction gtx) throws Throwable 208 { 209 if (transactionMods.containsKey(gtx)) 210 { 211 if (log.isTraceEnabled()) log.trace("Broadcasting commit for gtx " + gtx); 212 replicateCall(getMembersOutsideBuddyGroup(), MethodCallFactory.create(MethodDeclarations.commitMethod, gtx), syncCommunications); 213 } 214 else 215 { 216 if (log.isTraceEnabled()) log.trace("Nothing to broadcast in commit phase for gtx " + gtx); 217 } 218 } 219 220 private List <Address> getMembersOutsideBuddyGroup() 221 { 222 List <Address> members = new ArrayList <Address>(cache.getMembers()); 223 members.remove(cache.getLocalAddress()); 224 members.removeAll(buddyManager.getBuddyAddresses()); 225 return members; 226 } 227 228 private BackupData remoteBackupGet(Fqn name) throws Exception 229 { 230 231 BackupData result = null; 232 233 Object [] resp = gravitateData(name); 234 235 if (resp[0] != null) 236 { 237 if (log.isTraceEnabled()) 238 { 239 log.trace("Got response " + resp[0]); 240 } 241 242 List nodes = null; 243 if (configuration.isUseRegionBasedMarshalling()) 244 { 245 ClassLoader cl = Thread.currentThread().getContextClassLoader(); 246 try 247 { 248 cache.getRegionManager().setContextClassLoaderAsCurrent(name); 249 250 byte[] nodeData = (byte[]) resp[0]; 251 ByteArrayInputStream bais = new ByteArrayInputStream (nodeData); 252 MarshalledValueInputStream mais = new MarshalledValueInputStream(bais); 253 nodes = (List ) mais.readObject(); 254 mais.close(); 255 } 256 finally 257 { 258 Thread.currentThread().setContextClassLoader(cl); 259 } 260 } 261 else 262 { 263 nodes = (List ) resp[0]; 264 } 265 266 Fqn bkup = (Fqn) resp[1]; 267 result = new BackupData(name, bkup, nodes); 268 } 269 270 return result; 271 } 272 273 private void cleanBackupData(BackupData backup) throws Throwable 274 { 275 287 MethodCall cleanup = MethodCallFactory.create(MethodDeclarations.dataGravitationCleanupMethod, cache.getInvocationContext().getGlobalTransaction(), backup.primaryFqn, backup.backupFqn); 288 289 290 if (log.isTraceEnabled()) log.trace("Performing cleanup on [" + backup.primaryFqn + "]"); 291 GlobalTransaction gtx = cache.getInvocationContext().getGlobalTransaction(); 292 if (gtx == null) 293 { 294 298 if (log.isTraceEnabled()) log.trace("Performing cleanup on [" + backup.backupFqn + "]"); 299 replicateCall(cache.getMembers(), cleanup, syncCommunications); 302 } 303 else 304 { 305 if (log.isTraceEnabled()) 306 { 307 log.trace("Data gravitation performed under global transaction " + gtx + ". Not broadcasting cleanups until the tx commits. Adding to tx mod list instead."); 308 } 309 transactionMods.put(gtx, cleanup); 310 TransactionEntry te = getTransactionEntry(gtx); 311 te.addModification(cleanup); 312 } 313 } 314 315 private Object [] gravitateData(Fqn fqn) throws Exception 316 { 317 if (log.isTraceEnabled()) 318 { 319 log.trace("cache=" + cache.getLocalAddress() + "; requesting data gravitation for Fqn " + fqn); 320 } 321 List <Address> mbrs = cache.getMembers(); 322 Boolean searchSubtrees = (buddyManager.isDataGravitationSearchBackupTrees() ? Boolean.TRUE : Boolean.FALSE); 323 Boolean marshal = configuration.isUseRegionBasedMarshalling() ? Boolean.TRUE : Boolean.FALSE; 324 MethodCall dGrav = MethodCallFactory.create(MethodDeclarations.dataGravitationMethod, fqn, searchSubtrees, marshal); 325 List resps = cache.getRPCManager().callRemoteMethods(mbrs, dGrav, GroupRequest.GET_FIRST, true, buddyManager.getBuddyCommunicationTimeout()); 326 if (resps == null) 327 { 328 log.error("No replies to call " + dGrav + ". Perhaps we're alone in the cluster?"); 329 return new Object []{null, null}; 330 } 331 else 332 { 333 Iterator i = resps.iterator(); 335 Object result = null; 336 Object backupFqn = null; 337 338 while (i.hasNext()) 339 { 340 Object o = i.next(); 341 if (o instanceof Throwable ) 342 { 343 if (log.isDebugEnabled()) 344 { 345 log.debug("Found remote Throwable among responses - removing from responses list", (Exception ) o); 346 } 347 } 348 else if (o != null) 349 { 350 List dGravResp = (List ) o; 352 if ((Boolean ) dGravResp.get(0)) 354 { 355 result = dGravResp.get(1); 356 backupFqn = dGravResp.get(2); 357 break; 358 } 359 } 360 else if (!configuration.isUseRegionBasedMarshalling()) 361 { 362 log.error("Unexpected null response to call " + dGrav + "."); 366 } 367 368 } 369 370 if (log.isTraceEnabled()) log.trace("got responses " + resps); 371 return new Object []{result, backupFqn}; 372 } 373 } 374 375 private void createNode(boolean localOnly, List nodeData) throws CacheException 376 { 377 Iterator nodes = nodeData.iterator(); 378 379 while (nodes.hasNext()) 380 { 381 NodeData data = (NodeData) nodes.next(); 382 if (localOnly) 383 { 384 if (!cache.getRoot().hasChild(data.getFqn())) 385 { 386 createNodes(data.getFqn(), data.getAttributes()); 387 } 388 } 389 else 390 { 391 cache.put(data.getFqn(), data.getAttributes()); 392 } 393 } 394 } 395 396 private void createNodes(Fqn fqn, Map data) throws CacheException 397 { 398 int treeNodeSize; 399 if ((treeNodeSize = fqn.size()) == 0) return; 400 NodeSPI n = cache.getRoot(); 401 for (int i = 0; i < treeNodeSize; i++) 402 { 403 Object child_name = fqn.get(i); 404 NodeSPI child_node = n.addChildDirect(new Fqn(child_name)); 405 if (child_node == null) 406 { 407 if (log.isTraceEnabled()) 408 { 409 log.trace("failed to find or create child " + child_name + " of node " + n.getFqn()); 410 } 411 return; 412 } 413 if (i == treeNodeSize - 1) 414 { 415 cache.getInvocationContext().getOptionOverrides().setBypassInterceptorChain(true); 417 cache.put(fqn, data); 418 cache.getInvocationContext().getOptionOverrides().setBypassInterceptorChain(false); 419 } 420 n = child_node; 421 } 422 } 423 424 private TransactionEntry getTransactionEntry(GlobalTransaction gtx) 425 { 426 return cache.getTransactionTable().get(gtx); 427 } 428 429 private Fqn extractFqn(int methodId, Object [] args) 430 { 431 return (Fqn) args[MethodDeclarations.isCrudMethod(methodId) ? 1 : 0]; 432 } 433 434 private boolean localBackupExists(Fqn fqn) 435 { 436 boolean exists = false; 437 438 for (Node node : getBackupRootCollection()) 439 { 440 Fqn newSearchFqn = new Fqn(node.getFqn(), fqn); 441 exists = cache.getRoot().hasChild(newSearchFqn); 442 if (exists) break; 443 } 444 445 return exists; 446 } 447 448 private BackupData localBackupGet(Fqn fqn) throws CacheException 449 { 450 GravitateResult result = cache.gravitateData(fqn, true, false); boolean found = result.getDataFound(); 452 BackupData data = null; 453 454 if (found) 455 { 456 Fqn backupFqn = result.getBuddyBackupRegion(); 457 List nodeData = result.getSubtree(); 458 data = new BackupData(fqn, backupFqn, nodeData); 459 if (buddyManager.isDataGravitationRemoveOnFind()) 461 { 462 cache.getInvocationContext().getOptionOverrides().setCacheModeLocal(true); 465 cache.removeNode(backupFqn); 466 } 467 else 468 { 469 cache.evict(backupFqn, true); 470 } 471 } 472 473 return data; 474 } 475 476 private Collection <Node> getBackupRootCollection() 477 { 478 NodeSPI backupRoot = cache.peek(BuddyManager.BUDDY_BACKUP_SUBTREE_FQN); 479 return backupRoot == null ? Collections.EMPTY_SET : backupRoot.getChildrenDirect(); 480 } 481 482 private static class BackupData 483 { 484 Fqn primaryFqn; 485 Fqn backupFqn; 486 List backupData; 487 488 BackupData(Fqn primary, Fqn backup, List data) 489 { 490 this.primaryFqn = primary; 491 this.backupFqn = backup; 492 this.backupData = data; 493 } 494 } 495 496 497 } 498 | Popular Tags |