KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jboss > cache > interceptors > PessimisticLockInterceptor


1 /*
2  * JBoss, Home of Professional Open Source
3  *
4  * Distributable under LGPL license.
5  * See terms of license at gnu.org.
6  */

7 package org.jboss.cache.interceptors;
8
9 import org.jboss.cache.CacheImpl;
10 import org.jboss.cache.CacheSPI;
11 import org.jboss.cache.Fqn;
12 import org.jboss.cache.GlobalTransaction;
13 import org.jboss.cache.InvocationContext;
14 import org.jboss.cache.NodeSPI;
15 import org.jboss.cache.TransactionEntry;
16 import org.jboss.cache.TransactionTable;
17 import org.jboss.cache.lock.IsolationLevel;
18 import org.jboss.cache.lock.LockingException;
19 import org.jboss.cache.lock.NodeLock;
20 import org.jboss.cache.lock.TimeoutException;
21 import org.jboss.cache.marshall.MethodCall;
22 import org.jboss.cache.marshall.MethodDeclarations;
23
24 import javax.transaction.Transaction JavaDoc;
25 import java.util.Collections JavaDoc;
26 import java.util.Iterator JavaDoc;
27 import java.util.LinkedList JavaDoc;
28 import java.util.List JavaDoc;
29 import java.util.Map JavaDoc;
30 import java.util.Set JavaDoc;
31
32 /**
33  * An interceptor that handles locking. When a TX is associated, we register
34  * for TX completion and unlock the locks acquired within the scope of the TX.
35  * When no TX is present, we keep track of the locks acquired during the
36  * current method and unlock when the method returns.
37  *
38  * @author Bela Ban
39  * @version $Id: PessimisticLockInterceptor.java,v 1.42 2007/01/04 05:35:37 msurtani Exp $
40  */

41 public class PessimisticLockInterceptor extends Interceptor
42 {
43    private TransactionTable tx_table = null;
44
45    /**
46     * Map<Object, java.util.List>. Keys = threads, values = lists of locks held by that thread
47     */

48    private Map JavaDoc lock_table;
49    private long lock_acquisition_timeout;
50    private LockManager lockManager = new LockManager();
51
52
53    public void setCache(CacheSPI cache)
54    {
55       super.setCache(cache);
56       tx_table = cache.getTransactionTable();
57       lock_table = cache.getLockTable();
58       lock_acquisition_timeout = cache.getConfiguration().getLockAcquisitionTimeout();
59    }
60
61
62    public Object JavaDoc invoke(MethodCall m) throws Throwable JavaDoc
63    {
64       Fqn fqn = null;
65       NodeLock.LockType lock_type = NodeLock.LockType.NONE;
66       Object JavaDoc[] args = m.getArgs();
67       InvocationContext ctx = cache.getInvocationContext();
68       boolean lockNecessary = false;
69       boolean locksAlreadyObtained = false;
70
71       if (log.isTraceEnabled()) log.trace("PessimisticLockInterceptor invoked for method " + m);
72       if (cache.getInvocationContext().getOptionOverrides() != null && cache.getInvocationContext().getOptionOverrides().isSuppressLocking())
73       {
74          log.trace("Suppressing locking");
75          switch (m.getMethodId())
76          {
77             case MethodDeclarations.putDataMethodLocal_id:
78             case MethodDeclarations.putDataEraseMethodLocal_id:
79             case MethodDeclarations.putKeyValMethodLocal_id:
80                log.trace("Creating nodes if necessary");
81                createNodes((Fqn) args[1], ctx.getGlobalTransaction());
82                break;
83          }
84
85          return super.invoke(m);
86       }
87
88       /** List<IdentityLock> locks. Locks acquired during the current method; will be released later by UnlockInterceptor.
89        * This list is only populated when there is no TX, otherwise the TransactionTable maintains the locks
90        * (keyed by TX) */

91       // List locks=null;
92

93       boolean recursive = false;
94       boolean createIfNotExists = false;
95       boolean zeroLockTimeout = false;// only used if the call is an evict() call. See JBCACHE-794
96
boolean isDeleteOperation = false;// needed for JBCACHE-871
97

98       // 1. Determine the type of lock (read, write, or none) depending on the method. If no lock is required, invoke
99
// the method, then return immediately
100
// Set the Fqn
101
switch (m.getMethodId())
102       {
103          case MethodDeclarations.moveMethodLocal_id:
104             fqn = (Fqn) args[0];
105             obtainLocksForMove(fqn, (Fqn) args[1]);
106             locksAlreadyObtained = true;
107             lockNecessary = true;
108             isDeleteOperation = true;
109             break;
110          case MethodDeclarations.putDataMethodLocal_id:
111          case MethodDeclarations.putDataEraseMethodLocal_id:
112          case MethodDeclarations.putKeyValMethodLocal_id:
113             createIfNotExists = true;
114             fqn = (Fqn) args[1];
115             lock_type = NodeLock.LockType.WRITE;
116             break;
117          case MethodDeclarations.removeNodeMethodLocal_id:
118             isDeleteOperation = true;
119             fqn = (Fqn) args[1];
120             lock_type = NodeLock.LockType.WRITE;
121             recursive = true;// remove node and *all* child nodes
122
break;
123          case MethodDeclarations.removeKeyMethodLocal_id:
124          case MethodDeclarations.removeDataMethodLocal_id:
125          case MethodDeclarations.addChildMethodLocal_id:
126             fqn = (Fqn) args[1];
127             lock_type = NodeLock.LockType.WRITE;
128             break;
129          case MethodDeclarations.evictNodeMethodLocal_id:
130             zeroLockTimeout = true;
131             fqn = (Fqn) args[0];
132             lock_type = NodeLock.LockType.WRITE;
133             break;
134          case MethodDeclarations.getKeyValueMethodLocal_id:
135          case MethodDeclarations.getNodeMethodLocal_id:
136          case MethodDeclarations.getKeysMethodLocal_id:
137          case MethodDeclarations.getChildrenNamesMethodLocal_id:
138          case MethodDeclarations.releaseAllLocksMethodLocal_id:
139          case MethodDeclarations.printMethodLocal_id:
140             fqn = (Fqn) args[0];
141             lock_type = NodeLock.LockType.READ;
142             break;
143          case MethodDeclarations.lockMethodLocal_id:
144             fqn = (Fqn) args[0];
145             lock_type = (NodeLock.LockType) args[1];
146             recursive = (Boolean JavaDoc) args[2];
147             break;
148          case MethodDeclarations.commitMethod_id:
149             // commit propagated up from the tx interceptor
150
commit(ctx.getGlobalTransaction());
151             break;
152          case MethodDeclarations.rollbackMethod_id:
153             // rollback propagated up from the tx interceptor
154
rollback(ctx.getGlobalTransaction());
155             break;
156          default:
157             if (isOnePhaseCommitPrepareMehod(m))
158             {
159                // commit propagated up from the tx interceptor
160
commit(ctx.getGlobalTransaction());
161             }
162             break;
163       }
164
165       // Lock the node (must be either read or write if we get here)
166
// If no TX: add each acquired lock to the list of locks for this method (locks)
167
// If TX: [merge code from TransactionInterceptor]: register with TxManager, on commit/rollback,
168
// release the locks for the given TX
169
if (fqn != null)
170       {
171          if (!locksAlreadyObtained)
172          {
173             do
174             {
175                lock(fqn, ctx.getGlobalTransaction(), lock_type, recursive, createIfNotExists, zeroLockTimeout ? 0 : lock_acquisition_timeout, isDeleteOperation);
176             }
177             while (createIfNotExists && !cache.getRoot().hasChild(fqn));// keep trying until we have the lock (fixes concurrent remove())
178
}
179       }
180       else if (!lockNecessary)
181       {
182          if (log.isTraceEnabled())
183          {
184             log.trace("bypassed locking as method " + m.getName() + "() doesn't require locking");
185          }
186       }
187       if (m.getMethodId() == MethodDeclarations.lockMethodLocal_id)
188       {
189          return null;
190       }
191       Object JavaDoc o = super.invoke(m);
192       // FIXME this should be done in UnlockInterceptor, but I didn't want
193
// to add the removedNodes map to CacheImpl
194
if (isDeleteOperation && ctx.getGlobalTransaction() == null)
195       {
196          //cache.getRemovedNodesMap().remove(fqn);
197
//cache.peek(fqn);
198
// do a REAL remove here.
199
NodeSPI n = cache.peek(fqn);
200          if (n != null)
201          {
202             lockManager.getLock(n).releaseAll(Thread.currentThread());
203          }
204          ((CacheImpl) cache).realRemove(fqn, true);
205
206       }
207       return o;
208    }
209
210    private void obtainLocksForMove(Fqn node, Fqn parent) throws InterruptedException JavaDoc
211    {
212       // parent node (new parent) and current node's existing parent should both get RLs.
213
// node should have a WL.
214

215       // this call will ensure the node gets a WL and it's current parent gets RL.
216
if (log.isTraceEnabled()) log.trace("Attempting to get WL on node to be moved [" + node + "]");
217       lock(node, cache.getInvocationContext().getGlobalTransaction(), NodeLock.LockType.WRITE, true, false, lock_acquisition_timeout, true);
218
219       //now for an RL for the new parent.
220
if (log.isTraceEnabled()) log.trace("Attempting to get RL on new parent [" + parent + "]");
221       lock(parent, cache.getInvocationContext().getGlobalTransaction(), NodeLock.LockType.READ, true, false, lock_acquisition_timeout, false);
222    }
223
224
225    /**
226     * Locks a given node.
227     *
228     * @param fqn
229     * @param gtx
230     * @param lock_type DataNode.LOCK_TYPE_READ, DataNode.LOCK_TYPE_WRITE or DataNode.LOCK_TYPE_NONE
231     * @param recursive Lock children recursively
232     */

233    private void lock(Fqn fqn, GlobalTransaction gtx, NodeLock.LockType lock_type, boolean recursive, boolean createIfNotExists, long timeout, boolean isDeleteOperation)
234            throws TimeoutException, LockingException, InterruptedException JavaDoc
235    {
236       NodeSPI n;
237       NodeSPI child_node;
238       Object JavaDoc child_name;
239       Thread JavaDoc currentThread = Thread.currentThread();
240       Object JavaDoc owner = (gtx != null) ? gtx : currentThread;
241       int treeNodeSize;
242
243       if (log.isTraceEnabled()) log.trace("Attempting to lock node " + fqn + " for owner " + owner);
244
245       if (fqn == null)
246       {
247          log.error("fqn is null - this should not be the case");
248          return;
249       }
250
251       if (fqn.isRoot())
252       {
253          return;
254       }
255
256       if (configuration.getIsolationLevel() == IsolationLevel.NONE)
257       {
258          lock_type = NodeLock.LockType.NONE;
259       }
260
261       n = cache.getRoot();
262       treeNodeSize = fqn.size();
263       for (int i = -1; i < treeNodeSize; i++)
264       {
265          if (i == -1)
266          {
267             // this is the root node
268
child_name = Fqn.ROOT.getLastElement();
269             child_node = n;
270          }
271          else
272          {
273             child_name = fqn.get(i);
274             child_node = n.getChildDirect(child_name);
275          }
276          /*
277          cache.getInvocationContext().getOptionOverrides().setBypassInterceptorChain(true);
278          Fqn childFqn = new Fqn(child_name);
279          child_node = n.getChild(new Fqn(child_name));
280          */

281          if (child_node == null && createIfNotExists)
282          {
283             child_node = n.addChildDirect(new Fqn(child_name));
284          }
285
286          if (child_node == null)
287          {
288             if (log.isTraceEnabled())
289             {
290                log.trace("failed to find or create child " + child_name + " of node " + n);
291             }
292             return;
293          }
294
295          NodeLock.LockType lockTypeRequired;
296          if (lock_type == NodeLock.LockType.NONE)
297          {
298             // acquired=false;
299
n = child_node;
300             continue;
301          }
302          else
303          {
304             if (writeLockNeeded(lock_type, i, treeNodeSize, isDeleteOperation, createIfNotExists, fqn, child_node.getFqn()))
305             {
306                //acquired=child_node.acquire(owner, lock_timeout, DataNode.LOCK_TYPE_WRITE);
307
//acquired = lockManager.acquire(child_node, owner, NodeLock.LockType.WRITE, timeout);
308
lockTypeRequired = NodeLock.LockType.WRITE;
309
310             }
311             else
312             {
313                //acquired=child_node.acquire(owner, lock_timeout, DataNode.LOCK_TYPE_READ);
314
//acquired = lockManager.acquire(child_node, owner, NodeLock.LockType.READ, timeout);
315
lockTypeRequired = NodeLock.LockType.READ;
316             }
317          }
318
319          // reverse the "remove" if the node has been previously removed in the same tx, if this operation is a put()
320
if (gtx != null && needToReverseRemove(child_node, tx_table.get(gtx), lock_type, isDeleteOperation, createIfNotExists))
321          {
322             reverseRemove(child_node);
323          }
324
325          acquireNodeLock(child_node, owner, gtx, lockTypeRequired, timeout);
326
327          if (recursive && isTargetNode(i, treeNodeSize))
328          {
329             //Set acquired_locks=child_node.acquireAll(owner, lock_timeout, lock_type);
330
Set JavaDoc acquired_locks = lockManager.acquireAll(child_node, owner, lock_type, timeout);
331             if (acquired_locks.size() > 0)
332             {
333                if (gtx != null)
334                {
335                   cache.getTransactionTable().addLocks(gtx, acquired_locks);
336                }
337                else
338                {
339                   List JavaDoc locks = getLocks(currentThread);
340                   locks.addAll(acquired_locks);
341                }
342             }
343          }
344          n = child_node;
345       }
346
347       // Add the Fqn to be removed to the transaction entry so we can clean up after ourselves during commit/rollback
348
if (isDeleteOperation && gtx != null) cache.getTransactionTable().get(gtx).addRemovedNode(fqn);
349    }
350
351    private boolean needToReverseRemove(NodeSPI n, TransactionEntry te, NodeLock.LockType lockTypeRequested, boolean isRemoveOperation, boolean createIfNotExists)
352    {
353       return !isRemoveOperation && createIfNotExists && lockTypeRequested == NodeLock.LockType.WRITE && n.isDeleted() && te.getRemovedNodes().contains(n.getFqn());
354    }
355
356    private void reverseRemove(NodeSPI n)
357    {
358       n.markAsDeleted(false);
359    }
360
361    private boolean writeLockNeeded(NodeLock.LockType lock_type, int currentNodeIndex, int treeNodeSize, boolean isRemoveOperation, boolean createIfNotExists, Fqn targetFqn, Fqn currentFqn)
362    {
363       if (isRemoveOperation && currentNodeIndex == treeNodeSize - 2)
364       {
365          return true;// we're doing a remove and we've reached the PARENT node of the target to be removed.
366
}
367
368       if (!isTargetNode(currentNodeIndex, treeNodeSize) && !cache.getRoot().hasChild(new Fqn(currentFqn, targetFqn.get(currentNodeIndex + 1))))
369       {
370          return createIfNotExists;// we're at a node in the tree, not yet at the target node, and we need to create the next node. So we need a WL here.
371
}
372
373       return lock_type == NodeLock.LockType.WRITE && isTargetNode(currentNodeIndex, treeNodeSize) && (createIfNotExists || isRemoveOperation);//normal operation, write lock explicitly requested and this is the target to be written to.
374
}
375
376    private boolean isTargetNode(int nodePosition, int treeNodeSize)
377    {
378       return nodePosition == (treeNodeSize - 1);
379    }
380
381    private void acquireNodeLock(NodeSPI node, Object JavaDoc owner, GlobalTransaction gtx, NodeLock.LockType lock_type, long lock_timeout) throws LockingException, TimeoutException, InterruptedException JavaDoc
382    {
383       boolean acquired = lockManager.acquire(node, owner, lock_type, lock_timeout);
384       if (acquired)
385       {
386          // Record the lock for release on method return or tx commit/rollback
387
recordNodeLock(gtx, lockManager.getLock(node));
388       }
389    }
390
391    private void recordNodeLock(GlobalTransaction gtx, NodeLock lock)
392    {
393       if (gtx != null)
394       {
395          // add the lock to the list of locks maintained for this transaction
396
// (needed for release of locks on commit or rollback)
397
cache.getTransactionTable().addLock(gtx, lock);
398       }
399       else
400       {
401          Thread JavaDoc currentThread = Thread.currentThread();
402          List JavaDoc locks = getLocks(currentThread);
403          if (!locks.contains(lock))
404          {
405             locks.add(lock);
406             lock_table.put(currentThread, locks);
407          }
408       }
409    }
410
411    private List JavaDoc getLocks(Thread JavaDoc currentThread)
412    {
413       // This sort of looks like a get/put race condition, but
414
// since we key off the Thread, it's not
415
List JavaDoc locks = (List JavaDoc) lock_table.get(currentThread);
416       if (locks == null)
417       {
418          locks = Collections.synchronizedList(new LinkedList JavaDoc());
419          lock_table.put(currentThread, locks);
420       }
421       return locks;
422    }
423
424
425    private void createNodes(Fqn fqn, GlobalTransaction gtx)
426    {
427       int treeNodeSize;
428       if ((treeNodeSize = fqn.size()) == 0) return;
429       NodeSPI n = cache.getRoot();
430       for (int i = 0; i < treeNodeSize; i++)
431       {
432          Object JavaDoc child_name = fqn.get(i);
433          Fqn childFqn = new Fqn(child_name);
434
435          NodeSPI child_node = n.getChildDirect(childFqn);
436          if (child_node == null) child_node = n.addChildDirect(childFqn);
437          // test if this node needs to be 'undeleted'
438
// reverse the "remove" if the node has been previously removed in the same tx, if this operation is a put()
439
if (gtx != null && needToReverseRemove(child_node, tx_table.get(gtx), NodeLock.LockType.WRITE, false, true))
440          {
441             reverseRemove(child_node);
442          }
443
444          if (child_node == null)
445          {
446             if (log.isTraceEnabled())
447             {
448                log.trace("failed to find or create child " + child_name + " of node " + n.getFqn());
449             }
450             return;
451          }
452          n = child_node;
453       }
454    }
455
456    /**
457     * Remove all locks held by <tt>tx</tt>, remove the transaction from the transaction table
458     *
459     * @param gtx
460     */

461    private void commit(GlobalTransaction gtx)
462    {
463       if (log.isTraceEnabled())
464       {
465          log.trace("committing cache with gtx " + gtx);
466       }
467
468       TransactionEntry entry = tx_table.get(gtx);
469       if (entry == null)
470       {
471          log.error("entry for transaction " + gtx + " not found (maybe already committed)");
472          return;
473       }
474
475       // first remove nodes that should be deleted.
476
Iterator JavaDoc removedNodes = entry.getRemovedNodes().iterator();
477       CacheImpl tcpi = (CacheImpl) cache;
478       while (removedNodes.hasNext())
479       {
480          Fqn f = (Fqn) removedNodes.next();
481          tcpi.realRemove(f, false);
482       }
483
484       // Let's do it in stack style, LIFO
485
entry.releaseAllLocksLIFO(gtx);
486
487       Transaction ltx = entry.getTransaction();
488       if (log.isTraceEnabled())
489       {
490          log.trace("removing local transaction " + ltx + " and global transaction " + gtx);
491       }
492       tx_table.remove(ltx);
493       tx_table.remove(gtx);
494    }
495
496
497    /**
498     * Revert all changes made inside this TX: invoke all method calls of the undo-ops
499     * list. Then release all locks and remove the TX from the transaction table.
500     * <ol>
501     * <li>Revert all modifications done in the current TX<li/>
502     * <li>Release all locks held by the current TX</li>
503     * <li>Remove all temporary nodes created by the current TX</li>
504     * </ol>
505     *
506     * @param tx
507     */

508    private void rollback(GlobalTransaction tx)
509    {
510       TransactionEntry entry = tx_table.get(tx);
511
512       if (log.isTraceEnabled())
513       {
514          log.trace("called to rollback cache with GlobalTransaction=" + tx);
515       }
516
517       if (entry == null)
518       {
519          log.error("entry for transaction " + tx + " not found (transaction has possibly already been rolled back)");
520          return;
521       }
522
523
524       Iterator JavaDoc removedNodes = entry.getRemovedNodes().iterator();
525       CacheImpl tcpi = (CacheImpl) cache;
526       while (removedNodes.hasNext())
527       {
528          Fqn f = (Fqn) removedNodes.next();
529          tcpi.realRemove(f, false);
530
531       }
532
533       // 1. Revert the modifications by running the undo-op list in reverse. This *cannot* throw any exceptions !
534
entry.undoOperations(cache);
535
536       // This was removed as we don't use temporary nodes anymore; we now create undo-operations on put(), e.g.
537
// put(/a/b/c) on /a, create b and c, plus undo operations _remove(a/b/c) and _remove(/a/b)
538

539       // 2. Remove all temporary nodes. Need to do it backwards since node is LIFO.
540
// for(ListIterator it=new LinkedList(entry.getNodes()).listIterator(entry.getNodes().size());
541
// it.hasPrevious();) {
542
// node_name=(Fqn)it.previous();
543
// try {
544
// cache._remove(tx, node_name, false);
545
// }
546
// catch(Throwable t) {
547
// log.error("failed removing node \"" + node_name + "\"", t);
548
// }
549
// }
550

551       // 3. Finally, release all locks held by this TX
552
// Let's do it in stack style, LIFO
553
// Note that the lock could have been released already so don't panic.
554
entry.releaseAllLocksLIFO(tx);
555
556       Transaction ltx = entry.getTransaction();
557       if (log.isTraceEnabled())
558       {
559          log.trace("removing local transaction " + ltx + " and global transaction " + tx);
560       }
561       tx_table.remove(ltx);
562       tx_table.remove(tx);
563    }
564
565    private static class LockManager
566    {
567       boolean acquire(NodeSPI node, Object JavaDoc owner, NodeLock.LockType lockType, long timeout) throws InterruptedException JavaDoc
568       {
569          return getLock(node).acquire(owner, timeout, lockType);
570       }
571
572       NodeLock getLock(NodeSPI node)
573       {
574          return node.getLock();
575       }
576
577       public Set JavaDoc acquireAll(NodeSPI node, Object JavaDoc owner, NodeLock.LockType lockType, long timeout) throws InterruptedException JavaDoc
578       {
579          return getLock(node).acquireAll(owner, timeout, lockType);
580       }
581    }
582
583 }
584
Popular Tags