1 18 19 package org.apache.activemq.tool; 20 21 import org.apache.activemq.ActiveMQConnectionFactory; 22 import org.apache.activemq.broker.BrokerService; 23 import org.apache.activemq.tool.MemProducer; 24 import org.apache.activemq.tool.MemConsumer; 25 import org.apache.activemq.tool.MemoryMonitoringTool; 26 import org.apache.commons.logging.Log; 27 import org.apache.commons.logging.LogFactory; 28 29 import javax.jms.*; 30 31 32 import java.util.Properties ; 33 34 35 public class JMSMemtest { 36 37 private static final Log log = LogFactory.getLog(JMSMemtest.class); 38 private static final int DEFAULT_MESSAGECOUNT = 5000; 39 protected BrokerService broker; 40 protected boolean topic = true; 41 protected boolean durable = false; 42 43 protected long messageCount = 0; 44 45 protected int connectionCheckpointSize; 47 protected long connectionInterval; 48 49 50 protected int consumerCount; 51 protected int producerCount; 52 protected int checkpointInterval; 53 protected int prefetchSize; 54 protected int messageSize; 56 57 protected String reportDirectory; 58 protected String reportName; 59 60 61 protected String url = ""; 62 protected MemProducer[] producers; 63 protected MemConsumer[] consumers; 64 protected String destinationName; 65 protected boolean allMessagesConsumed = true; 66 protected MemConsumer allMessagesList = new MemConsumer(); 67 68 protected Message payload; 69 70 protected ActiveMQConnectionFactory connectionFactory; 71 protected Connection connection; 72 protected Destination destination; 73 74 75 protected boolean createConnectionPerClient = true; 76 77 protected boolean transacted = false; 78 protected boolean useEmbeddedBroker = true; 79 protected MemoryMonitoringTool memoryMonitoringTool; 80 81 82 public static void main(String [] args) { 83 84 85 Properties sysSettings = new Properties (); 86 87 for (int i = 0; i < args.length; i++) { 88 89 int index = args[i].indexOf("="); 90 String key = args[i].substring(0, index); 91 String val = args[i].substring(index + 1); 92 sysSettings.setProperty(key, val); 93 94 } 95 96 97 JMSMemtest memtest = new JMSMemtest(sysSettings); 98 try { 99 memtest.start(); 100 } catch (Exception e) { 101 102 e.printStackTrace(); 103 } 104 105 } 106 107 108 public JMSMemtest(Properties settings) { 109 url = settings.getProperty("url"); 110 topic = new Boolean (settings.getProperty("topic")).booleanValue(); 111 durable = new Boolean (settings.getProperty("durable")).booleanValue(); 112 connectionCheckpointSize = new Integer (settings.getProperty("connectionCheckpointSize")).intValue(); 113 producerCount = new Integer (settings.getProperty("producerCount")).intValue(); 114 consumerCount = new Integer (settings.getProperty("consumerCount")).intValue(); 115 messageCount = new Integer (settings.getProperty("messageCount")).intValue(); 116 messageSize = new Integer (settings.getProperty("messageSize")).intValue(); 117 prefetchSize = new Integer (settings.getProperty("prefetchSize")).intValue(); 118 checkpointInterval = new Integer (settings.getProperty("checkpointInterval")).intValue() * 1000; 119 producerCount = new Integer (settings.getProperty("producerCount")).intValue(); 120 reportName = settings.getProperty("reportName"); 121 destinationName = settings.getProperty("destinationName"); 122 reportDirectory = settings.getProperty("reportDirectory"); 123 connectionInterval = connectionCheckpointSize * 1024; 124 } 125 126 protected void start() throws Exception { 127 log.info("Starting Monitor"); 128 memoryMonitoringTool = new MemoryMonitoringTool(); 129 memoryMonitoringTool.setTestSettings(getSysTestSettings()); 130 Thread monitorThread = memoryMonitoringTool.startMonitor(); 131 132 if (messageCount == 0) { 133 messageCount = DEFAULT_MESSAGECOUNT; 134 } 135 136 137 if (useEmbeddedBroker) { 138 if (broker == null) { 139 broker = createBroker(); 140 } 141 } 142 143 144 connectionFactory = (ActiveMQConnectionFactory) createConnectionFactory(); 145 if (prefetchSize > 0) { 146 connectionFactory.getPrefetchPolicy().setTopicPrefetch(prefetchSize); 147 connectionFactory.getPrefetchPolicy().setQueuePrefetch(prefetchSize); 148 } 149 150 connection = connectionFactory.createConnection(); 151 Session session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE); 152 153 if (topic) { 154 destination = session.createTopic(destinationName); 155 } else { 156 destination = session.createQueue(destinationName); 157 } 158 159 createPayload(session); 160 161 publishAndConsume(); 162 163 log.info("Closing resources"); 164 this.close(); 165 166 monitorThread.join(); 167 168 169 } 170 171 172 protected boolean resetConnection(int counter) { 173 if (connectionInterval > 0) { 174 long totalMsgSizeConsumed = counter * 1024; 175 if (connectionInterval < totalMsgSizeConsumed) { 176 return true; 177 } 178 } 179 return false; 180 } 181 182 protected void publishAndConsume() throws Exception { 183 184 createConsumers(); 185 createProducers(); 186 int counter = 0; 187 boolean resetCon = false; 188 log.info("Start sending messages "); 189 for (int i = 0; i < messageCount; i++) { 190 if (resetCon == true) { 191 closeConsumers(); 192 closeProducers(); 193 createConsumers(); 194 createProducers(); 195 resetCon = false; 196 } 197 198 for (int k = 0; k < producers.length; k++) { 199 producers[k].sendMessage(payload, "counter", counter); 200 counter++; 201 if (resetConnection(counter)) { 202 resetCon = true; 203 break; 204 } 205 } 206 } 207 } 208 209 210 protected void close() throws Exception { 211 connection.close(); 212 broker.stop(); 213 214 memoryMonitoringTool.stopMonitor(); 215 } 216 217 protected void createPayload(Session session) throws JMSException { 218 219 byte[] array = new byte[messageSize]; 220 for (int i = 0; i < array.length; i++) { 221 array[i] = (byte) i; 222 } 223 224 BytesMessage bystePayload = session.createBytesMessage(); 225 bystePayload.writeBytes(array); 226 payload = (Message) bystePayload; 227 } 228 229 230 protected void createProducers() throws JMSException { 231 producers = new MemProducer[producerCount]; 232 for (int i = 0; i < producerCount; i++) { 233 producers[i] = new MemProducer(connectionFactory, destination); 234 if (durable) { 235 producers[i].setDeliveryMode(DeliveryMode.PERSISTENT); 236 } else { 237 producers[i].setDeliveryMode(DeliveryMode.NON_PERSISTENT); 238 } 239 producers[i].start(); 240 } 241 242 } 243 244 protected void createConsumers() throws JMSException { 245 consumers = new MemConsumer[consumerCount]; 246 for (int i = 0; i < consumerCount; i++) { 247 consumers[i] = new MemConsumer(connectionFactory, destination); 248 consumers[i].setParent(allMessagesList); 249 consumers[i].start(); 250 251 252 } 253 } 254 255 protected void closeProducers() throws JMSException { 256 for (int i = 0; i < producerCount; i++) { 257 producers[i].shutDown(); 258 } 259 260 } 261 262 protected void closeConsumers() throws JMSException { 263 for (int i = 0; i < consumerCount; i++) { 264 consumers[i].shutDown(); 265 } 266 } 267 268 protected ConnectionFactory createConnectionFactory() throws JMSException { 269 270 if (url == null || url.trim().equals("") || url.trim().equals("null")) { 271 return new ActiveMQConnectionFactory("vm://localhost"); 272 } else { 273 return new ActiveMQConnectionFactory(url); 274 } 275 } 276 277 protected BrokerService createBroker() throws Exception { 278 BrokerService broker = new BrokerService(); 279 configureBroker(broker); 280 broker.start(); 281 return broker; 282 } 283 284 protected void configureBroker(BrokerService broker) throws Exception { 285 broker.addConnector("vm://localhost"); 286 broker.setDeleteAllMessagesOnStartup(true); 287 } 288 289 protected Properties getSysTestSettings() { 290 Properties settings = new Properties (); 291 settings.setProperty("domain", topic == true ? "topic" : "queue"); 292 settings.setProperty("durable", durable == true ? "durable" : "non-durable"); 293 settings.setProperty("connection_checkpoint_size_kb", new Integer (connectionCheckpointSize).toString()); 294 settings.setProperty("producer_count", new Integer (producerCount).toString()); 295 settings.setProperty("consumer_count", new Integer (consumerCount).toString()); 296 settings.setProperty("message_count", new Long (messageCount).toString()); 297 settings.setProperty("message_size", new Integer (messageSize).toString()); 298 settings.setProperty("prefetchSize", new Integer (prefetchSize).toString()); 299 settings.setProperty("checkpoint_interval", new Integer (checkpointInterval).toString()); 300 settings.setProperty("destination_name", destinationName); 301 settings.setProperty("report_name", reportName); 302 settings.setProperty("report_directory", reportDirectory); 303 settings.setProperty("connection_checkpoint_size", new Integer (connectionCheckpointSize).toString()); 304 return settings; 305 } 306 307 308 } 309 | Popular Tags |