KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > mule > providers > tcp > TcpMessageReceiver


1 /*
2  * $Id: TcpMessageReceiver.java 4219 2006-12-09 10:15:14Z lajos $
3  * --------------------------------------------------------------------------------------
4  * Copyright (c) MuleSource, Inc. All rights reserved. http://www.mulesource.com
5  *
6  * The software in this package is published under the terms of the MuleSource MPL
7  * license, a copy of which has been included with this distribution in the
8  * LICENSE.txt file.
9  */

10
11 package org.mule.providers.tcp;
12
13 import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
14 import org.apache.commons.lang.StringUtils;
15 import org.mule.config.i18n.Message;
16 import org.mule.config.i18n.Messages;
17 import org.mule.impl.MuleMessage;
18 import org.mule.impl.ResponseOutputStream;
19 import org.mule.providers.AbstractMessageReceiver;
20 import org.mule.providers.ConnectException;
21 import org.mule.umo.UMOComponent;
22 import org.mule.umo.UMOMessage;
23 import org.mule.umo.endpoint.UMOEndpoint;
24 import org.mule.umo.lifecycle.Disposable;
25 import org.mule.umo.lifecycle.DisposeException;
26 import org.mule.umo.lifecycle.InitialisationException;
27 import org.mule.umo.provider.UMOConnector;
28 import org.mule.umo.provider.UMOMessageAdapter;
29
30 import javax.resource.spi.work.Work;
31 import javax.resource.spi.work.WorkException;
32 import javax.resource.spi.work.WorkManager;
33 import java.io.BufferedInputStream;
34 import java.io.BufferedOutputStream;
35 import java.io.DataInputStream;
36 import java.io.DataOutputStream;
37 import java.io.IOException;
38 import java.io.OutputStream;
39 import java.net.InetAddress;
40 import java.net.ServerSocket;
41 import java.net.Socket;
42 import java.net.SocketException;
43 import java.net.SocketTimeoutException;
44 import java.net.URI;
45 import java.net.SocketAddress;
46
47 /**
48  * <code>TcpMessageReceiver</code> acts like a tcp server to receive socket
49  * requests.
50  *
51  * @author <a HREF="mailto:ross.mason@symphonysoft.com">Ross Mason</a>
52  * @author <a HREF="mailto:tsuppari@yahoo.co.uk">P.Oikari</a>
53  * @version $Revision: 4219 $
54  */

55 public class TcpMessageReceiver extends AbstractMessageReceiver implements Work JavaDoc
56 {
57     protected ServerSocket JavaDoc serverSocket = null;
58
59     public TcpMessageReceiver(UMOConnector connector, UMOComponent component, UMOEndpoint endpoint)
60         throws InitialisationException
61     {
62         super(connector, component, endpoint);
63     }
64
65     public void doConnect() throws ConnectException
66     {
67         disposing.set(false);
68         URI JavaDoc uri = endpoint.getEndpointURI().getUri();
69         try
70         {
71             serverSocket = createSocket(uri);
72         }
73         catch (Exception JavaDoc e)
74         {
75             throw new org.mule.providers.ConnectException(new Message("tcp", 1, uri), e, this);
76         }
77
78         try
79         {
80             getWorkManager().scheduleWork(this, WorkManager.INDEFINITE, null, connector);
81         }
82         catch (WorkException JavaDoc e)
83         {
84             throw new ConnectException(new Message(Messages.FAILED_TO_SCHEDULE_WORK), e, this);
85         }
86     }
87
88     public void doDisconnect() throws ConnectException
89     {
90         // this will cause the server thread to quit
91
disposing.set(true);
92         try
93         {
94             if (serverSocket != null)
95             {
96                 serverSocket.close();
97             }
98         }
99         catch (IOException JavaDoc e)
100         {
101             logger.warn("Failed to close server socket: " + e.getMessage(), e);
102         }
103     }
104
105     protected ServerSocket JavaDoc createSocket(URI JavaDoc uri) throws Exception JavaDoc
106     {
107         String JavaDoc host = StringUtils.defaultIfEmpty(uri.getHost(), "localhost");
108         int backlog = ((TcpConnector)connector).getBacklog();
109         InetAddress JavaDoc inetAddress = InetAddress.getByName(host);
110         if (inetAddress.equals(InetAddress.getLocalHost()) || inetAddress.isLoopbackAddress()
111             || host.trim().equals("localhost"))
112         {
113             return new ServerSocket JavaDoc(uri.getPort(), backlog);
114         }
115         else
116         {
117             return new ServerSocket JavaDoc(uri.getPort(), backlog, inetAddress);
118         }
119     }
120
121     /**
122      * Obtain the serverSocket
123      */

124     public ServerSocket JavaDoc getServerSocket()
125     {
126         return serverSocket;
127     }
128
129     public void run()
130     {
131         while (!disposing.get())
132         {
133             if (connector.isStarted() && !disposing.get())
134             {
135                 Socket JavaDoc socket = null;
136                 try
137                 {
138                     socket = serverSocket.accept();
139
140                     if (logger.isTraceEnabled())
141                     {
142                         logger.trace("Server socket Accepted on: " + serverSocket.getLocalPort());
143                     }
144                 }
145                 catch (java.io.InterruptedIOException JavaDoc iie)
146                 {
147                     if (logger.isDebugEnabled())
148                     {
149                         logger.debug("Interupted IO doing serverSocket.accept: " + iie.getMessage());
150                     }
151                 }
152                 catch (Exception JavaDoc e)
153                 {
154                     if (!connector.isDisposed() && !disposing.get())
155                     {
156                         logger.warn("Accept failed on socket: " + e, e);
157                         handleException(new ConnectException(e, this));
158                     }
159                 }
160
161                 if (socket != null)
162                 {
163                     try
164                     {
165                         Work JavaDoc work = createWork(socket);
166                         try
167                         {
168                             getWorkManager().scheduleWork(work, WorkManager.INDEFINITE, null, connector);
169                         }
170                         catch (WorkException JavaDoc e)
171                         {
172                             logger.error("Tcp Server receiver Work was not processed: " + e.getMessage(), e);
173                         }
174                     }
175                     catch (IOException JavaDoc e)
176                     {
177                         handleException(e);
178                     }
179                 }
180             }
181         }
182     }
183
184     public void release()
185     {
186         // template method
187
}
188
189     protected void doDispose()
190     {
191         try
192         {
193             if (serverSocket != null && !serverSocket.isClosed())
194             {
195                 serverSocket.close();
196             }
197             serverSocket = null;
198
199         }
200         catch (Exception JavaDoc e)
201         {
202             logger.error(new DisposeException(new Message("tcp", 2), e));
203         }
204         logger.info("Closed Tcp port");
205     }
206
207     protected Work JavaDoc createWork(Socket JavaDoc socket) throws IOException JavaDoc
208     {
209         return new TcpWorker(socket);
210     }
211
212     protected class TcpWorker implements Work JavaDoc, Disposable
213     {
214         protected Socket JavaDoc socket = null;
215         protected DataInputStream JavaDoc dataIn;
216         protected DataOutputStream JavaDoc dataOut;
217         protected AtomicBoolean closed = new AtomicBoolean(false);
218         protected TcpProtocol protocol;
219
220         public TcpWorker(Socket JavaDoc socket)
221         {
222             this.socket = socket;
223
224             final TcpConnector tcpConnector = ((TcpConnector)connector);
225             this.protocol = tcpConnector.getTcpProtocol();
226
227             try
228             {
229                 if (tcpConnector.getBufferSize() != UMOConnector.INT_VALUE_NOT_SET
230                     && socket.getReceiveBufferSize() != tcpConnector.getBufferSize())
231                 {
232                     socket.setReceiveBufferSize(tcpConnector.getBufferSize());
233                 }
234                 if (tcpConnector.getBufferSize() != UMOConnector.INT_VALUE_NOT_SET
235                     && socket.getSendBufferSize() != tcpConnector.getBufferSize())
236                 {
237                     socket.setSendBufferSize(tcpConnector.getBufferSize());
238                 }
239                 if (tcpConnector.getReceiveTimeout() != UMOConnector.INT_VALUE_NOT_SET
240                     && socket.getSoTimeout() != tcpConnector.getReceiveTimeout())
241                 {
242                     socket.setSoTimeout(tcpConnector.getReceiveTimeout());
243                 }
244
245                 socket.setTcpNoDelay(true);
246                 socket.setKeepAlive(tcpConnector.isKeepAlive());
247             }
248             catch (SocketException JavaDoc e)
249             {
250                 logger.error("Failed to set Socket properties: " + e.getMessage(), e);
251             }
252
253         }
254
255         public void release()
256         {
257             dispose();
258         }
259
260         public void dispose()
261         {
262             closed.set(true);
263             try
264             {
265                 if (socket != null && !socket.isClosed())
266                 {
267                     if (logger.isDebugEnabled())
268                     {
269                         // some dirty workaround for IBM JSSE's SSL implementation,
270
// which closes sockets asynchronously by that point.
271
final SocketAddress JavaDoc socketAddress = socket.getLocalSocketAddress();
272                         if (socketAddress == null)
273                         {
274                             logger.debug("Listener has already been closed by other process.");
275                         }
276                         else
277                         {
278                             logger.debug("Closing listener: " + socketAddress);
279                         }
280                     }
281                     socket.close();
282                 }
283             }
284             catch (IOException JavaDoc e)
285             {
286                 logger.warn("Socket close failed with: " + e);
287             }
288         }
289
290         /**
291          * Accept requests from a given TCP port
292          */

293         public void run()
294         {
295             try
296             {
297                 dataIn = new DataInputStream JavaDoc(new BufferedInputStream JavaDoc(socket.getInputStream()));
298                 dataOut = new DataOutputStream JavaDoc(new BufferedOutputStream JavaDoc(socket.getOutputStream()));
299
300                 while (!socket.isClosed() && !disposing.get())
301                 {
302
303                     byte[] b;
304                     try
305                     {
306                         b = protocol.read(dataIn);
307                         // end of stream
308
if (b == null)
309                         {
310                             break;
311                         }
312
313                         byte[] result = processData(b);
314                         if (result != null)
315                         {
316                             protocol.write(dataOut, result);
317                         }
318                         dataOut.flush();
319                     }
320                     catch (SocketTimeoutException JavaDoc e)
321                     {
322                         if (!socket.getKeepAlive())
323                         {
324                             break;
325                         }
326                     }
327                 }
328             }
329             catch (Exception JavaDoc e)
330             {
331                 handleException(e);
332             }
333             finally
334             {
335                 dispose();
336             }
337         }
338
339         protected byte[] processData(byte[] data) throws Exception JavaDoc
340         {
341             UMOMessageAdapter adapter = connector.getMessageAdapter(data);
342             OutputStream os = new ResponseOutputStream(socket.getOutputStream(), socket);
343             UMOMessage returnMessage = routeMessage(new MuleMessage(adapter), endpoint.isSynchronous(), os);
344             if (returnMessage != null)
345             {
346                 return returnMessage.getPayloadAsBytes();
347             }
348             else
349             {
350                 return null;
351             }
352         }
353
354     }
355
356 }
357
Popular Tags