KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jboss > test > cache > test > replicated > SyncTxUnitTestCase


1 /*
2  *
3  * JBoss, the OpenSource J2EE webOS
4  *
5  * Distributable under LGPL license.
6  * See terms of license at gnu.org.
7  */

8 package org.jboss.test.cache.test.replicated;
9
10 import EDU.oswego.cs.dl.util.concurrent.FIFOSemaphore;
11 import junit.framework.Test;
12 import junit.framework.TestCase;
13 import junit.framework.TestSuite;
14 import org.jboss.cache.PropertyConfigurator;
15 import org.jboss.cache.TreeCache;
16 import org.jboss.cache.lock.IsolationLevel;
17 import org.jboss.cache.transaction.DummyTransactionManager;
18 import org.jboss.logging.Logger;
19
20 import javax.naming.Context JavaDoc;
21 import javax.transaction.NotSupportedException JavaDoc;
22 import javax.transaction.RollbackException JavaDoc;
23 import javax.transaction.SystemException JavaDoc;
24 import javax.transaction.Transaction JavaDoc;
25
26 /**
27  * Replicated unit test for sync transactional TreeCache
28  * Note: we use DummyTransactionManager for Tx purpose instead of relying on
29  * jta.
30  *
31  * @version $Revision: 1.19.2.1 $
32  */

33 public class SyncTxUnitTestCase extends TestCase {
34    TreeCache cache1, cache2;
35    int caching_mode=TreeCache.REPL_SYNC;
36    final String JavaDoc group_name="TreeCacheTestGroup";
37    String JavaDoc props=
38          "UDP(ip_mcast=true;ip_ttl=64;loopback=false;mcast_addr=228.1.2.3;" +
39          "mcast_port=45566;mcast_recv_buf_size=80000;mcast_send_buf_size=150000;" +
40          "ucast_recv_buf_size=80000;ucast_send_buf_size=150000):" +
41          "PING(down_thread=true;num_initial_members=2;timeout=500;up_thread=true):" +
42          "MERGE2(max_interval=20000;min_interval=10000):" +
43          "FD(down_thread=true;shun=true;up_thread=true):" +
44          "VERIFY_SUSPECT(down_thread=true;timeout=1500;up_thread=true):" +
45          "pbcast.NAKACK(down_thread=true;gc_lag=50;retransmit_timeout=600,1200,2400,4800;" +
46          "up_thread=true):" +
47          "pbcast.STABLE(desired_avg_gossip=20000;down_thread=true;up_thread=true):" +
48          "UNICAST(down_thread=true;min_threshold=10;timeout=600,1200,2400;window_size=100):" +
49          "FRAG(down_thread=true;frag_size=8192;up_thread=true):" +
50          "pbcast.GMS(join_retry_timeout=2000;join_timeout=5000;print_local_addr=true;shun=true):" +
51          "pbcast.STATE_TRANSFER(down_thread=true;up_thread=true)";
52
53    final static Logger log_=Logger.getLogger(SyncTxUnitTestCase.class);
54    String JavaDoc old_factory=null;
55    final String JavaDoc FACTORY="org.jboss.cache.transaction.DummyContextFactory";
56    FIFOSemaphore lock=new FIFOSemaphore(1);
57    DummyTransactionManager tx_mgr;
58    Throwable JavaDoc t1_ex, t2_ex;
59
60
61
62    public SyncTxUnitTestCase(String JavaDoc name) {
63       super(name);
64    }
65
66    public void setUp() throws Exception JavaDoc {
67       super.setUp();
68       old_factory=System.getProperty(Context.INITIAL_CONTEXT_FACTORY);
69       System.setProperty(Context.INITIAL_CONTEXT_FACTORY, FACTORY);
70       tx_mgr=DummyTransactionManager.getInstance();
71       initCaches(TreeCache.REPL_SYNC);
72       t1_ex=t2_ex=null;
73    }
74
75    public void tearDown() throws Exception JavaDoc {
76       super.tearDown();
77       DummyTransactionManager.destroy();
78       destroyCaches();
79       if(old_factory != null) {
80          System.setProperty(Context.INITIAL_CONTEXT_FACTORY, old_factory);
81          old_factory=null;
82       }
83    }
84
85    Transaction JavaDoc beginTransaction() throws SystemException JavaDoc, NotSupportedException JavaDoc {
86       tx_mgr.begin();
87       return tx_mgr.getTransaction();
88    }
89
90    void initCaches(int caching_mode) throws Exception JavaDoc {
91       this.caching_mode=caching_mode;
92       cache1=new TreeCache();
93       cache2=new TreeCache();
94       PropertyConfigurator config=new PropertyConfigurator();
95       config.configure(cache1, "META-INF/replSync-service.xml"); // read in generic replSync xml
96
config.configure(cache2, "META-INF/replSync-service.xml"); // read in generic replSync xml
97
cache1.setCacheMode(caching_mode);
98       cache2.setCacheMode(caching_mode);
99
100       cache1.setIsolationLevel(IsolationLevel.SERIALIZABLE);
101       cache2.setIsolationLevel(IsolationLevel.SERIALIZABLE);
102       /*
103       cache1.setTransactionManagerLookupClass("org.jboss.cache.JBossTransactionManagerLookup");
104       cache2.setTransactionManagerLookupClass("org.jboss.cache.JBossTransactionManagerLookup");
105 */

106       cache1.setLockAcquisitionTimeout(5000);
107       cache2.setLockAcquisitionTimeout(5000);
108       cache1.start();
109       cache2.start();
110    }
111
112    void destroyCaches() throws Exception JavaDoc {
113       cache1.stop();
114       cache2.stop();
115       cache1=null;
116       cache2=null;
117    }
118
119
120    public void testLockRemoval() throws Exception JavaDoc {
121       cache1.setSyncCommitPhase(true);
122       cache1.releaseAllLocks("/");
123       Transaction JavaDoc tx=beginTransaction();
124       cache1.put("/bela/ban", "name", "Bela Ban");
125       assertEquals(2, cache1.getNumberOfLocksHeld());
126       assertEquals(0, cache2.getNumberOfLocksHeld());
127       tx.commit();
128       assertEquals(0, cache1.getNumberOfLocksHeld());
129       assertEquals(0, cache2.getNumberOfLocksHeld());
130     }
131
132
133
134    public void testSyncRepl() throws Exception JavaDoc {
135       Integer JavaDoc age;
136       Transaction JavaDoc tx;
137
138       try {
139          // cache1.setSyncCommitPhase(true);
140
tx=beginTransaction();
141          cache1.put("/a/b/c", "age", new Integer JavaDoc(38));
142          assertNull("age on cache2 must be null as the TX has not yet been committed", cache2.get("/a/b/c", "age"));
143          tx.commit();
144
145          // value on cache2 must be 38
146
age=(Integer JavaDoc)cache2.get("/a/b/c", "age");
147          assertNotNull("\"age\" obtained from cache2 must be non-null ", age);
148          assertTrue("\"age\" must be 38", age.intValue() == 38);
149       }
150       catch(Exception JavaDoc e) {
151          fail(e.toString());
152       }
153    }
154
155
156
157    public void testASyncRepl() throws Exception JavaDoc {
158       Integer JavaDoc age;
159       Transaction JavaDoc tx;
160
161       cache1.setCacheMode(TreeCache.REPL_ASYNC);
162       cache2.setCacheMode(TreeCache.REPL_ASYNC);
163
164       try {
165          tx=beginTransaction();
166          cache1.put("/a/b/c", "age", new Integer JavaDoc(38));
167          Thread.sleep(100);
168          assertNull("age on cache2 must be null as the TX has not yet been committed", cache2.get("/a/b/c", "age"));
169          tx.commit();
170          Thread.sleep(100);
171
172          // value on cache2 must be 38
173
age=(Integer JavaDoc)cache2.get("/a/b/c", "age");
174          assertNotNull("\"age\" obtained from cache2 is null ", age);
175          assertTrue("\"age\" must be 38", age.intValue() == 38);
176       }
177       catch(Exception JavaDoc e) {
178          fail(e.toString());
179       }
180    }
181
182    /**
183     * Tests concurrent modifications: thread1 succeeds and thread2 is blocked until thread1 is done, and then succeeds
184     * too. However, this is flawed with the introduction of interceptors, here's why.<br/>
185     * <ul>
186     * <li>Thread1 acquires the lock for /bela/ban on cache1
187     * <li>Thread2 blocks on Thread1 to release the lock
188     * <li>Thread1 commits: this means the TransactionInterceptor and the ReplicationInterceptor are called in
189     * the sequence in which they registered. Unfortunately, the TransactionInterceptor registered first. In the
190     * PREPARE phase, the ReplicationInterceptor calls prepare() in cache2 synchronously. The TxInterceptor
191     * does nothing. The the COMMIT phase, the TxInterceptor commits the data by releasing the locks locally and
192     * then the ReplicationInterceptor sends an asynchronous COMMIT to cache2.
193     * <li>Because the TxInterceptor for Thread1 releases the locks locally <em>before</em> sending the async COMMIT,
194     * Thread2 is able to acquire the lock for /bela/ban in cache1 and then starts the PREPARE phase by sending a
195     * synchronous PREPARE to cache2. If this PREPARE arrives at cache2 <em>before</em> the COMMIT from Thread1,
196     * the PREPARE will block because it attempts to acquire a lock on /bela/ban on cache2 still held by Thread1
197     * (which would be released by Thread1's COMMIT). This results in deadlock, which is resolved by Thread2 running
198     * into a timeout with subsequent rollback and Thread1 succeeding.<br/>
199     * </ul>
200     * There are 3 solutions to this:
201     * <ol>
202     * <li>Do nothing. This is standard behavior for concurrent access to the same data. Same thing if the 2 threads
203     * operated on the same data in <em>separate</em> caches, e.g. Thread1 on /bela/ban in cache1 and Thread2 on
204     * /bela/ban in cache2. The semantics of Tx commit as handled by the interceptors is: after tx1.commit() returns
205     * the locks held by tx1 are release and a COMMIT message is on the way (if sent asynchronously).
206     * <li>Force an order over TxInterceptor and ReplicationInterceptor. This would require ReplicationInterceptor
207     * to always be fired first on TX commit. Downside: the interceptors have an implicit dependency, which is not
208     * nice.
209     * <li>Priority-order requests at the receiver; e.g. a COMMIT could release a blocked PREPARE. This is bad because
210     * it violates JGroups' FIFO ordering guarantees.
211     * </ol>
212     * I'm currently investigating solution #2, ie. creating an OrderedSynchronizationHandler, which allows other
213     * SynchronizationHandlers to register (atHead, atTail), and the OrderedSynchronizationHandler would call the
214     * SynchronizationHandler in the order in which they are defined.
215     * @throws Exception
216     */

217    public void testConcurrentPuts() throws Exception JavaDoc {
218       cache1.setSyncCommitPhase(true);
219
220       Thread JavaDoc t1=new Thread JavaDoc("Thread1") {
221          Transaction JavaDoc tx;
222
223          public void run() {
224             try {
225                tx=beginTransaction();
226                cache1.put("/bela/ban", "name", "Bela Ban");
227                _pause(2000); // Thread2 will be blocked until we commit
228
tx.commit();
229                System.out.println("[Thread1] ** LOCK INFO cache1: " + cache1.printLockInfo());
230                System.out.println("[Thread1] ** LOCK INFO cache2: " + cache2.printLockInfo());
231             }
232             catch(Throwable JavaDoc ex) {
233                ex.printStackTrace();
234                t1_ex=ex;
235             }
236          }
237       };
238
239       Thread JavaDoc t2=new Thread JavaDoc("Thread2") {
240          Transaction JavaDoc tx;
241
242          public void run() {
243             try {
244                _pause(1000); // give Thread1 time to acquire the lock
245
tx=beginTransaction();
246                System.out.println("[Thread2] ** LOCK INFO cache1: " + cache1.printLockInfo());
247                System.out.println("[Thread2] ** LOCK INFO cache2: " + cache2.printLockInfo());
248                cache1.put("/bela/ban", "name", "Michelle Ban");
249                System.out.println("[Thread2] ** LOCK INFO cache1: " + cache1.printLockInfo());
250                System.out.println("[Thread2] ** LOCK INFO cache2: " + cache2.printLockInfo());
251                tx.commit();
252                System.out.println("[Thread2] ** LOCK INFO cache1: " + cache1.printLockInfo());
253                System.out.println("[Thread2] ** LOCK INFO cache2: " + cache2.printLockInfo());
254             }
255             catch(Throwable JavaDoc ex) {
256                ex.printStackTrace();
257                t2_ex=ex;
258             }
259          }
260       };
261
262       // Let the game start
263
t1.start();
264       t2.start();
265
266       // Wait for threads to die
267
t1.join();
268       t2.join();
269
270       if(t1_ex != null)
271          fail("Thread1 failed: " + t1_ex);
272       if(t2_ex != null)
273          fail("Thread2 failed: " + t2_ex);
274
275       assertEquals("Michelle Ban", cache1.get("/bela/ban", "name"));
276    }
277
278
279    /**
280     * Conncurrent put on 2 different instances.
281     */

282    public void testConcurrentPutsOnTwoInstances() throws Exception JavaDoc {
283       final TreeCache cache1=this.cache1;
284       final TreeCache cache2=this.cache2;
285
286       Thread JavaDoc t1=new Thread JavaDoc() {
287          Transaction JavaDoc tx;
288
289          public void run() {
290             try {
291                tx=beginTransaction();
292                cache1.put("/ben/wang", "name", "Ben Wang");
293                _pause(8000);
294                tx.commit(); // This should go thru
295
}
296             catch(Throwable JavaDoc ex) {
297                ex.printStackTrace();
298                t1_ex=ex;
299             }
300          }
301       };
302
303       Thread JavaDoc t2=new Thread JavaDoc() {
304          Transaction JavaDoc tx;
305
306          public void run() {
307             try {
308                _pause(1000); // give Thread1 time to acquire the lock
309
tx=beginTransaction();
310                cache2.put("/ben/wang", "name", "Ben Jr.");
311                tx.commit(); // This will time out and rollback first because Thread1 has a tx going as well.
312
}
313             catch(RollbackException JavaDoc rollback_ex) {
314                System.out.println("received rollback exception as expected");
315             }
316             catch(Throwable JavaDoc ex) {
317                ex.printStackTrace();
318                t2_ex=ex;
319             }
320          }
321       };
322
323       // Let the game start
324
t1.start();
325       t2.start();
326
327       // Wait for thread to die but put an insurance of 5 seconds on it.
328
t1.join();
329       t2.join();
330
331       if(t1_ex != null)
332          fail("Thread1 failed: " + t1_ex);
333       if(t2_ex != null)
334          fail("Thread2 failed: " + t2_ex);
335       assertEquals("Ben Wang", cache1.get("/ben/wang", "name"));
336    }
337
338
339    public void testPut() throws Exception JavaDoc {
340       final TreeCache cache1=this.cache1;
341       final TreeCache cache2=this.cache2;
342
343
344       Thread JavaDoc t1=new Thread JavaDoc() {
345          public void run() {
346             try {
347                lock.acquire();
348                System.out.println("-- t1 has lock");
349                cache1.put("/a/b/c", "age", new Integer JavaDoc(38));
350                System.out.println("[Thread1] set value to 38");
351
352                System.out.println("-- t1 releases lock");
353                lock.release();
354                _pause(300);
355                Thread.yield();
356
357                lock.acquire();
358                System.out.println("-- t1 has lock");
359                cache1.put("/a/b/c", "age", new Integer JavaDoc(39));
360                System.out.println("[Thread1] set value to 39");
361
362                System.out.println("-- t1 releases lock");
363                lock.release();
364                assertEquals(new Integer JavaDoc(39), cache1.get("/a/b/c", "age"));
365             }
366             catch(Throwable JavaDoc ex) {
367                ex.printStackTrace();
368                t1_ex=ex;
369             }
370             finally {
371                lock.release();
372             }
373          }
374       };
375
376       Thread JavaDoc t2=new Thread JavaDoc() {
377          public void run() {
378             try {
379                _pause(100);
380                Thread.yield();
381                lock.acquire();
382                System.out.println("-- t2 has lock");
383                // Should replicate the value right away.
384
Integer JavaDoc val=(Integer JavaDoc)cache2.get("/a/b/c", "age");
385                System.out.println("[Thread2] value is " + val);
386                assertEquals(new Integer JavaDoc(38), val);
387                System.out.println("-- t2 releases lock");
388                lock.release();
389                _pause(300);
390                Thread.yield();
391
392                lock.acquire();
393                System.out.println("-- t2 has lock");
394                val=(Integer JavaDoc)cache2.get("/a/b/c", "age");
395                System.out.println("-- t2 releases lock");
396                lock.release();
397                assertEquals(new Integer JavaDoc(39), val);
398             }
399             catch(Throwable JavaDoc ex) {
400                ex.printStackTrace();
401                t2_ex=ex;
402             }
403             finally {
404                lock.release();
405             }
406          }
407       };
408
409       // Let the game start
410
t1.start();
411       t2.start();
412
413       // Wait for thread to die but put an insurance of 5 seconds on it.
414
t1.join();
415       t2.join();
416       if(t1_ex != null)
417          fail("Thread1 failed: " + t1_ex);
418       if(t2_ex != null)
419          fail("Thread2 failed: " + t2_ex);
420    }
421
422    /**
423     * Test replicated cache with transaction. Idea is to have two threads running
424     * a local cache each that is replicating. Depending on whether cache1 commit/rollback or not,
425     * the cache2.get will get different values.
426     * Note that we have used sleep to interpose thread execution sequence.
427     * Although it's not fool proof, it is rather simple and intuitive.
428     *
429     * @throws Exception
430     */

431    public void testPutTx() throws Exception JavaDoc {
432       Transaction JavaDoc tx=null;
433
434       try {
435          tx=beginTransaction();
436          cache1.put("/a/b/c", "age", new Integer JavaDoc(38));
437          cache1.put("/a/b/c", "age", new Integer JavaDoc(39));
438          Object JavaDoc val=cache2.get("/a/b/c", "age"); // must be null as not yet committed
439
assertNull(val);
440          tx.commit();
441
442          tx=beginTransaction();
443          assertEquals(new Integer JavaDoc(39), cache2.get("/a/b/c", "age")); // must not be null
444
tx.commit();
445       }
446       catch(Throwable JavaDoc ex) {
447          ex.printStackTrace();
448          t1_ex=ex;
449       }
450       finally {
451          lock.release();
452       }
453    }
454
455
456    /**
457     * Have both cache1 and cache2 do add and commit. cache1 commit should time out
458     * since it can't obtain the lock when trying to replicate cache2. On the other hand,
459     * cache2 commit will succeed since now that cache1 is rollbacked and lock is
460     * released.
461     */

462    public void testPutTx1() throws Exception JavaDoc {
463       final TreeCache cache1=this.cache1;
464       final TreeCache cache2=this.cache2;
465       Thread JavaDoc t1=new Thread JavaDoc() {
466          public void run() {
467             Transaction JavaDoc tx=null;
468
469             try {
470                lock.acquire();
471                tx=beginTransaction();
472                cache1.put("/a/b/c", "age", new Integer JavaDoc(38));
473                cache1.put("/a/b/c", "age", new Integer JavaDoc(39));
474                lock.release();
475
476                _pause(300);
477                lock.acquire();
478                try {
479                   tx.commit();
480                }
481                catch(RollbackException JavaDoc ex) {
482                   System.out.println("[Thread1] received RollbackException, as expected. Rolling back changes");
483                   return;
484                }
485                finally {
486                   lock.release();
487                }
488             }
489             catch(Throwable JavaDoc ex) {
490                ex.printStackTrace();
491                t1_ex=ex;
492             }
493             finally {
494                lock.release();
495             }
496          }
497       };
498
499       Thread JavaDoc t2=new Thread JavaDoc() {
500          public void run() {
501             Transaction JavaDoc tx=null;
502
503             try {
504                sleep(200);
505                Thread.yield();
506                lock.acquire();
507                tx=beginTransaction();
508                assertNull(cache2.get("/a/b/c", "age")); // must be null as not yet committed
509
cache2.put("/a/b/c", "age", new Integer JavaDoc(40));
510                lock.release();
511
512                _pause(300);
513                lock.acquire();
514                assertEquals(new Integer JavaDoc(40), cache2.get("/a/b/c", "age")); // must not be null
515
tx.commit();
516                lock.release();
517
518                _pause(1000);
519                tx=beginTransaction();
520                assertEquals("After cache2 commit", new Integer JavaDoc(40), cache2.get("/a/b/c", "age"));
521                tx.commit();
522             }
523             catch(Throwable JavaDoc ex) {
524                ex.printStackTrace();
525                t2_ex=ex;
526             }
527             finally {
528                lock.release();
529             }
530          }
531       };
532
533       // Let the game start
534
t1.start();
535       t2.start();
536
537       t1.join();
538       t2.join();
539
540       if(t1_ex != null)
541          fail("Thread1 failed: " + t1_ex);
542       if(t2_ex != null)
543          fail("Thread2 failed: " + t2_ex);
544    }
545
546
547
548    public void testPutTxWithRollback() throws Exception JavaDoc {
549       final TreeCache cache1=this.cache1;
550       final TreeCache cache2=this.cache2;
551       Thread JavaDoc t1=new Thread JavaDoc() {
552          public void run() {
553             Transaction JavaDoc tx=null;
554
555             try {
556                lock.acquire();
557                tx=beginTransaction();
558                cache1.put("/a/b/c", "age", new Integer JavaDoc(38));
559                cache1.put("/a/b/c", "age", new Integer JavaDoc(39));
560                lock.release();
561
562                _pause(100);
563                lock.acquire();
564                tx.rollback();
565                lock.release();
566             }
567             catch(Throwable JavaDoc ex) {
568                ex.printStackTrace();
569                t1_ex=ex;
570             }
571             finally {
572                lock.release();
573             }
574          }
575       };
576
577       Thread JavaDoc t2=new Thread JavaDoc() {
578          public void run() {
579             Transaction JavaDoc tx=null;
580
581             try {
582                sleep(200);
583                Thread.yield();
584                lock.acquire();
585                tx=beginTransaction();
586                assertNull(cache2.get("/a/b/c", "age")); // must be null as not yet committed
587
lock.release();
588
589                _pause(100);
590                lock.acquire();
591                assertNull(cache2.get("/a/b/c", "age")); // must be null as rolledback
592
tx.commit();
593                lock.release();
594             }
595             catch(Throwable JavaDoc ex) {
596                ex.printStackTrace();
597                t2_ex=ex;
598             }
599             finally {
600                lock.release();
601             }
602          }
603       };
604
605       // Let the game start
606
t1.start();
607       t2.start();
608
609       // Wait for thread to die but put an insurance of 5 seconds on it.
610
t1.join();
611       t2.join();
612       if(t1_ex != null)
613          fail("Thread1 failed: " + t1_ex);
614       if(t2_ex != null)
615          fail("Thread2 failed: " + t2_ex);
616    }
617
618    static void _pause(long millis) {
619       try {
620          Thread.sleep(millis);
621       }
622       catch(Exception JavaDoc ex) {
623       }
624    }
625
626    public static Test suite() throws Exception JavaDoc {
627 // return getDeploySetup(SyncTxUnitTestCase.class, "cachetest.jar");
628
return new TestSuite(SyncTxUnitTestCase.class);
629    }
630
631
632 }
633
Popular Tags