1 18 package org.apache.activemq.test; 19 20 import java.util.ArrayList ; 21 import java.util.Arrays ; 22 import java.util.Collections ; 23 import java.util.Date ; 24 import java.util.Iterator ; 25 import java.util.List ; 26 27 import javax.jms.DeliveryMode ; 28 import javax.jms.Destination ; 29 import javax.jms.JMSException ; 30 import javax.jms.Message ; 31 import javax.jms.MessageConsumer ; 32 import javax.jms.MessageListener ; 33 import javax.jms.MessageProducer ; 34 import javax.jms.Session ; 35 import javax.jms.TextMessage ; 36 37 import org.apache.commons.logging.Log; 38 import org.apache.commons.logging.LogFactory; 39 40 43 public abstract class JmsSendReceiveTestSupport extends TestSupport implements MessageListener { 44 protected static final Log log = LogFactory.getLog(JmsSendReceiveTestSupport.class); 45 46 protected int messageCount = 100; 47 protected String [] data; 48 protected Session session; 49 protected Session consumeSession; 50 protected MessageConsumer consumer; 51 protected MessageProducer producer; 52 protected Destination consumerDestination; 53 protected Destination producerDestination; 54 protected List messages = createConcurrentList(); 55 protected boolean topic = true; 56 protected boolean durable = false; 57 protected int deliveryMode = DeliveryMode.PERSISTENT; 58 protected final Object lock = new Object (); 59 protected boolean verbose = false; 60 protected boolean useSeparateSession = false; 61 protected boolean largeMessages = false; 62 protected int largeMessageLoopSize = 4 * 1024; 63 64 67 protected void setUp() throws Exception { 68 super.setUp(); 69 String temp = System.getProperty("messageCount"); 70 71 if (temp != null) { 72 int i = Integer.parseInt(temp); 73 if (i > 0) { 74 messageCount = i; 75 } 76 } 77 78 log.info("Message count for test case is: " + messageCount); 79 data = new String [messageCount]; 80 81 for (int i = 0; i < messageCount; i++) { 82 data[i] = createMessageText(i); 83 } 84 } 85 86 87 protected String createMessageText(int i) { 88 if (largeMessages) { 89 return createMessageBodyText(); 90 } 91 else { 92 return "Text for message: " + i + " at " + new Date (); 93 } 94 } 95 96 protected String createMessageBodyText() { 97 StringBuffer buffer = new StringBuffer (); 98 for (int i = 0; i < largeMessageLoopSize; i++) { 99 buffer.append("0123456789"); 100 } 101 return buffer.toString(); 102 } 103 104 109 public void testSendReceive() throws Exception { 110 111 Thread.sleep(1000); 112 messages.clear(); 113 114 for (int i = 0; i < data.length; i++) { 115 Message message = createMessage(i); 116 configureMessage(message); 117 if (verbose) { 118 log.info("About to send a message: " + message + " with text: " + data[i]); 119 } 120 121 producer.send(producerDestination, message); 122 } 123 124 assertMessagesAreReceived(); 125 log.info("" + data.length + " messages(s) received, closing down connections"); 126 } 127 128 129 protected Message createMessage(int index) throws JMSException { 130 Message message = session.createTextMessage(data[index]); 131 return message; 132 } 133 134 138 protected void configureMessage(Message message) throws JMSException { 139 } 140 141 142 148 protected void assertMessagesAreReceived() throws JMSException { 149 waitForMessagesToBeDelivered(); 150 assertMessagesReceivedAreValid(messages); 151 } 152 153 159 protected void assertMessagesReceivedAreValid(List receivedMessages) throws JMSException { 160 List copyOfMessages = Arrays.asList(receivedMessages.toArray()); 161 int counter = 0; 162 163 if (data.length != copyOfMessages.size()) { 164 for (Iterator iter = copyOfMessages.iterator(); iter.hasNext();) { 165 Object message = iter.next(); 166 log.info("<== " + counter++ + " = " + message); 167 } 168 } 169 170 assertEquals("Not enough messages received", data.length, receivedMessages.size()); 171 172 for (int i = 0; i < data.length; i++) { 173 Message received = (Message ) receivedMessages.get(i); 174 assertMessageValid(i, received); 175 } 176 } 177 178 179 protected void assertMessageValid(int index, Message message) throws JMSException { 180 TextMessage textMessage = (TextMessage ) message; 181 String text = textMessage.getText(); 182 183 if (verbose) { 184 log.info("Received Text: " + text); 185 } 186 187 assertEquals("Message: " + index, data[index], text); 188 } 189 190 193 protected void waitForMessagesToBeDelivered() { 194 long maxWaitTime = 60000; 195 long waitTime = maxWaitTime; 196 long start = (maxWaitTime <= 0) ? 0 : System.currentTimeMillis(); 197 198 synchronized (lock) { 199 while (messages.size() < data.length && waitTime >= 0) { 200 try { 201 lock.wait(200); 202 } 203 catch (InterruptedException e) { 204 e.printStackTrace(); 205 } 206 207 waitTime = maxWaitTime - (System.currentTimeMillis() - start); 208 } 209 } 210 } 211 212 213 216 public synchronized void onMessage(Message message) { 217 consumeMessage(message, messages); 218 } 219 220 226 protected void consumeMessage(Message message, List messageList) { 227 if (verbose) { 228 log.info("Received message: " + message); 229 } 230 231 messageList.add(message); 232 233 if (messageList.size() >= data.length) { 234 synchronized (lock) { 235 lock.notifyAll(); 236 } 237 } 238 } 239 240 245 protected List createConcurrentList() { 246 return Collections.synchronizedList(new ArrayList ()); 247 } 248 } 249 | Popular Tags |