1 36 package org.columba.ristretto.io; 37 38 import java.io.FilterInputStream ; 39 import java.io.IOException ; 40 import java.io.InputStream ; 41 42 import org.columba.ristretto.concurrency.Semaphore; 43 44 53 public class AsyncInputStream extends FilterInputStream { 54 55 private Semaphore semaphore; 56 private IOException exception; 57 private int size; 58 private int read; 59 60 66 public AsyncInputStream(InputStream source, int size) { 67 super(source); 68 semaphore = new Semaphore(); 69 this.size = size; 70 read = 0; 71 } 72 73 74 77 public int read() throws IOException { 78 if( read < size ) { 79 80 semaphore.acquire(); 81 if( exception != null ) throw exception; 82 if( read < size ) { 83 read++; 84 85 return super.read(); 86 } else { 87 return -1; 88 } 89 } else { 90 return -1; 91 } 92 } 93 94 95 96 101 public void exceptionOccured( IOException e ) { 102 exception = e; 103 } 104 105 110 public void grow(int size) { 111 semaphore.release(size); 112 } 113 114 117 public int read(byte[] arg0, int arg1, int arg2) throws IOException { 118 int next; 119 for( int i=0; i<arg2; i++) { 120 next = read(); 121 if( next == -1 ) { 122 if( i == 0 ) { 123 return -1; 124 } else { 125 return i; 126 } 127 } 128 arg0[arg1+i] = (byte) next; 129 } 130 return arg2; 131 } 132 135 public int available() throws IOException { 136 return size - read; 137 } 138 141 public int getSize() { 142 return size; 143 } 144 147 public void setSize(int size) { 148 this.size = size; 149 } 150 } 151 | Popular Tags |