// KickJava: Java API By Example, From Geeks To Geeks.
//
// Java > Open Source Codes > org > outerj > daisy > ftindex > FullTextIndexUpdater


/*
 * Copyright 2004 Outerthought bvba and Schaubroeck nv
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.outerj.daisy.ftindex;

import org.outerj.daisy.repository.*;
import org.outerj.daisy.repository.query.QueryManager;
import org.outerj.daisy.textextraction.TextExtractor;
import org.outerj.daisy.jms.JmsClient;
import org.outerj.daisy.jms.Sender;
import org.outerj.daisy.xmlutil.LocalSAXParserFactory;
import org.outerx.daisy.x10.*;
import org.apache.avalon.framework.service.ServiceManager;
import org.apache.avalon.framework.service.ServiceException;
import org.apache.avalon.framework.service.Serviceable;
import org.apache.avalon.framework.configuration.Configurable;
import org.apache.avalon.framework.configuration.Configuration;
import org.apache.avalon.framework.configuration.ConfigurationException;
import org.apache.avalon.framework.activity.Initializable;
import org.apache.avalon.framework.activity.Disposable;
import org.apache.avalon.framework.logger.AbstractLogEnabled;
import org.apache.xmlbeans.XmlOptions;

import javax.jms.*;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import java.io.*;
import java.nio.charset.Charset;
import java.util.Locale;

43 /**
44  * This component receives repository events from JMS, and if the event would influence
45  * the full text index, adds an event upon the fulltext task queue. Also, it processes
46  * the message on the fulltext task queue, by extracting indexable content from the documents
47  * and calling the {@link FullTextIndex} component to update its index for the document.
48  *
49  * @avalon.component version="1.0" name="fulltextindexupdater" lifestyle="singleton"
50  */

51 public class FullTextIndexUpdater extends AbstractLogEnabled implements Configurable, Serviceable, Initializable,
52         FullTextIndexUpdaterMBean, Disposable {
53     private ServiceManager serviceManager;
54     private String JavaDoc subscriptionName;
55     private EventListener eventListener = new EventListener();
56     private MessageListener fullTextQueueListener = new FullTextQueueListener();
57     private RepositoryManager repositoryManager;
58     private Repository repository;
59     private TextExtractor textExtractor;
60     private String JavaDoc repoUser;
61     private String JavaDoc repoPassword;
62     private FullTextIndex fullTextIndex;
63     private File logExtractedTextFile;
64     private String JavaDoc jmsTopicName;
65     private String JavaDoc jmsQueueName;
66     private Sender fullTextQueueSender;
67     private JmsClient jmsClient;
68     private long dataMaxSize;
69
70     private Repository getRepository() {
71         return repository;
72     }
73
74     public void configure(Configuration configuration) throws ConfigurationException {
75         this.jmsTopicName = configuration.getChild("jmsTopic").getValue();
76         this.jmsQueueName = configuration.getChild("jmsQueue").getValue();
77
78         Configuration userConf = configuration.getChild("repositoryUser", false);
79         if (userConf == null)
80             throw new ConfigurationException("repositoryUser configuration element missing");
81         repoUser = userConf.getAttribute("login");
82         repoPassword = userConf.getAttribute("password");
83         subscriptionName = configuration.getChild("jmsSubscriptionName").getValue();
84
85         Configuration logExtractTextConf = configuration.getChild("logExtractedText", false);
86         if (logExtractTextConf != null) {
87             String JavaDoc fileName = logExtractTextConf.getAttribute("file");
88             logExtractedTextFile = new File(fileName);
89         }
90
91         dataMaxSize = configuration.getChild("dataMaxSize").getValueAsLong();
92     }
93
94     /**
95      * @avalon.dependency key="repository-manager" type="org.outerj.daisy.repository.RepositoryManager"
96      * @avalon.dependency key="fulltextindex" type="org.outerj.daisy.ftindex.FullTextIndex"
97      * @avalon.dependency key="textextractor" type="org.outerj.daisy.textextraction.TextExtractor"
98      * @avalon.dependency key="jmsclient" type="org.outerj.daisy.jms.JmsClient"
99      * @avalon.dependency key="mbeanserver" type="javax.management.MBeanServer"
100      */

101     public void service(ServiceManager serviceManager) throws ServiceException {
102         this.serviceManager = serviceManager;
103         this.textExtractor = (TextExtractor)serviceManager.lookup("textextractor");
104         this.repositoryManager = (RepositoryManager)serviceManager.lookup("repository-manager");
105         this.fullTextIndex = (FullTextIndex)serviceManager.lookup("fulltextindex");
106         this.jmsClient = (JmsClient)serviceManager.lookup("jmsclient");
107     }
108
109     public void initialize() throws Exception JavaDoc {
110         repository = repositoryManager.getRepository(new Credentials(repoUser, repoPassword));
111         fullTextQueueSender = jmsClient.getSender(jmsQueueName);
112         jmsClient.registerDurableTopicListener(jmsTopicName, subscriptionName, eventListener);
113         jmsClient.registerListener(jmsQueueName, fullTextQueueListener);
114
115         MBeanServer JavaDoc mbeanServer = (MBeanServer JavaDoc)serviceManager.lookup("mbeanserver");
116         try {
117             mbeanServer.registerMBean(this, new ObjectName JavaDoc("Daisy:name=FullTextIndexUpdater"));
118         } finally {
119             serviceManager.release(mbeanServer);
120         }
121     }
122
123     public void dispose() {
124         serviceManager.release(textExtractor);
125         serviceManager.release(repositoryManager);
126         serviceManager.release(fullTextIndex);
127         serviceManager.release(jmsClient);
128     }
129
130
131     public void reIndexDocuments(String JavaDoc query) throws Exception JavaDoc {
132         QueryManager queryManager = repository.getQueryManager();
133         VariantKey[] allVariantKeys = queryManager.performQueryReturnKeys(query, Locale.US);
134
135         for (int i = 0; i < allVariantKeys.length; i++) {
136             scheduleIndexRequest(allVariantKeys[i].getDocumentId(), allVariantKeys[i].getBranchId(), allVariantKeys[i].getLanguageId());
137         }
138     }
139
140     public void reIndexAllDocuments() throws Exception JavaDoc {
141         reIndexDocuments("select id where true");
142     }
143
144     /**
145      * Listens for events in the daisy repository and adds a message to the full text index update queue
146      * if appropriate.
147      */

148     private class EventListener implements MessageListener {
149         public void onMessage(Message aMessage) {
150             try {
151                 TextMessage message = (TextMessage)aMessage;
152                 String JavaDoc messageType = message.getStringProperty("type");
153
154                 long documentId = -1;
155                 long branchId = -1;
156                 long languageId = -1;
157
158                 if (messageType.equals("DocumentVariantUpdated")) {
159                     XmlOptions xmlOptions = new XmlOptions().setLoadUseXMLReader(LocalSAXParserFactory.newXmlReader());
160                     DocumentVariantUpdatedDocument variantUpdatedDocument = DocumentVariantUpdatedDocument.Factory.parse(new StringReader(message.getText()), xmlOptions);
161                     long oldLastVersionId = variantUpdatedDocument.getDocumentVariantUpdated().getOldDocumentVariant().getDocument().getLastVersionId();
162                     long newLastVersionId = variantUpdatedDocument.getDocumentVariantUpdated().getNewDocumentVariant().getDocument().getLastVersionId();
163                     if (oldLastVersionId != newLastVersionId) {
164                         DocumentDocument.Document documentXml = variantUpdatedDocument.getDocumentVariantUpdated().getNewDocumentVariant().getDocument();
165                         documentId = documentXml.getId();
166                         branchId = documentXml.getBranchId();
167                         languageId = documentXml.getLanguageId();
168                     }
169                 } else if (messageType.equals("DocumentVariantCreated")) {
170                     XmlOptions xmlOptions = new XmlOptions().setLoadUseXMLReader(LocalSAXParserFactory.newXmlReader());
171                     DocumentVariantCreatedDocument variantCreatedDocument = DocumentVariantCreatedDocument.Factory.parse(new StringReader(message.getText()), xmlOptions);
172                     DocumentDocument.Document documentXml = variantCreatedDocument.getDocumentVariantCreated().getNewDocumentVariant().getDocument();
173                     documentId = documentXml.getId();
174                     branchId = documentXml.getBranchId();
175                     languageId = documentXml.getLanguageId();
176                 } else if (messageType.equals("DocumentVariantDeleted")) {
177                     XmlOptions xmlOptions = new XmlOptions().setLoadUseXMLReader(LocalSAXParserFactory.newXmlReader());
178                     DocumentVariantDeletedDocument variantDeletedDocument = DocumentVariantDeletedDocument.Factory.parse(new StringReader(message.getText()), xmlOptions);
179                     DocumentDocument.Document documentXml = variantDeletedDocument.getDocumentVariantDeleted().getDeletedDocumentVariant().getDocument();
180                     documentId = documentXml.getId();
181                     branchId = documentXml.getBranchId();
182                     languageId = documentXml.getLanguageId();
183                 } else if (messageType.equals("VersionStateChanged")) {
184                     XmlOptions xmlOptions = new XmlOptions().setLoadUseXMLReader(LocalSAXParserFactory.newXmlReader());
185                     VersionStateChangedDocument.VersionStateChanged versionStateChanged = VersionStateChangedDocument.Factory.parse(new StringReader(message.getText()), xmlOptions).getVersionStateChanged();
186                     if (versionStateChanged.getLiveVersionId() != versionStateChanged.getLiveVersionId()) {
187                         documentId = versionStateChanged.getDocumentId();
188                         branchId = versionStateChanged.getBranchId();
189                         languageId = versionStateChanged.getLanguageId();
190                     }
191                 }
192
193                 if (getLogger().isDebugEnabled())
194                     getLogger().debug("Received a message from JMS of type " + messageType + " and decided to " + (documentId == -1 ? "do nothing." : "queue an index update request for document " + documentId + ", branch " + branchId + ", language " + languageId));
195
196                 if (documentId != -1) {
197                     scheduleIndexRequest(documentId, branchId, languageId);
198                 }
199             } catch (Throwable JavaDoc e) {
200                 getLogger().error("Error processing JMS message.", e);
201             }
202         }
203     }
204
205     private void scheduleIndexRequest(long documentId, long branchId, long languageId) throws Exception JavaDoc {
206         MapMessage fullTextTaskMessage = fullTextQueueSender.createMapMessage();
207         fullTextTaskMessage.setLong("documentId", documentId);
208         fullTextTaskMessage.setLong("branchId", branchId);
209         fullTextTaskMessage.setLong("languageId", languageId);
210         fullTextQueueSender.send(fullTextTaskMessage);
211     }
212
213     private class FullTextQueueListener implements MessageListener {
214         public void onMessage(Message aMessage) {
215             try {
216                 MapMessage message = (MapMessage)aMessage;
217                 long documentId = message.getLong("documentId");
218                 long branchId = message.getLong("branchId");
219                 long languageId = message.getLong("languageId");
220
221                 Version version = null;
222                 try {
223                     org.outerj.daisy.repository.Document document = getRepository().getDocument(documentId, branchId, languageId, false);
224                     version = document.getLiveVersion();
225                 } catch (DocumentNotFoundException e) {
226                     // is ok, version will be == null
227
} catch (DocumentVariantNotFoundException e) {
228                     // is ok, version will be == null
229
}
230                 if (version != null) {
231                     // Collect the content of the parts
232
StringBuffer JavaDoc content = new StringBuffer JavaDoc();
233                     Part[] parts = version.getParts().getArray();
234                     for (int p = 0; p < parts.length; p++) {
235                         Part part = parts[p];
236                         String JavaDoc text = null;
237
238                         if (part.getSize() > dataMaxSize) {
239                             getLogger().info("Will not extract text from document ID " + documentId + ", branch ID " + branchId + ", language ID " + languageId + ", part ID " + part.getTypeId() + " because the part data is too large (" + part.getSize() + " > " + dataMaxSize + ")");
240                         } else if (textExtractor.supportsMimeType(part.getMimeType())) {
241                             try {
242                                 // Note: before and after text extraction are logged to be able to check if
243
// the textextractor nicely returns (and doesn't go in endless loops)
244
if (getLogger().isDebugEnabled())
245                                     getLogger().debug("Before calling textextractor for document " + documentId + ", branch " + branchId + ", language " + languageId + ", part type " + part.getTypeId());
246
247                                 text = textExtractor.getText(part.getMimeType(), part.getDataStream());
248
249                                 if (getLogger().isDebugEnabled())
250                                     getLogger().debug("After calling textextractor for document " + documentId + ", branch " + branchId + ", language " + languageId + ", part type " + part.getTypeId());
251
252                                 logExtractedText(text, documentId, branchId, languageId, part);
253                             } catch (Throwable JavaDoc e) {
254                                 getLogger().error("Error extracting text from part data (document ID: " + documentId + ", branch ID: " + branchId + ", language ID: " + languageId + ", part ID: " + part.getTypeId() + ")", e);
255                                 continue;
256                             }
257                         }
258
259                         if (text != null) {
260                             content.append(text).append(" ");
261                         } else {
262                             if (getLogger().isDebugEnabled())
263                                 getLogger().debug("Textextractor wasn't able to extract anything for document " + documentId + ", branch " + branchId + ", language " + languageId + ", part type " + part.getTypeId() + " (mimetype = " + part.getMimeType() + ")");
264                         }
265                     }
266
267                     // collect the content of the fields
268
StringBuffer JavaDoc fieldsContent = new StringBuffer JavaDoc();
269                     Field[] fields = version.getFields().getArray();
270                     for (int i = 0; i < fields.length; i++) {
271                         if (fields[i].getValueType() == ValueType.STRING) {
272                             Object JavaDoc[] values = fields[i].isMultiValue() ? (Object JavaDoc[])fields[i].getValue() : new Object JavaDoc[] {fields[i].getValue()};
273                             for (int k = 0; k < values.length; k++) {
274                                 fieldsContent.append(values[k]);
275                                 fieldsContent.append(' ');
276                             }
277                         }
278                     }
279
280                     fullTextIndex.index(documentId, branchId, languageId, version.getDocumentName(), content.toString(), fieldsContent.toString());
281                 } else {
282                     if (getLogger().isDebugEnabled())
283                         getLogger().debug("Document " + documentId + " is deleted or has no version marked as published, will remove fulltext index (if any exists).");
284                     fullTextIndex.index(documentId, branchId, languageId, null, null, null);
285                 }
286             } catch (Throwable JavaDoc e) {
287                 getLogger().error("Error processing JMS message.", e);
288             }
289         }
290     }
291
292     private void logExtractedText(String JavaDoc text, long documentId, long branchId, long languageId, Part part) {
293         if (logExtractedTextFile != null) {
294             Writer writer = null;
295             try {
296                 FileOutputStream fos = new FileOutputStream(logExtractedTextFile, true);
297                 writer = new OutputStreamWriter(fos, Charset.forName("UTF-8"));
298                 writer.write("\n\n********************************************************************************************\n");
299                 writer.write("Document " + documentId + ", branch " + branchId + ", language " + languageId + " -- part type " + part.getTypeId() + " -- mimetype " + part.getMimeType() + "\n");
300                 writer.write(text);
301             } catch (Throwable JavaDoc e) {
302                 getLogger().error("Error logging extracted text to " + logExtractedTextFile.getAbsolutePath(), e);
303             } finally {
304                 if (writer != null) {
305                     try {
306                         writer.close();
307                     } catch (Throwable JavaDoc e) {
308                         // ignore
309
}
310                 }
311             }
312         }
313     }
314 }
// Popular Tags