KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > broker > BrokerBenchmark


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 package org.apache.activemq.broker;
19
20
21 import junit.framework.Test;
22
23 import org.apache.activemq.command.ActiveMQDestination;
24 import org.apache.activemq.command.ActiveMQQueue;
25 import org.apache.activemq.command.ActiveMQTopic;
26 import org.apache.activemq.command.ConnectionInfo;
27 import org.apache.activemq.command.ConsumerInfo;
28 import org.apache.activemq.command.Message;
29 import org.apache.activemq.command.MessageAck;
30 import org.apache.activemq.command.ProducerInfo;
31 import org.apache.activemq.command.SessionInfo;
32
33 import java.util.concurrent.Semaphore JavaDoc;
34 import java.util.concurrent.atomic.AtomicInteger JavaDoc;
35
36 /**
37  * BrokerBenchmark is used to get an idea of the raw performance of a broker. Since
38  * the broker data structures using in message dispatching are under high contention from
39  * client requests, it's performance should be monitored closely since it typically is the
40  * biggest bottleneck in a high performance messaging fabric.
41  *
42  * The benchmarks are run under all the following combinations options:
43  *
44  * Queue vs. Topic, 1 vs. 10 producer threads, 1 vs. 10 consumer threads, and
45  * Persistent vs. Non-Persistent messages.
46  *
47  * Message Acking uses client ack style batch acking since that typically has the
48  * best ack performance.
49  *
50  * @version $Revision: 1.9 $
51  */

52 public class BrokerBenchmark extends BrokerTestSupport {
53     
54     public int PRODUCE_COUNT=Integer.parseInt(System.getProperty("PRODUCE_COUNT","10000"));
55     public ActiveMQDestination destination;
56     public int PRODUCER_COUNT;
57     public int CONSUMER_COUNT;
58     public boolean deliveryMode;
59
60     public void initCombosForTestPerformance() {
61         addCombinationValues("destination", new Object JavaDoc[]{
62                 new ActiveMQQueue("TEST"),
63                 new ActiveMQTopic("TEST")
64                 });
65         addCombinationValues("PRODUCER_COUNT", new Object JavaDoc[]{
66                 new Integer JavaDoc("1"),
67                 new Integer JavaDoc("10")});
68         addCombinationValues("CONSUMER_COUNT", new Object JavaDoc[]{
69                 new Integer JavaDoc("1"),
70                 new Integer JavaDoc("10")});
71         addCombinationValues("CONSUMER_COUNT", new Object JavaDoc[]{
72                 new Integer JavaDoc("1"),
73                 new Integer JavaDoc("10")});
74         addCombinationValues( "deliveryMode", new Object JavaDoc[]{
75 // Boolean.FALSE,
76
Boolean.TRUE
77                 } );
78     }
79     
80     public void testPerformance() throws Exception JavaDoc {
81         
82         System.out.println("Running Benchmark for destination="+destination+", producers="+PRODUCER_COUNT+", consumers="+CONSUMER_COUNT+", deliveryMode="+deliveryMode);
83         final int CONSUME_COUNT = destination.isTopic() ? CONSUMER_COUNT*PRODUCE_COUNT : PRODUCE_COUNT;
84
85         final Semaphore JavaDoc consumersStarted = new Semaphore JavaDoc(1-(CONSUMER_COUNT));
86         final Semaphore JavaDoc producersFinished = new Semaphore JavaDoc(1-(PRODUCER_COUNT));
87         final Semaphore JavaDoc consumersFinished = new Semaphore JavaDoc(1-(CONSUMER_COUNT));
88         final ProgressPrinter printer = new ProgressPrinter(PRODUCE_COUNT+CONSUME_COUNT, 10);
89         
90         // Start a producer and consumer
91

92         profilerPause("Benchmark ready. Start profiler ");
93         
94         long start = System.currentTimeMillis();
95         
96         
97         final AtomicInteger JavaDoc receiveCounter = new AtomicInteger JavaDoc(0);
98         for( int i=0; i < CONSUMER_COUNT; i++) {
99             new Thread JavaDoc() {
100                 public void run() {
101                     try {
102                         
103                         // Consume the messages
104
StubConnection connection = new StubConnection(broker);
105                         ConnectionInfo connectionInfo = createConnectionInfo();
106                         connection.send(connectionInfo);
107
108                         SessionInfo sessionInfo = createSessionInfo(connectionInfo);
109                         ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
110                         consumerInfo.setPrefetchSize(1000);
111                         connection.send(sessionInfo);
112                         connection.send(consumerInfo);
113                         
114                         consumersStarted.release();
115                         
116                         while( receiveCounter.get() < CONSUME_COUNT ) {
117                 
118                             int counter=0;
119                             // Get a least 1 message.
120
Message msg = receiveMessage(connection, 2000);
121                             if( msg!=null ) {
122                                 printer.increment();
123                                 receiveCounter.incrementAndGet();
124                                 
125                                 counter++;
126                                 
127                                 // Try to piggy back a few extra message acks if they are ready.
128
Message extra=null;
129                                 while( (extra = receiveMessage(connection,0))!=null ) {
130                                     msg=extra;
131                                     printer.increment();
132                                     receiveCounter.incrementAndGet();
133                                     counter++;
134                                 }
135                             }
136                             
137                             
138                             if(msg!=null) {
139                                 connection.send(createAck(consumerInfo, msg, counter, MessageAck.STANDARD_ACK_TYPE));
140                             } else if ( receiveCounter.get() < CONSUME_COUNT ) {
141                                 System.out.println("Consumer stall, waiting for message #"+receiveCounter.get()+1);
142                             }
143                         }
144                         
145                         connection.send(closeConsumerInfo(consumerInfo));
146                     } catch (Throwable JavaDoc e) {
147                         e.printStackTrace();
148                     } finally {
149                         consumersFinished.release();
150                     }
151                 }
152
153             }.start();
154         }
155         
156         // Make sure that the consumers are started first to avoid sending messages
157
// before a topic is subscribed so that those messages are not missed.
158
consumersStarted.acquire();
159         
160         // Send the messages in an async thread.
161
for( int i=0; i < PRODUCER_COUNT; i++) {
162             new Thread JavaDoc() {
163                 public void run() {
164                     try {
165                         StubConnection connection = new StubConnection(broker);
166                         ConnectionInfo connectionInfo = createConnectionInfo();
167                         connection.send(connectionInfo);
168                         
169                         SessionInfo sessionInfo = createSessionInfo(connectionInfo);
170                         ProducerInfo producerInfo = createProducerInfo(sessionInfo);
171                         connection.send(sessionInfo);
172                         connection.send(producerInfo);
173     
174                         for(int i=0; i < PRODUCE_COUNT/PRODUCER_COUNT; i++) {
175                             Message message = createMessage(producerInfo, destination);
176                             message.setPersistent(deliveryMode);
177                             message.setResponseRequired(false);
178                             connection.send(message);
179                             printer.increment();
180                         }
181                     } catch (Throwable JavaDoc e) {
182                         e.printStackTrace();
183                     } finally {
184                         producersFinished.release();
185                     }
186                 };
187             }.start();
188         }
189         
190         producersFinished.acquire();
191         long end1 = System.currentTimeMillis();
192         consumersFinished.acquire();
193         long end2 = System.currentTimeMillis();
194         
195         System.out.println("Results for destination="+destination+", producers="+PRODUCER_COUNT+", consumers="+CONSUMER_COUNT+", deliveryMode="+deliveryMode);
196         System.out.println("Produced at messages/sec: "+ (PRODUCE_COUNT*1000.0/(end1-start)));
197         System.out.println("Consumed at messages/sec: "+ (CONSUME_COUNT*1000.0/(end2-start)));
198         profilerPause("Benchmark done. Stop profiler ");
199     }
200
201     public static Test suite() {
202         return suite(BrokerBenchmark.class);
203     }
204     
205     public static void main(String JavaDoc[] args) {
206         junit.textui.TestRunner.run(suite());
207     }
208
209 }
210
Popular Tags