1 18 package org.apache.activemq.tool; 19 20 import org.apache.activemq.tool.properties.JmsProducerProperties; 21 import org.apache.activemq.tool.properties.JmsClientProperties; 22 import org.apache.commons.logging.Log; 23 import org.apache.commons.logging.LogFactory; 24 25 import javax.jms.ConnectionFactory ; 26 import javax.jms.MessageProducer ; 27 import javax.jms.TextMessage ; 28 import javax.jms.JMSException ; 29 import javax.jms.Destination ; 30 import javax.jms.DeliveryMode ; 31 import java.util.Arrays ; 32 33 public class JmsProducerClient extends AbstractJmsMeasurableClient { 34 private static final Log log = LogFactory.getLog(JmsProducerClient.class); 35 36 protected JmsProducerProperties client; 37 protected MessageProducer jmsProducer; 38 protected TextMessage jmsTextMessage; 39 40 public JmsProducerClient(ConnectionFactory factory) { 41 this(new JmsProducerProperties(), factory); 42 } 43 44 public JmsProducerClient(JmsProducerProperties clientProps, ConnectionFactory factory) { 45 super(factory); 46 this.client = clientProps; 47 } 48 49 public void sendMessages() throws JMSException { 50 if (client.getSendType().equalsIgnoreCase(JmsProducerProperties.COUNT_BASED_SENDING)) { 52 sendCountBasedMessages(client.getSendCount()); 53 54 } else { 56 sendTimeBasedMessages(client.getSendDuration()); 57 } 58 } 59 60 public void sendMessages(int destCount) throws JMSException { 61 this.destCount = destCount; 62 sendMessages(); 63 } 64 65 public void sendMessages(int destIndex, int destCount) throws JMSException { 66 this.destIndex = destIndex; 67 sendMessages(destCount); 68 } 69 70 public void sendCountBasedMessages(long messageCount) throws JMSException { 71 Destination [] dest = createDestination(destIndex, destCount); 74 75 if (getJmsProducer() == null) { 77 if (dest.length == 1) { 78 createJmsProducer(dest[0]); 79 } else { 80 createJmsProducer(); 81 } 82 } 83 try { 84 getConnection().start(); 85 log.info("Starting to publish " + client.getMessageSize() + " byte(s) of " + messageCount + " messages..."); 86 87 if (!client.isCreateNewMsg()) { 89 createJmsTextMessage(); 91 92 if (dest.length > 1) { 94 for (int i = 0; i < messageCount; i++) { 95 for (int j = 0; j < dest.length; j++) { 96 getJmsProducer().send(dest[j], getJmsTextMessage()); 97 incThroughput(); 98 } 99 } 100 } else { 102 for (int i = 0; i < messageCount; i++) { 103 getJmsProducer().send(getJmsTextMessage()); 104 incThroughput(); 105 } 106 } 107 108 } else { 112 if (dest.length > 1) { 114 for (int i = 0; i < messageCount; i++) { 115 for (int j = 0; j < dest.length; j++) { 116 getJmsProducer().send(dest[j], createJmsTextMessage("Text Message [" + i + "]")); 117 incThroughput(); 118 } 119 } 120 121 } else { 123 for (int i = 0; i < messageCount; i++) { 124 getJmsProducer().send(createJmsTextMessage("Text Message [" + i + "]")); 125 incThroughput(); 126 } 127 } 128 } 129 } finally { 130 getConnection().close(); 131 } 132 } 133 134 public void sendTimeBasedMessages(long duration) throws JMSException { 135 long endTime = System.currentTimeMillis() + duration; 136 139 Destination [] dest = createDestination(destIndex, destCount); 140 141 if (getJmsProducer() == null) { 143 if (dest.length == 1) { 144 createJmsProducer(dest[0]); 145 } else { 146 createJmsProducer(); 147 } 148 } 149 150 try { 151 getConnection().start(); 152 log.info("Starting to publish " + client.getMessageSize() + " byte(s) messages for " + duration + " ms"); 153 154 if (!client.isCreateNewMsg()) { 156 createJmsTextMessage(); 158 159 if (dest.length > 1) { 161 while (System.currentTimeMillis() < endTime) { 162 for (int j = 0; j < dest.length; j++) { 163 getJmsProducer().send(dest[j], getJmsTextMessage()); 164 incThroughput(); 165 } 166 } 167 } else { 169 while (System.currentTimeMillis() < endTime) { 170 getJmsProducer().send(getJmsTextMessage()); 171 incThroughput(); 172 } 173 } 174 175 } else { 179 long count = 1; 181 if (dest.length > 1) { 182 while (System.currentTimeMillis() < endTime) { 183 for (int j = 0; j < dest.length; j++) { 184 getJmsProducer().send(dest[j], createJmsTextMessage("Text Message [" + count++ + "]")); 185 incThroughput(); 186 } 187 } 188 189 } else { 191 while (System.currentTimeMillis() < endTime) { 192 193 getJmsProducer().send(createJmsTextMessage("Text Message [" + count++ + "]")); 194 incThroughput(); 195 } 196 } 197 } 198 } finally { 199 getConnection().close(); 200 } 201 } 202 203 public MessageProducer createJmsProducer() throws JMSException { 204 jmsProducer = getSession().createProducer(null); 205 if (client.getDeliveryMode().equalsIgnoreCase(JmsProducerProperties.DELIVERY_MODE_PERSISTENT)) { 206 log.info("Creating producer to possible multiple destinations with persistent delivery."); 207 jmsProducer.setDeliveryMode(DeliveryMode.PERSISTENT); 208 } else if (client.getDeliveryMode().equalsIgnoreCase(JmsProducerProperties.DELIVERY_MODE_NON_PERSISTENT)) { 209 log.info("Creating producer to possible multiple destinations with non-persistent delivery."); 210 jmsProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); 211 } else { 212 log.warn("Unknown deliveryMode value. Defaulting to non-persistent."); 213 jmsProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); 214 } 215 return jmsProducer; 216 } 217 218 public MessageProducer createJmsProducer(Destination dest) throws JMSException { 219 jmsProducer = getSession().createProducer(dest); 220 if (client.getDeliveryMode().equalsIgnoreCase(JmsProducerProperties.DELIVERY_MODE_PERSISTENT)) { 221 log.info("Creating producer to: " + dest.toString() + " with persistent delivery."); 222 jmsProducer.setDeliveryMode(DeliveryMode.PERSISTENT); 223 } else if (client.getDeliveryMode().equalsIgnoreCase(JmsProducerProperties.DELIVERY_MODE_NON_PERSISTENT)) { 224 log.info("Creating producer to: " + dest.toString() + " with non-persistent delivery."); 225 jmsProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); 226 } else { 227 log.warn("Unknown deliveryMode value. Defaulting to non-persistent."); 228 jmsProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); 229 } 230 return jmsProducer; 231 } 232 233 public MessageProducer getJmsProducer() { 234 return jmsProducer; 235 } 236 237 public TextMessage createJmsTextMessage() throws JMSException { 238 return createJmsTextMessage(client.getMessageSize()); 239 } 240 241 public TextMessage createJmsTextMessage(int size) throws JMSException { 242 jmsTextMessage = getSession().createTextMessage(buildText("", size)); 243 return jmsTextMessage; 244 } 245 246 public TextMessage createJmsTextMessage(String text) throws JMSException { 247 jmsTextMessage = getSession().createTextMessage(buildText(text, client.getMessageSize())); 248 return jmsTextMessage; 249 } 250 251 public TextMessage getJmsTextMessage() { 252 return jmsTextMessage; 253 } 254 255 public JmsClientProperties getClient() { 256 return client; 257 } 258 259 public void setClient(JmsClientProperties clientProps) { 260 client = (JmsProducerProperties)clientProps; 261 } 262 263 protected String buildText(String text, int size) { 264 byte[] data = new byte[size - text.length()]; 265 Arrays.fill(data, (byte) 0); 266 return text + new String (data); 267 } 268 } 269 | Popular Tags |