1 18 package org.apache.activemq; 19 20 import javax.jms.DeliveryMode ; 21 import javax.jms.Destination ; 22 import javax.jms.JMSException ; 23 import javax.jms.Message ; 24 import javax.jms.MessageConsumer ; 25 import javax.jms.MessageListener ; 26 import javax.jms.MessageProducer ; 27 import javax.jms.Session ; 28 import javax.jms.TextMessage ; 29 import java.util.ArrayList ; 30 import java.util.Arrays ; 31 import java.util.Collections ; 32 import java.util.Date ; 33 import java.util.Iterator ; 34 import java.util.List ; 35 36 39 public class JmsSendReceiveTestSupport extends TestSupport implements MessageListener { 40 private static final org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory 41 .getLog(JmsSendReceiveTestSupport.class); 42 43 protected int messageCount = 100; 44 protected String [] data; 45 protected Session session; 46 protected MessageConsumer consumer; 47 protected MessageProducer producer; 48 protected Destination consumerDestination; 49 protected Destination producerDestination; 50 protected List messages = createConcurrentList(); 51 protected boolean topic = true; 52 protected boolean durable = false; 53 protected int deliveryMode = DeliveryMode.PERSISTENT; 54 protected final Object lock = new Object (); 55 protected boolean verbose = false; 56 57 60 protected void setUp() throws Exception { 61 super.setUp(); 62 String temp = System.getProperty("messageCount"); 63 64 if (temp != null) { 65 int i = Integer.parseInt(temp); 66 if (i > 0) { 67 messageCount = i; 68 } 69 } 70 71 log.info("Message count for test case is: " + messageCount); 72 data = new String [messageCount]; 73 74 for (int i = 0; i < messageCount; i++) { 75 data[i] = "Text for message: " + i + " at " + new Date (); 76 } 77 } 78 79 84 public void testSendReceive() throws Exception { 85 messages.clear(); 86 87 for (int i = 0; i < data.length; i++) { 88 Message message = session.createTextMessage(data[i]); 89 message.setStringProperty("stringProperty",data[i]); 90 message.setIntProperty("intProperty",i); 91 92 if (verbose) { 93 if (log.isDebugEnabled()) { 94 log.debug("About to send a message: " + message + " with text: " + data[i]); 95 } 96 } 97 98 producer.send(producerDestination, message); 99 messageSent(); 100 } 101 102 assertMessagesAreReceived(); 103 log.info("" + data.length + " messages(s) received, closing down connections"); 104 } 105 106 111 protected void assertMessagesAreReceived() throws JMSException { 112 waitForMessagesToBeDelivered(); 113 assertMessagesReceivedAreValid(messages); 114 } 115 116 122 protected void assertMessagesReceivedAreValid(List receivedMessages) throws JMSException { 123 List copyOfMessages = Arrays.asList(receivedMessages.toArray()); 124 int counter = 0; 125 126 if (data.length != copyOfMessages.size()) { 127 for (Iterator iter = copyOfMessages.iterator(); iter.hasNext();) { 128 TextMessage message = (TextMessage ) iter.next(); 129 if (log.isDebugEnabled()) { 130 log.info("<== " + counter++ + " = " + message); 131 } 132 } 133 } 134 135 assertEquals("Not enough messages received", data.length, receivedMessages.size()); 136 137 for (int i = 0; i < data.length; i++) { 138 TextMessage received = (TextMessage ) receivedMessages.get(i); 139 String text = received.getText(); 140 String stringProperty = received.getStringProperty("stringProperty"); 141 int intProperty = received.getIntProperty("intProperty"); 142 143 if (verbose) { 144 if (log.isDebugEnabled()) { 145 log.info("Received Text: " + text); 146 } 147 } 148 149 assertEquals("Message: " + i, data[i], text); 150 assertEquals(data[i],stringProperty); 151 assertEquals(i,intProperty) ; 152 } 153 } 154 155 158 protected void waitForMessagesToBeDelivered() { 159 long maxWaitTime = 30000; 160 long waitTime = maxWaitTime; 161 long start = (maxWaitTime <= 0) ? 0 : System.currentTimeMillis(); 162 163 synchronized (lock) { 164 while (messages.size() < data.length && waitTime >= 0) { 165 try { 166 lock.wait(200); 167 } 168 catch (InterruptedException e) { 169 e.printStackTrace(); 170 } 171 172 waitTime = maxWaitTime - (System.currentTimeMillis() - start); 173 } 174 } 175 } 176 177 180 public synchronized void onMessage(Message message) { 181 consumeMessage(message, messages); 182 } 183 184 190 protected void consumeMessage(Message message, List messageList) { 191 if (verbose) { 192 if (log.isDebugEnabled()) { 193 log.info("Received message: " + message); 194 } 195 } 196 197 messageList.add(message); 198 199 if (messageList.size() >= data.length) { 200 synchronized (lock) { 201 lock.notifyAll(); 202 } 203 } 204 } 205 206 211 protected List createConcurrentList() { 212 return Collections.synchronizedList(new ArrayList ()); 213 } 214 215 220 protected void messageSent() throws Exception { 221 222 } 223 } 224 | Popular Tags |