1 package com.jofti.cache.adapter; 2 3 import java.util.ArrayList ; 4 import java.util.Collection ; 5 import java.util.HashMap ; 6 import java.util.HashSet ; 7 import java.util.Iterator ; 8 import java.util.List ; 9 import java.util.Map ; 10 import java.util.Properties ; 11 import java.util.Set ; 12 13 import javax.transaction.HeuristicMixedException ; 14 import javax.transaction.HeuristicRollbackException ; 15 import javax.transaction.RollbackException ; 16 import javax.transaction.Status ; 17 import javax.transaction.Synchronization ; 18 import javax.transaction.SystemException ; 19 import javax.transaction.Transaction ; 20 21 import net.sf.ehcache.CacheException; 22 23 import org.apache.commons.logging.Log; 24 import org.apache.commons.logging.LogFactory; 25 import org.jboss.cache.Fqn; 26 import org.jboss.cache.GlobalTransaction; 27 import org.jboss.cache.Node; 28 import org.jboss.cache.PropertyConfigurator; 29 import org.jboss.cache.TreeCache; 30 import org.jboss.cache.TreeCacheListener; 31 import org.jboss.cache.lock.IsolationLevel; 32 import org.jgroups.View; 33 34 35 36 import com.jofti.api.IndexCache; 37 import com.jofti.api.IndexQuery; 38 import com.jofti.api.NameSpaceKey; 39 import com.jofti.cache.CacheAdapter; 40 import com.jofti.cache.BaseAdaptor; 41 import com.jofti.cache.NameSpacedCacheAdapter; 42 import com.jofti.core.INameSpaceAware; 43 import com.jofti.core.IParsedQuery; 44 import com.jofti.core.ITransactionAware; 45 import com.jofti.core.InternalIndex; 46 import com.jofti.core.QueryId; 47 import com.jofti.core.QueryType; 48 import com.jofti.core.TransactionLevel; 49 import com.jofti.exception.JoftiException; 50 import com.jofti.introspect.ClassIntrospector; 51 import com.jofti.parser.ClassFieldMethods; 52 import com.jofti.tree.NameSpacedTreeIndex; 53 import com.jofti.util.CompositeComparator; 54 import com.jofti.util.ObjectProcedureAdapter; 55 import com.jofti.util.OpenHashMap; 56 import com.jofti.util.ValueTreeMap; 57 import com.tangosol.dev.compiler.Manager; 58 59 import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap; 60 61 75 public class JBossCacheAdapter extends BaseAdaptor implements CacheAdapter, 76 NameSpacedCacheAdapter, ITransactionAware 77 { 78 79 TreeCache cache = null; 80 81 javax.transaction.TransactionManager txMgr = null; 82 83 private static Log log = LogFactory 84 .getLog(JBossCacheAdapter.class); 85 86 private String name; 87 88 89 TransactionLevel level = null; 90 91 private int expiryTime = 3600; 93 private boolean requiresStarting; 94 95 private Indexer defaultIndexer = new Indexer(); 97 98 private Map transactionMap = new ConcurrentHashMap(); 99 100 private Map xaMap = new ConcurrentHashMap(); 101 102 105 public int getExpiryTime() 106 { 107 return expiryTime; 108 } 109 110 114 public void setExpiryTime(int expiryTime) 115 { 116 this.expiryTime = expiryTime; 117 } 118 119 public JBossCacheAdapter() 120 { 121 requiresStarting = true; 122 } 123 124 public JBossCacheAdapter(Object cache) 125 { 126 this.cache = (TreeCache) cache; 127 } 128 129 public void setCacheImpl(Object cache) 130 { 131 this.cache = (TreeCache) cache; 132 try { 133 initCache(this.cache); 134 } catch (Exception e) { 135 throw new RuntimeException (e); 136 } 137 138 } 139 140 141 144 public Object get(Object key) 145 { 146 147 if (key != null) { 149 if (key instanceof NameSpaceKey) { 150 NameSpaceKey temp = null; 151 try { 152 temp = (NameSpaceKey) key; 153 } catch (ClassCastException cce) { 154 log.warn("key value must be a NameSpaceKey ", cce); 155 return null; 156 } 157 159 try { 160 synchronized (getCacheLock(key)) { 161 Object val = cache.get((Fqn) temp.getNameSpace(), temp 162 .getKey()); 163 return val; 164 } 165 } catch (org.jboss.cache.CacheException ce) { 166 log.warn("Unable to retrieve value from cache ", ce); 167 } 168 169 } else { 170 log 171 .warn("Unable to retrieve value from cache Key type must be " 172 + NameSpaceKey.class); 173 } 174 175 } 176 return null; 179 } 180 181 private Object getFromCache(Object key) throws JoftiException 182 { 183 184 if (key instanceof NameSpaceKey) { 186 NameSpaceKey temp = null; 187 try { 188 temp = (NameSpaceKey) key; 189 } catch (ClassCastException cce) { 190 log.warn("key value must be a NameSpaceKey ", cce); 191 return null; 192 } 193 195 try { 196 Object val = cache.get((Fqn) temp.getNameSpace(), temp 197 .getKey()); 198 return val; 199 } catch (Throwable ce) { 200 throw new JoftiException(ce); 201 } 202 203 } else { 204 log 205 .warn("Unable to retrieve value from cache Key type must be " 206 + NameSpaceKey.class); 207 } 208 209 return null; 212 } 213 214 215 218 public void put(Object key, Object value) throws JoftiException 219 { 220 221 NameSpaceKey temp = null; 222 223 try { 224 temp = (NameSpaceKey) key; 225 } catch (ClassCastException cce) { 226 throw new JoftiException("key value must be a NameSpaceKey ", cce); 227 } 228 229 Transaction tx = getTransaction(); 231 232 JBossIndexer indexer = getIndexer(tx); 234 235 try { 236 acquireUpdateLock(); 237 }catch (Exception e){ 238 log.error("unable to acquire update lock ",e); 239 throw new JoftiException(e); 240 } 241 try { 242 synchronized (getCacheLock(key)) { 243 Object result = cache.get((Fqn) temp.getNameSpace(), temp 245 .getKey()); 246 if (log.isDebugEnabled()){ 247 log.debug( "inserting " + temp.getNameSpace() + " " + temp 248 .getKey()); 249 } 250 cache.put((Fqn) temp.getNameSpace(), temp.getKey(), value); 251 252 if (result != null) { 253 indexer.update(temp, result, value); 254 } else { 255 indexer.add(temp, value); 256 } 257 } 258 registerIndexer(indexer, tx); 259 260 } catch (org.jboss.cache.CacheException ce) { 261 throw new JoftiException(ce); 262 } catch (Exception e) { 263 throw new JoftiException(e); 264 }finally { 265 releaseUpdateLock(); 266 } 267 268 269 } 270 271 private Transaction getTransaction() throws JoftiException 273 { 274 Transaction tx = null; 275 try { 276 if (txMgr != null) { 277 tx = txMgr.getTransaction(); 278 } 279 } catch (SystemException se) { 280 throw new JoftiException(se); 281 } 282 return tx; 283 } 284 285 293 public void removeNameSpace(Object nameSpace) throws JoftiException 294 { 295 try{ 296 297 acquireUpdateLock(); 298 try { 299 synchronized (getCacheLock(nameSpace)) { 300 Node result = null; 302 Fqn temp = null; 303 if (nameSpace instanceof String ) { 304 temp = Fqn.fromString((String ) nameSpace); 305 306 } else if (nameSpace instanceof Fqn) { 307 temp = (Fqn) nameSpace; 308 } else { 309 throw new JoftiException( 310 "namespace object must be a String or an Fqn object " 311 + nameSpace.getClass()); 312 } 313 result = cache.get(temp); 315 316 if (result != null){ 319 320 Map childMap = result.getChildren(); 321 List nodesToRemove = parseChildren(childMap, new ArrayList (), 322 temp.toString() + "/"); 323 nodesToRemove.add(temp.toString()); 324 if (log.isDebugEnabled()) { 325 log.debug(nodesToRemove); 326 } 327 328 329 for (Iterator it = nodesToRemove.iterator(); it.hasNext();) { 330 cache.remove((String ) it.next()); 331 } 332 333 334 } 335 } 336 } catch (org.jboss.cache.CacheException ce) { 337 throw new JoftiException(ce); 338 } catch (Exception e) { 339 throw new JoftiException(e); 340 }finally { 341 releaseUpdateLock(); 342 } 343 } catch (Exception e){ 344 log.error("unable to acquire update lock",e); 345 } 346 } 347 348 private List parseChildren(Map childMap, List tempList, String prefix) 349 { 350 if (childMap != null) { 351 for (Iterator it = childMap.entrySet().iterator(); it.hasNext();) { 352 Map.Entry entry = (Map.Entry ) it.next(); 353 Map map = ((Node) entry.getValue()).getChildren(); 354 parseChildren(map, tempList, prefix + "/" + entry.getKey() 355 + "/"); 356 tempList.add(prefix + entry.getKey()); 357 } 358 } 359 return tempList; 360 } 361 362 365 public void remove(Object key) throws JoftiException 366 { 367 368 NameSpaceKey temp = null; 369 370 try { 371 temp = (NameSpaceKey) key; 372 } catch (ClassCastException cce) { 373 throw new JoftiException("key value must be a JBossKeyWrapper ", 374 cce); 375 } 376 Transaction tx = getTransaction(); 377 JBossIndexer indexer = getIndexer(tx); 378 379 try{ 380 acquireUpdateLock(); 381 try { 382 383 synchronized (getCacheLock(key)) { 384 385 Object result = cache.remove((Fqn) temp.getNameSpace(), temp 386 .getKey()); 387 388 if (result != null) { 389 indexer.remove(temp, result); 390 } else { 391 if (index.contains(temp)) { 392 indexer.remove(temp); 393 } 394 } 395 } 396 registerIndexer(indexer, tx); 397 } catch (org.jboss.cache.CacheException ce) { 398 throw new JoftiException(ce); 399 } catch (Exception e) { 400 throw new JoftiException(e); 401 }finally { 402 releaseUpdateLock(); 403 } 404 } catch (Exception e){ 405 log.error("unable to acquire update lock",e); 406 } 407 408 } 409 410 415 public void removeAll() throws JoftiException 416 { 417 try { 418 cache.remove("/"); 419 420 index.removeAll(); 421 422 } catch (Exception ce) { 423 throw new JoftiException(ce); 424 } 425 426 } 427 428 private synchronized JBossIndexer getIndexer(Transaction tx) 429 throws JoftiException 430 { 431 432 JBossIndexer indexer = null; 433 434 if (tx == null || level == TransactionLevel.NONE 435 || level == TransactionLevel.READ_UNCOMMITTED) { 436 indexer = defaultIndexer; 438 } else { 439 441 indexer = (JBossIndexer) transactionMap.get(tx); 442 443 if (indexer == null) { 444 indexer = new TransactionIndexer(index.getIntrospector()); 445 ((TransactionIndexer) indexer).init(); 446 transactionMap.put(tx, indexer); 447 } 448 } 449 return indexer; 450 } 451 452 private synchronized void registerIndexer(JBossIndexer indexer, 453 Transaction tx) throws SystemException , RollbackException , 454 JoftiException 455 { 456 if (indexer instanceof TransactionIndexer 457 && (!xaMap.containsKey(indexer))) { 458 try { 459 tx.registerSynchronization((Synchronization ) indexer); 460 transactionMap.put(tx, indexer); 461 xaMap.put(indexer, tx); 462 if (log.isDebugEnabled()) { 463 log.debug("registering transaction " + tx + " in cache " 464 + cache.getLocalAddress()); 465 } 466 } catch (Exception e) { 467 throw new JoftiException(e); 468 } 469 } 470 471 } 472 473 476 public synchronized void init(Properties properties) throws JoftiException 477 { 478 try { 479 String cacheConfigFile = null; 480 if (properties != null) { 481 String key = null; 482 for (Iterator it = properties.keySet().iterator(); it.hasNext();) { 483 key = (String ) it.next(); 484 if (MUTABLE_VALUES.equalsIgnoreCase(key)) { 485 checkMutable = Boolean.valueOf( 486 properties.getProperty(key)).booleanValue(); 487 if (log.isInfoEnabled()) { 488 log.info("Mutability checking is set to " 489 + checkMutable); 490 } 491 } 492 if ("file".equalsIgnoreCase(key)) { 493 cacheConfigFile = properties.getProperty(key); 494 } 495 } 496 } 497 if (cache == null) { 498 cache = new TreeCache(); 499 } 500 if (requiresStarting) { 501 PropertyConfigurator config = new PropertyConfigurator(); 502 if (cacheConfigFile == null){ 503 throw new JoftiException("Config file cannot be null in config - check you have set the config file property correctly"); 504 }else{ 505 config.configure(cache, cacheConfigFile); 506 } 507 } 508 509 initCache(cache); 510 511 } catch (Exception e) { 512 throw new JoftiException(e); 513 } 514 515 } 516 517 private void initCache(TreeCache cache) throws JoftiException 518 { 519 cache.addTreeCacheListener(new EventListener()); 520 txMgr = cache.getTransactionManager(); 521 level = getTransactionLevel(); 522 } 523 524 527 public void destroy() throws JoftiException 528 { 529 cache.destroy(); 530 } 531 532 public String getName() 533 { 534 return name; 535 } 536 537 public void setName(String name) 538 { 539 this.name = name; 540 } 541 542 public String toString() 543 { 544 return "JBossCache(" + getName() + ')'; 545 } 546 547 552 public Object getCacheImpl() 553 { 554 return cache; 555 } 556 557 562 public void setInternalIndex(InternalIndex index) 563 { 564 if (!(index instanceof NameSpacedTreeIndex)){ 565 throw new RuntimeException ("JBossCache tree index must be set to "+ NameSpacedTreeIndex.class); 566 } 567 this.index = index; 568 569 } 570 571 class TransactionIndexer implements Synchronization , JBossIndexer 572 { 573 574 575 protected ChangeRecorder recorder = null; 576 577 TransactionIndexer(ClassIntrospector parser) 578 { 579 recorder = new ChangeRecorder(parser); 580 } 581 582 587 588 public void init() throws JoftiException 589 { 590 recorder.init(new Properties (),index.getClass().getName()); 591 } 592 593 public void beforeCompletion() 594 { 595 597 } 598 599 604 public void afterCompletion(int status) 605 { 606 Object obj = xaMap.get(this); 609 xaMap.remove(this); 610 transactionMap.remove(obj); 611 switch (status) 612 { 613 case Status.STATUS_COMMITTED: 614 try { 615 616 acquireUpdateLock(); 617 try { 618 applyAll(); 619 } catch (Throwable e) { 620 log.error("failed committing changes to index", e); 621 }finally{ 622 releaseUpdateLock(); 623 } 624 } catch (JoftiException e){ 625 log.error("unable to acquire update lock",e); 626 } 627 break; 628 629 case Status.STATUS_MARKED_ROLLBACK: case Status.STATUS_ROLLEDBACK: 632 default: 633 ; 634 } 635 636 } 637 638 public void update(NameSpaceKey keyWrapper, Object oldValue, 639 Object newValue) throws JoftiException 640 { 641 642 recorder.update(keyWrapper, oldValue, newValue, index 643 .getIntrospector()); 644 645 } 646 647 public void add(NameSpaceKey keyWrapper, Object value) 648 throws JoftiException 649 { 650 recorder.add(keyWrapper, value, index.getIntrospector()); 651 } 652 653 public void remove(NameSpaceKey keyWrapper, Object value) 654 throws JoftiException 655 { 656 recorder.remove(keyWrapper, value, index.getIntrospector()); 657 } 658 659 660 public void remove(NameSpaceKey keyWrapper) throws JoftiException 661 { 662 recorder.remove(keyWrapper, index.getIntrospector()); 663 664 } 665 666 private void applyAll() throws JoftiException 667 { 668 if (log.isDebugEnabled()) { 670 log.debug("Applying all index updates after transaction commit on cache " 671 + cache.getLocalAddress()); 672 } 673 Map removes = recorder.getRemovedMap(); 674 for (Iterator it = removes.entrySet().iterator(); it.hasNext();) { 675 Map.Entry entry = (Map.Entry ) it.next(); 676 Object key = entry.getKey(); 677 List values = (List ) entry.getValue(); 678 679 synchronized (getCacheLock(key)) { 680 for (Iterator valIt = values.iterator(); valIt.hasNext();) { 681 Object value = valIt.next(); 682 683 index.removeByKey((Comparable ) key); 684 if (log.isDebugEnabled()) { 685 log.info("index removed " + key + " value " 686 + value); 687 } 688 } 689 } 690 } 691 for (Iterator it = recorder.getRemovedKeys().iterator(); it 693 .hasNext();) { 694 Object key = it.next(); 695 696 synchronized (getCacheLock(key)) { 697 698 index.removeByKey((Comparable ) key); 699 if (log.isDebugEnabled()) { 700 log.debug("index removed entries for " + key); 701 } 702 } 703 } 704 705 Map updates = recorder.getUpdatedMap(); 707 for (Iterator it = updates.entrySet().iterator(); it.hasNext();) { 708 Map.Entry entry = (Map.Entry ) it.next(); 709 Object key = entry.getKey(); 710 List values = (List ) entry.getValue(); 711 synchronized (getCacheLock(key)) { 712 for (Iterator valIt = values.iterator(); valIt.hasNext();) { 713 Object value = valIt.next(); 714 index.removeByKey((Comparable ) key); 715 if (log.isDebugEnabled()) { 716 log.debug("index removed for update " + key 717 + " value " + value); 718 } 719 } 720 } 721 } 722 723 Collection col = recorder.getAllTreeValues(); 725 for (Iterator it = col.iterator(); it.hasNext();) { 726 NameSpaceKey wrapper = (NameSpaceKey) it.next(); 727 728 synchronized (getCacheLock(wrapper)) { 729 Object val = null; 730 731 try { 732 val = cache._get((Fqn) wrapper.getNameSpace(), wrapper 733 .getKey(), false); 734 } catch (Exception e) { 737 log.warn("Unable to insert entry " + wrapper 738 + " into index ", e); 739 } 740 if (val != null) { 741 if (log.isDebugEnabled()) { 742 log.debug("index removed for update " + wrapper 743 + " value " + val); 744 } 745 index.insert(wrapper, val); 746 } else { 747 log.warn("Entry lookup for " + wrapper 748 + " in transaction tree is null in cache "); 749 } 750 } 751 } 752 if (log.isDebugEnabled()) { 753 log 754 .debug("Finished Applying all index updates after transaction commit " 755 + cache.getLocalAddress()); 756 } 757 } 758 759 764 public boolean contains(NameSpaceKey keyWrapper) throws JoftiException 765 { 766 767 return index.contains(keyWrapper); 768 } 769 770 776 public Map query(IndexQuery query, IndexCache parent) throws JoftiException 777 { 778 779 782 acquireQueryLock(); 784 785 786 OpenHashMap localResults =null; 787 final ChangeRecorder tempRecorder = recorder; 788 OpenHashMap processedQueryMap =null; 789 790 Object nameSpace = ((INameSpaceAware) query) 791 .getNameSpace(); 792 793 if (((QueryId)query).getQueryType() != QueryType.PARSED_QUERY) { 794 query = index.getParserManager().parseQuery(query); 795 796 } 797 798 try { 799 final OpenHashMap queryMap = (OpenHashMap)index.query(query); 801 802 804 queryMap.forEachKey(new ObjectProcedureAdapter(){ 805 public boolean apply(Object key){ 806 807 if (tempRecorder.getRemovedKeys().contains(key) 808 || tempRecorder.getRemovedMap().containsKey(key) 809 || tempRecorder.getUpdatedMap().containsKey(key)){ 810 queryMap.removeNoReHash(key); 811 } 812 return true; 813 } 814 }); 815 processedQueryMap = queryMap; 816 817 localResults = (OpenHashMap)recorder.query(query); 819 } finally { 820 releaseQueryLock(); 821 } 822 Map res = mergeOpenMaps(parent, processedQueryMap, localResults ); 824 825 826 827 return getCacheValues(parent,res,nameSpace,(IParsedQuery)query,index.getIntrospector()); 828 829 } 830 831 832 833 protected Map mergeOpenMaps(IndexCache parent, OpenHashMap map1, OpenHashMap map2){ 834 OpenHashMap smaller =null; 835 OpenHashMap larger =null; 836 if (map1.size() < map2.size()){ 837 smaller = map1; 838 larger = map2; 839 }else{ 840 smaller =map2; 841 larger = map1; 842 } 843 844 larger.ensureCapacity(larger.size() + smaller.size() *2); 845 846 final OpenHashMap tempLarger = larger; 847 smaller.forEachPair(new ObjectProcedureAdapter(){ 848 public boolean apply(Object key, Object value){ 849 if (!tempLarger.containsKey(key)){ 850 tempLarger.put(key, value); 851 } 852 return false; 853 } 854 }); 855 return larger; 856 } 857 858 } 859 860 861 868 869 class Indexer implements JBossIndexer 870 { 871 872 878 public void update(NameSpaceKey keyWrapper, Object oldValue, 879 Object newValue) throws JoftiException 880 { 881 882 index.removeByKey(keyWrapper); 883 884 index.insert(keyWrapper, newValue); 885 886 } 887 888 894 public void add(NameSpaceKey keyWrapper, Object value) 895 throws JoftiException 896 { 897 index.insert(keyWrapper, value); 898 899 } 900 901 907 public void remove(NameSpaceKey keyWrapper, Object value) 908 throws JoftiException 909 { 910 index.removeByKey(keyWrapper); 911 912 } 913 914 919 public void remove(NameSpaceKey keyWrapper) throws JoftiException 920 { 921 index.removeByKey(keyWrapper); 922 923 } 924 925 930 public boolean contains(NameSpaceKey keyWrapper) throws JoftiException 931 { 932 933 return index.contains(keyWrapper); 934 } 935 936 941 public Map query(IndexQuery query, IndexCache parent) throws JoftiException 942 { 943 944 Map temp =null; 945 946 Object nameSpace = ((INameSpaceAware) query) 947 .getNameSpace(); 948 949 if (((QueryId)query).getQueryType() != QueryType.PARSED_QUERY) { 950 query = index.getParserManager().parseQuery(query); 951 952 } 953 954 acquireQueryLock(); 955 956 try { 957 temp = index.query(query); 958 } finally { 959 releaseQueryLock(); 960 } 961 962 return getCacheValues(parent, temp, nameSpace,(IParsedQuery)query,index.getIntrospector()); 963 } 964 965 } 966 967 private boolean isValidForNameSpace(Object nameSpace, Object keyNameSpace) 968 { 969 970 return (nameSpace.equals(keyNameSpace) || ((Fqn) keyNameSpace) 971 .isChildOf((Fqn) nameSpace)); 972 } 973 974 979 public void start() throws JoftiException 980 { 981 982 try { 983 if (requiresStarting) { 984 if (cache == null){ 985 log.warn("JBossCache instance is NULL in adaptor - ensure Cache has been configured correctly"); 986 } 987 cache.start(); 988 } 989 txMgr = cache.getTransactionManager(); 990 loadInitialValues(cache); 991 } catch (Exception e) { 992 993 throw new JoftiException(e); 994 } 995 } 996 997 1003 public void put(Object nameSpace, Object key, Object value) 1004 throws JoftiException 1005 { 1006 if (nameSpace instanceof Fqn) { 1007 put(new NameSpaceKeyWrapper((Fqn) nameSpace, key), value); 1008 } else if (nameSpace instanceof String ) { 1009 put( 1010 new NameSpaceKeyWrapper(Fqn.fromString((String ) nameSpace), 1011 key), value); 1012 } else { 1013 throw new JoftiException("Unable to insert value " + value 1014 + " namespace must be either a " + Fqn.class 1015 + "or a String "); 1016 } 1017 } 1018 1019 1025 public Object get(Object nameSpace, Object key) 1026 { 1027 1028 if (nameSpace instanceof Fqn) { 1029 return get(new NameSpaceKeyWrapper((Fqn) nameSpace, key)); 1030 } else if (nameSpace instanceof String ) { 1031 return get(new NameSpaceKeyWrapper(Fqn 1032 .fromString((String ) nameSpace), key)); 1033 } else { 1034 return null; 1035 } 1036 } 1037 1038 1039 1040 1043 public void remove(Object nameSpace, Object key) throws JoftiException 1044 { 1045 if (nameSpace instanceof Fqn) { 1046 remove(new NameSpaceKeyWrapper((Fqn) nameSpace, key)); 1047 } else if (nameSpace instanceof String ) { 1048 remove(new NameSpaceKeyWrapper(Fqn.fromString((String ) nameSpace), 1049 key)); 1050 } 1051 1052 } 1053 1054 1059 public Map query(IndexQuery query) throws JoftiException 1060 { 1061 1062 IndexQuery origQuery = query; 1063 if (((QueryId)query).getQueryType() == QueryType.UNPARSED_QUERY) { 1064 query = (IndexQuery) index.getParserManager().parseQuery( query); 1065 } 1066 if (query instanceof INameSpaceAware) { 1068 INameSpaceAware orig = (INameSpaceAware) query; 1069 Fqn tempFqn = null; 1071 1072 Object tempNameSpace = orig.getNameSpace(); 1073 if (tempNameSpace ==null){ 1074 throw new JoftiException("NameSpace is required for JBossCache query: "+ origQuery); 1075 }else if (orig.getNameSpace() instanceof String ) { 1076 tempFqn = Fqn.fromString((String ) orig.getNameSpace()); 1077 orig.setNameSpace(tempFqn); 1078 } else if (!(orig.getNameSpace() instanceof Fqn)) { 1079 throw new JoftiException( 1080 "name space object must be a string or Fqn for JBoss IndexCache"); 1081 } 1082 1083 } 1084 1085 Transaction tx = getTransaction(); 1086 1087 JBossIndexer indexer = getIndexer(tx); 1088 1089 try { 1090 registerIndexer(indexer, tx); 1091 } catch (Exception e) { 1092 throw new JoftiException(e); 1093 } 1094 1095 1096 return indexer.query(query, this); 1097 1098 1099 } 1100 1101 public IndexQuery addQuery(String name, IndexQuery query)throws JoftiException { 1102 1103 return index.getParserManager().addQuery(name, query); 1104 } 1105 1106 1109 public IndexQuery getQuery(String name) { 1110 1111 return index.getParserManager().getQuery(name); 1112 } 1113 1114 protected Map getCacheValues(final IndexCache cache, Map col,final Object nameSpace,final IParsedQuery query, ClassIntrospector introspector 1115 ) 1116 throws JoftiException 1117 { 1118 final Map returnClasses = query.getResultFieldsMap(); 1119 final CompositeComparator comp = query.getOrderingComparator(); 1120 final int maxResults = query.getMaxResults(); 1121 final int startEntry = query.getFirstResult(); 1122 1123 Map interim = null; 1124 if (comp ==null || comp.getSize() ==0){ 1125 interim =new HashMap (col.size() + 2, 1.00f); 1126 }else{ 1127 interim = new ValueTreeMap(comp); 1128 } 1129 1130 final Map temp = interim; 1132 1133 final Object [] errors = new Object [1]; 1134 OpenHashMap originalMap = (OpenHashMap)col; 1136 1137 boolean noError = originalMap.forEachPair(new ObjectProcedureAdapter(){ 1138 public boolean apply(Object key,Object value){ 1139 1140 if (value == null || returnClasses.containsKey(value)){ 1141 1142 1143 Object result =null; 1144 1145 if (isValidForNameSpace(nameSpace, ((NameSpaceKey) key) 1147 .getNameSpace())) 1148 { 1149 try { 1150 result = getCacheValue(cache, key, nameSpace); 1151 1152 }catch (JoftiException e){ 1153 log.warn("unable to get cache value for key " + key + " in nameSpace "+ nameSpace,e); 1154 errors[0] = new JoftiException("unable to get cache value for key " + key + " in nameSpace "+ nameSpace,e); 1155 return false; 1156 } 1157 1158 if (result == null){ 1159 1160 if(log.isWarnEnabled()){ 1161 log.warn("Index and cache have become out of date for key "+key); 1162 } 1163 1164 return true; 1165 }else{ 1166 1167 if (checkMutable) { 1168 result = checkMutable(key, result); 1169 if (result == null){ 1171 if (log.isDebugEnabled()){ 1172 log.debug("Object under key:"+key +" has changed in cache since it was indexed"); 1173 } 1174 return true; 1175 } 1176 } 1177 1178 if (returnClasses != null){ 1179 ClassFieldMethods fieldSet = (ClassFieldMethods)returnClasses.get(value); 1181 1182 Map tempMap = fieldSet != null ? fieldSet.getFieldMap():null; 1183 1184 if (tempMap != null && tempMap.size()>0 ){ 1185 log.warn("field is set for "+ value); 1187 }else{ 1188 temp.put(key, result); 1190 } 1191 }else{ 1192 temp.put(key, result); 1193 } 1194 1195 } 1196 } 1197 } 1198 return true; 1199 } 1200 }); 1201 if (noError){ 1202 if (maxResults >0 || startEntry >0){ 1204 return limitResults(temp, startEntry, maxResults); 1205 }else if (startEntry <0 || maxResults <0){ 1206 if (startEntry <0){ 1207 throw new IllegalArgumentException ("startResult cannot be less than 0:"+startEntry); 1208 } 1209 if (maxResults <0){ 1210 throw new IllegalArgumentException ("maxResults cannot be less than 0:"+maxResults); 1211 } 1212 } 1213 return temp; 1214 }else{ 1215 throw (JoftiException)errors[0]; 1216 } 1217 1218 1219 } 1220 1221 1222 protected Object checkMutable(Object key, Object result) 1223 { 1224 try { 1225 Map cacheObjectValues = index.getIntrospector().getAttributeValues( 1227 result); 1228 1229 Map indexObjectValues = index.getAttributesByKey((Comparable ) key); 1230 if (cacheObjectValues.equals(indexObjectValues)) { 1231 return result; 1232 } else { 1233 if (log.isDebugEnabled()) { 1234 log.debug("Object under Key " + key 1235 + " - attributes changed without re-insert"); 1236 } 1237 } 1238 1239 } catch (JoftiException e) { 1240 log.warn("Error checking mutability", e); 1241 } 1242 1243 return null; 1244 } 1245 1246 private void loadInitialValues(TreeCache temp) throws JoftiException 1247 { 1248 Throwable t =null; 1249 Collection fqns = new ArrayList (); 1251 1252 Transaction tx = null; 1253 1254 try { 1255 if (cache.getIsolationLevelClass() != IsolationLevel.NONE) { 1256 tx = getTransaction(); 1257 if (tx == null) { 1258 if (cache.getTransactionManager() == null){ 1259 log.warn("No TransactionManager founf in TreeCache for isolation level "+ cache.getIsolationLevel()); 1260 throw new JoftiException("You must have a transaction manager configured in JBossCache to use Isolation level "+ cache.getIsolationLevel()); 1261 } 1262 cache.getTransactionManager().begin(); 1263 tx = getTransaction(); 1264 } 1265 } 1266 Fqn start = cache.getRoot().getFqn(); 1267 fqns.add(start); 1268 try { 1269 fqns = getChildrenForFqn(temp, start, fqns); 1270 } catch (org.jboss.cache.CacheException e) { 1271 throw new JoftiException(e); 1272 } 1273 1274 for (Iterator it = fqns.iterator(); it.hasNext();) { 1276 Fqn fqn = (Fqn) it.next(); 1277 Node node = cache.get(fqn); 1278 1279 if (node != null && node.getDataKeys() != null 1280 && node.getDataKeys().size() > 0) { 1281 Set dataKeys = node.getDataKeys(); 1282 for (Iterator mapIt = dataKeys.iterator(); mapIt 1283 .hasNext();) { 1284 Object key = mapIt.next(); 1285 Object value = node.get(key); 1286 index.insert(new NameSpaceKeyWrapper(fqn, key), value); 1287 } 1288 } 1289 } 1290 } catch (Exception e) { 1291 t = e; 1293 index.removeAll(); 1294 if (cache.getIsolationLevelClass() != IsolationLevel.NONE) { 1295 try { 1296 log 1297 .warn("Unable to complete index of initial values - rolling back " 1298 + e); 1299 if (tx != null){ 1300 tx.rollback(); 1301 }else{ 1302 log.warn("Expected to rollback Transaction but Transaction is null"); 1303 } 1304 } catch (IllegalStateException e1) { 1305 throw new JoftiException(e1); 1306 } catch (SystemException e1) { 1307 throw new JoftiException(e1); 1308 }catch (Throwable t1){ 1309 throw new JoftiException("Unable to rollback tx as is "+ tx); 1310 } 1311 } 1312 1313 } finally { 1314 if (cache.getIsolationLevelClass() != IsolationLevel.NONE) { 1315 try { 1316 if (tx != null && !isRollback(tx.getStatus())){ 1317 1318 tx.commit(); 1319 }else{ 1320 log.info("Transaction not committed as marked as rolledback or null"); 1321 } 1322 log.info("initial data transaction loaded"); 1323 } catch (SecurityException e) { 1324 1325 throw new JoftiException(e); 1326 } catch (RollbackException e) { 1327 1328 throw new JoftiException(e); 1329 } catch (HeuristicMixedException e) { 1330 1331 throw new JoftiException(e); 1332 } catch (HeuristicRollbackException e) { 1333 1334 throw new JoftiException(e); 1335 } catch (SystemException e) { 1336 1337 throw new JoftiException(e); 1338 } 1339 } 1340 if (t != null){ 1341 if (t instanceof JoftiException){ 1342 throw (JoftiException) t; 1343 }else{ 1344 throw new JoftiException(t); 1345 } 1346 } 1347 } 1348 1349 } 1350 1351 private Collection getChildrenForFqn(TreeCache temp, Fqn fqn, 1352 Collection names) throws org.jboss.cache.CacheException 1353 { 1354 1355 Set set = temp.getChildrenNames(fqn); 1356 if (set != null && !set.isEmpty()) { 1357 int size = set.size(); 1358 Iterator it = set.iterator(); 1359 for (int i=0;i<size;i++) { 1360 Fqn tempFqn = Fqn.fromString((String ) it.next()); 1361 1362 getChildrenForFqn(temp, tempFqn, names); 1363 names.add(tempFqn); 1364 } 1365 return names; 1366 1367 } else { 1368 return names; 1369 } 1370 1371 } 1372 1373 private Object getCacheValue(IndexCache parent, Object key, 1374 Object nameSpace) throws JoftiException 1375 { 1376 Object result = null; 1377 1378 if (key != null 1379 && isValidForNameSpace(nameSpace, ((NameSpaceKey) key) 1380 .getNameSpace())) { 1381 1382 1385 result = ((JBossCacheAdapter)parent).getFromCache(key); 1386 1387 } 1388 return result; 1389 } 1390 1391 class EventListener implements TreeCacheListener 1392 { 1393 1394 1399 public void nodeCreated(Fqn arg0) 1400 { 1401 1404 } 1405 1406 1411 public void nodeRemoved(Fqn arg0) 1412 { 1413 if (log.isDebugEnabled()) { 1414 log.debug("node removed triggered for " + arg0); 1415 } 1416 try { 1417 Transaction tx = getTransaction(); 1418 1419 final JBossIndexer indexer = getIndexer(tx); 1420 registerIndexer(indexer, tx); 1421 1422 OpenHashMap searchKeys = (OpenHashMap)index.getEntries(new NameSpaceWrapper(arg0)); 1423 1424 searchKeys.forEachKey(new ObjectProcedureAdapter(){ 1426 1429 public boolean apply(Object element) { 1430 try { 1431 indexer.remove((NameSpaceKeyWrapper)element); 1432 }catch (Exception e){ 1433 log.warn("unable to remove key "+ element); 1435 } 1436 return true; 1437 } 1438 }); 1439 1440 1441 1442 } catch (Exception e) { 1443 log.warn("unable to remove entries in node removal " + arg0); 1444 } 1445 1446 } 1447 1448 1453 public void nodeLoaded(Fqn arg0) 1454 { 1455 1457 } 1458 1459 1464 public void nodeEvicted(Fqn arg0) 1465 { 1466 1468 } 1469 1470 1475 public void nodeModified(Fqn arg0) 1476 { 1477 1478 try { 1482 1484 Transaction tx = getTransaction(); 1485 1486 GlobalTransaction gtx = cache.getTransactionTable().get(tx); 1487 1488 if (gtx != null && gtx.getAddress() != null 1489 && (!gtx.getAddress().equals(cache.getLocalAddress()))) { 1490 if (log.isDebugEnabled()) { 1491 log.debug("gtx " + gtx.getAddress() + " cache address " 1492 + cache.getLocalAddress()); 1493 } 1494 final List indexKeys = new ArrayList (); 1496 1497 1498 OpenHashMap searchKeys = (OpenHashMap)index.getEntries(new NameSpaceWrapper(arg0)); 1499 1500 searchKeys.forEachKey(new ObjectProcedureAdapter(){ 1502 1505 public boolean apply(Object element) { 1506 try { 1507 NameSpaceKey entry = (NameSpaceKey) element; 1508 indexKeys.add(entry.getKey()); 1509 }catch (Exception e){ 1510 log.warn("unable to remove key "+ element); 1512 } 1513 return true; 1514 } 1515 }); 1516 1517 JBossIndexer indexer = getIndexer(tx); 1519 registerIndexer(indexer, tx); 1520 1522 Set cacheKeys = cache._get(arg0).getDataKeys(); 1523 1524 if (indexKeys.size() == cacheKeys.size()) { 1525 if (log.isDebugEnabled()) { 1529 log.debug("Node key updated for " + arg0); 1530 } 1531 for (Iterator it = indexKeys.iterator(); it.hasNext();) { 1532 Object tempKey = it.next(); 1533 if (log.isDebugEnabled()) { 1534 log.debug("Node key removing " + tempKey); 1535 } 1536 1537 NameSpaceKey tempWrapper = new NameSpaceKeyWrapper( 1538 arg0, tempKey); 1539 indexer.remove(tempWrapper); 1540 Object temp = cache.peek(arg0, tempKey); 1541 if (log.isDebugEnabled()) { 1542 log.debug("Node key adding " + tempKey); 1543 } 1544 indexer.add(tempWrapper, temp); 1545 } 1546 1547 } else if (indexKeys.size() < cacheKeys.size()) { 1548 if (log.isDebugEnabled()) { 1551 log.debug("Node key added for " + arg0 1552 + " indexKeys:" + indexKeys + " cacheKeys" 1553 + cacheKeys); 1554 } 1555 Set tempSet = new HashSet (cacheKeys); 1556 tempSet.removeAll(indexKeys); 1557 for (Iterator it = tempSet.iterator(); it.hasNext();) { 1559 Object tempKey = it.next(); 1560 Object temp = cache.peek(arg0, tempKey); 1561 if (log.isDebugEnabled()) { 1562 log.debug("Node key adding " + tempKey); 1563 } 1564 indexer.add(new NameSpaceKeyWrapper(arg0, tempKey), 1565 temp); 1566 } 1567 } else { 1568 if (log.isDebugEnabled()) { 1570 log.debug("Node key removed for " + arg0); 1571 } 1572 indexKeys.removeAll(cacheKeys); 1573 for (Iterator it = indexKeys.iterator(); it.hasNext();) { 1575 Object tempKey = it.next(); 1576 if (log.isDebugEnabled()) { 1577 log.debug("Node key removed " + tempKey); 1578 } 1579 indexer.remove(new NameSpaceKeyWrapper(arg0, 1580 tempKey)); 1581 } 1582 } 1583 } 1584 1585 } catch (Exception e) { 1586 log.warn("node modification failed to update index with node " 1587 + arg0, e); 1588 } 1589 1590 } 1591 1592 1597 public void nodeVisited(Fqn arg0) 1598 { 1599 1601 } 1602 1603 1608 public void cacheStarted(TreeCache arg0) 1609 { 1610 1612 } 1613 1614 1619 public void cacheStopped(TreeCache arg0) 1620 { 1621 1623 } 1624 1625 1630 public void viewChange(View arg0) 1631 { 1632 1634 } 1635 1636 } 1637 1638 1643 public TransactionLevel getTransactionLevel() throws JoftiException 1644 { 1645 if (cache.getIsolationLevelClass() == IsolationLevel.NONE) { 1646 return TransactionLevel.NONE; 1647 } else if (cache.getIsolationLevelClass() == IsolationLevel.READ_COMMITTED) { 1648 return TransactionLevel.READ_COMMITTED; 1649 } else if (cache.getIsolationLevelClass() == IsolationLevel.READ_UNCOMMITTED) { 1650 return TransactionLevel.READ_UNCOMMITTED; 1651 } else if (cache.getIsolationLevelClass() == IsolationLevel.REPEATABLE_READ) { 1652 return TransactionLevel.REPEATABLE_READ; 1653 } else { 1654 return TransactionLevel.SERIALIZABLE; 1655 } 1656 } 1657 1658 1659 1660 protected Object getCacheValue(Object key) { 1661 return get(key); 1662 } 1663 1664 1667 public InternalIndex getIndex() { 1668 1669 return null; 1670 } 1671 1672 1673 boolean isRollback(int status) { 1674 return status==Status.STATUS_MARKED_ROLLBACK || 1675 status==Status.STATUS_ROLLING_BACK || 1676 status==Status.STATUS_ROLLEDBACK; 1677 } 1678 1679} 1680 | Popular Tags |