package org.archive.crawler.frontier;

import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;

import org.archive.crawler.datamodel.CandidateURI;
import org.archive.crawler.datamodel.CrawlSubstats;
import org.archive.crawler.datamodel.CrawlURI;
import org.archive.crawler.framework.Frontier.FrontierGroup;
import org.archive.util.ArchiveUtils;

import com.sleepycat.bind.EntryBinding;
import com.sleepycat.bind.serial.ClassCatalog;
import com.sleepycat.bind.serial.SerialBinding;
import com.sleepycat.bind.serial.StoredClassCatalog;
import com.sleepycat.bind.serial.TupleSerialKeyCreator;
import com.sleepycat.bind.tuple.StringBinding;
import com.sleepycat.bind.tuple.TupleBinding;
import com.sleepycat.bind.tuple.TupleInput;
import com.sleepycat.bind.tuple.TupleOutput;
import com.sleepycat.je.Cursor;
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseConfig;
import com.sleepycat.je.DatabaseEntry;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.Environment;
import com.sleepycat.je.LockMode;
import com.sleepycat.je.OperationStatus;
import com.sleepycat.je.SecondaryConfig;
import com.sleepycat.je.SecondaryDatabase;

/**
 * A queue of {@link CrawlURI}s belonging to one host, backed by three
 * Berkeley DB JE databases:
 * <ul>
 *   <li>a primary database (named after the host) holding the URIs that are
 *       waiting to be issued, keyed by the URI string;</li>
 *   <li>a secondary database (<code>hostName + "/timeOfProcessing"</code>)
 *       indexing the primary one by scheduling directive and time of next
 *       processing, used to find the next URI to issue;</li>
 *   <li>a processing database (<code>hostName + "/processing"</code>) holding
 *       the URIs that have been issued via {@link #next()} and not yet
 *       returned via {@link #update(CrawlURI, boolean, long, boolean)}.</li>
 * </ul>
 *
 * <p>Up to <code>valence</code> URIs may be in processing simultaneously.
 * Each such "slot" has a wake-up time in {@link #wakeUpTime}; the value -1
 * marks a slot as currently claimed by a URI being processed.
 *
 * <p>NOTE(review): nothing in this class synchronizes access to its state;
 * callers are presumably expected to serialize access - confirm.
 */
public class AdaptiveRevisitHostQueue
implements AdaptiveRevisitAttributeConstants, FrontierGroup {

    /** State: the queue holds no URIs (the state a new, empty queue starts in). */
    public static final int HQSTATE_EMPTY = 0;
    /** State: a URI can be issued right now. */
    public static final int HQSTATE_READY = 1;
    /** State: as many URIs are being processed as the valence allows. */
    public static final int HQSTATE_BUSY = 2;
    /** State: not busy, but the next URI or the earliest slot is not yet due. */
    public static final int HQSTATE_SNOOZED = 3;

    /** Name of the host; also used as the base name of the databases. */
    final String hostName;
    /** Last computed state, one of the <code>HQSTATE_*</code> constants. */
    int state;
    /** Time of next processing of the URI believed to be next in line. */
    long nextReadyTime;
    /**
     * One wake-up time per valence slot. A value of -1 means the slot has
     * been claimed by {@link #next()} and not yet released by an update.
     */
    long[] wakeUpTime;
    /** Maximum number of URIs that may be in processing at once (at least 1). */
    int valence;
    /** Number of URIs held, counting both waiting and in-processing URIs. */
    long size;
    /** Number of URIs issued by {@link #next()} and not yet updated. */
    long inProcessing;
    /** List to notify (via <code>reorder</code>) when ordering data changes; may be null. */
    private AdaptiveRevisitQueueList owner;

    private static final Logger logger =
        Logger.getLogger(AdaptiveRevisitHostQueue.class.getName());

    protected CrawlSubstats substats = new CrawlSubstats();

    /** Database of URIs waiting to be issued, keyed by URI string. */
    protected Database primaryUriDB;
    /** Index over {@link #primaryUriDB}; see {@link OrderOfProcessingKeyCreator}. */
    protected SecondaryDatabase secondaryUriDB;
    /** Database of URIs currently being processed, keyed by URI string. */
    protected Database processingUriDB;
    /** Class catalog used by the serial binding of CrawlURIs. */
    protected StoredClassCatalog classCatalog;
    /** Binding used for the (String) keys of the primary database. */
    protected EntryBinding primaryKeyBinding;
    /** Binding used to (de)serialize CrawlURIs. */
    protected EntryBinding crawlURIBinding;

    /**
     * Opens (creating if necessary) the databases of the named host queue.
     * If the databases already contain URIs, the URIs found in the processing
     * database are moved back into the waiting queue and the state is set to
     * {@link #HQSTATE_READY}.
     *
     * @param hostName name of the host; used as the database name
     * @param env the Berkeley DB environment the databases are opened in
     * @param catalog class catalog for serializing CrawlURIs
     * @param valence number of URIs that may be processed simultaneously;
     *                values below 1 are treated as 1
     * @throws IOException if a database operation fails
     */
    public AdaptiveRevisitHostQueue(String hostName, Environment env,
            StoredClassCatalog catalog, int valence)
            throws IOException {
        this.valence = valence < 1 ? 1 : valence;
        // Size the slot array from the clamped field, not the raw parameter:
        // every other method iterates up to this.valence. Java zero-fills
        // the array, so all slots start out immediately usable.
        wakeUpTime = new long[this.valence];
        inProcessing = 0;
        this.hostName = hostName;
        state = HQSTATE_EMPTY;
        nextReadyTime = Long.MAX_VALUE;
        this.classCatalog = catalog;

        try {
            DatabaseConfig dbConfig = new DatabaseConfig();
            dbConfig.setTransactional(false);
            dbConfig.setAllowCreate(true);
            primaryUriDB = env.openDatabase(null, hostName, dbConfig);

            DatabaseConfig dbConfig2 = new DatabaseConfig();
            dbConfig2.setTransactional(false);
            dbConfig2.setAllowCreate(true);
            processingUriDB = env.openDatabase(null,
                    hostName + "/processing", dbConfig2);

            primaryKeyBinding = TupleBinding.getPrimitiveBinding(String.class);
            crawlURIBinding = new SerialBinding(classCatalog, CrawlURI.class);

            SecondaryConfig secConfig = new SecondaryConfig();
            secConfig.setAllowCreate(true);
            secConfig.setSortedDuplicates(true);
            secConfig.setKeyCreator(
                    new OrderOfProcessingKeyCreator(classCatalog, CrawlURI.class));
            secondaryUriDB = env.openSecondaryDatabase(null,
                    hostName + "/timeOfProcessing", primaryUriDB, secConfig);

            size = countCrawlURIs();
            if (size > 0) {
                // size also counts URIs in the processing database, so the
                // waiting queue itself may be empty here; guard against null.
                CrawlURI top = peek();
                if (top != null) {
                    nextReadyTime = top.getLong(A_TIME_OF_NEXT_PROCESSING);
                }
                // Return URIs left in processing to the waiting queue.
                flushProcessingURIs();
                state = HQSTATE_READY;
            }
        } catch (DatabaseException e) {
            throw toIOException(e);
        }
    }

    /**
     * @return the name of the host this queue was created for
     */
    public String getHostName() {
        return hostName;
    }

    /**
     * Adds a CrawlURI to the waiting queue.
     *
     * <p>If the URI is currently being processed the call is ignored. If the
     * URI is already queued, the queued copy is updated when the new one has
     * a numerically lower scheduling directive, and/or an earlier time of
     * next processing (the latter only if <code>overrideSetTimeOnDups</code>
     * is true or the directive was just changed). Otherwise the call leaves
     * the queue untouched.
     *
     * @param curi the CrawlURI to add
     * @param overrideSetTimeOnDups whether an earlier time of next processing
     *        on <code>curi</code> replaces the one on an already queued copy
     * @throws IOException if a database operation fails
     */
    public void add(CrawlURI curi, boolean overrideSetTimeOnDups)
            throws IOException {
        if (logger.isLoggable(Level.FINER)) {
            logger.finer("Adding " + curi.toString());
        }
        try {
            if (inProcessing(curi.toString())) {
                // Being processed right now; it comes back via update().
                return;
            }

            OperationStatus opStatus = strictAdd(curi, false);

            long curiProcessingTime = curi.getLong(A_TIME_OF_NEXT_PROCESSING);

            if (opStatus == OperationStatus.KEYEXIST) {
                boolean update = false;
                CrawlURI curiExisting = getCrawlURI(curi.toString());
                long oldCuriProcessingTime = curiExisting.getLong(
                        A_TIME_OF_NEXT_PROCESSING);
                if (curi.getSchedulingDirective() <
                        curiExisting.getSchedulingDirective()) {
                    curiExisting.setSchedulingDirective(
                            curi.getSchedulingDirective());
                    update = true;
                }
                if ((curiProcessingTime < oldCuriProcessingTime)
                        && (overrideSetTimeOnDups || update)) {
                    curiExisting.putLong(
                            A_TIME_OF_NEXT_PROCESSING,
                            curiProcessingTime);
                    update = true;
                }
                if (update) {
                    // Overwrite the queued copy with the modified one.
                    opStatus = strictAdd(curiExisting, true);
                } else {
                    return;
                }
            } else if (opStatus == OperationStatus.SUCCESS) {
                size++;
            }

            if (opStatus == OperationStatus.SUCCESS) {
                if (curiProcessingTime < nextReadyTime) {
                    setNextReadyTime(curiProcessingTime);
                }
                if (state == HQSTATE_EMPTY) {
                    state = HQSTATE_READY;
                }
            } else {
                throw new DatabaseException("Error on add into database for " +
                        "CrawlURI " + curi.toString() + ". " +
                        opStatus.toString());
            }
        } catch (DatabaseException e) {
            throw toIOException(e);
        }
        reorder();
    }

    /**
     * Writes a CrawlURI into the primary (waiting) database, keyed by its
     * string form. Does not touch {@link #size} or any other bookkeeping.
     *
     * @param curi the CrawlURI to store
     * @param overrideDuplicates if true an existing entry with the same key
     *        is overwritten; if false the write is refused for existing keys
     * @return the status reported by the database
     * @throws DatabaseException if the database operation fails
     */
    protected OperationStatus strictAdd(CrawlURI curi,
            boolean overrideDuplicates)
            throws DatabaseException {
        DatabaseEntry keyEntry = new DatabaseEntry();
        DatabaseEntry dataEntry = new DatabaseEntry();
        primaryKeyBinding.objectToEntry(curi.toString(), keyEntry);
        crawlURIBinding.objectToEntry(curi, dataEntry);
        if (overrideDuplicates) {
            return primaryUriDB.put(null, keyEntry, dataEntry);
        }
        return primaryUriDB.putNoOverwrite(null, keyEntry, dataEntry);
    }

    /**
     * Moves every URI found in the processing database back into the waiting
     * queue, lowering {@link #nextReadyTime} where a moved URI is due earlier.
     *
     * @throws DatabaseException if a database operation fails
     */
    protected void flushProcessingURIs() throws DatabaseException {
        Cursor processingCursor = processingUriDB.openCursor(null, null);
        try {
            DatabaseEntry keyEntry = new DatabaseEntry();
            DatabaseEntry dataEntry = new DatabaseEntry();

            // Each pass deletes the entry it read, so re-reading the first
            // entry walks the whole database until it is empty.
            while (processingCursor.getFirst(keyEntry, dataEntry,
                    LockMode.DEFAULT) == OperationStatus.SUCCESS) {
                CrawlURI curi =
                    (CrawlURI) crawlURIBinding.entryToObject(dataEntry);
                deleteInProcessing(curi.toString());
                strictAdd(curi, false);
                long curiNextReadyTime = curi.getLong(
                        A_TIME_OF_NEXT_PROCESSING);
                if (curiNextReadyTime < nextReadyTime) {
                    setNextReadyTime(curiNextReadyTime);
                }
            }
        } finally {
            // Release the cursor even if a step above throws.
            processingCursor.close();
        }
    }

    /**
     * Counts the URIs held by this queue by walking both the waiting and the
     * processing databases.
     *
     * @return number of entries in the primary plus the processing database
     * @throws DatabaseException if a database operation fails
     */
    protected long countCrawlURIs() throws DatabaseException {
        return countEntries(primaryUriDB) + countEntries(processingUriDB);
    }

    /** Counts the entries of one database by iterating a cursor over it. */
    private long countEntries(Database db) throws DatabaseException {
        long count = 0;
        DatabaseEntry keyEntry = new DatabaseEntry();
        DatabaseEntry dataEntry = new DatabaseEntry();
        Cursor cursor = db.openCursor(null, null);
        try {
            OperationStatus opStatus =
                cursor.getFirst(keyEntry, dataEntry, LockMode.DEFAULT);
            while (opStatus == OperationStatus.SUCCESS) {
                count++;
                opStatus = cursor.getNext(keyEntry, dataEntry,
                        LockMode.DEFAULT);
            }
        } finally {
            cursor.close();
        }
        return count;
    }

    /**
     * Checks whether a URI is in the database of URIs being processed.
     *
     * @param uri the URI string to look up
     * @return true if the processing database holds an entry for the URI
     * @throws DatabaseException if the database operation fails
     */
    protected boolean inProcessing(String uri) throws DatabaseException {
        DatabaseEntry keyEntry = new DatabaseEntry();
        DatabaseEntry dataEntry = new DatabaseEntry();

        StringBinding.stringToEntry(uri, keyEntry);

        OperationStatus opStatus = processingUriDB.get(null,
                keyEntry,
                dataEntry,
                LockMode.DEFAULT);

        return opStatus == OperationStatus.SUCCESS;
    }

    /**
     * Removes a URI from the database of URIs being processed.
     *
     * @param uri the URI string to remove
     * @throws IllegalStateException if the URI is not in that database
     * @throws DatabaseException if the delete fails for any other reason
     */
    protected void deleteInProcessing(String uri) throws DatabaseException {
        DatabaseEntry keyEntry = new DatabaseEntry();

        StringBinding.stringToEntry(uri, keyEntry);

        OperationStatus opStatus = processingUriDB.delete(null, keyEntry);

        if (opStatus != OperationStatus.SUCCESS) {
            if (opStatus == OperationStatus.NOTFOUND) {
                throw new IllegalStateException("Trying to deleta a "
                        + "non-existant URI from the list of URIs being "
                        + "processed. HQ: " + hostName + ", CrawlURI: " + uri);
            }
            throw new DatabaseException("Error occured deleting URI: " + uri
                    + " from HQ " + hostName + " list "
                    + "of URIs currently being processed. "
                    + opStatus.toString());
        }
    }

    /**
     * Records a CrawlURI in the database of URIs being processed.
     *
     * @param curi the CrawlURI to record
     * @throws IllegalStateException if the URI is already in that database
     * @throws DatabaseException if the insert fails for any other reason
     */
    protected void addInProcessing(CrawlURI curi) throws DatabaseException,
            IllegalStateException {
        DatabaseEntry keyEntry = new DatabaseEntry();
        DatabaseEntry dataEntry = new DatabaseEntry();

        StringBinding.stringToEntry(curi.toString(), keyEntry);
        crawlURIBinding.objectToEntry(curi, dataEntry);

        OperationStatus opStatus = processingUriDB.putNoOverwrite(null,
                keyEntry, dataEntry);

        if (opStatus != OperationStatus.SUCCESS) {
            if (opStatus == OperationStatus.KEYEXIST) {
                throw new IllegalStateException("Can not insert duplicate "
                        + "URI into list of URIs being processed. " + "HQ: "
                        + hostName + ", CrawlURI: " + curi.toString());
            }
            throw new DatabaseException("Error occured adding CrawlURI: "
                    + curi.toString() + " to HQ " + hostName + " list "
                    + "of URIs currently being processed. "
                    + opStatus.toString());
        }
    }

    /**
     * Looks up a CrawlURI in the waiting (primary) database. URIs that are
     * currently being processed live in a different database and are not
     * found by this method.
     *
     * @param uri the URI string to look up
     * @return the stored CrawlURI, or null if the lookup did not succeed
     * @throws DatabaseException if the database operation fails
     */
    protected CrawlURI getCrawlURI(String uri) throws DatabaseException {
        DatabaseEntry keyEntry = new DatabaseEntry();
        DatabaseEntry dataEntry = new DatabaseEntry();

        primaryKeyBinding.objectToEntry(uri, keyEntry);
        OperationStatus opStatus =
            primaryUriDB.get(null, keyEntry, dataEntry, LockMode.DEFAULT);
        if (opStatus != OperationStatus.SUCCESS) {
            // Nothing was read into dataEntry; do not try to deserialize it.
            return null;
        }

        return (CrawlURI) crawlURIBinding.entryToObject(dataEntry);
    }

    /**
     * Returns a processed CrawlURI to the waiting queue. Equivalent to
     * {@link #update(CrawlURI, boolean, long, boolean)} with
     * <code>forgetURI</code> set to false.
     *
     * @param curi the CrawlURI that has finished processing
     * @param needWait whether <code>wakeupTime</code> should be honoured
     * @param wakeupTime time before which the released slot may not be reused
     * @throws IllegalStateException if the URI is in the waiting queue or is
     *         not in the list of URIs being processed
     * @throws IOException if a database operation fails
     */
    public void update(CrawlURI curi,
            boolean needWait,
            long wakeupTime)
            throws IllegalStateException, IOException {
        update(curi, needWait, wakeupTime, false);
    }

    /**
     * Signals that processing of a CrawlURI has finished. The URI is removed
     * from the processing database, and a claimed wake-up slot is released
     * with the given wake-up time.
     *
     * @param curi the CrawlURI that has finished processing
     * @param needWait if false, <code>wakeupTime</code> is ignored and the
     *        slot becomes usable immediately
     * @param wakeupTime time before which the released slot may not be reused
     * @param forgetURI if true the URI is dropped from this queue (and
     *        {@link #size} decremented) instead of being re-queued
     * @throws IllegalStateException if the URI is in the waiting queue or is
     *         not in the list of URIs being processed
     * @throws IOException if a database operation fails
     */
    public void update(CrawlURI curi,
            boolean needWait,
            long wakeupTime,
            boolean forgetURI)
            throws IllegalStateException, IOException {
        if (logger.isLoggable(Level.FINE)) {
            logger.fine("Updating " + curi.toString());
        }
        try {
            if (forgetURI == false) {
                // Re-queue the URI; it must not already be waiting.
                OperationStatus opStatus = strictAdd(curi, false);
                if (opStatus != OperationStatus.SUCCESS) {
                    if (opStatus == OperationStatus.KEYEXIST) {
                        throw new IllegalStateException("Trying to update a" +
                                " CrawlURI failed because it was in the queue" +
                                " of URIs waiting for processing. URIs currently" +
                                " being processsed can never be in that queue." +
                                " HQ: " + hostName + ", CrawlURI: " +
                                curi.toString());
                    }
                }

                long curiTimeOfNextProcessing = curi.getLong(
                        A_TIME_OF_NEXT_PROCESSING);
                if (nextReadyTime > curiTimeOfNextProcessing) {
                    setNextReadyTime(curiTimeOfNextProcessing);
                }

            } else {
                size--;
            }

            deleteInProcessing(curi.toString());

            inProcessing--;

            if (needWait == false) {
                // No politeness wait requested: slot is free right away.
                wakeupTime = 0;
            }

            updateWakeUpTimeSlot(wakeupTime);
        } catch (DatabaseException e) {
            throw toIOException(e);
        }
    }

    /**
     * Issues the next CrawlURI for processing: claims a wake-up slot, moves
     * the first URI of the waiting queue into the processing database and
     * refreshes {@link #nextReadyTime} from the new first URI (or
     * <code>Long.MAX_VALUE</code> if none is left).
     *
     * @return the CrawlURI to process
     * @throws IllegalStateException if the queue is not in state
     *         {@link #HQSTATE_READY} or no wake-up slot is currently usable
     * @throws IOException if a database operation fails
     */
    public CrawlURI next() throws IllegalStateException, IOException {
        try {
            if (getState() != HQSTATE_READY || useWakeUpTimeSlot() == false) {
                throw new IllegalStateException("Can not issue next URI when " +
                        "HQ " + hostName + " state is " + getStateByName());
            }

            DatabaseEntry keyEntry = new DatabaseEntry();

            CrawlURI curi = peek();

            addInProcessing(curi);

            primaryKeyBinding.objectToEntry(curi.toString(), keyEntry);
            OperationStatus opStatus = primaryUriDB.delete(null, keyEntry);

            if (opStatus != OperationStatus.SUCCESS) {
                throw new DatabaseException("Error occured removing URI: " +
                        curi.toString() + " from HQ " + hostName +
                        " priority queue for processing. " + opStatus.toString());
            }

            CrawlURI top = peek();
            long nextReady = Long.MAX_VALUE;
            if (top != null) {
                nextReady = top.getLong(A_TIME_OF_NEXT_PROCESSING);
            }
            inProcessing++;
            setNextReadyTime(nextReady);
            if (logger.isLoggable(Level.FINE)) {
                logger.fine("Issuing " + curi.toString());
            }
            return curi;
        } catch (DatabaseException e) {
            throw toIOException(e);
        }
    }

    /**
     * Returns the first CrawlURI according to the secondary index without
     * removing it from the waiting queue.
     *
     * @return the first waiting CrawlURI, or null if none is waiting
     * @throws IllegalStateException declared for callers; not thrown directly
     *         by this method
     * @throws IOException if a database operation fails or reports an
     *         unexpected status
     */
    public CrawlURI peek() throws IllegalStateException, IOException {
        try {
            DatabaseEntry keyEntry = new DatabaseEntry();
            DatabaseEntry dataEntry = new DatabaseEntry();

            Cursor secondaryCursor = secondaryUriDB.openCursor(null, null);
            try {
                OperationStatus opStatus = secondaryCursor.getFirst(
                        keyEntry, dataEntry, LockMode.DEFAULT);

                if (opStatus == OperationStatus.SUCCESS) {
                    return (CrawlURI) crawlURIBinding.entryToObject(dataEntry);
                }
                if (opStatus == OperationStatus.NOTFOUND) {
                    return null;
                }
                throw new IOException("Error occured in " +
                        "AdaptiveRevisitHostQueue.peek()." + opStatus.toString());
            } finally {
                // Close on every path, including the error path above.
                secondaryCursor.close();
            }
        } catch (DatabaseException e) {
            throw toIOException(e);
        }
    }

    /**
     * Recomputes and returns the state of the queue. An empty queue stays
     * empty; otherwise the queue is busy when all valence slots are in use,
     * snoozed when either the earliest slot or the next URI is not yet due,
     * and ready in all other cases.
     *
     * @return one of the <code>HQSTATE_*</code> constants
     */
    public int getState() {
        if (state != HQSTATE_EMPTY) {
            if (isBusy()) {
                state = HQSTATE_BUSY;
            } else {
                long currentTime = System.currentTimeMillis();
                long wakeTime = getEarliestWakeUpTimeSlot();

                if (wakeTime > currentTime || nextReadyTime > currentTime) {
                    state = HQSTATE_SNOOZED;
                } else {
                    state = HQSTATE_READY;
                }
            }
        }
        return state;
    }

    /**
     * Returns the time at which this queue can next issue a URI.
     *
     * @return <code>Long.MAX_VALUE</code> if the queue is busy or empty,
     *         otherwise the later of the next URI's processing time and the
     *         earliest wake-up slot
     */
    public long getNextReadyTime() {
        int currentState = getState();
        if (currentState == HQSTATE_BUSY || currentState == HQSTATE_EMPTY) {
            return Long.MAX_VALUE;
        }
        long wakeTime = getEarliestWakeUpTimeSlot();
        return nextReadyTime > wakeTime ? nextReadyTime : wakeTime;
    }

    /**
     * Sets {@link #nextReadyTime} and notifies the owner that the ordering
     * of this queue may have changed.
     *
     * @param newTime the new time of next processing
     */
    protected void setNextReadyTime(long newTime) {
        if (logger.isLoggable(Level.FINEST)) {
            logger.finest("Setting next ready to new value " + newTime +
                    " from " + getNextReadyTime());
        }
        nextReadyTime = newTime;
        reorder();
    }

    /**
     * Asks the owning queue list, if one has been set, to reorder this queue.
     */
    protected void reorder() {
        if (owner != null) {
            owner.reorder(this);
        }
    }

    /**
     * @return the current state (see {@link #getState()}) as a lower-case
     *         word, or "undefined" for an unknown state value
     */
    public String getStateByName() {
        switch (getState()) {
            case HQSTATE_BUSY : return "busy";
            case HQSTATE_EMPTY : return "empty";
            case HQSTATE_READY : return "ready";
            case HQSTATE_SNOOZED : return "snoozed";
        }
        return "undefined";
    }

    /**
     * @return the number of URIs tracked by this queue
     */
    public long getSize() {
        return size;
    }

    /**
     * Sets the queue list that is to be notified whenever data affecting the
     * ordering of this queue changes.
     *
     * @param owner the owning queue list
     */
    public void setOwner(AdaptiveRevisitQueueList owner) {
        this.owner = owner;
    }

    /**
     * Closes the secondary, processing and primary databases, in that order.
     *
     * @throws IOException if closing a database fails
     */
    public void close() throws IOException {
        try {
            secondaryUriDB.close();
            processingUriDB.close();
            primaryUriDB.close();
        } catch (DatabaseException e) {
            throw toIOException(e);
        }
    }

    /**
     * Converts a DatabaseException into an IOException carrying the same
     * message and stack trace, and keeps the original as the cause.
     */
    private static IOException toIOException(DatabaseException e) {
        IOException wrapped = new IOException(e.getMessage());
        wrapped.initCause(e);
        wrapped.setStackTrace(e.getStackTrace());
        return wrapped;
    }

    /**
     * @return true if the number of URIs in processing equals the valence
     */
    private boolean isBusy() {
        return inProcessing == valence;
    }

    /**
     * Releases one claimed wake-up slot (value -1) by storing the given
     * wake-up time in it, then notifies the owner.
     *
     * <p>Exactly one slot is released per call, mirroring
     * {@link #useWakeUpTimeSlot()} which claims exactly one; releasing every
     * claimed slot at once would discard the wake-up times of URIs that are
     * still being processed.
     *
     * @param newVal the wake-up time to store in the released slot
     */
    private void updateWakeUpTimeSlot(long newVal) {
        for (int i = 0; i < valence; i++) {
            if (wakeUpTime[i] == -1) {
                wakeUpTime[i] = newVal;
                break;
            }
        }
        reorder();
    }

    /**
     * Claims the first wake-up slot whose time has passed by marking it -1.
     *
     * @return true if a slot was claimed; false (after notifying the owner)
     *         if no slot is currently usable
     */
    private boolean useWakeUpTimeSlot() {
        long now = System.currentTimeMillis();
        for (int i = 0; i < valence; i++) {
            if (wakeUpTime[i] > -1 && wakeUpTime[i] <= now) {
                wakeUpTime[i] = -1;
                return true;
            }
        }
        reorder();
        return false;
    }

    /**
     * @return the smallest wake-up time among unclaimed slots, or
     *         <code>Long.MAX_VALUE</code> if every slot is claimed
     */
    private long getEarliestWakeUpTimeSlot() {
        long earliest = Long.MAX_VALUE;
        for (int i = 0; i < valence; i++) {
            if (wakeUpTime[i] > -1 && wakeUpTime[i] < earliest) {
                earliest = wakeUpTime[i];
            }
        }
        return earliest;
    }

    /**
     * Compiles a human-readable report on this queue: name, size, state,
     * either the URIs being processed (when busy) or the time until the
     * queue is next ready, and the first waiting URIs.
     *
     * @param max maximum number of waiting URIs to list; 0 means no limit
     * @return the report, or an error message if a database operation fails
     */
    public String report(int max) {
        try {
            StringBuffer ret = new StringBuffer(256);
            ret.append("AdaptiveRevisitHostQueue: " + hostName + "\n");
            ret.append("Size: " + size + "\n");
            ret.append("State: " + getStateByName() + "\n");
            if (getState() == HQSTATE_BUSY) {
                ret.append("Processing URIs: \n");
                Cursor processingCursor = processingUriDB.openCursor(null, null);
                try {
                    reportURIs(ret, processingCursor, valence);
                } finally {
                    processingCursor.close();
                }
            } else {
                ret.append("Next ready: " +
                        ArchiveUtils.formatMillisecondsToConventional(
                                getNextReadyTime() - System.currentTimeMillis()) +
                        "\n");
            }
            ret.append("Top URIs: \n");

            Cursor secondaryCursor = secondaryUriDB.openCursor(null, null);
            try {
                reportURIs(ret, secondaryCursor, max);
            } finally {
                secondaryCursor.close();
            }
            return ret.toString();
        } catch (DatabaseException e) {
            return "Exception occured compiling report:\n" + e.getMessage();
        }
    }

    /**
     * Appends a description of up to <code>max</code> CrawlURIs, read from
     * the start of the given cursor, to the report buffer. The cursor is not
     * closed by this method.
     *
     * @param ret buffer to append to
     * @param cursor cursor over a database of serialized CrawlURIs
     * @param max maximum number of URIs to describe; 0 means no limit
     * @throws DatabaseException if reading from the cursor fails
     */
    private void reportURIs(StringBuffer ret, Cursor cursor, int max)
            throws DatabaseException {
        DatabaseEntry keyEntry = new DatabaseEntry();
        DatabaseEntry dataEntry = new DatabaseEntry();
        OperationStatus opStatus =
            cursor.getFirst(keyEntry, dataEntry, LockMode.DEFAULT);
        if (max == 0) {
            max = Integer.MAX_VALUE;
        }
        int i = 0;
        while (i < max && opStatus == OperationStatus.SUCCESS) {
            CrawlURI tmp = (CrawlURI) crawlURIBinding.entryToObject(dataEntry);
            ret.append(" URI: " + tmp.toString() + "\n");
            switch (tmp.getSchedulingDirective()) {
                case CandidateURI.HIGHEST :
                    ret.append(" Sched. directive: HIGHEST\n"); break;
                case CandidateURI.HIGH :
                    ret.append(" Sched. directive: HIGH\n"); break;
                case CandidateURI.MEDIUM :
                    ret.append(" Sched. directive: MEDIUM\n"); break;
                case CandidateURI.NORMAL :
                    ret.append(" Sched. directive: NORMAL\n"); break;
            }
            ret.append(" Next processing: ");
            long nextProcessing =
                tmp.getLong(A_TIME_OF_NEXT_PROCESSING) -
                System.currentTimeMillis();
            if (nextProcessing < 0) {
                // Already due: report how long it has been overdue.
                ret.append("Overdue ");
                nextProcessing = nextProcessing * -1;
            }
            ret.append(ArchiveUtils.formatMillisecondsToConventional(
                    nextProcessing) + "\n");
            if (tmp.getFetchStatus() != 0) {
                ret.append(" Last fetch status: " +
                        tmp.getFetchStatus() + "\n");
            }
            if (tmp.containsKey(A_WAIT_INTERVAL)) {
                ret.append(" Wait interval: " +
                        ArchiveUtils.formatMillisecondsToConventional(
                                tmp.getLong(A_WAIT_INTERVAL)) + "\n");
            }
            if (tmp.containsKey(A_NUMBER_OF_VISITS)) {
                ret.append(" Visits: " + tmp.getInt(
                        A_NUMBER_OF_VISITS) + "\n");
            }
            if (tmp.containsKey(A_NUMBER_OF_VERSIONS)) {
                ret.append(" Versions: " + tmp.getInt(
                        A_NUMBER_OF_VERSIONS) + "\n");
            }

            opStatus = cursor.getNext(keyEntry, dataEntry, LockMode.DEFAULT);
            i++;
        }
    }

    /**
     * Creates the secondary-index key for a stored CrawlURI. The key consists
     * of an int rank derived from the scheduling directive (HIGHEST = 0,
     * HIGH = 1, MEDIUM = 2, NORMAL and anything else = 3) followed by the
     * URI's time of next processing as a long.
     *
     * <p>NOTE(review): this presumably makes the index sort by directive
     * first and by time second, relying on the tuple format's byte ordering;
     * confirm against the Berkeley DB JE tuple binding documentation.
     */
    private static class OrderOfProcessingKeyCreator
            extends TupleSerialKeyCreator {

        /**
         * @param classCatalog catalog used to deserialize the stored data
         * @param dataClass class of the stored data objects
         */
        public OrderOfProcessingKeyCreator(ClassCatalog classCatalog,
                Class dataClass) {
            super(classCatalog, dataClass);
        }

        /**
         * Writes the (directive rank, time of next processing) key for the
         * given CrawlURI.
         *
         * @param primaryKeyInput the primary key; not used
         * @param dataInput the deserialized CrawlURI
         * @param indexKeyOutput receives the secondary key
         * @return always true, i.e. every entry gets a secondary key
         */
        public boolean createSecondaryKey(TupleInput primaryKeyInput,
                Object dataInput,
                TupleOutput indexKeyOutput) {
            CrawlURI curi = (CrawlURI) dataInput;
            int directive = curi.getSchedulingDirective();
            switch (directive) {
                case CandidateURI.HIGHEST:
                    directive = 0;
                    break;
                case CandidateURI.HIGH:
                    directive = 1;
                    break;
                case CandidateURI.MEDIUM:
                    directive = 2;
                    break;
                case CandidateURI.NORMAL:
                    directive = 3;
                    break;
                default:
                    // Unknown directives rank together with NORMAL.
                    directive = 3;
            }

            indexKeyOutput.writeInt(directive);
            long timeOfNextProcessing =
                curi.getLong(A_TIME_OF_NEXT_PROCESSING);

            indexKeyOutput.writeLong(timeOfNextProcessing);
            return true;
        }
    }

    /**
     * @return the crawl sub-statistics object kept for this queue
     */
    public CrawlSubstats getSubstats() {
        return substats;
    }

}