1 18 package org.apache.activemq.store.kahadaptor; 19 20 import java.io.DataInput ; 21 import java.io.DataOutput ; 22 import java.io.IOException ; 23 import java.util.ArrayList ; 24 import java.util.List ; 25 26 import org.apache.activemq.command.BaseCommand; 27 import org.apache.activemq.kaha.Marshaller; 28 import org.apache.activemq.util.ByteSequence; 29 import org.apache.activemq.wireformat.WireFormat; 30 31 35 public class TransactionMarshaller implements Marshaller{ 36 37 private WireFormat wireFormat; 38 public TransactionMarshaller(WireFormat wireFormat){ 39 this.wireFormat = wireFormat; 40 41 } 42 43 public void writePayload(Object object,DataOutput dataOut) throws IOException { 44 KahaTransaction kt = (KahaTransaction) object; 45 List list = kt.getList(); 46 dataOut.writeInt(list.size()); 47 for (int i = 0; i < list.size(); i++){ 48 TxCommand tx = (TxCommand) list.get(i); 49 Object key = tx.getMessageStoreKey(); 50 ByteSequence packet = wireFormat.marshal(key); 51 dataOut.writeInt(packet.length); 52 dataOut.write(packet.data, packet.offset, packet.length); 53 Object command = tx.getCommand(); 54 packet = wireFormat.marshal(command); 55 dataOut.writeInt(packet.length); 56 dataOut.write(packet.data, packet.offset, packet.length); 57 58 } 59 } 60 61 62 public Object readPayload(DataInput dataIn) throws IOException { 63 KahaTransaction result = new KahaTransaction(); 64 List list = new ArrayList (); 65 result.setList(list); 66 int number=dataIn.readInt(); 67 for (int i = 0; i < number; i++){ 68 TxCommand command = new TxCommand(); 69 int size = dataIn.readInt(); 70 byte[] data=new byte[size]; 71 dataIn.readFully(data); 72 Object key = wireFormat.unmarshal(new ByteSequence(data)); 73 command.setMessageStoreKey(key); 74 size = dataIn.readInt(); 75 data=new byte[size]; 76 dataIn.readFully(data); 77 BaseCommand bc = (BaseCommand) wireFormat.unmarshal(new ByteSequence(data)); 78 command.setCommand(bc); 79 list.add(command); 80 } 81 return result; 82 83 } 84 } 85 | Popular Tags |