1 18 package org.apache.activemq.transport.udp; 19 20 import java.io.DataInputStream ; 21 import java.io.DataOutputStream ; 22 import java.io.IOException ; 23 import java.net.SocketAddress ; 24 import java.nio.ByteBuffer ; 25 import java.nio.channels.DatagramChannel ; 26 27 import org.apache.activemq.command.Command; 28 import org.apache.activemq.command.Endpoint; 29 import org.apache.activemq.command.LastPartialCommand; 30 import org.apache.activemq.command.PartialCommand; 31 import org.apache.activemq.openwire.BooleanStream; 32 import org.apache.activemq.openwire.OpenWireFormat; 33 import org.apache.activemq.transport.reliable.ReplayBuffer; 34 import org.apache.activemq.util.ByteArrayInputStream; 35 import org.apache.activemq.util.ByteArrayOutputStream; 36 import org.apache.commons.logging.Log; 37 import org.apache.commons.logging.LogFactory; 38 39 44 public class CommandDatagramChannel extends CommandChannelSupport { 45 46 private static final Log log = LogFactory.getLog(CommandDatagramChannel.class); 47 48 private DatagramChannel channel; 49 private ByteBufferPool bufferPool; 50 51 private Object readLock = new Object (); 53 private ByteBuffer readBuffer; 54 55 private Object writeLock = new Object (); 57 private int defaultMarshalBufferSize = 64 * 1024; 58 59 public CommandDatagramChannel(UdpTransport transport, OpenWireFormat wireFormat, int datagramSize, 60 SocketAddress targetAddress, DatagramHeaderMarshaller headerMarshaller, DatagramChannel channel, 61 ByteBufferPool bufferPool) { 62 super(transport, wireFormat, datagramSize, targetAddress, headerMarshaller); 63 this.channel = channel; 64 this.bufferPool = bufferPool; 65 } 66 67 public void start() throws Exception { 68 bufferPool.setDefaultSize(datagramSize); 69 bufferPool.start(); 70 readBuffer = bufferPool.borrowBuffer(); 71 } 72 73 public void stop() throws Exception { 74 bufferPool.stop(); 75 } 76 77 public Command read() throws IOException { 78 Command answer = null; 79 Endpoint from = null; 80 synchronized (readLock) { 81 while (true) { 82 readBuffer.clear(); 83 SocketAddress address = channel.receive(readBuffer); 84 85 readBuffer.flip(); 86 87 if (readBuffer.limit() == 0) { 88 continue; 89 } 90 from = headerMarshaller.createEndpoint(readBuffer, address); 91 92 int remaining = readBuffer.remaining(); 93 byte[] data = new byte[remaining]; 94 readBuffer.get(data); 95 96 DataInputStream dataIn = new DataInputStream (new ByteArrayInputStream(data)); 101 answer = (Command) wireFormat.unmarshal(dataIn); 102 break; 103 } 104 } 105 if (answer != null) { 106 answer.setFrom(from); 107 108 if (log.isDebugEnabled()) { 109 log.debug("Channel: " + name + " received from: " + from + " about to process: " + answer); 110 } 111 } 112 return answer; 113 } 114 115 public void write(Command command, SocketAddress address) throws IOException { 116 synchronized (writeLock) { 117 118 ByteArrayOutputStream largeBuffer = new ByteArrayOutputStream(defaultMarshalBufferSize); 119 wireFormat.marshal(command, new DataOutputStream (largeBuffer)); 120 byte[] data = largeBuffer.toByteArray(); 121 int size = data.length; 122 123 ByteBuffer writeBuffer = bufferPool.borrowBuffer(); 124 writeBuffer.clear(); 125 headerMarshaller.writeHeader(command, writeBuffer); 126 127 if (size > writeBuffer.remaining()) { 128 int offset = 0; 130 boolean lastFragment = false; 131 for (int fragment = 0, length = data.length; !lastFragment; fragment++) { 132 if (fragment > 0) { 134 writeBuffer = bufferPool.borrowBuffer(); 135 writeBuffer.clear(); 136 headerMarshaller.writeHeader(command, writeBuffer); 137 } 138 139 int chunkSize = writeBuffer.remaining(); 140 141 144 BooleanStream bs = null; 146 if (wireFormat.isTightEncodingEnabled()) { 147 bs = new BooleanStream(); 148 bs.writeBoolean(true); } 151 152 chunkSize -= 1 + 4 + 4; 159 if (bs != null) { 161 chunkSize -= bs.marshalledSize(); 162 } 163 else { 164 chunkSize -= 1; 165 } 166 167 if (!wireFormat.isSizePrefixDisabled()) { 168 writeBuffer.putInt(chunkSize); 170 chunkSize -= 4; 171 } 172 173 lastFragment = offset + chunkSize >= length; 174 if (chunkSize + offset > length) { 175 chunkSize = length - offset; 176 } 177 178 if (lastFragment) { 179 writeBuffer.put(LastPartialCommand.DATA_STRUCTURE_TYPE); 180 } 181 else { 182 writeBuffer.put(PartialCommand.DATA_STRUCTURE_TYPE); 183 } 184 185 if (bs != null) { 186 bs.marshal(writeBuffer); 187 } 188 189 int commandId = command.getCommandId(); 190 if (fragment > 0) { 191 commandId = sequenceGenerator.getNextSequenceId(); 192 } 193 writeBuffer.putInt(commandId); 194 if (bs == null) { 195 writeBuffer.put((byte) 1); 196 } 197 198 writeBuffer.putInt(chunkSize); 200 201 writeBuffer.put(data, offset, chunkSize); 203 204 offset += chunkSize; 205 sendWriteBuffer(commandId, address, writeBuffer, false); 206 } 207 } 208 else { 209 writeBuffer.put(data); 210 sendWriteBuffer(command.getCommandId(), address, writeBuffer, false); 211 } 212 } 213 } 214 215 218 public ByteBufferPool getBufferPool() { 219 return bufferPool; 220 } 221 222 225 public void setBufferPool(ByteBufferPool bufferPool) { 226 this.bufferPool = bufferPool; 227 } 228 229 protected void sendWriteBuffer(int commandId, SocketAddress address, ByteBuffer writeBuffer, boolean redelivery) 232 throws IOException { 233 ReplayBuffer bufferCache = getReplayBuffer(); 236 if (bufferCache != null && !redelivery) { 237 bufferCache.addBuffer(commandId, writeBuffer); 238 } 239 240 writeBuffer.flip(); 241 242 if (log.isDebugEnabled()) { 243 String text = (redelivery) ? "REDELIVERING" : "sending"; 244 log.debug("Channel: " + name + " " + text + " datagram: " + commandId + " to: " + address); 245 } 246 channel.send(writeBuffer, address); 247 } 248 249 public void sendBuffer(int commandId, Object buffer) throws IOException { 250 if (buffer != null) { 251 ByteBuffer writeBuffer = (ByteBuffer ) buffer; 252 sendWriteBuffer(commandId, getReplayAddress(), writeBuffer, true); 253 } 254 else { 255 if (log.isWarnEnabled()) { 256 log.warn("Request for buffer: " + commandId + " is no longer present"); 257 } 258 } 259 } 260 261 } 262 | Popular Tags |