KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > opensymphony > oscache > base > TestConcurrency


1 /*
2  * Copyright (c) 2002-2003 by OpenSymphony
3  * All rights reserved.
4  */

5 package com.opensymphony.oscache.base;
6
7 import com.opensymphony.oscache.general.GeneralCacheAdministrator;
8
9 import junit.framework.Test;
10 import junit.framework.TestCase;
11 import junit.framework.TestSuite;
12
13 import org.apache.commons.logging.Log;
14 import org.apache.commons.logging.LogFactory;
15
16 import java.util.BitSet JavaDoc;
17 import java.util.Properties JavaDoc;
18
19 /**
20  * Test the Cache class for any concurrency problems
21  *
22  * $Id: TestConcurrency.java,v 1.1 2005/06/17 05:06:49 dres Exp $
23  * @version $Revision: 1.1 $
24  * @author <a HREF="mailto:chris@chris.com">Chris Miller</a>
25  */

26 public class TestConcurrency extends TestCase {
27     private static transient final Log log = LogFactory.getLog(GeneralCacheAdministrator.class); //TestConcurrency.class
28

29     // Static variables required thru all the tests
30
private static GeneralCacheAdministrator admin = null;
31
32     // Constants needed in the tests
33
private final String JavaDoc KEY = "key";
34     private final String JavaDoc VALUE = "This is some content";
35     private final int ITERATION_COUNT = 5; //500;
36
private final int THREAD_COUNT = 6; //600;
37
private final int UNIQUE_KEYS = 1013;
38
39     /**
40      * Class constructor.
41      * <p>
42      * @param str The test name (required by JUnit)
43      */

44     public TestConcurrency(String JavaDoc str) {
45         super(str);
46     }
47
48     /**
49      * This method is invoked before each testXXXX methods of the
50      * class. It set ups the variables required for each tests.
51      */

52     public void setUp() {
53         // At first invocation, create a new Cache
54
if (admin == null) {
55             Properties JavaDoc config = new Properties JavaDoc();
56             config.setProperty(AbstractCacheAdministrator.CACHE_CAPACITY_KEY, "70");
57             config.setProperty(AbstractCacheAdministrator.CACHE_BLOCKING_KEY, "false");
58             admin = new GeneralCacheAdministrator();
59             assertNotNull(admin);
60         }
61     }
62
63     /**
64      * This methods returns the name of this test class to JUnit
65      * <p>
66      * @return The name of this class
67      */

68     public static Test suite() {
69         return new TestSuite(TestConcurrency.class);
70     }
71
72     /**
73      * Check that the cache handles simultaneous attempts to access a
74      * new cache entry correctly
75      */

76     public void testNewEntry() {
77         String JavaDoc key = "new";
78
79         try {
80             admin.getFromCache(key, -1);
81             fail("NeedsRefreshException should have been thrown");
82         } catch (NeedsRefreshException nre) {
83             // Fire off another couple of threads to get the same cache entry
84
GetEntry getEntry = new GetEntry(key, VALUE, -1, false);
85             Thread JavaDoc thread = new Thread JavaDoc(getEntry);
86             thread.start();
87             getEntry = new GetEntry(key, VALUE, -1, false);
88             thread = new Thread JavaDoc(getEntry);
89             thread.start();
90
91             // OK, those threads should now be blocked waiting for the new cache
92
// entry to appear. Sleep for a bit to simulate the time taken to
93
// build the cache entry
94
try {
95                 Thread.sleep(500);
96             } catch (InterruptedException JavaDoc ie) {
97             }
98
99             // Putting the entry in the cache should unblock the previous threads
100
admin.putInCache(key, VALUE);
101         }
102     }
103
104     /**
105      * Check that the cache handles simultaneous attempts to access a
106      * new cache entry correctly
107      */

108     public void testNewEntryCancel() {
109         String JavaDoc key = "newCancel";
110         String JavaDoc NEW_VALUE = VALUE + "...";
111
112         try {
113             admin.getFromCache(key, -1);
114             fail("NeedsRefreshException should have been thrown");
115         } catch (NeedsRefreshException nre) {
116             // Fire off another thread to get the same cache entry
117
GetEntry getEntry = new GetEntry(key, NEW_VALUE, -1, true);
118             Thread JavaDoc thread = new Thread JavaDoc(getEntry);
119             thread.start();
120
121             // The above thread will be blocked waiting for the new content
122
try {
123                 Thread.sleep(500);
124             } catch (InterruptedException JavaDoc ie) {
125             }
126
127             // Now cancel the update (eg because an exception occurred while building the content).
128
// This will unblock the other thread and it will receive a NeedsRefreshException.
129
admin.cancelUpdate(key);
130
131             // Wait a bit for the other thread to update the cache
132
try {
133                 Thread.sleep(500);
134             } catch (InterruptedException JavaDoc ie) {
135             }
136
137             try {
138                 Object JavaDoc newValue = admin.getFromCache(key, -1);
139                 assertEquals(NEW_VALUE, newValue);
140             } catch (NeedsRefreshException e) {
141                 admin.cancelUpdate(key);
142                 fail("A NeedsRefreshException should not have been thrown");
143             }
144         }
145     }
146
147     /**
148      * Verify that we can concurrently access the cache without problems
149      */

150     public void testPut() {
151         Thread JavaDoc[] thread = new Thread JavaDoc[THREAD_COUNT];
152
153         for (int idx = 0; idx < THREAD_COUNT; idx++) {
154             OSGeneralTest runner = new OSGeneralTest();
155             thread[idx] = new Thread JavaDoc(runner);
156             thread[idx].start();
157         }
158
159         boolean stillAlive;
160
161         do {
162             try {
163                 Thread.sleep(100);
164             } catch (InterruptedException JavaDoc e) {
165                 // do nothing
166
}
167
168             stillAlive = false;
169
170             int i = 0;
171
172             while ((i < thread.length) && !stillAlive) {
173                 stillAlive |= thread[i++].isAlive();
174             }
175         } while (stillAlive);
176     }
177
178     /**
179      * Check that the cache handles simultaneous attempts to access a
180      * stale cache entry correctly
181      */

182     public void testStaleEntry() {
183         String JavaDoc key = "stale";
184         assertFalse("The cache should not be in blocking mode for this test.", admin.isBlocking());
185
186         admin.putInCache(key, VALUE);
187
188         try {
189             // This should throw a NeedsRefreshException since the refresh
190
// period is 0
191
admin.getFromCache(key, 0);
192             fail("NeedsRefreshException should have been thrown");
193         } catch (NeedsRefreshException nre) {
194             // Fire off another thread to get the same cache entry.
195
// Since blocking mode is currently disabled we should
196
// immediately get back the stale entry
197
GetEntry getEntry = new GetEntry(key, VALUE, 0, false);
198             Thread JavaDoc thread = new Thread JavaDoc(getEntry);
199             thread.start();
200
201             // Sleep for a bit to simulate the time taken to build the cache entry
202
try {
203                 Thread.sleep(200);
204             } catch (InterruptedException JavaDoc ie) {
205             }
206
207             // Putting the entry in the cache should mean that threads now retrieve
208
// the updated entry
209
String JavaDoc newValue = "New value";
210             admin.putInCache(key, newValue);
211
212             getEntry = new GetEntry(key, newValue, -1, false);
213             thread = new Thread JavaDoc(getEntry);
214             thread.start();
215
216             try {
217                 Object JavaDoc fromCache = admin.getFromCache(key, -1);
218                 assertEquals(newValue, fromCache);
219             } catch (NeedsRefreshException e) {
220                 admin.cancelUpdate(key);
221                 fail("Should not have received a NeedsRefreshException");
222             }
223
224             // Give the GetEntry thread a chance to finish
225
try {
226                 Thread.sleep(200);
227             } catch (InterruptedException JavaDoc ie) {
228             }
229         }
230     }
231
232     /**
233      * A test for the updating of a stale entry when CACHE.BLOCKING = TRUE
234      */

235     public void testStaleEntryBlocking() {
236         // A test for the case where oscache.blocking = true
237
admin.destroy();
238
239         Properties JavaDoc p = new Properties JavaDoc();
240         p.setProperty(AbstractCacheAdministrator.CACHE_BLOCKING_KEY, "true");
241         admin = new GeneralCacheAdministrator(p);
242
243         assertTrue("The cache should be in blocking mode for this test.", admin.isBlocking());
244
245         // Use a unique key in case these test entries are being persisted
246
String JavaDoc key = "blocking";
247         String JavaDoc NEW_VALUE = VALUE + " abc";
248         admin.putInCache(key, VALUE);
249
250         try {
251             // Force a NeedsRefreshException
252
admin.getFromCache(key, 0);
253             fail("NeedsRefreshException should have been thrown");
254         } catch (NeedsRefreshException nre) {
255             // Fire off another thread to get the same cache entry.
256
// Since blocking mode is enabled this thread should block
257
// until the entry has been updated.
258
GetEntry getEntry = new GetEntry(key, NEW_VALUE, 0, false);
259             Thread JavaDoc thread = new Thread JavaDoc(getEntry);
260             thread.start();
261
262             // Sleep for a bit to simulate the time taken to build the cache entry
263
try {
264                 Thread.sleep(200);
265             } catch (InterruptedException JavaDoc ie) {
266             }
267
268             // Putting the entry in the cache should mean that threads now retrieve
269
// the updated entry
270
admin.putInCache(key, NEW_VALUE);
271
272             getEntry = new GetEntry(key, NEW_VALUE, -1, false);
273             thread = new Thread JavaDoc(getEntry);
274             thread.start();
275
276             try {
277                 Object JavaDoc fromCache = admin.getFromCache(key, -1);
278                 assertEquals(NEW_VALUE, fromCache);
279             } catch (NeedsRefreshException e) {
280                 admin.cancelUpdate(key);
281                 fail("Should not have received a NeedsRefreshException");
282             }
283         }
284     }
285
286     /**
287      * Checks whether the cache handles simultaneous attempts to access a
288      * stable cache entry correctly when the blocking mode is enabled.
289      *
290      * Basically N threads are concurrently trying to access a same stale cache entry and each is cancelling its update. Each thread repeat this operation M times.
291      * The test is sucessfull if after some time, all threads are properly released
292      */

293     public void testConcurrentStaleGets() {
294         GeneralCacheAdministrator staticAdmin = admin;
295         admin = new GeneralCacheAdministrator(); //avoid poluting other test cases
296

297         try {
298             // A test for the case where oscache.blocking = true
299
//admin.destroy();
300
Properties JavaDoc p = new Properties JavaDoc();
301             p.setProperty(AbstractCacheAdministrator.CACHE_BLOCKING_KEY, "true");
302             admin = new GeneralCacheAdministrator(p);
303
304             assertTrue("The cache should be in blocking mode for this test.", admin.isBlocking());
305
306             int nbThreads = 50;
307             int retryByThreads = 10000;
308
309             String JavaDoc key = "new";
310
311             //First put a value
312
admin.putInCache(key, VALUE);
313
314             try {
315                 //Then test without concurrency that it is reported as stale when time-to-live is zero
316
admin.getFromCache(key, 0);
317                 fail("NeedsRefreshException should have been thrown");
318             } catch (NeedsRefreshException nre) {
319                 //Ok this is was is excpected, we can release the update
320
admin.cancelUpdate(key);
321             }
322
323             //Then ask N threads to concurrently try to access this stale resource and each should receive a NeedsRefreshException, and cancel the update
324
Thread JavaDoc[] spawnedThreads = new Thread JavaDoc[nbThreads];
325             BitSet JavaDoc successfullThreadTerminations = new BitSet JavaDoc(nbThreads); //Track which thread successfully terminated
326

327             for (int threadIndex = 0; threadIndex < nbThreads; threadIndex++) {
328                 GetStaleEntryAndCancelUpdate getEntry = new GetStaleEntryAndCancelUpdate(key, 0, retryByThreads, threadIndex, successfullThreadTerminations);
329                 Thread JavaDoc thread = new Thread JavaDoc(getEntry);
330                 spawnedThreads[threadIndex] = thread;
331                 thread.start();
332             }
333
334             // OK, those threads should now repeatidely be blocked waiting for the new cache
335
// entry to appear. Wait for all of them to terminate
336
int maxWaitingSeconds = 100;
337             int maxWaitForEachThread = 5;
338             long waitStartTime = System.currentTimeMillis();
339
340             boolean atLeastOneThreadRunning = false;
341
342             while ((System.currentTimeMillis() - waitStartTime) < (maxWaitingSeconds * 1000)) {
343                 atLeastOneThreadRunning = false;
344
345                 //Wait a bit between each step to avoid consumming all CPU and preventing other threads from running.
346
try {
347                     Thread.sleep(500);
348                 } catch (InterruptedException JavaDoc ie) {
349                 }
350
351                 //check whether all threads are done.
352
for (int threadIndex = 0; threadIndex < nbThreads;
353                         threadIndex++) {
354                     Thread JavaDoc inspectedThread = spawnedThreads[threadIndex];
355
356                     try {
357                         inspectedThread.join(maxWaitForEachThread * 1000);
358                     } catch (InterruptedException JavaDoc e) {
359                         fail("Thread #" + threadIndex + " was interrupted");
360                     }
361
362                     if (inspectedThread.isAlive()) {
363                         atLeastOneThreadRunning = true;
364                         log.error("Thread #" + threadIndex + " did not complete within [" + ((System.currentTimeMillis() - waitStartTime) / 1000) + "] s ");
365                     }
366                 }
367
368                 if (!atLeastOneThreadRunning) {
369                     break; //while loop, test success.
370
}
371             }
372
373             assertTrue("at least one thread did not complete within [" + ((System.currentTimeMillis() - waitStartTime) / 1000) + "] s ", !atLeastOneThreadRunning);
374
375             for (int threadIndex = 0; threadIndex < nbThreads; threadIndex++) {
376                 assertTrue("thread [" + threadIndex + "] did not successfully complete. ", successfullThreadTerminations.get(threadIndex));
377             }
378         } finally {
379             admin = staticAdmin;
380
381             //Avoid po
382
}
383     }
384
385     private class GetEntry implements Runnable JavaDoc {
386         String JavaDoc key;
387         String JavaDoc value;
388         boolean expectNRE;
389         int time;
390
391         GetEntry(String JavaDoc key, String JavaDoc value, int time, boolean expectNRE) {
392             this.key = key;
393             this.value = value;
394             this.time = time;
395             this.expectNRE = expectNRE;
396         }
397
398         public void run() {
399             try {
400                 // Get from the cache
401
Object JavaDoc fromCache = admin.getFromCache(key, time);
402                 assertEquals(value, fromCache);
403             } catch (NeedsRefreshException nre) {
404                 if (!expectNRE) {
405                     admin.cancelUpdate(key);
406                     fail("Thread should have blocked until a new cache entry was ready");
407                 } else {
408                     // Put a new piece of content into the cache
409
admin.putInCache(key, value);
410                 }
411             }
412         }
413     }
414
415     /**
416       * Basically requests a stale entry, expects to receive a NeedsRefreshException, and always cancels the update.
417       */

418     private class GetStaleEntryAndCancelUpdate implements Runnable JavaDoc {
419         String JavaDoc key;
420         int retries;
421         int time;
422         private final BitSet JavaDoc successfullThreadTerminations;
423         private final int threadIndex;
424
425         GetStaleEntryAndCancelUpdate(String JavaDoc key, int time, int retries, int threadIndex, BitSet JavaDoc successfullThreadTerminations) {
426             this.key = key;
427             this.time = time;
428             this.retries = retries;
429             this.threadIndex = threadIndex;
430             this.successfullThreadTerminations = successfullThreadTerminations;
431         }
432
433         public void run() {
434             for (int retryIndex = 0; retryIndex < retries; retryIndex++) {
435                 try {
436                     // Get from the cache
437
Object JavaDoc fromCache = admin.getFromCache(key, time);
438                     assertNull("Thread index [" + retryIndex + "] expected stale request [" + retryIndex + "] to be received, got [" + fromCache + "]", fromCache);
439                 } catch (NeedsRefreshException nre) {
440                     try {
441                         admin.cancelUpdate(key);
442                     } catch (Throwable JavaDoc t) {
443                         log.error("Thread index [" + retryIndex + "]: Unexpectedly caught exception [" + t + "]", t);
444                         fail("Thread index [" + retryIndex + "] : Unexpectedly caught exception [" + t + "]");
445                     }
446                 } catch (Throwable JavaDoc t) {
447                     log.error("Thread index [" + retryIndex + "] : Unexpectedly caught exception [" + t + "]", t);
448                     fail("Thread index [" + retryIndex + "] : Unexpectedly caught exception [" + t + "]");
449                 }
450             }
451
452             //Once we successfully terminate, we update the corresponding bit to let the Junit know we succeeded.
453
synchronized (successfullThreadTerminations) {
454                 successfullThreadTerminations.set(threadIndex);
455             }
456         }
457     }
458
459     private class OSGeneralTest implements Runnable JavaDoc {
460         public void doit(int i) {
461             int refreshPeriod = 500 /*millis*/;
462             String JavaDoc key = KEY + (i % UNIQUE_KEYS);
463             admin.putInCache(key, VALUE);
464
465             try {
466                 // Get from the cache
467
admin.getFromCache(KEY, refreshPeriod);
468             } catch (NeedsRefreshException nre) {
469                 // Get the value
470
// Store in the cache
471
admin.putInCache(KEY, VALUE);
472             }
473
474             // Flush occasionally
475
if ((i % (UNIQUE_KEYS + 1)) == 0) {
476                 admin.getCache().flushEntry(key);
477             }
478         }
479
480         public void run() {
481             int start = (int) (Math.random() * UNIQUE_KEYS);
482             System.out.print(start + " ");
483
484             for (int i = start; i < (start + ITERATION_COUNT); i++) {
485                 doit(i);
486             }
487         }
488     }
489 }
490
Popular Tags