1 18 package org.apache.activemq.transport.udp; 19 20 import java.io.IOException ; 21 import java.net.InetSocketAddress ; 22 import java.net.SocketAddress ; 23 import java.net.URI ; 24 import java.util.HashMap ; 25 import java.util.Map ; 26 27 import org.apache.activemq.command.BrokerInfo; 28 import org.apache.activemq.command.Command; 29 import org.apache.activemq.openwire.OpenWireFormat; 30 import org.apache.activemq.transport.CommandJoiner; 31 import org.apache.activemq.transport.InactivityMonitor; 32 import org.apache.activemq.transport.Transport; 33 import org.apache.activemq.transport.TransportListener; 34 import org.apache.activemq.transport.TransportServer; 35 import org.apache.activemq.transport.TransportServerSupport; 36 import org.apache.activemq.transport.reliable.ReliableTransport; 37 import org.apache.activemq.transport.reliable.ReplayStrategy; 38 import org.apache.activemq.transport.reliable.Replayer; 39 import org.apache.activemq.util.ServiceStopper; 40 import org.apache.commons.logging.Log; 41 import org.apache.commons.logging.LogFactory; 42 43 48 49 public class UdpTransportServer extends TransportServerSupport { 50 private static final Log log = LogFactory.getLog(UdpTransportServer.class); 51 52 private UdpTransport serverTransport; 53 private ReplayStrategy replayStrategy; 54 private Transport configuredTransport; 55 private boolean usingWireFormatNegotiation; 56 private Map transports = new HashMap (); 57 58 59 public UdpTransportServer(URI connectURI, UdpTransport serverTransport, Transport configuredTransport, ReplayStrategy replayStrategy) { 60 super(connectURI); 61 this.serverTransport = serverTransport; 62 this.configuredTransport = configuredTransport; 63 this.replayStrategy = replayStrategy; 64 } 65 66 public String toString() { 67 return "UdpTransportServer@" + serverTransport; 68 } 69 70 public void run() { 71 } 72 73 public UdpTransport getServerTransport() { 74 return serverTransport; 75 } 76 77 public void setBrokerInfo(BrokerInfo brokerInfo) { 78 } 79 80 protected void doStart() throws Exception { 81 log.info("Starting " + this); 82 83 configuredTransport.setTransportListener(new TransportListener() { 84 public void onCommand(Object o) { 85 final Command command = (Command) o; 86 processInboundConnection(command); 87 } 88 89 public void onException(IOException error) { 90 log.error("Caught: " + error, error); 91 } 92 93 public void transportInterupted() { 94 } 95 96 public void transportResumed() { 97 } 98 }); 99 configuredTransport.start(); 100 } 101 102 protected void doStop(ServiceStopper stopper) throws Exception { 103 configuredTransport.stop(); 104 } 105 106 protected void processInboundConnection(Command command) { 107 DatagramEndpoint endpoint = (DatagramEndpoint) command.getFrom(); 108 if (log.isDebugEnabled()) { 109 log.debug("Received command on: " + this + " from address: " + endpoint + " command: " + command); 110 } 111 Transport transport = null; 112 synchronized (transports) { 113 transport = (Transport) transports.get(endpoint); 114 if (transport == null) { 115 if (usingWireFormatNegotiation && !command.isWireFormatInfo()) { 116 log.error("Received inbound server communication from: " + command.getFrom() + " expecting WireFormatInfo but was command: " + command); 117 } 118 else { 119 if (log.isDebugEnabled()) { 120 log.debug("Creating a new UDP server connection"); 121 } 122 try { 123 transport = createTransport(command, endpoint); 124 transport = configureTransport(transport); 125 transports.put(endpoint, transport); 126 } 127 catch (IOException e) { 128 log.error("Caught: " + e, e); 129 getAcceptListener().onAcceptError(e); 130 } 131 } 132 } 133 else { 134 log.warn("Discarding duplicate command to server from: " + endpoint + " command: " + command); 135 } 136 } 137 } 138 139 protected Transport configureTransport(Transport transport) { 140 transport = new InactivityMonitor(transport); 141 getAcceptListener().onAccept(transport); 142 return transport; 143 } 144 145 protected Transport createTransport(final Command command, DatagramEndpoint endpoint) throws IOException { 146 if (endpoint == null) { 147 throw new IOException ("No endpoint available for command: " + command); 148 } 149 final SocketAddress address = endpoint.getAddress(); 150 final OpenWireFormat connectionWireFormat = serverTransport.getWireFormat().copy(); 151 final UdpTransport transport = new UdpTransport(connectionWireFormat, address); 152 153 final ReliableTransport reliableTransport = new ReliableTransport(transport, transport); 154 Replayer replayer = reliableTransport.getReplayer(); 155 reliableTransport.setReplayStrategy(replayStrategy); 156 157 return new CommandJoiner(reliableTransport, connectionWireFormat) { 159 public void start() throws Exception { 160 super.start(); 161 reliableTransport.onCommand(command); 162 } 163 }; 164 165 166 167 185 } 186 187 public InetSocketAddress getSocketAddress() { 188 return serverTransport.getLocalSocketAddress(); 189 } 190 } 191 | Popular Tags |