package org.exolab.jms.tools.db.migration;

import java.io.ByteArrayInputStream;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import org.exolab.jms.persistence.PersistenceException;
import org.exolab.jms.persistence.SQLHelper;
import org.exolab.jms.tools.db.Database;
import org.exolab.jms.tools.db.RDBMSTool;
import org.exolab.jms.tools.db.SchemaConverter;
import org.exolab.jms.tools.db.SchemaHelper;
import org.exolab.jms.tools.db.Table;


/**
 * A {@link SchemaConverter} that rewrites the <code>messages</code> and
 * <code>message_handles</code> tables so that the numeric message
 * identifier becomes a string of the form <code>"ID:" + number</code>,
 * creates the <code>users</code> table, and finally records the schema
 * version as <code>V0.7.6</code>.
 * <p>
 * The source schema version is not checked by this class; judging by the
 * class name it is expected to be 0.7.2.
 */
public class V072toV076SchemaConverter implements SchemaConverter {

    /**
     * The connection that all conversion statements are executed on.
     */
    private final Connection _connection;

    /**
     * The tool used to create and drop tables. Assigned at the start of
     * {@link #convert}.
     */
    private RDBMSTool _tool;

    /**
     * The name of the users table.
     */
    private static final String USERS_TABLE = "users";

    /**
     * The name of the messages table.
     */
    private static final String MESSAGES_TABLE = "messages";

    /**
     * The name of the message handles table.
     */
    private static final String HANDLES_TABLE = "message_handles";

    /**
     * The prefix given to temporary tables used during conversion.
     */
    private static final String TMP_PREFIX = "openjms_tmp_";


    /**
     * Construct a new <code>V072toV076SchemaConverter</code>.
     *
     * @param connection the connection to perform the conversion on
     */
    public V072toV076SchemaConverter(Connection connection) {
        _connection = connection;
    }

    /**
     * Perform the conversion. Auto-commit is switched off on the
     * connection (and is not restored afterwards), the tables are
     * converted, the version is updated and the connection is committed.
     * On any failure the connection is rolled back before the exception
     * is propagated.
     *
     * @throws PersistenceException if the conversion fails
     */
    public void convert() throws PersistenceException {
        Database schema = SchemaHelper.getSchema();
        try {
            if (_connection.getAutoCommit()) {
                _connection.setAutoCommit(false);
            }
            _tool = new RDBMSTool(_connection);
        } catch (SQLException exception) {
            // wrap the exception itself so the cause is not lost
            throw new PersistenceException(exception);
        }

        try {
            convertMessagesTable(schema);
            convertHandlesTable(schema);
            createUsersTable(schema);
            SchemaHelper.setVersion(_connection, "V0.7.6");
            _connection.commit();
        } catch (PersistenceException exception) {
            SQLHelper.rollback(_connection);
            throw exception;
        } catch (SQLException exception) {
            SQLHelper.rollback(_connection);
            throw new PersistenceException(exception);
        }
    }

    /**
     * Convert the messages table. Rows are copied to a temporary table
     * with their identifiers converted, the messages table is dropped and
     * recreated from the schema, and the rows are copied back.
     *
     * @param schema the schema that supplies the new table definition
     * @throws PersistenceException if a table cannot be looked up,
     *                              created or dropped
     * @throws SQLException         if a row cannot be read or inserted
     */
    private void convertMessagesTable(Database schema)
        throws PersistenceException, SQLException {
        Table table = SchemaHelper.getTable(schema, MESSAGES_TABLE);

        // the temporary table shares the column layout of the new table
        Table tmpTable = new Table();
        String tmpName = TMP_PREFIX + MESSAGES_TABLE;
        tmpTable.setName(tmpName);
        tmpTable.setAttribute(table.getAttribute());

        // NOTE(review): presumably clears a temporary table left by an
        // earlier, failed run - confirm drop() tolerates a missing table
        _tool.drop(tmpTable);
        _tool.create(tmpTable);

        // pass 1: old table -> temporary table, converting the identifiers
        copyMessages(MESSAGES_TABLE, tmpName, true);

        // replace the old table with one matching the new schema
        _tool.drop(table);
        _tool.create(table);

        // pass 2: temporary table -> new messages table. The rows must be
        // inserted into the messages table, not back into the temporary
        // table, which is dropped immediately afterwards.
        copyMessages(tmpName, MESSAGES_TABLE, false);

        _tool.drop(tmpTable);
    }

    /**
     * Copy every row of one messages table into another.
     *
     * @param source     the name of the table to read from
     * @param target     the name of the table to insert into
     * @param convertIds if <code>true</code>, the identifier is read as a
     *                   <code>long</code> and converted to
     *                   <code>"ID:" + value</code>; otherwise it is read
     *                   as a string and copied unchanged
     * @throws SQLException if a row cannot be read or inserted
     */
    private void copyMessages(String source, String target,
                              boolean convertIds) throws SQLException {
        PreparedStatement select = null;
        try {
            select = _connection.prepareStatement(
                "select messageid, destinationid, priority, createTime,"
                + "expiryTime, processed, messageBlob from " + source);
            ResultSet set = select.executeQuery();
            while (set.next()) {
                // columns are read strictly in order
                String messageId;
                if (convertIds) {
                    messageId = "ID:" + set.getLong(1);
                } else {
                    messageId = set.getString(1);
                }
                long destinationId = set.getLong(2);
                int priority = set.getInt(3);
                long createTime = set.getLong(4);
                long expiryTime = set.getLong(5);
                int processed = set.getInt(6);
                byte[] blob = set.getBytes(7);
                migrateMessage(target, messageId, destinationId, priority,
                               createTime, expiryTime, processed, blob);
            }
            set.close();
        } finally {
            // closing the statement also releases its result set if the
            // loop was left via an exception
            SQLHelper.close(select);
        }
    }

    /**
     * Insert a single message row into the named table.
     *
     * @param table         the name of the table to insert into
     * @param messageId     the message identifier
     * @param destinationId the destination identifier
     * @param priority      the message priority
     * @param createTime    the creation time
     * @param expiryTime    the expiry time
     * @param processed     the processed flag
     * @param blob          the serialized message; must not be
     *                      <code>null</code>
     * @throws SQLException if the insert fails, or does not affect
     *                      exactly one row
     */
    private void migrateMessage(String table, String messageId,
                                long destinationId, int priority,
                                long createTime, long expiryTime,
                                int processed, byte[] blob)
        throws SQLException {
        PreparedStatement insert = null;
        try {
            insert = _connection.prepareStatement(
                "insert into " + table + " values (?,?,?,?,?,?,?)");
            insert.setString(1, messageId);
            insert.setLong(2, destinationId);
            insert.setInt(3, priority);
            insert.setLong(4, createTime);
            insert.setLong(5, expiryTime);
            insert.setInt(6, processed);
            insert.setBinaryStream(7, new ByteArrayInputStream(blob),
                                   blob.length);

            if (insert.executeUpdate() != 1) {
                throw new SQLException("Failed to add message=" + messageId);
            }
        } finally {
            SQLHelper.close(insert);
        }
    }

    /**
     * Convert the message handles table. Rows are copied to a temporary
     * table with their message identifiers converted, the handles table
     * is dropped and recreated from the schema, and the rows are copied
     * back with a single <code>insert ... select</code>.
     *
     * @param schema the schema that supplies the new table definition
     * @throws PersistenceException if a table cannot be looked up,
     *                              created or dropped
     * @throws SQLException         if a row cannot be read or inserted
     */
    private void convertHandlesTable(Database schema)
        throws PersistenceException, SQLException {
        Table table = SchemaHelper.getTable(schema, HANDLES_TABLE);

        // the temporary table shares the column layout of the new table
        Table tmpTable = new Table();
        String tmpName = TMP_PREFIX + HANDLES_TABLE;
        tmpTable.setName(tmpName);
        tmpTable.setAttribute(table.getAttribute());

        // NOTE(review): presumably clears a temporary table left by an
        // earlier, failed run - confirm drop() tolerates a missing table
        _tool.drop(tmpTable);
        _tool.create(tmpTable);

        // pass 1: old table -> temporary table, converting the identifiers
        PreparedStatement select = null;
        try {
            select = _connection.prepareStatement(
                "select messageid, destinationid, consumerid, priority, "
                + " acceptedTime, sequenceNumber, expiryTime, delivered"
                + " from " + HANDLES_TABLE);
            ResultSet set = select.executeQuery();
            while (set.next()) {
                long messageId = set.getLong(1);
                long destinationId = set.getLong(2);
                long consumerId = set.getLong(3);
                int priority = set.getInt(4);
                long acceptedTime = set.getLong(5);
                long sequenceNo = set.getLong(6);
                long expiryTime = set.getLong(7);
                int delivered = set.getInt(8);
                migrateHandle(tmpName, messageId, destinationId, consumerId,
                              priority, acceptedTime, sequenceNo, expiryTime,
                              delivered);
            }
            set.close();
        } finally {
            SQLHelper.close(select);
        }

        // replace the old table with one matching the new schema
        _tool.drop(table);
        _tool.create(table);

        // pass 2: temporary table -> new handles table. This statement
        // returns no result set, so it is run with executeUpdate().
        PreparedStatement copy = null;
        try {
            copy = _connection.prepareStatement(
                "insert into " + HANDLES_TABLE + " select * from "
                + tmpName);
            copy.executeUpdate();
        } finally {
            SQLHelper.close(copy);
        }

        _tool.drop(tmpTable);
    }

    /**
     * Insert a single handle row into the named table, storing the
     * message identifier as <code>"ID:" + messageId</code>.
     *
     * @param table         the name of the table to insert into
     * @param messageId     the numeric message identifier
     * @param destinationId the destination identifier
     * @param consumerId    the consumer identifier
     * @param priority      the message priority
     * @param acceptedTime  the accepted time
     * @param sequenceNo    the sequence number
     * @param expiryTime    the expiry time
     * @param delivered     the delivered flag
     * @throws SQLException if the insert fails, or does not affect
     *                      exactly one row
     */
    private void migrateHandle(String table, long messageId,
                               long destinationId, long consumerId,
                               int priority, long acceptedTime,
                               long sequenceNo, long expiryTime,
                               int delivered) throws SQLException {
        PreparedStatement insert = null;
        try {
            insert = _connection.prepareStatement(
                "insert into " + table + " values (?,?,?,?,?,?,?,?)");
            insert.setString(1, "ID:" + messageId);
            insert.setLong(2, destinationId);
            insert.setLong(3, consumerId);
            insert.setInt(4, priority);
            insert.setLong(5, acceptedTime);
            insert.setLong(6, sequenceNo);
            insert.setLong(7, expiryTime);
            insert.setInt(8, delivered);

            if (insert.executeUpdate() != 1) {
                throw new SQLException("Failed to add handle=" + messageId);
            }
        } finally {
            SQLHelper.close(insert);
        }
    }

    /**
     * Create the users table from its definition in the schema.
     *
     * @param schema the schema that supplies the table definition
     * @throws PersistenceException if the table cannot be looked up or
     *                              created
     */
    private void createUsersTable(Database schema)
        throws PersistenceException {
        Table table = SchemaHelper.getTable(schema, USERS_TABLE);
        _tool.create(table);
    }

}