1 46 47 package org.mr.core.net; 48 49 import java.io.IOException ; 50 import java.net.InetSocketAddress ; 51 import java.net.SocketAddress ; 52 import java.nio.ByteBuffer ; 53 import java.nio.channels.SocketChannel ; 54 55 import org.apache.commons.logging.LogFactory; 56 import org.mr.core.util.SystemTime; 57 58 67 public class HTTPTransportImpl extends TCPTransportImpl { 68 private boolean isClient; 69 private boolean readLF; 70 private boolean readHTTPHeader; 71 private static final int HTTP_HEADER_SIZE = 1000; 72 73 public HTTPTransportImpl(SocketChannel channel) { 74 super(channel); 75 this.isClient = false; 76 this.readLF = false; 77 this.readHTTPHeader = false; 78 this.log = LogFactory.getLog("HTTPTransportImpl"); 79 writeHTTPHeader(); 80 } 81 82 public HTTPTransportImpl(SocketAddress local, SocketAddress remote) 83 throws IOException 84 { 85 super (local, remote); 86 this.isClient = true; 87 this.readLF = false; 88 this.readHTTPHeader = false; 89 this.log = LogFactory.getLog("HTTPTransportImpl"); 90 } 91 92 public void onConnect() { 93 writeHTTPHeader(); 94 } 95 96 public void read() { 97 if (this.readHTTPHeader) { 98 super.read(); 99 } else { 100 ByteBuffer buf = ByteBuffer.allocate(1); 101 int n; 102 try { 103 while (true) { 104 n = this.channel.read(buf); 105 if (n == 0) { 106 return; 107 } else if (n == -1) { 108 if(log.isWarnEnabled()) { 109 log.warn("Channel " + toString() + 110 " EOF. Shutting down."); 111 } 112 shutdown(); 113 } else { 114 buf.flip(); 115 byte b = buf.get(); 116 if (b == '\n') { 117 if (this.readLF) { 118 this.readHTTPHeader = true; 119 return; 120 } else { 121 this.readLF = true; 122 } 123 } else { 124 this.readLF = false; 125 } 126 } 127 } 128 } catch (IOException e) { 129 if(log.isWarnEnabled()) 130 log.warn("Error reading from channel (remote = " + 131 channel.socket().getRemoteSocketAddress() 132 .toString() +"):" + e.getMessage()); 133 shutdown(); 134 } 135 } 136 } 137 138 private void writeHTTPHeader() { 139 ByteBuffer buf = ByteBuffer.allocate(HTTP_HEADER_SIZE); 140 StringBuffer sbuf = new StringBuffer (); 141 if (this.isClient) { 142 sbuf.append("POST /index.html?crap="); 144 sbuf.append(SystemTime.currentTimeMillis() / 1000); 145 sbuf.append(" HTTP/1.1\n"); 146 147 sbuf.append("Host: "); 149 InetSocketAddress remote = (InetSocketAddress ) 150 this.channel.socket().getRemoteSocketAddress(); 151 sbuf.append(remote.getHostName()).append(":") 152 .append(remote.getPort()).append("\n"); 153 154 sbuf.append("Connection: Keep-Alive\n"); 156 sbuf.append("Content-Type: application/octet-stream\n"); 157 sbuf.append("\n"); 158 } else { 159 sbuf.append("HTTP/1.1 200 OK\n"); 160 sbuf.append("Connection: Keep-Alive\n"); 161 sbuf.append("Cache-Control: no-cache, no-store, must-revalidate\n"); 162 sbuf.append("Expires: 0\n"); 163 sbuf.append("Content-Type: application/octet-stream\n"); 164 sbuf.append("\n"); 165 } 166 buf.put(sbuf.toString().getBytes()); 167 buf.flip(); 168 int written = 0; 169 try { 170 while (buf.remaining() > 0) { 171 this.channel.write(buf); 172 } 173 } catch (IOException e) { 174 if(log.isErrorEnabled()) 175 log.error("Error writing to " + toString() + ": " + 176 e.toString() + "."); 177 shutdown(); 178 } 179 } 180 181 public String toString() { 182 StringBuffer buf = new StringBuffer (); 183 try { 184 buf.append(this.channel.socket().getLocalSocketAddress(). 185 toString()); 186 buf.append(this.channel.socket().getRemoteSocketAddress(). 187 toString()); 188 } catch (Throwable t) { 189 buf.append("/unknown/unknown"); 190 } 191 buf.append("@HTTP"); 192 193 return buf.toString(); 194 } 195 196 public TransportType getType() { 197 return TransportType.HTTP; 198 }} | Popular Tags |