KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > continuent > sequoia > controller > loadbalancer > raidb1 > RAIDb1


1 /**
2  * Sequoia: Database clustering technology.
3  * Copyright (C) 2002-2004 French National Institute For Research In Computer
4  * Science And Control (INRIA).
5  * Copyright (C) 2005 AmicoSoft, Inc. dba Emic Networks
6  * Copyright (C) 2005-2006 Continuent, Inc.
7  * Contact: sequoia@continuent.org
8  *
9  * Licensed under the Apache License, Version 2.0 (the "License");
10  * you may not use this file except in compliance with the License.
11  * You may obtain a copy of the License at
12  *
13  * http://www.apache.org/licenses/LICENSE-2.0
14  *
15  * Unless required by applicable law or agreed to in writing, software
16  * distributed under the License is distributed on an "AS IS" BASIS,
17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  * See the License for the specific language governing permissions and
19  * limitations under the License.
20  *
21  * Initial developer(s): Emmanuel Cecchet
22  * Contributor(s): ______________________
23  */

24
25 package org.continuent.sequoia.controller.loadbalancer.raidb1;
26
27 import java.sql.Connection JavaDoc;
28 import java.sql.SQLException JavaDoc;
29 import java.util.ArrayList JavaDoc;
30 import java.util.List JavaDoc;
31
32 import javax.management.ObjectName JavaDoc;
33
34 import org.continuent.sequoia.common.exceptions.BadConnectionException;
35 import org.continuent.sequoia.common.exceptions.NoMoreBackendException;
36 import org.continuent.sequoia.common.exceptions.NoTransactionStartWhenDisablingException;
37 import org.continuent.sequoia.common.exceptions.SQLExceptionFactory;
38 import org.continuent.sequoia.common.exceptions.UnreachableBackendException;
39 import org.continuent.sequoia.common.i18n.Translate;
40 import org.continuent.sequoia.common.jmx.JmxConstants;
41 import org.continuent.sequoia.common.log.Trace;
42 import org.continuent.sequoia.common.xml.DatabasesXmlTags;
43 import org.continuent.sequoia.controller.backend.DatabaseBackend;
44 import org.continuent.sequoia.controller.backend.result.ControllerResultSet;
45 import org.continuent.sequoia.controller.backend.result.ExecuteResult;
46 import org.continuent.sequoia.controller.backend.result.ExecuteUpdateResult;
47 import org.continuent.sequoia.controller.backend.result.GeneratedKeysResult;
48 import org.continuent.sequoia.controller.cache.metadata.MetadataCache;
49 import org.continuent.sequoia.controller.connection.AbstractConnectionManager;
50 import org.continuent.sequoia.controller.connection.PooledConnection;
51 import org.continuent.sequoia.controller.jmx.MBeanServerManager;
52 import org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer;
53 import org.continuent.sequoia.controller.loadbalancer.AllBackendsFailedException;
54 import org.continuent.sequoia.controller.loadbalancer.BackendTaskQueues;
55 import org.continuent.sequoia.controller.loadbalancer.BackendTaskQueuesControl;
56 import org.continuent.sequoia.controller.loadbalancer.policies.WaitForCompletionPolicy;
57 import org.continuent.sequoia.controller.loadbalancer.tasks.AbstractTask;
58 import org.continuent.sequoia.controller.loadbalancer.tasks.CallableStatementExecuteQueryTask;
59 import org.continuent.sequoia.controller.loadbalancer.tasks.CallableStatementExecuteTask;
60 import org.continuent.sequoia.controller.loadbalancer.tasks.CallableStatementExecuteUpdateTask;
61 import org.continuent.sequoia.controller.loadbalancer.tasks.ClosePersistentConnectionTask;
62 import org.continuent.sequoia.controller.loadbalancer.tasks.CommitTask;
63 import org.continuent.sequoia.controller.loadbalancer.tasks.OpenPersistentConnectionTask;
64 import org.continuent.sequoia.controller.loadbalancer.tasks.ReleaseSavepointTask;
65 import org.continuent.sequoia.controller.loadbalancer.tasks.RollbackTask;
66 import org.continuent.sequoia.controller.loadbalancer.tasks.RollbackToSavepointTask;
67 import org.continuent.sequoia.controller.loadbalancer.tasks.SavepointTask;
68 import org.continuent.sequoia.controller.loadbalancer.tasks.StatementExecuteQueryTask;
69 import org.continuent.sequoia.controller.loadbalancer.tasks.StatementExecuteTask;
70 import org.continuent.sequoia.controller.loadbalancer.tasks.StatementExecuteUpdateTask;
71 import org.continuent.sequoia.controller.loadbalancer.tasks.StatementExecuteUpdateWithKeysTask;
72 import org.continuent.sequoia.controller.requestmanager.RAIDbLevels;
73 import org.continuent.sequoia.controller.requestmanager.TransactionMetaData;
74 import org.continuent.sequoia.controller.requests.AbstractRequest;
75 import org.continuent.sequoia.controller.requests.AbstractWriteRequest;
76 import org.continuent.sequoia.controller.requests.ParsingGranularities;
77 import org.continuent.sequoia.controller.requests.SelectRequest;
78 import org.continuent.sequoia.controller.requests.StoredProcedure;
79 import org.continuent.sequoia.controller.semantic.SemanticBehavior;
80 import org.continuent.sequoia.controller.virtualdatabase.VirtualDatabase;
81 import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedClosePersistentConnection;
82 import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedCommit;
83 import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedOpenPersistentConnection;
84 import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedReleaseSavepoint;
85 import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedRollback;
86 import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedRollbackToSavepoint;
87 import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedSetSavepoint;
88
89 /**
90  * RAIDb-1 load balancer.
91  * <p>
92  * This class is abstract because the read requests coming from the
93  * request controller are NOT handled here but in the subclasses. Transaction
94  * management and write requests are broadcast to all backends.
95  *
96  * @author <a HREF="mailto:Emmanuel.Cecchet@inria.fr">Emmanuel Cecchet </a>
97  * @version 1.0
98  */

99 public abstract class RAIDb1 extends AbstractLoadBalancer
100 {
101   //
102
// How the code is organized ?
103
//
104
// 1. Member variables
105
// 2. Constructor(s)
106
// 3. Request handling
107
// 4. Transaction handling
108
// 5. Backend management
109
//
110

111   protected static Trace logger = Trace
112                                            .getLogger("org.continuent.sequoia.controller.loadbalancer.RAIDb1");
113
114   protected static Trace endUserLogger = Trace
115                                            .getLogger("org.continuent.sequoia.enduser");
116
117   /*
118    * Constructors
119    */

120
/**
 * Initializes the state shared by all RAIDb-1 load balancers: the superclass
 * is set up with the <code>RAIDbLevels.RAIDb1</code> level and the
 * <code>ParsingGranularities.TABLE</code> granularity, then the completion
 * policy is recorded.
 *
 * @param vdb the virtual database this load balancer belongs to.
 * @param waitForCompletionPolicy how many backends must complete before the
 *          result of a broadcast operation is returned
 * @exception Exception if an error occurs
 */
public RAIDb1(VirtualDatabase vdb,
    WaitForCompletionPolicy waitForCompletionPolicy)
    throws Exception
{
  super(vdb, RAIDbLevels.RAIDb1, ParsingGranularities.TABLE);
  this.waitForCompletionPolicy = waitForCompletionPolicy;
}
136
137   /*
138    * Request Handling
139    */

140
141   /**
142    * Perform a read request. If request.isMustBroadcast() is set, then the query
143    * is broadcasted to all nodes else a single node is chosen according to the
144    * load balancing policy.
145    *
146    * @param request the <code>SelectRequest</code> to execute
147    * @param metadataCache MetadataCache (null if none)
148    * @return the corresponding <code>ControllerResultSet</code>
149    * @exception SQLException if an error occurs
150    * @throws AllBackendsFailedException if all backends failed to execute the
151    * request
152    */

153   public ControllerResultSet statementExecuteQuery(SelectRequest request,
154       MetadataCache metadataCache) throws SQLException JavaDoc,
155       AllBackendsFailedException
156   {
157     if (request.isMustBroadcast())
158       return execBroadcastReadRequest(request, metadataCache);
159     else
160       return execSingleBackendReadRequest(request, metadataCache);
161   }
162
163   /**
164    * Implementation specific execution of a request on a single backend chosen
165    * according to the load balancing strategy.
166    *
167    * @param request the request to execute
168    * @param metadataCache the metadata cache if any or null
169    * @return the ResultSet
170    * @throws SQLException if an error occurs
171    */

172   public abstract ControllerResultSet execSingleBackendReadRequest(
173       SelectRequest request, MetadataCache metadataCache) throws SQLException JavaDoc;
174
175   /**
176    * Broadcast a read request execution on all backends. This is similar to a
177    * write execution and is useful for queries such as SELECT ... FOR UPDATE.
178    *
179    * @param request the <code>SelectRequest</code> to execute
180    * @param metadataCache MetadataCache (null if none)
181    * @return the corresponding <code>ControllerResultSet</code>
182    * @exception SQLException if an error occurs
183    * @throws AllBackendsFailedException if all backends failed to execute the
184    * request
185    * @throws NoMoreBackendException if no backend was available to execute the
186    * stored procedure
187    */

188   private ControllerResultSet execBroadcastReadRequest(SelectRequest request,
189       MetadataCache metadataCache) throws SQLException JavaDoc,
190       AllBackendsFailedException, NoMoreBackendException
191   {
192     // Handle macros
193
handleMacros(request);
194
195     // Total ordering for distributed virtual databases.
196
boolean removeFromTotalOrderQueue = waitForTotalOrder(request, true);
197
198     // Log lazy begin if needed
199
if (request.isLazyTransactionStart())
200       this.vdb.getRequestManager().logLazyTransactionBegin(
201           request.getTransactionId());
202
203     // Log request
204
if (recoveryLog != null)
205       request.setLogId(recoveryLog.logRequestExecuting(request));
206
207     int nbOfThreads = acquireLockAndCheckNbOfThreads(request, String
208         .valueOf(request.getId()));
209
210     // Create the task and just wait for the first node to return
211
StatementExecuteQueryTask task = new StatementExecuteQueryTask(1,
212         nbOfThreads, request, metadataCache);
213
214     atomicTaskPostInQueueAndReleaseLock(request, task, nbOfThreads,
215         removeFromTotalOrderQueue);
216
217     synchronized (task)
218     {
219       if (!task.hasCompleted())
220         waitForTaskCompletion(request.getTimeout() * 1000L, String
221             .valueOf(request.getId()), task);
222
223       checkTaskCompletion(task);
224     }
225
226     // Update log with success
227
if (recoveryLog != null)
228       recoveryLog.logRequestCompletion(request.getLogId(), true, request
229           .getExecTimeInMs());
230
231     return task.getResult();
232   }
233
234   /**
235    * Performs a write request. This request is broadcasted to all nodes.
236    *
237    * @param request an <code>AbstractWriteRequest</code>
238    * @return number of rows affected by the request
239    * @throws AllBackendsFailedException if all backends failed to execute the
240    * request
241    * @exception NoMoreBackendException if no backends are left to execute the
242    * request
243    * @exception SQLException if an error occurs
244    */

245   public ExecuteUpdateResult statementExecuteUpdate(AbstractWriteRequest request)
246       throws AllBackendsFailedException, NoMoreBackendException, SQLException JavaDoc
247   {
248     return ((StatementExecuteUpdateTask) execWriteRequest(request, false, null))
249         .getResult();
250   }
251
252   /**
253    * Perform a write request and return the auto generated keys.
254    *
255    * @param request the request to execute
256    * @param metadataCache the metadataCache if any or null
257    * @return update count and auto generated keys.
258    * @throws AllBackendsFailedException if all backends failed to execute the
259    * request
260    * @exception NoMoreBackendException if no backends are left to execute the
261    * request
262    * @exception SQLException if an error occurs
263    */

264   public GeneratedKeysResult statementExecuteUpdateWithKeys(
265       AbstractWriteRequest request, MetadataCache metadataCache)
266       throws AllBackendsFailedException, NoMoreBackendException, SQLException JavaDoc
267   {
268     return ((StatementExecuteUpdateWithKeysTask) execWriteRequest(request,
269         true, metadataCache)).getResult();
270   }
271
272   /**
273    * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#statementExecute(AbstractRequest,
274    * MetadataCache)
275    */

276   public ExecuteResult statementExecute(AbstractRequest request,
277       MetadataCache metadataCache) throws SQLException JavaDoc,
278       AllBackendsFailedException
279   {
280     StatementExecuteTask task = (StatementExecuteTask) callStoredProcedure(
281         request, STATEMENT_EXECUTE_TASK, metadataCache);
282     return task.getResult();
283   }
284
285   /**
286    * Common code for execWriteRequest(AbstractWriteRequest) and
287    * execWriteRequestWithKeys(AbstractWriteRequest).
288    * <p>
289    * Note that macros are processed here.
290    * <p>
291    * The result is given back in AbstractTask.getResult().
292    *
293    * @param request the request to execute
294    * @param useKeys true if this must give an auto generated keys ResultSet
295    * @param metadataCache the metadataCache if any or null
296    * @throws AllBackendsFailedException if all backends failed to execute the
297    * request
298    * @throws SQLException if an error occurs
299    */

300   private AbstractTask execWriteRequest(AbstractWriteRequest request,
301       boolean useKeys, MetadataCache metadataCache)
302       throws AllBackendsFailedException, NoMoreBackendException, SQLException JavaDoc
303   {
304     // Handle macros
305
handleMacros(request);
306
307     // Total ordering mainly for distributed virtual databases.
308
boolean removeFromTotalOrderQueue = waitForTotalOrder(request, true);
309
310     // Log lazy begin if needed
311
if (request.isLazyTransactionStart())
312       this.vdb.getRequestManager().logLazyTransactionBegin(
313           request.getTransactionId());
314
315     // Log request
316
if (recoveryLog != null)
317       recoveryLog.logRequestExecuting(request);
318
319     int nbOfThreads = acquireLockAndCheckNbOfThreads(request, String
320         .valueOf(request.getId()));
321
322     // Create the task
323
AbstractTask task;
324     if (useKeys)
325       task = new StatementExecuteUpdateWithKeysTask(getNbToWait(nbOfThreads),
326           nbOfThreads, request, metadataCache);
327     else
328       task = new StatementExecuteUpdateTask(getNbToWait(nbOfThreads),
329           nbOfThreads, request);
330
331     atomicTaskPostInQueueAndReleaseLock(request, task, nbOfThreads,
332         removeFromTotalOrderQueue);
333
334     try
335     {
336       synchronized (task)
337       {
338         if (!task.hasCompleted())
339           waitForTaskCompletion(request.getTimeout() * 1000L, String
340               .valueOf(request.getId()), task);
341
342         checkTaskCompletion(task);
343         return task;
344       }
345     }
346     finally
347     {
348       if (!request.isAutoCommit())
349       { // Check that transaction was not aborted in parallel
350
try
351         {
352           this.vdb.getRequestManager().getTransactionMetaData(
353               new Long JavaDoc(request.getTransactionId()));
354         }
355         catch (SQLException JavaDoc e)
356         { // Transaction was aborted it cannot be found anymore in the active
357
// transaction list. Force an abort
358
logger
359               .info("Concurrent abort detected, re-enforcing abort of transaction "
360                   + request.getTransactionId());
361           abort(new TransactionMetaData(request.getTransactionId(), 0, request
362               .getLogin(), request.isPersistentConnection(), request
363               .getPersistentConnectionId()));
364         }
365       }
366     }
367   }
368
369   protected static final int STATEMENT_EXECUTE_QUERY = 0;
370   protected static final int CALLABLE_STATEMENT_EXECUTE_QUERY = 1;
371   protected static final int CALLABLE_STATEMENT_EXECUTE = 2;
372
373   /**
374    * Execute a read request on the selected backend.
375    *
376    * @param request the request to execute
377    * @param backend the backend that will execute the request
378    * @param metadataCache the metadataCache if any or null
379    * @return the ResultSet
380    * @throws SQLException if an error occurs
381    */

382   protected ControllerResultSet executeRequestOnBackend(SelectRequest request,
383       DatabaseBackend backend, MetadataCache metadataCache)
384       throws SQLException JavaDoc, UnreachableBackendException
385   {
386     // Handle macros
387
handleMacros(request);
388
389     // Ok, we have a backend, let's execute the request
390
AbstractConnectionManager cm = backend.getConnectionManager(request
391         .getLogin());
392
393     // Sanity check
394
if (cm == null)
395     {
396       String JavaDoc msg = Translate.get("loadbalancer.connectionmanager.not.found",
397           new String JavaDoc[]{request.getLogin(), backend.getName()});
398       logger.error(msg);
399       throw new SQLException JavaDoc(msg);
400     }
401
402     // Execute the query
403
if (request.isAutoCommit())
404     {
405       ControllerResultSet rs = null;
406       boolean badConnection;
407       do
408       {
409         badConnection = false;
410         // Use a connection just for this request
411
PooledConnection c = null;
412         try
413         {
414           c = cm.retrieveConnectionInAutoCommit(request);
415         }
416         catch (UnreachableBackendException e1)
417         {
418           String JavaDoc msg = Translate.get(
419               "loadbalancer.backend.disabling.unreachable", backend.getName());
420           logger.error(msg);
421           endUserLogger.error(msg);
422           disableBackend(backend, true);
423           throw new UnreachableBackendException(Translate.get(
424               "loadbalancer.backend.unreacheable", backend.getName()));
425         }
426
427         // Sanity check
428
if (c == null)
429           throw new UnreachableBackendException(Translate.get(
430               "loadbalancer.backend.no.connection", backend.getName()));
431
432         // Execute Query
433
try
434         {
435           rs = executeStatementExecuteQueryOnBackend(request, backend, null, c
436               .getConnection(), metadataCache);
437           cm.releaseConnectionInAutoCommit(request, c);
438         }
439         catch (SQLException JavaDoc e)
440         {
441           cm.releaseConnectionInAutoCommit(request, c);
442           throw SQLExceptionFactory.getSQLException(e, Translate.get(
443               "loadbalancer.request.failed.on.backend", new String JavaDoc[]{
444                   request.getSqlShortForm(vdb.getSqlShortFormLength()),
445                   backend.getName(), e.getMessage()}));
446         }
447         catch (BadConnectionException e)
448         { // Get rid of the bad connection
449
cm.deleteConnection(c);
450           if (request.isPersistentConnection())
451           {
452             cm.deletePersistentConnection(request.getPersistentConnectionId());
453           }
454           badConnection = true;
455         }
456         catch (UnreachableBackendException e)
457         {
458           String JavaDoc msg = Translate.get(
459               "loadbalancer.backend.disabling.unreachable", backend.getName());
460           logger.error(msg);
461           endUserLogger.error(msg);
462           disableBackend(backend, true);
463           throw new UnreachableBackendException(Translate.get(
464               "loadbalancer.backend.unreacheable", backend.getName()));
465         }
466         catch (Throwable JavaDoc e)
467         {
468
469           logger.error("Unexpected exception:", e);
470           cm.releaseConnectionInAutoCommit(request, c);
471           throw new SQLException JavaDoc(Translate.get(
472               "loadbalancer.request.failed.on.backend", new String JavaDoc[]{
473                   request.getSqlShortForm(vdb.getSqlShortFormLength()),
474                   backend.getName(), e.getMessage()}));
475         }
476       }
477       while (badConnection);
478       if (logger.isDebugEnabled())
479         logger.debug(Translate.get("loadbalancer.execute.on", new String JavaDoc[]{
480             String.valueOf(request.getId()), backend.getName()}));
481       return rs;
482     }
483     else
484     { // Inside a transaction
485
Connection JavaDoc c;
486       long tid = request.getTransactionId();
487
488       try
489       {
490         c = backend
491             .getConnectionForTransactionAndLazyBeginIfNeeded(request, cm);
492       }
493       catch (UnreachableBackendException e1)
494       {
495         String JavaDoc msg = Translate.get(
496             "loadbalancer.backend.disabling.unreachable", backend.getName());
497         logger.error(msg);
498         endUserLogger.error(msg);
499         disableBackend(backend, true);
500         throw new UnreachableBackendException(Translate.get(
501             "loadbalancer.backend.unreacheable", backend.getName()));
502       }
503       catch (NoTransactionStartWhenDisablingException e)
504       {
505         String JavaDoc msg = Translate.get("loadbalancer.backend.is.disabling",
506             new String JavaDoc[]{request.getSqlShortForm(vdb.getSqlShortFormLength()),
507                 backend.getName()});
508         logger.error(msg);
509         throw new UnreachableBackendException(msg);
510       }
511
512       // Sanity check
513
if (c == null)
514         throw new SQLException JavaDoc(Translate.get(
515             "loadbalancer.unable.retrieve.connection", new String JavaDoc[]{
516                 String.valueOf(tid), backend.getName()}));
517
518       // Execute Query
519
ControllerResultSet rs = null;
520       try
521       {
522         rs = executeStatementExecuteQueryOnBackend(request, backend, null, c,
523             metadataCache);
524       }
525       catch (SQLException JavaDoc e)
526       {
527         throw SQLExceptionFactory.getSQLException(e, Translate.get(
528             "loadbalancer.request.failed.on.backend", new String JavaDoc[]{
529                 request.getSqlShortForm(vdb.getSqlShortFormLength()),
530                 backend.getName(), e.getMessage()}));
531       }
532       catch (BadConnectionException e)
533       { // Connection failed, so did the transaction
534
// Disable the backend.
535
cm.deleteConnection(tid);
536         String JavaDoc msg = Translate.get(
537             "loadbalancer.backend.disabling.connection.failure", backend
538                 .getName());
539         logger.error(msg);
540         endUserLogger.error(msg);
541         disableBackend(backend, true);
542         throw new UnreachableBackendException(msg);
543       }
544       catch (UnreachableBackendException e)
545       {
546         String JavaDoc msg = Translate.get(
547             "loadbalancer.backend.disabling.unreachable", backend.getName());
548         logger.error(msg);
549         endUserLogger.error(msg);
550         disableBackend(backend, true);
551         throw e;
552       }
553       catch (Throwable JavaDoc e)
554       {
555         logger.error("Unexpected exception:", e);
556         throw new SQLException JavaDoc(Translate.get(
557             "loadbalancer.request.failed.on.backend", new String JavaDoc[]{
558                 request.getSqlShortForm(vdb.getSqlShortFormLength()),
559                 backend.getName(), e.getMessage()}));
560       }
561       if (logger.isDebugEnabled())
562         logger.debug(Translate.get("loadbalancer.execute.transaction.on",
563             new String JavaDoc[]{String.valueOf(tid), String.valueOf(request.getId()),
564                 backend.getName()}));
565       return rs;
566     }
567   }
568
569   /**
570    * Execute a stored procedure on the selected backend.
571    *
572    * @param proc the stored procedure to execute
573    * @param isExecuteQuery true if we must call CallableStatement.executeQuery,
574    * false if we must call CallableStatement.execute()
575    * @param backend the backend that will execute the request
576    * @param metadataCache the metadataCache if any or null
577    * @return a <code>ControllerResultSet</code> if isExecuteQuery is true, an
578    * <code>ExecuteResult</code> object otherwise
579    * @throws SQLException if an error occurs
580    */

581   protected Object JavaDoc executeStoredProcedureOnBackend(StoredProcedure proc,
582       boolean isExecuteQuery, DatabaseBackend backend,
583       MetadataCache metadataCache) throws SQLException JavaDoc,
584       UnreachableBackendException
585   {
586     // Ok, we have a backend, let's execute the request
587
AbstractConnectionManager cm = backend
588         .getConnectionManager(proc.getLogin());
589
590     // Sanity check
591
if (cm == null)
592     {
593       String JavaDoc msg = Translate.get("loadbalancer.connectionmanager.not.found",
594           new String JavaDoc[]{proc.getLogin(), backend.getName()});
595       logger.error(msg);
596       throw new SQLException JavaDoc(msg);
597     }
598
599     // Execute the query
600
if (proc.isAutoCommit())
601     {
602       Object JavaDoc result = null;
603       boolean badConnection;
604       PooledConnection c = null;
605       do
606       {
607         badConnection = false;
608         PooledConnection previousConnection = c;
609         // Use a connection just for this request
610
try
611         {
612           c = cm.retrieveConnectionInAutoCommit(proc);
613         }
614         catch (UnreachableBackendException e1)
615         {
616           String JavaDoc msg = Translate.get(
617               "loadbalancer.backend.disabling.unreachable", backend.getName());
618           logger.error(msg);
619           endUserLogger.error(msg);
620           disableBackend(backend, true);
621           throw new UnreachableBackendException(Translate.get(
622               "loadbalancer.backend.unreacheable", backend.getName()));
623         }
624
625         // Sanity check
626
if (c == null || c == previousConnection)
627           throw new UnreachableBackendException(Translate.get(
628               "loadbalancer.backend.no.connection", backend.getName()));
629
630         // Execute Query
631
try
632         {
633           if (isExecuteQuery)
634             result = AbstractLoadBalancer
635                 .executeCallableStatementExecuteQueryOnBackend(proc, backend,
636                     null, c.getConnection(), metadataCache);
637           else
638             result = AbstractLoadBalancer
639                 .executeCallableStatementExecuteOnBackend(proc, backend, null,
640                     c.getConnection(), metadataCache);
641         }
642         catch (BadConnectionException e)
643         { // Get rid of the bad connection
644
cm.deleteConnection(c);
645           if (proc.isPersistentConnection())
646             cm.deletePersistentConnection(proc.getPersistentConnectionId());
647           badConnection = true;
648         }
649         catch (Throwable JavaDoc e)
650         {
651           logger.error("Unexpected exception:", e);
652           throw new SQLException JavaDoc(Translate.get(
653               "loadbalancer.storedprocedure.failed.on.backend", new String JavaDoc[]{
654                   proc.getSqlShortForm(vdb.getSqlShortFormLength()),
655                   backend.getName(), e.getMessage()}));
656         }
657         finally
658         {
659           cm.releaseConnectionInAutoCommit(proc, c);
660         }
661       }
662       while (badConnection);
663
664       if (logger.isDebugEnabled())
665         logger.debug(Translate.get("loadbalancer.storedprocedure.on",
666             new String JavaDoc[]{String.valueOf(proc.getId()), backend.getName()}));
667
668       return result;
669     }
670     else
671     { // Inside a transaction
672
Connection JavaDoc c;
673       long tid = proc.getTransactionId();
674
675       try
676       {
677         c = backend.getConnectionForTransactionAndLazyBeginIfNeeded(proc, cm);
678       }
679       catch (UnreachableBackendException e)
680       {
681         // intercept the UBE to disable the unreachable backend
682
// and propagate the exception
683
endUserLogger.error(Translate.get(
684             "loadbalancer.backend.disabling.unreachable", backend.getName()));
685         disableBackend(backend, true);
686         throw e;
687       }
688       catch (NoTransactionStartWhenDisablingException e)
689       {
690         String JavaDoc msg = Translate.get("loadbalancer.backend.is.disabling",
691             new String JavaDoc[]{proc.getSqlShortForm(vdb.getSqlShortFormLength()),
692                 backend.getName()});
693         logger.error(msg);
694         throw new UnreachableBackendException(msg);
695       }
696
697       // Sanity check
698
if (c == null)
699         throw new SQLException JavaDoc(Translate.get(
700             "loadbalancer.unable.retrieve.connection", new String JavaDoc[]{
701                 String.valueOf(tid), backend.getName()}));
702
703       // Execute Query
704
try
705       {
706         if (logger.isDebugEnabled())
707           logger.debug(Translate.get("loadbalancer.execute.transaction.on",
708               new String JavaDoc[]{String.valueOf(tid), String.valueOf(proc.getId()),
709                   backend.getName()}));
710         if (isExecuteQuery)
711           return AbstractLoadBalancer
712               .executeCallableStatementExecuteQueryOnBackend(proc, backend,
713                   null, c, metadataCache);
714         else
715           return AbstractLoadBalancer.executeCallableStatementExecuteOnBackend(
716               proc, backend, null, c, metadataCache);
717       }
718       catch (BadConnectionException e)
719       { // Connection failed, so did the transaction
720
// Disable the backend.
721
cm.deleteConnection(tid);
722         String JavaDoc msg = Translate.get(
723             "loadbalancer.backend.disabling.connection.failure", backend
724                 .getName());
725         logger.error(msg);
726         endUserLogger.error(msg);
727         disableBackend(backend, true);
728         throw new UnreachableBackendException(msg);
729       }
730       catch (Throwable JavaDoc e)
731       {
732         logger.error("Unexpected exception:", e);
733         throw new SQLException JavaDoc(Translate.get(
734             "loadbalancer.storedprocedure.failed.on.backend", new String JavaDoc[]{
735                 proc.getSqlShortForm(vdb.getSqlShortFormLength()),
736                 backend.getName(), e.getMessage()}));
737       }
738     }
739   }
740
741   /**
742    * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#callableStatementExecuteQuery(StoredProcedure,
743    * MetadataCache)
744    */

745   public ControllerResultSet callableStatementExecuteQuery(
746       StoredProcedure proc, MetadataCache metadataCache) throws SQLException JavaDoc,
747       AllBackendsFailedException
748   {
749     CallableStatementExecuteQueryTask task = (CallableStatementExecuteQueryTask) callStoredProcedure(
750         proc, EXECUTE_QUERY_TASK, metadataCache);
751     return task.getResult();
752   }
753
754   /**
755    * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#callableStatementExecuteUpdate(org.continuent.sequoia.controller.requests.StoredProcedure)
756    */

757   public ExecuteUpdateResult callableStatementExecuteUpdate(StoredProcedure proc)
758       throws SQLException JavaDoc, AllBackendsFailedException
759   {
760     CallableStatementExecuteUpdateTask task = (CallableStatementExecuteUpdateTask) callStoredProcedure(
761         proc, EXECUTE_UPDATE_TASK, null);
762     return task.getResult();
763   }
764
765   /**
766    * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#callableStatementExecute(StoredProcedure,
767    * MetadataCache)
768    */

769   public ExecuteResult callableStatementExecute(StoredProcedure proc,
770       MetadataCache metadataCache) throws SQLException JavaDoc,
771       AllBackendsFailedException
772   {
773     CallableStatementExecuteTask task = (CallableStatementExecuteTask) callStoredProcedure(
774         proc, CALLABLE_EXECUTE_TASK, metadataCache);
775     return task.getResult();
776   }
777
778   private static final int EXECUTE_QUERY_TASK = 0;
779   private static final int EXECUTE_UPDATE_TASK = 1;
780   private static final int CALLABLE_EXECUTE_TASK = 2;
781   private static final int STATEMENT_EXECUTE_TASK = 3;
782
783   /**
784    * Post the stored procedure call in the threads task list.
785    * <p>
786    * Note that macros are also processed here.
787    *
788    * @param request the stored procedure to call or the request to execute if
789    * taskType is STATEMENT_EXECUTE_TASK
790    * @param taskType one of EXECUTE_QUERY_TASK, EXECUTE_UPDATE_TASK,
791    * CALLABLE_EXECUTE_TASK or STATEMENT_EXECUTE_TASK
792    * @param metadataCache the metadataCache if any or null
793    * @return the task that has been executed (caller can get the result by
794    * calling getResult())
795    * @throws SQLException if an error occurs
796    * @throws AllBackendsFailedException if all backends failed to execute the
797    * stored procedure
798    * @throws NoMoreBackendException if no backend was available to execute the
799    * stored procedure
800    */

801   private AbstractTask callStoredProcedure(AbstractRequest request,
802       int taskType, MetadataCache metadataCache) throws SQLException JavaDoc,
803       AllBackendsFailedException, NoMoreBackendException
804   {
805     // Handle macros
806
handleMacros(request);
807
808     // Total ordering mainly for distributed virtual databases.
809
boolean removeFromTotalOrderQueue = waitForTotalOrder(request, true);
810
811     // Log lazy begin if needed
812
if (request.isLazyTransactionStart())
813       this.vdb.getRequestManager().logLazyTransactionBegin(
814           request.getTransactionId());
815
816     // Log request
817
if (recoveryLog != null)
818     {
819       boolean mustLog = !request.isReadOnly();
820       if (taskType != STATEMENT_EXECUTE_TASK)
821       { // faster than (request instanceof StoredProcedure)
822
SemanticBehavior semantic = ((StoredProcedure) request).getSemantic();
823         mustLog = (semantic == null) || !semantic.isReadOnly();
824       }
825       if (mustLog)
826         recoveryLog.logRequestExecuting(request);
827     }
828
829     int nbOfThreads = acquireLockAndCheckNbOfThreads(request, String
830         .valueOf(request.getId()));
831
832     // Create the task
833
AbstractTask task;
834     switch (taskType)
835     {
836       case EXECUTE_QUERY_TASK :
837         task = new CallableStatementExecuteQueryTask(getNbToWait(nbOfThreads),
838             nbOfThreads, (StoredProcedure) request, metadataCache);
839         break;
840       case EXECUTE_UPDATE_TASK :
841         task = new CallableStatementExecuteUpdateTask(getNbToWait(nbOfThreads),
842             nbOfThreads, (StoredProcedure) request);
843         break;
844       case CALLABLE_EXECUTE_TASK :
845         task = new CallableStatementExecuteTask(getNbToWait(nbOfThreads),
846             nbOfThreads, (StoredProcedure) request, metadataCache);
847         break;
848       case STATEMENT_EXECUTE_TASK :
849         task = new StatementExecuteTask(getNbToWait(nbOfThreads), nbOfThreads,
850             (AbstractWriteRequest) request, metadataCache);
851         break;
852       default :
853         throw new RuntimeException JavaDoc("Unhandled task type " + taskType
854             + " in callStoredProcedure");
855     }
856
857     atomicTaskPostInQueueAndReleaseLock(request, task, nbOfThreads,
858         removeFromTotalOrderQueue);
859
860     try
861     {
862       synchronized (task)
863       {
864         if (!task.hasCompleted())
865           waitForTaskCompletion(request.getTimeout() * 1000L, String
866               .valueOf(request.getId()), task);
867
868         checkTaskCompletion(task);
869         return task;
870       }
871     }
872     finally
873     {
874       if (!request.isAutoCommit())
875       { // Check that transaction was not aborted in parallel
876
try
877         {
878           this.vdb.getRequestManager().getTransactionMetaData(
879               new Long JavaDoc(request.getTransactionId()));
880         }
881         catch (SQLException JavaDoc e)
882         { // Transaction was aborted it cannot be found anymore in the active
883
// transaction list. Force an abort
884
logger
885               .info("Concurrent abort detected, re-inforcing abort of transaction "
886                   + request.getTransactionId());
887           abort(new TransactionMetaData(request.getTransactionId(), 0, request
888               .getLogin(), request.isPersistentConnection(), request
889               .getPersistentConnectionId()));
890         }
891       }
892     }
893   }
894
895   /**
896    * Check the completion status of the task and throws appropriate Exceptions
897    * if the status of the task was not successful, otherwise do nothing.
898    *
899    * @param task the completed AbstractTask
900    * @throws AllBackendsFailedException if all backends failed to execute the
901    * request
902    * @exception NoMoreBackendException if no backends are left to execute the
903    * request
904    * @exception SQLException if an error occurs
905    */

906   private void checkTaskCompletion(AbstractTask task)
907       throws NoMoreBackendException, AllBackendsFailedException, SQLException JavaDoc
908   {
909     AbstractRequest request = task.getRequest();
910
911     if (task.getSuccess() > 0)
912       return;
913
914     // Check that someone failed, it might be the case that we only have
915
// disabling backends left and they have not played this query (thus
916
// none of them have succeeded or failed).
917
if (task.getFailed() == 0)
918     {
919       throw new NoMoreBackendException(Translate
920           .get("loadbalancer.backendlist.empty"));
921     }
922
923     if (task.getSuccess() == 0)
924     {
925       // All backends that executed the query failed
926
List JavaDoc exceptions = task.getExceptions();
927       if (exceptions == null)
928         throw new AllBackendsFailedException(Translate.get(
929             "loadbalancer.request.failed.all", new Object JavaDoc[]{request.getType(),
930                 String.valueOf(request.getId())}));
931       else
932       {
933         String JavaDoc errorMsg = Translate.get("loadbalancer.request.failed.stack",
934             new Object JavaDoc[]{request.getType(), String.valueOf(request.getId())})
935             + "\n";
936         SQLException JavaDoc ex = SQLExceptionFactory.getSQLException(exceptions,
937             errorMsg);
938         // This maybe just invalid SQL, see SEQUOIA-809
939
logger.debug(ex.getMessage());
940         throw ex;
941       }
942     }
943   }
944
945   /**
946    * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#getPreparedStatementGetMetaData(org.continuent.sequoia.controller.requests.AbstractRequest)
947    */

948   public ControllerResultSet getPreparedStatementGetMetaData(
949       AbstractRequest request) throws SQLException JavaDoc
950   {
951     // Choose a backend
952
try
953     {
954       vdb.acquireReadLockBackendLists();
955     }
956     catch (InterruptedException JavaDoc e)
957     {
958       String JavaDoc msg = Translate.get(
959           "loadbalancer.backendlist.acquire.readlock.failed", e);
960       logger.error(msg);
961       throw new SQLException JavaDoc(msg);
962     }
963
964     /*
965      * The backend that will execute the query
966      */

967     DatabaseBackend backend = null;
968
969     // Note that vdb lock is released in the finally clause of this try/catch
970
// block
971
try
972     {
973       ArrayList JavaDoc backends = vdb.getBackends();
974       int size = backends.size();
975
976       if (size == 0)
977         throw new SQLException JavaDoc(Translate.get(
978             "loadbalancer.execute.no.backend.available", request.getId()));
979
980       // Choose the first available backend
981
for (int i = 0; i < size; i++)
982       {
983         DatabaseBackend b = (DatabaseBackend) backends.get(i);
984         if (b.isReadEnabled())
985         {
986           backend = b;
987           break;
988         }
989       }
990     }
991     catch (Throwable JavaDoc e)
992     {
993       String JavaDoc msg = Translate.get("loadbalancer.execute.find.backend.failed",
994           new String JavaDoc[]{request.getSqlShortForm(vdb.getSqlShortFormLength()),
995               e.getMessage()});
996       logger.error(msg, e);
997       throw new SQLException JavaDoc(msg);
998     }
999     finally
1000    {
1001      vdb.releaseReadLockBackendLists();
1002    }
1003
1004    if (backend == null)
1005      throw new NoMoreBackendException(Translate.get(
1006          "loadbalancer.execute.no.backend.enabled", request.getId()));
1007
1008    // Ok, we have a backend, let's execute the request
1009
AbstractConnectionManager cm = backend.getConnectionManager(request
1010        .getLogin());
1011
1012    // Sanity check
1013
if (cm == null)
1014    {
1015      String JavaDoc msg = Translate.get("loadbalancer.connectionmanager.not.found",
1016          new String JavaDoc[]{request.getLogin(), backend.getName()});
1017      logger.error(msg);
1018      throw new SQLException JavaDoc(msg);
1019    }
1020
1021    // Execute the query
1022
if (request.isAutoCommit())
1023    {
1024      ControllerResultSet rs = null;
1025      boolean badConnection;
1026      do
1027      {
1028        badConnection = false;
1029        // Use a connection just for this request
1030
PooledConnection c = null;
1031        try
1032        {
1033          c = cm.retrieveConnectionInAutoCommit(request);
1034        }
1035        catch (UnreachableBackendException e1)
1036        {
1037          String JavaDoc msg = Translate.get(
1038              "loadbalancer.backend.disabling.unreachable", backend.getName());
1039          logger.error(msg);
1040          endUserLogger.error(msg);
1041          disableBackend(backend, true);
1042          // Retry on a different backend
1043
return getPreparedStatementGetMetaData(request);
1044        }
1045
1046        // Sanity check
1047
if (c == null)
1048          throw new SQLException JavaDoc(Translate.get(
1049              "loadbalancer.backend.no.connection", backend.getName()));
1050
1051        // Execute Query
1052
try
1053        {
1054          rs = preparedStatementGetMetaDataOnBackend(
1055              request.getSqlOrTemplate(), backend, c.getConnection());
1056          cm.releaseConnectionInAutoCommit(request, c);
1057        }
1058        catch (SQLException JavaDoc e)
1059        {
1060          cm.releaseConnectionInAutoCommit(request, c);
1061          throw SQLExceptionFactory.getSQLException(e, Translate.get(
1062              "loadbalancer.request.failed.on.backend", new String JavaDoc[]{
1063                  request.getSqlShortForm(vdb.getSqlShortFormLength()),
1064                  backend.getName(), e.getMessage()}));
1065        }
1066        catch (BadConnectionException e)
1067        { // Get rid of the bad connection
1068
cm.deleteConnection(c);
1069          badConnection = true;
1070        }
1071        catch (Throwable JavaDoc e)
1072        {
1073          cm.releaseConnectionInAutoCommit(request, c);
1074
1075          logger.error("Unexpected exception:", e);
1076          throw new SQLException JavaDoc(Translate.get(
1077              "loadbalancer.request.failed.on.backend", new String JavaDoc[]{
1078                  request.getSqlShortForm(vdb.getSqlShortFormLength()),
1079                  backend.getName(), e.getMessage()}));
1080        }
1081      }
1082      while (badConnection);
1083      if (logger.isDebugEnabled())
1084        logger.debug(Translate.get("loadbalancer.execute.on", new String JavaDoc[]{
1085            String.valueOf(request.getId()), backend.getName()}));
1086      return rs;
1087    }
1088    else
1089    { // Inside a transaction
1090
Connection JavaDoc c;
1091      long tid = request.getTransactionId();
1092
1093      try
1094      {
1095        c = backend
1096            .getConnectionForTransactionAndLazyBeginIfNeeded(request, cm);
1097      }
1098      catch (UnreachableBackendException e1)
1099      {
1100        String JavaDoc msg = Translate.get(
1101            "loadbalancer.backend.disabling.unreachable", backend.getName());
1102        logger.error(msg);
1103        endUserLogger.error(msg);
1104        disableBackend(backend, true);
1105        throw new SQLException JavaDoc(Translate.get(
1106            "loadbalancer.backend.unreacheable", backend.getName()));
1107      }
1108      catch (NoTransactionStartWhenDisablingException e)
1109      {
1110        String JavaDoc msg = Translate.get("loadbalancer.backend.is.disabling",
1111            new String JavaDoc[]{request.getSqlShortForm(vdb.getSqlShortFormLength()),
1112                backend.getName()});
1113        logger.error(msg);
1114        throw new SQLException JavaDoc(msg);
1115      }
1116
1117      // Sanity check
1118
if (c == null)
1119        throw new SQLException JavaDoc(Translate.get(
1120            "loadbalancer.unable.retrieve.connection", new String JavaDoc[]{
1121                String.valueOf(tid), backend.getName()}));
1122
1123      // Execute Query
1124
ControllerResultSet rs = null;
1125      try
1126      {
1127        rs = preparedStatementGetMetaDataOnBackend(request.getSqlOrTemplate(),
1128            backend, c);
1129      }
1130      catch (SQLException JavaDoc e)
1131      {
1132        throw e;
1133      }
1134      catch (BadConnectionException e)
1135      { // Connection failed, so did the transaction
1136
// Disable the backend.
1137
cm.deleteConnection(tid);
1138        String JavaDoc msg = Translate.get(
1139            "loadbalancer.backend.disabling.connection.failure", backend
1140                .getName());
1141        logger.error(msg);
1142        endUserLogger.error(msg);
1143        disableBackend(backend, true);
1144        throw new SQLException JavaDoc(msg);
1145      }
1146      catch (Throwable JavaDoc e)
1147      {
1148
1149        logger.error("Unexpected exception:", e);
1150        throw new SQLException JavaDoc(Translate.get(
1151            "loadbalancer.request.failed.on.backend", new String JavaDoc[]{
1152                request.getSqlShortForm(vdb.getSqlShortFormLength()),
1153                backend.getName(), e.getMessage()}));
1154      }
1155      if (logger.isDebugEnabled())
1156        logger.debug(Translate.get("loadbalancer.execute.transaction.on",
1157            new String JavaDoc[]{String.valueOf(tid), String.valueOf(request.getId()),
1158                backend.getName()}));
1159      return rs;
1160    }
1161  }
1162
1163  /*
1164   * Transaction management
1165   */

1166
1167  /**
1168   * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#abort(org.continuent.sequoia.controller.requestmanager.TransactionMetaData)
1169   */

1170  public void abort(TransactionMetaData tm) throws SQLException JavaDoc
1171  {
1172    long tid = tm.getTransactionId();
1173    boolean executeRollback = false;
1174    DistributedRollback toqObject = null;
1175    /*
1176     * Let previous queries be flushed into the load balancer queues so that we
1177     * can abort them and that no queries for that transaction wait in the total
1178     * order queue while we are aborting. Note that the wait and remove from the
1179     * total order queue will be done in the call to rollback at the end of this
1180     * method.
1181     */

1182    if (vdb.getTotalOrderQueue() != null)
1183    {
1184      toqObject = new DistributedRollback(tm.getLogin(), tid);
1185      waitForTotalOrder(toqObject, false);
1186    }
1187
1188    try
1189    {
1190      // Acquire the lock
1191
String JavaDoc requestDescription = "abort " + tid;
1192      int nbOfThreads = acquireLockAndCheckNbOfThreads(toqObject,
1193          requestDescription);
1194
1195      boolean rollbackInProgress = false;
1196      synchronized (enabledBackends)
1197      {
1198        // Abort all queries on all backends that have started this transaction
1199
for (int i = 0; i < nbOfThreads; i++)
1200        {
1201          DatabaseBackend backend = (DatabaseBackend) enabledBackends.get(i);
1202          rollbackInProgress = rollbackInProgress
1203              || backend.getTaskQueues().abortAllQueriesForTransaction(tid);
1204        }
1205      }
1206
1207      // Release the lock
1208
backendListLock.releaseRead();
1209
1210      if (rollbackInProgress)
1211      { // already aborting
1212
if (vdb.getTotalOrderQueue() != null)
1213          removeObjectFromAndNotifyTotalOrderQueue(toqObject);
1214        return;
1215      }
1216
1217      executeRollback = true;
1218      rollback(tm);
1219    }
1220    catch (NoMoreBackendException ignore)
1221    {
1222      if (!executeRollback && (recoveryLog != null))
1223        recoveryLog.logAbort(tm); // Executing status
1224
}
1225  }
1226
1227  /**
1228   * Begins a new transaction.
1229   *
1230   * @param tm the transaction marker metadata
1231   * @exception SQLException if an error occurs
1232   */

1233  public final void begin(TransactionMetaData tm) throws SQLException JavaDoc
1234  {
1235  }
1236
1237  /**
1238   * Commits a transaction.
1239   *
1240   * @param tm the transaction marker metadata
1241   * @exception SQLException if an error occurs
1242   */

1243  public void commit(TransactionMetaData tm) throws SQLException JavaDoc
1244  {
1245    long tid = tm.getTransactionId();
1246    Long JavaDoc lTid = new Long JavaDoc(tid);
1247
1248    // Ordering for distributed virtual database
1249
boolean canTakeReadLock = false;
1250    DistributedCommit totalOrderCommit = null;
1251    if (vdb.getTotalOrderQueue() != null)
1252    {
1253      // Total ordering mainly for distributed virtual databases.
1254
// If waitForTotalOrder returns true then the query has been scheduled in
1255
// total order and there is no need to take a write lock later to resolve
1256
// potential conflicts.
1257
totalOrderCommit = new DistributedCommit(tm.getLogin(), tid);
1258      canTakeReadLock = waitForTotalOrder(totalOrderCommit, false);
1259      if (!canTakeReadLock)
1260        // This is a local commit no total order info
1261
totalOrderCommit = null;
1262    }
1263
1264    // Update the recovery log
1265
if (recoveryLog != null)
1266      recoveryLog.logCommit(tm);
1267
1268    // Acquire the lock
1269
String JavaDoc requestDescription = "commit " + tid;
1270    int nbOfThreads = acquireLockAndCheckNbOfThreads(totalOrderCommit,
1271        requestDescription);
1272
1273    // Build the list of backends that need to commit this transaction
1274
ArrayList JavaDoc commitList = new ArrayList JavaDoc(nbOfThreads);
1275    for (int i = 0; i < nbOfThreads; i++)
1276    {
1277      DatabaseBackend backend = (DatabaseBackend) enabledBackends.get(i);
1278      if (backend.isStartedTransaction(lTid))
1279        commitList.add(backend);
1280    }
1281
1282    int nbOfThreadsToCommit = commitList.size();
1283    CommitTask task = null;
1284    if (nbOfThreadsToCommit != 0)
1285      task = new CommitTask(getNbToWait(nbOfThreadsToCommit),
1286          nbOfThreadsToCommit, tm);
1287
1288    // Post the task in the non-conflicting queues.
1289
synchronized (enabledBackends)
1290    {
1291      for (int i = 0; i < nbOfThreadsToCommit; i++)
1292      {
1293        DatabaseBackend backend = (DatabaseBackend) commitList.get(i);
1294        backend.getTaskQueues().addTaskToBackendTotalOrderQueue(task);
1295      }
1296    }
1297
1298    // Release the lock
1299
backendListLock.releaseRead();
1300
1301    // Unblock next query from total order queue
1302
if (totalOrderCommit != null)
1303      removeObjectFromAndNotifyTotalOrderQueue(totalOrderCommit);
1304
1305    // Check if someone had something to commit
1306
if (task == null)
1307      return;
1308
1309    synchronized (task)
1310    {
1311      if (!task.hasCompleted())
1312        waitForTaskCompletion(tm.getTimeout(), requestDescription, task);
1313
1314      if (task.getSuccess() == 0)
1315      { // All tasks failed
1316
List JavaDoc exceptions = task.getExceptions();
1317        if (exceptions == null)
1318          throw new SQLException JavaDoc(Translate.get(
1319              "loadbalancer.commit.all.failed", tid));
1320        else
1321        {
1322          String JavaDoc errorMsg = Translate.get("loadbalancer.commit.failed.stack",
1323              tid)
1324              + "\n";
1325          SQLException JavaDoc ex = SQLExceptionFactory.getSQLException(exceptions,
1326              errorMsg);
1327          logger.error(ex.getMessage());
1328          throw ex;
1329        }
1330      }
1331    }
1332  }
1333
1334  /**
1335   * Rollbacks a transaction.
1336   *
1337   * @param tm the transaction marker metadata
1338   * @exception SQLException if an error occurs
1339   */

1340  public void rollback(TransactionMetaData tm) throws SQLException JavaDoc
1341  {
1342    long tid = tm.getTransactionId();
1343    Long JavaDoc lTid = new Long JavaDoc(tid);
1344
1345    // Ordering for distributed virtual database
1346
DistributedRollback totalOrderRollback = null;
1347    boolean canTakeReadLock = false;
1348    if (vdb.getTotalOrderQueue() != null)
1349    {
1350      totalOrderRollback = new DistributedRollback(tm.getLogin(), tid);
1351      // Total ordering mainly for distributed virtual databases.
1352
// If waitForTotalOrder returns true then the query has been scheduled in
1353
// total order and there is no need to take a write lock later to resolve
1354
// potential conflicts.
1355
canTakeReadLock = waitForTotalOrder(totalOrderRollback, false);
1356      if (!canTakeReadLock)
1357        // This is a local rollback no total order info
1358
totalOrderRollback = null;
1359    }
1360
1361    // Update the recovery log
1362
if (recoveryLog != null)
1363      recoveryLog.logRollback(tm);
1364
1365    // Acquire the lock
1366
String JavaDoc requestDescription = "rollback " + tid;
1367    int nbOfThreads = acquireLockAndCheckNbOfThreads(totalOrderRollback,
1368        requestDescription);
1369
1370    // Build the list of backends that need to rollback this transaction
1371
ArrayList JavaDoc rollbackList = new ArrayList JavaDoc();
1372    for (int i = 0; i < nbOfThreads; i++)
1373    {
1374      DatabaseBackend backend = (DatabaseBackend) enabledBackends.get(i);
1375      if (backend.isStartedTransaction(lTid))
1376        rollbackList.add(backend);
1377    }
1378
1379    int nbOfThreadsToRollback = rollbackList.size();
1380    RollbackTask task = null;
1381    task = new RollbackTask(getNbToWait(nbOfThreadsToRollback),
1382        nbOfThreadsToRollback, tm);
1383
1384    // Post the task in the non-conflicting queues.
1385
synchronized (enabledBackends)
1386    {
1387      for (int i = 0; i < nbOfThreadsToRollback; i++)
1388      {
1389        DatabaseBackend backend = (DatabaseBackend) rollbackList.get(i);
1390        backend.getTaskQueues().addTaskToBackendTotalOrderQueue(task);
1391      }
1392    }
1393
1394    // Release the lock
1395
backendListLock.releaseRead();
1396
1397    // Unblock next query from total order queue
1398
if (totalOrderRollback != null)
1399      removeObjectFromAndNotifyTotalOrderQueue(totalOrderRollback);
1400
1401    // Check if someone had something to rollback
1402
if (nbOfThreadsToRollback == 0)
1403      return;
1404
1405    synchronized (task)
1406    {
1407      if (!task.hasCompleted())
1408        waitForTaskCompletion(tm.getTimeout(), requestDescription, task);
1409
1410      if (task.getSuccess() > 0)
1411        return;
1412
1413      // All tasks failed
1414
List JavaDoc exceptions = task.getExceptions();
1415      if (exceptions == null)
1416        throw new SQLException JavaDoc(Translate.get(
1417            "loadbalancer.rollback.all.failed", tid));
1418      else
1419      {
1420        String JavaDoc errorMsg = Translate.get("loadbalancer.rollback.failed.stack",
1421            tid)
1422            + "\n";
1423        SQLException JavaDoc ex = SQLExceptionFactory.getSQLException(exceptions,
1424            errorMsg);
1425        logger.error(ex.getMessage());
1426        throw ex;
1427      }
1428    }
1429  }
1430
1431  /**
1432   * Rollback a transaction to a savepoint
1433   *
1434   * @param tm The transaction marker metadata
1435   * @param savepointName The name of the savepoint
1436   * @throws SQLException if an error occurs
1437   */

1438  public void rollbackToSavepoint(TransactionMetaData tm, String JavaDoc savepointName)
1439      throws SQLException JavaDoc
1440  {
1441    long tid = tm.getTransactionId();
1442    Long JavaDoc lTid = new Long JavaDoc(tid);
1443
1444    // Ordering for distributed virtual database
1445
DistributedRollbackToSavepoint totalOrderRollback = null;
1446    boolean canTakeReadLock = false;
1447    if (vdb.getTotalOrderQueue() != null)
1448    {
1449      totalOrderRollback = new DistributedRollbackToSavepoint(tid,
1450          savepointName);
1451      // Total ordering mainly for distributed virtual databases.
1452
// If waitForTotalOrder returns true then the query has been scheduled in
1453
// total order and there is no need to take a write lock later to resolve
1454
// potential conflicts.
1455
canTakeReadLock = waitForTotalOrder(totalOrderRollback, false);
1456      if (!canTakeReadLock)
1457        // This is a local commit no total order info
1458
totalOrderRollback = null;
1459    }
1460
1461    // Update the recovery log
1462
if (recoveryLog != null)
1463      recoveryLog.logRollbackToSavepoint(tm, savepointName);
1464
1465    // Acquire the lock
1466
String JavaDoc requestDescription = "rollback " + savepointName + " " + tid;
1467    int nbOfThreads = acquireLockAndCheckNbOfThreads(null, requestDescription);
1468
1469    // Build the list of backends that need to rollback this transaction
1470
ArrayList JavaDoc rollbackList = new ArrayList JavaDoc();
1471    for (int i = 0; i < nbOfThreads; i++)
1472    {
1473      DatabaseBackend backend = (DatabaseBackend) enabledBackends.get(i);
1474      if (backend.isStartedTransaction(lTid))
1475        rollbackList.add(backend);
1476    }
1477
1478    int nbOfThreadsToRollback = rollbackList.size();
1479    RollbackToSavepointTask task = null;
1480    if (nbOfThreadsToRollback != 0)
1481      task = new RollbackToSavepointTask(getNbToWait(nbOfThreadsToRollback),
1482          nbOfThreadsToRollback, tm, savepointName);
1483
1484    // Post the task in the non-conflicting queues.
1485
synchronized (enabledBackends)
1486    {
1487      for (int i = 0; i < nbOfThreadsToRollback; i++)
1488      {
1489        DatabaseBackend backend = (DatabaseBackend) rollbackList.get(i);
1490        backend.getTaskQueues().addTaskToBackendTotalOrderQueue(task);
1491      }
1492    }
1493
1494    // Release the lock
1495
backendListLock.releaseRead();
1496
1497    // Unblock next query from total order queue
1498
if (totalOrderRollback != null)
1499      removeObjectFromAndNotifyTotalOrderQueue(totalOrderRollback);
1500
1501    // Check if someone had something to rollback
1502
if (task == null)
1503      return;
1504
1505    synchronized (task)
1506    {
1507      if (!task.hasCompleted())
1508        waitForTaskCompletion(tm.getTimeout(), requestDescription, task);
1509
1510      if (task.getSuccess() == 0)
1511      { // All tasks failed
1512
List JavaDoc exceptions = task.getExceptions();
1513        if (exceptions == null)
1514          throw new SQLException JavaDoc(Translate.get(
1515              "loadbalancer.rollbacksavepoint.all.failed", new String JavaDoc[]{
1516                  savepointName, String.valueOf(tid)}));
1517        else
1518        {
1519          String JavaDoc errorMsg = Translate.get(
1520              "loadbalancer.rollbacksavepoint.failed.stack", new String JavaDoc[]{
1521                  savepointName, String.valueOf(tid)})
1522              + "\n";
1523          SQLException JavaDoc ex = SQLExceptionFactory.getSQLException(exceptions,
1524              errorMsg);
1525          logger.error(ex.getMessage());
1526          throw ex;
1527        }
1528      }
1529    }
1530  }
1531
1532  /**
1533   * Release a savepoint from a transaction
1534   *
1535   * @param tm The transaction marker metadata
1536   * @param savepointName The name of the savepoint ro release
1537   * @throws SQLException if an error occurs
1538   */

1539  public void releaseSavepoint(TransactionMetaData tm, String JavaDoc savepointName)
1540      throws SQLException JavaDoc
1541  {
1542    long tid = tm.getTransactionId();
1543    Long JavaDoc lTid = new Long JavaDoc(tid);
1544
1545    // Ordering for distributed virtual database
1546
DistributedReleaseSavepoint totalOrderRelease = null;
1547    boolean canTakeReadLock = false;
1548    if (vdb.getTotalOrderQueue() != null)
1549    {
1550      totalOrderRelease = new DistributedReleaseSavepoint(tid, savepointName);
1551      // Total ordering mainly for distributed virtual databases.
1552
// If waitForTotalOrder returns true then the query has been scheduled in
1553
// total order and there is no need to take a write lock later to resolve
1554
// potential conflicts.
1555
canTakeReadLock = waitForTotalOrder(totalOrderRelease, false);
1556      if (!canTakeReadLock)
1557        // This is a local commit no total order info
1558
totalOrderRelease = null;
1559    }
1560
1561    // Update the recovery log
1562
if (recoveryLog != null)
1563      recoveryLog.logReleaseSavepoint(tm, savepointName);
1564
1565    // Acquire the lock
1566
String JavaDoc requestDescription = "release savepoint " + savepointName + " "
1567        + tid;
1568    int nbOfThreads = acquireLockAndCheckNbOfThreads(null, requestDescription);
1569
1570    // Build the list of backends that need to rollback this transaction
1571
ArrayList JavaDoc savepointList = new ArrayList JavaDoc();
1572    for (int i = 0; i < nbOfThreads; i++)
1573    {
1574      DatabaseBackend backend = (DatabaseBackend) enabledBackends.get(i);
1575      if (backend.isStartedTransaction(lTid))
1576        savepointList.add(backend);
1577    }
1578
1579    int nbOfSavepoints = savepointList.size();
1580    ReleaseSavepointTask task = null;
1581    if (nbOfSavepoints != 0)
1582      task = new ReleaseSavepointTask(getNbToWait(nbOfThreads), nbOfThreads,
1583          tm, savepointName);
1584
1585    // Post the task in the non-conflicting queues.
1586
synchronized (enabledBackends)
1587    {
1588      for (int i = 0; i < nbOfSavepoints; i++)
1589      {
1590        DatabaseBackend backend = (DatabaseBackend) savepointList.get(i);
1591        backend.getTaskQueues().addTaskToBackendTotalOrderQueue(task);
1592      }
1593    }
1594
1595    // Release the lock
1596
backendListLock.releaseRead();
1597
1598    // Unblock next query from total order queue
1599
if (totalOrderRelease != null)
1600      removeObjectFromAndNotifyTotalOrderQueue(totalOrderRelease);
1601
1602    // Check if someone had something to release
1603
if (task == null)
1604      return;
1605
1606    synchronized (task)
1607    {
1608      if (!task.hasCompleted())
1609        waitForTaskCompletion(tm.getTimeout(), requestDescription, task);
1610
1611      if (task.getSuccess() == 0)
1612      { // All tasks failed
1613
List JavaDoc exceptions = task.getExceptions();
1614        if (exceptions == null)
1615          throw new SQLException JavaDoc(Translate.get(
1616              "loadbalancer.releasesavepoint.all.failed", new String JavaDoc[]{
1617                  savepointName, String.valueOf(tid)}));
1618        else
1619        {
1620          String JavaDoc errorMsg = Translate.get(
1621              "loadbalancer.releasesavepoint.failed.stack", new String JavaDoc[]{
1622                  savepointName, String.valueOf(tid)})
1623              + "\n";
1624          SQLException JavaDoc ex = SQLExceptionFactory.getSQLException(exceptions,
1625              errorMsg);
1626          logger.error(ex.getMessage());
1627          throw ex;
1628        }
1629      }
1630    }
1631  }
1632
1633  /**
1634   * Set a savepoint to a transaction.
1635   *
1636   * @param tm The transaction marker metadata
1637   * @param savepointName The name of the new savepoint
1638   * @throws SQLException if an error occurs
1639   */

1640  public void setSavepoint(TransactionMetaData tm, String JavaDoc savepointName)
1641      throws SQLException JavaDoc
1642  {
1643    long tid = tm.getTransactionId();
1644
1645    // Ordering for distributed virtual database
1646
DistributedSetSavepoint totalOrderSavepoint = null;
1647    boolean canTakeReadLock = false;
1648    if (vdb.getTotalOrderQueue() != null)
1649    {
1650      totalOrderSavepoint = new DistributedSetSavepoint(tm.getLogin(), tid,
1651          savepointName);
1652      // Total ordering mainly for distributed virtual databases.
1653
// If waitForTotalOrder returns true then the query has been scheduled in
1654
// total order and there is no need to take a write lock later to resolve
1655
// potential conflicts.
1656
canTakeReadLock = waitForTotalOrder(totalOrderSavepoint, false);
1657      if (!canTakeReadLock)
1658        // This is a local commit no total order info
1659
totalOrderSavepoint = null;
1660    }
1661
1662    // Update the recovery log
1663
if (recoveryLog != null)
1664      recoveryLog.logSetSavepoint(tm, savepointName);
1665
1666    // Acquire the lock
1667
String JavaDoc requestDescription = "set savepoint " + savepointName + " " + tid;
1668    int nbOfThreads = acquireLockAndCheckNbOfThreads(null, requestDescription);
1669
1670    SavepointTask task = null;
1671
1672    // Post the task in the non-conflicting queues of all backends.
1673
synchronized (enabledBackends)
1674    {
1675      if (nbOfThreads != 0)
1676      {
1677        task = new SavepointTask(getNbToWait(nbOfThreads), nbOfThreads, tm,
1678            savepointName);
1679        for (int i = 0; i < nbOfThreads; i++)
1680        {
1681          DatabaseBackend backend = (DatabaseBackend) enabledBackends.get(i);
1682          backend.getTaskQueues().addTaskToBackendTotalOrderQueue(task);
1683        }
1684      }
1685    }
1686
1687    // Release the lock
1688
backendListLock.releaseRead();
1689
1690    // Unblock next query from total order queue
1691
if (totalOrderSavepoint != null)
1692      removeObjectFromAndNotifyTotalOrderQueue(totalOrderSavepoint);
1693
1694    // Check if someone had something to release
1695
if (task == null)
1696      return;
1697
1698    synchronized (task)
1699    {
1700      if (!task.hasCompleted())
1701        waitForTaskCompletion(tm.getTimeout(), requestDescription, task);
1702
1703      if (task.getSuccess() == 0)
1704      { // All tasks failed
1705
List JavaDoc exceptions = task.getExceptions();
1706        if (exceptions == null)
1707          throw new SQLException JavaDoc(Translate.get(
1708              "loadbalancer.setsavepoint.all.failed", new String JavaDoc[]{
1709                  savepointName, String.valueOf(tid)}));
1710        else
1711        {
1712          String JavaDoc errorMsg = Translate.get(
1713              "loadbalancer.setsavepoint.failed.stack", new String JavaDoc[]{
1714                  savepointName, String.valueOf(tid)})
1715              + "\n";
1716          SQLException JavaDoc ex = SQLExceptionFactory.getSQLException(exceptions,
1717              errorMsg);
1718          logger.error(ex.getMessage());
1719          throw ex;
1720        }
1721      }
1722    }
1723  }
1724
1725  //
1726
// Utility functions
1727
//
1728

1729  /**
1730   * Check in which queue the task should be posted and atomically posts the
1731   * task in the queue of all backends. The list of locks acquired by the
1732   * request is set on the task if the request is in autocommit mode (if it is
1733   * in a transaction it is automatically added to the transaction lock list).
1734   * The list of lock can be null if no lock has been acquired.
1735   *
1736   * @param task the task to post
1737   * @param nbOfThreads number of threads in the backend list (must already be
1738   * locked)
1739   * @param removeFromTotalOrderQueue true if the query must be removed from the
1740   * total order queue
1741   */

1742  private void atomicTaskPostInQueueAndReleaseLock(AbstractRequest request,
1743      AbstractTask task, int nbOfThreads, boolean removeFromTotalOrderQueue)
1744  {
1745    synchronized (enabledBackends)
1746    {
1747      for (int i = 0; i < nbOfThreads; i++)
1748      {
1749        BackendTaskQueues queues = ((DatabaseBackend) enabledBackends.get(i))
1750            .getTaskQueues();
1751        queues.addTaskToBackendTotalOrderQueue(task);
1752      }
1753    }
1754
1755    backendListLock.releaseRead();
1756
1757    // Unblock next query from total order queue
1758
if (removeFromTotalOrderQueue)
1759    {
1760      removeObjectFromAndNotifyTotalOrderQueue(request);
1761    }
1762  }
1763
1764  /**
1765   * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#closePersistentConnection(java.lang.String,
1766   * long)
1767   */

1768  public void closePersistentConnection(String JavaDoc login,
1769      long persistentConnectionId) throws SQLException JavaDoc
1770  {
1771    /*
1772     * We assume a synchronous execution and connection closing can only come
1773     * after all requests have been executed in that connection. We post to all
1774     * backends and let the task deal with whether that backend had a persistent
1775     * connection or not.
1776     */

1777
1778    String JavaDoc requestDescription = "closing persistent connection "
1779        + persistentConnectionId;
1780    int nbOfThreads = 0;
1781
1782    DistributedClosePersistentConnection totalOrderQueueObject = null;
1783    boolean removefromTotalOrder = false;
1784    if (vdb.getTotalOrderQueue() != null)
1785    {
1786      totalOrderQueueObject = new DistributedClosePersistentConnection(login,
1787          persistentConnectionId);
1788      removefromTotalOrder = waitForTotalOrder(totalOrderQueueObject, false);
1789    }
1790
1791    ClosePersistentConnectionTask task = null;
1792    try
1793    {
1794      nbOfThreads = acquireLockAndCheckNbOfThreads(null, requestDescription);
1795
1796      task = new ClosePersistentConnectionTask(getNbToWait(nbOfThreads),
1797          nbOfThreads, login, persistentConnectionId);
1798
1799      // Post the task in the non-conflicting queues.
1800
synchronized (enabledBackends)
1801      {
1802        for (int i = 0; i < nbOfThreads; i++)
1803        {
1804          DatabaseBackend backend = (DatabaseBackend) enabledBackends.get(i);
1805          backend.getTaskQueues().addTaskToBackendTotalOrderQueue(task);
1806        }
1807      }
1808
1809      // Release the lock
1810
backendListLock.releaseRead();
1811
1812      if (removefromTotalOrder)
1813        removeObjectFromAndNotifyTotalOrderQueue(totalOrderQueueObject);
1814      totalOrderQueueObject = null;
1815
1816      synchronized (task)
1817      {
1818        if (!task.hasCompleted())
1819          try
1820          {
1821            waitForTaskCompletion(0, requestDescription, task);
1822          }
1823          catch (SQLException JavaDoc ignore)
1824          {
1825          }
1826      }
1827    }
1828    finally
1829    {
1830      if (totalOrderQueueObject != null)
1831      { // NoMoreBackendException occured
1832
removeObjectFromAndNotifyTotalOrderQueue(totalOrderQueueObject);
1833      }
1834
1835      if (logger.isDebugEnabled())
1836        logger.debug(requestDescription + " completed on " + nbOfThreads
1837            + " backends.");
1838    }
1839  }
1840
1841  /**
1842   * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#openPersistentConnection(String,
1843   * long)
1844   */

1845  public void openPersistentConnection(String JavaDoc login, long persistentConnectionId)
1846      throws SQLException JavaDoc
1847  {
1848    String JavaDoc requestDescription = "opening persistent connection "
1849        + persistentConnectionId;
1850    int nbOfThreads = 0;
1851
1852    DistributedOpenPersistentConnection totalOrderQueueObject = null;
1853    if (vdb.getTotalOrderQueue() != null)
1854    {
1855      totalOrderQueueObject = new DistributedOpenPersistentConnection(login,
1856          persistentConnectionId);
1857      waitForTotalOrder(totalOrderQueueObject, true);
1858    }
1859
1860    OpenPersistentConnectionTask task = null;
1861    try
1862    {
1863      nbOfThreads = acquireLockAndCheckNbOfThreads(null, requestDescription);
1864
1865      task = new OpenPersistentConnectionTask(getNbToWait(nbOfThreads),
1866          nbOfThreads, login, persistentConnectionId);
1867
1868      // Post the task in the non-conflicting queues.
1869
synchronized (enabledBackends)
1870      {
1871        for (int i = 0; i < nbOfThreads; i++)
1872        {
1873          DatabaseBackend backend = (DatabaseBackend) enabledBackends.get(i);
1874          backend.getTaskQueues().addTaskToBackendTotalOrderQueue(task);
1875        }
1876      }
1877
1878      // Release the lock
1879
backendListLock.releaseRead();
1880
1881      removeObjectFromAndNotifyTotalOrderQueue(totalOrderQueueObject);
1882      totalOrderQueueObject = null;
1883
1884      synchronized (task)
1885      {
1886        if (!task.hasCompleted())
1887          try
1888          {
1889            waitForTaskCompletion(0, requestDescription, task);
1890          }
1891          catch (SQLException JavaDoc ignore)
1892          {
1893          }
1894      }
1895    }
1896    finally
1897    {
1898      if (totalOrderQueueObject != null)
1899      { // NoMoreBackendException occured
1900
removeObjectFromAndNotifyTotalOrderQueue(totalOrderQueueObject);
1901      }
1902
1903      if (logger.isDebugEnabled())
1904        logger.debug(requestDescription + " completed on " + nbOfThreads
1905            + " backends.");
1906    }
1907  }
1908
1909  /**
1910   * Enables a Backend that was previously disabled.
1911   * <p>
1912   * Ask the corresponding connection manager to initialize the connections if
1913   * needed.
1914   * <p>
1915   * No sanity checks are performed by this function.
1916   *
1917   * @param db the database backend to enable
1918   * @param writeEnabled True if the backend must be enabled for writes
1919   * @throws SQLException if an error occurs
1920   */

1921  public synchronized void enableBackend(DatabaseBackend db,
1922      boolean writeEnabled) throws SQLException JavaDoc
1923  {
1924    if (!db.isInitialized())
1925      db.initializeConnections();
1926
1927    if (writeEnabled && db.isWriteCanBeEnabled())
1928    {
1929      BackendTaskQueues taskqueues = new BackendTaskQueues(db,
1930          waitForCompletionPolicy, this.vdb.getRequestManager());
1931      // Create the new backend task queues
1932
try
1933      {
1934        ObjectName JavaDoc taskQueuesObjectName = JmxConstants
1935            .getBackendTaskQueuesObjectName(db.getVirtualDatabaseName(), db
1936                .getName());
1937        if (MBeanServerManager.getInstance().isRegistered(taskQueuesObjectName))
1938        {
1939          MBeanServerManager.unregister(taskQueuesObjectName);
1940        }
1941        MBeanServerManager.registerMBean(new BackendTaskQueuesControl(
1942            taskqueues), taskQueuesObjectName);
1943      }
1944      catch (Exception JavaDoc e)
1945      {
1946        if (logger.isWarnEnabled())
1947        {
1948          logger.warn("failed to register task queue mbeans for " + db, e);
1949        }
1950      }
1951      db.setTaskQueues(taskqueues);
1952      db.startWorkerThreads(this);
1953      db.startDeadlockDetectionThread(this.vdb);
1954      db.enableWrite();
1955    }
1956
1957    db.enableRead();
1958    try
1959    {
1960      backendListLock.acquireWrite();
1961    }
1962    catch (InterruptedException JavaDoc e)
1963    {
1964      logger.error("Error while acquiring write lock in enableBackend", e);
1965    }
1966
1967    synchronized (enabledBackends)
1968    {
1969      enabledBackends.add(db);
1970    }
1971
1972    backendListLock.releaseWrite();
1973  }
1974
1975  /**
1976   * Disables a backend that was previously enabled.
1977   * <p>
1978   * Ask the corresponding connection manager to finalize the connections if
1979   * needed.
1980   * <p>
1981   * No sanity checks are performed by this function.
1982   *
1983   * @param db the database backend to disable
1984   * @param forceDisable true if disabling must be forced on the backend
1985   * @throws SQLException if an error occurs
1986   */

1987  public void disableBackend(DatabaseBackend db, boolean forceDisable)
1988      throws SQLException JavaDoc
1989  {
1990    if (!db.disable())
1991    {
1992      // Another thread has already started the disable process
1993
return;
1994    }
1995    synchronized (this)
1996    {
1997      try
1998      {
1999        backendListLock.acquireWrite();
2000      }
2001      catch (InterruptedException JavaDoc e)
2002      {
2003        logger.error("Error while acquiring write lock in enableBackend", e);
2004      }
2005
2006      try
2007      {
2008        synchronized (enabledBackends)
2009        {
2010          enabledBackends.remove(db);
2011          if (enabledBackends.isEmpty())
2012          {
2013            // Cleanup schema for any remaining locks
2014
this.vdb.getRequestManager().setDatabaseSchema(null);
2015          }
2016        }
2017
2018        if (!forceDisable)
2019          terminateThreadsAndConnections(db);
2020      }
2021      finally
2022      {
2023        backendListLock.releaseWrite();
2024      }
2025
2026      if (forceDisable)
2027      {
2028        db.shutdownConnectionManagers();
2029        terminateThreadsAndConnections(db);
2030      }
2031
2032      // sanity check on backend's active transaction
2033
if (!db.getActiveTransactions().isEmpty())
2034      {
2035        if (logger.isWarnEnabled())
2036        {
2037          logger.warn("Active transactions after backend " + db.getName()
2038              + " is disabled: " + db.getActiveTransactions());
2039        }
2040      }
2041    }
2042  }
2043
2044  private void terminateThreadsAndConnections(DatabaseBackend db)
2045      throws SQLException JavaDoc
2046  {
2047    db.terminateWorkerThreads();
2048    db.terminateDeadlockDetectionThread();
2049
2050    if (db.isInitialized())
2051      db.finalizeConnections();
2052  }
2053
2054  //
2055
// Debug/Monitoring
2056
//
2057

2058  /**
2059   * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#getXmlImpl
2060   */

2061  public String JavaDoc getXmlImpl()
2062  {
2063    StringBuffer JavaDoc info = new StringBuffer JavaDoc();
2064    info.append("<" + DatabasesXmlTags.ELT_RAIDb_1 + ">");
2065    if (waitForCompletionPolicy != null)
2066      info.append(waitForCompletionPolicy.getXml());
2067    if (macroHandler != null)
2068      info.append(macroHandler.getXml());
2069    info.append(getRaidb1Xml());
2070    info.append("</" + DatabasesXmlTags.ELT_RAIDb_1 + ">");
2071    return info.toString();
2072  }
2073
2074  /**
2075   * Surrounding raidb1 tags can be treated by <method>getXmlImpl </method>
2076   * above, but more detailed content have to be returned by the method
2077   * <method>getRaidb1Xml </method> below.
2078   *
2079   * @return content of Raidb1 xml
2080   */

2081  public abstract String JavaDoc getRaidb1Xml();
2082}
2083
Popular Tags