1 21 package net.sf.hajdbc.sync; 22 23 import java.sql.Connection ; 24 import java.sql.DatabaseMetaData ; 25 import java.sql.PreparedStatement ; 26 import java.sql.ResultSet ; 27 import java.sql.ResultSetMetaData ; 28 import java.sql.SQLException ; 29 import java.sql.Statement ; 30 import java.sql.Types ; 31 import java.util.List ; 32 import java.util.Map ; 33 import java.util.concurrent.Callable ; 34 import java.util.concurrent.ExecutionException ; 35 import java.util.concurrent.ExecutorService ; 36 import java.util.concurrent.Executors ; 37 import java.util.concurrent.Future ; 38 39 import net.sf.hajdbc.Dialect; 40 import net.sf.hajdbc.ForeignKeyConstraint; 41 import net.sf.hajdbc.Messages; 42 import net.sf.hajdbc.SynchronizationStrategy; 43 import net.sf.hajdbc.util.concurrent.DaemonThreadFactory; 44 45 import org.slf4j.Logger; 46 import org.slf4j.LoggerFactory; 47 48 71 public class FullSynchronizationStrategy implements SynchronizationStrategy 72 { 73 private static Logger logger = LoggerFactory.getLogger(FullSynchronizationStrategy.class); 74 75 private ExecutorService executor = Executors.newSingleThreadExecutor(DaemonThreadFactory.getInstance()); 76 private int maxBatchSize = 100; 77 private int fetchSize = 0; 78 79 82 public void synchronize(Connection inactiveConnection, Connection activeConnection, Map <String , List <String >> schemaMap, Dialect dialect) throws SQLException 83 { 84 inactiveConnection.setAutoCommit(true); 85 86 DatabaseMetaData metaData = inactiveConnection.getMetaData(); 87 88 Statement statement = inactiveConnection.createStatement(); 89 90 for (ForeignKeyConstraint key: ForeignKeyConstraint.collect(inactiveConnection, schemaMap)) 92 { 93 String sql = dialect.getDropForeignKeyConstraintSQL(metaData, key); 94 95 logger.debug(sql); 96 97 statement.addBatch(sql); 98 } 99 100 statement.executeBatch(); 101 statement.clearBatch(); 102 103 inactiveConnection.setAutoCommit(false); 104 105 try 106 { 107 for (Map.Entry <String , List <String >> schemaMapEntry: schemaMap.entrySet()) 108 { 109 String schema = schemaMapEntry.getKey(); 110 111 for (String table: schemaMapEntry.getValue()) 112 { 113 String qualifiedTable = dialect.qualifyTable(metaData, schema, table); 114 115 final String selectSQL = "SELECT * FROM " + qualifiedTable; 116 117 final Statement selectStatement = activeConnection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); 118 selectStatement.setFetchSize(this.fetchSize); 119 120 Callable <ResultSet > callable = new Callable <ResultSet >() 121 { 122 public ResultSet call() throws SQLException 123 { 124 return selectStatement.executeQuery(selectSQL); 125 } 126 }; 127 128 Future <ResultSet > future = this.executor.submit(callable); 129 130 String deleteSQL = dialect.getTruncateTableSQL(metaData, schema, table); 131 132 logger.debug(deleteSQL); 133 134 Statement deleteStatement = inactiveConnection.createStatement(); 135 136 int deletedRows = deleteStatement.executeUpdate(deleteSQL); 137 138 logger.info(Messages.getMessage(Messages.DELETE_COUNT, deletedRows, qualifiedTable)); 139 140 deleteStatement.close(); 141 142 ResultSet resultSet = future.get(); 143 144 StringBuilder builder = new StringBuilder ("INSERT INTO ").append(qualifiedTable).append(" ("); 145 146 ResultSetMetaData resultSetMetaData = resultSet.getMetaData(); 147 148 int columns = resultSetMetaData.getColumnCount(); 149 150 for (int i = 1; i <= columns; ++i) 151 { 152 if (i > 1) 153 { 154 builder.append(", "); 155 } 156 157 builder.append(dialect.quote(metaData, resultSetMetaData.getColumnName(i))); 158 } 159 160 builder.append(") VALUES ("); 161 162 for (int i = 1; i <= columns; ++i) 163 { 164 if (i > 1) 165 { 166 builder.append(", "); 167 } 168 169 builder.append("?"); 170 } 171 172 String insertSQL = builder.append(")").toString(); 173 174 logger.debug(insertSQL); 175 176 PreparedStatement insertStatement = inactiveConnection.prepareStatement(insertSQL); 177 int statementCount = 0; 178 179 while (resultSet.next()) 180 { 181 for (int i = 1; i <= columns; ++i) 182 { 183 int type = dialect.getColumnType(resultSetMetaData, i); 184 185 Object object = this.getObject(resultSet, i, type); 186 187 if (resultSet.wasNull()) 188 { 189 insertStatement.setNull(i, type); 190 } 191 else 192 { 193 insertStatement.setObject(i, object, type); 194 } 195 } 196 197 insertStatement.addBatch(); 198 statementCount += 1; 199 200 if ((statementCount % this.maxBatchSize) == 0) 201 { 202 insertStatement.executeBatch(); 203 insertStatement.clearBatch(); 204 } 205 206 insertStatement.clearParameters(); 207 } 208 209 if ((statementCount % this.maxBatchSize) > 0) 210 { 211 insertStatement.executeBatch(); 212 } 213 214 logger.info(Messages.getMessage(Messages.INSERT_COUNT, statementCount, qualifiedTable)); 215 216 insertStatement.close(); 217 selectStatement.close(); 218 219 inactiveConnection.commit(); 220 } 221 } 222 } 223 catch (InterruptedException e) 224 { 225 this.rollback(inactiveConnection); 226 227 throw new net.sf.hajdbc.SQLException(e); 228 } 229 catch (ExecutionException e) 230 { 231 this.rollback(inactiveConnection); 232 233 throw new net.sf.hajdbc.SQLException(e.getCause()); 234 } 235 catch (SQLException e) 236 { 237 this.rollback(inactiveConnection); 238 239 throw e; 240 } 241 242 inactiveConnection.setAutoCommit(true); 243 244 for (ForeignKeyConstraint key: ForeignKeyConstraint.collect(activeConnection, schemaMap)) 246 { 247 String sql = dialect.getCreateForeignKeyConstraintSQL(metaData, key); 248 249 logger.debug(sql); 250 251 statement.addBatch(sql); 252 } 253 254 statement.executeBatch(); 255 statement.close(); 256 } 257 258 261 public boolean requiresTableLocking() 262 { 263 return true; 264 } 265 266 private Object getObject(ResultSet resultSet, int index, int type) throws SQLException 267 { 268 switch (type) 269 { 270 case Types.BLOB: 271 { 272 return resultSet.getBlob(index); 273 } 274 case Types.CLOB: 275 { 276 return resultSet.getClob(index); 277 } 278 default: 279 { 280 return resultSet.getObject(index); 281 } 282 } 283 } 284 285 private void rollback(Connection connection) 286 { 287 try 288 { 289 connection.rollback(); 290 connection.setAutoCommit(true); 291 } 292 catch (java.sql.SQLException e) 293 { 294 logger.warn(e.toString(), e); 295 } 296 } 297 298 301 public int getFetchSize() 302 { 303 return this.fetchSize; 304 } 305 306 309 public void setFetchSize(int fetchSize) 310 { 311 this.fetchSize = fetchSize; 312 } 313 314 317 public int getMaxBatchSize() 318 { 319 return this.maxBatchSize; 320 } 321 322 325 public void setMaxBatchSize(int maxBatchSize) 326 { 327 this.maxBatchSize = maxBatchSize; 328 } 329 } 330 | Popular Tags |