1 7 package org.jboss.cache.statetransfer; 8 9 import org.apache.commons.logging.Log; 10 import org.apache.commons.logging.LogFactory; 11 import org.jboss.cache.CacheException; 12 import org.jboss.cache.CacheImpl; 13 import org.jboss.cache.Fqn; 14 import org.jboss.cache.Node; 15 import org.jboss.cache.NodeSPI; 16 import org.jboss.cache.config.Option; 17 import org.jboss.cache.loader.CacheLoaderManager; 18 import org.jboss.cache.loader.NodeData; 19 import org.jboss.cache.loader.NodeDataMarker; 20 import org.jboss.cache.lock.NodeLock; 21 import org.jboss.cache.lock.TimeoutException; 22 import org.jboss.cache.marshall.VersionAwareMarshaller; 23 24 import java.io.ObjectInputStream ; 25 import java.io.ObjectOutputStream ; 26 27 28 public class StateTransferManager 29 { 30 protected final static Log log = LogFactory.getLog(StateTransferManager.class); 31 32 public static final NodeData STREAMING_DELIMETER_NODE = new NodeDataMarker(); 33 34 public static final String PARTIAL_STATE_DELIMETER = "_PARTIAL_STATE_DELIMETER"; 35 36 private final CacheImpl cache; 37 38 public StateTransferManager(CacheImpl cache) 39 { 40 this.cache = cache; 41 } 42 43 public CacheImpl getTreeCache() 44 { 45 return cache; 46 } 47 48 67 public void getState(ObjectOutputStream out, Fqn fqn, long timeout, boolean force, boolean suppressErrors) throws Throwable 68 { 69 VersionAwareMarshaller marshaller = cache.getMarshaller(); 70 71 boolean canProvideState = (!marshaller.isInactive(fqn.toString()) && cache.findNode(fqn) != null); 73 74 boolean fetchTransientState = cache.getConfiguration().isFetchInMemoryState(); 75 CacheLoaderManager cacheLoaderManager = cache.getCacheLoaderManager(); 76 boolean fetchPersistentState = cacheLoaderManager != null && cacheLoaderManager.isFetchPersistentState(); 77 78 if (canProvideState && (fetchPersistentState || fetchTransientState)) 79 { 80 out.writeBoolean(true); 81 StateTransferGenerator generator = getStateTransferGenerator(); 82 Object owner = getOwnerForLock(); 83 long startTime = System.currentTimeMillis(); 84 NodeSPI rootNode = cache.findNode(fqn); 85 86 try 87 { 88 log.info("locking the " + fqn + " subtree to return the in-memory (transient) state"); 89 acquireLocksForStateTransfer(rootNode, owner, timeout, true, force); 90 generator.generateState(out, rootNode, fetchTransientState, fetchPersistentState, suppressErrors); 91 log.info("Successfully generated state in " + (System.currentTimeMillis() - startTime) + " msec"); 92 } 93 finally 94 { 95 releaseStateTransferLocks(rootNode, owner, true); 96 } 97 } 98 else 99 { 100 out.writeBoolean(false); 101 Exception e = null; 102 if (!canProvideState) 103 { 104 String exceptionMessage = "Cache instance at " + cache.getLocalAddress() + " cannot provide state for fqn " + fqn + "."; 105 106 if (marshaller.isInactive(fqn.toString())) 107 { 108 exceptionMessage += " Region for fqn " + fqn + " is inactive."; 109 } 110 if (cache.findNode(fqn) == null) 111 { 112 exceptionMessage += " There is no cache node at fqn " + fqn; 113 } 114 e = new CacheException(exceptionMessage); 115 } 116 if (!fetchPersistentState && !fetchTransientState) 117 { 118 e = new CacheException("Cache instance at " + cache.getLocalAddress() + " is not configured to provide state"); 119 } 120 out.writeObject(e); 121 throw e; 122 } 123 } 124 125 140 public void loadState(Fqn subtreeRoot, Node integrationRoot, 141 Object [] sources, ClassLoader cl) 142 throws Exception 143 { 144 cache.fetchPartialState(sources, subtreeRoot, integrationRoot.getFqn()); 145 } 146 147 163 public void setState(ObjectInputStream in, Fqn targetRoot, ClassLoader cl) throws Exception 164 { 165 CacheImpl cache = getTreeCache(); 166 NodeSPI target = cache.findNode(targetRoot); 167 if (target == null) 168 { 169 cache.getInvocationContext().getOptionOverrides().setCacheModeLocal(true); 171 cache.put(targetRoot, null); 172 target = cache.findNode(targetRoot); 173 } 174 boolean hasState = in.readBoolean(); 175 if (hasState) 176 { 177 setState(in, target, cl); 178 } 179 else 180 { 181 throw new CacheException("Cache instance at " + cache.getLocalAddress() 182 + " cannot integrate state since state provider could not provide state due to " + in.readObject()); 183 } 184 } 185 186 202 private void setState(ObjectInputStream state, NodeSPI targetRoot, ClassLoader cl) throws Exception 203 { 204 Object owner = getOwnerForLock(); 205 long timeout = cache.getConfiguration().getInitialStateRetrievalTimeout(); 206 long startTime = System.currentTimeMillis(); 207 208 try 209 { 210 acquireLocksForStateTransfer(targetRoot, owner, timeout, true, true); 212 213 221 222 Option option = new Option(); 223 option.setBypassInterceptorChain(true); 224 cache.getInvocationContext().setOptionOverrides(option); 225 226 StateTransferIntegrator integrator = getStateTransferIntegrator(state, targetRoot.getFqn()); 227 log.info("starting state integration at node " + targetRoot); 228 integrator.integrateState(state, targetRoot, cl); 229 log.info("successfully integrated state in " + (System.currentTimeMillis() - startTime) + " msec"); 230 } 231 finally 232 { 233 releaseStateTransferLocks(targetRoot, owner, true); 234 } 235 } 236 237 238 241 protected void acquireLocksForStateTransfer(NodeSPI root, 242 Object lockOwner, 243 long timeout, 244 boolean lockChildren, 245 boolean force) 246 throws Exception 247 { 248 try 249 { 250 if (lockChildren) 251 { 252 root.getLock().acquireAll(lockOwner, timeout, NodeLock.LockType.READ); 253 } 254 else 255 { 256 root.getLock().acquire(lockOwner, timeout, NodeLock.LockType.READ); 257 } 258 } 259 catch (TimeoutException te) 260 { 261 log.error("Caught TimeoutException acquiring locks on region " + 262 root.getFqn(), te); 263 if (force) 264 { 265 throw te; 268 269 } 270 else 271 { 272 throw te; 273 } 274 } 275 } 276 277 282 protected void releaseStateTransferLocks(NodeSPI root, 283 Object lockOwner, 284 boolean childrenLocked) 285 { 286 try 287 { 288 if (childrenLocked) 289 { 290 root.getLock().releaseAll(lockOwner); 291 } 292 else 293 { 294 root.getLock().release(lockOwner); 295 } 296 } 297 catch (Throwable t) 298 { 299 log.error("failed releasing locks", t); 300 } 301 } 302 303 protected StateTransferGenerator getStateTransferGenerator() 304 { 305 return StateTransferFactory.getStateTransferGenerator(getTreeCache()); 306 } 307 308 protected StateTransferIntegrator getStateTransferIntegrator(ObjectInputStream istream, Fqn fqn) throws Exception 309 { 310 return StateTransferFactory.getStateTransferIntegrator(istream, fqn, getTreeCache()); 311 } 312 313 317 private Object getOwnerForLock() 318 { 319 Object owner = getTreeCache().getCurrentTransaction(); 320 if (owner == null) 321 { 322 owner = Thread.currentThread(); 323 } 324 return owner; 325 } 326 } 327 | Popular Tags |