1 11 package org.eclipse.team.internal.core.streams; 12 13 import java.io.FilterInputStream ; 14 import java.io.IOException ; 15 import java.io.InputStream ; 16 import java.io.InterruptedIOException ; 17 import org.eclipse.team.internal.core.Policy; 18 19 30 public class TimeoutInputStream extends FilterInputStream { 31 private final long readTimeout; private final long closeTimeout; 35 private boolean closeRequested = false; 38 private Thread thread; private byte[] iobuffer; private int head = 0; private int length = 0; private IOException ioe = null; private boolean waitingForClose = false; 46 private boolean growWhenFull = false; 48 58 public TimeoutInputStream(InputStream in, int bufferSize, long readTimeout, long closeTimeout) { 59 super(in); 60 this.readTimeout = readTimeout; 61 this.closeTimeout = closeTimeout; 62 this.iobuffer = new byte[bufferSize]; 63 thread = new Thread (new Runnable () { 64 public void run() { 65 runThread(); 66 } 67 }, "TimeoutInputStream"); thread.setDaemon(true); 69 thread.start(); 70 } 71 72 public TimeoutInputStream(InputStream in, int bufferSize, long readTimeout, long closeTimeout, boolean growWhenFull) { 73 this(in, bufferSize, readTimeout, closeTimeout); 74 this.growWhenFull = growWhenFull; 75 } 76 77 86 public void close() throws IOException { 87 Thread oldThread; 88 synchronized (this) { 89 if (thread == null) return; 90 oldThread = thread; 91 closeRequested = true; 92 thread.interrupt(); 93 checkError(); 94 } 95 if (closeTimeout == -1) return; 96 try { 97 oldThread.join(closeTimeout); 98 } catch (InterruptedException e) { 99 Thread.currentThread().interrupt(); } 101 synchronized (this) { 102 checkError(); 103 if (thread != null) throw new InterruptedIOException (); 104 } 105 } 106 107 111 public synchronized int available() throws IOException { 112 if (length == 0) checkError(); 113 return length > 0 ? length : 0; 114 } 115 116 122 public synchronized int read() throws IOException { 123 if (! syncFill()) return -1; int b = iobuffer[head++] & 255; 125 if (head == iobuffer.length) head = 0; 126 length--; 127 notify(); 128 return b; 129 } 130 131 137 public synchronized int read(byte[] buffer, int off, int len) throws IOException { 138 if (! syncFill()) return -1; int pos = off; 140 if (len > length) len = length; 141 while (len-- > 0) { 142 buffer[pos++] = iobuffer[head++]; 143 if (head == iobuffer.length) head = 0; 144 length--; 145 } 146 notify(); 147 return pos - off; 148 } 149 150 156 public synchronized long skip(long count) throws IOException { 157 long amount = 0; 158 try { 159 do { 160 if (! syncFill()) break; int skip = (int) Math.min(count - amount, length); 162 head = (head + skip) % iobuffer.length; 163 length -= skip; 164 amount += skip; 165 } while (amount < count); 166 } catch (InterruptedIOException e) { 167 e.bytesTransferred = (int) amount; throw e; 169 } 170 notify(); 171 return amount; 172 } 173 174 177 public boolean markSupported() { 178 return false; 179 } 180 181 186 private boolean syncFill() throws IOException { 187 if (length != 0) return true; 188 checkError(); if (waitingForClose) return false; 190 notify(); 191 try { 192 wait(readTimeout); 193 } catch (InterruptedException e) { 194 Thread.currentThread().interrupt(); } 196 if (length != 0) return true; 197 checkError(); if (waitingForClose) return false; 199 throw new InterruptedIOException (); 200 } 201 202 205 private void checkError() throws IOException { 206 if (ioe != null) { 207 IOException e = ioe; 208 ioe = null; 209 throw e; 210 } 211 } 212 213 216 private void runThread() { 217 try { 218 readUntilDone(); 219 } catch (IOException e) { 220 synchronized (this) { ioe = e; } 221 } finally { 222 waitUntilClosed(); 223 try { 224 in.close(); 225 } catch (IOException e) { 226 synchronized (this) { ioe = e; } 227 } finally { 228 synchronized (this) { 229 thread = null; 230 notify(); 231 } 232 } 233 } 234 } 235 236 239 private synchronized void waitUntilClosed() { 240 waitingForClose = true; 241 notify(); 242 while (! closeRequested) { 243 try { 244 wait(); 245 } catch (InterruptedException e) { 246 closeRequested = true; } 248 } 249 } 250 251 254 private void readUntilDone() throws IOException { 255 for (;;) { 256 int off, len; 257 synchronized (this) { 258 while (isBufferFull()) { 259 if (closeRequested) return; waitForRead(); 261 } 262 off = (head + length) % iobuffer.length; 263 len = ((head > off) ? head : iobuffer.length) - off; 264 } 265 int count; 266 try { 267 count = in.read(iobuffer, off, len); 270 if (count == -1) return; } catch (InterruptedIOException e) { 272 count = e.bytesTransferred; } 274 synchronized (this) { 275 length += count; 276 notify(); 277 } 278 } 279 } 280 281 286 private synchronized void waitForRead() { 287 try { 288 if (growWhenFull) { 289 wait(readTimeout); 291 } else { 292 wait(); 293 } 294 } catch (InterruptedException e) { 295 closeRequested = true; } 297 if (growWhenFull && isBufferFull()) { 299 growBuffer(); 300 } 301 } 302 303 private synchronized void growBuffer() { 304 int newSize = 2 * iobuffer.length; 305 if (newSize > iobuffer.length) { 306 if (Policy.DEBUG_STREAMS) { 307 System.out.println("InputStream growing to " + newSize + " bytes"); } 309 byte[] newBuffer = new byte[newSize]; 310 int pos = 0; 311 int len = length; 312 while (len-- > 0) { 313 newBuffer[pos++] = iobuffer[head++]; 314 if (head == iobuffer.length) head = 0; 315 } 316 iobuffer = newBuffer; 317 head = 0; 318 } 320 } 321 322 private boolean isBufferFull() { 323 return length == iobuffer.length; 324 } 325 } 326 | Popular Tags |