1 18 package org.apache.activemq.transport.udp; 19 20 import org.apache.activemq.Service; 21 import org.apache.activemq.command.Command; 22 import org.apache.activemq.command.Endpoint; 23 import org.apache.activemq.openwire.OpenWireFormat; 24 import org.apache.activemq.transport.Transport; 25 import org.apache.activemq.transport.TransportThreadSupport; 26 import org.apache.activemq.transport.reliable.ExceptionIfDroppedReplayStrategy; 27 import org.apache.activemq.transport.reliable.ReplayBuffer; 28 import org.apache.activemq.transport.reliable.ReplayStrategy; 29 import org.apache.activemq.transport.reliable.Replayer; 30 import org.apache.activemq.util.IntSequenceGenerator; 31 import org.apache.activemq.util.ServiceStopper; 32 import org.apache.commons.logging.Log; 33 import org.apache.commons.logging.LogFactory; 34 35 import java.io.EOFException ; 36 import java.io.IOException ; 37 import java.net.BindException ; 38 import java.net.DatagramSocket ; 39 import java.net.InetAddress ; 40 import java.net.InetSocketAddress ; 41 import java.net.SocketAddress ; 42 import java.net.SocketException ; 43 import java.net.URI ; 44 import java.net.UnknownHostException ; 45 import java.nio.channels.AsynchronousCloseException ; 46 import java.nio.channels.DatagramChannel ; 47 48 53 public class UdpTransport extends TransportThreadSupport implements Transport, Service, Runnable { 54 private static final Log log = LogFactory.getLog(UdpTransport.class); 55 56 private static final int MAX_BIND_ATTEMPTS = 50; 57 private static final long BIND_ATTEMPT_DELAY = 100; 58 59 private CommandChannel commandChannel; 60 private OpenWireFormat wireFormat; 61 private ByteBufferPool bufferPool; 62 private ReplayStrategy replayStrategy = new ExceptionIfDroppedReplayStrategy(); 63 private ReplayBuffer replayBuffer; 64 private int datagramSize = 4 * 1024; 65 private SocketAddress targetAddress; 66 private SocketAddress originalTargetAddress; 67 private DatagramChannel channel; 68 private boolean trace = false; 69 private boolean useLocalHost = true; 70 private int port; 71 private int minmumWireFormatVersion; 72 private String description = null; 73 private IntSequenceGenerator sequenceGenerator; 74 private boolean replayEnabled = true; 75 76 protected UdpTransport(OpenWireFormat wireFormat) throws IOException { 77 this.wireFormat = wireFormat; 78 } 79 80 public UdpTransport(OpenWireFormat wireFormat, URI remoteLocation) throws UnknownHostException , IOException { 81 this(wireFormat); 82 this.targetAddress = createAddress(remoteLocation); 83 description = remoteLocation.toString() + "@"; 84 } 85 86 public UdpTransport(OpenWireFormat wireFormat, SocketAddress socketAddress) throws IOException { 87 this(wireFormat); 88 this.targetAddress = socketAddress; 89 this.description = getProtocolName() + "ServerConnection@"; 90 } 91 92 95 public UdpTransport(OpenWireFormat wireFormat, int port) throws UnknownHostException , IOException { 96 this(wireFormat); 97 this.port = port; 98 this.targetAddress = null; 99 this.description = getProtocolName() + "Server@"; 100 } 101 102 103 106 public Replayer createReplayer() throws IOException { 107 if (replayEnabled ) { 108 return getCommandChannel(); 109 } 110 return null; 111 } 112 113 116 public void oneway(Object command) throws IOException { 117 oneway(command, targetAddress); 118 } 119 120 123 public void oneway(Object command, SocketAddress address) throws IOException { 124 if (log.isDebugEnabled()) { 125 log.debug("Sending oneway from: " + this + " to target: " + targetAddress + " command: " + command); 126 } 127 checkStarted(); 128 commandChannel.write((Command) command, address); 129 } 130 131 134 public String toString() { 135 if (description != null) { 136 return description + port; 137 } 138 else { 139 return getProtocolUriScheme() + targetAddress + "@" + port; 140 } 141 } 142 143 146 public void run() { 147 log.trace("Consumer thread starting for: " + toString()); 148 while (!isStopped()) { 149 try { 150 Command command = commandChannel.read(); 151 doConsume(command); 152 } 153 catch (AsynchronousCloseException e) { 154 try { 156 stop(); 157 } 158 catch (Exception e2) { 159 log.warn("Caught in: " + this + " while closing: " + e2 + ". Now Closed", e2); 160 } 161 } 162 catch (SocketException e) { 163 log.debug("Socket closed: " + e, e); 165 try { 166 stop(); 167 } 168 catch (Exception e2) { 169 log.warn("Caught in: " + this + " while closing: " + e2 + ". Now Closed", e2); 170 } 171 } 172 catch (EOFException e) { 173 log.debug("Socket closed: " + e, e); 175 try { 176 stop(); 177 } 178 catch (Exception e2) { 179 log.warn("Caught in: " + this + " while closing: " + e2 + ". Now Closed", e2); 180 } 181 } 182 catch (Exception e) { 183 try { 184 stop(); 185 } 186 catch (Exception e2) { 187 log.warn("Caught in: " + this + " while closing: " + e2 + ". Now Closed", e2); 188 } 189 if (e instanceof IOException ) { 190 onException((IOException ) e); 191 } 192 else { 193 log.error("Caught: " + e, e); 194 e.printStackTrace(); 195 } 196 } 197 } 198 } 199 200 206 public void setTargetEndpoint(Endpoint newTarget) { 207 if (newTarget instanceof DatagramEndpoint) { 208 DatagramEndpoint endpoint = (DatagramEndpoint) newTarget; 209 SocketAddress address = endpoint.getAddress(); 210 if (address != null) { 211 if (originalTargetAddress == null) { 212 originalTargetAddress = targetAddress; 213 } 214 targetAddress = address; 215 commandChannel.setTargetAddress(address); 216 } 217 } 218 } 219 220 public boolean isTrace() { 223 return trace; 224 } 225 226 public void setTrace(boolean trace) { 227 this.trace = trace; 228 } 229 230 public int getDatagramSize() { 231 return datagramSize; 232 } 233 234 public void setDatagramSize(int datagramSize) { 235 this.datagramSize = datagramSize; 236 } 237 238 public boolean isUseLocalHost() { 239 return useLocalHost; 240 } 241 242 247 public void setUseLocalHost(boolean useLocalHost) { 248 this.useLocalHost = useLocalHost; 249 } 250 251 public CommandChannel getCommandChannel() throws IOException { 252 if (commandChannel == null) { 253 commandChannel = createCommandChannel(); 254 } 255 return commandChannel; 256 } 257 258 261 public void setCommandChannel(CommandDatagramChannel commandChannel) { 262 this.commandChannel = commandChannel; 263 } 264 265 public ReplayStrategy getReplayStrategy() { 266 return replayStrategy; 267 } 268 269 272 public void setReplayStrategy(ReplayStrategy replayStrategy) { 273 this.replayStrategy = replayStrategy; 274 } 275 276 public int getPort() { 277 return port; 278 } 279 280 283 public void setPort(int port) { 284 this.port = port; 285 } 286 287 public int getMinmumWireFormatVersion() { 288 return minmumWireFormatVersion; 289 } 290 291 public void setMinmumWireFormatVersion(int minmumWireFormatVersion) { 292 this.minmumWireFormatVersion = minmumWireFormatVersion; 293 } 294 295 public OpenWireFormat getWireFormat() { 296 return wireFormat; 297 } 298 299 public IntSequenceGenerator getSequenceGenerator() { 300 if (sequenceGenerator == null) { 301 sequenceGenerator = new IntSequenceGenerator(); 302 } 303 return sequenceGenerator; 304 } 305 306 public void setSequenceGenerator(IntSequenceGenerator sequenceGenerator) { 307 this.sequenceGenerator = sequenceGenerator; 308 } 309 310 public boolean isReplayEnabled() { 311 return replayEnabled; 312 } 313 314 318 public void setReplayEnabled(boolean replayEnabled) { 319 this.replayEnabled = replayEnabled; 320 } 321 322 public ByteBufferPool getBufferPool() { 323 if (bufferPool == null) { 324 bufferPool = new DefaultBufferPool(); 325 } 326 return bufferPool; 327 } 328 329 public void setBufferPool(ByteBufferPool bufferPool) { 330 this.bufferPool = bufferPool; 331 } 332 333 public ReplayBuffer getReplayBuffer() { 334 return replayBuffer; 335 } 336 337 public void setReplayBuffer(ReplayBuffer replayBuffer) throws IOException { 338 this.replayBuffer = replayBuffer; 339 getCommandChannel().setReplayBuffer(replayBuffer); 340 } 341 342 343 346 349 protected InetSocketAddress createAddress(URI remoteLocation) throws UnknownHostException , IOException { 350 String host = resolveHostName(remoteLocation.getHost()); 351 return new InetSocketAddress (host, remoteLocation.getPort()); 352 } 353 354 protected String resolveHostName(String host) throws UnknownHostException { 355 String localName = InetAddress.getLocalHost().getHostName(); 356 if (localName != null && isUseLocalHost()) { 357 if (localName.equals(host)) { 358 return "localhost"; 359 } 360 } 361 return host; 362 } 363 364 protected void doStart() throws Exception { 365 getCommandChannel().start(); 366 367 super.doStart(); 368 } 369 370 protected CommandChannel createCommandChannel() throws IOException { 371 SocketAddress localAddress = createLocalAddress(); 372 channel = DatagramChannel.open(); 373 374 channel = connect(channel, targetAddress); 375 376 DatagramSocket socket = channel.socket(); 377 bind(socket, localAddress); 378 if (port == 0) { 379 port = socket.getLocalPort(); 380 } 381 382 return createCommandDatagramChannel(); 383 } 384 385 protected CommandChannel createCommandDatagramChannel() { 386 return new CommandDatagramChannel(this, getWireFormat(), getDatagramSize(), getTargetAddress(), createDatagramHeaderMarshaller(), getChannel(), getBufferPool()); 387 } 388 389 protected void bind(DatagramSocket socket, SocketAddress localAddress) throws IOException { 390 channel.configureBlocking(true); 391 392 if (log.isDebugEnabled()) { 393 log.debug("Binding to address: " + localAddress); 394 } 395 396 for(int i=0; i < MAX_BIND_ATTEMPTS; i++){ 401 try { 402 socket.bind(localAddress); 403 return; 404 } catch (BindException e) { 405 if ( i+1 == MAX_BIND_ATTEMPTS ) 406 throw e; 407 try { 408 Thread.sleep(BIND_ATTEMPT_DELAY); 409 } catch (InterruptedException e1) { 410 Thread.currentThread().interrupt(); 411 throw e; 412 } 413 } 414 } 415 416 } 417 418 protected DatagramChannel connect(DatagramChannel channel, SocketAddress targetAddress2) throws IOException { 419 423 return channel; 424 } 425 426 protected SocketAddress createLocalAddress() { 427 return new InetSocketAddress (port); 428 } 429 430 protected void doStop(ServiceStopper stopper) throws Exception { 431 if (channel != null) { 432 channel.close(); 433 } 434 } 435 436 protected DatagramHeaderMarshaller createDatagramHeaderMarshaller() { 437 return new DatagramHeaderMarshaller(); 438 } 439 440 protected String getProtocolName() { 441 return "Udp"; 442 } 443 444 protected String getProtocolUriScheme() { 445 return "udp://"; 446 } 447 448 protected SocketAddress getTargetAddress() { 449 return targetAddress; 450 } 451 452 protected DatagramChannel getChannel() { 453 return channel; 454 } 455 456 protected void setChannel(DatagramChannel channel) { 457 this.channel = channel; 458 } 459 460 public InetSocketAddress getLocalSocketAddress() { 461 if( channel==null ) { 462 return null; 463 } else { 464 return (InetSocketAddress )channel.socket().getLocalSocketAddress(); 465 } 466 } 467 468 public String getRemoteAddress() { 469 if(targetAddress != null){ 470 return "" + targetAddress; 471 } 472 return null; 473 } 474 } 475 | Popular Tags |