1 5 package com.tc.objectserver.tx; 6 7 import com.tc.bytes.TCByteBuffer; 8 import com.tc.io.TCByteBufferInputStream; 9 import com.tc.net.protocol.tcm.ChannelID; 10 import com.tc.object.ObjectID; 11 import com.tc.object.dmi.DmiDescriptor; 12 import com.tc.object.dna.impl.DNAImpl; 13 import com.tc.object.dna.impl.ObjectStringSerializer; 14 import com.tc.object.lockmanager.api.LockID; 15 import com.tc.object.lockmanager.api.Notify; 16 import com.tc.object.tx.ServerTransactionID; 17 import com.tc.object.tx.TransactionID; 18 import com.tc.object.tx.TxnBatchID; 19 import com.tc.object.tx.TxnType; 20 import com.tc.util.Assert; 21 import com.tc.util.SequenceID; 22 23 import java.io.IOException ; 24 import java.util.ArrayList ; 25 import java.util.Collection ; 26 import java.util.HashMap ; 27 import java.util.Iterator ; 28 import java.util.LinkedList ; 29 import java.util.List ; 30 import java.util.Map ; 31 32 public class TransactionBatchReaderImpl implements TransactionBatchReader { 33 34 private final TCByteBufferInputStream in; 35 private final TxnBatchID batchID; 36 private final ChannelID source; 37 private int numTxns; 38 private final Collection acknowledgedTransactionIDs; 39 private ObjectStringSerializer serializer; 40 private final boolean passive; 41 42 public TransactionBatchReaderImpl(TCByteBuffer[] data, ChannelID source, Collection acknowledgedTransactionIDs, 43 ObjectStringSerializer serializer, boolean passive) throws IOException { 44 this.passive = passive; 45 this.in = new TCByteBufferInputStream(data); 46 this.source = source; 47 this.batchID = new TxnBatchID(in.readLong()); 48 this.numTxns = in.readInt(); 49 Assert.assertNotNull(acknowledgedTransactionIDs); 50 this.acknowledgedTransactionIDs = acknowledgedTransactionIDs; 51 this.serializer = serializer; 52 } 53 54 public ChannelID getChannelID() { 55 return this.source; 56 } 57 58 public ServerTransaction getNextTransaction() throws IOException { 59 if (numTxns == 0) { 60 int bytesRemaining = in.available(); 61 if (bytesRemaining != 0) { throw new IOException (bytesRemaining + " bytes remaining (expecting 0)"); } 62 return null; 63 } 64 65 TransactionID txnID = new TransactionID(in.readLong()); 67 TxnType txnType = TxnType.typeFor(in.readByte()); 68 69 SequenceID sequenceID = new SequenceID(in.readLong()); 70 71 final int numLocks = in.readInt(); 72 LockID[] locks = new LockID[numLocks]; 73 for (int i = 0; i < numLocks; i++) { 74 locks[i] = new LockID(in.readString()); 76 } 77 78 Map newRoots = new HashMap (); 79 final int numNewRoots = in.readInt(); 80 for (int i = 0; i < numNewRoots; i++) { 81 String name = in.readString(); 82 83 ObjectID id = new ObjectID(in.readLong()); 85 newRoots.put(name, id); 86 } 87 88 List notifies = new LinkedList (); 89 final int numNotifies = in.readInt(); 90 for (int i = 0; i < numNotifies; i++) { 91 Notify n = new Notify(); 92 n.deserializeFrom(in); 93 notifies.add(n); 94 } 95 96 final int dmiCount = in.readInt(); 97 final DmiDescriptor[] dmis = new DmiDescriptor[dmiCount]; 98 for (int i = 0; i < dmiCount; i++) { 99 DmiDescriptor dd = new DmiDescriptor(); 100 dd.deserializeFrom(in); 101 dmis[i] = dd; 102 } 103 104 List dnas = new ArrayList (); 105 final int numDNA = in.readInt(); 106 for (int i = 0; i < numDNA; i++) { 107 DNAImpl dna = new DNAImpl(serializer, true); 108 dna.deserializeFrom(in); 109 dnas.add(dna); 110 } 111 112 numTxns--; 113 return new ServerTransactionImpl(getBatchID(), txnID, sequenceID, locks, source, dnas, serializer, newRoots, 114 txnType, notifies, dmis, passive); 115 } 116 117 public TxnBatchID getBatchID() { 118 return this.batchID; 119 } 120 121 public int getNumTxns() { 122 return this.numTxns; 123 } 124 125 public Collection addAcknowledgedTransactionIDsTo(Collection c) { 126 for (Iterator iter = acknowledgedTransactionIDs.iterator(); iter.hasNext();) { 127 c.add(new ServerTransactionID(source, (TransactionID) iter.next())); 128 } 129 return c; 130 } 131 } 132 | Popular Tags |