1 10 11 package org.mule.providers.jms; 12 13 import java.io.IOException ; 14 import java.io.InputStream ; 15 import java.io.ObjectOutputStream ; 16 import java.io.Serializable ; 17 import java.util.Enumeration ; 18 import java.util.Hashtable ; 19 import java.util.Iterator ; 20 import java.util.Map ; 21 import java.util.Vector ; 22 23 import javax.jms.BytesMessage ; 24 import javax.jms.Destination ; 25 import javax.jms.JMSException ; 26 import javax.jms.MapMessage ; 27 import javax.jms.Message ; 28 import javax.jms.MessageEOFException ; 29 import javax.jms.ObjectMessage ; 30 import javax.jms.Queue ; 31 import javax.jms.Session ; 32 import javax.jms.StreamMessage ; 33 import javax.jms.TextMessage ; 34 import javax.jms.Topic ; 35 36 import org.apache.commons.io.output.ByteArrayOutputStream; 37 import org.mule.util.ArrayUtils; 38 39 43 public class JmsMessageUtils 44 { 45 46 public static Message toMessage(Object object, Session session) throws JMSException 47 { 48 if (object instanceof Message ) 49 { 50 return (Message )object; 51 } 52 else if (object instanceof String ) 53 { 54 return session.createTextMessage((String )object); 55 } 56 else if (object instanceof Map ) 57 { 58 MapMessage mMsg = session.createMapMessage(); 59 Map src = (Map )object; 60 61 for (Iterator i = src.entrySet().iterator(); i.hasNext();) 62 { 63 Map.Entry entry = (Map.Entry )i.next(); 64 mMsg.setObject(entry.getKey().toString(), entry.getValue()); 65 } 66 67 return mMsg; 68 } 69 else if (object instanceof InputStream ) 70 { 71 StreamMessage sMsg = session.createStreamMessage(); 72 InputStream temp = (InputStream )object; 73 74 byte[] buffer = new byte[4096]; 75 int len; 76 77 try 78 { 79 while ((len = temp.read(buffer)) != -1) 80 { 81 sMsg.writeBytes(buffer, 0, len); 82 } 83 } 84 catch (IOException e) 85 { 86 throw new JMSException ("Failed to read input stream to create a stream message: " + e); 87 } 88 89 return sMsg; 90 } 91 else if (object instanceof byte[]) 92 { 93 BytesMessage bMsg = session.createBytesMessage(); 94 bMsg.writeBytes((byte[])object); 95 return bMsg; 96 } 97 else if (object instanceof Serializable ) 98 { 99 ObjectMessage oMsg = session.createObjectMessage(); 100 oMsg.setObject((Serializable )object); 101 return oMsg; 102 } 103 else 104 { 105 throw new JMSException ( 106 "Source was not a supported type, data must be Serializable, String, byte[], Map or InputStream"); 107 } 108 } 109 110 public static Object toObject(Message source, String jmsSpec) throws JMSException , IOException 111 { 112 if (source instanceof ObjectMessage ) 113 { 114 return ((ObjectMessage )source).getObject(); 115 } 116 else if (source instanceof MapMessage ) 117 { 118 Hashtable map = new Hashtable (); 119 MapMessage m = (MapMessage )source; 120 121 for (Enumeration e = m.getMapNames(); e.hasMoreElements();) 122 { 123 String name = (String )e.nextElement(); 124 Object obj = m.getObject(name); 125 map.put(name, obj); 126 } 127 128 return map; 129 } 130 else if (source instanceof TextMessage ) 131 { 132 return ((TextMessage )source).getText(); 133 } 134 else if (source instanceof BytesMessage ) 135 { 136 return toByteArray(source, jmsSpec); 137 } 138 else if (source instanceof StreamMessage ) 139 { 140 try 141 { 142 StreamMessage sMsg = (StreamMessage )source; 143 Vector result = new Vector (); 144 Object obj; 145 while ((obj = sMsg.readObject()) != null) 146 { 147 result.addElement(obj); 148 } 149 return result; 150 } 151 catch (MessageEOFException eof) 152 { 153 } 155 catch (Exception e) 156 { 157 throw new JMSException ("Failed to extract information from JMS Stream Message: " + e); 158 } 159 } 160 161 return source; 163 } 164 165 179 public static byte[] toByteArray(Message message, String jmsSpec) throws JMSException , IOException 180 { 181 if (message instanceof BytesMessage ) 182 { 183 BytesMessage bMsg = (BytesMessage )message; 184 bMsg.reset(); 185 186 if (JmsConstants.JMS_SPECIFICATION_11.equals(jmsSpec)) 187 { 188 long bmBodyLength = bMsg.getBodyLength(); 189 if (bmBodyLength > Integer.MAX_VALUE) 190 { 191 throw new JMSException ("Size of BytesMessage exceeds Integer.MAX_VALUE; " 192 + "please consider using JMS StreamMessage instead"); 193 } 194 195 if (bmBodyLength > 0) 196 { 197 byte[] bytes = new byte[(int)bmBodyLength]; 198 bMsg.readBytes(bytes); 199 return bytes; 200 } 201 else 202 { 203 return ArrayUtils.EMPTY_BYTE_ARRAY; 204 } 205 } 206 else 207 { 208 ByteArrayOutputStream baos = new ByteArrayOutputStream(4096); 209 byte[] buffer = new byte[4096]; 210 int len; 211 212 while ((len = bMsg.readBytes(buffer)) != -1) 213 { 214 baos.write(buffer, 0, len); 215 } 216 217 if (baos.size() > 0) 218 { 219 return baos.toByteArray(); 220 } 221 else 222 { 223 return ArrayUtils.EMPTY_BYTE_ARRAY; 224 } 225 } 226 } 227 else if (message instanceof StreamMessage ) 228 { 229 StreamMessage sMsg = (StreamMessage )message; 230 sMsg.reset(); 231 232 ByteArrayOutputStream baos = new ByteArrayOutputStream(4096); 233 byte[] buffer = new byte[4096]; 234 int len; 235 236 while ((len = sMsg.readBytes(buffer)) != -1) 237 { 238 baos.write(buffer, 0, len); 239 } 240 241 return baos.toByteArray(); 242 } 243 else if (message instanceof ObjectMessage ) 244 { 245 ObjectMessage oMsg = (ObjectMessage )message; 246 ByteArrayOutputStream baos = new ByteArrayOutputStream(); 247 ObjectOutputStream os = new ObjectOutputStream (baos); 248 os.writeObject(oMsg.getObject()); 249 os.flush(); 250 os.close(); 251 return baos.toByteArray(); 252 } 253 else if (message instanceof TextMessage ) 254 { 255 TextMessage tMsg = (TextMessage )message; 256 String tMsgText = tMsg.getText(); 257 258 if (null == tMsgText) 259 { 260 return ArrayUtils.EMPTY_BYTE_ARRAY; 263 } 264 else 265 { 266 return tMsgText.getBytes(); 267 } 268 } 269 else 270 { 271 throw new JMSException ("Cannot get bytes from Map Message"); 272 } 273 } 274 275 public static String getNameForDestination(Destination dest) throws JMSException 276 { 277 if (dest instanceof Queue ) 278 { 279 return ((Queue )dest).getQueueName(); 280 } 281 else if (dest instanceof Topic ) 282 { 283 return ((Topic )dest).getTopicName(); 284 } 285 else 286 { 287 return null; 288 } 289 } 290 291 public static Message copyJMSProperties(Message from, Message to, JmsConnector connector) 292 throws JMSException 293 { 294 if (connector.supportsProperty(JmsConstants.JMS_CORRELATION_ID)) 295 { 296 to.setJMSCorrelationID(from.getJMSCorrelationID()); 297 } 298 if (connector.supportsProperty(JmsConstants.JMS_DELIVERY_MODE)) 299 { 300 to.setJMSDeliveryMode(from.getJMSDeliveryMode()); 301 } 302 if (connector.supportsProperty(JmsConstants.JMS_DESTINATION)) 303 { 304 to.setJMSDestination(from.getJMSDestination()); 305 } 306 if (connector.supportsProperty(JmsConstants.JMS_EXPIRATION)) 307 { 308 to.setJMSExpiration(from.getJMSExpiration()); 309 } 310 if (connector.supportsProperty(JmsConstants.JMS_MESSAGE_ID)) 311 { 312 to.setJMSMessageID(from.getJMSMessageID()); 313 } 314 if (connector.supportsProperty(JmsConstants.JMS_PRIORITY)) 315 { 316 to.setJMSPriority(from.getJMSPriority()); 317 } 318 if (connector.supportsProperty(JmsConstants.JMS_REDELIVERED)) 319 { 320 to.setJMSRedelivered(from.getJMSRedelivered()); 321 } 322 if (connector.supportsProperty(JmsConstants.JMS_REPLY_TO)) 323 { 324 to.setJMSReplyTo(from.getJMSReplyTo()); 325 } 326 if (connector.supportsProperty(JmsConstants.JMS_TIMESTAMP)) 327 { 328 to.setJMSTimestamp(from.getJMSTimestamp()); 329 } 330 if (connector.supportsProperty(JmsConstants.JMS_TYPE)) 331 { 332 to.setJMSType(from.getJMSType()); 333 } 334 return to; 335 } 336 } 337 | Popular Tags |