KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jboss > cache > statetransfer > DefaultStateTransferIntegrator


/*
 * JBoss, the OpenSource J2EE webOS
 *
 * Distributable under LGPL license.
 * See terms of license at gnu.org.
 */

7 package org.jboss.cache.statetransfer;
8
9 import org.apache.commons.logging.Log;
10 import org.apache.commons.logging.LogFactory;
11 import org.jboss.cache.CacheException;
12 import org.jboss.cache.CacheImpl;
13 import org.jboss.cache.Fqn;
14 import org.jboss.cache.Node;
15 import org.jboss.cache.NodeSPI;
16 import org.jboss.cache.Region;
17 import org.jboss.cache.buddyreplication.BuddyManager;
18 import org.jboss.cache.eviction.EvictedEventNode;
19 import org.jboss.cache.eviction.NodeEventType;
20 import org.jboss.cache.factories.NodeFactory;
21 import org.jboss.cache.loader.CacheLoader;
22 import org.jboss.cache.loader.NodeData;
23 import org.jboss.cache.loader.NodeDataExceptionMarker;
24
25 import java.io.IOException JavaDoc;
26 import java.io.ObjectInputStream JavaDoc;
27 import java.util.HashSet JavaDoc;
28 import java.util.Map JavaDoc;
29 import java.util.Set JavaDoc;
30
31 public class DefaultStateTransferIntegrator implements StateTransferIntegrator
32 {
33
34    protected Log log = LogFactory.getLog(getClass().getName());
35
36    private CacheImpl cache;
37
38    private Fqn targetFqn;
39
40    private NodeFactory factory;
41
42    private NodeFactory.NodeType nodeType;
43
44    private Set JavaDoc<Fqn> internalFqns;
45
46    public DefaultStateTransferIntegrator(Fqn targetFqn, CacheImpl cache)
47    {
48       this.targetFqn = targetFqn;
49       this.cache = cache;
50       this.factory = cache.getConfiguration().getRuntimeConfig().getNodeFactory();
51       this.nodeType = cache.getConfiguration().isNodeLockingOptimistic()
52               ? NodeFactory.NodeType.VERSIONED_NODE
53               : NodeFactory.NodeType.UNVERSIONED_NODE;
54       this.internalFqns = cache.getInternalFqns();
55    }
56
57    public void integrateState(ObjectInputStream JavaDoc ois, Node target, ClassLoader JavaDoc cl) throws Exception JavaDoc
58    {
59       integrateTransientState(ois, (NodeSPI) target, cl);
60       integrateAssociatedState(ois);
61       integratePersistentState(ois);
62    }
63
64    protected void integrateTransientState(ObjectInputStream JavaDoc in, NodeSPI target, ClassLoader JavaDoc cl) throws Exception JavaDoc
65    {
66       boolean transientSet = false;
67       ClassLoader JavaDoc oldCL = setClassLoader(cl);
68       try
69       {
70          if (log.isTraceEnabled())
71          {
72             log.trace("integrating transient state for " + target);
73          }
74
75          integrateTransientState(target, in);
76
77          transientSet = true;
78
79          if (log.isTraceEnabled())
80          {
81             log.trace("transient state successfully integrated");
82          }
83
84          notifyAllNodesCreated(target);
85       }
86       finally
87       {
88          if (!transientSet)
89          {
90             // Clear any existing state from the targetRoot
91
log.warn("transient state integration failed, removing all children of " + target);
92             target.clearData();
93             target.removeChildrenDirect();
94          }
95
96          resetClassLoader(oldCL);
97       }
98    }
99
100    /**
101     * Provided for subclasses that deal with associated state.
102     *
103     * @throws Exception
104     */

105    protected void integrateAssociatedState(ObjectInputStream JavaDoc in) throws Exception JavaDoc
106    {
107       // no-op in this base class
108
// just read marker
109
readNode(in);
110    }
111
112    protected void integratePersistentState(ObjectInputStream JavaDoc in) throws Exception JavaDoc
113    {
114
115       CacheLoader loader = cache.getCacheLoader();
116       if (loader == null)
117       {
118          if (log.isTraceEnabled())
119          {
120             log.trace("cache loader is null, will not attempt to integrate persistent state");
121          }
122       }
123       else
124       {
125          if (log.isTraceEnabled())
126          {
127             log.trace("integrating persistent state using " + loader.getClass().getName());
128          }
129
130          boolean persistentSet = false;
131          try
132          {
133             if (targetFqn.isRoot())
134             {
135                loader.storeEntireState(in);
136             }
137             else
138             {
139                loader.storeState(targetFqn, in);
140             }
141             persistentSet = true;
142          }
143          catch (ClassCastException JavaDoc cce)
144          {
145             log.error("Failed integrating persistent state. One of cacheloaders is not"
146                     + " adhering to state stream format. See JBCACHE-738.");
147             throw cce;
148          }
149          finally
150          {
151             if (!persistentSet)
152             {
153                log.warn("persistent state integration failed, removing all nodes from loader");
154                loader.remove(targetFqn);
155             }
156             else
157             {
158                if (log.isTraceEnabled())
159                {
160                   log.trace("persistent state integrated successfully");
161                }
162             }
163          }
164       }
165    }
166
167    protected CacheImpl getCache()
168    {
169       return cache;
170    }
171
172    protected NodeFactory getFactory()
173    {
174       return factory;
175    }
176
177    protected NodeFactory.NodeType getNodeType()
178    {
179       return nodeType;
180    }
181
182    protected Fqn getTargetFqn()
183    {
184       return targetFqn;
185    }
186
187    /**
188     * Generates NodeAdded notifications for all nodes of the tree. This is
189     * called whenever the tree is initially retrieved (state transfer)
190     */

191    private void notifyAllNodesCreated(NodeSPI curr)
192    {
193       if (curr == null) return;
194       getCache().getNotifier().notifyNodeCreated(curr.getFqn(), true, true);
195       getCache().getNotifier().notifyNodeCreated(curr.getFqn(), false, true);
196       Set JavaDoc<NodeSPI> children = curr.getChildrenDirect();
197       for (NodeSPI n : children)
198       {
199          notifyAllNodesCreated(n);
200       }
201    }
202
203    private ClassLoader JavaDoc setClassLoader(ClassLoader JavaDoc newLoader)
204    {
205       ClassLoader JavaDoc oldClassLoader = null;
206       if (newLoader != null)
207       {
208          oldClassLoader = Thread.currentThread().getContextClassLoader();
209          Thread.currentThread().setContextClassLoader(newLoader);
210       }
211       return oldClassLoader;
212    }
213
214    private void resetClassLoader(ClassLoader JavaDoc oldLoader)
215    {
216       if (oldLoader != null)
217       {
218          Thread.currentThread().setContextClassLoader(oldLoader);
219       }
220    }
221
222    private void integrateTransientState(NodeSPI target, ObjectInputStream JavaDoc in) throws IOException JavaDoc,
223            ClassNotFoundException JavaDoc
224    {
225       Set JavaDoc<Node> retainedNodes = retainInternalNodes(target);
226
227       target.removeChildrenDirect();
228
229       // Read the first NodeData and integrate into our target
230
NodeData nd = readNode(in);
231
232       //are there any transient nodes at all?
233
if (nd != null && !nd.isMarker())
234       {
235          target.put(nd.getAttributes());
236
237          // Check whether this is an integration into the buddy backup
238
// subtree
239
Fqn tferFqn = nd.getFqn();
240          Fqn tgtFqn = target.getFqn();
241          boolean move = tgtFqn.isChildOrEquals(BuddyManager.BUDDY_BACKUP_SUBTREE_FQN)
242                  && !tferFqn.isChildOrEquals(tgtFqn);
243          // If it is an integration, calculate how many levels of offset
244
int offset = move ? tgtFqn.size() - tferFqn.size() : 0;
245
246          integrateStateTransferChildren(target, offset, in);
247
248          integrateRetainedNodes(target, retainedNodes);
249       }
250    }
251
252    private NodeData readNode(ObjectInputStream JavaDoc in) throws IOException JavaDoc, ClassNotFoundException JavaDoc
253    {
254       NodeData nd = (NodeData) in.readObject();
255       if (nd != null && nd.isExceptionMarker())
256       {
257          NodeDataExceptionMarker ndem = (NodeDataExceptionMarker) nd;
258          throw new CacheException("State provider node " + ndem.getCacheNodeIdentity()
259                  + " threw exception during loadState", ndem.getCause());
260       }
261       return nd;
262    }
263
264    private NodeData integrateStateTransferChildren(NodeSPI parent, int offset, ObjectInputStream JavaDoc in)
265            throws IOException JavaDoc, ClassNotFoundException JavaDoc
266    {
267       int parent_level = parent.getFqn().size();
268       int target_level = parent_level + 1;
269       Fqn fqn;
270       int size;
271       Object JavaDoc name;
272       NodeData nd = readNode(in);
273       while (nd != null && !nd.isMarker())
274       {
275          fqn = nd.getFqn();
276          // If we need to integrate into the buddy backup subtree,
277
// change the Fqn to fit under it
278
if (offset > 0)
279          {
280             fqn = new Fqn(parent.getFqn().getFqnChild(offset), fqn);
281          }
282          size = fqn.size();
283          if (size <= parent_level)
284          {
285             return nd;
286          }
287          else if (size > target_level)
288          {
289             throw new IllegalStateException JavaDoc("NodeData " + fqn + " is not a direct child of " + parent.getFqn());
290          }
291
292          name = fqn.get(size - 1);
293
294          Map JavaDoc attrs = nd.getAttributes();
295
296          // We handle this NodeData. Create a TreeNode and
297
// integrate its data
298
NodeSPI target = factory.createDataNode(name, fqn, parent, attrs, false);
299          parent.addChild(name, target);
300
301          // JBCACHE-913
302
Region region = cache.getRegion(fqn, false);
303          if (region != null && region.getEvictionPolicy() != null)
304          {
305             region.putNodeEvent(new EvictedEventNode(fqn, NodeEventType.ADD_NODE_EVENT,
306                     attrs == null ? 0 : attrs.size()));
307          }
308
309          // Recursively call, which will walk down the tree
310
// and return the next NodeData that's a child of our parent
311
nd = integrateStateTransferChildren(target, offset, in);
312       }
313       return null;
314    }
315
316    private Set JavaDoc<Node> retainInternalNodes(Node target)
317    {
318       Set JavaDoc<Node> result = new HashSet JavaDoc<Node>();
319       Fqn targetFqn = target.getFqn();
320       for (Fqn internalFqn : internalFqns)
321       {
322          if (internalFqn.isChildOf(targetFqn))
323          {
324             Node internalNode = getInternalNode(target, internalFqn);
325             if (internalNode != null)
326             {
327                result.add(internalNode);
328             }
329          }
330       }
331
332       return result;
333    }
334
335    private Node getInternalNode(Node parent, Fqn internalFqn)
336    {
337       Object JavaDoc name = internalFqn.get(parent.getFqn().size());
338       Node result = parent.getChild(new Fqn(name));
339       if (result != null)
340       {
341          if (internalFqn.size() < result.getFqn().size())
342          {
343             // need to recursively walk down the tree
344
result = getInternalNode(result, internalFqn);
345          }
346       }
347       return result;
348    }
349
350    private void integrateRetainedNodes(NodeSPI root, Set JavaDoc<Node> retainedNodes)
351    {
352       Fqn rootFqn = root.getFqn();
353       for (Node retained : retainedNodes)
354       {
355          if (retained.getFqn().isChildOf(rootFqn))
356          {
357             integrateRetainedNode(root, retained);
358          }
359       }
360    }
361
362    private void integrateRetainedNode(NodeSPI ancestor, Node descendant)
363    {
364       Fqn descFqn = descendant.getFqn();
365       Fqn ancFqn = ancestor.getFqn();
366       Object JavaDoc name = descFqn.get(ancFqn.size());
367       NodeSPI child = (NodeSPI) ancestor.getChild(new Fqn(name));
368       if (ancFqn.size() == descFqn.size() + 1)
369       {
370          if (child == null)
371          {
372             ancestor.addChild(name, descendant);
373          }
374          else
375          {
376             log.warn("Received unexpected internal node " + descFqn + " in transferred state");
377          }
378       }
379       else
380       {
381          if (child == null)
382          {
383             // Missing level -- have to create empty node
384
// This shouldn't really happen -- internal fqns should
385
// be immediately under the root
386
child = factory.createDataNode(name, new Fqn(ancFqn, name), ancestor, null, true);
387             ancestor.addChild(name, child);
388          }
389
390          // Keep walking down the tree
391
integrateRetainedNode(child, descendant);
392       }
393    }
394 }
395
Popular Tags