package org.apache.activemq.transport.reliable;

import junit.framework.TestCase;

import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.transport.StubTransport;
import org.apache.activemq.transport.StubTransportListener;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import java.util.Queue;

/**
 * Exercises {@link ReliableTransport} by pushing streams of commands with chosen
 * command ids through {@code onCommand} and checking what the attached
 * {@link StubTransportListener} recorded (delivered commands and exceptions).
 *
 * <p>Each test builds an array of sequence numbers and hands it to
 * {@link #sendStreamOfCommands(int[], boolean, int)}, stating whether the stream
 * is expected to be accepted and how many commands should reach the listener.
 */
public class ReliableTransportTest extends TestCase {

    protected static final Log log = LogFactory.getLog(ReliableTransportTest.class);

    protected ReliableTransport transport;
    protected StubTransportListener listener = new StubTransportListener();
    protected ReplayStrategy replayStrategy;

    /** An in-order stream 1..7 must be delivered in full with no exceptions. */
    public void testValidSequenceOfPackets() throws Exception {
        int[] sequenceNumbers = { 1, 2, 3, 4, 5, 6, 7 };

        sendStreamOfCommands(sequenceNumbers, true);
    }

    /**
     * A consecutive stream that starts just below {@link Integer#MAX_VALUE} and
     * overflows into negative ids must still be delivered in full.
     */
    public void testValidWrapAroundPackets() throws Exception {
        int[] sequenceNumbers = new int[10];

        int value = Integer.MAX_VALUE - 3;
        transport.setExpectedCounter(value);

        for (int i = 0; i < 10; i++) {
            log.info("command: " + i + " = " + value);
            // value++ deliberately overflows past Integer.MAX_VALUE part-way through
            sequenceNumbers[i] = value++;
        }

        sendStreamOfCommands(sequenceNumbers, true);
    }

    /** An immediately repeated id (2, 2) must be dropped: 8 sent, 7 delivered. */
    public void testDuplicatePacketsDropped() throws Exception {
        int[] sequenceNumbers = { 1, 2, 2, 3, 4, 5, 6, 7 };

        sendStreamOfCommands(sequenceNumbers, true, 7);
    }

    /** A late repeat of an already-seen id (2 after 5) must be dropped: 8 sent, 7 delivered. */
    public void testOldDuplicatePacketsDropped() throws Exception {
        int[] sequenceNumbers = { 1, 2, 3, 4, 5, 2, 6, 7 };

        sendStreamOfCommands(sequenceNumbers, true, 7);
    }

    /**
     * Same duplicate-dropping check with the expected counter starting at -3 and
     * ids crossing zero; the 13 sent ids contain 8 distinct values (-3..4).
     */
    public void testOldDuplicatePacketsDroppedUsingNegativeCounters() throws Exception {
        int[] sequenceNumbers = { -3, -1, -3, -2, -1, 0, 1, -1, 3, 2, 0, 2, 4 };

        transport.setExpectedCounter(-3);

        sendStreamOfCommands(sequenceNumbers, true, 8);
    }

    /** Ids 1..10 arriving out of order must all be delivered with nothing left buffered. */
    public void testWrongOrderOfPackets() throws Exception {
        int[] sequenceNumbers = { 4, 3, 1, 5, 2, 7, 6, 8, 10, 9 };

        sendStreamOfCommands(sequenceNumbers, true);
    }

    /** A stream with a gap (id 3 never arrives) must cause the listener to record an exception. */
    public void testMissingPacketsFails() throws Exception {
        int[] sequenceNumbers = { 1, 2, 4, 5, 6, 7, 8, 9, 10 };

        sendStreamOfCommands(sequenceNumbers, false);
    }

    /**
     * Convenience overload that expects every sent command to be delivered.
     *
     * @param sequenceNumbers command ids to send, in order
     * @param expected {@code true} if the stream should be accepted without an
     *        exception, {@code false} if an exception is required
     */
    protected void sendStreamOfCommands(int[] sequenceNumbers, boolean expected) {
        sendStreamOfCommands(sequenceNumbers, expected, sequenceNumbers.length);
    }

    /**
     * Sends one {@link ConsumerInfo} per sequence number through the transport and
     * then verifies what the listener recorded.
     *
     * @param sequenceNumbers command ids to send, in order
     * @param expected {@code true} if the stream should be accepted: no exception
     *        recorded, {@code expectedCount} commands delivered and no commands
     *        left buffered in the transport; {@code false} if at least one
     *        exception must have been recorded
     * @param expectedCount number of commands the listener should have received
     *        (only checked when {@code expected} is {@code true})
     */
    protected void sendStreamOfCommands(int[] sequenceNumbers, boolean expected, int expectedCount) {
        for (int i = 0; i < sequenceNumbers.length; i++) {
            int commandId = sequenceNumbers[i];

            ConsumerInfo info = new ConsumerInfo();
            info.setSelector("Cheese: " + commandId);
            info.setCommandId(commandId);

            transport.onCommand(info);
        }

        Queue<?> exceptions = listener.getExceptions();
        Queue<?> commands = listener.getCommands();
        if (expected) {
            if (!exceptions.isEmpty()) {
                Exception e = (Exception) exceptions.remove();
                // Route the stack trace through the test's logger rather than stderr.
                log.error("Caught unexpected exception: " + e, e);
                fail("Caught exception: " + e);
            }
            assertEquals("number of messages received", expectedCount, commands.size());

            assertEquals("Should have no buffered commands", 0, transport.getBufferedCommandCount());
        } else {
            assertTrue("Should have received an exception!", exceptions.size() > 0);
            Exception e = (Exception) exceptions.remove();
            log.info("Caught expected response: " + e);
        }
    }

    /**
     * Creates and starts a {@link ReliableTransport} over a {@link StubTransport},
     * wired to this test's listener. If no replay strategy has been set, an
     * {@link ExceptionIfDroppedReplayStrategy} is used.
     */
    @Override
    protected void setUp() throws Exception {
        if (replayStrategy == null) {
            replayStrategy = new ExceptionIfDroppedReplayStrategy();
        }
        transport = new ReliableTransport(new StubTransport(), replayStrategy);
        transport.setTransportListener(listener);
        transport.start();
    }

    /**
     * Stops the transport started in {@link #setUp()} so that a started transport
     * is not left behind after each test method.
     */
    @Override
    protected void tearDown() throws Exception {
        // transport is null if setUp() failed before constructing it
        if (transport != null) {
            transport.stop();
        }
        super.tearDown();
    }

}