KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jboss > cache > pojo > statetransfer > StateTransferAopTestBase


1 /*
2  * JBoss, the OpenSource J2EE webOS
3  *
4  * Distributable under LGPL license.
5  * See terms of license at gnu.org.
6  */

7
8 package org.jboss.cache.pojo.statetransfer;
9
10 import EDU.oswego.cs.dl.util.concurrent.Semaphore;
11 import junit.framework.TestCase;
12 import org.apache.commons.logging.Log;
13 import org.apache.commons.logging.LogFactory;
14 import org.jboss.cache.Cache;
15 import org.jboss.cache.CacheException;
16 import org.jboss.cache.CacheSPI;
17 import org.jboss.cache.Fqn;
18 import org.jboss.cache.config.CacheLoaderConfig;
19 import org.jboss.cache.config.Configuration;
20 import org.jboss.cache.factories.XmlConfigurationParser;
21 import org.jboss.cache.loader.CacheLoader;
22 import org.jboss.cache.misc.TestingUtil;
23 import org.jboss.cache.pojo.PojoCache;
24 import org.jboss.cache.pojo.PojoCacheFactory;
25 import org.jboss.cache.pojo.test.Address;
26 import org.jboss.cache.pojo.test.Person;
27 import org.jboss.cache.xml.XmlHelper;
28 import org.w3c.dom.Element JavaDoc;
29
30 import javax.transaction.TransactionManager JavaDoc;
31 import java.io.File JavaDoc;
32 import java.util.HashMap JavaDoc;
33 import java.util.HashSet JavaDoc;
34 import java.util.Map JavaDoc;
35 import java.util.Random JavaDoc;
36 import java.util.Set JavaDoc;
37
38 /**
39  * Tests state transfer in PojoCache.
40  *
41  * @author <a HREF="mailto://brian.stansberry@jboss.com">Brian Stansberry</a>
42  * @version $Revision: 1.11 $
43  */

44 public abstract class StateTransferAopTestBase extends TestCase
45 {
46    private Map JavaDoc caches;
47
48    public static final String JavaDoc A_B_1 = "/a/b/1";
49    public static final String JavaDoc A_B_2 = "/a/b/2";
50    public static final String JavaDoc A_C_1 = "/a/c/1";
51    public static final String JavaDoc A_C_2 = "/a/c/2";
52
53    public static final Fqn A_B_1_f = Fqn.fromString("/a/b/1");
54    public static final Fqn A_B_2_f = Fqn.fromString("/a/b/2");
55    public static final Fqn A_C_1_f = Fqn.fromString("/a/c/1");
56    public static final Fqn A_C_2_f = Fqn.fromString("/a/c/2");
57
58    private static final int SUBTREE_SIZE = 10;
59
60    private Person joe;
61    private Person bob;
62    private Person jane;
63    private Person jill;
64    private Address addr1;
65    private Address addr2;
66
67    public static final Integer JavaDoc TWENTY = 20;
68    public static final Integer JavaDoc TWENTYFIVE = 25;
69    public static final Integer JavaDoc FORTY = 40;
70
71    private Log log = LogFactory.getLog(StateTransferAopTestBase.class);
72
73    public void testInitialStateTransfer() throws Exception JavaDoc
74    {
75       log.info("Enter testInitialStateTransfer");
76
77       PojoCache cache1 = createCache("cache1", true, false, false);
78
79       cache1.attach(A_B_1, joe);
80       cache1.attach(A_B_2, jane);
81       cache1.attach(A_C_1, bob);
82       cache1.attach(A_C_2, jill);
83
84       PojoCache cache2 = createCache("cache2", true, false, false);
85
86       // Pause to give caches time to see each other
87
// TestingUtil.blockUntilViewsReceived(new Cache[]
88
// {cache1.getCache(), cache2.getCache()}, 60000);
89

90       Person ab1 = (Person) cache2.find(A_B_1);
91       Person ab2 = (Person) cache2.find(A_B_2);
92       Person ac1 = (Person) cache2.find(A_C_1);
93       Person ac2 = (Person) cache2.find(A_C_2);
94       assertEquals("Name for /a/b/1 is Joe", joe.getName(), ab1.getName());
95       assertEquals("City for /a/b/1 is Anytown", addr1.getCity(), ab1.getAddress().getCity());
96       assertEquals("Name for /a/b/2 is Jane", jane.getName(), ab2.getName());
97       assertEquals("City for /a/b/2 is Anytown", addr1.getCity(), ab2.getAddress().getCity());
98       assertTrue("Joe and Jane have same Address", ab1.getAddress() == ab2.getAddress());
99       assertEquals("Name for /a/c/1 is Bob", bob.getName(), ac1.getName());
100       assertEquals("City for /a/c/1 is Fremont", addr2.getCity(), ac1.getAddress().getCity());
101       assertEquals("Name for /a/c/2 is Jill", jill.getName(), ac2.getName());
102       assertEquals("City for /a/c/2 is Fremont", addr2.getCity(), ac2.getAddress().getCity());
103       assertTrue("Bob and Jill have same Address", ac1.getAddress() == ac2.getAddress());
104    }
105
106    public void testInitialStateTferWithLoader() throws Exception JavaDoc
107    {
108       log.info("Enter testInitialStateTferWithLoader");
109
110       PojoCache cache1 = createCache("cache1", false, false, true);
111
112       cache1.attach(A_B_1, joe);
113       cache1.attach(A_B_2, jane);
114       cache1.attach(A_C_1, bob);
115       cache1.attach(A_C_2, jill);
116
117       PojoCache cache2 = createCache("cache2", false, false, true);
118
119       // Pause to give caches time to see each other
120
TestingUtil.blockUntilViewsReceived(new Cache[]
121               {cache1.getCache(), cache2.getCache()}, 60000);
122
123       Person ab1 = (Person) cache2.find(A_B_1);
124       Person ab2 = (Person) cache2.find(A_B_2);
125       Person ac1 = (Person) cache2.find(A_C_1);
126       Person ac2 = (Person) cache2.find(A_C_2);
127       assertEquals("Name for /a/b/1 is Joe", joe.getName(), ab1.getName());
128       assertEquals("City for /a/b/1 is Anytown", addr1.getCity(), ab1.getAddress().getCity());
129       assertEquals("Name for /a/b/2 is Jane", jane.getName(), ab2.getName());
130       assertEquals("City for /a/b/2 is Anytown", addr1.getCity(), ab2.getAddress().getCity());
131       assertTrue("Joe and Jane have same Address", ab1.getAddress() == ab2.getAddress());
132       assertEquals("Name for /a/c/1 is Bob", bob.getName(), ac1.getName());
133       assertEquals("City for /a/c/1 is Fremont", addr2.getCity(), ac1.getAddress().getCity());
134       assertEquals("Name for /a/c/2 is Jill", jill.getName(), ac2.getName());
135       assertEquals("City for /a/c/2 is Fremont", addr2.getCity(), ac2.getAddress().getCity());
136       assertTrue("Bob and Jill have same Address", ac1.getAddress() == ac2.getAddress());
137    }
138
139    public void testPartialStateTransfer() throws Exception JavaDoc
140    {
141       log.info("Enter testPartialStateTransfer");
142
143       PojoCache cache1 = createCache("cache1", false, true, false);
144       cache1.getCache().getRegion(Fqn.fromString("/a"), true).activate();
145
146       cache1.attach(A_B_1, joe);
147       cache1.attach(A_B_2, jane);
148
149       PojoCache cache2 = createCache("cache2", false, true, false);
150
151       // Pause to give caches time to see each other
152
TestingUtil.blockUntilViewsReceived(new Cache[]
153               {cache1.getCache(), cache2.getCache()}, 60000);
154
155       assertNull("/a/b/1 not transferred per policy", cache2.find(A_B_1));
156       assertNull("/a/b/2 not transferred per policy", cache2.find(A_B_2));
157
158       // TODO: Reinstate once we have proper FLUSH working.
159
cache2.getCache().getRegion(Fqn.fromString("/a"), true).activate();
160
161       Person ab1 = (Person) cache2.find(A_B_1);
162       Person ab2 = (Person) cache2.find(A_B_2);
163       assertEquals("Name for /a/b/1 is Joe", joe.getName(), ab1.getName());
164       assertEquals("City for /a/b/1 is Anytown", joe.getAddress().getCity(), ab1.getAddress().getCity());
165       assertEquals("Name for /a/b/2 is Jane", jane.getName(), ab2.getName());
166       assertEquals("City for /a/b/2 is Anytown", jane.getAddress().getCity(), ab2.getAddress().getCity());
167       assertTrue("Address for Joe and Jane is the same object", ab1.getAddress() == ab2.getAddress());
168
169       cache1.attach(A_C_1, bob);
170       cache1.attach(A_C_2, jill);
171
172       assertNotNull("/a/c/1 should be transferred per policy", cache2.find(A_C_1));
173
174       cache1.getCache().getRegion(Fqn.fromString("/a"), true).deactivate();
175
176       cache1.getCache().getRegion(Fqn.fromString("/a"), true).activate();
177
178       ab1 = (Person) cache1.find(A_B_1);
179       assertEquals("Name for /a/b/1 is Joe", joe.getName(), ab1.getName());
180       assertEquals("City for /a/b/1 is Anytown", addr1.getCity(), ab1.getAddress().getCity());
181       ab2 = (Person) cache1.find(A_B_2);
182       assertEquals("Name for /a/b/1 is Jane", jane.getName(), ab2.getName());
183       assertEquals("City for /a/b/1 is Anytown", addr1.getCity(), ab2.getAddress().getCity());
184       assertTrue("Address for Joe and Jane is the same object", ab1.getAddress() == ab2.getAddress());
185    }
186
187    public void testPartialStateTransferWithLoader() throws Exception JavaDoc
188    {
189       log.info("Enter testPartialStateTransferWithLoader");
190
191       PojoCache cache1 = createCache("cache1", false, true, true);
192       cache1.getCache().getRegion(Fqn.fromString("/a"), true).activate();
193
194       cache1.attach(A_B_1, joe);
195       cache1.attach(A_B_2, jane);
196
197       PojoCache cache2 = createCache("cache2", false, true, true);
198
199       // Pause to give caches time to see each other
200
TestingUtil.blockUntilViewsReceived(new Cache[]
201               {cache1.getCache(), cache2.getCache()}, 60000);
202
203       CacheLoader loader = ((CacheSPI) cache2.getCache()).getCacheLoaderManager().getCacheLoader();
204
205       Map JavaDoc map = loader.get(A_B_1_f);
206       if (map != null)
207       {
208          assertNull("/a/b/1 name not transferred per policy", map.get("name"));
209          assertNull("/a/b/1 age not transferred per policy", map.get("age"));
210       }
211       map = loader.get(A_B_2_f);
212       if (map != null)
213       {
214          assertNull("/a/b/1 name not transferred per policy", map.get("name"));
215          assertNull("/a/b/1 age not transferred per policy", map.get("age"));
216       }
217       assertNull("/a/b/1 not transferred per policy", cache2.find(A_B_1));
218       assertNull("/a/b/2 not transferred per policy", cache2.find(A_B_2));
219
220       cache2.getCache().getRegion(Fqn.fromString("/a"), true).activate();
221
222 // assertEquals("Correct name from loader for /a/b/1", joe.getName(), loader.get(A_B_1_f).get("name"));
223
// assertEquals("Correct age from loader for /a/b/1", TWENTY, loader.get(A_B_1_f).get("age"));
224
// assertEquals("Correct name from loader for /a/b/2", jane.getName(), loader.get(A_B_2_f).get("name"));
225
// assertEquals("Correct age from loader for /a/b/2", TWENTYFIVE, loader.get(A_B_2_f).get("age"));
226

227
228       Person ab1 = (Person) cache2.find(A_B_1);
229       Person ab2 = (Person) cache2.find(A_B_2);
230       assertEquals("Name for /a/b/1 is Joe", joe.getName(), ab1.getName());
231       assertEquals("City for /a/b/1 is Anytown", addr1.getCity(), ab1.getAddress().getCity());
232       assertEquals("Name for /a/b/1 is Jane", jane.getName(), ab2.getName());
233       assertEquals("City for /a/b/1 is Anytown", addr1.getCity(), ab2.getAddress().getCity());
234       assertTrue("Address for Joe and Jane is the same object", ab1.getAddress() == ab2.getAddress());
235
236       cache1.attach(A_C_1, bob);
237       cache1.attach(A_C_2, jill);
238       Thread.sleep(200);
239
240       assertNotNull("/a/c/1 transferred per policy", cache2.find(A_C_1));
241       assertNotNull("/a/c/1 transferred per policy", cache2.find(A_C_2));
242
243       Person ac1 = (Person) cache2.find(A_C_1);
244       Person ac2 = (Person) cache2.find(A_C_2);
245       assertEquals("Name for /a/c/1 is Bob", bob.getName(), ac1.getName());
246       assertEquals("City for /a/c/1 is Fremont", addr2.getCity(), ac1.getAddress().getCity());
247       assertEquals("Name for /a/c/2 is Jill", jill.getName(), ac2.getName());
248       assertEquals("City for /a/c/2 is Fremont", addr2.getCity(), ac2.getAddress().getCity());
249       assertTrue("Bob and Jill have same Address", ac1.getAddress() == ac2.getAddress());
250
251       cache1.getCache().getRegion(Fqn.fromString("/a"), true).deactivate();
252
253       ab1 = (Person) cache1.find(A_B_1);
254       assertEquals("Name for /a/b/1 is Joe", joe.getName(), ab1.getName());
255       assertEquals("City for /a/b/1 is Anytown", addr1.getCity(), ab1.getAddress().getCity());
256       ab2 = (Person) cache1.find(A_B_2);
257       assertEquals("Name for /a/b/1 is Jane", jane.getName(), ab2.getName());
258       assertEquals("City for /a/b/1 is Anytown", addr1.getCity(), ab2.getAddress().getCity());
259       assertTrue("Address for Joe and Jane is the same object", ab1.getAddress() == ab2.getAddress());
260       ac1 = (Person) cache1.find(A_C_1);
261       assertEquals("Name for /a/c/1 is Bob", bob.getName(), ac1.getName());
262       assertEquals("City for /a/c/1 is Fremont", addr2.getCity(), ac1.getAddress().getCity());
263       ac2 = (Person) cache1.find(A_C_2);
264       assertEquals("Name for /a/c/2 is Jill", jill.getName(), ac2.getName());
265       assertEquals("City for /a/c/2 is Fremont", addr2.getCity(), ac2.getAddress().getCity());
266       assertTrue("Address for Bob and Jill is the same object", ac1.getAddress() == ac2.getAddress());
267    }
268
269
270    /**
271     * Tests concurrent activation of the same subtree by multiple nodes in a
272     * REPL_SYNC environment. The idea is to see what would happen with a
273     * farmed deployment. See <code>concurrentActivationTest</code> for details.
274     *
275     * @throws Exception
276     */

277    public void testConcurrentActivationSync() throws Exception JavaDoc
278    {
279       log.info("Enter testConcurrentActivationSync");
280
281       concurrentActivationTest(true);
282    }
283
284    /**
285     * Tests concurrent activation of the same subtree by multiple nodes in a
286     * REPL_ASYNC environment. The idea is to see what would happen with a
287     * farmed deployment. See <code>concurrentActivationTest</code> for details.
288     *
289     * @throws Exception
290     */

291    public void testConcurrentActivationAsync() throws Exception JavaDoc
292    {
293       log.info("Enter testConcurrentActivationAsync");
294
295       concurrentActivationTest(false);
296    }
297
298    /**
299     * Starts 5 caches and then concurrently activates the same region under
300     * all 5, causing each to attempt a partial state transfer from the others.
301     * As soon as each cache has activated its region, it does a put to a node
302     * in the region, thus complicating the lives of the other caches trying
303     * to get partial state.
304     * <p/>
305     * Failure condition is if any node sees an exception or if the final state
306     * of all caches is not consistent.
307     *
308     * @param sync use REPL_SYNC or REPL_ASYNC
309     * @throws Exception
310     */

311    private void concurrentActivationTest(boolean sync) throws Exception JavaDoc
312    {
313       String JavaDoc[] names = {"A", "B", "C", "D", "E"};
314       int count = names.length;
315       CacheActivator[] activators = new CacheActivator[count];
316
317
318       try
319       {
320          // Create a semaphore and take all its tickets
321
Semaphore semaphore = new Semaphore(count);
322          for (int i = 0; i < count; i++)
323          {
324             semaphore.acquire();
325          }
326
327          // Create activation threads that will block on the semaphore
328
Cache[] caches = new Cache[count];
329          for (int i = 0; i < count; i++)
330          {
331             activators[i] = new CacheActivator(semaphore, names[i], sync);
332             caches[i] = activators[i].getCache();
333             activators[i].start();
334          }
335
336          // Make sure everyone is in sync
337
TestingUtil.blockUntilViewsReceived(caches, 60000);
338
339          // Release the semaphore to allow the threads to start work
340
semaphore.release(count);
341
342          // Sleep to ensure the threads get all the semaphore tickets
343
TestingUtil.sleepThread(1000);
344
345          // Reacquire the semaphore tickets; when we have them all
346
// we know the threads are done
347
for (int i = 0; i < count; i++)
348          {
349             boolean acquired = semaphore.attempt(60000);
350             if (!acquired)
351                fail("failed to acquire semaphore " + i);
352          }
353
354          // Sleep to allow any async calls to clear
355
if (!sync)
356             TestingUtil.sleepThread(500);
357
358          // Ensure the caches held by the activators see all the values
359
for (int i = 0; i < count; i++)
360          {
361             assertNull("Activator " + names[i] + " caught an exception",
362                     activators[i].getException());
363
364             for (int j = 0; j < count; j++)
365             {
366                String JavaDoc fqn = "/a/b/" + names[j];
367                Person p = (Person) activators[i].getCacheValue(fqn);
368                assertNotNull(names[i] + ":" + fqn + " is not null", p);
369                assertEquals("Correct name for " + names[i] + ":" + fqn,
370                        "Person " + names[j], p.getName());
371                assertEquals("Correct street for " + names[i] + ":" + fqn,
372                        names[j] + " Test Street", p.getAddress().getStreet());
373 // System.out.println(names[i] + ":" + fqn + " = " + activators[i].getCacheValue(fqn));
374
}
375
376          }
377       }
378       catch (Exception JavaDoc ex)
379       {
380          fail(ex.getLocalizedMessage());
381       }
382       finally
383       {
384          for (int i = 0; i < count; i++)
385             activators[i].cleanup();
386       }
387
388    }
389
390    /**
391     * Tests partial state transfer under heavy concurrent load and REPL_SYNC.
392     * See <code>concurrentUseTest</code> for details.
393     *
394     * @throws Exception
395     */

396    public void testConcurrentUseSync() throws Exception JavaDoc
397    {
398       log.info("Enter testConcurrentUseSync");
399
400 // concurrentUseTest(true);
401
}
402
403    /**
404     * Tests partial state transfer under heavy concurrent load and REPL_ASYNC.
405     * See <code>concurrentUseTest</code> for details.
406     *
407     * @throws Exception
408     */

409    public void testConcurrentUseAsync() throws Exception JavaDoc
410    {
411       log.info("Enter testConcurrentUseAsync");
412
413 // concurrentUseTest(false);
414
}
415
416    /**
417     * Initiates 5 caches, 4 with active trees and one with an inactive tree.
418     * Each of the active caches begins rapidly generating puts against nodes
419     * in a subtree for which it is responsible. The 5th cache activates
420     * each subtree, and at the end confirms no node saw any exceptions and
421     * that each node has consistent state.
422     *
423     * @param sync whether to use REPL_SYNC or REPL_ASYNCE
424     * @throws Exception
425     */

426    private void XconcurrentUseTest(boolean sync) throws Exception JavaDoc
427    {
428       String JavaDoc[] names = {"B", "C", "D", "E"};
429       int count = names.length;
430       CacheStressor[] stressors = new CacheStressor[count];
431
432       try
433       {
434
435          PojoCache cacheA = createCache("cacheA", sync, true, false, false);
436
437          Cache[] caches = new Cache[count + 1];
438          caches[0] = cacheA.getCache();
439
440          // Create a semaphore and take all its tickets
441
Semaphore semaphore = new Semaphore(count);
442          for (int i = 0; i < count; i++)
443          {
444             semaphore.acquire();
445          }
446
447          // Create stressor threads that will block on the semaphore
448

449          for (int i = 0; i < count; i++)
450          {
451             stressors[i] = new CacheStressor(semaphore, names[i], sync);
452             caches[i + 1] = stressors[i].getCache();
453             stressors[i].start();
454             // Give each one a chance to stabilize
455
TestingUtil.sleepThread(100);
456          }
457
458          // Make sure everyone's views are in sync
459
TestingUtil.blockUntilViewsReceived(caches, 60000);
460
461          // Repeat the basic test two times in order to involve inactivation
462
for (int x = 0; x < 2; x++)
463          {
464 // if (x > 0)
465
// {
466
// Reset things by inactivating the region
467
// and enabling the stressors
468
for (int i = 0; i < count; i++)
469             {
470                cacheA.getCache().getRegion(Fqn.fromString("/" + names[i]), true).deactivate();
471                log.info("TEST: Run " + x + "-- /" + names[i] + " inactivated on A");
472                stressors[i].startPuts();
473             }
474 // }
475

476             // Release the semaphore to allow the threads to start work
477
semaphore.release(count);
478
479             // Sleep to ensure the threads get all the semaphore tickets
480
// and to ensure puts are actively in progress
481
TestingUtil.sleepThread(300);
482
483             // Activate cacheA
484
for (int i = 0; i < count; i++)
485             {
486                log.info("TEST: Activating /" + names[i] + " on A");
487                cacheA.getCache().getRegion(Fqn.fromString("/" + names[i]), true).activate();
488                // Stop the stressor so we don't pollute cacheA's state
489
// with too many messages sent after activation -- we want
490
// to compare transferred state with the sender
491
stressors[i].stopPuts();
492                log.info("TEST: Run " + x + "-- /" + names[i] + " activated on A");
493                // Reacquire one semaphore ticket
494
boolean acquired = semaphore.attempt(60000);
495                if (!acquired)
496                   fail("failed to acquire semaphore " + names[i]);
497                log.info("TEST: Run " + x + "-- acquired semaphore from " + names[i]);
498
499                // Pause to allow other work to proceed
500
TestingUtil.sleepThread(100);
501             }
502
503             // Sleep to allow any in transit msgs to clear
504
if (!sync)
505                TestingUtil.sleepThread(2000);
506
507             // Ensure the stressors saw no exceptions
508
for (int i = 0; i < count; i++)
509             {
510                Exception JavaDoc e = stressors[i].getException();
511                if (e != null)
512                {
513                   log.error("Stressor " + names[i] + " caught an exception",
514                           e);
515                   throw e;
516                }
517             }
518
519 // log.info("Cache A details:\n" + cacheA.printDetails());
520

521             // Compare cache contents
522
Person p1 = null;
523             Person p2 = null;
524             for (int i = 0; i < count; i++)
525             {
526 // log.info("Cache " + names[i] + " details:\n" +
527
// stressors[i].getTreeCache().printDetails());
528

529                for (int j = 0; j < SUBTREE_SIZE; j++)
530                {
531
532                   String JavaDoc fqn = "/" + names[i] + "/" + j;
533                   log.info("TEST: Getting A:" + fqn);
534                   p1 = (Person) cacheA.find(fqn);
535                   boolean p1Null = p1 == null;
536                   log.info("TEST: Getting " + names[i] + ":" + fqn);
537 // p2 = (Person) stressors[i].getCache().find(fqn);
538
boolean p2Null = p2 == null;
539                   assertEquals("Run " + x + ": " + fqn +
540                           " null status matches", p1Null, p2Null);
541                   if (!p1Null)
542                   {
543                      assertEquals("Run " + x + ": A:" + fqn + " age matches " + names[i] + ":" + fqn,
544                              p1.getAge(), p2.getAge());
545                      assertEquals("Run " + x + ": A:" + fqn + " name matches " + names[i] + ":" + fqn,
546                              p1.getName(), p2.getName());
547                      assertEquals("Run " + x + ": A:" + fqn + " address matches " + names[i] + ":" + fqn,
548                              p1.getAddress().getStreet(),
549                              p2.getAddress().getStreet());
550                   }
551                }
552             }
553          }
554
555          for (int i = 0; i < count; i++)
556             stressors[i].stopThread();
557
558       }
559       finally
560       {
561          for (int i = 0; i < count; i++)
562          {
563             if (stressors[i] != null)
564                stressors[i].cleanup();
565          }
566       }
567
568    }
569
570    protected PojoCache createCache(String JavaDoc cacheID, boolean sync, boolean useMarshalling, boolean useCacheLoader)
571            throws Exception JavaDoc
572    {
573       return createCache(cacheID, sync, useMarshalling, useCacheLoader, true);
574    }
575
576    protected PojoCache createCache(String JavaDoc cacheID, boolean sync,
577                                    boolean useMarshalling,
578                                    boolean useCacheLoader,
579                                    boolean inactiveOnStartup)
580            throws Exception JavaDoc
581    {
582       if (caches.get(cacheID) != null)
583          throw new IllegalStateException JavaDoc(cacheID + " already created");
584
585       XmlConfigurationParser parser = new XmlConfigurationParser();
586       Configuration c = parser.parseFile(sync ? "META-INF/replSync-service.xml" : "META-INF/replAsync-service.xml");
587       c.setClusterName("StateTransferTestBase");
588       c.setReplVersionString(getReplicationVersion());
589       // Use a long timeout to facilitate setting debugger breakpoints
590
c.setInitialStateRetrievalTimeout(60000);
591       if (useMarshalling)
592       {
593          c.setUseRegionBasedMarshalling(true);
594          c.setInactiveOnStartup(inactiveOnStartup);
595       }
596       if (useCacheLoader)
597       {
598          configureCacheLoader(c, cacheID);
599       }
600
601       PojoCache cache = PojoCacheFactory.createCache(c, true);
602       // Put the cache in the map before starting, so if it fails in
603
// start it can still be destroyed later
604
caches.put(cacheID, cache);
605
606       return cache;
607    }
608
609    protected void configureCacheLoader(Configuration c, String JavaDoc cacheID) throws Exception JavaDoc
610    {
611       String JavaDoc tmp_location = getTempLocation(cacheID);
612
613       // Do cleanup in case it failed before
614
File JavaDoc file = new File JavaDoc(tmp_location);
615       cleanFile(file);
616       file.mkdir();
617       tmp_location = escapeWindowsPath(tmp_location);
618       c.setCacheLoaderConfig(getCacheLoaderConfig("org.jboss.cache.loader.FileCacheLoader", tmp_location));
619    }
620
621
622    protected CacheLoaderConfig getCacheLoaderConfig(String JavaDoc cl, String JavaDoc loc) throws Exception JavaDoc
623    {
624       String JavaDoc xml = " <config>\n" +
625               " \n" +
626               " <passivation>false</passivation>\n" +
627               " <preload></preload>\n" +
628               "\n" +
629               " <cacheloader>\n" +
630               " <class>" + cl + "</class>\n" +
631               " <properties>\n" +
632               " location=" + loc + "\n" +
633               " </properties>\n" +
634               " <async>false</async>\n" +
635               " <fetchPersistentState>true</fetchPersistentState>\n" +
636               " <ignoreModifications>false</ignoreModifications>\n" +
637               " </cacheloader>\n" +
638               " \n" +
639               " </config>";
640       Element JavaDoc element = XmlHelper.stringToElement(xml);
641       return XmlConfigurationParser.parseCacheLoaderConfig(element);
642    }
643
644    protected String JavaDoc getTempLocation(String JavaDoc cacheID)
645    {
646       String JavaDoc tmp_location = System.getProperty("java.io.tmpdir", "c:\\tmp");
647       File JavaDoc file = new File JavaDoc(tmp_location);
648       file = new File JavaDoc(file, cacheID);
649       return file.getAbsolutePath();
650    }
651
652    protected String JavaDoc escapeWindowsPath(String JavaDoc path)
653    {
654       if ('/' == File.separatorChar)
655          return path;
656
657       char[] chars = path.toCharArray();
658       StringBuffer JavaDoc sb = new StringBuffer JavaDoc();
659       for (int i = 0; i < chars.length; i++)
660       {
661          if (chars[i] == '\\')
662             sb.append('\\');
663          sb.append(chars[i]);
664       }
665       return sb.toString();
666    }
667
668    protected abstract String JavaDoc getReplicationVersion();
669
670    protected void setUp() throws Exception JavaDoc
671    {
672       super.setUp();
673
674       caches = new HashMap JavaDoc();
675
676       addr1 = new Address();
677       addr1.setStreet("101 Oakview Dr");
678       addr1.setCity("Anytown");
679       addr1.setZip(11111);
680
681       addr2 = new Address();
682       addr2.setStreet("222 Happy Dr");
683       addr2.setCity("Fremont");
684       addr2.setZip(22222);
685
686       joe = new Person();
687       joe.setName("Joe");
688       joe.setAge(TWENTY);
689       joe.setAddress(addr1);
690       Set JavaDoc skills = new HashSet JavaDoc();
691       skills.add("TENNIS");
692       skills.add("CARPENTRY");
693       joe.setSkills(skills);
694
695       jane = new Person();
696       jane.setName("Jane");
697       jane.setAge(TWENTYFIVE);
698       jane.setAddress(addr1);
699       skills = new HashSet JavaDoc();
700       skills.add("JUJITSU");
701       skills.add("MACRAME");
702       jane.setSkills(skills);
703
704       bob = new Person();
705       bob.setName("Bob");
706       bob.setAge(FORTY);
707       bob.setAddress(addr2);
708       skills = new HashSet JavaDoc();
709       skills.add("LANGUAGES");
710       skills.add("LAWN BOWLING");
711       bob.setSkills(skills);
712
713       jill = new Person();
714       jill.setName("Jill");
715       jill.setAge(TWENTYFIVE);
716       jill.setAddress(addr2);
717       skills = new HashSet JavaDoc();
718       skills.add("FORTRAN");
719       skills.add("COBOL");
720       jane.setSkills(skills);
721    }
722
723    protected void tearDown() throws Exception JavaDoc
724    {
725       super.tearDown();
726
727       Set JavaDoc keys = caches.keySet();
728       if (!keys.isEmpty())
729       {
730          String JavaDoc[] cacheIDs = new String JavaDoc[keys.size()];
731          cacheIDs = (String JavaDoc[]) keys.toArray(cacheIDs);
732          PojoCache cache = (PojoCache) caches.get(cacheIDs[0]);
733          cache.getCache().removeNode(new Fqn("/"));
734          Thread.sleep(200);
735
736          for (int i = 0; i < cacheIDs.length; i++)
737          {
738             stopCache((PojoCache) caches.get(cacheIDs[i]));
739             File JavaDoc file = new File JavaDoc(getTempLocation(cacheIDs[i]));
740             cleanFile(file);
741          }
742       }
743    }
744
745    protected void stopCache(PojoCache cache)
746    {
747       if (cache != null)
748       {
749          try
750          {
751             cache.stop();
752             cache.destroy();
753          }
754          catch (Exception JavaDoc e)
755          {
756             log.error("Exception stopping cache " + e.getMessage(), e);
757          }
758       }
759    }
760
761    protected void cleanFile(File JavaDoc file)
762    {
763       File JavaDoc[] children = file.listFiles();
764       if (children != null)
765       {
766          for (int i = 0; i < children.length; i++)
767          {
768             cleanFile(children[i]);
769          }
770       }
771
772       if (file.exists())
773          file.delete();
774       if (file.exists())
775          file.deleteOnExit();
776    }
777
778    private class CacheActivator extends CacheUser
779    {
780
781       CacheActivator(Semaphore semaphore,
782                      String JavaDoc name,
783                      boolean sync)
784               throws Exception JavaDoc
785       {
786          super(semaphore, name, sync, false);
787       }
788
789       void useCache() throws Exception JavaDoc
790       {
791          cache.getCache().getRegion(Fqn.fromString("/a/b"), true).activate();
792          log.info("TEST: " + name + " activated region" + " " + System.currentTimeMillis());
793          String JavaDoc childFqn = "/a/b/" + name;
794
795          Person p = new Person();
796          p.setName("Person " + name);
797
798          Address addr = new Address();
799          addr.setStreet(name + " Test Street");
800          addr.setCity(name + ", CA");
801          p.setAddress(addr);
802
803          TestingUtil.sleepThread(1);
804
805 // tm.begin();
806
// try
807
// {
808
cache.attach(childFqn, p);
809          log.info("TEST: " + name + " put fqn " + childFqn + " " + System.currentTimeMillis());
810 // }
811
// catch (Exception e)
812
// {
813
// tm.setRollbackOnly();
814
// throw e;
815
// }
816
// finally
817
// {
818
// tm.commit();
819
// }
820

821       }
822
823       public Object JavaDoc getCacheValue(String JavaDoc fqn) throws CacheException
824       {
825          return cache.find(fqn);
826       }
827    }
828
829    private class CacheStressor extends CacheUser
830    {
831       private Random JavaDoc random;
832       private boolean putsStopped = false;
833       private boolean stopped = false;
834
835       CacheStressor(Semaphore semaphore,
836                     String JavaDoc name,
837                     boolean sync)
838               throws Exception JavaDoc
839       {
840          super(semaphore, name, sync, true);
841
842          random = new Random JavaDoc(System.currentTimeMillis() + name.hashCode());
843       }
844
845       void useCache() throws Exception JavaDoc
846       {
847          // Do lots of puts into the cache. Use our own nodes,
848
// as we're not testing conflicts between writer nodes,
849
// just whether activation causes problems
850
int factor = 0;
851          int i = 0;
852          String JavaDoc fqn = null;
853
854          Address addr1 = new Address();
855          addr1.setStreet("1 Test Street");
856          addr1.setCity("TestOne, CA");
857
858          Address addr2 = new Address();
859          addr2.setStreet("2 Test Street");
860          addr2.setCity("TestTwo, CA");
861
862          Person[] people = new Person[SUBTREE_SIZE];
863          boolean[] loaded = new boolean[SUBTREE_SIZE];
864          for (int j = 0; j < SUBTREE_SIZE; j++)
865          {
866             Person p = new Person();
867             p.setName("Person " + j);
868             p.setAge(j);
869             p.setAddress((j % 2 == 0) ? addr1 : addr2);
870             people[j] = p;
871          }
872
873          boolean acquired = true;
874          try
875          {
876             while (!stopped)
877             {
878                if (i > 0)
879                {
880                   acquired = semaphore.attempt(60000);
881                   if (!acquired)
882                      throw new Exception JavaDoc(name + " cannot acquire semaphore");
883                   log.info("TEST: " + name + " reacquired semaphore");
884                   System.out.println("TEST: " + name + " reacquired semaphore");
885                }
886
887                int lastIndex = -1;
888                int index = -1;
889                while (!putsStopped)
890                {
891                   // Ensure we don't operate on the same address twice in a row
892
// otherwise deadlock detection sometimes causes
893
// the _put for the second call to precede the commit
894
// for the first, leading to deadlock. This seems like a
895
// JGroups bug, but the purpose of this test isn't to expose it
896
while (index % 2 == lastIndex % 2)
897                   {
898                      factor = random.nextInt(50);
899                      index = factor % SUBTREE_SIZE;
900                   }
901
902                   lastIndex = index;
903
904                   TestingUtil.sleepThread(factor);
905
906                   fqn = "/" + name + "/" + String.valueOf(index);
907
908 // tm.begin();
909
// try
910
// {
911
if (loaded[index] == false)
912                   {
913                      cache.attach(fqn, people[index]);
914                      loaded[index] = true;
915                      log.info("TEST: " + name + " put Person at " + fqn);
916                   }
917                   else if (i % 2 == 0)
918                   {
919                      int newAge = factor / SUBTREE_SIZE;
920                      people[index].setAge(newAge);
921                   }
922                   else
923                   {
924                      people[index].getAddress().setStreet(factor + " Test Street");
925                   }
926 // }
927
// catch (Exception e)
928
// {
929
// tm.setRollbackOnly();
930
// throw e;
931
// }
932
// finally
933
// {
934
// tm.commit();
935
// }
936

937                   i++;
938                }
939
940                log.info("TEST: " + name + ": last put [#" + i + "] -- " + fqn + " = " + (factor / SUBTREE_SIZE));
941
942                semaphore.release();
943                acquired = false;
944
945                // Go to sleep until directed otherwise
946
while (!stopped && putsStopped)
947                   TestingUtil.sleepThread(100);
948             }
949          }
950          finally
951          {
952             if (acquired)
953                semaphore.release();
954          }
955       }
956
957 // public void start() throws Exception
958
// {
959
// super.start();
960
// cache.activateRegion("/" + name);
961
// }
962

963       public void stopPuts()
964       {
965          putsStopped = true;
966          log.info("TEST: " + name + " putsStopped");
967       }
968
969       public void startPuts()
970       {
971          putsStopped = false;
972       }
973
974       public void stopThread()
975       {
976          stopped = true;
977          if (thread.isAlive())
978             thread.interrupt();
979       }
980
981
982    }
983
984    private abstract class CacheUser implements Runnable JavaDoc
985    {
986       protected Semaphore semaphore;
987       protected PojoCache cache;
988       protected TransactionManager JavaDoc tm;
989       protected String JavaDoc name;
990       protected Exception JavaDoc exception;
991       protected Thread JavaDoc thread;
992
993       CacheUser(Semaphore semaphore,
994                 String JavaDoc name,
995                 boolean sync,
996                 boolean activateRoot)
997               throws Exception JavaDoc
998       {
999          this.cache = createCache(name, sync, true, false, !activateRoot);
1000         tm = ((CacheSPI) cache.getCache()).getTransactionManager();
1001         if (tm == null)
1002            throw new IllegalStateException JavaDoc("TransactionManager required");
1003         this.semaphore = semaphore;
1004         this.name = name;
1005
1006         log.info("TEST: Cache " + name + " started");
1007         System.out.println("TEST: Cache " + name + " started");
1008      }
1009
1010      public void run()
1011      {
1012         log.info("TEST: " + name + " started");
1013         System.out.println("TEST: " + name + " started");
1014
1015         boolean acquired = false;
1016         try
1017         {
1018            acquired = semaphore.attempt(60000);
1019            if (!acquired)
1020               throw new Exception JavaDoc(name + " cannot acquire semaphore");
1021            log.info("TEST: " + name + " acquired semaphore");
1022            System.out.println("TEST: " + name + " acquired semaphore");
1023            useCache();
1024
1025         }
1026         catch (Exception JavaDoc e)
1027         {
1028            log.error("TEST: " + name + ": " + e.getLocalizedMessage(), e);
1029
1030            // Save it for the test to check
1031
exception = e;
1032         }
1033         finally
1034         {
1035            if (acquired)
1036               semaphore.release();
1037         }
1038
1039      }
1040
1041      abstract void useCache() throws Exception JavaDoc;
1042
1043      public Exception JavaDoc getException()
1044      {
1045         return exception;
1046      }
1047
1048      public Cache getCache()
1049      {
1050         return cache.getCache();
1051      }
1052
1053      public void start() throws Exception JavaDoc
1054      {
1055         thread = new Thread JavaDoc(this);
1056         thread.start();
1057      }
1058
1059      public void cleanup()
1060      {
1061         if (thread != null && thread.isAlive())
1062            thread.interrupt();
1063      }
1064   }
1065}
1066
Popular Tags