1 22 23 package org.jboss.cache.statetransfer; 24 25 import org.jboss.cache.AbstractCacheListener; 26 import org.jboss.cache.CacheException; 27 import org.jboss.cache.CacheSPI; 28 import org.jboss.cache.Fqn; 29 import org.jboss.cache.Version; 30 import org.jboss.cache.misc.TestingUtil; 31 32 import javax.transaction.Synchronization ; 33 import javax.transaction.Transaction ; 34 import javax.transaction.TransactionManager ; 35 import java.util.Map ; 36 37 38 46 public class ForcedStateTransferTest extends StateTransferTestBase 47 { 48 52 static class CacheStarter extends Thread 53 { 54 CacheSPI cache; 55 boolean useMarshalling; 56 Exception failure; 57 58 CacheStarter(CacheSPI cache, boolean useMarshalling) 59 { 60 this.cache = cache; 61 this.useMarshalling = useMarshalling; 62 } 63 64 public void run() 65 { 66 try 67 { 68 cache.start(); 69 70 if (useMarshalling) 71 { 72 TestingUtil.blockUntilViewReceived(cache, 2, 60000); 76 cache.getRegion(Fqn.ROOT, true).activate(); 77 } 78 } 79 catch (Exception e) 80 { 81 failure = e; 82 } 83 } 84 } 85 86 90 static abstract class TaskRunner extends Thread 91 { 92 CacheSPI cache; 93 Fqn fqn; 94 String value; 95 Exception failure; 96 boolean asleep = false; 97 98 TaskRunner(CacheSPI cache, String rootFqn, String value) 99 { 100 this.cache = cache; 101 this.value = value; 102 this.fqn = new Fqn(Fqn.fromString(rootFqn), value); 103 } 104 105 public void run() 106 { 107 try 108 { 109 executeTask(); 111 } 112 catch (Exception e) 113 { 114 if (!isDone()) 115 failure = e; 116 } 117 finally 118 { 119 asleep = false; 120 finalCleanup(); 122 } 123 } 124 125 abstract void executeTask() throws Exception ; 126 127 abstract boolean isDone(); 128 129 void finalCleanup() 130 { 131 } 132 133 boolean isAsleep() 134 { 135 return asleep; 136 } 137 } 138 139 142 static class TxRunner extends TaskRunner 143 { 144 Transaction tx = null; 145 boolean rollback = false; 146 boolean done = true; 147 148 TxRunner(CacheSPI cache, String rootFqn, String value, boolean rollback) 149 { 150 super(cache, rootFqn, value); 151 this.rollback = rollback; 152 } 153 154 void executeTask() throws Exception 155 { 156 TransactionManager tm = cache.getTransactionManager(); 157 tm.begin(); 158 tx = tm.getTransaction(); 159 160 cache.put(fqn, "KEY", value); 161 162 if (rollback) 163 tx.setRollbackOnly(); 164 165 asleep = true; 166 TestingUtil.sleepThread((long) 25000); 167 done = true; 168 } 169 170 void finalCleanup() 171 { 172 if (tx != null) 173 { 174 try { tx.commit(); } catch (Exception ignore) {} 175 } 176 } 177 178 boolean isDone() 179 { 180 return done; 181 } 182 } 183 184 187 static class HangThreadListener extends AbstractCacheListener 188 { 189 boolean asleep; 190 Fqn toHang; 191 boolean alreadyHung; 192 boolean done; 193 194 HangThreadListener(Fqn toHang) 195 { 196 this.toHang = toHang; 197 } 198 199 public void nodeModified(Fqn fqn, boolean pre, boolean isLocal, ModificationType modType, Map data) 200 { 201 if (!pre) hangThread(fqn); 202 } 203 204 private void hangThread(Fqn fqn) 205 { 206 if (!alreadyHung && toHang.equals(fqn)) 207 { 208 asleep = true; 209 alreadyHung = true; 211 TestingUtil.sleepThread((long) 30000); 212 done = true; 213 asleep = false; 214 } 215 } 216 } 217 218 221 static class HangThreadRunner extends TaskRunner 222 { 223 HangThreadListener listener; 224 225 HangThreadRunner(CacheSPI cache, String rootFqn, String value) 226 { 227 super(cache, rootFqn, value); 228 listener = new HangThreadListener(fqn); 229 cache.addCacheListener(listener); 230 } 231 232 void executeTask() throws Exception 233 { 234 cache.put(fqn, "KEY", value); 236 } 237 238 boolean isAsleep() 239 { 240 return listener.asleep; 241 } 242 243 boolean isDone() 244 { 245 return listener.done; 246 } 247 } 248 249 253 static class HangThreadSynchronization implements Synchronization 254 { 255 boolean asleep; 256 boolean hangBefore; 257 boolean done; 258 259 HangThreadSynchronization(boolean hangBefore) 260 { 261 this.hangBefore = hangBefore; 262 } 263 264 public void beforeCompletion() 265 { 266 if (hangBefore) 267 { 268 hang(); 269 } 270 } 271 272 public void afterCompletion(int status) 273 { 274 if (!hangBefore) 275 { 276 hang(); 277 } 278 } 279 280 void hang() 281 { 282 asleep = true; 283 TestingUtil.sleepThread((long) 30000); 284 done = true; 285 } 286 287 } 288 289 293 static class SynchronizationTxRunner extends TaskRunner 294 { 295 Transaction tx = null; 296 HangThreadSynchronization sync; 297 298 SynchronizationTxRunner(CacheSPI cache, String rootFqn, String value, boolean hangBefore) 299 { 300 super(cache, rootFqn, value); 301 this.sync = new HangThreadSynchronization(hangBefore); 302 } 303 304 void executeTask() throws Exception 305 { 306 TransactionManager tm = cache.getTransactionManager(); 307 tm.begin(); 308 tx = tm.getTransaction(); 309 tx.registerSynchronization(sync); 310 311 cache.put(fqn, "KEY", value); 312 313 tx.commit(); 315 } 316 317 boolean isAsleep() 318 { 319 return sync.asleep; 320 } 321 322 boolean isDone() 323 { 324 return sync.done; 325 } 326 } 327 328 334 public void testActiveTransaction() throws Exception 335 { 336 String [] values = {"A", "B", "C"}; 337 transactionTest(values, false, "REPEATABLE_READ"); 338 } 339 340 346 public void testRollbackOnlyTransaction() throws Exception 347 { 348 String [] values = {"A", "B", "C"}; 349 transactionTest(values, true, "REPEATABLE_READ"); 350 } 351 352 362 private void transactionTest(String [] values, 363 boolean rollback, 364 String isolationLevel) throws Exception 365 { 366 CacheSPI sender = initializeSender(isolationLevel, false, false); 368 369 TxRunner[] runners = 371 initializeTransactionRunners(values, sender, "/LOCK", rollback); 372 373 CacheSPI receiver = startReceiver(isolationLevel, false, false); 375 376 checkResults(receiver, runners, false); 378 } 379 380 391 private CacheSPI initializeSender(String isolationLevel, 392 boolean replSync, 393 boolean useMarshalling) throws Exception 394 { 395 CacheSPI sender = createCache("sender", isolationLevel, replSync, useMarshalling, true); 396 397 if (useMarshalling) 398 sender.getRegion(Fqn.ROOT, true).activate(); 399 400 sender.put(Fqn.fromString("/OK"), "KEY", "X"); 401 402 return sender; 403 } 404 405 418 private TxRunner[] initializeTransactionRunners(String [] values, 419 CacheSPI sender, 420 String rootFqn, 421 boolean rollback) 422 { 423 TxRunner[] runners = new TxRunner[values.length]; 424 for (int i = 0; i < values.length; i++) 425 { 426 runners[i] = new TxRunner(sender, rootFqn, values[i], rollback); 427 initializeRunner(runners[i]); 428 } 429 430 return runners; 431 } 432 433 439 private void initializeRunner(TaskRunner runner) 440 { 441 runner.start(); 442 443 long start = System.currentTimeMillis(); 445 while (!(runner.isAsleep())) 446 { 447 assertTrue(runner.getClass().getName() + " " + runner.value + 448 " is alive", runner.isAlive()); 449 assertFalse(runner.getClass().getName() + " " + runner.value + 451 " has not timed out", 452 (System.currentTimeMillis() - start) > 1000); 453 } 454 } 455 456 466 private void checkResults(CacheSPI receiver, 467 TaskRunner[] runners, 468 boolean allowValues) throws CacheException 469 { 470 boolean[] aliveStates = new boolean[runners.length]; 472 for (int i = 0; i < runners.length; i++) 473 { 474 aliveStates[i] = runners[i].isAlive(); 475 if (aliveStates[i]) 476 runners[i].interrupt(); 477 } 478 479 assertEquals("OK value correct", "X", receiver.get(Fqn.fromString("/OK"), "KEY")); 481 482 for (int i = 0; i < runners.length; i++) 483 { 484 assertTrue("Runner " + runners[i].value + " was alive", aliveStates[i]); 485 assertNull("Runner " + runners[i].value + " ran cleanly", runners[i].failure); 486 if (allowValues) 487 { 488 assertEquals("Correct value in " + runners[i].fqn, 489 runners[i].value, receiver.get(runners[i].fqn, "KEY")); 490 } 491 else 492 { 493 assertNull("No value in " + runners[i].fqn, 494 receiver.get(runners[i].fqn, "KEY")); 495 } 496 } 497 } 498 499 505 public void testHungThread() throws Exception 506 { 507 CacheSPI sender = initializeSender("REPEATABLE_READ", false, false); 509 510 String [] values = {"A", "B", "C"}; 512 HangThreadRunner[] runners = initializeHangThreadRunners(values, sender, "/LOCK"); 513 514 CacheSPI receiver = startReceiver("REPEATABLE_READ", false, false); 516 517 checkResults(receiver, runners, true); 519 } 520 521 533 private HangThreadRunner[] initializeHangThreadRunners(String [] values, 534 CacheSPI sender, 535 String rootFqn) 536 { 537 HangThreadRunner[] runners = new HangThreadRunner[values.length]; 538 for (int i = 0; i < values.length; i++) 539 { 540 runners[i] = new HangThreadRunner(sender, rootFqn, values[i]); 541 initializeRunner(runners[i]); 542 } 543 544 return runners; 545 } 546 547 554 public void testBeforeCompletionLock() throws Exception 555 { 556 synchronizationTest(true); 557 } 558 559 566 public void testAfterCompletionLock() throws Exception 567 { 568 synchronizationTest(false); 569 } 570 571 582 private void synchronizationTest(boolean hangBefore) throws Exception 583 { 584 CacheSPI sender = initializeSender("REPEATABLE_READ", false, false); 585 586 String [] values = {"A", "B", "C"}; 587 SynchronizationTxRunner[] runners = 588 initializeSynchronizationTxRunners(values, sender, "/LOCK", hangBefore); 589 590 CacheSPI receiver = startReceiver("REPEATABLE_READ", false, false); 591 592 checkResults(receiver, runners, !hangBefore); 593 } 594 595 596 611 private SynchronizationTxRunner[] initializeSynchronizationTxRunners(String [] values, 612 CacheSPI sender, 613 String rootFqn, 614 boolean hangBefore) 615 { 616 SynchronizationTxRunner[] runners = 617 new SynchronizationTxRunner[values.length]; 618 for (int i = 0; i < values.length; i++) 619 { 620 runners[i] = new SynchronizationTxRunner(sender, rootFqn, values[i], hangBefore); 621 initializeRunner(runners[i]); 622 } 623 return runners; 624 } 625 626 634 public void testMultipleProblems() throws Exception 635 { 636 multipleProblemTest("REPEATABLE_READ", "/LOCK", false, false); 637 } 638 639 646 public void testSerializableIsolation() throws Exception 647 { 648 multipleProblemTest("SERIALIZABLE", "/", false, false); 649 } 650 651 659 public void testPartialStateTransfer() throws Exception 660 { 661 multipleProblemTest("REPEATABLE_READ", "/LOCK", false, true); 662 } 663 664 672 public void testReplSync() throws Exception 673 { 674 multipleProblemTest("REPEATABLE_READ", "/LOCK", true, false); 675 } 676 677 683 private void multipleProblemTest(String isolationLevel, 684 String rootFqn, 685 boolean replSync, 686 boolean useMarshalling) throws Exception 687 { 688 CacheSPI sender = initializeSender(isolationLevel, replSync, useMarshalling); 689 690 String [] val1 = {"A", "B", "C"}; 696 SynchronizationTxRunner[] after = 697 initializeSynchronizationTxRunners(val1, sender, rootFqn, false); 698 699 String [] val2 = {"D", "E", "F"}; 700 SynchronizationTxRunner[] before = 701 initializeSynchronizationTxRunners(val2, sender, rootFqn, true); 702 703 String [] val3 = {"G", "H", "I"}; 704 TxRunner[] active = 705 initializeTransactionRunners(val3, sender, rootFqn, false); 706 707 String [] val4 = {"J", "K", "L"}; 708 TxRunner[] rollback = 709 initializeTransactionRunners(val4, sender, rootFqn, true); 710 711 String [] val5 = {"M", "N", "O"}; 712 HangThreadRunner[] threads = 713 initializeHangThreadRunners(val5, sender, rootFqn); 714 715 CacheSPI receiver = startReceiver(isolationLevel, replSync, useMarshalling); 716 717 checkResults(receiver, active, false); 718 checkResults(receiver, rollback, false); 719 checkResults(receiver, before, false); 720 checkResults(receiver, after, true); 721 checkResults(receiver, threads, true); 722 } 723 724 protected String getReplicationVersion() 725 { 726 return Version.version; 727 } 728 729 739 private CacheSPI startReceiver(String isolationLevel, 740 boolean replSync, 741 boolean useMarshalling) throws Exception 742 { 743 CacheSPI receiver = createCache("receiver", isolationLevel, replSync, useMarshalling, false); 744 745 CacheStarter starter = new CacheStarter(receiver, useMarshalling); 748 749 starter.start(); 750 751 starter.join(20000); 752 753 boolean alive = starter.isAlive(); 754 if (alive) 755 starter.interrupt(); 756 assertFalse("Starter finished", alive); 757 758 assertNull("No exceptions in starter", starter.failure); 759 760 return receiver; 761 } 762 763 767 private CacheSPI createCache(String cacheID, 768 String isolationLevel, 769 boolean replSync, 770 boolean useMarshalling, 771 boolean startCache) 772 throws Exception 773 { 774 CacheSPI result = super.createCache(cacheID, replSync, 775 useMarshalling, false, false, false); 776 result.getConfiguration().setInitialStateRetrievalTimeout(0); 777 result.getConfiguration().setLockAcquisitionTimeout(1000); 778 result.getConfiguration().setIsolationLevel(isolationLevel); 779 780 if (startCache) 781 result.start(); 782 783 return result; 784 } 785 786 787 } 788 | Popular Tags |