1 18 19 package org.apache.activemq.command; 20 21 22 23 24 import org.apache.activemq.ActiveMQConnection; 25 import org.apache.activemq.util.ByteArrayInputStream; 26 import org.apache.activemq.util.ByteArrayOutputStream; 27 import org.apache.activemq.util.ByteSequence; 28 import org.apache.activemq.util.ClassLoadingAwareObjectInputStream; 29 import org.apache.activemq.util.JMSExceptionSupport; 30 31 import javax.jms.JMSException ; 32 import javax.jms.ObjectMessage ; 33 import java.io.DataInputStream ; 34 import java.io.DataOutputStream ; 35 import java.io.IOException ; 36 import java.io.InputStream ; 37 import java.io.ObjectOutputStream ; 38 import java.io.OutputStream ; 39 import java.io.Serializable ; 40 import java.util.zip.DeflaterOutputStream ; 41 import java.util.zip.InflaterInputStream ; 42 43 64 public class ActiveMQObjectMessage extends ActiveMQMessage implements ObjectMessage { 65 static final ClassLoader ACTIVEMQ_CLASSLOADER = ActiveMQObjectMessage.class.getClassLoader(); public static final byte DATA_STRUCTURE_TYPE = CommandTypes.ACTIVEMQ_OBJECT_MESSAGE; 67 68 protected transient Serializable object; 69 70 public Message copy() { 71 ActiveMQObjectMessage copy = new ActiveMQObjectMessage(); 72 copy(copy); 73 return copy; 74 } 75 76 private void copy(ActiveMQObjectMessage copy) { 77 storeContent(); 78 super.copy(copy); 79 copy.object=null; 80 } 81 82 public void storeContent() { 83 ByteSequence bodyAsBytes = getContent(); 84 if (bodyAsBytes == null && object != null) { 85 try { 86 ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(); 87 OutputStream os = bytesOut; 88 ActiveMQConnection connection = getConnection(); 89 if (connection!=null && connection.isUseCompression()) { 90 compressed = true; 91 os = new DeflaterOutputStream (os); 92 } 93 DataOutputStream dataOut = new DataOutputStream (os); 94 ObjectOutputStream objOut = new ObjectOutputStream (dataOut); 95 objOut.writeObject(object); 96 objOut.flush(); 97 objOut.reset(); 98 objOut.close(); 99 setContent(bytesOut.toByteSequence()); 100 } catch (IOException ioe) { 101 throw new RuntimeException (ioe.getMessage(), ioe); 102 } 103 } 104 } 105 106 public byte getDataStructureType() { 107 return DATA_STRUCTURE_TYPE; 108 } 109 110 public String getJMSXMimeType() { 111 return "jms/object-message"; 112 } 113 114 115 123 124 public void clearBody() throws JMSException { 125 super.clearBody(); 126 this.object = null; 127 } 128 129 141 142 public void setObject(Serializable newObject) throws JMSException { 143 checkReadOnlyBody(); 144 this.object = newObject; 145 setContent(null); 146 ActiveMQConnection connection = getConnection(); 147 if( connection==null || !connection.isObjectMessageSerializationDefered() ) { 148 storeContent(); 149 } 150 } 151 152 153 159 public Serializable getObject() throws JMSException { 160 if (object == null && getContent()!=null ) { 161 try { 162 ByteSequence content = getContent(); 163 InputStream is = new ByteArrayInputStream(content); 164 if( isCompressed() ) { 165 is = new InflaterInputStream (is); 166 } 167 DataInputStream dataIn = new DataInputStream (is); 168 ClassLoadingAwareObjectInputStream objIn = new ClassLoadingAwareObjectInputStream(dataIn); 169 try { 170 object = (Serializable ) objIn.readObject(); 171 } catch (ClassNotFoundException ce) { 172 throw new IOException (ce.getMessage()); 173 } 174 dataIn.close(); 175 } catch (IOException e) { 176 throw JMSExceptionSupport.create("Failed to build body from bytes. Reason: " + e, e); 177 } 178 } 179 return this.object; 180 } 181 182 public void onMessageRolledBack() { 183 super.onMessageRolledBack(); 184 185 object = null; 187 } 188 189 public String toString() { 190 try { 191 getObject(); 192 } catch (JMSException e) { 193 } 194 return super.toString(); 195 } 196 } 197 | Popular Tags |