1 22 package org.jboss.aspects.versioned; 23 import java.io.Serializable ; 24 import java.util.Iterator ; 25 import java.util.LinkedHashMap ; 26 import java.util.List ; 27 import java.util.Map ; 28 import java.util.Set ; 29 30 import javax.management.MBeanServer ; 31 import javax.management.ObjectInstance ; 32 import javax.management.Query ; 33 import javax.management.QueryExp ; 34 35 import org.jboss.aop.InstanceAdvised; 36 import org.jboss.ha.framework.interfaces.HAPartition; 37 import org.jboss.ha.framework.interfaces.HAPartition.HAPartitionStateTransfer; 38 import org.jboss.ha.framework.server.ClusterPartitionMBean; 39 import org.jboss.logging.Logger; 40 import org.jboss.mx.util.MBeanProxyExt; 41 import org.jboss.mx.util.MBeanServerLocator; 42 import org.jboss.util.id.GUID; 43 47 public class DistributedTxCache implements HAPartitionStateTransfer 48 { 49 50 private static class LRUCache extends LinkedHashMap 51 { 52 private static final long serialVersionUID = -402696519285213913L; 53 54 private int maxSize; 55 public LRUCache(int max) 56 { 57 super(16, 0.75F, true); 58 this.maxSize = max; 59 } 60 protected boolean removeEldestEntry(Map.Entry eldest) 61 { 62 return this.size() > maxSize; 63 } 64 } 65 66 protected static Logger log = Logger.getLogger(DistributedTxCache.class); 67 protected long lockTimeout; 68 protected DistributedSynchronizationManager synchManager; 69 protected DistributedVersionManager versionManager; 70 protected String partitionName; 71 protected HAPartition partition; 72 protected String cacheName; 73 protected LRUCache cache = null; 74 protected int maxSize; 75 76 public DistributedTxCache(int maxSize, long lockTimeout, String cacheName) 77 { 78 this(maxSize, lockTimeout, cacheName, "DefaultPartition"); 79 } 80 81 public DistributedTxCache(int maxSize, long lockTimeout, String cacheName, String pName) 82 { 83 this.lockTimeout = lockTimeout; 84 this.partitionName = pName; 85 this.maxSize = maxSize; 86 this.cacheName = "DistributedTxCache/" + cacheName; 87 } 88 89 91 protected HAPartition findHAPartitionWithName (String name) throws Exception 92 { 93 HAPartition result = null; 94 MBeanServer server = MBeanServerLocator.locate(); 95 QueryExp exp = Query.and( 99 Query.match( 100 Query.attr("Name"), 101 Query.value("ClusterPartition") 102 ), 103 Query.match( 104 Query.attr("PartitionName"), 105 Query.value(name) 106 ) 107 ); 108 109 Set mbeans = server.queryMBeans (null, exp); 110 if (mbeans != null && mbeans.size () > 0) 111 { 112 for (Iterator iter = mbeans.iterator(); iter.hasNext();) 113 { 114 ObjectInstance inst = (ObjectInstance ) iter.next(); 115 try 116 { 117 ClusterPartitionMBean cp = (ClusterPartitionMBean) MBeanProxyExt.create ( 118 ClusterPartitionMBean.class, 119 inst.getObjectName (), 120 server); 121 result = cp.getHAPartition(); 122 break; 123 } 124 catch (Exception e) {} 125 } 126 } 127 128 return result; 129 } 130 131 public void create() throws Exception 132 { 133 this.partition = findHAPartitionWithName(partitionName); 134 partition.registerRPCHandler(cacheName, this); 140 synchManager = new DistributedSynchronizationManager(cacheName, null, partition); 141 versionManager = new DistributedVersionManager(lockTimeout, synchManager); 142 synchManager.versionManager = versionManager; 143 synchManager.create(); 144 } 145 146 public synchronized void start() throws Exception 147 { 148 synchManager.start(); 149 pullState(); 150 if (cache == null) cache = new LRUCache(maxSize); 151 } 152 153 protected void pullState() throws Exception 154 { 155 Object [] args = {}; 156 List rsp = partition.callMethodOnCluster(cacheName, "getCurrentState", args, true); 157 if (rsp.size() > 0) 158 { 159 setCurrentState((Serializable )rsp.get(0)); 160 } 161 } 162 163 164 public synchronized void _insert(Object key, Object obj) 165 { 166 cache.put(key, obj); 167 } 168 169 public void insert(Object key, Object obj) throws Exception 170 { 171 try 172 { 173 obj = versionManager.makeVersioned(obj); 174 if (versionManager.isVersioned(obj)) 175 { 176 log.trace("Inserting versioned object"); 177 obj = VersionManager.getGUID((InstanceAdvised)obj); 178 } 179 else 180 { 181 log.trace("Inserting a non-Versioned object"); 182 } 183 Object [] args = {key, obj}; 184 partition.callMethodOnCluster(cacheName, "_insert", args, false); 185 } 186 catch (Exception ex) 187 { 188 ex.printStackTrace(); 189 throw ex; 190 } 191 } 192 193 public synchronized void _remove(Object key) 194 { 195 cache.remove(key); 196 } 197 198 public void remove(Object key) 199 { 200 Object [] args = {key}; 201 try 202 { 203 partition.callMethodOnCluster(cacheName, "_remove", args, false); 204 } 205 catch (Exception ex) 206 { 207 throw new RuntimeException (ex); 208 } 209 } 210 211 212 public synchronized void _flush() 213 { 214 cache.clear(); 215 } 216 217 public void flush(Object key) 218 { 219 Object [] args = {}; 220 try 221 { 222 partition.callMethodOnCluster(cacheName, "_flush", args, false); 223 } 224 catch (Exception ex) 225 { 226 throw new RuntimeException (ex); 227 } 228 } 229 230 231 public synchronized Object get(Object key) 232 { 233 Object obj = cache.get(key); 234 if (obj instanceof GUID) 235 { 236 GUID guid = (GUID)obj; 237 obj = synchManager.getObject(guid); 238 } 239 return obj; 240 } 241 242 public Serializable getCurrentState() 243 { 244 log.trace("getCurrentState called on cache"); 245 return cache; 246 } 247 248 public void setCurrentState(Serializable newState) 249 { 250 log.trace("setCurrentState called on cache"); 251 synchronized (this) 252 { 253 this.cache = (LRUCache)newState; 254 } 255 } 256 257 } 258 | Popular Tags |