KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jboss > cache > statetransfer > VersionedTestBase


1 /*
2  * JBoss, the OpenSource J2EE webOS
3  *
4  * Distributable under LGPL license.
5  * See terms of license at gnu.org.
6  */

7
8 package org.jboss.cache.statetransfer;
9
10 import org.jboss.cache.Cache;
11 import org.jboss.cache.CacheException;
12 import org.jboss.cache.CacheSPI;
13 import org.jboss.cache.Fqn;
14 import org.jboss.cache.Node;
15 import org.jboss.cache.Region;
16 import org.jboss.cache.config.Configuration;
17 import org.jboss.cache.config.Configuration.CacheMode;
18 import org.jboss.cache.factories.DefaultCacheFactory;
19 import org.jboss.cache.factories.UnitTestCacheFactory;
20 import org.jboss.cache.factories.XmlConfigurationParser;
21 import org.jboss.cache.loader.CacheLoader;
22 import org.jboss.cache.marshall.InactiveRegionException;
23 import org.jboss.cache.misc.TestingUtil;
24
25 import java.lang.reflect.Method JavaDoc;
26 import java.util.Random JavaDoc;
27 import java.util.Set JavaDoc;
28 import java.util.concurrent.Semaphore JavaDoc;
29 import java.util.concurrent.TimeUnit JavaDoc;
30
31 /**
32  * Abstract superclass of "StateTransferVersion"-specific tests
33  * of CacheSPI's state transfer capability.
34  * <p/>
35  * TODO add tests with classloader regions
36  *
37  * @author <a HREF="mailto://brian.stansberry@jboss.com">Brian Stansberry</a>
38  * @version $Id$
39  */

40 public abstract class VersionedTestBase extends StateTransferTestBase
41 {
42    private static final int SUBTREE_SIZE = 10;
43
44    public static final Fqn A = Fqn.fromString("/a");
45    public static final Fqn B = Fqn.fromString("/b");
46    public static final Fqn C = Fqn.fromString("/c");
47
48    protected static final String JavaDoc ADDRESS_CLASSNAME = "org.jboss.cache.marshall.data.Address";
49    protected static final String JavaDoc PERSON_CLASSNAME = "org.jboss.cache.marshall.data.Person";
50
51    public static final Fqn A_B = Fqn.fromString("/a/b");
52    public static final Fqn A_C = Fqn.fromString("/a/c");
53    public static final Fqn A_D = Fqn.fromString("/a/d");
54
55    public void testInitialStateTransfer() throws Exception JavaDoc
56    {
57       CacheSPI cache1 = createCache("cache1", false, false, false);
58
59       cache1.put(A_B, "name", JOE);
60       cache1.put(A_B, "age", TWENTY);
61       cache1.put(A_C, "name", BOB);
62       cache1.put(A_C, "age", FORTY);
63
64       CacheSPI cache2 = createCache("cache2", false, false, false);
65
66       // Pause to give caches time to see each other
67
TestingUtil.blockUntilViewsReceived(new CacheSPI[]{cache1, cache2}, 60000);
68
69       assertEquals("Incorrect name for /a/b", JOE, cache2.get(A_B, "name"));
70       assertEquals("Incorrect age for /a/b", TWENTY, cache2.get(A_B, "age"));
71       assertEquals("Incorrect name for /a/c", BOB, cache2.get(A_C, "name"));
72       assertEquals("Incorrect age for /a/c", FORTY, cache2.get(A_C, "age"));
73    }
74
75    public void testInitialStateTferWithLoader() throws Exception JavaDoc
76    {
77       initialStateTferWithLoaderTest(false);
78    }
79
80    public void testInitialStateTferWithAsyncLoader() throws Exception JavaDoc
81    {
82       initialStateTferWithLoaderTest(true);
83    }
84
85    protected void initialStateTferWithLoaderTest(boolean asyncLoader) throws Exception JavaDoc
86    {
87       initialStateTferWithLoaderTest("org.jboss.cache.loader.FileCacheLoader",
88               "org.jboss.cache.loader.FileCacheLoader", asyncLoader);
89    }
90
91    public void testPartialStateTransfer() throws Exception JavaDoc
92    {
93       CacheSPI cache1 = createCache("cache1", false, true, false);
94
95       cache1.getRegion(A, true).activate();
96
97       cache1.put(A_B, "name", JOE);
98       cache1.put(A_B, "age", TWENTY);
99       cache1.put(A_C, "name", BOB);
100       cache1.put(A_C, "age", FORTY);
101
102       CacheSPI cache2 = createCache("cache2", false, true, false);
103
104       // Pause to give caches time to see each other
105
TestingUtil.blockUntilViewsReceived(new CacheSPI[]{cache1, cache2}, 60000);
106
107       assertNull("/a/b name transferred against policy", cache2.get(A_B, "name"));
108       assertNull("/a/b age transferred against policy", cache2.get(A_B, "age"));
109       assertNull("/a/c name transferred against policy", cache2.get(A_C, "name"));
110       assertNull("/a/c age transferred against policy", cache2.get(A_C, "age"));
111
112       cache2.getRegion(A_B, true).activate();
113       System.out.println("Region A_B on cache2: " + cache2.getRegion(A_B, false));
114
115       assertEquals("Incorrect name for /a/b", JOE, cache2.get(A_B, "name"));
116       assertEquals("Incorrect age for /a/b", TWENTY, cache2.get(A_B, "age"));
117       assertNull("/a/c name transferred against policy", cache2.get(A_C, "name"));
118       assertNull("/a/c age transferred against policy", cache2.get(A_C, "age"));
119
120       cache1.put(A_D, "name", JANE);
121
122       assertNull("/a/d name transferred against policy", cache2.get(A_D, "name"));
123
124       cache2.getRegion(A_C, true).activate();
125
126       assertEquals("Incorrect name for /a/b", JOE, cache2.get(A_B, "name"));
127       assertEquals("Incorrect age for /a/b", TWENTY, cache2.get(A_B, "age"));
128       assertEquals("Incorrect name for /a/c", BOB, cache2.get(A_C, "name"));
129       assertEquals("Incorrect age for /a/c", FORTY, cache2.get(A_C, "age"));
130       assertNull("/a/d name transferred against policy", cache2.get(A_D, "name"));
131
132       cache2.getRegion(A_D, true).activate();
133
134       assertEquals("Incorrect name for /a/b", JOE, cache2.get(A_B, "name"));
135       assertEquals("Incorrect age for /a/b", TWENTY, cache2.get(A_B, "age"));
136       assertEquals("Incorrect name for /a/c", BOB, cache2.get(A_C, "name"));
137       assertEquals("Incorrect age for /a/c", FORTY, cache2.get(A_C, "age"));
138       assertEquals("Incorrect name for /a/d", JANE, cache2.get(A_D, "name"));
139
140       cache1.getRegion(A, true).deactivate();
141
142       cache1.getRegion(A_B, true).activate();
143       cache1.getRegion(A_C, true).activate();
144       cache1.getRegion(A_D, true).activate();
145
146       assertEquals("Incorrect name for /a/b", JOE, cache1.get(A_B, "name"));
147       assertEquals("Incorrect age for /a/b", TWENTY, cache1.get(A_B, "age"));
148       assertEquals("Incorrect name for /a/c", BOB, cache1.get(A_C, "name"));
149       assertEquals("Incorrect age for /a/c", FORTY, cache1.get(A_C, "age"));
150       assertEquals("Incorrect name for /a/d", JANE, cache1.get(A_D, "name"));
151
152    }
153
154    public void testPartialStateTferWithLoader() throws Exception JavaDoc
155    {
156       CacheSPI cache1 = createCache("cache1", false, true, true);
157
158       cache1.getRegion(A, true).activate();
159
160       cache1.put(A_B, "name", JOE);
161       cache1.put(A_B, "age", TWENTY);
162       cache1.put(A_C, "name", BOB);
163       cache1.put(A_C, "age", FORTY);
164
165       CacheSPI cache2 = createCache("cache2", false, true, true);
166
167       // Pause to give caches time to see each other
168
TestingUtil.blockUntilViewsReceived(new CacheSPI[]{cache1, cache2}, 60000);
169
170       CacheLoader loader = cache2.getCacheLoaderManager().getCacheLoader();
171
172       assertNull("/a/b transferred to loader against policy", loader.get(A_B));
173
174       assertNull("/a/b name transferred against policy", cache2.get(A_B, "name"));
175       assertNull("/a/b age transferred against policy", cache2.get(A_B, "age"));
176       assertNull("/a/c name transferred against policy", cache2.get(A_C, "name"));
177       assertNull("/a/c age transferred against policy", cache2.get(A_C, "age"));
178
179       cache2.getRegion(A_B, true).activate();
180
181       assertEquals("Incorrect name from loader for /a/b", JOE, loader.get(A_B).get("name"));
182       assertEquals("Incorrect age from loader for /a/b", TWENTY, loader.get(A_B).get("age"));
183       assertNull("/a/c transferred to loader against policy", loader.get(A_C));
184
185       assertEquals("Incorrect name for /a/b", JOE, cache2.get(A_B, "name"));
186       assertEquals("Incorrect age for /a/b", TWENTY, cache2.get(A_B, "age"));
187       assertNull("/a/c name transferred against policy", cache2.get(A_C, "name"));
188       assertNull("/a/c age transferred against policy", cache2.get(A_C, "age"));
189
190       cache1.put(A_D, "name", JANE);
191
192       assertNull("/a/d name transferred against policy", cache2.get(A_D, "name"));
193
194       cache2.getRegion(A_C, true).activate();
195
196       assertEquals("Incorrect name from loader for /a/b", JOE, loader.get(A_B).get("name"));
197       assertEquals("Incorrect age from loader for /a/b", TWENTY, loader.get(A_B).get("age"));
198       assertEquals("Incorrect name from loader for /a/c", BOB, loader.get(A_C).get("name"));
199       assertEquals("Incorrect age from loader for /a/c", FORTY, loader.get(A_C).get("age"));
200
201       assertEquals("Incorrect name for /a/b", JOE, cache2.get(A_B, "name"));
202       assertEquals("Incorrect age for /a/b", TWENTY, cache2.get(A_B, "age"));
203       assertEquals("Incorrect name for /a/c", BOB, cache2.get(A_C, "name"));
204       assertEquals("Incorrect age for /a/c", FORTY, cache2.get(A_C, "age"));
205       assertNull("/a/d name transferred against policy", cache2.get(A_D, "name"));
206
207       cache2.getRegion(A_D, true).activate();
208
209       assertEquals("Incorrect name from loader for /a/b", JOE, loader.get(A_B).get("name"));
210       assertEquals("Incorrect age from loader for /a/b", TWENTY, loader.get(A_B).get("age"));
211       assertEquals("Incorrect name from loader for /a/c", BOB, loader.get(A_C).get("name"));
212       assertEquals("Incorrect age from loader for /a/c", FORTY, loader.get(A_C).get("age"));
213       assertEquals("Incorrect name from loader for /a/d", JANE, loader.get(A_D).get("name"));
214
215       assertEquals("Incorrect name for /a/b", JOE, cache2.get(A_B, "name"));
216       assertEquals("Incorrect age for /a/b", TWENTY, cache2.get(A_B, "age"));
217       assertEquals("Incorrect name for /a/c", BOB, cache2.get(A_C, "name"));
218       assertEquals("Incorrect age for /a/c", FORTY, cache2.get(A_C, "age"));
219       assertEquals("Incorrect name for /a/d", JANE, cache2.get(A_D, "name"));
220
221       cache1.getRegion(A, true).deactivate();
222
223       cache1.getRegion(A_B, true).activate();
224       cache1.getRegion(A_C, true).activate();
225       cache1.getRegion(A_D, true).activate();
226
227       loader = cache1.getCacheLoaderManager().getCacheLoader();
228
229       assertEquals("Incorrect name from loader for /a/b", JOE, loader.get(A_B).get("name"));
230       assertEquals("Incorrect age from loader for /a/b", TWENTY, loader.get(A_B).get("age"));
231       assertEquals("Incorrect name from loader for /a/c", BOB, loader.get(A_C).get("name"));
232       assertEquals("Incorrect age from loader for /a/c", FORTY, loader.get(A_C).get("age"));
233       assertEquals("Incorrect name from loader for /a/d", JANE, loader.get(A_D).get("name"));
234
235       assertEquals("Incorrect name for /a/b", JOE, cache1.get(A_B, "name"));
236       assertEquals("Incorrect age for /a/b", TWENTY, cache1.get(A_B, "age"));
237       assertEquals("Incorrect name for /a/c", BOB, cache1.get(A_C, "name"));
238       assertEquals("Incorrect age for /a/c", FORTY, cache1.get(A_C, "age"));
239       assertEquals("Incorrect name for /a/d", JANE, cache1.get(A_D, "name"));
240    }
241
242    public void testPartialStateTferWithClassLoader() throws Exception JavaDoc
243    {
244       // FIXME: This test is meaningless because MarshalledValueInputStream
245
// will find the classes w/ their own loader if TCL can't. Need
246
// to find a way to test!
247
// But, at least it tests JBCACHE-305 by registering a classloader
248
// both before and after start()
249

250       // Set the TCL to a classloader that can't see Person/Address
251
Thread.currentThread().setContextClassLoader(getNotFoundClassLoader());
252
253       CacheSPI cache1 = createCache("cache1",
254               false, // async
255
true, // use marshaller
256
true, // use cacheloader
257
false, false); // don't start
258
ClassLoader JavaDoc cl1 = getClassLoader();
259       cache1.getRegion(A, true).registerContextClassLoader(cl1);
260       startCache(cache1);
261
262       cache1.getRegion(A, true).activate();
263
264       Object JavaDoc ben = createBen(cl1);
265
266       cache1.put(A_B, "person", ben);
267
268       // For cache 2 we won't register loader until later
269
CacheSPI cache2 = createCache("cache2",
270               false, // async
271
true, // use marshalling
272
true, // use cacheloader
273
false, true); // start
274

275       // Pause to give caches time to see each other
276
TestingUtil.blockUntilViewsReceived(new CacheSPI[]{cache1, cache2}, 60000);
277
278       CacheLoader loader = cache2.getCacheLoaderManager().getCacheLoader();
279
280       assertNull("/a/b not transferred to loader", loader.get(A_B));
281
282       assertNull("/a/b not transferred to cache", cache2.get(A_B, "person"));
283
284       ClassLoader JavaDoc cl2 = getClassLoader();
285
286 // cache2.registerClassLoader(A, cl2);
287
Region r = cache2.getRegion(A, true);
288       r.registerContextClassLoader(cl2);
289
290       r.activate();
291
292       assertEquals("Correct state from loader for /a/b", ben.toString(), loader.get(A_B).get("person").toString());
293
294       assertEquals("Correct state from cache for /a/b", ben.toString(), cache2.get(A_B, "person").toString());
295
296    }
297
298    public void testLoadEntireStateAfterStart() throws Exception JavaDoc
299    {
300       CacheSPI cache1 = createCache("cache1", false, true, true);
301
302       cache1.getRegion(Fqn.ROOT, true).activate();
303
304       cache1.put(A_B, "name", JOE);
305       cache1.put(A_B, "age", TWENTY);
306       cache1.put(A_C, "name", BOB);
307       cache1.put(A_C, "age", FORTY);
308
309       CacheSPI cache2 = createCache("cache2", false, true, true);
310
311       // Pause to give caches time to see each other
312
TestingUtil.blockUntilViewsReceived(new CacheSPI[]{cache1, cache2}, 60000);
313
314       CacheLoader loader = cache2.getCacheLoaderManager().getCacheLoader();
315
316       assertNull("/a/b transferred to loader against policy", loader.get(A_B));
317
318       assertNull("/a/b name transferred against policy", cache2.get(A_B, "name"));
319       assertNull("/a/b age transferred against policy", cache2.get(A_B, "age"));
320       assertNull("/a/c name transferred against policy", cache2.get(A_C, "name"));
321       assertNull("/a/c age transferred against policy", cache2.get(A_C, "age"));
322
323       cache2.getRegion(Fqn.ROOT, true).activate();
324
325       assertEquals("Incorrect name from loader for /a/b", JOE, loader.get(A_B).get("name"));
326       assertEquals("Incorrect age from loader for /a/b", TWENTY, loader.get(A_B).get("age"));
327       assertEquals("Incorrect name from loader for /a/c", BOB, loader.get(A_C).get("name"));
328       assertEquals("Incorrect age from loader for /a/c", FORTY, loader.get(A_C).get("age"));
329
330       assertEquals("Incorrect name for /a/b", JOE, cache2.get(A_B, "name"));
331       assertEquals("Incorrect age for /a/b", TWENTY, cache2.get(A_B, "age"));
332       assertEquals("Incorrect name for /a/c", BOB, cache2.get(A_C, "name"));
333       assertEquals("Incorrect age for /a/c", FORTY, cache2.get(A_C, "age"));
334    }
335
336    /**
337     * Tests concurrent activation of the same subtree by multiple nodes in a
338     * REPL_SYNC environment. The idea is to see what would happen with a
339     * farmed deployment. See <code>concurrentActivationTest</code> for details.
340     *
341     * @throws Exception
342     */

343    public void testConcurrentActivationSync() throws Exception JavaDoc
344    {
345       concurrentActivationTest(true);
346    }
347
348    /**
349     * Tests concurrent activation of the same subtree by multiple nodes in a
350     * REPL_ASYNC environment. The idea is to see what would happen with a
351     * farmed deployment. See <code>concurrentActivationTest</code> for details.
352     *
353     * @throws Exception
354     */

355    public void testConcurrentActivationAsync() throws Exception JavaDoc
356    {
357       concurrentActivationTest(false);
358    }
359
360    /**
361     * Starts 5 caches and then concurrently activates the same region under
362     * all 5, causing each to attempt a partial state transfer from the others.
363     * As soon as each cache has activated its region, it does a put to a node
364     * in the region, thus complicating the lives of the other caches trying
365     * to get partial state.
366     * <p/>
367     * Failure condition is if any node sees an exception or if the final state
368     * of all caches is not consistent.
369     *
370     * @param sync use REPL_SYNC or REPL_ASYNC
371     * @throws Exception
372     */

373    private void concurrentActivationTest(boolean sync) throws Exception JavaDoc
374    {
375       String JavaDoc[] names = {"A", "B", "C", "D", "E"};
376       int count = names.length;
377       CacheActivator[] activators = new CacheActivator[count];
378
379
380       try
381       {
382          // Create a semaphore and take all its tickets
383
Semaphore JavaDoc semaphore = new Semaphore JavaDoc(count);
384          for (int i = 0; i < count; i++)
385          {
386             semaphore.acquire();
387          }
388
389          // Create activation threads that will block on the semaphore
390
CacheSPI[] caches = new CacheSPI[count];
391          for (int i = 0; i < count; i++)
392          {
393             activators[i] = new CacheActivator(semaphore, names[i], sync);
394             caches[i] = activators[i].getCacheSPI();
395             activators[i].start();
396          }
397
398          // Make sure everyone is in sync
399
TestingUtil.blockUntilViewsReceived(caches, 60000);
400
401          // Release the semaphore to allow the threads to start work
402
semaphore.release(count);
403
404          // Sleep to ensure the threads get all the semaphore tickets
405
TestingUtil.sleepThread((long) 1000);
406
407          // Reacquire the semaphore tickets; when we have them all
408
// we know the threads are done
409
for (int i = 0; i < count; i++)
410          {
411             boolean acquired = semaphore.tryAcquire(60, TimeUnit.SECONDS);
412             if (!acquired)
413                fail("failed to acquire semaphore " + i);
414          }
415
416          // Sleep to allow any async calls to clear
417
if (!sync)
418             TestingUtil.sleepThread((long) 500);
419
420          // Ensure the caches held by the activators see all the values
421
for (int i = 0; i < count; i++)
422          {
423             Exception JavaDoc aException = activators[i].getException();
424             boolean gotUnexpectedException = aException != null
425                     && !(aException instanceof InactiveRegionException ||
426                     aException.getCause() instanceof InactiveRegionException);
427             if (gotUnexpectedException)
428             {
429                fail("Activator " + names[i] + " caught an exception " + aException);
430             }
431
432             for (int j = 0; j < count; j++)
433             {
434                Fqn fqn = new Fqn(A_B, names[j]);
435                assertEquals("Incorrect value for " + fqn + " on activator " + names[i],
436                        "VALUE", activators[i].getCacheValue(fqn));
437 // System.out.println(names[i] + ":" + fqn + " = " + activators[i].getCacheValue(fqn));
438
}
439
440          }
441       }
442       catch (Exception JavaDoc ex)
443       {
444          fail(ex.getLocalizedMessage());
445       }
446       finally
447       {
448          for (int i = 0; i < count; i++)
449             activators[i].cleanup();
450       }
451
452    }
453
454    /**
455     * Starts two caches where each cache has N regions. We put some data in each of the regions.
456     * We run two threads where each thread creates a cache then goes into a loop where it
457     * activates the N regions, with a 1 sec pause between activations.
458     * <p/>
459     * Threads are started with 10 sec difference.
460     * <p/>
461     * This test simulates a 10 sec staggered start of 2 servers in a cluster, with each server
462     * then deploying webapps.
463     * <p/>
464     * <p/>
465     * <p/>
466     * Failure condition is if any node sees an exception or if the final state
467     * of all caches is not consistent.
468     *
469     * @param sync use REPL_SYNC or REPL_ASYNC
470     * @throws Exception
471     */

472    private void concurrentActivationTest2(boolean sync) throws Exception JavaDoc
473    {
474       String JavaDoc[] names = {"A", "B"};
475       int count = names.length;
476       int regionsToActivate = 15;
477       int sleepTimeBetweenNodeStarts = 10000;
478       StaggeredWebDeployerActivator[] activators = new StaggeredWebDeployerActivator[count];
479       try
480       {
481          // Create a semaphore and take all its tickets
482
Semaphore JavaDoc semaphore = new Semaphore JavaDoc(count);
483          for (int i = 0; i < count; i++)
484          {
485             semaphore.acquire();
486          }
487
488          // Create activation threads that will block on the semaphore
489
CacheSPI[] caches = new CacheSPI[count];
490          for (int i = 0; i < count; i++)
491          {
492             activators[i] = new StaggeredWebDeployerActivator(semaphore, names[i], sync, regionsToActivate);
493             caches[i] = activators[i].getCacheSPI();
494
495             // Release the semaphore to allow the thread to start working
496
semaphore.release(1);
497
498             activators[i].start();
499             TestingUtil.sleepThread(sleepTimeBetweenNodeStarts);
500          }
501
502          // Make sure everyone is in sync
503
TestingUtil.blockUntilViewsReceived(caches, 60000);
504
505          // Sleep to ensure the threads get all the semaphore tickets
506
TestingUtil.sleepThread(1000);
507
508          // Reacquire the semaphore tickets; when we have them all
509
// we know the threads are done
510
for (int i = 0; i < count; i++)
511          {
512             boolean acquired = semaphore.tryAcquire(60, TimeUnit.SECONDS);
513             if (!acquired)
514                fail("failed to acquire semaphore " + i);
515          }
516
517          // Sleep to allow any async calls to clear
518
if (!sync)
519             TestingUtil.sleepThread(1000);
520
521          // Ensure the caches held by the activators see all the values
522
for (int i = 0; i < count; i++)
523          {
524             Exception JavaDoc aException = activators[i].getException();
525             boolean gotUnexpectedException = aException != null
526                     && !(aException instanceof InactiveRegionException ||
527                     aException.getCause() instanceof InactiveRegionException);
528             if (gotUnexpectedException)
529             {
530                fail("Activator " + names[i] + " caught an exception " + aException);
531             }
532
533             for (int j = 0; j < regionsToActivate; j++)
534             {
535                Fqn fqn = Fqn.fromString("/a/" + i + "/" + names[i]);
536                assertEquals("Incorrect value for " + fqn + " on activator " + names[i],
537                        "VALUE", activators[i].getCacheValue(fqn));
538             }
539          }
540       }
541       catch (Exception JavaDoc ex)
542       {
543          fail(ex.getLocalizedMessage());
544       }
545       finally
546       {
547          for (int i = 0; i < count; i++)
548             activators[i].cleanup();
549       }
550
551    }
552
553    /**
554     * Starts two caches where each cache has N regions. We put some data in each of the regions.
555     * We run two threads where each thread creates a cache then goes into a loop where it
556     * activates the N regions, with a 1 sec pause between activations.
557     * <p/>
558     * Threads are started with 10 sec difference.
559     * <p/>
560     * This test simulates a 10 sec staggered start of 2 servers in a cluster, with each server
561     * then deploying webapps.
562     * <p/>
563     * <p/>
564     * <p/>
565     * Failure condition is if any node sees an exception or if the final state
566     * of all caches is not consistent.
567     */

568    public void testConcurrentStartupActivationAsync() throws Exception JavaDoc
569    {
570       concurrentActivationTest2(false);
571    }
572
573    /**
574     * Starts two caches where each cache has N regions. We put some data in each of the regions.
575     * We run two threads where each thread creates a cache then goes into a loop where it
576     * activates the N regions, with a 1 sec pause between activations.
577     * <p/>
578     * Threads are started with 10 sec difference.
579     * <p/>
580     * This test simulates a 10 sec staggered start of 2 servers in a cluster, with each server
581     * then deploying webapps.
582     * <p/>
583     * <p/>
584     * <p/>
585     * Failure condition is if any node sees an exception or if the final state
586     * of all caches is not consistent.
587     */

588    public void testConcurrentStartupActivationSync() throws Exception JavaDoc
589    {
590       concurrentActivationTest2(true);
591    }
592
593    /**
594     * Tests partial state transfer under heavy concurrent load and REPL_SYNC.
595     * See <code>concurrentUseTest</code> for details.
596     *
597     * @throws Exception
598     */

599    public void testConcurrentUseSync() throws Exception JavaDoc
600    {
601       concurrentUseTest(true);
602    }
603
604    /**
605     * Tests partial state transfer under heavy concurrent load and REPL_ASYNC.
606     * See <code>concurrentUseTest</code> for details.
607     *
608     * @throws Exception
609     */

610    public void testConcurrentUseAsync() throws Exception JavaDoc
611    {
612       concurrentUseTest(false);
613    }
614
615    /**
616     * Initiates 5 caches, 4 with active trees and one with an inactive tree.
617     * Each of the active caches begins rapidly generating puts against nodes
618     * in a subtree for which it is responsible. The 5th cache activates
619     * each subtree, and at the end confirms no node saw any exceptions and
620     * that each node has consistent state.
621     *
622     * @param sync whether to use REPL_SYNC or REPL_ASYNCE
623     * @throws Exception
624     */

625    private void concurrentUseTest(boolean sync) throws Exception JavaDoc
626    {
627       String JavaDoc[] names = {"B", "C", "D", "E"};
628       int count = names.length;
629       CacheStressor[] stressors = new CacheStressor[count];
630
631       try
632       {
633
634          // The first cache we create is inactivated.
635
CacheSPI cacheA = createCache("cacheA", sync, true, false);
636
637          CacheSPI[] caches = new CacheSPI[count + 1];
638          caches[0] = cacheA;
639
640          // Create a semaphore and take all its tickets
641
Semaphore JavaDoc semaphore = new Semaphore JavaDoc(count);
642          for (int i = 0; i < count; i++)
643          {
644             semaphore.acquire();
645          }
646
647          // Create stressor threads that will block on the semaphore
648

649          for (int i = 0; i < count; i++)
650          {
651             stressors[i] = new CacheStressor(semaphore, names[i], sync);
652             caches[i + 1] = stressors[i].getCacheSPI();
653             stressors[i].start();
654          }
655
656          // Make sure everyone's views are in sync
657
TestingUtil.blockUntilViewsReceived(caches, 60000);
658
659          // Repeat the basic test four times
660
//for (int x = 0; x < 4; x++)
661
for (int x = 0; x < 1; x++)
662          {
663             if (x > 0)
664             {
665                // Reset things by inactivating the region
666
// and enabling the stressors
667
for (int i = 0; i < count; i++)
668                {
669                   cacheA.getRegion(Fqn.fromString("/" + names[i]), true).deactivate();
670                   System.out.println("Run " + x + "-- /" + names[i] + " deactivated on A");
671                   stressors[i].startPuts();
672                }
673             }
674
675             // Release the semaphore to allow the threads to start work
676
semaphore.release(count);
677
678             // Sleep to ensure the threads get all the semaphore tickets
679
// and to ensure puts are actively in progress
680
TestingUtil.sleepThread((long) 300);
681
682             // Activate cacheA
683
for (int i = 0; i < count; i++)
684             {
685 // System.out.println("Activating /" + names[i] + " on A");
686
cacheA.getRegion(Fqn.fromString("/" + names[i]), true).activate();
687                // Stop the stressor so we don't pollute cacheA's state
688
// with too many messages sent after activation -- we want
689
// to compare transferred state with the sender
690
stressors[i].stopPuts();
691                System.out.println("Run " + x + "-- /" + names[i] + " activated on A");
692                // Reacquire one semaphore ticket
693
boolean acquired = semaphore.tryAcquire(60, TimeUnit.SECONDS);
694                if (!acquired)
695                   fail("failed to acquire semaphore " + i);
696
697                // Pause to allow other work to proceed
698
TestingUtil.sleepThread((long) 100);
699             }
700
701             // Sleep to allow any in transit msgs to clear
702
// if (!sync)
703
TestingUtil.sleepThread((long) 1000);
704
705             // Ensure the stressors saw no exceptions
706
for (int i = 0; i < count; i++)
707             {
708                if (stressors[i].getException() != null && !(stressors[i].getException() instanceof InactiveRegionException))
709                {
710                   fail("Stressor " + names[i] + " caught an exception " + stressors[i].getException());
711                }
712
713             }
714
715             // Compare cache contents
716
for (int i = 0; i < count; i++)
717             {
718                for (int j = 0; j < SUBTREE_SIZE; j++)
719                {
720                   Fqn fqn = Fqn.fromString("/" + names[i] + "/" + j);
721                   assertEquals("/A/" + j + " matches " + fqn,
722                           cacheA.get(fqn, "KEY"),
723                           stressors[i].getCacheSPI().get(fqn, "KEY"));
724                }
725             }
726          }
727
728          for (int i = 0; i < count; i++)
729             stressors[i].stopThread();
730
731       }
732       finally
733       {
734          for (int i = 0; i < count; i++)
735          {
736             if (stressors[i] != null)
737                stressors[i].cleanup();
738          }
739       }
740
741    }
742
743    /**
744     * Test for JBCACHE-913
745     *
746     * @throws Exception
747     */

748    public void testEvictionSeesStateTransfer() throws Exception JavaDoc
749    {
750       
751       Configuration c = UnitTestCacheFactory.createConfiguration(CacheMode.REPL_SYNC,true);
752       Cache cache1 = DefaultCacheFactory.getInstance().createCache(c, false);
753       cache1.start();
754       caches.put("evict1", cache1);
755       
756       cache1.put(Fqn.fromString("/a/b/c"), "key", "value");
757       
758       c = UnitTestCacheFactory.createConfiguration(CacheMode.REPL_SYNC,true);
759       Cache cache2 = DefaultCacheFactory.getInstance().createCache(c, false);
760       cache2.start();
761       caches.put("evict2", cache2);
762       
763       Region region = cache2.getRegion(Fqn.ROOT, false);
764       // We expect events for /a, /a/b and /a/b/c
765
assertEquals("Saw the expected number of node events", 3, region.nodeEventQueueSize());
766    }
767    
768    /**
769     * Further test for JBCACHE-913
770     *
771     * @throws Exception
772     */

773    public void testEvictionAfterStateTransfer() throws Exception JavaDoc
774    {
775       Configuration c = UnitTestCacheFactory.createConfiguration(CacheMode.REPL_SYNC,true);
776       Cache cache1 = DefaultCacheFactory.getInstance().createCache(c, false);
777       cache1.start();
778       caches.put("evict1", cache1);
779       
780       for (int i = 0; i < 25000; i++)
781       {
782          cache1.put(Fqn.fromString("/base/" + i), "key", "base" + i);
783          if (i < 5)
784              cache1.put(Fqn.fromString("/org/jboss/test/data/" + i), "key", "data" + i);
785       }
786       
787       c = UnitTestCacheFactory.createConfiguration(CacheMode.REPL_SYNC,true);
788       final Cache cache2 = DefaultCacheFactory.getInstance().createCache(c, false);
789       cache2.start();
790       caches.put("evict2", cache2);
791       
792       Node parent = cache2.getRoot().getChild(Fqn.fromString("/org/jboss/test/data"));
793       Set JavaDoc children = parent.getChildren();
794       assertEquals("All data children transferred", 5, children.size());
795       parent = cache2.getRoot().getChild(Fqn.fromString("/base"));
796       children = parent.getChildren();
797       assertTrue("Minimum number of base children transferred", children.size() >= 5000);
798       
799       // Sleep 2.5 secs so the nodes we are about to create in data won't
800
// exceed the 4 sec TTL when eviction thread runs
801
TestingUtil.sleepThread(2500);
802       
803       class Putter extends Thread JavaDoc
804       {
805           Cache cache = null;
806           boolean stopped = false;
807           Exception JavaDoc ex = null;
808           public void run()
809           {
810               int i = 25000;
811               while (!stopped)
812               {
813                   try
814                   {
815                       cache.put(Fqn.fromString("/base/" + i), "key", "base" + i);
816                       cache.put(Fqn.fromString("/org/jboss/test/data/" + i), "key", "data"+i);
817                       i++;
818                   }
819                   catch (Exception JavaDoc e)
820                   {
821                     ex = e;
822                   }
823               }
824           }
825       }
826       Putter p1= new Putter();
827       p1.cache = cache1;
828       p1.start();
829       Putter p2= new Putter();
830       p2.cache = cache2;
831       p2.start();
832       
833       Random JavaDoc rnd = new Random JavaDoc();
834       TestingUtil.sleepThread(rnd.nextInt(200));
835       
836       int maxCountBase = 0;
837       int maxCountData = 0;
838       boolean sawBaseDecrease = false;
839       boolean sawDataDecrease = false;
840       long start = System.currentTimeMillis();
841       while ((System.currentTimeMillis() - start) < 10000)
842       {
843          parent = cache2.getRoot().getChild(Fqn.fromString("/org/jboss/test/data"));
844          children = parent.getChildren();
845          if (children != null)
846          {
847             int dataCount = children.size();
848             if (dataCount < maxCountData)
849             {
850                 System.out.println("data " + dataCount + " < " + maxCountData + " elapsed = " + (System.currentTimeMillis() - start));
851                 sawDataDecrease = true;
852             }
853             else
854             {
855                 maxCountData = dataCount;
856             }
857          }
858          
859          parent = cache2.getRoot().getChild(Fqn.fromString("/base"));
860          children = parent.getChildren();
861          if (children != null)
862          {
863             int baseCount = children.size();
864             if (baseCount < maxCountBase)
865             {
866                 System.out.println("base " + baseCount + " < " + maxCountBase+ " elapsed = " + (System.currentTimeMillis() - start));
867                 sawBaseDecrease = true;
868             }
869             else
870             {
871                 maxCountBase = baseCount;
872             }
873          }
874          
875          if (sawDataDecrease && sawBaseDecrease)
876          {
877              break;
878          }
879          
880          TestingUtil.sleepThread(50);
881       }
882       
883       p1.stopped = true;
884       p2.stopped = true;
885       p1.join(1000);
886       p2.join(1000);
887       
888       assertTrue("Saw data decrease", sawDataDecrease);
889       assertTrue("Saw base decrease", sawBaseDecrease);
890       assertNull("No exceptions in p1", p1.ex);
891       assertNull("No exceptions in p2", p2.ex);
892       
893       // Sleep 5.1 secs so we are sure the eviction thread ran
894
TestingUtil.sleepThread(5100);
895       
896       parent = cache2.getRoot().getChild(Fqn.fromString("/org/jboss/test/data"));
897       children = parent.getChildren();
898       if (children != null)
899       {
900          System.out.println(children.size());
901          assertTrue("Excess children evicted", children.size() <= 5);
902       }
903       parent = cache2.getRoot().getChild(Fqn.fromString("/base"));
904       children = parent.getChildren();
905       if (children != null)
906       {
907          System.out.println(children.size());
908          assertTrue("Excess children evicted", children.size() <= 25000);
909       }
910       
911       // Sleep more to let the eviction thread run again,
912
// which will evict all data nodes due to their ttl of 4 secs
913
TestingUtil.sleepThread(8100);
914       
915       parent = cache2.getRoot().getChild(Fqn.fromString("/org/jboss/test/data"));
916       children = parent.getChildren();
917       if (children != null)
918          assertEquals("All data children evicted", 0, children.size());
919    }
920
921    private Object JavaDoc createBen(ClassLoader JavaDoc loader) throws Exception JavaDoc
922    {
923       Class JavaDoc addrClazz = loader.loadClass(ADDRESS_CLASSNAME);
924       Method JavaDoc setCity = addrClazz.getMethod("setCity", new Class JavaDoc[]{String JavaDoc.class});
925       Method JavaDoc setStreet = addrClazz.getMethod("setStreet", new Class JavaDoc[]{String JavaDoc.class});
926       Method JavaDoc setZip = addrClazz.getMethod("setZip", new Class JavaDoc[]{int.class});
927       Object JavaDoc addr = addrClazz.newInstance();
928       setCity.invoke(addr, new Object JavaDoc[]{"San Jose"});
929       setStreet.invoke(addr, new Object JavaDoc[]{"1007 Home"});
930       setZip.invoke(addr, new Object JavaDoc[]{90210});
931
932       Class JavaDoc benClazz = loader.loadClass(PERSON_CLASSNAME);
933       Method JavaDoc setName = benClazz.getMethod("setName", new Class JavaDoc[]{String JavaDoc.class});
934       Method JavaDoc setAddress = benClazz.getMethod("setAddress", new Class JavaDoc[]{addrClazz});
935       Object JavaDoc ben = benClazz.newInstance();
936       setName.invoke(ben, new Object JavaDoc[]{"Ben"});
937       setAddress.invoke(ben, new Object JavaDoc[]{addr});
938
939       return ben;
940    }
941
942    private class CacheActivator extends CacheUser
943    {
944
945       CacheActivator(Semaphore JavaDoc semaphore,
946                      String JavaDoc name,
947                      boolean sync)
948               throws Exception JavaDoc
949       {
950          super(semaphore, name, sync, false);
951       }
952
953       void useCache() throws Exception JavaDoc
954       {
955          TestingUtil.sleepRandom(5000);
956          cache.getRegion(A_B, true).activate();
957 // System.out.println(name + " activated region" + " " + System.currentTimeMillis());
958
Fqn childFqn = Fqn.fromString("/a/b/" + name);
959
960          cache.put(childFqn, "KEY", "VALUE");
961 // System.out.println(name + " put fqn " + childFqn + " " + System.currentTimeMillis());
962

963       }
964
965       public Object JavaDoc getCacheValue(Fqn fqn) throws CacheException
966       {
967          return cache.get(fqn, "KEY");
968       }
969    }
970
971    private class StaggeredWebDeployerActivator extends CacheUser
972    {
973
974       int regionCount = 15;
975
976       StaggeredWebDeployerActivator(Semaphore JavaDoc semaphore,
977                                     String JavaDoc name,
978                                     boolean sync,
979                                     int regionCount)
980               throws Exception JavaDoc
981       {
982          super(semaphore, name, sync, false);
983          this.regionCount = regionCount;
984       }
985
986       void useCache() throws Exception JavaDoc
987       {
988          for (int i = 0; i < regionCount; i++)
989          {
990             cache.getRegion(Fqn.fromString("/a/" + i), true).activate();
991
992             Fqn childFqn = Fqn.fromString("/a/" + i + "/" + name);
993             cache.put(childFqn, "KEY", "VALUE");
994
995             TestingUtil.sleepThread(1000);
996          }
997       }
998
999       public Object JavaDoc getCacheValue(Fqn fqn) throws CacheException
1000      {
1001         return cache.get(fqn, "KEY");
1002      }
1003   }
1004
1005   private class CacheStressor extends CacheUser
1006   {
1007      private Random JavaDoc random = new Random JavaDoc(System.currentTimeMillis());
1008      private boolean putsStopped = false;
1009      private boolean stopped = false;
1010
1011      CacheStressor(Semaphore JavaDoc semaphore,
1012                    String JavaDoc name,
1013                    boolean sync)
1014              throws Exception JavaDoc
1015      {
1016         super(semaphore, name, sync, true);
1017      }
1018
1019      void useCache() throws Exception JavaDoc
1020      {
1021         // Do continuous puts into the cache. Use our own nodes,
1022
// as we're not testing conflicts between writer nodes,
1023
// just whether activation causes problems
1024
int factor = 0;
1025         int i = 0;
1026         Fqn fqn = null;
1027
1028         boolean acquired = false;
1029         while (!stopped)
1030         {
1031            if (i > 0)
1032            {
1033               acquired = semaphore.tryAcquire(60, TimeUnit.SECONDS);
1034               if (!acquired)
1035                  throw new Exception JavaDoc(name + " cannot acquire semaphore");
1036            }
1037
1038            while (!putsStopped)
1039            {
1040               factor = random.nextInt(50);
1041
1042               fqn = Fqn.fromString("/" + name + "/" + String.valueOf(factor % SUBTREE_SIZE));
1043               Integer JavaDoc value = factor / SUBTREE_SIZE;
1044               cache.put(fqn, "KEY", value);
1045
1046               TestingUtil.sleepThread((long) factor);
1047
1048               i++;
1049            }
1050
1051            System.out.println(name + ": last put [#" + i + "] -- " + fqn + " = " + (factor / SUBTREE_SIZE));
1052
1053            semaphore.release();
1054
1055            // Go to sleep until directed otherwise
1056
while (!stopped && putsStopped)
1057               TestingUtil.sleepThread((long) 100);
1058         }
1059      }
1060
1061      public void stopPuts()
1062      {
1063         putsStopped = true;
1064      }
1065
1066      public void startPuts()
1067      {
1068         putsStopped = false;
1069      }
1070
1071      public void stopThread()
1072      {
1073         stopped = true;
1074         if (thread.isAlive())
1075            thread.interrupt();
1076      }
1077
1078
1079   }
1080}
Popular Tags