1 18 package org.apache.activemq.perf; 19 20 import javax.jms.Connection ; 21 import javax.jms.ConnectionFactory ; 22 import javax.jms.Destination ; 23 import javax.jms.JMSException ; 24 import javax.jms.Message ; 25 import javax.jms.MessageConsumer ; 26 import javax.jms.MessageListener ; 27 import javax.jms.Session ; 28 import javax.jms.Topic ; 29 32 public class PerfConsumer implements MessageListener { 33 protected Connection connection; 34 protected MessageConsumer consumer; 35 protected long sleepDuration; 36 37 protected PerfRate rate=new PerfRate(); 38 public PerfConsumer(ConnectionFactory fac,Destination dest,String consumerName) throws JMSException { 39 connection=fac.createConnection(); 40 connection.setClientID(consumerName); 41 Session s=connection.createSession(false,Session.AUTO_ACKNOWLEDGE); 42 if(dest instanceof Topic &&consumerName!=null&&consumerName.length()>0){ 43 consumer=s.createDurableSubscriber((Topic ) dest,consumerName); 44 }else{ 45 consumer=s.createConsumer(dest); 46 } 47 consumer.setMessageListener(this); 48 } 49 public PerfConsumer(ConnectionFactory fac,Destination dest) throws JMSException { 50 this(fac,dest,null); 51 } 52 public void start() throws JMSException { 53 connection.start(); 54 rate.reset(); 55 } 56 public void stop() throws JMSException { 57 connection.stop(); 58 } 59 public void shutDown() throws JMSException { 60 connection.close(); 61 } 62 public PerfRate getRate(){ 63 return rate; 64 } 65 public void onMessage(Message msg){ 66 rate.increment(); 67 try { 68 if( sleepDuration!=0 ) { 69 Thread.sleep(sleepDuration); 70 } 71 } catch (InterruptedException e) { 72 } 73 } 74 75 public synchronized long getSleepDuration() { 76 return sleepDuration; 77 } 78 public synchronized void setSleepDuration(long sleepDuration) { 79 this.sleepDuration = sleepDuration; 80 } 81 } 82 | Popular Tags |