KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > JmsBenchmark


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 package org.apache.activemq;
19
20 import java.io.IOException JavaDoc;
21 import java.net.URI JavaDoc;
22 import java.net.URISyntaxException JavaDoc;
23
24 import javax.jms.BytesMessage JavaDoc;
25 import javax.jms.Connection JavaDoc;
26 import javax.jms.ConnectionFactory JavaDoc;
27 import javax.jms.DeliveryMode JavaDoc;
28 import javax.jms.JMSException JavaDoc;
29 import javax.jms.Message JavaDoc;
30 import javax.jms.MessageConsumer JavaDoc;
31 import javax.jms.MessageListener JavaDoc;
32 import javax.jms.MessageProducer JavaDoc;
33 import javax.jms.Session JavaDoc;
34
35 import junit.framework.Test;
36
37 import org.apache.activemq.ActiveMQConnectionFactory;
38 import org.apache.activemq.broker.BrokerFactory;
39 import org.apache.activemq.broker.BrokerService;
40 import org.apache.activemq.broker.TransportConnector;
41 import org.apache.activemq.command.ActiveMQDestination;
42 import org.apache.activemq.command.ActiveMQQueue;
43
44 import java.util.concurrent.Callable JavaDoc;
45 import java.util.concurrent.CountDownLatch JavaDoc;
46 import java.util.concurrent.Semaphore JavaDoc;
47 import java.util.concurrent.TimeUnit JavaDoc;
48 import java.util.concurrent.atomic.AtomicInteger JavaDoc;
49
50 /**
51  * Benchmarks the broker by starting many consumer and producers against the
52  * same destination.
53  *
54  * Make sure you run with jvm option -server (makes a big difference). The tests
55  * simulate storing 1000 1k jms messages to see the rate of processing msg/sec.
56  *
57  * @version $Revision$
58  */

59 public class JmsBenchmark extends JmsTestSupport {
60
61     private static final long SAMPLE_DELAY = Integer.parseInt(System.getProperty("SAMPLE_DELAY", "" + 1000 * 5));
62     private static final long SAMPLES = Integer.parseInt(System.getProperty("SAMPLES", "10"));
63     private static final long SAMPLE_DURATION = Integer.parseInt(System.getProperty("SAMPLES_DURATION", "" + 1000*60));
64     private static final int PRODUCER_COUNT = Integer.parseInt(System.getProperty("PRODUCER_COUNT", "10"));
65     private static final int CONSUMER_COUNT = Integer.parseInt(System.getProperty("CONSUMER_COUNT", "10"));
66
67     public ActiveMQDestination destination;
68
69     public static Test suite() {
70         return suite(JmsBenchmark.class);
71     }
72
73     public static void main(String JavaDoc[] args) {
74         junit.textui.TestRunner.run(JmsBenchmark.class);
75     }
76
77     public void initCombos() {
78         addCombinationValues("destination", new Object JavaDoc[] {
79 // new ActiveMQTopic("TEST"),
80
new ActiveMQQueue("TEST"),
81                 });
82     }
83
84     protected BrokerService createBroker() throws Exception JavaDoc {
85         return BrokerFactory.createBroker(new URI JavaDoc("broker://(tcp://localhost:0)?persistent=false"));
86     }
87
88     protected ConnectionFactory JavaDoc createConnectionFactory() throws URISyntaxException JavaDoc, IOException JavaDoc {
89         return new ActiveMQConnectionFactory(((TransportConnector) broker.getTransportConnectors().get(0)).getServer().getConnectURI());
90     }
91
92     /**
93      * @throws Throwable
94      */

95     public void testConcurrentSendReceive() throws Throwable JavaDoc {
96
97         final Semaphore JavaDoc connectionsEstablished = new Semaphore JavaDoc(1 - (CONSUMER_COUNT + PRODUCER_COUNT));
98         final Semaphore JavaDoc workerDone = new Semaphore JavaDoc(1 - (CONSUMER_COUNT + PRODUCER_COUNT));
99         final CountDownLatch JavaDoc sampleTimeDone = new CountDownLatch JavaDoc(1);
100
101         final AtomicInteger JavaDoc producedMessages = new AtomicInteger JavaDoc(0);
102         final AtomicInteger JavaDoc receivedMessages = new AtomicInteger JavaDoc(0);
103
104         final Callable JavaDoc producer = new Callable JavaDoc() {
105             public Object JavaDoc call() throws JMSException JavaDoc, InterruptedException JavaDoc {
106                 Connection JavaDoc connection = factory.createConnection();
107                 connections.add(connection);
108                 Session JavaDoc session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
109                 MessageProducer JavaDoc producer = session.createProducer(destination);
110                 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
111                 BytesMessage JavaDoc message = session.createBytesMessage();
112                 message.writeBytes(new byte[1024]);
113                 connection.start();
114                 connectionsEstablished.release();
115
116                 while (!sampleTimeDone.await(0, TimeUnit.MILLISECONDS)) {
117                     producer.send(message);
118                     producedMessages.incrementAndGet();
119                 }
120
121                 connection.close();
122                 workerDone.release();
123                 return null;
124             }
125         };
126
127         final Callable JavaDoc consumer = new Callable JavaDoc() {
128             public Object JavaDoc call() throws JMSException JavaDoc, InterruptedException JavaDoc {
129                 Connection JavaDoc connection = factory.createConnection();
130                 connections.add(connection);
131                 Session JavaDoc session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
132                 MessageConsumer JavaDoc consumer = session.createConsumer(destination);
133
134                 consumer.setMessageListener(new MessageListener JavaDoc() {
135                     public void onMessage(Message msg) {
136                         receivedMessages.incrementAndGet();
137                     }
138                 });
139                 connection.start();
140
141                 connectionsEstablished.release();
142                 sampleTimeDone.await();
143
144                 connection.close();
145                 workerDone.release();
146                 return null;
147             }
148         };
149
150         final Throwable JavaDoc workerError[] = new Throwable JavaDoc[1];
151         for (int i = 0; i < PRODUCER_COUNT; i++) {
152             new Thread JavaDoc("Producer:" + i) {
153                 public void run() {
154                     try {
155                         producer.call();
156                     } catch (Throwable JavaDoc e) {
157                         e.printStackTrace();
158                         workerError[0] = e;
159                     }
160                 }
161             }.start();
162         }
163
164         for (int i = 0; i < CONSUMER_COUNT; i++) {
165             new Thread JavaDoc("Consumer:" + i) {
166                 public void run() {
167                     try {
168                         consumer.call();
169                     } catch (Throwable JavaDoc e) {
170                         e.printStackTrace();
171                         workerError[0] = e;
172                     }
173                 }
174             }.start();
175         }
176
177         System.out.println(getName() + ": Waiting for Producers and Consumers to startup.");
178         connectionsEstablished.acquire();
179         System.out.println("Producers and Consumers are now running. Waiting for system to reach steady state: "
180                 + (SAMPLE_DELAY / 1000.0f) + " seconds");
181         Thread.sleep(1000 * 10);
182
183         System.out.println("Starting sample: "+SAMPLES+" each lasting "+ (SAMPLE_DURATION / 1000.0f) + " seconds");
184
185
186         long now = System.currentTimeMillis();
187         for( int i=0; i < SAMPLES; i ++) {
188             
189             long start = System.currentTimeMillis();
190             producedMessages.set(0);
191             receivedMessages.set(0);
192             
193             Thread.sleep(SAMPLE_DURATION);
194             
195             long end = System.currentTimeMillis();
196             int r = receivedMessages.get();
197             int p = producedMessages.get();
198             
199             System.out.println("published: " + p + " msgs at "+ (p * 1000f / (end - start)) + " msgs/sec, "+
200                     "consumed: " + r + " msgs at "+ (r * 1000f / (end - start)) + " msgs/sec");
201         }
202
203         System.out.println("Sample done.");
204         sampleTimeDone.countDown();
205
206         workerDone.acquire();
207         if (workerError[0] != null) {
208             throw workerError[0];
209         }
210
211     }
212
213 }
214
Popular Tags