package com.mysql.jdbc.util;

import java.io.IOException;
import java.io.InputStream;

/**
 * An {@link InputStream} wrapper that keeps an internal buffer and, whenever
 * that buffer has to be refilled, opportunistically reads every byte the
 * underlying stream reports as available (bounded by the buffer size) instead
 * of only the bytes the caller asked for.
 *
 * <p>
 * Requests that are at least as large as the internal buffer bypass the buffer
 * and are read straight from the underlying stream.
 *
 * <p>
 * Thread-safety: only {@link #read(byte[], int, int)} is synchronized; the
 * other methods touch the same state without locking.
 *
 * <p>
 * Mark/reset is not supported.
 */
public class ReadAheadInputStream extends InputStream {

    /** Buffer size used when the caller does not specify one. */
    private static final int DEFAULT_BUFFER_SIZE = 4096;

    /** The wrapped stream; set to {@code null} by {@link #close()}. */
    private InputStream underlyingStream;

    /** Read-ahead buffer; {@code null} once the stream has been closed. */
    private byte[] buf;

    /** Index one past the last valid byte in {@link #buf}. */
    protected int endOfCurrentData;

    /** Index of the next byte in {@link #buf} to hand to the caller. */
    protected int currentPosition;

    /** When {@code true}, buffer activity is traced to {@code System.err}. */
    protected boolean doDebug = false;

    /**
     * Creates a read-ahead stream with the default buffer size.
     *
     * @param toBuffer
     *            the stream to read from
     * @param debug
     *            whether to trace buffer activity to {@code System.err}
     */
    public ReadAheadInputStream(InputStream toBuffer, boolean debug) {
        this(toBuffer, DEFAULT_BUFFER_SIZE, debug);
    }

    /**
     * Creates a read-ahead stream with an explicit buffer size.
     *
     * @param toBuffer
     *            the stream to read from
     * @param bufferSize
     *            size, in bytes, of the internal buffer
     * @param debug
     *            whether to trace buffer activity to {@code System.err}
     */
    public ReadAheadInputStream(InputStream toBuffer, int bufferSize,
            boolean debug) {
        this.underlyingStream = toBuffer;
        this.buf = new byte[bufferSize];
        this.doDebug = debug;
    }

    /**
     * Discards the buffer contents and refills the buffer from the underlying
     * stream with a single {@code read} call.
     *
     * @param readAtLeastTheseManyBytes
     *            number of bytes the caller needs; the actual request is
     *            widened to whatever the underlying stream reports as
     *            available, and is never larger than the buffer
     * @throws IOException
     *             if the stream is closed or the underlying read fails
     */
    private void fill(int readAtLeastTheseManyBytes) throws IOException {
        checkClosed();

        // The buffer is always refilled from the start.
        this.currentPosition = 0;
        this.endOfCurrentData = this.currentPosition;

        // Start with what the caller needs, limited by the room in the buffer.
        int bytesToRead = Math.min(this.buf.length - this.currentPosition,
                readAtLeastTheseManyBytes);

        int bytesAvailable = this.underlyingStream.available();

        if (bytesAvailable > bytesToRead) {
            // More is already waiting: take as much as fits in the buffer.
            bytesToRead = Math.min(this.buf.length - this.currentPosition,
                    bytesAvailable);
        }

        if (this.doDebug) {
            StringBuffer debugBuf = new StringBuffer();
            debugBuf.append(" ReadAheadInputStream.fill(");
            debugBuf.append(readAtLeastTheseManyBytes);
            debugBuf.append("), buffer_size=");
            debugBuf.append(this.buf.length);
            debugBuf.append(", current_position=");
            debugBuf.append(this.currentPosition);
            debugBuf.append(", need to read ");
            debugBuf.append(Math.min(this.buf.length - this.currentPosition,
                    readAtLeastTheseManyBytes));
            debugBuf.append(" bytes to fill request,");

            if (bytesAvailable > 0) {
                debugBuf.append(" underlying InputStream reports ");
                debugBuf.append(bytesAvailable);

                debugBuf.append(" total bytes available,");
            }

            debugBuf.append(" attempting to read ");
            debugBuf.append(bytesToRead);
            debugBuf.append(" bytes.");

            System.err.println(debugBuf.toString());
        }

        int n = this.underlyingStream.read(this.buf, this.currentPosition,
                bytesToRead);

        // On end-of-stream (or a zero-length read) the buffer stays empty.
        if (n > 0) {
            this.endOfCurrentData = n + this.currentPosition;
        }
    }

    /**
     * Serves one round of a bulk read: from the buffer when it holds data,
     * otherwise by refilling the buffer or, for requests at least as large as
     * the buffer, by reading directly from the underlying stream.
     *
     * @param b
     *            destination array
     * @param off
     *            offset in {@code b} at which to store bytes
     * @param len
     *            maximum number of bytes to store
     * @return the number of bytes stored, or {@code -1} if the buffer is empty
     *         and could not be refilled; for direct reads, whatever the
     *         underlying stream returned
     * @throws IOException
     *             if the stream is closed or the underlying read fails
     */
    private int readFromUnderlyingStreamIfNecessary(byte[] b, int off, int len)
            throws IOException {
        checkClosed();

        int avail = this.endOfCurrentData - this.currentPosition;

        if (this.doDebug) {
            StringBuffer debugBuf = new StringBuffer();
            debugBuf.append("ReadAheadInputStream.readIfNecessary(");
            debugBuf.append(b);
            debugBuf.append(",");
            debugBuf.append(off);
            debugBuf.append(",");
            debugBuf.append(len);
            debugBuf.append(")");

            if (avail <= 0) {
                debugBuf
                        .append(" not all data available in buffer, must read from stream");

                if (len >= this.buf.length) {
                    debugBuf
                            .append(", amount requested > buffer, returning direct read() from stream");
                }
            }

            System.err.println(debugBuf.toString());
        }

        if (avail <= 0) {

            if (len >= this.buf.length) {
                // Buffering would only add a copy; read straight through.
                return this.underlyingStream.read(b, off, len);
            }

            fill(len);

            avail = this.endOfCurrentData - this.currentPosition;

            if (avail <= 0) {
                return -1;
            }
        }

        int bytesActuallyRead = (avail < len) ? avail : len;

        System.arraycopy(this.buf, this.currentPosition, b, off,
                bytesActuallyRead);

        this.currentPosition += bytesActuallyRead;

        return bytesActuallyRead;
    }

    /**
     * Reads up to {@code len} bytes into {@code b}, starting at {@code off}.
     * Keeps reading in rounds until the request is satisfied, a round yields
     * nothing, or the underlying stream reports no more bytes available.
     *
     * @param b
     *            destination array
     * @param off
     *            offset in {@code b} at which to store bytes
     * @param len
     *            maximum number of bytes to read
     * @return the number of bytes read, {@code 0} if {@code len} is zero, or
     *         the non-positive result of the first round if it produced no
     *         data
     * @throws IOException
     *             if the stream is closed or the underlying read fails
     * @throws IndexOutOfBoundsException
     *             if {@code off}/{@code len} do not describe a valid range of
     *             {@code b}
     */
    public synchronized int read(byte b[], int off, int len) throws IOException {
        checkClosed(); // Check for closed stream
        // A negative result in any term flags an invalid range (including
        // int overflow of off + len).
        if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
            throw new IndexOutOfBoundsException();
        } else if (len == 0) {
            return 0;
        }

        int totalBytesRead = 0;

        while (true) {
            int bytesReadThisRound = readFromUnderlyingStreamIfNecessary(b, off
                    + totalBytesRead, len - totalBytesRead);

            if (bytesReadThisRound <= 0) {
                // Nothing this round: report it only if nothing was read
                // before, otherwise return what has been gathered so far.
                if (totalBytesRead == 0) {
                    totalBytesRead = bytesReadThisRound;
                }

                break;
            }

            totalBytesRead += bytesReadThisRound;

            if (totalBytesRead >= len) {
                // Request satisfied.
                break;
            }

            if (this.underlyingStream.available() <= 0) {
                // Do not start another round when the stream reports nothing
                // further available.
                break;
            }
        }

        return totalBytesRead;
    }

    /**
     * Reads a single byte, refilling the buffer first if it is empty.
     *
     * @return the byte as an unsigned value in the range 0-255, or {@code -1}
     *         if the buffer could not be refilled
     * @throws IOException
     *             if the stream is closed or the underlying read fails
     */
    public int read() throws IOException {
        checkClosed();

        if (this.currentPosition >= this.endOfCurrentData) {
            fill(1);
            if (this.currentPosition >= this.endOfCurrentData) {
                return -1;
            }
        }

        return this.buf[this.currentPosition++] & 0xff;
    }

    /**
     * Returns the number of buffered bytes plus whatever the underlying stream
     * reports as available, saturating at {@link Integer#MAX_VALUE}.
     *
     * @return the number of bytes available
     * @throws IOException
     *             if the stream is closed or the underlying stream fails
     */
    public int available() throws IOException {
        checkClosed();

        // Sum in long: the underlying stream may report a value close to
        // Integer.MAX_VALUE, and an int sum would wrap to a negative number.
        long total = (long) this.underlyingStream.available()
                + (long) (this.endOfCurrentData - this.currentPosition);

        return (int) Math.min(total, (long) Integer.MAX_VALUE);
    }

    /**
     * Verifies that {@link #close()} has not released the buffer.
     *
     * @throws IOException
     *             if the stream has been closed
     */
    private void checkClosed() throws IOException {

        if (this.buf == null) {
            throw new IOException("Stream closed");
        }
    }

    /**
     * Closes the underlying stream and releases the buffer. The references are
     * dropped even if closing the underlying stream throws. Calling this again
     * afterwards has no effect.
     *
     * @throws IOException
     *             if closing the underlying stream fails
     */
    public void close() throws IOException {
        if (this.underlyingStream != null) {
            try {
                this.underlyingStream.close();
            } finally {
                this.underlyingStream = null;
                this.buf = null;
            }
        }
    }

    /**
     * Reports that mark/reset is not supported.
     *
     * @return {@code false}, always
     */
    public boolean markSupported() {
        return false;
    }

    /**
     * Skips up to {@code n} bytes. At most one buffer's worth is skipped per
     * call: the bytes already buffered, or, if the buffer is empty, the bytes
     * obtained by a single refill.
     *
     * @param n
     *            the number of bytes to skip
     * @return the number of bytes actually skipped; {@code 0} if {@code n} is
     *         not positive or no data could be obtained
     * @throws IOException
     *             if the stream is closed or the underlying read fails
     */
    public long skip(long n) throws IOException {
        checkClosed();
        if (n <= 0) {
            return 0;
        }

        long bytesAvailInBuffer = this.endOfCurrentData - this.currentPosition;

        if (bytesAvailInBuffer <= 0) {
            // Clamp before narrowing to int: a plain (int) cast turns values
            // above Integer.MAX_VALUE into zero or negative requests, which
            // made the refill read nothing. fill() never reads more than the
            // buffer holds, so the buffer size is the natural upper bound.
            fill((int) Math.min(n, (long) this.buf.length));
            bytesAvailInBuffer = this.endOfCurrentData - this.currentPosition;
            if (bytesAvailInBuffer <= 0) {
                return 0;
            }
        }

        long bytesSkipped = Math.min(bytesAvailInBuffer, n);
        // Safe narrowing: bytesSkipped never exceeds the buffer length.
        this.currentPosition += (int) bytesSkipped;
        return bytesSkipped;
    }
}