KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > objectweb > cjdbc > controller > loadbalancer > raidb1 > RAIDb1


1 /**
2  * C-JDBC: Clustered JDBC.
3  * Copyright (C) 2002-2005 French National Institute For Research In Computer
4  * Science And Control (INRIA).
5  * Contact: c-jdbc@objectweb.org
6  *
7  * This library is free software; you can redistribute it and/or modify it
8  * under the terms of the GNU Lesser General Public License as published by the
9  * Free Software Foundation; either version 2.1 of the License, or any later
10  * version.
11  *
12  * This library is distributed in the hope that it will be useful, but WITHOUT
13  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
14  * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
15  * for more details.
16  *
17  * You should have received a copy of the GNU Lesser General Public License
18  * along with this library; if not, write to the Free Software Foundation,
19  * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
20  *
21  * Initial developer(s): Emmanuel Cecchet
22  * Contributor(s): ______________________
23  */

24
25 package org.objectweb.cjdbc.controller.loadbalancer.raidb1;
26
27 import java.sql.Connection JavaDoc;
28 import java.sql.SQLException JavaDoc;
29 import java.util.ArrayList JavaDoc;
30 import java.util.ConcurrentModificationException JavaDoc;
31 import java.util.Iterator JavaDoc;
32
33 import org.objectweb.cjdbc.common.exceptions.BadConnectionException;
34 import org.objectweb.cjdbc.common.exceptions.NoMoreBackendException;
35 import org.objectweb.cjdbc.common.exceptions.NoTransactionStartWhenDisablingException;
36 import org.objectweb.cjdbc.common.exceptions.SQLExceptionFactory;
37 import org.objectweb.cjdbc.common.exceptions.UnreachableBackendException;
38 import org.objectweb.cjdbc.common.i18n.Translate;
39 import org.objectweb.cjdbc.common.log.Trace;
40 import org.objectweb.cjdbc.common.sql.AbstractWriteRequest;
41 import org.objectweb.cjdbc.common.sql.ParsingGranularities;
42 import org.objectweb.cjdbc.common.sql.SelectRequest;
43 import org.objectweb.cjdbc.common.sql.StoredProcedure;
44 import org.objectweb.cjdbc.common.util.ReadPrioritaryFIFOWriteLock;
45 import org.objectweb.cjdbc.common.xml.DatabasesXmlTags;
46 import org.objectweb.cjdbc.controller.backend.DatabaseBackend;
47 import org.objectweb.cjdbc.controller.cache.metadata.MetadataCache;
48 import org.objectweb.cjdbc.controller.connection.AbstractConnectionManager;
49 import org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer;
50 import org.objectweb.cjdbc.controller.loadbalancer.AllBackendsFailedException;
51 import org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread;
52 import org.objectweb.cjdbc.controller.loadbalancer.policies.WaitForCompletionPolicy;
53 import org.objectweb.cjdbc.controller.loadbalancer.tasks.AbstractTask;
54 import org.objectweb.cjdbc.controller.loadbalancer.tasks.CommitTask;
55 import org.objectweb.cjdbc.controller.loadbalancer.tasks.KillThreadTask;
56 import org.objectweb.cjdbc.controller.loadbalancer.tasks.ReadStoredProcedureTask;
57 import org.objectweb.cjdbc.controller.loadbalancer.tasks.ReleaseSavepointTask;
58 import org.objectweb.cjdbc.controller.loadbalancer.tasks.RollbackTask;
59 import org.objectweb.cjdbc.controller.loadbalancer.tasks.RollbackToSavepointTask;
60 import org.objectweb.cjdbc.controller.loadbalancer.tasks.SavepointTask;
61 import org.objectweb.cjdbc.controller.loadbalancer.tasks.WriteRequestTask;
62 import org.objectweb.cjdbc.controller.loadbalancer.tasks.WriteRequestWithKeysTask;
63 import org.objectweb.cjdbc.controller.loadbalancer.tasks.WriteStoredProcedureTask;
64 import org.objectweb.cjdbc.controller.requestmanager.RAIDbLevels;
65 import org.objectweb.cjdbc.controller.requestmanager.TransactionMarkerMetaData;
66 import org.objectweb.cjdbc.controller.virtualdatabase.ControllerResultSet;
67 import org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase;
68 import org.objectweb.cjdbc.controller.virtualdatabase.protocol.Commit;
69 import org.objectweb.cjdbc.controller.virtualdatabase.protocol.ReleaseSavepoint;
70 import org.objectweb.cjdbc.controller.virtualdatabase.protocol.Rollback;
71 import org.objectweb.cjdbc.controller.virtualdatabase.protocol.RollbackToSavepoint;
72 import org.objectweb.cjdbc.controller.virtualdatabase.protocol.SetSavepoint;
73
/**
 * RAIDb-1 load balancer.
 * <p>
 * This class is abstract because the read requests coming from the request
 * controller are NOT treated here but in the subclasses. Transaction
 * management and write requests are broadcast to all backends.
 *
 * @author <a href="mailto:Emmanuel.Cecchet@inria.fr">Emmanuel Cecchet </a>
 * @version 1.0
 */

84 public abstract class RAIDb1 extends AbstractLoadBalancer
85 {
//
// How is the code organized?
//
// 1. Member variables
// 2. Constructor(s)
// 3. Request handling
// 4. Transaction handling
// 5. Backend management
//

96   /**
97    * List of <code>BackendWorkerThread</code> that executes possibly blocking
98    * queries
99    */

100   protected ArrayList JavaDoc backendBlockingThreads;
101   /**
102    * List of <code>BackendWorkerThread</code> that executes non-blocking
103    * queries
104    */

105   protected ArrayList JavaDoc backendNonBlockingThreads;
106   /** Lock on backendBlockingThreads list */
107   protected ReadPrioritaryFIFOWriteLock backendBlockingThreadsRWLock = new ReadPrioritaryFIFOWriteLock();
108   /** Lock on backendNonBlockingThreads list */
109   protected ReadPrioritaryFIFOWriteLock backendNonBlockingThreadsRWLock = new ReadPrioritaryFIFOWriteLock();
110   /** Should we wait for all backends to commit before returning ? */
111   protected WaitForCompletionPolicy waitForCompletionPolicy;
112   protected static Trace logger = Trace
113                                                                             .getLogger("org.objectweb.cjdbc.controller.loadbalancer.RAIDb1");
114
115   /*
116    * Constructors
117    */

118
/**
 * Creates a new RAIDb-1 request load balancer.
 * <p>
 * The load balancer is registered with the <code>RAIDbLevels.RAIDb1</code>
 * level and <code>ParsingGranularities.NO_PARSING</code>. Both backend worker
 * thread lists start empty; this constructor does not create any worker
 * thread itself.
 *
 * @param vdb the virtual database this load balancer belongs to.
 * @param waitForCompletionPolicy How many backends must complete before
 *          returning the result?
 * @exception Exception if an error occurs
 */
public RAIDb1(VirtualDatabase vdb,
    WaitForCompletionPolicy waitForCompletionPolicy) throws Exception
{
  super(vdb, RAIDbLevels.RAIDb1, ParsingGranularities.NO_PARSING);
  this.waitForCompletionPolicy = waitForCompletionPolicy;
  backendBlockingThreads = new ArrayList();
  backendNonBlockingThreads = new ArrayList();
}
136
137   /*
138    * Request Handling
139    */

140
141   /**
142    * Returns the number of nodes to wait for according to the defined
143    * <code>waitForCompletion</code> policy.
144    *
145    * @param nbOfThreads total number of threads
146    * @return int number of threads to wait for
147    */

148   private int getNbToWait(int nbOfThreads)
149   {
150     int nbToWait;
151     switch (waitForCompletionPolicy.getPolicy())
152     {
153       case WaitForCompletionPolicy.FIRST :
154         nbToWait = 1;
155         break;
156       case WaitForCompletionPolicy.MAJORITY :
157         nbToWait = nbOfThreads / 2 + 1;
158         break;
159       case WaitForCompletionPolicy.ALL :
160         nbToWait = nbOfThreads;
161         break;
162       default :
163         logger
164             .warn(Translate.get("loadbalancer.waitforcompletion.unsupported"));
165         nbToWait = nbOfThreads;
166         break;
167     }
168     return nbToWait;
169   }
170
171   /**
172    * @see AbstractLoadBalancer#execReadRequest(SelectRequest, MetadataCache)
173    */

174   public abstract ControllerResultSet execReadRequest(SelectRequest request,
175       MetadataCache metadataCache) throws SQLException JavaDoc;
176
177   /**
178    * Execute a read request on the selected backend.
179    *
180    * @param request the request to execute
181    * @param backend the backend that will execute the request
182    * @param metadataCache the metadataCache if any or null
183    * @return the ResultSet
184    * @throws SQLException if an error occurs
185    */

186   protected ControllerResultSet executeRequestOnBackend(SelectRequest request,
187       DatabaseBackend backend, MetadataCache metadataCache)
188       throws SQLException JavaDoc, UnreachableBackendException
189   {
190     // Handle macros
191
handleMacros(request);
192
193     // Ok, we have a backend, let's execute the request
194
AbstractConnectionManager cm = backend.getConnectionManager(request
195         .getLogin());
196
197     // Sanity check
198
if (cm == null)
199     {
200       String JavaDoc msg = Translate.get("loadbalancer.connectionmanager.not.found",
201           new String JavaDoc[]{request.getLogin(), backend.getName()});
202       logger.error(msg);
203       throw new SQLException JavaDoc(msg);
204     }
205
206     // Execute the query
207
if (request.isAutoCommit())
208     {
209       if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL)
210         // We could do something finer grain here by waiting
211
// only for writes that depend on the tables we need
212
// but is that really worth the overhead ?
213
waitForAllWritesToComplete(backend);
214
215       ControllerResultSet rs = null;
216       boolean badConnection;
217       do
218       {
219         badConnection = false;
220         // Use a connection just for this request
221
Connection JavaDoc c = null;
222         try
223         {
224           c = cm.getConnection();
225         }
226         catch (UnreachableBackendException e1)
227         {
228           logger.error(Translate.get(
229               "loadbalancer.backend.disabling.unreachable", backend.getName()));
230           disableBackend(backend);
231           throw new UnreachableBackendException(Translate.get(
232               "loadbalancer.backend.unreacheable", backend.getName()));
233         }
234
235         // Sanity check
236
if (c == null)
237           throw new SQLException JavaDoc(Translate.get(
238               "loadbalancer.backend.no.connection", backend.getName()));
239
240         // Execute Query
241
try
242         {
243           rs = executeSelectRequestOnBackend(request, backend, c, metadataCache);
244           cm.releaseConnection(c);
245         }
246         catch (SQLException JavaDoc e)
247         {
248           cm.releaseConnection(c);
249           throw SQLExceptionFactory.getSQLException(e, Translate.get(
250               "loadbalancer.request.failed.on.backend", new String JavaDoc[]{
251                   request.getSQLShortForm(vdb.getSQLShortFormLength()),
252                   backend.getName(), e.getMessage()}));
253         }
254         catch (BadConnectionException e)
255         { // Get rid of the bad connection
256
cm.deleteConnection(c);
257           badConnection = true;
258         }
259       }
260       while (badConnection);
261       if (logger.isDebugEnabled())
262         logger.debug(Translate.get("loadbalancer.execute.on", new String JavaDoc[]{
263             String.valueOf(request.getId()), backend.getName()}));
264       return rs;
265     }
266     else
267     { // Inside a transaction
268
Connection JavaDoc c;
269       long tid = request.getTransactionId();
270       Long JavaDoc lTid = new Long JavaDoc(tid);
271
272       // Wait for previous writes to complete
273
if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL)
274         waitForAllWritesToComplete(backend, request.getTransactionId());
275
276       try
277       {
278         c = backend.getConnectionForTransactionAndLazyBeginIfNeeded(lTid, cm,
279             request.getTransactionIsolation());
280       }
281       catch (UnreachableBackendException e1)
282       {
283         logger.error(Translate.get(
284             "loadbalancer.backend.disabling.unreachable", backend.getName()));
285         disableBackend(backend);
286         throw new SQLException JavaDoc(Translate.get(
287             "loadbalancer.backend.unreacheable", backend.getName()));
288       }
289       catch (NoTransactionStartWhenDisablingException e)
290       {
291         String JavaDoc msg = Translate.get("loadbalancer.backend.is.disabling",
292             new String JavaDoc[]{request.getSQLShortForm(vdb.getSQLShortFormLength()),
293                 backend.getName()});
294         logger.error(msg);
295         throw new SQLException JavaDoc(msg);
296       }
297
298       // Sanity check
299
if (c == null)
300         throw new SQLException JavaDoc(Translate.get(
301             "loadbalancer.unable.retrieve.connection", new String JavaDoc[]{
302                 String.valueOf(tid), backend.getName()}));
303
304       // Execute Query
305
ControllerResultSet rs = null;
306       try
307       {
308         rs = executeSelectRequestOnBackend(request, backend, c, metadataCache);
309       }
310       catch (SQLException JavaDoc e)
311       {
312         throw SQLExceptionFactory.getSQLException(e, Translate.get(
313             "loadbalancer.request.failed.on.backend", new String JavaDoc[]{
314                 request.getSQLShortForm(vdb.getSQLShortFormLength()),
315                 backend.getName(), e.getMessage()}));
316       }
317       catch (BadConnectionException e)
318       { // Connection failed, so did the transaction
319
// Disable the backend.
320
cm.deleteConnection(tid);
321         String JavaDoc msg = Translate.get(
322             "loadbalancer.backend.disabling.connection.failure", backend
323                 .getName());
324         logger.error(msg);
325         disableBackend(backend);
326         throw new SQLException JavaDoc(msg);
327       }
328       if (logger.isDebugEnabled())
329         logger.debug(Translate.get("loadbalancer.execute.transaction.on",
330             new String JavaDoc[]{String.valueOf(tid), String.valueOf(request.getId()),
331                 backend.getName()}));
332       return rs;
333     }
334   }
335
336   /**
337    * Performs a write request. This request is broadcasted to all nodes.
338    *
339    * @param request an <code>AbstractWriteRequest</code>
340    * @return number of rows affected by the request
341    * @throws AllBackendsFailedException if all backends failed to execute the
342    * request
343    * @exception SQLException if an error occurs
344    * @exception NoMoreBackendException if no backends left to execute the
345    * request
346    */

347   public int execWriteRequest(AbstractWriteRequest request)
348       throws AllBackendsFailedException, NoMoreBackendException, SQLException JavaDoc
349   {
350     return ((WriteRequestTask) execWriteRequest(request, false, null))
351         .getResult();
352   }
353
354   /**
355    * Perform a write request and return the auto generated keys.
356    *
357    * @param request the request to execute
358    * @param metadataCache the metadataCache if any or null
359    * @return auto generated keys.
360    * @throws AllBackendsFailedException if all backends failed to execute the
361    * request
362    * @exception SQLException if an error occurs
363    */

364   public ControllerResultSet execWriteRequestWithKeys(
365       AbstractWriteRequest request, MetadataCache metadataCache)
366       throws AllBackendsFailedException, SQLException JavaDoc
367   {
368     return ((WriteRequestWithKeysTask) execWriteRequest(request, true,
369         metadataCache)).getResult();
370   }
371
372   /**
373    * Common code for execWriteRequest(AbstractWriteRequest) and
374    * execWriteRequestWithKeys(AbstractWriteRequest).
375    * <p>
376    * Note that macros are processed here.
377    * <p>
378    * The result is given back in AbstractTask.getResult().
379    *
380    * @param request the request to execute
381    * @param useKeys true if this must give an auto generated keys ResultSet
382    * @param metadataCache the metadataCache if any or null
383    * @throws AllBackendsFailedException if all backends failed to execute the
384    * request
385    * @throws SQLException if an error occurs
386    */

387   private AbstractTask execWriteRequest(AbstractWriteRequest request,
388       boolean useKeys, MetadataCache metadataCache)
389       throws AllBackendsFailedException, NoMoreBackendException, SQLException JavaDoc
390   {
391     ArrayList JavaDoc backendThreads;
392     ReadPrioritaryFIFOWriteLock lock;
393
394     // Total ordering mainly for distributed virtual databases.
395
// If waitForTotalOrder returns true then the query has been scheduled in
396
// total order and there is no need to take a write lock later to resolve
397
// potential conflicts.
398
boolean canTakeReadLock = waitForTotalOrder(request, true);
399
400     // Handle macros
401
handleMacros(request);
402
403     // Determine which list (blocking or not) to use
404
if (request.mightBlock())
405     { // Blocking
406
backendThreads = backendBlockingThreads;
407       lock = backendBlockingThreadsRWLock;
408     }
409     else
410     { // Non-blocking
411
backendThreads = backendNonBlockingThreads;
412       lock = backendNonBlockingThreadsRWLock;
413       if ((waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL)
414           && (request.getTransactionId() != 0))
415         waitForAllWritesToComplete(request.getTransactionId());
416       // else all previous requests for this transaction have already completed
417
// since the policy is to wait for all backends to complete and there is
418
// no risk of an asynchrony here.
419
}
420
421     try
422     {
423       if (canTakeReadLock)
424         lock.acquireRead();
425       else
426         lock.acquireWrite();
427     }
428     catch (InterruptedException JavaDoc e)
429     {
430       String JavaDoc msg = Translate.get(
431           "loadbalancer.backendlist.acquire.writelock.failed", e);
432       logger.error(msg);
433       throw new SQLException JavaDoc(msg);
434     }
435
436     // Note that the backendThreads list is only supposed to contain enabled
437
// backends. When a backend is disabled, its backendWorkerThread is removed
438
// from the list.
439
int nbOfThreads = backendThreads.size();
440     if (nbOfThreads == 0)
441     {
442       if (canTakeReadLock)
443         lock.releaseRead();
444       else
445         lock.releaseWrite();
446
447       // Unblock next query from total order queue
448
removeHeadFromAndNotifyTotalOrderQueue();
449       throw new NoMoreBackendException(Translate
450           .get("loadbalancer.backendlist.empty"));
451     }
452     else
453     {
454       if (logger.isDebugEnabled())
455         logger.debug(Translate.get("loadbalancer.execute.on.several",
456             new String JavaDoc[]{String.valueOf(request.getId()),
457                 String.valueOf(nbOfThreads)}));
458     }
459
460     // Create the task
461
AbstractTask task;
462     if (useKeys)
463       task = new WriteRequestWithKeysTask(getNbToWait(nbOfThreads),
464           nbOfThreads, request, metadataCache);
465     else
466       task = new WriteRequestTask(getNbToWait(nbOfThreads), nbOfThreads,
467           request);
468
469     // We have to first post the request on each backend before letting the
470
// first backend to execute the request. Therefore we have 2 phases:
471
// 1. post the task in each thread queue
472
// 2. notify each thread to execute the query
473

474     // 1. Post the task
475
if (request.isAutoCommit())
476     {
477       for (int i = 0; i < nbOfThreads; i++)
478       {
479         BackendWorkerThread thread = (BackendWorkerThread) backendThreads
480             .get(i);
481         synchronized (thread)
482         {
483           thread.addTask(task);
484         }
485       }
486     }
487     else
488     {
489       for (int i = 0; i < nbOfThreads; i++)
490       {
491         BackendWorkerThread thread = (BackendWorkerThread) backendThreads
492             .get(i);
493         synchronized (thread)
494         {
495           thread.addTask(task, request.getTransactionId());
496         }
497       }
498     }
499
500     // 2. Start the task execution on each backend
501
for (int i = 0; i < nbOfThreads; i++)
502     {
503       BackendWorkerThread thread = (BackendWorkerThread) backendThreads.get(i);
504       synchronized (thread)
505       {
506         thread.notify();
507       }
508     }
509
510     // Release lock
511
if (canTakeReadLock)
512       lock.releaseRead();
513     else
514       lock.releaseWrite();
515
516     // Unblock next query from total order queue
517
removeHeadFromAndNotifyTotalOrderQueue();
518
519     synchronized (task)
520     {
521       if (!task.hasCompleted())
522       {
523         // Wait for completion (notified by the task)
524
try
525         {
526           // Wait on task
527
long timeout = request.getTimeout() * 1000;
528           if (timeout > 0)
529           {
530             long start = System.currentTimeMillis();
531             task.wait(timeout);
532             long end = System.currentTimeMillis();
533             long remaining = timeout - (end - start);
534             if (remaining <= 0)
535             {
536               if (task.setExpiredTimeout())
537               { // Task will be ignored by all backends
538
String JavaDoc msg = Translate.get("loadbalancer.request.timeout",
539                     new String JavaDoc[]{String.valueOf(request.getId()),
540                         String.valueOf(task.getSuccess()),
541                         String.valueOf(task.getFailed())});
542
543                 logger.warn(msg);
544                 throw new SQLException JavaDoc(msg);
545               }
546               // else task execution already started, to late to cancel
547
}
548             // No need to update request timeout since the execution is finished
549
}
550           else
551             task.wait();
552         }
553         catch (InterruptedException JavaDoc e)
554         {
555           if (task.setExpiredTimeout())
556           { // Task will be ignored by all backends
557
String JavaDoc msg = Translate.get("loadbalancer.request.timeout",
558                 new String JavaDoc[]{String.valueOf(request.getId()),
559                     String.valueOf(task.getSuccess()),
560                     String.valueOf(task.getFailed())});
561
562             logger.warn(msg);
563             throw new SQLException JavaDoc(msg);
564           }
565           // else task execution already started, to late to cancel
566
}
567       }
568
569       if (task.getSuccess() > 0)
570         return task;
571       else
572       { // All tasks failed
573
ArrayList JavaDoc exceptions = task.getExceptions();
574         if (exceptions == null)
575           throw new AllBackendsFailedException(Translate.get(
576               "loadbalancer.request.failed.all", request.getId()));
577         else
578         {
579           String JavaDoc errorMsg = Translate.get("loadbalancer.request.failed.stack",
580               request.getId())
581               + "\n";
582           SQLException JavaDoc ex = SQLExceptionFactory.getSQLException(exceptions,
583               errorMsg);
584           logger.error(ex.getMessage());
585           throw ex;
586         }
587       }
588     }
589   }
590
591   /**
592    * Execute a stored procedure on the selected backend.
593    *
594    * @param proc the stored procedure to execute
595    * @param backend the backend that will execute the request
596    * @param metadataCache the metadataCache if any or null
597    * @return the ResultSet
598    * @throws SQLException if an error occurs
599    */

600   protected ControllerResultSet executeStoredProcedureOnBackend(
601       StoredProcedure proc, DatabaseBackend backend, MetadataCache metadataCache)
602       throws SQLException JavaDoc, UnreachableBackendException
603   {
604     // Ok, we have a backend, let's execute the request
605
AbstractConnectionManager cm = backend
606         .getConnectionManager(proc.getLogin());
607
608     // Sanity check
609
if (cm == null)
610     {
611       String JavaDoc msg = Translate.get("loadbalancer.connectionmanager.not.found",
612           new String JavaDoc[]{proc.getLogin(), backend.getName()});
613       logger.error(msg);
614       throw new SQLException JavaDoc(msg);
615     }
616
617     // Execute the query
618
if (proc.isAutoCommit())
619     {
620       if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL)
621         // We could do something finer grain here by waiting
622
// only for writes that depend on the tables we need
623
// but is that really worth the overhead ?
624
waitForAllWritesToComplete(backend);
625
626       // Use a connection just for this request
627
Connection JavaDoc c = null;
628       try
629       {
630         c = cm.getConnection();
631       }
632       catch (UnreachableBackendException e1)
633       {
634         logger.error(Translate.get(
635             "loadbalancer.backend.disabling.unreachable", backend.getName()));
636         disableBackend(backend);
637         throw new UnreachableBackendException(Translate.get(
638             "loadbalancer.backend.unreacheable", backend.getName()));
639       }
640
641       // Sanity check
642
if (c == null)
643         throw new UnreachableBackendException(Translate.get(
644             "loadbalancer.backend.no.connection", backend.getName()));
645
646       // Execute Query
647
ControllerResultSet rs = null;
648       try
649       {
650         rs = AbstractLoadBalancer.executeReadStoredProcedureOnBackend(proc,
651             backend, c, metadataCache);
652       }
653       catch (Exception JavaDoc e)
654       {
655         throw new SQLException JavaDoc(Translate.get(
656             "loadbalancer.storedprocedure.failed.on.backend", new String JavaDoc[]{
657                 proc.getSQLShortForm(vdb.getSQLShortFormLength()),
658                 backend.getName(), e.getMessage()}));
659       }
660       finally
661       {
662         cm.releaseConnection(c);
663       }
664       if (logger.isDebugEnabled())
665         logger.debug(Translate.get("loadbalancer.storedprocedure.on",
666             new String JavaDoc[]{String.valueOf(proc.getId()), backend.getName()}));
667       return rs;
668     }
669     else
670     { // Inside a transaction
671
Connection JavaDoc c;
672       long tid = proc.getTransactionId();
673       Long JavaDoc lTid = new Long JavaDoc(tid);
674
675       // Wait for previous writes to complete
676
if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL)
677         waitForAllWritesToComplete(backend, proc.getTransactionId());
678
679       try
680       {
681         c = backend.getConnectionForTransactionAndLazyBeginIfNeeded(lTid, cm,
682             proc.getTransactionIsolation());
683       }
684       catch (UnreachableBackendException e1)
685       {
686         logger.error(Translate.get(
687             "loadbalancer.backend.disabling.unreachable", backend.getName()));
688         disableBackend(backend);
689         throw new SQLException JavaDoc(Translate.get(
690             "loadbalancer.backend.unreacheable", backend.getName()));
691       }
692       catch (NoTransactionStartWhenDisablingException e)
693       {
694         String JavaDoc msg = Translate.get("loadbalancer.backend.is.disabling",
695             new String JavaDoc[]{proc.getSQLShortForm(vdb.getSQLShortFormLength()),
696                 backend.getName()});
697         logger.error(msg);
698         throw new SQLException JavaDoc(msg);
699       }
700
701       // Sanity check
702
if (c == null)
703         throw new SQLException JavaDoc(Translate.get(
704             "loadbalancer.unable.retrieve.connection", new String JavaDoc[]{
705                 String.valueOf(tid), backend.getName()}));
706
707       // Execute Query
708
ControllerResultSet rs;
709       try
710       {
711         rs = AbstractLoadBalancer.executeReadStoredProcedureOnBackend(proc,
712             backend, c, metadataCache);
713       }
714       catch (Exception JavaDoc e)
715       {
716         throw new SQLException JavaDoc(Translate.get(
717             "loadbalancer.storedprocedure.failed.on.backend", new String JavaDoc[]{
718                 proc.getSQLShortForm(vdb.getSQLShortFormLength()),
719                 backend.getName(), e.getMessage()}));
720       }
721       if (logger.isDebugEnabled())
722         logger.debug(Translate.get("loadbalancer.execute.transaction.on",
723             new String JavaDoc[]{String.valueOf(tid), String.valueOf(proc.getId()),
724                 backend.getName()}));
725       return rs;
726     }
727   }
728
729   /**
730    * @see org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer#execReadStoredProcedure(StoredProcedure,
731    * MetadataCache)
732    */

733   public ControllerResultSet execReadStoredProcedure(StoredProcedure proc,
734       MetadataCache metadataCache) throws SQLException JavaDoc
735   {
736     ReadStoredProcedureTask task = (ReadStoredProcedureTask) callStoredProcedure(
737         proc, true, metadataCache);
738     return task.getResult();
739   }
740
741   /**
742    * @see org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer#execWriteStoredProcedure(org.objectweb.cjdbc.common.sql.StoredProcedure)
743    */

744   public int execWriteStoredProcedure(StoredProcedure proc) throws SQLException JavaDoc
745   {
746     WriteStoredProcedureTask task = (WriteStoredProcedureTask) callStoredProcedure(
747         proc, false, null);
748     return task.getResult();
749   }
750
751   /**
752    * Post the stored procedure call in the threads task list.
753    * <p>
754    * Note that macros are also processed here.
755    *
756    * @param proc the stored procedure to call
757    * @param isRead true if the call returns a ResultSet
758    * @param metadataCache the metadataCache if any or null
759    * @return the task that has been executed (caller can get the result by
760    * calling getResult())
761    * @throws SQLException if an error occurs
762    */

763   private AbstractTask callStoredProcedure(StoredProcedure proc,
764       boolean isRead, MetadataCache metadataCache) throws SQLException JavaDoc
765   {
766     ArrayList JavaDoc backendThreads = backendBlockingThreads;
767     ReadPrioritaryFIFOWriteLock lock = backendBlockingThreadsRWLock;
768
769     // Total ordering mainly for distributed virtual databases.
770
// If waitForTotalOrder returns true then the query has been scheduled in
771
// total order and there is no need to take a write lock later to resolve
772
// potential conflicts.
773
boolean canTakeReadLock = waitForTotalOrder(proc, true);
774
775     // Handle macros
776
handleMacros(proc);
777
778     try
779     {
780       // Note that a read stored procedure here is supposed to also execute
781
// writes and as the scheduler cannot block atomically on multiple tables
782
// for a writes, we have to lock as a write even for a read stored
783
// procedure. A read-only stored procedure will execute in a separate
784
// method call (see executeReadOnlyStoredProcedure).
785
if (canTakeReadLock)
786         lock.acquireRead();
787       else
788         lock.acquireWrite();
789     }
790     catch (InterruptedException JavaDoc e)
791     {
792       String JavaDoc msg;
793       msg = Translate.get("loadbalancer.backendlist.acquire.writelock.failed",
794           e);
795       logger.error(msg);
796       throw new SQLException JavaDoc(msg);
797     }
798
799     int nbOfThreads = backendThreads.size();
800     if (nbOfThreads == 0)
801     {
802       if (canTakeReadLock)
803         lock.releaseRead();
804       else
805         lock.releaseWrite();
806
807       // Unblock next query from total order queue
808
removeHeadFromAndNotifyTotalOrderQueue();
809       throw new NoMoreBackendException(Translate
810           .get("loadbalancer.backendlist.empty"));
811     }
812     else
813     {
814       if (logger.isDebugEnabled())
815         logger.debug(Translate.get("loadbalancer.execute.on.several",
816             new String JavaDoc[]{String.valueOf(proc.getId()),
817                 String.valueOf(nbOfThreads)}));
818     }
819
820     // Create the task
821
AbstractTask task;
822     if (isRead)
823       task = new ReadStoredProcedureTask(getNbToWait(nbOfThreads), nbOfThreads,
824           proc, metadataCache);
825     else
826       task = new WriteStoredProcedureTask(getNbToWait(nbOfThreads),
827           nbOfThreads, proc);
828
829     // Post the task in each backendThread tasklist and wakeup the threads
830
for (int i = 0; i < nbOfThreads; i++)
831     {
832       BackendWorkerThread thread = (BackendWorkerThread) backendThreads.get(i);
833       synchronized (thread)
834       {
835         if (proc.isAutoCommit())
836           thread.addTask(task);
837         else
838           thread.addTask(task, proc.getTransactionId());
839         thread.notify();
840       }
841     }
842
843     if (canTakeReadLock)
844       lock.releaseRead();
845     else
846       lock.releaseWrite();
847
848     // Unblock next query from total order queue
849
removeHeadFromAndNotifyTotalOrderQueue();
850
851     synchronized (task)
852     {
853       if (!task.hasCompleted())
854       {
855         // Wait for completion (notified by the task)
856
try
857         {
858           // Wait on task
859
long timeout = proc.getTimeout() * 1000;
860           if (timeout > 0)
861           {
862             long start = System.currentTimeMillis();
863             task.wait(timeout);
864             long end = System.currentTimeMillis();
865             long remaining = timeout - (end - start);
866             if (remaining <= 0)
867             {
868               if (task.setExpiredTimeout())
869               { // Task will be ignored by all backends
870
String JavaDoc msg = Translate.get(
871                     "loadbalancer.storedprocedure.timeout", new String JavaDoc[]{
872                         String.valueOf(proc.getId()),
873                         String.valueOf(task.getSuccess()),
874                         String.valueOf(task.getFailed())});
875                 logger.warn(msg);
876                 throw new SQLException JavaDoc(msg);
877               }
878               // else task execution already started, to late to cancel
879
}
880             // No need to update request timeout since the execution is finished
881
}
882           else
883             task.wait();
884         }
885         catch (InterruptedException JavaDoc e)
886         {
887           if (task.setExpiredTimeout())
888           { // Task will be ignored by all backends
889
String JavaDoc msg = Translate.get("loadbalancer.storedprocedure.timeout",
890                 new String JavaDoc[]{String.valueOf(proc.getId()),
891                     String.valueOf(task.getSuccess()),
892                     String.valueOf(task.getFailed())});
893             logger.warn(msg);
894             throw new SQLException JavaDoc(msg);
895           }
896           // else task execution already started, to late to cancel
897
}
898       }
899
900       if (task.getSuccess() > 0)
901         return task;
902       else
903       { // All tasks failed
904
ArrayList JavaDoc exceptions = task.getExceptions();
905         if (exceptions == null)
906           throw new SQLException JavaDoc(Translate.get(
907               "loadbalancer.storedprocedure.all.failed", proc.getId()));
908         else
909         {
910           String JavaDoc errorMsg = Translate.get(
911               "loadbalancer.storedprocedure.failed.stack", proc.getId())
912               + "\n";
913           SQLException JavaDoc ex = SQLExceptionFactory.getSQLException(exceptions,
914               errorMsg);
915           logger.error(ex.getMessage());
916           throw ex;
917         }
918       }
919     }
920   }
921
922   /*
923    * Transaction management
924    */

925
926   /**
927    * Begins a new transaction.
928    *
929    * @param tm the transaction marker metadata
930    * @exception SQLException if an error occurs
931    */

932   public final void begin(TransactionMarkerMetaData tm) throws SQLException JavaDoc
933   {
934   }
935
936   /**
937    * Commits a transaction.
938    *
939    * @param tm the transaction marker metadata
940    * @exception SQLException if an error occurs
941    */

942   public void commit(TransactionMarkerMetaData tm) throws SQLException JavaDoc
943   {
944     long tid = tm.getTransactionId();
945     Long JavaDoc lTid = new Long JavaDoc(tid);
946     // List of backends that still have pending queries for the transaction to
947
// commit
948
ArrayList JavaDoc asynchronousBackends = null;
949     CommitTask task = null;
950
951     // Ordering for distributed virtual database
952
Commit totalOrderCommit = null;
953     boolean canTakeReadLock = false;
954     if (vdb.getTotalOrderQueue() != null)
955     {
956       totalOrderCommit = new Commit(tm.getLogin(), tid);
957       // Total ordering mainly for distributed virtual databases.
958
// If waitForTotalOrder returns true then the query has been scheduled in
959
// total order and there is no need to take a write lock later to resolve
960
// potential conflicts.
961
canTakeReadLock = waitForTotalOrder(totalOrderCommit, false);
962       if (!canTakeReadLock)
963         // This is a local commit no total order info
964
totalOrderCommit = null;
965     }
966
967     if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL)
968     { // Insert commit after last write
969
try
970       {
971         if (canTakeReadLock)
972           backendBlockingThreadsRWLock.acquireRead();
973         else
974         {
975           // Lock in write to ensure that all writes are posted and we wait in
976
// the queue, else a read lock has the priority with the
977
// implementation we are using.
978
backendBlockingThreadsRWLock.acquireWrite();
979         }
980       }
981       catch (InterruptedException JavaDoc e)
982       {
983         String JavaDoc msg = Translate.get(
984             "loadbalancer.backendlist.acquire.writelock.failed", e);
985         logger.error(msg);
986         throw new SQLException JavaDoc(msg);
987       }
988
989       int nbOfThreads = backendBlockingThreads.size();
990       // Create the task
991
task = new CommitTask(getNbToWait(nbOfThreads), nbOfThreads, tm
992           .getTimeout(), tm.getLogin(), tid);
993
994       for (int i = 0; i < nbOfThreads; i++)
995       {
996         BackendWorkerThread thread = (BackendWorkerThread) backendBlockingThreads
997             .get(i);
998         if (thread.hasTaskForTransaction(lTid))
999         {
1000          if (asynchronousBackends == null)
1001            asynchronousBackends = new ArrayList JavaDoc();
1002          asynchronousBackends.add(thread.getBackend());
1003          synchronized (thread)
1004          {
1005            thread.insertTaskAfterLastWriteForTransaction(task, lTid);
1006            thread.notify();
1007          }
1008        }
1009      }
1010
1011      if (canTakeReadLock)
1012        backendBlockingThreadsRWLock.releaseRead();
1013      else
1014        backendBlockingThreadsRWLock.releaseWrite();
1015
1016      // Unset the task
1017
if (asynchronousBackends == null)
1018        task = null;
1019    }
1020
1021    try
1022    {
1023      if (canTakeReadLock)
1024        backendNonBlockingThreadsRWLock.acquireRead();
1025      else
1026        backendNonBlockingThreadsRWLock.acquireWrite();
1027    }
1028    catch (InterruptedException JavaDoc e)
1029    {
1030      String JavaDoc msg = Translate.get(
1031          "loadbalancer.backendlist.acquire.writelock.failed", e);
1032      logger.error(msg);
1033      throw new SQLException JavaDoc(msg);
1034    }
1035
1036    int nbOfThreads = backendNonBlockingThreads.size();
1037    ArrayList JavaDoc commitList = new ArrayList JavaDoc();
1038
1039    // Build the list of backends that need to commit this transaction
1040
for (int i = 0; i < nbOfThreads; i++)
1041    {
1042      BackendWorkerThread thread = (BackendWorkerThread) backendNonBlockingThreads
1043          .get(i);
1044      DatabaseBackend backend = thread.getBackend();
1045      // If the transaction has been started on this backend and it was not
1046
// previously treated in the asynchronous backend list (late nodes), then
1047
// we have to post the task now in the asynchronous list.
1048
if (backend.isStartedTransaction(lTid)
1049          && ((asynchronousBackends == null) || (!asynchronousBackends
1050              .contains(backend))))
1051        commitList.add(thread);
1052    }
1053
1054    // If no backend was late and the commit task has not been posted to any
1055
// backend yet, then we have to create a task for the backends that really
1056
// need to commit the transaction (in the blocking queue).
1057
// Backends for which we have to post in the non blocking queue
1058
int nbOfThreadsToCommit = commitList.size();
1059    if ((task == null) && (nbOfThreadsToCommit != 0))
1060      task = new CommitTask(getNbToWait(nbOfThreadsToCommit),
1061          nbOfThreadsToCommit, tm.getTimeout(), tm.getLogin(), tid);
1062
1063    // Post the task in each backendThread tasklist and wakeup the threads. This
1064
// could either be the remaining threads that were not in the asynchronous
1065
// queue or all the backends that started the transaction.
1066
for (int i = 0; i < nbOfThreadsToCommit; i++)
1067    {
1068      BackendWorkerThread thread = (BackendWorkerThread) commitList.get(i);
1069      synchronized (thread)
1070      {
1071        thread.addTask(task, tid);
1072        thread.notify();
1073      }
1074    }
1075
1076    if (canTakeReadLock)
1077      backendNonBlockingThreadsRWLock.releaseRead();
1078    else
1079      backendNonBlockingThreadsRWLock.releaseWrite();
1080
1081    // Unblock next query from total order queue
1082
if (totalOrderCommit != null)
1083      removeHeadFromAndNotifyTotalOrderQueue();
1084
1085    // Check if someone had something to commit
1086
if (task == null)
1087      return;
1088
1089    synchronized (task)
1090    {
1091      if (!task.hasCompleted())
1092      {
1093        // Wait for completion (notified by the task)
1094
try
1095        {
1096          // Wait on task
1097
long timeout = tm.getTimeout();
1098          if (timeout > 0)
1099          {
1100            long start = System.currentTimeMillis();
1101            task.wait(timeout);
1102            long end = System.currentTimeMillis();
1103            long remaining = timeout - (end - start);
1104            if (remaining <= 0)
1105            {
1106              if (task.setExpiredTimeout())
1107              { // Task will be ignored by all backends
1108
String JavaDoc msg = Translate.get("loadbalancer.commit.timeout",
1109                    new String JavaDoc[]{String.valueOf(tid),
1110                        String.valueOf(task.getSuccess()),
1111                        String.valueOf(task.getFailed())});
1112                logger.warn(msg);
1113                throw new SQLException JavaDoc(msg);
1114              }
1115              // else task execution already started, too late to cancel
1116
}
1117          }
1118          else
1119            task.wait();
1120        }
1121        catch (InterruptedException JavaDoc e)
1122        {
1123          if (task.setExpiredTimeout())
1124          { // Task will be ignored by all backends
1125
String JavaDoc msg = Translate.get("loadbalancer.commit.timeout",
1126                new String JavaDoc[]{String.valueOf(tid),
1127                    String.valueOf(task.getSuccess()),
1128                    String.valueOf(task.getFailed())});
1129            logger.warn(msg);
1130            throw new SQLException JavaDoc(msg);
1131          }
1132          // else task execution already started, too late to cancel
1133
}
1134      }
1135
1136      if (task.getSuccess() > 0)
1137        return;
1138      else
1139      { // All tasks failed
1140
ArrayList JavaDoc exceptions = task.getExceptions();
1141        if (exceptions == null)
1142          throw new SQLException JavaDoc(Translate.get(
1143              "loadbalancer.commit.all.failed", tid));
1144        else
1145        {
1146          String JavaDoc errorMsg = Translate.get("loadbalancer.commit.failed.stack",
1147              tid)
1148              + "\n";
1149          SQLException JavaDoc ex = SQLExceptionFactory.getSQLException(exceptions,
1150              errorMsg);
1151          logger.error(ex.getMessage());
1152          throw ex;
1153        }
1154      }
1155    }
1156  }
1157
1158  /**
1159   * Rollbacks a transaction.
1160   *
1161   * @param tm the transaction marker metadata
1162   * @exception SQLException if an error occurs
1163   */

1164  public void rollback(TransactionMarkerMetaData tm) throws SQLException JavaDoc
1165  {
1166    long tid = tm.getTransactionId();
1167    Long JavaDoc lTid = new Long JavaDoc(tid);
1168    // List of backends that still have pending queries for the transaction to
1169
// rollback
1170
ArrayList JavaDoc asynchronousBackends = null;
1171    RollbackTask task = null;
1172
1173    // Ordering for distributed virtual database
1174
Rollback totalOrderRollback = null;
1175    boolean canTakeReadLock = false;
1176    if (vdb.getTotalOrderQueue() != null)
1177    {
1178      totalOrderRollback = new Rollback(tm.getLogin(), tid);
1179      // Total ordering mainly for distributed virtual databases.
1180
// If waitForTotalOrder returns true then the query has been scheduled in
1181
// total order and there is no need to take a write lock later to resolve
1182
// potential conflicts.
1183
canTakeReadLock = waitForTotalOrder(totalOrderRollback, false);
1184      if (!canTakeReadLock)
1185        // This is a local rollback no total order info
1186
totalOrderRollback = null;
1187    }
1188
1189    if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL)
1190    {
1191      try
1192      {
1193        if (canTakeReadLock)
1194          backendBlockingThreadsRWLock.acquireRead();
1195        else
1196        {
1197          // Lock in write to ensure that all writes are posted and we wait in
1198
// the queue, else a read lock has the priority with the
1199
// implementation we are using.
1200
backendBlockingThreadsRWLock.acquireWrite();
1201        }
1202      }
1203      catch (InterruptedException JavaDoc e)
1204      {
1205        String JavaDoc msg = Translate.get(
1206            "loadbalancer.backendlist.acquire.writelock.failed", e);
1207        logger.error(msg);
1208        throw new SQLException JavaDoc(msg);
1209      }
1210
1211      int nbOfThreads = backendBlockingThreads.size();
1212      // Create the task
1213
task = new RollbackTask(getNbToWait(nbOfThreads), nbOfThreads, tm
1214          .getTimeout(), tm.getLogin(), tid);
1215
1216      for (int i = 0; i < nbOfThreads; i++)
1217      {
1218        BackendWorkerThread thread = (BackendWorkerThread) backendBlockingThreads
1219            .get(i);
1220        if (thread.hasTaskForTransaction(lTid))
1221        {
1222          if (asynchronousBackends == null)
1223            asynchronousBackends = new ArrayList JavaDoc();
1224          asynchronousBackends.add(thread.getBackend());
1225          synchronized (thread)
1226          {
1227            thread.insertTaskAfterLastWriteForTransaction(task, lTid);
1228            thread.notify();
1229          }
1230        }
1231      }
1232
1233      if (canTakeReadLock)
1234        backendBlockingThreadsRWLock.releaseRead();
1235      else
1236        backendBlockingThreadsRWLock.releaseWrite();
1237
1238      // Unset the task
1239
if (asynchronousBackends == null)
1240        task = null;
1241    }
1242
1243    try
1244    {
1245      if (canTakeReadLock)
1246        backendNonBlockingThreadsRWLock.acquireRead();
1247      else
1248        backendNonBlockingThreadsRWLock.acquireWrite();
1249    }
1250    catch (InterruptedException JavaDoc e)
1251    {
1252      String JavaDoc msg = Translate.get(
1253          "loadbalancer.backendlist.acquire.writelock.failed", e);
1254      logger.error(msg);
1255      throw new SQLException JavaDoc(msg);
1256    }
1257
1258    int nbOfThreads = backendNonBlockingThreads.size();
1259    ArrayList JavaDoc rollbackList = new ArrayList JavaDoc();
1260
1261    // Build the list of backend that need to rollback this transaction
1262
for (int i = 0; i < nbOfThreads; i++)
1263    {
1264      BackendWorkerThread thread = (BackendWorkerThread) backendNonBlockingThreads
1265          .get(i);
1266      DatabaseBackend backend = thread.getBackend();
1267      // If the transaction has been started on this backend and it was not
1268
// previously treated in the asynchronous backend list (late nodes), then
1269
// we have to post the task now in the asynchronous list.
1270
if (backend.isStartedTransaction(lTid)
1271          && ((asynchronousBackends == null) || (!asynchronousBackends
1272              .contains(backend))))
1273        rollbackList.add(thread);
1274    }
1275
1276    int nbOfThreadsToRollback = rollbackList.size();
1277    // If no backend was late and the rollback task has not been posted to any
1278
// backend yet, then we have to create a task for the backends that really
1279
// need to rollback the transaction.
1280
if ((task == null) && (nbOfThreadsToRollback != 0))
1281      task = new RollbackTask(getNbToWait(nbOfThreadsToRollback),
1282          nbOfThreadsToRollback, tm.getTimeout(), tm.getLogin(), tid);
1283
1284    // Post the task in each backendThread tasklist and wakeup the threads. This
1285
// could either be the remaining threads that were not in the asynchronous
1286
// queue or all the backends that started the transaction.
1287
for (int i = 0; i < nbOfThreadsToRollback; i++)
1288    {
1289      BackendWorkerThread thread = (BackendWorkerThread) rollbackList.get(i);
1290      synchronized (thread)
1291      {
1292        thread.addTask(task, tid);
1293        thread.notify();
1294      }
1295    }
1296
1297    // Release lock
1298
if (canTakeReadLock)
1299      backendNonBlockingThreadsRWLock.releaseRead();
1300    else
1301      backendNonBlockingThreadsRWLock.releaseWrite();
1302
1303    // Unblock next query from total order queue
1304
if (totalOrderRollback != null)
1305      removeHeadFromAndNotifyTotalOrderQueue();
1306
1307    // Check if someone had something to rollback
1308
if (task == null)
1309      return;
1310
1311    synchronized (task)
1312    {
1313      if (!task.hasCompleted())
1314      {
1315        // Wait for completion (notified by the task)
1316
try
1317        {
1318          // Wait on task
1319
long timeout = tm.getTimeout();
1320          if (timeout > 0)
1321          {
1322            long start = System.currentTimeMillis();
1323            task.wait(timeout);
1324            long end = System.currentTimeMillis();
1325            long remaining = timeout - (end - start);
1326            if (remaining <= 0)
1327            {
1328              if (task.setExpiredTimeout())
1329              { // Task will be ignored by all backends
1330
String JavaDoc msg = Translate.get("loadbalancer.rollback.timeout",
1331                    new String JavaDoc[]{String.valueOf(tid),
1332                        String.valueOf(task.getSuccess()),
1333                        String.valueOf(task.getFailed())});
1334                logger.warn(msg);
1335                throw new SQLException JavaDoc(msg);
1336              }
1337              // else task execution already started, to late to cancel
1338
}
1339          }
1340          else
1341            task.wait();
1342        }
1343        catch (InterruptedException JavaDoc e)
1344        {
1345          if (task.setExpiredTimeout())
1346          { // Task will be ignored by all backends
1347
String JavaDoc msg = Translate.get("loadbalancer.rollback.timeout",
1348                new String JavaDoc[]{String.valueOf(tid),
1349                    String.valueOf(task.getSuccess()),
1350                    String.valueOf(task.getFailed())});
1351            logger.warn(msg);
1352            throw new SQLException JavaDoc(msg);
1353          }
1354          // else task execution already started, to late to cancel
1355
}
1356      }
1357
1358      if (task.getSuccess() > 0)
1359        return;
1360      else
1361      { // All tasks failed
1362
ArrayList JavaDoc exceptions = task.getExceptions();
1363        if (exceptions == null)
1364          throw new SQLException JavaDoc(Translate.get(
1365              "loadbalancer.rollback.all.failed", tid));
1366        else
1367        {
1368          String JavaDoc errorMsg = Translate.get("loadbalancer.rollback.failed.stack",
1369              tid)
1370              + "\n";
1371          SQLException JavaDoc ex = SQLExceptionFactory.getSQLException(exceptions,
1372              errorMsg);
1373          logger.error(ex.getMessage());
1374          throw ex;
1375        }
1376      }
1377    }
1378  }
1379
1380  /**
1381   * Rollback a transaction to a savepoint
1382   *
1383   * @param tm The transaction marker metadata
1384   * @param savepointName The name of the savepoint
1385   * @throws SQLException if an error occurs
1386   */

1387  public void rollback(TransactionMarkerMetaData tm, String JavaDoc savepointName)
1388      throws SQLException JavaDoc
1389  {
1390    long tid = tm.getTransactionId();
1391    Long JavaDoc lTid = new Long JavaDoc(tid);
1392    // List of backends that still have pending queries for the transaction for
1393
// which to rollback to a savepoint
1394
ArrayList JavaDoc asynchronousBackends = null;
1395    RollbackToSavepointTask task = null;
1396
1397    // Ordering for distributed virtual database
1398
RollbackToSavepoint totalOrderRollback = null;
1399    boolean canTakeReadLock = false;
1400    if (vdb.getTotalOrderQueue() != null)
1401    {
1402      totalOrderRollback = new RollbackToSavepoint(tid, savepointName);
1403      // Total ordering mainly for distributed virtual databases.
1404
// If waitForTotalOrder returns true then the query has been scheduled in
1405
// total order and there is no need to take a write lock later to resolve
1406
// potential conflicts.
1407
canTakeReadLock = waitForTotalOrder(totalOrderRollback, false);
1408      if (!canTakeReadLock)
1409        // This is a local commit no total order info
1410
totalOrderRollback = null;
1411    }
1412
1413    if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL)
1414    {
1415      try
1416      {
1417        if (canTakeReadLock)
1418          backendBlockingThreadsRWLock.acquireRead();
1419        else
1420        {
1421          // Lock in write to ensure that all writes are posted and we wait in
1422
// the queue, else a read lock has the priority with the
1423
// implementation we are using.
1424
backendBlockingThreadsRWLock.acquireWrite();
1425        }
1426      }
1427      catch (InterruptedException JavaDoc e)
1428      {
1429        String JavaDoc msg = Translate.get(
1430            "loadbalancer.backendlist.acquire.writelock.failed", e);
1431        logger.error(msg);
1432        throw new SQLException JavaDoc(msg);
1433      }
1434
1435      int nbOfThreads = backendBlockingThreads.size();
1436      // Create the task
1437
task = new RollbackToSavepointTask(getNbToWait(nbOfThreads), nbOfThreads,
1438          tm.getTimeout(), tm.getLogin(), tid, savepointName);
1439
1440      for (int i = 0; i < nbOfThreads; i++)
1441      {
1442        BackendWorkerThread thread = (BackendWorkerThread) backendBlockingThreads
1443            .get(i);
1444        if (thread.hasTaskForTransaction(lTid))
1445        {
1446          if (asynchronousBackends == null)
1447            asynchronousBackends = new ArrayList JavaDoc();
1448          asynchronousBackends.add(thread.getBackend());
1449          synchronized (thread)
1450          {
1451            thread.insertTaskAfterLastWriteForTransaction(task, lTid);
1452            thread.notify();
1453          }
1454        }
1455      }
1456
1457      if (canTakeReadLock)
1458        backendBlockingThreadsRWLock.releaseRead();
1459      else
1460        backendBlockingThreadsRWLock.releaseWrite();
1461
1462      // Unset the task
1463
if (asynchronousBackends == null)
1464        task = null;
1465    }
1466
1467    try
1468    {
1469      if (canTakeReadLock)
1470        backendNonBlockingThreadsRWLock.acquireRead();
1471      else
1472        backendNonBlockingThreadsRWLock.acquireWrite();
1473    }
1474    catch (InterruptedException JavaDoc e)
1475    {
1476      String JavaDoc msg = Translate.get(
1477          "loadbalancer.backendlist.acquire.writelock.failed", e);
1478      logger.error(msg);
1479      throw new SQLException JavaDoc(msg);
1480    }
1481
1482    int nbOfThreads = backendNonBlockingThreads.size();
1483    ArrayList JavaDoc rollbackList = new ArrayList JavaDoc();
1484
1485    // Build the list of backend that need to rollback to savepoint for this
1486
// transaction
1487
for (int i = 0; i < nbOfThreads; i++)
1488    {
1489      BackendWorkerThread thread = (BackendWorkerThread) backendNonBlockingThreads
1490          .get(i);
1491      DatabaseBackend backend = thread.getBackend();
1492      // If the transaction has been started on this backend and it was not
1493
// previously treated in the asynchronous backend list (late nodes), then
1494
// we have to post the task now in the asynchronous list.
1495
if (backend.isStartedTransaction(lTid)
1496          && ((asynchronousBackends == null) || (!asynchronousBackends
1497              .contains(backend))))
1498        rollbackList.add(thread);
1499    }
1500
1501    int nbOfThreadsToRollback = rollbackList.size();
1502    // If no backend was late and the rollback task has not been posted to any
1503
// backend yet, then we have to create a task for the backends that really
1504
// need to rollback the transaction.
1505
if ((task == null) && (nbOfThreadsToRollback != 0))
1506      task = new RollbackToSavepointTask(getNbToWait(nbOfThreadsToRollback),
1507          nbOfThreadsToRollback, tm.getTimeout(), tm.getLogin(), tid,
1508          savepointName);
1509
1510    // Post the task in each backendThread tasklist and wakeup the threads. This
1511
// could either be the remaining threads that were not in the asynchronous
1512
// queue or all the backends that started the transaction.
1513
for (int i = 0; i < nbOfThreadsToRollback; i++)
1514    {
1515      BackendWorkerThread thread = (BackendWorkerThread) rollbackList.get(i);
1516      synchronized (thread)
1517      {
1518        thread.addTask(task, tid);
1519        thread.notify();
1520      }
1521    }
1522
1523    // Release lock
1524
if (canTakeReadLock)
1525      backendNonBlockingThreadsRWLock.releaseRead();
1526    else
1527      backendNonBlockingThreadsRWLock.releaseWrite();
1528
1529    // Unblock next query from total order queue
1530
if (totalOrderRollback != null)
1531      removeHeadFromAndNotifyTotalOrderQueue();
1532
1533    // Check if someone had something to rollback
1534
if (task == null)
1535      return;
1536
1537    synchronized (task)
1538    {
1539      if (!task.hasCompleted())
1540      {
1541        // Wait for completion (notified by the task)
1542
try
1543        {
1544          // Wait on task
1545
long timeout = tm.getTimeout();
1546          if (timeout > 0)
1547          {
1548            long start = System.currentTimeMillis();
1549            task.wait(timeout);
1550            long end = System.currentTimeMillis();
1551            long remaining = timeout - (end - start);
1552            if (remaining <= 0)
1553            {
1554              if (task.setExpiredTimeout())
1555              { // Task will be ignored by all backends
1556
String JavaDoc msg = Translate.get(
1557                    "loadbalancer.rollbacksavepoint.timeout", new String JavaDoc[]{
1558                        savepointName, String.valueOf(tid),
1559                        String.valueOf(task.getSuccess()),
1560                        String.valueOf(task.getFailed())});
1561                logger.warn(msg);
1562                throw new SQLException JavaDoc(msg);
1563              }
1564              // else task execution already started, to late to cancel
1565
}
1566          }
1567          else
1568            task.wait();
1569        }
1570        catch (InterruptedException JavaDoc e)
1571        {
1572          if (task.setExpiredTimeout())
1573          { // Task will be ignored by all backends
1574
String JavaDoc msg = Translate.get(
1575                "loadbalancer.rollbacksavepoint.timeout", new String JavaDoc[]{
1576                    savepointName, String.valueOf(tid),
1577                    String.valueOf(task.getSuccess()),
1578                    String.valueOf(task.getFailed())});
1579            logger.warn(msg);
1580            throw new SQLException JavaDoc(msg);
1581          }
1582          // else task execution already started, to late to cancel
1583
}
1584      }
1585
1586      if (task.getSuccess() > 0)
1587        return;
1588      else
1589      { // All tasks failed
1590
ArrayList JavaDoc exceptions = task.getExceptions();
1591        if (exceptions == null)
1592          throw new SQLException JavaDoc(Translate.get(
1593              "loadbalancer.rollbacksavepoint.all.failed", new String JavaDoc[]{
1594                  savepointName, String.valueOf(tid)}));
1595        else
1596        {
1597          String JavaDoc errorMsg = Translate.get(
1598              "loadbalancer.rollbacksavepoint.failed.stack", new String JavaDoc[]{
1599                  savepointName, String.valueOf(tid)})
1600              + "\n";
1601          SQLException JavaDoc ex = SQLExceptionFactory.getSQLException(exceptions,
1602              errorMsg);
1603          logger.error(ex.getMessage());
1604          throw ex;
1605        }
1606      }
1607    }
1608  }
1609
1610  /**
1611   * Release a savepoint from a transaction
1612   *
1613   * @param tm The transaction marker metadata
1614   * @param name The name of the savepoint ro release
1615   * @throws SQLException if an error occurs
1616   */

1617  public void releaseSavepoint(TransactionMarkerMetaData tm, String JavaDoc name)
1618      throws SQLException JavaDoc
1619  {
1620    long tid = tm.getTransactionId();
1621    Long JavaDoc lTid = new Long JavaDoc(tid);
1622
1623    // List of backends that still have pending queries for the transaction for
1624
// which a savepoint will be released
1625
ArrayList JavaDoc asynchronousBackends = null;
1626    ReleaseSavepointTask task = null;
1627
1628    // Ordering for distributed virtual database
1629
ReleaseSavepoint totalOrderRelease = null;
1630    boolean canTakeReadLock = false;
1631    if (vdb.getTotalOrderQueue() != null)
1632    {
1633      totalOrderRelease = new ReleaseSavepoint(tid, name);
1634      // Total ordering mainly for distributed virtual databases.
1635
// If waitForTotalOrder returns true then the query has been scheduled in
1636
// total order and there is no need to take a write lock later to resolve
1637
// potential conflicts.
1638
canTakeReadLock = waitForTotalOrder(totalOrderRelease, false);
1639      if (!canTakeReadLock)
1640        // This is a local commit no total order info
1641
totalOrderRelease = null;
1642    }
1643
1644    if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL)
1645    {
1646      try
1647      {
1648        if (canTakeReadLock)
1649          backendBlockingThreadsRWLock.acquireRead();
1650        else
1651        {
1652          // Lock in write to ensure that all writes are posted and we wait in
1653
// the queue, else a read lock has the priority with the
1654
// implementation we are using.
1655
backendBlockingThreadsRWLock.acquireWrite();
1656        }
1657      }
1658      catch (InterruptedException JavaDoc e)
1659      {
1660        String JavaDoc msg = Translate.get(
1661            "loadbalancer.backendlist.acquire.writelock.failed", e);
1662        logger.error(msg);
1663        throw new SQLException JavaDoc(msg);
1664      }
1665
1666      int nbOfThreads = backendBlockingThreads.size();
1667
1668      // Create the task
1669
task = new ReleaseSavepointTask(getNbToWait(nbOfThreads), nbOfThreads, tm
1670          .getTimeout(), tm.getLogin(), tid, name);
1671
1672      for (int i = 0; i < nbOfThreads; i++)
1673      {
1674        BackendWorkerThread thread = (BackendWorkerThread) backendBlockingThreads
1675            .get(i);
1676        if (thread.hasTaskForTransaction(lTid))
1677        {
1678          if (asynchronousBackends == null)
1679            asynchronousBackends = new ArrayList JavaDoc();
1680          asynchronousBackends.add(thread.getBackend());
1681          synchronized (thread)
1682          {
1683            thread.insertTaskAfterLastWriteForTransaction(task, lTid);
1684            thread.notify();
1685          }
1686        }
1687      }
1688
1689      // Release lock
1690
if (canTakeReadLock)
1691        backendBlockingThreadsRWLock.releaseRead();
1692      else
1693        backendBlockingThreadsRWLock.releaseWrite();
1694
1695      // Unset the task
1696
if (asynchronousBackends == null)
1697        task = null;
1698    }
1699
1700    try
1701    {
1702      if (canTakeReadLock)
1703        backendNonBlockingThreadsRWLock.acquireRead();
1704      else
1705        backendNonBlockingThreadsRWLock.acquireWrite();
1706    }
1707    catch (InterruptedException JavaDoc e)
1708    {
1709      String JavaDoc msg = Translate.get(
1710          "loadbalancer.backendlist.acquire.writelock.failed", e);
1711      logger.error(msg);
1712      throw new SQLException JavaDoc(msg);
1713    }
1714
1715    int nbOfThreads = backendNonBlockingThreads.size();
1716    ArrayList JavaDoc savepointList = new ArrayList JavaDoc();
1717    // Build the list of backend that need to release a savepoint for this
1718
// transaction
1719
for (int i = 0; i < nbOfThreads; i++)
1720    {
1721      BackendWorkerThread thread = (BackendWorkerThread) backendNonBlockingThreads
1722          .get(i);
1723      DatabaseBackend backend = thread.getBackend();
1724      // If the transaction has been started on this backend and it was not
1725
// previously treated in the asynchronous backend list (late nodes), then
1726
// we have to post the task now in the asynchronous list.
1727
if (backend.isStartedTransaction(lTid)
1728          && ((asynchronousBackends == null) || (!asynchronousBackends
1729              .contains(backend))))
1730        savepointList.add(thread);
1731    }
1732
1733    nbOfThreads = savepointList.size();
1734    if (nbOfThreads == 0)
1735    {
1736      if (canTakeReadLock)
1737        backendNonBlockingThreadsRWLock.releaseRead();
1738      else
1739        backendNonBlockingThreadsRWLock.releaseWrite();
1740      return;
1741    }
1742
1743    if (task == null)
1744      task = new ReleaseSavepointTask(getNbToWait(nbOfThreads), nbOfThreads, tm
1745          .getTimeout(), tm.getLogin(), tid, name);
1746
1747    synchronized (task)
1748    {
1749      // Post the task in each backendThread tasklist and wakeup the threads
1750
for (int i = 0; i < nbOfThreads; i++)
1751      {
1752        BackendWorkerThread thread = (BackendWorkerThread) savepointList.get(i);
1753        synchronized (thread)
1754        {
1755          thread.addTask(task, tid);
1756          thread.notify();
1757        }
1758      }
1759
1760      // Release lock
1761
if (canTakeReadLock)
1762        backendNonBlockingThreadsRWLock.releaseRead();
1763      else
1764        backendNonBlockingThreadsRWLock.releaseWrite();
1765
1766      // Unblock next query from total order queue
1767
if (totalOrderRelease != null)
1768        removeHeadFromAndNotifyTotalOrderQueue();
1769
1770      // Wait for completion (notified by the task)
1771
try
1772      {
1773        // Wait on task
1774
long timeout = tm.getTimeout();
1775        if (timeout > 0)
1776        {
1777          long start = System.currentTimeMillis();
1778          task.wait(timeout);
1779          long end = System.currentTimeMillis();
1780          long remaining = timeout - (end - start);
1781          if (remaining <= 0)
1782          {
1783            if (task.setExpiredTimeout())
1784            { // Task will be ignored by all backends
1785
String JavaDoc msg = Translate.get(
1786                  "loadbalancer.releasesavepoint.timeout", new String JavaDoc[]{name,
1787                      String.valueOf(tid), String.valueOf(task.getSuccess()),
1788                      String.valueOf(task.getFailed())});
1789              logger.warn(msg);
1790              throw new SQLException JavaDoc(msg);
1791            }
1792            // else task execution already started, to late to cancel
1793
}
1794        }
1795        else
1796          task.wait();
1797      }
1798      catch (InterruptedException JavaDoc e)
1799      {
1800        if (task.setExpiredTimeout())
1801        { // Task will be ignored by all backends
1802
String JavaDoc msg = Translate.get("loadbalancer.releasesavepoint.timeout",
1803              new String JavaDoc[]{name, String.valueOf(tid),
1804                  String.valueOf(task.getSuccess()),
1805                  String.valueOf(task.getFailed())});
1806          logger.warn(msg);
1807          throw new SQLException JavaDoc(msg);
1808        }
1809        // else task execution already started, to late to cancel
1810
}
1811
1812      if (task.getSuccess() > 0)
1813        return;
1814      else
1815      { // All tasks failed
1816
ArrayList JavaDoc exceptions = task.getExceptions();
1817        if (exceptions == null)
1818          throw new SQLException JavaDoc(Translate.get(
1819              "loadbalancer.releasesavepoint.all.failed", new String JavaDoc[]{name,
1820                  String.valueOf(tid)}));
1821        else
1822        {
1823          String JavaDoc errorMsg = Translate.get(
1824              "loadbalancer.releasesavepoint.failed.stack", new String JavaDoc[]{name,
1825                  String.valueOf(tid)})
1826              + "\n";
1827          SQLException JavaDoc ex = SQLExceptionFactory.getSQLException(exceptions,
1828              errorMsg);
1829          logger.error(ex.getMessage());
1830          throw ex;
1831        }
1832      }
1833    }
1834  }
1835
1836  /**
1837   * Set a savepoint to a transaction.
1838   *
1839   * @param tm The transaction marker metadata
1840   * @param name The name of the new savepoint
1841   * @throws SQLException if an error occurs
1842   */

1843  public void setSavepoint(TransactionMarkerMetaData tm, String JavaDoc name)
1844      throws SQLException JavaDoc
1845  {
1846    long tid = tm.getTransactionId();
1847    Long JavaDoc lTid = new Long JavaDoc(tid);
1848
1849    // List of backends that still have pending queries for the transaction for
1850
// which a savepoint will be set
1851
ArrayList JavaDoc asynchronousBackends = null;
1852    SavepointTask task = null;
1853
1854    // Ordering for distributed virtual database
1855
SetSavepoint totalOrderSavepoint = null;
1856    boolean canTakeReadLock = false;
1857    if (vdb.getTotalOrderQueue() != null)
1858    {
1859      totalOrderSavepoint = new SetSavepoint(tid, name);
1860      // Total ordering mainly for distributed virtual databases.
1861
// If waitForTotalOrder returns true then the query has been scheduled in
1862
// total order and there is no need to take a write lock later to resolve
1863
// potential conflicts.
1864
canTakeReadLock = waitForTotalOrder(totalOrderSavepoint, false);
1865      if (!canTakeReadLock)
1866        // This is a local commit no total order info
1867
totalOrderSavepoint = null;
1868    }
1869
1870    if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL)
1871    {
1872      try
1873      {
1874        if (canTakeReadLock)
1875          backendBlockingThreadsRWLock.acquireRead();
1876        else
1877        {
1878          // Lock in write to ensure that all writes are posted and we wait in
1879
// the queue, else a read lock has the priority with the
1880
// implementation we are using.
1881
backendBlockingThreadsRWLock.acquireWrite();
1882        }
1883      }
1884      catch (InterruptedException JavaDoc e)
1885      {
1886        String JavaDoc msg = Translate.get(
1887            "loadbalancer.backendlist.acquire.writelock.failed", e);
1888        logger.error(msg);
1889        throw new SQLException JavaDoc(msg);
1890      }
1891
1892      int nbOfThreads = backendBlockingThreads.size();
1893
1894      // Create the task
1895
task = new SavepointTask(getNbToWait(nbOfThreads), nbOfThreads, tm
1896          .getTimeout(), tm.getLogin(), tid, name);
1897
1898      for (int i = 0; i < nbOfThreads; i++)
1899      {
1900        BackendWorkerThread thread = (BackendWorkerThread) backendBlockingThreads
1901            .get(i);
1902        if (thread.hasTaskForTransaction(lTid))
1903        {
1904          if (asynchronousBackends == null)
1905            asynchronousBackends = new ArrayList JavaDoc();
1906          asynchronousBackends.add(thread.getBackend());
1907          synchronized (thread)
1908          {
1909            thread.insertTaskAfterLastWriteForTransaction(task, lTid);
1910            thread.notify();
1911          }
1912        }
1913      }
1914
1915      // Release lock
1916
if (canTakeReadLock)
1917        backendBlockingThreadsRWLock.releaseRead();
1918      else
1919        backendBlockingThreadsRWLock.releaseWrite();
1920
1921      // Unset the task
1922
if (asynchronousBackends == null)
1923        task = null;
1924    }
1925
1926    try
1927    {
1928      if (canTakeReadLock)
1929        backendNonBlockingThreadsRWLock.acquireRead();
1930      else
1931        backendNonBlockingThreadsRWLock.acquireWrite();
1932    }
1933    catch (InterruptedException JavaDoc e)
1934    {
1935      String JavaDoc msg = Translate.get(
1936          "loadbalancer.backendlist.acquire.writelock.failed", e);
1937      logger.error(msg);
1938      throw new SQLException JavaDoc(msg);
1939    }
1940
1941    int nbOfThreads = backendNonBlockingThreads.size();
1942    ArrayList JavaDoc savepointList = new ArrayList JavaDoc();
1943    // Build the list of backend that need to set a savepoint for this
1944
// transaction
1945
for (int i = 0; i < nbOfThreads; i++)
1946    {
1947      BackendWorkerThread thread = (BackendWorkerThread) backendNonBlockingThreads
1948          .get(i);
1949      DatabaseBackend backend = thread.getBackend();
1950      // If the transaction has been started on this backend and it was not
1951
// previously treated in the asynchronous backend list (late nodes), then
1952
// we have to post the task now in the asynchronous list.
1953
if (backend.isStartedTransaction(lTid)
1954          && ((asynchronousBackends == null) || (!asynchronousBackends
1955              .contains(backend))))
1956        savepointList.add(thread);
1957    }
1958
1959    nbOfThreads = savepointList.size();
1960    if (nbOfThreads == 0)
1961    {
1962      if (canTakeReadLock)
1963        backendNonBlockingThreadsRWLock.releaseRead();
1964      else
1965        backendNonBlockingThreadsRWLock.releaseWrite();
1966      return;
1967    }
1968
1969    if (task == null)
1970      task = new SavepointTask(getNbToWait(nbOfThreads), nbOfThreads, tm
1971          .getTimeout(), tm.getLogin(), tid, name);
1972
1973    synchronized (task)
1974    {
1975      // Post the task in each backendThread tasklist and wakeup the threads
1976
for (int i = 0; i < nbOfThreads; i++)
1977      {
1978        BackendWorkerThread thread = (BackendWorkerThread) savepointList.get(i);
1979        synchronized (thread)
1980        {
1981          thread.addTask(task, tid);
1982          thread.notify();
1983        }
1984      }
1985
1986      // Release lock
1987
if (canTakeReadLock)
1988        backendNonBlockingThreadsRWLock.releaseRead();
1989      else
1990        backendNonBlockingThreadsRWLock.releaseWrite();
1991
1992      // Unblock next query from total order queue
1993
if (totalOrderSavepoint != null)
1994        removeHeadFromAndNotifyTotalOrderQueue();
1995
1996      // Wait for completion (notified by the task)
1997
try
1998      {
1999        // Wait on task
2000
long timeout = tm.getTimeout();
2001        if (timeout > 0)
2002        {
2003          long start = System.currentTimeMillis();
2004          task.wait(timeout);
2005          long end = System.currentTimeMillis();
2006          long remaining = timeout - (end - start);
2007          if (remaining <= 0)
2008          {
2009            if (task.setExpiredTimeout())
2010            { // Task will be ignored by all backends
2011
String JavaDoc msg = Translate.get("loadbalancer.setsavepoint.timeout",
2012                  new String JavaDoc[]{name, String.valueOf(tid),
2013                      String.valueOf(task.getSuccess()),
2014                      String.valueOf(task.getFailed())});
2015              logger.warn(msg);
2016              throw new SQLException JavaDoc(msg);
2017            }
2018            // else task execution already started, to late to cancel
2019
}
2020        }
2021        else
2022          task.wait();
2023      }
2024      catch (InterruptedException JavaDoc e)
2025      {
2026        if (task.setExpiredTimeout())
2027        { // Task will be ignored by all backends
2028
String JavaDoc msg = Translate.get("loadbalancer.setsavepoint.timeout",
2029              new String JavaDoc[]{name, String.valueOf(tid),
2030                  String.valueOf(task.getSuccess()),
2031                  String.valueOf(task.getFailed())});
2032          logger.warn(msg);
2033          throw new SQLException JavaDoc(msg);
2034        }
2035        // else task execution already started, to late to cancel
2036
}
2037
2038      if (task.getSuccess() > 0)
2039        return;
2040      else
2041      { // All tasks failed
2042
ArrayList JavaDoc exceptions = task.getExceptions();
2043        if (exceptions == null)
2044          throw new SQLException JavaDoc(Translate.get(
2045              "loadbalancer.setsavepoint.all.failed", new String JavaDoc[]{name,
2046                  String.valueOf(tid)}));
2047        else
2048        {
2049          String JavaDoc errorMsg = Translate.get(
2050              "loadbalancer.setsavepoint.failed.stack", new String JavaDoc[]{name,
2051                  String.valueOf(tid)})
2052              + "\n";
2053          SQLException JavaDoc ex = SQLExceptionFactory.getSQLException(exceptions,
2054              errorMsg);
2055          logger.error(ex.getMessage());
2056          throw ex;
2057        }
2058      }
2059    }
2060  }
2061
2062  /**
2063   * Wait for all writes to be posted on BackendBlockingThreads by simply
2064   * acquiring the RW lock in write and releasing it.
2065   *
2066   * @throws SQLException if we fail to acquire the lock
2067   */

2068  private void waitForAllWritesToBePostedOnBackendBlockingThreads()
2069      throws SQLException JavaDoc
2070  {
2071    // Lock in write to ensure that all writes are posted and we wait in the
2072
// queue, else a read lock has the priority with the implementation we are
2073
// using.
2074
try
2075    {
2076      backendBlockingThreadsRWLock.acquireWrite();
2077    }
2078    catch (InterruptedException JavaDoc e)
2079    {
2080      String JavaDoc msg = Translate.get(
2081          "loadbalancer.backendlist.acquire.writelock.failed", e);
2082      logger.error(msg);
2083      throw new SQLException JavaDoc(msg);
2084    }
2085    backendBlockingThreadsRWLock.releaseWrite();
2086  }
2087
2088  /**
2089   * Waits for all writes of the given transaction in the blocking thread queue
2090   * to complete before being able to complete the transaction.
2091   *
2092   * @throws SQLException if a locking error occurs
2093   */

2094  protected void waitForAllWritesToComplete(long transactionId)
2095      throws SQLException JavaDoc
2096  {
2097    waitForAllWritesToBePostedOnBackendBlockingThreads();
2098
2099    boolean success = false;
2100    while (!success)
2101    {
2102      try
2103      { // Note that we are not synchronized here and we might have concurrent
2104
// modifications of the backend list.
2105
for (Iterator JavaDoc iter = backendBlockingThreads.iterator(); iter.hasNext();)
2106        {
2107          BackendWorkerThread thread = (BackendWorkerThread) iter.next();
2108          thread.waitForAllTasksToComplete(transactionId);
2109        }
2110        success = true;
2111      }
2112      catch (ConcurrentModificationException JavaDoc e)
2113      { // List has been modified while we were iterating
2114
// Retry until we succeed
2115
}
2116    }
2117  }
2118
2119  /**
2120   * Waits for all writes of the given transaction in the blocking thread queue
2121   * of the given backend to complete before being able to complete the
2122   * transaction.
2123   *
2124   * @throws SQLException if we fail to acquire the lock
2125   * @see #executeRequestOnBackend
2126   */

2127  protected void waitForAllWritesToComplete(DatabaseBackend backend,
2128      long transactionId) throws SQLException JavaDoc
2129  {
2130    waitForAllWritesToBePostedOnBackendBlockingThreads();
2131
2132    boolean success = false;
2133    while (!success)
2134    {
2135      try
2136      { // Note that we are not synchronized here and we might have concurrent
2137
// modifications of the backend list.
2138
for (Iterator JavaDoc iter = backendBlockingThreads.iterator(); iter.hasNext();)
2139        {
2140          BackendWorkerThread thread = (BackendWorkerThread) iter.next();
2141          if (thread.getBackend() == backend)
2142          {
2143            thread.waitForAllTasksToComplete(transactionId);
2144            break;
2145          }
2146        }
2147        success = true;
2148      }
2149      catch (ConcurrentModificationException JavaDoc e)
2150      { // List has been modified while we were iterating
2151
// Retry until we succeed
2152
}
2153    }
2154  }
2155
2156  /**
2157   * Waits for all writes in the blocking thread queue of the given backend to
2158   * complete.
2159   *
2160   * @throws SQLException if we fail to acquire the lock
2161   * @see #executeRequestOnBackend
2162   */

2163  protected void waitForAllWritesToComplete(DatabaseBackend backend)
2164      throws SQLException JavaDoc
2165  {
2166    waitForAllWritesToBePostedOnBackendBlockingThreads();
2167
2168    boolean success = false;
2169    while (!success)
2170    {
2171      try
2172      { // Note that we are not synchronized here and we might have concurrent
2173
// modifications of the backend list.
2174
for (Iterator JavaDoc iter = backendBlockingThreads.iterator(); iter.hasNext();)
2175        {
2176          BackendWorkerThread thread = (BackendWorkerThread) iter.next();
2177          if (thread.getBackend() == backend)
2178          {
2179            thread.waitForAllTasksToComplete();
2180            break;
2181          }
2182        }
2183        success = true;
2184      }
2185      catch (ConcurrentModificationException JavaDoc e)
2186      { // List has been modified while we were iterating
2187
// Retry until we succeed
2188
}
2189    }
2190  }
2191
2192  /*
2193   * Backends management
2194   */

2195
2196  /**
2197   * Enables a Backend that was previously disabled.
2198   * <p>
2199   * Ask the corresponding connection manager to initialize the connections if
2200   * needed.
2201   * <p>
2202   * No sanity checks are performed by this function.
2203   *
2204   * @param db the database backend to enable
2205   * @param writeEnabled True if the backend must be enabled for writes
2206   * @throws SQLException if an error occurs
2207   */

2208  public void enableBackend(DatabaseBackend db, boolean writeEnabled)
2209      throws SQLException JavaDoc
2210  {
2211    if (writeEnabled && db.isWriteCanBeEnabled())
2212    {
2213      // Create 2 worker threads
2214
BackendWorkerThread blockingThread = new BackendWorkerThread(db, this);
2215      BackendWorkerThread nonBlockingThread = new BackendWorkerThread(db, this);
2216
2217      // Add first to the blocking thread list
2218
try
2219      {
2220        backendBlockingThreadsRWLock.acquireWrite();
2221      }
2222      catch (InterruptedException JavaDoc e)
2223      {
2224        String JavaDoc msg = Translate.get(
2225            "loadbalancer.backendlist.acquire.writelock.failed", e);
2226        logger.error(msg);
2227        throw new SQLException JavaDoc(msg);
2228      }
2229      backendBlockingThreads.add(blockingThread);
2230      backendBlockingThreadsRWLock.releaseWrite();
2231      blockingThread.start();
2232      logger.info(Translate.get(
2233          "loadbalancer.backend.workerthread.blocking.add", db.getName()));
2234
2235      // Then add to the non-blocking thread list
2236
try
2237      {
2238        backendNonBlockingThreadsRWLock.acquireWrite();
2239      }
2240      catch (InterruptedException JavaDoc e)
2241      {
2242        String JavaDoc msg = Translate.get(
2243            "loadbalancer.backendlist.acquire.writelock.failed", e);
2244        logger.error(msg);
2245        throw new SQLException JavaDoc(msg);
2246      }
2247      backendNonBlockingThreads.add(nonBlockingThread);
2248      backendNonBlockingThreadsRWLock.releaseWrite();
2249      nonBlockingThread.start();
2250      logger.info(Translate.get(
2251          "loadbalancer.backend.workerthread.non.blocking.add", db.getName()));
2252      db.enableWrite();
2253    }
2254
2255    if (!db.isInitialized())
2256      db.initializeConnections();
2257    db.enableRead();
2258  }
2259
2260  /**
2261   * Disables a backend that was previously enabled.
2262   * <p>
2263   * Ask the corresponding connection manager to finalize the connections if
2264   * needed.
2265   * <p>
2266   * No sanity checks are performed by this function.
2267   *
2268   * @param db the database backend to disable
2269   * @throws SQLException if an error occurs
2270   */

2271  public synchronized void disableBackend(DatabaseBackend db)
2272      throws SQLException JavaDoc
2273  {
2274    if (db.isWriteEnabled())
2275    {
2276      KillThreadTask killBlockingThreadTask = new KillThreadTask(1, 1);
2277
2278      // Starts with backendBlockingThreads
2279
try
2280      {
2281        backendBlockingThreadsRWLock.acquireWrite();
2282      }
2283      catch (InterruptedException JavaDoc e)
2284      {
2285        String JavaDoc msg = Translate.get(
2286            "loadbalancer.backendlist.acquire.writelock.failed", e);
2287        logger.error(msg);
2288        throw new SQLException JavaDoc(msg);
2289      }
2290
2291      int nbOfThreads = backendBlockingThreads.size();
2292
2293      // Find the right blocking thread
2294
for (int i = 0; i < nbOfThreads; i++)
2295      {
2296        BackendWorkerThread thread = (BackendWorkerThread) backendBlockingThreads
2297            .get(i);
2298        if (thread.getBackend().equals(db))
2299        {
2300          logger.info(Translate
2301              .get("loadbalancer.backend.workerthread.blocking.remove", db
2302                  .getName()));
2303
2304          // Remove it from the backendBlockingThread list
2305
backendBlockingThreads.remove(i);
2306
2307          synchronized (thread)
2308          {
2309            // Kill the thread
2310
thread.addPriorityTask(killBlockingThreadTask);
2311            thread.notify();
2312          }
2313          break;
2314        }
2315      }
2316
2317      backendBlockingThreadsRWLock.releaseWrite();
2318
2319      // Wait for the thread to be killed
2320
synchronized (killBlockingThreadTask)
2321      {
2322        if (!killBlockingThreadTask.hasFullyCompleted())
2323          try
2324          {
2325            killBlockingThreadTask.wait();
2326          }
2327          catch (InterruptedException JavaDoc ignore)
2328          {
2329          }
2330      }
2331
2332      // Continue with backendNonBlockingThreads
2333
KillThreadTask killNonBlockingThreadTask = new KillThreadTask(1, 1);
2334
2335      try
2336      {
2337        backendNonBlockingThreadsRWLock.acquireWrite();
2338      }
2339      catch (InterruptedException JavaDoc e)
2340      {
2341        String JavaDoc msg = Translate.get(
2342            "loadbalancer.backendlist.acquire.writelock.failed", e);
2343        logger.error(msg);
2344        throw new SQLException JavaDoc(msg);
2345      }
2346
2347      // Find the right non-blocking thread
2348
nbOfThreads = backendNonBlockingThreads.size();
2349      for (int i = 0; i < nbOfThreads; i++)
2350      {
2351        BackendWorkerThread thread = (BackendWorkerThread) backendNonBlockingThreads
2352            .get(i);
2353        if (thread.getBackend().equals(db))
2354        {
2355          logger.info(Translate.get(
2356              "loadbalancer.backend.workerthread.non.blocking.remove", db
2357                  .getName()));
2358
2359          // Remove it from the backendNonBlockingThreads list
2360
backendNonBlockingThreads.remove(i);
2361
2362          synchronized (thread)
2363          {
2364            // Kill the thread
2365
thread.addPriorityTask(killNonBlockingThreadTask);
2366            thread.notify();
2367          }
2368          break;
2369        }
2370      }
2371
2372      backendNonBlockingThreadsRWLock.releaseWrite();
2373
2374      // Wait for the thread to be killed
2375
synchronized (killNonBlockingThreadTask)
2376      {
2377        if (!killNonBlockingThreadTask.hasFullyCompleted())
2378          try
2379          {
2380            killNonBlockingThreadTask.wait();
2381          }
2382          catch (InterruptedException JavaDoc ignore)
2383          {
2384          }
2385      }
2386    }
2387
2388    db.disable();
2389    if (db.isInitialized())
2390      db.finalizeConnections();
2391  }
2392
2393  /**
2394   * @see org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer#getNumberOfEnabledBackends()
2395   */

2396  public int getNumberOfEnabledBackends()
2397  {
2398    return backendBlockingThreads.size();
2399  }
2400
2401  //
2402
// Debug/Monitoring
2403
//
2404

2405  /**
2406   * @see org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer#getXmlImpl
2407   */

2408  public String JavaDoc getXmlImpl()
2409  {
2410    StringBuffer JavaDoc info = new StringBuffer JavaDoc();
2411    info.append("<" + DatabasesXmlTags.ELT_RAIDb_1 + ">");
2412    if (waitForCompletionPolicy != null)
2413      info.append(waitForCompletionPolicy.getXml());
2414    if (macroHandler != null)
2415      info.append(macroHandler.getXml());
2416    info.append(getRaidb1Xml());
2417    info.append("</" + DatabasesXmlTags.ELT_RAIDb_1 + ">");
2418    return info.toString();
2419  }
2420
2421  /**
2422   * Returns the XML content specific to the concrete RAIDb-1 load balancer.
2423   * {@link #getXmlImpl()} generates the surrounding raidb1 tags itself and
2424   * inserts the string returned by this method just before the closing tag.
2425   *
2426   * @return content of Raidb1 xml
2427   */

2428  public abstract String JavaDoc getRaidb1Xml();
2429}
Popular Tags