KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > continuent > sequoia > controller > loadbalancer > raidb0 > RAIDb0


1 /**
2  * Sequoia: Database clustering technology.
3  * Copyright (C) 2002-2004 French National Institute For Research In Computer
4  * Science And Control (INRIA).
5  * Copyright (C) 2005 AmicoSoft, Inc. dba Emic Networks
6  * Contact: sequoia@continuent.org
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  * http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  *
20  * Initial developer(s): Emmanuel Cecchet.
21  * Contributor(s): Jaco Swart, Jean-Bernard van Zuylen
22  */

23
24 package org.continuent.sequoia.controller.loadbalancer.raidb0;
25
26 import java.sql.Connection JavaDoc;
27 import java.sql.SQLException JavaDoc;
28 import java.util.ArrayList JavaDoc;
29 import java.util.Collection JavaDoc;
30 import java.util.Iterator JavaDoc;
31 import java.util.List JavaDoc;
32
33 import org.continuent.sequoia.common.exceptions.BadConnectionException;
34 import org.continuent.sequoia.common.exceptions.NoMoreBackendException;
35 import org.continuent.sequoia.common.exceptions.NoTransactionStartWhenDisablingException;
36 import org.continuent.sequoia.common.exceptions.NotImplementedException;
37 import org.continuent.sequoia.common.exceptions.SQLExceptionFactory;
38 import org.continuent.sequoia.common.exceptions.UnreachableBackendException;
39 import org.continuent.sequoia.common.i18n.Translate;
40 import org.continuent.sequoia.common.log.Trace;
41 import org.continuent.sequoia.common.xml.DatabasesXmlTags;
42 import org.continuent.sequoia.controller.backend.DatabaseBackend;
43 import org.continuent.sequoia.controller.backend.result.ControllerResultSet;
44 import org.continuent.sequoia.controller.backend.result.ExecuteResult;
45 import org.continuent.sequoia.controller.backend.result.ExecuteUpdateResult;
46 import org.continuent.sequoia.controller.backend.result.GeneratedKeysResult;
47 import org.continuent.sequoia.controller.cache.metadata.MetadataCache;
48 import org.continuent.sequoia.controller.connection.AbstractConnectionManager;
49 import org.continuent.sequoia.controller.connection.PooledConnection;
50 import org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer;
51 import org.continuent.sequoia.controller.loadbalancer.AllBackendsFailedException;
52 import org.continuent.sequoia.controller.loadbalancer.BackendTaskQueues;
53 import org.continuent.sequoia.controller.loadbalancer.policies.WaitForCompletionPolicy;
54 import org.continuent.sequoia.controller.loadbalancer.policies.createtable.CreateTableException;
55 import org.continuent.sequoia.controller.loadbalancer.policies.createtable.CreateTablePolicy;
56 import org.continuent.sequoia.controller.loadbalancer.policies.createtable.CreateTableRule;
57 import org.continuent.sequoia.controller.loadbalancer.tasks.ClosePersistentConnectionTask;
58 import org.continuent.sequoia.controller.loadbalancer.tasks.CommitTask;
59 import org.continuent.sequoia.controller.loadbalancer.tasks.OpenPersistentConnectionTask;
60 import org.continuent.sequoia.controller.loadbalancer.tasks.ReleaseSavepointTask;
61 import org.continuent.sequoia.controller.loadbalancer.tasks.RollbackTask;
62 import org.continuent.sequoia.controller.loadbalancer.tasks.RollbackToSavepointTask;
63 import org.continuent.sequoia.controller.loadbalancer.tasks.SavepointTask;
64 import org.continuent.sequoia.controller.requestmanager.RAIDbLevels;
65 import org.continuent.sequoia.controller.requestmanager.TransactionMetaData;
66 import org.continuent.sequoia.controller.requests.AbstractRequest;
67 import org.continuent.sequoia.controller.requests.AbstractWriteRequest;
68 import org.continuent.sequoia.controller.requests.ParsingGranularities;
69 import org.continuent.sequoia.controller.requests.SelectRequest;
70 import org.continuent.sequoia.controller.requests.StoredProcedure;
71 import org.continuent.sequoia.controller.virtualdatabase.VirtualDatabase;
72
73 /**
74  * RAIDb-0: database partitioning.
75  * <p>
76  * The requests are sent to the backend nodes hosting the tables needed to
77  * execute the request. If no backend has the needed tables to perform a
78  * request, it will fail.
79  *
80  * @author <a HREF="mailto:Emmanuel.Cecchet@inria.fr">Emmanuel Cecchet </a>
81  * @author <a HREF="mailto:jaco.swart@iblocks.co.uk">Jaco Swart </a>
82  * @author <a HREF="mailto:jbvanzuylen@transwide.com">Jean-Bernard van Zuylen
83  * </a>
84  * @version 1.0
85  */

86 public class RAIDb0 extends AbstractLoadBalancer
87 {
88   //
89
// How the code is organized?
90
// 1. Member variables
91
// 2. Constructor(s)
92
// 3. Request handling
93
// 4. Transaction handling
94
// 5. Backend management
95
// 6. Debug/Monitoring
96
//
97

98   private CreateTablePolicy createTablePolicy;
99   protected static Trace logger = Trace
100                                        .getLogger("org.continuent.sequoia.controller.loadbalancer.RAIDb0");
101
102   /*
103    * Constructors
104    */

105
106   /**
107    * Creates a new RAIDb-0 request load balancer.
108    *
109    * @param vdb the virtual database this load balancer belongs to.
110    * @param createTablePolicy the policy defining how 'create table' statements
111    * should be handled
112    * @throws Exception if an error occurs
113    */

114   public RAIDb0(VirtualDatabase vdb, CreateTablePolicy createTablePolicy)
115       throws Exception JavaDoc
116   {
117     super(vdb, RAIDbLevels.RAIDb0, ParsingGranularities.TABLE);
118     this.createTablePolicy = createTablePolicy;
119     this.waitForCompletionPolicy = new WaitForCompletionPolicy(
120         WaitForCompletionPolicy.ALL, false, 0);
121   }
122
123   /*
124    * Request Handling
125    */

126
127   /**
128    * Performs a read request on the backend that has the needed tables to
129    * executes the request.
130    *
131    * @param request an <code>SelectRequest</code>
132    * @param metadataCache the metadataCache if any or null
133    * @return the corresponding <code>java.sql.ResultSet</code>
134    * @exception SQLException if an error occurs
135    * @see AbstractLoadBalancer#statementExecuteQuery(SelectRequest,
136    * MetadataCache)
137    */

138   public ControllerResultSet statementExecuteQuery(SelectRequest request,
139       MetadataCache metadataCache) throws SQLException JavaDoc
140   {
141     try
142     {
143       vdb.acquireReadLockBackendLists(); // Acquire read lock
144
}
145     catch (InterruptedException JavaDoc e)
146     {
147       String JavaDoc msg = Translate.get(
148           "loadbalancer.backendlist.acquire.readlock.failed", e);
149       logger.error(msg);
150       throw new SQLException JavaDoc(msg);
151     }
152
153     try
154     {
155       ControllerResultSet rs = null;
156       Collection JavaDoc fromTables = request.getFrom();
157
158       if (fromTables == null)
159         throw new SQLException JavaDoc(Translate.get("loadbalancer.from.not.found",
160             request.getSqlShortForm(vdb.getSqlShortFormLength())));
161
162       // Find the backend that has the needed tables
163
ArrayList JavaDoc backends = vdb.getBackends();
164       int size = backends.size();
165       int enabledBackends = 0;
166
167       DatabaseBackend backend = null;
168       // The backend that will execute the query
169
for (int i = 0; i < size; i++)
170       {
171         backend = (DatabaseBackend) backends.get(i);
172         if (backend.isReadEnabled())
173           enabledBackends++;
174         if (backend.isReadEnabled() && backend.hasTables(fromTables))
175           break;
176         else
177           backend = null;
178       }
179
180       if (backend == null)
181       {
182         if (enabledBackends == 0)
183           throw new NoMoreBackendException(Translate.get(
184               "loadbalancer.execute.no.backend.enabled", request.getId()));
185         else
186           throw new SQLException JavaDoc(Translate.get(
187               "loadbalancer.backend.no.required.tables", fromTables.toString()));
188       }
189
190       if (logger.isDebugEnabled())
191       {
192         logger.debug("Backend " + backend.getName()
193             + " has all tables which are:");
194         for (Iterator JavaDoc iter = fromTables.iterator(); iter.hasNext();)
195           logger.debug(iter.next());
196       }
197
198       // Execute the request on the chosen backend
199
try
200       {
201         rs = executeRequestOnBackend(request, backend, metadataCache);
202       }
203       catch (SQLException JavaDoc se)
204       {
205         String JavaDoc msg = Translate.get("loadbalancer.request.failed", new String JavaDoc[]{
206             String.valueOf(request.getId()), se.getMessage()});
207         if (logger.isInfoEnabled())
208           logger.info(msg);
209         throw new SQLException JavaDoc(msg);
210       }
211
212       return rs;
213     }
214     catch (RuntimeException JavaDoc e)
215     {
216       String JavaDoc msg = Translate
217           .get("loadbalancer.request.failed", new String JavaDoc[]{
218               request.getSqlShortForm(vdb.getSqlShortFormLength()),
219               e.getMessage()});
220       logger.fatal(msg, e);
221       throw new SQLException JavaDoc(msg);
222     }
223     finally
224     {
225       vdb.releaseReadLockBackendLists(); // Release the lock
226
}
227   }
228
229   /**
230    * Performs a write request on the backend that has the needed tables to
231    * executes the request.
232    *
233    * @param request an <code>AbstractWriteRequest</code>
234    * @return number of rows affected by the request
235    * @exception SQLException if an error occurs
236    */

237   public ExecuteUpdateResult statementExecuteUpdate(AbstractWriteRequest request)
238       throws SQLException JavaDoc
239   {
240     // Handle macros
241
handleMacros(request);
242
243     try
244     {
245       vdb.acquireReadLockBackendLists(); // Acquire read lock
246
}
247     catch (InterruptedException JavaDoc e)
248     {
249       String JavaDoc msg = Translate.get(
250           "loadbalancer.backendlist.acquire.readlock.failed", e);
251       logger.error(msg);
252       throw new SQLException JavaDoc(msg);
253     }
254
255     boolean success = false;
256     try
257     {
258       // Log lazy begin if needed
259
if (request.isLazyTransactionStart())
260         this.vdb.getRequestManager().logLazyTransactionBegin(
261             request.getTransactionId());
262
263       // Log request
264
if (recoveryLog != null)
265         recoveryLog.logRequestExecuting(request);
266
267       String JavaDoc table = request.getTableName();
268       AbstractConnectionManager cm = null;
269
270       if (table == null)
271         throw new SQLException JavaDoc(Translate.get(
272             "loadbalancer.request.target.table.not.found", request
273                 .getSqlShortForm(vdb.getSqlShortFormLength())));
274
275       // Find the backend that has the needed table
276
ArrayList JavaDoc backends = vdb.getBackends();
277       int size = backends.size();
278
279       DatabaseBackend backend = null;
280       // The backend that will execute the query
281
if (request.isCreate())
282       { // Choose the backend according to the defined policy
283
CreateTableRule rule = createTablePolicy.getTableRule(request
284             .getTableName());
285         if (rule == null)
286           rule = createTablePolicy.getDefaultRule();
287
288         // Ask the rule to pickup a backend
289
ArrayList JavaDoc choosen;
290         try
291         {
292           choosen = rule.getBackends(backends);
293         }
294         catch (CreateTableException e)
295         {
296           throw new SQLException JavaDoc(Translate.get(
297               "loadbalancer.create.table.rule.failed", e.getMessage()));
298         }
299
300         // Get the connection manager from the chosen backend
301
if (choosen != null)
302           backend = (DatabaseBackend) choosen.get(0);
303         if (backend != null)
304           cm = backend.getConnectionManager(request.getLogin());
305       }
306       else
307       { // Find the backend that has the table
308
for (int i = 0; i < size; i++)
309         {
310           backend = (DatabaseBackend) backends.get(i);
311           if ((backend.isWriteEnabled() || backend.isDisabling())
312               && backend.hasTable(table))
313           {
314             cm = backend.getConnectionManager(request.getLogin());
315             break;
316           }
317         }
318       }
319
320       // Sanity check
321
if (cm == null)
322         throw new SQLException JavaDoc(Translate.get(
323             "loadbalancer.backend.no.required.table", table));
324
325       // Ok, let's execute the query
326

327       if (request.isAutoCommit())
328       {
329         // We do not execute request outside the already open transactions if we
330
// are disabling the backend.
331
if (!backend.canAcceptTasks(request))
332           throw new SQLException JavaDoc(Translate.get(
333               "loadbalancer.backend.is.disabling", new String JavaDoc[]{
334                   request.getSqlShortForm(vdb.getSqlShortFormLength()),
335                   backend.getName()}));
336
337         // Use a connection just for this request
338
PooledConnection c = null;
339         try
340         {
341           c = cm.retrieveConnectionInAutoCommit(request);
342         }
343         catch (UnreachableBackendException e1)
344         {
345           logger.error(Translate.get(
346               "loadbalancer.backend.disabling.unreachable", backend.getName()));
347           disableBackend(backend, true);
348           throw new SQLException JavaDoc(Translate.get(
349               "loadbalancer.backend.unreacheable", backend.getName()));
350         }
351
352         // Sanity check
353
if (c == null)
354           throw new SQLException JavaDoc(Translate.get(
355               "loadbalancer.backend.no.connection", backend.getName()));
356
357         ExecuteUpdateResult result;
358         try
359         {
360           result = executeStatementExecuteUpdateOnBackend(request, backend,
361               null, c.getConnection());
362         }
363         catch (Exception JavaDoc e)
364         {
365           throw new SQLException JavaDoc(Translate.get("loadbalancer.request.failed",
366               new String JavaDoc[]{
367                   request.getSqlShortForm(vdb.getSqlShortFormLength()),
368                   e.getMessage()}));
369         }
370         finally
371         {
372           cm.releaseConnectionInAutoCommit(request, c);
373         }
374         if (logger.isDebugEnabled())
375           logger.debug(Translate.get("loadbalancer.execute.on", new String JavaDoc[]{
376               String.valueOf(request.getId()), backend.getName()}));
377         return result;
378       }
379       else
380       { // Inside a transaction
381
Connection JavaDoc c;
382         long tid = request.getTransactionId();
383
384         try
385         {
386           c = backend.getConnectionForTransactionAndLazyBeginIfNeeded(request,
387               cm);
388         }
389         catch (UnreachableBackendException e1)
390         {
391           logger.error(Translate.get(
392               "loadbalancer.backend.disabling.unreachable", backend.getName()));
393           disableBackend(backend, true);
394           throw new SQLException JavaDoc(Translate.get(
395               "loadbalancer.backend.unreacheable", backend.getName()));
396         }
397         catch (NoTransactionStartWhenDisablingException e)
398         {
399           String JavaDoc msg = Translate.get("loadbalancer.backend.is.disabling",
400               new String JavaDoc[]{
401                   request.getSqlShortForm(vdb.getSqlShortFormLength()),
402                   backend.getName()});
403           logger.error(msg);
404           throw new SQLException JavaDoc(msg);
405         }
406
407         // Sanity check
408
if (c == null)
409           throw new SQLException JavaDoc(Translate.get(
410               "loadbalancer.unable.retrieve.connection", new String JavaDoc[]{
411                   String.valueOf(tid), backend.getName()}));
412
413         // Execute the query
414
ExecuteUpdateResult result;
415         try
416         {
417           result = executeStatementExecuteUpdateOnBackend(request, backend,
418               null, c);
419         }
420         catch (Exception JavaDoc e)
421         {
422           throw new SQLException JavaDoc(Translate.get("loadbalancer.request.failed",
423               new String JavaDoc[]{
424                   request.getSqlShortForm(vdb.getSqlShortFormLength()),
425                   e.getMessage()}));
426         }
427         if (logger.isDebugEnabled())
428           logger.debug(Translate.get("loadbalancer.execute.on", new String JavaDoc[]{
429               String.valueOf(request.getId()), backend.getName()}));
430         success = true;
431         return result;
432       }
433     }
434     catch (RuntimeException JavaDoc e)
435     {
436       String JavaDoc msg = Translate
437           .get("loadbalancer.request.failed", new String JavaDoc[]{
438               request.getSqlShortForm(vdb.getSqlShortFormLength()),
439               e.getMessage()});
440       logger.fatal(msg, e);
441       throw new SQLException JavaDoc(msg);
442     }
443     finally
444     {
445       vdb.releaseReadLockBackendLists(); // Release the lock
446
if (!success)
447         recoveryLog.logRequestCompletion(request.getLogId(), false, request
448             .getExecTimeInMs());
449     }
450   }
451
452   /**
453    * @see AbstractLoadBalancer#statementExecuteUpdateWithKeys(AbstractWriteRequest,
454    * MetadataCache)
455    */

456   public GeneratedKeysResult statementExecuteUpdateWithKeys(
457       AbstractWriteRequest request, MetadataCache metadataCache)
458       throws SQLException JavaDoc
459   {
460     // Handle macros
461
handleMacros(request);
462
463     try
464     {
465       vdb.acquireReadLockBackendLists(); // Acquire
466
// read
467
// lock
468
}
469     catch (InterruptedException JavaDoc e)
470     {
471       String JavaDoc msg = Translate.get(
472           "loadbalancer.backendlist.acquire.readlock.failed", e);
473       logger.error(msg);
474       throw new SQLException JavaDoc(msg);
475     }
476
477     boolean success = false;
478     try
479     {
480       // Log lazy begin if needed
481
if (request.isLazyTransactionStart())
482         this.vdb.getRequestManager().logLazyTransactionBegin(
483             request.getTransactionId());
484
485       // Log request
486
if (recoveryLog != null)
487         recoveryLog.logRequestExecuting(request);
488
489       String JavaDoc table = request.getTableName();
490       AbstractConnectionManager cm = null;
491
492       if (table == null)
493         throw new SQLException JavaDoc(Translate.get(
494             "loadbalancer.request.target.table.not.found", request
495                 .getSqlShortForm(vdb.getSqlShortFormLength())));
496
497       // Find the backend that has the needed table
498
ArrayList JavaDoc backends = vdb.getBackends();
499       int size = backends.size();
500
501       DatabaseBackend backend = null;
502       // The backend that will execute the query
503
if (request.isCreate())
504       { // Choose the backend according to the defined policy
505
CreateTableRule rule = createTablePolicy.getTableRule(request
506             .getTableName());
507         if (rule == null)
508           rule = createTablePolicy.getDefaultRule();
509
510         // Ask the rule to pickup a backend
511
ArrayList JavaDoc choosen;
512         try
513         {
514           choosen = rule.getBackends(backends);
515         }
516         catch (CreateTableException e)
517         {
518           throw new SQLException JavaDoc(Translate.get(
519               "loadbalancer.create.table.rule.failed", e.getMessage()));
520         }
521
522         // Get the connection manager from the chosen backend
523
if (choosen != null)
524           backend = (DatabaseBackend) choosen.get(0);
525         if (backend != null)
526           cm = backend.getConnectionManager(request.getLogin());
527       }
528       else
529       { // Find the backend that has the table
530
for (int i = 0; i < size; i++)
531         {
532           backend = (DatabaseBackend) backends.get(i);
533           if ((backend.isWriteEnabled() || backend.isDisabling())
534               && backend.hasTable(table))
535           {
536             cm = backend.getConnectionManager(request.getLogin());
537             break;
538           }
539         }
540       }
541
542       // Sanity check
543
if (cm == null)
544         throw new SQLException JavaDoc(Translate.get(
545             "loadbalancer.backend.no.required.table", table));
546
547       if (!backend.getDriverCompliance().supportGetGeneratedKeys())
548         throw new SQLException JavaDoc(Translate.get(
549             "loadbalancer.backend.autogeneratedkeys.unsupported", backend
550                 .getName()));
551
552       // Ok, let's execute the query
553

554       if (request.isAutoCommit())
555       {
556         // We do not execute request outside the already open transactions if we
557
// are disabling the backend.
558
if (!backend.canAcceptTasks(request))
559           throw new SQLException JavaDoc(Translate.get(
560               "loadbalancer.backend.is.disabling", new String JavaDoc[]{
561                   request.getSqlShortForm(vdb.getSqlShortFormLength()),
562                   backend.getName()}));
563
564         // Use a connection just for this request
565
PooledConnection c = null;
566         try
567         {
568           c = cm.retrieveConnectionInAutoCommit(request);
569         }
570         catch (UnreachableBackendException e1)
571         {
572           logger.error(Translate.get(
573               "loadbalancer.backend.disabling.unreachable", backend.getName()));
574           disableBackend(backend, true);
575           throw new SQLException JavaDoc(Translate.get(
576               "loadbalancer.backend.unreacheable", backend.getName()));
577         }
578
579         // Sanity check
580
if (c == null)
581           throw new SQLException JavaDoc(Translate.get(
582               "loadbalancer.backend.no.connection", backend.getName()));
583
584         // Execute Query
585
GeneratedKeysResult result;
586         try
587         {
588           result = executeStatementExecuteUpdateWithKeysOnBackend(request,
589               backend, null, c.getConnection(), metadataCache);
590         }
591         catch (Exception JavaDoc e)
592         {
593           throw new SQLException JavaDoc(Translate.get("loadbalancer.request.failed",
594               new String JavaDoc[]{
595                   request.getSqlShortForm(vdb.getSqlShortFormLength()),
596                   e.getMessage()}));
597         }
598         finally
599         {
600           backend.removePendingRequest(request);
601           cm.releaseConnectionInAutoCommit(request, c);
602         }
603         if (logger.isDebugEnabled())
604           logger.debug(Translate.get("loadbalancer.execute.on", new String JavaDoc[]{
605               String.valueOf(request.getId()), backend.getName()}));
606         return result;
607       }
608       else
609       { // Inside a transaction
610
Connection JavaDoc c;
611         long tid = request.getTransactionId();
612
613         try
614         {
615           c = backend.getConnectionForTransactionAndLazyBeginIfNeeded(request,
616               cm);
617         }
618         catch (UnreachableBackendException e1)
619         {
620           logger.error(Translate.get(
621               "loadbalancer.backend.disabling.unreachable", backend.getName()));
622           disableBackend(backend, true);
623           throw new SQLException JavaDoc(Translate.get(
624               "loadbalancer.backend.unreacheable", backend.getName()));
625         }
626         catch (NoTransactionStartWhenDisablingException e)
627         {
628           String JavaDoc msg = Translate.get("loadbalancer.backend.is.disabling",
629               new String JavaDoc[]{
630                   request.getSqlShortForm(vdb.getSqlShortFormLength()),
631                   backend.getName()});
632           logger.error(msg);
633           throw new SQLException JavaDoc(msg);
634         }
635
636         // Sanity check
637
if (c == null)
638           throw new SQLException JavaDoc(Translate.get(
639               "loadbalancer.unable.retrieve.connection", new String JavaDoc[]{
640                   String.valueOf(tid), backend.getName()}));
641
642         // Execute the query
643
GeneratedKeysResult result;
644         try
645         {
646           result = executeStatementExecuteUpdateWithKeysOnBackend(request,
647               backend, null, c, metadataCache);
648         }
649         catch (Exception JavaDoc e)
650         {
651           throw new SQLException JavaDoc(Translate.get("loadbalancer.request.failed",
652               new String JavaDoc[]{
653                   request.getSqlShortForm(vdb.getSqlShortFormLength()),
654                   e.getMessage()}));
655         }
656         if (logger.isDebugEnabled())
657           logger.debug(Translate.get("loadbalancer.execute.on", new String JavaDoc[]{
658               String.valueOf(request.getId()), backend.getName()}));
659         success = true;
660         return result;
661       }
662     }
663     catch (RuntimeException JavaDoc e)
664     {
665       String JavaDoc msg = Translate
666           .get("loadbalancer.request.failed", new String JavaDoc[]{
667               request.getSqlShortForm(vdb.getSqlShortFormLength()),
668               e.getMessage()});
669       logger.fatal(msg, e);
670       throw new SQLException JavaDoc(msg);
671     }
672     finally
673     {
674       vdb.releaseReadLockBackendLists(); // Release the lock
675
if (!success)
676         recoveryLog.logRequestCompletion(request.getLogId(), false, request
677             .getExecTimeInMs());
678     }
679   }
680
681   /**
682    * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#statementExecute(AbstractRequest,
683    * MetadataCache)
684    */

685   public ExecuteResult statementExecute(AbstractRequest request,
686       MetadataCache metadataCache) throws SQLException JavaDoc,
687       AllBackendsFailedException
688   {
689     throw new NotImplementedException(
690         "Statement.execute() is currently not supported with RAIDb-0");
691   }
692
693   /**
694    * Execute a read request on the selected backend.
695    *
696    * @param request the request to execute
697    * @param backend the backend that will execute the request
698    * @param metadataCache the metadataCache if any or null
699    * @return the ControllerResultSet if (!success)
700    * recoveryLog.logRequestCompletion(request.getLogId(), false,
701    * request.getExecTimeInMs());
702    * @throws SQLException if an error occurs
703    */

704   protected ControllerResultSet executeRequestOnBackend(SelectRequest request,
705       DatabaseBackend backend, MetadataCache metadataCache) throws SQLException JavaDoc
706   {
707     // Handle macros
708
handleMacros(request);
709
710     // Ok, we have a backend, let's execute the request
711
AbstractConnectionManager cm = backend.getConnectionManager(request
712         .getLogin());
713
714     // Sanity check
715
if (cm == null)
716     {
717       String JavaDoc msg = Translate.get("loadbalancer.connectionmanager.not.found",
718           new String JavaDoc[]{request.getLogin(), backend.getName()});
719       logger.error(msg);
720       throw new SQLException JavaDoc(msg);
721     }
722
723     // Execute the query
724
if (request.isAutoCommit())
725     {
726       ControllerResultSet rs = null;
727       boolean badConnection;
728       do
729       {
730         badConnection = false;
731         // Use a connection just for this request
732
PooledConnection c = null;
733         try
734         {
735           c = cm.retrieveConnectionInAutoCommit(request);
736         }
737         catch (UnreachableBackendException e1)
738         {
739           logger.error(Translate.get(
740               "loadbalancer.backend.disabling.unreachable", backend.getName()));
741           disableBackend(backend, true);
742           throw new SQLException JavaDoc(Translate.get(
743               "loadbalancer.backend.unreacheable", backend.getName()));
744         }
745
746         // Sanity check
747
if (c == null)
748           throw new SQLException JavaDoc(Translate.get(
749               "loadbalancer.backend.no.connection", backend.getName()));
750
751         // Execute Query
752
try
753         {
754           rs = executeStatementExecuteQueryOnBackend(request, backend, null, c
755               .getConnection(), metadataCache);
756           cm.releaseConnectionInAutoCommit(request, c);
757         }
758         catch (BadConnectionException e)
759         { // Get rid of the bad connection
760
cm.deleteConnection(c);
761           badConnection = true;
762         }
763         catch (Throwable JavaDoc e)
764         {
765           cm.releaseConnectionInAutoCommit(request, c);
766           throw new SQLException JavaDoc(Translate.get(
767               "loadbalancer.request.failed.on.backend", new String JavaDoc[]{
768                   request.getSqlShortForm(vdb.getSqlShortFormLength()),
769                   backend.getName(), e.getMessage()}));
770         }
771       }
772       while (badConnection);
773       if (logger.isDebugEnabled())
774         logger.debug(Translate.get("loadbalancer.execute.on", new String JavaDoc[]{
775             String.valueOf(request.getId()), backend.getName()}));
776       return rs;
777     }
778     else
779     { // Inside a transaction
780
Connection JavaDoc c;
781       long tid = request.getTransactionId();
782
783       try
784       {
785         c = backend
786             .getConnectionForTransactionAndLazyBeginIfNeeded(request, cm);
787       }
788       catch (UnreachableBackendException e1)
789       {
790         logger.error(Translate.get(
791             "loadbalancer.backend.disabling.unreachable", backend.getName()));
792         disableBackend(backend, true);
793         throw new SQLException JavaDoc(Translate.get(
794             "loadbalancer.backend.unreacheable", backend.getName()));
795       }
796       catch (NoTransactionStartWhenDisablingException e)
797       {
798         String JavaDoc msg = Translate.get("loadbalancer.backend.is.disabling",
799             new String JavaDoc[]{request.getSqlShortForm(vdb.getSqlShortFormLength()),
800                 backend.getName()});
801         logger.error(msg);
802         throw new SQLException JavaDoc(msg);
803       }
804
805       // Sanity check
806
if (c == null)
807         throw new SQLException JavaDoc(Translate.get(
808             "loadbalancer.unable.retrieve.connection", new String JavaDoc[]{
809                 String.valueOf(tid), backend.getName()}));
810
811       // Execute Query
812
ControllerResultSet rs = null;
813       try
814       {
815         rs = executeStatementExecuteQueryOnBackend(request, backend, null, c,
816             metadataCache);
817       }
818       catch (BadConnectionException e)
819       { // Get rid of the bad connection
820
cm.deleteConnection(tid);
821         throw new SQLException JavaDoc(Translate
822             .get("loadbalancer.connection.failed", new String JavaDoc[]{
823                 String.valueOf(tid), backend.getName(), e.getMessage()}));
824       }
825       catch (Throwable JavaDoc e)
826       {
827         throw new SQLException JavaDoc(Translate.get(
828             "loadbalancer.request.failed.on.backend", new String JavaDoc[]{
829                 request.getSqlShortForm(vdb.getSqlShortFormLength()),
830                 backend.getName(), e.getMessage()}));
831       }
832       if (logger.isDebugEnabled())
833         logger.debug(Translate.get("loadbalancer.execute.transaction.on",
834             new String JavaDoc[]{String.valueOf(tid), String.valueOf(request.getId()),
835                 backend.getName()}));
836       return rs;
837     }
838   }
839
840   /**
841    * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#readOnlyCallableStatementExecuteQuery(StoredProcedure,
842    * MetadataCache)
843    */

844   public ControllerResultSet readOnlyCallableStatementExecuteQuery(
845       StoredProcedure proc, MetadataCache metadataCache) throws SQLException JavaDoc
846   {
847     throw new SQLException JavaDoc(
848         "Stored procedure calls are not supported with RAIDb-0 load balancers.");
849   }
850
851   /**
852    * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#readOnlyCallableStatementExecute(org.continuent.sequoia.controller.requests.StoredProcedure,
853    * org.continuent.sequoia.controller.cache.metadata.MetadataCache)
854    */

855   public ExecuteResult readOnlyCallableStatementExecute(StoredProcedure proc,
856       MetadataCache metadataCache) throws SQLException JavaDoc
857   {
858     throw new SQLException JavaDoc(
859         "Stored procedure calls are not supported with RAIDb-0 load balancers.");
860   }
861
862   /**
863    * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#callableStatementExecuteQuery(StoredProcedure,
864    * MetadataCache)
865    */

866   public ControllerResultSet callableStatementExecuteQuery(
867       StoredProcedure proc, MetadataCache metadataCache) throws SQLException JavaDoc
868   {
869     throw new SQLException JavaDoc(
870         "Stored procedure calls are not supported with RAIDb-0 load balancers.");
871   }
872
873   /**
874    * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#callableStatementExecuteUpdate(org.continuent.sequoia.controller.requests.StoredProcedure)
875    */

876   public ExecuteUpdateResult callableStatementExecuteUpdate(StoredProcedure proc)
877       throws SQLException JavaDoc
878   {
879     throw new SQLException JavaDoc(
880         "Stored procedure calls are not supported with RAIDb-0 load balancers.");
881   }
882
883   /**
884    * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#callableStatementExecute(org.continuent.sequoia.controller.requests.StoredProcedure,
885    * org.continuent.sequoia.controller.cache.metadata.MetadataCache)
886    */

887   public ExecuteResult callableStatementExecute(StoredProcedure proc,
888       MetadataCache metadataCache) throws AllBackendsFailedException,
889       SQLException JavaDoc
890   {
891     throw new SQLException JavaDoc(
892         "Stored procedure calls are not supported with RAIDb-0 load balancers.");
893   }
894
895   /**
896    * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#getPreparedStatementGetMetaData(org.continuent.sequoia.controller.requests.AbstractRequest)
897    */

898   public ControllerResultSet getPreparedStatementGetMetaData(
899       AbstractRequest request) throws SQLException JavaDoc
900   {
901     // Choose a backend
902
try
903     {
904       vdb.acquireReadLockBackendLists();
905     }
906     catch (InterruptedException JavaDoc e)
907     {
908       String JavaDoc msg = Translate.get(
909           "loadbalancer.backendlist.acquire.readlock.failed", e);
910       logger.error(msg);
911       throw new SQLException JavaDoc(msg);
912     }
913
914     /*
915      * The backend that will execute the query
916      */

917     DatabaseBackend backend = null;
918
919     // Note that vdb lock is released in the finally clause of this try/catch
920
// block
921
try
922     {
923       ArrayList JavaDoc backends = vdb.getBackends();
924       int size = backends.size();
925
926       if (size == 0)
927         throw new NoMoreBackendException(Translate.get(
928             "loadbalancer.execute.no.backend.available", request.getId()));
929
930       // Choose the first available backend
931
for (int i = 0; i < size; i++)
932       {
933         DatabaseBackend b = (DatabaseBackend) backends.get(i);
934         if (b.isReadEnabled())
935         {
936           backend = b;
937           break;
938         }
939       }
940     }
941     catch (Throwable JavaDoc e)
942     {
943       String JavaDoc msg = Translate.get("loadbalancer.execute.find.backend.failed",
944           new String JavaDoc[]{request.getSqlShortForm(vdb.getSqlShortFormLength()),
945               e.getMessage()});
946       logger.error(msg, e);
947       throw new SQLException JavaDoc(msg);
948     }
949     finally
950     {
951       vdb.releaseReadLockBackendLists();
952     }
953
954     if (backend == null)
955       throw new NoMoreBackendException(Translate.get(
956           "loadbalancer.execute.no.backend.enabled", request.getId()));
957
958     // Ok, we have a backend, let's execute the request
959
AbstractConnectionManager cm = backend.getConnectionManager(request
960         .getLogin());
961
962     // Sanity check
963
if (cm == null)
964     {
965       String JavaDoc msg = Translate.get("loadbalancer.connectionmanager.not.found",
966           new String JavaDoc[]{request.getLogin(), backend.getName()});
967       logger.error(msg);
968       throw new SQLException JavaDoc(msg);
969     }
970
971     // Execute the query
972
if (request.isAutoCommit())
973     {
974       ControllerResultSet rs = null;
975       boolean badConnection;
976       do
977       {
978         badConnection = false;
979         // Use a connection just for this request
980
PooledConnection c = null;
981         try
982         {
983           c = cm.retrieveConnectionInAutoCommit(request);
984         }
985         catch (UnreachableBackendException e1)
986         {
987           logger.error(Translate.get(
988               "loadbalancer.backend.disabling.unreachable", backend.getName()));
989           disableBackend(backend, true);
990           // Retry on a different backend
991
return getPreparedStatementGetMetaData(request);
992         }
993
994         // Sanity check
995
if (c == null)
996           throw new SQLException JavaDoc(Translate.get(
997               "loadbalancer.backend.no.connection", backend.getName()));
998
999         // Execute Query
1000
try
1001        {
1002          rs = preparedStatementGetMetaDataOnBackend(
1003              request.getSqlOrTemplate(), backend, c.getConnection());
1004          cm.releaseConnectionInAutoCommit(request, c);
1005        }
1006        catch (SQLException JavaDoc e)
1007        {
1008          cm.releaseConnectionInAutoCommit(request, c);
1009          throw SQLExceptionFactory.getSQLException(e, Translate.get(
1010              "loadbalancer.request.failed.on.backend", new String JavaDoc[]{
1011                  request.getSqlShortForm(vdb.getSqlShortFormLength()),
1012                  backend.getName(), e.getMessage()}));
1013        }
1014        catch (BadConnectionException e)
1015        { // Get rid of the bad connection
1016
cm.deleteConnection(c);
1017          badConnection = true;
1018        }
1019        catch (Throwable JavaDoc e)
1020        {
1021          cm.releaseConnectionInAutoCommit(request, c);
1022          throw new SQLException JavaDoc(Translate.get(
1023              "loadbalancer.request.failed.on.backend", new String JavaDoc[]{
1024                  request.getSqlShortForm(vdb.getSqlShortFormLength()),
1025                  backend.getName(), e.getMessage()}));
1026        }
1027      }
1028      while (badConnection);
1029      if (logger.isDebugEnabled())
1030        logger.debug(Translate.get("loadbalancer.execute.on", new String JavaDoc[]{
1031            String.valueOf(request.getId()), backend.getName()}));
1032      return rs;
1033    }
1034    else
1035    { // Inside a transaction
1036
Connection JavaDoc c;
1037      long tid = request.getTransactionId();
1038
1039      try
1040      {
1041        c = backend
1042            .getConnectionForTransactionAndLazyBeginIfNeeded(request, cm);
1043      }
1044      catch (UnreachableBackendException e1)
1045      {
1046        logger.error(Translate.get(
1047            "loadbalancer.backend.disabling.unreachable", backend.getName()));
1048        disableBackend(backend, true);
1049        throw new SQLException JavaDoc(Translate.get(
1050            "loadbalancer.backend.unreacheable", backend.getName()));
1051      }
1052      catch (NoTransactionStartWhenDisablingException e)
1053      {
1054        String JavaDoc msg = Translate.get("loadbalancer.backend.is.disabling",
1055            new String JavaDoc[]{request.getSqlShortForm(vdb.getSqlShortFormLength()),
1056                backend.getName()});
1057        logger.error(msg);
1058        throw new SQLException JavaDoc(msg);
1059      }
1060
1061      // Sanity check
1062
if (c == null)
1063        throw new SQLException JavaDoc(Translate.get(
1064            "loadbalancer.unable.retrieve.connection", new String JavaDoc[]{
1065                String.valueOf(tid), backend.getName()}));
1066
1067      // Execute Query
1068
ControllerResultSet rs = null;
1069      try
1070      {
1071        rs = preparedStatementGetMetaDataOnBackend(request.getSqlOrTemplate(),
1072            backend, c);
1073      }
1074      catch (SQLException JavaDoc e)
1075      {
1076        throw e;
1077      }
1078      catch (BadConnectionException e)
1079      { // Connection failed, so did the transaction
1080
// Disable the backend.
1081
cm.deleteConnection(tid);
1082        String JavaDoc msg = Translate.get(
1083            "loadbalancer.backend.disabling.connection.failure", backend
1084                .getName());
1085        logger.error(msg);
1086        disableBackend(backend, true);
1087        throw new SQLException JavaDoc(msg);
1088      }
1089      catch (Throwable JavaDoc e)
1090      {
1091        throw new SQLException JavaDoc(Translate.get(
1092            "loadbalancer.request.failed.on.backend", new String JavaDoc[]{
1093                request.getSqlShortForm(vdb.getSqlShortFormLength()),
1094                backend.getName(), e.getMessage()}));
1095      }
1096      if (logger.isDebugEnabled())
1097        logger.debug(Translate.get("loadbalancer.execute.transaction.on",
1098            new String JavaDoc[]{String.valueOf(tid), String.valueOf(request.getId()),
1099                backend.getName()}));
1100      return rs;
1101    }
1102  }
1103
1104  /*
1105   * Transaction management
1106   */

1107
1108  /**
1109   * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#abort(org.continuent.sequoia.controller.requestmanager.TransactionMetaData)
1110   */

1111  public void abort(TransactionMetaData tm) throws SQLException JavaDoc
1112  {
1113    rollback(tm);
1114  }
1115
1116  /**
1117   * Begins a new transaction.
1118   *
1119   * @param tm the transaction marker metadata
1120   * @throws SQLException if an error occurs
1121   */

1122  public final void begin(TransactionMetaData tm) throws SQLException JavaDoc
1123  {
1124  }
1125
1126  /**
1127   * Commits a transaction.
1128   *
1129   * @param tm the transaction marker metadata
1130   * @throws SQLException if an error occurs
1131   */

1132  public void commit(TransactionMetaData tm) throws SQLException JavaDoc
1133  {
1134    long tid = tm.getTransactionId();
1135    Long JavaDoc lTid = new Long JavaDoc(tid);
1136
1137    long logId = 0;
1138    // Log the request
1139
if (recoveryLog != null)
1140      logId = recoveryLog.logCommit(tm);
1141
1142    // Acquire the lock
1143
String JavaDoc requestDescription = "commit " + tid;
1144    int nbOfThreads = acquireLockAndCheckNbOfThreads(null, requestDescription);
1145
1146    // Build the list of backends that need to commit this transaction
1147
ArrayList JavaDoc commitList = new ArrayList JavaDoc(nbOfThreads);
1148    for (int i = 0; i < nbOfThreads; i++)
1149    {
1150      DatabaseBackend backend = (DatabaseBackend) enabledBackends.get(i);
1151      if (backend.isStartedTransaction(lTid))
1152        commitList.add(backend);
1153    }
1154
1155    int nbOfThreadsToCommit = commitList.size();
1156    CommitTask task = null;
1157    if (nbOfThreadsToCommit != 0)
1158      task = new CommitTask(getNbToWait(nbOfThreadsToCommit),
1159          nbOfThreadsToCommit, tm);
1160
1161    // Post the task in the non-conflicting queues.
1162
synchronized (enabledBackends)
1163    {
1164      for (int i = 0; i < nbOfThreadsToCommit; i++)
1165      {
1166        DatabaseBackend backend = (DatabaseBackend) enabledBackends.get(i);
1167        backend.getTaskQueues().addTaskToBackendTotalOrderQueue(task);
1168      }
1169    }
1170
1171    // Release the lock
1172
backendListLock.releaseRead();
1173
1174    // Check if someone had something to commit
1175
if (task == null)
1176      return;
1177
1178    synchronized (task)
1179    {
1180      if (!task.hasCompleted())
1181        waitForTaskCompletion(tm.getTimeout(), requestDescription, task);
1182
1183      if (task.getSuccess() == 0)
1184      { // All tasks failed
1185

1186        // Notify failure in recovery log
1187
if (recoveryLog != null)
1188          recoveryLog.logRequestCompletion(logId, false, 0);
1189
1190        List JavaDoc exceptions = task.getExceptions();
1191        if (exceptions == null)
1192          throw new SQLException JavaDoc(Translate.get(
1193              "loadbalancer.commit.all.failed", tid));
1194        else
1195        {
1196          String JavaDoc errorMsg = Translate.get("loadbalancer.commit.failed.stack",
1197              tid)
1198              + "\n";
1199          SQLException JavaDoc ex = SQLExceptionFactory.getSQLException(exceptions,
1200              errorMsg);
1201          logger.error(ex.getMessage());
1202          throw ex;
1203        }
1204      }
1205    }
1206  }
1207
1208  /**
1209   * Rollbacks a transaction.
1210   *
1211   * @param tm the transaction marker metadata
1212   * @throws SQLException if an error occurs
1213   */

1214  public void rollback(TransactionMetaData tm) throws SQLException JavaDoc
1215  {
1216    long tid = tm.getTransactionId();
1217    Long JavaDoc lTid = new Long JavaDoc(tid);
1218
1219    long logId = 0;
1220    // Log the request
1221
if (recoveryLog != null)
1222      logId = recoveryLog.logRollback(tm);
1223
1224    // Acquire the lock
1225
String JavaDoc requestDescription = "rollback " + tid;
1226    int nbOfThreads = acquireLockAndCheckNbOfThreads(null, requestDescription);
1227
1228    // Build the list of backends that need to rollback this transaction
1229
ArrayList JavaDoc rollbackList = new ArrayList JavaDoc();
1230    for (int i = 0; i < nbOfThreads; i++)
1231    {
1232      DatabaseBackend backend = (DatabaseBackend) enabledBackends.get(i);
1233      if (backend.isStartedTransaction(lTid))
1234        rollbackList.add(backend);
1235    }
1236
1237    int nbOfThreadsToRollback = rollbackList.size();
1238    RollbackTask task = null;
1239    if (nbOfThreadsToRollback != 0)
1240      task = new RollbackTask(getNbToWait(nbOfThreadsToRollback),
1241          nbOfThreadsToRollback, tm);
1242
1243    // Post the task in the non-conflicting queues.
1244
synchronized (enabledBackends)
1245    {
1246      for (int i = 0; i < nbOfThreadsToRollback; i++)
1247      {
1248        DatabaseBackend backend = (DatabaseBackend) enabledBackends.get(i);
1249        backend.getTaskQueues().addTaskToBackendTotalOrderQueue(task);
1250      }
1251    }
1252
1253    // Release the lock
1254
backendListLock.releaseRead();
1255
1256    // Check if someone had something to rollback
1257
if (task == null)
1258      return;
1259
1260    synchronized (task)
1261    {
1262      if (!task.hasCompleted())
1263        waitForTaskCompletion(tm.getTimeout(), requestDescription, task);
1264
1265      if (task.getSuccess() == 0)
1266      { // All tasks failed
1267

1268        // Notify failure in recovery log
1269
if (recoveryLog != null)
1270          recoveryLog.logRequestCompletion(logId, false, 0);
1271
1272        List JavaDoc exceptions = task.getExceptions();
1273        if (exceptions == null)
1274          throw new SQLException JavaDoc(Translate.get(
1275              "loadbalancer.rollback.all.failed", tid));
1276        else
1277        {
1278          String JavaDoc errorMsg = Translate.get("loadbalancer.rollback.failed.stack",
1279              tid)
1280              + "\n";
1281          SQLException JavaDoc ex = SQLExceptionFactory.getSQLException(exceptions,
1282              errorMsg);
1283          logger.error(ex.getMessage());
1284          throw ex;
1285        }
1286      }
1287    }
1288  }
1289
1290  /**
1291   * Rollback a transaction to a savepoint
1292   *
1293   * @param tm The transaction marker metadata
1294   * @param savepointName The name of the savepoint
1295   * @throws SQLException if an error occurs
1296   */

1297  public void rollbackToSavepoint(TransactionMetaData tm, String JavaDoc savepointName)
1298      throws SQLException JavaDoc
1299  {
1300    long tid = tm.getTransactionId();
1301    Long JavaDoc lTid = new Long JavaDoc(tid);
1302
1303    long logId = 0;
1304    // Log the request
1305
if (recoveryLog != null)
1306      logId = recoveryLog.logRollbackToSavepoint(tm, savepointName);
1307
1308    // Acquire the lock
1309
String JavaDoc requestDescription = "rollback " + savepointName + " " + tid;
1310    int nbOfThreads = acquireLockAndCheckNbOfThreads(null, requestDescription);
1311
1312    // Build the list of backends that need to rollback this transaction
1313
ArrayList JavaDoc rollbackList = new ArrayList JavaDoc();
1314    for (int i = 0; i < nbOfThreads; i++)
1315    {
1316      DatabaseBackend backend = (DatabaseBackend) enabledBackends.get(i);
1317      if (backend.isStartedTransaction(lTid))
1318        rollbackList.add(backend);
1319    }
1320
1321    int nbOfThreadsToRollback = rollbackList.size();
1322    RollbackToSavepointTask task = null;
1323    if (nbOfThreadsToRollback != 0)
1324      task = new RollbackToSavepointTask(getNbToWait(nbOfThreadsToRollback),
1325          nbOfThreadsToRollback, tm, savepointName);
1326
1327    // Post the task in the non-conflicting queues.
1328
synchronized (enabledBackends)
1329    {
1330      for (int i = 0; i < nbOfThreadsToRollback; i++)
1331      {
1332        DatabaseBackend backend = (DatabaseBackend) enabledBackends.get(i);
1333        backend.getTaskQueues().addTaskToBackendTotalOrderQueue(task);
1334      }
1335    }
1336
1337    // Release the lock
1338
backendListLock.releaseRead();
1339
1340    // Check if someone had something to rollback
1341
if (task == null)
1342      return;
1343
1344    synchronized (task)
1345    {
1346      if (!task.hasCompleted())
1347        waitForTaskCompletion(tm.getTimeout(), requestDescription, task);
1348
1349      if (task.getSuccess() == 0)
1350      { // All tasks failed
1351

1352        // Notify failure in recovery log
1353
if (recoveryLog != null)
1354          recoveryLog.logRequestCompletion(logId, false, 0);
1355
1356        List JavaDoc exceptions = task.getExceptions();
1357        if (exceptions == null)
1358          throw new SQLException JavaDoc(Translate.get(
1359              "loadbalancer.rollbacksavepoint.all.failed", new String JavaDoc[]{
1360                  savepointName, String.valueOf(tid)}));
1361        else
1362        {
1363          String JavaDoc errorMsg = Translate.get(
1364              "loadbalancer.rollbacksavepoint.failed.stack", new String JavaDoc[]{
1365                  savepointName, String.valueOf(tid)})
1366              + "\n";
1367          SQLException JavaDoc ex = SQLExceptionFactory.getSQLException(exceptions,
1368              errorMsg);
1369          logger.error(ex.getMessage());
1370          throw ex;
1371        }
1372      }
1373    }
1374  }
1375
1376  /**
1377   * Release a savepoint from a transaction
1378   *
1379   * @param tm The transaction marker metadata
1380   * @param savepointName The name of the savepoint ro release
1381   * @throws SQLException if an error occurs
1382   */

1383  public void releaseSavepoint(TransactionMetaData tm, String JavaDoc savepointName)
1384      throws SQLException JavaDoc
1385  {
1386    long tid = tm.getTransactionId();
1387    Long JavaDoc lTid = new Long JavaDoc(tid);
1388
1389    long logId = 0;
1390    // Log the request
1391
if (recoveryLog != null)
1392      logId = recoveryLog.logReleaseSavepoint(tm, savepointName);
1393
1394    // Acquire the lock
1395
String JavaDoc requestDescription = "release savepoint " + savepointName + " "
1396        + tid;
1397    int nbOfThreads = acquireLockAndCheckNbOfThreads(null, requestDescription);
1398
1399    // Build the list of backends that need to rollback this transaction
1400
ArrayList JavaDoc savepointList = new ArrayList JavaDoc();
1401    for (int i = 0; i < nbOfThreads; i++)
1402    {
1403      DatabaseBackend backend = (DatabaseBackend) enabledBackends.get(i);
1404      if (backend.isStartedTransaction(lTid))
1405        savepointList.add(backend);
1406    }
1407
1408    int nbOfSavepoints = savepointList.size();
1409    ReleaseSavepointTask task = null;
1410    if (nbOfSavepoints != 0)
1411      task = new ReleaseSavepointTask(getNbToWait(nbOfThreads), nbOfThreads,
1412          tm, savepointName);
1413
1414    // Post the task in the non-conflicting queues.
1415
synchronized (enabledBackends)
1416    {
1417      for (int i = 0; i < nbOfSavepoints; i++)
1418      {
1419        DatabaseBackend backend = (DatabaseBackend) enabledBackends.get(i);
1420        backend.getTaskQueues().addTaskToBackendTotalOrderQueue(task);
1421      }
1422    }
1423
1424    // Release the lock
1425
backendListLock.releaseRead();
1426
1427    // Check if someone had something to release
1428
if (task == null)
1429      return;
1430
1431    synchronized (task)
1432    {
1433      if (!task.hasCompleted())
1434        waitForTaskCompletion(tm.getTimeout(), requestDescription, task);
1435
1436      if (task.getSuccess() == 0)
1437      { // All tasks failed
1438

1439        // Notify failure in recovery log
1440
if (recoveryLog != null)
1441          recoveryLog.logRequestCompletion(logId, false, 0);
1442
1443        List JavaDoc exceptions = task.getExceptions();
1444        if (exceptions == null)
1445          throw new SQLException JavaDoc(Translate.get(
1446              "loadbalancer.releasesavepoint.all.failed", new String JavaDoc[]{
1447                  savepointName, String.valueOf(tid)}));
1448        else
1449        {
1450          String JavaDoc errorMsg = Translate.get(
1451              "loadbalancer.releasesavepoint.failed.stack", new String JavaDoc[]{
1452                  savepointName, String.valueOf(tid)})
1453              + "\n";
1454          SQLException JavaDoc ex = SQLExceptionFactory.getSQLException(exceptions,
1455              errorMsg);
1456          logger.error(ex.getMessage());
1457          throw ex;
1458        }
1459      }
1460    }
1461  }
1462
1463  /**
1464   * Set a savepoint to a transaction.
1465   *
1466   * @param tm The transaction marker metadata
1467   * @param savepointName The name of the new savepoint
1468   * @throws SQLException if an error occurs
1469   */

1470  public void setSavepoint(TransactionMetaData tm, String JavaDoc savepointName)
1471      throws SQLException JavaDoc
1472  {
1473    long tid = tm.getTransactionId();
1474    Long JavaDoc lTid = new Long JavaDoc(tid);
1475
1476    long logId = 0;
1477    // Log the request
1478
if (recoveryLog != null)
1479      logId = recoveryLog.logSetSavepoint(tm, savepointName);
1480
1481    // Acquire the lock
1482
String JavaDoc requestDescription = "set savepoint " + savepointName + " " + tid;
1483    int nbOfThreads = acquireLockAndCheckNbOfThreads(null, requestDescription);
1484
1485    // Build the list of backends that need to rollback this transaction
1486
ArrayList JavaDoc savepointList = new ArrayList JavaDoc();
1487    for (int i = 0; i < nbOfThreads; i++)
1488    {
1489      DatabaseBackend backend = (DatabaseBackend) enabledBackends.get(i);
1490      if (backend.isStartedTransaction(lTid))
1491        savepointList.add(backend);
1492    }
1493
1494    int nbOfSavepoints = savepointList.size();
1495    SavepointTask task = null;
1496    if (nbOfSavepoints != 0)
1497      task = new SavepointTask(getNbToWait(nbOfThreads), nbOfThreads, tm,
1498          savepointName);
1499
1500    // Post the task in the non-conflicting queues.
1501
synchronized (enabledBackends)
1502    {
1503      for (int i = 0; i < nbOfSavepoints; i++)
1504      {
1505        DatabaseBackend backend = (DatabaseBackend) enabledBackends.get(i);
1506        backend.getTaskQueues().addTaskToBackendTotalOrderQueue(task);
1507      }
1508    }
1509
1510    // Release the lock
1511
backendListLock.releaseRead();
1512
1513    // Check if someone had something to release
1514
if (task == null)
1515      return;
1516
1517    synchronized (task)
1518    {
1519      if (!task.hasCompleted())
1520        waitForTaskCompletion(tm.getTimeout(), requestDescription, task);
1521
1522      if (task.getSuccess() == 0)
1523      { // All tasks failed
1524

1525        // Notify failure in recovery log
1526
if (recoveryLog != null)
1527          recoveryLog.logRequestCompletion(logId, false, 0);
1528
1529        List JavaDoc exceptions = task.getExceptions();
1530        if (exceptions == null)
1531          throw new SQLException JavaDoc(Translate.get(
1532              "loadbalancer.setsavepoint.all.failed", new String JavaDoc[]{
1533                  savepointName, String.valueOf(tid)}));
1534        else
1535        {
1536          String JavaDoc errorMsg = Translate.get(
1537              "loadbalancer.setsavepoint.failed.stack", new String JavaDoc[]{
1538                  savepointName, String.valueOf(tid)})
1539              + "\n";
1540          SQLException JavaDoc ex = SQLExceptionFactory.getSQLException(exceptions,
1541              errorMsg);
1542          logger.error(ex.getMessage());
1543          throw ex;
1544        }
1545      }
1546    }
1547  }
1548
1549  /**
1550   * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#closePersistentConnection(java.lang.String,
1551   * long)
1552   */

1553  public void closePersistentConnection(String JavaDoc login,
1554      long persistentConnectionId) throws SQLException JavaDoc
1555  {
1556    /*
1557     * We assume a synchronous execution and connection closing can only come
1558     * after all requests have been executed in that connection. We post to all
1559     * backends and let the task deal with whether that backend had a persistent
1560     * connection or not.
1561     */

1562
1563    String JavaDoc requestDescription = "closing persistent connection "
1564        + persistentConnectionId;
1565    ClosePersistentConnectionTask task = null;
1566    try
1567    {
1568      int nbOfThreads = acquireLockAndCheckNbOfThreads(null, requestDescription);
1569
1570      task = new ClosePersistentConnectionTask(getNbToWait(nbOfThreads),
1571          nbOfThreads, login, persistentConnectionId);
1572
1573      // Post the task in the non-conflicting queues.
1574
synchronized (enabledBackends)
1575      {
1576        for (int i = 0; i < nbOfThreads; i++)
1577        {
1578          DatabaseBackend backend = (DatabaseBackend) enabledBackends.get(i);
1579          backend.getTaskQueues().addTaskToBackendTotalOrderQueue(task);
1580        }
1581      }
1582
1583      // Release the lock
1584
backendListLock.releaseRead();
1585    }
1586    finally
1587    {
1588      if (task == null)
1589      { // Happens if we had a NoMoreBackendException
1590
task = new ClosePersistentConnectionTask(0, 0, login,
1591            persistentConnectionId);
1592      }
1593    }
1594  }
1595
1596  /**
1597   * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#openPersistentConnection(String,
1598   * long)
1599   */

1600  public void openPersistentConnection(String JavaDoc login, long persistentConnectionId)
1601      throws SQLException JavaDoc
1602  {
1603    String JavaDoc requestDescription = "opening persistent connection "
1604        + persistentConnectionId;
1605    int nbOfThreads = 0;
1606    OpenPersistentConnectionTask task = null;
1607    try
1608    {
1609      nbOfThreads = acquireLockAndCheckNbOfThreads(null, requestDescription);
1610
1611      task = new OpenPersistentConnectionTask(getNbToWait(nbOfThreads),
1612          nbOfThreads, login, persistentConnectionId);
1613
1614      // Post the task in the non-conflicting queues.
1615
synchronized (enabledBackends)
1616      {
1617        for (int i = 0; i < nbOfThreads; i++)
1618        {
1619          DatabaseBackend backend = (DatabaseBackend) enabledBackends.get(i);
1620          backend.getTaskQueues().addTaskToBackendTotalOrderQueue(task);
1621        }
1622      }
1623
1624      // Release the lock
1625
backendListLock.releaseRead();
1626
1627      synchronized (task)
1628      {
1629        if (!task.hasCompleted())
1630          try
1631          {
1632            waitForTaskCompletion(0, requestDescription, task);
1633          }
1634          catch (SQLException JavaDoc ignore)
1635          {
1636          }
1637      }
1638    }
1639    finally
1640    {
1641      if (logger.isDebugEnabled())
1642        logger.debug(requestDescription + " completed on " + nbOfThreads
1643            + " backends.");
1644    }
1645  }
1646
1647  /**
1648   * Enables a Backend that was previously disabled.
1649   * <p>
1650   * Ask the corresponding connection manager to initialize the connections if
1651   * needed.
1652   * <p>
1653   * No sanity checks are performed by this function.
1654   *
1655   * @param db the database backend to enable
1656   * @param writeEnabled True if the backend must be enabled for writes
1657   * @throws SQLException if an error occurs
1658   */

1659  public void enableBackend(DatabaseBackend db, boolean writeEnabled)
1660      throws SQLException JavaDoc
1661  {
1662    if (!db.isInitialized())
1663      db.initializeConnections();
1664
1665    if (writeEnabled && db.isWriteCanBeEnabled())
1666    {
1667      // Create the new backend task queues
1668
db.setTaskQueues(new BackendTaskQueues(db, waitForCompletionPolicy,
1669          this.vdb.getRequestManager()));
1670      db.startWorkerThreads(this);
1671      db.enableWrite();
1672    }
1673
1674    db.enableRead();
1675    try
1676    {
1677      backendListLock.acquireWrite();
1678    }
1679    catch (InterruptedException JavaDoc e)
1680    {
1681      logger.error("Error while acquiring write lock in enableBackend", e);
1682    }
1683    synchronized (enabledBackends)
1684    {
1685      enabledBackends.add(db);
1686    }
1687    backendListLock.releaseWrite();
1688  }
1689
1690  /**
1691   * Disables a backend that was previously enabled.
1692   * <p>
1693   * Ask the corresponding connection manager to finalize the connections if
1694   * needed.
1695   * <p>
1696   * No sanity checks are performed by this function.
1697   *
1698   * @param db the database backend to disable
1699   * @param forceDisable true if disabling must be forced on the backend
1700   * @throws SQLException if an error occurs
1701   */

1702  public void disableBackend(DatabaseBackend db, boolean forceDisable)
1703      throws SQLException JavaDoc
1704  {
1705    if (!db.disable())
1706    {
1707      // Another thread has already started the disable process
1708
return;
1709    }
1710    synchronized (this)
1711    {
1712      if (forceDisable)
1713        db.shutdownConnectionManagers();
1714      db.terminateWorkerThreads();
1715
1716      if (db.isInitialized())
1717        db.finalizeConnections();
1718
1719      try
1720      {
1721        backendListLock.acquireWrite();
1722      }
1723      catch (InterruptedException JavaDoc e)
1724      {
1725        logger.error("Error while acquiring write lock in enableBackend", e);
1726      }
1727      synchronized (enabledBackends)
1728      {
1729        enabledBackends.remove(db);
1730        if (enabledBackends.isEmpty())
1731        {
1732          // Cleanup schema for any remaining locks
1733
this.vdb.getRequestManager().setDatabaseSchema(null);
1734        }
1735      }
1736      backendListLock.releaseWrite();
1737    }
1738  }
1739
1740  /**
1741   * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#setWeight(String,
1742   * int)
1743   */

1744  public void setWeight(String JavaDoc name, int w) throws SQLException JavaDoc
1745  {
1746    throw new SQLException JavaDoc("Weight is not supported with this load balancer");
1747  }
1748
1749  /*
1750   * Debug/Monitoring
1751   */

1752
1753  /**
1754   * Get information about the Request load balancer
1755   *
1756   * @return <code>String</code> containing information
1757   */

1758  public String JavaDoc getInformation()
1759  {
1760    return "RAIDb-0 Request load balancer\n";
1761  }
1762
1763  /**
1764   * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#getXmlImpl
1765   */

1766  public String JavaDoc getXmlImpl()
1767  {
1768    StringBuffer JavaDoc info = new StringBuffer JavaDoc();
1769    info.append("<" + DatabasesXmlTags.ELT_RAIDb_0 + ">");
1770    createTablePolicy.getXml();
1771    info.append("</" + DatabasesXmlTags.ELT_RAIDb_0 + ">");
1772    return info.toString();
1773  }
1774
1775}
Popular Tags