1 21 package net.sf.hajdbc.sync; 22 23 import java.sql.Connection ; 24 import java.sql.DatabaseMetaData ; 25 import java.sql.PreparedStatement ; 26 import java.sql.ResultSet ; 27 import java.sql.ResultSetMetaData ; 28 import java.sql.SQLException ; 29 import java.sql.Statement ; 30 import java.sql.Types ; 31 import java.util.Arrays ; 32 import java.util.Iterator ; 33 import java.util.LinkedHashSet ; 34 import java.util.List ; 35 import java.util.Map ; 36 import java.util.Set ; 37 import java.util.TreeMap ; 38 import java.util.concurrent.Callable ; 39 import java.util.concurrent.ExecutionException ; 40 import java.util.concurrent.ExecutorService ; 41 import java.util.concurrent.Executors ; 42 import java.util.concurrent.Future ; 43 44 import net.sf.hajdbc.Dialect; 45 import net.sf.hajdbc.ForeignKeyConstraint; 46 import net.sf.hajdbc.Messages; 47 import net.sf.hajdbc.SynchronizationStrategy; 48 import net.sf.hajdbc.UniqueConstraint; 49 import net.sf.hajdbc.util.concurrent.DaemonThreadFactory; 50 51 import org.slf4j.Logger; 52 import org.slf4j.LoggerFactory; 53 54 81 public class DifferentialSynchronizationStrategy implements SynchronizationStrategy 82 { 83 private static Logger logger = LoggerFactory.getLogger(DifferentialSynchronizationStrategy.class); 84 85 private ExecutorService executor = Executors.newSingleThreadExecutor(DaemonThreadFactory.getInstance()); 86 private int fetchSize = 0; 87 88 91 public void synchronize(Connection inactiveConnection, Connection activeConnection, Map <String , List <String >> schemaMap, Dialect dialect) throws SQLException 92 { 93 DatabaseMetaData metaData = inactiveConnection.getMetaData(); 94 95 inactiveConnection.setAutoCommit(true); 96 97 Statement statement = inactiveConnection.createStatement(); 98 99 for (ForeignKeyConstraint key: ForeignKeyConstraint.collect(inactiveConnection, schemaMap)) 101 { 102 String sql = dialect.getDropForeignKeyConstraintSQL(metaData, key); 103 104 logger.debug(sql); 105 106 statement.addBatch(sql); 107 } 108 109 statement.executeBatch(); 110 statement.clearBatch(); 111 112 Map <Short , String > primaryKeyColumnMap = new TreeMap <Short , String >(); 113 Set <Integer > primaryKeyColumnIndexSet = new LinkedHashSet <Integer >(); 114 115 inactiveConnection.setAutoCommit(false); 116 117 try 118 { 119 for (Map.Entry <String , List <String >> schemaMapEntry: schemaMap.entrySet()) 120 { 121 String schema = schemaMapEntry.getKey(); 122 123 for (String table: schemaMapEntry.getValue()) 124 { 125 String qualifiedTable = dialect.qualifyTable(metaData, schema, table); 126 127 primaryKeyColumnMap.clear(); 128 primaryKeyColumnIndexSet.clear(); 129 130 ResultSet primaryKeyResultSet = metaData.getPrimaryKeys(null, schema, table); 132 String primaryKeyName = null; 133 134 while (primaryKeyResultSet.next()) 135 { 136 String name = primaryKeyResultSet.getString("COLUMN_NAME"); 137 short position = primaryKeyResultSet.getShort("KEY_SEQ"); 138 139 primaryKeyColumnMap.put(position, name); 140 141 primaryKeyName = primaryKeyResultSet.getString("PK_NAME"); 142 } 143 144 primaryKeyResultSet.close(); 145 146 if (primaryKeyColumnMap.isEmpty()) 147 { 148 throw new SQLException (Messages.getMessage(Messages.PRIMARY_KEY_REQUIRED, this.getClass().getName(), table)); 149 } 150 151 for (UniqueConstraint constraint: UniqueConstraint.collect(inactiveConnection, schema, table, primaryKeyName)) 153 { 154 String sql = dialect.getDropUniqueConstraintSQL(metaData, constraint); 155 156 logger.debug(sql); 157 158 statement.addBatch(sql); 159 } 160 161 statement.executeBatch(); 162 statement.clearBatch(); 163 164 StringBuilder selectSQLBuilder = new StringBuilder ("SELECT * FROM ").append(qualifiedTable).append(" ORDER BY "); 166 StringBuilder whereClauseBuilder = new StringBuilder (" WHERE "); 167 168 Iterator <String > primaryKeyColumns = primaryKeyColumnMap.values().iterator(); 169 170 while (primaryKeyColumns.hasNext()) 171 { 172 String column = dialect.quote(metaData, primaryKeyColumns.next()); 173 174 selectSQLBuilder.append(column); 175 whereClauseBuilder.append(column).append(" = ?"); 176 177 if (primaryKeyColumns.hasNext()) 178 { 179 selectSQLBuilder.append(", "); 180 whereClauseBuilder.append(" AND "); 181 } 182 } 183 184 final String selectSQL = selectSQLBuilder.toString(); 185 186 final Statement inactiveStatement = inactiveConnection.createStatement(); 187 inactiveStatement.setFetchSize(this.fetchSize); 188 189 logger.debug(selectSQL); 190 191 Callable <ResultSet > callable = new Callable <ResultSet >() 192 { 193 public ResultSet call() throws java.sql.SQLException 194 { 195 return inactiveStatement.executeQuery(selectSQL); 196 } 197 }; 198 199 Future <ResultSet > future = this.executor.submit(callable); 200 201 Statement activeStatement = activeConnection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); 202 activeStatement.setFetchSize(this.fetchSize); 203 204 ResultSet activeResultSet = activeStatement.executeQuery(selectSQL); 205 206 ResultSet inactiveResultSet = future.get(); 207 208 for (String primaryKeyColumn: primaryKeyColumnMap.values()) 210 { 211 primaryKeyColumnIndexSet.add(activeResultSet.findColumn(primaryKeyColumn)); 212 } 213 214 String deleteSQL = "DELETE FROM " + qualifiedTable + whereClauseBuilder; 216 217 logger.debug(deleteSQL); 218 219 PreparedStatement deleteStatement = inactiveConnection.prepareStatement(deleteSQL); 220 221 ResultSetMetaData resultSetMetaData = activeResultSet.getMetaData(); 222 int columns = resultSetMetaData.getColumnCount(); 223 int[] types = new int[columns + 1]; 224 225 StringBuilder insertSQLBuilder = new StringBuilder ("INSERT INTO ").append(qualifiedTable).append(" ("); 227 StringBuilder updateSQLBuilder = new StringBuilder ("UPDATE ").append(qualifiedTable).append(" SET"); 228 229 for (int i = 1; i <= columns; ++i) 230 { 231 types[i] = dialect.getColumnType(resultSetMetaData, i); 232 233 String column = dialect.quote(metaData, resultSetMetaData.getColumnName(i)); 234 235 if (i > 1) 236 { 237 insertSQLBuilder.append(", "); 238 } 239 240 insertSQLBuilder.append(column); 241 242 if (!primaryKeyColumnIndexSet.contains(i)) 243 { 244 updateSQLBuilder.append(' ').append(column).append(" = ?,"); 245 } 246 } 247 248 insertSQLBuilder.append(") VALUES ("); 249 250 for (int i = 1; i <= columns; ++i) 251 { 252 if (i > 1) 253 { 254 insertSQLBuilder.append(", "); 255 } 256 257 insertSQLBuilder.append("?"); 258 } 259 260 String insertSQL = insertSQLBuilder.append(")").toString(); 261 262 logger.debug(insertSQL); 263 264 PreparedStatement insertStatement = inactiveConnection.prepareStatement(insertSQL); 265 266 String updateSQL = updateSQLBuilder.deleteCharAt(updateSQLBuilder.length() - 1).append(whereClauseBuilder).toString(); 267 268 logger.debug(updateSQL); 269 270 PreparedStatement updateStatement = inactiveConnection.prepareStatement(updateSQL); 271 272 boolean hasMoreActiveResults = activeResultSet.next(); 273 boolean hasMoreInactiveResults = inactiveResultSet.next(); 274 275 int insertCount = 0; 276 int updateCount = 0; 277 int deleteCount = 0; 278 279 while (hasMoreActiveResults || hasMoreInactiveResults) 280 { 281 int compare = 0; 282 283 if (!hasMoreActiveResults) 284 { 285 compare = 1; 286 } 287 else if (!hasMoreInactiveResults) 288 { 289 compare = -1; 290 } 291 else 292 { 293 for (int column: primaryKeyColumnIndexSet) 294 { 295 Object activeObject = activeResultSet.getObject(column); 296 Object inactiveObject = inactiveResultSet.getObject(column); 297 298 compare = Comparable .class.cast(activeObject).compareTo(inactiveObject); 299 300 if (compare != 0) 301 { 302 break; 303 } 304 } 305 } 306 307 if (compare > 0) 308 { 309 deleteStatement.clearParameters(); 310 311 int index = 0; 312 313 for (int column: primaryKeyColumnIndexSet) 314 { 315 index += 1; 316 317 deleteStatement.setObject(index, inactiveResultSet.getObject(column), types[column]); 318 } 319 320 deleteStatement.addBatch(); 321 322 deleteCount += 1; 323 } 324 else if (compare < 0) 325 { 326 insertStatement.clearParameters(); 327 328 for (int i = 1; i <= columns; ++i) 329 { 330 int type = types[i]; 331 332 Object object = this.getObject(activeResultSet, i, type); 333 334 if (activeResultSet.wasNull()) 335 { 336 insertStatement.setNull(i, type); 337 } 338 else 339 { 340 insertStatement.setObject(i, object, type); 341 } 342 } 343 344 insertStatement.addBatch(); 345 346 insertCount += 1; 347 } 348 else { 350 updateStatement.clearParameters(); 351 352 int index = 0; 353 boolean updated = false; 354 355 for (int i = 1; i <= columns; ++i) 356 { 357 if (!primaryKeyColumnIndexSet.contains(i)) 358 { 359 index += 1; 360 361 int type = types[i]; 362 363 Object activeObject = this.getObject(activeResultSet, i, type); 364 Object inactiveObject = this.getObject(inactiveResultSet, i, type); 365 366 if (activeResultSet.wasNull()) 367 { 368 updateStatement.setNull(index, type); 369 370 updated |= !inactiveResultSet.wasNull(); 371 } 372 else 373 { 374 updateStatement.setObject(index, activeObject, type); 375 376 updated |= inactiveResultSet.wasNull(); 377 updated |= !equals(activeObject, inactiveObject); 378 } 379 } 380 } 381 382 if (updated) 383 { 384 for (int column: primaryKeyColumnIndexSet) 385 { 386 index += 1; 387 388 updateStatement.setObject(index, activeResultSet.getObject(column), types[column]); 389 } 390 391 updateStatement.addBatch(); 392 393 updateCount += 1; 394 } 395 } 396 397 if (hasMoreActiveResults && (compare <= 0)) 398 { 399 hasMoreActiveResults = activeResultSet.next(); 400 } 401 402 if (hasMoreInactiveResults && (compare >= 0)) 403 { 404 hasMoreInactiveResults = inactiveResultSet.next(); 405 } 406 } 407 408 if (deleteCount > 0) 409 { 410 deleteStatement.executeBatch(); 411 } 412 413 deleteStatement.close(); 414 415 if (insertCount > 0) 416 { 417 insertStatement.executeBatch(); 418 } 419 420 insertStatement.close(); 421 422 if (updateCount > 0) 423 { 424 updateStatement.executeBatch(); 425 } 426 427 updateStatement.close(); 428 429 inactiveStatement.close(); 430 activeStatement.close(); 431 432 for (UniqueConstraint constraint: UniqueConstraint.collect(activeConnection, schema, table, primaryKeyName)) 434 { 435 statement.addBatch(dialect.getCreateUniqueConstraintSQL(metaData, constraint)); 436 } 437 438 statement.executeBatch(); 439 statement.clearBatch(); 440 441 inactiveConnection.commit(); 442 443 logger.info(Messages.getMessage(Messages.INSERT_COUNT, insertCount, qualifiedTable)); 444 logger.info(Messages.getMessage(Messages.UPDATE_COUNT, updateCount, qualifiedTable)); 445 logger.info(Messages.getMessage(Messages.DELETE_COUNT, deleteCount, qualifiedTable)); 446 } 447 } 448 } 449 catch (ExecutionException e) 450 { 451 this.rollback(inactiveConnection); 452 453 throw new net.sf.hajdbc.SQLException(e.getCause()); 454 } 455 catch (InterruptedException e) 456 { 457 this.rollback(inactiveConnection); 458 459 throw new net.sf.hajdbc.SQLException(e); 460 } 461 catch (SQLException e) 462 { 463 this.rollback(inactiveConnection); 464 465 throw e; 466 } 467 468 inactiveConnection.setAutoCommit(true); 469 470 for (ForeignKeyConstraint key: ForeignKeyConstraint.collect(activeConnection, schemaMap)) 472 { 473 statement.addBatch(dialect.getCreateForeignKeyConstraintSQL(metaData, key)); 474 } 475 476 statement.executeBatch(); 477 statement.close(); 478 } 479 480 private Object getObject(ResultSet resultSet, int index, int type) throws SQLException 481 { 482 switch (type) 483 { 484 case Types.BLOB: 485 { 486 return resultSet.getBlob(index); 487 } 488 case Types.CLOB: 489 { 490 return resultSet.getClob(index); 491 } 492 default: 493 { 494 return resultSet.getObject(index); 495 } 496 } 497 } 498 499 private boolean equals(Object object1, Object object2) 500 { 501 if (byte[].class.isInstance(object1) && byte[].class.isInstance(object2)) 502 { 503 byte[] bytes1 = (byte[]) object1; 504 byte[] bytes2 = (byte[]) object2; 505 506 if (bytes1.length != bytes2.length) 507 { 508 return false; 509 } 510 511 return Arrays.equals(bytes1, bytes2); 512 } 513 514 return object1.equals(object2); 515 } 516 517 520 public boolean requiresTableLocking() 521 { 522 return true; 523 } 524 525 private void rollback(Connection connection) 526 { 527 try 528 { 529 connection.rollback(); 530 connection.setAutoCommit(true); 531 } 532 catch (java.sql.SQLException e) 533 { 534 logger.warn(e.toString(), e); 535 } 536 } 537 538 541 public int getFetchSize() 542 { 543 return this.fetchSize; 544 } 545 546 549 public void setFetchSize(int fetchSize) 550 { 551 this.fetchSize = fetchSize; 552 } 553 } 554 | Popular Tags |