1 5 package com.tc.objectserver.tx; 6 7 import com.tc.logging.TCLogger; 8 import com.tc.logging.TCLogging; 9 import com.tc.net.protocol.tcm.ChannelID; 10 import com.tc.object.tx.TransactionID; 11 import com.tc.object.tx.TxnBatchID; 12 import com.tc.util.Assert; 13 14 import java.util.HashMap ; 15 import java.util.Map ; 16 17 public class TransactionBatchManagerImpl implements TransactionBatchManager { 18 19 private static final TCLogger logger = TCLogging.getLogger(TransactionBatchManagerImpl.class); 20 21 private final Map map = new HashMap (); 22 23 public synchronized void defineBatch(ChannelID channelID, TxnBatchID batchID, int numTxns) { 24 BatchStats batchStats = getOrCreateStats(channelID); 25 batchStats.defineBatch(batchID, numTxns); 26 } 27 28 private BatchStats getOrCreateStats(ChannelID channelID) { 29 BatchStats bs = (BatchStats) map.get(channelID); 30 if (bs == null) { 31 bs = new BatchStats(channelID); 32 map.put(channelID, bs); 33 } 34 return bs; 35 } 36 37 public synchronized boolean batchComponentComplete(ChannelID channelID, TxnBatchID batchID, TransactionID txnID) { 38 BatchStats bs = (BatchStats) map.get(channelID); 39 Assert.assertNotNull(bs); 40 return bs.batchComplete(batchID, txnID); 41 } 42 43 public synchronized void shutdownClient(ChannelID channelID) { 44 BatchStats bs = (BatchStats) map.get(channelID); 45 if (bs != null) { 46 bs.shutdownClient(); 47 } 48 } 49 50 private void cleanUp(ChannelID channelID) { 51 map.remove(channelID); 52 } 53 54 public class BatchStats { 55 private final ChannelID channelID; 56 57 private int batchCount; 58 private int txnCount; 59 private float avg; 60 61 private boolean killed = false; 62 63 public BatchStats(ChannelID channelID) { 64 this.channelID = channelID; 65 } 66 67 public void defineBatch(TxnBatchID batchID, int numTxns) { 68 long adjustedTotal = (long) (batchCount * avg) + numTxns; 69 txnCount += numTxns; 70 batchCount++; 71 avg = adjustedTotal / batchCount; 72 if (false) log_stats(); 73 } 74 75 private void log_stats() { 76 logger.info(channelID + " : Batch Stats : batch count = " + batchCount + " txnCount = " + txnCount + " avg = " 77 + avg); 78 } 79 80 private void log_stats(float thresh) { 81 logger.info(channelID + " : Batch Stats : batch count = " + batchCount + " txnCount = " + txnCount + " avg = " 82 + avg + " threshold = " + thresh); 83 } 84 85 public boolean batchComplete(TxnBatchID batchID, TransactionID txnID) { 86 txnCount--; 87 if (killed) { 88 if (txnCount == 0) { 90 cleanUp(channelID); 91 return true; 92 } else { 93 return false; 94 } 95 } 96 float threshold = (avg * (batchCount - 1)); 97 98 if (false) log_stats(threshold); 99 100 if (txnCount <= threshold) { 101 batchCount--; 102 return true; 103 } else { 104 return false; 105 } 106 } 107 108 public void shutdownClient() { 109 this.killed = true; 110 } 111 } 112 } 113 | Popular Tags |