1 5 package com.tc.objectserver.tx; 6 7 import EDU.oswego.cs.dl.util.concurrent.LinkedQueue; 8 9 import com.tc.logging.TCLogger; 10 import com.tc.logging.TCLogging; 11 import com.tc.net.protocol.tcm.ChannelID; 12 import com.tc.object.ObjectID; 13 import com.tc.object.tx.ServerTransactionID; 14 import com.tc.objectserver.api.ObjectManager; 15 import com.tc.objectserver.api.ObjectManagerLookupResults; 16 import com.tc.objectserver.context.ApplyTransactionContext; 17 import com.tc.objectserver.context.CommitTransactionContext; 18 import com.tc.objectserver.context.ObjectManagerResultsContext; 19 import com.tc.objectserver.context.RecallObjectsContext; 20 import com.tc.objectserver.gtx.ServerGlobalTransactionManager; 21 import com.tc.properties.TCPropertiesImpl; 22 import com.tc.text.PrettyPrintable; 23 import com.tc.text.PrettyPrinter; 24 import com.tc.util.Assert; 25 26 import java.io.PrintWriter ; 27 import java.util.ArrayList ; 28 import java.util.Collection ; 29 import java.util.Collections ; 30 import java.util.HashMap ; 31 import java.util.HashSet ; 32 import java.util.IdentityHashMap ; 33 import java.util.Iterator ; 34 import java.util.LinkedHashMap ; 35 import java.util.List ; 36 import java.util.Map ; 37 import java.util.Set ; 38 import java.util.Map.Entry; 39 40 44 public class TransactionalObjectManagerImpl implements TransactionalObjectManager, PrettyPrintable { 45 46 private static final TCLogger logger = TCLogging 47 .getLogger(TransactionalObjectManagerImpl.class); 48 private static final int MAX_COMMIT_SIZE = TCPropertiesImpl 49 .getProperties() 50 .getInt( 51 "l2.objectmanager.maxObjectsToCommit"); 52 private final ObjectManager objectManager; 53 private final TransactionSequencer sequencer; 54 private final ServerGlobalTransactionManager gtxm; 55 56 private final Object completedTxnIdsLock = new Object (); 57 private Set completedTxnIDs = new HashSet (); 58 59 62 private final Map checkedOutObjects = new HashMap (); 63 private final Map applyPendingTxns = new HashMap (); 64 private final LinkedHashMap commitPendingTxns = new LinkedHashMap (); 65 66 private final Set pendingObjectRequest = new HashSet (); 67 private final PendingList pendingTxnList = new PendingList(); 68 private final LinkedQueue processedPendingLookups = new LinkedQueue(); 69 private final LinkedQueue processedApplys = new LinkedQueue(); 70 71 private final TransactionalStageCoordinator txnStageCoordinator; 72 73 public TransactionalObjectManagerImpl(ObjectManager objectManager, TransactionSequencer sequencer, 74 ServerGlobalTransactionManager gtxm, 75 TransactionalStageCoordinator txnStageCoordinator) { 76 this.objectManager = objectManager; 77 this.sequencer = sequencer; 78 this.gtxm = gtxm; 79 this.txnStageCoordinator = txnStageCoordinator; 80 } 81 82 public void addTransactions(ChannelID channelID, List txns, Collection completedTxnIds) { 84 sequencer.addTransactions(txns); 85 addCompletedTxnIds(completedTxnIds); 86 txnStageCoordinator.initiateLookup(); 87 } 88 89 private void addCompletedTxnIds(Collection txnIds) { 90 synchronized (completedTxnIdsLock) { 91 completedTxnIDs.addAll(txnIds); 92 } 93 } 94 95 private Set getCompletedTxnIds() { 96 synchronized (completedTxnIdsLock) { 97 Set toRet = completedTxnIDs; 98 completedTxnIDs = new HashSet (); 99 return toRet; 100 } 101 } 102 103 public void lookupObjectsForTransactions() { 105 processPendingIfNecessary(); 106 while (true) { 107 ServerTransaction txn = sequencer.getNextTxnToProcess(); 108 if (txn == null) break; 109 ServerTransactionID stxID = txn.getServerTransactionID(); 110 if (gtxm.needsApply(stxID)) { 111 lookupObjectsForApplyAndAddToSink(txn, true); 112 } else { 113 txnStageCoordinator.addToApplyStage(new ApplyTransactionContext(txn, Collections.EMPTY_MAP)); 115 } 116 } 117 } 118 119 private synchronized void processPendingIfNecessary() { 120 if (addProcessedPendingLookups()) { 121 processPendingTransactions(); 122 } 123 } 124 125 public synchronized void lookupObjectsForApplyAndAddToSink(ServerTransaction txn, boolean newTxn) { 126 Collection oids = txn.getObjectIDs(); 127 Set newRequests = new HashSet (); 129 boolean makePending = false; 130 for (Iterator i = oids.iterator(); i.hasNext();) { 131 ObjectID oid = (ObjectID) i.next(); 132 TxnObjectGrouping tog; 133 if (pendingObjectRequest.contains(oid)) { 134 makePending = true; 135 } else if ((tog = (TxnObjectGrouping) checkedOutObjects.get(oid)) == null) { 136 newRequests.add(oid); 138 } else if (tog.limitReached()) { 139 newRequests.add(oid); 141 } 144 } 145 LookupContext lookupContext = null; 147 if (!newRequests.isEmpty()) { 148 lookupContext = new LookupContext(newRequests, (newTxn ? txn.getNewObjectIDs() : Collections.EMPTY_SET)); 149 if (objectManager.lookupObjectsForCreateIfNecessary(txn.getChannelID(), lookupContext)) { 150 addLookedupObjects(lookupContext); 151 } else { 152 makePending = true; 155 pendingObjectRequest.addAll(newRequests); 156 } 157 } 158 if (makePending) { 159 makePending(txn); 161 if (lookupContext != null) lookupContext.makePending(); 162 } else { 163 ServerTransactionID txnID = txn.getServerTransactionID(); 164 TxnObjectGrouping newGrouping = new TxnObjectGrouping(txnID, txn.getNewRoots()); 165 mergeTransactionGroupings(oids, newGrouping); 166 applyPendingTxns.put(txnID, newGrouping); 167 txnStageCoordinator.addToApplyStage(new ApplyTransactionContext(txn, getRequiredObjectsMap(oids, newGrouping 168 .getObjects()))); 169 makeUnpending(txn); 170 } 172 } 173 174 public String shortDescription() { 175 return "TxnObjectManager : checked Out count = " + checkedOutObjects.size() + " apply pending txn = " 176 + applyPendingTxns.size() + " commit pending = " + commitPendingTxns.size() + " pending txns = " 177 + pendingTxnList.size() + " pending object requests = " + pendingObjectRequest.size(); 178 } 179 180 private Map getRequiredObjectsMap(Collection oids, Map objects) { 181 HashMap map = new HashMap (oids.size()); 182 for (Iterator i = oids.iterator(); i.hasNext();) { 183 Object oid = i.next(); 184 Object mo = objects.get(oid); 185 if (mo == null) { 186 dump(); 187 log("NULL !! " + oid + " not found ! " + oids); 188 log("Map contains " + objects); 189 throw new AssertionError ("Object is NULL !! : " + oid); 190 } 191 map.put(oid, mo); 192 } 193 return map; 194 } 195 196 private void log(String message) { 197 logger.info(message); 198 } 199 200 private void mergeTransactionGroupings(Collection oids, TxnObjectGrouping newGrouping) { 203 long start = System.currentTimeMillis(); 204 for (Iterator i = oids.iterator(); i.hasNext();) { 205 ObjectID oid = (ObjectID) i.next(); 206 TxnObjectGrouping oldGrouping = (TxnObjectGrouping) checkedOutObjects.get(oid); 207 if (oldGrouping == null) { 208 throw new AssertionError ("Transaction Grouping for lookedup objects is Null !! " + oid); 209 } else if (oldGrouping != newGrouping && oldGrouping.isActive()) { 210 ServerTransactionID oldTxnId = oldGrouping.getServerTransactionID(); 211 newGrouping.merge(oldGrouping); 213 commitPendingTxns.remove(oldTxnId); 214 } 215 } 216 for (Iterator j = newGrouping.getObjects().keySet().iterator(); j.hasNext();) { 217 checkedOutObjects.put(j.next(), newGrouping); 218 } 219 for (Iterator j = newGrouping.getApplyPendingTxnsIterator(); j.hasNext();) { 220 ServerTransactionID oldTxnId = (ServerTransactionID) j.next(); 221 if (applyPendingTxns.containsKey(oldTxnId)) { 222 applyPendingTxns.put(oldTxnId, newGrouping); 223 } 224 } 225 long timeTaken = System.currentTimeMillis() - start; 226 if (timeTaken > 500) { 227 log("Merged " + oids.size() + " object into " + newGrouping.shortDescription() + " in " + timeTaken + " ms"); 228 } 229 } 230 231 private synchronized void addLookedupObjects(LookupContext context) { 232 Map lookedUpObjects = context.getLookedUpObjects(); 233 Assert.assertTrue(lookedUpObjects != null && lookedUpObjects.size() > 0); 234 TxnObjectGrouping tg = new TxnObjectGrouping(lookedUpObjects); 235 for (Iterator i = lookedUpObjects.keySet().iterator(); i.hasNext();) { 236 Object oid = i.next(); 237 pendingObjectRequest.remove(oid); 238 checkedOutObjects.put(oid, tg); 239 } 240 } 241 242 private void makePending(ServerTransaction txn) { 243 if (pendingTxnList.add(txn)) { 244 sequencer.makePending(txn); 245 } 246 } 247 248 private void makeUnpending(ServerTransaction txn) { 249 if (pendingTxnList.remove(txn)) { 250 sequencer.makeUnpending(txn); 251 } 252 } 253 254 private boolean addProcessedPendingLookups() { 255 LookupContext c; 256 boolean processedPending = false; 257 try { 258 while ((c = (LookupContext) processedPendingLookups.poll(0)) != null) { 259 addLookedupObjects(c); 260 processedPending = true; 261 } 262 } catch (InterruptedException e) { 263 throw new AssertionError (e); 264 } 265 return processedPending; 266 } 267 268 private void addProcessedPending(LookupContext context) { 269 try { 270 processedPendingLookups.put(context); 271 } catch (InterruptedException e) { 272 throw new AssertionError (e); 273 } 274 txnStageCoordinator.initiateLookup(); 275 } 276 277 private void processPendingTransactions() { 278 List copy = pendingTxnList.copy(); 279 for (Iterator i = copy.iterator(); i.hasNext();) { 280 ServerTransaction txn = (ServerTransaction) i.next(); 281 lookupObjectsForApplyAndAddToSink(txn, false); 282 } 283 } 284 285 public boolean applyTransactionComplete(ServerTransactionID stxnID) { 287 try { 288 processedApplys.put(stxnID); 289 } catch (InterruptedException e) { 290 throw new AssertionError (e); 291 } 292 txnStageCoordinator.initiateApplyComplete(); 293 return true; 294 } 295 296 public void processApplyComplete() { 298 try { 299 ServerTransactionID txnID; 300 ArrayList txnIDs = new ArrayList (); 301 while ((txnID = (ServerTransactionID) processedApplys.poll(0)) != null) { 302 txnIDs.add(txnID); 303 } 304 if (txnIDs.size() > 0) { 305 processApplyTxnComplete(txnIDs); 306 } 307 } catch (InterruptedException e) { 308 throw new AssertionError (e); 309 } 310 } 311 312 private synchronized void processApplyTxnComplete(ArrayList txnIDs) { 313 for (Iterator i = txnIDs.iterator(); i.hasNext();) { 314 ServerTransactionID stxnID = (ServerTransactionID) i.next(); 315 processApplyTxnComplete(stxnID); 316 } 317 } 318 319 private void processApplyTxnComplete(ServerTransactionID stxnID) { 320 TxnObjectGrouping grouping = (TxnObjectGrouping) applyPendingTxns.remove(stxnID); 321 Assert.assertNotNull(grouping); 322 if (grouping.applyComplete(stxnID)) { 323 ServerTransactionID pTxnID = grouping.getServerTransactionID(); 326 Assert.assertNull(applyPendingTxns.get(pTxnID)); 327 Object old = commitPendingTxns.put(pTxnID, grouping); 328 Assert.assertNull(old); 329 txnStageCoordinator.initiateCommit(); 330 } 331 } 332 333 public synchronized void commitTransactionsComplete(CommitTransactionContext ctc) { 335 336 if (commitPendingTxns.isEmpty()) return; 337 338 Map newRoots = new HashMap (); 339 Map objects = new HashMap (); 340 Collection txnIDs = new ArrayList (); 341 for (Iterator i = commitPendingTxns.values().iterator(); i.hasNext();) { 342 TxnObjectGrouping tog = (TxnObjectGrouping) i.next(); 343 newRoots.putAll(tog.getNewRoots()); 344 txnIDs.addAll(tog.getTxnIDs()); 345 objects.putAll(tog.getObjects()); 346 i.remove(); 347 if (objects.size() > MAX_COMMIT_SIZE) { 348 break; 349 } 350 } 351 352 ctc.initialize(txnIDs, objects.values(), newRoots, getCompletedTxnIds()); 353 354 for (Iterator j = objects.keySet().iterator(); j.hasNext();) { 355 Object old = checkedOutObjects.remove(j.next()); 356 Assert.assertNotNull(old); 357 } 358 359 if (!commitPendingTxns.isEmpty()) { 360 txnStageCoordinator.initiateCommit(); 362 } 363 } 364 365 public void recallAllCheckedoutObject() { 367 txnStageCoordinator.initiateRecallAll(); 368 } 369 370 public synchronized void recallCheckedoutObject(RecallObjectsContext roc) { 372 processPendingIfNecessary(); 373 if (roc.recallAll()) { 374 IdentityHashMap recalled = new IdentityHashMap (); 375 HashMap recalledObjects = new HashMap (); 376 for (Iterator i = checkedOutObjects.entrySet().iterator(); i.hasNext();) { 377 Entry e = (Entry) i.next(); 378 TxnObjectGrouping tog = (TxnObjectGrouping) e.getValue(); 379 if (tog.getServerTransactionID().isNull()) { 380 i.remove(); 381 if (!recalled.containsKey(tog)) { 382 recalled.put(tog, null); 383 recalledObjects.putAll(tog.getObjects()); 384 } 385 } 386 } 387 if (!recalledObjects.isEmpty()) { 388 logger.info("Recalling " + recalledObjects.size() + " Objects to ObjectManager"); 389 objectManager.releaseAll(recalledObjects.values()); 390 } 391 } 392 } 393 394 public void dump() { 395 PrintWriter pw = new PrintWriter (System.err); 396 new PrettyPrinter(pw).visit(this); 397 pw.flush(); 398 } 399 400 public synchronized PrettyPrinter prettyPrint(PrettyPrinter out) { 401 out.println(getClass().getName()); 402 out.indent().print("checkedOutObjects: ").visit(checkedOutObjects).println(); 403 out.indent().print("applyPendingTxns: ").visit(applyPendingTxns).println(); 404 out.indent().print("commitPendingTxns: ").visit(commitPendingTxns).println(); 405 out.indent().print("pendingTxnList: ").visit(pendingTxnList).println(); 406 out.indent().print("pendingObjectRequest: ").visit(pendingObjectRequest).println(); 407 return out; 408 } 409 410 private class LookupContext implements ObjectManagerResultsContext { 411 412 private boolean pending = false; 413 private boolean resultsSet = false; 414 private Map lookedUpObjects; 415 private final Set oids; 416 private final Set newOids; 417 418 public LookupContext(Set oids, Set newOids) { 419 this.oids = oids; 420 this.newOids = newOids; 421 } 422 423 public synchronized void makePending() { 424 pending = true; 425 if (resultsSet) { 426 TransactionalObjectManagerImpl.this.addProcessedPending(this); 427 } 428 } 429 430 public synchronized void setResults(ObjectManagerLookupResults results) { 431 lookedUpObjects = results.getObjects(); 432 resultsSet = true; 433 if (pending) { 434 TransactionalObjectManagerImpl.this.addProcessedPending(this); 435 } 436 } 437 438 public synchronized Map getLookedUpObjects() { 439 return lookedUpObjects; 440 } 441 442 public String toString() { 443 return "LookupContext [ " + oids + "] = { pending = " + pending + ", lookedupObjects = " 444 + lookedUpObjects.keySet() + "}"; 445 } 446 447 public Set getLookupIDs() { 448 return oids; 449 } 450 451 public Set getNewObjectIDs() { 452 return newOids; 453 } 454 455 } 456 457 private static final class PendingList { 458 LinkedHashMap pending = new LinkedHashMap (); 459 460 public boolean add(ServerTransaction txn) { 461 ServerTransactionID sTxID = txn.getServerTransactionID(); 462 if (pending.containsKey(sTxID)) { 464 return false; 465 } else { 466 pending.put(sTxID, txn); 467 return true; 468 } 469 } 470 471 public List copy() { 472 return new ArrayList (pending.values()); 473 } 474 475 public boolean remove(ServerTransaction txn) { 476 return (pending.remove(txn.getServerTransactionID()) != null); 477 } 478 479 public String toString() { 480 return "PendingList : pending Txns = " + pending; 481 } 482 483 public int size() { 484 return pending.size(); 485 } 486 } 487 } 488 | Popular Tags |