1 18 package org.apache.activemq.tool; 19 20 import org.apache.commons.logging.LogFactory; 21 import org.apache.commons.logging.Log; 22 import org.apache.activemq.tool.properties.JmsConsumerProperties; 23 import org.apache.activemq.tool.properties.JmsClientProperties; 24 25 import javax.jms.MessageConsumer ; 26 import javax.jms.JMSException ; 27 import javax.jms.ConnectionFactory ; 28 import javax.jms.Connection ; 29 import javax.jms.Destination ; 30 import javax.jms.Message ; 31 import javax.jms.MessageListener ; 32 import javax.jms.Topic ; 33 34 import java.util.concurrent.atomic.AtomicInteger ; 35 36 public class JmsConsumerClient extends AbstractJmsMeasurableClient { 37 private static final Log log = LogFactory.getLog(JmsConsumerClient.class); 38 39 protected MessageConsumer jmsConsumer; 40 protected JmsConsumerProperties client; 41 42 public JmsConsumerClient(ConnectionFactory factory) { 43 this(new JmsConsumerProperties(), factory); 44 } 45 46 public JmsConsumerClient(JmsConsumerProperties clientProps, ConnectionFactory factory) { 47 super(factory); 48 client = clientProps; 49 } 50 51 public void receiveMessages() throws JMSException { 52 if (client.isAsyncRecv()) { 53 if (client.getRecvType().equalsIgnoreCase(JmsConsumerProperties.TIME_BASED_RECEIVING)) { 54 receiveAsyncTimeBasedMessages(client.getRecvDuration()); 55 } else { 56 receiveAsyncCountBasedMessages(client.getRecvCount()); 57 } 58 } else { 59 if (client.getRecvType().equalsIgnoreCase(JmsConsumerProperties.TIME_BASED_RECEIVING)) { 60 receiveSyncTimeBasedMessages(client.getRecvDuration()); 61 } else { 62 receiveSyncCountBasedMessages(client.getRecvCount()); 63 } 64 } 65 } 66 67 public void receiveMessages(int destCount) throws JMSException { 68 this.destCount = destCount; 69 receiveMessages(); 70 } 71 72 public void receiveMessages(int destIndex, int destCount) throws JMSException { 73 this.destIndex = destIndex; 74 receiveMessages(destCount); 75 } 76 77 public void receiveSyncTimeBasedMessages(long duration) throws JMSException { 78 if (getJmsConsumer() == null) { 79 createJmsConsumer(); 80 } 81 82 try { 83 getConnection().start(); 84 85 log.info("Starting to synchronously receive messages for " + duration + " ms..."); 86 long endTime = System.currentTimeMillis() + duration; 87 while (System.currentTimeMillis() < endTime) { 88 getJmsConsumer().receive(); 89 incThroughput(); 90 } 91 } finally { 92 if (client.isDurable() && client.isUnsubscribe()) { 93 log.info("Unsubscribing durable subscriber: " + getClientName()); 94 getJmsConsumer().close(); 95 getSession().unsubscribe(getClientName()); 96 } 97 getConnection().close(); 98 } 99 } 100 101 public void receiveSyncCountBasedMessages(long count) throws JMSException { 102 if (getJmsConsumer() == null) { 103 createJmsConsumer(); 104 } 105 106 try { 107 getConnection().start(); 108 log.info("Starting to synchronously receive " + count + " messages..."); 109 110 int recvCount = 0; 111 while (recvCount < count) { 112 getJmsConsumer().receive(); 113 incThroughput(); 114 recvCount++; 115 } 116 } finally { 117 if (client.isDurable() && client.isUnsubscribe()) { 118 log.info("Unsubscribing durable subscriber: " + getClientName()); 119 getJmsConsumer().close(); 120 getSession().unsubscribe(getClientName()); 121 } 122 getConnection().close(); 123 } 124 } 125 126 public void receiveAsyncTimeBasedMessages(long duration) throws JMSException { 127 if (getJmsConsumer() == null) { 128 createJmsConsumer(); 129 } 130 131 getJmsConsumer().setMessageListener(new MessageListener () { 132 public void onMessage(Message msg) { 133 incThroughput(); 134 } 135 }); 136 137 try { 138 getConnection().start(); 139 log.info("Starting to asynchronously receive messages for " + duration + " ms..."); 140 try { 141 Thread.sleep(duration); 142 } catch (InterruptedException e) { 143 throw new JMSException ("JMS consumer thread sleep has been interrupted. Message: " + e.getMessage()); 144 } 145 } finally { 146 if (client.isDurable() && client.isUnsubscribe()) { 147 log.info("Unsubscribing durable subscriber: " + getClientName()); 148 getJmsConsumer().close(); 149 getSession().unsubscribe(getClientName()); 150 } 151 getConnection().close(); 152 } 153 } 154 155 public void receiveAsyncCountBasedMessages(long count) throws JMSException { 156 if (getJmsConsumer() == null) { 157 createJmsConsumer(); 158 } 159 160 final AtomicInteger recvCount = new AtomicInteger (0); 161 getJmsConsumer().setMessageListener(new MessageListener () { 162 public void onMessage(Message msg) { 163 incThroughput(); 164 recvCount.incrementAndGet(); 165 recvCount.notify(); 166 } 167 }); 168 169 try { 170 getConnection().start(); 171 log.info("Starting to asynchronously receive " + client.getRecvCount() + " messages..."); 172 try { 173 while (recvCount.get() < count) { 174 recvCount.wait(); 175 } 176 } catch (InterruptedException e) { 177 throw new JMSException ("JMS consumer thread wait has been interrupted. Message: " + e.getMessage()); 178 } 179 } finally { 180 if (client.isDurable() && client.isUnsubscribe()) { 181 log.info("Unsubscribing durable subscriber: " + getClientName()); 182 getJmsConsumer().close(); 183 getSession().unsubscribe(getClientName()); 184 } 185 getConnection().close(); 186 } 187 } 188 189 public MessageConsumer createJmsConsumer() throws JMSException { 190 Destination [] dest = createDestination(destIndex, destCount); 191 return createJmsConsumer(dest[0]); 192 } 193 194 public MessageConsumer createJmsConsumer(Destination dest) throws JMSException { 195 if (client.isDurable()) { 196 String clientName = getClientName(); 197 if (clientName == null) { 198 clientName = "JmsConsumer"; 199 setClientName(clientName); 200 } 201 log.info("Creating durable subscriber (" + clientName + ") to: " + dest.toString()); 202 jmsConsumer = getSession().createDurableSubscriber((Topic ) dest, clientName); 203 } else { 204 log.info("Creating non-durable consumer to: " + dest.toString()); 205 jmsConsumer = getSession().createConsumer(dest); 206 } 207 return jmsConsumer; 208 } 209 210 public MessageConsumer createJmsConsumer(Destination dest, String selector, boolean noLocal) throws JMSException { 211 if (client.isDurable()) { 212 String clientName = getClientName(); 213 if (clientName == null) { 214 clientName = "JmsConsumer"; 215 setClientName(clientName); 216 } 217 log.info("Creating durable subscriber (" + clientName + ") to: " + dest.toString()); 218 jmsConsumer = getSession().createDurableSubscriber((Topic ) dest, clientName, selector, noLocal); 219 } else { 220 log.info("Creating non-durable consumer to: " + dest.toString()); 221 jmsConsumer = getSession().createConsumer(dest, selector, noLocal); 222 } 223 return jmsConsumer; 224 } 225 226 public MessageConsumer getJmsConsumer() { 227 return jmsConsumer; 228 } 229 230 public JmsClientProperties getClient() { 231 return client; 232 } 233 234 public void setClient(JmsClientProperties clientProps) { 235 client = (JmsConsumerProperties)clientProps; 236 } 237 } 238 | Popular Tags |