KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jboss > cache > transaction > DeadlockTest


1 /*
2  *
3  * JBoss, the OpenSource J2EE webOS
4  *
5  * Distributable under LGPL license.
6  * See terms of license at gnu.org.
7  */

8
9 package org.jboss.cache.transaction;
10
import junit.framework.Test;
import junit.framework.TestCase;
import junit.framework.TestSuite;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jboss.cache.CacheException;
import org.jboss.cache.CacheImpl;
import org.jboss.cache.Fqn;
import org.jboss.cache.config.Configuration;
import org.jboss.cache.lock.IsolationLevel;
import org.jboss.cache.lock.TimeoutException;
import org.jboss.cache.lock.UpgradeException;
import org.jboss.cache.misc.TestingUtil;

import javax.transaction.NotSupportedException;
import javax.transaction.SystemException;
import javax.transaction.Transaction;
import java.util.concurrent.CountDownLatch;
28
29 /**
30  * Tests transactional access to a local CacheImpl, with concurrent (deadlock-prone) access.
31  * Note: we use DummpyTranasctionManager to replace jta
32  *
33  * @version $Id: DeadlockTest.java,v 1.8 2006/12/30 17:49:53 msurtani Exp $
34  */

35 public class DeadlockTest extends TestCase
36 {
37    CacheImpl cache = null;
38    Exception JavaDoc thread_ex;
39
40    final Fqn NODE = Fqn.fromString("/a/b/c");
41    final Fqn PARENT_NODE = Fqn.fromString("/a/b");
42    final Fqn FQN1 = NODE;
43    final Fqn FQN2 = Fqn.fromString("/1/2/3");
44    final Log log = LogFactory.getLog(DeadlockTest.class);
45
46
47    public DeadlockTest(String JavaDoc name)
48    {
49       super(name);
50    }
51
52    public void setUp() throws Exception JavaDoc
53    {
54       super.setUp();
55       DummyTransactionManager.getInstance();
56       cache = new CacheImpl();
57       cache.getConfiguration().setInitialStateRetrievalTimeout(10000);
58       cache.getConfiguration().setClusterName("test");
59       cache.getConfiguration().setCacheMode(Configuration.CacheMode.LOCAL);
60       cache.getConfiguration().setTransactionManagerLookupClass("org.jboss.cache.DummyTransactionManagerLookup");
61       cache.getConfiguration().setIsolationLevel(IsolationLevel.REPEATABLE_READ);
62       cache.getConfiguration().setLockAcquisitionTimeout(3000);
63       cache.create();
64       cache.start();
65       thread_ex = null;
66    }
67
68
69    public void tearDown() throws Exception JavaDoc
70    {
71       super.tearDown();
72       if (cache != null)
73       {
74          cache.stop();
75       }
76       if (thread_ex != null)
77       {
78          throw thread_ex;
79       }
80    }
81
82
83    public void testConcurrentUpgrade() throws CacheException, InterruptedException JavaDoc
84    {
85       MyThread t1 = new MyThreadTimeout("MyThread#1", NODE);
86       MyThread t2 = new MyThread("MyThread#2", NODE);
87
88       cache.put(NODE, null);
89
90       t1.start();
91       t2.start();
92
93       TestingUtil.sleepThread((long) 5000);
94
95       synchronized (t1)
96       {
97          t1.notify();// t1 will now try to upgrade RL to WL, but fails b/c t2 still has a RL
98
}
99
100       TestingUtil.sleepThread((long) 5000);
101
102       synchronized (t2)
103       {
104          t2.notify();// t1 should now be able to upgrade because t1 was rolled back (RL was removed)
105
}
106
107       t1.join();
108       t2.join();
109    }
110
111
112    /**
113     * Typical deadlock: t1 acquires WL on /a/b/c, t2 WL on /1/2/3, then t1 attempts to get WL on /1/2/3 (locked by t2),
114     * and t2 tries to acquire WL on /a/b/c. One (or both) of the 2 transactions is going to timeout and roll back.
115     */

116    public void testPutDeadlock() throws CacheException, InterruptedException JavaDoc
117    {
118       MyPutter t1 = new MyPutterTimeout("MyPutter#1", FQN1, FQN2);
119       MyPutter t2 = new MyPutter("MyPutter#2", FQN2, FQN1);
120
121       cache.put(FQN1, null);
122       cache.put(FQN2, null);
123
124       t1.start();
125       t2.start();
126
127       TestingUtil.sleepThread((long) 1000);
128
129       synchronized (t1)
130       {
131          t1.notify();// t1 will now try to acquire WL on /1/2/3 (held by t2) - this will time out
132
}
133
134       TestingUtil.sleepThread((long) 1000);
135
136       synchronized (t2)
137       {
138          t2.notify();// t2 tries to acquire WL on /a/b/c (held by t1)
139
}
140
141       t1.join();
142       t2.join();
143    }
144
145    /*
146
147     Commented out since JBCACHE-875 onwards, this will end up in a classic deadlock since we lock parent when removing a child.
148
149       public void testCreateIfNotExistsLogic() throws CacheException, InterruptedException
150       {
151          cache.put(NODE, null);
152
153          class T0 extends GenericThread
154          {
155             public T0(String name)
156             {
157                super(name);
158             }
159
160             protected void _run() throws Exception
161             {
162                Transaction myTx = startTransaction();
163                log("put(" + NODE + ")");
164                cache.put(NODE, null);
165                log("put(" + NODE + "): OK");
166
167                synchronized (this) {wait();}
168
169                log("remove(" + NODE + ")");
170                cache.remove(NODE);
171                log("remove(" + NODE + "): OK");
172
173                log("committing TX");
174                myTx.commit();
175             }
176          }
177
178          class T1 extends GenericThread
179          {
180             public T1(String name)
181             {
182                super(name);
183             }
184
185             protected void _run() throws Exception
186             {
187                Transaction myTx = startTransaction();
188                log("put(" + NODE + ")");
189                cache.put(NODE, null);
190                log("put(" + NODE + "): OK");
191
192                log("committing TX");
193                myTx.commit();
194             }
195
196          }
197
198          T0 t0 = new T0("T0");
199          t0.start();
200          TestingUtil.sleepThread((long) 500);
201          T1 t1 = new T1("T1");
202          t1.start();
203          TestingUtil.sleepThread((long) 500);
204          synchronized (t0)
205          {
206             t0.notify();
207          }
208          t0.join();
209          t1.join();
210       }
211
212    */

213
214    public void testMoreThanOneUpgrader() throws Exception JavaDoc
215    {
216       final int NUM = 2;
217       final Object JavaDoc lock = new Object JavaDoc();
218
219       cache.put(NODE, "bla", "blo");
220
221       MyUpgrader[] upgraders = new MyUpgrader[NUM];
222       for (int i = 0; i < upgraders.length; i++)
223       {
224          upgraders[i] = new MyUpgrader("Upgrader#" + i, NODE, lock);
225          upgraders[i].start();
226       }
227
228       TestingUtil.sleepThread((long) 1000);
229       log("locks: " + cache.printLockInfo());
230
231       synchronized (lock)
232       {
233          lock.notifyAll();
234       }
235
236       // all threads now try to upgrade the RL to a WL
237
for (int i = 0; i < upgraders.length; i++)
238       {
239          MyThread upgrader = upgraders[i];
240          upgrader.join();
241       }
242    }
243
244
245    public void testPutsAndRemovesOnParentAndChildNodes() throws InterruptedException JavaDoc
246    {
247       ContinuousPutter putter = new ContinuousPutter("Putter", NODE);
248       ContinuousRemover remover = new ContinuousRemover("Remover", PARENT_NODE);
249       putter.start();
250       remover.start();
251       TestingUtil.sleepThread((long) 5000);
252       log("stopping Putter");
253       putter.looping = false;
254       log("stopping Remover");
255       remover.looping = false;
256       putter.join();
257       remover.join();
258    }
259
260    public void testPutsAndRemovesOnParentAndChildNodesReversed() throws InterruptedException JavaDoc
261    {
262       ContinuousPutter putter = new ContinuousPutter("Putter", PARENT_NODE);
263       ContinuousRemover remover = new ContinuousRemover("Remover", NODE);
264       putter.start();
265       remover.start();
266       TestingUtil.sleepThread((long) 5000);
267       log("stopping Putter");
268       putter.looping = false;
269       log("stopping Remover");
270       remover.looping = false;
271       putter.join();
272       remover.join();
273    }
274
275    public void testPutsAndRemovesOnSameNode() throws InterruptedException JavaDoc
276    {
277       ContinuousPutter putter = new ContinuousPutter("Putter", NODE);
278       ContinuousRemover remover = new ContinuousRemover("Remover", NODE);
279       putter.start();
280       remover.start();
281       TestingUtil.sleepThread((long) 5000);
282       log("stopping Putter");
283       putter.looping = false;
284       log("stopping Remover");
285       remover.looping = false;
286       putter.join();
287       remover.join();
288    }
289
290
291    class GenericThread extends Thread JavaDoc
292    {
293       protected Transaction JavaDoc tx;
294       protected boolean looping = true;
295
296       public GenericThread()
297       {
298
299       }
300
301       public GenericThread(String JavaDoc name)
302       {
303          super(name);
304       }
305
306       public void setLooping(boolean looping)
307       {
308          this.looping = looping;
309       }
310
311       public void run()
312       {
313          try
314          {
315             _run();
316          }
317          catch (Exception JavaDoc t)
318          {
319             System.out.println(getName() + ": " + t);
320             if (thread_ex == null)
321             {
322                thread_ex = t;
323             }
324          }
325          if (log.isTraceEnabled())
326          {
327             log.trace("Thread " + getName() + " terminated");
328          }
329       }
330
331       protected void _run() throws Exception JavaDoc
332       {
333          throw new UnsupportedOperationException JavaDoc();
334       }
335    }
336
337
338    class ContinuousRemover extends GenericThread
339    {
340       Fqn fqn;
341
342       public ContinuousRemover(String JavaDoc name, Fqn fqn)
343       {
344          super(name);
345          this.fqn = fqn;
346       }
347
348
349       protected void _run() throws Exception JavaDoc
350       {
351          while (thread_ex == null && looping)
352          {
353             try
354             {
355                if (interrupted())
356                {
357                   break;
358                }
359                tx = startTransaction();
360                log("remove(" + fqn + ")");
361                cache.remove(fqn);
362                sleep(random(20));
363                tx.commit();
364             }
365             catch (InterruptedException JavaDoc interrupted)
366             {
367                tx.rollback();
368                break;
369             }
370             catch (Exception JavaDoc ex)
371             {
372                tx.rollback();
373                throw ex;
374             }
375          }
376       }
377    }
378
379    class ContinuousPutter extends GenericThread
380    {
381       Fqn fqn;
382
383       public ContinuousPutter(String JavaDoc name, Fqn fqn)
384       {
385          super(name);
386          this.fqn = fqn;
387       }
388
389
390       protected void _run() throws Exception JavaDoc
391       {
392          while (thread_ex == null && looping)
393          {
394             try
395             {
396                if (interrupted())
397                {
398                   break;
399                }
400                tx = startTransaction();
401                log("put(" + fqn + ")");
402                cache.put(fqn, "foo", "bar");
403                sleep(random(20));
404                tx.commit();
405             }
406             catch (InterruptedException JavaDoc interrupted)
407             {
408                tx.rollback();
409                break;
410             }
411             catch (Exception JavaDoc ex)
412             {
413                tx.rollback();
414                throw ex;
415             }
416          }
417       }
418    }
419
420    public static long random(long range)
421    {
422       return (long) ((Math.random() * 100000) % range) + 1;
423    }
424
425
426    class MyThread extends GenericThread
427    {
428       Fqn fqn;
429
430
431       public MyThread(String JavaDoc name, Fqn fqn)
432       {
433          super(name);
434          this.fqn = fqn;
435       }
436
437       protected void _run() throws Exception JavaDoc
438       {
439          tx = startTransaction();
440          log("get(" + fqn + ")");
441          cache.get(fqn, "bla");// acquires RL
442
log("done, locks: " + cache.printLockInfo());
443
444          synchronized (this) {wait();}
445
446          log("put(" + fqn + ")");
447          cache.put(fqn, "key", "val");// need to upgrade RL to WL
448
log("done, locks: " + cache.printLockInfo());
449          tx.commit();
450          log("committed TX, locks: " + cache.printLockInfo());
451       }
452    }
453
454
455    class MyUpgrader extends MyThread
456    {
457       Object JavaDoc lock;
458
459       public MyUpgrader(String JavaDoc name, Fqn fqn)
460       {
461          super(name, fqn);
462       }
463
464       public MyUpgrader(String JavaDoc name, Fqn fqn, Object JavaDoc lock)
465       {
466          super(name, fqn);
467          this.lock = lock;
468       }
469
470       protected void _run() throws Exception JavaDoc
471       {
472          tx = startTransaction();
473          try
474          {
475             log("get(" + fqn + ")");
476
477             cache.get(fqn, "bla");// acquires RL
478

479             synchronized (lock) {lock.wait();}
480
481             log("put(" + fqn + ")");
482             cache.put(fqn, "key", "val");// need to upgrade RL to WL
483
log("done, locks: " + cache.printLockInfo());
484             tx.commit();
485             log("committed TX, locks: " + cache.printLockInfo());
486          }
487          catch (UpgradeException upge)
488          {
489             log("Exception upgrading lock");
490             tx.rollback();
491          }
492       }
493    }
494
495    class MyThreadTimeout extends MyThread
496    {
497
498       public MyThreadTimeout(String JavaDoc name, Fqn fqn)
499       {
500          super(name, fqn);
501       }
502
503       protected void _run() throws Exception JavaDoc
504       {
505          try
506          {
507             super._run();
508          }
509          catch (UpgradeException upgradeEx)
510          {
511             log("received UpgradeException as expected");
512             tx.rollback();
513             log("rolled back TX, locks: " + cache.printLockInfo());
514          }
515          catch (TimeoutException timeoutEx)
516          {
517             log("received TimeoutException as expected");
518             tx.rollback();
519             log("rolled back TX, locks: " + cache.printLockInfo());
520          }
521       }
522    }
523
524
525    class MyPutter extends GenericThread
526    {
527       Fqn fqn1, fqn2;
528
529       public MyPutter(String JavaDoc name, Fqn fqn1, Fqn fqn2)
530       {
531          super(name);
532          this.fqn1 = fqn1;
533          this.fqn2 = fqn2;
534       }
535
536       protected void _run() throws Exception JavaDoc
537       {
538          tx = startTransaction();
539          log("put(" + fqn1 + ")");
540          cache.put(fqn1, "key", "val");// need to upgrade RL to WL
541
log("done, locks: " + cache.printLockInfo());
542          synchronized (this) {wait();}
543          log("put(" + fqn2 + ")");
544          cache.put(fqn2, "key", "val");// need to upgrade RL to WL
545
log("done, locks: " + cache.printLockInfo());
546          tx.commit();
547          log("committed TX, locks: " + cache.printLockInfo());
548       }
549    }
550
551    class MyPutterTimeout extends MyPutter
552    {
553
554       public MyPutterTimeout(String JavaDoc name, Fqn fqn1, Fqn fqn2)
555       {
556          super(name, fqn1, fqn2);
557       }
558
559       protected void _run() throws Exception JavaDoc
560       {
561          try
562          {
563             super._run();
564          }
565          catch (TimeoutException timeoutEx)
566          {
567             log("received TimeoutException as expected");
568             tx.rollback();
569             log("rolled back TX, locks: " + cache.printLockInfo());
570          }
571       }
572    }
573
574
575    private static void log(String JavaDoc msg)
576    {
577       System.out.println(Thread.currentThread().getName() + ": " + msg);
578    }
579
580
581    Transaction JavaDoc startTransaction() throws SystemException JavaDoc, NotSupportedException JavaDoc
582    {
583       DummyTransactionManager mgr = DummyTransactionManager.getInstance();
584       mgr.begin();
585       Transaction JavaDoc tx = mgr.getTransaction();
586       return tx;
587    }
588
589
590    public static Test suite() throws Exception JavaDoc
591    {
592       return new TestSuite(DeadlockTest.class);
593    }
594
595    public static void main(String JavaDoc[] args) throws Exception JavaDoc
596    {
597       junit.textui.TestRunner.run(suite());
598    }
599
600
601 }
602
Popular Tags