package com.tc.object.tx;

import com.tc.bytes.TCByteBuffer;
import com.tc.io.TCByteBufferOutputStream;
import com.tc.lang.Recyclable;
import com.tc.object.ObjectID;
import com.tc.object.change.TCChangeBuffer;
import com.tc.object.dmi.DmiDescriptor;
import com.tc.object.dna.impl.DNAEncoding;
import com.tc.object.dna.impl.ObjectStringSerializer;
import com.tc.object.lockmanager.api.LockID;
import com.tc.object.lockmanager.api.Notify;
import com.tc.object.msg.CommitTransactionMessage;
import com.tc.object.msg.CommitTransactionMessageFactory;
import com.tc.util.SequenceID;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Map.Entry;

/**
 * A {@link ClientTransactionBatch} that serializes each added transaction into byte buffers and can emit the whole
 * batch (header followed by every transaction, in insertion order) for sending in a {@link CommitTransactionMessage}.
 * <p>
 * Mutating and iterating methods are synchronized on this instance.
 */
public class TransactionBatchWriter implements ClientTransactionBatch {

  private final CommitTransactionMessageFactory commitTransactionMessageFactory;
  private final Set                             acknowledgedTransactionIDs = new HashSet();
  private final TxnBatchID                      batchID;
  // TransactionID -> TransactionDescriptor; a LinkedHashMap so iteration follows insertion order.
  private final LinkedHashMap                   transactionData            = new LinkedHashMap();
  private final ObjectStringSerializer          serializer;
  private final DNAEncoding                     encoding;
  // NOTE(review): nothing in this class ever assigns this field after initialization.
  private int                                   txns2Serialize             = 0;
  // Streams handed out by getData(); kept so recycle() can release them.
  private final List                            batchDataOutputStreams     = new ArrayList();
  // Incremented by getData(), decremented by recycle().
  private short                                 outstandingWriteCount      = 0;
  // Running total of the serialized size of every transaction passed to addTransaction().
  private int                                   bytesWritten               = 0;

  /**
   * @param batchID identifier written into the batch header
   * @param serializer passed to each change buffer when it is written, and to the commit message on send
   * @param encoding passed to each change buffer when it is written
   * @param commitTransactionMessageFactory creates the message used by {@link #send()}
   */
  public TransactionBatchWriter(TxnBatchID batchID, ObjectStringSerializer serializer, DNAEncoding encoding,
                                CommitTransactionMessageFactory commitTransactionMessageFactory) {
    this.batchID = batchID;
    this.encoding = encoding;
    this.commitTransactionMessageFactory = commitTransactionMessageFactory;
    this.serializer = serializer;
  }

  public String toString() {
    return super.toString() + "[" + this.batchID + ", isEmpty=" + isEmpty() + ", size=" + numberOfTxns()
           + ", txns2Serialize =" + txns2Serialize + "]";
  }

  public TxnBatchID getTransactionBatchID() {
    return this.batchID;
  }

  /** @return true when the batch currently holds no transactions */
  public synchronized boolean isEmpty() {
    return transactionData.isEmpty();
  }

  /** @return the number of transactions currently held in the batch */
  public synchronized int numberOfTxns() {
    return transactionData.size();
  }

  /**
   * @return the accumulated serialized size of all transactions ever added; removing a transaction does not reduce
   *         this value
   */
  public synchronized int byteSize() {
    return bytesWritten;
  }

  public boolean isNull() {
    return false;
  }

  /**
   * Removes a transaction from the batch.
   *
   * @param txID the ID of the transaction to remove
   * @throws AssertionError if no transaction with that ID is in the batch
   */
  public synchronized void removeTransaction(TransactionID txID) {
    TransactionDescriptor removed = (TransactionDescriptor) transactionData.remove(txID);
    if (removed == null) {
      // Report the requested ID; the looked-up descriptor is always null on this path.
      throw new AssertionError("Attempt to remove a transaction that doesn't exist : " + txID);
    }
    // Buffers are only recycled when no getData() result is outstanding -- presumably because such a result may
    // still reference them; confirm against TCByteBufferOutputStream.write(TCByteBuffer[]).
    if (outstandingWriteCount == 0) {
      removed.recycle();
    }
  }

  /**
   * Serializes the given transaction and appends it to the batch, keyed by its transaction ID.
   * <p>
   * Fields are written in this order: transaction ID, transaction type, sequence ID, lock IDs, new roots, notifies,
   * DMI descriptors, change buffers. Each variable-length section is preceded by its element count.
   *
   * @param txn the transaction to add
   * @throws AssertionError if the transaction's sequence ID is null
   */
  public synchronized void addTransaction(ClientTransaction txn) {
    SequenceID sequenceID = txn.getSequenceID();
    // Validate before allocating or writing anything.
    if (sequenceID.isNull()) {
      throw new AssertionError("SequenceID is null: " + txn);
    }

    TCByteBufferOutputStream out = newOutputStream();

    out.writeLong(txn.getTransactionID().toLong());
    out.writeByte(txn.getTransactionType().getType());
    out.writeLong(sequenceID.toLong());

    LockID[] locks = txn.getAllLockIDs();
    out.writeInt(locks.length);
    for (int i = 0, n = locks.length; i < n; i++) {
      out.writeString(locks[i].asString());
    }

    Map newRoots = txn.getNewRoots();
    out.writeInt(newRoots.size());
    for (Iterator i = newRoots.entrySet().iterator(); i.hasNext();) {
      Entry entry = (Entry) i.next();
      String name = (String) entry.getKey();
      ObjectID id = (ObjectID) entry.getValue();
      out.writeString(name);
      out.writeLong(id.toLong());
    }

    List notifies = txn.addNotifiesTo(new LinkedList());
    out.writeInt(notifies.size());
    for (Iterator i = notifies.iterator(); i.hasNext();) {
      Notify n = (Notify) i.next();
      n.serializeTo(out);
    }

    List dmis = txn.getDmiDescriptors();
    out.writeInt(dmis.size());
    for (Iterator i = dmis.iterator(); i.hasNext();) {
      DmiDescriptor dd = (DmiDescriptor) i.next();
      dd.serializeTo(out);
    }

    Map changes = txn.getChangeBuffers();
    out.writeInt(changes.size());
    for (Iterator i = changes.values().iterator(); i.hasNext();) {
      TCChangeBuffer buffer = (TCChangeBuffer) i.next();
      buffer.writeTo(out, serializer, encoding);
    }

    bytesWritten += out.getBytesWritten();
    transactionData.put(txn.getTransactionID(),
                        new TransactionDescriptor(sequenceID, out.toArray(), txn.getReferencesOfObjectsInTxn()));
  }

  /**
   * Builds the serialized form of the batch: the header (batch ID, transaction count) followed by the data of every
   * transaction in insertion order. The backing stream is retained until {@link #recycle()} is called.
   *
   * @return the buffers holding the serialized batch
   */
  public synchronized TCByteBuffer[] getData() {
    outstandingWriteCount++;
    TCByteBufferOutputStream out = newOutputStream();
    writeHeader(out);
    for (Iterator i = transactionData.values().iterator(); i.hasNext();) {
      TransactionDescriptor td = ((TransactionDescriptor) i.next());
      out.write(td.getData());
    }
    batchDataOutputStreams.add(out);
    return out.toArray();
  }

  /** Writes the batch ID followed by the current transaction count. */
  private void writeHeader(TCByteBufferOutputStream out) {
    out.writeLong(this.batchID.toLong());
    out.writeInt(numberOfTxns());
  }

  /** Creates the output stream used for both per-transaction and whole-batch serialization. */
  private TCByteBufferOutputStream newOutputStream() {
    return new TCByteBufferOutputStream(32, 4096, false);
  }

  /** Creates a commit message from the factory, attaches this batch to it, and sends it. */
  public synchronized void send() {
    CommitTransactionMessage msg = this.commitTransactionMessageFactory.newCommitTransactionMessage();
    msg.setBatch(this, serializer);
    msg.send();
  }

  /**
   * Adds the ID of every transaction in the batch to the given collection.
   *
   * @param c the collection to add to
   * @return the same collection
   */
  public synchronized Collection addTransactionIDsTo(Collection c) {
    c.addAll(transactionData.keySet());
    return c;
  }

  public synchronized void addAcknowledgedTransactionIDs(Collection acknowledged) {
    this.acknowledgedTransactionIDs.addAll(acknowledged);
  }

  /**
   * @return the internal set of acknowledged transaction IDs (the live set, not a copy)
   */
  public Collection getAcknowledgedTransactionIDs() {
    return this.acknowledgedTransactionIDs;
  }

  /**
   * @return the sequence ID of the first transaction (in insertion order) still in the batch, or
   *         {@link SequenceID#NULL_ID} when the batch is empty. NOTE(review): this is the minimum only if
   *         transactions are added in increasing sequence order -- confirm against callers.
   */
  public synchronized SequenceID getMinTransactionSequence() {
    if (transactionData.isEmpty()) {
      return SequenceID.NULL_ID;
    }
    TransactionDescriptor first = (TransactionDescriptor) transactionData.values().iterator().next();
    return first.getSequenceID();
  }

  /**
   * Adds the sequence ID of every transaction in the batch, in insertion order, to the given collection.
   * <p>
   * Synchronized like the other accessors of the transaction map so the iteration cannot race with concurrent
   * additions or removals.
   *
   * @param sequenceIDs the collection to add to
   * @return the same collection
   */
  public synchronized Collection addTransactionSequenceIDsTo(Collection sequenceIDs) {
    for (Iterator i = transactionData.values().iterator(); i.hasNext();) {
      TransactionDescriptor td = ((TransactionDescriptor) i.next());
      sequenceIDs.add(td.getSequenceID());
    }
    return sequenceIDs;
  }

  /**
   * Recycles every stream previously produced by {@link #getData()}, forgets them, and decrements the outstanding
   * write count.
   */
  public synchronized void recycle() {
    for (Iterator iter = batchDataOutputStreams.iterator(); iter.hasNext();) {
      TCByteBufferOutputStream buffer = (TCByteBufferOutputStream) iter.next();
      buffer.recycle();
    }
    batchDataOutputStreams.clear();
    outstandingWriteCount--;
  }

  /** @return a multi-line description listing each transaction ID with its descriptor summary */
  public synchronized String dump() {
    StringBuffer sb = new StringBuffer("TransactionBatchWriter = { \n");
    for (Iterator i = transactionData.entrySet().iterator(); i.hasNext();) {
      Map.Entry entry = (Entry) i.next();
      sb.append(entry.getKey()).append(" = ");
      sb.append(((TransactionDescriptor) entry.getValue()).dump());
      sb.append("\n");
    }
    return sb.append(" } ").toString();
  }

  /**
   * Blocks until {@code txns2Serialize} reaches zero, re-checking at least every two seconds.
   * <p>
   * NOTE(review): as written nothing in this class changes {@code txns2Serialize} from zero, so this returns
   * immediately.
   *
   * @throws AssertionError if the waiting thread is interrupted; the interrupt status is restored first
   */
  public synchronized void wait4AllTxns2Serialize() {
    while (txns2Serialize != 0) {
      try {
        wait(2000);
      } catch (InterruptedException e) {
        // Preserve the interrupt for code further up the stack before failing.
        Thread.currentThread().interrupt();
        throw new AssertionError(e);
      }
    }
  }

  /** Holds the serialized form of one transaction together with its sequence ID and referenced objects. */
  private static final class TransactionDescriptor implements Recyclable {

    final SequenceID         sequenceID;
    final TCByteBuffer[]     data;
    private final Collection references;

    TransactionDescriptor(SequenceID sequenceID, TCByteBuffer[] data, Collection references) {
      this.sequenceID = sequenceID;
      this.data = data;
      this.references = references;
    }

    public String dump() {
      return " { " + sequenceID + " , Objects in Txn = " + references.size() + " }";
    }

    SequenceID getSequenceID() {
      return this.sequenceID;
    }

    TCByteBuffer[] getData() {
      return data;
    }

    /** Recycles every buffer holding this transaction's serialized data. */
    public void recycle() {
      for (int i = 0; i < data.length; i++) {
        data[i].recycle();
      }
    }
  }

}