KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > transport > udp > UdpTransport


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 package org.apache.activemq.transport.udp;
19
20 import org.apache.activemq.Service;
21 import org.apache.activemq.command.Command;
22 import org.apache.activemq.command.Endpoint;
23 import org.apache.activemq.openwire.OpenWireFormat;
24 import org.apache.activemq.transport.Transport;
25 import org.apache.activemq.transport.TransportThreadSupport;
26 import org.apache.activemq.transport.reliable.ExceptionIfDroppedReplayStrategy;
27 import org.apache.activemq.transport.reliable.ReplayBuffer;
28 import org.apache.activemq.transport.reliable.ReplayStrategy;
29 import org.apache.activemq.transport.reliable.Replayer;
30 import org.apache.activemq.util.IntSequenceGenerator;
31 import org.apache.activemq.util.ServiceStopper;
32 import org.apache.commons.logging.Log;
33 import org.apache.commons.logging.LogFactory;
34
35 import java.io.EOFException JavaDoc;
36 import java.io.IOException JavaDoc;
37 import java.net.BindException JavaDoc;
38 import java.net.DatagramSocket JavaDoc;
39 import java.net.InetAddress JavaDoc;
40 import java.net.InetSocketAddress JavaDoc;
41 import java.net.SocketAddress JavaDoc;
42 import java.net.SocketException JavaDoc;
43 import java.net.URI JavaDoc;
44 import java.net.UnknownHostException JavaDoc;
45 import java.nio.channels.AsynchronousCloseException JavaDoc;
46 import java.nio.channels.DatagramChannel JavaDoc;
47
48 /**
49  * An implementation of the {@link Transport} interface using raw UDP
50  *
51  * @version $Revision: 464110 $
52  */

53 public class UdpTransport extends TransportThreadSupport implements Transport, Service, Runnable JavaDoc {
54     private static final Log log = LogFactory.getLog(UdpTransport.class);
55
56     private static final int MAX_BIND_ATTEMPTS = 50;
57     private static final long BIND_ATTEMPT_DELAY = 100;
58
59     private CommandChannel commandChannel;
60     private OpenWireFormat wireFormat;
61     private ByteBufferPool bufferPool;
62     private ReplayStrategy replayStrategy = new ExceptionIfDroppedReplayStrategy();
63     private ReplayBuffer replayBuffer;
64     private int datagramSize = 4 * 1024;
65     private SocketAddress JavaDoc targetAddress;
66     private SocketAddress JavaDoc originalTargetAddress;
67     private DatagramChannel JavaDoc channel;
68     private boolean trace = false;
69     private boolean useLocalHost = true;
70     private int port;
71     private int minmumWireFormatVersion;
72     private String JavaDoc description = null;
73     private IntSequenceGenerator sequenceGenerator;
74     private boolean replayEnabled = true;
75
76     protected UdpTransport(OpenWireFormat wireFormat) throws IOException JavaDoc {
77         this.wireFormat = wireFormat;
78     }
79
80     public UdpTransport(OpenWireFormat wireFormat, URI JavaDoc remoteLocation) throws UnknownHostException JavaDoc, IOException JavaDoc {
81         this(wireFormat);
82         this.targetAddress = createAddress(remoteLocation);
83         description = remoteLocation.toString() + "@";
84     }
85
86     public UdpTransport(OpenWireFormat wireFormat, SocketAddress JavaDoc socketAddress) throws IOException JavaDoc {
87         this(wireFormat);
88         this.targetAddress = socketAddress;
89         this.description = getProtocolName() + "ServerConnection@";
90     }
91
92     /**
93      * Used by the server transport
94      */

95     public UdpTransport(OpenWireFormat wireFormat, int port) throws UnknownHostException JavaDoc, IOException JavaDoc {
96         this(wireFormat);
97         this.port = port;
98         this.targetAddress = null;
99         this.description = getProtocolName() + "Server@";
100     }
101
102
103     /**
104      * Creates a replayer for working with the reliable transport
105      */

106     public Replayer createReplayer() throws IOException JavaDoc {
107         if (replayEnabled ) {
108             return getCommandChannel();
109         }
110         return null;
111     }
112
113     /**
114      * A one way asynchronous send
115      */

116     public void oneway(Object JavaDoc command) throws IOException JavaDoc {
117         oneway(command, targetAddress);
118     }
119
120     /**
121      * A one way asynchronous send to a given address
122      */

123     public void oneway(Object JavaDoc command, SocketAddress JavaDoc address) throws IOException JavaDoc {
124         if (log.isDebugEnabled()) {
125             log.debug("Sending oneway from: " + this + " to target: " + targetAddress + " command: " + command);
126         }
127         checkStarted();
128         commandChannel.write((Command) command, address);
129     }
130
131     /**
132      * @return pretty print of 'this'
133      */

134     public String JavaDoc toString() {
135         if (description != null) {
136             return description + port;
137         }
138         else {
139             return getProtocolUriScheme() + targetAddress + "@" + port;
140         }
141     }
142
143     /**
144      * reads packets from a Socket
145      */

146     public void run() {
147         log.trace("Consumer thread starting for: " + toString());
148         while (!isStopped()) {
149             try {
150                 Command command = commandChannel.read();
151                 doConsume(command);
152             }
153             catch (AsynchronousCloseException JavaDoc e) {
154                 // DatagramChannel closed
155
try {
156                     stop();
157                 }
158                 catch (Exception JavaDoc e2) {
159                     log.warn("Caught in: " + this + " while closing: " + e2 + ". Now Closed", e2);
160                 }
161             }
162             catch (SocketException JavaDoc e) {
163                 // DatagramSocket closed
164
log.debug("Socket closed: " + e, e);
165                 try {
166                     stop();
167                 }
168                 catch (Exception JavaDoc e2) {
169                     log.warn("Caught in: " + this + " while closing: " + e2 + ". Now Closed", e2);
170                 }
171             }
172             catch (EOFException JavaDoc e) {
173                 // DataInputStream closed
174
log.debug("Socket closed: " + e, e);
175                 try {
176                     stop();
177                 }
178                 catch (Exception JavaDoc e2) {
179                     log.warn("Caught in: " + this + " while closing: " + e2 + ". Now Closed", e2);
180                 }
181             }
182             catch (Exception JavaDoc e) {
183                 try {
184                     stop();
185                 }
186                 catch (Exception JavaDoc e2) {
187                     log.warn("Caught in: " + this + " while closing: " + e2 + ". Now Closed", e2);
188                 }
189                 if (e instanceof IOException JavaDoc) {
190                     onException((IOException JavaDoc) e);
191                 }
192                 else {
193                     log.error("Caught: " + e, e);
194                     e.printStackTrace();
195                 }
196             }
197         }
198     }
199
200     /**
201      * We have received the WireFormatInfo from the server on the actual channel
202      * we should use for all future communication with the server, so lets set
203      * the target to be the actual channel that the server has chosen for us to
204      * talk on.
205      */

206     public void setTargetEndpoint(Endpoint newTarget) {
207         if (newTarget instanceof DatagramEndpoint) {
208             DatagramEndpoint endpoint = (DatagramEndpoint) newTarget;
209             SocketAddress JavaDoc address = endpoint.getAddress();
210             if (address != null) {
211                 if (originalTargetAddress == null) {
212                     originalTargetAddress = targetAddress;
213                 }
214                 targetAddress = address;
215                 commandChannel.setTargetAddress(address);
216             }
217         }
218     }
219
220     // Properties
221
// -------------------------------------------------------------------------
222
public boolean isTrace() {
223         return trace;
224     }
225
226     public void setTrace(boolean trace) {
227         this.trace = trace;
228     }
229
230     public int getDatagramSize() {
231         return datagramSize;
232     }
233
234     public void setDatagramSize(int datagramSize) {
235         this.datagramSize = datagramSize;
236     }
237
238     public boolean isUseLocalHost() {
239         return useLocalHost;
240     }
241
242     /**
243      * Sets whether 'localhost' or the actual local host name should be used to
244      * make local connections. On some operating systems such as Macs its not
245      * possible to connect as the local host name so localhost is better.
246      */

247     public void setUseLocalHost(boolean useLocalHost) {
248         this.useLocalHost = useLocalHost;
249     }
250
251     public CommandChannel getCommandChannel() throws IOException JavaDoc {
252         if (commandChannel == null) {
253             commandChannel = createCommandChannel();
254         }
255         return commandChannel;
256     }
257
258     /**
259      * Sets the implementation of the command channel to use.
260      */

261     public void setCommandChannel(CommandDatagramChannel commandChannel) {
262         this.commandChannel = commandChannel;
263     }
264
265     public ReplayStrategy getReplayStrategy() {
266         return replayStrategy;
267     }
268
269     /**
270      * Sets the strategy used to replay missed datagrams
271      */

272     public void setReplayStrategy(ReplayStrategy replayStrategy) {
273         this.replayStrategy = replayStrategy;
274     }
275
276     public int getPort() {
277         return port;
278     }
279
280     /**
281      * Sets the port to connect on
282      */

283     public void setPort(int port) {
284         this.port = port;
285     }
286
287     public int getMinmumWireFormatVersion() {
288         return minmumWireFormatVersion;
289     }
290
291     public void setMinmumWireFormatVersion(int minmumWireFormatVersion) {
292         this.minmumWireFormatVersion = minmumWireFormatVersion;
293     }
294
295     public OpenWireFormat getWireFormat() {
296         return wireFormat;
297     }
298
299     public IntSequenceGenerator getSequenceGenerator() {
300         if (sequenceGenerator == null) {
301             sequenceGenerator = new IntSequenceGenerator();
302         }
303         return sequenceGenerator;
304     }
305
306     public void setSequenceGenerator(IntSequenceGenerator sequenceGenerator) {
307         this.sequenceGenerator = sequenceGenerator;
308     }
309     
310     public boolean isReplayEnabled() {
311         return replayEnabled;
312     }
313
314     /**
315      * Sets whether or not replay should be enabled when using the reliable transport.
316      * i.e. should we maintain a buffer of messages that can be replayed?
317      */

318     public void setReplayEnabled(boolean replayEnabled) {
319         this.replayEnabled = replayEnabled;
320     }
321
322     public ByteBufferPool getBufferPool() {
323         if (bufferPool == null) {
324             bufferPool = new DefaultBufferPool();
325         }
326         return bufferPool;
327     }
328
329     public void setBufferPool(ByteBufferPool bufferPool) {
330         this.bufferPool = bufferPool;
331     }
332     
333     public ReplayBuffer getReplayBuffer() {
334         return replayBuffer;
335     }
336
337     public void setReplayBuffer(ReplayBuffer replayBuffer) throws IOException JavaDoc {
338         this.replayBuffer = replayBuffer;
339         getCommandChannel().setReplayBuffer(replayBuffer);
340     }
341
342     
343     // Implementation methods
344
// -------------------------------------------------------------------------
345

346     /**
347      * Creates an address from the given URI
348      */

349     protected InetSocketAddress JavaDoc createAddress(URI JavaDoc remoteLocation) throws UnknownHostException JavaDoc, IOException JavaDoc {
350         String JavaDoc host = resolveHostName(remoteLocation.getHost());
351         return new InetSocketAddress JavaDoc(host, remoteLocation.getPort());
352     }
353
354     protected String JavaDoc resolveHostName(String JavaDoc host) throws UnknownHostException JavaDoc {
355         String JavaDoc localName = InetAddress.getLocalHost().getHostName();
356         if (localName != null && isUseLocalHost()) {
357             if (localName.equals(host)) {
358                 return "localhost";
359             }
360         }
361         return host;
362     }
363
364     protected void doStart() throws Exception JavaDoc {
365         getCommandChannel().start();
366
367         super.doStart();
368     }
369
370     protected CommandChannel createCommandChannel() throws IOException JavaDoc {
371         SocketAddress JavaDoc localAddress = createLocalAddress();
372         channel = DatagramChannel.open();
373
374         channel = connect(channel, targetAddress);
375
376         DatagramSocket JavaDoc socket = channel.socket();
377         bind(socket, localAddress);
378         if (port == 0) {
379             port = socket.getLocalPort();
380         }
381
382         return createCommandDatagramChannel();
383     }
384
385     protected CommandChannel createCommandDatagramChannel() {
386         return new CommandDatagramChannel(this, getWireFormat(), getDatagramSize(), getTargetAddress(), createDatagramHeaderMarshaller(), getChannel(), getBufferPool());
387     }
388
389     protected void bind(DatagramSocket JavaDoc socket, SocketAddress JavaDoc localAddress) throws IOException JavaDoc {
390         channel.configureBlocking(true);
391
392         if (log.isDebugEnabled()) {
393             log.debug("Binding to address: " + localAddress);
394         }
395         
396         //
397
// We have noticed that on some platfoms like linux, after you close down
398
// a previously bound socket, it can take a little while before we can bind it again.
399
//
400
for(int i=0; i < MAX_BIND_ATTEMPTS; i++){
401             try {
402                 socket.bind(localAddress);
403                 return;
404             } catch (BindException JavaDoc e) {
405                 if ( i+1 == MAX_BIND_ATTEMPTS )
406                     throw e;
407                 try {
408                     Thread.sleep(BIND_ATTEMPT_DELAY);
409                 } catch (InterruptedException JavaDoc e1) {
410                     Thread.currentThread().interrupt();
411                     throw e;
412                 }
413             }
414         }
415
416     }
417
418     protected DatagramChannel JavaDoc connect(DatagramChannel JavaDoc channel, SocketAddress JavaDoc targetAddress2) throws IOException JavaDoc {
419         // TODO
420
// connect to default target address to avoid security checks each time
421
// channel = channel.connect(targetAddress);
422

423         return channel;
424     }
425
426     protected SocketAddress JavaDoc createLocalAddress() {
427         return new InetSocketAddress JavaDoc(port);
428     }
429
430     protected void doStop(ServiceStopper stopper) throws Exception JavaDoc {
431         if (channel != null) {
432             channel.close();
433         }
434     }
435
436     protected DatagramHeaderMarshaller createDatagramHeaderMarshaller() {
437         return new DatagramHeaderMarshaller();
438     }
439
440     protected String JavaDoc getProtocolName() {
441         return "Udp";
442     }
443
444     protected String JavaDoc getProtocolUriScheme() {
445         return "udp://";
446     }
447
448     protected SocketAddress JavaDoc getTargetAddress() {
449         return targetAddress;
450     }
451
452     protected DatagramChannel JavaDoc getChannel() {
453         return channel;
454     }
455
456     protected void setChannel(DatagramChannel JavaDoc channel) {
457         this.channel = channel;
458     }
459
460     public InetSocketAddress JavaDoc getLocalSocketAddress() {
461         if( channel==null ) {
462             return null;
463         } else {
464             return (InetSocketAddress JavaDoc)channel.socket().getLocalSocketAddress();
465         }
466     }
467
468     public String JavaDoc getRemoteAddress() {
469         if(targetAddress != null){
470             return "" + targetAddress;
471         }
472         return null;
473     }
474 }
475
Popular Tags