KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > tc > object > tx > TransactionSequencer


1 /*
2  * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright
3  * notice. All rights reserved.
4  */

5 package com.tc.object.tx;
6
7 import EDU.oswego.cs.dl.util.concurrent.BoundedLinkedQueue;
8
9 import com.tc.exception.TCRuntimeException;
10 import com.tc.logging.TCLogger;
11 import com.tc.logging.TCLogging;
12 import com.tc.properties.TCPropertiesImpl;
13 import com.tc.util.SequenceGenerator;
14 import com.tc.util.SequenceID;
15 import com.tc.util.Util;
16
17 public class TransactionSequencer {
18
19   private static final TCLogger logger = TCLogging.getLogger(TransactionSequencer.class);
20
21   private static final boolean LOGGING_ENABLED;
22   private static final int MAX_BYTE_SIZE_FOR_BATCH;
23   private static final int MAX_PENDING_BATCHES;
24   private static final long MAX_SLEEP_TIME_BEFORE_HALT;
25
26   static {
27     // Set the values from the properties here.
28
LOGGING_ENABLED = TCPropertiesImpl.getProperties().getBoolean("l1.transactionmanager.logging.enabled");
29     MAX_BYTE_SIZE_FOR_BATCH = TCPropertiesImpl.getProperties().getInt("l1.transactionmanager.maxBatchSizeInKiloBytes") * 1024;
30     MAX_PENDING_BATCHES = TCPropertiesImpl.getProperties().getInt("l1.transactionmanager.maxPendingBatches");
31     MAX_SLEEP_TIME_BEFORE_HALT = TCPropertiesImpl.getProperties()
32         .getLong("l1.transactionmanager.maxSleepTimeBeforeHalt");
33   }
34
35   private final SequenceGenerator sequence = new SequenceGenerator(1);
36   private final TransactionBatchFactory batchFactory;
37   private final BoundedLinkedQueue pendingBatches = new BoundedLinkedQueue(MAX_PENDING_BATCHES);
38
39   private ClientTransactionBatch currentBatch;
40   private int pending_size = 0;
41
42   private int slowDownStartsAt;
43   private double sleepTimeIncrements;
44   private int txnsPerBatch = 0;
45   private boolean shutdown = false;
46
47   public TransactionSequencer(TransactionBatchFactory batchFactory) {
48     this.batchFactory = batchFactory;
49     currentBatch = createNewBatch();
50     this.slowDownStartsAt = (int) (MAX_PENDING_BATCHES * 0.66);
51     this.sleepTimeIncrements = MAX_SLEEP_TIME_BEFORE_HALT / (MAX_PENDING_BATCHES - slowDownStartsAt);
52     if (LOGGING_ENABLED) log_settings();
53   }
54
55   private void log_settings() {
56     logger.info("Max Byte Size for Batches = " + MAX_BYTE_SIZE_FOR_BATCH + " Max Pending Batches = "
57                 + MAX_PENDING_BATCHES);
58     logger.info("Max Sleep time = " + MAX_SLEEP_TIME_BEFORE_HALT + " Slow down starts at = " + slowDownStartsAt
59                 + " sleep time increments = " + sleepTimeIncrements);
60   }
61
62   private ClientTransactionBatch createNewBatch() {
63     return batchFactory.nextBatch();
64   }
65
66   private void addTransactionToBatch(ClientTransaction txn, ClientTransactionBatch batch) {
67     batch.addTransaction(txn);
68   }
69
70   public synchronized void addTransaction(ClientTransaction txn) {
71     if (shutdown) {
72       logger.error("Sequencer shutdown. Not committing " + txn);
73       return;
74     }
75
76     try {
77       addTxnInternal(txn);
78     } catch (Throwable JavaDoc t) {
79       // logging of exceptions is done at a higher level
80
shutdown = true;
81       if (t instanceof Error JavaDoc) { throw (Error JavaDoc) t; }
82       if (t instanceof RuntimeException JavaDoc) { throw (RuntimeException JavaDoc) t; }
83       throw new RuntimeException JavaDoc(t);
84     }
85   }
86
87   public synchronized void shutdown() {
88     shutdown = true;
89   }
90
91   /**
92    * XXX::Note : There is automatic throttling built in by adding to a BoundedLinkedQueue from within a synch block
93    */

94   private void addTxnInternal(ClientTransaction txn) {
95     SequenceID sequenceID = new SequenceID(sequence.getNextSequence());
96     txn.setSequenceID(sequenceID);
97     txnsPerBatch++;
98
99     addTransactionToBatch(txn, currentBatch);
100     if (currentBatch.byteSize() > MAX_BYTE_SIZE_FOR_BATCH) {
101       put(currentBatch);
102       reconcilePendingSize();
103       if (LOGGING_ENABLED) log_stats();
104       currentBatch = createNewBatch();
105       txnsPerBatch = 0;
106     }
107     throttle();
108   }
109
110   private void throttle() {
111     int diff = pending_size - slowDownStartsAt;
112     if (diff >= 0) {
113       long sleepTime = (long) (1 + diff * sleepTimeIncrements);
114       try {
115         wait(sleepTime);
116       } catch (InterruptedException JavaDoc e) {
117         throw new TCRuntimeException(e);
118       }
119     }
120   }
121
122   private void reconcilePendingSize() {
123     pending_size = pendingBatches.size();
124   }
125
126   private void put(ClientTransactionBatch batch) {
127     try {
128       pendingBatches.put(batch);
129     } catch (InterruptedException JavaDoc e) {
130       throw new TCRuntimeException(e);
131     }
132   }
133
134   private void log_stats() {
135     int size = pending_size;
136     if (size == MAX_PENDING_BATCHES) {
137       logger.info("Max pending size reached !!! : Pending Batches size = " + size + " TxnsInBatch = " + txnsPerBatch);
138     } else if (size % 5 == 0) {
139       logger.info("Pending Batch Size : " + size + " TxnsInBatch = " + txnsPerBatch);
140     }
141   }
142
143   private ClientTransactionBatch get() {
144     boolean isInterrupted = false;
145     ClientTransactionBatch returnValue = null;
146     while (true) {
147       try {
148         returnValue = (ClientTransactionBatch) pendingBatches.poll(0);
149         break;
150       } catch (InterruptedException JavaDoc e) {
151         isInterrupted = true;
152         if (returnValue != null) {
153           break;
154         }
155       }
156     }
157     Util.selfInterruptIfNeeded(isInterrupted);
158     return returnValue;
159   }
160
161   private ClientTransactionBatch peek() {
162     return (ClientTransactionBatch) pendingBatches.peek();
163   }
164
165   public ClientTransactionBatch getNextBatch() {
166     ClientTransactionBatch batch = get();
167     if (batch != null) return batch;
168     synchronized (this) {
169       // Check again to avoid sending the txn in the wrong order
170
batch = get();
171       reconcilePendingSize();
172       notifyAll();
173       if (batch != null) return batch;
174       if (!currentBatch.isEmpty()) {
175         batch = currentBatch;
176         currentBatch = createNewBatch();
177         return batch;
178       }
179       return null;
180     }
181   }
182
183   /**
184    * Used only for testing
185    */

186   public synchronized void clear() {
187     while (get() != null) {
188       // remove all pending
189
}
190     currentBatch = createNewBatch();
191   }
192
193   public SequenceID getNextSequenceID() {
194     ClientTransactionBatch batch = peek();
195     if (batch != null) return batch.getMinTransactionSequence();
196     synchronized (this) {
197       batch = peek();
198       if (batch != null) return batch.getMinTransactionSequence();
199       if (!currentBatch.isEmpty()) return currentBatch.getMinTransactionSequence();
200       SequenceID currentSequenceID = new SequenceID(sequence.getCurrentSequence());
201       return currentSequenceID.next();
202     }
203   }
204
205 }
206
Popular Tags