KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > mule > providers > tcp > TcpStreamingMessageReceiver


1 /*
2  * $Id: TcpStreamingMessageReceiver.java 3798 2006-11-04 04:07:14Z aperepel $
3  * --------------------------------------------------------------------------------------
4  * Copyright (c) MuleSource, Inc. All rights reserved. http://www.mulesource.com
5  *
6  * The software in this package is published under the terms of the MuleSource MPL
7  * license, a copy of which has been included with this distribution in the
8  * LICENSE.txt file.
9  */

10
11 package org.mule.providers.tcp;
12
13 import org.apache.commons.lang.StringUtils;
14 import org.mule.config.i18n.Message;
15 import org.mule.impl.MuleMessage;
16 import org.mule.providers.ConnectException;
17 import org.mule.providers.PollingMessageReceiver;
18 import org.mule.umo.UMOComponent;
19 import org.mule.umo.UMOMessage;
20 import org.mule.umo.endpoint.UMOEndpoint;
21 import org.mule.umo.lifecycle.InitialisationException;
22 import org.mule.umo.provider.UMOConnector;
23 import org.mule.umo.provider.UMOMessageAdapter;
24
25 import java.io.BufferedInputStream JavaDoc;
26 import java.io.DataInputStream JavaDoc;
27 import java.net.InetAddress JavaDoc;
28 import java.net.Socket JavaDoc;
29 import java.net.URI JavaDoc;
30
31 /**
32  * <code>TcpStreamingMessageReceiver</code> establishes a tcp client connection to
33  * an external server and reads the streaming data. No polling frequency is used
34  * since with blocking i/o reads will block, and with non-blocking i/o reads will
35  * occur when data is available. Causing delays between read attempts is unnecessary,
36  * so this forces the pollingFrequency property to zero so no pause occurs in the
37  * PollingMessageReceiver class.
38  *
39  * @author <a HREF="mailto:rlucente@xecu.net">Rich Lucente</a>
40  * @version $Revision: 3798 $
41  */

42 public class TcpStreamingMessageReceiver extends PollingMessageReceiver
43 {
44     protected Socket JavaDoc clientSocket = null;
45
46     protected DataInputStream JavaDoc dataIn = null;
47
48     protected TcpProtocol protocol = null;
49
50     public TcpStreamingMessageReceiver(UMOConnector connector, UMOComponent component, UMOEndpoint endpoint)
51         throws InitialisationException
52     {
53         this(connector, component, endpoint, new Long JavaDoc(0));
54     }
55
56     private TcpStreamingMessageReceiver(UMOConnector connector,
57                                         UMOComponent component,
58                                         UMOEndpoint endpoint,
59                                         Long JavaDoc frequency) throws InitialisationException
60     {
61         super(connector, component, endpoint, frequency);
62         protocol = ((TcpConnector)connector).getTcpProtocol();
63         setFrequency(0);
64     }
65
66     public void poll() throws Exception JavaDoc
67     {
68         setFrequency(0); // make sure this is zero and not overridden via config
69
byte[] data = protocol.read(dataIn);
70         if (data != null)
71         {
72             UMOMessageAdapter adapter = connector.getMessageAdapter(data);
73             UMOMessage message = new MuleMessage(adapter);
74             routeMessage(message, endpoint.isSynchronous());
75         }
76     }
77
78     public void doConnect() throws ConnectException
79     {
80         URI JavaDoc uri = endpoint.getEndpointURI().getUri();
81         String JavaDoc host = StringUtils.defaultIfEmpty(uri.getHost(), "localhost");
82
83         try
84         {
85             logger.debug("Attempting to connect to server socket");
86             InetAddress JavaDoc inetAddress = InetAddress.getByName(host);
87             clientSocket = new Socket JavaDoc(inetAddress, uri.getPort());
88             TcpConnector connector = (TcpConnector)this.connector;
89             clientSocket.setReceiveBufferSize(connector.getBufferSize());
90             clientSocket.setSendBufferSize(connector.getBufferSize());
91             clientSocket.setSoTimeout(connector.getReceiveTimeout());
92
93             dataIn = new DataInputStream JavaDoc(new BufferedInputStream JavaDoc(clientSocket.getInputStream()));
94             logger.debug("Connected to server socket");
95         }
96         catch (Exception JavaDoc e)
97         {
98             logger.error(e);
99             throw new ConnectException(new Message("tcp", 1, uri), e, this);
100         }
101     }
102
103     public void doDisconnect() throws Exception JavaDoc
104     {
105         try
106         {
107             if (clientSocket != null && !clientSocket.isClosed())
108             {
109                 clientSocket.shutdownInput();
110                 clientSocket.shutdownOutput();
111                 clientSocket.close();
112             }
113         }
114         finally
115         {
116             clientSocket = null;
117             dataIn = null;
118             logger.info("Closed tcp client socket");
119         }
120     }
121 }
122
Popular Tags