KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jboss > cache > tests > transaction > DeadlockTest


1 /*
2  *
3  * JBoss, the OpenSource J2EE webOS
4  *
5  * Distributable under LGPL license.
6  * See terms of license at gnu.org.
7  */

8
9 package org.jboss.cache.tests.transaction;
10
11 import junit.framework.Test;
12 import junit.framework.TestCase;
13 import junit.framework.TestSuite;
14 import org.apache.commons.logging.Log;
15 import org.apache.commons.logging.LogFactory;
16 import org.jboss.cache.CacheException;
17 import org.jboss.cache.Fqn;
18 import org.jboss.cache.TreeCache;
19 import org.jboss.cache.lock.IsolationLevel;
20 import org.jboss.cache.lock.TimeoutException;
21 import org.jboss.cache.lock.UpgradeException;
22 import org.jboss.cache.transaction.DummyTransactionManager;
23
24 import javax.transaction.NotSupportedException JavaDoc;
25 import javax.transaction.SystemException JavaDoc;
26 import javax.transaction.Transaction JavaDoc;
27
28 /**
29  * Tests transactional access to a local TreeCache, with concurrent (deadlock-prone) access.
30  * Note: we use DummpyTranasctionManager to replace jta
31  *
32  * @version $Id: DeadlockTest.java,v 1.2 2005/04/05 17:19:45 belaban Exp $
33  */

34 public class DeadlockTest extends TestCase {
35    TreeCache cache=null;
36    Exception JavaDoc thread_ex;
37
38    final Fqn NODE=Fqn.fromString("/a/b/c");
39    final Fqn PARENT_NODE=Fqn.fromString("/a/b");
40    final Fqn FQN1=NODE;
41    final Fqn FQN2=Fqn.fromString("/1/2/3");
42    final Log log=LogFactory.getLog(DeadlockTest.class);
43
44
45    public DeadlockTest(String JavaDoc name) {
46       super(name);
47    }
48
49    public void setUp() throws Exception JavaDoc {
50       super.setUp();
51       DummyTransactionManager.getInstance();
52       cache=new TreeCache("test", null, 10000);
53       cache.setCacheMode(TreeCache.LOCAL);
54       cache.setTransactionManagerLookupClass("org.jboss.cache.DummyTransactionManagerLookup");
55       cache.setIsolationLevel(IsolationLevel.REPEATABLE_READ);
56       cache.setLockAcquisitionTimeout(3000);
57       cache.createService();
58       cache.startService();
59       thread_ex=null;
60    }
61
62
63    public void tearDown() throws Exception JavaDoc {
64       super.tearDown();
65       if(cache != null)
66          cache.stopService();
67       if(thread_ex != null)
68          throw thread_ex;
69    }
70
71
72    public void testConcurrentUpgrade() throws CacheException, InterruptedException JavaDoc {
73       MyThread t1=new MyThreadTimeout("MyThread#1", NODE);
74       MyThread t2=new MyThread("MyThread#2", NODE);
75
76       cache.put(NODE, null);
77
78       t1.start();
79       t2.start();
80
81       sleep(1000);
82
83       synchronized(t1) {
84          t1.notify(); // t1 will now try to upgrade RL to WL, but fails b/c t2 still has a RL
85
}
86
87       sleep(1000);
88
89       synchronized(t2) {
90          t2.notify(); // t1 should now be able to upgrade because t1 was rolled back (RL was removed)
91
}
92
93       t1.join();
94       t2.join();
95    }
96
97
98    /**
99     * Typical deadlock: t1 acquires WL on /a/b/c, t2 WL on /1/2/3, then t1 attempts to get WL on /1/2/3 (locked by t2),
100     * and t2 tries to acquire WL on /a/b/c. One (or both) of the 2 transactions is going to timeout and roll back.
101     */

102    public void testPutDeadlock() throws CacheException, InterruptedException JavaDoc {
103       MyPutter t1=new MyPutterTimeout("MyPutter#1", FQN1, FQN2);
104       MyPutter t2=new MyPutter("MyPutter#2", FQN2, FQN1);
105
106       cache.put(FQN1, null);
107       cache.put(FQN2, null);
108
109       t1.start();
110       t2.start();
111
112       sleep(1000);
113
114       synchronized(t1) {
115          t1.notify(); // t1 will now try to acquire WL on /1/2/3 (held by t2) - this will time out
116
}
117
118       sleep(1000);
119
120       synchronized(t2) {
121          t2.notify(); // t2 tries to acquire WL on /a/b/c (held by t1)
122
}
123
124       t1.join();
125       t2.join();
126    }
127
128
129    public void testCreateIfNotExistsLogic() throws CacheException, InterruptedException JavaDoc {
130       cache.put(NODE, null);
131
132       class T0 extends GenericThread {
133          public T0(String JavaDoc name) {
134             super(name);
135          }
136
137          protected void _run() throws Exception JavaDoc {
138             Transaction JavaDoc myTx=startTransaction();
139             log("put(" + NODE + ")");
140             cache.put(NODE, null);
141             log("put(" + NODE + "): OK");
142
143             synchronized(this) {wait();}
144
145             log("remove(" + NODE + ")");
146             cache.remove(NODE);
147             log("remove(" + NODE + "): OK");
148
149             log("committing TX");
150             myTx.commit();
151          }
152       }
153
154       class T1 extends GenericThread {
155          public T1(String JavaDoc name) {
156             super(name);
157          }
158
159          protected void _run() throws Exception JavaDoc {
160             Transaction JavaDoc myTx=startTransaction();
161             log("put(" + NODE + ")");
162             cache.put(NODE, null);
163             log("put(" + NODE + "): OK");
164
165             log("committing TX");
166             myTx.commit();
167          }
168
169       }
170
171       T0 t0=new T0("T0");
172       t0.start();
173       sleep(500);
174       T1 t1=new T1("T1");
175       t1.start();
176       sleep(500);
177       synchronized(t0) {
178          t0.notify();
179       }
180       t0.join();
181       t1.join();
182    }
183
184
185    public void testMoreThanOneUpgrader() throws Exception JavaDoc {
186       final int NUM=2;
187       final Object JavaDoc lock=new Object JavaDoc();
188
189       cache.put(NODE, "bla", "blo");
190
191       MyUpgrader[] upgraders=new MyUpgrader[NUM];
192       for(int i=0; i < upgraders.length; i++) {
193          upgraders[i]=new MyUpgrader("Upgrader#" + i, NODE, lock);
194          upgraders[i].start();
195       }
196
197       sleep(1000); // all threads have no RLs
198
log("locks: " + cache.printLockInfo());
199
200
201       synchronized(lock) {
202          lock.notifyAll();
203       }
204
205       // all threads now try to upgrade the RL to a WL
206
for(int i=0; i < upgraders.length; i++) {
207          MyThread upgrader=upgraders[i];
208          upgrader.join();
209       }
210    }
211
212
213    public void testPutsAndRemovesOnParentAndChildNodes() throws InterruptedException JavaDoc {
214       ContinuousPutter putter=new ContinuousPutter("Putter", NODE);
215       ContinuousRemover remover=new ContinuousRemover("Remover", PARENT_NODE);
216       putter.start();
217       remover.start();
218       sleep(5000);
219       log("stopping Putter");
220       putter.looping=false;
221       log("stopping Remover");
222       remover.looping=false;
223       putter.join();
224       remover.join();
225    }
226
227    public void testPutsAndRemovesOnParentAndChildNodesReversed() throws InterruptedException JavaDoc {
228       ContinuousPutter putter=new ContinuousPutter("Putter", PARENT_NODE);
229       ContinuousRemover remover=new ContinuousRemover("Remover", NODE);
230       putter.start();
231       remover.start();
232       sleep(5000);
233       log("stopping Putter");
234       putter.looping=false;
235       log("stopping Remover");
236       remover.looping=false;
237       putter.join();
238       remover.join();
239    }
240
241    public void testPutsAndRemovesOnSameNode() throws InterruptedException JavaDoc {
242       ContinuousPutter putter=new ContinuousPutter("Putter", NODE);
243       ContinuousRemover remover=new ContinuousRemover("Remover", NODE);
244       putter.start();
245       remover.start();
246       sleep(5000);
247       log("stopping Putter");
248       putter.looping=false;
249       log("stopping Remover");
250       remover.looping=false;
251       putter.join();
252       remover.join();
253    }
254
255
256    class GenericThread extends Thread JavaDoc {
257       protected Transaction JavaDoc tx;
258       protected boolean looping=true;
259
260       public GenericThread() {
261
262       }
263
264       public GenericThread(String JavaDoc name) {
265          super(name);
266       }
267
268       public void setLooping(boolean looping) {
269          this.looping=looping;
270       }
271
272       public void run() {
273          try {
274             _run();
275          }
276          catch(Exception JavaDoc t) {
277             System.out.println(getName() + ": " + t);
278             if(thread_ex == null)
279                thread_ex=t;
280          }
281          if(log.isTraceEnabled())
282             log.trace("Thread " + getName() + " terminated");
283       }
284
285       protected void _run() throws Exception JavaDoc {
286          throw new UnsupportedOperationException JavaDoc();
287       }
288    }
289
290
291    class ContinuousRemover extends GenericThread {
292       Fqn fqn;
293
294       public ContinuousRemover(String JavaDoc name, Fqn fqn) {
295          super(name);
296          this.fqn=fqn;
297       }
298
299
300       protected void _run() throws Exception JavaDoc {
301          while(thread_ex == null && looping) {
302             try {
303                if(interrupted())
304                   break;
305                tx=startTransaction();
306                log("remove(" + fqn + ")");
307                cache.remove(fqn);
308                sleep(random(20));
309                tx.commit();
310             }
311             catch(InterruptedException JavaDoc interrupted) {
312                tx.rollback();
313                break;
314             }
315             catch(Exception JavaDoc ex) {
316                tx.rollback();
317                throw ex;
318             }
319          }
320       }
321    }
322
323    class ContinuousPutter extends GenericThread {
324       Fqn fqn;
325
326       public ContinuousPutter(String JavaDoc name, Fqn fqn) {
327          super(name);
328          this.fqn=fqn;
329       }
330
331
332       protected void _run() throws Exception JavaDoc {
333          while(thread_ex == null && looping) {
334             try {
335                if(interrupted())
336                   break;
337                tx=startTransaction();
338                log("put(" + fqn + ")");
339                cache.put(fqn, "foo", "bar");
340                sleep(random(20));
341                tx.commit();
342             }
343             catch(InterruptedException JavaDoc interrupted) {
344                tx.rollback();
345                break;
346             }
347             catch(Exception JavaDoc ex) {
348                tx.rollback();
349                throw ex;
350             }
351          }
352       }
353    }
354
355    public static long random(long range) {
356       return (long)((Math.random() * 100000) % range) + 1;
357    }
358
359
360
361    class MyThread extends GenericThread {
362       Fqn fqn;
363
364
365       public MyThread(String JavaDoc name, Fqn fqn) {
366          super(name);
367          this.fqn=fqn;
368       }
369
370       protected void _run() throws Exception JavaDoc {
371          tx=startTransaction();
372          log("get(" + fqn + ")");
373          cache.get(fqn, "bla"); // acquires RL
374
log("done, locks: " + cache.printLockInfo());
375
376          synchronized(this) {wait();}
377
378          log("put(" + fqn + ")");
379          cache.put(fqn, "key", "val"); // need to upgrade RL to WL
380
log("done, locks: " + cache.printLockInfo());
381          tx.commit();
382          log("committed TX, locks: " + cache.printLockInfo());
383       }
384    }
385
386
387    class MyUpgrader extends MyThread {
388       Object JavaDoc lock;
389
390       public MyUpgrader(String JavaDoc name, Fqn fqn) {
391          super(name, fqn);
392       }
393
394       public MyUpgrader(String JavaDoc name, Fqn fqn, Object JavaDoc lock) {
395          super(name, fqn);
396          this.lock=lock;
397       }
398
399       protected void _run() throws Exception JavaDoc {
400          tx=startTransaction();
401          log("get(" + fqn + ")");
402          cache.get(fqn, "bla"); // acquires RL
403

404          synchronized(lock) {lock.wait();}
405
406          log("put(" + fqn + ")");
407          cache.put(fqn, "key", "val"); // need to upgrade RL to WL
408
log("done, locks: " + cache.printLockInfo());
409          tx.commit();
410          log("committed TX, locks: " + cache.printLockInfo());
411       }
412
413    }
414
415    class MyThreadTimeout extends MyThread {
416
417       public MyThreadTimeout(String JavaDoc name, Fqn fqn) {
418          super(name, fqn);
419       }
420
421       protected void _run() throws Exception JavaDoc {
422          try {
423             super._run();
424          }
425          catch(UpgradeException upgradeEx) {
426             log("received UpgradeException as expected");
427             tx.rollback();
428             log("rolled back TX, locks: " + cache.printLockInfo());
429          }
430          catch(TimeoutException timeoutEx) {
431             log("received TimeoutException as expected");
432             tx.rollback();
433             log("rolled back TX, locks: " + cache.printLockInfo());
434          }
435       }
436    }
437
438
439
440    class MyPutter extends GenericThread {
441       Fqn fqn1, fqn2;
442
443       public MyPutter(String JavaDoc name, Fqn fqn1, Fqn fqn2) {
444          super(name);
445          this.fqn1=fqn1;
446          this.fqn2=fqn2;
447       }
448
449       protected void _run() throws Exception JavaDoc {
450          tx=startTransaction();
451          log("put(" + fqn1 + ")");
452          cache.put(fqn1, "key", "val"); // need to upgrade RL to WL
453
log("done, locks: " + cache.printLockInfo());
454          synchronized(this) {wait();}
455          log("put(" + fqn2 + ")");
456          cache.put(fqn2, "key", "val"); // need to upgrade RL to WL
457
log("done, locks: " + cache.printLockInfo());
458          tx.commit();
459          log("committed TX, locks: " + cache.printLockInfo());
460       }
461    }
462
463    class MyPutterTimeout extends MyPutter {
464
465       public MyPutterTimeout(String JavaDoc name, Fqn fqn1, Fqn fqn2) {
466          super(name, fqn1, fqn2);
467       }
468
469       protected void _run() throws Exception JavaDoc {
470          try {
471             super._run();
472          }
473          catch(TimeoutException timeoutEx) {
474             log("received TimeoutException as expected");
475             tx.rollback();
476             log("rolled back TX, locks: " + cache.printLockInfo());
477          }
478       }
479    }
480
481
482
483    private static void log(String JavaDoc msg) {
484       System.out.println(Thread.currentThread().getName() + ": " + msg);
485    }
486
487    private static void sleep(long timeout) {
488       try {
489          Thread.sleep(timeout);
490       }
491       catch(InterruptedException JavaDoc e) {
492       }
493    }
494
495
496
497    Transaction JavaDoc startTransaction() throws SystemException JavaDoc, NotSupportedException JavaDoc {
498       DummyTransactionManager mgr=DummyTransactionManager.getInstance();
499       mgr.begin();
500       Transaction JavaDoc tx=mgr.getTransaction();
501       return tx;
502    }
503
504
505
506    public static Test suite() throws Exception JavaDoc {
507       return new TestSuite(DeadlockTest.class);
508    }
509
510    public static void main(String JavaDoc[] args) throws Exception JavaDoc {
511       junit.textui.TestRunner.run(suite());
512    }
513
514
515 }
516
Popular Tags