1 7 package org.jboss.cache.loader; 8 9 import org.jboss.cache.CacheImpl; 10 import org.jboss.cache.CacheSPI; 11 import org.jboss.cache.Fqn; 12 import org.jboss.cache.NodeSPI; 13 import org.jboss.cache.config.CacheLoaderConfig.IndividualCacheLoaderConfig; 14 import org.jboss.cache.lock.TimeoutException; 15 import org.jboss.cache.marshall.MethodCall; 16 import org.jboss.cache.marshall.MethodCallFactory; 17 import org.jgroups.Address; 18 19 import java.io.ObjectInputStream ; 20 import java.io.ObjectOutputStream ; 21 import java.lang.reflect.Method ; 22 import java.util.List ; 23 import java.util.Map ; 24 import java.util.Set ; 25 import java.util.Vector ; 26 27 39 public class RpcDelegatingCacheLoader extends DelegatingCacheLoader 40 { 41 42 private RpcDelegatingCacheLoaderConfig config = new RpcDelegatingCacheLoaderConfig(); 43 private Address localAddress; 44 45 public static final Method METHOD_GET_STATE; 46 public static final Method METHOD_SET_STATE; 47 public static final Method METHOD_GET_CHILDREN_NAMES; 48 public static final Method METHOD_GET_WITH_2_PARAMS; 49 public static final Method METHOD_GET_WITH_1_PARAM; 50 public static final Method METHOD_EXISTS; 51 public static final Method METHOD_PUT_WITH_3_PARAMS; 52 public static final Method METHOD_PUT_WITH_2_PARAMS; 53 public static final Method METHOD_REMOVE_WITH_2_PARAMS; 54 public static final Method METHOD_REMOVE_WITH_1_PARAM; 55 public static final Method METHOD_REMOVE_DATA; 56 57 61 static 62 { 63 try 64 { 65 METHOD_GET_STATE = CacheImpl.class.getDeclaredMethod("getStateBytes", new Class []{}); 66 METHOD_SET_STATE = CacheImpl.class.getDeclaredMethod("setStateBytes", new Class []{byte[].class}); 67 METHOD_GET_CHILDREN_NAMES = CacheImpl.class.getDeclaredMethod("getChildrenNames", new Class []{Fqn.class}); 68 METHOD_GET_WITH_2_PARAMS = CacheImpl.class.getDeclaredMethod("get", new Class []{Fqn.class, Object .class}); 69 METHOD_GET_WITH_1_PARAM = CacheImpl.class.getDeclaredMethod("get", new Class []{Fqn.class}); 70 METHOD_EXISTS = CacheImpl.class.getDeclaredMethod("exists", new Class []{Fqn.class}); 71 METHOD_PUT_WITH_3_PARAMS = CacheImpl.class.getDeclaredMethod("put", new Class []{Fqn.class, Object .class, Object .class}); 72 METHOD_PUT_WITH_2_PARAMS = CacheImpl.class.getDeclaredMethod("put", new Class []{Fqn.class, Map .class}); 73 METHOD_REMOVE_WITH_2_PARAMS = CacheImpl.class.getDeclaredMethod("remove", new Class []{Fqn.class, Object .class}); 74 METHOD_REMOVE_WITH_1_PARAM = CacheImpl.class.getDeclaredMethod("remove", new Class []{Fqn.class}); 75 METHOD_REMOVE_DATA = CacheImpl.class.getDeclaredMethod("removeData", new Class []{Fqn.class}); 76 } 77 catch (NoSuchMethodException ex) 78 { 79 throw new ExceptionInInitializerError (ex); 80 } 81 } 82 83 86 public RpcDelegatingCacheLoader() 87 { 88 } 90 91 96 public RpcDelegatingCacheLoader(CacheSPI cache, int timeout) 97 { 98 setCache(cache); 99 config.setTimeout(timeout); 100 } 101 102 105 public void setConfig(IndividualCacheLoaderConfig base) 106 { 107 if (base instanceof RpcDelegatingCacheLoaderConfig) 108 { 109 this.config = (RpcDelegatingCacheLoaderConfig) base; 110 } 111 else 112 { 113 this.config = new RpcDelegatingCacheLoaderConfig(base); 114 } 115 } 116 117 public IndividualCacheLoaderConfig getConfig() 118 { 119 return config; 120 } 121 122 125 protected Set delegateGetChildrenNames(Fqn name) throws Exception 126 { 127 return (Set ) this.doMethodCall(METHOD_GET_CHILDREN_NAMES, new Object []{name}); 128 } 129 130 134 138 141 protected Map delegateGet(Fqn name) throws Exception 142 { 143 NodeSPI n = (NodeSPI) this.doMethodCall(METHOD_GET_WITH_1_PARAM, new Object []{name}); 144 if (n == null) 145 { 146 return null; 147 } 148 return n.getDataDirect(); 149 } 150 151 154 protected boolean delegateExists(Fqn name) throws Exception 155 { 156 Boolean exists = (Boolean ) this.doMethodCall(METHOD_EXISTS, new Object []{name}); 157 return exists != null && exists; 158 } 159 160 163 protected Object delegatePut(Fqn name, Object key, Object value) throws Exception 164 { 165 return this.doMethodCall(METHOD_PUT_WITH_3_PARAMS, new Object []{name, key, value}); 166 } 167 168 171 protected void delegatePut(Fqn name, Map attributes) throws Exception 172 { 173 this.doMethodCall(METHOD_PUT_WITH_2_PARAMS, new Object []{name, attributes}); 174 } 175 176 179 protected Object delegateRemove(Fqn name, Object key) throws Exception 180 { 181 return this.doMethodCall(METHOD_REMOVE_WITH_2_PARAMS, new Object []{name, key}); 182 } 183 184 187 protected void delegateRemove(Fqn name) throws Exception 188 { 189 this.doMethodCall(METHOD_REMOVE_WITH_1_PARAM, new Object []{name}); 190 } 191 192 195 protected void delegateRemoveData(Fqn name) throws Exception 196 { 197 this.doMethodCall(METHOD_REMOVE_DATA, new Object []{name}); 198 } 199 200 @Override 201 protected void delegateLoadEntireState(ObjectOutputStream os) throws Exception 202 { 203 205 } 206 207 @Override 208 protected void delegateLoadState(Fqn subtree, ObjectOutputStream os) throws Exception 209 { 210 throw new UnsupportedOperationException ("setting and loading state for specific Fqns not supported"); 211 } 212 213 @Override 214 protected void delegateStoreEntireState(ObjectInputStream is) throws Exception 215 { 216 218 } 219 220 @Override 221 protected void delegateStoreState(Fqn subtree, ObjectInputStream is) throws Exception 222 { 223 throw new UnsupportedOperationException ("setting and loading state for specific Fqns not supported"); 224 } 225 226 235 private Object doMethodCall(Method method, Object [] args) throws Exception 236 { 237 if (this.cache.getRPCManager().isCoordinator()) 238 { 239 if (log.isTraceEnabled()) 240 { 241 log.trace("Cannot delegate to the remote coordinator because the cache is itself the coordinator."); 242 } 243 return null; 244 } 245 if (this.localAddress == null) 246 { 247 this.localAddress = this.cache.getLocalAddress(); 248 } 249 if (this.localAddress == null) 250 { 251 throw new Exception ("Cannot delegate to the remote coordinator because the cache has no local address."); 252 } 253 Address coordinator = cache.getRPCManager().getCoordinator(); 254 if (coordinator == null) 255 { 256 throw new Exception ("Cannot delegate to the remote coordinator because the cache has no coordinator."); 257 } 258 Vector <Address> members = new Vector <Address>(); 259 members.add(coordinator); 260 MethodCall methodCall = MethodCallFactory.create(method, args); 261 boolean synchronous = true; 262 boolean excludeSelf = true; 263 List responses = cache.getRPCManager().callRemoteMethods(members, methodCall, synchronous, excludeSelf, config.getTimeout()); 264 if (responses == null) 265 { 266 throw new Exception ("Remote method call [" + cache.getLocalAddress() + "]->[" + coordinator + "]." + methodCall.getMethod().getName() + "() was discarded!"); 267 } 268 Object response = responses.get(0); 269 if (response instanceof TimeoutException) 270 { 271 throw new Exception ("Remote method call [" + cache.getLocalAddress() + "]->[" + coordinator + "]." + methodCall.getMethod().getName() + "() timed out: " + response); 272 } 273 else if (response instanceof Throwable ) 274 { 275 throw new Exception ("Remote method call [" + cache.getLocalAddress() + "]->[" + coordinator + "]." + methodCall.getMethod().getName() + "() failed!", (Throwable ) response); 276 } 277 return response; 278 } 279 } 280 | Popular Tags |