1 28 29 package HTTPClient; 30 31 import java.io.InputStream ; 32 import java.io.IOException ; 33 import java.io.InterruptedIOException ; 34 35 44 final class RespInputStream extends InputStream implements GlobalConstants 45 { 46 47 private StreamDemultiplexor demux = null; 48 49 50 private ResponseHandler resph; 51 52 54 boolean closed = false; 55 56 57 private boolean dont_truncate = false; 58 59 60 private byte[] buffer = null; 61 62 63 private boolean interrupted = false; 64 65 66 private int offset = 0; 67 68 69 private int end = 0; 70 71 72 int count = 0; 73 74 75 77 RespInputStream(StreamDemultiplexor demux, ResponseHandler resph) 78 { 79 this.demux = demux; 80 this.resph = resph; 81 } 82 83 84 86 private byte[] ch = new byte[1]; 87 93 public synchronized int read() throws IOException 94 { 95 int rcvd = read(ch, 0, 1); 96 if (rcvd == 1) 97 return ch[0] & 0xff; 98 else 99 return -1; 100 } 101 102 103 110 public synchronized int read(byte[] b, int off, int len) throws IOException 111 { 112 if (closed) 113 return -1; 114 115 int left = end - offset; 116 if (buffer != null && !(left == 0 && interrupted)) 117 { 118 if (left == 0) return -1; 119 120 len = (len > left ? left : len); 121 System.arraycopy(buffer, offset, b, off, len); 122 offset += len; 123 124 return len; 125 } 126 else 127 { 128 if (DebugDemux) 129 { 130 if (resph.resp.cd_type != CD_HDRS) 131 System.err.println("RspIS: Reading stream " + 132 this.hashCode() + 133 " (" + Thread.currentThread() + ")"); 134 } 135 136 int rcvd; 137 if (resph.resp.cd_type == CD_HDRS) 138 rcvd = demux.read(b, off, len, resph, resph.resp.timeout); 139 else 140 rcvd = demux.read(b, off, len, resph, 0); 141 if (rcvd != -1 && resph.resp.got_headers) 142 count += rcvd; 143 144 return rcvd; 145 } 146 } 147 148 149 155 public synchronized long skip(long num) throws IOException 156 { 157 if (closed) 158 return 0; 159 160 int left = end - offset; 161 if (buffer != null && !(left == 0 && interrupted)) 162 { 163 num = (num > left ? left : num); 164 offset += num; 165 return num; 166 } 167 else 168 { 169 long skpd = demux.skip(num, resph); 170 if (resph.resp.got_headers) 171 count += skpd; 172 return skpd; 173 } 174 } 175 176 177 183 public synchronized int available() throws IOException 184 { 185 if (closed) 186 return 0; 187 188 if (buffer != null && !(end-offset == 0 && interrupted)) 189 return end-offset; 190 else 191 return demux.available(resph); 192 } 193 194 195 201 public synchronized void close() throws IOException 202 { 203 if (!closed) 204 { 205 closed = true; 206 207 if (dont_truncate && (buffer == null || interrupted)) 208 readAll(resph.resp.timeout); 209 210 if (DebugDemux) 211 System.err.println("RspIS: User closed stream " + hashCode() + 212 " (" + Thread.currentThread() + ")"); 213 214 demux.closeSocketIfAllStreamsClosed(); 215 216 if (dont_truncate) 217 { 218 try 219 { resph.resp.http_resp.invokeTrailerHandlers(false); } 220 catch (ModuleException me) 221 { throw new IOException (me.toString()); } 222 } 223 } 224 } 225 226 227 230 protected void finalize() throws Throwable 231 { 232 try 233 { close(); } 234 finally 235 { super.finalize(); } 236 } 237 238 239 241 254 void readAll(int timeout) throws IOException 255 { 256 if (DebugDemux) 257 System.err.println("RspIS: Read-all on stream " + this.hashCode() + 258 " (" + Thread.currentThread() + ")"); 259 260 synchronized(resph.resp) 261 { 262 if (!resph.resp.got_headers) { 264 int sav_to = resph.resp.timeout; 265 resph.resp.timeout = timeout; 266 resph.resp.getStatusCode(); 267 resph.resp.timeout = sav_to; 268 } 269 } 270 271 synchronized(this) 272 { 273 if (buffer != null && !interrupted) return; 274 275 int rcvd = 0; 276 try 277 { 278 if (closed) { 280 buffer = new byte[10000]; 281 do 282 { 283 count += rcvd; 284 rcvd = demux.read(buffer, 0, buffer.length, resph, 285 timeout); 286 } while (rcvd != -1); 287 buffer = null; 288 } 289 else 290 { 291 if (buffer == null) 292 { 293 buffer = new byte[10000]; 294 offset = 0; 295 end = 0; 296 } 297 298 do 299 { 300 rcvd = demux.read(buffer, end, buffer.length-end, resph, 301 timeout); 302 if (rcvd < 0) break; 303 304 count += rcvd; 305 end += rcvd; 306 buffer = Util.resizeArray(buffer, end+10000); 307 } while (true); 308 } 309 } 310 catch (InterruptedIOException iioe) 311 { 312 interrupted = true; 313 throw iioe; 314 } 315 catch (IOException ioe) 316 { 317 buffer = null; } 319 320 interrupted = false; 321 } 322 } 323 324 325 330 synchronized void dontTruncate() 331 { 332 dont_truncate = true; 333 } 334 } 335 336 | Popular Tags |