package org.apache.servicemix.jbi.audit.lucene;

import java.io.IOException;
import java.util.Iterator;
import java.util.Set;

import javax.jbi.JBIException;
import javax.jbi.messaging.ExchangeStatus;
import javax.jbi.messaging.MessageExchange;
import javax.jbi.messaging.MessagingException;
import javax.jbi.messaging.NormalizedMessage;

import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.servicemix.jbi.audit.AbstractAuditor;
import org.apache.servicemix.jbi.audit.AuditorException;
import org.apache.servicemix.jbi.audit.AuditorMBean;
import org.apache.servicemix.jbi.audit.AuditorQueryMBean;
import org.apache.servicemix.jbi.event.ExchangeEvent;
import org.apache.servicemix.jbi.event.ExchangeListener;
import org.apache.servicemix.jbi.jaxp.SourceTransformer;

/**
 * Auditor that maintains a Lucene index of message exchanges on top of another
 * ("delegated") auditor.
 * <p>
 * Every exchange passed to {@link #exchangeSent(ExchangeEvent)} is converted to a
 * Lucene {@link Document} and handed to the {@link LuceneIndexer}; counting,
 * listing, retrieving and deleting exchanges are forwarded to the delegated
 * auditor. The <code>findExchangesIDsBy...</code> methods query the index.
 * <p>
 * A delegated auditor must be set before the component is started.
 */
public class LuceneAuditor extends AbstractAuditor implements AuditorQueryMBean {

    /** Common prefix of every field name written to the index. */
    private static final String FIELD_PREFIX = "org.apache.servicemix.";

    /** Index field holding the exchange id. */
    private static final String FIELD_EXCHANGE_ID = FIELD_PREFIX + "exchangeid";

    /** Index field holding the exchange status. */
    private static final String FIELD_EXCHANGE_STATUS = FIELD_PREFIX + "exchangestatus";

    /** Suffix of the per-message field holding the message content. */
    private static final String CONTENTS_SUFFIX = ".contents";

    /** Message references that are looked up on each exchange and indexed. */
    private static final String[] MESSAGE_TYPES = {"in", "out", "fault"};

    private AuditorMBean delegatedAuditor;
    private LuceneIndexer luceneIndexer = new LuceneIndexer();

    /**
     * Starts this auditor and then the delegated auditor.
     *
     * @throws JBIException if no delegated auditor has been configured, or if
     *                      starting either auditor fails
     */
    protected void doStart() throws JBIException {
        // Validate the configuration first so that a misconfigured auditor is
        // not left half-started.
        if (delegatedAuditor == null) {
            throw new JBIException("A delegated auditor must be provided");
        }
        super.doStart();
        this.delegatedAuditor.start();
    }

    /**
     * Stops this auditor and then the delegated auditor, if one is configured.
     *
     * @throws JBIException if stopping either auditor fails
     */
    protected void doStop() throws JBIException {
        super.doStop();
        // The delegate may be missing when start-up was rejected; do not turn
        // that into a NullPointerException on shutdown.
        if (this.delegatedAuditor != null) {
            this.delegatedAuditor.stop();
        }
    }

    /**
     * @return the indexer used to add, remove and search documents
     */
    public LuceneIndexer getLuceneIndexer() {
        return luceneIndexer;
    }

    /**
     * @param luceneIndexer the indexer used to add, remove and search documents
     */
    public void setLuceneIndexer(LuceneIndexer luceneIndexer) {
        this.luceneIndexer = luceneIndexer;
    }

    /**
     * @return the auditor that exchange storage operations are forwarded to
     */
    public AuditorMBean getDelegatedAuditor() {
        return delegatedAuditor;
    }

    /**
     * Sets the auditor that exchange storage operations are forwarded to.
     * When the delegate is an {@link AbstractAuditor} its container-listener
     * flag is switched off; this auditor forwards events to it explicitly in
     * {@link #exchangeSent(ExchangeEvent)}.
     *
     * @param delegatedAuditor the auditor to delegate to
     */
    public void setDelegatedAuditor(AuditorMBean delegatedAuditor) {
        this.delegatedAuditor = delegatedAuditor;
        if (delegatedAuditor instanceof AbstractAuditor) {
            ((AbstractAuditor) delegatedAuditor).setAsContainerListener(false);
        }
    }

    /**
     * @return the exchange count reported by the delegated auditor
     * @throws AuditorException if the delegated auditor fails
     */
    public int getExchangeCount() throws AuditorException {
        return this.delegatedAuditor.getExchangeCount();
    }

    /**
     * Lists exchange ids by index range, as reported by the delegated auditor.
     *
     * @param fromIndex start of the range, passed through unchanged
     * @param toIndex   end of the range, passed through unchanged
     * @return the ids returned by the delegated auditor
     * @throws AuditorException if the delegated auditor fails
     */
    public String[] getExchangeIds(int fromIndex, int toIndex) throws AuditorException {
        return this.delegatedAuditor.getExchangeIds(fromIndex, toIndex);
    }

    /**
     * Retrieves exchanges by id from the delegated auditor.
     *
     * @param ids the exchange ids to retrieve
     * @return the exchanges returned by the delegated auditor
     * @throws AuditorException if the delegated auditor fails
     */
    public MessageExchange[] getExchanges(String[] ids) throws AuditorException {
        return this.delegatedAuditor.getExchanges(ids);
    }

    /**
     * Deletes the exchanges in the given index range from both the Lucene
     * index and the delegated auditor.
     * <p>
     * The range is first resolved to exchange ids through the delegated
     * auditor so that the same set of exchanges is removed from the index and
     * from the store. When the range resolves to no ids the call is forwarded
     * to the delegated auditor unchanged.
     *
     * @param fromIndex start of the range, passed through unchanged
     * @param toIndex   end of the range, passed through unchanged
     * @return the number of deleted exchanges as reported by the delegated auditor
     * @throws AuditorException if the index or the delegated auditor fails
     */
    public int deleteExchanges(int fromIndex, int toIndex) throws AuditorException {
        String[] ids = this.delegatedAuditor.getExchangeIds(fromIndex, toIndex);
        if (ids == null || ids.length == 0) {
            // Nothing to remove from the index; let the delegate decide the result.
            return this.delegatedAuditor.deleteExchanges(fromIndex, toIndex);
        }
        return deleteExchanges(ids);
    }

    /**
     * Deletes the given exchanges from the Lucene index and then from the
     * delegated auditor.
     *
     * @param ids the exchange ids to delete
     * @return the number of deleted exchanges as reported by the delegated auditor
     * @throws AuditorException if removing from the index fails with an
     *                          {@link IOException}, or if the delegated auditor fails
     */
    public int deleteExchanges(String[] ids) throws AuditorException {
        try {
            this.luceneIndexer.remove(ids);
        } catch (IOException io) {
            throw new AuditorException(io);
        }
        return this.delegatedAuditor.deleteExchanges(ids);
    }

    /**
     * Indexes the exchange carried by the event and forwards the event to the
     * delegated auditor when it is an {@link ExchangeListener}.
     * <p>
     * Indexing and forwarding are handled independently: a failure to index is
     * logged and does not prevent the delegated auditor from receiving the
     * event, and a failure in the delegate is logged under its own message.
     * No exception is propagated to the caller.
     *
     * @param event the event describing the exchange that was sent
     */
    public void exchangeSent(ExchangeEvent event) {
        MessageExchange exchange = event.getExchange();
        try {
            Document doc = createDocument(exchange);
            this.luceneIndexer.add(doc, exchange.getExchangeId());
        } catch (Exception e) {
            log.error("Error while adding to lucene", e);
        }
        if (delegatedAuditor instanceof ExchangeListener) {
            try {
                ((ExchangeListener) delegatedAuditor).exchangeSent(event);
            } catch (Exception e) {
                log.error("Error while forwarding exchange to delegated auditor", e);
            }
        }
    }

    /**
     * @return a short human-readable description of this component
     */
    public String getDescription() {
        return "Lucene Auditor";
    }

    /**
     * Searches the index for exchanges with the given status.
     *
     * @param status the status to look for; its string form is used as the search value
     * @return the matching exchange ids
     * @throws AuditorException if the search fails
     */
    public String[] findExchangesIDsByStatus(ExchangeStatus status) throws AuditorException {
        return getExchangeIds(FIELD_EXCHANGE_STATUS, String.valueOf(status));
    }

    /**
     * Searches the index on the content of one message of the exchange.
     *
     * @param type    the message reference used when indexing
     *                (this class indexes "in", "out" and "fault")
     * @param content the search value for the message content field
     * @return the matching exchange ids
     * @throws AuditorException if the search fails
     */
    public String[] findExchangesIDsByMessageContent(String type, String content) throws AuditorException {
        return getExchangeIds(FIELD_PREFIX + type + CONTENTS_SUFFIX, content);
    }

    /**
     * Searches the index on a message property.
     * <p>
     * A property name that does not already start with
     * <code>org.apache.servicemix</code> is qualified as
     * <code>org.apache.servicemix.&lt;type&gt;.&lt;property&gt;</code>; an
     * already qualified name (or <code>null</code>) is used as is.
     *
     * @param type     the message reference used when indexing
     * @param property the property name, qualified or not
     * @param value    the search value for the property field
     * @return the matching exchange ids
     * @throws AuditorException if the search fails
     */
    public String[] findExchangesIDsByMessageProperty(String type, String property, String value)
        throws AuditorException {
        if (property != null && !property.startsWith("org.apache.servicemix")) {
            property = FIELD_PREFIX + type + "." + property;
        }
        return getExchangeIds(property, value);
    }

    /**
     * Builds the Lucene document describing an exchange.
     * <p>
     * The document holds the exchange id and status as keyword fields and, for
     * each of the "in", "out" and "fault" messages present on the exchange, the
     * message content as an unstored field plus the message's String properties.
     *
     * @param me the exchange to describe
     * @return the document to add to the index
     * @throws MessagingException if reading the exchange fails; any other
     *                            failure is wrapped in a MessagingException
     */
    protected Document createDocument(MessageExchange me) throws MessagingException {
        try {
            SourceTransformer st = new SourceTransformer();
            Document d = new Document();
            d.add(Field.Keyword(FIELD_EXCHANGE_ID, me.getExchangeId()));
            d.add(Field.Keyword(FIELD_EXCHANGE_STATUS, String.valueOf(me.getStatus())));

            for (int i = 0; i < MESSAGE_TYPES.length; i++) {
                String type = MESSAGE_TYPES[i];
                NormalizedMessage nm = me.getMessage(type);
                if (nm == null) {
                    continue;
                }
                String contents = st.contentToString(nm);
                // NOTE(review): presumably contentToString yields null for a
                // message without content - confirm. Skipping the field keeps
                // such an exchange indexable by id, status and properties.
                if (contents != null) {
                    d.add(Field.UnStored(FIELD_PREFIX + type + CONTENTS_SUFFIX, contents));
                }
                addMessagePropertiesToDocument(nm, d, type);
            }
            return d;
        } catch (MessagingException mse) {
            throw mse;
        } catch (Exception ex) {
            throw new MessagingException("Error while creating Lucene Document", ex);
        }
    }

    /**
     * Adds every String-valued property of a message to the document as a
     * keyword field named
     * <code>org.apache.servicemix.&lt;type&gt;.&lt;propertyName&gt;</code>.
     * Properties whose value is not a String are ignored.
     *
     * @param nm       the message whose properties are read
     * @param document the document receiving the fields
     * @param type     the message reference used to qualify the field names
     * @throws MessagingException declared for subclasses; not thrown by this implementation
     */
    protected void addMessagePropertiesToDocument(NormalizedMessage nm,
                                                  Document document,
                                                  String type) throws MessagingException {
        Set propertyNames = nm.getPropertyNames();
        for (Iterator iter = propertyNames.iterator(); iter.hasNext();) {
            String propertyName = (String) iter.next();
            Object value = nm.getProperty(propertyName);
            if (value instanceof String) {
                document.add(Field.Keyword(FIELD_PREFIX + type + "." + propertyName, (String) value));
            }
        }
    }

    /**
     * Runs a search against the index and returns the matching exchange ids.
     * <p>
     * NOTE(review): every caller in this class passes the index field name as
     * the first argument and the search value as the second, which is the
     * reverse of what the parameter names suggest. Both arguments are forwarded
     * to {@link DefaultLuceneCallback} in the order received; confirm against
     * its constructor before renaming or reordering anything here.
     *
     * @param queryContent first argument forwarded to the callback
     *                     (callers in this class pass the field name)
     * @param field        second argument forwarded to the callback
     *                     (callers in this class pass the search value)
     * @return the exchange ids produced by the search
     * @throws AuditorException if the search fails with an {@link IOException}
     */
    public String[] getExchangeIds(String queryContent, String field) throws AuditorException {
        DefaultLuceneCallback dfc = new DefaultLuceneCallback(queryContent, field);
        try {
            return (String[]) luceneIndexer.search(dfc);
        } catch (IOException e) {
            throw new AuditorException("Error while getting Exchange IDs", e);
        }
    }
}