1 18 package org.apache.activemq.command; 19 20 import org.apache.activemq.ActiveMQConnection; 21 import org.apache.activemq.util.ByteArrayInputStream; 22 import org.apache.activemq.util.ByteArrayOutputStream; 23 import org.apache.activemq.util.ByteSequence; 24 import org.apache.activemq.util.JMSExceptionSupport; 25 import org.apache.activemq.util.MarshallingSupport; 26 import org.apache.activemq.wireformat.WireFormat; 27 28 import javax.jms.JMSException ; 29 import javax.jms.MessageNotWriteableException ; 30 import javax.jms.TextMessage ; 31 import java.io.DataInputStream ; 32 import java.io.DataOutputStream ; 33 import java.io.IOException ; 34 import java.io.InputStream ; 35 import java.io.OutputStream ; 36 import java.util.zip.DeflaterOutputStream ; 37 import java.util.zip.InflaterInputStream ; 38 39 44 public class ActiveMQTextMessage extends ActiveMQMessage implements TextMessage { 45 46 public static final byte DATA_STRUCTURE_TYPE=CommandTypes.ACTIVEMQ_TEXT_MESSAGE; 47 48 protected String text; 49 50 public Message copy() { 51 ActiveMQTextMessage copy = new ActiveMQTextMessage(); 52 copy(copy); 53 return copy; 54 } 55 56 private void copy(ActiveMQTextMessage copy) { 57 super.copy(copy); 58 copy.text = text; 59 } 60 61 public byte getDataStructureType() { 62 return DATA_STRUCTURE_TYPE; 63 } 64 65 public String getJMSXMimeType() { 66 return "jms/text-message"; 67 } 68 69 public void setText(String text) throws MessageNotWriteableException { 70 checkReadOnlyBody(); 71 this.text = text; 72 setContent(null); 73 } 74 75 public String getText() throws JMSException { 76 if (text == null && getContent() != null) { 77 try { 78 ByteSequence bodyAsBytes = getContent(); 79 if (bodyAsBytes != null) { 80 InputStream is = new ByteArrayInputStream(bodyAsBytes); 81 if( isCompressed() ) { 82 is = new InflaterInputStream (is); 83 } 84 DataInputStream dataIn = new DataInputStream (is); 85 text = MarshallingSupport.readUTF8(dataIn); 86 dataIn.close(); 87 setContent(null); 88 } 89 } catch (IOException ioe) { 90 throw JMSExceptionSupport.create(ioe); 91 } 92 } 93 return text; 94 } 95 96 public void beforeMarshall(WireFormat wireFormat) throws IOException { 97 super.beforeMarshall(wireFormat); 98 99 ByteSequence content = getContent(); 100 if (content == null && text!=null ) { 101 ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(); 102 OutputStream os = bytesOut; 103 ActiveMQConnection connection = getConnection(); 104 if( connection!=null && connection.isUseCompression() ) { 105 compressed = true; 106 os = new DeflaterOutputStream (os); 107 } 108 DataOutputStream dataOut = new DataOutputStream (os); 109 MarshallingSupport.writeUTF8(dataOut, text); 110 dataOut.close(); 111 setContent(bytesOut.toByteSequence()); 112 } 113 } 114 115 123 public void clearBody() throws JMSException { 124 super.clearBody(); 125 this.text = null; 126 } 127 128 public int getSize() { 129 if( size == 0 && content==null && text!=null ) { 130 size = AVERAGE_MESSAGE_SIZE_OVERHEAD; 131 if( marshalledProperties!=null ) 132 size += marshalledProperties.getLength(); 133 size = text.length()*2; 134 } 135 return super.getSize(); 136 } 137 } 138 | Popular Tags |