1 18 package org.apache.activemq; 19 20 import java.io.IOException ; 21 import java.net.URI ; 22 import java.net.URISyntaxException ; 23 24 import javax.jms.BytesMessage ; 25 import javax.jms.Connection ; 26 import javax.jms.ConnectionFactory ; 27 import javax.jms.DeliveryMode ; 28 import javax.jms.JMSException ; 29 import javax.jms.Message ; 30 import javax.jms.MessageConsumer ; 31 import javax.jms.MessageListener ; 32 import javax.jms.MessageProducer ; 33 import javax.jms.Session ; 34 35 import junit.framework.Test; 36 37 import org.apache.activemq.ActiveMQConnectionFactory; 38 import org.apache.activemq.broker.BrokerFactory; 39 import org.apache.activemq.broker.BrokerService; 40 import org.apache.activemq.broker.TransportConnector; 41 import org.apache.activemq.command.ActiveMQDestination; 42 import org.apache.activemq.command.ActiveMQQueue; 43 44 import java.util.concurrent.Callable ; 45 import java.util.concurrent.CountDownLatch ; 46 import java.util.concurrent.Semaphore ; 47 import java.util.concurrent.TimeUnit ; 48 import java.util.concurrent.atomic.AtomicInteger ; 49 50 59 public class JmsBenchmark extends JmsTestSupport { 60 61 private static final long SAMPLE_DELAY = Integer.parseInt(System.getProperty("SAMPLE_DELAY", "" + 1000 * 5)); 62 private static final long SAMPLES = Integer.parseInt(System.getProperty("SAMPLES", "10")); 63 private static final long SAMPLE_DURATION = Integer.parseInt(System.getProperty("SAMPLES_DURATION", "" + 1000*60)); 64 private static final int PRODUCER_COUNT = Integer.parseInt(System.getProperty("PRODUCER_COUNT", "10")); 65 private static final int CONSUMER_COUNT = Integer.parseInt(System.getProperty("CONSUMER_COUNT", "10")); 66 67 public ActiveMQDestination destination; 68 69 public static Test suite() { 70 return suite(JmsBenchmark.class); 71 } 72 73 public static void main(String [] args) { 74 junit.textui.TestRunner.run(JmsBenchmark.class); 75 } 76 77 public void initCombos() { 78 addCombinationValues("destination", new Object [] { 79 new ActiveMQQueue("TEST"), 81 }); 82 } 83 84 protected BrokerService createBroker() throws Exception { 85 return BrokerFactory.createBroker(new URI ("broker://(tcp://localhost:0)?persistent=false")); 86 } 87 88 protected ConnectionFactory createConnectionFactory() throws URISyntaxException , IOException { 89 return new ActiveMQConnectionFactory(((TransportConnector) broker.getTransportConnectors().get(0)).getServer().getConnectURI()); 90 } 91 92 95 public void testConcurrentSendReceive() throws Throwable { 96 97 final Semaphore connectionsEstablished = new Semaphore (1 - (CONSUMER_COUNT + PRODUCER_COUNT)); 98 final Semaphore workerDone = new Semaphore (1 - (CONSUMER_COUNT + PRODUCER_COUNT)); 99 final CountDownLatch sampleTimeDone = new CountDownLatch (1); 100 101 final AtomicInteger producedMessages = new AtomicInteger (0); 102 final AtomicInteger receivedMessages = new AtomicInteger (0); 103 104 final Callable producer = new Callable () { 105 public Object call() throws JMSException , InterruptedException { 106 Connection connection = factory.createConnection(); 107 connections.add(connection); 108 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 109 MessageProducer producer = session.createProducer(destination); 110 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); 111 BytesMessage message = session.createBytesMessage(); 112 message.writeBytes(new byte[1024]); 113 connection.start(); 114 connectionsEstablished.release(); 115 116 while (!sampleTimeDone.await(0, TimeUnit.MILLISECONDS)) { 117 producer.send(message); 118 producedMessages.incrementAndGet(); 119 } 120 121 connection.close(); 122 workerDone.release(); 123 return null; 124 } 125 }; 126 127 final Callable consumer = new Callable () { 128 public Object call() throws JMSException , InterruptedException { 129 Connection connection = factory.createConnection(); 130 connections.add(connection); 131 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 132 MessageConsumer consumer = session.createConsumer(destination); 133 134 consumer.setMessageListener(new MessageListener () { 135 public void onMessage(Message msg) { 136 receivedMessages.incrementAndGet(); 137 } 138 }); 139 connection.start(); 140 141 connectionsEstablished.release(); 142 sampleTimeDone.await(); 143 144 connection.close(); 145 workerDone.release(); 146 return null; 147 } 148 }; 149 150 final Throwable workerError[] = new Throwable [1]; 151 for (int i = 0; i < PRODUCER_COUNT; i++) { 152 new Thread ("Producer:" + i) { 153 public void run() { 154 try { 155 producer.call(); 156 } catch (Throwable e) { 157 e.printStackTrace(); 158 workerError[0] = e; 159 } 160 } 161 }.start(); 162 } 163 164 for (int i = 0; i < CONSUMER_COUNT; i++) { 165 new Thread ("Consumer:" + i) { 166 public void run() { 167 try { 168 consumer.call(); 169 } catch (Throwable e) { 170 e.printStackTrace(); 171 workerError[0] = e; 172 } 173 } 174 }.start(); 175 } 176 177 System.out.println(getName() + ": Waiting for Producers and Consumers to startup."); 178 connectionsEstablished.acquire(); 179 System.out.println("Producers and Consumers are now running. Waiting for system to reach steady state: " 180 + (SAMPLE_DELAY / 1000.0f) + " seconds"); 181 Thread.sleep(1000 * 10); 182 183 System.out.println("Starting sample: "+SAMPLES+" each lasting "+ (SAMPLE_DURATION / 1000.0f) + " seconds"); 184 185 186 long now = System.currentTimeMillis(); 187 for( int i=0; i < SAMPLES; i ++) { 188 189 long start = System.currentTimeMillis(); 190 producedMessages.set(0); 191 receivedMessages.set(0); 192 193 Thread.sleep(SAMPLE_DURATION); 194 195 long end = System.currentTimeMillis(); 196 int r = receivedMessages.get(); 197 int p = producedMessages.get(); 198 199 System.out.println("published: " + p + " msgs at "+ (p * 1000f / (end - start)) + " msgs/sec, "+ 200 "consumed: " + r + " msgs at "+ (r * 1000f / (end - start)) + " msgs/sec"); 201 } 202 203 System.out.println("Sample done."); 204 sampleTimeDone.countDown(); 205 206 workerDone.acquire(); 207 if (workerError[0] != null) { 208 throw workerError[0]; 209 } 210 211 } 212 213 } 214 | Popular Tags |