1 45 package org.exolab.jms.net.multiplexer; 46 47 import java.io.IOException ; 48 import java.io.OutputStream ; 49 50 import org.apache.commons.logging.Log; 51 import org.apache.commons.logging.LogFactory; 52 53 54 65 class MultiplexOutputStream extends OutputStream implements Constants { 66 67 70 private final int _channelId; 71 72 75 private byte _type; 76 77 80 private Multiplexer _multiplexer; 81 82 85 private byte[] _data; 86 87 90 private int _index; 91 92 95 private int _remoteSpace; 96 97 100 private final int _maxRemoteSpace; 101 102 105 private boolean _disconnected; 106 107 110 private final Object _lock = new Object (); 111 112 115 private static final Log _log = 116 LogFactory.getLog(MultiplexOutputStream.class); 117 118 119 127 public MultiplexOutputStream(int channelId, Multiplexer multiplexer, 128 int size, int remoteSize) { 129 _channelId = channelId; 130 _multiplexer = multiplexer; 131 _data = new byte[size]; 132 _maxRemoteSpace = remoteSize; 133 _remoteSpace = remoteSize; 134 } 135 136 141 public void setType(byte type) { 142 _type = type; 143 } 144 145 151 public void close() throws IOException { 152 flush(); 153 } 154 155 161 public void flush() throws IOException { 162 int offset = 0; 163 int length = _index; 164 while (offset < _index) { 165 int available = waitForSpace(); 166 int size = (length <= available) ? length : available; 167 168 send(_data, offset, size); 169 offset += size; 170 length -= size; 171 } 172 _index = 0; 173 } 174 175 184 public void write(byte[] buffer, int offset, int length) 185 throws IOException { 186 187 int space = _data.length - _index; 188 if (space >= length) { 189 System.arraycopy(buffer, offset, _data, _index, length); 191 _index += length; 192 } else { 193 flush(); 194 int size = length; 195 while (size > 0) { 197 int available = waitForSpace(); 198 int count = (size <= available) ? size : available; 199 send(buffer, offset, count); 200 offset += count; 201 size -= count; 202 } 203 } 204 } 205 206 212 public void write(int value) throws IOException { 213 if (_index >= _data.length) { 214 flush(); 215 } 216 _data[_index++] = (byte) value; 217 } 218 219 225 public void notifyRead(int read) throws IOException { 226 synchronized (_lock) { 227 int space = _remoteSpace + read; 228 if (space > _maxRemoteSpace) { 229 throw new IOException ("Remote space=" + space 230 + " exceeds expected space=" 231 + _maxRemoteSpace); 232 } 233 _remoteSpace = space; 234 235 if (_log.isDebugEnabled()) { 236 _log.debug("notifyRead(read=" + read 237 + ") [channelId=" + _channelId 238 + ", remoteSpace=" + _remoteSpace 239 + "]"); 240 } 241 _lock.notifyAll(); 242 } 243 } 244 245 248 public void disconnected() { 249 synchronized (_lock) { 250 _disconnected = true; 251 _lock.notifyAll(); 252 } 253 } 254 255 260 public String toString() { 261 return "MultiplexOutputStream[index=" + _index + "]"; 262 } 263 264 273 private void send(byte[] buffer, int offset, int length) 274 throws IOException { 275 if (_log.isDebugEnabled()) { 276 _log.debug("send(length=" + length + ") [channelId=" + _channelId 277 + ", remoteSpace=" + _remoteSpace 278 + "]"); 279 } 280 synchronized (_lock) { 281 _multiplexer.send(_type, _channelId, buffer, offset, length); 282 _type = DATA; 283 284 _remoteSpace -= length; 285 286 301 } 302 } 303 304 311 private int waitForSpace() throws IOException { 312 int available = 0; 313 while (!_disconnected) { 314 synchronized (_lock) { 315 if (_log.isDebugEnabled()) { 316 _log.debug("waitForSpace() [channelId=" + _channelId 317 + ", remoteSpace=" + _remoteSpace 318 + "]"); 319 } 320 321 if (_remoteSpace > 0) { 322 available = _remoteSpace; 323 break; 324 } else { 325 try { 326 _lock.wait(); 327 } catch (InterruptedException ignore) { 328 } 329 } 330 } 331 } 332 if (_disconnected) { 333 throw new IOException ("Connection has been closed"); 334 } 335 336 return available; 337 } 338 339 } 340 | Popular Tags |