1 22 package org.jboss.ejb3.cache.tree; 23 24 import java.util.Map ; 25 26 import javax.ejb.EJBException ; 27 import javax.ejb.NoSuchEJBException ; 28 import javax.management.MBeanServer ; 29 import javax.management.ObjectName ; 30 31 import org.jboss.aop.Advisor; 32 import org.jboss.cache.eviction.EvictionPolicyConfig; 33 import org.jboss.cache.eviction.LRUConfiguration; 34 import org.jboss.cache.jmx.CacheJmxWrapperMBean; 35 import org.jboss.cache.AbstractCacheListener; 36 import org.jboss.cache.Cache; 37 import org.jboss.cache.CacheException; 38 import org.jboss.cache.CacheSPI; 39 import org.jboss.cache.InvocationContext; 40 import org.jboss.cache.Region; 41 import org.jboss.ejb3.Container; 42 import org.jboss.ejb3.Pool; 43 import org.jboss.ejb3.cache.ClusteredStatefulCache; 44 import org.jboss.ejb3.stateful.StatefulBeanContext; 45 import org.jboss.mx.util.MBeanProxyExt; 46 import org.jboss.mx.util.MBeanServerLocator; 47 import org.jboss.logging.Logger; 48 import org.jboss.annotation.ejb.cache.tree.CacheConfig; 49 50 import org.jboss.cache.Fqn; 51 import org.jboss.cache.config.Option; 52 53 59 public class StatefulTreeCache implements ClusteredStatefulCache 60 { 61 protected static Logger log = Logger.getLogger(StatefulTreeCache.class); 62 static int FQN_SIZE = 2; 64 private static Option BYPASS_OPTION = new Option(); 65 private static Option LOCAL_ONLY_OPTION = new Option(); 66 static 67 { 68 BYPASS_OPTION.setBypassInterceptorChain(true); 69 LOCAL_ONLY_OPTION.setCacheModeLocal(true); 70 } 71 72 private Pool pool; 73 private Cache cache; 74 private Fqn cacheNode; 75 private Region region; 76 private ClusteredStatefulCacheListener listener; 77 78 public static long MarkInUseWaitTime = 15000; 79 80 public StatefulBeanContext create() 81 { 82 StatefulBeanContext ctx = null; 83 try 84 { 85 ctx = (StatefulBeanContext) pool.get(); 86 cache.put(new Fqn(cacheNode, ctx.getId()), "bean", ctx); 87 ctx.inUse = true; 88 ctx.lastUsed = System.currentTimeMillis(); 89 } 90 catch (EJBException e) 91 { 92 throw e; 93 } 94 catch (Exception e) 95 { 96 throw new EJBException (e); 97 } 98 return ctx; 99 } 100 101 public StatefulBeanContext create(Class [] initTypes, Object [] initValues) 102 { 103 StatefulBeanContext ctx = null; 104 try 105 { 106 ctx = (StatefulBeanContext) pool.get(initTypes, initValues); 107 Fqn id = new Fqn(cacheNode, ctx.getId()); 108 cache.put(id, "bean", ctx); 109 ctx.inUse = true; 110 ctx.lastUsed = System.currentTimeMillis(); 111 } 112 catch (EJBException e) 113 { 114 throw e; 115 } 116 catch (Exception e) 117 { 118 throw new EJBException (e); 119 } 120 return ctx; 121 } 122 123 public StatefulBeanContext get(Object key) throws EJBException 124 { 125 StatefulBeanContext entry = null; 126 Fqn id = new Fqn(cacheNode, key); 127 try 128 { 129 Object obj = cache.get(id, "bean"); 130 entry = (StatefulBeanContext) obj; 131 } 132 catch (CacheException e) 133 { 134 throw new RuntimeException (e); 135 } 136 if (entry == null) 137 { 138 throw new NoSuchEJBException ("Could not find Stateful bean: " + key); 139 } 140 entry.inUse = true; 141 region.markNodeCurrentlyInUse(new Fqn(key), MarkInUseWaitTime); 144 if(log.isDebugEnabled()) 145 { 146 log.debug("get: retrieved bean with cache id " +id.toString()); 147 } 148 149 entry.lastUsed = System.currentTimeMillis(); 150 return entry; 151 } 152 153 public void remove(Object key) 154 { 155 Fqn id = new Fqn(cacheNode, key); 156 try 157 { 158 if(log.isDebugEnabled()) 159 { 160 log.debug("remove: cache id " +id.toString()); 161 } 162 cache.removeNode(id); 163 } 164 catch (CacheException e) 165 { 166 throw new RuntimeException (e); 167 } 168 } 169 170 public void finished(StatefulBeanContext ctx) 171 { 172 synchronized (ctx) 173 { 174 ctx.inUse = false; 175 ctx.lastUsed = System.currentTimeMillis(); 176 region.unmarkNodeCurrentlyInUse(new Fqn(ctx.getId())); 179 } 180 } 181 182 public void replicate(StatefulBeanContext ctx) 183 { 184 try 185 { 186 cache.put(new Fqn(cacheNode, ctx.getId()), "bean", ctx); 187 } 188 catch (CacheException e) 189 { 190 throw new RuntimeException (e); 191 } 192 } 193 194 public void initialize(Container container) throws Exception 195 { 196 Advisor advisor = (Advisor) container; 197 this.pool = container.getPool(); 198 CacheConfig config = (CacheConfig) advisor.resolveAnnotation(CacheConfig.class); 199 MBeanServer server = MBeanServerLocator.locateJBoss(); 200 ObjectName cacheON = new ObjectName (config.name()); 201 CacheJmxWrapperMBean mbean = (CacheJmxWrapperMBean) MBeanProxyExt.create(CacheJmxWrapperMBean.class, cacheON, server); 202 cache = mbean.getCache(); 203 204 cacheNode = Fqn.fromString("/" + container.getEjbName() + "/"); 205 206 region = cache.getRegion(cacheNode, true); 208 EvictionPolicyConfig epc = getEvictionPolicyConfig((int) config.idleTimeoutSeconds(), 209 config.maxSize()); 210 region.setEvictionPolicy(epc); 211 log.debug("initialize(): created eviction region: " +region + " for ejb: " +container.getEjbName()); 215 } 216 217 protected EvictionPolicyConfig getEvictionPolicyConfig(int timeToLiveSeconds, int maxNodes) 218 { 219 LRUConfiguration epc = new LRUConfiguration(); 220 epc.setTimeToLiveSeconds(timeToLiveSeconds); 221 epc.setMaxNodes(maxNodes); 222 return epc; 223 } 224 225 public void start() 226 { 227 listener = new ClusteredStatefulCacheListener(); 231 cache.addCacheListener(listener); 232 } 233 234 public void stop() 235 { 236 cache.removeCacheListener(listener); 238 239 247 ((CacheSPI) cache).getRegionManager().removeRegion(region.getFqn()); 249 region.resetEvictionQueues(); 251 region = null; 252 253 try { 254 InvocationContext ctx = cache.getInvocationContext(); 256 ctx.setOptionOverrides(getLocalOnlyOption()); 257 cache.removeNode(cacheNode); 258 } 259 catch (CacheException e) 260 { 261 log.error("Stop(): can't remove bean from the underlying distributed cache"); 262 } 263 264 log.debug("stop(): StatefulTreeCache stopped successfully for " +cacheNode); 265 } 266 267 272 public class ClusteredStatefulCacheListener extends AbstractCacheListener 273 { 274 protected Logger log = Logger.getLogger(ClusteredStatefulCacheListener.class); 275 276 307 @Override 308 public void nodeLoaded(Fqn fqn, boolean pre, Map nodeData) 309 { 310 if(pre) return; if(fqn.size() != FQN_SIZE) return; 312 if(!fqn.isChildOrEquals(cacheNode)) return; if (nodeData == null) return; 314 315 StatefulBeanContext bean = (StatefulBeanContext) nodeData.get("bean"); 316 317 if(bean == null) 318 { 319 throw new IllegalStateException ("nodeActivate(): null bean instance."); 320 } 321 322 if(log.isTraceEnabled()) 324 { 325 log.trace("nodeActivate(): send postActivate event on fqn: " +fqn); 326 } 327 328 bean.postActivate(); 329 330 } 331 332 @Override 333 public void nodePassivated(Fqn fqn, boolean pre) { 334 if(!pre) return; if(fqn.size() != FQN_SIZE) return; 336 if(!fqn.isChildOrEquals(cacheNode)) return; 338 339 try { 340 InvocationContext ctx = cache.getInvocationContext(); 341 ctx.setOptionOverrides(getBypassOption()); 342 StatefulBeanContext bean = (StatefulBeanContext) cache.get(fqn, "bean"); 343 if (bean != null && !bean.inUse) 344 bean.prePassivate(); 345 346 } catch (CacheException e) { 347 log.error("nodePassivate(): can't retrieve bean instance from: " +fqn + " with exception: " +e); 348 return; 349 } 350 351 if(log.isTraceEnabled()) 353 { 354 log.trace("nodePassivate(): send prePassivate event on fqn: " +fqn); 355 } 356 357 } 358 } 359 360 private Option getBypassOption() 361 { 362 try 363 { 364 return BYPASS_OPTION.clone(); 365 } 366 catch (CloneNotSupportedException e) 367 { 368 throw new RuntimeException (e); 369 } 370 } 371 372 private Option getLocalOnlyOption() 373 { 374 try 375 { 376 return LOCAL_ONLY_OPTION.clone(); 377 } 378 catch (CloneNotSupportedException e) 379 { 380 throw new RuntimeException (e); 381 } 382 } 383 } 384 | Popular Tags |