1 45 package org.exolab.jms.net.multiplexer; 46 47 import java.io.DataInputStream ; 48 import java.io.IOException ; 49 import java.io.InputStream ; 50 51 import org.apache.commons.logging.Log; 52 import org.apache.commons.logging.LogFactory; 53 54 55 66 class MultiplexInputStream extends InputStream implements Constants { 67 68 71 private final int _channelId; 72 73 76 private Multiplexer _multiplexer; 77 78 81 private byte[] _data; 82 83 86 private byte[] _byte = new byte[1]; 87 88 91 private int _index = 0; 92 93 96 private int _available = 0; 97 98 101 private boolean _disconnected = false; 102 103 106 private final int _lowWaterMark; 107 108 112 private int _read = 0; 113 114 117 private final Object _lock = new Object (); 118 119 122 private final Log _log = LogFactory.getLog(MultiplexInputStream.class); 123 124 125 132 public MultiplexInputStream(int channelId, Multiplexer multiplexer, 133 int size) { 134 _channelId = channelId; 135 _multiplexer = multiplexer; 136 _data = new byte[size]; 137 _lowWaterMark = size / 2; 138 } 139 140 143 public void close() { 144 } 145 146 151 public void destroy() throws IOException { 152 synchronized (_lock) { 154 if (!_disconnected) { 155 } 157 } 158 _multiplexer = null; 159 _data = null; 160 } 161 162 174 public int read() throws IOException { 175 final int mask = 0xFF; 176 int count = read(_byte, 0, 1); 177 return (count == 1) ? _byte[0] & mask : -1; 178 } 179 180 204 public int read(byte[] buffer, int offset, int length) throws IOException { 205 int count = 0; 206 if (length > 0) { 207 synchronized (_lock) { 208 count = (length <= _available) ? length : _available; 209 if (_log.isDebugEnabled()) { 210 _log.debug("read(length=" + length + ") [channelId=" 211 + _channelId 212 + ", available=" + _available + "]"); 213 } 214 215 if (count > 0) { 216 copy(buffer, offset, count); 218 } 219 220 if (count < length) { 221 int more = length - count; 223 while ((_available < more) && !_disconnected) { 224 if (_log.isDebugEnabled()) { 225 _log.debug("read() waiting on data [channelId=" 226 + _channelId 227 + ", available=" + _available 228 + ", requested=" + more + "]"); 229 } 230 231 try { 232 _lock.wait(); 233 } catch (InterruptedException ignore) { 234 } 235 } 236 237 if (_available > 0) { 238 more = (more <= _available) ? more : _available; 240 offset += count; 241 copy(buffer, offset, more); 242 count += more; 243 } 244 } 245 246 if ((count == 0) && _disconnected) { 247 count = -1; 250 } 251 } 252 } 253 return count; 254 } 255 256 264 public int available() { 265 int result; 266 synchronized (_lock) { 267 result = _available; 268 } 269 return result; 270 } 271 272 275 public void disconnected() { 276 synchronized (_lock) { 277 _disconnected = true; 278 _lock.notifyAll(); 279 } 280 } 281 282 287 public String toString() { 288 return "MultiplexInputStream[available=" + _available + "]"; 289 } 290 291 298 protected void receive(DataInputStream input, int length) 299 throws IOException { 300 301 synchronized (_lock) { 302 int space = _data.length - _available; 303 if (length > space) { 304 throw new IOException ("Buffer overflow: buffer size=" 305 + _data.length 306 + ", space available=" + space 307 + ", requested size=" + length); 308 } 309 310 int freeAtEnd = _data.length - (_index + _available); 311 if (length > freeAtEnd) { 312 System.arraycopy(_data, _index, _data, 0, _available); 315 _index = 0; 316 } 317 input.readFully(_data, _index + _available, length); 318 319 if (_log.isDebugEnabled()) { 320 _log.debug("receive(length=" + length 321 + ") [channelId=" + _channelId 322 + ", available=" + _available 323 + ", space=" + (_data.length - _available) + "]"); 324 325 338 } 339 340 _available += length; 341 342 _lock.notifyAll(); 343 } 344 } 345 346 361 private void copy(byte[] buffer, int offset, int length) 362 throws IOException { 363 364 System.arraycopy(_data, _index, buffer, offset, length); 365 _index += length; 366 _available -= length; 367 _read += length; 368 if (_read >= _lowWaterMark) { 369 notifyRead(); 370 } 371 } 372 373 378 private void notifyRead() throws IOException { 379 if (_log.isDebugEnabled()) { 380 _log.debug("notifyRead() [channelId=" + _channelId 381 + ", read=" + _read + "]"); 382 } 383 _multiplexer.send(FLOW_READ, _channelId, _read); 384 _read = 0; 385 } 386 387 } 388 | Popular Tags |