KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jboss > cache > loader > SingletonStoreCacheLoader


1 package org.jboss.cache.loader;
2
3 import org.apache.commons.logging.Log;
4 import org.apache.commons.logging.LogFactory;
5 import org.jboss.cache.AbstractCacheListener;
6 import org.jboss.cache.CacheListener;
7 import org.jboss.cache.CacheSPI;
8 import org.jboss.cache.Fqn;
9 import org.jboss.cache.Modification;
10 import org.jboss.cache.NodeSPI;
11 import org.jgroups.Address;
12 import org.jgroups.View;
13
14 import java.io.ObjectInputStream JavaDoc;
15 import java.util.Collection JavaDoc;
16 import java.util.List JavaDoc;
17 import java.util.Map JavaDoc;
18 import java.util.Set JavaDoc;
19 import java.util.Vector JavaDoc;
20
21 /**
22  * SingletonStoreCacheLoader is a delegating cache loader used for situations when only one node should interact with
23  * the underlying store. The coordinator of the cluster will be responsible for the underlying CacheLoader.
24  * SingletonStoreCacheLoader is a simply facade to a real CacheLoader implementation. It always delegates reads to the
25  * real CacheLoader.
26  * <p/>
27  * Writes are forwarded only if this SingletonStoreCacheLoader is currently the cordinator. This avoid having all
28  * CacheLoaders in a cluster writing the same data to the same underlying store. Although not incorrect (e.g. a DB
29  * will just discard additional INSERTs for the same key, and throw an exception), this will avoid a lot of
30  * redundant work.<br/>
31  * <p/>
32  * Whenever the current coordinator dies (or leaves), the second in line will take over. That SingletonStoreCacheLoader
33  * will then pass writes through to its underlying CacheLoader. Optionally, when a new coordinator takes over the
34  * Singleton, it can push the in-memory state to the cache cacheLoader.
35  *
36  * @author Bela Ban
37  * @author <a HREF="mailto:galder.zamarreno@jboss.com">Galder Zamarreno</a>
38  */

39 public class SingletonStoreCacheLoader extends AbstractDelegatingCacheLoader
40 {
41    private static final Log log = LogFactory.getLog(SingletonStoreCacheLoader.class);
42    private Address localAddress;
43    private boolean active;// only active if coordinator
44
private boolean pushStateWhenCoordinator;
45    private Thread JavaDoc pushStateThread;
46    private CacheListener cacheListener;
47
48    public SingletonStoreCacheLoader(CacheLoader cacheLoader, boolean pushConfiguration)
49    {
50       super(cacheLoader);
51       pushStateWhenCoordinator = pushConfiguration;
52       cacheListener = new SingletonStoreListener();
53    }
54
55    public CacheListener getCacheListener()
56    {
57       return cacheListener;
58    }
59
60    protected void activeStatusChanged(boolean newActiveState)
61    {
62       active = newActiveState;
63       log.debug("changed mode: " + this);
64       if (active && pushStateWhenCoordinator)
65       {
66          if (pushStateThread == null || !pushStateThread.isAlive())
67          {
68             pushStateThread = createPushStateThread();
69             pushStateThread.setName("InMemoryToCacheLoaderPusher");
70             pushStateThread.start();
71          }
72          else
73          {
74             try
75             {
76                log.debug("joining currently running state push thread");
77                pushStateThread.join();
78             }
79             catch (InterruptedException JavaDoc e)
80             {
81                log.error("joining existing push state thread was interrupted", e);
82             }
83          }
84       }
85    }
86
87    protected Thread JavaDoc createPushStateThread()
88    {
89       return new Thread JavaDoc(new Runnable JavaDoc()
90       {
91          public void run()
92          {
93             log.debug("start pushing in-memory state to cache cacheLoader");
94             try
95             {
96                pushState(cache.getRoot());
97                log.debug("in-memory state passed to cache cacheLoader successfully");
98             }
99             catch (Exception JavaDoc e)
100             {
101                log.error("unable to finish pushing the state", e);
102             }
103          }
104       });
105    }
106
107    private boolean isCoordinator(View newView)
108    {
109       if (newView != null && localAddress != null)
110       {
111          Vector JavaDoc mbrs = newView.getMembers();
112          if (mbrs != null)
113          {
114             if (mbrs.size() > 0 && localAddress.equals(mbrs.firstElement()))
115             {
116                /* This node is the coordinator */
117                return true;
118             }
119          }
120
121          return false;
122       }
123
124       /* Invalid new view, so previous value returned */
125       return active;
126    }
127
128    private void pushState(NodeSPI node) throws Exception JavaDoc
129    {
130       /* Put the node's data first */
131       Set JavaDoc keys = node.getKeysDirect();
132       Fqn fqn = node.getFqn();
133
134       for (Object JavaDoc aKey : keys)
135       {
136          Object JavaDoc value = cache.get(fqn, aKey);
137          put(fqn, aKey, value);
138       }
139
140       /* Navigates to the children */
141       Collection JavaDoc<NodeSPI> children = node.getChildrenDirect();
142       for (NodeSPI aChildren : children)
143       {
144          //Map.Entry entry = (Map.Entry) aChildren;
145
pushState(aChildren);
146       }
147    }
148
149    public Object JavaDoc put(Fqn name, Object JavaDoc key, Object JavaDoc value) throws Exception JavaDoc
150    {
151       if (active)
152       {
153          return super.put(name, key, value);
154       }
155
156       return null;
157    }
158
159    public void put(Fqn name, Map JavaDoc attributes) throws Exception JavaDoc
160    {
161       if (active)
162       {
163          super.put(name, attributes);
164       }
165    }
166
167    public void put(List JavaDoc<Modification> modifications) throws Exception JavaDoc
168    {
169       if (active)
170       {
171          super.put(modifications);
172       }
173    }
174
175    public Object JavaDoc remove(Fqn fqn, Object JavaDoc key) throws Exception JavaDoc
176    {
177       if (active)
178       {
179          return super.remove(fqn, key);
180       }
181
182       return null;
183    }
184
185    public void remove(Fqn fqn) throws Exception JavaDoc
186    {
187       if (active)
188       {
189          super.remove(fqn);
190       }
191    }
192
193    public void removeData(Fqn fqn) throws Exception JavaDoc
194    {
195       if (active)
196       {
197          super.removeData(fqn);
198       }
199    }
200
201    public void prepare(Object JavaDoc tx, List JavaDoc<Modification> modifications, boolean one_phase) throws Exception JavaDoc
202    {
203       if (active)
204       {
205          super.prepare(tx, modifications, one_phase);
206       }
207    }
208
209    public void commit(Object JavaDoc tx) throws Exception JavaDoc
210    {
211       if (active)
212       {
213          super.commit(tx);
214       }
215    }
216
217    public void rollback(Object JavaDoc tx)
218    {
219       if (active)
220       {
221          super.rollback(tx);
222       }
223    }
224
225    public void storeEntireState(ObjectInputStream JavaDoc is) throws Exception JavaDoc
226    {
227       if (active)
228       {
229          super.storeEntireState(is);
230       }
231    }
232
233    public void storeState(Fqn subtree, ObjectInputStream JavaDoc is) throws Exception JavaDoc
234    {
235       if (active)
236       {
237          super.storeState(subtree, is);
238       }
239    }
240
241    public Thread JavaDoc getPushStateThread()
242    {
243       return pushStateThread;
244    }
245
246    public String JavaDoc toString()
247    {
248       return "loc_addr=" + localAddress + ", active=" + active;
249    }
250
251    /**
252     * Cache listener that reacts to cluster topology changes to find out whether a new coordinator is elected.
253     * SingletonStoreCacheLoader reacts to these changes in order to decide which node should interact with the
254     * underlying cache store.
255     */

256    private class SingletonStoreListener extends AbstractCacheListener
257    {
258       public void cacheStarted(CacheSPI cache)
259       {
260          localAddress = cache.getLocalAddress();
261          active = cache.getRPCManager().isCoordinator();
262          log.debug("cache started: " + this);
263       }
264
265       public void cacheStopped(CacheSPI cache)
266       {
267          log.debug("cache stopped: " + this);
268       }
269
270       public void viewChange(View newView)
271       {
272          boolean tmp = isCoordinator(newView);
273
274          if (active != tmp)
275          {
276             activeStatusChanged(tmp);
277          }
278       }
279    }
280 }
Popular Tags