1 17 package org.apache.activemq.store.jdbc; 18 19 import org.apache.activemq.Service; 20 import org.apache.activemq.broker.BrokerService; 21 import org.apache.commons.logging.Log; 22 import org.apache.commons.logging.LogFactory; 23 24 import javax.sql.DataSource ; 25 26 import java.sql.Connection ; 27 import java.sql.PreparedStatement ; 28 import java.sql.SQLException ; 29 30 36 public class DefaultDatabaseLocker implements DatabaseLocker { 37 private static final Log log = LogFactory.getLog(DefaultDatabaseLocker.class); 38 39 private final DataSource dataSource; 40 private final Statements statements; 41 private long sleepTime = 1000; 42 private Connection connection; 43 private boolean stopping; 44 45 public DefaultDatabaseLocker(DataSource dataSource, Statements statements) { 46 this.dataSource = dataSource; 47 this.statements = statements; 48 } 49 50 public void start() throws Exception { 51 stopping = false; 52 connection = dataSource.getConnection(); 53 connection.setAutoCommit(false); 54 55 log.info("Attempting to acquire the exclusive lock to become the Master broker"); 56 String sql = statements.getLockCreateStatement(); 57 PreparedStatement statement = connection.prepareStatement(sql); 58 while (true) { 59 try { 60 statement.execute(); 61 break; 62 } 63 catch (Exception e) { 64 if (stopping) { 65 throw new Exception ("Cannot start broker as being asked to shut down. Interupted attempt to acquire lock: " + e, e); 66 } 67 log.error("Failed to acquire lock: " + e, e); 68 } 69 log.debug("Sleeping for " + sleepTime + " milli(s) before trying again to get the lock..."); 70 Thread.sleep(sleepTime); 71 } 72 73 log.info("Becoming the master on dataSource: " + dataSource); 74 } 75 76 public void stop() throws Exception { 77 stopping = true; 78 if (connection != null) { 79 connection.rollback(); 80 connection.close(); 81 } 82 } 83 84 public boolean keepAlive() { 85 try { 86 PreparedStatement statement = connection.prepareStatement(statements.getLockUpdateStatement()); 87 statement.setLong(1, System.currentTimeMillis()); 88 int rows = statement.executeUpdate(); 89 if (rows == 1) { 90 return true; 91 } 92 } 93 catch (Exception e) { 94 log.error("Failed to update database lock: " + e, e); 95 } 96 return false; 97 } 98 } 99 | Popular Tags |