1 5 package com.opensymphony.oscache.base; 6 7 import com.opensymphony.oscache.general.GeneralCacheAdministrator; 8 9 import junit.framework.Test; 10 import junit.framework.TestCase; 11 import junit.framework.TestSuite; 12 13 import org.apache.commons.logging.Log; 14 import org.apache.commons.logging.LogFactory; 15 16 import java.util.BitSet ; 17 import java.util.Properties ; 18 19 26 public class TestConcurrency extends TestCase { 27 private static transient final Log log = LogFactory.getLog(GeneralCacheAdministrator.class); 29 private static GeneralCacheAdministrator admin = null; 31 32 private final String KEY = "key"; 34 private final String VALUE = "This is some content"; 35 private final int ITERATION_COUNT = 5; private final int THREAD_COUNT = 6; private final int UNIQUE_KEYS = 1013; 38 39 44 public TestConcurrency(String str) { 45 super(str); 46 } 47 48 52 public void setUp() { 53 if (admin == null) { 55 Properties config = new Properties (); 56 config.setProperty(AbstractCacheAdministrator.CACHE_CAPACITY_KEY, "70"); 57 config.setProperty(AbstractCacheAdministrator.CACHE_BLOCKING_KEY, "false"); 58 admin = new GeneralCacheAdministrator(); 59 assertNotNull(admin); 60 } 61 } 62 63 68 public static Test suite() { 69 return new TestSuite(TestConcurrency.class); 70 } 71 72 76 public void testNewEntry() { 77 String key = "new"; 78 79 try { 80 admin.getFromCache(key, -1); 81 fail("NeedsRefreshException should have been thrown"); 82 } catch (NeedsRefreshException nre) { 83 GetEntry getEntry = new GetEntry(key, VALUE, -1, false); 85 Thread thread = new Thread (getEntry); 86 thread.start(); 87 getEntry = new GetEntry(key, VALUE, -1, false); 88 thread = new Thread (getEntry); 89 thread.start(); 90 91 try { 95 Thread.sleep(500); 96 } catch (InterruptedException ie) { 97 } 98 99 admin.putInCache(key, VALUE); 101 } 102 } 103 104 108 public void testNewEntryCancel() { 109 String key = "newCancel"; 110 String NEW_VALUE = VALUE + "..."; 111 112 try { 113 admin.getFromCache(key, -1); 114 fail("NeedsRefreshException should have been thrown"); 115 } catch (NeedsRefreshException nre) { 116 GetEntry getEntry = new GetEntry(key, NEW_VALUE, -1, true); 118 Thread thread = new Thread (getEntry); 119 thread.start(); 120 121 try { 123 Thread.sleep(500); 124 } catch (InterruptedException ie) { 125 } 126 127 admin.cancelUpdate(key); 130 131 try { 133 Thread.sleep(500); 134 } catch (InterruptedException ie) { 135 } 136 137 try { 138 Object newValue = admin.getFromCache(key, -1); 139 assertEquals(NEW_VALUE, newValue); 140 } catch (NeedsRefreshException e) { 141 admin.cancelUpdate(key); 142 fail("A NeedsRefreshException should not have been thrown"); 143 } 144 } 145 } 146 147 150 public void testPut() { 151 Thread [] thread = new Thread [THREAD_COUNT]; 152 153 for (int idx = 0; idx < THREAD_COUNT; idx++) { 154 OSGeneralTest runner = new OSGeneralTest(); 155 thread[idx] = new Thread (runner); 156 thread[idx].start(); 157 } 158 159 boolean stillAlive; 160 161 do { 162 try { 163 Thread.sleep(100); 164 } catch (InterruptedException e) { 165 } 167 168 stillAlive = false; 169 170 int i = 0; 171 172 while ((i < thread.length) && !stillAlive) { 173 stillAlive |= thread[i++].isAlive(); 174 } 175 } while (stillAlive); 176 } 177 178 182 public void testStaleEntry() { 183 String key = "stale"; 184 assertFalse("The cache should not be in blocking mode for this test.", admin.isBlocking()); 185 186 admin.putInCache(key, VALUE); 187 188 try { 189 admin.getFromCache(key, 0); 192 fail("NeedsRefreshException should have been thrown"); 193 } catch (NeedsRefreshException nre) { 194 GetEntry getEntry = new GetEntry(key, VALUE, 0, false); 198 Thread thread = new Thread (getEntry); 199 thread.start(); 200 201 try { 203 Thread.sleep(200); 204 } catch (InterruptedException ie) { 205 } 206 207 String newValue = "New value"; 210 admin.putInCache(key, newValue); 211 212 getEntry = new GetEntry(key, newValue, -1, false); 213 thread = new Thread (getEntry); 214 thread.start(); 215 216 try { 217 Object fromCache = admin.getFromCache(key, -1); 218 assertEquals(newValue, fromCache); 219 } catch (NeedsRefreshException e) { 220 admin.cancelUpdate(key); 221 fail("Should not have received a NeedsRefreshException"); 222 } 223 224 try { 226 Thread.sleep(200); 227 } catch (InterruptedException ie) { 228 } 229 } 230 } 231 232 235 public void testStaleEntryBlocking() { 236 admin.destroy(); 238 239 Properties p = new Properties (); 240 p.setProperty(AbstractCacheAdministrator.CACHE_BLOCKING_KEY, "true"); 241 admin = new GeneralCacheAdministrator(p); 242 243 assertTrue("The cache should be in blocking mode for this test.", admin.isBlocking()); 244 245 String key = "blocking"; 247 String NEW_VALUE = VALUE + " abc"; 248 admin.putInCache(key, VALUE); 249 250 try { 251 admin.getFromCache(key, 0); 253 fail("NeedsRefreshException should have been thrown"); 254 } catch (NeedsRefreshException nre) { 255 GetEntry getEntry = new GetEntry(key, NEW_VALUE, 0, false); 259 Thread thread = new Thread (getEntry); 260 thread.start(); 261 262 try { 264 Thread.sleep(200); 265 } catch (InterruptedException ie) { 266 } 267 268 admin.putInCache(key, NEW_VALUE); 271 272 getEntry = new GetEntry(key, NEW_VALUE, -1, false); 273 thread = new Thread (getEntry); 274 thread.start(); 275 276 try { 277 Object fromCache = admin.getFromCache(key, -1); 278 assertEquals(NEW_VALUE, fromCache); 279 } catch (NeedsRefreshException e) { 280 admin.cancelUpdate(key); 281 fail("Should not have received a NeedsRefreshException"); 282 } 283 } 284 } 285 286 293 public void testConcurrentStaleGets() { 294 GeneralCacheAdministrator staticAdmin = admin; 295 admin = new GeneralCacheAdministrator(); 297 try { 298 Properties p = new Properties (); 301 p.setProperty(AbstractCacheAdministrator.CACHE_BLOCKING_KEY, "true"); 302 admin = new GeneralCacheAdministrator(p); 303 304 assertTrue("The cache should be in blocking mode for this test.", admin.isBlocking()); 305 306 int nbThreads = 50; 307 int retryByThreads = 10000; 308 309 String key = "new"; 310 311 admin.putInCache(key, VALUE); 313 314 try { 315 admin.getFromCache(key, 0); 317 fail("NeedsRefreshException should have been thrown"); 318 } catch (NeedsRefreshException nre) { 319 admin.cancelUpdate(key); 321 } 322 323 Thread [] spawnedThreads = new Thread [nbThreads]; 325 BitSet successfullThreadTerminations = new BitSet (nbThreads); 327 for (int threadIndex = 0; threadIndex < nbThreads; threadIndex++) { 328 GetStaleEntryAndCancelUpdate getEntry = new GetStaleEntryAndCancelUpdate(key, 0, retryByThreads, threadIndex, successfullThreadTerminations); 329 Thread thread = new Thread (getEntry); 330 spawnedThreads[threadIndex] = thread; 331 thread.start(); 332 } 333 334 int maxWaitingSeconds = 100; 337 int maxWaitForEachThread = 5; 338 long waitStartTime = System.currentTimeMillis(); 339 340 boolean atLeastOneThreadRunning = false; 341 342 while ((System.currentTimeMillis() - waitStartTime) < (maxWaitingSeconds * 1000)) { 343 atLeastOneThreadRunning = false; 344 345 try { 347 Thread.sleep(500); 348 } catch (InterruptedException ie) { 349 } 350 351 for (int threadIndex = 0; threadIndex < nbThreads; 353 threadIndex++) { 354 Thread inspectedThread = spawnedThreads[threadIndex]; 355 356 try { 357 inspectedThread.join(maxWaitForEachThread * 1000); 358 } catch (InterruptedException e) { 359 fail("Thread #" + threadIndex + " was interrupted"); 360 } 361 362 if (inspectedThread.isAlive()) { 363 atLeastOneThreadRunning = true; 364 log.error("Thread #" + threadIndex + " did not complete within [" + ((System.currentTimeMillis() - waitStartTime) / 1000) + "] s "); 365 } 366 } 367 368 if (!atLeastOneThreadRunning) { 369 break; } 371 } 372 373 assertTrue("at least one thread did not complete within [" + ((System.currentTimeMillis() - waitStartTime) / 1000) + "] s ", !atLeastOneThreadRunning); 374 375 for (int threadIndex = 0; threadIndex < nbThreads; threadIndex++) { 376 assertTrue("thread [" + threadIndex + "] did not successfully complete. ", successfullThreadTerminations.get(threadIndex)); 377 } 378 } finally { 379 admin = staticAdmin; 380 381 } 383 } 384 385 private class GetEntry implements Runnable { 386 String key; 387 String value; 388 boolean expectNRE; 389 int time; 390 391 GetEntry(String key, String value, int time, boolean expectNRE) { 392 this.key = key; 393 this.value = value; 394 this.time = time; 395 this.expectNRE = expectNRE; 396 } 397 398 public void run() { 399 try { 400 Object fromCache = admin.getFromCache(key, time); 402 assertEquals(value, fromCache); 403 } catch (NeedsRefreshException nre) { 404 if (!expectNRE) { 405 admin.cancelUpdate(key); 406 fail("Thread should have blocked until a new cache entry was ready"); 407 } else { 408 admin.putInCache(key, value); 410 } 411 } 412 } 413 } 414 415 418 private class GetStaleEntryAndCancelUpdate implements Runnable { 419 String key; 420 int retries; 421 int time; 422 private final BitSet successfullThreadTerminations; 423 private final int threadIndex; 424 425 GetStaleEntryAndCancelUpdate(String key, int time, int retries, int threadIndex, BitSet successfullThreadTerminations) { 426 this.key = key; 427 this.time = time; 428 this.retries = retries; 429 this.threadIndex = threadIndex; 430 this.successfullThreadTerminations = successfullThreadTerminations; 431 } 432 433 public void run() { 434 for (int retryIndex = 0; retryIndex < retries; retryIndex++) { 435 try { 436 Object fromCache = admin.getFromCache(key, time); 438 assertNull("Thread index [" + retryIndex + "] expected stale request [" + retryIndex + "] to be received, got [" + fromCache + "]", fromCache); 439 } catch (NeedsRefreshException nre) { 440 try { 441 admin.cancelUpdate(key); 442 } catch (Throwable t) { 443 log.error("Thread index [" + retryIndex + "]: Unexpectedly caught exception [" + t + "]", t); 444 fail("Thread index [" + retryIndex + "] : Unexpectedly caught exception [" + t + "]"); 445 } 446 } catch (Throwable t) { 447 log.error("Thread index [" + retryIndex + "] : Unexpectedly caught exception [" + t + "]", t); 448 fail("Thread index [" + retryIndex + "] : Unexpectedly caught exception [" + t + "]"); 449 } 450 } 451 452 synchronized (successfullThreadTerminations) { 454 successfullThreadTerminations.set(threadIndex); 455 } 456 } 457 } 458 459 private class OSGeneralTest implements Runnable { 460 public void doit(int i) { 461 int refreshPeriod = 500 ; 462 String key = KEY + (i % UNIQUE_KEYS); 463 admin.putInCache(key, VALUE); 464 465 try { 466 admin.getFromCache(KEY, refreshPeriod); 468 } catch (NeedsRefreshException nre) { 469 admin.putInCache(KEY, VALUE); 472 } 473 474 if ((i % (UNIQUE_KEYS + 1)) == 0) { 476 admin.getCache().flushEntry(key); 477 } 478 } 479 480 public void run() { 481 int start = (int) (Math.random() * UNIQUE_KEYS); 482 System.out.print(start + " "); 483 484 for (int i = start; i < (start + ITERATION_COUNT); i++) { 485 doit(i); 486 } 487 } 488 } 489 } 490 | Popular Tags |