KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > tc > object > tx > TransactionBatchWriter


1 /*
2  * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright notice. All rights reserved.
3  */

4 package com.tc.object.tx;
5
6 import com.tc.bytes.TCByteBuffer;
7 import com.tc.io.TCByteBufferOutputStream;
8 import com.tc.lang.Recyclable;
9 import com.tc.object.ObjectID;
10 import com.tc.object.change.TCChangeBuffer;
11 import com.tc.object.dmi.DmiDescriptor;
12 import com.tc.object.dna.impl.DNAEncoding;
13 import com.tc.object.dna.impl.ObjectStringSerializer;
14 import com.tc.object.lockmanager.api.LockID;
15 import com.tc.object.lockmanager.api.Notify;
16 import com.tc.object.msg.CommitTransactionMessage;
17 import com.tc.object.msg.CommitTransactionMessageFactory;
18 import com.tc.util.SequenceID;
19
20 import java.util.ArrayList JavaDoc;
21 import java.util.Collection JavaDoc;
22 import java.util.HashSet JavaDoc;
23 import java.util.Iterator JavaDoc;
24 import java.util.LinkedHashMap JavaDoc;
25 import java.util.LinkedList JavaDoc;
26 import java.util.List JavaDoc;
27 import java.util.Map JavaDoc;
28 import java.util.Set JavaDoc;
29 import java.util.Map.Entry;
30
31 public class TransactionBatchWriter implements ClientTransactionBatch {
32
33   private final CommitTransactionMessageFactory commitTransactionMessageFactory;
34   private final Set JavaDoc acknowledgedTransactionIDs = new HashSet JavaDoc();
35   private final TxnBatchID batchID;
36   private final LinkedHashMap JavaDoc transactionData = new LinkedHashMap JavaDoc();
37   private final ObjectStringSerializer serializer;
38   private final DNAEncoding encoding;
39   private int txns2Serialize = 0;
40   private final List JavaDoc batchDataOutputStreams = new ArrayList JavaDoc();
41   private short outstandingWriteCount = 0;
42   private int bytesWritten = 0;
43
44   public TransactionBatchWriter(TxnBatchID batchID, ObjectStringSerializer serializer, DNAEncoding encoding,
45                                 CommitTransactionMessageFactory commitTransactionMessageFactory) {
46     this.batchID = batchID;
47     this.encoding = encoding;
48     this.commitTransactionMessageFactory = commitTransactionMessageFactory;
49     this.serializer = serializer;
50   }
51
52   public String JavaDoc toString() {
53     return super.toString() + "[" + this.batchID + ", isEmpty=" + isEmpty() + ", size=" + numberOfTxns()
54            + ", txns2Serialize =" + txns2Serialize + "]";
55   }
56
57   public TxnBatchID getTransactionBatchID() {
58     return this.batchID;
59   }
60
61   public synchronized boolean isEmpty() {
62     return transactionData.isEmpty();
63   }
64
65   public synchronized int numberOfTxns() {
66     return transactionData.size();
67   }
68
69   public synchronized int byteSize() {
70     return bytesWritten;
71   }
72
73   public boolean isNull() {
74     return false;
75   }
76
77   public synchronized void removeTransaction(TransactionID txID) {
78     TransactionDescriptor removed = (TransactionDescriptor) transactionData.remove(txID);
79     if (removed == null) throw new AssertionError JavaDoc("Attempt to remove a transaction that doesn't exist : " + removed);
80     // if we get some acks from the previous instance of the server after we resend this
81
// transaction, but before we write to the network, then we dont recycle. We lose those
82
// buffers. But since it is a rare scenario we dont lose much, but this check avoid writting
83
// corrupt buffers.
84
if (outstandingWriteCount == 0) removed.recycle();
85   }
86
87   public synchronized void addTransaction(ClientTransaction txn) {
88     SequenceID sequenceID = txn.getSequenceID();
89     TCByteBufferOutputStream out = newOutputStream();
90
91     // /////////////////////////////////////////////////////////////////////////////////////////
92
// If you're modifying this format, you'll need to update
93
// TransactionBatchReader as well //
94
// /////////////////////////////////////////////////////////////////////////////////////////
95

96     out.writeLong(txn.getTransactionID().toLong());
97     out.writeByte(txn.getTransactionType().getType());
98     SequenceID sid = txn.getSequenceID();
99     if (sid.isNull()) throw new AssertionError JavaDoc("SequenceID is null: " + txn);
100     out.writeLong(sid.toLong());
101
102     LockID[] locks = txn.getAllLockIDs();
103     out.writeInt(locks.length);
104     for (int i = 0, n = locks.length; i < n; i++) {
105       out.writeString(locks[i].asString());
106     }
107
108     Map JavaDoc newRoots = txn.getNewRoots();
109     out.writeInt(newRoots.size());
110     for (Iterator JavaDoc i = newRoots.entrySet().iterator(); i.hasNext();) {
111       Entry entry = (Entry) i.next();
112       String JavaDoc name = (String JavaDoc) entry.getKey();
113       ObjectID id = (ObjectID) entry.getValue();
114       out.writeString(name);
115       out.writeLong(id.toLong());
116     }
117
118     List JavaDoc notifies = txn.addNotifiesTo(new LinkedList JavaDoc());
119     out.writeInt(notifies.size());
120     for (Iterator JavaDoc i = notifies.iterator(); i.hasNext();) {
121       Notify n = (Notify) i.next();
122       n.serializeTo(out);
123     }
124
125     List JavaDoc dmis = txn.getDmiDescriptors();
126     out.writeInt(dmis.size());
127     for (Iterator JavaDoc i = dmis.iterator(); i.hasNext();) {
128       DmiDescriptor dd = (DmiDescriptor) i.next();
129       dd.serializeTo(out);
130     }
131
132     Map JavaDoc changes = txn.getChangeBuffers();
133     out.writeInt(changes.size());
134     for (Iterator JavaDoc i = changes.values().iterator(); i.hasNext();) {
135       TCChangeBuffer buffer = (TCChangeBuffer) i.next();
136       buffer.writeTo(out, serializer, encoding);
137     }
138     
139     bytesWritten += out.getBytesWritten();
140     transactionData.put(txn.getTransactionID(), new TransactionDescriptor(sequenceID, out.toArray(), txn
141         .getReferencesOfObjectsInTxn()));
142   }
143
144   // Called from CommitTransactionMessageImpl
145
public synchronized TCByteBuffer[] getData() {
146     outstandingWriteCount++;
147     TCByteBufferOutputStream out = newOutputStream();
148     writeHeader(out);
149     for (Iterator JavaDoc i = transactionData.values().iterator(); i.hasNext();) {
150       TransactionDescriptor td = ((TransactionDescriptor) i.next());
151       TCByteBuffer[] data = td.getData();
152       out.write(data);
153     }
154     batchDataOutputStreams.add(out);
155
156     // System.err.println("Batch size: " + out.getBytesWritten() + ", # TXNs = " + numberOfTxns());
157

158     return out.toArray();
159   }
160
161   private void writeHeader(TCByteBufferOutputStream out) {
162     out.writeLong(this.batchID.toLong());
163     out.writeInt(numberOfTxns());
164   }
165
166   private TCByteBufferOutputStream newOutputStream() {
167     TCByteBufferOutputStream out = new TCByteBufferOutputStream(32, 4096, false);
168     return out;
169   }
170
171   public synchronized void send() {
172     CommitTransactionMessage msg = this.commitTransactionMessageFactory.newCommitTransactionMessage();
173     msg.setBatch(this, serializer);
174     msg.send();
175   }
176
177   public synchronized Collection JavaDoc addTransactionIDsTo(Collection JavaDoc c) {
178     c.addAll(transactionData.keySet());
179     return c;
180   }
181
182   public synchronized void addAcknowledgedTransactionIDs(Collection JavaDoc acknowledged) {
183     this.acknowledgedTransactionIDs.addAll(acknowledged);
184   }
185
186   public Collection JavaDoc getAcknowledgedTransactionIDs() {
187     return this.acknowledgedTransactionIDs;
188   }
189
190   public synchronized SequenceID getMinTransactionSequence() {
191     return transactionData.isEmpty() ? SequenceID.NULL_ID : ((TransactionDescriptor) transactionData.values()
192         .iterator().next()).getSequenceID();
193   }
194
195   public Collection JavaDoc addTransactionSequenceIDsTo(Collection JavaDoc sequenceIDs) {
196     for (Iterator JavaDoc i = transactionData.values().iterator(); i.hasNext();) {
197       TransactionDescriptor td = ((TransactionDescriptor) i.next());
198       sequenceIDs.add(td.getSequenceID());
199     }
200     return sequenceIDs;
201   }
202
203   // Called from CommitTransactionMessageImpl recycle on write.
204
public synchronized void recycle() {
205     for (Iterator JavaDoc iter = batchDataOutputStreams.iterator(); iter.hasNext();) {
206       TCByteBufferOutputStream buffer = (TCByteBufferOutputStream) iter.next();
207       buffer.recycle();
208     }
209     batchDataOutputStreams.clear();
210     outstandingWriteCount--;
211   }
212
213   public synchronized String JavaDoc dump() {
214     StringBuffer JavaDoc sb = new StringBuffer JavaDoc("TransactionBatchWriter = { \n");
215     for (Iterator JavaDoc i = transactionData.entrySet().iterator(); i.hasNext();) {
216       Map.Entry JavaDoc entry = (Entry) i.next();
217       sb.append(entry.getKey()).append(" = ");
218       sb.append(((TransactionDescriptor) entry.getValue()).dump());
219       sb.append("\n");
220     }
221     return sb.append(" } ").toString();
222   }
223
224   /**
225    * This is for testing only.
226    */

227   public synchronized void wait4AllTxns2Serialize() {
228     while (txns2Serialize != 0) {
229       try {
230         wait(2000);
231       } catch (InterruptedException JavaDoc e) {
232         throw new AssertionError JavaDoc(e);
233       }
234     }
235   }
236
237   private static final class TransactionDescriptor implements Recyclable {
238
239     final SequenceID sequenceID;
240     final TCByteBuffer[] data;
241     // Maintaining hard references so that it doesnt get GCed on us
242
private final Collection JavaDoc references;
243
244     TransactionDescriptor(SequenceID sequenceID, TCByteBuffer[] data, Collection JavaDoc references) {
245       this.sequenceID = sequenceID;
246       this.data = data;
247       this.references = references;
248     }
249
250     public String JavaDoc dump() {
251       return " { " + sequenceID + " , Objects in Txn = " + references.size() + " }";
252     }
253
254     SequenceID getSequenceID() {
255       return this.sequenceID;
256     }
257
258     TCByteBuffer[] getData() {
259       return data;
260     }
261
262     public void recycle() {
263       for (int i = 0; i < data.length; i++) {
264         data[i].recycle();
265       }
266     }
267   }
268
269 }
270
Popular Tags