KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > continuent > sequoia > controller > loadbalancer > raidb2 > RAIDb2


1 /**
2  * Sequoia: Database clustering technology.
3  * Copyright (C) 2002-2004 French National Institute For Research In Computer
4  * Science And Control (INRIA).
5  * Copyright (C) 2005 AmicoSoft, Inc. dba Emic Networks
6  * Contact: sequoia@continuent.org
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  * http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  *
20  * Initial developer(s): Emmanuel Cecchet.
21  * Contributor(s): Jean-Bernard van Zuylen.
22  */

23
24 package org.continuent.sequoia.controller.loadbalancer.raidb2;
25
26 import java.sql.Connection JavaDoc;
27 import java.sql.SQLException JavaDoc;
28 import java.util.ArrayList JavaDoc;
29 import java.util.List JavaDoc;
30
31 import org.continuent.sequoia.common.exceptions.BadConnectionException;
32 import org.continuent.sequoia.common.exceptions.NoMoreBackendException;
33 import org.continuent.sequoia.common.exceptions.NoTransactionStartWhenDisablingException;
34 import org.continuent.sequoia.common.exceptions.NotImplementedException;
35 import org.continuent.sequoia.common.exceptions.SQLExceptionFactory;
36 import org.continuent.sequoia.common.exceptions.UnreachableBackendException;
37 import org.continuent.sequoia.common.i18n.Translate;
38 import org.continuent.sequoia.common.log.Trace;
39 import org.continuent.sequoia.common.xml.DatabasesXmlTags;
40 import org.continuent.sequoia.controller.backend.DatabaseBackend;
41 import org.continuent.sequoia.controller.backend.result.ControllerResultSet;
42 import org.continuent.sequoia.controller.backend.result.ExecuteResult;
43 import org.continuent.sequoia.controller.backend.result.ExecuteUpdateResult;
44 import org.continuent.sequoia.controller.backend.result.GeneratedKeysResult;
45 import org.continuent.sequoia.controller.cache.metadata.MetadataCache;
46 import org.continuent.sequoia.controller.connection.AbstractConnectionManager;
47 import org.continuent.sequoia.controller.connection.PooledConnection;
48 import org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer;
49 import org.continuent.sequoia.controller.loadbalancer.AllBackendsFailedException;
50 import org.continuent.sequoia.controller.loadbalancer.BackendTaskQueues;
51 import org.continuent.sequoia.controller.loadbalancer.policies.WaitForCompletionPolicy;
52 import org.continuent.sequoia.controller.loadbalancer.policies.createtable.CreateTableException;
53 import org.continuent.sequoia.controller.loadbalancer.policies.createtable.CreateTablePolicy;
54 import org.continuent.sequoia.controller.loadbalancer.policies.createtable.CreateTableRule;
55 import org.continuent.sequoia.controller.loadbalancer.tasks.AbstractTask;
56 import org.continuent.sequoia.controller.loadbalancer.tasks.CallableStatementExecuteQueryTask;
57 import org.continuent.sequoia.controller.loadbalancer.tasks.CallableStatementExecuteTask;
58 import org.continuent.sequoia.controller.loadbalancer.tasks.CallableStatementExecuteUpdateTask;
59 import org.continuent.sequoia.controller.loadbalancer.tasks.ClosePersistentConnectionTask;
60 import org.continuent.sequoia.controller.loadbalancer.tasks.CommitTask;
61 import org.continuent.sequoia.controller.loadbalancer.tasks.OpenPersistentConnectionTask;
62 import org.continuent.sequoia.controller.loadbalancer.tasks.ReleaseSavepointTask;
63 import org.continuent.sequoia.controller.loadbalancer.tasks.RollbackTask;
64 import org.continuent.sequoia.controller.loadbalancer.tasks.RollbackToSavepointTask;
65 import org.continuent.sequoia.controller.loadbalancer.tasks.SavepointTask;
66 import org.continuent.sequoia.controller.loadbalancer.tasks.StatementExecuteUpdateTask;
67 import org.continuent.sequoia.controller.loadbalancer.tasks.StatementExecuteUpdateWithKeysTask;
68 import org.continuent.sequoia.controller.requestmanager.RAIDbLevels;
69 import org.continuent.sequoia.controller.requestmanager.TransactionMetaData;
70 import org.continuent.sequoia.controller.requests.AbstractRequest;
71 import org.continuent.sequoia.controller.requests.AbstractWriteRequest;
72 import org.continuent.sequoia.controller.requests.ParsingGranularities;
73 import org.continuent.sequoia.controller.requests.SelectRequest;
74 import org.continuent.sequoia.controller.requests.StoredProcedure;
75 import org.continuent.sequoia.controller.virtualdatabase.VirtualDatabase;
76 import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedClosePersistentConnection;
77 import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedCommit;
78 import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedOpenPersistentConnection;
79 import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedReleaseSavepoint;
80 import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedRollback;
81 import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedRollbackToSavepoint;
82 import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedSetSavepoint;
83
/**
 * RAIDb-2 load balancer.
 * <p>
 * This class is abstract because the read requests coming from the Request
 * Manager are NOT handled here but in the subclasses. Transaction management
 * and write requests are broadcast to all backends owning the written table.
 *
 * @author <a href="mailto:Emmanuel.Cecchet@inria.fr">Emmanuel Cecchet </a>
 * @author <a href="mailto:jbvanzuylen@transwide.com">Jean-Bernard van Zuylen
 *         </a>
 * @version 1.0
 */

97 public abstract class RAIDb2 extends AbstractLoadBalancer
98 {
//
// How is the code organized?
// 1. Member variables
// 2. Constructor(s)
// 3. Request handling
// 4. Transaction handling
// 5. Backend management
//

108   protected CreateTablePolicy createTablePolicy;
109   protected static Trace logger = Trace
110                                          .getLogger("org.continuent.sequoia.controller.loadbalancer.raidb2");
111
112   /*
113    * Constructors
114    */

115
/**
 * Builds a RAIDb-2 request load balancer attached to a virtual database.
 * <p>
 * The parent load balancer is initialized with the RAIDb-2 level and a
 * table-level parsing granularity, then both policies are stored on this
 * instance.
 *
 * @param vdb the virtual database this load balancer belongs to
 * @param waitForCompletionPolicy how many backends must complete before
 *          returning the result
 * @param createTablePolicy the policy defining how 'create table' statements
 *          should be handled
 * @exception Exception if an error occurs
 */
public RAIDb2(VirtualDatabase vdb,
    WaitForCompletionPolicy waitForCompletionPolicy,
    CreateTablePolicy createTablePolicy) throws Exception
{
  super(vdb, RAIDbLevels.RAIDb2, ParsingGranularities.TABLE);

  this.createTablePolicy = createTablePolicy;
  this.waitForCompletionPolicy = waitForCompletionPolicy;
}
136
137   /*
138    * Request Handling
139    */

140
141   /**
142    * Implementation specific load balanced read execution.
143    *
144    * @param request an <code>SelectRequest</code>
145    * @param metadataCache the metadataCache if any or null
146    * @return the corresponding <code>java.sql.ResultSet</code>
147    * @exception SQLException if an error occurs
148    */

149   public abstract ControllerResultSet statementExecuteQuery(
150       SelectRequest request, MetadataCache metadataCache) throws SQLException JavaDoc;
151
152   /**
153    * Performs a write request. This request is broadcasted to all nodes that
154    * owns the table to be written.
155    *
156    * @param request an <code>AbstractWriteRequest</code>
157    * @return number of rows affected by the request
158    * @throws AllBackendsFailedException if all backends failed to execute the
159    * request
160    * @exception SQLException if an error occurs
161    */

162   public ExecuteUpdateResult statementExecuteUpdate(AbstractWriteRequest request)
163       throws AllBackendsFailedException, SQLException JavaDoc
164   {
165     return ((StatementExecuteUpdateTask) execWriteRequest(request, false, null))
166         .getResult();
167   }
168
169   /**
170    * Perform a write request and return the auto generated keys.
171    *
172    * @param request the request to execute
173    * @param metadataCache the metadataCache if any or null
174    * @return update count and auto generated keys.
175    * @throws AllBackendsFailedException if all backends failed to execute the
176    * request
177    * @exception SQLException if an error occurs
178    */

179   public GeneratedKeysResult statementExecuteUpdateWithKeys(
180       AbstractWriteRequest request, MetadataCache metadataCache)
181       throws AllBackendsFailedException, SQLException JavaDoc
182   {
183     return ((StatementExecuteUpdateWithKeysTask) execWriteRequest(request,
184         true, metadataCache)).getResult();
185   }
186
187   /**
188    * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#statementExecute(AbstractRequest,
189    * MetadataCache)
190    */

191   public ExecuteResult statementExecute(AbstractRequest request,
192       MetadataCache metadataCache) throws SQLException JavaDoc,
193       AllBackendsFailedException
194   {
195     throw new NotImplementedException(
196         "Statement.execute() is currently not supported with RAIDb-2");
197   }
198
199   /**
200    * Common code for execWriteRequest(AbstractWriteRequest) and
201    * execWriteRequestWithKeys(AbstractWriteRequest).
202    * <p>
203    * Note that macros are processed here.
204    * <p>
205    * The result is given back in AbstractTask.getResult().
206    *
207    * @param request the request to execute
208    * @param useKeys true if this must give an auto generated keys ResultSet
209    * @param metadataCache the metadataCache if any or null
210    * @throws AllBackendsFailedException if all backends failed to execute the
211    * request
212    * @throws SQLException if an error occurs
213    */

214   private AbstractTask execWriteRequest(AbstractWriteRequest request,
215       boolean useKeys, MetadataCache metadataCache)
216       throws AllBackendsFailedException, SQLException JavaDoc
217   {
218     // Handle macros
219
handleMacros(request);
220
221     // Total ordering mainly for distributed virtual databases.
222
boolean removeFromTotalOrderQueue = waitForTotalOrder(request, true);
223
224     // Log lazy begin if needed
225
if (request.isLazyTransactionStart())
226       this.vdb.getRequestManager().logLazyTransactionBegin(
227           request.getTransactionId());
228
229     // Log request
230
if (recoveryLog != null)
231       recoveryLog.logRequestExecuting(request);
232
233     int nbOfThreads = acquireLockAndCheckNbOfThreads(request, String
234         .valueOf(request.getId()));
235
236     List JavaDoc writeList = new ArrayList JavaDoc();
237
238     if (request.isCreate())
239     {
240       try
241       {
242         writeList = getBackendsForCreateTableRequest(request.getTableName());
243       }
244       catch (CreateTableException e)
245       {
246         releaseLockAndUnlockNextQuery(request);
247         throw new SQLException JavaDoc(Translate.get(
248             "loadbalancer.create.table.rule.failed", e.getMessage()));
249       }
250     }
251     else
252     {
253       writeList = getBackendsWithTable(request.getTableName(), nbOfThreads);
254     }
255
256     nbOfThreads = writeList.size();
257     if (nbOfThreads == 0)
258     {
259       String JavaDoc msg = Translate.get("loadbalancer.execute.no.backend.found",
260           request.getSqlShortForm(vdb.getSqlShortFormLength()));
261       logger.warn(msg);
262
263       releaseLockAndUnlockNextQuery(request);
264       throw new SQLException JavaDoc(msg);
265     }
266     if (logger.isDebugEnabled())
267     {
268       logger.debug(Translate.get("loadbalancer.execute.on.several", String
269           .valueOf(request.getId()), String.valueOf(nbOfThreads)));
270     }
271
272     // Create the task
273
AbstractTask task;
274     if (useKeys)
275       task = new StatementExecuteUpdateWithKeysTask(getNbToWait(nbOfThreads),
276           nbOfThreads, request, metadataCache);
277     else
278       task = new StatementExecuteUpdateTask(getNbToWait(nbOfThreads),
279           nbOfThreads, request);
280
281     atomicTaskPostInQueueAndReleaseLock(request, task, writeList,
282         removeFromTotalOrderQueue);
283
284     synchronized (task)
285     {
286       if (!task.hasCompleted())
287         waitForTaskCompletion(request.getTimeout() * 1000L, String
288             .valueOf(request.getId()), task);
289
290       checkTaskCompletion(task);
291       return task;
292     }
293   }
294
295   /**
296    * Gets a <code>&lt;DatabaseBacken&gt;List</code> of all backends containing
297    * the table identified by <code>tableName</code>.
298    *
299    * @param tableName name of the table
300    * @param nbOfThreads number of threads
301    * @return a <code>&lt;DatabaseBacken&gt;List</code> of all backends
302    * containing the table identified by <code>tableName</code>
303    */

304   // FIXME why don't we just iterate on the enabledBackends list (assuming we
305
// acquire/release the read lock)?
306
// TODO should be moved to VirtualDatabase
307
private List JavaDoc getBackendsWithTable(String JavaDoc tableName, int nbOfThreads)
308   {
309     List JavaDoc backendsWithTable = new ArrayList JavaDoc();
310     for (int i = 0; i < nbOfThreads; i++)
311     {
312       DatabaseBackend b = (DatabaseBackend) enabledBackends.get(i);
313       if (b.hasTable(tableName))
314         backendsWithTable.add(b);
315     }
316     return backendsWithTable;
317   }
318
319   /**
320    * Gets a <code>&lt;DatabaseBacken&gt;List</code> of all backends with a
321    * transaction identified by <code>tid</code> already started.
322    *
323    * @param tid Transaction identifier
324    * @param nbOfThreads number of threads
325    * @return a <code>&lt;DatabaseBacken&gt;List</code> of all backends with a
326    * transaction identified by <code>tid</code> already started
327    */

328   // FIXME why don't we just iterate on the enabledBackends list (assuming we
329
// acquire/release the read lock)?
330
// TODO should be moved to VirtualDatabase
331
private List JavaDoc getBackendsWithStartedTransaction(Long JavaDoc tid, int nbOfThreads)
332   {
333     // Build the list of backends that need to commit this transaction
334
List JavaDoc backendsWithStartedTransaction = new ArrayList JavaDoc(nbOfThreads);
335     for (int i = 0; i < nbOfThreads; i++)
336     {
337       DatabaseBackend backend = (DatabaseBackend) enabledBackends.get(i);
338       if (backend.isStartedTransaction(tid))
339         backendsWithStartedTransaction.add(backend);
340     }
341     return backendsWithStartedTransaction;
342   }
343
344   /**
345    * Gets a <code>&lt;DatabaseBacken&gt;List</code> of all backends containing
346    * the stored procedure identified by <code>procedureName</code>.
347    *
348    * @param procedureName name of the stored procedure
349    * @param nbOfParameters number of parameters of the stored procedure
350    * @param nbOfThreads number of threads
351    * @return a <code>&lt;DatabaseBacken&gt;List</code> of all backends
352    * containing the table identified by <code>tableName</code>
353    */

354   // FIXME why don't we just iterate on the enabledBackends list (assuming we
355
// acquire/release the read lock)?
356
// TODO should be moved to VirtualDatabase
357
private List JavaDoc getBackendsWithStoredProcedure(String JavaDoc procedureName,
358       int nbOfParameters, int nbOfThreads)
359   {
360     List JavaDoc backendsWithStoredProcedure = new ArrayList JavaDoc(nbOfThreads);
361     for (int i = 0; i < nbOfThreads; i++)
362     {
363       DatabaseBackend b = (DatabaseBackend) enabledBackends.get(i);
364       if (b.hasStoredProcedure(procedureName, nbOfParameters))
365         backendsWithStoredProcedure.add(b);
366     }
367     return backendsWithStoredProcedure;
368   }
369
370   /**
371    * Gets a <code>&lt;DatabaseBacken&gt;List</code> of all backends according
372    * to the create table policy.
373    *
374    * @param tableName the name of the table to create
375    * @return a <code>&lt;DatabaseBacken&gt;List</code> of all backends
376    * according to the create table policy.
377    * @throws CreateTableException if an error occurs while getting the backends
378    * according to the create table policy
379    */

380   // TODO should be moved to VirtualDatabase
381
private List JavaDoc getBackendsForCreateTableRequest(String JavaDoc tableName)
382       throws CreateTableException
383   {
384     // Choose the backend according to the defined policy
385
CreateTableRule rule = createTablePolicy.getTableRule(tableName);
386     if (rule == null)
387     {
388       rule = createTablePolicy.getDefaultRule();
389     }
390     return rule.getBackends(vdb.getBackends());
391   }
392
393   /**
394    * Executes a select request <em>within a transation</em> on a given
395    * backend. The transaction is lazily begun if it was not already started.
396    *
397    * @param request the Select request to execute
398    * @param backend the backend on which the request must be executed
399    * @param metadataCache the metadata cache
400    * @return a <code>ControllerResultSet</code>
401    * @throws SQLException if an error occurs while executing the request
402    */

403   private ControllerResultSet executeSelectRequestInTransaction(
404       SelectRequest request, DatabaseBackend backend,
405       MetadataCache metadataCache) throws SQLException JavaDoc
406   {
407     AbstractConnectionManager cm = backend.getConnectionManager(request
408         .getLogin());
409
410     if (cm == null)
411     {
412       String JavaDoc msg = Translate.get("loadbalancer.connectionmanager.not.found",
413           request.getLogin(), backend.getName());
414       logger.error(msg);
415       throw new SQLException JavaDoc(msg);
416     }
417
418     Connection JavaDoc c;
419     long tid = request.getTransactionId();
420
421     try
422     {
423       c = backend.getConnectionForTransactionAndLazyBeginIfNeeded(request, cm);
424     }
425     catch (UnreachableBackendException e1)
426     {
427       logger.error(Translate.get("loadbalancer.backend.disabling.unreachable",
428           backend.getName()));
429       disableBackend(backend, true);
430       throw new SQLException JavaDoc(Translate.get("loadbalancer.backend.unreacheable",
431           backend.getName()));
432     }
433     catch (NoTransactionStartWhenDisablingException e)
434     {
435       String JavaDoc msg = Translate.get("loadbalancer.backend.is.disabling", request
436           .getSqlShortForm(vdb.getSqlShortFormLength()), backend.getName());
437       logger.error(msg);
438       throw new SQLException JavaDoc(msg);
439     }
440
441     if (c == null)
442       throw new SQLException JavaDoc(Translate.get(
443           "loadbalancer.unable.retrieve.connection", String.valueOf(tid),
444           backend.getName()));
445
446     try
447     {
448       ControllerResultSet rs = executeStatementExecuteQueryOnBackend(request,
449           backend, null, c, metadataCache);
450       if (logger.isDebugEnabled())
451       {
452         logger.debug(Translate.get("loadbalancer.execute.transaction.on",
453             new String JavaDoc[]{String.valueOf(tid), String.valueOf(request.getId()),
454                 backend.getName()}));
455       }
456       return rs;
457     }
458     catch (SQLException JavaDoc e)
459     {
460       throw e;
461     }
462     catch (BadConnectionException e)
463     { // Connection failed, so did the transaction
464
// Disable the backend.
465
cm.deleteConnection(tid);
466       String JavaDoc msg = Translate.get(
467           "loadbalancer.backend.disabling.connection.failure", backend
468               .getName());
469       logger.error(msg);
470       disableBackend(backend, true);
471       throw new SQLException JavaDoc(msg);
472     }
473     catch (Throwable JavaDoc e)
474     {
475       throw new SQLException JavaDoc(Translate.get(
476           "loadbalancer.request.failed.on.backend", new String JavaDoc[]{
477               request.getSqlShortForm(vdb.getSqlShortFormLength()),
478               backend.getName(), e.getMessage()}));
479     }
480   }
481
482   /**
483    * Executes a select request <em>in autocommit mode</em> on a given backend.
484    *
485    * @param request the Select request to execute
486    * @param backend the backend on which the request must be executed
487    * @param metadataCache the metadata cache
488    * @return a <code>ControllerResultSet</code>
489    * @throws SQLException if an error occurs while executing the request
490    * @throws UnreachableBackendException if the backend is not reachable
491    */

492   private ControllerResultSet executeSelectRequestInAutoCommit(
493       SelectRequest request, DatabaseBackend backend,
494       MetadataCache metadataCache) throws SQLException JavaDoc,
495       UnreachableBackendException
496   {
497     AbstractConnectionManager cm = backend.getConnectionManager(request
498         .getLogin());
499
500     // Sanity check
501
if (cm == null)
502     {
503       String JavaDoc msg = Translate.get("loadbalancer.connectionmanager.not.found",
504           request.getLogin(), backend.getName());
505       logger.error(msg);
506       throw new SQLException JavaDoc(msg);
507     }
508
509     ControllerResultSet rs = null;
510     boolean badConnection;
511     do
512     {
513       badConnection = false;
514       // Use a connection just for this request
515
PooledConnection c = null;
516       try
517       {
518         c = cm.retrieveConnectionInAutoCommit(request);
519       }
520       catch (UnreachableBackendException e1)
521       {
522         logger.error(Translate.get(
523             "loadbalancer.backend.disabling.unreachable", backend.getName()));
524         disableBackend(backend, true);
525         throw new UnreachableBackendException(Translate.get(
526             "loadbalancer.backend.unreacheable", backend.getName()));
527       }
528
529       // Sanity check
530
if (c == null)
531         throw new UnreachableBackendException("No more connections on backend "
532             + backend.getName());
533
534       // Execute Query
535
try
536       {
537         rs = executeStatementExecuteQueryOnBackend(request, backend, null, c
538             .getConnection(), metadataCache);
539         cm.releaseConnectionInAutoCommit(request, c);
540       }
541       catch (SQLException JavaDoc e)
542       {
543         cm.releaseConnectionInAutoCommit(request, c);
544         throw new SQLException JavaDoc(Translate.get(
545             "loadbalancer.request.failed.on.backend", new String JavaDoc[]{
546                 request.getSqlShortForm(vdb.getSqlShortFormLength()),
547                 backend.getName(), e.getMessage()}));
548       }
549       catch (BadConnectionException e)
550       { // Get rid of the bad connection
551
cm.deleteConnection(c);
552         badConnection = true;
553       }
554       catch (Throwable JavaDoc e)
555       {
556         throw new SQLException JavaDoc(Translate.get(
557             "loadbalancer.request.failed.on.backend", new String JavaDoc[]{
558                 request.getSqlShortForm(vdb.getSqlShortFormLength()),
559                 backend.getName(), e.getMessage()}));
560       }
561     }
562     while (badConnection);
563     if (logger.isDebugEnabled())
564       logger.debug(Translate.get("loadbalancer.execute.on", String
565           .valueOf(request.getId()), backend.getName()));
566     return rs;
567   }
568
569   /**
570    * Execute a read request on the selected backend.
571    *
572    * @param request the request to execute
573    * @param backend the backend that will execute the request
574    * @param metadataCache a metadataCache if any or null
575    * @return the ResultSet
576    * @throws SQLException if an error occurs
577    */

578   protected ControllerResultSet executeReadRequestOnBackend(
579       SelectRequest request, DatabaseBackend backend,
580       MetadataCache metadataCache) throws SQLException JavaDoc,
581       UnreachableBackendException
582   {
583     // Handle macros
584
handleMacros(request);
585
586     // Execute the query
587
if (request.isAutoCommit())
588     {
589       return executeSelectRequestInAutoCommit(request, backend, metadataCache);
590     }
591     else
592     {
593       return executeSelectRequestInTransaction(request, backend, metadataCache);
594     }
595   }
596
597   protected static final int CALLABLE_STATEMENT_EXECUTE_QUERY = 1;
598   protected static final int CALLABLE_STATEMENT_EXECUTE = 2;
599
600   /**
601    * Execute a stored procedure on the selected backend.
602    *
603    * @param proc the stored procedure to execute
604    * @param isExecuteQuery true if we must call CallableStatement.executeQuery,
605    * false if we must call CallableStatement.execute()
606    * @param backend the backend that will execute the request
607    * @param metadataCache the metadataCache if any or null
608    * @return a <code>ControllerResultSet</code> if isExecuteQuery is true, an
609    * <code>ExecuteResult</code> object otherwise
610    * @throws SQLException if an error occurs
611    */

612   protected Object JavaDoc executeStoredProcedureOnBackend(StoredProcedure proc,
613       boolean isExecuteQuery, DatabaseBackend backend,
614       MetadataCache metadataCache) throws SQLException JavaDoc,
615       UnreachableBackendException
616   {
617     // Execute the query
618
if (proc.isAutoCommit())
619     {
620       return executeStoredProcedureInAutoCommit(proc, isExecuteQuery, backend,
621           metadataCache);
622     }
623     else
624     {
625       return executeStoredProcedureInTransaction(proc, isExecuteQuery, backend,
626           metadataCache);
627     }
628   }
629
630   /**
631    * Executes a stored procedure <em>within a transation</em> on the selected
632    * backend. The transaction is lazily started if it has not already begun.
633    *
634    * @param proc the <em>autocommit</em> stored procedure to execute
635    * @param isExecuteQuery true if we must call CallableStatement.executeQuery,
636    * false if we must call CallableStatement.execute()
637    * @param backend the backend that will execute the request
638    * @param metadataCache the metadataCache if any or null
639    * @return a <code>ControllerResultSet</code> if isExecuteQuery is true, an
640    * <code>ExecuteResult</code> object otherwise
641    * @throws SQLException if an error occurs
642    */

643   private Object JavaDoc executeStoredProcedureInTransaction(StoredProcedure proc,
644       boolean isExecuteQuery, DatabaseBackend backend,
645       MetadataCache metadataCache) throws SQLException JavaDoc
646   {
647     // Inside a transaction
648
// Ok, we have a backend, let's execute the request
649
AbstractConnectionManager cm = backend
650         .getConnectionManager(proc.getLogin());
651
652     // Sanity check
653
if (cm == null)
654     {
655       String JavaDoc msg = Translate.get("loadbalancer.connectionmanager.not.found",
656           proc.getLogin(), backend.getName());
657       logger.error(msg);
658       throw new SQLException JavaDoc(msg);
659     }
660
661     Connection JavaDoc c;
662     long tid = proc.getTransactionId();
663
664     try
665     {
666       c = backend.getConnectionForTransactionAndLazyBeginIfNeeded(proc, cm);
667     }
668     catch (UnreachableBackendException e1)
669     {
670       logger.error(Translate.get("loadbalancer.backend.disabling.unreachable",
671           backend.getName()));
672       disableBackend(backend, true);
673       throw new SQLException JavaDoc(Translate.get("loadbalancer.backend.unreacheable",
674           backend.getName()));
675     }
676     catch (NoTransactionStartWhenDisablingException e)
677     {
678       String JavaDoc msg = Translate.get("loadbalancer.backend.is.disabling", proc
679           .getSqlShortForm(vdb.getSqlShortFormLength()), backend.getName());
680       logger.error(msg);
681       throw new SQLException JavaDoc(msg);
682     }
683
684     // Sanity check
685
if (c == null)
686       throw new SQLException JavaDoc(Translate.get(
687           "loadbalancer.unable.retrieve.connection", String.valueOf(tid),
688           backend.getName()));
689
690     // Execute Query
691
try
692     {
693       if (isExecuteQuery)
694         return AbstractLoadBalancer
695             .executeCallableStatementExecuteQueryOnBackend(proc, backend, null,
696                 c, metadataCache);
697       else
698         return AbstractLoadBalancer.executeCallableStatementExecuteOnBackend(
699             proc, backend, null, c, metadataCache);
700     }
701     catch (Exception JavaDoc e)
702     {
703       throw new SQLException JavaDoc(Translate.get(
704           "loadbalancer.storedprocedure.failed.on.backend", new String JavaDoc[]{
705               proc.getSqlShortForm(vdb.getSqlShortFormLength()),
706               backend.getName(), e.getMessage()}));
707     }
708     finally
709     {
710       if (logger.isDebugEnabled())
711         logger.debug(Translate.get("loadbalancer.execute.transaction.on",
712             new String JavaDoc[]{String.valueOf(tid), String.valueOf(proc.getId()),
713                 backend.getName()}));
714     }
715   }
716
717   /**
718    * Executes a stored procedure <em>in autocommit mode</em> on the selected
719    * backend.
720    *
721    * @param proc the <em>autocommit</em> stored procedure to execute
722    * @param isExecuteQuery true if we must call CallableStatement.executeQuery,
723    * false if we must call CallableStatement.execute()
724    * @param backend the backend that will execute the request
725    * @param metadataCache the metadataCache if any or null
726    * @return a <code>ControllerResultSet</code> if isExecuteQuery is true, an
727    * <code>ExecuteResult</code> object otherwise
728    * @throws SQLException if an error occurs
729    */

730   private Object JavaDoc executeStoredProcedureInAutoCommit(StoredProcedure proc,
731       boolean isExecuteQuery, DatabaseBackend backend,
732       MetadataCache metadataCache) throws SQLException JavaDoc,
733       UnreachableBackendException
734   {
735     // Ok, we have a backend, let's execute the request
736
AbstractConnectionManager cm = backend
737         .getConnectionManager(proc.getLogin());
738
739     // Sanity check
740
if (cm == null)
741     {
742       String JavaDoc msg = Translate.get("loadbalancer.connectionmanager.not.found",
743           proc.getLogin(), backend.getName());
744       logger.error(msg);
745       throw new SQLException JavaDoc(msg);
746     }
747
748     // Use a connection just for this request
749
PooledConnection c = null;
750     try
751     {
752       c = cm.retrieveConnectionInAutoCommit(proc);
753     }
754     catch (UnreachableBackendException e1)
755     {
756       logger.error(Translate.get("loadbalancer.backend.disabling.unreachable",
757           backend.getName()));
758       disableBackend(backend, true);
759       throw new UnreachableBackendException(Translate.get(
760           "loadbalancer.backend.unreacheable", backend.getName()));
761     }
762
763     // Sanity check
764
if (c == null)
765       throw new SQLException JavaDoc(Translate.get(
766           "loadbalancer.backend.no.connection", backend.getName()));
767
768     // Execute Query
769
try
770     {
771       if (isExecuteQuery)
772         return AbstractLoadBalancer
773             .executeCallableStatementExecuteQueryOnBackend(proc, backend, null,
774                 c.getConnection(), metadataCache);
775       else
776         return AbstractLoadBalancer.executeCallableStatementExecuteOnBackend(
777             proc, backend, null, c.getConnection(), metadataCache);
778     }
779     catch (Exception JavaDoc e)
780     {
781       throw new SQLException JavaDoc(Translate.get(
782           "loadbalancer.storedprocedure.failed.on.backend", new String JavaDoc[]{
783               proc.getSqlShortForm(vdb.getSqlShortFormLength()),
784               backend.getName(), e.getMessage()}));
785     }
786     finally
787     {
788       cm.releaseConnectionInAutoCommit(proc, c);
789       if (logger.isDebugEnabled())
790         logger.debug(Translate.get("loadbalancer.storedprocedure.on", String
791             .valueOf(proc.getId()), backend.getName()));
792     }
793   }
794
795   /**
796    * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#callableStatementExecuteQuery(StoredProcedure,
797    * MetadataCache)
798    */

799   public ControllerResultSet callableStatementExecuteQuery(
800       StoredProcedure proc, MetadataCache metadataCache) throws SQLException JavaDoc,
801       AllBackendsFailedException
802   {
803     CallableStatementExecuteQueryTask task = (CallableStatementExecuteQueryTask) callStoredProcedure(
804         proc, EXECUTE_QUERY_TASK, metadataCache);
805     return task.getResult();
806   }
807
808   /**
809    * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#callableStatementExecuteUpdate(org.continuent.sequoia.controller.requests.StoredProcedure)
810    */

811   public ExecuteUpdateResult callableStatementExecuteUpdate(StoredProcedure proc)
812       throws SQLException JavaDoc, AllBackendsFailedException
813   {
814     CallableStatementExecuteUpdateTask task = (CallableStatementExecuteUpdateTask) callStoredProcedure(
815         proc, EXECUTE_UPDATE_TASK, null);
816     return task.getResult();
817   }
818
819   /**
820    * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#callableStatementExecute(StoredProcedure,
821    * MetadataCache)
822    */

823   public ExecuteResult callableStatementExecute(StoredProcedure proc,
824       MetadataCache metadataCache) throws SQLException JavaDoc,
825       AllBackendsFailedException
826   {
827     CallableStatementExecuteTask task = (CallableStatementExecuteTask) callStoredProcedure(
828         proc, EXECUTE_TASK, metadataCache);
829     return task.getResult();
830   }
831
832   private static final int EXECUTE_QUERY_TASK = 0;
833   private static final int EXECUTE_UPDATE_TASK = 1;
834   private static final int EXECUTE_TASK = 2;
835
836   /**
837    * Post the stored procedure call in the threads task list.
838    * <p>
839    * Note that macros are also processed here.
840    *
841    * @param proc the stored procedure to call
842    * @param taskType one of EXECUTE_QUERY_TASK, EXECUTE_UPDATE_TASK or
843    * EXECUTE_TASK
844    * @param metadataCache the metadataCache if any or null
845    * @return the task that has been executed (caller can get the result by
846    * calling getResult())
847    * @throws SQLException if an error occurs
848    * @throws AllBackendsFailedException if all backends failed to execute the
849    * stored procedure
850    * @throws NoMoreBackendException if no backend was available to execute the
851    * stored procedure
852    */

853   private AbstractTask callStoredProcedure(StoredProcedure proc, int taskType,
854       MetadataCache metadataCache) throws SQLException JavaDoc,
855       AllBackendsFailedException, NoMoreBackendException
856   {
857     // Total ordering mainly for distributed virtual databases.
858
boolean removeFromTotalOrderQueue = waitForTotalOrder(proc, true);
859
860     // Handle macros
861
handleMacros(proc);
862
863     int nbOfThreads = acquireLockAndCheckNbOfThreads(proc, String.valueOf(proc
864         .getId()));
865
866     // Create the task
867
AbstractTask task;
868     switch (taskType)
869     {
870       case EXECUTE_QUERY_TASK :
871         task = new CallableStatementExecuteQueryTask(getNbToWait(nbOfThreads),
872             nbOfThreads, proc, metadataCache);
873         break;
874       case EXECUTE_UPDATE_TASK :
875         task = new CallableStatementExecuteUpdateTask(getNbToWait(nbOfThreads),
876             nbOfThreads, proc);
877         break;
878       case EXECUTE_TASK :
879         task = new CallableStatementExecuteTask(getNbToWait(nbOfThreads),
880             nbOfThreads, proc, metadataCache);
881         break;
882       default :
883         throw new RuntimeException JavaDoc("Unhandled task type " + taskType
884             + " in callStoredProcedure");
885     }
886
887     List JavaDoc backendList = getBackendsWithStoredProcedure(proc.getProcedureKey(),
888         proc.getNbOfParameters(), nbOfThreads);
889
890     if (backendList.size() == 0)
891     {
892       releaseLockAndUnlockNextQuery(proc);
893       throw new SQLException JavaDoc(Translate.get(
894           "loadbalancer.backend.no.required.storedprocedure", proc
895               .getProcedureKey()));
896     }
897
898     task.setTotalNb(backendList.size());
899     atomicTaskPostInQueueAndReleaseLock(proc, task, backendList,
900         removeFromTotalOrderQueue);
901
902     synchronized (task)
903     {
904       if (!task.hasCompleted())
905         waitForTaskCompletion(proc.getTimeout() * 1000L, String.valueOf(proc
906             .getId()), task);
907
908       checkTaskCompletion(task);
909       return task;
910     }
911   }
912
913   /**
914    * Check the completion status of the task and throws appropriate Exceptions
915    * if the status of the task was not successful, otherwise do nothing.
916    *
917    * @param task the completed AbstractTask
918    * @throws AllBackendsFailedException if all backends failed to execute the
919    * request
920    * @exception NoMoreBackendException if no backends are left to execute the
921    * request
922    * @exception SQLException if an error occurs
923    */

924   private void checkTaskCompletion(AbstractTask task)
925       throws NoMoreBackendException, AllBackendsFailedException, SQLException JavaDoc
926   {
927     AbstractRequest request = task.getRequest();
928
929     if (task.getSuccess() > 0)
930       return;
931
932     // Check that someone failed, it might be the case that we only have
933
// disabling backends left and they have not played this query (thus
934
// none of them have succeeded or failed).
935
if (task.getFailed() == 0)
936     {
937       throw new NoMoreBackendException(Translate
938           .get("loadbalancer.backendlist.empty"));
939     }
940
941     if (task.getSuccess() == 0)
942     {
943       // All backends that executed the query failed
944
List JavaDoc exceptions = task.getExceptions();
945       if (exceptions == null)
946         throw new AllBackendsFailedException(Translate.get(
947             "loadbalancer.request.failed.all", new Object JavaDoc[]{request.getType(),
948                 String.valueOf(request.getId())}));
949       else
950       {
951         String JavaDoc errorMsg = Translate.get("loadbalancer.request.failed.stack",
952             new Object JavaDoc[]{request.getType(), String.valueOf(request.getId())})
953             + "\n";
954         SQLException JavaDoc ex = SQLExceptionFactory.getSQLException(exceptions,
955             errorMsg);
956         logger.error(ex.getMessage());
957         throw ex;
958       }
959     }
960   }
961
962   /**
963    * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#getPreparedStatementGetMetaData(org.continuent.sequoia.controller.requests.AbstractRequest)
964    */

965   public ControllerResultSet getPreparedStatementGetMetaData(
966       AbstractRequest request) throws SQLException JavaDoc
967   {
968     // Choose a backend
969
try
970     {
971       vdb.acquireReadLockBackendLists();
972     }
973     catch (InterruptedException JavaDoc e)
974     {
975       String JavaDoc msg = Translate.get(
976           "loadbalancer.backendlist.acquire.readlock.failed", e);
977       logger.error(msg);
978       throw new SQLException JavaDoc(msg);
979     }
980
981     /*
982      * The backend that will execute the query
983      */

984     DatabaseBackend backend = null;
985
986     // Note that vdb lock is released in the finally clause of this try/catch
987
// block
988
try
989     {
990       ArrayList JavaDoc backends = vdb.getBackends();
991       int size = backends.size();
992
993       if (size == 0)
994         throw new SQLException JavaDoc(Translate.get(
995             "loadbalancer.execute.no.backend.available", request.getId()));
996
997       // Choose the first available backend
998
for (int i = 0; i < size; i++)
999       {
1000        DatabaseBackend b = (DatabaseBackend) backends.get(i);
1001        if (b.isReadEnabled())
1002        {
1003          backend = b;
1004          break;
1005        }
1006      }
1007    }
1008    catch (Throwable JavaDoc e)
1009    {
1010      String JavaDoc msg = Translate.get("loadbalancer.execute.find.backend.failed",
1011          new String JavaDoc[]{request.getSqlShortForm(vdb.getSqlShortFormLength()),
1012              e.getMessage()});
1013      logger.error(msg, e);
1014      throw new SQLException JavaDoc(msg);
1015    }
1016    finally
1017    {
1018      vdb.releaseReadLockBackendLists();
1019    }
1020
1021    if (backend == null)
1022      throw new NoMoreBackendException(Translate.get(
1023          "loadbalancer.execute.no.backend.enabled", request.getId()));
1024
1025    // Ok, we have a backend, let's execute the request
1026
AbstractConnectionManager cm = backend.getConnectionManager(request
1027        .getLogin());
1028
1029    // Sanity check
1030
if (cm == null)
1031    {
1032      String JavaDoc msg = Translate.get("loadbalancer.connectionmanager.not.found",
1033          new String JavaDoc[]{request.getLogin(), backend.getName()});
1034      logger.error(msg);
1035      throw new SQLException JavaDoc(msg);
1036    }
1037
1038    // Execute the query
1039
if (request.isAutoCommit())
1040    {
1041      ControllerResultSet rs = null;
1042      boolean badConnection;
1043      do
1044      {
1045        badConnection = false;
1046        // Use a connection just for this request
1047
PooledConnection c = null;
1048        try
1049        {
1050          c = cm.retrieveConnectionInAutoCommit(request);
1051        }
1052        catch (UnreachableBackendException e1)
1053        {
1054          logger.error(Translate.get(
1055              "loadbalancer.backend.disabling.unreachable", backend.getName()));
1056          disableBackend(backend, true);
1057          // Retry on a different backend
1058
return getPreparedStatementGetMetaData(request);
1059        }
1060
1061        // Sanity check
1062
if (c == null)
1063          throw new SQLException JavaDoc(Translate.get(
1064              "loadbalancer.backend.no.connection", backend.getName()));
1065
1066        // Execute Query
1067
try
1068        {
1069          rs = preparedStatementGetMetaDataOnBackend(
1070              request.getSqlOrTemplate(), backend, c.getConnection());
1071          cm.releaseConnectionInAutoCommit(request, c);
1072        }
1073        catch (SQLException JavaDoc e)
1074        {
1075          cm.releaseConnectionInAutoCommit(request, c);
1076          throw SQLExceptionFactory.getSQLException(e, Translate.get(
1077              "loadbalancer.request.failed.on.backend", new String JavaDoc[]{
1078                  request.getSqlShortForm(vdb.getSqlShortFormLength()),
1079                  backend.getName(), e.getMessage()}));
1080        }
1081        catch (BadConnectionException e)
1082        { // Get rid of the bad connection
1083
cm.deleteConnection(c);
1084          badConnection = true;
1085        }
1086        catch (Throwable JavaDoc e)
1087        {
1088          cm.releaseConnectionInAutoCommit(request, c);
1089          throw new SQLException JavaDoc(Translate.get(
1090              "loadbalancer.request.failed.on.backend", new String JavaDoc[]{
1091                  request.getSqlShortForm(vdb.getSqlShortFormLength()),
1092                  backend.getName(), e.getMessage()}));
1093        }
1094      }
1095      while (badConnection);
1096      if (logger.isDebugEnabled())
1097        logger.debug(Translate.get("loadbalancer.execute.on", new String JavaDoc[]{
1098            String.valueOf(request.getId()), backend.getName()}));
1099      return rs;
1100    }
1101    else
1102    { // Inside a transaction
1103
Connection JavaDoc c;
1104      long tid = request.getTransactionId();
1105
1106      try
1107      {
1108        c = backend
1109            .getConnectionForTransactionAndLazyBeginIfNeeded(request, cm);
1110      }
1111      catch (UnreachableBackendException e1)
1112      {
1113        logger.error(Translate.get(
1114            "loadbalancer.backend.disabling.unreachable", backend.getName()));
1115        disableBackend(backend, true);
1116        throw new NoMoreBackendException(Translate.get(
1117            "loadbalancer.backend.unreacheable", backend.getName()));
1118      }
1119      catch (NoTransactionStartWhenDisablingException e)
1120      {
1121        String JavaDoc msg = Translate.get("loadbalancer.backend.is.disabling",
1122            new String JavaDoc[]{request.getSqlShortForm(vdb.getSqlShortFormLength()),
1123                backend.getName()});
1124        logger.error(msg);
1125        throw new SQLException JavaDoc(msg);
1126      }
1127
1128      // Sanity check
1129
if (c == null)
1130        throw new SQLException JavaDoc(Translate.get(
1131            "loadbalancer.unable.retrieve.connection", new String JavaDoc[]{
1132                String.valueOf(tid), backend.getName()}));
1133
1134      // Execute Query
1135
ControllerResultSet rs = null;
1136      try
1137      {
1138        rs = preparedStatementGetMetaDataOnBackend(request.getSqlOrTemplate(),
1139            backend, c);
1140      }
1141      catch (SQLException JavaDoc e)
1142      {
1143        throw e;
1144      }
1145      catch (BadConnectionException e)
1146      { // Connection failed, so did the transaction
1147
// Disable the backend.
1148
cm.deleteConnection(tid);
1149        String JavaDoc msg = Translate.get(
1150            "loadbalancer.backend.disabling.connection.failure", backend
1151                .getName());
1152        logger.error(msg);
1153        disableBackend(backend, true);
1154        throw new SQLException JavaDoc(msg);
1155      }
1156      catch (Throwable JavaDoc e)
1157      {
1158        throw new SQLException JavaDoc(Translate.get(
1159            "loadbalancer.request.failed.on.backend", new String JavaDoc[]{
1160                request.getSqlShortForm(vdb.getSqlShortFormLength()),
1161                backend.getName(), e.getMessage()}));
1162      }
1163      if (logger.isDebugEnabled())
1164        logger.debug(Translate.get("loadbalancer.execute.transaction.on",
1165            new String JavaDoc[]{String.valueOf(tid), String.valueOf(request.getId()),
1166                backend.getName()}));
1167      return rs;
1168    }
1169  }
1170
1171  /*
1172   * Transaction management
1173   */

1174  /**
1175   * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#abort(org.continuent.sequoia.controller.requestmanager.TransactionMetaData)
1176   */

1177  public void abort(TransactionMetaData tm) throws SQLException JavaDoc
1178  {
1179    long tid = tm.getTransactionId();
1180
1181    DistributedRollback toqObject = null;
1182    /*
1183     * Let previous queries be flushed into the load balancer queues so that we
1184     * can abort them and that no queries for that transaction wait in the total
1185     * order queue while we are aborting. Note that the wait and remove from the
1186     * total order queue will be done in the call to rollback at the end of this
1187     * method.
1188     */

1189    if (vdb.getTotalOrderQueue() != null)
1190    {
1191      toqObject = new DistributedRollback(tm.getLogin(), tid);
1192      waitForTotalOrder(toqObject, false);
1193    }
1194
1195    // Acquire the lock
1196
String JavaDoc requestDescription = "abort " + tid;
1197    int nbOfThreads = acquireLockAndCheckNbOfThreads(null, requestDescription);
1198
1199    boolean rollbackInProgress = false;
1200    synchronized (enabledBackends)
1201    {
1202      // Abort all queries on all backends that have started this transaction
1203
for (int i = 0; i < nbOfThreads; i++)
1204      {
1205        DatabaseBackend backend = (DatabaseBackend) enabledBackends.get(i);
1206        rollbackInProgress = rollbackInProgress
1207            || backend.getTaskQueues().abortAllQueriesForTransaction(tid);
1208      }
1209    }
1210
1211    // Release the lock
1212
backendListLock.releaseRead();
1213
1214    if (rollbackInProgress)
1215    { // already aborting
1216
if (vdb.getTotalOrderQueue() != null)
1217        removeObjectFromAndNotifyTotalOrderQueue(toqObject);
1218      return;
1219    }
1220
1221    rollback(tm);
1222  }
1223
1224  /**
1225   * Begins a new transaction.
1226   *
1227   * @param tm the transaction marker metadata
1228   * @exception SQLException if an error occurs
1229   */

1230  public final void begin(TransactionMetaData tm) throws SQLException JavaDoc
1231  {
1232  }
1233
1234  /**
1235   * Commits a transaction.
1236   *
1237   * @param tm the transaction marker metadata
1238   * @exception SQLException if an error occurs
1239   */

1240  public void commit(TransactionMetaData tm) throws SQLException JavaDoc
1241  {
1242    long tid = tm.getTransactionId();
1243
1244    // Ordering for distributed virtual database
1245
boolean canTakeReadLock = false;
1246    DistributedCommit totalOrderCommit = null;
1247    if (vdb.getTotalOrderQueue() != null)
1248    {
1249      // Total ordering mainly for distributed virtual databases.
1250
// If waitForTotalOrder returns true then the query has been scheduled in
1251
// total order and there is no need to take a write lock later to resolve
1252
// potential conflicts.
1253
totalOrderCommit = new DistributedCommit(tm.getLogin(), tid);
1254      canTakeReadLock = waitForTotalOrder(totalOrderCommit, false);
1255      if (!canTakeReadLock)
1256        // This is a local commit no total order info
1257
totalOrderCommit = null;
1258    }
1259
1260    // Acquire the lock
1261
int nbOfThreads = acquireLockAndCheckNbOfThreads(totalOrderCommit,
1262        "commit " + tid);
1263
1264    List JavaDoc commitList = getBackendsWithStartedTransaction(new Long JavaDoc(tid),
1265        nbOfThreads);
1266
1267    int nbOfThreadsToCommit = commitList.size();
1268    CommitTask task = null;
1269    if (nbOfThreadsToCommit != 0)
1270    {
1271      task = new CommitTask(getNbToWait(nbOfThreadsToCommit),
1272          nbOfThreadsToCommit, tm);
1273    }
1274    // FIXME waht if nbOfThreadsToCommit == 0?
1275

1276    // Post the task in the non-conflicting queues.
1277
synchronized (enabledBackends)
1278    {
1279      for (int i = 0; i < nbOfThreadsToCommit; i++)
1280      {
1281        DatabaseBackend backend = (DatabaseBackend) enabledBackends.get(i);
1282        backend.getTaskQueues().addTaskToBackendTotalOrderQueue(task);
1283      }
1284    }
1285
1286    // Release the lock
1287
backendListLock.releaseRead();
1288
1289    // Unblock next query from total order queue
1290
if (totalOrderCommit != null)
1291      removeObjectFromAndNotifyTotalOrderQueue(totalOrderCommit);
1292
1293    waitForCommit(task, tm.getTimeout());
1294  }
1295
1296  /**
1297   * Rollbacks a transaction.
1298   *
1299   * @param tm the transaction marker metadata
1300   * @exception SQLException if an error occurs
1301   */

1302  public void rollback(TransactionMetaData tm) throws SQLException JavaDoc
1303  {
1304    long tid = tm.getTransactionId();
1305
1306    // Ordering for distributed virtual database
1307
DistributedRollback totalOrderRollback = null;
1308    boolean canTakeReadLock = false;
1309    if (vdb.getTotalOrderQueue() != null)
1310    {
1311      totalOrderRollback = new DistributedRollback(tm.getLogin(), tid);
1312      // Total ordering mainly for distributed virtual databases.
1313
// If waitForTotalOrder returns true then the query has been scheduled in
1314
// total order and there is no need to take a write lock later to resolve
1315
// potential conflicts.
1316
canTakeReadLock = waitForTotalOrder(totalOrderRollback, false);
1317      if (!canTakeReadLock)
1318        // This is a local rollback no total order info
1319
totalOrderRollback = null;
1320    }
1321
1322    // Acquire the lock
1323
int nbOfThreads = acquireLockAndCheckNbOfThreads(totalOrderRollback,
1324        "rollback " + tid);
1325
1326    // Build the list of backends that need to rollback this transaction
1327
List JavaDoc rollbackList = getBackendsWithStartedTransaction(new Long JavaDoc(tid),
1328        nbOfThreads);
1329
1330    int nbOfThreadsToRollback = rollbackList.size();
1331    RollbackTask task = null;
1332    if (nbOfThreadsToRollback != 0)
1333      task = new RollbackTask(getNbToWait(nbOfThreadsToRollback),
1334          nbOfThreadsToRollback, tm);
1335
1336    // Post the task in the non-conflicting queues.
1337
synchronized (enabledBackends)
1338    {
1339      for (int i = 0; i < nbOfThreadsToRollback; i++)
1340      {
1341        DatabaseBackend backend = (DatabaseBackend) enabledBackends.get(i);
1342        backend.getTaskQueues().addTaskToBackendTotalOrderQueue(task);
1343      }
1344    }
1345
1346    // Release the lock
1347
backendListLock.releaseRead();
1348
1349    // Unblock next query from total order queue
1350
if (totalOrderRollback != null)
1351      removeObjectFromAndNotifyTotalOrderQueue(totalOrderRollback);
1352
1353    waitForRollback(task, tm.getTimeout());
1354  }
1355
1356  /**
1357   * Rollback a transaction to a savepoint
1358   *
1359   * @param tm The transaction marker metadata
1360   * @param savepointName The name of the savepoint
1361   * @throws SQLException if an error occurs
1362   */

1363  public void rollbackToSavepoint(TransactionMetaData tm, String JavaDoc savepointName)
1364      throws SQLException JavaDoc
1365  {
1366    long tid = tm.getTransactionId();
1367
1368    // Ordering for distributed virtual database
1369
DistributedRollbackToSavepoint totalOrderRollback = null;
1370    boolean canTakeReadLock = false;
1371    if (vdb.getTotalOrderQueue() != null)
1372    {
1373      totalOrderRollback = new DistributedRollbackToSavepoint(tid,
1374          savepointName);
1375      // Total ordering mainly for distributed virtual databases.
1376
// If waitForTotalOrder returns true then the query has been scheduled in
1377
// total order and there is no need to take a write lock later to resolve
1378
// potential conflicts.
1379
canTakeReadLock = waitForTotalOrder(totalOrderRollback, false);
1380      if (!canTakeReadLock)
1381        // This is a local commit no total order info
1382
totalOrderRollback = null;
1383    }
1384
1385    // Acquire the lock
1386
int nbOfThreads = acquireLockAndCheckNbOfThreads(totalOrderRollback,
1387        "rollback " + savepointName + " " + tid);
1388
1389    // Build the list of backends that need to rollback this transaction
1390
List JavaDoc rollbackList = getBackendsWithStartedTransaction(new Long JavaDoc(tid),
1391        nbOfThreads);
1392
1393    int nbOfThreadsToRollback = rollbackList.size();
1394    RollbackToSavepointTask task = null;
1395    if (nbOfThreadsToRollback != 0)
1396      task = new RollbackToSavepointTask(getNbToWait(nbOfThreadsToRollback),
1397          nbOfThreadsToRollback, tm, savepointName);
1398
1399    // Post the task in the non-conflicting queues.
1400
synchronized (enabledBackends)
1401    {
1402      for (int i = 0; i < nbOfThreadsToRollback; i++)
1403      {
1404        DatabaseBackend backend = (DatabaseBackend) enabledBackends.get(i);
1405        backend.getTaskQueues().addTaskToBackendTotalOrderQueue(task);
1406      }
1407    }
1408
1409    // Release the lock
1410
backendListLock.releaseRead();
1411
1412    // Unblock next query from total order queue
1413
if (totalOrderRollback != null)
1414      removeObjectFromAndNotifyTotalOrderQueue(totalOrderRollback);
1415
1416    waitForRollbackToSavepoint(task, savepointName, tm.getTimeout());
1417  }
1418
1419  /**
1420   * Release a savepoint from a transaction
1421   *
1422   * @param tm The transaction marker metadata
1423   * @param savepointName The name of the savepoint ro release
1424   * @throws SQLException if an error occurs
1425   */

1426  public void releaseSavepoint(TransactionMetaData tm, String JavaDoc savepointName)
1427      throws SQLException JavaDoc
1428  {
1429    long tid = tm.getTransactionId();
1430
1431    // Ordering for distributed virtual database
1432
DistributedReleaseSavepoint totalOrderRelease = null;
1433    boolean canTakeReadLock = false;
1434    if (vdb.getTotalOrderQueue() != null)
1435    {
1436      totalOrderRelease = new DistributedReleaseSavepoint(tid, savepointName);
1437      // Total ordering mainly for distributed virtual databases.
1438
// If waitForTotalOrder returns true then the query has been scheduled in
1439
// total order and there is no need to take a write lock later to resolve
1440
// potential conflicts.
1441
canTakeReadLock = waitForTotalOrder(totalOrderRelease, false);
1442      if (!canTakeReadLock)
1443        // This is a local commit no total order info
1444
totalOrderRelease = null;
1445    }
1446
1447    // Acquire the lock
1448
int nbOfThreads = acquireLockAndCheckNbOfThreads(totalOrderRelease,
1449        "release savepoint " + savepointName + " " + tid);
1450
1451    // Build the list of backends that need to rollback this transaction
1452
List JavaDoc savepointList = getBackendsWithStartedTransaction(new Long JavaDoc(tid),
1453        nbOfThreads);
1454
1455    int nbOfSavepoints = savepointList.size();
1456    ReleaseSavepointTask task = null;
1457    if (nbOfSavepoints != 0)
1458      task = new ReleaseSavepointTask(getNbToWait(nbOfThreads), nbOfThreads,
1459          tm, savepointName);
1460
1461    // Post the task in the non-conflicting queues.
1462
synchronized (enabledBackends)
1463    {
1464      for (int i = 0; i < nbOfSavepoints; i++)
1465      {
1466        DatabaseBackend backend = (DatabaseBackend) enabledBackends.get(i);
1467        backend.getTaskQueues().addTaskToBackendTotalOrderQueue(task);
1468      }
1469    }
1470
1471    // Release the lock
1472
backendListLock.releaseRead();
1473
1474    // Unblock next query from total order queue
1475
if (totalOrderRelease != null)
1476      removeObjectFromAndNotifyTotalOrderQueue(totalOrderRelease);
1477
1478    waitForReleaseSavepoint(task, savepointName, tm.getTimeout());
1479  }
1480
1481  /**
1482   * Set a savepoint to a transaction.
1483   *
1484   * @param tm The transaction marker metadata
1485   * @param savepointName The name of the new savepoint
1486   * @throws SQLException if an error occurs
1487   */

1488  public void setSavepoint(TransactionMetaData tm, String JavaDoc savepointName)
1489      throws SQLException JavaDoc
1490  {
1491    long tid = tm.getTransactionId();
1492
1493    // Ordering for distributed virtual database
1494
DistributedSetSavepoint totalOrderSavepoint = null;
1495    boolean canTakeReadLock = false;
1496    if (vdb.getTotalOrderQueue() != null)
1497    {
1498      totalOrderSavepoint = new DistributedSetSavepoint(tm.getLogin(), tid,
1499          savepointName);
1500      // Total ordering mainly for distributed virtual databases.
1501
// If waitForTotalOrder returns true then the query has been scheduled in
1502
// total order and there is no need to take a write lock later to resolve
1503
// potential conflicts.
1504
canTakeReadLock = waitForTotalOrder(totalOrderSavepoint, false);
1505      if (!canTakeReadLock)
1506        // This is a local commit no total order info
1507
totalOrderSavepoint = null;
1508    }
1509
1510    // Update the recovery log
1511
if (recoveryLog != null)
1512      recoveryLog.logSetSavepoint(tm, savepointName);
1513
1514    // Acquire the lock
1515
String JavaDoc requestDescription = "set savepoint " + savepointName + " " + tid;
1516    int nbOfThreads = acquireLockAndCheckNbOfThreads(null, requestDescription);
1517
1518    SavepointTask task = null;
1519
1520    // Post the task in the non-conflicting queues of all backends.
1521
synchronized (enabledBackends)
1522    {
1523      if (nbOfThreads != 0)
1524      {
1525        task = new SavepointTask(getNbToWait(nbOfThreads), nbOfThreads, tm,
1526            savepointName);
1527        for (int i = 0; i < nbOfThreads; i++)
1528        {
1529          DatabaseBackend backend = (DatabaseBackend) enabledBackends.get(i);
1530          backend.getTaskQueues().addTaskToBackendTotalOrderQueue(task);
1531        }
1532      }
1533    }
1534
1535    // Release the lock
1536
backendListLock.releaseRead();
1537
1538    // Unblock next query from total order queue
1539
if (totalOrderSavepoint != null)
1540      removeObjectFromAndNotifyTotalOrderQueue(totalOrderSavepoint);
1541
1542    waitForSavepoint(task, savepointName, tm.getTimeout());
1543  }
1544
1545  /**
1546   * Wait for a <code>CommitTask</code> to complete
1547   *
1548   * @param task the <code>CommitTask</code>
1549   * @param timeout the timeout (in milliseconds) for task completion
1550   * @throws SQLException if an error occurs while completing the commit task
1551   */

1552  // TODO method can be reused for RAIDb0, RAIDb1 and RAIDb2
1553
// FIXME the timeout is a field of the CommitTask but the getter is not
1554
// defined
1555
private void waitForCommit(CommitTask task, long timeout) throws SQLException JavaDoc
1556  {
1557    // Check if someone had something to commit
1558
if (task == null)
1559      return;
1560
1561    long tid = task.getTransactionId();
1562
1563    synchronized (task)
1564    {
1565      if (!task.hasCompleted())
1566        waitForTaskCompletion(timeout, "commit " + tid, task);
1567
1568      if (task.getSuccess() == 0)
1569      { // All tasks failed
1570
List JavaDoc exceptions = task.getExceptions();
1571        if (exceptions == null)
1572          throw new SQLException JavaDoc(Translate.get(
1573              "loadbalancer.commit.all.failed", tid));
1574        else
1575        {
1576          SQLException JavaDoc ex = SQLExceptionFactory.getSQLException(exceptions,
1577              Translate.get("loadbalancer.commit.failed.stack", tid) + "\n");
1578          logger.error(ex.getMessage());
1579          throw ex;
1580        }
1581      }
1582    }
1583  }
1584
1585  /**
1586   * Wait for a <code>RollbackTask</code> to complete
1587   *
1588   * @param task the <code>RollbackTask</code>
1589   * @param timeout the timeout (in milliseconds) for task completion
1590   * @throws SQLException if an error occurs while completing the rollback task
1591   */

1592  // TODO method can be reused for RAIDb0, RAIDb1 and RAIDb2
1593
// FIXME the timeout is a field of the RollbacTask but the getter is not
1594
// defined
1595
private void waitForRollback(RollbackTask task, long timeout)
1596      throws SQLException JavaDoc
1597  {
1598    // Check if someone had something to rollback
1599
if (task == null)
1600      return;
1601
1602    long tid = task.getTransactionId();
1603
1604    synchronized (task)
1605    {
1606      if (!task.hasCompleted())
1607        waitForTaskCompletion(timeout, "rollback " + tid, task);
1608
1609      if (task.getSuccess() == 0)
1610      { // All tasks failed
1611
List JavaDoc exceptions = task.getExceptions();
1612        if (exceptions == null)
1613          throw new SQLException JavaDoc(Translate.get(
1614              "loadbalancer.rollback.all.failed", tid));
1615        else
1616        {
1617          SQLException JavaDoc ex = SQLExceptionFactory.getSQLException(exceptions,
1618              Translate.get("loadbalancer.rollback.failed.stack", tid) + "\n");
1619          logger.error(ex.getMessage());
1620          throw ex;
1621        }
1622      }
1623    }
1624  }
1625
1626  /**
1627   * Wait for a <code>RollbackToSavepointTask</code> to complete
1628   *
1629   * @param task the <code>RollbackToSavepointTask</code>
1630   * @param savepoint the savepoint name
1631   * @param timeout the timeout (in milliseconds) for task completion
1632   * @throws SQLException if an error occurs while completing the rollback to
1633   * savepoint task
1634   */

1635  // TODO method can be reused for RAIDb0, RAIDb1 and RAIDb2
1636
// FIXME the timeout is a field of the RollbackToSavepointTask but the getter
1637
// is not defined
1638
// FIXME the savepoint is a field of the RollbackToSavepointTask but the
1639
// getter is not defined
1640
private void waitForRollbackToSavepoint(RollbackToSavepointTask task,
1641      String JavaDoc savepoint, long timeout) throws SQLException JavaDoc
1642  {
1643    // Check if someone had something to rollback
1644
if (task == null)
1645      return;
1646
1647    long tid = task.getTransactionId();
1648
1649    synchronized (task)
1650    {
1651      if (!task.hasCompleted())
1652        waitForTaskCompletion(timeout, "rollback " + savepoint + " " + tid,
1653            task);
1654
1655      if (task.getSuccess() == 0)
1656      { // All tasks failed
1657
List JavaDoc exceptions = task.getExceptions();
1658        if (exceptions == null)
1659          throw new SQLException JavaDoc(Translate.get(
1660              "loadbalancer.rollbacksavepoint.all.failed", savepoint, String
1661                  .valueOf(tid)));
1662        else
1663        {
1664          SQLException JavaDoc ex = SQLExceptionFactory.getSQLException(exceptions,
1665              Translate.get("loadbalancer.rollbacksavepoint.failed.stack",
1666                  savepoint, String.valueOf(tid))
1667                  + "\n");
1668          logger.error(ex.getMessage());
1669          throw ex;
1670        }
1671      }
1672    }
1673  }
1674
1675  /**
1676   * Wait for a <code>ReleaseSavepointTask</code> to complete
1677   *
1678   * @param task the <code>ReleaseSavepointTask</code>
1679   * @param savepoint the savepoint name
1680   * @param timeout the timeout (in milliseconds) for task completion
1681   * @throws SQLException if an error occurs while completing the release
1682   * savepoint task
1683   */

1684  // TODO method can be reused for RAIDb0, RAIDb1 and RAIDb2
1685
// FIXME the timeout is a field of the ReleaseSavepointTask but the getter is
1686
// not defined
1687
// FIXME the savepoint is a field of the ReleaseSavepointTask but the getter
1688
// is not defined
1689
private void waitForReleaseSavepoint(ReleaseSavepointTask task,
1690      String JavaDoc savepoint, long timeout) throws SQLException JavaDoc
1691  {
1692    // Check if someone had something to release
1693
if (task == null)
1694      return;
1695
1696    long tid = task.getTransactionId();
1697
1698    synchronized (task)
1699    {
1700      if (!task.hasCompleted())
1701        waitForTaskCompletion(timeout, "release savepoint " + savepoint + " "
1702            + tid, task);
1703
1704      if (task.getSuccess() == 0)
1705      { // All tasks failed
1706
List JavaDoc exceptions = task.getExceptions();
1707        if (exceptions == null)
1708          throw new SQLException JavaDoc(Translate.get(
1709              "loadbalancer.releasesavepoint.all.failed", savepoint, String
1710                  .valueOf(tid)));
1711        else
1712        {
1713          SQLException JavaDoc ex = SQLExceptionFactory.getSQLException(exceptions,
1714              Translate.get("loadbalancer.releasesavepoint.failed.stack",
1715                  savepoint, String.valueOf(tid))
1716                  + "\n");
1717          logger.error(ex.getMessage());
1718          throw ex;
1719        }
1720      }
1721    }
1722  }
1723
1724  /**
1725   * Wait for a <code>SavepointTask</code> to complete
1726   *
1727   * @param task the <code>SavepointTask</code>
1728   * @param savepoint the savepoint name
1729   * @param timeout the timeout (in milliseconds) for task completion
1730   * @throws SQLException if an error occurs while completing the savepoint task
1731   */

1732  // TODO method can be reused for RAIDb0, RAIDb1 and RAIDb2
1733
// FIXME the timeout is a field of the SavepointTask but the getter is not
1734
// defined
1735
// FIXME the savepoint is a field of the SavepointTask but the getter is not
1736
// defined
1737
private void waitForSavepoint(SavepointTask task, String JavaDoc savepoint,
1738      long timeout) throws SQLException JavaDoc
1739  {
1740    // Check if someone had something to release
1741
if (task == null)
1742      return;
1743
1744    long tid = task.getTransactionId();
1745
1746    synchronized (task)
1747    {
1748      if (!task.hasCompleted())
1749        waitForTaskCompletion(timeout,
1750            "set savepoint " + savepoint + " " + tid, task);
1751
1752      if (task.getSuccess() == 0)
1753      { // All tasks failed
1754
List JavaDoc exceptions = task.getExceptions();
1755        if (exceptions == null)
1756          throw new SQLException JavaDoc(Translate.get(
1757              "loadbalancer.setsavepoint.all.failed", savepoint, String
1758                  .valueOf(tid)));
1759        else
1760        {
1761          SQLException JavaDoc ex = SQLExceptionFactory.getSQLException(exceptions,
1762              Translate.get("loadbalancer.setsavepoint.failed.stack",
1763                  savepoint, String.valueOf(tid))
1764                  + "\n");
1765          logger.error(ex.getMessage());
1766          throw ex;
1767        }
1768      }
1769    }
1770  }
1771
1772  //
1773
// Utility functions
1774
//
1775
/**
1776   * Check in which queue the task should be posted and atomically posts the
1777   * task in the queue of all backends.
1778   *
1779   * @param task the task to post
1780   * @param writeList backend list to post the task to
1781   * @param removeFromTotalOrderQueue true if the query must be removed from the
1782   * total order queue
1783   */

1784  private void atomicTaskPostInQueueAndReleaseLock(AbstractRequest request,
1785      AbstractTask task, List JavaDoc writeList, boolean removeFromTotalOrderQueue)
1786  {
1787    synchronized (enabledBackends)
1788    {
1789      int nbOfThreads = writeList.size();
1790      for (int i = 0; i < nbOfThreads; i++)
1791      {
1792        BackendTaskQueues queues = ((DatabaseBackend) writeList.get(i))
1793            .getTaskQueues();
1794        queues.addTaskToBackendTotalOrderQueue(task);
1795      }
1796    }
1797
1798    backendListLock.releaseRead();
1799
1800    // Unblock next query from total order queue
1801
if (removeFromTotalOrderQueue)
1802    {
1803      removeObjectFromAndNotifyTotalOrderQueue(request);
1804    }
1805  }
1806
1807  /**
1808   * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#closePersistentConnection(java.lang.String,
1809   * long)
1810   */

1811  public void closePersistentConnection(String JavaDoc login,
1812      long persistentConnectionId) throws SQLException JavaDoc
1813  {
1814    /*
1815     * We assume a synchronous execution and connection closing can only come
1816     * after all requests have been executed in that connection. We post to all
1817     * backends and let the task deal with whether that backend had a persistent
1818     * connection or not.
1819     */

1820
1821    String JavaDoc requestDescription = "closing persistent connection "
1822        + persistentConnectionId;
1823    int nbOfThreads = 0;
1824
1825    DistributedClosePersistentConnection totalOrderQueueObject = null;
1826
1827    boolean removefromTotalOrder = false;
1828    if (vdb.getTotalOrderQueue() != null)
1829    {
1830      totalOrderQueueObject = new DistributedClosePersistentConnection(login,
1831          persistentConnectionId);
1832      removefromTotalOrder = waitForTotalOrder(totalOrderQueueObject, true);
1833    }
1834
1835    ClosePersistentConnectionTask task = null;
1836    try
1837    {
1838      nbOfThreads = acquireLockAndCheckNbOfThreads(null, requestDescription);
1839
1840      task = new ClosePersistentConnectionTask(getNbToWait(nbOfThreads),
1841          nbOfThreads, login, persistentConnectionId);
1842
1843      // Post the task in the non-conflicting queues.
1844
synchronized (enabledBackends)
1845      {
1846        for (int i = 0; i < nbOfThreads; i++)
1847        {
1848          DatabaseBackend backend = (DatabaseBackend) enabledBackends.get(i);
1849          backend.getTaskQueues().addTaskToBackendTotalOrderQueue(task);
1850        }
1851      }
1852
1853      // Release the lock
1854
backendListLock.releaseRead();
1855
1856      if (removefromTotalOrder)
1857        removeObjectFromAndNotifyTotalOrderQueue(totalOrderQueueObject);
1858      totalOrderQueueObject = null;
1859
1860      synchronized (task)
1861      {
1862        if (!task.hasCompleted())
1863          try
1864          {
1865            waitForTaskCompletion(0, requestDescription, task);
1866          }
1867          catch (SQLException JavaDoc ignore)
1868          {
1869          }
1870      }
1871    }
1872    finally
1873    {
1874      if (totalOrderQueueObject != null)
1875      { // NoMoreBackendException occured
1876
removeObjectFromAndNotifyTotalOrderQueue(totalOrderQueueObject);
1877      }
1878
1879      if (logger.isDebugEnabled())
1880        logger.debug(requestDescription + " completed on " + nbOfThreads
1881            + " backends.");
1882    }
1883  }
1884
1885  /**
1886   * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#openPersistentConnection(String,
1887   * long)
1888   */

1889  public void openPersistentConnection(String JavaDoc login, long persistentConnectionId)
1890      throws SQLException JavaDoc
1891  {
1892    String JavaDoc requestDescription = "opening persistent connection "
1893        + persistentConnectionId;
1894    int nbOfThreads = 0;
1895
1896    DistributedOpenPersistentConnection totalOrderQueueObject = null;
1897    if (vdb.getTotalOrderQueue() != null)
1898    {
1899      totalOrderQueueObject = new DistributedOpenPersistentConnection(login,
1900          persistentConnectionId);
1901      waitForTotalOrder(totalOrderQueueObject, true);
1902    }
1903
1904    OpenPersistentConnectionTask task = null;
1905    try
1906    {
1907      nbOfThreads = acquireLockAndCheckNbOfThreads(null, requestDescription);
1908
1909      task = new OpenPersistentConnectionTask(getNbToWait(nbOfThreads),
1910          nbOfThreads, login, persistentConnectionId);
1911
1912      // Post the task in the non-conflicting queues.
1913
synchronized (enabledBackends)
1914      {
1915        for (int i = 0; i < nbOfThreads; i++)
1916        {
1917          DatabaseBackend backend = (DatabaseBackend) enabledBackends.get(i);
1918          backend.getTaskQueues().addTaskToBackendTotalOrderQueue(task);
1919        }
1920      }
1921
1922      // Release the lock
1923
backendListLock.releaseRead();
1924
1925      removeObjectFromAndNotifyTotalOrderQueue(totalOrderQueueObject);
1926      totalOrderQueueObject = null;
1927
1928      synchronized (task)
1929      {
1930        if (!task.hasCompleted())
1931          try
1932          {
1933            waitForTaskCompletion(0, requestDescription, task);
1934          }
1935          catch (SQLException JavaDoc ignore)
1936          {
1937          }
1938      }
1939    }
1940    finally
1941    {
1942      if (totalOrderQueueObject != null)
1943      { // NoMoreBackendException occured
1944
removeObjectFromAndNotifyTotalOrderQueue(totalOrderQueueObject);
1945      }
1946
1947      if (logger.isDebugEnabled())
1948        logger.debug(requestDescription + " completed on " + nbOfThreads
1949            + " backends.");
1950    }
1951  }
1952
1953  /**
1954   * Enables a Backend that was previously disabled.
1955   * <p>
1956   * Ask the corresponding connection manager to initialize the connections if
1957   * needed.
1958   * <p>
1959   * No sanity checks are performed by this function.
1960   *
1961   * @param db the database backend to enable
1962   * @param writeEnabled True if the backend must be enabled for writes
1963   * @throws SQLException if an error occurs
1964   */

1965  public synchronized void enableBackend(DatabaseBackend db,
1966      boolean writeEnabled) throws SQLException JavaDoc
1967  {
1968    if (!db.isInitialized())
1969      db.initializeConnections();
1970
1971    if (writeEnabled && db.isWriteCanBeEnabled())
1972    {
1973      // Create the new backend task queues
1974
db.setTaskQueues(new BackendTaskQueues(db, waitForCompletionPolicy,
1975          this.vdb.getRequestManager()));
1976      db.startWorkerThreads(this);
1977      db.enableWrite();
1978    }
1979
1980    db.enableRead();
1981    try
1982    {
1983      backendListLock.acquireWrite();
1984    }
1985    catch (InterruptedException JavaDoc e)
1986    {
1987      logger.error("Error while acquiring write lock in enableBackend", e);
1988    }
1989    synchronized (enabledBackends)
1990    {
1991      enabledBackends.add(db);
1992    }
1993    backendListLock.releaseWrite();
1994  }
1995
1996  /**
1997   * Disables a backend that was previously enabled.
1998   * <p>
1999   * Ask the corresponding connection manager to finalize the connections if
2000   * needed.
2001   * <p>
2002   * No sanity checks are performed by this function.
2003   *
2004   * @param db the database backend to disable
2005   * @param forceDisable true if disabling must be forced on the backend
2006   * @throws SQLException if an error occurs
2007   */

2008  public void disableBackend(DatabaseBackend db, boolean forceDisable)
2009      throws SQLException JavaDoc
2010  {
2011    if (!db.disable())
2012    {
2013      // Another thread has already started the disable process
2014
return;
2015    }
2016    synchronized (this)
2017    {
2018      try
2019      {
2020        backendListLock.acquireWrite();
2021      }
2022      catch (InterruptedException JavaDoc e)
2023      {
2024        logger.error("Error while acquiring write lock in enableBackend", e);
2025      }
2026
2027      try
2028      {
2029        synchronized (enabledBackends)
2030        {
2031          enabledBackends.remove(db);
2032          if (enabledBackends.isEmpty())
2033          {
2034            // Cleanup schema for any remaining locks
2035
this.vdb.getRequestManager().setDatabaseSchema(null);
2036          }
2037        }
2038
2039        if (!forceDisable)
2040          terminateThreadsAndConnections(db);
2041      }
2042      finally
2043      {
2044        backendListLock.releaseWrite();
2045      }
2046
2047      if (forceDisable)
2048      {
2049        db.shutdownConnectionManagers();
2050        terminateThreadsAndConnections(db);
2051      }
2052
2053      // sanity check on backend's active transaction
2054
if (!db.getActiveTransactions().isEmpty())
2055      {
2056        if (logger.isWarnEnabled())
2057        {
2058          logger.warn("Active transactions after backend " + db.getName()
2059              + " is disabled: " + db.getActiveTransactions());
2060        }
2061      }
2062
2063    }
2064  }
2065
2066  private void terminateThreadsAndConnections(DatabaseBackend db)
2067      throws SQLException JavaDoc
2068  {
2069    db.terminateWorkerThreads();
2070
2071    if (db.isInitialized())
2072      db.finalizeConnections();
2073  }
2074
2075  /**
2076   * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#getXmlImpl
2077   */

2078  public String JavaDoc getXmlImpl()
2079  {
2080    StringBuffer JavaDoc info = new StringBuffer JavaDoc();
2081    info.append("<" + DatabasesXmlTags.ELT_RAIDb_2 + ">");
2082    if (createTablePolicy != null)
2083      info.append(createTablePolicy.getXml());
2084    if (waitForCompletionPolicy != null)
2085      info.append(waitForCompletionPolicy.getXml());
2086    if (macroHandler != null)
2087      info.append(macroHandler.getXml());
2088    this.getRaidb2Xml();
2089    info.append("</" + DatabasesXmlTags.ELT_RAIDb_2 + ">");
2090    return info.toString();
2091  }
2092
2093  /**
   * Returns XML formatted information about this RAIDb-2 load balancer. To
   * be provided by the concrete subclasses; invoked by
   * <code>getXmlImpl()</code>.
   *
   * @return xml formatted string
   */

2098  public abstract String JavaDoc getRaidb2Xml();
2099
2100}
Popular Tags