1 18 package org.apache.activemq; 19 20 import java.io.IOException ; 21 import java.net.URI ; 22 import java.net.URISyntaxException ; 23 24 import javax.jms.BytesMessage ; 25 import javax.jms.Connection ; 26 import javax.jms.ConnectionFactory ; 27 import javax.jms.DeliveryMode ; 28 import javax.jms.JMSException ; 29 import javax.jms.Message ; 30 import javax.jms.MessageConsumer ; 31 import javax.jms.MessageProducer ; 32 import javax.jms.Session ; 33 import javax.jms.Topic ; 34 35 import junit.framework.Test; 36 37 import org.apache.activemq.ActiveMQConnectionFactory; 38 import org.apache.activemq.broker.BrokerFactory; 39 import org.apache.activemq.broker.BrokerService; 40 import org.apache.activemq.broker.TransportConnector; 41 import org.apache.activemq.command.ActiveMQDestination; 42 43 import java.util.concurrent.CountDownLatch ; 44 import java.util.concurrent.TimeUnit ; 45 46 53 public class LoadTestBurnIn extends JmsTestSupport { 54 55 public static Test suite() { 56 return suite(LoadTestBurnIn.class); 57 } 58 59 protected void setUp() throws Exception { 60 System.out.println("Start: "+getName()); 61 super.setUp(); 62 } 63 64 protected void tearDown() throws Exception { 65 try { 66 super.tearDown(); 67 } catch (Throwable e) { 68 e.printStackTrace(System.out); 69 } finally { 70 System.out.println("End: "+getName()); 71 } 72 } 73 74 public static void main(String [] args) { 75 junit.textui.TestRunner.run(suite()); 76 } 77 78 protected BrokerService createBroker() throws Exception { 79 return BrokerFactory.createBroker(new URI ("broker://(tcp://localhost:0)?useJmx=true")); 80 } 82 83 protected ConnectionFactory createConnectionFactory() throws URISyntaxException , IOException { 84 return new ActiveMQConnectionFactory(((TransportConnector) broker.getTransportConnectors().get(0)).getServer().getConnectURI()); 85 } 86 87 public ActiveMQDestination destination; 88 public int deliveryMode; 89 public byte destinationType; 90 public boolean durableConsumer; 91 92 public int messageCount = 50000; 93 public int messageSize = 1024; 94 95 public void initCombosForTestSendReceive() { 96 addCombinationValues("deliveryMode", new Object [] { 97 new Integer (DeliveryMode.NON_PERSISTENT), 98 new Integer (DeliveryMode.PERSISTENT) }); 99 addCombinationValues("destinationType", new Object [] { 100 new Byte (ActiveMQDestination.TOPIC_TYPE), 101 }); 103 addCombinationValues("durableConsumer", new Object [] { 104 Boolean.TRUE, 105 }); 107 addCombinationValues("messageSize", new Object [] { 108 new Integer (101), 109 new Integer (102), 110 new Integer (103), 111 new Integer (104), 112 new Integer (105), 113 new Integer (106), 114 new Integer (107), 115 new Integer (108), 116 }); 117 } 118 119 public void testSendReceive() throws Exception { 120 121 if( durableConsumer && destinationType!=ActiveMQDestination.TOPIC_TYPE) 123 return; 124 125 connection.setClientID(getName()); 126 connection.getPrefetchPolicy().setAll(1000); 127 connection.start(); 128 129 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 130 destination = createDestination(session, destinationType); 131 MessageConsumer consumer; 132 if( durableConsumer ) { 133 consumer = session.createDurableSubscriber((Topic ) destination, "sub1:"+System.currentTimeMillis()); 134 } else { 135 consumer = session.createConsumer(destination); 136 } 137 profilerPause("Ready: "); 138 139 final CountDownLatch producerDoneLatch = new CountDownLatch (1); 140 141 new Thread () { 143 public void run() { 144 Connection connection2=null; 145 try { 146 connection2 = factory.createConnection(); 147 Session session = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE); 148 MessageProducer producer = session.createProducer(destination); 149 producer.setDeliveryMode(deliveryMode); 150 for (int i = 0; i < messageCount; i++) { 151 BytesMessage m = session.createBytesMessage(); 152 m.writeBytes(new byte[messageSize]); 153 producer.send(m); 154 } 155 producer.close(); 156 } catch (JMSException e) { 157 e.printStackTrace(); 158 } finally { 159 safeClose(connection2); 160 producerDoneLatch.countDown(); 161 } 162 163 } 164 }.start(); 165 166 Message message = null; 168 for (int i = 0; i < messageCount; i++) { 169 message = consumer.receive(5000); 170 assertNotNull("Did not get message: "+i, message); 171 } 172 173 profilerPause("Done: "); 174 175 assertNull(consumer.receiveNoWait()); 176 message.acknowledge(); 177 178 assertTrue(producerDoneLatch.await(5, TimeUnit.SECONDS)); 180 } 181 182 } 183 | Popular Tags |