1 23 24 package org.continuent.sequoia.controller.loadbalancer.tasks; 25 26 import java.sql.Connection ; 27 import java.sql.SQLException ; 28 29 import org.continuent.sequoia.common.exceptions.UnreachableBackendException; 30 import org.continuent.sequoia.common.i18n.Translate; 31 import org.continuent.sequoia.common.log.Trace; 32 import org.continuent.sequoia.controller.backend.DatabaseBackend; 33 import org.continuent.sequoia.controller.backend.result.ExecuteUpdateResult; 34 import org.continuent.sequoia.controller.connection.AbstractConnectionManager; 35 import org.continuent.sequoia.controller.connection.PooledConnection; 36 import org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer; 37 import org.continuent.sequoia.controller.loadbalancer.BackendWorkerThread; 38 import org.continuent.sequoia.controller.requests.AbstractRequest; 39 import org.continuent.sequoia.controller.requests.StoredProcedure; 40 41 47 public class CallableStatementExecuteUpdateTask extends AbstractTask 48 { 49 private StoredProcedure proc; 50 private ExecuteUpdateResult result; 51 52 static Trace endUserLogger = Trace 53 .getLogger("org.continuent.sequoia.enduser"); 54 55 62 public CallableStatementExecuteUpdateTask(int nbToComplete, int totalNb, 63 StoredProcedure proc) 64 { 65 super(nbToComplete, totalNb, proc.isPersistentConnection(), proc 66 .getPersistentConnectionId()); 67 this.proc = proc; 68 } 69 70 76 public void executeTask(BackendWorkerThread backendThread) 77 throws SQLException 78 { 79 DatabaseBackend backend = backendThread.getBackend(); 80 81 try 82 { 83 AbstractConnectionManager cm = backend.getConnectionManager(proc 84 .getLogin()); 85 if (cm == null) 86 { 87 SQLException se = new SQLException ( 88 "No Connection Manager for Virtual Login:" + proc.getLogin()); 89 try 90 { 91 notifyFailure(backendThread, -1, se); 92 } 93 catch (SQLException ignore) 94 { 95 96 } 97 throw se; 98 } 99 100 Trace logger = backendThread.getLogger(); 101 if (proc.isAutoCommit()) 102 executeInAutoCommit(backendThread, backend, cm, logger); 103 else 104 executeInTransaction(backendThread, backend, cm, logger); 105 106 if (result == null) 107 return; 109 int resultOnFirstBackendToSucceed = notifySuccess(backendThread, result 110 .getUpdateCount()); 111 if (resultOnFirstBackendToSucceed != result.getUpdateCount()) 112 { 113 String msg = "Disabling backend " + backend.getName() 114 + " that reports a different number of updated rows (" 115 + result.getUpdateCount() + ") than first backend to succeed (" 116 + resultOnFirstBackendToSucceed + ") for stored procedure " + proc; 117 logger.error(msg); 118 backendThread.getLoadBalancer().disableBackend(backend, true); 120 endUserLogger.error(Translate.get("loadbalancer.backend.disabling", 121 backend.getName())); 122 throw new SQLException (msg); 123 } 124 } 125 finally 126 { 127 backend.getTaskQueues().completeStoredProcedureExecution(this); 128 } 129 } 130 131 private void executeInAutoCommit(BackendWorkerThread backendThread, 132 DatabaseBackend backend, AbstractConnectionManager cm, Trace logger) 133 throws SQLException 134 { 135 if (!backend.canAcceptTasks(proc)) 136 { 137 notifyCompletion(backendThread); 141 return; 142 } 143 144 PooledConnection c = null; 146 try 147 { 148 c = cm.retrieveConnectionInAutoCommit(proc); 149 } 150 catch (UnreachableBackendException e1) 151 { 152 SQLException se = new SQLException ("Backend " + backend.getName() 153 + " is no more reachable."); 154 try 155 { 156 notifyFailure(backendThread, -1, se); 157 } 158 catch (SQLException ignore) 159 { 160 } 161 backendThread.getLoadBalancer().disableBackend(backend, true); 164 String msg = Translate.get("loadbalancer.backend.disabling.unreachable", 165 backend.getName()); 166 logger.error(msg); 167 endUserLogger.error(msg); 168 throw se; 169 } 170 171 if (c == null) 173 { 174 SQLException se = new SQLException ("No more connections"); 175 try 176 { if (!notifyFailure(backendThread, proc.getTimeout() * 1000L, se)) 178 return; 179 } 180 catch (SQLException ignore) 181 { 182 } 183 backendThread.getLoadBalancer().disableBackend(backend, true); 186 String msg = "Stored procedure '" 187 + proc.getSqlShortForm(backend.getSqlShortFormLength()) 188 + "' failed on backend " + backend.getName() + " but " + getSuccess() 189 + " succeeded (" + se + ")"; 190 logger.error(msg); 191 endUserLogger.error(Translate.get("loadbalancer.backend.disabling", 192 backend.getName())); 193 throw new SQLException (msg); 194 } 195 196 try 198 { 199 result = AbstractLoadBalancer 200 .executeCallableStatementExecuteUpdateOnBackend(proc, backend, 201 backendThread, c.getConnection()); 202 203 backend.setSchemaIsDirtyIfNeeded(proc); 204 } 205 catch (Exception e) 206 { 207 try 208 { if (!notifyFailure(backendThread, proc.getTimeout() * 1000L, e)) 210 { 211 result = null; 212 return; 213 } 214 } 215 catch (SQLException ignore) 216 { 217 } 218 backendThread.getLoadBalancer().disableBackend(backend, true); 221 String msg = "Stored procedure '" 222 + proc.getSqlShortForm(backend.getSqlShortFormLength()) 223 + "' failed on backend " + backend.getName() + " but " + getSuccess() 224 + " succeeded (" + e + ")"; 225 logger.error(msg); 226 endUserLogger.error(Translate.get("loadbalancer.backend.disabling", 227 backend.getName())); 228 throw new SQLException (msg); 229 } 230 finally 231 { 232 cm.releaseConnectionInAutoCommit(proc, c); 233 } 234 } 235 236 private void executeInTransaction(BackendWorkerThread backendThread, 237 DatabaseBackend backend, AbstractConnectionManager cm, Trace logger) 238 throws SQLException 239 { 240 Connection c; 242 long tid = proc.getTransactionId(); 243 244 try 245 { 246 c = backend.getConnectionForTransactionAndLazyBeginIfNeeded(proc, cm); 247 } 248 catch (UnreachableBackendException ube) 249 { 250 SQLException se = new SQLException ("Backend " + backend.getName() 251 + " is no more reachable."); 252 try 253 { 254 notifyFailure(backendThread, -1, se); 255 } 256 catch (SQLException ignore) 257 { 258 } 259 backendThread.getLoadBalancer().disableBackend(backend, true); 262 String msg = Translate.get("loadbalancer.backend.disabling.unreachable", 263 backend.getName()); 264 logger.error(msg); 265 endUserLogger.error(msg); 266 throw se; 267 } 268 catch (SQLException e1) 269 { 270 SQLException se = new SQLException ( 271 "Unable to get connection for transaction " + tid); 272 try 273 { if (!notifyFailure(backendThread, proc.getTimeout() * 1000L, se)) 275 return; 276 } 277 catch (SQLException ignore) 278 { 279 } 280 backendThread.getLoadBalancer().disableBackend(backend, true); 283 String msg = "Request '" 284 + proc.getSqlShortForm(backend.getSqlShortFormLength()) 285 + "' failed on backend " + backend.getName() + " but " + getSuccess() 286 + " succeeded (" + se + ")"; 287 logger.error(msg); 288 endUserLogger.error(Translate.get("loadbalancer.backend.disabling", 289 backend.getName())); 290 throw new SQLException (msg); 291 } 292 293 if (c == null) 295 { SQLException se = new SQLException ( 297 "Unable to retrieve connection for transaction " + tid); 298 try 299 { if (!notifyFailure(backendThread, proc.getTimeout() * 1000L, se)) 301 return; 302 } 303 catch (SQLException ignore) 304 { 305 } 306 backendThread.getLoadBalancer().disableBackend(backend, true); 309 String msg = "Request '" 310 + proc.getSqlShortForm(backend.getSqlShortFormLength()) 311 + "' failed on backend " + backend.getName() + " but " + getSuccess() 312 + " succeeded (" + se + ")"; 313 logger.error(msg); 314 endUserLogger.error(Translate.get("loadbalancer.backend.disabling", 315 backend.getName())); 316 throw new SQLException (msg); 317 } 318 319 try 321 { 322 result = AbstractLoadBalancer 323 .executeCallableStatementExecuteUpdateOnBackend(proc, backend, 324 backendThread, c); 325 326 backend.setSchemaIsDirtyIfNeeded(proc); 327 } 328 catch (Exception e) 329 { 330 try 331 { if (!notifyFailure(backendThread, proc.getTimeout() * 1000L, e)) 333 { 334 result = null; 335 return; 336 } 337 } 338 catch (SQLException ignore) 339 { 340 } 341 backendThread.getLoadBalancer().disableBackend(backend, true); 344 String msg = "Stored procedure '" 345 + proc.getSqlShortForm(backend.getSqlShortFormLength()) 346 + "' failed on backend " + backend.getName() + " but " + getSuccess() 347 + " succeeded (" + e + ")"; 348 logger.error(msg); 349 endUserLogger.error(Translate.get("loadbalancer.backend.disabling", 350 backend.getName())); 351 throw new SQLException (msg); 352 } 353 } 354 355 358 public AbstractRequest getRequest() 359 { 360 return proc; 361 } 362 363 368 public ExecuteUpdateResult getResult() 369 { 370 return result; 371 } 372 373 376 public long getTransactionId() 377 { 378 return proc.getTransactionId(); 379 } 380 381 384 public boolean isAutoCommit() 385 { 386 return proc.isAutoCommit(); 387 } 388 389 392 public boolean equals(Object other) 393 { 394 if ((other == null) 395 || !(other instanceof CallableStatementExecuteUpdateTask)) 396 return false; 397 398 CallableStatementExecuteUpdateTask cseut = (CallableStatementExecuteUpdateTask) other; 399 if (proc == null) 400 return cseut.getRequest() == null; 401 return proc.equals(cseut.getRequest()); 402 } 403 404 407 public int hashCode() 408 { 409 return (int) proc.getId(); 410 } 411 412 415 public String toString() 416 { 417 if (proc.isAutoCommit()) 418 return "Autocommit CallableStatementExecuteUpdateTask " 419 + proc.getTransactionId() + " (" + proc.getUniqueKey() + ")"; 420 else 421 return "CallableStatementExecuteUpdateTask for transaction " 422 + proc.getTransactionId() + " (" + proc.getUniqueKey() + ")"; 423 } 424 425 } | Popular Tags |