KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > eclipse > team > internal > core > streams > TimeoutOutputStream


/*******************************************************************************
 * Copyright (c) 2000, 2006 IBM Corporation and others.
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Public License v1.0
 * which accompanies this distribution, and is available at
 * http://www.eclipse.org/legal/epl-v10.html
 *
 * Contributors:
 * IBM Corporation - initial API and implementation
 *******************************************************************************/

11 package org.eclipse.team.internal.core.streams;
12
13 import java.io.BufferedOutputStream JavaDoc;
14 import java.io.FilterOutputStream JavaDoc;
15 import java.io.IOException JavaDoc;
16 import java.io.InterruptedIOException JavaDoc;
17 import java.io.OutputStream JavaDoc;
18
19 import org.eclipse.team.internal.core.Messages;
20
21 /**
22  * Wraps an output stream that blocks indefinitely to simulate timeouts on write(),
23  * flush(), and close(). The resulting output stream is buffered and supports
24  * retrying operations that failed due to an InterruptedIOException.
25  *
26  * Supports resuming partially completed operations after an InterruptedIOException
27  * REGARDLESS of whether the underlying stream does unless the underlying stream itself
28  * generates InterruptedIOExceptions in which case it must also support resuming.
29  * Check the bytesTransferred field to determine how much of the operation completed;
30  * conversely, at what point to resume.
31  */

32 public class TimeoutOutputStream extends FilterOutputStream JavaDoc {
33     // unsynchronized variables
34
private final long writeTimeout; // write() timeout in millis
35
private final long closeTimeout; // close() timeout in millis, or -1
36

37     // requests for the thread (synchronized)
38
private byte[] iobuffer; // circular buffer
39
private int head = 0; // points to first unwritten byte
40
private int length = 0; // number of remaining unwritten bytes
41
private boolean closeRequested = false; // if true, close requested
42
private boolean flushRequested = false; // if true, flush requested
43

44     // responses from the thread (synchronized)
45
private Thread JavaDoc thread;
46     private boolean waitingForClose = false; // if true, the thread is waiting for close()
47
private IOException JavaDoc ioe = null;
48
49     /**
50      * Creates a timeout wrapper for an output stream.
51      * @param out the underlying input stream
52      * @param bufferSize the buffer size in bytes; should be large enough to mitigate
53      * Thread synchronization and context switching overhead
54      * @param writeTimeout the number of milliseconds to block for a write() or flush() before
55      * throwing an InterruptedIOException; 0 blocks indefinitely
56      * @param closeTimeout the number of milliseconds to block for a close() before throwing
57      * an InterruptedIOException; 0 blocks indefinitely, -1 closes the stream in the background
58      */

59     public TimeoutOutputStream(OutputStream JavaDoc out, int bufferSize, long writeTimeout, long closeTimeout) {
60         super(new BufferedOutputStream JavaDoc(out, bufferSize));
61         this.writeTimeout = writeTimeout;
62         this.closeTimeout = closeTimeout;
63         this.iobuffer = new byte[bufferSize];
64         thread = new Thread JavaDoc(new Runnable JavaDoc() {
65             public void run() {
66                 runThread();
67             }
68         }, "TimeoutOutputStream");//$NON-NLS-1$
69
thread.setDaemon(true);
70         thread.start();
71     }
72
73     /**
74      * Wraps the underlying stream's method.
75      * It may be important to wait for a stream to actually be closed because it
76      * holds an implicit lock on a system resoure (such as a file) while it is
77      * open. Closing a stream may take time if the underlying stream is still
78      * servicing a previous request.
79      * @throws InterruptedIOException if the timeout expired, bytesTransferred will
80      * reflect the number of bytes flushed from the buffer
81      * @throws IOException if an i/o error occurs
82      */

83     public void close() throws IOException JavaDoc {
84         Thread JavaDoc oldThread;
85         synchronized (this) {
86             if (thread == null) return;
87             oldThread = thread;
88             closeRequested = true;
89             thread.interrupt();
90             checkError();
91         }
92         if (closeTimeout == -1) return;
93         try {
94             oldThread.join(closeTimeout);
95         } catch (InterruptedException JavaDoc e) {
96             Thread.currentThread().interrupt(); // we weren't expecting to be interrupted
97
}
98         synchronized (this) {
99             checkError();
100             if (thread != null) throw new InterruptedIOException JavaDoc();
101         }
102     }
103
104     /**
105      * Writes a byte to the stream.
106      * @throws InterruptedIOException if the timeout expired and no data was sent,
107      * bytesTransferred will be zero
108      * @throws IOException if an i/o error occurs
109      */

110     public synchronized void write(int b) throws IOException JavaDoc {
111         syncCommit(true);
112         iobuffer[(head + length) % iobuffer.length] = (byte) b;
113         length++;
114         notify();
115     }
116     
117     /**
118      * Writes multiple bytes to the stream.
119      * @throws InterruptedIOException if the timeout expired, bytesTransferred will
120      * reflect the number of bytes sent
121      * @throws IOException if an i/o error occurs
122      */

123     public synchronized void write(byte[] buffer, int off, int len) throws IOException JavaDoc {
124         int amount = 0;
125         try {
126             do {
127                 syncCommit(true);
128                 while (amount < len && length != iobuffer.length) {
129                     iobuffer[(head + length) % iobuffer.length] = buffer[off++];
130                     length++;
131                     amount++;
132                 }
133             } while (amount < len);
134         } catch (InterruptedIOException JavaDoc e) {
135             e.bytesTransferred = amount;
136             throw e;
137         }
138         notify();
139     }
140
141     /**
142      * Flushes the stream.
143      * @throws InterruptedIOException if the timeout expired, bytesTransferred will
144      * reflect the number of bytes flushed from the buffer
145      * @throws IOException if an i/o error occurs
146      */

147     public synchronized void flush() throws IOException JavaDoc {
148         int oldLength = length;
149         flushRequested = true;
150         try {
151             syncCommit(false);
152         } catch (InterruptedIOException JavaDoc e) {
153             e.bytesTransferred = oldLength - length;
154             throw e;
155         }
156         notify();
157     }
158     
159     /**
160      * Waits for the buffer to drain if it is full.
161      * @param partial if true, waits until the buffer is partially empty, else drains it entirely
162      * @throws InterruptedIOException if the buffer could not be drained as requested
163      */

164     private void syncCommit(boolean partial) throws IOException JavaDoc {
165         checkError(); // check errors before allowing the addition of new bytes
166
if (partial && length != iobuffer.length || length == 0) return;
167         if (waitingForClose) throw new IOException JavaDoc(Messages.TimeoutOutputStream_cannotWriteToStream);
168         notify();
169         try {
170             wait(writeTimeout);
171         } catch (InterruptedException JavaDoc e) {
172             Thread.currentThread().interrupt(); // we weren't expecting to be interrupted
173
}
174         checkError(); // check errors before allowing the addition of new bytes
175
if (partial && length != iobuffer.length || length == 0) return;
176         throw new InterruptedIOException JavaDoc();
177     }
178
179     /**
180      * If an exception is pending, throws it.
181      */

182     private void checkError() throws IOException JavaDoc {
183         if (ioe != null) {
184             IOException JavaDoc e = ioe;
185             ioe = null;
186             throw e;
187         }
188     }
189
190     /**
191      * Runs the thread in the background.
192      */

193     private void runThread() {
194         try {
195             writeUntilDone();
196         } catch (IOException JavaDoc e) {
197             synchronized (this) { ioe = e; }
198         } finally {
199             waitUntilClosed();
200             try {
201                 out.close();
202             } catch (IOException JavaDoc e) {
203                 synchronized (this) { ioe = e; }
204             } finally {
205                 synchronized (this) {
206                     thread = null;
207                     notify();
208                 }
209             }
210         }
211     }
212
213     /**
214      * Waits until we have been requested to close the stream.
215      */

216     private synchronized void waitUntilClosed() {
217         waitingForClose = true;
218         notify();
219         while (! closeRequested) {
220             try {
221                 wait();
222             } catch (InterruptedException JavaDoc e) {
223                 closeRequested = true; // alternate quit signal
224
}
225         }
226     }
227
228     /**
229      * Writes bytes from the buffer until closed and buffer is empty
230      */

231     private void writeUntilDone() throws IOException JavaDoc {
232         int bytesUntilFlush = -1; // if > 0, then we will flush after that many bytes have been written
233
for (;;) {
234             int off, len;
235             synchronized (this) {
236                 for (;;) {
237                     if (closeRequested && length == 0) return; // quit signal
238
if (length != 0 || flushRequested) break;
239                     try {
240                         wait();
241                     } catch (InterruptedException JavaDoc e) {
242                         closeRequested = true; // alternate quit signal
243
}
244                 }
245                 off = head;
246                 len = iobuffer.length - head;
247                 if (len > length) len = length;
248                 if (flushRequested && bytesUntilFlush < 0) {
249                     flushRequested = false;
250                     bytesUntilFlush = length;
251                 }
252             }
253             
254             // If there are bytes to be written, write them
255
if (len != 0) {
256                 // write out all remaining bytes from the buffer before flushing
257
try {
258                     // the i/o operation might block without releasing the lock,
259
// so we do this outside of the synchronized block
260
out.write(iobuffer, off, len);
261                 } catch (InterruptedIOException JavaDoc e) {
262                     len = e.bytesTransferred;
263                 }
264             }
265             
266             // If there was a pending flush, do it
267
if (bytesUntilFlush >= 0) {
268                 bytesUntilFlush -= len;
269                 if (bytesUntilFlush <= 0) {
270                     // flush the buffer now
271
try {
272                         out.flush();
273                     } catch (InterruptedIOException JavaDoc e) {
274                     }
275                     bytesUntilFlush = -1; // might have been 0
276
}
277             }
278             
279             // If bytes were written, update the circular buffer
280
if (len != 0) {
281                 synchronized (this) {
282                     head = (head + len) % iobuffer.length;
283                     length -= len;
284                     notify();
285                 }
286             }
287         }
288     }
289 }
290
Popular Tags