1 17 package org.apache.activemq.transport.reliable; 18 19 import org.apache.activemq.openwire.OpenWireFormat; 20 import org.apache.activemq.transport.CommandJoiner; 21 import org.apache.activemq.transport.Transport; 22 import org.apache.activemq.transport.udp.ResponseRedirectInterceptor; 23 import org.apache.activemq.transport.udp.UdpTransport; 24 import org.apache.activemq.transport.udp.UdpTransportTest; 25 26 import java.net.SocketAddress ; 27 import java.net.URI ; 28 import java.util.HashSet ; 29 import java.util.Set ; 30 31 35 public class UnreliableUdpTransportTest extends UdpTransportTest { 36 37 protected DropCommandStrategy dropStrategy = new DropCommandStrategy() { 38 39 public boolean shouldDropCommand(int commandId, SocketAddress address, boolean redelivery) { 40 if (redelivery) { 41 return false; 42 } 43 return commandId % 3 == 2; 44 } 45 }; 46 47 protected Transport createProducer() throws Exception { 48 log.info("Producer using URI: " + producerURI); 49 50 OpenWireFormat wireFormat = createWireFormat(); 51 UnreliableUdpTransport transport = new UnreliableUdpTransport(wireFormat, new URI (producerURI)); 52 transport.setDropCommandStrategy(dropStrategy); 53 54 ReliableTransport reliableTransport = new ReliableTransport(transport, transport); 55 Replayer replayer = reliableTransport.getReplayer(); 56 reliableTransport.setReplayStrategy(createReplayStrategy(replayer)); 57 58 return new CommandJoiner(reliableTransport, wireFormat); 59 } 60 61 protected Transport createConsumer() throws Exception { 62 log.info("Consumer on port: " + consumerPort); 63 OpenWireFormat wireFormat = createWireFormat(); 64 UdpTransport transport = new UdpTransport(wireFormat, consumerPort); 65 66 ReliableTransport reliableTransport = new ReliableTransport(transport, transport); 67 Replayer replayer = reliableTransport.getReplayer(); 68 reliableTransport.setReplayStrategy(createReplayStrategy(replayer)); 69 70 ResponseRedirectInterceptor redirectInterceptor = new ResponseRedirectInterceptor(reliableTransport, transport); 71 return new CommandJoiner(redirectInterceptor, wireFormat); 72 } 73 74 protected ReplayStrategy createReplayStrategy(Replayer replayer) { 75 assertNotNull("Should have a replayer!", replayer); 76 return new DefaultReplayStrategy(1); 77 } 78 } 79 | Popular Tags |