1 18 package org.apache.activemq.store.journal; 19 20 import java.io.File ; 21 import java.io.IOException ; 22 import org.apache.activeio.journal.Journal; 23 import org.apache.activeio.journal.active.JournalImpl; 24 import org.apache.activeio.journal.active.JournalLockedException; 25 import org.apache.activemq.store.PersistenceAdapter; 26 import org.apache.activemq.store.PersistenceAdapterFactory; 27 import org.apache.activemq.store.amq.AMQPersistenceAdapter; 28 import org.apache.activemq.store.jdbc.DataSourceSupport; 29 import org.apache.activemq.store.jdbc.JDBCAdapter; 30 import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter; 31 import org.apache.activemq.store.jdbc.Statements; 32 import org.apache.activemq.thread.TaskRunnerFactory; 33 import org.apache.commons.logging.Log; 34 import org.apache.commons.logging.LogFactory; 35 36 41 public class JournalPersistenceAdapterFactory extends DataSourceSupport implements PersistenceAdapterFactory { 42 43 private static final int JOURNAL_LOCKED_WAIT_DELAY = 10*1000; 44 45 private static final Log log = LogFactory.getLog(JournalPersistenceAdapterFactory.class); 46 47 private int journalLogFileSize = 1024*1024*20; 48 private int journalLogFiles = 2; 49 private TaskRunnerFactory taskRunnerFactory; 50 private Journal journal; 51 private boolean useJournal=true; 52 private boolean useQuickJournal=false; 53 private File journalArchiveDirectory; 54 private boolean failIfJournalIsLocked=false; 55 private int journalThreadPriority = Thread.MAX_PRIORITY; 56 private JDBCPersistenceAdapter jdbcPersistenceAdapter = new JDBCPersistenceAdapter(); 57 58 public PersistenceAdapter createPersistenceAdapter() throws IOException { 59 jdbcPersistenceAdapter.setDataSource(getDataSource()); 60 61 if( !useJournal ) { 62 return jdbcPersistenceAdapter; 63 } 64 return new JournalPersistenceAdapter(getJournal(), jdbcPersistenceAdapter, getTaskRunnerFactory()); 65 66 } 67 68 public int getJournalLogFiles() { 69 return journalLogFiles; 70 } 71 72 75 public void setJournalLogFiles(int journalLogFiles) { 76 this.journalLogFiles = journalLogFiles; 77 } 78 79 public int getJournalLogFileSize() { 80 return journalLogFileSize; 81 } 82 83 88 public void setJournalLogFileSize(int journalLogFileSize) { 89 this.journalLogFileSize = journalLogFileSize; 90 } 91 92 public JDBCPersistenceAdapter getJdbcAdapter() { 93 return jdbcPersistenceAdapter; 94 } 95 96 public void setJdbcAdapter(JDBCPersistenceAdapter jdbcAdapter) { 97 this.jdbcPersistenceAdapter = jdbcAdapter; 98 } 99 100 public boolean isUseJournal() { 101 return useJournal; 102 } 103 104 109 public void setUseJournal(boolean useJournal) { 110 this.useJournal = useJournal; 111 } 112 113 public TaskRunnerFactory getTaskRunnerFactory() { 114 if( taskRunnerFactory == null ) { 115 taskRunnerFactory = new TaskRunnerFactory("Persistence Adaptor Task", journalThreadPriority, true, 1000); 116 } 117 return taskRunnerFactory; 118 } 119 120 public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) { 121 this.taskRunnerFactory = taskRunnerFactory; 122 } 123 124 public Journal getJournal() throws IOException { 125 if( journal == null ) { 126 createJournal(); 127 } 128 return journal; 129 } 130 131 public void setJournal(Journal journal) { 132 this.journal = journal; 133 } 134 135 public File getJournalArchiveDirectory() { 136 if( journalArchiveDirectory == null && useQuickJournal ) { 137 journalArchiveDirectory = new File (getDataDirectoryFile(), "journal"); 138 } 139 return journalArchiveDirectory; 140 } 141 142 public void setJournalArchiveDirectory(File journalArchiveDirectory) { 143 this.journalArchiveDirectory = journalArchiveDirectory; 144 } 145 146 147 public boolean isUseQuickJournal() { 148 return useQuickJournal; 149 } 150 151 156 public void setUseQuickJournal(boolean useQuickJournal) { 157 this.useQuickJournal = useQuickJournal; 158 } 159 160 public JDBCAdapter getAdapter() throws IOException { 161 return jdbcPersistenceAdapter.getAdapter(); 162 } 163 164 public void setAdapter(JDBCAdapter adapter) { 165 jdbcPersistenceAdapter.setAdapter(adapter); 166 } 167 168 public Statements getStatements() { 169 return jdbcPersistenceAdapter.getStatements(); 170 } 171 public void setStatements(Statements statements) { 172 jdbcPersistenceAdapter.setStatements(statements); 173 } 174 175 public boolean isUseDatabaseLock() { 176 return jdbcPersistenceAdapter.isUseDatabaseLock(); 177 } 178 179 182 public void setUseDatabaseLock(boolean useDatabaseLock) { 183 jdbcPersistenceAdapter.setUseDatabaseLock(useDatabaseLock); 184 } 185 186 public boolean isCreateTablesOnStartup() { 187 return jdbcPersistenceAdapter.isCreateTablesOnStartup(); 188 } 189 190 193 public void setCreateTablesOnStartup(boolean createTablesOnStartup) { 194 jdbcPersistenceAdapter.setCreateTablesOnStartup(createTablesOnStartup); 195 } 196 197 public int getJournalThreadPriority(){ 198 return journalThreadPriority; 199 } 200 201 204 public void setJournalThreadPriority(int journalThreadPriority){ 205 this.journalThreadPriority=journalThreadPriority; 206 } 207 208 211 protected void createJournal() throws IOException { 212 File journalDir = new File (getDataDirectoryFile(), "journal").getCanonicalFile(); 213 if( failIfJournalIsLocked ) { 214 journal = new JournalImpl(journalDir, journalLogFiles, journalLogFileSize, getJournalArchiveDirectory()); 215 } else { 216 while( true ) { 217 try { 218 journal = new JournalImpl(journalDir, journalLogFiles, journalLogFileSize, getJournalArchiveDirectory()); 219 break; 220 } catch (JournalLockedException e) { 221 log.info("Journal is locked... waiting "+(JOURNAL_LOCKED_WAIT_DELAY/1000)+" seconds for the journal to be unlocked."); 222 try { 223 Thread.sleep(JOURNAL_LOCKED_WAIT_DELAY); 224 } catch (InterruptedException e1) { 225 } 226 } 227 } 228 } 229 } 230 231 232 233 } 234 | Popular Tags |