package org.jboss.cache.interceptors;

import org.jboss.cache.*;
import org.jboss.cache.lock.IdentityLock;
import org.jboss.cache.lock.LockingException;
import org.jboss.cache.lock.TimeoutException;
import org.jgroups.blocks.MethodCall;

import javax.transaction.Status;
import javax.transaction.Synchronization;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
import java.lang.reflect.Method;
import java.util.*;

import EDU.oswego.cs.dl.util.concurrent.ReentrantLock;


/**
 * Interceptor that acquires the node locks an invocation needs before handing the call to the
 * next interceptor.
 * <p>
 * When the calling thread has a valid transaction, acquired locks are recorded in the cache's
 * transaction table under the {@link GlobalTransaction} and are released by the
 * {@link SynchronizationHandler} registered for that transaction (on commit or rollback).
 * Otherwise acquired locks are appended to a per-thread list kept in the cache's lock table;
 * this class never releases that list itself.
 */
public class LockInterceptor extends Interceptor {
   private TransactionManager tx_mgr=null;
   TransactionTable tx_table=null;

   /** Maps a Thread to the List of IdentityLocks it acquired outside of a transaction. */
   HashMap lock_table;
   private long lock_acquisition_timeout;

   /** Serializes the "look up child, create it if missing" step in {@link #lock}. */
   ReentrantLock create_lock=new ReentrantLock();

   /** Transactions for which a SynchronizationHandler has already been registered. */
   private List transactions=Collections.synchronizedList(new ArrayList());

   final static int NONE=0;
   final static int READ=1;
   final static int WRITE=2;


   /**
    * Stores the cache and caches the collaborators read from it: transaction manager,
    * transaction table, lock table and the default lock acquisition timeout.
    */
   public void setCache(TreeCache cache) {
      super.setCache(cache);
      tx_mgr=cache.getTransactionManager();
      tx_table=cache.getTransactionTable();
      lock_table=cache.getLockTable();
      lock_acquisition_timeout=cache.getLockAcquisitionTimeout();
   }


   /**
    * Determines from the invoked method which node has to be locked and how, acquires the
    * locks, then passes the call on to the next interceptor.
    *
    * @param m the method call being intercepted
    * @return the result of the next interceptor, or <code>null</code> for
    *         <code>TreeCache.lockMethodLocal</code>, which is consumed here and not passed on
    * @throws Throwable if no global transaction can be obtained for a new transaction, if lock
    *                   acquisition fails, or whatever the rest of the chain throws
    */
   public Object invoke(MethodCall m) throws Throwable {
      Transaction tx=null;
      GlobalTransaction gtx=null;
      Fqn fqn=null;
      int lock_type=NONE;
      long lock_timeout=lock_acquisition_timeout;
      Method meth=m.getMethod();
      Object[] args=m.getArgs();

      // Only used when locks are owned by the thread rather than by a transaction
      List locks=null;

      boolean recursive=false;
      boolean createIfNotExists=false;

      if(tx_mgr != null && (tx=tx_mgr.getTransaction()) != null && isValid(tx)) {
         if(!transactions.contains(tx)) {
            // First call in this transaction: register a handler that releases the locks
            // once the transaction completes
            gtx=cache.getCurrentTransaction(tx);
            if(gtx == null)
               throw new Exception("failed to get global transaction");
            try {
               OrderedSynchronizationHandler handler=OrderedSynchronizationHandler.getInstance(tx);
               SynchronizationHandler myHandler=new SynchronizationHandler(gtx, tx, cache);
               handler.registerAtTail(myHandler);
               transactions.add(tx);
            }
            catch(Exception e) {
               log.error("registration for tx=" + tx + " with transaction manager failed, running without TX", e);
            }
         }
         else {
            gtx=cache.getTransactionTable().get(tx);
         }
      }
      else {
         locks=locksOfCurrentThread();
      }

      if(meth.equals(TreeCache.putDataMethodLocal) || meth.equals(TreeCache.putDataEraseMethodLocal) ||
            meth.equals(TreeCache.putKeyValMethodLocal) || meth.equals(TreeCache.putFailFastKeyValueMethodLocal)) {
         createIfNotExists=true;
         fqn=(Fqn)args[1];
         lock_type=WRITE;
         // The fail-fast put carries its own lock timeout as its 6th argument
         if(meth.equals(TreeCache.putFailFastKeyValueMethodLocal))
            lock_timeout=((Long)args[5]).longValue();
      }
      else if(meth.equals(TreeCache.removeNodeMethodLocal)) {
         fqn=(Fqn)args[1];
         lock_type=WRITE;
         recursive=true;
      }
      else if(meth.equals(TreeCache.removeKeyMethodLocal) || meth.equals(TreeCache.removeDataMethodLocal)) {
         fqn=(Fqn)args[1];
         lock_type=WRITE;
      }
      else if(meth.equals(TreeCache.evictNodeMethodLocal)) {
         fqn=(Fqn)args[0];
         lock_type=WRITE;
      }
      else if(meth.equals(TreeCache.addChildMethodLocal)) {
         fqn=(Fqn)args[1];
         lock_type=WRITE;
      }
      else if(meth.equals(TreeCache.getKeyValueMethodLocal)) {
         fqn=(Fqn)args[0];
         lock_type=READ;
      }
      else if(meth.equals(TreeCache.getNodeMethodLocal)) {
         fqn=(Fqn)args[0];
         lock_type=READ;
      }
      else if(meth.equals(TreeCache.getKeysMethodLocal)) {
         fqn=(Fqn)args[0];
         lock_type=READ;
      }
      else if(meth.equals(TreeCache.getChildrenNamesMethodLocal) || meth.equals(TreeCache.releaseAllLocksMethodLocal) ||
            meth.equals(TreeCache.printMethodLocal)) {
         fqn=(Fqn)args[0];
         lock_type=READ;
      }
      else if(meth.equals(TreeCache.lockMethodLocal)) {
         fqn=(Fqn)args[0];
         lock_type=((Integer)args[1]).intValue();
         recursive=((Boolean)args[2]).booleanValue();
      }

      if(fqn != null) {
         if(gtx == null && locks == null) {
            // A registered transaction without a global transaction in the table: lock() will
            // use the thread as lock owner, so record the locks under the thread as well
            // instead of failing with a NullPointerException after a lock was acquired.
            log.error("no global transaction found for tx=" + tx + ", recording locks for the calling thread");
            locks=locksOfCurrentThread();
         }
         if(createIfNotExists) {
            // Repeat until the node is present after locking
            // NOTE(review): presumably guards against a concurrent removal between creation
            // and lock acquisition - confirm
            do {
               lock(fqn, gtx, lock_type, locks, recursive, lock_timeout, createIfNotExists);
            }
            while(cache.exists(fqn) == false);
         }
         else
            lock(fqn, gtx, lock_type, locks, recursive, lock_timeout, createIfNotExists);
      }
      else {
         if(log.isTraceEnabled())
            log.trace("bypassed locking as method " + m.getName() + "() doesn't require locking");
      }
      if(meth.equals(TreeCache.lockMethodLocal))
         return null;
      return super.invoke(m);
   }


   /**
    * Returns the lock list registered for the calling thread in the lock table, creating and
    * registering an empty one if there is none yet.
    */
   private List locksOfCurrentThread() {
      Thread current=Thread.currentThread();
      List thread_locks=(List)lock_table.get(current);
      if(thread_locks == null) {
         thread_locks=new LinkedList();
         lock_table.put(current, thread_locks);
      }
      return thread_locks;
   }


   /**
    * Walks from the root along <code>fqn</code> and locks every node on the way. The last node
    * is write-locked when <code>lock_type</code> is WRITE; every other node (and the last one
    * for READ) is read-locked. With NONE no lock is acquired, but missing nodes are still
    * created if requested.
    *
    * @param fqn               path of the node to lock; nothing happens for null or an empty path
    * @param gtx               lock owner and transaction to record locks under, or null to use
    *                          the calling thread as owner
    * @param lock_type         NONE, READ or WRITE
    * @param locks             list receiving the acquired locks when <code>gtx</code> is null
    * @param recursive         if true, additionally locks everything below the last node via
    *                          <code>acquireAll</code>
    * @param lock_timeout      timeout passed to every acquisition
    * @param createIfNotExists if true, missing nodes on the path are created; otherwise the
    *                          walk stops silently at the first missing node
    */
   private void lock(Fqn fqn, GlobalTransaction gtx, int lock_type, List locks, boolean recursive,
                     long lock_timeout, boolean createIfNotExists)
         throws TimeoutException, LockingException, InterruptedException {
      if(fqn == null) {
         log.error("fqn is null - this should not be the case");
         return;
      }

      int treeNodeSize=fqn.size();
      if(treeNodeSize == 0)
         return;

      Object owner=gtx != null? gtx : (Object)Thread.currentThread();
      Fqn tmp_fqn=new Fqn();
      Node n=cache.getRoot();

      for(int i=0; i < treeNodeSize; i++) {
         boolean last=i == (treeNodeSize - 1);
         Object child_name=fqn.get(i);
         Node child_node;
         tmp_fqn=new Fqn(tmp_fqn, child_name);

         if(createIfNotExists)
            create_lock.acquire();
         try {
            child_node=n.getChild(child_name);
            if(child_node == null) {
               if(!createIfNotExists) {
                  if(log.isTraceEnabled())
                     log.trace("failed finding child " + child_name + " of node " + n.getFqn());
                  return;
               }
               child_node=n.createChild(child_name, tmp_fqn, n);
               if(log.isTraceEnabled())
                  log.trace("created child " + child_name);
               if(gtx != null) {
                  // Remember the node so that rollback() can remove it again
                  cache.addNode(gtx, (Fqn)tmp_fqn.clone());
               }
               // Release before notifying, so listeners are not called with create_lock held
               create_lock.release();
               cache.notifyNodeCreated(tmp_fqn);
            }
         }
         finally {
            if(create_lock.holds() > 0)
               create_lock.release();
         }

         boolean acquired=false;
         if(lock_type != NONE) {
            if(lock_type == WRITE && last)
               acquired=child_node.acquire(owner, lock_timeout, Node.LOCK_TYPE_WRITE);
            else
               acquired=child_node.acquire(owner, lock_timeout, Node.LOCK_TYPE_READ);
         }

         if(acquired) {
            if(gtx != null) {
               cache.getTransactionTable().addLock(gtx, child_node.getLock());
            }
            else {
               IdentityLock l=child_node.getLock();
               if(!locks.contains(l))
                  locks.add(l);
            }
         }

         if(recursive && last) {
            Set acquired_locks=child_node.acquireAll(owner, lock_timeout, lock_type);
            if(acquired_locks.size() > 0) {
               if(gtx != null)
                  cache.getTransactionTable().addLocks(gtx, acquired_locks);
               else
                  locks.addAll(acquired_locks);
            }
         }

         n=child_node;
      }
   }


   /**
    * Completes a committed transaction: releases all locks recorded for <code>gtx</code> and
    * removes the transaction from the transaction table. Logs an error and does nothing if the
    * table has no entry for <code>gtx</code>.
    */
   private void commit(GlobalTransaction gtx) {
      if(log.isTraceEnabled())
         log.trace("committing cache with gtx " + gtx);

      TransactionEntry entry=tx_table.get(gtx);
      if(entry == null) {
         log.error("entry for transaction " + gtx + " not found (maybe already committed)");
         return;
      }

      releaseLocksInReverse(entry, gtx);
      forgetTransaction(entry, gtx);
   }


   /**
    * Rolls a transaction back locally: invokes the recorded undo operations in reverse order,
    * removes the nodes recorded for the transaction (also in reverse order), releases all its
    * locks and removes it from the transaction table. Failures of individual undo operations
    * or node removals are logged and do not stop the rollback. Logs an error and does nothing
    * if the table has no entry for <code>tx</code>.
    */
   private void rollback(GlobalTransaction tx) {
      TransactionEntry entry=tx_table.get(tx);

      if(log.isTraceEnabled())
         log.trace("called to rollback cache with GlobalTransaction=" + tx);

      if(entry == null) {
         log.error("entry for transaction " + tx + " not found (transaction has possibly already been rolled back)");
         return;
      }

      // Work on copies, so that changes made to the entry meanwhile don't disturb the iteration
      List undo_ops=new LinkedList(entry.getUndoOperations());
      for(ListIterator it=undo_ops.listIterator(undo_ops.size()); it.hasPrevious();) {
         MethodCall undo_op=(MethodCall)it.previous();
         try {
            Object retval=undo_op.invoke(cache);
            if(retval instanceof Throwable) {
               log.error("undo operation failed, error=" + retval);
            }
         }
         catch(Throwable t) {
            log.error("undo operation failed", t);
         }
      }

      // Single snapshot: the start index is taken from the copy, not from the live collection
      List created_nodes=new LinkedList(entry.getNodes());
      for(ListIterator it=created_nodes.listIterator(created_nodes.size()); it.hasPrevious();) {
         Fqn node_name=(Fqn)it.previous();
         try {
            cache._remove(tx, node_name, false);
         }
         catch(Throwable t) {
            log.error("failed removing node \"" + node_name + "\"", t);
         }
      }

      releaseLocksInReverse(entry, tx);
      forgetTransaction(entry, tx);
   }


   /**
    * Releases the locks recorded in <code>entry</code> on behalf of <code>gtx</code>, last
    * recorded lock first, then clears the entry's lock collection.
    */
   private void releaseLocksInReverse(TransactionEntry entry, GlobalTransaction gtx) {
      List held=new LinkedList(entry.getLocks());
      // Reverse iterator instead of get(i): indexed access on a LinkedList is linear per call
      for(ListIterator it=held.listIterator(held.size()); it.hasPrevious();) {
         IdentityLock lock=(IdentityLock)it.previous();
         if(log.isTraceEnabled())
            log.trace("releasing lock for " + lock.getFqn() + " (" + lock + ")");
         lock.release(gtx);
      }
      entry.getLocks().clear();
   }


   /** Removes both the local and the global transaction from the transaction table. */
   private void forgetTransaction(TransactionEntry entry, GlobalTransaction gtx) {
      Transaction ltx=entry.getTransaction();
      if(log.isTraceEnabled())
         log.trace("removing local transaction " + ltx + " and global transaction " + gtx);
      tx_table.remove(ltx);
      tx_table.remove(gtx);
   }


   /**
    * Transaction callback that commits or rolls back the cache-side state of one transaction
    * once that transaction has completed.
    */
   class SynchronizationHandler implements Synchronization {
      GlobalTransaction gtx=null;
      Transaction tx=null;
      TreeCache cache=null;


      SynchronizationHandler(GlobalTransaction gtx, Transaction tx, TreeCache cache) {
         this.gtx=gtx;
         this.cache=cache;
         this.tx=tx;
      }


      /** Nothing to do before completion. */
      public void beforeCompletion() {
      }


      /**
       * Unregisters the transaction from the interceptor, then commits on STATUS_COMMITTED and
       * rolls back on every other status.
       *
       * @param status completion status as defined in {@link Status}
       * @throws IllegalStateException after rolling back, if the status is neither committed,
       *                               marked-for-rollback nor rolled-back
       */
      public void afterCompletion(int status) {
         transactions.remove(tx);
         switch(status) {
            case Status.STATUS_COMMITTED:
               commit(gtx);
               break;

            case Status.STATUS_MARKED_ROLLBACK:
            case Status.STATUS_ROLLEDBACK:
               if(log.isDebugEnabled())
                  log.debug("rolling back transaction");
               rollback(gtx);
               break;

            default:
               // Unexpected status: still clean up locally before reporting it
               rollback(gtx);
               throw new IllegalStateException("failed rolling back transaction: " + status);
         }
      }
   }

}