1 18 package org.apache.activemq; 19 20 import java.util.Enumeration ; 21 22 import javax.jms.BytesMessage ; 23 import javax.jms.Destination ; 24 import javax.jms.MessageEOFException ; 25 import javax.jms.JMSException ; 26 import javax.jms.MapMessage ; 27 import javax.jms.Message ; 28 import javax.jms.ObjectMessage ; 29 import javax.jms.Queue ; 30 import javax.jms.StreamMessage ; 31 import javax.jms.TemporaryQueue ; 32 import javax.jms.TemporaryTopic ; 33 import javax.jms.TextMessage ; 34 import javax.jms.Topic ; 35 36 37 import org.apache.activemq.command.ActiveMQBytesMessage; 38 import org.apache.activemq.command.ActiveMQDestination; 39 import org.apache.activemq.command.ActiveMQMapMessage; 40 import org.apache.activemq.command.ActiveMQMessage; 41 import org.apache.activemq.command.ActiveMQObjectMessage; 42 import org.apache.activemq.command.ActiveMQQueue; 43 import org.apache.activemq.command.ActiveMQStreamMessage; 44 import org.apache.activemq.command.ActiveMQTempQueue; 45 import org.apache.activemq.command.ActiveMQTempTopic; 46 import org.apache.activemq.command.ActiveMQTextMessage; 47 import org.apache.activemq.command.ActiveMQTopic; 48 49 54 public class ActiveMQMessageTransformation { 55 56 63 public static ActiveMQDestination transformDestination(Destination destination) throws JMSException { 64 ActiveMQDestination activeMQDestination = null; 65 66 if (destination != null) { 67 if (destination instanceof ActiveMQDestination) { 68 return (ActiveMQDestination) destination; 69 70 } 71 else { 72 if (destination instanceof TemporaryQueue ) { 73 activeMQDestination = new ActiveMQTempQueue(((Queue ) destination).getQueueName()); 74 } 75 else if (destination instanceof TemporaryTopic ) { 76 activeMQDestination = new ActiveMQTempTopic(((Topic ) destination).getTopicName()); 77 } 78 else if (destination instanceof Queue ) { 79 activeMQDestination = new ActiveMQQueue(((Queue ) destination).getQueueName()); 80 } 81 else if (destination instanceof Topic ) { 82 activeMQDestination = new ActiveMQTopic(((Topic ) destination).getTopicName()); 83 } 84 } 85 } 86 87 return activeMQDestination; 88 } 89 90 91 100 public static final ActiveMQMessage transformMessage(Message message, ActiveMQConnection connection) throws JMSException { 101 if (message instanceof ActiveMQMessage) { 102 return (ActiveMQMessage) message; 103 104 } else { 105 ActiveMQMessage activeMessage = null; 106 107 if (message instanceof BytesMessage ) { 108 BytesMessage bytesMsg = (BytesMessage ) message; 109 bytesMsg.reset(); 110 ActiveMQBytesMessage msg = new ActiveMQBytesMessage(); 111 msg.setConnection(connection); 112 try { 113 for (;;) { 114 msg.writeByte(bytesMsg.readByte()); 117 } 118 } catch (MessageEOFException e) { 119 } catch (JMSException e) { 121 } 122 123 activeMessage = msg; 124 } else if (message instanceof MapMessage ) { 125 MapMessage mapMsg = (MapMessage ) message; 126 ActiveMQMapMessage msg = new ActiveMQMapMessage(); 127 msg.setConnection(connection); 128 Enumeration iter = mapMsg.getMapNames(); 129 130 while (iter.hasMoreElements()) { 131 String name = iter.nextElement().toString(); 132 msg.setObject(name, mapMsg.getObject(name)); 133 } 134 135 activeMessage = msg; 136 } else if (message instanceof ObjectMessage ) { 137 ObjectMessage objMsg = (ObjectMessage ) message; 138 ActiveMQObjectMessage msg = new ActiveMQObjectMessage(); 139 msg.setConnection(connection); 140 msg.setObject(objMsg.getObject()); 141 msg.storeContent(); 142 activeMessage = msg; 143 } else if (message instanceof StreamMessage ) { 144 StreamMessage streamMessage = (StreamMessage ) message; 145 streamMessage.reset(); 146 ActiveMQStreamMessage msg = new ActiveMQStreamMessage(); 147 msg.setConnection(connection); 148 Object obj = null; 149 150 try { 151 while ((obj = streamMessage.readObject()) != null) { 152 msg.writeObject(obj); 153 } 154 } catch (MessageEOFException e) { 155 } catch (JMSException e) { 157 } 158 159 activeMessage = msg; 160 } else if (message instanceof TextMessage ) { 161 TextMessage textMsg = (TextMessage ) message; 162 ActiveMQTextMessage msg = new ActiveMQTextMessage(); 163 msg.setConnection(connection); 164 msg.setText(textMsg.getText()); 165 activeMessage = msg; 166 } else { 167 activeMessage = new ActiveMQMessage(); 168 activeMessage.setConnection(connection); 169 } 170 171 copyProperties(message, activeMessage); 172 173 return activeMessage; 174 } 175 } 176 177 184 public static void copyProperties(Message fromMessage, Message toMesage) throws JMSException { 185 toMesage.setJMSMessageID(fromMessage.getJMSMessageID()); 186 toMesage.setJMSCorrelationID(fromMessage.getJMSCorrelationID()); 187 toMesage.setJMSReplyTo(transformDestination(fromMessage.getJMSReplyTo())); 188 toMesage.setJMSDestination(transformDestination(fromMessage.getJMSDestination())); 189 toMesage.setJMSDeliveryMode(fromMessage.getJMSDeliveryMode()); 190 toMesage.setJMSRedelivered(fromMessage.getJMSRedelivered()); 191 toMesage.setJMSType(fromMessage.getJMSType()); 192 toMesage.setJMSExpiration(fromMessage.getJMSExpiration()); 193 toMesage.setJMSPriority(fromMessage.getJMSPriority()); 194 toMesage.setJMSTimestamp(fromMessage.getJMSTimestamp()); 195 196 Enumeration propertyNames = fromMessage.getPropertyNames(); 197 198 while (propertyNames.hasMoreElements()) { 199 String name = propertyNames.nextElement().toString(); 200 Object obj = fromMessage.getObjectProperty(name); 201 toMesage.setObjectProperty(name, obj); 202 } 203 } 204 } 205 | Popular Tags |