1 18 package org.apache.activemq.broker.store; 19 20 import java.io.IOException ; 21 import java.net.URI ; 22 import java.net.URISyntaxException ; 23 24 import javax.jms.BytesMessage ; 25 import javax.jms.ConnectionFactory ; 26 import javax.jms.DeliveryMode ; 27 import javax.jms.MessageConsumer ; 28 import javax.jms.MessageProducer ; 29 import javax.jms.Session ; 30 31 import junit.framework.Test; 32 33 import org.apache.activemq.ActiveMQConnectionFactory; 34 import org.apache.activemq.JmsTestSupport; 35 import org.apache.activemq.broker.BrokerFactory; 36 import org.apache.activemq.broker.BrokerService; 37 import org.apache.activemq.broker.ProgressPrinter; 38 import org.apache.activemq.broker.TransportConnector; 39 import org.apache.activemq.command.ActiveMQDestination; 40 import org.apache.activemq.command.ActiveMQQueue; 41 42 46 public class LoadTester extends JmsTestSupport { 47 48 protected int MESSAGE_SIZE=1024*64; 49 protected int PRODUCE_COUNT=10000; 50 51 protected BrokerService createBroker() throws Exception { 52 return BrokerFactory.createBroker(new URI ("xbean:org/apache/activemq/broker/store/loadtester.xml")); 53 } 54 55 protected ConnectionFactory createConnectionFactory() throws URISyntaxException , IOException { 56 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(((TransportConnector)broker.getTransportConnectors().get(0)).getServer().getConnectURI()); 57 factory.setUseAsyncSend(true); 58 return factory; 59 } 60 61 public void testQueueSendThenAddConsumer() throws Exception { 62 ProgressPrinter printer = new ProgressPrinter(PRODUCE_COUNT, 20); 63 64 ActiveMQDestination destination = new ActiveMQQueue("TEST"); 65 66 connection.setUseCompression(false); 67 connection.getPrefetchPolicy().setAll(10); 68 connection.start(); 69 Session session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE); 70 MessageProducer producer = session.createProducer(destination); 71 producer.setDeliveryMode(DeliveryMode.PERSISTENT); 72 73 log.info("Sending "+ PRODUCE_COUNT+" messages that are "+(MESSAGE_SIZE/1024.0)+"k large, for a total of "+(PRODUCE_COUNT*MESSAGE_SIZE/(1024.0*1024.0))+" megs of data."); 74 long start = System.currentTimeMillis(); 76 for( int i=0; i < PRODUCE_COUNT; i++) { 77 printer.increment(); 78 BytesMessage msg = session.createBytesMessage(); 79 msg.writeBytes(new byte[MESSAGE_SIZE]); 80 producer.send(msg); 81 } 82 long end1 = System.currentTimeMillis(); 83 84 log.info("Produced messages/sec: "+ (PRODUCE_COUNT*1000.0/(end1-start))); 85 86 printer = new ProgressPrinter(PRODUCE_COUNT, 10); 87 start = System.currentTimeMillis(); 88 MessageConsumer consumer = session.createConsumer(destination); 89 for( int i=0; i < PRODUCE_COUNT; i++) { 90 printer.increment(); 91 assertNotNull("Getting message: "+i,consumer.receive(20000)); 92 } 93 end1 = System.currentTimeMillis(); 94 log.info("Consumed messages/sec: "+ (PRODUCE_COUNT*1000.0/(end1-start))); 95 96 97 } 98 99 public static Test suite() { 100 return suite(LoadTester.class); 101 } 102 103 public static void main(String [] args) { 104 junit.textui.TestRunner.run(suite()); 105 } 106 107 } 108 | Popular Tags |