1 18 package org.apache.activemq.tool; 19 20 import org.apache.activemq.util.MessageIdList; 21 22 import javax.jms.Connection ; 23 import javax.jms.ConnectionFactory ; 24 import javax.jms.Destination ; 25 import javax.jms.JMSException ; 26 import javax.jms.Message ; 27 import javax.jms.MessageConsumer ; 28 import javax.jms.MessageListener ; 29 import javax.jms.Session ; 30 import javax.jms.Topic ; 31 34 public class Consumer extends MessageIdList implements MessageListener { 35 protected Connection connection; 36 protected MessageConsumer consumer; 37 protected long counter = 0; 38 protected boolean isParent = false; 39 protected boolean inOrder = true; 40 41 42 public Consumer() { 43 super(); 44 } 45 public Consumer(ConnectionFactory fac,Destination dest,String consumerName) throws JMSException { 46 connection=fac.createConnection(); 47 Session s=connection.createSession(false,Session.AUTO_ACKNOWLEDGE); 48 if(dest instanceof Topic &&consumerName!=null&&consumerName.length()>0){ 49 consumer=s.createDurableSubscriber((Topic ) dest,consumerName); 50 }else{ 51 consumer=s.createConsumer(dest); 52 } 53 consumer.setMessageListener(this); 54 } 55 public Consumer(ConnectionFactory fac,Destination dest) throws JMSException { 56 this(fac,dest,null); 57 } 58 public void start() throws JMSException { 59 connection.start(); 60 } 61 public void stop() throws JMSException { 62 connection.stop(); 63 } 64 public void shutDown() throws JMSException { 65 connection.close(); 66 } 67 68 69 public Message receive() throws JMSException { 70 return consumer.receive(); 71 } 72 73 public Message receive(long wait) throws JMSException { 74 return consumer.receive(wait); 75 } 76 77 public void onMessage(Message msg){ 78 super.onMessage(msg); 79 if(isParent) { 80 try { 81 long ctr = msg.getLongProperty("counter"); 82 if (counter != ctr){ 83 inOrder = false; 84 } 85 counter ++; 86 }catch(Exception e) { 87 e.printStackTrace(); 88 } 89 } 90 } 91 92 93 public boolean isInOrder() { 94 return inOrder; 95 } 96 97 98 public void setAsParent(boolean isParent) { 99 this.isParent = isParent; 100 } 101 102 public boolean isParent() { 103 return this.isParent; 104 } 105 106 107 113 public void assertMessagesReceivedAreInOrder(int messageCount) { 114 assertEquals("expected number of messages when received", messageCount, getMessageCount()); 115 } 116 117 } 118 | Popular Tags |