1 8 package org.jboss.cache.replicated; 9 10 import EDU.oswego.cs.dl.util.concurrent.FIFOSemaphore; 11 import junit.framework.Test; 12 import junit.framework.TestCase; 13 import junit.framework.TestSuite; 14 import org.apache.commons.logging.Log; 15 import org.apache.commons.logging.LogFactory; 16 import org.jboss.cache.AbstractCacheListener; 17 import org.jboss.cache.Cache; 18 import org.jboss.cache.CacheException; 19 import org.jboss.cache.CacheImpl; 20 import org.jboss.cache.Fqn; 21 import org.jboss.cache.config.Configuration; 22 import org.jboss.cache.lock.IsolationLevel; 23 import org.jboss.cache.lock.TimeoutException; 24 import org.jboss.cache.misc.TestingUtil; 25 import org.jboss.cache.transaction.DummyTransactionManager; 26 27 import javax.naming.Context ; 28 import javax.transaction.NotSupportedException ; 29 import javax.transaction.RollbackException ; 30 import javax.transaction.Status ; 31 import javax.transaction.Synchronization ; 32 import javax.transaction.SystemException ; 33 import javax.transaction.Transaction ; 34 import javax.transaction.TransactionManager ; 35 import java.util.ArrayList ; 36 import java.util.List ; 37 import java.util.Map ; 38 39 46 public class SyncReplTxTest extends TestCase 47 { 48 private static Log log = LogFactory.getLog(SyncReplTxTest.class); 49 private CacheImpl cache1; 50 private CacheImpl cache2; 51 52 private String old_factory = null; 53 private final String FACTORY = "org.jboss.cache.transaction.DummyContextFactory"; 54 private FIFOSemaphore lock = new FIFOSemaphore(1); 55 private DummyTransactionManager tx_mgr; 56 private Throwable t1_ex; 57 private Throwable t2_ex; 58 59 60 public SyncReplTxTest(String name) 61 { 62 super(name); 63 } 64 65 public void setUp() throws Exception 66 { 67 super.setUp(); 68 old_factory = System.getProperty(Context.INITIAL_CONTEXT_FACTORY); 69 System.setProperty(Context.INITIAL_CONTEXT_FACTORY, FACTORY); 70 t1_ex = t2_ex = null; 71 } 72 73 public void tearDown() throws Exception 74 { 75 super.tearDown(); 76 DummyTransactionManager.destroy(); 77 destroyCaches(); 78 if (old_factory != null) 79 { 80 System.setProperty(Context.INITIAL_CONTEXT_FACTORY, old_factory); 81 old_factory = null; 82 } 83 } 84 85 private Transaction beginTransaction() throws SystemException , NotSupportedException 86 { 87 DummyTransactionManager mgr = DummyTransactionManager.getInstance(); 88 mgr.begin(); 89 return mgr.getTransaction(); 90 } 91 92 private void initCaches(Configuration.CacheMode caching_mode) throws Exception 93 { 94 cache1 = new CacheImpl(); 95 cache2 = new CacheImpl(); 96 cache1.getConfiguration().setCacheMode(caching_mode); 97 cache2.getConfiguration().setCacheMode(caching_mode); 98 cache1.getConfiguration().setIsolationLevel(IsolationLevel.SERIALIZABLE); 99 cache2.getConfiguration().setIsolationLevel(IsolationLevel.SERIALIZABLE); 100 101 cache1.getConfiguration().setTransactionManagerLookupClass("org.jboss.cache.DummyTransactionManagerLookup"); 102 cache2.getConfiguration().setTransactionManagerLookupClass("org.jboss.cache.DummyTransactionManagerLookup"); 103 cache1.getConfiguration().setLockAcquisitionTimeout(5000); 104 cache2.getConfiguration().setLockAcquisitionTimeout(5000); 105 106 configureMultiplexer(cache1); 107 configureMultiplexer(cache2); 108 109 cache1.start(); 110 cache2.start(); 111 112 validateMultiplexer(cache1); 113 validateMultiplexer(cache2); 114 } 115 116 123 protected void configureMultiplexer(Cache cache) throws Exception 124 { 125 } 127 128 135 protected void validateMultiplexer(Cache cache) 136 { 137 assertFalse("Cache is not using multiplexer", cache.getConfiguration().isUsingMultiplexer()); 138 } 139 140 private void destroyCaches() 141 { 142 if (cache1 != null) 143 cache1.stop(); 144 if (cache2 != null) 145 cache2.stop(); 146 cache1 = null; 147 cache2 = null; 148 } 149 150 public void testLockRemoval() throws Exception 151 { 152 initCaches(Configuration.CacheMode.REPL_SYNC); 153 cache1.getConfiguration().setSyncCommitPhase(true); 154 cache1.releaseAllLocks("/"); 155 Transaction tx = beginTransaction(); 156 cache1.put("/bela/ban", "name", "Bela Ban"); 157 assertEquals(3, cache1.getNumberOfLocksHeld()); 158 assertEquals(0, cache2.getNumberOfLocksHeld()); 159 tx.commit(); 160 assertEquals(0, cache1.getNumberOfLocksHeld()); 161 assertEquals(0, cache2.getNumberOfLocksHeld()); 162 } 163 164 165 public void testSyncRepl() throws Exception 166 { 167 Integer age; 168 Transaction tx; 169 170 try 171 { 172 initCaches(Configuration.CacheMode.REPL_SYNC); 173 cache1.getConfiguration().setSyncCommitPhase(true); 174 cache2.getConfiguration().setSyncCommitPhase(true); 175 176 178 tx = beginTransaction(); 179 cache1.put("/a/b/c", "age", 38); 180 TransactionManager mgr = cache1.getTransactionManager(); 181 tx = mgr.suspend(); 182 assertNull("age on cache2 must be null as the TX has not yet been committed", cache2.get("/a/b/c", "age")); 183 log.debug("cache1: locks held before commit: " + cache1.printLockInfo()); 184 log.debug("cache2: locks held before commit: " + cache2.printLockInfo()); 185 mgr.resume(tx); 186 tx.commit(); 187 log.debug("cache1: locks held after commit: " + cache1.printLockInfo()); 188 log.debug("cache2: locks held after commit: " + cache2.printLockInfo()); 189 190 age = (Integer ) cache2.get("/a/b/c", "age"); 192 assertNotNull("\"age\" obtained from cache2 must be non-null ", age); 193 assertTrue("\"age\" must be 38", age == 38); 194 } 195 catch (Exception e) 196 { 197 fail(e.toString()); 198 } 199 } 200 201 204 public void testSimplePut() throws Exception 205 { 206 initCaches(Configuration.CacheMode.REPL_SYNC); 207 208 cache1.put("/JSESSION/localhost/192.168.1.10:32882/Courses/0", "Instructor", "Ben Wang"); 209 210 cache1.put("/JSESSION/localhost/192.168.1.10:32882/1", "Number", 10); 211 } 212 213 214 public void testSimpleTxPut() throws Exception 215 { 216 Transaction tx; 217 final Fqn NODE1 = Fqn.fromString("/one/two/three"); 218 initCaches(Configuration.CacheMode.REPL_SYNC); 219 220 tx = beginTransaction(); 221 cache1.put(NODE1, "age", 38); 222 System.out.println("TransactionTable for cache1 after cache1.put():\n" + cache1.getTransactionTable().toString(true)); 223 tx.commit(); 224 225 234 235 244 } 245 246 public void testSyncReplWithModficationsOnBothCaches() throws Exception 247 { 248 Integer age; 249 Transaction tx; 250 final Fqn NODE1 = Fqn.fromString("/one/two/three"); 251 final Fqn NODE2 = Fqn.fromString("/eins/zwei/drei"); 252 253 initCaches(Configuration.CacheMode.REPL_SYNC); 254 255 cache1.put("/one/two", null); 257 cache2.put("/eins/zwei", null); 258 259 cache1.getConfiguration().setSyncCommitPhase(true); 260 cache2.getConfiguration().setSyncCommitPhase(true); 261 262 tx = beginTransaction(); 263 cache1.put(NODE1, "age", 38); 264 System.out.println("TransactionTable for cache1 after cache1.put():\n" + cache1.getTransactionTable().toString(true)); 265 266 cache2.put(NODE2, "age", 39); 267 System.out.println("TransactionTable for cache2 after cache2.put():\n" + cache2.getTransactionTable().toString(true)); 268 269 System.out.println("cache1 before commit:\n" + cache1.printLockInfo()); 270 System.out.println("cache2 before commit:\n" + cache2.printLockInfo()); 271 272 try 273 { 274 tx.commit(); 275 fail("Should not succeed with SERIALIZABLE semantics"); 276 } 277 catch (Exception e) 278 { 279 } 281 282 System.out.println("cache1 after commit:\n" + cache1.printLockInfo()); 283 System.out.println("cache2 after commit:\n" + cache2.printLockInfo()); 284 285 307 308 assertEquals(0, cache1.getNumberOfLocksHeld()); 309 assertEquals(0, cache2.getNumberOfLocksHeld()); 310 System.out.println("TransactionTable for cache1:\n" + cache1.getTransactionTable().toString(true)); 311 System.out.println("TransactionTable for cache2:\n" + cache2.getTransactionTable().toString(true)); 312 } 313 314 public void testSyncReplWithModficationsOnBothCachesSameData() throws Exception 315 { 316 Transaction tx; 317 final Fqn NODE = Fqn.fromString("/one/two/three"); 318 initCaches(Configuration.CacheMode.REPL_SYNC); 319 tx = beginTransaction(); 320 cache1.put(NODE, "age", 38); 321 System.out.println("TransactionTable for cache1 after cache1.put():\n" + cache1.getTransactionTable().toString(true)); 322 323 cache2.put(NODE, "age", 39); 324 System.out.println("TransactionTable for cache2 after cache2.put():\n" + cache2.getTransactionTable().toString(true)); 325 326 System.out.println("cache1 before commit:\n" + cache1.printLockInfo()); 327 System.out.println("cache2 before commit:\n" + cache2.printLockInfo()); 328 329 try 330 { 331 tx.commit(); 332 fail("commit should throw a RollbackException, we should not get here"); 333 } 334 catch (RollbackException rollback) 335 { 336 System.out.println("Transaction was rolled back, this is correct"); 337 } 338 339 System.out.println("cache1 after commit:\n" + cache1.printLockInfo()); 340 System.out.println("cache2 after commit:\n" + cache2.printLockInfo()); 341 342 assertEquals(0, cache1.getNumberOfLocksHeld()); 343 assertEquals(0, cache2.getNumberOfLocksHeld()); 344 345 assertEquals(0, cache1.getNumberOfNodes()); 346 assertEquals(0, cache2.getNumberOfNodes()); 347 } 348 349 350 public void testSyncReplWithModficationsOnBothCachesWithRollback() throws Exception 351 { 352 Transaction tx; 353 final Fqn NODE1 = Fqn.fromString("/one/two/three"); 354 final Fqn NODE2 = Fqn.fromString("/eins/zwei/drei"); 355 356 initCaches(Configuration.CacheMode.REPL_SYNC); 357 358 cache1.getConfiguration().setSyncRollbackPhase(true); 359 cache2.getConfiguration().setSyncRollbackPhase(true); 360 361 tx = beginTransaction(); 362 cache1.put(NODE1, "age", 38); 363 cache2.put(NODE2, "age", 39); 364 365 System.out.println("cache1 (before commit):\n" + cache1.printLockInfo()); 366 System.out.println("cache2 (before commit):\n" + cache2.printLockInfo()); 367 368 tx.registerSynchronization(new TransactionAborter(tx)); 370 371 try 372 { 373 tx.commit(); 374 fail("commit should throw a RollbackException, we should not get here"); 375 } 376 catch (RollbackException rollback) 377 { 378 System.out.println("Transaction was rolled back, this is correct"); 379 } 380 381 System.out.println("cache1 (after rollback):\n" + cache1.printLockInfo()); 382 System.out.println("cache2 (after rollback):\n" + cache2.printLockInfo()); 383 384 assertEquals(0, cache1.getNumberOfLocksHeld()); 385 assertEquals(0, cache2.getNumberOfLocksHeld()); 386 387 assertEquals(0, cache1.getNumberOfNodes()); 388 assertEquals(0, cache2.getNumberOfNodes()); 389 } 390 391 392 398 public void testSyncReplWithRollbackAndListener() throws Exception 399 { 400 Transaction tx; 401 final Fqn NODE1 = Fqn.fromString("/one/two/three"); 402 403 initCaches(Configuration.CacheMode.REPL_SYNC); 404 405 cache1.getConfiguration().setSyncRollbackPhase(true); 406 cache2.getConfiguration().setSyncRollbackPhase(true); 407 408 410 CallbackListener cbl1 = new CallbackListener(cache1, "age"); 411 CallbackListener cbl2 = new CallbackListener(cache2, "age"); 412 413 tx = beginTransaction(); 414 cache1.put(NODE1, "age", 38); 415 416 System.out.println("cache1 (before commit):\n" + cache1.printLockInfo()); 417 System.out.println("cache2 (before commit):\n" + cache2.printLockInfo()); 418 419 tx.registerSynchronization(new TransactionAborter(tx)); 421 422 try 423 { 424 tx.commit(); 425 fail("commit should throw a RollbackException, we should not get here"); 426 } 427 catch (RollbackException rollback) 428 { 429 rollback.printStackTrace(); 430 System.out.println("Transaction was rolled back, this is correct"); 431 } 432 433 TestingUtil.sleepThread(1000); 435 436 System.out.println("cache1 (after rollback):\n" + cache1.printLockInfo()); 437 System.out.println("cache2 (after rollback):\n" + cache2.printLockInfo()); 438 439 assertNull(cbl1.getCallbackException()); 440 assertNull(cbl2.getCallbackException()); 441 442 assertEquals(0, cache1.getNumberOfLocksHeld()); 443 assertEquals(0, cache2.getNumberOfLocksHeld()); 444 445 assertEquals(0, cache1.getNumberOfNodes()); 446 assertEquals(0, cache2.getNumberOfNodes()); 447 448 450 cache2.getNotifier().removeCacheListener(cbl2); 451 cbl2 = new TransactionAborterCallbackListener(cache2, "age"); 453 454 tx = beginTransaction(); 455 cache1.put(NODE1, "age", 38); 456 457 System.out.println("cache1 (before commit):\n" + cache1.printLockInfo()); 458 System.out.println("cache2 (before commit):\n" + cache2.printLockInfo()); 459 460 tx.commit(); 461 462 TestingUtil.sleepThread(1000); 464 465 System.out.println("cache1 (after rollback):\n" + cache1.printLockInfo()); 466 System.out.println("cache2 (after rollback):\n" + cache2.printLockInfo()); 467 468 assertNull(cbl1.getCallbackException()); 469 assertNull(cbl2.getCallbackException()); 470 471 assertEquals(0, cache1.getNumberOfLocksHeld()); 472 assertEquals(0, cache2.getNumberOfLocksHeld()); 473 474 assertEquals(3, cache1.getNumberOfNodes()); 476 assertEquals(0, cache2.getNumberOfNodes()); 477 478 } 479 480 481 487 public void testSyncReplWithRemoteRollback() throws Exception 488 { 489 Transaction tx; 490 final Fqn NODE1 = Fqn.fromString("/one/two/three"); 491 492 initCaches(Configuration.CacheMode.REPL_SYNC); 493 494 cache1.getConfiguration().setSyncRollbackPhase(true); 495 cache2.getConfiguration().setSyncRollbackPhase(true); 496 497 499 TransactionAborterListener tal = new TransactionAborterListener(cache2); 501 502 tx = beginTransaction(); 503 cache1.put(NODE1, "age", 38); 504 505 System.out.println("cache1 (before commit):\n" + cache1.printLockInfo()); 506 System.out.println("cache2 (before commit):\n" + cache2.printLockInfo()); 507 508 try 509 { 510 tx.commit(); 511 fail("commit should throw a RollbackException, we should not get here"); 512 } 513 catch (RollbackException rollback) 514 { 515 System.out.println("Transaction was rolled back, this is correct"); 516 } 517 518 TestingUtil.sleepThread(1000); 520 521 System.out.println("cache1 (after rollback):\n" + cache1.printLockInfo()); 522 System.out.println("cache2 (after rollback):\n" + cache2.printLockInfo()); 523 524 assertNull(tal.getCallbackException()); 525 526 assertEquals(0, cache1.getNumberOfLocksHeld()); 527 assertEquals(0, cache2.getNumberOfLocksHeld()); 528 529 assertEquals(0, cache1.getNumberOfNodes()); 530 assertEquals(0, cache2.getNumberOfNodes()); 531 532 } 533 534 535 public void testASyncRepl() throws Exception 536 { 537 Integer age; 538 Transaction tx; 539 540 initCaches(Configuration.CacheMode.REPL_ASYNC); 541 542 tx = beginTransaction(); 543 cache1.put("/a/b/c", "age", 38); 544 Thread.sleep(1000); 545 assertNull("age on cache2 must be null as the TX has not yet been committed", cache2.get("/a/b/c", "age")); 546 tx.commit(); 547 Thread.sleep(1000); 548 549 age = (Integer ) cache2.get("/a/b/c", "age"); 551 assertNotNull("\"age\" obtained from cache2 is null ", age); 552 assertTrue("\"age\" must be 38", age == 38); 553 554 } 555 556 592 public void testConcurrentPuts() throws Exception 593 { 594 initCaches(Configuration.CacheMode.REPL_SYNC); 595 cache1.getConfiguration().setSyncCommitPhase(true); 596 597 Thread t1 = new Thread ("Thread1") 598 { 599 Transaction tx; 600 601 public void run() 602 { 603 try 604 { 605 tx = beginTransaction(); 606 cache1.put("/bela/ban", "name", "Bela Ban"); 607 TestingUtil.sleepThread(2000); tx.commit(); 609 System.out.println("[Thread1] ** LOCK INFO cache1: " + cache1.printLockInfo()); 610 System.out.println("[Thread1] ** LOCK INFO cache2: " + cache2.printLockInfo()); 611 } 612 catch (Throwable ex) 613 { 614 ex.printStackTrace(); 615 t1_ex = ex; 616 } 617 } 618 }; 619 620 Thread t2 = new Thread ("Thread2") 621 { 622 Transaction tx; 623 624 public void run() 625 { 626 try 627 { 628 TestingUtil.sleepThread(1000); tx = beginTransaction(); 630 System.out.println("[Thread2] ** LOCK INFO cache1: " + cache1.printLockInfo()); 631 System.out.println("[Thread2] ** LOCK INFO cache2: " + cache2.printLockInfo()); 632 cache1.put("/bela/ban", "name", "Michelle Ban"); 633 System.out.println("[Thread2] ** LOCK INFO cache1: " + cache1.printLockInfo()); 634 System.out.println("[Thread2] ** LOCK INFO cache2: " + cache2.printLockInfo()); 635 tx.commit(); 636 System.out.println("[Thread2] ** LOCK INFO cache1: " + cache1.printLockInfo()); 637 System.out.println("[Thread2] ** LOCK INFO cache2: " + cache2.printLockInfo()); 638 } 639 catch (Throwable ex) 640 { 641 ex.printStackTrace(); 642 t2_ex = ex; 643 } 644 } 645 }; 646 647 t1.start(); 649 t2.start(); 650 651 t1.join(); 653 t2.join(); 654 655 if (t1_ex != null) 656 fail("Thread1 failed: " + t1_ex); 657 if (t2_ex != null) 658 fail("Thread2 failed: " + t2_ex); 659 660 assertEquals("Michelle Ban", cache1.get("/bela/ban", "name")); 661 } 662 663 664 667 public void testConcurrentCommitsWith1Thread() throws Exception 668 { 669 _testConcurrentCommits(1); 670 } 671 672 675 public void testConcurrentCommitsWith5Threads() throws Exception 676 { 677 _testConcurrentCommits(5); 678 } 679 680 683 private void _testConcurrentCommits(int num_threads) throws Exception 684 { 685 Object myMutex = new Object (); 686 687 final CacheImpl c1 = new CacheImpl(); 688 final CacheImpl c2 = new CacheImpl(); 689 c1.getConfiguration().setClusterName("TempCluster"); 690 c2.getConfiguration().setClusterName("TempCluster"); 691 c1.getConfiguration().setCacheMode(Configuration.CacheMode.REPL_SYNC); 692 c2.getConfiguration().setCacheMode(Configuration.CacheMode.REPL_SYNC); 693 c1.getConfiguration().setSyncCommitPhase(true); 694 c2.getConfiguration().setSyncCommitPhase(true); 695 c1.getConfiguration().setSyncRollbackPhase(true); 696 c2.getConfiguration().setSyncRollbackPhase(true); 697 c1.getConfiguration().setIsolationLevel(IsolationLevel.REPEATABLE_READ); 698 c2.getConfiguration().setIsolationLevel(IsolationLevel.REPEATABLE_READ); 699 c1.getConfiguration().setTransactionManagerLookupClass("org.jboss.cache.DummyTransactionManagerLookup"); 700 c2.getConfiguration().setTransactionManagerLookupClass("org.jboss.cache.DummyTransactionManagerLookup"); 701 c1.getConfiguration().setLockAcquisitionTimeout(5000); 702 c2.getConfiguration().setLockAcquisitionTimeout(5000); 703 c1.start(); 704 c2.start(); 705 final List <Exception > exceptions = new ArrayList <Exception >(); 706 707 class MyThread extends Thread 708 { 709 Object mutex; 710 711 public MyThread(String name, Object mutex) 712 { 713 super(name); 714 this.mutex = mutex; 715 } 716 717 public void run() 718 { 719 Transaction tx = null; 720 721 try 722 { 723 tx = beginTransaction(); 724 c1.put("/thread/" + getName(), null); 725 System.out.println("Thread " + getName() + " after put(): " + c1.toString()); 726 System.out.println("Thread " + getName() + " waiting on mutex"); 727 synchronized (mutex) 728 { 729 mutex.wait(); 730 } 731 System.out.println("Thread " + getName() + " committing"); 732 tx.commit(); 733 System.out.println("Thread " + getName() + " committed successfully"); 734 } 735 catch (Exception e) 736 { 737 exceptions.add(e); 738 } 739 finally 740 { 741 try 742 { 743 if (tx != null) tx.rollback(); 744 } 745 catch (Exception e) 746 { 747 } 748 } 749 } 750 } 751 752 MyThread[] threads = new MyThread[num_threads]; 753 for (int i = 0; i < threads.length; i++) 754 { 755 threads[i] = new MyThread("#" + i, myMutex); 756 } 757 for (int i = 0; i < threads.length; i++) 758 { 759 MyThread thread = threads[i]; 760 System.out.println("starting thread #" + i); 761 thread.start(); 762 } 763 764 TestingUtil.sleepThread(6000); 765 synchronized (myMutex) 766 { 767 System.out.println("cache is " + c1.printLockInfo()); 768 System.out.println("******************* SIGNALLING THREADS ********************"); 769 myMutex.notifyAll(); 770 } 771 772 for (MyThread thread : threads) 773 { 774 try 775 { 776 thread.join(); 777 System.out.println("Joined thread " + thread.getName()); 778 } 779 catch (InterruptedException e) 780 { 781 e.printStackTrace(); 782 } 783 } 784 785 System.out.println("FINAL c1:\n" + c1.printDetails() + "\nlocks:\n" + c1.printLockInfo()); 786 787 assertEquals(0, c1.getNumberOfLocksHeld()); 788 assertEquals(0, c2.getNumberOfLocksHeld()); 789 790 c1.stop(); 791 c2.stop(); 792 793 799 803 for (Exception exception : exceptions) assertEquals(TimeoutException.class, exception.getClass()); 804 } 805 806 807 810 public void testConcurrentPutsOnTwoInstances() throws Exception 811 { 812 initCaches(Configuration.CacheMode.REPL_SYNC); 813 final CacheImpl c1 = this.cache1; 814 final CacheImpl c2 = this.cache2; 815 816 Thread t1 = new Thread () 817 { 818 Transaction tx; 819 820 public void run() 821 { 822 try 823 { 824 tx = beginTransaction(); 825 c1.put("/ben/wang", "name", "Ben Wang"); 826 TestingUtil.sleepThread(8000); 827 tx.commit(); } 829 catch (Throwable ex) 830 { 831 ex.printStackTrace(); 832 t1_ex = ex; 833 } 834 } 835 }; 836 837 Thread t2 = new Thread () 838 { 839 Transaction tx; 840 841 public void run() 842 { 843 try 844 { 845 TestingUtil.sleepThread(1000); tx = beginTransaction(); 847 c2.put("/ben/wang", "name", "Ben Jr."); 848 tx.commit(); } 850 catch (RollbackException rollback_ex) 851 { 852 System.out.println("received rollback exception as expected"); 853 } 854 catch (Throwable ex) 855 { 856 ex.printStackTrace(); 857 t2_ex = ex; 858 } 859 } 860 }; 861 862 t1.start(); 864 t2.start(); 865 866 t1.join(); 868 t2.join(); 869 870 if (t1_ex != null) 871 fail("Thread1 failed: " + t1_ex); 872 if (t2_ex != null) 873 fail("Thread2 failed: " + t2_ex); 874 assertEquals("Ben Wang", c1.get("/ben/wang", "name")); 875 } 876 877 878 public void testPut() throws Exception 879 { 880 initCaches(Configuration.CacheMode.REPL_SYNC); 881 final CacheImpl c1 = this.cache1; 882 883 884 Thread t1 = new Thread () 885 { 886 public void run() 887 { 888 try 889 { 890 lock.acquire(); 891 System.out.println("-- t1 has lock"); 892 c1.put("/a/b/c", "age", 38); 893 System.out.println("[Thread1] set value to 38"); 894 895 System.out.println("-- t1 releases lock"); 896 lock.release(); 897 TestingUtil.sleepThread(300); 898 Thread.yield(); 899 900 lock.acquire(); 901 System.out.println("-- t1 has lock"); 902 c1.put("/a/b/c", "age", 39); 903 System.out.println("[Thread1] set value to 39"); 904 905 System.out.println("-- t1 releases lock"); 906 lock.release(); 907 assertEquals(39, c1.get("/a/b/c", "age")); 908 } 909 catch (Throwable ex) 910 { 911 ex.printStackTrace(); 912 t1_ex = ex; 913 } 914 finally 915 { 916 lock.release(); 917 } 918 } 919 }; 920 921 Thread t2 = new Thread () 922 { 923 public void run() 924 { 925 try 926 { 927 TestingUtil.sleepThread(100); 928 Thread.yield(); 929 lock.acquire(); 930 System.out.println("-- t2 has lock"); 931 Integer val = (Integer ) cache2.get("/a/b/c", "age"); 933 System.out.println("[Thread2] value is " + val); 934 assertEquals(new Integer (38), val); 935 System.out.println("-- t2 releases lock"); 936 lock.release(); 937 TestingUtil.sleepThread(300); 938 Thread.yield(); 939 TestingUtil.sleepThread(500); 940 lock.acquire(); 941 System.out.println("-- t2 has lock"); 942 val = (Integer ) cache2.get("/a/b/c", "age"); 943 System.out.println("-- t2 releases lock"); 944 lock.release(); 945 assertEquals(new Integer (39), val); 946 } 947 catch (Throwable ex) 948 { 949 ex.printStackTrace(); 950 t2_ex = ex; 951 } 952 finally 953 { 954 lock.release(); 955 } 956 } 957 }; 958 959 t1.start(); 961 t2.start(); 962 963 t1.join(); 965 t2.join(); 966 if (t1_ex != null) 967 fail("Thread1 failed: " + t1_ex); 968 if (t2_ex != null) 969 fail("Thread2 failed: " + t2_ex); 970 } 971 972 981 public void testPutTx() throws Exception 982 { 983 Transaction tx = null; 984 985 try 986 { 987 initCaches(Configuration.CacheMode.REPL_SYNC); 988 cache1.getConfiguration().setSyncCommitPhase(true); 989 cache2.getConfiguration().setSyncCommitPhase(true); 990 tx = beginTransaction(); 991 cache1.put("/a/b/c", "age", 38); 992 cache1.put("/a/b/c", "age", 39); 993 Object val = cache2.get("/a/b/c", "age"); assertNull(val); 995 tx.commit(); 996 997 tx = beginTransaction(); 998 assertEquals(39, cache2.get("/a/b/c", "age")); tx.commit(); 1000 } 1001 catch (Throwable t) 1002 { 1003 t.printStackTrace(); 1004 t1_ex = t; 1005 } 1006 finally 1007 { 1008 lock.release(); 1009 } 1010 } 1011 1012 1013 1019 public void testPutTx1() throws Exception 1020 { 1021 initCaches(Configuration.CacheMode.REPL_SYNC); 1022 final CacheImpl c1 = this.cache1; 1023 Thread t1 = new Thread () 1024 { 1025 public void run() 1026 { 1027 Transaction tx = null; 1028 1029 try 1030 { 1031 lock.acquire(); 1032 tx = beginTransaction(); 1033 c1.put("/a/b/c", "age", 38); 1034 c1.put("/a/b/c", "age", 39); 1035 lock.release(); 1036 1037 TestingUtil.sleepThread(300); 1038 lock.acquire(); 1039 try 1040 { 1041 tx.commit(); 1042 } 1043 catch (RollbackException ex) 1044 { 1045 System.out.println("[Thread1] received RollbackException, as expected. Rolling back changes"); 1046 } 1047 finally 1048 { 1049 lock.release(); 1050 } 1051 } 1052 catch (Throwable ex) 1053 { 1054 ex.printStackTrace(); 1055 t1_ex = ex; 1056 } 1057 finally 1058 { 1059 lock.release(); 1060 } 1061 } 1062 }; 1063 1064 Thread t2 = new Thread () 1065 { 1066 public void run() 1067 { 1068 Transaction tx = null; 1069 1070 try 1071 { 1072 sleep(200); 1073 Thread.yield(); 1074 lock.acquire(); 1075 tx = beginTransaction(); 1076 assertNull(cache2.get("/a/b/c", "age")); cache2.put("/a/b/c", "age", 40); 1078 lock.release(); 1079 1080 TestingUtil.sleepThread(300); 1081 lock.acquire(); 1082 assertEquals(40, cache2.get("/a/b/c", "age")); tx.commit(); 1084 lock.release(); 1085 1086 TestingUtil.sleepThread(1000); 1087 tx = beginTransaction(); 1088 assertEquals("After cache2 commit", 40, cache2.get("/a/b/c", "age")); 1089 tx.commit(); 1090 } 1091 catch (Throwable ex) 1092 { 1093 ex.printStackTrace(); 1094 t2_ex = ex; 1095 } 1096 finally 1097 { 1098 lock.release(); 1099 } 1100 } 1101 }; 1102 1103 t1.start(); 1105 t2.start(); 1106 1107 t1.join(); 1108 t2.join(); 1109 1110 if (t1_ex != null) 1111 fail("Thread1 failed: " + t1_ex); 1112 if (t2_ex != null) 1113 fail("Thread2 failed: " + t2_ex); 1114 } 1115 1116 1117 public void testPutTxWithRollback() throws Exception 1118 { 1119 initCaches(Configuration.CacheMode.REPL_SYNC); 1120 final CacheImpl c2 = this.cache1; 1121 Thread t1 = new Thread () 1122 { 1123 public void run() 1124 { 1125 Transaction tx = null; 1126 1127 try 1128 { 1129 lock.acquire(); 1130 tx = beginTransaction(); 1131 c2.put("/a/b/c", "age", 38); 1132 c2.put("/a/b/c", "age", 39); 1133 lock.release(); 1134 1135 TestingUtil.sleepThread(100); 1136 lock.acquire(); 1137 tx.rollback(); 1138 lock.release(); 1139 } 1140 catch (Throwable ex) 1141 { 1142 ex.printStackTrace(); 1143 t1_ex = ex; 1144 } 1145 finally 1146 { 1147 lock.release(); 1148 } 1149 } 1150 }; 1151 1152 Thread t2 = new Thread () 1153 { 1154 public void run() 1155 { 1156 Transaction tx = null; 1157 1158 try 1159 { 1160 sleep(200); 1161 Thread.yield(); 1162 lock.acquire(); 1163 tx = beginTransaction(); 1164 assertNull(cache2.get("/a/b/c", "age")); lock.release(); 1166 1167 TestingUtil.sleepThread(100); 1168 lock.acquire(); 1169 assertNull(cache2.get("/a/b/c", "age")); tx.commit(); 1171 lock.release(); 1172 } 1173 catch (Throwable ex) 1174 { 1175 ex.printStackTrace(); 1176 t2_ex = ex; 1177 } 1178 finally 1179 { 1180 lock.release(); 1181 } 1182 } 1183 }; 1184 1185 t1.start(); 1187 t2.start(); 1188 1189 t1.join(); 1191 t2.join(); 1192 if (t1_ex != null) 1193 fail("Thread1 failed: " + t1_ex); 1194 if (t2_ex != null) 1195 fail("Thread2 failed: " + t2_ex); 1196 } 1197 1198 1199 static class TransactionAborter implements Synchronization 1200 { 1201 Transaction ltx = null; 1202 1203 public TransactionAborter(Transaction ltx) 1204 { 1205 this.ltx = ltx; 1206 } 1207 1208 public void beforeCompletion() 1209 { 1210 try 1211 { 1212 ltx.setRollbackOnly(); 1213 } 1214 catch (SystemException e) 1215 { 1216 } 1218 } 1219 1220 public void afterCompletion(int status) 1221 { 1222 } 1223 } 1224 1225 static class CallbackListener extends AbstractCacheListener 1226 { 1227 1228 CacheImpl callbackCache; 1229 Object callbackKey; 1230 Exception ex; 1231 Object mutex = new Object (); 1232 1233 CallbackListener(CacheImpl cache, Object callbackKey) 1234 { 1235 this.callbackCache = cache; 1236 this.callbackKey = callbackKey; 1237 cache.getNotifier().addCacheListener(this); 1238 } 1239 1240 public void nodeModified(Fqn fqn, boolean pre, boolean isLocal, ModificationType modType, Map data) 1241 { 1242 if (!pre) 1243 { 1244 synchronized (mutex) 1247 { 1248 try 1249 { 1250 callbackCache.get(fqn, callbackKey); 1251 } 1252 catch (CacheException e) 1253 { 1254 e.printStackTrace(); 1255 ex = e; 1256 } 1257 } 1258 } 1259 } 1260 1261 Exception getCallbackException() 1262 { 1263 synchronized (mutex) 1264 { 1265 return ex; 1266 } 1267 } 1268 1269 } 1270 1271 static class TransactionAborterCallbackListener extends CallbackListener 1272 { 1273 1274 TransactionManager callbackTM; 1275 1276 TransactionAborterCallbackListener(CacheImpl cache, Object callbackKey) 1277 { 1278 super(cache, callbackKey); 1279 callbackTM = callbackCache.getTransactionManager(); 1280 } 1281 1282 public void nodeModified(Fqn fqn, boolean pre, boolean isLocal, ModificationType modType, Map data) 1283 { 1284 if (!pre) 1285 { 1286 try 1287 { 1288 Transaction tx = callbackTM.getTransaction(); 1289 if (tx != null && tx.getStatus() == Status.STATUS_ACTIVE) 1290 { 1291 tx.registerSynchronization(new TransactionAborter(tx)); 1293 } 1294 else 1295 { 1296 super.nodeModified(fqn, pre, isLocal, modType, data); 1297 } 1298 1299 } 1300 catch (Exception e) 1301 { 1302 e.printStackTrace(); 1303 if (ex == null) 1304 ex = e; 1305 } 1306 } 1307 } 1308 1309 } 1310 1311 static class TransactionAborterListener extends AbstractCacheListener 1312 { 1313 1314 TransactionManager callbackTM; 1315 Object mutex = new Object (); 1316 Exception ex; 1317 1318 TransactionAborterListener(CacheImpl cache) 1319 { 1320 callbackTM = cache.getTransactionManager(); 1321 cache.getNotifier().addCacheListener(this); 1322 } 1323 1324 public void nodeModified(Fqn fqn, boolean pre, boolean isLocal, ModificationType modType, Map data) 1325 { 1326 if (!pre) 1327 { 1328 synchronized (mutex) 1329 { 1330 try 1331 { 1332 Transaction tx = callbackTM.getTransaction(); 1333 if (tx != null && tx.getStatus() == Status.STATUS_ACTIVE) 1334 { 1335 tx.setRollbackOnly(); 1337 } 1338 } 1339 catch (Exception e) 1340 { 1341 e.printStackTrace(); 1342 if (ex == null) 1343 ex = e; 1344 } 1345 } 1346 } 1347 } 1348 1349 Exception getCallbackException() 1350 { 1351 synchronized (mutex) 1352 { 1353 return ex; 1354 } 1355 } 1356 1357 } 1358 1359 public static Test suite() 1360 { 1361 return new TestSuite(SyncReplTxTest.class); 1363 } 1364 1365 1366} 1367 | Popular Tags |