KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jboss > cache > loader > RpcDelegatingCacheLoader


1 /*
2  * JBoss, the OpenSource J2EE webOS
3  *
4  * Distributable under LGPL license.
5  * See terms of license at gnu.org.
6  */

7 package org.jboss.cache.loader;
8
9 import org.jboss.cache.CacheImpl;
10 import org.jboss.cache.CacheSPI;
11 import org.jboss.cache.Fqn;
12 import org.jboss.cache.NodeSPI;
13 import org.jboss.cache.config.CacheLoaderConfig.IndividualCacheLoaderConfig;
14 import org.jboss.cache.lock.TimeoutException;
15 import org.jboss.cache.marshall.MethodCall;
16 import org.jboss.cache.marshall.MethodCallFactory;
17 import org.jgroups.Address;
18
19 import java.io.ObjectInputStream JavaDoc;
20 import java.io.ObjectOutputStream JavaDoc;
21 import java.lang.reflect.Method JavaDoc;
22 import java.util.List JavaDoc;
23 import java.util.Map JavaDoc;
24 import java.util.Set JavaDoc;
25 import java.util.Vector JavaDoc;
26
27 /**
28  * DelegatingCacheLoader implementation which delegates to a remote (not in the same VM)
29  * CacheImpl using JGroups' RPC mechanism. The remote CacheImpl delegated to is this
30  * cacheloader's cache's coordinator.
31  * <p/>
32  * This CacheLoader uses an optional configuration property, <tt>timeout</tt>, which
33  * specifies the timeout in milliseconds for each RPC call. If <tt>timeout</tt> is not
34  * specified, it defaults to <tt>5000</tt>.
35  *
36  * @author Daniel Gredler
37  * @version $Id: RpcDelegatingCacheLoader.java,v 1.16 2007/01/04 05:35:37 msurtani Exp $
38  */

39 public class RpcDelegatingCacheLoader extends DelegatingCacheLoader
40 {
41
42    private RpcDelegatingCacheLoaderConfig config = new RpcDelegatingCacheLoaderConfig();
43    private Address localAddress;
44
45    public static final Method JavaDoc METHOD_GET_STATE;
46    public static final Method JavaDoc METHOD_SET_STATE;
47    public static final Method JavaDoc METHOD_GET_CHILDREN_NAMES;
48    public static final Method JavaDoc METHOD_GET_WITH_2_PARAMS;
49    public static final Method JavaDoc METHOD_GET_WITH_1_PARAM;
50    public static final Method JavaDoc METHOD_EXISTS;
51    public static final Method JavaDoc METHOD_PUT_WITH_3_PARAMS;
52    public static final Method JavaDoc METHOD_PUT_WITH_2_PARAMS;
53    public static final Method JavaDoc METHOD_REMOVE_WITH_2_PARAMS;
54    public static final Method JavaDoc METHOD_REMOVE_WITH_1_PARAM;
55    public static final Method JavaDoc METHOD_REMOVE_DATA;
56
57    /**
58     * Initializes the <tt>Method</tt> instances needed by the operations
59     * delegated to the cacheloader's cache's coordinator.
60     */

61    static
62    {
63       try
64       {
65          METHOD_GET_STATE = CacheImpl.class.getDeclaredMethod("getStateBytes", new Class JavaDoc[]{});
66          METHOD_SET_STATE = CacheImpl.class.getDeclaredMethod("setStateBytes", new Class JavaDoc[]{byte[].class});
67          METHOD_GET_CHILDREN_NAMES = CacheImpl.class.getDeclaredMethod("getChildrenNames", new Class JavaDoc[]{Fqn.class});
68          METHOD_GET_WITH_2_PARAMS = CacheImpl.class.getDeclaredMethod("get", new Class JavaDoc[]{Fqn.class, Object JavaDoc.class});
69          METHOD_GET_WITH_1_PARAM = CacheImpl.class.getDeclaredMethod("get", new Class JavaDoc[]{Fqn.class});
70          METHOD_EXISTS = CacheImpl.class.getDeclaredMethod("exists", new Class JavaDoc[]{Fqn.class});
71          METHOD_PUT_WITH_3_PARAMS = CacheImpl.class.getDeclaredMethod("put", new Class JavaDoc[]{Fqn.class, Object JavaDoc.class, Object JavaDoc.class});
72          METHOD_PUT_WITH_2_PARAMS = CacheImpl.class.getDeclaredMethod("put", new Class JavaDoc[]{Fqn.class, Map JavaDoc.class});
73          METHOD_REMOVE_WITH_2_PARAMS = CacheImpl.class.getDeclaredMethod("remove", new Class JavaDoc[]{Fqn.class, Object JavaDoc.class});
74          METHOD_REMOVE_WITH_1_PARAM = CacheImpl.class.getDeclaredMethod("remove", new Class JavaDoc[]{Fqn.class});
75          METHOD_REMOVE_DATA = CacheImpl.class.getDeclaredMethod("removeData", new Class JavaDoc[]{Fqn.class});
76       }
77       catch (NoSuchMethodException JavaDoc ex)
78       {
79          throw new ExceptionInInitializerError JavaDoc(ex);
80       }
81    }
82
83    /**
84     * Default constructor.
85     */

86    public RpcDelegatingCacheLoader()
87    {
88       // Empty.
89
}
90
91    /**
92     * Allows programmatic configuration.
93     *
94     * @param timeout The timeout in milliseconds for each RPC call.
95     */

96    public RpcDelegatingCacheLoader(CacheSPI cache, int timeout)
97    {
98       setCache(cache);
99       config.setTimeout(timeout);
100    }
101
102    /**
103     * Allows configuration via XML config file.
104     */

105    public void setConfig(IndividualCacheLoaderConfig base)
106    {
107       if (base instanceof RpcDelegatingCacheLoaderConfig)
108       {
109          this.config = (RpcDelegatingCacheLoaderConfig) base;
110       }
111       else
112       {
113          this.config = new RpcDelegatingCacheLoaderConfig(base);
114       }
115    }
116
117    public IndividualCacheLoaderConfig getConfig()
118    {
119       return config;
120    }
121
122    /**
123     * @see org.jboss.cache.loader.DelegatingCacheLoader#delegateGetChildrenNames(org.jboss.cache.Fqn)
124     */

125    protected Set JavaDoc delegateGetChildrenNames(Fqn name) throws Exception JavaDoc
126    {
127       return (Set JavaDoc) this.doMethodCall(METHOD_GET_CHILDREN_NAMES, new Object JavaDoc[]{name});
128    }
129
130    // See http://jira.jboss.com/jira/browse/JBCACHE-118 for why this is commented out.
131
/**
132     * @see org.jboss.cache.loader.DelegatingCacheLoader#delegateGet(org.jboss.cache.Fqn,java.lang.Object)
133     */

134    // protected Object delegateGet(Fqn name, Object key) throws Exception {
135
// return this.doMethodCall( METHOD_GET_WITH_2_PARAMS, new Object[] { name, key } );
136
// }
137

138    /**
139     * @see org.jboss.cache.loader.DelegatingCacheLoader#delegateGet(org.jboss.cache.Fqn)
140     */

141    protected Map JavaDoc delegateGet(Fqn name) throws Exception JavaDoc
142    {
143       NodeSPI n = (NodeSPI) this.doMethodCall(METHOD_GET_WITH_1_PARAM, new Object JavaDoc[]{name});
144       if (n == null)
145       {
146          return null;
147       }
148       return n.getDataDirect();
149    }
150
151    /**
152     * @see org.jboss.cache.loader.DelegatingCacheLoader#delegateExists(org.jboss.cache.Fqn)
153     */

154    protected boolean delegateExists(Fqn name) throws Exception JavaDoc
155    {
156       Boolean JavaDoc exists = (Boolean JavaDoc) this.doMethodCall(METHOD_EXISTS, new Object JavaDoc[]{name});
157       return exists != null && exists;
158    }
159
160    /**
161     * @see org.jboss.cache.loader.DelegatingCacheLoader#delegatePut(org.jboss.cache.Fqn,java.lang.Object,java.lang.Object)
162     */

163    protected Object JavaDoc delegatePut(Fqn name, Object JavaDoc key, Object JavaDoc value) throws Exception JavaDoc
164    {
165       return this.doMethodCall(METHOD_PUT_WITH_3_PARAMS, new Object JavaDoc[]{name, key, value});
166    }
167
168    /**
169     * @see org.jboss.cache.loader.DelegatingCacheLoader#delegatePut(org.jboss.cache.Fqn,java.util.Map)
170     */

171    protected void delegatePut(Fqn name, Map JavaDoc attributes) throws Exception JavaDoc
172    {
173       this.doMethodCall(METHOD_PUT_WITH_2_PARAMS, new Object JavaDoc[]{name, attributes});
174    }
175
176    /**
177     * @see org.jboss.cache.loader.DelegatingCacheLoader#delegateRemove(org.jboss.cache.Fqn,java.lang.Object)
178     */

179    protected Object JavaDoc delegateRemove(Fqn name, Object JavaDoc key) throws Exception JavaDoc
180    {
181       return this.doMethodCall(METHOD_REMOVE_WITH_2_PARAMS, new Object JavaDoc[]{name, key});
182    }
183
184    /**
185     * @see org.jboss.cache.loader.DelegatingCacheLoader#delegateRemove(org.jboss.cache.Fqn)
186     */

187    protected void delegateRemove(Fqn name) throws Exception JavaDoc
188    {
189       this.doMethodCall(METHOD_REMOVE_WITH_1_PARAM, new Object JavaDoc[]{name});
190    }
191
192    /**
193     * @see org.jboss.cache.loader.DelegatingCacheLoader#delegateRemoveData(org.jboss.cache.Fqn)
194     */

195    protected void delegateRemoveData(Fqn name) throws Exception JavaDoc
196    {
197       this.doMethodCall(METHOD_REMOVE_DATA, new Object JavaDoc[]{name});
198    }
199
200    @Override JavaDoc
201    protected void delegateLoadEntireState(ObjectOutputStream JavaDoc os) throws Exception JavaDoc
202    {
203       // TODO
204

205    }
206
207    @Override JavaDoc
208    protected void delegateLoadState(Fqn subtree, ObjectOutputStream JavaDoc os) throws Exception JavaDoc
209    {
210       throw new UnsupportedOperationException JavaDoc("setting and loading state for specific Fqns not supported");
211    }
212
213    @Override JavaDoc
214    protected void delegateStoreEntireState(ObjectInputStream JavaDoc is) throws Exception JavaDoc
215    {
216       // TODO
217

218    }
219
220    @Override JavaDoc
221    protected void delegateStoreState(Fqn subtree, ObjectInputStream JavaDoc is) throws Exception JavaDoc
222    {
223       throw new UnsupportedOperationException JavaDoc("setting and loading state for specific Fqns not supported");
224    }
225
226    /**
227     * Performs the specified remote method call using the specified arguments. This method
228     * returns <tt>null</tt> if unable to delegate the method call to the cacheloader's
229     * cache's coordinator because it is in fact itself the coordinator.
230     *
231     * @param method The remote method to call.
232     * @param args The arguments to use for the remote method call.
233     * @return The value returned by the remote method call.
234     */

235    private Object JavaDoc doMethodCall(Method JavaDoc method, Object JavaDoc[] args) throws Exception JavaDoc
236    {
237       if (this.cache.getRPCManager().isCoordinator())
238       {
239          if (log.isTraceEnabled())
240          {
241             log.trace("Cannot delegate to the remote coordinator because the cache is itself the coordinator.");
242          }
243          return null;
244       }
245       if (this.localAddress == null)
246       {
247          this.localAddress = this.cache.getLocalAddress();
248       }
249       if (this.localAddress == null)
250       {
251          throw new Exception JavaDoc("Cannot delegate to the remote coordinator because the cache has no local address.");
252       }
253       Address coordinator = cache.getRPCManager().getCoordinator();
254       if (coordinator == null)
255       {
256          throw new Exception JavaDoc("Cannot delegate to the remote coordinator because the cache has no coordinator.");
257       }
258       Vector JavaDoc<Address> members = new Vector JavaDoc<Address>();
259       members.add(coordinator);
260       MethodCall methodCall = MethodCallFactory.create(method, args);
261       boolean synchronous = true;
262       boolean excludeSelf = true;
263       List JavaDoc responses = cache.getRPCManager().callRemoteMethods(members, methodCall, synchronous, excludeSelf, config.getTimeout());
264       if (responses == null)
265       {
266          throw new Exception JavaDoc("Remote method call [" + cache.getLocalAddress() + "]->[" + coordinator + "]." + methodCall.getMethod().getName() + "() was discarded!");
267       }
268       Object JavaDoc response = responses.get(0);
269       if (response instanceof TimeoutException)
270       {
271          throw new Exception JavaDoc("Remote method call [" + cache.getLocalAddress() + "]->[" + coordinator + "]." + methodCall.getMethod().getName() + "() timed out: " + response);
272       }
273       else if (response instanceof Throwable JavaDoc)
274       {
275          throw new Exception JavaDoc("Remote method call [" + cache.getLocalAddress() + "]->[" + coordinator + "]." + methodCall.getMethod().getName() + "() failed!", (Throwable JavaDoc) response);
276       }
277       return response;
278    }
279 }
280
Popular Tags