KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > mule > providers > udp > UdpMessageReceiver


/*
 * $Id: UdpMessageReceiver.java 4219 2006-12-09 10:15:14Z lajos $
 * --------------------------------------------------------------------------------------
 * Copyright (c) MuleSource, Inc. All rights reserved. http://www.mulesource.com
 *
 * The software in this package is published under the terms of the MuleSource MPL
 * license, a copy of which has been included with this distribution in the
 * LICENSE.txt file.
 */

10
11 package org.mule.providers.udp;
12
13 import java.io.IOException JavaDoc;
14 import java.net.DatagramPacket JavaDoc;
15 import java.net.DatagramSocket JavaDoc;
16 import java.net.InetAddress JavaDoc;
17 import java.net.SocketTimeoutException JavaDoc;
18 import java.net.URI JavaDoc;
19 import java.net.UnknownHostException JavaDoc;
20
21 import javax.resource.spi.work.Work JavaDoc;
22 import javax.resource.spi.work.WorkException JavaDoc;
23 import javax.resource.spi.work.WorkManager JavaDoc;
24
25 import org.mule.config.i18n.Message;
26 import org.mule.config.i18n.Messages;
27 import org.mule.impl.MuleMessage;
28 import org.mule.providers.AbstractMessageReceiver;
29 import org.mule.umo.UMOComponent;
30 import org.mule.umo.UMOMessage;
31 import org.mule.umo.endpoint.UMOEndpoint;
32 import org.mule.umo.lifecycle.Disposable;
33 import org.mule.umo.lifecycle.InitialisationException;
34 import org.mule.umo.provider.UMOConnector;
35 import org.mule.umo.provider.UMOMessageAdapter;
36 import org.mule.umo.transformer.UMOTransformer;
37
38 /**
39  * <code>UdpMessageReceiver</code> receives UDP message packets.
40  */

41 public class UdpMessageReceiver extends AbstractMessageReceiver implements Work JavaDoc
42 {
43     protected DatagramSocket JavaDoc socket = null;
44     protected InetAddress JavaDoc inetAddress;
45     protected int bufferSize;
46     private URI JavaDoc uri;
47     protected UMOTransformer responseTransformer = null;
48
49     public UdpMessageReceiver(UMOConnector connector, UMOComponent component, UMOEndpoint endpoint)
50         throws InitialisationException
51     {
52         super(connector, component, endpoint);
53         bufferSize = ((UdpConnector)connector).getBufferSize();
54
55         uri = endpoint.getEndpointURI().getUri();
56
57         try
58         {
59             inetAddress = InetAddress.getByName(uri.getHost());
60         }
61         catch (UnknownHostException JavaDoc e)
62         {
63             throw new InitialisationException(new Message("udp", 2, uri), e, this);
64         }
65
66         responseTransformer = getResponseTransformer();
67     }
68
69     public void doConnect() throws Exception JavaDoc
70     {
71         try
72         {
73             socket = createSocket(uri, inetAddress);
74             socket.setSoTimeout(((UdpConnector)connector).getTimeout());
75             socket.setReceiveBufferSize(bufferSize);
76             socket.setSendBufferSize(bufferSize);
77         }
78         catch (Exception JavaDoc e)
79         {
80             throw new InitialisationException(new Message("udp", 1, uri), e, this);
81         }
82
83         try
84         {
85             getWorkManager().scheduleWork(this, WorkManager.INDEFINITE, null, connector);
86         }
87         catch (WorkException JavaDoc e)
88         {
89             throw new InitialisationException(new Message(Messages.FAILED_TO_SCHEDULE_WORK), e, this);
90         }
91     }
92
93     public void doDisconnect() throws Exception JavaDoc
94     {
95         // this will cause the server thread to quit
96
disposing.set(true);
97         if (socket != null)
98         {
99             socket.close();
100         }
101
102     }
103
104     protected UMOTransformer getResponseTransformer() throws InitialisationException
105     {
106         UMOTransformer transformer = component.getDescriptor().getResponseTransformer();
107         if (transformer == null)
108         {
109             return connector.getDefaultResponseTransformer();
110         }
111         return transformer;
112     }
113
114     protected DatagramSocket JavaDoc createSocket(URI JavaDoc uri, InetAddress JavaDoc inetAddress) throws IOException JavaDoc
115     {
116         return new DatagramSocket JavaDoc(uri.getPort(), inetAddress);
117     }
118
119     /**
120      * Obtain the serverSocket
121      */

122     public DatagramSocket JavaDoc getSocket()
123     {
124         return socket;
125     }
126
127     protected DatagramPacket JavaDoc createPacket()
128     {
129         DatagramPacket JavaDoc packet = new DatagramPacket JavaDoc(new byte[bufferSize], bufferSize);
130         if (uri.getPort() > 0)
131         {
132             packet.setPort(uri.getPort());
133         }
134         packet.setAddress(inetAddress);
135         return packet;
136     }
137
138     public void run()
139     {
140         while (!disposing.get())
141         {
142             if (connector.isStarted())
143             {
144
145                 try
146                 {
147                     DatagramPacket JavaDoc packet = createPacket();
148                     try
149                     {
150                         socket.receive(packet);
151
152                         if (logger.isTraceEnabled())
153                         {
154                             logger.trace("Received packet on: " + inetAddress.toString());
155                         }
156
157                         Work JavaDoc work = createWork(packet);
158                         try
159                         {
160                             getWorkManager().scheduleWork(work, WorkManager.INDEFINITE, null, connector);
161                         }
162                         catch (WorkException JavaDoc e)
163                         {
164                             logger.error("Udp receiver interrupted: " + e.getMessage(), e);
165                         }
166                     }
167                     catch (SocketTimeoutException JavaDoc e)
168                     {
169                         // ignore
170
}
171
172                 }
173                 catch (Exception JavaDoc e)
174                 {
175                     if (!connector.isDisposed() && !disposing.get())
176                     {
177                         logger.debug("Accept failed on socket: " + e, e);
178                         handleException(e);
179                     }
180                 }
181             }
182         }
183     }
184
185     public void release()
186     {
187         dispose();
188     }
189
190     protected void doDispose()
191     {
192         if (socket != null && !socket.isClosed())
193         {
194             logger.debug("Closing Udp connection: " + uri);
195             socket.close();
196             logger.info("Closed Udp connection: " + uri);
197         }
198     }
199
200     protected Work JavaDoc createWork(DatagramPacket JavaDoc packet) throws IOException JavaDoc
201     {
202         return new UdpWorker(new DatagramSocket JavaDoc(0), packet);
203     }
204
205     protected class UdpWorker implements Work JavaDoc, Disposable
206     {
207         private DatagramSocket JavaDoc socket = null;
208         private DatagramPacket JavaDoc packet;
209
210         public UdpWorker(DatagramSocket JavaDoc socket, DatagramPacket JavaDoc packet)
211         {
212             this.socket = socket;
213             this.packet = packet;
214         }
215
216         public void release()
217         {
218             dispose();
219         }
220
221         public void dispose()
222         {
223             if (socket != null && !socket.isClosed())
224             {
225                 try
226                 {
227                     socket.close();
228                 }
229                 catch (Exception JavaDoc e)
230                 {
231                     logger.error("Socket close failed", e);
232                 }
233             }
234             socket = null;
235         }
236
237         /**
238          * Accept requests from a given Udp address
239          */

240         public void run()
241         {
242             UMOMessage returnMessage = null;
243             try
244             {
245                 UMOMessageAdapter adapter = connector.getMessageAdapter(packet);
246                 returnMessage = routeMessage(new MuleMessage(adapter), endpoint.isSynchronous());
247
248                 if (returnMessage != null)
249                 {
250                     byte[] data;
251                     if (responseTransformer != null)
252                     {
253                         Object JavaDoc response = responseTransformer.transform(returnMessage.getPayload());
254                         if (response instanceof byte[])
255                         {
256                             data = (byte[])response;
257                         }
258                         else
259                         {
260                             data = response.toString().getBytes();
261                         }
262                     }
263                     else
264                     {
265                         data = returnMessage.getPayloadAsBytes();
266                     }
267                     DatagramPacket JavaDoc result = new DatagramPacket JavaDoc(data, data.length, packet.getAddress(),
268                         packet.getPort());
269                     socket.send(result);
270                 }
271             }
272             catch (Exception JavaDoc e)
273             {
274                 if (!disposing.get())
275                 {
276                     handleException(e);
277                 }
278             }
279             finally
280             {
281                 dispose();
282             }
283         }
284     }
285 }
286
Popular Tags