KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > exolab > jms > net > multiplexer > MultiplexInputStream


1 /**
2  * Redistribution and use of this software and associated documentation
3  * ("Software"), with or without modification, are permitted provided
4  * that the following conditions are met:
5  *
6  * 1. Redistributions of source code must retain copyright
7  * statements and notices. Redistributions must also contain a
8  * copy of this document.
9  *
10  * 2. Redistributions in binary form must reproduce the
11  * above copyright notice, this list of conditions and the
12  * following disclaimer in the documentation and/or other
13  * materials provided with the distribution.
14  *
15  * 3. The name "Exolab" must not be used to endorse or promote
16  * products derived from this Software without prior written
17  * permission of Exoffice Technologies. For written permission,
18  * please contact info@exolab.org.
19  *
20  * 4. Products derived from this Software may not be called "Exolab"
21  * nor may "Exolab" appear in their names without prior written
22  * permission of Exoffice Technologies. Exolab is a registered
23  * trademark of Exoffice Technologies.
24  *
25  * 5. Due credit should be given to the Exolab Project
26  * (http://www.exolab.org/).
27  *
28  * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
29  * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
30  * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
31  * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
32  * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
33  * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
34  * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
35  * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
37  * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
38  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
39  * OF THE POSSIBILITY OF SUCH DAMAGE.
40  *
41  * Copyright 2003-2005 (C) Exoffice Technologies Inc. All Rights Reserved.
42  *
43  * $Id: MultiplexInputStream.java,v 1.2 2005/04/02 13:23:12 tanderson Exp $
44  */

45 package org.exolab.jms.net.multiplexer;
46
47 import java.io.DataInputStream JavaDoc;
48 import java.io.IOException JavaDoc;
49 import java.io.InputStream JavaDoc;
50
51 import org.apache.commons.logging.Log;
52 import org.apache.commons.logging.LogFactory;
53
54
55 /**
56  * An <code>InputStream</code> which reads multiplexed data over a shared
57  * physical connection, managed by a {@link Multiplexer}.
58  * <p/>
59  * <em>NOTE:</em> the <code>InputStream</code> methods of this class are not
60  * thread safe
61  *
62  * @author <a HREF="mailto:tma@netspace.net.au">Tim Anderson</a>
63  * @version $Revision: 1.2 $
64  * @see Multiplexer
65  */

66 class MultiplexInputStream extends InputStream JavaDoc implements Constants {
67
68     /**
69      * The channel identifier.
70      */

71     private final int _channelId;
72
73     /**
74      * The multiplexer.
75      */

76     private Multiplexer _multiplexer;
77
78     /**
79      * The local data buffer.
80      */

81     private byte[] _data;
82
83     /**
84      * Temporary buffer for single byte reads.
85      */

86     private byte[] _byte = new byte[1];
87
88     /**
89      * The index into <code>_data</code> where data starts.
90      */

91     private int _index = 0;
92
93     /**
94      * The number of available bytes in <code>_data</code>.
95      */

96     private int _available = 0;
97
98     /**
99      * Indicates if the underyling connection has been closed.
100      */

101     private boolean _disconnected = false;
102
103     /**
104      * The no. of bytes to read before notifying the remote endpoint.
105      */

106     private final int _lowWaterMark;
107
108     /**
109      * The number of bytes read from this stream since the last.
110      * <code>notifyRead()</code> call
111      */

112     private int _read = 0;
113
114     /**
115      * Synchronization helper.
116      */

117     private final Object JavaDoc _lock = new Object JavaDoc();
118
119     /**
120      * The logger.
121      */

122     private final Log _log = LogFactory.getLog(MultiplexInputStream.class);
123
124
125     /**
126      * Construct a new <code>MultiplexInputStream</code>.
127      *
128      * @param channelId the channel identifier
129      * @param multiplexer the multiplexer
130      * @param size the size of the local data buffer
131      */

132     public MultiplexInputStream(int channelId, Multiplexer multiplexer,
133                                 int size) {
134         _channelId = channelId;
135         _multiplexer = multiplexer;
136         _data = new byte[size];
137         _lowWaterMark = size / 2;
138     }
139
140     /**
141      * This implementation is a no-op, as the stream is re-used.
142      */

143     public void close() {
144     }
145
146     /**
147      * Closes this input stream and releases any resources associated with it.
148      *
149      * @throws IOException if an I/O error occurs
150      */

151     public void destroy() throws IOException JavaDoc {
152         // notify the endpoint iff it hasn't notified this of disconnection
153
synchronized (_lock) {
154             if (!_disconnected) {
155                 //_multiplexer.closed(this);
156
}
157         }
158         _multiplexer = null;
159         _data = null;
160     }
161
162     /**
163      * Reads the next byte of data from the input stream. The value byte is
164      * returned as an <code>int</code> in the range <code>0</code> to
165      * <code>255</code>. If no byte is available because the end of the stream
166      * has been reached, the value <code>-1</code> is returned. This method
167      * blocks until input data is available, the end of the stream is detected,
168      * or an exception is thrown.
169      *
170      * @return the next byte of data, or <code>-1</code> if the end of the
171      * stream is reached.
172      * @throws IOException if an I/O error occurs.
173      */

174     public int read() throws IOException JavaDoc {
175         final int mask = 0xFF;
176         int count = read(_byte, 0, 1);
177         return (count == 1) ? _byte[0] & mask : -1;
178     }
179
180     /**
181      * Reads up to <code>length</code> bytes of data from the input stream into
182      * an array of bytes. An attempt is made to read as many as
183      * <code>length</code> bytes, but a smaller number may be read, possibly
184      * zero. The number of bytes actually read is returned as an integer.
185      * <p/>
186      * <p> If the first byte cannot be read for any reason other than end of
187      * file, then an <code>IOException</code> is thrown. In particular, an
188      * <code>IOException</code> is thrown if the input stream has been closed.
189      *
190      * @param buffer the buffer into which the data is read
191      * @param offset the start offset in array <code>buffer</code> at which the
192      * data is written
193      * @param length the maximum number of bytes to read
194      * @return the total number of bytes read into the buffer, or
195      * <code>-1</code> if there is no more data because the end of the
196      * stream has been reached.
197      * @throws IOException if an I/O error occurs.
198      * @throws IndexOutOfBoundsException if <code>offset</code> is negative, or
199      * <code>length</code> is negative, or
200      * <code>offset+length</code> is greater
201      * than the length of the array
202      * @throws NullPointerException if <code>buffer</code> is null
203      */

204     public int read(byte[] buffer, int offset, int length) throws IOException JavaDoc {
205         int count = 0;
206         if (length > 0) {
207             synchronized (_lock) {
208                 count = (length <= _available) ? length : _available;
209                 if (_log.isDebugEnabled()) {
210                     _log.debug("read(length=" + length + ") [channelId="
211                                + _channelId
212                                + ", available=" + _available + "]");
213                 }
214
215                 if (count > 0) {
216                     // copy the available data into the buffer
217
copy(buffer, offset, count);
218                 }
219
220                 if (count < length) {
221                     // wait for more data to become available
222
int more = length - count;
223                     while ((_available < more) && !_disconnected) {
224                         if (_log.isDebugEnabled()) {
225                             _log.debug("read() waiting on data [channelId="
226                                        + _channelId
227                                        + ", available=" + _available
228                                        + ", requested=" + more + "]");
229                         }
230
231                         try {
232                             _lock.wait();
233                         } catch (InterruptedException JavaDoc ignore) {
234                         }
235                     }
236
237                     if (_available > 0) {
238                         // more data available, so copy it
239
more = (more <= _available) ? more : _available;
240                         offset += count;
241                         copy(buffer, offset, more);
242                         count += more;
243                     }
244                 }
245
246                 if ((count == 0) && _disconnected) {
247                     // no data was read, and we were disconnected. Indicate
248
// end of stream to user
249
count = -1;
250                 }
251             }
252         }
253         return count;
254     }
255
256     /**
257      * Returns the number of bytes that can be read (or skipped over) from this
258      * input stream without blocking by the next caller of a method for this
259      * input stream.
260      *
261      * @return the number of bytes that can be read from this input stream
262      * without blocking.
263      */

264     public int available() {
265         int result;
266         synchronized (_lock) {
267             result = _available;
268         }
269         return result;
270     }
271
272     /**
273      * Invoked when the underlying physical connection is closed.
274      */

275     public void disconnected() {
276         synchronized (_lock) {
277             _disconnected = true;
278             _lock.notifyAll();
279         }
280     }
281
282     /**
283      * Returns a string representation of this.
284      *
285      * @return a string representation of this
286      */

287     public String JavaDoc toString() {
288         return "MultiplexInputStream[available=" + _available + "]";
289     }
290
291     /**
292      * Invoked by {@link Multiplexer} when data is available for this stream.
293      *
294      * @param input the stream to read data from
295      * @param length the number of bytes to read
296      * @throws IOException if an I/O error occurs
297      */

298     protected void receive(DataInputStream JavaDoc input, int length)
299             throws IOException JavaDoc {
300
301         synchronized (_lock) {
302             int space = _data.length - _available;
303             if (length > space) {
304                 throw new IOException JavaDoc("Buffer overflow: buffer size="
305                                       + _data.length
306                                       + ", space available=" + space
307                                       + ", requested size=" + length);
308             }
309
310             int freeAtEnd = _data.length - (_index + _available);
311             if (length > freeAtEnd) {
312                 // make space at the end of the buffer, by shuffling data
313
// to the start
314
System.arraycopy(_data, _index, _data, 0, _available);
315                 _index = 0;
316             }
317             input.readFully(_data, _index + _available, length);
318
319             if (_log.isDebugEnabled()) {
320                 _log.debug("receive(length=" + length
321                            + ") [channelId=" + _channelId
322                            + ", available=" + _available
323                            + ", space=" + (_data.length - _available) + "]");
324
325 /*
326                 StringBuffer buf = new StringBuffer();
327                 for (int i = 0; i < length; ++i) {
328                     if (i > 0) {
329                       buf.append(", ");
330                     }
331                     final int mask = 0xff;
332                     int value = _data[_index + i + _available] & mask;
333                     buf.append(Integer.toHexString(value));
334                 }
335                 _log.debug("receive[channelId=" + _channelId
336                            + "], length=" + length + ", data=" + buf);
337 */

338             }
339
340             _available += length;
341
342             _lock.notifyAll();
343         }
344     }
345
346     /**
347      * Helper to copy data to a user buffer, notifying the remote endpoint if
348      * more data should be sent.
349      *
350      * @param buffer the buffer into which the data is read
351      * @param offset the start offset in array <code>buffer</code> at which the
352      * data is written
353      * @param length the maximum number of bytes to read
354      * @throws IOException if an I/O error occurs.
355      * @throws IndexOutOfBoundsException if <code>offset</code> is negative, or
356      * <code>length</code> is negative, or
357      * <code>offset+length</code> is greater
358      * than the length of the array
359      * @throws NullPointerException if <code>buffer</code> is null
360      */

361     private void copy(byte[] buffer, int offset, int length)
362             throws IOException JavaDoc {
363
364         System.arraycopy(_data, _index, buffer, offset, length);
365         _index += length;
366         _available -= length;
367         _read += length;
368         if (_read >= _lowWaterMark) {
369             notifyRead();
370         }
371     }
372
373     /**
374      * Notify the remote endpoint of the current no. of bytes read.
375      *
376      * @throws IOException if the notification fails
377      */

378     private void notifyRead() throws IOException JavaDoc {
379         if (_log.isDebugEnabled()) {
380             _log.debug("notifyRead() [channelId=" + _channelId
381                        + ", read=" + _read + "]");
382         }
383         _multiplexer.send(FLOW_READ, _channelId, _read);
384         _read = 0;
385     }
386
387 }
388
Popular Tags