1 22 package org.jboss.aspects.versioned; 23 24 import org.jboss.logging.Logger; 25 import org.jboss.tm.TransactionLocal; 26 import org.jboss.util.id.GUID; 27 28 import javax.transaction.Status ; 29 import javax.transaction.Synchronization ; 30 import javax.transaction.Transaction ; 31 32 import java.lang.ref.WeakReference ; 33 import java.util.ArrayList ; 34 import java.util.Collections ; 35 import java.util.HashMap ; 36 import java.util.Hashtable ; 37 import java.util.Iterator ; 38 import java.util.List ; 39 40 41 46 public class LocalSynchronizationManager implements SynchronizationManager 47 { 48 49 protected static Logger log = Logger.getLogger(DistributedSynchronizationManager.class); 50 protected TransactionLocal txSynch = new TransactionLocal(); 51 52 protected Object tableLock = new Object (); 53 protected Hashtable objectTable = new Hashtable (); 54 protected Hashtable stateTable = new Hashtable (); 55 protected DistributedVersionManager versionManager; 56 57 public LocalSynchronizationManager(DistributedVersionManager versionManager) 58 { 59 this.versionManager = versionManager; 60 } 61 62 public Object getObject(GUID guid) 63 { 64 synchronized (tableLock) 65 { 66 WeakReference ref = (WeakReference )objectTable.get(guid); 67 if (ref != null) 68 { 69 return ref.get(); 70 } 71 return null; 72 } 73 } 74 75 public void putObject(GUID guid, Object obj) 76 { 77 synchronized (tableLock) 78 { 79 objectTable.put(guid, new WeakReference (obj)); 80 } 81 } 82 83 public DistributedState getState(GUID guid) 84 { 85 synchronized (tableLock) 86 { 87 return (DistributedState)stateTable.get(guid); 88 } 89 } 90 91 public void putState(GUID guid, Object obj) 92 { 93 synchronized (tableLock) 94 { 95 stateTable.put(guid, obj); 96 } 97 } 98 99 public void registerUpdate(Transaction tx, DistributedState state) 100 throws Exception 101 { 102 if (tx == null) return; 103 GUID guid = state.getGUID(); 104 DistributedStateSynchronization synch = (DistributedStateSynchronization)txSynch.get(tx); 105 if (synch == null) 106 { 107 synch = new DistributedStateSynchronization(tx); 108 txSynch.set(tx, synch); 109 tx.registerSynchronization(synch); 110 synch.updates().put(guid, state); 111 return; 112 } 113 if (synch.updates().containsKey(guid)) return; 114 synch.updates().put(guid, state); 115 } 116 117 public void createObjects(List newObjects) throws Exception 118 { 119 log.trace("in create Objects"); 120 for (int i = 0; i < newObjects.size(); i++) 121 { 122 DistributedState state = (DistributedState)newObjects.get(i); 123 synchronized (tableLock) 124 { 125 objectTable.put(state.getGUID(), new WeakReference (state.getObject())); 126 stateTable.put(state.getGUID(), state); 127 } 128 } 129 sendNewObjects(newObjects); 130 } 131 132 public void sendNewObjects(List newObjects) throws Exception 133 { 134 } 136 137 protected void sendClusterUpdatesAndRelease(GUID globalTxId, List clusterUpdates) throws Exception 138 { 139 } 141 protected void acquireRemoteLocks(GUID globalTxId, List guids) throws Exception 142 { 143 } 145 146 public void noTxUpdate(DistributedUpdate update) throws Exception 147 { 148 } 150 151 protected void releaseHeldLocks(List locks) 152 { 153 log.trace("releaseHeldLocks"); 154 for (int i = 0; i < locks.size(); i++) 155 { 156 try 157 { 158 DistributedState state = (DistributedState)locks.get(i); 159 state.releaseWriteLock(); 160 } 161 catch (Exception ignored) 162 { 163 } 165 } 166 log.trace("end releaseHeldLocks"); 167 } 168 169 private final class DistributedStateSynchronization implements Synchronization 170 { 171 final Transaction tx; 172 HashMap managers = new HashMap (); 173 ArrayList locks; boolean optimisticLockPassed = false; 175 ArrayList clusterUpdates; 176 GUID globalTxId; 177 public DistributedStateSynchronization(final Transaction tx) 178 { 179 this.tx = tx; 180 } 181 182 public HashMap updates() { return managers; } 183 184 185 public void beforeCompletion() 186 { 187 ArrayList guidList = new ArrayList (managers.keySet()); 188 189 Collections.sort(guidList); 191 clusterUpdates = new ArrayList (); 192 locks = new ArrayList (); try 194 { 195 for (int i = 0; i < guidList.size(); i++) 196 { 197 GUID guid = (GUID)guidList.get(i); 198 DistributedState manager = (DistributedState)managers.get(guid); 199 log.trace("acquiring writelock in beforecompletion"); 200 manager.acquireWriteLock(); 201 locks.add(manager); 203 manager.checkOptimisticLock(tx); 204 clusterUpdates.add(manager.createTxUpdate(tx)); 205 } 206 globalTxId = new GUID(); 207 acquireRemoteLocks(globalTxId, guidList); 208 } 209 catch (RuntimeException ex) 210 { 211 releaseHeldLocks(locks); 213 throw ex; 214 } 215 catch (Exception ex) 216 { 217 throw new RuntimeException (ex); 218 } 219 optimisticLockPassed = true; 220 } 221 222 public void afterCompletion(int status) 223 { 224 if (status != Status.STATUS_ROLLEDBACK) 226 { 227 Iterator it = managers.values().iterator(); 228 while (it.hasNext()) 229 { 230 DistributedState manager = (DistributedState)it.next(); 231 try 232 { 233 manager.mergeState(tx); 234 } 235 catch (Exception ignored) 236 { 237 log.error("afterCompletion failed on mergeState, cache is probably inconsistent and should be flushed", ignored); 240 } 241 } 242 try 243 { 244 sendClusterUpdatesAndRelease(globalTxId, clusterUpdates); 246 } 247 catch (Exception ignored) 248 { 249 log.error("afterCompletion failed on mergeState, cache is probably inconsistent and should be flushed", ignored); 252 } 253 } 254 if (optimisticLockPassed) 255 { 256 log.trace("afterCompletion releaseHeldLocks"); 257 releaseHeldLocks(locks); 258 } 259 } 260 } 261 } 262 | Popular Tags |