KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > exolab > jms > net > multiplexer > Multiplexer


1 /**
2  * Redistribution and use of this software and associated documentation
3  * ("Software"), with or without modification, are permitted provided
4  * that the following conditions are met:
5  *
6  * 1. Redistributions of source code must retain copyright
7  * statements and notices. Redistributions must also contain a
8  * copy of this document.
9  *
10  * 2. Redistributions in binary form must reproduce the
11  * above copyright notice, this list of conditions and the
12  * following disclaimer in the documentation and/or other
13  * materials provided with the distribution.
14  *
15  * 3. The name "Exolab" must not be used to endorse or promote
16  * products derived from this Software without prior written
17  * permission of Exoffice Technologies. For written permission,
18  * please contact info@exolab.org.
19  *
20  * 4. Products derived from this Software may not be called "Exolab"
21  * nor may "Exolab" appear in their names without prior written
22  * permission of Exoffice Technologies. Exolab is a registered
23  * trademark of Exoffice Technologies.
24  *
25  * 5. Due credit should be given to the Exolab Project
26  * (http://www.exolab.org/).
27  *
28  * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
29  * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
30  * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
31  * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
32  * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
33  * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
34  * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
35  * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
37  * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
38  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
39  * OF THE POSSIBILITY OF SUCH DAMAGE.
40  *
41  * Copyright 2003-2005 (C) Exoffice Technologies Inc. All Rights Reserved.
42  *
43  * $Id: Multiplexer.java,v 1.6 2005/06/04 14:36:14 tanderson Exp $
44  */

45 package org.exolab.jms.net.multiplexer;
46
47 import java.io.DataInputStream JavaDoc;
48 import java.io.DataOutputStream JavaDoc;
49 import java.io.IOException JavaDoc;
50 import java.io.InterruptedIOException JavaDoc;
51 import java.net.ProtocolException JavaDoc;
52 import java.security.Principal JavaDoc;
53 import java.util.HashMap JavaDoc;
54 import java.util.LinkedList JavaDoc;
55
56 import org.apache.commons.logging.Log;
57 import org.apache.commons.logging.LogFactory;
58 import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
59
60 import org.exolab.jms.common.security.BasicPrincipal;
61 import org.exolab.jms.net.connector.Authenticator;
62 import org.exolab.jms.net.connector.ResourceException;
63 import org.exolab.jms.net.connector.SecurityException;
64
65
66 /**
67  * This class multiplexes data over a physical connection.
68  *
69  * @author <a HREF="mailto:tma@netspace.net.au">Tim Anderson</a>
70  * @version $Revision: 1.6 $ $Date: 2005/06/04 14:36:14 $
71  */

72 public class Multiplexer implements Constants, Runnable JavaDoc {
73
74     /**
75      * The listener to notify.
76      */

77     private MultiplexerListener _listener;
78
79     /**
80      * If <code>true</code>, indicates that the multiplexer has been closed.
81      */

82     private volatile boolean _closed;
83
84     /**
85      * The endpoint.
86      */

87     private Endpoint _endpoint;
88
89     /**
90      * The endpoint's output stream.
91      */

92     private DataOutputStream JavaDoc _out;
93
94     /**
95      * The endpoint's input stream.
96      */

97     private DataInputStream JavaDoc _in;
98
99     /**
100      * The set of channels managed by this, keyed on channel identifier.
101      */

102     private HashMap JavaDoc _channels = new HashMap JavaDoc();
103
104     /**
105      * The set of free channels, keyed on channel identifier.
106      */

107     private LinkedList JavaDoc _free = new LinkedList JavaDoc();
108
109     /**
110      * If <code>true</code>, indicates that the physical connection was opened
111      * (client), rather than accepted (server). This is used in channel
112      * identifier generation
113      */

114     private boolean _client = false;
115
116     /**
117      * The channel identifier seed.
118      */

119     private int _seed = 0;
120
121     /**
122      * The thread pool for scheduling invocation requests.
123      */

124     private PooledExecutor _pool;
125
126     /**
127      * The principal that owns the connection, or <code>null</code>,
128      * if this is an unauthenticated connection.
129      */

130     private Principal JavaDoc _principal;
131
132     /**
133      * The sending and receiving buffer size, in bytes.
134      */

135     private static final int BUFFER_SIZE = 2048;
136
137     /**
138      * The logger.
139      */

140     private static final Log _log = LogFactory.getLog(Multiplexer.class);
141
142     /**
143      * Construct a new client-side <code>Multiplexer</code>.
144      *
145      * @param listener the multiplexer listener
146      * @param endpoint the endpoint to multiplex messages over
147      * @param principal the security principal
148      * @param pool thread pool for handling invocation requests
149      * @throws IOException if an I/O error occurs
150      * @throws SecurityException if connection is refused by the server
151      */

152     public Multiplexer(MultiplexerListener listener, Endpoint endpoint,
153                        Principal JavaDoc principal, PooledExecutor pool)
154             throws IOException JavaDoc, SecurityException JavaDoc {
155         initialise(listener, endpoint, pool, true);
156         authenticate(principal);
157     }
158
159     /**
160      * Construct a new server-side <code>Multiplexer</code>.
161      *
162      * @param listener the multiplexer listener
163      * @param endpoint the endpoint to multiplex messages over
164      * @param authenticator the connection authenticator
165      * @param pool thread pool for handling invocation requests
166      * @throws IOException if an I/O error occurs
167      * @throws ResourceException if the authenticator cannot authenticate
168      */

169     public Multiplexer(MultiplexerListener listener, Endpoint endpoint,
170                        Authenticator authenticator,
171                        PooledExecutor pool)
172             throws IOException JavaDoc, ResourceException {
173         initialise(listener, endpoint, pool, false);
174         authenticate(authenticator);
175     }
176
177     /**
178      * Construct a new <code>Multiplexer</code>.
179      * <p/>
180      * This constructor is provided for subclasses that must perform setup
181      * work prior to invoking {@link #initialise}
182      */

183     protected Multiplexer() {
184     }
185
186     /**
187      * Start multiplexing.
188      */

189     public void run() {
190         while (!_closed) {
191             multiplex();
192         }
193     }
194
195     /**
196      * Returns a free channel from the pool, opening a new one if none are
197      * available.
198      *
199      * @return a free channel
200      * @throws IOException if an I/O error occurs
201      */

202     public Channel getChannel() throws IOException JavaDoc {
203         Channel channel = null;
204
205         synchronized (_free) {
206             if (!_free.isEmpty()) {
207                 channel = (Channel) _free.removeFirst();
208             }
209         }
210
211         if (channel == null) {
212             channel = open();
213         }
214
215         return channel;
216     }
217
218     /**
219      * Releases a channel back to the pool.
220      *
221      * @param channel the channel to release
222      */

223     public void release(Channel channel) {
224         synchronized (_free) {
225             _free.add(channel);
226         }
227     }
228
229     /**
230      * Close a channel.
231      *
232      * @param channel the channel to close
233      * @throws IOException if an I/O error occurs
234      */

235     public void close(Channel channel) throws IOException JavaDoc {
236         int channelId = channel.getId();
237         synchronized (_channels) {
238             _channels.remove(new Integer JavaDoc(channelId));
239         }
240
241         send(CLOSE, channelId);
242     }
243
244     /**
245      * Send a message.
246      *
247      * @param type the packet type
248      * @throws IOException if an I/O error occurs
249      */

250     public void send(byte type) throws IOException JavaDoc {
251         synchronized (_out) {
252             _out.writeByte(type);
253             _out.flush();
254             if (_log.isDebugEnabled()) {
255                 _log.debug("send(type=0x" + Integer.toHexString(type) + ")");
256             }
257         }
258     }
259
260     /**
261      * Send a message.
262      *
263      * @param type the packet type
264      * @param channelId the identifier of the channel sending the message
265      * @throws IOException if an I/O error occurs
266      */

267     public void send(byte type, int channelId) throws IOException JavaDoc {
268         synchronized (_out) {
269             _out.writeByte(type);
270             _out.writeShort(channelId);
271             _out.flush();
272             if (_log.isDebugEnabled()) {
273                 _log.debug("send(type=0x" + Integer.toHexString(type)
274                            + ", channel=" + channelId + ")");
275             }
276         }
277     }
278
279     /**
280      * Send a message.
281      *
282      * @param type the packet type
283      * @param channelId the identifier of the channel sending the message
284      * @param data the data to send
285      * @throws IOException if an I/O error occurs
286      */

287     public void send(byte type, int channelId, int data) throws IOException JavaDoc {
288         synchronized (_out) {
289             _out.writeByte(type);
290             _out.writeShort(channelId);
291             _out.writeInt(data);
292             _out.flush();
293             if (_log.isDebugEnabled()) {
294                 _log.debug("send(type=" + type + ", channel=" + channelId
295                            + ", data=" + Integer.toHexString(data) + ")");
296             }
297         }
298     }
299
300     /**
301      * Send a message.
302      *
303      * @param type the packet type
304      * @param channelId the identifier of the channel sending the message
305      * @param data the data to send
306      * @param offset the offset into the data
307      * @param length the length of data
308      * @throws IOException if an I/O error occurs
309      */

310     public void send(byte type, int channelId, byte[] data, int offset,
311                      int length) throws IOException JavaDoc {
312         synchronized (_out) {
313             _out.writeByte(type);
314             _out.writeShort(channelId);
315             _out.writeInt(length);
316             _out.write(data, offset, length);
317             _out.flush();
318         }
319     }
320
321     /**
322      * Close the multiplexer, releasing any resources. This closes the socket
323      * and waits for the thread to terminate.
324      */

325     public void close() {
326         if (!_closed) {
327             _closed = true;
328             try {
329                 send(SHUTDOWN);
330             } catch (IOException JavaDoc exception) {
331                 _log.debug(exception);
332             }
333             try {
334                 _endpoint.close();
335             } catch (IOException JavaDoc exception) {
336             _log.debug(exception);
337             }
338             // _pool.shutdownAfterProcessingCurrentlyQueuedTasks();
339
// @todo - as the pool is shared, need to block for
340
// tasks queued by this
341
}
342     }
343
344     /**
345      * Determines if the multiplexer is closed.
346      *
347      * @return <code>true</code> if the multiplexer is closed
348      */

349     public boolean isClosed() {
350         return _closed;
351     }
352
353     /**
354      * Determines if this is a client-side instance.
355      *
356      * @return <code>true</code> if this is a client-side instance,
357      * <code>false</code> if it is a server=side instance
358      */

359     public boolean isClient() {
360         return _client;
361     }
362
363     /**
364      * Returns the principal that owns the connection.
365      *
366      * @return the principal that owns the connection, or <code>null<code>
367      * if this is an unauthenticated connection
368      */

369     public Principal JavaDoc getPrincipal() {
370         return _principal;
371     }
372
373     /**
374      * Initialise the multiplexer.
375      *
376      * @param listener the multiplexer listener
377      * @param endpoint the endpoint to multiplex messages over
378      * @param pool thread pool for handling ivocation requests
379      * @param client determines if this is a client-side or server-side
380      * instance
381      * @throws IOException if an I/O error occurs
382      */

383     protected void initialise(MultiplexerListener listener, Endpoint endpoint,
384                               PooledExecutor pool, boolean client)
385             throws IOException JavaDoc {
386
387
388         if (listener == null) {
389             throw new IllegalArgumentException JavaDoc("Argument 'listener' is null");
390         }
391         if (endpoint == null) {
392             throw new IllegalArgumentException JavaDoc("Argument 'endpoint' is null");
393         }
394         if (pool == null) {
395             throw new IllegalArgumentException JavaDoc("Argument 'pool' is null");
396         }
397         if (_log.isDebugEnabled()) {
398             _log.debug("Multiplexer(uri=" + endpoint.getURI()
399                        + ", client=" + client);
400         }
401         _listener = listener;
402         _endpoint = endpoint;
403         _pool = pool;
404         _out = new DataOutputStream JavaDoc(endpoint.getOutputStream());
405         _in = new DataInputStream JavaDoc(endpoint.getInputStream());
406         _client = client;
407         handshake(_out, _in);
408     }
409
410     /**
411      * Perform handshaking on initial connection, to verify protocol. Subclasses
412      * may extend this behaviour.
413      *
414      * @param out the endpoint's output stream
415      * @param in the endpoint's input stream
416      * @throws IOException for any I/O error
417      */

418     protected void handshake(DataOutputStream JavaDoc out, DataInputStream JavaDoc in)
419             throws IOException JavaDoc {
420         out.writeInt(MAGIC);
421         out.writeInt(VERSION);
422         out.flush();
423
424         int magic = in.readInt();
425         if (magic != MAGIC) {
426             throw new ProtocolException JavaDoc("Expected protocol magic=" + MAGIC
427                                         + ", but received=" + magic);
428         }
429         int version = in.readInt();
430         if (version != VERSION) {
431             throw new ProtocolException JavaDoc("Expected protocol version=" + VERSION
432                                         + ", but received=" + version);
433         }
434     }
435
436     /**
437      * Perform authentication on initial connection.
438      *
439      * @param principal the security principal. May be <code>null</code>
440      * @throws IOException for any I/O error
441      * @throws SecurityException if connection is refused by the server
442      */

443     protected void authenticate(Principal JavaDoc principal)
444             throws IOException JavaDoc, SecurityException JavaDoc {
445         try {
446             if (principal != null && !(principal instanceof BasicPrincipal)) {
447                 throw new IOException JavaDoc(
448                         "Cannot authenticate with principal of type "
449                         + principal.getClass().getName());
450             }
451             if (principal != null) {
452                 BasicPrincipal basic = (BasicPrincipal) principal;
453                 _out.writeByte(AUTH_BASIC);
454                 _out.writeUTF(basic.getName());
455                 _out.writeUTF(basic.getPassword());
456             } else {
457                 _out.writeByte(AUTH_NONE);
458             }
459             _out.flush();
460             if (_in.readByte() != AUTH_OK) {
461                 throw new SecurityException JavaDoc("Connection refused");
462             }
463         } catch (IOException JavaDoc exception) {
464             // terminate the connection
465
_endpoint.close();
466             throw exception;
467         }
468         _principal = principal;
469     }
470
471     /**
472      * Performs authentication on initial connection.
473      *
474      * @param authenticator the authenticator
475      * @throws IOException for any I/O error
476      * @throws ResourceException if the authenticator cannot authenticate
477      */

478     protected void authenticate(Authenticator authenticator)
479             throws IOException JavaDoc, ResourceException {
480
481         try {
482             Principal JavaDoc principal = null;
483             byte type = _in.readByte();
484
485             switch (type) {
486                 case AUTH_BASIC:
487                     String JavaDoc name = _in.readUTF();
488                     String JavaDoc password = _in.readUTF();
489                     principal = new BasicPrincipal(name, password);
490                     break;
491                 case AUTH_NONE:
492                     break;
493                 default:
494                     throw new IOException JavaDoc("Invalid packet type: " + type);
495             }
496             if (authenticator.authenticate(principal)) {
497                 _out.writeByte(AUTH_OK);
498                 _out.flush();
499             } else {
500                 _out.writeByte(AUTH_DENIED);
501                 _out.flush();
502                 throw new SecurityException JavaDoc("User " + principal
503                                              + " unauthorised");
504             }
505             _principal = principal;
506         } catch (IOException JavaDoc exception) {
507             // terminate the connection
508
_endpoint.close();
509             throw exception;
510         } catch (ResourceException exception) {
511             // terminate the connection
512
_endpoint.close();
513             throw exception;
514         }
515     }
516
517     /**
518      * Opens a new channel.
519      *
520      * @return a new channel
521      * @throws IOException if a channel can't be opened
522      */

523     protected Channel open() throws IOException JavaDoc {
524         Channel channel;
525         int channelId;
526         synchronized (_channels) {
527             channelId = getNextChannelId();
528             channel = addChannel(channelId);
529         }
530
531         send(OPEN, channelId);
532         return channel;
533     }
534
535     /**
536      * Read a packet from the endpoint.
537      */

538     private void multiplex() {
539         try {
540             byte type = _in.readByte();
541             switch (type) {
542                 case OPEN:
543                     handleOpen();
544                     break;
545                 case CLOSE:
546                     handleClose();
547                     break;
548                 case REQUEST:
549                     handleRequest();
550                     break;
551                 case RESPONSE:
552                     handleResponse();
553                     break;
554                 case DATA:
555                     handleData();
556                     break;
557                 case PING_REQUEST:
558                     handlePingRequest();
559                     break;
560                 case PING_RESPONSE:
561                     handlePingResponse();
562                     break;
563                 case FLOW_READ:
564                     handleFlowRead();
565                     break;
566                 case SHUTDOWN:
567                     handleShutdown();
568                     break;
569                 default:
570                     throw new IOException JavaDoc("Unrecognised message type: "
571                                           + type);
572             }
573         } catch (Exception JavaDoc exception) {
574             boolean closed = _closed;
575             shutdown();
576             if (!closed) {
577                 _log.debug("Multiplexer shutting down on error", exception);
578                 // error notify the listener
579
_listener.error(exception);
580             }
581         }
582     }
583
584     /**
585      * Shuts down the multiplexer.
586      */

587     private void shutdown() {
588         // mark this as closed
589
_closed = true;
590
591         // notify the channels
592
Channel[] channels;
593         synchronized (_channels) {
594             channels = (Channel[]) _channels.values().toArray(new Channel[0]);
595         }
596         for (int i = 0; i < channels.length; ++i) {
597             channels[i].disconnected();
598         }
599     }
600
601     /**
602      * Open a new channel.
603      *
604      * @throws IOException for any error
605      */

606     private void handleOpen() throws IOException JavaDoc {
607         int channelId = _in.readUnsignedShort();
608         Integer JavaDoc key = new Integer JavaDoc(channelId);
609
610         synchronized (_channels) {
611             if (_channels.get(key) != null) {
612                 throw new IOException JavaDoc(
613                         "A channel already exists with identifier: " + key);
614             }
615             addChannel(channelId);
616         }
617     }
618
619     /**
620      * Close a channel.
621      *
622      * @throws IOException for any error
623      */

624     private void handleClose() throws IOException JavaDoc {
625         int channelId = _in.readUnsignedShort();
626         Integer JavaDoc key = new Integer JavaDoc(channelId);
627
628         synchronized (_channels) {
629             Channel channel = (Channel) _channels.remove(key);
630             if (channel == null) {
631                 throw new IOException JavaDoc(
632                         "No channel exists with identifier: " + key);
633             }
634             channel.close();
635         }
636     }
637
638     /**
639      * Handle a <code>REQUEST</code> packet.
640      *
641      * @throws IOException if an I/O error occurs, or no channel exists matching
642      * that read from the packet
643      */

644     private void handleRequest() throws IOException JavaDoc {
645         final Channel channel = handleData();
646         Runnable JavaDoc request = new Runnable JavaDoc() {
647             public void run() {
648                 if (_log.isDebugEnabled()) {
649                     _log.debug("handleRequest() [channel="
650                                + channel.getId() + "]");
651                 }
652                 // todo - need to handle closed()
653
_listener.request(channel);
654
655                 if (_log.isDebugEnabled()) {
656                     _log.debug("handleRequest() [channel="
657                                + channel.getId() + "] - end");
658                 }
659             }
660         };
661         try {
662             _pool.execute(request);
663         } catch (InterruptedException JavaDoc exception) {
664             throw new InterruptedIOException JavaDoc(exception.getMessage());
665         }
666     }
667
668     /**
669      * Handle a <code>RESPONSE</code> packet.
670      *
671      * @throws IOException if an I/O error occurs, or no channel exists matching
672      * that read from the packet
673      */

674     private void handleResponse() throws IOException JavaDoc {
675         handleData();
676     }
677
678     /**
679      * Handle a <code>PING_REQUEST</code> packet.
680      *
681      * @throws IOException if an I/O error occurs
682      */

683     private void handlePingRequest() throws IOException JavaDoc {
684         Channel channel = readChannel();
685         channel.handlePingRequest();
686     }
687
688     /**
689      * Handle a <code>PING_RESPONSE</code> packet.
690      *
691      * @throws IOException if an I/O error occurs
692      */

693     private void handlePingResponse() throws IOException JavaDoc {
694         Channel channel = readChannel();
695         channel.handlePingResponse();
696     }
697
698     /**
699      * Handle a <code>DATA</code> packet.
700      *
701      * @return the channel to handle the packet
702      * @throws IOException if an I/O error occurs, or no channel exists matching
703      * that read from the packet
704      */

705     private Channel handleData() throws IOException JavaDoc {
706         Channel channel = readChannel();
707         int length = _in.readInt();
708         channel.getMultiplexInputStream().receive(_in, length);
709         return channel;
710     }
711
712     /**
713      * Handle a <code>FLOW_READ</code> packet.
714      *
715      * @throws IOException if an I/O error occurs
716      */

717     private void handleFlowRead() throws IOException JavaDoc {
718         Channel channel = readChannel();
719         int read = _in.readInt();
720         channel.getMultiplexOutputStream().notifyRead(read);
721     }
722
723     /**
724      * Handle a <code>SHUTDOWN</code> packet.
725      */

726     private void handleShutdown() {
727         shutdown();
728         _listener.closed();
729     }
730
731     /**
732      * Adds a new channel.
733      * <p/>
734      * NOTE: Must be invoked with <code>_channels</code> synchronized
735      *
736      * @param channelId the channel identifier
737      * @return the new channel
738      */

739     private Channel addChannel(int channelId) {
740         int size = BUFFER_SIZE;
741         MultiplexOutputStream out =
742                 new MultiplexOutputStream(channelId, this, size, size);
743         MultiplexInputStream in =
744                 new MultiplexInputStream(channelId, this, size);
745         Channel channel = new Channel(channelId, this, in, out);
746         _channels.put(new Integer JavaDoc(channelId), channel);
747         return channel;
748     }
749
750     /**
751      * Reads the channel identifier from the stream and returns the
752      * corresponding channel.
753      *
754      * @return the channel corresponding to the read channel identifier
755      * @throws IOException for any I/O error, or if there is no corresponding
756      * channel
757      */

758     private Channel readChannel() throws IOException JavaDoc {
759         int channelId = _in.readUnsignedShort();
760         return getChannel(channelId);
761     }
762
763     /**
764      * Returns a channel given its identifier.
765      *
766      * @param channelId the channel identifier
767      * @return the channel corresponding to <code>channelId</code>
768      * @throws IOException if there is no corresponding channel
769      */

770     private Channel getChannel(int channelId) throws IOException JavaDoc {
771         Channel channel;
772         Integer JavaDoc key = new Integer JavaDoc(channelId);
773         synchronized (_channels) {
774             channel = (Channel) _channels.get(key);
775             if (channel == null) {
776                 throw new IOException JavaDoc(
777                         "No channel exists with identifier: " + channelId);
778             }
779         }
780         return channel;
781     }
782
783     /**
784      * Returns the next available channel identifier. Channel identifiers
785      * generated on the client side are in the range 0x0..0x7FFF, on the server
786      * side, 0x8000-0xFFFF
787      * <p/>
788      * NOTE: Must be invoked with <code>_channels</code> synchronized
789      *
790      * @return the next channel identifier
791      * @throws IOException if the connection is closed
792      */

793     private int getNextChannelId() throws IOException JavaDoc {
794         final int mask = 0x7fff;
795         final int serverIdBase = 0x8000;
796         int channelId = 0;
797         while (!_closed) {
798             _seed = (_seed + 1) & mask;
799             channelId = (_client) ? _seed : _seed + serverIdBase;
800             if (!_channels.containsKey(new Integer JavaDoc(channelId))) {
801                 break;
802             }
803         }
804         if (_closed) {
805             throw new IOException JavaDoc("Connection has been closed");
806         }
807         return channelId;
808     }
809
810 }
811
Popular Tags