1 24 25 package org.objectweb.cjdbc.controller.loadbalancer; 26 27 import java.sql.CallableStatement ; 28 import java.sql.Connection ; 29 import java.sql.PreparedStatement ; 30 import java.sql.SQLException ; 31 import java.sql.Statement ; 32 import java.util.ArrayList ; 33 import java.util.LinkedList ; 34 35 import javax.management.NotCompliantMBeanException ; 36 37 import org.objectweb.cjdbc.common.exceptions.BadConnectionException; 38 import org.objectweb.cjdbc.common.exceptions.UnreachableBackendException; 39 import org.objectweb.cjdbc.common.i18n.Translate; 40 import org.objectweb.cjdbc.common.jmx.mbeans.AbstractLoadBalancerMBean; 41 import org.objectweb.cjdbc.common.log.Trace; 42 import org.objectweb.cjdbc.common.sql.AbstractRequest; 43 import org.objectweb.cjdbc.common.sql.AbstractWriteRequest; 44 import org.objectweb.cjdbc.common.sql.SelectRequest; 45 import org.objectweb.cjdbc.common.sql.StoredProcedure; 46 import org.objectweb.cjdbc.common.sql.filters.MacrosHandler; 47 import org.objectweb.cjdbc.common.xml.DatabasesXmlTags; 48 import org.objectweb.cjdbc.common.xml.XmlComponent; 49 import org.objectweb.cjdbc.controller.backend.DatabaseBackend; 50 import org.objectweb.cjdbc.controller.backend.DriverCompliance; 51 import org.objectweb.cjdbc.controller.cache.metadata.MetadataCache; 52 import org.objectweb.cjdbc.controller.connection.AbstractConnectionManager; 53 import org.objectweb.cjdbc.controller.jmx.AbstractStandardMBean; 54 import org.objectweb.cjdbc.controller.requestmanager.TransactionMarkerMetaData; 55 import org.objectweb.cjdbc.controller.virtualdatabase.ControllerResultSet; 56 import org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase; 57 58 72 public abstract class AbstractLoadBalancer extends AbstractStandardMBean 73 implements 74 XmlComponent, 75 AbstractLoadBalancerMBean 76 { 77 78 89 protected VirtualDatabase vdb; 91 protected int raidbLevel; 92 protected int parsingGranularity; 93 94 protected LinkedList totalOrderQueue; 95 96 protected static Trace logger = Trace 97 .getLogger("org.objectweb.cjdbc.controller.loadbalancer"); 98 99 protected MacrosHandler macroHandler; 100 101 110 protected AbstractLoadBalancer(VirtualDatabase vdb, int raidbLevel, 111 int parsingGranularity) throws SQLException , NotCompliantMBeanException 112 { 113 super(AbstractLoadBalancerMBean.class); 114 this.raidbLevel = raidbLevel; 115 this.parsingGranularity = parsingGranularity; 116 this.vdb = vdb; 117 this.totalOrderQueue = vdb.getTotalOrderQueue(); 118 try 119 { 120 vdb.acquireReadLockBackendLists(); 121 } 122 catch (InterruptedException e) 123 { 124 String msg = Translate.get( 125 "loadbalancer.backendlist.acquire.readlock.failed", e); 126 logger.error(msg); 127 throw new SQLException (msg); 128 } 129 int size = vdb.getBackends().size(); 130 ArrayList backends = vdb.getBackends(); 131 for (int i = 0; i < size; i++) 132 { 133 DatabaseBackend backend = (DatabaseBackend) backends.get(i); 134 if (backend.isReadEnabled() || backend.isWriteEnabled()) 135 { 136 if (logger.isWarnEnabled()) 137 logger.warn(Translate.get( 138 "loadbalancer.constructor.backends.not.disabled", backend 139 .getName())); 140 try 141 { 142 disableBackend(backend); 143 } 144 catch (Exception e) 145 { backend.disable(); 147 } 148 } 149 } 150 vdb.releaseReadLockBackendLists(); 151 } 152 153 157 162 public int getRAIDbLevel() 163 { 164 return raidbLevel; 165 } 166 167 172 public void setRAIDbLevel(int raidbLevel) 173 { 174 this.raidbLevel = raidbLevel; 175 } 176 177 182 public int getParsingGranularity() 183 { 184 return parsingGranularity; 185 } 186 187 192 public void setParsingGranularity(int parsingGranularity) 193 { 194 this.parsingGranularity = parsingGranularity; 195 } 196 197 201 209 public void handleMacros(AbstractRequest request) 210 { 211 if (macroHandler == null) 212 return; 213 214 if (!request.needsMacroProcessing()) 217 return; 218 219 if (request.isDriverProcessed() || (request.getSqlSkeleton() == null)) 220 request.setSQL(macroHandler.processMacros(request.getSQL())); 221 else 222 request.setSqlSkeleton(macroHandler.processMacros(request 223 .getSqlSkeleton())); 224 } 225 226 240 public boolean waitForTotalOrder(Object request, boolean errorIfNotFound) 241 { 242 if (totalOrderQueue != null) 243 { 244 synchronized (totalOrderQueue) 245 { 246 int index = totalOrderQueue.indexOf(request); 247 while (index > 0) 248 { 249 if (logger.isDebugEnabled()) 250 logger.debug("Waiting for " + index 251 + " queries to execute (current is " + totalOrderQueue.get(0) 252 + ")"); 253 try 254 { 255 totalOrderQueue.wait(); 256 } 257 catch (InterruptedException ignore) 258 { 259 } 260 index = totalOrderQueue.indexOf(request); 261 } 262 if (index == -1) 263 { 264 if (errorIfNotFound) 265 logger 266 .error("Request was not found in total order queue, posting out of order (" 267 + request + ")"); 268 return false; 269 } 270 else 271 return true; 272 } 273 } 274 return false; 275 } 276 277 281 public void removeHeadFromAndNotifyTotalOrderQueue() 282 { 283 if (totalOrderQueue != null) 284 { 285 synchronized (totalOrderQueue) 286 { 287 totalOrderQueue.removeFirst(); 288 totalOrderQueue.notifyAll(); 289 } 290 } 291 } 292 293 302 public abstract ControllerResultSet execReadRequest(SelectRequest request, 303 MetadataCache metadataCache) throws SQLException ; 304 305 315 public abstract int execWriteRequest(AbstractWriteRequest request) 316 throws AllBackendsFailedException, SQLException ; 317 318 329 public abstract ControllerResultSet execWriteRequestWithKeys( 330 AbstractWriteRequest request, MetadataCache metadataCache) 331 throws AllBackendsFailedException, SQLException ; 332 333 342 public abstract ControllerResultSet execReadOnlyReadStoredProcedure( 343 StoredProcedure proc, MetadataCache metadataCache) throws SQLException ; 344 345 356 public abstract ControllerResultSet execReadStoredProcedure( 357 StoredProcedure proc, MetadataCache metadataCache) 358 throws AllBackendsFailedException, SQLException ; 359 360 369 public abstract int execWriteStoredProcedure(StoredProcedure proc) 370 throws AllBackendsFailedException, SQLException ; 371 372 385 public static final ControllerResultSet executeSelectRequestOnBackend( 386 SelectRequest request, DatabaseBackend backend, Connection c, 387 MetadataCache metadataCache) throws SQLException , BadConnectionException 388 { 389 ControllerResultSet rs = null; 390 try 391 { 392 backend.addPendingReadRequest(request); 393 String sql = request.getSQL(); 394 sql = backend.rewriteQuery(sql); 396 397 Statement s; if (request.isDriverProcessed() || (request.getSqlSkeleton() == null)) 399 s = c.createStatement(); 400 else 401 { 402 s = c.prepareStatement(request.getSqlSkeleton()); 403 org.objectweb.cjdbc.driver.PreparedStatement.setPreparedStatement(sql, 404 (PreparedStatement ) s); 405 } 406 407 DriverCompliance driverCompliance = backend.getDriverCompliance(); 409 if (driverCompliance.supportSetQueryTimeout()) 410 s.setQueryTimeout(request.getTimeout()); 411 if ((request.getCursorName() != null) 412 && (driverCompliance.supportSetCursorName())) 413 s.setCursorName(request.getCursorName()); 414 if ((request.getFetchSize() != 0) 415 && driverCompliance.supportSetFetchSize()) 416 s.setFetchSize(request.getFetchSize()); 417 if ((request.getMaxRows() > 0) && driverCompliance.supportSetMaxRows()) 418 s.setMaxRows(request.getMaxRows()); 419 if (request.isDriverProcessed() || (request.getSqlSkeleton() == null)) 420 rs = new ControllerResultSet(request, s.executeQuery(sql), 421 metadataCache, s); 422 else 423 rs = new ControllerResultSet(request, ((PreparedStatement ) s) 424 .executeQuery(), metadataCache, s); 425 } 426 catch (SQLException e) 427 { if (backend.isValidConnection(c)) 429 throw e; else 431 throw new BadConnectionException(e); 432 } 433 finally 434 { 435 backend.removePendingRequest(request); 436 } 437 return rs; 438 } 439 440 452 public static final int executeUpdateRequestOnBackend( 453 AbstractWriteRequest request, DatabaseBackend backend, Connection c) 454 throws SQLException , BadConnectionException 455 { 456 try 457 { 458 backend.addPendingWriteRequest(request); 459 String sql = request.getSQL(); 460 sql = backend.rewriteQuery(sql); 462 463 Statement s; if (request.isDriverProcessed() || (request.getSqlSkeleton() == null)) 465 s = c.createStatement(); 466 else 467 { 468 s = c.prepareStatement(request.getSqlSkeleton()); 469 org.objectweb.cjdbc.driver.PreparedStatement.setPreparedStatement(sql, 470 (PreparedStatement ) s); 471 } 472 473 DriverCompliance driverCompliance = backend.getDriverCompliance(); 475 if (driverCompliance.supportSetQueryTimeout()) 476 s.setQueryTimeout(request.getTimeout()); 477 478 int rows = 0; 479 if (request.isDriverProcessed() || (request.getSqlSkeleton() == null)) 480 rows = s.executeUpdate(sql); 481 else 482 rows = ((PreparedStatement ) s).executeUpdate(); 483 484 s.close(); 485 return rows; 486 } 487 catch (SQLException e) 488 { if (backend.isValidConnection(c)) 490 throw e; else 492 throw new BadConnectionException(e); 493 } 494 finally 495 { 496 backend.removePendingRequest(request); 497 } 498 } 499 500 513 public static final ControllerResultSet executeUpdateRequestOnBackendWithKeys( 514 AbstractWriteRequest request, DatabaseBackend backend, Connection c, 515 MetadataCache metadataCache) throws SQLException , BadConnectionException 516 { 517 try 518 { 519 backend.addPendingWriteRequest(request); 520 String sql = request.getSQL(); 521 sql = backend.rewriteQuery(sql); 523 524 Statement s = c.createStatement(); 525 526 DriverCompliance driverCompliance = backend.getDriverCompliance(); 528 if (driverCompliance.supportSetQueryTimeout()) 529 s.setQueryTimeout(request.getTimeout()); 530 if (!driverCompliance.supportGetGeneratedKeys()) 531 throw new SQLException ("Backend " + backend.getName() 532 + " does not support RETURN_GENERATED_KEYS"); 533 534 s.executeUpdate(sql, Statement.RETURN_GENERATED_KEYS); 535 ControllerResultSet rs = new ControllerResultSet(request, s 536 .getGeneratedKeys(), metadataCache, s); 537 return rs; 538 } 539 catch (SQLException e) 540 { if (backend.isValidConnection(c)) 542 throw e; else 544 throw new BadConnectionException(e); 545 } 546 finally 547 { 548 backend.removePendingRequest(request); 549 } 550 } 551 552 564 public static final ControllerResultSet executeReadStoredProcedureOnBackend( 565 StoredProcedure proc, DatabaseBackend backend, Connection c, 566 MetadataCache metadataCache) throws SQLException , BadConnectionException 567 { 568 try 569 { 570 backend.addPendingReadRequest(proc); 571 572 CallableStatement cs; 575 if (proc.isDriverProcessed()) 576 cs = c.prepareCall(proc.getSQL()); 577 else 578 { 579 cs = c.prepareCall(proc.getSqlSkeleton()); 580 org.objectweb.cjdbc.driver.PreparedStatement.setPreparedStatement(proc 581 .getSQL(), cs); 582 } 583 if (backend.getDriverCompliance().supportSetQueryTimeout()) 584 cs.setQueryTimeout(proc.getTimeout()); 585 if ((proc.getMaxRows() > 0) 586 && backend.getDriverCompliance().supportSetMaxRows()) 587 cs.setMaxRows(proc.getMaxRows()); 588 ControllerResultSet rs = new ControllerResultSet(proc, cs.executeQuery(), 589 metadataCache, cs); 590 return rs; 591 } 592 catch (SQLException e) 593 { if (backend.isValidConnection(c)) 595 throw e; else 597 throw new BadConnectionException(e); 598 } 599 finally 600 { 601 backend.removePendingRequest(proc); 602 } 603 } 604 605 616 public static final int executeWriteStoredProcedureOnBackend( 617 StoredProcedure proc, DatabaseBackend backend, Connection c) 618 throws SQLException , BadConnectionException 619 { 620 try 621 { 622 backend.addPendingWriteRequest(proc); 623 624 CallableStatement cs; 627 if (proc.isDriverProcessed()) 628 cs = c.prepareCall(proc.getSQL()); 629 else 630 { 631 cs = c.prepareCall(proc.getSqlSkeleton()); 632 org.objectweb.cjdbc.driver.PreparedStatement.setPreparedStatement(proc 633 .getSQL(), cs); 634 } 635 if (backend.getDriverCompliance().supportSetQueryTimeout()) 636 cs.setQueryTimeout(proc.getTimeout()); 637 if ((proc.getMaxRows() > 0) 638 && backend.getDriverCompliance().supportSetMaxRows()) 639 cs.setMaxRows(proc.getMaxRows()); 640 int rows = cs.executeUpdate(); 641 cs.close(); 642 return rows; 643 } 644 catch (SQLException e) 645 { if (backend.isValidConnection(c)) 647 throw e; else 649 throw new BadConnectionException(e); 650 } 651 finally 652 { 653 backend.removePendingRequest(proc); 654 } 655 } 656 657 661 667 public abstract void begin(TransactionMarkerMetaData tm) throws SQLException ; 668 669 677 public abstract void commit(TransactionMarkerMetaData tm) 678 throws AllBackendsFailedException, SQLException ; 679 680 688 public abstract void rollback(TransactionMarkerMetaData tm) 689 throws AllBackendsFailedException, SQLException ; 690 691 700 public abstract void rollback(TransactionMarkerMetaData tm, 701 String savepointName) throws AllBackendsFailedException, SQLException ; 702 703 712 public abstract void setSavepoint(TransactionMarkerMetaData tm, String name) 713 throws AllBackendsFailedException, SQLException ; 714 715 724 public abstract void releaseSavepoint(TransactionMarkerMetaData tm, 725 String name) throws AllBackendsFailedException, SQLException ; 726 727 745 public static final Connection getConnectionAndBeginTransaction( 746 DatabaseBackend backend, AbstractConnectionManager cm, long tid, 747 int transactionIsolationLevel) throws SQLException , 748 UnreachableBackendException 749 { 750 Connection c = null; 751 boolean isConnectionValid = false; 752 do 753 { 754 c = cm.getConnection(tid); 755 756 if (c == null) 758 throw new UnreachableBackendException(Translate.get( 759 "loadbalancer.unable.get.connection", new String []{ 760 String.valueOf(tid), backend.getName()})); 761 try 762 { 763 if (transactionIsolationLevel != org.objectweb.cjdbc.driver.Connection.DEFAULT_TRANSACTION_ISOLATION_LEVEL) 764 c.setTransactionIsolation(transactionIsolationLevel); 765 c.setAutoCommit(false); 766 isConnectionValid = true; 767 } 768 catch (SQLException e) 769 { 770 if (backend.isValidConnection(c)) 771 throw e; else 773 { 774 cm.deleteConnection(tid); 775 } 776 } 777 } 778 while (!isConnectionValid); 779 return c; 780 } 781 782 786 795 public abstract void enableBackend(DatabaseBackend db, boolean writeEnabled) 796 throws SQLException ; 797 798 807 public abstract void disableBackend(DatabaseBackend db) throws SQLException ; 808 809 815 public abstract int getNumberOfEnabledBackends(); 816 817 824 public void setWeight(String name, int w) throws SQLException 825 { 826 throw new SQLException ("Weight is not supported by this load balancer"); 827 } 828 829 833 838 public abstract String getInformation(); 839 840 845 public abstract String getXmlImpl(); 846 847 854 public void setMacroHandler(MacrosHandler handler) 855 { 856 this.macroHandler = handler; 857 } 858 859 862 public String getXml() 863 { 864 StringBuffer info = new StringBuffer (); 865 info.append("<" + DatabasesXmlTags.ELT_LoadBalancer + ">"); 866 info.append(getXmlImpl()); 867 info.append("</" + DatabasesXmlTags.ELT_LoadBalancer + ">"); 868 return info.toString(); 869 } 870 871 874 public String getAssociatedString() 875 { 876 return "loadbalancer"; 877 } 878 } 879 | Popular Tags |