KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > xsocket > stream > io > impl > IoSocketHandler


1 // $Id: IoSocketHandler.java 1316 2007-06-10 08:51:18Z grro $
2
/*
3  * Copyright (c) xsocket.org, 2006-2007. All rights reserved.
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2.1 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General Public
16  * License along with this library; if not, write to the Free Software
17  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18  *
19  * Please refer to the LGPL license at: http://www.gnu.org/copyleft/lesser.txt
20  * The latest copy of this software may be found on http://www.xsocket.org/
21  */

22 package org.xsocket.stream.io.impl;
23
24 import java.io.IOException JavaDoc;
25 import java.net.InetAddress JavaDoc;
26 import java.nio.ByteBuffer JavaDoc;
27 import java.nio.channels.SelectionKey JavaDoc;
28 import java.nio.channels.SocketChannel JavaDoc;
29 import java.util.Collections JavaDoc;
30 import java.util.HashMap JavaDoc;
31 import java.util.LinkedList JavaDoc;
32 import java.util.Map JavaDoc;
33 import java.util.logging.Level JavaDoc;
34 import java.util.logging.Logger JavaDoc;
35
36 import org.xsocket.ByteBufferQueue;
37 import org.xsocket.ClosedConnectionException;
38 import org.xsocket.IDispatcher;
39 import org.xsocket.IHandle;
40 import org.xsocket.DataConverter;
41 import org.xsocket.stream.io.spi.IClientIoProvider;
42 import org.xsocket.stream.io.spi.IIoHandlerCallback;
43 import org.xsocket.stream.io.spi.IIoHandlerContext;
44
45
46
47
48 /**
49  * Socket based io handler
50  *
51  * @author grro@xsocket.org
52  */

53 final class IoSocketHandler extends ChainableIoHandler implements IHandle {
54
55     private static final Logger JavaDoc LOG = Logger.getLogger(IoSocketHandler.class.getName());
56
57
58     private static final int MIN_READ_BUFFER_SIZE = 64; // if smaller than this new buffer has to be allocated
59

60     private static final Map JavaDoc<String JavaDoc ,Class JavaDoc> SUPPORTED_OPTIONS = new HashMap JavaDoc<String JavaDoc, Class JavaDoc>();
61     
62     static {
63         SUPPORTED_OPTIONS.put(IClientIoProvider.SO_RCVBUF, Integer JavaDoc.class);
64         SUPPORTED_OPTIONS.put(IClientIoProvider.SO_SNDBUF, Integer JavaDoc.class);
65         SUPPORTED_OPTIONS.put(IClientIoProvider.SO_REUSEADDR, Boolean JavaDoc.class);
66         SUPPORTED_OPTIONS.put(IClientIoProvider.SO_KEEPALIVE, Boolean JavaDoc.class);
67         SUPPORTED_OPTIONS.put(IClientIoProvider.TCP_NODELAY, Boolean JavaDoc.class);
68         SUPPORTED_OPTIONS.put(IClientIoProvider.SO_LINGER, Integer JavaDoc.class);
69     }
70
71     
72     // flag
73
private boolean isLogicalOpen = true;
74     private boolean isDisconnect = false;
75
76     // socket
77
private SocketChannel JavaDoc channel = null;
78
79
80     // distacher
81
private IoSocketDispatcher dispatcher = null;
82
83
84     // memory management
85
private IMemoryManager memoryManager = null;
86
87
88     // read & write queue
89
private final ByteBufferQueue sendQueue = new ByteBufferQueue();
90     private final ByteBufferQueue receiveQueue = new ByteBufferQueue();
91
92
93     // id
94
private String JavaDoc id = null;
95
96
97     // timeouts
98
private long idleTimeout = Long.MAX_VALUE;
99     private long connectionTimeout = Long.MAX_VALUE;
100
101     
102     // suspend flag
103
private boolean suspendRead = false;
104     
105
106     // statistics
107
private long openTime = -1;
108     private long lastTimeReceived = System.currentTimeMillis();
109 // private long lastTimeSent = System.currentTimeMillis();
110
private long receivedBytes = 0;
111     private long sendBytes = 0;
112
113
114     /**
115      * constructor
116      *
117      * @param channel the underlying channel
118      * @param idLocalPrefix the id namespace prefix
119      * @param dispatcher the dispatcher
120      * @throws IOException If some other I/O error occurs
121      */

122     @SuppressWarnings JavaDoc("unchecked")
123     IoSocketHandler(SocketChannel JavaDoc channel, IoSocketDispatcher dispatcher, IIoHandlerContext ctx, String JavaDoc connectionId) throws IOException JavaDoc {
124         super(null);
125
126         assert (channel != null);
127         this.channel = channel;
128
129         openTime = System.currentTimeMillis();
130
131         channel.configureBlocking(false);
132
133         this.dispatcher = dispatcher;
134         this.id = connectionId;
135     }
136
137
138     public void init(IIoHandlerCallback callbackHandler) throws IOException JavaDoc {
139         setPreviousCallback(callbackHandler);
140         
141         blockUntilIsConnected();
142         dispatcher.register(this, SelectionKey.OP_READ);
143     }
144
145
146
147     void setMemoryManager(IMemoryManager memoryManager) {
148         this.memoryManager = memoryManager;
149     }
150
151     @Override JavaDoc
152     public String JavaDoc getId() {
153         return id;
154     }
155
156
157     /**
158      * {@inheritDoc}
159      */

160     @Override JavaDoc
161     public int getPendingWriteDataSize() {
162         return sendQueue.getSize() + super.getPendingWriteDataSize();
163     }
164
165
166     int getPendingReceiveDataSize() {
167         return receiveQueue.getSize() + super.getPendingReceiveDataSize();
168     }
169
170     
171     /**
172      * {@inheritDoc}
173      */

174     public void setOption(String JavaDoc name, Object JavaDoc value) throws IOException JavaDoc {
175     
176         if (name.equals(IClientIoProvider.SO_SNDBUF)) {
177             channel.socket().setSendBufferSize((Integer JavaDoc) value);
178             
179         } else if (name.equals(IClientIoProvider.SO_REUSEADDR)) {
180             channel.socket().setReuseAddress((Boolean JavaDoc) value);
181
182         } else if (name.equals(IClientIoProvider.SO_RCVBUF)) {
183             channel.socket().setReceiveBufferSize((Integer JavaDoc) value);
184
185         } else if (name.equals(IClientIoProvider.SO_KEEPALIVE)) {
186             channel.socket().setKeepAlive((Boolean JavaDoc) value);
187
188         } else if (name.equals(IClientIoProvider.SO_LINGER)) {
189             if (value instanceof Integer JavaDoc) {
190                 channel.socket().setSoLinger(true, (Integer JavaDoc) value);
191             } else if (value instanceof Boolean JavaDoc) {
192                 if (((Boolean JavaDoc) value).equals(Boolean.FALSE)) {
193                     channel.socket().setSoLinger(Boolean.FALSE, 0);
194                 }
195             }
196
197         } else if (name.equals(IClientIoProvider.TCP_NODELAY)) {
198             channel.socket().setTcpNoDelay((Boolean JavaDoc) value);
199
200             
201         } else {
202             LOG.warning("option " + name + " is not supproted for " + this.getClass().getName());
203         }
204     }
205     
206     
207     
208
209     /**
210      * {@inheritDoc}
211      */

212     public Object JavaDoc getOption(String JavaDoc name) throws IOException JavaDoc {
213
214         if (name.equals(IClientIoProvider.SO_SNDBUF)) {
215             return channel.socket().getSendBufferSize();
216             
217         } else if (name.equals(IClientIoProvider.SO_REUSEADDR)) {
218             return channel.socket().getReuseAddress();
219             
220         } else if (name.equals(IClientIoProvider.SO_RCVBUF)) {
221             return channel.socket().getReceiveBufferSize();
222
223         } else if (name.equals(IClientIoProvider.SO_KEEPALIVE)) {
224             return channel.socket().getKeepAlive();
225             
226         } else if (name.equals(IClientIoProvider.TCP_NODELAY)) {
227             return channel.socket().getTcpNoDelay();
228
229         } else if (name.equals(IClientIoProvider.SO_LINGER)) {
230             return channel.socket().getSoLinger();
231
232             
233         } else {
234             LOG.warning("option " + name + " is not supproted for " + this.getClass().getName());
235             return null;
236         }
237     }
238     
239     
240     /**
241      * {@inheritDoc}
242      */

243     public Map JavaDoc<String JavaDoc, Class JavaDoc> getOptions() {
244         return Collections.unmodifiableMap(SUPPORTED_OPTIONS);
245     }
246   
247
248     /**
249      * {@inheritDoc}
250      */

251     public void setIdleTimeoutSec(int timeout) {
252         long timeoutMillis = ((long) timeout) * 1000L;
253         this.idleTimeout = timeoutMillis;
254
255         dispatcher.updateTimeoutCheckPeriod((long) timeout * 250L);
256     }
257
258
259     
260
261     /**
262      * sets the connection timout
263      *
264      * @param timeout the connection timeout
265      */

266     public void setConnectionTimeoutSec(int timeout) {
267         long timeoutMillis = ((long) timeout) * 1000L;
268         this.connectionTimeout = timeoutMillis;
269
270         dispatcher.updateTimeoutCheckPeriod((long) timeout * 250L);
271     }
272
273
274     /**
275      * gets the connection timeout
276      *
277      * @return the connection timeout
278      */

279     public int getConnectionTimeoutSec() {
280         return (int) (connectionTimeout / 1000);
281     }
282
283
284     /**
285      * {@inheritDoc}
286      */

287     public int getIdleTimeoutSec() {
288         return (int) (idleTimeout / 1000);
289     }
290
291
292     
293     public int getReceiveQueueSize() {
294         return receiveQueue.getSize();
295     }
296
297
298     int getSendQueueSize() {
299         return sendQueue.getSize();
300     }
301
302     /**
303      * check the timeout
304      *
305      * @param current the current time
306      * @return true, if the connection has been timed out
307      */

308     boolean checkIdleTimeout(Long JavaDoc current) {
309         long maxTime = lastTimeReceived + idleTimeout;
310         if (maxTime < 0) {
311             maxTime = Long.MAX_VALUE;
312         }
313         boolean timeoutReached = maxTime < current;
314         
315         if (timeoutReached) {
316             getPreviousCallback().onIdleTimeout();
317         }
318         return timeoutReached;
319     }
320
321     
322
323     /**
324      * check if the underyling connection is timed out
325      *
326      * @param current the current time
327      * @return true, if the connection has been timed out
328      */

329     void checkConnection() {
330         if (!channel.isOpen()) {
331             getPreviousCallback().onConnectionAbnormalTerminated();
332         }
333     }
334
335
336
337     /**
338      * check if the underyling connection is timed out
339      *
340      * @param current the current time
341      * @return true, if the connection has been timed out
342      */

343     boolean checkConnectionTimeout(Long JavaDoc current) {
344         long maxTime = openTime + connectionTimeout;
345         if (maxTime < 0) {
346             maxTime = Long.MAX_VALUE;
347         }
348         boolean timeoutReached = maxTime < current;
349         if (timeoutReached) {
350             getPreviousCallback().onConnectionTimeout();
351         }
352         return timeoutReached;
353     }
354
355
356     /**
357      * return the size of the read queue
358      *
359      * @return the read queue size
360      */

361     int getIncomingQueueSize() {
362         return receiveQueue.getSize();
363     }
364     
365     
366     void onConnectEvent() throws IOException JavaDoc {
367         getPreviousCallback().onConnect();
368     }
369     
370     
371     void onReadableEvent() throws IOException JavaDoc {
372         assert (IoSocketDispatcher.isDispatcherThread());
373         
374         try {
375             // read data from socket
376
readSocketIntoReceiveQueue();
377             
378             // handle the data
379

380             if (getReceiveQueueSize() > 0) {
381                 getPreviousCallback().onDataRead();
382             }
383                     
384             // increase preallocated read memory if not sufficient
385
checkPreallocatedReadMemory();
386             
387             
388         } catch (ClosedConnectionException ce) {
389             close(false);
390             
391         } catch (Throwable JavaDoc t) {
392             close(false);
393             if (LOG.isLoggable(Level.FINE)) {
394                 LOG.fine("error occured by handling readable event. reason: " + t.toString());
395             }
396         }
397     }
398     
399     
400     
401     void onWriteableEvent() throws IOException JavaDoc {
402         assert (IoSocketDispatcher.isDispatcherThread());
403         
404         if (suspendRead) {
405             updateInterestedSetNonen();
406             
407         } else {
408             updateInterestedSetRead();
409         }
410             
411         // updates interestSet with read & write data to socket
412
try {
413             writeSendQueueDataToSocket();
414                 
415             // notify the io handler that data has been written
416
getPreviousCallback().onWritten();
417         } catch(IOException JavaDoc ioe) {
418             getPreviousCallback().onWriteException(ioe);
419         }
420                 
421             
422         // are there remaining data to send -> announce write demand
423
if (getSendQueueSize() > 0) {
424             getDispatcher().updateInterestSet(this, SelectionKey.OP_WRITE);
425                 
426         // all data send -> check for close
427
} else {
428             if (shouldClosedPhysically()) {
429                 realClose();
430             }
431         }
432     }
433     
434
435     private void blockUntilIsConnected() throws IOException JavaDoc {
436         // check/wait until channel is connected
437
while (!getChannel().finishConnect()) {
438             try {
439                 Thread.sleep(25);
440             } catch (InterruptedException JavaDoc ignore) { }
441         }
442     }
443
444
445     private boolean shouldClosedPhysically() {
446         // close handling (-> close() leads automatically to write)
447
if (!isLogicalOpen) {
448
449             // send queue is emtpy -> close can be completed
450
if (sendQueue.isEmpty()) {
451                 return true;
452             }
453         }
454
455         return false;
456     }
457
458
459     /**
460      * {@inheritDoc}
461      */

462     @SuppressWarnings JavaDoc("unchecked")
463     public void writeOutgoing(ByteBuffer JavaDoc buffer) throws IOException JavaDoc {
464         if (buffer != null) {
465             sendQueue.append(buffer);
466             updateInterestedSetWrite();
467         }
468     }
469
470
471     /**
472      * {@inheritDoc}
473      */

474     @SuppressWarnings JavaDoc("unchecked")
475     public void writeOutgoing(LinkedList JavaDoc<ByteBuffer JavaDoc> buffers) throws IOException JavaDoc {
476         if (buffers != null) {
477             sendQueue.append(buffers);
478             updateInterestedSetWrite();
479         }
480     }
481
482
483     /**
484      * {@inheritDoc}
485      */

486     public LinkedList JavaDoc<ByteBuffer JavaDoc> drainIncoming() {
487         return receiveQueue.drain();
488     }
489
490
491
492     /**
493      * {@inheritDoc}
494      */

495     @SuppressWarnings JavaDoc("unchecked")
496     public void close(boolean immediate) throws IOException JavaDoc {
497         if (immediate || sendQueue.isEmpty()) {
498             realClose();
499
500         } else {
501             if (LOG.isLoggable(Level.FINE)) {
502                 LOG.fine("postpone close until remaning data to write (" + sendQueue.getSize() + ") has been written");
503             }
504             
505             isLogicalOpen = false;
506             updateInterestedSetWrite();
507         }
508     }
509
510     
511     private void realClose() {
512         try {
513             getDispatcher().deregister(this);
514         } catch (Exception JavaDoc e) {
515             if (LOG.isLoggable(Level.FINE)) {
516                 LOG.fine("error occured by deregistering connection " + id + " on dispatcher. reason: " + e.toString());
517             }
518         }
519         
520         try {
521             channel.close();
522             if (LOG.isLoggable(Level.FINE)) {
523                 LOG.fine("connection " + id + " has been closed");
524             }
525         } catch (Exception JavaDoc e) {
526             if (LOG.isLoggable(Level.FINE)) {
527                 LOG.fine("error occured by closing connection " + id + " reason: " + e.toString());
528             }
529         }
530
531             
532         if (!isDisconnect) {
533             isDisconnect = true;
534             getPreviousCallback().onDisconnect();
535         }
536     }
537
538
539
540     void onDispatcherClose() {
541         getPreviousCallback().onConnectionAbnormalTerminated();
542     }
543
544     private void updateInterestedSetWrite() throws ClosedConnectionException {
545         try {
546             dispatcher.updateInterestSet(this, SelectionKey.OP_WRITE);
547         } catch (IOException JavaDoc ioe) {
548             if (LOG.isLoggable(Level.FINE)) {
549                 LOG.fine("couldn't update interested set to write data on socket. Reason: " + ioe.toString());
550             }
551
552             try {
553                 dispatcher.deregister(this);
554             } catch (Exception JavaDoc ignore) { }
555
556             throw new ClosedConnectionException("connection " + id + " is already closed", ioe);
557         }
558     }
559     
560     private void updateInterestedSetRead() throws ClosedConnectionException {
561         try {
562             dispatcher.updateInterestSet(this, SelectionKey.OP_READ);
563         } catch (IOException JavaDoc ioe) {
564             if (LOG.isLoggable(Level.FINE)) {
565                 LOG.fine("couldn't update interested set to read data. Reason: " + ioe.toString());
566             }
567
568             try {
569                 dispatcher.deregister(this);
570             } catch (Exception JavaDoc ignore) { }
571
572             throw new ClosedConnectionException("connection " + id + " is already closed", ioe);
573         }
574     }
575     
576
577
578     
579     private void updateInterestedSetNonen() throws ClosedConnectionException {
580         try {
581             dispatcher.updateInterestSet(this, 0);
582         } catch (IOException JavaDoc ioe) {
583             if (LOG.isLoggable(Level.FINE)) {
584                 LOG.fine("couldn't update interested set to nonen. Reason: " + ioe.toString());
585             }
586
587             try {
588                 dispatcher.deregister(this);
589             } catch (Exception JavaDoc ignore) { }
590
591             throw new ClosedConnectionException("connection " + id + " is already closed", ioe);
592         }
593     }
594
595
596
597
598     /**
599      * {@inheritDoc}
600      */

601     public boolean isOpen() {
602         return channel.isOpen();
603     }
604
605
606
607     /**
608      * return the underlying channel
609      *
610      * @return the underlying channel
611      */

612     public SocketChannel JavaDoc getChannel() {
613         return channel;
614     }
615
616
617     IDispatcher<IoSocketHandler> getDispatcher() {
618         return dispatcher;
619     }
620
621     
622     @Override JavaDoc
623     public void suspendRead() throws IOException JavaDoc {
624         suspendRead = true;
625         updateInterestedSetNonen();
626     }
627     
628     
629     @Override JavaDoc
630     public void resumeRead() throws IOException JavaDoc {
631         suspendRead = false;
632         
633         // update to write (why not read?). Reason:
634
// * avoid race conditions in which current write need will be swallowed
635
// * write falls back to read interested set if no more data to write exist
636
updateInterestedSetWrite();
637     }
638
639     
640     /**
641      * reads socket into read queue
642      *
643      * @return the number of read bytes
644      * @throws IOException If some other I/O error occurs
645      * @throws ClosedConnectionException if the underlying channel is closed
646      */

647     private int readSocketIntoReceiveQueue() throws IOException JavaDoc {
648         assert (IoSocketDispatcher.isDispatcherThread());
649
650         int read = 0;
651         lastTimeReceived = System.currentTimeMillis();
652
653
654         if (isOpen()) {
655
656             assert (memoryManager instanceof UnsynchronizedMemoryManager);
657
658             ByteBuffer JavaDoc readBuffer = memoryManager.acquireMemory(MIN_READ_BUFFER_SIZE);
659             int pos = readBuffer.position();
660             int limit = readBuffer.limit();
661
662             // read from channel
663
try {
664                 read = channel.read( readBuffer);
665             // exception occured while reading
666
} catch (IOException JavaDoc ioe) {
667                 readBuffer.position(pos);
668                 readBuffer.limit(limit);
669                 memoryManager.recycleMemory(readBuffer, MIN_READ_BUFFER_SIZE);
670
671                 if (LOG.isLoggable(Level.FINE)) {
672                     LOG.fine("[" + id + "] error occured while reading channel: " + ioe.toString());
673                 }
674
675                 throw ioe;
676             }
677
678
679             // handle read
680
switch (read) {
681
682                 // end-of-stream has been reached -> throw an exception
683
case -1:
684                     memoryManager.recycleMemory(readBuffer, MIN_READ_BUFFER_SIZE);
685                     if (LOG.isLoggable(Level.FINE)) {
686                         LOG.fine("[" + id + "] channel has reached end-of-stream (maybe closed by peer)");
687                     }
688                     ClosedConnectionException cce = new ClosedConnectionException("[" + id + "] End of stream reached");
689                     throw cce;
690
691                 // no bytes read recycle read buffer and do nothing
692
case 0:
693                     memoryManager.recycleMemory(readBuffer, MIN_READ_BUFFER_SIZE);
694                     break;
695
696                 // bytes available (read < -1 is not handled)
697
default:
698                     int savePos = readBuffer.position();
699                     int saveLimit = readBuffer.limit();
700
701                     readBuffer.position(savePos - read);
702                     readBuffer.limit(savePos);
703
704                     ByteBuffer JavaDoc readData = readBuffer.slice();
705                     receiveQueue.append(readData);
706
707
708                     if (readBuffer.hasRemaining()) {
709                         readBuffer.position(savePos);
710                         readBuffer.limit(saveLimit);
711                         memoryManager.recycleMemory(readBuffer, MIN_READ_BUFFER_SIZE);
712                     }
713                     if (LOG.isLoggable(Level.FINE)) {
714                         LOG.fine("[" + id + "] received (" + (readData.limit() - readData.position()) + " bytes, total " + (receivedBytes + read) + " bytes): " + DataConverter.toTextOrHexString(new ByteBuffer JavaDoc[] {readData.duplicate() }, "UTF-8", 500));
715                     }
716                     break;
717             }
718         }
719
720
721         receivedBytes += read;
722
723         return read;
724     }
725
726
727     /**
728      * check if preallocated read buffer size is sufficient. if not increaese it
729      */

730     private void checkPreallocatedReadMemory() {
731         assert (IoSocketDispatcher.isDispatcherThread());
732
733         memoryManager.preallocate(MIN_READ_BUFFER_SIZE);
734     }
735
736     /**
737      * writes the content of the send queue to the socket
738      *
739      * @throws IOException If some other I/O error occurs
740      * @throws ClosedConnectionException if the underlying channel is closed
741      */

742     @SuppressWarnings JavaDoc("unchecked")
743     private void writeSendQueueDataToSocket() throws IOException JavaDoc {
744         assert (IoSocketDispatcher.isDispatcherThread());
745
746
747         ////////////////////////////////////////////////////////////
748
// Why hasn't channel.write(ByteBuffer[]) been used??
749
//
750
// sendBytes += channel.write(data.toArray(new ByteBuffer[data.size()])) doesn't
751
// work correct under WinXP_SP2 & Sun JDK 1.6.0_01-b06 (and other configurations?).
752
// The channel reports that x bytes have been written, but in some situations duplicated
753
// data appears on the line (caused by the channel impl?!)
754
// This behaviour doesn't appear under Suse9.1/Intel & Sun JDK 1.5.0_08
755
////////////////////////////////////////////////////////////
756

757
758         if (isOpen() && !sendQueue.isEmpty()) {
759             
760 // lastTimeSent = System.currentTimeMillis();
761

762             // write buffer after buffer
763
ByteBuffer JavaDoc buffer = null;
764             
765             do {
766                 buffer = sendQueue.removeFirst();
767
768                 if (buffer != null) {
769                     int writeSize = buffer.remaining();
770
771                     if (writeSize > 0) {
772                         if (LOG.isLoggable(Level.FINE)) {
773                             if (LOG.isLoggable(Level.FINE)) {
774                                 LOG.fine("[" + id + "] sending (" + writeSize + " bytes): " + DataConverter.toTextOrHexString(buffer.duplicate(), "UTF-8", 500));
775                             }
776                         }
777
778                         // write to socket (internal out buffer)
779
int written = channel.write(buffer);
780                         sendBytes += written;
781
782
783                         // not all data written?
784
if (written != writeSize) {
785                             if (LOG.isLoggable(Level.FINE)) {
786                                 LOG.fine("[" + id + "] " + written + " of " + (writeSize - written) + " bytes has been sent. initiate sending of the remaining (total sent " + sendBytes + " bytes)");
787                             }
788
789                             sendQueue.addFirst(buffer);
790                             updateInterestedSetWrite();
791                             break;
792                         }
793                     }
794                 }
795                 
796             } while (buffer != null);
797         }
798     }
799
800
801     /**
802      * {@inheritDoc}
803      */

804     @Override JavaDoc
805     public final InetAddress JavaDoc getLocalAddress() {
806         return channel.socket().getLocalAddress();
807     }
808
809
810     /**
811      * {@inheritDoc}
812      */

813     @Override JavaDoc
814     public final int getLocalPort() {
815         return channel.socket().getLocalPort();
816     }
817
818     
819     /**
820      * {@inheritDoc}
821      */

822     @Override JavaDoc
823     public final InetAddress JavaDoc getRemoteAddress() {
824         return channel.socket().getInetAddress();
825     }
826
827
828     /**
829      * {@inheritDoc}
830      */

831     @Override JavaDoc
832     public final int getRemotePort() {
833         return channel.socket().getPort();
834     }
835
836
837     /**
838      * {@inheritDoc}
839      */

840     public void flushOutgoing() {
841
842     }
843
844
845     /**
846      * {@inheritDoc}
847      */

848     @Override JavaDoc
849     public String JavaDoc toString() {
850         try {
851             return "(" + channel.socket().getInetAddress().toString() + ":" + channel.socket().getPort()
852                    + " -> " + channel.socket().getLocalAddress().toString() + ":" + channel.socket().getLocalPort() + ")"
853                    + " received=" + DataConverter.toFormatedBytesSize(receivedBytes)
854                    + ", sent=" + DataConverter.toFormatedBytesSize(sendBytes)
855                    + ", age=" + DataConverter.toFormatedDuration(System.currentTimeMillis() - openTime)
856                    + ", lastReceived=" + DataConverter.toFormatedDate(lastTimeReceived)
857                    + ", sendQueueSize=" + DataConverter.toFormatedBytesSize(sendQueue.getSize())
858                    + " [" + id + "]";
859         } catch (Exception JavaDoc e) {
860             return super.toString();
861         }
862     }
863 }
Popular Tags