package org.apache.activemq.transport;

import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.IOException;

import org.apache.activemq.command.Command;
import org.apache.activemq.command.LastPartialCommand;
import org.apache.activemq.command.PartialCommand;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.util.ByteArrayInputStream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * A transport filter that reassembles a command which arrives as a sequence
 * of {@link PartialCommand} chunks terminated by a {@link LastPartialCommand}.
 * <p>
 * The payload of each chunk is appended to an internal buffer. When the last
 * chunk arrives, the buffered bytes are unmarshalled with the configured
 * {@link OpenWireFormat} and the resulting command is handed to the transport
 * listener. Every other command is passed through to the listener untouched.
 */
public class CommandJoiner extends TransportFilter {
    private static final Log log = LogFactory.getLog(CommandJoiner.class);

    /** Accumulates the payload of the chunks received so far. */
    private final ByteArrayOutputStream out = new ByteArrayOutputStream();
    private final OpenWireFormat wireFormat;

    /**
     * @param next the transport this filter wraps
     * @param wireFormat the wire format used to unmarshal a reassembled command
     */
    public CommandJoiner(Transport next, OpenWireFormat wireFormat) {
        super(next);
        this.wireFormat = wireFormat;
    }

    /**
     * Buffers partial commands and dispatches the reassembled command once the
     * last chunk has been received; any other command is forwarded as-is.
     *
     * @param o the incoming command; it is cast to {@link Command}
     */
    public void onCommand(Object o) {
        Command command = (Command) o;
        byte type = command.getDataStructureType();
        boolean lastChunk = type == LastPartialCommand.DATA_STRUCTURE_TYPE;

        if (type != PartialCommand.DATA_STRUCTURE_TYPE && !lastChunk) {
            // Not a fragment: nothing to join.
            getTransportListener().onCommand(command);
            return;
        }

        PartialCommand header = (PartialCommand) command;
        byte[] partialData = header.getData();
        try {
            // A chunk without a payload contributes no bytes.
            if (partialData != null) {
                out.write(partialData);
            }
        }
        catch (IOException e) {
            // The buffer no longer holds a complete prefix of the command, so
            // discard it instead of unmarshalling a truncated command later.
            out.reset();
            getTransportListener().onException(e);
            return;
        }

        if (!lastChunk) {
            return;
        }

        try {
            byte[] fullData = out.toByteArray();
            // Clear the buffer before unmarshalling so that a failure below
            // cannot leak stale bytes into the next command.
            out.reset();
            DataInputStream dataIn = new DataInputStream(new ByteArrayInputStream(fullData));
            Command completeCommand = (Command) wireFormat.unmarshal(dataIn);

            LastPartialCommand lastCommand = (LastPartialCommand) command;
            lastCommand.configure(completeCommand);

            getTransportListener().onCommand(completeCommand);
        }
        catch (IOException e) {
            log.warn("Failed to unmarshal partial command: " + command, e);
            getTransportListener().onException(e);
        }
    }

    /**
     * Stops the wrapped transport and discards any partially received command.
     * The buffer is cleared rather than nulled so that a command delivered
     * after this call does not fail with a NullPointerException.
     *
     * @throws Exception if stopping the wrapped transport fails
     */
    public void stop() throws Exception {
        super.stop();
        out.reset();
    }

    /**
     * @return the string form of the wrapped transport
     */
    public String toString() {
        return next.toString();
    }
}