KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > store > jdbc > JDBCPersistenceAdapter


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 package org.apache.activemq.store.jdbc;
19
20 import java.util.concurrent.ScheduledFuture JavaDoc;
21 import java.util.concurrent.ScheduledThreadPoolExecutor JavaDoc;
22 import java.util.concurrent.ThreadFactory JavaDoc;
23 import java.util.concurrent.TimeUnit JavaDoc;
24
25 import org.apache.activemq.broker.BrokerService;
26 import org.apache.activemq.broker.BrokerServiceAware;
27 import org.apache.activemq.broker.ConnectionContext;
28 import org.apache.activemq.command.ActiveMQQueue;
29 import org.apache.activemq.command.ActiveMQTopic;
30 import org.apache.activemq.memory.UsageManager;
31 import org.apache.activemq.openwire.OpenWireFormat;
32 import org.apache.activemq.store.MessageStore;
33 import org.apache.activemq.store.PersistenceAdapter;
34 import org.apache.activemq.store.TopicMessageStore;
35 import org.apache.activemq.store.TransactionStore;
36 import org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter;
37 import org.apache.activemq.store.memory.MemoryTransactionStore;
38 import org.apache.activemq.util.FactoryFinder;
39 import org.apache.activemq.util.IOExceptionSupport;
40 import org.apache.activemq.wireformat.WireFormat;
41 import org.apache.commons.logging.Log;
42 import org.apache.commons.logging.LogFactory;
43
44 import javax.sql.DataSource JavaDoc;
45
46 import java.io.File JavaDoc;
47 import java.io.IOException JavaDoc;
48 import java.sql.SQLException JavaDoc;
49 import java.util.Collections JavaDoc;
50 import java.util.Set JavaDoc;
51
52 /**
53  * A {@link PersistenceAdapter} implementation using JDBC for persistence
54  * storage.
55  *
56  * This persistence adapter will correctly remember prepared XA transactions,
57  * but it will not keep track of local transaction commits so that operations
58  * performed against the Message store are done as a single uow.
59  *
60  * @org.apache.xbean.XBean element="jdbcPersistenceAdapter"
61  *
62  * @version $Revision: 1.9 $
63  */

64 public class JDBCPersistenceAdapter extends DataSourceSupport implements PersistenceAdapter, BrokerServiceAware {
65
66     private static final Log log = LogFactory.getLog(JDBCPersistenceAdapter.class);
67     private static FactoryFinder factoryFinder = new FactoryFinder("META-INF/services/org/apache/activemq/store/jdbc/");
68
69     private WireFormat wireFormat = new OpenWireFormat();
70     private BrokerService brokerService;
71     private Statements statements;
72     private JDBCAdapter adapter;
73     private MemoryTransactionStore transactionStore;
74     private ScheduledThreadPoolExecutor JavaDoc clockDaemon;
75     private ScheduledFuture JavaDoc clockTicket;
76     private int cleanupPeriod = 1000 * 60 * 5;
77     private boolean useExternalMessageReferences;
78     private boolean useDatabaseLock = true;
79     private int lockKeepAlivePeriod = 0;
80     private DatabaseLocker databaseLocker;
81     private boolean createTablesOnStartup = true;
82
83     public JDBCPersistenceAdapter() {
84     }
85
86     public JDBCPersistenceAdapter(DataSource JavaDoc ds, WireFormat wireFormat) {
87         super(ds);
88         this.wireFormat = wireFormat;
89     }
90
91     public Set JavaDoc getDestinations() {
92         // Get a connection and insert the message into the DB.
93
TransactionContext c = null;
94         try {
95             c = getTransactionContext();
96             return getAdapter().doGetDestinations(c);
97         }
98         catch (IOException JavaDoc e) {
99             return Collections.EMPTY_SET;
100         }
101         catch (SQLException JavaDoc e) {
102             JDBCPersistenceAdapter.log("JDBC Failure: ", e);
103             return Collections.EMPTY_SET;
104         }
105         finally {
106             if (c != null) {
107                 try {
108                     c.close();
109                 }
110                 catch (Throwable JavaDoc e) {
111                 }
112             }
113         }
114     }
115
116     public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException JavaDoc {
117         MessageStore rc = new JDBCMessageStore(this, getAdapter(), wireFormat, destination);
118         if (transactionStore != null) {
119             rc = transactionStore.proxy(rc);
120         }
121         return rc;
122     }
123
124     public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException JavaDoc {
125         TopicMessageStore rc = new JDBCTopicMessageStore(this, getAdapter(), wireFormat, destination);
126         if (transactionStore != null) {
127             rc = transactionStore.proxy(rc);
128         }
129         return rc;
130     }
131
132     public TransactionStore createTransactionStore() throws IOException JavaDoc {
133         if (transactionStore == null) {
134             transactionStore = new MemoryTransactionStore();
135         }
136         return this.transactionStore;
137     }
138
139     public long getLastMessageBrokerSequenceId() throws IOException JavaDoc {
140         // Get a connection and insert the message into the DB.
141
TransactionContext c = getTransactionContext();
142         try {
143             return getAdapter().doGetLastMessageBrokerSequenceId(c);
144         } catch (SQLException JavaDoc e) {
145             JDBCPersistenceAdapter.log("JDBC Failure: ",e);
146             throw IOExceptionSupport.create("Failed to get last broker message id: " + e, e);
147         } finally {
148             c.close();
149         }
150     }
151
152     public void start() throws Exception JavaDoc {
153         getAdapter().setUseExternalMessageReferences(isUseExternalMessageReferences());
154
155         if (isCreateTablesOnStartup()) {
156             TransactionContext transactionContext = getTransactionContext();
157             transactionContext.begin();
158             try {
159                 try {
160                     getAdapter().doCreateTables(transactionContext);
161                 } catch (SQLException JavaDoc e) {
162                     log.warn("Cannot create tables due to: " + e);
163                     JDBCPersistenceAdapter.log("Failure Details: ",e);
164                 }
165             } finally {
166                 transactionContext.commit();
167             }
168         }
169         
170         if (isUseDatabaseLock()) {
171             DatabaseLocker service = getDatabaseLocker();
172             if (service == null) {
173                 log.warn("No databaseLocker configured for the JDBC Persistence Adapter");
174             }
175             else {
176                 service.start();
177             }
178         }
179
180         cleanup();
181
182         // Cleanup the db periodically.
183
if (cleanupPeriod > 0) {
184             clockTicket = getScheduledThreadPoolExecutor().scheduleAtFixedRate(new Runnable JavaDoc() {
185                 public void run() {
186                     cleanup();
187                 }
188             }, cleanupPeriod, cleanupPeriod, TimeUnit.MILLISECONDS);
189         }
190     }
191
192     public synchronized void stop() throws Exception JavaDoc {
193         if (clockTicket != null) {
194             clockTicket.cancel(true);
195             clockTicket = null;
196         }
197         if (clockDaemon != null) {
198             clockDaemon.shutdown();
199             clockDaemon = null;
200         }
201         DatabaseLocker service = getDatabaseLocker();
202         if (service != null) {
203             service.stop();
204         }
205     }
206
207     public void cleanup() {
208         TransactionContext c = null;
209         try {
210             log.debug("Cleaning up old messages.");
211             c = getTransactionContext();
212             getAdapter().doDeleteOldMessages(c);
213         }
214         catch (IOException JavaDoc e) {
215             log.warn("Old message cleanup failed due to: " + e, e);
216         }
217         catch (SQLException JavaDoc e) {
218             log.warn("Old message cleanup failed due to: " + e);
219             JDBCPersistenceAdapter.log("Failure Details: ", e);
220         }
221         finally {
222             if (c != null) {
223                 try {
224                     c.close();
225                 }
226                 catch (Throwable JavaDoc e) {
227                 }
228             }
229             log.debug("Cleanup done.");
230         }
231     }
232
233     public void setScheduledThreadPoolExecutor(ScheduledThreadPoolExecutor JavaDoc clockDaemon) {
234         this.clockDaemon = clockDaemon;
235     }
236
237     public ScheduledThreadPoolExecutor JavaDoc getScheduledThreadPoolExecutor() {
238         if (clockDaemon == null) {
239             clockDaemon = new ScheduledThreadPoolExecutor JavaDoc(5, new ThreadFactory JavaDoc() {
240                 public Thread JavaDoc newThread(Runnable JavaDoc runnable) {
241                     Thread JavaDoc thread = new Thread JavaDoc(runnable, "ActiveMQ Cleanup Timer");
242                     thread.setDaemon(true);
243                     return thread;
244                 }
245             });
246         }
247         return clockDaemon;
248     }
249
250     public JDBCAdapter getAdapter() throws IOException JavaDoc {
251         if (adapter == null) {
252             setAdapter(createAdapter());
253         }
254         return adapter;
255     }
256
257     
258     public DatabaseLocker getDatabaseLocker() throws IOException JavaDoc {
259         if (databaseLocker == null) {
260             databaseLocker = createDatabaseLocker();
261             if (lockKeepAlivePeriod > 0) {
262                 getScheduledThreadPoolExecutor().scheduleAtFixedRate(new Runnable JavaDoc() {
263                     public void run() {
264                         databaseLockKeepAlive();
265                     }
266                 }, lockKeepAlivePeriod, lockKeepAlivePeriod, TimeUnit.MILLISECONDS);
267             }
268         }
269         return databaseLocker;
270     }
271
272     /**
273      * Sets the database locker strategy to use to lock the database on startup
274      */

275     public void setDatabaseLocker(DatabaseLocker databaseLocker) {
276         this.databaseLocker = databaseLocker;
277     }
278     
279     public BrokerService getBrokerService() {
280         return brokerService;
281     }
282
283     public void setBrokerService(BrokerService brokerService) {
284         this.brokerService = brokerService;
285     }
286
287     /**
288      * @throws IOException
289      */

290     protected JDBCAdapter createAdapter() throws IOException JavaDoc {
291         JDBCAdapter adapter=null;
292         TransactionContext c = getTransactionContext();
293         try {
294
295             try {
296
297                 // Make the filename file system safe.
298
String JavaDoc dirverName = c.getConnection().getMetaData().getDriverName();
299                 dirverName = dirverName.replaceAll("[^a-zA-Z0-9\\-]", "_").toLowerCase();
300
301                 try {
302                     adapter = (DefaultJDBCAdapter) factoryFinder.newInstance(dirverName);
303                     log.info("Database driver recognized: [" + dirverName + "]");
304                 } catch (Throwable JavaDoc e) {
305                     log.warn("Database driver NOT recognized: [" + dirverName
306                             + "]. Will use default JDBC implementation.");
307                 }
308
309             } catch (SQLException JavaDoc e) {
310                 log.warn("JDBC error occurred while trying to detect database type. Will use default JDBC implementation: "
311                                 + e.getMessage());
312                 JDBCPersistenceAdapter.log("Failure Details: ",e);
313             }
314
315             // Use the default JDBC adapter if the
316
// Database type is not recognized.
317
if (adapter == null) {
318                 adapter = new DefaultJDBCAdapter();
319             }
320
321         } finally {
322             c.close();
323         }
324         return adapter;
325     }
326
327     public void setAdapter(JDBCAdapter adapter) {
328         this.adapter = adapter;
329         this.adapter.setStatements(getStatements());
330     }
331
332     public WireFormat getWireFormat() {
333         return wireFormat;
334     }
335
336     public void setWireFormat(WireFormat wireFormat) {
337         this.wireFormat = wireFormat;
338     }
339
340     public TransactionContext getTransactionContext(ConnectionContext context) throws IOException JavaDoc {
341         if (context == null) {
342             return getTransactionContext();
343         } else {
344             TransactionContext answer = (TransactionContext) context.getLongTermStoreContext();
345             if (answer == null) {
346                 answer = new TransactionContext(getDataSource());
347                 context.setLongTermStoreContext(answer);
348             }
349             return answer;
350         }
351     }
352
353     public TransactionContext getTransactionContext() throws IOException JavaDoc {
354         return new TransactionContext(getDataSource());
355     }
356
357     public void beginTransaction(ConnectionContext context) throws IOException JavaDoc {
358         TransactionContext transactionContext = getTransactionContext(context);
359         transactionContext.begin();
360     }
361
362     public void commitTransaction(ConnectionContext context) throws IOException JavaDoc {
363         TransactionContext transactionContext = getTransactionContext(context);
364         transactionContext.commit();
365     }
366
367     public void rollbackTransaction(ConnectionContext context) throws IOException JavaDoc {
368         TransactionContext transactionContext = getTransactionContext(context);
369         transactionContext.rollback();
370     }
371
372     public int getCleanupPeriod() {
373         return cleanupPeriod;
374     }
375
376     /**
377      * Sets the number of milliseconds until the database is attempted to be cleaned up for durable topics
378      */

379     public void setCleanupPeriod(int cleanupPeriod) {
380         this.cleanupPeriod = cleanupPeriod;
381     }
382
383     public void deleteAllMessages() throws IOException JavaDoc {
384         TransactionContext c = getTransactionContext();
385         try {
386             getAdapter().doDropTables(c);
387             getAdapter().setUseExternalMessageReferences(isUseExternalMessageReferences());
388             getAdapter().doCreateTables(c);
389         } catch (SQLException JavaDoc e) {
390             JDBCPersistenceAdapter.log("JDBC Failure: ",e);
391             throw IOExceptionSupport.create(e);
392         } finally {
393             c.close();
394         }
395     }
396
397     public boolean isUseExternalMessageReferences() {
398         return useExternalMessageReferences;
399     }
400
401     public void setUseExternalMessageReferences(boolean useExternalMessageReferences) {
402         this.useExternalMessageReferences = useExternalMessageReferences;
403     }
404     
405     public boolean isCreateTablesOnStartup() {
406         return createTablesOnStartup;
407     }
408
409     /**
410      * Sets whether or not tables are created on startup
411      */

412     public void setCreateTablesOnStartup(boolean createTablesOnStartup) {
413         this.createTablesOnStartup = createTablesOnStartup;
414     }
415
416     public boolean isUseDatabaseLock() {
417         return useDatabaseLock;
418     }
419
420     /**
421      * Sets whether or not an exclusive database lock should be used to enable JDBC Master/Slave. Enabled by default.
422      */

423     public void setUseDatabaseLock(boolean useDatabaseLock) {
424         this.useDatabaseLock = useDatabaseLock;
425     }
426
427     static public void log(String JavaDoc msg, SQLException JavaDoc e) {
428         String JavaDoc s = msg+e.getMessage();
429         while( e.getNextException() != null ) {
430             e = e.getNextException();
431             s += ", due to: "+e.getMessage();
432         }
433         log.debug(s, e);
434     }
435
436     public Statements getStatements() {
437         if( statements == null ) {
438             statements = new Statements();
439         }
440         return statements;
441     }
442
443     public void setStatements(Statements statements) {
444         this.statements = statements;
445     }
446
447     /**
448      * @param usageManager The UsageManager that is controlling the destination's memory usage.
449      */

450     public void setUsageManager(UsageManager usageManager) {
451     }
452
453
454     protected void databaseLockKeepAlive() {
455         boolean stop = false;
456         try {
457             DatabaseLocker locker = getDatabaseLocker();
458             if (locker != null) {
459                 if (!locker.keepAlive()) {
460                     stop = true;
461                 }
462             }
463         }
464         catch (IOException JavaDoc e) {
465             log.error("Failed to get database when trying keepalive: " + e, e);
466         }
467         if (stop) {
468             stopBroker();
469         }
470     }
471
472     protected void stopBroker() {
473         // we can no longer keep the lock so lets fail
474
log.info("No longer able to keep the exclusive lock so giving up being a master");
475         try {
476             brokerService.stop();
477         }
478         catch (Exception JavaDoc e) {
479             log.warn("Failed to stop broker");
480         }
481     }
482
483     protected DatabaseLocker createDatabaseLocker() throws IOException JavaDoc {
484         return new DefaultDatabaseLocker(getDataSource(), getStatements());
485     }
486     
487     public void setBrokerName(String JavaDoc brokerName){
488     }
489     
490     public String JavaDoc toString(){
491         return "JDBCPersistenceAdaptor("+super.toString()+")";
492     }
493
494     public void setDirectory(File JavaDoc dir){
495     }
496
497     public void checkpoint(boolean sync) throws IOException JavaDoc{
498     }
499 }
500
Popular Tags