1 7 package org.jboss.cache.statetransfer; 8 9 import org.apache.commons.logging.Log; 10 import org.apache.commons.logging.LogFactory; 11 import org.jboss.cache.CacheException; 12 import org.jboss.cache.CacheImpl; 13 import org.jboss.cache.Fqn; 14 import org.jboss.cache.Node; 15 import org.jboss.cache.NodeSPI; 16 import org.jboss.cache.Region; 17 import org.jboss.cache.buddyreplication.BuddyManager; 18 import org.jboss.cache.eviction.EvictedEventNode; 19 import org.jboss.cache.eviction.NodeEventType; 20 import org.jboss.cache.factories.NodeFactory; 21 import org.jboss.cache.loader.CacheLoader; 22 import org.jboss.cache.loader.NodeData; 23 import org.jboss.cache.loader.NodeDataExceptionMarker; 24 25 import java.io.IOException ; 26 import java.io.ObjectInputStream ; 27 import java.util.HashSet ; 28 import java.util.Map ; 29 import java.util.Set ; 30 31 public class DefaultStateTransferIntegrator implements StateTransferIntegrator 32 { 33 34 protected Log log = LogFactory.getLog(getClass().getName()); 35 36 private CacheImpl cache; 37 38 private Fqn targetFqn; 39 40 private NodeFactory factory; 41 42 private NodeFactory.NodeType nodeType; 43 44 private Set <Fqn> internalFqns; 45 46 public DefaultStateTransferIntegrator(Fqn targetFqn, CacheImpl cache) 47 { 48 this.targetFqn = targetFqn; 49 this.cache = cache; 50 this.factory = cache.getConfiguration().getRuntimeConfig().getNodeFactory(); 51 this.nodeType = cache.getConfiguration().isNodeLockingOptimistic() 52 ? NodeFactory.NodeType.VERSIONED_NODE 53 : NodeFactory.NodeType.UNVERSIONED_NODE; 54 this.internalFqns = cache.getInternalFqns(); 55 } 56 57 public void integrateState(ObjectInputStream ois, Node target, ClassLoader cl) throws Exception 58 { 59 integrateTransientState(ois, (NodeSPI) target, cl); 60 integrateAssociatedState(ois); 61 integratePersistentState(ois); 62 } 63 64 protected void integrateTransientState(ObjectInputStream in, NodeSPI target, ClassLoader cl) throws Exception 65 { 66 boolean transientSet = false; 67 ClassLoader oldCL = setClassLoader(cl); 68 try 69 { 70 if (log.isTraceEnabled()) 71 { 72 log.trace("integrating transient state for " + target); 73 } 74 75 integrateTransientState(target, in); 76 77 transientSet = true; 78 79 if (log.isTraceEnabled()) 80 { 81 log.trace("transient state successfully integrated"); 82 } 83 84 notifyAllNodesCreated(target); 85 } 86 finally 87 { 88 if (!transientSet) 89 { 90 log.warn("transient state integration failed, removing all children of " + target); 92 target.clearData(); 93 target.removeChildrenDirect(); 94 } 95 96 resetClassLoader(oldCL); 97 } 98 } 99 100 105 protected void integrateAssociatedState(ObjectInputStream in) throws Exception 106 { 107 readNode(in); 110 } 111 112 protected void integratePersistentState(ObjectInputStream in) throws Exception 113 { 114 115 CacheLoader loader = cache.getCacheLoader(); 116 if (loader == null) 117 { 118 if (log.isTraceEnabled()) 119 { 120 log.trace("cache loader is null, will not attempt to integrate persistent state"); 121 } 122 } 123 else 124 { 125 if (log.isTraceEnabled()) 126 { 127 log.trace("integrating persistent state using " + loader.getClass().getName()); 128 } 129 130 boolean persistentSet = false; 131 try 132 { 133 if (targetFqn.isRoot()) 134 { 135 loader.storeEntireState(in); 136 } 137 else 138 { 139 loader.storeState(targetFqn, in); 140 } 141 persistentSet = true; 142 } 143 catch (ClassCastException cce) 144 { 145 log.error("Failed integrating persistent state. One of cacheloaders is not" 146 + " adhering to state stream format. See JBCACHE-738."); 147 throw cce; 148 } 149 finally 150 { 151 if (!persistentSet) 152 { 153 log.warn("persistent state integration failed, removing all nodes from loader"); 154 loader.remove(targetFqn); 155 } 156 else 157 { 158 if (log.isTraceEnabled()) 159 { 160 log.trace("persistent state integrated successfully"); 161 } 162 } 163 } 164 } 165 } 166 167 protected CacheImpl getCache() 168 { 169 return cache; 170 } 171 172 protected NodeFactory getFactory() 173 { 174 return factory; 175 } 176 177 protected NodeFactory.NodeType getNodeType() 178 { 179 return nodeType; 180 } 181 182 protected Fqn getTargetFqn() 183 { 184 return targetFqn; 185 } 186 187 191 private void notifyAllNodesCreated(NodeSPI curr) 192 { 193 if (curr == null) return; 194 getCache().getNotifier().notifyNodeCreated(curr.getFqn(), true, true); 195 getCache().getNotifier().notifyNodeCreated(curr.getFqn(), false, true); 196 Set <NodeSPI> children = curr.getChildrenDirect(); 197 for (NodeSPI n : children) 198 { 199 notifyAllNodesCreated(n); 200 } 201 } 202 203 private ClassLoader setClassLoader(ClassLoader newLoader) 204 { 205 ClassLoader oldClassLoader = null; 206 if (newLoader != null) 207 { 208 oldClassLoader = Thread.currentThread().getContextClassLoader(); 209 Thread.currentThread().setContextClassLoader(newLoader); 210 } 211 return oldClassLoader; 212 } 213 214 private void resetClassLoader(ClassLoader oldLoader) 215 { 216 if (oldLoader != null) 217 { 218 Thread.currentThread().setContextClassLoader(oldLoader); 219 } 220 } 221 222 private void integrateTransientState(NodeSPI target, ObjectInputStream in) throws IOException , 223 ClassNotFoundException 224 { 225 Set <Node> retainedNodes = retainInternalNodes(target); 226 227 target.removeChildrenDirect(); 228 229 NodeData nd = readNode(in); 231 232 if (nd != null && !nd.isMarker()) 234 { 235 target.put(nd.getAttributes()); 236 237 Fqn tferFqn = nd.getFqn(); 240 Fqn tgtFqn = target.getFqn(); 241 boolean move = tgtFqn.isChildOrEquals(BuddyManager.BUDDY_BACKUP_SUBTREE_FQN) 242 && !tferFqn.isChildOrEquals(tgtFqn); 243 int offset = move ? tgtFqn.size() - tferFqn.size() : 0; 245 246 integrateStateTransferChildren(target, offset, in); 247 248 integrateRetainedNodes(target, retainedNodes); 249 } 250 } 251 252 private NodeData readNode(ObjectInputStream in) throws IOException , ClassNotFoundException 253 { 254 NodeData nd = (NodeData) in.readObject(); 255 if (nd != null && nd.isExceptionMarker()) 256 { 257 NodeDataExceptionMarker ndem = (NodeDataExceptionMarker) nd; 258 throw new CacheException("State provider node " + ndem.getCacheNodeIdentity() 259 + " threw exception during loadState", ndem.getCause()); 260 } 261 return nd; 262 } 263 264 private NodeData integrateStateTransferChildren(NodeSPI parent, int offset, ObjectInputStream in) 265 throws IOException , ClassNotFoundException 266 { 267 int parent_level = parent.getFqn().size(); 268 int target_level = parent_level + 1; 269 Fqn fqn; 270 int size; 271 Object name; 272 NodeData nd = readNode(in); 273 while (nd != null && !nd.isMarker()) 274 { 275 fqn = nd.getFqn(); 276 if (offset > 0) 279 { 280 fqn = new Fqn(parent.getFqn().getFqnChild(offset), fqn); 281 } 282 size = fqn.size(); 283 if (size <= parent_level) 284 { 285 return nd; 286 } 287 else if (size > target_level) 288 { 289 throw new IllegalStateException ("NodeData " + fqn + " is not a direct child of " + parent.getFqn()); 290 } 291 292 name = fqn.get(size - 1); 293 294 Map attrs = nd.getAttributes(); 295 296 NodeSPI target = factory.createDataNode(name, fqn, parent, attrs, false); 299 parent.addChild(name, target); 300 301 Region region = cache.getRegion(fqn, false); 303 if (region != null && region.getEvictionPolicy() != null) 304 { 305 region.putNodeEvent(new EvictedEventNode(fqn, NodeEventType.ADD_NODE_EVENT, 306 attrs == null ? 0 : attrs.size())); 307 } 308 309 nd = integrateStateTransferChildren(target, offset, in); 312 } 313 return null; 314 } 315 316 private Set <Node> retainInternalNodes(Node target) 317 { 318 Set <Node> result = new HashSet <Node>(); 319 Fqn targetFqn = target.getFqn(); 320 for (Fqn internalFqn : internalFqns) 321 { 322 if (internalFqn.isChildOf(targetFqn)) 323 { 324 Node internalNode = getInternalNode(target, internalFqn); 325 if (internalNode != null) 326 { 327 result.add(internalNode); 328 } 329 } 330 } 331 332 return result; 333 } 334 335 private Node getInternalNode(Node parent, Fqn internalFqn) 336 { 337 Object name = internalFqn.get(parent.getFqn().size()); 338 Node result = parent.getChild(new Fqn(name)); 339 if (result != null) 340 { 341 if (internalFqn.size() < result.getFqn().size()) 342 { 343 result = getInternalNode(result, internalFqn); 345 } 346 } 347 return result; 348 } 349 350 private void integrateRetainedNodes(NodeSPI root, Set <Node> retainedNodes) 351 { 352 Fqn rootFqn = root.getFqn(); 353 for (Node retained : retainedNodes) 354 { 355 if (retained.getFqn().isChildOf(rootFqn)) 356 { 357 integrateRetainedNode(root, retained); 358 } 359 } 360 } 361 362 private void integrateRetainedNode(NodeSPI ancestor, Node descendant) 363 { 364 Fqn descFqn = descendant.getFqn(); 365 Fqn ancFqn = ancestor.getFqn(); 366 Object name = descFqn.get(ancFqn.size()); 367 NodeSPI child = (NodeSPI) ancestor.getChild(new Fqn(name)); 368 if (ancFqn.size() == descFqn.size() + 1) 369 { 370 if (child == null) 371 { 372 ancestor.addChild(name, descendant); 373 } 374 else 375 { 376 log.warn("Received unexpected internal node " + descFqn + " in transferred state"); 377 } 378 } 379 else 380 { 381 if (child == null) 382 { 383 child = factory.createDataNode(name, new Fqn(ancFqn, name), ancestor, null, true); 387 ancestor.addChild(name, child); 388 } 389 390 integrateRetainedNode(child, descendant); 392 } 393 } 394 } 395 | Popular Tags |