KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > coyote > http11 > InternalNioOutputBuffer


1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one or more
3  * contributor license agreements. See the NOTICE file distributed with
4  * this work for additional information regarding copyright ownership.
5  * The ASF licenses this file to You under the Apache License, Version 2.0
6  * (the "License"); you may not use this file except in compliance with
7  * the License. You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */

17
18 package org.apache.coyote.http11;
19
20 import java.io.EOFException JavaDoc;
21 import java.io.IOException JavaDoc;
22 import java.nio.ByteBuffer JavaDoc;
23 import java.nio.channels.SelectionKey JavaDoc;
24 import java.nio.channels.Selector JavaDoc;
25
26 import org.apache.coyote.ActionCode;
27 import org.apache.coyote.OutputBuffer;
28 import org.apache.coyote.Response;
29 import org.apache.tomcat.util.buf.ByteChunk;
30 import org.apache.tomcat.util.buf.CharChunk;
31 import org.apache.tomcat.util.buf.MessageBytes;
32 import org.apache.tomcat.util.http.HttpMessages;
33 import org.apache.tomcat.util.http.MimeHeaders;
34 import org.apache.tomcat.util.net.NioChannel;
35 import org.apache.tomcat.util.net.NioEndpoint;
36 import org.apache.tomcat.util.net.NioSelectorPool;
37 import org.apache.tomcat.util.res.StringManager;
38
39 /**
40  * Output buffer.
41  *
42  * @author <a HREF="mailto:remm@apache.org">Remy Maucherat</a>
43  * @author Filip Hanik
44  */

45 public class InternalNioOutputBuffer
46     implements OutputBuffer {
47
48
49     // -------------------------------------------------------------- Constants
50

51
52     // ----------------------------------------------------------- Constructors
53
int bbufLimit = 0;
54     
55     
56     /**
57      * Default constructor.
58      */

59     public InternalNioOutputBuffer(Response response) {
60         this(response, Constants.DEFAULT_HTTP_HEADER_BUFFER_SIZE, 10000);
61     }
62
63
64     /**
65      * Alternate constructor.
66      */

67     public InternalNioOutputBuffer(Response response, int headerBufferSize, long writeTimeout) {
68
69         this.response = response;
70         headers = response.getMimeHeaders();
71
72         buf = new byte[headerBufferSize];
73         
74         if (headerBufferSize < (8 * 1024)) {
75             bbufLimit = 6 * 1500;
76         } else {
77             bbufLimit = (headerBufferSize / 1500 + 1) * 1500;
78         }
79         //bbuf = ByteBuffer.allocateDirect(bbufLimit);
80

81         outputStreamOutputBuffer = new SocketOutputBuffer();
82
83         filterLibrary = new OutputFilter[0];
84         activeFilters = new OutputFilter[0];
85         lastActiveFilter = -1;
86
87         committed = false;
88         finished = false;
89         
90         this.writeTimeout = writeTimeout;
91
92         // Cause loading of HttpMessages
93
HttpMessages.getMessage(200);
94
95     }
96
97
98     // -------------------------------------------------------------- Variables
99

100
101     /**
102      * The string manager for this package.
103      */

104     protected static StringManager sm =
105         StringManager.getManager(Constants.Package);
106
107
108     // ----------------------------------------------------- Instance Variables
109

110
111     /**
112      * Associated Coyote response.
113      */

114     protected Response response;
115
116
117     /**
118      * Headers of the associated request.
119      */

120     protected MimeHeaders headers;
121
122
123     /**
124      * Committed flag.
125      */

126     protected boolean committed;
127
128
129     /**
130      * Finished flag.
131      */

132     protected boolean finished;
133
134
135     /**
136      * Pointer to the current write buffer.
137      */

138     protected byte[] buf;
139
140
141     /**
142      * Position in the buffer.
143      */

144     protected int pos;
145
146
147     /**
148      * Underlying socket.
149      */

150     protected NioChannel socket;
151     
152     /**
153      * Selector pool, for blocking reads and blocking writes
154      */

155     protected NioSelectorPool pool;
156
157
158
159     /**
160      * Underlying output buffer.
161      */

162     protected OutputBuffer outputStreamOutputBuffer;
163
164
165     /**
166      * Filter library.
167      * Note: Filter[0] is always the "chunked" filter.
168      */

169     protected OutputFilter[] filterLibrary;
170
171
172     /**
173      * Active filter (which is actually the top of the pipeline).
174      */

175     protected OutputFilter[] activeFilters;
176
177
178     /**
179      * Index of the last active filter.
180      */

181     protected int lastActiveFilter;
182     
183     /**
184      * Write time out in milliseconds
185      */

186     protected long writeTimeout = -1;
187
188
189     // ------------------------------------------------------------- Properties
190

191
192     /**
193      * Set the underlying socket.
194      */

195     public void setSocket(NioChannel socket) {
196         this.socket = socket;
197     }
198
199     public void setWriteTimeout(long writeTimeout) {
200         this.writeTimeout = writeTimeout;
201     }
202
203     /**
204      * Get the underlying socket input stream.
205      */

206     public NioChannel getSocket() {
207         return socket;
208     }
209
210     public long getWriteTimeout() {
211         return writeTimeout;
212     }
213
214     public void setSelectorPool(NioSelectorPool pool) {
215         this.pool = pool;
216     }
217
218     public NioSelectorPool getSelectorPool() {
219         return pool;
220     }
221     /**
222      * Set the socket buffer size.
223      */

224     public void setSocketBuffer(int socketBufferSize) {
225         // FIXME: Remove
226
}
227
228
229     /**
230      * Add an output filter to the filter library.
231      */

232     public void addFilter(OutputFilter filter) {
233
234         OutputFilter[] newFilterLibrary =
235             new OutputFilter[filterLibrary.length + 1];
236         for (int i = 0; i < filterLibrary.length; i++) {
237             newFilterLibrary[i] = filterLibrary[i];
238         }
239         newFilterLibrary[filterLibrary.length] = filter;
240         filterLibrary = newFilterLibrary;
241
242         activeFilters = new OutputFilter[filterLibrary.length];
243
244     }
245
246
247     /**
248      * Get filters.
249      */

250     public OutputFilter[] getFilters() {
251
252         return filterLibrary;
253
254     }
255
256
257     /**
258      * Clear filters.
259      */

260     public void clearFilters() {
261
262         filterLibrary = new OutputFilter[0];
263         lastActiveFilter = -1;
264
265     }
266
267
268     /**
269      * Add an output filter to the filter library.
270      */

271     public void addActiveFilter(OutputFilter filter) {
272
273         if (lastActiveFilter == -1) {
274             filter.setBuffer(outputStreamOutputBuffer);
275         } else {
276             for (int i = 0; i <= lastActiveFilter; i++) {
277                 if (activeFilters[i] == filter)
278                     return;
279             }
280             filter.setBuffer(activeFilters[lastActiveFilter]);
281         }
282
283         activeFilters[++lastActiveFilter] = filter;
284
285         filter.setResponse(response);
286
287     }
288
289
290     // --------------------------------------------------------- Public Methods
291

292
293     /**
294      * Flush the response.
295      *
296      * @throws IOException an undelying I/O error occured
297      */

298     public void flush()
299         throws IOException JavaDoc {
300
301         if (!committed) {
302
303             // Send the connector a request for commit. The connector should
304
// then validate the headers, send them (using sendHeader) and
305
// set the filters accordingly.
306
response.action(ActionCode.ACTION_COMMIT, null);
307
308         }
309
310         // Flush the current buffer
311
flushBuffer();
312
313     }
314
315
316     /**
317      * Reset current response.
318      *
319      * @throws IllegalStateException if the response has already been committed
320      */

321     public void reset() {
322
323         if (committed)
324             throw new IllegalStateException JavaDoc(/*FIXME:Put an error message*/);
325
326         // Recycle Request object
327
response.recycle();
328
329     }
330
331
332     /**
333      * Recycle the output buffer. This should be called when closing the
334      * connection.
335      */

336     public void recycle() {
337
338         // Recycle Request object
339
response.recycle();
340         socket.getBufHandler().getWriteBuffer().clear();
341
342         socket = null;
343         pos = 0;
344         lastActiveFilter = -1;
345         committed = false;
346         finished = false;
347
348     }
349
350
351     /**
352      * End processing of current HTTP request.
353      * Note: All bytes of the current request should have been already
354      * consumed. This method only resets all the pointers so that we are ready
355      * to parse the next HTTP request.
356      */

357     public void nextRequest() {
358
359         // Recycle Request object
360
response.recycle();
361
362         // Recycle filters
363
for (int i = 0; i <= lastActiveFilter; i++) {
364             activeFilters[i].recycle();
365         }
366
367         // Reset pointers
368
pos = 0;
369         lastActiveFilter = -1;
370         committed = false;
371         finished = false;
372
373     }
374
375
376     /**
377      * End request.
378      *
379      * @throws IOException an undelying I/O error occured
380      */

381     public void endRequest()
382         throws IOException JavaDoc {
383
384         if (!committed) {
385
386             // Send the connector a request for commit. The connector should
387
// then validate the headers, send them (using sendHeader) and
388
// set the filters accordingly.
389
response.action(ActionCode.ACTION_COMMIT, null);
390
391         }
392
393         if (finished)
394             return;
395
396         if (lastActiveFilter != -1)
397             activeFilters[lastActiveFilter].end();
398
399         flushBuffer();
400
401         finished = true;
402
403     }
404
405
406     // ------------------------------------------------ HTTP/1.1 Output Methods
407

408
409     /**
410      * Send an acknoledgement.
411      */

412     public void sendAck()
413         throws IOException JavaDoc {
414
415         if (!committed) {
416             //Socket.send(socket, Constants.ACK_BYTES, 0, Constants.ACK_BYTES.length) < 0
417
ByteBuffer JavaDoc buf = ByteBuffer.wrap(Constants.ACK_BYTES,0,Constants.ACK_BYTES.length);
418             writeToSocket(buf,false);
419         }
420
421     }
422
423     private synchronized void writeToSocket(ByteBuffer JavaDoc bytebuffer, boolean flip) throws IOException JavaDoc {
424         //int limit = bytebuffer.position();
425
if ( flip ) bytebuffer.flip();
426         int written = 0;
427         Selector JavaDoc selector = null;
428         try {
429             selector = getSelectorPool().get();
430         } catch ( IOException JavaDoc x ) {
431             //ignore
432
}
433         try {
434             written = getSelectorPool().write(bytebuffer, socket, selector, writeTimeout);
435             //make sure we are flushed
436
do {
437                 if (socket.flush(selector)) break;
438             }while ( true );
439         }finally {
440             if ( selector != null ) getSelectorPool().put(selector);
441         }
442         socket.getBufHandler().getWriteBuffer().clear();
443         this.total = 0;
444     }
445
446
447     /**
448      * Send the response status line.
449      */

450     public void sendStatus() {
451
452         // Write protocol name
453
write(Constants.HTTP_11_BYTES);
454         buf[pos++] = Constants.SP;
455
456         // Write status code
457
int status = response.getStatus();
458         switch (status) {
459         case 200:
460             write(Constants._200_BYTES);
461             break;
462         case 400:
463             write(Constants._400_BYTES);
464             break;
465         case 404:
466             write(Constants._404_BYTES);
467             break;
468         default:
469             write(status);
470         }
471
472         buf[pos++] = Constants.SP;
473
474         // Write message
475
String JavaDoc message = response.getMessage();
476         if (message == null) {
477             write(HttpMessages.getMessage(status));
478         } else {
479             write(message);
480         }
481
482         // End the response status line
483
buf[pos++] = Constants.CR;
484         buf[pos++] = Constants.LF;
485
486     }
487
488
489     /**
490      * Send a header.
491      *
492      * @param name Header name
493      * @param value Header value
494      */

495     public void sendHeader(MessageBytes name, MessageBytes value) {
496
497         write(name);
498         buf[pos++] = Constants.COLON;
499         buf[pos++] = Constants.SP;
500         write(value);
501         buf[pos++] = Constants.CR;
502         buf[pos++] = Constants.LF;
503
504     }
505
506
507     /**
508      * Send a header.
509      *
510      * @param name Header name
511      * @param value Header value
512      */

513     public void sendHeader(ByteChunk name, ByteChunk value) {
514
515         write(name);
516         buf[pos++] = Constants.COLON;
517         buf[pos++] = Constants.SP;
518         write(value);
519         buf[pos++] = Constants.CR;
520         buf[pos++] = Constants.LF;
521
522     }
523
524
525     /**
526      * Send a header.
527      *
528      * @param name Header name
529      * @param value Header value
530      */

531     public void sendHeader(String JavaDoc name, String JavaDoc value) {
532
533         write(name);
534         buf[pos++] = Constants.COLON;
535         buf[pos++] = Constants.SP;
536         write(value);
537         buf[pos++] = Constants.CR;
538         buf[pos++] = Constants.LF;
539
540     }
541
542
543     /**
544      * End the header block.
545      */

546     public void endHeaders() {
547
548         buf[pos++] = Constants.CR;
549         buf[pos++] = Constants.LF;
550
551     }
552
553
554     // --------------------------------------------------- OutputBuffer Methods
555

556
557     /**
558      * Write the contents of a byte chunk.
559      *
560      * @param chunk byte chunk
561      * @return number of bytes written
562      * @throws IOException an undelying I/O error occured
563      */

564     public int doWrite(ByteChunk chunk, Response res)
565         throws IOException JavaDoc {
566
567         if (!committed) {
568
569             // Send the connector a request for commit. The connector should
570
// then validate the headers, send them (using sendHeaders) and
571
// set the filters accordingly.
572
response.action(ActionCode.ACTION_COMMIT, null);
573
574         }
575
576         if (lastActiveFilter == -1)
577             return outputStreamOutputBuffer.doWrite(chunk, res);
578         else
579             return activeFilters[lastActiveFilter].doWrite(chunk, res);
580
581     }
582
583
584     // ------------------------------------------------------ Protected Methods
585

586
587     /**
588      * Commit the response.
589      *
590      * @throws IOException an undelying I/O error occured
591      */

592     protected void commit()
593         throws IOException JavaDoc {
594
595         // The response is now committed
596
committed = true;
597         response.setCommitted(true);
598
599         if (pos > 0) {
600             // Sending the response header buffer
601
addToBB(buf, 0, pos);
602         }
603
604     }
605
606     int total = 0;
607     private void addToBB(byte[] buf, int offset, int length) throws IOException JavaDoc {
608         if (socket.getBufHandler().getWriteBuffer().remaining() < length) {
609             flushBuffer();
610         }
611         socket.getBufHandler().getWriteBuffer().put(buf, offset, length);
612         total += length;
613     }
614
615
616     /**
617      * This method will write the contents of the specyfied message bytes
618      * buffer to the output stream, without filtering. This method is meant to
619      * be used to write the response header.
620      *
621      * @param mb data to be written
622      */

623     protected void write(MessageBytes mb) {
624
625         if (mb.getType() == MessageBytes.T_BYTES) {
626             ByteChunk bc = mb.getByteChunk();
627             write(bc);
628         } else if (mb.getType() == MessageBytes.T_CHARS) {
629             CharChunk cc = mb.getCharChunk();
630             write(cc);
631         } else {
632             write(mb.toString());
633         }
634
635     }
636
637
638     /**
639      * This method will write the contents of the specyfied message bytes
640      * buffer to the output stream, without filtering. This method is meant to
641      * be used to write the response header.
642      *
643      * @param bc data to be written
644      */

645     protected void write(ByteChunk bc) {
646
647         // Writing the byte chunk to the output buffer
648
System.arraycopy(bc.getBytes(), bc.getStart(), buf, pos,
649                          bc.getLength());
650         pos = pos + bc.getLength();
651
652     }
653
654
655     /**
656      * This method will write the contents of the specyfied char
657      * buffer to the output stream, without filtering. This method is meant to
658      * be used to write the response header.
659      *
660      * @param cc data to be written
661      */

662     protected void write(CharChunk cc) {
663
664         int start = cc.getStart();
665         int end = cc.getEnd();
666         char[] cbuf = cc.getBuffer();
667         for (int i = start; i < end; i++) {
668             char c = cbuf[i];
669             // Note: This is clearly incorrect for many strings,
670
// but is the only consistent approach within the current
671
// servlet framework. It must suffice until servlet output
672
// streams properly encode their output.
673
if ((c <= 31) && (c != 9)) {
674                 c = ' ';
675             } else if (c == 127) {
676                 c = ' ';
677             }
678             buf[pos++] = (byte) c;
679         }
680
681     }
682
683
684     /**
685      * This method will write the contents of the specyfied byte
686      * buffer to the output stream, without filtering. This method is meant to
687      * be used to write the response header.
688      *
689      * @param b data to be written
690      */

691     public void write(byte[] b) {
692
693         // Writing the byte chunk to the output buffer
694
System.arraycopy(b, 0, buf, pos, b.length);
695         pos = pos + b.length;
696
697     }
698
699
700     /**
701      * This method will write the contents of the specyfied String to the
702      * output stream, without filtering. This method is meant to be used to
703      * write the response header.
704      *
705      * @param s data to be written
706      */

707     protected void write(String JavaDoc s) {
708
709         if (s == null)
710             return;
711
712         // From the Tomcat 3.3 HTTP/1.0 connector
713
int len = s.length();
714         for (int i = 0; i < len; i++) {
715             char c = s.charAt (i);
716             // Note: This is clearly incorrect for many strings,
717
// but is the only consistent approach within the current
718
// servlet framework. It must suffice until servlet output
719
// streams properly encode their output.
720
if ((c <= 31) && (c != 9)) {
721                 c = ' ';
722             } else if (c == 127) {
723                 c = ' ';
724             }
725             buf[pos++] = (byte) c;
726         }
727
728     }
729
730
731     /**
732      * This method will print the specified integer to the output stream,
733      * without filtering. This method is meant to be used to write the
734      * response header.
735      *
736      * @param i data to be written
737      */

738     protected void write(int i) {
739
740         write(String.valueOf(i));
741
742     }
743
744
745     /**
746      * Callback to write data from the buffer.
747      */

748     protected void flushBuffer()
749         throws IOException JavaDoc {
750
751         //prevent timeout for async,
752
SelectionKey JavaDoc key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
753         if (key != null) {
754             NioEndpoint.KeyAttachment attach = (NioEndpoint.KeyAttachment) key.attachment();
755             attach.access();
756         }
757
758         //write to the socket, if there is anything to write
759
if (socket.getBufHandler().getWriteBuffer().position() > 0) {
760             writeToSocket(socket.getBufHandler().getWriteBuffer(),true);
761         }
762     }
763
764
765     // ----------------------------------- OutputStreamOutputBuffer Inner Class
766

767
768     /**
769      * This class is an output buffer which will write data to an output
770      * stream.
771      */

772     protected class SocketOutputBuffer
773         implements OutputBuffer {
774
775
776         /**
777          * Write chunk.
778          */

779         public int doWrite(ByteChunk chunk, Response res)
780             throws IOException JavaDoc {
781
782             int len = chunk.getLength();
783             int start = chunk.getStart();
784             byte[] b = chunk.getBuffer();
785             while (len > 0) {
786                 int thisTime = len;
787                 if (socket.getBufHandler().getWriteBuffer().position() == socket.getBufHandler().getWriteBuffer().capacity()) {
788                     flushBuffer();
789                 }
790                 if (thisTime > socket.getBufHandler().getWriteBuffer().capacity() - socket.getBufHandler().getWriteBuffer().position()) {
791                     thisTime = socket.getBufHandler().getWriteBuffer().capacity() - socket.getBufHandler().getWriteBuffer().position();
792                 }
793                 addToBB(b,start,thisTime);
794                 len = len - thisTime;
795                 start = start + thisTime;
796             }
797             return chunk.getLength();
798
799         }
800
801
802     }
803
804
805 }
806
Popular Tags