KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > archive > io > RecordingInputStream


/* RecordingInputStream
 *
 * $Id: RecordingInputStream.java,v 1.33 2006/08/15 04:39:00 gojomo Exp $
 *
 * Created on Sep 24, 2003
 *
 * Copyright (C) 2003 Internet Archive.
 *
 * This file is part of the Heritrix web crawler (crawler.archive.org).
 *
 * Heritrix is free software; you can redistribute it and/or modify
 * it under the terms of the GNU Lesser Public License as published by
 * the Free Software Foundation; either version 2.1 of the License, or
 * any later version.
 *
 * Heritrix is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU Lesser Public License for more details.
 *
 * You should have received a copy of the GNU Lesser Public License
 * along with Heritrix; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 */

25 package org.archive.io;
26
27 import java.io.File JavaDoc;
28 import java.io.FileOutputStream JavaDoc;
29 import java.io.IOException JavaDoc;
30 import java.io.InputStream JavaDoc;
31 import java.net.SocketException JavaDoc;
32 import java.net.SocketTimeoutException JavaDoc;
33 import java.security.MessageDigest JavaDoc;
34 import java.util.logging.Level JavaDoc;
35 import java.util.logging.Logger JavaDoc;
36
37
38 /**
39  * Stream which records all data read from it, which it acquires from a wrapped
40  * input stream.
41  *
42  * Makes use of a RecordingOutputStream for recording because of its being
43  * file backed so we can write massive amounts of data w/o worrying about
44  * overflowing memory.
45  *
46  * @author gojomo
47  *
48  */

49 public class RecordingInputStream
50     extends InputStream JavaDoc {
51
52     protected static Logger JavaDoc logger =
53         Logger.getLogger("org.archive.io.RecordingInputStream");
54
55     /**
56      * Where we are recording to.
57      */

58     private RecordingOutputStream recordingOutputStream;
59
60     /**
61      * Stream to record.
62      */

63     private InputStream JavaDoc in = null;
64
65     /**
66      * Reusable buffer to avoid reallocation on each readFullyUntil
67      */

68     protected byte[] drainBuffer = new byte[16*1024];
69
70     /**
71      * Create a new RecordingInputStream.
72      *
73      * @param bufferSize Size of buffer to use.
74      * @param backingFilename Name of backing file.
75      */

76     public RecordingInputStream(int bufferSize, String JavaDoc backingFilename)
77     {
78         this.recordingOutputStream = new RecordingOutputStream(bufferSize,
79             backingFilename);
80     }
81
82     public void open(InputStream JavaDoc wrappedStream) throws IOException JavaDoc {
83         logger.fine(Thread.currentThread().getName() + " opening " +
84             wrappedStream + ", " + Thread.currentThread().getName());
85         if(isOpen()) {
86             // error; should not be opening/wrapping in an unclosed
87
// stream remains open
88
throw new IOException JavaDoc("RIS already open for "
89                     +Thread.currentThread().getName());
90         }
91         this.in = wrappedStream;
92         this.recordingOutputStream.open();
93     }
94
95     public int read() throws IOException JavaDoc {
96         if (!isOpen()) {
97             throw new IOException JavaDoc("Stream closed " +
98                 Thread.currentThread().getName());
99         }
100         int b = this.in.read();
101         if (b != -1) {
102             assert this.recordingOutputStream != null: "ROS is null " +
103                 Thread.currentThread().getName();
104             this.recordingOutputStream.write(b);
105         }
106         return b;
107     }
108
109     public int read(byte[] b, int off, int len) throws IOException JavaDoc {
110         if (!isOpen()) {
111             throw new IOException JavaDoc("Stream closed " +
112                 Thread.currentThread().getName());
113         }
114         int count = this.in.read(b,off,len);
115         if (count > 0) {
116             assert this.recordingOutputStream != null: "ROS is null " +
117                 Thread.currentThread().getName();
118             this.recordingOutputStream.write(b,off,count);
119         }
120         return count;
121     }
122
123     public int read(byte[] b) throws IOException JavaDoc {
124             if (!isOpen()) {
125                     throw new IOException JavaDoc("Stream closed " +
126                     Thread.currentThread().getName());
127             }
128             int count = this.in.read(b);
129         if (count > 0) {
130             assert this.recordingOutputStream != null: "ROS is null " +
131                 Thread.currentThread().getName();
132             this.recordingOutputStream.write(b,0,count);
133         }
134         return count;
135     }
136
137     public void close() throws IOException JavaDoc {
138         if (logger.isLoggable(Level.FINE)) {
139             logger.fine(Thread.currentThread().getName() + " closing " +
140                     this.in + ", " + Thread.currentThread().getName());
141         }
142         if (this.in != null) {
143             this.in.close();
144             this.in = null;
145         }
146         this.recordingOutputStream.close();
147     }
148
149     public ReplayInputStream getReplayInputStream() throws IOException JavaDoc {
150         return this.recordingOutputStream.getReplayInputStream();
151     }
152
153     public ReplayInputStream getContentReplayInputStream() throws IOException JavaDoc {
154         return this.recordingOutputStream.getContentReplayInputStream();
155     }
156
157     public long readFully() throws IOException JavaDoc {
158         while(read(drainBuffer) != -1) {
159             // Empty out stream.
160
continue;
161         }
162         return this.recordingOutputStream.getSize();
163     }
164
165     /**
166      * Read all of a stream (Or read until we timeout or have read to the max).
167      * @param softMaxLength Maximum length to read; if zero or < 0, then no
168      * limit. If met, return normally.
169      * @param hardMaxLength Maximum length to read; if zero or < 0, then no
170      * limit. If exceeded, throw RecorderLengthExceededException
171      * @param timeout Timeout in milliseconds for total read; if zero or
172      * negative, timeout is <code>Long.MAX_VALUE</code>. If exceeded, throw
173      * RecorderTimeoutException
174      * @param maxBytesPerMs How many bytes per millisecond.
175      * @throws IOException failed read.
176      * @throws RecorderLengthExceededException
177      * @throws RecorderTimeoutException
178      * @throws InterruptedException
179      */

180     public void readFullyOrUntil(long softMaxLength, long hardMaxLength,
181     long timeout, int maxBytesPerMs)
182         throws IOException JavaDoc, RecorderLengthExceededException,
183             RecorderTimeoutException, InterruptedException JavaDoc {
184         // Check we're open before proceeding.
185
if (!isOpen()) {
186             // TODO: should this be a noisier exception-raising error?
187
return;
188         }
189
190         // We need the rate limit in ms per byte, so turn it over
191
double minMsPerByte = 0.0;
192         if (maxBytesPerMs != 0) {
193             minMsPerByte = ((double)1.0)/(double)maxBytesPerMs;
194         }
195
196         long maxLength;
197         if ( softMaxLength>0 && hardMaxLength>0) {
198             // both maxes set; use lower of softMax or hardMax+1
199
maxLength = Math.min(softMaxLength,hardMaxLength+1);
200         } else if(softMaxLength>0 && hardMaxLength<=0) {
201             // softMax only; set max to softMax
202
maxLength = softMaxLength;
203         } else if(softMaxLength<=0 && hardMaxLength>0) {
204             // hardMax only; set max to read as 1 past max
205
maxLength = hardMaxLength+1;
206         } else { // => (softMaxLength<=0 && hardMaxLength<=0)
207
// no maxes
208
maxLength = -1;
209         }
210
211         long timeoutTime;
212         long startTime = System.currentTimeMillis();
213
214         if(timeout > 0) {
215             timeoutTime = startTime + timeout;
216         } else {
217             timeoutTime = Long.MAX_VALUE;
218         }
219
220         long totalReadingTime = 0L;
221         long totalBytes = 0L;
222         long bytesRead = -1L;
223         int maxToRead = -1;
224         while (true) {
225             try {
226                 maxToRead = (maxLength <= 0)
227                     ? drainBuffer.length
228                     : (int) Math.min(drainBuffer.length, maxLength - totalBytes);
229                 // Now, decide if (and for how long) to sleep, prior to reading,
230
// in order to yield the average fetch rate desired.
231
if (minMsPerByte != 0.0 && totalBytes > 0L) {
232                     // We've already fetched something, so we can estimate the
233
// actual bandwidth we've been getting. This number is used
234
// to figure out how many ms we need to wait before
235
// starting the next read. We want the stats at the end of
236
// that read to be within the specified bounds.
237
double actualMsPerByte =
238                         ((double)totalReadingTime)/(double)totalBytes;
239                     // Calculate the minimum time
240
long minTime =
241                         (long)((totalBytes + maxToRead) * minMsPerByte);
242                     // Calculate the actual time spent, plus the estimated time
243
// for the bytes to read
244
long estTime = System.currentTimeMillis() - startTime +
245                         (long)(maxToRead * actualMsPerByte);
246                     // If estTime < minTime, sleep for the difference
247
if (estTime < minTime) {
248                         Thread.sleep(minTime - estTime);
249                     }
250                 }
251
252                 long readStartTime = System.currentTimeMillis();
253                 bytesRead = read(drainBuffer,0,maxToRead);
254                 if (bytesRead == -1) {
255                     break;
256                 }
257                 totalBytes += bytesRead;
258                 totalReadingTime += System.currentTimeMillis() - readStartTime;
259
260                 if (Thread.interrupted()) {
261                     throw new InterruptedException JavaDoc("Interrupted during IO");
262                 }
263             } catch (SocketTimeoutException JavaDoc e) {
264                 // A socket timeout is just a transient problem, meaning
265
// nothing was available in the configured timeout period,
266
// but something else might become available later.
267
// Take this opportunity to check the overall
268
// timeout (below). One reason for this timeout is
269
// servers that keep up the connection, 'keep-alive', even
270
// though we asked them to not keep the connection open.
271
if (System.currentTimeMillis() < timeoutTime) {
272                     if (logger.isLoggable(Level.FINE)) {
273                         logger.fine("Socket timed out after " +
274                             (timeoutTime - startTime) + "ms: " +
275                             e.getMessage());
276                     }
277                 }
278             } catch (SocketException JavaDoc se) {
279                 throw se;
280             } catch (NullPointerException JavaDoc e) {
281                 // [ 896757 ] NPEs in Andy's Th-Fri Crawl.
282
// A crawl was showing NPE's in this part of the code but can
283
// not reproduce. Adding this rethrowing catch block w/
284
// diagnostics to help should we come across the problem in the
285
// future.
286
throw new NullPointerException JavaDoc("Stream " + this.in + ", " +
287                     e.getMessage() + " " + Thread.currentThread().getName());
288             }
289             if (System.currentTimeMillis() >= timeoutTime) {
290                 throw new RecorderTimeoutException("Timedout after " +
291                     (timeoutTime - startTime) + "ms.");
292             }
293             if (hardMaxLength > 0 && totalBytes >= hardMaxLength) {
294                 throw new RecorderLengthExceededException();
295             }
296             if (maxLength > 0 && totalBytes >= maxLength) {
297                 break; // return
298
}
299         }
300     }
301
302     public long getSize() {
303         return this.recordingOutputStream.getSize();
304     }
305
306     public void markContentBegin() {
307         this.recordingOutputStream.markContentBegin();
308     }
309
310     public void startDigest() {
311         this.recordingOutputStream.startDigest();
312     }
313
314     /**
315      * Convenience method for setting SHA1 digest.
316      */

317     public void setSha1Digest() {
318         this.recordingOutputStream.setSha1Digest();
319     }
320
321     /**
322      * Sets a digest function which may be applied to recorded data.
323      * As usually only a subset of the recorded data should
324      * be fed to the digest, you must also call startDigest()
325      * to begin digesting.
326      *
327      * @param md
328      */

329     public void setDigest(MessageDigest JavaDoc md) {
330         this.recordingOutputStream.setDigest(md);
331     }
332
333     /**
334      * Return the digest value for any recorded, digested data. Call
335      * only after all data has been recorded; otherwise, the running
336      * digest state is ruined.
337      *
338      * @return the digest final value
339      */

340     public byte[] getDigestValue() {
341         return this.recordingOutputStream.getDigestValue();
342     }
343
344     public ReplayCharSequence getReplayCharSequence() throws IOException JavaDoc {
345         return getReplayCharSequence(null);
346     }
347
348     /**
349      * @param characterEncoding Encoding of recorded stream.
350      * @return A ReplayCharSequence Will return null if an IOException. Call
351      * close on returned RCS when done.
352      * @throws IOException
353      */

354     public ReplayCharSequence getReplayCharSequence(String JavaDoc characterEncoding)
355             throws IOException JavaDoc {
356         return this.recordingOutputStream.
357             getReplayCharSequence(characterEncoding);
358     }
359
360     public long getResponseContentLength() {
361         return this.recordingOutputStream.getResponseContentLength();
362     }
363
364     public void closeRecorder() throws IOException JavaDoc {
365         this.recordingOutputStream.closeRecorder();
366     }
367
368     /**
369      * @param tempFile
370      * @throws IOException
371      */

372     public void copyContentBodyTo(File JavaDoc tempFile) throws IOException JavaDoc {
373         FileOutputStream JavaDoc fos = new FileOutputStream JavaDoc(tempFile);
374         ReplayInputStream ris = getContentReplayInputStream();
375         ris.readFullyTo(fos);
376         fos.close();
377         ris.close();
378     }
379
380     /**
381      * @return True if we've been opened.
382      */

383     public boolean isOpen()
384     {
385         return this.in != null;
386     }
387 }
388
Popular Tags