KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > servicemix > jbi > audit > jdbc > JdbcAuditor


1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one or more
3  * contributor license agreements. See the NOTICE file distributed with
4  * this work for additional information regarding copyright ownership.
5  * The ASF licenses this file to You under the Apache License, Version 2.0
6  * (the "License"); you may not use this file except in compliance with
7  * the License. You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */

17 package org.apache.servicemix.jbi.audit.jdbc;
18
19 import java.io.IOException JavaDoc;
20 import java.net.URI JavaDoc;
21 import java.sql.Connection JavaDoc;
22 import java.sql.SQLException JavaDoc;
23
24 import javax.jbi.messaging.MessageExchange;
25 import javax.sql.DataSource JavaDoc;
26
27 import org.apache.servicemix.jbi.audit.AbstractAuditor;
28 import org.apache.servicemix.jbi.audit.AuditorException;
29 import org.apache.servicemix.jbi.event.ExchangeEvent;
30 import org.apache.servicemix.jbi.messaging.ExchangePacket;
31 import org.apache.servicemix.jbi.messaging.InOnlyImpl;
32 import org.apache.servicemix.jbi.messaging.InOptionalOutImpl;
33 import org.apache.servicemix.jbi.messaging.InOutImpl;
34 import org.apache.servicemix.jbi.messaging.MessageExchangeImpl;
35 import org.apache.servicemix.jbi.messaging.MessageExchangeSupport;
36 import org.apache.servicemix.jbi.messaging.RobustInOnlyImpl;
37 import org.apache.servicemix.jdbc.JDBCAdapter;
38 import org.apache.servicemix.jdbc.JDBCAdapterFactory;
39 import org.apache.servicemix.jdbc.Statements;
40 import org.springframework.beans.factory.InitializingBean;
41
42 /**
43  * Basic implementation of ServiceMix auditor on a jdbc store.
44  * This implementation, for performance purposes, only relies
45  * on one table SM_AUDIT with two columns:
46  * <ul>
47  * <li><b>ID</b> the exchange id (varchar)</li>
48  * <li><b>EXCHANGE</b> the serialized exchange (blob)</li>
49  * </ul>
50  * To minimize overhead, the exchange serialized is the undelying
51  * {@link org.apache.servicemix.jbi.messaging.ExchangePacket}.
52  *
53  * @org.apache.xbean.XBean element="jdbcAuditor" description="The Auditor of message exchanges to a JDBC database"
54  *
55  * @author Guillaume Nodet (gnt)
56  * @version $Revision: 426415 $
57  * @since 2.1
58  */

59 public class JdbcAuditor extends AbstractAuditor implements InitializingBean {
60
61     private DataSource JavaDoc dataSource;
62     private boolean autoStart = true;
63     private Statements statements;
64     private String JavaDoc tableName = "SM_AUDIT";
65     private JDBCAdapter adapter;
66     private boolean createDataBase = true;
67     
68     public String JavaDoc getDescription() {
69         return "JDBC Auditing Service";
70     }
71     
72     public void afterPropertiesSet() throws Exception JavaDoc {
73         if (this.container == null) {
74             throw new IllegalArgumentException JavaDoc("container should not be null");
75         }
76         if (this.dataSource == null) {
77             throw new IllegalArgumentException JavaDoc("dataSource should not be null");
78         }
79         if (statements == null) {
80             statements = new Statements();
81             statements.setStoreTableName(tableName);
82         }
83         Connection JavaDoc connection = null;
84         try {
85             connection = getDataSource().getConnection();
86             adapter = JDBCAdapterFactory.getAdapter(connection);
87             if (statements == null) {
88                 statements = new Statements();
89                 statements.setStoreTableName(tableName);
90             }
91             adapter.setStatements(statements);
92             if (createDataBase) {
93                 adapter.doCreateTables(connection);
94             }
95             connection.commit();
96         } catch (SQLException JavaDoc e) {
97             throw (IOException JavaDoc) new IOException JavaDoc("Exception while creating database").initCause(e);
98         } finally {
99             if (connection != null) {
100                 try {
101                     connection.close();
102                 } catch (Exception JavaDoc e) {
103                 }
104             }
105         }
106         init(getContainer());
107         if (autoStart) {
108             start();
109         } else {
110             stop();
111         }
112     }
113     
114     public void exchangeSent(ExchangeEvent event) {
115         MessageExchange exchange = event.getExchange();
116         if (exchange instanceof MessageExchangeImpl == false) {
117             throw new IllegalArgumentException JavaDoc("exchange should be a MessageExchangeImpl");
118         }
119         try {
120             ExchangePacket packet = ((MessageExchangeImpl) exchange).getPacket();
121             String JavaDoc id = packet.getExchangeId();
122             byte[] data = packet.getData();
123             Connection JavaDoc connection = dataSource.getConnection();
124             try {
125                 store(connection, id, data);
126                 connection.commit();
127             } finally {
128                 close(connection);
129             }
130         } catch (Exception JavaDoc e) {
131             log.error("Could not persist exchange", e);
132         }
133     }
134     
135     private static void close(Connection JavaDoc connection) {
136         if (connection != null) {
137             try {
138                 connection.close();
139             } catch (SQLException JavaDoc e) {
140             }
141         }
142         
143     }
144
145     protected void store(Connection JavaDoc connection, String JavaDoc id, byte[] data) throws Exception JavaDoc {
146         if (adapter.doLoadData(connection, id) != null) {
147             adapter.doUpdateData(connection, id, data);
148         } else {
149             adapter.doStoreData(connection, id, data);
150         }
151     }
152     
153     public DataSource JavaDoc getDataSource() {
154         return dataSource;
155     }
156
157     public void setDataSource(DataSource JavaDoc dataSource) {
158         this.dataSource = dataSource;
159     }
160
161     /* (non-Javadoc)
162      * @see org.apache.servicemix.jbi.audit.AuditorMBean#getExchangeCount()
163      */

164     public int getExchangeCount() throws AuditorException {
165         Connection JavaDoc connection = null;
166         try {
167             connection = dataSource.getConnection();
168             return adapter.doGetCount(connection);
169         } catch (Exception JavaDoc e) {
170             throw new AuditorException("Could not retrieve exchange count", e);
171         } finally {
172             close(connection);
173         }
174     }
175
176     /* (non-Javadoc)
177      * @see org.apache.servicemix.jbi.audit.AuditorMBean#getExchangeIds(int, int)
178      */

179     public String JavaDoc[] getExchangeIds(int fromIndex, int toIndex) throws AuditorException {
180         if (fromIndex < 0) {
181             throw new IllegalArgumentException JavaDoc("fromIndex should be greater or equal to zero");
182         }
183         if (toIndex < fromIndex) {
184             throw new IllegalArgumentException JavaDoc("toIndex should be greater or equal to fromIndex");
185         }
186         // Do not hit the database if no ids are requested
187
if (fromIndex == toIndex) {
188             return new String JavaDoc[0];
189         }
190         Connection JavaDoc connection = null;
191         try {
192             connection = dataSource.getConnection();
193             String JavaDoc[] ids = adapter.doGetIds(connection, fromIndex, toIndex);
194             return ids;
195         } catch (Exception JavaDoc e) {
196             throw new AuditorException("Could not retrieve exchange ids", e);
197         } finally {
198             close(connection);
199         }
200     }
201
202     /* (non-Javadoc)
203      * @see org.apache.servicemix.jbi.audit.AuditorMBean#getExchanges(java.lang.String[])
204      */

205     public MessageExchange[] getExchanges(String JavaDoc[] ids) throws AuditorException {
206         MessageExchange[] exchanges = new MessageExchange[ids.length];
207         Connection JavaDoc connection = null;
208         try {
209             connection = dataSource.getConnection();
210             for (int row = 0; row < ids.length; row++) {
211                 exchanges[row] = getExchange(adapter.doLoadData(connection, ids[row]));
212             }
213             return exchanges;
214         } catch (Exception JavaDoc e) {
215             throw new AuditorException("Could not retrieve exchanges", e);
216         } finally {
217             close(connection);
218         }
219     }
220
221     /* (non-Javadoc)
222      * @see org.apache.servicemix.jbi.audit.AuditorMBean#deleteExchanges(java.lang.String[])
223      */

224     public int deleteExchanges(String JavaDoc[] ids) throws AuditorException {
225         Connection JavaDoc connection = null;
226         try {
227             connection = dataSource.getConnection();
228             for (int row = 0; row < ids.length; row++) {
229                 adapter.doRemoveData(connection, ids[row]);
230             }
231             return -1;
232         } catch (Exception JavaDoc e) {
233             throw new AuditorException("Could not delete exchanges", e);
234         } finally {
235             close(connection);
236         }
237     }
238     
239     // TODO: this should be somewhere in org.apache.servicemix.jbi.messaging
240
protected MessageExchange getExchange(byte[] data) throws AuditorException {
241         ExchangePacket packet = null;
242         try {
243             packet = ExchangePacket.readPacket(data);
244         } catch (Exception JavaDoc e) {
245             throw new AuditorException("Unable to reconstruct exchange", e);
246         }
247         URI JavaDoc mep = packet.getPattern();
248         if (MessageExchangeSupport.IN_ONLY.equals(mep)) {
249             return new InOnlyImpl(packet);
250         } else if (MessageExchangeSupport.IN_OPTIONAL_OUT.equals(mep)) {
251             return new InOptionalOutImpl(packet);
252         } else if (MessageExchangeSupport.IN_OUT.equals(mep)) {
253             return new InOutImpl(packet);
254         } else if (MessageExchangeSupport.ROBUST_IN_ONLY.equals(mep)) {
255             return new RobustInOnlyImpl(packet);
256         } else {
257             throw new AuditorException("Unhandled mep: " + mep);
258         }
259     }
260     
261     public boolean isAutoStart() {
262         return autoStart;
263     }
264
265     public void setAutoStart(boolean autoStart) {
266         this.autoStart = autoStart;
267     }
268
269     
270 }
271
Popular Tags