1 10 11 package org.mule.providers.tcp; 12 13 import org.apache.commons.lang.StringUtils; 14 import org.mule.config.i18n.Message; 15 import org.mule.impl.MuleMessage; 16 import org.mule.providers.ConnectException; 17 import org.mule.providers.PollingMessageReceiver; 18 import org.mule.umo.UMOComponent; 19 import org.mule.umo.UMOMessage; 20 import org.mule.umo.endpoint.UMOEndpoint; 21 import org.mule.umo.lifecycle.InitialisationException; 22 import org.mule.umo.provider.UMOConnector; 23 import org.mule.umo.provider.UMOMessageAdapter; 24 25 import java.io.BufferedInputStream ; 26 import java.io.DataInputStream ; 27 import java.net.InetAddress ; 28 import java.net.Socket ; 29 import java.net.URI ; 30 31 42 public class TcpStreamingMessageReceiver extends PollingMessageReceiver 43 { 44 protected Socket clientSocket = null; 45 46 protected DataInputStream dataIn = null; 47 48 protected TcpProtocol protocol = null; 49 50 public TcpStreamingMessageReceiver(UMOConnector connector, UMOComponent component, UMOEndpoint endpoint) 51 throws InitialisationException 52 { 53 this(connector, component, endpoint, new Long (0)); 54 } 55 56 private TcpStreamingMessageReceiver(UMOConnector connector, 57 UMOComponent component, 58 UMOEndpoint endpoint, 59 Long frequency) throws InitialisationException 60 { 61 super(connector, component, endpoint, frequency); 62 protocol = ((TcpConnector)connector).getTcpProtocol(); 63 setFrequency(0); 64 } 65 66 public void poll() throws Exception 67 { 68 setFrequency(0); byte[] data = protocol.read(dataIn); 70 if (data != null) 71 { 72 UMOMessageAdapter adapter = connector.getMessageAdapter(data); 73 UMOMessage message = new MuleMessage(adapter); 74 routeMessage(message, endpoint.isSynchronous()); 75 } 76 } 77 78 public void doConnect() throws ConnectException 79 { 80 URI uri = endpoint.getEndpointURI().getUri(); 81 String host = StringUtils.defaultIfEmpty(uri.getHost(), "localhost"); 82 83 try 84 { 85 logger.debug("Attempting to connect to server socket"); 86 InetAddress inetAddress = InetAddress.getByName(host); 87 clientSocket = new Socket (inetAddress, uri.getPort()); 88 TcpConnector connector = (TcpConnector)this.connector; 89 clientSocket.setReceiveBufferSize(connector.getBufferSize()); 90 clientSocket.setSendBufferSize(connector.getBufferSize()); 91 clientSocket.setSoTimeout(connector.getReceiveTimeout()); 92 93 dataIn = new DataInputStream (new BufferedInputStream (clientSocket.getInputStream())); 94 logger.debug("Connected to server socket"); 95 } 96 catch (Exception e) 97 { 98 logger.error(e); 99 throw new ConnectException(new Message("tcp", 1, uri), e, this); 100 } 101 } 102 103 public void doDisconnect() throws Exception 104 { 105 try 106 { 107 if (clientSocket != null && !clientSocket.isClosed()) 108 { 109 clientSocket.shutdownInput(); 110 clientSocket.shutdownOutput(); 111 clientSocket.close(); 112 } 113 } 114 finally 115 { 116 clientSocket = null; 117 dataIn = null; 118 logger.info("Closed tcp client socket"); 119 } 120 } 121 } 122 | Popular Tags |