1 8 package org.jboss.cache.tests.replicated; 9 10 import EDU.oswego.cs.dl.util.concurrent.FIFOSemaphore; 11 import junit.framework.Test; 12 import junit.framework.TestCase; 13 import junit.framework.TestSuite; 14 import org.apache.commons.logging.Log; 15 import org.apache.commons.logging.LogFactory; 16 import org.jboss.cache.Fqn; 17 import org.jboss.cache.TreeCache; 18 import org.jboss.cache.lock.IsolationLevel; 19 import org.jboss.cache.transaction.DummyTransactionManager; 20 21 import javax.naming.Context ; 22 import javax.transaction.*; 23 24 31 public class SyncReplTxTest extends TestCase { 32 TreeCache cache1, cache2; 33 int caching_mode=TreeCache.REPL_SYNC; 34 final String group_name="TreeCacheTestGroup"; 35 String props= 36 "UDP(ip_mcast=true;ip_ttl=64;loopback=false;mcast_addr=228.1.2.3;" + 37 "mcast_port=45566;mcast_recv_buf_size=80000;mcast_send_buf_size=150000;" + 38 "ucast_recv_buf_size=80000;ucast_send_buf_size=150000):" + 39 "PING(down_thread=true;num_initial_members=2;timeout=500;up_thread=true):" + 40 "MERGE2(max_interval=20000;min_interval=10000):" + 41 "FD(down_thread=true;shun=true;up_thread=true):" + 42 "VERIFY_SUSPECT(down_thread=true;timeout=1500;up_thread=true):" + 43 "pbcast.NAKACK(down_thread=true;gc_lag=50;retransmit_timeout=600,1200,2400,4800;" + 44 "up_thread=true):" + 45 "pbcast.STABLE(desired_avg_gossip=20000;down_thread=true;up_thread=true):" + 46 "UNICAST(down_thread=true;min_threshold=10;timeout=600,1200,2400;window_size=100):" + 47 "FRAG(down_thread=true;frag_size=8192;up_thread=true):" + 48 "pbcast.GMS(join_retry_timeout=2000;join_timeout=5000;print_local_addr=true;shun=true):" + 49 "pbcast.STATE_TRANSFER(down_thread=true;up_thread=true)"; 50 51 final static Log log_=LogFactory.getLog(SyncReplTxTest.class); 52 String old_factory=null; 53 final String FACTORY="org.jboss.cache.transaction.DummyContextFactory"; 54 FIFOSemaphore lock=new FIFOSemaphore(1); 55 DummyTransactionManager tx_mgr; 56 Throwable t1_ex, t2_ex, ex=null; 57 58 59 60 public SyncReplTxTest(String name) { 61 super(name); 62 } 63 64 public void setUp() throws Exception { 65 super.setUp(); 66 old_factory=System.getProperty(Context.INITIAL_CONTEXT_FACTORY); 67 System.setProperty(Context.INITIAL_CONTEXT_FACTORY, FACTORY); 68 tx_mgr=DummyTransactionManager.getInstance(); 69 t1_ex=t2_ex=ex=null; 70 } 71 72 public void tearDown() throws Exception { 73 super.tearDown(); 74 DummyTransactionManager.destroy(); 75 destroyCaches(); 76 if(old_factory != null) { 77 System.setProperty(Context.INITIAL_CONTEXT_FACTORY, old_factory); 78 old_factory=null; 79 } 80 } 81 82 Transaction beginTransaction() throws SystemException, NotSupportedException { 83 DummyTransactionManager mgr=DummyTransactionManager.getInstance(); 84 mgr.begin(); 85 Transaction tx=mgr.getTransaction(); 86 return tx; 87 } 88 89 void initCaches(int caching_mode) throws Exception { 90 this.caching_mode=caching_mode; 91 cache1=new TreeCache(); 92 cache2=new TreeCache(); 93 cache1.setCacheMode(caching_mode); 94 cache2.setCacheMode(caching_mode); 95 cache1.setIsolationLevel(IsolationLevel.SERIALIZABLE); 96 cache2.setIsolationLevel(IsolationLevel.SERIALIZABLE); 97 98 cache1.setTransactionManagerLookupClass("org.jboss.cache.DummyTransactionManagerLookup"); 99 cache2.setTransactionManagerLookupClass("org.jboss.cache.DummyTransactionManagerLookup"); 100 104 cache1.setLockAcquisitionTimeout(5000); 105 cache2.setLockAcquisitionTimeout(5000); 106 cache1.start(); 107 cache2.start(); 108 } 109 110 void destroyCaches() throws Exception { 111 if(cache1 != null) 112 cache1.stop(); 113 if(cache2 != null) 114 cache2.stop(); 115 cache1=null; 116 cache2=null; 117 } 118 119 120 public void testLockRemoval() throws Exception { 121 initCaches(TreeCache.REPL_SYNC); 122 cache1.setSyncCommitPhase(true); 123 cache1.releaseAllLocks("/"); 124 Transaction tx=beginTransaction(); 125 cache1.put("/bela/ban", "name", "Bela Ban"); 126 assertEquals(2, cache1.getNumberOfLocksHeld()); 127 assertEquals(0, cache2.getNumberOfLocksHeld()); 128 tx.commit(); 129 assertEquals(0, cache1.getNumberOfLocksHeld()); 130 assertEquals(0, cache2.getNumberOfLocksHeld()); 131 } 132 133 134 135 public void testSyncRepl() throws Exception { 136 Integer age; 137 Transaction tx; 138 139 try { 140 initCaches(TreeCache.REPL_SYNC); 141 cache1.setSyncCommitPhase(true); 142 cache2.setSyncCommitPhase(true); 143 144 146 tx=beginTransaction(); 147 cache1.put("/a/b/c", "age", new Integer (38)); 148 assertNull("age on cache2 must be null as the TX has not yet been committed", cache2.get("/a/b/c", "age")); 149 tx.commit(); 150 151 age=(Integer )cache2.get("/a/b/c", "age"); 153 assertNotNull("\"age\" obtained from cache2 must be non-null ", age); 154 assertTrue("\"age\" must be 38", age.intValue() == 38); 155 } 156 catch(Exception e) { 157 fail(e.toString()); 158 } 159 } 160 161 162 public void testSyncReplWithModficationsOnBothCaches() throws Exception { 163 Integer age; 164 Transaction tx; 165 final Fqn NODE1=Fqn.fromString("/one/two/three"); 166 final Fqn NODE2=Fqn.fromString("/eins/zwei/drei"); 167 168 try { 169 initCaches(TreeCache.REPL_SYNC); 170 171 cache1.setSyncCommitPhase(true); 172 cache2.setSyncCommitPhase(true); 173 174 tx=beginTransaction(); 175 cache1.put(NODE1, "age", new Integer (38)); 176 System.out.println("TransactionTable for cache1 after cache1.put():\n" + cache1.getTransactionTable().toString(true)); 177 178 cache2.put(NODE2, "age", new Integer (39)); 179 System.out.println("TransactionTable for cache2 after cache2.put():\n" + cache2.getTransactionTable().toString(true)); 180 181 System.out.println("cache1 before commit:\n" + cache1.printLockInfo()); 182 System.out.println("cache2 before commit:\n" + cache2.printLockInfo()); 183 184 tx.commit(); 185 186 System.out.println("cache1 after commit:\n" + cache1.printLockInfo()); 187 System.out.println("cache2 after commit:\n" + cache2.printLockInfo()); 188 189 assertTrue(cache1.exists(NODE1)); 190 assertTrue(cache1.exists(NODE2)); 191 assertTrue(cache1.exists(NODE1)); 192 assertTrue(cache2.exists(NODE2)); 193 194 age=(Integer )cache1.get(NODE1, "age"); 195 assertNotNull("\"age\" obtained from cache1 for " + NODE1 + " must be non-null ", age); 196 assertTrue("\"age\" must be 38", age.intValue() == 38); 197 198 age=(Integer )cache2.get(NODE1, "age"); 199 assertNotNull("\"age\" obtained from cache2 for " + NODE1 + " must be non-null ", age); 200 assertTrue("\"age\" must be 38", age.intValue() == 38); 201 202 age=(Integer )cache1.get(NODE2, "age"); 203 assertNotNull("\"age\" obtained from cache1 for " + NODE2 + " must be non-null ", age); 204 assertTrue("\"age\" must be 39", age.intValue() == 39); 205 206 age=(Integer )cache2.get(NODE2, "age"); 207 assertNotNull("\"age\" obtained from cache2 for " + NODE2 + " must be non-null ", age); 208 assertTrue("\"age\" must be 39", age.intValue() == 39); 209 210 assertEquals(0, cache1.getNumberOfLocksHeld()); 211 assertEquals(0, cache2.getNumberOfLocksHeld()); 212 System.out.println("TransactionTable for cache1:\n" + cache1.getTransactionTable().toString(true)); 213 System.out.println("TransactionTable for cache2:\n" + cache2.getTransactionTable().toString(true)); 214 } 215 catch(Exception e) { 216 fail(e.toString()); 217 } 218 } 219 220 public void testSyncReplWithModficationsOnBothCachesSameData() throws Exception { 221 Transaction tx; 222 final Fqn NODE=Fqn.fromString("/one/two/three"); 223 224 try { 225 initCaches(TreeCache.REPL_SYNC); 226 tx=beginTransaction(); 227 cache1.put(NODE, "age", new Integer (38)); 228 System.out.println("TransactionTable for cache1 after cache1.put():\n" + cache1.getTransactionTable().toString(true)); 229 230 cache2.put(NODE, "age", new Integer (39)); 231 System.out.println("TransactionTable for cache2 after cache2.put():\n" + cache2.getTransactionTable().toString(true)); 232 233 System.out.println("cache1 before commit:\n" + cache1.printLockInfo()); 234 System.out.println("cache2 before commit:\n" + cache2.printLockInfo()); 235 236 try { 237 tx.commit(); 238 fail("commit should throw a RollbackException, we should not get here"); 239 } 240 catch(RollbackException rollback) { 241 System.out.println("Transaction was rolled back, this is correct"); 242 } 243 244 System.out.println("cache1 after commit:\n" + cache1.printLockInfo()); 245 System.out.println("cache2 after commit:\n" + cache2.printLockInfo()); 246 247 assertEquals(0, cache1.getNumberOfLocksHeld()); 248 assertEquals(0, cache2.getNumberOfLocksHeld()); 249 250 assertEquals(0, cache1.getNumberOfNodes()); 251 assertEquals(0, cache2.getNumberOfNodes()); 252 } 253 catch(Exception e) { 254 fail(e.toString()); 255 } 256 } 257 258 259 public void testSyncReplWithModficationsOnBothCachesWithRollback() throws Exception { 260 Transaction tx; 261 final Fqn NODE1=Fqn.fromString("/one/two/three"); 262 final Fqn NODE2=Fqn.fromString("/eins/zwei/drei"); 263 264 try { 265 initCaches(TreeCache.REPL_SYNC); 266 267 cache1.setSyncRollbackPhase(true); 268 cache2.setSyncRollbackPhase(true); 269 270 tx=beginTransaction(); 271 cache1.put(NODE1, "age", new Integer (38)); 272 cache2.put(NODE2, "age", new Integer (39)); 273 274 System.out.println("cache1 (before commit):\n" + cache1.printLockInfo()); 275 System.out.println("cache2 (before commit):\n" + cache2.printLockInfo()); 276 277 tx.registerSynchronization(new TransactionAborter(tx)); 279 280 try { 281 tx.commit(); 282 fail("commit should throw a RollbackException, we should not get here"); 283 } 284 catch(RollbackException rollback) { 285 System.out.println("Transaction was rolled back, this is correct"); 286 } 287 288 System.out.println("cache1 (after rollback):\n" + cache1.printLockInfo()); 289 System.out.println("cache2 (after rollback):\n" + cache2.printLockInfo()); 290 291 assertEquals(0, cache1.getNumberOfLocksHeld()); 292 assertEquals(0, cache2.getNumberOfLocksHeld()); 293 294 assertEquals(0, cache1.getNumberOfNodes()); 295 assertEquals(0, cache2.getNumberOfNodes()); 296 } 297 catch(Exception e) { 298 fail(e.toString()); 299 } 300 } 301 302 303 304 public void testASyncRepl() throws Exception { 305 Integer age; 306 Transaction tx; 307 308 initCaches(TreeCache.REPL_ASYNC); 309 310 try { 311 tx=beginTransaction(); 312 cache1.put("/a/b/c", "age", new Integer (38)); 313 Thread.sleep(1000); 314 assertNull("age on cache2 must be null as the TX has not yet been committed", cache2.get("/a/b/c", "age")); 315 tx.commit(); 316 Thread.sleep(1000); 317 318 age=(Integer )cache2.get("/a/b/c", "age"); 320 assertNotNull("\"age\" obtained from cache2 is null ", age); 321 assertTrue("\"age\" must be 38", age.intValue() == 38); 322 } 323 catch(Exception e) { 324 fail(e.toString()); 325 } 326 } 327 328 363 public void testConcurrentPuts() throws Exception { 364 initCaches(TreeCache.REPL_SYNC); 365 cache1.setSyncCommitPhase(true); 366 367 Thread t1=new Thread ("Thread1") { 368 Transaction tx; 369 370 public void run() { 371 try { 372 tx=beginTransaction(); 373 cache1.put("/bela/ban", "name", "Bela Ban"); 374 _pause(2000); tx.commit(); 376 System.out.println("[Thread1] ** LOCK INFO cache1: " + cache1.printLockInfo()); 377 System.out.println("[Thread1] ** LOCK INFO cache2: " + cache2.printLockInfo()); 378 } 379 catch(Throwable ex) { 380 ex.printStackTrace(); 381 t1_ex=ex; 382 } 383 } 384 }; 385 386 Thread t2=new Thread ("Thread2") { 387 Transaction tx; 388 389 public void run() { 390 try { 391 _pause(1000); tx=beginTransaction(); 393 System.out.println("[Thread2] ** LOCK INFO cache1: " + cache1.printLockInfo()); 394 System.out.println("[Thread2] ** LOCK INFO cache2: " + cache2.printLockInfo()); 395 cache1.put("/bela/ban", "name", "Michelle Ban"); 396 System.out.println("[Thread2] ** LOCK INFO cache1: " + cache1.printLockInfo()); 397 System.out.println("[Thread2] ** LOCK INFO cache2: " + cache2.printLockInfo()); 398 tx.commit(); 399 System.out.println("[Thread2] ** LOCK INFO cache1: " + cache1.printLockInfo()); 400 System.out.println("[Thread2] ** LOCK INFO cache2: " + cache2.printLockInfo()); 401 } 402 catch(Throwable ex) { 403 ex.printStackTrace(); 404 t2_ex=ex; 405 } 406 } 407 }; 408 409 t1.start(); 411 t2.start(); 412 413 t1.join(); 415 t2.join(); 416 417 if(t1_ex != null) 418 fail("Thread1 failed: " + t1_ex); 419 if(t2_ex != null) 420 fail("Thread2 failed: " + t2_ex); 421 422 assertEquals("Michelle Ban", cache1.get("/bela/ban", "name")); 423 } 424 425 426 429 public void testConcurrentCommitsWith1Thread() throws Exception { 430 _testConcurrentCommits(1); 431 } 432 433 436 public void testConcurrentCommitsWith5Threads() throws Exception { 437 _testConcurrentCommits(5); 438 } 439 440 443 private void _testConcurrentCommits(int num_threads) throws Exception { 444 Object myMutex=new Object (); 445 446 final TreeCache c1=new TreeCache(); 447 final TreeCache c2=new TreeCache(); 448 c1.setClusterName("TempCluster"); 449 c2.setClusterName("TempCluster"); 450 c1.setCacheMode(TreeCache.REPL_SYNC); 451 c2.setCacheMode(TreeCache.REPL_SYNC); 452 c1.setSyncCommitPhase(true); 453 c2.setSyncCommitPhase(true); 454 c1.setIsolationLevel(IsolationLevel.REPEATABLE_READ); 455 c2.setIsolationLevel(IsolationLevel.REPEATABLE_READ); 456 c1.setTransactionManagerLookupClass("org.jboss.cache.DummyTransactionManagerLookup"); 457 c2.setTransactionManagerLookupClass("org.jboss.cache.DummyTransactionManagerLookup"); 458 c1.setLockAcquisitionTimeout(5000); 459 c2.setLockAcquisitionTimeout(5000); 460 c1.start(); 461 c2.start(); 462 463 class MyThread extends Thread { 464 Object mutex; 465 466 public MyThread(String name, Object mutex) { 467 super(name); 468 this.mutex=mutex; 469 } 470 471 public void run() { 472 Transaction tx; 473 474 try { 475 tx=beginTransaction(); 476 c1.put("/thread/" + getName(), null); 477 System.out.println("Thread " + getName() + " after put(): " + c1.toString()); 478 System.out.println("Thread " + getName() + " waiting on mutex"); 479 synchronized(mutex) { 480 mutex.wait(); 481 } 482 System.out.println("Thread " + getName() + " committing"); 483 tx.commit(); 484 System.out.println("Thread " + getName() + " committed successfully"); 485 } 486 catch(Exception e) { 487 ex=e; 488 } 489 } 490 } 491 492 MyThread[] threads=new MyThread[num_threads]; 493 for(int i=0; i < threads.length; i++) { 494 threads[i]=new MyThread("#" + i, myMutex); 495 } 496 for(int i=0; i < threads.length; i++) { 497 MyThread thread=threads[i]; 498 System.out.println("starting thread #" + i); 499 thread.start(); 500 } 501 502 _pause(6000); 503 synchronized(myMutex) { 504 System.out.println("cache is " + c1.printLockInfo()); 505 System.out.println("******************* SIGNALLING THREADS ********************"); 506 myMutex.notifyAll(); 507 } 508 509 for(int i=0; i < threads.length; i++) { 510 MyThread thread=threads[i]; 511 try { 512 thread.join(); 513 System.out.println("Joined thread " + thread.getName()); 514 } 515 catch(InterruptedException e) { 516 e.printStackTrace(); 517 } 518 } 519 520 System.out.println("FINAL c1:\n" + c1.printDetails() + "\nlocks:\n" + c1.printLockInfo()); 521 522 assertEquals(0, c1.getNumberOfLocksHeld()); 523 assertEquals(0, c2.getNumberOfLocksHeld()); 524 525 c1.stop(); 526 c2.stop(); 527 528 if(ex != null) 529 fail("Thread failed: " + ex); 530 } 531 532 533 536 public void testConcurrentPutsOnTwoInstances() throws Exception { 537 initCaches(TreeCache.REPL_SYNC); 538 final TreeCache c1=this.cache1; 539 final TreeCache c2=this.cache2; 540 541 Thread t1=new Thread () { 542 Transaction tx; 543 544 public void run() { 545 try { 546 tx=beginTransaction(); 547 c1.put("/ben/wang", "name", "Ben Wang"); 548 _pause(8000); 549 tx.commit(); } 551 catch(Throwable ex) { 552 ex.printStackTrace(); 553 t1_ex=ex; 554 } 555 } 556 }; 557 558 Thread t2=new Thread () { 559 Transaction tx; 560 561 public void run() { 562 try { 563 _pause(1000); tx=beginTransaction(); 565 c2.put("/ben/wang", "name", "Ben Jr."); 566 tx.commit(); } 568 catch(RollbackException rollback_ex) { 569 System.out.println("received rollback exception as expected"); 570 } 571 catch(Throwable ex) { 572 ex.printStackTrace(); 573 t2_ex=ex; 574 } 575 } 576 }; 577 578 t1.start(); 580 t2.start(); 581 582 t1.join(); 584 t2.join(); 585 586 if(t1_ex != null) 587 fail("Thread1 failed: " + t1_ex); 588 if(t2_ex != null) 589 fail("Thread2 failed: " + t2_ex); 590 assertEquals("Ben Wang", c1.get("/ben/wang", "name")); 591 } 592 593 594 public void testPut() throws Exception { 595 initCaches(TreeCache.REPL_SYNC); 596 final TreeCache c1=this.cache1; 597 598 599 Thread t1=new Thread () { 600 public void run() { 601 try { 602 lock.acquire(); 603 System.out.println("-- t1 has lock"); 604 c1.put("/a/b/c", "age", new Integer (38)); 605 System.out.println("[Thread1] set value to 38"); 606 607 System.out.println("-- t1 releases lock"); 608 lock.release(); 609 _pause(300); 610 Thread.yield(); 611 612 lock.acquire(); 613 System.out.println("-- t1 has lock"); 614 c1.put("/a/b/c", "age", new Integer (39)); 615 System.out.println("[Thread1] set value to 39"); 616 617 System.out.println("-- t1 releases lock"); 618 lock.release(); 619 assertEquals(new Integer (39), c1.get("/a/b/c", "age")); 620 } 621 catch(Throwable ex) { 622 ex.printStackTrace(); 623 t1_ex=ex; 624 } 625 finally { 626 lock.release(); 627 } 628 } 629 }; 630 631 Thread t2=new Thread () { 632 public void run() { 633 try { 634 _pause(100); 635 Thread.yield(); 636 lock.acquire(); 637 System.out.println("-- t2 has lock"); 638 Integer val=(Integer )cache2.get("/a/b/c", "age"); 640 System.out.println("[Thread2] value is " + val); 641 assertEquals(new Integer (38), val); 642 System.out.println("-- t2 releases lock"); 643 lock.release(); 644 _pause(300); 645 Thread.yield(); 646 647 lock.acquire(); 648 System.out.println("-- t2 has lock"); 649 val=(Integer )cache2.get("/a/b/c", "age"); 650 System.out.println("-- t2 releases lock"); 651 lock.release(); 652 assertEquals(new Integer (39), val); 653 } 654 catch(Throwable ex) { 655 ex.printStackTrace(); 656 t2_ex=ex; 657 } 658 finally { 659 lock.release(); 660 } 661 } 662 }; 663 664 t1.start(); 666 t2.start(); 667 668 t1.join(); 670 t2.join(); 671 if(t1_ex != null) 672 fail("Thread1 failed: " + t1_ex); 673 if(t2_ex != null) 674 fail("Thread2 failed: " + t2_ex); 675 } 676 677 686 public void testPutTx() throws Exception { 687 Transaction tx=null; 688 689 try { 690 initCaches(TreeCache.REPL_SYNC); 691 cache1.setSyncCommitPhase(true); 692 cache2.setSyncCommitPhase(true); 693 tx=beginTransaction(); 694 cache1.put("/a/b/c", "age", new Integer (38)); 695 cache1.put("/a/b/c", "age", new Integer (39)); 696 Object val=cache2.get("/a/b/c", "age"); assertNull(val); 698 tx.commit(); 699 700 tx=beginTransaction(); 701 assertEquals(new Integer (39), cache2.get("/a/b/c", "age")); tx.commit(); 703 } 704 catch(Throwable t) { 705 t.printStackTrace(); 706 t1_ex=t; 707 } 708 finally { 709 lock.release(); 710 } 711 } 712 713 714 720 public void testPutTx1() throws Exception { 721 initCaches(TreeCache.REPL_SYNC); 722 final TreeCache c1=this.cache1; 723 Thread t1=new Thread () { 724 public void run() { 725 Transaction tx=null; 726 727 try { 728 lock.acquire(); 729 tx=beginTransaction(); 730 c1.put("/a/b/c", "age", new Integer (38)); 731 c1.put("/a/b/c", "age", new Integer (39)); 732 lock.release(); 733 734 _pause(300); 735 lock.acquire(); 736 try { 737 tx.commit(); 738 } 739 catch(RollbackException ex) { 740 System.out.println("[Thread1] received RollbackException, as expected. Rolling back changes"); 741 return; 742 } 743 finally { 744 lock.release(); 745 } 746 } 747 catch(Throwable ex) { 748 ex.printStackTrace(); 749 t1_ex=ex; 750 } 751 finally { 752 lock.release(); 753 } 754 } 755 }; 756 757 Thread t2=new Thread () { 758 public void run() { 759 Transaction tx=null; 760 761 try { 762 sleep(200); 763 Thread.yield(); 764 lock.acquire(); 765 tx=beginTransaction(); 766 assertNull(cache2.get("/a/b/c", "age")); cache2.put("/a/b/c", "age", new Integer (40)); 768 lock.release(); 769 770 _pause(300); 771 lock.acquire(); 772 assertEquals(new Integer (40), cache2.get("/a/b/c", "age")); tx.commit(); 774 lock.release(); 775 776 _pause(1000); 777 tx=beginTransaction(); 778 assertEquals("After cache2 commit", new Integer (40), cache2.get("/a/b/c", "age")); 779 tx.commit(); 780 } 781 catch(Throwable ex) { 782 ex.printStackTrace(); 783 t2_ex=ex; 784 } 785 finally { 786 lock.release(); 787 } 788 } 789 }; 790 791 t1.start(); 793 t2.start(); 794 795 t1.join(); 796 t2.join(); 797 798 if(t1_ex != null) 799 fail("Thread1 failed: " + t1_ex); 800 if(t2_ex != null) 801 fail("Thread2 failed: " + t2_ex); 802 } 803 804 805 806 public void testPutTxWithRollback() throws Exception { 807 initCaches(TreeCache.REPL_SYNC); 808 final TreeCache c2=this.cache1; 809 Thread t1=new Thread () { 810 public void run() { 811 Transaction tx=null; 812 813 try { 814 lock.acquire(); 815 tx=beginTransaction(); 816 c2.put("/a/b/c", "age", new Integer (38)); 817 c2.put("/a/b/c", "age", new Integer (39)); 818 lock.release(); 819 820 _pause(100); 821 lock.acquire(); 822 tx.rollback(); 823 lock.release(); 824 } 825 catch(Throwable ex) { 826 ex.printStackTrace(); 827 t1_ex=ex; 828 } 829 finally { 830 lock.release(); 831 } 832 } 833 }; 834 835 Thread t2=new Thread () { 836 public void run() { 837 Transaction tx=null; 838 839 try { 840 sleep(200); 841 Thread.yield(); 842 lock.acquire(); 843 tx=beginTransaction(); 844 assertNull(cache2.get("/a/b/c", "age")); lock.release(); 846 847 _pause(100); 848 lock.acquire(); 849 assertNull(cache2.get("/a/b/c", "age")); tx.commit(); 851 lock.release(); 852 } 853 catch(Throwable ex) { 854 ex.printStackTrace(); 855 t2_ex=ex; 856 } 857 finally { 858 lock.release(); 859 } 860 } 861 }; 862 863 t1.start(); 865 t2.start(); 866 867 t1.join(); 869 t2.join(); 870 if(t1_ex != null) 871 fail("Thread1 failed: " + t1_ex); 872 if(t2_ex != null) 873 fail("Thread2 failed: " + t2_ex); 874 } 875 876 877 static class TransactionAborter implements Synchronization { 878 Transaction ltx=null; 879 880 public TransactionAborter(Transaction ltx) { 881 this.ltx=ltx; 882 } 883 884 public void beforeCompletion() { 885 try { 886 ltx.setRollbackOnly(); 887 } 888 catch(SystemException e) { 889 } 890 } 891 892 public void afterCompletion(int status) { 893 } 894 } 895 896 897 static void _pause(long millis) { 898 try { 899 Thread.sleep(millis); 900 } 901 catch(Exception t) { 902 } 903 } 904 905 public static Test suite() throws Exception { 906 return new TestSuite(SyncReplTxTest.class); 908 } 909 910 911 } 912 | Popular Tags |