KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > tool > JMSMemtest


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18
19 package org.apache.activemq.tool;
20
21 import org.apache.activemq.ActiveMQConnectionFactory;
22 import org.apache.activemq.broker.BrokerService;
23 import org.apache.activemq.tool.MemProducer;
24 import org.apache.activemq.tool.MemConsumer;
25 import org.apache.activemq.tool.MemoryMonitoringTool;
26 import org.apache.commons.logging.Log;
27 import org.apache.commons.logging.LogFactory;
28
29 import javax.jms.*;
30
31
32 import java.util.Properties JavaDoc;
33
34
35 public class JMSMemtest {
36
37     private static final Log log = LogFactory.getLog(JMSMemtest.class);
38     private static final int DEFAULT_MESSAGECOUNT = 5000;
39     protected BrokerService broker;
40     protected boolean topic = true;
41     protected boolean durable = false;
42
43     protected long messageCount = 0;
44
45     // how large the message in kb before we close/start the producer/consumer with a new connection. -1 means no connectionCheckpointSize
46
protected int connectionCheckpointSize;
47     protected long connectionInterval;
48
49
50     protected int consumerCount;
51     protected int producerCount;
52     protected int checkpointInterval;
53     protected int prefetchSize;
54     //set 10 kb of payload as default
55
protected int messageSize;
56
57     protected String JavaDoc reportDirectory;
58     protected String JavaDoc reportName;
59
60
61     protected String JavaDoc url = "";
62     protected MemProducer[] producers;
63     protected MemConsumer[] consumers;
64     protected String JavaDoc destinationName;
65     protected boolean allMessagesConsumed = true;
66     protected MemConsumer allMessagesList = new MemConsumer();
67
68     protected Message payload;
69
70     protected ActiveMQConnectionFactory connectionFactory;
71     protected Connection connection;
72     protected Destination destination;
73
74
75     protected boolean createConnectionPerClient = true;
76
77     protected boolean transacted = false;
78     protected boolean useEmbeddedBroker = true;
79     protected MemoryMonitoringTool memoryMonitoringTool;
80
81
82     public static void main(String JavaDoc[] args) {
83
84
85         Properties JavaDoc sysSettings = new Properties JavaDoc();
86
87         for (int i = 0; i < args.length; i++) {
88
89             int index = args[i].indexOf("=");
90             String JavaDoc key = args[i].substring(0, index);
91             String JavaDoc val = args[i].substring(index + 1);
92             sysSettings.setProperty(key, val);
93
94         }
95
96
97         JMSMemtest memtest = new JMSMemtest(sysSettings);
98         try {
99             memtest.start();
100         } catch (Exception JavaDoc e) {
101
102             e.printStackTrace();
103         }
104
105     }
106
107
108     public JMSMemtest(Properties JavaDoc settings) {
109         url = settings.getProperty("url");
110         topic = new Boolean JavaDoc(settings.getProperty("topic")).booleanValue();
111         durable = new Boolean JavaDoc(settings.getProperty("durable")).booleanValue();
112         connectionCheckpointSize = new Integer JavaDoc(settings.getProperty("connectionCheckpointSize")).intValue();
113         producerCount = new Integer JavaDoc(settings.getProperty("producerCount")).intValue();
114         consumerCount = new Integer JavaDoc(settings.getProperty("consumerCount")).intValue();
115         messageCount = new Integer JavaDoc(settings.getProperty("messageCount")).intValue();
116         messageSize = new Integer JavaDoc(settings.getProperty("messageSize")).intValue();
117         prefetchSize = new Integer JavaDoc(settings.getProperty("prefetchSize")).intValue();
118         checkpointInterval = new Integer JavaDoc(settings.getProperty("checkpointInterval")).intValue() * 1000;
119         producerCount = new Integer JavaDoc(settings.getProperty("producerCount")).intValue();
120         reportName = settings.getProperty("reportName");
121         destinationName = settings.getProperty("destinationName");
122         reportDirectory = settings.getProperty("reportDirectory");
123         connectionInterval = connectionCheckpointSize * 1024;
124     }
125
126     protected void start() throws Exception JavaDoc {
127         log.info("Starting Monitor");
128         memoryMonitoringTool = new MemoryMonitoringTool();
129         memoryMonitoringTool.setTestSettings(getSysTestSettings());
130         Thread JavaDoc monitorThread = memoryMonitoringTool.startMonitor();
131
132         if (messageCount == 0) {
133             messageCount = DEFAULT_MESSAGECOUNT;
134         }
135
136
137         if (useEmbeddedBroker) {
138             if (broker == null) {
139                 broker = createBroker();
140             }
141         }
142
143
144         connectionFactory = (ActiveMQConnectionFactory) createConnectionFactory();
145         if (prefetchSize > 0) {
146             connectionFactory.getPrefetchPolicy().setTopicPrefetch(prefetchSize);
147             connectionFactory.getPrefetchPolicy().setQueuePrefetch(prefetchSize);
148         }
149
150         connection = connectionFactory.createConnection();
151         Session session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
152
153         if (topic) {
154             destination = session.createTopic(destinationName);
155         } else {
156             destination = session.createQueue(destinationName);
157         }
158
159         createPayload(session);
160
161         publishAndConsume();
162
163         log.info("Closing resources");
164         this.close();
165
166         monitorThread.join();
167
168
169     }
170
171
172     protected boolean resetConnection(int counter) {
173         if (connectionInterval > 0) {
174             long totalMsgSizeConsumed = counter * 1024;
175             if (connectionInterval < totalMsgSizeConsumed) {
176                 return true;
177             }
178         }
179         return false;
180     }
181
182     protected void publishAndConsume() throws Exception JavaDoc {
183
184         createConsumers();
185         createProducers();
186         int counter = 0;
187         boolean resetCon = false;
188         log.info("Start sending messages ");
189         for (int i = 0; i < messageCount; i++) {
190             if (resetCon == true) {
191                 closeConsumers();
192                 closeProducers();
193                 createConsumers();
194                 createProducers();
195                 resetCon = false;
196             }
197
198             for (int k = 0; k < producers.length; k++) {
199                 producers[k].sendMessage(payload, "counter", counter);
200                 counter++;
201                 if (resetConnection(counter)) {
202                     resetCon = true;
203                     break;
204                 }
205             }
206         }
207     }
208
209
210     protected void close() throws Exception JavaDoc {
211         connection.close();
212         broker.stop();
213
214         memoryMonitoringTool.stopMonitor();
215     }
216
217     protected void createPayload(Session session) throws JMSException {
218
219         byte[] array = new byte[messageSize];
220         for (int i = 0; i < array.length; i++) {
221             array[i] = (byte) i;
222         }
223
224         BytesMessage bystePayload = session.createBytesMessage();
225         bystePayload.writeBytes(array);
226         payload = (Message) bystePayload;
227     }
228
229
230     protected void createProducers() throws JMSException {
231         producers = new MemProducer[producerCount];
232         for (int i = 0; i < producerCount; i++) {
233             producers[i] = new MemProducer(connectionFactory, destination);
234             if (durable) {
235                 producers[i].setDeliveryMode(DeliveryMode.PERSISTENT);
236             } else {
237                 producers[i].setDeliveryMode(DeliveryMode.NON_PERSISTENT);
238             }
239             producers[i].start();
240         }
241
242     }
243
244     protected void createConsumers() throws JMSException {
245         consumers = new MemConsumer[consumerCount];
246         for (int i = 0; i < consumerCount; i++) {
247             consumers[i] = new MemConsumer(connectionFactory, destination);
248             consumers[i].setParent(allMessagesList);
249             consumers[i].start();
250
251
252         }
253     }
254
255     protected void closeProducers() throws JMSException {
256         for (int i = 0; i < producerCount; i++) {
257             producers[i].shutDown();
258         }
259
260     }
261
262     protected void closeConsumers() throws JMSException {
263         for (int i = 0; i < consumerCount; i++) {
264             consumers[i].shutDown();
265         }
266     }
267
268     protected ConnectionFactory createConnectionFactory() throws JMSException {
269
270         if (url == null || url.trim().equals("") || url.trim().equals("null")) {
271             return new ActiveMQConnectionFactory("vm://localhost");
272         } else {
273             return new ActiveMQConnectionFactory(url);
274         }
275     }
276
277     protected BrokerService createBroker() throws Exception JavaDoc {
278         BrokerService broker = new BrokerService();
279         configureBroker(broker);
280         broker.start();
281         return broker;
282     }
283
284     protected void configureBroker(BrokerService broker) throws Exception JavaDoc {
285         broker.addConnector("vm://localhost");
286         broker.setDeleteAllMessagesOnStartup(true);
287     }
288
289     protected Properties JavaDoc getSysTestSettings() {
290         Properties JavaDoc settings = new Properties JavaDoc();
291         settings.setProperty("domain", topic == true ? "topic" : "queue");
292         settings.setProperty("durable", durable == true ? "durable" : "non-durable");
293         settings.setProperty("connection_checkpoint_size_kb", new Integer JavaDoc(connectionCheckpointSize).toString());
294         settings.setProperty("producer_count", new Integer JavaDoc(producerCount).toString());
295         settings.setProperty("consumer_count", new Integer JavaDoc(consumerCount).toString());
296         settings.setProperty("message_count", new Long JavaDoc(messageCount).toString());
297         settings.setProperty("message_size", new Integer JavaDoc(messageSize).toString());
298         settings.setProperty("prefetchSize", new Integer JavaDoc(prefetchSize).toString());
299         settings.setProperty("checkpoint_interval", new Integer JavaDoc(checkpointInterval).toString());
300         settings.setProperty("destination_name", destinationName);
301         settings.setProperty("report_name", reportName);
302         settings.setProperty("report_directory", reportDirectory);
303         settings.setProperty("connection_checkpoint_size", new Integer JavaDoc(connectionCheckpointSize).toString());
304         return settings;
305     }
306
307
308 }
309
Popular Tags