KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > net > sf > hajdbc > local > LocalDatabaseCluster


1 /*
2  * HA-JDBC: High-Availability JDBC
3  * Copyright (c) 2004-2006 Paul Ferraro
4  *
5  * This library is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU Lesser General Public License as published by the
7  * Free Software Foundation; either version 2.1 of the License, or (at your
8  * option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful, but WITHOUT
11  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12  * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
13  * for more details.
14  *
15  * You should have received a copy of the GNU Lesser General Public License
16  * along with this library; if not, write to the Free Software Foundation,
17  * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18  *
19  * Contact: ferraro@users.sourceforge.net
20  */

21 package net.sf.hajdbc.local;
22
23 import java.sql.Connection JavaDoc;
24 import java.sql.DatabaseMetaData JavaDoc;
25 import java.sql.Driver JavaDoc;
26 import java.sql.ResultSet JavaDoc;
27 import java.sql.Statement JavaDoc;
28 import java.util.ArrayList JavaDoc;
29 import java.util.Collection JavaDoc;
30 import java.util.HashMap JavaDoc;
31 import java.util.Iterator JavaDoc;
32 import java.util.LinkedList JavaDoc;
33 import java.util.List JavaDoc;
34 import java.util.Map JavaDoc;
35 import java.util.Set JavaDoc;
36 import java.util.TreeSet JavaDoc;
37 import java.util.concurrent.ConcurrentHashMap JavaDoc;
38 import java.util.concurrent.ExecutorService JavaDoc;
39 import java.util.concurrent.SynchronousQueue JavaDoc;
40 import java.util.concurrent.ThreadPoolExecutor JavaDoc;
41 import java.util.concurrent.TimeUnit JavaDoc;
42 import java.util.concurrent.locks.Lock JavaDoc;
43 import java.util.concurrent.locks.ReadWriteLock JavaDoc;
44 import java.util.concurrent.locks.ReentrantReadWriteLock JavaDoc;
45 import java.util.prefs.BackingStoreException JavaDoc;
46 import java.util.prefs.Preferences JavaDoc;
47
48 import javax.management.JMException JavaDoc;
49 import javax.management.MBeanServer JavaDoc;
50 import javax.management.ObjectName JavaDoc;
51 import javax.management.StandardMBean JavaDoc;
52 import javax.sql.DataSource JavaDoc;
53
54 import net.sf.hajdbc.Balancer;
55 import net.sf.hajdbc.Database;
56 import net.sf.hajdbc.DatabaseCluster;
57 import net.sf.hajdbc.DatabaseClusterFactory;
58 import net.sf.hajdbc.DatabaseClusterMBean;
59 import net.sf.hajdbc.Dialect;
60 import net.sf.hajdbc.Messages;
61 import net.sf.hajdbc.SQLException;
62 import net.sf.hajdbc.SynchronizationStrategy;
63 import net.sf.hajdbc.SynchronizationStrategyBuilder;
64 import net.sf.hajdbc.sql.DataSourceDatabase;
65 import net.sf.hajdbc.sql.DriverDatabase;
66 import net.sf.hajdbc.util.concurrent.CronThreadPoolExecutor;
67 import net.sf.hajdbc.util.concurrent.SynchronousExecutor;
68
69 import org.slf4j.Logger;
70 import org.slf4j.LoggerFactory;
71
72 /**
73  * @author Paul Ferraro
74  * @version $Revision: 1364 $
75  * @since 1.0
76  */

77 public class LocalDatabaseCluster implements DatabaseCluster
78 {
79     private static final String JavaDoc STATE_DELIMITER = ",";
80     
81     private static Preferences JavaDoc preferences = Preferences.userNodeForPackage(LocalDatabaseCluster.class);
82     static Logger logger = LoggerFactory.getLogger(LocalDatabaseCluster.class);
83     
84     private String JavaDoc id;
85     private Balancer balancer;
86     private Dialect dialect;
87     private String JavaDoc defaultSynchronizationStrategyId;
88     private String JavaDoc failureDetectionSchedule;
89     private String JavaDoc autoActivationSchedule;
90     private int minThreads;
91     private int maxThreads;
92     private int maxIdle;
93     private Transaction transaction;
94     
95     private Map JavaDoc<String JavaDoc, Database> databaseMap = new ConcurrentHashMap JavaDoc<String JavaDoc, Database>();
96     private Map JavaDoc<Database, Object JavaDoc> connectionFactoryMap = new ConcurrentHashMap JavaDoc<Database, Object JavaDoc>();
97     private ExecutorService JavaDoc transactionalExecutor;
98     private ExecutorService JavaDoc nonTransactionalExecutor;
99     private CronThreadPoolExecutor cronExecutor = new CronThreadPoolExecutor(2);
100     private ReadWriteLock JavaDoc lock = createReadWriteLock();
101     
102     /**
103      * Work around for missing constructor in backport-util-concurrent package.
104      * @return ReadWriteLock implementation
105      */

106     private static ReadWriteLock JavaDoc createReadWriteLock()
107     {
108         try
109         {
110             return new ReentrantReadWriteLock JavaDoc(true);
111         }
112         catch (NoSuchMethodError JavaDoc e)
113         {
114             return new ReentrantReadWriteLock JavaDoc();
115         }
116     }
117     
118     /**
119      * @see net.sf.hajdbc.DatabaseCluster#loadState()
120      */

121     public String JavaDoc[] loadState() throws java.sql.SQLException JavaDoc
122     {
123         try
124         {
125             preferences.sync();
126             
127             String JavaDoc state = preferences.get(this.id, null);
128             
129             if (state == null)
130             {
131                 return null;
132             }
133             
134             if (state.length() == 0)
135             {
136                 return new String JavaDoc[0];
137             }
138             
139             String JavaDoc[] databases = state.split(STATE_DELIMITER);
140             
141             // Validate persisted cluster state
142
for (String JavaDoc id: databases)
143             {
144                 if (!this.databaseMap.containsKey(id))
145                 {
146                     // Persisted cluster state is invalid!
147
preferences.remove(this.id);
148                     preferences.flush();
149                     
150                     return null;
151                 }
152             }
153             
154             return databases;
155         }
156         catch (BackingStoreException JavaDoc e)
157         {
158             throw new SQLException(Messages.getMessage(Messages.CLUSTER_STATE_LOAD_FAILED, this), e);
159         }
160     }
161
162     /**
163      * @see net.sf.hajdbc.DatabaseCluster#getConnectionFactoryMap()
164      */

165     public Map JavaDoc<Database, ?> getConnectionFactoryMap()
166     {
167         return this.connectionFactoryMap;
168     }
169     
170     /**
171      * @see net.sf.hajdbc.DatabaseCluster#isAlive(net.sf.hajdbc.Database)
172      */

173     public boolean isAlive(Database database)
174     {
175         Connection JavaDoc connection = null;
176         
177         try
178         {
179             connection = database.connect(this.connectionFactoryMap.get(database));
180             
181             Statement JavaDoc statement = connection.createStatement();
182             
183             statement.execute(this.dialect.getSimpleSQL());
184
185             statement.close();
186             
187             return true;
188         }
189         catch (java.sql.SQLException JavaDoc e)
190         {
191             return false;
192         }
193         finally
194         {
195             if (connection != null)
196             {
197                 try
198                 {
199                     connection.close();
200                 }
201                 catch (java.sql.SQLException JavaDoc e)
202                 {
203                     logger.warn(e.toString(), e);
204                 }
205             }
206         }
207     }
208     
209     /**
210      * @see net.sf.hajdbc.DatabaseCluster#deactivate(net.sf.hajdbc.Database)
211      */

212     public synchronized boolean deactivate(Database database)
213     {
214         MBeanServer JavaDoc server = DatabaseClusterFactory.getMBeanServer();
215
216         try
217         {
218             ObjectName JavaDoc name = DatabaseClusterFactory.getObjectName(this.id, database.getId());
219             
220             // Reregister database mbean using "inactive" interface
221
if (server.isRegistered(name))
222             {
223                 server.unregisterMBean(name);
224             }
225             
226             server.registerMBean(new StandardMBean JavaDoc(database, database.getInactiveMBeanClass()), name);
227         }
228         catch (JMException JavaDoc e)
229         {
230             throw new IllegalStateException JavaDoc(e.toString(), e);
231         }
232         
233         boolean removed = this.balancer.remove(database);
234         
235         if (removed)
236         {
237             this.storeState();
238         }
239         
240         return removed;
241     }
242
243     /**
244      * @see net.sf.hajdbc.DatabaseClusterMBean#getId()
245      */

246     public String JavaDoc getId()
247     {
248         return this.id;
249     }
250     
251     /**
252      * @see net.sf.hajdbc.DatabaseCluster#activate(net.sf.hajdbc.Database)
253      */

254     public synchronized boolean activate(Database database)
255     {
256         MBeanServer JavaDoc server = DatabaseClusterFactory.getMBeanServer();
257
258         try
259         {
260             ObjectName JavaDoc name = DatabaseClusterFactory.getObjectName(this.id, database.getId());
261             
262             // Reregister database mbean using "active" interface
263
if (server.isRegistered(name))
264             {
265                 server.unregisterMBean(name);
266             }
267             
268             server.registerMBean(new StandardMBean JavaDoc(database, database.getActiveMBeanClass()), name);
269             
270             if (database.isDirty())
271             {
272                 DatabaseClusterFactory.getInstance().exportConfiguration();
273                 
274                 database.clean();
275             }
276             
277             boolean added = this.balancer.add(database);
278             
279             if (added)
280             {
281                 this.storeState();
282             }
283             
284             return added;
285         }
286         catch (JMException JavaDoc e)
287         {
288             throw new IllegalStateException JavaDoc(e);
289         }
290     }
291     
292     private void storeState()
293     {
294         StringBuilder JavaDoc builder = new StringBuilder JavaDoc();
295         
296         Iterator JavaDoc<Database> databases = this.balancer.list().iterator();
297         
298         while (databases.hasNext())
299         {
300             builder.append(databases.next().getId());
301             
302             if (databases.hasNext())
303             {
304                 builder.append(STATE_DELIMITER);
305             }
306         }
307         
308         preferences.put(this.id, builder.toString());
309         
310         try
311         {
312             preferences.flush();
313         }
314         catch (BackingStoreException JavaDoc e)
315         {
316             logger.warn(Messages.getMessage(Messages.CLUSTER_STATE_STORE_FAILED, this), e);
317         }
318     }
319     
320     /**
321      * @see net.sf.hajdbc.DatabaseClusterMBean#getActiveDatabases()
322      */

323     public Collection JavaDoc<String JavaDoc> getActiveDatabases()
324     {
325         return this.extractIdentifiers(this.balancer.list());
326     }
327
328     /**
329      * @see net.sf.hajdbc.DatabaseClusterMBean#getInactiveDatabases()
330      */

331     public Collection JavaDoc<String JavaDoc> getInactiveDatabases()
332     {
333         return this.extractIdentifiers(this.getInactiveDatabaseSet());
334     }
335     
336     protected Set JavaDoc<Database> getInactiveDatabaseSet()
337     {
338         Set JavaDoc<Database> databaseSet = new TreeSet JavaDoc<Database>(this.databaseMap.values());
339
340         databaseSet.removeAll(this.balancer.list());
341         
342         return databaseSet;
343     }
344     
345     private List JavaDoc<String JavaDoc> extractIdentifiers(Collection JavaDoc<Database> databases)
346     {
347         List JavaDoc<String JavaDoc> databaseList = new ArrayList JavaDoc<String JavaDoc>(databases.size());
348         
349         for (Database database: databases)
350         {
351             databaseList.add(database.getId());
352         }
353         
354         return databaseList;
355     }
356     
357     /**
358      * @see net.sf.hajdbc.DatabaseCluster#getDatabase(java.lang.String)
359      */

360     public Database getDatabase(String JavaDoc id)
361     {
362         Database database = this.databaseMap.get(id);
363         
364         if (database == null)
365         {
366             throw new IllegalArgumentException JavaDoc(Messages.getMessage(Messages.INVALID_DATABASE, id, this));
367         }
368         
369         return database;
370     }
371
372     /**
373      * @see net.sf.hajdbc.DatabaseCluster#getDefaultSynchronizationStrategy()
374      */

375     public SynchronizationStrategy getDefaultSynchronizationStrategy()
376     {
377         return DatabaseClusterFactory.getInstance().getSynchronizationStrategy(this.defaultSynchronizationStrategyId);
378     }
379     
380     /**
381      * @see net.sf.hajdbc.DatabaseCluster#getBalancer()
382      */

383     public Balancer getBalancer()
384     {
385         return this.balancer;
386     }
387
388     /**
389      * @see net.sf.hajdbc.DatabaseCluster#getTransactionalExecutor()
390      */

391     public ExecutorService JavaDoc getTransactionalExecutor()
392     {
393         return this.transactionalExecutor;
394     }
395
396     /**
397      * @see net.sf.hajdbc.DatabaseCluster#getNonTransactionalExecutor()
398      */

399     public ExecutorService JavaDoc getNonTransactionalExecutor()
400     {
401         return this.nonTransactionalExecutor;
402     }
403     
404     /**
405      * @see net.sf.hajdbc.DatabaseCluster#getDialect()
406      */

407     public Dialect getDialect()
408     {
409         return this.dialect;
410     }
411
412     /**
413      * @see net.sf.hajdbc.DatabaseCluster#readLock()
414      */

415     public Lock JavaDoc readLock()
416     {
417         return this.lock.readLock();
418     }
419     
420     /**
421      * @see net.sf.hajdbc.DatabaseCluster#writeLock()
422      */

423     public Lock JavaDoc writeLock()
424     {
425         return this.lock.writeLock();
426     }
427     
428     /**
429      * @see net.sf.hajdbc.DatabaseClusterMBean#isAlive(java.lang.String)
430      */

431     public final boolean isAlive(String JavaDoc id)
432     {
433         return this.isAlive(this.getDatabase(id));
434     }
435
436     /**
437      * @see net.sf.hajdbc.DatabaseClusterMBean#deactivate(java.lang.String)
438      */

439     public final void deactivate(String JavaDoc databaseId)
440     {
441         if (this.deactivate(this.getDatabase(databaseId)))
442         {
443             logger.info(Messages.getMessage(Messages.DATABASE_DEACTIVATED, databaseId, this));
444         }
445     }
446
447     /**
448      * @see net.sf.hajdbc.DatabaseClusterMBean#activate(java.lang.String)
449      */

450     public final void activate(String JavaDoc databaseId)
451     {
452         this.activate(databaseId, this.getDefaultSynchronizationStrategy());
453     }
454     
455     /**
456      * @see net.sf.hajdbc.DatabaseClusterMBean#activate(java.lang.String, java.lang.String)
457      */

458     public final void activate(String JavaDoc databaseId, String JavaDoc strategyId)
459     {
460         this.activate(databaseId, DatabaseClusterFactory.getInstance().getSynchronizationStrategy(strategyId));
461     }
462     
463     /**
464      * @see net.sf.hajdbc.DatabaseClusterMBean#getVersion()
465      */

466     public String JavaDoc getVersion()
467     {
468         return DatabaseClusterFactory.getVersion();
469     }
470
471     /**
472      * Handles a failure caused by the specified cause on the specified database.
473      * If the database is not alive, then it is deactivated, otherwise an exception is thrown back to the caller.
474      * @param database a database descriptor
475      * @param cause the cause of the failure
476      * @throws java.sql.SQLException if the database is alive
477      */

478     public final void handleFailure(Database database, java.sql.SQLException JavaDoc cause) throws java.sql.SQLException JavaDoc
479     {
480         if (this.isAlive(database))
481         {
482             throw cause;
483         }
484         
485         if (this.deactivate(database))
486         {
487             logger.warn(Messages.getMessage(Messages.DATABASE_NOT_ALIVE, database, this), cause);
488         }
489     }
490     
491     /**
492      * @see net.sf.hajdbc.DatabaseClusterMBean#add(java.lang.String, java.lang.String, java.lang.String)
493      */

494     public void add(String JavaDoc id, String JavaDoc driver, String JavaDoc url)
495     {
496         DriverDatabase database = new DriverDatabase();
497         
498         database.setId(id);
499         database.setDriver(driver);
500         database.setUrl(url);
501         
502         this.register(database);
503         
504         this.add(database);
505     }
506     
507     /**
508      * @see net.sf.hajdbc.DatabaseClusterMBean#add(java.lang.String, java.lang.String)
509      */

510     public void add(String JavaDoc id, String JavaDoc name)
511     {
512         DataSourceDatabase database = new DataSourceDatabase();
513         
514         database.setId(id);
515         database.setName(name);
516         
517         this.register(database);
518         
519         this.add(database);
520     }
521     
522     private void register(Database database)
523     {
524         MBeanServer JavaDoc server = DatabaseClusterFactory.getMBeanServer();
525
526         try
527         {
528             ObjectName JavaDoc name = DatabaseClusterFactory.getObjectName(this.id, database.getId());
529             
530             server.registerMBean(new StandardMBean JavaDoc(database, database.getInactiveMBeanClass()), name);
531         }
532         catch (JMException JavaDoc e)
533         {
534             logger.error(e.toString(), e);
535             
536             throw new IllegalStateException JavaDoc(e);
537         }
538     }
539     
540     /**
541      * @see net.sf.hajdbc.DatabaseClusterMBean#remove(java.lang.String)
542      */

543     public synchronized void remove(String JavaDoc id)
544     {
545         Database database = this.getDatabase(id);
546         
547         if (this.balancer.contains(database))
548         {
549             throw new IllegalStateException JavaDoc(Messages.getMessage(Messages.DATABASE_STILL_ACTIVE, id, this));
550         }
551         
552         this.unregister(database);
553         
554         this.databaseMap.remove(id);
555         this.connectionFactoryMap.remove(database);
556         
557         DatabaseClusterFactory.getInstance().exportConfiguration();
558     }
559
560     private void unregister(Database database)
561     {
562         MBeanServer JavaDoc server = DatabaseClusterFactory.getMBeanServer();
563
564         try
565         {
566             ObjectName JavaDoc name = DatabaseClusterFactory.getObjectName(this.id, database.getId());
567             
568             server.unregisterMBean(name);
569         }
570         catch (JMException JavaDoc e)
571         {
572             logger.error(e.toString(), e);
573             
574             throw new IllegalStateException JavaDoc(e);
575         }
576     }
577     
578     /**
579      * @see net.sf.hajdbc.DatabaseCluster#start()
580      */

581     public void start() throws java.sql.SQLException JavaDoc
582     {
583         MBeanServer JavaDoc server = DatabaseClusterFactory.getMBeanServer();
584
585         try
586         {
587             server.registerMBean(new StandardMBean JavaDoc(this, DatabaseClusterMBean.class), DatabaseClusterFactory.getObjectName(this.id));
588             
589             for (Database database: this.databaseMap.values())
590             {
591                 ObjectName JavaDoc name = DatabaseClusterFactory.getObjectName(this.id, database.getId());
592                 
593                 server.registerMBean(new StandardMBean JavaDoc(database, database.getInactiveMBeanClass()), name);
594             }
595         }
596         catch (JMException JavaDoc e)
597         {
598             throw new SQLException(e);
599         }
600         
601         String JavaDoc[] databases = this.loadState();
602         
603         if (databases != null)
604         {
605             for (String JavaDoc id: databases)
606             {
607                 this.activate(this.getDatabase(id));
608             }
609         }
610         else
611         {
612             for (String JavaDoc id: this.getInactiveDatabases())
613             {
614                 Database database = this.getDatabase(id);
615                 
616                 if (this.isAlive(database))
617                 {
618                     this.activate(database);
619                 }
620             }
621         }
622         
623         this.nonTransactionalExecutor = new ThreadPoolExecutor JavaDoc(this.minThreads, this.maxThreads, this.maxIdle, TimeUnit.SECONDS, new SynchronousQueue JavaDoc<Runnable JavaDoc>(), new ThreadPoolExecutor.CallerRunsPolicy JavaDoc());
624         
625         this.transactionalExecutor = this.transaction.equals(Transaction.XA) ? new SynchronousExecutor() : this.nonTransactionalExecutor;
626         
627         if (this.failureDetectionSchedule != null)
628         {
629             this.cronExecutor.schedule(new FailureDetectionTask(), this.failureDetectionSchedule);
630         }
631         
632         if (this.autoActivationSchedule != null)
633         {
634             this.cronExecutor.schedule(new AutoActivationTask(), this.autoActivationSchedule);
635         }
636     }
637     
638     /**
639      * @see net.sf.hajdbc.DatabaseCluster#stop()
640      */

641     public synchronized void stop()
642     {
643         MBeanServer JavaDoc server = DatabaseClusterFactory.getMBeanServer();
644
645         for (String JavaDoc databaseId: this.databaseMap.keySet())
646         {
647             try
648             {
649                 ObjectName JavaDoc name = DatabaseClusterFactory.getObjectName(this.id, databaseId);
650                 
651                 if (server.isRegistered(name))
652                 {
653                     server.unregisterMBean(name);
654                 }
655             }
656             catch (JMException JavaDoc e)
657             {
658                 logger.warn(e.getMessage(), e);
659             }
660         }
661         
662         try
663         {
664             ObjectName JavaDoc name = DatabaseClusterFactory.getObjectName(this.id);
665             
666             if (server.isRegistered(name))
667             {
668                 server.unregisterMBean(name);
669             }
670         }
671         catch (JMException JavaDoc e)
672         {
673             logger.warn(e.getMessage(), e);
674         }
675         
676         this.cronExecutor.shutdownNow();
677         
678         if (this.nonTransactionalExecutor != null)
679         {
680             this.nonTransactionalExecutor.shutdownNow();
681         }
682         
683         if (this.transactionalExecutor != null)
684         {
685             this.transactionalExecutor.shutdownNow();
686         }
687     }
688     
689     /**
690      * @see java.lang.Object#toString()
691      */

692     @Override JavaDoc
693     public final String JavaDoc toString()
694     {
695         return this.getId();
696     }
697     
698     /**
699      * @see java.lang.Object#equals(java.lang.Object)
700      */

701     @Override JavaDoc
702     public final boolean equals(Object JavaDoc object)
703     {
704         DatabaseCluster databaseCluster = (DatabaseCluster) object;
705         
706         return this.getId().equals(databaseCluster.getId());
707     }
708     
709     SynchronizationStrategyBuilder getDefaultSynchronizationStrategyBuilder()
710     {
711         return new SynchronizationStrategyBuilder(this.defaultSynchronizationStrategyId);
712     }
713     
714     void setDefaultSynchronizationStrategyBuilder(SynchronizationStrategyBuilder builder)
715     {
716         this.defaultSynchronizationStrategyId = builder.getId();
717     }
718     
719     synchronized void add(Database database)
720     {
721         String JavaDoc id = database.getId();
722         
723         if (this.databaseMap.containsKey(id))
724         {
725             throw new IllegalArgumentException JavaDoc(Messages.getMessage(Messages.DATABASE_ALREADY_EXISTS, id, this));
726         }
727         
728         this.connectionFactoryMap.put(database, database.createConnectionFactory());
729         this.databaseMap.put(id, database);
730     }
731     
732     Iterator JavaDoc<Database> getDriverDatabases()
733     {
734         return this.getDatabases(Driver JavaDoc.class);
735     }
736     
737     Iterator JavaDoc<Database> getDataSourceDatabases()
738     {
739         return this.getDatabases(DataSource JavaDoc.class);
740     }
741     
742     Iterator JavaDoc<Database> getDatabases(Class JavaDoc targetClass)
743     {
744         List JavaDoc<Database> databaseList = new ArrayList JavaDoc<Database>(this.databaseMap.size());
745         
746         for (Database database: this.databaseMap.values())
747         {
748             if (targetClass.equals(database.getConnectionFactoryClass()))
749             {
750                 databaseList.add(database);
751             }
752         }
753         
754         return databaseList.iterator();
755     }
756     
757     private void activate(String JavaDoc databaseId, SynchronizationStrategy strategy)
758     {
759         try
760         {
761             if (this.activate(this.getDatabase(databaseId), strategy))
762             {
763                 logger.info(Messages.getMessage(Messages.DATABASE_ACTIVATED, databaseId, this));
764             }
765         }
766         catch (java.sql.SQLException JavaDoc e)
767         {
768             logger.error(Messages.getMessage(Messages.DATABASE_ACTIVATE_FAILED, databaseId, this), e);
769             
770             java.sql.SQLException JavaDoc exception = e.getNextException();
771             
772             while (exception != null)
773             {
774                 logger.error(exception.getMessage(), e);
775                 
776                 exception = exception.getNextException();
777             }
778
779             throw new IllegalStateException JavaDoc(e.toString());
780         }
781         catch (InterruptedException JavaDoc e)
782         {
783             logger.warn(e.toString(), e);
784             throw new IllegalMonitorStateException JavaDoc(e.toString());
785         }
786     }
787     
788     boolean activate(Database database, SynchronizationStrategy strategy) throws java.sql.SQLException JavaDoc, InterruptedException JavaDoc
789     {
790         if (this.getBalancer().contains(database))
791         {
792             return false;
793         }
794         
795         if (!this.isAlive(database))
796         {
797             return false;
798         }
799         
800         Lock JavaDoc lock = this.writeLock();
801         
802         lock.lockInterruptibly();
803         
804         try
805         {
806             List JavaDoc<Database> databaseList = this.getBalancer().list();
807             
808             if (databaseList.isEmpty())
809             {
810                 return this.activate(database);
811             }
812             
813             this.activate(database, databaseList, strategy);
814             
815             return true;
816         }
817         finally
818         {
819             lock.unlock();
820         }
821     }
822     
823     private void activate(Database inactiveDatabase, List JavaDoc<Database> activeDatabaseList, SynchronizationStrategy strategy) throws java.sql.SQLException JavaDoc
824     {
825         Database activeDatabase = this.getBalancer().next();
826         
827         Connection JavaDoc inactiveConnection = null;
828         Connection JavaDoc activeConnection = null;
829
830         List JavaDoc<Connection JavaDoc> connectionList = new ArrayList JavaDoc<Connection JavaDoc>(activeDatabaseList.size());
831         
832         try
833         {
834             Map JavaDoc<Database, ?> connectionFactoryMap = this.getConnectionFactoryMap();
835             
836             inactiveConnection = inactiveDatabase.connect(connectionFactoryMap.get(inactiveDatabase));
837             
838             Map JavaDoc<String JavaDoc, List JavaDoc<String JavaDoc>> schemaMap = new HashMap JavaDoc<String JavaDoc, List JavaDoc<String JavaDoc>>();
839             
840             DatabaseMetaData JavaDoc metaData = inactiveConnection.getMetaData();
841
842             ResultSet JavaDoc resultSet = metaData.getTables(null, null, "%", new String JavaDoc[] { "TABLE" });
843             
844             while (resultSet.next())
845             {
846                 String JavaDoc table = resultSet.getString("TABLE_NAME");
847                 String JavaDoc schema = resultSet.getString("TABLE_SCHEM");
848
849                 List JavaDoc<String JavaDoc> tableList = schemaMap.get(schema);
850                 
851                 if (tableList == null)
852                 {
853                     tableList = new LinkedList JavaDoc<String JavaDoc>();
854                     
855                     schemaMap.put(schema, tableList);
856                 }
857                 
858                 tableList.add(table);
859             }
860             
861             resultSet.close();
862
863             activeConnection = activeDatabase.connect(connectionFactoryMap.get(activeDatabase));
864
865             Dialect dialect = this.getDialect();
866             
867             if (strategy.requiresTableLocking())
868             {
869                 logger.info(Messages.getMessage(Messages.TABLE_LOCK_ACQUIRE));
870                 
871                 Map JavaDoc<String JavaDoc, Map JavaDoc<String JavaDoc, String JavaDoc>> lockTableSQLMap = new HashMap JavaDoc<String JavaDoc, Map JavaDoc<String JavaDoc, String JavaDoc>>();
872                 
873                 // Lock all tables on all active databases
874
for (Database database: activeDatabaseList)
875                 {
876                     Connection JavaDoc connection = database.equals(activeDatabase) ? activeConnection : database.connect(connectionFactoryMap.get(database));
877                     
878                     connectionList.add(connection);
879                     
880                     connection.setAutoCommit(false);
881                     connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
882                     
883                     Statement JavaDoc statement = connection.createStatement();
884                     
885                     for (Map.Entry JavaDoc<String JavaDoc, List JavaDoc<String JavaDoc>> schemaMapEntry: schemaMap.entrySet())
886                     {
887                         String JavaDoc schema = schemaMapEntry.getKey();
888                         
889                         Map JavaDoc<String JavaDoc, String JavaDoc> map = lockTableSQLMap.get(schema);
890                         
891                         if (map == null)
892                         {
893                             map = new HashMap JavaDoc<String JavaDoc, String JavaDoc>();
894                             
895                             lockTableSQLMap.put(schema, map);
896                         }
897                         
898                         for (String JavaDoc table: schemaMapEntry.getValue())
899                         {
900                             String JavaDoc sql = map.get(table);
901                             
902                             if (sql == null)
903                             {
904                                 sql = dialect.getLockTableSQL(metaData, schema, table);
905                                 
906                                 logger.debug(sql);
907                                 
908                                 map.put(table, sql);
909                             }
910                             
911                             statement.execute(sql);
912                         }
913                     }
914                     
915                     statement.close();
916                 }
917             }
918             
919             logger.info(Messages.getMessage(Messages.DATABASE_SYNC_START, inactiveDatabase, this));
920
921             strategy.synchronize(inactiveConnection, activeConnection, schemaMap, dialect);
922             
923             logger.info(Messages.getMessage(Messages.DATABASE_SYNC_END, inactiveDatabase, this));
924     
925             this.activate(inactiveDatabase);
926             
927             if (strategy.requiresTableLocking())
928             {
929                 logger.info(Messages.getMessage(Messages.TABLE_LOCK_ACQUIRE));
930                 
931                 // Release table locks
932
this.rollback(connectionList);
933             }
934         }
935         catch (java.sql.SQLException JavaDoc e)
936         {
937             this.rollback(connectionList);
938             
939             throw e;
940         }
941         finally
942         {
943             this.close(activeConnection);
944             this.close(inactiveConnection);
945             
946             for (Connection JavaDoc connection: connectionList)
947             {
948                 this.close(connection);
949             }
950         }
951     }
952     
953     private void rollback(List JavaDoc<Connection JavaDoc> connectionList)
954     {
955         for (Connection JavaDoc connection: connectionList)
956         {
957             try
958             {
959                 connection.rollback();
960                 connection.setAutoCommit(true);
961             }
962             catch (java.sql.SQLException JavaDoc e)
963             {
964                 logger.warn(e.toString(), e);
965             }
966         }
967     }
968     
969     private void close(Connection JavaDoc connection)
970     {
971         if (connection != null)
972         {
973             try
974             {
975                 if (!connection.isClosed())
976                 {
977                     connection.close();
978                 }
979             }
980             catch (java.sql.SQLException JavaDoc e)
981             {
982                 logger.warn(e.toString(), e);
983             }
984         }
985     }
986     
987     private class FailureDetectionTask implements Runnable JavaDoc
988     {
989         /**
990          * @see java.lang.Runnable#run()
991          */

992         public void run()
993         {
994             for (Database database: LocalDatabaseCluster.this.getBalancer().list())
995             {
996                 if (!LocalDatabaseCluster.this.isAlive(database))
997                 {
998                     if (LocalDatabaseCluster.this.deactivate(database))
999                     {
1000                        logger.warn(Messages.getMessage(Messages.DATABASE_NOT_ALIVE, database, LocalDatabaseCluster.this));
1001                    }
1002                }
1003            }
1004        }
1005    }
1006    
1007    private class AutoActivationTask implements Runnable JavaDoc
1008    {
1009        /**
1010         * @see java.lang.Runnable#run()
1011         */

1012        public void run()
1013        {
1014            for (Database database: LocalDatabaseCluster.this.getInactiveDatabaseSet())
1015            {
1016                try
1017                {
1018                    if (LocalDatabaseCluster.this.activate(database, LocalDatabaseCluster.this.getDefaultSynchronizationStrategy()))
1019                    {
1020                        logger.info(Messages.getMessage(Messages.DATABASE_ACTIVATED, database, LocalDatabaseCluster.this));
1021                    }
1022                }
1023                catch (java.sql.SQLException JavaDoc e)
1024                {
1025                    logger.warn(Messages.getMessage(Messages.DATABASE_ACTIVATE_FAILED, database, LocalDatabaseCluster.this), e);
1026
1027                    java.sql.SQLException JavaDoc exception = e.getNextException();
1028                    
1029                    while (exception != null)
1030                    {
1031                        logger.warn(exception.getMessage(), e);
1032                        
1033                        exception = exception.getNextException();
1034                    }
1035                }
1036                catch (InterruptedException JavaDoc e)
1037                {
1038                    break;
1039                }
1040            }
1041        }
1042    }
1043}
1044
Popular Tags