1 7 package org.jboss.cache.interceptors; 8 9 import org.jboss.cache.CacheImpl; 10 import org.jboss.cache.CacheSPI; 11 import org.jboss.cache.Fqn; 12 import org.jboss.cache.GlobalTransaction; 13 import org.jboss.cache.InvocationContext; 14 import org.jboss.cache.NodeSPI; 15 import org.jboss.cache.TransactionEntry; 16 import org.jboss.cache.TransactionTable; 17 import org.jboss.cache.lock.IsolationLevel; 18 import org.jboss.cache.lock.LockingException; 19 import org.jboss.cache.lock.NodeLock; 20 import org.jboss.cache.lock.TimeoutException; 21 import org.jboss.cache.marshall.MethodCall; 22 import org.jboss.cache.marshall.MethodDeclarations; 23 24 import javax.transaction.Transaction ; 25 import java.util.Collections ; 26 import java.util.Iterator ; 27 import java.util.LinkedList ; 28 import java.util.List ; 29 import java.util.Map ; 30 import java.util.Set ; 31 32 41 public class PessimisticLockInterceptor extends Interceptor 42 { 43 private TransactionTable tx_table = null; 44 45 48 private Map lock_table; 49 private long lock_acquisition_timeout; 50 private LockManager lockManager = new LockManager(); 51 52 53 public void setCache(CacheSPI cache) 54 { 55 super.setCache(cache); 56 tx_table = cache.getTransactionTable(); 57 lock_table = cache.getLockTable(); 58 lock_acquisition_timeout = cache.getConfiguration().getLockAcquisitionTimeout(); 59 } 60 61 62 public Object invoke(MethodCall m) throws Throwable 63 { 64 Fqn fqn = null; 65 NodeLock.LockType lock_type = NodeLock.LockType.NONE; 66 Object [] args = m.getArgs(); 67 InvocationContext ctx = cache.getInvocationContext(); 68 boolean lockNecessary = false; 69 boolean locksAlreadyObtained = false; 70 71 if (log.isTraceEnabled()) log.trace("PessimisticLockInterceptor invoked for method " + m); 72 if (cache.getInvocationContext().getOptionOverrides() != null && cache.getInvocationContext().getOptionOverrides().isSuppressLocking()) 73 { 74 log.trace("Suppressing locking"); 75 switch (m.getMethodId()) 76 { 77 case MethodDeclarations.putDataMethodLocal_id: 78 case MethodDeclarations.putDataEraseMethodLocal_id: 79 case MethodDeclarations.putKeyValMethodLocal_id: 80 log.trace("Creating nodes if necessary"); 81 createNodes((Fqn) args[1], ctx.getGlobalTransaction()); 82 break; 83 } 84 85 return super.invoke(m); 86 } 87 88 91 93 boolean recursive = false; 94 boolean createIfNotExists = false; 95 boolean zeroLockTimeout = false; boolean isDeleteOperation = false; 98 switch (m.getMethodId()) 102 { 103 case MethodDeclarations.moveMethodLocal_id: 104 fqn = (Fqn) args[0]; 105 obtainLocksForMove(fqn, (Fqn) args[1]); 106 locksAlreadyObtained = true; 107 lockNecessary = true; 108 isDeleteOperation = true; 109 break; 110 case MethodDeclarations.putDataMethodLocal_id: 111 case MethodDeclarations.putDataEraseMethodLocal_id: 112 case MethodDeclarations.putKeyValMethodLocal_id: 113 createIfNotExists = true; 114 fqn = (Fqn) args[1]; 115 lock_type = NodeLock.LockType.WRITE; 116 break; 117 case MethodDeclarations.removeNodeMethodLocal_id: 118 isDeleteOperation = true; 119 fqn = (Fqn) args[1]; 120 lock_type = NodeLock.LockType.WRITE; 121 recursive = true; break; 123 case MethodDeclarations.removeKeyMethodLocal_id: 124 case MethodDeclarations.removeDataMethodLocal_id: 125 case MethodDeclarations.addChildMethodLocal_id: 126 fqn = (Fqn) args[1]; 127 lock_type = NodeLock.LockType.WRITE; 128 break; 129 case MethodDeclarations.evictNodeMethodLocal_id: 130 zeroLockTimeout = true; 131 fqn = (Fqn) args[0]; 132 lock_type = NodeLock.LockType.WRITE; 133 break; 134 case MethodDeclarations.getKeyValueMethodLocal_id: 135 case MethodDeclarations.getNodeMethodLocal_id: 136 case MethodDeclarations.getKeysMethodLocal_id: 137 case MethodDeclarations.getChildrenNamesMethodLocal_id: 138 case MethodDeclarations.releaseAllLocksMethodLocal_id: 139 case MethodDeclarations.printMethodLocal_id: 140 fqn = (Fqn) args[0]; 141 lock_type = NodeLock.LockType.READ; 142 break; 143 case MethodDeclarations.lockMethodLocal_id: 144 fqn = (Fqn) args[0]; 145 lock_type = (NodeLock.LockType) args[1]; 146 recursive = (Boolean ) args[2]; 147 break; 148 case MethodDeclarations.commitMethod_id: 149 commit(ctx.getGlobalTransaction()); 151 break; 152 case MethodDeclarations.rollbackMethod_id: 153 rollback(ctx.getGlobalTransaction()); 155 break; 156 default: 157 if (isOnePhaseCommitPrepareMehod(m)) 158 { 159 commit(ctx.getGlobalTransaction()); 161 } 162 break; 163 } 164 165 if (fqn != null) 170 { 171 if (!locksAlreadyObtained) 172 { 173 do 174 { 175 lock(fqn, ctx.getGlobalTransaction(), lock_type, recursive, createIfNotExists, zeroLockTimeout ? 0 : lock_acquisition_timeout, isDeleteOperation); 176 } 177 while (createIfNotExists && !cache.getRoot().hasChild(fqn)); } 179 } 180 else if (!lockNecessary) 181 { 182 if (log.isTraceEnabled()) 183 { 184 log.trace("bypassed locking as method " + m.getName() + "() doesn't require locking"); 185 } 186 } 187 if (m.getMethodId() == MethodDeclarations.lockMethodLocal_id) 188 { 189 return null; 190 } 191 Object o = super.invoke(m); 192 if (isDeleteOperation && ctx.getGlobalTransaction() == null) 195 { 196 NodeSPI n = cache.peek(fqn); 200 if (n != null) 201 { 202 lockManager.getLock(n).releaseAll(Thread.currentThread()); 203 } 204 ((CacheImpl) cache).realRemove(fqn, true); 205 206 } 207 return o; 208 } 209 210 private void obtainLocksForMove(Fqn node, Fqn parent) throws InterruptedException 211 { 212 215 if (log.isTraceEnabled()) log.trace("Attempting to get WL on node to be moved [" + node + "]"); 217 lock(node, cache.getInvocationContext().getGlobalTransaction(), NodeLock.LockType.WRITE, true, false, lock_acquisition_timeout, true); 218 219 if (log.isTraceEnabled()) log.trace("Attempting to get RL on new parent [" + parent + "]"); 221 lock(parent, cache.getInvocationContext().getGlobalTransaction(), NodeLock.LockType.READ, true, false, lock_acquisition_timeout, false); 222 } 223 224 225 233 private void lock(Fqn fqn, GlobalTransaction gtx, NodeLock.LockType lock_type, boolean recursive, boolean createIfNotExists, long timeout, boolean isDeleteOperation) 234 throws TimeoutException, LockingException, InterruptedException 235 { 236 NodeSPI n; 237 NodeSPI child_node; 238 Object child_name; 239 Thread currentThread = Thread.currentThread(); 240 Object owner = (gtx != null) ? gtx : currentThread; 241 int treeNodeSize; 242 243 if (log.isTraceEnabled()) log.trace("Attempting to lock node " + fqn + " for owner " + owner); 244 245 if (fqn == null) 246 { 247 log.error("fqn is null - this should not be the case"); 248 return; 249 } 250 251 if (fqn.isRoot()) 252 { 253 return; 254 } 255 256 if (configuration.getIsolationLevel() == IsolationLevel.NONE) 257 { 258 lock_type = NodeLock.LockType.NONE; 259 } 260 261 n = cache.getRoot(); 262 treeNodeSize = fqn.size(); 263 for (int i = -1; i < treeNodeSize; i++) 264 { 265 if (i == -1) 266 { 267 child_name = Fqn.ROOT.getLastElement(); 269 child_node = n; 270 } 271 else 272 { 273 child_name = fqn.get(i); 274 child_node = n.getChildDirect(child_name); 275 } 276 281 if (child_node == null && createIfNotExists) 282 { 283 child_node = n.addChildDirect(new Fqn(child_name)); 284 } 285 286 if (child_node == null) 287 { 288 if (log.isTraceEnabled()) 289 { 290 log.trace("failed to find or create child " + child_name + " of node " + n); 291 } 292 return; 293 } 294 295 NodeLock.LockType lockTypeRequired; 296 if (lock_type == NodeLock.LockType.NONE) 297 { 298 n = child_node; 300 continue; 301 } 302 else 303 { 304 if (writeLockNeeded(lock_type, i, treeNodeSize, isDeleteOperation, createIfNotExists, fqn, child_node.getFqn())) 305 { 306 lockTypeRequired = NodeLock.LockType.WRITE; 309 310 } 311 else 312 { 313 lockTypeRequired = NodeLock.LockType.READ; 316 } 317 } 318 319 if (gtx != null && needToReverseRemove(child_node, tx_table.get(gtx), lock_type, isDeleteOperation, createIfNotExists)) 321 { 322 reverseRemove(child_node); 323 } 324 325 acquireNodeLock(child_node, owner, gtx, lockTypeRequired, timeout); 326 327 if (recursive && isTargetNode(i, treeNodeSize)) 328 { 329 Set acquired_locks = lockManager.acquireAll(child_node, owner, lock_type, timeout); 331 if (acquired_locks.size() > 0) 332 { 333 if (gtx != null) 334 { 335 cache.getTransactionTable().addLocks(gtx, acquired_locks); 336 } 337 else 338 { 339 List locks = getLocks(currentThread); 340 locks.addAll(acquired_locks); 341 } 342 } 343 } 344 n = child_node; 345 } 346 347 if (isDeleteOperation && gtx != null) cache.getTransactionTable().get(gtx).addRemovedNode(fqn); 349 } 350 351 private boolean needToReverseRemove(NodeSPI n, TransactionEntry te, NodeLock.LockType lockTypeRequested, boolean isRemoveOperation, boolean createIfNotExists) 352 { 353 return !isRemoveOperation && createIfNotExists && lockTypeRequested == NodeLock.LockType.WRITE && n.isDeleted() && te.getRemovedNodes().contains(n.getFqn()); 354 } 355 356 private void reverseRemove(NodeSPI n) 357 { 358 n.markAsDeleted(false); 359 } 360 361 private boolean writeLockNeeded(NodeLock.LockType lock_type, int currentNodeIndex, int treeNodeSize, boolean isRemoveOperation, boolean createIfNotExists, Fqn targetFqn, Fqn currentFqn) 362 { 363 if (isRemoveOperation && currentNodeIndex == treeNodeSize - 2) 364 { 365 return true; } 367 368 if (!isTargetNode(currentNodeIndex, treeNodeSize) && !cache.getRoot().hasChild(new Fqn(currentFqn, targetFqn.get(currentNodeIndex + 1)))) 369 { 370 return createIfNotExists; } 372 373 return lock_type == NodeLock.LockType.WRITE && isTargetNode(currentNodeIndex, treeNodeSize) && (createIfNotExists || isRemoveOperation); } 375 376 private boolean isTargetNode(int nodePosition, int treeNodeSize) 377 { 378 return nodePosition == (treeNodeSize - 1); 379 } 380 381 private void acquireNodeLock(NodeSPI node, Object owner, GlobalTransaction gtx, NodeLock.LockType lock_type, long lock_timeout) throws LockingException, TimeoutException, InterruptedException 382 { 383 boolean acquired = lockManager.acquire(node, owner, lock_type, lock_timeout); 384 if (acquired) 385 { 386 recordNodeLock(gtx, lockManager.getLock(node)); 388 } 389 } 390 391 private void recordNodeLock(GlobalTransaction gtx, NodeLock lock) 392 { 393 if (gtx != null) 394 { 395 cache.getTransactionTable().addLock(gtx, lock); 398 } 399 else 400 { 401 Thread currentThread = Thread.currentThread(); 402 List locks = getLocks(currentThread); 403 if (!locks.contains(lock)) 404 { 405 locks.add(lock); 406 lock_table.put(currentThread, locks); 407 } 408 } 409 } 410 411 private List getLocks(Thread currentThread) 412 { 413 List locks = (List ) lock_table.get(currentThread); 416 if (locks == null) 417 { 418 locks = Collections.synchronizedList(new LinkedList ()); 419 lock_table.put(currentThread, locks); 420 } 421 return locks; 422 } 423 424 425 private void createNodes(Fqn fqn, GlobalTransaction gtx) 426 { 427 int treeNodeSize; 428 if ((treeNodeSize = fqn.size()) == 0) return; 429 NodeSPI n = cache.getRoot(); 430 for (int i = 0; i < treeNodeSize; i++) 431 { 432 Object child_name = fqn.get(i); 433 Fqn childFqn = new Fqn(child_name); 434 435 NodeSPI child_node = n.getChildDirect(childFqn); 436 if (child_node == null) child_node = n.addChildDirect(childFqn); 437 if (gtx != null && needToReverseRemove(child_node, tx_table.get(gtx), NodeLock.LockType.WRITE, false, true)) 440 { 441 reverseRemove(child_node); 442 } 443 444 if (child_node == null) 445 { 446 if (log.isTraceEnabled()) 447 { 448 log.trace("failed to find or create child " + child_name + " of node " + n.getFqn()); 449 } 450 return; 451 } 452 n = child_node; 453 } 454 } 455 456 461 private void commit(GlobalTransaction gtx) 462 { 463 if (log.isTraceEnabled()) 464 { 465 log.trace("committing cache with gtx " + gtx); 466 } 467 468 TransactionEntry entry = tx_table.get(gtx); 469 if (entry == null) 470 { 471 log.error("entry for transaction " + gtx + " not found (maybe already committed)"); 472 return; 473 } 474 475 Iterator removedNodes = entry.getRemovedNodes().iterator(); 477 CacheImpl tcpi = (CacheImpl) cache; 478 while (removedNodes.hasNext()) 479 { 480 Fqn f = (Fqn) removedNodes.next(); 481 tcpi.realRemove(f, false); 482 } 483 484 entry.releaseAllLocksLIFO(gtx); 486 487 Transaction ltx = entry.getTransaction(); 488 if (log.isTraceEnabled()) 489 { 490 log.trace("removing local transaction " + ltx + " and global transaction " + gtx); 491 } 492 tx_table.remove(ltx); 493 tx_table.remove(gtx); 494 } 495 496 497 508 private void rollback(GlobalTransaction tx) 509 { 510 TransactionEntry entry = tx_table.get(tx); 511 512 if (log.isTraceEnabled()) 513 { 514 log.trace("called to rollback cache with GlobalTransaction=" + tx); 515 } 516 517 if (entry == null) 518 { 519 log.error("entry for transaction " + tx + " not found (transaction has possibly already been rolled back)"); 520 return; 521 } 522 523 524 Iterator removedNodes = entry.getRemovedNodes().iterator(); 525 CacheImpl tcpi = (CacheImpl) cache; 526 while (removedNodes.hasNext()) 527 { 528 Fqn f = (Fqn) removedNodes.next(); 529 tcpi.realRemove(f, false); 530 531 } 532 533 entry.undoOperations(cache); 535 536 539 551 entry.releaseAllLocksLIFO(tx); 555 556 Transaction ltx = entry.getTransaction(); 557 if (log.isTraceEnabled()) 558 { 559 log.trace("removing local transaction " + ltx + " and global transaction " + tx); 560 } 561 tx_table.remove(ltx); 562 tx_table.remove(tx); 563 } 564 565 private static class LockManager 566 { 567 boolean acquire(NodeSPI node, Object owner, NodeLock.LockType lockType, long timeout) throws InterruptedException 568 { 569 return getLock(node).acquire(owner, timeout, lockType); 570 } 571 572 NodeLock getLock(NodeSPI node) 573 { 574 return node.getLock(); 575 } 576 577 public Set acquireAll(NodeSPI node, Object owner, NodeLock.LockType lockType, long timeout) throws InterruptedException 578 { 579 return getLock(node).acquireAll(owner, timeout, lockType); 580 } 581 } 582 583 } 584 | Popular Tags |