1 5 package com.tc.object.tx; 6 7 import EDU.oswego.cs.dl.util.concurrent.BoundedLinkedQueue; 8 9 import com.tc.exception.TCRuntimeException; 10 import com.tc.logging.TCLogger; 11 import com.tc.logging.TCLogging; 12 import com.tc.properties.TCPropertiesImpl; 13 import com.tc.util.SequenceGenerator; 14 import com.tc.util.SequenceID; 15 import com.tc.util.Util; 16 17 public class TransactionSequencer { 18 19 private static final TCLogger logger = TCLogging.getLogger(TransactionSequencer.class); 20 21 private static final boolean LOGGING_ENABLED; 22 private static final int MAX_BYTE_SIZE_FOR_BATCH; 23 private static final int MAX_PENDING_BATCHES; 24 private static final long MAX_SLEEP_TIME_BEFORE_HALT; 25 26 static { 27 LOGGING_ENABLED = TCPropertiesImpl.getProperties().getBoolean("l1.transactionmanager.logging.enabled"); 29 MAX_BYTE_SIZE_FOR_BATCH = TCPropertiesImpl.getProperties().getInt("l1.transactionmanager.maxBatchSizeInKiloBytes") * 1024; 30 MAX_PENDING_BATCHES = TCPropertiesImpl.getProperties().getInt("l1.transactionmanager.maxPendingBatches"); 31 MAX_SLEEP_TIME_BEFORE_HALT = TCPropertiesImpl.getProperties() 32 .getLong("l1.transactionmanager.maxSleepTimeBeforeHalt"); 33 } 34 35 private final SequenceGenerator sequence = new SequenceGenerator(1); 36 private final TransactionBatchFactory batchFactory; 37 private final BoundedLinkedQueue pendingBatches = new BoundedLinkedQueue(MAX_PENDING_BATCHES); 38 39 private ClientTransactionBatch currentBatch; 40 private int pending_size = 0; 41 42 private int slowDownStartsAt; 43 private double sleepTimeIncrements; 44 private int txnsPerBatch = 0; 45 private boolean shutdown = false; 46 47 public TransactionSequencer(TransactionBatchFactory batchFactory) { 48 this.batchFactory = batchFactory; 49 currentBatch = createNewBatch(); 50 this.slowDownStartsAt = (int) (MAX_PENDING_BATCHES * 0.66); 51 this.sleepTimeIncrements = MAX_SLEEP_TIME_BEFORE_HALT / (MAX_PENDING_BATCHES - slowDownStartsAt); 52 if (LOGGING_ENABLED) log_settings(); 53 } 54 55 private void log_settings() { 56 logger.info("Max Byte Size for Batches = " + MAX_BYTE_SIZE_FOR_BATCH + " Max Pending Batches = " 57 + MAX_PENDING_BATCHES); 58 logger.info("Max Sleep time = " + MAX_SLEEP_TIME_BEFORE_HALT + " Slow down starts at = " + slowDownStartsAt 59 + " sleep time increments = " + sleepTimeIncrements); 60 } 61 62 private ClientTransactionBatch createNewBatch() { 63 return batchFactory.nextBatch(); 64 } 65 66 private void addTransactionToBatch(ClientTransaction txn, ClientTransactionBatch batch) { 67 batch.addTransaction(txn); 68 } 69 70 public synchronized void addTransaction(ClientTransaction txn) { 71 if (shutdown) { 72 logger.error("Sequencer shutdown. Not committing " + txn); 73 return; 74 } 75 76 try { 77 addTxnInternal(txn); 78 } catch (Throwable t) { 79 shutdown = true; 81 if (t instanceof Error ) { throw (Error ) t; } 82 if (t instanceof RuntimeException ) { throw (RuntimeException ) t; } 83 throw new RuntimeException (t); 84 } 85 } 86 87 public synchronized void shutdown() { 88 shutdown = true; 89 } 90 91 94 private void addTxnInternal(ClientTransaction txn) { 95 SequenceID sequenceID = new SequenceID(sequence.getNextSequence()); 96 txn.setSequenceID(sequenceID); 97 txnsPerBatch++; 98 99 addTransactionToBatch(txn, currentBatch); 100 if (currentBatch.byteSize() > MAX_BYTE_SIZE_FOR_BATCH) { 101 put(currentBatch); 102 reconcilePendingSize(); 103 if (LOGGING_ENABLED) log_stats(); 104 currentBatch = createNewBatch(); 105 txnsPerBatch = 0; 106 } 107 throttle(); 108 } 109 110 private void throttle() { 111 int diff = pending_size - slowDownStartsAt; 112 if (diff >= 0) { 113 long sleepTime = (long) (1 + diff * sleepTimeIncrements); 114 try { 115 wait(sleepTime); 116 } catch (InterruptedException e) { 117 throw new TCRuntimeException(e); 118 } 119 } 120 } 121 122 private void reconcilePendingSize() { 123 pending_size = pendingBatches.size(); 124 } 125 126 private void put(ClientTransactionBatch batch) { 127 try { 128 pendingBatches.put(batch); 129 } catch (InterruptedException e) { 130 throw new TCRuntimeException(e); 131 } 132 } 133 134 private void log_stats() { 135 int size = pending_size; 136 if (size == MAX_PENDING_BATCHES) { 137 logger.info("Max pending size reached !!! : Pending Batches size = " + size + " TxnsInBatch = " + txnsPerBatch); 138 } else if (size % 5 == 0) { 139 logger.info("Pending Batch Size : " + size + " TxnsInBatch = " + txnsPerBatch); 140 } 141 } 142 143 private ClientTransactionBatch get() { 144 boolean isInterrupted = false; 145 ClientTransactionBatch returnValue = null; 146 while (true) { 147 try { 148 returnValue = (ClientTransactionBatch) pendingBatches.poll(0); 149 break; 150 } catch (InterruptedException e) { 151 isInterrupted = true; 152 if (returnValue != null) { 153 break; 154 } 155 } 156 } 157 Util.selfInterruptIfNeeded(isInterrupted); 158 return returnValue; 159 } 160 161 private ClientTransactionBatch peek() { 162 return (ClientTransactionBatch) pendingBatches.peek(); 163 } 164 165 public ClientTransactionBatch getNextBatch() { 166 ClientTransactionBatch batch = get(); 167 if (batch != null) return batch; 168 synchronized (this) { 169 batch = get(); 171 reconcilePendingSize(); 172 notifyAll(); 173 if (batch != null) return batch; 174 if (!currentBatch.isEmpty()) { 175 batch = currentBatch; 176 currentBatch = createNewBatch(); 177 return batch; 178 } 179 return null; 180 } 181 } 182 183 186 public synchronized void clear() { 187 while (get() != null) { 188 } 190 currentBatch = createNewBatch(); 191 } 192 193 public SequenceID getNextSequenceID() { 194 ClientTransactionBatch batch = peek(); 195 if (batch != null) return batch.getMinTransactionSequence(); 196 synchronized (this) { 197 batch = peek(); 198 if (batch != null) return batch.getMinTransactionSequence(); 199 if (!currentBatch.isEmpty()) return currentBatch.getMinTransactionSequence(); 200 SequenceID currentSequenceID = new SequenceID(sequence.getCurrentSequence()); 201 return currentSequenceID.next(); 202 } 203 } 204 205 } 206 | Popular Tags |