KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > tc > net > protocol > AbstractTCProtocolAdaptor


/*
 * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright notice. All rights reserved.
 */

package com.tc.net.protocol;

import com.tc.bytes.TCByteBuffer;
import com.tc.bytes.TCByteBufferFactory;
import com.tc.logging.TCLogger;
import com.tc.net.core.TCConnection;
import com.tc.util.Assert;

import java.util.LinkedList;

14 /**
15  * Base class for protocol adaptors
16  *
17  * @author teck
18  */

19 public abstract class AbstractTCProtocolAdaptor implements TCProtocolAdaptor {
20   protected static final int MODE_HEADER = 1;
21   protected static final int MODE_DATA = 2;
22
23   private final TCLogger logger;
24   private final LinkedList JavaDoc collectedMessages = new LinkedList JavaDoc();
25   private int dataBytesNeeded;
26   private AbstractTCNetworkHeader header;
27   private TCByteBuffer[] dataBuffers;
28   private int bufferIndex = -1;
29   private int mode;
30
31   public AbstractTCProtocolAdaptor(TCLogger logger) {
32     this.logger = logger;
33     init();
34   }
35
36   public void addReadData(TCConnection source, TCByteBuffer[] data, int length) throws TCProtocolException {
37     processIncomingData(source, data, length);
38   }
39
40   public final TCByteBuffer[] getReadBuffers() {
41     if (mode == MODE_HEADER) { return new TCByteBuffer[] { header.getDataBuffer() }; }
42
43     Assert.eval(mode == MODE_DATA);
44
45     if (dataBuffers == null) {
46       dataBuffers = createDataBuffers(dataBytesNeeded);
47       Assert.eval(dataBuffers.length > 0);
48       bufferIndex = 0;
49     }
50
51     // only return the subset of buffers that can actually receive more bytes
52
final TCByteBuffer[] rv = new TCByteBuffer[dataBuffers.length - bufferIndex];
53     System.arraycopy(dataBuffers, bufferIndex, rv, 0, rv.length);
54
55     // Make sure we're not passing back a set of arrays with no space left in them
56
boolean spaceAvail = false;
57     for (int i = 0, n = rv.length; i < n; i++) {
58       if (rv[i].hasRemaining()) {
59         spaceAvail = true;
60         break;
61       }
62     }
63
64     Assert.assertTrue("No space in buffers to read more data", spaceAvail);
65     return rv;
66   }
67
68   abstract protected AbstractTCNetworkHeader getNewProtocolHeader();
69
70   // subclasses override this method to return specific message types
71
abstract protected TCNetworkMessage createMessage(TCConnection source, TCNetworkHeader hdr, TCByteBuffer[] data)
72       throws TCProtocolException;
73
74   abstract protected int computeDataLength(TCNetworkHeader hdr);
75
76   protected final TCNetworkMessage collectMessage() {
77     synchronized (this.collectedMessages) {
78       return (TCNetworkMessage) collectedMessages.removeFirst();
79     }
80   }
81
82   protected final void init() {
83     mode = MODE_HEADER;
84     dataBuffers = null;
85     header = getNewProtocolHeader();
86   }
87
88   protected final boolean processIncomingData(TCConnection source, TCByteBuffer[] data, final int length)
89       throws TCProtocolException {
90     if (mode == MODE_HEADER) { return processHeaderData(source, data); }
91
92     Assert.eval(mode == MODE_DATA);
93     if (length > dataBytesNeeded) { throw new TCProtocolException("More data read then expected: (" + length + " > "
94                                                                   + dataBytesNeeded + ")"); }
95     return processPayloadData(source, data);
96   }
97
98   protected final TCLogger getLogger() {
99     return logger;
100   }
101
102   private TCByteBuffer[] createDataBuffers(int length) {
103     Assert.eval(mode == MODE_DATA);
104     return TCByteBufferFactory.getFixedSizedInstancesForLength(false, length);
105   }
106
107   private boolean processHeaderData(TCConnection source, final TCByteBuffer[] data) throws TCProtocolException {
108     Assert.eval(data.length == 1);
109     Assert.eval(data[0] == this.header.getDataBuffer());
110
111     if (!this.header.isHeaderLengthAvail()) { return false; }
112
113     final TCByteBuffer buf = data[0];
114     final int headerLength = this.header.getHeaderByteLength();
115     final int bufferLength = buf.limit();
116
117     if (headerLength == AbstractTCNetworkHeader.LENGTH_NOT_AVAIL) { return false; }
118
119     if ((headerLength < header.minLength) || (headerLength > header.maxLength) || (headerLength < bufferLength)) {
120       // header data is screwed
121
throw new TCProtocolException("Invalid Header Length: " + headerLength + ", min: " + header.minLength + ", max: "
122                                     + header.maxLength + ", bufLen: " + bufferLength);
123     }
124
125     if (bufferLength != headerLength) {
126       // maybe we should support a way to swap out the header buffer for a larger sized one
127
// instead of always manadating that the backing buffer behind a header have
128
// enough capacity for the largest possible header for the given protocol. Just a thought
129

130       // protocol header is bigger than min length, adjust buffer limit and continue
131
buf.limit(headerLength);
132       return false;
133     } else {
134       Assert.eval(bufferLength == headerLength);
135
136       if (buf.position() == headerLength) {
137         this.header.validate();
138
139         this.mode = MODE_DATA;
140         this.dataBytesNeeded = computeDataLength(this.header);
141         this.dataBuffers = null;
142
143         if (this.dataBytesNeeded < 0) { throw new TCProtocolException("Negative data size detected: "
144                                                                       + this.dataBytesNeeded); }
145
146         // allow for message types with zero length data payloads
147
if (0 == this.dataBytesNeeded) {
148           synchronized (this.collectedMessages) {
149             this.collectedMessages.addLast(createMessage(source, this.header, null));
150           }
151           return true;
152         }
153
154         return false;
155       } else {
156         // protocol header not completely read yet, do nothing
157
return false;
158       }
159     }
160   }
161
162   private boolean processPayloadData(TCConnection source, final TCByteBuffer[] data) throws TCProtocolException {
163     for (int i = 0; i < data.length; i++) {
164       final TCByteBuffer buffer = data[i];
165
166       if (!buffer.hasRemaining()) {
167         buffer.flip();
168         dataBytesNeeded -= buffer.limit();
169         bufferIndex++;
170
171         if (dataBytesNeeded < 0) { throw new TCProtocolException("More data in buffers than expected"); }
172       } else {
173         break;
174       }
175     }
176
177     if (0 == dataBytesNeeded) {
178       if (bufferIndex != dataBuffers.length) { throw new TCProtocolException("Not all buffers consumed"); }
179
180       // message is complete!
181
TCNetworkMessage msg = createMessage(source, header, dataBuffers);
182
183       if (logger.isDebugEnabled()) {
184         logger.debug("Message complete on connection " + source + ": " + msg.toString());
185       }
186
187       synchronized (collectedMessages) {
188         collectedMessages.addLast(msg);
189       }
190
191       return true;
192     }
193
194     Assert.eval(dataBytesNeeded > 0);
195
196     // data portion not done, try again later
197
return false;
198   }
199
200 }
Popular Tags