1 4 package com.tc.io; 5 6 import com.tc.bytes.TCByteBuffer; 7 import com.tc.bytes.TCByteBufferFactory; 8 import com.tc.util.Assert; 9 10 import java.io.DataOutputStream ; 11 import java.io.IOException ; 12 import java.io.OutputStream ; 13 import java.io.UTFDataFormatException ; 14 import java.util.ArrayList ; 15 import java.util.Collections ; 16 import java.util.IdentityHashMap ; 17 import java.util.Iterator ; 18 import java.util.List ; 19 import java.util.Map ; 20 21 26 public class TCByteBufferOutputStream extends OutputStream implements TCByteBufferOutput { 27 28 private static final int DEFAULT_MAX_BLOCK_SIZE = 4096; 29 private static final int DEFAULT_INITIAL_BLOCK_SIZE = 32; 30 31 private final boolean direct; 32 private final int maxBlockSize; 33 private final DataOutputStream dos; 34 35 private List buffers = new ArrayList (); 37 private Map localBuffers = new IdentityHashMap (); 38 private TCByteBuffer current; 39 private boolean closed; 40 private int written; 41 private int blockSize; 42 43 47 public TCByteBufferOutputStream() { 48 this(DEFAULT_INITIAL_BLOCK_SIZE, DEFAULT_MAX_BLOCK_SIZE, false); 49 } 50 51 public TCByteBufferOutputStream(int blockSize, boolean direct) { 52 this(blockSize, blockSize, false); 53 } 54 55 public TCByteBufferOutputStream(int initialBlockSize, int maxBlockSize, boolean direct) { 56 if (maxBlockSize < 1) { throw new IllegalArgumentException ("Max block size must be greater than or equal to 1"); } 57 if (initialBlockSize < 1) { throw new IllegalArgumentException ( 58 "Initial block size must be greater than or equal to 1"); } 59 60 if (maxBlockSize < initialBlockSize) { throw new IllegalArgumentException ( 61 "Initial block size less than max block size"); } 62 63 this.maxBlockSize = maxBlockSize; 64 this.blockSize = initialBlockSize; 65 this.direct = direct; 66 this.closed = false; 67 this.dos = new DataOutputStream (this); 68 addBuffer(); 69 } 70 71 76 public Mark mark() { 77 checkClosed(); 78 return new Mark(buffers.size(), current.position(), getBytesWritten()); 79 } 80 81 public void write(int b) { 82 checkClosed(); 83 84 written++; 85 86 if (!current.hasRemaining()) { 87 addBuffer(); 88 } 89 90 current.put((byte) b); 91 } 92 93 public void write(byte b[]) { 94 write(b, 0, b.length); 95 } 96 97 public void write(TCByteBuffer data) { 98 if (data == null) { throw new NullPointerException (); } 99 write(new TCByteBuffer[] { data }); 100 } 101 102 106 public void write(TCByteBuffer[] data) { 107 checkClosed(); 108 if (data == null) { throw new NullPointerException (); } 109 if (data.length == 0) { return; } 110 111 final boolean reuseCurrent = current.position() == 0; 113 114 if (!reuseCurrent) { 115 buffers.add(current.limit(current.position()).position(0)); 117 } 118 119 for (int i = 0, n = data.length; i < n; i++) { 120 int len = data[i].limit(); 121 if (len == 0) { 122 continue; 123 } 124 125 written += len; 126 buffers.add(data[i].duplicate().position(0)); 127 } 128 129 if (!reuseCurrent) { 130 current = (TCByteBuffer) buffers.remove(buffers.size() - 1); 131 current.position(current.limit()); 132 } 133 } 134 135 public int getBytesWritten() { 136 return written; 137 } 138 139 public void write(byte b[], final int offset, final int length) { 140 checkClosed(); 141 142 if (b == null) { throw new NullPointerException (); } 143 144 if ((offset < 0) || (offset > b.length) || (length < 0) || ((offset + length) > b.length)) { throw new IndexOutOfBoundsException (); } 145 146 if (length == 0) { return; } 147 148 written += length; 150 151 int index = offset; 152 int numToWrite = length; 153 while (numToWrite > 0) { 154 if (!current.hasRemaining()) { 155 addBuffer(); 156 } 157 final int numToPut = Math.min(current.remaining(), numToWrite); 158 current.put(b, index, numToPut); 159 numToWrite -= numToPut; 160 index += numToPut; 161 } 162 } 163 164 public void close() { 165 if (!closed) { 166 finalizeBuffers(); 167 closed = true; 168 } 169 } 170 171 174 public TCByteBuffer[] toArray() { 175 close(); 176 TCByteBuffer[] rv = new TCByteBuffer[buffers.size()]; 177 return (TCByteBuffer[]) buffers.toArray(rv); 178 } 179 180 public String toString() { 181 return (buffers == null) ? "null" : buffers.toString(); 182 } 183 184 private void addBuffer() { 185 if (current != null) { 186 current.flip(); 187 buffers.add(current); 188 189 if (blockSize < maxBlockSize) { 191 blockSize *= 2; 192 193 if (blockSize > maxBlockSize) { 194 blockSize = maxBlockSize; 195 } 196 } 197 } 198 199 current = TCByteBufferFactory.getInstance(direct, blockSize); 200 blockSize = current.capacity(); 201 localBuffers.put(current, current); 202 } 203 204 private void finalizeBuffers() { 205 if (current.position() > 0) { 206 current.flip(); 207 buffers.add(current); 208 } 209 210 current = null; 211 212 List finalBufs = new ArrayList (); 213 TCByteBuffer[] bufs = new TCByteBuffer[buffers.size()]; 214 bufs = (TCByteBuffer[]) buffers.toArray(bufs); 215 216 final int num = bufs.length; 217 int index = 0; 218 219 while (index < num) { 221 final int startIndex = index; 222 int size = bufs[startIndex].limit(); 223 224 if (size < maxBlockSize) { 225 while (index < (num - 1)) { 226 int nextSize = bufs[index + 1].limit(); 227 if ((size + nextSize) <= maxBlockSize) { 228 size += nextSize; 229 index++; 230 } else { 231 break; 232 } 233 } 234 } 235 236 if (index > startIndex) { 237 TCByteBuffer consolidated = TCByteBufferFactory.getInstance(direct, size); 238 localBuffers.put(consolidated, consolidated); 239 final int end = index; 240 for (int i = startIndex; i <= end; i++) { 241 consolidated.put(bufs[i]); 242 if (localBuffers.remove(bufs[i]) != null) { 243 bufs[i].recycle(); 244 } 245 } 246 Assert.assertEquals(size, consolidated.position()); 247 consolidated.flip(); 248 finalBufs.add(consolidated); 249 } else { 250 finalBufs.add(bufs[index]); 251 } 252 253 index++; 254 } 255 256 buffers = Collections.unmodifiableList(finalBufs); 257 } 258 259 public final void writeBoolean(boolean value) { 260 try { 261 dos.writeBoolean(value); 262 } catch (IOException e) { 263 throw new AssertionError (e); 264 } 265 } 266 267 public final void writeByte(int value) { 268 try { 269 dos.writeByte(value); 270 } catch (IOException e) { 271 throw new AssertionError (e); 272 } 273 } 274 275 public final void writeChar(int value) { 276 try { 277 dos.writeChar(value); 278 } catch (IOException e) { 279 throw new AssertionError (e); 280 } 281 } 282 283 public final void writeDouble(double value) { 284 try { 285 dos.writeDouble(value); 286 } catch (IOException e) { 287 throw new AssertionError (e); 288 } 289 } 290 291 public final void writeFloat(float value) { 292 try { 293 dos.writeFloat(value); 294 } catch (IOException e) { 295 throw new AssertionError (e); 296 } 297 } 298 299 public final void writeInt(int value) { 300 try { 301 dos.writeInt(value); 302 } catch (IOException e) { 303 throw new AssertionError (e); 304 } 305 } 306 307 public final void writeLong(long value) { 308 try { 309 dos.writeLong(value); 310 } catch (IOException e) { 311 throw new AssertionError (e); 312 } 313 } 314 315 public final void writeShort(int value) { 316 try { 317 dos.writeShort(value); 318 } catch (IOException e) { 319 throw new AssertionError (e); 320 } 321 } 322 323 public final void writeString(String string) { 324 writeString(string, false); 325 } 326 327 public final void writeString(String string, boolean forceRaw) { 328 if (string == null) { 330 writeBoolean(true); 331 return; 332 } else { 333 writeBoolean(false); 334 } 335 336 if (!forceRaw) { 337 Mark mark = mark(); 338 write(1); 340 341 try { 342 dos.writeUTF(string); 343 return; 345 } catch (IOException e) { 346 if (!(e instanceof UTFDataFormatException )) { throw new AssertionError (e); } 347 mark.write(0); 349 } 350 } else { 351 write(0); 352 } 353 354 writeStringAsRawChars(string); 355 } 356 357 private void writeStringAsRawChars(String string) { 358 if (string == null) { throw new AssertionError (); } 359 writeInt(string.length()); 360 try { 361 dos.writeChars(string); 362 } catch (IOException e) { 363 throw new AssertionError (e); 364 } 365 } 366 367 private void checkClosed() { 368 if (closed) { throw new IllegalStateException ("stream is closed"); } 369 } 370 371 public class Mark { 376 private final int bufferIndex; 377 private final int bufferPosition; 378 private final int absolutePosition; 379 380 private Mark(int bufferIndex, int bufferPosition, int absolutePosition) { 381 this.bufferIndex = bufferIndex; 382 this.bufferPosition = bufferPosition; 383 this.absolutePosition = absolutePosition; 384 } 385 386 public int getPosition() { 387 return this.absolutePosition; 388 } 389 390 393 public void write(byte[] data) { 394 checkClosed(); 395 396 if (data == null) { throw new NullPointerException (); } 397 398 if (data.length == 0) { return; } 399 400 if (getBytesWritten() - absolutePosition < data.length) { throw new IllegalArgumentException ( 401 "Cannot write past the existing tail of stream via the mark"); } 402 403 TCByteBuffer buf = getBuffer(bufferIndex); 404 405 int bufIndex = bufferIndex; 406 int bufPos = bufferPosition; 407 int dataIndex = 0; 408 int numToWrite = data.length; 409 410 while (numToWrite > 0) { 411 int howMany = Math.min(numToWrite, buf.limit() - bufPos); 412 413 if (howMany > 0) { 414 buf.put(bufPos, data, dataIndex, howMany); 415 dataIndex += howMany; 416 numToWrite -= howMany; 417 if (numToWrite == 0) { return; } 418 } 419 420 buf = getBuffer(++bufIndex); 421 bufPos = 0; 422 } 423 } 424 425 private TCByteBuffer getBuffer(int index) { 426 if (index <= buffers.size() - 1) { 427 return (TCByteBuffer) buffers.get(index); 428 } else if (index == buffers.size()) { 429 return current; 430 } else { 431 throw Assert.failure("index=" + index + ", buffers.size()=" + buffers.size()); 432 } 433 } 434 435 439 public void write(int b) { 440 write(new byte[] { (byte) b }); 441 } 442 } 443 444 public void recycle() { 445 if (localBuffers.size() > 0) { 446 for (Iterator i = localBuffers.keySet().iterator(); i.hasNext();) { 447 TCByteBuffer buffer = (TCByteBuffer) i.next(); 448 buffer.recycle(); 449 } 450 } 451 } 452 453 } | Popular Tags |