package org.jboss.aspects.versioned;

import org.jboss.ha.framework.interfaces.HAPartition;
import org.jboss.ha.framework.interfaces.HAPartition.HAMembershipListener;
import org.jboss.ha.framework.interfaces.HAPartition.HAPartitionStateTransfer;
import org.jboss.logging.Logger;
import org.jboss.util.id.GUID;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.List;
import java.util.Vector;

/**
 * Synchronization manager that mirrors object state, updates and write-lock
 * acquisition onto the other members of a cluster by issuing RPCs through an
 * {@link HAPartition}.
 *
 * <p>The same class is both the sender and the receiver of those RPCs: it
 * registers itself as the partition's RPC handler under {@link #domainName},
 * and the public methods {@code getCurrentState}, {@code addNewObjects},
 * {@code updateObjects}, {@code acquireLocks} and {@code releaseHeldLocks} are
 * the targets of the method names passed to {@code callMethodOnCluster}.</p>
 *
 * <p>NOTE(review): this class implements {@code HAMembershipListener} and
 * {@code HAPartitionStateTransfer}, but no registration for either role is
 * visible in this file -- confirm it happens elsewhere.</p>
 */
public class DistributedSynchronizationManager extends LocalSynchronizationManager implements HAPartitionStateTransfer, HAMembershipListener
{

   protected static Logger log = Logger.getLogger(DistributedSynchronizationManager.class);

   /** Partition used for every cluster-wide RPC issued by this manager. */
   protected HAPartition partition;

   /** RPC service name: the constructor's domain name suffixed with "/SynchManager". */
   protected String domainName;

   /**
    * Write locks held on this node on behalf of other nodes. Keyed by node
    * name; each value is a Hashtable mapping a global transaction id (GUID) to
    * the List of DistributedState objects locked for that transaction.
    */
   protected Hashtable heldLocks = new Hashtable();

   /**
    * @param domainName     base name; "/SynchManager" is appended to form the RPC service name
    * @param versionManager passed to the superclass constructor
    * @param partition      cluster partition used for RPCs
    */
   public DistributedSynchronizationManager(String domainName, DistributedVersionManager versionManager, HAPartition partition)
   {
      super(versionManager);
      this.partition = partition;
      this.domainName = domainName + "/SynchManager";
   }

   /**
    * Registers this instance as the partition's RPC handler for {@link #domainName}.
    *
    * @throws Exception declared for the lifecycle contract
    */
   public void create() throws Exception
   {
      partition.registerRPCHandler(domainName, this);
   }

   /**
    * Fetches the current state from the cluster.
    *
    * @throws Exception if the state request fails
    */
   public void start() throws Exception
   {
      pullState();
   }

   /**
    * Invokes {@code getCurrentState} on the cluster and installs the first
    * response, if any, as the local state.
    *
    * @throws Exception if the cluster call fails
    */
   protected void pullState() throws Exception
   {
      Object[] args = {};
      ArrayList rsp = partition.callMethodOnCluster(domainName, "getCurrentState", args, true);
      // checkResponses() treats a null response list as possible, so do the same here.
      if (rsp != null && rsp.size() > 0)
         setCurrentState((Serializable) rsp.get(0));
   }

   /**
    * RPC target: returns this node's state table.
    *
    * @return the state table held by this manager
    */
   public Serializable getCurrentState()
   {
      if (log.isTraceEnabled())
         log.trace("getCurrentState called");
      return stateTable;
   }

   /**
    * Replaces the local state table with {@code newState} and builds the local
    * object for every contained state whose GUID is not yet in the object table.
    * Failures are logged, not propagated.
    *
    * @param newState the replacement state table; expected to be a Hashtable of
    *                 DistributedState values. A null value is rejected so the
    *                 existing table is never replaced by null.
    */
   public void setCurrentState(Serializable newState)
   {
      if (log.isTraceEnabled())
         log.trace("setCurrentState called");
      try
      {
         synchronized (tableLock)
         {
            // A ClassCastException here is reported by the catch block below.
            Hashtable incoming = (Hashtable) newState;
            if (incoming == null)
            {
               // Previously null was assigned first and the NPE surfaced afterwards,
               // leaving stateTable null for every later call.
               log.error("failed to set state sent from cluster: received null state");
               return;
            }
            this.stateTable = incoming;
            if (log.isTraceEnabled())
               log.trace("setCurrentState, size: " + stateTable.size());
            Iterator it = stateTable.values().iterator();
            while (it.hasNext())
            {
               DistributedState state = (DistributedState) it.next();
               if (objectTable.containsKey(state.getGUID())) continue;
               state.buildObject(this, versionManager);
            }
         }
      }
      catch (Exception ex)
      {
         log.error("failed to set state sent from cluster", ex);
      }
   }

   /**
    * Releases every lock this node holds on behalf of members that left the
    * cluster.
    *
    * @param deadMembers members that left; each element is used as a key into
    *                    {@link #heldLocks} (presumably node names -- confirm
    *                    against the partition implementation)
    * @param newMembers  not used
    * @param allMembers  not used
    */
   public void membershipChanged(Vector deadMembers, Vector newMembers, Vector allMembers)
   {
      for (int i = 0; i < deadMembers.size(); i++)
      {
         Hashtable held = (Hashtable) heldLocks.remove(deadMembers.get(i));
         if (held != null)
         {
            Iterator it = held.values().iterator();
            while (it.hasNext())
            {
               List list = (List) it.next();
               releaseHeldLocks(list);
            }
         }
      }
   }

   /**
    * Sends newly created object states to the cluster via the
    * {@code addNewObjects} RPC.
    *
    * @param newObjects list handed to the remote {@code addNewObjects}
    * @throws Exception if the call fails or any node answers with an exception
    */
   public void sendNewObjects(List newObjects) throws Exception
   {
      log.trace("sending new objects");
      try
      {
         Object[] args = {newObjects};
         checkResponses(partition.callMethodOnCluster(domainName, "addNewObjects", args, true));
      }
      catch (Exception ex)
      {
         log.error("serious cache problems, data inconsistency is imminent", ex);
         throw ex;
      }
   }

   /**
    * Sends a transaction's updates to the cluster via the {@code updateObjects}
    * RPC; the remote side merges each update and releases the matching write lock.
    *
    * @param globalTxId     id of the transaction being committed
    * @param clusterUpdates updates handed to the remote {@code updateObjects}
    * @throws Exception if the call fails or any node answers with an exception
    */
   protected void sendClusterUpdatesAndRelease(GUID globalTxId, List clusterUpdates) throws Exception
   {
      try
      {
         Object[] args = {partition.getNodeName(), globalTxId, clusterUpdates};
         checkResponses(partition.callMethodOnCluster(domainName, "updateObjects", args, true));
      }
      catch (Exception ex)
      {
         log.error("serious cache problems, data inconsistency is imminent", ex);
         throw ex;
      }
   }

   /**
    * Asks the cluster to take write locks on the given GUIDs for a transaction.
    * If that fails, asks the cluster to release whatever was locked for the
    * transaction and rethrows the original failure.
    *
    * @param globalTxId id of the transaction requesting the locks
    * @param guids      GUIDs of the states to lock
    * @throws Exception the failure of the acquire call or of any node's response
    */
   protected void acquireRemoteLocks(GUID globalTxId, List guids) throws Exception
   {
      try
      {
         Object[] args = {partition.getNodeName(), globalTxId, guids};
         checkResponses(partition.callMethodOnCluster(domainName, "acquireLocks", args, true));
      }
      catch (Exception ex)
      {
         try
         {
            // The remote handler is releaseHeldLocks(String, GUID); the transaction id
            // must be sent too, otherwise the call matches no handler and nodes that
            // did grant the locks keep holding them.
            Object[] releaseArgs = {partition.getNodeName(), globalTxId};
            partition.callMethodOnCluster(domainName, "releaseHeldLocks", releaseArgs, true);
         }
         catch (Exception releaseFailure)
         {
            // Best effort: report it, but let the original failure reach the caller.
            log.warn("failed to release remote locks after unsuccessful acquire for tx " + globalTxId, releaseFailure);
         }
         throw ex;
      }
   }

   /**
    * Not supported.
    *
    * @throws RuntimeException always
    */
   public void noTxUpdate(DistributedUpdate update) throws Exception
   {
      throw new RuntimeException("NOT IMPLEMENTED");
   }

   /**
    * RPC target: stores the received states in the state table, then builds the
    * local object for each one whose GUID is not yet in the object table.
    *
    * @param newObjects list of DistributedState instances
    * @throws Exception if building an object fails
    */
   public void addNewObjects(List newObjects) throws Exception
   {
      synchronized (tableLock)
      {
         // Register every state first, then build, so all states are in the
         // table before any buildObject call runs.
         for (int i = 0; i < newObjects.size(); i++)
         {
            DistributedState state = (DistributedState) newObjects.get(i);
            stateTable.put(state.getGUID(), state);
         }
         for (int i = 0; i < newObjects.size(); i++)
         {
            DistributedState state = (DistributedState) newObjects.get(i);
            if (objectTable.containsKey(state.getGUID())) continue;
            state.buildObject(this, versionManager);
         }
      }
   }

   /**
    * RPC target: merges each update into its state, releases that state's write
    * lock, and then drops the transaction's entry from the sending node's
    * held-lock table.
    *
    * @param nodeName   name of the node that sent the updates
    * @param globalTxId id of the transaction being committed
    * @param updates    list of DistributedUpdate instances
    * @throws Exception if merging fails
    */
   public void updateObjects(String nodeName, GUID globalTxId, ArrayList updates) throws Exception
   {
      log.trace("updateObjects");
      synchronized (tableLock)
      {
         for (int i = 0; i < updates.size(); i++)
         {
            DistributedUpdate update = (DistributedUpdate) updates.get(i);
            DistributedState state = (DistributedState) stateTable.get(update.getGUID());
            state.mergeState(update);
            state.releaseWriteLock();
         }
      }
      Hashtable table = (Hashtable) heldLocks.get(nodeName);
      // The node's entry may be absent (for example removed by membershipChanged);
      // the updates are already applied, so there is nothing left to clean up.
      if (table != null) table.remove(globalTxId);
      log.trace("end updateObjects");
   }

   /**
    * RPC target: releases the locks held for one transaction of one node.
    * Does nothing if no locks are recorded for that node or transaction.
    *
    * @param nodeName   name of the node the locks are held for
    * @param globalTxId id of the transaction whose locks are released
    */
   public void releaseHeldLocks(String nodeName, GUID globalTxId)
   {
      Hashtable held = (Hashtable) heldLocks.get(nodeName);
      if (held == null) return;

      List locks = (List) held.remove(globalTxId);
      if (locks != null) releaseHeldLocks(locks);
   }

   /**
    * RPC target: takes the write lock of every listed state and records the
    * locked states under the requesting node and transaction. If anything
    * fails, the locks taken so far are released and the failure is rethrown.
    *
    * @param nodeName   name of the node requesting the locks
    * @param globalTxId id of the requesting transaction
    * @param list       GUIDs of the states to lock
    * @throws Exception if a state lookup or lock acquisition fails
    */
   public void acquireLocks(String nodeName, GUID globalTxId, List list) throws Exception
   {
      log.trace("acquireLocks");
      ArrayList locks = new ArrayList();
      try
      {
         for (int i = 0; i < list.size(); i++)
         {
            GUID guid = (GUID) list.get(i);
            DistributedState state = getState(guid);
            state.acquireWriteLock();
            locks.add(state);
         }
         Hashtable held = (Hashtable) heldLocks.get(nodeName);
         if (held == null)
         {
            held = new Hashtable();
            heldLocks.put(nodeName, held);
         }
         held.put(globalTxId, locks);
      }
      catch (Exception ex)
      {
         releaseHeldLocks(locks);
         throw ex;
      }
      log.trace("end acquireLocks");
   }

   /**
    * Scans a list of RPC responses and rethrows the first one that is an
    * exception. A null list and null or non-exception entries are ignored.
    *
    * @param rsps responses returned by a cluster call; may be null
    * @throws Exception the first exception found among the responses
    */
   protected void checkResponses(List rsps) throws Exception
   {
      if (rsps == null) return;

      for (Iterator it = rsps.iterator(); it.hasNext();)
      {
         Object rsp = it.next();
         // instanceof is false for null; runtime exceptions are Exceptions too,
         // so one check rethrows the very same object in both cases.
         if (rsp instanceof Exception)
            throw (Exception) rsp;
      }
   }

}