KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jboss > cache > interceptors > LockInterceptor


1 /*
2 * JBoss, the OpenSource J2EE webOS
3 *
4 * Distributable under LGPL license.
5 * See terms of license at gnu.org.
6 */

7 package org.jboss.cache.interceptors;
8
9 import org.jboss.cache.*;
10 import org.jboss.cache.lock.IdentityLock;
11 import org.jboss.cache.lock.LockingException;
12 import org.jboss.cache.lock.TimeoutException;
13 import org.jgroups.blocks.MethodCall;
14
15 import javax.transaction.Status JavaDoc;
16 import javax.transaction.Synchronization JavaDoc;
17 import javax.transaction.Transaction JavaDoc;
18 import javax.transaction.TransactionManager JavaDoc;
19 import java.lang.reflect.Method JavaDoc;
20 import java.util.*;
21
22 import EDU.oswego.cs.dl.util.concurrent.ReentrantLock;
23
24 /**
25  * Handles locking. When a TX is associated, we register for TX completion and unlock the locks acquired within the
26  * scope of the TX. When no TX is present, we keep track of the locks acquired during the current method and unlock
27  * when the method returns
28  * @author Bela Ban
29  * @version $Id: LockInterceptor.java,v 1.8.2.2 2005/04/06 21:06:41 starksm Exp $
30  */

31 public class LockInterceptor extends Interceptor {
32    private TransactionManager JavaDoc tx_mgr=null;
33    TransactionTable tx_table=null;
34    HashMap lock_table;
35    private long lock_acquisition_timeout;
36
37    ReentrantLock create_lock=new ReentrantLock();
38
39    /** List<Transaction> that we have registered for */
40    private List transactions=Collections.synchronizedList(new ArrayList());
41
42    final static int NONE = 0;
43    final static int READ = 1;
44    final static int WRITE = 2;
45
46
47    public void setCache(TreeCache cache) {
48       super.setCache(cache);
49       tx_mgr=cache.getTransactionManager();
50       tx_table=cache.getTransactionTable();
51       lock_table=cache.getLockTable();
52       lock_acquisition_timeout=cache.getLockAcquisitionTimeout();
53    }
54
55
56
57    public Object JavaDoc invoke(MethodCall m) throws Throwable JavaDoc {
58       Transaction JavaDoc tx=null;
59       GlobalTransaction gtx=null;
60       Object JavaDoc retval=null;
61       Fqn fqn=null;
62       int lock_type=NONE;
63       long lock_timeout=lock_acquisition_timeout;
64       Method JavaDoc meth=m.getMethod();
65       Object JavaDoc[] args=m.getArgs();
66
67       /** List<IdentityLock> locks. Locks acquired during the current method; will be released later by UnlockInterceptor.
68        * This list is only populated when there is no TX, otherwise the TransactionTable maintains the locks
69        * (keyed by TX) */

70       List locks=null;
71
72       boolean recursive=false;
73       boolean createIfNotExists=false;
74
75       if(tx_mgr != null && (tx=tx_mgr.getTransaction()) != null && isValid(tx)) { // ACTIVE or PREPARING
76
if(!transactions.contains(tx)) {
77             gtx=cache.getCurrentTransaction(tx);
78             if(gtx == null)
79                throw new Exception JavaDoc("failed to get global transaction");
80             try {
81                OrderedSynchronizationHandler handler=OrderedSynchronizationHandler.getInstance(tx);
82                SynchronizationHandler myHandler=new SynchronizationHandler(gtx, tx, cache);
83                handler.registerAtTail(myHandler); // needs to be invoked last on TX commit
84
transactions.add(tx);
85             }
86             catch(Exception JavaDoc e) {
87                log.error("registration for tx=" + tx + " with transaction manager failed, running without TX", e);
88             }
89          }
90          else {
91             gtx=cache.getTransactionTable().get(tx);
92          }
93       }
94       else { // no TX
95
// we don't need synchronization on lock_table because the same thread won't enter concurrently
96
locks=(List)lock_table.get(Thread.currentThread());
97          if(locks == null) {
98             locks=new LinkedList();
99             lock_table.put(Thread.currentThread(), locks);
100          }
101       }
102
103       // 1. Determine the type of lock (read, write, or none) depending on the method. If no lock is required, invoke
104
// the method, then return immediately
105
// Set the Fqn
106
if(meth.equals(TreeCache.putDataMethodLocal) || meth.equals(TreeCache.putDataEraseMethodLocal) ||
107             meth.equals(TreeCache.putKeyValMethodLocal) || meth.equals(TreeCache.putFailFastKeyValueMethodLocal)) {
108          createIfNotExists=true;
109          fqn=(Fqn)args[1];
110          lock_type=WRITE;
111          if(meth.equals(TreeCache.putFailFastKeyValueMethodLocal))
112             lock_timeout=((Long JavaDoc)args[5]).longValue();
113       }
114       else if(meth.equals(TreeCache.removeNodeMethodLocal)) {
115          fqn=(Fqn)args[1];
116          lock_type=WRITE;
117          recursive=true; // remove node and *all* child nodes
118
}
119       else if(meth.equals(TreeCache.removeKeyMethodLocal) || meth.equals(TreeCache.removeDataMethodLocal)) {
120          fqn=(Fqn)args[1];
121          lock_type=WRITE;
122       }
123       else if(meth.equals(TreeCache.evictNodeMethodLocal)) {
124          fqn=(Fqn)args[0];
125          lock_type=WRITE;
126       }
127       else if(meth.equals(TreeCache.addChildMethodLocal)) {
128          fqn=(Fqn)args[1];
129          lock_type=WRITE;
130       }
131       else if(meth.equals(TreeCache.getKeyValueMethodLocal)) {
132          fqn=(Fqn)args[0];
133          lock_type=READ;
134       }
135       else if(meth.equals(TreeCache.getNodeMethodLocal)) {
136          fqn=(Fqn)args[0];
137          lock_type=READ;
138       }
139       else if(meth.equals(TreeCache.getKeysMethodLocal)) {
140          fqn=(Fqn)args[0];
141          lock_type=READ;
142       }
143       else if(meth.equals(TreeCache.getChildrenNamesMethodLocal) || meth.equals(TreeCache.releaseAllLocksMethodLocal) ||
144             meth.equals(TreeCache.printMethodLocal)) {
145          fqn=(Fqn)args[0];
146          lock_type=READ;
147       }
148       else if(meth.equals(TreeCache.lockMethodLocal)) {
149          fqn=(Fqn)args[0];
150          lock_type=((Integer JavaDoc)args[1]).intValue();
151          recursive=((Boolean JavaDoc)args[2]).booleanValue();
152       }
153
154
155       // Lock the node (must be either read or write if we get here)
156
// If no TX: add each acquired lock to the list of locks for this method (locks)
157
// If TX: [merge code from TransactionInterceptor]: register with TxManager, on commit/rollback,
158
// release the locks for the given TX
159
if(fqn != null) {
160          if(createIfNotExists) {
161             do {
162                lock(fqn, gtx, lock_type, locks, recursive, lock_timeout, createIfNotExists);
163             }
164             while(cache.exists(fqn) == false); // keep trying until we have the lock (fixes concurrent remove())
165
// terminates successfully, or with (Timeout)Exception
166
}
167          else
168             lock(fqn, gtx, lock_type, locks, recursive, lock_timeout, createIfNotExists);
169       }
170       else {
171          if(log.isTraceEnabled())
172             log.trace("bypassed locking as method " + m.getName() + "() doesn't require locking");
173       }
174       if(meth.equals(TreeCache.lockMethodLocal))
175          return null;
176       retval=super.invoke(m);
177       return retval;
178    }
179
180
181
182    /**
183     * Locks a given node.
184     * @param fqn
185     * @param gtx
186     * @param lock_type READ, WRITE or NONE
187     * @param locks A List<Lock> of locks held, each new node locked is added
188     * @param recursive Lock children recursively
189     */

190    private void lock(Fqn fqn, GlobalTransaction gtx, int lock_type, List locks, boolean recursive,
191                      long lock_timeout, boolean createIfNotExists)
192          throws TimeoutException, LockingException, InterruptedException JavaDoc {
193       Node n, child_node=null;
194       Object JavaDoc child_name;
195       Fqn tmp_fqn=new Fqn();
196       int treeNodeSize;
197       Object JavaDoc owner=gtx != null? gtx : (Object JavaDoc)Thread.currentThread();
198       boolean acquired=false;
199
200       if(fqn == null) {
201          log.error("fqn is null - this should not be the case");
202          return;
203       }
204
205       if((treeNodeSize=fqn.size()) == 0)
206          return;
207
208       n=cache.getRoot();
209       for(int i=0; i < treeNodeSize; i++) {
210          child_name=fqn.get(i);
211          tmp_fqn=new Fqn(tmp_fqn, child_name);
212          if(createIfNotExists)
213             create_lock.acquire();
214          try {
215             child_node=n.getChild(child_name);
216             if(child_node == null) {
217                if(createIfNotExists) {
218                   child_node=n.createChild(child_name, tmp_fqn, n);
219                   if(log.isTraceEnabled())
220                      log.trace("created child " + child_name);
221                   if(gtx != null) {
222                      // add the node name to the list maintained for the current tx
223
// (needed for abort/rollback of transaction)
224
cache.addNode(gtx, (Fqn)tmp_fqn.clone());
225                   }
226                   create_lock.release();
227                   cache.notifyNodeCreated(tmp_fqn);
228                }
229                else {
230                   if(log.isTraceEnabled())
231                      log.trace("failed finding child " + child_name + " of node " + n.getFqn());
232                   return;
233                }
234             }
235          }
236          finally {
237             if(create_lock.holds() > 0)
238                create_lock.release();
239          }
240
241          if(lock_type == NONE) {
242             ;
243          }
244          else {
245             if(lock_type == WRITE && i == (treeNodeSize - 1)) {
246                acquired=child_node.acquire(owner, lock_timeout, Node.LOCK_TYPE_WRITE);
247             }
248             else {
249                acquired=child_node.acquire(owner, lock_timeout, Node.LOCK_TYPE_READ);
250             }
251          }
252
253          if(acquired) {
254             if(gtx != null) {
255                // add the lock to the list of locks maintained for this transaction
256
// (needed for release of locks on commit or rollback)
257
cache.getTransactionTable().addLock(gtx, child_node.getLock());
258             }
259             else {
260                IdentityLock l=child_node.getLock();
261                if(!locks.contains(l))
262                   locks.add(l);
263             }
264          }
265
266          if(recursive && i == (treeNodeSize - 1)) {
267             Set acquired_locks=child_node.acquireAll(owner, lock_timeout, lock_type);
268             if(acquired_locks.size() > 0) {
269                if(gtx != null) {
270                   cache.getTransactionTable().addLocks(gtx, acquired_locks);
271                }
272                else {
273                   locks.addAll(acquired_locks);
274                }
275             }
276          }
277
278          n=child_node;
279       }
280    }
281
282
283
284 // private boolean acquire(Node node, Object owner, int lock_type, long lock_timeout)
285
// throws TimeoutException, LockingException, InterruptedException {
286
// switch(lock_type) {
287
// case NONE:
288
// return true;
289
// case WRITE:
290
// return node.acquire(owner, lock_timeout, Node.LOCK_TYPE_WRITE);
291
// case READ:
292
// return node.acquire(owner, lock_timeout, Node.LOCK_TYPE_READ);
293
// }
294
// }
295

296
297    /**
298     * Remove all locks held by <tt>tx</tt>, remove the transaction from the transaction table
299     * @param gtx
300     */

301    private void commit(GlobalTransaction gtx) {
302       if(log.isTraceEnabled())
303          log.trace("committing cache with gtx " + gtx);
304
305       TransactionEntry entry=tx_table.get(gtx);
306       if(entry == null) {
307          log.error("entry for transaction " + gtx + " not found (maybe already committed)");
308          return;
309       }
310
311       // Let's do it in stack style, LIFO
312
List list=new LinkedList(entry.getLocks());
313       for(int i=list.size() - 1; i >= 0; i--) {
314          IdentityLock lock=(IdentityLock)list.get(i);
315          if(log.isTraceEnabled())
316             log.trace("releasing lock for " + lock.getFqn() + " (" + lock + ")");
317          lock.release(gtx);
318       }
319       entry.getLocks().clear();
320
321       Transaction JavaDoc ltx=entry.getTransaction();
322       if(log.isTraceEnabled())
323          log.trace("removing local transaction " + ltx + " and global transaction " + gtx);
324       tx_table.remove(ltx);
325       tx_table.remove(gtx);
326    }
327
328
329    /**
330      * Revert all changes made inside this TX: invoke all method calls of the undo-ops
331      * list. Then release all locks and remove the TX from the transaction table.
332      * <ol>
333      * <li>Revert all modifications done in the current TX<li/>
334      * <li>Release all locks held by the current TX</li>
335      * <li>Remove all temporary nodes created by the current TX</li>
336      * </ol>
337      *
338      * @param tx
339      */

340    private void rollback(GlobalTransaction tx) {
341       List undo_ops;
342       TransactionEntry entry=tx_table.get(tx);
343       MethodCall undo_op;
344       Object JavaDoc retval;
345       Fqn node_name;
346
347       if(log.isTraceEnabled())
348          log.trace("called to rollback cache with GlobalTransaction=" + tx);
349
350       if(entry == null) {
351          log.error("entry for transaction " + tx + " not found (transaction has possibly already been rolled back)");
352          return;
353       }
354
355       // 1. Revert the modifications by running the undo-op list in reverse. This *cannot* throw any exceptions !
356
undo_ops=new LinkedList(entry.getUndoOperations());
357       for(ListIterator it=undo_ops.listIterator(undo_ops.size()); it.hasPrevious();) {
358          undo_op=(MethodCall)it.previous();
359          try {
360             retval=undo_op.invoke(cache);
361             if(retval != null && retval instanceof Throwable JavaDoc) {
362                log.error("undo operation failed, error=" + retval);
363             }
364          }
365          catch(Throwable JavaDoc t) {
366             log.error("undo operation failed", t);
367          }
368       }
369
370       // 2. Remove all temporary nodes. Need to do it backwards since node is LIFO.
371
for(ListIterator it=new LinkedList(entry.getNodes()).listIterator(entry.getNodes().size());
372           it.hasPrevious();) {
373          node_name=(Fqn)it.previous();
374          try {
375             cache._remove(tx, node_name, false);
376          }
377          catch(Throwable JavaDoc t) {
378             log.error("failed removing node \"" + node_name + "\"", t);
379          }
380       }
381
382
383       // 3. Finally, release all locks held by this TX
384
// Let's do it in stack style, LIFO
385
// Note that the lock could have been released already so don't panic.
386
List list=new LinkedList(entry.getLocks());
387       for(int i=list.size() - 1; i >= 0; i--) {
388          IdentityLock lock=(IdentityLock)list.get(i);
389          if(log.isTraceEnabled())
390             log.trace("releasing lock for " + lock.getFqn() + " (" + lock + ")");
391          lock.release(tx);
392       }
393       entry.getLocks().clear();
394
395       Transaction JavaDoc ltx=entry.getTransaction();
396       if(log.isTraceEnabled())
397          log.trace("removing local transaction " + ltx + " and global transaction " + tx);
398       tx_table.remove(ltx);
399       tx_table.remove(tx);
400    }
401
402
403
404    class SynchronizationHandler implements Synchronization JavaDoc {
405       GlobalTransaction gtx=null;
406       Transaction JavaDoc tx=null;
407       TreeCache cache=null;
408
409
410       SynchronizationHandler(GlobalTransaction gtx, Transaction JavaDoc tx, TreeCache cache) {
411          this.gtx=gtx;
412          this.cache=cache;
413          this.tx=tx;
414       }
415
416
417       /**
418        * This method is invoked before the start of the commit or rollback
419        * process. We don't do anything because this method handles only the case of cache_mode
420        * being LOCAL
421        */

422       public void beforeCompletion() {
423       }
424
425
426       /**
427        * Depending on the status (OK or FAIL), call commit() or rollback() on the CacheLoader
428        *
429        * @param status
430        */

431       public void afterCompletion(int status) {
432          transactions.remove(tx);
433          switch(status) {
434             case Status.STATUS_COMMITTED:
435                commit(gtx);
436                break;
437
438             case Status.STATUS_MARKED_ROLLBACK: // this one is probably not needed
439
case Status.STATUS_ROLLEDBACK:
440                if(log.isDebugEnabled())
441                   log.debug("rolling back transaction");
442                rollback(gtx); // roll back locally
443
break;
444             default:
445                rollback(gtx); // roll back locally
446
throw new IllegalStateException JavaDoc("failed rolling back transaction: " + status);
447          }
448       }
449    }
450
451
452 }
453
Popular Tags