KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jboss > cache > statetransfer > ForcedStateTransferTest


1 /*
2 * JBoss, Home of Professional Open Source
3 * Copyright 2005, JBoss Inc., and individual contributors as indicated
4 * by the @authors tag. See the copyright.txt in the distribution for a
5 * full listing of individual contributors.
6 *
7 * This is free software; you can redistribute it and/or modify it
8 * under the terms of the GNU Lesser General Public License as
9 * published by the Free Software Foundation; either version 2.1 of
10 * the License, or (at your option) any later version.
11 *
12 * This software is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this software; if not, write to the Free
19 * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
20 * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
21 */

22
23 package org.jboss.cache.statetransfer;
24
25 import org.jboss.cache.AbstractCacheListener;
26 import org.jboss.cache.CacheException;
27 import org.jboss.cache.CacheSPI;
28 import org.jboss.cache.Fqn;
29 import org.jboss.cache.Version;
30 import org.jboss.cache.misc.TestingUtil;
31
32 import javax.transaction.Synchronization JavaDoc;
33 import javax.transaction.Transaction JavaDoc;
34 import javax.transaction.TransactionManager JavaDoc;
35 import java.util.Map JavaDoc;
36
37
38 /**
39  * Tests the ability to force a state transfer in the presence of
40  * transactional and non-transactional threads that are hung holding
41  * locks in the cache.
42  *
43  * @author Brian Stansberry
44  * @version $Revision$
45  */

46 public class ForcedStateTransferTest extends StateTransferTestBase
47 {
48    /**
49     * Starts a cache in a separate thread, allowing the main thread
50     * to abort if state transfer is taking too long.
51     */

52    static class CacheStarter extends Thread JavaDoc
53    {
54       CacheSPI cache;
55       boolean useMarshalling;
56       Exception JavaDoc failure;
57
58       CacheStarter(CacheSPI cache, boolean useMarshalling)
59       {
60          this.cache = cache;
61          this.useMarshalling = useMarshalling;
62       }
63
64       public void run()
65       {
66          try
67          {
68             cache.start();
69
70             if (useMarshalling)
71             {
72                // If we don't do initial state transfer, there is
73
// no guarantee of start() blocking until the view is received
74
// so we need to do it ourself
75
TestingUtil.blockUntilViewReceived(cache, 2, 60000);
76                cache.getRegion(Fqn.ROOT, true).activate();
77             }
78          }
79          catch (Exception JavaDoc e)
80          {
81             failure = e;
82          }
83       }
84    }
85
86    /**
87     * Generic superclass of classes that perform some operation on the
88     * cache that is intended to hang with a lock held on certain nodes.
89     */

90    static abstract class TaskRunner extends Thread JavaDoc
91    {
92       CacheSPI cache;
93       Fqn fqn;
94       String JavaDoc value;
95       Exception JavaDoc failure;
96       boolean asleep = false;
97
98       TaskRunner(CacheSPI cache, String JavaDoc rootFqn, String JavaDoc value)
99       {
100          this.cache = cache;
101          this.value = value;
102          this.fqn = new Fqn(Fqn.fromString(rootFqn), value);
103       }
104
105       public void run()
106       {
107          try
108          {
109             // do whatever the task is
110
executeTask();
111          }
112          catch (Exception JavaDoc e)
113          {
114             if (!isDone())
115                failure = e;
116          }
117          finally
118          {
119             asleep = false;
120             // hook to allow final processing
121
finalCleanup();
122          }
123       }
124
125       abstract void executeTask() throws Exception JavaDoc;
126
127       abstract boolean isDone();
128
129       void finalCleanup()
130       {
131       }
132
133       boolean isAsleep()
134       {
135          return asleep;
136       }
137    }
138
139    /**
140     * Hangs with an active or rollback-only transaction holding locks.
141     */

142    static class TxRunner extends TaskRunner
143    {
144       Transaction JavaDoc tx = null;
145       boolean rollback = false;
146       boolean done = true;
147
148       TxRunner(CacheSPI cache, String JavaDoc rootFqn, String JavaDoc value, boolean rollback)
149       {
150          super(cache, rootFqn, value);
151          this.rollback = rollback;
152       }
153
154       void executeTask() throws Exception JavaDoc
155       {
156          TransactionManager JavaDoc tm = cache.getTransactionManager();
157          tm.begin();
158          tx = tm.getTransaction();
159
160          cache.put(fqn, "KEY", value);
161
162          if (rollback)
163             tx.setRollbackOnly();
164
165          asleep = true;
166          TestingUtil.sleepThread((long) 25000);
167          done = true;
168       }
169
170       void finalCleanup()
171       {
172          if (tx != null)
173          {
174             try { tx.commit(); } catch (Exception JavaDoc ignore) {}
175          }
176       }
177
178       boolean isDone()
179       {
180          return done;
181       }
182    }
183
184    /**
185     * TreeCacheListener that hangs the thread in nodeModified().
186     */

187    static class HangThreadListener extends AbstractCacheListener
188    {
189       boolean asleep;
190       Fqn toHang;
191       boolean alreadyHung;
192       boolean done;
193
194       HangThreadListener(Fqn toHang)
195       {
196          this.toHang = toHang;
197       }
198
199       public void nodeModified(Fqn fqn, boolean pre, boolean isLocal, ModificationType modType, Map JavaDoc data)
200       {
201          if (!pre) hangThread(fqn);
202       }
203
204       private void hangThread(Fqn fqn)
205       {
206          if (!alreadyHung && toHang.equals(fqn))
207          {
208             asleep = true;
209             //System.out.println("Hanging thread changing node " + fqn);
210
alreadyHung = true;
211             TestingUtil.sleepThread((long) 30000);
212             done = true;
213             asleep = false;
214          }
215       }
216    }
217
218    /**
219     * Hangs with a non-transactional thread holding locks.
220     */

221    static class HangThreadRunner extends TaskRunner
222    {
223       HangThreadListener listener;
224
225       HangThreadRunner(CacheSPI cache, String JavaDoc rootFqn, String JavaDoc value)
226       {
227          super(cache, rootFqn, value);
228          listener = new HangThreadListener(fqn);
229          cache.addCacheListener(listener);
230       }
231
232       void executeTask() throws Exception JavaDoc
233       {
234          // Just do a put and the listener will hang the thread
235
cache.put(fqn, "KEY", value);
236       }
237
238       boolean isAsleep()
239       {
240          return listener.asleep;
241       }
242
243       boolean isDone()
244       {
245          return listener.done;
246       }
247    }
248
249    /**
250     * Synchronization that hangs the thread either in
251     * beforeCompletion() or afterCompletion().
252     */

253    static class HangThreadSynchronization implements Synchronization JavaDoc
254    {
255       boolean asleep;
256       boolean hangBefore;
257       boolean done;
258
259       HangThreadSynchronization(boolean hangBefore)
260       {
261          this.hangBefore = hangBefore;
262       }
263
264       public void beforeCompletion()
265       {
266          if (hangBefore)
267          {
268             hang();
269          }
270       }
271
272       public void afterCompletion(int status)
273       {
274          if (!hangBefore)
275          {
276             hang();
277          }
278       }
279
280       void hang()
281       {
282          asleep = true;
283          TestingUtil.sleepThread((long) 30000);
284          done = true;
285       }
286
287    }
288
289    /**
290     * Hangs with a transactional thread either in the beforeCompletion()
291     * or afterCompletion() phase holding locks.
292     */

293    static class SynchronizationTxRunner extends TaskRunner
294    {
295       Transaction JavaDoc tx = null;
296       HangThreadSynchronization sync;
297
298       SynchronizationTxRunner(CacheSPI cache, String JavaDoc rootFqn, String JavaDoc value, boolean hangBefore)
299       {
300          super(cache, rootFqn, value);
301          this.sync = new HangThreadSynchronization(hangBefore);
302       }
303
304       void executeTask() throws Exception JavaDoc
305       {
306          TransactionManager JavaDoc tm = cache.getTransactionManager();
307          tm.begin();
308          tx = tm.getTransaction();
309          tx.registerSynchronization(sync);
310
311          cache.put(fqn, "KEY", value);
312
313          // Committing the tx will hang the thread
314
tx.commit();
315       }
316
317       boolean isAsleep()
318       {
319          return sync.asleep;
320       }
321
322       boolean isDone()
323       {
324          return sync.done;
325       }
326    }
327
328    /**
329     * Tests the ability to force a state transfer in the presence
330     * of active transactions on the sending cache.
331     *
332     * @throws Exception
333     */

334    public void testActiveTransaction() throws Exception JavaDoc
335    {
336       String JavaDoc[] values = {"A", "B", "C"};
337       transactionTest(values, false, "REPEATABLE_READ");
338    }
339
340    /**
341     * Tests the ability to force a state transfer in the presence
342     * of a transaction marked rollback-only on the sending cache.
343     *
344     * @throws Exception
345     */

346    public void testRollbackOnlyTransaction() throws Exception JavaDoc
347    {
348       String JavaDoc[] values = {"A", "B", "C"};
349       transactionTest(values, true, "REPEATABLE_READ");
350    }
351
352    /**
353     * Run a basic test with transactional threads doing puts and then
354     * hanging before committing.
355     *
356     * @param values node names under which puts should be done
357     * @param rollback should the transactions be marked rollback-only
358     * before hanging
359     * @param isolationLevel cache's isolation level
360     * @throws Exception
361     */

362    private void transactionTest(String JavaDoc[] values,
363                                 boolean rollback,
364                                 String JavaDoc isolationLevel) throws Exception JavaDoc
365    {
366       // Create the cache from which state will be requested
367
CacheSPI sender = initializeSender(isolationLevel, false, false);
368
369       // Start threads that will do operations on the cache and then hang
370
TxRunner[] runners =
371               initializeTransactionRunners(values, sender, "/LOCK", rollback);
372
373       // Create and start the cache that requests a state transfer
374
CacheSPI receiver = startReceiver(isolationLevel, false, false);
375
376       // Confirm the receiver got the expected state and the threads are OK
377
checkResults(receiver, runners, false);
378    }
379
380    /**
381     * Creates and starts a CacheSPI from which another cache will request
382     * state. Also adds value "X" under key "KEY" in node "/OK". This node
383     * should be present in the transferred state in any test.
384     *
385     * @param isolationLevel cache's isolation level
386     * @param replSync is cache REPL_SYNC?
387     * @param useMarshalling is the activateRegion() API to be used?
388     * @return the cache
389     * @throws Exception
390     */

391    private CacheSPI initializeSender(String JavaDoc isolationLevel,
392                                      boolean replSync,
393                                      boolean useMarshalling) throws Exception JavaDoc
394    {
395       CacheSPI sender = createCache("sender", isolationLevel, replSync, useMarshalling, true);
396
397       if (useMarshalling)
398          sender.getRegion(Fqn.ROOT, true).activate();
399
400       sender.put(Fqn.fromString("/OK"), "KEY", "X");
401
402       return sender;
403    }
404
405    /**
406     * Start a set of TaskRunner threads that do a transactional put on the cache
407     * and then go to sleep with the transaction uncommitted.
408     *
409     * @param values the name of the node that should be put under
410     * rootFqn, and the value that shoud be put in its map
411     * @param sender the cache on which the put should be done
412     * @param rootFqn Fqn under which the new node should be inserted -- the
413     * Fqn of the new node will be /rootFqn/value
414     * @param rollback <code>true</code> if the tx should be marked
415     * rollback-only before the thread goes to sleep
416     * @return the TaskRunner threads
417     */

418    private TxRunner[] initializeTransactionRunners(String JavaDoc[] values,
419                                                    CacheSPI sender,
420                                                    String JavaDoc rootFqn,
421                                                    boolean rollback)
422    {
423       TxRunner[] runners = new TxRunner[values.length];
424       for (int i = 0; i < values.length; i++)
425       {
426          runners[i] = new TxRunner(sender, rootFqn, values[i], rollback);
427          initializeRunner(runners[i]);
428       }
429
430       return runners;
431    }
432
433    /**
434     * Starts the runner and waits up to 1 second until it is asleep, confirming
435     * that it is alive.
436     *
437     * @param runner
438     */

439    private void initializeRunner(TaskRunner runner)
440    {
441       runner.start();
442
443       // Loop until it executes its put and goes to sleep (i.e. hangs)
444
long start = System.currentTimeMillis();
445       while (!(runner.isAsleep()))
446       {
447          assertTrue(runner.getClass().getName() + " " + runner.value +
448                  " is alive", runner.isAlive());
449          // Avoid hanging test fixture by only waiting 1 sec before failing
450
assertFalse(runner.getClass().getName() + " " + runner.value +
451                  " has not timed out",
452                  (System.currentTimeMillis() - start) > 1000);
453       }
454    }
455
456    /**
457     * Checks whether the receiver cache has the expected state and whether
458     * the runners ran cleanly. Also terminates the runners.
459     *
460     * @param receiver the cache that received state
461     * @param runners the task runners
462     * @param allowValues true if the runners' values are expected to
463     * be in the cache state; false otherwise
464     * @throws CacheException
465     */

466    private void checkResults(CacheSPI receiver,
467                              TaskRunner[] runners,
468                              boolean allowValues) throws CacheException
469    {
470       // Check that the runners are alive and kill them
471
boolean[] aliveStates = new boolean[runners.length];
472       for (int i = 0; i < runners.length; i++)
473       {
474          aliveStates[i] = runners[i].isAlive();
475          if (aliveStates[i])
476             runners[i].interrupt();
477       }
478
479       // Confirm we got the "non-hung" state
480
assertEquals("OK value correct", "X", receiver.get(Fqn.fromString("/OK"), "KEY"));
481
482       for (int i = 0; i < runners.length; i++)
483       {
484          assertTrue("Runner " + runners[i].value + " was alive", aliveStates[i]);
485          assertNull("Runner " + runners[i].value + " ran cleanly", runners[i].failure);
486          if (allowValues)
487          {
488             assertEquals("Correct value in " + runners[i].fqn,
489                     runners[i].value, receiver.get(runners[i].fqn, "KEY"));
490          }
491          else
492          {
493             assertNull("No value in " + runners[i].fqn,
494                     receiver.get(runners[i].fqn, "KEY"));
495          }
496       }
497    }
498
499    /**
500     * Tests the ability to force a state transfer in the presence of
501     * a hung thread holding a lock on the sending cache.
502     *
503     * @throws Exception
504     */

505    public void testHungThread() throws Exception JavaDoc
506    {
507       // Create the cache from which state will be requested
508
CacheSPI sender = initializeSender("REPEATABLE_READ", false, false);
509
510       // Start threads that will do operations on the cache and then hang
511
String JavaDoc[] values = {"A", "B", "C"};
512       HangThreadRunner[] runners = initializeHangThreadRunners(values, sender, "/LOCK");
513
514       // Create and start the cache that requests a state transfer
515
CacheSPI receiver = startReceiver("REPEATABLE_READ", false, false);
516
517       // Confirm the receiver got the expected state and the threads are OK
518
checkResults(receiver, runners, true);
519    }
520
521    /**
522     * Start a set of TaskRunner threads that do a non-transactional put on the
523     * cache and then go to sleep with the thread hung in a
524     * TreeCacheListener and locks unreleased
525     *
526     * @param values the name of the node that should be put under
527     * rootFqn, and the value that shoud be put in its map
528     * @param sender the cache on which the put should be done
529     * @param rootFqn Fqn under which the new node should be inserted -- the
530     * Fqn of the new node will be /rootFqn/value
531     * @return the TaskRunner threads
532     */

533    private HangThreadRunner[] initializeHangThreadRunners(String JavaDoc[] values,
534                                                           CacheSPI sender,
535                                                           String JavaDoc rootFqn)
536    {
537       HangThreadRunner[] runners = new HangThreadRunner[values.length];
538       for (int i = 0; i < values.length; i++)
539       {
540          runners[i] = new HangThreadRunner(sender, rootFqn, values[i]);
541          initializeRunner(runners[i]);
542       }
543
544       return runners;
545    }
546
547    /**
548     * Tests the ability to force a state transfer in the presence
549     * of a transaction that is hung in a
550     * Synchronization.beforeCompletion() call.
551     *
552     * @throws Exception
553     */

554    public void testBeforeCompletionLock() throws Exception JavaDoc
555    {
556       synchronizationTest(true);
557    }
558
559    /**
560     * Tests the ability to force a state transfer in the presence
561     * of a transaction that is hung in a
562     * Synchronization.beforeCompletion() call.
563     *
564     * @throws Exception
565     */

566    public void testAfterCompletionLock() throws Exception JavaDoc
567    {
568       synchronizationTest(false);
569    }
570
571    /**
572     * Tests the ability to force a state transfer in the presence
573     * of a transaction that is hung either in a
574     * Synchronization.beforeCompletion() or Synchronization.afterCompletion()
575     * call.
576     *
577     * @param hangBefore <code>true</code> if the thread should hang in
578     * <code>beforeCompletion()</code>, <code>false</code>
579     * if it should hang in <code>afterCompletion</code>
580     * @throws Exception
581     */

582    private void synchronizationTest(boolean hangBefore) throws Exception JavaDoc
583    {
584       CacheSPI sender = initializeSender("REPEATABLE_READ", false, false);
585
586       String JavaDoc[] values = {"A", "B", "C"};
587       SynchronizationTxRunner[] runners =
588               initializeSynchronizationTxRunners(values, sender, "/LOCK", hangBefore);
589
590       CacheSPI receiver = startReceiver("REPEATABLE_READ", false, false);
591
592       checkResults(receiver, runners, !hangBefore);
593    }
594
595
596    /**
597     * Start a set of TaskRunner threads that do a transactional put on the
598     * cache and then go to sleep with the thread hung in a
599     * transaction Synchronization call and locks unreleased
600     *
601     * @param values the name of the node that should be put under
602     * rootFqn, and the value that shoud be put in its map
603     * @param sender the cache on which the put should be done
604     * @param rootFqn Fqn under which the new node should be inserted -- the
605     * Fqn of the new node will be /rootFqn/value
606     * @param hangBefore <code>true</code> if the thread should hang in
607     * <code>beforeCompletion()</code>, <code>false</code>
608     * if it should hang in <code>afterCompletion</code>
609     * @return the TaskRunner threads
610     */

611    private SynchronizationTxRunner[] initializeSynchronizationTxRunners(String JavaDoc[] values,
612                                                                         CacheSPI sender,
613                                                                         String JavaDoc rootFqn,
614                                                                         boolean hangBefore)
615    {
616       SynchronizationTxRunner[] runners =
617               new SynchronizationTxRunner[values.length];
618       for (int i = 0; i < values.length; i++)
619       {
620          runners[i] = new SynchronizationTxRunner(sender, rootFqn, values[i], hangBefore);
621          initializeRunner(runners[i]);
622       }
623       return runners;
624    }
625
626    /**
627     * Tests the ability to force a state transfer in the presence
628     * of multiple issues on the sending cache (active transactions,
629     * rollback-only transactions, transactions hung in beforeCompletion() and
630     * afterCompletion() calls, as well as hung threads).
631     *
632     * @throws Exception
633     */

634    public void testMultipleProblems() throws Exception JavaDoc
635    {
636       multipleProblemTest("REPEATABLE_READ", "/LOCK", false, false);
637    }
638
639    /**
640     * Tests the ability to force a state transfer in the presence
641     * of an active transaction in the sending cache
642     * and isolation level SERIALIZABLE.
643     *
644     * @throws Exception
645     */

646    public void testSerializableIsolation() throws Exception JavaDoc
647    {
648       multipleProblemTest("SERIALIZABLE", "/", false, false);
649    }
650
651    /**
652     * Tests the ability to force a partial state transfer with multiple
653     * "problem" actors holding locks on the sending node. Same test as
654     * {@link #testMultipleProblems()} except the partial state transfer API is
655     * used instead of an initial state transfer.
656     *
657     * @throws Exception
658     */

659    public void testPartialStateTransfer() throws Exception JavaDoc
660    {
661       multipleProblemTest("REPEATABLE_READ", "/LOCK", false, true);
662    }
663
664    /**
665     * Tests the ability to force a partial state transfer with multiple
666     * "problem" actors holding locks on the sending node and cache mode
667     * REPL_SYNC. Same test as {@link #testMultipleProblems()} except the
668     * cache is configured for REPL_SYNC.
669     *
670     * @throws Exception
671     */

672    public void testReplSync() throws Exception JavaDoc
673    {
674       multipleProblemTest("REPEATABLE_READ", "/LOCK", true, false);
675    }
676
677    /**
678     * Tests the ability to force a partial state transfer with multiple
679     * "problem" actors holding locks on the sending node.
680     *
681     * @throws Exception
682     */

683    private void multipleProblemTest(String JavaDoc isolationLevel,
684                                     String JavaDoc rootFqn,
685                                     boolean replSync,
686                                     boolean useMarshalling) throws Exception JavaDoc
687    {
688       CacheSPI sender = initializeSender(isolationLevel, replSync, useMarshalling);
689
690       // Do the "after" nodes first, otherwise if there is a /LOCK parent
691
// node, the rollback of a tx will remove it causing the test to fail
692
// since the child node created by it will be gone as well.
693
// This is really a REPEATABLE_READ bug that this test isn't intended
694
// to catch; will create a separate locking test that shows it
695
String JavaDoc[] val1 = {"A", "B", "C"};
696       SynchronizationTxRunner[] after =
697               initializeSynchronizationTxRunners(val1, sender, rootFqn, false);
698
699       String JavaDoc[] val2 = {"D", "E", "F"};
700       SynchronizationTxRunner[] before =
701               initializeSynchronizationTxRunners(val2, sender, rootFqn, true);
702
703       String JavaDoc[] val3 = {"G", "H", "I"};
704       TxRunner[] active =
705               initializeTransactionRunners(val3, sender, rootFqn, false);
706
707       String JavaDoc[] val4 = {"J", "K", "L"};
708       TxRunner[] rollback =
709               initializeTransactionRunners(val4, sender, rootFqn, true);
710
711       String JavaDoc[] val5 = {"M", "N", "O"};
712       HangThreadRunner[] threads =
713               initializeHangThreadRunners(val5, sender, rootFqn);
714
715       CacheSPI receiver = startReceiver(isolationLevel, replSync, useMarshalling);
716
717       checkResults(receiver, active, false);
718       checkResults(receiver, rollback, false);
719       checkResults(receiver, before, false);
720       checkResults(receiver, after, true);
721       checkResults(receiver, threads, true);
722    }
723
724    protected String JavaDoc getReplicationVersion()
725    {
726       return Version.version;
727    }
728
729    /**
730     * Starts a cache that requests state from another cache. Confirms
731     * that the receiver cache starts properly.
732     *
733     * @param isolationLevel
734     * @param replSync
735     * @param useMarshalling
736     * @return the receiver cache
737     * @throws Exception
738     */

739    private CacheSPI startReceiver(String JavaDoc isolationLevel,
740                                   boolean replSync,
741                                   boolean useMarshalling) throws Exception JavaDoc
742    {
743       CacheSPI receiver = createCache("receiver", isolationLevel, replSync, useMarshalling, false);
744
745       // Start the cache in a separate thread so we can kill the
746
// thread if the cache doesn't start properly
747
CacheStarter starter = new CacheStarter(receiver, useMarshalling);
748
749       starter.start();
750
751       starter.join(20000);
752
753       boolean alive = starter.isAlive();
754       if (alive)
755          starter.interrupt();
756       assertFalse("Starter finished", alive);
757
758       assertNull("No exceptions in starter", starter.failure);
759
760       return receiver;
761    }
762
763    /**
764     * Override the superclass version to set an unlimited state transfer timeout
765     * and a 1 sec lock acquisition timeout.
766     */

767    private CacheSPI createCache(String JavaDoc cacheID,
768                                 String JavaDoc isolationLevel,
769                                 boolean replSync,
770                                 boolean useMarshalling,
771                                 boolean startCache)
772            throws Exception JavaDoc
773    {
774       CacheSPI result = super.createCache(cacheID, replSync,
775               useMarshalling, false, false, false);
776       result.getConfiguration().setInitialStateRetrievalTimeout(0);
777       result.getConfiguration().setLockAcquisitionTimeout(1000);
778       result.getConfiguration().setIsolationLevel(isolationLevel);
779
780       if (startCache)
781          result.start();
782
783       return result;
784    }
785
786
787 }
788
Popular Tags