// KickJava - Java API By Example, From Geeks To Geeks.
//
// Java > Open Source Codes > java > io > PipedInputStream


1 /*
2  * @(#)PipedInputStream.java 1.35 03/12/19
3  *
4  * Copyright 2004 Sun Microsystems, Inc. All rights reserved.
5  * SUN PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
6  */

7
8 package java.io;
9
10 /**
11  * A piped input stream should be connected
12  * to a piped output stream; the piped input
13  * stream then provides whatever data bytes
14  * are written to the piped output stream.
15  * Typically, data is read from a <code>PipedInputStream</code>
16  * object by one thread and data is written
17  * to the corresponding <code>PipedOutputStream</code>
18  * by some other thread. Attempting to use
19  * both objects from a single thread is not
20  * recommended, as it may deadlock the thread.
21  * The piped input stream contains a buffer,
22  * decoupling read operations from write operations,
23  * within limits.
24  *
25  * @author James Gosling
26  * @version 1.35, 12/19/03
27  * @see java.io.PipedOutputStream
28  * @since JDK1.0
29  */

30 public
31 class PipedInputStream extends InputStream JavaDoc {
32     boolean closedByWriter = false;
33     volatile boolean closedByReader = false;
34     boolean connected = false;
35
36     /* REMIND: identification of the read and write sides needs to be
37        more sophisticated. Either using thread groups (but what about
38        pipes within a thread?) or using finalization (but it may be a
39        long time until the next GC). */

40     Thread JavaDoc readSide;
41     Thread JavaDoc writeSide;
42
43     /**
44      * The size of the pipe's circular input buffer.
45      * @since JDK1.1
46      */

47     protected static final int PIPE_SIZE = 1024;
48
49     /**
50      * The circular buffer into which incoming data is placed.
51      * @since JDK1.1
52      */

53     protected byte buffer[] = new byte[PIPE_SIZE];
54
55     /**
56      * The index of the position in the circular buffer at which the
57      * next byte of data will be stored when received from the connected
58      * piped output stream. <code>in&lt;0</code> implies the buffer is empty,
59      * <code>in==out</code> implies the buffer is full
60      * @since JDK1.1
61      */

62     protected int in = -1;
63
64     /**
65      * The index of the position in the circular buffer at which the next
66      * byte of data will be read by this piped input stream.
67      * @since JDK1.1
68      */

69     protected int out = 0;
70
71     /**
72      * Creates a <code>PipedInputStream</code> so
73      * that it is connected to the piped output
74      * stream <code>src</code>. Data bytes written
75      * to <code>src</code> will then be available
76      * as input from this stream.
77      *
78      * @param src the stream to connect to.
79      * @exception IOException if an I/O error occurs.
80      */

81     public PipedInputStream(PipedOutputStream JavaDoc src) throws IOException JavaDoc {
82     connect(src);
83     }
84
85     /**
86      * Creates a <code>PipedInputStream</code> so
87      * that it is not yet connected. It must be
88      * connected to a <code>PipedOutputStream</code>
89      * before being used.
90      *
91      * @see java.io.PipedInputStream#connect(java.io.PipedOutputStream)
92      * @see java.io.PipedOutputStream#connect(java.io.PipedInputStream)
93      */

94     public PipedInputStream() {
95     }
96
97     /**
98      * Causes this piped input stream to be connected
99      * to the piped output stream <code>src</code>.
100      * If this object is already connected to some
101      * other piped output stream, an <code>IOException</code>
102      * is thrown.
103      * <p>
104      * If <code>src</code> is an
105      * unconnected piped output stream and <code>snk</code>
106      * is an unconnected piped input stream, they
107      * may be connected by either the call:
108      * <p>
109      * <pre><code>snk.connect(src)</code> </pre>
110      * <p>
111      * or the call:
112      * <p>
113      * <pre><code>src.connect(snk)</code> </pre>
114      * <p>
115      * The two
116      * calls have the same effect.
117      *
118      * @param src The piped output stream to connect to.
119      * @exception IOException if an I/O error occurs.
120      */

121     public void connect(PipedOutputStream JavaDoc src) throws IOException JavaDoc {
122     src.connect(this);
123     }
124
125     /**
126      * Receives a byte of data. This method will block if no input is
127      * available.
128      * @param b the byte being received
129      * @exception IOException If the pipe is broken.
130      * @since JDK1.1
131      */

132     protected synchronized void receive(int b) throws IOException JavaDoc {
133         checkStateForReceive();
134         writeSide = Thread.currentThread();
135         if (in == out)
136             awaitSpace();
137     if (in < 0) {
138         in = 0;
139         out = 0;
140     }
141     buffer[in++] = (byte)(b & 0xFF);
142     if (in >= buffer.length) {
143         in = 0;
144     }
145     }
146
147     /**
148      * Receives data into an array of bytes. This method will
149      * block until some input is available.
150      * @param b the buffer into which the data is received
151      * @param off the start offset of the data
152      * @param len the maximum number of bytes received
153      * @exception IOException If an I/O error has occurred.
154      */

155     synchronized void receive(byte b[], int off, int len) throws IOException JavaDoc {
156         checkStateForReceive();
157         writeSide = Thread.currentThread();
158         int bytesToTransfer = len;
159         while (bytesToTransfer > 0) {
160             if (in == out)
161                 awaitSpace();
162             int nextTransferAmount = 0;
163             if (out < in) {
164                 nextTransferAmount = buffer.length - in;
165             } else if (in < out) {
166                 if (in == -1) {
167                     in = out = 0;
168                     nextTransferAmount = buffer.length - in;
169                 } else {
170                     nextTransferAmount = out - in;
171                 }
172             }
173             if (nextTransferAmount > bytesToTransfer)
174                 nextTransferAmount = bytesToTransfer;
175             assert(nextTransferAmount > 0);
176             System.arraycopy(b, off, buffer, in, nextTransferAmount);
177             bytesToTransfer -= nextTransferAmount;
178             off += nextTransferAmount;
179             in += nextTransferAmount;
180             if (in >= buffer.length) {
181                 in = 0;
182             }
183         }
184     }
185
186     private void checkStateForReceive() throws IOException JavaDoc {
187         if (!connected) {
188             throw new IOException JavaDoc("Pipe not connected");
189         } else if (closedByWriter || closedByReader) {
190         throw new IOException JavaDoc("Pipe closed");
191     } else if (readSide != null && !readSide.isAlive()) {
192             throw new IOException JavaDoc("Read end dead");
193         }
194     }
195
196     private void awaitSpace() throws IOException JavaDoc {
197     while (in == out) {
198         if ((readSide != null) && !readSide.isAlive()) {
199         throw new IOException JavaDoc("Pipe broken");
200         }
201         /* full: kick any waiting readers */
202         notifyAll();
203         try {
204             wait(1000);
205         } catch (InterruptedException JavaDoc ex) {
206         throw new java.io.InterruptedIOException JavaDoc();
207         }
208     }
209     }
210
211     /**
212      * Notifies all waiting threads that the last byte of data has been
213      * received.
214      */

215     synchronized void receivedLast() {
216     closedByWriter = true;
217     notifyAll();
218     }
219
220     /**
221      * Reads the next byte of data from this piped input stream. The
222      * value byte is returned as an <code>int</code> in the range
223      * <code>0</code> to <code>255</code>. If no byte is available
224      * because the end of the stream has been reached, the value
225      * <code>-1</code> is returned. This method blocks until input data
226      * is available, the end of the stream is detected, or an exception
227      * is thrown.
228      * If a thread was providing data bytes
229      * to the connected piped output stream, but
230      * the thread is no longer alive, then an
231      * <code>IOException</code> is thrown.
232      *
233      * @return the next byte of data, or <code>-1</code> if the end of the
234      * stream is reached.
235      * @exception IOException if the pipe is broken.
236      */

237     public synchronized int read() throws IOException JavaDoc {
238         if (!connected) {
239             throw new IOException JavaDoc("Pipe not connected");
240         } else if (closedByReader) {
241         throw new IOException JavaDoc("Pipe closed");
242     } else if (writeSide != null && !writeSide.isAlive()
243                    && !closedByWriter && (in < 0)) {
244             throw new IOException JavaDoc("Write end dead");
245         }
246
247         readSide = Thread.currentThread();
248     int trials = 2;
249     while (in < 0) {
250         if (closedByWriter) {
251         /* closed by writer, return EOF */
252         return -1;
253         }
254         if ((writeSide != null) && (!writeSide.isAlive()) && (--trials < 0)) {
255         throw new IOException JavaDoc("Pipe broken");
256         }
257             /* might be a writer waiting */
258         notifyAll();
259         try {
260             wait(1000);
261         } catch (InterruptedException JavaDoc ex) {
262         throw new java.io.InterruptedIOException JavaDoc();
263         }
264     }
265     int ret = buffer[out++] & 0xFF;
266     if (out >= buffer.length) {
267         out = 0;
268     }
269     if (in == out) {
270             /* now empty */
271         in = -1;
272     }
273     return ret;
274     }
275
276     /**
277      * Reads up to <code>len</code> bytes of data from this piped input
278      * stream into an array of bytes. Less than <code>len</code> bytes
279      * will be read if the end of the data stream is reached. This method
280      * blocks until at least one byte of input is available.
281      * If a thread was providing data bytes
282      * to the connected piped output stream, but
283      * the thread is no longer alive, then an
284      * <code>IOException</code> is thrown.
285      *
286      * @param b the buffer into which the data is read.
287      * @param off the start offset of the data.
288      * @param len the maximum number of bytes read.
289      * @return the total number of bytes read into the buffer, or
290      * <code>-1</code> if there is no more data because the end of
291      * the stream has been reached.
292      * @exception IOException if an I/O error occurs.
293      */

294     public synchronized int read(byte b[], int off, int len) throws IOException JavaDoc {
295     if (b == null) {
296         throw new NullPointerException JavaDoc();
297     } else if ((off < 0) || (off > b.length) || (len < 0) ||
298            ((off + len) > b.length) || ((off + len) < 0)) {
299         throw new IndexOutOfBoundsException JavaDoc();
300     } else if (len == 0) {
301         return 0;
302     }
303
304         /* possibly wait on the first character */
305     int c = read();
306     if (c < 0) {
307         return -1;
308     }
309     b[off] = (byte) c;
310     int rlen = 1;
311     while ((in >= 0) && (--len > 0)) {
312         b[off + rlen] = buffer[out++];
313         rlen++;
314         if (out >= buffer.length) {
315         out = 0;
316         }
317         if (in == out) {
318                 /* now empty */
319         in = -1;
320         }
321     }
322     return rlen;
323     }
324
325     /**
326      * Returns the number of bytes that can be read from this input
327      * stream without blocking. This method overrides the <code>available</code>
328      * method of the parent class.
329      *
330      * @return the number of bytes that can be read from this input stream
331      * without blocking.
332      * @exception IOException if an I/O error occurs.
333      * @since JDK1.0.2
334      */

335   public synchronized int available() throws IOException JavaDoc {
336     if(in < 0)
337       return 0;
338     else if(in == out)
339       return buffer.length;
340     else if (in > out)
341       return in - out;
342     else
343       return in + buffer.length - out;
344   }
345
346     /**
347      * Closes this piped input stream and releases any system resources
348      * associated with the stream.
349      *
350      * @exception IOException if an I/O error occurs.
351      */

352     public void close() throws IOException JavaDoc {
353     closedByReader = true;
354         synchronized (this) {
355             in = -1;
356         }
357     }
358 }
359
// Popular Tags