1 22 package org.jboss.mq; 23 24 import java.util.Enumeration ; 25 26 import javax.jms.BytesMessage ; 27 import javax.jms.DeliveryMode ; 28 import javax.jms.Destination ; 29 import javax.jms.IllegalStateException ; 30 import javax.jms.InvalidDestinationException ; 31 import javax.jms.JMSException ; 32 import javax.jms.MapMessage ; 33 import javax.jms.Message ; 34 import javax.jms.MessageEOFException ; 35 import javax.jms.MessageProducer ; 36 import javax.jms.ObjectMessage ; 37 import javax.jms.StreamMessage ; 38 import javax.jms.TemporaryQueue ; 39 import javax.jms.TemporaryTopic ; 40 import javax.jms.TextMessage ; 41 42 import org.jboss.logging.Logger; 43 44 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean; 45 46 53 public class SpyMessageProducer implements MessageProducer 54 { 55 56 static Logger log = Logger.getLogger(SpyMessageProducer.class); 57 58 59 static boolean trace = log.isTraceEnabled(); 60 61 62 protected SpySession session; 63 64 protected Destination destination; 65 66 protected int defaultDeliveryMode = SpyMessage.DEFAULT_DELIVERY_MODE; 67 68 protected int defaultPriority = SpyMessage.DEFAULT_PRIORITY; 69 70 protected long defaultTTL = SpyMessage.DEFAULT_TIME_TO_LIVE; 71 72 private SynchronizedBoolean closed = new SynchronizedBoolean(false); 73 74 private boolean disableMessageID = false; 75 76 private boolean disableTS = false; 77 78 84 SpyMessageProducer(SpySession session, Destination destination) 85 { 86 trace = log.isTraceEnabled(); 87 88 this.session = session; 89 this.destination = destination; 90 try 91 { 92 if (destination instanceof TemporaryQueue || destination instanceof TemporaryTopic ) 93 setDeliveryMode(DeliveryMode.NON_PERSISTENT); 94 else 95 setDeliveryMode(DeliveryMode.PERSISTENT); 96 } 97 catch (JMSException ignored) 98 { 99 log.debug("Ignored error during setDeliveryMode", ignored); 100 } 101 102 if (trace) 103 log.trace("New message producer " + this); 104 } 105 106 public void setDisableMessageID(boolean value) throws JMSException 107 { 108 checkClosed(); 109 disableMessageID = value; 110 } 111 112 public void setDisableMessageTimestamp(boolean value) throws JMSException 113 { 114 checkClosed(); 115 disableTS = value; 116 } 117 118 public void setDeliveryMode(int deli) throws JMSException 119 { 120 checkClosed(); 121 if (deli != DeliveryMode.NON_PERSISTENT && deli != DeliveryMode.PERSISTENT) 122 throw new JMSException ("Bad DeliveryMode value"); 123 else 124 defaultDeliveryMode = deli; 125 } 126 127 public void setPriority(int pri) throws JMSException 128 { 129 checkClosed(); 130 if (pri < 0 || pri > 9) 131 throw new JMSException ("Bad priority value"); 132 else 133 defaultPriority = pri; 134 } 135 136 public void setTimeToLive(int timeToLive) throws JMSException 137 { 138 checkClosed(); 139 if (timeToLive < 0) 140 throw new JMSException ("Bad TimeToLive value"); 141 else 142 defaultTTL = timeToLive; 143 } 144 145 public void setTimeToLive(long timeToLive) throws JMSException 146 { 147 checkClosed(); 148 if (timeToLive < 0) 149 throw new JMSException ("Bad TimeToLive value"); 150 else 151 defaultTTL = timeToLive; 152 } 153 154 public boolean getDisableMessageID() throws JMSException 155 { 156 checkClosed(); 157 return disableMessageID; 158 } 159 160 public boolean getDisableMessageTimestamp() throws JMSException 161 { 162 checkClosed(); 163 return disableTS; 164 } 165 166 public int getDeliveryMode() throws JMSException 167 { 168 checkClosed(); 169 return defaultDeliveryMode; 170 } 171 172 public int getPriority() throws JMSException 173 { 174 checkClosed(); 175 return defaultPriority; 176 } 177 178 public long getTimeToLive() throws JMSException 179 { 180 checkClosed(); 181 return defaultTTL; 182 } 183 184 public void close() throws JMSException 185 { 186 if (closed.set(true)) 187 return; 188 189 session.removeProducer(this); 190 191 if (trace) 192 log.trace("Closed " + this); 193 } 194 195 public Destination getDestination() throws JMSException 196 { 197 checkClosed(); 198 return destination; 199 } 200 201 public void send(Message message) throws JMSException 202 { 203 if (destination == null) 204 throw new UnsupportedOperationException ( 205 "Not constructed with identifyed destination. Usage of method not allowed"); 206 send(destination, message, defaultDeliveryMode, defaultPriority, defaultTTL); 207 } 208 209 public void send(Destination destination, Message message) throws JMSException 210 { 211 send(destination, message, defaultDeliveryMode, defaultPriority, defaultTTL); 212 } 213 214 public void send(Message message, int deliveryMode, int priority, long ttl) throws JMSException 215 { 216 if (destination == null) 217 throw new UnsupportedOperationException ( 218 "Not constructed with identifyed destination. Usage of method not allowed"); 219 send(destination, message, deliveryMode, priority, ttl); 220 } 221 222 public void send(Destination destination, Message message, int deliveryMode, int priority, long ttl) 223 throws JMSException 224 { 225 checkClosed(); 226 227 if (this.destination != null && this.destination.equals(destination) == false) 228 throw new UnsupportedOperationException ("Sending to " + destination 229 + " not allowed when producer created with " + this.destination); 230 231 if (destination == null || (destination instanceof SpyDestination) == false) 232 throw new InvalidDestinationException ("Destination is not an instance of SpyDestination " + destination); 233 234 SpyMessage sendMessage; 236 if ((message instanceof SpyMessage) == false) 237 sendMessage = encapsulateMessage(message); 238 else 239 sendMessage = (SpyMessage) message; 240 241 sendMessage.setJMSDestination(destination); 243 sendMessage.setJMSDeliveryMode(deliveryMode); 244 long ts = System.currentTimeMillis(); 245 sendMessage.setJMSTimestamp(ts); 246 if (ttl == 0) 247 sendMessage.setJMSExpiration(0); 248 else 249 sendMessage.setJMSExpiration(ttl + ts); 250 sendMessage.setJMSPriority(priority); 251 String id = session.getNewMessageID(); 252 sendMessage.setJMSMessageID(id); 253 254 if (message != sendMessage) 256 { 257 message.setJMSDestination(destination); 258 message.setJMSDeliveryMode(deliveryMode); 259 message.setJMSTimestamp(ts); 260 if (ttl == 0) 261 message.setJMSExpiration(0); 262 else 263 message.setJMSExpiration(ttl + ts); 264 message.setJMSPriority(priority); 265 message.setJMSMessageID(id); 266 } 267 268 if (trace) 269 log.trace("Sending message " + this + " \n" + sendMessage); 270 271 session.sendMessage(sendMessage); 273 } 274 275 public String toString() 276 { 277 StringBuffer buffer = new StringBuffer (100); 278 buffer.append("SpyMessageProducer@").append(System.identityHashCode(this)); 279 buffer.append("[ dest=").append(destination); 280 if (defaultDeliveryMode == DeliveryMode.PERSISTENT) 281 buffer.append(" delivery=").append("persist"); 282 else 283 buffer.append(" delivery=").append("besteffort"); 284 buffer.append(" priority=").append(defaultPriority); 285 buffer.append(" ttl=").append(defaultTTL); 286 buffer.append(" disableMessageID=").append(disableMessageID); 287 buffer.append(" disableTS=").append(disableTS); 288 buffer.append(" session=").append(session); 289 buffer.append(']'); 290 return buffer.toString(); 291 } 292 293 protected SpyMessage encapsulateMessage(Message message) throws JMSException 294 { 295 SpyMessage result; 296 if (message instanceof BytesMessage ) 297 { 298 result = MessagePool.getBytesMessage(); 299 BytesMessage original = (BytesMessage ) message; 300 original.reset(); 301 byte[] temp = new byte[1024]; 302 int bytes = original.readBytes(temp); 303 while (bytes != -1) 304 { 305 ((BytesMessage ) result).writeBytes(temp, 0, bytes); 306 bytes = original.readBytes(temp); 307 } 308 } 309 else if (message instanceof MapMessage ) 310 { 311 result = MessagePool.getMapMessage(); 312 MapMessage original = (MapMessage ) message; 313 for (Enumeration en=original.getMapNames(); en.hasMoreElements();) 314 { 315 String key = (String ) en.nextElement(); 316 try 317 { 318 ((MapMessage ) result).setObject(key, original.getObject(key)); 319 } 320 catch (JMSException ignored) 321 { 322 if (trace) 323 log.trace("Unable to copy map entry " + key, ignored); 324 } 325 } 326 } 327 else if (message instanceof StreamMessage ) 328 { 329 result = MessagePool.getStreamMessage(); 330 StreamMessage original = (StreamMessage ) message; 331 original.reset(); 332 try 333 { 334 while (true) 335 { 336 ((StreamMessage ) result).writeObject(original.readObject()); 337 } 338 } 339 catch (MessageEOFException expected) 340 { 341 } 342 } 343 else if (message instanceof ObjectMessage ) 344 { 345 result = MessagePool.getObjectMessage(); 346 ((ObjectMessage ) result).setObject(((ObjectMessage ) message).getObject()); 347 } 348 else if (message instanceof TextMessage ) 349 { 350 result = MessagePool.getTextMessage(); 351 ((TextMessage ) result).setText(((TextMessage ) message).getText()); 352 } 353 else 354 result = MessagePool.getMessage(); 355 356 try 358 { 359 result.setJMSCorrelationID(message.getJMSCorrelationID()); 360 } 361 catch (JMSException e) 362 { 363 result.setJMSCorrelationIDAsBytes(message.getJMSCorrelationIDAsBytes()); 365 } 366 result.setJMSReplyTo(message.getJMSReplyTo()); 367 result.setJMSType(message.getJMSType()); 368 369 for (Enumeration en=message.getPropertyNames(); en.hasMoreElements();) 371 { 372 String key = (String ) en.nextElement(); 373 try 374 { 375 result.setObjectProperty(key, message.getObjectProperty(key)); 376 } 377 catch (JMSException ignored) 378 { 379 if (trace) 380 log.trace("Unable to copy property " + key, ignored); 381 } 382 } 383 384 return result; 385 } 386 387 392 protected void checkClosed() throws JMSException 393 { 394 if (closed.get()) 395 throw new IllegalStateException ("Message producer is closed"); 396 } 397 } 398 | Popular Tags |