KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > exolab > jms > net > multiplexer > MultiplexOutputStream


1 /**
2  * Redistribution and use of this software and associated documentation
3  * ("Software"), with or without modification, are permitted provided
4  * that the following conditions are met:
5  *
6  * 1. Redistributions of source code must retain copyright
7  * statements and notices. Redistributions must also contain a
8  * copy of this document.
9  *
10  * 2. Redistributions in binary form must reproduce the
11  * above copyright notice, this list of conditions and the
12  * following disclaimer in the documentation and/or other
13  * materials provided with the distribution.
14  *
15  * 3. The name "Exolab" must not be used to endorse or promote
16  * products derived from this Software without prior written
17  * permission of Exoffice Technologies. For written permission,
18  * please contact info@exolab.org.
19  *
20  * 4. Products derived from this Software may not be called "Exolab"
21  * nor may "Exolab" appear in their names without prior written
22  * permission of Exoffice Technologies. Exolab is a registered
23  * trademark of Exoffice Technologies.
24  *
25  * 5. Due credit should be given to the Exolab Project
26  * (http://www.exolab.org/).
27  *
28  * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
29  * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
30  * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
31  * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
32  * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
33  * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
34  * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
35  * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
37  * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
38  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
39  * OF THE POSSIBILITY OF SUCH DAMAGE.
40  *
41  * Copyright 2003-2005 (C) Exoffice Technologies Inc. All Rights Reserved.
42  *
43  * $Id: MultiplexOutputStream.java,v 1.2 2005/04/02 13:23:12 tanderson Exp $
44  */

45 package org.exolab.jms.net.multiplexer;
46
47 import java.io.IOException JavaDoc;
48 import java.io.OutputStream JavaDoc;
49
50 import org.apache.commons.logging.Log;
51 import org.apache.commons.logging.LogFactory;
52
53
54 /**
55  * An <code>OutputStream</code> which multiplexes data over a shared physical
56  * connection, managed by a {@link Multiplexer}.
57  * <p/>
58  * <em>NOTE:</em> the <code>OutputStream</code> methods of this class are not
59  * thread safe
60  *
61  * @author <a HREF="mailto:tma@netspace.net.au">Tim Anderson</a>
62  * @version $Revision: 1.2 $ $Date: 2005/04/02 13:23:12 $
63  * @see Multiplexer
64  */

65 class MultiplexOutputStream extends OutputStream JavaDoc implements Constants {
66
67     /**
68      * The channel identifier, used to associate packets with a channel.
69      */

70     private final int _channelId;
71
72     /**
73      * The packet type.
74      */

75     private byte _type;
76
77     /**
78      * The multiplexer which handles this stream's output.
79      */

80     private Multiplexer _multiplexer;
81
82     /**
83      * The local data buffer.
84      */

85     private byte[] _data;
86
87     /**
88      * The current index into <code>_data</code>.
89      */

90     private int _index;
91
92     /**
93      * The no. of bytes that the remote endpoint can currently accept.
94      */

95     private int _remoteSpace;
96
97     /**
98      * The maximum no. of bytes that the remote endpoint can accept.
99      */

100     private final int _maxRemoteSpace;
101
102     /**
103      * Indicates if the underlying connection has been closed.
104      */

105     private boolean _disconnected;
106
107     /**
108      * Synchronization helper.
109      */

110     private final Object JavaDoc _lock = new Object JavaDoc();
111
112     /**
113      * The logger.
114      */

115     private static final Log _log =
116             LogFactory.getLog(MultiplexOutputStream.class);
117
118
119     /**
120      * Construct a new <code>MultiplexOutputStream</code>.
121      *
122      * @param channelId the channel identifier
123      * @param multiplexer the multiplexer which handles this stream's output
124      * @param size the size of the local data buffer
125      * @param remoteSize the size of the remote endpoint's data buffer
126      */

127     public MultiplexOutputStream(int channelId, Multiplexer multiplexer,
128                                  int size, int remoteSize) {
129         _channelId = channelId;
130         _multiplexer = multiplexer;
131         _data = new byte[size];
132         _maxRemoteSpace = remoteSize;
133         _remoteSpace = remoteSize;
134     }
135
136     /**
137      * Set the packet type.
138      *
139      * @param type the packet type
140      */

141     public void setType(byte type) {
142         _type = type;
143     }
144
145     /**
146      * This implementation flushes the stream, rather than closing it, as the
147      * stream is re-used.
148      *
149      * @throws IOException if an I/O error occurs
150      */

151     public void close() throws IOException JavaDoc {
152         flush();
153     }
154
155     /**
156      * Flushes this output stream and forces any buffered output bytes to be
157      * written out.
158      *
159      * @throws IOException if an I/O error occurs
160      */

161     public void flush() throws IOException JavaDoc {
162         int offset = 0;
163         int length = _index;
164         while (offset < _index) {
165             int available = waitForSpace();
166             int size = (length <= available) ? length : available;
167
168             send(_data, offset, size);
169             offset += size;
170             length -= size;
171         }
172         _index = 0;
173     }
174
175     /**
176      * Writes length bytes from the specified byte array starting at offset to
177      * this output stream.
178      *
179      * @param buffer the data to write
180      * @param offset the start offset in the data
181      * @param length the number of bytes to write
182      * @throws IOException if an I/O error occurs
183      */

184     public void write(byte[] buffer, int offset, int length)
185             throws IOException JavaDoc {
186
187         int space = _data.length - _index;
188         if (space >= length) {
189             // got enough space, so copy it to the buffer
190
System.arraycopy(buffer, offset, _data, _index, length);
191             _index += length;
192         } else {
193             flush();
194             int size = length;
195             // send the buffer, when the endpoint has enough free space
196
while (size > 0) {
197                 int available = waitForSpace();
198                 int count = (size <= available) ? size : available;
199                 send(buffer, offset, count);
200                 offset += count;
201                 size -= count;
202             }
203         }
204     }
205
206     /**
207      * Writes the specified byte to this output stream.
208      *
209      * @param value the byte value
210      * @throws IOException if an I/O error occurs
211      */

212     public void write(int value) throws IOException JavaDoc {
213         if (_index >= _data.length) {
214             flush();
215         }
216         _data[_index++] = (byte) value;
217     }
218
219     /**
220      * Notify this of the no. of bytes read by the remote endpoint.
221      *
222      * @param read the number of bytes read
223      * @throws IOException if the no. of bytes exceeds that expected
224      */

225     public void notifyRead(int read) throws IOException JavaDoc {
226         synchronized (_lock) {
227             int space = _remoteSpace + read;
228             if (space > _maxRemoteSpace) {
229                 throw new IOException JavaDoc("Remote space=" + space
230                                       + " exceeds expected space="
231                                       + _maxRemoteSpace);
232             }
233             _remoteSpace = space;
234
235             if (_log.isDebugEnabled()) {
236                 _log.debug("notifyRead(read=" + read
237                            + ") [channelId=" + _channelId
238                            + ", remoteSpace=" + _remoteSpace
239                            + "]");
240             }
241             _lock.notifyAll();
242         }
243     }
244
245     /**
246      * Invoked when the underlying physical connection is closed.
247      */

248     public void disconnected() {
249         synchronized (_lock) {
250             _disconnected = true;
251             _lock.notifyAll();
252         }
253     }
254
255     /**
256      * Returns a string representation of this.
257      *
258      * @return a string representation of this
259      */

260     public String JavaDoc toString() {
261         return "MultiplexOutputStream[index=" + _index + "]";
262     }
263
264     /**
265      * Sends length bytes from the specified byte array starting at offset to
266      * the endpoint.
267      *
268      * @param buffer the data to write
269      * @param offset the start offset in the data
270      * @param length the number of bytes to write
271      * @throws IOException if an I/O error occurs
272      */

273     private void send(byte[] buffer, int offset, int length)
274             throws IOException JavaDoc {
275         if (_log.isDebugEnabled()) {
276             _log.debug("send(length=" + length + ") [channelId=" + _channelId
277                        + ", remoteSpace=" + _remoteSpace
278                        + "]");
279         }
280         synchronized (_lock) {
281             _multiplexer.send(_type, _channelId, buffer, offset, length);
282             _type = DATA;
283
284             _remoteSpace -= length;
285
286 /*
287             if (_log.isDebugEnabled()) {
288                 StringBuffer buf = new StringBuffer();
289                 for (int i = 0; i < length; ++i) {
290                     if (i > 0) {
291                         buf.append(", ");
292                     }
293                     final int mask = 0xff;
294                     int value = buffer[offset + i] & mask;
295                     buf.append(Integer.toHexString(value));
296                 }
297                 _log.debug("send[channelId=" + _channelId + "], length="
298                            + length + ", data=" + buf);
299             }
300 */

301         }
302     }
303
304     /**
305      * Returns immediately if the endpoint can receive data, otherwise blocks,
306      * waiting for the endpoint to have space available.
307      *
308      * @return the number of bytes that the endpoint can accept
309      * @throws IOException if the connection is closed while blocking
310      */

311     private int waitForSpace() throws IOException JavaDoc {
312         int available = 0;
313         while (!_disconnected) {
314             synchronized (_lock) {
315                 if (_log.isDebugEnabled()) {
316                     _log.debug("waitForSpace() [channelId=" + _channelId
317                                + ", remoteSpace=" + _remoteSpace
318                                + "]");
319                 }
320
321                 if (_remoteSpace > 0) {
322                     available = _remoteSpace;
323                     break;
324                 } else {
325                     try {
326                         _lock.wait();
327                     } catch (InterruptedException JavaDoc ignore) {
328                     }
329                 }
330             }
331         }
332         if (_disconnected) {
333             throw new IOException JavaDoc("Connection has been closed");
334         }
335
336         return available;
337     }
338
339 }
340
Popular Tags