1 4 package com.tc.net.protocol.tcm; 5 6 import com.tc.bytes.TCByteBuffer; 7 import com.tc.io.TCByteBufferInputStream; 8 import com.tc.io.TCByteBufferOutput; 9 import com.tc.io.TCSerializable; 10 import com.tc.net.protocol.AbstractTCNetworkMessage; 11 import com.tc.util.Assert; 12 import com.tc.util.concurrent.SetOnceFlag; 13 14 import java.io.IOException ; 15 16 19 public abstract class TCMessageImpl extends AbstractTCNetworkMessage implements TCMessage { 20 21 private final MessageMonitor monitor; 22 private final SetOnceFlag processed = new SetOnceFlag(); 23 private final SetOnceFlag isSent = new SetOnceFlag(); 24 private final TCMessageType type; 25 private final MessageChannel channel; 26 private int nvCount; 27 28 private TCByteBufferOutput out; 29 private TCByteBufferInputStream bbis; 30 private int messageVersion; 31 32 35 protected TCMessageImpl(MessageMonitor monitor, TCByteBufferOutput output, MessageChannel channel, TCMessageType type) { 36 super(new TCMessageHeaderImpl(type)); 37 this.monitor = monitor; 38 this.type = type; 39 this.channel = channel; 40 41 this.out = output; 43 44 this.out.writeInt(0); 46 } 47 48 54 protected TCMessageImpl(MessageMonitor monitor, MessageChannel channel, TCMessageHeader header, TCByteBuffer[] data) { 55 super(header, data); 56 this.monitor = monitor; 57 this.type = TCMessageType.getInstance(header.getMessageType()); 58 this.messageVersion = header.getMessageTypeVersion(); 59 this.bbis = new TCByteBufferInputStream(data); 60 this.channel = channel; 61 } 62 63 public TCMessageType getMessageType() { 64 return type; 65 } 66 67 protected int getMessageVersion() { 68 return this.messageVersion; 69 } 70 71 protected void setMessageVersion(int version) { 72 this.messageVersion = version; 73 } 74 75 protected TCByteBufferInputStream getInputStream() { 77 return this.bbis; 78 } 79 80 85 protected void dehydrateValues() { 86 } 88 89 92 public void dehydrate() { 93 if (processed.attemptSet()) { 94 try { 95 dehydrateValues(); 96 97 final TCByteBuffer[] nvData = out.toArray(); 98 99 Assert.eval(nvData.length > 0); 100 nvData[0].putInt(0, nvCount); 101 setPayload(nvData); 102 103 TCMessageHeader hdr = (TCMessageHeader) getHeader(); 104 hdr.setMessageType(getMessageType().getType()); 105 hdr.setMessageTypeVersion(getMessageVersion()); 106 107 seal(); 108 } catch (Throwable t) { 109 t.printStackTrace(); 110 throw new RuntimeException (t); 111 } finally { 112 this.out.close(); 113 if(! isOutputStreamRecycled()) this.out = null; 114 } 115 } 116 } 117 118 123 public synchronized void hydrate() throws IOException , UnknownNameException { 124 if (processed.attemptSet()) { 125 try { 126 final int count = bbis.readInt(); 127 if (count < 0) { throw new IOException ("negative NV count: " + count); } 128 129 for (int i = 0; i < count; i++) { 130 final byte name = bbis.readByte(); 131 if (!hydrateValue(name)) { 132 logger.error(" Hydrate Error - " + toString()); 133 throw new UnknownNameException(getClass(), name); 134 } 135 } 136 } finally { 137 this.bbis.close(); 138 this.bbis = null; 139 doRecycleOnRead(); 140 } 141 monitor.newIncomingMessage(this); 142 } 143 } 144 145 public void doRecycleOnRead() { 147 recycle(); 148 } 149 150 protected boolean isOutputStreamRecycled() { 152 return false; 153 } 154 155 protected void recycleOutputStream() { 156 if(out != null) { 157 out.recycle(); 158 } 159 } 160 161 167 protected boolean hydrateValue(byte name) throws IOException { 168 if (false) { throw new IOException ("silence compiler warning"); } 169 return false; 170 } 171 172 protected boolean getBooleanValue() throws IOException { 173 return bbis.readBoolean(); 174 } 175 176 protected byte getByteValue() throws IOException { 177 return bbis.readByte(); 178 } 179 180 protected char getCharValue() throws IOException { 181 return bbis.readChar(); 182 } 183 184 protected double getDoubleValue() throws IOException { 185 return bbis.readDouble(); 186 } 187 188 protected float getFloatValue() throws IOException { 189 return bbis.readFloat(); 190 } 191 192 protected int getIntValue() throws IOException { 193 return bbis.readInt(); 194 } 195 196 protected long getLongValue() throws IOException { 197 return bbis.readLong(); 198 } 199 200 protected short getShortValue() throws IOException { 201 return bbis.readShort(); 202 } 203 204 protected Object getObject(TCSerializable target) throws IOException { 205 return target.deserializeFrom(bbis); 206 } 207 208 protected String getStringValue() throws IOException { 209 return bbis.readString(); 210 } 211 212 protected void putNVPair(byte name, boolean value) { 213 nvCount++; 214 out.write(name); 215 out.writeBoolean(value); 216 } 217 218 protected void putNVPair(byte name, byte value) { 219 nvCount++; 220 out.write(name); 221 out.writeByte(value); 222 } 223 224 protected void putNVPair(byte name, char value) { 225 nvCount++; 226 out.write(name); 227 out.writeChar(value); 228 } 229 230 protected void putNVPair(byte name, double value) { 231 nvCount++; 232 out.write(name); 233 out.writeDouble(value); 234 } 235 236 protected void putNVPair(byte name, float value) { 237 nvCount++; 238 out.write(name); 239 out.writeFloat(value); 240 } 241 242 protected void putNVPair(byte name, int value) { 243 nvCount++; 244 out.write(name); 245 out.writeInt(value); 246 } 247 248 protected void putNVPair(byte name, long value) { 249 nvCount++; 250 out.write(name); 251 out.writeLong(value); 252 } 253 254 protected void putNVPair(byte name, short value) { 255 nvCount++; 256 out.write(name); 257 out.writeShort(value); 258 } 259 260 protected void putNVPair(byte name, String value) { 261 nvCount++; 262 out.write(name); 263 out.writeString(value); 264 } 265 266 protected void putNVPair(byte name, TCSerializable object) { 267 nvCount++; 268 out.write(name); 269 object.serializeTo(out); 270 } 271 272 protected void putNVPair(byte name, TCByteBuffer[] data) { 273 nvCount++; 274 out.write(name); 275 out.write(data); 276 } 277 278 public ChannelID getChannelID() { 279 return channel.getChannelID(); 280 } 281 282 public MessageChannel getChannel() { 283 return channel; 284 } 285 286 291 public void send() { 292 if (isSent.attemptSet()) { 293 dehydrate(); 294 basicSend(); 295 } 296 } 297 298 private void basicSend() { 299 channel.send(this); 300 monitor.newOutgoingMessage(this); 301 } 302 } 303 | Popular Tags |