KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > store > journal > JournalPersistenceAdapterFactory


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 package org.apache.activemq.store.journal;
19
20 import java.io.File JavaDoc;
21 import java.io.IOException JavaDoc;
22 import org.apache.activeio.journal.Journal;
23 import org.apache.activeio.journal.active.JournalImpl;
24 import org.apache.activeio.journal.active.JournalLockedException;
25 import org.apache.activemq.store.PersistenceAdapter;
26 import org.apache.activemq.store.PersistenceAdapterFactory;
27 import org.apache.activemq.store.amq.AMQPersistenceAdapter;
28 import org.apache.activemq.store.jdbc.DataSourceSupport;
29 import org.apache.activemq.store.jdbc.JDBCAdapter;
30 import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
31 import org.apache.activemq.store.jdbc.Statements;
32 import org.apache.activemq.thread.TaskRunnerFactory;
33 import org.apache.commons.logging.Log;
34 import org.apache.commons.logging.LogFactory;
35
36 /**
37  * Factory class that can create PersistenceAdapter objects.
38  *
39  * @version $Revision: 1.4 $
40  */

41 public class JournalPersistenceAdapterFactory extends DataSourceSupport implements PersistenceAdapterFactory {
42     
43     private static final int JOURNAL_LOCKED_WAIT_DELAY = 10*1000;
44
45     private static final Log log = LogFactory.getLog(JournalPersistenceAdapterFactory.class);
46     
47     private int journalLogFileSize = 1024*1024*20;
48     private int journalLogFiles = 2;
49     private TaskRunnerFactory taskRunnerFactory;
50     private Journal journal;
51     private boolean useJournal=true;
52     private boolean useQuickJournal=false;
53     private File JavaDoc journalArchiveDirectory;
54     private boolean failIfJournalIsLocked=false;
55     private int journalThreadPriority = Thread.MAX_PRIORITY;
56     private JDBCPersistenceAdapter jdbcPersistenceAdapter = new JDBCPersistenceAdapter();
57     
58     public PersistenceAdapter createPersistenceAdapter() throws IOException JavaDoc {
59         jdbcPersistenceAdapter.setDataSource(getDataSource());
60         
61         if( !useJournal ) {
62             return jdbcPersistenceAdapter;
63         }
64         return new JournalPersistenceAdapter(getJournal(), jdbcPersistenceAdapter, getTaskRunnerFactory());
65         
66     }
67
68     public int getJournalLogFiles() {
69         return journalLogFiles;
70     }
71
72     /**
73      * Sets the number of journal log files to use
74      */

75     public void setJournalLogFiles(int journalLogFiles) {
76         this.journalLogFiles = journalLogFiles;
77     }
78
79     public int getJournalLogFileSize() {
80         return journalLogFileSize;
81     }
82
83     /**
84      * Sets the size of the journal log files
85      *
86      * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryIntPropertyEditor"
87      */

88     public void setJournalLogFileSize(int journalLogFileSize) {
89         this.journalLogFileSize = journalLogFileSize;
90     }
91     
92     public JDBCPersistenceAdapter getJdbcAdapter() {
93         return jdbcPersistenceAdapter;
94     }
95
96     public void setJdbcAdapter(JDBCPersistenceAdapter jdbcAdapter) {
97         this.jdbcPersistenceAdapter = jdbcAdapter;
98     }
99
100     public boolean isUseJournal() {
101         return useJournal;
102     }
103
104     /**
105      * Enables or disables the use of the journal. The default is to use the journal
106      *
107      * @param useJournal
108      */

109     public void setUseJournal(boolean useJournal) {
110         this.useJournal = useJournal;
111     }
112
113     public TaskRunnerFactory getTaskRunnerFactory() {
114         if( taskRunnerFactory == null ) {
115             taskRunnerFactory = new TaskRunnerFactory("Persistence Adaptor Task", journalThreadPriority, true, 1000);
116         }
117         return taskRunnerFactory;
118     }
119
120     public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) {
121         this.taskRunnerFactory = taskRunnerFactory;
122     }
123
124     public Journal getJournal() throws IOException JavaDoc {
125         if( journal == null ) {
126             createJournal();
127         }
128         return journal;
129     }
130
131     public void setJournal(Journal journal) {
132         this.journal = journal;
133     }
134
135     public File JavaDoc getJournalArchiveDirectory() {
136         if( journalArchiveDirectory == null && useQuickJournal ) {
137             journalArchiveDirectory = new File JavaDoc(getDataDirectoryFile(), "journal");
138         }
139         return journalArchiveDirectory;
140     }
141
142     public void setJournalArchiveDirectory(File JavaDoc journalArchiveDirectory) {
143         this.journalArchiveDirectory = journalArchiveDirectory;
144     }
145
146
147     public boolean isUseQuickJournal() {
148         return useQuickJournal;
149     }
150
151     /**
152      * Enables or disables the use of quick journal, which keeps messages in the journal and just
153      * stores a reference to the messages in JDBC. Defaults to false so that messages actually reside
154      * long term in the JDBC database.
155      */

156     public void setUseQuickJournal(boolean useQuickJournal) {
157         this.useQuickJournal = useQuickJournal;
158     }
159
160     public JDBCAdapter getAdapter() throws IOException JavaDoc {
161         return jdbcPersistenceAdapter.getAdapter();
162     }
163
164     public void setAdapter(JDBCAdapter adapter) {
165         jdbcPersistenceAdapter.setAdapter(adapter);
166     }
167
168     public Statements getStatements() {
169         return jdbcPersistenceAdapter.getStatements();
170     }
171     public void setStatements(Statements statements) {
172         jdbcPersistenceAdapter.setStatements(statements);
173     }
174
175     public boolean isUseDatabaseLock() {
176         return jdbcPersistenceAdapter.isUseDatabaseLock();
177     }
178
179     /**
180      * Sets whether or not an exclusive database lock should be used to enable JDBC Master/Slave. Enabled by default.
181      */

182     public void setUseDatabaseLock(boolean useDatabaseLock) {
183         jdbcPersistenceAdapter.setUseDatabaseLock(useDatabaseLock);
184     }
185
186     public boolean isCreateTablesOnStartup() {
187         return jdbcPersistenceAdapter.isCreateTablesOnStartup();
188     }
189
190     /**
191      * Sets whether or not tables are created on startup
192      */

193     public void setCreateTablesOnStartup(boolean createTablesOnStartup) {
194         jdbcPersistenceAdapter.setCreateTablesOnStartup(createTablesOnStartup);
195     }
196     
197     public int getJournalThreadPriority(){
198         return journalThreadPriority;
199     }
200
201     /**
202      * Sets the thread priority of the journal thread
203      */

204     public void setJournalThreadPriority(int journalThreadPriority){
205         this.journalThreadPriority=journalThreadPriority;
206     }
207
208     /**
209      * @throws IOException
210      */

211     protected void createJournal() throws IOException JavaDoc {
212         File JavaDoc journalDir = new File JavaDoc(getDataDirectoryFile(), "journal").getCanonicalFile();
213         if( failIfJournalIsLocked ) {
214             journal = new JournalImpl(journalDir, journalLogFiles, journalLogFileSize, getJournalArchiveDirectory());
215         } else {
216             while( true ) {
217                 try {
218                     journal = new JournalImpl(journalDir, journalLogFiles, journalLogFileSize, getJournalArchiveDirectory());
219                     break;
220                 } catch (JournalLockedException e) {
221                     log.info("Journal is locked... waiting "+(JOURNAL_LOCKED_WAIT_DELAY/1000)+" seconds for the journal to be unlocked.");
222                     try {
223                         Thread.sleep(JOURNAL_LOCKED_WAIT_DELAY);
224                     } catch (InterruptedException JavaDoc e1) {
225                     }
226                 }
227             }
228         }
229     }
230
231     
232
233 }
234
Popular Tags