1 7 8 package java.io; 9 10 11 18 19 public class PipedReader extends Reader { 20 boolean closedByWriter = false; 21 boolean closedByReader = false; 22 boolean connected = false; 23 24 28 Thread readSide; 29 Thread writeSide; 30 31 34 static final int PIPE_SIZE = 1024; 35 36 39 char buffer[] = new char[PIPE_SIZE]; 40 41 47 int in = -1; 48 49 53 int out = 0; 54 55 64 public PipedReader(PipedWriter src) throws IOException { 65 connect(src); 66 } 67 68 77 public PipedReader() { 78 } 79 80 104 public void connect(PipedWriter src) throws IOException { 105 src.connect(this); 106 } 107 108 112 synchronized void receive(int c) throws IOException { 113 if (!connected) { 114 throw new IOException ("Pipe not connected"); 115 } else if (closedByWriter || closedByReader) { 116 throw new IOException ("Pipe closed"); 117 } else if (readSide != null && !readSide.isAlive()) { 118 throw new IOException ("Read end dead"); 119 } 120 121 writeSide = Thread.currentThread(); 122 while (in == out) { 123 if ((readSide != null) && !readSide.isAlive()) { 124 throw new IOException ("Pipe broken"); 125 } 126 127 notifyAll(); 128 try { 129 wait(1000); 130 } catch (InterruptedException ex) { 131 throw new java.io.InterruptedIOException (); 132 } 133 } 134 if (in < 0) { 135 in = 0; 136 out = 0; 137 } 138 buffer[in++] = (char) c; 139 if (in >= buffer.length) { 140 in = 0; 141 } 142 } 143 144 148 synchronized void receive(char c[], int off, int len) throws IOException { 149 while (--len >= 0) { 150 receive(c[off++]); 151 } 152 } 153 154 158 synchronized void receivedLast() { 159 closedByWriter = true; 160 notifyAll(); 161 } 162 163 179 public synchronized int read() throws IOException { 180 if (!connected) { 181 throw new IOException ("Pipe not connected"); 182 } else if (closedByReader) { 183 throw new IOException ("Pipe closed"); 184 } else if (writeSide != null && !writeSide.isAlive() 185 && !closedByWriter && (in < 0)) { 186 throw new IOException ("Write end dead"); 187 } 188 189 readSide = Thread.currentThread(); 190 int trials = 2; 191 while (in < 0) { 192 if (closedByWriter) { 193 194 return -1; 195 } 196 if ((writeSide != null) && (!writeSide.isAlive()) && (--trials < 0)) { 197 throw new IOException ("Pipe broken"); 198 } 199 200 notifyAll(); 201 try { 202 wait(1000); 203 } catch (InterruptedException ex) { 204 throw new java.io.InterruptedIOException (); 205 } 206 } 207 int ret = buffer[out++]; 208 if (out >= buffer.length) { 209 out = 0; 210 } 211 if (in == out) { 212 213 in = -1; 214 } 215 return ret; 216 } 217 218 235 public synchronized int read(char cbuf[], int off, int len) throws IOException { 236 if (!connected) { 237 throw new IOException ("Pipe not connected"); 238 } else if (closedByReader) { 239 throw new IOException ("Pipe closed"); 240 } else if (writeSide != null && !writeSide.isAlive() 241 && !closedByWriter && (in < 0)) { 242 throw new IOException ("Write end dead"); 243 } 244 245 if ((off < 0) || (off > cbuf.length) || (len < 0) || 246 ((off + len) > cbuf.length) || ((off + len) < 0)) { 247 throw new IndexOutOfBoundsException (); 248 } else if (len == 0) { 249 return 0; 250 } 251 252 253 int c = read(); 254 if (c < 0) { 255 return -1; 256 } 257 cbuf[off] = (char)c; 258 int rlen = 1; 259 while ((in >= 0) && (--len > 0)) { 260 cbuf[off + rlen] = buffer[out++]; 261 rlen++; 262 if (out >= buffer.length) { 263 out = 0; 264 } 265 if (in == out) { 266 267 in = -1; 268 } 269 } 270 return rlen; 271 } 272 273 279 public synchronized boolean ready() throws IOException { 280 if (!connected) { 281 throw new IOException ("Pipe not connected"); 282 } else if (closedByReader) { 283 throw new IOException ("Pipe closed"); 284 } else if (writeSide != null && !writeSide.isAlive() 285 && !closedByWriter && (in < 0)) { 286 throw new IOException ("Write end dead"); 287 } 288 if (in < 0) { 289 return false; 290 } else { 291 return true; 292 } 293 } 294 295 301 public void close() throws IOException { 302 in = -1; 303 closedByReader = true; 304 } 305 } 306 | Popular Tags |