package org.apache.activemq.transport.udp;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.SocketAddress;

import org.apache.activemq.command.Command;
import org.apache.activemq.command.Endpoint;
import org.apache.activemq.command.LastPartialCommand;
import org.apache.activemq.command.PartialCommand;
import org.apache.activemq.openwire.BooleanStream;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.transport.reliable.ReplayBuffer;
import org.apache.activemq.util.ByteArrayInputStream;
import org.apache.activemq.util.ByteArrayOutputStream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * A command channel that reads and writes {@link Command} objects as UDP
 * datagrams over a {@link DatagramSocket}.
 * <p>
 * Commands whose marshalled form does not fit into a single datagram of
 * {@code datagramSize} bytes are split into a sequence of partial-command
 * datagrams, the last of which is tagged with
 * {@link LastPartialCommand#DATA_STRUCTURE_TYPE}.
 * <p>
 * Reads are serialized on one lock and writes on another, so one reader and
 * one writer may use the channel at the same time.
 */
public class CommandDatagramSocket extends CommandChannelSupport {

    private static final Log log = LogFactory.getLog(CommandDatagramSocket.class);

    private final DatagramSocket channel;
    private final Object readLock = new Object();
    private final Object writeLock = new Object();

    /**
     * Creates a channel on top of the given socket.
     *
     * @param transport        passed through to the superclass
     * @param wireFormat       format used to marshal and unmarshal commands
     * @param datagramSize     size in bytes of the datagrams sent and of the receive buffer
     * @param targetAddress    passed through to the superclass
     * @param headerMarshaller writes and parses the per-datagram header
     * @param channel          socket used for all sends and receives
     */
    public CommandDatagramSocket(UdpTransport transport, OpenWireFormat wireFormat, int datagramSize,
            SocketAddress targetAddress, DatagramHeaderMarshaller headerMarshaller, DatagramSocket channel) {
        super(transport, wireFormat, datagramSize, targetAddress, headerMarshaller);
        this.channel = channel;
    }

    /**
     * No-op: nothing needs to be started for this channel.
     */
    public void start() throws Exception {
    }

    /**
     * No-op: this class does not close the socket it was given.
     */
    public void stop() throws Exception {
    }

    /**
     * Blocks until one datagram has been received and unmarshals it into a command.
     *
     * @return the received command with its "from" endpoint set, or
     *         {@code null} if the wire format produced no command
     * @throws IOException if receiving or unmarshalling fails
     */
    public Command read() throws IOException {
        Command answer;
        Endpoint from;
        synchronized (readLock) {
            DatagramPacket datagram = createDatagramPacket();
            channel.receive(datagram);

            // NOTE(review): the stream wraps the whole receive buffer rather
            // than only the datagram.getLength() bytes actually received.
            DataInputStream dataIn = new DataInputStream(new ByteArrayInputStream(datagram.getData()));

            from = headerMarshaller.createEndpoint(datagram, dataIn);
            answer = (Command) wireFormat.unmarshal(dataIn);
        }
        if (answer != null) {
            answer.setFrom(from);

            if (log.isDebugEnabled()) {
                log.debug("Channel: " + name + " about to process: " + answer);
            }
        }
        return answer;
    }

    /**
     * Marshals the command and sends it to the given address, splitting it
     * into several partial-command datagrams when it does not fit into one.
     *
     * @param command the command to send
     * @param address the destination of the datagram(s)
     * @throws IOException if marshalling or sending fails, or if the
     *                     configured datagram size leaves no room for payload
     *                     in a fragment
     */
    public void write(Command command, SocketAddress address) throws IOException {
        synchronized (writeLock) {

            ByteArrayOutputStream writeBuffer = createByteArrayOutputStream();
            DataOutputStream dataOut = new DataOutputStream(writeBuffer);
            headerMarshaller.writeHeader(command, dataOut);

            // Everything after the header is the marshalled command; when
            // fragmenting, slicing starts here so the header is not repeated
            // inside the payload.
            int offset = writeBuffer.size();

            wireFormat.marshal(command, dataOut);

            if (remaining(writeBuffer) >= 0) {
                sendWriteBuffer(address, writeBuffer, command.getCommandId());
                return;
            }

            // The command is too big for one datagram: split it into chunks.
            byte[] data = writeBuffer.toByteArray();
            int length = data.length;
            boolean lastFragment = false;
            for (int fragment = 0; !lastFragment; fragment++) {
                writeBuffer = createByteArrayOutputStream();
                // Rebind the stream to the fresh buffer. Without this, the
                // fragment would be appended to the previous buffer and an
                // empty datagram would be sent.
                dataOut = new DataOutputStream(writeBuffer);
                headerMarshaller.writeHeader(command, dataOut);

                int chunkSize = remaining(writeBuffer);

                // Flags of the partial command when tight encoding is on.
                BooleanStream bs = null;
                if (wireFormat.isTightEncodingEnabled()) {
                    bs = new BooleanStream();
                    // presumably marks the partial data byte[] as non-null
                    bs.writeBoolean(true);
                }

                // Overhead of the partial command: data type (1) + command id
                // (4) + size of the partial data (4).
                chunkSize -= 1 + 4 + 4;

                // The boolean flags, or the single flag byte written instead.
                if (bs != null) {
                    chunkSize -= bs.marshalledSize();
                } else {
                    chunkSize -= 1;
                }

                if (!wireFormat.isSizePrefixDisabled()) {
                    // NOTE(review): the value written is the chunk budget
                    // before the prefix's own 4 bytes and the last-fragment
                    // clamp are applied, so it may not equal the encoded
                    // length. Preserved as-is; confirm against the reader.
                    dataOut.writeInt(chunkSize);
                    chunkSize -= 4;
                }

                if (chunkSize <= 0) {
                    // No payload room: the loop could never make progress.
                    throw new IOException("Datagram size " + datagramSize
                            + " is too small to send a fragment of command: " + command);
                }

                lastFragment = offset + chunkSize >= length;
                if (chunkSize + offset > length) {
                    chunkSize = length - offset;
                }

                if (lastFragment) {
                    dataOut.write(LastPartialCommand.DATA_STRUCTURE_TYPE);
                } else {
                    dataOut.write(PartialCommand.DATA_STRUCTURE_TYPE);
                }

                if (bs != null) {
                    bs.marshal(dataOut);
                }

                // The first fragment reuses the command's own id; later
                // fragments get fresh ids from the sequence generator.
                int commandId = command.getCommandId();
                if (fragment > 0) {
                    commandId = sequenceGenerator.getNextSequenceId();
                }
                dataOut.writeInt(commandId);
                if (bs == null) {
                    dataOut.write((byte) 1);
                }

                // Size of the partial data, followed by the data itself.
                dataOut.writeInt(chunkSize);
                dataOut.write(data, offset, chunkSize);

                offset += chunkSize;
                sendWriteBuffer(address, writeBuffer, commandId);
            }
        }
    }

    /**
     * @return the configured datagram size in bytes
     */
    public int getDatagramSize() {
        return datagramSize;
    }

    /**
     * @param datagramSize the datagram size in bytes to use for subsequent
     *                     receive buffers and write buffers
     */
    public void setDatagramSize(int datagramSize) {
        this.datagramSize = datagramSize;
    }

    /**
     * Sends the contents of the write buffer as a first-time (non-redelivery)
     * datagram.
     *
     * @param address     destination of the datagram
     * @param writeBuffer buffer holding the complete datagram content
     * @param commandId   id the datagram is recorded and logged under
     * @throws IOException if the send fails
     */
    protected void sendWriteBuffer(SocketAddress address, ByteArrayOutputStream writeBuffer, int commandId)
            throws IOException {
        byte[] data = writeBuffer.toByteArray();
        sendWriteBuffer(commandId, address, data, false);
    }

    /**
     * Sends the bytes as one datagram. Unless this is a redelivery, the bytes
     * are first added to the replay buffer, if one is configured.
     *
     * @param commandId  id the datagram is recorded and logged under
     * @param address    destination of the datagram
     * @param data       complete datagram content
     * @param redelivery {@code true} when re-sending a previously sent buffer
     * @throws IOException if the send fails
     */
    protected void sendWriteBuffer(int commandId, SocketAddress address, byte[] data, boolean redelivery)
            throws IOException {
        ReplayBuffer bufferCache = getReplayBuffer();
        if (bufferCache != null && !redelivery) {
            bufferCache.addBuffer(commandId, data);
        }

        if (log.isDebugEnabled()) {
            String text = (redelivery) ? "REDELIVERING" : "sending";
            log.debug("Channel: " + name + " " + text + " datagram: " + commandId + " to: " + address);
        }
        DatagramPacket packet = new DatagramPacket(data, 0, data.length, address);
        channel.send(packet);
    }

    /**
     * Re-sends a previously sent buffer to the replay address, or logs a
     * warning when the buffer is no longer available.
     *
     * @param commandId id of the datagram being replayed
     * @param buffer    the {@code byte[]} to re-send, or {@code null}
     * @throws IOException if the send fails
     */
    public void sendBuffer(int commandId, Object buffer) throws IOException {
        if (buffer != null) {
            byte[] data = (byte[]) buffer;
            sendWriteBuffer(commandId, replayAddress, data, true);
        } else {
            if (log.isWarnEnabled()) {
                log.warn("Request for buffer: " + commandId + " is no longer present");
            }
        }
    }

    /**
     * @return a new receive packet backed by a fresh array of
     *         {@code datagramSize} bytes
     */
    protected DatagramPacket createDatagramPacket() {
        return new DatagramPacket(new byte[datagramSize], datagramSize);
    }

    /**
     * @param buffer the write buffer to measure
     * @return the number of bytes still available in one datagram; negative
     *         when the buffer already exceeds the datagram size
     */
    protected int remaining(ByteArrayOutputStream buffer) {
        return datagramSize - buffer.size();
    }

    /**
     * @return a new write buffer created with {@code datagramSize} as its
     *         constructor argument
     */
    protected ByteArrayOutputStream createByteArrayOutputStream() {
        return new ByteArrayOutputStream(datagramSize);
    }
}