1 2 23 24 package net.fenyo.gnetwatch.actions; 25 26 import net.fenyo.gnetwatch.*; 27 import net.fenyo.gnetwatch.actions.Action.InterruptCause; 28 import net.fenyo.gnetwatch.activities.Background; 29 import net.fenyo.gnetwatch.data.*; 30 import net.fenyo.gnetwatch.targets.*; 31 32 import java.io.*; 33 import java.net.*; 34 import javax.net.ssl.*; 35 import java.util.Arrays ; 36 import java.util.Date ; 37 38 import org.apache.commons.logging.Log; 39 import org.apache.commons.logging.LogFactory; 40 41 47 48 public class ActionHTTP extends Action { 49 private static Log log = LogFactory.getLog(ActionHTTP.class); 50 51 private boolean interrupted = false; 52 53 private String error_string = ""; 54 55 60 public ActionHTTP(final Target target, final Background background) { 63 super(target, background); 64 setItem("http"); 65 } 66 67 71 public ActionHTTP() { 74 setItem("http"); 75 } 76 77 82 public String getQueueName() { 84 return "http"; 85 } 86 87 92 public long getMaxDelay() { 95 return 30000000; 96 } 97 98 104 public void interrupt(final InterruptCause reason) { 107 interrupted = true; 108 } 109 110 122 private int connect(final int idx, final IPQuerier querier, final URLConnection [] connections, 123 final InputStream [] streams, final int [] sizes, final URL url, final Proxy proxy) throws IOException { 124 error_string = ""; 125 try { 126 connections[idx] = querier.getUseProxy() ? url.openConnection(proxy) : url.openConnection(); 127 connections[idx].setUseCaches(false); 128 connections[idx].connect(); 129 streams[idx] = connections[idx].getInputStream(); 130 sizes[idx] = connections[idx].getContentLength(); 131 132 } catch (final IOException ex) { 133 134 streams[idx] = null; 135 sizes[idx] = 0; 136 137 int response_code = 0; 138 try { 139 response_code = ((HttpURLConnection) connections[idx]).getResponseCode(); 140 } catch (final ConnectException ex2) { 141 getGUI().appendConsole(ex2.toString() + "<BR/>"); 142 try { 143 Thread.sleep(1000); 144 } catch (final InterruptedException ex3) {} 145 146 throw ex2; 147 } 148 149 error_string = "(http error " + response_code + ")"; 150 final InputStream error_stream = ((HttpURLConnection) connections[idx]).getErrorStream(); 151 if (error_stream == null) return 0; 152 int nread, nread_tot = 0; 153 String error_str = ""; 154 final byte [] error_buf = new byte [65536]; 155 while ((nread = error_stream.read(error_buf)) > 0) { 156 error_str += new String (error_buf); 158 nread_tot += nread; 159 } 160 error_stream.close(); 161 return nread_tot; 162 } 163 return 0; 164 } 165 166 174 public void invoke() throws IOException, InterruptedException { 177 if (isDisposed() == true) return; 178 179 try { 180 super.invoke(); 181 182 final IPQuerier querier; 183 if (TargetIPv4.class.isInstance(getTarget())) { 184 querier = ((TargetIPv4) getTarget()).getIPQuerier(); 185 } else if (TargetIPv6.class.isInstance(getTarget())) { 186 querier = ((TargetIPv6) getTarget()).getIPQuerier(); 187 } else return; 188 189 final URL url = new URL(querier.getURL()); 190 final Proxy proxy = querier.getUseProxy() ? new Proxy(Proxy.Type.HTTP, new InetSocketAddress(querier.getProxyHost(), querier.getProxyPort())) : null; 191 192 URLConnection [] connections = new URLConnection[querier.getNParallel()]; 193 InputStream [] streams = new InputStream[querier.getNParallel()]; 194 int [] sizes = new int [querier.getNParallel()]; 195 196 for (int idx = 0; idx < querier.getNParallel(); idx++) 197 connect(idx, querier, connections, streams, sizes, url, proxy); 198 199 final byte [] buf = new byte [65536]; 200 long last_time = System.currentTimeMillis(); 201 int bytes_received = 0; 202 int pages_received = 0; 203 204 while (true) { 205 int available_for_every_connections = 0; 206 207 for (int idx = 0; idx < querier.getNParallel(); idx++) { 208 final int available = (streams[idx] != null) ? streams[idx].available() : 0; 209 available_for_every_connections += available; 210 211 if (available == 0) { 212 if (sizes[idx] == 0) { 213 if (streams[idx] != null) streams[idx].close(); 214 bytes_received += connect(idx, querier, connections, streams, sizes, url, proxy); 215 pages_received++; 216 } 217 } else { 218 final int nread = streams[idx].read(buf); 219 switch (nread) { 220 case -1: 221 streams[idx].close(); 222 connect(idx, querier, connections, streams, sizes, url, proxy); 223 pages_received++; 224 break; 225 226 case 0: 227 log.error("0 byte read"); 228 for (InputStream foo : streams) if (foo != null) foo.close(); 229 return; 230 231 default: 232 bytes_received += nread; 234 sizes[idx] -= nread; 235 } 236 } 237 238 if (System.currentTimeMillis() - last_time > 1000) { 239 getTarget().addEvent(new EventHTTP(bytes_received)); 240 getTarget().addEvent(new EventHTTPPages(pages_received)); 241 242 setDescription("" + new Double (((double) 8 * 1000 * bytes_received) / (System.currentTimeMillis() - last_time)).intValue() + " bit/s (" + new Double (((double) 1000 * pages_received) / (System.currentTimeMillis() - last_time)).intValue() + " pages/sec)"); 243 getGUI().setStatus(getGUI().getConfig().getPattern("bytes_http", bytes_received, querier.getAddress().toString().substring(1)) + " " + error_string); 244 245 last_time = System.currentTimeMillis(); 246 bytes_received = 0; 247 pages_received = 0; 248 } 249 250 if (interrupted == true) { 251 for (InputStream foo : streams) if (foo != null) foo.close(); 252 return; 253 } 254 } 255 if (available_for_every_connections == 0) Thread.sleep(10); 256 } 257 } catch (final InterruptedException ex) { 258 log.error("Exception", ex); 259 } 260 } 261 262 267 protected void disposed() { 268 super.disposed(); 270 271 interrupt(InterruptCause.removed); 273 } 274 } 275 | Popular Tags |