1 18 package org.apache.activemq.tool; 19 20 import org.apache.activemq.tool.MemMessageIdList; 21 22 import javax.jms.*; 23 24 27 public class MemConsumer extends MemMessageIdList implements MessageListener { 28 protected Connection connection; 29 protected MessageConsumer consumer; 30 protected long counter = 0; 31 protected boolean isParent = false; 32 protected boolean inOrder = true; 33 34 35 public MemConsumer() { 36 super(); 37 } 38 39 public MemConsumer(ConnectionFactory fac, Destination dest, String consumerName) throws JMSException { 40 connection = fac.createConnection(); 41 Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 42 if (dest instanceof Topic && consumerName != null && consumerName.length() > 0) { 43 consumer = s.createDurableSubscriber((Topic) dest, consumerName); 44 } else { 45 consumer = s.createConsumer(dest); 46 } 47 consumer.setMessageListener(this); 48 } 49 50 public MemConsumer(ConnectionFactory fac, Destination dest) throws JMSException { 51 this(fac, dest, null); 52 } 53 54 public void start() throws JMSException { 55 connection.start(); 56 } 57 58 public void stop() throws JMSException { 59 connection.stop(); 60 } 61 62 public void shutDown() throws JMSException { 63 connection.close(); 64 } 65 66 67 public Message receive() throws JMSException { 68 return consumer.receive(); 69 } 70 71 public Message receive(long wait) throws JMSException { 72 return consumer.receive(wait); 73 } 74 75 static long ctr = 0; 76 77 public void onMessage(Message msg) { 78 super.onMessage(msg); 79 80 if (isParent) { 81 try { 82 long ctr = msg.getLongProperty("counter"); 83 if (counter != ctr) { 84 inOrder = false; 85 } 86 counter++; 87 88 } catch (Exception e) { 89 e.printStackTrace(); 90 } 91 } 92 } 93 94 95 public boolean isInOrder() { 96 return inOrder; 97 } 98 99 100 public void setAsParent(boolean isParent) { 101 this.isParent = isParent; 102 } 103 104 public boolean isParent() { 105 return this.isParent; 106 } 107 108 109 } 110 | Popular Tags |