1 10 11 package org.mule.providers.tcp; 12 13 import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean; 14 import org.apache.commons.lang.StringUtils; 15 import org.mule.config.i18n.Message; 16 import org.mule.config.i18n.Messages; 17 import org.mule.impl.MuleMessage; 18 import org.mule.impl.ResponseOutputStream; 19 import org.mule.providers.AbstractMessageReceiver; 20 import org.mule.providers.ConnectException; 21 import org.mule.umo.UMOComponent; 22 import org.mule.umo.UMOMessage; 23 import org.mule.umo.endpoint.UMOEndpoint; 24 import org.mule.umo.lifecycle.Disposable; 25 import org.mule.umo.lifecycle.DisposeException; 26 import org.mule.umo.lifecycle.InitialisationException; 27 import org.mule.umo.provider.UMOConnector; 28 import org.mule.umo.provider.UMOMessageAdapter; 29 30 import javax.resource.spi.work.Work ; 31 import javax.resource.spi.work.WorkException ; 32 import javax.resource.spi.work.WorkManager ; 33 import java.io.BufferedInputStream ; 34 import java.io.BufferedOutputStream ; 35 import java.io.DataInputStream ; 36 import java.io.DataOutputStream ; 37 import java.io.IOException ; 38 import java.io.OutputStream ; 39 import java.net.InetAddress ; 40 import java.net.ServerSocket ; 41 import java.net.Socket ; 42 import java.net.SocketException ; 43 import java.net.SocketTimeoutException ; 44 import java.net.URI ; 45 import java.net.SocketAddress ; 46 47 55 public class TcpMessageReceiver extends AbstractMessageReceiver implements Work 56 { 57 protected ServerSocket serverSocket = null; 58 59 public TcpMessageReceiver(UMOConnector connector, UMOComponent component, UMOEndpoint endpoint) 60 throws InitialisationException 61 { 62 super(connector, component, endpoint); 63 } 64 65 public void doConnect() throws ConnectException 66 { 67 disposing.set(false); 68 URI uri = endpoint.getEndpointURI().getUri(); 69 try 70 { 71 serverSocket = createSocket(uri); 72 } 73 catch (Exception e) 74 { 75 throw new org.mule.providers.ConnectException(new Message("tcp", 1, uri), e, this); 76 } 77 78 try 79 { 80 getWorkManager().scheduleWork(this, WorkManager.INDEFINITE, null, connector); 81 } 82 catch (WorkException e) 83 { 84 throw new ConnectException(new Message(Messages.FAILED_TO_SCHEDULE_WORK), e, this); 85 } 86 } 87 88 public void doDisconnect() throws ConnectException 89 { 90 disposing.set(true); 92 try 93 { 94 if (serverSocket != null) 95 { 96 serverSocket.close(); 97 } 98 } 99 catch (IOException e) 100 { 101 logger.warn("Failed to close server socket: " + e.getMessage(), e); 102 } 103 } 104 105 protected ServerSocket createSocket(URI uri) throws Exception 106 { 107 String host = StringUtils.defaultIfEmpty(uri.getHost(), "localhost"); 108 int backlog = ((TcpConnector)connector).getBacklog(); 109 InetAddress inetAddress = InetAddress.getByName(host); 110 if (inetAddress.equals(InetAddress.getLocalHost()) || inetAddress.isLoopbackAddress() 111 || host.trim().equals("localhost")) 112 { 113 return new ServerSocket (uri.getPort(), backlog); 114 } 115 else 116 { 117 return new ServerSocket (uri.getPort(), backlog, inetAddress); 118 } 119 } 120 121 124 public ServerSocket getServerSocket() 125 { 126 return serverSocket; 127 } 128 129 public void run() 130 { 131 while (!disposing.get()) 132 { 133 if (connector.isStarted() && !disposing.get()) 134 { 135 Socket socket = null; 136 try 137 { 138 socket = serverSocket.accept(); 139 140 if (logger.isTraceEnabled()) 141 { 142 logger.trace("Server socket Accepted on: " + serverSocket.getLocalPort()); 143 } 144 } 145 catch (java.io.InterruptedIOException iie) 146 { 147 if (logger.isDebugEnabled()) 148 { 149 logger.debug("Interupted IO doing serverSocket.accept: " + iie.getMessage()); 150 } 151 } 152 catch (Exception e) 153 { 154 if (!connector.isDisposed() && !disposing.get()) 155 { 156 logger.warn("Accept failed on socket: " + e, e); 157 handleException(new ConnectException(e, this)); 158 } 159 } 160 161 if (socket != null) 162 { 163 try 164 { 165 Work work = createWork(socket); 166 try 167 { 168 getWorkManager().scheduleWork(work, WorkManager.INDEFINITE, null, connector); 169 } 170 catch (WorkException e) 171 { 172 logger.error("Tcp Server receiver Work was not processed: " + e.getMessage(), e); 173 } 174 } 175 catch (IOException e) 176 { 177 handleException(e); 178 } 179 } 180 } 181 } 182 } 183 184 public void release() 185 { 186 } 188 189 protected void doDispose() 190 { 191 try 192 { 193 if (serverSocket != null && !serverSocket.isClosed()) 194 { 195 serverSocket.close(); 196 } 197 serverSocket = null; 198 199 } 200 catch (Exception e) 201 { 202 logger.error(new DisposeException(new Message("tcp", 2), e)); 203 } 204 logger.info("Closed Tcp port"); 205 } 206 207 protected Work createWork(Socket socket) throws IOException 208 { 209 return new TcpWorker(socket); 210 } 211 212 protected class TcpWorker implements Work , Disposable 213 { 214 protected Socket socket = null; 215 protected DataInputStream dataIn; 216 protected DataOutputStream dataOut; 217 protected AtomicBoolean closed = new AtomicBoolean(false); 218 protected TcpProtocol protocol; 219 220 public TcpWorker(Socket socket) 221 { 222 this.socket = socket; 223 224 final TcpConnector tcpConnector = ((TcpConnector)connector); 225 this.protocol = tcpConnector.getTcpProtocol(); 226 227 try 228 { 229 if (tcpConnector.getBufferSize() != UMOConnector.INT_VALUE_NOT_SET 230 && socket.getReceiveBufferSize() != tcpConnector.getBufferSize()) 231 { 232 socket.setReceiveBufferSize(tcpConnector.getBufferSize()); 233 } 234 if (tcpConnector.getBufferSize() != UMOConnector.INT_VALUE_NOT_SET 235 && socket.getSendBufferSize() != tcpConnector.getBufferSize()) 236 { 237 socket.setSendBufferSize(tcpConnector.getBufferSize()); 238 } 239 if (tcpConnector.getReceiveTimeout() != UMOConnector.INT_VALUE_NOT_SET 240 && socket.getSoTimeout() != tcpConnector.getReceiveTimeout()) 241 { 242 socket.setSoTimeout(tcpConnector.getReceiveTimeout()); 243 } 244 245 socket.setTcpNoDelay(true); 246 socket.setKeepAlive(tcpConnector.isKeepAlive()); 247 } 248 catch (SocketException e) 249 { 250 logger.error("Failed to set Socket properties: " + e.getMessage(), e); 251 } 252 253 } 254 255 public void release() 256 { 257 dispose(); 258 } 259 260 public void dispose() 261 { 262 closed.set(true); 263 try 264 { 265 if (socket != null && !socket.isClosed()) 266 { 267 if (logger.isDebugEnabled()) 268 { 269 final SocketAddress socketAddress = socket.getLocalSocketAddress(); 272 if (socketAddress == null) 273 { 274 logger.debug("Listener has already been closed by other process."); 275 } 276 else 277 { 278 logger.debug("Closing listener: " + socketAddress); 279 } 280 } 281 socket.close(); 282 } 283 } 284 catch (IOException e) 285 { 286 logger.warn("Socket close failed with: " + e); 287 } 288 } 289 290 293 public void run() 294 { 295 try 296 { 297 dataIn = new DataInputStream (new BufferedInputStream (socket.getInputStream())); 298 dataOut = new DataOutputStream (new BufferedOutputStream (socket.getOutputStream())); 299 300 while (!socket.isClosed() && !disposing.get()) 301 { 302 303 byte[] b; 304 try 305 { 306 b = protocol.read(dataIn); 307 if (b == null) 309 { 310 break; 311 } 312 313 byte[] result = processData(b); 314 if (result != null) 315 { 316 protocol.write(dataOut, result); 317 } 318 dataOut.flush(); 319 } 320 catch (SocketTimeoutException e) 321 { 322 if (!socket.getKeepAlive()) 323 { 324 break; 325 } 326 } 327 } 328 } 329 catch (Exception e) 330 { 331 handleException(e); 332 } 333 finally 334 { 335 dispose(); 336 } 337 } 338 339 protected byte[] processData(byte[] data) throws Exception 340 { 341 UMOMessageAdapter adapter = connector.getMessageAdapter(data); 342 OutputStream os = new ResponseOutputStream(socket.getOutputStream(), socket); 343 UMOMessage returnMessage = routeMessage(new MuleMessage(adapter), endpoint.isSynchronous(), os); 344 if (returnMessage != null) 345 { 346 return returnMessage.getPayloadAsBytes(); 347 } 348 else 349 { 350 return null; 351 } 352 } 353 354 } 355 356 } 357 | Popular Tags |