KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jboss > cache > statetransfer > StateTransferManager


1 /*
2  * JBoss, the OpenSource J2EE webOS
3  *
4  * Distributable under LGPL license.
5  * See terms of license at gnu.org.
6  */

7 package org.jboss.cache.statetransfer;
8
9 import org.apache.commons.logging.Log;
10 import org.apache.commons.logging.LogFactory;
11 import org.jboss.cache.CacheException;
12 import org.jboss.cache.CacheImpl;
13 import org.jboss.cache.Fqn;
14 import org.jboss.cache.Node;
15 import org.jboss.cache.NodeSPI;
16 import org.jboss.cache.config.Option;
17 import org.jboss.cache.loader.CacheLoaderManager;
18 import org.jboss.cache.loader.NodeData;
19 import org.jboss.cache.loader.NodeDataMarker;
20 import org.jboss.cache.lock.NodeLock;
21 import org.jboss.cache.lock.TimeoutException;
22 import org.jboss.cache.marshall.VersionAwareMarshaller;
23
24 import java.io.ObjectInputStream JavaDoc;
25 import java.io.ObjectOutputStream JavaDoc;
26
27
28 public class StateTransferManager
29 {
30    protected final static Log log = LogFactory.getLog(StateTransferManager.class);
31
32    public static final NodeData STREAMING_DELIMETER_NODE = new NodeDataMarker();
33
34    public static final String JavaDoc PARTIAL_STATE_DELIMETER = "_PARTIAL_STATE_DELIMETER";
35
36    private final CacheImpl cache;
37
38    public StateTransferManager(CacheImpl cache)
39    {
40       this.cache = cache;
41    }
42
43    public CacheImpl getTreeCache()
44    {
45       return cache;
46    }
47
48    /**
49     * Writes the state for the portion of the tree named by <code>fqn</code> to
50     * the provided OutputStream.
51     * <p/>
52     * <p/>
53     *
54     * @param fqn Fqn indicating the uppermost node in the
55     * portion of the tree whose state should be returned.
56     * @param timeout max number of ms this method should wait to acquire
57     * a read lock on the nodes being transferred
58     * @param force if a read lock cannot be acquired after
59     * <code>timeout</code> ms, should the lock acquisition
60     * be forced, and any existing transactions holding locks
61     * on the nodes be rolled back? <strong>NOTE:</strong>
62     * In release 1.2.4, this parameter has no effect.
63     * @param suppressErrors should any Throwable thrown be suppressed?
64     * @return a serialized byte[][], element 0 is the transient state
65     * (or null), and element 1 is the persistent state (or null).
66     */

67    public void getState(ObjectOutputStream JavaDoc out, Fqn fqn, long timeout, boolean force, boolean suppressErrors) throws Throwable JavaDoc
68    {
69       VersionAwareMarshaller marshaller = cache.getMarshaller();
70
71       // can't give state for regions currently being activated/inactivated
72
boolean canProvideState = (!marshaller.isInactive(fqn.toString()) && cache.findNode(fqn) != null);
73
74       boolean fetchTransientState = cache.getConfiguration().isFetchInMemoryState();
75       CacheLoaderManager cacheLoaderManager = cache.getCacheLoaderManager();
76       boolean fetchPersistentState = cacheLoaderManager != null && cacheLoaderManager.isFetchPersistentState();
77
78       if (canProvideState && (fetchPersistentState || fetchTransientState))
79       {
80          out.writeBoolean(true);
81          StateTransferGenerator generator = getStateTransferGenerator();
82          Object JavaDoc owner = getOwnerForLock();
83          long startTime = System.currentTimeMillis();
84          NodeSPI rootNode = cache.findNode(fqn);
85
86          try
87          {
88             log.info("locking the " + fqn + " subtree to return the in-memory (transient) state");
89             acquireLocksForStateTransfer(rootNode, owner, timeout, true, force);
90             generator.generateState(out, rootNode, fetchTransientState, fetchPersistentState, suppressErrors);
91             log.info("Successfully generated state in " + (System.currentTimeMillis() - startTime) + " msec");
92          }
93          finally
94          {
95             releaseStateTransferLocks(rootNode, owner, true);
96          }
97       }
98       else
99       {
100          out.writeBoolean(false);
101          Exception JavaDoc e = null;
102          if (!canProvideState)
103          {
104             String JavaDoc exceptionMessage = "Cache instance at " + cache.getLocalAddress() + " cannot provide state for fqn " + fqn + ".";
105
106             if (marshaller.isInactive(fqn.toString()))
107             {
108                exceptionMessage += " Region for fqn " + fqn + " is inactive.";
109             }
110             if (cache.findNode(fqn) == null)
111             {
112                exceptionMessage += " There is no cache node at fqn " + fqn;
113             }
114             e = new CacheException(exceptionMessage);
115          }
116          if (!fetchPersistentState && !fetchTransientState)
117          {
118             e = new CacheException("Cache instance at " + cache.getLocalAddress() + " is not configured to provide state");
119          }
120          out.writeObject(e);
121          throw e;
122       }
123    }
124
125    /**
126     * Requests state from each of the given source nodes in the cluster
127     * until it gets it or no node replies with a timeout exception. If state
128     * is returned, integrates it into the given DataNode. If no state is
129     * returned but a node replies with a timeout exception, the calls will be
130     * repeated with a longer timeout, until 3 attempts have been made.
131     *
132     * @param subtreeRoot Fqn of the topmost node in the subtree whose
133     * state should be transferred.
134     * @param integrationRoot the DataNode into which state should be integrated
135     * @param sources the cluster nodes to query for state
136     * @param cl the classloader to use to unmarshal the state.
137     * Can be <code>null</code>.
138     * @throws Exception
139     */

140    public void loadState(Fqn subtreeRoot, Node integrationRoot,
141                          Object JavaDoc[] sources, ClassLoader JavaDoc cl)
142            throws Exception JavaDoc
143    {
144       cache.fetchPartialState(sources, subtreeRoot, integrationRoot.getFqn());
145    }
146
147    /**
148     * Set the portion of the cache rooted in <code>targetRoot</code>
149     * to match the given state. Updates the contents of <code>targetRoot</code>
150     * to reflect those in <code>new_state</code>.
151     * <p/>
152     * <strong>NOTE:</strong> This method performs no locking of nodes; it
153     * is up to the caller to lock <code>targetRoot</code> before calling
154     * this method.
155     *
156     * @param in a serialized byte[][] array where element 0 is the
157     * transient state (or null) , and element 1 is the
158     * persistent state (or null)
159     * @param targetRoot fqn of the node into which the state should be integrated
160     * @param cl classloader to use to unmarshal the state, or
161     * <code>null</code> if the TCCL should be used
162     */

163    public void setState(ObjectInputStream JavaDoc in, Fqn targetRoot, ClassLoader JavaDoc cl) throws Exception JavaDoc
164    {
165       CacheImpl cache = getTreeCache();
166       NodeSPI target = cache.findNode(targetRoot);
167       if (target == null)
168       {
169          // Create the integration root, but do not replicate
170
cache.getInvocationContext().getOptionOverrides().setCacheModeLocal(true);
171          cache.put(targetRoot, null);
172          target = cache.findNode(targetRoot);
173       }
174       boolean hasState = in.readBoolean();
175       if (hasState)
176       {
177          setState(in, target, cl);
178       }
179       else
180       {
181          throw new CacheException("Cache instance at " + cache.getLocalAddress()
182                  + " cannot integrate state since state provider could not provide state due to " + in.readObject());
183       }
184    }
185
186    /**
187     * Set the portion of the cache rooted in <code>targetRoot</code>
188     * to match the given state. Updates the contents of <code>targetRoot</code>
189     * to reflect those in <code>new_state</code>.
190     * <p/>
191     * <strong>NOTE:</strong> This method performs no locking of nodes; it
192     * is up to the caller to lock <code>targetRoot</code> before calling
193     * this method.
194     *
195     * @param state a serialized byte[][] array where element 0 is the
196     * transient state (or null) , and element 1 is the
197     * persistent state (or null)
198     * @param targetRoot node into which the state should be integrated
199     * @param cl classloader to use to unmarshal the state, or
200     * <code>null</code> if the TCCL should be used
201     */

202    private void setState(ObjectInputStream JavaDoc state, NodeSPI targetRoot, ClassLoader JavaDoc cl) throws Exception JavaDoc
203    {
204       Object JavaDoc owner = getOwnerForLock();
205       long timeout = cache.getConfiguration().getInitialStateRetrievalTimeout();
206       long startTime = System.currentTimeMillis();
207
208       try
209       {
210          // Acquire a lock on the root node
211
acquireLocksForStateTransfer(targetRoot, owner, timeout, true, true);
212
213          /*
214           * Vladimir/Manik/Brian (Dec 7,2006)
215           *
216           * integrator.integrateState(in,targetRoot, cl) will call cache.put for each
217           * node read from stream. Having option override below allows nodes read
218           * to be directly stored into a tree since we bypass interceptor chain.
219           *
220           */

221
222          Option option = new Option();
223          option.setBypassInterceptorChain(true);
224          cache.getInvocationContext().setOptionOverrides(option);
225
226          StateTransferIntegrator integrator = getStateTransferIntegrator(state, targetRoot.getFqn());
227          log.info("starting state integration at node " + targetRoot);
228          integrator.integrateState(state, targetRoot, cl);
229          log.info("successfully integrated state in " + (System.currentTimeMillis() - startTime) + " msec");
230       }
231       finally
232       {
233          releaseStateTransferLocks(targetRoot, owner, true);
234       }
235    }
236
237
238    /**
239     * Acquires locks on a root node for an owner for state transfer.
240     */

241    protected void acquireLocksForStateTransfer(NodeSPI root,
242                                                Object JavaDoc lockOwner,
243                                                long timeout,
244                                                boolean lockChildren,
245                                                boolean force)
246            throws Exception JavaDoc
247    {
248       try
249       {
250          if (lockChildren)
251          {
252             root.getLock().acquireAll(lockOwner, timeout, NodeLock.LockType.READ);
253          }
254          else
255          {
256             root.getLock().acquire(lockOwner, timeout, NodeLock.LockType.READ);
257          }
258       }
259       catch (TimeoutException te)
260       {
261          log.error("Caught TimeoutException acquiring locks on region " +
262                  root.getFqn(), te);
263          if (force)
264          {
265             // Until we have FLUSH in place, don't force locks
266
// forceAcquireLock(root, lockOwner, lockChildren);
267
throw te;
268
269          }
270          else
271          {
272             throw te;
273          }
274       }
275    }
276
277    /**
278     * Releases all state transfer locks acquired.
279     *
280     * @see #acquireLocksForStateTransfer
281     */

282    protected void releaseStateTransferLocks(NodeSPI root,
283                                             Object JavaDoc lockOwner,
284                                             boolean childrenLocked)
285    {
286       try
287       {
288          if (childrenLocked)
289          {
290             root.getLock().releaseAll(lockOwner);
291          }
292          else
293          {
294             root.getLock().release(lockOwner);
295          }
296       }
297       catch (Throwable JavaDoc t)
298       {
299          log.error("failed releasing locks", t);
300       }
301    }
302
303    protected StateTransferGenerator getStateTransferGenerator()
304    {
305       return StateTransferFactory.getStateTransferGenerator(getTreeCache());
306    }
307
308    protected StateTransferIntegrator getStateTransferIntegrator(ObjectInputStream JavaDoc istream, Fqn fqn) throws Exception JavaDoc
309    {
310       return StateTransferFactory.getStateTransferIntegrator(istream, fqn, getTreeCache());
311    }
312
313    /**
314     * Returns an object suitable for use in node locking, either the current
315     * transaction or the current thread if there is no transaction.
316     */

317    private Object JavaDoc getOwnerForLock()
318    {
319       Object JavaDoc owner = getTreeCache().getCurrentTransaction();
320       if (owner == null)
321       {
322          owner = Thread.currentThread();
323       }
324       return owner;
325    }
326 }
327
Popular Tags