1 23 24 package org.continuent.sequoia.controller.cache.result; 25 26 import java.util.ArrayList ; 27 import java.util.HashMap ; 28 import java.util.HashSet ; 29 import java.util.Iterator ; 30 31 import org.continuent.sequoia.common.i18n.Translate; 32 import org.continuent.sequoia.common.protocol.Field; 33 import org.continuent.sequoia.common.xml.DatabasesXmlTags; 34 import org.continuent.sequoia.controller.backend.result.ControllerResultSet; 35 import org.continuent.sequoia.controller.cache.CacheException; 36 import org.continuent.sequoia.controller.cache.CacheStatistics; 37 import org.continuent.sequoia.controller.cache.result.entries.AbstractResultCacheEntry; 38 import org.continuent.sequoia.controller.cache.result.entries.ResultCacheEntryEager; 39 import org.continuent.sequoia.controller.cache.result.entries.ResultCacheEntryNoCache; 40 import org.continuent.sequoia.controller.cache.result.entries.ResultCacheEntryRelaxed; 41 import org.continuent.sequoia.controller.cache.result.schema.CacheDatabaseSchema; 42 import org.continuent.sequoia.controller.cache.result.schema.CacheDatabaseTable; 43 import org.continuent.sequoia.controller.cache.result.threads.EagerCacheThread; 44 import org.continuent.sequoia.controller.cache.result.threads.RelaxedCacheThread; 45 import org.continuent.sequoia.controller.requests.AbstractRequest; 46 import org.continuent.sequoia.controller.requests.CreateRequest; 47 import org.continuent.sequoia.controller.requests.ParsingGranularities; 48 import org.continuent.sequoia.controller.requests.RequestType; 49 import org.continuent.sequoia.controller.requests.SelectRequest; 50 import org.continuent.sequoia.controller.requests.UpdateRequest; 51 import org.continuent.sequoia.controller.sql.schema.DatabaseSchema; 52 53 77 public abstract class ResultCache extends AbstractResultCache 78 { 79 89 private int maxEntries; 91 92 private long pendingQueryTimeout = 0; 93 private HashMap queries; 95 private HashSet pendingQueries; 97 private HashSet cachingRules; 99 private ResultCacheRule defaultRule; 100 private ArrayList relaxedCache; 101 102 private AbstractResultCacheEntry lruHead; 104 private AbstractResultCacheEntry lruTail; 106 107 protected CacheDatabaseSchema cdbs; 109 110 private CacheStatistics stats; 111 112 private RelaxedCacheThread relaxedThread; 113 private static final boolean[] TRUE_TRUE = new boolean[]{true, 114 true }; 115 private boolean flushingCache; 116 private EagerCacheThread eagerThread; 117 private ArrayList eagerCache; 118 119 122 123 129 public ResultCache(int maxEntries, int pendingTimeout) 130 { 131 this.maxEntries = maxEntries; 132 this.pendingQueryTimeout = pendingTimeout; 133 cdbs = null; 134 stats = new CacheStatistics(); 135 queries = new HashMap (1000, (float) 0.75); 136 pendingQueries = new HashSet (); 137 cachingRules = new HashSet (); 138 relaxedCache = new ArrayList (); 139 eagerCache = new ArrayList (); 140 lruHead = null; 141 lruTail = null; 142 defaultRule = null; 143 relaxedThread = new RelaxedCacheThread(this); 144 relaxedThread.setPriority(9); 145 relaxedThread.start(); 146 eagerThread = new EagerCacheThread(this); 147 eagerThread.setPriority(9); 148 eagerThread.start(); 149 } 150 151 154 public synchronized void shutdown() 155 { 156 relaxedThread.shutdown(); 157 eagerThread.shutdown(); 158 } 159 160 166 public int getPendingQueryTimeout() 167 { 168 return (int) (pendingQueryTimeout / 1000); 169 } 170 171 177 public void setPendingQueryTimeout(int pendingQueryTimeout) 178 { 179 this.pendingQueryTimeout = pendingQueryTimeout * 1000L; 180 } 181 182 187 public HashMap getQueries() 188 { 189 return this.queries; 190 } 191 192 198 public void setDatabaseSchema(DatabaseSchema dbs) 199 { 200 if (cdbs == null) 201 { 202 logger.info(Translate.get("resultcache.setting.database.schema")); 203 cdbs = new CacheDatabaseSchema(dbs); 204 } 205 else 206 { CacheDatabaseSchema newSchema = new CacheDatabaseSchema(dbs); 208 ArrayList tables = cdbs.getTables(); 209 ArrayList newTables = newSchema.getTables(); 210 if (newTables == null) 211 { logger.info(Translate.get("resultcache.flusing.whole.cache")); 213 flushCache(); 214 cdbs = null; 215 return; 216 } 217 218 for (int i = 0; i < tables.size(); i++) 220 { 221 CacheDatabaseTable t = (CacheDatabaseTable) tables.get(i); 222 if (!newSchema.hasTable(t.getName())) 223 { 224 t.invalidateAll(); 225 cdbs.removeTable(t); 226 if (logger.isInfoEnabled()) 227 logger.info(Translate 228 .get("resultcache.removing.table", t.getName())); 229 } 230 } 231 232 int size = newTables.size(); 234 for (int i = 0; i < size; i++) 235 { 236 CacheDatabaseTable t = (CacheDatabaseTable) newTables.get(i); 237 if (!cdbs.hasTable(t.getName())) 238 { 239 cdbs.addTable(t); 240 if (logger.isInfoEnabled()) 241 logger.info(Translate.get("resultcache.adding.table", t.getName())); 242 } 243 } 244 } 245 } 246 247 253 public void mergeDatabaseSchema(DatabaseSchema dbs) 254 { 255 try 256 { 257 logger.info(Translate.get("resultcache.merging.new.database.schema")); 258 cdbs.mergeSchema(new CacheDatabaseSchema(dbs)); 259 } 260 catch (Exception e) 261 { 262 logger.error(Translate.get("resultcache.error.while.merging", e)); 263 } 264 } 265 266 272 public void addCachingRule(ResultCacheRule rule) 273 { 274 cachingRules.add(rule); 275 } 276 277 280 public ResultCacheRule getDefaultRule() 281 { 282 return defaultRule; 283 } 284 285 288 public void setDefaultRule(ResultCacheRule defaultRule) 289 { 290 this.defaultRule = defaultRule; 291 } 292 293 301 private CacheBehavior getCacheBehavior(SelectRequest request) 302 { 303 CacheBehavior behavior = null; 304 for (Iterator iter = cachingRules.iterator(); iter.hasNext();) 305 { 306 behavior = ((ResultCacheRule) iter.next()).matches(request); 307 if (behavior != null) 308 { 309 break; 310 } 311 } 312 if (behavior == null) 313 behavior = defaultRule.getCacheBehavior(); 314 if (logger.isDebugEnabled()) 315 logger.debug(Translate.get("resultcache.behavior.for.request", 316 new String []{request.getUniqueKey(), behavior.getType()})); 317 return behavior; 318 } 319 320 323 324 331 private String getCacheKeyFromRequest(SelectRequest request) 332 { 333 return request.getLogin() + "," + request.getUniqueKey(); 334 } 335 336 346 public boolean[] needInvalidate(ControllerResultSet result, 347 UpdateRequest request) 348 { 349 HashMap updatedValues = request.getUpdatedValues(); 350 boolean needInvalidate = false; 351 boolean needToSendQuery = false; 352 String value; 353 String columnName; 354 try 355 { 356 if ((result == null) || (result.getData() == null) 358 || (result.getData().size() != 1)) 359 return TRUE_TRUE; 360 } 361 catch (Exception e) 362 { 363 return TRUE_TRUE; 364 } 365 Field[] fields = result.getFields(); 366 ArrayList data = result.getData(); 367 int size = fields.length; 368 for (Iterator iter = updatedValues.keySet().iterator(); iter.hasNext();) 369 { 370 columnName = (String ) iter.next(); 371 value = (String ) updatedValues.get(columnName); 372 for (int i = 0; i < size; i++) 373 { 376 if (columnName.equals(fields[i].getFieldName())) 379 { 380 Object o = ((Object []) data.get(0))[i]; 381 if (!value.equals(o)) 382 { 383 return TRUE_TRUE; 387 } 388 else 389 break; 390 } 391 } 392 needToSendQuery = true; 395 } 399 return new boolean[]{needInvalidate, needToSendQuery}; 400 } 401 402 410 public void addToCache(SelectRequest request, ControllerResultSet result) 411 throws CacheException 412 { 413 boolean notifyThread = false; 414 415 if (request.getCacheAbility() == RequestType.UNCACHEABLE) 417 throw new CacheException(Translate.get("resultcache.uncacheable.request", 418 request.getUniqueKey())); 419 420 try 421 { 422 synchronized (pendingQueries) 423 { 424 removeFromPendingQueries(request); 427 428 String sqlQuery = getCacheKeyFromRequest(request); 429 430 if (result == null) 431 throw new CacheException(Translate.get("resultcache.null.result", 432 sqlQuery)); 433 434 if (result.hasMoreData()) 436 { 437 logger.info(Translate.get("resultcache.streamed.resultset", request 438 .getSqlShortForm(20))); 439 return; 440 } 441 442 if (logger.isDebugEnabled()) 443 logger.debug(Translate.get("resultcache.adding.query", sqlQuery)); 444 445 AbstractResultCacheEntry ce; 446 synchronized (queries) 447 { 448 ce = (AbstractResultCacheEntry) queries.get(sqlQuery); 450 if (ce == null) 451 { 452 CacheBehavior behavior = getCacheBehavior(request); 455 ce = behavior.getCacheEntry(request, result, this); 456 if (ce instanceof ResultCacheEntryNoCache) 457 return; 458 459 if (maxEntries > 0) 461 { 462 int size = queries.size(); 463 if (size >= maxEntries) 464 removeOldest(); 466 } 467 queries.put(sqlQuery, ce); 469 470 notifyThread = true; 471 } 472 else 473 { if (ce.isValid()) 475 logger.warn(Translate.get( 476 "resultcache.modifying.result.valid.entry", sqlQuery)); 477 ce.setResult(result); 478 } 479 480 if (lruHead != null) 482 { 483 lruHead.setPrev(ce); 484 ce.setNext(lruHead); 485 ce.setPrev(null); 486 } 487 if (lruTail == null) 488 lruTail = ce; 489 lruHead = ce; } 491 processAddToCache(ce); 492 493 if (notifyThread) 497 { 498 if (ce instanceof ResultCacheEntryRelaxed) 500 { 501 ResultCacheEntryRelaxed qcer = (ResultCacheEntryRelaxed) ce; 502 synchronized (relaxedThread) 503 { 504 relaxedCache.add(qcer); 505 if (qcer.getDeadline() < relaxedThread.getThreadWakeUpTime() 506 || relaxedThread.getThreadWakeUpTime() == 0) 507 { 508 relaxedThread.notify(); 509 } 510 } 511 } 512 else if (ce instanceof ResultCacheEntryEager) 513 { 514 ResultCacheEntryEager qcee = (ResultCacheEntryEager) ce; 516 if (qcee.getDeadline() != AbstractResultCacheEntry.NO_DEADLINE) 517 { synchronized (eagerThread) 519 { 520 eagerCache.add(qcee); 521 if (qcee.getDeadline() < eagerThread.getThreadWakeUpTime() 522 || eagerThread.getThreadWakeUpTime() == 0) 523 { 524 eagerThread.notify(); 525 } 526 } 527 } 528 } 529 } 530 } 531 } 532 catch (OutOfMemoryError oome) 533 { 534 flushCache(); 535 System.gc(); 536 logger.warn(Translate.get("cache.memory.error.cache.flushed", this 537 .getClass())); 538 } 539 } 540 541 546 protected abstract void processAddToCache(AbstractResultCacheEntry qe); 547 548 563 public AbstractResultCacheEntry getFromCache(SelectRequest request, 564 boolean addToPendingQueries) 565 { 566 stats.addSelect(); 567 568 if (request.getCacheAbility() == RequestType.UNCACHEABLE) 569 { 570 stats.addUncacheable(); 571 return null; 572 } 573 574 String sqlQuery = getCacheKeyFromRequest(request); 575 576 synchronized (pendingQueries) 578 { 579 if (addToPendingQueries) 580 { 581 long timeout = pendingQueryTimeout; 582 while (pendingQueries.contains(sqlQuery)) 586 { 587 try 588 { 589 if (logger.isDebugEnabled()) 590 logger.debug(Translate.get("resultcache.waiting.pending.query", 591 sqlQuery)); 592 593 if (timeout > 0) 594 { 595 long start = System.currentTimeMillis(); 596 pendingQueries.wait(pendingQueryTimeout); 597 long end = System.currentTimeMillis(); 598 timeout = timeout - (end - start); 599 if (timeout <= 0) 600 { 601 logger.warn(Translate.get("resultcache.pending.query.timeout")); 602 break; 603 } 604 } 605 else 606 pendingQueries.wait(); 607 } 608 catch (InterruptedException e) 609 { 610 logger.warn(Translate.get("resultcache.pending.query.timeout")); 611 break; 612 } 613 } 614 } 615 616 AbstractResultCacheEntry ce; 618 synchronized (queries) 619 { 620 ce = (AbstractResultCacheEntry) queries.get(sqlQuery); 621 if (ce == null) 622 { if (addToPendingQueries) 625 { 626 pendingQueries.add(sqlQuery); 627 if (logger.isDebugEnabled()) 629 { 630 logger.debug(Translate.get("resultcache.cache.miss")); 631 logger.debug(Translate.get( 632 "resultcache.adding.to.pending.queries", sqlQuery)); 633 } 634 } 635 return null; 636 } 637 else 638 { AbstractResultCacheEntry before = ce.getPrev(); 641 if (before != null) 642 { 643 AbstractResultCacheEntry after = ce.getNext(); 644 before.setNext(after); 645 if (after != null) 646 after.setPrev(before); 647 else 648 lruTail = before; 650 ce.setNext(lruHead); 651 ce.setPrev(null); 652 if (lruHead != ce) 653 lruHead.setPrev(ce); 654 lruHead = ce; 655 } 656 } 658 } 659 660 if (ce.getResult() == null) 661 { 662 if (addToPendingQueries) 663 { 664 pendingQueries.add(sqlQuery); 665 if (logger.isDebugEnabled()) 667 { 668 logger.debug(Translate.get("resultcache.cache.miss")); 669 logger.debug(Translate.get("resultcache.adding.to.pending.queries", 670 sqlQuery)); 671 } 672 } 673 if (ce.isValid() && logger.isInfoEnabled()) 674 logger.info(Translate.get("resultcache.valid.entry.without.result", 675 ce.getRequest().getUniqueKey())); 676 } 677 else 678 { 679 if (logger.isDebugEnabled()) 680 logger.debug(Translate.get("resultcache.cache.hit", sqlQuery)); 681 stats.addHits(); 682 } 683 684 return ce; 685 } 686 } 687 688 695 public void removeFromCache(SelectRequest request) 696 { 697 String sqlQuery = request.getUniqueKey(); 698 699 if (logger.isDebugEnabled()) 700 logger.debug("Removing from cache: " + sqlQuery); 701 702 synchronized (queries) 703 { 704 AbstractResultCacheEntry ce = (AbstractResultCacheEntry) queries 706 .remove(sqlQuery); 707 if (ce == null) 708 return; else 710 { 711 ce.setResult(null); 713 AbstractResultCacheEntry before = ce.getPrev(); 715 AbstractResultCacheEntry after = ce.getNext(); 716 if (before != null) 717 { 718 before.setNext(after); 719 if (after != null) 720 after.setPrev(before); 721 else 722 lruTail = before; 724 } 725 else 726 { lruHead = ce.getNext(); 728 if (after != null) 729 after.setPrev(null); 730 else 731 lruTail = before; 733 } 734 ce.setNext(null); 736 ce.setPrev(null); 737 } 738 } 739 } 740 741 746 public void removeFromPendingQueries(SelectRequest request) 747 { 748 String sqlQuery = getCacheKeyFromRequest(request); 749 750 synchronized (pendingQueries) 751 { 752 if (pendingQueries.remove(sqlQuery)) 755 { 756 if (logger.isDebugEnabled()) 757 logger.debug(Translate.get("resultcache.removing.pending.query", 758 sqlQuery)); 759 pendingQueries.notifyAll(); 760 } 761 else 762 logger.warn(Translate.get("resultcache.removing.pending.query.failed", 763 sqlQuery)); 764 } 765 } 766 767 770 public abstract boolean isUpdateNecessary(UpdateRequest request) 771 throws CacheException; 772 773 781 public void writeNotify(AbstractRequest request) throws CacheException 782 { 783 if (request.isInsert()) 785 stats.addInsert(); 786 else if (request.isUpdate()) 787 stats.addUpdate(); 788 else if (request.isDelete()) 789 stats.addDelete(); 790 else if (request.isCreate()) 791 { 792 stats.addCreate(); 793 if (parsingGranularity != ParsingGranularities.NO_PARSING) 795 { 796 CreateRequest createRequest = (CreateRequest) request; 797 if (createRequest.altersDatabaseSchema() 798 && (createRequest.getDatabaseTable() != null)) 799 cdbs 800 .addTable(new CacheDatabaseTable(createRequest.getDatabaseTable())); 801 } 802 return; 803 } 804 else if (request.isDrop()) 805 { 806 stats.addDrop(); 807 if (parsingGranularity != ParsingGranularities.NO_PARSING) 809 { 810 for (Iterator iter = request.getSemantic().getWriteSet().iterator(); iter 812 .hasNext();) 813 { 814 String tableName = (String ) iter.next(); 815 CacheDatabaseTable cdt = cdbs.getTable(tableName); 816 if (cdt != null) 817 { 818 cdt.invalidateAll(); 819 cdbs.removeTable(cdt); 820 return; 821 } 822 } 825 } 826 } 827 else 828 { 829 stats.addUnknown(); 830 } 831 if (logger.isDebugEnabled()) 832 logger.debug("Notifying write " + request.getUniqueKey()); 833 834 processWriteNotify(request); 835 } 836 837 842 protected abstract void processWriteNotify(AbstractRequest request); 843 844 847 public void flushCache() 848 { 849 synchronized (this) 851 { 852 if (flushingCache) 853 return; 854 flushingCache = true; 855 } 856 857 try 858 { 859 synchronized (queries) 860 { while (!queries.isEmpty()) 862 { 863 Iterator iter = queries.values().iterator(); 864 ((AbstractResultCacheEntry) iter.next()).invalidate(); 865 } 866 } 867 868 synchronized (pendingQueries) 869 { pendingQueries.clear(); 872 pendingQueries.notifyAll(); 873 } 874 } 875 finally 876 { 877 synchronized (this) 878 { 879 flushingCache = false; 880 } 881 if (logger.isDebugEnabled()) 882 logger.debug(Translate.get("resultcache.cache.flushed")); 883 } 884 } 885 886 891 public long getCacheSize() 892 { 893 return queries.size(); 895 } 896 897 903 private void removeOldest() 904 { 905 if (lruTail == null) 906 return; 907 AbstractResultCacheEntry oldce = lruTail; 909 lruTail = lruTail.getPrev(); 910 if (lruTail != null) 911 lruTail.setNext(null); 912 913 if (logger.isDebugEnabled()) 914 logger.debug(Translate.get("resultcache.removing.oldest.cache.entry", 915 oldce.getRequest().getUniqueKey())); 916 917 925 queries.remove(oldce.getRequest().getUniqueKey()); 926 927 if (oldce.isValid()) 928 { 929 oldce.setResult(null); 930 oldce.invalidate(); 931 } 932 933 stats.addRemove(); 934 } 935 936 941 public int getParsingGranularity() 942 { 943 return this.parsingGranularity; 944 } 945 946 951 public abstract String getName(); 952 953 957 963 public void commit(long transactionId) throws CacheException 964 { 965 } 967 968 974 public void rollback(long transactionId) throws CacheException 975 { 976 logger.info(Translate.get("resultcache.flushing.cache.cause.rollback", 977 transactionId)); 978 flushCache(); 979 } 980 981 984 985 988 public String [][] getCacheData() throws CacheException 989 { 990 try 991 { 992 synchronized (queries) 993 { 994 String [][] data = new String [queries.size()][]; 995 int count = 0; 996 for (Iterator iter = queries.values().iterator(); iter.hasNext(); count++) 997 { 998 AbstractResultCacheEntry qe = (AbstractResultCacheEntry) iter.next(); 999 if (qe != null) 1000 { 1001 data[count] = qe.toStringTable(); 1002 } 1003 } 1004 return data; 1005 } 1006 } 1007 catch (Exception e) 1008 { 1009 logger.error(Translate.get("resultcache.error.retrieving.cache.data", e)); 1010 throw new CacheException(e.getMessage()); 1011 } 1012 } 1013 1014 1017 public String [][] getCacheStatsData() throws CacheException 1018 { 1019 String [][] data = new String [1][]; 1020 String [] stat = stats.getCacheStatsData(); 1021 data[0] = new String [stat.length + 1]; 1022 for (int i = 0; i < stat.length; i++) 1023 data[0][i] = stat[i]; 1024 data[0][data[0].length - 1] = "" + queries.size(); 1025 return data; 1026 } 1027 1028 1031 public CacheStatistics getCacheStatistics() 1032 { 1033 return stats; 1034 } 1035 1036 1041 public ArrayList getEagerCache() 1042 { 1043 return eagerCache; 1044 } 1045 1046 1051 public ArrayList getRelaxedCache() 1052 { 1053 return relaxedCache; 1054 } 1055 1056 1061 protected String getXmlImpl() 1062 { 1063 StringBuffer info = new StringBuffer (); 1064 info.append("<" + DatabasesXmlTags.ELT_ResultCache + " " 1065 + DatabasesXmlTags.ATT_pendingTimeout + "=\"" + pendingQueryTimeout 1066 + "\" " + DatabasesXmlTags.ATT_maxNbOfEntries + "=\"" + maxEntries 1067 + "\" " + DatabasesXmlTags.ATT_granularity + "=\"" + getName() + "\">"); 1068 info.append("<" + DatabasesXmlTags.ELT_DefaultResultCacheRule + " " 1069 + DatabasesXmlTags.ATT_timestampResolution + "=\"" 1070 + defaultRule.getTimestampResolution() / 1000 + "\">"); 1071 info.append(defaultRule.getCacheBehavior().getXml()); 1072 info.append("</" + DatabasesXmlTags.ELT_DefaultResultCacheRule + ">"); 1073 for (Iterator iter = cachingRules.iterator(); iter.hasNext();) 1074 info.append(((ResultCacheRule) iter.next()).getXml()); 1075 info.append("</" + DatabasesXmlTags.ELT_ResultCache + ">"); 1076 return info.toString(); 1077 } 1078 1079} | Popular Tags |