KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > xsocket > stream > Connection


1 // $Id: Connection.java 1568 2007-07-26 06:47:32Z grro $
2
/*
3  * Copyright (c) xsocket.org, 2006 - 2007. All rights reserved.
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2.1 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General Public
16  * License along with this library; if not, write to the Free Software
17  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18  *
19  * Please refer to the LGPL license at: http://www.gnu.org/copyleft/lesser.txt
20  * The latest copy of this software may be found on http://www.xsocket.org/
21  */

22 package org.xsocket.stream;
23
24 import java.io.IOException JavaDoc;
25 import java.io.UnsupportedEncodingException JavaDoc;
26 import java.net.InetAddress JavaDoc;
27 import java.net.InetSocketAddress JavaDoc;
28 import java.net.SocketOptions JavaDoc;
29 import java.nio.BufferUnderflowException JavaDoc;
30 import java.nio.ByteBuffer JavaDoc;
31 import java.nio.channels.WritableByteChannel JavaDoc;
32 import java.util.HashMap JavaDoc;
33 import java.util.LinkedList JavaDoc;
34 import java.util.Map JavaDoc;
35 import java.util.logging.Level JavaDoc;
36 import java.util.logging.Logger JavaDoc;
37
38 import javax.net.ssl.SSLContext;
39
40
41 import org.xsocket.ByteBufferQueue;
42 import org.xsocket.ClosedConnectionException;
43 import org.xsocket.DataConverter;
44 import org.xsocket.MaxReadSizeExceededException;
45 import org.xsocket.stream.ByteBufferParser.Index;
46 import org.xsocket.stream.io.impl.IoProvider;
47 import org.xsocket.stream.io.spi.IClientIoProvider;
48 import org.xsocket.stream.io.spi.IHandlerIoProvider;
49 import org.xsocket.stream.io.spi.IIoHandler;
50 import org.xsocket.stream.io.spi.IIoHandlerCallback;
51 import org.xsocket.stream.io.spi.IIoHandlerContext;
52
53
54
55 /**
56  * Implementation base of the <code>IConnection</code> interface.
57  *
58  *
59  * @author grro@xsocket.org
60  */

61 abstract class Connection implements IConnection {
62
63     private static final Logger JavaDoc LOG = Logger.getLogger(Connection.class.getName());
64
65     static final FlushMode INITIAL_FLUSH_MODE = FlushMode.SYNC;
66
67     private static final long SEND_TIMEOUT = 60 * 1000;
68
69
70     // parser
71
private static final ByteBufferParser PARSER = new ByteBufferParser();
72
73
74     // io handler
75
static final IoProvider DEFAULT_CLIENT_IO_PROVIDER = new IoProvider(); // will be removed
76
private static IClientIoProvider clientIoProvider = null;
77     private IHandlerIoProvider ioProvider = null;
78
79     
80     // closed flag
81
private boolean isClosed = false;
82
83     // read & write queue
84
private final ByteBufferQueue writeQueue = new ByteBufferQueue();
85     private final ByteBufferQueue readQueue = new ByteBufferQueue();
86
87
88     // io handler
89
private IIoHandler ioHandler = null;
90     private IIoHandlerContext ioHandlerCtx = null;
91
92
93     // encoding
94
private String JavaDoc defaultEncoding = INITIAL_DEFAULT_ENCODING;
95
96     // autoflush
97
private boolean autoflush = INITIAL_AUTOFLUSH;
98
99     // flush mode
100
private FlushMode flushmode = INITIAL_FLUSH_MODE;
101
102     // index for extract method
103
private Index cachedIndex = null;
104
105     // attachment
106
private Object JavaDoc attachment = null;
107
108
109     // write thread handling
110
private Object JavaDoc writeGuard = new Object JavaDoc();
111     private IOException JavaDoc writeException = null;
112
113
114     // mark support
115
private LinkedList JavaDoc<ByteBuffer JavaDoc> readMarkBuffer = null;
116     private boolean isReadMarked = false;
117
118     private WriteMarkBuffer writeMarkBuffer = null;
119     private boolean isWriteMarked = false;
120
121
122     // timout
123
private boolean idleTimeoutOccured = false;
124     private boolean connectionTimeoutOccured = false;
125
126
127
128     static {
129         // IoHandlerManager
130
String JavaDoc clientIoManagerClassname = System.getProperty(IClientIoProvider.PROVIDER_CLASSNAME_KEY, IoProvider.class.getName());
131         try {
132             Class JavaDoc clientIoManagerClass = Class.forName(clientIoManagerClassname);
133             clientIoProvider = (IClientIoProvider) clientIoManagerClass.newInstance();
134         } catch (Exception JavaDoc e) {
135             LOG.warning("error occured by creating ClientIoManager " + clientIoManagerClassname + ": " + e.toString());
136             LOG.info("using default ClientIoManager " + DEFAULT_CLIENT_IO_PROVIDER.getClass().getName());
137             clientIoProvider = DEFAULT_CLIENT_IO_PROVIDER;
138         }
139     }
140
141
142     /**
143      * {@inheritDoc}
144      */

145     public final int getPendingWriteDataSize() {
146         return writeQueue.getSize() + ioHandler.getPendingWriteDataSize();
147     }
148
149
/**
 * client-side constructor. The io handler is created by the client io provider
 * which has been resolved by the static initializer.
 *
 * @param ioHandlerCtx   the io handler context
 * @param remoteAddress  the remote address
 * @param options        the options, passed to the io provider
 * @param sslContext     the ssl context, or <code>null</code> to create a plain io handler
 * @param sslOn          passed through to the ssl io handler factory method
 * @throws IOException if creating the io handler fails, or if a ssl context is given
 *                     but the configured client io provider is not an <code>IoProvider</code>
 */
Connection(IIoHandlerContext ioHandlerCtx, InetSocketAddress remoteAddress, Map<String, Object> options, SSLContext sslContext, boolean sslOn) throws IOException {
    this.ioHandlerCtx = ioHandlerCtx;

    if (sslContext == null) {
        ioHandler = clientIoProvider.createClientIoHandler(ioHandlerCtx, remoteAddress, options);

    } else if (clientIoProvider instanceof IoProvider) {
        // the ssl factory method is only available on the built-in provider
        ioHandler = ((IoProvider) clientIoProvider).createSSLClientIoHandler(ioHandlerCtx, remoteAddress, options, sslContext, sslOn);

    } else {
        // a custom provider has been configured: fail with a clear message instead of a ClassCastException
        throw new IOException("client io provider " + clientIoProvider.getClass().getName() + " does not support ssl");
    }

    ioProvider = clientIoProvider;
}
165
166
/**
 * server-side constructor. Stores the given collaborators, nothing else.
 *
 * @param ioHandlerCtx  the io handler context
 * @param ioHandler     the io handler of this connection
 * @param ioProvider    the io provider
 * @throws IOException declared for the callers; not thrown by the visible body
 */
Connection(IIoHandlerContext ioHandlerCtx, IIoHandler ioHandler, IHandlerIoProvider ioProvider) throws IOException {
    this.ioProvider = ioProvider;
    this.ioHandler = ioHandler;
    this.ioHandlerCtx = ioHandlerCtx;
}
176
177     
178     protected final IHandlerIoProvider getIoProvider() {
179         return ioProvider;
180     }
181
182
183     protected final IIoHandlerContext getIoHandlerContext() {
184         return ioHandlerCtx;
185     }
186
187     protected final void init() throws IOException JavaDoc {
188         ioHandler.init(new HandlerCallback());
189
190         if (LOG.isLoggable(Level.FINE)) {
191             LOG.fine("connection " + getId() + " created. IoHandler: " + ioHandler.toString());
192         }
193     }
194
195     protected final IIoHandlerContext getioHandlerCtx() {
196         return ioHandlerCtx;
197     }
198
199     protected final void setIoHandler(IIoHandler ioHandler) {
200         this.ioHandler = ioHandler;
201     }
202
203     /**
204      * reset the connection state (e.g. for connection reuse)
205      *
206      */

207     void reset() throws IOException JavaDoc {
208         writeQueue.drain();
209
210         writeGuard = new Object JavaDoc();
211         writeException = null;
212
213         resumeRead();
214         setIdleTimeoutSec(Integer.MAX_VALUE);
215         setConnectionTimeoutSec(Integer.MAX_VALUE);
216         setAutoflush(IBlockingConnection.INITIAL_AUTOFLUSH);
217         setFlushmode(INITIAL_FLUSH_MODE);
218         setDefaultEncoding(IBlockingConnection.INITIAL_DEFAULT_ENCODING);
219         removeReadMark();
220         removeWriteMark();
221         resetCachedIndex();
222         attachment = null;
223         
224         idleTimeoutOccured = false;
225         connectionTimeoutOccured = false;
226
227         ioHandler.drainIncoming();
228         readQueue.drain();
229     }
230
231
232     /**
233      * return the ioHandler
234      * @return the iohandler
235      */

236     protected final IIoHandler getIoHandler() {
237         return ioHandler;
238     }
239
240
241
242     /**
243      * @deprecated
244      */

245     final SocketOptions JavaDoc getSocketOptions() {
246         Map JavaDoc<String JavaDoc, Object JavaDoc> opt = new HashMap JavaDoc<String JavaDoc, Object JavaDoc>();
247         
248         Map JavaDoc<String JavaDoc, Class JavaDoc> setOpts = getOptions();
249         for (String JavaDoc optionName : setOpts.keySet()) {
250             try {
251                 opt.put(optionName, getOption(optionName));
252             } catch (IOException JavaDoc ignore) { };
253         }
254         
255         return StreamSocketConfiguration.fromOptions(opt);
256     }
257
258
259     /**
260      * return the read queue
261      *
262      * @return the read queue
263      */

264     final ByteBufferQueue getReadQueue() {
265         return readQueue;
266     }
267
268
269
270     /**
271      * {@inheritDoc}
272      */

273     public final void close() throws IOException JavaDoc {
274         if (isOpen() && !isClosed) {
275             isClosed = true;
276
277             if (LOG.isLoggable(Level.FINE)) {
278                 LOG.fine("closing connection -> flush all remaining data");
279             }
280
281             flushWriteQueue();
282             ioHandler.close(false);
283         }
284     }
285
286
287     /**
288      * {@inheritDoc}
289      */

290     public final boolean isOpen() {
291         if (isClosed) {
292             return false;
293         } else {
294             return ioHandler.isOpen();
295         }
296     }
297
298     /**
299      * write incoming data into the read buffer
300      *
301      * @param data the data to add
302      */

303     final void writeIncoming(ByteBuffer JavaDoc data) {
304         readQueue.append(data);
305     }
306
307
308     /**
309      * write outgoing data into the write buffer
310      *
311      * @param data the data to add
312      */

313     final void writeOutgoing(ByteBuffer JavaDoc data) {
314         writeQueue.append(data);
315     }
316
317     /**
318      * write outgoing datas into the write buffer
319      *
320      * @param datas the data to add
321      */

322     void writeOutgoing(LinkedList JavaDoc<ByteBuffer JavaDoc> datas) {
323         writeQueue.append(datas);
324     }
325
326
327     /**
328      * {@inheritDoc}
329      */

330     public void suspendRead() throws IOException JavaDoc {
331         ioHandler.suspendRead();
332     }
333     
334     
335     /**
336      * {@inheritDoc}
337      */

338     public void resumeRead() throws IOException JavaDoc {
339         ioHandler.resumeRead();
340     }
341     
342     
343     /**
344      * {@inheritDoc}
345      */

346     public void flush() throws ClosedConnectionException, IOException JavaDoc {
347         if (autoflush) {
348             LOG.warning("flush has been called for a connection which is in autoflush mode (since xSocket V1.1 autoflush is activated by default)");
349         }
350         internalFlush();
351     }
352
353     /**
354      * real flush. This method will only called by framework internal classes
355      *
356      * @throws ClosedConnectionException if the connection has been closed
357      * @throws IOException if some io excpetion occurs
358      */

359     void internalFlush() throws ClosedConnectionException, IOException JavaDoc {
360
361         removeWriteMark();
362         if (flushmode == FlushMode.SYNC) {
363             // flush write queue by using a write guard to wait until the onWrittenEvent has been occured
364
syncFlush();
365             
366         } else {
367             // just flush the queue and return
368
flushWriteQueue();
369         }
370     }
371
372
373     /**
374      * set the flushmode
375      * @param flushMode the flushmode
376      */

377     public final void setFlushmode(FlushMode flushMode) {
378         this.flushmode = flushMode;
379     }
380
381     /**
382      * get the flushmode
383      *
384      * @return the flushmode
385      */

386     public final FlushMode getFlushmode() {
387         return flushmode;
388     }
389
390
391     /**
392      * {@inheritDoc}
393      *
394      */

395     public void setIdleTimeoutSec(int timeoutInSec) {
396         getIoHandler().setIdleTimeoutSec(timeoutInSec);
397     }
398
399     
400     /**
401      * {@inheritDoc}
402      */

403     public void setConnectionTimeoutSec(int timeoutSec) {
404         getIoHandler().setConnectionTimeoutSec(timeoutSec);
405     }
406
407
408     /**
409      * {@inheritDoc}
410      */

411     public int getConnectionTimeoutSec() {
412         return getIoHandler().getConnectionTimeoutSec();
413     }
414
415     
416     
417     /**
418      * {@inheritDoc}
419      *
420      */

421     public int getIdleTimeoutSec() {
422         return getIoHandler().getIdleTimeoutSec();
423     }
424
425
426
427     /**
428      * flush, and wait until data has been written to channel
429      */

430     private void syncFlush() throws ClosedConnectionException, IOException JavaDoc {
431
432         long start = System.currentTimeMillis();
433         long remainingTime = SEND_TIMEOUT;
434
435         synchronized (writeGuard) {
436             flushWriteQueue();
437
438             do {
439                 // all data written?
440
if (ioHandler.getPendingWriteDataSize() == 0) {
441                     return;
442
443                 // write exception occured?
444
} else if(writeException != null) {
445                     IOException JavaDoc ioe = writeException;
446                     writeException = null;
447                     throw ioe;
448
449                 // ... no -> wait
450
} else {
451                     try {
452                         writeGuard.wait(remainingTime);
453                     } catch (InterruptedException JavaDoc ignore) { }
454                 }
455
456                 remainingTime = (start + SEND_TIMEOUT) - System.currentTimeMillis();
457             } while (remainingTime > 0);
458         }
459     }
460
461     private void flushWriteQueue() throws ClosedConnectionException, IOException JavaDoc {
462         if (!writeQueue.isEmpty()) {
463             LinkedList JavaDoc<ByteBuffer JavaDoc> buffers = writeQueue.drain();
464             ioHandler.writeOutgoing(buffers);
465         }
466     }
467
468
469
470
471
472     /**
473      * {@inheritDoc}
474      */

475     public final String JavaDoc getDefaultEncoding() {
476         return defaultEncoding;
477     }
478
479
480     /**
481      * {@inheritDoc}
482      */

483     public final void setDefaultEncoding(String JavaDoc defaultEncoding) {
484         this.defaultEncoding = defaultEncoding;
485     }
486
487
488     /**
489      * {@inheritDoc}
490      */

491     public final void setAutoflush(boolean autoflush) {
492         this.autoflush = autoflush;
493     }
494
495
496     /**
497      * {@inheritDoc}
498      */

499     public final boolean getAutoflush() {
500         return autoflush;
501     }
502
503
504     /**
505      * {@inheritDoc}
506      */

507     public final String JavaDoc getId() {
508         return ioHandler.getId();
509     }
510
511
512     /**
513      * {@inheritDoc}
514      */

515     public final InetAddress JavaDoc getLocalAddress() {
516         return ioHandler.getLocalAddress();
517     }
518
519
520
521     /**
522      * {@inheritDoc}
523      */

524     public final int getLocalPort() {
525         return ioHandler.getLocalPort();
526     }
527
528
529
530     /**
531      * {@inheritDoc}
532      */

533     public final InetAddress JavaDoc getRemoteAddress() {
534         return ioHandler.getRemoteAddress();
535     }
536
537
538
539     /**
540      * {@inheritDoc}
541      */

542     public final int getRemotePort() {
543         return ioHandler.getRemotePort();
544     }
545
546
547
548
549
550     /**
551      * {@inheritDoc}
552      */

553     public void activateSecuredMode() throws IOException JavaDoc {
554
555         boolean isPrestarted = DEFAULT_CLIENT_IO_PROVIDER.preStartSecuredMode(ioHandler);
556
557         if (isPrestarted) {
558             internalFlush();
559             DEFAULT_CLIENT_IO_PROVIDER.startSecuredMode(ioHandler, readQueue.drain());
560         }
561     }
562
563
564     /**
565      * {@inheritDoc}
566      */

567     public final int write(String JavaDoc s) throws ClosedConnectionException, IOException JavaDoc {
568         return write(s, defaultEncoding);
569     }
570
571
572     /**
573      * {@inheritDoc}
574      */

575     public final int write(String JavaDoc s, String JavaDoc encoding) throws ClosedConnectionException, IOException JavaDoc {
576         ByteBuffer JavaDoc buffer = DataConverter.toByteBuffer(s, encoding);
577         return write(buffer);
578     }
579
580     /**
581      * {@inheritDoc}
582      */

583     public final int write(byte b) throws ClosedConnectionException, IOException JavaDoc {
584         ByteBuffer JavaDoc buffer = ByteBuffer.allocate(1).put(b);
585         buffer.flip();
586         return write(buffer);
587     }
588
589
590     /**
591      * {@inheritDoc}
592      */

593     public final int write(byte... bytes) throws ClosedConnectionException, IOException JavaDoc {
594         return write(ByteBuffer.wrap(bytes));
595     }
596
597
598     /**
599      * {@inheritDoc}
600      */

601     public final int write(byte[] bytes, int offset, int length) throws ClosedConnectionException, IOException JavaDoc {
602         return write(ByteBuffer.wrap(bytes, offset, length));
603     }
604
605
606     /**
607      * {@inheritDoc}
608      */

609     public final long write(ByteBuffer JavaDoc[] buffers) throws ClosedConnectionException, IOException JavaDoc {
610         if (isOpen()) {
611             long written = 0;
612             for (ByteBuffer JavaDoc buffer : buffers) {
613                 written += buffer.limit() - buffer.position();
614             }
615
616
617             if (isWriteMarked) {
618                 for (ByteBuffer JavaDoc buffer : buffers) {
619                     writeMarkBuffer.add(buffer);
620                 }
621
622             } else {
623                 for (ByteBuffer JavaDoc buffer : buffers) {
624                     writeQueue.append(buffer);
625                 }
626             }
627
628
629             if (autoflush) {
630                 internalFlush();
631             }
632
633             return written;
634
635         } else {
636             throw new ClosedConnectionException("connection " + getId() + " is already closed");
637         }
638     }
639
640
641     /**
642      * {@inheritDoc}
643      */

644     public final int write(ByteBuffer JavaDoc buffer) throws ClosedConnectionException, IOException JavaDoc {
645         if (isOpen()) {
646             int written = buffer.limit() - buffer.position();
647
648             if (isWriteMarked) {
649                 writeMarkBuffer.add(buffer);
650
651             } else {
652                 writeQueue.append(buffer);
653             }
654
655
656             if (autoflush) {
657                 internalFlush();
658             }
659
660             return written;
661         } else {
662             throw new ClosedConnectionException("connection " + getId() + " is already closed");
663         }
664
665     }
666
667
668
669     /**
670      * {@inheritDoc}
671      */

672     public final int write(int i) throws ClosedConnectionException, IOException JavaDoc {
673         ByteBuffer JavaDoc buffer = ByteBuffer.allocate(4).putInt(i);
674         buffer.flip();
675         return (int) write(buffer);
676     }
677
678     
679     /**
680      * {@inheritDoc}
681      */

682     public final int write(short s) throws ClosedConnectionException, IOException JavaDoc {
683         ByteBuffer JavaDoc buffer = ByteBuffer.allocate(2).putShort(s);
684         buffer.flip();
685         return (int) write(buffer);
686     }
687
688     
689     
690     /**
691      * {@inheritDoc}
692      */

693     public final int write(long l) throws ClosedConnectionException, IOException JavaDoc {
694         ByteBuffer JavaDoc buffer = ByteBuffer.allocate(8).putLong(l);
695         buffer.flip();
696         return (int) write(buffer);
697     }
698
699
700     /**
701      * {@inheritDoc}
702      */

703     public final int write(double d) throws ClosedConnectionException, IOException JavaDoc {
704         ByteBuffer JavaDoc buffer = ByteBuffer.allocate(8).putDouble(d);
705         buffer.flip();
706         return (int) write(buffer);
707     }
708
709
710     /**
711      * {@inheritDoc}
712      */

713     public final long write(ByteBuffer JavaDoc[] srcs, int offset, int length) throws IOException JavaDoc {
714         ByteBuffer JavaDoc[] bufs = new ByteBuffer JavaDoc[length];
715         System.arraycopy(srcs, offset, bufs, 0, length);
716
717         return write(bufs);
718     }
719
720
721     /**
722      * extract all bytes from the queue
723      *
724      * @return all bytes of the queue
725      */

726     protected final LinkedList JavaDoc<ByteBuffer JavaDoc> extractAvailableFromReadQueue() {
727         resetCachedIndex();
728
729         LinkedList JavaDoc<ByteBuffer JavaDoc> buffers = readQueue.drain();
730         onExtracted(buffers);
731
732         return buffers;
733     }
734
735     /**
736      * extract bytes by using a delimiter
737      *
738      * @param delimiter the delimiter
739      * @param maxLength the max length of bytes that should be read. If the limit will be exceeded a MaxReadSizeExceededException will been thrown
740      * @return the extracted data
741      * @throws IOException If some other I/O error occurs
742      * @throws BufferUnderflowException if the buffer's limit has been reached
743      * @throws MaxReadSizeExceededException If the max read length has been exceeded and the delimiter hasn�t been found */

744     protected final LinkedList JavaDoc<ByteBuffer JavaDoc> extractBytesByDelimiterFromReadQueue(byte[] delimiter, int maxLength) throws IOException JavaDoc, BufferUnderflowException JavaDoc, MaxReadSizeExceededException {
745
746         if (!readQueue.isEmpty()) {
747             LinkedList JavaDoc<ByteBuffer JavaDoc> buffers = readQueue.drain();
748             assert (buffers != null);
749
750             ByteBufferParser.Index index = scanByDelimiter(buffers, delimiter);
751
752
753             // max Limit exceeded?
754
if (index.getReadBytes() > maxLength) {
755                 throw new MaxReadSizeExceededException();
756             }
757
758             // index found?
759
if (index.hasDelimiterFound()) {
760                 // delimiter found
761
LinkedList JavaDoc<ByteBuffer JavaDoc> extracted = PARSER.extract(buffers, index);
762                 onExtracted(extracted, delimiter);
763
764                 readQueue.addFirstSilence(buffers);
765                 resetCachedIndex();
766
767                 return extracted;
768
769             // .. no -> return buffer
770
} else {
771                 readQueue.addFirstSilence(buffers);
772                 cachedIndex = index;
773             }
774         }
775
776         
777         if (isOpen()) {
778             throw new BufferUnderflowException JavaDoc();
779             
780         } else {
781             // remaining reveived data of underlying io handler available?
782
int read = retrieveIoHandlerData();
783             
784             // got data? -> retry
785
if (read > 0) {
786                 return extractBytesByDelimiterFromReadQueue(delimiter, maxLength);
787                 
788             // .. no? -> throw exception
789
} else {
790                 throw new ClosedConnectionException("connection " + getId() + " is already closed");
791             }
792         }
793     }
794
795
796     /**
797      * Returns the index within this string of the first occurrence of the specified substring
798      *
799      * @param str any string
800      * @return if the string argument occurs as a substring within this object, then the
801      * index of the first character of the first such substring is returned;
802      * if it does not occur as a substring, -1 is returned.
803      *
804      * @deprecated uses getIndexOf instead
805      */

806     public int indexOf(String JavaDoc str) {
807
808         int length = 0;
809
810         if (!readQueue.isEmpty()) {
811             try {
812                 LinkedList JavaDoc<ByteBuffer JavaDoc> buffers = readQueue.drain();
813                 ByteBufferParser.Index index = scanByDelimiter(buffers, str.getBytes(getDefaultEncoding()));
814
815                 // index found?
816
if (index.hasDelimiterFound()) {
817                     length = index.getReadBytes() - str.length();
818
819                 // .. no
820
} else {
821                     length = -1;
822                 }
823
824                 readQueue.addFirstSilence(buffers);
825                 cachedIndex = index;
826             } catch (UnsupportedEncodingException JavaDoc uce) {
827                 throw new RuntimeException JavaDoc(uce);
828             }
829         }
830
831         return length;
832     }
833
834
835
836     protected final int readIndexOf(byte[] bytes, int maxReadSize) throws IOException JavaDoc, BufferUnderflowException JavaDoc, MaxReadSizeExceededException {
837
838         int length = 0;
839
840         if (!readQueue.isEmpty()) {
841             LinkedList JavaDoc<ByteBuffer JavaDoc> buffers = readQueue.drain();
842             ByteBufferParser.Index index = scanByDelimiter(buffers, bytes);
843
844             // index found?
845
if (index.hasDelimiterFound()) {
846                 length = index.getReadBytes() - bytes.length;
847
848             // .. no
849
} else {
850                 length = -1;
851             }
852
853             readQueue.addFirstSilence(buffers);
854             cachedIndex = index;
855         } else {
856             length = -1;
857         }
858
859         if (length < 0) {
860             if (readQueue.getSize() >= maxReadSize) {
861                 throw new MaxReadSizeExceededException();
862             }
863
864             if (isOpen()) {
865                 throw new BufferUnderflowException JavaDoc();
866             } else {
867                 throw new ClosedConnectionException("connection " + getId() + " is already closed");
868             }
869         }
870
871         return length;
872     }
873
874
875     /**
876      * extracts bytes by using
877      *
878      * @param length the number of bytes to extract
879      * @return the exctracted data
880      * @throws IOException If some other I/O error occurs
881      * @throws BufferUnderflowException if the buffer's limit has been reached
882      */

883     protected final LinkedList JavaDoc<ByteBuffer JavaDoc> extractBytesByLength(int length) throws IOException JavaDoc, BufferUnderflowException JavaDoc {
884
885         if (length == 0) {
886             return new LinkedList JavaDoc<ByteBuffer JavaDoc>();
887         }
888
889         // enough data?
890
if (readQueue.getSize() >= length) {
891             LinkedList JavaDoc<ByteBuffer JavaDoc> buffers = readQueue.drain();
892             assert (buffers != null);
893
894             LinkedList JavaDoc<ByteBuffer JavaDoc> extracted = PARSER.extract(buffers, length);
895             onExtracted(extracted);
896
897             readQueue.addFirstSilence(buffers);
898             resetCachedIndex();
899
900             return extracted;
901
902         // .. no
903
} else {
904             if (isOpen()) {
905                 throw new BufferUnderflowException JavaDoc();
906                 
907             } else {
908                 // remaining reveived data of underlying io handler available?
909
int read = retrieveIoHandlerData();
910                 
911                 // got data? -> retry
912
if (read > 0) {
913                     return extractBytesByLength(length);
914                     
915                 // .. no? -> throw exception
916
} else {
917                     throw new ClosedConnectionException("connection " + getId() + " is already closed");
918                 }
919             }
920         }
921     }
922
923
924
925     /**
926      * extract available bytes from the queue by using a delimiter
927      *
928      * @param delimiter the delimiter
929      * @param outChannel the channel to write in
930      * @return true if the delimiter has been found
931      * @throws IOException If some other I/O error occurs
932      */

933     @SuppressWarnings JavaDoc("unchecked")
934     protected final boolean extractAvailableFromReadQueue(byte[] delimiter, WritableByteChannel JavaDoc outChannel) throws IOException JavaDoc {
935
936         if (!readQueue.isEmpty()) {
937             LinkedList JavaDoc<ByteBuffer JavaDoc> buffers = readQueue.drain();
938             assert (buffers != null);
939
940             ByteBufferParser.Index index = scanByDelimiter(buffers, delimiter);
941
942
943             // delimiter found?
944
if (index.hasDelimiterFound()) {
945                 LinkedList JavaDoc<ByteBuffer JavaDoc> extracted = PARSER.extract(buffers, index);
946                 onExtracted(extracted, delimiter);
947                 for (ByteBuffer JavaDoc buffer : extracted) {
948                     outChannel.write(buffer);
949                 }
950
951                 readQueue.addFirstSilence(buffers);
952                 resetCachedIndex();
953                 return true;
954
955             // delimiter not found
956
} else {
957                 // read only if not part of delimiter has been detected
958
if (index.getDelimiterPos() == 0) {
959                     int readBytes = index.getReadBytes();
960                     if (readBytes > 0) {
961                         int availableBytes = readBytes - index.getDelimiterPos();
962                         if (availableBytes > 0) {
963                             LinkedList JavaDoc<ByteBuffer JavaDoc> extracted = PARSER.extract(buffers, availableBytes);
964                             onExtracted(extracted);
965                             for (ByteBuffer JavaDoc buffer : extracted) {
966                                 outChannel.write(buffer);
967                             }
968
969                             resetCachedIndex();
970                         }
971                     }
972                 }
973
974                 readQueue.addFirstSilence(buffers);
975                 return false;
976             }
977
978         } else {
979             return false;
980         }
981     }
982
983
984     private ByteBufferParser.Index scanByDelimiter(LinkedList JavaDoc<ByteBuffer JavaDoc> buffers, byte[] delimiter) {
985
986         // does index already exists (-> former scan)
987
if (cachedIndex != null) {
988             if (cachedIndex.isDelimiterEquals(delimiter)) {
989                 return PARSER.find(buffers, cachedIndex);
990             } else {
991                 cachedIndex = null;
992             }
993         }
994
995         return PARSER.find(buffers, delimiter);
996     }
997
998
999     private void onExtracted(LinkedList JavaDoc<ByteBuffer JavaDoc> buffers) {
1000        if (isReadMarked) {
1001            for (ByteBuffer JavaDoc buffer : buffers) {
1002                onExtracted(buffer);
1003            }
1004        }
1005    }
1006
1007
1008    private void onExtracted(LinkedList JavaDoc<ByteBuffer JavaDoc> buffers, byte[] delimiter) {
1009        if (isReadMarked) {
1010            for (ByteBuffer JavaDoc buffer : buffers) {
1011                onExtracted(buffer);
1012            }
1013            onExtracted(ByteBuffer.wrap(delimiter));
1014        }
1015    }
1016
1017    private void onExtracted(ByteBuffer JavaDoc buffer) {
1018        if (isReadMarked) {
1019            readMarkBuffer.addLast(buffer.duplicate());
1020        }
1021    }
1022
1023
1024
1025
1026    /**
1027     * extract bytes from the queue
1028     *
1029     * @param length the number of bytes to extract
1030     * @return the bytes
1031     * @throws BufferUnderflowException if the buffer's limit has been reached
1032     */

1033    protected final byte[] extractBytesFromReadQueue(int length) throws BufferUnderflowException JavaDoc {
1034        resetCachedIndex();
1035
1036        ByteBuffer JavaDoc buffer = readQueue.read(length);
1037        onExtracted(buffer);
1038        return DataConverter.toBytes(buffer);
1039    }
1040
1041
1042    /**
1043     * extract a int from the queue
1044     *
1045     * @return the int value
1046     * @throws BufferUnderflowException if the buffer's limit has been reached
1047     */

1048    protected final int extractIntFromReadQueue() throws BufferUnderflowException JavaDoc {
1049        resetCachedIndex();
1050
1051        ByteBuffer JavaDoc buffer = readQueue.read(4);
1052        onExtracted(buffer);
1053        return buffer.getInt();
1054    }
1055
1056    
1057    /**
1058     * extract a short from the queue
1059     *
1060     * @return the short value
1061     * @throws BufferUnderflowException if the buffer's limit has been reached
1062     */

1063    protected final short extractShortFromReadQueue() throws BufferUnderflowException JavaDoc {
1064        resetCachedIndex();
1065
1066        ByteBuffer JavaDoc buffer = readQueue.read(2);
1067        onExtracted(buffer);
1068        return buffer.getShort();
1069    }
1070
1071    /**
1072     * extract a byte value from the queue
1073     *
1074     * @return the byte value
1075     * @throws BufferUnderflowException if the buffer's limit has been reached
1076     */

1077    protected final byte extractByteFromReadQueue() throws BufferUnderflowException JavaDoc {
1078        resetCachedIndex();
1079
1080        ByteBuffer JavaDoc buffer = readQueue.read(1);
1081        onExtracted(buffer);
1082
1083        return buffer.get();
1084    }
1085
1086
1087    /**
1088     * extract a double value from the queue
1089     *
1090     * @return the double value
1091     * @throws BufferUnderflowException if the buffer's limit has been reached
1092     */

1093    protected final double extractDoubleFromReadQueue() throws BufferUnderflowException JavaDoc {
1094        resetCachedIndex();
1095
1096        ByteBuffer JavaDoc buffer = readQueue.read(8);
1097        onExtracted(buffer);
1098
1099        return buffer.getDouble();
1100    }
1101
1102
1103
1104    /**
1105     * extract a long value from the queue
1106     *
1107     * @return the long value
1108     * @throws BufferUnderflowException if the buffer's limit has been reached
1109     */

1110    protected final long extractLongFromReadQueue() throws BufferUnderflowException JavaDoc {
1111        resetCachedIndex();
1112
1113        ByteBuffer JavaDoc buffer = readQueue.read(8);
1114        onExtracted(buffer);
1115
1116        return buffer.getLong();
1117    }
1118
1119
1120    private void resetCachedIndex() {
1121        cachedIndex = null;
1122    }
1123
1124
1125
1126    /**
1127     * {@inheritDoc}
1128     */

1129    public final Object JavaDoc attach(Object JavaDoc obj) {
1130        Object JavaDoc old = attachment;
1131        attachment = obj;
1132        return old;
1133    }
1134
1135
1136    /**
1137     * {@inheritDoc}
1138     */

1139    public final Object JavaDoc attachment() {
1140        return attachment;
1141    }
1142
1143
1144    /**
1145     * {@inheritDoc}
1146     */

1147    public void markReadPosition() {
1148        removeReadMark();
1149
1150        isReadMarked = true;
1151        readMarkBuffer = new LinkedList JavaDoc<ByteBuffer JavaDoc>();
1152    }
1153
1154
1155    /**
1156     * {@inheritDoc}
1157     */

1158    public void markWritePosition() {
1159        if (getAutoflush()) {
1160            throw new UnsupportedOperationException JavaDoc("write mark is only supported for mode autoflush off");
1161        }
1162        removeWriteMark();
1163
1164        isWriteMarked = true;
1165        writeMarkBuffer = new WriteMarkBuffer();
1166    }
1167
1168
1169    /**
1170     * {@inheritDoc}
1171     */

1172    public boolean resetToWriteMark() {
1173        if (isWriteMarked) {
1174            writeMarkBuffer.resetWritePosition();
1175            return true;
1176
1177        } else {
1178            return false;
1179        }
1180    }
1181
1182
1183
1184    /**
1185     * {@inheritDoc}
1186     */

1187    public boolean resetToReadMark() {
1188        if (isReadMarked) {
1189            getReadQueue().addFirstSilence(readMarkBuffer);
1190            removeReadMark();
1191            return true;
1192
1193        } else {
1194            return false;
1195        }
1196    }
1197
1198
1199    /**
1200     * {@inheritDoc}
1201     */

1202    public void removeReadMark() {
1203        isReadMarked = false;
1204        readMarkBuffer = null;
1205    }
1206
1207
1208    /**
1209     * {@inheritDoc}
1210     */

1211    public void removeWriteMark() {
1212        if (isWriteMarked) {
1213            isWriteMarked = false;
1214            writeQueue.append(writeMarkBuffer.drain());
1215            writeMarkBuffer = null;
1216        }
1217    }
1218
1219
1220    protected int onDataEvent() {
1221        return retrieveIoHandlerData();
1222    }
1223    
1224    
1225    private int retrieveIoHandlerData() {
1226
1227        try {
1228
1229            LinkedList JavaDoc<ByteBuffer JavaDoc> buffers = getIoHandler().drainIncoming();
1230
1231            int addSize = 0;
1232            for (ByteBuffer JavaDoc buffer : buffers) {
1233                addSize += buffer.remaining();
1234            }
1235
1236
1237            if (addSize > 0) {
1238                getReadQueue().append(buffers);
1239            }
1240
1241            return addSize;
1242
1243        } catch (RuntimeException JavaDoc e) {
1244            if (LOG.isLoggable(Level.FINE)) {
1245                LOG.fine("error occured by transfering data to connection's read queue " + e.toString());
1246            }
1247            throw e;
1248        }
1249    }
1250
1251
1252
1253    protected void onConnectEvent() {
1254
1255    }
1256
1257
1258    protected void onDisconnectEvent() {
1259
1260    }
1261
1262    protected boolean onConnectionTimeoutEvent() {
1263        return false;
1264    }
1265
1266    protected boolean onIdleTimeoutEvent() {
1267        return false;
1268    }
1269
1270
1271    
1272    public void onWritten() {
1273        if (flushmode == FlushMode.SYNC) {
1274            synchronized (writeGuard) {
1275                writeGuard.notifyAll();
1276            }
1277        }
1278    }
1279    
1280    
1281    public void onWriteException(IOException JavaDoc ioException) {
1282        if (flushmode == FlushMode.SYNC) {
1283            synchronized (writeGuard) {
1284                writeException = ioException;
1285                writeGuard.notify();
1286            }
1287        }
1288    }
1289
1290    
1291    /**
1292     * {@inheritDoc}
1293     */

1294    public final Object JavaDoc getOption(String JavaDoc name) throws IOException JavaDoc {
1295        return ioHandler.getOption(name);
1296    }
1297    
1298    
1299    /**
1300     * {@inheritDoc}
1301     */

1302    public final Map JavaDoc<String JavaDoc, Class JavaDoc> getOptions() {
1303        return ioHandler.getOptions();
1304    }
1305
1306    
1307    /**
1308     * {@inheritDoc}
1309     */

1310    public IConnection setOption(String JavaDoc name, Object JavaDoc value) throws IOException JavaDoc {
1311        ioHandler.setOption(name, value);
1312        return this;
1313    }
1314    
1315    
1316    
1317    
1318
1319    private void initiateClose() {
1320        try {
1321            close();
1322        } catch (IOException JavaDoc ioe) {
1323            if (LOG.isLoggable(Level.FINE)) {
1324                LOG.fine("[" + getId() + "] error occured closing connection. Reason: " + ioe.toString());
1325            }
1326        }
1327    }
1328
1329
1330
1331    /**
1332     * {@inheritDoc}
1333     */

1334    @Override JavaDoc
1335    public String JavaDoc toString() {
1336            try {
1337            if (isOpen()) {
1338                return "id=" + getId() + ", remote=" + getRemoteAddress().getCanonicalHostName() + "(" + getRemoteAddress() + ":" + getRemotePort() + ")";
1339            } else {
1340                return "id=" + getId() + " (closed)";
1341            }
1342        } catch (Exception JavaDoc e) {
1343            return super.toString();
1344        }
1345    }
1346
1347
1348
1349
1350    private static final class WriteMarkBuffer {
1351        private final Entry head = new Entry(null, null);
1352        private Entry tail = head;
1353
1354
1355        public LinkedList JavaDoc<ByteBuffer JavaDoc> drain() {
1356            LinkedList JavaDoc<ByteBuffer JavaDoc> result = new LinkedList JavaDoc<ByteBuffer JavaDoc>();
1357
1358            Entry entry = head;
1359            do {
1360                entry = entry.next;
1361                if (entry!= null) {
1362                    result.add(entry.element);
1363                }
1364
1365            } while (entry != null);
1366
1367            head.next = null;
1368            tail = head;
1369
1370            return result;
1371        }
1372
1373        public void add(ByteBuffer JavaDoc data) {
1374            int size = data.remaining();
1375
1376            if (size == 0) {
1377                return;
1378            }
1379
1380
1381            // add entry on tail
1382
Entry entry = new Entry(data, tail.next);
1383            tail.next = entry;
1384            tail = entry;
1385
1386
1387            // tail is not last entry -> reamove written size on leading
1388
while (size > 0) {
1389                if (tail.next != null) {
1390                    int nextSize = tail.next.element.remaining();
1391
1392                    // next size =< (written) size -> remove it
1393
if (nextSize <= size) {
1394                        size = size - nextSize;
1395
1396                        tail.next = tail.next.next;
1397                        if (tail.next == null) {
1398                            break;
1399                        }
1400
1401                    // next size > (written) size -> slice and remove written size
1402
} else {
1403                        ByteBuffer JavaDoc buffer = tail.next.element;
1404                        buffer.position(buffer.position() + size);
1405                        ByteBuffer JavaDoc sliced = buffer.slice();
1406
1407                        Entry slicedEntry = new Entry(sliced, tail.next.next);
1408                        tail.next = slicedEntry;
1409                        break;
1410                    }
1411                } else {
1412                    break;
1413                }
1414            }
1415        }
1416
1417
1418
1419        public void resetWritePosition() {
1420            tail = head;
1421        }
1422    }
1423
1424    private static class Entry {
1425        private ByteBuffer JavaDoc element = null;
1426        private Entry next = null;
1427
1428        Entry(ByteBuffer JavaDoc element, Entry next) {
1429            this.element = element;
1430            this.next = next;
1431        }
1432
1433        @Override JavaDoc
1434        public String JavaDoc toString() {
1435            StringBuilder JavaDoc sb = new StringBuilder JavaDoc();
1436
1437            if (element != null) {
1438                sb.append(DataConverter.toHexString(new ByteBuffer JavaDoc[] {element}, 100000));
1439            }
1440
1441            if (next != null) {
1442                sb.append(next.toString());
1443            }
1444
1445            return sb.toString();
1446        }
1447    }
1448
1449
1450
1451    private final class HandlerCallback implements IIoHandlerCallback {
1452
1453        public void onWritten() {
1454            Connection.this.onWritten();
1455        }
1456        
1457        
1458        public void onWriteException(IOException JavaDoc ioException) {
1459            Connection.this.onWriteException(ioException);
1460        }
1461
1462        public void onDataRead() {
1463            Connection.this.onDataEvent();
1464        }
1465
1466
1467        public void onConnectionAbnormalTerminated() {
1468            Connection.this.initiateClose();
1469        }
1470
1471
1472        public void onConnect() {
1473            Connection.this.onConnectEvent();
1474        }
1475
1476
1477        public void onDisconnect() {
1478            Connection.this.onDisconnectEvent();
1479        }
1480
1481        public void onConnectionTimeout() {
1482            if (!connectionTimeoutOccured) {
1483                connectionTimeoutOccured = true;
1484                boolean isHandled = Connection.this.onConnectionTimeoutEvent();
1485                if (!isHandled) {
1486                    try {
1487                        close();
1488                    } catch (IOException JavaDoc ioe) {
1489                        if (LOG.isLoggable(Level.FINE)) {
1490                            LOG.fine("[" + getId() + "] error occured closing connection caused by connection timeout. Reason: " + ioe.toString());
1491                        }
1492                    }
1493                }
1494            }
1495        }
1496
1497        
1498        public void onIdleTimeout() {
1499            if (!idleTimeoutOccured) {
1500                idleTimeoutOccured = true;
1501
1502                boolean isHandled = Connection.this.onIdleTimeoutEvent();
1503                if (!isHandled) {
1504                    try {
1505                        close();
1506                    } catch (IOException JavaDoc ioe) {
1507                        if (LOG.isLoggable(Level.FINE)) {
1508                            LOG.fine("[" + getId() + "] error occured closing connection caused by idle timeout. Reason: " + ioe.toString());
1509                        }
1510                    }
1511                }
1512            }
1513        }
1514
1515    }
1516}
1517
Popular Tags