1 18 package org.apache.activemq.kaha; 19 20 import java.io.DataInput ; 21 import java.io.DataOutput ; 22 import java.io.IOException ; 23 import org.apache.activemq.command.Message; 24 import org.apache.activemq.util.ByteSequence; 25 import org.apache.activemq.wireformat.WireFormat; 26 31 public class MessageMarshaller implements Marshaller<Message> { 32 33 private WireFormat wireFormat; 34 38 public MessageMarshaller(WireFormat wireFormat) { 39 this.wireFormat=wireFormat; 40 } 41 48 public void writePayload(Message message,DataOutput dataOut) throws IOException { 49 ByteSequence packet = wireFormat.marshal(message); 50 dataOut.writeInt(packet.length); 51 dataOut.write(packet.data, packet.offset, packet.length); 52 } 53 54 61 public Message readPayload(DataInput dataIn) throws IOException { 62 int size=dataIn.readInt(); 63 byte[] data=new byte[size]; 64 dataIn.readFully(data); 65 return (Message)wireFormat.unmarshal(new ByteSequence(data)); 66 } 67 } 68 | Popular Tags |