1 18 package org.apache.activemq; 19 20 import java.util.HashMap ; 21 import java.util.concurrent.atomic.AtomicLong ; 22 23 import javax.jms.DeliveryMode ; 24 import javax.jms.Destination ; 25 import javax.jms.IllegalStateException ; 26 import javax.jms.InvalidDestinationException ; 27 import javax.jms.JMSException ; 28 import javax.jms.Message ; 29 import javax.jms.MessageFormatException ; 30 import javax.jms.MessageProducer ; 31 32 import org.apache.activemq.command.ActiveMQDestination; 33 import org.apache.activemq.command.ProducerAck; 34 import org.apache.activemq.command.ProducerId; 35 import org.apache.activemq.command.ProducerInfo; 36 import org.apache.activemq.management.JMSProducerStatsImpl; 37 import org.apache.activemq.management.StatsCapable; 38 import org.apache.activemq.management.StatsImpl; 39 import org.apache.activemq.memory.UsageManager; 40 import org.apache.activemq.util.IntrospectionSupport; 41 42 75 public class ActiveMQMessageProducer implements MessageProducer , StatsCapable, Closeable, Disposable { 76 77 protected ActiveMQSession session; 78 protected ProducerInfo info; 79 private JMSProducerStatsImpl stats; 80 private AtomicLong messageSequence; 81 82 protected boolean closed; 83 private boolean disableMessageID; 84 private boolean disableMessageTimestamp; 85 private int defaultDeliveryMode; 86 private int defaultPriority; 87 private long defaultTimeToLive; 88 private long startTime; 89 private MessageTransformer transformer; 90 private UsageManager producerWindow; 91 92 protected ActiveMQMessageProducer(ActiveMQSession session, ProducerId producerId, ActiveMQDestination destination) 93 throws JMSException { 94 this.session = session; 95 this.info = new ProducerInfo(producerId); 96 this.info.setWindowSize(session.connection.getProducerWindowSize()); 97 if (destination!=null && destination.getOptions() != null) { 98 HashMap options = new HashMap (destination.getOptions()); 99 IntrospectionSupport.setProperties(this.info, options, "producer."); 100 } 101 this.info.setDestination(destination); 102 103 if( session.connection.getProtocolVersion()>=3 && this.info.getWindowSize()>0 ) { 105 producerWindow = new UsageManager("Producer Window: "+producerId); 106 producerWindow.setLimit(this.info.getWindowSize()); 107 } 108 109 this.disableMessageID = false; 110 this.disableMessageTimestamp = session.connection.isDisableTimeStampsByDefault(); 111 this.defaultDeliveryMode = Message.DEFAULT_DELIVERY_MODE; 112 this.defaultPriority = Message.DEFAULT_PRIORITY; 113 this.defaultTimeToLive = Message.DEFAULT_TIME_TO_LIVE; 114 this.startTime = System.currentTimeMillis(); 115 this.messageSequence = new AtomicLong (0); 116 this.stats = new JMSProducerStatsImpl(session.getSessionStats(), destination); 117 this.session.addProducer(this); 118 this.session.asyncSendPacket(info); 119 setTransformer(session.getTransformer()); 120 } 121 122 public StatsImpl getStats() { 123 return stats; 124 } 125 126 public JMSProducerStatsImpl getProducerStats() { 127 return stats; 128 } 129 130 149 public void setDisableMessageID(boolean value) throws JMSException { 150 checkClosed(); 151 this.disableMessageID = value; 152 } 153 154 161 public boolean getDisableMessageID() throws JMSException { 162 checkClosed(); 163 return this.disableMessageID; 164 } 165 166 185 public void setDisableMessageTimestamp(boolean value) throws JMSException { 186 checkClosed(); 187 this.disableMessageTimestamp = value; 188 } 189 190 197 public boolean getDisableMessageTimestamp() throws JMSException { 198 checkClosed(); 199 return this.disableMessageTimestamp; 200 } 201 202 217 public void setDeliveryMode(int newDeliveryMode) throws JMSException { 218 if (newDeliveryMode != DeliveryMode.PERSISTENT && newDeliveryMode != DeliveryMode.NON_PERSISTENT) { 219 throw new IllegalStateException ("unkown delivery mode: " + newDeliveryMode); 220 } 221 checkClosed(); 222 this.defaultDeliveryMode = newDeliveryMode; 223 } 224 225 232 public int getDeliveryMode() throws JMSException { 233 checkClosed(); 234 return this.defaultDeliveryMode; 235 } 236 237 252 public void setPriority(int newDefaultPriority) throws JMSException { 253 if (newDefaultPriority < 0 || newDefaultPriority > 9) { 254 throw new IllegalStateException ("default priority must be a value between 0 and 9"); 255 } 256 checkClosed(); 257 this.defaultPriority = newDefaultPriority; 258 } 259 260 268 public int getPriority() throws JMSException { 269 checkClosed(); 270 return this.defaultPriority; 271 } 272 273 285 public void setTimeToLive(long timeToLive) throws JMSException { 286 if (timeToLive < 0l) { 287 throw new IllegalStateException ("cannot set a negative timeToLive"); 288 } 289 checkClosed(); 290 this.defaultTimeToLive = timeToLive; 291 } 292 293 302 public long getTimeToLive() throws JMSException { 303 checkClosed(); 304 return this.defaultTimeToLive; 305 } 306 307 315 public Destination getDestination() throws JMSException { 316 checkClosed(); 317 return this.info.getDestination(); 318 } 319 320 331 public void close() throws JMSException { 332 if( closed==false ) { 333 dispose(); 334 this.session.asyncSendPacket(info.createRemoveCommand()); 335 } 336 } 337 338 public void dispose() { 339 if( closed==false ) { 340 this.session.removeProducer(this); 341 closed = true; 342 } 343 } 344 345 346 350 protected void checkClosed() throws IllegalStateException { 351 if (closed) { 352 throw new IllegalStateException ("The producer is closed"); 353 } 354 } 355 356 374 public void send(Message message) throws JMSException { 375 this.send(this.getDestination(), 376 message, 377 this.defaultDeliveryMode, 378 this.defaultPriority, 379 this.defaultTimeToLive); 380 } 381 382 402 public void send(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException { 403 this.send(this.getDestination(), 404 message, 405 deliveryMode, 406 priority, 407 timeToLive); 408 } 409 410 433 public void send(Destination destination, Message message) throws JMSException { 434 this.send(destination, 435 message, 436 this.defaultDeliveryMode, 437 this.defaultPriority, 438 this.defaultTimeToLive); 439 } 440 441 462 public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive) 463 throws JMSException { 464 checkClosed(); 465 if (destination == null) { 466 if( info.getDestination() == null ) { 467 throw new UnsupportedOperationException ("A destination must be specified."); 468 } 469 throw new InvalidDestinationException ("Don't understand null destinations"); 470 } 471 472 ActiveMQDestination dest; 473 if( destination == info.getDestination() ) { 474 dest = (ActiveMQDestination) destination; 475 } else if ( info.getDestination() == null ) { 476 dest = ActiveMQDestination.transform(destination); 477 } else { 478 throw new UnsupportedOperationException ("This producer can only send messages to: " + this.info.getDestination().getPhysicalName()); 479 } 480 if (dest == null) { 481 throw new JMSException ("No destination specified"); 482 } 483 484 if (transformer != null) { 485 Message transformedMessage = transformer.producerTransform(session, this, message); 486 if (transformedMessage != null) { 487 message = transformedMessage; 488 } 489 } 490 491 if( producerWindow!=null ) { 492 try { 493 producerWindow.waitForSpace(); 494 } catch (InterruptedException e) { 495 throw new JMSException ("Send aborted due to thread interrupt."); 496 } 497 } 498 499 this.session.send(this, dest, message, deliveryMode, priority, timeToLive, producerWindow); 500 501 stats.onMessage(); 502 } 503 504 505 public MessageTransformer getTransformer() { 506 return transformer; 507 } 508 509 512 public void setTransformer(MessageTransformer transformer) { 513 this.transformer = transformer; 514 } 515 516 519 protected long getStartTime() { 520 return this.startTime; 521 } 522 523 526 protected long getMessageSequence() { 527 return messageSequence.incrementAndGet(); 528 } 529 530 533 protected void setMessageSequence(AtomicLong messageSequence) { 534 this.messageSequence = messageSequence; 535 } 536 537 540 protected ProducerInfo getProducerInfo(){ 541 return this.info!=null?this.info:null; 542 } 543 544 547 protected void setProducerInfo(ProducerInfo info){ 548 this.info = info; 549 } 550 551 public String toString() { 552 return "ActiveMQMessageProducer { value=" +info.getProducerId()+" }"; 553 } 554 555 public void onProducerAck(ProducerAck pa) { 556 if( this.producerWindow!=null ) { 557 this.producerWindow.decreaseUsage(pa.getSize()); 558 } 559 } 560 561 } 562 | Popular Tags |