KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > eclipse > team > internal > core > streams > TimeoutInputStream


1 /*******************************************************************************
2  * Copyright (c) 2000, 2006 IBM Corporation and others.
3  * All rights reserved. This program and the accompanying materials
4  * are made available under the terms of the Eclipse Public License v1.0
5  * which accompanies this distribution, and is available at
6  * http://www.eclipse.org/legal/epl-v10.html
7  *
8  * Contributors:
9  * IBM Corporation - initial API and implementation
10  *******************************************************************************/

11 package org.eclipse.team.internal.core.streams;
12
13 import java.io.FilterInputStream JavaDoc;
14 import java.io.IOException JavaDoc;
15 import java.io.InputStream JavaDoc;
16 import java.io.InterruptedIOException JavaDoc;
17 import org.eclipse.team.internal.core.Policy;
18
19 /**
20  * Wraps an input stream that blocks indefinitely to simulate timeouts on read(),
21  * skip(), and close(). The resulting input stream is buffered and supports
22  * retrying operations that failed due to an InterruptedIOException.
23  *
24  * Supports resuming partially completed operations after an InterruptedIOException
25  * REGARDLESS of whether the underlying stream does unless the underlying stream itself
26  * generates InterruptedIOExceptions in which case it must also support resuming.
27  * Check the bytesTransferred field to determine how much of the operation completed;
28  * conversely, at what point to resume.
29  */

30 public class TimeoutInputStream extends FilterInputStream JavaDoc {
31     // unsynchronized variables
32
private final long readTimeout; // read() timeout in millis
33
private final long closeTimeout; // close() timeout in millis, or -1
34

35     // requests for the thread (synchronized)
36
private boolean closeRequested = false; // if true, close requested
37

38     // responses from the thread (synchronized)
39
private Thread JavaDoc thread; // if null, thread has terminated
40
private byte[] iobuffer; // circular buffer
41
private int head = 0; // points to first unread byte
42
private int length = 0; // number of remaining unread bytes
43
private IOException JavaDoc ioe = null; // if non-null, contains a pending exception
44
private boolean waitingForClose = false; // if true, thread is waiting for close()
45

46     private boolean growWhenFull = false; // if true, buffer will grow when it is full
47

48     /**
49      * Creates a timeout wrapper for an input stream.
50      * @param in the underlying input stream
51      * @param bufferSize the buffer size in bytes; should be large enough to mitigate
52      * Thread synchronization and context switching overhead
53      * @param readTimeout the number of milliseconds to block for a read() or skip() before
54      * throwing an InterruptedIOException; 0 blocks indefinitely
55      * @param closeTimeout the number of milliseconds to block for a close() before throwing
56      * an InterruptedIOException; 0 blocks indefinitely, -1 closes the stream in the background
57      */

58     public TimeoutInputStream(InputStream JavaDoc in, int bufferSize, long readTimeout, long closeTimeout) {
59         super(in);
60         this.readTimeout = readTimeout;
61         this.closeTimeout = closeTimeout;
62         this.iobuffer = new byte[bufferSize];
63         thread = new Thread JavaDoc(new Runnable JavaDoc() {
64             public void run() {
65                 runThread();
66             }
67         }, "TimeoutInputStream");//$NON-NLS-1$
68
thread.setDaemon(true);
69         thread.start();
70     }
71     
72     public TimeoutInputStream(InputStream JavaDoc in, int bufferSize, long readTimeout, long closeTimeout, boolean growWhenFull) {
73         this(in, bufferSize, readTimeout, closeTimeout);
74         this.growWhenFull = growWhenFull;
75     }
76
77     /**
78      * Wraps the underlying stream's method.
79      * It may be important to wait for a stream to actually be closed because it
80      * holds an implicit lock on a system resoure (such as a file) while it is
81      * open. Closing a stream may take time if the underlying stream is still
82      * servicing a previous request.
83      * @throws InterruptedIOException if the timeout expired
84      * @throws IOException if an i/o error occurs
85      */

86     public void close() throws IOException JavaDoc {
87         Thread JavaDoc oldThread;
88         synchronized (this) {
89             if (thread == null) return;
90             oldThread = thread;
91             closeRequested = true;
92             thread.interrupt();
93             checkError();
94         }
95         if (closeTimeout == -1) return;
96         try {
97             oldThread.join(closeTimeout);
98         } catch (InterruptedException JavaDoc e) {
99             Thread.currentThread().interrupt(); // we weren't expecting to be interrupted
100
}
101         synchronized (this) {
102             checkError();
103             if (thread != null) throw new InterruptedIOException JavaDoc();
104         }
105     }
106     
107     /**
108      * Returns the number of unread bytes in the buffer.
109      * @throws IOException if an i/o error occurs
110      */

111     public synchronized int available() throws IOException JavaDoc {
112         if (length == 0) checkError();
113         return length > 0 ? length : 0;
114     }
115     
116     /**
117      * Reads a byte from the stream.
118      * @throws InterruptedIOException if the timeout expired and no data was received,
119      * bytesTransferred will be zero
120      * @throws IOException if an i/o error occurs
121      */

122     public synchronized int read() throws IOException JavaDoc {
123         if (! syncFill()) return -1; // EOF reached
124
int b = iobuffer[head++] & 255;
125         if (head == iobuffer.length) head = 0;
126         length--;
127         notify();
128         return b;
129     }
130     
131     /**
132      * Reads multiple bytes from the stream.
133      * @throws InterruptedIOException if the timeout expired and no data was received,
134      * bytesTransferred will be zero
135      * @throws IOException if an i/o error occurs
136      */

137     public synchronized int read(byte[] buffer, int off, int len) throws IOException JavaDoc {
138         if (! syncFill()) return -1; // EOF reached
139
int pos = off;
140         if (len > length) len = length;
141         while (len-- > 0) {
142             buffer[pos++] = iobuffer[head++];
143             if (head == iobuffer.length) head = 0;
144             length--;
145         }
146         notify();
147         return pos - off;
148     }
149
150     /**
151      * Skips multiple bytes in the stream.
152      * @throws InterruptedIOException if the timeout expired before all of the
153      * bytes specified have been skipped, bytesTransferred may be non-zero
154      * @throws IOException if an i/o error occurs
155      */

156     public synchronized long skip(long count) throws IOException JavaDoc {
157         long amount = 0;
158         try {
159             do {
160                 if (! syncFill()) break; // EOF reached
161
int skip = (int) Math.min(count - amount, length);
162                 head = (head + skip) % iobuffer.length;
163                 length -= skip;
164                 amount += skip;
165             } while (amount < count);
166         } catch (InterruptedIOException JavaDoc e) {
167             e.bytesTransferred = (int) amount; // assumes amount < Integer.MAX_INT
168
throw e;
169         }
170         notify();
171         return amount;
172     }
173
174     /**
175      * Mark is not supported by the wrapper even if the underlying stream does, returns false.
176      */

177     public boolean markSupported() {
178         return false;
179     }
180
181     /**
182      * Waits for the buffer to fill if it is empty and the stream has not reached EOF.
183      * @return true if bytes are available, false if EOF has been reached
184      * @throws InterruptedIOException if EOF not reached but no bytes are available
185      */

186     private boolean syncFill() throws IOException JavaDoc {
187         if (length != 0) return true;
188         checkError(); // check errors only after we have read all remaining bytes
189
if (waitingForClose) return false;
190         notify();
191         try {
192             wait(readTimeout);
193         } catch (InterruptedException JavaDoc e) {
194             Thread.currentThread().interrupt(); // we weren't expecting to be interrupted
195
}
196         if (length != 0) return true;
197         checkError(); // check errors only after we have read all remaining bytes
198
if (waitingForClose) return false;
199         throw new InterruptedIOException JavaDoc();
200     }
201
202     /**
203      * If an exception is pending, throws it.
204      */

205     private void checkError() throws IOException JavaDoc {
206         if (ioe != null) {
207             IOException JavaDoc e = ioe;
208             ioe = null;
209             throw e;
210         }
211     }
212     
213     /**
214      * Runs the thread in the background.
215      */

216     private void runThread() {
217         try {
218             readUntilDone();
219         } catch (IOException JavaDoc e) {
220             synchronized (this) { ioe = e; }
221         } finally {
222             waitUntilClosed();
223             try {
224                 in.close();
225             } catch (IOException JavaDoc e) {
226                 synchronized (this) { ioe = e; }
227             } finally {
228                 synchronized (this) {
229                     thread = null;
230                     notify();
231                 }
232             }
233         }
234     }
235     
236     /**
237      * Waits until we have been requested to close the stream.
238      */

239     private synchronized void waitUntilClosed() {
240         waitingForClose = true;
241         notify();
242         while (! closeRequested) {
243             try {
244                 wait();
245             } catch (InterruptedException JavaDoc e) {
246                 closeRequested = true; // alternate quit signal
247
}
248         }
249     }
250
251     /**
252      * Reads bytes into the buffer until EOF, closed, or error.
253      */

254     private void readUntilDone() throws IOException JavaDoc {
255         for (;;) {
256             int off, len;
257             synchronized (this) {
258                 while (isBufferFull()) {
259                     if (closeRequested) return; // quit signal
260
waitForRead();
261                 }
262                 off = (head + length) % iobuffer.length;
263                 len = ((head > off) ? head : iobuffer.length) - off;
264             }
265             int count;
266             try {
267                 // the i/o operation might block without releasing the lock,
268
// so we do this outside of the synchronized block
269
count = in.read(iobuffer, off, len);
270                 if (count == -1) return; // EOF encountered
271
} catch (InterruptedIOException JavaDoc e) {
272                 count = e.bytesTransferred; // keep partial transfer
273
}
274             synchronized (this) {
275                 length += count;
276                 notify();
277             }
278         }
279     }
280     
281     /*
282      * Wait for a read when the buffer is full (with the implication
283      * that space will become available in the buffer after the read
284      * takes place).
285      */

286     private synchronized void waitForRead() {
287         try {
288             if (growWhenFull) {
289                 // wait a second before growing to let reads catch up
290
wait(readTimeout);
291             } else {
292                 wait();
293             }
294         } catch (InterruptedException JavaDoc e) {
295             closeRequested = true; // alternate quit signal
296
}
297         // If the buffer is still full, give it a chance to grow
298
if (growWhenFull && isBufferFull()) {
299             growBuffer();
300         }
301     }
302
303     private synchronized void growBuffer() {
304         int newSize = 2 * iobuffer.length;
305         if (newSize > iobuffer.length) {
306             if (Policy.DEBUG_STREAMS) {
307                 System.out.println("InputStream growing to " + newSize + " bytes"); //$NON-NLS-1$ //$NON-NLS-2$
308
}
309             byte[] newBuffer = new byte[newSize];
310             int pos = 0;
311             int len = length;
312             while (len-- > 0) {
313                 newBuffer[pos++] = iobuffer[head++];
314                 if (head == iobuffer.length) head = 0;
315             }
316             iobuffer = newBuffer;
317             head = 0;
318             // length instance variable was not changed by this method
319
}
320     }
321
322     private boolean isBufferFull() {
323         return length == iobuffer.length;
324     }
325 }
326
Popular Tags