package org.continuent.sequoia.controller.loadbalancer.tasks;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.continuent.sequoia.common.exceptions.SQLExceptionFactory;
import org.continuent.sequoia.controller.backend.DatabaseBackend;
import org.continuent.sequoia.controller.loadbalancer.BackendWorkerThread;
import org.continuent.sequoia.controller.requests.AbstractRequest;

/**
 * Base class for tasks handed to {@link BackendWorkerThread}s.
 * <p>
 * A task keeps counters of how many backends reported success or failure and
 * uses the instance monitor (<code>wait</code>/<code>notify</code>) to wake up
 * threads waiting for the task to complete. All counters are guarded by the
 * monitor of the task instance.
 */
public abstract class AbstractTask
{
  /**
   * Total number of notifications expected for this task. Decremented by
   * {@link #notifyCompletion(BackendWorkerThread)}.
   */
  private int       totalNb;

  /** Number of successes after which the task is considered completed. */
  private int       nbToComplete;

  /** Number of times {@link #execute(BackendWorkerThread)} started the task. */
  private int       executionStarted;

  /** Number of backends that reported a success. */
  private int       success                       = 0;

  /** Number of backends that reported a failure. */
  private int       failed                        = 0;

  /** Backend threads that have already sent a notification for this task. */
  private List      notifications                 = null;

  /**
   * Exceptions collected by
   * {@link #notifyFailure(BackendWorkerThread, long, Throwable)};
   * <code>null</code> until the first failure is recorded.
   */
  private List      exceptions                    = null;

  /** Maps a {@link DatabaseBackend} to the list of locks stored for it. */
  private Map       locksMap                      = new HashMap();

  /** Generated keys result set attached to this task, if any. */
  private ResultSet generatedKeysResultSet;

  /** Set to <code>true</code> by {@link #setExpiredTimeout()}. */
  private boolean   timeoutExpired                = false;

  /**
   * Result passed to {@link #notifySuccess(BackendWorkerThread, int)} while no
   * success had been counted yet.
   */
  private int       resultOnFirstBackendToSucceed;

  /** Whether this task is bound to a persistent connection. */
  private boolean   persistentConnection;

  /** Identifier of the persistent connection, if any. */
  private long      persistentConnectionId;

  /**
   * Creates a new task.
   * 
   * @param nbToComplete number of successes needed for the task to be
   *          considered completed; also used as the initial capacity of the
   *          notification list
   * @param totalNb total number of notifications expected
   * @param isPersistentConnection whether the task uses a persistent
   *          connection
   * @param persistentConnectionId persistent connection identifier
   */
  public AbstractTask(int nbToComplete, int totalNb,
      boolean isPersistentConnection, long persistentConnectionId)
  {
    this.nbToComplete = nbToComplete;
    this.totalNb = totalNb;
    success = 0;
    failed = 0;
    executionStarted = 0;
    notifications = new ArrayList(nbToComplete);
    this.persistentConnection = isPersistentConnection;
    this.persistentConnectionId = persistentConnectionId;
  }

  /**
   * Runs the task on the given backend thread unless the timeout has expired
   * before any execution started, in which case the call returns without
   * doing anything.
   * 
   * @param backendThread the backend thread executing the task
   * @throws SQLException if {@link #executeTask(BackendWorkerThread)} fails
   */
  public void execute(BackendWorkerThread backendThread) throws SQLException
  {
    synchronized (this)
    {
      // Once the timeout has expired, a task that nobody started yet must
      // never start. A task that already started elsewhere still runs here.
      if (timeoutExpired && (executionStarted == 0))
        return;
      this.executionStarted++;
    }
    // The actual work is done outside the monitor so that notifications from
    // other backends are not blocked while this one executes.
    executeTask(backendThread);
  }

  /**
   * Performs the actual work of the task.
   * 
   * @param backendThread the backend thread executing the task
   * @throws SQLException if an error occurs
   */
  public abstract void executeTask(BackendWorkerThread backendThread)
      throws SQLException;

  /**
   * Notifies that a backend is done with this task without counting it as a
   * success or a failure: the expected total is decremented instead.
   * <p>
   * A non-null backend thread that already sent a notification is ignored. A
   * <code>null</code> backend thread is not recorded and always decrements the
   * total.
   * 
   * @param backendThread the notifying backend thread, may be
   *          <code>null</code>
   */
  public synchronized void notifyCompletion(BackendWorkerThread backendThread)
  {
    if ((backendThread != null) && !addNotification(backendThread))
      return;

    totalNb--;
    if (success + failed >= totalNb)
    {
      notifyAll();
    }
  }

  /**
   * Notifies that the task failed on a backend and records the failure cause
   * in the exception list.
   * <p>
   * If this was the last expected notification, all waiting threads are woken
   * up. Otherwise, if <code>timeout</code> is greater than -1 and no backend
   * has succeeded yet, the calling thread waits on this task for up to
   * <code>timeout</code> milliseconds (a value of 0 waits until notified, as
   * defined by {@link Object#wait(long)}).
   * 
   * @param backendThread the failing backend thread, may be <code>null</code>
   * @param timeout time in milliseconds to wait for another backend to
   *          succeed; a negative value disables the wait
   * @param e the failure cause, may be <code>null</code>
   * @return <code>true</code> if at least one backend has succeeded when this
   *         method returns
   * @throws SQLException if the wait is interrupted; the interrupt status of
   *           the calling thread is restored before the exception is thrown
   */
  public synchronized boolean notifyFailure(BackendWorkerThread backendThread,
      long timeout, Throwable e) throws SQLException
  {
    if (!addNotification(backendThread))
    {
      if (backendThread != null)
        backendThread.getLogger().info(
            "Backend " + backendThread.getBackend() + " already notified task "
                + toString());
      return success > 0;
    }

    failed++;

    if (exceptions == null)
      exceptions = new ArrayList();

    String backendName;
    if (backendThread == null)
    {
      backendName = "Query not processed";
    }
    else
      backendName = backendThread.getName();

    // Null-safe: a missing cause renders as "null" in the message, exactly
    // like a cause whose getLocalizedMessage() returns null.
    String reason = (e == null) ? null : e.getLocalizedMessage();

    if (e instanceof SQLException)
    {
      SQLException sqlEx = (SQLException) e;
      exceptions.add(SQLExceptionFactory.getSQLException(sqlEx, "Backend "
          + backendName + " failed (" + reason + ")"));
    }
    else
      exceptions.add(new SQLException("Backend " + backendName + " failed ("
          + reason + ")").initCause(e));

    if (success + failed >= totalNb)
    {
      // Last expected notification: wake up everybody waiting on this task.
      notifyAll();
    }
    else
    {
      if ((timeout > -1) && (success == 0))
      {
        try
        {
          // Give another backend a chance to succeed before reporting.
          wait(timeout);
        }
        catch (InterruptedException ie)
        {
          // Converting the interruption into an SQLException clears the
          // interrupt flag; restore it so callers can still observe it.
          Thread.currentThread().interrupt();
          throw (SQLException) new SQLException(
              "Wait interrupted() in failed task of backend " + backendName
                  + " (" + reason + ")").initCause(e);
        }
      }
    }
    return success > 0;
  }

  /**
   * Notifies that the task succeeded on a backend. A backend thread that
   * already sent a notification for this task is ignored.
   * 
   * @param backendThread the backend thread that succeeded
   */
  public synchronized void notifySuccess(BackendWorkerThread backendThread)
  {
    if (!addNotification(backendThread))
      return;

    doNotifySuccess();
  }

  /**
   * Counts one success and wakes up waiting threads when the number of
   * successes reaches <code>nbToComplete</code> or when all expected
   * notifications have arrived. Must be called with the monitor held.
   */
  private void doNotifySuccess()
  {
    success++;

    if ((success == nbToComplete) || (success + failed >= totalNb))
    {
      // Threads that failed may be blocked in notifyFailure() in addition to
      // the thread waiting for completion, so wake all of them in that case.
      if (failed > 0)
        notifyAll();
      else
        notify();
    }
  }

  /**
   * Notifies that the task succeeded on a backend and reports the result it
   * obtained.
   * <p>
   * The result given while no success has been counted yet is remembered and
   * returned to every caller, including callers whose notification is ignored
   * because they had already notified this task.
   * 
   * @param backendThread the backend thread that succeeded
   * @param result the result obtained by this backend
   * @return the result remembered from the first success
   */
  public synchronized int notifySuccess(BackendWorkerThread backendThread,
      int result)
  {
    if (success == 0)
    {
      resultOnFirstBackendToSucceed = result;
    }

    if (!addNotification(backendThread))
      return resultOnFirstBackendToSucceed;

    doNotifySuccess();

    return resultOnFirstBackendToSucceed;
  }

  /**
   * Records that the given backend thread has notified this task.
   * 
   * @param backendThread the notifying backend thread
   * @return <code>false</code> if the thread had already been recorded,
   *         <code>true</code> if it was added
   */
  private boolean addNotification(BackendWorkerThread backendThread)
  {
    if (notifications.contains(backendThread))
      return false;
    notifications.add(backendThread);
    return true;
  }

  /**
   * Tells whether this task runs in auto-commit mode; the exact meaning is
   * defined by the implementing task.
   * 
   * @return <code>true</code> if the task is in auto-commit mode
   */
  public abstract boolean isAutoCommit();

  /**
   * Returns the exceptions recorded by failure notifications.
   * 
   * @return the internal exception list, or <code>null</code> if no failure
   *         has been recorded
   */
  public List getExceptions()
  {
    return exceptions;
  }

  /**
   * Returns the number of times the task execution has been started.
   * 
   * @return the execution-started counter
   */
  public synchronized int getExecutionStarted()
  {
    return executionStarted;
  }

  /**
   * Marks the timeout of this task as expired.
   * 
   * @return <code>true</code> if no execution had started yet, in which case
   *         later calls to {@link #execute(BackendWorkerThread)} do nothing
   */
  public synchronized boolean setExpiredTimeout()
  {
    this.timeoutExpired = true;
    return executionStarted == 0;
  }

  /**
   * Returns the number of recorded failures.
   * 
   * @return the failure counter
   */
  public synchronized int getFailed()
  {
    return failed;
  }

  /**
   * Returns the generated keys result set attached to this task.
   * 
   * @return the generated keys result set, possibly <code>null</code>
   */
  public ResultSet getGeneratedKeysResultSet()
  {
    return generatedKeysResultSet;
  }

  /**
   * Attaches a generated keys result set to this task.
   * 
   * @param generatedKeysResultSet the result set to store
   */
  public void setGeneratedKeysResultSet(ResultSet generatedKeysResultSet)
  {
    this.generatedKeysResultSet = generatedKeysResultSet;
  }

  /**
   * Returns the locks stored for the given backend.
   * 
   * @param backend the backend to look up
   * @return the list stored for this backend, or <code>null</code> if there is
   *         none
   */
  public List getLocks(DatabaseBackend backend)
  {
    synchronized (locksMap)
    {
      return (List) locksMap.get(backend);
    }
  }

  /**
   * Stores the locks for the given backend. If an entry already exists for
   * this backend, it is kept and a fatal message is logged instead.
   * 
   * @param backend the backend the locks belong to
   * @param locks the list of locks to store
   */
  public synchronized void setLocks(DatabaseBackend backend, List locks)
  {
    synchronized (locksMap)
    {
      if (locksMap.get(backend) == null)
        locksMap.put(backend, locks);
      else
      {
        // The Exception is only created to get a stack trace in the log.
        backend.getLogger().fatal(
            "Double locks entry: " + locks + " and " + locksMap.get(backend),
            new Exception());
      }
    }
  }

  /**
   * Returns the number of successes needed for the task to be completed.
   * 
   * @return the <code>nbToComplete</code> value
   */
  public int getNbToComplete()
  {
    return nbToComplete;
  }

  /**
   * Tells whether this task uses a persistent connection.
   * 
   * @return the persistent connection flag
   */
  public final boolean isPersistentConnection()
  {
    return persistentConnection;
  }

  /**
   * Sets the persistent connection flag.
   * 
   * @param persistentConnection the new flag value
   */
  public final void setPersistentConnection(boolean persistentConnection)
  {
    this.persistentConnection = persistentConnection;
  }

  /**
   * Returns the persistent connection identifier.
   * 
   * @return the persistent connection identifier
   */
  public final long getPersistentConnectionId()
  {
    return persistentConnectionId;
  }

  /**
   * Sets the persistent connection identifier.
   * 
   * @param persistentConnectionId the new identifier
   */
  public final void setPersistentConnectionId(long persistentConnectionId)
  {
    this.persistentConnectionId = persistentConnectionId;
  }

  /**
   * Returns the request associated with this task, as defined by the
   * implementing task.
   * 
   * @return the request of this task
   */
  public abstract AbstractRequest getRequest();

  /**
   * Returns the number of recorded successes.
   * 
   * @return the success counter
   */
  public synchronized int getSuccess()
  {
    return success;
  }

  /**
   * Returns the total number of notifications currently expected.
   * 
   * @return the <code>totalNb</code> value
   */
  public synchronized int getTotalNb()
  {
    return totalNb;
  }

  /**
   * Sets the total number of notifications expected. This does not wake up
   * waiting threads.
   * 
   * @param totalNb the new total
   */
  public synchronized void setTotalNb(int totalNb)
  {
    this.totalNb = totalNb;
  }

  /**
   * Returns the transaction identifier of this task, as defined by the
   * implementing task.
   * 
   * @return the transaction identifier
   */
  public abstract long getTransactionId();

  /**
   * Tells whether the task has completed: either enough backends succeeded or
   * every expected notification has arrived.
   * <p>
   * The comparison with the total uses <code>&gt;=</code>, like the
   * notification methods, because the total can be lowered after
   * notifications were counted.
   * 
   * @return <code>true</code> if the task has completed
   */
  public synchronized boolean hasCompleted()
  {
    return (success >= nbToComplete) || (success + failed >= totalNb);
  }

  /**
   * Tells whether every expected notification (success or failure) has
   * arrived.
   * 
   * @return <code>true</code> if no more notification is expected
   */
  public synchronized boolean hasFullyCompleted()
  {
    return success + failed >= totalNb;
  }

}