KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > net > sf > hajdbc > sync > FullSynchronizationStrategy


1 /*
2  * HA-JDBC: High-Availability JDBC
3  * Copyright (c) 2004-2006 Paul Ferraro
4  *
5  * This library is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU Lesser General Public License as published by the
7  * Free Software Foundation; either version 2.1 of the License, or (at your
8  * option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful, but WITHOUT
11  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12  * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
13  * for more details.
14  *
15  * You should have received a copy of the GNU Lesser General Public License
16  * along with this library; if not, write to the Free Software Foundation,
17  * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18  *
19  * Contact: ferraro@users.sourceforge.net
20  */

21 package net.sf.hajdbc.sync;
22
23 import java.sql.Connection JavaDoc;
24 import java.sql.DatabaseMetaData JavaDoc;
25 import java.sql.PreparedStatement JavaDoc;
26 import java.sql.ResultSet JavaDoc;
27 import java.sql.ResultSetMetaData JavaDoc;
28 import java.sql.SQLException JavaDoc;
29 import java.sql.Statement JavaDoc;
30 import java.sql.Types JavaDoc;
31 import java.util.List JavaDoc;
32 import java.util.Map JavaDoc;
33 import java.util.concurrent.Callable JavaDoc;
34 import java.util.concurrent.ExecutionException JavaDoc;
35 import java.util.concurrent.ExecutorService JavaDoc;
36 import java.util.concurrent.Executors JavaDoc;
37 import java.util.concurrent.Future JavaDoc;
38
39 import net.sf.hajdbc.Dialect;
40 import net.sf.hajdbc.ForeignKeyConstraint;
41 import net.sf.hajdbc.Messages;
42 import net.sf.hajdbc.SynchronizationStrategy;
43 import net.sf.hajdbc.util.concurrent.DaemonThreadFactory;
44
45 import org.slf4j.Logger;
46 import org.slf4j.LoggerFactory;
47
48 /**
49  * Database-independent synchronization strategy that only updates differences between two databases.
50  * This strategy is best used when there are <em>many</em> differences between the active database and the inactive database (i.e. very much out of sync).
51  * The following algorithm is used:
52  * <ol>
53  * <li>Drop the foreign keys on the inactive database (to avoid integrity constraint violations)</li>
54  * <li>For each database table:
55  * <ol>
56  * <li>Delete all rows in the inactive database table</li>
57  * <li>Query all rows on the active database table</li>
58  * <li>For each row in active database table:
59  * <ol>
60  * <li>Insert new row into inactive database table</li>
61  * </ol>
62  * </li>
63  * </ol>
64  * </li>
65  * <li>Re-create the foreign keys on the inactive database</li>
66  * </ol>
67  * @author Paul Ferraro
68  * @version $Revision: 1370 $
69  * @since 1.0
70  */

71 public class FullSynchronizationStrategy implements SynchronizationStrategy
72 {
73     private static Logger logger = LoggerFactory.getLogger(FullSynchronizationStrategy.class);
74
75     private ExecutorService JavaDoc executor = Executors.newSingleThreadExecutor(DaemonThreadFactory.getInstance());
76     private int maxBatchSize = 100;
77     private int fetchSize = 0;
78     
79     /**
80      * @see net.sf.hajdbc.SynchronizationStrategy#synchronize(Connection, Connection, Map, Dialect)
81      */

82     public void synchronize(Connection JavaDoc inactiveConnection, Connection JavaDoc activeConnection, Map JavaDoc<String JavaDoc, List JavaDoc<String JavaDoc>> schemaMap, Dialect dialect) throws SQLException JavaDoc
83     {
84         inactiveConnection.setAutoCommit(true);
85         
86         DatabaseMetaData JavaDoc metaData = inactiveConnection.getMetaData();
87         
88         Statement JavaDoc statement = inactiveConnection.createStatement();
89         
90         // Drop foreign keys from the inactive database
91
for (ForeignKeyConstraint key: ForeignKeyConstraint.collect(inactiveConnection, schemaMap))
92         {
93             String JavaDoc sql = dialect.getDropForeignKeyConstraintSQL(metaData, key);
94             
95             logger.debug(sql);
96             
97             statement.addBatch(sql);
98         }
99         
100         statement.executeBatch();
101         statement.clearBatch();
102         
103         inactiveConnection.setAutoCommit(false);
104         
105         try
106         {
107             for (Map.Entry JavaDoc<String JavaDoc, List JavaDoc<String JavaDoc>> schemaMapEntry: schemaMap.entrySet())
108             {
109                 String JavaDoc schema = schemaMapEntry.getKey();
110                 
111                 for (String JavaDoc table: schemaMapEntry.getValue())
112                 {
113                     String JavaDoc qualifiedTable = dialect.qualifyTable(metaData, schema, table);
114                 
115                     final String JavaDoc selectSQL = "SELECT * FROM " + qualifiedTable;
116                     
117                     final Statement JavaDoc selectStatement = activeConnection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
118                     selectStatement.setFetchSize(this.fetchSize);
119                     
120                     Callable JavaDoc<ResultSet JavaDoc> callable = new Callable JavaDoc<ResultSet JavaDoc>()
121                     {
122                         public ResultSet JavaDoc call() throws SQLException JavaDoc
123                         {
124                             return selectStatement.executeQuery(selectSQL);
125                         }
126                     };
127         
128                     Future JavaDoc<ResultSet JavaDoc> future = this.executor.submit(callable);
129                     
130                     String JavaDoc deleteSQL = dialect.getTruncateTableSQL(metaData, schema, table);
131         
132                     logger.debug(deleteSQL);
133                     
134                     Statement JavaDoc deleteStatement = inactiveConnection.createStatement();
135         
136                     int deletedRows = deleteStatement.executeUpdate(deleteSQL);
137                     
138                     logger.info(Messages.getMessage(Messages.DELETE_COUNT, deletedRows, qualifiedTable));
139                     
140                     deleteStatement.close();
141                     
142                     ResultSet JavaDoc resultSet = future.get();
143                     
144                     StringBuilder JavaDoc builder = new StringBuilder JavaDoc("INSERT INTO ").append(qualifiedTable).append(" (");
145         
146                     ResultSetMetaData JavaDoc resultSetMetaData = resultSet.getMetaData();
147                     
148                     int columns = resultSetMetaData.getColumnCount();
149                     
150                     for (int i = 1; i <= columns; ++i)
151                     {
152                         if (i > 1)
153                         {
154                             builder.append(", ");
155                         }
156                         
157                         builder.append(dialect.quote(metaData, resultSetMetaData.getColumnName(i)));
158                     }
159                     
160                     builder.append(") VALUES (");
161                     
162                     for (int i = 1; i <= columns; ++i)
163                     {
164                         if (i > 1)
165                         {
166                             builder.append(", ");
167                         }
168                         
169                         builder.append("?");
170                     }
171                     
172                     String JavaDoc insertSQL = builder.append(")").toString();
173                     
174                     logger.debug(insertSQL);
175                     
176                     PreparedStatement JavaDoc insertStatement = inactiveConnection.prepareStatement(insertSQL);
177                     int statementCount = 0;
178                     
179                     while (resultSet.next())
180                     {
181                         for (int i = 1; i <= columns; ++i)
182                         {
183                             int type = dialect.getColumnType(resultSetMetaData, i);
184                             
185                             Object JavaDoc object = this.getObject(resultSet, i, type);
186                             
187                             if (resultSet.wasNull())
188                             {
189                                 insertStatement.setNull(i, type);
190                             }
191                             else
192                             {
193                                 insertStatement.setObject(i, object, type);
194                             }
195                         }
196                         
197                         insertStatement.addBatch();
198                         statementCount += 1;
199                         
200                         if ((statementCount % this.maxBatchSize) == 0)
201                         {
202                             insertStatement.executeBatch();
203                             insertStatement.clearBatch();
204                         }
205                         
206                         insertStatement.clearParameters();
207                     }
208         
209                     if ((statementCount % this.maxBatchSize) > 0)
210                     {
211                         insertStatement.executeBatch();
212                     }
213         
214                     logger.info(Messages.getMessage(Messages.INSERT_COUNT, statementCount, qualifiedTable));
215                     
216                     insertStatement.close();
217                     selectStatement.close();
218                     
219                     inactiveConnection.commit();
220                 }
221             }
222         }
223         catch (InterruptedException JavaDoc e)
224         {
225             this.rollback(inactiveConnection);
226
227             throw new net.sf.hajdbc.SQLException(e);
228         }
229         catch (ExecutionException JavaDoc e)
230         {
231             this.rollback(inactiveConnection);
232
233             throw new net.sf.hajdbc.SQLException(e.getCause());
234         }
235         catch (SQLException JavaDoc e)
236         {
237             this.rollback(inactiveConnection);
238             
239             throw e;
240         }
241         
242         inactiveConnection.setAutoCommit(true);
243
244         // Collect foreign keys from active database and create them on inactive database
245
for (ForeignKeyConstraint key: ForeignKeyConstraint.collect(activeConnection, schemaMap))
246         {
247             String JavaDoc sql = dialect.getCreateForeignKeyConstraintSQL(metaData, key);
248             
249             logger.debug(sql);
250             
251             statement.addBatch(sql);
252         }
253         
254         statement.executeBatch();
255         statement.close();
256     }
257     
258     /**
259      * @see net.sf.hajdbc.SynchronizationStrategy#requiresTableLocking()
260      */

261     public boolean requiresTableLocking()
262     {
263         return true;
264     }
265
266     private Object JavaDoc getObject(ResultSet JavaDoc resultSet, int index, int type) throws SQLException JavaDoc
267     {
268         switch (type)
269         {
270             case Types.BLOB:
271             {
272                 return resultSet.getBlob(index);
273             }
274             case Types.CLOB:
275             {
276                 return resultSet.getClob(index);
277             }
278             default:
279             {
280                 return resultSet.getObject(index);
281             }
282         }
283     }
284     
285     private void rollback(Connection JavaDoc connection)
286     {
287         try
288         {
289             connection.rollback();
290             connection.setAutoCommit(true);
291         }
292         catch (java.sql.SQLException JavaDoc e)
293         {
294             logger.warn(e.toString(), e);
295         }
296     }
297
298     /**
299      * @return the fetchSize.
300      */

301     public int getFetchSize()
302     {
303         return this.fetchSize;
304     }
305
306     /**
307      * @param fetchSize the fetchSize to set.
308      */

309     public void setFetchSize(int fetchSize)
310     {
311         this.fetchSize = fetchSize;
312     }
313     
314     /**
315      * @return the maxBatchSize.
316      */

317     public int getMaxBatchSize()
318     {
319         return this.maxBatchSize;
320     }
321
322     /**
323      * @param maxBatchSize the maxBatchSize to set.
324      */

325     public void setMaxBatchSize(int maxBatchSize)
326     {
327         this.maxBatchSize = maxBatchSize;
328     }
329 }
330
Popular Tags