1 18 package org.apache.activemq.broker; 19 20 21 import junit.framework.Test; 22 23 import org.apache.activemq.command.ActiveMQDestination; 24 import org.apache.activemq.command.ActiveMQQueue; 25 import org.apache.activemq.command.ActiveMQTopic; 26 import org.apache.activemq.command.ConnectionInfo; 27 import org.apache.activemq.command.ConsumerInfo; 28 import org.apache.activemq.command.Message; 29 import org.apache.activemq.command.MessageAck; 30 import org.apache.activemq.command.ProducerInfo; 31 import org.apache.activemq.command.SessionInfo; 32 33 import java.util.concurrent.Semaphore ; 34 import java.util.concurrent.atomic.AtomicInteger ; 35 36 52 public class BrokerBenchmark extends BrokerTestSupport { 53 54 public int PRODUCE_COUNT=Integer.parseInt(System.getProperty("PRODUCE_COUNT","10000")); 55 public ActiveMQDestination destination; 56 public int PRODUCER_COUNT; 57 public int CONSUMER_COUNT; 58 public boolean deliveryMode; 59 60 public void initCombosForTestPerformance() { 61 addCombinationValues("destination", new Object []{ 62 new ActiveMQQueue("TEST"), 63 new ActiveMQTopic("TEST") 64 }); 65 addCombinationValues("PRODUCER_COUNT", new Object []{ 66 new Integer ("1"), 67 new Integer ("10")}); 68 addCombinationValues("CONSUMER_COUNT", new Object []{ 69 new Integer ("1"), 70 new Integer ("10")}); 71 addCombinationValues("CONSUMER_COUNT", new Object []{ 72 new Integer ("1"), 73 new Integer ("10")}); 74 addCombinationValues( "deliveryMode", new Object []{ 75 Boolean.TRUE 77 } ); 78 } 79 80 public void testPerformance() throws Exception { 81 82 System.out.println("Running Benchmark for destination="+destination+", producers="+PRODUCER_COUNT+", consumers="+CONSUMER_COUNT+", deliveryMode="+deliveryMode); 83 final int CONSUME_COUNT = destination.isTopic() ? CONSUMER_COUNT*PRODUCE_COUNT : PRODUCE_COUNT; 84 85 final Semaphore consumersStarted = new Semaphore (1-(CONSUMER_COUNT)); 86 final Semaphore producersFinished = new Semaphore (1-(PRODUCER_COUNT)); 87 final Semaphore consumersFinished = new Semaphore (1-(CONSUMER_COUNT)); 88 final ProgressPrinter printer = new ProgressPrinter(PRODUCE_COUNT+CONSUME_COUNT, 10); 89 90 92 profilerPause("Benchmark ready. Start profiler "); 93 94 long start = System.currentTimeMillis(); 95 96 97 final AtomicInteger receiveCounter = new AtomicInteger (0); 98 for( int i=0; i < CONSUMER_COUNT; i++) { 99 new Thread () { 100 public void run() { 101 try { 102 103 StubConnection connection = new StubConnection(broker); 105 ConnectionInfo connectionInfo = createConnectionInfo(); 106 connection.send(connectionInfo); 107 108 SessionInfo sessionInfo = createSessionInfo(connectionInfo); 109 ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination); 110 consumerInfo.setPrefetchSize(1000); 111 connection.send(sessionInfo); 112 connection.send(consumerInfo); 113 114 consumersStarted.release(); 115 116 while( receiveCounter.get() < CONSUME_COUNT ) { 117 118 int counter=0; 119 Message msg = receiveMessage(connection, 2000); 121 if( msg!=null ) { 122 printer.increment(); 123 receiveCounter.incrementAndGet(); 124 125 counter++; 126 127 Message extra=null; 129 while( (extra = receiveMessage(connection,0))!=null ) { 130 msg=extra; 131 printer.increment(); 132 receiveCounter.incrementAndGet(); 133 counter++; 134 } 135 } 136 137 138 if(msg!=null) { 139 connection.send(createAck(consumerInfo, msg, counter, MessageAck.STANDARD_ACK_TYPE)); 140 } else if ( receiveCounter.get() < CONSUME_COUNT ) { 141 System.out.println("Consumer stall, waiting for message #"+receiveCounter.get()+1); 142 } 143 } 144 145 connection.send(closeConsumerInfo(consumerInfo)); 146 } catch (Throwable e) { 147 e.printStackTrace(); 148 } finally { 149 consumersFinished.release(); 150 } 151 } 152 153 }.start(); 154 } 155 156 consumersStarted.acquire(); 159 160 for( int i=0; i < PRODUCER_COUNT; i++) { 162 new Thread () { 163 public void run() { 164 try { 165 StubConnection connection = new StubConnection(broker); 166 ConnectionInfo connectionInfo = createConnectionInfo(); 167 connection.send(connectionInfo); 168 169 SessionInfo sessionInfo = createSessionInfo(connectionInfo); 170 ProducerInfo producerInfo = createProducerInfo(sessionInfo); 171 connection.send(sessionInfo); 172 connection.send(producerInfo); 173 174 for(int i=0; i < PRODUCE_COUNT/PRODUCER_COUNT; i++) { 175 Message message = createMessage(producerInfo, destination); 176 message.setPersistent(deliveryMode); 177 message.setResponseRequired(false); 178 connection.send(message); 179 printer.increment(); 180 } 181 } catch (Throwable e) { 182 e.printStackTrace(); 183 } finally { 184 producersFinished.release(); 185 } 186 }; 187 }.start(); 188 } 189 190 producersFinished.acquire(); 191 long end1 = System.currentTimeMillis(); 192 consumersFinished.acquire(); 193 long end2 = System.currentTimeMillis(); 194 195 System.out.println("Results for destination="+destination+", producers="+PRODUCER_COUNT+", consumers="+CONSUMER_COUNT+", deliveryMode="+deliveryMode); 196 System.out.println("Produced at messages/sec: "+ (PRODUCE_COUNT*1000.0/(end1-start))); 197 System.out.println("Consumed at messages/sec: "+ (CONSUME_COUNT*1000.0/(end2-start))); 198 profilerPause("Benchmark done. Stop profiler "); 199 } 200 201 public static Test suite() { 202 return suite(BrokerBenchmark.class); 203 } 204 205 public static void main(String [] args) { 206 junit.textui.TestRunner.run(suite()); 207 } 208 209 } 210 | Popular Tags |