1 5 package org.exoplatform.services.indexing.impl; 6 7 import java.util.* ; 8 import java.io.File ; 9 import org.apache.lucene.analysis.Analyzer; 10 import org.apache.lucene.document.Document; 11 import org.apache.lucene.index.Term; 12 import org.apache.lucene.index.IndexWriter; 13 import org.apache.lucene.index.IndexReader; 14 import org.apache.lucene.search.IndexSearcher; 15 import org.exoplatform.container.configuration.*; 16 import org.exoplatform.container.configuration.ConfigurationManager; 17 import org.apache.commons.logging.Log ; 18 import org.exoplatform.container.PortalContainer ; 19 import org.exoplatform.services.indexing.*; 20 import org.exoplatform.services.log.LogService; 21 import org.exoplatform.services.task.TaskService ; 22 import org.exoplatform.services.task.Task ; 23 import org.picocontainer.Startable; 24 25 32 public class IndexingServiceImpl implements IndexingService , Startable, Task { 33 final static private String UPDATE_ACTION = "update" ; 34 final static private String ADD_ACTION = "add" ; 35 final static private String DELETE_ACTION = "delete" ; 36 37 38 private Analyzer analyzer_ ; 39 private Map plugins_ ; 40 private List queues_ ; 41 private String indexDBLocation_ ; 42 private Searcher searcher_ ; 43 private Log log_ ; 44 private int counter_ ; 45 46 public IndexingServiceImpl(ConfigurationManager confService, 47 LogService lservice, 48 TaskService tservice, 49 Analyzer analyzer) throws Exception { 50 log_ = lservice.getLog(getClass()) ; 51 analyzer_ = analyzer ; 52 plugins_ = new HashMap() ; 53 queues_ = new ArrayList(200) ; 54 ServiceConfiguration sconf = confService.getServiceConfiguration(IndexingService.class) ; 55 ValueParam param = (ValueParam)sconf.getParameter("index.database.dir") ; 56 indexDBLocation_ = param.getValue() ; 57 if(confService.isDefault(indexDBLocation_)) { 58 indexDBLocation_ = System.getProperty("java.io.tmpdir") + "/lucenedb" ; 59 } 60 File dir = new File (indexDBLocation_) ; 61 if(dir.isFile()) { 62 throw new Exception ("Expect a directory, but " + indexDBLocation_ + " is a file") ; 63 } 64 65 if(!dir.exists()) dir.mkdir() ; 66 File segments = new File (indexDBLocation_ + "/segments") ; 67 if(!segments.exists()) { 68 IndexWriter writer = new IndexWriter(indexDBLocation_ , analyzer_, true); 69 writer.optimize(); 70 writer.close(); 71 } 72 tservice.queueRepeatTask(this) ; 73 } 74 75 public Analyzer getAnalyzer() { return analyzer_ ; } 76 77 public String getIndexDatabaseLocation() { return indexDBLocation_ ; } 78 79 public Collection getIndexerPlugins() throws Exception { 80 return plugins_.values() ; 81 } 82 83 public void addIndexerPlugin(IndexerPlugin plugin) { 84 plugins_.put(plugin.getPluginIdentifier(), plugin) ; 85 } 86 87 public IndexerPlugin getIndexerPlugin(String identifier) throws Exception { 88 IndexerPlugin plugin = (IndexerPlugin) plugins_.get(identifier) ; 89 if(plugin == null) throw new Exception ("Cannot find the plugin: " + identifier) ; 90 return plugin ; 91 } 92 93 synchronized public Searcher getSearcher() throws Exception { 94 if(searcher_ == null) { 95 IndexSearcher isearcher = new IndexSearcher(getIndexDatabaseLocation()) ; 96 searcher_ = new Searcher(isearcher, analyzer_); 97 } 98 return searcher_ ; 99 } 100 101 synchronized public void queueUpdateDocument(Document document) throws Exception { 102 Term deleteQuery = 103 new Term(IDENTIFIER_FIELD, document.getField(IDENTIFIER_FIELD).stringValue()); 104 queues_.add(new Command(UPDATE_ACTION, deleteQuery, document)) ; 105 activateIndexerThread() ; 106 } 107 108 synchronized public void queueUpdateDocuments(List documents) throws Exception { 109 for(int i = 0 ; i < documents.size(); i++) { 110 Document document = (Document) documents.get(i) ; 111 Term deleteQuery = 112 new Term(IDENTIFIER_FIELD, document.getField(IDENTIFIER_FIELD).stringValue()); 113 queues_.add(new Command(UPDATE_ACTION, deleteQuery, document)) ; 114 } 115 activateIndexerThread() ; 116 } 117 118 synchronized public void queueIndexDocument(Document document) throws Exception { 119 queues_.add(new Command(ADD_ACTION, null, document)) ; 120 activateIndexerThread() ; 121 } 122 123 synchronized public void queueIndexDocuments(List documents) throws Exception { 124 for(int i = 0 ; i < documents.size(); i++) { 125 Document document = (Document) documents.get(i) ; 126 queues_.add(new Command(ADD_ACTION, null, document)) ; 127 } 128 activateIndexerThread() ; 129 } 130 131 synchronized public void queueDeleteDocuments(Term queryTerm) throws Exception { 132 queues_.add(new Command(DELETE_ACTION, queryTerm, null)) ; 133 activateIndexerThread() ; 134 } 135 136 synchronized private List dequeue() throws Exception { 137 List tmp = queues_ ; 138 queues_ = new ArrayList(200) ; 139 return tmp ; 140 } 141 142 private void activateIndexerThread() { 143 } 144 145 private void runBatchCommand(List commands) throws Exception { 146 if(commands.size() == 0) return ; 147 IndexReader reader = null ; 148 for(int i = 0; i < commands.size(); i++) { 150 Command entry = (Command) commands.get(i) ; 151 if(entry.command_ == UPDATE_ACTION || entry.command_ == DELETE_ACTION) { 152 if(reader == null) { 153 reader = IndexReader.open(indexDBLocation_); 154 } 155 reader.delete(entry.deleteQuery_) ; 156 } 157 } 158 if(reader != null) reader.close() ; 159 IndexWriter writer = new IndexWriter(indexDBLocation_ , analyzer_, false); 161 for(int i = 0; i < commands.size(); i++) { 162 Command entry = (Command) commands.get(i) ; 163 if(entry.command_ == UPDATE_ACTION || entry.command_ == ADD_ACTION) { 164 writer.addDocument(entry.document_); 165 } 166 counter_++ ; 167 } 168 writer.optimize(); 169 writer.close(); 170 } 171 172 public void optimizeDatabase() throws Exception { 173 IndexWriter writer = new IndexWriter(indexDBLocation_ , analyzer_, false); 174 writer.optimize(); 175 writer.close(); 176 } 177 178 public void execute() { 179 try { 180 List queues = dequeue() ; 181 while(queues.size() > 0) { 182 runBatchCommand(queues) ; 183 queues = dequeue() ; 184 } 185 Iterator i = plugins_.values().iterator() ; 186 while(i.hasNext()) { 188 IndexerPlugin plugin = (IndexerPlugin) i.next() ; 189 plugin.resetSearcher() ; 190 } 191 searcher_ = null ; 192 } catch (Exception ex) { 193 log_.error("Error while indexing the new document entries: ", ex) ; 194 } 195 } 196 197 public String getName() { return "IndexingServiceImpl" ; } 198 public String getDescription() { 199 return "index the data, there are " + queues_.size() + " in the queue"; 200 } 201 202 public PortalContainer getPortalContainer() { return null ; } 203 204 public void start() { } 205 public void stop() { } 206 207 private class Command { 208 String command_ ; 209 Term deleteQuery_ ; 210 Document document_ ; 211 212 public Command(String action, Term deleteQuery, Document doc) { 213 command_ = action ; 214 deleteQuery_ = deleteQuery ; 215 document_ = doc ; 216 } 217 } 218 } 219 | Popular Tags |