1 4 package com.tc.net.protocol; 5 6 import com.tc.bytes.TCByteBuffer; 7 import com.tc.bytes.TCByteBufferFactory; 8 import com.tc.logging.TCLogger; 9 import com.tc.net.core.TCConnection; 10 import com.tc.util.Assert; 11 12 import java.util.LinkedList ; 13 14 19 public abstract class AbstractTCProtocolAdaptor implements TCProtocolAdaptor { 20 protected static final int MODE_HEADER = 1; 21 protected static final int MODE_DATA = 2; 22 23 private final TCLogger logger; 24 private final LinkedList collectedMessages = new LinkedList (); 25 private int dataBytesNeeded; 26 private AbstractTCNetworkHeader header; 27 private TCByteBuffer[] dataBuffers; 28 private int bufferIndex = -1; 29 private int mode; 30 31 public AbstractTCProtocolAdaptor(TCLogger logger) { 32 this.logger = logger; 33 init(); 34 } 35 36 public void addReadData(TCConnection source, TCByteBuffer[] data, int length) throws TCProtocolException { 37 processIncomingData(source, data, length); 38 } 39 40 public final TCByteBuffer[] getReadBuffers() { 41 if (mode == MODE_HEADER) { return new TCByteBuffer[] { header.getDataBuffer() }; } 42 43 Assert.eval(mode == MODE_DATA); 44 45 if (dataBuffers == null) { 46 dataBuffers = createDataBuffers(dataBytesNeeded); 47 Assert.eval(dataBuffers.length > 0); 48 bufferIndex = 0; 49 } 50 51 final TCByteBuffer[] rv = new TCByteBuffer[dataBuffers.length - bufferIndex]; 53 System.arraycopy(dataBuffers, bufferIndex, rv, 0, rv.length); 54 55 boolean spaceAvail = false; 57 for (int i = 0, n = rv.length; i < n; i++) { 58 if (rv[i].hasRemaining()) { 59 spaceAvail = true; 60 break; 61 } 62 } 63 64 Assert.assertTrue("No space in buffers to read more data", spaceAvail); 65 return rv; 66 } 67 68 abstract protected AbstractTCNetworkHeader getNewProtocolHeader(); 69 70 abstract protected TCNetworkMessage createMessage(TCConnection source, TCNetworkHeader hdr, TCByteBuffer[] data) 72 throws TCProtocolException; 73 74 abstract protected int computeDataLength(TCNetworkHeader hdr); 75 76 protected final TCNetworkMessage collectMessage() { 77 synchronized (this.collectedMessages) { 78 return (TCNetworkMessage) collectedMessages.removeFirst(); 79 } 80 } 81 82 protected final void init() { 83 mode = MODE_HEADER; 84 dataBuffers = null; 85 header = getNewProtocolHeader(); 86 } 87 88 protected final boolean processIncomingData(TCConnection source, TCByteBuffer[] data, final int length) 89 throws TCProtocolException { 90 if (mode == MODE_HEADER) { return processHeaderData(source, data); } 91 92 Assert.eval(mode == MODE_DATA); 93 if (length > dataBytesNeeded) { throw new TCProtocolException("More data read then expected: (" + length + " > " 94 + dataBytesNeeded + ")"); } 95 return processPayloadData(source, data); 96 } 97 98 protected final TCLogger getLogger() { 99 return logger; 100 } 101 102 private TCByteBuffer[] createDataBuffers(int length) { 103 Assert.eval(mode == MODE_DATA); 104 return TCByteBufferFactory.getFixedSizedInstancesForLength(false, length); 105 } 106 107 private boolean processHeaderData(TCConnection source, final TCByteBuffer[] data) throws TCProtocolException { 108 Assert.eval(data.length == 1); 109 Assert.eval(data[0] == this.header.getDataBuffer()); 110 111 if (!this.header.isHeaderLengthAvail()) { return false; } 112 113 final TCByteBuffer buf = data[0]; 114 final int headerLength = this.header.getHeaderByteLength(); 115 final int bufferLength = buf.limit(); 116 117 if (headerLength == AbstractTCNetworkHeader.LENGTH_NOT_AVAIL) { return false; } 118 119 if ((headerLength < header.minLength) || (headerLength > header.maxLength) || (headerLength < bufferLength)) { 120 throw new TCProtocolException("Invalid Header Length: " + headerLength + ", min: " + header.minLength + ", max: " 122 + header.maxLength + ", bufLen: " + bufferLength); 123 } 124 125 if (bufferLength != headerLength) { 126 130 buf.limit(headerLength); 132 return false; 133 } else { 134 Assert.eval(bufferLength == headerLength); 135 136 if (buf.position() == headerLength) { 137 this.header.validate(); 138 139 this.mode = MODE_DATA; 140 this.dataBytesNeeded = computeDataLength(this.header); 141 this.dataBuffers = null; 142 143 if (this.dataBytesNeeded < 0) { throw new TCProtocolException("Negative data size detected: " 144 + this.dataBytesNeeded); } 145 146 if (0 == this.dataBytesNeeded) { 148 synchronized (this.collectedMessages) { 149 this.collectedMessages.addLast(createMessage(source, this.header, null)); 150 } 151 return true; 152 } 153 154 return false; 155 } else { 156 return false; 158 } 159 } 160 } 161 162 private boolean processPayloadData(TCConnection source, final TCByteBuffer[] data) throws TCProtocolException { 163 for (int i = 0; i < data.length; i++) { 164 final TCByteBuffer buffer = data[i]; 165 166 if (!buffer.hasRemaining()) { 167 buffer.flip(); 168 dataBytesNeeded -= buffer.limit(); 169 bufferIndex++; 170 171 if (dataBytesNeeded < 0) { throw new TCProtocolException("More data in buffers than expected"); } 172 } else { 173 break; 174 } 175 } 176 177 if (0 == dataBytesNeeded) { 178 if (bufferIndex != dataBuffers.length) { throw new TCProtocolException("Not all buffers consumed"); } 179 180 TCNetworkMessage msg = createMessage(source, header, dataBuffers); 182 183 if (logger.isDebugEnabled()) { 184 logger.debug("Message complete on connection " + source + ": " + msg.toString()); 185 } 186 187 synchronized (collectedMessages) { 188 collectedMessages.addLast(msg); 189 } 190 191 return true; 192 } 193 194 Assert.eval(dataBytesNeeded > 0); 195 196 return false; 198 } 199 200 } | Popular Tags |