KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jboss > cache > tests > replicated > SyncReplTxTest


1 /*
2  *
3  * JBoss, the OpenSource J2EE webOS
4  *
5  * Distributable under LGPL license.
6  * See terms of license at gnu.org.
7  */

8 package org.jboss.cache.tests.replicated;
9
10 import EDU.oswego.cs.dl.util.concurrent.FIFOSemaphore;
11 import junit.framework.Test;
12 import junit.framework.TestCase;
13 import junit.framework.TestSuite;
14 import org.apache.commons.logging.Log;
15 import org.apache.commons.logging.LogFactory;
16 import org.jboss.cache.Fqn;
17 import org.jboss.cache.TreeCache;
18 import org.jboss.cache.lock.IsolationLevel;
19 import org.jboss.cache.transaction.DummyTransactionManager;
20
21 import javax.naming.Context JavaDoc;
22 import javax.transaction.*;
23
24 /**
25  * Replicated unit test for sync transactional TreeCache
26  * Note: we use DummyTransactionManager for Tx purpose instead of relying on
27  * jta.
28  *
29  * @version $Revision: 1.2 $
30  */

31 public class SyncReplTxTest extends TestCase {
32    TreeCache cache1, cache2;
33    int caching_mode=TreeCache.REPL_SYNC;
34    final String JavaDoc group_name="TreeCacheTestGroup";
35    String JavaDoc props=
36          "UDP(ip_mcast=true;ip_ttl=64;loopback=false;mcast_addr=228.1.2.3;" +
37          "mcast_port=45566;mcast_recv_buf_size=80000;mcast_send_buf_size=150000;" +
38          "ucast_recv_buf_size=80000;ucast_send_buf_size=150000):" +
39          "PING(down_thread=true;num_initial_members=2;timeout=500;up_thread=true):" +
40          "MERGE2(max_interval=20000;min_interval=10000):" +
41          "FD(down_thread=true;shun=true;up_thread=true):" +
42          "VERIFY_SUSPECT(down_thread=true;timeout=1500;up_thread=true):" +
43          "pbcast.NAKACK(down_thread=true;gc_lag=50;retransmit_timeout=600,1200,2400,4800;" +
44          "up_thread=true):" +
45          "pbcast.STABLE(desired_avg_gossip=20000;down_thread=true;up_thread=true):" +
46          "UNICAST(down_thread=true;min_threshold=10;timeout=600,1200,2400;window_size=100):" +
47          "FRAG(down_thread=true;frag_size=8192;up_thread=true):" +
48          "pbcast.GMS(join_retry_timeout=2000;join_timeout=5000;print_local_addr=true;shun=true):" +
49          "pbcast.STATE_TRANSFER(down_thread=true;up_thread=true)";
50
51    final static Log log_=LogFactory.getLog(SyncReplTxTest.class);
52    String JavaDoc old_factory=null;
53    final String JavaDoc FACTORY="org.jboss.cache.transaction.DummyContextFactory";
54    FIFOSemaphore lock=new FIFOSemaphore(1);
55    DummyTransactionManager tx_mgr;
56    Throwable JavaDoc t1_ex, t2_ex, ex=null;
57
58
59
60    public SyncReplTxTest(String JavaDoc name) {
61       super(name);
62    }
63
64    public void setUp() throws Exception JavaDoc {
65       super.setUp();
66       old_factory=System.getProperty(Context.INITIAL_CONTEXT_FACTORY);
67       System.setProperty(Context.INITIAL_CONTEXT_FACTORY, FACTORY);
68       tx_mgr=DummyTransactionManager.getInstance();
69       t1_ex=t2_ex=ex=null;
70    }
71
72    public void tearDown() throws Exception JavaDoc {
73       super.tearDown();
74       DummyTransactionManager.destroy();
75       destroyCaches();
76       if(old_factory != null) {
77          System.setProperty(Context.INITIAL_CONTEXT_FACTORY, old_factory);
78          old_factory=null;
79       }
80    }
81
82    Transaction beginTransaction() throws SystemException, NotSupportedException {
83       DummyTransactionManager mgr=DummyTransactionManager.getInstance();
84       mgr.begin();
85       Transaction tx=mgr.getTransaction();
86       return tx;
87    }
88
89    void initCaches(int caching_mode) throws Exception JavaDoc {
90       this.caching_mode=caching_mode;
91       cache1=new TreeCache();
92       cache2=new TreeCache();
93       cache1.setCacheMode(caching_mode);
94       cache2.setCacheMode(caching_mode);
95       cache1.setIsolationLevel(IsolationLevel.SERIALIZABLE);
96       cache2.setIsolationLevel(IsolationLevel.SERIALIZABLE);
97
98       cache1.setTransactionManagerLookupClass("org.jboss.cache.DummyTransactionManagerLookup");
99       cache2.setTransactionManagerLookupClass("org.jboss.cache.DummyTransactionManagerLookup");
100       /*
101       cache1.setTransactionManagerLookupClass("org.jboss.cache.JBossTransactionManagerLookup");
102       cache2.setTransactionManagerLookupClass("org.jboss.cache.JBossTransactionManagerLookup");
103 */

104       cache1.setLockAcquisitionTimeout(5000);
105       cache2.setLockAcquisitionTimeout(5000);
106       cache1.start();
107       cache2.start();
108    }
109
110    void destroyCaches() throws Exception JavaDoc {
111       if(cache1 != null)
112          cache1.stop();
113       if(cache2 != null)
114          cache2.stop();
115       cache1=null;
116       cache2=null;
117    }
118
119
120    public void testLockRemoval() throws Exception JavaDoc {
121       initCaches(TreeCache.REPL_SYNC);
122       cache1.setSyncCommitPhase(true);
123       cache1.releaseAllLocks("/");
124       Transaction tx=beginTransaction();
125       cache1.put("/bela/ban", "name", "Bela Ban");
126       assertEquals(2, cache1.getNumberOfLocksHeld());
127       assertEquals(0, cache2.getNumberOfLocksHeld());
128       tx.commit();
129       assertEquals(0, cache1.getNumberOfLocksHeld());
130       assertEquals(0, cache2.getNumberOfLocksHeld());
131     }
132
133
134
135    public void testSyncRepl() throws Exception JavaDoc {
136       Integer JavaDoc age;
137       Transaction tx;
138
139       try {
140          initCaches(TreeCache.REPL_SYNC);
141          cache1.setSyncCommitPhase(true);
142          cache2.setSyncCommitPhase(true);
143
144          // assertEquals(2, cache1.getMembers().size());
145

146          tx=beginTransaction();
147          cache1.put("/a/b/c", "age", new Integer JavaDoc(38));
148          assertNull("age on cache2 must be null as the TX has not yet been committed", cache2.get("/a/b/c", "age"));
149          tx.commit();
150
151          // value on cache2 must be 38
152
age=(Integer JavaDoc)cache2.get("/a/b/c", "age");
153          assertNotNull("\"age\" obtained from cache2 must be non-null ", age);
154          assertTrue("\"age\" must be 38", age.intValue() == 38);
155       }
156       catch(Exception JavaDoc e) {
157          fail(e.toString());
158       }
159    }
160
161
162      public void testSyncReplWithModficationsOnBothCaches() throws Exception JavaDoc {
163         Integer JavaDoc age;
164         Transaction tx;
165         final Fqn NODE1=Fqn.fromString("/one/two/three");
166         final Fqn NODE2=Fqn.fromString("/eins/zwei/drei");
167
168       try {
169          initCaches(TreeCache.REPL_SYNC);
170
171          cache1.setSyncCommitPhase(true);
172          cache2.setSyncCommitPhase(true);
173
174          tx=beginTransaction();
175          cache1.put(NODE1, "age", new Integer JavaDoc(38));
176          System.out.println("TransactionTable for cache1 after cache1.put():\n" + cache1.getTransactionTable().toString(true));
177
178          cache2.put(NODE2, "age", new Integer JavaDoc(39));
179          System.out.println("TransactionTable for cache2 after cache2.put():\n" + cache2.getTransactionTable().toString(true));
180
181          System.out.println("cache1 before commit:\n" + cache1.printLockInfo());
182          System.out.println("cache2 before commit:\n" + cache2.printLockInfo());
183
184          tx.commit();
185
186          System.out.println("cache1 after commit:\n" + cache1.printLockInfo());
187          System.out.println("cache2 after commit:\n" + cache2.printLockInfo());
188
189          assertTrue(cache1.exists(NODE1));
190          assertTrue(cache1.exists(NODE2));
191          assertTrue(cache1.exists(NODE1));
192          assertTrue(cache2.exists(NODE2));
193
194          age=(Integer JavaDoc)cache1.get(NODE1, "age");
195          assertNotNull("\"age\" obtained from cache1 for " + NODE1 + " must be non-null ", age);
196          assertTrue("\"age\" must be 38", age.intValue() == 38);
197
198          age=(Integer JavaDoc)cache2.get(NODE1, "age");
199          assertNotNull("\"age\" obtained from cache2 for " + NODE1 + " must be non-null ", age);
200          assertTrue("\"age\" must be 38", age.intValue() == 38);
201
202          age=(Integer JavaDoc)cache1.get(NODE2, "age");
203          assertNotNull("\"age\" obtained from cache1 for " + NODE2 + " must be non-null ", age);
204          assertTrue("\"age\" must be 39", age.intValue() == 39);
205
206          age=(Integer JavaDoc)cache2.get(NODE2, "age");
207          assertNotNull("\"age\" obtained from cache2 for " + NODE2 + " must be non-null ", age);
208          assertTrue("\"age\" must be 39", age.intValue() == 39);
209
210          assertEquals(0, cache1.getNumberOfLocksHeld());
211          assertEquals(0, cache2.getNumberOfLocksHeld());
212          System.out.println("TransactionTable for cache1:\n" + cache1.getTransactionTable().toString(true));
213          System.out.println("TransactionTable for cache2:\n" + cache2.getTransactionTable().toString(true));
214       }
215       catch(Exception JavaDoc e) {
216          fail(e.toString());
217       }
218    }
219
220    public void testSyncReplWithModficationsOnBothCachesSameData() throws Exception JavaDoc {
221       Transaction tx;
222       final Fqn NODE=Fqn.fromString("/one/two/three");
223
224       try {
225          initCaches(TreeCache.REPL_SYNC);
226          tx=beginTransaction();
227          cache1.put(NODE, "age", new Integer JavaDoc(38));
228          System.out.println("TransactionTable for cache1 after cache1.put():\n" + cache1.getTransactionTable().toString(true));
229
230          cache2.put(NODE, "age", new Integer JavaDoc(39));
231          System.out.println("TransactionTable for cache2 after cache2.put():\n" + cache2.getTransactionTable().toString(true));
232
233          System.out.println("cache1 before commit:\n" + cache1.printLockInfo());
234          System.out.println("cache2 before commit:\n" + cache2.printLockInfo());
235
236          try {
237             tx.commit();
238             fail("commit should throw a RollbackException, we should not get here");
239          }
240          catch(RollbackException rollback) {
241             System.out.println("Transaction was rolled back, this is correct");
242          }
243
244          System.out.println("cache1 after commit:\n" + cache1.printLockInfo());
245          System.out.println("cache2 after commit:\n" + cache2.printLockInfo());
246
247          assertEquals(0, cache1.getNumberOfLocksHeld());
248          assertEquals(0, cache2.getNumberOfLocksHeld());
249
250          assertEquals(0, cache1.getNumberOfNodes());
251          assertEquals(0, cache2.getNumberOfNodes());
252       }
253       catch(Exception JavaDoc e) {
254          fail(e.toString());
255       }
256    }
257
258
259    public void testSyncReplWithModficationsOnBothCachesWithRollback() throws Exception JavaDoc {
260          Transaction tx;
261          final Fqn NODE1=Fqn.fromString("/one/two/three");
262          final Fqn NODE2=Fqn.fromString("/eins/zwei/drei");
263
264        try {
265           initCaches(TreeCache.REPL_SYNC);
266
267           cache1.setSyncRollbackPhase(true);
268           cache2.setSyncRollbackPhase(true);
269
270           tx=beginTransaction();
271           cache1.put(NODE1, "age", new Integer JavaDoc(38));
272           cache2.put(NODE2, "age", new Integer JavaDoc(39));
273
274           System.out.println("cache1 (before commit):\n" + cache1.printLockInfo());
275           System.out.println("cache2 (before commit):\n" + cache2.printLockInfo());
276
277           // this will rollback the transaction
278
tx.registerSynchronization(new TransactionAborter(tx));
279
280           try {
281              tx.commit();
282              fail("commit should throw a RollbackException, we should not get here");
283           }
284           catch(RollbackException rollback) {
285              System.out.println("Transaction was rolled back, this is correct");
286           }
287
288           System.out.println("cache1 (after rollback):\n" + cache1.printLockInfo());
289           System.out.println("cache2 (after rollback):\n" + cache2.printLockInfo());
290
291           assertEquals(0, cache1.getNumberOfLocksHeld());
292           assertEquals(0, cache2.getNumberOfLocksHeld());
293
294           assertEquals(0, cache1.getNumberOfNodes());
295           assertEquals(0, cache2.getNumberOfNodes());
296        }
297        catch(Exception JavaDoc e) {
298           fail(e.toString());
299        }
300     }
301
302
303
304    public void testASyncRepl() throws Exception JavaDoc {
305       Integer JavaDoc age;
306       Transaction tx;
307
308       initCaches(TreeCache.REPL_ASYNC);
309
310       try {
311          tx=beginTransaction();
312          cache1.put("/a/b/c", "age", new Integer JavaDoc(38));
313          Thread.sleep(1000);
314          assertNull("age on cache2 must be null as the TX has not yet been committed", cache2.get("/a/b/c", "age"));
315          tx.commit();
316          Thread.sleep(1000);
317
318          // value on cache2 must be 38
319
age=(Integer JavaDoc)cache2.get("/a/b/c", "age");
320          assertNotNull("\"age\" obtained from cache2 is null ", age);
321          assertTrue("\"age\" must be 38", age.intValue() == 38);
322       }
323       catch(Exception JavaDoc e) {
324          fail(e.toString());
325       }
326    }
327
328    /**
329     * Tests concurrent modifications: thread1 succeeds and thread2 is blocked until thread1 is done, and then succeeds
330     * too. However, this is flawed with the introduction of interceptors, here's why.<br/>
331     * <ul>
332     * <li>Thread1 acquires the lock for /bela/ban on cache1
333     * <li>Thread2 blocks on Thread1 to release the lock
334     * <li>Thread1 commits: this means the TransactionInterceptor and the ReplicationInterceptor are called in
335     * the sequence in which they registered. Unfortunately, the TransactionInterceptor registered first. In the
336     * PREPARE phase, the ReplicationInterceptor calls prepare() in cache2 synchronously. The TxInterceptor
337     * does nothing. The the COMMIT phase, the TxInterceptor commits the data by releasing the locks locally and
338     * then the ReplicationInterceptor sends an asynchronous COMMIT to cache2.
339     * <li>Because the TxInterceptor for Thread1 releases the locks locally <em>before</em> sending the async COMMIT,
340     * Thread2 is able to acquire the lock for /bela/ban in cache1 and then starts the PREPARE phase by sending a
341     * synchronous PREPARE to cache2. If this PREPARE arrives at cache2 <em>before</em> the COMMIT from Thread1,
342     * the PREPARE will block because it attempts to acquire a lock on /bela/ban on cache2 still held by Thread1
343     * (which would be released by Thread1's COMMIT). This results in deadlock, which is resolved by Thread2 running
344     * into a timeout with subsequent rollback and Thread1 succeeding.<br/>
345     * </ul>
346     * There are 3 solutions to this:
347     * <ol>
348     * <li>Do nothing. This is standard behavior for concurrent access to the same data. Same thing if the 2 threads
349     * operated on the same data in <em>separate</em> caches, e.g. Thread1 on /bela/ban in cache1 and Thread2 on
350     * /bela/ban in cache2. The semantics of Tx commit as handled by the interceptors is: after tx1.commit() returns
351     * the locks held by tx1 are release and a COMMIT message is on the way (if sent asynchronously).
352     * <li>Force an order over TxInterceptor and ReplicationInterceptor. This would require ReplicationInterceptor
353     * to always be fired first on TX commit. Downside: the interceptors have an implicit dependency, which is not
354     * nice.
355     * <li>Priority-order requests at the receiver; e.g. a COMMIT could release a blocked PREPARE. This is bad because
356     * it violates JGroups' FIFO ordering guarantees.
357     * </ol>
358     * I'm currently investigating solution #2, ie. creating an OrderedSynchronizationHandler, which allows other
359     * SynchronizationHandlers to register (atHead, atTail), and the OrderedSynchronizationHandler would call the
360     * SynchronizationHandler in the order in which they are defined.
361     * @throws Exception
362     */

363    public void testConcurrentPuts() throws Exception JavaDoc {
364       initCaches(TreeCache.REPL_SYNC);
365       cache1.setSyncCommitPhase(true);
366
367       Thread JavaDoc t1=new Thread JavaDoc("Thread1") {
368          Transaction tx;
369
370          public void run() {
371             try {
372                tx=beginTransaction();
373                cache1.put("/bela/ban", "name", "Bela Ban");
374                _pause(2000); // Thread2 will be blocked until we commit
375
tx.commit();
376                System.out.println("[Thread1] ** LOCK INFO cache1: " + cache1.printLockInfo());
377                System.out.println("[Thread1] ** LOCK INFO cache2: " + cache2.printLockInfo());
378             }
379             catch(Throwable JavaDoc ex) {
380                ex.printStackTrace();
381                t1_ex=ex;
382             }
383          }
384       };
385
386       Thread JavaDoc t2=new Thread JavaDoc("Thread2") {
387          Transaction tx;
388
389          public void run() {
390             try {
391                _pause(1000); // give Thread1 time to acquire the lock
392
tx=beginTransaction();
393                System.out.println("[Thread2] ** LOCK INFO cache1: " + cache1.printLockInfo());
394                System.out.println("[Thread2] ** LOCK INFO cache2: " + cache2.printLockInfo());
395                cache1.put("/bela/ban", "name", "Michelle Ban");
396                System.out.println("[Thread2] ** LOCK INFO cache1: " + cache1.printLockInfo());
397                System.out.println("[Thread2] ** LOCK INFO cache2: " + cache2.printLockInfo());
398                tx.commit();
399                System.out.println("[Thread2] ** LOCK INFO cache1: " + cache1.printLockInfo());
400                System.out.println("[Thread2] ** LOCK INFO cache2: " + cache2.printLockInfo());
401             }
402             catch(Throwable JavaDoc ex) {
403                ex.printStackTrace();
404                t2_ex=ex;
405             }
406          }
407       };
408
409       // Let the game start
410
t1.start();
411       t2.start();
412
413       // Wait for threads to die
414
t1.join();
415       t2.join();
416
417       if(t1_ex != null)
418          fail("Thread1 failed: " + t1_ex);
419       if(t2_ex != null)
420          fail("Thread2 failed: " + t2_ex);
421
422       assertEquals("Michelle Ban", cache1.get("/bela/ban", "name"));
423    }
424
425
426    /**
427     * Should reproduce JBCACHE-32 problem (http://jira.jboss.com/jira/browse/JBCACHE-32)
428     */

429    public void testConcurrentCommitsWith1Thread() throws Exception JavaDoc {
430       _testConcurrentCommits(1);
431    }
432
433     /**
434     * Should reproduce JBCACHE-32 problem (http://jira.jboss.com/jira/browse/JBCACHE-32)
435     */

436    public void testConcurrentCommitsWith5Threads() throws Exception JavaDoc {
437       _testConcurrentCommits(5);
438    }
439
440    /**
441     * Should reproduce JBCACHE-32 problem (http://jira.jboss.com/jira/browse/JBCACHE-32)
442     */

443    private void _testConcurrentCommits(int num_threads) throws Exception JavaDoc {
444       Object JavaDoc myMutex=new Object JavaDoc();
445
446       final TreeCache c1=new TreeCache();
447       final TreeCache c2=new TreeCache();
448       c1.setClusterName("TempCluster");
449       c2.setClusterName("TempCluster");
450       c1.setCacheMode(TreeCache.REPL_SYNC);
451       c2.setCacheMode(TreeCache.REPL_SYNC);
452       c1.setSyncCommitPhase(true);
453       c2.setSyncCommitPhase(true);
454       c1.setIsolationLevel(IsolationLevel.REPEATABLE_READ);
455       c2.setIsolationLevel(IsolationLevel.REPEATABLE_READ);
456       c1.setTransactionManagerLookupClass("org.jboss.cache.DummyTransactionManagerLookup");
457       c2.setTransactionManagerLookupClass("org.jboss.cache.DummyTransactionManagerLookup");
458       c1.setLockAcquisitionTimeout(5000);
459       c2.setLockAcquisitionTimeout(5000);
460       c1.start();
461       c2.start();
462
463       class MyThread extends Thread JavaDoc {
464          Object JavaDoc mutex;
465
466          public MyThread(String JavaDoc name, Object JavaDoc mutex) {
467             super(name);
468             this.mutex=mutex;
469          }
470
471          public void run() {
472             Transaction tx;
473
474             try {
475                tx=beginTransaction();
476                c1.put("/thread/" + getName(), null);
477                System.out.println("Thread " + getName() + " after put(): " + c1.toString());
478                System.out.println("Thread " + getName() + " waiting on mutex");
479                synchronized(mutex) {
480                   mutex.wait();
481                }
482                System.out.println("Thread " + getName() + " committing");
483                tx.commit();
484                System.out.println("Thread " + getName() + " committed successfully");
485             }
486             catch(Exception JavaDoc e) {
487                ex=e;
488             }
489          }
490       }
491
492       MyThread[] threads=new MyThread[num_threads];
493       for(int i=0; i < threads.length; i++) {
494          threads[i]=new MyThread("#" + i, myMutex);
495       }
496       for(int i=0; i < threads.length; i++) {
497          MyThread thread=threads[i];
498          System.out.println("starting thread #" + i);
499          thread.start();
500       }
501
502       _pause(6000);
503       synchronized(myMutex) {
504          System.out.println("cache is " + c1.printLockInfo());
505          System.out.println("******************* SIGNALLING THREADS ********************");
506          myMutex.notifyAll();
507       }
508
509       for(int i=0; i < threads.length; i++) {
510          MyThread thread=threads[i];
511          try {
512             thread.join();
513             System.out.println("Joined thread " + thread.getName());
514          }
515          catch(InterruptedException JavaDoc e) {
516             e.printStackTrace();
517          }
518       }
519
520       System.out.println("FINAL c1:\n" + c1.printDetails() + "\nlocks:\n" + c1.printLockInfo());
521
522       assertEquals(0, c1.getNumberOfLocksHeld());
523       assertEquals(0, c2.getNumberOfLocksHeld());
524
525       c1.stop();
526       c2.stop();
527
528       if(ex != null)
529          fail("Thread failed: " + ex);
530    }
531
532
533    /**
534     * Conncurrent put on 2 different instances.
535     */

536    public void testConcurrentPutsOnTwoInstances() throws Exception JavaDoc {
537       initCaches(TreeCache.REPL_SYNC);
538       final TreeCache c1=this.cache1;
539       final TreeCache c2=this.cache2;
540
541       Thread JavaDoc t1=new Thread JavaDoc() {
542          Transaction tx;
543
544          public void run() {
545             try {
546                tx=beginTransaction();
547                c1.put("/ben/wang", "name", "Ben Wang");
548                _pause(8000);
549                tx.commit(); // This should go thru
550
}
551             catch(Throwable JavaDoc ex) {
552                ex.printStackTrace();
553                t1_ex=ex;
554             }
555          }
556       };
557
558       Thread JavaDoc t2=new Thread JavaDoc() {
559          Transaction tx;
560
561          public void run() {
562             try {
563                _pause(1000); // give Thread1 time to acquire the lock
564
tx=beginTransaction();
565                c2.put("/ben/wang", "name", "Ben Jr.");
566                tx.commit(); // This will time out and rollback first because Thread1 has a tx going as well.
567
}
568             catch(RollbackException rollback_ex) {
569                System.out.println("received rollback exception as expected");
570             }
571             catch(Throwable JavaDoc ex) {
572                ex.printStackTrace();
573                t2_ex=ex;
574             }
575          }
576       };
577
578       // Let the game start
579
t1.start();
580       t2.start();
581
582       // Wait for thread to die but put an insurance of 5 seconds on it.
583
t1.join();
584       t2.join();
585
586       if(t1_ex != null)
587          fail("Thread1 failed: " + t1_ex);
588       if(t2_ex != null)
589          fail("Thread2 failed: " + t2_ex);
590       assertEquals("Ben Wang", c1.get("/ben/wang", "name"));
591    }
592
593
594    public void testPut() throws Exception JavaDoc {
595       initCaches(TreeCache.REPL_SYNC);
596       final TreeCache c1=this.cache1;
597
598
599       Thread JavaDoc t1=new Thread JavaDoc() {
600          public void run() {
601             try {
602                lock.acquire();
603                System.out.println("-- t1 has lock");
604                c1.put("/a/b/c", "age", new Integer JavaDoc(38));
605                System.out.println("[Thread1] set value to 38");
606
607                System.out.println("-- t1 releases lock");
608                lock.release();
609                _pause(300);
610                Thread.yield();
611
612                lock.acquire();
613                System.out.println("-- t1 has lock");
614                c1.put("/a/b/c", "age", new Integer JavaDoc(39));
615                System.out.println("[Thread1] set value to 39");
616
617                System.out.println("-- t1 releases lock");
618                lock.release();
619                assertEquals(new Integer JavaDoc(39), c1.get("/a/b/c", "age"));
620             }
621             catch(Throwable JavaDoc ex) {
622                ex.printStackTrace();
623                t1_ex=ex;
624             }
625             finally {
626                lock.release();
627             }
628          }
629       };
630
631       Thread JavaDoc t2=new Thread JavaDoc() {
632          public void run() {
633             try {
634                _pause(100);
635                Thread.yield();
636                lock.acquire();
637                System.out.println("-- t2 has lock");
638                // Should replicate the value right away.
639
Integer JavaDoc val=(Integer JavaDoc)cache2.get("/a/b/c", "age");
640                System.out.println("[Thread2] value is " + val);
641                assertEquals(new Integer JavaDoc(38), val);
642                System.out.println("-- t2 releases lock");
643                lock.release();
644                _pause(300);
645                Thread.yield();
646
647                lock.acquire();
648                System.out.println("-- t2 has lock");
649                val=(Integer JavaDoc)cache2.get("/a/b/c", "age");
650                System.out.println("-- t2 releases lock");
651                lock.release();
652                assertEquals(new Integer JavaDoc(39), val);
653             }
654             catch(Throwable JavaDoc ex) {
655                ex.printStackTrace();
656                t2_ex=ex;
657             }
658             finally {
659                lock.release();
660             }
661          }
662       };
663
664       // Let the game start
665
t1.start();
666       t2.start();
667
668       // Wait for thread to die but put an insurance of 5 seconds on it.
669
t1.join();
670       t2.join();
671       if(t1_ex != null)
672          fail("Thread1 failed: " + t1_ex);
673       if(t2_ex != null)
674          fail("Thread2 failed: " + t2_ex);
675    }
676
677    /**
678     * Test replicated cache with transaction. Idea is to have two threads running
679     * a local cache each that is replicating. Depending on whether cache1 commit/rollback or not,
680     * the cache2.get will get different values.
681     * Note that we have used sleep to interpose thread execution sequence.
682     * Although it's not fool proof, it is rather simple and intuitive.
683     *
684     * @throws Exception
685     */

686    public void testPutTx() throws Exception JavaDoc {
687       Transaction tx=null;
688
689       try {
690          initCaches(TreeCache.REPL_SYNC);
691          cache1.setSyncCommitPhase(true);
692          cache2.setSyncCommitPhase(true);
693          tx=beginTransaction();
694          cache1.put("/a/b/c", "age", new Integer JavaDoc(38));
695          cache1.put("/a/b/c", "age", new Integer JavaDoc(39));
696          Object JavaDoc val=cache2.get("/a/b/c", "age"); // must be null as not yet committed
697
assertNull(val);
698          tx.commit();
699
700          tx=beginTransaction();
701          assertEquals(new Integer JavaDoc(39), cache2.get("/a/b/c", "age")); // must not be null
702
tx.commit();
703       }
704       catch(Throwable JavaDoc t) {
705          t.printStackTrace();
706          t1_ex=t;
707       }
708       finally {
709          lock.release();
710       }
711    }
712
713
714    /**
715     * Have both cache1 and cache2 do add and commit. cache1 commit should time out
716     * since it can't obtain the lock when trying to replicate cache2. On the other hand,
717     * cache2 commit will succeed since now that cache1 is rollbacked and lock is
718     * released.
719     */

720    public void testPutTx1() throws Exception JavaDoc {
721       initCaches(TreeCache.REPL_SYNC);
722       final TreeCache c1=this.cache1;
723       Thread JavaDoc t1=new Thread JavaDoc() {
724          public void run() {
725             Transaction tx=null;
726
727             try {
728                lock.acquire();
729                tx=beginTransaction();
730                c1.put("/a/b/c", "age", new Integer JavaDoc(38));
731                c1.put("/a/b/c", "age", new Integer JavaDoc(39));
732                lock.release();
733
734                _pause(300);
735                lock.acquire();
736                try {
737                   tx.commit();
738                }
739                catch(RollbackException ex) {
740                   System.out.println("[Thread1] received RollbackException, as expected. Rolling back changes");
741                   return;
742                }
743                finally {
744                   lock.release();
745                }
746             }
747             catch(Throwable JavaDoc ex) {
748                ex.printStackTrace();
749                t1_ex=ex;
750             }
751             finally {
752                lock.release();
753             }
754          }
755       };
756
757       Thread JavaDoc t2=new Thread JavaDoc() {
758          public void run() {
759             Transaction tx=null;
760
761             try {
762                sleep(200);
763                Thread.yield();
764                lock.acquire();
765                tx=beginTransaction();
766                assertNull(cache2.get("/a/b/c", "age")); // must be null as not yet committed
767
cache2.put("/a/b/c", "age", new Integer JavaDoc(40));
768                lock.release();
769
770                _pause(300);
771                lock.acquire();
772                assertEquals(new Integer JavaDoc(40), cache2.get("/a/b/c", "age")); // must not be null
773
tx.commit();
774                lock.release();
775
776                _pause(1000);
777                tx=beginTransaction();
778                assertEquals("After cache2 commit", new Integer JavaDoc(40), cache2.get("/a/b/c", "age"));
779                tx.commit();
780             }
781             catch(Throwable JavaDoc ex) {
782                ex.printStackTrace();
783                t2_ex=ex;
784             }
785             finally {
786                lock.release();
787             }
788          }
789       };
790
791       // Let the game start
792
t1.start();
793       t2.start();
794
795       t1.join();
796       t2.join();
797
798       if(t1_ex != null)
799          fail("Thread1 failed: " + t1_ex);
800       if(t2_ex != null)
801          fail("Thread2 failed: " + t2_ex);
802    }
803
804
805
806    public void testPutTxWithRollback() throws Exception JavaDoc {
807       initCaches(TreeCache.REPL_SYNC);
808       final TreeCache c2=this.cache1;
809       Thread JavaDoc t1=new Thread JavaDoc() {
810          public void run() {
811             Transaction tx=null;
812
813             try {
814                lock.acquire();
815                tx=beginTransaction();
816                c2.put("/a/b/c", "age", new Integer JavaDoc(38));
817                c2.put("/a/b/c", "age", new Integer JavaDoc(39));
818                lock.release();
819
820                _pause(100);
821                lock.acquire();
822                tx.rollback();
823                lock.release();
824             }
825             catch(Throwable JavaDoc ex) {
826                ex.printStackTrace();
827                t1_ex=ex;
828             }
829             finally {
830                lock.release();
831             }
832          }
833       };
834
835       Thread JavaDoc t2=new Thread JavaDoc() {
836          public void run() {
837             Transaction tx=null;
838
839             try {
840                sleep(200);
841                Thread.yield();
842                lock.acquire();
843                tx=beginTransaction();
844                assertNull(cache2.get("/a/b/c", "age")); // must be null as not yet committed
845
lock.release();
846
847                _pause(100);
848                lock.acquire();
849                assertNull(cache2.get("/a/b/c", "age")); // must be null as rolledback
850
tx.commit();
851                lock.release();
852             }
853             catch(Throwable JavaDoc ex) {
854                ex.printStackTrace();
855                t2_ex=ex;
856             }
857             finally {
858                lock.release();
859             }
860          }
861       };
862
863       // Let the game start
864
t1.start();
865       t2.start();
866
867       // Wait for thread to die but put an insurance of 5 seconds on it.
868
t1.join();
869       t2.join();
870       if(t1_ex != null)
871          fail("Thread1 failed: " + t1_ex);
872       if(t2_ex != null)
873          fail("Thread2 failed: " + t2_ex);
874    }
875
876
877    static class TransactionAborter implements Synchronization {
878       Transaction ltx=null;
879
880       public TransactionAborter(Transaction ltx) {
881          this.ltx=ltx;
882       }
883
884       public void beforeCompletion() {
885          try {
886             ltx.setRollbackOnly();
887          }
888          catch(SystemException e) {
889          }
890       }
891
892       public void afterCompletion(int status) {
893       }
894    }
895
896
897    static void _pause(long millis) {
898       try {
899          Thread.sleep(millis);
900       }
901       catch(Exception JavaDoc t) {
902       }
903    }
904
905    public static Test suite() throws Exception JavaDoc {
906 // return getDeploySetup(SyncTxUnitTestCase.class, "cachetest.jar");
907
return new TestSuite(SyncReplTxTest.class);
908    }
909
910
911 }
912
Popular Tags