package org.mule.providers.udp;

import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.util.Map;

import org.mule.impl.MuleMessage;
import org.mule.providers.AbstractMessageDispatcher;
import org.mule.umo.UMOEvent;
import org.mule.umo.UMOException;
import org.mule.umo.UMOMessage;
import org.mule.umo.endpoint.UMOImmutableEndpoint;
import org.mule.umo.provider.UMOConnector;

/**
 * Message dispatcher that sends event payloads as UDP datagrams to the host and
 * port taken from the endpoint URI, and can optionally wait for a reply datagram
 * on the same socket.
 */
public class UdpMessageDispatcher extends AbstractMessageDispatcher
{
    /** The connector that owns this dispatcher; supplies buffer size and timeout. */
    protected final UdpConnector connector;

    /** Destination address, resolved from the endpoint URI host in {@link #doConnect}. */
    protected InetAddress inetAddress;

    /** Socket used for both sending and receiving; null while disconnected. */
    protected DatagramSocket socket;

    /** Destination port taken from the endpoint URI; negative if the URI has none. */
    protected int port;

    /**
     * Creates a dispatcher for the given endpoint.
     *
     * @param endpoint the endpoint whose connector must be a {@link UdpConnector}
     */
    public UdpMessageDispatcher(UMOImmutableEndpoint endpoint)
    {
        super(endpoint);
        this.connector = (UdpConnector) endpoint.getConnector();
    }

    /**
     * Resolves the destination host and port from the endpoint URI and opens the
     * socket. Does nothing when the <code>connected</code> flag (declared outside
     * this class) is already set.
     *
     * @param endpoint the endpoint providing the destination URI
     * @throws Exception if the host cannot be resolved or the socket cannot be created
     */
    protected void doConnect(UMOImmutableEndpoint endpoint) throws Exception
    {
        if (connected.get())
        {
            return;
        }

        URI uri = endpoint.getEndpointURI().getUri();
        port = uri.getPort();
        inetAddress = InetAddress.getByName(uri.getHost());
        socket = createSocket(port, inetAddress);
    }

    /**
     * Closes the socket, if any, and always clears the reference to it.
     *
     * @throws Exception if closing the socket fails
     */
    protected void doDisconnect() throws Exception
    {
        try
        {
            if (socket != null)
            {
                socket.close();
            }
        }
        finally
        {
            // Drop the reference even if close() failed.
            socket = null;
        }
    }

    /**
     * Creates an unconnected datagram socket configured with the connector's
     * buffer size (used for both send and receive buffers) and timeout.
     *
     * @param port the destination port; not used by this implementation
     * @param inetAddress the destination address; not used by this implementation
     * @return the newly created and configured socket
     * @throws IOException if the socket cannot be created or configured
     */
    protected DatagramSocket createSocket(int port, InetAddress inetAddress) throws IOException
    {
        DatagramSocket newSocket = new DatagramSocket();
        newSocket.setReceiveBufferSize(connector.getBufferSize());
        newSocket.setSendBufferSize(connector.getBufferSize());
        newSocket.setSoTimeout(connector.getTimeout());
        return newSocket;
    }

    /**
     * Sends the event's transformed payload as a single datagram.
     *
     * @param event the event whose transformed message bytes are sent
     * @throws Exception if the payload cannot be obtained or the send fails
     */
    protected synchronized void doDispatch(UMOEvent event) throws Exception
    {
        byte[] payload = event.getTransformedMessageAsBytes();
        write(socket, payload);
    }

    /**
     * Wraps the data in a datagram addressed to the configured destination and
     * sends it on the given socket.
     *
     * @param socket the socket to send on
     * @param data the bytes to send
     * @throws IOException if the send fails
     */
    protected void write(DatagramSocket socket, byte[] data) throws IOException
    {
        DatagramPacket packet = new DatagramPacket(data, data.length);
        // Only set the port when the endpoint URI actually supplied one.
        if (port >= 0)
        {
            packet.setPort(port);
        }
        packet.setAddress(inetAddress);
        socket.send(packet);
    }

    /**
     * Dispatches the event and, when the event's endpoint is remote-sync, waits
     * up to the event's timeout for a reply datagram.
     *
     * @param event the event to send
     * @return for a remote-sync endpoint, a message built from the reply datagram
     *         and the event's message, or null if no reply arrived before the
     *         timeout; otherwise the event's own message
     * @throws Exception if dispatching or receiving fails
     */
    protected synchronized UMOMessage doSend(UMOEvent event) throws Exception
    {
        doDispatch(event);

        if (!event.getEndpoint().isRemoteSync())
        {
            return event.getMessage();
        }

        // Remote-sync: try to read the reply from the same socket.
        DatagramPacket result = receive(socket, event.getTimeout());
        if (result == null)
        {
            return null;
        }
        return new MuleMessage(connector.getMessageAdapter(result), event.getMessage());
    }

    /**
     * Receives one datagram, temporarily applying the given timeout to the socket
     * and restoring the previous timeout afterwards.
     *
     * @param socket the socket to read from
     * @param timeout the SO_TIMEOUT value, in milliseconds, to apply for this read
     * @return the received packet, or null if the timeout expired first
     * @throws IOException if the receive fails for a reason other than a timeout
     */
    private DatagramPacket receive(DatagramSocket socket, int timeout) throws IOException
    {
        int origTimeout = socket.getSoTimeout();
        try
        {
            int bufferSize = connector.getBufferSize();
            DatagramPacket packet = new DatagramPacket(new byte[bufferSize], bufferSize);
            socket.setSoTimeout(timeout);
            socket.receive(packet);
            return packet;
        }
        catch (SocketTimeoutException ignored)
        {
            // DatagramSocket.receive() signals an expired timeout by throwing, never
            // by returning null. Map it to null so the callers' "no data" checks work.
            return null;
        }
        finally
        {
            socket.setSoTimeout(origTimeout);
        }
    }

    /**
     * Waits for a single datagram on this dispatcher's socket.
     *
     * @param endpoint the endpoint the request is made for; not used by this
     *            implementation
     * @param timeout the maximum time to wait, in milliseconds; values outside the
     *            int range are clamped rather than truncated
     * @return a message built from the received datagram, or null if nothing
     *         arrived before the timeout
     * @throws Exception if the receive fails
     */
    protected synchronized UMOMessage doReceive(UMOImmutableEndpoint endpoint, long timeout) throws Exception
    {
        DatagramPacket result = receive(socket, toSocketTimeout(timeout));
        if (result == null)
        {
            return null;
        }
        return new MuleMessage(connector.getMessageAdapter(result), (Map) null);
    }

    /**
     * Narrows a long timeout to int by clamping, so that an out-of-range value
     * cannot wrap around to an unrelated timeout.
     */
    private static int toSocketTimeout(long timeout)
    {
        if (timeout > Integer.MAX_VALUE)
        {
            return Integer.MAX_VALUE;
        }
        if (timeout < Integer.MIN_VALUE)
        {
            return Integer.MIN_VALUE;
        }
        return (int) timeout;
    }

    /**
     * @return always null; this dispatcher exposes no delegate session
     * @throws UMOException never thrown by this implementation
     */
    public Object getDelegateSession() throws UMOException
    {
        return null;
    }

    /**
     * @return the UDP connector this dispatcher was created with
     */
    public UMOConnector getConnector()
    {
        return connector;
    }

    /**
     * Nothing to dispose here; the socket is released in {@link #doDisconnect()}.
     */
    protected void doDispose()
    {
        // no-op
    }
}