1 16 package org.outerj.daisy.ftindex; 17 18 import org.outerj.daisy.repository.*; 19 import org.outerj.daisy.repository.query.QueryManager; 20 import org.outerj.daisy.textextraction.TextExtractor; 21 import org.outerj.daisy.jms.JmsClient; 22 import org.outerj.daisy.jms.Sender; 23 import org.outerj.daisy.xmlutil.LocalSAXParserFactory; 24 import org.outerx.daisy.x10.*; 25 import org.apache.avalon.framework.service.ServiceManager; 26 import org.apache.avalon.framework.service.ServiceException; 27 import org.apache.avalon.framework.service.Serviceable; 28 import org.apache.avalon.framework.configuration.Configurable; 29 import org.apache.avalon.framework.configuration.Configuration; 30 import org.apache.avalon.framework.configuration.ConfigurationException; 31 import org.apache.avalon.framework.activity.Initializable; 32 import org.apache.avalon.framework.activity.Disposable; 33 import org.apache.avalon.framework.logger.AbstractLogEnabled; 34 import org.apache.xmlbeans.XmlOptions; 35 36 import javax.jms.*; 37 import javax.management.MBeanServer ; 38 import javax.management.ObjectName ; 39 import java.io.*; 40 import java.nio.charset.Charset ; 41 import java.util.Locale ; 42 43 51 public class FullTextIndexUpdater extends AbstractLogEnabled implements Configurable, Serviceable, Initializable, 52 FullTextIndexUpdaterMBean, Disposable { 53 private ServiceManager serviceManager; 54 private String subscriptionName; 55 private EventListener eventListener = new EventListener(); 56 private MessageListener fullTextQueueListener = new FullTextQueueListener(); 57 private RepositoryManager repositoryManager; 58 private Repository repository; 59 private TextExtractor textExtractor; 60 private String repoUser; 61 private String repoPassword; 62 private FullTextIndex fullTextIndex; 63 private File logExtractedTextFile; 64 private String jmsTopicName; 65 private String jmsQueueName; 66 private Sender fullTextQueueSender; 67 private JmsClient jmsClient; 68 private long dataMaxSize; 69 70 private Repository getRepository() { 71 return repository; 72 } 73 74 public void configure(Configuration configuration) throws ConfigurationException { 75 this.jmsTopicName = configuration.getChild("jmsTopic").getValue(); 76 this.jmsQueueName = configuration.getChild("jmsQueue").getValue(); 77 78 Configuration userConf = configuration.getChild("repositoryUser", false); 79 if (userConf == null) 80 throw new ConfigurationException("repositoryUser configuration element missing"); 81 repoUser = userConf.getAttribute("login"); 82 repoPassword = userConf.getAttribute("password"); 83 subscriptionName = configuration.getChild("jmsSubscriptionName").getValue(); 84 85 Configuration logExtractTextConf = configuration.getChild("logExtractedText", false); 86 if (logExtractTextConf != null) { 87 String fileName = logExtractTextConf.getAttribute("file"); 88 logExtractedTextFile = new File(fileName); 89 } 90 91 dataMaxSize = configuration.getChild("dataMaxSize").getValueAsLong(); 92 } 93 94 101 public void service(ServiceManager serviceManager) throws ServiceException { 102 this.serviceManager = serviceManager; 103 this.textExtractor = (TextExtractor)serviceManager.lookup("textextractor"); 104 this.repositoryManager = (RepositoryManager)serviceManager.lookup("repository-manager"); 105 this.fullTextIndex = (FullTextIndex)serviceManager.lookup("fulltextindex"); 106 this.jmsClient = (JmsClient)serviceManager.lookup("jmsclient"); 107 } 108 109 public void initialize() throws Exception { 110 repository = repositoryManager.getRepository(new Credentials(repoUser, repoPassword)); 111 fullTextQueueSender = jmsClient.getSender(jmsQueueName); 112 jmsClient.registerDurableTopicListener(jmsTopicName, subscriptionName, eventListener); 113 jmsClient.registerListener(jmsQueueName, fullTextQueueListener); 114 115 MBeanServer mbeanServer = (MBeanServer )serviceManager.lookup("mbeanserver"); 116 try { 117 mbeanServer.registerMBean(this, new ObjectName ("Daisy:name=FullTextIndexUpdater")); 118 } finally { 119 serviceManager.release(mbeanServer); 120 } 121 } 122 123 public void dispose() { 124 serviceManager.release(textExtractor); 125 serviceManager.release(repositoryManager); 126 serviceManager.release(fullTextIndex); 127 serviceManager.release(jmsClient); 128 } 129 130 131 public void reIndexDocuments(String query) throws Exception { 132 QueryManager queryManager = repository.getQueryManager(); 133 VariantKey[] allVariantKeys = queryManager.performQueryReturnKeys(query, Locale.US); 134 135 for (int i = 0; i < allVariantKeys.length; i++) { 136 scheduleIndexRequest(allVariantKeys[i].getDocumentId(), allVariantKeys[i].getBranchId(), allVariantKeys[i].getLanguageId()); 137 } 138 } 139 140 public void reIndexAllDocuments() throws Exception { 141 reIndexDocuments("select id where true"); 142 } 143 144 148 private class EventListener implements MessageListener { 149 public void onMessage(Message aMessage) { 150 try { 151 TextMessage message = (TextMessage)aMessage; 152 String messageType = message.getStringProperty("type"); 153 154 long documentId = -1; 155 long branchId = -1; 156 long languageId = -1; 157 158 if (messageType.equals("DocumentVariantUpdated")) { 159 XmlOptions xmlOptions = new XmlOptions().setLoadUseXMLReader(LocalSAXParserFactory.newXmlReader()); 160 DocumentVariantUpdatedDocument variantUpdatedDocument = DocumentVariantUpdatedDocument.Factory.parse(new StringReader(message.getText()), xmlOptions); 161 long oldLastVersionId = variantUpdatedDocument.getDocumentVariantUpdated().getOldDocumentVariant().getDocument().getLastVersionId(); 162 long newLastVersionId = variantUpdatedDocument.getDocumentVariantUpdated().getNewDocumentVariant().getDocument().getLastVersionId(); 163 if (oldLastVersionId != newLastVersionId) { 164 DocumentDocument.Document documentXml = variantUpdatedDocument.getDocumentVariantUpdated().getNewDocumentVariant().getDocument(); 165 documentId = documentXml.getId(); 166 branchId = documentXml.getBranchId(); 167 languageId = documentXml.getLanguageId(); 168 } 169 } else if (messageType.equals("DocumentVariantCreated")) { 170 XmlOptions xmlOptions = new XmlOptions().setLoadUseXMLReader(LocalSAXParserFactory.newXmlReader()); 171 DocumentVariantCreatedDocument variantCreatedDocument = DocumentVariantCreatedDocument.Factory.parse(new StringReader(message.getText()), xmlOptions); 172 DocumentDocument.Document documentXml = variantCreatedDocument.getDocumentVariantCreated().getNewDocumentVariant().getDocument(); 173 documentId = documentXml.getId(); 174 branchId = documentXml.getBranchId(); 175 languageId = documentXml.getLanguageId(); 176 } else if (messageType.equals("DocumentVariantDeleted")) { 177 XmlOptions xmlOptions = new XmlOptions().setLoadUseXMLReader(LocalSAXParserFactory.newXmlReader()); 178 DocumentVariantDeletedDocument variantDeletedDocument = DocumentVariantDeletedDocument.Factory.parse(new StringReader(message.getText()), xmlOptions); 179 DocumentDocument.Document documentXml = variantDeletedDocument.getDocumentVariantDeleted().getDeletedDocumentVariant().getDocument(); 180 documentId = documentXml.getId(); 181 branchId = documentXml.getBranchId(); 182 languageId = documentXml.getLanguageId(); 183 } else if (messageType.equals("VersionStateChanged")) { 184 XmlOptions xmlOptions = new XmlOptions().setLoadUseXMLReader(LocalSAXParserFactory.newXmlReader()); 185 VersionStateChangedDocument.VersionStateChanged versionStateChanged = VersionStateChangedDocument.Factory.parse(new StringReader(message.getText()), xmlOptions).getVersionStateChanged(); 186 if (versionStateChanged.getLiveVersionId() != versionStateChanged.getLiveVersionId()) { 187 documentId = versionStateChanged.getDocumentId(); 188 branchId = versionStateChanged.getBranchId(); 189 languageId = versionStateChanged.getLanguageId(); 190 } 191 } 192 193 if (getLogger().isDebugEnabled()) 194 getLogger().debug("Received a message from JMS of type " + messageType + " and decided to " + (documentId == -1 ? "do nothing." : "queue an index update request for document " + documentId + ", branch " + branchId + ", language " + languageId)); 195 196 if (documentId != -1) { 197 scheduleIndexRequest(documentId, branchId, languageId); 198 } 199 } catch (Throwable e) { 200 getLogger().error("Error processing JMS message.", e); 201 } 202 } 203 } 204 205 private void scheduleIndexRequest(long documentId, long branchId, long languageId) throws Exception { 206 MapMessage fullTextTaskMessage = fullTextQueueSender.createMapMessage(); 207 fullTextTaskMessage.setLong("documentId", documentId); 208 fullTextTaskMessage.setLong("branchId", branchId); 209 fullTextTaskMessage.setLong("languageId", languageId); 210 fullTextQueueSender.send(fullTextTaskMessage); 211 } 212 213 private class FullTextQueueListener implements MessageListener { 214 public void onMessage(Message aMessage) { 215 try { 216 MapMessage message = (MapMessage)aMessage; 217 long documentId = message.getLong("documentId"); 218 long branchId = message.getLong("branchId"); 219 long languageId = message.getLong("languageId"); 220 221 Version version = null; 222 try { 223 org.outerj.daisy.repository.Document document = getRepository().getDocument(documentId, branchId, languageId, false); 224 version = document.getLiveVersion(); 225 } catch (DocumentNotFoundException e) { 226 } catch (DocumentVariantNotFoundException e) { 228 } 230 if (version != null) { 231 StringBuffer content = new StringBuffer (); 233 Part[] parts = version.getParts().getArray(); 234 for (int p = 0; p < parts.length; p++) { 235 Part part = parts[p]; 236 String text = null; 237 238 if (part.getSize() > dataMaxSize) { 239 getLogger().info("Will not extract text from document ID " + documentId + ", branch ID " + branchId + ", language ID " + languageId + ", part ID " + part.getTypeId() + " because the part data is too large (" + part.getSize() + " > " + dataMaxSize + ")"); 240 } else if (textExtractor.supportsMimeType(part.getMimeType())) { 241 try { 242 if (getLogger().isDebugEnabled()) 245 getLogger().debug("Before calling textextractor for document " + documentId + ", branch " + branchId + ", language " + languageId + ", part type " + part.getTypeId()); 246 247 text = textExtractor.getText(part.getMimeType(), part.getDataStream()); 248 249 if (getLogger().isDebugEnabled()) 250 getLogger().debug("After calling textextractor for document " + documentId + ", branch " + branchId + ", language " + languageId + ", part type " + part.getTypeId()); 251 252 logExtractedText(text, documentId, branchId, languageId, part); 253 } catch (Throwable e) { 254 getLogger().error("Error extracting text from part data (document ID: " + documentId + ", branch ID: " + branchId + ", language ID: " + languageId + ", part ID: " + part.getTypeId() + ")", e); 255 continue; 256 } 257 } 258 259 if (text != null) { 260 content.append(text).append(" "); 261 } else { 262 if (getLogger().isDebugEnabled()) 263 getLogger().debug("Textextractor wasn't able to extract anything for document " + documentId + ", branch " + branchId + ", language " + languageId + ", part type " + part.getTypeId() + " (mimetype = " + part.getMimeType() + ")"); 264 } 265 } 266 267 StringBuffer fieldsContent = new StringBuffer (); 269 Field[] fields = version.getFields().getArray(); 270 for (int i = 0; i < fields.length; i++) { 271 if (fields[i].getValueType() == ValueType.STRING) { 272 Object [] values = fields[i].isMultiValue() ? (Object [])fields[i].getValue() : new Object [] {fields[i].getValue()}; 273 for (int k = 0; k < values.length; k++) { 274 fieldsContent.append(values[k]); 275 fieldsContent.append(' '); 276 } 277 } 278 } 279 280 fullTextIndex.index(documentId, branchId, languageId, version.getDocumentName(), content.toString(), fieldsContent.toString()); 281 } else { 282 if (getLogger().isDebugEnabled()) 283 getLogger().debug("Document " + documentId + " is deleted or has no version marked as published, will remove fulltext index (if any exists)."); 284 fullTextIndex.index(documentId, branchId, languageId, null, null, null); 285 } 286 } catch (Throwable e) { 287 getLogger().error("Error processing JMS message.", e); 288 } 289 } 290 } 291 292 private void logExtractedText(String text, long documentId, long branchId, long languageId, Part part) { 293 if (logExtractedTextFile != null) { 294 Writer writer = null; 295 try { 296 FileOutputStream fos = new FileOutputStream(logExtractedTextFile, true); 297 writer = new OutputStreamWriter(fos, Charset.forName("UTF-8")); 298 writer.write("\n\n********************************************************************************************\n"); 299 writer.write("Document " + documentId + ", branch " + branchId + ", language " + languageId + " -- part type " + part.getTypeId() + " -- mimetype " + part.getMimeType() + "\n"); 300 writer.write(text); 301 } catch (Throwable e) { 302 getLogger().error("Error logging extracted text to " + logExtractedTextFile.getAbsolutePath(), e); 303 } finally { 304 if (writer != null) { 305 try { 306 writer.close(); 307 } catch (Throwable e) { 308 } 310 } 311 } 312 } 313 } 314 } 315 | Popular Tags |