1 10 11 package org.mule.providers.udp; 12 13 import java.io.IOException ; 14 import java.net.DatagramPacket ; 15 import java.net.DatagramSocket ; 16 import java.net.InetAddress ; 17 import java.net.SocketTimeoutException ; 18 import java.net.URI ; 19 import java.net.UnknownHostException ; 20 21 import javax.resource.spi.work.Work ; 22 import javax.resource.spi.work.WorkException ; 23 import javax.resource.spi.work.WorkManager ; 24 25 import org.mule.config.i18n.Message; 26 import org.mule.config.i18n.Messages; 27 import org.mule.impl.MuleMessage; 28 import org.mule.providers.AbstractMessageReceiver; 29 import org.mule.umo.UMOComponent; 30 import org.mule.umo.UMOMessage; 31 import org.mule.umo.endpoint.UMOEndpoint; 32 import org.mule.umo.lifecycle.Disposable; 33 import org.mule.umo.lifecycle.InitialisationException; 34 import org.mule.umo.provider.UMOConnector; 35 import org.mule.umo.provider.UMOMessageAdapter; 36 import org.mule.umo.transformer.UMOTransformer; 37 38 41 public class UdpMessageReceiver extends AbstractMessageReceiver implements Work 42 { 43 protected DatagramSocket socket = null; 44 protected InetAddress inetAddress; 45 protected int bufferSize; 46 private URI uri; 47 protected UMOTransformer responseTransformer = null; 48 49 public UdpMessageReceiver(UMOConnector connector, UMOComponent component, UMOEndpoint endpoint) 50 throws InitialisationException 51 { 52 super(connector, component, endpoint); 53 bufferSize = ((UdpConnector)connector).getBufferSize(); 54 55 uri = endpoint.getEndpointURI().getUri(); 56 57 try 58 { 59 inetAddress = InetAddress.getByName(uri.getHost()); 60 } 61 catch (UnknownHostException e) 62 { 63 throw new InitialisationException(new Message("udp", 2, uri), e, this); 64 } 65 66 responseTransformer = getResponseTransformer(); 67 } 68 69 public void doConnect() throws Exception 70 { 71 try 72 { 73 socket = createSocket(uri, inetAddress); 74 socket.setSoTimeout(((UdpConnector)connector).getTimeout()); 75 socket.setReceiveBufferSize(bufferSize); 76 socket.setSendBufferSize(bufferSize); 77 } 78 catch (Exception e) 79 { 80 throw new InitialisationException(new Message("udp", 1, uri), e, this); 81 } 82 83 try 84 { 85 getWorkManager().scheduleWork(this, WorkManager.INDEFINITE, null, connector); 86 } 87 catch (WorkException e) 88 { 89 throw new InitialisationException(new Message(Messages.FAILED_TO_SCHEDULE_WORK), e, this); 90 } 91 } 92 93 public void doDisconnect() throws Exception 94 { 95 disposing.set(true); 97 if (socket != null) 98 { 99 socket.close(); 100 } 101 102 } 103 104 protected UMOTransformer getResponseTransformer() throws InitialisationException 105 { 106 UMOTransformer transformer = component.getDescriptor().getResponseTransformer(); 107 if (transformer == null) 108 { 109 return connector.getDefaultResponseTransformer(); 110 } 111 return transformer; 112 } 113 114 protected DatagramSocket createSocket(URI uri, InetAddress inetAddress) throws IOException 115 { 116 return new DatagramSocket (uri.getPort(), inetAddress); 117 } 118 119 122 public DatagramSocket getSocket() 123 { 124 return socket; 125 } 126 127 protected DatagramPacket createPacket() 128 { 129 DatagramPacket packet = new DatagramPacket (new byte[bufferSize], bufferSize); 130 if (uri.getPort() > 0) 131 { 132 packet.setPort(uri.getPort()); 133 } 134 packet.setAddress(inetAddress); 135 return packet; 136 } 137 138 public void run() 139 { 140 while (!disposing.get()) 141 { 142 if (connector.isStarted()) 143 { 144 145 try 146 { 147 DatagramPacket packet = createPacket(); 148 try 149 { 150 socket.receive(packet); 151 152 if (logger.isTraceEnabled()) 153 { 154 logger.trace("Received packet on: " + inetAddress.toString()); 155 } 156 157 Work work = createWork(packet); 158 try 159 { 160 getWorkManager().scheduleWork(work, WorkManager.INDEFINITE, null, connector); 161 } 162 catch (WorkException e) 163 { 164 logger.error("Udp receiver interrupted: " + e.getMessage(), e); 165 } 166 } 167 catch (SocketTimeoutException e) 168 { 169 } 171 172 } 173 catch (Exception e) 174 { 175 if (!connector.isDisposed() && !disposing.get()) 176 { 177 logger.debug("Accept failed on socket: " + e, e); 178 handleException(e); 179 } 180 } 181 } 182 } 183 } 184 185 public void release() 186 { 187 dispose(); 188 } 189 190 protected void doDispose() 191 { 192 if (socket != null && !socket.isClosed()) 193 { 194 logger.debug("Closing Udp connection: " + uri); 195 socket.close(); 196 logger.info("Closed Udp connection: " + uri); 197 } 198 } 199 200 protected Work createWork(DatagramPacket packet) throws IOException 201 { 202 return new UdpWorker(new DatagramSocket (0), packet); 203 } 204 205 protected class UdpWorker implements Work , Disposable 206 { 207 private DatagramSocket socket = null; 208 private DatagramPacket packet; 209 210 public UdpWorker(DatagramSocket socket, DatagramPacket packet) 211 { 212 this.socket = socket; 213 this.packet = packet; 214 } 215 216 public void release() 217 { 218 dispose(); 219 } 220 221 public void dispose() 222 { 223 if (socket != null && !socket.isClosed()) 224 { 225 try 226 { 227 socket.close(); 228 } 229 catch (Exception e) 230 { 231 logger.error("Socket close failed", e); 232 } 233 } 234 socket = null; 235 } 236 237 240 public void run() 241 { 242 UMOMessage returnMessage = null; 243 try 244 { 245 UMOMessageAdapter adapter = connector.getMessageAdapter(packet); 246 returnMessage = routeMessage(new MuleMessage(adapter), endpoint.isSynchronous()); 247 248 if (returnMessage != null) 249 { 250 byte[] data; 251 if (responseTransformer != null) 252 { 253 Object response = responseTransformer.transform(returnMessage.getPayload()); 254 if (response instanceof byte[]) 255 { 256 data = (byte[])response; 257 } 258 else 259 { 260 data = response.toString().getBytes(); 261 } 262 } 263 else 264 { 265 data = returnMessage.getPayloadAsBytes(); 266 } 267 DatagramPacket result = new DatagramPacket (data, data.length, packet.getAddress(), 268 packet.getPort()); 269 socket.send(result); 270 } 271 } 272 catch (Exception e) 273 { 274 if (!disposing.get()) 275 { 276 handleException(e); 277 } 278 } 279 finally 280 { 281 dispose(); 282 } 283 } 284 } 285 } 286 | Popular Tags |