1 18 package org.apache.activemq.transport.tcp; 19 20 import java.io.IOException ; 21 import java.net.InetAddress ; 22 import java.net.InetSocketAddress ; 23 import java.net.ServerSocket ; 24 import java.net.Socket ; 25 import java.net.SocketTimeoutException ; 26 import java.net.URI ; 27 import java.net.URISyntaxException ; 28 import java.net.UnknownHostException ; 29 import java.util.HashMap ; 30 import java.util.Map ; 31 32 import org.apache.activemq.command.BrokerInfo; 33 import org.apache.activemq.openwire.OpenWireFormatFactory; 34 import org.apache.activemq.transport.Transport; 35 import org.apache.activemq.transport.TransportServer; 36 import org.apache.activemq.transport.TransportServerThreadSupport; 37 import org.apache.activemq.util.IOExceptionSupport; 38 import org.apache.activemq.util.ServiceStopper; 39 import org.apache.activemq.wireformat.WireFormat; 40 import org.apache.activemq.wireformat.WireFormatFactory; 41 import org.apache.commons.logging.Log; 42 import org.apache.commons.logging.LogFactory; 43 44 import javax.net.ServerSocketFactory; 45 46 51 52 public class TcpTransportServer extends TransportServerThreadSupport { 53 54 private static final Log log = LogFactory.getLog(TcpTransportServer.class); 55 protected ServerSocket serverSocket; 56 protected int backlog = 5000; 57 protected WireFormatFactory wireFormatFactory = new OpenWireFormatFactory(); 58 protected final TcpTransportFactory transportFactory; 59 protected long maxInactivityDuration = 30000; 60 protected int minmumWireFormatVersion; 61 protected boolean trace; 62 protected Map transportOptions; 63 protected final ServerSocketFactory serverSocketFactory; 64 65 public TcpTransportServer(TcpTransportFactory transportFactory, URI location, ServerSocketFactory serverSocketFactory) throws IOException , URISyntaxException { 66 super(location); 67 this.transportFactory=transportFactory; 68 this.serverSocketFactory = serverSocketFactory; 69 } 70 71 public void bind() throws IOException { 72 URI bind = getBindLocation(); 73 74 String host = bind.getHost(); 75 host = (host == null || host.length() == 0) ? "localhost" : host; 76 InetAddress addr = InetAddress.getByName(host); 77 78 try { 79 if (host.trim().equals("localhost") || addr.equals(InetAddress.getLocalHost())) { 80 this.serverSocket = serverSocketFactory.createServerSocket(bind.getPort(), backlog); 81 } 82 else { 83 this.serverSocket = serverSocketFactory.createServerSocket(bind.getPort(), backlog, addr); 84 } 85 this.serverSocket.setSoTimeout(2000); 86 } 87 catch (IOException e) { 88 throw IOExceptionSupport.create("Failed to bind to server socket: " + bind + " due to: " + e, e); 89 } 90 try { 91 setConnectURI(new URI (bind.getScheme(), bind.getUserInfo(), resolveHostName(bind.getHost()), serverSocket.getLocalPort(), bind.getPath(), 92 bind.getQuery(), bind.getFragment())); 93 } catch (URISyntaxException e) { 94 95 try { 98 setConnectURI(new URI (bind.getScheme(), bind.getUserInfo(), addr.getHostAddress(), serverSocket.getLocalPort(), bind.getPath(), 99 bind.getQuery(), bind.getFragment())); 100 } catch (URISyntaxException e2) { 101 throw IOExceptionSupport.create(e2); 102 } 103 } 104 } 105 106 109 public WireFormatFactory getWireFormatFactory() { 110 return wireFormatFactory; 111 } 112 113 117 public void setWireFormatFactory(WireFormatFactory wireFormatFactory) { 118 this.wireFormatFactory = wireFormatFactory; 119 } 120 121 127 public void setBrokerInfo(BrokerInfo brokerInfo) { 128 } 129 130 public long getMaxInactivityDuration() { 131 return maxInactivityDuration; 132 } 133 134 public void setMaxInactivityDuration(long maxInactivityDuration) { 135 this.maxInactivityDuration = maxInactivityDuration; 136 } 137 138 public int getMinmumWireFormatVersion() { 139 return minmumWireFormatVersion; 140 } 141 142 public void setMinmumWireFormatVersion(int minmumWireFormatVersion) { 143 this.minmumWireFormatVersion = minmumWireFormatVersion; 144 } 145 146 public boolean isTrace() { 147 return trace; 148 } 149 150 public void setTrace(boolean trace) { 151 this.trace = trace; 152 } 153 154 157 public void run() { 158 while (!isStopped()) { 159 Socket socket = null; 160 try { 161 socket = serverSocket.accept(); 162 if (socket != null) { 163 if (isStopped() || getAcceptListener() == null) { 164 socket.close(); 165 } 166 else { 167 HashMap options = new HashMap (); 168 options.put("maxInactivityDuration", new Long (maxInactivityDuration)); 169 options.put("minmumWireFormatVersion", new Integer (minmumWireFormatVersion)); 170 options.put("trace", new Boolean (trace)); 171 options.putAll(transportOptions); 172 WireFormat format = wireFormatFactory.createWireFormat(); 173 Transport transport = createTransport(socket, format); 174 Transport configuredTransport = transportFactory.serverConfigure(transport, format, options); 175 getAcceptListener().onAccept(configuredTransport); 176 } 177 } 178 } 179 catch (SocketTimeoutException ste) { 180 } 182 catch (Exception e) { 183 if (!isStopping()) { 184 onAcceptError(e); 185 } else if (!isStopped()) { 186 log.warn("run()", e); 187 onAcceptError(e); 188 } 189 } 190 } 191 } 192 193 200 protected Transport createTransport(Socket socket, WireFormat format) throws IOException { 201 return new TcpTransport(format, socket); 202 } 203 204 207 public String toString() { 208 return ""+getBindLocation(); 209 } 210 211 217 protected String resolveHostName(String hostName) throws UnknownHostException { 218 String result = hostName; 219 if (hostName != null && (hostName.equalsIgnoreCase("localhost") || hostName.equals("127.0.0.1"))) { 221 result = InetAddress.getLocalHost().getHostName(); 222 } 223 return result; 224 } 225 226 protected void doStop(ServiceStopper stopper) throws Exception { 227 super.doStop(stopper); 228 if (serverSocket != null) { 229 serverSocket.close(); 230 } 231 } 232 233 public InetSocketAddress getSocketAddress() { 234 return (InetSocketAddress )serverSocket.getLocalSocketAddress(); 235 } 236 237 public void setTransportOption(Map transportOptions) { 238 this.transportOptions = transportOptions; 239 } 240 } 241 | Popular Tags |