package net.nutch.searcher;

import java.net.InetSocketAddress;
import java.io.*;
import java.util.*;
import java.util.logging.Logger;

import net.nutch.parse.ParseData;
import net.nutch.parse.ParseText;
import net.nutch.util.LogFormatter;
import net.nutch.io.*;
import net.nutch.ipc.*;

/**
 * Search distributed over several servers, implemented on top of the
 * <code>net.nutch.ipc</code> client and server classes.
 *
 * <p>A {@link Server} wraps a {@link NutchBean} and answers requests; a
 * {@link Client} sends each request to one or more servers and combines the
 * answers.  Requests travel as {@link Param} objects and answers as
 * {@link Result} objects; both start with a one-byte op code that tells the
 * reader which concrete <code>Writable</code> types follow.</p>
 */
public class DistributedSearch {
  public static final Logger LOG =
    LogFormatter.getLogger("net.nutch.searcher.DistributedSearch");

  /** Not instantiable: this class only holds nested types and constants. */
  private DistributedSearch() {}

  // Op codes.  They are written on the wire as a single byte and are also
  // used as indexes into OP_NAMES, so they must stay dense and start at 0.
  private static final byte OP_SEGMENTS = (byte)0;
  private static final byte OP_SEARCH = (byte)1;
  private static final byte OP_EXPLAIN = (byte)2;
  private static final byte OP_DETAILS = (byte)3;
  private static final byte OP_SUMMARY = (byte)4;
  private static final byte OP_CONTENT = (byte)5;
  private static final byte OP_ANCHORS = (byte)6;
  private static final byte OP_PARSEDATA = (byte)7;
  private static final byte OP_PARSETEXT = (byte)8;
  private static final byte OP_FETCHDATE = (byte)9;

  /** Human-readable operation names, indexed by op code; used for logging. */
  private static final String[] OP_NAMES = new String[10];
  static {
    OP_NAMES[OP_SEGMENTS] = "getSegmentNames";
    OP_NAMES[OP_SEARCH] = "search";
    OP_NAMES[OP_EXPLAIN] = "getExplanation";
    OP_NAMES[OP_DETAILS] = "getDetails";
    OP_NAMES[OP_SUMMARY] = "getSummary";
    OP_NAMES[OP_CONTENT] = "getContent";
    OP_NAMES[OP_ANCHORS] = "getAnchors";
    OP_NAMES[OP_PARSEDATA] = "getParseData";
    OP_NAMES[OP_PARSETEXT] = "getParseText";
    OP_NAMES[OP_FETCHDATE] = "getFetchDate";
  }

  /**
   * A request: an op code followed by up to two arguments.  Operations that
   * need fewer than two arguments use <code>NullWritable.get()</code> for the
   * unused slots.
   */
  public static class Param implements Writable {
    private byte op;                              // the op being performed
    private Writable first;                       // first argument
    private Writable second;                      // second argument

    /** No-argument constructor; fields are filled in by {@link #readFields}. */
    public Param() {}

    /** Builds a one-argument request; the second slot is a NullWritable. */
    Param(byte op, Writable first) {
      this(op, first, NullWritable.get());
    }

    /** Builds a two-argument request. */
    Param(byte op, Writable first, Writable second) {
      this.op = op;
      this.first = first;
      this.second = second;
    }

    /** Writes the op code, then the first and second arguments in order. */
    public void write(DataOutput out) throws IOException {
      out.writeByte(op);
      first.write(out);
      second.write(out);
    }

    /**
     * Reads the op code, allocates argument objects of the types that op
     * expects, then reads both arguments.
     *
     * @throws RuntimeException if the op code is not one of the OP_* values
     */
    public void readFields(DataInput in) throws IOException {
      op = in.readByte();

      switch (op) {
      case OP_SEGMENTS:
        first = NullWritable.get();
        second = NullWritable.get();
        break;
      case OP_SEARCH:
        first = new Query();
        second = new IntWritable();
        break;
      case OP_EXPLAIN:
        first = new Query();
        second = new Hit();
        break;
      case OP_DETAILS:
        first = new Hit();
        second = NullWritable.get();
        break;
      case OP_SUMMARY:
        first = new HitDetails();
        second = new Query();
        break;
      case OP_CONTENT:
      case OP_ANCHORS:
      case OP_PARSEDATA:
      case OP_PARSETEXT:
      case OP_FETCHDATE:
        // all of these take a single HitDetails argument
        first = new HitDetails();
        second = NullWritable.get();
        break;
      default:
        throw new RuntimeException("Unknown op code: " + op);
      }

      first.readFields(in);
      second.readFields(in);
    }
  }

  /**
   * A response: the op code of the request it answers, followed by a value
   * whose concrete type depends on that op code.
   */
  public static class Result implements Writable {
    private byte op;
    private Writable value;

    /** No-argument constructor; fields are filled in by {@link #readFields}. */
    public Result() {}

    /** Builds a response carrying <code>value</code> for operation <code>op</code>. */
    Result(byte op, Writable value) {
      this.op = op;
      this.value = value;
    }

    /** Writes the op code followed by the value. */
    public void write(DataOutput out) throws IOException {
      out.writeByte(op);
      value.write(out);
    }

    /**
     * Reads the op code, allocates a value object of the type that op
     * returns, then reads the value.
     *
     * @throws RuntimeException if the op code is not one of the OP_* values
     */
    public void readFields(DataInput in) throws IOException {
      op = in.readByte();

      switch (op) {
      case OP_SEGMENTS:
        value = new ArrayWritable(UTF8.class);
        break;
      case OP_SEARCH:
        value = new Hits();
        break;
      case OP_EXPLAIN:
        value = new UTF8();
        break;
      case OP_DETAILS:
        value = new HitDetails();
        break;
      case OP_SUMMARY:
        value = new UTF8();
        break;
      case OP_CONTENT:
        value = new BytesWritable();
        break;
      case OP_ANCHORS:
        value = new ArrayWritable(UTF8.class);
        break;
      case OP_PARSEDATA:
        value = new ParseData();
        break;
      case OP_PARSETEXT:
        value = new ParseText();
        break;
      case OP_FETCHDATE:
        value = new LongWritable();
        break;
      default:
        throw new RuntimeException("Unknown op code: " + op);
      }

      value.readFields(in);
    }
  }

  /** The search server: answers {@link Param} requests using a NutchBean. */
  public static class Server extends net.nutch.ipc.Server {
    private NutchBean bean;

    /**
     * Constructs a server that answers requests on <code>port</code> from a
     * NutchBean opened on <code>directory</code>.
     */
    public Server(File directory, int port) throws IOException {
      // NOTE(review): the third argument (10) is presumably the IPC handler
      // count; confirm against net.nutch.ipc.Server.
      super(port, Param.class, 10);
      this.bean = new NutchBean(directory);
    }

    /**
     * Dispatches one request to the matching NutchBean method and wraps the
     * answer in a {@link Result} carrying the same op code.
     *
     * @throws RuntimeException if the op code is not one of the OP_* values
     */
    public Writable call(Writable param) throws IOException {
      Param p = (Param)param;
      logRequest(p);
      Writable value;
      switch (p.op) {
      case OP_SEGMENTS:
        value = new ArrayWritable(bean.getSegmentNames());
        break;
      case OP_SEARCH:
        value = bean.search((Query)p.first, ((IntWritable)p.second).get());
        break;
      case OP_EXPLAIN:
        value = new UTF8(bean.getExplanation((Query)p.first, (Hit)p.second));
        break;
      case OP_DETAILS:
        value = bean.getDetails((Hit)p.first);
        break;
      case OP_SUMMARY:
        value = new UTF8(bean.getSummary((HitDetails)p.first,(Query)p.second));
        break;
      case OP_CONTENT:
        value = new BytesWritable(bean.getContent((HitDetails)p.first));
        break;
      case OP_ANCHORS:
        value = new ArrayWritable(bean.getAnchors((HitDetails)p.first));
        break;
      case OP_PARSEDATA:
        value = bean.getParseData((HitDetails)p.first);
        break;
      case OP_PARSETEXT:
        value = bean.getParseText((HitDetails)p.first);
        break;
      case OP_FETCHDATE:
        value = new LongWritable(bean.getFetchDate((HitDetails)p.first));
        break;
      default:
        throw new RuntimeException("Unknown op code: " + p.op);
      }

      return new Result(p.op, value);
    }

    /**
     * Logs one request at INFO level as
     * <code>threadName: opName(first, second)</code>, leaving out arguments
     * that are the NullWritable placeholder.
     */
    private static void logRequest(Param p) {
      StringBuffer buffer = new StringBuffer();
      buffer.append(Thread.currentThread().getName());
      buffer.append(": ");
      buffer.append(OP_NAMES[p.op]);
      buffer.append("(");
      if (p.first != NullWritable.get()) {
        buffer.append(p.first);
        if (p.second != NullWritable.get()) {
          buffer.append(", ");
          buffer.append(p.second);
        }
      }
      buffer.append(")");
      LOG.info(buffer.toString());
    }

    /**
     * Runs a search server.
     *
     * <p>Usage: <code>DistributedSearch$Server &lt;port&gt; &lt;index dir&gt;</code></p>
     */
    public static void main(String[] args) throws Exception {
      String usage = "DistributedSearch$Server <port> <index dir>";

      // Both arguments are required; a lone <port> would otherwise fail
      // later with an ArrayIndexOutOfBoundsException on args[1].
      if (args.length != 2) {
        System.err.println(usage);
        System.exit(-1);
      }

      int port = Integer.parseInt(args[0]);
      File directory = new File(args[1]);

      Server server = new Server(directory, port);
      server.start();
      server.join();
    }

  }

  /** The search client: sends requests to a fixed set of search servers. */
  public static class Client extends net.nutch.ipc.Client
    implements Searcher, HitDetailer, HitSummarizer, HitContent {

    private InetSocketAddress[] addresses;
    /** Maps a segment name (String) to the InetSocketAddress serving it. */
    private HashMap segmentToAddress = new HashMap();

    /**
     * Constructs a client from a configuration file listing one server per
     * line as whitespace-separated <code>host port</code>.
     */
    public Client(File file) throws IOException {
      this(readConfig(file));
    }

    /**
     * Parses the server list.  Lines with fewer than two tokens are skipped;
     * tokens after the second are ignored.
     *
     * @throws NumberFormatException if a port token is not an integer
     */
    private static InetSocketAddress[] readConfig(File config)
      throws IOException {
      BufferedReader reader = new BufferedReader(new FileReader(config));
      try {
        ArrayList addrs = new ArrayList();
        String line;
        while ((line = reader.readLine()) != null) {
          StringTokenizer tokens = new StringTokenizer(line);
          if (tokens.hasMoreTokens()) {
            String host = tokens.nextToken();
            if (tokens.hasMoreTokens()) {
              String port = tokens.nextToken();
              addrs.add(new InetSocketAddress(host, Integer.parseInt(port)));
              LOG.info("Client adding server " + host + ":" + port);
            }
          }
        }
        return (InetSocketAddress[])
          addrs.toArray(new InetSocketAddress[addrs.size()]);
      } finally {
        reader.close();                           // release the file handle
      }
    }

    /**
     * Constructs a client for the given servers and asks each one for its
     * segment names, so that later per-document requests can be routed to the
     * server holding the document's segment.  Servers that return no result
     * are logged and skipped.
     */
    public Client(InetSocketAddress[] addresses) throws IOException {
      super(Result.class);

      this.addresses = addresses;

      // build segmentToAddress map
      Param param = new Param(OP_SEGMENTS, NullWritable.get());
      Writable[] params = new Writable[addresses.length];
      for (int i = 0; i < params.length; i++) {
        params[i] = param;                        // same request to every server
      }
      Writable[] results = call(params, addresses);

      for (int i = 0; i < results.length; i++) {
        Result result = (Result)results[i];
        if (result == null) {
          LOG.warning("Client: no segments from: " + addresses[i]);
          continue;
        }
        String[] segments = ((ArrayWritable)result.value).toStrings();
        for (int j = 0; j < segments.length; j++) {
          LOG.info("Client: segment "+segments[j]+" at "+addresses[i]);
          segmentToAddress.put(segments[j], addresses[i]);
        }
      }
    }

    /** Returns the names of all segments reported by the servers. */
    public String[] getSegmentNames() {
      return (String[])
        segmentToAddress.keySet().toArray(new String[segmentToAddress.size()]);
    }

    /**
     * Sends the query to every server and merges the answers into at most
     * <code>numHits</code> hits.  Each merged hit's index number is the
     * position of its server in the address array, which is how
     * {@link #getDetails(Hit)} and {@link #getExplanation} find the server
     * again.  Servers that return no result are skipped.
     */
    public Hits search(Query query, int numHits) throws IOException {
      long totalHits = 0;

      Param param = new Param(OP_SEARCH, query, new IntWritable(numHits));
      Writable[] params = new Writable[addresses.length];
      for (int i = 0; i < params.length; i++) {
        params[i] = param;                        // same request to every server
      }
      Writable[] results = call(params, addresses);

      // NOTE(review): relies on Hit's natural ordering placing the
      // lowest-scoring hit last in the set; confirm against Hit.compareTo.
      TreeSet queue = new TreeSet();              // cull top hits from results
      float minScore = 0.0f;
      for (int i = 0; i < results.length; i++) {
        Result result = (Result)results[i];
        if (result == null) continue;
        Hits hits = (Hits)result.value;
        totalHits += hits.getTotal();
        for (int j = 0; j < hits.getLength(); j++) {
          Hit h = hits.getHit(j);
          if (h.getScore() >= minScore) {
            queue.add(new Hit(i, h.getIndexDocNo(),h.getScore(),h.getSite()));
            if (queue.size() > numHits) {         // if hit queue overfull
              queue.remove(queue.last());         // drop the last hit
              // The queue is empty here only when numHits <= 0; without the
              // guard last() would throw NoSuchElementException.
              if (!queue.isEmpty()) {
                minScore = ((Hit)queue.last()).getScore(); // reset minScore
              }
            }
          }
        }
      }
      return new Hits(totalHits, (Hit[])queue.toArray(new Hit[queue.size()]));
    }

    /** Asks the server that produced <code>hit</code> to explain its score. */
    public String getExplanation(Query query, Hit hit) throws IOException {
      Param param = new Param(OP_EXPLAIN, query, hit);
      Result result = (Result)call(param, addresses[hit.getIndexNo()]);
      return result.value.toString();
    }

    /** Fetches the details of <code>hit</code> from the server that produced it. */
    public HitDetails getDetails(Hit hit) throws IOException {
      Param param = new Param(OP_DETAILS, hit);
      Result result = (Result)call(param, addresses[hit.getIndexNo()]);
      return (HitDetails)result.value;
    }

    /**
     * Fetches the details of several hits with one batched call.
     *
     * @return one entry per hit, in the same order; an entry is
     *         <code>null</code> when no result came back for that hit, which
     *         matches {@link #getSummary(HitDetails[], Query)}
     */
    public HitDetails[] getDetails(Hit[] hits) throws IOException {
      Writable[] params = new Writable[hits.length];
      InetSocketAddress[] addrs = new InetSocketAddress[hits.length];
      for (int i = 0; i < hits.length; i++) {
        params[i] = new Param(OP_DETAILS, hits[i]);
        addrs[i] = addresses[hits[i].getIndexNo()];
      }
      Writable[] writables = call(params, addrs);
      HitDetails[] results = new HitDetails[writables.length];
      for (int i = 0; i < results.length; i++) {
        // The batched call can yield null entries (the constructor and
        // search() both check for them), so do not dereference blindly.
        if (writables[i] != null)
          results[i] = (HitDetails)((Result)writables[i]).value;
      }
      return results;
    }

    /** Fetches a summary of <code>hit</code> from the server holding its segment. */
    public String getSummary(HitDetails hit, Query query) throws IOException {
      Param param = new Param(OP_SUMMARY, hit, query);
      InetSocketAddress address =
        (InetSocketAddress)segmentToAddress.get(hit.getValue("segment"));
      Result result = (Result)call(param, address);
      return result.value.toString();
    }

    /**
     * Fetches summaries of several hits with one batched call.
     *
     * @return one entry per hit, in the same order; an entry is
     *         <code>null</code> when no result came back for that hit
     */
    public String[] getSummary(HitDetails[] hits, Query query)
      throws IOException {
      Writable[] params = new Writable[hits.length];
      InetSocketAddress[] addrs = new InetSocketAddress[hits.length];
      for (int i = 0; i < hits.length; i++) {
        HitDetails hit = hits[i];
        params[i] = new Param(OP_SUMMARY, hit, query);
        addrs[i] =
          (InetSocketAddress)segmentToAddress.get(hit.getValue("segment"));
      }
      Writable[] results = call(params, addrs);
      String[] strings = new String[results.length];
      for (int i = 0; i < results.length; i++) {
        if (results[i] != null)
          strings[i] = ((Result)results[i]).value.toString();
      }
      return strings;
    }

    /** Fetches the content bytes of <code>hit</code> from the server holding its segment. */
    public byte[] getContent(HitDetails hit) throws IOException {
      Param param = new Param(OP_CONTENT, hit);
      InetSocketAddress address =
        (InetSocketAddress)segmentToAddress.get(hit.getValue("segment"));
      Result result = (Result)call(param, address);
      return ((BytesWritable)result.value).get();
    }

    /** Fetches the parse data of <code>hit</code> from the server holding its segment. */
    public ParseData getParseData(HitDetails hit) throws IOException {
      Param param = new Param(OP_PARSEDATA, hit);
      InetSocketAddress address =
        (InetSocketAddress)segmentToAddress.get(hit.getValue("segment"));
      Result result = (Result)call(param, address);
      return (ParseData)result.value;
    }

    /** Fetches the parse text of <code>hit</code> from the server holding its segment. */
    public ParseText getParseText(HitDetails hit) throws IOException {
      Param param = new Param(OP_PARSETEXT, hit);
      InetSocketAddress address =
        (InetSocketAddress)segmentToAddress.get(hit.getValue("segment"));
      Result result = (Result)call(param, address);
      return (ParseText)result.value;
    }

    /** Fetches the anchors of <code>hit</code> from the server holding its segment. */
    public String[] getAnchors(HitDetails hit) throws IOException {
      Param param = new Param(OP_ANCHORS, hit);
      InetSocketAddress address =
        (InetSocketAddress)segmentToAddress.get(hit.getValue("segment"));
      Result result = (Result)call(param, address);
      return ((ArrayWritable)result.value).toStrings();
    }

    /** Fetches the fetch date of <code>hit</code> from the server holding its segment. */
    public long getFetchDate(HitDetails hit) throws IOException {
      Param param = new Param(OP_FETCHDATE, hit);
      InetSocketAddress address =
        (InetSocketAddress)segmentToAddress.get(hit.getValue("segment"));
      Result result = (Result)call(param, address);
      return ((LongWritable)result.value).get();
    }

    /**
     * Runs one query against the given servers and prints the top ten hits.
     *
     * <p>Usage: <code>DistributedSearch$Client query &lt;host&gt; &lt;port&gt; ...</code></p>
     */
    public static void main(String[] args) throws Exception {
      String usage = "DistributedSearch$Client query <host> <port> ...";

      // An even argument count means some <host> has no <port>; the integer
      // division below would silently drop that server, so reject it.
      if (args.length == 0 || args.length % 2 == 0) {
        System.err.println(usage);
        System.exit(-1);
      }

      Query query = Query.parse(args[0]);

      InetSocketAddress[] addresses = new InetSocketAddress[(args.length-1)/2];
      for (int i = 0; i < (args.length-1)/2; i++) {
        addresses[i] =
          new InetSocketAddress(args[i*2+1], Integer.parseInt(args[i*2+2]));
      }

      Client client = new Client(addresses);

      Hits hits = client.search(query, 10);
      System.out.println("Total hits: " + hits.getTotal());
      for (int i = 0; i < hits.getLength(); i++) {
        System.out.println(" "+i+" "+ client.getDetails(hits.getHit(i)));
      }

    }

  }

}