1 2 3 4 package net.nutch.ipc; 5 6 import java.net.Socket ; 7 import java.net.InetSocketAddress ; 8 import java.net.SocketTimeoutException ; 9 10 import java.io.IOException ; 11 import java.io.EOFException ; 12 import java.io.DataInputStream ; 13 import java.io.DataOutputStream ; 14 import java.io.BufferedInputStream ; 15 import java.io.BufferedOutputStream ; 16 17 import java.util.Hashtable ; 18 import java.util.logging.Logger ; 19 import java.util.logging.Level ; 20 21 import net.nutch.util.LogFormatter; 22 import net.nutch.io.Writable; 23 import net.nutch.io.UTF8; 24 25 32 public class Client { 33 public static final Logger LOG = 34 LogFormatter.getLogger("net.nutch.ipc.Client"); 35 36 private Hashtable connections = new Hashtable (); 37 38 private Class valueClass; private int timeout = 10000; private int counter; private boolean running = true; 43 44 private class Call { 45 int id; Writable param; Writable value; String error; 50 protected Call(Writable param) { 51 this.param = param; 52 synchronized (Client.this) { 53 this.id = counter++; 54 } 55 } 56 57 59 public synchronized void callComplete() { 60 notify(); } 62 } 63 64 67 private class Connection extends Thread { 68 private InetSocketAddress address; private Socket socket; private DataInputStream in; 71 private DataOutputStream out; 72 private Hashtable calls = new Hashtable (); 74 public Connection(InetSocketAddress address) throws IOException { 75 this.address = address; 76 this.socket = new Socket (address.getAddress(), address.getPort()); 77 socket.setSoTimeout(timeout); 78 this.in = new DataInputStream 79 (new BufferedInputStream (socket.getInputStream())); 80 this.out = new DataOutputStream 81 (new BufferedOutputStream (socket.getOutputStream())); 82 this.setDaemon(true); 83 this.setName("Client connection to " 84 + address.getAddress().getHostAddress() 85 + ":" + address.getPort()); 86 } 87 88 public void run() { 89 LOG.info(getName() + ": starting"); 90 try { 91 while (running) { 92 int id; 93 try { 94 id = in.readInt(); } catch (SocketTimeoutException e) { 96 continue; 97 } 98 99 if (LOG.isLoggable(Level.FINE)) 100 LOG.fine(getName() + " got value #" + id); 101 102 Call call = (Call)calls.remove(new Integer (id)); 103 boolean isError = in.readBoolean(); if (isError) { 105 UTF8 utf8 = new UTF8(); 106 utf8.readFields(in); call.error = utf8.toString(); 108 call.value = null; 109 } else { 110 Writable value = makeValue(); 111 value.readFields(in); call.value = value; 113 call.error = null; 114 } 115 call.callComplete(); } 117 } catch (EOFException eof) { 118 } catch (Exception e) { 120 LOG.log(Level.INFO, getName() + " caught: " + e, e); 121 } finally { 122 close(); 123 } 124 } 125 126 130 public void sendParam(Call call) throws IOException { 131 boolean error = true; 132 try { 133 calls.put(new Integer (call.id), call); 134 synchronized (out) { 135 if (LOG.isLoggable(Level.FINE)) 136 LOG.fine(getName() + " sending #" + call.id); 137 out.writeInt(call.id); 138 call.param.write(out); 139 out.flush(); 140 } 141 error = false; 142 } finally { 143 if (error) 144 close(); } 146 } 147 148 149 public void close() { 150 LOG.info(getName() + ": closing"); 151 connections.remove(address); try { 153 socket.close(); } catch (IOException e) {} 155 } 156 157 } 158 159 160 private class ParallelCall extends Call { 161 private ParallelResults results; 162 private int index; 163 164 public ParallelCall(Writable param, ParallelResults results, int index) { 165 super(param); 166 this.results = results; 167 this.index = index; 168 } 169 170 171 public void callComplete() { 172 results.callComplete(this); 173 } 174 } 175 176 177 private static class ParallelResults { 178 private Writable[] values; 179 private int size; 180 private int count; 181 182 public ParallelResults(int size) { 183 this.values = new Writable[size]; 184 this.size = size; 185 } 186 187 188 public synchronized void callComplete(ParallelCall call) { 189 values[call.index] = call.value; count++; if (count == size) notify(); } 194 } 195 196 198 public Client(Class valueClass) { 199 this.valueClass = valueClass; 200 } 201 202 204 public void stop() { 205 LOG.info("Stopping client"); 206 try { 207 Thread.sleep(timeout); } catch (InterruptedException e) {} 209 running = false; 210 } 211 212 213 public void setTimeout(int timeout) { this.timeout = timeout; } 214 215 218 public Writable call(Writable param, InetSocketAddress address) 219 throws IOException { 220 Connection connection = getConnection(address); 221 Call call = new Call(param); 222 synchronized (call) { 223 connection.sendParam(call); try { 225 call.wait(timeout); } catch (InterruptedException e) {} 227 228 if (call.error != null) { 229 throw new IOException (call.error); 230 } else if (call.value == null) { 231 throw new IOException ("timed out waiting for response"); 232 } else { 233 return call.value; 234 } 235 } 236 } 237 238 242 public Writable[] call(Writable[] params, InetSocketAddress [] addresses) 243 throws IOException { 244 if (params.length == 0) return new Writable[0]; 245 246 ParallelResults results = new ParallelResults(params.length); 247 synchronized (results) { 248 for (int i = 0; i < params.length; i++) { 249 ParallelCall call = new ParallelCall(params[i], results, i); 250 try { 251 Connection connection = getConnection(addresses[i]); 252 connection.sendParam(call); } catch (IOException e) { 254 LOG.info("Calling "+addresses[i]+" caught: " + e); results.size--; } 257 } 258 try { 259 results.wait(timeout); } catch (InterruptedException e) {} 261 262 if (results.count == 0) { 263 throw new IOException ("no responses"); 264 } else { 265 return results.values; 266 } 267 } 268 } 269 270 272 private Connection getConnection(InetSocketAddress address) 273 throws IOException { 274 Connection connection; 275 synchronized (connections) { 276 connection = (Connection)connections.get(address); 277 if (connection == null) { 278 connection = new Connection(address); 279 connections.put(address, connection); 280 connection.start(); 281 } 282 } 283 return connection; 284 } 285 286 private Writable makeValue() { 287 Writable value; try { 289 value = (Writable)valueClass.newInstance(); 290 } catch (InstantiationException e) { 291 throw new RuntimeException (e.toString()); 292 } catch (IllegalAccessException e) { 293 throw new RuntimeException (e.toString()); 294 } 295 return value; 296 } 297 298 } 299 | Popular Tags |