KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jboss > cache > loader > AsyncCacheLoader


1 /**
2  *
3  */

4 package org.jboss.cache.loader;
5
6 import org.apache.commons.logging.Log;
7 import org.apache.commons.logging.LogFactory;
8 import org.jboss.cache.CacheException;
9 import org.jboss.cache.Fqn;
10 import org.jboss.cache.Modification;
11 import org.jboss.cache.config.CacheLoaderConfig.IndividualCacheLoaderConfig;
12 import org.jboss.cache.util.MapCopy;
13
14 import java.io.IOException JavaDoc;
15 import java.util.ArrayList JavaDoc;
16 import java.util.HashMap JavaDoc;
17 import java.util.Iterator JavaDoc;
18 import java.util.List JavaDoc;
19 import java.util.Map JavaDoc;
20 import java.util.concurrent.BlockingQueue JavaDoc;
21 import java.util.concurrent.LinkedBlockingQueue JavaDoc;
22 import java.util.concurrent.TimeUnit JavaDoc;
23 import java.util.concurrent.atomic.AtomicBoolean JavaDoc;
24 import java.util.concurrent.atomic.AtomicInteger JavaDoc;
25
26 /**
27  * The AsyncCacheLoader is a delegating cache loader that extends
28  * AbstractDelegatingCacheLoader overriding methods to that should not
29  * just delegate the operation to the underlying cache loader.
30  * <p/>
31  * Read operations are done synchronously, while write (CRUD - Create, Remove,
32  * Update, Delete) operations are done asynchronously. There is no provision
33  * for exception handling at the moment for problems encountered with the
34  * underlying CacheLoader during a CRUD operation, and the exception is just
35  * logged.
36  * <p/>
37  * When configuring the CacheLoader, use the following attribute:
38  * <p/>
39  * <code>
40  * &lt;attribute name="CacheLoaderAsynchronous"&gt;true&lt;/attribute&gt;
41  * </code>
42  * <p/>
43  * to define whether cache loader operations are to be asynchronous. If not
44  * specified, a cache loader operation is assumed synchronous.
45  * <p/>
46  * <p/>
47  * The following additional parameters are available:
48  * <dl>
49  * <dt>cache.async.batchSize</dt>
50  * <dd>Number of modifications to commit in one transaction, default is
51  * 100. The minimum batch size is 1.</dd>
52  * <dt>cache.async.pollWait</dt>
53  * <dd>How long to wait before processing an incomplete batch, in
54  * milliseconds. Default is 100. Set this to 0 to not wait before processing
55  * available records.</dd>
56  * <dt>cache.async.returnOld</dt>
57  * <dd>If <code>true</code>, this loader returns the old values from {@link
58  * #put} and {@link #remove} methods. Otherwise, these methods always return
59  * null. Default is true. <code>false</code> improves the performance of these
60  * operations.</dd>
61  * <dt>cache.async.queueSize</dt>
62  * <dd>Maximum number of entries to enqueue for asynchronous processing.
63  * Lowering this size may help prevent out-of-memory conditions. It also may
64  * help to prevent less records lost in the case of JVM failure. Default is
65  * 10,000 operations.</dd>
66  * <dt>cache.async.put</dt>
67  * <dd>If set to false, all {@link #put} operations will be processed
68  * synchronously, and then only the {@link #remove} operations will be
69  * processed asynchronously. This mode may be useful for processing
70  * expiration of messages within a separate thread and keeping other
71  * operations synchronous for reliability.
72  * </dd>
73  * </dl>
74  * For increased performance for many smaller transactions, use higher values
75  * for <code>cache.async.batchSize</code> and
76  * <code>cache.async.pollWait</code>. For larger sized records, use a smaller
77  * value for <code>cache.async.queueSize</code>.
78  *
79  * @author Manik Surtani (manik.surtani@jboss.com)
80  */

81 public class AsyncCacheLoader extends AbstractDelegatingCacheLoader
82 {
83
84    private static final Log log = LogFactory.getLog(AsyncCacheLoader.class);
85
86    private static AtomicInteger JavaDoc threadId = new AtomicInteger JavaDoc(0);
87
88
89    /**
90     * Default limit on entries to process asynchronously.
91     */

92    public static final int DEFAULT_QUEUE_SIZE = 10000;
93
94    private AsynchCacheLoaderConfig config;
95    private AsyncProcessor processor;
96    private AtomicBoolean JavaDoc stopped = new AtomicBoolean JavaDoc(true);
97    private BlockingQueue JavaDoc<Modification> queue = new LinkedBlockingQueue JavaDoc(DEFAULT_QUEUE_SIZE);
98
99
100    public AsyncCacheLoader()
101    {
102       super(null);
103    }
104
105    public AsyncCacheLoader(CacheLoader cacheLoader)
106    {
107       super(cacheLoader);
108    }
109
110    public void setConfig(IndividualCacheLoaderConfig base)
111    {
112       if (base instanceof AsynchCacheLoaderConfig)
113       {
114          config = (AsynchCacheLoaderConfig) base;
115       }
116       else
117       {
118          config = new AsynchCacheLoaderConfig(base);
119       }
120
121       if (config.getQueueSize() > 0)
122       {
123          queue = new LinkedBlockingQueue JavaDoc(config.getQueueSize());
124       }
125
126       super.setConfig(base);
127    }
128
129    public Map JavaDoc get(Fqn name) throws Exception JavaDoc
130    {
131       try
132       {
133          return super.get(name);
134       }
135       catch (IOException JavaDoc e)
136       {
137          // FileCacheLoader sometimes does this apparently
138
log.trace(e);
139          return new HashMap JavaDoc(); // ?
140
}
141    }
142
143    Object JavaDoc get(Fqn name, Object JavaDoc key) throws Exception JavaDoc
144    {
145       if (config.getReturnOld())
146       {
147          try
148          {
149             Map JavaDoc map = super.get(name);
150             if (map != null)
151             {
152                return map.get(key);
153             }
154          }
155          catch (IOException JavaDoc e)
156          {
157             // FileCacheLoader sometimes does this apparently
158
log.trace(e);
159          }
160       }
161       return null;
162    }
163
164    public Object JavaDoc put(Fqn name, Object JavaDoc key, Object JavaDoc value) throws Exception JavaDoc
165    {
166       if (config.getUseAsyncPut())
167       {
168          Object JavaDoc oldValue = get(name, key);
169          Modification mod = new Modification(Modification.ModificationType.PUT_KEY_VALUE, name, key, value);
170          enqueue(mod);
171          return oldValue;
172       }
173       else
174       {
175          return super.put(name, key, value);
176       }
177    }
178
179    public void put(Fqn name, Map JavaDoc attributes) throws Exception JavaDoc
180    {
181       if (config.getUseAsyncPut())
182       {
183          // JBCACHE-769 -- make a defensive copy
184
Map JavaDoc attrs = (attributes == null ? null : new MapCopy(attributes));
185          Modification mod = new Modification(Modification.ModificationType.PUT_DATA, name, attrs);
186          enqueue(mod);
187       }
188       else
189       {
190          super.put(name, attributes); // Let delegate make its own defensive copy
191
}
192    }
193
194    public void put(List JavaDoc<Modification> modifications) throws Exception JavaDoc
195    {
196       if (config.getUseAsyncPut())
197       {
198          Iterator JavaDoc i = modifications.iterator();
199          while (i.hasNext())
200          {
201             enqueue((Modification) i.next());
202          }
203       }
204       else
205       {
206          super.put(modifications);
207       }
208    }
209
210    public Object JavaDoc remove(Fqn name, Object JavaDoc key) throws Exception JavaDoc
211    {
212       Object JavaDoc oldValue = get(name, key);
213       Modification mod = new Modification(Modification.ModificationType.REMOVE_KEY_VALUE, name, key);
214       enqueue(mod);
215       return oldValue;
216    }
217
218    public void remove(Fqn name) throws Exception JavaDoc
219    {
220       Modification mod = new Modification(Modification.ModificationType.REMOVE_NODE, name);
221       enqueue(mod);
222    }
223
224    public void removeData(Fqn name) throws Exception JavaDoc
225    {
226       Modification mod = new Modification(Modification.ModificationType.REMOVE_DATA, name);
227       enqueue(mod);
228    }
229
230    public void start() throws Exception JavaDoc
231    {
232       if (log.isInfoEnabled()) log.info("Async cache loader starting: " + this);
233       stopped.set(false);
234       super.start();
235       processor = new AsyncProcessor();
236       processor.start();
237    }
238
239    public void stop()
240    {
241       stopped.set(true);
242       if (processor != null)
243       {
244          processor.stop();
245       }
246       super.stop();
247    }
248
249    private void enqueue(Modification mod)
250            throws CacheException, InterruptedException JavaDoc
251    {
252       if (stopped.get())
253       {
254          throw new CacheException("AsyncCacheLoader stopped; no longer accepting more entries.");
255       }
256       queue.put(mod);
257    }
258
259    /**
260     * Processes (by batch if possible) a queue of {@link Modification}s.
261     *
262     * @author manik surtani
263     */

264    private class AsyncProcessor implements Runnable JavaDoc
265    {
266       private Thread JavaDoc t;
267
268       // Modifications to process as a single put
269
private final List JavaDoc mods = new ArrayList JavaDoc(config.getBatchSize());
270
271       public void start()
272       {
273          if (t == null || !t.isAlive())
274          {
275             t = new Thread JavaDoc(this, "AsyncCacheLoader-" + threadId.getAndIncrement());
276             //t.setDaemon(true);
277
t.start();
278          }
279       }
280
281       public void stop()
282       {
283          if (t != null)
284          {
285             t.interrupt();
286             try
287             {
288                t.join();
289             }
290             catch (InterruptedException JavaDoc e)
291             {
292             }
293          }
294          if (!queue.isEmpty())
295          {
296             log.warn("Async queue not yet empty, possibly interrupted");
297          }
298       }
299
300       public void run()
301       {
302          while (!Thread.interrupted())
303          {
304             try
305             {
306                run0();
307             }
308             catch (InterruptedException JavaDoc e)
309             {
310                break;
311             }
312          }
313
314          try
315          {
316             if (log.isTraceEnabled()) log.trace("process remaining batch " + mods.size());
317             put(mods);
318             if (log.isTraceEnabled()) log.trace("process remaining queued " + queue.size());
319             while (!queue.isEmpty())
320             {
321                run0();
322             }
323          }
324          catch (InterruptedException JavaDoc e)
325          {
326             log.trace("remaining interrupted");
327          }
328       }
329
330       private void run0() throws InterruptedException JavaDoc
331       {
332          log.trace("run0");
333          Object JavaDoc o = queue.take();
334          addTaken(o);
335          while (mods.size() < config.getBatchSize())
336          {
337             o = queue.poll(config.getPollWait(), TimeUnit.MILLISECONDS);
338             if (o == null)
339             {
340                break;
341             }
342             addTaken(o);
343          }
344          if (log.isTraceEnabled())
345          {
346             log.trace("put " + mods.size());
347          }
348          put(mods);
349          mods.clear();
350       }
351
352       private void addTaken(Object JavaDoc o)
353       {
354          if (o instanceof List JavaDoc)
355          {
356             mods.addAll((List JavaDoc) o);
357          }
358          else
359          {
360             Modification mod = (Modification) o;
361             mods.add(mod);
362          }
363       }
364
365       private void put(List JavaDoc<Modification> mods)
366       {
367          try
368          {
369             AsyncCacheLoader.super.put(mods);
370          }
371          catch (Exception JavaDoc e)
372          {
373             if (log.isWarnEnabled()) log.warn("Failed to process async modifications: " + e);
374             log.debug("Exception: ", e);
375          }
376       }
377
378       public String JavaDoc toString()
379       {
380          return "TQ t=" + t;
381       }
382
383    }
384
385    public String JavaDoc toString()
386    {
387       return super.toString() +
388               " delegate=[" + super.getCacheLoader() + "]" +
389               " processor=" + processor +
390               " stopped=" + stopped +
391               " batchSize=" + config.getBatchSize() +
392               " pollWait=" + config.getPollWait() +
393               " returnOld=" + config.getReturnOld() +
394               " asyncPut=" + config.getUseAsyncPut() +
395               " queue.remainingCapacity()=" + queue.remainingCapacity() +
396               " queue.peek()=" + queue.peek();
397    }
398
399 }
400
Popular Tags