package org.jboss.cache.statetransfer;

import org.jboss.cache.Cache;
import org.jboss.cache.CacheException;
import org.jboss.cache.CacheSPI;
import org.jboss.cache.Fqn;
import org.jboss.cache.Node;
import org.jboss.cache.Region;
import org.jboss.cache.config.Configuration;
import org.jboss.cache.config.Configuration.CacheMode;
import org.jboss.cache.factories.DefaultCacheFactory;
import org.jboss.cache.factories.UnitTestCacheFactory;
import org.jboss.cache.factories.XmlConfigurationParser;
import org.jboss.cache.loader.CacheLoader;
import org.jboss.cache.marshall.InactiveRegionException;
import org.jboss.cache.misc.TestingUtil;

import java.lang.reflect.Method;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/**
 * Abstract base for state transfer tests. The tests start two or more caches and
 * verify which data becomes visible on the later-started cache, either right after
 * startup or after a region is activated, and how this interacts with cache
 * loaders, region class loaders, concurrent activation and eviction.
 * <p>
 * Cache creation helpers, the test data constants (JOE, TWENTY, ...), the
 * {@code caches} registry and the {@code CacheUser} thread wrapper are inherited
 * from {@link StateTransferTestBase}.
 */
public abstract class VersionedTestBase extends StateTransferTestBase
{
   /** Number of distinct child nodes each {@link CacheStressor} writes under its own subtree. */
   private static final int SUBTREE_SIZE = 10;

   public static final Fqn A = Fqn.fromString("/a");
   public static final Fqn B = Fqn.fromString("/b");
   public static final Fqn C = Fqn.fromString("/c");

   protected static final String ADDRESS_CLASSNAME = "org.jboss.cache.marshall.data.Address";
   protected static final String PERSON_CLASSNAME = "org.jboss.cache.marshall.data.Person";

   public static final Fqn A_B = Fqn.fromString("/a/b");
   public static final Fqn A_C = Fqn.fromString("/a/c");
   public static final Fqn A_D = Fqn.fromString("/a/d");

   /**
    * Populates one cache, starts a second one and checks that the second cache
    * sees all the data once both members share a view.
    */
   public void testInitialStateTransfer() throws Exception
   {
      CacheSPI cache1 = createCache("cache1", false, false, false);

      cache1.put(A_B, "name", JOE);
      cache1.put(A_B, "age", TWENTY);
      cache1.put(A_C, "name", BOB);
      cache1.put(A_C, "age", FORTY);

      CacheSPI cache2 = createCache("cache2", false, false, false);

      // Wait until both caches see each other before asserting on cache2
      TestingUtil.blockUntilViewsReceived(new CacheSPI[]{cache1, cache2}, 60000);

      assertEquals("Incorrect name for /a/b", JOE, cache2.get(A_B, "name"));
      assertEquals("Incorrect age for /a/b", TWENTY, cache2.get(A_B, "age"));
      assertEquals("Incorrect name for /a/c", BOB, cache2.get(A_C, "name"));
      assertEquals("Incorrect age for /a/c", FORTY, cache2.get(A_C, "age"));
   }

   /** Runs the loader-backed initial state transfer test with {@code asyncLoader = false}. */
   public void testInitialStateTferWithLoader() throws Exception
   {
      initialStateTferWithLoaderTest(false);
   }

   /** Runs the loader-backed initial state transfer test with {@code asyncLoader = true}. */
   public void testInitialStateTferWithAsyncLoader() throws Exception
   {
      initialStateTferWithLoaderTest(true);
   }

   /**
    * Delegates to the inherited three-argument variant, using the FileCacheLoader
    * class name for both loader arguments.
    *
    * @param asyncLoader passed through unchanged to the inherited variant
    */
   protected void initialStateTferWithLoaderTest(boolean asyncLoader) throws Exception
   {
      initialStateTferWithLoaderTest("org.jboss.cache.loader.FileCacheLoader",
            "org.jboss.cache.loader.FileCacheLoader", asyncLoader);
   }

   /**
    * Verifies region-by-region state transfer: data under a region only becomes
    * visible on cache2 after that region has been activated there, and cache1
    * still holds all data after its /a region is deactivated and the three
    * sub-regions are activated.
    */
   public void testPartialStateTransfer() throws Exception
   {
      CacheSPI cache1 = createCache("cache1", false, true, false);

      cache1.getRegion(A, true).activate();

      cache1.put(A_B, "name", JOE);
      cache1.put(A_B, "age", TWENTY);
      cache1.put(A_C, "name", BOB);
      cache1.put(A_C, "age", FORTY);

      CacheSPI cache2 = createCache("cache2", false, true, false);

      // Wait until both caches see each other
      TestingUtil.blockUntilViewsReceived(new CacheSPI[]{cache1, cache2}, 60000);

      // Nothing has been activated on cache2 yet, so nothing may be visible there
      assertNull("/a/b name transferred against policy", cache2.get(A_B, "name"));
      assertNull("/a/b age transferred against policy", cache2.get(A_B, "age"));
      assertNull("/a/c name transferred against policy", cache2.get(A_C, "name"));
      assertNull("/a/c age transferred against policy", cache2.get(A_C, "age"));

      cache2.getRegion(A_B, true).activate();
      System.out.println("Region A_B on cache2: " + cache2.getRegion(A_B, false));

      assertEquals("Incorrect name for /a/b", JOE, cache2.get(A_B, "name"));
      assertEquals("Incorrect age for /a/b", TWENTY, cache2.get(A_B, "age"));
      assertNull("/a/c name transferred against policy", cache2.get(A_C, "name"));
      assertNull("/a/c age transferred against policy", cache2.get(A_C, "age"));

      cache1.put(A_D, "name", JANE);

      assertNull("/a/d name transferred against policy", cache2.get(A_D, "name"));

      cache2.getRegion(A_C, true).activate();

      assertEquals("Incorrect name for /a/b", JOE, cache2.get(A_B, "name"));
      assertEquals("Incorrect age for /a/b", TWENTY, cache2.get(A_B, "age"));
      assertEquals("Incorrect name for /a/c", BOB, cache2.get(A_C, "name"));
      assertEquals("Incorrect age for /a/c", FORTY, cache2.get(A_C, "age"));
      assertNull("/a/d name transferred against policy", cache2.get(A_D, "name"));

      cache2.getRegion(A_D, true).activate();

      assertEquals("Incorrect name for /a/b", JOE, cache2.get(A_B, "name"));
      assertEquals("Incorrect age for /a/b", TWENTY, cache2.get(A_B, "age"));
      assertEquals("Incorrect name for /a/c", BOB, cache2.get(A_C, "name"));
      assertEquals("Incorrect age for /a/c", FORTY, cache2.get(A_C, "age"));
      assertEquals("Incorrect name for /a/d", JANE, cache2.get(A_D, "name"));

      // Swap the coarse /a region on cache1 for the three finer-grained ones
      cache1.getRegion(A, true).deactivate();

      cache1.getRegion(A_B, true).activate();
      cache1.getRegion(A_C, true).activate();
      cache1.getRegion(A_D, true).activate();

      assertEquals("Incorrect name for /a/b", JOE, cache1.get(A_B, "name"));
      assertEquals("Incorrect age for /a/b", TWENTY, cache1.get(A_B, "age"));
      assertEquals("Incorrect name for /a/c", BOB, cache1.get(A_C, "name"));
      assertEquals("Incorrect age for /a/c", FORTY, cache1.get(A_C, "age"));
      assertEquals("Incorrect name for /a/d", JANE, cache1.get(A_D, "name"));
   }

   /**
    * Same scenario as {@link #testPartialStateTransfer()}, but additionally checks
    * the content of each cache's {@link CacheLoader} after every activation step.
    */
   public void testPartialStateTferWithLoader() throws Exception
   {
      CacheSPI cache1 = createCache("cache1", false, true, true);

      cache1.getRegion(A, true).activate();

      cache1.put(A_B, "name", JOE);
      cache1.put(A_B, "age", TWENTY);
      cache1.put(A_C, "name", BOB);
      cache1.put(A_C, "age", FORTY);

      CacheSPI cache2 = createCache("cache2", false, true, true);

      // Wait until both caches see each other
      TestingUtil.blockUntilViewsReceived(new CacheSPI[]{cache1, cache2}, 60000);

      CacheLoader loader = cache2.getCacheLoaderManager().getCacheLoader();

      assertNull("/a/b transferred to loader against policy", loader.get(A_B));

      assertNull("/a/b name transferred against policy", cache2.get(A_B, "name"));
      assertNull("/a/b age transferred against policy", cache2.get(A_B, "age"));
      assertNull("/a/c name transferred against policy", cache2.get(A_C, "name"));
      assertNull("/a/c age transferred against policy", cache2.get(A_C, "age"));

      cache2.getRegion(A_B, true).activate();

      assertEquals("Incorrect name from loader for /a/b", JOE, loader.get(A_B).get("name"));
      assertEquals("Incorrect age from loader for /a/b", TWENTY, loader.get(A_B).get("age"));
      assertNull("/a/c transferred to loader against policy", loader.get(A_C));

      assertEquals("Incorrect name for /a/b", JOE, cache2.get(A_B, "name"));
      assertEquals("Incorrect age for /a/b", TWENTY, cache2.get(A_B, "age"));
      assertNull("/a/c name transferred against policy", cache2.get(A_C, "name"));
      assertNull("/a/c age transferred against policy", cache2.get(A_C, "age"));

      cache1.put(A_D, "name", JANE);

      assertNull("/a/d name transferred against policy", cache2.get(A_D, "name"));

      cache2.getRegion(A_C, true).activate();

      assertEquals("Incorrect name from loader for /a/b", JOE, loader.get(A_B).get("name"));
      assertEquals("Incorrect age from loader for /a/b", TWENTY, loader.get(A_B).get("age"));
      assertEquals("Incorrect name from loader for /a/c", BOB, loader.get(A_C).get("name"));
      assertEquals("Incorrect age from loader for /a/c", FORTY, loader.get(A_C).get("age"));

      assertEquals("Incorrect name for /a/b", JOE, cache2.get(A_B, "name"));
      assertEquals("Incorrect age for /a/b", TWENTY, cache2.get(A_B, "age"));
      assertEquals("Incorrect name for /a/c", BOB, cache2.get(A_C, "name"));
      assertEquals("Incorrect age for /a/c", FORTY, cache2.get(A_C, "age"));
      assertNull("/a/d name transferred against policy", cache2.get(A_D, "name"));

      cache2.getRegion(A_D, true).activate();

      assertEquals("Incorrect name from loader for /a/b", JOE, loader.get(A_B).get("name"));
      assertEquals("Incorrect age from loader for /a/b", TWENTY, loader.get(A_B).get("age"));
      assertEquals("Incorrect name from loader for /a/c", BOB, loader.get(A_C).get("name"));
      assertEquals("Incorrect age from loader for /a/c", FORTY, loader.get(A_C).get("age"));
      assertEquals("Incorrect name from loader for /a/d", JANE, loader.get(A_D).get("name"));

      assertEquals("Incorrect name for /a/b", JOE, cache2.get(A_B, "name"));
      assertEquals("Incorrect age for /a/b", TWENTY, cache2.get(A_B, "age"));
      assertEquals("Incorrect name for /a/c", BOB, cache2.get(A_C, "name"));
      assertEquals("Incorrect age for /a/c", FORTY, cache2.get(A_C, "age"));
      assertEquals("Incorrect name for /a/d", JANE, cache2.get(A_D, "name"));

      // Swap the coarse /a region on cache1 for the three finer-grained ones
      cache1.getRegion(A, true).deactivate();

      cache1.getRegion(A_B, true).activate();
      cache1.getRegion(A_C, true).activate();
      cache1.getRegion(A_D, true).activate();

      // From here on the loader under inspection is cache1's, not cache2's
      loader = cache1.getCacheLoaderManager().getCacheLoader();

      assertEquals("Incorrect name from loader for /a/b", JOE, loader.get(A_B).get("name"));
      assertEquals("Incorrect age from loader for /a/b", TWENTY, loader.get(A_B).get("age"));
      assertEquals("Incorrect name from loader for /a/c", BOB, loader.get(A_C).get("name"));
      assertEquals("Incorrect age from loader for /a/c", FORTY, loader.get(A_C).get("age"));
      assertEquals("Incorrect name from loader for /a/d", JANE, loader.get(A_D).get("name"));

      assertEquals("Incorrect name for /a/b", JOE, cache1.get(A_B, "name"));
      assertEquals("Incorrect age for /a/b", TWENTY, cache1.get(A_B, "age"));
      assertEquals("Incorrect name for /a/c", BOB, cache1.get(A_C, "name"));
      assertEquals("Incorrect age for /a/c", FORTY, cache1.get(A_C, "age"));
      assertEquals("Incorrect name for /a/d", JANE, cache1.get(A_D, "name"));
   }

   /**
    * Stores an object created through a region-registered class loader on cache1
    * and checks that cache2 obtains it, in both cache and loader, once cache2
    * registers its own class loader for the region and activates it.
    */
   public void testPartialStateTferWithClassLoader() throws Exception
   {
      // Install the "not found" loader as the thread context class loader.
      // NOTE(review): presumably this forces class resolution through the loaders
      // registered on the regions; it is not restored in this method - confirm the
      // base class tearDown takes care of that.
      Thread.currentThread().setContextClassLoader(getNotFoundClassLoader());

      // cache1 is created with the last flag false and started explicitly below,
      // after its region class loader has been registered
      CacheSPI cache1 = createCache("cache1",
            false, true, true, false, false);
      ClassLoader cl1 = getClassLoader();
      cache1.getRegion(A, true).registerContextClassLoader(cl1);
      startCache(cache1);

      cache1.getRegion(A, true).activate();

      Object ben = createBen(cl1);

      cache1.put(A_B, "person", ben);

      CacheSPI cache2 = createCache("cache2",
            false, true, true, false, true);

      // Wait until both caches see each other
      TestingUtil.blockUntilViewsReceived(new CacheSPI[]{cache1, cache2}, 60000);

      CacheLoader loader = cache2.getCacheLoaderManager().getCacheLoader();

      assertNull("/a/b not transferred to loader", loader.get(A_B));

      assertNull("/a/b not transferred to cache", cache2.get(A_B, "person"));

      ClassLoader cl2 = getClassLoader();

      Region r = cache2.getRegion(A, true);
      r.registerContextClassLoader(cl2);

      r.activate();

      // The two sides use different ClassLoader instances, so compare by toString()
      assertEquals("Correct state from loader for /a/b", ben.toString(), loader.get(A_B).get("person").toString());

      assertEquals("Correct state from cache for /a/b", ben.toString(), cache2.get(A_B, "person").toString());
   }

   /**
    * Checks that activating the root region on an already running cache2 brings
    * over the complete state, into both the cache and its loader.
    */
   public void testLoadEntireStateAfterStart() throws Exception
   {
      CacheSPI cache1 = createCache("cache1", false, true, true);

      cache1.getRegion(Fqn.ROOT, true).activate();

      cache1.put(A_B, "name", JOE);
      cache1.put(A_B, "age", TWENTY);
      cache1.put(A_C, "name", BOB);
      cache1.put(A_C, "age", FORTY);

      CacheSPI cache2 = createCache("cache2", false, true, true);

      // Wait until both caches see each other
      TestingUtil.blockUntilViewsReceived(new CacheSPI[]{cache1, cache2}, 60000);

      CacheLoader loader = cache2.getCacheLoaderManager().getCacheLoader();

      assertNull("/a/b transferred to loader against policy", loader.get(A_B));

      assertNull("/a/b name transferred against policy", cache2.get(A_B, "name"));
      assertNull("/a/b age transferred against policy", cache2.get(A_B, "age"));
      assertNull("/a/c name transferred against policy", cache2.get(A_C, "name"));
      assertNull("/a/c age transferred against policy", cache2.get(A_C, "age"));

      cache2.getRegion(Fqn.ROOT, true).activate();

      assertEquals("Incorrect name from loader for /a/b", JOE, loader.get(A_B).get("name"));
      assertEquals("Incorrect age from loader for /a/b", TWENTY, loader.get(A_B).get("age"));
      assertEquals("Incorrect name from loader for /a/c", BOB, loader.get(A_C).get("name"));
      assertEquals("Incorrect age from loader for /a/c", FORTY, loader.get(A_C).get("age"));

      assertEquals("Incorrect name for /a/b", JOE, cache2.get(A_B, "name"));
      assertEquals("Incorrect age for /a/b", TWENTY, cache2.get(A_B, "age"));
      assertEquals("Incorrect name for /a/c", BOB, cache2.get(A_C, "name"));
      assertEquals("Incorrect age for /a/c", FORTY, cache2.get(A_C, "age"));
   }

   /** Runs {@link #concurrentActivationTest(boolean)} with {@code sync = true}. */
   public void testConcurrentActivationSync() throws Exception
   {
      concurrentActivationTest(true);
   }

   /** Runs {@link #concurrentActivationTest(boolean)} with {@code sync = false}. */
   public void testConcurrentActivationAsync() throws Exception
   {
      concurrentActivationTest(false);
   }

   /**
    * Starts five {@link CacheActivator}s that each activate /a/b and write one
    * child node, then checks that every activator's cache holds the value written
    * by every other activator. Exceptions that are (or are caused by) an
    * {@link InactiveRegionException} are tolerated.
    *
    * @param sync passed to each activator; when false an extra pause is inserted
    *             before the values are verified
    */
   private void concurrentActivationTest(boolean sync) throws Exception
   {
      String[] names = {"A", "B", "C", "D", "E"};
      int count = names.length;
      CacheActivator[] activators = new CacheActivator[count];

      try
      {
         // Take every permit up front so no activator can proceed until released
         Semaphore semaphore = new Semaphore(count);
         for (int i = 0; i < count; i++)
         {
            semaphore.acquire();
         }

         CacheSPI[] caches = new CacheSPI[count];
         for (int i = 0; i < count; i++)
         {
            activators[i] = new CacheActivator(semaphore, names[i], sync);
            caches[i] = activators[i].getCacheSPI();
            activators[i].start();
         }

         // Wait until all caches see each other
         TestingUtil.blockUntilViewsReceived(caches, 60000);

         // Let the activators go
         semaphore.release(count);

         // Give the activators a head start at taking the permits
         TestingUtil.sleepThread((long) 1000);

         // Getting all permits back means every activator has finished
         for (int i = 0; i < count; i++)
         {
            boolean acquired = semaphore.tryAcquire(60, TimeUnit.SECONDS);
            if (!acquired)
            {
               fail("failed to acquire semaphore " + i);
            }
         }

         // Extra pause in the non-sync case before reading values back
         if (!sync)
         {
            TestingUtil.sleepThread((long) 500);
         }

         for (int i = 0; i < count; i++)
         {
            Exception aException = activators[i].getException();
            if (isUnexpectedException(aException))
            {
               fail("Activator " + names[i] + " caught an exception " + aException);
            }

            for (int j = 0; j < count; j++)
            {
               Fqn fqn = new Fqn(A_B, names[j]);
               assertEquals("Incorrect value for " + fqn + " on activator " + names[i],
                     "VALUE", activators[i].getCacheValue(fqn));
            }
         }
      }
      finally
      {
         // Slots stay null if a constructor threw part-way through the setup loop
         for (int i = 0; i < count; i++)
         {
            if (activators[i] != null)
            {
               activators[i].cleanup();
            }
         }
      }
   }

   /**
    * Starts two {@link StaggeredWebDeployerActivator}s ten seconds apart; each one
    * activates {@code regionsToActivate} regions under /a with a one second pause
    * between them and writes one node per region. Afterwards every activator's
    * cache must hold the value that activator wrote in each of its regions.
    * Exceptions that are (or are caused by) an {@link InactiveRegionException}
    * are tolerated.
    *
    * @param sync passed to each activator; when false an extra pause is inserted
    *             before the values are verified
    */
   private void concurrentActivationTest2(boolean sync) throws Exception
   {
      String[] names = {"A", "B"};
      int count = names.length;
      int regionsToActivate = 15;
      int sleepTimeBetweenNodeStarts = 10000;
      StaggeredWebDeployerActivator[] activators = new StaggeredWebDeployerActivator[count];
      try
      {
         // Take every permit up front; one is handed back per activator below
         Semaphore semaphore = new Semaphore(count);
         for (int i = 0; i < count; i++)
         {
            semaphore.acquire();
         }

         CacheSPI[] caches = new CacheSPI[count];
         for (int i = 0; i < count; i++)
         {
            activators[i] = new StaggeredWebDeployerActivator(semaphore, names[i], sync, regionsToActivate);
            caches[i] = activators[i].getCacheSPI();

            // Release a single permit so only this activator can run now
            semaphore.release(1);

            activators[i].start();
            TestingUtil.sleepThread(sleepTimeBetweenNodeStarts);
         }

         // Make sure all caches see each other
         TestingUtil.blockUntilViewsReceived(caches, 60000);

         TestingUtil.sleepThread(1000);

         // Getting all permits back means every activator has finished
         for (int i = 0; i < count; i++)
         {
            boolean acquired = semaphore.tryAcquire(60, TimeUnit.SECONDS);
            if (!acquired)
            {
               fail("failed to acquire semaphore " + i);
            }
         }

         // Extra pause in the non-sync case before reading values back
         if (!sync)
         {
            TestingUtil.sleepThread(1000);
         }

         for (int i = 0; i < count; i++)
         {
            Exception aException = activators[i].getException();
            if (isUnexpectedException(aException))
            {
               fail("Activator " + names[i] + " caught an exception " + aException);
            }

            // Check every region the activator worked through: the region index is
            // j, matching the "/a/<region>/<name>" nodes written in useCache()
            for (int j = 0; j < regionsToActivate; j++)
            {
               Fqn fqn = Fqn.fromString("/a/" + j + "/" + names[i]);
               assertEquals("Incorrect value for " + fqn + " on activator " + names[i],
                     "VALUE", activators[i].getCacheValue(fqn));
            }
         }
      }
      finally
      {
         // Slots stay null if a constructor threw part-way through the setup loop
         for (int i = 0; i < count; i++)
         {
            if (activators[i] != null)
            {
               activators[i].cleanup();
            }
         }
      }
   }

   /**
    * Tells whether an exception recorded by an activator should fail the test.
    *
    * @param e the recorded exception, may be null
    * @return false for null and for anything that is, or is directly caused by,
    *         an {@link InactiveRegionException}; true otherwise
    */
   private static boolean isUnexpectedException(Exception e)
   {
      if (e == null)
      {
         return false;
      }
      return !(e instanceof InactiveRegionException
            || e.getCause() instanceof InactiveRegionException);
   }

   /** Runs {@link #concurrentActivationTest2(boolean)} with {@code sync = false}. */
   public void testConcurrentStartupActivationAsync() throws Exception
   {
      concurrentActivationTest2(false);
   }

   /** Runs {@link #concurrentActivationTest2(boolean)} with {@code sync = true}. */
   public void testConcurrentStartupActivationSync() throws Exception
   {
      concurrentActivationTest2(true);
   }

   /** Runs {@link #concurrentUseTest(boolean)} with {@code sync = true}. */
   public void testConcurrentUseSync() throws Exception
   {
      concurrentUseTest(true);
   }

   /** Runs {@link #concurrentUseTest(boolean)} with {@code sync = false}. */
   public void testConcurrentUseAsync() throws Exception
   {
      concurrentUseTest(false);
   }

   /**
    * Runs four {@link CacheStressor}s that keep writing into their own subtree
    * while "cacheA" activates the matching regions one by one, then checks that
    * cacheA and each stressor's cache agree on every node of that subtree.
    * A stressor exception that is an {@link InactiveRegionException} is tolerated.
    *
    * @param sync passed to cacheA's creation and to each stressor
    */
   private void concurrentUseTest(boolean sync) throws Exception
   {
      String[] names = {"B", "C", "D", "E"};
      int count = names.length;
      CacheStressor[] stressors = new CacheStressor[count];

      try
      {
         // The cache whose regions get activated while the others are writing
         CacheSPI cacheA = createCache("cacheA", sync, true, false);

         CacheSPI[] caches = new CacheSPI[count + 1];
         caches[0] = cacheA;

         // Take every permit up front so no stressor can proceed until released
         Semaphore semaphore = new Semaphore(count);
         for (int i = 0; i < count; i++)
         {
            semaphore.acquire();
         }

         for (int i = 0; i < count; i++)
         {
            stressors[i] = new CacheStressor(semaphore, names[i], sync);
            caches[i + 1] = stressors[i].getCacheSPI();
            stressors[i].start();
         }

         // Make sure everyone is in sync
         TestingUtil.blockUntilViewsReceived(caches, 60000);

         // The loop bound is 1, so the x > 0 re-run branch is currently never taken
         for (int x = 0; x < 1; x++)
         {
            if (x > 0)
            {
               // Reset for another round: deactivate on A and resume the writers
               for (int i = 0; i < count; i++)
               {
                  cacheA.getRegion(Fqn.fromString("/" + names[i]), true).deactivate();
                  System.out.println("Run " + x + "-- /" + names[i] + " deactivated on A");
                  stressors[i].startPuts();
               }
            }

            // Let the stressors go
            semaphore.release(count);

            // Let them write for a while before activating anything
            TestingUtil.sleepThread((long) 300);

            for (int i = 0; i < count; i++)
            {
               cacheA.getRegion(Fqn.fromString("/" + names[i]), true).activate();
               stressors[i].stopPuts();
               System.out.println("Run " + x + "-- /" + names[i] + " activated on A");
               // A stressor hands its permit back once its put loop has ended
               boolean acquired = semaphore.tryAcquire(60, TimeUnit.SECONDS);
               if (!acquired)
               {
                  fail("failed to acquire semaphore " + i);
               }

               TestingUtil.sleepThread((long) 100);
            }

            TestingUtil.sleepThread((long) 1000);

            for (int i = 0; i < count; i++)
            {
               if (stressors[i].getException() != null && !(stressors[i].getException() instanceof InactiveRegionException))
               {
                  fail("Stressor " + names[i] + " caught an exception " + stressors[i].getException());
               }
            }

            // cacheA and each stressor's cache must agree on the whole subtree
            for (int i = 0; i < count; i++)
            {
               for (int j = 0; j < SUBTREE_SIZE; j++)
               {
                  Fqn fqn = Fqn.fromString("/" + names[i] + "/" + j);
                  assertEquals("/A/" + j + " matches " + fqn,
                        cacheA.get(fqn, "KEY"),
                        stressors[i].getCacheSPI().get(fqn, "KEY"));
               }
            }
         }

         for (int i = 0; i < count; i++)
         {
            stressors[i].stopThread();
         }
      }
      finally
      {
         for (int i = 0; i < count; i++)
         {
            if (stressors[i] != null)
            {
               stressors[i].cleanup();
            }
         }
      }
   }

   /**
    * Starts a second cache after one node was written to the first and expects
    * three entries in the node event queue of the second cache's root region.
    * NOTE(review): three presumably corresponds to /a, /a/b and /a/b/c - confirm.
    */
   public void testEvictionSeesStateTransfer() throws Exception
   {
      Configuration c = UnitTestCacheFactory.createConfiguration(CacheMode.REPL_SYNC, true);
      Cache cache1 = DefaultCacheFactory.getInstance().createCache(c, false);
      cache1.start();
      caches.put("evict1", cache1);

      cache1.put(Fqn.fromString("/a/b/c"), "key", "value");

      c = UnitTestCacheFactory.createConfiguration(CacheMode.REPL_SYNC, true);
      Cache cache2 = DefaultCacheFactory.getInstance().createCache(c, false);
      cache2.start();
      caches.put("evict2", cache2);

      Region region = cache2.getRegion(Fqn.ROOT, false);
      assertEquals("Saw the expected number of node events", 3, region.nodeEventQueueSize());
   }

   /**
    * Fills cache1, starts cache2, then has two threads keep adding nodes while the
    * child counts of /org/jboss/test/data and /base on cache2 are polled for up to
    * ten seconds; both counts must be seen to shrink at least once. After the
    * writers are stopped, the child counts are checked against upper bounds.
    * NOTE(review): the sleep durations and bounds presumably track the eviction
    * settings of the test configuration, which is not visible here - confirm.
    */
   public void testEvictionAfterStateTransfer() throws Exception
   {
      Configuration c = UnitTestCacheFactory.createConfiguration(CacheMode.REPL_SYNC, true);
      Cache cache1 = DefaultCacheFactory.getInstance().createCache(c, false);
      cache1.start();
      caches.put("evict1", cache1);

      for (int i = 0; i < 25000; i++)
      {
         cache1.put(Fqn.fromString("/base/" + i), "key", "base" + i);
         if (i < 5)
         {
            cache1.put(Fqn.fromString("/org/jboss/test/data/" + i), "key", "data" + i);
         }
      }

      c = UnitTestCacheFactory.createConfiguration(CacheMode.REPL_SYNC, true);
      final Cache cache2 = DefaultCacheFactory.getInstance().createCache(c, false);
      cache2.start();
      caches.put("evict2", cache2);

      Node parent = cache2.getRoot().getChild(Fqn.fromString("/org/jboss/test/data"));
      Set children = parent.getChildren();
      assertEquals("All data children transferred", 5, children.size());
      parent = cache2.getRoot().getChild(Fqn.fromString("/base"));
      children = parent.getChildren();
      assertTrue("Minimum number of base children transferred", children.size() >= 5000);

      TestingUtil.sleepThread(2500);

      // Background writer that keeps adding nodes until told to stop
      class Putter extends Thread
      {
         Cache cache = null;
         // Written by the test thread and read by this thread, hence volatile
         volatile boolean stopped = false;
         // Written by this thread and read by the test thread, hence volatile
         volatile Exception ex = null;

         @Override
         public void run()
         {
            int i = 25000;
            while (!stopped)
            {
               try
               {
                  cache.put(Fqn.fromString("/base/" + i), "key", "base" + i);
                  cache.put(Fqn.fromString("/org/jboss/test/data/" + i), "key", "data" + i);
                  i++;
               }
               catch (Exception e)
               {
                  // Remember the failure for the test thread and keep going
                  ex = e;
               }
            }
         }
      }
      Putter p1 = new Putter();
      p1.cache = cache1;
      p1.start();
      Putter p2 = new Putter();
      p2.cache = cache2;
      p2.start();

      boolean sawBaseDecrease = false;
      boolean sawDataDecrease = false;
      try
      {
         Random rnd = new Random();
         TestingUtil.sleepThread(rnd.nextInt(200));

         int maxCountBase = 0;
         int maxCountData = 0;
         long start = System.currentTimeMillis();
         while ((System.currentTimeMillis() - start) < 10000)
         {
            parent = cache2.getRoot().getChild(Fqn.fromString("/org/jboss/test/data"));
            children = parent.getChildren();
            if (children != null)
            {
               int dataCount = children.size();
               if (dataCount < maxCountData)
               {
                  System.out.println("data " + dataCount + " < " + maxCountData + " elapsed = " + (System.currentTimeMillis() - start));
                  sawDataDecrease = true;
               }
               else
               {
                  maxCountData = dataCount;
               }
            }

            parent = cache2.getRoot().getChild(Fqn.fromString("/base"));
            children = parent.getChildren();
            if (children != null)
            {
               int baseCount = children.size();
               if (baseCount < maxCountBase)
               {
                  System.out.println("base " + baseCount + " < " + maxCountBase + " elapsed = " + (System.currentTimeMillis() - start));
                  sawBaseDecrease = true;
               }
               else
               {
                  maxCountBase = baseCount;
               }
            }

            if (sawDataDecrease && sawBaseDecrease)
            {
               break;
            }

            TestingUtil.sleepThread(50);
         }
      }
      finally
      {
         // Always stop the writers, even if polling threw, so they do not keep
         // running against the caches after this test has ended
         p1.stopped = true;
         p2.stopped = true;
         p1.join(1000);
         p2.join(1000);
      }

      assertTrue("Saw data decrease", sawDataDecrease);
      assertTrue("Saw base decrease", sawBaseDecrease);
      assertNull("No exceptions in p1", p1.ex);
      assertNull("No exceptions in p2", p2.ex);

      TestingUtil.sleepThread(5100);

      parent = cache2.getRoot().getChild(Fqn.fromString("/org/jboss/test/data"));
      children = parent.getChildren();
      if (children != null)
      {
         System.out.println(children.size());
         assertTrue("Excess children evicted", children.size() <= 5);
      }
      parent = cache2.getRoot().getChild(Fqn.fromString("/base"));
      children = parent.getChildren();
      if (children != null)
      {
         System.out.println(children.size());
         assertTrue("Excess children evicted", children.size() <= 25000);
      }

      TestingUtil.sleepThread(8100);

      parent = cache2.getRoot().getChild(Fqn.fromString("/org/jboss/test/data"));
      children = parent.getChildren();
      if (children != null)
      {
         assertEquals("All data children evicted", 0, children.size());
      }
   }

   /**
    * Reflectively builds a Person named "Ben" with an Address, loading both
    * classes through the given class loader.
    *
    * @param loader class loader used to load the Address and Person classes
    * @return the populated Person instance
    * @throws Exception if a class or setter cannot be found or invoked
    */
   private Object createBen(ClassLoader loader) throws Exception
   {
      Class<?> addrClazz = loader.loadClass(ADDRESS_CLASSNAME);
      Method setCity = addrClazz.getMethod("setCity", String.class);
      Method setStreet = addrClazz.getMethod("setStreet", String.class);
      Method setZip = addrClazz.getMethod("setZip", int.class);
      Object addr = addrClazz.newInstance();
      setCity.invoke(addr, "San Jose");
      setStreet.invoke(addr, "1007 Home");
      setZip.invoke(addr, 90210);

      Class<?> benClazz = loader.loadClass(PERSON_CLASSNAME);
      Method setName = benClazz.getMethod("setName", String.class);
      Method setAddress = benClazz.getMethod("setAddress", addrClazz);
      Object ben = benClazz.newInstance();
      setName.invoke(ben, "Ben");
      setAddress.invoke(ben, addr);

      return ben;
   }

   /**
    * Cache user that, after a random delay, activates /a/b and writes
    * KEY=VALUE to the child node named after itself.
    */
   private class CacheActivator extends CacheUser
   {

      CacheActivator(Semaphore semaphore,
                     String name,
                     boolean sync)
            throws Exception
      {
         super(semaphore, name, sync, false);
      }

      void useCache() throws Exception
      {
         // Random delay so the activators do not all hit the region at once
         TestingUtil.sleepRandom(5000);
         cache.getRegion(A_B, true).activate();
         Fqn childFqn = Fqn.fromString("/a/b/" + name);

         cache.put(childFqn, "KEY", "VALUE");
      }

      /** Returns the value stored under "KEY" at the given node of this user's cache. */
      public Object getCacheValue(Fqn fqn) throws CacheException
      {
         return cache.get(fqn, "KEY");
      }
   }

   /**
    * Cache user that activates regions /a/0 .. /a/(regionCount-1) one per second
    * and writes KEY=VALUE to a child named after itself in each of them.
    */
   private class StaggeredWebDeployerActivator extends CacheUser
   {

      int regionCount = 15;

      StaggeredWebDeployerActivator(Semaphore semaphore,
                                    String name,
                                    boolean sync,
                                    int regionCount)
            throws Exception
      {
         super(semaphore, name, sync, false);
         this.regionCount = regionCount;
      }

      void useCache() throws Exception
      {
         for (int i = 0; i < regionCount; i++)
         {
            cache.getRegion(Fqn.fromString("/a/" + i), true).activate();

            Fqn childFqn = Fqn.fromString("/a/" + i + "/" + name);
            cache.put(childFqn, "KEY", "VALUE");

            TestingUtil.sleepThread(1000);
         }
      }

      /** Returns the value stored under "KEY" at the given node of this user's cache. */
      public Object getCacheValue(Fqn fqn) throws CacheException
      {
         return cache.get(fqn, "KEY");
      }
   }

   /**
    * Cache user that keeps writing random values into the SUBTREE_SIZE nodes
    * below "/&lt;name&gt;" until puts are stopped, then releases one semaphore
    * permit and idles until puts are restarted or the thread is stopped.
    */
   private class CacheStressor extends CacheUser
   {
      private Random random = new Random(System.currentTimeMillis());
      // Both flags are set by the test thread and polled by the stressor thread
      private volatile boolean putsStopped = false;
      private volatile boolean stopped = false;

      CacheStressor(Semaphore semaphore,
                    String name,
                    boolean sync)
            throws Exception
      {
         super(semaphore, name, sync, true);
      }

      void useCache() throws Exception
      {
         int factor = 0;
         int i = 0;
         Fqn fqn = null;

         boolean acquired = false;
         while (!stopped)
         {
            // On later rounds (at least one put already done) take a permit first
            if (i > 0)
            {
               acquired = semaphore.tryAcquire(60, TimeUnit.SECONDS);
               if (!acquired)
               {
                  throw new Exception(name + " cannot acquire semaphore");
               }
            }

            while (!putsStopped)
            {
               factor = random.nextInt(50);

               // factor picks both the target node (remainder) and the value (quotient)
               fqn = Fqn.fromString("/" + name + "/" + String.valueOf(factor % SUBTREE_SIZE));
               Integer value = factor / SUBTREE_SIZE;
               cache.put(fqn, "KEY", value);

               TestingUtil.sleepThread((long) factor);

               i++;
            }

            System.out.println(name + ": last put [#" + i + "] -- " + fqn + " = " + (factor / SUBTREE_SIZE));

            // Signal the test thread that this round of puts has ended
            semaphore.release();

            // Idle until puts are re-enabled or the thread is told to stop
            while (!stopped && putsStopped)
            {
               TestingUtil.sleepThread((long) 100);
            }
         }
      }

      /** Ends the current round of puts. */
      public void stopPuts()
      {
         putsStopped = true;
      }

      /** Allows puts to resume. */
      public void startPuts()
      {
         putsStopped = false;
      }

      /** Ends the outer loop and interrupts the worker thread if it is still alive. */
      public void stopThread()
      {
         stopped = true;
         if (thread.isAlive())
         {
            thread.interrupt();
         }
      }
   }
}