1 18 package org.apache.activemq.store.jdbc; 19 20 import java.io.IOException ; 21 import java.sql.Connection ; 22 import java.sql.PreparedStatement ; 23 import java.sql.SQLException ; 24 import java.sql.Statement ; 25 26 import javax.sql.DataSource ; 27 28 import org.apache.activemq.util.IOExceptionSupport; 29 import org.apache.commons.logging.Log; 30 import org.apache.commons.logging.LogFactory; 31 32 37 public class TransactionContext { 38 39 private static final Log log = LogFactory.getLog(TransactionContext.class); 40 41 private final DataSource dataSource; 42 private Connection connection; 43 private boolean inTx; 44 private PreparedStatement addMessageStatement; 45 private PreparedStatement removedMessageStatement; 46 private PreparedStatement updateLastAckStatement; 47 48 public TransactionContext(DataSource dataSource) { 49 this.dataSource = dataSource; 50 } 51 52 public Connection getConnection() throws IOException { 53 if( connection == null ) { 54 try { 55 connection = dataSource.getConnection(); 56 boolean autoCommit = !inTx; 57 if (connection.getAutoCommit() != autoCommit) { 58 connection.setAutoCommit(autoCommit); 59 } 60 } catch (SQLException e) { 61 JDBCPersistenceAdapter.log("Could not get JDBC connection: ", e); 62 throw IOExceptionSupport.create(e); 63 } 64 65 try { 66 connection.setTransactionIsolation(Connection.TRANSACTION_READ_UNCOMMITTED); 67 } catch (Throwable e) { 68 } 69 } 70 return connection; 71 } 72 73 public void executeBatch() throws SQLException { 74 try { 75 executeBatch(addMessageStatement, "Failed add a message"); 76 } finally { 77 addMessageStatement = null; 78 try { 79 executeBatch(removedMessageStatement, "Failed to remove a message"); 80 } finally { 81 removedMessageStatement=null; 82 try { 83 executeBatch(updateLastAckStatement, "Failed to ack a message"); 84 } finally { 85 updateLastAckStatement=null; 86 } 87 } 88 } 89 } 90 91 private void executeBatch(PreparedStatement p, String message) throws SQLException { 92 if( p == null ) 93 return; 94 95 try { 96 int[] rc = p.executeBatch(); 97 for (int i = 0; i < rc.length; i++) { 98 int code = rc[i]; 99 if ( code < 0 && code != Statement.SUCCESS_NO_INFO ) { 100 throw new SQLException (message + ". Response code: " + code); 101 } 102 } 103 } finally { 104 try { p.close(); } catch (Throwable e) { } 105 } 106 } 107 108 public void close() throws IOException { 109 if( !inTx ) { 110 try { 111 112 118 try{ 119 executeBatch(); 120 } finally { 121 if (connection != null && !connection.getAutoCommit()) { 122 connection.commit(); 123 } 124 } 125 126 } catch (SQLException e) { 127 JDBCPersistenceAdapter.log("Error while closing connection: ", e); 128 throw IOExceptionSupport.create(e); 129 } finally { 130 try { 131 if (connection != null) { 132 connection.close(); 133 } 134 } catch (Throwable e) { 135 log.warn("Close failed: "+e.getMessage(), e); 136 } finally { 137 connection=null; 138 } 139 } 140 } 141 } 142 143 public void begin() throws IOException { 144 if( inTx ) 145 throw new IOException ("Already started."); 146 inTx = true; 147 connection = getConnection(); 148 } 149 150 public void commit() throws IOException { 151 if( !inTx ) 152 throw new IOException ("Not started."); 153 try { 154 executeBatch(); 155 if( !connection.getAutoCommit() ) 156 connection.commit(); 157 } catch (SQLException e) { 158 JDBCPersistenceAdapter.log("Commit failed: ", e); 159 throw IOExceptionSupport.create(e); 160 } finally { 161 inTx=false; 162 close(); 163 } 164 } 165 166 public void rollback() throws IOException { 167 if( !inTx ) 168 throw new IOException ("Not started."); 169 try { 170 if( addMessageStatement != null ) { 171 addMessageStatement.close(); 172 addMessageStatement=null; 173 } 174 if( removedMessageStatement != null ) { 175 removedMessageStatement.close(); 176 removedMessageStatement=null; 177 } 178 if( updateLastAckStatement != null ) { 179 updateLastAckStatement.close(); 180 updateLastAckStatement=null; 181 } 182 connection.rollback(); 183 184 } catch (SQLException e) { 185 JDBCPersistenceAdapter.log("Rollback failed: ", e); 186 throw IOExceptionSupport.create(e); 187 } finally { 188 inTx=false; 189 close(); 190 } 191 } 192 193 public PreparedStatement getAddMessageStatement() { 194 return addMessageStatement; 195 } 196 public void setAddMessageStatement(PreparedStatement addMessageStatement) { 197 this.addMessageStatement = addMessageStatement; 198 } 199 200 public PreparedStatement getUpdateLastAckStatement() { 201 return updateLastAckStatement; 202 } 203 public void setUpdateLastAckStatement(PreparedStatement ackMessageStatement) { 204 this.updateLastAckStatement = ackMessageStatement; 205 } 206 207 public PreparedStatement getRemovedMessageStatement() { 208 return removedMessageStatement; 209 } 210 public void setRemovedMessageStatement(PreparedStatement removedMessageStatement) { 211 this.removedMessageStatement = removedMessageStatement; 212 } 213 214 } 215 | Popular Tags |