1 17 package org.apache.servicemix.jbi.audit.jdbc; 18 19 import java.io.IOException ; 20 import java.net.URI ; 21 import java.sql.Connection ; 22 import java.sql.SQLException ; 23 24 import javax.jbi.messaging.MessageExchange; 25 import javax.sql.DataSource ; 26 27 import org.apache.servicemix.jbi.audit.AbstractAuditor; 28 import org.apache.servicemix.jbi.audit.AuditorException; 29 import org.apache.servicemix.jbi.event.ExchangeEvent; 30 import org.apache.servicemix.jbi.messaging.ExchangePacket; 31 import org.apache.servicemix.jbi.messaging.InOnlyImpl; 32 import org.apache.servicemix.jbi.messaging.InOptionalOutImpl; 33 import org.apache.servicemix.jbi.messaging.InOutImpl; 34 import org.apache.servicemix.jbi.messaging.MessageExchangeImpl; 35 import org.apache.servicemix.jbi.messaging.MessageExchangeSupport; 36 import org.apache.servicemix.jbi.messaging.RobustInOnlyImpl; 37 import org.apache.servicemix.jdbc.JDBCAdapter; 38 import org.apache.servicemix.jdbc.JDBCAdapterFactory; 39 import org.apache.servicemix.jdbc.Statements; 40 import org.springframework.beans.factory.InitializingBean; 41 42 59 public class JdbcAuditor extends AbstractAuditor implements InitializingBean { 60 61 private DataSource dataSource; 62 private boolean autoStart = true; 63 private Statements statements; 64 private String tableName = "SM_AUDIT"; 65 private JDBCAdapter adapter; 66 private boolean createDataBase = true; 67 68 public String getDescription() { 69 return "JDBC Auditing Service"; 70 } 71 72 public void afterPropertiesSet() throws Exception { 73 if (this.container == null) { 74 throw new IllegalArgumentException ("container should not be null"); 75 } 76 if (this.dataSource == null) { 77 throw new IllegalArgumentException ("dataSource should not be null"); 78 } 79 if (statements == null) { 80 statements = new Statements(); 81 statements.setStoreTableName(tableName); 82 } 83 Connection connection = null; 84 try { 85 connection = getDataSource().getConnection(); 86 adapter = JDBCAdapterFactory.getAdapter(connection); 87 if (statements == null) { 88 statements = new Statements(); 89 statements.setStoreTableName(tableName); 90 } 91 adapter.setStatements(statements); 92 if (createDataBase) { 93 adapter.doCreateTables(connection); 94 } 95 connection.commit(); 96 } catch (SQLException e) { 97 throw (IOException ) new IOException ("Exception while creating database").initCause(e); 98 } finally { 99 if (connection != null) { 100 try { 101 connection.close(); 102 } catch (Exception e) { 103 } 104 } 105 } 106 init(getContainer()); 107 if (autoStart) { 108 start(); 109 } else { 110 stop(); 111 } 112 } 113 114 public void exchangeSent(ExchangeEvent event) { 115 MessageExchange exchange = event.getExchange(); 116 if (exchange instanceof MessageExchangeImpl == false) { 117 throw new IllegalArgumentException ("exchange should be a MessageExchangeImpl"); 118 } 119 try { 120 ExchangePacket packet = ((MessageExchangeImpl) exchange).getPacket(); 121 String id = packet.getExchangeId(); 122 byte[] data = packet.getData(); 123 Connection connection = dataSource.getConnection(); 124 try { 125 store(connection, id, data); 126 connection.commit(); 127 } finally { 128 close(connection); 129 } 130 } catch (Exception e) { 131 log.error("Could not persist exchange", e); 132 } 133 } 134 135 private static void close(Connection connection) { 136 if (connection != null) { 137 try { 138 connection.close(); 139 } catch (SQLException e) { 140 } 141 } 142 143 } 144 145 protected void store(Connection connection, String id, byte[] data) throws Exception { 146 if (adapter.doLoadData(connection, id) != null) { 147 adapter.doUpdateData(connection, id, data); 148 } else { 149 adapter.doStoreData(connection, id, data); 150 } 151 } 152 153 public DataSource getDataSource() { 154 return dataSource; 155 } 156 157 public void setDataSource(DataSource dataSource) { 158 this.dataSource = dataSource; 159 } 160 161 164 public int getExchangeCount() throws AuditorException { 165 Connection connection = null; 166 try { 167 connection = dataSource.getConnection(); 168 return adapter.doGetCount(connection); 169 } catch (Exception e) { 170 throw new AuditorException("Could not retrieve exchange count", e); 171 } finally { 172 close(connection); 173 } 174 } 175 176 179 public String [] getExchangeIds(int fromIndex, int toIndex) throws AuditorException { 180 if (fromIndex < 0) { 181 throw new IllegalArgumentException ("fromIndex should be greater or equal to zero"); 182 } 183 if (toIndex < fromIndex) { 184 throw new IllegalArgumentException ("toIndex should be greater or equal to fromIndex"); 185 } 186 if (fromIndex == toIndex) { 188 return new String [0]; 189 } 190 Connection connection = null; 191 try { 192 connection = dataSource.getConnection(); 193 String [] ids = adapter.doGetIds(connection, fromIndex, toIndex); 194 return ids; 195 } catch (Exception e) { 196 throw new AuditorException("Could not retrieve exchange ids", e); 197 } finally { 198 close(connection); 199 } 200 } 201 202 205 public MessageExchange[] getExchanges(String [] ids) throws AuditorException { 206 MessageExchange[] exchanges = new MessageExchange[ids.length]; 207 Connection connection = null; 208 try { 209 connection = dataSource.getConnection(); 210 for (int row = 0; row < ids.length; row++) { 211 exchanges[row] = getExchange(adapter.doLoadData(connection, ids[row])); 212 } 213 return exchanges; 214 } catch (Exception e) { 215 throw new AuditorException("Could not retrieve exchanges", e); 216 } finally { 217 close(connection); 218 } 219 } 220 221 224 public int deleteExchanges(String [] ids) throws AuditorException { 225 Connection connection = null; 226 try { 227 connection = dataSource.getConnection(); 228 for (int row = 0; row < ids.length; row++) { 229 adapter.doRemoveData(connection, ids[row]); 230 } 231 return -1; 232 } catch (Exception e) { 233 throw new AuditorException("Could not delete exchanges", e); 234 } finally { 235 close(connection); 236 } 237 } 238 239 protected MessageExchange getExchange(byte[] data) throws AuditorException { 241 ExchangePacket packet = null; 242 try { 243 packet = ExchangePacket.readPacket(data); 244 } catch (Exception e) { 245 throw new AuditorException("Unable to reconstruct exchange", e); 246 } 247 URI mep = packet.getPattern(); 248 if (MessageExchangeSupport.IN_ONLY.equals(mep)) { 249 return new InOnlyImpl(packet); 250 } else if (MessageExchangeSupport.IN_OPTIONAL_OUT.equals(mep)) { 251 return new InOptionalOutImpl(packet); 252 } else if (MessageExchangeSupport.IN_OUT.equals(mep)) { 253 return new InOutImpl(packet); 254 } else if (MessageExchangeSupport.ROBUST_IN_ONLY.equals(mep)) { 255 return new RobustInOnlyImpl(packet); 256 } else { 257 throw new AuditorException("Unhandled mep: " + mep); 258 } 259 } 260 261 public boolean isAutoStart() { 262 return autoStart; 263 } 264 265 public void setAutoStart(boolean autoStart) { 266 this.autoStart = autoStart; 267 } 268 269 270 } 271 | Popular Tags |