1 18 package org.apache.activemq.systest.impl; 19 20 import org.apache.activemq.systest.AgentStopper; 21 import org.apache.activemq.systest.ConsumerAgent; 22 import org.apache.activemq.systest.MessageList; 23 24 import javax.jms.JMSException ; 25 import javax.jms.MessageConsumer ; 26 import javax.jms.Topic ; 27 28 33 public class ConsumerAgentImpl extends JmsClientSupport implements ConsumerAgent { 34 35 private String selector; 36 private String durableSubscriber; 37 private boolean noLocal; 38 private MessageConsumer consumer; 39 private AgentMessageListener listener; 40 41 public void start() throws Exception { 42 listener = new AgentMessageListener(); 43 getConsumer().setMessageListener(listener); 44 super.start(); 45 } 46 47 public void assertConsumed(MessageList messageList) throws JMSException { 48 int size = messageList.getSize(); 49 listener.waitForMessagesToArrive(size); 50 51 messageList.assertMessagesCorrect(listener.flushMessages()); 53 54 System.out.println("Consumer received all: " + size + " message(s)"); 55 } 56 57 public void waitUntilConsumed(MessageList messageList, int percentOfList) { 58 int size = messageList.getSize(); 59 int limit = (size * percentOfList) / 100; 60 listener.waitForMessagesToArrive(limit); 61 } 62 63 public MessageConsumer getConsumer() throws JMSException { 66 if (consumer == null) { 67 consumer = createConsumer(); 68 } 69 return consumer; 70 } 71 72 public void setConsumer(MessageConsumer consumer) { 73 this.consumer = consumer; 74 } 75 76 public String getDurableSubscriber() { 77 return durableSubscriber; 78 } 79 80 public void setDurableSubscriber(String durableSubscriber) { 81 this.durableSubscriber = durableSubscriber; 82 } 83 84 public boolean isNoLocal() { 85 return noLocal; 86 } 87 88 public void setNoLocal(boolean noLocal) { 89 this.noLocal = noLocal; 90 } 91 92 public String getSelector() { 93 return selector; 94 } 95 96 public void setSelector(String selector) { 97 this.selector = selector; 98 } 99 100 public void stop(AgentStopper stopper) { 101 if (listener != null) { 102 listener.stop(); 103 listener = null; 104 } 105 106 if (consumer != null) { 107 try { 108 consumer.close(); 109 } 110 catch (JMSException e) { 111 stopper.onException(this, e); 112 } 113 finally { 114 consumer = null; 115 } 116 } 117 super.stop(stopper); 118 } 119 120 protected MessageConsumer createConsumer() throws JMSException { 123 if (durableSubscriber != null) { 124 if (selector != null) { 125 return getSession().createDurableSubscriber((Topic ) getDestination(), durableSubscriber, selector, noLocal); 126 } 127 else { 128 return getSession().createDurableSubscriber((Topic ) getDestination(), durableSubscriber); 129 } 130 } 131 else { 132 if (selector != null) { 133 if (noLocal) { 134 return getSession().createConsumer(getDestination(), selector, noLocal); 135 } 136 else { 137 return getSession().createConsumer(getDestination(), selector); 138 } 139 } 140 else { 141 return getSession().createConsumer(getDestination()); 142 } 143 } 144 145 } 146 } 147 | Popular Tags |