KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > transport > tcp > TcpTransport


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 package org.apache.activemq.transport.tcp;
19
20 import org.apache.activemq.Service;
21 import org.apache.activemq.transport.Transport;
22 import org.apache.activemq.transport.TransportThreadSupport;
23 import org.apache.activemq.util.IntrospectionSupport;
24 import org.apache.activemq.util.ServiceStopper;
25 import org.apache.activemq.wireformat.WireFormat;
26 import org.apache.commons.logging.Log;
27 import org.apache.commons.logging.LogFactory;
28
29 import javax.net.SocketFactory;
30 import java.io.DataInputStream JavaDoc;
31 import java.io.DataOutputStream JavaDoc;
32 import java.io.IOException JavaDoc;
33 import java.io.InterruptedIOException JavaDoc;
34 import java.net.InetAddress JavaDoc;
35 import java.net.InetSocketAddress JavaDoc;
36 import java.net.Socket JavaDoc;
37 import java.net.SocketException JavaDoc;
38 import java.net.SocketTimeoutException JavaDoc;
39 import java.net.URI JavaDoc;
40 import java.net.UnknownHostException JavaDoc;
41 import java.util.HashMap JavaDoc;
42 import java.util.Map JavaDoc;
43
44 /**
45  * An implementation of the {@link Transport} interface using raw tcp/ip
46  *
47  * @version $Revision$
48  */

49 public class TcpTransport extends TransportThreadSupport implements Transport, Service, Runnable JavaDoc {
50     private static final Log log = LogFactory.getLog(TcpTransport.class);
51
52     protected final URI JavaDoc remoteLocation;
53     protected final URI JavaDoc localLocation;
54     protected final WireFormat wireFormat;
55
56     protected int connectionTimeout = 30000;
57     protected int soTimeout = 0;
58     protected int socketBufferSize = 64 * 1024;
59     protected int ioBufferSize = 8 * 1024;
60     protected Socket JavaDoc socket;
61     protected DataOutputStream JavaDoc dataOut;
62     protected DataInputStream JavaDoc dataIn;
63     protected boolean trace;
64     protected boolean useLocalHost = true;
65     protected int minmumWireFormatVersion;
66     protected SocketFactory socketFactory;
67
68     private Map JavaDoc socketOptions;
69     private Boolean JavaDoc keepAlive;
70     private Boolean JavaDoc tcpNoDelay;
71
72     /**
73      * Connect to a remote Node - e.g. a Broker
74      *
75      * @param wireFormat
76      * @param socketFactory
77      * @param remoteLocation
78      * @param localLocation -
79      * e.g. local InetAddress and local port
80      * @throws IOException
81      * @throws UnknownHostException
82      */

83     public TcpTransport(WireFormat wireFormat, SocketFactory socketFactory, URI JavaDoc remoteLocation, URI JavaDoc localLocation) throws UnknownHostException JavaDoc, IOException JavaDoc {
84         this.wireFormat = wireFormat;
85         this.socketFactory = socketFactory;
86         try {
87             this.socket = socketFactory.createSocket();
88         }
89         catch (SocketException JavaDoc e) {
90             this.socket = null;
91         }
92         this.remoteLocation = remoteLocation;
93         this.localLocation = localLocation;
94         setDaemon(false);
95     }
96
97
98     /**
99      * Initialize from a server Socket
100      *
101      * @param wireFormat
102      * @param socket
103      * @throws IOException
104      */

105     public TcpTransport(WireFormat wireFormat, Socket JavaDoc socket) throws IOException JavaDoc {
106         this.wireFormat = wireFormat;
107         this.socket = socket;
108         this.remoteLocation = null;
109         this.localLocation = null;
110         setDaemon(true);
111     }
112
113     /**
114      * A one way asynchronous send
115      */

116     public void oneway(Object JavaDoc command) throws IOException JavaDoc {
117         checkStarted();
118         wireFormat.marshal(command, dataOut);
119         dataOut.flush();
120     }
121
122     /**
123      * @return pretty print of 'this'
124      */

125     public String JavaDoc toString() {
126         return "tcp://" + socket.getInetAddress() + ":" + socket.getPort();
127     }
128
129     /**
130      * reads packets from a Socket
131      */

132     public void run() {
133         log.trace("TCP consumer thread starting");
134         while (!isStopped()) {
135             try {
136                 Object JavaDoc command = readCommand();
137                 doConsume(command);
138             }
139             catch (SocketTimeoutException JavaDoc e) {
140             }
141             catch (InterruptedIOException JavaDoc e) {
142             }
143             catch (IOException JavaDoc e) {
144                 try {
145                     stop();
146                 }
147                 catch (Exception JavaDoc e2) {
148                     log.warn("Caught while closing: " + e2 + ". Now Closed", e2);
149                 }
150                 onException(e);
151             }
152         }
153     }
154
155     protected Object JavaDoc readCommand() throws IOException JavaDoc {
156         return wireFormat.unmarshal(dataIn);
157     }
158
159     // Properties
160
// -------------------------------------------------------------------------
161

162     public boolean isTrace() {
163         return trace;
164     }
165
166     public void setTrace(boolean trace) {
167         this.trace = trace;
168     }
169
170     public int getMinmumWireFormatVersion() {
171         return minmumWireFormatVersion;
172     }
173
174     public void setMinmumWireFormatVersion(int minmumWireFormatVersion) {
175         this.minmumWireFormatVersion = minmumWireFormatVersion;
176     }
177
178     public boolean isUseLocalHost() {
179         return useLocalHost;
180     }
181
182     /**
183      * Sets whether 'localhost' or the actual local host name should be used to
184      * make local connections. On some operating systems such as Macs its not
185      * possible to connect as the local host name so localhost is better.
186      */

187     public void setUseLocalHost(boolean useLocalHost) {
188         this.useLocalHost = useLocalHost;
189     }
190
191     public int getSocketBufferSize() {
192         return socketBufferSize;
193     }
194
195     /**
196      * Sets the buffer size to use on the socket
197      */

198     public void setSocketBufferSize(int socketBufferSize) {
199         this.socketBufferSize = socketBufferSize;
200     }
201
202     public int getSoTimeout() {
203         return soTimeout;
204     }
205
206     /**
207      * Sets the socket timeout
208      */

209     public void setSoTimeout(int soTimeout) {
210         this.soTimeout = soTimeout;
211     }
212
213     public int getConnectionTimeout() {
214         return connectionTimeout;
215     }
216
217     /**
218      * Sets the timeout used to connect to the socket
219      */

220     public void setConnectionTimeout(int connectionTimeout) {
221         this.connectionTimeout = connectionTimeout;
222     }
223
224     public Boolean JavaDoc getKeepAlive() {
225         return keepAlive;
226     }
227
228     /**
229      * Enable/disable TCP KEEP_ALIVE mode
230      */

231     public void setKeepAlive(Boolean JavaDoc keepAlive) {
232         this.keepAlive = keepAlive;
233     }
234
235     public Boolean JavaDoc getTcpNoDelay() {
236         return tcpNoDelay;
237     }
238
239     /**
240      * Enable/disable the TCP_NODELAY option on the socket
241      */

242     public void setTcpNoDelay(Boolean JavaDoc tcpNoDelay) {
243         this.tcpNoDelay = tcpNoDelay;
244     }
245     
246     /**
247      * @return the ioBufferSize
248      */

249     public int getIoBufferSize(){
250         return this.ioBufferSize;
251     }
252
253     /**
254      * @param ioBufferSize the ioBufferSize to set
255      */

256     public void setIoBufferSize(int ioBufferSize){
257         this.ioBufferSize=ioBufferSize;
258     }
259
260
261     // Implementation methods
262
// -------------------------------------------------------------------------
263
protected String JavaDoc resolveHostName(String JavaDoc host) throws UnknownHostException JavaDoc {
264         String JavaDoc localName = InetAddress.getLocalHost().getHostName();
265         if (localName != null && isUseLocalHost()) {
266             if (localName.equals(host)) {
267                 return "localhost";
268             }
269         }
270         return host;
271     }
272
273     /**
274      * Configures the socket for use
275      *
276      * @param sock
277      * @throws SocketException
278      */

279     protected void initialiseSocket(Socket JavaDoc sock) throws SocketException JavaDoc {
280         if (socketOptions != null) {
281             IntrospectionSupport.setProperties(socket, socketOptions);
282         }
283
284         try {
285             sock.setReceiveBufferSize(socketBufferSize);
286             sock.setSendBufferSize(socketBufferSize);
287         }
288         catch (SocketException JavaDoc se) {
289             log.warn("Cannot set socket buffer size = " + socketBufferSize);
290             log.debug("Cannot set socket buffer size. Reason: " + se, se);
291         }
292         sock.setSoTimeout(soTimeout);
293
294         if (keepAlive != null) {
295             sock.setKeepAlive(keepAlive.booleanValue());
296         }
297         if (tcpNoDelay != null) {
298             sock.setTcpNoDelay(tcpNoDelay.booleanValue());
299         }
300     }
301
302     protected void doStart() throws Exception JavaDoc {
303         connect();
304         super.doStart();
305     }
306
307     protected void connect() throws Exception JavaDoc {
308
309         if (socket == null && socketFactory == null) {
310             throw new IllegalStateException JavaDoc("Cannot connect if the socket or socketFactory have not been set");
311         }
312
313         InetSocketAddress JavaDoc localAddress = null;
314         InetSocketAddress JavaDoc remoteAddress = null;
315
316         if (localLocation != null) {
317             localAddress = new InetSocketAddress JavaDoc(InetAddress.getByName(localLocation.getHost()), localLocation.getPort());
318         }
319
320         if (remoteLocation != null) {
321             String JavaDoc host = resolveHostName(remoteLocation.getHost());
322             remoteAddress = new InetSocketAddress JavaDoc(host, remoteLocation.getPort());
323         }
324
325         if (socket != null) {
326
327             if (localAddress != null) {
328                 socket.bind(localAddress);
329             }
330
331             // If it's a server accepted socket.. we don't need to connect it
332
// to a remote address.
333
if (remoteAddress != null) {
334                 if (connectionTimeout >= 0) {
335                     socket.connect(remoteAddress, connectionTimeout);
336                 }
337                 else {
338                     socket.connect(remoteAddress);
339                 }
340             }
341
342         }
343         else {
344             // For SSL sockets.. you can't create an unconnected socket :(
345
// This means the timout option are not supported either.
346
if (localAddress != null) {
347                 socket = socketFactory.createSocket(remoteAddress.getAddress(), remoteAddress.getPort(), localAddress.getAddress(), localAddress.getPort());
348             }
349             else {
350                 socket = socketFactory.createSocket(remoteAddress.getAddress(), remoteAddress.getPort());
351             }
352         }
353
354         initialiseSocket(socket);
355         initializeStreams();
356     }
357
358     protected void doStop(ServiceStopper stopper) throws Exception JavaDoc {
359         if (log.isDebugEnabled()) {
360             log.debug("Stopping transport " + this);
361         }
362
363         // Closing the streams flush the sockets before closing.. if the socket
364
// is hung.. then this hangs the close.
365
// closeStreams();
366
if (socket != null) {
367             socket.close();
368         }
369     }
370
371     protected void initializeStreams() throws Exception JavaDoc {
372         TcpBufferedInputStream buffIn = new TcpBufferedInputStream(socket.getInputStream(), ioBufferSize);
373         this.dataIn = new DataInputStream JavaDoc(buffIn);
374         TcpBufferedOutputStream buffOut = new TcpBufferedOutputStream(socket.getOutputStream(), ioBufferSize);
375         this.dataOut = new DataOutputStream JavaDoc(buffOut);
376     }
377
378     protected void closeStreams() throws IOException JavaDoc {
379         if (dataOut != null) {
380             dataOut.close();
381         }
382         if (dataIn != null) {
383             dataIn.close();
384         }
385     }
386
387     public void setSocketOptions(Map JavaDoc socketOptions) {
388         this.socketOptions = new HashMap JavaDoc(socketOptions);
389     }
390
391     public String JavaDoc getRemoteAddress() {
392         if (socket != null) {
393             return "" + socket.getRemoteSocketAddress();
394         }
395         return null;
396     }
397
398
399     
400   
401 }
402
Popular Tags