KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > ch > ethz > ssh2 > StreamGobbler


1
package ch.ethz.ssh2;

import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
6
/**
 * A <code>StreamGobbler</code> is an InputStream that uses an internal worker
 * thread to constantly consume input from another InputStream. It uses a buffer
 * to store the consumed data. The buffer size is automatically adjusted, if needed.
 * <p>
 * This class is sometimes very convenient - if you wrap a session's STDOUT and STDERR
 * InputStreams with instances of this class, then you don't have to bother about
 * the shared window of STDOUT and STDERR in the low level SSH-2 protocol,
 * since all arriving data will be immediately consumed by the worker threads.
 * Also, as a side effect, the streams will be buffered (e.g., single byte
 * read() operations are faster).
 * <p>
 * Other SSH for Java libraries include this functionality by default in
 * their STDOUT and STDERR InputStream implementations, however, please be aware
 * that this approach has also a downside:
 * <p>
 * If you do not call the StreamGobbler's <code>read()</code> method often enough
 * and the peer is constantly sending huge amounts of data, then you will sooner or later
 * encounter a low memory situation due to the aggregated data (well, it also depends on the Java heap size).
 * Joe Average will like this class anyway - a paranoid programmer would never use such an approach.
 * <p>
 * A thread that is interrupted while blocked in one of the <code>read</code> methods
 * gets an <code>InterruptedIOException</code>; its interrupt status stays set.
 * <p>
 * The term "StreamGobbler" was taken from an article called "When Runtime.exec() won't",
 * see http://www.javaworld.com/javaworld/jw-12-2000/jw-1229-traps.html.
 *
 * @author Christian Plattner, plattner@inf.ethz.ch
 * @version $Id: StreamGobbler.java,v 1.4 2006/02/14 19:43:16 cplattne Exp $
 */
public class StreamGobbler extends InputStream
{
    /**
     * Worker that repeatedly reads from the wrapped stream and appends the
     * bytes to the shared buffer. It stops when the wrapped stream returns
     * a non-positive count (recorded as EOF) or throws an IOException
     * (recorded so that readers can rethrow it).
     */
    class GobblerThread extends Thread
    {
        public void run()
        {
            byte[] buff = new byte[8192];

            while (true)
            {
                try
                {
                    /* Blocking read happens outside the lock so readers are never stalled by it. */
                    int avail = is.read(buff);

                    synchronized (synchronizer)
                    {
                        if (avail <= 0)
                        {
                            isEOF = true;
                            synchronizer.notifyAll();
                            break;
                        }

                        int space_available = buffer.length - write_pos;

                        if (space_available < avail)
                        {
                            /* compact/resize buffer */

                            int unread_size = write_pos - read_pos;
                            int need_space = unread_size + avail;

                            byte[] new_buffer = buffer;

                            if (need_space > buffer.length)
                            {
                                /* Grow by a third of the needed size, clamped to [256, 8192] bytes. */
                                int inc = need_space / 3;
                                inc = (inc < 256) ? 256 : inc;
                                inc = (inc > 8192) ? 8192 : inc;
                                new_buffer = new byte[need_space + inc];
                            }

                            /* Move the unread bytes to the front (also works in place). */
                            if (unread_size > 0)
                                System.arraycopy(buffer, read_pos, new_buffer, 0, unread_size);

                            buffer = new_buffer;

                            read_pos = 0;
                            write_pos = unread_size;
                        }

                        System.arraycopy(buff, 0, buffer, write_pos, avail);
                        write_pos += avail;

                        synchronizer.notifyAll();
                    }
                }
                catch (IOException e)
                {
                    synchronized (synchronizer)
                    {
                        /* Remember the failure; readers rethrow it once the buffer is drained. */
                        exception = e;
                        synchronizer.notifyAll();
                        break;
                    }
                }
            }
        }
    }

    /** The wrapped stream that the worker thread drains. */
    private final InputStream is;

    /** The worker thread; started by the constructor as a daemon. */
    private final GobblerThread t;

    /** Monitor guarding all mutable state below; also used for wait/notify. */
    private final Object synchronizer = new Object();

    private boolean isEOF = false;
    private boolean isClosed = false;
    private IOException exception = null;

    /** Unread data lives in <code>buffer[read_pos .. write_pos)</code>. */
    private byte[] buffer = new byte[2048];
    private int read_pos = 0;
    private int write_pos = 0;

    /**
     * Creates a gobbler for the given stream and immediately starts a daemon
     * worker thread that consumes it.
     *
     * @param is the stream to consume
     */
    public StreamGobbler(InputStream is)
    {
        this.is = is;
        t = new GobblerThread();
        t.setDaemon(true);
        t.start();
    }

    /**
     * Waits until the buffer holds at least one unread byte. The caller must
     * hold the <code>synchronizer</code> monitor.
     *
     * @return <code>true</code> if unread data is available, <code>false</code>
     *         if the buffer is empty and EOF has been reached
     * @throws IOException the exception recorded by the worker thread, if the
     *         buffer is empty and the worker failed
     * @throws InterruptedIOException if the calling thread is interrupted
     *         while waiting
     */
    private boolean waitForData() throws IOException
    {
        while (read_pos == write_pos)
        {
            if (exception != null)
                throw exception;

            if (isEOF)
                return false;

            try
            {
                synchronizer.wait();
            }
            catch (InterruptedException e)
            {
                /* Do not swallow the interrupt: keep the flag set and let the caller unblock. */
                Thread.currentThread().interrupt();
                throw new InterruptedIOException("Interrupted while waiting for data.");
            }
        }

        return true;
    }

    /**
     * Reads a single byte, blocking until one is buffered, EOF is reached or
     * the worker thread failed.
     *
     * @return the byte as a value in the range 0-255, or -1 on EOF
     * @throws IOException if this gobbler is closed, if the worker thread
     *         recorded an exception, or if the wait was interrupted
     */
    public int read() throws IOException
    {
        synchronized (synchronizer)
        {
            if (isClosed)
                throw new IOException("This StreamGobbler is closed.");

            if (!waitForData())
                return -1;

            return buffer[read_pos++] & 0xff;
        }
    }

    /**
     * Returns the number of buffered bytes that can be read without blocking.
     *
     * @return the count of unread buffered bytes
     * @throws IOException if this gobbler is closed
     */
    public int available() throws IOException
    {
        synchronized (synchronizer)
        {
            if (isClosed)
                throw new IOException("This StreamGobbler is closed.");

            return write_pos - read_pos;
        }
    }

    /**
     * Equivalent to <code>read(b, 0, b.length)</code>.
     */
    public int read(byte[] b) throws IOException
    {
        return read(b, 0, b.length);
    }

    /**
     * Marks this gobbler as closed, wakes up all waiting readers and closes
     * the wrapped stream. Calling it again has no effect.
     *
     * @throws IOException if closing the wrapped stream fails
     */
    public void close() throws IOException
    {
        synchronized (synchronizer)
        {
            if (isClosed)
                return;
            isClosed = true;
            isEOF = true;
            synchronizer.notifyAll();
            is.close();
        }
    }

    /**
     * Reads up to <code>len</code> bytes into <code>b</code> starting at
     * <code>off</code>. Blocks until at least one byte is buffered, EOF is
     * reached or the worker thread failed; then returns whatever is buffered
     * (at most <code>len</code> bytes) without waiting for more.
     *
     * @return the number of bytes copied, 0 if <code>len</code> is 0, or -1 on EOF
     * @throws NullPointerException if <code>b</code> is null
     * @throws IndexOutOfBoundsException if <code>off</code>/<code>len</code>
     *         do not describe a valid range of <code>b</code>
     * @throws IOException if this gobbler is closed, if the worker thread
     *         recorded an exception, or if the wait was interrupted
     */
    public int read(byte[] b, int off, int len) throws IOException
    {
        if (b == null)
            throw new NullPointerException();

        /* "len > b.length - off" cannot overflow once off and len are known to be non-negative. */
        if ((off < 0) || (len < 0) || (len > b.length - off))
            throw new IndexOutOfBoundsException();

        if (len == 0)
            return 0;

        synchronized (synchronizer)
        {
            if (isClosed)
                throw new IOException("This StreamGobbler is closed.");

            if (!waitForData())
                return -1;

            int avail = write_pos - read_pos;

            avail = (avail > len) ? len : avail;

            System.arraycopy(buffer, read_pos, b, off, avail);

            read_pos += avail;

            return avail;
        }
    }
}
230
Popular Tags