1 18 package org.apache.activemq.store.jdbc; 19 20 import java.util.concurrent.ScheduledFuture ; 21 import java.util.concurrent.ScheduledThreadPoolExecutor ; 22 import java.util.concurrent.ThreadFactory ; 23 import java.util.concurrent.TimeUnit ; 24 25 import org.apache.activemq.broker.BrokerService; 26 import org.apache.activemq.broker.BrokerServiceAware; 27 import org.apache.activemq.broker.ConnectionContext; 28 import org.apache.activemq.command.ActiveMQQueue; 29 import org.apache.activemq.command.ActiveMQTopic; 30 import org.apache.activemq.memory.UsageManager; 31 import org.apache.activemq.openwire.OpenWireFormat; 32 import org.apache.activemq.store.MessageStore; 33 import org.apache.activemq.store.PersistenceAdapter; 34 import org.apache.activemq.store.TopicMessageStore; 35 import org.apache.activemq.store.TransactionStore; 36 import org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter; 37 import org.apache.activemq.store.memory.MemoryTransactionStore; 38 import org.apache.activemq.util.FactoryFinder; 39 import org.apache.activemq.util.IOExceptionSupport; 40 import org.apache.activemq.wireformat.WireFormat; 41 import org.apache.commons.logging.Log; 42 import org.apache.commons.logging.LogFactory; 43 44 import javax.sql.DataSource ; 45 46 import java.io.File ; 47 import java.io.IOException ; 48 import java.sql.SQLException ; 49 import java.util.Collections ; 50 import java.util.Set ; 51 52 64 public class JDBCPersistenceAdapter extends DataSourceSupport implements PersistenceAdapter, BrokerServiceAware { 65 66 private static final Log log = LogFactory.getLog(JDBCPersistenceAdapter.class); 67 private static FactoryFinder factoryFinder = new FactoryFinder("META-INF/services/org/apache/activemq/store/jdbc/"); 68 69 private WireFormat wireFormat = new OpenWireFormat(); 70 private BrokerService brokerService; 71 private Statements statements; 72 private JDBCAdapter adapter; 73 private MemoryTransactionStore transactionStore; 74 private ScheduledThreadPoolExecutor clockDaemon; 75 private ScheduledFuture clockTicket; 76 private int cleanupPeriod = 1000 * 60 * 5; 77 private boolean useExternalMessageReferences; 78 private boolean useDatabaseLock = true; 79 private int lockKeepAlivePeriod = 0; 80 private DatabaseLocker databaseLocker; 81 private boolean createTablesOnStartup = true; 82 83 public JDBCPersistenceAdapter() { 84 } 85 86 public JDBCPersistenceAdapter(DataSource ds, WireFormat wireFormat) { 87 super(ds); 88 this.wireFormat = wireFormat; 89 } 90 91 public Set getDestinations() { 92 TransactionContext c = null; 94 try { 95 c = getTransactionContext(); 96 return getAdapter().doGetDestinations(c); 97 } 98 catch (IOException e) { 99 return Collections.EMPTY_SET; 100 } 101 catch (SQLException e) { 102 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 103 return Collections.EMPTY_SET; 104 } 105 finally { 106 if (c != null) { 107 try { 108 c.close(); 109 } 110 catch (Throwable e) { 111 } 112 } 113 } 114 } 115 116 public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException { 117 MessageStore rc = new JDBCMessageStore(this, getAdapter(), wireFormat, destination); 118 if (transactionStore != null) { 119 rc = transactionStore.proxy(rc); 120 } 121 return rc; 122 } 123 124 public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException { 125 TopicMessageStore rc = new JDBCTopicMessageStore(this, getAdapter(), wireFormat, destination); 126 if (transactionStore != null) { 127 rc = transactionStore.proxy(rc); 128 } 129 return rc; 130 } 131 132 public TransactionStore createTransactionStore() throws IOException { 133 if (transactionStore == null) { 134 transactionStore = new MemoryTransactionStore(); 135 } 136 return this.transactionStore; 137 } 138 139 public long getLastMessageBrokerSequenceId() throws IOException { 140 TransactionContext c = getTransactionContext(); 142 try { 143 return getAdapter().doGetLastMessageBrokerSequenceId(c); 144 } catch (SQLException e) { 145 JDBCPersistenceAdapter.log("JDBC Failure: ",e); 146 throw IOExceptionSupport.create("Failed to get last broker message id: " + e, e); 147 } finally { 148 c.close(); 149 } 150 } 151 152 public void start() throws Exception { 153 getAdapter().setUseExternalMessageReferences(isUseExternalMessageReferences()); 154 155 if (isCreateTablesOnStartup()) { 156 TransactionContext transactionContext = getTransactionContext(); 157 transactionContext.begin(); 158 try { 159 try { 160 getAdapter().doCreateTables(transactionContext); 161 } catch (SQLException e) { 162 log.warn("Cannot create tables due to: " + e); 163 JDBCPersistenceAdapter.log("Failure Details: ",e); 164 } 165 } finally { 166 transactionContext.commit(); 167 } 168 } 169 170 if (isUseDatabaseLock()) { 171 DatabaseLocker service = getDatabaseLocker(); 172 if (service == null) { 173 log.warn("No databaseLocker configured for the JDBC Persistence Adapter"); 174 } 175 else { 176 service.start(); 177 } 178 } 179 180 cleanup(); 181 182 if (cleanupPeriod > 0) { 184 clockTicket = getScheduledThreadPoolExecutor().scheduleAtFixedRate(new Runnable () { 185 public void run() { 186 cleanup(); 187 } 188 }, cleanupPeriod, cleanupPeriod, TimeUnit.MILLISECONDS); 189 } 190 } 191 192 public synchronized void stop() throws Exception { 193 if (clockTicket != null) { 194 clockTicket.cancel(true); 195 clockTicket = null; 196 } 197 if (clockDaemon != null) { 198 clockDaemon.shutdown(); 199 clockDaemon = null; 200 } 201 DatabaseLocker service = getDatabaseLocker(); 202 if (service != null) { 203 service.stop(); 204 } 205 } 206 207 public void cleanup() { 208 TransactionContext c = null; 209 try { 210 log.debug("Cleaning up old messages."); 211 c = getTransactionContext(); 212 getAdapter().doDeleteOldMessages(c); 213 } 214 catch (IOException e) { 215 log.warn("Old message cleanup failed due to: " + e, e); 216 } 217 catch (SQLException e) { 218 log.warn("Old message cleanup failed due to: " + e); 219 JDBCPersistenceAdapter.log("Failure Details: ", e); 220 } 221 finally { 222 if (c != null) { 223 try { 224 c.close(); 225 } 226 catch (Throwable e) { 227 } 228 } 229 log.debug("Cleanup done."); 230 } 231 } 232 233 public void setScheduledThreadPoolExecutor(ScheduledThreadPoolExecutor clockDaemon) { 234 this.clockDaemon = clockDaemon; 235 } 236 237 public ScheduledThreadPoolExecutor getScheduledThreadPoolExecutor() { 238 if (clockDaemon == null) { 239 clockDaemon = new ScheduledThreadPoolExecutor (5, new ThreadFactory () { 240 public Thread newThread(Runnable runnable) { 241 Thread thread = new Thread (runnable, "ActiveMQ Cleanup Timer"); 242 thread.setDaemon(true); 243 return thread; 244 } 245 }); 246 } 247 return clockDaemon; 248 } 249 250 public JDBCAdapter getAdapter() throws IOException { 251 if (adapter == null) { 252 setAdapter(createAdapter()); 253 } 254 return adapter; 255 } 256 257 258 public DatabaseLocker getDatabaseLocker() throws IOException { 259 if (databaseLocker == null) { 260 databaseLocker = createDatabaseLocker(); 261 if (lockKeepAlivePeriod > 0) { 262 getScheduledThreadPoolExecutor().scheduleAtFixedRate(new Runnable () { 263 public void run() { 264 databaseLockKeepAlive(); 265 } 266 }, lockKeepAlivePeriod, lockKeepAlivePeriod, TimeUnit.MILLISECONDS); 267 } 268 } 269 return databaseLocker; 270 } 271 272 275 public void setDatabaseLocker(DatabaseLocker databaseLocker) { 276 this.databaseLocker = databaseLocker; 277 } 278 279 public BrokerService getBrokerService() { 280 return brokerService; 281 } 282 283 public void setBrokerService(BrokerService brokerService) { 284 this.brokerService = brokerService; 285 } 286 287 290 protected JDBCAdapter createAdapter() throws IOException { 291 JDBCAdapter adapter=null; 292 TransactionContext c = getTransactionContext(); 293 try { 294 295 try { 296 297 String dirverName = c.getConnection().getMetaData().getDriverName(); 299 dirverName = dirverName.replaceAll("[^a-zA-Z0-9\\-]", "_").toLowerCase(); 300 301 try { 302 adapter = (DefaultJDBCAdapter) factoryFinder.newInstance(dirverName); 303 log.info("Database driver recognized: [" + dirverName + "]"); 304 } catch (Throwable e) { 305 log.warn("Database driver NOT recognized: [" + dirverName 306 + "]. Will use default JDBC implementation."); 307 } 308 309 } catch (SQLException e) { 310 log.warn("JDBC error occurred while trying to detect database type. Will use default JDBC implementation: " 311 + e.getMessage()); 312 JDBCPersistenceAdapter.log("Failure Details: ",e); 313 } 314 315 if (adapter == null) { 318 adapter = new DefaultJDBCAdapter(); 319 } 320 321 } finally { 322 c.close(); 323 } 324 return adapter; 325 } 326 327 public void setAdapter(JDBCAdapter adapter) { 328 this.adapter = adapter; 329 this.adapter.setStatements(getStatements()); 330 } 331 332 public WireFormat getWireFormat() { 333 return wireFormat; 334 } 335 336 public void setWireFormat(WireFormat wireFormat) { 337 this.wireFormat = wireFormat; 338 } 339 340 public TransactionContext getTransactionContext(ConnectionContext context) throws IOException { 341 if (context == null) { 342 return getTransactionContext(); 343 } else { 344 TransactionContext answer = (TransactionContext) context.getLongTermStoreContext(); 345 if (answer == null) { 346 answer = new TransactionContext(getDataSource()); 347 context.setLongTermStoreContext(answer); 348 } 349 return answer; 350 } 351 } 352 353 public TransactionContext getTransactionContext() throws IOException { 354 return new TransactionContext(getDataSource()); 355 } 356 357 public void beginTransaction(ConnectionContext context) throws IOException { 358 TransactionContext transactionContext = getTransactionContext(context); 359 transactionContext.begin(); 360 } 361 362 public void commitTransaction(ConnectionContext context) throws IOException { 363 TransactionContext transactionContext = getTransactionContext(context); 364 transactionContext.commit(); 365 } 366 367 public void rollbackTransaction(ConnectionContext context) throws IOException { 368 TransactionContext transactionContext = getTransactionContext(context); 369 transactionContext.rollback(); 370 } 371 372 public int getCleanupPeriod() { 373 return cleanupPeriod; 374 } 375 376 379 public void setCleanupPeriod(int cleanupPeriod) { 380 this.cleanupPeriod = cleanupPeriod; 381 } 382 383 public void deleteAllMessages() throws IOException { 384 TransactionContext c = getTransactionContext(); 385 try { 386 getAdapter().doDropTables(c); 387 getAdapter().setUseExternalMessageReferences(isUseExternalMessageReferences()); 388 getAdapter().doCreateTables(c); 389 } catch (SQLException e) { 390 JDBCPersistenceAdapter.log("JDBC Failure: ",e); 391 throw IOExceptionSupport.create(e); 392 } finally { 393 c.close(); 394 } 395 } 396 397 public boolean isUseExternalMessageReferences() { 398 return useExternalMessageReferences; 399 } 400 401 public void setUseExternalMessageReferences(boolean useExternalMessageReferences) { 402 this.useExternalMessageReferences = useExternalMessageReferences; 403 } 404 405 public boolean isCreateTablesOnStartup() { 406 return createTablesOnStartup; 407 } 408 409 412 public void setCreateTablesOnStartup(boolean createTablesOnStartup) { 413 this.createTablesOnStartup = createTablesOnStartup; 414 } 415 416 public boolean isUseDatabaseLock() { 417 return useDatabaseLock; 418 } 419 420 423 public void setUseDatabaseLock(boolean useDatabaseLock) { 424 this.useDatabaseLock = useDatabaseLock; 425 } 426 427 static public void log(String msg, SQLException e) { 428 String s = msg+e.getMessage(); 429 while( e.getNextException() != null ) { 430 e = e.getNextException(); 431 s += ", due to: "+e.getMessage(); 432 } 433 log.debug(s, e); 434 } 435 436 public Statements getStatements() { 437 if( statements == null ) { 438 statements = new Statements(); 439 } 440 return statements; 441 } 442 443 public void setStatements(Statements statements) { 444 this.statements = statements; 445 } 446 447 450 public void setUsageManager(UsageManager usageManager) { 451 } 452 453 454 protected void databaseLockKeepAlive() { 455 boolean stop = false; 456 try { 457 DatabaseLocker locker = getDatabaseLocker(); 458 if (locker != null) { 459 if (!locker.keepAlive()) { 460 stop = true; 461 } 462 } 463 } 464 catch (IOException e) { 465 log.error("Failed to get database when trying keepalive: " + e, e); 466 } 467 if (stop) { 468 stopBroker(); 469 } 470 } 471 472 protected void stopBroker() { 473 log.info("No longer able to keep the exclusive lock so giving up being a master"); 475 try { 476 brokerService.stop(); 477 } 478 catch (Exception e) { 479 log.warn("Failed to stop broker"); 480 } 481 } 482 483 protected DatabaseLocker createDatabaseLocker() throws IOException { 484 return new DefaultDatabaseLocker(getDataSource(), getStatements()); 485 } 486 487 public void setBrokerName(String brokerName){ 488 } 489 490 public String toString(){ 491 return "JDBCPersistenceAdaptor("+super.toString()+")"; 492 } 493 494 public void setDirectory(File dir){ 495 } 496 497 public void checkpoint(boolean sync) throws IOException { 498 } 499 } 500 | Popular Tags |