1 7 package org.jboss.cache; 8 9 import org.apache.commons.logging.Log; 10 import org.apache.commons.logging.LogFactory; 11 import org.jboss.cache.buddyreplication.BuddyGroup; 12 import org.jboss.cache.buddyreplication.BuddyManager; 13 import org.jboss.cache.buddyreplication.BuddyNotInitException; 14 import org.jboss.cache.buddyreplication.GravitateResult; 15 import org.jboss.cache.config.BuddyReplicationConfig; 16 import org.jboss.cache.config.CacheLoaderConfig; 17 import org.jboss.cache.config.Configuration; 18 import org.jboss.cache.config.RuntimeConfig; 19 import org.jboss.cache.factories.InterceptorChainFactory; 20 import org.jboss.cache.factories.NodeFactory; 21 import org.jboss.cache.interceptors.Interceptor; 22 import org.jboss.cache.jmx.CacheJmxWrapper; 23 import org.jboss.cache.jmx.CacheJmxWrapperMBean; 24 import org.jboss.cache.loader.CacheLoader; 25 import org.jboss.cache.loader.CacheLoaderManager; 26 import org.jboss.cache.loader.NodeData; 27 import org.jboss.cache.lock.IsolationLevel; 28 import org.jboss.cache.lock.LockStrategyFactory; 29 import org.jboss.cache.lock.LockUtil; 30 import org.jboss.cache.lock.LockingException; 31 import org.jboss.cache.lock.NodeLock; 32 import org.jboss.cache.lock.TimeoutException; 33 import org.jboss.cache.marshall.InactiveRegionAwareRpcDispatcher; 34 import org.jboss.cache.marshall.MethodCall; 35 import org.jboss.cache.marshall.MethodCallFactory; 36 import org.jboss.cache.marshall.MethodDeclarations; 37 import org.jboss.cache.marshall.RegionNameConflictException; 38 import org.jboss.cache.marshall.RegionNotFoundException; 39 import org.jboss.cache.marshall.VersionAwareMarshaller; 40 import org.jboss.cache.notifications.Notifier; 41 import org.jboss.cache.optimistic.DataVersion; 42 import org.jboss.cache.statetransfer.StateTransferManager; 43 import org.jboss.cache.util.ExposedByteArrayOutputStream; 44 import org.jboss.util.stream.MarshalledValueInputStream; 45 import org.jboss.util.stream.MarshalledValueOutputStream; 46 import org.jgroups.Address; 47 import org.jgroups.Channel; 48 import org.jgroups.ChannelClosedException; 49 import org.jgroups.ChannelNotConnectedException; 50 import org.jgroups.ExtendedMembershipListener; 51 import org.jgroups.ExtendedMessageListener; 52 import org.jgroups.JChannel; 53 import org.jgroups.Message; 54 import org.jgroups.MessageListener; 55 import org.jgroups.View; 56 import org.jgroups.blocks.GroupRequest; 57 import org.jgroups.blocks.RpcDispatcher; 58 import org.jgroups.jmx.JChannelFactoryMBean; 59 import org.jgroups.stack.IpAddress; 60 import org.jgroups.util.Rsp; 61 import org.jgroups.util.RspList; 62 import org.jgroups.util.Util; 63 import org.w3c.dom.Element ; 64 65 import javax.management.MBeanServer ; 66 import javax.management.ObjectName ; 67 import javax.transaction.Status ; 68 import javax.transaction.SystemException ; 69 import javax.transaction.Transaction ; 70 import javax.transaction.TransactionManager ; 71 import java.io.ByteArrayInputStream ; 72 import java.io.ByteArrayOutputStream ; 73 import java.io.IOException ; 74 import java.io.InputStream ; 75 import java.io.NotSerializableException ; 76 import java.io.OutputStream ; 77 import java.lang.reflect.Method ; 78 import java.util.ArrayList ; 79 import java.util.Arrays ; 80 import java.util.Collections ; 81 import java.util.HashMap ; 82 import java.util.HashSet ; 83 import java.util.Iterator ; 84 import java.util.LinkedList ; 85 import java.util.List ; 86 import java.util.Map ; 87 import java.util.Set ; 88 import java.util.Vector ; 89 import java.util.concurrent.ConcurrentHashMap ; 90 import java.util.concurrent.CopyOnWriteArraySet ; 91 92 106 public class CacheImpl implements Cloneable , ExtendedMembershipListener, CacheSPI 107 { 108 private static final String CREATE_MUX_CHANNEL = "createMultiplexerChannel"; 109 private static final String [] MUX_TYPES = {"java.lang.String", "java.lang.String"}; 110 111 protected NodeSPI root; 112 113 private RegionManager regionManager = null; 114 115 118 protected JChannel channel = null; 119 120 123 protected boolean coordinator = false; 124 125 128 protected final static Log log = LogFactory.getLog(CacheImpl.class); 129 130 133 protected final Vector <Address> members = new Vector <Address>(); 134 135 138 protected RpcDispatcher disp = null; 139 140 143 protected MessageListenerAdaptor ml = new MessageListenerAdaptor(); 144 145 148 private final TransactionTable tx_table = new TransactionTable(); 149 150 153 private final Map <Thread , List<NodeLock>> lock_table = new ConcurrentHashMap <Thread , List<NodeLock>>(); 154 155 159 protected Set <Fqn> internalFqns = new CopyOnWriteArraySet <Fqn>(); 160 161 164 protected volatile boolean isStateSet = false; 165 166 protected String evictionInterceptorClass = "org.jboss.cache.interceptors.EvictionInterceptor"; 167 168 171 protected VersionAwareMarshaller marshaller_ = null; 172 173 177 protected Interceptor interceptor_chain = null; 178 179 183 protected TransactionManagerLookup tm_lookup = null; 184 185 188 protected TransactionManager tm = null; 189 190 protected CacheLoaderManager cacheLoaderManager; 191 192 195 protected CacheLoaderConfig cloaderConfig; 196 197 200 protected ReplicationQueue repl_queue = null; 201 202 205 protected boolean useCreateService = false; 206 207 210 protected BuddyManager buddyManager; 211 212 215 private StateTransferManager stateTransferManager; 216 private Notifier notifier; 217 private CacheJmxWrapperMBean cacheMBean; 218 219 220 public StateTransferManager getStateTransferManager() 221 { 222 if (stateTransferManager == null) 223 { 224 stateTransferManager = new StateTransferManager(this); 225 } 226 return stateTransferManager; 227 } 228 229 public void setStateTransferManager(StateTransferManager manager) 230 { 231 this.stateTransferManager = manager; 232 } 233 234 private long stateFetchTimeout; 235 236 private ThreadLocal <InvocationContext> invocationContextContainer = new ThreadLocal <InvocationContext>(); 237 238 public boolean started; 239 240 public Configuration getConfiguration() 241 { 242 return configuration; 243 } 244 245 private Configuration configuration = new Configuration(this); 246 247 private RPCManager rpcManager; 248 249 public RPCManager getRpcManager() 250 { 251 return rpcManager; 252 } 253 254 public void setRpcManager(RPCManager rpcManager) 255 { 256 this.rpcManager = rpcManager; 257 } 258 259 262 public CacheImpl(Configuration configuration) throws Exception 263 { 264 notifier = new Notifier(this); 265 this.configuration = configuration; 266 regionManager = new RegionManager(this); 267 } 268 269 272 public CacheImpl() throws Exception 273 { 274 notifier = new Notifier(this); 275 regionManager = new RegionManager(this); 276 } 277 278 281 public CacheImpl(JChannel channel) throws Exception 282 { 283 notifier = new Notifier(this); 284 this.channel = channel; 285 regionManager = new RegionManager(this); 286 } 287 288 291 public String getVersion() 292 { 293 return Version.printVersion(); 294 } 295 296 300 public NodeSPI getRoot() 301 { 302 return root; 303 } 304 305 308 public Address getLocalAddress() 309 { 310 return channel != null ? channel.getLocalAddress() : null; 311 } 312 313 316 public Vector <Address> getMembers() 317 { 318 return members; 319 } 320 321 324 public boolean isCoordinator() 325 { 326 return coordinator; 327 } 328 329 332 public TransactionTable getTransactionTable() 333 { 334 return tx_table; 335 } 336 337 340 public Map <Thread , List<NodeLock>> getLockTable() 341 { 342 return lock_table; 343 } 344 345 348 public String dumpTransactionTable() 349 { 350 return tx_table.toString(true); 351 } 352 353 358 public boolean getDeadlockDetection() 359 { 360 return false; 361 } 362 363 368 public void setDeadlockDetection(boolean dt) 369 { 370 log.warn("Using deprecated configuration element 'DeadlockDetection'. Will be ignored."); 371 } 372 373 376 public void setInterceptorChain(Interceptor i) 377 { 378 interceptor_chain = i; 379 } 380 381 384 public List<Interceptor> getInterceptors() 385 { 386 return InterceptorChainFactory.asList(interceptor_chain); 387 } 388 389 392 public CacheLoader getCacheLoader() 393 { 394 if (cacheLoaderManager == null) return null; 395 return cacheLoaderManager.getCacheLoader(); 396 } 397 398 404 public void setPojoCacheConfig(Element config) throws CacheException 405 { 406 log.warn("setPojoCacheConfig(): You have a PojoCache config that is not used in CacheImpl."); 407 } 408 409 public Element getPojoCacheConfig() 410 { 411 return null; 412 } 413 414 417 public MessageListener getMessageListener() 418 { 419 return ml; 420 } 421 422 public String getEvictionInterceptorClass() 423 { 424 return this.evictionInterceptorClass; 425 } 426 427 private void setUseReplQueue(boolean flag) 428 { 429 if (flag) 430 { 431 if (repl_queue == null) 432 { 433 repl_queue = new ReplicationQueue(this, configuration.getReplQueueInterval(), configuration.getReplQueueMaxElements()); 434 if (configuration.getReplQueueInterval() >= 0) 435 { 436 repl_queue.start(); 437 } 438 } 439 } 440 else 441 { 442 if (repl_queue != null) 443 { 444 repl_queue.stop(); 445 repl_queue = null; 446 } 447 } 448 } 449 450 451 454 public ReplicationQueue getReplQueue() 455 { 456 return repl_queue; 457 } 458 459 462 private void setIsolationLevel(IsolationLevel level) 463 { 464 LockStrategyFactory.setIsolationLevel(level); 465 } 466 467 472 public void setTransactionManagerLookup(TransactionManagerLookup l) 473 { 474 this.tm_lookup = l; 475 } 476 477 479 public TransactionManager getTransactionManager() 480 { 481 return tm; 482 } 483 484 488 public void fetchState(long timeout) throws ChannelClosedException, ChannelNotConnectedException 489 { 490 if (channel == null) 491 { 492 throw new ChannelNotConnectedException(); 493 } 494 boolean rc = channel.getState(null, timeout); 495 if (rc) 496 { 497 log.debug("fetchState(): state was retrieved successfully"); 498 } 499 else 500 { 501 log.debug("fetchState(): state could not be retrieved (first member)"); 502 } 503 } 504 505 public void fetchPartialState(Object sources[], Fqn sourceTarget, Fqn integrationTarget) throws Exception 506 { 507 String encodedStateId = sourceTarget + StateTransferManager.PARTIAL_STATE_DELIMETER + integrationTarget; 508 fetchPartialState(sources, encodedStateId); 509 } 510 511 public void fetchPartialState(Object sources[], Fqn subtree) throws Exception 512 { 513 if (subtree == null) 514 { 515 throw new IllegalArgumentException ("Cannot fetch partial state. Invalid subtree " + subtree); 516 } 517 fetchPartialState(sources, subtree.toString()); 518 } 519 520 private void fetchPartialState(Object sources[], String stateId) throws Exception 521 { 522 if (sources == null || sources.length < 1 || stateId == null) 523 { 524 if (log.isWarnEnabled()) 528 { 529 log.warn("Cannot fetch partial state, targets are " + Arrays.asList(sources) + 530 " and stateId is " + stateId); 531 } 532 return; 533 } 534 535 ArrayList targets = new ArrayList (Arrays.asList(sources)); 536 537 targets.remove(getLocalAddress()); 539 540 if (targets.isEmpty()) 541 { 542 log.debug("Cannot fetch partial state. There are no target members specified"); 545 return; 546 } 547 548 log.debug("Node " + getLocalAddress() + " fetching partial state " + stateId + " from members " + targets); 549 boolean successfulTransfer = false; 550 for (Iterator iter = targets.iterator(); iter.hasNext() && !successfulTransfer;) 551 { 552 Address target = (Address) iter.next(); 553 log.debug("Node " + getLocalAddress() + " fetching partial state " + stateId + " from member " + target); 554 isStateSet = false; 555 successfulTransfer = channel.getState(target, stateId, stateFetchTimeout); 556 if (successfulTransfer) 557 { 558 try 559 { 560 ml.waitForState(); 561 } 562 catch (Exception transferFailed) 563 { 564 successfulTransfer = false; 565 } 566 } 567 log.debug("Node " + getLocalAddress() + " fetching partial state " + stateId + " from member " + target + (successfulTransfer ? " successful" : " failed")); 568 } 569 570 if (!successfulTransfer) 571 { 572 log.debug("Node " + getLocalAddress() + " could not fetch partial state " + stateId + " from any member " + targets); 573 } 574 } 575 576 581 public void create() throws Exception 582 { 583 NodeFactory nf; 585 if ((nf = configuration.getRuntimeConfig().getNodeFactory()) == null) 586 { 587 nf = new NodeFactory(this); 588 configuration.getRuntimeConfig().setNodeFactory(nf); 589 } 590 else 591 { 592 nf.init(); 594 } 595 596 if (notifier == null) notifier = new Notifier(this); 597 stateFetchTimeout = configuration.getLockAcquisitionTimeout() + 5000; 598 599 NodeSPI tempRoot = nf.createRootDataNode(); 601 if (root == null || !root.getClass().equals(tempRoot.getClass())) root = tempRoot; 604 605 setUseReplQueue(configuration.isUseReplQueue()); 606 setIsolationLevel(configuration.getIsolationLevel()); 607 608 this.tm = configuration.getRuntimeConfig().getTransactionManager(); 610 if (tm == null) 611 { 612 if (this.tm_lookup == null && configuration.getTransactionManagerLookupClass() != null) 614 { 615 Class clazz = Thread.currentThread().getContextClassLoader().loadClass(configuration.getTransactionManagerLookupClass()); 616 this.tm_lookup = (TransactionManagerLookup) clazz.newInstance(); 617 } 618 619 try 620 { 621 if (tm_lookup != null) 622 { 623 tm = tm_lookup.getTransactionManager(); 624 configuration.getRuntimeConfig().setTransactionManager(tm); 625 } 626 else 627 { 628 log.warn("No transaction manager lookup class has been defined. Transactions cannot be used"); 629 } 630 } 631 catch (Exception e) 632 { 633 log.debug("failed looking up TransactionManager, will not use transactions", e); 634 } 635 } 636 637 if ((configuration.getCacheLoaderConfig() != null || cloaderConfig != null) && cacheLoaderManager == null) 639 { 640 initialiseCacheLoaderManager(); 641 } 642 643 getRegionManager(); createEvictionPolicy(); 645 646 switch (configuration.getCacheMode()) 647 { 648 case LOCAL: 649 log.debug("cache mode is local, will not create the channel"); 650 break; 651 case REPL_SYNC: 652 case REPL_ASYNC: 653 case INVALIDATION_ASYNC: 654 case INVALIDATION_SYNC: 655 if (log.isDebugEnabled()) log.debug("cache mode is " + configuration.getCacheMode()); 656 if (channel != null) 657 { log.info("channel is already running"); 659 return; 660 } 661 662 channel = getMultiplexerChannel(); 664 665 if (channel != null) 666 { 667 configuration.setUsingMultiplexer(true); 668 if (log.isDebugEnabled()) 669 { 670 log.debug("Created Multiplexer Channel for cache cluster " + configuration.getClusterName() + 671 " using stack " + configuration.getMultiplexerStack()); 672 } 673 } 674 else 675 { 676 if (configuration.getClusterConfig() == null) 677 { 678 if (configuration.getMultiplexerService() != null 681 || configuration.getMultiplexerStack() != null 682 || configuration.getRuntimeConfig().getMuxChannelFactory() != null) 683 { 684 throw new RuntimeException ("Unable to start multiplexed Channel and property ClusterConfig not set"); 685 } 686 configuration.setClusterConfig(getDefaultProperties()); 687 log.debug("setting cluster properties to default value"); 688 } 689 channel = new JChannel(configuration.getClusterConfig()); 690 if (log.isTraceEnabled()) 691 { 692 log.trace("cache properties: " + configuration.getClusterConfig()); 693 } 694 } 695 channel.setOpt(Channel.AUTO_RECONNECT, true); 696 channel.setOpt(Channel.AUTO_GETSTATE, true); 697 channel.setOpt(Channel.BLOCK, true); 698 699 707 708 disp = new InactiveRegionAwareRpcDispatcher(channel, ml, this, this); 711 713 714 disp.setMarshaller(getMarshaller()); 715 716 setBuddyReplicationConfig(configuration.getBuddyReplicationConfig()); 717 break; 718 default: 719 throw new IllegalArgumentException ("cache mode " + configuration.getCacheMode() + " is invalid"); 720 } 721 722 interceptor_chain = new InterceptorChainFactory().buildInterceptorChain(this); 724 725 getRegionManager().setDefaultInactive(configuration.isInactiveOnStartup()); 726 727 useCreateService = true; 728 } 729 730 protected boolean shouldFetchStateOnStartup() 731 { 732 boolean loaderFetch = cacheLoaderManager != null && cacheLoaderManager.isFetchPersistentState(); 733 return !configuration.isInactiveOnStartup() && buddyManager == null && (configuration.isFetchInMemoryState() || loaderFetch); 734 } 735 736 741 public void start() throws Exception 742 { 743 744 if (!useCreateService) 746 { 747 create(); 748 } 749 750 if (cacheLoaderManager != null) 753 { 754 cacheLoaderManager.startCacheLoader(); 755 } 756 757 switch (configuration.getCacheMode()) 758 { 759 case LOCAL: 760 break; 761 case REPL_SYNC: 762 case REPL_ASYNC: 763 case INVALIDATION_ASYNC: 764 case INVALIDATION_SYNC: 765 channel.connect(configuration.getClusterName()); 766 767 if (log.isInfoEnabled()) 768 { 769 log.info("CacheImpl local address is " + channel.getLocalAddress()); 770 } 771 if (shouldFetchStateOnStartup()) 772 { 773 try 774 { 775 fetchStateOnStartup(); 776 } 777 catch (Exception e) 778 { 779 channel.disconnect(); 782 channel.close(); 783 throw e; 784 } 785 } 786 if (buddyManager != null) 787 { 788 buddyManager.init(this); 789 if (configuration.isUseReplQueue()) 790 { 791 log.warn("Replication queue not supported when using buddy replication. Disabling repliction queue."); 792 configuration.setUseReplQueue(false); 793 repl_queue = null; 794 } 795 } 796 break; 797 default: 798 throw new IllegalArgumentException ("cache mode " + configuration.getCacheMode() + " is invalid"); 799 } 800 801 if (cacheLoaderManager != null) 803 { 804 cacheLoaderManager.preloadCache(); 805 } 806 807 determineCoordinator(); 809 810 if (regionManager.isUsingEvictions()) 812 { 813 regionManager.startEvictionThread(); 814 } 815 816 notifier.notifyCacheStarted(this, true); 817 started = true; 818 log.info("JBoss Cache version: " + getVersion()); 819 } 820 821 824 public void destroy() 825 { 826 useCreateService = false; 827 regionManager = null; 828 notifier = null; 829 } 830 831 832 835 public void stop() 836 { 837 if (channel != null) 838 { 839 log.info("stop(): closing the channel"); 840 channel.close(); 841 channel = null; 842 } 843 if (disp != null) 844 { 845 log.info("stop(): stopping the dispatcher"); 846 disp.stop(); 847 disp = null; 848 } 849 if (members != null && members.size() > 0) 850 { 851 members.clear(); 852 } 853 854 coordinator = false; 855 856 if (repl_queue != null) 857 { 858 repl_queue.stop(); 859 } 860 861 if (cacheLoaderManager != null) 862 { 863 cacheLoaderManager.stopCacheLoader(); 864 } 865 866 if (notifier != null) 867 { 868 notifier.notifyCacheStopped(this, true); 869 notifier.removeAllCacheListeners(); 870 notifier.setEvictionPolicyListener(null); 871 } 872 873 useCreateService = false; 874 started = false; 875 } 876 877 878 879 880 881 886 private void setBuddyReplicationConfig(BuddyReplicationConfig config) 887 { 888 if (config != null) 889 { 890 buddyManager = new BuddyManager(config); 891 if (!buddyManager.isEnabled()) 892 { 893 buddyManager = null; 894 } 895 else 896 { 897 internalFqns.add(BuddyManager.BUDDY_BACKUP_SUBTREE_FQN); 898 } 899 } 900 } 901 902 907 public BuddyManager getBuddyManager() 908 { 909 return buddyManager; 910 } 911 912 920 public Set <Fqn> getInternalFqns() 921 { 922 return Collections.unmodifiableSet(internalFqns); 923 } 924 925 926 927 protected void createEvictionPolicy() 928 { 929 if (configuration.getEvictionConfig() != null 930 && configuration.getEvictionConfig().isValidConfig()) 931 { 932 regionManager.setEvictionConfig(configuration.getEvictionConfig()); 933 regionManager.setUsingEvictions(true); 934 } 935 else 936 { 937 regionManager.setUsingEvictions(false); 938 log.debug("Not using an EvictionPolicy"); 939 } 940 } 941 942 949 public void load(String fqn) throws Exception 950 { 951 if (cacheLoaderManager != null) 952 { 953 cacheLoaderManager.preload(Fqn.fromString(fqn), true, true); 954 } 955 } 956 957 protected void determineCoordinator() 958 { 959 synchronized (members) 961 { 962 Address coord = getCoordinator(); 963 coordinator = (coord == null ? false : coord.equals(getLocalAddress())); 964 } 965 } 966 967 971 public Address getCoordinator() 972 { 973 if (channel == null) 974 { 975 return null; 976 } 977 978 synchronized (members) 979 { 980 if (members.size() == 0) 981 { 982 log.debug("getCoordinator(): waiting on viewAccepted()"); 983 try 984 { 985 members.wait(); 986 } 987 catch (InterruptedException iex) 988 { 989 log.error("getCoordinator(): Interrupted while waiting for members to be set", iex); 990 } 991 } 992 return members.size() > 0 ? members.get(0) : null; 993 } 994 } 995 996 998 1009 public void registerClassLoader(String fqn, ClassLoader cl) throws RegionNameConflictException 1010 { 1011 regionManager.registerClassLoader(Fqn.fromString(fqn), cl); 1012 } 1013 1014 1021 public void unregisterClassLoader(String fqn) throws RegionNotFoundException 1022 { 1023 regionManager.unregisterClassLoader(Fqn.fromString(fqn)); 1024 } 1025 1026 1030 protected Node createSubtreeRootNode(Fqn subtree) throws CacheException 1031 { 1032 NodeSPI parent = root; 1033 NodeSPI child = null; 1034 Object owner = getOwnerForLock(); 1035 Object name; 1036 NodeFactory factory = configuration.getRuntimeConfig().getNodeFactory(); 1037 1038 NodeFactory.NodeType type = configuration.isNodeLockingOptimistic() 1039 ? NodeFactory.NodeType.VERSIONED_NODE 1040 : NodeFactory.NodeType.UNVERSIONED_NODE; 1041 1042 for (int i = 0; i < subtree.size(); i++) 1043 { 1044 name = subtree.get(i); 1045 child = parent.getChildDirect(name); 1046 if (child == null) 1047 { 1048 try 1050 { 1051 parent.getLock().acquire(owner, configuration.getSyncReplTimeout(), NodeLock.LockType.WRITE); 1052 } 1053 catch (InterruptedException e) 1054 { 1055 log.error("Interrupted while locking" + parent.getFqn(), e); 1056 throw new CacheException(e.getLocalizedMessage(), e); 1057 } 1058 1059 try 1060 { 1061 child = factory.createDataNode(name, 1062 subtree.getFqnChild(i + 1), 1063 parent, null, true); 1064 parent.addChild(name, child); 1065 } 1066 finally 1067 { 1068 if (log.isDebugEnabled()) 1069 { 1070 log.debug("forcing release of locks in " + parent.getFqn()); 1071 } 1072 try 1073 { 1074 parent.getLock().releaseAll(); 1075 } 1076 catch (Throwable t) 1077 { 1078 log.error("failed releasing locks", t); 1079 } 1080 } 1081 } 1082 1083 parent = child; 1084 } 1085 1086 return child; 1087 } 1088 1089 1096 protected void _evictSubtree(Fqn subtree) throws CacheException 1097 { 1098 1099 if (!exists(subtree)) 1100 { 1101 return; } 1103 1104 if (log.isTraceEnabled()) 1105 { 1106 log.trace("_evictSubtree(" + subtree + ")"); 1107 } 1108 1109 Set children = getChildrenNames(subtree); 1111 if (children != null) 1112 { 1113 Object [] kids = children.toArray(); 1114 1115 for (int i = 0; i < kids.length; i++) 1116 { 1117 Object s = kids[i]; 1118 Fqn tmp = new Fqn(subtree, s); 1119 _remove(null, tmp, 1121 false, false, true); } 1125 } 1126 1127 _remove(null, subtree, false, false, true); 1129 1130 } 1131 1132 1155 public byte[] generateState(Fqn fqn, long timeout, boolean force, boolean suppressErrors) throws Throwable 1156 { 1157 1158 MarshalledValueOutputStream out = null; 1159 byte[] result = null; 1160 try 1161 { 1162 ExposedByteArrayOutputStream baos = new ExposedByteArrayOutputStream(16 * 1024); 1163 out = new MarshalledValueOutputStream(baos); 1164 getStateTransferManager().getState(out, fqn, timeout, force, suppressErrors); 1165 result = baos.getRawBuffer(); 1166 } 1167 finally 1168 { 1169 Util.close(out); 1170 } 1171 1172 return result; 1173 } 1174 1175 private void removeLocksForDeadMembers(NodeSPI node, 1176 Vector deadMembers) 1177 { 1178 Set deadOwners = new HashSet (); 1179 NodeLock lock = node.getLock(); 1180 Object owner = lock.getWriterOwner(); 1181 1182 if (isLockOwnerDead(owner, deadMembers)) 1183 { 1184 deadOwners.add(owner); 1185 } 1186 1187 Iterator iter = lock.getReaderOwners().iterator(); 1188 while (iter.hasNext()) 1189 { 1190 owner = iter.next(); 1191 if (isLockOwnerDead(owner, deadMembers)) 1192 { 1193 deadOwners.add(owner); 1194 } 1195 } 1196 1197 for (iter = deadOwners.iterator(); iter.hasNext();) 1198 { 1199 GlobalTransaction deadOwner = (GlobalTransaction) iter.next(); 1200 boolean localTx = deadOwner.getAddress().equals(getLocalAddress()); 1201 boolean broken = LockUtil.breakTransactionLock(lock, deadOwner, localTx, this); 1202 1203 if (broken && log.isTraceEnabled()) 1204 { 1205 log.trace("Broke lock for node " + node.getFqn() + 1206 " held by " + deadOwner); 1207 } 1208 } 1209 1210 for (NodeSPI child : node.getChildrenDirect()) 1212 { 1213 removeLocksForDeadMembers(child, deadMembers); 1214 } 1215 } 1216 1217 private boolean isLockOwnerDead(Object owner, Vector deadMembers) 1218 { 1219 boolean result = false; 1220 if (owner != null && owner instanceof GlobalTransaction) 1221 { 1222 Object addr = ((GlobalTransaction) owner).getAddress(); 1223 result = deadMembers.contains(addr); 1224 } 1225 return result; 1226 } 1227 1228 protected void fetchStateOnStartup() throws Exception 1229 { 1230 long start, stop; 1231 isStateSet = false; 1232 start = System.currentTimeMillis(); 1233 boolean rc = channel.getState(null, stateFetchTimeout); 1234 if (rc) 1235 { 1236 ml.waitForState(); 1237 stop = System.currentTimeMillis(); 1238 if (log.isDebugEnabled()) 1239 { 1240 log.debug("state was retrieved successfully (in " + (stop - start) + " milliseconds)"); 1241 } 1242 } 1243 else 1244 { 1245 determineCoordinator(); 1250 1251 if (isCoordinator()) 1252 { 1253 log.debug("State could not be retrieved (we are the first member in group)"); 1254 } 1255 else 1256 { 1257 throw new CacheException("Initial state transfer failed: " + 1258 "Channel.getState() returned false"); 1259 } 1260 } 1261 } 1262 1263 1265 1270 public Node get(String fqn) throws CacheException 1271 { 1272 return get(Fqn.fromString(fqn)); 1273 } 1274 1275 1282 public Node get(Fqn fqn) throws CacheException 1283 { 1284 MethodCall m = MethodCallFactory.create(MethodDeclarations.getNodeMethodLocal, fqn); 1285 return (Node) invokeMethod(m); 1286 } 1287 1288 1291 public Node _get(Fqn fqn) throws CacheException 1292 { 1293 return findNode(fqn); 1294 } 1295 1296 1299 public Map _getData(Fqn fqn) 1300 { 1301 NodeSPI n = findNode(fqn); 1302 if (n == null) return null; 1303 return n.getDataDirect(); 1304 } 1305 1306 1313 public Set getKeys(String fqn) throws CacheException 1314 { 1315 return getKeys(Fqn.fromString(fqn)); 1316 } 1317 1318 1325 public Set getKeys(Fqn fqn) throws CacheException 1326 { 1327 MethodCall m = MethodCallFactory.create(MethodDeclarations.getKeysMethodLocal, fqn); 1328 return (Set ) invokeMethod(m); 1329 } 1330 1331 1332 public Set _getKeys(Fqn fqn) throws CacheException 1333 { 1334 NodeSPI n = findNode(fqn); 1335 if (n == null) 1336 { 1337 return null; 1338 } 1339 Set keys = n.getKeysDirect(); 1340 return new HashSet (keys); 1341 } 1342 1343 1350 public Object get(String fqn, Object key) throws CacheException 1351 { 1352 return get(Fqn.fromString(fqn), key); 1353 } 1354 1355 1356 1363 public Object get(Fqn fqn, Object key) throws CacheException 1364 { 1365 return get(fqn, key, true); 1366 } 1367 1368 public Object _get(Fqn fqn, Object key, boolean sendNodeEvent) throws CacheException 1369 { 1370 if (log.isTraceEnabled()) 1371 { 1372 log.trace(new StringBuffer ("_get(").append("\"").append(fqn).append("\", \"").append(key).append("\", \""). 1373 append(sendNodeEvent).append("\")")); 1374 } 1375 if (sendNodeEvent) notifier.notifyNodeVisited(fqn, true, true); 1376 NodeSPI n = findNode(fqn); 1377 if (n == null) 1378 { 1379 log.trace("node not found"); 1380 return null; 1381 } 1382 if (sendNodeEvent) notifier.notifyNodeVisited(fqn, false, true); 1383 return n.getDirect(key); 1384 } 1385 1386 1387 protected Object get(Fqn fqn, Object key, boolean sendNodeEvent) throws CacheException 1388 { 1389 MethodCall m = MethodCallFactory.create(MethodDeclarations.getKeyValueMethodLocal, fqn, key, sendNodeEvent); 1390 return invokeMethod(m); 1391 } 1392 1393 1401 public Object peek(Fqn fqn, Object key) throws CacheException 1402 { 1403 return get(fqn, key, false); 1404 } 1405 1406 1407 public NodeSPI peek(Fqn fqn) 1408 { 1409 return findInternal(fqn, true); 1410 } 1411 1412 1421 public boolean exists(String fqn) 1422 { 1423 return exists(Fqn.fromString(fqn)); 1424 } 1425 1426 1427 1436 public boolean exists(Fqn fqn) 1437 { 1438 Node n = findInternal(fqn, false); 1439 return n != null; 1440 } 1441 1442 1447 private NodeSPI findInternal(Fqn fqn, boolean includeNodesMarkedAsRemoved) 1448 { 1449 if (fqn == null || fqn.size() == 0) return root; 1450 NodeSPI n = root; 1451 int fqnSize = fqn.size(); 1452 for (int i = 0; i < fqnSize; i++) 1453 { 1454 Object obj = fqn.get(i); 1455 n = n.getChildDirect(obj); 1456 if (n == null) 1457 { 1458 return null; 1459 } 1460 else if (!includeNodesMarkedAsRemoved && n.isDeleted()) 1461 { 1462 return null; 1463 } 1464 } 1465 return n; 1466 } 1467 1468 1469 1473 public boolean exists(String fqn, Object key) 1474 { 1475 return exists(Fqn.fromString(fqn), key); 1476 } 1477 1478 1479 1487 public boolean exists(Fqn fqn, Object key) 1488 { 1489 NodeSPI n = findInternal(fqn, false); 1490 return n != null && n.getKeysDirect().contains(key); 1491 } 1492 1493 1494 1503 public void put(String fqn, Map data) throws CacheException 1504 { 1505 put(Fqn.fromString(fqn), data); 1506 } 1507 1508 1517 public void put(Fqn fqn, Map data) throws CacheException 1518 { 1519 GlobalTransaction tx = getCurrentTransaction(); 1520 MethodCall m = MethodCallFactory.create(MethodDeclarations.putDataMethodLocal, tx, fqn, data, true); 1521 invokeMethod(m); 1522 } 1523 1524 1534 public Object put(String fqn, Object key, Object value) throws CacheException 1535 { 1536 return put(Fqn.fromString(fqn), key, value); 1537 } 1538 1539 1549 public Object put(Fqn fqn, Object key, Object value) throws CacheException 1550 { 1551 GlobalTransaction tx = getCurrentTransaction(); 1552 MethodCall m = MethodCallFactory.create(MethodDeclarations.putKeyValMethodLocal, tx, fqn, key, value, true); 1553 return invokeMethod(m); 1554 } 1555 1556 1561 public void remove(String fqn) throws CacheException 1562 { 1563 remove(Fqn.fromString(fqn)); 1564 } 1565 1566 1571 public void remove(Fqn fqn) throws CacheException 1572 { 1573 GlobalTransaction tx = getCurrentTransaction(); 1574 MethodCall m = MethodCallFactory.create(MethodDeclarations.removeNodeMethodLocal, tx, fqn, true); 1575 invokeMethod(m); 1576 } 1577 1578 1588 public void evict(Fqn fqn) throws CacheException 1589 { 1590 MethodCall m = MethodCallFactory.create(MethodDeclarations.evictNodeMethodLocal, fqn); 1591 invokeMethod(m); 1592 } 1593 1594 1601 public Object remove(String fqn, Object key) throws CacheException 1602 { 1603 return remove(Fqn.fromString(fqn), key); 1604 } 1605 1606 1613 public Object remove(Fqn fqn, Object key) throws CacheException 1614 { 1615 GlobalTransaction tx = getCurrentTransaction(); 1616 MethodCall m = MethodCallFactory.create(MethodDeclarations.removeKeyMethodLocal, tx, fqn, key, true); 1617 return invokeMethod(m); 1618 } 1619 1620 1623 public void removeData(String fqn) throws CacheException 1624 { 1625 removeData(Fqn.fromString(fqn)); 1626 } 1627 1628 1631 public void removeData(Fqn fqn) throws CacheException 1632 { 1633 GlobalTransaction tx = getCurrentTransaction(); 1634 MethodCall m = MethodCallFactory.create(MethodDeclarations.removeDataMethodLocal, tx, fqn, true); 1635 invokeMethod(m); 1636 } 1637 1638 1648 1652 1660 1664 1667 public void releaseAllLocks(String fqn) 1668 { 1669 releaseAllLocks(Fqn.fromString(fqn)); 1670 } 1671 1672 1675 public void releaseAllLocks(Fqn fqn) 1676 { 1677 MethodCall m = MethodCallFactory.create(MethodDeclarations.releaseAllLocksMethodLocal, fqn); 1678 try 1679 { 1680 invokeMethod(m); 1681 } 1682 catch (CacheException e) 1683 { 1684 log.error("failed releasing all locks for " + fqn, e); 1685 } 1686 } 1687 1688 1692 public String print(String fqn) 1693 { 1694 return print(Fqn.fromString(fqn)); 1695 } 1696 1697 1701 public String print(Fqn fqn) 1702 { 1703 MethodCall m = MethodCallFactory.create(MethodDeclarations.printMethodLocal, fqn); 1704 Object retval = null; 1705 try 1706 { 1707 retval = invokeMethod(m); 1708 } 1709 catch (Throwable e) 1710 { 1711 retval = e; 1712 } 1713 if (retval != null) 1714 { 1715 return retval.toString(); 1716 } 1717 else 1718 { 1719 return ""; 1720 } 1721 } 1722 1723 1724 1734 public Set getChildrenNames(String fqn) throws CacheException 1735 { 1736 return getChildrenNames(Fqn.fromString(fqn)); 1737 } 1738 1739 1746 public Set getChildrenNames(Fqn fqn) throws CacheException 1747 { 1748 MethodCall m = MethodCallFactory.create(MethodDeclarations.getChildrenNamesMethodLocal, fqn); 1749 Set retval = (Set ) invokeMethod(m); 1750 return retval == null ? Collections.emptySet() : Collections.unmodifiableSet(new HashSet (retval)); 1751 } 1752 1753 public Set <Object > _getChildrenNames(Fqn fqn) throws CacheException 1754 { 1755 NodeSPI n = findNode(fqn); 1756 if (n == null) return null; 1757 Set <Object > s = n.getChildrenNamesDirect(); 1758 return s; 1759 } 1760 1761 1764 public boolean hasChild(Fqn fqn) 1765 { 1766 if (fqn == null) return false; 1767 1768 NodeSPI n = root; 1769 Object obj; 1770 for (int i = 0; i < fqn.size(); i++) 1771 { 1772 obj = fqn.get(i); 1773 n = n.getChildDirect(obj); 1774 if (n == null) 1775 { 1776 return false; 1777 } 1778 } 1779 return !n.getChildrenMapDirect().isEmpty(); 1780 } 1781 1782 1785 public String toString() 1786 { 1787 return toString(true); 1788 } 1789 1790 1791 1794 public String toString(boolean details) 1795 { 1796 StringBuffer sb = new StringBuffer (); 1797 int indent = 0; 1798 1799 if (!details) 1800 { 1801 sb.append(getClass().getName()).append(" [").append(getNumberOfNodes()).append(" nodes, "); 1802 sb.append(getNumberOfLocksHeld()).append(" locks]"); 1803 } 1804 else 1805 { 1806 for (NodeSPI n : root.getChildrenDirect()) 1807 { 1808 n.print(sb, indent); 1809 sb.append("\n"); 1810 } 1811 } 1812 return sb.toString(); 1813 } 1814 1815 1816 1821 public String printDetails() 1822 { 1823 StringBuffer sb = new StringBuffer (); 1824 root.printDetails(sb, 0); 1825 sb.append("\n"); 1826 return sb.toString(); 1827 } 1828 1829 1832 public String printLockInfo() 1833 { 1834 StringBuffer sb = new StringBuffer ("\n"); 1835 int indent = 0; 1836 1837 for (NodeSPI n : root.getChildrenDirect()) 1838 { 1839 n.getLock().printLockInfo(sb, indent); 1840 sb.append("\n"); 1841 } 1842 return sb.toString(); 1843 } 1844 1845 1848 public int getNumberOfLocksHeld() 1849 { 1850 return numLocks(root); 1851 } 1852 1853 private int numLocks(NodeSPI n) 1854 { 1855 int num = 0; 1856 if (n.getLock().isLocked()) 1857 { 1858 num++; 1859 } 1860 for (NodeSPI cn : n.getChildrenDirect(true)) 1861 { 1862 num += numLocks(cn); 1863 } 1864 return num; 1865 } 1866 1867 1873 public int getNumberOfNodes() 1874 { 1875 return numNodes(root) - 1; 1876 } 1877 1878 private int numNodes(NodeSPI n) 1879 { 1880 if (n == null) 1881 { 1882 return 0; 1883 } 1884 int count = 1; for (NodeSPI child : n.getChildrenDirect()) 1886 { 1887 count += numNodes(child); 1888 } 1889 return count; 1890 } 1891 1892 1898 public int getNumberOfAttributes() 1899 { 1900 return numAttributes(root); 1901 } 1902 1903 1909 public int getNumberOfAttributes(Fqn fqn) 1910 { 1911 NodeSPI n = findNode(fqn); 1912 return numAttributes(n); 1913 } 1914 1915 private int numAttributes(NodeSPI n) 1916 { 1917 int count = 0; 1918 for (NodeSPI child : n.getChildrenDirect()) 1919 { 1920 count += numAttributes(child); 1921 } 1922 count += n.getDataDirect().size(); 1923 return count; 1924 } 1925 1926 1927 1928 1938 public List callRemoteMethods(List mbrs, MethodCall method_call, 1939 boolean synchronous, boolean exclude_self, long timeout) 1940 throws Exception 1941 { 1942 return callRemoteMethods(mbrs, method_call, synchronous ? GroupRequest.GET_ALL : GroupRequest.GET_NONE, exclude_self, timeout); 1943 } 1944 1945 1946 1958 public List callRemoteMethods(List mbrs, MethodCall method_call, int mode, boolean exclude_self, long timeout) 1959 throws Exception 1960 { 1961 RspList rsps; 1962 Rsp rsp; 1963 List retval; 1964 Vector validMembers; 1965 1966 if (disp == null) 1967 { 1968 return null; 1969 } 1970 1971 validMembers = mbrs != null ? new Vector (mbrs) : new Vector (this.members); 1972 if (exclude_self && validMembers.size() > 0) 1973 { 1974 Object local_addr = getLocalAddress(); 1975 if (local_addr != null) 1976 { 1977 validMembers.remove(local_addr); 1978 } 1979 } 1980 if (validMembers.size() == 0) 1981 { 1982 if (log.isTraceEnabled()) 1983 { 1984 log.trace("destination list is empty, discarding call"); 1985 } 1986 return null; 1987 } 1988 1989 if (log.isTraceEnabled()) 1990 { 1991 log.trace("callRemoteMethods(): valid members are " + validMembers + " methods: " + method_call.getArgs()[0]); 1992 } 1993 1994 rsps = disp.callRemoteMethods(validMembers, method_call, mode, timeout, buddyManager != null && buddyManager.isEnabled()); 1995 1996 if (rsps == null) 1999 { 2000 throw new NotSerializableException ("RpcDispatcher returned a null. This is most often caused by args for " + method_call + " not being serializable."); 2002 } 2003 2004 if (mode == GroupRequest.GET_NONE) 2005 { 2006 return new ArrayList (); } 2008 2009 if (log.isTraceEnabled()) 2010 { 2011 log.trace("(" + getLocalAddress() + "): responses for method " + method_call.getName() + ":\n" + rsps); 2012 } 2013 2014 retval = new ArrayList (rsps.size()); 2015 for (int i = 0; i < rsps.size(); i++) 2016 { 2017 rsp = (Rsp) rsps.elementAt(i); 2018 if (rsp.wasSuspected() || !rsp.wasReceived()) 2019 { 2020 CacheException ex; 2021 if (rsp.wasSuspected()) 2022 { 2023 ex = new SuspectException("suspected member: " + rsp.getSender()); 2024 } 2025 else 2026 { 2027 ex = new TimeoutException("timeout for " + rsp.getSender()); 2028 } 2029 retval.add(new ReplicationException("rsp=" + rsp, ex)); 2030 } 2031 else 2032 { 2033 retval.add(rsp.getValue()); 2034 } 2035 } 2036 return retval; 2037 } 2038 2039 2050 public List callRemoteMethods(List members, Method method, Object [] args, 2051 boolean synchronous, boolean exclude_self, long timeout) 2052 throws Exception 2053 { 2054 return callRemoteMethods(members, MethodCallFactory.create(method, args), synchronous, exclude_self, timeout); 2055 } 2056 2057 public List callRemoteMethods(Vector members, Method method, Object [] args, 2058 boolean synchronous, boolean exclude_self, long timeout) 2059 throws Exception 2060 { 2061 return callRemoteMethods(members, MethodCallFactory.create(method, args), synchronous, exclude_self, timeout); 2062 } 2063 2064 2076 public List callRemoteMethods(Vector members, String method_name, 2077 Class [] types, Object [] args, 2078 boolean synchronous, boolean exclude_self, long timeout) 2079 throws Exception 2080 { 2081 Method method = getClass().getDeclaredMethod(method_name, types); 2082 return callRemoteMethods(members, method, args, synchronous, exclude_self, timeout); 2083 } 2084 2085 2086 2087 2088 2089 2090 2091 public void _put(GlobalTransaction tx, Fqn fqn, Map data, boolean create_undo_ops, DataVersion dv) throws CacheException 2092 { 2093 _put(tx, fqn, data, create_undo_ops, false, dv); 2094 } 2095 2096 public void _put(GlobalTransaction tx, Fqn fqn, Map data, boolean create_undo_ops, boolean erase_contents, DataVersion dv) throws CacheException 2097 { 2098 _put(tx, fqn, data, create_undo_ops, erase_contents); 2099 } 2100 2101 public Object _put(GlobalTransaction tx, Fqn fqn, Object key, Object value, boolean create_undo_ops, DataVersion dv) throws CacheException 2102 { 2103 return _put(tx, fqn, key, value, create_undo_ops); 2104 } 2105 2106 public void _remove(GlobalTransaction tx, Fqn fqn, boolean create_undo_ops, DataVersion dv) throws CacheException 2107 { 2108 _remove(tx, fqn, create_undo_ops, true); 2109 } 2110 2111 public Object _remove(GlobalTransaction tx, Fqn fqn, Object key, boolean create_undo_ops, DataVersion dv) throws CacheException 2112 { 2113 return _remove(tx, fqn, key, create_undo_ops); 2114 } 2115 2116 public void _removeData(GlobalTransaction tx, Fqn fqn, boolean create_undo_ops, DataVersion dv) throws CacheException 2117 { 2118 2119 _removeData(tx, fqn, create_undo_ops, true); 2120 } 2121 2122 2123 2124 2139 public void _put(GlobalTransaction tx, String fqn, Map data, boolean create_undo_ops) 2140 throws CacheException 2141 { 2142 _put(tx, Fqn.fromString(fqn), data, create_undo_ops); 2143 } 2144 2145 2146 2161 public void _put(GlobalTransaction tx, Fqn fqn, Map data, boolean create_undo_ops) 2162 throws CacheException 2163 { 2164 _put(tx, fqn, data, create_undo_ops, false); 2165 } 2166 2167 2183 public void _put(GlobalTransaction tx, Fqn fqn, Map data, boolean create_undo_ops, boolean erase_contents) 2184 throws CacheException 2185 { 2186 if (log.isTraceEnabled()) 2187 { 2188 log.trace("_put(" + tx + ", \"" + fqn + "\", " + data + " undo=" + create_undo_ops + " erase=" + erase_contents + ")"); 2189 } 2190 2191 NodeSPI n = findNodeCheck(tx, fqn); 2192 Map rawData = n.getDataDirect(); 2193 notifier.notifyNodeModified(fqn, true, CacheListener.ModificationType.PUT_MAP, rawData, true); 2194 2195 if (tx != null && create_undo_ops) 2198 { 2199 MethodCall undo_op = MethodCallFactory.create(MethodDeclarations.putDataEraseMethodLocal, tx, fqn, rawData, false, true); 2201 tx_table.addUndoOperation(tx, undo_op); 2202 } 2203 2204 if (erase_contents) n.clearDataDirect(); 2205 n.putDirect(data); 2206 2207 notifier.notifyNodeModified(fqn, false, CacheListener.ModificationType.PUT_MAP, n.getDataDirect(), true); 2208 2209 } 2210 2211 2216 public Object _put(GlobalTransaction tx, String fqn, Object key, Object value, boolean create_undo_ops) 2217 throws CacheException 2218 { 2219 return _put(tx, Fqn.fromString(fqn), key, value, create_undo_ops); 2220 } 2221 2222 2223 2228 public Object _put(GlobalTransaction tx, Fqn fqn, Object key, Object value, boolean create_undo_ops) 2229 throws CacheException 2230 { 2231 if (log.isTraceEnabled()) 2232 { 2233 log.trace(new StringBuffer ("_put(").append(tx).append(", \""). 2234 append(fqn).append("\", k=").append(key).append(", v=").append(value).append(")")); 2235 } 2236 if (key instanceof Map ) 2238 { 2239 log.warn("using a map as a key in a map, did you mean to do that?"); 2240 } 2241 2242 NodeSPI n = findNodeCheck(tx, fqn); 2243 Map rawData = n.getDataDirect(); 2244 notifier.notifyNodeModified(fqn, true, CacheListener.ModificationType.PUT_DATA, rawData, true); 2245 2246 Object old_value = n.putDirect(key, value); 2247 2248 if (tx != null && create_undo_ops) 2251 { 2252 MethodCall undo_op; 2253 if (old_value == null) 2254 { 2255 undo_op = MethodCallFactory.create(MethodDeclarations.removeKeyMethodLocal, tx, fqn, key, false); 2256 } 2257 else 2258 { 2259 undo_op = MethodCallFactory.create(MethodDeclarations.putKeyValMethodLocal, tx, fqn, key, old_value, false); 2260 } 2261 tx_table.addUndoOperation(tx, undo_op); 2263 } 2264 2265 Map newData = new HashMap (); 2266 newData.put(key, value); 2267 notifier.notifyNodeModified(fqn, false, CacheListener.ModificationType.PUT_DATA, newData, true); 2268 return old_value; 2269 } 2270 2271 2274 public void _remove(GlobalTransaction tx, String fqn, boolean create_undo_ops) throws CacheException 2275 { 2276 _remove(tx, Fqn.fromString(fqn), create_undo_ops); 2277 } 2278 2279 2282 public void _remove(GlobalTransaction tx, Fqn fqn, boolean create_undo_ops) throws CacheException 2283 { 2284 _remove(tx, fqn, create_undo_ops, true); 2285 } 2286 2287 public void _remove(GlobalTransaction tx, Fqn fqn, boolean create_undo_ops, boolean sendNodeEvent) 2288 throws CacheException 2289 { 2290 _remove(tx, fqn, create_undo_ops, sendNodeEvent, false); 2291 } 2292 2293 2301 public void _remove(GlobalTransaction tx, Fqn fqn, boolean create_undo_ops, boolean sendNodeEvent, boolean eviction) 2302 throws CacheException 2303 { 2304 _remove(tx, fqn, create_undo_ops, sendNodeEvent, eviction, null); 2305 } 2306 2307 2320 public void _remove(GlobalTransaction tx, Fqn fqn, boolean create_undo_ops, boolean sendNodeEvent, boolean eviction, DataVersion version) 2321 throws CacheException 2322 { 2323 2324 NodeSPI n; 2325 NodeSPI parent_node; 2326 MethodCall undo_op = null; 2327 2328 if (log.isTraceEnabled()) 2329 { 2330 log.trace("_remove(" + tx + ", \"" + fqn + "\", undo=" + create_undo_ops + ")"); 2331 } 2332 2333 if (tx != null) 2335 { 2336 try 2337 { 2338 int status = tx_table.getLocalTransaction(tx).getStatus(); 2339 if (status == Status.STATUS_MARKED_ROLLBACK || status == Status.STATUS_ROLLEDBACK || status == Status.STATUS_ROLLING_BACK) 2340 { 2341 log.debug("This remove call is triggered by a transaction rollback, as a compensation operation. Do a realRemove() instead."); 2342 realRemove(fqn, true); 2343 return; 2344 } 2345 } 2346 catch (Exception e) 2347 { 2348 log.warn("Unable to get a hold of the transaction for a supposedly transactional call! This *may* result in stale locks!", e); 2350 } 2351 } 2352 2353 if (fqn.size() == 0) 2354 { 2355 Set children = getChildrenNames(fqn); 2356 if (children != null) 2357 { 2358 Object [] kids = children.toArray(); 2359 2360 for (int i = 0; i < kids.length; i++) 2361 { 2362 Object s = kids[i]; 2363 Fqn tmp = new Fqn(fqn, s); 2364 try 2365 { 2366 _remove(tx, tmp, create_undo_ops, true, eviction); 2367 } 2368 catch (Exception e) 2369 { 2370 log.error("failure removing node " + tmp); 2371 } 2372 } 2373 } 2374 return; 2375 } 2376 2377 n = findNode(fqn, version); 2379 if (n == null) 2380 { 2381 if (log.isTraceEnabled()) 2382 { 2383 log.trace("node " + fqn + " not found"); 2384 } 2385 return; 2386 } 2387 2388 if (eviction) 2389 { 2390 notifier.notifyNodeEvicted(fqn, true, true); 2391 } 2392 else 2393 { 2394 notifier.notifyNodeRemoved(fqn, true, n.getDataDirect(), true); 2395 } 2396 2397 parent_node = n.getParent(); 2398 2399 if (eviction || configuration.isNodeLockingOptimistic()) 2401 { 2402 parent_node.removeChildDirect(n.getFqn().getLastElement()); 2403 } 2404 else 2405 { 2406 n.markAsDeleted(true); 2407 } 2408 2409 if (eviction) 2410 { 2411 parent_node.setChildrenLoaded(false); 2412 } 2413 2414 2417 if (tx != null && create_undo_ops && !eviction) 2420 { 2421 undo_op = MethodCallFactory.create(MethodDeclarations.addChildMethodLocal, tx, parent_node.getFqn(), n.getFqn().getLastElement(), n, false); 2422 2423 tx_table.addUndoOperation(tx, undo_op); 2425 } 2426 2427 if (eviction) 2428 { 2429 notifier.notifyNodeEvicted(fqn, false, true); 2430 } 2431 else 2432 { 2433 notifier.notifyNodeRemoved(fqn, false, null, true); 2434 } 2435 } 2436 2437 2444 public Object _remove(GlobalTransaction tx, String fqn, Object key, boolean create_undo_ops) 2445 throws CacheException 2446 { 2447 return _remove(tx, Fqn.fromString(fqn), key, create_undo_ops); 2448 } 2449 2450 2457 public Object _remove(GlobalTransaction tx, Fqn fqn, Object key, boolean create_undo_ops) 2458 throws CacheException 2459 { 2460 MethodCall undo_op = null; 2461 Object old_value = null; 2462 2463 if (log.isTraceEnabled()) 2464 { 2465 log.trace("_remove(" + tx + ", \"" + fqn + "\", key=" + key + ")"); 2466 } 2467 2468 NodeSPI n = findNode(fqn); 2471 if (n == null) 2472 { 2473 log.warn("node " + fqn + " not found"); 2474 return null; 2475 } 2476 2477 notifier.notifyNodeModified(fqn, true, CacheListener.ModificationType.REMOVE_DATA, n.getDataDirect(), true); 2478 2479 old_value = n.removeDirect(key); 2480 2481 if (tx != null && create_undo_ops && old_value != null) 2484 { 2485 undo_op = MethodCallFactory.create(MethodDeclarations.putKeyValMethodLocal, tx, fqn, key, old_value, false); 2486 tx_table.addUndoOperation(tx, undo_op); 2488 } 2489 2490 Map removedData = new HashMap (); 2491 removedData.put(key, old_value); 2492 notifier.notifyNodeModified(fqn, false, CacheListener.ModificationType.REMOVE_DATA, removedData, true); 2493 2494 return old_value; 2495 } 2496 2497 2500 public void _removeData(GlobalTransaction tx, String fqn, boolean create_undo_ops) 2501 throws CacheException 2502 { 2503 _removeData(tx, Fqn.fromString(fqn), create_undo_ops); 2504 } 2505 2506 2509 public void _removeData(GlobalTransaction tx, Fqn fqn, boolean create_undo_ops) 2510 throws CacheException 2511 { 2512 _removeData(tx, fqn, create_undo_ops, true); 2513 } 2514 2515 2518 public void _removeData(GlobalTransaction tx, Fqn fqn, boolean create_undo_ops, boolean sendNodeEvent) 2519 throws CacheException 2520 { 2521 _removeData(tx, fqn, create_undo_ops, sendNodeEvent, false); 2522 } 2523 2524 2527 public void _removeData(GlobalTransaction tx, Fqn fqn, boolean create_undo_ops, boolean sendNodeEvent, boolean eviction) 2528 throws CacheException 2529 { 2530 _removeData(tx, fqn, create_undo_ops, sendNodeEvent, eviction, null); 2531 } 2532 2533 2536 public void _removeData(GlobalTransaction tx, Fqn fqn, boolean create_undo_ops, boolean sendNodeEvent, boolean eviction, DataVersion version) 2537 throws CacheException 2538 { 2539 MethodCall undo_op = null; 2540 2541 if (log.isTraceEnabled()) 2542 { 2543 log.trace("_removeData(" + tx + ", \"" + fqn + "\")"); 2544 } 2545 2546 NodeSPI n = findNode(fqn, version); 2549 if (n == null) 2550 { 2551 log.warn("node " + fqn + " not found"); 2552 return; 2553 } 2554 2555 Map data = n.getDataDirect(); 2556 2557 if (tx != null && create_undo_ops && !eviction) 2560 { 2561 if (!data.isEmpty()) 2562 { 2563 undo_op = MethodCallFactory.create(MethodDeclarations.putDataMethodLocal, 2564 tx, fqn, data, false); 2565 } 2566 } 2567 2568 if (eviction) 2569 { 2570 notifier.notifyNodeEvicted(fqn, true, true); 2571 } 2572 else 2573 { 2574 notifier.notifyNodeModified(fqn, true, CacheListener.ModificationType.REMOVE_DATA, data, true); 2575 } 2576 2577 n.clearDataDirect(); 2578 if (eviction) 2579 { 2580 n.setDataLoaded(false); 2581 } 2582 2583 if (sendNodeEvent) 2584 { 2585 notifier.notifyNodeVisited(fqn, false, true); 2586 } 2587 else 2588 { if (eviction) 2590 { 2591 notifier.notifyNodeEvicted(fqn, false, true); 2592 } 2593 else 2594 { 2595 notifier.notifyNodeModified(fqn, false, CacheListener.ModificationType.REMOVE_DATA, data, true); 2596 } 2597 } 2598 2599 if (tx != null && create_undo_ops) 2601 { 2602 tx_table.addUndoOperation(tx, undo_op); 2603 } 2604 } 2605 2606 2607 2613 public void _evict(Fqn fqn) throws CacheException 2614 { 2615 if (!exists(fqn)) return; boolean create_undo_ops = false; 2618 boolean sendNodeEvent = false; 2619 boolean eviction = true; 2620 if (log.isTraceEnabled()) 2621 { 2622 log.trace("_evict(" + fqn + ")"); 2623 } 2624 if (hasChild(fqn)) 2625 { 2626 _removeData(null, fqn, create_undo_ops, sendNodeEvent, eviction); 2627 } 2628 else 2629 { 2630 _remove(null, fqn, create_undo_ops, sendNodeEvent, eviction); 2631 } 2632 } 2633 2634 2641 public void _evict(Fqn fqn, DataVersion version) throws CacheException 2642 { 2643 if (!exists(fqn)) return; 2645 boolean create_undo_ops = false; 2646 boolean sendNodeEvent = false; 2647 boolean eviction = true; 2648 if (log.isTraceEnabled()) 2649 { 2650 log.trace("_evict(" + fqn + ", " + version + ")"); 2651 } 2652 if (hasChild(fqn)) 2653 { 2654 _removeData(null, fqn, create_undo_ops, sendNodeEvent, eviction, version); 2655 } 2656 else 2657 { 2658 _remove(null, fqn, create_undo_ops, sendNodeEvent, eviction, version); 2659 } 2660 } 2661 2662 2668 2676 2677 2680 public void _addChild(GlobalTransaction gtx, Fqn parent_fqn, Object child_name, Node cn, boolean undoOps) 2681 throws CacheException 2682 { 2683 NodeSPI childNode = (NodeSPI) cn; 2684 if (log.isTraceEnabled()) 2685 { 2686 log.trace("_addChild(\"" + parent_fqn + "\", \"" + child_name + "\", node=" + childNode + ")"); 2687 } 2688 2689 if (parent_fqn == null || child_name == null || childNode == null) 2690 { 2691 log.error("parent_fqn or child_name or childNode was null"); 2692 return; 2693 } 2694 NodeSPI parentNode = findNode(parent_fqn); 2695 if (parentNode == null) 2696 { 2697 log.warn("node " + parent_fqn + " not found"); 2698 return; 2699 } 2700 2701 Fqn fqn = new Fqn(parent_fqn, child_name); 2702 notifier.notifyNodeCreated(fqn, true, true); 2703 parentNode.addChild(child_name, childNode); 2704 2705 childNode.markAsDeleted(false, true); 2706 2707 if (gtx != null && undoOps) 2708 { 2709 tx_table.addUndoOperation(gtx, MethodCallFactory.create(MethodDeclarations.removeNodeMethodLocal, gtx, fqn, false)); 2711 } 2712 2713 notifier.notifyNodeCreated(fqn, false, true); 2714 } 2715 2716 2717 2723 public Object _replicate(MethodCall method_call) throws Throwable 2724 { 2725 try 2726 { 2727 InvocationContext ctx = getInvocationContext(); 2728 ctx.setOriginLocal(false); 2729 setInvocationContext(ctx); 2730 return invokeMethod(method_call); 2731 } 2732 catch (Exception ex) 2733 { 2734 log.warn("replication failure with method_call " + method_call + " exception: " + ex); 2735 throw ex; 2736 } 2737 finally 2738 { 2739 InvocationContext ctx = getInvocationContext(); 2740 ctx.setOriginLocal(true); 2741 setInvocationContext(ctx); 2742 } 2743 } 2744 2745 2748 public void _replicate(List method_calls) throws Throwable 2749 { 2750 Iterator it = method_calls.iterator(); 2751 while (it.hasNext()) _replicate((MethodCall) it.next()); 2752 } 2753 2754 2760 public List _clusteredGet(MethodCall methodCall, Boolean searchBackupSubtrees) 2761 { 2762 MethodCall call = methodCall; 2763 if (log.isTraceEnabled()) log.trace("Clustered Get called with params: " + call + ", " + searchBackupSubtrees); 2764 Method m = call.getMethod(); 2765 Object [] args = call.getArgs(); 2766 2767 Object callResults = null; 2768 2769 try 2770 { 2771 Fqn fqn = (Fqn) args[0]; 2772 2773 if (log.isTraceEnabled()) log.trace("Clustered get: invoking call " + m + " with Fqn " + fqn); 2774 callResults = m.invoke(this, args); 2775 boolean found = validResult(callResults, call, fqn); 2776 if (log.isTraceEnabled()) log.trace("Got result " + callResults + ", found=" + found); 2777 if (found && callResults == null) callResults = createEmptyResults(call); 2778 } 2779 catch (Exception e) 2780 { 2781 log.warn("Problems processing clusteredGet call", e); 2782 } 2783 2784 List results = new ArrayList (2); 2785 if (callResults != null) 2786 { 2787 results.add(true); 2788 results.add(callResults); 2789 } 2790 else 2791 { 2792 results.add(false); 2793 results.add(null); 2794 } 2795 return results; 2796 } 2797 2798 2818 public List _gravitateData(Fqn fqn, boolean searchSubtrees, boolean marshal) 2819 throws CacheException 2820 { 2821 GravitateResult result = gravitateData(fqn, searchSubtrees, marshal); 2822 return result.asList(); 2823 } 2824 2825 public GravitateResult gravitateData(Fqn fqn, boolean searchSubtrees, boolean marshal) 2826 throws CacheException 2827 { 2828 2830 2832 NodeSPI actualNode = findNode(fqn); 2833 Fqn backupNodeFqn = null; 2834 if (actualNode == null && searchSubtrees) 2835 { 2836 NodeSPI backupSubtree = findNode(BuddyManager.BUDDY_BACKUP_SUBTREE_FQN); 2837 if (backupSubtree != null) 2838 { 2839 Map children = backupSubtree.getChildrenMapDirect(); 2840 if (children != null) 2841 { 2842 Iterator childNames = children.keySet().iterator(); 2843 while (childNames.hasNext() && actualNode == null) 2844 { 2845 backupNodeFqn = BuddyManager.getBackupFqn(childNames.next().toString(), fqn); 2846 actualNode = findNode(backupNodeFqn); 2847 } 2848 } 2849 } 2850 } 2851 2852 if (actualNode == null) 2853 { 2854 return GravitateResult.noDataFound(); 2855 } 2856 2857 if (backupNodeFqn == null) 2858 { 2859 backupNodeFqn = BuddyManager.getBackupFqn(BuddyManager.getGroupNameFromAddress(getLocalAddress()), fqn); 2860 } 2861 2862 List list = getNodeData(new LinkedList (), actualNode); 2863 if (marshal) 2864 { 2865 try 2866 { 2867 ByteArrayOutputStream baos = new ByteArrayOutputStream(1024); 2868 MarshalledValueOutputStream maos = new MarshalledValueOutputStream(baos); 2869 maos.writeObject(list); 2870 maos.close(); 2871 return GravitateResult.marshalledResult(baos.toByteArray(), backupNodeFqn); 2872 } 2873 catch (IOException e) 2874 { 2875 throw new CacheException("Failure marshalling subtree at " + fqn, e); 2876 } 2877 } 2878 else 2879 { 2880 GravitateResult gr = GravitateResult.subtreeResult(list, backupNodeFqn); 2881 return gr; 2882 } 2883 } 2884 2885 private List getNodeData(List list, NodeSPI node) 2886 { 2887 NodeData data = new NodeData(BuddyManager.getActualFqn(node.getFqn()), node.getDataDirect()); 2888 list.add(data); 2889 for (NodeSPI childNode : node.getChildrenDirect()) 2890 { 2891 getNodeData(list, childNode); 2892 } 2893 return list; 2894 } 2895 2896 2898 public void _remoteAssignToBuddyGroup(BuddyGroup group, Map state) throws Exception 2899 { 2900 if (buddyManager != null) buddyManager.handleAssignToBuddyGroup(group, state); 2901 } 2902 2903 public void _remoteRemoveFromBuddyGroup(String groupName) throws BuddyNotInitException 2904 { 2905 if (buddyManager != null) buddyManager.handleRemoveFromBuddyGroup(groupName); 2906 } 2907 2908 public void _remoteAnnounceBuddyPoolName(IpAddress address, String buddyPoolName) 2909 { 2910 if (buddyManager != null) buddyManager.handlePoolNameBroadcast(address, buddyPoolName); 2911 } 2912 2913 public void _dataGravitationCleanup(GlobalTransaction gtx, Fqn primary, Fqn backup) throws Exception 2914 { 2915 MethodCall primaryDataCleanup, backupDataCleanup; 2916 if (buddyManager.isDataGravitationRemoveOnFind()) 2917 { 2918 primaryDataCleanup = MethodCallFactory.create(MethodDeclarations.removeNodeMethodLocal, null, primary, false); 2919 backupDataCleanup = MethodCallFactory.create(MethodDeclarations.removeNodeMethodLocal, null, backup, false); 2920 } 2921 else 2922 { 2923 primaryDataCleanup = MethodCallFactory.create(MethodDeclarations.evictNodeMethodLocal, primary); 2924 backupDataCleanup = MethodCallFactory.create(MethodDeclarations.evictNodeMethodLocal, backup); 2925 } 2926 2927 invokeMethod(primaryDataCleanup); 2928 invokeMethod(backupDataCleanup); 2929 } 2930 2931 2933 2934 2937 private boolean validResult(Object callResults, MethodCall mc, Fqn fqn) 2938 { 2939 switch (mc.getMethodId()) 2940 { 2941 case MethodDeclarations.getDataMapMethodLocal_id: 2942 case MethodDeclarations.getChildrenNamesMethodLocal_id: 2943 return callResults != null || exists(fqn); 2944 case MethodDeclarations.existsMethod_id: 2945 return (Boolean ) callResults; 2946 default: 2947 return false; 2948 } 2949 } 2950 2951 2954 private Object createEmptyResults(MethodCall mc) 2955 { 2956 switch (mc.getMethodId()) 2957 { 2958 case MethodDeclarations.getDataMapMethodLocal_id: 2959 case MethodDeclarations.getChildrenNamesMethodLocal_id: 2960 return Collections.emptyMap(); 2961 default: 2962 return null; 2963 } 2964 } 2965 2966 2969 public void _releaseAllLocks(Fqn fqn) 2970 { 2971 NodeSPI n; 2972 2973 try 2974 { 2975 n = findNode(fqn); 2976 if (n == null) 2977 { 2978 log.error("releaseAllLocks(): node " + fqn + " not found"); 2979 return; 2980 } 2981 releaseAll(n); 2982 } 2983 catch (Throwable t) 2984 { 2985 log.error("releaseAllLocks(): failed", t); 2986 } 2987 } 2988 2989 private void releaseAll(NodeSPI n) 2990 { 2991 for (NodeSPI child : n.getChildrenDirect()) 2992 { 2993 releaseAll(child); 2994 } 2995 n.getLock().releaseAll(); 2996 } 2997 2998 2999 3003 public String _print(Fqn fqn) 3004 { 3005 try 3006 { 3007 Node n = findNode(fqn); 3008 if (n == null) return null; 3009 return n.toString(); 3010 } 3011 catch (Throwable t) 3012 { 3013 return null; 3014 } 3015 } 3016 3017 3020 public void _lock(Fqn fqn, NodeLock.LockType lock_type, boolean recursive) 3021 throws TimeoutException, LockingException 3022 { 3023 log.warn("method _lock() should not be invoked on CacheImpl"); 3024 } 3025 3026 3029 public void optimisticPrepare(GlobalTransaction gtx, List modifications, Map data, Address address, boolean onePhaseCommit) 3030 { 3031 throw new UnsupportedOperationException ("optimisticPrepare() should not be called on CacheImpl directly"); 3032 } 3033 3034 3037 public void prepare(GlobalTransaction global_tx, List modifications, Address coord, boolean onePhaseCommit) 3038 { 3039 throw new UnsupportedOperationException ("prepare() should not be called on CacheImpl directly"); 3040 } 3041 3042 3045 public void commit(GlobalTransaction tx) 3046 { 3047 throw new UnsupportedOperationException ("commit() should not be called on CacheImpl directly"); 3048 } 3049 3050 3053 public void rollback(GlobalTransaction tx) { 3055 throw new UnsupportedOperationException ("rollback() should not be called on CacheImpl directly"); 3056 } 3057 3058 3059 3060 3063 public void addUndoOperation(GlobalTransaction gtx, MethodCall undo_op) 3064 { 3065 tx_table.addUndoOperation(gtx, undo_op); 3066 } 3067 3068 3071 public CacheLoaderManager getCacheLoaderManager() 3072 { 3073 return cacheLoaderManager; 3074 } 3075 3076 3079 public void setCacheLoaderManager(CacheLoaderManager cacheLoaderManager) 3080 { 3081 this.cacheLoaderManager = cacheLoaderManager; 3082 } 3083 3084 public void setConfiguration(Configuration configuration) 3085 { 3086 this.configuration = configuration; 3087 configuration.setCacheImpl(this); 3088 } 3089 3090 3093 public Notifier getNotifier() 3094 { 3095 return notifier; 3096 } 3097 3098 public InvocationContext getInvocationContext() 3099 { 3100 InvocationContext ctx = invocationContextContainer.get(); 3101 if (ctx == null) 3102 { 3103 ctx = new InvocationContext(); 3104 invocationContextContainer.set(ctx); 3105 } 3106 return ctx; 3107 } 3108 3109 public void setInvocationContext(InvocationContext ctx) 3110 { 3111 invocationContextContainer.set(ctx); 3112 } 3113 3114 public CacheJmxWrapperMBean getCacheMBeanInterface() 3115 { 3116 if (cacheMBean == null) 3117 { 3118 cacheMBean = new CacheJmxWrapper(this); 3119 } 3120 return cacheMBean; 3121 } 3122 3123 public void setCacheMBeanInterface(CacheJmxWrapperMBean mbean) 3124 { 3125 this.cacheMBean = mbean; 3126 } 3127 3128 3133 public void move(Fqn nodeToMove, Fqn newParent) 3134 { 3135 MethodCall m = MethodCallFactory.create(MethodDeclarations.moveMethodLocal, nodeToMove, newParent); 3137 invokeMethod(m); 3138 } 3139 3140 3146 public void _move(Fqn nodeToMoveFqn, Fqn newParentFqn) 3147 { 3148 NodeSPI newParent = findNode(newParentFqn); 3150 3151 if (newParent == null) 3152 { 3153 throw new NodeNotExistsException("New parent node " + newParentFqn + " does not exist when attempting to move node!!"); 3154 } 3155 3156 NodeSPI node = findNode(nodeToMoveFqn); 3157 3158 if (node == null) 3159 { 3160 throw new NodeNotExistsException("Node " + nodeToMoveFqn + " does not exist when attempting to move node!!"); 3161 } 3162 3163 NodeSPI oldParent = node.getParent(); 3164 Object nodeName = nodeToMoveFqn.getLastElement(); 3165 3166 oldParent.removeChildDirect(nodeName); 3169 newParent.addChild(nodeName, node); 3170 3171 3173 notifier.notifyNodeMoved(nodeToMoveFqn, new Fqn(newParentFqn, nodeToMoveFqn.getLastElement()), true, true); 3175 3176 moveFqns(node, newParent.getFqn()); 3178 3179 notifier.notifyNodeMoved(nodeToMoveFqn, new Fqn(newParentFqn, nodeToMoveFqn.getLastElement()), false, true); 3180 3181 if (getInvocationContext().getTransaction() != null) 3183 { 3184 MethodCall undo = MethodCallFactory.create(MethodDeclarations.moveMethodLocal, new Fqn(newParentFqn, nodeToMoveFqn.getLastElement()), oldParent.getFqn()); 3185 tx_table.addUndoOperation(getInvocationContext().getGlobalTransaction(), undo); 3186 } 3187 } 3188 3189 public void _block() 3190 { 3191 } 3193 3194 public void _unblock() 3195 { 3196 } 3198 3199 private void moveFqns(NodeSPI node, Fqn newBase) 3200 { 3201 Fqn newFqn = new Fqn(newBase, node.getFqn().getLastElement()); 3202 node.setFqn(newFqn); 3203 } 3204 3205 protected class MessageListenerAdaptor implements ExtendedMessageListener 3206 { 3207 3211 protected volatile Exception setStateException; 3212 private final Object stateLock = new Object (); 3213 3214 protected MessageListenerAdaptor() 3215 { 3216 } 3217 3218 public void waitForState() throws Exception 3219 { 3220 synchronized (stateLock) 3221 { 3222 while (!isStateSet) 3223 { 3224 if (setStateException != null) 3225 { 3226 throw setStateException; 3227 } 3228 3229 try 3230 { 3231 stateLock.wait(); 3232 } 3233 catch (InterruptedException iex) 3234 { 3235 } 3236 } 3237 } 3238 } 3239 3240 protected void stateReceivedSuccess() 3241 { 3242 isStateSet = true; 3243 setStateException = null; 3244 } 3245 3246 protected void stateReceivingFailed(Throwable t) 3247 { 3248 if (t instanceof CacheException) 3249 { 3250 log.debug(t); 3251 } 3252 else 3253 { 3254 log.error("failed setting state", t); 3255 } 3256 if (t instanceof Exception ) 3257 { 3258 setStateException = (Exception ) t; 3259 } 3260 else 3261 { 3262 setStateException = new Exception (t); 3263 } 3264 } 3265 3266 protected void stateProducingFailed(Throwable t) 3267 { 3268 if (t instanceof CacheException) 3269 { 3270 log.debug(t); 3271 } 3272 else 3273 { 3274 log.error("Caught " + t.getClass().getName() 3275 + " while responding to state transfer request", t); 3276 } 3277 } 3278 3279 3282 public void receive(Message msg) 3283 { 3284 } 3285 3286 public byte[] getState() 3287 { 3288 MarshalledValueOutputStream out = null; 3289 byte[] result = null; 3290 try 3291 { 3292 ExposedByteArrayOutputStream baos = new ExposedByteArrayOutputStream(16 * 1024); 3293 out = new MarshalledValueOutputStream(baos); 3294 3295 getStateTransferManager().getState(out, Fqn.ROOT, configuration.getInitialStateRetrievalTimeout(), true, true); 3296 result = baos.getRawBuffer(); 3297 } 3298 catch (Throwable t) 3299 { 3300 stateProducingFailed(t); 3301 } 3302 finally 3303 { 3304 Util.close(out); 3305 } 3306 return result; 3307 } 3308 3309 public void setState(byte[] new_state) 3310 { 3311 if (new_state == null) 3312 { 3313 log.debug("transferred state is null (may be first member in cluster)"); 3314 return; 3315 } 3316 ByteArrayInputStream bais = new ByteArrayInputStream (new_state); 3317 MarshalledValueInputStream in = null; 3318 try 3319 { 3320 in = new MarshalledValueInputStream(bais); 3321 getStateTransferManager().setState(in, Fqn.ROOT, null); 3322 stateReceivedSuccess(); 3323 } 3324 catch (Throwable t) 3325 { 3326 stateReceivingFailed(t); 3327 } 3328 finally 3329 { 3330 Util.close(in); 3331 synchronized (stateLock) 3332 { 3333 stateLock.notifyAll(); 3335 } 3336 } 3337 } 3338 3339 public byte[] getState(String state_id) 3340 { 3341 MarshalledValueOutputStream out = null; 3342 String sourceRoot = state_id; 3343 byte[] result = null; 3344 3345 boolean hasDifferentSourceAndIntegrationRoots = state_id.indexOf(StateTransferManager.PARTIAL_STATE_DELIMETER) > 0; 3346 if (hasDifferentSourceAndIntegrationRoots) 3347 { 3348 sourceRoot = state_id.split(StateTransferManager.PARTIAL_STATE_DELIMETER)[0]; 3349 } 3350 3351 try 3352 { 3353 ExposedByteArrayOutputStream baos = new ExposedByteArrayOutputStream(16 * 1024); 3354 out = new MarshalledValueOutputStream(baos); 3355 3356 getStateTransferManager().getState(out, Fqn.fromString(sourceRoot), 3357 configuration.getInitialStateRetrievalTimeout(), true, true); 3358 result = baos.getRawBuffer(); 3359 } 3360 catch (Throwable t) 3361 { 3362 stateProducingFailed(t); 3363 } 3364 finally 3365 { 3366 Util.close(out); 3367 } 3368 return result; 3369 } 3370 3371 public void getState(OutputStream ostream) 3372 { 3373 MarshalledValueOutputStream out = null; 3374 try 3375 { 3376 out = new MarshalledValueOutputStream(ostream); 3377 getStateTransferManager().getState(out, Fqn.ROOT, configuration.getInitialStateRetrievalTimeout(), true, true); 3378 } 3379 catch (Throwable t) 3380 { 3381 stateProducingFailed(t); 3382 } 3383 finally 3384 { 3385 Util.close(out); 3386 } 3387 } 3388 3389 public void getState(String state_id, OutputStream ostream) 3390 { 3391 String sourceRoot = state_id; 3392 MarshalledValueOutputStream out = null; 3393 boolean hasDifferentSourceAndIntegrationRoots = state_id.indexOf(StateTransferManager.PARTIAL_STATE_DELIMETER) > 0; 3394 if (hasDifferentSourceAndIntegrationRoots) 3395 { 3396 sourceRoot = state_id.split(StateTransferManager.PARTIAL_STATE_DELIMETER)[0]; 3397 } 3398 try 3399 { 3400 out = new MarshalledValueOutputStream(ostream); 3401 getStateTransferManager().getState(out, Fqn.fromString(sourceRoot), configuration.getInitialStateRetrievalTimeout(), true, true); 3402 } 3403 catch (Throwable t) 3404 { 3405 stateProducingFailed(t); 3406 } 3407 finally 3408 { 3409 Util.close(out); 3410 } 3411 } 3412 3413 public void setState(InputStream istream) 3414 { 3415 if (istream == null) 3416 { 3417 log.debug("stream is null (may be first member in cluster)"); 3418 return; 3419 } 3420 MarshalledValueInputStream in = null; 3421 try 3422 { 3423 in = new MarshalledValueInputStream(istream); 3424 getStateTransferManager().setState(in, Fqn.ROOT, null); 3425 stateReceivedSuccess(); 3426 } 3427 catch (Throwable t) 3428 { 3429 stateReceivingFailed(t); 3430 } 3431 finally 3432 { 3433 Util.close(in); 3434 synchronized (stateLock) 3435 { 3436 stateLock.notifyAll(); 3438 } 3439 } 3440 } 3441 3442 public void setState(String state_id, byte[] state) 3443 { 3444 if (state == null) 3445 { 3446 log.debug("partial transferred state is null"); 3447 return; 3448 } 3449 3450 MarshalledValueInputStream in = null; 3451 String targetRoot = state_id; 3452 boolean hasDifferentSourceAndIntegrationRoots = state_id.indexOf(StateTransferManager.PARTIAL_STATE_DELIMETER) > 0; 3453 if (hasDifferentSourceAndIntegrationRoots) 3454 { 3455 targetRoot = state_id.split(StateTransferManager.PARTIAL_STATE_DELIMETER)[1]; 3456 } 3457 try 3458 { 3459 log.debug("Setting received partial state for subroot " + state_id); 3460 Fqn subroot = Fqn.fromString(targetRoot); 3461 Region region = regionManager.getRegion(subroot, false); 3462 ClassLoader cl = null; 3463 if (region != null) 3464 { 3465 cl = region.getClassLoader(); 3467 } 3468 ByteArrayInputStream bais = new ByteArrayInputStream (state); 3469 in = new MarshalledValueInputStream(bais); 3470 getStateTransferManager().setState(in, subroot, cl); 3471 stateReceivedSuccess(); 3472 } 3473 catch (Throwable t) 3474 { 3475 stateReceivingFailed(t); 3476 } 3477 finally 3478 { 3479 Util.close(in); 3480 synchronized (stateLock) 3481 { 3482 stateLock.notifyAll(); 3484 } 3485 } 3486 } 3487 3488 public void setState(String state_id, InputStream istream) 3489 { 3490 String targetRoot = state_id; 3491 MarshalledValueInputStream in = null; 3492 boolean hasDifferentSourceAndIntegrationRoots = state_id.indexOf(StateTransferManager.PARTIAL_STATE_DELIMETER) > 0; 3493 if (hasDifferentSourceAndIntegrationRoots) 3494 { 3495 targetRoot = state_id.split(StateTransferManager.PARTIAL_STATE_DELIMETER)[1]; 3496 } 3497 if (istream == null) 3498 { 3499 log.debug("stream is null (may be first member in cluster). State is not set"); 3500 return; 3501 } 3502 3503 try 3504 { 3505 log.debug("Setting received partial state for subroot " + state_id); 3506 in = new MarshalledValueInputStream(istream); 3507 Fqn subroot = Fqn.fromString(targetRoot); 3508 Region region = regionManager.getRegion(subroot, false); 3509 ClassLoader cl = null; 3510 if (region != null) 3511 { 3512 cl = region.getClassLoader(); 3514 } 3515 getStateTransferManager().setState(in, subroot, cl); 3516 stateReceivedSuccess(); 3517 } 3518 catch (Throwable t) 3519 { 3520 stateReceivingFailed(t); 3521 } 3522 finally 3523 { 3524 Util.close(in); 3525 synchronized (stateLock) 3526 { 3527 stateLock.notifyAll(); 3529 } 3530 } 3531 } 3532 } 3533 3534 3535 3536 3537 3538 public void viewAccepted(View new_view) 3539 { 3540 Vector new_mbrs = new_view.getMembers(); 3541 if (log.isInfoEnabled()) log.info("viewAccepted(): " + new_view); 3542 synchronized (members) 3543 { 3544 boolean needNotification = false; 3545 if (new_mbrs != null) 3546 { 3547 Vector removed = (Vector ) members.clone(); 3550 removed.removeAll(new_mbrs); 3551 removeLocksForDeadMembers(root, removed); 3552 3553 members.removeAllElements(); 3554 members.addAll(new_view.getMembers()); 3555 3556 needNotification = true; 3557 } 3558 3559 coordinator = (members.size() != 0 && members.get(0).equals(getLocalAddress())); 3561 3562 if (needNotification) notifier.notifyViewChange(new_view, true); 3564 3565 members.notifyAll(); 3568 } 3569 } 3570 3571 3572 3575 public void suspect(Address suspected_mbr) 3576 { 3577 } 3578 3579 3582 public void block() 3583 { 3584 if (log.isDebugEnabled()) 3585 { 3586 log.debug("Block received at " + getLocalAddress()); 3587 } 3588 MethodCall m = MethodCallFactory.create(MethodDeclarations.blockChannelLocal); 3589 invokeMethod(m); 3590 if (log.isDebugEnabled()) 3591 { 3592 log.debug("Block processed at " + getLocalAddress()); 3593 } 3594 } 3595 3596 3599 public void unblock() 3600 { 3601 if (log.isDebugEnabled()) 3602 { 3603 log.debug("UnBlock received at " + getLocalAddress()); 3604 } 3605 MethodCall m = MethodCallFactory.create(MethodDeclarations.unblockChannelLocal); 3606 invokeMethod(m); 3607 if (log.isDebugEnabled()) 3608 { 3609 log.debug("UnBlock processed at " + getLocalAddress()); 3610 } 3611 } 3612 3613 3614 3615 3616 3617 3622 protected Transaction getLocalTransaction() 3623 { 3624 if (tm == null) 3625 { 3626 return null; 3627 } 3628 try 3629 { 3630 return tm.getTransaction(); 3631 } 3632 catch (Throwable t) 3633 { 3634 return null; 3635 } 3636 } 3637 3638 3639 3642 private boolean isValid(Transaction tx) 3643 { 3644 if (tx == null) return false; 3645 int status = -1; 3646 try 3647 { 3648 status = tx.getStatus(); 3649 return status == Status.STATUS_ACTIVE || status == Status.STATUS_PREPARING; 3650 } 3651 catch (SystemException e) 3652 { 3653 log.error("failed getting transaction status", e); 3654 return false; 3655 } 3656 } 3657 3658 3659 3668 public GlobalTransaction getCurrentTransaction() 3669 { 3670 return getCurrentTransaction(true); 3671 } 3672 3673 3677 public GlobalTransaction getCurrentTransaction(boolean createIfNotExists) 3678 { 3679 Transaction tx; 3680 3681 if ((tx = getLocalTransaction()) == null) 3682 { return null; 3684 } 3685 3686 if (!isValid(tx)) 3687 { int status = -1; 3689 try 3690 { 3691 status = tx.getStatus(); 3692 } 3693 catch (SystemException e) 3694 { 3695 } 3696 log.warn("status is " + status + " (not ACTIVE or PREPARING); returning null)", new Throwable ()); 3697 return null; 3698 } 3699 3700 return getCurrentTransaction(tx, createIfNotExists); 3701 } 3702 3703 3706 public GlobalTransaction getCurrentTransaction(Transaction tx) 3707 { 3708 return getCurrentTransaction(tx, true); 3709 } 3710 3711 3716 public GlobalTransaction getCurrentTransaction(Transaction tx, boolean createIfNotExists) 3717 { 3718 GlobalTransaction gtx = tx_table.get(tx); 3724 if (gtx == null && createIfNotExists) 3725 { 3726 Address addr = getLocalAddress(); 3727 gtx = GlobalTransaction.create(addr); 3728 tx_table.put(tx, gtx); 3729 TransactionEntry ent = configuration.isNodeLockingOptimistic() ? new OptimisticTransactionEntry() : new TransactionEntry(); 3730 ent.setTransaction(tx); 3731 tx_table.put(gtx, ent); 3732 if (log.isTraceEnabled()) 3733 { 3734 log.trace("created new GTX: " + gtx + ", local TX=" + tx); 3735 } 3736 } 3737 return gtx; 3738 } 3739 3740 3741 3746 protected Object invokeMethod(MethodCall m) throws CacheException 3747 { 3748 try 3749 { 3750 return interceptor_chain.invoke(m); 3751 } 3752 catch (Throwable t) 3753 { 3754 if (t instanceof CacheException) 3755 { 3756 throw (CacheException) t; 3757 } 3758 throw new RuntimeException (t); 3759 } 3760 } 3761 3762 3766 protected Object getOwnerForLock() 3767 { 3768 Object owner = getCurrentTransaction(); 3769 if (owner == null) 3770 { 3771 owner = Thread.currentThread(); 3772 } 3773 3774 return owner; 3775 } 3776 3777 3793 public NodeSPI findNode(Fqn fqn) 3794 { 3795 try 3796 { 3797 return findNode(fqn, null); 3798 } 3799 catch (CacheException e) 3800 { 3801 log.warn("Unexpected error", e); 3802 return null; 3803 } 3804 } 3805 3806 private NodeSPI findNodeCheck(GlobalTransaction tx, Fqn fqn) 3807 { 3808 NodeSPI n = findNode(fqn); 3809 if (n == null) 3810 { 3811 String errStr = "node " + fqn + " not found (gtx=" + tx + ", caller=" + Thread.currentThread() + ")"; 3812 if (log.isTraceEnabled()) 3813 { 3814 log.trace(errStr); 3815 } 3816 throw new NodeNotExistsException(errStr); 3817 } 3818 return n; 3819 } 3820 3821 3826 public void realRemove(Fqn f, boolean skipMarkerCheck) 3827 { 3828 NodeSPI n = findInternal(f, true); 3829 if (n == null) 3830 { 3831 return; 3832 } 3833 3834 if (log.isDebugEnabled()) log.debug("Performing a real remove for node " + f + ", marked for removal."); 3835 if (skipMarkerCheck || n.isDeleted()) 3836 { 3837 if (n.getFqn().isRoot()) 3838 { 3839 n.markAsDeleted(false); 3841 n.removeChildrenDirect(); 3843 } 3844 else 3845 { 3846 n.getParent().removeChildDirect(n.getFqn().getLastElement()); 3847 } 3848 } 3849 else 3850 { 3851 if (log.isDebugEnabled()) log.debug("Node " + f + " NOT marked for removal as expected, not removing!"); 3852 } 3853 } 3854 3855 3858 private NodeSPI findNode(Fqn fqn, DataVersion version) throws CacheException 3859 { 3860 if (fqn == null) return null; 3861 3862 NodeSPI toReturn = findInternal(fqn, false); 3863 3864 if (version != null && configuration.isNodeLockingOptimistic()) 3865 { 3866 DataVersion nodeVersion = toReturn.getVersion(); 3868 if (log.isDebugEnabled()) 3869 { 3870 log.debug("looking for optimistic node [" + fqn + "] with version [" + version + "]. My version is [" + nodeVersion + "]"); 3871 } 3872 if (nodeVersion.newerThan(version)) 3873 { 3874 throw new CacheException("Unable to validate versions."); 3876 } 3877 } 3878 return toReturn; 3879 } 3880 3881 public synchronized RegionManager getRegionManager() 3882 { 3883 if (regionManager == null) 3884 { 3885 regionManager = new RegionManager(this); 3886 } 3887 return regionManager; 3888 } 3889 3890 3891 public VersionAwareMarshaller getMarshaller() 3892 { 3893 if (marshaller_ == null) 3894 { 3895 marshaller_ = new VersionAwareMarshaller(getRegionManager(), configuration.isInactiveOnStartup(), configuration.isUseRegionBasedMarshalling(), configuration.getReplVersionString()); 3896 } 3897 return marshaller_; 3898 } 3899 3900 3904 protected String getDefaultProperties() 3905 { 3906 return "UDP(mcast_addr=224.0.0.36;mcast_port=55566;ip_ttl=32;" + 3907 "mcast_send_buf_size=150000;mcast_recv_buf_size=80000):" + 3908 "PING(timeout=1000;num_initial_members=2):" + 3909 "MERGE2(min_interval=5000;max_interval=10000):" + 3910 "FD_SOCK:" + 3911 "VERIFY_SUSPECT(timeout=1500):" + 3912 "pbcast.NAKACK(gc_lag=50;max_xmit_size=8192;retransmit_timeout=600,1200,2400,4800):" + 3913 "UNICAST(timeout=600,1200,2400,4800):" + 3914 "pbcast.STABLE(desired_avg_gossip=20000):" + 3915 "FRAG(frag_size=8192;down_thread=false;up_thread=false):" + 3916 "pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;" + 3917 "shun=false;print_local_addr=true):" + 3918 "pbcast.STATE_TRANSFER"; 3919 } 3920 3921 private void initialiseCacheLoaderManager() throws Exception 3922 { 3923 if (cacheLoaderManager == null) 3924 { 3925 cacheLoaderManager = new CacheLoaderManager(); 3926 } 3927 cacheLoaderManager.setConfig(configuration.getCacheLoaderConfig(), this); 3928 } 3929 3930 3937 public void setCacheLoader(CacheLoader loader) 3938 { 3939 log.warn("Using deprecated config method setCacheLoader. This element will be removed in future, please use CacheLoaderConfiguration instead."); 3940 3941 try 3942 { 3943 if (cacheLoaderManager == null) initialiseCacheLoaderManager(); 3944 } 3945 catch (Exception e) 3946 { 3947 log.warn("Problem setting cache loader. Perhaps your cache loader config has not been set yet?"); 3948 } 3949 cacheLoaderManager.setCacheLoader(loader); 3950 } 3951 3952 3955 public void purgeCacheLoaders() throws Exception 3956 { 3957 if (cacheLoaderManager != null) cacheLoaderManager.purgeLoaders(true); 3958 } 3959 3960 3964 private JChannel getMultiplexerChannel() 3965 { 3966 String stackName = configuration.getMultiplexerStack(); 3967 3968 RuntimeConfig rtc = configuration.getRuntimeConfig(); 3969 JChannelFactoryMBean channelFactory = rtc.getMuxChannelFactory(); 3970 try 3971 { 3972 if (channelFactory != null) 3973 { 3974 return (JChannel) channelFactory.createMultiplexerChannel(stackName, configuration.getClusterName()); 3975 } 3976 else 3977 { 3978 String serviceName = configuration.getMultiplexerService(); 3981 3982 if (serviceName == null || serviceName.length() == 0) 3983 { 3984 return null; 3985 } 3986 3987 MBeanServer mbserver = rtc.getMbeanServer(); 3988 if (mbserver == null) 3989 { 3990 log.warn("Multiplexer service specified but MBean server not found." + 3991 " Multiplexer will not be used for cache cluster " + configuration.getClusterName() + "."); 3992 return null; 3993 } 3994 3995 ObjectName muxName = new ObjectName (serviceName); 3996 3997 if (!mbserver.isRegistered(muxName)) 3999 { 4000 log.warn("Multiplexer service specified but '" + serviceName + "' not registered." + 4001 " Multiplexer will not be used for cache cluster " + configuration.getClusterName() + "."); 4002 return null; 4003 } 4004 4005 Object [] params = {stackName, configuration.getClusterName()}; 4007 return (JChannel) mbserver.invoke(muxName, CREATE_MUX_CHANNEL, params, MUX_TYPES); 4008 } 4009 } 4010 catch (Exception e) 4011 { 4012 log.error("Multiplexer channel creation failed." + 4013 " Multiplexer will not be used for cache cluster " + configuration.getClusterName() + ".", e); 4014 return null; 4015 } 4016 } 4017 4018 4020 public List<Interceptor> getInterceptorChain() 4021 { 4022 return Collections.unmodifiableList(getInterceptors()); 4023 } 4024 4025 public void addCacheListener(CacheListener l) 4026 { 4027 getNotifier().addCacheListener(l); 4028 } 4029 4030 public void addCacheListener(Fqn region, CacheListener l) 4031 { 4032 throw new UnsupportedOperationException ("Not implemented in this release"); 4033 } 4034 4035 public void removeCacheListener(CacheListener l) 4036 { 4037 getNotifier().removeCacheListener(l); 4038 } 4039 4040 public void removeCacheListener(Fqn region, CacheListener l) 4041 { 4042 throw new UnsupportedOperationException ("Not implemented in this release"); 4043 } 4044 4045 public Set <CacheListener> getCacheListeners() 4046 { 4047 return getNotifier().getCacheListeners(); 4048 } 4049 4050 public Set <CacheListener> getCacheListeners(Fqn region) 4051 { 4052 throw new UnsupportedOperationException ("Not implemented in this release"); 4053 } 4054 4055 public synchronized void addInterceptor(Interceptor i, int position) 4056 { 4057 List<Interceptor> interceptors = getInterceptors(); 4058 4059 i.setCache(this); 4060 4061 interceptors.add(position, i); 4062 4063 Interceptor linkedChain = InterceptorChainFactory.correctInterceptorChaining(interceptors); 4065 4066 setInterceptorChain(linkedChain); 4067 } 4068 4069 public synchronized void removeInterceptor(int position) 4070 { 4071 List<Interceptor> i = getInterceptors(); 4072 i.remove(position); 4073 setInterceptorChain(InterceptorChainFactory.correctInterceptorChaining(i)); 4074 } 4075 4076 public RPCManager getRPCManager() 4077 { 4078 return RPCManager.getInstance(this); 4079 } 4080 4081 public String getClusterName() 4082 { 4083 return getConfiguration().getClusterName(); 4084 } 4085 4086 public void evict(Fqn fqn, boolean recursive) 4087 { 4088 if (recursive) 4089 { 4090 Node n = get(fqn); 4091 if (n != null) 4092 { 4093 evictChildren((NodeSPI) n); 4094 } 4095 } 4096 else 4097 { 4098 evict(fqn); 4099 } 4100 } 4101 4102 private void evictChildren(NodeSPI n) 4103 { 4104 Set <NodeSPI> children = n.getChildrenDirect(); 4105 for (NodeSPI child : children) 4106 { 4107 evictChildren(child); 4108 } 4109 evict(n.getFqn()); 4110 } 4111 4112 public Region getRegion(Fqn fqn, boolean createIfAbsent) 4113 { 4114 return getRegionManager().getRegion(fqn, createIfAbsent); 4115 } 4116 4117 public void removeNode(Fqn fqn) 4118 { 4119 remove(fqn); 4120 } 4121 4122 public void putForExternalRead(Fqn fqn, Object key, Object value) 4123 { 4124 throw new UnsupportedOperationException ("Not yet implemented."); 4125 } 4129 4130 public boolean isStarted() 4131 { 4132 return started; 4133 } 4134} 4135 | Popular Tags |