KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > transport > tcp > TcpTransportServer


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 package org.apache.activemq.transport.tcp;
19
20 import java.io.IOException JavaDoc;
21 import java.net.InetAddress JavaDoc;
22 import java.net.InetSocketAddress JavaDoc;
23 import java.net.ServerSocket JavaDoc;
24 import java.net.Socket JavaDoc;
25 import java.net.SocketTimeoutException JavaDoc;
26 import java.net.URI JavaDoc;
27 import java.net.URISyntaxException JavaDoc;
28 import java.net.UnknownHostException JavaDoc;
29 import java.util.HashMap JavaDoc;
30 import java.util.Map JavaDoc;
31
32 import org.apache.activemq.command.BrokerInfo;
33 import org.apache.activemq.openwire.OpenWireFormatFactory;
34 import org.apache.activemq.transport.Transport;
35 import org.apache.activemq.transport.TransportServer;
36 import org.apache.activemq.transport.TransportServerThreadSupport;
37 import org.apache.activemq.util.IOExceptionSupport;
38 import org.apache.activemq.util.ServiceStopper;
39 import org.apache.activemq.wireformat.WireFormat;
40 import org.apache.activemq.wireformat.WireFormatFactory;
41 import org.apache.commons.logging.Log;
42 import org.apache.commons.logging.LogFactory;
43
44 import javax.net.ServerSocketFactory;
45
46 /**
47  * A TCP based implementation of {@link TransportServer}
48  *
49  * @version $Revision: 1.1 $
50  */

51
52 public class TcpTransportServer extends TransportServerThreadSupport {
53     
54     private static final Log log = LogFactory.getLog(TcpTransportServer.class);
55     protected ServerSocket JavaDoc serverSocket;
56     protected int backlog = 5000;
57     protected WireFormatFactory wireFormatFactory = new OpenWireFormatFactory();
58     protected final TcpTransportFactory transportFactory;
59     protected long maxInactivityDuration = 30000;
60     protected int minmumWireFormatVersion;
61     protected boolean trace;
62     protected Map JavaDoc transportOptions;
63     protected final ServerSocketFactory serverSocketFactory;
64     
65     public TcpTransportServer(TcpTransportFactory transportFactory, URI JavaDoc location, ServerSocketFactory serverSocketFactory) throws IOException JavaDoc, URISyntaxException JavaDoc {
66         super(location);
67         this.transportFactory=transportFactory;
68         this.serverSocketFactory = serverSocketFactory;
69     }
70
71     public void bind() throws IOException JavaDoc {
72         URI JavaDoc bind = getBindLocation();
73         
74         String JavaDoc host = bind.getHost();
75         host = (host == null || host.length() == 0) ? "localhost" : host;
76         InetAddress JavaDoc addr = InetAddress.getByName(host);
77
78         try {
79             if (host.trim().equals("localhost") || addr.equals(InetAddress.getLocalHost())) {
80                 this.serverSocket = serverSocketFactory.createServerSocket(bind.getPort(), backlog);
81             }
82             else {
83                 this.serverSocket = serverSocketFactory.createServerSocket(bind.getPort(), backlog, addr);
84             }
85             this.serverSocket.setSoTimeout(2000);
86         }
87         catch (IOException JavaDoc e) {
88             throw IOExceptionSupport.create("Failed to bind to server socket: " + bind + " due to: " + e, e);
89         }
90         try {
91             setConnectURI(new URI JavaDoc(bind.getScheme(), bind.getUserInfo(), resolveHostName(bind.getHost()), serverSocket.getLocalPort(), bind.getPath(),
92                     bind.getQuery(), bind.getFragment()));
93         } catch (URISyntaxException JavaDoc e) {
94
95             // it could be that the host name contains invalid characters such as _ on unix platforms
96
// so lets try use the IP address instead
97
try {
98                 setConnectURI(new URI JavaDoc(bind.getScheme(), bind.getUserInfo(), addr.getHostAddress(), serverSocket.getLocalPort(), bind.getPath(),
99                         bind.getQuery(), bind.getFragment()));
100             } catch (URISyntaxException JavaDoc e2) {
101                 throw IOExceptionSupport.create(e2);
102             }
103         }
104     }
105     
106     /**
107      * @return Returns the wireFormatFactory.
108      */

109     public WireFormatFactory getWireFormatFactory() {
110         return wireFormatFactory;
111     }
112
113     /**
114      * @param wireFormatFactory
115      * The wireFormatFactory to set.
116      */

117     public void setWireFormatFactory(WireFormatFactory wireFormatFactory) {
118         this.wireFormatFactory = wireFormatFactory;
119     }
120
121     /**
122      * Associates a broker info with the transport server so that the transport
123      * can do discovery advertisements of the broker.
124      *
125      * @param brokerInfo
126      */

127     public void setBrokerInfo(BrokerInfo brokerInfo) {
128     }
129
130     public long getMaxInactivityDuration() {
131         return maxInactivityDuration;
132     }
133
134     public void setMaxInactivityDuration(long maxInactivityDuration) {
135         this.maxInactivityDuration = maxInactivityDuration;
136     }
137
138     public int getMinmumWireFormatVersion() {
139         return minmumWireFormatVersion;
140     }
141
142     public void setMinmumWireFormatVersion(int minmumWireFormatVersion) {
143         this.minmumWireFormatVersion = minmumWireFormatVersion;
144     }
145
146     public boolean isTrace() {
147         return trace;
148     }
149
150     public void setTrace(boolean trace) {
151         this.trace = trace;
152     }
153
154     /**
155      * pull Sockets from the ServerSocket
156      */

157     public void run() {
158         while (!isStopped()) {
159             Socket JavaDoc socket = null;
160             try {
161                 socket = serverSocket.accept();
162                 if (socket != null) {
163                     if (isStopped() || getAcceptListener() == null) {
164                         socket.close();
165                     }
166                     else {
167                         HashMap JavaDoc options = new HashMap JavaDoc();
168                         options.put("maxInactivityDuration", new Long JavaDoc(maxInactivityDuration));
169                         options.put("minmumWireFormatVersion", new Integer JavaDoc(minmumWireFormatVersion));
170                         options.put("trace", new Boolean JavaDoc(trace));
171                         options.putAll(transportOptions);
172                         WireFormat format = wireFormatFactory.createWireFormat();
173                         Transport transport = createTransport(socket, format);
174                         Transport configuredTransport = transportFactory.serverConfigure(transport, format, options);
175                         getAcceptListener().onAccept(configuredTransport);
176                     }
177                 }
178             }
179             catch (SocketTimeoutException JavaDoc ste) {
180                 // expect this to happen
181
}
182             catch (Exception JavaDoc e) {
183                 if (!isStopping()) {
184                     onAcceptError(e);
185                 } else if (!isStopped()) {
186                     log.warn("run()", e);
187                     onAcceptError(e);
188                 }
189             }
190         }
191     }
192
193     /**
194      * Allow derived classes to override the Transport implementation that this transport server creates.
195      * @param socket
196      * @param format
197      * @return
198      * @throws IOException
199      */

200     protected Transport createTransport(Socket JavaDoc socket, WireFormat format) throws IOException JavaDoc {
201         return new TcpTransport(format, socket);
202     }
203
204     /**
205      * @return pretty print of this
206      */

207     public String JavaDoc toString() {
208         return ""+getBindLocation();
209     }
210
211     /**
212      *
213      * @param hostName
214      * @return real hostName
215      * @throws UnknownHostException
216      */

217     protected String JavaDoc resolveHostName(String JavaDoc hostName) throws UnknownHostException JavaDoc {
218         String JavaDoc result = hostName;
219         // hostname can be null for vm:// protocol ...
220
if (hostName != null && (hostName.equalsIgnoreCase("localhost") || hostName.equals("127.0.0.1"))) {
221             result = InetAddress.getLocalHost().getHostName();
222         }
223         return result;
224     }
225
226     protected void doStop(ServiceStopper stopper) throws Exception JavaDoc {
227         super.doStop(stopper);
228         if (serverSocket != null) {
229             serverSocket.close();
230         }
231     }
232
233     public InetSocketAddress JavaDoc getSocketAddress() {
234         return (InetSocketAddress JavaDoc)serverSocket.getLocalSocketAddress();
235     }
236
237     public void setTransportOption(Map JavaDoc transportOptions) {
238         this.transportOptions = transportOptions;
239     }
240 }
241
Popular Tags