1 5 package com.tc.objectserver.tx; 6 7 import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList; 8 9 import com.tc.logging.TCLogger; 10 import com.tc.logging.TCLogging; 11 import com.tc.net.protocol.tcm.ChannelID; 12 import com.tc.object.ObjectID; 13 import com.tc.object.dna.api.DNA; 14 import com.tc.object.dna.impl.VersionizedDNAWrapper; 15 import com.tc.object.gtx.GlobalTransactionID; 16 import com.tc.object.net.ChannelStats; 17 import com.tc.object.tx.ServerTransactionID; 18 import com.tc.object.tx.TransactionID; 19 import com.tc.objectserver.api.ObjectInstanceMonitor; 20 import com.tc.objectserver.api.ObjectManager; 21 import com.tc.objectserver.core.api.ManagedObject; 22 import com.tc.objectserver.gtx.ServerGlobalTransactionManager; 23 import com.tc.objectserver.l1.api.ClientStateManager; 24 import com.tc.objectserver.l1.impl.TransactionAcknowledgeAction; 25 import com.tc.objectserver.lockmanager.api.LockManager; 26 import com.tc.objectserver.managedobject.BackReferences; 27 import com.tc.objectserver.persistence.api.PersistenceTransaction; 28 import com.tc.objectserver.persistence.api.TransactionStore; 29 import com.tc.stats.counter.Counter; 30 31 import java.util.ArrayList ; 32 import java.util.Collection ; 33 import java.util.Collections ; 34 import java.util.HashMap ; 35 import java.util.Iterator ; 36 import java.util.List ; 37 import java.util.Map ; 38 import java.util.Set ; 39 import java.util.Map.Entry; 40 41 public class ServerTransactionManagerImpl implements ServerTransactionManager, ServerTransactionManagerMBean { 42 43 private static final TCLogger logger = TCLogging 44 .getLogger(ServerTransactionManager.class); 45 46 private final Map transactionAccounts = Collections.synchronizedMap(new HashMap ()); 47 private final ClientStateManager stateManager; 48 private final ObjectManager objectManager; 49 private final TransactionAcknowledgeAction action; 50 private final LockManager lockManager; 51 private final List rootEventListeners = new CopyOnWriteArrayList(); 52 private final List txnEventListeners = new CopyOnWriteArrayList(); 53 54 private final Counter transactionRateCounter; 55 56 private final ChannelStats channelStats; 57 58 private final ServerGlobalTransactionManager gtxm; 59 60 public ServerTransactionManagerImpl(ServerGlobalTransactionManager gtxm, TransactionStore transactionStore, 61 LockManager lockManager, ClientStateManager stateManager, 62 ObjectManager objectManager, TransactionAcknowledgeAction action, 63 Counter transactionRateCounter, ChannelStats channelStats) { 64 this.gtxm = gtxm; 65 this.lockManager = lockManager; 66 this.objectManager = objectManager; 67 this.stateManager = stateManager; 68 this.action = action; 69 this.transactionRateCounter = transactionRateCounter; 70 this.channelStats = channelStats; 71 this.addTransactionListener(gtxm); 72 } 73 74 public void dump() { 75 StringBuffer buf = new StringBuffer ("ServerTransactionManager"); 76 buf.append("transactionAccounts: " + transactionAccounts); 77 buf.append("\n/ServerTransactionManager"); 78 System.err.println(buf.toString()); 79 } 80 81 public void shutdownClient(ChannelID waitee) { 84 transactionAccounts.remove(waitee); 85 Map currentStates = new HashMap (transactionAccounts); 86 for (Iterator i = currentStates.keySet().iterator(); i.hasNext();) { 87 ChannelID key = (ChannelID) i.next(); 88 89 TransactionAccount client = getTransactionAccount(key); 90 if (client != null) { 91 for (Iterator it = client.requestersWaitingFor(waitee).iterator(); it.hasNext();) { 92 TransactionID reqID = (TransactionID) it.next(); 93 acknowledgement(client.getClientID(), reqID, waitee); 94 } 95 } 96 } 97 98 stateManager.shutdownClient(waitee); 99 lockManager.clearAllLocksFor(waitee); 100 gtxm.shutdownClient(waitee); 101 fireClientDisconnectedEvent(waitee); 102 } 103 104 public void setResentTransactionIDs(ChannelID channelID, Collection transactionIDs) { 105 Collection stxIDs = new ArrayList(); 106 for (Iterator iter = transactionIDs.iterator(); iter.hasNext();) { 107 TransactionID txn = (TransactionID) iter.next(); 108 stxIDs.add(new ServerTransactionID(channelID, txn)); 109 } 110 fireAddResentTransactionIDsEvent(stxIDs); 111 } 112 113 public void addWaitingForAcknowledgement(ChannelID waiter, TransactionID txnID, ChannelID waitee) { 114 TransactionAccount ci = getOrCreateTransactionAccount(waiter); 115 ci.addWaitee(waitee, txnID); 116 } 117 118 public boolean isWaiting(ChannelID waiter, TransactionID txnID) { 120 TransactionAccount c = getTransactionAccount(waiter); 121 return c != null && c.hasWaitees(txnID); 122 } 123 124 private void acknowledge(ChannelID waiter, TransactionID txnID) { 125 final ServerTransactionID serverTxnID = new ServerTransactionID(waiter, txnID); 126 fireTransactionCompleteEvent(serverTxnID); 127 if (!gtxm.needsApply(serverTxnID)) { 128 action.acknowledgeTransaction(serverTxnID); 131 } 132 } 133 134 public void acknowledgement(ChannelID waiter, TransactionID txnID, ChannelID waitee) { 135 136 TransactionAccount transactionAccount = getTransactionAccount(waiter); 137 if (transactionAccount == null) { 138 logger.warn("Waiter not found in the states map: " + waiter); 141 return; 142 } 143 144 if (transactionAccount.removeWaitee(waitee, txnID)) { 145 acknowledge(waiter, txnID); 146 } 147 } 148 149 public void apply(GlobalTransactionID gtxID, ServerTransaction txn, Map objects, BackReferences includeIDs, 150 ObjectInstanceMonitor instanceMonitor) { 151 152 final ChannelID channelID = txn.getChannelID(); 153 final TransactionID txnID = txn.getTransactionID(); 154 final List changes = txn.getChanges(); 155 156 TransactionAccount ci; 157 if (txn.isPassive()) { 158 ci = getOrCreateNullTransactionAccount(channelID); 159 } else { 160 ci = getOrCreateTransactionAccount(channelID); 163 } 164 ci.applyStarted(txnID); 165 166 for (Iterator i = changes.iterator(); i.hasNext();) { 167 DNA orgDNA = (DNA) i.next(); 168 long version = orgDNA.getVersion(); 169 if (version == DNA.NULL_VERSION) { 170 version = gtxID.toLong(); 171 } 172 DNA change = new VersionizedDNAWrapper(orgDNA, version, true); 173 ManagedObject mo = (ManagedObject) objects.get(change.getObjectID()); 174 mo.apply(change, txnID, includeIDs, instanceMonitor); 175 if (!change.isDelta() && !txn.isPassive()) { 176 stateManager.addReference(txn.getChannelID(), mo.getID()); 178 } 179 } 180 181 Map newRoots = txn.getNewRoots(); 182 183 if (newRoots.size() > 0) { 184 for (Iterator i = newRoots.entrySet().iterator(); i.hasNext();) { 185 Entry entry = (Entry) i.next(); 186 String rootName = (String ) entry.getKey(); 187 ObjectID newID = (ObjectID) entry.getValue(); 188 objectManager.createRoot(rootName, newID); 189 } 190 } 191 transactionRateCounter.increment(); 192 if (!channelID.isNull()) channelStats.notifyTransaction(channelID); 193 194 } 195 196 public void skipApplyAndCommit(ServerTransaction txn) { 197 final ChannelID channelID = txn.getChannelID(); 198 final TransactionID txnID = txn.getTransactionID(); 199 TransactionAccount ci = getOrCreateTransactionAccount(channelID); 200 if (ci.skipApplyAndCommit(txnID)) { 201 acknowledge(channelID, txnID); 202 } 203 fireTransactionAppliedEvent(txn.getServerTransactionID()); 204 } 205 206 public void release(PersistenceTransaction ptx, Collection objects, Map newRoots) { 207 objectManager.releaseAll(ptx, objects); 209 210 for (Iterator i = newRoots.entrySet().iterator(); i.hasNext();) { 213 Map.Entry entry = (Entry) i.next(); 214 fireRootCreatedEvent((String ) entry.getKey(), (ObjectID) entry.getValue()); 215 } 216 } 217 218 public void incomingTransactions(ChannelID cid, Set serverTxnIDs, boolean relayed) { 219 TransactionAccount ci = getOrCreateTransactionAccount(cid); 220 for (Iterator i = serverTxnIDs.iterator(); i.hasNext();) { 221 final ServerTransactionID txnId = (ServerTransactionID) i.next(); 222 final TransactionID txnID = txnId.getClientTransactionID(); 223 if (!relayed) { 224 ci.relayTransactionComplete(txnID); 225 } 226 } 227 fireIncomingTransactionsEvent(cid, serverTxnIDs); 228 } 229 230 public void transactionsRelayed(ChannelID channelID, Set serverTxnIDs) { 231 TransactionAccount ci = getOrCreateTransactionAccount(channelID); 232 if (ci == null) { 233 logger.warn("transactionsRelayed(): TransactionAccount not found for " + channelID); 234 return; 235 } 236 for (Iterator i = serverTxnIDs.iterator(); i.hasNext();) { 237 final ServerTransactionID txnId = (ServerTransactionID) i.next(); 238 final TransactionID txnID = txnId.getClientTransactionID(); 239 if (ci.relayTransactionComplete(txnID)) { 240 acknowledge(channelID, txnID); 241 } 242 } 243 } 244 245 public void committed(Collection txnsIds) { 246 for (Iterator i = txnsIds.iterator(); i.hasNext();) { 247 final ServerTransactionID txnId = (ServerTransactionID) i.next(); 248 final ChannelID waiter = txnId.getChannelID(); 249 final TransactionID txnID = txnId.getClientTransactionID(); 250 251 TransactionAccount ci = getTransactionAccount(waiter); 252 if (ci != null && ci.applyCommitted(txnID)) { 253 acknowledge(waiter, txnID); 254 } 255 256 fireTransactionAppliedEvent(txnId); 258 } 259 } 260 261 public void broadcasted(ChannelID waiter, TransactionID txnID) { 262 TransactionAccount ci = getTransactionAccount(waiter); 263 264 if (ci != null && ci.broadcastCompleted(txnID)) { 265 acknowledge(waiter, txnID); 266 } 267 } 268 269 private TransactionAccount getOrCreateTransactionAccount(ChannelID clientID) { 270 synchronized (transactionAccounts) { 271 TransactionAccount ta = (TransactionAccount) transactionAccounts.get(clientID); 272 if ((ta == null) || (ta instanceof NullTransactionAccount)) { 273 Object old = transactionAccounts.put(clientID, (ta = new TransactionAccountImpl(clientID))); 274 if (old != null) { 275 logger.info("Transaction Account changed from : " + old + " to " + ta); 276 } 277 } 278 return ta; 279 } 280 } 281 282 private TransactionAccount getOrCreateNullTransactionAccount(ChannelID clientID) { 283 synchronized (transactionAccounts) { 284 TransactionAccount ta = (TransactionAccount) transactionAccounts.get(clientID); 285 if ((ta == null) || (ta instanceof TransactionAccountImpl)) { 286 Object old = transactionAccounts.put(clientID, (ta = new NullTransactionAccount(clientID))); 287 if (old != null) { 288 logger.info("Transaction Account changed from : " + old + " to " + ta); 289 } 290 } 291 return ta; 292 293 } 294 } 295 296 private TransactionAccount getTransactionAccount(ChannelID clientID) { 297 return (TransactionAccount) transactionAccounts.get(clientID); 298 } 299 300 public void addRootListener(ServerTransactionManagerEventListener listener) { 301 if (listener == null) { throw new IllegalArgumentException ("listener cannot be null"); } 302 this.rootEventListeners.add(listener); 303 } 304 305 private void fireRootCreatedEvent(String rootName, ObjectID id) { 306 for (Iterator iter = rootEventListeners.iterator(); iter.hasNext();) { 307 try { 308 ServerTransactionManagerEventListener listener = (ServerTransactionManagerEventListener) iter.next(); 309 listener.rootCreated(rootName, id); 310 } catch (Exception e) { 311 if (logger.isDebugEnabled()) { 312 logger.debug(e); 313 } else { 314 logger.warn("Exception in rootCreated event callback: " + e.getMessage()); 315 } 316 } 317 } 318 } 319 320 public void addTransactionListener(ServerTransactionListener listener) { 321 if (listener == null) { throw new IllegalArgumentException ("listener cannot be null"); } 322 this.txnEventListeners.add(listener); 323 } 324 325 private void fireIncomingTransactionsEvent(ChannelID cid, Set serverTxnIDs) { 326 for (Iterator iter = txnEventListeners.iterator(); iter.hasNext();) { 327 try { 328 ServerTransactionListener listener = (ServerTransactionListener) iter.next(); 329 listener.incomingTransactions(cid, serverTxnIDs); 330 } catch (Exception e) { 331 if (logger.isDebugEnabled()) { 332 logger.debug(e); 333 } else { 334 logger.warn("Exception in Txn complete event callback: " + e.getMessage()); 335 } 336 } 337 } 338 } 339 340 private void fireTransactionCompleteEvent(ServerTransactionID stxID) { 341 for (Iterator iter = txnEventListeners.iterator(); iter.hasNext();) { 342 try { 343 ServerTransactionListener listener = (ServerTransactionListener) iter.next(); 344 listener.transactionCompleted(stxID); 345 } catch (Exception e) { 346 if (logger.isDebugEnabled()) { 347 logger.debug(e); 348 } else { 349 logger.warn("Exception in Txn complete event callback: " + e.getMessage()); 350 } 351 } 352 } 353 } 354 355 private void fireTransactionAppliedEvent(ServerTransactionID stxID) { 356 for (Iterator iter = txnEventListeners.iterator(); iter.hasNext();) { 357 try { 358 ServerTransactionListener listener = (ServerTransactionListener) iter.next(); 359 listener.transactionApplied(stxID); 360 } catch (Exception e) { 361 if (logger.isDebugEnabled()) { 362 logger.debug(e); 363 } else { 364 logger.warn("Exception in Txn Applied event callback: " + e.getMessage()); 365 } 366 } 367 } 368 } 369 370 private void fireAddResentTransactionIDsEvent(Collection stxIDs) { 371 for (Iterator iter = txnEventListeners.iterator(); iter.hasNext();) { 372 try { 373 ServerTransactionListener listener = (ServerTransactionListener) iter.next(); 374 listener.addResentServerTransactionIDs(stxIDs); 375 } catch (Exception e) { 376 if (logger.isDebugEnabled()) { 377 logger.debug(e); 378 } else { 379 logger.warn("Exception in addResentServerTransactionIDs() event callback: " + e.getMessage()); 380 } 381 } 382 } 383 } 384 385 private void fireClientDisconnectedEvent(ChannelID waitee) { 386 for (Iterator iter = txnEventListeners.iterator(); iter.hasNext();) { 387 try { 388 ServerTransactionListener listener = (ServerTransactionListener) iter.next(); 389 listener.clearAllTransactionsFor(waitee); 390 } catch (Exception e) { 391 if (logger.isDebugEnabled()) { 392 logger.debug(e); 393 } else { 394 logger.warn("Exception in addResentServerTransactionIDs() event callback: " + e.getMessage()); 395 } 396 } 397 } 398 } 399 } 400 | Popular Tags |