KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > perf > SimpleTopicTest


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
4  * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
5  * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
6  * License. You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
11  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
12  * specific language governing permissions and limitations under the License.
13  */

14
15 package org.apache.activemq.perf;
16
17 import javax.jms.Connection JavaDoc;
18 import javax.jms.ConnectionFactory JavaDoc;
19 import javax.jms.Destination JavaDoc;
20 import javax.jms.JMSException JavaDoc;
21 import javax.jms.Session JavaDoc;
22 import junit.framework.TestCase;
23 import org.apache.activemq.ActiveMQConnectionFactory;
24 import org.apache.activemq.broker.BrokerService;
25 import org.apache.commons.logging.Log;
26 import org.apache.commons.logging.LogFactory;
27
28 /**
29  * @version $Revision: 1.3 $
30  */

31 public class SimpleTopicTest extends TestCase{
32
33     private final Log log=LogFactory.getLog(getClass());
34     protected BrokerService broker;
35     // protected String
36
// bindAddress="tcp://localhost:61616?wireFormat.cacheEnabled=true&wireFormat.tightEncodingEnabled=true&jms.useAsyncSend=false";
37
//protected String bindAddress="tcp://localhost:61616";
38
protected String JavaDoc bindAddress="tcp://localhost:61616";
39     //protected String bindAddress="vm://localhost?marshal=true";
40
//protected String bindAddress="vm://localhost";
41
protected PerfProducer[] producers;
42     protected PerfConsumer[] consumers;
43     protected String JavaDoc DESTINATION_NAME=getClass().getName();
44     protected int SAMPLE_COUNT=10;
45     protected long SAMPLE_INTERVAL=1000;
46     protected int NUMBER_OF_CONSUMERS=1;
47     protected int NUMBER_OF_PRODUCERS=1;
48     protected int PAYLOAD_SIZE=1024;
49     protected byte[] array=null;
50     protected ConnectionFactory JavaDoc factory;
51     protected Destination JavaDoc destination;
52     protected long CONSUMER_SLEEP_DURATION=0;
53
54     /**
55      * Sets up a test where the producer and consumer have their own connection.
56      *
57      * @see junit.framework.TestCase#setUp()
58      */

59     protected void setUp() throws Exception JavaDoc{
60         if(broker==null){
61             broker=createBroker();
62         }
63         factory=createConnectionFactory();
64         Connection JavaDoc con=factory.createConnection();
65         Session JavaDoc session=con.createSession(false,Session.AUTO_ACKNOWLEDGE);
66         destination=createDestination(session,DESTINATION_NAME);
67         log.info("Testing against destination: "+destination);
68         log.info("Running "+NUMBER_OF_PRODUCERS+" producer(s) and "+NUMBER_OF_CONSUMERS+" consumer(s)");
69         con.close();
70         producers=new PerfProducer[NUMBER_OF_PRODUCERS];
71         consumers=new PerfConsumer[NUMBER_OF_CONSUMERS];
72         for(int i=0;i<NUMBER_OF_CONSUMERS;i++){
73             consumers[i]=createConsumer(factory,destination,i);
74             consumers[i].setSleepDuration(CONSUMER_SLEEP_DURATION);
75         }
76         for(int i=0;i<NUMBER_OF_PRODUCERS;i++){
77             array=new byte[PAYLOAD_SIZE];
78             for(int j=i;j<array.length;j++){
79                 array[j]=(byte)j;
80             }
81             producers[i]=createProducer(factory,destination,i,array);
82         }
83         super.setUp();
84     }
85
86     protected void tearDown() throws Exception JavaDoc{
87         super.tearDown();
88         for(int i=0;i<NUMBER_OF_CONSUMERS;i++){
89             consumers[i].shutDown();
90         }
91         for(int i=0;i<NUMBER_OF_PRODUCERS;i++){
92             producers[i].shutDown();
93         }
94         if(broker!=null){
95             broker.stop();
96             broker=null;
97         }
98     }
99
100     protected Destination JavaDoc createDestination(Session JavaDoc s,String JavaDoc destinationName) throws JMSException JavaDoc{
101         return s.createTopic(destinationName);
102     }
103
104     /**
105      * Factory method to create a new broker
106      *
107      * @throws Exception
108      */

109     protected BrokerService createBroker() throws Exception JavaDoc{
110         BrokerService answer=new BrokerService();
111         configureBroker(answer);
112         answer.start();
113         return answer;
114     }
115
116     protected PerfProducer createProducer(ConnectionFactory JavaDoc fac,Destination JavaDoc dest,int number,byte[] payload)
117             throws JMSException JavaDoc{
118         return new PerfProducer(fac,dest,payload);
119     }
120
121     protected PerfConsumer createConsumer(ConnectionFactory JavaDoc fac,Destination JavaDoc dest,int number) throws JMSException JavaDoc{
122         return new PerfConsumer(fac,dest);
123     }
124
125     protected void configureBroker(BrokerService answer) throws Exception JavaDoc{
126         answer.addConnector(bindAddress);
127         answer.setDeleteAllMessagesOnStartup(true);
128     }
129
130     protected ActiveMQConnectionFactory createConnectionFactory() throws Exception JavaDoc{
131         return new ActiveMQConnectionFactory(bindAddress);
132     }
133
134     public void testPerformance() throws JMSException JavaDoc,InterruptedException JavaDoc{
135         for(int i=0;i<NUMBER_OF_CONSUMERS;i++){
136             consumers[i].start();
137         }
138         for(int i=0;i<NUMBER_OF_PRODUCERS;i++){
139             producers[i].start();
140         }
141         log.info("Sampling performance "+SAMPLE_COUNT+" times at a "+SAMPLE_INTERVAL+" ms interval.");
142         for(int i=0;i<SAMPLE_COUNT;i++){
143             Thread.sleep(SAMPLE_INTERVAL);
144             dumpProducerRate();
145             dumpConsumerRate();
146         }
147         for(int i=0;i<NUMBER_OF_PRODUCERS;i++){
148             producers[i].stop();
149         }
150         for(int i=0;i<NUMBER_OF_CONSUMERS;i++){
151             consumers[i].stop();
152         }
153     }
154
155     protected void dumpProducerRate(){
156         int totalRate=0;
157         int totalCount=0;
158         for(int i=0;i<producers.length;i++){
159             PerfRate rate=producers[i].getRate().cloneAndReset();
160             totalRate+=rate.getRate();
161             totalCount+=rate.getTotalCount();
162         }
163         int avgRate=totalRate/producers.length;
164         log.info("Avg producer rate = "+avgRate+" msg/sec | Total rate = "+totalRate+", sent = "+totalCount);
165     }
166
167     protected void dumpConsumerRate(){
168         int totalRate=0;
169         int totalCount=0;
170         for(int i=0;i<consumers.length;i++){
171             PerfRate rate=consumers[i].getRate().cloneAndReset();
172             totalRate+=rate.getRate();
173             totalCount+=rate.getTotalCount();
174         }
175         if(consumers!=null&&consumers.length>0){
176             int avgRate=totalRate/consumers.length;
177             log.info("Avg consumer rate = "+avgRate+" msg/sec | Total rate = "+totalRate+", received = "+totalCount);
178         }
179     }
180 }
181
Popular Tags