1 7 8 package java.io; 9 10 30 public 31 class PipedInputStream extends InputStream { 32 boolean closedByWriter = false; 33 volatile boolean closedByReader = false; 34 boolean connected = false; 35 36 40 Thread readSide; 41 Thread writeSide; 42 43 47 protected static final int PIPE_SIZE = 1024; 48 49 53 protected byte buffer[] = new byte[PIPE_SIZE]; 54 55 62 protected int in = -1; 63 64 69 protected int out = 0; 70 71 81 public PipedInputStream(PipedOutputStream src) throws IOException { 82 connect(src); 83 } 84 85 94 public PipedInputStream() { 95 } 96 97 121 public void connect(PipedOutputStream src) throws IOException { 122 src.connect(this); 123 } 124 125 132 protected synchronized void receive(int b) throws IOException { 133 checkStateForReceive(); 134 writeSide = Thread.currentThread(); 135 if (in == out) 136 awaitSpace(); 137 if (in < 0) { 138 in = 0; 139 out = 0; 140 } 141 buffer[in++] = (byte)(b & 0xFF); 142 if (in >= buffer.length) { 143 in = 0; 144 } 145 } 146 147 155 synchronized void receive(byte b[], int off, int len) throws IOException { 156 checkStateForReceive(); 157 writeSide = Thread.currentThread(); 158 int bytesToTransfer = len; 159 while (bytesToTransfer > 0) { 160 if (in == out) 161 awaitSpace(); 162 int nextTransferAmount = 0; 163 if (out < in) { 164 nextTransferAmount = buffer.length - in; 165 } else if (in < out) { 166 if (in == -1) { 167 in = out = 0; 168 nextTransferAmount = buffer.length - in; 169 } else { 170 nextTransferAmount = out - in; 171 } 172 } 173 if (nextTransferAmount > bytesToTransfer) 174 nextTransferAmount = bytesToTransfer; 175 assert(nextTransferAmount > 0); 176 System.arraycopy(b, off, buffer, in, nextTransferAmount); 177 bytesToTransfer -= nextTransferAmount; 178 off += nextTransferAmount; 179 in += nextTransferAmount; 180 if (in >= buffer.length) { 181 in = 0; 182 } 183 } 184 } 185 186 private void checkStateForReceive() throws IOException { 187 if (!connected) { 188 throw new IOException ("Pipe not connected"); 189 } else if (closedByWriter || closedByReader) { 190 throw new IOException ("Pipe closed"); 191 } else if (readSide != null && !readSide.isAlive()) { 192 throw new IOException ("Read end dead"); 193 } 194 } 195 196 private void awaitSpace() throws IOException { 197 while (in == out) { 198 if ((readSide != null) && !readSide.isAlive()) { 199 throw new IOException ("Pipe broken"); 200 } 201 202 notifyAll(); 203 try { 204 wait(1000); 205 } catch (InterruptedException ex) { 206 throw new java.io.InterruptedIOException (); 207 } 208 } 209 } 210 211 215 synchronized void receivedLast() { 216 closedByWriter = true; 217 notifyAll(); 218 } 219 220 237 public synchronized int read() throws IOException { 238 if (!connected) { 239 throw new IOException ("Pipe not connected"); 240 } else if (closedByReader) { 241 throw new IOException ("Pipe closed"); 242 } else if (writeSide != null && !writeSide.isAlive() 243 && !closedByWriter && (in < 0)) { 244 throw new IOException ("Write end dead"); 245 } 246 247 readSide = Thread.currentThread(); 248 int trials = 2; 249 while (in < 0) { 250 if (closedByWriter) { 251 252 return -1; 253 } 254 if ((writeSide != null) && (!writeSide.isAlive()) && (--trials < 0)) { 255 throw new IOException ("Pipe broken"); 256 } 257 258 notifyAll(); 259 try { 260 wait(1000); 261 } catch (InterruptedException ex) { 262 throw new java.io.InterruptedIOException (); 263 } 264 } 265 int ret = buffer[out++] & 0xFF; 266 if (out >= buffer.length) { 267 out = 0; 268 } 269 if (in == out) { 270 271 in = -1; 272 } 273 return ret; 274 } 275 276 294 public synchronized int read(byte b[], int off, int len) throws IOException { 295 if (b == null) { 296 throw new NullPointerException (); 297 } else if ((off < 0) || (off > b.length) || (len < 0) || 298 ((off + len) > b.length) || ((off + len) < 0)) { 299 throw new IndexOutOfBoundsException (); 300 } else if (len == 0) { 301 return 0; 302 } 303 304 305 int c = read(); 306 if (c < 0) { 307 return -1; 308 } 309 b[off] = (byte) c; 310 int rlen = 1; 311 while ((in >= 0) && (--len > 0)) { 312 b[off + rlen] = buffer[out++]; 313 rlen++; 314 if (out >= buffer.length) { 315 out = 0; 316 } 317 if (in == out) { 318 319 in = -1; 320 } 321 } 322 return rlen; 323 } 324 325 335 public synchronized int available() throws IOException { 336 if(in < 0) 337 return 0; 338 else if(in == out) 339 return buffer.length; 340 else if (in > out) 341 return in - out; 342 else 343 return in + buffer.length - out; 344 } 345 346 352 public void close() throws IOException { 353 closedByReader = true; 354 synchronized (this) { 355 in = -1; 356 } 357 } 358 } 359 | Popular Tags |