1 8 package org.jboss.test.cache.test.replicated; 9 10 import EDU.oswego.cs.dl.util.concurrent.FIFOSemaphore; 11 import junit.framework.Test; 12 import junit.framework.TestCase; 13 import junit.framework.TestSuite; 14 import org.jboss.cache.PropertyConfigurator; 15 import org.jboss.cache.TreeCache; 16 import org.jboss.cache.lock.IsolationLevel; 17 import org.jboss.cache.transaction.DummyTransactionManager; 18 import org.jboss.logging.Logger; 19 20 import javax.naming.Context ; 21 import javax.transaction.NotSupportedException ; 22 import javax.transaction.RollbackException ; 23 import javax.transaction.SystemException ; 24 import javax.transaction.Transaction ; 25 26 33 public class SyncTxUnitTestCase extends TestCase { 34 TreeCache cache1, cache2; 35 int caching_mode=TreeCache.REPL_SYNC; 36 final String group_name="TreeCacheTestGroup"; 37 String props= 38 "UDP(ip_mcast=true;ip_ttl=64;loopback=false;mcast_addr=228.1.2.3;" + 39 "mcast_port=45566;mcast_recv_buf_size=80000;mcast_send_buf_size=150000;" + 40 "ucast_recv_buf_size=80000;ucast_send_buf_size=150000):" + 41 "PING(down_thread=true;num_initial_members=2;timeout=500;up_thread=true):" + 42 "MERGE2(max_interval=20000;min_interval=10000):" + 43 "FD(down_thread=true;shun=true;up_thread=true):" + 44 "VERIFY_SUSPECT(down_thread=true;timeout=1500;up_thread=true):" + 45 "pbcast.NAKACK(down_thread=true;gc_lag=50;retransmit_timeout=600,1200,2400,4800;" + 46 "up_thread=true):" + 47 "pbcast.STABLE(desired_avg_gossip=20000;down_thread=true;up_thread=true):" + 48 "UNICAST(down_thread=true;min_threshold=10;timeout=600,1200,2400;window_size=100):" + 49 "FRAG(down_thread=true;frag_size=8192;up_thread=true):" + 50 "pbcast.GMS(join_retry_timeout=2000;join_timeout=5000;print_local_addr=true;shun=true):" + 51 "pbcast.STATE_TRANSFER(down_thread=true;up_thread=true)"; 52 53 final static Logger log_=Logger.getLogger(SyncTxUnitTestCase.class); 54 String old_factory=null; 55 final String FACTORY="org.jboss.cache.transaction.DummyContextFactory"; 56 FIFOSemaphore lock=new FIFOSemaphore(1); 57 DummyTransactionManager tx_mgr; 58 Throwable t1_ex, t2_ex; 59 60 61 62 public SyncTxUnitTestCase(String name) { 63 super(name); 64 } 65 66 public void setUp() throws Exception { 67 super.setUp(); 68 old_factory=System.getProperty(Context.INITIAL_CONTEXT_FACTORY); 69 System.setProperty(Context.INITIAL_CONTEXT_FACTORY, FACTORY); 70 tx_mgr=DummyTransactionManager.getInstance(); 71 initCaches(TreeCache.REPL_SYNC); 72 t1_ex=t2_ex=null; 73 } 74 75 public void tearDown() throws Exception { 76 super.tearDown(); 77 DummyTransactionManager.destroy(); 78 destroyCaches(); 79 if(old_factory != null) { 80 System.setProperty(Context.INITIAL_CONTEXT_FACTORY, old_factory); 81 old_factory=null; 82 } 83 } 84 85 Transaction beginTransaction() throws SystemException , NotSupportedException { 86 tx_mgr.begin(); 87 return tx_mgr.getTransaction(); 88 } 89 90 void initCaches(int caching_mode) throws Exception { 91 this.caching_mode=caching_mode; 92 cache1=new TreeCache(); 93 cache2=new TreeCache(); 94 PropertyConfigurator config=new PropertyConfigurator(); 95 config.configure(cache1, "META-INF/replSync-service.xml"); config.configure(cache2, "META-INF/replSync-service.xml"); cache1.setCacheMode(caching_mode); 98 cache2.setCacheMode(caching_mode); 99 100 cache1.setIsolationLevel(IsolationLevel.SERIALIZABLE); 101 cache2.setIsolationLevel(IsolationLevel.SERIALIZABLE); 102 106 cache1.setLockAcquisitionTimeout(5000); 107 cache2.setLockAcquisitionTimeout(5000); 108 cache1.start(); 109 cache2.start(); 110 } 111 112 void destroyCaches() throws Exception { 113 cache1.stop(); 114 cache2.stop(); 115 cache1=null; 116 cache2=null; 117 } 118 119 120 public void testLockRemoval() throws Exception { 121 cache1.setSyncCommitPhase(true); 122 cache1.releaseAllLocks("/"); 123 Transaction tx=beginTransaction(); 124 cache1.put("/bela/ban", "name", "Bela Ban"); 125 assertEquals(2, cache1.getNumberOfLocksHeld()); 126 assertEquals(0, cache2.getNumberOfLocksHeld()); 127 tx.commit(); 128 assertEquals(0, cache1.getNumberOfLocksHeld()); 129 assertEquals(0, cache2.getNumberOfLocksHeld()); 130 } 131 132 133 134 public void testSyncRepl() throws Exception { 135 Integer age; 136 Transaction tx; 137 138 try { 139 tx=beginTransaction(); 141 cache1.put("/a/b/c", "age", new Integer (38)); 142 assertNull("age on cache2 must be null as the TX has not yet been committed", cache2.get("/a/b/c", "age")); 143 tx.commit(); 144 145 age=(Integer )cache2.get("/a/b/c", "age"); 147 assertNotNull("\"age\" obtained from cache2 must be non-null ", age); 148 assertTrue("\"age\" must be 38", age.intValue() == 38); 149 } 150 catch(Exception e) { 151 fail(e.toString()); 152 } 153 } 154 155 156 157 public void testASyncRepl() throws Exception { 158 Integer age; 159 Transaction tx; 160 161 cache1.setCacheMode(TreeCache.REPL_ASYNC); 162 cache2.setCacheMode(TreeCache.REPL_ASYNC); 163 164 try { 165 tx=beginTransaction(); 166 cache1.put("/a/b/c", "age", new Integer (38)); 167 Thread.sleep(100); 168 assertNull("age on cache2 must be null as the TX has not yet been committed", cache2.get("/a/b/c", "age")); 169 tx.commit(); 170 Thread.sleep(100); 171 172 age=(Integer )cache2.get("/a/b/c", "age"); 174 assertNotNull("\"age\" obtained from cache2 is null ", age); 175 assertTrue("\"age\" must be 38", age.intValue() == 38); 176 } 177 catch(Exception e) { 178 fail(e.toString()); 179 } 180 } 181 182 217 public void testConcurrentPuts() throws Exception { 218 cache1.setSyncCommitPhase(true); 219 220 Thread t1=new Thread ("Thread1") { 221 Transaction tx; 222 223 public void run() { 224 try { 225 tx=beginTransaction(); 226 cache1.put("/bela/ban", "name", "Bela Ban"); 227 _pause(2000); tx.commit(); 229 System.out.println("[Thread1] ** LOCK INFO cache1: " + cache1.printLockInfo()); 230 System.out.println("[Thread1] ** LOCK INFO cache2: " + cache2.printLockInfo()); 231 } 232 catch(Throwable ex) { 233 ex.printStackTrace(); 234 t1_ex=ex; 235 } 236 } 237 }; 238 239 Thread t2=new Thread ("Thread2") { 240 Transaction tx; 241 242 public void run() { 243 try { 244 _pause(1000); tx=beginTransaction(); 246 System.out.println("[Thread2] ** LOCK INFO cache1: " + cache1.printLockInfo()); 247 System.out.println("[Thread2] ** LOCK INFO cache2: " + cache2.printLockInfo()); 248 cache1.put("/bela/ban", "name", "Michelle Ban"); 249 System.out.println("[Thread2] ** LOCK INFO cache1: " + cache1.printLockInfo()); 250 System.out.println("[Thread2] ** LOCK INFO cache2: " + cache2.printLockInfo()); 251 tx.commit(); 252 System.out.println("[Thread2] ** LOCK INFO cache1: " + cache1.printLockInfo()); 253 System.out.println("[Thread2] ** LOCK INFO cache2: " + cache2.printLockInfo()); 254 } 255 catch(Throwable ex) { 256 ex.printStackTrace(); 257 t2_ex=ex; 258 } 259 } 260 }; 261 262 t1.start(); 264 t2.start(); 265 266 t1.join(); 268 t2.join(); 269 270 if(t1_ex != null) 271 fail("Thread1 failed: " + t1_ex); 272 if(t2_ex != null) 273 fail("Thread2 failed: " + t2_ex); 274 275 assertEquals("Michelle Ban", cache1.get("/bela/ban", "name")); 276 } 277 278 279 282 public void testConcurrentPutsOnTwoInstances() throws Exception { 283 final TreeCache cache1=this.cache1; 284 final TreeCache cache2=this.cache2; 285 286 Thread t1=new Thread () { 287 Transaction tx; 288 289 public void run() { 290 try { 291 tx=beginTransaction(); 292 cache1.put("/ben/wang", "name", "Ben Wang"); 293 _pause(8000); 294 tx.commit(); } 296 catch(Throwable ex) { 297 ex.printStackTrace(); 298 t1_ex=ex; 299 } 300 } 301 }; 302 303 Thread t2=new Thread () { 304 Transaction tx; 305 306 public void run() { 307 try { 308 _pause(1000); tx=beginTransaction(); 310 cache2.put("/ben/wang", "name", "Ben Jr."); 311 tx.commit(); } 313 catch(RollbackException rollback_ex) { 314 System.out.println("received rollback exception as expected"); 315 } 316 catch(Throwable ex) { 317 ex.printStackTrace(); 318 t2_ex=ex; 319 } 320 } 321 }; 322 323 t1.start(); 325 t2.start(); 326 327 t1.join(); 329 t2.join(); 330 331 if(t1_ex != null) 332 fail("Thread1 failed: " + t1_ex); 333 if(t2_ex != null) 334 fail("Thread2 failed: " + t2_ex); 335 assertEquals("Ben Wang", cache1.get("/ben/wang", "name")); 336 } 337 338 339 public void testPut() throws Exception { 340 final TreeCache cache1=this.cache1; 341 final TreeCache cache2=this.cache2; 342 343 344 Thread t1=new Thread () { 345 public void run() { 346 try { 347 lock.acquire(); 348 System.out.println("-- t1 has lock"); 349 cache1.put("/a/b/c", "age", new Integer (38)); 350 System.out.println("[Thread1] set value to 38"); 351 352 System.out.println("-- t1 releases lock"); 353 lock.release(); 354 _pause(300); 355 Thread.yield(); 356 357 lock.acquire(); 358 System.out.println("-- t1 has lock"); 359 cache1.put("/a/b/c", "age", new Integer (39)); 360 System.out.println("[Thread1] set value to 39"); 361 362 System.out.println("-- t1 releases lock"); 363 lock.release(); 364 assertEquals(new Integer (39), cache1.get("/a/b/c", "age")); 365 } 366 catch(Throwable ex) { 367 ex.printStackTrace(); 368 t1_ex=ex; 369 } 370 finally { 371 lock.release(); 372 } 373 } 374 }; 375 376 Thread t2=new Thread () { 377 public void run() { 378 try { 379 _pause(100); 380 Thread.yield(); 381 lock.acquire(); 382 System.out.println("-- t2 has lock"); 383 Integer val=(Integer )cache2.get("/a/b/c", "age"); 385 System.out.println("[Thread2] value is " + val); 386 assertEquals(new Integer (38), val); 387 System.out.println("-- t2 releases lock"); 388 lock.release(); 389 _pause(300); 390 Thread.yield(); 391 392 lock.acquire(); 393 System.out.println("-- t2 has lock"); 394 val=(Integer )cache2.get("/a/b/c", "age"); 395 System.out.println("-- t2 releases lock"); 396 lock.release(); 397 assertEquals(new Integer (39), val); 398 } 399 catch(Throwable ex) { 400 ex.printStackTrace(); 401 t2_ex=ex; 402 } 403 finally { 404 lock.release(); 405 } 406 } 407 }; 408 409 t1.start(); 411 t2.start(); 412 413 t1.join(); 415 t2.join(); 416 if(t1_ex != null) 417 fail("Thread1 failed: " + t1_ex); 418 if(t2_ex != null) 419 fail("Thread2 failed: " + t2_ex); 420 } 421 422 431 public void testPutTx() throws Exception { 432 Transaction tx=null; 433 434 try { 435 tx=beginTransaction(); 436 cache1.put("/a/b/c", "age", new Integer (38)); 437 cache1.put("/a/b/c", "age", new Integer (39)); 438 Object val=cache2.get("/a/b/c", "age"); assertNull(val); 440 tx.commit(); 441 442 tx=beginTransaction(); 443 assertEquals(new Integer (39), cache2.get("/a/b/c", "age")); tx.commit(); 445 } 446 catch(Throwable ex) { 447 ex.printStackTrace(); 448 t1_ex=ex; 449 } 450 finally { 451 lock.release(); 452 } 453 } 454 455 456 462 public void testPutTx1() throws Exception { 463 final TreeCache cache1=this.cache1; 464 final TreeCache cache2=this.cache2; 465 Thread t1=new Thread () { 466 public void run() { 467 Transaction tx=null; 468 469 try { 470 lock.acquire(); 471 tx=beginTransaction(); 472 cache1.put("/a/b/c", "age", new Integer (38)); 473 cache1.put("/a/b/c", "age", new Integer (39)); 474 lock.release(); 475 476 _pause(300); 477 lock.acquire(); 478 try { 479 tx.commit(); 480 } 481 catch(RollbackException ex) { 482 System.out.println("[Thread1] received RollbackException, as expected. Rolling back changes"); 483 return; 484 } 485 finally { 486 lock.release(); 487 } 488 } 489 catch(Throwable ex) { 490 ex.printStackTrace(); 491 t1_ex=ex; 492 } 493 finally { 494 lock.release(); 495 } 496 } 497 }; 498 499 Thread t2=new Thread () { 500 public void run() { 501 Transaction tx=null; 502 503 try { 504 sleep(200); 505 Thread.yield(); 506 lock.acquire(); 507 tx=beginTransaction(); 508 assertNull(cache2.get("/a/b/c", "age")); cache2.put("/a/b/c", "age", new Integer (40)); 510 lock.release(); 511 512 _pause(300); 513 lock.acquire(); 514 assertEquals(new Integer (40), cache2.get("/a/b/c", "age")); tx.commit(); 516 lock.release(); 517 518 _pause(1000); 519 tx=beginTransaction(); 520 assertEquals("After cache2 commit", new Integer (40), cache2.get("/a/b/c", "age")); 521 tx.commit(); 522 } 523 catch(Throwable ex) { 524 ex.printStackTrace(); 525 t2_ex=ex; 526 } 527 finally { 528 lock.release(); 529 } 530 } 531 }; 532 533 t1.start(); 535 t2.start(); 536 537 t1.join(); 538 t2.join(); 539 540 if(t1_ex != null) 541 fail("Thread1 failed: " + t1_ex); 542 if(t2_ex != null) 543 fail("Thread2 failed: " + t2_ex); 544 } 545 546 547 548 public void testPutTxWithRollback() throws Exception { 549 final TreeCache cache1=this.cache1; 550 final TreeCache cache2=this.cache2; 551 Thread t1=new Thread () { 552 public void run() { 553 Transaction tx=null; 554 555 try { 556 lock.acquire(); 557 tx=beginTransaction(); 558 cache1.put("/a/b/c", "age", new Integer (38)); 559 cache1.put("/a/b/c", "age", new Integer (39)); 560 lock.release(); 561 562 _pause(100); 563 lock.acquire(); 564 tx.rollback(); 565 lock.release(); 566 } 567 catch(Throwable ex) { 568 ex.printStackTrace(); 569 t1_ex=ex; 570 } 571 finally { 572 lock.release(); 573 } 574 } 575 }; 576 577 Thread t2=new Thread () { 578 public void run() { 579 Transaction tx=null; 580 581 try { 582 sleep(200); 583 Thread.yield(); 584 lock.acquire(); 585 tx=beginTransaction(); 586 assertNull(cache2.get("/a/b/c", "age")); lock.release(); 588 589 _pause(100); 590 lock.acquire(); 591 assertNull(cache2.get("/a/b/c", "age")); tx.commit(); 593 lock.release(); 594 } 595 catch(Throwable ex) { 596 ex.printStackTrace(); 597 t2_ex=ex; 598 } 599 finally { 600 lock.release(); 601 } 602 } 603 }; 604 605 t1.start(); 607 t2.start(); 608 609 t1.join(); 611 t2.join(); 612 if(t1_ex != null) 613 fail("Thread1 failed: " + t1_ex); 614 if(t2_ex != null) 615 fail("Thread2 failed: " + t2_ex); 616 } 617 618 static void _pause(long millis) { 619 try { 620 Thread.sleep(millis); 621 } 622 catch(Exception ex) { 623 } 624 } 625 626 public static Test suite() throws Exception { 627 return new TestSuite(SyncTxUnitTestCase.class); 629 } 630 631 632 } 633 | Popular Tags |