1 24 25 package org.continuent.sequoia.controller.loadbalancer.tasks; 26 27 import java.sql.Connection ; 28 import java.sql.SQLException ; 29 30 import org.continuent.sequoia.common.exceptions.NoTransactionStartWhenDisablingException; 31 import org.continuent.sequoia.common.exceptions.UnreachableBackendException; 32 import org.continuent.sequoia.common.i18n.Translate; 33 import org.continuent.sequoia.common.log.Trace; 34 import org.continuent.sequoia.controller.backend.DatabaseBackend; 35 import org.continuent.sequoia.controller.backend.result.ExecuteUpdateResult; 36 import org.continuent.sequoia.controller.connection.AbstractConnectionManager; 37 import org.continuent.sequoia.controller.connection.PooledConnection; 38 import org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer; 39 import org.continuent.sequoia.controller.loadbalancer.BackendWorkerThread; 40 import org.continuent.sequoia.controller.requests.AbstractRequest; 41 import org.continuent.sequoia.controller.requests.AbstractWriteRequest; 42 43 51 public class StatementExecuteUpdateTask extends AbstractTask 52 { 53 private AbstractWriteRequest request; 54 private ExecuteUpdateResult result = null; 55 56 static Trace endUserLogger = Trace 57 .getLogger("org.continuent.sequoia.enduser"); 58 59 66 public StatementExecuteUpdateTask(int nbToComplete, int totalNb, 67 AbstractWriteRequest request) 68 { 69 super(nbToComplete, totalNb, request.isPersistentConnection(), request 70 .getPersistentConnectionId()); 71 this.request = request; 72 } 73 74 80 public void executeTask(BackendWorkerThread backendThread) 81 throws SQLException 82 { 83 DatabaseBackend backend = backendThread.getBackend(); 84 85 try 86 { 87 AbstractConnectionManager cm = backend.getConnectionManager(request 88 .getLogin()); 89 if (cm == null) 90 { 91 SQLException se = new SQLException ( 92 "No Connection Manager for Virtual Login:" + request.getLogin()); 93 try 94 { 95 notifyFailure(backendThread, -1, se); 96 } 97 catch (SQLException ignore) 98 { 99 } 100 throw se; 101 } 102 103 Trace logger = backendThread.getLogger(); 104 if (request.isAutoCommit()) 105 executeInAutoCommit(backendThread, backend, cm, logger); 106 else 107 executeInTransaction(backendThread, backend, cm, logger); 108 109 113 if (result != null) 114 { 115 int resultOnFirstBackendToSucceed = notifySuccess(backendThread, result 116 .getUpdateCount()); 117 if (resultOnFirstBackendToSucceed != result.getUpdateCount()) 118 { 119 String msg = "Disabling backend " + backend.getName() 120 + " that reports a different number of updated rows (" 121 + result.getUpdateCount() + ") than first backend to succeed (" 122 + resultOnFirstBackendToSucceed + ") for request " + request; 123 logger.error(msg); 124 backendThread.getLoadBalancer().disableBackend(backend, true); 126 endUserLogger.error(Translate.get( 127 "loadbalancer.backend.disabling", backend.getName())); 128 throw new SQLException (msg); 129 } 130 } 131 } 132 finally 133 { 134 backend.getTaskQueues().completeWriteRequestExecution(this); 135 } 136 } 137 138 private void executeInAutoCommit(BackendWorkerThread backendThread, 139 DatabaseBackend backend, AbstractConnectionManager cm, Trace logger) 140 throws SQLException 141 { 142 if (!backend.canAcceptTasks(request)) 143 { 144 notifyCompletion(backendThread); 149 return; 150 } 151 152 PooledConnection c = null; 154 try 155 { 156 c = cm.retrieveConnectionInAutoCommit(request); 157 } 158 catch (UnreachableBackendException e1) 159 { 160 SQLException se = new SQLException ("Backend " + backend.getName() 161 + " is no more reachable."); 162 try 163 { 164 notifyFailure(backendThread, -1, se); 165 } 166 catch (SQLException ignore) 167 { 168 } 169 172 backendThread.getLoadBalancer().disableBackend(backend, true); 173 String msg = Translate.get( 174 "loadbalancer.backend.disabling.unreachable", backend.getName()); 175 logger.error(msg); 176 endUserLogger.error(msg); 177 throw se; 178 } 179 180 if (c == null) 182 { 183 SQLException se = new SQLException ("No more connections"); 184 try 185 { if (!notifyFailure(backendThread, request.getTimeout() * 1000L, se)) 187 return; 188 } 189 catch (SQLException ignore) 190 { 191 } 192 backendThread.getLoadBalancer().disableBackend(backend, true); 195 String msg = "Request '" 196 + request.getSqlShortForm(backend.getSqlShortFormLength()) 197 + "' failed on backend " + backend.getName() + " but " + getSuccess() 198 + " succeeded (" + se + ")"; 199 logger.error(msg); 200 endUserLogger.error(Translate.get( 201 "loadbalancer.backend.disabling", backend.getName())); 202 throw new SQLException (msg); 203 } 204 205 try 207 { 208 result = AbstractLoadBalancer.executeStatementExecuteUpdateOnBackend( 209 request, backend, backendThread, c.getConnection()); 210 211 backend.updateDatabaseBackendSchema(request); 212 } 213 catch (Exception e) 214 { 215 try 216 { if (!notifyFailure(backendThread, request.getTimeout() * 1000L, e)) 218 { 219 result = null; 220 return; 221 } 222 } 223 catch (SQLException ignore) 224 { 225 } 226 backendThread.getLoadBalancer().disableBackend(backend, true); 229 String msg = "Request '" 230 + request.getSqlShortForm(backend.getSqlShortFormLength()) 231 + "' failed on backend " + backend.getName() + " but " + getSuccess() 232 + " succeeded (" + e + ")"; 233 234 if (logger.isDebugEnabled()) 235 logger.debug(msg, e); 236 else 237 logger.error(msg); 238 endUserLogger.error(Translate.get( 239 "loadbalancer.backend.disabling", backend.getName())); 240 throw new SQLException (msg); 241 } 242 finally 243 { 244 cm.releaseConnectionInAutoCommit(request, c); 245 } 246 } 247 248 private void executeInTransaction(BackendWorkerThread backendThread, 249 DatabaseBackend backend, AbstractConnectionManager cm, Trace logger) 250 throws SQLException 251 { 252 Connection c; 254 long tid = request.getTransactionId(); 255 256 try 257 { 258 c = backend.getConnectionForTransactionAndLazyBeginIfNeeded(request, cm); 259 } 260 catch (UnreachableBackendException ube) 261 { 262 SQLException se = new SQLException ("Backend " + backend.getName() 263 + " is no more reachable."); 264 try 265 { 266 notifyFailure(backendThread, -1, se); 267 } 268 catch (SQLException ignore) 269 { 270 } 271 backendThread.getLoadBalancer().disableBackend(backend, true); 274 String msg = Translate.get( 275 "loadbalancer.backend.disabling.unreachable", backend.getName()); 276 logger.error(msg); 277 endUserLogger.error(msg); 278 throw se; 279 } 280 catch (NoTransactionStartWhenDisablingException e) 281 { 282 notifyCompletion(backendThread); 287 return; 288 } 289 catch (SQLException e1) 290 { 291 SQLException se = new SQLException ( 292 "Unable to get connection for transaction " + tid); 293 try 294 { if (!notifyFailure(backendThread, request.getTimeout() * 1000L, se)) 296 return; 297 } 298 catch (SQLException ignore) 299 { 300 } 301 backendThread.getLoadBalancer().disableBackend(backend, true); 304 String msg = "Request '" 305 + request.getSqlShortForm(backend.getSqlShortFormLength()) 306 + "' failed on backend " + backend.getName() + " but " + getSuccess() 307 + " succeeded (" + se + ")"; 308 logger.error(msg); 309 endUserLogger.error(Translate.get( 310 "loadbalancer.backend.disabling", backend.getName())); 311 throw new SQLException (msg); 312 } 313 314 if (c == null) 316 { SQLException se = new SQLException ( 318 "Unable to retrieve connection for transaction " + tid); 319 try 320 { if (!notifyFailure(backendThread, request.getTimeout() * 1000L, se)) 322 return; 323 } 324 catch (SQLException ignore) 325 { 326 } 327 backendThread.getLoadBalancer().disableBackend(backend, true); 330 String msg = "Request '" 331 + request.getSqlShortForm(backend.getSqlShortFormLength()) 332 + "' failed on backend " + backend.getName() + " but " + getSuccess() 333 + " succeeded (" + se + ")"; 334 logger.error(msg); 335 endUserLogger.error(Translate.get( 336 "loadbalancer.backend.disabling", backend.getName())); 337 throw new SQLException (msg); 338 } 339 340 try 342 { 343 result = AbstractLoadBalancer.executeStatementExecuteUpdateOnBackend( 344 request, backend, backendThread, c); 345 346 backend.updateDatabaseBackendSchema(request); 347 } 348 catch (Exception e) 349 { 350 try 351 { if (!notifyFailure(backendThread, request.getTimeout() * 1000L, e)) 353 { 354 result = null; 355 return; 356 } 357 } 358 catch (SQLException ignore) 359 { 360 } 361 backendThread.getLoadBalancer().disableBackend(backend, true); 363 String msg = "Request '" 364 + request.getSqlShortForm(backend.getSqlShortFormLength()) 365 + "' failed on backend " + backend.getName() + " but " + getSuccess() 366 + " succeeded (" + e + ")"; 367 if (logger.isDebugEnabled()) 368 logger.debug(msg, e); 369 else 370 logger.error(msg); 371 endUserLogger.error(Translate.get( 372 "loadbalancer.backend.disabling", backend.getName())); 373 throw new SQLException (msg); 374 } 375 } 376 377 380 public AbstractRequest getRequest() 381 { 382 return request; 383 } 384 385 390 public ExecuteUpdateResult getResult() 391 { 392 return result; 393 } 394 395 398 public long getTransactionId() 399 { 400 return request.getTransactionId(); 401 } 402 403 406 public boolean isAutoCommit() 407 { 408 return request.isAutoCommit(); 409 } 410 411 414 public boolean equals(Object other) 415 { 416 if ((other == null) || !(other instanceof StatementExecuteUpdateTask)) 417 return false; 418 419 StatementExecuteUpdateTask seut = (StatementExecuteUpdateTask) other; 420 return this.request.equals(seut.getRequest()); 421 } 422 423 426 public int hashCode() 427 { 428 return (int) request.getId(); 429 } 430 431 434 public String toString() 435 { 436 if (request.isAutoCommit()) 437 return "Autocommit StatementExecuteUpdateTask " 438 + request.getTransactionId() + " (" + request.getUniqueKey() + ")"; 439 else 440 return "StatementExecuteUpdateTask from transaction " 441 + request.getTransactionId() + " (" + request.getUniqueKey() + ")"; 442 } 443 444 } | Popular Tags |