1 18 package org.apache.activemq.perf; 19 20 import java.util.concurrent.CountDownLatch ; 21 22 import javax.jms.BytesMessage ; 23 import javax.jms.Connection ; 24 import javax.jms.ConnectionFactory ; 25 import javax.jms.Destination ; 26 import javax.jms.JMSException ; 27 import javax.jms.MessageProducer ; 28 import javax.jms.Session ; 29 32 public class PerfProducer implements Runnable { 33 protected Connection connection; 34 protected MessageProducer producer; 35 protected PerfRate rate=new PerfRate(); 36 private byte[] payload; 37 private Session session; 38 private final CountDownLatch stopped = new CountDownLatch (1); 39 private boolean running; 40 41 public PerfProducer(ConnectionFactory fac,Destination dest, byte[] palyload) throws JMSException { 42 connection=fac.createConnection(); 43 session=connection.createSession(false,Session.AUTO_ACKNOWLEDGE); 44 producer=session.createProducer(dest); 45 this.payload = palyload; 46 } 47 48 public void setDeliveryMode(int mode) throws JMSException { 49 producer.setDeliveryMode(mode); 50 } 51 52 public void shutDown() throws JMSException { 53 connection.close(); 54 } 55 56 public PerfRate getRate(){ 57 return rate; 58 } 59 60 synchronized public void start() throws JMSException { 61 if( !running ) { 62 rate.reset(); 63 running = true; 64 connection.start(); 65 new Thread (this).start(); 66 } 67 } 68 public void stop() throws JMSException , InterruptedException { 69 synchronized(this) { 70 running=false; 71 } 72 stopped.await(); 73 connection.stop(); 74 } 75 synchronized public boolean isRunning() { 76 return running; 77 } 78 79 public void run() { 80 try { 81 while(isRunning()){ 82 BytesMessage msg; 83 msg=session.createBytesMessage(); 84 msg.writeBytes(payload); 85 producer.send(msg); 86 rate.increment(); 87 } 88 } catch (Throwable e) { 89 e.printStackTrace(); 90 } finally { 91 stopped.countDown(); 92 } 93 } 94 95 } 96 | Popular Tags |