KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > net > sf > hajdbc > sync > DifferentialSynchronizationStrategy


/*
 * HA-JDBC: High-Availability JDBC
 * Copyright (c) 2004-2006 Paul Ferraro
 *
 * This library is free software; you can redistribute it and/or modify it
 * under the terms of the GNU Lesser General Public License as published by the
 * Free Software Foundation; either version 2.1 of the License, or (at your
 * option) any later version.
 *
 * This library is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
 * for more details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * along with this library; if not, write to the Free Software Foundation,
 * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 *
 * Contact: ferraro@users.sourceforge.net
 */

21 package net.sf.hajdbc.sync;
22
23 import java.sql.Connection JavaDoc;
24 import java.sql.DatabaseMetaData JavaDoc;
25 import java.sql.PreparedStatement JavaDoc;
26 import java.sql.ResultSet JavaDoc;
27 import java.sql.ResultSetMetaData JavaDoc;
28 import java.sql.SQLException JavaDoc;
29 import java.sql.Statement JavaDoc;
30 import java.sql.Types JavaDoc;
31 import java.util.Arrays JavaDoc;
32 import java.util.Iterator JavaDoc;
33 import java.util.LinkedHashSet JavaDoc;
34 import java.util.List JavaDoc;
35 import java.util.Map JavaDoc;
36 import java.util.Set JavaDoc;
37 import java.util.TreeMap JavaDoc;
38 import java.util.concurrent.Callable JavaDoc;
39 import java.util.concurrent.ExecutionException JavaDoc;
40 import java.util.concurrent.ExecutorService JavaDoc;
41 import java.util.concurrent.Executors JavaDoc;
42 import java.util.concurrent.Future JavaDoc;
43
44 import net.sf.hajdbc.Dialect;
45 import net.sf.hajdbc.ForeignKeyConstraint;
46 import net.sf.hajdbc.Messages;
47 import net.sf.hajdbc.SynchronizationStrategy;
48 import net.sf.hajdbc.UniqueConstraint;
49 import net.sf.hajdbc.util.concurrent.DaemonThreadFactory;
50
51 import org.slf4j.Logger;
52 import org.slf4j.LoggerFactory;
53
54 /**
55  * Database-independent synchronization strategy that only updates differences between two databases.
56  * This strategy is best used when there are <em>few</em> differences between the active database and the inactive database (i.e. barely out of sync).
57  * The following algorithm is used:
58  * <ol>
59  * <li>Drop the foreign keys on the inactive database (to avoid integrity constraint violations)</li>
60  * <li>For each database table:
61  * <ol>
62  * <li>Drop the unique constraints on the table (to avoid integrity constraint violations)</li>
63  * <li>Find the primary key(s) of the table</li>
64  * <li>Query all rows in the inactive database table, sorting by the primary key(s)</li>
65  * <li>Query all rows on the active database table</li>
66  * <li>For each row in table:
67  * <ol>
68  * <li>If primary key of the rows are the same, determine whether or not row needs to be updated</li>
69  * <li>Otherwise, determine whether row should be deleted, or a new row is to be inserted</li>
70  * </ol>
71  * </li>
72  * <li>Re-create the unique constraints on the table (to avoid integrity constraint violations)</li>
73  * </ol>
74  * </li>
75  * <li>Re-create the foreign keys on the inactive database</li>
76  * </ol>
77  * @author Paul Ferraro
78  * @version $Revision: 1370 $
79  * @since 1.0
80  */

81 public class DifferentialSynchronizationStrategy implements SynchronizationStrategy
82 {
83     private static Logger logger = LoggerFactory.getLogger(DifferentialSynchronizationStrategy.class);
84
85     private ExecutorService JavaDoc executor = Executors.newSingleThreadExecutor(DaemonThreadFactory.getInstance());
86     private int fetchSize = 0;
87     
88     /**
89      * @see net.sf.hajdbc.SynchronizationStrategy#synchronize(Connection, Connection, Map, Dialect)
90      */

91     public void synchronize(Connection JavaDoc inactiveConnection, Connection JavaDoc activeConnection, Map JavaDoc<String JavaDoc, List JavaDoc<String JavaDoc>> schemaMap, Dialect dialect) throws SQLException JavaDoc
92     {
93         DatabaseMetaData JavaDoc metaData = inactiveConnection.getMetaData();
94         
95         inactiveConnection.setAutoCommit(true);
96         
97         Statement JavaDoc statement = inactiveConnection.createStatement();
98
99         // Drop foreign key constraints on the inactive database
100
for (ForeignKeyConstraint key: ForeignKeyConstraint.collect(inactiveConnection, schemaMap))
101         {
102             String JavaDoc sql = dialect.getDropForeignKeyConstraintSQL(metaData, key);
103             
104             logger.debug(sql);
105             
106             statement.addBatch(sql);
107         }
108
109         statement.executeBatch();
110         statement.clearBatch();
111         
112         Map JavaDoc<Short JavaDoc, String JavaDoc> primaryKeyColumnMap = new TreeMap JavaDoc<Short JavaDoc, String JavaDoc>();
113         Set JavaDoc<Integer JavaDoc> primaryKeyColumnIndexSet = new LinkedHashSet JavaDoc<Integer JavaDoc>();
114         
115         inactiveConnection.setAutoCommit(false);
116         
117         try
118         {
119             for (Map.Entry JavaDoc<String JavaDoc, List JavaDoc<String JavaDoc>> schemaMapEntry: schemaMap.entrySet())
120             {
121                 String JavaDoc schema = schemaMapEntry.getKey();
122                 
123                 for (String JavaDoc table: schemaMapEntry.getValue())
124                 {
125                     String JavaDoc qualifiedTable = dialect.qualifyTable(metaData, schema, table);
126
127                     primaryKeyColumnMap.clear();
128                     primaryKeyColumnIndexSet.clear();
129                     
130                     // Fetch primary keys of this table
131
ResultSet JavaDoc primaryKeyResultSet = metaData.getPrimaryKeys(null, schema, table);
132                     String JavaDoc primaryKeyName = null;
133                     
134                     while (primaryKeyResultSet.next())
135                     {
136                         String JavaDoc name = primaryKeyResultSet.getString("COLUMN_NAME");
137                         short position = primaryKeyResultSet.getShort("KEY_SEQ");
138         
139                         primaryKeyColumnMap.put(position, name);
140                         
141                         primaryKeyName = primaryKeyResultSet.getString("PK_NAME");
142                     }
143                     
144                     primaryKeyResultSet.close();
145                     
146                     if (primaryKeyColumnMap.isEmpty())
147                     {
148                         throw new SQLException JavaDoc(Messages.getMessage(Messages.PRIMARY_KEY_REQUIRED, this.getClass().getName(), table));
149                     }
150                     
151                     // Drop unique constraints on the current table
152
for (UniqueConstraint constraint: UniqueConstraint.collect(inactiveConnection, schema, table, primaryKeyName))
153                     {
154                         String JavaDoc sql = dialect.getDropUniqueConstraintSQL(metaData, constraint);
155                         
156                         logger.debug(sql);
157                         
158                         statement.addBatch(sql);
159                     }
160                     
161                     statement.executeBatch();
162                     statement.clearBatch();
163                     
164                     // Retrieve table rows in primary key order
165
StringBuilder JavaDoc selectSQLBuilder = new StringBuilder JavaDoc("SELECT * FROM ").append(qualifiedTable).append(" ORDER BY ");
166                     StringBuilder JavaDoc whereClauseBuilder = new StringBuilder JavaDoc(" WHERE ");
167                     
168                     Iterator JavaDoc<String JavaDoc> primaryKeyColumns = primaryKeyColumnMap.values().iterator();
169                     
170                     while (primaryKeyColumns.hasNext())
171                     {
172                         String JavaDoc column = dialect.quote(metaData, primaryKeyColumns.next());
173                         
174                         selectSQLBuilder.append(column);
175                         whereClauseBuilder.append(column).append(" = ?");
176                         
177                         if (primaryKeyColumns.hasNext())
178                         {
179                             selectSQLBuilder.append(", ");
180                             whereClauseBuilder.append(" AND ");
181                         }
182                     }
183                     
184                     final String JavaDoc selectSQL = selectSQLBuilder.toString();
185                     
186                     final Statement JavaDoc inactiveStatement = inactiveConnection.createStatement();
187                     inactiveStatement.setFetchSize(this.fetchSize);
188         
189                     logger.debug(selectSQL);
190                     
191                     Callable JavaDoc<ResultSet JavaDoc> callable = new Callable JavaDoc<ResultSet JavaDoc>()
192                     {
193                         public ResultSet JavaDoc call() throws java.sql.SQLException JavaDoc
194                         {
195                             return inactiveStatement.executeQuery(selectSQL);
196                         }
197                     };
198         
199                     Future JavaDoc<ResultSet JavaDoc> future = this.executor.submit(callable);
200                     
201                     Statement JavaDoc activeStatement = activeConnection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
202                     activeStatement.setFetchSize(this.fetchSize);
203                     
204                     ResultSet JavaDoc activeResultSet = activeStatement.executeQuery(selectSQL);
205
206                     ResultSet JavaDoc inactiveResultSet = future.get();
207                     
208                     // Create set of primary key columns
209
for (String JavaDoc primaryKeyColumn: primaryKeyColumnMap.values())
210                     {
211                         primaryKeyColumnIndexSet.add(activeResultSet.findColumn(primaryKeyColumn));
212                     }
213                     
214                     // Construct DELETE SQL
215
String JavaDoc deleteSQL = "DELETE FROM " + qualifiedTable + whereClauseBuilder;
216                     
217                     logger.debug(deleteSQL);
218                     
219                     PreparedStatement JavaDoc deleteStatement = inactiveConnection.prepareStatement(deleteSQL);
220                     
221                     ResultSetMetaData JavaDoc resultSetMetaData = activeResultSet.getMetaData();
222                     int columns = resultSetMetaData.getColumnCount();
223                     int[] types = new int[columns + 1];
224                     
225                     // Construct INSERT SQL
226
StringBuilder JavaDoc insertSQLBuilder = new StringBuilder JavaDoc("INSERT INTO ").append(qualifiedTable).append(" (");
227                     StringBuilder JavaDoc updateSQLBuilder = new StringBuilder JavaDoc("UPDATE ").append(qualifiedTable).append(" SET");
228                     
229                     for (int i = 1; i <= columns; ++i)
230                     {
231                         types[i] = dialect.getColumnType(resultSetMetaData, i);
232
233                         String JavaDoc column = dialect.quote(metaData, resultSetMetaData.getColumnName(i));
234                         
235                         if (i > 1)
236                         {
237                             insertSQLBuilder.append(", ");
238                         }
239                         
240                         insertSQLBuilder.append(column);
241                         
242                         if (!primaryKeyColumnIndexSet.contains(i))
243                         {
244                             updateSQLBuilder.append(' ').append(column).append(" = ?,");
245                         }
246                     }
247                     
248                     insertSQLBuilder.append(") VALUES (");
249         
250                     for (int i = 1; i <= columns; ++i)
251                     {
252                         if (i > 1)
253                         {
254                             insertSQLBuilder.append(", ");
255                         }
256                         
257                         insertSQLBuilder.append("?");
258                     }
259         
260                     String JavaDoc insertSQL = insertSQLBuilder.append(")").toString();
261                     
262                     logger.debug(insertSQL);
263                     
264                     PreparedStatement JavaDoc insertStatement = inactiveConnection.prepareStatement(insertSQL);
265
266                     String JavaDoc updateSQL = updateSQLBuilder.deleteCharAt(updateSQLBuilder.length() - 1).append(whereClauseBuilder).toString();
267                     
268                     logger.debug(updateSQL);
269                     
270                     PreparedStatement JavaDoc updateStatement = inactiveConnection.prepareStatement(updateSQL);
271                     
272                     boolean hasMoreActiveResults = activeResultSet.next();
273                     boolean hasMoreInactiveResults = inactiveResultSet.next();
274                     
275                     int insertCount = 0;
276                     int updateCount = 0;
277                     int deleteCount = 0;
278                     
279                     while (hasMoreActiveResults || hasMoreInactiveResults)
280                     {
281                         int compare = 0;
282                         
283                         if (!hasMoreActiveResults)
284                         {
285                             compare = 1;
286                         }
287                         else if (!hasMoreInactiveResults)
288                         {
289                             compare = -1;
290                         }
291                         else
292                         {
293                             for (int column: primaryKeyColumnIndexSet)
294                             {
295                                 Object JavaDoc activeObject = activeResultSet.getObject(column);
296                                 Object JavaDoc inactiveObject = inactiveResultSet.getObject(column);
297                                 
298                                 compare = Comparable JavaDoc.class.cast(activeObject).compareTo(inactiveObject);
299                                 
300                                 if (compare != 0)
301                                 {
302                                     break;
303                                 }
304                             }
305                         }
306                         
307                         if (compare > 0)
308                         {
309                             deleteStatement.clearParameters();
310                             
311                             int index = 0;
312                             
313                             for (int column: primaryKeyColumnIndexSet)
314                             {
315                                 index += 1;
316                                 
317                                 deleteStatement.setObject(index, inactiveResultSet.getObject(column), types[column]);
318                             }
319                             
320                             deleteStatement.addBatch();
321                             
322                             deleteCount += 1;
323                         }
324                         else if (compare < 0)
325                         {
326                             insertStatement.clearParameters();
327         
328                             for (int i = 1; i <= columns; ++i)
329                             {
330                                 int type = types[i];
331                                 
332                                 Object JavaDoc object = this.getObject(activeResultSet, i, type);
333                                 
334                                 if (activeResultSet.wasNull())
335                                 {
336                                     insertStatement.setNull(i, type);
337                                 }
338                                 else
339                                 {
340                                     insertStatement.setObject(i, object, type);
341                                 }
342                             }
343                             
344                             insertStatement.addBatch();
345                             
346                             insertCount += 1;
347                         }
348                         else // if (compare == 0)
349
{
350                             updateStatement.clearParameters();
351                             
352                             int index = 0;
353                             boolean updated = false;
354                             
355                             for (int i = 1; i <= columns; ++i)
356                             {
357                                 if (!primaryKeyColumnIndexSet.contains(i))
358                                 {
359                                     index += 1;
360                                     
361                                     int type = types[i];
362                                     
363                                     Object JavaDoc activeObject = this.getObject(activeResultSet, i, type);
364                                     Object JavaDoc inactiveObject = this.getObject(inactiveResultSet, i, type);
365                                     
366                                     if (activeResultSet.wasNull())
367                                     {
368                                         updateStatement.setNull(index, type);
369                                         
370                                         updated |= !inactiveResultSet.wasNull();
371                                     }
372                                     else
373                                     {
374                                         updateStatement.setObject(index, activeObject, type);
375                                         
376                                         updated |= inactiveResultSet.wasNull();
377                                         updated |= !equals(activeObject, inactiveObject);
378                                     }
379                                 }
380                             }
381                             
382                             if (updated)
383                             {
384                                 for (int column: primaryKeyColumnIndexSet)
385                                 {
386                                     index += 1;
387                                     
388                                     updateStatement.setObject(index, activeResultSet.getObject(column), types[column]);
389                                 }
390                                 
391                                 updateStatement.addBatch();
392                                 
393                                 updateCount += 1;
394                             }
395                         }
396                         
397                         if (hasMoreActiveResults && (compare <= 0))
398                         {
399                             hasMoreActiveResults = activeResultSet.next();
400                         }
401                         
402                         if (hasMoreInactiveResults && (compare >= 0))
403                         {
404                             hasMoreInactiveResults = inactiveResultSet.next();
405                         }
406                     }
407                     
408                     if (deleteCount > 0)
409                     {
410                         deleteStatement.executeBatch();
411                     }
412                     
413                     deleteStatement.close();
414                     
415                     if (insertCount > 0)
416                     {
417                         insertStatement.executeBatch();
418                     }
419                     
420                     insertStatement.close();
421                     
422                     if (updateCount > 0)
423                     {
424                         updateStatement.executeBatch();
425                     }
426                     
427                     updateStatement.close();
428                     
429                     inactiveStatement.close();
430                     activeStatement.close();
431                     
432                     // Collect unique constraints on this table from the active database and re-create them on the inactive database
433
for (UniqueConstraint constraint: UniqueConstraint.collect(activeConnection, schema, table, primaryKeyName))
434                     {
435                         statement.addBatch(dialect.getCreateUniqueConstraintSQL(metaData, constraint));
436                     }
437                     
438                     statement.executeBatch();
439                     statement.clearBatch();
440                     
441                     inactiveConnection.commit();
442                     
443                     logger.info(Messages.getMessage(Messages.INSERT_COUNT, insertCount, qualifiedTable));
444                     logger.info(Messages.getMessage(Messages.UPDATE_COUNT, updateCount, qualifiedTable));
445                     logger.info(Messages.getMessage(Messages.DELETE_COUNT, deleteCount, qualifiedTable));
446                 }
447             }
448         }
449         catch (ExecutionException JavaDoc e)
450         {
451             this.rollback(inactiveConnection);
452             
453             throw new net.sf.hajdbc.SQLException(e.getCause());
454         }
455         catch (InterruptedException JavaDoc e)
456         {
457             this.rollback(inactiveConnection);
458             
459             throw new net.sf.hajdbc.SQLException(e);
460         }
461         catch (SQLException JavaDoc e)
462         {
463             this.rollback(inactiveConnection);
464             
465             throw e;
466         }
467         
468         inactiveConnection.setAutoCommit(true);
469
470         // Collect foreign key constraints from the active database and create them on the inactive database
471
for (ForeignKeyConstraint key: ForeignKeyConstraint.collect(activeConnection, schemaMap))
472         {
473             statement.addBatch(dialect.getCreateForeignKeyConstraintSQL(metaData, key));
474         }
475         
476         statement.executeBatch();
477         statement.close();
478     }
479
480     private Object JavaDoc getObject(ResultSet JavaDoc resultSet, int index, int type) throws SQLException JavaDoc
481     {
482         switch (type)
483         {
484             case Types.BLOB:
485             {
486                 return resultSet.getBlob(index);
487             }
488             case Types.CLOB:
489             {
490                 return resultSet.getClob(index);
491             }
492             default:
493             {
494                 return resultSet.getObject(index);
495             }
496         }
497     }
498
499     private boolean equals(Object JavaDoc object1, Object JavaDoc object2)
500     {
501         if (byte[].class.isInstance(object1) && byte[].class.isInstance(object2))
502         {
503             byte[] bytes1 = (byte[]) object1;
504             byte[] bytes2 = (byte[]) object2;
505             
506             if (bytes1.length != bytes2.length)
507             {
508                 return false;
509             }
510             
511             return Arrays.equals(bytes1, bytes2);
512         }
513         
514         return object1.equals(object2);
515     }
516     
517     /**
518      * @see net.sf.hajdbc.SynchronizationStrategy#requiresTableLocking()
519      */

520     public boolean requiresTableLocking()
521     {
522         return true;
523     }
524     
525     private void rollback(Connection JavaDoc connection)
526     {
527         try
528         {
529             connection.rollback();
530             connection.setAutoCommit(true);
531         }
532         catch (java.sql.SQLException JavaDoc e)
533         {
534             logger.warn(e.toString(), e);
535         }
536     }
537
538     /**
539      * @return the fetchSize.
540      */

541     public int getFetchSize()
542     {
543         return this.fetchSize;
544     }
545
546     /**
547      * @param fetchSize the fetchSize to set.
548      */

549     public void setFetchSize(int fetchSize)
550     {
551         this.fetchSize = fetchSize;
552     }
553 }
554
Popular Tags