1 18 package com.Ostermiller.util; 19 20 import java.io.InputStream ; 21 import java.io.OutputStream ; 22 import java.io.IOException ; 23 24 43 public class CircularByteBuffer { 44 45 50 private final static int DEFAULT_SIZE = 1024; 51 52 57 public final static int INFINITE_SIZE = -1; 58 59 81 protected byte[] buffer; 82 87 protected volatile int readPosition = 0; 88 93 protected volatile int writePosition = 0; 94 99 protected volatile int markPosition = 0; 100 106 protected volatile int markSize = 0; 107 112 protected volatile boolean infinite = false; 113 119 protected boolean blockingWrite = true; 120 125 protected InputStream in = new CircularByteBufferInputStream(); 126 131 protected boolean inputStreamClosed = false; 132 137 protected OutputStream out = new CircularByteBufferOutputStream(); 138 143 protected boolean outputStreamClosed = false; 144 145 152 public void clear(){ 153 synchronized (this){ 154 readPosition = 0; 155 writePosition = 0; 156 markPosition = 0; 157 outputStreamClosed = false; 158 inputStreamClosed = false; 159 } 160 } 161 162 177 public OutputStream getOutputStream(){ 178 return out; 179 } 180 181 192 public InputStream getInputStream(){ 193 return in; 194 } 195 196 208 public int getAvailable(){ 209 synchronized (this){ 210 return available(); 211 } 212 } 213 214 227 public int getSpaceLeft(){ 228 synchronized (this){ 229 return spaceLeft(); 230 } 231 } 232 233 245 public int getSize(){ 246 synchronized (this){ 247 return buffer.length; 248 } 249 } 250 251 256 private void resize(){ 257 byte[] newBuffer = new byte[buffer.length * 2]; 258 int marked = marked(); 259 int available = available(); 260 if (markPosition <= writePosition){ 261 int length = writePosition - markPosition; 265 System.arraycopy(buffer, markPosition, newBuffer, 0, length); 266 } else { 267 int length1 = buffer.length - markPosition; 268 System.arraycopy(buffer, markPosition, newBuffer, 0, length1); 269 int length2 = writePosition; 270 System.arraycopy(buffer, 0, newBuffer, length1, length2); 271 } 272 buffer = newBuffer; 273 markPosition = 0; 274 readPosition = marked; 275 writePosition = marked + available; 276 } 277 278 283 private int spaceLeft(){ 284 if (writePosition < markPosition){ 285 return (markPosition - writePosition - 1); 289 } else { 290 return ((buffer.length - 1) - (writePosition - markPosition)); 292 } 293 } 294 295 300 private int available(){ 301 if (readPosition <= writePosition){ 302 return (writePosition - readPosition); 306 } else { 307 return (buffer.length - (readPosition - writePosition)); 309 } 310 } 311 312 317 private int marked(){ 318 if (markPosition <= readPosition){ 319 return (readPosition - markPosition); 323 } else { 324 return (buffer.length - (markPosition - readPosition)); 326 } 327 } 328 329 335 private void ensureMark(){ 336 if (marked() >= markSize){ 337 markPosition = readPosition; 338 markSize = 0; 339 } 340 } 341 342 349 public CircularByteBuffer(){ 350 this (DEFAULT_SIZE, true); 351 } 352 353 370 public CircularByteBuffer(int size){ 371 this (size, true); 372 } 373 374 384 public CircularByteBuffer(boolean blockingWrite){ 385 this (DEFAULT_SIZE, blockingWrite); 386 } 387 388 407 public CircularByteBuffer(int size, boolean blockingWrite){ 408 if (size == INFINITE_SIZE){ 409 buffer = new byte[DEFAULT_SIZE]; 410 infinite = true; 411 } else { 412 buffer = new byte[size]; 413 infinite = false; 414 } 415 this.blockingWrite = blockingWrite; 416 } 417 418 423 protected class CircularByteBufferInputStream extends InputStream { 424 425 435 public int available() throws IOException { 436 synchronized (CircularByteBuffer.this){ 437 if (inputStreamClosed) throw new IOException ("InputStream has been closed, it is not ready."); 438 return (CircularByteBuffer.this.available()); 439 } 440 } 441 442 451 public void close() throws IOException { 452 synchronized (CircularByteBuffer.this){ 453 inputStreamClosed = true; 454 } 455 } 456 457 470 public void mark(int readAheadLimit) { 471 synchronized (CircularByteBuffer.this){ 472 if (buffer.length - 1 > readAheadLimit) { 474 markSize = readAheadLimit; 475 markPosition = readPosition; 476 } 477 } 478 } 479 480 487 public boolean markSupported() { 488 return true; 489 } 490 491 502 public int read() throws IOException { 503 while (true){ 504 synchronized (CircularByteBuffer.this){ 505 if (inputStreamClosed) throw new IOException ("InputStream has been closed; cannot read from a closed InputStream."); 506 int available = CircularByteBuffer.this.available(); 507 if (available > 0){ 508 int result = buffer[readPosition] & 0xff; 509 readPosition++; 510 if (readPosition == buffer.length){ 511 readPosition = 0; 512 } 513 ensureMark(); 514 return result; 515 } else if (outputStreamClosed){ 516 return -1; 517 } 518 } 519 try { 520 Thread.sleep(100); 521 } catch(Exception x){ 522 throw new IOException ("Blocking read operation interrupted."); 523 } 524 } 525 } 526 527 539 public int read(byte[] cbuf) throws IOException { 540 return read(cbuf, 0, cbuf.length); 541 } 542 543 557 public int read(byte[] cbuf, int off, int len) throws IOException { 558 while (true){ 559 synchronized (CircularByteBuffer.this){ 560 if (inputStreamClosed) throw new IOException ("InputStream has been closed; cannot read from a closed InputStream."); 561 int available = CircularByteBuffer.this.available(); 562 if (available > 0){ 563 int length = Math.min(len, available); 564 int firstLen = Math.min(length, buffer.length - readPosition); 565 int secondLen = length - firstLen; 566 System.arraycopy(buffer, readPosition, cbuf, off, firstLen); 567 if (secondLen > 0){ 568 System.arraycopy(buffer, 0, cbuf, off+firstLen, secondLen); 569 readPosition = secondLen; 570 } else { 571 readPosition += length; 572 } 573 if (readPosition == buffer.length) { 574 readPosition = 0; 575 } 576 ensureMark(); 577 return length; 578 } else if (outputStreamClosed){ 579 return -1; 580 } 581 } 582 try { 583 Thread.sleep(100); 584 } catch(Exception x){ 585 throw new IOException ("Blocking read operation interrupted."); 586 } 587 } 588 } 589 590 600 public void reset() throws IOException { 601 synchronized (CircularByteBuffer.this){ 602 if (inputStreamClosed) throw new IOException ("InputStream has been closed; cannot reset a closed InputStream."); 603 readPosition = markPosition; 604 } 605 } 606 607 619 public long skip(long n) throws IOException , IllegalArgumentException { 620 while (true){ 621 synchronized (CircularByteBuffer.this){ 622 if (inputStreamClosed) throw new IOException ("InputStream has been closed; cannot skip bytes on a closed InputStream."); 623 int available = CircularByteBuffer.this.available(); 624 if (available > 0){ 625 int length = Math.min((int)n, available); 626 int firstLen = Math.min(length, buffer.length - readPosition); 627 int secondLen = length - firstLen; 628 if (secondLen > 0){ 629 readPosition = secondLen; 630 } else { 631 readPosition += length; 632 } 633 if (readPosition == buffer.length) { 634 readPosition = 0; 635 } 636 return length; 637 } else if (outputStreamClosed){ 638 return 0; 639 } 640 } 641 try { 642 Thread.sleep(100); 643 } catch(Exception x){ 644 throw new IOException ("Blocking read operation interrupted."); 645 } 646 } 647 } 648 } 649 650 658 protected class CircularByteBufferOutputStream extends OutputStream { 659 660 672 public void close() throws IOException { 673 synchronized (CircularByteBuffer.this){ 674 if (!outputStreamClosed){ 675 flush(); 676 } 677 outputStreamClosed = true; 678 } 679 } 680 681 688 public void flush() throws IOException { 689 if (outputStreamClosed) throw new IOException ("OutputStream has been closed; cannot flush a closed OutputStream."); 690 if (inputStreamClosed) throw new IOException ("Buffer closed by inputStream; cannot flush."); 691 } 693 694 707 public void write(byte[] cbuf) throws IOException { 708 write(cbuf, 0, cbuf.length); 709 } 710 711 726 public void write(byte[] cbuf, int off, int len) throws IOException { 727 while (len > 0){ 728 synchronized (CircularByteBuffer.this){ 729 if (outputStreamClosed) throw new IOException ("OutputStream has been closed; cannot write to a closed OutputStream."); 730 if (inputStreamClosed) throw new IOException ("Buffer closed by InputStream; cannot write to a closed buffer."); 731 int spaceLeft = spaceLeft(); 732 while (infinite && spaceLeft < len){ 733 resize(); 734 spaceLeft = spaceLeft(); 735 } 736 if (!blockingWrite && spaceLeft < len) throw new BufferOverflowException("CircularByteBuffer is full; cannot write " + len + " bytes"); 737 int realLen = Math.min(len, spaceLeft); 738 int firstLen = Math.min(realLen, buffer.length - writePosition); 739 int secondLen = Math.min(realLen - firstLen, buffer.length - markPosition - 1); 740 int written = firstLen + secondLen; 741 if (firstLen > 0){ 742 System.arraycopy(cbuf, off, buffer, writePosition, firstLen); 743 } 744 if (secondLen > 0){ 745 System.arraycopy(cbuf, off+firstLen, buffer, 0, secondLen); 746 writePosition = secondLen; 747 } else { 748 writePosition += written; 749 } 750 if (writePosition == buffer.length) { 751 writePosition = 0; 752 } 753 off += written; 754 len -= written; 755 } 756 if (len > 0){ 757 try { 758 Thread.sleep(100); 759 } catch(Exception x){ 760 throw new IOException ("Waiting for available space in buffer interrupted."); 761 } 762 } 763 } 764 } 765 766 780 public void write(int c) throws IOException { 781 boolean written = false; 782 while (!written){ 783 synchronized (CircularByteBuffer.this){ 784 if (outputStreamClosed) throw new IOException ("OutputStream has been closed; cannot write to a closed OutputStream."); 785 if (inputStreamClosed) throw new IOException ("Buffer closed by InputStream; cannot write to a closed buffer."); 786 int spaceLeft = spaceLeft(); 787 while (infinite && spaceLeft < 1){ 788 resize(); 789 spaceLeft = spaceLeft(); 790 } 791 if (!blockingWrite && spaceLeft < 1) throw new BufferOverflowException("CircularByteBuffer is full; cannot write 1 byte"); 792 if (spaceLeft > 0){ 793 buffer[writePosition] = (byte)(c & 0xff); 794 writePosition++; 795 if (writePosition == buffer.length) { 796 writePosition = 0; 797 } 798 written = true; 799 } 800 } 801 if (!written){ 802 try { 803 Thread.sleep(100); 804 } catch(Exception x){ 805 throw new IOException ("Waiting for available space in buffer interrupted."); 806 } 807 } 808 } 809 } 810 } 811 } 812 | Popular Tags |