KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > caucho > server > hmux > HmuxStream


1 /*
2  * Copyright (c) 1998-2006 Caucho Technology -- all rights reserved
3  *
4  * This file is part of Resin(R) Open Source
5  *
6  * Each copy or derived work must preserve the copyright notice and this
7  * notice unmodified.
8  *
9  * Resin Open Source is free software; you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation; either version 2 of the License, or
12  * (at your option) any later version.
13  *
14  * Resin Open Source is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE, or any warranty
17  * of NON-INFRINGEMENT. See the GNU General Public License for more
18  * details.
19  *
20  * You should have received a copy of the GNU General Public License
21  * along with Resin Open Source; if not, write to the
22  *
23  * Free Software Foundation, Inc.
24  * 59 Temple Place, Suite 330
25  * Boston, MA 02111-1307 USA
26  *
27  * @author Scott Ferguson
28  */

29
30 package com.caucho.server.hmux;
31
32 import com.caucho.log.Log;
33 import com.caucho.util.Alarm;
34 import com.caucho.vfs.*;
35
36 import java.io.IOException JavaDoc;
37 import java.io.InputStream JavaDoc;
38 import java.io.OutputStream JavaDoc;
39 import java.net.ConnectException JavaDoc;
40 import java.net.Socket JavaDoc;
41 import java.net.SocketException JavaDoc;
42 import java.util.HashMap JavaDoc;
43 import java.util.Iterator JavaDoc;
44 import java.util.logging.Level JavaDoc;
45 import java.util.logging.Logger JavaDoc;
46
47 /**
48  * Underlying stream that sends requests to a server over the HMUX protocol.
49  */

50 class HmuxStream extends StreamImpl {
51   private static final Logger JavaDoc log = Log.open(HmuxStream.class);
52   // reserved headers that should not be passed to the HTTP server
53
private static HashMap JavaDoc<String JavaDoc,String JavaDoc> _reserved;
54
55   private static final Object JavaDoc LOCK = new Object JavaDoc();
56   
57   // Saved keepalive stream for a new request.
58
private static HmuxStream _savedStream;
59   // Time the stream was saved
60
private static long _saveTime;
61   
62   private long _socketTimeout = 30000L;
63
64   private boolean _isSSL;
65
66   private Socket JavaDoc _s;
67   private InputStream JavaDoc _is;
68   private OutputStream JavaDoc _os;
69   private ReadStream _rs;
70   private WriteStream _ws;
71
72   // The server's host name
73
private String JavaDoc _host;
74   // The server's port
75
private int _port;
76
77   private String JavaDoc _virtualHost;
78
79   // the method
80
private String JavaDoc _method;
81   // true for a HEAD stream
82
private boolean _isHead;
83   // true for a POST stream
84
private boolean _isPost;
85
86   // buffer containing the POST data
87
private MemoryStream _tempStream;
88
89   // true if keepalive is allowed
90
private boolean _isKeepalive = true;
91   // true after the request has been sent
92
private boolean _didGet;
93   // length of the current chunk, -1 on eof
94
private int _chunkLength;
95   // the request is done
96
private boolean _isRequestDone;
97
98   private HashMap JavaDoc<String JavaDoc,Object JavaDoc> _attributes;
99
100   // Used to read unread bytes on recycle.
101
private byte []_tempBuffer;
102
/**
 * Wraps an already-connected socket in a new stream and prepares it for
 * its first request.
 *
 * @param path the path of the first request
 * @param host the server's host name
 * @param port the server's port
 * @param s the connected socket
 * @throws IOException if the socket's input or output stream cannot be
 *         obtained
 */
private HmuxStream(Path path, String host, int port, Socket s)
  throws IOException {
  _host = host;
  _port = port;
  _s = s;

  _is = s.getInputStream();
  _os = s.getOutputStream();

  // The read stream is opened together with its sibling write stream.
  _ws = VfsStream.openWrite(_os);
  _rs = VfsStream.openRead(_is, _ws);

  _attributes = new HashMap<String,Object>();

  init(path);
}
124
125   /**
126    * Opens a new HTTP stream for reading, i.e. a GET request.
127    *
128    * @param path the URL for the stream
129    *
130    * @return the opened stream
131    */

132   static HmuxStreamWrapper openRead(HmuxPath path) throws IOException JavaDoc
133   {
134     HmuxStream stream = createStream(path);
135     stream._isPost = false;
136
137     return new HmuxStreamWrapper(stream);
138   }
139
140   /**
141    * Opens a new HTTP stream for reading and writing, i.e. a POST request.
142    *
143    * @param path the URL for the stream
144    *
145    * @return the opened stream
146    */

147   static HmuxStreamWrapper openReadWrite(HmuxPath path) throws IOException JavaDoc
148   {
149     HmuxStream stream = createStream(path);
150     stream._isPost = true;
151
152     return new HmuxStreamWrapper(stream);
153   }
154
155   /**
156    * Creates a new HTTP stream. If there is a saved connection to
157    * the same host, use it.
158    *
159    * @param path the URL for the stream
160    *
161    * @return the opened stream
162    */

163   static private HmuxStream createStream(HmuxPath path) throws IOException JavaDoc
164   {
165     String JavaDoc host = path.getHost();
166     int port = path.getPort();
167
168     HmuxStream stream = null;
169     long streamTime = 0;
170     synchronized (LOCK) {
171       if (_savedStream != null &&
172           host.equals(_savedStream.getHost()) &&
173           port == _savedStream.getPort()) {
174         stream = _savedStream;
175         streamTime = _saveTime;
176         _savedStream = null;
177       }
178     }
179
180     if (stream == null) {
181     }
182     // if the stream is still valid, use it
183
else if (Alarm.getCurrentTime() < streamTime + 5000) {
184       stream.init(path);
185       return stream;
186     }
187     // if the stream has timed out, close it
188
else {
189       try {
190         stream._isKeepalive = false;
191         stream.close();
192       } catch (IOException JavaDoc e) {
193         log.log(Level.FINE, e.toString(), e);
194       }
195     }
196
197     Socket JavaDoc s;
198
199     try {
200       s = new Socket JavaDoc(host, port);
201     } catch (ConnectException JavaDoc e) {
202       throw new ConnectException JavaDoc(path.getURL() + ": " + e.getMessage());
203     } catch (Exception JavaDoc e) {
204       throw new ConnectException JavaDoc(path.getURL() + ": " + e.toString());
205     }
206
207     int socketTimeout = 300 * 1000;
208
209     try {
210       s.setSoTimeout(socketTimeout);
211     } catch (Exception JavaDoc e) {
212     }
213           
214     return new HmuxStream(path, host, port, s);
215   }
216
217   /**
218    * Initializes the stream for the next request.
219    */

220   private void init(Path path)
221   {
222     _isRequestDone = false;
223     _didGet = false;
224     _isPost = false;
225     _isHead = false;
226     _method = null;
227     _attributes.clear();
228     
229     setPath(path);
230
231     if (path instanceof HmuxPath)
232       _virtualHost = ((HmuxPath) path).getVirtualHost();
233   }
234
235   /**
236    * Set if this should be an SSL connection.
237    */

238   public void setSSL(boolean isSSL)
239   {
240     _isSSL = isSSL;
241   }
242
243   /**
244    * Set if this should be an SSL connection.
245    */

246   public boolean isSSL()
247   {
248     return _isSSL;
249   }
250
251   /**
252    * Sets the method
253    */

254   public void setMethod(String JavaDoc method)
255   {
256     _method = method;
257   }
258
259   /**
260    * Sets true if we're only interested in the head.
261    */

262   public void setHead(boolean isHead)
263   {
264     _isHead = isHead;
265   }
266
267   /**
268    * Returns the stream's host.
269    */

270   public String JavaDoc getHost()
271   {
272     return _host;
273   }
274
275   /**
276    * Returns the stream's port.
277    */

278   public int getPort()
279   {
280     return _port;
281   }
282   
283   /**
284    * Returns a header from the response returned from the HTTP server.
285    *
286    * @param name name of the header
287    * @return the header value.
288    */

289   public Object JavaDoc getAttribute(String JavaDoc name)
290     throws IOException JavaDoc
291   {
292     if (! _didGet)
293       getConnInput();
294
295     return _attributes.get(name.toLowerCase());
296   }
297
298   /**
299    * Returns an iterator of the returned header names.
300    */

301   public Iterator JavaDoc getAttributeNames()
302     throws IOException JavaDoc
303   {
304     if (! _didGet)
305       getConnInput();
306
307     return _attributes.keySet().iterator();
308   }
309
310   /**
311    * Sets a header for the request.
312    */

313   public void setAttribute(String JavaDoc name, Object JavaDoc value)
314   {
315     if (name.equals("method"))
316       setMethod((String JavaDoc) value);
317     else if (name.equals("socket-timeout")) {
318       if (value instanceof Integer JavaDoc) {
319         int socketTimeout = ((Integer JavaDoc) value).intValue();
320
321         if (socketTimeout > 0) {
322           try {
323             if (_s != null)
324               _s.setSoTimeout(socketTimeout);
325           } catch (Exception JavaDoc e) {
326
327           }
328         }
329       }
330     }
331     else
332       _attributes.put(name.toLowerCase(), value);
333   }
334
335   /**
336    * Remove a header for the request.
337    */

338   public void removeAttribute(String JavaDoc name)
339   {
340     _attributes.remove(name.toLowerCase());
341   }
342
343   /**
344    * Sets the timeout.
345    */

346   public void setSocketTimeout(long timeout)
347     throws SocketException JavaDoc
348   {
349     if (_s != null)
350       _s.setSoTimeout((int) timeout);
351   }
352
353   /**
354    * The stream is always writable (?)
355    */

356   public boolean canWrite()
357   {
358     return true;
359   }
360
361   /**
362    * Writes a buffer to the underlying stream.
363    *
364    * @param buffer the byte array to write.
365    * @param offset the offset into the byte array.
366    * @param length the number of bytes to write.
367    * @param isEnd true when the write is flushing a close.
368    */

369   public void write(byte []buf, int offset, int length, boolean isEnd)
370     throws IOException JavaDoc
371   {
372     if (! _isPost)
373       return;
374
375     if (_tempStream == null)
376       _tempStream = new MemoryStream();
377
378     _tempStream.write(buf, offset, length, isEnd);
379   }
380
381   /**
382    * The stream is readable.
383    */

384   public boolean canRead()
385   {
386     return true;
387   }
388
389   /**
390    * Read data from the connection. If the request hasn't yet been sent
391    * to the server, send it.
392    */

393   public int read(byte []buf, int offset, int length) throws IOException JavaDoc
394   {
395     try {
396       return readInt(buf, offset, length);
397     } catch (IOException JavaDoc e) {
398       _isKeepalive = false;
399       throw e;
400     } catch (RuntimeException JavaDoc e) {
401       _isKeepalive = false;
402       throw e;
403     }
404   }
405   
406   /**
407    * Read data from the connection. If the request hasn't yet been sent
408    * to the server, send it.
409    */

410   public int readInt(byte []buf, int offset, int length) throws IOException JavaDoc
411   {
412     if (! _didGet)
413       getConnInput();
414
415     if (_isRequestDone)
416       return -1;
417
418     try {
419       int len = length;
420
421       if (_chunkLength == 0) {
422     if (! readData())
423       _chunkLength = -1;
424       }
425       
426       if (_chunkLength < 0)
427     return -1;
428       
429       if (_chunkLength < len)
430     len = _chunkLength;
431
432       len = _rs.read(buf, offset, len);
433
434       if (len < 0) {
435       }
436       else
437     _chunkLength -= len;
438     
439       return len;
440     } catch (IOException JavaDoc e) {
441       _isKeepalive = false;
442       throw e;
443     } catch (RuntimeException JavaDoc e) {
444       _isKeepalive = false;
445       throw e;
446     }
447   }
448
449   /**
450    * Sends the request and initializes the response.
451    */

452   private void getConnInput() throws IOException JavaDoc
453   {
454     if (_didGet)
455       return;
456
457     try {
458       getConnInputImpl();
459     } catch (IOException JavaDoc e) {
460       _isKeepalive = false;
461       throw e;
462     } catch (RuntimeException JavaDoc e) {
463       _isKeepalive = false;
464       throw e;
465     }
466   }
467
468   /**
469    * Send the request to the server, wait for the response and parse
470    * the headers.
471    */

472   private void getConnInputImpl() throws IOException JavaDoc
473   {
474     if (_didGet)
475       return;
476
477     _didGet = true;
478
479     _ws.write('C');
480     _ws.write(0);
481     _ws.write(0);
482
483     if (_method != null) {
484       writeString(HmuxRequest.HMUX_METHOD, _method);
485     }
486     else if (_isPost) {
487       writeString(HmuxRequest.HMUX_METHOD, "POST");
488     }
489     else if (_isHead)
490       writeString(HmuxRequest.HMUX_METHOD, "HEAD");
491     else
492       writeString(HmuxRequest.HMUX_METHOD, "GET");
493     
494     if (_virtualHost != null)
495       writeString(HmuxRequest.HMUX_SERVER_NAME, _virtualHost);
496     else {
497       writeString(HmuxRequest.HMUX_SERVER_NAME, _path.getHost());
498       _ws.print(_path.getHost());
499       if (_path.getPort() != 80) {
500     writeString(HmuxRequest.CSE_SERVER_PORT,
501             String.valueOf(_path.getPort()));
502       }
503     }
504
505     // Not splitting query? Also fullpath?
506
writeString(HmuxRequest.HMUX_URI, _path.getPath());
507
508     if (_path.getQuery() != null)
509       writeString(HmuxRequest.CSE_QUERY_STRING, _path.getQuery());
510     
511     Iterator JavaDoc iter = getAttributeNames();
512     while (iter.hasNext()) {
513       String JavaDoc name = (String JavaDoc) iter.next();
514       if (_reserved.get(name.toLowerCase()) == null) {
515     writeString(HmuxRequest.HMUX_HEADER, name);
516     writeString(HmuxRequest.HMUX_STRING, getAttribute(name));
517       }
518     }
519
520     if (_isPost) {
521       MemoryStream tempStream = _tempStream;
522       _tempStream = null;
523       if (tempStream != null) {
524     TempBuffer tb = TempBuffer.allocate();
525     byte []buffer = tb.getBuffer();
526     int sublen;
527
528     ReadStream postIn = tempStream.openRead();
529
530     while ((sublen = postIn.read(buffer, 0, buffer.length)) > 0) {
531       _ws.write('D');
532       _ws.write(sublen >> 8);
533       _ws.write(sublen);
534       _ws.write(buffer, 0, sublen);
535     }
536
537     tempStream.destroy();
538
539     TempBuffer.free(tb);
540       }
541     }
542
543     _attributes.clear();
544
545     _ws.write('Q');
546
547     readData();
548
549     if (_isHead)
550       _isRequestDone = true;
551   }
552
553   private void writeString(int code, String JavaDoc string)
554     throws IOException JavaDoc
555   {
556     WriteStream ws = _ws;
557
558     ws.write((byte) code);
559     int len = string.length();
560     ws.write(len >> 8);
561     ws.write(len);
562     ws.print(string);
563   }
564
565   private void writeString(int code, Object JavaDoc obj)
566     throws IOException JavaDoc
567   {
568     String JavaDoc string = String.valueOf(obj);
569     
570     WriteStream ws = _ws;
571
572     ws.write((byte) code);
573     int len = string.length();
574     ws.write(len >> 8);
575     ws.write(len);
576     ws.print(string);
577   }
578
579   /**
580    * Parse the headers returned from the server.
581    */

582   private boolean readData()
583     throws IOException JavaDoc
584   {
585     boolean isDebug = log.isLoggable(Level.FINE);
586     
587     int code;
588
589     ReadStream is = _rs;
590
591     while ((code = is.read()) > 0) {
592       switch (code) {
593       case HmuxRequest.HMUX_CHANNEL:
594     is.read();
595     is.read();
596     break;
597       case HmuxRequest.HMUX_QUIT:
598       case HmuxRequest.HMUX_EXIT:
599     is.close();
600
601     if (isDebug)
602       log.fine("HMUX: " + (char) code);
603     
604     return false;
605     
606       case HmuxRequest.HMUX_YIELD:
607     break;
608     
609       case HmuxRequest.HMUX_STATUS:
610     String JavaDoc value = readString(is);
611     _attributes.put("status", value.substring(0, 3));
612     
613     if (isDebug)
614       log.fine("HMUX: " + (char) code + " " + value);
615     break;
616     
617       case HmuxRequest.HMUX_DATA:
618     _chunkLength = 256 * (is.read() & 0xff) + (is.read() & 0xff);
619     
620     if (isDebug)
621       log.fine("HMUX: " + (char) code + " " + _chunkLength);
622     
623     return true;
624     
625       default:
626     int len = 256 * (is.read() & 0xff) + (is.read() & 0xff);
627     
628     if (isDebug)
629       log.fine("HMUX: " + (char) code + " " + len);
630     
631     is.skip(len);
632     break;
633       }
634     }
635
636     return false;
637   }
638
639   private String JavaDoc readString(ReadStream is)
640     throws IOException JavaDoc
641   {
642     int len = 256 * (is.read() & 0xff) + is.read();
643
644     char []buf = new char[len];
645
646     is.readAll(buf, 0, len);
647
648     return new String JavaDoc(buf);
649   }
650
651   /**
652    * Returns the bytes still available.
653    */

654   public int getAvailable() throws IOException JavaDoc
655   {
656     if (! _didGet)
657       getConnInput();
658
659     return _rs.getAvailable();
660   }
661
662   /**
663    * Close the connection.
664    */

665   public void close() throws IOException JavaDoc
666   {
667     if (_isKeepalive) {
668       // If recycling, read any unread data
669
if (! _didGet)
670         getConnInput();
671
672       if (! _isRequestDone) {
673         if (_tempBuffer == null)
674           _tempBuffer = new byte[256];
675
676         try {
677           while (read(_tempBuffer, 0, _tempBuffer.length) > 0) {
678           }
679         } catch (IOException JavaDoc e) {
680           _isKeepalive = false;
681         }
682       }
683     }
684
685     if (com.caucho.server.util.CauchoSystem.isTesting())
686       _isKeepalive = false; // XXX:
687

688     if (_isKeepalive) {
689       HmuxStream oldSaved;
690       long now = Alarm.getCurrentTime();
691       synchronized (LOCK) {
692         oldSaved = _savedStream;
693         _savedStream = this;
694         _saveTime = now;
695       }
696
697       if (oldSaved != null && oldSaved != this) {
698         oldSaved._isKeepalive = false;
699         oldSaved.close();
700       }
701
702       return;
703     }
704
705     try {
706       try {
707         if (_ws != null)
708           _ws.close();
709       } catch (Throwable JavaDoc e) {
710       }
711       _ws = null;
712
713       try {
714         if (_rs != null)
715           _rs.close();
716       } catch (Throwable JavaDoc e) {
717       }
718       _rs = null;
719
720       try {
721         if (_os != null)
722           _os.close();
723       } catch (Throwable JavaDoc e) {
724       }
725       _os = null;
726
727       try {
728         if (_is != null)
729           _is.close();
730       } catch (Throwable JavaDoc e) {
731       }
732       _is = null;
733     } finally {
734       if (_s != null)
735     _s.close();
736       _s = null;
737     }
738   }
739
// Registers the header names that are never forwarded as request headers.
static {
  String []reservedNames = {
    "user-agent",
    "content-length",
    "content-encoding",
    "connection",
    "host",
  };

  _reserved = new HashMap<String,String>();

  for (String reservedName : reservedNames) {
    _reserved.put(reservedName, "");
  }
}
748 }
749
Popular Tags