1 16 package org.mortbay.http.nio; 17 18 import java.io.IOException ; 19 import java.io.InputStream ; 20 import java.io.InterruptedIOException ; 21 import java.net.SocketTimeoutException ; 22 import java.nio.ByteBuffer ; 23 24 import org.apache.commons.logging.Log; 25 import org.mortbay.log.LogFactory; 26 import org.mortbay.util.LazyList; 27 28 29 34 public class ByteBufferInputStream extends InputStream 35 { 36 private static Log log= LogFactory.getLog(ByteBufferInputStream.class); 37 38 long _timeout=30000; 39 int _bufferSize; 40 ByteBuffer _buffer; 41 Object _buffers; 42 Object _recycle; 43 boolean _closed=false; 44 45 46 48 public ByteBufferInputStream(int bufferSize) 49 { 50 super(); 51 _bufferSize=bufferSize; 52 } 53 54 55 56 59 public long getTimeout() 60 { 61 return _timeout; 62 } 63 64 65 68 public void setTimeout(long l) 69 { 70 _timeout= l; 71 } 72 73 74 77 public synchronized int read() throws IOException 78 { 79 if (!waitForContent()) 80 return -1; 81 return _buffer.get(); 82 } 83 84 85 88 public synchronized int available() throws IOException 89 { 90 if (!waitForContent()) 91 return -1; 92 return _buffer.remaining(); 93 } 94 95 96 99 public synchronized void close() throws IOException 100 { 101 _closed=true; 102 this.notify(); 103 } 104 105 106 109 public synchronized void mark(int arg0) 110 { 111 } 113 114 115 118 public synchronized boolean markSupported() 119 { 120 return false; 122 } 123 124 125 128 public synchronized int read(byte[] buf, int offset, int length) 129 throws IOException 130 { 131 132 if (!waitForContent()) 133 return -1; 134 135 if (length>_buffer.remaining()) 136 length=_buffer.remaining(); 137 138 _buffer.get(buf, offset, length); 139 return length; 140 } 141 142 143 146 public synchronized int read(byte[] buf) throws IOException 147 { 148 if (!waitForContent()) 149 return -1; 150 int length=buf.length; 151 if (length>_buffer.remaining()) 152 length=_buffer.remaining(); 153 154 _buffer.get(buf, 0, length); 155 return length; 156 } 157 158 159 162 public synchronized void reset() throws IOException 163 { 164 super.reset(); 166 } 167 168 169 172 public long skip(long length) throws IOException 173 { 174 if (!waitForContent()) 175 return -1; 176 if (length>_buffer.remaining()) 177 length=_buffer.remaining(); 178 _buffer.position((int)(_buffer.position()+length)); 179 return length; 180 } 181 182 183 public synchronized void write(ByteBuffer buffer) 184 { 185 if (buffer.hasRemaining()) 186 { 187 _buffers=LazyList.add(_buffers,buffer); 188 this.notify(); 189 } 190 else 191 recycle(buffer); 192 } 193 194 195 private synchronized boolean waitForContent() 196 throws InterruptedIOException 197 { 198 if (_buffer!=null) 199 { 200 if (_buffer.hasRemaining()) 201 return true; 202 203 recycle(_buffer); 205 _buffer=null; 206 } 207 208 while(!_closed && LazyList.size(_buffers)==0) 209 { 210 try 211 { 212 this.wait(_timeout); 213 } 214 catch(InterruptedException e) 215 { 216 log.debug(e); 217 throw new InterruptedIOException (e.toString()); 218 } 219 } 220 221 if (_closed) 222 return false; 223 224 if (LazyList.size(_buffers)==0) 225 throw new SocketTimeoutException (); 226 227 _buffer=(ByteBuffer )LazyList.get(_buffers, 0); 228 _buffers=LazyList.remove(_buffers, 0); 229 230 return true; 231 } 232 233 234 235 238 public synchronized ByteBuffer getBuffer() 239 { 240 ByteBuffer buf=null; 241 int s=LazyList.size(_recycle); 242 if (s>0) 243 { 244 s--; 245 buf=(ByteBuffer )LazyList.get(_recycle, s); 246 _recycle=LazyList.remove(_recycle,s); 247 buf.clear(); 248 } 249 else 250 { 251 buf=ByteBuffer.allocateDirect(_bufferSize); 252 } 253 return buf; 254 } 255 256 257 public synchronized void recycle(ByteBuffer buf) 258 { 259 _recycle=LazyList.add(_recycle,buf); 260 } 261 262 263 public void destroy() 264 { 265 _buffer=null; 266 _buffers=null; 267 _recycle=null; 268 } 269 270 271 } 272 | Popular Tags |