KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > objectweb > cjdbc > controller > loadbalancer > paralleldb > ParallelDB


1 /**
2  * C-JDBC: Clustered JDBC.
3  * Copyright (C) 2002-2005 French National Institute For Research In Computer
4  * Science And Control (INRIA).
5  * Contact: c-jdbc@objectweb.org
6  *
7  * This library is free software; you can redistribute it and/or modify it
8  * under the terms of the GNU Lesser General Public License as published by the
9  * Free Software Foundation; either version 2.1 of the License, or any later
10  * version.
11  *
12  * This library is distributed in the hope that it will be useful, but WITHOUT
13  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
14  * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
15  * for more details.
16  *
17  * You should have received a copy of the GNU Lesser General Public License
18  * along with this library; if not, write to the Free Software Foundation,
19  * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
20  *
21  * Initial developer(s): Emmanuel Cecchet.
22  * Contributor(s): Jaco Swart, Jean-Bernard van Zuylen.
23  */

24
25 package org.objectweb.cjdbc.controller.loadbalancer.paralleldb;
26
27 import java.sql.Connection JavaDoc;
28 import java.sql.SQLException JavaDoc;
29 import java.sql.Savepoint JavaDoc;
30 import java.util.Hashtable JavaDoc;
31
32 import javax.management.NotCompliantMBeanException JavaDoc;
33
34 import org.objectweb.cjdbc.common.exceptions.BadConnectionException;
35 import org.objectweb.cjdbc.common.exceptions.NoTransactionStartWhenDisablingException;
36 import org.objectweb.cjdbc.common.exceptions.UnreachableBackendException;
37 import org.objectweb.cjdbc.common.i18n.Translate;
38 import org.objectweb.cjdbc.common.sql.AbstractRequest;
39 import org.objectweb.cjdbc.common.sql.AbstractWriteRequest;
40 import org.objectweb.cjdbc.common.sql.ParsingGranularities;
41 import org.objectweb.cjdbc.common.sql.SelectRequest;
42 import org.objectweb.cjdbc.common.sql.StoredProcedure;
43 import org.objectweb.cjdbc.common.sql.UnknownRequest;
44 import org.objectweb.cjdbc.common.xml.DatabasesXmlTags;
45 import org.objectweb.cjdbc.controller.backend.DatabaseBackend;
46 import org.objectweb.cjdbc.controller.cache.metadata.MetadataCache;
47 import org.objectweb.cjdbc.controller.connection.AbstractConnectionManager;
48 import org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer;
49 import org.objectweb.cjdbc.controller.loadbalancer.AllBackendsFailedException;
50 import org.objectweb.cjdbc.controller.requestmanager.RAIDbLevels;
51 import org.objectweb.cjdbc.controller.requestmanager.TransactionMarkerMetaData;
52 import org.objectweb.cjdbc.controller.virtualdatabase.ControllerResultSet;
53 import org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase;
54
55 /**
56  * This class defines a ParallelDB
57  *
58  * @author <a HREF="mailto:Emmanuel.Cecchet@inria.fr">Emmanuel Cecchet </a>
59  * @author <a HREF="mailto:jbvanzuylen@transwide.com">Jean-Bernard van Zuylen
60  * </a>
61  * @version 1.0
62  */

63 /**
64  * These are generic functions for all ParallelDB load balancers.
65  * <p>
66  * Read and write queries are load balanced on the backends without any
67  * replication (assuming that the underlying parallel database takes care of
68  * data replication). The load balancers provide failover for reads and writes.
69  *
70  * @author <a HREF="mailto:Emmanuel.Cecchet@inria.fr">Emmanuel Cecchet </a>
71  * @author <a HREF="mailto:jaco.swart@iblocks.co.uk">Jaco Swart </a>
72  * @version 1.0
73  */

74 public abstract class ParallelDB extends AbstractLoadBalancer
75 {
76   //transaction id -> DatabaseBackend
77
private Hashtable JavaDoc backendPerTransactionId;
78   private int numberOfEnabledBackends = 0;
79
/**
 * Builds a <code>ParallelDB</code> load balancer. The parent constructor is
 * invoked with the <code>SingleDB</code> RAIDb level and the
 * <code>NO_PARSING</code> parsing granularity.
 *
 * @param vdb the virtual database this load balancer belongs to
 * @throws SQLException if an error occurs
 * @throws NotCompliantMBeanException if the MBean is not JMX compliant
 */
public ParallelDB(VirtualDatabase vdb)
    throws SQLException, NotCompliantMBeanException
{
  super(vdb, RAIDbLevels.SingleDB, ParsingGranularities.NO_PARSING);
}
93
//
// Request handling
//

98   /**
99    * Choose a backend using the implementation specific load balancing algorithm
100    * for read request execution.
101    *
102    * @param request request to execute
103    * @return the chosen backend
104    * @throws SQLException if an error occurs
105    */

106   public abstract DatabaseBackend chooseBackendForReadRequest(
107       AbstractRequest request) throws SQLException JavaDoc;
108
109   /**
110    * Choose a backend using the implementation specific load balancing algorithm
111    * for write request execution.
112    *
113    * @param request request to execute
114    * @return the chosen backend
115    * @throws SQLException if an error occurs
116    */

117   public abstract DatabaseBackend chooseBackendForWriteRequest(
118       AbstractWriteRequest request) throws SQLException JavaDoc;
119
120   /**
121    * @see org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer#execReadRequest(SelectRequest,
122    * MetadataCache)
123    */

124   public ControllerResultSet execReadRequest(SelectRequest request,
125       MetadataCache metadataCache) throws SQLException JavaDoc
126   {
127     DatabaseBackend backend;
128     if (request.isAutoCommit())
129       backend = chooseBackendForReadRequest(request);
130     else
131       backend = (DatabaseBackend) backendPerTransactionId.get(new Long JavaDoc(request
132           .getTransactionId()));
133
134     if (backend == null)
135       throw new SQLException JavaDoc(Translate.get(
136           "loadbalancer.execute.no.backend.found", request.getSQLShortForm(vdb
137               .getSQLShortFormLength())));
138
139     ControllerResultSet rs = null;
140     // Execute the request on the chosen backend
141
try
142     {
143       rs = executeReadRequestOnBackend(request, backend, metadataCache);
144     }
145     catch (UnreachableBackendException urbe)
146     {
147       // Try to execute query on different backend
148
return execReadRequest(request, metadataCache);
149     }
150     catch (SQLException JavaDoc se)
151     {
152       String JavaDoc msg = Translate.get("loadbalancer.request.failed", new String JavaDoc[]{
153           String.valueOf(request.getId()), se.getMessage()});
154       if (logger.isInfoEnabled())
155         logger.info(msg);
156       throw new SQLException JavaDoc(msg);
157     }
158     catch (RuntimeException JavaDoc e)
159     {
160       String JavaDoc msg = Translate.get("loadbalancer.request.failed.on.backend",
161           new String JavaDoc[]{request.getSQLShortForm(vdb.getSQLShortFormLength()),
162               backend.getName(), e.getMessage()});
163       logger.error(msg, e);
164       throw new SQLException JavaDoc(msg);
165     }
166
167     return rs;
168   }
169
170   /**
171    * @see org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer#execReadOnlyReadStoredProcedure(StoredProcedure,
172    * MetadataCache)
173    */

174   public ControllerResultSet execReadOnlyReadStoredProcedure(
175       StoredProcedure proc, MetadataCache metadataCache) throws SQLException JavaDoc
176   {
177     return execReadStoredProcedure(proc, metadataCache);
178   }
179
180   /**
181    * @see org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer#execReadStoredProcedure(StoredProcedure,
182    * MetadataCache)
183    */

184   public ControllerResultSet execReadStoredProcedure(StoredProcedure proc,
185       MetadataCache metadataCache) throws SQLException JavaDoc
186   {
187     DatabaseBackend backend;
188     if (proc.isAutoCommit())
189       backend = chooseBackendForReadRequest(proc);
190     else
191       backend = (DatabaseBackend) backendPerTransactionId.get(new Long JavaDoc(proc
192           .getTransactionId()));
193
194     if (backend == null)
195       throw new SQLException JavaDoc(Translate.get(
196           "loadbalancer.storedprocedure.no.backend.found", proc
197               .getSQLShortForm(vdb.getSQLShortFormLength())));
198
199     ControllerResultSet rs = null;
200     // Execute the request on the chosen backend
201
try
202     {
203       rs = executeReadStoredProcedureOnBackend(proc, backend, metadataCache);
204     }
205     catch (UnreachableBackendException urbe)
206     {
207       // Try to execute query on different backend
208
return execReadStoredProcedure(proc, metadataCache);
209     }
210     catch (SQLException JavaDoc se)
211     {
212       String JavaDoc msg = Translate.get("loadbalancer.storedprocedure.failed",
213           new String JavaDoc[]{String.valueOf(proc.getId()), se.getMessage()});
214       if (logger.isInfoEnabled())
215         logger.info(msg);
216       throw new SQLException JavaDoc(msg);
217     }
218     catch (RuntimeException JavaDoc e)
219     {
220       String JavaDoc msg = Translate.get(
221           "loadbalancer.storedprocedure.failed.on.backend", new String JavaDoc[]{
222               proc.getSQLShortForm(vdb.getSQLShortFormLength()),
223               backend.getName(), e.getMessage()});
224       logger.error(msg, e);
225       throw new SQLException JavaDoc(msg);
226     }
227
228     return rs;
229   }
230
231   /**
232    * @see org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer#execWriteRequest(org.objectweb.cjdbc.common.sql.AbstractWriteRequest)
233    */

234   public int execWriteRequest(AbstractWriteRequest request)
235       throws AllBackendsFailedException, SQLException JavaDoc
236   {
237     DatabaseBackend backend;
238     if (request.isAutoCommit())
239       backend = chooseBackendForWriteRequest(request);
240     else
241       backend = (DatabaseBackend) backendPerTransactionId.get(new Long JavaDoc(request
242           .getTransactionId()));
243
244     if (backend == null)
245       throw new SQLException JavaDoc(Translate.get(
246           "loadbalancer.execute.no.backend.found", request.getSQLShortForm(vdb
247               .getSQLShortFormLength())));
248
249     int result;
250     // Execute the request on the chosen backend
251
try
252     {
253       result = executeWriteRequestOnBackend(request, backend);
254     }
255     catch (UnreachableBackendException urbe)
256     {
257       // Try to execute query on different backend
258
return execWriteRequest(request);
259     }
260     catch (SQLException JavaDoc se)
261     {
262       String JavaDoc msg = Translate.get("loadbalancer.request.failed", new String JavaDoc[]{
263           String.valueOf(request.getId()), se.getMessage()});
264       if (logger.isInfoEnabled())
265         logger.info(msg);
266       throw new SQLException JavaDoc(msg);
267     }
268     catch (RuntimeException JavaDoc e)
269     {
270       String JavaDoc msg = Translate.get("loadbalancer.request.failed.on.backend",
271           new String JavaDoc[]{request.getSQLShortForm(vdb.getSQLShortFormLength()),
272               backend.getName(), e.getMessage()});
273       logger.error(msg, e);
274       throw new SQLException JavaDoc(msg);
275     }
276
277     return result;
278   }
279
280   /**
281    * @see org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer#execWriteRequestWithKeys(AbstractWriteRequest,
282    * MetadataCache)
283    */

284   public ControllerResultSet execWriteRequestWithKeys(
285       AbstractWriteRequest request, MetadataCache metadataCache)
286       throws AllBackendsFailedException, SQLException JavaDoc
287   {
288     DatabaseBackend backend;
289     if (request.isAutoCommit())
290       backend = chooseBackendForWriteRequest(request);
291     else
292       backend = (DatabaseBackend) backendPerTransactionId.get(new Long JavaDoc(request
293           .getTransactionId()));
294
295     if (backend == null)
296       throw new SQLException JavaDoc(Translate.get(
297           "loadbalancer.execute.no.backend.found", request.getSQLShortForm(vdb
298               .getSQLShortFormLength())));
299
300     ControllerResultSet rs;
301     // Execute the request on the chosen backend
302
try
303     {
304       rs = executeWriteRequestWithKeysOnBackend(request, backend, metadataCache);
305     }
306     catch (UnreachableBackendException urbe)
307     {
308       // Try to execute query on different backend
309
return execWriteRequestWithKeys(request, metadataCache);
310     }
311     catch (SQLException JavaDoc se)
312     {
313       String JavaDoc msg = Translate.get("loadbalancer.request.failed", new String JavaDoc[]{
314           String.valueOf(request.getId()), se.getMessage()});
315       if (logger.isInfoEnabled())
316         logger.info(msg);
317       throw new SQLException JavaDoc(msg);
318     }
319     catch (RuntimeException JavaDoc e)
320     {
321       String JavaDoc msg = Translate.get("loadbalancer.request.failed.on.backend",
322           new String JavaDoc[]{request.getSQLShortForm(vdb.getSQLShortFormLength()),
323               backend.getName(), e.getMessage()});
324       logger.error(msg, e);
325       throw new SQLException JavaDoc(msg);
326     }
327
328     return rs;
329   }
330
331   /**
332    * @see org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer#execWriteStoredProcedure(org.objectweb.cjdbc.common.sql.StoredProcedure)
333    */

334   public int execWriteStoredProcedure(StoredProcedure proc) throws SQLException JavaDoc
335   {
336     DatabaseBackend backend;
337     if (proc.isAutoCommit())
338       backend = chooseBackendForReadRequest(proc);
339     else
340       backend = (DatabaseBackend) backendPerTransactionId.get(new Long JavaDoc(proc
341           .getTransactionId()));
342
343     if (backend == null)
344       throw new SQLException JavaDoc(Translate.get(
345           "loadbalancer.storedprocedure.no.backend.found", proc
346               .getSQLShortForm(vdb.getSQLShortFormLength())));
347
348     int result;
349     // Execute the request on the chosen backend
350
try
351     {
352       result = executeWriteStoredProcedureOnBackend(proc, backend);
353     }
354     catch (UnreachableBackendException urbe)
355     {
356       // Try to execute query on different backend
357
return execWriteStoredProcedure(proc);
358     }
359     catch (SQLException JavaDoc se)
360     {
361       String JavaDoc msg = Translate.get("loadbalancer.storedprocedure.failed",
362           new String JavaDoc[]{String.valueOf(proc.getId()), se.getMessage()});
363       if (logger.isInfoEnabled())
364         logger.info(msg);
365       throw new SQLException JavaDoc(msg);
366     }
367     catch (RuntimeException JavaDoc e)
368     {
369       String JavaDoc msg = Translate.get(
370           "loadbalancer.storedprocedure.failed.on.backend", new String JavaDoc[]{
371               proc.getSQLShortForm(vdb.getSQLShortFormLength()),
372               backend.getName(), e.getMessage()});
373       logger.error(msg, e);
374       throw new SQLException JavaDoc(msg);
375     }
376
377     return result;
378   }
379
380   /**
381    * Execute a read request on the selected backend.
382    *
383    * @param request the request to execute
384    * @param backend the backend that will execute the request
385    * @param metadataCache MetadataCache (null if none)
386    * @return the ControllerResultSet
387    * @throws SQLException if an error occurs
388    */

389   private ControllerResultSet executeReadRequestOnBackend(
390       SelectRequest request, DatabaseBackend backend,
391       MetadataCache metadataCache) throws SQLException JavaDoc,
392       UnreachableBackendException
393   {
394     // Ok, we have a backend, let's execute the request
395
AbstractConnectionManager cm = backend.getConnectionManager(request
396         .getLogin());
397
398     // Sanity check
399
if (cm == null)
400     {
401       String JavaDoc msg = Translate.get("loadbalancer.connectionmanager.not.found",
402           new String JavaDoc[]{request.getLogin(), backend.getName()});
403       logger.error(msg);
404       throw new SQLException JavaDoc(msg);
405     }
406
407     // Execute the query
408
if (request.isAutoCommit())
409     {
410       ControllerResultSet rs = null;
411       boolean badConnection;
412       do
413       {
414         badConnection = false;
415         // Use a connection just for this request
416
Connection JavaDoc c = null;
417         try
418         {
419           c = cm.getConnection();
420         }
421         catch (UnreachableBackendException e1)
422         {
423           logger.error(Translate.get(
424               "loadbalancer.backend.disabling.unreachable", backend.getName()));
425           disableBackend(backend);
426           throw new UnreachableBackendException(Translate.get(
427               "loadbalancer.backend.unreacheable", backend.getName()));
428         }
429
430         // Sanity check
431
if (c == null)
432           throw new SQLException JavaDoc(Translate.get(
433               "loadbalancer.backend.no.connection", backend.getName()));
434
435         // Execute Query
436
try
437         {
438           rs = executeSelectRequestOnBackend(request, backend, c, metadataCache);
439           cm.releaseConnection(c);
440         }
441         catch (SQLException JavaDoc e)
442         {
443           cm.releaseConnection(c);
444           throw new SQLException JavaDoc(Translate.get(
445               "loadbalancer.request.failed.on.backend", new String JavaDoc[]{
446                   request.getSQLShortForm(vdb.getSQLShortFormLength()),
447                   backend.getName(), e.getMessage()}));
448         }
449         catch (BadConnectionException e)
450         { // Get rid of the bad connection
451
cm.deleteConnection(c);
452           badConnection = true;
453         }
454       }
455       while (badConnection);
456       if (logger.isDebugEnabled())
457         logger.debug(Translate.get("loadbalancer.execute.on", new String JavaDoc[]{
458             String.valueOf(request.getId()), backend.getName()}));
459       return rs;
460     }
461     else
462     { // Inside a transaction
463
Connection JavaDoc c;
464       long tid = request.getTransactionId();
465       Long JavaDoc lTid = new Long JavaDoc(tid);
466
467       try
468       {
469         c = backend.getConnectionForTransactionAndLazyBeginIfNeeded(lTid, cm,
470             request.getTransactionIsolation());
471       }
472       catch (UnreachableBackendException e1)
473       {
474         logger.error(Translate.get(
475             "loadbalancer.backend.disabling.unreachable", backend.getName()));
476         disableBackend(backend);
477         throw new SQLException JavaDoc(Translate.get(
478             "loadbalancer.backend.unreacheable", backend.getName()));
479       }
480       catch (NoTransactionStartWhenDisablingException e)
481       {
482         String JavaDoc msg = Translate.get("loadbalancer.backend.is.disabling",
483             new String JavaDoc[]{request.getSQLShortForm(vdb.getSQLShortFormLength()),
484                 backend.getName()});
485         logger.error(msg);
486         throw new SQLException JavaDoc(msg);
487       }
488
489       // Sanity check
490
if (c == null)
491         throw new SQLException JavaDoc(Translate.get(
492             "loadbalancer.unable.retrieve.connection", new String JavaDoc[]{
493                 String.valueOf(tid), backend.getName()}));
494
495       // Execute Query
496
ControllerResultSet rs = null;
497       try
498       {
499         rs = executeSelectRequestOnBackend(request, backend, c, metadataCache);
500       }
501       catch (SQLException JavaDoc e)
502       {
503         throw new SQLException JavaDoc(Translate.get(
504             "loadbalancer.request.failed.on.backend", new String JavaDoc[]{
505                 request.getSQLShortForm(vdb.getSQLShortFormLength()),
506                 backend.getName(), e.getMessage()}));
507       }
508       catch (BadConnectionException e)
509       { // Connection failed, so did the transaction
510
// Disable the backend.
511
cm.deleteConnection(tid);
512         String JavaDoc msg = Translate.get(
513             "loadbalancer.backend.disabling.connection.failure", backend
514                 .getName());
515         logger.error(msg);
516         disableBackend(backend);
517         throw new SQLException JavaDoc(msg);
518       }
519       if (logger.isDebugEnabled())
520         logger.debug(Translate.get("loadbalancer.execute.transaction.on",
521             new String JavaDoc[]{String.valueOf(tid), String.valueOf(request.getId()),
522                 backend.getName()}));
523       return rs;
524     }
525   }
526
527   /**
528    * Execute a stored procedure on the selected backend.
529    *
530    * @param proc the stored procedure to execute
531    * @param backend the backend that will execute the request
532    * @param metadataCache MetadataCache (null if none)
533    * @return the ControllerResultSet
534    * @throws SQLException if an error occurs
535    */

536   private ControllerResultSet executeReadStoredProcedureOnBackend(
537       StoredProcedure proc, DatabaseBackend backend, MetadataCache metadataCache)
538       throws SQLException JavaDoc, UnreachableBackendException
539   {
540     // Ok, we have a backend, let's execute the request
541
AbstractConnectionManager cm = backend
542         .getConnectionManager(proc.getLogin());
543
544     // Sanity check
545
if (cm == null)
546     {
547       String JavaDoc msg = Translate.get("loadbalancer.connectionmanager.not.found",
548           new String JavaDoc[]{proc.getLogin(), backend.getName()});
549       logger.error(msg);
550       throw new SQLException JavaDoc(msg);
551     }
552
553     // Execute the query
554
if (proc.isAutoCommit())
555     {
556       // Use a connection just for this request
557
Connection JavaDoc c = null;
558       try
559       {
560         c = cm.getConnection();
561       }
562       catch (UnreachableBackendException e1)
563       {
564         logger.error(Translate.get(
565             "loadbalancer.backend.disabling.unreachable", backend.getName()));
566         disableBackend(backend);
567         throw new UnreachableBackendException(Translate.get(
568             "loadbalancer.backend.unreacheable", backend.getName()));
569       }
570
571       // Sanity check
572
if (c == null)
573         throw new UnreachableBackendException(Translate.get(
574             "loadbalancer.backend.no.connection", backend.getName()));
575
576       // Execute Query
577
ControllerResultSet rs = null;
578       try
579       {
580         rs = AbstractLoadBalancer.executeReadStoredProcedureOnBackend(proc,
581             backend, c, metadataCache);
582       }
583       catch (Exception JavaDoc e)
584       {
585         throw new SQLException JavaDoc(Translate.get(
586             "loadbalancer.storedprocedure.failed.on.backend", new String JavaDoc[]{
587                 proc.getSQLShortForm(vdb.getSQLShortFormLength()),
588                 backend.getName(), e.getMessage()}));
589       }
590       finally
591       {
592         cm.releaseConnection(c);
593       }
594       if (logger.isDebugEnabled())
595         logger.debug(Translate.get("loadbalancer.storedprocedure.on",
596             new String JavaDoc[]{String.valueOf(proc.getId()), backend.getName()}));
597       return rs;
598     }
599     else
600     { // Inside a transaction
601
Connection JavaDoc c;
602       long tid = proc.getTransactionId();
603       Long JavaDoc lTid = new Long JavaDoc(tid);
604
605       try
606       {
607         c = backend.getConnectionForTransactionAndLazyBeginIfNeeded(lTid, cm,
608             proc.getTransactionIsolation());
609       }
610       catch (UnreachableBackendException e1)
611       {
612         logger.error(Translate.get(
613             "loadbalancer.backend.disabling.unreachable", backend.getName()));
614         disableBackend(backend);
615         throw new SQLException JavaDoc(Translate.get(
616             "loadbalancer.backend.unreacheable", backend.getName()));
617       }
618       catch (NoTransactionStartWhenDisablingException e)
619       {
620         String JavaDoc msg = Translate.get("loadbalancer.backend.is.disabling",
621             new String JavaDoc[]{proc.getSQLShortForm(vdb.getSQLShortFormLength()),
622                 backend.getName()});
623         logger.error(msg);
624         throw new SQLException JavaDoc(msg);
625       }
626
627       // Sanity check
628
if (c == null)
629         throw new SQLException JavaDoc(Translate.get(
630             "loadbalancer.unable.retrieve.connection", new String JavaDoc[]{
631                 String.valueOf(tid), backend.getName()}));
632
633       // Execute Query
634
ControllerResultSet rs;
635       try
636       {
637         rs = AbstractLoadBalancer.executeReadStoredProcedureOnBackend(proc,
638             backend, c, metadataCache);
639       }
640       catch (Exception JavaDoc e)
641       {
642         throw new SQLException JavaDoc(Translate.get(
643             "loadbalancer.storedprocedure.failed.on.backend", new String JavaDoc[]{
644                 proc.getSQLShortForm(vdb.getSQLShortFormLength()),
645                 backend.getName(), e.getMessage()}));
646       }
647       if (logger.isDebugEnabled())
648         logger.debug(Translate.get("loadbalancer.execute.transaction.on",
649             new String JavaDoc[]{String.valueOf(tid), String.valueOf(proc.getId()),
650                 backend.getName()}));
651       return rs;
652     }
653   }
654
655   /**
656    * Execute a write request on the selected backend.
657    *
658    * @param request the request to execute
659    * @param backend the backend that will execute the request
660    * @return the number of modified rows
661    * @throws SQLException if an error occurs
662    */

663   private int executeWriteRequestOnBackend(AbstractWriteRequest request,
664       DatabaseBackend backend) throws SQLException JavaDoc, UnreachableBackendException
665   {
666     if (backend == null)
667       throw new SQLException JavaDoc(Translate.get(
668           "loadbalancer.execute.no.backend.available", request.getId()));
669
670     try
671     {
672       AbstractConnectionManager cm = backend.getConnectionManager(request
673           .getLogin());
674       if (request.isAutoCommit())
675       { // Use a connection just for this request
676
Connection JavaDoc c = null;
677         try
678         {
679           c = cm.getConnection();
680         }
681         catch (UnreachableBackendException e1)
682         {
683           String JavaDoc backendName = backend.getName();
684           logger.error(Translate.get(
685               "loadbalancer.backend.disabling.unreachable", backendName));
686           disableBackend(backend);
687           throw new UnreachableBackendException(Translate.get(
688               "loadbalancer.backend.unreacheable", backendName));
689         }
690
691         // Sanity check
692
if (c == null)
693           throw new UnreachableBackendException(Translate.get(
694               "loadbalancer.backend.no.connection", backend.getName()));
695
696         // Execute Query
697
int result;
698         try
699         {
700           result = executeUpdateRequestOnBackend(request, backend, c);
701         }
702         catch (Exception JavaDoc e)
703         {
704           throw new SQLException JavaDoc(Translate.get(
705               "loadbalancer.request.failed.on.backend", new String JavaDoc[]{
706                   request.getSQLShortForm(vdb.getSQLShortFormLength()),
707                   backend.getName(), e.getMessage()}));
708         }
709         finally
710         {
711           cm.releaseConnection(c);
712         }
713         return result;
714       }
715       else
716       { // Re-use the connection used by this transaction
717
Connection JavaDoc c = cm.retrieveConnection(request.getTransactionId());
718
719         // Sanity check
720
if (c == null)
721           throw new SQLException JavaDoc(Translate.get(
722               "loadbalancer.unable.retrieve.connection",
723               new String JavaDoc[]{String.valueOf(request.getTransactionId()),
724                   backend.getName()}));
725
726         // Execute Query
727
int result;
728         try
729         {
730           result = executeUpdateRequestOnBackend(request, backend, c);
731           return result;
732         }
733         catch (Exception JavaDoc e)
734         {
735           throw new SQLException JavaDoc(Translate.get(
736               "loadbalancer.request.failed.on.backend", new String JavaDoc[]{
737                   request.getSQLShortForm(vdb.getSQLShortFormLength()),
738                   backend.getName(), e.getMessage()}));
739         }
740       }
741     }
742     catch (RuntimeException JavaDoc e)
743     {
744       String JavaDoc msg = Translate.get("loadbalancer.request.failed.on.backend",
745           new String JavaDoc[]{request.getSQLShortForm(vdb.getSQLShortFormLength()),
746               backend.getName(), e.getMessage()});
747       logger.fatal(msg, e);
748       throw new SQLException JavaDoc(msg);
749     }
750   }
751
752   /**
753    * Execute a write request on the selected backend and return the
754    * autogenerated keys.
755    *
756    * @param request the request to execute
757    * @param backend the backend that will execute the request
758    * @param metadataCache MetadataCache (null if none)
759    * @return the ResultSet containing the auto-generated keys
760    * @throws SQLException if an error occurs
761    */

762   private ControllerResultSet executeWriteRequestWithKeysOnBackend(
763       AbstractWriteRequest request, DatabaseBackend backend,
764       MetadataCache metadataCache) throws SQLException JavaDoc,
765       UnreachableBackendException
766   {
767     if (backend == null)
768       throw new SQLException JavaDoc(Translate.get(
769           "loadbalancer.execute.no.backend.available", request.getId()));
770
771     if (!backend.getDriverCompliance().supportGetGeneratedKeys())
772       throw new SQLException JavaDoc(Translate.get(
773           "loadbalancer.backend.autogeneratedkeys.unsupported", backend
774               .getName()));
775
776     try
777     {
778       AbstractConnectionManager cm = backend.getConnectionManager(request
779           .getLogin());
780       if (request.isAutoCommit())
781       { // Use a connection just for this request
782
Connection JavaDoc c = null;
783         try
784         {
785           c = cm.getConnection();
786         }
787         catch (UnreachableBackendException e1)
788         {
789           String JavaDoc backendName = backend.getName();
790           logger.error(Translate.get(
791               "loadbalancer.backend.disabling.unreachable", backendName));
792           disableBackend(backend);
793           throw new UnreachableBackendException(Translate.get(
794               "loadbalancer.backend.unreacheable", backendName));
795         }
796
797         // Sanity check
798
if (c == null)
799           throw new UnreachableBackendException(Translate.get(
800               "loadbalancer.backend.no.connection", backend.getName()));
801
802         // Execute Query
803
ControllerResultSet result;
804         try
805         {
806           result = executeUpdateRequestOnBackendWithKeys(request, backend, c,
807               metadataCache);
808         }
809         catch (Exception JavaDoc e)
810         {
811           throw new SQLException JavaDoc(Translate.get(
812               "loadbalancer.request.failed.on.backend", new String JavaDoc[]{
813                   request.getSQLShortForm(vdb.getSQLShortFormLength()),
814                   backend.getName(), e.getMessage()}));
815         }
816         finally
817         {
818           cm.releaseConnection(c);
819         }
820         return result;
821       }
822       else
823       { // Re-use the connection used by this transaction
824
Connection JavaDoc c = cm.retrieveConnection(request.getTransactionId());
825
826         // Sanity check
827
if (c == null)
828           throw new SQLException JavaDoc(Translate.get(
829               "loadbalancer.unable.retrieve.connection",
830               new String JavaDoc[]{String.valueOf(request.getTransactionId()),
831                   backend.getName()}));
832
833         // Execute Query
834
try
835         {
836           return executeUpdateRequestOnBackendWithKeys(request, backend, c,
837               metadataCache);
838         }
839         catch (Exception JavaDoc e)
840         {
841           throw new SQLException JavaDoc(Translate.get(
842               "loadbalancer.request.failed.on.backend", new String JavaDoc[]{
843                   request.getSQLShortForm(vdb.getSQLShortFormLength()),
844                   backend.getName(), e.getMessage()}));
845         }
846       }
847     }
848     catch (RuntimeException JavaDoc e)
849     {
850       String JavaDoc msg = Translate
851           .get("loadbalancer.request.failed", new String JavaDoc[]{
852               request.getSQLShortForm(vdb.getSQLShortFormLength()),
853               e.getMessage()});
854       logger.fatal(msg, e);
855       throw new SQLException JavaDoc(msg);
856     }
857   }
858
859   /**
860    * Execute a stored procedure on the selected backend.
861    *
862    * @param proc the stored procedure to execute
863    * @param backend the backend that will execute the request
864    * @return the ResultSet
865    * @throws SQLException if an error occurs
866    */

867   private int executeWriteStoredProcedureOnBackend(StoredProcedure proc,
868       DatabaseBackend backend) throws SQLException JavaDoc, UnreachableBackendException
869   {
870     // Ok, we have a backend, let's execute the request
871
AbstractConnectionManager cm = backend
872         .getConnectionManager(proc.getLogin());
873
874     // Sanity check
875
if (cm == null)
876     {
877       String JavaDoc msg = Translate.get("loadbalancer.connectionmanager.not.found",
878           new String JavaDoc[]{proc.getLogin(), backend.getName()});
879       logger.error(msg);
880       throw new SQLException JavaDoc(msg);
881     }
882
883     // Execute the query
884
if (proc.isAutoCommit())
885     {
886       // Use a connection just for this request
887
Connection JavaDoc c = null;
888       try
889       {
890         c = cm.getConnection();
891       }
892       catch (UnreachableBackendException e1)
893       {
894         logger.error(Translate.get(
895             "loadbalancer.backend.disabling.unreachable", backend.getName()));
896         disableBackend(backend);
897         throw new UnreachableBackendException(Translate.get(
898             "loadbalancer.backend.unreacheable", backend.getName()));
899       }
900
901       // Sanity check
902
if (c == null)
903         throw new UnreachableBackendException(Translate.get(
904             "loadbalancer.backend.no.connection", backend.getName()));
905
906       // Execute Query
907
int result;
908       try
909       {
910         result = AbstractLoadBalancer.executeWriteStoredProcedureOnBackend(
911             proc, backend, c);
912
913         // Warning! No way to detect if schema has been modified unless
914
// we ask the backend again using DatabaseMetaData.getTables().
915
}
916       catch (Exception JavaDoc e)
917       {
918         throw new SQLException JavaDoc(Translate.get(
919             "loadbalancer.storedprocedure.failed.on.backend", new String JavaDoc[]{
920                 proc.getSQLShortForm(vdb.getSQLShortFormLength()),
921                 backend.getName(), e.getMessage()}));
922       }
923       finally
924       {
925         cm.releaseConnection(c);
926       }
927       if (logger.isDebugEnabled())
928         logger.debug(Translate.get("loadbalancer.storedprocedure.on",
929             new String JavaDoc[]{String.valueOf(proc.getId()), backend.getName()}));
930       return result;
931     }
932     else
933     { // Inside a transaction
934
Connection JavaDoc c;
935       long tid = proc.getTransactionId();
936       Long JavaDoc lTid = new Long JavaDoc(tid);
937
938       try
939       {
940         c = backend.getConnectionForTransactionAndLazyBeginIfNeeded(lTid, cm,
941             proc.getTransactionIsolation());
942       }
943       catch (UnreachableBackendException e1)
944       {
945         logger.error(Translate.get(
946             "loadbalancer.backend.disabling.unreachable", backend.getName()));
947         disableBackend(backend);
948         throw new SQLException JavaDoc(Translate.get(
949             "loadbalancer.backend.unreacheable", backend.getName()));
950       }
951       catch (NoTransactionStartWhenDisablingException e)
952       {
953         String JavaDoc msg = Translate.get("loadbalancer.backend.is.disabling",
954             new String JavaDoc[]{proc.getSQLShortForm(vdb.getSQLShortFormLength()),
955                 backend.getName()});
956         logger.error(msg);
957         throw new SQLException JavaDoc(msg);
958       }
959
960       // Sanity check
961
if (c == null)
962         throw new SQLException JavaDoc(Translate.get(
963             "loadbalancer.unable.retrieve.connection", new String JavaDoc[]{
964                 String.valueOf(tid), backend.getName()}));
965
966       // Execute Query
967
int result;
968       try
969       {
970         result = AbstractLoadBalancer.executeWriteStoredProcedureOnBackend(
971             proc, backend, c);
972
973         // Warning! No way to detect if schema has been modified unless
974
// we ask the backend again using DatabaseMetaData.getTables().
975
}
976       catch (Exception JavaDoc e)
977       {
978         throw new SQLException JavaDoc(Translate.get(
979             "loadbalancer.storedprocedure.failed.on.backend", new String JavaDoc[]{
980                 proc.getSQLShortForm(vdb.getSQLShortFormLength()),
981                 backend.getName(), e.getMessage()}));
982       }
983       if (logger.isDebugEnabled())
984         logger.debug(Translate.get("loadbalancer.execute.transaction.on",
985             new String JavaDoc[]{String.valueOf(tid), String.valueOf(proc.getId()),
986                 backend.getName()}));
987       return result;
988     }
989   }
990
991   //
992
// Transaction Management
993
//
994

995   /**
996    * @see org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer#getInformation()
997    */

998   /**
999    * @see org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer#begin(org.objectweb.cjdbc.controller.requestmanager.TransactionMarkerMetaData)
1000   */

1001  public void begin(TransactionMarkerMetaData tm) throws SQLException JavaDoc
1002  {
1003    Long JavaDoc lTid = new Long JavaDoc(tm.getTransactionId());
1004    if (backendPerTransactionId.containsKey(lTid))
1005      throw new SQLException JavaDoc(Translate.get(
1006          "loadbalancer.transaction.already.started", lTid.toString()));
1007
1008    DatabaseBackend backend = chooseBackendForReadRequest(new UnknownRequest(
1009        "begin", false, 0, "\n"));
1010    backendPerTransactionId.put(lTid, backend);
1011    backend.startTransaction(lTid);
1012  }
1013
1014  /**
1015   * @see org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer#commit(org.objectweb.cjdbc.controller.requestmanager.TransactionMarkerMetaData)
1016   */

1017  public void commit(TransactionMarkerMetaData tm) throws SQLException JavaDoc
1018  {
1019    long tid = tm.getTransactionId();
1020    Long JavaDoc lTid = new Long JavaDoc(tid);
1021    DatabaseBackend db = (DatabaseBackend) backendPerTransactionId.remove(lTid);
1022
1023    AbstractConnectionManager cm = db.getConnectionManager(tm.getLogin());
1024    Connection JavaDoc c = cm.retrieveConnection(tid);
1025
1026    // Sanity check
1027
if (c == null)
1028    { // Bad connection
1029
db.stopTransaction(lTid);
1030
1031      throw new SQLException JavaDoc(Translate.get(
1032          "loadbalancer.unable.retrieve.connection", new String JavaDoc[]{
1033              String.valueOf(tid), db.getName()}));
1034    }
1035
1036    // Execute Query
1037
try
1038    {
1039      c.commit();
1040      c.setAutoCommit(true);
1041    }
1042    catch (Exception JavaDoc e)
1043    {
1044      String JavaDoc msg = Translate.get("loadbalancer.commit.failed", new String JavaDoc[]{
1045          String.valueOf(tid), db.getName(), e.getMessage()});
1046      logger.error(msg);
1047      throw new SQLException JavaDoc(msg);
1048    }
1049    finally
1050    {
1051      cm.releaseConnection(tid);
1052      db.stopTransaction(lTid);
1053    }
1054  }
1055
1056  /**
1057   * @see org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer#rollback(org.objectweb.cjdbc.controller.requestmanager.TransactionMarkerMetaData)
1058   */

1059  public void rollback(TransactionMarkerMetaData tm) throws SQLException JavaDoc
1060  {
1061    long tid = tm.getTransactionId();
1062    Long JavaDoc lTid = new Long JavaDoc(tid);
1063    DatabaseBackend db = (DatabaseBackend) backendPerTransactionId.remove(lTid);
1064
1065    AbstractConnectionManager cm = db.getConnectionManager(tm.getLogin());
1066    Connection JavaDoc c = cm.retrieveConnection(tid);
1067
1068    // Sanity check
1069
if (c == null)
1070    { // Bad connection
1071
db.stopTransaction(lTid);
1072
1073      throw new SQLException JavaDoc(Translate.get(
1074          "loadbalancer.unable.retrieve.connection", new String JavaDoc[]{
1075              String.valueOf(tid), db.getName()}));
1076    }
1077
1078    // Execute Query
1079
try
1080    {
1081      c.rollback();
1082      c.setAutoCommit(true);
1083    }
1084    catch (Exception JavaDoc e)
1085    {
1086      String JavaDoc msg = Translate.get("loadbalancer.rollback.failed", new String JavaDoc[]{
1087          String.valueOf(tid), db.getName(), e.getMessage()});
1088      logger.error(msg);
1089      throw new SQLException JavaDoc(msg);
1090    }
1091    finally
1092    {
1093      cm.releaseConnection(tid);
1094      db.stopTransaction(lTid);
1095    }
1096  }
1097
1098  /**
1099   * Rollback a transaction to a savepoint
1100   *
1101   * @param tm The transaction marker metadata
1102   * @param savepointName The name of the savepoint
1103   * @throws SQLException if an error occurs
1104   */

1105  public void rollback(TransactionMarkerMetaData tm, String JavaDoc savepointName)
1106      throws SQLException JavaDoc
1107  {
1108    long tid = tm.getTransactionId();
1109    Long JavaDoc lTid = new Long JavaDoc(tid);
1110    DatabaseBackend db = (DatabaseBackend) backendPerTransactionId.remove(lTid);
1111
1112    AbstractConnectionManager cm = db.getConnectionManager(tm.getLogin());
1113    Connection JavaDoc c = cm.retrieveConnection(tid);
1114
1115    // Sanity check
1116
if (c == null)
1117    { // Bad connection
1118
db.stopTransaction(lTid);
1119
1120      throw new SQLException JavaDoc(Translate.get(
1121          "loadbalancer.unable.retrieve.connection", new String JavaDoc[]{
1122              String.valueOf(tid), db.getName()}));
1123    }
1124
1125    // Retrieve savepoint
1126
Savepoint JavaDoc savepoint = db.getSavepoint(lTid, savepointName);
1127    if (savepoint == null)
1128    {
1129      throw new SQLException JavaDoc(Translate.get(
1130          "loadbalancer.unable.retrieve.savepoint", new String JavaDoc[]{savepointName,
1131              String.valueOf(tid), db.getName()}));
1132    }
1133    
1134    // Execute Query
1135
try
1136    {
1137      c.rollback(savepoint);
1138    }
1139    catch (Exception JavaDoc e)
1140    {
1141      String JavaDoc msg = Translate.get("loadbalancer.rollbacksavepoint.failed",
1142          new String JavaDoc[]{savepointName, String.valueOf(tid), db.getName(),
1143          e.getMessage()});
1144      logger.error(msg);
1145      throw new SQLException JavaDoc(msg);
1146    }
1147  }
1148  
1149  /**
1150   * Release a savepoint from a transaction
1151   *
1152   * @param tm The transaction marker metadata
1153   * @param name The name of the savepoint ro release
1154   * @throws SQLException if an error occurs
1155   */

1156  public void releaseSavepoint(TransactionMarkerMetaData tm, String JavaDoc name)
1157      throws SQLException JavaDoc
1158  {
1159    long tid = tm.getTransactionId();
1160    Long JavaDoc lTid = new Long JavaDoc(tid);
1161    
1162    DatabaseBackend db = (DatabaseBackend) backendPerTransactionId.get(lTid);
1163    AbstractConnectionManager cm = db.getConnectionManager(tm.getLogin());
1164    Connection JavaDoc c = cm.retrieveConnection(tid);
1165
1166    // Sanity check
1167
if (c == null)
1168    { // Bad connection
1169
db.stopTransaction(lTid);
1170
1171      throw new SQLException JavaDoc(Translate.get(
1172          "loadbalancer.unable.retrieve.connection", new String JavaDoc[]{
1173              String.valueOf(tid), db.getName()}));
1174    }
1175
1176    // Retrieve savepoint
1177
Savepoint JavaDoc savepoint = db.getSavepoint(lTid, name);
1178    if (savepoint == null)
1179    {
1180      throw new SQLException JavaDoc(Translate.get(
1181          "loadbalancer.unable.retrieve.savepoint", new String JavaDoc[]{
1182              String.valueOf(tid), name, db.getName()}));
1183    }
1184    
1185    // Execute Query
1186
try
1187    {
1188      c.releaseSavepoint(savepoint);
1189    }
1190    catch (Exception JavaDoc e)
1191    {
1192      String JavaDoc msg = Translate.get("loadbalancer.releasesavepoint.failed",
1193          new String JavaDoc[]{name ,String.valueOf(tid), db.getName(),
1194              e.getMessage()});
1195      logger.error(msg);
1196      throw new SQLException JavaDoc(msg);
1197    }
1198    finally
1199    {
1200      db.removeSavepoint(lTid, savepoint);
1201    }
1202  }
1203  
1204  /**
1205   * Set a savepoint to a transaction.
1206   *
1207   * @param tm The transaction marker metadata
1208   * @param name The name of the new savepoint
1209   * @throws SQLException if an error occurs
1210   */

1211  public void setSavepoint(TransactionMarkerMetaData tm, String JavaDoc name)
1212      throws SQLException JavaDoc
1213  {
1214    long tid = tm.getTransactionId();
1215    Long JavaDoc lTid = new Long JavaDoc(tid);
1216    
1217    DatabaseBackend db = (DatabaseBackend) backendPerTransactionId.get(lTid);
1218    AbstractConnectionManager cm = db.getConnectionManager(tm.getLogin());
1219    Connection JavaDoc c = cm.retrieveConnection(tid);
1220
1221    // Sanity check
1222
if (c == null)
1223    { // Bad connection
1224
db.stopTransaction(lTid);
1225
1226      throw new SQLException JavaDoc(Translate.get(
1227          "loadbalancer.unable.retrieve.connection", new String JavaDoc[]{
1228              String.valueOf(tid), db.getName()}));
1229    }
1230
1231    // Execute Query
1232
Savepoint JavaDoc savepoint = null;
1233    try
1234    {
1235      savepoint = c.setSavepoint(name);
1236    }
1237    catch (Exception JavaDoc e)
1238    {
1239      String JavaDoc msg = Translate.get("loadbalancer.setsavepoint.failed",
1240          new String JavaDoc[]{name, String.valueOf(tid), db.getName(),
1241              e.getMessage()});
1242      logger.error(msg);
1243      throw new SQLException JavaDoc(msg);
1244    }
1245    finally
1246    {
1247      if (savepoint != null)
1248        db.addSavepoint(lTid, savepoint);
1249    }
1250  }
1251  
1252  /**
1253   * Enables a backend that was previously disabled. Asks the corresponding
1254   * connection manager to initialize the connections if needed.
1255   * <p>
1256   * No sanity checks are performed by this function.
1257   *
1258   * @param db the database backend to enable
1259   * @param writeEnabled True if the backend must be enabled for writes
1260   * @throws SQLException if an error occurs
1261   */

1262  public void enableBackend(DatabaseBackend db, boolean writeEnabled)
1263      throws SQLException JavaDoc
1264  {
1265    logger.info(Translate.get("loadbalancer.backend.enabling", db.getName()));
1266    if (!db.isInitialized())
1267      db.initializeConnections();
1268    db.enableRead();
1269    if (writeEnabled)
1270      db.enableWrite();
1271    numberOfEnabledBackends++;
1272  }
1273
1274  /**
1275   * Disables a backend that was previously enabled. Asks the corresponding
1276   * connection manager to finalize the connections if needed.
1277   * <p>
1278   * No sanity checks are performed by this function.
1279   *
1280   * @param db the database backend to disable
1281   * @throws SQLException if an error occurs
1282   */

1283  public void disableBackend(DatabaseBackend db) throws SQLException JavaDoc
1284  {
1285    logger.info(Translate.get("loadbalancer.backend.disabling", db.getName()));
1286    numberOfEnabledBackends--;
1287    db.disable();
1288    if (db.isInitialized())
1289      db.finalizeConnections();
1290  }
1291
1292  /**
1293   * @see org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer#getNumberOfEnabledBackends()
1294   */

1295  public int getNumberOfEnabledBackends()
1296  {
1297    return numberOfEnabledBackends = 0;
1298  }
1299
1300  //
1301
// Debug/Monitoring
1302
//
1303

1304  /**
1305   * @see org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer#getXmlImpl
1306   */

1307  public String JavaDoc getXmlImpl()
1308  {
1309    StringBuffer JavaDoc info = new StringBuffer JavaDoc();
1310    info.append("<" + DatabasesXmlTags.ELT_ParallelDB + ">");
1311    info.append(getParallelDBXml());
1312    info.append("</" + DatabasesXmlTags.ELT_ParallelDB + ">");
1313    return info.toString();
1314  }
1315
1316  /**
1317   * Return the XML tags of the ParallelDB load balancer implementation.
1318   * getXmlImpl() places this content between the ParallelDB element tags.
1319   * @return content of ParallelDB xml
1320   */

1321  public abstract String JavaDoc getParallelDBXml();
1322
1323}
Popular Tags