KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jboss > test > cache > test > local > TxDeadlockUnitTestCase


1 /*
2  *
3  * JBoss, the OpenSource J2EE webOS
4  *
5  * Distributable under LGPL license.
6  * See terms of license at gnu.org.
7  */

8
9 package org.jboss.test.cache.test.local;
10
11 import junit.framework.Test;
12 import junit.framework.TestCase;
13 import junit.framework.TestSuite;
14 import org.jboss.cache.CacheException;
15 import org.jboss.cache.Fqn;
16 import org.jboss.cache.TreeCache;
17 import org.jboss.cache.lock.IsolationLevel;
18 import org.jboss.cache.lock.TimeoutException;
19 import org.jboss.cache.lock.UpgradeException;
20 import org.jboss.cache.transaction.DummyTransactionManager;
21 import org.jboss.logging.Logger;
22
23 import javax.transaction.NotSupportedException JavaDoc;
24 import javax.transaction.SystemException JavaDoc;
25 import javax.transaction.Transaction JavaDoc;
26
27 /**
28  * Tests transactional access to a local TreeCache, with concurrent (deadlock-prone) access.
29  * Note: we use DummpyTranasctionManager to replace jta
30  *
31  * @version $Id: TxDeadlockUnitTestCase.java,v 1.10.2.1 2005/05/01 04:48:05 starksm Exp $
32  */

33 public class TxDeadlockUnitTestCase extends TestCase {
34    TreeCache cache=null;
35    Exception JavaDoc thread_ex;
36
37    final Fqn NODE=Fqn.fromString("/a/b/c");
38    final Fqn PARENT_NODE=Fqn.fromString("/a/b");
39    final Fqn FQN1=NODE;
40    final Fqn FQN2=Fqn.fromString("/1/2/3");
41    final Logger log=Logger.getLogger(TxDeadlockUnitTestCase.class);
42
43
44    public TxDeadlockUnitTestCase(String JavaDoc name) {
45       super(name);
46    }
47
48    public void setUp() throws Exception JavaDoc {
49       super.setUp();
50       DummyTransactionManager.getInstance();
51       cache=new TreeCache("test", null, 10000);
52       cache.setCacheMode(TreeCache.LOCAL);
53       cache.setTransactionManagerLookupClass("org.jboss.cache.DummyTransactionManagerLookup");
54       cache.setIsolationLevel(IsolationLevel.REPEATABLE_READ);
55       cache.setLockAcquisitionTimeout(3000);
56       cache.createService();
57       cache.startService();
58       thread_ex=null;
59    }
60
61
62    public void tearDown() throws Exception JavaDoc {
63       super.tearDown();
64       if(cache != null)
65          cache.stopService();
66       if(thread_ex != null)
67          throw thread_ex;
68    }
69
70
71    /* @todo JBCACHE-97
72    public void testConcurrentUpgrade() throws CacheException, InterruptedException {
73       MyThread t1=new MyThreadTimeout("MyThread#1", NODE);
74       MyThread t2=new MyThread("MyThread#2", NODE);
75
76       cache.put(NODE, null);
77
78       t1.start();
79       t2.start();
80
81       sleep(1000);
82
83       synchronized(t1) {
84          t1.notify(); // t1 will now try to upgrade RL to WL, but fails b/c t2 still has a RL
85       }
86
87       sleep(1000);
88
89       synchronized(t2) {
90          t2.notify(); // t1 should now be able to upgrade because t1 was rolled back (RL was removed)
91       }
92
93       t1.join();
94       t2.join();
95    }
96    */

97
98    /**
99     * Typical deadlock: t1 acquires WL on /a/b/c, t2 WL on /1/2/3, then t1 attempts to get WL on /1/2/3 (locked by t2),
100     * and t2 tries to acquire WL on /a/b/c. One (or both) of the 2 transactions is going to timeout and roll back.
101     */

102    public void testPutDeadlock() throws CacheException, InterruptedException JavaDoc {
103       MyPutter t1=new MyPutterTimeout("MyPutter#1", FQN1, FQN2);
104       MyPutter t2=new MyPutter("MyPutter#2", FQN2, FQN1);
105
106       cache.put(FQN1, null);
107       cache.put(FQN2, null);
108
109       t1.start();
110       t2.start();
111
112       sleep(1000);
113
114       synchronized(t1) {
115          t1.notify(); // t1 will now try to acquire WL on /1/2/3 (held by t2) - this will time out
116
}
117
118       sleep(1000);
119
120       synchronized(t2) {
121          t2.notify(); // t2 tries to acquire WL on /a/b/c (held by t1)
122
}
123
124       t1.join();
125       t2.join();
126    }
127
128
129    public void testCreateIfNotExistsLogic() throws CacheException, InterruptedException JavaDoc {
130       cache.put(NODE, null);
131
132       class T0 extends GenericThread {
133          public T0(String JavaDoc name) {
134             super(name);
135          }
136
137          protected void _run() throws Exception JavaDoc {
138             Transaction JavaDoc myTx=startTransaction();
139             log("put(" + NODE + ")");
140             cache.put(NODE, null);
141             log("put(" + NODE + "): OK");
142
143             synchronized(this) {wait();}
144
145             log("remove(" + NODE + ")");
146             cache.remove(NODE);
147             log("remove(" + NODE + "): OK");
148
149             log("committing TX");
150             myTx.commit();
151          }
152       }
153
154       class T1 extends GenericThread {
155          public T1(String JavaDoc name) {
156             super(name);
157          }
158
159          protected void _run() throws Exception JavaDoc {
160             Transaction JavaDoc myTx=startTransaction();
161             log("put(" + NODE + ")");
162             cache.put(NODE, null);
163             log("put(" + NODE + "): OK");
164
165             log("committing TX");
166             myTx.commit();
167          }
168
169       }
170
171       T0 t0=new T0("T0");
172       t0.start();
173       sleep(500);
174       T1 t1=new T1("T1");
175       t1.start();
176       sleep(500);
177       synchronized(t0) {
178          t0.notify();
179       }
180       t0.join();
181       t1.join();
182    }
183
184
185    /* @todo JBCACHE-97
186    public void testMoreThanOneUpgrader() throws Exception {
187       final int NUM=2;
188       final Object lock=new Object();
189
190       cache.put(NODE, "bla", "blo");
191
192       MyUpgrader[] upgraders=new MyUpgrader[NUM];
193       for(int i=0; i < upgraders.length; i++) {
194          upgraders[i]=new MyUpgrader("Upgrader#" + i, NODE, lock);
195          upgraders[i].start();
196       }
197
198       sleep(1000); // all threads have no RLs
199       log("locks: " + cache.printLockInfo());
200
201
202       synchronized(lock) {
203          lock.notifyAll();
204       }
205
206       // all threads now try to upgrade the RL to a WL
207       for(int i=0; i < upgraders.length; i++) {
208          MyThread upgrader=upgraders[i];
209          upgrader.join();
210       }
211    }
212    */

213
214    public void testPutsAndRemovesOnParentAndChildNodes() throws InterruptedException JavaDoc {
215       ContinuousPutter putter=new ContinuousPutter("Putter", NODE);
216       ContinuousRemover remover=new ContinuousRemover("Remover", PARENT_NODE);
217       putter.start();
218       remover.start();
219       sleep(5000);
220       log("stopping Putter");
221       putter.looping=false;
222       log("stopping Remover");
223       remover.looping=false;
224       putter.join();
225       remover.join();
226    }
227
228    public void testPutsAndRemovesOnParentAndChildNodesReversed() throws InterruptedException JavaDoc {
229       ContinuousPutter putter=new ContinuousPutter("Putter", PARENT_NODE);
230       ContinuousRemover remover=new ContinuousRemover("Remover", NODE);
231       putter.start();
232       remover.start();
233       sleep(5000);
234       log("stopping Putter");
235       putter.looping=false;
236       log("stopping Remover");
237       remover.looping=false;
238       putter.join();
239       remover.join();
240    }
241
242    public void testPutsAndRemovesOnSameNode() throws InterruptedException JavaDoc {
243       ContinuousPutter putter=new ContinuousPutter("Putter", NODE);
244       ContinuousRemover remover=new ContinuousRemover("Remover", NODE);
245       putter.start();
246       remover.start();
247       sleep(5000);
248       log("stopping Putter");
249       putter.looping=false;
250       log("stopping Remover");
251       remover.looping=false;
252       putter.join();
253       remover.join();
254    }
255
256
257    class GenericThread extends Thread JavaDoc {
258       protected Transaction JavaDoc tx;
259       protected boolean looping=true;
260
261       public GenericThread() {
262
263       }
264
265       public GenericThread(String JavaDoc name) {
266          super(name);
267       }
268
269       public void setLooping(boolean looping) {
270          this.looping=looping;
271       }
272
273       public void run() {
274          try {
275             _run();
276          }
277          catch(Exception JavaDoc t) {
278             System.out.println(getName() + ": " + t);
279             if(thread_ex == null)
280                thread_ex=t;
281          }
282          if(log.isTraceEnabled())
283             log.trace("Thread " + getName() + " terminated");
284       }
285
286       protected void _run() throws Exception JavaDoc {
287          throw new UnsupportedOperationException JavaDoc();
288       }
289    }
290
291
292    class ContinuousRemover extends GenericThread {
293       Fqn fqn;
294
295       public ContinuousRemover(String JavaDoc name, Fqn fqn) {
296          super(name);
297          this.fqn=fqn;
298       }
299
300
301       protected void _run() throws Exception JavaDoc {
302          while(thread_ex == null && looping) {
303             try {
304                if(interrupted())
305                   break;
306                tx=startTransaction();
307                log("remove(" + fqn + ")");
308                cache.remove(fqn);
309                sleep(random(20));
310                tx.commit();
311             }
312             catch(InterruptedException JavaDoc interrupted) {
313                tx.rollback();
314                break;
315             }
316             catch(Exception JavaDoc ex) {
317                tx.rollback();
318                throw ex;
319             }
320          }
321       }
322    }
323
324    class ContinuousPutter extends GenericThread {
325       Fqn fqn;
326
327       public ContinuousPutter(String JavaDoc name, Fqn fqn) {
328          super(name);
329          this.fqn=fqn;
330       }
331
332
333       protected void _run() throws Exception JavaDoc {
334          while(thread_ex == null && looping) {
335             try {
336                if(interrupted())
337                   break;
338                tx=startTransaction();
339                log("put(" + fqn + ")");
340                cache.put(fqn, "foo", "bar");
341                sleep(random(20));
342                tx.commit();
343             }
344             catch(InterruptedException JavaDoc interrupted) {
345                tx.rollback();
346                break;
347             }
348             catch(Exception JavaDoc ex) {
349                tx.rollback();
350                throw ex;
351             }
352          }
353       }
354    }
355
356    public static long random(long range) {
357       return (long)((Math.random() * 100000) % range) + 1;
358    }
359
360
361
362    class MyThread extends GenericThread {
363       Fqn fqn;
364
365
366       public MyThread(String JavaDoc name, Fqn fqn) {
367          super(name);
368          this.fqn=fqn;
369       }
370
371       protected void _run() throws Exception JavaDoc {
372          tx=startTransaction();
373          log("get(" + fqn + ")");
374          cache.get(fqn, "bla"); // acquires RL
375
log("done, locks: " + cache.printLockInfo());
376
377          synchronized(this) {wait();}
378
379          log("put(" + fqn + ")");
380          cache.put(fqn, "key", "val"); // need to upgrade RL to WL
381
log("done, locks: " + cache.printLockInfo());
382          tx.commit();
383          log("committed TX, locks: " + cache.printLockInfo());
384       }
385    }
386
387
388    class MyUpgrader extends MyThread {
389       Object JavaDoc lock;
390
391       public MyUpgrader(String JavaDoc name, Fqn fqn) {
392          super(name, fqn);
393       }
394
395       public MyUpgrader(String JavaDoc name, Fqn fqn, Object JavaDoc lock) {
396          super(name, fqn);
397          this.lock=lock;
398       }
399
400       protected void _run() throws Exception JavaDoc {
401          tx=startTransaction();
402          log("get(" + fqn + ")");
403          cache.get(fqn, "bla"); // acquires RL
404

405          synchronized(lock) {lock.wait();}
406
407          log("put(" + fqn + ")");
408          cache.put(fqn, "key", "val"); // need to upgrade RL to WL
409
log("done, locks: " + cache.printLockInfo());
410          tx.commit();
411          log("committed TX, locks: " + cache.printLockInfo());
412       }
413
414    }
415
416    class MyThreadTimeout extends MyThread {
417
418       public MyThreadTimeout(String JavaDoc name, Fqn fqn) {
419          super(name, fqn);
420       }
421
422       protected void _run() throws Exception JavaDoc {
423          try {
424             super._run();
425          }
426          catch(UpgradeException upgradeEx) {
427             log("received UpgradeException as expected");
428             tx.rollback();
429             log("rolled back TX, locks: " + cache.printLockInfo());
430          }
431          catch(TimeoutException timeoutEx) {
432             log("received TimeoutException as expected");
433             tx.rollback();
434             log("rolled back TX, locks: " + cache.printLockInfo());
435          }
436       }
437    }
438
439
440
441    class MyPutter extends GenericThread {
442       Fqn fqn1, fqn2;
443
444       public MyPutter(String JavaDoc name, Fqn fqn1, Fqn fqn2) {
445          super(name);
446          this.fqn1=fqn1;
447          this.fqn2=fqn2;
448       }
449
450       protected void _run() throws Exception JavaDoc {
451          tx=startTransaction();
452          log("put(" + fqn1 + ")");
453          cache.put(fqn1, "key", "val"); // need to upgrade RL to WL
454
log("done, locks: " + cache.printLockInfo());
455          synchronized(this) {wait();}
456          log("put(" + fqn2 + ")");
457          cache.put(fqn2, "key", "val"); // need to upgrade RL to WL
458
log("done, locks: " + cache.printLockInfo());
459          tx.commit();
460          log("committed TX, locks: " + cache.printLockInfo());
461       }
462    }
463
464    class MyPutterTimeout extends MyPutter {
465
466       public MyPutterTimeout(String JavaDoc name, Fqn fqn1, Fqn fqn2) {
467          super(name, fqn1, fqn2);
468       }
469
470       protected void _run() throws Exception JavaDoc {
471          try {
472             super._run();
473          }
474          catch(TimeoutException timeoutEx) {
475             log("received TimeoutException as expected");
476             tx.rollback();
477             log("rolled back TX, locks: " + cache.printLockInfo());
478          }
479       }
480    }
481
482
483
484    private static void log(String JavaDoc msg) {
485       System.out.println(Thread.currentThread().getName() + ": " + msg);
486    }
487
488    private static void sleep(long timeout) {
489       try {
490          Thread.sleep(timeout);
491       }
492       catch(InterruptedException JavaDoc e) {
493       }
494    }
495
496
497
498    Transaction JavaDoc startTransaction() throws SystemException JavaDoc, NotSupportedException JavaDoc {
499       DummyTransactionManager mgr=DummyTransactionManager.getInstance();
500       mgr.begin();
501       Transaction JavaDoc tx=mgr.getTransaction();
502       return tx;
503    }
504
505
506
507    public static Test suite() throws Exception JavaDoc {
508       return new TestSuite(TxDeadlockUnitTestCase.class);
509    }
510
511    public static void main(String JavaDoc[] args) throws Exception JavaDoc {
512       junit.textui.TestRunner.run(suite());
513    }
514
515
516 }
517
Popular Tags