1 23 package org.archive.crawler.frontier; 24 25 import java.io.IOException ; 26 import java.io.Serializable ; 27 import java.io.UnsupportedEncodingException ; 28 import java.util.logging.Level ; 29 import java.util.logging.Logger ; 30 31 import org.archive.crawler.datamodel.CrawlURI; 32 import org.archive.util.ArchiveUtils; 33 import org.archive.util.IoUtils; 34 35 import com.sleepycat.je.DatabaseEntry; 36 import com.sleepycat.je.DatabaseException; 37 38 39 43 public class BdbWorkQueue extends WorkQueue 44 implements Comparable , Serializable { 45 private static Logger LOGGER = 46 Logger.getLogger(BdbWorkQueue.class.getName()); 47 48 private static final long serialVersionUID = ArchiveUtils 50 .classnameBasedUID(BdbWorkQueue.class, 1); 51 52 56 private byte[] origin; 57 58 63 public BdbWorkQueue(String classKey, BdbFrontier frontier) { 64 super(classKey); 65 this.origin = BdbMultipleWorkQueues.calculateOriginKey(classKey); 66 if (LOGGER.isLoggable(Level.FINE)) { 67 LOGGER.fine(getPrefixClassKey(this.origin) + " " + classKey); 68 } 69 frontier.getWorkQueues().addCap(origin); 72 } 73 74 protected long deleteMatchingFromQueue(final WorkQueueFrontier frontier, 75 final String match) throws IOException { 76 try { 77 final BdbMultipleWorkQueues queues = ((BdbFrontier) frontier) 78 .getWorkQueues(); 79 return queues.deleteMatchingFromQueue(match, classKey, 80 new DatabaseEntry(origin)); 81 } catch (DatabaseException e) { 82 throw IoUtils.wrapAsIOException(e); 83 } 84 } 85 86 protected void deleteItem(final WorkQueueFrontier frontier, 87 final CrawlURI peekItem) throws IOException { 88 try { 89 final BdbMultipleWorkQueues queues = ((BdbFrontier) frontier) 90 .getWorkQueues(); 91 queues.delete(peekItem); 92 } catch (DatabaseException e) { 93 e.printStackTrace(); 94 throw IoUtils.wrapAsIOException(e); 95 } 96 } 97 98 protected CrawlURI peekItem(final WorkQueueFrontier frontier) 99 throws IOException { 100 final BdbMultipleWorkQueues queues = ((BdbFrontier) frontier) 101 .getWorkQueues(); 102 DatabaseEntry key = new DatabaseEntry(origin); 103 CrawlURI curi = null; 104 int tries = 1; 105 while(true) { 106 try { 107 curi = queues.get(key); 108 } catch (DatabaseException e) { 109 LOGGER.log(Level.SEVERE,"peekItem failure; retrying",e); 110 } 111 112 if(!ArchiveUtils.startsWith(key.getData(),origin)) { 114 LOGGER.severe( 115 "inconsistency: "+classKey+"("+ 116 getPrefixClassKey(origin)+") with " + getCount() + " items gave " 117 + curi +"("+getPrefixClassKey(key.getData())); 118 curi = null; 120 key.setData(origin); 122 } 123 124 if (curi!=null) { 125 break; 127 } 128 129 if (tries>3) { 130 LOGGER.severe("no item where expected in queue "+classKey); 131 break; 132 } 133 tries++; 134 LOGGER.severe("Trying get #" + Integer.toString(tries) 135 + " in queue " + classKey + " with " + getCount() 136 + " items using key " 137 + getPrefixClassKey(key.getData())); 138 } 139 140 return curi; 141 } 142 143 protected void insertItem(final WorkQueueFrontier frontier, 144 final CrawlURI curi) throws IOException { 145 try { 146 final BdbMultipleWorkQueues queues = ((BdbFrontier) frontier) 147 .getWorkQueues(); 148 queues.put(curi); 149 if (LOGGER.isLoggable(Level.FINE)) { 150 LOGGER.fine("Inserted into " + getPrefixClassKey(this.origin) + 151 " (count " + Long.toString(getCount())+ "): " + 152 curi.toString()); 153 } 154 } catch (DatabaseException e) { 155 throw IoUtils.wrapAsIOException(e); 156 } 157 } 158 159 164 protected static String getPrefixClassKey(final byte [] byteArray) { 165 int zeroIndex = 0; 166 while(byteArray[zeroIndex]!=0) { 167 zeroIndex++; 168 } 169 try { 170 return new String (byteArray,0,zeroIndex,"UTF-8"); 171 } catch (UnsupportedEncodingException e) { 172 e.printStackTrace(); 174 return e.getMessage(); 175 } 176 } 177 } | Popular Tags |