KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jboss > cache > replicated > SyncReplTxTest


1 /*
2  *
3  * JBoss, the OpenSource J2EE webOS
4  *
5  * Distributable under LGPL license.
6  * See terms of license at gnu.org.
7  */

8 package org.jboss.cache.replicated;
9
10 import EDU.oswego.cs.dl.util.concurrent.FIFOSemaphore;
11 import junit.framework.Test;
12 import junit.framework.TestCase;
13 import junit.framework.TestSuite;
14 import org.apache.commons.logging.Log;
15 import org.apache.commons.logging.LogFactory;
16 import org.jboss.cache.AbstractCacheListener;
17 import org.jboss.cache.Cache;
18 import org.jboss.cache.CacheException;
19 import org.jboss.cache.CacheImpl;
20 import org.jboss.cache.Fqn;
21 import org.jboss.cache.config.Configuration;
22 import org.jboss.cache.lock.IsolationLevel;
23 import org.jboss.cache.lock.TimeoutException;
24 import org.jboss.cache.misc.TestingUtil;
25 import org.jboss.cache.transaction.DummyTransactionManager;
26
27 import javax.naming.Context JavaDoc;
28 import javax.transaction.NotSupportedException JavaDoc;
29 import javax.transaction.RollbackException JavaDoc;
30 import javax.transaction.Status JavaDoc;
31 import javax.transaction.Synchronization JavaDoc;
32 import javax.transaction.SystemException JavaDoc;
33 import javax.transaction.Transaction JavaDoc;
34 import javax.transaction.TransactionManager JavaDoc;
35 import java.util.ArrayList JavaDoc;
36 import java.util.List JavaDoc;
37 import java.util.Map JavaDoc;
38
39 /**
40  * Replicated unit test for sync transactional CacheImpl
41  * Note: we use DummyTransactionManager for Tx purpose instead of relying on
42  * jta.
43  *
44  * @version $Revision: 1.18 $
45  */

46 public class SyncReplTxTest extends TestCase
47 {
48    private static Log log = LogFactory.getLog(SyncReplTxTest.class);
49    private CacheImpl cache1;
50    private CacheImpl cache2;
51
52    private String JavaDoc old_factory = null;
53    private final String JavaDoc FACTORY = "org.jboss.cache.transaction.DummyContextFactory";
54    private FIFOSemaphore lock = new FIFOSemaphore(1);
55    private DummyTransactionManager tx_mgr;
56    private Throwable JavaDoc t1_ex;
57    private Throwable JavaDoc t2_ex;
58
59
60    public SyncReplTxTest(String JavaDoc name)
61    {
62       super(name);
63    }
64
65    public void setUp() throws Exception JavaDoc
66    {
67       super.setUp();
68       old_factory = System.getProperty(Context.INITIAL_CONTEXT_FACTORY);
69       System.setProperty(Context.INITIAL_CONTEXT_FACTORY, FACTORY);
70       t1_ex = t2_ex = null;
71    }
72
73    public void tearDown() throws Exception JavaDoc
74    {
75       super.tearDown();
76       DummyTransactionManager.destroy();
77       destroyCaches();
78       if (old_factory != null)
79       {
80          System.setProperty(Context.INITIAL_CONTEXT_FACTORY, old_factory);
81          old_factory = null;
82       }
83    }
84
85    private Transaction JavaDoc beginTransaction() throws SystemException JavaDoc, NotSupportedException JavaDoc
86    {
87       DummyTransactionManager mgr = DummyTransactionManager.getInstance();
88       mgr.begin();
89       return mgr.getTransaction();
90    }
91
92    private void initCaches(Configuration.CacheMode caching_mode) throws Exception JavaDoc
93    {
94       cache1 = new CacheImpl();
95       cache2 = new CacheImpl();
96       cache1.getConfiguration().setCacheMode(caching_mode);
97       cache2.getConfiguration().setCacheMode(caching_mode);
98       cache1.getConfiguration().setIsolationLevel(IsolationLevel.SERIALIZABLE);
99       cache2.getConfiguration().setIsolationLevel(IsolationLevel.SERIALIZABLE);
100
101       cache1.getConfiguration().setTransactionManagerLookupClass("org.jboss.cache.DummyTransactionManagerLookup");
102       cache2.getConfiguration().setTransactionManagerLookupClass("org.jboss.cache.DummyTransactionManagerLookup");
103       cache1.getConfiguration().setLockAcquisitionTimeout(5000);
104       cache2.getConfiguration().setLockAcquisitionTimeout(5000);
105
106       configureMultiplexer(cache1);
107       configureMultiplexer(cache2);
108
109       cache1.start();
110       cache2.start();
111
112       validateMultiplexer(cache1);
113       validateMultiplexer(cache2);
114    }
115
116    /**
117     * Provides a hook for multiplexer integration. This default implementation
118     * is a no-op; subclasses that test mux integration would override
119     * to integrate the given cache with a multiplexer.
120     * <p/>
121     * param cache a cache that has been configured but not yet created.
122     */

123    protected void configureMultiplexer(Cache cache) throws Exception JavaDoc
124    {
125       // default does nothing
126
}
127
128    /**
129     * Provides a hook to check that the cache's channel came from the
130     * multiplexer, or not, as expected. This default impl asserts that
131     * the channel did not come from the multiplexer.
132     *
133     * @param cache a cache that has already been started
134     */

135    protected void validateMultiplexer(Cache cache)
136    {
137       assertFalse("Cache is not using multiplexer", cache.getConfiguration().isUsingMultiplexer());
138    }
139
140    private void destroyCaches()
141    {
142       if (cache1 != null)
143          cache1.stop();
144       if (cache2 != null)
145          cache2.stop();
146       cache1 = null;
147       cache2 = null;
148    }
149
150    public void testLockRemoval() throws Exception JavaDoc
151    {
152       initCaches(Configuration.CacheMode.REPL_SYNC);
153       cache1.getConfiguration().setSyncCommitPhase(true);
154       cache1.releaseAllLocks("/");
155       Transaction JavaDoc tx = beginTransaction();
156       cache1.put("/bela/ban", "name", "Bela Ban");
157       assertEquals(3, cache1.getNumberOfLocksHeld());
158       assertEquals(0, cache2.getNumberOfLocksHeld());
159       tx.commit();
160       assertEquals(0, cache1.getNumberOfLocksHeld());
161       assertEquals(0, cache2.getNumberOfLocksHeld());
162    }
163
164
165    public void testSyncRepl() throws Exception JavaDoc
166    {
167       Integer JavaDoc age;
168       Transaction JavaDoc tx;
169
170       try
171       {
172          initCaches(Configuration.CacheMode.REPL_SYNC);
173          cache1.getConfiguration().setSyncCommitPhase(true);
174          cache2.getConfiguration().setSyncCommitPhase(true);
175
176          // assertEquals(2, cache1.getMembers().size());
177

178          tx = beginTransaction();
179          cache1.put("/a/b/c", "age", 38);
180          TransactionManager mgr = cache1.getTransactionManager();
181          tx = mgr.suspend();
182          assertNull("age on cache2 must be null as the TX has not yet been committed", cache2.get("/a/b/c", "age"));
183          log.debug("cache1: locks held before commit: " + cache1.printLockInfo());
184          log.debug("cache2: locks held before commit: " + cache2.printLockInfo());
185          mgr.resume(tx);
186          tx.commit();
187          log.debug("cache1: locks held after commit: " + cache1.printLockInfo());
188          log.debug("cache2: locks held after commit: " + cache2.printLockInfo());
189
190          // value on cache2 must be 38
191
age = (Integer JavaDoc) cache2.get("/a/b/c", "age");
192          assertNotNull("\"age\" obtained from cache2 must be non-null ", age);
193          assertTrue("\"age\" must be 38", age == 38);
194       }
195       catch (Exception JavaDoc e)
196       {
197          fail(e.toString());
198       }
199    }
200
201    /**
202     * @throws Exception
203     */

204    public void testSimplePut() throws Exception JavaDoc
205    {
206       initCaches(Configuration.CacheMode.REPL_SYNC);
207
208       cache1.put("/JSESSION/localhost/192.168.1.10:32882/Courses/0", "Instructor", "Ben Wang");
209
210       cache1.put("/JSESSION/localhost/192.168.1.10:32882/1", "Number", 10);
211    }
212
213
214    public void testSimpleTxPut() throws Exception JavaDoc
215    {
216       Transaction JavaDoc tx;
217       final Fqn NODE1 = Fqn.fromString("/one/two/three");
218       initCaches(Configuration.CacheMode.REPL_SYNC);
219
220       tx = beginTransaction();
221       cache1.put(NODE1, "age", 38);
222       System.out.println("TransactionTable for cache1 after cache1.put():\n" + cache1.getTransactionTable().toString(true));
223       tx.commit();
224
225       /*
226       tx=beginTransaction();
227       cache1.put(NODE1, "age", new Integer(38));
228       cache1.put(NODE2, "name", "Ben of The Far East");
229       cache1.put(NODE3, "key", "UnknowKey");
230       System.out.println("TransactionTable for cache1 after cache1.put():\n" + cache1.getTransactionTable().toString(true));
231
232       tx.commit();
233       */

234
235       /*
236       tx=beginTransaction();
237       cache1.put(NODE1, "age", new Integer(38));
238       cache1.put(NODE1, "AOPInstance", new AOPInstance());
239       cache1.put(NODE2, "AOPInstance", new AOPInstance());
240       cache1.put(NODE1, "AOPInstance", new AOPInstance());
241       tx.commit();
242       System.out.println("TransactionTable for cache1 after cache1.put():\n" + cache1.getTransactionTable().toString(true));
243       */

244    }
245
246    public void testSyncReplWithModficationsOnBothCaches() throws Exception JavaDoc
247    {
248       Integer JavaDoc age;
249       Transaction JavaDoc tx;
250       final Fqn NODE1 = Fqn.fromString("/one/two/three");
251       final Fqn NODE2 = Fqn.fromString("/eins/zwei/drei");
252
253       initCaches(Configuration.CacheMode.REPL_SYNC);
254
255       // create roots first
256
cache1.put("/one/two", null);
257       cache2.put("/eins/zwei", null);
258
259       cache1.getConfiguration().setSyncCommitPhase(true);
260       cache2.getConfiguration().setSyncCommitPhase(true);
261
262       tx = beginTransaction();
263       cache1.put(NODE1, "age", 38);
264       System.out.println("TransactionTable for cache1 after cache1.put():\n" + cache1.getTransactionTable().toString(true));
265
266       cache2.put(NODE2, "age", 39);
267       System.out.println("TransactionTable for cache2 after cache2.put():\n" + cache2.getTransactionTable().toString(true));
268
269       System.out.println("cache1 before commit:\n" + cache1.printLockInfo());
270       System.out.println("cache2 before commit:\n" + cache2.printLockInfo());
271
272       try
273       {
274          tx.commit();
275          fail("Should not succeed with SERIALIZABLE semantics");
276       }
277       catch (Exception JavaDoc e)
278       {
279          //should be a classic deadlock here.
280
}
281
282       System.out.println("cache1 after commit:\n" + cache1.printLockInfo());
283       System.out.println("cache2 after commit:\n" + cache2.printLockInfo());
284
285       /*
286       assertTrue(cache1.exists(NODE1));
287       assertTrue(cache1.exists(NODE2));
288       assertTrue(cache1.exists(NODE1));
289       assertTrue(cache2.exists(NODE2));
290
291       age = (Integer) cache1.get(NODE1, "age");
292       assertNotNull("\"age\" obtained from cache1 for " + NODE1 + " must be non-null ", age);
293       assertTrue("\"age\" must be 38", age == 38);
294
295       age = (Integer) cache2.get(NODE1, "age");
296       assertNotNull("\"age\" obtained from cache2 for " + NODE1 + " must be non-null ", age);
297       assertTrue("\"age\" must be 38", age == 38);
298
299       age = (Integer) cache1.get(NODE2, "age");
300       assertNotNull("\"age\" obtained from cache1 for " + NODE2 + " must be non-null ", age);
301       assertTrue("\"age\" must be 39", age == 39);
302
303       age = (Integer) cache2.get(NODE2, "age");
304       assertNotNull("\"age\" obtained from cache2 for " + NODE2 + " must be non-null ", age);
305       assertTrue("\"age\" must be 39", age == 39);
306       */

307
308       assertEquals(0, cache1.getNumberOfLocksHeld());
309       assertEquals(0, cache2.getNumberOfLocksHeld());
310       System.out.println("TransactionTable for cache1:\n" + cache1.getTransactionTable().toString(true));
311       System.out.println("TransactionTable for cache2:\n" + cache2.getTransactionTable().toString(true));
312    }
313
314    public void testSyncReplWithModficationsOnBothCachesSameData() throws Exception JavaDoc
315    {
316       Transaction JavaDoc tx;
317       final Fqn NODE = Fqn.fromString("/one/two/three");
318       initCaches(Configuration.CacheMode.REPL_SYNC);
319       tx = beginTransaction();
320       cache1.put(NODE, "age", 38);
321       System.out.println("TransactionTable for cache1 after cache1.put():\n" + cache1.getTransactionTable().toString(true));
322
323       cache2.put(NODE, "age", 39);
324       System.out.println("TransactionTable for cache2 after cache2.put():\n" + cache2.getTransactionTable().toString(true));
325
326       System.out.println("cache1 before commit:\n" + cache1.printLockInfo());
327       System.out.println("cache2 before commit:\n" + cache2.printLockInfo());
328
329       try
330       {
331          tx.commit();
332          fail("commit should throw a RollbackException, we should not get here");
333       }
334       catch (RollbackException JavaDoc rollback)
335       {
336          System.out.println("Transaction was rolled back, this is correct");
337       }
338
339       System.out.println("cache1 after commit:\n" + cache1.printLockInfo());
340       System.out.println("cache2 after commit:\n" + cache2.printLockInfo());
341
342       assertEquals(0, cache1.getNumberOfLocksHeld());
343       assertEquals(0, cache2.getNumberOfLocksHeld());
344
345       assertEquals(0, cache1.getNumberOfNodes());
346       assertEquals(0, cache2.getNumberOfNodes());
347    }
348
349
350    public void testSyncReplWithModficationsOnBothCachesWithRollback() throws Exception JavaDoc
351    {
352       Transaction JavaDoc tx;
353       final Fqn NODE1 = Fqn.fromString("/one/two/three");
354       final Fqn NODE2 = Fqn.fromString("/eins/zwei/drei");
355
356       initCaches(Configuration.CacheMode.REPL_SYNC);
357
358       cache1.getConfiguration().setSyncRollbackPhase(true);
359       cache2.getConfiguration().setSyncRollbackPhase(true);
360
361       tx = beginTransaction();
362       cache1.put(NODE1, "age", 38);
363       cache2.put(NODE2, "age", 39);
364
365       System.out.println("cache1 (before commit):\n" + cache1.printLockInfo());
366       System.out.println("cache2 (before commit):\n" + cache2.printLockInfo());
367
368       // this will rollback the transaction
369
tx.registerSynchronization(new TransactionAborter(tx));
370
371       try
372       {
373          tx.commit();
374          fail("commit should throw a RollbackException, we should not get here");
375       }
376       catch (RollbackException JavaDoc rollback)
377       {
378          System.out.println("Transaction was rolled back, this is correct");
379       }
380
381       System.out.println("cache1 (after rollback):\n" + cache1.printLockInfo());
382       System.out.println("cache2 (after rollback):\n" + cache2.printLockInfo());
383
384       assertEquals(0, cache1.getNumberOfLocksHeld());
385       assertEquals(0, cache2.getNumberOfLocksHeld());
386
387       assertEquals(0, cache1.getNumberOfNodes());
388       assertEquals(0, cache2.getNumberOfNodes());
389    }
390
391
392    /**
393     * Test for JBCACHE-359 -- does a callback into cache from a listener
394     * interfere with transaction rollback.
395     *
396     * @throws Exception
397     */

398    public void testSyncReplWithRollbackAndListener() throws Exception JavaDoc
399    {
400       Transaction JavaDoc tx;
401       final Fqn NODE1 = Fqn.fromString("/one/two/three");
402
403       initCaches(Configuration.CacheMode.REPL_SYNC);
404
405       cache1.getConfiguration().setSyncRollbackPhase(true);
406       cache2.getConfiguration().setSyncRollbackPhase(true);
407
408       // Test with a rollback on the sending side
409

410       CallbackListener cbl1 = new CallbackListener(cache1, "age");
411       CallbackListener cbl2 = new CallbackListener(cache2, "age");
412
413       tx = beginTransaction();
414       cache1.put(NODE1, "age", 38);
415
416       System.out.println("cache1 (before commit):\n" + cache1.printLockInfo());
417       System.out.println("cache2 (before commit):\n" + cache2.printLockInfo());
418
419       // this will rollback the transaction
420
tx.registerSynchronization(new TransactionAborter(tx));
421
422       try
423       {
424          tx.commit();
425          fail("commit should throw a RollbackException, we should not get here");
426       }
427       catch (RollbackException JavaDoc rollback)
428       {
429          rollback.printStackTrace();
430          System.out.println("Transaction was rolled back, this is correct");
431       }
432
433       // Sleep, as the rollback call to cache2 is async
434
TestingUtil.sleepThread(1000);
435
436       System.out.println("cache1 (after rollback):\n" + cache1.printLockInfo());
437       System.out.println("cache2 (after rollback):\n" + cache2.printLockInfo());
438
439       assertNull(cbl1.getCallbackException());
440       assertNull(cbl2.getCallbackException());
441
442       assertEquals(0, cache1.getNumberOfLocksHeld());
443       assertEquals(0, cache2.getNumberOfLocksHeld());
444
445       assertEquals(0, cache1.getNumberOfNodes());
446       assertEquals(0, cache2.getNumberOfNodes());
447
448       // Test with a rollback on the receiving side
449

450       cache2.getNotifier().removeCacheListener(cbl2);
451       // listener aborts any active tx
452
cbl2 = new TransactionAborterCallbackListener(cache2, "age");
453
454       tx = beginTransaction();
455       cache1.put(NODE1, "age", 38);
456
457       System.out.println("cache1 (before commit):\n" + cache1.printLockInfo());
458       System.out.println("cache2 (before commit):\n" + cache2.printLockInfo());
459
460       tx.commit();
461
462       // Sleep, as the commit call to cache2 is async
463
TestingUtil.sleepThread(1000);
464
465       System.out.println("cache1 (after rollback):\n" + cache1.printLockInfo());
466       System.out.println("cache2 (after rollback):\n" + cache2.printLockInfo());
467
468       assertNull(cbl1.getCallbackException());
469       assertNull(cbl2.getCallbackException());
470
471       assertEquals(0, cache1.getNumberOfLocksHeld());
472       assertEquals(0, cache2.getNumberOfLocksHeld());
473
474       // cache1 didn't fail, so should have 3 nodes
475
assertEquals(3, cache1.getNumberOfNodes());
476       assertEquals(0, cache2.getNumberOfNodes());
477
478    }
479
480
481    /**
482     * Test for JBCACHE-361 -- does marking a tx on the remote side
483     * rollback-only cause a rollback on the originating side?
484     *
485     * @throws Exception
486     */

487    public void testSyncReplWithRemoteRollback() throws Exception JavaDoc
488    {
489       Transaction JavaDoc tx;
490       final Fqn NODE1 = Fqn.fromString("/one/two/three");
491
492       initCaches(Configuration.CacheMode.REPL_SYNC);
493
494       cache1.getConfiguration().setSyncRollbackPhase(true);
495       cache2.getConfiguration().setSyncRollbackPhase(true);
496
497       // Test with a rollback on the remote side
498

499       // listener aborts any active tx
500
TransactionAborterListener tal = new TransactionAborterListener(cache2);
501
502       tx = beginTransaction();
503       cache1.put(NODE1, "age", 38);
504
505       System.out.println("cache1 (before commit):\n" + cache1.printLockInfo());
506       System.out.println("cache2 (before commit):\n" + cache2.printLockInfo());
507
508       try
509       {
510          tx.commit();
511          fail("commit should throw a RollbackException, we should not get here");
512       }
513       catch (RollbackException JavaDoc rollback)
514       {
515          System.out.println("Transaction was rolled back, this is correct");
516       }
517
518       // Sleep, as the commit call to cache2 is async
519
TestingUtil.sleepThread(1000);
520
521       System.out.println("cache1 (after rollback):\n" + cache1.printLockInfo());
522       System.out.println("cache2 (after rollback):\n" + cache2.printLockInfo());
523
524       assertNull(tal.getCallbackException());
525
526       assertEquals(0, cache1.getNumberOfLocksHeld());
527       assertEquals(0, cache2.getNumberOfLocksHeld());
528
529       assertEquals(0, cache1.getNumberOfNodes());
530       assertEquals(0, cache2.getNumberOfNodes());
531
532    }
533
534
535    public void testASyncRepl() throws Exception JavaDoc
536    {
537       Integer JavaDoc age;
538       Transaction JavaDoc tx;
539
540       initCaches(Configuration.CacheMode.REPL_ASYNC);
541
542       tx = beginTransaction();
543       cache1.put("/a/b/c", "age", 38);
544       Thread.sleep(1000);
545       assertNull("age on cache2 must be null as the TX has not yet been committed", cache2.get("/a/b/c", "age"));
546       tx.commit();
547       Thread.sleep(1000);
548
549       // value on cache2 must be 38
550
age = (Integer JavaDoc) cache2.get("/a/b/c", "age");
551       assertNotNull("\"age\" obtained from cache2 is null ", age);
552       assertTrue("\"age\" must be 38", age == 38);
553
554    }
555
556    /**
557     * Tests concurrent modifications: thread1 succeeds and thread2 is blocked until thread1 is done, and then succeeds
558     * too. However, this is flawed with the introduction of interceptors, here's why.<br/>
559     * <ul>
560     * <li>Thread1 acquires the lock for /bela/ban on cache1
561     * <li>Thread2 blocks on Thread1 to release the lock
562     * <li>Thread1 commits: this means the TransactionInterceptor and the ReplicationInterceptor are called in
563     * the sequence in which they registered. Unfortunately, the TransactionInterceptor registered first. In the
564     * PREPARE phase, the ReplicationInterceptor calls prepare() in cache2 synchronously. The TxInterceptor
565     * does nothing. The the COMMIT phase, the TxInterceptor commits the data by releasing the locks locally and
566     * then the ReplicationInterceptor sends an asynchronous COMMIT to cache2.
567     * <li>Because the TxInterceptor for Thread1 releases the locks locally <em>before</em> sending the async COMMIT,
568     * Thread2 is able to acquire the lock for /bela/ban in cache1 and then starts the PREPARE phase by sending a
569     * synchronous PREPARE to cache2. If this PREPARE arrives at cache2 <em>before</em> the COMMIT from Thread1,
570     * the PREPARE will block because it attempts to acquire a lock on /bela/ban on cache2 still held by Thread1
571     * (which would be released by Thread1's COMMIT). This results in deadlock, which is resolved by Thread2 running
572     * into a timeout with subsequent rollback and Thread1 succeeding.<br/>
573     * </ul>
574     * There are 3 solutions to this:
575     * <ol>
576     * <li>Do nothing. This is standard behavior for concurrent access to the same data. Same thing if the 2 threads
577     * operated on the same data in <em>separate</em> caches, e.g. Thread1 on /bela/ban in cache1 and Thread2 on
578     * /bela/ban in cache2. The semantics of Tx commit as handled by the interceptors is: after tx1.commit() returns
579     * the locks held by tx1 are release and a COMMIT message is on the way (if sent asynchronously).
580     * <li>Force an order over TxInterceptor and ReplicationInterceptor. This would require ReplicationInterceptor
581     * to always be fired first on TX commit. Downside: the interceptors have an implicit dependency, which is not
582     * nice.
583     * <li>Priority-order requests at the receiver; e.g. a COMMIT could release a blocked PREPARE. This is bad because
584     * it violates JGroups' FIFO ordering guarantees.
585     * </ol>
586     * I'm currently investigating solution #2, ie. creating an OrderedSynchronizationHandler, which allows other
587     * SynchronizationHandlers to register (atHead, atTail), and the OrderedSynchronizationHandler would call the
588     * SynchronizationHandler in the order in which they are defined.
589     *
590     * @throws Exception
591     */

592    public void testConcurrentPuts() throws Exception JavaDoc
593    {
594       initCaches(Configuration.CacheMode.REPL_SYNC);
595       cache1.getConfiguration().setSyncCommitPhase(true);
596
597       Thread JavaDoc t1 = new Thread JavaDoc("Thread1")
598       {
599          Transaction JavaDoc tx;
600
601          public void run()
602          {
603             try
604             {
605                tx = beginTransaction();
606                cache1.put("/bela/ban", "name", "Bela Ban");
607                TestingUtil.sleepThread(2000); // Thread2 will be blocked until we commit
608
tx.commit();
609                System.out.println("[Thread1] ** LOCK INFO cache1: " + cache1.printLockInfo());
610                System.out.println("[Thread1] ** LOCK INFO cache2: " + cache2.printLockInfo());
611             }
612             catch (Throwable JavaDoc ex)
613             {
614                ex.printStackTrace();
615                t1_ex = ex;
616             }
617          }
618       };
619
620       Thread JavaDoc t2 = new Thread JavaDoc("Thread2")
621       {
622          Transaction JavaDoc tx;
623
624          public void run()
625          {
626             try
627             {
628                TestingUtil.sleepThread(1000); // give Thread1 time to acquire the lock
629
tx = beginTransaction();
630                System.out.println("[Thread2] ** LOCK INFO cache1: " + cache1.printLockInfo());
631                System.out.println("[Thread2] ** LOCK INFO cache2: " + cache2.printLockInfo());
632                cache1.put("/bela/ban", "name", "Michelle Ban");
633                System.out.println("[Thread2] ** LOCK INFO cache1: " + cache1.printLockInfo());
634                System.out.println("[Thread2] ** LOCK INFO cache2: " + cache2.printLockInfo());
635                tx.commit();
636                System.out.println("[Thread2] ** LOCK INFO cache1: " + cache1.printLockInfo());
637                System.out.println("[Thread2] ** LOCK INFO cache2: " + cache2.printLockInfo());
638             }
639             catch (Throwable JavaDoc ex)
640             {
641                ex.printStackTrace();
642                t2_ex = ex;
643             }
644          }
645       };
646
647       // Let the game start
648
t1.start();
649       t2.start();
650
651       // Wait for threads to die
652
t1.join();
653       t2.join();
654
655       if (t1_ex != null)
656          fail("Thread1 failed: " + t1_ex);
657       if (t2_ex != null)
658          fail("Thread2 failed: " + t2_ex);
659
660       assertEquals("Michelle Ban", cache1.get("/bela/ban", "name"));
661    }
662
663
664    /**
665     * Should reproduce JBCACHE-32 problem (http://jira.jboss.com/jira/browse/JBCACHE-32)
666     */

667    public void testConcurrentCommitsWith1Thread() throws Exception JavaDoc
668    {
669       _testConcurrentCommits(1);
670    }
671
672    /**
673     * Should reproduce JBCACHE-32 problem (http://jira.jboss.com/jira/browse/JBCACHE-32)
674     */

675    public void testConcurrentCommitsWith5Threads() throws Exception JavaDoc
676    {
677       _testConcurrentCommits(5);
678    }
679
680    /**
681     * Should reproduce JBCACHE-32 problem (http://jira.jboss.com/jira/browse/JBCACHE-32)
682     */

683    private void _testConcurrentCommits(int num_threads) throws Exception JavaDoc
684    {
685       Object JavaDoc myMutex = new Object JavaDoc();
686
687       final CacheImpl c1 = new CacheImpl();
688       final CacheImpl c2 = new CacheImpl();
689       c1.getConfiguration().setClusterName("TempCluster");
690       c2.getConfiguration().setClusterName("TempCluster");
691       c1.getConfiguration().setCacheMode(Configuration.CacheMode.REPL_SYNC);
692       c2.getConfiguration().setCacheMode(Configuration.CacheMode.REPL_SYNC);
693       c1.getConfiguration().setSyncCommitPhase(true);
694       c2.getConfiguration().setSyncCommitPhase(true);
695       c1.getConfiguration().setSyncRollbackPhase(true);
696       c2.getConfiguration().setSyncRollbackPhase(true);
697       c1.getConfiguration().setIsolationLevel(IsolationLevel.REPEATABLE_READ);
698       c2.getConfiguration().setIsolationLevel(IsolationLevel.REPEATABLE_READ);
699       c1.getConfiguration().setTransactionManagerLookupClass("org.jboss.cache.DummyTransactionManagerLookup");
700       c2.getConfiguration().setTransactionManagerLookupClass("org.jboss.cache.DummyTransactionManagerLookup");
701       c1.getConfiguration().setLockAcquisitionTimeout(5000);
702       c2.getConfiguration().setLockAcquisitionTimeout(5000);
703       c1.start();
704       c2.start();
705       final List JavaDoc<Exception JavaDoc> exceptions = new ArrayList JavaDoc<Exception JavaDoc>();
706
707       class MyThread extends Thread JavaDoc
708       {
709          Object JavaDoc mutex;
710
711          public MyThread(String JavaDoc name, Object JavaDoc mutex)
712          {
713             super(name);
714             this.mutex = mutex;
715          }
716
717          public void run()
718          {
719             Transaction JavaDoc tx = null;
720
721             try
722             {
723                tx = beginTransaction();
724                c1.put("/thread/" + getName(), null);
725                System.out.println("Thread " + getName() + " after put(): " + c1.toString());
726                System.out.println("Thread " + getName() + " waiting on mutex");
727                synchronized (mutex)
728                {
729                   mutex.wait();
730                }
731                System.out.println("Thread " + getName() + " committing");
732                tx.commit();
733                System.out.println("Thread " + getName() + " committed successfully");
734             }
735             catch (Exception JavaDoc e)
736             {
737                exceptions.add(e);
738             }
739             finally
740             {
741                try
742                {
743                   if (tx != null) tx.rollback();
744                }
745                catch (Exception JavaDoc e)
746                {
747                }
748             }
749          }
750       }
751
752       MyThread[] threads = new MyThread[num_threads];
753       for (int i = 0; i < threads.length; i++)
754       {
755          threads[i] = new MyThread("#" + i, myMutex);
756       }
757       for (int i = 0; i < threads.length; i++)
758       {
759          MyThread thread = threads[i];
760          System.out.println("starting thread #" + i);
761          thread.start();
762       }
763
764       TestingUtil.sleepThread(6000);
765       synchronized (myMutex)
766       {
767          System.out.println("cache is " + c1.printLockInfo());
768          System.out.println("******************* SIGNALLING THREADS ********************");
769          myMutex.notifyAll();
770       }
771
772       for (MyThread thread : threads)
773       {
774          try
775          {
776             thread.join();
777             System.out.println("Joined thread " + thread.getName());
778          }
779          catch (InterruptedException JavaDoc e)
780          {
781             e.printStackTrace();
782          }
783       }
784
785       System.out.println("FINAL c1:\n" + c1.printDetails() + "\nlocks:\n" + c1.printLockInfo());
786
787       assertEquals(0, c1.getNumberOfLocksHeld());
788       assertEquals(0, c2.getNumberOfLocksHeld());
789
790       c1.stop();
791       c2.stop();
792
793 // if(ex != null)
794
// {
795
// ex.printStackTrace();
796
// fail("Thread failed: " + ex);
797
// }
798

799       // we can only expect 1 thread to succeed. The others will fail. So, threads.length -1 exceptions.
800
// this is a timing issue - 2 threads still may succeed on a multi cpu system
801
// assertEquals(threads.length - 1, exceptions.size());
802

803       for (Exception JavaDoc exception : exceptions) assertEquals(TimeoutException.class, exception.getClass());
804    }
805
806
807    /**
808     * Conncurrent put on 2 different instances.
809     */

810    public void testConcurrentPutsOnTwoInstances() throws Exception JavaDoc
811    {
812       initCaches(Configuration.CacheMode.REPL_SYNC);
813       final CacheImpl c1 = this.cache1;
814       final CacheImpl c2 = this.cache2;
815
816       Thread JavaDoc t1 = new Thread JavaDoc()
817       {
818          Transaction JavaDoc tx;
819
820          public void run()
821          {
822             try
823             {
824                tx = beginTransaction();
825                c1.put("/ben/wang", "name", "Ben Wang");
826                TestingUtil.sleepThread(8000);
827                tx.commit(); // This should go thru
828
}
829             catch (Throwable JavaDoc ex)
830             {
831                ex.printStackTrace();
832                t1_ex = ex;
833             }
834          }
835       };
836
837       Thread JavaDoc t2 = new Thread JavaDoc()
838       {
839          Transaction JavaDoc tx;
840
841          public void run()
842          {
843             try
844             {
845                TestingUtil.sleepThread(1000); // give Thread1 time to acquire the lock
846
tx = beginTransaction();
847                c2.put("/ben/wang", "name", "Ben Jr.");
848                tx.commit(); // This will time out and rollback first because Thread1 has a tx going as well.
849
}
850             catch (RollbackException JavaDoc rollback_ex)
851             {
852                System.out.println("received rollback exception as expected");
853             }
854             catch (Throwable JavaDoc ex)
855             {
856                ex.printStackTrace();
857                t2_ex = ex;
858             }
859          }
860       };
861
862       // Let the game start
863
t1.start();
864       t2.start();
865
866       // Wait for thread to die but put an insurance of 5 seconds on it.
867
t1.join();
868       t2.join();
869
870       if (t1_ex != null)
871          fail("Thread1 failed: " + t1_ex);
872       if (t2_ex != null)
873          fail("Thread2 failed: " + t2_ex);
874       assertEquals("Ben Wang", c1.get("/ben/wang", "name"));
875    }
876
877
878    public void testPut() throws Exception JavaDoc
879    {
880       initCaches(Configuration.CacheMode.REPL_SYNC);
881       final CacheImpl c1 = this.cache1;
882
883
884       Thread JavaDoc t1 = new Thread JavaDoc()
885       {
886          public void run()
887          {
888             try
889             {
890                lock.acquire();
891                System.out.println("-- t1 has lock");
892                c1.put("/a/b/c", "age", 38);
893                System.out.println("[Thread1] set value to 38");
894
895                System.out.println("-- t1 releases lock");
896                lock.release();
897                TestingUtil.sleepThread(300);
898                Thread.yield();
899
900                lock.acquire();
901                System.out.println("-- t1 has lock");
902                c1.put("/a/b/c", "age", 39);
903                System.out.println("[Thread1] set value to 39");
904
905                System.out.println("-- t1 releases lock");
906                lock.release();
907                assertEquals(39, c1.get("/a/b/c", "age"));
908             }
909             catch (Throwable JavaDoc ex)
910             {
911                ex.printStackTrace();
912                t1_ex = ex;
913             }
914             finally
915             {
916                lock.release();
917             }
918          }
919       };
920
921       Thread JavaDoc t2 = new Thread JavaDoc()
922       {
923          public void run()
924          {
925             try
926             {
927                TestingUtil.sleepThread(100);
928                Thread.yield();
929                lock.acquire();
930                System.out.println("-- t2 has lock");
931                // Should replicate the value right away.
932
Integer JavaDoc val = (Integer JavaDoc) cache2.get("/a/b/c", "age");
933                System.out.println("[Thread2] value is " + val);
934                assertEquals(new Integer JavaDoc(38), val);
935                System.out.println("-- t2 releases lock");
936                lock.release();
937                TestingUtil.sleepThread(300);
938                Thread.yield();
939                TestingUtil.sleepThread(500);
940                lock.acquire();
941                System.out.println("-- t2 has lock");
942                val = (Integer JavaDoc) cache2.get("/a/b/c", "age");
943                System.out.println("-- t2 releases lock");
944                lock.release();
945                assertEquals(new Integer JavaDoc(39), val);
946             }
947             catch (Throwable JavaDoc ex)
948             {
949                ex.printStackTrace();
950                t2_ex = ex;
951             }
952             finally
953             {
954                lock.release();
955             }
956          }
957       };
958
959       // Let the game start
960
t1.start();
961       t2.start();
962
963       // Wait for thread to die but put an insurance of 5 seconds on it.
964
t1.join();
965       t2.join();
966       if (t1_ex != null)
967          fail("Thread1 failed: " + t1_ex);
968       if (t2_ex != null)
969          fail("Thread2 failed: " + t2_ex);
970    }
971
972    /**
973     * Test replicated cache with transaction. Idea is to have two threads running
974     * a local cache each that is replicating. Depending on whether cache1 commit/rollback or not,
975     * the cache2.get will get different values.
976     * Note that we have used sleep to interpose thread execution sequence.
977     * Although it's not fool proof, it is rather simple and intuitive.
978     *
979     * @throws Exception
980     */

981    public void testPutTx() throws Exception JavaDoc
982    {
983       Transaction JavaDoc tx = null;
984
985       try
986       {
987          initCaches(Configuration.CacheMode.REPL_SYNC);
988          cache1.getConfiguration().setSyncCommitPhase(true);
989          cache2.getConfiguration().setSyncCommitPhase(true);
990          tx = beginTransaction();
991          cache1.put("/a/b/c", "age", 38);
992          cache1.put("/a/b/c", "age", 39);
993          Object JavaDoc val = cache2.get("/a/b/c", "age"); // must be null as not yet committed
994
assertNull(val);
995          tx.commit();
996
997          tx = beginTransaction();
998          assertEquals(39, cache2.get("/a/b/c", "age")); // must not be null
999
tx.commit();
1000      }
1001      catch (Throwable JavaDoc t)
1002      {
1003         t.printStackTrace();
1004         t1_ex = t;
1005      }
1006      finally
1007      {
1008         lock.release();
1009      }
1010   }
1011
1012
1013   /**
1014    * Have both cache1 and cache2 do add and commit. cache1 commit should time out
1015    * since it can't obtain the lock when trying to replicate cache2. On the other hand,
1016    * cache2 commit will succeed since now that cache1 is rollbacked and lock is
1017    * released.
1018    */

1019   public void testPutTx1() throws Exception JavaDoc
1020   {
1021      initCaches(Configuration.CacheMode.REPL_SYNC);
1022      final CacheImpl c1 = this.cache1;
1023      Thread JavaDoc t1 = new Thread JavaDoc()
1024      {
1025         public void run()
1026         {
1027            Transaction JavaDoc tx = null;
1028
1029            try
1030            {
1031               lock.acquire();
1032               tx = beginTransaction();
1033               c1.put("/a/b/c", "age", 38);
1034               c1.put("/a/b/c", "age", 39);
1035               lock.release();
1036
1037               TestingUtil.sleepThread(300);
1038               lock.acquire();
1039               try
1040               {
1041                  tx.commit();
1042               }
1043               catch (RollbackException JavaDoc ex)
1044               {
1045                  System.out.println("[Thread1] received RollbackException, as expected. Rolling back changes");
1046               }
1047               finally
1048               {
1049                  lock.release();
1050               }
1051            }
1052            catch (Throwable JavaDoc ex)
1053            {
1054               ex.printStackTrace();
1055               t1_ex = ex;
1056            }
1057            finally
1058            {
1059               lock.release();
1060            }
1061         }
1062      };
1063
1064      Thread JavaDoc t2 = new Thread JavaDoc()
1065      {
1066         public void run()
1067         {
1068            Transaction JavaDoc tx = null;
1069
1070            try
1071            {
1072               sleep(200);
1073               Thread.yield();
1074               lock.acquire();
1075               tx = beginTransaction();
1076               assertNull(cache2.get("/a/b/c", "age")); // must be null as not yet committed
1077
cache2.put("/a/b/c", "age", 40);
1078               lock.release();
1079
1080               TestingUtil.sleepThread(300);
1081               lock.acquire();
1082               assertEquals(40, cache2.get("/a/b/c", "age")); // must not be null
1083
tx.commit();
1084               lock.release();
1085
1086               TestingUtil.sleepThread(1000);
1087               tx = beginTransaction();
1088               assertEquals("After cache2 commit", 40, cache2.get("/a/b/c", "age"));
1089               tx.commit();
1090            }
1091            catch (Throwable JavaDoc ex)
1092            {
1093               ex.printStackTrace();
1094               t2_ex = ex;
1095            }
1096            finally
1097            {
1098               lock.release();
1099            }
1100         }
1101      };
1102
1103      // Let the game start
1104
t1.start();
1105      t2.start();
1106
1107      t1.join();
1108      t2.join();
1109
1110      if (t1_ex != null)
1111         fail("Thread1 failed: " + t1_ex);
1112      if (t2_ex != null)
1113         fail("Thread2 failed: " + t2_ex);
1114   }
1115
1116
1117   public void testPutTxWithRollback() throws Exception JavaDoc
1118   {
1119      initCaches(Configuration.CacheMode.REPL_SYNC);
1120      final CacheImpl c2 = this.cache1;
1121      Thread JavaDoc t1 = new Thread JavaDoc()
1122      {
1123         public void run()
1124         {
1125            Transaction JavaDoc tx = null;
1126
1127            try
1128            {
1129               lock.acquire();
1130               tx = beginTransaction();
1131               c2.put("/a/b/c", "age", 38);
1132               c2.put("/a/b/c", "age", 39);
1133               lock.release();
1134
1135               TestingUtil.sleepThread(100);
1136               lock.acquire();
1137               tx.rollback();
1138               lock.release();
1139            }
1140            catch (Throwable JavaDoc ex)
1141            {
1142               ex.printStackTrace();
1143               t1_ex = ex;
1144            }
1145            finally
1146            {
1147               lock.release();
1148            }
1149         }
1150      };
1151
1152      Thread JavaDoc t2 = new Thread JavaDoc()
1153      {
1154         public void run()
1155         {
1156            Transaction JavaDoc tx = null;
1157
1158            try
1159            {
1160               sleep(200);
1161               Thread.yield();
1162               lock.acquire();
1163               tx = beginTransaction();
1164               assertNull(cache2.get("/a/b/c", "age")); // must be null as not yet committed
1165
lock.release();
1166
1167               TestingUtil.sleepThread(100);
1168               lock.acquire();
1169               assertNull(cache2.get("/a/b/c", "age")); // must be null as rolledback
1170
tx.commit();
1171               lock.release();
1172            }
1173            catch (Throwable JavaDoc ex)
1174            {
1175               ex.printStackTrace();
1176               t2_ex = ex;
1177            }
1178            finally
1179            {
1180               lock.release();
1181            }
1182         }
1183      };
1184
1185      // Let the game start
1186
t1.start();
1187      t2.start();
1188
1189      // Wait for thread to die but put an insurance of 5 seconds on it.
1190
t1.join();
1191      t2.join();
1192      if (t1_ex != null)
1193         fail("Thread1 failed: " + t1_ex);
1194      if (t2_ex != null)
1195         fail("Thread2 failed: " + t2_ex);
1196   }
1197
1198
1199   static class TransactionAborter implements Synchronization JavaDoc
1200   {
1201      Transaction JavaDoc ltx = null;
1202
1203      public TransactionAborter(Transaction JavaDoc ltx)
1204      {
1205         this.ltx = ltx;
1206      }
1207
1208      public void beforeCompletion()
1209      {
1210         try
1211         {
1212            ltx.setRollbackOnly();
1213         }
1214         catch (SystemException JavaDoc e)
1215         {
1216            // who cares
1217
}
1218      }
1219
1220      public void afterCompletion(int status)
1221      {
1222      }
1223   }
1224
1225   static class CallbackListener extends AbstractCacheListener
1226   {
1227
1228      CacheImpl callbackCache;
1229      Object JavaDoc callbackKey;
1230      Exception JavaDoc ex;
1231      Object JavaDoc mutex = new Object JavaDoc();
1232
1233      CallbackListener(CacheImpl cache, Object JavaDoc callbackKey)
1234      {
1235         this.callbackCache = cache;
1236         this.callbackKey = callbackKey;
1237         cache.getNotifier().addCacheListener(this);
1238      }
1239
1240      public void nodeModified(Fqn fqn, boolean pre, boolean isLocal, ModificationType modType, Map JavaDoc data)
1241      {
1242         if (!pre)
1243         {
1244            // Lock on a mutex so test can't check for an exception
1245
// until the get call completes
1246
synchronized (mutex)
1247            {
1248               try
1249               {
1250                  callbackCache.get(fqn, callbackKey);
1251               }
1252               catch (CacheException e)
1253               {
1254                  e.printStackTrace();
1255                  ex = e;
1256               }
1257            }
1258         }
1259      }
1260
1261      Exception JavaDoc getCallbackException()
1262      {
1263         synchronized (mutex)
1264         {
1265            return ex;
1266         }
1267      }
1268
1269   }
1270
1271   static class TransactionAborterCallbackListener extends CallbackListener
1272   {
1273
1274      TransactionManager callbackTM;
1275
1276      TransactionAborterCallbackListener(CacheImpl cache, Object JavaDoc callbackKey)
1277      {
1278         super(cache, callbackKey);
1279         callbackTM = callbackCache.getTransactionManager();
1280      }
1281
1282      public void nodeModified(Fqn fqn, boolean pre, boolean isLocal, ModificationType modType, Map JavaDoc data)
1283      {
1284         if (!pre)
1285         {
1286            try
1287            {
1288               Transaction JavaDoc tx = callbackTM.getTransaction();
1289               if (tx != null && tx.getStatus() == Status.STATUS_ACTIVE)
1290               {
1291                  // this will rollback the transaction
1292
tx.registerSynchronization(new TransactionAborter(tx));
1293               }
1294               else
1295               {
1296                  super.nodeModified(fqn, pre, isLocal, modType, data);
1297               }
1298
1299            }
1300            catch (Exception JavaDoc e)
1301            {
1302               e.printStackTrace();
1303               if (ex == null)
1304                  ex = e;
1305            }
1306         }
1307      }
1308
1309   }
1310
1311   static class TransactionAborterListener extends AbstractCacheListener
1312   {
1313
1314      TransactionManager callbackTM;
1315      Object JavaDoc mutex = new Object JavaDoc();
1316      Exception JavaDoc ex;
1317
1318      TransactionAborterListener(CacheImpl cache)
1319      {
1320         callbackTM = cache.getTransactionManager();
1321         cache.getNotifier().addCacheListener(this);
1322      }
1323
1324      public void nodeModified(Fqn fqn, boolean pre, boolean isLocal, ModificationType modType, Map JavaDoc data)
1325      {
1326         if (!pre)
1327         {
1328            synchronized (mutex)
1329            {
1330               try
1331               {
1332                  Transaction JavaDoc tx = callbackTM.getTransaction();
1333                  if (tx != null && tx.getStatus() == Status.STATUS_ACTIVE)
1334                  {
1335                     // this will rollback the transaction
1336
tx.setRollbackOnly();
1337                  }
1338               }
1339               catch (Exception JavaDoc e)
1340               {
1341                  e.printStackTrace();
1342                  if (ex == null)
1343                     ex = e;
1344               }
1345            }
1346         }
1347      }
1348
1349      Exception JavaDoc getCallbackException()
1350      {
1351         synchronized (mutex)
1352         {
1353            return ex;
1354         }
1355      }
1356
1357   }
1358
1359   public static Test suite()
1360   {
1361// return getDeploySetup(SyncTxUnitTestCase.class, "cachetest.jar");
1362
return new TestSuite(SyncReplTxTest.class);
1363   }
1364
1365
1366}
1367
Popular Tags