1 18 package org.apache.activemq.transport.udp; 19 20 import java.io.IOException ; 21 22 import javax.jms.MessageNotWriteableException ; 23 24 import junit.framework.TestCase; 25 26 import org.apache.activemq.command.ActiveMQDestination; 27 import org.apache.activemq.command.ActiveMQQueue; 28 import org.apache.activemq.command.ActiveMQTextMessage; 29 import org.apache.activemq.command.Command; 30 import org.apache.activemq.command.ConsumerInfo; 31 import org.apache.activemq.command.ProducerInfo; 32 import org.apache.activemq.command.Response; 33 import org.apache.activemq.command.WireFormatInfo; 34 import org.apache.activemq.transport.Transport; 35 import org.apache.activemq.transport.TransportAcceptListener; 36 import org.apache.activemq.transport.TransportListener; 37 import org.apache.activemq.transport.TransportServer; 38 import org.apache.commons.logging.Log; 39 import org.apache.commons.logging.LogFactory; 40 41 45 public abstract class UdpTestSupport extends TestCase implements TransportListener { 46 47 protected static final Log log = LogFactory.getLog(UdpTestSupport.class); 48 49 protected Transport producer; 50 protected Transport consumer; 51 52 protected Object lock = new Object (); 53 protected Command receivedCommand; 54 protected TransportServer server; 55 protected boolean large; 56 57 protected int waitForCommandTimeout = 40000; 59 60 public void testSendingSmallMessage() throws Exception { 61 ConsumerInfo expected = new ConsumerInfo(); 62 expected.setSelector("Cheese"); 63 expected.setExclusive(true); 64 expected.setExclusive(true); 65 expected.setPrefetchSize(3456); 66 67 try { 68 log.info("About to send: " + expected); 69 producer.oneway(expected); 70 71 Command received = assertCommandReceived(); 72 assertTrue("Should have received a ConsumerInfo but was: " + received, received instanceof ConsumerInfo); 73 ConsumerInfo actual = (ConsumerInfo) received; 74 assertEquals("Selector", expected.getSelector(), actual.getSelector()); 75 assertEquals("isExclusive", expected.isExclusive(), actual.isExclusive()); 76 assertEquals("getPrefetchSize", expected.getPrefetchSize(), actual.getPrefetchSize()); 77 } 78 catch (Exception e) { 79 log.info("Caught: " + e); 80 e.printStackTrace(); 81 fail("Failed to send to transport: " + e); 82 } 83 } 84 85 public void testSendingMediumMessage() throws Exception { 86 String text = createMessageBodyText(4 * 105); 87 ActiveMQDestination destination = new ActiveMQQueue("Foo.Bar.Medium"); 88 assertSendTextMessage(destination, text); 89 } 90 91 public void testSendingLargeMessage() throws Exception { 92 String text = createMessageBodyText(4 * 1024); 93 ActiveMQDestination destination = new ActiveMQQueue("Foo.Bar.Large"); 94 assertSendTextMessage(destination, text); 95 } 96 97 protected void assertSendTextMessage(ActiveMQDestination destination, String text) 98 throws MessageNotWriteableException { 99 large = true; 100 101 ActiveMQTextMessage expected = new ActiveMQTextMessage(); 102 103 expected.setText(text); 104 expected.setDestination(destination); 105 106 try { 107 log.info("About to send message of type: " + expected.getClass()); 108 producer.oneway(expected); 109 110 producer.oneway(new ProducerInfo()); 115 producer.oneway(new ProducerInfo()); 116 117 Command received = assertCommandReceived(); 118 assertTrue("Should have received a ActiveMQTextMessage but was: " + received, 119 received instanceof ActiveMQTextMessage); 120 ActiveMQTextMessage actual = (ActiveMQTextMessage) received; 121 122 assertEquals("getDestination", expected.getDestination(), actual.getDestination()); 123 assertEquals("getText", expected.getText(), actual.getText()); 124 125 log.info("Received text message with: " + actual.getText().length() + " character(s)"); 126 } 127 catch (Exception e) { 128 log.info("Caught: " + e); 129 e.printStackTrace(); 130 fail("Failed to send to transport: " + e); 131 } 132 } 133 134 protected String createMessageBodyText(int loopSize) { 135 StringBuffer buffer = new StringBuffer (); 136 for (int i = 0; i < loopSize; i++) { 137 buffer.append("0123456789"); 138 } 139 return buffer.toString(); 140 } 141 142 protected void setUp() throws Exception { 143 server = createServer(); 144 if (server != null) { 145 server.setAcceptListener(new TransportAcceptListener() { 146 147 public void onAccept(Transport transport) { 148 consumer = transport; 149 consumer.setTransportListener(UdpTestSupport.this); 150 try { 151 consumer.start(); 152 } 153 catch (Exception e) { 154 throw new RuntimeException (e); 155 } 156 } 157 158 public void onAcceptError(Exception error) { 159 } 160 }); 161 server.start(); 162 } 163 164 consumer = createConsumer(); 165 if (consumer != null) { 166 consumer.setTransportListener(this); 167 consumer.start(); 168 } 169 170 producer = createProducer(); 171 producer.setTransportListener(new TransportListener() { 172 public void onCommand(Object command) { 173 log.info("Producer received: " + command); 174 } 175 176 public void onException(IOException error) { 177 log.info("Producer exception: " + error); 178 error.printStackTrace(); 179 } 180 181 public void transportInterupted() { 182 } 183 184 public void transportResumed() { 185 } 186 }); 187 188 producer.start(); 189 } 190 191 protected void tearDown() throws Exception { 192 if (producer != null) { 193 producer.stop(); 194 } 195 if (consumer != null) { 196 consumer.stop(); 197 } 198 if (server != null) { 199 server.stop(); 200 } 201 } 202 203 public void onCommand(Object o) { 204 final Command command = (Command) o; 205 if (command instanceof WireFormatInfo) { 206 log.info("Got WireFormatInfo: " + command); 207 } 208 else { 209 if (command.isResponseRequired()) { 210 sendResponse(command); 212 213 } 214 if (large) { 215 log.info("### Received command: " + command.getClass() + " with id: " 216 + command.getCommandId()); 217 } 218 else { 219 log.info("### Received command: " + command); 220 } 221 222 synchronized (lock) { 223 if (receivedCommand == null) { 224 receivedCommand = command; 225 } 226 else { 227 log.info("Ignoring superfluous command: " + command); 228 } 229 lock.notifyAll(); 230 } 231 } 232 } 233 234 protected void sendResponse(Command command) { 235 Response response = new Response(); 236 response.setCorrelationId(command.getCommandId()); 237 try { 238 consumer.oneway(response); 239 } 240 catch (IOException e) { 241 log.info("Caught: " + e); 242 e.printStackTrace(); 243 throw new RuntimeException (e); 244 } 245 } 246 247 public void onException(IOException error) { 248 log.info("### Received error: " + error); 249 error.printStackTrace(); 250 } 251 252 public void transportInterupted() { 253 log.info("### Transport interrupted"); 254 } 255 256 public void transportResumed() { 257 log.info("### Transport resumed"); 258 } 259 260 protected Command assertCommandReceived() throws InterruptedException { 261 Command answer = null; 262 synchronized (lock) { 263 answer = receivedCommand; 264 if (answer == null) { 265 lock.wait(waitForCommandTimeout); 266 } 267 answer = receivedCommand; 268 } 269 270 assertNotNull("Should have received a Command by now!", answer); 271 return answer; 272 } 273 274 protected abstract Transport createConsumer() throws Exception ; 275 276 protected abstract Transport createProducer() throws Exception ; 277 278 protected TransportServer createServer() throws Exception { 279 return null; 280 } 281 282 } 283 | Popular Tags |