1 14 15 package org.apache.activemq.perf; 16 17 import javax.jms.Connection ; 18 import javax.jms.ConnectionFactory ; 19 import javax.jms.Destination ; 20 import javax.jms.JMSException ; 21 import javax.jms.Session ; 22 import junit.framework.TestCase; 23 import org.apache.activemq.ActiveMQConnectionFactory; 24 import org.apache.activemq.broker.BrokerService; 25 import org.apache.commons.logging.Log; 26 import org.apache.commons.logging.LogFactory; 27 28 31 public class SimpleTopicTest extends TestCase{ 32 33 private final Log log=LogFactory.getLog(getClass()); 34 protected BrokerService broker; 35 protected String bindAddress="tcp://localhost:61616"; 39 protected PerfProducer[] producers; 42 protected PerfConsumer[] consumers; 43 protected String DESTINATION_NAME=getClass().getName(); 44 protected int SAMPLE_COUNT=10; 45 protected long SAMPLE_INTERVAL=1000; 46 protected int NUMBER_OF_CONSUMERS=1; 47 protected int NUMBER_OF_PRODUCERS=1; 48 protected int PAYLOAD_SIZE=1024; 49 protected byte[] array=null; 50 protected ConnectionFactory factory; 51 protected Destination destination; 52 protected long CONSUMER_SLEEP_DURATION=0; 53 54 59 protected void setUp() throws Exception { 60 if(broker==null){ 61 broker=createBroker(); 62 } 63 factory=createConnectionFactory(); 64 Connection con=factory.createConnection(); 65 Session session=con.createSession(false,Session.AUTO_ACKNOWLEDGE); 66 destination=createDestination(session,DESTINATION_NAME); 67 log.info("Testing against destination: "+destination); 68 log.info("Running "+NUMBER_OF_PRODUCERS+" producer(s) and "+NUMBER_OF_CONSUMERS+" consumer(s)"); 69 con.close(); 70 producers=new PerfProducer[NUMBER_OF_PRODUCERS]; 71 consumers=new PerfConsumer[NUMBER_OF_CONSUMERS]; 72 for(int i=0;i<NUMBER_OF_CONSUMERS;i++){ 73 consumers[i]=createConsumer(factory,destination,i); 74 consumers[i].setSleepDuration(CONSUMER_SLEEP_DURATION); 75 } 76 for(int i=0;i<NUMBER_OF_PRODUCERS;i++){ 77 array=new byte[PAYLOAD_SIZE]; 78 for(int j=i;j<array.length;j++){ 79 array[j]=(byte)j; 80 } 81 producers[i]=createProducer(factory,destination,i,array); 82 } 83 super.setUp(); 84 } 85 86 protected void tearDown() throws Exception { 87 super.tearDown(); 88 for(int i=0;i<NUMBER_OF_CONSUMERS;i++){ 89 consumers[i].shutDown(); 90 } 91 for(int i=0;i<NUMBER_OF_PRODUCERS;i++){ 92 producers[i].shutDown(); 93 } 94 if(broker!=null){ 95 broker.stop(); 96 broker=null; 97 } 98 } 99 100 protected Destination createDestination(Session s,String destinationName) throws JMSException { 101 return s.createTopic(destinationName); 102 } 103 104 109 protected BrokerService createBroker() throws Exception { 110 BrokerService answer=new BrokerService(); 111 configureBroker(answer); 112 answer.start(); 113 return answer; 114 } 115 116 protected PerfProducer createProducer(ConnectionFactory fac,Destination dest,int number,byte[] payload) 117 throws JMSException { 118 return new PerfProducer(fac,dest,payload); 119 } 120 121 protected PerfConsumer createConsumer(ConnectionFactory fac,Destination dest,int number) throws JMSException { 122 return new PerfConsumer(fac,dest); 123 } 124 125 protected void configureBroker(BrokerService answer) throws Exception { 126 answer.addConnector(bindAddress); 127 answer.setDeleteAllMessagesOnStartup(true); 128 } 129 130 protected ActiveMQConnectionFactory createConnectionFactory() throws Exception { 131 return new ActiveMQConnectionFactory(bindAddress); 132 } 133 134 public void testPerformance() throws JMSException ,InterruptedException { 135 for(int i=0;i<NUMBER_OF_CONSUMERS;i++){ 136 consumers[i].start(); 137 } 138 for(int i=0;i<NUMBER_OF_PRODUCERS;i++){ 139 producers[i].start(); 140 } 141 log.info("Sampling performance "+SAMPLE_COUNT+" times at a "+SAMPLE_INTERVAL+" ms interval."); 142 for(int i=0;i<SAMPLE_COUNT;i++){ 143 Thread.sleep(SAMPLE_INTERVAL); 144 dumpProducerRate(); 145 dumpConsumerRate(); 146 } 147 for(int i=0;i<NUMBER_OF_PRODUCERS;i++){ 148 producers[i].stop(); 149 } 150 for(int i=0;i<NUMBER_OF_CONSUMERS;i++){ 151 consumers[i].stop(); 152 } 153 } 154 155 protected void dumpProducerRate(){ 156 int totalRate=0; 157 int totalCount=0; 158 for(int i=0;i<producers.length;i++){ 159 PerfRate rate=producers[i].getRate().cloneAndReset(); 160 totalRate+=rate.getRate(); 161 totalCount+=rate.getTotalCount(); 162 } 163 int avgRate=totalRate/producers.length; 164 log.info("Avg producer rate = "+avgRate+" msg/sec | Total rate = "+totalRate+", sent = "+totalCount); 165 } 166 167 protected void dumpConsumerRate(){ 168 int totalRate=0; 169 int totalCount=0; 170 for(int i=0;i<consumers.length;i++){ 171 PerfRate rate=consumers[i].getRate().cloneAndReset(); 172 totalRate+=rate.getRate(); 173 totalCount+=rate.getTotalCount(); 174 } 175 if(consumers!=null&&consumers.length>0){ 176 int avgRate=totalRate/consumers.length; 177 log.info("Avg consumer rate = "+avgRate+" msg/sec | Total rate = "+totalRate+", received = "+totalCount); 178 } 179 } 180 } 181 | Popular Tags |