KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > continuent > sequoia > controller > cache > result > ResultCache


1 /**
2  * Sequoia: Database clustering technology.
3  * Copyright (C) 2002-2004 French National Institute For Research In Computer
4  * Science And Control (INRIA).
5  * Copyright (C) 2005 AmicoSoft, Inc. dba Emic Networks
6  * Contact: sequoia@continuent.org
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  * http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  *
20  * Initial developer(s): Emmanuel Cecchet.
21  * Contributor(s): Julie Marguerite, Sara Bouchenak, Nicolas Modrzyk.
22  */

23
24 package org.continuent.sequoia.controller.cache.result;
25
26 import java.util.ArrayList JavaDoc;
27 import java.util.HashMap JavaDoc;
28 import java.util.HashSet JavaDoc;
29 import java.util.Iterator JavaDoc;
30
31 import org.continuent.sequoia.common.i18n.Translate;
32 import org.continuent.sequoia.common.protocol.Field;
33 import org.continuent.sequoia.common.xml.DatabasesXmlTags;
34 import org.continuent.sequoia.controller.backend.result.ControllerResultSet;
35 import org.continuent.sequoia.controller.cache.CacheException;
36 import org.continuent.sequoia.controller.cache.CacheStatistics;
37 import org.continuent.sequoia.controller.cache.result.entries.AbstractResultCacheEntry;
38 import org.continuent.sequoia.controller.cache.result.entries.ResultCacheEntryEager;
39 import org.continuent.sequoia.controller.cache.result.entries.ResultCacheEntryNoCache;
40 import org.continuent.sequoia.controller.cache.result.entries.ResultCacheEntryRelaxed;
41 import org.continuent.sequoia.controller.cache.result.schema.CacheDatabaseSchema;
42 import org.continuent.sequoia.controller.cache.result.schema.CacheDatabaseTable;
43 import org.continuent.sequoia.controller.cache.result.threads.EagerCacheThread;
44 import org.continuent.sequoia.controller.cache.result.threads.RelaxedCacheThread;
45 import org.continuent.sequoia.controller.requests.AbstractRequest;
46 import org.continuent.sequoia.controller.requests.CreateRequest;
47 import org.continuent.sequoia.controller.requests.ParsingGranularities;
48 import org.continuent.sequoia.controller.requests.RequestType;
49 import org.continuent.sequoia.controller.requests.SelectRequest;
50 import org.continuent.sequoia.controller.requests.UpdateRequest;
51 import org.continuent.sequoia.controller.sql.schema.DatabaseSchema;
52
53 /**
54  * This is a query cache implementation with tunable granularity. <br>
55  * Cache invalidation granularity can take one of the following values:
56  * <ul>
57  * <li><code>NO_INVALIDATE</code>: no invalidation, the cache is
58  * inconsistent and this should just be used to determine hit ratio upper bound.
59  * </li>
60  * <li><code>DATABASE</code>: the cache is flushed each time the database is
61  * updated (every INSERT, UPDATE, DELETE, ... statement).</li>
62  * <li><code>TABLE</code>: table granularity, entries in the cache are
63  * invalidated based on table dependencies.</li>
64  * <li><code>COLUMN</code>: column granularity, entries in the cache are
65  * invalidated based on column dependencies</li>
66  * <li><code>COLUMN_UNIQUE</code>: same as <code>COLUMN</code> except that
67  * <code>UNIQUE</code> queries that select a single row based on a key are
68  * invalidated only when needed.
69  * </ul>
70  *
71  * @author <a HREF="mailto:Emmanuel.Cecchet@inria.fr">Emmanuel Cecchet </a>
72  * @author <a HREF="mailto:Julie.Marguerite@inria.fr">Julie Marguerite </a>
73  * @author <a HREF="mailto:Sara.Bouchenak@epfl.ch">Sara Bouchenak </a>
74  * @author <a HREF="mailto:Nicolas.Modrzyk@inrialpes.fr">Nicolas Modrzyk </a>
75  * @version 1.0
76  */

77 public abstract class ResultCache extends AbstractResultCache
78 {
//
// How is the code organized?
//
// 1. Member variables
// 2. Constructor
// 3. Cache management
// 4. Transaction management
// 5. Debug/Monitoring
//

89   // Max number of cache entries
90
private int maxEntries;
91   /** Pending query timeout in ms. Default is: 0 (wait forever). */
92   private long pendingQueryTimeout = 0;
93   // queries: SQL -> AbstractResultCacheEntry
94
private HashMap JavaDoc queries;
95   // Pending SQL requests (String)
96
private HashSet JavaDoc pendingQueries;
97   // The rules to apply for this cache
98
private HashSet JavaDoc cachingRules;
99   private ResultCacheRule defaultRule;
100   private ArrayList JavaDoc relaxedCache;
101
102   // LRU (head) of cache entries for replacement
103
private AbstractResultCacheEntry lruHead;
104   // LRU (tail) of cache entries for replacement
105
private AbstractResultCacheEntry lruTail;
106
107   // Database schema
108
protected CacheDatabaseSchema cdbs;
109
110   private CacheStatistics stats;
111
112   private RelaxedCacheThread relaxedThread;
113   private static final boolean[] TRUE_TRUE = new boolean[]{true,
114       true };
115   private boolean flushingCache;
116   private EagerCacheThread eagerThread;
117   private ArrayList JavaDoc eagerCache;
118
119   /*
120    * Constructor
121    */

122
/**
 * Creates a new <code>ResultCache</code> instance and starts its relaxed and
 * eager cache threads.
 *
 * @param maxEntries maximum number of cache entries
 * @param pendingTimeout pending queries timeout (stored unchanged in the
 *          pending query timeout field)
 */
public ResultCache(int maxEntries, int pendingTimeout)
{
  // Configuration
  this.maxEntries = maxEntries;
  this.pendingQueryTimeout = pendingTimeout;
  this.cachingRules = new HashSet();
  this.defaultRule = null;

  // Cache content: no schema yet, empty cache, empty LRU list
  this.cdbs = null;
  this.stats = new CacheStatistics();
  this.queries = new HashMap(1000, 0.75f);
  this.pendingQueries = new HashSet();
  this.relaxedCache = new ArrayList();
  this.eagerCache = new ArrayList();
  this.lruHead = null;
  this.lruTail = null;

  // All the state above must be in place before the threads are started,
  // since they receive a reference to this cache.
  this.relaxedThread = new RelaxedCacheThread(this);
  this.relaxedThread.setPriority(9);
  this.relaxedThread.start();

  this.eagerThread = new EagerCacheThread(this);
  this.eagerThread.setPriority(9);
  this.eagerThread.start();
}
150
151   /**
152    * Shutdown the result cache and all its threads.
153    */

154   public synchronized void shutdown()
155   {
156     relaxedThread.shutdown();
157     eagerThread.shutdown();
158   }
159
160   /**
161    * Returns the pending query timeout in seconds.
162    *
163    * @return the pending query timeout.
164    * @see #setPendingQueryTimeout
165    */

166   public int getPendingQueryTimeout()
167   {
168     return (int) (pendingQueryTimeout / 1000);
169   }
170
171   /**
172    * Sets the pending query timeout in seconds.
173    *
174    * @param pendingQueryTimeout the pending query timeout to set.
175    * @see #getPendingQueryTimeout
176    */

177   public void setPendingQueryTimeout(int pendingQueryTimeout)
178   {
179     this.pendingQueryTimeout = pendingQueryTimeout * 1000L;
180   }
181
182   /**
183    * Possibly we want to access the queries in the cache for timing purposes
184    *
185    * @return the <code>HashMap</code> of queries (not synchronized)
186    */

187   public HashMap JavaDoc getQueries()
188   {
189     return this.queries;
190   }
191
192   /**
193    * Sets the <code>DatabaseSchema</code> of the current virtual database.
194    *
195    * @param dbs a <code>DatabaseSchema</code> value
196    * @see org.continuent.sequoia.controller.cache.result.schema.CacheDatabaseSchema
197    */

198   public void setDatabaseSchema(DatabaseSchema dbs)
199   {
200     if (cdbs == null)
201     {
202       logger.info(Translate.get("resultcache.setting.database.schema"));
203       cdbs = new CacheDatabaseSchema(dbs);
204     }
205     else
206     { // Schema is updated, compute the diff !
207
CacheDatabaseSchema newSchema = new CacheDatabaseSchema(dbs);
208       ArrayList JavaDoc tables = cdbs.getTables();
209       ArrayList JavaDoc newTables = newSchema.getTables();
210       if (newTables == null)
211       { // New schema is empty (no backend is active anymore)
212
logger.info(Translate.get("resultcache.flusing.whole.cache"));
213         flushCache();
214         cdbs = null;
215         return;
216       }
217
218       // Remove extra-tables
219
for (int i = 0; i < tables.size(); i++)
220       {
221         CacheDatabaseTable t = (CacheDatabaseTable) tables.get(i);
222         if (!newSchema.hasTable(t.getName()))
223         {
224           t.invalidateAll();
225           cdbs.removeTable(t);
226           if (logger.isInfoEnabled())
227             logger.info(Translate
228                 .get("resultcache.removing.table", t.getName()));
229         }
230       }
231
232       // Add missing tables
233
int size = newTables.size();
234       for (int i = 0; i < size; i++)
235       {
236         CacheDatabaseTable t = (CacheDatabaseTable) newTables.get(i);
237         if (!cdbs.hasTable(t.getName()))
238         {
239           cdbs.addTable(t);
240           if (logger.isInfoEnabled())
241             logger.info(Translate.get("resultcache.adding.table", t.getName()));
242         }
243       }
244     }
245   }
246
247   /**
248    * Merge the given <code>DatabaseSchema</code> with the current one.
249    *
250    * @param dbs a <code>DatabaseSchema</code> value
251    * @see org.continuent.sequoia.controller.cache.result.schema.CacheDatabaseSchema
252    */

253   public void mergeDatabaseSchema(DatabaseSchema dbs)
254   {
255     try
256     {
257       logger.info(Translate.get("resultcache.merging.new.database.schema"));
258       cdbs.mergeSchema(new CacheDatabaseSchema(dbs));
259     }
260     catch (Exception JavaDoc e)
261     {
262       logger.error(Translate.get("resultcache.error.while.merging", e));
263     }
264   }
265
266   /**
267    * Add a rule for this <code>ResultCache</code>
268    *
269    * @param rule that contains information on the action to perform for a
270    * specific query
271    */

272   public void addCachingRule(ResultCacheRule rule)
273   {
274     cachingRules.add(rule);
275   }
276
277   /**
278    * @see org.continuent.sequoia.controller.cache.result.AbstractResultCache#getDefaultRule()
279    */

280   public ResultCacheRule getDefaultRule()
281   {
282     return defaultRule;
283   }
284
285   /**
286    * @see org.continuent.sequoia.controller.cache.result.AbstractResultCache#setDefaultRule(ResultCacheRule)
287    */

288   public void setDefaultRule(ResultCacheRule defaultRule)
289   {
290     this.defaultRule = defaultRule;
291   }
292
293   /**
294    * Finds the behavior of the cache with the given query skeleton. If the query
295    * match a pattern of a rule then we get the associated action for this,
296    * otherwise we look for the default behavior.
297    *
298    * @param request to get action for
299    * @return the <code>CacheBehavior</code> associated for this query.
300    */

301   private CacheBehavior getCacheBehavior(SelectRequest request)
302   {
303     CacheBehavior behavior = null;
304     for (Iterator JavaDoc iter = cachingRules.iterator(); iter.hasNext();)
305     {
306       behavior = ((ResultCacheRule) iter.next()).matches(request);
307       if (behavior != null)
308       {
309         break;
310       }
311     }
312     if (behavior == null)
313       behavior = defaultRule.getCacheBehavior();
314     if (logger.isDebugEnabled())
315       logger.debug(Translate.get("resultcache.behavior.for.request",
316           new String JavaDoc[]{request.getUniqueKey(), behavior.getType()}));
317     return behavior;
318   }
319
320   /*
321    * Cache Management
322    */

323
324   /**
325    * Creates a unique cache entry key from the given request. The key is
326    * currently composed of the login name and the request SQL statement.
327    *
328    * @param request the request to generate the key from
329    * @return a unique cache key for this request
330    */

331   private String JavaDoc getCacheKeyFromRequest(SelectRequest request)
332   {
333     return request.getLogin() + "," + request.getUniqueKey();
334   }
335
336   /**
337    * Do we need invalidation after an update request, given a
338    * ControllerResultSet. Note that this method is meant to be used with unique
339    * queries where the ControllerResultSet is the result of a pk selection (like
340    * an Entity Bean).
341    *
342    * @param result that could be in the cache
343    * @param request the update we want to get updated values from
344    * @return boolean[] {needInvalidate,needToSendQuery}
345    */

346   public boolean[] needInvalidate(ControllerResultSet result,
347       UpdateRequest request)
348   {
349     HashMap JavaDoc updatedValues = request.getUpdatedValues();
350     boolean needInvalidate = false;
351     boolean needToSendQuery = false;
352     String JavaDoc value;
353     String JavaDoc columnName;
354     try
355     {
356       // If we don't have exactly one row, we don't handle the optimization
357
if ((result == null) || (result.getData() == null)
358           || (result.getData().size() != 1))
359         return TRUE_TRUE;
360     }
361     catch (Exception JavaDoc e)
362     {
363       return TRUE_TRUE;
364     }
365     Field[] fields = result.getFields();
366     ArrayList JavaDoc data = result.getData();
367     int size = fields.length;
368     for (Iterator JavaDoc iter = updatedValues.keySet().iterator(); iter.hasNext();)
369     {
370       columnName = (String JavaDoc) iter.next();
371       value = (String JavaDoc) updatedValues.get(columnName);
372       for (int i = 0; i < size; i++)
373       { // Find the corresponding column in the ResultSet by comparing column
374
// names
375

376         // We can have something like:
377
// FIRSTNAME and ADDRESS.FIRSTNAME
378
if (columnName.equals(fields[i].getFieldName()))
379         {
380           Object JavaDoc o = ((Object JavaDoc[]) data.get(0))[i];
381           if (!value.equals(o))
382           {
383             // The value from the cache entry is different we need to update
384
// the
385
// cache and the database
386
return TRUE_TRUE;
387           }
388           else
389             break;
390         }
391       }
392       // We don't need to invalidate the cache because the columns affected are
393
// different but we need to send the query to the database.
394
needToSendQuery = true;
395       // We don't stop here though because other columns could be updated and
396
// we
397
// could need invalidation
398
}
399     return new boolean[]{needInvalidate, needToSendQuery};
400   }
401
402   /**
403    * Adds an entry request/reply to the cache. Note that if the request was
404    * already in the cache, only the result is updated.
405    *
406    * @param request the request
407    * @param result the result corresponding to the request
408    * @exception CacheException if an error occurs
409    */

410   public void addToCache(SelectRequest request, ControllerResultSet result)
411       throws CacheException
412   {
413     boolean notifyThread = false;
414
415     // Sanity checks
416
if (request.getCacheAbility() == RequestType.UNCACHEABLE)
417       throw new CacheException(Translate.get("resultcache.uncacheable.request",
418           request.getUniqueKey()));
419
420     try
421     {
422       synchronized (pendingQueries)
423       {
424         // Remove the pending query from the list and wake up
425
// all waiting queries
426
removeFromPendingQueries(request);
427
428         String JavaDoc sqlQuery = getCacheKeyFromRequest(request);
429
430         if (result == null)
431           throw new CacheException(Translate.get("resultcache.null.result",
432               sqlQuery));
433
434         // Check against streamable ResultSets
435
if (result.hasMoreData())
436         {
437           logger.info(Translate.get("resultcache.streamed.resultset", request
438               .getSqlShortForm(20)));
439           return;
440         }
441
442         if (logger.isDebugEnabled())
443           logger.debug(Translate.get("resultcache.adding.query", sqlQuery));
444
445         AbstractResultCacheEntry ce;
446         synchronized (queries)
447         {
448           // Check first that the query is not already in the cache
449
ce = (AbstractResultCacheEntry) queries.get(sqlQuery);
450           if (ce == null)
451           {
452             // Not in cache, add this entry
453
// check the rule
454
CacheBehavior behavior = getCacheBehavior(request);
455             ce = behavior.getCacheEntry(request, result, this);
456             if (ce instanceof ResultCacheEntryNoCache)
457               return;
458
459             // Test size of cache
460
if (maxEntries > 0)
461             {
462               int size = queries.size();
463               if (size >= maxEntries)
464                 // LRU replacement policy: Remove the oldest cache entry
465
removeOldest();
466             }
467             // Add to the cache
468
queries.put(sqlQuery, ce);
469
470             notifyThread = true;
471           }
472           else
473           { // Oh, oh, already in cache ...
474
if (ce.isValid())
475               logger.warn(Translate.get(
476                   "resultcache.modifying.result.valid.entry", sqlQuery));
477             ce.setResult(result);
478           }
479
480           // Update LRU
481
if (lruHead != null)
482           {
483             lruHead.setPrev(ce);
484             ce.setNext(lruHead);
485             ce.setPrev(null);
486           }
487           if (lruTail == null)
488             lruTail = ce;
489           lruHead = ce; // This is also fine if LRUHead == null
490
}
491         processAddToCache(ce);
492
493         // process thread notification out of the synchronized block on
494
// pending queries to avoid deadlock, while adding/removing
495
// on cache
496
if (notifyThread)
497         {
498           // relaxed entry
499
if (ce instanceof ResultCacheEntryRelaxed)
500           {
501             ResultCacheEntryRelaxed qcer = (ResultCacheEntryRelaxed) ce;
502             synchronized (relaxedThread)
503             {
504               relaxedCache.add(qcer);
505               if (qcer.getDeadline() < relaxedThread.getThreadWakeUpTime()
506                   || relaxedThread.getThreadWakeUpTime() == 0)
507               {
508                 relaxedThread.notify();
509               }
510             }
511           }
512           else if (ce instanceof ResultCacheEntryEager)
513           {
514             // eager entry
515
ResultCacheEntryEager qcee = (ResultCacheEntryEager) ce;
516             if (qcee.getDeadline() != AbstractResultCacheEntry.NO_DEADLINE)
517             { // Only deal with entries that specify a timeout
518
synchronized (eagerThread)
519               {
520                 eagerCache.add(qcee);
521                 if (qcee.getDeadline() < eagerThread.getThreadWakeUpTime()
522                     || eagerThread.getThreadWakeUpTime() == 0)
523                 {
524                   eagerThread.notify();
525                 }
526               }
527             }
528           }
529         }
530       }
531     }
532     catch (OutOfMemoryError JavaDoc oome)
533     {
534       flushCache();
535       System.gc();
536       logger.warn(Translate.get("cache.memory.error.cache.flushed", this
537           .getClass()));
538     }
539   }
540
541   /**
542    * Process the add to cache to update implementation specific data structures.
543    *
544    * @param qe to add to the cache.
545    */

546   protected abstract void processAddToCache(AbstractResultCacheEntry qe);
547
548   /**
549    * Gets the result to the given request from the cache. The returned
550    * <code>AbstractResultCacheEntry</code> is <code>null</code> if the
551    * request is not present in the cache.
552    * <p>
553    * An invalid <code>AbstractResultCacheEntry</code> may be returned (it
554    * means that the result is <code>null</code>) but the already parsed query
555    * can be retrieved from the cache entry.
556    *
557    * @param request an SQL select request
558    * @param addToPendingQueries <code>true</code> if the request must be added
559    * to the pending query list on a cache miss
560    * @return the <code>AbstractResultCacheEntry</code> if found, else
561    * <code>null</code>
562    */

563   public AbstractResultCacheEntry getFromCache(SelectRequest request,
564       boolean addToPendingQueries)
565   {
566     stats.addSelect();
567
568     if (request.getCacheAbility() == RequestType.UNCACHEABLE)
569     {
570       stats.addUncacheable();
571       return null;
572     }
573
574     String JavaDoc sqlQuery = getCacheKeyFromRequest(request);
575
576     // Check if we have the same query pending
577
synchronized (pendingQueries)
578     {
579       if (addToPendingQueries)
580       {
581         long timeout = pendingQueryTimeout;
582         // Yes, wait for the result
583
// As we use a single lock for all pending queries, we use a
584
// while to re-check that this wake-up was for us!
585
while (pendingQueries.contains(sqlQuery))
586         {
587           try
588           {
589             if (logger.isDebugEnabled())
590               logger.debug(Translate.get("resultcache.waiting.pending.query",
591                   sqlQuery));
592
593             if (timeout > 0)
594             {
595               long start = System.currentTimeMillis();
596               pendingQueries.wait(pendingQueryTimeout);
597               long end = System.currentTimeMillis();
598               timeout = timeout - (end - start);
599               if (timeout <= 0)
600               {
601                 logger.warn(Translate.get("resultcache.pending.query.timeout"));
602                 break;
603               }
604             }
605             else
606               pendingQueries.wait();
607           }
608           catch (InterruptedException JavaDoc e)
609           {
610             logger.warn(Translate.get("resultcache.pending.query.timeout"));
611             break;
612           }
613         }
614       }
615
616       // Check the cache
617
AbstractResultCacheEntry ce;
618       synchronized (queries)
619       {
620         ce = (AbstractResultCacheEntry) queries.get(sqlQuery);
621         if (ce == null)
622         // if ((ce == null) || !ce.isValid())
623
{ // Cache miss or dirty entry
624
if (addToPendingQueries)
625           {
626             pendingQueries.add(sqlQuery);
627             // Add this query to the pending queries
628
if (logger.isDebugEnabled())
629             {
630               logger.debug(Translate.get("resultcache.cache.miss"));
631               logger.debug(Translate.get(
632                   "resultcache.adding.to.pending.queries", sqlQuery));
633             }
634           }
635           return null;
636         }
637         else
638         { // Cache hit (must update LRU)
639
// Move cache entry to head of LRU
640
AbstractResultCacheEntry before = ce.getPrev();
641           if (before != null)
642           {
643             AbstractResultCacheEntry after = ce.getNext();
644             before.setNext(after);
645             if (after != null)
646               after.setPrev(before);
647             else
648               // We were the tail, update the tail
649
lruTail = before;
650             ce.setNext(lruHead);
651             ce.setPrev(null);
652             if (lruHead != ce)
653               lruHead.setPrev(ce);
654             lruHead = ce;
655           }
656           // else it was already the LRU head
657
}
658       }
659
660       if (ce.getResult() == null)
661       {
662         if (addToPendingQueries)
663         {
664           pendingQueries.add(sqlQuery);
665           // Add this query to the pending queries
666
if (logger.isDebugEnabled())
667           {
668             logger.debug(Translate.get("resultcache.cache.miss"));
669             logger.debug(Translate.get("resultcache.adding.to.pending.queries",
670                 sqlQuery));
671           }
672         }
673         if (ce.isValid() && logger.isInfoEnabled())
674           logger.info(Translate.get("resultcache.valid.entry.without.result",
675               ce.getRequest().getUniqueKey()));
676       }
677       else
678       {
679         if (logger.isDebugEnabled())
680           logger.debug(Translate.get("resultcache.cache.hit", sqlQuery));
681         stats.addHits();
682       }
683
684       return ce;
685     }
686   }
687
688   /**
689    * Removes an entry from the cache (both request and reply are dropped). The
690    * request is NOT removed from the pending query list, but it shouldn't be in
691    * this list.
692    *
693    * @param request a <code>SelectRequest</code>
694    */

695   public void removeFromCache(SelectRequest request)
696   {
697     String JavaDoc sqlQuery = request.getUniqueKey();
698
699     if (logger.isDebugEnabled())
700       logger.debug("Removing from cache: " + sqlQuery);
701
702     synchronized (queries)
703     {
704       // Remove from the cache
705
AbstractResultCacheEntry ce = (AbstractResultCacheEntry) queries
706           .remove(sqlQuery);
707       if (ce == null)
708         return; // Was not in the cache!
709
else
710       {
711         // Update result set
712
ce.setResult(null);
713         // Update LRU
714
AbstractResultCacheEntry before = ce.getPrev();
715         AbstractResultCacheEntry after = ce.getNext();
716         if (before != null)
717         {
718           before.setNext(after);
719           if (after != null)
720             after.setPrev(before);
721           else
722             // We were the tail, update the tail
723
lruTail = before;
724         }
725         else
726         { // We are the LRUHead
727
lruHead = ce.getNext();
728           if (after != null)
729             after.setPrev(null);
730           else
731             // We were the tail, update the tail
732
lruTail = before;
733         }
734         // Remove links to other cache entries for GC
735
ce.setNext(null);
736         ce.setPrev(null);
737       }
738     }
739   }
740
741   /**
742    * Removes an entry from the pending query list.
743    *
744    * @param request a <code>SelectRequest</code>
745    */

746   public void removeFromPendingQueries(SelectRequest request)
747   {
748     String JavaDoc sqlQuery = getCacheKeyFromRequest(request);
749
750     synchronized (pendingQueries)
751     {
752       // Remove the pending query from the list and wake up
753
// all waiting queries
754
if (pendingQueries.remove(sqlQuery))
755       {
756         if (logger.isDebugEnabled())
757           logger.debug(Translate.get("resultcache.removing.pending.query",
758               sqlQuery));
759         pendingQueries.notifyAll();
760       }
761       else
762         logger.warn(Translate.get("resultcache.removing.pending.query.failed",
763             sqlQuery));
764     }
765   }
766
767   /**
768    * @see org.continuent.sequoia.controller.cache.result.AbstractResultCache#isUpdateNecessary(org.continuent.sequoia.controller.requests.UpdateRequest)
769    */

770   public abstract boolean isUpdateNecessary(UpdateRequest request)
771       throws CacheException;
772
773   /**
774    * Notifies the cache that this write request has been issued, so that cache
775    * coherency can be maintained. If the cache is distributed, this method is
776    * reponsible for broadcasting this information to other caches.
777    *
778    * @param request an <code>AbstractRequest</code> value
779    * @exception CacheException if an error occurs
780    */

781   public void writeNotify(AbstractRequest request) throws CacheException
782   {
783     // Update the stats
784
if (request.isInsert())
785       stats.addInsert();
786     else if (request.isUpdate())
787       stats.addUpdate();
788     else if (request.isDelete())
789       stats.addDelete();
790     else if (request.isCreate())
791     {
792       stats.addCreate();
793       // Create: we only need to update the schema
794
if (parsingGranularity != ParsingGranularities.NO_PARSING)
795       {
796         CreateRequest createRequest = (CreateRequest) request;
797         if (createRequest.altersDatabaseSchema()
798             && (createRequest.getDatabaseTable() != null))
799           cdbs
800               .addTable(new CacheDatabaseTable(createRequest.getDatabaseTable()));
801       }
802       return;
803     }
804     else if (request.isDrop())
805     {
806       stats.addDrop();
807       // Drop: we need to update the schema
808
if (parsingGranularity != ParsingGranularities.NO_PARSING)
809       {
810         // Invalidate the cache entries associated with this table
811
for (Iterator JavaDoc iter = request.getSemantic().getWriteSet().iterator(); iter
812             .hasNext();)
813         {
814           String JavaDoc tableName = (String JavaDoc) iter.next();
815           CacheDatabaseTable cdt = cdbs.getTable(tableName);
816           if (cdt != null)
817           {
818             cdt.invalidateAll();
819             cdbs.removeTable(cdt);
820             return;
821           }
822           // else: the table was not previously cached
823
// (no previous 'select' requests on the table).
824
}
825       }
826     }
827     else
828     {
829       stats.addUnknown();
830     }
831     if (logger.isDebugEnabled())
832       logger.debug("Notifying write " + request.getUniqueKey());
833
834     processWriteNotify(request);
835   }
836
837   /**
838    * Implementation specific invalidation of the cache.
839    *
840    * @param request request that invalidates the cache.
841    */

842   protected abstract void processWriteNotify(AbstractRequest request);
843
844   /**
845    * Removes all entries from the cache.
846    */

847   public void flushCache()
848   {
849     // Check if we are already flushing the cache
850
synchronized (this)
851     {
852       if (flushingCache)
853         return;
854       flushingCache = true;
855     }
856
857     try
858     {
859       synchronized (queries)
860       { // Invalidate the whole cache until it is empty
861
while (!queries.isEmpty())
862         {
863           Iterator JavaDoc iter = queries.values().iterator();
864           ((AbstractResultCacheEntry) iter.next()).invalidate();
865         }
866       }
867
868       synchronized (pendingQueries)
869       { // Clean pending queries to unblock everyone if some queries/backends
870
// remained in an unstable state.
871
pendingQueries.clear();
872         pendingQueries.notifyAll();
873       }
874     }
875     finally
876     {
877       synchronized (this)
878       {
879         flushingCache = false;
880       }
881       if (logger.isDebugEnabled())
882         logger.debug(Translate.get("resultcache.cache.flushed"));
883     }
884   }
885
886   /**
887    * Get Cache size
888    *
889    * @return the approximate size of the cache in bytes
890    */

891   public long getCacheSize()
892   {
893     // No need to synchronize, the implementation returns an int
894
return queries.size();
895   }
896
897   /**
898    * Removes the oldest entry from the cache.
899    * <p>
900    * <b>!Warning! </b> This method is not synchronized and should be called in
901    * the scope of a synchronized(queries)
902    */

903   private void removeOldest()
904   {
905     if (lruTail == null)
906       return;
907     // Update the LRU
908
AbstractResultCacheEntry oldce = lruTail;
909     lruTail = lruTail.getPrev();
910     if (lruTail != null)
911       lruTail.setNext(null);
912
913     if (logger.isDebugEnabled())
914       logger.debug(Translate.get("resultcache.removing.oldest.cache.entry",
915           oldce.getRequest().getUniqueKey()));
916
917     /*
918      * We remove the query from the hashtable so that the garbage collector can
919      * do its job. We need to remove the query from the queries HashTable first
920      * in case we invalidate an eager cache entry that will call removeFromCache
921      * (and will try to update the LRU is the entry is still in the queries
922      * HashTable). So, to be compatible with all type of cache entries: 1.
923      * queries.remove(ce) 2. ce.invalidate
924      */

925     queries.remove(oldce.getRequest().getUniqueKey());
926
927     if (oldce.isValid())
928     {
929       oldce.setResult(null);
930       oldce.invalidate();
931     }
932
933     stats.addRemove();
934   }
935
936   /**
937    * Gets the needed query parsing granularity.
938    *
939    * @return needed query parsing granularity
940    */

941   public int getParsingGranularity()
942   {
943     return this.parsingGranularity;
944   }
945
946   /**
947    * Retrieve the name of this cache
948    *
949    * @return name
950    */

951   public abstract String JavaDoc getName();
952
953   //
954
// Transaction management
955
//
956

957   /**
958    * Commit a transaction given its id.
959    *
960    * @param transactionId the transaction id
961    * @throws CacheException if an error occurs
962    */

963   public void commit(long transactionId) throws CacheException
964   {
965     // Ok, the transaction has commited, nothing to do
966
}
967
968   /**
969    * Rollback a transaction given its id.
970    *
971    * @param transactionId the transaction id
972    * @throws CacheException if an error occurs
973    */

974   public void rollback(long transactionId) throws CacheException
975   {
976     logger.info(Translate.get("resultcache.flushing.cache.cause.rollback",
977         transactionId));
978     flushCache();
979   }
980
981   /*
982    * Debug/Monitoring
983    */

984
985   /**
986    * @see org.continuent.sequoia.controller.cache.result.AbstractResultCache#getCacheData
987    */

988   public String JavaDoc[][] getCacheData() throws CacheException
989   {
990     try
991     {
992       synchronized (queries)
993       {
994         String JavaDoc[][] data = new String JavaDoc[queries.size()][];
995         int count = 0;
996         for (Iterator JavaDoc iter = queries.values().iterator(); iter.hasNext(); count++)
997         {
998           AbstractResultCacheEntry qe = (AbstractResultCacheEntry) iter.next();
999           if (qe != null)
1000          {
1001            data[count] = qe.toStringTable();
1002          }
1003        }
1004        return data;
1005      }
1006    }
1007    catch (Exception JavaDoc e)
1008    {
1009      logger.error(Translate.get("resultcache.error.retrieving.cache.data", e));
1010      throw new CacheException(e.getMessage());
1011    }
1012  }
1013
1014  /**
1015   * @see org.continuent.sequoia.controller.cache.result.AbstractResultCache#getCacheStatsData()
1016   */

1017  public String JavaDoc[][] getCacheStatsData() throws CacheException
1018  {
1019    String JavaDoc[][] data = new String JavaDoc[1][];
1020    String JavaDoc[] stat = stats.getCacheStatsData();
1021    data[0] = new String JavaDoc[stat.length + 1];
1022    for (int i = 0; i < stat.length; i++)
1023      data[0][i] = stat[i];
1024    data[0][data[0].length - 1] = "" + queries.size();
1025    return data;
1026  }
1027
1028  /**
1029   * @return Returns the stats.
1030   */

1031  public CacheStatistics getCacheStatistics()
1032  {
1033    return stats;
1034  }
1035
1036  /**
1037   * Returns the eagerCache value.
1038   *
1039   * @return Returns the eagerCache.
1040   */

1041  public ArrayList JavaDoc getEagerCache()
1042  {
1043    return eagerCache;
1044  }
1045
1046  /**
1047   * Returns the relaxedCache value.
1048   *
1049   * @return Returns the relaxedCache.
1050   */

1051  public ArrayList JavaDoc getRelaxedCache()
1052  {
1053    return relaxedCache;
1054  }
1055
1056  /**
1057   * Gets information about the request cache
1058   *
1059   * @return <code>String</code> containing information
1060   */

1061  protected String JavaDoc getXmlImpl()
1062  {
1063    StringBuffer JavaDoc info = new StringBuffer JavaDoc();
1064    info.append("<" + DatabasesXmlTags.ELT_ResultCache + " "
1065        + DatabasesXmlTags.ATT_pendingTimeout + "=\"" + pendingQueryTimeout
1066        + "\" " + DatabasesXmlTags.ATT_maxNbOfEntries + "=\"" + maxEntries
1067        + "\" " + DatabasesXmlTags.ATT_granularity + "=\"" + getName() + "\">");
1068    info.append("<" + DatabasesXmlTags.ELT_DefaultResultCacheRule + " "
1069        + DatabasesXmlTags.ATT_timestampResolution + "=\""
1070        + defaultRule.getTimestampResolution() / 1000 + "\">");
1071    info.append(defaultRule.getCacheBehavior().getXml());
1072    info.append("</" + DatabasesXmlTags.ELT_DefaultResultCacheRule + ">");
1073    for (Iterator JavaDoc iter = cachingRules.iterator(); iter.hasNext();)
1074      info.append(((ResultCacheRule) iter.next()).getXml());
1075    info.append("</" + DatabasesXmlTags.ELT_ResultCache + ">");
1076    return info.toString();
1077  }
1078
1079}
Popular Tags