KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > objectweb > cjdbc > controller > loadbalancer > raidb2 > RAIDb2


1 /**
2  * C-JDBC: Clustered JDBC.
3  * Copyright (C) 2002-2005 French National Institute For Research In Computer
4  * Science And Control (INRIA).
5  * Contact: c-jdbc@objectweb.org
6  *
7  * This library is free software; you can redistribute it and/or modify it
8  * under the terms of the GNU Lesser General Public License as published by the
9  * Free Software Foundation; either version 2.1 of the License, or any later
10  * version.
11  *
12  * This library is distributed in the hope that it will be useful, but WITHOUT
13  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
14  * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
15  * for more details.
16  *
17  * You should have received a copy of the GNU Lesser General Public License
18  * along with this library; if not, write to the Free Software Foundation,
19  * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
20  *
21  * Initial developer(s): Emmanuel Cecchet.
22  * Contributor(s): Jean-Bernard van Zuylen.
23  */

24
25 package org.objectweb.cjdbc.controller.loadbalancer.raidb2;
26
27 import java.sql.Connection JavaDoc;
28 import java.sql.SQLException JavaDoc;
29 import java.util.ArrayList JavaDoc;
30 import java.util.ConcurrentModificationException JavaDoc;
31 import java.util.Iterator JavaDoc;
32
33 import org.objectweb.cjdbc.common.exceptions.BadConnectionException;
34 import org.objectweb.cjdbc.common.exceptions.NoTransactionStartWhenDisablingException;
35 import org.objectweb.cjdbc.common.exceptions.SQLExceptionFactory;
36 import org.objectweb.cjdbc.common.exceptions.UnreachableBackendException;
37 import org.objectweb.cjdbc.common.i18n.Translate;
38 import org.objectweb.cjdbc.common.log.Trace;
39 import org.objectweb.cjdbc.common.sql.AbstractWriteRequest;
40 import org.objectweb.cjdbc.common.sql.ParsingGranularities;
41 import org.objectweb.cjdbc.common.sql.SelectRequest;
42 import org.objectweb.cjdbc.common.sql.StoredProcedure;
43 import org.objectweb.cjdbc.common.util.ReadPrioritaryFIFOWriteLock;
44 import org.objectweb.cjdbc.common.xml.DatabasesXmlTags;
45 import org.objectweb.cjdbc.controller.backend.DatabaseBackend;
46 import org.objectweb.cjdbc.controller.cache.metadata.MetadataCache;
47 import org.objectweb.cjdbc.controller.connection.AbstractConnectionManager;
48 import org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer;
49 import org.objectweb.cjdbc.controller.loadbalancer.AllBackendsFailedException;
50 import org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread;
51 import org.objectweb.cjdbc.controller.loadbalancer.policies.WaitForCompletionPolicy;
52 import org.objectweb.cjdbc.controller.loadbalancer.policies.createtable.CreateTableException;
53 import org.objectweb.cjdbc.controller.loadbalancer.policies.createtable.CreateTablePolicy;
54 import org.objectweb.cjdbc.controller.loadbalancer.policies.createtable.CreateTableRule;
55 import org.objectweb.cjdbc.controller.loadbalancer.tasks.AbstractTask;
56 import org.objectweb.cjdbc.controller.loadbalancer.tasks.CommitTask;
57 import org.objectweb.cjdbc.controller.loadbalancer.tasks.KillThreadTask;
58 import org.objectweb.cjdbc.controller.loadbalancer.tasks.ReadStoredProcedureTask;
59 import org.objectweb.cjdbc.controller.loadbalancer.tasks.ReleaseSavepointTask;
60 import org.objectweb.cjdbc.controller.loadbalancer.tasks.RollbackTask;
61 import org.objectweb.cjdbc.controller.loadbalancer.tasks.RollbackToSavepointTask;
62 import org.objectweb.cjdbc.controller.loadbalancer.tasks.SavepointTask;
63 import org.objectweb.cjdbc.controller.loadbalancer.tasks.WriteRequestTask;
64 import org.objectweb.cjdbc.controller.loadbalancer.tasks.WriteRequestWithKeysTask;
65 import org.objectweb.cjdbc.controller.loadbalancer.tasks.WriteStoredProcedureTask;
66 import org.objectweb.cjdbc.controller.requestmanager.RAIDbLevels;
67 import org.objectweb.cjdbc.controller.requestmanager.TransactionMarkerMetaData;
68 import org.objectweb.cjdbc.controller.virtualdatabase.ControllerResultSet;
69 import org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase;
70 import org.objectweb.cjdbc.controller.virtualdatabase.protocol.Commit;
71 import org.objectweb.cjdbc.controller.virtualdatabase.protocol.ReleaseSavepoint;
72 import org.objectweb.cjdbc.controller.virtualdatabase.protocol.Rollback;
73 import org.objectweb.cjdbc.controller.virtualdatabase.protocol.RollbackToSavepoint;
74 import org.objectweb.cjdbc.controller.virtualdatabase.protocol.SetSavepoint;
75
76 /**
77  * RAIDb-2 load balancer.
78  * <p>
79  * This class is an abstract class because the read requests coming from the
80  * Request Manager are NOT treated here but in the subclasses. Transaction
81  * management and write requests are broadcasted to all backends owning the
82  * written table.
83  *
84  * @author <a HREF="mailto:Emmanuel.Cecchet@inria.fr">Emmanuel Cecchet </a>
85  * @author <a HREF="mailto:jbvanzuylen@transwide.com">Jean-Bernard van Zuylen
86  * </a>
87  * @version 1.0
88  */

89 public abstract class RAIDb2 extends AbstractLoadBalancer
90 {
91   //
92
// How the code is organized ?
93
// 1. Member variables
94
// 2. Constructor(s)
95
// 3. Request handling
96
// 4. Transaction handling
97
// 5. Backend management
98
//
99

100   protected ArrayList JavaDoc backendBlockingThreads;
101   protected ArrayList JavaDoc backendNonBlockingThreads;
102   protected ReadPrioritaryFIFOWriteLock backendBlockingThreadsRWLock = new ReadPrioritaryFIFOWriteLock();
103   protected ReadPrioritaryFIFOWriteLock backendNonBlockingThreadsRWLock = new ReadPrioritaryFIFOWriteLock();
104   // Should we wait for all backends to commit before returning ?
105
protected WaitForCompletionPolicy waitForCompletionPolicy;
106   protected CreateTablePolicy createTablePolicy;
107
108   protected static Trace logger = Trace
109                                                                             .getLogger("org.objectweb.cjdbc.controller.loadbalancer.raidb2");
110
111   /*
112    * Constructors
113    */

114
/**
 * Creates a new RAIDb-2 request load balancer.
 * <p>
 * The blocking and non-blocking backend worker thread lists are created
 * empty; no worker thread is started by this constructor.
 *
 * @param vdb the virtual database this load balancer belongs to.
 * @param waitForCompletionPolicy how many backends must complete before
 *          returning the result ?
 * @param createTablePolicy the policy defining how 'create table' statements
 *          should be handled
 * @exception Exception if an error occurs
 */
public RAIDb2(VirtualDatabase vdb,
    WaitForCompletionPolicy waitForCompletionPolicy,
    CreateTablePolicy createTablePolicy) throws Exception
{
  super(vdb, RAIDbLevels.RAIDb2, ParsingGranularities.TABLE);

  // Policies
  this.waitForCompletionPolicy = waitForCompletionPolicy;
  this.createTablePolicy = createTablePolicy;

  // Worker thread lists start empty
  this.backendBlockingThreads = new ArrayList();
  this.backendNonBlockingThreads = new ArrayList();
}
137
138   /*
139    * Request Handling
140    */

141
142   /**
143    * Returns the number of nodes to wait for according to the defined
144    * <code>waitForCompletion</code> policy.
145    *
146    * @param nbOfThreads total number of threads
147    * @return int number of threads to wait for
148    */

149   private int getNbToWait(int nbOfThreads)
150   {
151     int nbToWait;
152     switch (waitForCompletionPolicy.getPolicy())
153     {
154       case WaitForCompletionPolicy.FIRST :
155         nbToWait = 1;
156         break;
157       case WaitForCompletionPolicy.MAJORITY :
158         nbToWait = nbOfThreads / 2 + 1;
159         break;
160       case WaitForCompletionPolicy.ALL :
161         nbToWait = nbOfThreads;
162         break;
163       default :
164         logger
165             .warn(Translate.get("loadbalancer.waitforcompletion.unsupported"));
166         nbToWait = nbOfThreads;
167         break;
168     }
169     return nbToWait;
170   }
171
172   /**
173    * Performs a write request. This request is broadcasted to all nodes that
174    * owns the table to be written.
175    *
176    * @param request an <code>AbstractWriteRequest</code>
177    * @return number of rows affected by the request
178    * @throws AllBackendsFailedException if all backends failed to execute the
179    * request
180    * @exception SQLException if an error occurs
181    */

182   public int execWriteRequest(AbstractWriteRequest request)
183       throws AllBackendsFailedException, SQLException JavaDoc
184   {
185     return ((WriteRequestTask) execWriteRequest(request, false, null))
186         .getResult();
187   }
188
189   /**
190    * Perform a write request and return the auto generated keys.
191    *
192    * @param request the request to execute
193    * @param metadataCache the metadataCache if any or null
194    * @return auto generated keys.
195    * @throws AllBackendsFailedException if all backends failed to execute the
196    * request
197    * @exception SQLException if an error occurs
198    */

199   public ControllerResultSet execWriteRequestWithKeys(
200       AbstractWriteRequest request, MetadataCache metadataCache)
201       throws AllBackendsFailedException, SQLException JavaDoc
202   {
203     return ((WriteRequestWithKeysTask) execWriteRequest(request, true,
204         metadataCache)).getResult();
205   }
206
207   /**
208    * Common code for execWriteRequest(AbstractWriteRequest) and
209    * execWriteRequestWithKeys(AbstractWriteRequest).
210    * <p>
211    * Note that macros are processed here.
212    * <p>
213    * The result is given back in AbstractTask.getResult().
214    *
215    * @param request the request to execute
216    * @param useKeys true if this must give an auto generated keys ResultSet
217    * @param metadataCache the metadataCache if any or null
218    * @throws AllBackendsFailedException if all backends failed to execute the
219    * request
220    * @throws SQLException if an error occurs
221    */

222   private AbstractTask execWriteRequest(AbstractWriteRequest request,
223       boolean useKeys, MetadataCache metadataCache)
224       throws AllBackendsFailedException, SQLException JavaDoc
225   {
226     ArrayList JavaDoc backendThreads;
227     ReadPrioritaryFIFOWriteLock lock;
228
229     // Total ordering mainly for distributed virtual databases.
230
// If waitForTotalOrder returns true then the query has been scheduled in
231
// total order and there is no need to take a write lock later to resolve
232
// potential conflicts.
233
boolean canTakeReadLock = waitForTotalOrder(request, true);
234
235     // Handle macros
236
handleMacros(request);
237
238     // Determine which list (blocking or not) to use
239
if (request.mightBlock())
240     { // Blocking
241
backendThreads = backendBlockingThreads;
242       lock = backendBlockingThreadsRWLock;
243     }
244     else
245     { // Non-blocking
246
backendThreads = backendNonBlockingThreads;
247       lock = backendNonBlockingThreadsRWLock;
248       if ((waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL)
249           && (request.getTransactionId() != 0))
250         waitForAllWritesToComplete(request.getTransactionId());
251     }
252
253     try
254     {
255       if (canTakeReadLock)
256         lock.acquireRead();
257       else
258         lock.acquireWrite();
259     }
260     catch (InterruptedException JavaDoc e)
261     {
262       String JavaDoc msg = Translate.get(
263           "loadbalancer.backendlist.acquire.writelock.failed", e);
264       logger.error(msg);
265       throw new SQLException JavaDoc(msg);
266     }
267
268     int nbOfThreads = backendThreads.size();
269     ArrayList JavaDoc writeList = new ArrayList JavaDoc();
270     String JavaDoc tableName = request.getTableName();
271
272     if (request.isCreate())
273     { // Choose the backend according to the defined policy
274
CreateTableRule rule = createTablePolicy.getTableRule(request
275           .getTableName());
276       if (rule == null)
277         rule = createTablePolicy.getDefaultRule();
278
279       // Ask the rule to pickup the backends
280
ArrayList JavaDoc chosen;
281       try
282       {
283         chosen = rule.getBackends(vdb.getBackends());
284       }
285       catch (CreateTableException e)
286       {
287         throw new SQLException JavaDoc(Translate.get(
288             "loadbalancer.create.table.rule.failed", e.getMessage()));
289       }
290
291       // Build the thread list from the backend list
292
if (chosen != null)
293       {
294         for (int i = 0; i < nbOfThreads; i++)
295         {
296           BackendWorkerThread thread = (BackendWorkerThread) backendThreads
297               .get(i);
298           if (chosen.contains(thread.getBackend()))
299             writeList.add(thread);
300         }
301       }
302     }
303     else
304     { // Build the list of backends that need to execute this request
305
for (int i = 0; i < nbOfThreads; i++)
306       {
307         BackendWorkerThread thread = (BackendWorkerThread) backendThreads
308             .get(i);
309         if (thread.getBackend().hasTable(tableName))
310           writeList.add(thread);
311       }
312     }
313
314     nbOfThreads = writeList.size();
315     if (nbOfThreads == 0)
316     {
317       if (canTakeReadLock)
318         lock.releaseRead();
319       else
320         lock.releaseWrite();
321
322       String JavaDoc msg = Translate.get("loadbalancer.execute.no.backend.found",
323           request.getSQLShortForm(vdb.getSQLShortFormLength()));
324       logger.warn(msg);
325
326       // Unblock next query from total order queue
327
removeHeadFromAndNotifyTotalOrderQueue();
328       throw new SQLException JavaDoc(msg);
329     }
330     else
331     {
332       if (logger.isDebugEnabled())
333         logger.debug(Translate.get("loadbalancer.execute.on.several",
334             new String JavaDoc[]{String.valueOf(request.getId()),
335                 String.valueOf(nbOfThreads)}));
336     }
337
338     // Create the task
339
AbstractTask task;
340     if (useKeys)
341       task = new WriteRequestWithKeysTask(getNbToWait(nbOfThreads),
342           nbOfThreads, request, metadataCache);
343     else
344       task = new WriteRequestTask(getNbToWait(nbOfThreads), nbOfThreads,
345           request);
346
347     // We have to first post the request on each backend before letting the
348
// first backend to execute the request. Therefore we have 2 phases:
349
// 1. post the task in each thread queue
350
// 2. notify each thread to execute the query
351

352     // 1. Post the task
353
if (request.isAutoCommit())
354     {
355       for (int i = 0; i < nbOfThreads; i++)
356       {
357         BackendWorkerThread thread = (BackendWorkerThread) writeList.get(i);
358         synchronized (thread)
359         {
360           thread.addTask(task);
361         }
362       }
363     }
364     else
365     {
366       for (int i = 0; i < nbOfThreads; i++)
367       {
368         BackendWorkerThread thread = (BackendWorkerThread) writeList.get(i);
369         synchronized (thread)
370         {
371           thread.addTask(task, request.getTransactionId());
372         }
373       }
374     }
375
376     // 2. Start the task execution on each backend
377
for (int i = 0; i < nbOfThreads; i++)
378     {
379       BackendWorkerThread thread = (BackendWorkerThread) writeList.get(i);
380       synchronized (thread)
381       {
382         thread.notify();
383       }
384     }
385
386     // Release lock
387
if (canTakeReadLock)
388       lock.releaseRead();
389     else
390       lock.releaseWrite();
391
392     // Unblock next query from total order queue
393
removeHeadFromAndNotifyTotalOrderQueue();
394
395     synchronized (task)
396     {
397       if (!task.hasCompleted())
398       {
399         // Wait for completion (notified by the task)
400
try
401         {
402           // Wait on task
403
long timeout = request.getTimeout() * 1000;
404           if (timeout > 0)
405           {
406             long start = System.currentTimeMillis();
407             task.wait(timeout);
408             long end = System.currentTimeMillis();
409             long remaining = timeout - (end - start);
410             if (remaining <= 0)
411             {
412               if (task.setExpiredTimeout())
413               { // Task will be ignored by all backends
414
String JavaDoc msg = Translate.get("loadbalancer.request.timeout",
415                     new String JavaDoc[]{String.valueOf(request.getId()),
416                         String.valueOf(task.getSuccess()),
417                         String.valueOf(task.getFailed())});
418                 logger.warn(msg);
419                 throw new SQLException JavaDoc(msg);
420               }
421               // else task execution already started, to late to cancel
422
}
423             // No need to update request timeout since the execution is finished
424
}
425           else
426             task.wait();
427         }
428         catch (InterruptedException JavaDoc e)
429         {
430           if (task.setExpiredTimeout())
431           { // Task will be ignored by all backends
432
String JavaDoc msg = Translate.get("loadbalancer.request.timeout",
433                 new String JavaDoc[]{String.valueOf(request.getId()),
434                     String.valueOf(task.getSuccess()),
435                     String.valueOf(task.getFailed())});
436             logger.warn(msg);
437             throw new SQLException JavaDoc(msg);
438           }
439           // else task execution already started, to late to cancel
440
}
441       }
442
443       if (task.getSuccess() > 0)
444         return task;
445       else
446       { // All tasks failed
447
ArrayList JavaDoc exceptions = task.getExceptions();
448         if (exceptions == null)
449           throw new AllBackendsFailedException(Translate.get(
450               "loadbalancer.request.failed.all", request.getId()));
451         else
452         {
453           String JavaDoc errorMsg = Translate.get("loadbalancer.request.failed.stack",
454               request.getId())
455               + "\n";
456           for (int i = 0; i < exceptions.size(); i++)
457             errorMsg += ((SQLException JavaDoc) exceptions.get(i)).getMessage() + "\n";
458           logger.error(errorMsg);
459           throw new SQLException JavaDoc(errorMsg);
460         }
461       }
462     }
463   }
464
465   /**
466    * Implementation specific load balanced read execution.
467    *
468    * @param request an <code>SelectRequest</code>
469    * @param metadataCache the metadataCache if any or null
470    * @return the corresponding <code>java.sql.ResultSet</code>
471    * @exception SQLException if an error occurs
472    */

473   public abstract ControllerResultSet execReadRequest(SelectRequest request,
474       MetadataCache metadataCache) throws SQLException JavaDoc;
475
476   /**
477    * Execute a read request on the selected backend.
478    *
479    * @param request the request to execute
480    * @param backend the backend that will execute the request
481    * @param metadataCache a metadataCache if any or null
482    * @return the ResultSet
483    * @throws SQLException if an error occurs
484    */

485   protected ControllerResultSet executeRequestOnBackend(SelectRequest request,
486       DatabaseBackend backend, MetadataCache metadataCache)
487       throws SQLException JavaDoc, UnreachableBackendException
488   {
489     // Handle macros
490
handleMacros(request);
491
492     // Ok, we have a backend, let's execute the request
493
AbstractConnectionManager cm = backend.getConnectionManager(request
494         .getLogin());
495
496     // Sanity check
497
if (cm == null)
498     {
499       String JavaDoc msg = Translate.get("loadbalancer.connectionmanager.not.found",
500           new String JavaDoc[]{request.getLogin(), backend.getName()});
501       logger.error(msg);
502       throw new SQLException JavaDoc(msg);
503     }
504
505     // Execute the query
506
if (request.isAutoCommit())
507     {
508       if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL)
509         // We could do something finer grain here by waiting
510
// only for writes that depend on the tables we need
511
// but is that really worth the overhead ?
512
waitForAllWritesToComplete(backend);
513
514       ControllerResultSet rs = null;
515       boolean badConnection;
516       do
517       {
518         badConnection = false;
519         // Use a connection just for this request
520
Connection JavaDoc c = null;
521         try
522         {
523           c = cm.getConnection();
524         }
525         catch (UnreachableBackendException e1)
526         {
527           logger.error(Translate.get(
528               "loadbalancer.backend.disabling.unreachable", backend.getName()));
529           disableBackend(backend);
530           throw new UnreachableBackendException(Translate.get(
531               "loadbalancer.backend.unreacheable", backend.getName()));
532         }
533
534         // Sanity check
535
if (c == null)
536           throw new UnreachableBackendException(
537               "No more connections on backend " + backend.getName());
538
539         // Execute Query
540
try
541         {
542           rs = executeSelectRequestOnBackend(request, backend, c, metadataCache);
543           cm.releaseConnection(c);
544         }
545         catch (SQLException JavaDoc e)
546         {
547           cm.releaseConnection(c);
548           throw new SQLException JavaDoc(Translate.get(
549               "loadbalancer.request.failed.on.backend", new String JavaDoc[]{
550                   request.getSQLShortForm(vdb.getSQLShortFormLength()),
551                   backend.getName(), e.getMessage()}));
552         }
553         catch (BadConnectionException e)
554         { // Get rid of the bad connection
555
cm.deleteConnection(c);
556           badConnection = true;
557         }
558       }
559       while (badConnection);
560       if (logger.isDebugEnabled())
561         logger.debug(Translate.get("loadbalancer.execute.on", new String JavaDoc[]{
562             String.valueOf(request.getId()), backend.getName()}));
563       return rs;
564     }
565     else
566     { // Inside a transaction
567
Connection JavaDoc c;
568       long tid = request.getTransactionId();
569       Long JavaDoc lTid = new Long JavaDoc(tid);
570
571       // Wait for previous writes to complete
572
if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL)
573         waitForAllWritesToComplete(backend, request.getTransactionId());
574
575       try
576       {
577         c = backend.getConnectionForTransactionAndLazyBeginIfNeeded(lTid, cm,
578             request.getTransactionIsolation());
579       }
580       catch (UnreachableBackendException e1)
581       {
582         logger.error(Translate.get(
583             "loadbalancer.backend.disabling.unreachable", backend.getName()));
584         disableBackend(backend);
585         throw new SQLException JavaDoc(Translate.get(
586             "loadbalancer.backend.unreacheable", backend.getName()));
587       }
588       catch (NoTransactionStartWhenDisablingException e)
589       {
590         String JavaDoc msg = Translate.get("loadbalancer.backend.is.disabling",
591             new String JavaDoc[]{request.getSQLShortForm(vdb.getSQLShortFormLength()),
592                 backend.getName()});
593         logger.error(msg);
594         throw new SQLException JavaDoc(msg);
595       }
596
597       // Sanity check
598
if (c == null)
599         throw new SQLException JavaDoc(Translate.get(
600             "loadbalancer.unable.retrieve.connection", new String JavaDoc[]{
601                 String.valueOf(tid), backend.getName()}));
602
603       // Execute Query
604
ControllerResultSet rs = null;
605       try
606       {
607         rs = executeSelectRequestOnBackend(request, backend, c, metadataCache);
608       }
609       catch (SQLException JavaDoc e)
610       {
611         throw new SQLException JavaDoc(Translate.get(
612             "loadbalancer.request.failed.on.backend", new String JavaDoc[]{
613                 request.getSQLShortForm(vdb.getSQLShortFormLength()),
614                 backend.getName(), e.getMessage()}));
615       }
616       catch (BadConnectionException e)
617       { // Connection failed, so did the transaction
618
// Disable the backend.
619
cm.deleteConnection(tid);
620         String JavaDoc msg = Translate.get(
621             "loadbalancer.backend.disabling.connection.failure", backend
622                 .getName());
623         logger.error(msg);
624         disableBackend(backend);
625         throw new SQLException JavaDoc(msg);
626       }
627       if (logger.isDebugEnabled())
628         logger.debug(Translate.get("loadbalancer.execute.transaction.on",
629             new String JavaDoc[]{String.valueOf(tid), String.valueOf(request.getId()),
630                 backend.getName()}));
631       return rs;
632     }
633   }
634
635   /**
636    * Execute a stored procedure on the selected backend.
637    *
638    * @param proc the stored procedure to execute
639    * @param backend the backend that will execute the request
640    * @param metadataCache the metadataCache if any or null
641    * @return the ResultSet
642    * @throws SQLException if an error occurs
643    */

644   protected ControllerResultSet executeStoredProcedureOnBackend(
645       StoredProcedure proc, DatabaseBackend backend, MetadataCache metadataCache)
646       throws SQLException JavaDoc, UnreachableBackendException
647   {
648     // Ok, we have a backend, let's execute the request
649
AbstractConnectionManager cm = backend
650         .getConnectionManager(proc.getLogin());
651
652     // Sanity check
653
if (cm == null)
654     {
655       String JavaDoc msg = Translate.get("loadbalancer.connectionmanager.not.found",
656           new String JavaDoc[]{proc.getLogin(), backend.getName()});
657       logger.error(msg);
658       throw new SQLException JavaDoc(msg);
659     }
660
661     // Execute the query
662
if (proc.isAutoCommit())
663     {
664       if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL)
665         // We could do something finer grain here by waiting
666
// only for writes that depend on the tables we need
667
// but is that really worth the overhead ?
668
waitForAllWritesToComplete(backend);
669
670       // Use a connection just for this request
671
Connection JavaDoc c = null;
672       try
673       {
674         c = cm.getConnection();
675       }
676       catch (UnreachableBackendException e1)
677       {
678         logger.error(Translate.get(
679             "loadbalancer.backend.disabling.unreachable", backend.getName()));
680         disableBackend(backend);
681         throw new UnreachableBackendException(Translate.get(
682             "loadbalancer.backend.unreacheable", backend.getName()));
683       }
684
685       // Sanity check
686
if (c == null)
687         throw new SQLException JavaDoc(Translate.get(
688             "loadbalancer.backend.no.connection", backend.getName()));
689
690       // Execute Query
691
ControllerResultSet rs = null;
692       try
693       {
694         rs = AbstractLoadBalancer.executeReadStoredProcedureOnBackend(proc,
695             backend, c, metadataCache);
696       }
697       catch (Exception JavaDoc e)
698       {
699         throw new SQLException JavaDoc(Translate.get(
700             "loadbalancer.storedprocedure.failed.on.backend", new String JavaDoc[]{
701                 proc.getSQLShortForm(vdb.getSQLShortFormLength()),
702                 backend.getName(), e.getMessage()}));
703       }
704       finally
705       {
706         cm.releaseConnection(c);
707       }
708       if (logger.isDebugEnabled())
709         logger.debug(Translate.get("loadbalancer.storedprocedure.on",
710             new String JavaDoc[]{String.valueOf(proc.getId()), backend.getName()}));
711       return rs;
712     }
713     else
714     { // Inside a transaction
715
Connection JavaDoc c;
716       long tid = proc.getTransactionId();
717       Long JavaDoc lTid = new Long JavaDoc(tid);
718
719       // Wait for previous writes to complete
720
if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL)
721         waitForAllWritesToComplete(backend, proc.getTransactionId());
722
723       try
724       {
725         c = backend.getConnectionForTransactionAndLazyBeginIfNeeded(lTid, cm,
726             proc.getTransactionIsolation());
727       }
728       catch (UnreachableBackendException e1)
729       {
730         logger.error(Translate.get(
731             "loadbalancer.backend.disabling.unreachable", backend.getName()));
732         disableBackend(backend);
733         throw new SQLException JavaDoc(Translate.get(
734             "loadbalancer.backend.unreacheable", backend.getName()));
735       }
736       catch (NoTransactionStartWhenDisablingException e)
737       {
738         String JavaDoc msg = Translate.get("loadbalancer.backend.is.disabling",
739             new String JavaDoc[]{proc.getSQLShortForm(vdb.getSQLShortFormLength()),
740                 backend.getName()});
741         logger.error(msg);
742         throw new SQLException JavaDoc(msg);
743       }
744
745       // Sanity check
746
if (c == null)
747         throw new SQLException JavaDoc(Translate.get(
748             "loadbalancer.unable.retrieve.connection", new String JavaDoc[]{
749                 String.valueOf(tid), backend.getName()}));
750
751       // Execute Query
752
ControllerResultSet rs;
753       try
754       {
755         rs = AbstractLoadBalancer.executeReadStoredProcedureOnBackend(proc,
756             backend, c, metadataCache);
757       }
758       catch (Exception JavaDoc e)
759       {
760         throw new SQLException JavaDoc(Translate.get(
761             "loadbalancer.storedprocedure.failed.on.backend", new String JavaDoc[]{
762                 proc.getSQLShortForm(vdb.getSQLShortFormLength()),
763                 backend.getName(), e.getMessage()}));
764       }
765       if (logger.isDebugEnabled())
766         logger.debug(Translate.get("loadbalancer.execute.transaction.on",
767             new String JavaDoc[]{String.valueOf(tid), String.valueOf(proc.getId()),
768                 backend.getName()}));
769       return rs;
770     }
771   }
772
773   /**
774    * @see org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer#execReadStoredProcedure(StoredProcedure,
775    * MetadataCache)
776    */

777   public ControllerResultSet execReadStoredProcedure(StoredProcedure proc,
778       MetadataCache metadataCache) throws SQLException JavaDoc
779   {
780     ReadStoredProcedureTask task = (ReadStoredProcedureTask) callStoredProcedure(
781         proc, true, metadataCache);
782     return task.getResult();
783   }
784
785   /**
786    * @see org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer#execWriteStoredProcedure(StoredProcedure)
787    */

788   public int execWriteStoredProcedure(StoredProcedure proc) throws SQLException JavaDoc
789   {
790     WriteStoredProcedureTask task = (WriteStoredProcedureTask) callStoredProcedure(
791         proc, false, null);
792     return task.getResult();
793   }
794
795   /**
796    * Post the stored procedure call in the threads task list.
797    * <p>
798    * Note that macros are processed here.
799    *
800    * @param proc the stored procedure to call
801    * @param isRead true if the call returns a ResultSet
802    * @param metadataCache the metadataCache if any or null
803    * @return the task that has been executed (caller can get the result by
804    * calling getResult())
805    * @throws SQLException if an error occurs
806    */

807   private AbstractTask callStoredProcedure(StoredProcedure proc,
808       boolean isRead, MetadataCache metadataCache) throws SQLException JavaDoc
809   {
810     ArrayList JavaDoc backendThreads = backendBlockingThreads;
811     ReadPrioritaryFIFOWriteLock lock = backendBlockingThreadsRWLock;
812
813     // Total ordering mainly for distributed virtual databases.
814
// If waitForTotalOrder returns true then the query has been scheduled in
815
// total order and there is no need to take a write lock later to resolve
816
// potential conflicts.
817
boolean canTakeReadLock = waitForTotalOrder(proc, true);
818
819     // Handle macros
820
handleMacros(proc);
821
822     try
823     {
824       // Note that a read stored procedure here is supposed to also execute
825
// writes and as the scheduler cannot block atomically on multiple tables
826
// for a writes, we have to lock as a write even for a read stored
827
// procedure. A read-only stored procedure will execute in a separate
828
// method call (see executeReadOnlyStoredProcedure).
829
if (canTakeReadLock)
830         lock.acquireRead();
831       else
832         lock.acquireWrite();
833     }
834     catch (InterruptedException JavaDoc e)
835     {
836       String JavaDoc msg;
837       msg = Translate.get("loadbalancer.backendlist.acquire.writelock.failed",
838           e);
839       logger.error(msg);
840       throw new SQLException JavaDoc(msg);
841     }
842
843     int nbOfThreads = backendThreads.size();
844
845     // Create the task
846
AbstractTask task;
847     if (isRead)
848       task = new ReadStoredProcedureTask(getNbToWait(nbOfThreads), nbOfThreads,
849           proc, metadataCache);
850     else
851       task = new WriteStoredProcedureTask(getNbToWait(nbOfThreads),
852           nbOfThreads, proc);
853
854     int nbOfBackends = 0;
855
856     // Post the task in each backendThread tasklist and wakeup the threads
857
for (int i = 0; i < nbOfThreads; i++)
858     {
859       BackendWorkerThread thread = (BackendWorkerThread) backendThreads.get(i);
860       if (thread.getBackend().hasStoredProcedure(proc.getProcedureName()))
861       {
862         nbOfBackends++;
863         synchronized (thread)
864         {
865           if (proc.isAutoCommit())
866             thread.addTask(task);
867           else
868             thread.addTask(task, proc.getTransactionId());
869           thread.notify();
870         }
871       }
872     }
873
874     if (canTakeReadLock)
875       lock.releaseRead();
876     else
877       lock.releaseWrite();
878
879     // Unblock next query from total order queue
880
removeHeadFromAndNotifyTotalOrderQueue();
881
882     if (nbOfBackends == 0)
883     {
884       throw new SQLException JavaDoc(Translate.get(
885           "loadbalancer.backend.no.required.storedprocedure", proc
886               .getProcedureName()));
887     }
888     else
889       task.setTotalNb(nbOfBackends);
890
891     synchronized (task)
892     {
893       if (!task.hasCompleted())
894       {
895         // Wait for completion (notified by the task)
896
try
897         {
898           // Wait on task
899
long timeout = proc.getTimeout() * 1000;
900           if (timeout > 0)
901           {
902             long start = System.currentTimeMillis();
903             task.wait(timeout);
904             long end = System.currentTimeMillis();
905             long remaining = timeout - (end - start);
906             if (remaining <= 0)
907             {
908               if (task.setExpiredTimeout())
909               { // Task will be ignored by all backends
910
String JavaDoc msg = Translate.get(
911                     "loadbalancer.storedprocedure.timeout", new String JavaDoc[]{
912                         String.valueOf(proc.getId()),
913                         String.valueOf(task.getSuccess()),
914                         String.valueOf(task.getFailed())});
915                 logger.warn(msg);
916                 throw new SQLException JavaDoc(msg);
917               }
918               // else task execution already started, to late to cancel
919
}
920             // No need to update request timeout since the execution is finished
921
}
922           else
923             task.wait();
924         }
925         catch (InterruptedException JavaDoc e)
926         {
927           if (task.setExpiredTimeout())
928           { // Task will be ignored by all backends
929
String JavaDoc msg = Translate.get("loadbalancer.storedprocedure.timeout",
930                 new String JavaDoc[]{String.valueOf(proc.getId()),
931                     String.valueOf(task.getSuccess()),
932                     String.valueOf(task.getFailed())});
933             logger.warn(msg);
934             throw new SQLException JavaDoc(msg);
935           }
936           // else task execution already started, to late to cancel
937
}
938       }
939
940       if (task.getSuccess() > 0)
941         return task;
942       else
943       { // All tasks failed
944
ArrayList JavaDoc exceptions = task.getExceptions();
945         if (exceptions == null)
946           throw new SQLException JavaDoc(Translate.get(
947               "loadbalancer.storedprocedure.all.failed", proc.getId()));
948         else
949         {
950           String JavaDoc errorMsg = Translate.get(
951               "loadbalancer.storedprocedure.failed.stack", proc.getId())
952               + "\n";
953           for (int i = 0; i < exceptions.size(); i++)
954             errorMsg += ((SQLException JavaDoc) exceptions.get(i)).getMessage() + "\n";
955           logger.error(errorMsg);
956           throw new SQLException JavaDoc(errorMsg);
957         }
958       }
959     }
960   }
961
962   /*
963    * Transaction management
964    */

965
966   /**
967    * Begins a new transaction.
968    *
969    * @param tm the transaction marker metadata
970    * @exception SQLException if an error occurs
971    */

972   public final void begin(TransactionMarkerMetaData tm) throws SQLException JavaDoc
973   {
974   }
975
976   /**
977    * Commits a transaction.
978    *
979    * @param tm the transaction marker metadata
980    * @exception SQLException if an error occurs
981    */

982   public void commit(TransactionMarkerMetaData tm) throws SQLException JavaDoc
983   {
984     long tid = tm.getTransactionId();
985     Long JavaDoc lTid = new Long JavaDoc(tid);
986     // List of backends that still have pending queries for the transaction to
987
// commit
988
ArrayList JavaDoc asynchronousBackends = null;
989     CommitTask task = null;
990
991     // Ordering for distributed virtual database
992
Commit totalOrderCommit = null;
993     boolean canTakeReadLock = false;
994     if (vdb.getTotalOrderQueue() != null)
995     {
996       totalOrderCommit = new Commit(tm.getLogin(), tid);
997       // Total ordering mainly for distributed virtual databases.
998
// If waitForTotalOrder returns true then the query has been scheduled in
999
// total order and there is no need to take a write lock later to resolve
1000
// potential conflicts.
1001
canTakeReadLock = waitForTotalOrder(totalOrderCommit, false);
1002      if (!canTakeReadLock)
1003        // This is a local commit no total order info
1004
totalOrderCommit = null;
1005    }
1006
1007    if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL)
1008    { // Insert commit after last write
1009
try
1010      {
1011        if (canTakeReadLock)
1012          backendBlockingThreadsRWLock.acquireRead();
1013        else
1014        {
1015          // Lock in write to ensure that all writes are posted and we wait in
1016
// the queue, else a read lock has the priority with the
1017
// implementation we are using.
1018
backendBlockingThreadsRWLock.acquireWrite();
1019        }
1020      }
1021      catch (InterruptedException JavaDoc e)
1022      {
1023        String JavaDoc msg = Translate.get(
1024            "loadbalancer.backendlist.acquire.writelock.failed", e);
1025        logger.error(msg);
1026        throw new SQLException JavaDoc(msg);
1027      }
1028
1029      int nbOfThreads = backendBlockingThreads.size();
1030      // Create the task
1031
task = new CommitTask(getNbToWait(nbOfThreads), nbOfThreads, tm
1032          .getTimeout(), tm.getLogin(), tid);
1033
1034      for (int i = 0; i < nbOfThreads; i++)
1035      {
1036        BackendWorkerThread thread = (BackendWorkerThread) backendBlockingThreads
1037            .get(i);
1038        if (thread.hasTaskForTransaction(lTid))
1039        {
1040          if (asynchronousBackends == null)
1041            asynchronousBackends = new ArrayList JavaDoc();
1042          asynchronousBackends.add(thread.getBackend());
1043          synchronized (thread)
1044          {
1045            thread.insertTaskAfterLastWriteForTransaction(task, lTid);
1046            thread.notify();
1047          }
1048        }
1049      }
1050
1051      if (canTakeReadLock)
1052        backendBlockingThreadsRWLock.releaseRead();
1053      else
1054        backendBlockingThreadsRWLock.releaseWrite();
1055
1056      // Unset the task
1057
if (asynchronousBackends == null)
1058        task = null;
1059    }
1060
1061    try
1062    {
1063      if (canTakeReadLock)
1064        backendNonBlockingThreadsRWLock.acquireRead();
1065      else
1066      {
1067        // Lock in write to ensure that all writes are posted and we wait in
1068
// the queue, else a read lock has the priority with the
1069
// implementation we are using.
1070
backendNonBlockingThreadsRWLock.acquireWrite();
1071      }
1072    }
1073    catch (InterruptedException JavaDoc e)
1074    {
1075      String JavaDoc msg = Translate.get(
1076          "loadbalancer.backendlist.acquire.writelock.failed", e);
1077      logger.error(msg);
1078      throw new SQLException JavaDoc(msg);
1079    }
1080
1081    int nbOfThreads = backendNonBlockingThreads.size();
1082    ArrayList JavaDoc commitList = new ArrayList JavaDoc();
1083
1084    // Build the list of backends that need to commit this transaction
1085
for (int i = 0; i < nbOfThreads; i++)
1086    {
1087      BackendWorkerThread thread = (BackendWorkerThread) backendNonBlockingThreads
1088          .get(i);
1089      DatabaseBackend backend = thread.getBackend();
1090      // If the transaction has been started on this backend and it was not
1091
// previously treated in the asynchronous backend list (late nodes), then
1092
// we have to post the task now in the asynchronous list.
1093
if (backend.isStartedTransaction(lTid)
1094          && ((asynchronousBackends == null) || (!asynchronousBackends
1095              .contains(backend))))
1096        commitList.add(thread);
1097    }
1098
1099    // If no backend was late and the commit task has not been posted to any
1100
// backend yet, then we have to create a task for the backends that really
1101
// need to commit the transaction (in the blocking queue).
1102
// Backends for which we have to post in the non blocking queue
1103
int nbOfThreadsToCommit = commitList.size();
1104    if ((task == null) && (nbOfThreadsToCommit != 0))
1105      task = new CommitTask(getNbToWait(nbOfThreadsToCommit),
1106          nbOfThreadsToCommit, tm.getTimeout(), tm.getLogin(), tid);
1107
1108    // Post the task in each backendThread tasklist and wakeup the threads. This
1109
// could either be the remaining threads that were not in the asynchronous
1110
// queue or all the backends that started the transaction.
1111
for (int i = 0; i < nbOfThreadsToCommit; i++)
1112    {
1113      BackendWorkerThread thread = (BackendWorkerThread) commitList.get(i);
1114      synchronized (thread)
1115      {
1116        thread.addTask(task, tid);
1117        thread.notify();
1118      }
1119    }
1120
1121    if (canTakeReadLock)
1122      backendNonBlockingThreadsRWLock.releaseRead();
1123    else
1124      backendNonBlockingThreadsRWLock.releaseWrite();
1125
1126    // Unblock next query from total order queue
1127
if (totalOrderCommit != null)
1128      removeHeadFromAndNotifyTotalOrderQueue();
1129
1130    // Check if someone had something to commit
1131
if (task == null)
1132      return;
1133
1134    synchronized (task)
1135    {
1136      if (!task.hasCompleted())
1137      {
1138        // Wait for completion (notified by the task)
1139
try
1140        {
1141          // Wait on task
1142
long timeout = tm.getTimeout();
1143          if (timeout > 0)
1144          {
1145            long start = System.currentTimeMillis();
1146            task.wait(timeout);
1147            long end = System.currentTimeMillis();
1148            long remaining = timeout - (end - start);
1149            if (remaining <= 0)
1150            {
1151              if (task.setExpiredTimeout())
1152              { // Task will be ignored by all backends
1153
String JavaDoc msg = Translate.get("loadbalancer.commit.timeout",
1154                    new String JavaDoc[]{String.valueOf(tid),
1155                        String.valueOf(task.getSuccess()),
1156                        String.valueOf(task.getFailed())});
1157                logger.warn(msg);
1158                throw new SQLException JavaDoc(msg);
1159              }
1160              // else task execution already started, too late to cancel
1161
}
1162          }
1163          else
1164            task.wait();
1165        }
1166        catch (InterruptedException JavaDoc e)
1167        {
1168          if (task.setExpiredTimeout())
1169          { // Task will be ignored by all backends
1170
String JavaDoc msg = Translate.get("loadbalancer.commit.timeout",
1171                new String JavaDoc[]{String.valueOf(tid),
1172                    String.valueOf(task.getSuccess()),
1173                    String.valueOf(task.getFailed())});
1174            logger.warn(msg);
1175            throw new SQLException JavaDoc(msg);
1176          }
1177          // else task execution already started, too late to cancel
1178
}
1179      }
1180
1181      if (task.getSuccess() > 0)
1182        return;
1183      else
1184      { // All tasks failed
1185
ArrayList JavaDoc exceptions = task.getExceptions();
1186        if (exceptions == null)
1187          throw new SQLException JavaDoc(Translate.get(
1188              "loadbalancer.commit.all.failed", tid));
1189        else
1190        {
1191          String JavaDoc errorMsg = Translate.get("loadbalancer.commit.failed.stack",
1192              tid)
1193              + "\n";
1194          for (int i = 0; i < exceptions.size(); i++)
1195            errorMsg += ((SQLException JavaDoc) exceptions.get(i)).getMessage() + "\n";
1196          logger.error(errorMsg);
1197          throw new SQLException JavaDoc(errorMsg);
1198        }
1199      }
1200    }
1201  }
1202
1203  /**
1204   * Rollbacks a transaction.
1205   *
1206   * @param tm the transaction marker metadata
1207   * @exception SQLException if an error occurs
1208   */

1209  public void rollback(TransactionMarkerMetaData tm) throws SQLException JavaDoc
1210  {
1211    long tid = tm.getTransactionId();
1212    Long JavaDoc lTid = new Long JavaDoc(tid);
1213    // List of backends that still have pending queries for the transaction to
1214
// rollback
1215
ArrayList JavaDoc asynchronousBackends = null;
1216    RollbackTask task = null;
1217
1218    // Ordering for distributed virtual database
1219
Rollback totalOrderRollback = null;
1220    boolean canTakeReadLock = false;
1221    if (vdb.getTotalOrderQueue() != null)
1222    {
1223      totalOrderRollback = new Rollback(tm.getLogin(), tid);
1224      // Total ordering mainly for distributed virtual databases.
1225
// If waitForTotalOrder returns true then the query has been scheduled in
1226
// total order and there is no need to take a write lock later to resolve
1227
// potential conflicts.
1228
canTakeReadLock = waitForTotalOrder(totalOrderRollback, false);
1229      if (!canTakeReadLock)
1230        // This is a local rollback no total order info
1231
totalOrderRollback = null;
1232    }
1233
1234    if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL)
1235    {
1236      try
1237      {
1238        if (canTakeReadLock)
1239          backendBlockingThreadsRWLock.acquireRead();
1240        else
1241        {
1242          // Lock in write to ensure that all writes are posted and we wait in
1243
// the queue, else a read lock has the priority with the
1244
// implementation we are using.
1245
backendBlockingThreadsRWLock.acquireWrite();
1246        }
1247      }
1248      catch (InterruptedException JavaDoc e)
1249      {
1250        String JavaDoc msg = Translate.get(
1251            "loadbalancer.backendlist.acquire.writelock.failed", e);
1252        logger.error(msg);
1253        throw new SQLException JavaDoc(msg);
1254      }
1255
1256      int nbOfThreads = backendBlockingThreads.size();
1257      // Create the task
1258
task = new RollbackTask(getNbToWait(nbOfThreads), nbOfThreads, tm
1259          .getTimeout(), tm.getLogin(), tid);
1260
1261      for (int i = 0; i < nbOfThreads; i++)
1262      {
1263        BackendWorkerThread thread = (BackendWorkerThread) backendBlockingThreads
1264            .get(i);
1265        if (thread.hasTaskForTransaction(lTid))
1266        {
1267          if (asynchronousBackends == null)
1268            asynchronousBackends = new ArrayList JavaDoc();
1269          asynchronousBackends.add(thread.getBackend());
1270          synchronized (thread)
1271          {
1272            thread.insertTaskAfterLastWriteForTransaction(task, lTid);
1273            thread.notify();
1274          }
1275        }
1276      }
1277
1278      if (canTakeReadLock)
1279        backendBlockingThreadsRWLock.releaseRead();
1280      else
1281        backendBlockingThreadsRWLock.releaseWrite();
1282
1283      // Unset the task
1284
if (asynchronousBackends == null)
1285        task = null;
1286    }
1287
1288    try
1289    {
1290      if (canTakeReadLock)
1291        backendNonBlockingThreadsRWLock.acquireRead();
1292      else
1293        backendNonBlockingThreadsRWLock.acquireWrite();
1294    }
1295    catch (InterruptedException JavaDoc e)
1296    {
1297      String JavaDoc msg = Translate.get(
1298          "loadbalancer.backendlist.acquire.writelock.failed", e);
1299      logger.error(msg);
1300      throw new SQLException JavaDoc(msg);
1301    }
1302
1303    int nbOfThreads = backendNonBlockingThreads.size();
1304    ArrayList JavaDoc rollbackList = new ArrayList JavaDoc();
1305
1306    // Build the list of backend that need to rollback this transaction
1307
for (int i = 0; i < nbOfThreads; i++)
1308    {
1309      BackendWorkerThread thread = (BackendWorkerThread) backendNonBlockingThreads
1310          .get(i);
1311      DatabaseBackend backend = thread.getBackend();
1312      // If the transaction has been started on this backend and it was not
1313
// previously treated in the asynchronous backend list (late nodes), then
1314
// we have to post the task now in the asynchronous list.
1315
if (backend.isStartedTransaction(lTid)
1316          && ((asynchronousBackends == null) || (!asynchronousBackends
1317              .contains(backend))))
1318        rollbackList.add(thread);
1319    }
1320
1321    int nbOfThreadsToRollback = rollbackList.size();
1322    // If no backend was late and the rollback task has not been posted to any
1323
// backend yet, then we have to create a task for the backends that really
1324
// need to rollback the transaction.
1325
if ((task == null) && (nbOfThreadsToRollback != 0))
1326      task = new RollbackTask(getNbToWait(nbOfThreadsToRollback),
1327          nbOfThreadsToRollback, tm.getTimeout(), tm.getLogin(), tid);
1328
1329    // Post the task in each backendThread tasklist and wakeup the threads. This
1330
// could either be the remaining threads that were not in the asynchronous
1331
// queue or all the backends that started the transaction.
1332
for (int i = 0; i < nbOfThreadsToRollback; i++)
1333    {
1334      BackendWorkerThread thread = (BackendWorkerThread) rollbackList.get(i);
1335      synchronized (thread)
1336      {
1337        thread.addTask(task, tid);
1338        thread.notify();
1339      }
1340    }
1341
1342    // Release lock
1343
if (canTakeReadLock)
1344      backendNonBlockingThreadsRWLock.releaseRead();
1345    else
1346      backendNonBlockingThreadsRWLock.releaseWrite();
1347
1348    // Unblock next query from total order queue
1349
if (totalOrderRollback != null)
1350      removeHeadFromAndNotifyTotalOrderQueue();
1351
1352    // Check if someone had something to rollback
1353
if (task == null)
1354      return;
1355
1356    synchronized (task)
1357    {
1358      if (!task.hasCompleted())
1359      {
1360        // Wait for completion (notified by the task)
1361
try
1362        {
1363          // Wait on task
1364
long timeout = tm.getTimeout();
1365          if (timeout > 0)
1366          {
1367            long start = System.currentTimeMillis();
1368            task.wait(timeout);
1369            long end = System.currentTimeMillis();
1370            long remaining = timeout - (end - start);
1371            if (remaining <= 0)
1372            {
1373              if (task.setExpiredTimeout())
1374              { // Task will be ignored by all backends
1375
String JavaDoc msg = Translate.get("loadbalancer.rollback.timeout",
1376                    new String JavaDoc[]{String.valueOf(tid),
1377                        String.valueOf(task.getSuccess()),
1378                        String.valueOf(task.getFailed())});
1379                logger.warn(msg);
1380                throw new SQLException JavaDoc(msg);
1381              }
1382              // else task execution already started, to late to cancel
1383
}
1384          }
1385          else
1386            task.wait();
1387        }
1388        catch (InterruptedException JavaDoc e)
1389        {
1390          if (task.setExpiredTimeout())
1391          { // Task will be ignored by all backends
1392
String JavaDoc msg = Translate.get("loadbalancer.rollback.timeout",
1393                new String JavaDoc[]{String.valueOf(tid),
1394                    String.valueOf(task.getSuccess()),
1395                    String.valueOf(task.getFailed())});
1396            logger.warn(msg);
1397            throw new SQLException JavaDoc(msg);
1398          }
1399          // else task execution already started, to late to cancel
1400
}
1401      }
1402
1403      if (task.getSuccess() > 0)
1404        return;
1405      else
1406      { // All tasks failed
1407
ArrayList JavaDoc exceptions = task.getExceptions();
1408        if (exceptions == null)
1409          throw new SQLException JavaDoc(Translate.get(
1410              "loadbalancer.rollback.all.failed", tid));
1411        else
1412        {
1413          String JavaDoc errorMsg = Translate.get("loadbalancer.rollback.failed.stack",
1414              tid)
1415              + "\n";
1416          for (int i = 0; i < exceptions.size(); i++)
1417            errorMsg += ((SQLException JavaDoc) exceptions.get(i)).getMessage() + "\n";
1418          logger.error(errorMsg);
1419          throw new SQLException JavaDoc(errorMsg);
1420        }
1421      }
1422    }
1423  }
1424
1425  /**
1426   * Rollback a transaction to a savepoint
1427   *
1428   * @param tm The transaction marker metadata
1429   * @param savepointName The name of the savepoint
1430   * @throws AllBackendsFailedException if all backends failed to perform the
1431   * rollback
1432   * @throws SQLException if an error occurs
1433   */

1434  public void rollback(TransactionMarkerMetaData tm, String JavaDoc savepointName)
1435      throws AllBackendsFailedException, SQLException JavaDoc
1436  {
1437    long tid = tm.getTransactionId();
1438    Long JavaDoc lTid = new Long JavaDoc(tid);
1439    // List of backends that still have pending queries for the transaction for
1440
// which to rollback to a savepoint
1441
ArrayList JavaDoc asynchronousBackends = null;
1442    RollbackToSavepointTask task = null;
1443
1444    // Ordering for distributed virtual database
1445
RollbackToSavepoint totalOrderRollback = null;
1446    boolean canTakeReadLock = false;
1447    if (vdb.getTotalOrderQueue() != null)
1448    {
1449      totalOrderRollback = new RollbackToSavepoint(tid, savepointName);
1450      // Total ordering mainly for distributed virtual databases.
1451
// If waitForTotalOrder returns true then the query has been scheduled in
1452
// total order and there is no need to take a write lock later to resolve
1453
// potential conflicts.
1454
canTakeReadLock = waitForTotalOrder(totalOrderRollback, false);
1455      if (!canTakeReadLock)
1456        // This is a local commit no total order info
1457
totalOrderRollback = null;
1458    }
1459
1460    if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL)
1461    {
1462      try
1463      {
1464        if (canTakeReadLock)
1465          backendBlockingThreadsRWLock.acquireRead();
1466        else
1467        {
1468          // Lock in write to ensure that all writes are posted and we wait in
1469
// the queue, else a read lock has the priority with the
1470
// implementation we are using.
1471
backendBlockingThreadsRWLock.acquireWrite();
1472        }
1473      }
1474      catch (InterruptedException JavaDoc e)
1475      {
1476        String JavaDoc msg = Translate.get(
1477            "loadbalancer.backendlist.acquire.writelock.failed", e);
1478        logger.error(msg);
1479        throw new SQLException JavaDoc(msg);
1480      }
1481
1482      int nbOfThreads = backendBlockingThreads.size();
1483      // Create the task
1484
task = new RollbackToSavepointTask(getNbToWait(nbOfThreads), nbOfThreads,
1485          tm.getTimeout(), tm.getLogin(), tid, savepointName);
1486
1487      for (int i = 0; i < nbOfThreads; i++)
1488      {
1489        BackendWorkerThread thread = (BackendWorkerThread) backendBlockingThreads
1490            .get(i);
1491        if (thread.hasTaskForTransaction(lTid))
1492        {
1493          if (asynchronousBackends == null)
1494            asynchronousBackends = new ArrayList JavaDoc();
1495          asynchronousBackends.add(thread.getBackend());
1496          synchronized (thread)
1497          {
1498            thread.insertTaskAfterLastWriteForTransaction(task, lTid);
1499            thread.notify();
1500          }
1501        }
1502      }
1503
1504      if (canTakeReadLock)
1505        backendBlockingThreadsRWLock.releaseRead();
1506      else
1507        backendBlockingThreadsRWLock.releaseWrite();
1508
1509      // Unset the task
1510
if (asynchronousBackends == null)
1511        task = null;
1512    }
1513
1514    try
1515    {
1516      if (canTakeReadLock)
1517        backendNonBlockingThreadsRWLock.acquireRead();
1518      else
1519        backendNonBlockingThreadsRWLock.acquireWrite();
1520    }
1521    catch (InterruptedException JavaDoc e)
1522    {
1523      String JavaDoc msg = Translate.get(
1524          "loadbalancer.backendlist.acquire.writelock.failed", e);
1525      logger.error(msg);
1526      throw new SQLException JavaDoc(msg);
1527    }
1528
1529    int nbOfThreads = backendNonBlockingThreads.size();
1530    ArrayList JavaDoc rollbackList = new ArrayList JavaDoc();
1531
1532    // Build the list of backend that need to rollback to savepoint for this
1533
// transaction
1534
for (int i = 0; i < nbOfThreads; i++)
1535    {
1536      BackendWorkerThread thread = (BackendWorkerThread) backendNonBlockingThreads
1537          .get(i);
1538      DatabaseBackend backend = thread.getBackend();
1539      // If the transaction has been started on this backend and it was not
1540
// previously treated in the asynchronous backend list (late nodes), then
1541
// we have to post the task now in the asynchronous list.
1542
if (backend.isStartedTransaction(lTid)
1543          && ((asynchronousBackends == null) || (!asynchronousBackends
1544              .contains(backend))))
1545        rollbackList.add(thread);
1546    }
1547
1548    int nbOfThreadsToRollback = rollbackList.size();
1549    // If no backend was late and the rollback task has not been posted to any
1550
// backend yet, then we have to create a task for the backends that really
1551
// need to rollback the transaction.
1552
if ((task == null) && (nbOfThreadsToRollback != 0))
1553      task = new RollbackToSavepointTask(getNbToWait(nbOfThreadsToRollback),
1554          nbOfThreadsToRollback, tm.getTimeout(), tm.getLogin(), tid,
1555          savepointName);
1556
1557    // Post the task in each backendThread tasklist and wakeup the threads. This
1558
// could either be the remaining threads that were not in the asynchronous
1559
// queue or all the backends that started the transaction.
1560
for (int i = 0; i < nbOfThreadsToRollback; i++)
1561    {
1562      BackendWorkerThread thread = (BackendWorkerThread) rollbackList.get(i);
1563      synchronized (thread)
1564      {
1565        thread.addTask(task, tid);
1566        thread.notify();
1567      }
1568    }
1569
1570    // Release lock
1571
if (canTakeReadLock)
1572      backendNonBlockingThreadsRWLock.releaseRead();
1573    else
1574      backendNonBlockingThreadsRWLock.releaseWrite();
1575
1576    // Unblock next query from total order queue
1577
if (totalOrderRollback != null)
1578      removeHeadFromAndNotifyTotalOrderQueue();
1579
1580    // Check if someone had something to rollback
1581
if (task == null)
1582      return;
1583
1584    synchronized (task)
1585    {
1586      if (!task.hasCompleted())
1587      {
1588        // Wait for completion (notified by the task)
1589
try
1590        {
1591          // Wait on task
1592
long timeout = tm.getTimeout();
1593          if (timeout > 0)
1594          {
1595            long start = System.currentTimeMillis();
1596            task.wait(timeout);
1597            long end = System.currentTimeMillis();
1598            long remaining = timeout - (end - start);
1599            if (remaining <= 0)
1600            {
1601              if (task.setExpiredTimeout())
1602              { // Task will be ignored by all backends
1603
String JavaDoc msg = Translate.get(
1604                    "loadbalancer.rollbacksavepoint.timeout", new String JavaDoc[]{
1605                        savepointName, String.valueOf(tid),
1606                        String.valueOf(task.getSuccess()),
1607                        String.valueOf(task.getFailed())});
1608                logger.warn(msg);
1609                throw new SQLException JavaDoc(msg);
1610              }
1611              // else task execution already started, to late to cancel
1612
}
1613          }
1614          else
1615            task.wait();
1616        }
1617        catch (InterruptedException JavaDoc e)
1618        {
1619          if (task.setExpiredTimeout())
1620          { // Task will be ignored by all backends
1621
String JavaDoc msg = Translate.get(
1622                "loadbalancer.rollbacksavepoint.timeout", new String JavaDoc[]{
1623                    savepointName, String.valueOf(tid),
1624                    String.valueOf(task.getSuccess()),
1625                    String.valueOf(task.getFailed())});
1626            logger.warn(msg);
1627            throw new SQLException JavaDoc(msg);
1628          }
1629          // else task execution already started, to late to cancel
1630
}
1631      }
1632
1633      if (task.getSuccess() > 0)
1634        return;
1635      else
1636      { // All tasks failed
1637
ArrayList JavaDoc exceptions = task.getExceptions();
1638        if (exceptions == null)
1639          throw new SQLException JavaDoc(Translate.get(
1640              "loadbalancer.rollbacksavepoint.all.failed", new String JavaDoc[]{
1641                  savepointName, String.valueOf(tid)}));
1642        else
1643        {
1644          String JavaDoc errorMsg = Translate.get(
1645              "loadbalancer.rollbacksavepoint.failed.stack", new String JavaDoc[]{
1646                  savepointName, String.valueOf(tid)})
1647              + "\n";
1648          SQLException JavaDoc ex = SQLExceptionFactory.getSQLException(exceptions,
1649              errorMsg);
1650          logger.error(ex.getMessage());
1651          throw ex;
1652        }
1653      }
1654    }
1655  }
1656
1657  /**
1658   * Release a savepoint from a transaction
1659   *
1660   * @param tm The transaction marker metadata
1661   * @param name The name of the savepoint ro release
1662   * @throws SQLException if an error occurs
1663   */

1664  public void releaseSavepoint(TransactionMarkerMetaData tm, String JavaDoc name)
1665      throws SQLException JavaDoc
1666  {
1667    long tid = tm.getTransactionId();
1668    Long JavaDoc lTid = new Long JavaDoc(tid);
1669
1670    // List of backends that still have pending queries for the transaction for
1671
// which a savepoint will be released
1672
ArrayList JavaDoc asynchronousBackends = null;
1673    ReleaseSavepointTask task = null;
1674
1675    // Ordering for distributed virtual database
1676
ReleaseSavepoint totalOrderRelease = null;
1677    boolean canTakeReadLock = false;
1678    if (vdb.getTotalOrderQueue() != null)
1679    {
1680      totalOrderRelease = new ReleaseSavepoint(tid, name);
1681      // Total ordering mainly for distributed virtual databases.
1682
// If waitForTotalOrder returns true then the query has been scheduled in
1683
// total order and there is no need to take a write lock later to resolve
1684
// potential conflicts.
1685
canTakeReadLock = waitForTotalOrder(totalOrderRelease, false);
1686      if (!canTakeReadLock)
1687        // This is a local commit no total order info
1688
totalOrderRelease = null;
1689    }
1690
1691    if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL)
1692    {
1693      try
1694      {
1695        if (canTakeReadLock)
1696          backendBlockingThreadsRWLock.acquireRead();
1697        else
1698        {
1699          // Lock in write to ensure that all writes are posted and we wait in
1700
// the queue, else a read lock has the priority with the
1701
// implementation we are using.
1702
backendBlockingThreadsRWLock.acquireWrite();
1703        }
1704      }
1705      catch (InterruptedException JavaDoc e)
1706      {
1707        String JavaDoc msg = Translate.get(
1708            "loadbalancer.backendlist.acquire.writelock.failed", e);
1709        logger.error(msg);
1710        throw new SQLException JavaDoc(msg);
1711      }
1712
1713      int nbOfThreads = backendBlockingThreads.size();
1714
1715      // Create the task
1716
task = new ReleaseSavepointTask(getNbToWait(nbOfThreads), nbOfThreads, tm
1717          .getTimeout(), tm.getLogin(), tid, name);
1718
1719      for (int i = 0; i < nbOfThreads; i++)
1720      {
1721        BackendWorkerThread thread = (BackendWorkerThread) backendBlockingThreads
1722            .get(i);
1723        if (thread.hasTaskForTransaction(lTid))
1724        {
1725          if (asynchronousBackends == null)
1726            asynchronousBackends = new ArrayList JavaDoc();
1727          asynchronousBackends.add(thread.getBackend());
1728          synchronized (thread)
1729          {
1730            thread.insertTaskAfterLastWriteForTransaction(task, lTid);
1731            thread.notify();
1732          }
1733        }
1734      }
1735
1736      // Release lock
1737
if (canTakeReadLock)
1738        backendBlockingThreadsRWLock.releaseRead();
1739      else
1740        backendBlockingThreadsRWLock.releaseWrite();
1741
1742      // Unset the task
1743
if (asynchronousBackends == null)
1744        task = null;
1745    }
1746
1747    try
1748    {
1749      if (canTakeReadLock)
1750        backendNonBlockingThreadsRWLock.acquireRead();
1751      else
1752        backendNonBlockingThreadsRWLock.acquireWrite();
1753    }
1754    catch (InterruptedException JavaDoc e)
1755    {
1756      String JavaDoc msg = Translate.get(
1757          "loadbalancer.backendlist.acquire.writelock.failed", e);
1758      logger.error(msg);
1759      throw new SQLException JavaDoc(msg);
1760    }
1761
1762    int nbOfThreads = backendNonBlockingThreads.size();
1763    ArrayList JavaDoc savepointList = new ArrayList JavaDoc();
1764    // Build the list of backend that need to release a savepoint for this
1765
// transaction
1766
for (int i = 0; i < nbOfThreads; i++)
1767    {
1768      BackendWorkerThread thread = (BackendWorkerThread) backendNonBlockingThreads
1769          .get(i);
1770      DatabaseBackend backend = thread.getBackend();
1771      // If the transaction has been started on this backend and it was not
1772
// previously treated in the asynchronous backend list (late nodes), then
1773
// we have to post the task now in the asynchronous list.
1774
if (backend.isStartedTransaction(lTid)
1775          && ((asynchronousBackends == null) || (!asynchronousBackends
1776              .contains(backend))))
1777        savepointList.add(thread);
1778    }
1779
1780    nbOfThreads = savepointList.size();
1781    if (nbOfThreads == 0)
1782    {
1783      if (canTakeReadLock)
1784        backendNonBlockingThreadsRWLock.releaseRead();
1785      else
1786        backendNonBlockingThreadsRWLock.releaseWrite();
1787      return;
1788    }
1789
1790    if (task == null)
1791      task = new ReleaseSavepointTask(getNbToWait(nbOfThreads), nbOfThreads, tm
1792          .getTimeout(), tm.getLogin(), tid, name);
1793
1794    synchronized (task)
1795    {
1796      // Post the task in each backendThread tasklist and wakeup the threads
1797
for (int i = 0; i < nbOfThreads; i++)
1798      {
1799        BackendWorkerThread thread = (BackendWorkerThread) savepointList.get(i);
1800        synchronized (thread)
1801        {
1802          thread.addTask(task, tid);
1803          thread.notify();
1804        }
1805      }
1806
1807      // Release lock
1808
if (canTakeReadLock)
1809        backendNonBlockingThreadsRWLock.releaseRead();
1810      else
1811        backendNonBlockingThreadsRWLock.releaseWrite();
1812
1813      // Unblock next query from total order queue
1814
if (totalOrderRelease != null)
1815        removeHeadFromAndNotifyTotalOrderQueue();
1816
1817      // Wait for completion (notified by the task)
1818
try
1819      {
1820        // Wait on task
1821
long timeout = tm.getTimeout();
1822        if (timeout > 0)
1823        {
1824          long start = System.currentTimeMillis();
1825          task.wait(timeout);
1826          long end = System.currentTimeMillis();
1827          long remaining = timeout - (end - start);
1828          if (remaining <= 0)
1829          {
1830            if (task.setExpiredTimeout())
1831            { // Task will be ignored by all backends
1832
String JavaDoc msg = Translate.get(
1833                  "loadbalancer.releasesavepoint.timeout", new String JavaDoc[]{name,
1834                      String.valueOf(tid), String.valueOf(task.getSuccess()),
1835                      String.valueOf(task.getFailed())});
1836              logger.warn(msg);
1837              throw new SQLException JavaDoc(msg);
1838            }
1839            // else task execution already started, to late to cancel
1840
}
1841        }
1842        else
1843          task.wait();
1844      }
1845      catch (InterruptedException JavaDoc e)
1846      {
1847        if (task.setExpiredTimeout())
1848        { // Task will be ignored by all backends
1849
String JavaDoc msg = Translate.get("loadbalancer.releasesavepoint.timeout",
1850              new String JavaDoc[]{name, String.valueOf(tid),
1851                  String.valueOf(task.getSuccess()),
1852                  String.valueOf(task.getFailed())});
1853          logger.warn(msg);
1854          throw new SQLException JavaDoc(msg);
1855        }
1856        // else task execution already started, to late to cancel
1857
}
1858
1859      if (task.getSuccess() > 0)
1860        return;
1861      else
1862      { // All tasks failed
1863
ArrayList JavaDoc exceptions = task.getExceptions();
1864        if (exceptions == null)
1865          throw new SQLException JavaDoc(Translate.get(
1866              "loadbalancer.releasesavepoint.all.failed", new String JavaDoc[]{name,
1867                  String.valueOf(tid)}));
1868        else
1869        {
1870          String JavaDoc errorMsg = Translate.get(
1871              "loadbalancer.releasesavepoint.failed.stack", new String JavaDoc[]{name,
1872                  String.valueOf(tid)})
1873              + "\n";
1874          SQLException JavaDoc ex = SQLExceptionFactory.getSQLException(exceptions,
1875              errorMsg);
1876          logger.error(ex.getMessage());
1877          throw ex;
1878        }
1879      }
1880    }
1881  }
1882
1883  /**
1884   * Set a savepoint to a transaction.
1885   *
1886   * @param tm The transaction marker metadata
1887   * @param name The name of the new savepoint
1888   * @throws SQLException if an error occurs
1889   */

1890  public void setSavepoint(TransactionMarkerMetaData tm, String JavaDoc name)
1891      throws SQLException JavaDoc
1892  {
1893    long tid = tm.getTransactionId();
1894    Long JavaDoc lTid = new Long JavaDoc(tid);
1895
1896    // List of backends that still have pending queries for the transaction for
1897
// which a savepoint will be set
1898
ArrayList JavaDoc asynchronousBackends = null;
1899    SavepointTask task = null;
1900
1901    // Ordering for distributed virtual database
1902
SetSavepoint totalOrderSavepoint = null;
1903    boolean canTakeReadLock = false;
1904    if (vdb.getTotalOrderQueue() != null)
1905    {
1906      totalOrderSavepoint = new SetSavepoint(tid, name);
1907      // Total ordering mainly for distributed virtual databases.
1908
// If waitForTotalOrder returns true then the query has been scheduled in
1909
// total order and there is no need to take a write lock later to resolve
1910
// potential conflicts.
1911
canTakeReadLock = waitForTotalOrder(totalOrderSavepoint, false);
1912      if (!canTakeReadLock)
1913        // This is a local commit no total order info
1914
totalOrderSavepoint = null;
1915    }
1916
1917    if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL)
1918    {
1919      try
1920      {
1921        if (canTakeReadLock)
1922          backendBlockingThreadsRWLock.acquireRead();
1923        else
1924        {
1925          // Lock in write to ensure that all writes are posted and we wait in
1926
// the queue, else a read lock has the priority with the
1927
// implementation we are using.
1928
backendBlockingThreadsRWLock.acquireWrite();
1929        }
1930      }
1931      catch (InterruptedException JavaDoc e)
1932      {
1933        String JavaDoc msg = Translate.get(
1934            "loadbalancer.backendlist.acquire.writelock.failed", e);
1935        logger.error(msg);
1936        throw new SQLException JavaDoc(msg);
1937      }
1938
1939      int nbOfThreads = backendBlockingThreads.size();
1940
1941      // Create the task
1942
task = new SavepointTask(getNbToWait(nbOfThreads), nbOfThreads, tm
1943          .getTimeout(), tm.getLogin(), tid, name);
1944
1945      for (int i = 0; i < nbOfThreads; i++)
1946      {
1947        BackendWorkerThread thread = (BackendWorkerThread) backendBlockingThreads
1948            .get(i);
1949        if (thread.hasTaskForTransaction(lTid))
1950        {
1951          if (asynchronousBackends == null)
1952            asynchronousBackends = new ArrayList JavaDoc();
1953          asynchronousBackends.add(thread.getBackend());
1954          synchronized (thread)
1955          {
1956            thread.insertTaskAfterLastWriteForTransaction(task, lTid);
1957            thread.notify();
1958          }
1959        }
1960      }
1961
1962      // Release lock
1963
if (canTakeReadLock)
1964        backendBlockingThreadsRWLock.releaseRead();
1965      else
1966        backendBlockingThreadsRWLock.releaseWrite();
1967
1968      // Unset the task
1969
if (asynchronousBackends == null)
1970        task = null;
1971    }
1972
1973    try
1974    {
1975      if (canTakeReadLock)
1976        backendNonBlockingThreadsRWLock.acquireRead();
1977      else
1978        backendNonBlockingThreadsRWLock.acquireWrite();
1979    }
1980    catch (InterruptedException JavaDoc e)
1981    {
1982      String JavaDoc msg = Translate.get(
1983          "loadbalancer.backendlist.acquire.writelock.failed", e);
1984      logger.error(msg);
1985      throw new SQLException JavaDoc(msg);
1986    }
1987
1988    int nbOfThreads = backendNonBlockingThreads.size();
1989    ArrayList JavaDoc savepointList = new ArrayList JavaDoc();
1990    // Build the list of backend that need to set a savepoint for this
1991
// transaction
1992
for (int i = 0; i < nbOfThreads; i++)
1993    {
1994      BackendWorkerThread thread = (BackendWorkerThread) backendNonBlockingThreads
1995          .get(i);
1996      DatabaseBackend backend = thread.getBackend();
1997      // If the transaction has been started on this backend and it was not
1998
// previously treated in the asynchronous backend list (late nodes), then
1999
// we have to post the task now in the asynchronous list.
2000
if (backend.isStartedTransaction(lTid)
2001          && ((asynchronousBackends == null) || (!asynchronousBackends
2002              .contains(backend))))
2003        savepointList.add(thread);
2004    }
2005
2006    nbOfThreads = savepointList.size();
2007    if (nbOfThreads == 0)
2008    {
2009      if (canTakeReadLock)
2010        backendNonBlockingThreadsRWLock.releaseRead();
2011      else
2012        backendNonBlockingThreadsRWLock.releaseWrite();
2013      return;
2014    }
2015
2016    if (task == null)
2017      task = new SavepointTask(getNbToWait(nbOfThreads), nbOfThreads, tm
2018          .getTimeout(), tm.getLogin(), tid, name);
2019
2020    synchronized (task)
2021    {
2022      // Post the task in each backendThread tasklist and wakeup the threads
2023
for (int i = 0; i < nbOfThreads; i++)
2024      {
2025        BackendWorkerThread thread = (BackendWorkerThread) savepointList.get(i);
2026        synchronized (thread)
2027        {
2028          thread.addTask(task, tid);
2029          thread.notify();
2030        }
2031      }
2032
2033      // Release lock
2034
if (canTakeReadLock)
2035        backendNonBlockingThreadsRWLock.releaseRead();
2036      else
2037        backendNonBlockingThreadsRWLock.releaseWrite();
2038
2039      // Unblock next query from total order queue
2040
if (totalOrderSavepoint != null)
2041        removeHeadFromAndNotifyTotalOrderQueue();
2042
2043      // Wait for completion (notified by the task)
2044
try
2045      {
2046        // Wait on task
2047
long timeout = tm.getTimeout();
2048        if (timeout > 0)
2049        {
2050          long start = System.currentTimeMillis();
2051          task.wait(timeout);
2052          long end = System.currentTimeMillis();
2053          long remaining = timeout - (end - start);
2054          if (remaining <= 0)
2055          {
2056            if (task.setExpiredTimeout())
2057            { // Task will be ignored by all backends
2058
String JavaDoc msg = Translate.get("loadbalancer.setsavepoint.timeout",
2059                  new String JavaDoc[]{name, String.valueOf(tid),
2060                      String.valueOf(task.getSuccess()),
2061                      String.valueOf(task.getFailed())});
2062              logger.warn(msg);
2063              throw new SQLException JavaDoc(msg);
2064            }
2065            // else task execution already started, to late to cancel
2066
}
2067        }
2068        else
2069          task.wait();
2070      }
2071      catch (InterruptedException JavaDoc e)
2072      {
2073        if (task.setExpiredTimeout())
2074        { // Task will be ignored by all backends
2075
String JavaDoc msg = Translate.get("loadbalancer.setsavepoint.timeout",
2076              new String JavaDoc[]{name, String.valueOf(tid),
2077                  String.valueOf(task.getSuccess()),
2078                  String.valueOf(task.getFailed())});
2079          logger.warn(msg);
2080          throw new SQLException JavaDoc(msg);
2081        }
2082        // else task execution already started, to late to cancel
2083
}
2084
2085      if (task.getSuccess() > 0)
2086        return;
2087      else
2088      { // All tasks failed
2089
ArrayList JavaDoc exceptions = task.getExceptions();
2090        if (exceptions == null)
2091          throw new SQLException JavaDoc(Translate.get(
2092              "loadbalancer.setsavepoint.all.failed", new String JavaDoc[]{name,
2093                  String.valueOf(tid)}));
2094        else
2095        {
2096          String JavaDoc errorMsg = Translate.get(
2097              "loadbalancer.setsavepoint.failed.stack", new String JavaDoc[]{name,
2098                  String.valueOf(tid)})
2099              + "\n";
2100          SQLException JavaDoc ex = SQLExceptionFactory.getSQLException(exceptions,
2101              errorMsg);
2102          logger.error(ex.getMessage());
2103          throw ex;
2104        }
2105      }
2106    }
2107  }
2108
2109  /**
2110   * Wait for all writes to be posted on BackendBlockingThreads by simply
2111   * acquiring the RW lock in write and releasing it.
2112   *
2113   * @throws SQLException if we fail to acquire the lock
2114   */

2115  private void waitForAllWritesToBePostedOnBackendBlockingThreads()
2116      throws SQLException JavaDoc
2117  {
2118    // Lock in write to ensure that all writes are posted and we wait in the
2119
// queue, else a read lock has the priority with the implementation we are
2120
// using.
2121
try
2122    {
2123      backendBlockingThreadsRWLock.acquireWrite();
2124    }
2125    catch (InterruptedException JavaDoc e)
2126    {
2127      String JavaDoc msg = Translate.get(
2128          "loadbalancer.backendlist.acquire.writelock.failed", e);
2129      logger.error(msg);
2130      throw new SQLException JavaDoc(msg);
2131    }
2132    backendBlockingThreadsRWLock.releaseWrite();
2133  }
2134
2135  /**
2136   * Waits for all writes of the given transaction in the blocking thread queue
2137   * to complete before being able to complete the transaction.
2138   *
2139   * @throws SQLException if a locking error occurs
2140   */

2141  protected void waitForAllWritesToComplete(long transactionId)
2142      throws SQLException JavaDoc
2143  {
2144    waitForAllWritesToBePostedOnBackendBlockingThreads();
2145
2146    boolean success = false;
2147    while (!success)
2148    {
2149      try
2150      { // Note that we are not synchronized here and we might have concurrent
2151
// modifications of the backend list.
2152
for (Iterator JavaDoc iter = backendBlockingThreads.iterator(); iter.hasNext();)
2153        {
2154          BackendWorkerThread thread = (BackendWorkerThread) iter.next();
2155          thread.waitForAllTasksToComplete(transactionId);
2156        }
2157        success = true;
2158      }
2159      catch (ConcurrentModificationException JavaDoc e)
2160      { // List has been modified while we were iterating
2161
// Retry until we succeed
2162
}
2163    }
2164  }
2165
2166  /**
2167   * Waits for all writes of the given transaction in the blocking thread queue
2168   * of the given backend to complete before being able to complete the
2169   * transaction.
2170   *
2171   * @throws SQLException if we fail to acquire the lock
2172   * @see #executeRequestOnBackend
2173   */

2174  protected void waitForAllWritesToComplete(DatabaseBackend backend,
2175      long transactionId) throws SQLException JavaDoc
2176  {
2177    waitForAllWritesToBePostedOnBackendBlockingThreads();
2178
2179    boolean success = false;
2180    while (!success)
2181    {
2182      try
2183      { // Note that we are not synchronized here and we might have concurrent
2184
// modifications of the backend list.
2185
for (Iterator JavaDoc iter = backendBlockingThreads.iterator(); iter.hasNext();)
2186        {
2187          BackendWorkerThread thread = (BackendWorkerThread) iter.next();
2188          if (thread.getBackend() == backend)
2189          {
2190            thread.waitForAllTasksToComplete(transactionId);
2191            break;
2192          }
2193        }
2194        success = true;
2195      }
2196      catch (ConcurrentModificationException JavaDoc e)
2197      { // List has been modified while we were iterating
2198
// Retry until we succeed
2199
}
2200    }
2201  }
2202
2203  /**
2204   * Waits for all writes in the blocking thread queue of the given backend to
2205   * complete.
2206   *
2207   * @throws SQLException if we fail to acquire the lock
2208   * @see #executeRequestOnBackend
2209   */

2210  protected void waitForAllWritesToComplete(DatabaseBackend backend)
2211      throws SQLException JavaDoc
2212  {
2213    waitForAllWritesToBePostedOnBackendBlockingThreads();
2214
2215    boolean success = false;
2216    while (!success)
2217    {
2218      try
2219      { // Note that we are not synchronized here and we might have concurrent
2220
// modifications of the backend list.
2221
for (Iterator JavaDoc iter = backendBlockingThreads.iterator(); iter.hasNext();)
2222        {
2223          BackendWorkerThread thread = (BackendWorkerThread) iter.next();
2224          if (thread.getBackend() == backend)
2225          {
2226            thread.waitForAllTasksToComplete();
2227            break;
2228          }
2229        }
2230        success = true;
2231      }
2232      catch (ConcurrentModificationException JavaDoc e)
2233      { // List has been modified while we were iterating
2234
// Retry until we succeed
2235
}
2236    }
2237  }
2238
2239  /*
2240   * Backends management
2241   */

2242
2243  /**
2244   * Enables a Backend that was previously disabled.
2245   * <p>
2246   * Ask the corresponding connection manager to initialize the connections if
2247   * needed.
2248   * <p>
2249   * No sanity checks are performed by this function.
2250   *
2251   * @param db The database backend to enable
2252   * @param writeEnabled True if the backend must be enabled for writes
2253   * @throws SQLException if an error occurs
2254   */

2255  public void enableBackend(DatabaseBackend db, boolean writeEnabled)
2256      throws SQLException JavaDoc
2257  {
2258    if (writeEnabled)
2259    {
2260      // Create 2 worker threads
2261
BackendWorkerThread blockingThread = new BackendWorkerThread(db, this);
2262      BackendWorkerThread nonBlockingThread = new BackendWorkerThread(db, this);
2263
2264      // Add first to the blocking thread list
2265
try
2266      {
2267        backendBlockingThreadsRWLock.acquireWrite();
2268      }
2269      catch (InterruptedException JavaDoc e)
2270      {
2271        String JavaDoc msg = Translate.get(
2272            "loadbalancer.backendlist.acquire.writelock.failed", e);
2273        logger.error(msg);
2274        throw new SQLException JavaDoc(msg);
2275      }
2276      backendBlockingThreads.add(blockingThread);
2277      backendBlockingThreadsRWLock.releaseWrite();
2278      blockingThread.start();
2279      logger.info(Translate.get(
2280          "loadbalancer.backend.workerthread.blocking.add", db.getName()));
2281
2282      // Then add to the non-blocking thread list
2283
try
2284      {
2285        backendNonBlockingThreadsRWLock.acquireWrite();
2286      }
2287      catch (InterruptedException JavaDoc e)
2288      {
2289        String JavaDoc msg = Translate.get(
2290            "loadbalancer.backendlist.acquire.writelock.failed", e);
2291        logger.error(msg);
2292        throw new SQLException JavaDoc(msg);
2293      }
2294      backendNonBlockingThreads.add(nonBlockingThread);
2295      backendNonBlockingThreadsRWLock.releaseWrite();
2296      nonBlockingThread.start();
2297      logger.info(Translate.get(
2298          "loadbalancer.backend.workerthread.non.blocking.add", db.getName()));
2299      db.enableWrite();
2300    }
2301
2302    if (!db.isInitialized())
2303      db.initializeConnections();
2304    db.enableRead();
2305  }
2306
2307  /**
2308   * Disables a backend that was previously enabled.
2309   * <p>
2310   * Ask the corresponding connection manager to finalize the connections if
2311   * needed.
2312   * <p>
2313   * No sanity checks are performed by this function.
2314   *
2315   * @param db the database backend to disable
2316   * @throws SQLException if an error occurs
2317   */

2318  public synchronized void disableBackend(DatabaseBackend db)
2319      throws SQLException JavaDoc
2320  {
2321    if (db.isWriteEnabled())
2322    {
2323      KillThreadTask killBlockingThreadTask = new KillThreadTask(1, 1);
2324
2325      // Starts with backendBlockingThreads
2326
try
2327      {
2328        backendBlockingThreadsRWLock.acquireWrite();
2329      }
2330      catch (InterruptedException JavaDoc e)
2331      {
2332        String JavaDoc msg = Translate.get(
2333            "loadbalancer.backendlist.acquire.writelock.failed", e);
2334        logger.error(msg);
2335        throw new SQLException JavaDoc(msg);
2336      }
2337
2338      int nbOfThreads = backendBlockingThreads.size();
2339
2340      // Find the right blocking thread
2341
for (int i = 0; i < nbOfThreads; i++)
2342      {
2343        BackendWorkerThread thread = (BackendWorkerThread) backendBlockingThreads
2344            .get(i);
2345        if (thread.getBackend().equals(db))
2346        {
2347          logger.info(Translate
2348              .get("loadbalancer.backend.workerthread.blocking.remove", db
2349                  .getName()));
2350
2351          // Remove it from the backendBlockingThread list
2352
backendBlockingThreads.remove(i);
2353
2354          synchronized (thread)
2355          {
2356            // Kill the thread
2357
thread.addPriorityTask(killBlockingThreadTask);
2358            thread.notify();
2359          }
2360          break;
2361        }
2362      }
2363
2364      backendBlockingThreadsRWLock.releaseWrite();
2365
2366      // Wait for the thread to be killed
2367
synchronized (killBlockingThreadTask)
2368      {
2369        if (!killBlockingThreadTask.hasFullyCompleted())
2370          try
2371          {
2372            killBlockingThreadTask.wait();
2373          }
2374          catch (InterruptedException JavaDoc ignore)
2375          {
2376          }
2377      }
2378
2379      // Continue with backendNonBlockingThreads
2380
KillThreadTask killNonBlockingThreadTask = new KillThreadTask(1, 1);
2381
2382      try
2383      {
2384        backendNonBlockingThreadsRWLock.acquireWrite();
2385      }
2386      catch (InterruptedException JavaDoc e)
2387      {
2388        String JavaDoc msg = Translate.get(
2389            "loadbalancer.backendlist.acquire.writelock.failed", e);
2390        logger.error(msg);
2391        throw new SQLException JavaDoc(msg);
2392      }
2393
2394      // Find the right non-blocking thread
2395
nbOfThreads = backendNonBlockingThreads.size();
2396      for (int i = 0; i < nbOfThreads; i++)
2397      {
2398        BackendWorkerThread thread = (BackendWorkerThread) backendNonBlockingThreads
2399            .get(i);
2400        if (thread.getBackend().equals(db))
2401        {
2402          logger.info(Translate.get(
2403              "loadbalancer.backend.workerthread.non.blocking.remove", db
2404                  .getName()));
2405
2406          // Remove it from the backendNonBlockingThreads list
2407
backendNonBlockingThreads.remove(i);
2408
2409          synchronized (thread)
2410          {
2411            // Kill the thread
2412
thread.addPriorityTask(killNonBlockingThreadTask);
2413            thread.notify();
2414          }
2415          break;
2416        }
2417      }
2418
2419      backendNonBlockingThreadsRWLock.releaseWrite();
2420
2421      // Wait for the thread to be killed
2422
synchronized (killNonBlockingThreadTask)
2423      {
2424        if (!killNonBlockingThreadTask.hasFullyCompleted())
2425          try
2426          {
2427            killNonBlockingThreadTask.wait();
2428          }
2429          catch (InterruptedException JavaDoc ignore)
2430          {
2431          }
2432      }
2433    }
2434
2435    db.disable();
2436    if (db.isInitialized())
2437      db.finalizeConnections();
2438  }
2439
2440  /**
2441   * @see org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer#getNumberOfEnabledBackends()
2442   */

2443  public int getNumberOfEnabledBackends()
2444  {
2445    return backendBlockingThreads.size();
2446  }
2447
2448  /**
2449   * @see org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer#getXmlImpl
2450   */

2451  public String JavaDoc getXmlImpl()
2452  {
2453    StringBuffer JavaDoc info = new StringBuffer JavaDoc();
2454    info.append("<" + DatabasesXmlTags.ELT_RAIDb_2 + ">");
2455    if (createTablePolicy != null)
2456      info.append(createTablePolicy.getXml());
2457    if (waitForCompletionPolicy != null)
2458      info.append(waitForCompletionPolicy.getXml());
2459    if (macroHandler != null)
2460      info.append(macroHandler.getXml());
2461    this.getRaidb2Xml();
2462    info.append("</" + DatabasesXmlTags.ELT_RAIDb_2 + ">");
2463    return info.toString();
2464  }
2465
2466  /**
2467   * Returns XML formatted information about this RAIDb-2 load balancer.
2468   *
2469   * @return xml formatted string
2470   */

2471  public abstract String JavaDoc getRaidb2Xml();
2472
2473}
Popular Tags