1 package org.jboss.cache.loader; 2 3 import org.apache.commons.logging.Log; 4 import org.apache.commons.logging.LogFactory; 5 import org.jboss.cache.AbstractCacheListener; 6 import org.jboss.cache.CacheListener; 7 import org.jboss.cache.CacheSPI; 8 import org.jboss.cache.Fqn; 9 import org.jboss.cache.Modification; 10 import org.jboss.cache.NodeSPI; 11 import org.jgroups.Address; 12 import org.jgroups.View; 13 14 import java.io.ObjectInputStream ; 15 import java.util.Collection ; 16 import java.util.List ; 17 import java.util.Map ; 18 import java.util.Set ; 19 import java.util.Vector ; 20 21 39 public class SingletonStoreCacheLoader extends AbstractDelegatingCacheLoader 40 { 41 private static final Log log = LogFactory.getLog(SingletonStoreCacheLoader.class); 42 private Address localAddress; 43 private boolean active; private boolean pushStateWhenCoordinator; 45 private Thread pushStateThread; 46 private CacheListener cacheListener; 47 48 public SingletonStoreCacheLoader(CacheLoader cacheLoader, boolean pushConfiguration) 49 { 50 super(cacheLoader); 51 pushStateWhenCoordinator = pushConfiguration; 52 cacheListener = new SingletonStoreListener(); 53 } 54 55 public CacheListener getCacheListener() 56 { 57 return cacheListener; 58 } 59 60 protected void activeStatusChanged(boolean newActiveState) 61 { 62 active = newActiveState; 63 log.debug("changed mode: " + this); 64 if (active && pushStateWhenCoordinator) 65 { 66 if (pushStateThread == null || !pushStateThread.isAlive()) 67 { 68 pushStateThread = createPushStateThread(); 69 pushStateThread.setName("InMemoryToCacheLoaderPusher"); 70 pushStateThread.start(); 71 } 72 else 73 { 74 try 75 { 76 log.debug("joining currently running state push thread"); 77 pushStateThread.join(); 78 } 79 catch (InterruptedException e) 80 { 81 log.error("joining existing push state thread was interrupted", e); 82 } 83 } 84 } 85 } 86 87 protected Thread createPushStateThread() 88 { 89 return new Thread (new Runnable () 90 { 91 public void run() 92 { 93 log.debug("start pushing in-memory state to cache cacheLoader"); 94 try 95 { 96 pushState(cache.getRoot()); 97 log.debug("in-memory state passed to cache cacheLoader successfully"); 98 } 99 catch (Exception e) 100 { 101 log.error("unable to finish pushing the state", e); 102 } 103 } 104 }); 105 } 106 107 private boolean isCoordinator(View newView) 108 { 109 if (newView != null && localAddress != null) 110 { 111 Vector mbrs = newView.getMembers(); 112 if (mbrs != null) 113 { 114 if (mbrs.size() > 0 && localAddress.equals(mbrs.firstElement())) 115 { 116 117 return true; 118 } 119 } 120 121 return false; 122 } 123 124 125 return active; 126 } 127 128 private void pushState(NodeSPI node) throws Exception 129 { 130 131 Set keys = node.getKeysDirect(); 132 Fqn fqn = node.getFqn(); 133 134 for (Object aKey : keys) 135 { 136 Object value = cache.get(fqn, aKey); 137 put(fqn, aKey, value); 138 } 139 140 141 Collection <NodeSPI> children = node.getChildrenDirect(); 142 for (NodeSPI aChildren : children) 143 { 144 pushState(aChildren); 146 } 147 } 148 149 public Object put(Fqn name, Object key, Object value) throws Exception 150 { 151 if (active) 152 { 153 return super.put(name, key, value); 154 } 155 156 return null; 157 } 158 159 public void put(Fqn name, Map attributes) throws Exception 160 { 161 if (active) 162 { 163 super.put(name, attributes); 164 } 165 } 166 167 public void put(List <Modification> modifications) throws Exception 168 { 169 if (active) 170 { 171 super.put(modifications); 172 } 173 } 174 175 public Object remove(Fqn fqn, Object key) throws Exception 176 { 177 if (active) 178 { 179 return super.remove(fqn, key); 180 } 181 182 return null; 183 } 184 185 public void remove(Fqn fqn) throws Exception 186 { 187 if (active) 188 { 189 super.remove(fqn); 190 } 191 } 192 193 public void removeData(Fqn fqn) throws Exception 194 { 195 if (active) 196 { 197 super.removeData(fqn); 198 } 199 } 200 201 public void prepare(Object tx, List <Modification> modifications, boolean one_phase) throws Exception 202 { 203 if (active) 204 { 205 super.prepare(tx, modifications, one_phase); 206 } 207 } 208 209 public void commit(Object tx) throws Exception 210 { 211 if (active) 212 { 213 super.commit(tx); 214 } 215 } 216 217 public void rollback(Object tx) 218 { 219 if (active) 220 { 221 super.rollback(tx); 222 } 223 } 224 225 public void storeEntireState(ObjectInputStream is) throws Exception 226 { 227 if (active) 228 { 229 super.storeEntireState(is); 230 } 231 } 232 233 public void storeState(Fqn subtree, ObjectInputStream is) throws Exception 234 { 235 if (active) 236 { 237 super.storeState(subtree, is); 238 } 239 } 240 241 public Thread getPushStateThread() 242 { 243 return pushStateThread; 244 } 245 246 public String toString() 247 { 248 return "loc_addr=" + localAddress + ", active=" + active; 249 } 250 251 256 private class SingletonStoreListener extends AbstractCacheListener 257 { 258 public void cacheStarted(CacheSPI cache) 259 { 260 localAddress = cache.getLocalAddress(); 261 active = cache.getRPCManager().isCoordinator(); 262 log.debug("cache started: " + this); 263 } 264 265 public void cacheStopped(CacheSPI cache) 266 { 267 log.debug("cache stopped: " + this); 268 } 269 270 public void viewChange(View newView) 271 { 272 boolean tmp = isCoordinator(newView); 273 274 if (active != tmp) 275 { 276 activeStatusChanged(tmp); 277 } 278 } 279 } 280 } | Popular Tags |