package org.jboss.cache.pojo.statetransfer;

import EDU.oswego.cs.dl.util.concurrent.Semaphore;
import junit.framework.TestCase;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jboss.cache.Cache;
import org.jboss.cache.CacheException;
import org.jboss.cache.CacheSPI;
import org.jboss.cache.Fqn;
import org.jboss.cache.config.CacheLoaderConfig;
import org.jboss.cache.config.Configuration;
import org.jboss.cache.factories.XmlConfigurationParser;
import org.jboss.cache.loader.CacheLoader;
import org.jboss.cache.misc.TestingUtil;
import org.jboss.cache.pojo.PojoCache;
import org.jboss.cache.pojo.PojoCacheFactory;
import org.jboss.cache.pojo.test.Address;
import org.jboss.cache.pojo.test.Person;
import org.jboss.cache.xml.XmlHelper;
import org.w3c.dom.Element;

import javax.transaction.TransactionManager;
import java.io.File;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Random;
import java.util.Set;

/**
 * Base class for tests that exercise state transfer between {@link PojoCache}
 * instances. Concrete subclasses only have to supply the replication version
 * string via {@link #getReplicationVersion()}.
 * <p/>
 * Every cache created through {@link #createCache} is remembered by its ID and
 * is stopped, destroyed and has its temporary cache-loader directory removed
 * in {@link #tearDown()}.
 */
public abstract class StateTransferAopTestBase extends TestCase
{
   /** Caches created by the current test, keyed by cache ID. */
   private Map<String, PojoCache> caches;

   public static final String A_B_1 = "/a/b/1";
   public static final String A_B_2 = "/a/b/2";
   public static final String A_C_1 = "/a/c/1";
   public static final String A_C_2 = "/a/c/2";

   public static final Fqn A_B_1_f = Fqn.fromString("/a/b/1");
   public static final Fqn A_B_2_f = Fqn.fromString("/a/b/2");
   public static final Fqn A_C_1_f = Fqn.fromString("/a/c/1");
   public static final Fqn A_C_2_f = Fqn.fromString("/a/c/2");

   /** Number of child nodes each {@link CacheStressor} works on. */
   private static final int SUBTREE_SIZE = 10;

   private Person joe;
   private Person bob;
   private Person jane;
   private Person jill;
   private Address addr1;
   private Address addr2;

   public static final Integer TWENTY = 20;
   public static final Integer TWENTYFIVE = 25;
   public static final Integer FORTY = 40;

   private static final Log log = LogFactory.getLog(StateTransferAopTestBase.class);

   /**
    * Attaches four persons to a first cache, starts a second cache and checks
    * that the second cache sees the same persons, including the shared
    * {@link Address} instances.
    */
   public void testInitialStateTransfer() throws Exception
   {
      log.info("Enter testInitialStateTransfer");

      PojoCache cache1 = createCache("cache1", true, false, false);

      cache1.attach(A_B_1, joe);
      cache1.attach(A_B_2, jane);
      cache1.attach(A_C_1, bob);
      cache1.attach(A_C_2, jill);

      PojoCache cache2 = createCache("cache2", true, false, false);

      Person ab1 = (Person) cache2.find(A_B_1);
      Person ab2 = (Person) cache2.find(A_B_2);
      Person ac1 = (Person) cache2.find(A_C_1);
      Person ac2 = (Person) cache2.find(A_C_2);
      assertEquals("Name for /a/b/1 is Joe", joe.getName(), ab1.getName());
      assertEquals("City for /a/b/1 is Anytown", addr1.getCity(), ab1.getAddress().getCity());
      assertEquals("Name for /a/b/2 is Jane", jane.getName(), ab2.getName());
      assertEquals("City for /a/b/2 is Anytown", addr1.getCity(), ab2.getAddress().getCity());
      assertTrue("Joe and Jane have same Address", ab1.getAddress() == ab2.getAddress());
      assertEquals("Name for /a/c/1 is Bob", bob.getName(), ac1.getName());
      assertEquals("City for /a/c/1 is Fremont", addr2.getCity(), ac1.getAddress().getCity());
      assertEquals("Name for /a/c/2 is Jill", jill.getName(), ac2.getName());
      assertEquals("City for /a/c/2 is Fremont", addr2.getCity(), ac2.getAddress().getCity());
      assertTrue("Bob and Jill have same Address", ac1.getAddress() == ac2.getAddress());
   }

   /**
    * Same scenario as {@link #testInitialStateTransfer()}, but both caches are
    * asynchronous and configured with a cache loader.
    */
   public void testInitialStateTferWithLoader() throws Exception
   {
      log.info("Enter testInitialStateTferWithLoader");

      PojoCache cache1 = createCache("cache1", false, false, true);

      cache1.attach(A_B_1, joe);
      cache1.attach(A_B_2, jane);
      cache1.attach(A_C_1, bob);
      cache1.attach(A_C_2, jill);

      PojoCache cache2 = createCache("cache2", false, false, true);

      TestingUtil.blockUntilViewsReceived(new Cache[]
            {cache1.getCache(), cache2.getCache()}, 60000);

      Person ab1 = (Person) cache2.find(A_B_1);
      Person ab2 = (Person) cache2.find(A_B_2);
      Person ac1 = (Person) cache2.find(A_C_1);
      Person ac2 = (Person) cache2.find(A_C_2);
      assertEquals("Name for /a/b/1 is Joe", joe.getName(), ab1.getName());
      assertEquals("City for /a/b/1 is Anytown", addr1.getCity(), ab1.getAddress().getCity());
      assertEquals("Name for /a/b/2 is Jane", jane.getName(), ab2.getName());
      assertEquals("City for /a/b/2 is Anytown", addr1.getCity(), ab2.getAddress().getCity());
      assertTrue("Joe and Jane have same Address", ab1.getAddress() == ab2.getAddress());
      assertEquals("Name for /a/c/1 is Bob", bob.getName(), ac1.getName());
      assertEquals("City for /a/c/1 is Fremont", addr2.getCity(), ac1.getAddress().getCity());
      assertEquals("Name for /a/c/2 is Jill", jill.getName(), ac2.getName());
      assertEquals("City for /a/c/2 is Fremont", addr2.getCity(), ac2.getAddress().getCity());
      assertTrue("Bob and Jill have same Address", ac1.getAddress() == ac2.getAddress());
   }

   /**
    * Checks region-based state transfer: the second cache must not see
    * <code>/a</code> until it activates that region, after which it must see
    * existing and newly attached content. Finally the region is deactivated
    * and re-activated on the first cache and the content is read back there.
    */
   public void testPartialStateTransfer() throws Exception
   {
      log.info("Enter testPartialStateTransfer");

      PojoCache cache1 = createCache("cache1", false, true, false);
      cache1.getCache().getRegion(Fqn.fromString("/a"), true).activate();

      cache1.attach(A_B_1, joe);
      cache1.attach(A_B_2, jane);

      PojoCache cache2 = createCache("cache2", false, true, false);

      TestingUtil.blockUntilViewsReceived(new Cache[]
            {cache1.getCache(), cache2.getCache()}, 60000);

      assertNull("/a/b/1 not transferred per policy", cache2.find(A_B_1));
      assertNull("/a/b/2 not transferred per policy", cache2.find(A_B_2));

      cache2.getCache().getRegion(Fqn.fromString("/a"), true).activate();

      Person ab1 = (Person) cache2.find(A_B_1);
      Person ab2 = (Person) cache2.find(A_B_2);
      assertEquals("Name for /a/b/1 is Joe", joe.getName(), ab1.getName());
      assertEquals("City for /a/b/1 is Anytown", joe.getAddress().getCity(), ab1.getAddress().getCity());
      assertEquals("Name for /a/b/2 is Jane", jane.getName(), ab2.getName());
      assertEquals("City for /a/b/2 is Anytown", jane.getAddress().getCity(), ab2.getAddress().getCity());
      assertTrue("Address for Joe and Jane is the same object", ab1.getAddress() == ab2.getAddress());

      cache1.attach(A_C_1, bob);
      cache1.attach(A_C_2, jill);
      // Both caches use the async configuration; give replication a moment,
      // exactly as testPartialStateTransferWithLoader does.
      Thread.sleep(200);

      assertNotNull("/a/c/1 should be transferred per policy", cache2.find(A_C_1));

      cache1.getCache().getRegion(Fqn.fromString("/a"), true).deactivate();

      cache1.getCache().getRegion(Fqn.fromString("/a"), true).activate();

      ab1 = (Person) cache1.find(A_B_1);
      assertEquals("Name for /a/b/1 is Joe", joe.getName(), ab1.getName());
      assertEquals("City for /a/b/1 is Anytown", addr1.getCity(), ab1.getAddress().getCity());
      ab2 = (Person) cache1.find(A_B_2);
      assertEquals("Name for /a/b/2 is Jane", jane.getName(), ab2.getName());
      assertEquals("City for /a/b/2 is Anytown", addr1.getCity(), ab2.getAddress().getCity());
      assertTrue("Address for Joe and Jane is the same object", ab1.getAddress() == ab2.getAddress());
   }

   /**
    * Variant of {@link #testPartialStateTransfer()} with a cache loader on
    * both caches. Before the region is activated on the second cache, neither
    * its cache loader nor the cache itself may expose the persons.
    */
   public void testPartialStateTransferWithLoader() throws Exception
   {
      log.info("Enter testPartialStateTransferWithLoader");

      PojoCache cache1 = createCache("cache1", false, true, true);
      cache1.getCache().getRegion(Fqn.fromString("/a"), true).activate();

      cache1.attach(A_B_1, joe);
      cache1.attach(A_B_2, jane);

      PojoCache cache2 = createCache("cache2", false, true, true);

      TestingUtil.blockUntilViewsReceived(new Cache[]
            {cache1.getCache(), cache2.getCache()}, 60000);

      CacheLoader loader = ((CacheSPI) cache2.getCache()).getCacheLoaderManager().getCacheLoader();

      Map map = loader.get(A_B_1_f);
      if (map != null)
      {
         assertNull("/a/b/1 name not transferred per policy", map.get("name"));
         assertNull("/a/b/1 age not transferred per policy", map.get("age"));
      }
      map = loader.get(A_B_2_f);
      if (map != null)
      {
         assertNull("/a/b/2 name not transferred per policy", map.get("name"));
         assertNull("/a/b/2 age not transferred per policy", map.get("age"));
      }
      assertNull("/a/b/1 not transferred per policy", cache2.find(A_B_1));
      assertNull("/a/b/2 not transferred per policy", cache2.find(A_B_2));

      cache2.getCache().getRegion(Fqn.fromString("/a"), true).activate();

      Person ab1 = (Person) cache2.find(A_B_1);
      Person ab2 = (Person) cache2.find(A_B_2);
      assertEquals("Name for /a/b/1 is Joe", joe.getName(), ab1.getName());
      assertEquals("City for /a/b/1 is Anytown", addr1.getCity(), ab1.getAddress().getCity());
      assertEquals("Name for /a/b/2 is Jane", jane.getName(), ab2.getName());
      assertEquals("City for /a/b/2 is Anytown", addr1.getCity(), ab2.getAddress().getCity());
      assertTrue("Address for Joe and Jane is the same object", ab1.getAddress() == ab2.getAddress());

      cache1.attach(A_C_1, bob);
      cache1.attach(A_C_2, jill);
      Thread.sleep(200);

      assertNotNull("/a/c/1 transferred per policy", cache2.find(A_C_1));
      assertNotNull("/a/c/2 transferred per policy", cache2.find(A_C_2));

      Person ac1 = (Person) cache2.find(A_C_1);
      Person ac2 = (Person) cache2.find(A_C_2);
      assertEquals("Name for /a/c/1 is Bob", bob.getName(), ac1.getName());
      assertEquals("City for /a/c/1 is Fremont", addr2.getCity(), ac1.getAddress().getCity());
      assertEquals("Name for /a/c/2 is Jill", jill.getName(), ac2.getName());
      assertEquals("City for /a/c/2 is Fremont", addr2.getCity(), ac2.getAddress().getCity());
      assertTrue("Bob and Jill have same Address", ac1.getAddress() == ac2.getAddress());

      cache1.getCache().getRegion(Fqn.fromString("/a"), true).deactivate();

      ab1 = (Person) cache1.find(A_B_1);
      assertEquals("Name for /a/b/1 is Joe", joe.getName(), ab1.getName());
      assertEquals("City for /a/b/1 is Anytown", addr1.getCity(), ab1.getAddress().getCity());
      ab2 = (Person) cache1.find(A_B_2);
      assertEquals("Name for /a/b/2 is Jane", jane.getName(), ab2.getName());
      assertEquals("City for /a/b/2 is Anytown", addr1.getCity(), ab2.getAddress().getCity());
      assertTrue("Address for Joe and Jane is the same object", ab1.getAddress() == ab2.getAddress());
      ac1 = (Person) cache1.find(A_C_1);
      assertEquals("Name for /a/c/1 is Bob", bob.getName(), ac1.getName());
      assertEquals("City for /a/c/1 is Fremont", addr2.getCity(), ac1.getAddress().getCity());
      ac2 = (Person) cache1.find(A_C_2);
      assertEquals("Name for /a/c/2 is Jill", jill.getName(), ac2.getName());
      assertEquals("City for /a/c/2 is Fremont", addr2.getCity(), ac2.getAddress().getCity());
      assertTrue("Address for Bob and Jill is the same object", ac1.getAddress() == ac2.getAddress());
   }

   /** Runs {@link #concurrentActivationTest(boolean)} with synchronous replication. */
   public void testConcurrentActivationSync() throws Exception
   {
      log.info("Enter testConcurrentActivationSync");

      concurrentActivationTest(true);
   }

   /** Runs {@link #concurrentActivationTest(boolean)} with asynchronous replication. */
   public void testConcurrentActivationAsync() throws Exception
   {
      log.info("Enter testConcurrentActivationAsync");

      concurrentActivationTest(false);
   }

   /**
    * Starts five {@link CacheActivator} threads that are all released at once
    * through a shared semaphore. Each activates region <code>/a/b</code> and
    * attaches its own person; afterwards every cache must see the persons
    * attached by all five.
    *
    * @param sync <code>true</code> to use the synchronous replication config
    */
   private void concurrentActivationTest(boolean sync) throws Exception
   {
      String[] names = {"A", "B", "C", "D", "E"};
      int count = names.length;
      CacheActivator[] activators = new CacheActivator[count];

      try
      {
         // Drain every permit so that no activator can start working until
         // all caches are up and the permits are released together below.
         Semaphore semaphore = new Semaphore(count);
         for (int i = 0; i < count; i++)
         {
            semaphore.acquire();
         }

         Cache[] memberCaches = new Cache[count];
         for (int i = 0; i < count; i++)
         {
            activators[i] = new CacheActivator(semaphore, names[i], sync);
            memberCaches[i] = activators[i].getCache();
            activators[i].start();
         }

         TestingUtil.blockUntilViewsReceived(memberCaches, 60000);

         semaphore.release(count);

         TestingUtil.sleepThread(1000);

         // Each activator returns its permit when done; collecting all of
         // them back means every thread has finished.
         for (int i = 0; i < count; i++)
         {
            boolean acquired = semaphore.attempt(60000);
            if (!acquired)
               fail("failed to acquire semaphore " + i);
         }

         if (!sync)
            TestingUtil.sleepThread(500);

         for (int i = 0; i < count; i++)
         {
            assertNull("Activator " + names[i] + " caught an exception",
                  activators[i].getException());

            for (int j = 0; j < count; j++)
            {
               String fqn = "/a/b/" + names[j];
               Person p = (Person) activators[i].getCacheValue(fqn);
               assertNotNull(names[i] + ":" + fqn + " is not null", p);
               assertEquals("Correct name for " + names[i] + ":" + fqn,
                     "Person " + names[j], p.getName());
               assertEquals("Correct street for " + names[i] + ":" + fqn,
                     names[j] + " Test Street", p.getAddress().getStreet());
            }
         }
      }
      catch (Exception ex)
      {
         // Keep the stack trace in the log; fail() only carries the message.
         log.error("concurrentActivationTest failed", ex);
         fail(ex.getLocalizedMessage());
      }
      finally
      {
         // Slots stay null if a CacheActivator constructor threw; guard so a
         // NullPointerException here cannot mask the original failure.
         for (int i = 0; i < count; i++)
         {
            if (activators[i] != null)
               activators[i].cleanup();
         }
      }
   }

   /**
    * Currently only logs entry; it does not invoke
    * {@link #XconcurrentUseTest(boolean)}.
    */
   public void testConcurrentUseSync() throws Exception
   {
      log.info("Enter testConcurrentUseSync");
   }

   /**
    * Currently only logs entry; it does not invoke
    * {@link #XconcurrentUseTest(boolean)}.
    */
   public void testConcurrentUseAsync() throws Exception
   {
      log.info("Enter testConcurrentUseAsync");
   }

   /**
    * Stress scenario that is not called by any test in this class. Four
    * {@link CacheStressor} threads write to their own subtree while cache "A"
    * repeatedly deactivates and re-activates the matching regions; afterwards
    * A's view of every node is compared with the stressor's own view.
    *
    * @param sync <code>true</code> to use the synchronous replication config
    */
   private void XconcurrentUseTest(boolean sync) throws Exception
   {
      String[] names = {"B", "C", "D", "E"};
      int count = names.length;
      CacheStressor[] stressors = new CacheStressor[count];

      try
      {
         PojoCache cacheA = createCache("cacheA", sync, true, false, false);

         Cache[] memberCaches = new Cache[count + 1];
         memberCaches[0] = cacheA.getCache();

         // Drain every permit so the stressors block until released below.
         Semaphore semaphore = new Semaphore(count);
         for (int i = 0; i < count; i++)
         {
            semaphore.acquire();
         }

         for (int i = 0; i < count; i++)
         {
            stressors[i] = new CacheStressor(semaphore, names[i], sync);
            memberCaches[i + 1] = stressors[i].getCache();
            stressors[i].start();
            TestingUtil.sleepThread(100);
         }

         TestingUtil.blockUntilViewsReceived(memberCaches, 60000);

         for (int x = 0; x < 2; x++)
         {
            for (int i = 0; i < count; i++)
            {
               cacheA.getCache().getRegion(Fqn.fromString("/" + names[i]), true).deactivate();
               log.info("TEST: Run " + x + "-- /" + names[i] + " inactivated on A");
               stressors[i].startPuts();
            }

            semaphore.release(count);

            TestingUtil.sleepThread(300);

            for (int i = 0; i < count; i++)
            {
               log.info("TEST: Activating /" + names[i] + " on A");
               cacheA.getCache().getRegion(Fqn.fromString("/" + names[i]), true).activate();
               stressors[i].stopPuts();
               log.info("TEST: Run " + x + "-- /" + names[i] + " activated on A");
               boolean acquired = semaphore.attempt(60000);
               if (!acquired)
                  fail("failed to acquire semaphore " + names[i]);
               log.info("TEST: Run " + x + "-- acquired semaphore from " + names[i]);

               TestingUtil.sleepThread(100);
            }

            if (!sync)
               TestingUtil.sleepThread(2000);

            for (int i = 0; i < count; i++)
            {
               Exception e = stressors[i].getException();
               if (e != null)
               {
                  log.error("Stressor " + names[i] + " caught an exception",
                        e);
                  throw e;
               }
            }

            Person p1 = null;
            Person p2 = null;
            for (int i = 0; i < count; i++)
            {
               for (int j = 0; j < SUBTREE_SIZE; j++)
               {
                  String fqn = "/" + names[i] + "/" + j;
                  log.info("TEST: Getting A:" + fqn);
                  p1 = (Person) cacheA.find(fqn);
                  boolean p1Null = p1 == null;
                  log.info("TEST: Getting " + names[i] + ":" + fqn);
                  // Read the stressor's own view of the node. Without this
                  // lookup p2 was never assigned, so the comparison below
                  // could not succeed for any populated node.
                  p2 = (Person) stressors[i].cache.find(fqn);
                  boolean p2Null = p2 == null;
                  assertEquals("Run " + x + ": " + fqn +
                        " null status matches", p1Null, p2Null);
                  if (!p1Null)
                  {
                     assertEquals("Run " + x + ": A:" + fqn + " age matches " + names[i] + ":" + fqn,
                           p1.getAge(), p2.getAge());
                     assertEquals("Run " + x + ": A:" + fqn + " name matches " + names[i] + ":" + fqn,
                           p1.getName(), p2.getName());
                     assertEquals("Run " + x + ": A:" + fqn + " address matches " + names[i] + ":" + fqn,
                           p1.getAddress().getStreet(),
                           p2.getAddress().getStreet());
                  }
               }
            }
         }

         for (int i = 0; i < count; i++)
            stressors[i].stopThread();
      }
      finally
      {
         for (int i = 0; i < count; i++)
         {
            if (stressors[i] != null)
               stressors[i].cleanup();
         }
      }
   }

   /**
    * Creates and registers a cache that is inactive on startup whenever
    * region-based marshalling is requested.
    *
    * @see #createCache(String, boolean, boolean, boolean, boolean)
    */
   protected PojoCache createCache(String cacheID, boolean sync, boolean useMarshalling, boolean useCacheLoader)
         throws Exception
   {
      return createCache(cacheID, sync, useMarshalling, useCacheLoader, true);
   }

   /**
    * Creates a {@link PojoCache} and registers it under <code>cacheID</code>
    * so that {@link #tearDown()} can stop it.
    *
    * @param cacheID           unique ID of the cache within the current test
    * @param sync              selects <code>META-INF/replSync-service.xml</code>
    *                          when <code>true</code>, otherwise
    *                          <code>META-INF/replAsync-service.xml</code>
    * @param useMarshalling    whether to enable region-based marshalling
    * @param useCacheLoader    whether to configure a file cache loader
    * @param inactiveOnStartup only applied when <code>useMarshalling</code>
    *                          is <code>true</code>
    * @return the newly created cache
    * @throws IllegalStateException if a cache with this ID already exists
    */
   protected PojoCache createCache(String cacheID, boolean sync,
                                   boolean useMarshalling,
                                   boolean useCacheLoader,
                                   boolean inactiveOnStartup)
         throws Exception
   {
      if (caches.get(cacheID) != null)
         throw new IllegalStateException(cacheID + " already created");

      XmlConfigurationParser parser = new XmlConfigurationParser();
      Configuration c = parser.parseFile(sync ? "META-INF/replSync-service.xml" : "META-INF/replAsync-service.xml");
      c.setClusterName("StateTransferTestBase");
      c.setReplVersionString(getReplicationVersion());
      c.setInitialStateRetrievalTimeout(60000);
      if (useMarshalling)
      {
         c.setUseRegionBasedMarshalling(true);
         c.setInactiveOnStartup(inactiveOnStartup);
      }
      if (useCacheLoader)
      {
         configureCacheLoader(c, cacheID);
      }

      PojoCache cache = PojoCacheFactory.createCache(c, true);
      caches.put(cacheID, cache);

      return cache;
   }

   /**
    * Points the configuration at a <code>FileCacheLoader</code> whose storage
    * directory is a freshly emptied, per-cache temporary directory.
    */
   protected void configureCacheLoader(Configuration c, String cacheID) throws Exception
   {
      String tmp_location = getTempLocation(cacheID);

      // Start from an empty directory so earlier runs cannot leak state.
      File file = new File(tmp_location);
      cleanFile(file);
      file.mkdir();
      tmp_location = escapeWindowsPath(tmp_location);
      c.setCacheLoaderConfig(getCacheLoaderConfig("org.jboss.cache.loader.FileCacheLoader", tmp_location));
   }

   /**
    * Builds a cache loader configuration from an inline XML snippet.
    *
    * @param cl  fully qualified cache loader class name
    * @param loc value of the loader's <code>location</code> property
    */
   protected CacheLoaderConfig getCacheLoaderConfig(String cl, String loc) throws Exception
   {
      String xml = " <config>\n" +
            " \n" +
            " <passivation>false</passivation>\n" +
            " <preload></preload>\n" +
            "\n" +
            " <cacheloader>\n" +
            " <class>" + cl + "</class>\n" +
            " <properties>\n" +
            " location=" + loc + "\n" +
            " </properties>\n" +
            " <async>false</async>\n" +
            " <fetchPersistentState>true</fetchPersistentState>\n" +
            " <ignoreModifications>false</ignoreModifications>\n" +
            " </cacheloader>\n" +
            " \n" +
            " </config>";
      Element element = XmlHelper.stringToElement(xml);
      return XmlConfigurationParser.parseCacheLoaderConfig(element);
   }

   /**
    * Returns the absolute path of the directory named <code>cacheID</code>
    * below <code>java.io.tmpdir</code> (falling back to <code>c:\tmp</code>
    * when that property is not set).
    */
   protected String getTempLocation(String cacheID)
   {
      String tmp_location = System.getProperty("java.io.tmpdir", "c:\\tmp");
      File file = new File(tmp_location);
      file = new File(file, cacheID);
      return file.getAbsolutePath();
   }

   /**
    * Doubles every backslash in <code>path</code>. The path is returned
    * unchanged on platforms whose file separator is <code>/</code>.
    */
   protected String escapeWindowsPath(String path)
   {
      if ('/' == File.separatorChar)
         return path;

      char[] chars = path.toCharArray();
      StringBuilder sb = new StringBuilder(chars.length * 2);
      for (int i = 0; i < chars.length; i++)
      {
         if (chars[i] == '\\')
            sb.append('\\');
         sb.append(chars[i]);
      }
      return sb.toString();
   }

   /** Returns the replication version string applied to every created cache. */
   protected abstract String getReplicationVersion();

   /**
    * Resets the cache registry and builds the fixture: two addresses and four
    * persons, where Joe and Jane share <code>addr1</code> and Bob and Jill
    * share <code>addr2</code>.
    */
   @Override
   protected void setUp() throws Exception
   {
      super.setUp();

      caches = new HashMap<String, PojoCache>();

      addr1 = new Address();
      addr1.setStreet("101 Oakview Dr");
      addr1.setCity("Anytown");
      addr1.setZip(11111);

      addr2 = new Address();
      addr2.setStreet("222 Happy Dr");
      addr2.setCity("Fremont");
      addr2.setZip(22222);

      joe = new Person();
      joe.setName("Joe");
      joe.setAge(TWENTY);
      joe.setAddress(addr1);
      Set skills = new HashSet();
      skills.add("TENNIS");
      skills.add("CARPENTRY");
      joe.setSkills(skills);

      jane = new Person();
      jane.setName("Jane");
      jane.setAge(TWENTYFIVE);
      jane.setAddress(addr1);
      skills = new HashSet();
      skills.add("JUJITSU");
      skills.add("MACRAME");
      jane.setSkills(skills);

      bob = new Person();
      bob.setName("Bob");
      bob.setAge(FORTY);
      bob.setAddress(addr2);
      skills = new HashSet();
      skills.add("LANGUAGES");
      skills.add("LAWN BOWLING");
      bob.setSkills(skills);

      jill = new Person();
      jill.setName("Jill");
      jill.setAge(TWENTYFIVE);
      jill.setAddress(addr2);
      skills = new HashSet();
      skills.add("FORTRAN");
      skills.add("COBOL");
      // These are Jill's skills; assigning them to jane would overwrite
      // Jane's skill set and leave Jill without any.
      jill.setSkills(skills);
   }

   /**
    * Removes the <code>"/"</code> node through one of the registered caches,
    * then stops every cache and deletes its temporary directory. Stopping and
    * file cleanup run even when the node removal fails.
    */
   @Override
   protected void tearDown() throws Exception
   {
      try
      {
         if (caches != null && !caches.isEmpty())
         {
            String[] cacheIDs = caches.keySet().toArray(new String[caches.size()]);
            try
            {
               PojoCache cache = caches.get(cacheIDs[0]);
               cache.getCache().removeNode(new Fqn("/"));
               Thread.sleep(200);
            }
            finally
            {
               for (int i = 0; i < cacheIDs.length; i++)
               {
                  stopCache(caches.get(cacheIDs[i]));
                  File file = new File(getTempLocation(cacheIDs[i]));
                  cleanFile(file);
               }
            }
         }
      }
      finally
      {
         super.tearDown();
      }
   }

   /**
    * Stops and destroys the given cache; a <code>null</code> argument is
    * ignored. Failures are logged rather than propagated.
    */
   protected void stopCache(PojoCache cache)
   {
      if (cache != null)
      {
         try
         {
            cache.stop();
            cache.destroy();
         }
         catch (Exception e)
         {
            log.error("Exception stopping cache " + e.getMessage(), e);
         }
      }
   }

   /**
    * Recursively deletes <code>file</code>. Anything that cannot be deleted
    * immediately is scheduled for deletion when the JVM exits.
    */
   protected void cleanFile(File file)
   {
      File[] children = file.listFiles();
      if (children != null)
      {
         for (int i = 0; i < children.length; i++)
         {
            cleanFile(children[i]);
         }
      }

      if (file.exists())
         file.delete();
      if (file.exists())
         file.deleteOnExit();
   }

   /**
    * Cache user that activates region <code>/a/b</code> and attaches a single
    * person named after itself below it.
    */
   private class CacheActivator extends CacheUser
   {

      CacheActivator(Semaphore semaphore,
                     String name,
                     boolean sync)
            throws Exception
      {
         super(semaphore, name, sync, false);
      }

      @Override
      void useCache() throws Exception
      {
         cache.getCache().getRegion(Fqn.fromString("/a/b"), true).activate();
         log.info("TEST: " + name + " activated region" + " " + System.currentTimeMillis());
         String childFqn = "/a/b/" + name;

         Person p = new Person();
         p.setName("Person " + name);

         Address addr = new Address();
         addr.setStreet(name + " Test Street");
         addr.setCity(name + ", CA");
         p.setAddress(addr);

         TestingUtil.sleepThread(1);

         cache.attach(childFqn, p);
         log.info("TEST: " + name + " put fqn " + childFqn + " " + System.currentTimeMillis());
      }

      /** Looks up the object attached at <code>fqn</code> in this user's cache. */
      public Object getCacheValue(String fqn) throws CacheException
      {
         return cache.find(fqn);
      }
   }

   /**
    * Cache user that keeps attaching and modifying persons below
    * <code>/&lt;name&gt;</code> until told to pause ({@link #stopPuts()}) or to
    * terminate ({@link #stopThread()}).
    */
   private class CacheStressor extends CacheUser
   {
      private final Random random;
      // Both flags are set by the test thread and polled by the stressor
      // thread, so they must be volatile for the change to become visible.
      private volatile boolean putsStopped = false;
      private volatile boolean stopped = false;

      CacheStressor(Semaphore semaphore,
                    String name,
                    boolean sync)
            throws Exception
      {
         super(semaphore, name, sync, true);

         random = new Random(System.currentTimeMillis() + name.hashCode());
      }

      @Override
      void useCache() throws Exception
      {
         int factor = 0;
         int i = 0;
         String fqn = null;

         Address evenAddress = new Address();
         evenAddress.setStreet("1 Test Street");
         evenAddress.setCity("TestOne, CA");

         Address oddAddress = new Address();
         oddAddress.setStreet("2 Test Street");
         oddAddress.setCity("TestTwo, CA");

         Person[] people = new Person[SUBTREE_SIZE];
         boolean[] loaded = new boolean[SUBTREE_SIZE];
         for (int j = 0; j < SUBTREE_SIZE; j++)
         {
            Person p = new Person();
            p.setName("Person " + j);
            p.setAge(j);
            p.setAddress((j % 2 == 0) ? evenAddress : oddAddress);
            people[j] = p;
         }

         // run() already obtained a permit before calling this method.
         boolean acquired = true;
         try
         {
            while (!stopped)
            {
               if (i > 0)
               {
                  acquired = semaphore.attempt(60000);
                  if (!acquired)
                     throw new Exception(name + " cannot acquire semaphore");
                  log.info("TEST: " + name + " reacquired semaphore");
                  System.out.println("TEST: " + name + " reacquired semaphore");
               }

               int lastIndex = -1;
               int index = -1;
               while (!putsStopped)
               {
                  // Draw until the parity differs from the previous index so
                  // that consecutive writes alternate between the two
                  // shared addresses.
                  while (index % 2 == lastIndex % 2)
                  {
                     factor = random.nextInt(50);
                     index = factor % SUBTREE_SIZE;
                  }

                  lastIndex = index;

                  TestingUtil.sleepThread(factor);

                  fqn = "/" + name + "/" + String.valueOf(index);

                  if (!loaded[index])
                  {
                     cache.attach(fqn, people[index]);
                     loaded[index] = true;
                     log.info("TEST: " + name + " put Person at " + fqn);
                  }
                  else if (i % 2 == 0)
                  {
                     int newAge = factor / SUBTREE_SIZE;
                     people[index].setAge(newAge);
                  }
                  else
                  {
                     people[index].getAddress().setStreet(factor + " Test Street");
                  }

                  i++;
               }

               log.info("TEST: " + name + ": last put [#" + i + "] -- " + fqn + " = " + (factor / SUBTREE_SIZE));

               semaphore.release();
               acquired = false;

               // Idle until puts are restarted or the thread is stopped.
               while (!stopped && putsStopped)
                  TestingUtil.sleepThread(100);
            }
         }
         finally
         {
            if (acquired)
               semaphore.release();
         }
      }

      /** Asks the stressor to pause its put loop. */
      public void stopPuts()
      {
         putsStopped = true;
         log.info("TEST: " + name + " putsStopped");
      }

      /** Allows the stressor to resume its put loop. */
      public void startPuts()
      {
         putsStopped = false;
      }

      /** Asks the stressor to terminate and interrupts its thread if alive. */
      public void stopThread()
      {
         stopped = true;
         if (thread.isAlive())
            thread.interrupt();
      }
   }

   /**
    * Runnable that owns a cache created through the enclosing test and does
    * its work in {@link #useCache()} once it has obtained a permit from the
    * shared semaphore. Any exception thrown by the work is recorded and can
    * be read through {@link #getException()}.
    */
   private abstract class CacheUser implements Runnable
   {
      protected Semaphore semaphore;
      protected PojoCache cache;
      protected TransactionManager tm;
      protected String name;
      // Written by the worker thread, read by the test thread.
      protected volatile Exception exception;
      protected Thread thread;

      /**
       * @param activateRoot when <code>true</code> the cache is created with
       *                     <code>inactiveOnStartup == false</code>
       * @throws IllegalStateException if the cache has no transaction manager
       */
      CacheUser(Semaphore semaphore,
                String name,
                boolean sync,
                boolean activateRoot)
            throws Exception
      {
         this.cache = createCache(name, sync, true, false, !activateRoot);
         tm = ((CacheSPI) cache.getCache()).getTransactionManager();
         if (tm == null)
            throw new IllegalStateException("TransactionManager required");
         this.semaphore = semaphore;
         this.name = name;

         log.info("TEST: Cache " + name + " started");
         System.out.println("TEST: Cache " + name + " started");
      }

      public void run()
      {
         log.info("TEST: " + name + " started");
         System.out.println("TEST: " + name + " started");

         boolean acquired = false;
         try
         {
            acquired = semaphore.attempt(60000);
            if (!acquired)
               throw new Exception(name + " cannot acquire semaphore");
            log.info("TEST: " + name + " acquired semaphore");
            System.out.println("TEST: " + name + " acquired semaphore");
            useCache();
         }
         catch (Exception e)
         {
            log.error("TEST: " + name + ": " + e.getLocalizedMessage(), e);

            // Keep it so the test thread can assert on it later.
            exception = e;
         }
         finally
         {
            if (acquired)
               semaphore.release();
         }
      }

      /** The actual work, executed on the worker thread while holding a permit. */
      abstract void useCache() throws Exception;

      /** Returns the exception caught by {@link #run()}, or <code>null</code>. */
      public Exception getException()
      {
         return exception;
      }

      public Cache getCache()
      {
         return cache.getCache();
      }

      /** Starts a new thread that executes {@link #run()}. */
      public void start() throws Exception
      {
         thread = new Thread(this);
         thread.start();
      }

      /** Interrupts the worker thread if it was started and is still alive. */
      public void cleanup()
      {
         if (thread != null && thread.isAlive())
            thread.interrupt();
      }
   }
}