1 24 package com.scalagent.kjoram; 25 26 import com.scalagent.kjoram.jms.*; 27 import com.scalagent.kjoram.excepts.IllegalStateException; 28 import com.scalagent.kjoram.excepts.*; 29 30 public class MessageProducer 31 { 32 33 private int deliveryMode = DeliveryMode.PERSISTENT; 34 35 private int priority = 4; 36 37 private long timeToLive = 0; 38 43 private boolean messageIDDisabled = false; 44 45 private boolean timestampDisabled = false; 46 47 private boolean identified = true; 48 49 50 protected boolean closed = false; 51 52 protected Session sess; 53 54 protected Destination dest = null; 55 56 57 66 MessageProducer(Session sess, Destination dest) throws JMSException 67 { 68 this.sess = sess; 69 this.dest = dest; 70 71 if (dest == null) 72 identified = false; 73 74 sess.producers.addElement(this); 75 76 if (JoramTracing.dbgClient) 77 JoramTracing.log(JoramTracing.DEBUG, this + ": created."); 78 } 79 80 85 public void setDisableMessageID(boolean value) throws JMSException 86 { 87 if (closed) 88 throw new IllegalStateException ("Forbidden call on a closed producer."); 89 } 90 91 97 public void setDeliveryMode(int deliveryMode) throws JMSException 98 { 99 if (closed) 100 throw new IllegalStateException ("Forbidden call on a closed producer."); 101 102 if (deliveryMode != DeliveryMode.PERSISTENT 103 && deliveryMode != DeliveryMode.NON_PERSISTENT) 104 throw new JMSException("Can't set invalid delivery mode."); 105 106 this.deliveryMode = deliveryMode; 107 } 108 109 115 public void setPriority(int priority) throws JMSException 116 { 117 if (closed) 118 throw new IllegalStateException ("Forbidden call on a closed producer."); 119 120 if (priority < 0 || priority > 9) 121 throw new JMSException("Can't set invalid priority value."); 122 123 this.priority = priority; 124 } 125 126 131 public void setTimeToLive(long timeToLive) throws JMSException 132 { 133 if (closed) 134 throw new IllegalStateException ("Forbidden call on a closed producer."); 135 136 this.timeToLive = timeToLive; 137 } 138 139 144 public void setDisableMessageTimestamp(boolean value) throws JMSException 145 { 146 if (closed) 147 throw new IllegalStateException ("Forbidden call on a closed producer."); 148 149 this.timestampDisabled = value; 150 } 151 152 157 public Destination getDestination() throws JMSException 158 { 159 if (closed) 160 throw new IllegalStateException ("Forbidden call on a closed producer."); 161 162 return dest; 163 } 164 165 170 public boolean getDisableMessageID() throws JMSException 171 { 172 if (closed) 173 throw new IllegalStateException ("Forbidden call on a closed producer."); 174 175 return messageIDDisabled; 176 } 177 178 183 public int getDeliveryMode() throws JMSException 184 { 185 if (closed) 186 throw new IllegalStateException ("Forbidden call on a closed producer."); 187 188 return deliveryMode; 189 } 190 191 196 public int getPriority() throws JMSException 197 { 198 if (closed) 199 throw new IllegalStateException ("Forbidden call on a closed producer."); 200 201 return priority; 202 } 203 204 209 public long getTimeToLive() throws JMSException 210 { 211 if (closed) 212 throw new IllegalStateException ("Forbidden call on a closed producer."); 213 214 return timeToLive; 215 } 216 217 222 public boolean getDisableMessageTimestamp() throws JMSException 223 { 224 if (closed) 225 throw new IllegalStateException ("Forbidden call on a closed producer."); 226 227 return timestampDisabled; 228 } 229 230 231 239 public void send(Message message) throws JMSException 240 { 241 if (! identified) 242 throw new RuntimeException ("Can't send message to" 243 + " an unidentified" 244 + " destination."); 245 doSend(dest, message, deliveryMode, priority, timeToLive); 247 } 248 249 257 public void send(Message message, int deliveryMode, 258 int priority, long timeToLive) throws JMSException 259 { 260 if (! identified) 261 throw new RuntimeException ("Can't send message to" 262 + " an unidentified" 263 + " destination."); 264 doSend(dest, message, deliveryMode, priority, timeToLive); 266 } 267 268 280 public void send(Destination dest, 281 Message message) throws JMSException 282 { 283 if (identified) 284 throw new RuntimeException ("An unidentified message" 285 + " producer can't use this" 286 + " identified message" 287 + " producer."); 288 if (dest == null) 289 throw new RuntimeException ("Can't send message to" 290 + " an unidentified" 291 + " destination."); 292 293 doSend((Destination) dest, message, deliveryMode, priority, timeToLive); 294 } 295 296 308 public void send(Destination dest, Message message, 309 int deliveryMode, int priority, 310 long timeToLive) throws JMSException 311 { 312 if (identified) 313 throw new RuntimeException ("An unidentified message" 314 + " producer can't use this" 315 + " identified message" 316 + " producer."); 317 if (dest == null) 318 throw new RuntimeException ("Can't send message to" 319 + " an unidentified" 320 + " destination."); 321 322 doSend((Destination) dest, message, deliveryMode, priority, timeToLive); 323 } 324 325 330 public void close() throws JMSException 331 { 332 if (closed) 334 return; 335 336 if (JoramTracing.dbgClient) 337 JoramTracing.log(JoramTracing.DEBUG, "--- " + this 338 + ": closing..."); 339 340 sess.producers.removeElement(this); 341 closed = true; 342 343 if (JoramTracing.dbgClient) 344 JoramTracing.log(JoramTracing.DEBUG, this + ": closed."); 345 346 } 347 348 357 private void doSend(Destination dest, Message message, 358 int deliveryMode, int priority, 359 long timeToLive) throws JMSException 360 { 361 if (closed) 362 throw new IllegalStateException ("Forbidden call on a closed producer."); 363 364 if (JoramTracing.dbgClient) 365 JoramTracing.log(JoramTracing.DEBUG, "--- " + this 366 + ": producing..."); 367 368 String msgID = sess.cnx.nextMessageId(); 370 message.setJMSMessageID(msgID); 371 message.setJMSDeliveryMode(deliveryMode); 372 message.setJMSDestination(dest); 373 if (timeToLive == 0) 374 message.setJMSExpiration(0); 375 else 376 message.setJMSExpiration(System.currentTimeMillis() + timeToLive); 377 message.setJMSPriority(priority); 378 if (! timestampDisabled) 379 message.setJMSTimestamp(System.currentTimeMillis()); 380 381 com.scalagent.kjoram.messages.Message momMsg = null; 382 if (message instanceof Message) 385 momMsg = ((Message) message).getMomMessage(); 386 387 else if (message instanceof Message) { 390 try { 391 Message joramMessage = Message.convertJMSMessage(message); 392 momMsg = joramMessage.getMomMessage(); 393 } 394 catch (JMSException jE) { 395 MessageFormatException mE = new MessageFormatException("Message to" 396 + " send is" 397 + " invalid."); 398 mE.setLinkedException(jE); 399 throw mE; 400 } 401 } 402 else { 403 MessageFormatException mE = new MessageFormatException("Message to" 404 + " send is" 405 + " invalid."); 406 throw mE; 407 } 408 409 if (sess.transacted) { 411 if (JoramTracing.dbgClient) 412 JoramTracing.log(JoramTracing.DEBUG, "Buffering the message."); 413 414 sess.prepareSend(dest, 415 (com.scalagent.kjoram.messages.Message) momMsg.clone()); 416 } 417 else { 419 ProducerMessages pM = new ProducerMessages(dest.getName(), momMsg); 420 421 if (JoramTracing.dbgClient) 422 JoramTracing.log(JoramTracing.DEBUG, "Sending " + momMsg); 423 424 sess.cnx.syncRequest(pM); 425 } 426 } 427 } 428 | Popular Tags |