1 4 package com.tc.objectserver.tx; 5 6 import com.tc.logging.TCLogger; 7 import com.tc.logging.TCLogging; 8 import com.tc.util.Assert; 9 10 import java.util.Arrays ; 11 import java.util.Collection ; 12 import java.util.HashSet ; 13 import java.util.Iterator ; 14 import java.util.LinkedList ; 15 import java.util.List ; 16 import java.util.Set ; 17 18 public class TransactionSequencer { 19 20 private static final TCLogger logger = TCLogging.getLogger(TransactionSequencer.class); 21 22 private final Set pendingTxns = new HashSet (); 23 24 private final LinkedList txnQ = new LinkedList (); 25 private final LinkedList blockedQ = new LinkedList (); 26 27 private final BlockedSet locks = new BlockedSet(); 28 private final BlockedSet objects = new BlockedSet(); 29 30 private int txnsCount; 31 private boolean reconcile = false; 32 33 public synchronized void addTransactions(List txns) { 34 if (false) log_incoming(txns); 35 txnQ.addAll(txns); 36 txnsCount += txns.size(); 37 } 38 39 private void log_incoming(List txns) { 40 for (Iterator i = txns.iterator(); i.hasNext();) { 41 ServerTransaction txn = (ServerTransaction) i.next(); 42 logger.info("Incoming : " + txn); 43 } 44 } 45 46 public synchronized ServerTransaction getNextTxnToProcess() { 47 reconcileIfNeeded(); 48 while (!txnQ.isEmpty()) { 49 50 ServerTransaction txn = (ServerTransaction) txnQ.removeFirst(); 51 if (isBlocked(txn)) { 52 addBlocked(txn); 53 } else { 54 if (false) log_outgoing(txn); 55 txnsCount--; 56 return txn; 57 } 58 } 59 if (false) log_no_txns_to_process(); 60 return null; 61 } 62 63 private void reconcileIfNeeded() { 64 if (reconcile) { 65 txnQ.addAll(0, blockedQ); 67 blockedQ.clear(); 68 locks.clearBlocked(); 69 objects.clearBlocked(); 70 reconcile = false; 71 } 72 } 73 74 private void addBlocked(ServerTransaction txn) { 75 locks.addBlocked(Arrays.asList(txn.getLockIDs())); 76 objects.addBlocked(txn.getObjectIDs()); 77 blockedQ.add(txn); 78 } 79 80 private void log_no_txns_to_process() { 81 if (txnsCount != 0) { 82 int psize = pendingTxns.size(); 83 logger.info("No More Txns that can be processed : txnCount = " + txnsCount + " and pending txns = " + psize); 84 } 85 } 86 87 private void log_outgoing(ServerTransaction txn) { 88 logger.info("Outgoing : " + txn); 89 } 90 91 private boolean isBlocked(ServerTransaction txn) { 92 return locks.isBlocked(Arrays.asList(txn.getLockIDs())) || objects.isBlocked(txn.getObjectIDs()); 93 } 94 95 public synchronized void makePending(ServerTransaction txn) { 96 locks.makePending(Arrays.asList(txn.getLockIDs())); 97 objects.makePending(txn.getObjectIDs()); 98 Assert.assertTrue(pendingTxns.add(txn.getServerTransactionID())); 99 if (false) logger.info("Make Pending : " + txn); 100 } 101 102 public synchronized void makeUnpending(ServerTransaction txn) { 103 Assert.assertTrue(pendingTxns.remove(txn.getServerTransactionID())); 104 locks.makeUnpending(Arrays.asList(txn.getLockIDs())); 105 objects.makeUnpending(txn.getObjectIDs()); 106 reconcile = true; 107 if (false) logger.info("Processed Pending : " + txn); 108 } 109 110 113 boolean isPending(List txns) { 114 for (Iterator i = txns.iterator(); i.hasNext();) { 115 ServerTransaction st = (ServerTransaction) i.next(); 116 if (pendingTxns.contains(st.getServerTransactionID())) return true; 117 } 118 return false; 119 } 120 121 private static final class BlockedSet { 122 123 Set cause = new HashSet (); 124 Set effect = new HashSet (); 125 126 public boolean isBlocked(Collection keys) { 127 for (Iterator i = keys.iterator(); i.hasNext();) { 128 Object o = i.next(); 129 if (cause.contains(o) || effect.contains(o)) { return true; } 130 } 131 return false; 132 } 133 134 public void makePending(Collection keys) { 135 cause.addAll(keys); 136 } 137 138 public void makeUnpending(Collection keys) { 139 cause.removeAll(keys); 140 } 141 142 public void addBlocked(Collection keys) { 143 effect.addAll(keys); 144 } 145 146 public void clearBlocked() { 147 effect.clear(); 148 } 149 } 150 } 151 | Popular Tags |