package org.apache.activemq;

import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.util.MessageIdList;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerFactory;

import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Arrays;
import java.util.Collections;
import java.net.URI;

import java.util.concurrent.atomic.AtomicInteger;

import javax.jms.*;

/**
 * Base class for tests that drive several JMS producers and consumers against
 * a broker started in {@link #setUp()} and stopped in {@link #tearDown()}.
 * <p>
 * Subclasses configure the protected fields ({@link #producerCount},
 * {@link #consumerCount}, {@link #topic}, {@link #durable}, ...) and then call
 * {@link #startConsumers(Destination)} / {@link #startProducers(Destination, int)}
 * followed by the {@code assert...} helpers.
 */
public class JmsMultipleClientsTestSupport extends CombinationTestSupport {

    /** Count of concurrent producer threads still running; also the monitor they signal on. */
    private AtomicInteger producerLock;

    /** Maps each started {@code MessageConsumer} to the {@code MessageIdList} registered as its listener. */
    protected Map consumers = new HashMap();
    protected int consumerCount = 1;
    protected int producerCount = 1;

    /** Minimum length, in characters, of the text of every generated message. */
    protected int messageSize = 1024;

    /** When true, each producer sends from its own thread; otherwise producers run one after another. */
    protected boolean useConcurrentSend = true;
    protected boolean durable = false;
    protected boolean topic = false;
    protected boolean persistent = false;

    protected BrokerService broker;
    protected Destination destination;

    /** Consumer connections opened by this test; every one of them is closed in {@link #tearDown()}. */
    protected List connections = Collections.synchronizedList(new ArrayList());

    /** Set as the parent of every per-consumer list, and used for the whole-test assertions. */
    protected MessageIdList allMessagesList = new MessageIdList();

    /**
     * Starts {@link #producerCount} producers using the default connection factory.
     *
     * @param dest     destination to send to
     * @param msgCount number of messages each producer sends
     * @throws Exception if a producer fails (sequential mode) or the wait is interrupted
     */
    protected void startProducers(Destination dest, int msgCount) throws Exception {
        startProducers(createConnectionFactory(), dest, msgCount);
    }

    /**
     * Starts {@link #producerCount} producers and returns once all of them are done.
     * <p>
     * In concurrent mode a failure inside a producer thread is only printed to
     * stderr; it is not rethrown to the caller.
     *
     * @param factory  factory used to create one connection per producer
     * @param dest     destination to send to
     * @param msgCount number of messages each producer sends
     * @throws Exception if a producer fails (sequential mode) or the wait is interrupted
     */
    protected void startProducers(final ConnectionFactory factory, final Destination dest, final int msgCount) throws Exception {
        if (useConcurrentSend) {
            // Capture the counter in a local so the threads of this call always
            // signal the same object, even if the field is reassigned later.
            final AtomicInteger pending = new AtomicInteger(producerCount);
            producerLock = pending;

            for (int i = 0; i < producerCount; i++) {
                Thread t = new Thread(new Runnable() {
                    public void run() {
                        try {
                            sendMessages(factory.createConnection(), dest, msgCount);
                        } catch (Exception e) {
                            e.printStackTrace();
                        } finally {
                            // Must run even when an Error escapes, otherwise the
                            // waiting thread below would never see the count hit zero.
                            synchronized (pending) {
                                pending.decrementAndGet();
                                pending.notifyAll();
                            }
                        }
                    }
                });

                t.start();
            }

            // Wait for all producers to finish sending.
            synchronized (pending) {
                while (pending.get() != 0) {
                    pending.wait(2000);
                }
            }
        } else {
            // Send sequentially, one producer after another.
            for (int i = 0; i < producerCount; i++) {
                sendMessages(factory.createConnection(), dest, msgCount);
            }
        }
    }

    /**
     * Sends {@code count} text messages to {@code destination} and then closes
     * the connection. The connection is closed even when sending fails.
     *
     * @param connection  connection to send on; started and closed by this method
     * @param destination destination to send to
     * @param count       number of messages to send
     * @throws Exception if any JMS operation fails
     */
    protected void sendMessages(Connection connection, Destination destination, int count) throws Exception {
        try {
            connection.start();

            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageProducer producer = session.createProducer(destination);
            producer.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);

            for (int i = 0; i < count; i++) {
                TextMessage msg = createTextMessage(session, "" + i);
                producer.send(msg);
            }

            producer.close();
            session.close();
        } finally {
            // Producer connections are not tracked in 'connections', so this is
            // the only place they can be released on a failure path.
            connection.close();
        }
    }

    /**
     * Creates a text message whose body starts with {@code initText} and is
     * padded with {@code '*'} up to {@link #messageSize} characters. Text that
     * is already at least that long is used unchanged.
     *
     * @param session  session used to create the message
     * @param initText leading text of the message body
     * @return the populated message
     * @throws Exception if the message cannot be created or populated
     */
    protected TextMessage createTextMessage(Session session, String initText) throws Exception {
        TextMessage msg = session.createTextMessage();

        if (initText.length() < messageSize) {
            // Pad the message text up to the configured size.
            char[] data = new char[messageSize - initText.length()];
            Arrays.fill(data, '*');
            String str = new String(data);
            msg.setText(initText + str);
        } else {
            // Do not pad text that is already long enough.
            msg.setText(initText);
        }

        return msg;
    }

    /**
     * Starts {@link #consumerCount} consumers using the default connection factory.
     *
     * @param dest destination to consume from
     * @throws Exception if a consumer cannot be created
     */
    protected void startConsumers(Destination dest) throws Exception {
        startConsumers(createConnectionFactory(), dest);
    }

    /**
     * Starts {@link #consumerCount} consumers, each on its own connection, and
     * registers a fresh {@code MessageIdList} as the listener of each one.
     * Durable subscribers named {@code consumer1..N} are created when both
     * {@link #durable} and {@link #topic} are set.
     *
     * @param factory factory used to create one connection per consumer
     * @param dest    destination to consume from
     * @throws Exception if a consumer cannot be created
     */
    protected void startConsumers(ConnectionFactory factory, Destination dest) throws Exception {
        MessageConsumer consumer;
        for (int i = 0; i < consumerCount; i++) {
            if (durable && topic) {
                consumer = createDurableSubscriber(factory.createConnection(), dest, "consumer" + (i + 1));
            } else {
                consumer = createMessageConsumer(factory.createConnection(), dest);
            }
            MessageIdList list = new MessageIdList();
            list.setParent(allMessagesList);
            consumer.setMessageListener(list);
            consumers.put(consumer, list);
        }
    }

    /**
     * Creates a plain consumer on {@code conn} and starts the connection. The
     * connection is recorded for cleanup in {@link #tearDown()}.
     *
     * @param conn connection to create the consumer on
     * @param dest destination to consume from
     * @return the new consumer
     * @throws Exception if any JMS operation fails
     */
    protected MessageConsumer createMessageConsumer(Connection conn, Destination dest) throws Exception {
        connections.add(conn);

        Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        final MessageConsumer consumer = sess.createConsumer(dest);
        conn.start();

        return consumer;
    }

    /**
     * Creates a durable topic subscriber on {@code conn}, using {@code name} as
     * both the client id and the subscription name. The connection is recorded
     * for cleanup in {@link #tearDown()}.
     *
     * @param conn connection to create the subscriber on
     * @param dest destination to subscribe to; must be a {@code javax.jms.Topic}
     * @param name client id and subscription name
     * @return the new durable subscriber
     * @throws Exception if any JMS operation fails
     */
    protected TopicSubscriber createDurableSubscriber(Connection conn, Destination dest, String name) throws Exception {
        // Track the connection first so it is still closed in tearDown() when
        // setClientID() or a later call throws.
        connections.add(conn);
        conn.setClientID(name);
        conn.start();

        Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        final TopicSubscriber consumer = sess.createDurableSubscriber((javax.jms.Topic) dest, name);

        return consumer;
    }

    /**
     * Delegates to {@code allMessagesList.waitForMessagesToArrive(messageCount)}.
     *
     * @param messageCount number of messages to wait for
     * @throws Exception if the wait fails
     */
    protected void waitForAllMessagesToBeReceived(int messageCount) throws Exception {
        allMessagesList.waitForMessagesToArrive(messageCount);
    }

    /**
     * Creates a topic or queue (depending on {@link #topic}) named after the
     * running test class and test name, stores it in {@link #destination} and
     * returns it.
     *
     * @return the newly created destination
     * @throws JMSException declared for subclasses; not thrown by this implementation's visible code
     */
    protected ActiveMQDestination createDestination() throws JMSException {
        String name = "." + getClass().getName() + "." + getName();
        if (topic) {
            destination = new ActiveMQTopic("Topic" + name);
            return (ActiveMQDestination) destination;
        } else {
            destination = new ActiveMQQueue("Queue" + name);
            return (ActiveMQDestination) destination;
        }
    }

    /**
     * @return a connection factory for the {@code vm://localhost} transport
     * @throws Exception declared for subclasses that override this method
     */
    protected ConnectionFactory createConnectionFactory() throws Exception {
        return new ActiveMQConnectionFactory("vm://localhost");
    }

    /**
     * @return a broker built from a URI that disables persistence and enables JMX
     * @throws Exception if the broker cannot be created
     */
    protected BrokerService createBroker() throws Exception {
        return BrokerFactory.createBroker(new URI("broker://()/localhost?persistent=false&useJmx=true"));
    }

    /**
     * Enables auto-fail on the superclass, then creates and starts the broker.
     */
    @Override
    protected void setUp() throws Exception {
        super.setAutoFail(true);
        super.setUp();
        broker = createBroker();
        broker.start();
    }

    /**
     * Closes every tracked connection, stops the broker, and clears the
     * recorded messages and consumers. The superclass tear-down always runs,
     * even when stopping the broker fails.
     */
    @Override
    protected void tearDown() throws Exception {
        try {
            // A synchronized list must be locked by the caller while it is
            // traversed, so take a snapshot under its monitor and close outside it.
            Object[] open;
            synchronized (connections) {
                open = connections.toArray();
            }
            for (int i = 0; i < open.length; i++) {
                try {
                    ((Connection) open[i]).close();
                } catch (Throwable ignored) {
                    // Best-effort cleanup: one failing close must not prevent
                    // the remaining connections and the broker from shutting down.
                }
            }

            // setUp() may have failed before the broker was created.
            if (broker != null) {
                broker.stop();
            }
        } finally {
            try {
                allMessagesList.flushMessages();
                consumers.clear();
            } finally {
                super.tearDown();
            }
        }
    }

    /*
     * Some helpful assertions for multiple consumers.
     */

    /** Asserts that {@code consumer} has received at least {@code msgCount} messages. */
    protected void assertConsumerReceivedAtLeastXMessages(MessageConsumer consumer, int msgCount) {
        MessageIdList messageIdList = (MessageIdList) consumers.get(consumer);
        messageIdList.assertAtLeastMessagesReceived(msgCount);
    }

    /** Asserts that {@code consumer} has received at most {@code msgCount} messages. */
    protected void assertConsumerReceivedAtMostXMessages(MessageConsumer consumer, int msgCount) {
        MessageIdList messageIdList = (MessageIdList) consumers.get(consumer);
        messageIdList.assertAtMostMessagesReceived(msgCount);
    }

    /** Asserts, without waiting, that {@code consumer} has received {@code msgCount} messages. */
    protected void assertConsumerReceivedXMessages(MessageConsumer consumer, int msgCount) {
        MessageIdList messageIdList = (MessageIdList) consumers.get(consumer);
        messageIdList.assertMessagesReceivedNoWait(msgCount);
    }

    /** Applies {@link #assertConsumerReceivedAtLeastXMessages} to every started consumer. */
    protected void assertEachConsumerReceivedAtLeastXMessages(int msgCount) {
        for (Iterator i = consumers.keySet().iterator(); i.hasNext();) {
            assertConsumerReceivedAtLeastXMessages((MessageConsumer) i.next(), msgCount);
        }
    }

    /** Applies {@link #assertConsumerReceivedAtMostXMessages} to every started consumer. */
    protected void assertEachConsumerReceivedAtMostXMessages(int msgCount) {
        for (Iterator i = consumers.keySet().iterator(); i.hasNext();) {
            assertConsumerReceivedAtMostXMessages((MessageConsumer) i.next(), msgCount);
        }
    }

    /** Applies {@link #assertConsumerReceivedXMessages} to every started consumer. */
    protected void assertEachConsumerReceivedXMessages(int msgCount) {
        for (Iterator i = consumers.keySet().iterator(); i.hasNext();) {
            assertConsumerReceivedXMessages((MessageConsumer) i.next(), msgCount);
        }
    }

    /**
     * Asserts that the shared list holds {@code msgCount} messages and that the
     * per-consumer message counts add up to the same number.
     */
    protected void assertTotalMessagesReceived(int msgCount) {
        allMessagesList.assertMessagesReceivedNoWait(msgCount);

        // Now verify that each message was received by exactly one consumer.
        int totalMsg = 0;
        for (Iterator i = consumers.values().iterator(); i.hasNext();) {
            MessageIdList messageIdList = (MessageIdList) i.next();
            totalMsg += messageIdList.getMessageCount();
        }
        assertEquals("Total of consumers message count", msgCount, totalMsg);
    }
}