1 11 package org.eclipse.team.internal.core.streams; 12 13 import java.io.BufferedOutputStream ; 14 import java.io.FilterOutputStream ; 15 import java.io.IOException ; 16 import java.io.InterruptedIOException ; 17 import java.io.OutputStream ; 18 19 import org.eclipse.team.internal.core.Messages; 20 21 32 public class TimeoutOutputStream extends FilterOutputStream { 33 private final long writeTimeout; private final long closeTimeout; 37 private byte[] iobuffer; private int head = 0; private int length = 0; private boolean closeRequested = false; private boolean flushRequested = false; 44 private Thread thread; 46 private boolean waitingForClose = false; private IOException ioe = null; 48 49 59 public TimeoutOutputStream(OutputStream out, int bufferSize, long writeTimeout, long closeTimeout) { 60 super(new BufferedOutputStream (out, bufferSize)); 61 this.writeTimeout = writeTimeout; 62 this.closeTimeout = closeTimeout; 63 this.iobuffer = new byte[bufferSize]; 64 thread = new Thread (new Runnable () { 65 public void run() { 66 runThread(); 67 } 68 }, "TimeoutOutputStream"); thread.setDaemon(true); 70 thread.start(); 71 } 72 73 83 public void close() throws IOException { 84 Thread oldThread; 85 synchronized (this) { 86 if (thread == null) return; 87 oldThread = thread; 88 closeRequested = true; 89 thread.interrupt(); 90 checkError(); 91 } 92 if (closeTimeout == -1) return; 93 try { 94 oldThread.join(closeTimeout); 95 } catch (InterruptedException e) { 96 Thread.currentThread().interrupt(); } 98 synchronized (this) { 99 checkError(); 100 if (thread != null) throw new InterruptedIOException (); 101 } 102 } 103 104 110 public synchronized void write(int b) throws IOException { 111 syncCommit(true); 112 iobuffer[(head + length) % iobuffer.length] = (byte) b; 113 length++; 114 notify(); 115 } 116 117 123 public synchronized void write(byte[] buffer, int off, int len) throws IOException { 124 int amount = 0; 125 try { 126 do { 127 syncCommit(true); 128 while (amount < len && length != iobuffer.length) { 129 iobuffer[(head + length) % iobuffer.length] = buffer[off++]; 130 length++; 131 amount++; 132 } 133 } while (amount < len); 134 } catch (InterruptedIOException e) { 135 e.bytesTransferred = amount; 136 throw e; 137 } 138 notify(); 139 } 140 141 147 public synchronized void flush() throws IOException { 148 int oldLength = length; 149 flushRequested = true; 150 try { 151 syncCommit(false); 152 } catch (InterruptedIOException e) { 153 e.bytesTransferred = oldLength - length; 154 throw e; 155 } 156 notify(); 157 } 158 159 164 private void syncCommit(boolean partial) throws IOException { 165 checkError(); if (partial && length != iobuffer.length || length == 0) return; 167 if (waitingForClose) throw new IOException (Messages.TimeoutOutputStream_cannotWriteToStream); 168 notify(); 169 try { 170 wait(writeTimeout); 171 } catch (InterruptedException e) { 172 Thread.currentThread().interrupt(); } 174 checkError(); if (partial && length != iobuffer.length || length == 0) return; 176 throw new InterruptedIOException (); 177 } 178 179 182 private void checkError() throws IOException { 183 if (ioe != null) { 184 IOException e = ioe; 185 ioe = null; 186 throw e; 187 } 188 } 189 190 193 private void runThread() { 194 try { 195 writeUntilDone(); 196 } catch (IOException e) { 197 synchronized (this) { ioe = e; } 198 } finally { 199 waitUntilClosed(); 200 try { 201 out.close(); 202 } catch (IOException e) { 203 synchronized (this) { ioe = e; } 204 } finally { 205 synchronized (this) { 206 thread = null; 207 notify(); 208 } 209 } 210 } 211 } 212 213 216 private synchronized void waitUntilClosed() { 217 waitingForClose = true; 218 notify(); 219 while (! closeRequested) { 220 try { 221 wait(); 222 } catch (InterruptedException e) { 223 closeRequested = true; } 225 } 226 } 227 228 231 private void writeUntilDone() throws IOException { 232 int bytesUntilFlush = -1; for (;;) { 234 int off, len; 235 synchronized (this) { 236 for (;;) { 237 if (closeRequested && length == 0) return; if (length != 0 || flushRequested) break; 239 try { 240 wait(); 241 } catch (InterruptedException e) { 242 closeRequested = true; } 244 } 245 off = head; 246 len = iobuffer.length - head; 247 if (len > length) len = length; 248 if (flushRequested && bytesUntilFlush < 0) { 249 flushRequested = false; 250 bytesUntilFlush = length; 251 } 252 } 253 254 if (len != 0) { 256 try { 258 out.write(iobuffer, off, len); 261 } catch (InterruptedIOException e) { 262 len = e.bytesTransferred; 263 } 264 } 265 266 if (bytesUntilFlush >= 0) { 268 bytesUntilFlush -= len; 269 if (bytesUntilFlush <= 0) { 270 try { 272 out.flush(); 273 } catch (InterruptedIOException e) { 274 } 275 bytesUntilFlush = -1; } 277 } 278 279 if (len != 0) { 281 synchronized (this) { 282 head = (head + len) % iobuffer.length; 283 length -= len; 284 notify(); 285 } 286 } 287 } 288 } 289 } 290 | Popular Tags |