1 22 23 package org.jboss.cache.statetransfer; 24 25 import org.jboss.cache.Cache; 26 import org.jboss.cache.CacheSPI; 27 import org.jboss.cache.Fqn; 28 import org.jboss.cache.config.Configuration; 29 import org.jboss.cache.config.Configuration.CacheMode; 30 import org.jboss.cache.factories.DefaultCacheFactory; 31 import org.jboss.cache.factories.UnitTestCacheFactory; 32 import org.jboss.cache.factories.XmlConfigurationParser; 33 import org.jboss.cache.loader.AbstractCacheLoaderTestBase; 34 import org.jboss.cache.loader.CacheLoader; 35 import org.jboss.cache.marshall.SelectedClassnameClassLoader; 36 import org.jboss.cache.misc.TestingUtil; 37 38 import java.io.File ; 39 import java.util.HashMap ; 40 import java.util.Map ; 41 import java.util.Properties ; 42 import java.util.Set ; 43 import java.util.concurrent.Semaphore ; 44 import java.util.concurrent.TimeUnit ; 45 46 52 public abstract class StateTransferTestBase extends AbstractCacheLoaderTestBase 53 { 54 public static final Fqn A_B = Fqn.fromString("/a/b"); 55 public static final Fqn A_C = Fqn.fromString("/a/c"); 56 public static final Fqn A_D = Fqn.fromString("/a/d"); 57 public static final String JOE = "JOE"; 58 public static final String BOB = "BOB"; 59 public static final String JANE = "JANE"; 60 public static final Integer TWENTY = 20; 61 public static final Integer FORTY = 40; 62 63 protected Map caches; 64 private ClassLoader orig_TCL; 65 66 67 protected abstract String getReplicationVersion(); 68 69 protected CacheSPI createCache(String cacheID, 70 boolean sync, 71 boolean useMarshalling, 72 boolean useCacheLoader) 73 throws Exception 74 { 75 return createCache(cacheID, sync, useMarshalling, useCacheLoader, false, true); 76 } 77 78 protected CacheSPI createCache(String cacheID, 79 boolean sync, 80 boolean useMarshalling, 81 boolean useCacheLoader, 82 boolean cacheLoaderAsync, 83 boolean startCache) 84 throws Exception 85 { 86 if (useCacheLoader) 87 { 88 return createCache(cacheID, sync, useMarshalling, "org.jboss.cache.loader.FileCacheLoader", cacheLoaderAsync, startCache); 89 } 90 else 91 { 92 return createCache(cacheID, sync, useMarshalling, null, cacheLoaderAsync, startCache); 93 } 94 } 95 96 protected CacheSPI createCache(String cacheID, boolean sync, boolean useMarshalling, String cacheLoaderClass, 97 boolean cacheLoaderAsync, boolean startCache) throws Exception 98 { 99 if (caches.get(cacheID) != null) 100 throw new IllegalStateException (cacheID + " already created"); 101 102 CacheMode mode = sync ? CacheMode.REPL_SYNC:CacheMode.REPL_ASYNC; 103 Configuration c = UnitTestCacheFactory.createConfiguration(mode); 104 c.setClusterName("VersionedTestBase"); 105 c.setReplVersionString(getReplicationVersion()); 106 c.setInitialStateRetrievalTimeout(60000); 108 if (useMarshalling) 109 { 110 c.setUseRegionBasedMarshalling(true); 111 c.setInactiveOnStartup(true); 112 } 113 if (cacheLoaderClass != null && cacheLoaderClass.length() > 0) 114 { 115 configureCacheLoader(c, cacheLoaderClass, cacheID, useMarshalling, cacheLoaderAsync); 116 } 117 CacheSPI tree = (CacheSPI) DefaultCacheFactory.getInstance().createCache(c, false); 119 120 configureMultiplexer(tree); 121 122 caches.put(cacheID, tree); 125 126 if (startCache) 127 { 128 tree.create(); 129 tree.start(); 130 } 131 132 return tree; 133 } 134 135 142 protected void configureMultiplexer(Cache cache) throws Exception 143 { 144 } 146 147 154 protected void validateMultiplexer(Cache cache) 155 { 156 assertFalse("Cache is not using multiplexer", cache.getConfiguration().isUsingMultiplexer()); 157 } 158 159 protected void startCache(Cache cache) throws Exception 160 { 161 cache.create(); 162 cache.start(); 163 164 validateMultiplexer(cache); 165 } 166 167 protected void configureCacheLoader(Configuration c, 168 String cacheID, 169 boolean useExtended, 170 boolean async) 171 throws Exception 172 { 173 configureCacheLoader(c, "org.jboss.cache.loader.FileCacheLoader", cacheID, useExtended, async); 174 } 175 176 protected void configureCacheLoader(Configuration c, String cacheloaderClass, String cacheID, boolean useExtended, 177 boolean async) throws Exception 178 { 179 if (cacheloaderClass != null) 180 { 181 if (cacheloaderClass.equals("org.jboss.cache.loader.JDBCCacheLoader")) 182 { 183 Properties prop = new Properties (); 184 try 185 { 186 prop.load(this.getClass().getClassLoader().getResourceAsStream("cache-jdbc.properties")); 187 } 188 catch (Exception e) 189 { 190 System.out.println("Error loading jdbc properties "); 191 } 192 String props = "cache.jdbc.driver =" + prop.getProperty("cache.jdbc.driver") + "\n" + "cache.jdbc.url=" 193 + prop.getProperty("cache.jdbc.url") + "\n" + "cache.jdbc.user=" 194 + prop.getProperty("cache.jdbc.user") + "\n" + "cache.jdbc.password=" 195 + prop.getProperty("cache.jdbc.password") + "\n" + "cache.jdbc.node.type=" 196 + prop.getProperty("cache.jdbc.node.type"); 197 198 c.setCacheLoaderConfig(getSingleCacheLoaderConfig("", "org.jboss.cache.loader.JDBCCacheLoader", 199 props, false, true, false)); 200 } 201 else 202 { 203 204 String tmp_location = getTempLocation(cacheID); 205 206 File file = new File (tmp_location); 208 cleanFile(file); 209 210 file.mkdir(); 211 212 tmp_location = escapeWindowsPath(tmp_location); 213 String props = "location = " + tmp_location + "\n"; 214 c.setCacheLoaderConfig(getSingleCacheLoaderConfig("", cacheloaderClass, props, async, true, false)); 215 } 216 } 217 } 218 219 protected void initialStateTferWithLoaderTest(String cacheLoaderClass1, String cacheLoaderClass2, boolean asyncLoader) throws Exception 220 { 221 CacheSPI cache1 = createCache("cache1", false, false, cacheLoaderClass1, false, true); 222 223 cache1.put(A_B, "name", JOE); 224 cache1.put(A_B, "age", TWENTY); 225 cache1.put(A_C, "name", BOB); 226 cache1.put(A_C, "age", FORTY); 227 228 CacheSPI cache2 = createCache("cache2", false, false, cacheLoaderClass2, asyncLoader, false); 229 230 cache2.start(); 231 232 TestingUtil.blockUntilViewsReceived(new CacheSPI[]{cache1, cache2}, 60000); 234 235 if (asyncLoader) 236 TestingUtil.sleepThread((long) 100); 237 238 CacheLoader loader = cache2.getCacheLoaderManager().getCacheLoader(); 239 240 assertEquals("Incorrect loader name for /a/b", JOE, loader.get(A_B).get("name")); 241 assertEquals("Incorrect loader age for /a/b", TWENTY, loader.get(A_B).get("age")); 242 assertEquals("Incorrect loader name for /a/c", BOB, loader.get(A_C).get("name")); 243 assertEquals("Incorrect loader age for /a/c", FORTY, loader.get(A_C).get("age")); 244 245 assertEquals("Incorrect name for /a/b", JOE, cache2.get(A_B, "name")); 246 assertEquals("Incorrect age for /a/b", TWENTY, cache2.get(A_B, "age")); 247 assertEquals("Incorrect name for /a/c", BOB, cache2.get(A_C, "name")); 248 assertEquals("Incorrect age for /a/c", FORTY, cache2.get(A_C, "age")); 249 } 250 251 protected String getTempLocation(String cacheID) 252 { 253 String tmp_location = System.getProperty("java.io.tmpdir", "c:\\tmp"); 254 File file = new File (tmp_location); 255 file = new File (file, cacheID); 256 return file.getAbsolutePath(); 257 } 258 259 protected String escapeWindowsPath(String path) 260 { 261 if ('/' == File.separatorChar) 262 return path; 263 264 char[] chars = path.toCharArray(); 265 StringBuffer sb = new StringBuffer (); 266 for (int i = 0; i < chars.length; i++) 267 { 268 if (chars[i] == '\\') 269 sb.append('\\'); 270 sb.append(chars[i]); 271 } 272 return sb.toString(); 273 } 274 275 protected void setUp() throws Exception 276 { 277 super.setUp(); 278 279 caches = new HashMap (); 280 281 orig_TCL = Thread.currentThread().getContextClassLoader(); 283 } 284 285 protected void tearDown() throws Exception 286 { 287 288 System.out.println("*** in tearDown()"); 289 290 super.tearDown(); 291 292 Thread.currentThread().setContextClassLoader(orig_TCL); 294 295 Set keys = caches.keySet(); 296 String [] cacheIDs = new String [keys.size()]; 297 cacheIDs = (String []) keys.toArray(cacheIDs); 298 for (int i = 0; i < cacheIDs.length; i++) 299 { 300 stopCache((Cache) caches.get(cacheIDs[i])); 301 TestingUtil.sleepThread(1500); 302 File file = new File (getTempLocation(cacheIDs[i])); 303 cleanFile(file); 304 } 305 } 306 307 protected void stopCache(Cache cache) 308 { 309 if (cache != null) 310 { 311 try 312 { 313 cache.stop(); 314 cache.destroy(); 315 } 316 catch (Exception e) 317 { 318 System.out.println("Exception stopping cache " + e.getMessage()); 319 e.printStackTrace(System.out); 320 } 321 } 322 } 323 324 protected void cleanFile(File file) 325 { 326 File [] children = file.listFiles(); 327 if (children != null) 328 { 329 for (int i = 0; i < children.length; i++) 330 { 331 cleanFile(children[i]); 332 } 333 } 334 335 if (file.exists()) 336 file.delete(); 337 if (file.exists()) 338 file.deleteOnExit(); 339 } 340 341 protected ClassLoader getClassLoader() throws Exception 342 { 343 String [] includesClasses = {"org.jboss.cache.marshall.Person", 344 "org.jboss.cache.marshall.Address"}; 345 String [] excludesClasses = {}; 346 ClassLoader cl = Thread.currentThread().getContextClassLoader(); 347 return new SelectedClassnameClassLoader(includesClasses, excludesClasses, cl); 348 } 349 350 protected ClassLoader getNotFoundClassLoader() throws Exception 351 { 352 String [] notFoundClasses = {"org.jboss.cache.marshall.Person", 353 "org.jboss.cache.marshall.Address"}; 354 ClassLoader cl = Thread.currentThread().getContextClassLoader(); 355 return new SelectedClassnameClassLoader(null, null, notFoundClasses, cl); 356 } 357 358 protected abstract class CacheUser implements Runnable 359 { 360 protected Semaphore semaphore; 361 protected CacheSPI cache; 362 protected String name; 363 protected Exception exception; 364 protected Thread thread; 365 366 CacheUser() 367 { 368 } 369 370 CacheUser(Semaphore semaphore, 371 String name, 372 boolean sync, 373 boolean activateRoot) 374 throws Exception 375 { 376 this.cache = createCache(name, sync, true, false); 377 this.semaphore = semaphore; 378 this.name = name; 379 380 if (activateRoot) 381 cache.getRegion(Fqn.ROOT, true).activate(); 382 } 383 384 public void run() 385 { 386 boolean acquired = false; 387 try 388 { 389 acquired = semaphore.tryAcquire(60, TimeUnit.SECONDS); 390 if (!acquired) 391 throw new Exception (name + " cannot acquire semaphore"); 392 394 useCache(); 395 396 } 397 catch (Exception e) 398 { 399 System.out.println(name + ": " + e.getLocalizedMessage()); 400 e.printStackTrace(System.out); 401 402 exception = e; 404 } 405 finally 406 { 407 if (acquired) 408 semaphore.release(); 409 } 410 411 } 412 413 abstract void useCache() throws Exception ; 414 415 public Exception getException() 416 { 417 return exception; 418 } 419 420 public CacheSPI getCacheSPI() 421 { 422 return cache; 423 } 424 425 public String getName() 426 { 427 return name; 428 } 429 430 public void start() 431 { 432 thread = new Thread (this); 433 thread.start(); 434 } 435 436 public void cleanup() 437 { 438 if (thread != null && thread.isAlive()) 439 thread.interrupt(); 440 } 441 } 442 } 443 | Popular Tags |