KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > objectweb > cjdbc > controller > loadbalancer > raidb2 > RAIDb2


1 /**
2  * C-JDBC: Clustered JDBC.
3  * Copyright (C) 2002-2005 French National Institute For Research In Computer
4  * Science And Control (INRIA).
5  * Contact: c-jdbc@objectweb.org
6  *
7  * This library is free software; you can redistribute it and/or modify it
8  * under the terms of the GNU Lesser General Public License as published by the
9  * Free Software Foundation; either version 2.1 of the License, or any later
10  * version.
11  *
12  * This library is distributed in the hope that it will be useful, but WITHOUT
13  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
14  * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
15  * for more details.
16  *
17  * You should have received a copy of the GNU Lesser General Public License
18  * along with this library; if not, write to the Free Software Foundation,
19  * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
20  *
21  * Initial developer(s): Emmanuel Cecchet.
22  * Contributor(s): Jean-Bernard van Zuylen.
23  */

24
25 package org.objectweb.cjdbc.controller.loadbalancer.raidb2;
26
27 import java.sql.Connection JavaDoc;
28 import java.sql.SQLException JavaDoc;
29 import java.util.ArrayList JavaDoc;
30 import java.util.ConcurrentModificationException JavaDoc;
31 import java.util.Iterator JavaDoc;
32
33 import org.objectweb.cjdbc.common.exceptions.BadConnectionException;
34 import org.objectweb.cjdbc.common.exceptions.NoTransactionStartWhenDisablingException;
35 import org.objectweb.cjdbc.common.exceptions.SQLExceptionFactory;
36 import org.objectweb.cjdbc.common.exceptions.UnreachableBackendException;
37 import org.objectweb.cjdbc.common.i18n.Translate;
38 import org.objectweb.cjdbc.common.log.Trace;
39 import org.objectweb.cjdbc.common.sql.AbstractWriteRequest;
40 import org.objectweb.cjdbc.common.sql.ParsingGranularities;
41 import org.objectweb.cjdbc.common.sql.SelectRequest;
42 import org.objectweb.cjdbc.common.sql.StoredProcedure;
43 import org.objectweb.cjdbc.common.util.ReadPrioritaryFIFOWriteLock;
44 import org.objectweb.cjdbc.common.xml.DatabasesXmlTags;
45 import org.objectweb.cjdbc.controller.backend.DatabaseBackend;
46 import org.objectweb.cjdbc.controller.cache.metadata.MetadataCache;
47 import org.objectweb.cjdbc.controller.connection.AbstractConnectionManager;
48 import org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer;
49 import org.objectweb.cjdbc.controller.loadbalancer.AllBackendsFailedException;
50 import org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread;
51 import org.objectweb.cjdbc.controller.loadbalancer.policies.WaitForCompletionPolicy;
52 import org.objectweb.cjdbc.controller.loadbalancer.policies.createtable.CreateTableException;
53 import org.objectweb.cjdbc.controller.loadbalancer.policies.createtable.CreateTablePolicy;
54 import org.objectweb.cjdbc.controller.loadbalancer.policies.createtable.CreateTableRule;
55 import org.objectweb.cjdbc.controller.loadbalancer.tasks.AbstractTask;
56 import org.objectweb.cjdbc.controller.loadbalancer.tasks.CommitTask;
57 import org.objectweb.cjdbc.controller.loadbalancer.tasks.KillThreadTask;
58 import org.objectweb.cjdbc.controller.loadbalancer.tasks.ReadStoredProcedureTask;
59 import org.objectweb.cjdbc.controller.loadbalancer.tasks.ReleaseSavepointTask;
60 import org.objectweb.cjdbc.controller.loadbalancer.tasks.RollbackTask;
61 import org.objectweb.cjdbc.controller.loadbalancer.tasks.RollbackToSavepointTask;
62 import org.objectweb.cjdbc.controller.loadbalancer.tasks.SavepointTask;
63 import org.objectweb.cjdbc.controller.loadbalancer.tasks.WriteRequestTask;
64 import org.objectweb.cjdbc.controller.loadbalancer.tasks.WriteRequestWithKeysTask;
65 import org.objectweb.cjdbc.controller.loadbalancer.tasks.WriteStoredProcedureTask;
66 import org.objectweb.cjdbc.controller.requestmanager.RAIDbLevels;
67 import org.objectweb.cjdbc.controller.requestmanager.TransactionMarkerMetaData;
68 import org.objectweb.cjdbc.controller.virtualdatabase.ControllerResultSet;
69 import org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase;
70 import org.objectweb.cjdbc.controller.virtualdatabase.protocol.Commit;
71 import org.objectweb.cjdbc.controller.virtualdatabase.protocol.ReleaseSavepoint;
72 import org.objectweb.cjdbc.controller.virtualdatabase.protocol.Rollback;
73 import org.objectweb.cjdbc.controller.virtualdatabase.protocol.RollbackToSavepoint;
74 import org.objectweb.cjdbc.controller.virtualdatabase.protocol.SetSavepoint;
75
76 /**
77  * RAIDb-2 load balancer.
78  * <p>
79  * This class is an abstract class because the read requests coming from the
80  * Request Manager are NOT treated here but in the subclasses. Transaction
81  * management and write requests are broadcasted to all backends owning the
82  * written table.
83  *
84  * @author <a HREF="mailto:Emmanuel.Cecchet@inria.fr">Emmanuel Cecchet </a>
85  * @author <a HREF="mailto:jbvanzuylen@transwide.com">Jean-Bernard van Zuylen
86  * </a>
87  * @version 1.0
88  */

89 public abstract class RAIDb2 extends AbstractLoadBalancer
90 {
91   //
92
// How the code is organized ?
93
// 1. Member variables
94
// 2. Constructor(s)
95
// 3. Request handling
96
// 4. Transaction handling
97
// 5. Backend management
98
//
99

100   protected ArrayList JavaDoc backendBlockingThreads;
101   protected ArrayList JavaDoc backendNonBlockingThreads;
102   protected ReadPrioritaryFIFOWriteLock backendBlockingThreadsRWLock = new ReadPrioritaryFIFOWriteLock();
103   protected ReadPrioritaryFIFOWriteLock backendNonBlockingThreadsRWLock = new ReadPrioritaryFIFOWriteLock();
104   // Should we wait for all backends to commit before returning ?
105
protected WaitForCompletionPolicy waitForCompletionPolicy;
106   protected CreateTablePolicy createTablePolicy;
107
108   protected static Trace logger = Trace
109                                                                             .getLogger("org.objectweb.cjdbc.controller.loadbalancer.raidb2");
110
111   /*
112    * Constructors
113    */

114
/**
 * Creates a new RAIDb-2 request load balancer. The lists of blocking and
 * non-blocking backend worker threads start empty.
 *
 * @param vdb the virtual database this load balancer belongs to.
 * @param waitForCompletionPolicy how many backends must complete before
 *          returning the result ?
 * @param createTablePolicy the policy defining how 'create table' statements
 *          should be handled
 * @exception Exception if an error occurs
 */
public RAIDb2(VirtualDatabase vdb,
    WaitForCompletionPolicy waitForCompletionPolicy,
    CreateTablePolicy createTablePolicy) throws Exception
{
  super(vdb, RAIDbLevels.RAIDb2, ParsingGranularities.TABLE);

  // Policies
  this.waitForCompletionPolicy = waitForCompletionPolicy;
  this.createTablePolicy = createTablePolicy;

  // Worker thread lists
  this.backendBlockingThreads = new ArrayList();
  this.backendNonBlockingThreads = new ArrayList();
}
137
138   /*
139    * Request Handling
140    */

141
142   /**
143    * Returns the number of nodes to wait for according to the defined
144    * <code>waitForCompletion</code> policy.
145    *
146    * @param nbOfThreads total number of threads
147    * @return int number of threads to wait for
148    */

149   private int getNbToWait(int nbOfThreads)
150   {
151     int nbToWait;
152     switch (waitForCompletionPolicy.getPolicy())
153     {
154       case WaitForCompletionPolicy.FIRST :
155         nbToWait = 1;
156         break;
157       case WaitForCompletionPolicy.MAJORITY :
158         nbToWait = nbOfThreads / 2 + 1;
159         break;
160       case WaitForCompletionPolicy.ALL :
161         nbToWait = nbOfThreads;
162         break;
163       default :
164         logger
165             .warn(Translate.get("loadbalancer.waitforcompletion.unsupported"));
166         nbToWait = nbOfThreads;
167         break;
168     }
169     return nbToWait;
170   }
171
172   /**
173    * Performs a write request. This request is broadcasted to all nodes that
174    * owns the table to be written.
175    *
176    * @param request an <code>AbstractWriteRequest</code>
177    * @return number of rows affected by the request
178    * @throws AllBackendsFailedException if all backends failed to execute the
179    * request
180    * @exception SQLException if an error occurs
181    */

182   public int execWriteRequest(AbstractWriteRequest request)
183       throws AllBackendsFailedException, SQLException JavaDoc
184   {
185     return ((WriteRequestTask) execWriteRequest(request, false, null))
186         .getResult();
187   }
188
189   /**
190    * Perform a write request and return the auto generated keys.
191    *
192    * @param request the request to execute
193    * @param metadataCache the metadataCache if any or null
194    * @return auto generated keys.
195    * @throws AllBackendsFailedException if all backends failed to execute the
196    * request
197    * @exception SQLException if an error occurs
198    */

199   public ControllerResultSet execWriteRequestWithKeys(
200       AbstractWriteRequest request, MetadataCache metadataCache)
201       throws AllBackendsFailedException, SQLException JavaDoc
202   {
203     return ((WriteRequestWithKeysTask) execWriteRequest(request, true,
204         metadataCache)).getResult();
205   }
206
207   /**
208    * Common code for execWriteRequest(AbstractWriteRequest) and
209    * execWriteRequestWithKeys(AbstractWriteRequest).
210    * <p>
211    * Note that macros are processed here.
212    * <p>
213    * The result is given back in AbstractTask.getResult().
214    *
215    * @param request the request to execute
216    * @param useKeys true if this must give an auto generated keys ResultSet
217    * @param metadataCache the metadataCache if any or null
218    * @throws AllBackendsFailedException if all backends failed to execute the
219    * request
220    * @throws SQLException if an error occurs
221    */

222   private AbstractTask execWriteRequest(AbstractWriteRequest request,
223       boolean useKeys, MetadataCache metadataCache)
224       throws AllBackendsFailedException, SQLException JavaDoc
225   {
226     ArrayList JavaDoc backendThreads;
227     ReadPrioritaryFIFOWriteLock lock;
228
229     // Total ordering mainly for distributed virtual databases.
230
// If waitForTotalOrder returns true then the query has been scheduled in
231
// total order and there is no need to take a write lock later to resolve
232
// potential conflicts.
233
boolean canTakeReadLock = waitForTotalOrder(request, true);
234
235     // Handle macros
236
handleMacros(request);
237
238     // Determine which list (blocking or not) to use
239
if (request.mightBlock())
240     { // Blocking
241
backendThreads = backendBlockingThreads;
242       lock = backendBlockingThreadsRWLock;
243     }
244     else
245     { // Non-blocking
246
backendThreads = backendNonBlockingThreads;
247       lock = backendNonBlockingThreadsRWLock;
248       if ((waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL)
249           && (request.getTransactionId() != 0))
250         waitForAllWritesToComplete(request.getTransactionId());
251     }
252
253     try
254     {
255       if (canTakeReadLock)
256         lock.acquireRead();
257       else
258         lock.acquireWrite();
259     }
260     catch (InterruptedException JavaDoc e)
261     {
262       String JavaDoc msg = Translate.get(
263           "loadbalancer.backendlist.acquire.writelock.failed", e);
264       logger.error(msg);
265       throw new SQLException JavaDoc(msg);
266     }
267
268     int nbOfThreads = backendThreads.size();
269     ArrayList JavaDoc writeList = new ArrayList JavaDoc();
270     String JavaDoc tableName = request.getTableName();
271
272     if (request.isCreate())
273     { // Choose the backend according to the defined policy
274
CreateTableRule rule = createTablePolicy.getTableRule(request
275           .getTableName());
276       if (rule == null)
277         rule = createTablePolicy.getDefaultRule();
278
279       // Ask the rule to pickup the backends
280
ArrayList JavaDoc chosen;
281       try
282       {
283         chosen = rule.getBackends(vdb.getBackends());
284       }
285       catch (CreateTableException e)
286       {
287         throw new SQLException JavaDoc(Translate.get(
288             "loadbalancer.create.table.rule.failed", e.getMessage()));
289       }
290
291       // Build the thread list from the backend list
292
if (chosen != null)
293       {
294         for (int i = 0; i < nbOfThreads; i++)
295         {
296           BackendWorkerThread thread = (BackendWorkerThread) backendThreads
297               .get(i);
298           if (chosen.contains(thread.getBackend()))
299             writeList.add(thread);
300         }
301       }
302     }
303     else
304     { // Build the list of backends that need to execute this request
305
for (int i = 0; i < nbOfThreads; i++)
306       {
307         BackendWorkerThread thread = (BackendWorkerThread) backendThreads
308             .get(i);
309         if (thread.getBackend().hasTable(tableName))
310           writeList.add(thread);
311       }
312     }
313
314     nbOfThreads = writeList.size();
315     if (nbOfThreads == 0)
316     {
317       if (canTakeReadLock)
318         lock.releaseRead();
319       else
320         lock.releaseWrite();
321
322       String JavaDoc msg = Translate.get("loadbalancer.execute.no.backend.found",
323           request.getSQLShortForm(vdb.getSQLShortFormLength()));
324       logger.warn(msg);
325
326       // Unblock next query from total order queue
327
removeHeadFromAndNotifyTotalOrderQueue();
328       throw new SQLException JavaDoc(msg);
329     }
330     else
331     {
332       if (logger.isDebugEnabled())
333         logger.debug(Translate.get("loadbalancer.execute.on.several",
334             new String JavaDoc[]{String.valueOf(request.getId()),
335                 String.valueOf(nbOfThreads)}));
336     }
337
338     // Create the task
339
AbstractTask task;
340     if (useKeys)
341       task = new WriteRequestWithKeysTask(getNbToWait(nbOfThreads),
342           nbOfThreads, request, metadataCache);
343     else
344       task = new WriteRequestTask(getNbToWait(nbOfThreads), nbOfThreads,
345           request);
346
347     // We have to first post the request on each backend before letting the
348
// first backend to execute the request. Therefore we have 2 phases:
349
// 1. post the task in each thread queue
350
// 2. notify each thread to execute the query
351

352     // 1. Post the task
353
if (request.isAutoCommit())
354     {
355       for (int i = 0; i < nbOfThreads; i++)
356       {
357         BackendWorkerThread thread = (BackendWorkerThread) writeList.get(i);
358         synchronized (thread)
359         {
360           thread.addTask(task);
361         }
362       }
363     }
364     else
365     {
366       for (int i = 0; i < nbOfThreads; i++)
367       {
368         BackendWorkerThread thread = (BackendWorkerThread) writeList.get(i);
369         synchronized (thread)
370         {
371           thread.addTask(task, request.getTransactionId());
372         }
373       }
374     }
375
376     // 2. Start the task execution on each backend
377
for (int i = 0; i < nbOfThreads; i++)
378     {
379       BackendWorkerThread thread = (BackendWorkerThread) writeList.get(i);
380       synchronized (thread)
381       {
382         thread.notify();
383       }
384     }
385
386     // Release lock
387
if (canTakeReadLock)
388       lock.releaseRead();
389     else
390       lock.releaseWrite();
391
392     // Unblock next query from total order queue
393
removeHeadFromAndNotifyTotalOrderQueue();
394
395     synchronized (task)
396     {
397       if (!task.hasCompleted())
398       {
399         // Wait for completion (notified by the task)
400
try
401         {
402           // Wait on task
403
long timeout = request.getTimeout() * 1000;
404           if (timeout > 0)
405           {
406             long start = System.currentTimeMillis();
407             task.wait(timeout);
408             long end = System.currentTimeMillis();
409             long remaining = timeout - (end - start);
410             if (remaining <= 0)
411             {
412               if (task.setExpiredTimeout())
413               { // Task will be ignored by all backends
414
String JavaDoc msg = Translate.get("loadbalancer.request.timeout",
415                     new String JavaDoc[]{String.valueOf(request.getId()),
416                         String.valueOf(task.getSuccess()),
417                         String.valueOf(task.getFailed())});
418                 logger.warn(msg);
419                 throw new SQLException JavaDoc(msg);
420               }
421               // else task execution already started, to late to cancel
422
}
423             // No need to update request timeout since the execution is finished
424
}
425           else
426             task.wait();
427         }
428         catch (InterruptedException JavaDoc e)
429         {
430           if (task.setExpiredTimeout())
431           { // Task will be ignored by all backends
432
String JavaDoc msg = Translate.get("loadbalancer.request.timeout",
433                 new String JavaDoc[]{String.valueOf(request.getId()),
434                     String.valueOf(task.getSuccess()),
435                     String.valueOf(task.getFailed())});
436             logger.warn(msg);
437             throw new SQLException JavaDoc(msg);
438           }
439           // else task execution already started, to late to cancel
440
}
441       }
442
443       if (task.getSuccess() > 0)
444         return task;
445       else
446       { // All tasks failed
447
ArrayList JavaDoc exceptions = task.getExceptions();
448         if (exceptions == null)
449           throw new AllBackendsFailedException(Translate.get(
450               "loadbalancer.request.failed.all", request.getId()));
451         else
452         {
453           String JavaDoc errorMsg = Translate.get("loadbalancer.request.failed.stack",
454               request.getId())
455               + "\n";
456           for (int i = 0; i < exceptions.size(); i++)
457             errorMsg += ((SQLException JavaDoc) exceptions.get(i)).getMessage() + "\n";
458           logger.error(errorMsg);
459           throw new SQLException JavaDoc(errorMsg);
460         }
461       }
462     }
463   }
464
465   /**
466    * Implementation specific load balanced read execution.
467    *
468    * @param request an <code>SelectRequest</code>
469    * @param metadataCache the metadataCache if any or null
470    * @return the corresponding <code>java.sql.ResultSet</code>
471    * @exception SQLException if an error occurs
472    */

473   public abstract ControllerResultSet execReadRequest(SelectRequest request,
474       MetadataCache metadataCache) throws SQLException JavaDoc;
475
476   /**
477    * Execute a read request on the selected backend.
478    *
479    * @param request the request to execute
480    * @param backend the backend that will execute the request
481    * @param metadataCache a metadataCache if any or null
482    * @return the ResultSet
483    * @throws SQLException if an error occurs
484    */

485   protected ControllerResultSet executeRequestOnBackend(SelectRequest request,
486       DatabaseBackend backend, MetadataCache metadataCache)
487       throws SQLException JavaDoc, UnreachableBackendException
488   {
489     // Handle macros
490
handleMacros(request);
491
492     // Ok, we have a backend, let's execute the request
493
AbstractConnectionManager cm = backend.getConnectionManager(request
494         .getLogin());
495
496     // Sanity check
497
if (cm == null)
498     {
499       String JavaDoc msg = Translate.get("loadbalancer.connectionmanager.not.found",
500           new String JavaDoc[]{request.getLogin(), backend.getName()});
501       logger.error(msg);
502       throw new SQLException JavaDoc(msg);
503     }
504
505     // Execute the query
506
if (request.isAutoCommit())
507     {
508       if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL)
509         // We could do something finer grain here by waiting
510
// only for writes that depend on the tables we need
511
// but is that really worth the overhead ?
512
waitForAllWritesToComplete(backend);
513
514       ControllerResultSet rs = null;
515       boolean badConnection;
516       do
517       {
518         badConnection = false;
519         // Use a connection just for this request
520
Connection JavaDoc c = null;
521         try
522         {
523           c = cm.getConnection();
524         }
525         catch (UnreachableBackendException e1)
526         {
527           logger.error(Translate.get(
528               "loadbalancer.backend.disabling.unreachable", backend.getName()));
529           disableBackend(backend);
530           throw new UnreachableBackendException(Translate.get(
531               "loadbalancer.backend.unreacheable", backend.getName()));
532         }
533
534         // Sanity check
535
if (c == null)
536           throw new UnreachableBackendException(
537               "No more connections on backend " + backend.getName());
538
539         // Execute Query
540
try
541         {
542           rs = executeSelectRequestOnBackend(request, backend, c, metadataCache);
543           cm.releaseConnection(c);
544         }
545         catch (SQLException JavaDoc e)
546         {
547           cm.releaseConnection(c);
548           throw new SQLException JavaDoc(Translate.get(
549               "loadbalancer.request.failed.on.backend", new String JavaDoc[]{
550                   request.getSQLShortForm(vdb.getSQLShortFormLength()),
551                   backend.getName(), e.getMessage()}));
552         }
553         catch (BadConnectionException e)
554         { // Get rid of the bad connection
555
cm.deleteConnection(c);
556           badConnection = true;
557         }
558       }
559       while (badConnection);
560       if (logger.isDebugEnabled())
561         logger.debug(Translate.get("loadbalancer.execute.on", new String JavaDoc[]{
562             String.valueOf(request.getId()), backend.getName()}));
563       return rs;
564     }
565     else
566     { // Inside a transaction
567
Connection JavaDoc c;
568       long tid = request.getTransactionId();
569       Long JavaDoc lTid = new Long JavaDoc(tid);
570
571       // Wait for previous writes to complete
572
if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL)
573         waitForAllWritesToComplete(backend, request.getTransactionId());
574
575       try
576       {
577         c = backend.getConnectionForTransactionAndLazyBeginIfNeeded(lTid, cm,
578             request.getTransactionIsolation());
579       }
580       catch (UnreachableBackendException e1)
581       {
582         logger.error(Translate.get(
583             "loadbalancer.backend.disabling.unreachable", backend.getName()));
584         disableBackend(backend);
585         throw new SQLException JavaDoc(Translate.get(
586             "loadbalancer.backend.unreacheable", backend.getName()));
587       }
588       catch (NoTransactionStartWhenDisablingException e)
589       {
590         String JavaDoc msg = Translate.get("loadbalancer.backend.is.disabling",
591             new String JavaDoc[]{request.getSQLShortForm(vdb.getSQLShortFormLength()),
592                 backend.getName()});
593         logger.error(msg);
594         throw new SQLException JavaDoc(msg);
595       }
596
597       // Sanity check
598
if (c == null)
599         throw new SQLException JavaDoc(Translate.get(
600             "loadbalancer.unable.retrieve.connection", new String JavaDoc[]{
601                 String.valueOf(tid), backend.getName()}));
602
603       // Execute Query
604
ControllerResultSet rs = null;
605       try
606       {
607         rs = executeSelectRequestOnBackend(request, backend, c, metadataCache);
608       }
609       catch (SQLException JavaDoc e)
610       {
611         throw new SQLException JavaDoc(Translate.get(
612             "loadbalancer.request.failed.on.backend", new String JavaDoc[]{
613                 request.getSQLShortForm(vdb.getSQLShortFormLength()),
614                 backend.getName(), e.getMessage()}));
615       }
616       catch (BadConnectionException e)
617       { // Connection failed, so did the transaction
618
// Disable the backend.
619
cm.deleteConnection(tid);
620         String JavaDoc msg = Translate.get(
621             "loadbalancer.backend.disabling.connection.failure", backend
622                 .getName());
623         logger.error(msg);
624         disableBackend(backend);
625         throw new SQLException JavaDoc(msg);
626       }
627       if (logger.isDebugEnabled())
628         logger.debug(Translate.get("loadbalancer.execute.transaction.on",
629             new String JavaDoc[]{String.valueOf(tid), String.valueOf(request.getId()),
630                 backend.getName()}));
631       return rs;
632     }
633   }
634
635   /**
636    * Execute a stored procedure on the selected backend.
637    *
638    * @param proc the stored procedure to execute
639    * @param backend the backend that will execute the request
640    * @param metadataCache the metadataCache if any or null
641    * @return the ResultSet
642    * @throws SQLException if an error occurs
643    */

644   protected ControllerResultSet executeStoredProcedureOnBackend(
645       StoredProcedure proc, DatabaseBackend backend, MetadataCache metadataCache)
646       throws SQLException JavaDoc, UnreachableBackendException
647   {
648     // Ok, we have a backend, let's execute the request
649
AbstractConnectionManager cm = backend
650         .getConnectionManager(proc.getLogin());
651
652     // Sanity check
653
if (cm == null)
654     {
655       String JavaDoc msg = Translate.get("loadbalancer.connectionmanager.not.found",
656           new String JavaDoc[]{proc.getLogin(), backend.getName()});
657       logger.error(msg);
658       throw new SQLException JavaDoc(msg);
659     }
660
661     // Execute the query
662
if (proc.isAutoCommit())
663     {
664       if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL)
665         // We could do something finer grain here by waiting
666
// only for writes that depend on the tables we need
667
// but is that really worth the overhead ?
668
waitForAllWritesToComplete(backend);
669
670       // Use a connection just for this request
671
Connection JavaDoc c = null;
672       try
673       {
674         c = cm.getConnection();
675       }
676       catch (UnreachableBackendException e1)
677       {
678         logger.error(Translate.get(
679             "loadbalancer.backend.disabling.unreachable", backend.getName()));
680         disableBackend(backend);
681         throw new UnreachableBackendException(Translate.get(
682             "loadbalancer.backend.unreacheable", backend.getName()));
683       }
684
685       // Sanity check
686
if (c == null)
687         throw new SQLException JavaDoc(Translate.get(
688             "loadbalancer.backend.no.connection", backend.getName()));
689
690       // Execute Query
691
ControllerResultSet rs = null;
692       try
693       {
694         rs = AbstractLoadBalancer.executeReadStoredProcedureOnBackend(proc,
695             backend, c, metadataCache);
696       }
697       catch (Exception JavaDoc e)
698       {
699         throw new SQLException JavaDoc(Translate.get(
700             "loadbalancer.storedprocedure.failed.on.backend", new String JavaDoc[]{
701                 proc.getSQLShortForm(vdb.getSQLShortFormLength()),
702                 backend.getName(), e.getMessage()}));
703       }
704       finally
705       {
706         cm.releaseConnection(c);
707       }
708       if (logger.isDebugEnabled())
709         logger.debug(Translate.get("loadbalancer.storedprocedure.on",
710             new String JavaDoc[]{String.valueOf(proc.getId()), backend.getName()}));
711       return rs;
712     }
713     else
714     { // Inside a transaction
715
Connection JavaDoc c;
716       long tid = proc.getTransactionId();
717       Long JavaDoc lTid = new Long JavaDoc(tid);
718
719       // Wait for previous writes to complete
720
if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL)
721         waitForAllWritesToComplete(backend, proc.getTransactionId());
722
723       try
724       {
725         c = backend.getConnectionForTransactionAndLazyBeginIfNeeded(lTid, cm,
726             proc.getTransactionIsolation());
727       }
728       catch (UnreachableBackendException e1)
729       {
730         logger.error(Translate.get(
731             "loadbalancer.backend.disabling.unreachable", backend.getName()));
732         disableBackend(backend);
733         throw new SQLException JavaDoc(Translate.get(
734             "loadbalancer.backend.unreacheable", backend.getName()));
735       }
736       catch (NoTransactionStartWhenDisablingException e)
737       {
738         String JavaDoc msg = Translate.get("loadbalancer.backend.is.disabling",
739             new String JavaDoc[]{proc.getSQLShortForm(vdb.getSQLShortFormLength()),
740                 backend.getName()});
741         logger.error(msg);
742         throw new SQLException JavaDoc(msg);
743       }
744
745       // Sanity check
746
if (c == null)
747         throw new SQLException JavaDoc(Translate.get(
748             "loadbalancer.unable.retrieve.connection", new String JavaDoc[]{
749                 String.valueOf(tid), backend.getName()}));
750
751       // Execute Query
752
ControllerResultSet rs;
753       try
754       {
755         rs = AbstractLoadBalancer.executeReadStoredProcedureOnBackend(proc,
756             backend, c, metadataCache);
757       }
758       catch (Exception JavaDoc e)
759       {
760         throw new SQLException JavaDoc(Translate.get(
761             "loadbalancer.storedprocedure.failed.on.backend", new String JavaDoc[]{
762                 proc.getSQLShortForm(vdb.getSQLShortFormLength()),
763                 backend.getName(), e.getMessage()}));
764       }
765       if (logger.isDebugEnabled())
766         logger.debug(Translate.get("loadbalancer.execute.transaction.on",
767             new String JavaDoc[]{String.valueOf(tid), String.valueOf(proc.getId()),
768                 backend.getName()}));
769       return rs;
770     }
771   }
772
773   /**
774    * @see org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer#execReadStoredProcedure(StoredProcedure,
775    * MetadataCache)
776    */

777   public ControllerResultSet execReadStoredProcedure(StoredProcedure proc,
778       MetadataCache metadataCache) throws SQLException JavaDoc
779   {
780     ReadStoredProcedureTask task = (ReadStoredProcedureTask) callStoredProcedure(
781         proc, true, metadataCache);
782     return task.getResult();
783   }
784
785   /**
786    * @see org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer#execWriteStoredProcedure(StoredProcedure)
787    */

788   public int execWriteStoredProcedure(StoredProcedure proc) throws SQLException JavaDoc
789   {
790     WriteStoredProcedureTask task = (WriteStoredProcedureTask) callStoredProcedure(
791         proc, false, null);
792     return task.getResult();
793   }
794
795   /**
796    * Post the stored procedure call in the threads task list.
797    * <p>
798    * Note that macros are processed here.
799    *
800    * @param proc the stored procedure to call
801    * @param isRead true if the call returns a ResultSet
802    * @param metadataCache the metadataCache if any or null
803    * @return the task that has been executed (caller can get the result by
804    * calling getResult())
805    * @throws SQLException if an error occurs
806    */

807   private AbstractTask callStoredProcedure(StoredProcedure proc,
808       boolean isRead, MetadataCache metadataCache) throws SQLException JavaDoc
809   {
810     ArrayList JavaDoc backendThreads = backendBlockingThreads;
811     ReadPrioritaryFIFOWriteLock lock = backendBlockingThreadsRWLock;
812
813     // Total ordering mainly for distributed virtual databases.
814
// If waitForTotalOrder returns true then the query has been scheduled in
815
// total order and there is no need to take a write lock