1 10 11 package org.mule.providers.tcp; 12 13 import org.mule.util.MapUtils; 14 import org.mule.impl.MuleMessage; 15 import org.mule.providers.AbstractMessageDispatcher; 16 import org.mule.transformers.simple.SerializableToByteArray; 17 import org.mule.umo.UMOEvent; 18 import org.mule.umo.UMOException; 19 import org.mule.umo.UMOMessage; 20 import org.mule.umo.endpoint.UMOImmutableEndpoint; 21 import org.mule.umo.provider.DispatchException; 22 import org.mule.umo.provider.UMOConnector; 23 import org.mule.umo.transformer.TransformerException; 24 25 import java.io.BufferedInputStream ; 26 import java.io.BufferedOutputStream ; 27 import java.io.DataInputStream ; 28 import java.io.IOException ; 29 import java.io.OutputStream ; 30 import java.net.InetAddress ; 31 import java.net.Socket ; 32 import java.net.SocketTimeoutException ; 33 import java.net.URI ; 34 import java.net.URISyntaxException ; 35 36 39 40 public class TcpMessageDispatcher extends AbstractMessageDispatcher 41 { 42 private final TcpConnector connector; 43 protected final SerializableToByteArray serializableToByteArray; 44 protected Socket connectedSocket = null; 45 protected boolean keepSendSocketOpen = false; 46 47 public TcpMessageDispatcher(UMOImmutableEndpoint endpoint) 48 { 49 super(endpoint); 50 this.connector = (TcpConnector)endpoint.getConnector(); 51 serializableToByteArray = new SerializableToByteArray(); 52 } 53 54 protected Socket initSocket(String endpoint) throws IOException , URISyntaxException 55 { 56 URI uri = new URI (endpoint); 57 int port = uri.getPort(); 58 InetAddress inetAddress = InetAddress.getByName(uri.getHost()); 59 Socket socket = createSocket(port, inetAddress); 60 socket.setReuseAddress(true); 61 if (connector.getBufferSize() != UMOConnector.INT_VALUE_NOT_SET 62 && socket.getReceiveBufferSize() != connector.getBufferSize()) 63 { 64 socket.setReceiveBufferSize(connector.getBufferSize()); 65 } 66 if (connector.getBufferSize() != UMOConnector.INT_VALUE_NOT_SET 67 && socket.getSendBufferSize() != connector.getBufferSize()) 68 { 69 socket.setSendBufferSize(connector.getBufferSize()); 70 } 71 if (connector.getReceiveTimeout() != UMOConnector.INT_VALUE_NOT_SET 72 && socket.getSoTimeout() != connector.getReceiveTimeout()) 73 { 74 socket.setSoTimeout(connector.getReceiveTimeout()); 75 } 76 return socket; 77 } 78 79 protected synchronized void doDispatch(UMOEvent event) throws Exception 80 { 81 try 82 { 83 doInternalDispatch(event); 84 } 85 finally 86 { 87 if (!keepSendSocketOpen) 88 { 89 doDispose(); 90 } 91 } 92 } 93 94 protected synchronized UMOMessage doSend(UMOEvent event) throws Exception 95 { 96 doInternalDispatch(event); 97 98 if (useRemoteSync(event)) 99 { 100 try 101 { 102 byte[] result = receive(connectedSocket, event.getTimeout()); 103 if (result == null) 104 { 105 return null; 106 } 107 return new MuleMessage(connector.getMessageAdapter(result)); 108 } 109 catch (SocketTimeoutException e) 110 { 111 logger.info("Socket timed out normally while doing a synchronous receive on endpointUri: " 113 + event.getEndpoint().getEndpointURI()); 114 return null; 115 } 116 } 117 else 118 { 119 return event.getMessage(); 120 } 121 } 122 123 130 protected void doInternalDispatch(UMOEvent event) throws Exception 131 { 132 Object payload = event.getTransformedMessage(); 133 134 if (!keepSendSocketOpen || connectedSocket == null || connectedSocket.isClosed()) 136 { 137 connectedSocket = initSocket(endpoint.getEndpointURI().getAddress()); 138 } 139 140 try 141 { 142 write(connectedSocket, payload); 143 } 145 catch (IOException e) 146 { 147 if (keepSendSocketOpen) 148 { 149 logger.warn("Write raised exception: '" + e.getMessage() + "' attempting to reconnect."); 150 reconnect(); 152 write(connectedSocket, payload); 153 } 154 else 155 { 156 throw e; 157 } 158 } 159 } 160 161 protected Socket createSocket(int port, InetAddress inetAddress) throws IOException 162 { 163 return new Socket (inetAddress, port); 164 } 165 166 protected void write(Socket socket, Object data) throws IOException , TransformerException 167 { 168 TcpProtocol protocol = connector.getTcpProtocol(); 169 170 byte[] binaryData; 171 if (data instanceof String ) 172 { 173 binaryData = data.toString().getBytes(); 174 } 175 else if (data instanceof byte[]) 176 { 177 binaryData = (byte[])data; 178 } 179 else 180 { 181 binaryData = (byte[])serializableToByteArray.transform(data); 182 } 183 184 BufferedOutputStream bos = new BufferedOutputStream (socket.getOutputStream()); 185 protocol.write(bos, binaryData); 186 bos.flush(); 187 } 188 189 protected byte[] receive(Socket socket, int timeout) throws IOException 190 { 191 DataInputStream dis = new DataInputStream (new BufferedInputStream (socket.getInputStream())); 192 if (timeout >= 0) 193 { 194 socket.setSoTimeout(timeout); 195 } 196 return connector.getTcpProtocol().read(dis); 197 } 198 199 211 protected synchronized UMOMessage doReceive(UMOImmutableEndpoint endpoint, long timeout) throws Exception 212 { 213 Socket socket = null; 214 try 215 { 216 socket = initSocket(endpoint.getEndpointURI().getAddress()); 217 try 218 { 219 byte[] result = receive(socket, (int)timeout); 220 if (result == null) 221 { 222 return null; 223 } 224 return new MuleMessage(connector.getMessageAdapter(result)); 225 } 226 catch (SocketTimeoutException e) 227 { 228 logger.info("Socket timed out normally while doing a synchronous receive on endpointUri: " 230 + endpoint.getEndpointURI()); 231 return null; 232 } 233 } 234 finally 235 { 236 if (socket != null && !socket.isClosed()) 237 { 238 socket.close(); 239 } 240 } 241 } 242 243 public Object getDelegateSession() throws UMOException 244 { 245 return null; 246 } 247 248 public UMOConnector getConnector() 249 { 250 return connector; 251 } 252 253 263 public OutputStream getOutputStream(UMOImmutableEndpoint endpoint, UMOMessage message) 264 throws UMOException 265 { 266 try 267 { 268 return connectedSocket.getOutputStream(); 269 } 270 catch (IOException e) 271 { 272 throw new DispatchException(message, endpoint, e); 273 } 274 } 275 276 protected synchronized void doDispose() 277 { 278 try 279 { 280 doDisconnect(); 281 } 282 catch (Exception e) 283 { 284 logger.error("Failed to shutdown the dispatcher.", e); 285 } 286 } 287 288 protected void doConnect(UMOImmutableEndpoint endpoint) throws Exception 289 { 290 keepSendSocketOpen = MapUtils.getBooleanValue(endpoint.getProperties(), "keepSendSocketOpen", 291 connector.isKeepSendSocketOpen()); 292 293 if (connectedSocket == null || connectedSocket.isClosed() || !keepSendSocketOpen) 294 { 295 connectedSocket = initSocket(endpoint.getEndpointURI().getAddress()); 296 } 297 } 298 299 protected void doDisconnect() throws Exception 300 { 301 if (null != connectedSocket && !connectedSocket.isClosed()) 302 { 303 try 304 { 305 connectedSocket.close(); 306 connectedSocket = null; 307 } 308 catch (IOException e) 309 { 310 logger.warn("ConnectedSocked.close() raised exception. Reason: " + e.getMessage()); 311 } 312 } 313 } 314 } 315 | Popular Tags |