package org.apache.activemq.transport.tcp;

import org.apache.activemq.Service;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportThreadSupport;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.wireformat.WireFormat;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import javax.net.SocketFactory;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;

/**
 * A {@link Transport} that exchanges commands over a TCP {@link Socket}.
 * Commands are marshalled to and unmarshalled from the socket streams with the
 * configured {@link WireFormat}; {@link #run()} is the read loop that hands
 * every received command to {@code doConsume}.
 */
public class TcpTransport extends TransportThreadSupport implements Transport, Service, Runnable {
    private static final Log log = LogFactory.getLog(TcpTransport.class);

    protected final URI remoteLocation;
    protected final URI localLocation;
    protected final WireFormat wireFormat;

    /** Timeout passed to {@link Socket#connect(java.net.SocketAddress, int)}; a negative value selects the untimed connect. */
    protected int connectionTimeout = 30000;
    /** Value passed to {@link Socket#setSoTimeout(int)}. */
    protected int soTimeout = 0;
    /** Requested size for both the socket receive and send buffers. */
    protected int socketBufferSize = 64 * 1024;
    /** Buffer size handed to the buffered streams wrapped around the socket. */
    protected int ioBufferSize = 8 * 1024;
    protected Socket socket;
    protected DataOutputStream dataOut;
    protected DataInputStream dataIn;
    protected boolean trace;
    /** When true, a remote host equal to the local host name is replaced by "localhost". */
    protected boolean useLocalHost = true;
    protected int minmumWireFormatVersion;
    protected SocketFactory socketFactory;

    private Map socketOptions;
    private Boolean keepAlive;
    private Boolean tcpNoDelay;

    /**
     * Creates a transport that will connect to a remote location when started.
     * An unconnected socket is requested from the factory up front; if the
     * factory cannot supply one, the socket is created later in
     * {@link #connect()}.
     *
     * @param wireFormat     used to marshal and unmarshal commands
     * @param socketFactory  factory used to create the socket
     * @param remoteLocation the URI to connect to
     * @param localLocation  the local address to bind to, or null for none
     * @throws UnknownHostException declared for callers; not thrown by the code visible here
     * @throws IOException          if the factory fails for a reason other than a SocketException
     */
    public TcpTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation)
            throws UnknownHostException, IOException {
        this.wireFormat = wireFormat;
        this.socketFactory = socketFactory;
        try {
            this.socket = socketFactory.createSocket();
        } catch (SocketException e) {
            // Deliberate: not every factory supports unconnected sockets.
            // connect() falls back to the address-taking factory methods.
            this.socket = null;
        }
        this.remoteLocation = remoteLocation;
        this.localLocation = localLocation;
        setDaemon(false);
    }

    /**
     * Creates a transport around an existing socket (no local or remote
     * location is recorded, so {@link #connect()} neither binds nor connects).
     *
     * @param wireFormat used to marshal and unmarshal commands
     * @param socket     the socket to communicate over
     * @throws IOException declared for callers; not thrown by the code visible here
     */
    public TcpTransport(WireFormat wireFormat, Socket socket) throws IOException {
        this.wireFormat = wireFormat;
        this.socket = socket;
        this.remoteLocation = null;
        this.localLocation = null;
        setDaemon(true);
    }

    /**
     * Marshals the command onto the output stream and flushes it.
     *
     * @param command the command to send
     * @throws IOException if marshalling or flushing fails
     */
    public void oneway(Object command) throws IOException {
        checkStarted();
        wireFormat.marshal(command, dataOut);
        dataOut.flush();
    }

    /**
     * @return a printable description of this transport; safe to call before a
     *         socket exists (for example from logging in {@link #doStop})
     */
    public String toString() {
        if (socket != null) {
            return "tcp://" + socket.getInetAddress() + ":" + socket.getPort();
        }
        if (remoteLocation != null) {
            return "tcp://" + remoteLocation.getHost() + ":" + remoteLocation.getPort();
        }
        return "tcp://unconnected";
    }

    /**
     * Read loop: reads commands and passes them to {@code doConsume} until the
     * transport is stopped. Any other I/O failure stops the transport and is
     * reported through {@code onException}.
     */
    public void run() {
        log.trace("TCP consumer thread starting");
        while (!isStopped()) {
            try {
                Object command = readCommand();
                doConsume(command);
            } catch (SocketTimeoutException e) {
                // Deliberately ignored: loop round and re-check isStopped().
            } catch (InterruptedIOException e) {
                // Deliberately ignored: loop round and re-check isStopped().
            } catch (IOException e) {
                try {
                    stop();
                } catch (Exception e2) {
                    log.warn("Caught while closing: " + e2 + ". Now Closed", e2);
                }
                onException(e);
            }
        }
    }

    /**
     * Reads the next command from the input stream.
     *
     * @return the unmarshalled command
     * @throws IOException if reading or unmarshalling fails
     */
    protected Object readCommand() throws IOException {
        return wireFormat.unmarshal(dataIn);
    }

    // Properties
    // -------------------------------------------------------------------------

    public boolean isTrace() {
        return trace;
    }

    public void setTrace(boolean trace) {
        this.trace = trace;
    }

    public int getMinmumWireFormatVersion() {
        return minmumWireFormatVersion;
    }

    public void setMinmumWireFormatVersion(int minmumWireFormatVersion) {
        this.minmumWireFormatVersion = minmumWireFormatVersion;
    }

    public boolean isUseLocalHost() {
        return useLocalHost;
    }

    /**
     * Sets whether a remote host name that equals the local machine's host
     * name is replaced by "localhost" when connecting.
     */
    public void setUseLocalHost(boolean useLocalHost) {
        this.useLocalHost = useLocalHost;
    }

    public int getSocketBufferSize() {
        return socketBufferSize;
    }

    /**
     * Sets the size requested for the socket's receive and send buffers.
     */
    public void setSocketBufferSize(int socketBufferSize) {
        this.socketBufferSize = socketBufferSize;
    }

    public int getSoTimeout() {
        return soTimeout;
    }

    /**
     * Sets the value applied to the socket with {@link Socket#setSoTimeout(int)}.
     */
    public void setSoTimeout(int soTimeout) {
        this.soTimeout = soTimeout;
    }

    public int getConnectionTimeout() {
        return connectionTimeout;
    }

    /**
     * Sets the connect timeout; a negative value makes {@link #connect()} use
     * the untimed {@link Socket#connect(java.net.SocketAddress)}.
     */
    public void setConnectionTimeout(int connectionTimeout) {
        this.connectionTimeout = connectionTimeout;
    }

    public Boolean getKeepAlive() {
        return keepAlive;
    }

    /**
     * Sets the keep-alive option to apply to the socket; null leaves the
     * socket's setting untouched.
     */
    public void setKeepAlive(Boolean keepAlive) {
        this.keepAlive = keepAlive;
    }

    public Boolean getTcpNoDelay() {
        return tcpNoDelay;
    }

    /**
     * Sets the TCP no-delay option to apply to the socket; null leaves the
     * socket's setting untouched.
     */
    public void setTcpNoDelay(Boolean tcpNoDelay) {
        this.tcpNoDelay = tcpNoDelay;
    }

    /**
     * @return the buffer size used for the buffered socket streams
     */
    public int getIoBufferSize() {
        return this.ioBufferSize;
    }

    /**
     * @param ioBufferSize the buffer size to use for the buffered socket streams
     */
    public void setIoBufferSize(int ioBufferSize) {
        this.ioBufferSize = ioBufferSize;
    }

    // Implementation methods
    // -------------------------------------------------------------------------

    /**
     * Maps the given host to "localhost" when {@link #isUseLocalHost()} is
     * enabled and the host equals this machine's host name.
     *
     * @param host the host name taken from the remote location
     * @return "localhost" or the unchanged host
     * @throws UnknownHostException if the local host name cannot be determined
     *                              (only looked up when the option is enabled)
     */
    protected String resolveHostName(String host) throws UnknownHostException {
        // Check the flag first so a disabled option never triggers (or fails
        // on) the local host lookup.
        if (!isUseLocalHost()) {
            return host;
        }
        String localName = InetAddress.getLocalHost().getHostName();
        if (localName != null && localName.equals(host)) {
            return "localhost";
        }
        return host;
    }

    /**
     * Configures the given socket: applies the generic socket options, buffer
     * sizes, SO timeout and, when set, keep-alive and TCP no-delay.
     *
     * @param sock the socket to configure
     * @throws SocketException if the timeout, keep-alive or no-delay option
     *                         cannot be applied (buffer size failures are
     *                         only logged)
     */
    protected void initialiseSocket(Socket sock) throws SocketException {
        if (socketOptions != null) {
            // Apply the options to the socket being initialised, not to the field.
            IntrospectionSupport.setProperties(sock, socketOptions);
        }

        try {
            sock.setReceiveBufferSize(socketBufferSize);
            sock.setSendBufferSize(socketBufferSize);
        } catch (SocketException se) {
            // Best effort: keep going with whatever buffer sizes the socket has.
            log.warn("Cannot set socket buffer size = " + socketBufferSize);
            log.debug("Cannot set socket buffer size. Reason: " + se, se);
        }
        sock.setSoTimeout(soTimeout);

        if (keepAlive != null) {
            sock.setKeepAlive(keepAlive.booleanValue());
        }
        if (tcpNoDelay != null) {
            sock.setTcpNoDelay(tcpNoDelay.booleanValue());
        }
    }

    protected void doStart() throws Exception {
        connect();
        super.doStart();
    }

    /**
     * Binds and connects the socket (creating it through the factory when
     * none exists yet), then configures it and opens the data streams.
     *
     * @throws IllegalStateException if neither a socket nor a factory is
     *                               available, or a socket must be created
     *                               but no remote location was given
     * @throws UnknownHostException  if a socket must be created and the
     *                               remote host cannot be resolved
     * @throws Exception             on any other connect or stream failure
     */
    protected void connect() throws Exception {
        if (socket == null && socketFactory == null) {
            throw new IllegalStateException("Cannot connect if the socket or socketFactory have not been set");
        }

        InetSocketAddress localAddress = null;
        InetSocketAddress remoteAddress = null;

        if (localLocation != null) {
            localAddress = new InetSocketAddress(InetAddress.getByName(localLocation.getHost()), localLocation.getPort());
        }

        if (remoteLocation != null) {
            String host = resolveHostName(remoteLocation.getHost());
            remoteAddress = new InetSocketAddress(host, remoteLocation.getPort());
        }

        if (socket != null) {
            if (localAddress != null) {
                socket.bind(localAddress);
            }

            // A socket handed to the constructor has no remote location, so
            // there is nothing to connect in that case.
            if (remoteAddress != null) {
                if (connectionTimeout >= 0) {
                    socket.connect(remoteAddress, connectionTimeout);
                } else {
                    socket.connect(remoteAddress);
                }
            }
        } else {
            // The factory could not provide an unconnected socket, so use the
            // factory methods that create and connect in one step. Note that
            // connectionTimeout is not applied on this path.
            if (remoteAddress == null) {
                throw new IllegalStateException("Cannot create a socket without a remote location");
            }
            if (remoteAddress.isUnresolved()) {
                throw new UnknownHostException(remoteAddress.getHostName());
            }
            if (localAddress != null) {
                socket = socketFactory.createSocket(remoteAddress.getAddress(), remoteAddress.getPort(),
                        localAddress.getAddress(), localAddress.getPort());
            } else {
                socket = socketFactory.createSocket(remoteAddress.getAddress(), remoteAddress.getPort());
            }
        }

        initialiseSocket(socket);
        initializeStreams();
    }

    protected void doStop(ServiceStopper stopper) throws Exception {
        if (log.isDebugEnabled()) {
            log.debug("Stopping transport " + this);
        }

        // Only the socket is closed here; closeStreams() is not called.
        if (socket != null) {
            socket.close();
        }
    }

    /**
     * Wraps the socket's streams in buffered data streams of
     * {@link #getIoBufferSize()} bytes.
     */
    protected void initializeStreams() throws Exception {
        TcpBufferedInputStream buffIn = new TcpBufferedInputStream(socket.getInputStream(), ioBufferSize);
        this.dataIn = new DataInputStream(buffIn);
        TcpBufferedOutputStream buffOut = new TcpBufferedOutputStream(socket.getOutputStream(), ioBufferSize);
        this.dataOut = new DataOutputStream(buffOut);
    }

    /**
     * Closes the output and input streams if they were opened. The input
     * stream is closed even when closing the output stream fails.
     *
     * @throws IOException if closing either stream fails
     */
    protected void closeStreams() throws IOException {
        try {
            if (dataOut != null) {
                dataOut.close();
            }
        } finally {
            if (dataIn != null) {
                dataIn.close();
            }
        }
    }

    /**
     * Stores a copy of the given options; they are applied to the socket in
     * {@link #initialiseSocket(Socket)}.
     *
     * @param socketOptions the options to apply, or null to clear them
     */
    public void setSocketOptions(Map socketOptions) {
        this.socketOptions = socketOptions == null ? null : new HashMap(socketOptions);
    }

    /**
     * @return the socket's remote address as a string, or null when there is
     *         no socket
     */
    public String getRemoteAddress() {
        if (socket != null) {
            return "" + socket.getRemoteSocketAddress();
        }
        return null;
    }
}