KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > mule > providers > tcp > TcpMessageDispatcher


1 /*
2  * $Id: TcpMessageDispatcher.java 3982 2006-11-22 14:28:01Z lajos $
3  * --------------------------------------------------------------------------------------
4  * Copyright (c) MuleSource, Inc. All rights reserved. http://www.mulesource.com
5  *
6  * The software in this package is published under the terms of the MuleSource MPL
7  * license, a copy of which has been included with this distribution in the
8  * LICENSE.txt file.
9  */

10
11 package org.mule.providers.tcp;
12
13 import org.mule.util.MapUtils;
14 import org.mule.impl.MuleMessage;
15 import org.mule.providers.AbstractMessageDispatcher;
16 import org.mule.transformers.simple.SerializableToByteArray;
17 import org.mule.umo.UMOEvent;
18 import org.mule.umo.UMOException;
19 import org.mule.umo.UMOMessage;
20 import org.mule.umo.endpoint.UMOImmutableEndpoint;
21 import org.mule.umo.provider.DispatchException;
22 import org.mule.umo.provider.UMOConnector;
23 import org.mule.umo.transformer.TransformerException;
24
25 import java.io.BufferedInputStream JavaDoc;
26 import java.io.BufferedOutputStream JavaDoc;
27 import java.io.DataInputStream JavaDoc;
28 import java.io.IOException JavaDoc;
29 import java.io.OutputStream JavaDoc;
30 import java.net.InetAddress JavaDoc;
31 import java.net.Socket JavaDoc;
32 import java.net.SocketTimeoutException JavaDoc;
33 import java.net.URI JavaDoc;
34 import java.net.URISyntaxException JavaDoc;
35
36 /**
37  * <code>TcpMessageDispatcher</code> will send transformed Mule events over TCP.
38  */

39
40 public class TcpMessageDispatcher extends AbstractMessageDispatcher
41 {
42     private final TcpConnector connector;
43     protected final SerializableToByteArray serializableToByteArray;
44     protected Socket JavaDoc connectedSocket = null;
45     protected boolean keepSendSocketOpen = false;
46
47     public TcpMessageDispatcher(UMOImmutableEndpoint endpoint)
48     {
49         super(endpoint);
50         this.connector = (TcpConnector)endpoint.getConnector();
51         serializableToByteArray = new SerializableToByteArray();
52     }
53
54     protected Socket JavaDoc initSocket(String JavaDoc endpoint) throws IOException JavaDoc, URISyntaxException JavaDoc
55     {
56         URI JavaDoc uri = new URI JavaDoc(endpoint);
57         int port = uri.getPort();
58         InetAddress JavaDoc inetAddress = InetAddress.getByName(uri.getHost());
59         Socket JavaDoc socket = createSocket(port, inetAddress);
60         socket.setReuseAddress(true);
61         if (connector.getBufferSize() != UMOConnector.INT_VALUE_NOT_SET
62             && socket.getReceiveBufferSize() != connector.getBufferSize())
63         {
64             socket.setReceiveBufferSize(connector.getBufferSize());
65         }
66         if (connector.getBufferSize() != UMOConnector.INT_VALUE_NOT_SET
67             && socket.getSendBufferSize() != connector.getBufferSize())
68         {
69             socket.setSendBufferSize(connector.getBufferSize());
70         }
71         if (connector.getReceiveTimeout() != UMOConnector.INT_VALUE_NOT_SET
72             && socket.getSoTimeout() != connector.getReceiveTimeout())
73         {
74             socket.setSoTimeout(connector.getReceiveTimeout());
75         }
76         return socket;
77     }
78
79     protected synchronized void doDispatch(UMOEvent event) throws Exception JavaDoc
80     {
81         try
82         {
83             doInternalDispatch(event);
84         }
85         finally
86         {
87             if (!keepSendSocketOpen)
88             {
89                 doDispose();
90             }
91         }
92     }
93
94     protected synchronized UMOMessage doSend(UMOEvent event) throws Exception JavaDoc
95     {
96         doInternalDispatch(event);
97
98         if (useRemoteSync(event))
99         {
100             try
101             {
102                 byte[] result = receive(connectedSocket, event.getTimeout());
103                 if (result == null)
104                 {
105                     return null;
106                 }
107                 return new MuleMessage(connector.getMessageAdapter(result));
108             }
109             catch (SocketTimeoutException JavaDoc e)
110             {
111                 // we don't necessarily expect to receive a response here
112
logger.info("Socket timed out normally while doing a synchronous receive on endpointUri: "
113                             + event.getEndpoint().getEndpointURI());
114                 return null;
115             }
116         }
117         else
118         {
119             return event.getMessage();
120         }
121     }
122
123     /**
124      * The doSend() and doDispatch() methods need to handle socket disposure
125      * differently, thus the need to extract this common code.
126      *
127      * @param event event
128      * @throws Exception in case of any error
129      */

130     protected void doInternalDispatch(UMOEvent event) throws Exception JavaDoc
131     {
132         Object JavaDoc payload = event.getTransformedMessage();
133
134         // utilize the value after any endpoint overrides, check for dead socket
135
if (!keepSendSocketOpen || connectedSocket == null || connectedSocket.isClosed())
136         {
137             connectedSocket = initSocket(endpoint.getEndpointURI().getAddress());
138         }
139
140         try
141         {
142             write(connectedSocket, payload);
143             // If we're doing sync receive try and read return info from socket
144
}
145         catch (IOException JavaDoc e)
146         {
147             if (keepSendSocketOpen)
148             {
149                 logger.warn("Write raised exception: '" + e.getMessage() + "' attempting to reconnect.");
150                 // Try reconnecting or a Fatal Connection Exception will be thrown
151
reconnect();
152                 write(connectedSocket, payload);
153             }
154             else
155             {
156                 throw e;
157             }
158         }
159     }
160
161     protected Socket JavaDoc createSocket(int port, InetAddress JavaDoc inetAddress) throws IOException JavaDoc
162     {
163         return new Socket JavaDoc(inetAddress, port);
164     }
165
166     protected void write(Socket JavaDoc socket, Object JavaDoc data) throws IOException JavaDoc, TransformerException
167     {
168         TcpProtocol protocol = connector.getTcpProtocol();
169
170         byte[] binaryData;
171         if (data instanceof String JavaDoc)
172         {
173             binaryData = data.toString().getBytes();
174         }
175         else if (data instanceof byte[])
176         {
177             binaryData = (byte[])data;
178         }
179         else
180         {
181             binaryData = (byte[])serializableToByteArray.transform(data);
182         }
183
184         BufferedOutputStream JavaDoc bos = new BufferedOutputStream JavaDoc(socket.getOutputStream());
185         protocol.write(bos, binaryData);
186         bos.flush();
187     }
188
189     protected byte[] receive(Socket JavaDoc socket, int timeout) throws IOException JavaDoc
190     {
191         DataInputStream JavaDoc dis = new DataInputStream JavaDoc(new BufferedInputStream JavaDoc(socket.getInputStream()));
192         if (timeout >= 0)
193         {
194             socket.setSoTimeout(timeout);
195         }
196         return connector.getTcpProtocol().read(dis);
197     }
198
199     /**
200      * Make a specific request to the underlying transport
201      *
202      * @param endpoint the endpoint to use when connecting to the resource
203      * @param timeout the maximum time the operation should block before returning.
204      * The call should return immediately if there is data available. If
205      * no data becomes available before the timeout elapses, null will be
206      * returned
207      * @return the result of the request wrapped in a UMOMessage object. Null will be
208      * returned if no data was avaialable
209      * @throws Exception if the call to the underlying protocal cuases an exception
210      */

211     protected synchronized UMOMessage doReceive(UMOImmutableEndpoint endpoint, long timeout) throws Exception JavaDoc
212     {
213         Socket JavaDoc socket = null;
214         try
215         {
216             socket = initSocket(endpoint.getEndpointURI().getAddress());
217             try
218             {
219                 byte[] result = receive(socket, (int)timeout);
220                 if (result == null)
221                 {
222                     return null;
223                 }
224                 return new MuleMessage(connector.getMessageAdapter(result));
225             }
226             catch (SocketTimeoutException JavaDoc e)
227             {
228                 // we don't necesarily expect to receive a resonse here
229
logger.info("Socket timed out normally while doing a synchronous receive on endpointUri: "
230                             + endpoint.getEndpointURI());
231                 return null;
232             }
233         }
234         finally
235         {
236             if (socket != null && !socket.isClosed())
237             {
238                 socket.close();
239             }
240         }
241     }
242
243     public Object JavaDoc getDelegateSession() throws UMOException
244     {
245         return null;
246     }
247
248     public UMOConnector getConnector()
249     {
250         return connector;
251     }
252
253     /**
254      * Well get the output stream (if any) for this type of transport. Typically this
255      * will be called only when Streaming is being used on an outbound endpoint
256      *
257      * @param endpoint the endpoint that releates to this Dispatcher
258      * @param message the current message being processed
259      * @return the output stream to use for this request or null if the transport
260      * does not support streaming
261      * @throws org.mule.umo.UMOException
262      */

263     public OutputStream JavaDoc getOutputStream(UMOImmutableEndpoint endpoint, UMOMessage message)
264         throws UMOException
265     {
266         try
267         {
268             return connectedSocket.getOutputStream();
269         }
270         catch (IOException JavaDoc e)
271         {
272             throw new DispatchException(message, endpoint, e);
273         }
274     }
275
276     protected synchronized void doDispose()
277     {
278         try
279         {
280             doDisconnect();
281         }
282         catch (Exception JavaDoc e)
283         {
284             logger.error("Failed to shutdown the dispatcher.", e);
285         }
286     }
287
288     protected void doConnect(UMOImmutableEndpoint endpoint) throws Exception JavaDoc
289     {
290         keepSendSocketOpen = MapUtils.getBooleanValue(endpoint.getProperties(), "keepSendSocketOpen",
291             connector.isKeepSendSocketOpen());
292
293         if (connectedSocket == null || connectedSocket.isClosed() || !keepSendSocketOpen)
294         {
295             connectedSocket = initSocket(endpoint.getEndpointURI().getAddress());
296         }
297     }
298
299     protected void doDisconnect() throws Exception JavaDoc
300     {
301         if (null != connectedSocket && !connectedSocket.isClosed())
302         {
303             try
304             {
305                 connectedSocket.close();
306                 connectedSocket = null;
307             }
308             catch (IOException JavaDoc e)
309             {
310                 logger.warn("ConnectedSocked.close() raised exception. Reason: " + e.getMessage());
311             }
312         }
313     }
314 }
315
Popular Tags