1 24 25 package org.objectweb.cjdbc.controller.loadbalancer; 26 27 import java.sql.SQLException ; 28 import java.util.ArrayList ; 29 30 import org.objectweb.cjdbc.common.i18n.Translate; 31 import org.objectweb.cjdbc.common.log.Trace; 32 import org.objectweb.cjdbc.controller.backend.DatabaseBackend; 33 import org.objectweb.cjdbc.controller.loadbalancer.tasks.AbstractTask; 34 35 41 public class BackendWorkerThread extends Thread 42 { 43 51 private AbstractLoadBalancer loadBalancer; 52 private DatabaseBackend backend; 53 private ArrayList taskList; 54 private ArrayList tidList; 55 private boolean isKilled = false; 56 57 private AbstractTask currentlyProcessingTask; 59 private Long currentTaskTid; 61 62 private Trace logger = null; 63 64 67 68 75 public BackendWorkerThread(DatabaseBackend backend, 76 AbstractLoadBalancer loadBalancer) throws SQLException 77 { 78 this("BackendWorkerThread for backend '" + backend.getName() 79 + "' with RAIDb level:" + loadBalancer.getRAIDbLevel(), backend, 80 loadBalancer); 81 } 82 83 91 public BackendWorkerThread(String name, DatabaseBackend backend, 92 AbstractLoadBalancer loadBalancer) throws SQLException 93 { 94 super(name); 95 if (backend == null) 97 { 98 String msg = Translate.get("backendworkerthread.null.backend"); 99 logger = Trace 100 .getLogger("org.objectweb.cjdbc.controller.backend.DatabaseBackend"); 101 logger.error(msg); 102 throw new SQLException (msg); 103 } 104 105 backend.checkDriverCompliance(); 106 107 logger = Trace 108 .getLogger("org.objectweb.cjdbc.controller.backend.DatabaseBackend." 109 + backend.getName()); 110 111 if (loadBalancer == null) 112 { 113 String msg = Translate.get("backendworkerthread.null.loadbalancer"); 114 logger.error(msg); 115 throw new SQLException (msg); 116 } 117 118 this.backend = backend; 119 this.loadBalancer = loadBalancer; 120 taskList = new ArrayList (); 121 tidList = new ArrayList (); 122 } 123 124 127 128 135 public void addTask(AbstractTask task) 136 { 137 if (!isKilled) 138 { 139 taskList.add(task); 140 backend.addPendingWriteRequest(task); 142 } 143 else 144 task.notifyCompletion(); 145 } 146 147 155 public void addTask(AbstractTask task, long transactionId) 156 { 157 if (!isKilled) 158 { 159 tidList.add(new Long (transactionId)); 160 task.setHasTid(true); 161 addTask(task); 162 } 163 else 164 task.notifyCompletion(); 165 } 166 167 178 public void insertTaskAfterLastWriteForTransaction(AbstractTask task, 179 Long transactionId) 180 { 181 if (!isKilled) 182 { 183 task.setHasTid(true); 184 185 int lastTidIndex = tidList.lastIndexOf(transactionId); 187 if (lastTidIndex == -1) 188 { taskList.add(task); 190 tidList.add(transactionId); 191 backend.addPendingWriteRequest(task); 192 return; 193 } 194 195 int lastRequestIndex = 0; 198 while (lastTidIndex >= 0) 199 { 200 AbstractTask t = (AbstractTask) taskList.get(lastRequestIndex); 201 if (t.hasTid()) 202 lastTidIndex--; 203 lastRequestIndex++; 204 } 205 206 taskList.add(lastRequestIndex, task); 208 tidList.add(lastTidIndex + 1, transactionId); 209 backend.addPendingWriteRequest(task); 212 } 213 else 214 task.notifyCompletion(); 215 } 216 217 224 public void addPriorityTask(AbstractTask task) 225 { 226 if (!isKilled) 227 { 228 taskList.add(0, task); 229 backend.addPendingWriteRequest(task); 231 } 232 else 233 task.notifyCompletion(); 234 } 235 236 244 public void addPriorityTask(AbstractTask task, long transactionId) 245 { 246 if (!isKilled) 247 { 248 task.setHasTid(true); 249 addPriorityTask(task); 250 tidList.add(0, new Long (transactionId)); 251 } 252 else 253 task.notifyCompletion(); 254 } 255 256 262 public boolean hasTaskForTransaction(Long tid) 263 { 264 synchronized (this) 265 { 266 if ((currentTaskTid != null) && (currentTaskTid.equals(tid))) 267 return true; 269 else 270 return tidList.contains(tid); 271 } 272 } 273 274 279 public void waitForAllTasksToComplete(long transactionId) 280 { 281 if ((transactionId == 0) || (tidList == null)) 282 return; 283 284 Long tid = new Long (transactionId); 285 synchronized (this) 286 { 287 if (!tidList.contains(tid)) 288 { 289 if ((currentTaskTid != null) 290 && (currentTaskTid.longValue() == transactionId)) 291 { 292 try 293 { 294 if (logger.isDebugEnabled()) 295 logger.debug(Translate.get("backendworkerthread.waiting.task")); 296 wait(); 297 } 298 catch (InterruptedException ignore) 299 { 300 } 301 return; 302 } 303 else 304 return; 305 } 306 307 while (tidList.contains(tid)) 308 { 309 if (logger.isDebugEnabled()) 310 logger.debug(Translate.get("backendworkerthread.waiting.transaction", 311 String.valueOf(tid))); 312 313 try 314 { 315 wait(); 316 } 317 catch (InterruptedException ignore) 318 { 319 } 320 } 321 } 322 } 323 324 327 public void waitForAllTasksToComplete() 328 { 329 synchronized (this) 330 { 331 Object current; 332 if (taskList.size() == 0) 333 { 334 if (currentlyProcessingTask != null) 335 { 336 try 337 { 338 if (logger.isDebugEnabled()) 339 logger.debug(Translate.get("backendworkerthread.waiting.task")); 340 wait(); 341 } 342 catch (InterruptedException ignore) 343 { 344 logger.warn(Translate 345 .get("backendworkerthread.no.full.task.synchronization")); 346 } 347 return; 348 } 349 else 350 { return; 352 } 353 } 354 else 355 current = taskList.get(taskList.size() - 1); 356 357 if (logger.isDebugEnabled()) 358 logger.debug(Translate.get("backendworkerthread.waiting.request", 359 current.toString())); 360 361 while (taskList.contains(current)) 362 { 363 try 364 { 365 wait(); 366 } 367 catch (InterruptedException ignore) 368 { 369 } 370 } 371 } 372 } 373 374 379 public void kill() 380 { 381 kill(true); 382 } 383 384 388 public void killWithoutDisablingBackend() 389 { 390 kill(false); 391 } 392 393 400 private void kill(boolean forceDisable) 401 { 402 synchronized (this) 403 { 404 if (backend.isKilled()) 405 return; 406 407 String msg = this.getName() + " is shutting down"; 408 logger.info(msg); 409 410 while (!taskList.isEmpty()) 412 { 413 AbstractTask task = (AbstractTask) taskList.remove(0); 414 try 415 { 416 task.notifyFailure(this, 1, new SQLException (msg)); 417 } 418 catch (SQLException ignore) 419 { 420 } 421 } 422 isKilled = true; 423 notify(); } 425 if (forceDisable) 426 { 427 try 428 { 429 loadBalancer.disableBackend(backend); 433 } 434 catch (SQLException ignore) 435 { 436 } 437 } 438 } 439 440 444 public void run() 445 { 446 currentlyProcessingTask = null; 447 448 while (!isKilled) 449 { 450 synchronized (this) 451 { 452 while (taskList.isEmpty() && !isKilled) 453 { try 455 { 456 wait(); 457 } 458 catch (InterruptedException e) 459 { 460 logger.warn(Translate.get("backendworkerthread.wait.interrupted")); 461 break; 462 } 463 } 464 try 465 { currentlyProcessingTask = (AbstractTask) taskList.remove(0); 467 if (currentlyProcessingTask.hasTid()) 468 currentTaskTid = (Long ) tidList.remove(0); 469 else 470 currentTaskTid = null; 471 } 472 catch (IndexOutOfBoundsException oob) 473 { 474 logger.warn(Translate.get("backendworkerthread.no.task"), oob); 476 currentlyProcessingTask = null; 477 } 478 } 479 try 481 { 482 if (currentlyProcessingTask == null) 483 { 484 logger.warn("Null task in BackendWorkerThread"); 485 continue; 486 } 487 if (logger.isDebugEnabled()) 488 logger.debug(Translate.get("backendworkerthread.execute.task", 489 currentlyProcessingTask.toString())); 490 currentlyProcessingTask.execute(this); 491 } 492 catch (SQLException e) 493 { 494 logger.warn(Translate.get("backendworkerthread.task.failed", e)); 496 } 497 catch (RuntimeException re) 498 { 499 try 502 { 503 currentlyProcessingTask.notifyFailure(this, 1, new SQLException (re 504 .getMessage())); 505 } 506 catch (SQLException e1) 507 { 508 } 510 logger.fatal(Translate.get( 511 "backendworkerthread.task.runtime.exception", 512 currentlyProcessingTask.toString()), re); 513 } 514 finally 515 { 516 try 517 { 518 backend.removePendingRequest(currentlyProcessingTask); 519 } 520 catch (RuntimeException e) 521 { 522 logger.warn( 523 Translate.get("backendworkerthread.remove.task.error", e), e); 524 } 525 } 526 527 synchronized (this) 532 { 533 notifyAll(); 534 currentlyProcessingTask = null; 535 currentTaskTid = null; 536 } 537 } 539 try 541 { 542 if (backend.isReadEnabled() || backend.isWriteEnabled()) 543 loadBalancer.disableBackend(backend); 544 } 545 catch (SQLException e) 546 { 547 logger.error(Translate.get("backendworkerthread.backend.disable.failed", 548 new String []{backend.getName(), e.getMessage()})); 549 } 550 } 551 552 555 556 561 public DatabaseBackend getBackend() 562 { 563 return backend; 564 } 565 566 571 public Trace getLogger() 572 { 573 return logger; 574 } 575 576 } | Popular Tags |